Problem: Connection Failures

Symptoms

  • AMQPConnectionError: Connection refused
  • StreamLostError: Stream connection lost
  • ConnectionClosed: (320) CONNECTION_FORCED
  • Consumer stops receiving messages
  • Publisher hangs indefinitely

Root Causes

1. Network Interruption

A WiFi drop, a VPN disconnect, or a firewall that kills idle connections breaks the TCP connection underneath the client.

# Error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(54, 'Connection reset by peer')

2. Heartbeat Timeout

The broker stops receiving heartbeats from the client, assumes the client is dead, and closes the connection.

# Default heartbeat: 60s
# If client doesn't send heartbeat in 120s (2x interval), broker kills connection

# Caused by:
# - Blocking operation in callback (no heartbeat sent)
# - CPU overload (no time to send heartbeat)
# - Network congestion

3. Wrong Credentials / Permissions

# Error
pika.exceptions.AMQPConnectionError: (403, "ACCESS_REFUSED - Login was refused")

# Or
pika.exceptions.ChannelClosedByBroker: (403, 'ACCESS_REFUSED')

4. Broker Restart / Crash

The RabbitMQ server restarts for maintenance or crashes, and all open connections are dropped.

Solutions

✅ Solution 1: Auto-Reconnect Pattern

import pika
import time

def create_connection():
    while True:
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host='concurp1.isc.heia-fr.ch',
                    port=5072,
                    credentials=pika.PlainCredentials('guest', 'guest'),
                    heartbeat=600,  # 10 min
                    blocked_connection_timeout=300
                )
            )
            print("[✓] Connected to RabbitMQ")
            return connection
        except pika.exceptions.AMQPConnectionError as e:
            print(f"[✗] Connection failed: {e}")
            print("[⟳] Retrying in 5s...")
            time.sleep(5)

# Usage
connection = create_connection()
channel = connection.channel()

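# In consumer loop: reconnect every time the connection drops, not just once
while True:
    try:
        channel.start_consuming()
        break  # start_consuming() returned normally (consuming was stopped)
    except (pika.exceptions.StreamLostError, pika.exceptions.AMQPConnectionError):
        print("[✗] Connection lost, reconnecting...")
        connection = create_connection()
        channel = connection.channel()
        # Re-setup consumer on the new channel, then loop back to start_consuming()
        channel.basic_consume(...)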
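
The retry loop above waits a fixed 5 seconds and retries forever, even when the failure cannot fix itself (for example, wrong credentials). A minimal sketch of one alternative is exponential backoff with jitter plus a list of errors that abort immediately. The helper name `connect_with_backoff` and all of its parameters are assumptions made for illustration; they are not part of pika.

```python
import random
import time


def connect_with_backoff(connect, retry_on, give_up_on=(), base_delay=1.0,
                         max_delay=60.0, max_attempts=None, sleep=time.sleep):
    """Call connect() until it succeeds, waiting longer after each failure.

    connect     -- zero-argument callable that returns an open connection
    retry_on    -- exception class(es) treated as transient
    give_up_on  -- exception class(es) re-raised immediately (checked first)
    sleep       -- injectable so tests can run without real delays
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return connect()
        except give_up_on:
            raise  # e.g. refused login: retrying cannot help
        except retry_on as exc:
            if max_attempts is not None and attempt >= max_attempts:
                raise
            # Double the delay on every failure, cap it, then add jitter so
            # many clients do not all reconnect at the same instant.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            delay *= random.uniform(0.5, 1.0)
            print(f"[!] Attempt {attempt} failed: {exc}; retrying in {delay:.1f}s")
            sleep(delay)
```

With pika, one plausible wiring is to pass a function that builds the `pika.BlockingConnection` as `connect`, `pika.exceptions.AMQPConnectionError` as `retry_on`, and `pika.exceptions.ProbableAuthenticationError` and `pika.exceptions.ProbableAccessDeniedError` as `give_up_on`, since pika typically raises those for refused logins and they are subclasses of `AMQPConnectionError`.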

✅ Solution 2: Increase Heartbeat Interval

For long-running tasks, increase the heartbeat timeout so the connection is not dropped while a message is still being processed.

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest'),
        heartbeat=600  # 10 minutes (default is 60s)
    )
)
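
A larger heartbeat also means a dead connection is detected later. An alternative that is sometimes used with pika's `BlockingConnection` is to keep the heartbeat short and move the long-running work to a worker thread, so the connection's thread stays free to service heartbeats. The sketch below assumes a hypothetical helper `handle_in_worker` and a hypothetical `work` function. The only pika call it relies on is `add_callback_threadsafe`, which schedules a callback to run on the connection's own thread.

```python
import functools
import threading


def handle_in_worker(connection, channel, delivery_tag, body, work):
    """Run work(body) in a worker thread, then ack on the connection's thread."""

    def run():
        work(body)  # long-running task; the connection thread is not blocked
        # Channels should only be used from the connection's thread, so the
        # ack is handed back instead of calling channel.basic_ack() here.
        ack = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
        connection.add_callback_threadsafe(ack)

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return worker
```

Inside a consumer callback this could be called as `handle_in_worker(connection, ch, method.delivery_tag, body, work=process)`, where `process` stands for the application's own task function. Error handling (for example, a nack when `work` raises) is left out to keep the sketch short.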

✅ Solution 3: Connection Pooling

Reuse one connection across multiple operations. Don't create a new connection for every message.

class ConnectionPool:
    def __init__(self):
        self.connection = None
        self.channel = None

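    def get_channel(self):
        # Reconnect if the connection was never opened or has been closed
        if self.connection is None or self.connection.is_closed:
            self.connection = create_connection()
            self.channel = None
        # The broker can close a channel while the connection stays open
        if self.channel is None or self.channel.is_closed:
            self.channel = self.connection.channel()
        return self.channel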

pool = ConnectionPool()

# Reuse channel
for msg in messages:
    channel = pool.get_channel()
    channel.basic_publish(...)
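
With a blocking connection, a dropped link may only be noticed when the next operation is attempted, so `basic_publish` can still fail even though the pool considered the connection open. A minimal sketch of a publish helper that resets the pool and tries again is shown below. The name `publish_with_retry` and its parameters are assumptions for illustration; `retry_on` would typically be `pika.exceptions.AMQPConnectionError`.

```python
def publish_with_retry(pool, exchange, routing_key, body, retry_on, attempts=2):
    """Publish through pool.get_channel(), reconnecting if the link has dropped.

    Returns the number of tries it took. Re-raises the last error if every
    attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            channel = pool.get_channel()
            channel.basic_publish(
                exchange=exchange, routing_key=routing_key, body=body
            )
            return attempt
        except retry_on:
            # Forget the stale connection so the next get_channel() reconnects
            pool.connection = None
            pool.channel = None
            if attempt == attempts:
                raise
```

A retried publish can deliver a message twice if the first attempt reached the broker before the connection failed, so consumers should tolerate duplicates.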

✅ Solution 4: Connection Health Check

def is_connection_open(connection):
    return connection is not None and connection.is_open

def ensure_connection():
    global connection, channel
    if not is_connection_open(connection):
        print("[⟳] Reconnecting...")
        connection = create_connection()
        channel = connection.channel()
        setup_consumer(channel)

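# Check periodically
# Caution: pika connections are not thread-safe. Reconnecting from this
# background thread is only safe if no other thread is using the connection.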
import threading

def health_check():
    while True:
        time.sleep(30)  # Check every 30s
        ensure_connection()

threading.Thread(target=health_check, daemon=True).start()

✅ Solution 5: Graceful Shutdown

import signal
import sys

def signal_handler(sig, frame):
    print("[!] Shutting down gracefully...")
    try:
        channel.stop_consuming()
        connection.close()
    except Exception as e:
        print(f"[✗] Error during shutdown: {e}")
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Now Ctrl+C closes connection cleanly

Connection Parameters Reference

Parameter                    Default   Recommended   Purpose
heartbeat                    60s       600s          Detect dead connections
blocked_connection_timeout   None      300s          Timeout when broker blocks
connection_attempts          1         3-5           Retry on initial connect
retry_delay                  2s        5s            Wait between retries
socket_timeout               10s       10s           TCP socket timeout
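
One way to keep these settings consistent across publishers and consumers is to define them once and build the connection parameters from that single place. The sketch below copies the recommended values from the table. `RECOMMENDED_PARAMS` and `build_parameters` are hypothetical names introduced here, and the keys are passed to `pika.ConnectionParameters` as keyword arguments.

```python
# Values taken from the "Recommended" column; tune them for your workload.
RECOMMENDED_PARAMS = {
    "heartbeat": 600,                   # seconds
    "blocked_connection_timeout": 300,  # seconds
    "connection_attempts": 3,           # the table suggests 3-5
    "retry_delay": 5,                   # seconds between connection attempts
    "socket_timeout": 10,               # seconds
}


def build_parameters(host, port, username, password, **overrides):
    """Build pika.ConnectionParameters from the recommended values.

    Keyword overrides replace individual recommended values.
    """
    import pika  # imported lazily so the settings dict works without pika

    settings = {**RECOMMENDED_PARAMS, **overrides}
    return pika.ConnectionParameters(
        host=host,
        port=port,
        credentials=pika.PlainCredentials(username, password),
        **settings,
    )
```

A call such as `build_parameters(host, port, user, password, heartbeat=30)` would keep every recommended value except the heartbeat.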

Complete Robust Connection Example

import pika
import time
import signal
import sys

class RobustConsumer:
    def __init__(self):
        self.connection = None
        self.channel = None
        self.should_stop = False

    def connect(self):
        while not self.should_stop:
            try:
                self.connection = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        host='concurp1.isc.heia-fr.ch',
                        port=5072,
                        credentials=pika.PlainCredentials('guest', 'guest'),
                        heartbeat=600,
                        blocked_connection_timeout=300,
                        connection_attempts=3,
                        retry_delay=5
                    )
                )
                self.channel = self.connection.channel()
                self.channel.queue_declare(queue='task_queue', durable=True)
                self.channel.basic_qos(prefetch_count=1)
                print("[✓] Connected")
                return True
            except Exception as e:
                print(f"[✗] Connection failed: {e}")
                print("[⟳] Retrying in 5s...")
                time.sleep(5)
        return False

    def callback(self, ch, method, properties, body):
        try:
            print(f"[x] Processing {body.decode()}")
            time.sleep(1)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[✗] Error: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    def start(self):
        while not self.should_stop:
            if not self.connect():
                break

            self.channel.basic_consume(
                queue='task_queue',
                on_message_callback=self.callback
            )

            try:
                print("[*] Consuming...")
                self.channel.start_consuming()
            except (pika.exceptions.StreamLostError, pika.exceptions.AMQPConnectionError) as e:
                print(f"[✗] Connection lost: {e}")
                print("[⟳] Reconnecting...")
                continue
            except KeyboardInterrupt:
                print("[!] Interrupted")
                self.stop()

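    def stop(self):
        self.should_stop = True
        # Only touch the channel and connection while they are still open;
        # closing an already-closed connection raises an exception
        if self.channel and self.channel.is_open:
            self.channel.stop_consuming()
        if self.connection and self.connection.is_open:
            self.connection.close()
        print("[✓] Stopped")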

# Run
consumer = RobustConsumer()
consumer.start()

Testing Connection Resilience

  1. Test Network Drop:
    • Start consumer
    • Disable WiFi for 10 seconds
    • Re-enable WiFi
    • Verify consumer reconnects automatically
  2. Test Broker Restart:
    • Start consumer
    • Restart RabbitMQ broker
    • Verify consumer reconnects
  3. Test Heartbeat Timeout:
    • Set heartbeat=10
    • Add time.sleep(30) in callback
    • Verify the connection is dropped and the consumer then reconnects