Persistence Adapters
Production-ready persistence using Drizzle, Prisma, TypeORM, or your own custom adapter with any supported database.
noddde provides three ORM adapter packages that implement all persistence interfaces and UnitOfWork using your ORM's native transaction mechanism. Pick the ORM you already use -- each adapter works with whatever database your ORM supports (PostgreSQL, MySQL, SQLite, etc.). If you don't use an ORM, you can build your own adapter for any database driver.
Available Adapters
| Package | ORM | Schema |
|---|---|---|
| `@noddde/drizzle` | Drizzle ORM | TypeScript table builders (per dialect) |
| `@noddde/prisma` | Prisma | `.prisma` schema file |
| `@noddde/typeorm` | TypeORM | TypeScript entity decorators |
Dialect Support Matrix
Persistence (event store, state store, saga store, snapshot store, outbox store) and concurrency control work with every dialect supported by your ORM. The only dialect restriction applies to pessimistic locking, which requires database-level advisory locks:
| Dialect | Persistence | No concurrency / Optimistic | Pessimistic locking |
|---|---|---|---|
| PostgreSQL | ✅ All ORMs | ✅ All ORMs | ✅ All ORMs |
| MySQL | ✅ All ORMs | ✅ All ORMs | ✅ All ORMs |
| MariaDB | ✅ All ORMs | ✅ All ORMs | ✅ All ORMs |
| SQLite | ✅ All ORMs | ✅ All ORMs | ❌ No advisory locks |
| MSSQL | ✅ TypeORM | ✅ TypeORM | ✅ TypeORM only |
For SQLite or any dialect without advisory lock support, use InMemoryAggregateLocker from @noddde/engine for single-process deployments, or choose the optimistic strategy instead.
Each package exports a class-based adapter that implements the PersistenceAdapter interface from @noddde/core. Pass it to wireDomain via the persistenceAdapter property and the engine resolves all persistence concerns automatically:
import { DrizzleAdapter } from "@noddde/drizzle";
import { wireDomain } from "@noddde/engine";
const adapter = new DrizzleAdapter(db);
const domain = await wireDomain(definition, {
persistenceAdapter: adapter,
aggregates: {
Room: { persistence: "event-sourced" },
Inventory: {}, // defaults to state-stored from adapter
},
});

The adapter provides all stores (event-sourced, state-stored, saga, snapshot, outbox, UoW, advisory locker). The engine infers what it needs based on the domain definition -- no manual mapping required.
All three adapters also support per-aggregate dedicated state tables via the stateStored() helper method.
The createXxxAdapter factory functions are still supported but deprecated. Prefer the class-based API (DrizzleAdapter, PrismaAdapter, TypeORMAdapter) for new code.
Drizzle
Installation
yarn add @noddde/drizzle drizzle-orm
# Plus your database driver, e.g.:
yarn add better-sqlite3 # or: pg, mysql2

Schema Setup
The package exports convenience table definitions for each Drizzle dialect. Import from the sub-path matching your database:
// SQLite
import {
events,
aggregateStates,
sagaStates,
snapshots,
outbox,
} from "@noddde/drizzle/sqlite";
// PostgreSQL (uses serial PK, jsonb for payloads)
import {
events,
aggregateStates,
sagaStates,
snapshots,
outbox,
} from "@noddde/drizzle/pg";
// MySQL (uses int auto-increment, varchar(255), json)
import {
events,
aggregateStates,
sagaStates,
snapshots,
outbox,
} from "@noddde/drizzle/mysql";

You can also define your own tables matching the expected column structure -- the adapter does not require using the provided schemas.
Configuration
import Database from "better-sqlite3";
import { drizzle } from "drizzle-orm/better-sqlite3";
import { DrizzleAdapter } from "@noddde/drizzle";
import { everyNEvents } from "@noddde/core";
import { defineDomain, wireDomain } from "@noddde/engine";
const db = drizzle(new Database("app.db"));
const adapter = new DrizzleAdapter(db);
const bankingDomain = defineDomain({
writeModel: { aggregates: { BankAccount } },
readModel: { projections: { BankAccount: BankAccountProjection } },
});
const domain = await wireDomain(bankingDomain, {
persistenceAdapter: adapter,
aggregates: {
BankAccount: {
persistence: "event-sourced",
snapshots: { strategy: everyNEvents(100) },
},
},
});

The adapter auto-infers the dialect from the Drizzle db instance and selects the correct pre-built schemas (PostgreSQL, MySQL, or SQLite). No schema imports or table config needed.
How Drizzle Transactions Work
The adapter detects the dialect automatically. For SQLite (sync drivers like better-sqlite3), it uses explicit BEGIN/COMMIT/ROLLBACK SQL statements. For PostgreSQL and MySQL, it uses the native db.transaction() callback, which ensures connection affinity in pooled environments.
The persistence classes and UnitOfWork share a transaction store. When a transaction is active, all queries automatically route through it.
Advanced Configuration
For custom table overrides, pass a config object as the second argument:
import { DrizzleAdapter } from "@noddde/drizzle";
import { myEvents, mySagas } from "./schema"; // your own Drizzle tables
const db = drizzle(pool);
const adapter = new DrizzleAdapter(db, {
tables: { eventStore: myEvents, sagaStore: mySagas },
});

Custom tables override the auto-resolved ones for that specific store. Tables you don't override use the built-in schemas for the detected dialect.
Per-Aggregate State Tables
By default, all state-stored aggregates share a single noddde_aggregate_states table, discriminated by an aggregate_name column. For production workloads, you may want each aggregate to have its own dedicated table with a domain-specific schema.
Use the stateStored() helper method to bind an aggregate to a dedicated Drizzle table:
const adapter = new DrizzleAdapter(db);
const domain = await wireDomain(definition, {
persistenceAdapter: adapter,
aggregates: {
Payment: { persistence: "event-sourced" },
Order: { persistence: adapter.stateStored(orders) },
BankAccount: { persistence: adapter.stateStored(bankAccounts) },
},
});

Column resolution works in two modes:
- Convention-based (default): The adapter scans the Drizzle table definition for columns named `aggregate_id`, `state`, and `version`. If your table follows this naming, no explicit mapping is needed.
- Explicit mapping: For tables with custom column names, provide a `columns` object:
adapter.stateStored(orders, {
aggregateId: orders.orderId,
state: orders.orderData,
version: orders.rev,
});

If convention-based resolution fails (columns not found), the adapter throws with a clear error listing the available columns in the table.
The stateStored() method returns a StateStoredAggregatePersistence that can be passed directly to the persistence field in per-aggregate wiring. It shares the same transaction store as all other stores on the adapter, ensuring UoW atomicity across shared and dedicated tables.
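The convention-based resolution described above can be sketched as a small helper. This is purely illustrative -- `resolveStateColumns` and its shapes are hypothetical names, not the adapter's actual internals:

```typescript
// Hypothetical sketch of convention-based vs. explicit column resolution.
type ColumnMap = { aggregateId: string; state: string; version: string };

function resolveStateColumns(
  tableName: string,
  columns: string[],
  explicit?: Partial<ColumnMap>,
): ColumnMap {
  // Convention defaults, overridden by any explicit mapping.
  const resolved: ColumnMap = {
    aggregateId: "aggregate_id",
    state: "state",
    version: "version",
    ...explicit,
  };
  // Validate every resolved column exists; fail listing the available
  // columns, mirroring the error behavior the docs describe.
  for (const col of Object.values(resolved)) {
    if (!columns.includes(col)) {
      throw new Error(
        `Column "${col}" not found on "${tableName}". Available: ${columns.join(", ")}`,
      );
    }
  }
  return resolved;
}
```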
Prisma
Installation
yarn add @noddde/prisma @prisma/client
yarn add -D prisma

Schema Setup
Copy the model definitions from the package's Prisma schema into your own schema.prisma (the outbox model is only needed if you use the outbox pattern):
model NodddeEvent {
id Int @id @default(autoincrement())
aggregateName String @map("aggregate_name")
aggregateId String @map("aggregate_id")
sequenceNumber Int @map("sequence_number")
eventName String @map("event_name")
payload String
metadata String?
createdAt DateTime @map("created_at")
@@unique([aggregateName, aggregateId, sequenceNumber])
@@map("noddde_events")
}
model NodddeAggregateState {
aggregateName String @map("aggregate_name")
aggregateId String @map("aggregate_id")
state String
version Int @default(0)
@@id([aggregateName, aggregateId])
@@map("noddde_aggregate_states")
}
model NodddeSagaState {
sagaName String @map("saga_name")
sagaId String @map("saga_id")
state String
@@id([sagaName, sagaId])
@@map("noddde_saga_states")
}
model NodddeSnapshot {
aggregateName String @map("aggregate_name")
aggregateId String @map("aggregate_id")
state String
version Int @default(0)
@@id([aggregateName, aggregateId])
@@map("noddde_snapshots")
}
// Only needed if using the outbox pattern
model NodddeOutbox {
id String @id
event String
aggregateName String? @map("aggregate_name")
aggregateId String? @map("aggregate_id")
createdAt DateTime @map("created_at")
publishedAt DateTime? @map("published_at")
@@map("noddde_outbox")
}

Then run `prisma generate` and your preferred migration command.
Configuration
import { PrismaClient } from "@prisma/client";
import { PrismaAdapter } from "@noddde/prisma";
import { everyNEvents } from "@noddde/core";
import { defineDomain, wireDomain } from "@noddde/engine";
const prisma = new PrismaClient();
const adapter = new PrismaAdapter(prisma);
const bankingDomain = defineDomain({
writeModel: { aggregates: { BankAccount } },
readModel: { projections: { BankAccount: BankAccountProjection } },
});
const domain = await wireDomain(bankingDomain, {
persistenceAdapter: adapter,
aggregates: {
BankAccount: {
persistence: "event-sourced",
snapshots: { strategy: everyNEvents(100) },
},
},
});

The Prisma adapter uses built-in Prisma models (NodddeEvent, NodddeAggregateState, etc.) for all stores. No configuration needed beyond the PrismaClient instance.
How Prisma Transactions Work
The Prisma adapter uses interactive transactions via prisma.$transaction(async (tx) => { ... }). When a unit of work commits, it sets txStore.current to the transactional client tx, and all persistence classes route their queries through it. Prisma automatically rolls back the transaction if any operation throws.
Advanced Configuration
For per-aggregate state tables, use the stateStored() helper method:
const adapter = new PrismaAdapter(prisma);
const domain = await wireDomain(definition, {
persistenceAdapter: adapter,
aggregates: {
Order: {
persistence: adapter.stateStored("order", {
aggregateId: "orderId",
state: "orderData",
version: "rev",
}),
},
},
});

Since Prisma uses generated model delegates (e.g., prisma.order), the first argument to stateStored() must match the camelCase delegate name on your PrismaClient. The adapter validates that the model exists at creation time.
For pessimistic concurrency, pass the dialect option since PrismaClient does not expose the database provider at runtime:
const adapter = new PrismaAdapter(prisma, { dialect: "postgresql" });

TypeORM
Installation
yarn add @noddde/typeorm typeorm reflect-metadata
# Plus your database driver, e.g.:
yarn add pg

Schema Setup
The package exports TypeORM entity classes decorated with @Entity, @Column, etc. Register them in your DataSource configuration:
import {
NodddeEventEntity,
NodddeAggregateStateEntity,
NodddeSagaStateEntity,
NodddeSnapshotEntity,
NodddeOutboxEntryEntity, // only needed if using the outbox pattern
} from "@noddde/typeorm";
const dataSource = new DataSource({
type: "postgres",
url: process.env.DATABASE_URL,
entities: [
NodddeEventEntity,
NodddeAggregateStateEntity,
NodddeSagaStateEntity,
NodddeSnapshotEntity,
NodddeOutboxEntryEntity,
],
synchronize: true, // use migrations in production
});
await dataSource.initialize();

For production, use TypeORM migrations instead of `synchronize: true`.
Configuration
import { TypeORMAdapter } from "@noddde/typeorm";
import { everyNEvents } from "@noddde/core";
import { defineDomain, wireDomain } from "@noddde/engine";
const adapter = new TypeORMAdapter(dataSource);
const bankingDomain = defineDomain({
writeModel: { aggregates: { BankAccount } },
readModel: { projections: { BankAccount: BankAccountProjection } },
});
const domain = await wireDomain(bankingDomain, {
persistenceAdapter: adapter,
aggregates: {
BankAccount: {
persistence: "event-sourced",
snapshots: { strategy: everyNEvents(100) },
},
},
});

The TypeORM adapter uses built-in entity classes for all stores. The database type is auto-detected from dataSource.options.type for advisory locking support.
How TypeORM Transactions Work
The TypeORM adapter uses dataSource.manager.transaction() to wrap operations. When a unit of work commits, it sets txStore.current to the transactional EntityManager, and all persistence classes use it for their repository operations.
Advanced Configuration
For per-aggregate state tables, use the stateStored() helper method with a TypeORM entity class:
import { TypeORMAdapter } from "@noddde/typeorm";
import { OrderEntity } from "./entities"; // your own TypeORM entity
const adapter = new TypeORMAdapter(dataSource);
const domain = await wireDomain(definition, {
persistenceAdapter: adapter,
aggregates: {
Order: {
persistence: adapter.stateStored(OrderEntity, {
aggregateId: "orderId",
state: "orderData",
version: "rev",
}),
},
},
});

How Transactions Work
Each adapter section above describes its ORM-specific transaction mechanism. Under the hood, all three follow the same pattern for integrating with the Unit of Work:
- The adapter opens a database transaction
- It sets `txStore.current` to the transaction-scoped database client
- All enlisted persistence operations execute within that transaction, because they read `txStore.current` for their queries
- On success, the transaction commits and deferred events are returned for publishing
- On failure, the transaction rolls back and no events are published
This shared transaction store pattern means persistence classes do not need to know whether they are operating inside a unit of work or not -- they always read from `txStore.current`, which is `null` outside a transaction and points to the active transaction client inside one.
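The pattern can be illustrated with toy types -- none of the names below are noddde APIs, just a minimal sketch of a mutable shared ref that a UoW sets and persistence code reads:

```typescript
// Toy illustration of the shared transaction-store pattern.
type Client = { label: string };

// The mutable ref shared between the UoW and every persistence class.
const txStore: { current: Client | null } = { current: null };

// Persistence code always reads txStore.current; it never needs to know
// whether a unit of work is active.
function activeClient(fallback: Client): Client {
  return txStore.current ?? fallback;
}

const pool: Client = { label: "pooled-connection" };

async function withTransaction<T>(fn: () => Promise<T>): Promise<T> {
  const tx: Client = { label: "tx-connection" };
  txStore.current = tx; // set before running enlisted operations...
  try {
    return await fn();
  } finally {
    txStore.current = null; // ...and always cleared, even on failure
  }
}
```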
Concurrency Control
All three adapters support both optimistic and pessimistic concurrency strategies. Here is what each adapter provides at the database level.
Optimistic Concurrency (built-in)
Handled automatically by the persistence implementations via database constraints:
- Events table: A unique constraint on `(aggregate_name, aggregate_id, sequence_number)` prevents concurrent appends. Violations throw `ConcurrencyError`.
- States table: A `version` column enables optimistic locking. Updates use `WHERE version = expectedVersion`; zero rows affected throws `ConcurrencyError`.
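The version-conditional update can be illustrated with an in-memory stand-in for the `WHERE version = expectedVersion` clause (illustrative only -- the real adapters do this in SQL, and `ConcurrencyError` is redeclared locally here to keep the sketch self-contained):

```typescript
class ConcurrencyError extends Error {} // local stand-in for @noddde/core's error

type StateRow = { state: unknown; version: number };
const states = new Map<string, StateRow>(); // key: `${name}:${id}`

// Mirrors: UPDATE ... SET state = ?, version = version + 1
//          WHERE aggregate_name = ? AND aggregate_id = ? AND version = ?
function saveState(
  name: string,
  id: string,
  state: unknown,
  expectedVersion: number,
): void {
  const key = `${name}:${id}`;
  const currentVersion = states.get(key)?.version ?? 0;
  if (currentVersion !== expectedVersion) {
    // In SQL this is "zero rows affected"; here the version doesn't match.
    throw new ConcurrencyError(`expected v${expectedVersion}, found v${currentVersion}`);
  }
  states.set(key, { state, version: expectedVersion + 1 });
}
```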
Advisory Lockers (for pessimistic concurrency)
Each adapter exports an advisory locker for use with the pessimistic strategy. See the Dialect Support Matrix above for which databases support locking.
| Adapter | Constructor | Dialect Detection |
|---|---|---|
| `DrizzleAdvisoryLocker` | `(db, dialect)` | Explicit: `"pg"` \| `"mysql"` \| `"sqlite"` (throws) |
| `PrismaAdvisoryLocker` | `(prisma, dialect)` | Explicit: `"postgresql"` \| `"mysql"` \| `"mariadb"` |
| `TypeORMAdvisoryLocker` | `(dataSource)` | Auto-detects from `dataSource.options.type` |
Under the hood, each dialect uses the database's native advisory lock mechanism:
| Dialect | Lock mechanism | Lock key format |
|---|---|---|
| PostgreSQL | pg_advisory_lock / pg_try_advisory_lock | 64-bit FNV-1a hash of name:id |
| MySQL/MariaDB | GET_LOCK / RELEASE_LOCK | First 64 chars of name:id (MySQL limit) |
| MSSQL | sp_getapplock / sp_releaseapplock | First 255 chars of name:id (TypeORM only) |
SQLite has no advisory lock mechanism. For single-process SQLite deployments, use InMemoryAggregateLocker from @noddde/engine.
Advisory locks are session-level, spanning beyond the database transaction. This is intentional: the lock covers the entire load→execute→save lifecycle.
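For reference, a 64-bit FNV-1a hash of the `name:id` key (the PostgreSQL lock-key scheme from the table above) can be computed with BigInt arithmetic. The signed reinterpretation step is an assumption -- `pg_advisory_lock` takes a signed bigint, so an unsigned hash would have to be reinterpreted as two's-complement:

```typescript
// 64-bit FNV-1a over the UTF-8 bytes of the input string.
const FNV_OFFSET = 14695981039346656037n;
const FNV_PRIME = 1099511628211n;
const MASK_64 = (1n << 64n) - 1n;

function fnv1a64(input: string): bigint {
  let hash = FNV_OFFSET;
  for (const byte of new TextEncoder().encode(input)) {
    hash ^= BigInt(byte);
    hash = (hash * FNV_PRIME) & MASK_64; // wrap to 64 bits
  }
  return hash;
}

// Reinterpret the unsigned hash as a signed 64-bit value (two's-complement),
// which is the range pg_advisory_lock accepts.
function toSigned64(value: bigint): bigint {
  return BigInt.asIntN(64, value);
}
```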
Choosing an Adapter
| Factor | Drizzle | Prisma | TypeORM |
|---|---|---|---|
| Schema definition | TypeScript table builders | .prisma schema file | Decorator-based entities |
| Code generation | None | Required (prisma generate) | None |
| Type safety | Full (inferred from schema) | Full (generated client) | Partial (decorator metadata) |
| Bundle size | Lightweight | Heavier (generated client) | Medium |
| Sync driver support | Yes (better-sqlite3) | No (async only) | Yes |
| Migration tooling | Drizzle Kit | Prisma Migrate | TypeORM migrations |
All three provide identical functionality for noddde's purposes. The choice comes down to which ORM your project already uses.
Custom Adapters
You can implement your own adapter for any database driver -- no ORM required. All you need is a class (or plain object) implementing the PersistenceAdapter interface from @noddde/core.
The PersistenceAdapter Interface
import type {
PersistenceAdapter,
EventSourcedAggregatePersistence,
StateStoredAggregatePersistence,
SagaPersistence,
UnitOfWorkFactory,
SnapshotStore,
OutboxStore,
IdempotencyStore,
AggregateLocker,
} from "@noddde/core";
interface PersistenceAdapter {
// --- Required ---
unitOfWorkFactory: UnitOfWorkFactory;
// --- Optional (engine errors at runtime if needed but missing) ---
eventSourcedPersistence?: EventSourcedAggregatePersistence;
stateStoredPersistence?: StateStoredAggregatePersistence;
sagaPersistence?: SagaPersistence;
snapshotStore?: SnapshotStore;
outboxStore?: OutboxStore;
idempotencyStore?: IdempotencyStore;
aggregateLocker?: AggregateLocker;
// --- Lifecycle hooks (optional) ---
init?(): Promise<void>; // called by Domain.init()
close?(): Promise<void>; // called by Domain.shutdown()
}

Only unitOfWorkFactory is required. Everything else is optional -- the engine validates at runtime that the adapter provides what the domain needs. For example, if your domain has event-sourced aggregates, the engine will error if eventSourcedPersistence is missing.
Persistence Interfaces
These are the interfaces your adapter needs to implement depending on which features your domain uses:
UnitOfWorkFactory (required)
type UnitOfWorkFactory = () => UnitOfWork;
interface UnitOfWork {
/** Buffer a write operation for deferred execution. */
enlist(operation: () => Promise<void>): void;
/** Schedule events for publishing after commit. */
deferPublish(...events: Event[]): void;
/** Execute all operations atomically, return deferred events. */
commit(): Promise<Event[]>;
/** Discard all operations and events. */
rollback(): Promise<void>;
}

The UnitOfWork is the core of transactional guarantees. Each command dispatch gets a fresh instance. Your commit() must wrap all enlisted operations in a database transaction and return the deferred events for post-commit publishing.
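These semantics can be exercised with a minimal in-memory implementation. This is a sketch, not an adapter: a real commit() would open a database transaction around the enlisted operations, as the ORM sections above describe:

```typescript
type Event = { name: string; payload: unknown };

class InMemoryUnitOfWork {
  private operations: Array<() => Promise<void>> = [];
  private events: Event[] = [];
  private sealed = false;

  private assertOpen() {
    if (this.sealed) throw new Error("UnitOfWork already completed");
  }

  enlist(operation: () => Promise<void>): void {
    this.assertOpen();
    this.operations.push(operation); // buffered, not executed yet
  }

  deferPublish(...events: Event[]): void {
    this.assertOpen();
    this.events.push(...events);
  }

  async commit(): Promise<Event[]> {
    this.assertOpen();
    this.sealed = true; // single-use: no further enlist/commit/rollback
    // A real UoW would wrap this loop in a database transaction.
    for (const op of this.operations) await op();
    return this.events; // deferred events for post-commit publishing
  }

  async rollback(): Promise<void> {
    this.assertOpen();
    this.sealed = true;
    this.operations = [];
    this.events = [];
  }
}
```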
EventSourcedAggregatePersistence
interface EventSourcedAggregatePersistence {
save(
aggregateName: string,
aggregateId: string,
events: Event[],
expectedVersion: number,
): Promise<void>;
load(aggregateName: string, aggregateId: string): Promise<Event[]>;
}

Appends events to an aggregate stream. save() must enforce optimistic concurrency: if expectedVersion doesn't match the current stream length, throw ConcurrencyError. load() returns events ordered by sequence number, or [] for new aggregates.
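As a reference point, here is an in-memory store satisfying this contract (illustrative only; `ConcurrencyError` is redeclared locally so the sketch is self-contained):

```typescript
class ConcurrencyError extends Error {} // local stand-in for @noddde/core's error
type Event = { name: string; payload: unknown };

class InMemoryEventStore {
  private streams = new Map<string, Event[]>(); // key: `${name}:${id}`

  async save(
    aggregateName: string,
    aggregateId: string,
    events: Event[],
    expectedVersion: number,
  ): Promise<void> {
    const key = `${aggregateName}:${aggregateId}`;
    const stream = this.streams.get(key) ?? [];
    // Optimistic check: the caller must have loaded the current stream length.
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `expected v${expectedVersion}, stream is at v${stream.length}`,
      );
    }
    this.streams.set(key, [...stream, ...events]);
  }

  async load(aggregateName: string, aggregateId: string): Promise<Event[]> {
    // Insertion order doubles as sequence order here; [] for new aggregates.
    return this.streams.get(`${aggregateName}:${aggregateId}`) ?? [];
  }
}
```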
StateStoredAggregatePersistence
interface StateStoredAggregatePersistence {
save(
aggregateName: string,
aggregateId: string,
state: any,
expectedVersion: number,
): Promise<void>;
load(
aggregateName: string,
aggregateId: string,
): Promise<{ state: any; version: number } | null>;
}

Stores the latest aggregate state as a JSON snapshot. save() must enforce optimistic concurrency (compare expectedVersion against stored version). load() returns null for new aggregates.
SagaPersistence
interface SagaPersistence {
save(sagaName: string, sagaId: string, state: any): Promise<void>;
load(sagaName: string, sagaId: string): Promise<any | undefined | null>;
}

Simple key-value persistence for saga workflow state. No concurrency control required.
SnapshotStore (optional)
interface SnapshotStore {
load(aggregateName: string, aggregateId: string): Promise<Snapshot | null>;
save(
aggregateName: string,
aggregateId: string,
snapshot: Snapshot,
): Promise<void>;
}
// Snapshot = { state: any; version: number }

Caches aggregate state at a point in time so event replay can start from the snapshot version instead of the beginning.
OutboxStore (optional)
interface OutboxStore {
save(entries: OutboxEntry[]): Promise<void>;
loadUnpublished(batchSize?: number): Promise<OutboxEntry[]>;
markPublished(ids: string[]): Promise<void>;
markPublishedByEventIds(eventIds: string[]): Promise<void>;
deletePublished(olderThan?: Date): Promise<void>;
}

Transactional outbox for guaranteed at-least-once event delivery. save() is called inside the UoW transaction. See Outbox Pattern for details.
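A partial in-memory sketch of the contract (the entry shape is inferred from the schema reference later on this page; `markPublishedByEventIds` is omitted for brevity since it depends on the event envelope shape):

```typescript
// Entry shape inferred from the outbox schema (id, event, timestamps).
type OutboxEntry = {
  id: string;
  event: unknown;
  createdAt: Date;
  publishedAt: Date | null;
};

class InMemoryOutboxStore {
  private entries = new Map<string, OutboxEntry>();

  async save(entries: OutboxEntry[]): Promise<void> {
    for (const e of entries) this.entries.set(e.id, e);
  }

  async loadUnpublished(batchSize = 100): Promise<OutboxEntry[]> {
    return [...this.entries.values()]
      .filter((e) => e.publishedAt === null)
      .slice(0, batchSize);
  }

  async markPublished(ids: string[]): Promise<void> {
    for (const id of ids) {
      const e = this.entries.get(id);
      if (e) e.publishedAt = new Date();
    }
  }

  async deletePublished(olderThan?: Date): Promise<void> {
    for (const [id, e] of this.entries) {
      // Only published entries are eligible for cleanup.
      if (e.publishedAt && (!olderThan || e.publishedAt < olderThan)) {
        this.entries.delete(id);
      }
    }
  }
}
```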
AggregateLocker (optional)
interface AggregateLocker {
acquire(
aggregateName: string,
aggregateId: string,
timeoutMs?: number,
): Promise<void>;
release(aggregateName: string, aggregateId: string): Promise<void>;
}

Pessimistic locking for serializing command execution against the same aggregate. acquire() blocks until the lock is available or throws LockTimeoutError. release() must be idempotent.
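A single-process locker satisfying this contract can be sketched as follows. It polls for brevity; `LockTimeoutError` is declared locally, and the actual InMemoryAggregateLocker from @noddde/engine may well park waiters in a queue instead:

```typescript
class LockTimeoutError extends Error {} // local stand-in for the engine's error

class InMemoryLocker {
  private held = new Set<string>();

  async acquire(
    aggregateName: string,
    aggregateId: string,
    timeoutMs = 5000,
  ): Promise<void> {
    const key = `${aggregateName}:${aggregateId}`;
    const deadline = Date.now() + timeoutMs;
    // Poll until the lock frees up or the deadline passes.
    while (this.held.has(key)) {
      if (Date.now() >= deadline) {
        throw new LockTimeoutError(`timed out waiting for ${key}`);
      }
      await new Promise((resolve) => setTimeout(resolve, 5));
    }
    this.held.add(key);
  }

  async release(aggregateName: string, aggregateId: string): Promise<void> {
    // Idempotent: releasing an unheld lock is a no-op.
    this.held.delete(`${aggregateName}:${aggregateId}`);
  }
}
```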
Implementation Guide: MongoDB Native Driver
This walkthrough builds a custom adapter using the MongoDB native driver (mongodb). The same principles apply to any database -- PostgreSQL with pg, DynamoDB, or anything else.
MongoDB multi-document transactions (available since MongoDB 4.0) require a
replica set. For local development, use mongod --replSet rs0 or a Docker
image pre-configured as a single-node replica set.
Step 1: Create the UnitOfWork
The UoW is the only required piece. It wraps all enlisted operations in a database transaction so that aggregate persistence, outbox writes, and any other enlisted operations commit or fail atomically.
In MongoDB, this maps to a ClientSession with an active transaction:
import { MongoClient, type ClientSession } from "mongodb";
import type { UnitOfWork, UnitOfWorkFactory, Event } from "@noddde/core";
/**
* Shared mutable ref so persistence classes see the active transaction session.
* This is the bridge between the UoW and persistence classes.
*/
interface SessionStore {
current: ClientSession | null;
}
class MongoUnitOfWork implements UnitOfWork {
private operations: (() => Promise<void>)[] = [];
private events: Event[] = [];
private sealed = false;
constructor(
private readonly client: MongoClient,
private readonly sessionStore: SessionStore,
) {}
enlist(op: () => Promise<void>) {
if (this.sealed) throw new Error("UnitOfWork already completed");
this.operations.push(op);
}
deferPublish(...events: Event[]) {
if (this.sealed) throw new Error("UnitOfWork already completed");
this.events.push(...events);
}
async commit(): Promise<Event[]> {
if (this.sealed) throw new Error("UnitOfWork already completed");
this.sealed = true;
const session = this.client.startSession();
try {
session.startTransaction();
this.sessionStore.current = session;
for (const op of this.operations) {
await op();
}
await session.commitTransaction();
return this.events;
} catch (err) {
await session.abortTransaction();
throw err;
} finally {
this.sessionStore.current = null;
await session.endSession();
}
}
async rollback(): Promise<void> {
if (this.sealed) throw new Error("UnitOfWork already completed");
this.sealed = true;
this.operations = [];
this.events = [];
}
}

The key pattern: a session store holds a mutable reference to the active MongoDB session. All persistence classes read from it so their operations participate in the same transaction.
Step 2: Implement Persistence Classes
Each persistence class reads sessionStore.current to pass the active session to MongoDB operations. When no UoW is active (e.g., read-only load() calls), it falls back to a sessionless call.
import type { Db, Collection } from "mongodb";
import { ConcurrencyError } from "@noddde/core";
import type { EventSourcedAggregatePersistence, Event } from "@noddde/core";
class MongoEventSourcedPersistence implements EventSourcedAggregatePersistence {
private readonly collection: Collection;
constructor(
db: Db,
private readonly sessionStore: SessionStore,
) {
this.collection = db.collection("noddde_events");
}
/** Returns session options if inside a UoW, or empty object otherwise. */
private get sessionOpts() {
return this.sessionStore.current
? { session: this.sessionStore.current }
: {};
}
async save(
aggregateName: string,
aggregateId: string,
events: Event[],
expectedVersion: number,
) {
if (events.length === 0) return;
const docs = events.map((event, i) => ({
aggregateName,
aggregateId,
sequenceNumber: expectedVersion + i + 1,
eventName: event.name,
payload: event.payload,
metadata: event.metadata ?? null,
createdAt: event.metadata?.timestamp
? new Date(event.metadata.timestamp)
: new Date(),
}));
try {
await this.collection.insertMany(docs, this.sessionOpts);
} catch (err: any) {
if (err.code === 11000) {
// duplicate key
throw new ConcurrencyError(
aggregateName,
aggregateId,
expectedVersion,
-1,
);
}
throw err;
}
}
async load(aggregateName: string, aggregateId: string): Promise<Event[]> {
const docs = await this.collection
.find({ aggregateName, aggregateId }, this.sessionOpts)
.sort({ sequenceNumber: 1 })
.toArray();
return docs.map((doc) => ({
name: doc.eventName,
payload: doc.payload,
...(doc.metadata ? { metadata: doc.metadata } : {}),
}));
}
}

MongoDB stores documents natively as BSON -- no need to `JSON.stringify()` payloads. This is a key difference from SQL adapters where payloads are stored as JSON text.
For optimistic concurrency in StateStoredAggregatePersistence, use a findOneAndUpdate with a version filter:
import type { StateStoredAggregatePersistence } from "@noddde/core";
class MongoStateStoredPersistence implements StateStoredAggregatePersistence {
private readonly collection: Collection;
constructor(
db: Db,
private readonly sessionStore: SessionStore,
) {
this.collection = db.collection("noddde_aggregate_states");
}
private get sessionOpts() {
return this.sessionStore.current
? { session: this.sessionStore.current }
: {};
}
async save(
aggregateName: string,
aggregateId: string,
state: any,
expectedVersion: number,
) {
if (expectedVersion === 0) {
// New aggregate: insert with version 1
try {
await this.collection.insertOne(
{ aggregateName, aggregateId, state, version: 1 },
this.sessionOpts,
);
} catch (err: any) {
if (err.code === 11000) {
throw new ConcurrencyError(aggregateName, aggregateId, 0, -1);
}
throw err;
}
} else {
// Existing aggregate: update only if version matches
const result = await this.collection.findOneAndUpdate(
{ aggregateName, aggregateId, version: expectedVersion },
{ $set: { state, version: expectedVersion + 1 } },
this.sessionOpts,
);
if (!result) {
throw new ConcurrencyError(
aggregateName,
aggregateId,
expectedVersion,
-1,
);
}
}
}
async load(aggregateName: string, aggregateId: string) {
const doc = await this.collection.findOne(
{ aggregateName, aggregateId },
this.sessionOpts,
);
return doc ? { state: doc.state, version: doc.version } : null;
}
}

Step 3: Assemble the Adapter
All persistence classes share a single sessionStore so they participate in the same transaction:
import type { PersistenceAdapter } from "@noddde/core";
import { MongoClient, type Db } from "mongodb";
class MongoAdapter implements PersistenceAdapter {
readonly unitOfWorkFactory;
readonly eventSourcedPersistence;
readonly stateStoredPersistence;
readonly sagaPersistence;
// Add optional stores as needed:
// readonly snapshotStore;
// readonly outboxStore;
private readonly sessionStore: SessionStore = { current: null };
constructor(
private readonly client: MongoClient,
private readonly db: Db,
) {
this.unitOfWorkFactory = () =>
new MongoUnitOfWork(client, this.sessionStore);
this.eventSourcedPersistence = new MongoEventSourcedPersistence(
db,
this.sessionStore,
);
this.stateStoredPersistence = new MongoStateStoredPersistence(
db,
this.sessionStore,
);
// Follows the same session-store pattern (implementation omitted for brevity).
this.sagaPersistence = new MongoSagaPersistence(db, this.sessionStore);
}
/** Create required indexes on first use. */
async init() {
const db = this.db; // use the Db passed to the constructor, not the driver's default
await db
.collection("noddde_events")
.createIndex(
{ aggregateName: 1, aggregateId: 1, sequenceNumber: 1 },
{ unique: true },
);
await db
.collection("noddde_aggregate_states")
.createIndex({ aggregateName: 1, aggregateId: 1 }, { unique: true });
}
async close() {
await this.client.close();
}
}

Step 4: Wire It Up
import { wireDomain, defineDomain } from "@noddde/engine";
import { MongoClient } from "mongodb";
const client = new MongoClient(process.env.MONGODB_URI!);
await client.connect();
const db = client.db("my-app");
const adapter = new MongoAdapter(client, db);
const domain = await wireDomain(
defineDomain({
writeModel: { aggregates: { Order } },
readModel: { projections: {} },
}),
{ persistenceAdapter: adapter },
);

Key Implementation Rules
- Session/transaction store pattern -- All persistence classes must share a single mutable reference to the active transaction context (a `ClientSession` for MongoDB, a `PoolClient` for PostgreSQL, etc.). The UoW sets it before executing operations and clears it in a `finally` block.
- Optimistic concurrency -- `EventSourcedAggregatePersistence.save()` and `StateStoredAggregatePersistence.save()` must throw `ConcurrencyError` (from `@noddde/core`) when the expected version doesn't match. Use unique indexes (MongoDB `E11000` / PostgreSQL `23505`) or version-conditional updates.
- UoW is single-use -- After `commit()` or `rollback()`, all further calls must throw `"UnitOfWork already completed"`.
- Event ordering -- `load()` must return events sorted by sequence number ascending.
- Idempotent release -- `AggregateLocker.release()` must not throw if the lock is already released.
- Only implement what you need -- If your domain has no sagas, skip `sagaPersistence`. If you don't need the outbox pattern, skip `outboxStore`. The engine validates at runtime and gives clear error messages if something is missing.
Database Schema Reference
All adapters use the same logical schema. You are responsible for creating these tables using your ORM's migration tooling (Drizzle Kit, Prisma Migrate, TypeORM migrations) or raw SQL.
Tables Overview
| Table | Purpose | Key | Required |
|---|---|---|---|
| `noddde_events` | Event streams | Auto-increment `id` | If using event-sourced aggregates |
| `noddde_aggregate_states` | State snapshots | Composite `(aggregate_name, aggregate_id)` | If using state-stored aggregates |
| `noddde_saga_states` | Saga state | Composite `(saga_name, saga_id)` | If using sagas |
| `noddde_snapshots` | Event-sourced snapshots | Composite `(aggregate_name, aggregate_id)` | Optional (performance optimization) |
| `noddde_outbox` | Transactional outbox | `id` (string) | If using the outbox pattern |
The noddde_aggregate_states table is a shared table for all state-stored aggregates. If you need dedicated per-aggregate tables (e.g., for direct SQL queries), see the Per-Aggregate State Tables section above.
States and event payloads are serialized as JSON strings (or native JSON types where supported), making the schema database-agnostic.
Dialect Type Mapping
| Column type | PostgreSQL | MySQL | SQLite | MSSQL |
|---|---|---|---|---|
| Auto-increment | SERIAL | INT AUTO_INCREMENT | INTEGER | INT IDENTITY |
| String | TEXT | VARCHAR(255) | TEXT | NVARCHAR(255) |
| Integer | INTEGER | INT | INTEGER | INT |
| JSON payload | JSONB | JSON | TEXT | NVARCHAR(MAX) |
| JSON metadata | JSONB (nullable) | JSON (nullable) | TEXT (nullable) | NVARCHAR(MAX) (nullable) |
| Timestamp | TIMESTAMPTZ | TIMESTAMP(3) | TEXT (ISO 8601) | DATETIME2 |
| Nullable timestamp | TIMESTAMPTZ (nullable) | TIMESTAMP(3) (nullable) | TEXT (nullable) | DATETIME2 (nullable) |
PostgreSQL
CREATE TABLE IF NOT EXISTS noddde_events (
id SERIAL PRIMARY KEY,
aggregate_name TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
sequence_number INTEGER NOT NULL,
event_name TEXT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB, -- event metadata envelope (nullable for backcompat)
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() -- populated from event.metadata.timestamp
);
CREATE UNIQUE INDEX IF NOT EXISTS noddde_events_stream_version_idx
ON noddde_events (aggregate_name, aggregate_id, sequence_number);
CREATE TABLE IF NOT EXISTS noddde_aggregate_states (
aggregate_name TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
state JSONB NOT NULL,
version INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (aggregate_name, aggregate_id)
);
CREATE TABLE IF NOT EXISTS noddde_saga_states (
saga_name TEXT NOT NULL,
saga_id TEXT NOT NULL,
state JSONB NOT NULL,
PRIMARY KEY (saga_name, saga_id)
);
CREATE TABLE IF NOT EXISTS noddde_snapshots (
aggregate_name TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
state JSONB NOT NULL,
version INTEGER NOT NULL,
PRIMARY KEY (aggregate_name, aggregate_id)
);
CREATE TABLE IF NOT EXISTS noddde_outbox (
id TEXT PRIMARY KEY,
event JSONB NOT NULL,
aggregate_name TEXT,
aggregate_id TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ
);
MySQL
CREATE TABLE IF NOT EXISTS noddde_events (
id INT AUTO_INCREMENT PRIMARY KEY,
aggregate_name VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
sequence_number INT NOT NULL,
event_name VARCHAR(255) NOT NULL,
payload JSON NOT NULL,
metadata JSON,
created_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
UNIQUE INDEX noddde_events_stream_version_idx (aggregate_name, aggregate_id, sequence_number)
);
CREATE TABLE IF NOT EXISTS noddde_aggregate_states (
aggregate_name VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
state TEXT NOT NULL,
version INT NOT NULL DEFAULT 0,
PRIMARY KEY (aggregate_name, aggregate_id)
);
CREATE TABLE IF NOT EXISTS noddde_saga_states (
saga_name VARCHAR(255) NOT NULL,
saga_id VARCHAR(255) NOT NULL,
state TEXT NOT NULL,
PRIMARY KEY (saga_name, saga_id)
);
CREATE TABLE IF NOT EXISTS noddde_snapshots (
aggregate_name VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
state TEXT NOT NULL,
version INT NOT NULL,
PRIMARY KEY (aggregate_name, aggregate_id)
);
CREATE TABLE IF NOT EXISTS noddde_outbox (
id VARCHAR(255) PRIMARY KEY,
event JSON NOT NULL,
aggregate_name VARCHAR(255),
aggregate_id VARCHAR(255),
created_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
published_at TIMESTAMP(3)
);
SQLite
CREATE TABLE IF NOT EXISTS noddde_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
aggregate_name TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
sequence_number INTEGER NOT NULL,
event_name TEXT NOT NULL,
payload TEXT NOT NULL, -- JSON stored as text
metadata TEXT, -- JSON stored as text
created_at TEXT NOT NULL -- ISO 8601 string (SQLite has no native timestamp)
);
CREATE UNIQUE INDEX IF NOT EXISTS noddde_events_stream_version_idx
ON noddde_events (aggregate_name, aggregate_id, sequence_number);
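-- Even though JSON is stored as TEXT here, SQLite's built-in JSON functions
-- can still query payload fields directly (illustrative; 'orderId' is a
-- placeholder field name):
SELECT json_extract(payload, '$.orderId') FROM noddde_events;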
CREATE TABLE IF NOT EXISTS noddde_aggregate_states (
aggregate_name TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
state TEXT NOT NULL,
version INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (aggregate_name, aggregate_id)
);
CREATE TABLE IF NOT EXISTS noddde_saga_states (
saga_name TEXT NOT NULL,
saga_id TEXT NOT NULL,
state TEXT NOT NULL,
PRIMARY KEY (saga_name, saga_id)
);
CREATE TABLE IF NOT EXISTS noddde_snapshots (
aggregate_name TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
state TEXT NOT NULL,
version INTEGER NOT NULL,
PRIMARY KEY (aggregate_name, aggregate_id)
);
CREATE TABLE IF NOT EXISTS noddde_outbox (
id TEXT PRIMARY KEY,
event TEXT NOT NULL,
aggregate_name TEXT,
aggregate_id TEXT,
created_at TEXT NOT NULL,
published_at TEXT
);
MSSQL
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_events' AND xtype='U')
CREATE TABLE noddde_events (
id INT IDENTITY(1,1) PRIMARY KEY,
aggregate_name NVARCHAR(255) NOT NULL,
aggregate_id NVARCHAR(255) NOT NULL,
sequence_number INT NOT NULL,
event_name NVARCHAR(255) NOT NULL,
payload NVARCHAR(MAX) NOT NULL,
metadata NVARCHAR(MAX),
created_at DATETIME2 NOT NULL DEFAULT GETUTCDATE()
);
CREATE UNIQUE INDEX noddde_events_stream_version_idx
ON noddde_events (aggregate_name, aggregate_id, sequence_number);
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_aggregate_states' AND xtype='U')
CREATE TABLE noddde_aggregate_states (
aggregate_name NVARCHAR(255) NOT NULL,
aggregate_id NVARCHAR(255) NOT NULL,
state NVARCHAR(MAX) NOT NULL,
version INT NOT NULL DEFAULT 0,
PRIMARY KEY (aggregate_name, aggregate_id)
);
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_saga_states' AND xtype='U')
CREATE TABLE noddde_saga_states (
saga_name NVARCHAR(255) NOT NULL,
saga_id NVARCHAR(255) NOT NULL,
state NVARCHAR(MAX) NOT NULL,
PRIMARY KEY (saga_name, saga_id)
);
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_snapshots' AND xtype='U')
CREATE TABLE noddde_snapshots (
aggregate_name NVARCHAR(255) NOT NULL,
aggregate_id NVARCHAR(255) NOT NULL,
state NVARCHAR(MAX) NOT NULL,
version INT NOT NULL,
PRIMARY KEY (aggregate_name, aggregate_id)
);
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_outbox' AND xtype='U')
CREATE TABLE noddde_outbox (
id NVARCHAR(255) PRIMARY KEY,
event NVARCHAR(MAX) NOT NULL,
aggregate_name NVARCHAR(255),
aggregate_id NVARCHAR(255),
created_at DATETIME2 NOT NULL DEFAULT GETUTCDATE(),
published_at DATETIME2
);
Per-Aggregate Dedicated State Tables
By default, all state-stored aggregates share the noddde_aggregate_states table. For aggregates where you need direct SQL queries on domain fields, create a dedicated table and use the adapter's stateStored() helper:
-- Example: dedicated table for an Order aggregate
CREATE TABLE IF NOT EXISTS orders (
aggregate_id TEXT PRIMARY KEY,
state JSONB NOT NULL, -- or TEXT for MySQL/SQLite
version INTEGER NOT NULL DEFAULT 0
);
The dedicated table must have at least aggregate_id, state, and version columns. Column names can be customized via the stateStored() options. Each adapter accepts its own table reference format:
// Drizzle: pass the Drizzle table object
adapter.stateStored(ordersTable);
// Prisma: pass the camelCase model delegate name
adapter.stateStored("order", {
aggregateId: "orderId",
state: "data",
version: "rev",
});
// TypeORM: pass the entity class
adapter.stateStored(OrderEntity, {
aggregateId: "orderId",
state: "data",
version: "rev",
});
See the Drizzle, Prisma, and TypeORM "Advanced Configuration" sections above for full per-adapter examples.
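Putting the pieces together, a dedicated state table is typically registered when wiring the domain. The following is a hedged sketch assuming a Drizzle setup; definition, db, ordersTable, and the order key are placeholders from your own project, and the exact shape of the aggregates entry should be verified against your adapter's documentation:

```typescript
import { DrizzleAdapter } from "@noddde/drizzle";
import { wireDomain } from "@noddde/engine";
// `definition`, `db`, and `ordersTable` come from your own domain and
// Drizzle setup; they are placeholders in this sketch.

const adapter = new DrizzleAdapter(db);

const domain = await wireDomain(definition, {
  persistenceAdapter: adapter,
  aggregates: {
    // Assumption: per-aggregate persistence overrides are keyed by
    // aggregate name -- check the exact option shape in your adapter's docs.
    order: adapter.stateStored(ordersTable),
  },
});
```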
Next Steps
- Unit of Work -- Atomic persistence and deferred event publishing
- Persistence -- Choosing between event-sourced and state-stored strategies
- Infrastructure -- The full infrastructure provider system