noddde

Flash Sale — Concurrency Strategies

Handling concurrent purchases with optimistic and pessimistic concurrency, snapshots, idempotency, and the outbox pattern.

This sample models a flash sale in which limited stock sells first come, first served. Multiple buyers purchase the same item concurrently. It demonstrates both optimistic concurrency with retry and pessimistic concurrency with advisory locks, showing when to choose each strategy.

Full source: samples/sample-flash-sale — clone and run locally with npm start.

Stack: Drizzle + PostgreSQL | @noddde/nats (both optimistic and pessimistic entry points)

Why Optimistic

The decide handler is trivial: check stock, then return either a purchase or a rejection event. If two buyers load the same version simultaneously, one succeeds and the other gets a ConcurrencyError. The framework retries automatically: it re-loads the latest stock, re-runs the check, and re-attempts the save. Because the handler itself does no I/O, a retry costs little more than one extra load and save.
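The load, check, save, retry cycle described above can be sketched without any framework. The block below is a minimal illustration, not the @noddde implementation: `InMemoryStockStore`, `purchaseWithRetry`, and the `beforeSave` hook are hypothetical names introduced here, and the `ConcurrencyError` class is a local stand-in that only mirrors the name used in the text.

```typescript
// Hypothetical stand-ins for illustration; none of this is the @noddde API.
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    // Keeps `instanceof` reliable even when compiled to older JS targets.
    Object.setPrototypeOf(this, ConcurrencyError.prototype);
  }
}

interface StockRow {
  version: number;
  stock: number;
}

class InMemoryStockStore {
  private row: StockRow = { version: 0, stock: 5 };

  load(): StockRow {
    return { ...this.row };
  }

  // Compare-and-set: the write lands only if nobody saved since we loaded.
  save(expectedVersion: number, stock: number): void {
    if (this.row.version !== expectedVersion) {
      throw new ConcurrencyError(
        `expected version ${expectedVersion}, found ${this.row.version}`,
      );
    }
    this.row = { version: expectedVersion + 1, stock };
  }
}

// Load, check, save; on a version conflict, reload and try again.
function purchaseWithRetry(
  store: InMemoryStockStore,
  maxRetries: number,
  beforeSave: (attempt: number) => void = () => {},
): { attempts: number; purchased: boolean } {
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    const { version, stock } = store.load();
    if (stock <= 0) return { attempts: attempt, purchased: false };
    beforeSave(attempt); // demo hook: lets a rival buyer commit first
    try {
      store.save(version, stock - 1);
      return { attempts: attempt, purchased: true };
    } catch (err) {
      if (!(err instanceof ConcurrencyError)) throw err;
    }
  }
  throw new ConcurrencyError("retries exhausted");
}

const store = new InMemoryStockStore();
// A rival buyer commits between our first load and our first save.
const result = purchaseWithRetry(store, 5, (attempt) => {
  if (attempt === 1) {
    const rival = store.load();
    store.save(rival.version, rival.stock - 1);
  }
});
console.log(result, store.load());
```

In this run the first save conflicts with the rival's write, and the second attempt succeeds against the reloaded version, leaving the store at version 2 with 3 items.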

Pessimistic locking would add unnecessary overhead here. Acquiring and releasing a database lock for every purchase adds latency that matters in a high-throughput flash sale.
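For contrast, the pessimistic approach serializes buyers before they read. The sketch below uses an in-process keyed mutex as a stand-in for a database advisory lock; `KeyedLock`, `runLockedSale`, and the lock key are hypothetical names, and a real deployment would take the lock in PostgreSQL so that it also holds across processes.

```typescript
// Hypothetical in-process stand-in for a database advisory lock.
class KeyedLock {
  private tails = new Map<string, Promise<void>>();

  // Runs `work` only after every earlier holder of `key` has finished.
  async runExclusive<T>(key: string, work: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const current = new Promise<void>((resolve) => (release = resolve));
    // The next caller waits for everything queued so far, including us.
    this.tails.set(key, previous.then(() => current));
    await previous;
    try {
      return await work();
    } finally {
      release();
    }
  }
}

const pause = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runLockedSale(buyers: number, initialStock: number) {
  const lock = new KeyedLock();
  let stock = initialStock;
  const outcomes = await Promise.all(
    Array.from({ length: buyers }, () =>
      lock.runExclusive("flash-sale-item-1", async () => {
        const seen = stock; // read
        await pause(1); // simulated I/O between read and write
        if (seen <= 0) return "rejected";
        stock = seen - 1; // write; safe because the lock is held
        return "purchased";
      }),
    ),
  );
  return { outcomes, stock };
}

runLockedSale(8, 5).then(({ outcomes, stock }) =>
  console.log(outcomes, stock),
);
```

Every buyer waits its turn, so there are no conflicts and no retries, but each purchase pays for the wait. That queueing is the overhead the paragraph above refers to.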

The Aggregate

The aggregate uses extracted handlers — each decide handler and evolve handler lives in its own file, typed with InferDecideHandler and InferEvolveHandler.

Decide handlers (extracted)

// deciders/decide-create-flash-sale.ts
import type { InferDecideHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";

export const decideCreateFlashSale: InferDecideHandler<
  FlashSaleItemTypes,
  "CreateFlashSale"
> = (command) => ({
  name: "FlashSaleCreated",
  payload: {
    itemId: command.targetAggregateId,
    initialStock: command.payload.initialStock,
  },
});

// deciders/decide-purchase-item.ts
import type { InferDecideHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";

export const decidePurchaseItem: InferDecideHandler<
  FlashSaleItemTypes,
  "PurchaseItem"
> = (command, state) => {
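  // Reject when the remaining stock cannot cover the requested quantity;
  // otherwise the evolve handler could drive stock below zero.
  if (state.stock < command.payload.quantity) {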
    return {
      name: "PurchaseRejected",
      payload: { buyerId: command.payload.buyerId, reason: "out_of_stock" },
    };
  }
  return {
    name: "ItemPurchased",
    payload: {
      buyerId: command.payload.buyerId,
      quantity: command.payload.quantity,
    },
  };
};

The PurchaseItem handler either produces ItemPurchased (decrements stock) or PurchaseRejected (stock exhausted). Both are valid domain events — rejection is not an error, it is a recorded fact.

Evolve handlers (extracted)

// evolvers/evolve-flash-sale-created.ts
import type { InferEvolveHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";

export const evolveFlashSaleCreated: InferEvolveHandler<
  FlashSaleItemTypes,
  "FlashSaleCreated"
> = (payload) => ({
  stock: payload.initialStock,
  sold: 0,
  buyers: [],
});

// evolvers/evolve-item-purchased.ts
import type { InferEvolveHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";

export const evolveItemPurchased: InferEvolveHandler<
  FlashSaleItemTypes,
  "ItemPurchased"
> = (payload, state) => ({
  stock: state.stock - payload.quantity,
  sold: state.sold + payload.quantity,
  buyers: [...state.buyers, payload.buyerId],
});

The PurchaseRejected evolve handler follows the same pattern — it returns state unchanged (no-op).
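To see how the three evolve handlers combine, here is a self-contained sketch that folds an event stream into state, including the no-op for PurchaseRejected. It does not use @noddde/core. The type shapes are assumptions inferred from the handlers shown on this page, and `evolve`, `replay`, and `initialState` are hypothetical helper names.

```typescript
// Shapes inferred from the handlers above; the sample's real types may differ.
type FlashSaleState = { stock: number; sold: number; buyers: string[] };

type FlashSaleEvent =
  | { name: "FlashSaleCreated"; payload: { itemId: string; initialStock: number } }
  | { name: "ItemPurchased"; payload: { buyerId: string; quantity: number } }
  | { name: "PurchaseRejected"; payload: { buyerId: string; reason: string } };

const initialState: FlashSaleState = { stock: 0, sold: 0, buyers: [] };

// One pure transition per event type, mirroring the extracted handlers.
function evolve(state: FlashSaleState, event: FlashSaleEvent): FlashSaleState {
  switch (event.name) {
    case "FlashSaleCreated":
      return { stock: event.payload.initialStock, sold: 0, buyers: [] };
    case "ItemPurchased":
      return {
        stock: state.stock - event.payload.quantity,
        sold: state.sold + event.payload.quantity,
        buyers: [...state.buyers, event.payload.buyerId],
      };
    case "PurchaseRejected":
      return state; // recorded in the stream, but state is unchanged
  }
}

// Rebuilding an aggregate is a left fold over its event stream.
const replay = (events: FlashSaleEvent[]): FlashSaleState =>
  events.reduce(evolve, initialState);

const history: FlashSaleEvent[] = [
  { name: "FlashSaleCreated", payload: { itemId: "item-1", initialStock: 5 } },
  { name: "ItemPurchased", payload: { buyerId: "alice", quantity: 1 } },
  { name: "PurchaseRejected", payload: { buyerId: "bob", reason: "out_of_stock" } },
  { name: "ItemPurchased", payload: { buyerId: "carol", quantity: 2 } },
];

console.log(replay(history));
```

The rejection stays in the history as a fact, yet replaying it leaves stock, sold, and buyers untouched.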

Aggregate definition (wiring)

// flash-sale-item.ts
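import { defineAggregate } from "@noddde/core";
import type { AggregateTypes, Infrastructure } from "@noddde/core";
import { decideCreateFlashSale } from "./deciders/decide-create-flash-sale";
import { decidePurchaseItem } from "./deciders/decide-purchase-item";
import { evolveFlashSaleCreated } from "./evolvers/evolve-flash-sale-created";
import { evolveItemPurchased } from "./evolvers/evolve-item-purchased";
// Path assumed by analogy with the other evolvers; this file is not shown on the page.
import { evolvePurchaseRejected } from "./evolvers/evolve-purchase-rejected";
// FlashSaleState, FlashSaleEvent, FlashSaleCommand, and initialFlashSaleState
// are defined elsewhere in the sample and omitted here.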

export type FlashSaleItemTypes = AggregateTypes & {
  state: FlashSaleState;
  events: FlashSaleEvent;
  commands: FlashSaleCommand;
  infrastructure: Infrastructure;
};

export const FlashSaleItem = defineAggregate<FlashSaleItemTypes>({
  initialState: initialFlashSaleState,
  decide: {
    CreateFlashSale: decideCreateFlashSale,
    PurchaseItem: decidePurchaseItem,
  },
  evolve: {
    FlashSaleCreated: evolveFlashSaleCreated,
    ItemPurchased: evolveItemPurchased,
    PurchaseRejected: evolvePurchaseRejected,
  },
});

Domain Configuration

import {
  defineDomain,
  wireDomain,
  InMemoryCommandBus,
  InMemoryQueryBus,
} from "@noddde/engine";
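import { NatsEventBus } from "@noddde/nats";

// FlashSaleItem, drizzleInfra, and natsContainer come from elsewhere in the
// sample (aggregate definition and container setup); that code is omitted here.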

const flashSaleDomain = defineDomain({
  writeModel: { aggregates: { FlashSaleItem } },
  readModel: { projections: {} },
});

const domain = await wireDomain(flashSaleDomain, {
  aggregates: {
    persistence: () => drizzleInfra.eventSourcedPersistence,
    concurrency: { maxRetries: 5 },
  },
  buses: () => ({
    commandBus: new InMemoryCommandBus(),
    eventBus: new NatsEventBus({
      servers: `localhost:${natsContainer.getMappedPort(4222)}`,
      consumerGroup: "flash-sale",
      streamName: "flash-sale-events",
      subjectPrefix: "flash-sale.",
    }),
    queryBus: new InMemoryQueryBus(),
  }),
  unitOfWork: () => drizzleInfra.unitOfWorkFactory,
});

maxRetries: 5 means the framework retries up to 5 times on ConcurrencyError. For 8 concurrent buyers competing for 5 items, this is sufficient to handle the contention.

The event bus uses @noddde/nats (NatsEventBus) with JetStream enabled. The NATS server is started automatically via Testcontainers alongside PostgreSQL. Set EVENT_BUS=in-memory to skip the NATS container and use EventEmitterEventBus instead. See Event Bus Adapters for the full adapter reference.

What Happens at Runtime

1. Create flash sale: 5 items in stock
2. 8 buyers fire PurchaseItem concurrently (Promise.all)
3. All 8 load version 0 (the initial state after CreateFlashSale)
4. Buyer 1 saves at version 1 → success
5. Buyers 2-8 attempt to save at version 1 → ConcurrencyError (version is now 1)
6. Framework retries: re-loads version 1, re-runs handler
7. Buyer 2 saves at version 2 → success
8. Process continues until stock = 0
9. Remaining buyers get PurchaseRejected (out_of_stock) — no ConcurrencyError

All 8 commands complete successfully. No errors are thrown to the caller. The framework handles contention internally.
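The sequence above can be reproduced with a small in-memory simulation. This is a sketch under stated assumptions, not the sample's code: `InMemoryStream`, `purchase`, `runFlashSale`, and `tick` are hypothetical names, and a conflict is signalled by a `false` return value instead of a thrown ConcurrencyError. The sketch appends rejections to the stream as events too, so they take part in version checks. Its default retry budget of `buyers - 1` is the worst case for this model, because each lost attempt means some other buyer's single command committed. That budget is a property of the sketch and not a statement about the sample's configuration.

```typescript
type Outcome = "ItemPurchased" | "PurchaseRejected";

// Yields to the event loop so that concurrent buyers can interleave.
const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 0));

// Hypothetical in-memory event stream with an optimistic version check.
class InMemoryStream {
  private readonly events: Outcome[] = [];
  private readonly initialStock: number;

  constructor(initialStock: number) {
    this.initialStock = initialStock;
  }

  async load(): Promise<{ version: number; stock: number }> {
    await tick(); // simulated I/O
    const sold = this.events.filter((e) => e === "ItemPurchased").length;
    return { version: this.events.length, stock: this.initialStock - sold };
  }

  // Appends only if the stream is still at the version the caller loaded.
  async tryAppend(expectedVersion: number, event: Outcome): Promise<boolean> {
    await tick(); // simulated I/O
    if (this.events.length !== expectedVersion) return false;
    this.events.push(event);
    return true;
  }
}

// One buyer's command: load, decide, append, and retry on conflict.
async function purchase(
  stream: InMemoryStream,
  maxRetries: number,
): Promise<{ outcome: Outcome; retries: number }> {
  for (let retries = 0; retries <= maxRetries; retries++) {
    const { version, stock } = await stream.load();
    const outcome: Outcome = stock > 0 ? "ItemPurchased" : "PurchaseRejected";
    if (await stream.tryAppend(version, outcome)) return { outcome, retries };
  }
  throw new Error("retries exhausted");
}

async function runFlashSale(buyers = 8, stock = 5, maxRetries = buyers - 1) {
  const stream = new InMemoryStream(stock);
  return Promise.all(
    Array.from({ length: buyers }, () => purchase(stream, maxRetries)),
  );
}

runFlashSale().then((results) => console.table(results));
```

Whatever the interleaving, the version check guarantees five ItemPurchased and three PurchaseRejected outcomes for 8 buyers and 5 items. Only the per-buyer retry counts depend on timing.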

Running the Sample

cd samples/sample-flash-sale && yarn start

Requires Docker running (Testcontainers spins up PostgreSQL and NATS automatically).

Key Patterns Demonstrated

  • Optimistic concurrency with maxRetries: contention resolved via retry, not locking
  • PostgreSQL unique constraint: (aggregate_name, aggregate_id, sequence_number) catches concurrent inserts at the database level
  • Rejection events: PurchaseRejected is a domain event, not an exception. The aggregate records what happened.
  • Distributed event bus: @noddde/nats with JetStream durable subscriptions
  • Testcontainers: real PostgreSQL + NATS containers, no external setup
  • Drizzle adapter: createDrizzlePersistence with PostgreSQL schema
