Back/Topics Pattern

Topics Pattern (Topic Exchange)

Overview

Topics pattern uses topic exchange with pattern matching. Routing keys are dot-separated words. Bindings use wildcards: * (one word), # (zero or more words).

Use Case:

  • Multi-dimensional filtering (region.severity.service)
  • Hierarchical topics (news.sport.football, news.politics)
  • Complex routing rules

Wildcard Syntax

SymbolMatchesExample
*Exactly ONE word*.error.* matches us.error.db
#Zero or MORE wordsus.# matches us.error.db, us

Example: Log Routing

Publisher

import pika

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='logs_topic', exchange_type='topic')

# Format: region.severity.service
routing_keys = [
    'us.error.database',
    'eu.warning.api',
    'us.info.frontend',
    'eu.error.payment'
]

for key in routing_keys:
    message = f"Log from {key}"
    channel.basic_publish(
        exchange='logs_topic',
        routing_key=key,
        body=message.encode()
    )
    print(f"[✓] Sent [{key}]: {message}")

Subscriber Examples

# Consumer 1: All US logs
channel.queue_bind(exchange='logs_topic', queue=q1, routing_key='us.#')
# Receives: us.error.database, us.info.frontend

# Consumer 2: All errors (any region)
channel.queue_bind(exchange='logs_topic', queue=q2, routing_key='*.error.*')
# Receives: us.error.database, eu.error.payment

# Consumer 3: EU warnings
channel.queue_bind(exchange='logs_topic', queue=q3, routing_key='eu.warning.*')
# Receives: eu.warning.api

# Consumer 4: Everything
channel.queue_bind(exchange='logs_topic', queue=q4, routing_key='#')
# Receives: ALL messages

Complete Example

import pika
import sys

connection = pika.BlockingConnection(...)
channel = connection.channel()

channel.exchange_declare(exchange='logs_topic', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind to patterns from command line
binding_keys = sys.argv[1:] if len(sys.argv) > 1 else ['#']

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='logs_topic',
        queue=queue_name,
        routing_key=binding_key
    )

print(f"[*] Listening for patterns: {binding_keys}")

def callback(ch, method, properties, body):
    print(f"[x] {method.routing_key}: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

# Usage:
# python subscriber.py "us.#"              # All US logs
# python subscriber.py "*.error.*"         # All errors
# python subscriber.py "us.error.*" "eu.error.*"  # US + EU errors

Pattern Matching Examples

Routing KeyBinding PatternMatch?
us.error.databaseus.#✅ Yes
us.error.database*.error.*✅ Yes
us.error.databaseeu.#❌ No
usus.#✅ Yes
usus.*❌ No (needs 2 words)
anything.goes.here#✅ Yes

Common Pitfalls

❌ Using * when you need #

us.* matches us.error but NOT us.error.database. Use us.# instead.

❌ Forgetting dots in routing keys

us_error_database is ONE word. Must be us.error.database

❌ Using topic exchange for simple routing

If no wildcards needed, use direct exchange (simpler, faster)

Related Patterns