System hangs. No messages processed. All consumers blocked waiting. Nothing progresses.
Impact: Complete system freeze, requires manual intervention (restart)
Service A calls Service B via RPC. Service B calls Service A via RPC. Both block waiting for response → deadlock.
```python
# Service A
def handle_request(ch, method, props, body):
    result = rpc_client.call_service_b()  # Blocks here
    return result

# Service B
def handle_request(ch, method, props, body):
    result = rpc_client.call_service_a()  # Blocks here
    return result
```
Timeline:
1. Service A receives request, calls Service B (blocks waiting)
2. Service B receives request, calls Service A (blocks waiting)
3. Service A can't respond (blocked on B)
4. Service B can't respond (blocked on A)
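5. DEADLOCK ☠️

Queue reaches max length. Publisher blocks trying to send. Consumer can't receive because it's waiting for a lock held by the publisher. (A full queue on its own doesn't block publishers: depending on `x-overflow`, the broker drops the oldest message or rejects the new one. Publishing blocks when a memory or disk alarm makes the broker block the connection, and a consumer sharing that connection can't ack either.)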
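What a full queue does with one more message can be modeled in a few lines. This is a simplified sketch of RabbitMQ's documented `x-overflow` modes (`drop-head`, the default, and `reject-publish` / `reject-publish-dlx`), not broker code; `publish` and its return strings are hypothetical. The point it illustrates: reaching `x-max-length` changes what happens to messages, not whether the publisher waits.

```python
from collections import deque


def publish(messages, message, max_length, overflow="drop-head"):
    """Hypothetical model of one publish into a length-limited queue; never blocks."""
    if len(messages) < max_length:
        messages.append(message)
        return "accepted"
    if overflow == "drop-head":
        messages.popleft()       # oldest message is discarded (or dead-lettered)
        messages.append(message)
        return "accepted, oldest dropped"
    if overflow in ("reject-publish", "reject-publish-dlx"):
        return "rejected"        # with publisher confirms on, the publisher gets a nack
    raise ValueError(f"unknown overflow mode: {overflow!r}")


q = deque()
statuses = [publish(q, i, max_length=100) for i in range(101)]
print(statuses[-1], q[0])
```

In this model the 101st publish evicts the first message (index 0) instead of stalling the loop; with `reject-publish` the new message is refused and the queue keeps messages 0 to 99.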
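```python
# Queue max length = 100
channel.queue_declare(
    queue='limited_queue',
    arguments={'x-max-length': 100}
)

# Producer sends 101 messages
for i in range(101):
    channel.basic_publish(...)  # msg 101 overflows the queue
    # With the default 'drop-head' overflow the oldest message is dropped; the
    # publish itself only stalls if the broker has blocked this connection
    # (memory/disk alarm)

# A consumer sharing this connection/thread can't process or ack while the publish is stalled
```

Consumer callback makes blocking RPC call on SAME connection. Connection is blocked by the callback, so it can't process the incoming RPC response.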
```python
# ❌ BAD - Same connection for consume + RPC
import pika

connection = pika.BlockingConnection(...)
channel = connection.channel()

def callback(ch, method, properties, body):
    # This blocks the connection
    result = rpc_client.call(body)  # rpc_client uses the SAME connection
    # RPC response can't arrive (connection blocked by this callback)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()  # DEADLOCK
```

Consumer prefetches 100 messages. Processing the first message blocks waiting for an external resource. The other 99 messages can't be processed (they sit behind it, unacked). New messages can't arrive (prefetch window full).
```python
channel.basic_qos(prefetch_count=100)

def callback(ch, method, properties, body):
    # First message blocks on slow external call
    external_api.slow_call()  # Takes 10 minutes
    # Other 99 prefetched messages can't be processed
    # No ack until the call returns, so the prefetch buffer stays full
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

Don't block waiting for a response. Use callback queues.
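Replies on the callback queue have to be matched back to the request that produced them. One common way, sketched here under assumptions, is to set `correlation_id` (a real `pika.BasicProperties` field, alongside `reply_to`) when publishing and keep a table of in-flight requests keyed by it. `PendingRequests` and its methods are hypothetical names; the sketch uses only the standard library so it runs without a broker.

```python
import uuid


class PendingRequests:
    """Hypothetical bookkeeping for async request/reply over a callback queue.

    Nothing here waits: start() is called when publishing a request,
    complete() when a reply is consumed from the callback queue.
    """

    def __init__(self):
        self._pending = {}  # correlation id -> whatever is needed to finish the work

    def start(self, context):
        corr_id = str(uuid.uuid4())  # send as properties.correlation_id
        self._pending[corr_id] = context
        return corr_id

    def complete(self, corr_id, reply):
        context = self._pending.pop(corr_id, None)
        if context is None:
            return None  # unknown, late, or duplicate reply: safe to ignore
        return context, reply

    def __len__(self):
        return len(self._pending)
```

In the consumer for `service_a_callback`, the idea is to call `complete(props.correlation_id, body)` and ack the reply; a `None` result marks a reply that can be dropped. The table lives in memory, so in-flight requests are forgotten if the service restarts.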
```python
import pika

# Service A
def handle_request(ch, method, props, body):
    # Send async request to B (don't wait)
    ch.basic_publish(
        exchange='',
        routing_key='service_b_queue',
        body=body,
        properties=pika.BasicProperties(reply_to='service_a_callback')
    )
    # Ack right away; B's response arrives later on the callback queue
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # No blocking = no deadlock
```

Use a different connection for the RPC client than for the consumer.
```python
# Connection 1: For consuming
consume_conn = pika.BlockingConnection(...)
consume_channel = consume_conn.channel()

# Connection 2: For RPC client (separate!)
rpc_conn = pika.BlockingConnection(...)
rpc_channel = rpc_conn.channel()
rpc_client = RpcClient(rpc_channel)

def callback(ch, method, properties, body):
    # The RPC waits on rpc_conn, which this callback isn't holding, so the reply can arrive
    result = rpc_client.call(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

consume_channel.basic_consume(queue='task_queue', on_message_callback=callback)
consume_channel.start_consuming()
```

Add a timeout to RPC calls. Fail fast instead of blocking forever.
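```python
import time
import uuid

class RpcClient:
    # Assumes __init__ stores self.channel and self.connection, declares a
    # callback queue, and registers an on_response handler that sets
    # self.response when a reply carrying self.corr_id arrives.
    def call(self, data, timeout=5.0):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(...)  # request with reply_to + correlation_id

        # Wait with timeout (monotonic clock is immune to wall-clock jumps)
        deadline = time.monotonic() + timeout
        while self.response is None:
            if time.monotonic() > deadline:
                raise TimeoutError("RPC call timed out")
            self.connection.process_data_events(time_limit=0.1)
        return self.response
```

Lower the prefetch count so the buffer can't fill up behind one slow message, and other consumers can take over.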
```python
# Instead of prefetch_count=100
channel.basic_qos(prefetch_count=1)  # Only 1 unacked message at a time
# If processing blocks, other consumers can take the remaining messages
```
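A rough way to see why a small prefetch helps is a toy simulation. This is a hypothetical model, not RabbitMQ's dispatcher: it assumes the broker hands messages round-robin to consumers with free prefetch slots, and that each consumer works through its buffer one message at a time, acking on completion. It compares two consumers, one of which receives a 10-minute message first.

```python
import heapq
from collections import deque


def simulate_prefetch(durations, consumers, prefetch):
    """Return {message_index: completion_time} under the simplified model above."""
    waiting = deque(range(len(durations)))          # messages still in the queue
    buffers = [deque() for _ in range(consumers)]   # delivered but unacked
    busy = [False] * consumers
    events = []                                      # (finish_time, consumer)
    finished = {}

    def deliver(now):
        # Fill every consumer's prefetch window, round-robin.
        delivered = True
        while waiting and delivered:
            delivered = False
            for c in range(consumers):
                if waiting and len(buffers[c]) < prefetch:
                    buffers[c].append(waiting.popleft())
                    delivered = True
        # Idle consumers start on the oldest message in their buffer.
        for c in range(consumers):
            if not busy[c] and buffers[c]:
                busy[c] = True
                heapq.heappush(events, (now + durations[buffers[c][0]], c))

    deliver(0.0)
    while events:
        now, c = heapq.heappop(events)
        finished[buffers[c].popleft()] = now  # ack frees one prefetch slot
        busy[c] = False
        deliver(now)
    return finished


durations = [600] + [1] * 9  # first message hits a 10-minute external call
for prefetch in (10, 1):
    done = simulate_prefetch(durations, consumers=2, prefetch=prefetch)
    others = max(t for msg, t in done.items() if msg != 0)
    print(f"prefetch={prefetch}: last of the other messages done at t={others}")
```

With `prefetch=10`, the four messages buffered behind the slow one on the same consumer finish only at t=604, although the other consumer went idle at t=5; with `prefetch=1`, the second consumer drains everything else by t=9. A prefetch of 1 adds a broker round trip per message, so it trades some throughput for fairness.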
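Move blocking call to separate thread. Main thread acks immediately. Trade-off: acking first means at-most-once delivery, so a message whose worker crashes is lost. Keep every channel call (acks included) on the consumer thread; pika connections and channels are not thread-safe.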
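Where losing in-flight work on a crash is not acceptable, a common alternative is to ack only after the worker finishes, handing the ack back to the connection's thread. In pika, `BlockingConnection.add_callback_threadsafe()` exists for this, and the consumer thread keeps running `start_consuming()` so heartbeats keep flowing. The stdlib model below stands in for that hand-off with a plain `queue.Queue`; `consume_batch`, `work`, `ack` and `nack` are hypothetical names, and it processes a fixed batch rather than a live stream.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def consume_batch(deliveries, work, ack, nack, max_workers=4):
    """Run `work` on worker threads; call ack/nack only on the calling thread.

    `deliveries` is a list of (delivery_tag, body). Workers never touch the
    channel: they post (tag, succeeded) back through a queue, playing the role
    add_callback_threadsafe() plays in a real pika consumer.
    """
    results = queue.Queue()

    def run(tag, body):
        try:
            work(body)                  # slow or blocking call, off the consumer thread
        except Exception:
            results.put((tag, False))   # report failure so the message can be nacked
        else:
            results.put((tag, True))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for tag, body in deliveries:
            pool.submit(run, tag, body)
        for _ in deliveries:
            tag, succeeded = results.get()  # exactly one result per delivery
            (ack if succeeded else nack)(tag)
```

Because every worker posts a result even when `work` raises, the consuming thread never waits on a reply that cannot come.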
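```python
import threading

def callback(ch, method, properties, body):
    # Ack immediately, on the consumer thread
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # Process in background thread
    thread = threading.Thread(
        target=process_blocking_task,
        args=(body,)
    )
    thread.start()

def process_blocking_task(body):
    # Blocking operation here (separate thread, won't block consumer)
    external_api.slow_call()
```

```shell
# Check queue status
rabbitmqctl list_queues name messages consumers
# Check connections ('state' shows blocked/blocking under a resource alarm)
rabbitmqctl list_connections name state
# Check unacked messages against prefetch per channel
rabbitmqctl list_channels name consumer_count messages_unacknowledged prefetch_count
```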
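`threading.enumerate()` lists threads but not where each one is stuck. A sketch of an actual thread dump, using CPython's `sys._current_frames()` (underscore-prefixed but documented) and `traceback.format_stack()`; `dump_thread_stacks` is a hypothetical helper name. The standard library's `faulthandler.dump_traceback(all_threads=True)` writes similar information to stderr, and on Unix `faulthandler.register(signal.SIGUSR1)` lets you trigger it in an already-hung process with `kill -USR1 <pid>`.

```python
import sys
import threading
import traceback


def dump_thread_stacks():
    """Return every live thread's name plus the stack it is currently executing."""
    frames = sys._current_frames()  # thread ident -> current frame
    lines = []
    for thread in threading.enumerate():
        lines.append(f"--- {thread.name} (daemon={thread.daemon}) ---")
        frame = frames.get(thread.ident)
        if frame is not None:
            for entry in traceback.format_stack(frame):
                lines.append(entry.rstrip("\n"))
    return "\n".join(lines)


if __name__ == "__main__":
    print(dump_thread_stacks())
```

A consumer thread blocked in an RPC wait shows up with `process_data_events` or a socket/lock wait near the bottom of its stack.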
```python
# List live threads (Python); shows which threads exist, not where they are stuck
import threading

print(threading.enumerate())
for thread in threading.enumerate():
    print(thread, thread.is_alive())
```

| Cause | Solution |
|---|---|
| RPC cycle | Use async pattern, no blocking waits |
| Same connection for RPC + consume | Separate connections |
| Blocking callback | Move to thread, ack immediately |
| Prefetch buffer full | Lower prefetch_count |
| Infinite wait | Add timeouts |
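The RPC-cycle row can also be caught before deployment. A minimal sketch, assuming you can list (by hand, or from service config) which services make blocking RPC calls to which: any cycle in that graph is a potential deadlock like the Service A / Service B one. The `calls` map, the service names and `find_blocking_cycle` are hypothetical; a plain depth-first search finds a cycle.

```python
def find_blocking_cycle(calls):
    """Return one cycle of blocking calls as a list of services, or None.

    `calls` maps a service to the services it calls synchronously (waits for
    the reply). Fire-and-forget publishes should not be listed.
    """
    on_path, done = [], set()

    def visit(service):
        on_path.append(service)
        for peer in calls.get(service, ()):
            if peer in on_path:  # back edge: this chain of waits loops
                return on_path[on_path.index(peer):] + [peer]
            if peer not in done:
                cycle = visit(peer)
                if cycle:
                    return cycle
        on_path.pop()
        done.add(service)
        return None

    for service in calls:
        if service not in done:
            cycle = visit(service)
            if cycle:
                return cycle
    return None


calls = {"service_a": ["service_b"], "service_b": ["service_a"]}
print(find_blocking_cycle(calls))  # ['service_a', 'service_b', 'service_a']
```

Edges converted to the async callback-queue pattern drop out of the map, so after that fix the check should return `None`.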