
Event Substrate

Spectral’s event substrate — used both within a single context and between worlds and platform — is Postgres LISTEN/NOTIFY plus a transactional outbox, with deployment-generation stamping for cross-version worker coexistence. Decision lineage in ADR-044; DLQ + retry semantics in ADR-054; operational runbook at docs/runbooks/event-substrate.md.


  • Producer writes a row to core.outbox inside its business transaction.
  • An AFTER INSERT trigger (core.outbox_notify()) calls pg_notify(NEW.channel, NEW.id::text); Postgres delivers the notification only when the transaction commits.
  • A dedicated listener holds a direct-to-Postgres connection (port 5432; not via Supavisor — LISTEN is incompatible with transaction-mode pooling) and pulls rows via SELECT ... FOR UPDATE SKIP LOCKED.
  • Polling fallback at 5 s cadence activates when the LISTEN connection drops past the reconnect-backoff cap (30 s). Polling is a degradation mode of LISTEN, not a second substrate. NOTIFY is a latency optimization; SKIP LOCKED is the correctness primitive.

NOTIFY payload is the outbox row ID only (~36 bytes) — well within Postgres’ 8 KB cap. Full payload lives in the row, pulled by the handler.
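The claim flow above can be sketched as follows. This is illustrative, not the shipped relay: `CLAIM_SQL`, `relay_loop`, and the constants are invented names, and the connection object is assumed to expose an asyncpg-style `add_listener`/`fetchrow` API.

```python
import asyncio

# SKIP LOCKED is the correctness primitive: concurrent workers never
# claim the same row, whether woken by NOTIFY or by the 5 s poll.
CLAIM_SQL = """
UPDATE core.outbox
   SET status = 'in_flight', attempts = attempts + 1
 WHERE id = (
     SELECT id FROM core.outbox
      WHERE status = 'pending'
      ORDER BY occurred_at
      FOR UPDATE SKIP LOCKED
      LIMIT 1
 )
RETURNING id, event_type, payload;
"""

POLL_INTERVAL_S = 5.0           # degradation-mode cadence
RECONNECT_BACKOFF_CAP_S = 30.0  # past this, polling takes over

async def relay_loop(conn, channel: str, handle) -> None:
    """LISTEN on the generation channel; each NOTIFY (or poll tick)
    triggers one claim attempt via CLAIM_SQL."""
    wakeup = asyncio.Event()
    await conn.add_listener(channel, lambda *_: wakeup.set())
    while True:
        try:
            await asyncio.wait_for(wakeup.wait(), timeout=POLL_INTERVAL_S)
        except asyncio.TimeoutError:
            pass  # polling fallback: claim anyway
        wakeup.clear()
        row = await conn.fetchrow(CLAIM_SQL)
        if row is not None:
            await handle(row)
```

Because the wakeup path and the poll path converge on the same claim query, NOTIFY stays a pure latency optimization: losing it degrades latency to the poll cadence, never correctness.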


Single table in core (collapses earlier per-context drafts). Columns:

  • id uuid pk
  • event_type text
  • event_version int
  • occurred_at timestamptz
  • source text (producing context)
  • target text null (target context; null = broadcast)
  • content_class text
  • channel text default 'outbox_default'
  • generation bigint (publisher stamps from SPECTRAL_GENERATION per service env var)
  • workspace_id uuid null
  • payload jsonb
  • idempotency_key text
  • trace_context text null
  • status text check (status in ('pending','in_flight','delivered','failed'))
  • attempts int default 0
  • last_error text null
  • failure_history jsonb default '[]'
  • first_failed_at timestamptz null
  • deleted_at timestamptz null

DLQ = status='failed' filter on core.outbox. No separate DLQ table.
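Since the DLQ is just a filter, inspection is a plain query against core.outbox; a minimal sketch (this query is illustrative, not a shipped view):

```python
# DLQ inspection: the DLQ is the failed slice of core.outbox,
# no separate table exists.
DLQ_SQL = """
SELECT id, event_type, attempts, last_error, first_failed_at
  FROM core.outbox
 WHERE status = 'failed'
   AND deleted_at IS NULL
 ORDER BY first_failed_at;
"""
```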


core.outbox_notify() reads NEW.channel and calls pg_notify() on it. The trigger function stays frozen forever; publishers compose channel names.

  • outbox_default — column default; used as a safety placeholder
  • outbox_gen_<N> — per-deployment-generation channel; workers LISTEN only on their own generation’s channel

A worker at gen-(N+1) is structurally incapable of receiving gen-N NOTIFY. Future processing-class splits (hot/cold, LLM-bound/CPU-bound) compose as outbox_gen_<N>_<class> — the trigger still reads NEW.channel verbatim.


spectral.core.events.envelope.EventEnvelope (frozen pydantic):

  • event_id: UUID
  • event_type: str
  • event_version: int
  • occurred_at: datetime
  • source: Literal["platform", "worlds", "core"]
  • target: Literal["platform", "worlds", "core"] | None
  • workspace_id: UUID | None
  • payload: dict[str, Any]
  • idempotency_key: str
  • trace_context: str | None

payload is dict[str, Any] at the substrate layer; producer-owned typed payload models live in <producer>.contracts.events.* per ADR-065 D2 (e.g. spectral.platform.contracts.events.failure_cluster_detected). Auto-generated docs for each event live under Events. Consumers parse the dict via their own local <EventName>Event model per ADR-065 D4 (consumer ACL pattern); they do not import the producer’s typed class outside tests/contracts/.
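The consumer ACL pattern can be sketched with a plain frozen dataclass (the real models may differ; the event fields here are invented for illustration, and the class deliberately lives in the consumer, not in the producer's contracts package):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class FailureClusterDetectedEvent:
    """Hypothetical consumer-local model for
    platform.failure_cluster.detected; field names are illustrative."""
    cluster_id: str
    failure_count: int

    @classmethod
    def from_payload(cls, payload: dict[str, Any]) -> "FailureClusterDetectedEvent":
        # Pick only the fields this consumer needs; unknown payload
        # fields are ignored, so additive producer changes are safe.
        return cls(
            cluster_id=str(payload["cluster_id"]),
            failure_count=int(payload["failure_count"]),
        )

evt = FailureClusterDetectedEvent.from_payload(
    {"cluster_id": "c-1", "failure_count": 3, "added_in_v2": "ignored"}
)
```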


spectral.core.events.protocols:

  • EventPublisher.publish(envelope) -> None
  • EventListener.listen(*, channel, handler_name, handler) -> None
  • EventHandler = Callable[[EventEnvelope], Awaitable[None]]

Per-context implementations live in spectral.<context>.infrastructure.events.
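The protocol surface above can be sketched structurally (a stand-in `EventEnvelope` alias replaces the real frozen pydantic model to keep this self-contained; `NullPublisher` is an invented example implementation):

```python
from typing import Any, Awaitable, Callable, Protocol, runtime_checkable

EventEnvelope = dict[str, Any]  # stand-in for the real frozen pydantic model

EventHandler = Callable[[EventEnvelope], Awaitable[None]]

@runtime_checkable
class EventPublisher(Protocol):
    async def publish(self, envelope: EventEnvelope) -> None: ...

@runtime_checkable
class EventListener(Protocol):
    async def listen(self, *, channel: str, handler_name: str,
                     handler: EventHandler) -> None: ...

class NullPublisher:
    """Trivial structural implementation, e.g. for tests: any class
    with a matching publish() satisfies EventPublisher."""
    async def publish(self, envelope: EventEnvelope) -> None:
        pass
```

Structural typing is the point: per-context implementations in `spectral.<context>.infrastructure.events` satisfy these Protocols without importing anything beyond `spectral.core.events`.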


idempotency_key defaults to event_id. Consumers persist handled identities in the single core.event_handled table with UNIQUE(handler_name, idempotency_key). handler_name must be scope-qualified (e.g., worlds.scan_completed_indexer) to guarantee global uniqueness.

The event_handled retention policy is 60d / 7d / HARD_DELETE — strictly exceeds the outbox active+grace floor (52 days) to guarantee dedup correctness. A contract test pins the inequality.
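The dedup contract reduces to the uniqueness of (handler_name, idempotency_key); an in-memory sketch of the semantics (illustrative only, with a set standing in for the core.event_handled table):

```python
# The set models UNIQUE(handler_name, idempotency_key): at-least-once
# delivery becomes effectively exactly-once per handler.
_handled: set[tuple[str, str]] = set()

def handle_once(handler_name: str, idempotency_key: str, handler) -> bool:
    """Run handler only if (handler_name, idempotency_key) is unseen.
    Returns True if the handler ran, False if the delivery was deduped."""
    key = (handler_name, idempotency_key)
    if key in _handled:
        return False
    handler()
    _handled.add(key)  # in SQL: INSERT ... ON CONFLICT DO NOTHING
    return True

ran = []
handle_once("worlds.scan_completed_indexer", "evt-1", lambda: ran.append(1))
handle_once("worlds.scan_completed_indexer", "evt-1", lambda: ran.append(1))
# second call dedups; only one append happened
```

Scope-qualified handler names matter here: two handlers in different contexts processing the same event must dedup independently, which the composite key gives for free.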


event_version integer on the envelope; bumped when payload shape changes. Producer-owned typed payloads under <producer>.contracts.events.* are the single source of truth for the wire shape (per ADR-065 D2); each module exports Final-typed <EVENT>_TYPE and <EVENT>_VERSION constants alongside the payload class. Consumers ignore unknown fields (forward-compatible). Producers never remove fields within a major version (additive-only). Breaking changes ship as a new event_type (e.g., platform.failure_cluster.detected.v2) with a deprecation deadline; bilateral contract tests in tests/contracts/ catch drift via model_json_schema() snapshots per ADR-066.
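A producer-owned payload module in the shape described would export roughly this (the event name, constant values, and field are invented for illustration):

```python
# Hypothetical module: spectral.platform.contracts.events.failure_cluster_detected
from typing import Final

# Final-typed constants exported alongside the payload class; a breaking
# change would ship as a new _TYPE ending in ".v2" with VERSION reset,
# never as a field removal under the existing type.
FAILURE_CLUSTER_DETECTED_TYPE: Final[str] = "platform.failure_cluster.detected"
FAILURE_CLUSTER_DETECTED_VERSION: Final[int] = 1
```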


The substrate is the notification flow mechanism per Contract Surfaces — producer-owned typed payloads live in <producer>.contracts.events.*; consumers parse via their own local <EventName>Event model (consumer ACL pattern). Call flow uses callee-owned OHS Protocols in <callee>.contracts.protocols.* with bridge tools composed in apps/*, never via SQL grants between contexts (per ADR-063). Full doctrine — payload pattern, ACL, OHS Protocol, bridge tool — in ADR-065.

Catalogs of existing events and Protocols live at Events and Protocols. The catalog pages are narrative and updated by hand alongside the contract module they describe.


Default retry policy in spectral.core.events.retry.DEFAULT_RETRY_POLICY: 5 retries, 1 s base, 2× multiplier, 5 min cap, full jitter. Per-handler override allowed.
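The stated policy can be sketched as a full-jitter delay function (the function and constant names are illustrative; only the parameters come from the doc):

```python
import random

MAX_RETRIES = 5
BASE_S = 1.0
MULTIPLIER = 2.0
CAP_S = 300.0  # 5 min

def retry_delay(attempt: int) -> float:
    """Full jitter: uniform over [0, min(cap, base * multiplier**attempt)].
    Ceilings per attempt 0..4 are 1, 2, 4, 8, 16 s, so the 5 min cap
    only matters for per-handler overrides with more retries."""
    ceiling = min(CAP_S, BASE_S * MULTIPLIER ** attempt)
    return random.uniform(0.0, ceiling)
```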

A default classifier separates transient (retry) from terminal (immediate → FAILED):

  • Terminal: IntegrityError, ValueError from payload validation, pydantic.ValidationError, explicit TerminalHandlerError
  • Transient: ConnectionError, TimeoutError, OperationalError, default for any other exception
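A minimal classifier in the shape of those lists (illustrative sketch: the real classifier also handles IntegrityError, OperationalError, pydantic.ValidationError, and TerminalHandlerError, all omitted here to stay dependency-free):

```python
def is_terminal(exc: Exception) -> bool:
    """Terminal errors skip retries and go straight to status='failed';
    everything else is treated as transient and retried."""
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return False  # transient: retry
    if isinstance(exc, ValueError):  # includes payload-validation errors
        return True   # terminal: immediate FAILED
    return False      # default for unknown exceptions: transient
```

Defaulting unknown exceptions to transient is the conservative choice: a retry of a genuinely terminal error burns attempts, but a terminal classification of a transient error silently loses work to the DLQ.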

Replay via core.outbox_replay(p_event_id, p_new_generation, p_replayed_by) snapshots the current cycle into failure_history, clears current-cycle fields, updates generation, and resets status='pending'. The idempotency_key is preserved — a replay against an already-processed event dedups via core.event_handled.
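The replay semantics can be modeled in pure Python over a row dict (field names match the outbox columns above; the function itself is a sketch of what the SQL function does, not its implementation):

```python
def outbox_replay(row: dict, new_generation: int, replayed_by: str) -> dict:
    """Mirror of core.outbox_replay semantics on an in-memory row."""
    row = dict(row)
    # Snapshot the failed cycle into failure_history before clearing it.
    row["failure_history"] = row["failure_history"] + [{
        "attempts": row["attempts"],
        "last_error": row["last_error"],
        "first_failed_at": row["first_failed_at"],
        "replayed_by": replayed_by,
    }]
    # Reset current-cycle fields and retarget the generation.
    # idempotency_key is deliberately untouched: a replay of an
    # already-handled event dedups via core.event_handled.
    row.update(status="pending", attempts=0, last_error=None,
               first_failed_at=None, generation=new_generation)
    return row
```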

Operator surface at alpha: Supabase Studio + SQL. Tool-surface (DLQ inspection + replay) lives on the Operations Agent per agent-tool-invocation D7.


Per-publish structlog entry with the seven canonical fields plus {event_id, event_type, source, target, workspace_id}. Per-handle structlog entry adds {attempts, duration_ms, status_result, handler_name}. OTel span linkage: producer emits a span; envelope carries W3C traceparent in trace_context; consumer continues the span. pg_notification_queue_usage() exported as a gauge; alert at 25% fill.


  • Step 1 (in-Postgres): outbox > 5M rows steady-state → partition by time-range on occurred_at.
  • Step 2 (substrate swap): outbox > 20M rows sustained, OR consumer ack p99 > 500 ms sustained for a week, OR first production incident attributable to LISTEN/NOTIFY queue-fill / lock contention / outbox bloat → swap relay to NATS JetStream or Redis Streams. The substrate envelope (in spectral.core.events) and producer-owned typed payloads (in <producer>.contracts.events.* per ADR-065 D2) are unchanged across the swap — only the relay implementation changes.