
Problem: Messages Lost

Symptom

The producer sends 100 messages, but only 87 are processed. The other 13 disappear without a trace, and no errors are raised.

Impact: Data loss, incomplete processing, silent failures

Root Causes

1. Auto-Acknowledgment Enabled

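Problem: Worker crashes AFTER receiving a message but BEFORE finishing processing it

With auto_ack=True, RabbitMQ treats a message as acknowledged the moment it is delivered and removes it from the queue. If the worker dies before processing finishes, the message is lost. Because prefetch limits do not apply to auto-ack consumers, any other messages already pushed to the worker's buffer are lost with it.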

# ❌ BAD - Message lost if worker crashes
def callback(ch, method, properties, body):
    # If a crash happens here, the message is already GONE from the queue
    process_data(body)  # <-- crash here = lost message

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,  # callback must be defined before this call
    auto_ack=True  # RabbitMQ forgets the message as soon as it is delivered
)
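To see why this loss is silent, here is a toy in-memory model (plain Python, not RabbitMQ or pika) of a worker that crashes on every 8th delivery before it finishes processing. The function name `run_worker` and its `crash_every` parameter are made up for illustration. With auto-ack, each crash drops a message with no error anywhere; with manual ack, the unacknowledged message goes back to the queue and is eventually processed.

```python
from collections import deque


def run_worker(n_messages, crash_every, auto_ack):
    """Toy model of one queue and one worker that crashes on every
    `crash_every`-th delivery, before it finishes processing.
    Not RabbitMQ: it only mimics the two acknowledgment modes."""
    queue = deque(range(n_messages))
    processed = []
    deliveries = 0
    while queue:
        msg = queue.popleft()  # broker delivers the next message
        deliveries += 1
        if deliveries % crash_every == 0:  # worker crashes mid-processing
            if not auto_ack:
                # Unacked message goes back to the queue when the worker's
                # connection drops (appended to the back here for simplicity).
                queue.append(msg)
            # With auto_ack the broker already deleted it: silently lost.
            continue
        processed.append(msg)  # success (in manual mode, the ack happens here)
    return processed


lost_auto = 100 - len(run_worker(100, 8, auto_ack=True))
lost_manual = 100 - len(run_worker(100, 8, auto_ack=False))
print(f"auto_ack=True lost {lost_auto}, auto_ack=False lost {lost_manual}")
```

Running it prints `auto_ack=True lost 12, auto_ack=False lost 0`; the exact numbers come from the toy crash schedule, not from RabbitMQ.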

2. Non-Persistent Messages

Problem: RabbitMQ broker restarts/crashes

Messages published without delivery_mode=2 are transient: the broker does not persist them for recovery, so a broker restart discards them.

# ❌ BAD - Messages lost on broker restart
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message.encode()
    # No delivery_mode=2 means transient message
)

3. Non-Durable Queue

The queue itself is deleted on broker restart. Even persistent messages are lost if the queue they sit in is not durable.

# ❌ BAD - Queue deleted on broker restart
channel.queue_declare(queue='task_queue')  # durable defaults to False
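Causes 2 and 3 combine: a message survives a restart only if it is persistent AND sits in a durable queue. The toy model below (a hypothetical `ToyBroker` class, not a RabbitMQ client) encodes just that rule, under the simplifying assumption that nothing else, such as a disk failure, interferes.

```python
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    durable: bool
    messages: list = field(default_factory=list)  # (body, persistent) pairs


class ToyBroker:
    """Toy model of what survives a broker restart (not real RabbitMQ code)."""

    def __init__(self):
        self.queues = {}

    def queue_declare(self, name, durable=False):
        self.queues.setdefault(name, ToyQueue(durable))

    def publish(self, name, body, persistent=False):
        self.queues[name].messages.append((body, persistent))

    def restart(self):
        # Only durable queues come back, and inside them only the
        # persistent messages are recovered.
        self.queues = {
            name: ToyQueue(q.durable, [m for m in q.messages if m[1]])
            for name, q in self.queues.items()
            if q.durable
        }
```

Publishing one transient and one persistent message to a non-durable and a durable queue, then calling `restart()`, leaves only the persistent message in the durable queue.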

4. Connection Lost Before the Message Reaches the Broker

The network fails after basic_publish() returns but before the broker has received the message. Without publisher confirms, the producer never finds out.

# ❌ BAD - No confirmation that the broker received the message
channel.basic_publish(...)  # Returns immediately, no confirmation
# Network dies here --> message may not reach broker

Solutions

✅ Solution 1: Manual Acknowledgments

def callback(ch, method, properties, body):
    try:
        process_data(body)
        # Only ack AFTER successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Processing failed: {e}")
        # Reject and requeue for retry (a message that always fails loops forever)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,
    auto_ack=False  # Manual ack required
)
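One caveat with requeue=True: a message that always fails (a "poison" message) is redelivered indefinitely. A minimal sketch of one way to bound that, assuming a hypothetical `process(body)` function: retry once, then use the `redelivered` flag that pika exposes on `method` to reject without requeue. Without a dead-letter exchange configured on the queue (for example via the `x-dead-letter-exchange` queue argument), a message rejected with requeue=False is discarded, so set one up before relying on this. Note that `redelivered` only says the message may have been delivered before (for instance to a consumer that crashed), so it is a coarse signal, not an attempt counter.

```python
def make_callback(process):
    """Build a pika-style consumer callback around `process(body)`.

    First failure: nack with requeue=True for one retry.
    Failure on a redelivered message: nack with requeue=False, so it is
    dead-lettered (if the queue has a DLX) instead of looping forever.
    """
    def callback(ch, method, properties, body):
        try:
            process(body)
        except Exception as e:
            # True if the broker reports it may have delivered this before
            retry = not method.redelivered
            print(f"Processing failed ({e}); requeue={retry}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=retry)
        else:
            # Only ack AFTER successful processing
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

It plugs into the same call as before, e.g. `on_message_callback=make_callback(process_data)` with `auto_ack=False`.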

✅ Solution 2: Persistent Messages

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message.encode(),
    properties=pika.BasicProperties(
        delivery_mode=2  # Persistent (survives broker restart)
    )
)

✅ Solution 3: Durable Queue

channel.queue_declare(
    queue='task_queue',
    durable=True  # Queue survives broker restart
)

✅ Solution 4: Publisher Confirms

# Enable publisher confirms
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message.encode(),
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True  # Return message if unroutable
    )
    # basic_publish() returned without raising, so the broker confirmed it
    print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Message was nacked by broker")
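Confirms tell the producer that a publish failed; what happens next is up to the application. Below is a minimal retry sketch. The helper name `publish_with_retry` is hypothetical, and `publish` is assumed to be any zero-argument callable that raises on failure. Because the broker may have stored a message whose confirmation was lost, retrying gives at-least-once delivery, so consumers should tolerate duplicates.

```python
import time


def publish_with_retry(publish, retryable, attempts=3, delay=1.0):
    """Call `publish()` until it succeeds or `attempts` run out.

    `retryable` is a tuple of exception types worth retrying; anything
    else propagates immediately. Returns the number of attempts used.
    """
    for attempt in range(1, attempts + 1):
        try:
            publish()
            return attempt
        except retryable as e:
            if attempt == attempts:
                raise  # give up: let the caller see the last error
            print(f"Publish attempt {attempt} failed ({e!r}); retrying")
            time.sleep(delay)
```

For example, pass `lambda: channel.basic_publish(...)` on a channel with `confirm_delivery()` enabled, with `retryable=(pika.exceptions.NackError,)`. Retrying `UnroutableError` usually does not help, since it points to a missing binding rather than a transient fault, and after a connection error the callable would first need to open a new connection and channel.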

Complete Reliable Example

Producer (Reliable)

import pika

conn_param = pika.ConnectionParameters(
    host='concurp1.isc.heia-fr.ch',
    port=5072,
    credentials=pika.PlainCredentials('guest', 'guest')
)

connection = pika.BlockingConnection(conn_param)
channel = connection.channel()

# Durable queue
channel.queue_declare(queue='task_queue', durable=True)

# Enable publisher confirms
channel.confirm_delivery()

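for i in range(100):
    message = f"Task {i}"
    try:
        channel.basic_publish(
            exchange='',
            routing_key='task_queue',
            body=message.encode(),
            properties=pika.BasicProperties(
                delivery_mode=2  # Persistent
            ),
            mandatory=True  # Raise UnroutableError if no queue receives it
        )
        # With confirms enabled, returning normally means the broker confirmed it
        print(f"[✓] Confirmed '{message}'")
    except (pika.exceptions.UnroutableError, pika.exceptions.NackError) as e:
        # Connection errors are not caught here, so they stop the script visibly
        print(f"[✗] Failed to send '{message}': {e}")

connection.close()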

Consumer (Reliable)

import pika
import time

conn_param = pika.ConnectionParameters(
    host='concurp1.isc.heia-fr.ch',
    port=5072,
    credentials=pika.PlainCredentials('guest', 'guest')
)

connection = pika.BlockingConnection(conn_param)
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    message = body.decode()
    print(f"[x] Received '{message}'")

    try:
        # Simulate 1 s of work per message ("Task N" contains no dots, so
        # sleeping per '.' would be instant and the crash test impossible)
        time.sleep(1)
        print(f"[✓] Done processing '{message}'")

        # Acknowledge ONLY after success
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"[✗] Failed processing '{message}': {e}")

        # Reject and requeue for retry
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

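# Fair dispatch: at most 1 unacknowledged message per consumer at a time
channel.basic_qos(prefetch_count=1)

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,
    auto_ack=False  # Manual ack
)

print('[*] Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Any unacked message is requeued by the broker when the connection closes
    channel.stop_consuming()
connection.close()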

How to Test

  1. Test Worker Crash:
    • Start consumer
    • Send 10 messages
    • Kill consumer (Ctrl+C) while processing message 5
    • Start consumer again
    • Verify message 5 is redelivered (its redelivered flag is set)
  2. Test Broker Restart:
    • Send 10 persistent messages to a durable queue
    • Restart RabbitMQ broker
    • Start consumer
    • Verify all 10 messages are still delivered
  3. Test Network Failure:
    • Enable publisher confirms
    • Disconnect network during publish
    • Verify basic_publish() raises an exception, so the producer knows the message may not have arrived and can resend it instead of losing it silently
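To check tests 1 and 2 objectively rather than by eye, the consumer can log the number of each task it finishes (e.g. 5 for "Task 5") and compare that log with what the producer sent. The sketch below assumes such a log; `check_delivery` is a hypothetical helper. Missing IDs indicate loss. Duplicates are expected after the crash test if the worker was killed after processing but before acking, because RabbitMQ then redelivers a message that was already handled.

```python
from collections import Counter


def check_delivery(sent, processed):
    """Compare IDs the producer sent with IDs the consumer logged.

    Returns (missing, duplicates): IDs never processed, and IDs
    processed more than once, both sorted.
    """
    counts = Counter(processed)
    missing = sorted(set(sent) - set(counts))
    duplicates = sorted(i for i, n in counts.items() if n > 1)
    return missing, duplicates


missing, duplicates = check_delivery(range(10), [0, 1, 2, 3, 4, 4, 5, 6, 7, 9])
print(f"missing={missing} duplicates={duplicates}")
```

This prints `missing=[8] duplicates=[4]`: message 8 was lost and message 4 was processed twice.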

Summary

Cause                              | Solution
-----------------------------------|-------------------------------
Worker crashes before processing   | auto_ack=False + manual ack
Broker restarts                    | durable=True + delivery_mode=2
Network failure during publish     | channel.confirm_delivery()
Queue deleted on restart           | queue_declare(durable=True)

Golden Rule: To avoid losing messages, you need ALL FOUR: durable queue + persistent messages + manual acknowledgments + publisher confirms.