RPC over RabbitMQ allows a client to call a remote function on a server and wait for the result. The client sends a request message and blocks waiting for the response; the server processes the request and sends the result back.
Use Case: Lab21 Translation Service
[Client] --request--> [rpc_queue] --> [Server]
    ^                                      |
    |                                      |
    +------- [reply_to_queue] <--response--+
Key Components:
1. rpc_queue: where server listens for requests
2. reply_to: unique queue per client for responses
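3. correlation_id: UUID to match request with response
Flow:
1. Client publishes the request to rpc_queue with:
   - reply_to = callback queue name
   - correlation_id = unique UUID
2. Server consumes from rpc_queue, processes request
3. Server publishes the response to the reply_to queue with same correlation_id
4. Client checks correlation_id, returns result
import pika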
import uuid
class RpcClient:
    """Blocking RPC client: publishes a word to ``rpc_queue`` and waits for the reply.

    Each instance opens its own connection and declares its own exclusive
    callback queue, so replies for different clients never mix.  The instance
    can be used as a context manager to guarantee the connection is closed.

    Args:
        host: broker host name.
        port: broker port.
        username: broker user name.
        password: broker password.
    """

    def __init__(self, host='concurp1.isc.heia-fr.ch', port=5072,
                 username='guest', password='guest'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=host,
                port=port,
                credentials=pika.PlainCredentials(username, password),
            )
        )
        self.channel = self.connection.channel()

        # Create exclusive callback queue (auto-deleted when connection closes);
        # the empty name asks the broker to generate a unique one.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # Listen for responses
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True,
        )

        self.response = None  # decoded body of the reply to the pending call
        self.corr_id = None   # correlation id of the pending call

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when the ``with`` block ends."""
        self.close()

    def on_response(self, ch, method, props, body):
        """Store the reply body if it answers the pending request.

        Replies whose ``correlation_id`` differs from the pending one are
        ignored.
        """
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def call(self, word, timeout=None):
        """Send *word* to the server and block until its reply arrives.

        Args:
            word: text to send as the request body.
            timeout: maximum number of seconds to wait for the reply;
                ``None`` (the default) waits indefinitely.

        Returns:
            The decoded reply body.

        Raises:
            TimeoutError: if *timeout* elapses before a matching reply arrives.
        """
        import time  # local import keeps this block self-contained

        # Reset response
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # Send request
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=word.encode(),
        )

        deadline = None if timeout is None else time.monotonic() + timeout

        # Wait for response.  An explicit time_limit makes each iteration block
        # on I/O instead of returning immediately and spinning the CPU.
        while self.response is None:
            if deadline is None:
                self.connection.process_data_events(time_limit=None)
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"no RPC response for {word!r} within {timeout} seconds"
                )
            self.connection.process_data_events(time_limit=remaining)
        return self.response

    def close(self):
        """Close the connection to the broker."""
        self.connection.close()
# Usage: translate a single word and print the result
rpc = RpcClient()
try:
    result = rpc.call("hello")
    print(f"Translation: {result}")
finally:
    # Close the connection even if the call fails
    rpc.close()

import pika
# Translation table, keyed by lowercase source word
DICTIONARY = dict(
    hello="bonjour",
    world="monde",
    cat="chat",
    dog="chien",
)


def translate(word):
    """Look up *word* case-insensitively; unknown words come back unchanged."""
    key = word.lower()
    if key in DICTIONARY:
        return DICTIONARY[key]
    return word
def on_request(ch, method, props, body):
    """Handle one RPC request: translate the word and reply to the caller.

    Args:
        ch: channel the request arrived on; used to publish the reply and to ack.
        method: delivery metadata; its ``delivery_tag`` is acknowledged.
        props: request properties carrying ``reply_to`` and ``correlation_id``.
        body: request payload as bytes, decoded as text.
    """
    word = body.decode()

    if not props.reply_to:
        # Without a reply queue there is nowhere to send the answer.  Ack the
        # request anyway so it is not left unacknowledged and redelivered.
        print(f"[!] Dropping request '{word}': no reply_to queue")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    print(f"[.] Translating '{word}'")

    # Perform translation
    response = translate(word)

    # Send response back to client's callback queue
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,  # Client's callback queue
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id  # Echo back correlation_id
        ),
        body=response.encode(),
    )

    # Acknowledge only after the reply has been published
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"[x] Sent translation: '{response}'")
# Setup: open a blocking connection to the broker and get a channel
_credentials = pika.PlainCredentials('guest', 'guest')
_parameters = pika.ConnectionParameters(
    host='concurp1.isc.heia-fr.ch',
    port=5072,
    credentials=_credentials,
)
connection = pika.BlockingConnection(_parameters)
channel = connection.channel()

# Declare the queue the server listens on for requests
channel.queue_declare(queue='rpc_queue')

# Fair dispatch (prefetch_count=1)
channel.basic_qos(prefetch_count=1)

# Register on_request as the handler for rpc_queue
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print("[*] Awaiting RPC requests. To exit press CTRL+C")
channel.start_consuming()
Lab21 builds a translator that works in Unix pipes: cat file.txt | python translator.py | less
import sys
import re
# Create RPC client (same as above)
rpc = RpcClient()

# Matches whole word tokens.  Everything between matches (spaces, punctuation,
# the trailing newline) is copied through untouched, so the layout of the
# input text is preserved in the output.
_WORD_RE = re.compile(r'\b\w+\b')


def _translate_token(match):
    """Return the RPC translation of a purely alphabetic token, else the token itself."""
    token = match.group()
    return rpc.call(token) if token.isalpha() else token


try:
    # Read from stdin line by line so the script streams inside a Unix pipe
    for line in sys.stdin:
        # Substitute words in place instead of re-joining tokens with spaces,
        # which would insert blanks before punctuation ("hello , world !")
        sys.stdout.write(_WORD_RE.sub(_translate_token, line))
finally:
    # Close the connection even if reading or an RPC call fails
    rpc.close()

import pika
import configparser
# Load the translation table from config.yaml.
# NOTE(review): configparser parses INI syntax, so despite its extension the
# file presumably holds a "[dictionary]" section of "word = translation"
# lines rather than real YAML — confirm against the actual file.
config = configparser.ConfigParser()
# read() silently skips files it cannot open and returns the list of files it
# did parse, so an empty result means the configuration was not loaded.
if not config.read('config.yaml', encoding='utf-8'):
    raise FileNotFoundError("config.yaml not found or unreadable")
DICTIONARY = dict(config.items('dictionary'))


def translate(word):
    """Return the configured translation of *word*, or *word* itself if unknown.

    The lookup uses the lowercased word.
    """
    return DICTIONARY.get(word.lower(), word)
# Same RPC server code as above...
| Setting | Purpose |
|---|---|
| exclusive=True | Callback queue auto-deleted when client disconnects |
| correlation_id | Match response with request (allows parallel calls) |
| reply_to | Tell server where to send response |
| process_data_events() | Non-blocking event loop (allows waiting for response) |
❌ Not checking correlation_id
Multiple parallel calls can return responses out-of-order. Always verify correlation_id matches.
❌ Forgetting to ack on server
The server must acknowledge the request after sending the response; otherwise the message is redelivered.
❌ Client timeout not implemented
If the server crashes, the client blocks forever. Pass a time_limit to process_data_events() and give up once an overall deadline has passed.
❌ Using same connection for multiple clients
Each client needs own connection and callback queue. Sharing causes response mix-ups.
# Terminal 1: Start server
python rpc_server.py
# Terminal 2: Test client
python rpc_client.py
echo "hello world" | python translator.py
cat input.txt | python translator.py | less