SSE 200 does not mean "subscribed" — a Realtime join-before-publish race drops first-turn tokens
SSE join-before-publish race (SPEC-568)
Problem
The operator’s World Agent chat reply was lost on the first turn after an API
boot. The workers ran the agent (confirmed by real xAI/Grok 200s) and published
the tokens, but the operator’s SSE stream (GET …/stream) returned 200 and then
delivered nothing — no error. Repeat turns worked.
Investigation
- Isolated the bridge — a standalone publisher + subscriber (the real adapters) against the running local Realtime server delivered every chunk. Transport/auth were fine.
- Independent probe during a real dispatch — a second subscriber on the conversation channel received the tokens. So the workers publish correctly.
- Instrumented the API subscriber: on a cold API, the SSE route returns HTTP 200
(StreamingResponse start) before the Realtime channel join completes. The join
(the first one pays the websocket connect() cost) can finish after the workers
publish. Supabase Realtime does not replay broadcasts to a channel that has not yet joined, so the broadcast is dropped.
- A/B confirmed the race: the cold first turn loses tokens; once the shared client is warm, joins are instant and every turn delivers.
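The race can be reproduced without Supabase. The sketch below models a hypothetical no-replay broker (NoReplayBus) in plain asyncio. The names, the 50 ms "cold" join latency, and the 10 ms publish delay are illustrative assumptions, not measurements. A join that is slower than the publish loses every token. An instant (warm) join receives all of them, matching the A/B result above.

```python
from __future__ import annotations

import asyncio


class NoReplayBus:
    """Hypothetical stand-in for a broadcast broker that never replays.

    A publish reaches only subscribers that have already joined the channel;
    nothing is buffered for late joiners.
    """

    def __init__(self) -> None:
        self._joined: dict[str, list[asyncio.Queue[str]]] = {}

    async def join(self, channel: str, latency: float) -> asyncio.Queue[str]:
        await asyncio.sleep(latency)  # models websocket connect + join round trip
        queue: asyncio.Queue[str] = asyncio.Queue()
        self._joined.setdefault(channel, []).append(queue)
        return queue

    def publish(self, channel: str, message: str) -> int:
        queues = self._joined.get(channel, [])
        for queue in queues:
            queue.put_nowait(message)
        return len(queues)  # number of subscribers that actually received it


async def one_turn(join_latency: float) -> list[str]:
    """Start the join, let a worker publish 10 ms later, then collect what arrived."""
    bus = NoReplayBus()
    join_task = asyncio.create_task(bus.join("conv-1", join_latency))
    await asyncio.sleep(0.01)
    for token in ("Hel", "lo"):
        bus.publish("conv-1", token)
    queue = await join_task
    received: list[str] = []
    while not queue.empty():
        received.append(queue.get_nowait())
    return received


async def main() -> tuple[list[str], list[str]]:
    cold = await one_turn(join_latency=0.05)  # first join pays the connect cost
    warm = await one_turn(join_latency=0.0)   # shared client already connected
    return cold, warm


cold, warm = asyncio.run(main())
print(cold, warm)  # [] ['Hel', 'lo']
```

The broker returns no error in the cold case. The publish simply reaches zero subscribers, which is why the stream looked healthy but stayed empty.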
Root cause
Two compounding facts:
- HTTP 200 on an SSE stream means “response started”, not “subscribed”. The channel join happened lazily inside the StreamingResponse generator, after the 200. The client (cockpit) treats the open stream as “ready” and dispatches — so the publish can beat the join.
- The test double diverged from the production adapter. Production used
SupabaseBroadcastSubscriber (lazy connect + channel join); the unit tests and the “live bridge” IT exercised RealtimeChannelSubscriber, a plain async-for loop over a fake bus with no join step. The two adapters differed exactly on the racing line, so no test modelled it.
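The lazy join cannot win the ordering because an async generator's body does not run until something iterates it, and StreamingResponse starts the response (and sends the 200) before consuming its generator. This minimal sketch shows the pre-fix shape. It uses hypothetical names (join_channel, lazy_sse_stream, route_before) and records one possible interleaving as an event log.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator

events: list[str] = []


async def join_channel(channel: str) -> None:
    """Hypothetical stand-in for the Realtime channel join."""
    events.append(f"join {channel}")


async def lazy_sse_stream(channel: str) -> AsyncIterator[str]:
    # Pre-fix shape: the join lives inside the generator body. Calling this
    # function only creates the generator object; none of the body runs yet.
    await join_channel(channel)
    yield "data: first token\n\n"


async def route_before() -> None:
    chunks = lazy_sse_stream("conv-1")
    events.append("HTTP 200 sent")           # response starts here
    events.append("client dispatches turn")  # cockpit treats the open stream as ready
    async for _ in chunks:  # the body is only iterated now...
        break               # ...so the join runs after the dispatch
    await chunks.aclose()


asyncio.run(route_before())
print(events)  # ['HTTP 200 sent', 'client dispatches turn', 'join conv-1']
```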
Solution
Join the channel before the HTTP response begins. ChannelSubscriber.subscribe
became async open(channel), which blocks until the channel reports SUBSCRIBED
(fail-closed TimeoutError → 503), then returns the joined chunk iterator. The route
awaits open before returning StreamingResponse(sse_stream(chunks)). The cockpit’s
existing await-stream-then-dispatch becomes correct with no client change.

    # Route (after):
    channel = realtime_channel_name(conversation_id)
    chunks = await container.subscriber.open(channel)  # blocks until SUBSCRIBED, pre-200
    return StreamingResponse(sse_stream(chunks), media_type="text/event-stream")

The regression guard targets the production subscriber: open() must not return
until SUBSCRIBED, and it must time out when the join never lands.
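The following minimal sketch shows the open() contract in plain asyncio, with two checks that mirror the regression guard. Several pieces are assumptions for illustration: SketchSubscriber, the start_join hook, and the 5-second default timeout. None of them is the production SupabaseBroadcastSubscriber or the Supabase client API. A real adapter would wire start_join to its Realtime client's join-status callback.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Callable


class SketchSubscriber:
    """Illustrative open() contract: return only after the join is confirmed."""

    def __init__(
        self,
        # Hypothetical hook: start_join(channel, on_status, deliver); it must
        # eventually call on_status("SUBSCRIBED") if the broker accepts the join.
        start_join: Callable[[str, Callable[[str], None], Callable[[str], None]], None],
        join_timeout: float = 5.0,
    ) -> None:
        self._start_join = start_join
        self._join_timeout = join_timeout

    async def open(self, channel: str) -> AsyncIterator[str]:
        joined = asyncio.Event()
        queue: asyncio.Queue[str] = asyncio.Queue()

        def on_status(status: str) -> None:
            if status == "SUBSCRIBED":
                joined.set()

        self._start_join(channel, on_status, queue.put_nowait)
        try:
            # Block until the broker confirms the join, before any HTTP response.
            await asyncio.wait_for(joined.wait(), timeout=self._join_timeout)
        except asyncio.TimeoutError:
            # Fail closed: the route maps this to 503 instead of a silent 200.
            raise TimeoutError(f"join of {channel!r} not confirmed") from None
        return self._chunks(queue)

    @staticmethod
    async def _chunks(queue: asyncio.Queue[str]) -> AsyncIterator[str]:
        while True:
            yield await queue.get()


async def regression_checks() -> tuple[bool, str, bool]:
    hooks: dict[str, Callable[[str], None]] = {}

    def controlled_join(channel, on_status, deliver) -> None:
        hooks["status"], hooks["deliver"] = on_status, deliver  # test controls timing

    # 1. open() must not return while the join is still pending.
    pending = asyncio.create_task(SketchSubscriber(controlled_join).open("conv-1"))
    await asyncio.sleep(0.01)
    returned_early = pending.done()

    # Once SUBSCRIBED arrives, open() returns and later publishes are delivered.
    hooks["status"]("SUBSCRIBED")
    chunks = await pending
    hooks["deliver"]("first token")
    first = await chunks.__anext__()
    await chunks.aclose()

    # 2. open() must time out when the join never lands.
    never = SketchSubscriber(lambda channel, on_status, deliver: None, join_timeout=0.05)
    try:
        await never.open("conv-2")
        timed_out = False
    except TimeoutError:
        timed_out = True
    return returned_early, first, timed_out


returned_early, first, timed_out = asyncio.run(regression_checks())
print(returned_early, first, timed_out)  # False first token True
```

Because the iterator is returned only after SUBSCRIBED, the route's await completes strictly after the join. The 200 the client sees therefore does imply a joined channel. Converting asyncio.TimeoutError to the builtin TimeoutError keeps the behaviour the same on Python versions where the two are distinct classes.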
Prevention
Best practices
- For a subscribe-then-trigger flow over a no-replay broker (Supabase Realtime broadcast and most pub/sub systems), the subscriber must be joined before the trigger fires. An HTTP 200 or an open connection is not a join.
- Test the production adapter’s ordering contract, not a convenience double that omits the step that races. When a real adapter and a test double diverge, the divergence is where bugs hide.
Warning signs
- A streaming endpoint that returns 200 then “sometimes” delivers nothing, worst on the first request after boot.
- A subscriber whose join is awaited inside the response generator rather than before the response starts.
- Two classes implementing one Protocol where only the non-production one is tested.
References
- apps/api/src/spectral_api/operator/agent_task_streaming.py (open + sse_stream)
- apps/api/src/spectral_api/operator/world_agent_routes.py (stream_chat_turn)
- Linear: SPEC-568. Commit d1fa4c4.