Producer sends messages 1, 2, 3 in order. Consumer receives 1, 3, 2. Order violated.
Impact: State corruption (user created before account), database constraint violations, business logic errors
RabbitMQ distributes messages round-robin across the consumers of a queue. Consumer A gets msg 1, Consumer B gets msg 2. If B processes faster, msg 2 completes before msg 1 and the effects land out of order.
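The effect can be reproduced without a broker. The sketch below is a simplified model, not RabbitMQ's actual scheduler: it assumes every message starts processing at the same moment on its own consumer, and the helper name `completion_order` is made up for illustration.

```python
def completion_order(durations):
    """Return message ids in the order their processing finishes.

    `durations` maps message id -> processing time in seconds. Every message
    is assumed to start at t=0 on a separate consumer (a simplification).
    """
    # sorted() is stable, so messages with equal durations keep publish order.
    return [msg for msg, _ in sorted(durations.items(), key=lambda item: item[1])]


# Msg 1 lands on a slow consumer, msg 2 on a fast one, msg 3 on a medium one.
print(completion_order({1: 5, 2: 1, 3: 2}))  # [2, 3, 1]
```

Publish order was 1, 2, 3, but the side effects are applied in the order 2, 3, 1.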

    # Producer sends: 1, 2, 3

    [Queue] --> Consumer A (slow)   <-- msg 1 (takes 5s)
            |
            --> Consumer B (fast)   <-- msg 2 (takes 1s) ✓ FINISHES FIRST
            |
            --> Consumer C (medium) <-- msg 3 (takes 2s)

    Result: 2, 3, 1 (order violated)

Consumer crashes while processing msg 2. RabbitMQ requeues msg 2. Meanwhile msg 3 already processed. Redelivered msg 2 arrives AFTER msg 3.

Timeline:

1. Msg 1 processed ✓
2. Msg 2 delivered, consumer CRASHES before ack
3. Msg 3 processed ✓
4. Msg 2 redelivered (requeued)
5. Msg 2 processed ✓

Final order: 1, 3, 2
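The same timeline can be traced with a small in-memory model. This is a sketch under stated assumptions, not broker code: `simulate_redelivery` is a hypothetical helper, and it assumes the message behind the failed one was already in flight to another consumer when the crash happened.

```python
from collections import deque


def simulate_redelivery(messages, crash_on):
    """Return the processing order when `crash_on` fails once before its ack."""
    queue = deque(messages)
    processed = []
    crashed_once = False
    while queue:
        msg = queue.popleft()
        if msg == crash_on and not crashed_once:
            crashed_once = True
            # Assumption: the next message is already being handled elsewhere,
            # so it completes before the broker notices the lost consumer.
            if queue:
                processed.append(queue.popleft())
            # The unacked message goes back to the head of the queue.
            queue.appendleft(msg)
            continue
        processed.append(msg)
    return processed


print(simulate_redelivery([1, 2, 3], crash_on=2))  # [1, 3, 2]
```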
Consumer uses threading/async to process messages concurrently. Race condition causes out-of-order completion.

    # ❌ BAD - Threads complete out-of-order
    import threading

    def callback(ch, method, properties, body):
        # One thread per message: completion order depends on the scheduler
        thread = threading.Thread(target=process, args=(body,))
        thread.start()  # Non-blocking, order not preserved

Only ONE consumer processes the queue, which guarantees FIFO order.

    # Only run ONE instance of this consumer
    def callback(ch, method, properties, body):
        # Process synchronously (blocking)
        process_message(body)
        # Ack only after processing has finished
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Register the callback after it is defined
    channel.basic_consume(
        queue='strict_order_queue',
        on_message_callback=callback,
        auto_ack=False
    )

    channel.start_consuming()  # Run only ONE instance

Trade-off: No parallelism. Slower throughput. Good for strict ordering requirements.
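"Run only one instance" is an operational rule that is easy to break by accident. One possible way to have the broker enforce it is the `x-single-active-consumer` queue argument (RabbitMQ 3.8 and later). The sketch below assumes `channel` is a pika channel; the helper name `declare_single_active_queue` is hypothetical. Redeclaring a queue that already exists with different arguments is rejected by the broker, so this applies to a newly created queue.

```python
def declare_single_active_queue(channel, queue_name):
    """Declare a durable queue on which the broker keeps one consumer active.

    Additional consumers may register, but they stay idle as standbys until
    the active one is cancelled or disconnects.
    """
    return channel.queue_declare(
        queue=queue_name,
        durable=True,
        arguments={"x-single-active-consumer": True},
    )


# Usage with pika (not executed here; assumes a broker on localhost):
#   import pika
#   connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
#   declare_single_active_queue(connection.channel(), "strict_order_queue")
```

With this in place a standby consumer can take over after a failure without two consumers ever reading the queue at the same time.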
Producer adds sequence number to each message. Consumer buffers out-of-order messages and reorders them.
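The buffering step can be isolated from the broker and tested on its own. The class below is a minimal sketch; `ReorderBuffer` and its `push` method are illustrative names, and dropping sequence numbers that were already released (for example on redelivery) is an added assumption rather than something the original approach specifies.

```python
class ReorderBuffer:
    """Hold out-of-order messages and release them in sequence order."""

    def __init__(self, first_seq=1):
        self.expected_seq = first_seq
        self.pending = {}  # seq -> payload waiting for its turn

    def push(self, seq, payload):
        """Store one message and return the payloads that are now in order."""
        if seq < self.expected_seq:
            # Assumption: an already released sequence number is a duplicate.
            return []
        self.pending[seq] = payload
        ready = []
        # Drain every consecutive message starting at the expected number.
        while self.expected_seq in self.pending:
            ready.append(self.pending.pop(self.expected_seq))
            self.expected_seq += 1
        return ready


buf = ReorderBuffer()
print(buf.push(2, "b"))  # []
print(buf.push(1, "a"))  # ['a', 'b']
print(buf.push(3, "c"))  # ['c']
```

The buffer lives in one consumer process, so it only reorders the messages that this process receives.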

    # Producer
    import json

    sequence = 0
    for msg in messages:
        sequence += 1
        channel.basic_publish(
            exchange='',
            routing_key='task_queue',
            body=json.dumps({'seq': sequence, 'data': msg}).encode()
        )

    # Consumer
    expected_seq = 1
    buffer = {}  # {seq: message}

    def callback(ch, method, properties, body):
        global expected_seq  # rebinds the module-level counter below

        msg = json.loads(body.decode())
        seq = msg['seq']

        # Buffer message
        buffer[seq] = msg

        # Process all consecutive messages
        while expected_seq in buffer:
            process_message(buffer[expected_seq])
            del buffer[expected_seq]
            expected_seq += 1

        # Note: this also acks messages still waiting in the buffer;
        # they are lost if the consumer crashes before processing them
        ch.basic_ack(delivery_tag=method.delivery_tag)

Use routing key to send related messages to same queue. Different entities can process in parallel, but same entity always ordered.

    # Producer
    channel.basic_publish(
        exchange='direct_exchange',
        routing_key=f'user_{user_id}',  # Route by user ID
        body=message.encode()
    )

    # Consumer binds to specific user queue
    channel.queue_bind(
        exchange='direct_exchange',
        queue='user_123_queue',
        routing_key='user_123'
    )

    # Result: All messages for user 123 processed in order
    # Messages for different users can process in parallel

Limit consumer to ONE message at a time. Process synchronously (no threads). This preserves order only while a single consumer reads the queue; several consumers with prefetch_count=1 still work in parallel.

    channel.basic_qos(prefetch_count=1)  # Only 1 message at a time

    def callback(ch, method, properties, body):
        # Synchronous processing (blocking)
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        # Next message only delivered AFTER ack

    channel.basic_consume(
        queue='ordered_queue',
        on_message_callback=callback,
        auto_ack=False
    )

To reproduce out-of-order processing in a test, slow down an early message:

    # Make early messages slow
    if seq == 1:
        time.sleep(5)  # Delay first message

| Solution | Pros | Cons |
|---|---|---|
| Single Consumer | Simple, guaranteed order | No parallelism, bottleneck |
| Sequence Numbers | Allows parallelism | Complex, memory overhead |
| Separate Queues | Per-entity parallelism | Queue explosion |
| prefetch_count=1 | Simple config | Slower throughput |
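
The "queue explosion" cost of separate queues can be bounded by hashing entity IDs onto a fixed number of queues instead of creating one queue per entity. The sketch below is one possible approach, not part of the setup above: `shard_routing_key`, the `shard` prefix, and the default of 8 shards are all assumptions chosen for illustration. It uses `hashlib` rather than the built-in `hash()`, because string hashes from `hash()` vary between Python processes.

```python
import hashlib


def shard_routing_key(entity_id, num_shards=8, prefix="shard"):
    """Map an entity ID to one of `num_shards` stable routing keys."""
    digest = hashlib.sha256(str(entity_id).encode("utf-8")).digest()
    # The first 8 bytes are interpreted as an integer and reduced to a shard index.
    shard = int.from_bytes(digest[:8], "big") % num_shards
    return f"{prefix}_{shard}"


# The same entity always maps to the same key, so its messages share one queue.
key = shard_routing_key("user_123")
print(key == shard_routing_key("user_123"))  # True
```

Each shard queue would still need a single consumer for the per-entity order to hold, and changing `num_shards` remaps entities to different queues.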