
Persistence

Choosing between event-sourced and state-stored persistence strategies, configuring built-in implementations, and implementing custom persistence.

noddde supports two persistence strategies for aggregates: event sourcing (store events, replay to rebuild state) and state storage (store final state directly). Both use the same aggregate definition -- the persistence strategy is a deployment concern, not a modeling concern.

For the conceptual foundation of event sourcing and CQRS, see CQRS and Event Sourcing.

Two Persistence Strategies

Event-Sourced Persistence

With event sourcing, events are the source of truth. State is ephemeral -- derived on every load by replaying the complete event history through the aggregate's apply handlers.

interface EventSourcedAggregatePersistence {
  save(
    aggregateName: string,
    aggregateId: ID,
    events: Event[],
    expectedVersion: number,
  ): Promise<void>;
  load(aggregateName: string, aggregateId: ID): Promise<Event[]>;
}
  • save appends new events to the aggregate's event stream. The expectedVersion must match the current stream length -- if another command was saved concurrently, a ConcurrencyError is thrown.
  • load returns all events for an aggregate instance, in order. The version is implicitly events.length.

Each aggregate instance has its own event stream, identified by the combination of aggregateName and aggregateId. Streams are independent -- loading events for one instance does not include events from another.
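
To make the contract concrete, here is a minimal in-memory sketch of the interface. The class name and Map layout are illustrative, and the local Event, ID, and ConcurrencyError stand-ins take the place of the real @noddde/core imports:

```typescript
// Minimal stand-ins for @noddde/core's Event, ID, and ConcurrencyError
// (illustrative only -- use the real imports in application code).
type ID = string;
interface Event { name: string; payload: unknown }

class ConcurrencyError extends Error {
  constructor(aggregateName: string, aggregateId: ID, expected: number, actual: number) {
    super(`${aggregateName}/${aggregateId}: expected version ${expected}, found ${actual}`);
  }
}

// Event streams keyed by "aggregateName:aggregateId"; the stream length is the version.
class SketchEventStore {
  private streams = new Map<string, Event[]>();

  async save(aggregateName: string, aggregateId: ID, events: Event[], expectedVersion: number): Promise<void> {
    const key = `${aggregateName}:${aggregateId}`;
    const stream = this.streams.get(key) ?? [];
    if (stream.length !== expectedVersion) {
      // Another command persisted first -- surface the conflict to the caller.
      throw new ConcurrencyError(aggregateName, aggregateId, expectedVersion, stream.length);
    }
    this.streams.set(key, [...stream, ...events]);
  }

  async load(aggregateName: string, aggregateId: ID): Promise<Event[]> {
    return this.streams.get(`${aggregateName}:${aggregateId}`) ?? [];
  }
}
```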

State-Stored Persistence

With state storage, the current state is saved directly. No event replay is needed -- the loaded state is used as-is.

interface StateStoredAggregatePersistence {
  save(
    aggregateName: string,
    aggregateId: ID,
    state: any,
    expectedVersion: number,
  ): Promise<void>;
  load(
    aggregateName: string,
    aggregateId: ID,
  ): Promise<{ state: any; version: number } | null>;
}
  • save overwrites the current state snapshot. The expectedVersion must match the stored version -- if it does not, a ConcurrencyError is thrown.
  • load returns the last saved state and version as { state, version }, or null for new aggregates (version 0).
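
A minimal in-memory sketch of this interface, under the assumption (not stated above) that the version advances by one on each successful save:

```typescript
type ID = string;

// One state snapshot per aggregate instance; version 0 means "never saved".
class SketchStateStore {
  private rows = new Map<string, { state: any; version: number }>();

  async save(aggregateName: string, aggregateId: ID, state: any, expectedVersion: number): Promise<void> {
    const key = `${aggregateName}:${aggregateId}`;
    const current = this.rows.get(key)?.version ?? 0;
    if (current !== expectedVersion) {
      // Stand-in for the real ConcurrencyError from @noddde/core.
      throw new Error(`ConcurrencyError: expected ${expectedVersion}, found ${current}`);
    }
    // Assumption: each successful save advances the version by one.
    this.rows.set(key, { state, version: current + 1 });
  }

  async load(aggregateName: string, aggregateId: ID): Promise<{ state: any; version: number } | null> {
    return this.rows.get(`${aggregateName}:${aggregateId}`) ?? null;
  }
}
```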

State Snapshotting (Event-Sourced Optimization)

For long-lived aggregates with large event streams, replaying every event on each command dispatch becomes expensive. State snapshotting periodically caches the aggregate state at a known event stream version. On subsequent loads, the engine starts from the snapshot and replays only the events that occurred after it.

Snapshots are not the source of truth — the event stream is. Deleting all snapshots is safe; the engine falls back to full replay.

interface Snapshot {
  state: any;
  version: number; // event stream version at snapshot time
}

interface SnapshotStore {
  load(aggregateName: string, aggregateId: string): Promise<Snapshot | null>;
  save(
    aggregateName: string,
    aggregateId: string,
    snapshot: Snapshot,
  ): Promise<void>;
}
  • load returns the latest snapshot, or null if none exists (triggers full replay).
  • save overwrites any previously stored snapshot for the same aggregate instance. Only the latest snapshot is kept.
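
Putting the two pieces together, the snapshot-aware load path can be sketched as follows. The loadAggregate helper and its callback parameters are hypothetical names for illustration; the engine's internals may differ:

```typescript
type Event = { name: string; payload: unknown };
interface Snapshot { state: any; version: number }

// The replay step: fold events through an apply function to produce state.
const replay = (state: any, events: Event[], apply: (s: any, e: Event) => any) =>
  events.reduce(apply, state);

// Start from the snapshot when one exists, otherwise from initialState,
// and replay only the events recorded after the snapshot's version.
async function loadAggregate(
  loadSnapshot: () => Promise<Snapshot | null>,
  loadEventsAfter: (version: number) => Promise<Event[]>,
  initialState: any,
  apply: (s: any, e: Event) => any,
) {
  const snapshot = await loadSnapshot();
  const from = snapshot?.version ?? 0;
  const newer = await loadEventsAfter(from);
  return {
    state: replay(snapshot?.state ?? initialState, newer, apply),
    version: from + newer.length,
  };
}
```

Deleting the snapshot only changes `from` back to 0, which is why full replay remains a safe fallback.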

Snapshot Strategy

A SnapshotStrategy is a pure function that decides whether to take a snapshot after each command. The built-in everyNEvents factory covers the common case:

import { everyNEvents } from "@noddde/core";

// Snapshot every 100 events
const strategy = everyNEvents(100);

You can also write custom strategies:

import type { SnapshotStrategy } from "@noddde/core";

const customStrategy: SnapshotStrategy = ({ version, eventsSinceSnapshot }) => {
  // Snapshot every 50 events, but always after the first event
  return version === 1 || eventsSinceSnapshot >= 50;
};

Configuring Snapshots

Add snapshotStore and snapshotStrategy to the infrastructure section of your domain configuration:

import {
  configureDomain,
  InMemoryEventSourcedAggregatePersistence,
  InMemorySnapshotStore,
} from "@noddde/engine";
import { everyNEvents } from "@noddde/core";

const domain = await configureDomain<MyInfrastructure>({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: {} },
  infrastructure: {
    aggregatePersistence: () => new InMemoryEventSourcedAggregatePersistence(),
    snapshotStore: () => new InMemorySnapshotStore(),
    snapshotStrategy: everyNEvents(100),
  },
});

Both snapshotStore and snapshotStrategy are optional. If either is omitted, no automatic snapshotting occurs.

Partial Event Loading

When a snapshot exists, the engine only needs events that occurred after it. If the persistence implementation also implements PartialEventLoad, the engine uses loadAfterVersion() to avoid loading the full event stream:

interface PartialEventLoad {
  loadAfterVersion(
    aggregateName: string,
    aggregateId: string,
    afterVersion: number,
  ): Promise<Event[]>;
}

The built-in InMemoryEventSourcedAggregatePersistence implements PartialEventLoad. If the persistence does not implement it, the engine falls back to load() + Array.slice() — correct, but less efficient for large streams.
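
That decision can be sketched as a small helper (illustrative only; the engine performs the equivalent check internally):

```typescript
type Event = { name: string; payload: unknown };

interface EventLoader {
  load(aggregateName: string, aggregateId: string): Promise<Event[]>;
  // Optional optimized path -- present only when PartialEventLoad is implemented.
  loadAfterVersion?(aggregateName: string, aggregateId: string, afterVersion: number): Promise<Event[]>;
}

// Prefer the optimized path; otherwise load the whole stream and slice off
// the events already covered by the snapshot.
async function eventsAfter(p: EventLoader, name: string, id: string, afterVersion: number): Promise<Event[]> {
  if (p.loadAfterVersion) return p.loadAfterVersion(name, id, afterVersion);
  const all = await p.load(name, id);
  return all.slice(afterVersion);
}
```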

How Snapshot Saving Works

Snapshots are saved after the unit of work commits, as a best-effort operation. A failed snapshot save does not roll back a successfully committed command — the engine silently catches the error. The next command that triggers the strategy will attempt to save again.

Saga Persistence

Sagas are always state-stored. The SagaPersistence interface mirrors the state-stored aggregate interface:

interface SagaPersistence {
  save(sagaName: string, sagaId: ID, state: any): Promise<void>;
  load(sagaName: string, sagaId: ID): Promise<any>;
}

Saga persistence has no expectedVersion parameter and no concurrency control. Sagas coordinate workflows, not domain truth — each saga instance is triggered by events arriving sequentially on the event bus. If you need concurrency guarantees on saga state, implement versioning in a custom SagaPersistence adapter.
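
Such an adapter might look like the following sketch, which smuggles a version into the stored state because save() has no version slot. The class and the __version field are illustrative conventions, not part of noddde:

```typescript
type ID = string;

// Wraps saga state with a version and rejects writes based on stale reads.
class VersionedSagaPersistence {
  private rows = new Map<string, { state: any; version: number }>();

  async save(sagaName: string, sagaId: ID, state: any): Promise<void> {
    const key = `${sagaName}:${sagaId}`;
    const current = this.rows.get(key);
    const incomingVersion = state?.__version ?? 0;
    if (current && current.version !== incomingVersion) {
      // The caller saved from a state that has since been overwritten.
      throw new Error(`Stale saga state for ${key}: have v${current.version}, got v${incomingVersion}`);
    }
    this.rows.set(key, { state, version: incomingVersion + 1 });
  }

  async load(sagaName: string, sagaId: ID): Promise<any> {
    const row = this.rows.get(`${sagaName}:${sagaId}`);
    return row ? { ...row.state, __version: row.version } : undefined;
  }
}
```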

Required when using sagas (processModel) in your domain configuration.

Configuring Persistence

Provide a persistence implementation via the aggregatePersistence factory in the infrastructure section:

import {
  configureDomain,
  InMemoryEventSourcedAggregatePersistence,
  InMemorySagaPersistence,
} from "@noddde/engine";

const domain = await configureDomain<BankingInfrastructure>({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: { BankAccount: BankAccountProjection } },
  infrastructure: {
    aggregatePersistence: () => new InMemoryEventSourcedAggregatePersistence(),
    sagaPersistence: () => new InMemorySagaPersistence(),
    // ...other providers
  },
});

The aggregatePersistence factory returns either an EventSourcedAggregatePersistence or a StateStoredAggregatePersistence. The framework uses it to load and save state for all aggregates in the domain.

Concurrency Control

noddde offers three concurrency modes for aggregate persistence, configured via aggregateConcurrency. The first two modes work with every database dialect; only pessimistic locking has dialect restrictions.

No Concurrency Control (default)

When aggregateConcurrency is omitted, no retry logic or locking is applied. The version check on save() still catches conflicts at the database level — if two concurrent commands target the same aggregate, the second to persist will throw ConcurrencyError. The error propagates directly to the caller.

This is appropriate for low-contention scenarios and works with every database dialect supported by your ORM adapter.

const domain = await configureDomain<MyInfrastructure>({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: {} },
  infrastructure: {
    aggregatePersistence: () => new InMemoryEventSourcedAggregatePersistence(),
    // no aggregateConcurrency — ConcurrencyError propagates to caller
  },
});

Optimistic Concurrency (with retries)

Opt into automatic retries by setting maxRetries. On ConcurrencyError, the domain re-executes the full load→execute→save cycle against the latest state, up to maxRetries times. If all retries are exhausted, ConcurrencyError propagates.

const domain = await configureDomain<MyInfrastructure>({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: {} },
  infrastructure: {
    aggregatePersistence: () => new InMemoryEventSourcedAggregatePersistence(),
    aggregateConcurrency: {
      maxRetries: 3, // retry up to 3 times on ConcurrencyError
    },
  },
});

Command handlers may be called multiple times during retry and should be side-effect-free (the Decider pattern already ensures this: handlers only produce events).

Pessimistic Concurrency

For high-contention aggregates where retries waste work, pessimistic locking serializes access by acquiring an exclusive lock before loading the aggregate.

import { InMemoryAggregateLocker } from "@noddde/engine";

const domain = await configureDomain<MyInfrastructure>({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: {} },
  infrastructure: {
    aggregatePersistence: () => new InMemoryEventSourcedAggregatePersistence(),
    aggregateConcurrency: {
      strategy: "pessimistic",
      locker: new InMemoryAggregateLocker(),
      lockTimeoutMs: 5000, // optional: throw LockTimeoutError after 5s
    },
  },
});

The lock is acquired before loading, held during command execution, and released after persistence commits (or on error). The version check on save() remains as a safety net. No retry loop is needed since the lock prevents conflicts.

For production, use database-backed advisory lockers instead of the in-memory implementation:

| Package | Locker | PostgreSQL | MySQL / MariaDB | MSSQL |
| --- | --- | --- | --- | --- |
| @noddde/drizzle | DrizzleAdvisoryLocker | pg_advisory_lock | GET_LOCK | -- |
| @noddde/prisma | PrismaAdvisoryLocker | pg_advisory_lock | GET_LOCK | -- |
| @noddde/typeorm | TypeORMAdvisoryLocker | pg_advisory_lock | GET_LOCK | sp_getapplock |

SQLite does not support advisory locks -- use InMemoryAggregateLocker for single-process SQLite deployments. See the Dialect Support Matrix for full details.

When to Choose Each Strategy

| Factor | None (default) | Optimistic (retries) | Pessimistic |
| --- | --- | --- | --- |
| Low contention | Best (zero overhead) | Unnecessary overhead | Unnecessary overhead |
| High contention | Errors propagate to caller | Retries waste work | Best (serialized access) |
| Distributed systems | Works everywhere | Works everywhere | Requires shared lock backend |
| Dialect support | All dialects | All dialects | PG, MySQL, MSSQL (TypeORM) |
| Complexity | Lowest | Low | Higher (lock management) |
| Latency under contention | Fails fast | Variable (retries) | Predictable (queueing) |

Inside an explicit withUnitOfWork(), optimistic ConcurrencyError is never retried -- it propagates immediately. Pessimistic locking still acquires/releases per command within the UoW.

Built-in In-Memory Implementations

The @noddde/engine package includes in-memory implementations that require no external dependencies. These are suitable for development, testing, and prototyping. Data is lost when the process exits.

InMemoryEventSourcedAggregatePersistence

import { InMemoryEventSourcedAggregatePersistence } from "@noddde/engine";

const persistence = new InMemoryEventSourcedAggregatePersistence();

Stores event streams in memory, keyed by (aggregateName, aggregateId). Also implements PartialEventLoad for optimized snapshot-based loading.

InMemorySnapshotStore

import { InMemorySnapshotStore } from "@noddde/engine";

const snapshotStore = new InMemorySnapshotStore();

Stores aggregate state snapshots in memory, keyed by (aggregateName, aggregateId). Each save overwrites the previous snapshot. Returns null for unknown aggregates.

InMemoryStateStoredAggregatePersistence

import { InMemoryStateStoredAggregatePersistence } from "@noddde/engine";

const persistence = new InMemoryStateStoredAggregatePersistence();

Stores aggregate state snapshots in memory, keyed by (aggregateName, aggregateId).

InMemoryIdempotencyStore

import { InMemoryIdempotencyStore } from "@noddde/engine";

const store = new InMemoryIdempotencyStore(); // no TTL
const storeWithTTL = new InMemoryIdempotencyStore(3600_000); // 1 hour TTL

Stores processed command records in memory, keyed by commandId. When ttlMs is provided, exists() performs lazy cleanup of expired records.
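
The lazy-expiry behavior can be sketched like this (an illustrative re-implementation; the record method name and the injected clock are assumptions of the sketch, not the built-in API):

```typescript
// Records command IDs with an insertion timestamp; expired entries are pruned
// lazily on lookup rather than by a background timer.
class SketchIdempotencyStore {
  private seen = new Map<string, number>();

  constructor(
    private readonly ttlMs?: number,
    private readonly now: () => number = () => Date.now(), // injectable for testing
  ) {}

  async record(commandId: string): Promise<void> {
    this.seen.set(commandId, this.now());
  }

  async exists(commandId: string): Promise<boolean> {
    const at = this.seen.get(commandId);
    if (at === undefined) return false;
    if (this.ttlMs !== undefined && this.now() - at > this.ttlMs) {
      this.seen.delete(commandId); // lazy cleanup of the expired record
      return false;
    }
    return true;
  }
}
```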

InMemorySagaPersistence

import { InMemorySagaPersistence } from "@noddde/engine";

const persistence = new InMemorySagaPersistence();

Stores saga instance state in memory, keyed by (sagaName, sagaId).

Same Aggregate, Different Strategy

The same aggregate definition works with either persistence strategy. The defineAggregate function does not know or care about persistence -- it defines initialState, commands, and apply regardless.

This means you can:

  • Start with state storage for simplicity, then migrate to event sourcing without changing your aggregate
  • Use event sourcing in production and state storage in tests
  • Switch strategies by changing only the aggregatePersistence factory

The strategy is a configuration choice, not a modeling choice.

Decision Matrix

| Factor | Event Sourcing | State Storage |
| --- | --- | --- |
| Audit trail | Full history | Current state only |
| Storage growth | Grows with events | Constant per aggregate |
| Load performance | Slower (replay, mitigated by snapshots) | Faster (direct read) |
| Projection rebuild | Possible | Not possible |
| Temporal queries | Supported | Not supported |
| Complexity | Higher | Lower |
| Debugging | Excellent | Standard |

When to Choose Event Sourcing

  • You need a complete audit trail of every change
  • Temporal queries are important ("what was the balance at 3pm on Tuesday?")
  • Multiple projections consume the same events to build different read models
  • You want to replay events to rebuild projections or fix bugs
  • Your domain has complex business rules where understanding how the state changed matters

When to Choose State Storage

  • No audit trail is needed -- you only care about current state
  • The domain is simple enough that a direct state snapshot is sufficient
  • Performance is critical -- loading state is O(1) instead of O(n) where n is event count
  • Storage efficiency matters -- one record per aggregate instead of growing event streams

Implementing Custom Persistence

The persistence interfaces are deliberately minimal -- save and load are the only two operations. This makes it straightforward to implement for any storage backend:

import {
  Event,
  EventSourcedAggregatePersistence,
  ConcurrencyError,
  type ID,
} from "@noddde/core";
import { Pool } from "pg";

class PostgresEventStore implements EventSourcedAggregatePersistence {
  constructor(private readonly pool: Pool) {}

  async load(aggregateName: string, aggregateId: ID): Promise<Event[]> {
    const result = await this.pool.query(
      `SELECT name, payload FROM events
       WHERE aggregate_name = $1 AND aggregate_id = $2
       ORDER BY sequence_number ASC`,
      [aggregateName, aggregateId],
    );
    return result.rows.map((row) => ({
      name: row.name,
      payload: row.payload,
    }));
  }

  async save(
    aggregateName: string,
    aggregateId: ID,
    events: Event[],
    expectedVersion: number,
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      // All events from one command are inserted in a single transaction,
      // satisfying the atomicity requirement below.
      await client.query("BEGIN");
      for (let i = 0; i < events.length; i++) {
        const event = events[i]!;
        await client.query(
          `INSERT INTO events (aggregate_name, aggregate_id, sequence_number, name, payload)
           VALUES ($1, $2, $3, $4, $5)`,
          [
            aggregateName,
            aggregateId,
            expectedVersion + i + 1,
            event.name,
            event.payload,
          ],
        );
      }
      await client.query("COMMIT");
    } catch (error: unknown) {
      await client.query("ROLLBACK");
      // PostgreSQL reports unique constraint violations with SQLSTATE 23505.
      if ((error as { code?: string }).code === "23505") {
        throw new ConcurrencyError(
          aggregateName,
          aggregateId,
          expectedVersion,
          -1,
        );
      }
      throw error; // Re-throw unexpected errors (connection issues, etc.)
    } finally {
      client.release();
    }
  }
}

Key implementation considerations:

  • Ordering -- Events must be loaded in the order they were saved. Use a sequence number column.
  • Atomicity -- All events from a single command must be saved atomically (within one transaction).
  • Concurrency -- Use a unique constraint on (aggregate_name, aggregate_id, sequence_number) to detect concurrent writes. The expectedVersion tells you which sequence numbers to use for the new events.
  • Serialization -- Event payloads must round-trip correctly. JSON is the common choice, but be careful with types like Date that do not survive JSON.parse.
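
The Date pitfall is easy to demonstrate, and a JSON.parse reviver is one common way to restore timestamps on load. The ISO-string detection below is a convention of this sketch, not something noddde prescribes:

```typescript
// Dates serialize to ISO strings, but JSON.parse leaves them as plain strings.
const payload = { openedAt: new Date("2024-01-15T10:00:00.000Z") };
const raw = JSON.parse(JSON.stringify(payload));
// raw.openedAt is now a string, not a Date -- the type did not survive

// A reviver that restores ISO-8601 timestamp strings back into Date objects.
const ISO = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z$/;
const revived = JSON.parse(JSON.stringify(payload), (_key, value) =>
  typeof value === "string" && ISO.test(value) ? new Date(value) : value,
);
// revived.openedAt is a Date again
```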

For production-ready persistence with transaction support, consider the ORM Adapters packages instead of writing custom implementations.

Moving to Production

noddde provides three ORM adapter packages that cover persistence and unit of work out of the box:

| In-Memory | Production Alternative |
| --- | --- |
| InMemoryEventSourcedAggregatePersistence | @noddde/drizzle, @noddde/prisma, or @noddde/typeorm |
| InMemoryStateStoredAggregatePersistence | @noddde/drizzle, @noddde/prisma, or @noddde/typeorm |
| InMemorySagaPersistence | @noddde/drizzle, @noddde/prisma, or @noddde/typeorm |
| InMemorySnapshotStore | @noddde/drizzle, @noddde/prisma, or @noddde/typeorm |
| InMemoryIdempotencyStore | Custom implementation (PostgreSQL, Redis, etc.) |

Each adapter provides a single factory function that creates all persistence implementations wired to share a database transaction context. Your aggregate and projection code remains unchanged -- only the infrastructure factories in configureDomain need to be updated.

Unit of Work

The domain uses the Unit of Work pattern to ensure atomicity in the write model. Every command dispatch buffers persistence operations and defers event publishing until all writes succeed. You can also group multiple commands into an explicit unit of work.

How Unit of Work Works

When you dispatch a command, the domain automatically creates a unit of work that:

  1. Buffers persistence -- aggregate state/events are not written immediately
  2. Defers event publishing -- events are collected but not dispatched to the EventBus
  3. Commits atomically -- all persistence operations execute together
  4. Publishes after commit -- events reach projections and sagas only after persistence succeeds

If persistence fails, no events are published. This prevents inconsistencies between the store and downstream subscribers.

Implicit Unit of Work

Every dispatchCommand call is automatically wrapped in its own unit of work. No configuration needed:

// This is already wrapped in a unit of work
await domain.dispatchCommand({
  name: "CreateBankAccount",
  targetAggregateId: "acc-001",
  payload: { owner: "Alice" },
});

Explicit Unit of Work

Use domain.withUnitOfWork() to group multiple commands into a single atomic boundary:

await domain.withUnitOfWork(async () => {
  await domain.dispatchCommand({
    name: "DebitAccount",
    targetAggregateId: "acc-001",
    payload: { amount: 500 },
  });
  await domain.dispatchCommand({
    name: "CreditAccount",
    targetAggregateId: "acc-002",
    payload: { amount: 500 },
  });
  // Both commands persist atomically
  // Events are published together after commit
});

If any command within the unit of work throws, all buffered persistence operations are discarded and no events are published.

withUnitOfWork() returns the value from its callback:

const accountId = await domain.withUnitOfWork(async () => {
  const id = await domain.dispatchCommand(createAccountCommand);
  await domain.dispatchCommand(initialDepositCommand);
  return id;
});

Nesting is not supported -- calling withUnitOfWork() inside an active unit of work throws an error.

Saga Unit of Work

Saga reactions are automatically wrapped in a unit of work. When a saga handler dispatches commands, the saga state persistence and all dispatched command operations share a single unit of work:

Event arrives --> saga handler executes
  --> saga state persisted   |
  --> command 1 persisted    |-- all in one unit of work
  --> command 2 persisted    |
  --> events published after commit

This ensures that saga state transitions and the commands they trigger are always consistent.

The UnitOfWork Interface

interface UnitOfWork {
  enlist(operation: () => Promise<void>): void;
  deferPublish(...events: Event[]): void;
  commit(): Promise<Event[]>;
  rollback(): Promise<void>;
}
  • enlist -- buffers a persistence operation
  • deferPublish -- schedules events for post-commit publishing
  • commit -- executes all operations, returns events for the domain to publish
  • rollback -- discards all operations and events

A UnitOfWork is single-use: after commit() or rollback(), further calls throw.

Custom UnitOfWork Factory

By default, the domain uses InMemoryUnitOfWork. For production use with a database, provide a custom unitOfWorkFactory in the domain configuration:

const domain = await configureDomain<BankingInfrastructure>({
  writeModel: { aggregates: { BankAccount } },
  readModel: { projections: { BankAccount: BankAccountProjection } },
  infrastructure: {
    aggregatePersistence: () => new PostgresEventStore(pool),
    unitOfWorkFactory: () => () => new PostgresUnitOfWork(pool),
    // ...other providers
  },
});

The factory is a function that returns a UnitOfWorkFactory (a function that returns a UnitOfWork). The outer function is called once during domain initialization. The inner factory is called once per unit of work boundary.

For database-backed unit of work implementations using real transactions, see the ORM Adapters packages.
