Work Queue Pattern

Overview

A work queue (also called a task queue) distributes time-consuming tasks among multiple workers. The main idea is to avoid running a resource-intensive task immediately and blocking until it completes. Instead, the producer wraps the task in a message and sends it to a queue; a worker picks it up and runs it later.

Use cases:

  • Web server processing image uploads (resize, compress)
  • Background data processing
  • Email/notification sending
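
The idea can be tried without a broker. The following is a minimal in-process sketch using only Python's standard library (queue.Queue and threads), not RabbitMQ; the task strings and worker names are made up for illustration, and task.upper() stands in for the slow work.

```python
import queue
import threading

tasks = queue.Queue()
results = []
results_lock = threading.Lock()

def worker(name):
    """Pull tasks until the None sentinel arrives."""
    while True:
        task = tasks.get()
        if task is None:              # sentinel: no more work for this worker
            tasks.task_done()
            break
        with results_lock:
            results.append((name, task.upper()))  # stand-in for slow work
        tasks.task_done()             # rough analogue of an acknowledgment

threads = [threading.Thread(target=worker, args=(f"worker-{i}",)) for i in range(3)]
for t in threads:
    t.start()

# The producer only enqueues; it does not wait for each task to finish.
for item in ["resize a.png", "compress b.png", "send email"]:
    tasks.put(item)
for _ in threads:
    tasks.put(None)                   # one sentinel per worker

tasks.join()                          # block until every item is marked done
for t in threads:
    t.join()
print(sorted(done for _, done in results))
```

Which worker handles which task varies from run to run; only the set of completed tasks is fixed.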

Architecture

[Producer] --messages--> [Queue] --> [Worker 1]
                                   |
                                   |--> [Worker 2]
                                   |
                                   |--> [Worker N]

  • Round-robin dispatch: by default, each message goes to the next consumer in sequence
  • Fair dispatch: set prefetch_count=1 so a worker receives a new message only after acknowledging the previous one
  • Acknowledgments: the worker acks a message after processing it; if the worker dies first, the broker redelivers the message
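
To make the difference between the two dispatch modes concrete, here is a small pure-Python simulation. It is a simplified model, not broker code: the function names and the task durations are invented for illustration, and network latency is ignored.

```python
def round_robin(durations, n_workers):
    """Assign task i to worker i % n_workers, ignoring how busy each worker is."""
    busy = [0] * n_workers
    for i, d in enumerate(durations):
        busy[i % n_workers] += d
    return busy

def fair_dispatch(durations, n_workers):
    """Model prefetch_count=1: the next task goes to the worker that frees up first."""
    busy = [0] * n_workers
    for d in durations:
        idx = busy.index(min(busy))   # earliest-available worker (ties: lowest index)
        busy[idx] += d
    return busy

durations = [5, 1, 5, 1, 5, 1]        # seconds of work per task (made-up numbers)
rr = round_robin(durations, 2)
fair = fair_dispatch(durations, 2)
print("round-robin load per worker:", rr, "-> done after", max(rr), "s")
print("fair dispatch load per worker:", fair, "-> done after", max(fair), "s")
```

With alternating heavy and light tasks, round-robin hands every heavy task to the same worker, while the prefetch_count=1 model spreads the load more evenly.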

Producer Implementation

import pika

# Connection setup
conn_param = pika.ConnectionParameters(
    host='concurp1.isc.heia-fr.ch',
    port=5072,
    credentials=pika.PlainCredentials('guest', 'guest')
)

connection = pika.BlockingConnection(conn_param)
channel = connection.channel()

# Declare durable queue (survives broker restart)
channel.queue_declare(queue='task_queue', durable=True)

# Publish persistent message
message = "Task data: process this image"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message.encode(),
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    )
)

print(f"[x] Sent '{message}'")
connection.close()

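Important: Set both durable=True on the queue and delivery_mode=2 on each message so that the queue and its messages survive a broker restart. RabbitMQ may hold a persistent message in memory briefly before writing it to disk, so this is a strong but not absolute guarantee.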

Worker/Consumer Implementation

import pika
import time

conn_param = pika.ConnectionParameters(
    host='concurp1.isc.heia-fr.ch',
    port=5072,
    credentials=pika.PlainCredentials('guest', 'guest')
)

connection = pika.BlockingConnection(conn_param)
channel = connection.channel()

# Same queue declaration (idempotent)
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    message = body.decode()
    print(f"[x] Received '{message}'")

    # Simulate work
    time.sleep(message.count('.'))

    print(f"[x] Done processing '{message}'")

    # Manual acknowledgment (CRITICAL)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Fair dispatch: don't give more than 1 message to worker at a time
channel.basic_qos(prefetch_count=1)

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,
    auto_ack=False  # manual ack required
)

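print('[*] Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Stop cleanly; any unacked message is requeued by the broker
    channel.stop_consuming()
connection.close()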

Critical Settings

Setting            Purpose                                  Default
durable=True       Queue survives broker restart            False
delivery_mode=2    Message persists to disk                 1 (transient)
auto_ack=False     Manual acknowledgment (prevents loss)    False
prefetch_count=1   Fair dispatch (one task at a time)       0 (unlimited)
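
The four settings can be checked together before deploying a worker or producer. The helper below is a hypothetical sketch (audit_settings is not part of pika); it only compares the values you pass in against the recommendations in the table.

```python
def audit_settings(durable, delivery_mode, auto_ack, prefetch_count):
    """Return a warning for each setting that differs from the recommended value."""
    warnings = []
    if not durable:
        warnings.append("queue is not durable: it disappears on broker restart")
    if delivery_mode != 2:
        warnings.append("messages are transient: they are not written to disk")
    if auto_ack:
        warnings.append("auto_ack is on: a crashing worker loses its message")
    if prefetch_count != 1:
        warnings.append("prefetch_count is not 1: busy workers can pile up a backlog")
    return warnings

# Recommended configuration: no warnings expected
print(audit_settings(durable=True, delivery_mode=2, auto_ack=False, prefetch_count=1))

# Durable queue, but transient messages and no prefetch limit
for w in audit_settings(durable=True, delivery_mode=1, auto_ack=False, prefetch_count=0):
    print("warning:", w)
```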

Common Pitfalls

❌ Forgetting manual ack

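With auto_ack=True, the broker treats a message as done as soon as it is delivered, so a worker crash mid-task loses it for good. With auto_ack=False, a callback that never calls basic_ack leaves its messages unacknowledged: they are redelivered when the worker disconnects, and with prefetch_count=1 the worker receives nothing further.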
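
The effect of the ack mode on a crashing worker can be illustrated with a toy in-memory model. ToyBroker is a hypothetical class written for this note; it mimics only the requeue-on-disconnect behaviour and is not how RabbitMQ is implemented.

```python
from collections import deque

class ToyBroker:
    """Tiny in-memory model of ack-based redelivery (not RabbitMQ itself)."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting to be delivered
        self.unacked = {}             # delivery_tag -> (consumer, message)
        self._next_tag = 1

    def deliver(self, consumer, auto_ack=False):
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        if not auto_ack:
            self.unacked[tag] = (consumer, message)  # kept until acked
        return tag, message

    def ack(self, tag):
        del self.unacked[tag]

    def consumer_died(self, consumer):
        """Requeue, in original order, everything the consumer had not acked."""
        for tag in sorted(self.unacked, reverse=True):
            owner, message = self.unacked[tag]
            if owner == consumer:
                del self.unacked[tag]
                self.ready.appendleft(message)

manual = ToyBroker(["task-1"])
manual.deliver("worker-A", auto_ack=False)
manual.consumer_died("worker-A")      # crash before ack
print("manual ack, after crash:", list(manual.ready))

auto = ToyBroker(["task-1"])
auto.deliver("worker-A", auto_ack=True)
auto.consumer_died("worker-A")        # crash mid-task
print("auto ack, after crash:  ", list(auto.ready))
```

In the manual-ack case the task is back in the queue for another worker; in the auto-ack case nothing is left to redeliver.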

❌ Not setting prefetch_count

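Without a prefetch limit, messages are dispatched round-robin regardless of load: a worker stuck on slow tasks builds up a backlog while other workers sit idle. Use prefetch_count=1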

❌ Durable queue but transient messages

The queue survives a restart, but its messages do not. Both durable=True and delivery_mode=2 are needed

Testing Strategy

  1. Start 3 worker processes
  2. Send 10 messages from producer
  3. Observe the distribution (with tasks of similar duration, each worker gets about 3-4 messages)
  4. Kill one worker mid-processing (Ctrl+C)
  5. Verify unacked message is redelivered to another worker
  6. Stop the workers, publish a few more messages, restart the broker, then verify that the queue and the pending messages still exist
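
For step 2, note that the worker sleeps for message.count('.') seconds, so test messages need dots to simulate any work. A possible generator is sketched below; make_tasks is a hypothetical helper, and the 0 to 3 second range is an arbitrary choice. Each returned string could be passed as the body to the producer's basic_publish call.

```python
def make_tasks(n):
    """Build n test messages whose dot count (0 to 3) sets the simulated work time."""
    return [f"Task {i}" + "." * (i % 4) for i in range(1, n + 1)]

tasks = make_tasks(10)
for t in tasks:
    print(f"{t!r} -> worker sleeps {t.count('.')} s")
print("total simulated work:", sum(t.count('.') for t in tasks), "s")
```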

Related Patterns

Common Problems