noddde

Outbox Pattern

Guaranteeing at-least-once event delivery with the transactional outbox pattern.

In a CQRS/Event Sourcing system, events are persisted inside a database transaction and then published to the event bus. If the node crashes after the transaction commits but before events are published, those events are lost — projections, sagas, and external subscribers never see them.

The transactional outbox pattern solves this by writing events to an outbox table within the same transaction as aggregate persistence. A background relay polls for unpublished entries and dispatches them, guaranteeing at-least-once delivery.

Quick Example

import {
  configureDomain,
  InMemoryOutboxStore,
} from "@noddde/engine";

const domain = await configureDomain<MyInfrastructure>({
  writeModel: { aggregates: { Order } },
  readModel: { projections: { OrderSummary } },
  infrastructure: {
    outbox: {
      store: () => new InMemoryOutboxStore(),
      relayOptions: { pollIntervalMs: 1000, batchSize: 100 },
    },
    // ...other providers
  },
});

// Start the background relay (polls for unpublished entries)
domain.startOutboxRelay();

// Commands now write to the outbox atomically with persistence
await domain.dispatchCommand(createOrderCommand);

// Stop the relay when shutting down
domain.stopOutboxRelay();

How It Works

When the outbox is configured, the domain modifies the command lifecycle:

Command arrives
  |
  v
Load state --> Execute handler --> Apply events --> Enrich metadata
  |
  v
UoW enlists:
  1. Aggregate persistence (event store or state store)
  2. Outbox entries (one per event)            <-- same transaction
  |
  v
uow.commit()  --> both written atomically
  |
  v
Happy path: eventBus.dispatch(events)          <-- immediate, low latency
  |
  v
Best-effort: outboxStore.markPublishedByEventIds()  <-- cleanup

On crash between commit and publish: the outbox relay picks up unpublished entries on its next poll and dispatches them. Events are delivered at least once.

Configuration

Add the outbox field to your domain configuration's infrastructure:

infrastructure: {
  outbox: {
    // Factory for the outbox store (called once during init)
    store: () => new InMemoryOutboxStore(),
    // Optional relay options
    relayOptions: {
      pollIntervalMs: 1000,  // Default: 1000ms
      batchSize: 100,        // Default: 100 entries per poll
    },
  },
}

For production, all three ORM adapter packages (@noddde/drizzle, @noddde/prisma, @noddde/typeorm) include a database-backed OutboxStore that participates in the same database transaction as your aggregate persistence:

// Drizzle (outbox schema table required)
import { outbox } from "@noddde/drizzle/pg";
const infra = createDrizzlePersistence(db, { events, aggregateStates, sagaStates, outbox });
// infra.outboxStore is now available

// Prisma / TypeORM (always included)
const infra = createPrismaPersistence(prisma);
// or: createTypeORMPersistence(dataSource)
// infra.outboxStore is always available

Relay Lifecycle

The outbox relay is not started automatically during domain.init(). You control its lifecycle explicitly:

// Start polling
domain.startOutboxRelay();

// Stop polling (e.g., on SIGTERM)
domain.stopOutboxRelay();

// Manual single-batch processing (useful in tests)
const dispatched = await domain.processOutboxOnce();

Both startOutboxRelay() and stopOutboxRelay() are idempotent — calling them multiple times is safe.

The OutboxStore Interface

interface OutboxEntry {
  id: string;                    // UUID v7 (time-ordered)
  event: Event;                  // Fully enriched domain event
  aggregateName?: string;        // From event metadata
  aggregateId?: string;          // From event metadata
  createdAt: string;             // ISO 8601
  publishedAt: string | null;    // null = pending
}

interface OutboxStore {
  save(entries: OutboxEntry[]): Promise<void>;
  loadUnpublished(batchSize?: number): Promise<OutboxEntry[]>;
  markPublished(ids: string[]): Promise<void>;
  markPublishedByEventIds(eventIds: string[]): Promise<void>;
  deletePublished(olderThan?: Date): Promise<void>;
}
  • save is called within the UoW transaction (enlisted alongside aggregate persistence)
  • loadUnpublished is called by the relay to poll for pending entries
  • markPublished is used by the relay after dispatching each entry
  • markPublishedByEventIds is the happy-path cleanup (correlates via event.metadata.eventId)
  • deletePublished is for periodic cleanup to prevent unbounded growth

Testing with the Outbox

Use processOutboxOnce() instead of starting the relay in tests:

import { InMemoryOutboxStore } from "@noddde/engine";

const outboxStore = new InMemoryOutboxStore();

const domain = await configureDomain<MyInfrastructure>({
  // ...
  infrastructure: {
    outbox: { store: () => outboxStore },
  },
});

await domain.dispatchCommand(someCommand);

// Verify entries were written
const entries = outboxStore.findAll();
expect(entries).toHaveLength(1);
expect(entries[0].event.name).toBe("OrderCreated");

// Manually process the outbox
const dispatched = await domain.processOutboxOnce();
expect(dispatched).toBe(1);

At-Least-Once Delivery

The outbox guarantees events are delivered at least once, not exactly once. In the happy path, events are dispatched immediately after commit and marked as published. But if the node crashes:

  1. The relay dispatches unpublished entries on the next poll
  2. Events that were already dispatched (but not marked) may be dispatched again

Consumers must be idempotent. Projections in noddde are naturally idempotent (pure reducers). Sagas should check event.metadata.eventId to detect duplicates.

Integration with Unit of Work

The outbox integrates seamlessly with all UoW scenarios:

  • Implicit UoW (single dispatchCommand): outbox entries are written atomically with aggregate persistence
  • Explicit UoW (withUnitOfWork): all commands' outbox entries are written together in one transaction
  • Saga UoW: saga state + dispatched commands + outbox entries all share one transaction

Next Steps

On this page