Producer sends 100 messages. Only 87 are processed. 13 messages disappear without trace. No errors thrown.
Impact: Data loss, incomplete processing, silent failures
Problem: Worker crashes AFTER receiving message but BEFORE processing
With auto_ack=True, RabbitMQ removes message from queue immediately when delivered. If worker dies, message lost.
# ❌ BAD - Message lost if worker crashes
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=True # RabbitMQ immediately forgets message
)
def callback(ch, method, properties, body):
# If crash happens here, message is GONE
process_data(body) # <-- crash here = lost messageProblem: RabbitMQ broker restarts/crashes
Messages stored in RAM only. Broker restart = all messages deleted.
# ❌ BAD - Messages lost on broker restart
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message.encode()
# No delivery_mode=2 means transient message
)Queue itself disappears on broker restart. Even persistent messages lost if queue not durable.
# ❌ BAD - Queue deleted on broker restart channel.queue_declare(queue='task_queue') # durable defaults to False
Network failure between basic_publish() and broker receiving message.
# ❌ BAD - No confirmation message was received channel.basic_publish(...) # Returns immediately, no confirmation # Network dies here --> message may not reach broker
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=False # Manual ack required
)
def callback(ch, method, properties, body):
try:
process_data(body)
# Only ack AFTER successful processing
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Processing failed: {e}")
# Reject and requeue for retry
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message.encode(),
properties=pika.BasicProperties(
delivery_mode=2 # Persistent (survives broker restart)
)
)channel.queue_declare(
queue='task_queue',
durable=True # Queue survives broker restart
)# Enable publisher confirms
channel.confirm_delivery()
try:
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message.encode(),
properties=pika.BasicProperties(delivery_mode=2),
mandatory=True # Return message if unroutable
)
print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
print("Message could not be routed")
except pika.exceptions.NackError:
print("Message was nacked by broker")import pika
conn_param = pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest')
)
connection = pika.BlockingConnection(conn_param)
channel = connection.channel()
# Durable queue
channel.queue_declare(queue='task_queue', durable=True)
# Enable publisher confirms
channel.confirm_delivery()
for i in range(100):
message = f"Task {i}"
try:
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message.encode(),
properties=pika.BasicProperties(
delivery_mode=2 # Persistent
),
mandatory=True
)
print(f"[✓] Sent '{message}'")
except Exception as e:
print(f"[✗] Failed to send '{message}': {e}")
connection.close()import pika
import time
conn_param = pika.ConnectionParameters(
host='concurp1.isc.heia-fr.ch',
port=5072,
credentials=pika.PlainCredentials('guest', 'guest')
)
connection = pika.BlockingConnection(conn_param)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
message = body.decode()
print(f"[x] Received '{message}'")
try:
# Simulate processing
time.sleep(message.count('.'))
print(f"[✓] Done processing '{message}'")
# Acknowledge ONLY after success
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[✗] Failed processing '{message}': {e}")
# Reject and requeue for retry
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# Fair dispatch
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=False # Manual ack
)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()| Cause | Solution |
|---|---|
| Worker crashes before processing | auto_ack=False + manual ack |
| Broker restarts | durable=True + delivery_mode=2 |
| Network failure during publish | channel.confirm_delivery() |
| Queue deleted on restart | queue_declare(durable=True) |
Golden Rule: For zero message loss, need ALL THREE: durable queue + persistent messages + manual acknowledgments