Event Bus Adapters
Production-ready event buses for distributed deployments using Kafka, NATS, RabbitMQ, or the built-in in-memory EventEmitter.
noddde ships with an in-memory `EventEmitterEventBus` for single-process development and three message-broker adapters for distributed production deployments. All implementations conform to the `EventBus` interface from `@noddde/core`, which extends `Closeable` for automatic lifecycle management. Broker adapters also implement `Connectable` — the Domain auto-calls `connect()` during wiring, so you never need to manage the connection lifecycle manually.
Available Adapters
| Package | Broker | Client Library | Delivery Guarantee |
|---|---|---|---|
| @noddde/engine | — | Node.js EventEmitter | In-process only |
| @noddde/kafka | Kafka | kafkajs | At-least-once |
| @noddde/nats | NATS | nats | At-least-once (JetStream) |
| @noddde/rabbitmq | RabbitMQ | amqplib | At-least-once |
The EventBus Interface
Every adapter implements this interface:
```typescript
import type { Closeable } from "@noddde/core";

type AsyncEventHandler = (event: Event) => void | Promise<void>;

interface EventBus extends Closeable {
  dispatch<TEvent extends Event>(event: TEvent): Promise<void>;
  on(eventName: string, handler: AsyncEventHandler): void;
}
```

- `dispatch(event)` — publishes a domain event to all subscribers.
- `on(eventName, handler)` — registers a handler for a given event name. Multiple handlers per event are supported (fan-out).
- `close()` — inherited from `Closeable`. Unsubscribes all handlers and releases connections. Called automatically by the Domain on shutdown.
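To make the contract concrete, here is a minimal in-process implementation sketch. This is illustrative, not the shipped `EventEmitterEventBus`; local stand-ins for the `@noddde/core` types are defined inline so the snippet is self-contained.

```typescript
// Local stand-ins for the @noddde/core types (assumption: an event carries a name).
interface Event {
  name: string;
  payload?: unknown;
}
type AsyncEventHandler = (event: Event) => void | Promise<void>;

// Minimal bus demonstrating the contract: fan-out on(), async dispatch(),
// idempotent close(). Hypothetical class, for illustration only.
class ToyEventBus {
  private readonly handlers = new Map<string, AsyncEventHandler[]>();

  on(eventName: string, handler: AsyncEventHandler): void {
    const existing = this.handlers.get(eventName) ?? [];
    this.handlers.set(eventName, [...existing, handler]);
  }

  async dispatch<TEvent extends Event>(event: TEvent): Promise<void> {
    const handlers = this.handlers.get(event.name) ?? [];
    // Fan-out: every handler registered for this event name runs in parallel.
    await Promise.all(handlers.map((h) => h(event)));
  }

  async close(): Promise<void> {
    this.handlers.clear(); // idempotent: clearing twice is a no-op
  }
}
```

Note that `dispatch()` resolves only after every handler settles, which is the same ordering guarantee the broker adapters rely on before committing or acknowledging a message.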
In-Memory (EventEmitterEventBus)
The default event bus, included in @noddde/engine. No external dependencies required.
```typescript
import { EventEmitterEventBus } from "@noddde/engine";

const eventBus = new EventEmitterEventBus();
```

This is the right choice for development, testing, and single-process deployments. Events are dispatched synchronously within the same process — there is no network I/O, no serialization, and no delivery guarantee beyond the process boundary.
Kafka
Installation
```shell
yarn add @noddde/kafka kafkajs
```

Configuration
```typescript
import { KafkaEventBus } from "@noddde/kafka";

const eventBus = new KafkaEventBus({
  brokers: ["localhost:9092"],
  clientId: "my-service",
  groupId: "my-service-group",
  topicPrefix: "noddde.", // optional — topics become "noddde.AccountCreated"
  sessionTimeout: 60000, // optional — increase for slow handlers
  heartbeatInterval: 5000, // optional — must be < sessionTimeout / 3
  resilience: { maxAttempts: 11, initialDelayMs: 500, maxRetries: 5 }, // optional — connection + delivery resilience
});
```

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| `brokers` | string[] | Yes | — | Kafka broker addresses |
| `clientId` | string | Yes | — | Client identifier |
| `groupId` | string | Yes | — | Consumer group ID. Events fan out across different group IDs |
| `topicPrefix` | string | No | "" | Prefix prepended to event names to form topic names |
| `sessionTimeout` | number | No | 30000 | Consumer session timeout in ms. Increase if handlers are slow |
| `heartbeatInterval` | number | No | 3000 | Consumer heartbeat interval in ms. Must be less than sessionTimeout/3 |
| `resilience` | BrokerResilience | No | maxAttempts=6, initialDelayMs=300, maxDelayMs=30000 | Connection resilience config. maxAttempts maps to kafkajs retries (minus 1), initialDelayMs to initialRetryTime, maxDelayMs to maxRetryTime. maxRetries limits per-message delivery attempts |
How It Works
- Publishing: Events are serialized as JSON and sent to a Kafka topic derived from the event name (`${topicPrefix}${event.name}`). If the event has a `metadata.correlationId`, it is used as the message key for partition-level ordering.
- Subscribing: Handlers registered via `on()` receive messages through a Kafka consumer. Handlers registered before `connect()` are buffered and subscriptions are created when the connection is established. Multiple handlers for the same event are invoked in parallel via `Promise.all()` — independent handlers (projections, sagas) do not block each other.
- Delivery: At-least-once. The consumer runs with `autoCommit: false` — offsets are committed explicitly via `consumer.commitOffsets()` only after all handlers complete successfully. If any handler fails, the offset is not committed and the message will be redelivered. After a successful commit, the in-memory delivery count entry for the message is pruned to prevent unbounded memory growth. Consumers must be idempotent.
- Poison message protection: Malformed messages that fail JSON deserialization are logged and skipped (offset committed), preventing corrupt data from blocking the partition. If `resilience.maxRetries` is configured, messages that exceed the delivery attempt limit are also skipped with a warning.
- Connection resilience: The optional `resilience` config (`BrokerResilience` from `@noddde/core`) is mapped to kafkajs retry options for automatic reconnection on broker unavailability. `maxAttempts` maps to kafkajs `retries` (minus 1, since kafkajs counts retries after the first attempt), `initialDelayMs` to `initialRetryTime`, and `maxDelayMs` to `maxRetryTime`. The `sessionTimeout` and `heartbeatInterval` options tune consumer rebalance behavior for slow handlers. Concurrent `connect()` calls are deduplicated via a connection promise mutex to prevent parallel connection attempts.
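The kafkajs option mapping described above can be sketched as a pure helper. This is illustrative (`toKafkaRetry` is a hypothetical name); the `BrokerResilience` field names follow this page, the defaults follow the options table, and the kafkajs retry option names (`retries`, `initialRetryTime`, `maxRetryTime`) are the library's documented ones.

```typescript
// Field names as documented on this page; all fields optional.
interface BrokerResilience {
  maxAttempts?: number;
  initialDelayMs?: number;
  maxDelayMs?: number;
  maxRetries?: number; // per-message delivery limit; not part of connection retry
}

// Map BrokerResilience onto the kafkajs retry options, following the rules
// described above. Hypothetical helper, shown for illustration.
function toKafkaRetry(r: BrokerResilience) {
  return {
    // kafkajs counts retries *after* the first attempt, hence the minus 1.
    retries: (r.maxAttempts ?? 6) - 1,
    initialRetryTime: r.initialDelayMs ?? 300,
    maxRetryTime: r.maxDelayMs ?? 30000,
  };
}
```

For example, `maxAttempts: 11` yields `retries: 10`, i.e. one initial attempt plus ten retries.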
Wiring with Domain
```typescript
import { wireDomain } from "@noddde/engine";
import { KafkaEventBus } from "@noddde/kafka";

const domain = await wireDomain(myDomain, {
  buses: () => ({
    eventBus: new KafkaEventBus({
      brokers: ["localhost:9092"],
      clientId: "my-service",
      groupId: "my-service-group",
    }),
  }),
});
```

The Domain auto-calls `connect()` on the event bus during wiring (via `Connectable` auto-discovery) and `close()` on shutdown (via `Closeable` auto-discovery). No manual lifecycle management is needed.
NATS
Installation
```shell
yarn add @noddde/nats nats
```

Configuration
```typescript
import { NatsEventBus } from "@noddde/nats";

const eventBus = new NatsEventBus({
  servers: "localhost:4222",
  streamName: "noddde-events", // optional — enables JetStream durable subscriptions
  subjectPrefix: "noddde.", // optional — subjects become "noddde.AccountCreated"
  prefetchCount: 256, // optional — backpressure control
  resilience: { maxAttempts: -1, initialDelayMs: 2000, maxRetries: 10 }, // optional — connection + delivery resilience
});
```

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| `servers` | string \| string[] | Yes | — | NATS server URL(s) |
| `streamName` | string | No | — | JetStream stream name for durable subscriptions |
| `subjectPrefix` | string | No | "" | Prefix prepended to event names to form subjects |
| `prefetchCount` | number | No | 256 | Maximum unacknowledged messages per consumer. Maps to JetStream maxAckPending. Provides backpressure control |
| `resilience` | BrokerResilience | No | — | Connection resilience. maxAttempts maps to maxReconnectAttempts (-1 = infinite). initialDelayMs maps to reconnectTimeWait (default: 2000ms). maxDelayMs is ignored (NATS uses fixed intervals). maxRetries maps to JetStream maxDeliver |
How It Works
- Publishing: Events are serialized as JSON, encoded as `Uint8Array`, and published to a NATS subject (`${subjectPrefix}${event.name}`). When a `streamName` is configured, JetStream publish acknowledgment is awaited.
- Subscribing: JetStream consumers with durable subscriptions are created for each registered event name. Handlers registered before `connect()` are buffered. Multiple handlers for the same event are invoked in parallel via `Promise.all()` — independent handlers do not block each other.
- Delivery: At-least-once with JetStream. Messages are acknowledged only after all handlers complete successfully. If any handler fails, the message is explicitly nacked for immediate redelivery. Consumers must be idempotent.
- Poison message protection: Malformed messages that fail JSON deserialization are permanently discarded via `msg.term()`, preventing corrupt data from blocking the subscription via infinite redelivery. If `resilience.maxRetries` is configured, it maps to the JetStream `maxDeliver` consumer option, limiting how many times NATS will redeliver a message before discarding it server-side.
- Backpressure: The `prefetchCount` option controls how many unacknowledged messages the server delivers to each consumer (mapped to JetStream `maxAckPending`), providing natural backpressure when handlers are slow.
- Connection resilience: NATS native reconnection is enabled by default. The optional `resilience` config maps `maxAttempts` to NATS `maxReconnectAttempts` and `initialDelayMs` to `reconnectTimeWait` for automatic recovery on connection loss. `maxDelayMs` is ignored because NATS uses fixed-interval reconnection (no exponential backoff).
- Shutdown: `close()` calls `nc.drain()` to process in-flight messages before disconnecting.
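The acknowledgment rules above can be condensed into a pure decision function. This is a sketch of the rules this page describes, not the adapter's actual code; the three outcomes correspond to the JetStream message methods `msg.ack()`, `msg.nak()`, and `msg.term()`.

```typescript
type JsAckAction = "ack" | "nak" | "term";

// Decide the JetStream acknowledgment for a received message:
// term() poison messages, nak() handler failures, ack() on success.
function decideAck(opts: {
  parsedOk: boolean;         // did JSON deserialization succeed?
  handlersSucceeded: boolean; // did every handler complete without throwing?
}): JsAckAction {
  if (!opts.parsedOk) return "term";         // poison message: discard permanently
  if (!opts.handlersSucceeded) return "nak"; // request immediate redelivery
  return "ack";                              // all handlers completed
}
```

The key asymmetry: a deserialization failure is terminal (redelivery can never fix malformed bytes), while a handler failure is transient (redelivery plus idempotent handlers eventually converges).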
Wiring with Domain
```typescript
import { wireDomain } from "@noddde/engine";
import { NatsEventBus } from "@noddde/nats";

const domain = await wireDomain(myDomain, {
  buses: () => ({
    eventBus: new NatsEventBus({
      servers: "localhost:4222",
      streamName: "noddde-events",
    }),
  }),
});
```

RabbitMQ
Installation
```shell
yarn add @noddde/rabbitmq amqplib
yarn add -D @types/amqplib
```

Configuration
```typescript
import { RabbitMqEventBus } from "@noddde/rabbitmq";

const eventBus = new RabbitMqEventBus({
  url: "amqp://localhost:5672",
  exchangeName: "noddde.events", // optional (this is the default)
  exchangeType: "topic", // optional — "topic" (default) or "fanout"
  queuePrefix: "noddde", // optional — queues become "noddde.AccountCreated"
  prefetchCount: 10, // optional — backpressure control
  resilience: {
    // optional — connection resilience with exponential backoff
    maxAttempts: 3,
    initialDelayMs: 1000,
    maxDelayMs: 30000,
  },
});
```

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| `url` | string | Yes | — | AMQP connection URL |
| `exchangeName` | string | No | "noddde.events" | Exchange name for event publishing |
| `exchangeType` | "topic" \| "fanout" | No | "topic" | Exchange type |
| `queuePrefix` | string | No | "noddde" | Queue name prefix |
| `prefetchCount` | number | No | 10 | Maximum unacknowledged messages per consumer. Provides backpressure control |
| `resilience.maxAttempts` | number | No | 3 | Maximum number of connection attempts |
| `resilience.initialDelayMs` | number | No | 1000 | Initial delay between retries in milliseconds. Doubles on each retry |
| `resilience.maxDelayMs` | number | No | 30000 | Maximum delay between retries in milliseconds |
How It Works
- Publishing: Events are serialized as JSON and published to the configured exchange with the event name as the routing key. Messages are marked `{ persistent: true }` for durability. The bus uses a confirm channel (`createConfirmChannel`) and awaits `waitForConfirms()` after each publish, guaranteeing the broker has accepted the message before `dispatch()` resolves.
- Subscribing: Each event name gets a dedicated durable queue (`${queuePrefix}.${eventName}`) bound to the exchange. Multiple handlers for the same event are invoked in parallel via `Promise.all()` — independent handlers do not block each other. Consumers must be idempotent since handlers that completed before a failure will re-execute on redelivery.
- Delivery: At-least-once with manual acknowledgment. On handler success, the message is acked. On handler failure, the message is nacked with requeue for redelivery.
- Poison message protection: Malformed messages that fail JSON deserialization are acknowledged and skipped, preventing infinite nack/requeue loops. If `resilience.maxRetries` is configured, messages that exceed the delivery count limit (tracked via an in-memory counter per message) are also acknowledged and discarded. The in-memory approach is used instead of `x-death` headers because `x-death` requires a dead-letter exchange to be configured.
- Backpressure: The `prefetchCount` option limits how many unacknowledged messages the broker sends to this consumer, providing natural backpressure when handlers are slow.
- Connection resilience: The `resilience` option enables retry with exponential backoff on initial connection failure. Delay doubles on each attempt, capped at `maxDelayMs`. If all attempts fail, the last error is thrown. After a successful connection, `error` and `close` handlers are registered on the AMQP connection to detect unexpected disconnections. On unexpected close, the bus automatically attempts reconnection using the same backoff configuration, then re-establishes all consumers.
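The doubling-with-cap schedule can be expressed as a one-line helper. This is a sketch of the backoff as described above, not the adapter's actual implementation; `backoffDelay` is a hypothetical name.

```typescript
// Delay before the nth retry attempt (0-based): starts at initialDelayMs,
// doubles on each attempt, and is capped at maxDelayMs.
function backoffDelay(
  attempt: number,
  initialDelayMs: number,
  maxDelayMs: number,
): number {
  return Math.min(initialDelayMs * 2 ** attempt, maxDelayMs);
}
```

With the defaults above (`initialDelayMs: 1000`, `maxDelayMs: 30000`), the sequence is 1000, 2000, 4000, 8000, 16000, then 30000 from the sixth attempt onward.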
Wiring with Domain
```typescript
import { wireDomain } from "@noddde/engine";
import { RabbitMqEventBus } from "@noddde/rabbitmq";

const domain = await wireDomain(myDomain, {
  buses: () => ({
    eventBus: new RabbitMqEventBus({
      url: "amqp://localhost:5672",
    }),
  }),
});
```

Connection Lifecycle
All three broker adapters implement `Connectable` from `@noddde/core`. The Domain manages the full lifecycle automatically:

- Construct — you create the bus with configuration in the `buses()` factory.
- Register handlers — the Domain registers event handlers via `on()` for projections, sagas, and standalone event handlers. All adapter implementations buffer handlers registered before `connect()`.
- Auto-connect — after all handler registration is complete, the Domain detects `Connectable` buses and calls `connect()` automatically. Connecting after handler registration prevents a race condition where broker-backed buses deliver queued messages before handlers are ready. If the broker is unreachable, wiring fails fast with a clear error.
- Dispatch events — `dispatch()` is called by the engine during the command lifecycle.
- Auto-close — the Domain calls `close()` on shutdown (via `Closeable` auto-discovery). Unsubscribes handlers, disconnects, and releases resources. Idempotent.
You never need to call `connect()` or `close()` manually.
Resilience
All three broker adapters accept an optional `resilience` field of type `BrokerResilience` (from `@noddde/core`). This provides a consistent configuration shape across adapters for both connection-level and message-level resilience.
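Pieced together from the fields referenced on this page, the shared shape is roughly the following (a sketch; consult `@noddde/core` for the authoritative definition):

```typescript
// Inferred from the options documented on this page; all fields optional.
interface BrokerResilience {
  maxAttempts?: number;    // connection attempts (adapter-specific mapping)
  initialDelayMs?: number; // initial/base delay between retries, in ms
  maxDelayMs?: number;     // backoff cap in ms (ignored by the NATS adapter)
  maxRetries?: number;     // per-message delivery attempt limit
}
```

The same object is interpreted per adapter, as the tables below show.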
Connection Resilience
| Field | Kafka | NATS | RabbitMQ |
|---|---|---|---|
| maxAttempts | retries (minus 1) | maxReconnectAttempts (-1 = infinite) | Connection retry attempts |
| initialDelayMs | initialRetryTime | reconnectTimeWait (fixed interval) | Base delay (exponential backoff) |
| maxDelayMs | maxRetryTime | Ignored (fixed intervals) | Backoff cap |
Message Delivery Resilience
The `maxRetries` field limits per-message delivery attempts, preventing poison messages from blocking consumers indefinitely:
| Adapter | Mechanism | What Happens After Limit |
|---|---|---|
| Kafka | Consumer-side delivery count tracking | Message offset committed (skipped) |
| NATS | JetStream maxDeliver consumer option | NATS discards the message server-side |
| RabbitMQ | In-memory delivery count tracking | Message acked (discarded) |
All adapters also protect against deserialization failures (malformed JSON). Poison messages that cannot be parsed are logged and discarded without blocking the consumer.
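The consumer-side delivery count tracking used by the Kafka and RabbitMQ adapters can be sketched as a small counter class. This is illustrative (the adapters' internal structure may differ); the message key is a hypothetical unique identifier such as topic-partition-offset or a message ID.

```typescript
// In-memory per-message delivery counter. Entries are pruned on success
// so the map does not grow without bound (as the Kafka adapter describes).
class DeliveryTracker {
  private readonly counts = new Map<string, number>();

  // Record one delivery attempt; returns true when the message has now
  // exceeded maxRetries and should be skipped/discarded.
  exceeded(messageKey: string, maxRetries: number): boolean {
    const n = (this.counts.get(messageKey) ?? 0) + 1;
    this.counts.set(messageKey, n);
    return n > maxRetries;
  }

  // Prune the entry after a successful commit/ack.
  succeeded(messageKey: string): void {
    this.counts.delete(messageKey);
  }
}
```

One consequence of in-memory tracking: the count resets on process restart, so the limit is per-process, not global. NATS avoids this by enforcing `maxDeliver` server-side.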
Choosing an Adapter
| Scenario | Recommended Adapter |
|---|---|
| Development and testing | EventEmitterEventBus (in-memory) |
| Single-process production | EventEmitterEventBus (in-memory) |
| High-throughput event streaming | KafkaEventBus |
| Lightweight distributed messaging | NatsEventBus |
| Reliable message brokering with flexible routing | RabbitMqEventBus |
All broker adapters provide at-least-once delivery. Choose based on your existing infrastructure and operational expertise rather than feature differences.
Custom Event Bus
You can implement the EventBus interface directly for any message transport. If your bus needs an async connection step, also implement Connectable — the Domain will auto-call connect() during wiring:
```typescript
import type {
  EventBus,
  AsyncEventHandler,
  Connectable,
  Event,
} from "@noddde/core";

export class RedisEventBus implements EventBus, Connectable {
  private readonly handlers = new Map<string, AsyncEventHandler[]>();

  async connect(): Promise<void> {
    // Connect to Redis
  }

  on(eventName: string, handler: AsyncEventHandler): void {
    const existing = this.handlers.get(eventName) ?? [];
    this.handlers.set(eventName, [...existing, handler]);
  }

  async dispatch<TEvent extends Event>(event: TEvent): Promise<void> {
    // Publish to Redis Streams, Pub/Sub, etc.
  }

  async close(): Promise<void> {
    this.handlers.clear();
    // Disconnect from Redis
  }
}
```

The requirements are:

- `dispatch()` publishes to all subscribers for the event name.
- `on()` supports multiple handlers per event name (fan-out).
- `close()` releases all resources and is idempotent.
- `connect()` (if implementing `Connectable`) establishes the connection and is idempotent.