Inter-context LISTEN/NOTIFY worker-consumer dispatch + race-free at-least-once dedup

First landed at SPEC-466 (Stream D W4) as the repo’s first full LISTEN/NOTIFY worker-consumer inter-context dispatch. Stream H feedback-loop consumers reuse the same seam.

Problem

A producer context (worlds) writes a typed event to core.outbox inside its business transaction, and a consumer context (platform) must project it — but spectral.platform may not import spectral.worlds.* (ADR-065 D4 / the inter-context-import rule). The substrate (core.events) defined the EventPublisher/EventListener Protocols, but no concrete listener daemon existed, so nothing drained the outbox or routed it to a handler.

Two subtler traps surfaced in review:

  • Dispatch can be faked. An inter-context IT that publishes then calls a drain it drives itself proves nothing about real NOTIFY delivery.
  • Dedup is not race-free by default. A check-then-act sequence (SELECT from core.event_handled, then a plain INSERT) lets two listeners, each processing a different outbox row that shares one idempotency_key, both pass the SELECT and both run the handler. The loser’s INSERT then raises UniqueViolation, which bounces its row back to pending: noisy, and the handler has already run twice.
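
The second trap can be reproduced without a database. The sketch below is an in-memory model only: the set stands in for core.event_handled, asyncio.sleep(0) stands in for the gap between the SELECT and the INSERT, and DuplicateKey, check_then_act and simulate_race are hypothetical names introduced for illustration.

```python
import asyncio


class DuplicateKey(Exception):
    """Local stand-in for the database's unique-violation error."""


async def check_then_act(handled: set, ran: list, worker: str, key: str) -> None:
    if key in handled:          # pre-check: stands in for the SELECT
        return
    await asyncio.sleep(0)      # gap in which the other listener interleaves
    ran.append(worker)          # handler side effect happens here
    if key in handled:          # plain INSERT colliding on the primary key
        raise DuplicateKey(key)
    handled.add(key)


def simulate_race():
    """Run two listeners against the same idempotency key."""
    handled: set = set()
    ran: list = []

    async def main():
        return await asyncio.gather(
            check_then_act(handled, ran, "listener-a", "key-1"),
            check_then_act(handled, ran, "listener-b", "key-1"),
            return_exceptions=True,
        )

    results = asyncio.run(main())
    return ran, results
```

Both listeners pass the pre-check before either records the key, so the handler runs twice and only the second listener fails, after its side effect has already happened.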

Root Cause

  • The cross-context dependency is a runtime substrate dependency (shared core.outbox table + LISTEN/NOTIFY relay per ADR-044), not a code dependency. It must be wired at the framework layer, not imported.
  • SELECT ... FOR UPDATE SKIP LOCKED guarantees two listeners never claim the same outbox row, but the design admits multiple outbox rows carrying the same idempotency_key. SKIP LOCKED says nothing about idempotency-key collisions across distinct rows. The race-free gate is the primary key, not the pre-check SELECT.

Solution

Concrete listener (spectral.platform.infrastructure.events.outbox_listener.OutboxListener, consumer context’s infrastructure layer):

  • Holds a dedicated connection; LISTEN outbox_gen_<N> — the channel must match what the producer’s pg_notify trigger fires on (the producer stamps channel from SPECTRAL_GENERATION and writes it to the outbox row; the trigger fires pg_notify(new.channel, ...)).
  • Claims one row at a time: SELECT ... FOR UPDATE SKIP LOCKED filtered by status='pending' AND generation=%s AND deleted_at IS NULL, ORDER BY created_at.
  • One transaction per row: claim → run handler → record dedup → mark delivered, all atomic. A handler exception rolls the row back to pending (at-least-once, never lost, never stuck delivered).
  • Startup LISTEN + commit happen before the catch-up drain_pending(), so rows committed between LISTEN-registration and first drain still fire a NOTIFY the loop catches; a poll-interval fallback covers a missed NOTIFY.
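
The one-transaction-per-row shape can be sketched with SQLite as a stand-in. This is not the OutboxListener code: SQLite has no LISTEN/NOTIFY or FOR UPDATE SKIP LOCKED, the table layouts are simplified and hypothetical, ordering by id stands in for created_at, and make_db and process_one are illustrative names. What it shows is only that the handler's writes, the dedup record, and the delivered mark commit or roll back together.

```python
import sqlite3
from typing import Callable

Handler = Callable[[sqlite3.Connection, str], None]


def make_db() -> sqlite3.Connection:
    """Hypothetical, simplified stand-ins for core.outbox and core.event_handled."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE outbox (
            id INTEGER PRIMARY KEY,
            status TEXT NOT NULL DEFAULT 'pending',
            idempotency_key TEXT NOT NULL,
            payload TEXT NOT NULL
        );
        CREATE TABLE event_handled (
            handler_name TEXT NOT NULL,
            idempotency_key TEXT NOT NULL,
            PRIMARY KEY (handler_name, idempotency_key)
        );
        CREATE TABLE projection (payload TEXT NOT NULL);
        """
    )
    return conn


def process_one(conn: sqlite3.Connection, handler_name: str, handler: Handler) -> bool:
    """Process at most one pending row inside a single transaction.

    Returns False when nothing is pending. A handler exception propagates
    after the rollback, so the row stays pending for a later retry.
    """
    with conn:  # commits on success, rolls back if anything below raises
        row = conn.execute(
            "SELECT id, idempotency_key, payload FROM outbox "
            "WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return False
        row_id, key, payload = row
        handler(conn, payload)  # handler writes share this transaction
        conn.execute(
            "INSERT INTO event_handled (handler_name, idempotency_key) VALUES (?, ?) "
            "ON CONFLICT (handler_name, idempotency_key) DO NOTHING",
            (handler_name, key),
        )
        conn.execute("UPDATE outbox SET status = 'delivered' WHERE id = ?", (row_id,))
        return True
```

A handler that writes and then raises leaves no trace: its projection write is rolled back and the row is still pending on the next call.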

Race-free dedup (the key pattern):

# Handle, THEN claim the idempotency key as the authoritative gate:
await handler(envelope)
# Await execute() first to get the cursor, then await fetchone() on it.
cur = await conn.execute(
    "INSERT INTO core.event_handled (handler_name, idempotency_key, ...) "
    "VALUES (%s, %s, ...) ON CONFLICT (handler_name, idempotency_key) DO NOTHING "
    "RETURNING 1",
    (handler_name, idempotency_key, ...),
)
row = await cur.fetchone()
# row is None => another worker already owns this key: concede, mark delivered, no raise.
# row is not None => this txn won the key.

The PK (handler_name, idempotency_key) is the race-free dedup point; the pre-check SELECT is only a fast-path optimization. The handler must additionally be idempotent (e.g. an ON CONFLICT DO UPDATE upsert) since under concurrency it can run twice before either commit lands.
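
A minimal sketch of what an idempotent handler write might look like, using SQLite as a stand-in (the ON CONFLICT ... DO UPDATE form below is also valid PostgreSQL). The card_projection table, its columns, and project_card are hypothetical; the real projection schema is not shown in this document.

```python
import sqlite3

# Upsert keyed on the projected entity: a redelivery overwrites the row
# with the same content instead of inserting a duplicate or failing.
UPSERT = (
    "INSERT INTO card_projection (card_id, snapshot) VALUES (?, ?) "
    "ON CONFLICT (card_id) DO UPDATE SET snapshot = excluded.snapshot"
)


def project_card(conn: sqlite3.Connection, card_id: str, snapshot: str) -> None:
    """Apply one event to the projection; safe to call more than once."""
    conn.execute(UPSERT, (card_id, snapshot))


def demo() -> list:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE card_projection (card_id TEXT PRIMARY KEY, snapshot TEXT NOT NULL)"
    )
    project_card(conn, "card-1", '{"name": "Aldoria"}')
    project_card(conn, "card-1", '{"name": "Aldoria"}')  # simulated redelivery
    return conn.execute("SELECT card_id, snapshot FROM card_projection").fetchall()
```

Because the second application converges on the same row, it does not matter which of two concurrent listeners commits first.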

Framework-layer wiring (apps/workers/src/spectral_workers/event_consumers.py): build_world_model_card_projection_consumer composes the consumer handler + its repository over the listener connection via the SPEC-421 Container DI seam. The handler stays single-context; the cross-context flow is the substrate, not an import. This builder is the precedent Stream H consumers follow.

Proving real dispatch in the IT: LISTEN on a raw connection before publishing, then block on connection.notifies(timeout=10s, stop_after=1) and fail the test if the timeout elapses — a channel mismatch surfaces as a timeout, not a silent poll-driven pass. Then assert the projected snapshot is byte-equal to the producer payload AND independently assert content over seeded state (don’t seed empty — empty snapshots pass even if the producer silently returns empties).
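
The blocking wait and the two assertions might be wrapped in small test helpers like the ones below. This is a sketch under assumptions: conn is taken to be a synchronous psycopg connection that has already executed and committed LISTEN on the channel, in a psycopg version whose notifies() accepts the timeout and stop_after arguments named above. await_one_notify and assert_projection are hypothetical helper names.

```python
def await_one_notify(conn, channel: str, timeout: float = 10.0):
    """Block until one notification arrives, or fail the test on timeout.

    Call this only after LISTEN is committed and the producer has published.
    """
    received = list(conn.notifies(timeout=timeout, stop_after=1))
    if not received:
        raise AssertionError(
            f"no NOTIFY on {channel!r} within {timeout}s (channel mismatch?)"
        )
    notify = received[0]
    if notify.channel != channel:
        raise AssertionError(f"expected channel {channel!r}, got {notify.channel!r}")
    return notify


def assert_projection(projected: bytes, produced: bytes, seeded_fragment: bytes) -> None:
    """Check byte-equality and, independently, that seeded content is present."""
    assert projected == produced, "projected snapshot differs from producer payload"
    assert seeded_fragment, "seed the fixture with non-empty content"
    assert seeded_fragment in projected, "seeded content missing from snapshot"
```

The second assertion in assert_projection is what catches a producer that silently emits empty snapshots, since two empty payloads are trivially byte-equal.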

Prevention

Best Practices

  • A migration that first writes a new content_class value to core.outbox/core.event_handled must extend the CHECK constraint via DROP+ADD and grant the writer role minimal privileges (SELECT and INSERT only; defer DELETE to the retention-sweep migration, and never grant DELETE on the dedup log to the busy listener role).
  • Migration-SQL GRANT lints must handle multi-table object lists (GRANT ... ON a, b TO r) AND schema-wide forms (GRANT ... ON ALL TABLES IN SCHEMA worlds) or a cross-context write grant slips through silently.
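
A GRANT lint that covers both forms could look roughly like this. It is a regex-based sketch, not the repo's actual lint: it ignores SQL comments, column-level privilege lists, and unqualified table names, and the function names and the WRITE_PRIVS set are assumptions made for illustration.

```python
import re

# One GRANT ... ON ... TO ...; statement. [^;] keeps a match inside one statement.
_GRANT_RE = re.compile(
    r"GRANT\s+(?P<privs>[^;]+?)\s+ON\s+(?P<objects>[^;]+?)\s+TO\s+(?P<roles>[^;]+);",
    re.IGNORECASE,
)
_SCHEMA_WIDE_RE = re.compile(
    r"ALL\s+TABLES\s+IN\s+SCHEMA\s+(?P<schemas>.+)", re.IGNORECASE | re.DOTALL
)
WRITE_PRIVS = {"INSERT", "UPDATE", "DELETE", "TRUNCATE", "ALL"}


def granted_write_schemas(sql: str) -> set:
    """Return every schema that receives a write privilege in `sql`."""
    schemas = set()
    for match in _GRANT_RE.finditer(sql):
        privs = {
            p.split()[0].upper() for p in match["privs"].split(",") if p.strip()
        }
        if not privs & WRITE_PRIVS:
            continue  # read-only grant
        objects = match["objects"].strip()
        wide = _SCHEMA_WIDE_RE.fullmatch(objects)
        if wide:  # GRANT ... ON ALL TABLES IN SCHEMA a, b
            schemas.update(s.strip().strip('"').lower() for s in wide["schemas"].split(","))
            continue
        for obj in objects.split(","):  # GRANT ... ON a.t1, b.t2
            name = obj.strip()
            if name.upper().startswith("TABLE "):
                name = name[len("TABLE "):].strip()
            if "." in name:
                schemas.add(name.split(".", 1)[0].strip('"').lower())
    return schemas


def cross_context_write_grants(sql: str, allowed_schemas: set) -> set:
    """Schemas written to by the migration that the context does not own."""
    return granted_write_schemas(sql) - {s.lower() for s in allowed_schemas}
```

A lint built this way would fail the migration whenever cross_context_write_grants returns a non-empty set for the context that owns the migration.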

Warning Signs

  • An inter-context IT that never blocks on a real notifies() with a failing timeout.
  • A dedup path whose only protection is a SELECT before an INSERT.
  • RESET ROLE inside an RLS-active listener session. It falls back to the session user and bypasses RLS; instead, capture current_user up front and restore it with SET ROLE, quoting the name via psycopg.sql.Identifier.

References

  • SPEC-466 merge bebecf3; review fixes d9b0a84
  • ADR-044 (event substrate), ADR-064 (notification reads event replica), ADR-065 D4 (import boundary)
  • src/spectral/core/events/protocols.py (EventPublisher/EventListener Protocols)