Flash Sale — Concurrency Strategies
Handling concurrent purchases with optimistic and pessimistic concurrency, snapshots, idempotency, and the outbox pattern.
A flash sale where limited stock sells first-come-first-served. Multiple buyers purchase the same item concurrently. This sample demonstrates both optimistic concurrency with retry and pessimistic concurrency with advisory locks — showing when to choose each strategy.
Full source: samples/sample-flash-sale — clone and run locally with
npm start.
Stack: TypeORM + PostgreSQL | @noddde/nats (both optimistic and pessimistic entry points)
Why Optimistic
The decide handler is trivial: check stock, decrement, return event. If two buyers load the same version simultaneously, one succeeds and the other gets a ConcurrencyError. The framework automatically retries — re-loading the latest stock, re-running the check, and re-attempting the save. Total cost of a retry is microseconds.
Pessimistic locking would add unnecessary overhead here. Acquiring and releasing a database lock for every purchase adds latency that matters in a high-throughput flash sale.
The Aggregate
The aggregate uses extracted handlers — each decide handler and evolve handler lives in its own file, typed with InferDecideHandler and InferEvolveHandler.
Decide handlers (extracted)
// deciders/decide-create-flash-sale.ts
import type { InferDecideHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";
export const decideCreateFlashSale: InferDecideHandler<
FlashSaleItemTypes,
"CreateFlashSale"
> = (command) => ({
name: "FlashSaleCreated",
payload: {
itemId: command.targetAggregateId,
initialStock: command.payload.initialStock,
},
});// deciders/decide-purchase-item.ts
import type { InferDecideHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";
export const decidePurchaseItem: InferDecideHandler<
FlashSaleItemTypes,
"PurchaseItem"
> = (command, state) => {
if (state.stock <= 0) {
return {
name: "PurchaseRejected",
payload: { buyerId: command.payload.buyerId, reason: "out_of_stock" },
};
}
return {
name: "ItemPurchased",
payload: {
buyerId: command.payload.buyerId,
quantity: command.payload.quantity,
},
};
};The PurchaseItem handler either produces ItemPurchased (decrements stock) or PurchaseRejected (stock exhausted). Both are valid domain events — rejection is not an error, it is a recorded fact.
Evolve handlers (extracted)
// evolvers/evolve-flash-sale-created.ts
import type { InferEvolveHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";
export const evolveFlashSaleCreated: InferEvolveHandler<
FlashSaleItemTypes,
"FlashSaleCreated"
> = (payload) => ({
stock: payload.initialStock,
sold: 0,
buyers: [],
});// evolvers/evolve-item-purchased.ts
import type { InferEvolveHandler } from "@noddde/core";
import type { FlashSaleItemTypes } from "../flash-sale-item";
export const evolveItemPurchased: InferEvolveHandler<
FlashSaleItemTypes,
"ItemPurchased"
> = (payload, state) => ({
stock: state.stock - payload.quantity,
sold: state.sold + payload.quantity,
buyers: [...state.buyers, payload.buyerId],
});The PurchaseRejected evolve handler follows the same pattern — it returns state unchanged (no-op).
Aggregate definition (wiring)
// flash-sale-item.ts
import { defineAggregate } from "@noddde/core";
import type { AggregateTypes, Infrastructure } from "@noddde/core";
export type FlashSaleItemTypes = AggregateTypes & {
state: FlashSaleState;
events: FlashSaleEvent;
commands: FlashSaleCommand;
infrastructure: Infrastructure;
};
export const FlashSaleItem = defineAggregate<FlashSaleItemTypes>({
initialState: initialFlashSaleState,
decide: {
CreateFlashSale: decideCreateFlashSale,
PurchaseItem: decidePurchaseItem,
},
evolve: {
FlashSaleCreated: evolveFlashSaleCreated,
ItemPurchased: evolveItemPurchased,
PurchaseRejected: evolvePurchaseRejected,
},
});Domain Configuration
import {
defineDomain,
wireDomain,
InMemoryCommandBus,
InMemoryQueryBus,
} from "@noddde/engine";
import { NatsEventBus } from "@noddde/nats";
const flashSaleDomain = defineDomain({
writeModel: { aggregates: { FlashSaleItem } },
readModel: { projections: {} },
});
const domain = await wireDomain(flashSaleDomain, {
aggregates: {
persistence: () => drizzleInfra.eventSourcedPersistence,
concurrency: { maxRetries: 5 },
},
buses: () => ({
commandBus: new InMemoryCommandBus(),
eventBus: new NatsEventBus({
servers: `localhost:${natsContainer.getMappedPort(4222)}`,
consumerGroup: "flash-sale",
streamName: "flash-sale-events",
subjectPrefix: "flash-sale.",
}),
queryBus: new InMemoryQueryBus(),
}),
unitOfWork: () => drizzleInfra.unitOfWorkFactory,
});maxRetries: 5 means the framework retries up to 5 times on ConcurrencyError. For 8 concurrent buyers competing for 5 items, this is sufficient to handle the contention.
The event bus uses @noddde/nats (NatsEventBus) with JetStream enabled, started automatically via TestContainers alongside PostgreSQL. Set EVENT_BUS=in-memory to skip the NATS container and use EventEmitterEventBus instead. See Event Bus Adapters for the full adapter reference.
What Happens at Runtime
1. Create flash sale: 5 items in stock
2. 8 buyers fire PurchaseItem concurrently (Promise.all)
3. All 8 load version 0 (the initial state after CreateFlashSale)
4. Buyer 1 saves at version 1 → success
5. Buyers 2-8 attempt to save at version 1 → ConcurrencyError (version is now 1)
6. Framework retries: re-loads version 1, re-runs handler
7. Buyer 2 saves at version 2 → success
8. Process continues until stock = 0
9. Remaining buyers get PurchaseRejected (out_of_stock) — no ConcurrencyErrorAll 8 commands complete successfully. No errors are thrown to the caller. The framework handles contention internally.
Running the Sample
cd samples/sample-flash-sale && yarn startRequires Docker running (Testcontainers spins up PostgreSQL and NATS automatically).
Key Patterns Demonstrated
- Optimistic concurrency with
maxRetries— contention resolved via retry, not locking - PostgreSQL unique constraint —
(aggregate_name, aggregate_id, sequence_number)catches concurrent inserts at the database level - Rejection events —
PurchaseRejectedis a domain event, not an exception. The aggregate records what happened. - Distributed event bus —
@noddde/natswith JetStream durable subscriptions - Testcontainers — real PostgreSQL + NATS containers, no external setup
- Drizzle adapter —
createDrizzlePersistencewith PostgreSQL schema