Outbox Pattern
Guaranteeing at-least-once event delivery with the transactional outbox pattern.
In a CQRS/Event Sourcing system, events are persisted inside a database transaction and then published to the event bus. If the node crashes after the transaction commits but before events are published, those events are lost — projections, sagas, and external subscribers never see them.
The transactional outbox pattern solves this by writing events to an outbox table within the same transaction as aggregate persistence. A background relay polls for unpublished entries and dispatches them, guaranteeing at-least-once delivery.
Quick Example
import {
  configureDomain,
  InMemoryOutboxStore,
} from "@noddde/engine";
const domain = await configureDomain<MyInfrastructure>({
  writeModel: { aggregates: { Order } },
  readModel: { projections: { OrderSummary } },
  infrastructure: {
    outbox: {
      store: () => new InMemoryOutboxStore(),
      relayOptions: { pollIntervalMs: 1000, batchSize: 100 },
    },
    // ...other providers
  },
});
// Start the background relay (polls for unpublished entries)
domain.startOutboxRelay();
// Commands now write to the outbox atomically with persistence
await domain.dispatchCommand(createOrderCommand);
// Stop the relay when shutting down
domain.stopOutboxRelay();
How It Works
When the outbox is configured, the domain modifies the command lifecycle:
Command arrives
    |
    v
Load state --> Execute handler --> Apply events --> Enrich metadata
    |
    v
UoW enlists:
    1. Aggregate persistence (event store or state store)
    2. Outbox entries (one per event)  <-- same transaction
    |
    v
uow.commit() --> both written atomically
    |
    v
Happy path: eventBus.dispatch(events)  <-- immediate, low latency
    |
    v
Best-effort: outboxStore.markPublishedByEventIds() <-- cleanup
On crash between commit and publish: the outbox relay picks up unpublished entries on its next poll and dispatches them. Events are delivered at least once.
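The flow above can be illustrated with a small in-memory simulation. This is a sketch under stated assumptions, not noddde's implementation: `SketchDb`, `handleCommand`, and `relayOnce` are hypothetical names, the "transaction" is two array writes performed in one call, and the crash is simulated with a flag.

```typescript
// Hypothetical, self-contained simulation of the outbox flow (not noddde's code).
type SketchEvent = { eventId: string; name: string };
type SketchEntry = { event: SketchEvent; publishedAt: string | null };

class SketchDb {
  events: SketchEvent[] = [];
  outbox: SketchEntry[] = [];

  // Stands in for uow.commit(): in a real database both writes share one transaction.
  commit(events: SketchEvent[]): void {
    this.events.push(...events);
    this.outbox.push(...events.map((event) => ({ event, publishedAt: null })));
  }
}

const published: string[] = []; // stands in for the event bus

function handleCommand(db: SketchDb, events: SketchEvent[], crashAfterCommit: boolean): void {
  db.commit(events);
  if (crashAfterCommit) return; // simulated crash: nothing is published or marked
  for (const e of events) published.push(e.eventId); // happy-path dispatch
  const ids = new Set(events.map((e) => e.eventId));
  for (const entry of db.outbox) {
    if (ids.has(entry.event.eventId)) entry.publishedAt = new Date().toISOString();
  }
}

// One relay poll: dispatch pending entries, then mark them as published.
function relayOnce(db: SketchDb, batchSize = 100): number {
  const pending = db.outbox.filter((e) => e.publishedAt === null).slice(0, batchSize);
  for (const entry of pending) {
    published.push(entry.event.eventId);
    entry.publishedAt = new Date().toISOString();
  }
  return pending.length;
}

const db = new SketchDb();
handleCommand(db, [{ eventId: "e1", name: "OrderCreated" }], false);
handleCommand(db, [{ eventId: "e2", name: "OrderPaid" }], true); // crash after commit
const recovered = relayOnce(db); // finds only the entry for e2
```

Because the outbox entry for `e2` was committed together with the event, the simulated crash loses nothing: the next poll finds the entry still pending and dispatches it.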
Configuration
Add the outbox field to your domain configuration's infrastructure:
infrastructure: {
  outbox: {
    // Factory for the outbox store (called once during init)
    store: () => new InMemoryOutboxStore(),
    // Optional relay options
    relayOptions: {
      pollIntervalMs: 1000, // Default: 1000ms
      batchSize: 100, // Default: 100 entries per poll
    },
  },
}
For production, all three ORM adapter packages (@noddde/drizzle, @noddde/prisma, @noddde/typeorm) include a database-backed OutboxStore that participates in the same database transaction as your aggregate persistence:
// Drizzle (outbox schema table required)
import { outbox } from "@noddde/drizzle/pg";
const infra = createDrizzlePersistence(db, { events, aggregateStates, sagaStates, outbox });
// infra.outboxStore is now available
// Prisma / TypeORM (always included)
const infra = createPrismaPersistence(prisma);
// or: createTypeORMPersistence(dataSource)
// infra.outboxStore is always available
Relay Lifecycle
The outbox relay is not started automatically during domain.init(). You control its lifecycle explicitly:
// Start polling
domain.startOutboxRelay();
// Stop polling (e.g., on SIGTERM)
domain.stopOutboxRelay();
// Manual single-batch processing (useful in tests)
const dispatched = await domain.processOutboxOnce();
Both startOutboxRelay() and stopOutboxRelay() are idempotent, so calling them multiple times is safe.
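To make the idempotency guarantee concrete, here is a sketch of how a polling loop with idempotent start and stop might be structured. `PollingRelay` and its `processOnce` callback are hypothetical; this page does not show noddde's relay internals, so treat the structure as an assumption.

```typescript
// Hypothetical polling loop with idempotent start/stop (not noddde's relay).
class PollingRelay {
  private timer: ReturnType<typeof setInterval> | null = null;
  private busy = false;
  private readonly processOnce: () => Promise<number>;
  private readonly pollIntervalMs: number;

  constructor(processOnce: () => Promise<number>, pollIntervalMs = 1000) {
    this.processOnce = processOnce;
    this.pollIntervalMs = pollIntervalMs;
  }

  get running(): boolean {
    return this.timer !== null;
  }

  start(): void {
    if (this.timer !== null) return; // already started: do not create a second timer
    this.timer = setInterval(() => void this.tick(), this.pollIntervalMs);
  }

  stop(): void {
    if (this.timer === null) return; // already stopped: nothing to do
    clearInterval(this.timer);
    this.timer = null;
  }

  // Skip a tick if the previous batch is still in flight.
  private async tick(): Promise<void> {
    if (this.busy) return;
    this.busy = true;
    try {
      await this.processOnce();
    } catch {
      // Entries stay unpublished and are retried on the next poll.
    } finally {
      this.busy = false;
    }
  }
}

const relay = new PollingRelay(async () => 0, 1000);
relay.start();
relay.start(); // no-op
const runningAfterStart = relay.running;
relay.stop();
relay.stop(); // no-op
const runningAfterStop = relay.running;
```

Guarding on the timer handle is what makes repeated calls harmless: a second `start()` cannot create a second poller, and a second `stop()` has nothing left to clear.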
The OutboxStore Interface
interface OutboxEntry {
  id: string; // UUID v7 (time-ordered)
  event: Event; // Fully enriched domain event
  aggregateName?: string; // From event metadata
  aggregateId?: string; // From event metadata
  createdAt: string; // ISO 8601
  publishedAt: string | null; // null = pending
}
interface OutboxStore {
  save(entries: OutboxEntry[]): Promise<void>;
  loadUnpublished(batchSize?: number): Promise<OutboxEntry[]>;
  markPublished(ids: string[]): Promise<void>;
  markPublishedByEventIds(eventIds: string[]): Promise<void>;
  deletePublished(olderThan?: Date): Promise<void>;
}
- save is called within the UoW transaction (enlisted alongside aggregate persistence)
- loadUnpublished is called by the relay to poll for pending entries
- markPublished is used by the relay after dispatching each entry
- markPublishedByEventIds is the happy-path cleanup (correlates via event.metadata.eventId)
- deletePublished is for periodic cleanup to prevent unbounded growth
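As an illustration of the contract, the following is a minimal in-memory sketch of a store with the same five methods. It is not the library's InMemoryOutboxStore: `SketchOutboxStore`, `DomainEvent`, and `Entry` are hypothetical local types, and the `deletePublished` semantics (no cutoff removes every published entry; a cutoff removes entries published before it) are an assumption.

```typescript
// Minimal in-memory sketch of the interface above (not the library's InMemoryOutboxStore).
type DomainEvent = { name: string; metadata: { eventId: string } };

interface Entry {
  id: string;
  event: DomainEvent;
  createdAt: string;
  publishedAt: string | null;
}

class SketchOutboxStore {
  private entries: Entry[] = [];

  async save(entries: Entry[]): Promise<void> {
    this.entries.push(...entries);
  }

  // Insertion order is preserved, so older entries come first.
  async loadUnpublished(batchSize = 100): Promise<Entry[]> {
    return this.entries.filter((e) => e.publishedAt === null).slice(0, batchSize);
  }

  async markPublished(ids: string[]): Promise<void> {
    this.mark((e) => ids.includes(e.id));
  }

  async markPublishedByEventIds(eventIds: string[]): Promise<void> {
    this.mark((e) => eventIds.includes(e.event.metadata.eventId));
  }

  // Assumed semantics: without a cutoff, every published entry is removed.
  async deletePublished(olderThan?: Date): Promise<void> {
    this.entries = this.entries.filter((e) => {
      if (e.publishedAt === null) return true; // never delete pending entries
      return olderThan !== undefined && new Date(e.publishedAt) >= olderThan;
    });
  }

  private mark(match: (e: Entry) => boolean): void {
    const now = new Date().toISOString();
    for (const e of this.entries) {
      if (e.publishedAt === null && match(e)) e.publishedAt = now;
    }
  }
}

async function demo(): Promise<string[]> {
  const store = new SketchOutboxStore();
  const now = new Date().toISOString();
  await store.save([
    { id: "a", event: { name: "OrderCreated", metadata: { eventId: "e1" } }, createdAt: now, publishedAt: null },
    { id: "b", event: { name: "OrderPaid", metadata: { eventId: "e2" } }, createdAt: now, publishedAt: null },
  ]);
  await store.markPublishedByEventIds(["e1"]); // happy-path cleanup for e1
  return (await store.loadUnpublished()).map((e) => e.id); // ids still pending
}
```

A database-backed store would replace the array with a table and run `save` on the transaction handle supplied by the unit of work; the in-memory version only shows how the two marking paths converge on the same `publishedAt` field.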
Testing with the Outbox
Use processOutboxOnce() instead of starting the relay in tests:
import { InMemoryOutboxStore } from "@noddde/engine";
const outboxStore = new InMemoryOutboxStore();
const domain = await configureDomain<MyInfrastructure>({
  // ...
  infrastructure: {
    outbox: { store: () => outboxStore },
  },
});
await domain.dispatchCommand(someCommand);
// Verify entries were written
const entries = outboxStore.findAll();
expect(entries).toHaveLength(1);
expect(entries[0].event.name).toBe("OrderCreated");
// Manually process the outbox
const dispatched = await domain.processOutboxOnce();
expect(dispatched).toBe(1);
At-Least-Once Delivery
The outbox guarantees events are delivered at least once, not exactly once. In the happy path, events are dispatched immediately after commit and marked as published. If the node crashes after the commit but before the entries are marked as published:
- The relay dispatches unpublished entries on the next poll
- Events that were already dispatched (but not marked) may be dispatched again
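Consumers must therefore be idempotent. Projections in noddde are pure reducers, which makes them idempotent as long as re-applying an event yields the same view (setting a field rather than incrementing a counter). Sagas should check event.metadata.eventId to detect duplicates.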
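One way a consumer could detect duplicates is to remember the event IDs it has already handled. The wrapper below is a sketch, not a noddde API: `makeIdempotent` and `DeliveredEvent` are hypothetical, and the in-memory `Set` stands in for storage that a real consumer would persist alongside its own state.

```typescript
// Hypothetical duplicate guard keyed on event.metadata.eventId.
type DeliveredEvent = { name: string; metadata: { eventId: string } };

function makeIdempotent(
  handler: (e: DeliveredEvent) => void,
  seen: Set<string> = new Set(),
): (e: DeliveredEvent) => boolean {
  return (e) => {
    if (seen.has(e.metadata.eventId)) return false; // duplicate delivery: skip
    handler(e);
    seen.add(e.metadata.eventId); // record only after the handler succeeds
    return true;
  };
}

let emailsSent = 0;
const onOrderCreated = makeIdempotent(() => {
  emailsSent += 1;
});
const evt: DeliveredEvent = { name: "OrderCreated", metadata: { eventId: "e1" } };
const first = onOrderCreated(evt); // handled
const second = onOrderCreated(evt); // redelivery ignored
```

An in-memory set is lost on restart, which is exactly when redelivery happens, so a durable consumer would store the handled IDs in the same transaction as its side effects.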
Integration with Unit of Work
The outbox integrates seamlessly with all UoW scenarios:
- Implicit UoW (single dispatchCommand): outbox entries are written atomically with aggregate persistence
- Explicit UoW (withUnitOfWork): all commands' outbox entries are written together in one transaction
- Saga UoW: saga state + dispatched commands + outbox entries all share one transaction
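The shared-transaction behaviour in these scenarios can be sketched with a toy unit of work that buffers operations and applies them all or not at all. `SketchUnitOfWork` and `Tables` are hypothetical, and copying arrays into a staging area is only a stand-in for a real database transaction.

```typescript
// Hypothetical unit of work: operations are buffered and applied together on commit.
type Tables = { aggregates: string[]; outbox: string[] };

class SketchUnitOfWork {
  private ops: Array<(t: Tables) => void> = [];
  private readonly target: Tables;

  constructor(target: Tables) {
    this.target = target;
  }

  enlist(op: (t: Tables) => void): void {
    this.ops.push(op);
  }

  // Apply every operation to a staging copy; publish the copy only if none throws.
  commit(): void {
    const staged: Tables = {
      aggregates: [...this.target.aggregates],
      outbox: [...this.target.outbox],
    };
    for (const op of this.ops) op(staged);
    this.target.aggregates = staged.aggregates;
    this.target.outbox = staged.outbox;
    this.ops = [];
  }
}

const tables: Tables = { aggregates: [], outbox: [] };

// Two commands share one unit of work, as in the explicit UoW scenario.
const uow = new SketchUnitOfWork(tables);
uow.enlist((t) => t.aggregates.push("order-1"));
uow.enlist((t) => t.outbox.push("OrderCreated"));
uow.enlist((t) => t.aggregates.push("order-2"));
uow.enlist((t) => t.outbox.push("OrderCreated"));
uow.commit();

// A failing outbox write leaves the aggregate write uncommitted as well.
const failing = new SketchUnitOfWork(tables);
failing.enlist((t) => t.aggregates.push("order-3"));
failing.enlist(() => {
  throw new Error("outbox write failed");
});
let rolledBack = false;
try {
  failing.commit();
} catch {
  rolledBack = true;
}
```

The second unit of work shows why the outbox entry has to be enlisted rather than written separately: when either write fails, neither becomes visible, so there is never an aggregate change without a matching outbox entry.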
Next Steps
- Persistence -- Unit of Work and persistence strategies
- Idempotent Commands -- Preventing duplicate command execution
- ORM Adapters -- Production persistence with database transactions