RPC Pattern (Remote Procedure Call)

Overview

RPC over RabbitMQ lets a client call a remote function on a server and wait for the result. The client sends a request message and blocks until the response arrives; the server processes the request and sends the result back.

Use Case: Lab21 Translation Service

  • Client sends word to translate
  • Server translates word using dictionary
  • Server returns translated word
  • Client displays result to user

Architecture

[Client] --request--> [rpc_queue] --> [Server]
    ^                                       |
    |                                       |
    +------- [reply_to_queue] <--response---+

Key Components:
1. rpc_queue: where server listens for requests
2. reply_to: unique queue per client for responses
3. correlation_id: UUID to match request with response

How It Works

  1. Client creates exclusive callback queue (auto-deleted when connection closes)
  2. Client sends request to rpc_queue with:
    • reply_to = callback queue name
    • correlation_id = unique UUID
  3. Client blocks waiting for response on callback queue
  4. Server consumes from rpc_queue, processes request
  5. Server publishes response to reply_to queue with same correlation_id
  6. Client receives response, matches correlation_id, returns result
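
The six steps above can be tried without a broker. The following is a minimal in-memory sketch, not the RabbitMQ implementation: Python `queue.Queue` objects stand in for `rpc_queue` and the callback queue, and the names `server_loop` and `call` are illustrative only. With a real broker, `reply_to` carries the name of the callback queue rather than a queue object.

```python
import queue
import threading
import uuid

DICTIONARY = {"hello": "bonjour", "cat": "chat"}

rpc_queue = queue.Queue()  # stands in for the broker's rpc_queue


def server_loop():
    """Steps 4-5: consume a request, process it, reply with the same correlation_id."""
    while True:
        request = rpc_queue.get()
        if request is None:  # sentinel used only by this sketch to stop the thread
            break
        word, reply_to, correlation_id = request
        result = DICTIONARY.get(word.lower(), word)
        reply_to.put((correlation_id, result))


def call(word, timeout=2.0):
    """Steps 1-3 and 6: send a request and block until the matching reply arrives."""
    callback_queue = queue.Queue()        # step 1: private reply queue
    correlation_id = str(uuid.uuid4())    # step 2: unique id for this request
    rpc_queue.put((word, callback_queue, correlation_id))
    while True:
        # step 3: block on the callback queue (raises queue.Empty after timeout)
        reply_id, result = callback_queue.get(timeout=timeout)
        if reply_id == correlation_id:    # step 6: accept only the matching reply
            return result


server = threading.Thread(target=server_loop, daemon=True)
server.start()

results = [call("hello"), call("tree")]
print(results)  # ['bonjour', 'tree']

rpc_queue.put(None)  # stop the server thread
server.join()
```

The unknown word "tree" comes back unchanged, mirroring the fallback used by the server's translate function.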

RPC Client Implementation

import pika
import uuid

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host='concurp1.isc.heia-fr.ch',
                port=5072,
                credentials=pika.PlainCredentials('guest', 'guest')
            )
        )
        self.channel = self.connection.channel()

        # Create exclusive callback queue (auto-deleted when connection closes)
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # Listen for responses
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        # Check if this response matches our request
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def call(self, word):
        # Reset response
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # Send request
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=word.encode()
        )

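        # Wait for response (blocks here); time_limit=1 lets each call wait
        # up to one second for network events instead of spinning the CPU
        while self.response is None:
            self.connection.process_data_events(time_limit=1)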

        return self.response

    def close(self):
        self.connection.close()

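# Usage (guarded so that importing RpcClient elsewhere does not trigger a call)
if __name__ == "__main__":
    rpc = RpcClient()
    try:
        result = rpc.call("hello")
        print(f"Translation: {result}")
    finally:
        # Close the connection even if the call fails
        rpc.close()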

RPC Server Implementation

import pika

# Translation dictionary
DICTIONARY = {
    "hello": "bonjour",
    "world": "monde",
    "cat": "chat",
    "dog": "chien",
}

def translate(word):
    """Translate word using dictionary."""
    return DICTIONARY.get(word.lower(), word)  # Return original if not found

def on_request(ch, method, props, body):
    word = body.decode()
    print(f"[.] Translating '{word}'")

    # Perform translation
    response = translate(word)

    # Send response back to client's callback queue
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,  # Client's callback queue
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id  # Echo back correlation_id
        ),
        body=response.encode()
    )

    # Acknowledge request
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"[x] Sent translation: '{response}'")

# Setup
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='concurp1.isc.heia-fr.ch',
        port=5072,
        credentials=pika.PlainCredentials('guest', 'guest')
    )
)
channel = connection.channel()

# Declare RPC queue
channel.queue_declare(queue='rpc_queue')

# Fair dispatch
channel.basic_qos(prefetch_count=1)

# Start consuming
channel.basic_consume(
    queue='rpc_queue',
    on_message_callback=on_request
)

print("[*] Awaiting RPC requests. To exit press CTRL+C")
channel.start_consuming()

Lab21: Unix Pipe Translator

Lab21 builds a translator that works in a Unix pipeline: cat file.txt | python translator.py | less

Client (reads stdin, writes stdout)

import sys
import re

# Create RPC client (same as above)
rpc = RpcClient()

# Read from stdin line by line
for line in sys.stdin:
    # Split into words, preserving punctuation
    words = re.findall(r'\b\w+\b|[^\w\s]', line)

    translated_words = []
    for word in words:
        if word.isalpha():
            # Translate word via RPC
            translated = rpc.call(word)
            translated_words.append(translated)
        else:
            # Keep punctuation as-is
            translated_words.append(word)

    # Write to stdout
    print(' '.join(translated_words))

rpc.close()
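
Note that joining tokens with `' '` puts a space before punctuation ("hello , world") and collapses the original spacing. One possible alternative, sketched here under the assumption that only runs of letters should be translated, is to substitute words in place with `re.sub`. The helper `translate_line` and the stand-in `fake_call` are hypothetical names; in the real client `rpc.call` would be passed instead.

```python
import re

# Matches runs of letters only (no digits, no underscore), including accented letters.
WORD_PATTERN = re.compile(r"[^\W\d_]+")


def translate_line(line, translate):
    """Replace each word in line with translate(word); leave every other character untouched."""
    return WORD_PATTERN.sub(lambda match: translate(match.group()), line)


# Stand-in for rpc.call so the sketch runs without a broker.
fake_dictionary = {"hello": "bonjour", "world": "monde"}


def fake_call(word):
    return fake_dictionary.get(word.lower(), word)


print(translate_line("Hello, world! It costs 5 USD.\n", fake_call), end="")
# prints: bonjour, monde! It costs 5 USD.
```

Because the line keeps its own trailing newline, the pipe client would write the result with `sys.stdout.write(...)` or `print(..., end="")` rather than a plain `print`.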

Server with ConfigParser

import pika
import configparser

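# Load config.yaml. Note: configparser parses INI syntax, not YAML, so the
# file must contain a [dictionary] section with "key = value" lines.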
config = configparser.ConfigParser()
config.read('config.yaml')

DICTIONARY = dict(config.items('dictionary'))

def translate(word):
    return DICTIONARY.get(word.lower(), word)

# Same RPC server code as above...
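
To illustrate the file layout this loader expects, here is a small sketch that parses an INI-style string with `read_string` instead of reading from disk. The sample entries are hypothetical; the actual Lab21 config file may contain different words or extra sections.

```python
import configparser

# Hypothetical file contents; the real config.yaml for the lab may differ.
SAMPLE_CONFIG = """
[dictionary]
hello = bonjour
World = monde
cat = chat
"""

config = configparser.ConfigParser()
# config.read('config.yaml') would load the same kind of text from disk.
config.read_string(SAMPLE_CONFIG)

# configparser lowercases option names by default, so 'World' becomes 'world'.
DICTIONARY = dict(config.items('dictionary'))


def translate(word):
    # Lowercasing the lookup key matches the lowercased option names.
    return DICTIONARY.get(word.lower(), word)


print(translate("WORLD"))  # monde
print(translate("tree"))   # tree
```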

Critical Configuration

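Setting                  Purpose
-----------------------  ------------------------------------------------------------
exclusive=True           Callback queue is auto-deleted when the client disconnects
correlation_id           Matches a response with its request (allows parallel calls)
reply_to                 Tells the server where to send the response
process_data_events()    Processes pending network events; called in a loop while the client waits for the response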

Common Pitfalls

❌ Not checking correlation_id

Multiple parallel calls can return responses out-of-order. Always verify correlation_id matches.
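
As an illustration of why the check matters, the sketch below tracks several in-flight requests in a dictionary keyed by correlation_id. The class `PendingCalls` is a hypothetical helper, not part of pika; in a real client its `on_response` logic would live inside the `on_message_callback`.

```python
import uuid


class PendingCalls:
    """Tracks in-flight requests so replies can arrive in any order."""

    def __init__(self):
        self._results = {}  # correlation_id -> response, or None while pending

    def register(self):
        """Create a correlation_id for a new request and mark it pending."""
        corr_id = str(uuid.uuid4())
        self._results[corr_id] = None
        return corr_id

    def on_response(self, correlation_id, body):
        """Store a reply; return False (and ignore it) if the id is unknown."""
        if correlation_id not in self._results:
            return False
        self._results[correlation_id] = body
        return True

    def result(self, corr_id):
        return self._results.get(corr_id)


pending = PendingCalls()
first = pending.register()
second = pending.register()

# Replies arrive in reverse order, plus one stale reply with an unknown id.
pending.on_response(second, "monde")
accepted = pending.on_response("stale-id", "???")
pending.on_response(first, "bonjour")

print(pending.result(first), pending.result(second), accepted)
# prints: bonjour monde False
```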

❌ Forgetting to ack on server

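The server must acknowledge each request after sending the response. Otherwise the message stays unacknowledged and the broker redelivers it once the server's channel closes; with prefetch_count=1, that server also receives no further requests in the meantime.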

❌ Client timeout not implemented

If the server crashes, the client blocks forever. Pass a time_limit to process_data_events() and give up once an overall deadline has passed.
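
One way to add such a deadline is sketched below. The helper `wait_for_response` is a hypothetical name; it only assumes that the connection object offers `process_data_events(time_limit=...)`, as pika's BlockingConnection does, and that `get_response` returns None until a reply has been stored.

```python
import time


def wait_for_response(connection, get_response, timeout=5.0):
    """Poll the connection until get_response() is not None or the deadline passes."""
    deadline = time.monotonic() + timeout
    while get_response() is None:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no RPC response within {timeout} s")
        # Wait at most 1 s per iteration so the deadline is rechecked regularly.
        connection.process_data_events(time_limit=min(remaining, 1.0))
    return get_response()
```

Inside RpcClient.call, the waiting loop could then become `return wait_for_response(self.connection, lambda: self.response)`, with the caller deciding how to handle the TimeoutError (retry, report an error, or fall back to the untranslated word).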

❌ Using same connection for multiple clients

Each client needs its own connection and callback queue. Sharing them causes response mix-ups.

How to Test

  1. Basic RPC:
    # Terminal 1: Start server
    python rpc_server.py
    
    # Terminal 2: Test client
    python rpc_client.py
  2. Unix Pipe (Lab21):
    echo "hello world" | python translator.py
    cat input.txt | python translator.py | less
  3. Load Test:
    • Start 3 server instances (parallel processing)
    • Send 100 RPC calls from client
    • Verify all responses received correctly
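
A load test along these lines could be driven by a small harness such as the sketch below. `run_load_test` is a hypothetical helper that accepts any callable, so `rpc.call` can be passed in when a server is running; here a local stub takes its place so the code runs on its own. The expected translations are taken from the server dictionary shown earlier.

```python
import time


def run_load_test(call, expected, repeat=25):
    """Call call(word) for every word in expected, repeat times, and collect mismatches."""
    mismatches = []
    start = time.perf_counter()
    for _ in range(repeat):
        for word, want in expected.items():
            got = call(word)
            if got != want:
                mismatches.append((word, want, got))
    elapsed = time.perf_counter() - start
    total = repeat * len(expected)
    return total, mismatches, elapsed


EXPECTED = {"hello": "bonjour", "world": "monde", "cat": "chat", "dog": "chien"}


def stub_call(word):
    # Stand-in for rpc.call; replace with a real RpcClient().call against a running server.
    return EXPECTED.get(word, word)


total, mismatches, elapsed = run_load_test(stub_call, EXPECTED)  # 4 words x 25 = 100 calls
print(f"{total} calls, {len(mismatches)} mismatches")
```

Since a single blocking client sends its calls one after another, exercising three servers in parallel would require running several client processes at the same time.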

Related Patterns