
Event Bus Adapters

Production-ready event buses for distributed deployments (Kafka, NATS, RabbitMQ), plus the built-in in-memory EventEmitter bus for single-process use.

noddde ships with an in-memory EventEmitterEventBus for single-process development and three message-broker adapters for distributed production deployments. All implementations conform to the EventBus interface from @noddde/core, which extends Closeable for automatic lifecycle management. Broker adapters also implement Connectable — the Domain auto-calls connect() during wiring, so you never need to manage the connection lifecycle manually.

Available Adapters

| Package | Broker | Client Library | Delivery Guarantee |
| --- | --- | --- | --- |
| @noddde/engine | (none) | Node.js EventEmitter | In-process only |
| @noddde/kafka | Kafka | kafkajs | At-least-once |
| @noddde/nats | NATS | nats | At-least-once (JetStream) |
| @noddde/rabbitmq | RabbitMQ | amqplib | At-least-once |

The EventBus Interface

Every adapter implements this interface:

import type { Closeable, Event } from "@noddde/core";

type AsyncEventHandler = (event: Event) => void | Promise<void>;

interface EventBus extends Closeable {
  dispatch<TEvent extends Event>(event: TEvent): Promise<void>;
  on(eventName: string, handler: AsyncEventHandler): void;
}
  • dispatch(event) — publishes a domain event to all subscribers.
  • on(eventName, handler) — registers a handler for a given event name. Multiple handlers per event are supported (fan-out).
  • close() — inherited from Closeable. Unsubscribes all handlers and releases connections. Called automatically by the Domain on shutdown.
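The fan-out contract can be illustrated with a minimal in-memory sketch (for illustration only, not the shipped implementation; the types are simplified stand-ins for the @noddde/core definitions):

```typescript
// Minimal sketch of the EventBus fan-out contract: multiple handlers
// per event name, all invoked in parallel on dispatch.
type Event = { name: string; payload?: unknown };
type AsyncEventHandler = (event: Event) => void | Promise<void>;

class MiniBus {
  private readonly handlers = new Map<string, AsyncEventHandler[]>();

  on(eventName: string, handler: AsyncEventHandler): void {
    const existing = this.handlers.get(eventName) ?? [];
    this.handlers.set(eventName, [...existing, handler]);
  }

  async dispatch(event: Event): Promise<void> {
    const handlers = this.handlers.get(event.name) ?? [];
    await Promise.all(handlers.map((handler) => handler(event)));
  }
}
```

Registering two handlers for the same event name and dispatching one event invokes both, which is the fan-out behavior every adapter must preserve.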

In-Memory (EventEmitterEventBus)

The default event bus, included in @noddde/engine. No external dependencies required.

import { EventEmitterEventBus } from "@noddde/engine";

const eventBus = new EventEmitterEventBus();

This is the right choice for development, testing, and single-process deployments. Events are dispatched synchronously within the same process — there is no network I/O, no serialization, and no delivery guarantee beyond the process boundary.

Kafka

Installation

yarn add @noddde/kafka kafkajs

Configuration

import { KafkaEventBus } from "@noddde/kafka";

const eventBus = new KafkaEventBus({
  brokers: ["localhost:9092"],
  clientId: "my-service",
  groupId: "my-service-group",
  topicPrefix: "noddde.", // optional — topics become "noddde.AccountCreated"
  sessionTimeout: 60000, // optional — increase for slow handlers
  heartbeatInterval: 5000, // optional — must be < sessionTimeout / 3
  resilience: { maxAttempts: 11, initialDelayMs: 500, maxRetries: 5 }, // optional — connection + delivery resilience
});

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| brokers | string[] | Yes | | Kafka broker addresses |
| clientId | string | Yes | | Client identifier |
| groupId | string | Yes | | Consumer group ID. Events fan out across different group IDs |
| topicPrefix | string | No | "" | Prefix prepended to event names to form topic names |
| sessionTimeout | number | No | 30000 | Consumer session timeout in ms. Increase if handlers are slow |
| heartbeatInterval | number | No | 3000 | Consumer heartbeat interval in ms. Must be less than sessionTimeout/3 |
| resilience | BrokerResilience | No | maxAttempts=6, initialDelayMs=300, maxDelayMs=30000 | Connection resilience config. maxAttempts maps to kafkajs retries (minus 1), initialDelayMs to initialRetryTime, maxDelayMs to maxRetryTime. maxRetries limits per-message delivery attempts |

How It Works

  • Publishing: Events are serialized as JSON and sent to a Kafka topic derived from the event name (${topicPrefix}${event.name}). If the event has a metadata.correlationId, it is used as the message key for partition-level ordering.
  • Subscribing: Handlers registered via on() receive messages through a Kafka consumer. Handlers registered before connect() are buffered and subscriptions are created when the connection is established. Multiple handlers for the same event are invoked in parallel via Promise.all() — independent handlers (projections, sagas) do not block each other.
  • Delivery: At-least-once. The consumer runs with autoCommit: false — offsets are committed explicitly via consumer.commitOffsets() only after all handlers complete successfully. If any handler fails, the offset is not committed and the message will be redelivered. After a successful commit, the in-memory delivery count entry for the message is pruned to prevent unbounded memory growth. Consumers must be idempotent.
  • Poison message protection: Malformed messages that fail JSON deserialization are logged and skipped (offset committed), preventing corrupt data from blocking the partition. If resilience.maxRetries is configured, messages that exceed the delivery attempt limit are also skipped with a warning.
  • Connection resilience: The optional resilience config (BrokerResilience from @noddde/core) is mapped to kafkajs retry options for automatic reconnection on broker unavailability. maxAttempts maps to kafkajs retries (minus 1, since kafkajs counts retries after the first attempt), initialDelayMs to initialRetryTime, and maxDelayMs to maxRetryTime. The sessionTimeout and heartbeatInterval options tune consumer rebalance behavior for slow handlers. Concurrent connect() calls are deduplicated via a connection promise mutex to prevent parallel connection attempts.
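Because delivery is at-least-once, every handler must tolerate redelivery. One common pattern, sketched below, deduplicates on a stable event identifier; the `eventId` field and the in-memory set are illustrative assumptions (a real projection would track processed IDs in its own store, e.g. via a unique index):

```typescript
// Sketch of an idempotent event handler: skip events whose ID was
// already processed, so a redelivery after a failed offset commit
// does not apply the same update twice.
type Event = { name: string; eventId: string };

class IdempotentProjection {
  private readonly seen = new Set<string>();
  applied = 0;

  async handle(event: Event): Promise<void> {
    if (this.seen.has(event.eventId)) return; // duplicate redelivery: skip
    this.seen.add(event.eventId);
    this.applied += 1; // apply the projection update exactly once
  }
}
```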

Wiring with Domain

import { wireDomain } from "@noddde/engine";
import { KafkaEventBus } from "@noddde/kafka";

const domain = await wireDomain(myDomain, {
  buses: () => ({
    eventBus: new KafkaEventBus({
      brokers: ["localhost:9092"],
      clientId: "my-service",
      groupId: "my-service-group",
    }),
  }),
});

The Domain auto-calls connect() on the event bus during wiring (via Connectable auto-discovery) and close() on shutdown (via Closeable auto-discovery). No manual lifecycle management needed.

NATS

Installation

yarn add @noddde/nats nats

Configuration

import { NatsEventBus } from "@noddde/nats";

const eventBus = new NatsEventBus({
  servers: "localhost:4222",
  streamName: "noddde-events", // optional — enables JetStream durable subscriptions
  subjectPrefix: "noddde.", // optional — subjects become "noddde.AccountCreated"
  prefetchCount: 256, // optional — backpressure control
  resilience: { maxAttempts: -1, initialDelayMs: 2000, maxRetries: 10 }, // optional — connection + delivery resilience
});

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| servers | string \| string[] | Yes | | NATS server URL(s) |
| streamName | string | No | | JetStream stream name for durable subscriptions |
| subjectPrefix | string | No | "" | Prefix prepended to event names to form subjects |
| prefetchCount | number | No | 256 | Maximum unacknowledged messages per consumer. Maps to JetStream maxAckPending. Provides backpressure control |
| resilience | BrokerResilience | No | | Connection resilience. maxAttempts maps to maxReconnectAttempts (-1 = infinite). initialDelayMs maps to reconnectTimeWait (default: 2000ms). maxDelayMs is ignored (NATS uses fixed intervals). maxRetries maps to JetStream maxDeliver |

How It Works

  • Publishing: Events are serialized as JSON, encoded as Uint8Array, and published to a NATS subject (${subjectPrefix}${event.name}). When a streamName is configured, JetStream publish acknowledgment is awaited.
  • Subscribing: JetStream consumers with durable subscriptions are created for each registered event name. Handlers registered before connect() are buffered. Multiple handlers for the same event are invoked in parallel via Promise.all() — independent handlers do not block each other.
  • Delivery: At-least-once with JetStream. Messages are acknowledged only after all handlers complete successfully. If any handler fails, the message is explicitly nacked for immediate redelivery. Consumers must be idempotent.
  • Poison message protection: Malformed messages that fail JSON deserialization are permanently discarded via msg.term(), preventing corrupt data from blocking the subscription via infinite redelivery. If resilience.maxRetries is configured, it maps to the JetStream maxDeliver consumer option, limiting how many times NATS will redeliver a message before discarding it server-side.
  • Backpressure: The prefetchCount option controls how many unacknowledged messages the server delivers to each consumer (mapped to JetStream maxAckPending), providing natural backpressure when handlers are slow.
  • Connection resilience: NATS native reconnection is enabled by default. The optional resilience config maps maxAttempts to NATS maxReconnectAttempts and initialDelayMs to reconnectTimeWait for automatic recovery on connection loss. maxDelayMs is ignored because NATS uses fixed-interval reconnection (no exponential backoff).
  • Shutdown: close() calls nc.drain() to process in-flight messages before disconnecting.
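The backpressure that prefetchCount (JetStream maxAckPending) provides can be modeled as a counting semaphore: at most N messages are delivered but unacknowledged at any time. This is a simplified sketch of the mechanism, not the adapter's internals:

```typescript
// Sketch of maxAckPending-style backpressure: at most `limit` messages
// may be in flight (delivered but unacknowledged) at once. Further
// deliveries wait until an earlier message is acked.
class AckWindow {
  private inFlight = 0;
  private readonly waiters: Array<() => void> = [];

  constructor(private readonly limit: number) {}

  async acquire(): Promise<void> {
    if (this.inFlight < this.limit) {
      this.inFlight += 1;
      return;
    }
    // Window is full: park until an ack frees a slot.
    await new Promise<void>((resolve) => this.waiters.push(resolve));
    this.inFlight += 1;
  }

  ack(): void {
    this.inFlight -= 1;
    this.waiters.shift()?.(); // release the oldest waiting delivery, if any
  }
}
```

With a limit of 2, a third delivery is held back until one of the first two messages is acknowledged, which is exactly how slow handlers throttle the server.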

Wiring with Domain

import { wireDomain } from "@noddde/engine";
import { NatsEventBus } from "@noddde/nats";

const domain = await wireDomain(myDomain, {
  buses: () => ({
    eventBus: new NatsEventBus({
      servers: "localhost:4222",
      streamName: "noddde-events",
    }),
  }),
});

RabbitMQ

Installation

yarn add @noddde/rabbitmq amqplib
yarn add -D @types/amqplib

Configuration

import { RabbitMqEventBus } from "@noddde/rabbitmq";

const eventBus = new RabbitMqEventBus({
  url: "amqp://localhost:5672",
  exchangeName: "noddde.events", // optional (this is the default)
  exchangeType: "topic", // optional — "topic" (default) or "fanout"
  queuePrefix: "noddde", // optional — queues become "noddde.AccountCreated"
  prefetchCount: 10, // optional — backpressure control
  resilience: {
    // optional — connection resilience with exponential backoff
    maxAttempts: 3,
    initialDelayMs: 1000,
    maxDelayMs: 30000,
  },
});

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| url | string | Yes | | AMQP connection URL |
| exchangeName | string | No | "noddde.events" | Exchange name for event publishing |
| exchangeType | "topic" \| "fanout" | No | "topic" | Exchange type |
| queuePrefix | string | No | "noddde" | Queue name prefix |
| prefetchCount | number | No | 10 | Maximum unacknowledged messages per consumer. Provides backpressure control |
| resilience.maxAttempts | number | No | 3 | Maximum number of connection attempts |
| resilience.initialDelayMs | number | No | 1000 | Initial delay between retries in milliseconds. Doubles on each retry |
| resilience.maxDelayMs | number | No | 30000 | Maximum delay between retries in milliseconds |

How It Works

  • Publishing: Events are serialized as JSON and published to the configured exchange with the event name as the routing key. Messages are marked { persistent: true } for durability. The bus uses a confirm channel (createConfirmChannel) and awaits waitForConfirms() after each publish, guaranteeing the broker has accepted the message before dispatch() resolves.
  • Subscribing: Each event name gets a dedicated durable queue (${queuePrefix}.${eventName}) bound to the exchange. Multiple handlers for the same event are invoked in parallel via Promise.all() — independent handlers do not block each other. Consumers must be idempotent since handlers that completed before a failure will re-execute on redelivery.
  • Delivery: At-least-once with manual acknowledgment. On handler success, the message is acked. On handler failure, the message is nacked with requeue for redelivery.
  • Poison message protection: Malformed messages that fail JSON deserialization are acknowledged and skipped, preventing infinite nack/requeue loops. If resilience.maxRetries is configured, messages that exceed the delivery count limit (tracked via an in-memory counter per message) are also acknowledged and discarded. The in-memory approach is used instead of x-death headers because x-death requires a dead-letter exchange to be configured.
  • Backpressure: The prefetchCount option limits how many unacknowledged messages the broker sends to this consumer, providing natural backpressure when handlers are slow.
  • Connection resilience: The resilience option enables retry with exponential backoff on initial connection failure. Delay doubles on each attempt, capped at maxDelayMs. If all attempts fail, the last error is thrown. After a successful connection, error and close handlers are registered on the AMQP connection to detect unexpected disconnections. On unexpected close, the bus automatically attempts reconnection using the same backoff configuration, then re-establishes all consumers.
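The retry schedule this doubling-with-cap rule produces can be computed directly. A small sketch (the function name is illustrative, not part of the adapter's API):

```typescript
// Exponential backoff delays: initialDelayMs doubles on each attempt,
// capped at maxDelayMs. Mirrors the resilience config shape above.
function backoffDelays(opts: {
  maxAttempts: number;
  initialDelayMs: number;
  maxDelayMs: number;
}): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    delays.push(Math.min(opts.initialDelayMs * 2 ** attempt, opts.maxDelayMs));
  }
  return delays;
}

// With the defaults (maxAttempts: 3, initialDelayMs: 1000, maxDelayMs: 30000)
// the retry delays are [1000, 2000, 4000].
```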

Wiring with Domain

import { wireDomain } from "@noddde/engine";
import { RabbitMqEventBus } from "@noddde/rabbitmq";

const domain = await wireDomain(myDomain, {
  buses: () => ({
    eventBus: new RabbitMqEventBus({
      url: "amqp://localhost:5672",
    }),
  }),
});

Connection Lifecycle

All three broker adapters implement Connectable from @noddde/core. The Domain manages the full lifecycle automatically:

  1. Construct — you create the bus with configuration in the buses() factory.
  2. Register handlers — the Domain registers event handlers via on() for projections, sagas, and standalone event handlers. All adapter implementations buffer handlers registered before connect().
  3. Auto-connect — after all handler registration is complete, the Domain detects Connectable buses and calls connect() automatically. Connecting after handler registration prevents a race condition where broker-backed buses deliver queued messages before handlers are ready. If the broker is unreachable, wiring fails fast with a clear error.
  4. Dispatch events — dispatch() is called by the engine during the command lifecycle.
  5. Auto-close — the Domain calls close() on shutdown (via Closeable auto-discovery). Unsubscribes handlers, disconnects, and releases resources. Idempotent.

You never need to call connect() or close() manually.
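The ordering of steps 2 and 3 (buffer handler registrations, then connect) can be sketched as follows. The `subscriptions` array stands in for real broker subscriptions and is an assumption for illustration:

```typescript
// Sketch of handler buffering: on() before connect() only records the
// registration; broker subscriptions are created inside connect(), so
// no message can arrive before its handler exists.
type Handler = (payload: unknown) => void;

class BufferingBus {
  private readonly pending: Array<{ eventName: string; handler: Handler }> = [];
  readonly subscriptions: string[] = []; // stand-in for broker subscriptions

  on(eventName: string, handler: Handler): void {
    this.pending.push({ eventName, handler }); // no broker I/O yet
  }

  async connect(): Promise<void> {
    for (const { eventName } of this.pending) {
      // a real adapter would create the broker subscription here
      this.subscriptions.push(eventName);
    }
  }
}
```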

Resilience

All three broker adapters accept an optional resilience field of type BrokerResilience (from @noddde/core). This provides a consistent configuration shape across adapters for both connection-level and message-level resilience.

Connection Resilience

| Field | Kafka | NATS | RabbitMQ |
| --- | --- | --- | --- |
| maxAttempts | retries (minus 1) | maxReconnectAttempts (-1 = infinite) | Connection retry attempts |
| initialDelayMs | initialRetryTime | reconnectTimeWait (fixed interval) | Base delay (exponential backoff) |
| maxDelayMs | maxRetryTime | Ignored (fixed intervals) | Backoff cap |

Message Delivery Resilience

The maxRetries field limits per-message delivery attempts, preventing poison messages from blocking consumers indefinitely:

| Adapter | Mechanism | What Happens After Limit |
| --- | --- | --- |
| Kafka | Consumer-side delivery count tracking | Message offset committed (skipped) |
| NATS | JetStream maxDeliver consumer option | NATS discards the message server-side |
| RabbitMQ | In-memory delivery count tracking | Message acked (discarded) |

All adapters also protect against deserialization failures (malformed JSON). Poison messages that cannot be parsed are logged and discarded without blocking the consumer.
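The consumer-side delivery counting used by the Kafka and RabbitMQ adapters can be sketched like this; the message-key shape and method names are assumptions for illustration, not the adapters' actual internals:

```typescript
// Sketch of per-message delivery counting for maxRetries enforcement.
// record() returns true once the delivery limit is exceeded, signaling
// the consumer to ack/commit and skip the poison message.
class DeliveryCounter {
  private readonly counts = new Map<string, number>();

  constructor(private readonly maxRetries: number) {}

  record(messageKey: string): boolean {
    const count = (this.counts.get(messageKey) ?? 0) + 1;
    this.counts.set(messageKey, count);
    return count > this.maxRetries;
  }

  // Called after a successful commit/ack so the map does not grow unboundedly.
  prune(messageKey: string): void {
    this.counts.delete(messageKey);
  }
}
```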

Choosing an Adapter

| Scenario | Recommended Adapter |
| --- | --- |
| Development and testing | EventEmitterEventBus (in-memory) |
| Single-process production | EventEmitterEventBus (in-memory) |
| High-throughput event streaming | KafkaEventBus |
| Lightweight distributed messaging | NatsEventBus |
| Reliable message brokering with flexible routing | RabbitMqEventBus |

All broker adapters provide at-least-once delivery. Choose based on your existing infrastructure and operational expertise rather than feature differences.

Custom Event Bus

You can implement the EventBus interface directly for any message transport. If your bus needs an async connection step, also implement Connectable — the Domain will auto-call connect() during wiring:

import type {
  EventBus,
  AsyncEventHandler,
  Connectable,
  Event,
} from "@noddde/core";

export class RedisEventBus implements EventBus, Connectable {
  private readonly handlers = new Map<string, AsyncEventHandler[]>();

  async connect(): Promise<void> {
    // Connect to Redis
  }

  on(eventName: string, handler: AsyncEventHandler): void {
    const existing = this.handlers.get(eventName) ?? [];
    this.handlers.set(eventName, [...existing, handler]);
  }

  async dispatch<TEvent extends Event>(event: TEvent): Promise<void> {
    // Publish to Redis Streams, Pub/Sub, etc.
  }

  async close(): Promise<void> {
    this.handlers.clear();
    // Disconnect from Redis
  }
}

The requirements are:

  • dispatch() publishes to all subscribers for the event name.
  • on() supports multiple handlers per event name (fan-out).
  • close() releases all resources and is idempotent.
  • connect() (if implementing Connectable) establishes the connection and is idempotent.
