Inter-context LISTEN/NOTIFY worker-consumer dispatch + race-free at-least-once dedup
First landed in SPEC-466 (Stream D W4) as the repo's first full LISTEN/NOTIFY worker-consumer inter-context dispatch; Stream H feedback-loop consumers reuse the same seam.
Problem
A producer context (worlds) writes a typed event to core.outbox inside its business transaction, and a consumer context (platform) must project it. However, spectral.platform may not import spectral.worlds.* (ADR-065 D4, the inter-context-import rule). The substrate (core.events) defined the EventPublisher/EventListener Protocols, but no concrete listener daemon existed, so nothing drained the outbox or routed its events to a handler.
Two subtler traps surfaced in review:
- Dispatch can be faked. An inter-context integration test (IT) that publishes an event and then calls a drain it drives itself proves nothing about real NOTIFY delivery.
- Dedup is not race-free by default. A check-then-act sequence (SELECT from core.event_handled, then a plain INSERT) lets two listeners processing two outbox rows that share one idempotency_key both pass the SELECT and both run the handler; the loser's INSERT then raises UniqueViolation, bouncing its row back to pending (noisy, and the handler ran twice).
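To make the second trap concrete, here is a deterministic replay of the check-then-act interleaving. A minimal sketch, assuming an in-memory SQLite table as a stand-in for core.event_handled (SQLite raises sqlite3.IntegrityError where Postgres raises UniqueViolation); the handler name and key are hypothetical:

```python
import sqlite3

# In-memory stand-in for core.event_handled; SQLite is used only so the
# interleaving can be replayed step by step without a Postgres server.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_handled ("
    " handler_name TEXT NOT NULL,"
    " idempotency_key TEXT NOT NULL,"
    " PRIMARY KEY (handler_name, idempotency_key))"
)

handler_calls = 0


def handler() -> None:
    global handler_calls
    handler_calls += 1


def already_handled(name: str, key: str) -> bool:
    row = conn.execute(
        "SELECT 1 FROM event_handled WHERE handler_name = ? AND idempotency_key = ?",
        (name, key),
    ).fetchone()
    return row is not None


# Two workers hold two distinct outbox rows that share one idempotency key.
name, key = "card_projection", "evt-42"  # hypothetical values

# Step 1: both workers run the pre-check before either has inserted.
a_sees_new = not already_handled(name, key)
b_sees_new = not already_handled(name, key)

# Step 2: both passed the check, so both run the handler.
if a_sees_new:
    handler()
if b_sees_new:
    handler()

# Step 3: both record the key with a plain INSERT; the second one collides.
conn.execute("INSERT INTO event_handled VALUES (?, ?)", (name, key))
loser_error = None
try:
    conn.execute("INSERT INTO event_handled VALUES (?, ?)", (name, key))
except sqlite3.IntegrityError as exc:  # Postgres: UniqueViolation
    loser_error = exc

print(handler_calls, type(loser_error).__name__)  # 2 IntegrityError
```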
Root Cause
- The cross-context dependency is a runtime substrate dependency (the shared core.outbox table + LISTEN/NOTIFY relay per ADR-044), not a code dependency. It must be wired at the framework layer, not imported.
- SELECT ... FOR UPDATE SKIP LOCKED guarantees that two listeners never claim the same outbox row, but the design admits multiple outbox rows carrying the same idempotency_key. SKIP LOCKED says nothing about idempotency-key collisions across distinct rows. The race-free gate is the primary key, not the pre-check SELECT.
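The gap between row locking and key dedup can be modeled in a few lines. A sketch in plain Python (no database), assuming a hypothetical OutboxRow type and a hypothetical claim_skip_locked function that imitates SKIP LOCKED by skipping ids held in a lock set:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class OutboxRow:
    id: int
    idempotency_key: str
    status: str = "pending"


# Two distinct outbox rows that carry the same idempotency key.
outbox = [OutboxRow(1, "evt-42"), OutboxRow(2, "evt-42")]
locked: set[int] = set()  # ids currently row-locked by an open transaction


def claim_skip_locked() -> OutboxRow | None:
    """Imitates SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1: return the first
    pending row no transaction holds, skipping locked rows instead of waiting."""
    for row in outbox:
        if row.status == "pending" and row.id not in locked:
            locked.add(row.id)
            return row
    return None


worker_a = claim_skip_locked()
worker_b = claim_skip_locked()

# Row locking worked: the two workers hold different rows...
print(worker_a.id, worker_b.id)  # 1 2
# ...yet the rows share a key, so both handlers run unless a key-level gate exists.
print(worker_a.idempotency_key == worker_b.idempotency_key)  # True
```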
Solution
Concrete listener (spectral.platform.infrastructure.events.outbox_listener.OutboxListener, consumer context’s infrastructure layer):
- Holds a dedicated connection and issues LISTEN outbox_gen_<N>. The channel must match what the producer's pg_notify trigger fires on: the producer stamps channel from SPECTRAL_GENERATION and writes it to the outbox row, and the trigger fires pg_notify(new.channel, ...).
- Claims one row at a time: SELECT ... FOR UPDATE SKIP LOCKED filtered by status='pending' AND generation=%s AND deleted_at IS NULL, ORDER BY created_at.
- One transaction per row: claim → run handler → record dedup → mark delivered, all atomic. A handler exception rolls the row back to pending (at-least-once: never lost, never stuck delivered).
- At startup, LISTEN and its commit (Postgres only registers a LISTEN when the transaction commits) happen before the catch-up drain_pending(). Rows committed before that point are picked up by the drain; rows committed after it fire a NOTIFY the loop catches, so there is no gap. A poll-interval fallback covers a missed NOTIFY.
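The per-row transaction boundary can be sketched with SQLite so it runs without a server. Assumptions: SQLite has no FOR UPDATE SKIP LOCKED, so BEGIN IMMEDIATE (a database-wide write lock) stands in for the row claim; the generation and deleted_at filters are omitted; process_one and the table shapes are hypothetical:

```python
import sqlite3
from typing import Callable

# Autocommit mode, so process_one controls BEGIN/COMMIT/ROLLBACK itself.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.executescript(
    """
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY,
        idempotency_key TEXT NOT NULL,
        payload TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        created_at INTEGER NOT NULL
    );
    CREATE TABLE event_handled (
        handler_name TEXT NOT NULL,
        idempotency_key TEXT NOT NULL,
        PRIMARY KEY (handler_name, idempotency_key)
    );
    """
)


def process_one(handler_name: str, handler: Callable[[str], None]) -> bool:
    """Claim -> handle -> record dedup -> mark delivered, in ONE transaction.
    Returns False when no pending row exists."""
    conn.execute("BEGIN IMMEDIATE")  # stand-in for the SKIP LOCKED row claim
    try:
        row = conn.execute(
            "SELECT id, idempotency_key, payload FROM outbox "
            "WHERE status = 'pending' ORDER BY created_at LIMIT 1"
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return False
        row_id, key, payload = row
        handler(payload)
        conn.execute(
            "INSERT INTO event_handled (handler_name, idempotency_key) VALUES (?, ?) "
            "ON CONFLICT (handler_name, idempotency_key) DO NOTHING",
            (handler_name, key),
        )
        conn.execute("UPDATE outbox SET status = 'delivered' WHERE id = ?", (row_id,))
        conn.execute("COMMIT")
        return True
    except Exception:
        # Undo everything, including the claim: the row stays 'pending' and is retried.
        conn.execute("ROLLBACK")
        raise
```

If the handler raises, nothing from that attempt survives (no dedup record, no status change), which is what makes the delivery at-least-once rather than at-most-once.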
Race-free dedup (the key pattern):
```python
# Handle, THEN claim the idempotency key as the authoritative gate:
await handler(envelope)
cur = await conn.execute(
    "INSERT INTO core.event_handled (handler_name, idempotency_key, ...) "
    "VALUES (%s, %s, ...) ON CONFLICT (handler_name, idempotency_key) DO NOTHING "
    "RETURNING 1",
    (handler_name, idempotency_key, ...),
)
row = await cur.fetchone()
# row is None     => another worker already owns this key: concede, mark delivered, no raise.
# row is not None => this txn won the key.
```

The PK (handler_name, idempotency_key) is the race-free dedup point; the pre-check SELECT is only a fast-path optimization. The handler must additionally be idempotent (e.g. an ON CONFLICT DO UPDATE upsert), since under concurrency it can run twice before either commit lands.
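The gate can be exercised end to end on SQLite, which accepts the same INSERT ... ON CONFLICT DO NOTHING / DO UPDATE upsert syntax. A minimal sketch with hypothetical table and function names; the two deliveries are replayed sequentially, and the end state is the one Postgres reaches under concurrency, where the loser's INSERT ... ON CONFLICT DO NOTHING waits for the winner's transaction to commit and then inserts nothing:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY,
        idempotency_key TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending'
    );
    CREATE TABLE event_handled (
        handler_name TEXT NOT NULL,
        idempotency_key TEXT NOT NULL,
        PRIMARY KEY (handler_name, idempotency_key)
    );
    CREATE TABLE card_snapshot (
        card_id TEXT PRIMARY KEY,
        body TEXT NOT NULL
    );
    -- Two distinct outbox rows that share one idempotency key.
    INSERT INTO outbox (id, idempotency_key) VALUES (1, 'evt-42'), (2, 'evt-42');
    """
)
HANDLER = "card_projection"  # hypothetical handler name


def project_card(card_id: str, body: str) -> None:
    """Idempotent handler: an upsert, so a second run rewrites the same row."""
    conn.execute(
        "INSERT INTO card_snapshot (card_id, body) VALUES (?, ?) "
        "ON CONFLICT (card_id) DO UPDATE SET body = excluded.body",
        (card_id, body),
    )


def deliver(row_id: int, key: str, card_id: str, body: str) -> str:
    project_card(card_id, body)  # 1) handle first
    cur = conn.execute(  # 2) then let the primary key decide who owns the key
        "INSERT INTO event_handled (handler_name, idempotency_key) VALUES (?, ?) "
        "ON CONFLICT (handler_name, idempotency_key) DO NOTHING",
        (HANDLER, key),
    )
    # rowcount 0 plays the role of `RETURNING 1` yielding no row in Postgres.
    outcome = "won" if cur.rowcount == 1 else "conceded"
    # 3) Either way the row is finished: mark delivered and commit, never raise.
    conn.execute("UPDATE outbox SET status = 'delivered' WHERE id = ?", (row_id,))
    conn.commit()
    return outcome
```

Calling deliver(1, ...) and then deliver(2, ...) with the same key returns "won" and then "conceded": both outbox rows end up delivered, while event_handled and card_snapshot each hold exactly one row.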
Framework-layer wiring (apps/workers/src/spectral_workers/event_consumers.py): build_world_model_card_projection_consumer composes the consumer handler + its repository over the listener connection via the SPEC-421 Container DI seam. The handler stays single-context; the cross-context flow is the substrate, not an import. This builder is the precedent Stream H consumers follow.
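The shape of that wiring, reduced to plain Python. This is a sketch only: every name below is hypothetical (the real builder is build_world_model_card_projection_consumer and resolves dependencies through the SPEC-421 Container, whose API is not shown here), and the handler is synchronous for brevity where the real one is awaited:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Protocol


class CardSnapshotRepository(Protocol):
    """Port owned by the consumer (platform) context."""

    def upsert(self, card_id: str, body: dict[str, Any]) -> None: ...


@dataclass
class CardProjectionHandler:
    """Single-context handler: sees only a platform-owned port and the event
    envelope (a plain dict here), never anything from spectral.worlds."""

    repo: CardSnapshotRepository

    def __call__(self, envelope: dict[str, Any]) -> None:
        self.repo.upsert(envelope["card_id"], envelope["payload"])


@dataclass
class InMemoryCardRepo:
    """Test double; a real repository would write through the connection."""

    rows: dict[str, dict[str, Any]] = field(default_factory=dict)

    def upsert(self, card_id: str, body: dict[str, Any]) -> None:
        self.rows[card_id] = body


def build_card_projection_consumer(
    connection: Any,
    make_repo: Callable[[Any], CardSnapshotRepository],
) -> CardProjectionHandler:
    """Framework-layer composition root: the one place that knows both the
    listener connection and the concrete repository."""
    return CardProjectionHandler(repo=make_repo(connection))
```

Only the builder touches more than one module; the handler and its port stay inside the consumer context and can be unit-tested with an in-memory repository.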
Proving real dispatch in the IT: LISTEN (and commit) on a raw connection before publishing, then block on connection.notifies(timeout=10.0, stop_after=1) and fail the test if the timeout elapses. A channel mismatch then surfaces as a timeout, not as a silent poll-driven pass. Then assert the projected snapshot is byte-equal to the producer payload AND independently assert its content against seeded state. Don't seed empty: empty snapshots pass even if the producer silently returns empties.
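The two assertions can be packaged as test helpers. A sketch under these assumptions: Notify below is a local stand-in with the same fields as psycopg's Notify (channel, payload, pid), so the helpers run without a server; in a real IT the iterable would be conn.notifies(timeout=10.0, stop_after=1) (psycopg 3.2+); the helper names and the snapshot's {"cards": [{"id": ...}]} JSON shape are hypothetical:

```python
from __future__ import annotations

import json
from typing import Iterable, NamedTuple


class Notify(NamedTuple):
    """Local stand-in with the same fields as psycopg's Notify."""

    channel: str
    payload: str
    pid: int


def expect_one_notify(notifies: Iterable[Notify], channel: str) -> Notify:
    """Take the first notification from an already time-bounded stream and
    fail if the stream ends empty (timeout) instead of passing via polling."""
    for notify in notifies:
        assert notify.channel == channel, f"NOTIFY arrived on {notify.channel!r}"
        return notify
    raise AssertionError(f"no NOTIFY on {channel!r} before timeout; channel mismatch?")


def assert_projection(snapshot: bytes, producer_payload: bytes, seeded_ids: set[str]) -> None:
    # 1) Byte equality: the consumer stored exactly what the producer published.
    assert snapshot == producer_payload, "projected snapshot differs from producer payload"
    # 2) Independent content check: an empty payload would satisfy (1) trivially.
    cards = json.loads(snapshot)["cards"]
    assert cards, "snapshot is empty; did the producer silently return nothing?"
    assert {card["id"] for card in cards} == seeded_ids, "snapshot does not match seeded state"
```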
Prevention
Best Practices
- A migration that first writes a new content_class value to core.outbox/core.event_handled must extend the CHECK constraint via DROP+ADD and grant the writer role minimal privileges (SELECT, INSERT). Defer DELETE to the retention-sweep migration; never grant DELETE on the dedup log to the busy listener role.
- Migration-SQL GRANT lints must handle multi-table object lists (GRANT ... ON a, b TO r) AND schema-wide forms (GRANT ... ON ALL TABLES IN SCHEMA worlds), or a cross-context write grant slips through silently.
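A sketch of a lint that covers both GRANT forms. Assumptions: statements are split naively on ';' (semicolons inside comments or string literals are not handled), unqualified table names are treated as public, and the function names and the own_schemas parameter are hypothetical rather than the repo's actual lint:

```python
from __future__ import annotations

import re

GRANT_RE = re.compile(
    r"GRANT\s+(?P<privs>.+?)\s+ON\s+(?P<objects>.+?)\s+TO\s+(?P<role>\S+)",
    re.IGNORECASE | re.DOTALL,
)
SCHEMA_WIDE_RE = re.compile(r"ALL\s+TABLES\s+IN\s+SCHEMA\s+(?P<schemas>.+)", re.IGNORECASE | re.DOTALL)
WRITE_PRIVS = {"INSERT", "UPDATE", "DELETE", "TRUNCATE", "ALL"}


def granted_schemas(objects: str) -> list[str]:
    """Schemas named by a GRANT's ON clause, for both supported forms."""
    objects = objects.strip()
    schema_wide = SCHEMA_WIDE_RE.match(objects)
    if schema_wide:  # GRANT ... ON ALL TABLES IN SCHEMA a, b TO r
        return [s.strip().lower() for s in schema_wide["schemas"].split(",")]
    # GRANT ... ON [TABLE] a.t1, b.t2 TO r: check every entry, not just the first.
    objects = re.sub(r"^TABLE\s+", "", objects, flags=re.IGNORECASE)
    names = [name.strip() for name in objects.split(",")]
    return [name.split(".", 1)[0].lower() if "." in name else "public" for name in names]


def cross_context_write_grants(migration_sql: str, own_schemas: set[str]) -> list[str]:
    """Return one finding per write grant that touches a schema outside own_schemas."""
    findings = []
    for stmt in migration_sql.split(";"):
        grant = GRANT_RE.search(stmt)
        if not grant:
            continue
        # "INSERT (col)" -> "INSERT", "ALL PRIVILEGES" -> "ALL".
        privs = {p.split()[0].split("(")[0].upper() for p in grant["privs"].split(",")}
        writes = privs & WRITE_PRIVS
        if not writes:
            continue
        for schema in granted_schemas(grant["objects"]):
            if schema not in own_schemas:
                findings.append(f"{sorted(writes)} on schema {schema!r}: {' '.join(stmt.split())}")
    return findings
```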
Warning Signs
- An inter-context IT that never blocks on a real notifies() with a failing timeout.
- A dedup path whose only protection is a SELECT before an INSERT.
- RESET ROLE inside an RLS-active listener session: it falls back to the session user and bypasses RLS. Restore the captured current_user with psycopg.sql.Identifier instead.
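A minimal sketch of the capture-and-restore pattern. Assumptions: conn is any object with a psycopg-style execute() returning a cursor with fetchone(); temporarily_as is a hypothetical helper; and quote_ident is a dependency-free stand-in for psycopg.sql.Identifier (same quoting rule) so the sketch runs without a driver. In real code, compose the statement with sql.SQL("SET ROLE {}").format(sql.Identifier(previous)), since SET ROLE cannot take a bind parameter:

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Iterator


def quote_ident(name: str) -> str:
    """Postgres identifier quoting, as psycopg.sql.Identifier renders it:
    wrap in double quotes and double any embedded double quote."""
    return '"' + name.replace('"', '""') + '"'


@contextmanager
def temporarily_as(conn: Any, role: str) -> Iterator[None]:
    """SET ROLE for a block, then restore the role that was active before it.
    RESET ROLE would instead jump to the *session* user, which may bypass RLS."""
    previous = conn.execute("SELECT current_user").fetchone()[0]
    conn.execute(f"SET ROLE {quote_ident(role)}")
    try:
        yield
    finally:
        # Restore the captured role even if the block raised.
        conn.execute(f"SET ROLE {quote_ident(previous)}")
```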
References
- SPEC-466 merge bebecf3; review fixes d9b0a84
- ADR-044 (event substrate), ADR-064 (notification reads event replica), ADR-065 D4 (import boundary)
- src/spectral/core/events/protocols.py (EventPublisher/EventListener Protocols)