SSE 200 does not mean "subscribed" — a Realtime join-before-publish race drops first-turn tokens

SSE join-before-publish race (SPEC-568)

Problem

The operator’s World Agent chat reply was lost on the first turn after an API boot. The workers ran the agent (confirmed by real xAI/Grok 200s) and published the tokens, but the operator’s SSE stream (GET …/stream) returned 200 and then delivered nothing — no error. Repeat turns worked.

Investigation

  1. Isolated the bridge — a standalone publisher + subscriber (the real adapters) against the running local Realtime server delivered every chunk. Transport/auth were fine.
  2. Independent probe during a real dispatch — a second subscriber on the conversation channel received the tokens. So the workers publish correctly.
  3. Instrumented the API subscriber: on a cold API the SSE route returns HTTP 200 (StreamingResponse start) before the Realtime channel join completes. The first join also pays the websocket connect() cost, so it can finish after the workers have already published. Supabase Realtime does not replay to a not-yet-joined channel, so the broadcast is dropped.
  4. A/B confirmed the race — cold first turn loses tokens; once the shared client is warm, joins are instant and every turn delivers.
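The cold-versus-warm behaviour in steps 3 and 4 can be reproduced in miniature. The sketch below is an assumption-laden stand-in, not the project's code: `NoReplayBus` and `run_turn` are hypothetical names, and the sleeps model the websocket connect cost and the workers' publish time.

```python
import asyncio


class NoReplayBus:
    """Hypothetical in-memory broker that delivers only to current subscribers."""

    def __init__(self) -> None:
        self._queues: dict = {}

    async def join(self, channel: str, connect_delay: float) -> asyncio.Queue:
        # The delay models the connect cost paid by the first join on a cold API.
        await asyncio.sleep(connect_delay)
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(channel, []).append(queue)
        return queue

    def publish(self, channel: str, message: str) -> None:
        # Nothing is buffered: with no joined subscriber the message is lost.
        for queue in self._queues.get(channel, []):
            queue.put_nowait(message)


async def run_turn(connect_delay: float, publish_after: float) -> list:
    bus = NoReplayBus()
    join_task = asyncio.create_task(bus.join("conversation", connect_delay))
    await asyncio.sleep(publish_after)  # the worker publishes on its own schedule
    bus.publish("conversation", "token-1")
    queue = await join_task
    received = []
    while not queue.empty():
        received.append(queue.get_nowait())
    return received
```

With these assumed timings, `asyncio.run(run_turn(0.05, 0.01))` models the cold first turn and returns an empty list, while `asyncio.run(run_turn(0.0, 0.01))` models a warm client and returns the token.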

Root cause

Two compounding facts:

  1. HTTP 200 on an SSE stream means “response started”, not “subscribed”. The channel join happened lazily inside the StreamingResponse generator, after the 200. The client (cockpit) treats the open stream as “ready” and dispatches — so the publish can beat the join.
  2. The test double diverged from the production adapter. Production used SupabaseBroadcastSubscriber (lazy connect + channel join); the unit tests and the “live bridge” IT exercised RealtimeChannelSubscriber, a plain async for over a fake bus with no join step. The two adapters differed exactly on the racing line, so no test modelled it.
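The first fact follows from how Python async generators work: the body does not run when the generator is created, only when it is first iterated. A minimal sketch, with `lazy_stream` and `handle_request` as hypothetical stand-ins for the pre-fix generator and route:

```python
import asyncio
from typing import AsyncIterator


async def lazy_stream(events: list) -> AsyncIterator[str]:
    events.append("join")  # runs on the first iteration, not at call time
    yield "chunk"


async def handle_request() -> list:
    events: list = []
    stream = lazy_stream(events)       # creates the generator; body has not run
    events.append("response started")  # stand-in for the 200 going out
    async for _ in stream:             # the framework only iterates now
        pass
    return events
```

`asyncio.run(handle_request())` returns `["response started", "join"]`: any work placed inside the generator, including a channel join, is ordered after the response has started.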

Solution

Join the channel before the HTTP response begins. ChannelSubscriber.subscribe became async open(channel), which blocks until the channel reports SUBSCRIBED and then returns the joined chunk iterator. If the join does not land in time, open fails closed with a TimeoutError, which surfaces as a 503. The route awaits open before returning StreamingResponse(sse_stream(chunks)). The cockpit’s existing await-stream-then-dispatch becomes correct with no client change.

# Route (after):
channel = realtime_channel_name(conversation_id)
chunks = await container.subscriber.open(channel) # blocks until SUBSCRIBED, pre-200
return StreamingResponse(sse_stream(chunks), media_type="text/event-stream")

The regression guard targets the production subscriber: open() must not return until SUBSCRIBED, and it must time out when the join never lands.
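One way such an open() and its two guarantees might look is sketched below. This is not the production SupabaseBroadcastSubscriber: `SketchSubscriber`, its `join_delay` and `timeout` parameters, the `joined` flag, and the `None` end-of-stream marker are all assumptions made for illustration, and the sleep stands in for the real connect and join.

```python
import asyncio
from typing import AsyncIterator, Optional


class SketchSubscriber:
    """Hypothetical subscriber; join_delay stands in for connect + channel join."""

    def __init__(self, join_delay: float, timeout: float) -> None:
        self._join_delay = join_delay
        self._timeout = timeout
        self.joined = False
        self._queue: asyncio.Queue = asyncio.Queue()

    async def _join(self) -> None:
        await asyncio.sleep(self._join_delay)
        self.joined = True  # where the real channel would report SUBSCRIBED

    async def open(self, channel: str) -> AsyncIterator[str]:
        # Raises asyncio.TimeoutError if the join never lands; a route could
        # translate that into a 503 instead of starting the stream.
        # `channel` is unused in this sketch.
        await asyncio.wait_for(self._join(), self._timeout)
        return self._chunks()

    def publish(self, chunk: Optional[str]) -> None:
        if self.joined:  # no replay: anything sent before the join is dropped
            self._queue.put_nowait(chunk)

    async def _chunks(self) -> AsyncIterator[str]:
        while True:
            chunk = await self._queue.get()
            if chunk is None:  # None marks end of stream in this sketch
                return
            yield chunk
```

A guard in this shape asserts two things: after `await sub.open(...)` returns, `sub.joined` is already true, and with a join delay longer than the timeout, `open` raises rather than returning an iterator that will never receive anything.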

Prevention

Best practices

  • For a subscribe-then-trigger flow over a no-replay broker (Supabase Realtime broadcast, and most pub/sub systems), the subscriber must be joined before the trigger fires. An HTTP 200 / connection-open is not a join.
  • Test the production adapter’s ordering contract, not a convenience double that omits the step that races. When a real adapter and a test double diverge, the divergence is where bugs hide.
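The second practice can be expressed as a single contract check that every implementation of the subscriber interface must pass. In the sketch below, the `joined` flag, `JoinObservable`, `EagerSubscriber`, `LazySubscriber`, and `open_joins_first` are hypothetical names introduced for illustration; the real adapters may expose join state differently.

```python
import asyncio
from typing import AsyncIterator, Protocol


class JoinObservable(Protocol):
    """Hypothetical test-facing view of a subscriber: open() plus a joined flag."""

    joined: bool

    async def open(self, channel: str) -> AsyncIterator[str]: ...


class EagerSubscriber:
    """Joins inside open(), before handing back the iterator."""

    def __init__(self) -> None:
        self.joined = False

    async def open(self, channel: str) -> AsyncIterator[str]:
        await asyncio.sleep(0.01)  # stand-in for the join round trip
        self.joined = True
        return self._chunks()

    async def _chunks(self) -> AsyncIterator[str]:
        yield "chunk"


class LazySubscriber:
    """Defers the join to first iteration, like the pre-fix ordering."""

    def __init__(self) -> None:
        self.joined = False

    async def open(self, channel: str) -> AsyncIterator[str]:
        return self._chunks()

    async def _chunks(self) -> AsyncIterator[str]:
        self.joined = True  # too late: the caller already treats the stream as ready
        yield "chunk"


async def open_joins_first(subscriber: JoinObservable) -> bool:
    """Ordering contract: the channel is joined by the time open() returns."""
    await subscriber.open("conversation")
    return subscriber.joined
```

Running the same check against both classes makes the divergence visible: it holds for `EagerSubscriber` and fails for `LazySubscriber`, which is the kind of difference a double without a join step cannot surface.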

Warning signs

  • A streaming endpoint that returns 200 then “sometimes” delivers nothing, worst on the first request after boot.
  • A subscriber whose join is awaited inside the response generator rather than before the response starts.
  • Two classes implementing one Protocol where only the non-production one is tested.

References

  • apps/api/src/spectral_api/operator/agent_task_streaming.py (open + sse_stream)
  • apps/api/src/spectral_api/operator/world_agent_routes.py (stream_chat_turn)
  • Linear: SPEC-568. Commit d1fa4c4.