System Designadvanced

Message Queues & Event-Driven Architecture

Master asynchronous communication patterns with message queues. Learn pub/sub, event sourcing, delivery guarantees, dead letter queues, and RabbitMQ vs Kafka trade-offs.

16 min readΒ·Published Apr 28, 2026
system-designmessage-queuesevent-drivenkafka

Why Asynchronous Communication?

In a synchronous system, Service A calls Service B and waits for a response. If Service B is slow, Service A is slow. If Service B is down, Service A fails. Every service in the call chain is as fragile as the weakest link.

Message queues break this coupling. Instead of calling Service B directly, Service A puts a message on a queue and moves on. Service B processes the message when it is ready. If Service B is temporarily down, the messages wait in the queue until it recovers.

Synchronous (tightly coupled):
β”Œβ”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”
β”‚ Svc A │────▢│ Svc B │────▢│ Svc C β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”˜
   β”‚             β”‚              β”‚
   β”‚ waiting...  β”‚ waiting...   β”‚ processing
   β”‚             β”‚              β”‚
   β–Ό             β–Ό              β–Ό
 Total latency = A + B + C
 If B is down, A fails immediately

Asynchronous (decoupled):
β”Œβ”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”
β”‚ Svc A │────▢│  Queue  │◀────│ Svc B β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”˜
   β”‚                              β”‚
   β–Ό done                         β–Ό processes when ready
 A returns immediately
 If B is down, messages wait in queue

This article covers message queue fundamentals, communication patterns, delivery guarantees, error handling with dead letter queues, and a practical comparison of RabbitMQ and Kafka.

Message Queue Fundamentals

A message queue is a buffer that stores messages sent by producers and delivers them to consumers. It acts as middleware between services.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Producer │───▢│         Queue           │───▢│ Consumer β”‚
β”‚          β”‚    β”‚ [msg1][msg2][msg3][msg4] β”‚    β”‚          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Producer: Creates and sends messages
Queue: Stores messages durably, delivers in order
Consumer: Receives and processes messages

Basic Producer/Consumer Pattern

// Producer β€” sends order to processing queue
import amqp from 'amqplib';

async function publishOrder(order: Order) {
  const connection = await amqp.connect(process.env.RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertQueue('order-processing', {
    durable: true,       // Queue survives broker restart
  });

  channel.sendToQueue('order-processing',
    Buffer.from(JSON.stringify({
      orderId: order.id,
      userId: order.userId,
      items: order.items,
      total: order.total,
      createdAt: new Date().toISOString(),
    })),
    {
      persistent: true,  // Message survives broker restart
      messageId: order.id,
      timestamp: Date.now(),
    }
  );

  console.log(`Order ${order.id} published to queue`);
}
// Consumer β€” processes orders from the queue
async function startOrderConsumer() {
  const connection = await amqp.connect(process.env.RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertQueue('order-processing', { durable: true });
  channel.prefetch(10); // Process up to 10 messages concurrently

  channel.consume('order-processing', async (msg) => {
    if (!msg) return;

    try {
      const order = JSON.parse(msg.content.toString());
      await processOrder(order);

      // Acknowledge β€” message removed from queue
      channel.ack(msg);
    } catch (error) {
      console.error('Failed to process order:', error);

      // Negative acknowledge β€” message returned to queue for retry
      // requeue: false sends it to dead letter queue if configured
      channel.nack(msg, false, false);
    }
  });

  console.log('Order consumer started, waiting for messages...');
}

Benefits of Message Queues

Decoupling. Producers and consumers do not know about each other. You can add, remove, or replace consumers without changing the producer.

Reliability. Messages are persisted to disk. If a consumer crashes mid-processing, the message is redelivered to another consumer.

Load leveling. If traffic spikes and producers send 10,000 messages per second but consumers can only process 1,000 per second, the queue absorbs the burst. Consumers process at their own pace.

Without queue:
10,000 req/sec ──▢ Service (capacity: 1,000/sec) ──▢ CRASH

With queue:
10,000 req/sec ──▢ Queue (absorbs burst) ──▢ Service (1,000/sec, no crash)
                   Queue depth: 9,000 messages
                   Drain time: ~9 seconds

Retry and error handling. Failed messages can be automatically retried or routed to a dead letter queue for investigation.

Publish/Subscribe Pattern

In the pub/sub pattern, a message is delivered to ALL subscribers, not just one consumer. This is ideal when multiple services need to react to the same event.

Point-to-Point (one consumer per message):
Producer ──▢ Queue ──▢ Consumer A  (Consumer A OR B gets it)
                   ──▢ Consumer B

Publish/Subscribe (all subscribers get every message):
Producer ──▢ Topic/Exchange ──▢ Consumer A  (A AND B AND C all get it)
                            ──▢ Consumer B
                            ──▢ Consumer C

RabbitMQ Pub/Sub with Fanout Exchange

// Publisher
async function publishUserEvent(event: UserEvent) {
  const channel = await getChannel();

  // Fanout exchange: delivers to ALL bound queues
  await channel.assertExchange('user-events', 'fanout', { durable: true });

  channel.publish('user-events', '',
    Buffer.from(JSON.stringify(event)),
    { persistent: true }
  );
}

// Subscriber 1: Email service
async function startEmailSubscriber() {
  const channel = await getChannel();
  await channel.assertExchange('user-events', 'fanout', { durable: true });

  // Each subscriber gets its own queue bound to the exchange
  const queue = await channel.assertQueue('email-notifications', { durable: true });
  await channel.bindQueue(queue.queue, 'user-events', '');

  channel.consume(queue.queue, async (msg) => {
    if (!msg) return;
    const event = JSON.parse(msg.content.toString());

    if (event.type === 'user.registered') {
      await sendWelcomeEmail(event.data.email);
    }
    channel.ack(msg);
  });
}

// Subscriber 2: Analytics service
async function startAnalyticsSubscriber() {
  const channel = await getChannel();
  await channel.assertExchange('user-events', 'fanout', { durable: true });

  const queue = await channel.assertQueue('analytics-tracking', { durable: true });
  await channel.bindQueue(queue.queue, 'user-events', '');

  channel.consume(queue.queue, async (msg) => {
    if (!msg) return;
    const event = JSON.parse(msg.content.toString());
    await trackEvent(event.type, event.data);
    channel.ack(msg);
  });
}

// Subscriber 3: Audit log service
async function startAuditSubscriber() {
  const channel = await getChannel();
  await channel.assertExchange('user-events', 'fanout', { durable: true });

  const queue = await channel.assertQueue('audit-log', { durable: true });
  await channel.bindQueue(queue.queue, 'user-events', '');

  channel.consume(queue.queue, async (msg) => {
    if (!msg) return;
    const event = JSON.parse(msg.content.toString());
    await writeAuditLog(event);
    channel.ack(msg);
  });
}

Kafka Pub/Sub with Consumer Groups

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
});

// Producer
const producer = kafka.producer();

async function publishOrderEvent(event: OrderEvent) {
  await producer.connect();
  await producer.send({
    topic: 'order-events',
    messages: [{
      key: event.orderId,   // Partition key: same order always goes to same partition
      value: JSON.stringify(event),
      headers: {
        'event-type': event.type,
        'correlation-id': event.correlationId,
      },
    }],
  });
}

// Consumer Group: Order Processing
// Each partition is consumed by exactly one consumer in the group
const orderConsumer = kafka.consumer({ groupId: 'order-processing' });

async function startOrderProcessing() {
  await orderConsumer.connect();
  await orderConsumer.subscribe({ topic: 'order-events', fromBeginning: false });

  await orderConsumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());

      switch (event.type) {
        case 'order.created':
          await handleOrderCreated(event);
          break;
        case 'order.cancelled':
          await handleOrderCancelled(event);
          break;
      }
    },
  });
}

// Different consumer group: Analytics (gets ALL messages independently)
const analyticsConsumer = kafka.consumer({ groupId: 'analytics' });

async function startAnalytics() {
  await analyticsConsumer.connect();
  await analyticsConsumer.subscribe({ topic: 'order-events', fromBeginning: true });

  await analyticsConsumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value.toString());
      await trackOrderMetrics(event);
    },
  });
}

Event Sourcing

Instead of storing the current state of an entity, store every state change as an immutable event. The current state is derived by replaying all events.

Traditional (state-based):
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  accounts table                              β”‚
β”‚  id: 123  |  balance: $750  |  name: Alice   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Question: How did the balance become $750? No idea.

Event Sourcing (event-based):
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  account_events (append-only log)                        β”‚
β”‚                                                          β”‚
β”‚  1. AccountOpened    { id: 123, name: "Alice" }          β”‚
β”‚  2. MoneyDeposited   { id: 123, amount: $1000 }         β”‚
β”‚  3. MoneyWithdrawn   { id: 123, amount: $200 }          β”‚
β”‚  4. MoneyWithdrawn   { id: 123, amount: $50 }           β”‚
β”‚                                                          β”‚
β”‚  Current balance: $0 + $1000 - $200 - $50 = $750        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Full history preserved. Every change is auditable.
// Event sourcing implementation
type AccountEvent =
  | { type: 'AccountOpened'; data: { accountId: string; name: string } }
  | { type: 'MoneyDeposited'; data: { accountId: string; amount: number } }
  | { type: 'MoneyWithdrawn'; data: { accountId: string; amount: number } }
  | { type: 'AccountClosed'; data: { accountId: string; reason: string } };

type AccountState = {
  id: string;
  name: string;
  balance: number;
  isOpen: boolean;
};

// Reducer: applies events to build current state
function applyEvent(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'AccountOpened':
      return { id: event.data.accountId, name: event.data.name, balance: 0, isOpen: true };
    case 'MoneyDeposited':
      return { ...state, balance: state.balance + event.data.amount };
    case 'MoneyWithdrawn':
      return { ...state, balance: state.balance - event.data.amount };
    case 'AccountClosed':
      return { ...state, isOpen: false };
  }
}

// Rebuild state from events
function getAccountState(events: AccountEvent[]): AccountState {
  return events.reduce(applyEvent, {} as AccountState);
}

// Event store
class EventStore {
  async appendEvent(streamId: string, event: AccountEvent) {
    await db.query(
      'INSERT INTO events (stream_id, event_type, data, created_at) VALUES ($1, $2, $3, NOW())',
      [streamId, event.type, JSON.stringify(event.data)]
    );

    // Publish to message broker for other services
    await messageBroker.publish(`account.${event.type}`, event);
  }

  async getEvents(streamId: string): Promise<AccountEvent[]> {
    const result = await db.query(
      'SELECT event_type, data FROM events WHERE stream_id = $1 ORDER BY created_at ASC',
      [streamId]
    );
    return result.rows.map(row => ({
      type: row.event_type,
      data: row.data,
    }));
  }
}

// Usage
const store = new EventStore();

// Deposit money
await store.appendEvent('account-123', {
  type: 'MoneyDeposited',
  data: { accountId: 'account-123', amount: 500 },
});

// Get current state
const events = await store.getEvents('account-123');
const currentState = getAccountState(events);
console.log(currentState.balance); // Computed from all events

Event sourcing trade-offs:

BenefitCost
Complete audit trailStorage grows indefinitely
Time-travel debuggingEvent schema evolution is complex
Replay events to new read modelsEventual consistency in read models
Natural fit for event-driven systemsSteeper learning curve

Delivery Guarantees

How many times will a consumer receive a given message? This is one of the most important decisions in messaging system design.

At-Most-Once Delivery

The message is delivered zero or one time. It may be lost but is never duplicated.

Producer ──▢ Broker ──▢ Consumer
                         β”‚
                         β–Ό ACK sent before processing
                       (if consumer crashes during processing,
                        message is lost)
// At-most-once: acknowledge immediately, then process
channel.consume('queue', async (msg) => {
  channel.ack(msg); // ACK first

  // If this crashes, message is already acknowledged β€” lost
  await processMessage(msg);
});

Use case: Metrics, logs, non-critical notifications where occasional loss is acceptable.

At-Least-Once Delivery

The message is delivered one or more times. It is never lost but may be duplicated.

Producer ──▢ Broker ──▢ Consumer
                         β”‚
                         β–Ό Process first, then ACK
                       (if consumer crashes after processing
                        but before ACK, message is redelivered)
// At-least-once: process first, acknowledge after
channel.consume('queue', async (msg) => {
  try {
    await processMessage(msg); // Process first
    channel.ack(msg);          // ACK after success
  } catch (error) {
    channel.nack(msg);         // Requeue on failure
  }
});

// Consumer MUST be idempotent β€” processing same message twice
// should produce the same result
async function processMessage(msg: Message) {
  const data = JSON.parse(msg.content.toString());

  // Idempotency check
  const alreadyProcessed = await db.query(
    'SELECT 1 FROM processed_messages WHERE message_id = $1',
    [data.messageId]
  );

  if (alreadyProcessed.rows.length > 0) {
    console.log(`Message ${data.messageId} already processed, skipping`);
    return;
  }

  await db.transaction(async (trx) => {
    await trx.query(
      'INSERT INTO processed_messages (message_id) VALUES ($1)',
      [data.messageId]
    );
    await trx.query(
      'INSERT INTO orders (id, user_id, total) VALUES ($1, $2, $3)',
      [data.orderId, data.userId, data.total]
    );
  });
}

Use case: Payment processing, order creation β€” anything where losing a message is unacceptable.

Exactly-Once Delivery

The message is delivered exactly one time. No loss, no duplication. This is extremely difficult to achieve in a distributed system.

The truth: True exactly-once delivery across distributed systems is theoretically impossible. What systems actually provide is "effectively exactly-once" through at-least-once delivery plus idempotent processing.

Kafka's "exactly-once semantics" (EOS):

Producer (idempotent) ──▢ Broker (deduplicates) ──▢ Consumer (transactional)

This works within Kafka's ecosystem (produce-transform-produce).
For external systems, you still need idempotent consumers.
// Kafka exactly-once configuration
const producer = kafka.producer({
  idempotent: true,              // Enable idempotent producer
  transactionalId: 'my-app-tx', // Enable transactions
});

async function processAndForward(message: KafkaMessage) {
  const transaction = await producer.transaction();

  try {
    // Process input
    const result = transform(message);

    // Produce output (within transaction)
    await transaction.send({
      topic: 'output-topic',
      messages: [{ value: JSON.stringify(result) }],
    });

    // Commit consumer offset (within same transaction)
    await transaction.sendOffsets({
      consumerGroupId: 'my-group',
      topics: [{
        topic: 'input-topic',
        partitions: [{ partition: 0, offset: message.offset }],
      }],
    });

    await transaction.commit();
  } catch (error) {
    await transaction.abort();
    throw error;
  }
}

Dead Letter Queues

A dead letter queue (DLQ) captures messages that cannot be processed after multiple retry attempts. Without a DLQ, poison messages (messages that always fail) would block the queue forever or be lost silently.

Normal flow:
Producer ──▢ Main Queue ──▢ Consumer ──▢ Success βœ“

With DLQ:
Producer ──▢ Main Queue ──▢ Consumer ──▢ Fail (attempt 1)
                                     ──▢ Fail (attempt 2)
                                     ──▢ Fail (attempt 3)
                                     ──▢ Dead Letter Queue
                                              β”‚
                                              β–Ό
                                    Alert + Manual review
// RabbitMQ DLQ setup
async function setupQueuesWithDLQ() {
  const channel = await getChannel();

  // Dead letter queue
  await channel.assertQueue('order-processing-dlq', {
    durable: true,
  });

  // Main queue with DLQ configuration
  await channel.assertQueue('order-processing', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': '',
      'x-dead-letter-routing-key': 'order-processing-dlq',
      'x-message-ttl': 30000,    // Messages expire after 30s if not consumed
    },
  });
}

// Consumer with retry logic
async function startConsumerWithRetry() {
  const channel = await getChannel();
  const MAX_RETRIES = 3;

  channel.consume('order-processing', async (msg) => {
    if (!msg) return;

    const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) as number;

    try {
      const order = JSON.parse(msg.content.toString());
      await processOrder(order);
      channel.ack(msg);
    } catch (error) {
      if (retryCount < MAX_RETRIES) {
        // Republish with incremented retry count
        channel.sendToQueue('order-processing',
          msg.content,
          {
            ...msg.properties,
            headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
          }
        );
        channel.ack(msg); // Ack original to prevent duplicate
      } else {
        // Max retries exceeded β€” send to DLQ
        channel.nack(msg, false, false);
        await alerting.send({
          severity: 'high',
          message: `Order processing failed after ${MAX_RETRIES} retries`,
          orderId: JSON.parse(msg.content.toString()).orderId,
          error: error.message,
        });
      }
    }
  });
}

// DLQ consumer β€” for manual review or automated reprocessing
async function startDLQConsumer() {
  const channel = await getChannel();

  channel.consume('order-processing-dlq', async (msg) => {
    if (!msg) return;

    const order = JSON.parse(msg.content.toString());
    const retryCount = msg.properties.headers?.['x-retry-count'] || 0;

    // Log for investigation
    console.error('Dead letter message:', {
      orderId: order.orderId,
      retryCount,
      originalTimestamp: msg.properties.timestamp,
    });

    // Store in database for manual review
    await db.query(
      `INSERT INTO dead_letters (queue, message_id, content, retry_count, created_at)
       VALUES ($1, $2, $3, $4, NOW())`,
      ['order-processing', msg.properties.messageId, msg.content.toString(), retryCount]
    );

    channel.ack(msg);
  });
}

RabbitMQ vs Kafka

These are the two most popular messaging systems, and they solve fundamentally different problems.

RabbitMQ: Smart broker, dumb consumers
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Producer │────▢│  RabbitMQ (routes msgs)  │────▢│ Consumer β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚  Exchanges β†’ Queues      β”‚     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                 β”‚  Message deleted on ACK   β”‚
                 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Kafka: Dumb broker, smart consumers
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Producer │────▢│  Kafka (append-only log) │◀────│ Consumer β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚  Topics β†’ Partitions     β”‚     β”‚(pulls)   β”‚
                 β”‚  Messages retained        β”‚     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Comparison Table

AspectRabbitMQKafka
ModelMessage queue (push)Distributed log (pull)
Message lifetimeDeleted after ACKRetained for configured period
RoutingFlexible (exchange types: direct, fanout, topic, headers)Topic + partition only
OrderingPer-queue FIFOPer-partition FIFO
Throughput10K-50K msg/sec per node100K-1M+ msg/sec per node
Consumer groupsCompeting consumers on same queueBuilt-in consumer groups with offset tracking
ReplayNot possible (messages deleted)Replay from any offset
LatencyLower (push-based)Slightly higher (poll-based, but configurable)
ProtocolAMQP, MQTT, STOMPCustom binary protocol
ComplexityModerateHigher (ZooKeeper/KRaft, partition management)
Best forTask queues, RPC, complex routingEvent streaming, log aggregation, high throughput
ScalingAdd nodes (limited by exchange routing)Add partitions (linear scaling)

When to Choose RabbitMQ

  • Traditional task queues (email sending, image processing)
  • Request/reply patterns (RPC over messaging)
  • Complex routing logic (route by message attributes)
  • Lower throughput requirements with complex message flows
  • Team is familiar with traditional message brokers

When to Choose Kafka

  • High-throughput event streaming (100K+ events/sec)
  • Event sourcing (need to replay events)
  • Log aggregation across multiple services
  • Stream processing (Kafka Streams, ksqlDB)
  • Data pipeline between systems (CDC, ETL)
  • Multiple consumer groups reading the same data independently

Architecture Patterns

CQRS with Event-Driven Communication

Command side (writes):
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Client  │────▢│ Command API  │────▢│ Write Database β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                        β”‚
                        β–Ό (publishes events)
                 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                 β”‚ Message Queueβ”‚
                 β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
                        β”‚
                        β–Ό (consumes events)
                 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                 β”‚ Projector    │────▢│ Read Database β”‚
                 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚ (optimized    β”‚
                                      β”‚  for queries) β”‚
Query side (reads):                   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”‚
β”‚  Client  │────▢│  Query API   β”‚β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Saga Orchestration via Message Queue

// Saga orchestrator β€” coordinates multi-service transactions
class OrderSagaOrchestrator {
  async startSaga(order: Order) {
    const sagaId = generateId();

    // Step 1: Reserve inventory
    await messageBroker.publish('inventory.reserve', {
      sagaId,
      orderId: order.id,
      items: order.items,
    });

    await sagaStore.save(sagaId, {
      step: 'INVENTORY_RESERVE',
      orderId: order.id,
      status: 'PENDING',
    });
  }

  async onInventoryReserved(event: InventoryReservedEvent) {
    const saga = await sagaStore.get(event.sagaId);

    // Step 2: Process payment
    await messageBroker.publish('payment.process', {
      sagaId: event.sagaId,
      orderId: saga.orderId,
      amount: saga.total,
    });

    await sagaStore.update(event.sagaId, { step: 'PAYMENT_PROCESS' });
  }

  async onPaymentCompleted(event: PaymentCompletedEvent) {
    const saga = await sagaStore.get(event.sagaId);

    // Step 3: Confirm order
    await messageBroker.publish('order.confirm', {
      sagaId: event.sagaId,
      orderId: saga.orderId,
    });

    await sagaStore.update(event.sagaId, { step: 'COMPLETED', status: 'SUCCESS' });
  }

  // Compensating actions on failure
  async onPaymentFailed(event: PaymentFailedEvent) {
    const saga = await sagaStore.get(event.sagaId);

    // Compensate: release inventory
    await messageBroker.publish('inventory.release', {
      sagaId: event.sagaId,
      orderId: saga.orderId,
    });

    await sagaStore.update(event.sagaId, { step: 'COMPENSATING', status: 'FAILED' });
  }
}

Key Takeaways

  • Use synchronous communication for queries that need immediate responses. Use asynchronous (queues) for commands that can be processed later.
  • At-least-once delivery with idempotent consumers is the pragmatic sweet spot. True exactly-once across system boundaries is nearly impossible.
  • Dead letter queues are not optional. Every production queue needs a DLQ strategy. Poison messages will happen.
  • Choose RabbitMQ for traditional task queues with complex routing. Choose Kafka for high-throughput event streaming with replay capability.
  • Event sourcing provides a complete audit trail and enables powerful patterns (CQRS, temporal queries), but adds significant complexity. Use it when the audit trail or replay capability provides clear business value.
  • Monitor queue depth, consumer lag, and processing latency. A growing queue depth means consumers cannot keep up β€” scale them out or investigate bottlenecks.
  • Design messages to be self-contained. A consumer should not need to call back to the producer for additional data.
  • Version your message schemas from day one. Schema evolution is inevitable, and breaking consumer compatibility causes outages.

Found this helpful?

Support devsofus β€” help us keep creating free dev guides.

Related Articles