Routing Pattern (Direct Exchange)

Overview

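The routing pattern uses a direct exchange to deliver each message to the queues whose binding key exactly matches the message's routing key. It is a selective form of publish/subscribe: a consumer receives only the categories of message its queue is bound to.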

Use Cases:

  • Log levels (error, warning, info) → different handlers
  • Task types (email, sms, push) → specialized workers
  • Priority queues (high, medium, low)

Architecture

[Producer] --routing_key="error"--> [Direct Exchange]
                                            |
                                            |--(binding key="error")-> [Error Queue] -> [Error Handler]
                                            |
                                            |--(binding key="warning")-> [Warning Queue] -> [Warning Handler]
                                            |
                                            +--(binding key="info")-> [Info Queue] -> [Info Handler]

A message published with routing key "error" goes ONLY to the Error Queue.
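The exact-match rule in the diagram can be sketched without a broker. The `DirectExchange` class below is a hypothetical in-memory stand-in, not part of pika or RabbitMQ, and the queue names are assumptions chosen to mirror the diagram. It only illustrates that routing is a plain string comparison between the routing key and each binding key.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory stand-in for a direct exchange (not a RabbitMQ API)."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self._bindings = defaultdict(set)
        # queue name -> messages delivered so far
        self.queues = defaultdict(list)

    def bind(self, queue, binding_key):
        self._bindings[binding_key].add(queue)
        self.queues[queue]  # create the queue entry even while it is empty

    def publish(self, routing_key, body):
        # Exact, case-sensitive comparison: no wildcards, no prefix matching.
        targets = sorted(self._bindings.get(routing_key, ()))
        for queue in targets:
            self.queues[queue].append(body)
        return targets


exchange = DirectExchange()
exchange.bind("error_queue", "error")
exchange.bind("warning_queue", "warning")
exchange.bind("info_queue", "info")

delivered = exchange.publish("error", "Database connection failed")
unrouted = exchange.publish("Error", "wrong case, matches no binding")
```

Here `delivered` names only the error queue, while the message published as "Error" matches no binding key and reaches no queue at all.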

Publisher

import pika

connection = pika.BlockingConnection(...)
channel = connection.channel()

# Declare direct exchange
channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

# Publish with routing key
severity = 'error'  # or 'warning', 'info'
message = 'Database connection failed'

channel.basic_publish(
    exchange='logs_direct',
    routing_key=severity,  # Messages routed by this key
    body=f"{severity}: {message}".encode()
)

print(f"[✓] Sent [{severity}]: {message}")
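In practice the severity is often supplied on the command line rather than hard-coded. The helper below is a minimal sketch of that idea; `parse_log_args`, `KNOWN_SEVERITIES`, and the default values are assumptions made for illustration, not something the publisher above defines.

```python
# Assumed set of severities, taken from the examples in this section.
KNOWN_SEVERITIES = ("info", "warning", "error")


def parse_log_args(argv):
    """Return (severity, message) from a list of command-line arguments.

    Hypothetical helper: the first argument is the severity and the
    remaining arguments are joined into the message text.
    """
    severity = argv[0] if argv else "info"
    if severity not in KNOWN_SEVERITIES:
        # Reject typos early: a misspelled key would match no binding.
        raise ValueError(f"unknown severity: {severity!r}")
    message = " ".join(argv[1:]) or "(no message)"
    return severity, message


# In a real script the list would come from sys.argv[1:].
severity, message = parse_log_args(["error", "Database", "connection", "failed"])
```

The returned `severity` would then be passed as `routing_key` to `basic_publish`. Validating it before publishing is a design choice: because matching is exact, a typo in the key would otherwise fail silently.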

Subscriber

import pika
import sys

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

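# Empty name: the broker generates a unique queue name.
# exclusive=True: the queue is deleted when this connection closes.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue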

# Subscribe to specific severities
severities = sys.argv[1:] if len(sys.argv) > 1 else ['info']

for severity in severities:
    channel.queue_bind(
        exchange='logs_direct',
        queue=queue_name,
        routing_key=severity  # Only receive messages with this key
    )

print(f"[*] Subscribed to: {severities}")

def callback(ch, method, properties, body):
    print(f"[x] {method.routing_key}: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

# Usage:
# python subscriber.py error          # Only errors
# python subscriber.py error warning  # Errors + warnings
# python subscriber.py info           # Only info

Multiple Bindings

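A single queue can be bound with several binding keys. Several queues can also share the same binding key, in which case each of them receives its own copy of a matching message.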

# Named queues must exist before they can be bound
channel.queue_declare(queue='all_logs')
channel.queue_declare(queue='critical_logs')

# All-logs queue (bound to every severity)
channel.queue_bind(exchange='logs_direct', queue='all_logs', routing_key='info')
channel.queue_bind(exchange='logs_direct', queue='all_logs', routing_key='warning')
channel.queue_bind(exchange='logs_direct', queue='all_logs', routing_key='error')

# Critical-logs queue (errors only)
channel.queue_bind(exchange='logs_direct', queue='critical_logs', routing_key='error')
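When the number of bindings grows, it can be easier to keep them in one mapping and apply them in a loop. The sketch below assumes a helper named `bind_queues` and a `LOG_BINDINGS` table, both hypothetical. It only uses the `exchange_declare`, `queue_declare`, and `queue_bind` calls already shown in this section, and a small recording stand-in replaces the real channel so the example runs without a broker.

```python
def bind_queues(channel, exchange, bindings):
    """Declare each queue and bind it once per binding key.

    `bindings` maps a queue name to the binding keys it should receive.
    """
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    for queue, keys in bindings.items():
        channel.queue_declare(queue=queue)
        for key in keys:
            channel.queue_bind(exchange=exchange, queue=queue, routing_key=key)


class RecordingChannel:
    """Stand-in that records calls instead of talking to a broker."""

    def __init__(self):
        self.exchange = None
        self.declared = []
        self.bound = []

    def exchange_declare(self, exchange, exchange_type):
        self.exchange = (exchange, exchange_type)

    def queue_declare(self, queue):
        self.declared.append(queue)

    def queue_bind(self, exchange, queue, routing_key):
        self.bound.append((queue, routing_key))


# Hypothetical binding table mirroring the two queues above.
LOG_BINDINGS = {
    "all_logs": ["info", "warning", "error"],
    "critical_logs": ["error"],
}

channel = RecordingChannel()
bind_queues(channel, "logs_direct", LOG_BINDINGS)
```

With a real pika channel in place of `RecordingChannel`, the same call would create both queues and their four bindings. Note that "error" appears twice in the table, so an error message would be copied to both queues.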

Key Points

Concept            Detail
Exchange Type      direct
Routing            Exact key match
Use Case           Selective broadcast
Multiple Bindings  Supported (queue can receive multiple keys)

Related Patterns