noddde
Persistence

Custom Adapters

Build your own persistence adapter for any database driver — no ORM required.

You can implement your own adapter for any database driver -- no ORM required. All you need is a class (or plain object) implementing the PersistenceAdapter interface from @noddde/core.

The PersistenceAdapter Interface

packages/core/src/persistence/persistence-adapter.ts
import type {
  PersistenceAdapter,
  EventSourcedAggregatePersistence,
  StateStoredAggregatePersistence,
  SagaPersistence,
  UnitOfWorkFactory,
  SnapshotStore,
  OutboxStore,
  IdempotencyStore,
  AggregateLocker,
} from "@noddde/core";

interface PersistenceAdapter {
  // --- Required ---
  unitOfWorkFactory: UnitOfWorkFactory;

  // --- Optional (engine errors at runtime if needed but missing) ---
  eventSourcedPersistence?: EventSourcedAggregatePersistence;
  stateStoredPersistence?: StateStoredAggregatePersistence;
  sagaPersistence?: SagaPersistence;
  snapshotStore?: SnapshotStore;
  outboxStore?: OutboxStore;
  idempotencyStore?: IdempotencyStore;
  aggregateLocker?: AggregateLocker;

  // --- Lifecycle hooks (optional) ---
  init?(): Promise<void>; // called by Domain.init()
  close?(): Promise<void>; // called by Domain.shutdown()
}

Only unitOfWorkFactory is required. Everything else is optional -- the engine validates at runtime that the adapter provides what the domain needs. For example, if your domain has event-sourced aggregates, the engine will error if eventSourcedPersistence is missing.

Persistence Interfaces

These are the interfaces your adapter needs to implement depending on which features your domain uses:

UnitOfWorkFactory (required)

packages/core/src/persistence/unit-of-work.ts
type UnitOfWorkFactory = () => UnitOfWork;

A UnitOfWorkFactory returns a fresh UnitOfWork per command dispatch -- your commit() must wrap all enlisted operations in a database transaction and return the deferred events for post-commit publishing. For the full UnitOfWork contract and the buffering / commit / publish lifecycle, see Persistence — Unit of Work.

EventSourcedAggregatePersistence

packages/core/src/persistence/event-sourced-aggregate-persistence.ts
interface EventSourcedAggregatePersistence {
  save(
    aggregateName: string,
    aggregateId: string,
    events: Event[],
    expectedVersion: number,
  ): Promise<void>;
  load(aggregateName: string, aggregateId: string): Promise<Event[]>;
}

Appends events to an aggregate stream. save() must enforce optimistic concurrency: if expectedVersion doesn't match the current stream length, throw ConcurrencyError. load() returns events ordered by sequence number, or [] for new aggregates.

StateStoredAggregatePersistence

packages/core/src/persistence/state-stored-aggregate-persistence.ts
interface StateStoredAggregatePersistence {
  save(
    aggregateName: string,
    aggregateId: string,
    state: any,
    expectedVersion: number,
  ): Promise<void>;
  load(
    aggregateName: string,
    aggregateId: string,
  ): Promise<{ state: any; version: number } | null>;
}

Stores the latest aggregate state as a JSON snapshot. save() must enforce optimistic concurrency (compare expectedVersion against stored version). load() returns null for new aggregates.

SagaPersistence

packages/core/src/persistence/saga-persistence.ts
interface SagaPersistence {
  save(sagaName: string, sagaId: string, state: any): Promise<void>;
  load(sagaName: string, sagaId: string): Promise<any | undefined | null>;
}

Simple key-value persistence for saga workflow state. No concurrency control required.

SnapshotStore (optional)

packages/core/src/persistence/snapshot-store.ts
interface SnapshotStore {
  load(aggregateName: string, aggregateId: string): Promise<Snapshot | null>;
  save(
    aggregateName: string,
    aggregateId: string,
    snapshot: Snapshot,
  ): Promise<void>;
}
// Snapshot = { state: any; version: number }

Caches aggregate state at a point in time so event replay can start from the snapshot version instead of the beginning.

OutboxStore (optional)

packages/core/src/edd/outbox-store.ts
interface OutboxStore {
  save(entries: OutboxEntry[]): Promise<void>;
  loadUnpublished(batchSize?: number): Promise<OutboxEntry[]>;
  markPublished(ids: string[]): Promise<void>;
  markPublishedByEventIds(eventIds: string[]): Promise<void>;
  deletePublished(olderThan?: Date): Promise<void>;
}

Transactional outbox for guaranteed at-least-once event delivery. save() is called inside the UoW transaction. See Outbox Pattern for details.

AggregateLocker (optional)

packages/core/src/persistence/aggregate-locker.ts
interface AggregateLocker {
  acquire(
    aggregateName: string,
    aggregateId: string,
    timeoutMs?: number,
  ): Promise<void>;
  release(aggregateName: string, aggregateId: string): Promise<void>;
}

Pessimistic locking for serializing command execution against the same aggregate. acquire() blocks until the lock is available or throws LockTimeoutError. release() must be idempotent.

Implementation Guide: MongoDB Native Driver

This walkthrough builds a custom adapter using the MongoDB native driver (mongodb). The same principles apply to any database -- PostgreSQL with pg, DynamoDB, or anything else.

MongoDB multi-document transactions (available since MongoDB 4.0) require a replica set. For local development, use mongod --replSet rs0 or a Docker image pre-configured as a single-node replica set.

Create the UnitOfWork

The UoW is the only required piece. It wraps all enlisted operations in a database transaction so that aggregate persistence, outbox writes, and any other enlisted operations commit or fail atomically.

In MongoDB, this maps to a ClientSession with an active transaction:

adapters/mongo-persistence.ts
import { MongoClient, type ClientSession } from "mongodb";
import type { UnitOfWork, UnitOfWorkFactory, Event } from "@noddde/core";

/**
 * Shared mutable ref so persistence classes see the active transaction session.
 * This is the bridge between the UoW and persistence classes.
 */
interface SessionStore {
  current: ClientSession | null;
}

class MongoUnitOfWork implements UnitOfWork {
  private operations: (() => Promise<void>)[] = [];
  private events: Event[] = [];
  private sealed = false;

  constructor(
    private readonly client: MongoClient,
    private readonly sessionStore: SessionStore,
  ) {}

  enlist(op: () => Promise<void>) {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.operations.push(op);
  }

  deferPublish(...events: Event[]) {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.events.push(...events);
  }

  async commit(): Promise<Event[]> {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.sealed = true;

    const session = this.client.startSession();
    try {
      session.startTransaction();
      this.sessionStore.current = session;

      for (const op of this.operations) {
        await op();
      }

      await session.commitTransaction();
      return this.events;
    } catch (err) {
      await session.abortTransaction();
      throw err;
    } finally {
      this.sessionStore.current = null;
      await session.endSession();
    }
  }

  async rollback(): Promise<void> {
    if (this.sealed) throw new Error("UnitOfWork already completed");
    this.sealed = true;
    this.operations = [];
    this.events = [];
  }
}

The key pattern: a session store holds a mutable reference to the active MongoDB session. All persistence classes read from it so their operations participate in the same transaction.

Implement Persistence Classes

Each persistence class reads sessionStore.current to pass the active session to MongoDB operations. When no UoW is active (e.g., read-only load() calls), it falls back to a sessionless call.

adapters/mongo-persistence.ts
import type { Db, Collection } from "mongodb";
import { ConcurrencyError } from "@noddde/core";
import type { EventSourcedAggregatePersistence, Event } from "@noddde/core";

class MongoEventSourcedPersistence implements EventSourcedAggregatePersistence {
  private readonly collection: Collection;

  constructor(
    db: Db,
    private readonly sessionStore: SessionStore,
  ) {
    this.collection = db.collection("noddde_events");
  }

  /** Returns session options if inside a UoW, or empty object otherwise. */
  private get sessionOpts() {
    return this.sessionStore.current
      ? { session: this.sessionStore.current }
      : {};
  }

  async save(
    aggregateName: string,
    aggregateId: string,
    events: Event[],
    expectedVersion: number,
  ) {
    if (events.length === 0) return;

    const docs = events.map((event, i) => ({
      aggregateName,
      aggregateId,
      sequenceNumber: expectedVersion + i + 1,
      eventName: event.name,
      payload: event.payload,
      metadata: event.metadata ?? null,
      createdAt: event.metadata?.timestamp
        ? new Date(event.metadata.timestamp)
        : new Date(),
    }));

    try {
      await this.collection.insertMany(docs, this.sessionOpts);
    } catch (err: any) {
      if (err.code === 11000) {
        // duplicate key
        throw new ConcurrencyError(
          aggregateName,
          aggregateId,
          expectedVersion,
          -1,
        );
      }
      throw err;
    }
  }

  async load(aggregateName: string, aggregateId: string): Promise<Event[]> {
    const docs = await this.collection
      .find({ aggregateName, aggregateId }, this.sessionOpts)
      .sort({ sequenceNumber: 1 })
      .toArray();

    return docs.map((doc) => ({
      name: doc.eventName,
      payload: doc.payload,
      ...(doc.metadata ? { metadata: doc.metadata } : {}),
    }));
  }
}

MongoDB stores documents natively as BSON -- no need to JSON.stringify() payloads. This is a key difference from SQL adapters where payloads are stored as JSON text.

For optimistic concurrency in StateStoredAggregatePersistence, use a findOneAndUpdate with a version filter:

adapters/mongo-persistence.ts
import type { StateStoredAggregatePersistence } from "@noddde/core";

class MongoStateStoredPersistence implements StateStoredAggregatePersistence {
  private readonly collection: Collection;

  constructor(
    db: Db,
    private readonly sessionStore: SessionStore,
  ) {
    this.collection = db.collection("noddde_aggregate_states");
  }

  private get sessionOpts() {
    return this.sessionStore.current
      ? { session: this.sessionStore.current }
      : {};
  }

  async save(
    aggregateName: string,
    aggregateId: string,
    state: any,
    expectedVersion: number,
  ) {
    if (expectedVersion === 0) {
      // New aggregate: insert with version 1
      try {
        await this.collection.insertOne(
          { aggregateName, aggregateId, state, version: 1 },
          this.sessionOpts,
        );
      } catch (err: any) {
        if (err.code === 11000) {
          throw new ConcurrencyError(aggregateName, aggregateId, 0, -1);
        }
        throw err;
      }
    } else {
      // Existing aggregate: update only if version matches
      const result = await this.collection.findOneAndUpdate(
        { aggregateName, aggregateId, version: expectedVersion },
        { $set: { state, version: expectedVersion + 1 } },
        this.sessionOpts,
      );
      if (!result) {
        throw new ConcurrencyError(
          aggregateName,
          aggregateId,
          expectedVersion,
          -1,
        );
      }
    }
  }

  async load(aggregateName: string, aggregateId: string) {
    const doc = await this.collection.findOne(
      { aggregateName, aggregateId },
      this.sessionOpts,
    );
    return doc ? { state: doc.state, version: doc.version } : null;
  }
}

Assemble the Adapter

All persistence classes share a single sessionStore so they participate in the same transaction:

adapters/mongo-persistence.ts
import type { PersistenceAdapter } from "@noddde/core";
import { MongoClient, type Db } from "mongodb";

class MongoAdapter implements PersistenceAdapter {
  readonly unitOfWorkFactory;
  readonly eventSourcedPersistence;
  readonly stateStoredPersistence;
  readonly sagaPersistence;
  // Add optional stores as needed:
  // readonly snapshotStore;
  // readonly outboxStore;

  private readonly sessionStore: SessionStore = { current: null };

  constructor(
    private readonly client: MongoClient,
    db: Db,
  ) {
    this.unitOfWorkFactory = () =>
      new MongoUnitOfWork(client, this.sessionStore);
    this.eventSourcedPersistence = new MongoEventSourcedPersistence(
      db,
      this.sessionStore,
    );
    this.stateStoredPersistence = new MongoStateStoredPersistence(
      db,
      this.sessionStore,
    );
    this.sagaPersistence = new MongoSagaPersistence(db, this.sessionStore);
  }

  /** Create required indexes on first use. */
  async init() {
    const db = this.client.db();
    await db
      .collection("noddde_events")
      .createIndex(
        { aggregateName: 1, aggregateId: 1, sequenceNumber: 1 },
        { unique: true },
      );
    await db
      .collection("noddde_aggregate_states")
      .createIndex({ aggregateName: 1, aggregateId: 1 }, { unique: true });
  }

  async close() {
    await this.client.close();
  }
}

Wire It Up

main.ts
import { defineDomain } from "@noddde/core";
import { wireDomain } from "@noddde/engine";
import { MongoClient } from "mongodb";

const client = new MongoClient(process.env.MONGODB_URI!);
await client.connect();
const db = client.db("my-app");

const adapter = new MongoAdapter(client, db);

const domain = await wireDomain(
  defineDomain({
    writeModel: { aggregates: { Order } },
    readModel: { projections: {} },
  }),
  { persistenceAdapter: adapter },
);

Key Implementation Rules

  1. Session/transaction store pattern -- All persistence classes must share a single mutable reference to the active transaction context (a ClientSession for MongoDB, a PoolClient for PostgreSQL, etc.). The UoW sets it before executing operations and clears it in a finally block.

  2. Optimistic concurrency -- EventSourcedAggregatePersistence.save() and StateStoredAggregatePersistence.save() must throw ConcurrencyError (from @noddde/core) when the expected version doesn't match. Use unique indexes (MongoDB E11000 / PostgreSQL 23505) or version-conditional updates.

  3. UoW is single-use -- After commit() or rollback(), all further calls must throw "UnitOfWork already completed".

  4. Event ordering -- load() must return events sorted by sequence number ascending.

  5. Idempotent release -- AggregateLocker.release() must not throw if the lock is already released.

  6. Only implement what you need -- If your domain has no sagas, skip sagaPersistence. If you don't need the outbox pattern, skip outboxStore. The engine validates at runtime and gives clear error messages if something is missing.

On this page