Why Asynchronous Communication?
In a synchronous system, Service A calls Service B and waits for a response. If Service B is slow, Service A is slow. If Service B is down, Service A fails. Every service in the call chain is as fragile as the weakest link.
Message queues break this coupling. Instead of calling Service B directly, Service A puts a message on a queue and moves on. Service B processes the message when it is ready. If Service B is temporarily down, the messages wait in the queue until it recovers.
Synchronous (tightly coupled):
┌───────┐     ┌───────┐     ┌───────┐
│ Svc A │────▶│ Svc B │────▶│ Svc C │
└───────┘     └───────┘     └───────┘
    │             │             │
    │ waiting...  │ waiting...  │ processing
    │             │             │
    ▼             ▼             ▼
Total latency = A + B + C
If B is down, A fails immediately
Asynchronous (decoupled):
┌───────┐     ┌─────────┐     ┌───────┐
│ Svc A │────▶│  Queue  │◀────│ Svc B │
└───────┘     └─────────┘     └───────┘
    │                             │
    ▼ done                        ▼ processes when ready
A returns immediately
If B is down, messages wait in queue
This article covers message queue fundamentals, communication patterns, delivery guarantees, error handling with dead letter queues, and a practical comparison of RabbitMQ and Kafka.
Message Queue Fundamentals
A message queue is a buffer that stores messages sent by producers and delivers them to consumers. It acts as middleware between services.
┌──────────┐    ┌──────────────────────────┐    ┌──────────┐
│ Producer │───▶│          Queue           │───▶│ Consumer │
│          │    │ [msg1][msg2][msg3][msg4] │    │          │
└──────────┘    └──────────────────────────┘    └──────────┘
Producer: Creates and sends messages
Queue: Stores messages (durably, if configured) and delivers them, typically in FIFO order
Consumer: Receives, processes, and acknowledges messages
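The three roles can be made concrete with a minimal in-memory sketch. It is not a broker: it has no persistence, networking, or concurrency. `InMemoryQueue` and its `send`/`receive`/`ack`/`nack` methods are hypothetical names. The sketch only models the contract described above. Producers append and move on, and a delivered message stays pending until the consumer acknowledges it or hands it back.

```typescript
// Minimal in-memory model of a message queue (illustration only:
// no persistence, no concurrency, no network).
type Msg = { id: number; body: string };

class InMemoryQueue {
  private ready: Msg[] = [];                // Waiting to be delivered, FIFO
  private unacked = new Map<number, Msg>(); // Delivered but not yet acknowledged
  private nextId = 1;

  // Producer side: append to the tail and return immediately.
  send(body: string): number {
    const id = this.nextId++;
    this.ready.push({ id, body });
    return id;
  }

  // Consumer side: take the head message; it stays pending until ack/nack.
  receive(): Msg | undefined {
    const msg = this.ready.shift();
    if (msg) this.unacked.set(msg.id, msg);
    return msg;
  }

  // Successful processing: the message is removed for good.
  ack(id: number): void {
    this.unacked.delete(id);
  }

  // Failed processing: this model puts the message back at the head.
  nack(id: number): void {
    const msg = this.unacked.get(id);
    if (!msg) return;
    this.unacked.delete(id);
    this.ready.unshift(msg);
  }

  depth(): number {
    return this.ready.length;
  }
}

// The producer publishes while no consumer is running...
const q = new InMemoryQueue();
q.send('order-1');
q.send('order-2');

// ...and the consumer drains the backlog when it comes back.
const first = q.receive()!;  // order-1
q.nack(first.id);            // Simulated failure: back to the head
const retry = q.receive()!;  // order-1 again
q.ack(retry.id);
const second = q.receive()!; // order-2
q.ack(second.id);
console.log(q.depth());      // 0
```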
Basic Producer/Consumer Pattern
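// Producer: sends an order to the processing queue
import amqp from 'amqplib';

async function publishOrder(order: Order) {
  // For brevity this opens a connection per call; in production,
  // reuse one long-lived connection and channel instead.
  const connection = await amqp.connect(process.env.RABBITMQ_URL ?? 'amqp://localhost');
  try {
    // A confirm channel lets us wait until the broker has accepted the message
    const channel = await connection.createConfirmChannel();
    await channel.assertQueue('order-processing', {
      durable: true, // Queue survives broker restart
    });
    channel.sendToQueue('order-processing',
      Buffer.from(JSON.stringify({
        orderId: order.id,
        userId: order.userId,
        items: order.items,
        total: order.total,
        createdAt: new Date().toISOString(),
      })),
      {
        persistent: true, // Message survives broker restart (requires a durable queue)
        messageId: order.id,
        contentType: 'application/json',
        timestamp: Math.floor(Date.now() / 1000), // AMQP timestamps are in seconds
      }
    );
    await channel.waitForConfirms(); // Resolves once the broker has confirmed the publish
    console.log(`Order ${order.id} published to queue`);
  } finally {
    await connection.close();
  }
}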
// Consumer: processes orders from the queue
async function startOrderConsumer() {
  const connection = await amqp.connect(process.env.RABBITMQ_URL ?? 'amqp://localhost');
  const channel = await connection.createChannel();
  await channel.assertQueue('order-processing', { durable: true });
  await channel.prefetch(10); // At most 10 unacknowledged messages in flight for this consumer
  await channel.consume('order-processing', async (msg) => {
    if (!msg) return; // The broker cancelled this consumer
    try {
      const order = JSON.parse(msg.content.toString());
      await processOrder(order);
      // Acknowledge: the broker removes the message from the queue
      channel.ack(msg);
    } catch (error) {
      console.error('Failed to process order:', error);
      // Negative acknowledge with requeue = false: the broker dead-letters the
      // message if the queue has a dead letter exchange, otherwise discards it.
      // Passing true as the last argument would requeue it for a retry instead.
      channel.nack(msg, false, false);
    }
  });
  console.log('Order consumer started, waiting for messages...');
}
Benefits of Message Queues
Decoupling. Producers and consumers do not know about each other. You can add, remove, or replace consumers without changing the producer.
Reliability. Durable queues write persistent messages to disk, so they survive a broker restart. If a consumer crashes before acknowledging a message, the broker redelivers it to another consumer.
Load leveling. If traffic spikes and producers send 10,000 messages per second but consumers can only process 1,000 per second, the queue absorbs the burst. Consumers process at their own pace.
Without queue:
10,000 req/sec ───▶ Service (capacity: 1,000/sec) ───▶ CRASH
With queue:
10,000 req/sec ───▶ Queue (absorbs burst) ───▶ Service (1,000/sec, no crash)
Queue depth after a 1-second burst: 9,000 messages
Drain time: ~9 seconds (9,000 ÷ 1,000/sec), if no new traffic arrives after the burst
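The backlog numbers follow from simple rate arithmetic. The sketch below assumes a constant burst rate for a fixed duration, a constant consumer capacity, and a steady baseline rate after the burst. The diagram's figures correspond to a 1-second burst and a baseline of zero. The function names are illustrative.

```typescript
// Rate arithmetic behind load leveling (all rates in messages/second).
function backlogAfterBurst(burstRate: number, capacity: number, burstSeconds: number): number {
  // Messages arrive faster than they are consumed; the difference accumulates.
  return Math.max(0, (burstRate - capacity) * burstSeconds);
}

function drainSeconds(backlog: number, capacity: number, baselineRate: number): number {
  // Only spare capacity (capacity minus ongoing traffic) shrinks the backlog.
  const spare = capacity - baselineRate;
  if (spare <= 0) return Infinity; // The backlog never drains: scale out consumers
  return backlog / spare;
}

const backlog = backlogAfterBurst(10_000, 1_000, 1); // 9,000 messages
console.log(drainSeconds(backlog, 1_000, 0));        // 9
console.log(drainSeconds(backlog, 1_000, 500));      // 18
```

The second call shows why "the queue absorbs the burst" has a limit. If normal traffic keeps arriving after the spike, only the leftover capacity works off the backlog.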
Retry and error handling. Failed messages can be automatically retried or routed to a dead letter queue for investigation.
Publish/Subscribe Pattern
In the pub/sub pattern, a message is delivered to ALL subscribers, not just one consumer. This is ideal when multiple services need to react to the same event.
Point-to-Point (one consumer per message):
Producer ───▶ Queue ──┬──▶ Consumer A   (Consumer A OR B gets it)
                      └──▶ Consumer B
Publish/Subscribe (all subscribers get every message):
Producer ───▶ Topic/Exchange ──┬──▶ Consumer A   (A AND B AND C all get it)
                               ├──▶ Consumer B
                               └──▶ Consumer C
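The difference between the two models fits in a few lines of plain TypeScript. This is an illustrative sketch with no broker involved. Round-robin is just one possible policy for spreading messages across competing consumers; it does not describe any specific broker's dispatcher.

```typescript
// The two delivery models as plain functions (no broker).
type Handler = (msg: string) => void;

// Point-to-point: competing consumers share one queue, and each message
// goes to exactly one of them (round-robin here, for simplicity).
function pointToPoint(messages: string[], consumers: Handler[]): void {
  messages.forEach((msg, i) => consumers[i % consumers.length](msg));
}

// Publish/subscribe: every subscriber receives its own copy of every message.
function publishSubscribe(messages: string[], subscribers: Handler[]): void {
  for (const msg of messages) {
    for (const subscriber of subscribers) subscriber(msg);
  }
}

const received: Record<string, string[]> = { A: [], B: [], C: [] };
const handler = (name: string): Handler => (msg) => received[name].push(msg);

pointToPoint(['m1', 'm2', 'm3', 'm4'], [handler('A'), handler('B')]);
// received.A holds m1, m3 and received.B holds m2, m4

for (const key of Object.keys(received)) received[key] = [];
publishSubscribe(['m1', 'm2'], [handler('A'), handler('B'), handler('C')]);
// received.A, received.B and received.C each hold m1, m2
```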
RabbitMQ Pub/Sub with Fanout Exchange
// Publisher
// getChannel() is a shared helper (not shown) that returns a channel
// on a long-lived connection
async function publishUserEvent(event: UserEvent) {
  const channel = await getChannel();
  // Fanout exchange: delivers to ALL bound queues (the routing key is ignored)
  await channel.assertExchange('user-events', 'fanout', { durable: true });
  channel.publish('user-events', '',
    Buffer.from(JSON.stringify(event)),
    { persistent: true }
  );
}
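// Subscriber 1: Email service
async function startEmailSubscriber() {
  const channel = await getChannel();
  await channel.assertExchange('user-events', 'fanout', { durable: true });
  // Each subscriber gets its own queue bound to the exchange
  const queue = await channel.assertQueue('email-notifications', { durable: true });
  await channel.bindQueue(queue.queue, 'user-events', '');
  await channel.consume(queue.queue, async (msg) => {
    if (!msg) return;
    try {
      const event = JSON.parse(msg.content.toString());
      // Event types this service does not handle are acknowledged and dropped
      if (event.type === 'user.registered') {
        await sendWelcomeEmail(event.data.email);
      }
      channel.ack(msg);
    } catch (error) {
      console.error('Email subscriber failed:', error);
      // An unhandled rejection would leave the message unacknowledged;
      // nack without requeue (dead-lettered if configured, otherwise dropped)
      channel.nack(msg, false, false);
    }
  });
}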
// Subscriber 2: Analytics service
async function startAnalyticsSubscriber() {
  const channel = await getChannel();
  await channel.assertExchange('user-events', 'fanout', { durable: true });
  const queue = await channel.assertQueue('analytics-tracking', { durable: true });
  await channel.bindQueue(queue.queue, 'user-events', '');
  await channel.consume(queue.queue, async (msg) => {
    if (!msg) return;
    try {
      const event = JSON.parse(msg.content.toString());
      await trackEvent(event.type, event.data);
      channel.ack(msg);
    } catch (error) {
      console.error('Analytics subscriber failed:', error);
      channel.nack(msg, false, false);
    }
  });
}
// Subscriber 3: Audit log service
async function startAuditSubscriber() {
  const channel = await getChannel();
  await channel.assertExchange('user-events', 'fanout', { durable: true });
  const queue = await channel.assertQueue('audit-log', { durable: true });
  await channel.bindQueue(queue.queue, 'user-events', '');
  await channel.consume(queue.queue, async (msg) => {
    if (!msg) return;
    try {
      const event = JSON.parse(msg.content.toString());
      await writeAuditLog(event);
      channel.ack(msg);
    } catch (error) {
      console.error('Audit subscriber failed:', error);
      channel.nack(msg, false, false);
    }
  });
}
Kafka Pub/Sub with Consumer Groups
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
});
// Producer: connect once at startup and reuse the connection
const producer = kafka.producer();
await producer.connect();

async function publishOrderEvent(event: OrderEvent) {
  await producer.send({
    topic: 'order-events',
    messages: [{
      // Partition key: the same order always goes to the same partition
      // (as long as the topic's partition count does not change)
      key: event.orderId,
      value: JSON.stringify(event),
      headers: {
        'event-type': event.type,
        'correlation-id': event.correlationId,
      },
    }],
  });
}
// Consumer group: Order Processing
// Each partition is consumed by exactly one consumer in the group
const orderConsumer = kafka.consumer({ groupId: 'order-processing' });

async function startOrderProcessing() {
  await orderConsumer.connect();
  await orderConsumer.subscribe({ topic: 'order-events', fromBeginning: false });
  await orderConsumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) return; // Skip tombstones and empty messages
      const event = JSON.parse(message.value.toString());
      switch (event.type) {
        case 'order.created':
          await handleOrderCreated(event);
          break;
        case 'order.cancelled':
          await handleOrderCancelled(event);
          break;
      }
    },
  });
}
// Different consumer group: Analytics (receives ALL messages independently)
const analyticsConsumer = kafka.consumer({ groupId: 'analytics' });

async function startAnalytics() {
  await analyticsConsumer.connect();
  // fromBeginning only applies while this group has no committed offsets yet
  await analyticsConsumer.subscribe({ topic: 'order-events', fromBeginning: true });
  await analyticsConsumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) return;
      const event = JSON.parse(message.value.toString());
      await trackOrderMetrics(event);
    },
  });
}
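Per-key ordering depends on a key always mapping to the same partition. The sketch below illustrates the idea with a simple FNV-1a hash. This is an assumption for illustration only: Kafka's default partitioner uses murmur2, so real partition numbers will differ. The property being demonstrated holds for any deterministic hash. A fixed key lands on a fixed partition as long as the partition count does not change.

```typescript
// Illustrative key-to-partition mapping (FNV-1a, NOT Kafka's murmur2).
function fnv1a(key: string): number {
  let hash = 0x811c9dc5; // 32-bit FNV offset basis
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i); // Byte-wise for ASCII keys
    hash = Math.imul(hash, 0x01000193) >>> 0; // Multiply by the FNV prime, keep 32 unsigned bits
  }
  return hash;
}

function partitionFor(key: string, numPartitions: number): number {
  return fnv1a(key) % numPartitions;
}

// Events for the same order always map to the same partition, so a single
// consumer in the group sees them in the order they were written.
const orderEvents = [
  { orderId: 'order-42', type: 'order.created' },
  { orderId: 'order-7', type: 'order.created' },
  { orderId: 'order-42', type: 'order.cancelled' },
];
for (const e of orderEvents) {
  console.log(e.orderId, e.type, '-> partition', partitionFor(e.orderId, 6));
}
```

Changing the partition count changes the modulus, which remaps keys. This is why adding partitions to a keyed topic can break per-key ordering for in-flight data.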
Event Sourcing
Instead of storing the current state of an entity, store every state change as an immutable event. The current state is derived by replaying all events.
Traditional (state-based):
┌────────────────────────────────────────────┐
│ accounts table                             │
│ id: 123 | balance: $750 | name: Alice      │
└────────────────────────────────────────────┘
Question: How did the balance become $750? No idea.
Event Sourcing (event-based):
┌──────────────────────────────────────────────────┐
│ account_events (append-only log)                 │
│                                                  │
│ 1. AccountOpened  { id: 123, name: "Alice" }     │
│ 2. MoneyDeposited { id: 123, amount: $1000 }     │
│ 3. MoneyWithdrawn { id: 123, amount: $200 }      │
│ 4. MoneyWithdrawn { id: 123, amount: $50 }       │
│                                                  │
│ Current balance: $0 + $1000 - $200 - $50 = $750  │
└──────────────────────────────────────────────────┘
Full history preserved. Every change is auditable.
// Event sourcing implementation
type AccountEvent =
  | { type: 'AccountOpened'; data: { accountId: string; name: string } }
  | { type: 'MoneyDeposited'; data: { accountId: string; amount: number } }
  | { type: 'MoneyWithdrawn'; data: { accountId: string; amount: number } }
  | { type: 'AccountClosed'; data: { accountId: string; reason: string } };

type AccountState = {
  id: string;
  name: string;
  balance: number;
  isOpen: boolean;
};

// Reducer: applies one event to the state built so far
function applyEvent(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'AccountOpened':
      return { id: event.data.accountId, name: event.data.name, balance: 0, isOpen: true };
    case 'MoneyDeposited':
      return { ...state, balance: state.balance + event.data.amount };
    case 'MoneyWithdrawn':
      return { ...state, balance: state.balance - event.data.amount };
    case 'AccountClosed':
      return { ...state, isOpen: false };
  }
}

// Rebuild state by replaying events in order (the first event is AccountOpened)
function getAccountState(events: AccountEvent[]): AccountState {
  return events.reduce(applyEvent, {} as AccountState);
}
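// Event store (db and messageBroker are application-provided clients, not shown)
class EventStore {
  async appendEvent(streamId: string, event: AccountEvent) {
    await db.query(
      'INSERT INTO events (stream_id, event_type, data, created_at) VALUES ($1, $2, $3, NOW())',
      [streamId, event.type, JSON.stringify(event.data)]
    );
    // Publish to the message broker for other services.
    // Note: the insert and the publish are not atomic; if publishing fails,
    // the event is stored but never published (a transactional outbox avoids this).
    await messageBroker.publish(`account.${event.type}`, event);
  }

  async getEvents(streamId: string): Promise<AccountEvent[]> {
    // Order by an auto-incrementing id column (assumed): created_at is not unique,
    // and PostgreSQL's NOW() returns the same value for every insert in a transaction.
    // data is assumed to be a JSONB column, which the driver returns already parsed.
    const result = await db.query(
      'SELECT event_type, data FROM events WHERE stream_id = $1 ORDER BY id ASC',
      [streamId]
    );
    return result.rows.map(row => ({
      type: row.event_type,
      data: row.data,
    }));
  }
}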
// Usage
const store = new EventStore();

// Deposit money
await store.appendEvent('account-123', {
  type: 'MoneyDeposited',
  data: { accountId: 'account-123', amount: 500 },
});

// Get current state
const events = await store.getEvents('account-123');
const currentState = getAccountState(events);
console.log(currentState.balance); // Computed from all events
Event sourcing trade-offs:
| Benefit | Cost |
|---|---|
| Complete audit trail | Storage grows indefinitely |
| Time-travel debugging | Event schema evolution is complex |
| Replay events to new read models | Eventual consistency in read models |
| Natural fit for event-driven systems | Steeper learning curve |
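The "time-travel debugging" benefit falls out of the reducer. Replaying only a prefix of the stream gives the state as of that point. The sketch below is self-contained. It uses a trimmed-down `BalanceEvent` type and a hypothetical `stateAtVersion` helper rather than the full `AccountEvent` model above.

```typescript
// Replaying a prefix of the event stream reconstructs past state.
type BalanceEvent =
  | { type: 'MoneyDeposited'; amount: number }
  | { type: 'MoneyWithdrawn'; amount: number };

function applyBalance(balance: number, event: BalanceEvent): number {
  switch (event.type) {
    case 'MoneyDeposited':
      return balance + event.amount;
    case 'MoneyWithdrawn':
      return balance - event.amount;
  }
}

// State after the first `version` events (version 0 = before any event).
function stateAtVersion(events: BalanceEvent[], version: number): number {
  return events.slice(0, version).reduce(applyBalance, 0);
}

const history: BalanceEvent[] = [
  { type: 'MoneyDeposited', amount: 1000 },
  { type: 'MoneyWithdrawn', amount: 200 },
  { type: 'MoneyWithdrawn', amount: 50 },
];

console.log(stateAtVersion(history, 1)); // 1000
console.log(stateAtVersion(history, 2)); // 800
console.log(stateAtVersion(history, 3)); // 750 (current state)
```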
Delivery Guarantees
How many times will a consumer receive a given message? This is one of the most important decisions in messaging system design.
At-Most-Once Delivery
The message is delivered zero or one time. It may be lost but is never duplicated.
Producer ───▶ Broker ───▶ Consumer
                             │
                             ▼ ACK sent before processing
                               (if consumer crashes during processing,
                                message is lost)
// At-most-once: acknowledge immediately, then process
// (consuming with { noAck: true } gives the same guarantee)
channel.consume('queue', async (msg) => {
  if (!msg) return;
  channel.ack(msg); // ACK first
  // If processing crashes now, the message is already acknowledged: it is lost
  await processMessage(msg);
});
Use case: Metrics, logs, non-critical notifications where occasional loss is acceptable.
At-Least-Once Delivery
The message is delivered one or more times. It is never lost but may be duplicated.
Producer ───▶ Broker ───▶ Consumer
                             │
                             ▼ Process first, then ACK
                               (if consumer crashes after processing
                                but before ACK, message is redelivered)
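// At-least-once: process first, acknowledge after
channel.consume('queue', async (msg) => {
  if (!msg) return;
  try {
    await processMessage(msg); // Process first
    channel.ack(msg);          // ACK after success
  } catch (error) {
    // Requeue for retry. A message that always fails would loop forever,
    // so production code should cap retries (see Dead Letter Queues).
    channel.nack(msg, false, true);
  }
});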
// Consumer MUST be idempotent: processing the same message twice
// must have the same effect as processing it once
async function processMessage(msg: Message) {
  const data = JSON.parse(msg.content.toString());
  // The producer sets messageId as a message property, not in the body
  const messageId = msg.properties.messageId;

  await db.transaction(async (trx) => {
    // Idempotency check inside the transaction. Assumes a unique constraint on
    // processed_messages.message_id, so a concurrent duplicate inserts nothing.
    const inserted = await trx.query(
      'INSERT INTO processed_messages (message_id) VALUES ($1) ON CONFLICT (message_id) DO NOTHING',
      [messageId]
    );
    if (inserted.rowCount === 0) {
      console.log(`Message ${messageId} already processed, skipping`);
      return;
    }
    await trx.query(
      'INSERT INTO orders (id, user_id, total) VALUES ($1, $2, $3)',
      [data.orderId, data.userId, data.total]
    );
  });
}
Use case: Payment processing, order creation, or anything else where losing a message is unacceptable.
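The difference between the two orderings is easiest to see by simulating a crash. In this broker-free sketch, a hypothetical `deliver` function keeps redelivering one message until it has been acknowledged. The consumer can be made to crash between its two steps on the first attempt. Whether the message is lost or duplicated depends only on whether the ACK happened before the crash.

```typescript
// Where the ACK sits relative to a consumer crash decides loss vs duplicates.
type AckOrdering = 'ack-then-process' | 'process-then-ack';

// Delivers one message until the broker has seen an ACK and returns how many
// times the side effect ran. If crashOnFirstAttempt is true, the consumer dies
// between its two steps on the first delivery.
function deliver(ordering: AckOrdering, crashOnFirstAttempt: boolean): number {
  let effects = 0;
  let acked = false;
  for (let attempt = 1; !acked; attempt++) {
    const crash = crashOnFirstAttempt && attempt === 1;
    if (ordering === 'ack-then-process') {
      acked = true;        // ACK first...
      if (crash) break;    // ...then crash: the broker never redelivers
      effects++;
    } else {
      effects++;           // Process first...
      if (crash) continue; // ...then crash: no ACK, so the broker redelivers
      acked = true;
    }
  }
  return effects;
}

console.log(deliver('ack-then-process', true));  // 0 (lost: at-most-once)
console.log(deliver('process-then-ack', true));  // 2 (processed twice: at-least-once)
console.log(deliver('process-then-ack', false)); // 1
```

With an idempotency check like the `processed_messages` table above, the second run in the at-least-once case becomes a no-op. This is what makes the combination effectively exactly-once.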
Exactly-Once Delivery
The message is delivered exactly one time. No loss, no duplication. This is extremely difficult to achieve in a distributed system.
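The truth: exactly-once delivery over an unreliable network cannot be guaranteed. If an acknowledgment is lost, the sender cannot tell whether the message arrived, so it must either resend (risking a duplicate) or give up (risking loss). What systems actually provide is "effectively exactly-once" processing: at-least-once delivery plus idempotent or transactional processing.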
Kafka's "exactly-once semantics" (EOS):
Producer (idempotent) ───▶ Broker (deduplicates) ───▶ Consumer (transactional)
This works within Kafka's ecosystem (consume-transform-produce: read from one topic, write to another, and commit the input offsets in the same transaction).
For external systems, you still need idempotent consumers.
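The "Broker (deduplicates)" step can be pictured as the broker remembering the last sequence number it accepted from each producer and discarding retries it has already written. The sketch below is a deliberately simplified model of that idea. It assumes a single partition and in-order sequence numbers, and `DedupPartition` is a hypothetical name. Kafka's actual mechanism (producer IDs, epochs, and per-partition sequence numbers) is more involved.

```typescript
// Simplified model: the broker remembers the highest sequence number written
// per producer and drops retries it has already stored.
class DedupPartition {
  readonly entries: string[] = [];
  private lastSeq = new Map<string, number>(); // producerId -> last written sequence

  append(producerId: string, seq: number, value: string): 'written' | 'duplicate' {
    const last = this.lastSeq.get(producerId) ?? -1;
    if (seq <= last) return 'duplicate'; // A retry of a record already in the log
    this.entries.push(value);
    this.lastSeq.set(producerId, seq);
    return 'written';
  }
}

const partitionLog = new DedupPartition();
partitionLog.append('producer-1', 0, 'order-created');
// The broker's response to seq 0 is lost, so the producer retries the same record:
const retryResult = partitionLog.append('producer-1', 0, 'order-created');
partitionLog.append('producer-1', 1, 'order-paid');

console.log(retryResult);                     // duplicate
console.log(partitionLog.entries.join(', ')); // order-created, order-paid
```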
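// Kafka exactly-once configuration (kafkajs)
import type { KafkaMessage } from 'kafkajs';

const producer = kafka.producer({
  idempotent: true,             // Broker deduplicates producer retries
  transactionalId: 'my-app-tx', // Enables transactions; keep it stable per producer instance
  maxInFlightRequests: 1,       // As in the kafkajs transaction examples
});

// The input consumer should run with autoCommit: false, because its
// offsets are committed through the transaction below
async function processAndForward(topic: string, partition: number, message: KafkaMessage) {
  const transaction = await producer.transaction();
  try {
    // Process input
    const result = transform(message);
    // Produce output (within the transaction); downstream consumers should
    // read with read_committed isolation so they skip aborted writes
    await transaction.send({
      topic: 'output-topic',
      messages: [{ value: JSON.stringify(result) }],
    });
    // Commit the consumer offset in the same transaction. The committed offset
    // is the NEXT message to read, i.e. this message's offset + 1.
    await transaction.sendOffsets({
      consumerGroupId: 'my-group',
      topics: [{
        topic,
        partitions: [{ partition, offset: (BigInt(message.offset) + 1n).toString() }],
      }],
    });
    await transaction.commit();
  } catch (error) {
    await transaction.abort();
    throw error;
  }
}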
Dead Letter Queues
A dead letter queue (DLQ) captures messages that cannot be processed after multiple retry attempts. Without a DLQ, poison messages (messages that always fail) are either redelivered forever, wasting consumer capacity and delaying the messages behind them, or dropped silently.
Normal flow:
Producer ───▶ Main Queue ───▶ Consumer ───▶ Success ✓
With DLQ:
Producer ───▶ Main Queue ───▶ Consumer ──┬──▶ Fail (attempt 1)
                                         ├──▶ Fail (attempt 2)
                                         ├──▶ Fail (attempt 3)
                                         └──▶ Dead Letter Queue
                                              │
                                              ▼
                                              Alert + Manual review
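// RabbitMQ DLQ setup
async function setupQueuesWithDLQ() {
  const channel = await getChannel();
  // Dead letter queue
  await channel.assertQueue('order-processing-dlq', {
    durable: true,
  });
  // Main queue with DLQ configuration. Queue arguments are fixed at creation:
  // every assertQueue call for 'order-processing' must pass the same arguments,
  // or RabbitMQ rejects the declaration (PRECONDITION_FAILED).
  await channel.assertQueue('order-processing', {
    durable: true,
    arguments: {
      // '' is the default exchange, which routes by queue name
      'x-dead-letter-exchange': '',
      'x-dead-letter-routing-key': 'order-processing-dlq',
      // Optional: messages not consumed within 30s expire and are ALSO
      // dead-lettered. Omit this for queues where a backlog is normal.
      'x-message-ttl': 30000,
    },
  });
}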
// Consumer with retry logic
async function startConsumerWithRetry() {
  const channel = await getChannel();
  const MAX_RETRIES = 3;
  await channel.consume('order-processing', async (msg) => {
    if (!msg) return;
    const retryCount = Number(msg.properties.headers?.['x-retry-count'] ?? 0);
    try {
      const order = JSON.parse(msg.content.toString());
      await processOrder(order);
      channel.ack(msg);
    } catch (error) {
      if (retryCount < MAX_RETRIES) {
        // Republish a copy with an incremented retry count. The copy is retried
        // immediately; add a delay if failures are likely to be transient.
        channel.sendToQueue('order-processing',
          msg.content,
          {
            ...msg.properties,
            headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
          }
        );
        channel.ack(msg); // Ack the original so only the copy remains
      } else {
        // Max retries exceeded: nack without requeue so the broker dead-letters it
        channel.nack(msg, false, false);
        await alerting.send({
          severity: 'high',
          message: `Order processing failed after ${MAX_RETRIES} retries`,
          // messageId is the order ID set by the producer; the body may not parse
          messageId: msg.properties.messageId,
          error: error instanceof Error ? error.message : String(error),
        });
      }
    }
  });
}
// DLQ consumer: for manual review or automated reprocessing
async function startDLQConsumer() {
  const channel = await getChannel();
  await channel.consume('order-processing-dlq', async (msg) => {
    if (!msg) return;
    const retryCount = msg.properties.headers?.['x-retry-count'] ?? 0;
    // RabbitMQ records why and where a message was dead-lettered in x-death
    const death = msg.properties.headers?.['x-death'];
    // Log for investigation (the body may be why it failed, so don't parse it here)
    console.error('Dead letter message:', {
      messageId: msg.properties.messageId,
      retryCount,
      reason: death?.[0]?.reason,
      originalTimestamp: msg.properties.timestamp,
    });
    // Store in the database for manual review
    await db.query(
      `INSERT INTO dead_letters (queue, message_id, content, retry_count, created_at)
       VALUES ($1, $2, $3, $4, NOW())`,
      ['order-processing', msg.properties.messageId, msg.content.toString(), retryCount]
    );
    channel.ack(msg);
  });
}
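The header-based retry flow above can be checked without a broker. This sketch models only the decision logic. `handleDelivery` and the `Delivery` shape are hypothetical, while the `x-retry-count` header name is taken from the consumer above. With `MAX_RETRIES = 3`, a message that always fails is attempted four times (the original delivery plus three republished copies) before it is dead-lettered.

```typescript
// The retry consumer's decision logic, modeled without a broker.
type Delivery = { body: string; headers: Record<string, number> };
type Outcome = { action: 'ack' | 'republish' | 'dead-letter'; next?: Delivery };

const MAX_RETRIES = 3;

function handleDelivery(msg: Delivery, processBody: (body: string) => void): Outcome {
  const retryCount = msg.headers['x-retry-count'] ?? 0;
  try {
    processBody(msg.body);
    return { action: 'ack' };
  } catch {
    if (retryCount < MAX_RETRIES) {
      // Republish a copy with the counter incremented; the original is acked
      const next = { body: msg.body, headers: { ...msg.headers, 'x-retry-count': retryCount + 1 } };
      return { action: 'republish', next };
    }
    return { action: 'dead-letter' }; // nack(requeue = false): the broker dead-letters it
  }
}

// A poison message that fails on every attempt
const alwaysFails = (_body: string): void => {
  throw new Error('cannot process');
};

let current: Delivery | undefined = { body: '{"orderId":"o-1"}', headers: {} };
let attempts = 0;
let finalAction = '';
while (current) {
  attempts++;
  const outcome: Outcome = handleDelivery(current, alwaysFails);
  finalAction = outcome.action;
  current = outcome.next;
}
console.log(attempts, finalAction); // 4 dead-letter
```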
RabbitMQ vs Kafka
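These are two of the most widely used messaging systems, and they are built around different models: RabbitMQ is a broker that routes messages and deletes them once consumed, while Kafka is a distributed, replayable log.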
RabbitMQ: Smart broker, dumb consumers
┌──────────┐     ┌──────────────────────────┐     ┌──────────┐
│ Producer │────▶│ RabbitMQ (routes msgs)   │────▶│ Consumer │
└──────────┘     │ Exchanges → Queues       │     └──────────┘
                 │ Message deleted on ACK   │
                 └──────────────────────────┘
Kafka: Dumb broker, smart consumers
┌──────────┐     ┌──────────────────────────┐     ┌──────────┐
│ Producer │────▶│ Kafka (append-only log)  │◀────│ Consumer │
└──────────┘     │ Topics → Partitions      │     │ (pulls)  │
                 │ Messages retained        │     └──────────┘
                 └──────────────────────────┘
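The core difference between the two diagrams is deletion on ACK versus retention with consumer-tracked offsets. Two tiny in-memory classes can sketch it. These are illustrative models, not either system's API, and `DeleteOnAckQueue` and `RetainedLog` are hypothetical names. A real Kafka cluster stores committed offsets per consumer group on the brokers rather than in a local `Map`.

```typescript
// Queue semantics: a consumed-and-acknowledged message is gone.
class DeleteOnAckQueue {
  private messages: string[] = [];
  publish(msg: string): void {
    this.messages.push(msg);
  }
  consumeAndAck(): string | undefined {
    return this.messages.shift(); // Removed from the broker
  }
}

// Log semantics: records stay; each consumer group only tracks an offset.
class RetainedLog {
  private records: string[] = [];
  private offsets = new Map<string, number>(); // groupId -> next offset to read
  append(msg: string): void {
    this.records.push(msg);
  }
  poll(groupId: string): string | undefined {
    const offset = this.offsets.get(groupId) ?? 0; // New groups start at 0 in this model
    if (offset >= this.records.length) return undefined;
    this.offsets.set(groupId, offset + 1); // "Commit" the next position
    return this.records[offset];
  }
  seek(groupId: string, offset: number): void {
    this.offsets.set(groupId, offset); // Replay from any retained offset
  }
}

const queue = new DeleteOnAckQueue();
queue.publish('e1');
queue.consumeAndAck();
console.log(queue.consumeAndAck()); // undefined: nothing left to replay

const topic = new RetainedLog();
topic.append('e1');
topic.append('e2');
topic.poll('billing');                // e1
topic.poll('billing');                // e2
console.log(topic.poll('analytics')); // e1: an independent group reads from its own offset
topic.seek('billing', 0);
console.log(topic.poll('billing'));   // e1: replayed
```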
Comparison Table
| Aspect | RabbitMQ | Kafka |
|---|---|---|
| Model | Message broker with queues (pushes to consumers) | Distributed log (consumers pull) |
| Message lifetime | Deleted after ACK | Retained for a configured time or size |
| Routing | Flexible (exchange types: direct, fanout, topic, headers) | Topic + partition only |
| Ordering | Per-queue FIFO (can break with multiple consumers or requeues) | Per-partition FIFO |
| Throughput (rough; depends on hardware, message size, and durability settings) | ~10K-50K msg/sec per node | ~100K-1M+ msg/sec per node |
| Consumer groups | Competing consumers on the same queue | Built-in consumer groups with offset tracking |
| Replay | Not with classic or quorum queues (messages are deleted); RabbitMQ Streams add log-style replay | Replay from any retained offset |
| Latency | Typically lower (push-based) | Slightly higher (poll-based, but configurable) |
| Protocol | AMQP 0-9-1, AMQP 1.0, MQTT, STOMP (some via plugins, depending on version) | Kafka's own binary protocol over TCP |
| Complexity | Moderate | Higher (KRaft, or ZooKeeper in older versions; partition management) |
| Best for | Task queues, RPC, complex routing | Event streaming, log aggregation, high throughput |
| Scaling | Add nodes and spread queues across them (each queue is hosted or led by one node) | Add partitions (roughly linear scaling) |
When to Choose RabbitMQ
- Traditional task queues (email sending, image processing)
- Request/reply patterns (RPC over messaging)
- Complex routing logic (route by message attributes)
- Lower throughput requirements with complex message flows
- Team is familiar with traditional message brokers
When to Choose Kafka
- High-throughput event streaming (100K+ events/sec)
- Event sourcing (need to replay events)
- Log aggregation across multiple services
- Stream processing (Kafka Streams, ksqlDB)
- Data pipeline between systems (CDC, ETL)
- Multiple consumer groups reading the same data independently
Architecture Patterns
CQRS with Event-Driven Communication
Command side (writes):
┌──────────┐     ┌──────────────┐     ┌────────────────┐
│  Client  │────▶│ Command API  │────▶│ Write Database │
└──────────┘     └──────┬───────┘     └────────────────┘
                        │
                        ▼ (publishes events)
                 ┌──────────────┐
                 │ Message Queue│
                 └──────┬───────┘
                        │
                        ▼ (consumes events)
                 ┌──────────────┐     ┌────────────────┐
                 │  Projector   │────▶│ Read Database  │
                 └──────────────┘     │ (optimized     │
                                      │  for queries)  │
Query side (reads):                   └───────┬────────┘
┌──────────┐     ┌──────────────┐             │
│  Client  │────▶│  Query API   │◀────────────┘
└──────────┘     └──────────────┘
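The projector is the piece that turns the event stream into a query-optimized read model. Here is a minimal sketch with some assumptions. The `OrderPlaced`/`OrderShipped` events are hypothetical, and an in-memory `Map` stands in for the read database. Events are assumed to carry a single increasing `seq` number, which the projector uses to skip redeliveries. That matters under at-least-once delivery.

```typescript
// Projector: consumes events and maintains a denormalized read model.
type OrderEvent =
  | { seq: number; type: 'OrderPlaced'; orderId: string; customerId: string; total: number }
  | { seq: number; type: 'OrderShipped'; orderId: string };

type CustomerSummary = { orderCount: number; lifetimeValue: number; shipped: number };

class CustomerSummaryProjector {
  readonly readModel = new Map<string, CustomerSummary>(); // customerId -> summary
  private orderOwner = new Map<string, string>();          // orderId -> customerId
  private lastSeq = 0;                                     // Highest event applied

  apply(event: OrderEvent): void {
    if (event.seq <= this.lastSeq) return; // Duplicate delivery: already applied
    if (event.type === 'OrderPlaced') {
      this.orderOwner.set(event.orderId, event.customerId);
      const s = this.summary(event.customerId);
      s.orderCount += 1;
      s.lifetimeValue += event.total;
    } else {
      const customerId = this.orderOwner.get(event.orderId);
      if (customerId) this.summary(customerId).shipped += 1;
    }
    this.lastSeq = event.seq;
  }

  private summary(customerId: string): CustomerSummary {
    let s = this.readModel.get(customerId);
    if (!s) {
      s = { orderCount: 0, lifetimeValue: 0, shipped: 0 };
      this.readModel.set(customerId, s);
    }
    return s;
  }
}

const projector = new CustomerSummaryProjector();
const stream: OrderEvent[] = [
  { seq: 1, type: 'OrderPlaced', orderId: 'o-1', customerId: 'c-1', total: 40 },
  { seq: 2, type: 'OrderPlaced', orderId: 'o-2', customerId: 'c-1', total: 60 },
  { seq: 2, type: 'OrderPlaced', orderId: 'o-2', customerId: 'c-1', total: 60 }, // Redelivered
  { seq: 3, type: 'OrderShipped', orderId: 'o-1' },
];
stream.forEach((e) => projector.apply(e));

// The Query API reads the precomputed summary instead of joining tables
console.log(projector.readModel.get('c-1')); // { orderCount: 2, lifetimeValue: 100, shipped: 1 }
```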
Saga Orchestration via Message Queue
// Saga orchestrator: coordinates multi-service transactions
class OrderSagaOrchestrator {
  async startSaga(order: Order) {
    const sagaId = generateId();
    // Persist saga state before publishing, so a fast reply never
    // arrives for a saga that has not been saved yet
    await sagaStore.save(sagaId, {
      step: 'INVENTORY_RESERVE',
      orderId: order.id,
      total: order.total, // Needed later by the payment step
      status: 'PENDING',
    });
    // Step 1: Reserve inventory
    await messageBroker.publish('inventory.reserve', {
      sagaId,
      orderId: order.id,
      items: order.items,
    });
  }

  async onInventoryReserved(event: InventoryReservedEvent) {
    const saga = await sagaStore.get(event.sagaId);
    // Step 2: Process payment
    await messageBroker.publish('payment.process', {
      sagaId: event.sagaId,
      orderId: saga.orderId,
      amount: saga.total,
    });
    await sagaStore.update(event.sagaId, { step: 'PAYMENT_PROCESS' });
  }

  async onPaymentCompleted(event: PaymentCompletedEvent) {
    const saga = await sagaStore.get(event.sagaId);
    // Step 3: Confirm order
    await messageBroker.publish('order.confirm', {
      sagaId: event.sagaId,
      orderId: saga.orderId,
    });
    await sagaStore.update(event.sagaId, { step: 'COMPLETED', status: 'SUCCESS' });
  }

  // Compensating actions on failure
  async onPaymentFailed(event: PaymentFailedEvent) {
    const saga = await sagaStore.get(event.sagaId);
    // Compensate: release the inventory reserved in step 1
    await messageBroker.publish('inventory.release', {
      sagaId: event.sagaId,
      orderId: saga.orderId,
    });
    await sagaStore.update(event.sagaId, { step: 'COMPENSATING', status: 'FAILED' });
  }
}
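The orchestrator is essentially a state machine: each reply event moves the saga to a new step and determines the next command or compensation. The sketch below expresses the same flow as a pure transition function, which can be unit-tested without a broker or saga store. The step names and topics follow the orchestrator above. `nextStep` and the `Transition` shape are hypothetical.

```typescript
// The orchestrator's flow as a pure transition function (no broker, no store).
type Step = 'INVENTORY_RESERVE' | 'PAYMENT_PROCESS' | 'COMPLETED' | 'COMPENSATING';
type Reply = 'InventoryReserved' | 'PaymentCompleted' | 'PaymentFailed';
type Transition = { step: Step; command: string };

function nextStep(step: Step, reply: Reply): Transition | null {
  if (step === 'INVENTORY_RESERVE' && reply === 'InventoryReserved') {
    return { step: 'PAYMENT_PROCESS', command: 'payment.process' };
  }
  if (step === 'PAYMENT_PROCESS' && reply === 'PaymentCompleted') {
    return { step: 'COMPLETED', command: 'order.confirm' };
  }
  if (step === 'PAYMENT_PROCESS' && reply === 'PaymentFailed') {
    return { step: 'COMPENSATING', command: 'inventory.release' }; // Undo step 1
  }
  return null; // Unexpected or duplicate reply for this step: ignore it
}

// Happy path
let step: Step = 'INVENTORY_RESERVE';
for (const reply of ['InventoryReserved', 'PaymentCompleted'] as Reply[]) {
  const t: Transition | null = nextStep(step, reply);
  if (t) {
    console.log(`${reply} -> publish ${t.command}`);
    step = t.step;
  }
}
console.log(step); // COMPLETED

// A redelivered reply is ignored instead of re-running a step
console.log(nextStep('COMPLETED', 'PaymentCompleted')); // null
```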
Key Takeaways
- Use synchronous communication for queries that need immediate responses. Use asynchronous (queues) for commands that can be processed later.
- At-least-once delivery with idempotent consumers is the pragmatic sweet spot. Exactly-once delivery across system boundaries cannot be guaranteed in general; aim for effectively-once processing instead.
- Dead letter queues are not optional. Every production queue needs a DLQ strategy. Poison messages will happen.
- Choose RabbitMQ for traditional task queues with complex routing. Choose Kafka for high-throughput event streaming with replay capability.
- Event sourcing provides a complete audit trail and enables powerful patterns (CQRS, temporal queries), but adds significant complexity. Use it when the audit trail or replay capability provides clear business value.
- Monitor queue depth, consumer lag, and processing latency. A growing queue depth means consumers cannot keep up: scale them out or investigate bottlenecks.
- Design messages to be self-contained. A consumer should not need to call back to the producer for additional data.
- Version your message schemas from day one. Schema evolution is inevitable, and breaking consumer compatibility causes outages.
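One common way to act on the last point is to carry an explicit version field in every message and upgrade old versions when they are read, an approach often called upcasting. Below is a minimal sketch. It assumes a hypothetical `user.registered` payload whose v1 had a single `name` field and whose v2 splits it into `firstName` and `lastName`.

```typescript
// Versioned message payloads with an upcaster from v1 to v2.
type UserRegisteredV1 = { version: 1; userId: string; name: string };
type UserRegisteredV2 = { version: 2; userId: string; firstName: string; lastName: string };
type UserRegistered = UserRegisteredV1 | UserRegisteredV2;

// Consumers only handle the latest shape; older messages are upgraded on read.
function upcast(msg: UserRegistered): UserRegisteredV2 {
  switch (msg.version) {
    case 1: {
      const [firstName, ...rest] = msg.name.split(' ');
      return { version: 2, userId: msg.userId, firstName, lastName: rest.join(' ') };
    }
    case 2:
      return msg;
  }
}

function parseUserRegistered(raw: string): UserRegisteredV2 {
  const msg = JSON.parse(raw) as { version?: unknown };
  if (msg.version === 1 || msg.version === 2) {
    return upcast(msg as UserRegistered);
  }
  // Fail loudly on unknown versions instead of guessing
  throw new Error(`Unsupported user.registered version: ${String(msg.version)}`);
}

const oldMessage = '{"version":1,"userId":"u-1","name":"Ada Lovelace"}';
const newMessage = '{"version":2,"userId":"u-2","firstName":"Grace","lastName":"Hopper"}';
console.log(parseUserRegistered(oldMessage).lastName);  // Lovelace
console.log(parseUserRegistered(newMessage).firstName); // Grace
```

Upgrading at the consumer keeps old messages (including those still sitting in a queue or retained in a log) readable after the producer moves to a new schema.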