noddde

Persistence Adapters

Production-ready persistence using Drizzle, Prisma, TypeORM, or your own custom adapter with any supported database.

noddde provides three ORM adapter packages that implement all persistence interfaces and UnitOfWork using your ORM's native transaction mechanism. Pick the ORM you already use -- each adapter works with whatever database your ORM supports (PostgreSQL, MySQL, SQLite, etc.). If you don't use an ORM, you can build your own adapter for any database driver.

Available Adapters

| Package | ORM | Schema |
| --- | --- | --- |
| @noddde/drizzle | Drizzle ORM | TypeScript table builders (per dialect) |
| @noddde/prisma | Prisma | .prisma schema file |
| @noddde/typeorm | TypeORM | TypeScript entity decorators |

Dialect Support Matrix

Persistence (event store, state store, saga store, snapshot store, outbox store) and concurrency control work with every dialect supported by your ORM. The only dialect restriction applies to pessimistic locking, which requires database-level advisory locks:

| Dialect | Persistence | No concurrency / Optimistic | Pessimistic locking |
| --- | --- | --- | --- |
| PostgreSQL | ✅ All ORMs | ✅ All ORMs | ✅ All ORMs |
| MySQL | ✅ All ORMs | ✅ All ORMs | ✅ All ORMs |
| MariaDB | ✅ All ORMs | ✅ All ORMs | ✅ All ORMs |
| SQLite | ✅ All ORMs | ✅ All ORMs | ❌ No advisory locks |
| MSSQL | ✅ TypeORM | ✅ TypeORM | ✅ TypeORM only |

For SQLite or any dialect without advisory lock support, use InMemoryAggregateLocker from @noddde/engine for single-process deployments, or choose the optimistic strategy instead.

Each package exports a class-based adapter that implements the PersistenceAdapter interface from @noddde/core. Pass it to wireDomain via the persistenceAdapter property and the engine resolves all persistence concerns automatically:

import { DrizzleAdapter } from "@noddde/drizzle";
import { wireDomain } from "@noddde/engine";

const adapter = new DrizzleAdapter(db);

const domain = await wireDomain(definition, {
  persistenceAdapter: adapter,
  aggregates: {
    Room: { persistence: "event-sourced" },
    Inventory: {}, // defaults to state-stored from adapter
  },
});

The adapter provides every persistence component: the event-sourced, state-stored, saga, snapshot, and outbox stores, plus the unit of work and the advisory locker. The engine infers what it needs based on the domain definition -- no manual mapping required.

All three adapters also support per-aggregate dedicated state tables via the stateStored() helper method.

The createXxxAdapter factory functions are still supported but deprecated. Prefer the class-based API (DrizzleAdapter, PrismaAdapter, TypeORMAdapter) for new code.

Drizzle

Installation

yarn add @noddde/drizzle drizzle-orm
# Plus your database driver, e.g.:
yarn add better-sqlite3  # or: pg, mysql2

Schema Setup

The package exports convenience table definitions for each Drizzle dialect. Import from the sub-path matching your database:

// SQLite
import {
  events,
  aggregateStates,
  sagaStates,
  snapshots,
  outbox,
} from "@noddde/drizzle/sqlite";

// PostgreSQL (uses serial PK, jsonb for payloads)
import {
  events,
  aggregateStates,
  sagaStates,
  snapshots,
  outbox,
} from "@noddde/drizzle/pg";

// MySQL (uses int auto-increment, varchar(255), json)
import {
  events,
  aggregateStates,
  sagaStates,
  snapshots,
  outbox,
} from "@noddde/drizzle/mysql";

You can also define your own tables matching the expected column structure -- the adapter does not require using the provided schemas.

Configuration

import Database from "better-sqlite3";
import { drizzle } from "drizzle-orm/better-sqlite3";
import { DrizzleAdapter } from "@noddde/drizzle";
import { everyNEvents } from "@noddde/core";
import { defineDomain, wireDomain } from "@noddde/engine";

const db = drizzle(new Database("app.db"));
const adapter = new DrizzleAdapter(db);

const bankingDomain = defineDomain({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: { BankAccount: BankAccountProjection } },
});

const domain = await wireDomain(bankingDomain, {
  persistenceAdapter: adapter,
  aggregates: {
    BankAccount: {
      persistence: "event-sourced",
      snapshots: { strategy: everyNEvents(100) },
    },
  },
});

The adapter auto-infers the dialect from the Drizzle db instance and selects the correct pre-built schemas (PostgreSQL, MySQL, or SQLite). No schema imports or table config needed.

How Drizzle Transactions Work

The adapter detects the dialect automatically. For SQLite (sync drivers like better-sqlite3), it uses explicit BEGIN/COMMIT/ROLLBACK SQL statements. For PostgreSQL and MySQL, it uses the native db.transaction() callback, which ensures connection affinity in pooled environments.

The persistence classes and UnitOfWork share a transaction store. When a transaction is active, all queries automatically route through it.

Advanced Configuration

For custom table overrides, pass a config object as the second argument:

import { DrizzleAdapter } from "@noddde/drizzle";
import { myEvents, mySagas } from "./schema"; // your own Drizzle tables

const db = drizzle(pool);
const adapter = new DrizzleAdapter(db, {
  tables: { eventStore: myEvents, sagaStore: mySagas },
});

Custom tables override the auto-resolved ones for that specific store. Tables you don't override use the built-in schemas for the detected dialect.

Per-Aggregate State Tables

By default, all state-stored aggregates share a single noddde_aggregate_states table, discriminated by an aggregate_name column. For production workloads, you may want each aggregate to have its own dedicated table with a domain-specific schema.

Use the stateStored() helper method to bind an aggregate to a dedicated Drizzle table:

const adapter = new DrizzleAdapter(db);

const domain = await wireDomain(definition, {
  persistenceAdapter: adapter,
  aggregates: {
    Payment: { persistence: "event-sourced" },
    Order: { persistence: adapter.stateStored(orders) },
    BankAccount: { persistence: adapter.stateStored(bankAccounts) },
  },
});

Column resolution works in two modes:

  1. Convention-based (default): The adapter scans the Drizzle table definition for columns named aggregate_id, state, and version. If your table follows this naming, no explicit mapping is needed.

  2. Explicit mapping: For tables with custom column names, provide a columns object:

adapter.stateStored(orders, {
  aggregateId: orders.orderId,
  state: orders.orderData,
  version: orders.rev,
});

If convention-based resolution fails (columns not found), the adapter throws with a clear error listing the available columns in the table.
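The two resolution modes can be pictured with a small sketch. This is not the actual @noddde/drizzle implementation: `resolveColumns`, `ColumnMapping`, and the plain `Record` standing in for a Drizzle table are hypothetical names chosen for illustration. Only the conventional column names (aggregate_id, state, version) and the "throw with the available columns" behavior come from the text above.

```typescript
// Hypothetical sketch of convention-based vs. explicit column resolution.
// `C` stands in for whatever column object the ORM exposes.
type ColumnMapping<C> = { aggregateId: C; state: C; version: C };

function resolveColumns<C>(
  columnsByName: Record<string, C>,
  explicit?: ColumnMapping<C>,
): ColumnMapping<C> {
  // Mode 2: an explicit mapping always wins.
  if (explicit) return explicit;

  // Mode 1: look up the conventional column names.
  const available = Object.keys(columnsByName);
  const pick = (name: string): C => {
    const column = columnsByName[name];
    if (column === undefined) {
      throw new Error(
        `Column "${name}" not found. Available columns: ${available.join(", ")}`,
      );
    }
    return column;
  };

  return {
    aggregateId: pick("aggregate_id"),
    state: pick("state"),
    version: pick("version"),
  };
}
```

A table that follows the convention resolves without arguments; a table with custom names either receives an explicit mapping or fails fast with a message that lists what is available.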

The stateStored() method returns a StateStoredAggregatePersistence that can be passed directly to the persistence field in per-aggregate wiring. It shares the same transaction store as all other stores on the adapter, ensuring UoW atomicity across shared and dedicated tables.

Prisma

Installation

yarn add @noddde/prisma @prisma/client
yarn add -D prisma

Schema Setup

Copy the five model definitions from the package's Prisma schema into your own schema.prisma (NodddeOutbox is only needed if you use the outbox pattern):

model NodddeEvent {
  id             Int      @id @default(autoincrement())
  aggregateName  String   @map("aggregate_name")
  aggregateId    String   @map("aggregate_id")
  sequenceNumber Int      @map("sequence_number")
  eventName      String   @map("event_name")
  payload        String
  metadata       String?
  createdAt      DateTime @map("created_at")

  @@unique([aggregateName, aggregateId, sequenceNumber])
  @@map("noddde_events")
}

model NodddeAggregateState {
  aggregateName String @map("aggregate_name")
  aggregateId   String @map("aggregate_id")
  state         String
  version       Int    @default(0)

  @@id([aggregateName, aggregateId])
  @@map("noddde_aggregate_states")
}

model NodddeSagaState {
  sagaName String @map("saga_name")
  sagaId   String @map("saga_id")
  state    String

  @@id([sagaName, sagaId])
  @@map("noddde_saga_states")
}

model NodddeSnapshot {
  aggregateName String @map("aggregate_name")
  aggregateId   String @map("aggregate_id")
  state         String
  version       Int    @default(0)

  @@id([aggregateName, aggregateId])
  @@map("noddde_snapshots")
}

// Only needed if using the outbox pattern
model NodddeOutbox {
  id            String    @id
  event         String
  aggregateName String?   @map("aggregate_name")
  aggregateId   String?   @map("aggregate_id")
  createdAt     DateTime  @map("created_at")
  publishedAt   DateTime? @map("published_at")

  @@map("noddde_outbox")
}

Then run prisma generate and your preferred migration command.

Configuration

import { PrismaClient } from "@prisma/client";
import { PrismaAdapter } from "@noddde/prisma";
import { everyNEvents } from "@noddde/core";
import { defineDomain, wireDomain } from "@noddde/engine";

const prisma = new PrismaClient();
const adapter = new PrismaAdapter(prisma);

const bankingDomain = defineDomain({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: { BankAccount: BankAccountProjection } },
});

const domain = await wireDomain(bankingDomain, {
  persistenceAdapter: adapter,
  aggregates: {
    BankAccount: {
      persistence: "event-sourced",
      snapshots: { strategy: everyNEvents(100) },
    },
  },
});

The Prisma adapter uses built-in Prisma models (NodddeEvent, NodddeAggregateState, etc.) for all stores. No configuration needed beyond the PrismaClient instance.

How Prisma Transactions Work

The Prisma adapter uses interactive transactions via prisma.$transaction(async (tx) => { ... }). When a unit of work commits, it sets txStore.current to the transactional client tx, and all persistence classes route their queries through it. Prisma automatically rolls back the transaction if any operation throws.

Advanced Configuration

For per-aggregate state tables, use the stateStored() helper method:

const adapter = new PrismaAdapter(prisma);

const domain = await wireDomain(definition, {
  persistenceAdapter: adapter,
  aggregates: {
    Order: {
      persistence: adapter.stateStored("order", {
        aggregateId: "orderId",
        state: "orderData",
        version: "rev",
      }),
    },
  },
});

Since Prisma uses generated model delegates (e.g., prisma.order), the first argument to stateStored() must match the camelCase delegate name on your PrismaClient. The adapter validates that the model exists at creation time.

For pessimistic concurrency, pass the dialect option since PrismaClient does not expose the database provider at runtime:

const adapter = new PrismaAdapter(prisma, { dialect: "postgresql" });

TypeORM

Installation

yarn add @noddde/typeorm typeorm reflect-metadata
# Plus your database driver, e.g.:
yarn add pg

Schema Setup

The package exports TypeORM entity classes decorated with @Entity, @Column, etc. Register them in your DataSource configuration:

import {
  NodddeEventEntity,
  NodddeAggregateStateEntity,
  NodddeSagaStateEntity,
  NodddeSnapshotEntity,
  NodddeOutboxEntryEntity, // only needed if using the outbox pattern
} from "@noddde/typeorm";

const dataSource = new DataSource({
  type: "postgres",
  url: process.env.DATABASE_URL,
  entities: [
    NodddeEventEntity,
    NodddeAggregateStateEntity,
    NodddeSagaStateEntity,
    NodddeSnapshotEntity,
    NodddeOutboxEntryEntity,
  ],
  synchronize: true, // use migrations in production
});
await dataSource.initialize();

For production, use TypeORM migrations instead of synchronize: true.

Configuration

import { TypeORMAdapter } from "@noddde/typeorm";
import { everyNEvents } from "@noddde/core";
import { defineDomain, wireDomain } from "@noddde/engine";

const adapter = new TypeORMAdapter(dataSource);

const bankingDomain = defineDomain({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: { BankAccount: BankAccountProjection } },
});

const domain = await wireDomain(bankingDomain, {
  persistenceAdapter: adapter,
  aggregates: {
    BankAccount: {
      persistence: "event-sourced",
      snapshots: { strategy: everyNEvents(100) },
    },
  },
});

The TypeORM adapter uses built-in entity classes for all stores. The database type is auto-detected from dataSource.options.type for advisory locking support.

How TypeORM Transactions Work

The TypeORM adapter uses dataSource.manager.transaction() to wrap operations. When a unit of work commits, it sets txStore.current to the transactional EntityManager, and all persistence classes use it for their repository operations.

Advanced Configuration

For per-aggregate state tables, use the stateStored() helper method with a TypeORM entity class:

import { TypeORMAdapter } from "@noddde/typeorm";
import { OrderEntity } from "./entities"; // your own TypeORM entity

const adapter = new TypeORMAdapter(dataSource);

const domain = await wireDomain(definition, {
  persistenceAdapter: adapter,
  aggregates: {
    Order: {
      persistence: adapter.stateStored(OrderEntity, {
        aggregateId: "orderId",
        state: "orderData",
        version: "rev",
      }),
    },
  },
});

How Transactions Work

Each adapter section above describes its ORM-specific transaction mechanism. Under the hood, all three follow the same pattern for integrating with the Unit of Work:

  1. The adapter opens a database transaction
  2. It sets txStore.current to the transaction-scoped database client
  3. All enlisted persistence operations execute within that transaction, because they read txStore.current for their queries
  4. On success, the transaction commits and deferred events are returned for publishing
  5. On failure, the transaction rolls back and no events are published

This shared transaction store pattern means persistence classes do not need to know whether they are operating inside a unit of work or not -- they always read from txStore.current, which is null outside a transaction and points to the active transaction client inside one.
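The five steps can be sketched without any ORM. Everything here is a hypothetical stand-in: `Client` mimics a database client, `runInTransaction` mimics an ORM's transaction callback, and events are plain strings. Only the txStore.current idea is taken from the text above.

```typescript
// Hypothetical stand-ins for a database client and an ORM transaction API.
interface Client {
  execute(sql: string): Promise<void>;
}
interface TxStore {
  current: Client | null;
}

const log: string[] = [];
const makeClient = (label: string): Client => ({
  execute: async (sql) => {
    log.push(`${label}: ${sql}`);
  },
});

const root = makeClient("root");
const txStore: TxStore = { current: null };

// A persistence class never asks whether a unit of work is active;
// it simply prefers txStore.current over the root client.
class StateStore {
  async save(id: string): Promise<void> {
    const client = txStore.current ?? root;
    await client.execute(`UPSERT state ${id}`);
  }
}

// Step 1: open a transaction (and close it on success or failure).
async function runInTransaction<T>(fn: (tx: Client) => Promise<T>): Promise<T> {
  const tx = makeClient("tx");
  await tx.execute("BEGIN");
  try {
    const result = await fn(tx);
    await tx.execute("COMMIT");
    return result;
  } catch (err) {
    await tx.execute("ROLLBACK"); // step 5: nothing is published
    throw err;
  }
}

async function commit(
  operations: Array<() => Promise<void>>,
  deferredEvents: string[],
): Promise<string[]> {
  return runInTransaction(async (tx) => {
    txStore.current = tx; // step 2
    try {
      for (const op of operations) await op(); // step 3
      return deferredEvents; // step 4: returned for publishing
    } finally {
      txStore.current = null;
    }
  });
}
```

Calling `new StateStore().save("a")` on its own goes through `root`; the same call enlisted in `commit([...], events)` goes through the transaction client, with no change to the persistence class.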

Concurrency Control

All three adapters support both optimistic and pessimistic concurrency strategies. Here is what each adapter provides at the database level.

Optimistic Concurrency (built-in)

Handled automatically by the persistence implementations via database constraints:

  • Events table: A unique constraint on (aggregate_name, aggregate_id, sequence_number) prevents concurrent appends. Violations throw ConcurrencyError.
  • States table: A version column enables optimistic locking. Updates use WHERE version = expectedVersion; zero rows affected throws ConcurrencyError.
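As an illustration of the version check on the states table, here is an in-memory sketch that mimics `UPDATE ... WHERE version = expectedVersion` and treats "zero rows affected" as a conflict. `InMemoryStateTable` and `VersionConflictError` are hypothetical names; a real implementation would throw ConcurrencyError from @noddde/core. The "insert with version 1 when expectedVersion is 0" rule is an assumption borrowed from the MongoDB walkthrough later on this page.

```typescript
// Hypothetical stand-in for ConcurrencyError from @noddde/core.
class VersionConflictError extends Error {}

interface Row {
  state: unknown;
  version: number;
}

class InMemoryStateTable {
  private rows = new Map<string, Row>();

  /** Mimics a conditional UPDATE and returns the number of rows affected. */
  private updateWhereVersion(key: string, state: unknown, expected: number): number {
    const row = this.rows.get(key);
    if (!row || row.version !== expected) return 0;
    this.rows.set(key, { state, version: expected + 1 });
    return 1;
  }

  save(key: string, state: unknown, expectedVersion: number): void {
    // New aggregate: plain insert, guarded by the primary key.
    if (expectedVersion === 0 && !this.rows.has(key)) {
      this.rows.set(key, { state, version: 1 });
      return;
    }
    // Existing aggregate: zero rows affected means someone else won the race.
    if (this.updateWhereVersion(key, state, expectedVersion) === 0) {
      throw new VersionConflictError(
        `Expected version ${expectedVersion} for ${key}`,
      );
    }
  }

  load(key: string): Row | null {
    return this.rows.get(key) ?? null;
  }
}
```

Two writers that both loaded version 1 will race on the same condition: the first save moves the row to version 2, and the second finds no row at version 1 and fails.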

Advisory Lockers (for pessimistic concurrency)

Each adapter exports an advisory locker for use with the pessimistic strategy. See the Dialect Support Matrix above for which databases support locking.

| Adapter | Constructor | Dialect Detection |
| --- | --- | --- |
| DrizzleAdvisoryLocker | (db, dialect) | Explicit: "pg", "mysql", "sqlite" (throws) |
| PrismaAdvisoryLocker | (prisma, dialect) | Explicit: "postgresql", "mysql", "mariadb" |
| TypeORMAdvisoryLocker | (dataSource) | Auto-detects from dataSource.options.type |

Under the hood, each dialect uses the database's native advisory lock mechanism:

| Dialect | Lock mechanism | Lock key format |
| --- | --- | --- |
| PostgreSQL | pg_advisory_lock / pg_try_advisory_lock | 64-bit FNV-1a hash of name:id |
| MySQL/MariaDB | GET_LOCK / RELEASE_LOCK | First 64 chars of name:id (MySQL limit) |
| MSSQL | sp_getapplock / sp_releaseapplock | First 255 chars of name:id (TypeORM only) |
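To make the key formats concrete, here is a sketch of how such keys could be derived. The FNV-1a constants are the standard 64-bit ones; hashing the UTF-8 bytes of name:id and folding the result into a signed 64-bit value (PostgreSQL advisory lock functions take a bigint key) are assumptions, and the adapters may derive keys differently. The function names are hypothetical.

```typescript
// Standard 64-bit FNV-1a parameters.
const FNV_OFFSET_BASIS = BigInt("0xcbf29ce484222325");
const FNV_PRIME = BigInt("0x100000001b3");

/** 64-bit FNV-1a over the UTF-8 bytes of the input (unsigned result). */
function fnv1a64(input: string): bigint {
  const bytes = new TextEncoder().encode(input);
  let hash = FNV_OFFSET_BASIS;
  for (let i = 0; i < bytes.length; i++) {
    hash ^= BigInt(bytes[i] ?? 0); // XOR the byte in first (the "1a" variant)
    hash = BigInt.asUintN(64, hash * FNV_PRIME); // then multiply, mod 2^64
  }
  return hash;
}

/** Assumption: reinterpret the hash as signed so it fits a PostgreSQL bigint. */
function postgresLockKey(aggregateName: string, aggregateId: string): bigint {
  return BigInt.asIntN(64, fnv1a64(`${aggregateName}:${aggregateId}`));
}

/** MySQL/MariaDB lock names are limited to 64 characters. */
function mysqlLockName(aggregateName: string, aggregateId: string): string {
  return `${aggregateName}:${aggregateId}`.slice(0, 64);
}
```

Note that hashing and truncation can both map two different aggregates to the same key. In that case the two aggregates merely serialize against each other, which costs throughput but not correctness.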

SQLite has no advisory lock mechanism. For single-process SQLite deployments, use InMemoryAggregateLocker from @noddde/engine.

Advisory locks are session-level, spanning beyond the database transaction. This is intentional: the lock covers the entire load→execute→save lifecycle.

Choosing an Adapter

| Factor | Drizzle | Prisma | TypeORM |
| --- | --- | --- | --- |
| Schema definition | TypeScript table builders | .prisma schema file | Decorator-based entities |
| Code generation | None | Required (prisma generate) | None |
| Type safety | Full (inferred from schema) | Full (generated client) | Partial (decorator metadata) |
| Bundle size | Lightweight | Heavier (generated client) | Medium |
| Sync driver support | Yes (better-sqlite3) | No (async only) | Yes |
| Migration tooling | Drizzle Kit | Prisma Migrate | TypeORM migrations |

All three provide identical functionality for noddde's purposes. The choice comes down to which ORM your project already uses.

Custom Adapters

You can implement your own adapter for any database driver -- no ORM required. All you need is a class (or plain object) implementing the PersistenceAdapter interface from @noddde/core.

The PersistenceAdapter Interface

import type {
  PersistenceAdapter,
  EventSourcedAggregatePersistence,
  StateStoredAggregatePersistence,
  SagaPersistence,
  UnitOfWorkFactory,
  SnapshotStore,
  OutboxStore,
  IdempotencyStore,
  AggregateLocker,
} from "@noddde/core";

interface PersistenceAdapter {
  // --- Required ---
  unitOfWorkFactory: UnitOfWorkFactory;

  // --- Optional (engine errors at runtime if needed but missing) ---
  eventSourcedPersistence?: EventSourcedAggregatePersistence;
  stateStoredPersistence?: StateStoredAggregatePersistence;
  sagaPersistence?: SagaPersistence;
  snapshotStore?: SnapshotStore;
  outboxStore?: OutboxStore;
  idempotencyStore?: IdempotencyStore;
  aggregateLocker?: AggregateLocker;

  // --- Lifecycle hooks (optional) ---
  init?(): Promise<void>; // called by Domain.init()
  close?(): Promise<void>; // called by Domain.shutdown()
}

Only unitOfWorkFactory is required. Everything else is optional -- the engine validates at runtime that the adapter provides what the domain needs. For example, if your domain has event-sourced aggregates, the engine will error if eventSourcedPersistence is missing.

Persistence Interfaces

These are the interfaces your adapter needs to implement depending on which features your domain uses:

UnitOfWorkFactory (required)

type UnitOfWorkFactory = () => UnitOfWork;

interface UnitOfWork {
  /** Buffer a write operation for deferred execution. */
  enlist(operation: () => Promise<void>): void;
  /** Schedule events for publishing after commit. */
  deferPublish(...events: Event[]): void;
  /** Execute all operations atomically, return deferred events. */
  commit(): Promise<Event[]>;
  /** Discard all operations and events. */
  rollback(): Promise<void>;
}

The UnitOfWork is the core of transactional guarantees. Each command dispatch gets a fresh instance. Your commit() must wrap all enlisted operations in a database transaction and return the deferred events for post-commit publishing.

EventSourcedAggregatePersistence

interface EventSourcedAggregatePersistence {
  save(
    aggregateName: string,
    aggregateId: string,
    events: Event[],
    expectedVersion: number,
  ): Promise<void>;
  load(aggregateName: string, aggregateId: string): Promise<Event[]>;
}

Appends events to an aggregate stream. save() must enforce optimistic concurrency: if expectedVersion doesn't match the current stream length, throw ConcurrencyError. load() returns events ordered by sequence number, or [] for new aggregates.

StateStoredAggregatePersistence

interface StateStoredAggregatePersistence {
  save(
    aggregateName: string,
    aggregateId: string,
    state: any,
    expectedVersion: number,
  ): Promise<void>;
  load(
    aggregateName: string,
    aggregateId: string,
  ): Promise<{ state: any; version: number } | null>;
}

Stores the latest aggregate state as a JSON snapshot. save() must enforce optimistic concurrency (compare expectedVersion against stored version). load() returns null for new aggregates.

SagaPersistence

interface SagaPersistence {
  save(sagaName: string, sagaId: string, state: any): Promise<void>;
  load(sagaName: string, sagaId: string): Promise<any | undefined | null>;
}

Simple key-value persistence for saga workflow state. No concurrency control required.

SnapshotStore (optional)

interface SnapshotStore {
  load(aggregateName: string, aggregateId: string): Promise<Snapshot | null>;
  save(
    aggregateName: string,
    aggregateId: string,
    snapshot: Snapshot,
  ): Promise<void>;
}
// Snapshot = { state: any; version: number }

Caches aggregate state at a point in time so event replay can start from the snapshot version instead of the beginning.
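A rough sketch of how a snapshot shortens replay, assuming event N of a stream has sequence number N (1-based), so a snapshot at version V already contains the first V events. `rehydrate`, `StoredEvent`, and the `apply` reducer are hypothetical names for illustration; the engine's actual loading logic is not shown on this page.

```typescript
interface Snapshot<S> {
  state: S;
  version: number;
}
interface StoredEvent {
  name: string;
  payload: unknown;
}

/** Rebuilds state from an optional snapshot plus the events recorded after it. */
function rehydrate<S>(
  initialState: S,
  snapshot: Snapshot<S> | null,
  allEvents: StoredEvent[],
  apply: (state: S, event: StoredEvent) => S,
): { state: S; version: number } {
  const startVersion = snapshot ? snapshot.version : 0;
  const startState = snapshot ? snapshot.state : initialState;

  // Skip the events the snapshot has already folded in.
  const remaining = allEvents.slice(startVersion);

  return {
    state: remaining.reduce(apply, startState),
    version: startVersion + remaining.length,
  };
}
```

With a snapshot at version 100 and a stream of 103 events, only the last three events are applied; without a snapshot, all 103 are.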

OutboxStore (optional)

interface OutboxStore {
  save(entries: OutboxEntry[]): Promise<void>;
  loadUnpublished(batchSize?: number): Promise<OutboxEntry[]>;
  markPublished(ids: string[]): Promise<void>;
  markPublishedByEventIds(eventIds: string[]): Promise<void>;
  deletePublished(olderThan?: Date): Promise<void>;
}

Transactional outbox for guaranteed at-least-once event delivery. save() is called inside the UoW transaction. See Outbox Pattern for details.
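For orientation, a relay that drains the outbox might look like the sketch below. `relayOnce`, the `publish` callback, and the reduced `OutboxEntry` shape (only `id` and `event`) are hypothetical; only loadUnpublished() and markPublished() come from the interface above.

```typescript
// Reduced, hypothetical shapes; the real types live in @noddde/core.
interface OutboxEntry {
  id: string;
  event: unknown;
}
interface OutboxReader {
  loadUnpublished(batchSize?: number): Promise<OutboxEntry[]>;
  markPublished(ids: string[]): Promise<void>;
}

/** Publishes one batch and returns how many entries were marked as published. */
async function relayOnce(
  store: OutboxReader,
  publish: (event: unknown) => Promise<void>,
  batchSize = 100,
): Promise<number> {
  const entries = await store.loadUnpublished(batchSize);
  const publishedIds: string[] = [];
  try {
    for (const entry of entries) {
      await publish(entry.event);
      publishedIds.push(entry.id); // record only after a successful publish
    }
  } finally {
    // Mark whatever did get published, even if a later entry failed.
    if (publishedIds.length > 0) await store.markPublished(publishedIds);
  }
  return publishedIds.length;
}
```

Marking happens after publishing, so a crash between the two steps leads to a redelivery rather than a lost event. That is what "at-least-once" means here, and it is why consumers should tolerate duplicates.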

AggregateLocker (optional)

interface AggregateLocker {
  acquire(
    aggregateName: string,
    aggregateId: string,
    timeoutMs?: number,
  ): Promise<void>;
  release(aggregateName: string, aggregateId: string): Promise<void>;
}

Pessimistic locking for serializing command execution against the same aggregate. acquire() blocks until the lock is available or throws LockTimeoutError. release() must be idempotent.
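A single-process sketch of this contract is shown below. It is not the InMemoryAggregateLocker from @noddde/engine; `SimpleInMemoryLocker` and `LockTimeoutSketchError` are hypothetical names, and a real implementation would throw LockTimeoutError. The sketch does not track lock owners, so it only guarantees that releasing an unheld lock is a no-op.

```typescript
// Hypothetical stand-in for LockTimeoutError.
class LockTimeoutSketchError extends Error {}

class SimpleInMemoryLocker {
  private held = new Set<string>();
  private waiters = new Map<string, Array<() => void>>();

  async acquire(name: string, id: string, timeoutMs?: number): Promise<void> {
    const key = `${name}:${id}`;
    if (!this.held.has(key)) {
      this.held.add(key);
      return;
    }
    // Lock is taken: queue up until release() hands it over or the timer fires.
    await new Promise<void>((resolve, reject) => {
      const queue = this.waiters.get(key) ?? [];
      this.waiters.set(key, queue);
      let timer: ReturnType<typeof setTimeout> | undefined;
      const grant = () => {
        if (timer !== undefined) clearTimeout(timer);
        resolve();
      };
      queue.push(grant);
      if (timeoutMs !== undefined) {
        timer = setTimeout(() => {
          const index = queue.indexOf(grant);
          if (index >= 0) queue.splice(index, 1); // leave the queue
          reject(new LockTimeoutSketchError(`Timed out waiting for ${key}`));
        }, timeoutMs);
      }
    });
  }

  async release(name: string, id: string): Promise<void> {
    const key = `${name}:${id}`;
    if (!this.held.has(key)) return; // already released: no-op
    const next = this.waiters.get(key)?.shift();
    if (next) next(); // hand the lock straight to the next waiter
    else this.held.delete(key);
  }
}
```

Handing the lock directly to the next waiter keeps the key marked as held throughout, so a third caller cannot slip in between a release and the waiter's wake-up.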

Implementation Guide: MongoDB Native Driver

This walkthrough builds a custom adapter using the MongoDB native driver (mongodb). The same principles apply to any database -- PostgreSQL with pg, DynamoDB, or anything else.

MongoDB multi-document transactions (available since MongoDB 4.0) require a replica set. For local development, use mongod --replSet rs0 or a Docker image pre-configured as a single-node replica set.

Step 1: Create the UnitOfWork

The UoW is the only required piece. It wraps all enlisted operations in a database transaction so that aggregate persistence, outbox writes, and any other enlisted operations commit or fail atomically.

In MongoDB, this maps to a ClientSession with an active transaction:

import { MongoClient, type ClientSession } from "mongodb";
import type { UnitOfWork, UnitOfWorkFactory, Event } from "@noddde/core";

/**
 * Shared mutable ref so persistence classes see the active transaction session.
 * This is the bridge between the UoW and persistence classes.
 */
interface SessionStore {
  current: ClientSession | null;
}

class MongoUnitOfWork implements UnitOfWork {
  private operations: (() => Promise<void>)[] = [];
  private events: Event[] = [];
  private sealed = false;

  constructor(
    private readonly client: MongoClient,
    private readonly sessionStore: SessionStore,
  ) {}

  enlist(op: () => Promise<void>) {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.operations.push(op);
  }

  deferPublish(...events: Event[]) {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.events.push(...events);
  }

  async commit(): Promise<Event[]> {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.sealed = true;

    const session = this.client.startSession();
    try {
      session.startTransaction();
      this.sessionStore.current = session;

      for (const op of this.operations) {
        await op();
      }

      await session.commitTransaction();
      return this.events;
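    } catch (err) {
      // Abort only while the transaction is still open: calling
      // abortTransaction() after a failed commit throws and would mask `err`.
      if (session.inTransaction()) {
        await session.abortTransaction();
      }
      throw err;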
    } finally {
      this.sessionStore.current = null;
      await session.endSession();
    }
  }

  async rollback(): Promise<void> {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.sealed = true;
    this.operations = [];
    this.events = [];
  }
}

The key pattern: a session store holds a mutable reference to the active MongoDB session. All persistence classes read from it so their operations participate in the same transaction.

Step 2: Implement Persistence Classes

Each persistence class reads sessionStore.current to pass the active session to MongoDB operations. When no UoW is active (e.g., read-only load() calls), it falls back to a sessionless call.

import type { Db, Collection } from "mongodb";
import { ConcurrencyError } from "@noddde/core";
import type { EventSourcedAggregatePersistence, Event } from "@noddde/core";

class MongoEventSourcedPersistence implements EventSourcedAggregatePersistence {
  private readonly collection: Collection;

  constructor(
    db: Db,
    private readonly sessionStore: SessionStore,
  ) {
    this.collection = db.collection("noddde_events");
  }

  /** Returns session options if inside a UoW, or empty object otherwise. */
  private get sessionOpts() {
    return this.sessionStore.current
      ? { session: this.sessionStore.current }
      : {};
  }

  async save(
    aggregateName: string,
    aggregateId: string,
    events: Event[],
    expectedVersion: number,
  ) {
    if (events.length === 0) return;

    const docs = events.map((event, i) => ({
      aggregateName,
      aggregateId,
      sequenceNumber: expectedVersion + i + 1,
      eventName: event.name,
      payload: event.payload,
      metadata: event.metadata ?? null,
      createdAt: event.metadata?.timestamp
        ? new Date(event.metadata.timestamp)
        : new Date(),
    }));

    try {
      await this.collection.insertMany(docs, this.sessionOpts);
    } catch (err: any) {
      if (err.code === 11000) {
        // duplicate key
        throw new ConcurrencyError(
          aggregateName,
          aggregateId,
          expectedVersion,
          -1,
        );
      }
      throw err;
    }
  }

  async load(aggregateName: string, aggregateId: string): Promise<Event[]> {
    const docs = await this.collection
      .find({ aggregateName, aggregateId }, this.sessionOpts)
      .sort({ sequenceNumber: 1 })
      .toArray();

    return docs.map((doc) => ({
      name: doc.eventName,
      payload: doc.payload,
      ...(doc.metadata ? { metadata: doc.metadata } : {}),
    }));
  }
}

MongoDB stores documents natively as BSON -- no need to JSON.stringify() payloads. This is a key difference from SQL adapters where payloads are stored as JSON text.

For optimistic concurrency in StateStoredAggregatePersistence, use a findOneAndUpdate with a version filter:

import type { StateStoredAggregatePersistence } from "@noddde/core";

class MongoStateStoredPersistence implements StateStoredAggregatePersistence {
  private readonly collection: Collection;

  constructor(
    db: Db,
    private readonly sessionStore: SessionStore,
  ) {
    this.collection = db.collection("noddde_aggregate_states");
  }

  private get sessionOpts() {
    return this.sessionStore.current
      ? { session: this.sessionStore.current }
      : {};
  }

  async save(
    aggregateName: string,
    aggregateId: string,
    state: any,
    expectedVersion: number,
  ) {
    if (expectedVersion === 0) {
      // New aggregate: insert with version 1
      try {
        await this.collection.insertOne(
          { aggregateName, aggregateId, state, version: 1 },
          this.sessionOpts,
        );
      } catch (err: any) {
        if (err.code === 11000) {
          throw new ConcurrencyError(aggregateName, aggregateId, 0, -1);
        }
        throw err;
      }
    } else {
      // Existing aggregate: update only if version matches
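      const result = await this.collection.findOneAndUpdate(
        { aggregateName, aggregateId, version: expectedVersion },
        { $set: { state, version: expectedVersion + 1 } },
        this.sessionOpts,
      );
      // mongodb driver v6+ resolves to the matched document or null;
      // on v4/v5 the document is under `result.value` instead.
      if (!result) {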
        throw new ConcurrencyError(
          aggregateName,
          aggregateId,
          expectedVersion,
          -1,
        );
      }
    }
  }

  async load(aggregateName: string, aggregateId: string) {
    const doc = await this.collection.findOne(
      { aggregateName, aggregateId },
      this.sessionOpts,
    );
    return doc ? { state: doc.state, version: doc.version } : null;
  }
}
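The adapter assembled in Step 3 also references a MongoSagaPersistence class, which the walkthrough does not show. A minimal sketch could look like the following. To keep it self-contained, `DbLike`, `SagaCollection`, and `SessionRef` are hypothetical structural stand-ins; in the real adapter they would be `Db` and `Collection` from mongodb and the `SessionStore` from Step 1. The collection name noddde_saga_states is an assumption mirroring the SQL table name.

```typescript
// Hypothetical structural stand-ins for the mongodb driver types used here.
interface OperationOptions {
  session?: unknown;
  upsert?: boolean;
}
interface SagaCollection {
  updateOne(filter: object, update: object, options?: OperationOptions): Promise<unknown>;
  findOne(filter: object, options?: OperationOptions): Promise<{ state?: unknown } | null>;
}
interface DbLike {
  collection(name: string): SagaCollection;
}
interface SessionRef {
  current: unknown | null;
}

class MongoSagaPersistence {
  private readonly collection: SagaCollection;

  constructor(
    db: DbLike,
    private readonly sessionStore: SessionRef,
  ) {
    this.collection = db.collection("noddde_saga_states");
  }

  /** Session options inside a UoW, empty object otherwise. */
  private get sessionOpts(): OperationOptions {
    return this.sessionStore.current
      ? { session: this.sessionStore.current }
      : {};
  }

  async save(sagaName: string, sagaId: string, state: unknown): Promise<void> {
    // Upsert: insert the saga on first save, overwrite its state afterwards.
    await this.collection.updateOne(
      { sagaName, sagaId },
      { $set: { state } },
      { ...this.sessionOpts, upsert: true },
    );
  }

  async load(sagaName: string, sagaId: string): Promise<unknown | null> {
    const doc = await this.collection.findOne({ sagaName, sagaId }, this.sessionOpts);
    return doc ? (doc.state ?? null) : null;
  }
}
```

Because saga persistence needs no concurrency control, a plain upsert is enough. If you adopt this sketch, a unique index on (sagaName, sagaId) in init() would be a sensible addition so that concurrent upserts cannot create duplicate documents.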

Step 3: Assemble the Adapter

All persistence classes share a single sessionStore so they participate in the same transaction:

import type { PersistenceAdapter } from "@noddde/core";
import { MongoClient, type Db } from "mongodb";

class MongoAdapter implements PersistenceAdapter {
  readonly unitOfWorkFactory;
  readonly eventSourcedPersistence;
  readonly stateStoredPersistence;
  readonly sagaPersistence;
  // Add optional stores as needed:
  // readonly snapshotStore;
  // readonly outboxStore;

  private readonly sessionStore: SessionStore = { current: null };

  constructor(
    private readonly client: MongoClient,
    private readonly db: Db,
  ) {
    this.unitOfWorkFactory = () =>
      new MongoUnitOfWork(client, this.sessionStore);
    this.eventSourcedPersistence = new MongoEventSourcedPersistence(
      db,
      this.sessionStore,
    );
    this.stateStoredPersistence = new MongoStateStoredPersistence(
      db,
      this.sessionStore,
    );
    this.sagaPersistence = new MongoSagaPersistence(db, this.sessionStore);
  }

  /** Create required indexes on first use. */
  async init() {
    // Reuse the Db handed to the persistence classes; client.db() would
    // resolve to the connection string's default database instead.
    const db = this.db;
    await db
      .collection("noddde_events")
      .createIndex(
        { aggregateName: 1, aggregateId: 1, sequenceNumber: 1 },
        { unique: true },
      );
    await db
      .collection("noddde_aggregate_states")
      .createIndex({ aggregateName: 1, aggregateId: 1 }, { unique: true });
  }

  async close() {
    await this.client.close();
  }
}

Step 4: Wire It Up

import { wireDomain, defineDomain } from "@noddde/engine";
import { MongoClient } from "mongodb";

const client = new MongoClient(process.env.MONGODB_URI!);
await client.connect();
const db = client.db("my-app");

const adapter = new MongoAdapter(client, db);

const domain = await wireDomain(
  defineDomain({
    writeModel: { aggregates: { Order } },
    readModel: { projections: {} },
  }),
  { persistenceAdapter: adapter },
);

Key Implementation Rules

  1. Session/transaction store pattern -- All persistence classes must share a single mutable reference to the active transaction context (a ClientSession for MongoDB, a PoolClient for PostgreSQL, etc.). The UoW sets it before executing operations and clears it in a finally block.

  2. Optimistic concurrency -- EventSourcedAggregatePersistence.save() and StateStoredAggregatePersistence.save() must throw ConcurrencyError (from @noddde/core) when the expected version doesn't match. Use unique indexes (MongoDB E11000 / PostgreSQL 23505) or version-conditional updates.

  3. UoW is single-use -- After commit() or rollback(), all further calls must throw "UnitOfWork already completed".

  4. Event ordering -- load() must return events sorted by sequence number ascending.

  5. Idempotent release -- AggregateLocker.release() must not throw if the lock is already released.

  6. Only implement what you need -- If your domain has no sagas, skip sagaPersistence. If you don't need the outbox pattern, skip outboxStore. The engine validates at runtime and gives clear error messages if something is missing.

Database Schema Reference

All adapters use the same logical schema. You are responsible for creating these tables using your ORM's migration tooling (Drizzle Kit, Prisma Migrate, TypeORM migrations) or raw SQL.

Tables Overview

| Table | Purpose | Key | Required |
| --- | --- | --- | --- |
| noddde_events | Event streams | Auto-increment id | If using event-sourced aggregates |
| noddde_aggregate_states | State snapshots | Composite (aggregate_name, aggregate_id) | If using state-stored aggregates |
| noddde_saga_states | Saga state | Composite (saga_name, saga_id) | If using sagas |
| noddde_snapshots | Event-sourced snapshots | Composite (aggregate_name, aggregate_id) | Optional (performance optimization) |
| noddde_outbox | Transactional outbox | id (string) | If using the outbox pattern |

The noddde_aggregate_states table is a shared table for all state-stored aggregates. If you need dedicated per-aggregate tables (e.g., for direct SQL queries), see Per-Aggregate Dedicated State Tables below.

States and event payloads are serialized as JSON strings (or native JSON types where supported), making the schema database-agnostic.

Dialect Type Mapping

| Column type | PostgreSQL | MySQL | SQLite | MSSQL |
| --- | --- | --- | --- | --- |
| Auto-increment | SERIAL | INT AUTO_INCREMENT | INTEGER | INT IDENTITY |
| String | TEXT | VARCHAR(255) | TEXT | NVARCHAR(255) |
| Integer | INTEGER | INT | INTEGER | INT |
| JSON payload | JSONB | JSON | TEXT | NVARCHAR(MAX) |
| JSON metadata | JSONB (nullable) | JSON (nullable) | TEXT (nullable) | NVARCHAR(MAX) (nullable) |
| Timestamp | TIMESTAMPTZ | TIMESTAMP(3) | TEXT (ISO 8601) | DATETIME2 |
| Nullable timestamp | TIMESTAMPTZ (nullable) | TIMESTAMP(3) (nullable) | TEXT (nullable) | DATETIME2 (nullable) |

PostgreSQL

CREATE TABLE IF NOT EXISTS noddde_events (
  id              SERIAL PRIMARY KEY,
  aggregate_name  TEXT NOT NULL,
  aggregate_id    TEXT NOT NULL,
  sequence_number INTEGER NOT NULL,
  event_name      TEXT NOT NULL,
  payload         JSONB NOT NULL,
  metadata        JSONB,                              -- event metadata envelope (nullable for backcompat)
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()   -- populated from event.metadata.timestamp
);
CREATE UNIQUE INDEX IF NOT EXISTS noddde_events_stream_version_idx
  ON noddde_events (aggregate_name, aggregate_id, sequence_number);

CREATE TABLE IF NOT EXISTS noddde_aggregate_states (
  aggregate_name TEXT NOT NULL,
  aggregate_id   TEXT NOT NULL,
  state          JSONB NOT NULL,
  version        INTEGER NOT NULL DEFAULT 0,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

CREATE TABLE IF NOT EXISTS noddde_saga_states (
  saga_name TEXT NOT NULL,
  saga_id   TEXT NOT NULL,
  state     JSONB NOT NULL,
  PRIMARY KEY (saga_name, saga_id)
);

CREATE TABLE IF NOT EXISTS noddde_snapshots (
  aggregate_name TEXT NOT NULL,
  aggregate_id   TEXT NOT NULL,
  state          JSONB NOT NULL,
  version        INTEGER NOT NULL,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

CREATE TABLE IF NOT EXISTS noddde_outbox (
  id             TEXT PRIMARY KEY,
  event          JSONB NOT NULL,
  aggregate_name TEXT,
  aggregate_id   TEXT,
  created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  published_at   TIMESTAMPTZ
);
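
The unique index on (aggregate_name, aggregate_id, sequence_number) is what turns a concurrent append into a duplicate-key error, which a custom adapter then has to translate into ConcurrencyError. A hedged sketch of that translation follows. The helper names isUniqueViolation and withConcurrencyCheck are hypothetical, ConcurrencyError is redefined locally as a stand-in for the one exported by @noddde/core, and the error properties checked are those commonly exposed by node-postgres (code "23505"), mysql2 (code "ER_DUP_ENTRY"), and the MongoDB Node.js driver (code 11000); verify them against the driver you actually use.

```typescript
// Stand-in for ConcurrencyError from @noddde/core (assumption: redefined
// locally so the sketch runs without the package installed).
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Hypothetical helper: recognise duplicate-key errors from common drivers.
function isUniqueViolation(error: unknown): boolean {
  if (typeof error !== "object" || error === null) return false;
  const code = (error as { code?: unknown }).code;
  return (
    code === "23505" || // PostgreSQL unique_violation (SQLSTATE)
    code === "ER_DUP_ENTRY" || // MySQL duplicate entry
    code === 11000 // MongoDB E11000 duplicate key
  );
}

// Hypothetical wrapper: run an insert and map duplicate-key failures to
// ConcurrencyError, rethrowing everything else untouched.
async function withConcurrencyCheck<T>(insert: () => Promise<T>): Promise<T> {
  try {
    return await insert();
  } catch (error) {
    if (isUniqueViolation(error)) {
      throw new ConcurrencyError("stream version already written");
    }
    throw error;
  }
}
```

Wrapping only the event insert, rather than the whole save path, keeps unrelated constraint failures from being misreported as concurrency conflicts.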

MySQL

CREATE TABLE IF NOT EXISTS noddde_events (
  id              INT AUTO_INCREMENT PRIMARY KEY,
  aggregate_name  VARCHAR(255) NOT NULL,
  aggregate_id    VARCHAR(255) NOT NULL,
  sequence_number INT NOT NULL,
  event_name      VARCHAR(255) NOT NULL,
  payload         JSON NOT NULL,
  metadata        JSON,
  created_at      TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  UNIQUE INDEX noddde_events_stream_version_idx (aggregate_name, aggregate_id, sequence_number)
);

CREATE TABLE IF NOT EXISTS noddde_aggregate_states (
  aggregate_name VARCHAR(255) NOT NULL,
  aggregate_id   VARCHAR(255) NOT NULL,
  state          TEXT NOT NULL,
  version        INT NOT NULL DEFAULT 0,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

CREATE TABLE IF NOT EXISTS noddde_saga_states (
  saga_name VARCHAR(255) NOT NULL,
  saga_id   VARCHAR(255) NOT NULL,
  state     TEXT NOT NULL,
  PRIMARY KEY (saga_name, saga_id)
);

CREATE TABLE IF NOT EXISTS noddde_snapshots (
  aggregate_name VARCHAR(255) NOT NULL,
  aggregate_id   VARCHAR(255) NOT NULL,
  state          TEXT NOT NULL,
  version        INT NOT NULL,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

CREATE TABLE IF NOT EXISTS noddde_outbox (
  id             VARCHAR(255) PRIMARY KEY,
  event          JSON NOT NULL,
  aggregate_name VARCHAR(255),
  aggregate_id   VARCHAR(255),
  created_at     TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  published_at   TIMESTAMP(3)
);

SQLite

CREATE TABLE IF NOT EXISTS noddde_events (
  id              INTEGER PRIMARY KEY AUTOINCREMENT,
  aggregate_name  TEXT NOT NULL,
  aggregate_id    TEXT NOT NULL,
  sequence_number INTEGER NOT NULL,
  event_name      TEXT NOT NULL,
  payload         TEXT NOT NULL,                -- JSON stored as text
  metadata        TEXT,                         -- JSON stored as text
  created_at      TEXT NOT NULL                 -- ISO 8601 string (SQLite has no native timestamp)
);
CREATE UNIQUE INDEX IF NOT EXISTS noddde_events_stream_version_idx
  ON noddde_events (aggregate_name, aggregate_id, sequence_number);

CREATE TABLE IF NOT EXISTS noddde_aggregate_states (
  aggregate_name TEXT NOT NULL,
  aggregate_id   TEXT NOT NULL,
  state          TEXT NOT NULL,
  version        INTEGER NOT NULL DEFAULT 0,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

CREATE TABLE IF NOT EXISTS noddde_saga_states (
  saga_name TEXT NOT NULL,
  saga_id   TEXT NOT NULL,
  state     TEXT NOT NULL,
  PRIMARY KEY (saga_name, saga_id)
);

CREATE TABLE IF NOT EXISTS noddde_snapshots (
  aggregate_name TEXT NOT NULL,
  aggregate_id   TEXT NOT NULL,
  state          TEXT NOT NULL,
  version        INTEGER NOT NULL,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

CREATE TABLE IF NOT EXISTS noddde_outbox (
  id             TEXT PRIMARY KEY,
  event          TEXT NOT NULL,
  aggregate_name TEXT,
  aggregate_id   TEXT,
  created_at     TEXT NOT NULL,
  published_at   TEXT
);
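
Unlike the other dialects, the SQLite schema has no default for created_at and stores JSON as text, so the adapter has to produce both values itself. The sketch below shows one way to build an event row; toSqliteEventRow, EventToStore, and SqliteEventRow are hypothetical names, and the assumption that metadata.timestamp is a Date is mine rather than something the schema states. Strings from Date.prototype.toISOString() are always UTC and fixed-width, so for years 0000 through 9999 they sort lexicographically in chronological order.

```typescript
// Hypothetical shapes for illustration; the real event types come from @noddde/core.
interface EventToStore {
  name: string;
  payload: unknown;
  metadata?: { timestamp?: Date; [key: string]: unknown };
}

interface SqliteEventRow {
  aggregate_name: string;
  aggregate_id: string;
  sequence_number: number;
  event_name: string;
  payload: string; // JSON stored as text
  metadata: string | null; // JSON stored as text, nullable
  created_at: string; // ISO 8601, UTC
}

function toSqliteEventRow(
  aggregateName: string,
  aggregateId: string,
  sequenceNumber: number,
  event: EventToStore,
  now: Date = new Date(), // fallback when the event carries no timestamp
): SqliteEventRow {
  const timestamp = event.metadata?.timestamp ?? now;
  return {
    aggregate_name: aggregateName,
    aggregate_id: aggregateId,
    sequence_number: sequenceNumber,
    event_name: event.name,
    payload: JSON.stringify(event.payload),
    metadata: event.metadata ? JSON.stringify(event.metadata) : null,
    created_at: timestamp.toISOString(),
  };
}
```

The same approach applies to created_at and published_at in noddde_outbox, which are also plain TEXT columns in SQLite.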

MSSQL

IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_events' AND xtype='U')
CREATE TABLE noddde_events (
  id              INT IDENTITY(1,1) PRIMARY KEY,
  aggregate_name  NVARCHAR(255) NOT NULL,
  aggregate_id    NVARCHAR(255) NOT NULL,
  sequence_number INT NOT NULL,
  event_name      NVARCHAR(255) NOT NULL,
  payload         NVARCHAR(MAX) NOT NULL,
  metadata        NVARCHAR(MAX),
  created_at      DATETIME2 NOT NULL DEFAULT GETUTCDATE()
);
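IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name='noddde_events_stream_version_idx' AND object_id=OBJECT_ID('noddde_events'))
CREATE UNIQUE INDEX noddde_events_stream_version_idx
  ON noddde_events (aggregate_name, aggregate_id, sequence_number);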

IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_aggregate_states' AND xtype='U')
CREATE TABLE noddde_aggregate_states (
  aggregate_name NVARCHAR(255) NOT NULL,
  aggregate_id   NVARCHAR(255) NOT NULL,
  state          NVARCHAR(MAX) NOT NULL,
  version        INT NOT NULL DEFAULT 0,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_saga_states' AND xtype='U')
CREATE TABLE noddde_saga_states (
  saga_name NVARCHAR(255) NOT NULL,
  saga_id   NVARCHAR(255) NOT NULL,
  state     NVARCHAR(MAX) NOT NULL,
  PRIMARY KEY (saga_name, saga_id)
);

IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_snapshots' AND xtype='U')
CREATE TABLE noddde_snapshots (
  aggregate_name NVARCHAR(255) NOT NULL,
  aggregate_id   NVARCHAR(255) NOT NULL,
  state          NVARCHAR(MAX) NOT NULL,
  version        INT NOT NULL,
  PRIMARY KEY (aggregate_name, aggregate_id)
);

IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='noddde_outbox' AND xtype='U')
CREATE TABLE noddde_outbox (
  id             NVARCHAR(255) PRIMARY KEY,
  event          NVARCHAR(MAX) NOT NULL,
  aggregate_name NVARCHAR(255),
  aggregate_id   NVARCHAR(255),
  created_at     DATETIME2 NOT NULL DEFAULT GETUTCDATE(),
  published_at   DATETIME2
);

Per-Aggregate Dedicated State Tables

By default, all state-stored aggregates share the noddde_aggregate_states table. For aggregates where you need direct SQL queries on domain fields, create a dedicated table and use the adapter's stateStored() helper:

-- Example: dedicated table for an Order aggregate
CREATE TABLE IF NOT EXISTS orders (
  aggregate_id TEXT PRIMARY KEY,
  state        JSONB NOT NULL,       -- or TEXT for MySQL/SQLite
  version      INTEGER NOT NULL DEFAULT 0
);

The dedicated table must have at least aggregate_id, state, and version columns. Column names can be customized via the stateStored() options. Each adapter accepts its own table reference format:

// Drizzle: pass the Drizzle table object
adapter.stateStored(ordersTable);

// Prisma: pass the camelCase model delegate name
adapter.stateStored("order", {
  aggregateId: "orderId",
  state: "data",
  version: "rev",
});

// TypeORM: pass the entity class
adapter.stateStored(OrderEntity, {
  aggregateId: "orderId",
  state: "data",
  version: "rev",
});

See the Drizzle, Prisma, and TypeORM "Advanced Configuration" sections above for full per-adapter examples.

Next Steps

  • Unit of Work -- Atomic persistence and deferred event publishing
  • Persistence -- Choosing between event-sourced and state-stored strategies
  • Infrastructure -- The full infrastructure provider system
