Orchestrating Parallel Workloads with RabbitMQ Fanout Architecture

The Single Queue Constraint: Why Round-Robin Isn’t Enough

In a standard RabbitMQ configuration, multiple consumers connected to a single queue follow the Competing Consumers pattern. RabbitMQ dispatches messages round-robin across those consumers, so each message is delivered to only one of the available workers.

While this is excellent for horizontally scaling a single task, it also means that two distinct services (e.g., a Jenkins pipeline and a REST API) cannot both receive the same message from the same queue.

These two services are unrelated examples, chosen only to illustrate two isolated consumers that each need to receive the same message.

Once a consumer acknowledges a message, RabbitMQ removes it from that queue, so no other consumer of the same queue will ever see it.
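
The single-queue behavior can be illustrated with a small pure-Python model. This is only a sketch of the dispatch rule, not of the broker itself: it ignores prefetch, acknowledgements, and redelivery, and the message names m1 to m4 are made up for the example.

```python
from itertools import cycle


def dispatch_round_robin(messages, consumers):
    """Assign each message to exactly one consumer, rotating through them in order."""
    assignments = {name: [] for name in consumers}
    rotation = cycle(consumers)
    for message in messages:
        # Each message goes to the next consumer in the rotation and to nobody else.
        assignments[next(rotation)].append(message)
    return assignments


result = dispatch_round_robin(["m1", "m2", "m3", "m4"], ["jenkins", "health"])
print(result)
# {'jenkins': ['m1', 'm3'], 'health': ['m2', 'm4']}
```

No message appears in both lists, which is exactly why two different services cannot share one queue when both need every event.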

https://github.com/faustobranco/devops-db/tree/master/knowledge-base/rabbitmq/MultiQueue

The Solution: The Fanout Exchange Pattern

To allow multiple independent systems to react to the same event, we must decouple the producer from the queues using a Fanout Exchange.

  • The Producer sends a message to an Exchange, not a Queue.
  • The Fanout Exchange acts as a message duplicator, routing a copy of the message to every queue bound to it.
  • Independent Queues ensure that each service has its own persistent buffer. If the Jenkins worker is down, the Health worker can still process its copy without interference.
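
The three points above can be sketched as a toy in-memory model. The FanoutExchange class and the sample payload are hypothetical and exist only for this illustration; a real broker adds persistence, acknowledgements, and network delivery on top of the same routing idea.

```python
from collections import deque


class FanoutExchange:
    """Toy model of a fanout exchange: every bound queue receives its own copy."""

    def __init__(self):
        self.queues = {}

    def bind(self, queue_name):
        # Binding registers a queue; re-binding an existing queue changes nothing.
        self.queues.setdefault(queue_name, deque())

    def publish(self, message):
        # The routing key plays no role in a fanout: all bound queues get the message.
        for queue in self.queues.values():
            queue.append(message)
        return len(self.queues)


exchange = FanoutExchange()
exchange.bind("jenkins_deploy_queue")
exchange.bind("health_monitor_queue")
copies = exchange.publish('{"action": "deploy"}')

# Consuming from one queue leaves the other queue's copy untouched.
exchange.queues["health_monitor_queue"].popleft()
depths = {name: len(queue) for name, queue in exchange.queues.items()}
print(copies, depths)
# 2 {'jenkins_deploy_queue': 1, 'health_monitor_queue': 0}
```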

1. Infrastructure Configuration (REST API)

The RabbitMQ Management HTTP API lets you declare the infrastructure idempotently: repeating a PUT with the same definition leaves the existing object unchanged. The calls below target the rabbitmq.devops-db.internal host.

A. Creating the Exchange

We declare the jenkins_exchange as a fanout. This is the “broadcaster.”

Bash

curl -i -u admin:J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ -H "content-type:application/json" \
    -X PUT https://rabbitmq.devops-db.internal/api/exchanges/%2f/jenkins_exchange \
    -d '{"type":"fanout","durable":true}'

B. Declaring Service-Specific Queues

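Each service needs its own “mailbox.” We also attach a Dead Letter Exchange (DLX) named jenkins_dlx, so that rejected messages such as malformed JSON payloads are routed there instead of being requeued. The jenkins_dlx exchange itself must exist, with a queue bound to it, by the time a message is rejected; otherwise the dead-lettered message is dropped.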

Bash

# Queue for Jenkins Service
curl -i -u admin:J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ -H "content-type:application/json" \
    -X PUT https://rabbitmq.devops-db.internal/api/queues/%2f/jenkins_deploy_queue \
    -d '{"durable":true,"arguments":{"x-dead-letter-exchange":"jenkins_dlx"}}'

# Queue for Health/Monitoring Service
curl -i -u admin:J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ -H "content-type:application/json" \
    -X PUT https://rabbitmq.devops-db.internal/api/queues/%2f/health_monitor_queue \
    -d '{"durable":true,"arguments":{"x-dead-letter-exchange":"jenkins_dlx"}}'
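
The queues above point at jenkins_dlx, but its declaration is not shown in this walkthrough. The sketch below only builds the three Management API requests that would be needed (exchange, dead-letter queue, binding) without sending them. The queue name jenkins_dead_letter_queue and the choice of a fanout type for the DLX are assumptions made for illustration, not values taken from the original setup.

```python
import json
from urllib.parse import quote


def dlx_requests(vhost="/", dlx="jenkins_dlx", dlq="jenkins_dead_letter_queue"):
    """Return (method, path, body) tuples for declaring a DLX and a queue bound to it."""
    encoded_vhost = quote(vhost, safe="")  # the default vhost "/" becomes "%2F"
    return [
        ("PUT", f"/api/exchanges/{encoded_vhost}/{dlx}", {"type": "fanout", "durable": True}),
        ("PUT", f"/api/queues/{encoded_vhost}/{dlq}", {"durable": True}),
        ("POST", f"/api/bindings/{encoded_vhost}/e/{dlx}/q/{dlq}", {"routing_key": ""}),
    ]


for method, path, body in dlx_requests():
    # Each line maps onto one curl call in the same style as the commands above.
    print(method, path, json.dumps(body))
```

Each tuple maps onto a curl call of the same shape as the ones used for jenkins_exchange and its queues.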

C. Binding Queues to the Exchange

Without a binding, the exchange has nowhere to route messages and discards them. The bindings below create the link between the broadcaster and the mailboxes.

Bash

# Link Jenkins
curl -i -u admin:J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ -H "content-type:application/json" \
    -X POST https://rabbitmq.devops-db.internal/api/bindings/%2f/e/jenkins_exchange/q/jenkins_deploy_queue \
    -d '{"routing_key":""}'

# Link Health Monitor
curl -i -u admin:J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ -H "content-type:application/json" \
    -X POST https://rabbitmq.devops-db.internal/api/bindings/%2f/e/jenkins_exchange/q/health_monitor_queue \
    -d '{"routing_key":""}'

2. Implementation: The Adaptive Worker

The following Python implementation lets a single codebase serve different business logic: a runtime flag (--type) selects which queue the worker consumes and which handler it calls.

import pika
import requests
import json
import sys
import urllib3
import argparse

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Infrastructure Constants
RABBITMQ_HOST = '172.21.5.76'
RABBITMQ_PORT = 31572
RABBITMQ_USER = 'admin'
RABBITMQ_PASS = 'J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ'
EXCHANGE_NAME = 'jenkins_exchange'

# Service Endpoints
JENKINS_URL = 'https://jenkins.devops-db.internal' 
JENKINS_JOB_PATH = 'job/infrastructure/job/pipelines/job/tests/job/RabbitMQ-Example1'
JENKINS_USER = 'fbranco'
JENKINS_API_TOKEN = '11168682ae86c6a8abe39b219fb1d0424e'
HEALTH_API_URL = 'https://dns-api.devops-db.internal/health'

def trigger_jenkins_pipeline(payload):
    build_url = f"{JENKINS_URL}/{JENKINS_JOB_PATH}/buildWithParameters"
    auth_credentials = (JENKINS_USER, JENKINS_API_TOKEN)
    params = {'PAYLOAD_JSON': json.dumps(payload)}
    
    try:
        response = requests.post(build_url, auth=auth_credentials, data=params, verify=False, timeout=10)
        response.raise_for_status()
        return True
    except requests.RequestException as err:
        print(f"Jenkins Integration Error: {err}")
        return False

def trigger_health_api(payload):
    # Note: Health check endpoints often expect GET. 
    # If the API accepts POST with data, use requests.post
    try:
        response = requests.get(HEALTH_API_URL, verify=False, timeout=5)
        response.raise_for_status()
        return True
    except requests.RequestException as err:
        print(f"Health API Monitoring Error: {err}")
        return False

def process_message(channel, method, properties, body, worker_type):
    print(f"\n[x] Processing message for: {worker_type}")
    try:
        payload = json.loads(body)
        
        # Strategy pattern based on worker type
        if worker_type == 'jenkins':
            success = trigger_jenkins_pipeline(payload)
        else:
            success = trigger_health_api(payload)
        
        if success:
            channel.basic_ack(delivery_tag=method.delivery_tag)
            print("[+] Task completed and acknowledged.")
        else:
            # Requeue so the message is retried. Note that this requeues on every
            # failure, not only transient ones, and the retry happens immediately.
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
            
    except json.JSONDecodeError:
        print("[!] Invalid JSON format. Rejecting to DLX.")
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

def start_worker(worker_type):
    queue_name = 'jenkins_deploy_queue' if worker_type == 'jenkins' else 'health_monitor_queue'
    
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials))
    channel = connection.channel()

    # Ensure infrastructure exists via AMQP declaration
    channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='fanout', durable=True)
    channel.queue_declare(queue=queue_name, durable=True, arguments={'x-dead-letter-exchange': 'jenkins_dlx'})
    channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name)

    channel.basic_qos(prefetch_count=1)
    
    # Passing context to the callback using a lambda
    on_msg = lambda ch, method, props, body: process_message(ch, method, props, body, worker_type)
    
    channel.basic_consume(queue=queue_name, on_message_callback=on_msg)
    print(f"[*] Subscribed to {queue_name}. Waiting for messages...")
    channel.start_consuming()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--type', choices=['jenkins', 'health'], required=True)
    args = parser.parse_args()
    start_worker(args.type)
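
The acknowledgement logic inside process_message boils down to three outcomes. The helper below is a hypothetical restatement of that logic as a pure function, written only to make the decision table explicit; it is not part of the worker itself.

```python
def decide_disposition(valid_json, success):
    """Map the outcome of handling a message to the AMQP call the worker makes.

    Returns a tuple (action, requeue), where requeue is None for an ack.
    """
    if not valid_json:
        # Malformed payload: basic_reject(requeue=False) sends it to the DLX.
        return ("reject", False)
    if success:
        # Downstream call succeeded: basic_ack removes the message from the queue.
        return ("ack", None)
    # Downstream call failed: basic_nack(requeue=True) puts it back for another attempt.
    return ("nack", True)


print(decide_disposition(valid_json=True, success=True))
print(decide_disposition(valid_json=True, success=False))
print(decide_disposition(valid_json=False, success=False))
```

Because a failed call is always requeued, a permanently failing endpoint would cause the same message to be retried indefinitely; a retry limit or delay may be worth adding in a real deployment.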

Practical Example

I’ll send a message first, even before starting the two consumers, so I can see the status in the RabbitMQ UI.

python3 messages.py
[+] Sending VALID JSON message...
Done. Message published.

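The queue status can also be checked from the command line through the Management API instead of the UI: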

curl -s -u admin:J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ \
    -X GET https://rabbitmq.devops-db.internal/api/queues/%2f/jenkins_deploy_queue \
    | jq '{name: .name, total: .messages, ready: .messages_ready, unacked: .messages_unacknowledged, workers: .consumers}'

{
  "name": "jenkins_deploy_queue",
  "total": 1,
  "ready": 1,
  "unacked": 0,
  "workers": 0
}
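
The messages.py script used above is not reproduced in this article. A minimal publisher might look like the sketch below; the function names, the sample payload, and the password placeholder are assumptions, and only the exchange name, host, and port come from the worker code. The pika import sits inside main() so that publish_event can be exercised without a broker.

```python
import json

EXCHANGE_NAME = "jenkins_exchange"


def publish_event(channel, payload, properties=None):
    """Publish one JSON event to the fanout exchange and return the encoded body."""
    body = json.dumps(payload)
    channel.basic_publish(
        exchange=EXCHANGE_NAME,
        routing_key="",  # ignored by fanout exchanges
        body=body,
        properties=properties,
    )
    return body


def main():
    import pika  # requires the pika package and a reachable broker

    credentials = pika.PlainCredentials("admin", "<password>")  # placeholder password
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="172.21.5.76", port=31572, credentials=credentials)
    )
    channel = connection.channel()
    print("[+] Sending VALID JSON message...")
    # delivery_mode=2 marks the message as persistent so it survives a broker restart.
    publish_event(
        channel,
        {"action": "deploy", "service": "example"},  # hypothetical payload
        properties=pika.BasicProperties(delivery_mode=2),
    )
    connection.close()
    print("Done. Message published.")

# Call main() to publish against a real broker.
```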

Start the two consumers in separate terminals.

python3 rabbitmq_worker.py --type health

and

python3 rabbitmq_worker.py --type jenkins

Querying the queue again shows that the message has been consumed and that one worker is attached:

curl -s -u admin:J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ \
    -X GET https://rabbitmq.devops-db.internal/api/queues/%2f/jenkins_deploy_queue \
    | jq '{name: .name, total: .messages, ready: .messages_ready, unacked: .messages_unacknowledged, workers: .consumers}'
{
  "name": "jenkins_deploy_queue",
  "total": 0,
  "ready": 0,
  "unacked": 0,
  "workers": 1
}

Summary of Benefits

  1. Fault Tolerance: If the health worker crashes, messages remain safely stored in the health_monitor_queue without blocking the jenkins worker.
  2. Scalability: You can add a third worker (e.g., a logging service) simply by creating a new queue and binding it to the same exchange; no changes are required to existing code.
  3. Observability: By separating queues, you can monitor the individual lag of each service in the RabbitMQ Dashboard.
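
As a sketch of the observability point, the per-queue JSON returned by the Management API can be reduced to the same fields the jq filter selects and then checked for backlog. The function names are hypothetical, and the "stalled" rule (ready messages with no consumers) is just one possible heuristic.

```python
def summarize_queue(stats):
    """Keep the same fields as the jq filter used in the practical example."""
    return {
        "name": stats["name"],
        "total": stats.get("messages", 0),
        "ready": stats.get("messages_ready", 0),
        "unacked": stats.get("messages_unacknowledged", 0),
        "workers": stats.get("consumers", 0),
    }


def is_stalled(summary):
    """A queue holding ready messages with no consumers attached is building a backlog."""
    return summary["ready"] > 0 and summary["workers"] == 0


# Values taken from the first query in the practical example (before workers start).
before = summarize_queue({
    "name": "jenkins_deploy_queue",
    "messages": 1,
    "messages_ready": 1,
    "messages_unacknowledged": 0,
    "consumers": 0,
})
print(before["name"], "stalled:", is_stalled(before))
# jenkins_deploy_queue stalled: True
```

Running the same check against each queue separately is what makes it possible to see that one service is lagging while the other is healthy.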