With a prefetch of 1, the consumer waits idle while the broker sends one message at a time. Every message pays a full network round trip, which kills throughput.
❌ Bad (prefetch=1)
channel.basic_qos(prefetch_count=1)
# Consumer idle during network RTT
# [Process msg] [Wait] [Process] [Wait]
# Throughput: ~100 msg/s
✅ Good (prefetch=10-50)
channel.basic_qos(prefetch_count=20)
# Consumer has buffer of messages
# [Process][Process][Process]...
# Throughput: ~2000 msg/s
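The gap between the two settings can be approximated with a back-of-envelope model. The sketch below is an assumption, not a measurement: `estimated_throughput` is a hypothetical helper, and the 10 ms round trip and 0.5 ms processing time are illustrative values chosen to land near the figures quoted above.

```python
def estimated_throughput(prefetch_count, rtt_s, process_s):
    """Rough msg/s for one consumer (hypothetical model, not a pika API)."""
    if prefetch_count < 1:
        raise ValueError("prefetch_count must be >= 1")
    # Each message holds a prefetch slot until its ack reaches the broker and
    # a replacement arrives: roughly one round trip plus the processing time.
    slot_bound = prefetch_count / (rtt_s + process_s)
    # The consumer can never go faster than its own processing speed.
    cpu_bound = 1.0 / process_s
    return min(slot_bound, cpu_bound)


for prefetch in (1, 5, 20, 50):
    rate = estimated_throughput(prefetch, rtt_s=0.010, process_s=0.0005)
    print(f"prefetch={prefetch:>2}: ~{rate:.0f} msg/s")
```

Under these assumptions the estimate rises almost linearly with prefetch_count until the consumer itself becomes the limit, which is why values far above 50 rarely help further.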
Connection setup is expensive (TCP handshake, authentication, channel creation). Reuse connections instead of opening one per message.
❌ Bad
for msg in messages:
    conn = pika.BlockingConnection(...)
    channel = conn.channel()
    channel.basic_publish(...)
    conn.close()
# 1000 messages = 1000 connections
✅ Good
conn = pika.BlockingConnection(...)
channel = conn.channel()
for msg in messages:
    channel.basic_publish(...)
conn.close()
# 1000 messages = 1 connection
One consumer can't keep up. Add more consumer instances (horizontal scaling).
# Terminal 1
python consumer.py
# Terminal 2
python consumer.py  # Add second instance
# Terminal 3
python consumer.py  # Add third instance
# Queue distributes messages round-robin
# Throughput: up to ~3x, as long as the consumers are the bottleneck
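For reference, each of those terminals could run something like the sketch below. It is a minimal illustration, not the original consumer.py: the function names `handle_message` and `run_worker`, the `localhost` host, and the 10 ms sleep standing in for real work are all assumptions, and the queue `task_queue` is assumed to exist already.

```python
import time


def handle_message(ch, method, properties, body):
    """Process one delivery, then ack so the broker can send the next one."""
    time.sleep(0.01)  # placeholder for real work (assumption)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def run_worker(host="localhost", queue="task_queue", prefetch_count=20):
    """Connect and consume until interrupted; start one process per terminal."""
    import pika  # imported here so handle_message can be tried without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    # Cap unacked deliveries per consumer so one slow instance cannot hoard
    # the backlog while the others sit idle.
    channel.basic_qos(prefetch_count=prefetch_count)
    channel.basic_consume(queue=queue, on_message_callback=handle_message)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        connection.close()
```

Calling `run_worker()` at the bottom of the file turns it into a script. Because every instance consumes from the same queue, adding a process needs no change on the producer side.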
The callback blocks on slow I/O. Use threads to process messages in parallel.
❌ Slow (synchronous)
def callback(ch, method, props, body):
    # Blocks entire consumer
    result = slow_api_call(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Processes 1 msg at a time
✅ Fast (threaded)
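import functools
import threading
def callback(ch, method, props, body):
    # Hand the slow work to a thread so the consumer loop keeps running
    threading.Thread(
        target=process_async,
        args=(ch, method, body)
    ).start()
def process_async(ch, method, body):
    result = slow_api_call(body)
    # pika channels are not thread-safe: schedule the ack on the connection's own thread
    ack = functools.partial(ch.basic_ack, delivery_tag=method.delivery_tag)
    ch.connection.add_callback_threadsafe(ack)
# Processes multiple msgs in parallel; set prefetch_count to cap the number of threads
Publishing messages one by one is slow. Use batching + confirm_delivery().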
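channel.confirm_delivery()
batch = []
for msg in messages:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=msg.encode()
    )
    batch.append(msg)
    # Checkpoint every 100 messages
    if len(batch) >= 100:
        # With pika's BlockingChannel, confirm mode makes each basic_publish
        # above wait for its own broker confirm, so everything in the batch
        # is already confirmed here and can be dropped
        batch.clear()
# To get one round-trip per batch, confirms must be awaited per batch
# rather than per publish; measure before assuming a speedup
Persistence adds disk I/O overhead. Use transient messages for non-critical data.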
Persistent (slower)
channel.basic_publish(
    ...,
    properties=pika.BasicProperties(
        delivery_mode=2  # Disk write
    )
)
# ~1000 msg/s
Transient (faster)
channel.basic_publish(
    ...,
    properties=pika.BasicProperties(
        delivery_mode=1  # RAM only
    )
)
# ~10000 msg/s
High RTT between client and broker. Run consumers closer to broker.
# Check latency
ping concurp1.isc.heia-fr.ch
# If >50ms, consider:
# - Running consumers in same datacenter as broker
# - Increasing prefetch_count to hide latency
# - Using connection pooling
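Where ICMP ping is filtered, the time to open a TCP connection to the broker port gives a comparable estimate, since the handshake takes about one round trip. The helper below is a sketch under that assumption. `tcp_connect_ms` is a hypothetical name, 5672 is the default AMQP port, and each sample also includes DNS resolution time.

```python
import socket
import statistics
import time


def tcp_connect_ms(host, port=5672, samples=5, timeout=2.0):
    """Median time in milliseconds to open a TCP connection to host:port."""
    timings = []
    for _ in range(samples):
        start = time.perf_counter()
        # create_connection returns once the TCP handshake has completed.
        with socket.create_connection((host, port), timeout=timeout):
            timings.append((time.perf_counter() - start) * 1000.0)
    return statistics.median(timings)
```

For example, `tcp_connect_ms("concurp1.isc.heia-fr.ch")` run from the consumer's machine can be compared against the 50 ms threshold above. The median is used so that a single slow sample does not skew the result.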
| Use Case | prefetch_count | Persistence | Workers |
|---|---|---|---|
| High throughput | 50-100 | Transient | Many (10+) |
| Low latency | 1-5 | Transient | Few (1-3) |
| Reliable processing | 1-10 | Persistent | Medium (3-5) |
| Heavy processing | 5-20 | Persistent | Many + threading |
import time
# Producer
start = time.time()
for i in range(10000):
    channel.basic_publish(...)
duration = time.time() - start
print(f"Throughput: {10000/duration:.0f} msg/s")
# Consumer
count = 0
start = time.time()
def callback(ch, method, props, body):
    global count
    count += 1
    if count == 10000:
        duration = time.time() - start
        print(f"Throughput: {10000/duration:.0f} msg/s")
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Check queue stats
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# Check rates (message_stats.* comes from the management plugin; if rabbitmqctl
# rejects these columns, read them from the management UI or rabbitmqadmin)
rabbitmqctl list_queues name message_stats.publish message_stats.deliver
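The producer and consumer timing snippets above can share one small counter instead of module-level globals. The class below is an illustrative sketch: `ThroughputMeter` is a hypothetical name, and it uses `time.perf_counter` rather than `time.time` because a monotonic clock is not affected by system clock adjustments.

```python
import time


class ThroughputMeter:
    """Counts events and reports the average rate since the first one."""

    def __init__(self, clock=time.perf_counter):
        self._clock = clock  # injectable so the meter can be tested without waiting
        self._start = None
        self.count = 0

    def tick(self):
        # Start timing at the first event, so idle time before the first
        # message does not drag the reported rate down.
        if self._start is None:
            self._start = self._clock()
        self.count += 1

    def rate(self):
        """Average events per second, or 0.0 if nothing has been measured yet."""
        if self._start is None:
            return 0.0
        elapsed = self._clock() - self._start
        return self.count / elapsed if elapsed > 0 else 0.0
```

In a consumer, `meter.tick()` would go at the top of the callback and `meter.rate()` could be printed every few thousand messages. The same object works around `basic_publish` in the producer loop.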