An example of triggering a Jenkins pipeline from a RabbitMQ message.
Introduction
In modern infrastructure, engineering teams often operate in hybrid environments where legacy Virtual Machines coexist with modern Kubernetes clusters. Integrating these two worlds seamlessly presents unique networking and architectural challenges.
This article details the construction of an asynchronous, event-driven CI/CD pipeline. We will deploy RabbitMQ on a Kubernetes cluster to act as a message broker, capturing deployment triggers, and securely routing them to a Jenkins instance running on a dedicated Virtual Machine. We will explore the technical “whys” behind the AMQP protocol, Kubernetes networking limitations, and how to build a resilient Python worker to bridge the gap.
Phase 1: Deploying RabbitMQ on Kubernetes
To ensure a production-ready baseline, we utilize the official Bitnami Helm chart for RabbitMQ. The configuration requires a careful balance between administrative access (HTTP/HTTPS) and application messaging (AMQP).
Here is the values.yaml used for our deployment:
ingress:
  enabled: true
  ingressClassName: nginx
  hostname: rabbitmq.devops-db.internal
  path: /
  pathType: Prefix
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: "step-ca-issuer"
  tls: true
global:
  security:
    allowInsecureImages: true
image:
  registry: public.ecr.aws
  repository: bitnami/rabbitmq
  tag: 3.13.7-debian-12-r0
auth:
  username: admin
  password: J4VPegzqSKC6Syji9ga6w1JDcTRgrvDQ
  erlangCookie: secretcookie
replicaCount: 1
management:
  enabled: true
service:
  type: NodePort
  nodePorts:
    amqp: 31572
persistence:
  enabled: true
  size: 8Gi
resources:
  requests:
    cpu: 100m
    memory: 256Mi
  limits:
    cpu: 500m
    memory: 512Mi
rabbitmq:
  extraPlugins: "rabbitmq_management"
Deployment is executed via standard Helm commands:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm upgrade --install rabbitmq bitnami/rabbitmq --namespace rabbitmq --create-namespace -f values.yaml
Note: For security, we also created a dedicated jenkins_user with the management tag and full permissions on the / virtual host via rabbitmqctl.
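A sketch of those rabbitmqctl commands, run inside the broker pod (the pod name follows the Bitnami StatefulSet convention; substitute your own password):

```shell
kubectl -n rabbitmq exec -it rabbitmq-0 -- \
  rabbitmqctl add_user jenkins_user 'CHANGE_ME'
kubectl -n rabbitmq exec -it rabbitmq-0 -- \
  rabbitmqctl set_user_tags jenkins_user management
kubectl -n rabbitmq exec -it rabbitmq-0 -- \
  rabbitmqctl set_permissions -p / jenkins_user ".*" ".*" ".*"
```

The three permission patterns grant configure, write, and read access, respectively, on the / virtual host.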
Phase 2: Demystifying AMQP and the Networking Bottleneck
Once deployed, you can access the RabbitMQ management UI at https://rabbitmq.devops-db.internal. However, attempts to send messages over the AMQP protocol to that same DNS name time out. Why?
The Protocol Clash
The NGINX Ingress Controller is designed to handle Layer 7 traffic (HTTP/HTTPS). It intercepts web requests, terminates TLS, and routes them to the management UI. AMQP (Advanced Message Queuing Protocol), however, is not HTTP: it is its own application protocol that requires a persistent, raw TCP connection for low overhead and guaranteed delivery, so it must be routed at Layer 4. The Ingress cannot interpret this traffic and simply drops the connection.
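For completeness: NGINX Ingress can still carry AMQP, just not through Ingress rules. The ingress-nginx controller supports raw TCP forwarding via a tcp-services ConfigMap (the namespace and service names below follow this article's layout and are assumptions; the controller must also be started with --tcp-services-configmap and expose port 5672 on its Service):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: tcp-services
  namespace: ingress-nginx
data:
  # "<external port>": "<namespace>/<service>:<service port>"
  "5672": "rabbitmq/rabbitmq:5672"
```

This article takes the simpler NodePort route instead.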
The NodePort Solution
To bypass the Ingress, we exposed the AMQP service through a Kubernetes NodePort (31572). This opens a TCP port directly on every Kubernetes node. By pointing clients at a node's internal IP (172.21.5.76:31572), the traffic reaches kube-proxy, which forwards it to the RabbitMQ pod.
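A quick way to verify the mapping end to end (the service name matches the Helm release above; nc availability on the client host is assumed):

```shell
# Confirm which NodePort the chart assigned to AMQP
kubectl -n rabbitmq get svc rabbitmq \
  -o jsonpath='{.spec.ports[?(@.name=="amqp")].nodePort}'

# Probe the raw TCP port on a worker node from any routable host
nc -zv 172.21.5.76 31572
```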
Architectural Evolution: High Availability (HA)
Relying on a single physical node IP (172.21.5.76) creates a Single Point of Failure (SPOF). If that specific server crashes, the pipeline breaks, even if Kubernetes reschedules the pod to another node.
The industry-standard solution is DNS round robin or an external load balancer. By creating a DNS record such as rabbitmq-amqp.devops-db.internal that resolves to the IPs of all Kubernetes worker nodes, the AMQP client receives multiple IPs. If the primary node fails, the client library automatically tries the next IP, preserving high availability without relying on Layer 7 Ingress routing.
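The failover behavior described above can be sketched with nothing but the standard library: resolve every A record behind the round-robin name, then walk the list until one node answers (the function names here are ours, not from the article's repository):

```python
import socket

def resolve_all_ips(hostname: str, port: int) -> list[str]:
    """Return every IPv4 address behind a round-robin DNS name."""
    infos = socket.getaddrinfo(hostname, port, socket.AF_INET, socket.SOCK_STREAM)
    # getaddrinfo yields (family, type, proto, canonname, sockaddr) tuples
    return sorted({sockaddr[0] for *_, sockaddr in infos})

def connect_first_available(hostname: str, port: int, timeout: float = 5.0) -> socket.socket:
    """Try each resolved IP in turn; keep the first that accepts a TCP connection."""
    last_err = None
    for ip in resolve_all_ips(hostname, port):
        try:
            return socket.create_connection((ip, port), timeout=timeout)
        except OSError as err:
            last_err = err  # node down: fall through to the next IP
    raise ConnectionError(f"no node behind {hostname} answered") from last_err
```

pika offers the same idea natively: pika.BlockingConnection accepts a list of ConnectionParameters and attempts each entry in order, which is the cleanest way to consume a multi-node DNS record.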
Phase 3: The RabbitMQ Infrastructure (The Post Office)
A common misconception is that producers send messages directly to queues. In RabbitMQ, producers send messages to Exchanges (the post office sorting center), which then route the messages to Queues (the mailboxes) based on a Binding (the routing rules). If a message arrives at an Exchange with no bound queues, it is discarded, a phenomenon known as an “unroutable message.”

Using the Management UI, we provisioned the following:
- Queue: jenkins_deploy_queue
- Exchange: jenkins_exchange (Type: Direct)
- Binding: bound the queue to the exchange using the routing key deploy_app
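The same topology can be provisioned from the command line instead of the UI; a sketch using the rabbitmqadmin tool that ships with the management plugin (host and credentials are the ones used elsewhere in this article; substitute the real password):

```shell
ADMIN="rabbitmqadmin --host=rabbitmq.devops-db.internal --ssl --username=jenkins_user --password=CHANGE_ME"

$ADMIN declare queue name=jenkins_deploy_queue durable=true
$ADMIN declare exchange name=jenkins_exchange type=direct durable=true
$ADMIN declare binding source=jenkins_exchange destination=jenkins_deploy_queue routing_key=deploy_app
```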
Phase 4: The Producer (Triggering the Event)
With the infrastructure ready, we need a mechanism to simulate an application requesting a deployment. We wrote a Python script using the pika library to act as our producer.
Using vi messages.py, we created the following script:
import pika
import sys
import json

RABBITMQ_HOST = '172.21.5.76'  # rabbitmq-amqp.devops-db.internal
RABBITMQ_PORT = 31572
RABBITMQ_USER = 'jenkins_user'
RABBITMQ_PASS = 'NZhBj0XxNvJ6OVUaO5UsUBlipxsTop9a'
EXCHANGE_NAME = 'jenkins_exchange'
ROUTING_KEY = 'deploy_app'

def trigger_deployment():
    try:
        print(f"Connecting to AMQP Broker at {RABBITMQ_HOST}:{RABBITMQ_PORT}...")
        credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
        parameters = pika.ConnectionParameters(host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        print("AMQP connection established successfully.")

        deployment_payload = {
            "application": "frontend-service",
            "environment": "production",
            "version": "v1.4.2",
            "author": "devops-team"
        }
        message_body = json.dumps(deployment_payload)

        print(f"Publishing deployment trigger to exchange '{EXCHANGE_NAME}'...")
        channel.basic_publish(
            exchange=EXCHANGE_NAME,
            routing_key=ROUTING_KEY,
            body=message_body,
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent: survives a broker restart
                content_type='application/json'
            )
        )
        print("Trigger successfully sent! Check your Jenkins dashboard.")
        connection.close()
    except Exception as err:
        print(f"CRITICAL: Unexpected error during execution. Error: {err}")
        sys.exit(1)

if __name__ == '__main__':
    trigger_deployment()
Phase 5: The Worker Pattern (Bridging AMQP to Jenkins)
Legacy Jenkins plugins for RabbitMQ often lack support for modern Declarative Pipelines. Instead of relying on obsolete plugins, we implemented the Worker Pattern.
We created a Python daemon (rabbitmq_worker.py) that runs alongside Jenkins. It continuously listens to the jenkins_deploy_queue via AMQP. When a message arrives, it translates the event into a standard HTTP POST request, triggering the native Jenkins REST API.
This approach implements the “Fat Event” pattern: instead of parsing individual variables, the worker wraps the entire JSON payload into a single parameter called PAYLOAD_JSON and forwards it to Jenkins.
import pika
import requests
import json
import sys
import urllib3

# Suppress warnings for internal Kubernetes self-signed certificates
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

RABBITMQ_HOST = '172.21.5.76'  # rabbitmq-amqp.devops-db.internal
RABBITMQ_PORT = 31572
RABBITMQ_USER = 'jenkins_user'
RABBITMQ_PASS = 'NZhBj0XxNvJ6OVUaO5UsUBlipxsTop9a'
QUEUE_NAME = 'jenkins_deploy_queue'

JENKINS_URL = 'https://jenkins.devops-db.internal'
JENKINS_JOB_PATH = 'job/infrastructure/job/pipelines/job/tests/job/RabbitMQ-Example1'
JENKINS_USER = 'fbranco'
JENKINS_API_TOKEN = '11168682ae86c6a8abe39b219fb1d0424e'

def trigger_jenkins_pipeline(payload):
    print("Triggering Jenkins pipeline with raw JSON payload via REST API...")
    build_url = f"{JENKINS_URL}/{JENKINS_JOB_PATH}/buildWithParameters"
    auth_credentials = (JENKINS_USER, JENKINS_API_TOKEN)
    jenkins_parameters = {
        'PAYLOAD_JSON': json.dumps(payload)
    }
    try:
        response = requests.post(build_url, auth=auth_credentials, data=jenkins_parameters, verify=False)
        response.raise_for_status()
        print(f"Jenkins build triggered successfully! HTTP Status: {response.status_code}")
        return True
    except requests.exceptions.RequestException as err:
        print(f"CRITICAL: Failed to communicate with Jenkins API. Error: {err}")
        return False

def process_message(channel, method, properties, body):
    print(f"\n[x] Received message from RabbitMQ: {body.decode()}")
    try:
        payload = json.loads(body)
        success = trigger_jenkins_pipeline(payload)
        if success:
            channel.basic_ack(delivery_tag=method.delivery_tag)
        else:
            print("Requeueing message to prevent data loss. Will try again.")
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    except json.JSONDecodeError:
        print("Invalid JSON. Discarding message.")
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

def start_worker():
    # AMQP connection setup, mirroring the producer (full version in the repository)
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    channel.basic_consume(queue=QUEUE_NAME, on_message_callback=process_message, auto_ack=False)
    print(f"[*] Worker is listening to '{QUEUE_NAME}'. To exit press CTRL+C")
    channel.start_consuming()

if __name__ == '__main__':
    try:
        start_worker()
    except KeyboardInterrupt:
        print("\n[!] Process interrupted by user (CTRL+C). Shutting down gracefully...")
        sys.exit(0)
Notice the crucial verify=False flag and the urllib3 import. Because the Jenkins API request travels across the internal network secured by a custom Kubernetes CA (step-ca-issuer), standard TLS verification would fail and the deployment trigger would be lost.
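A stricter alternative, assuming the step-ca root certificate can be exported to the Jenkins VM (for example with the step ca root command), is to hand requests that CA bundle instead of disabling verification. The bundle path below is hypothetical:

```python
import requests

# Trust the internal step-ca root instead of turning verification off.
# CA_BUNDLE is an assumed path; export the root cert there first.
CA_BUNDLE = "/etc/ssl/certs/step-ca-root.pem"

session = requests.Session()
session.verify = CA_BUNDLE  # every request through this session validates against step-ca

# usage inside trigger_jenkins_pipeline:
#   response = session.post(build_url, auth=auth_credentials, data=jenkins_parameters)
```

With this in place, the urllib3 warning suppression and the verify=False flag can both be deleted.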
Furthermore, if Jenkins is offline, the API call fails, and the worker executes channel.basic_nack(requeue=True). The message remains safely in the RabbitMQ queue until Jenkins recovers, ensuring zero data loss.
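One caveat: requeue=True redelivers immediately, so a long Jenkins outage turns the worker into a hot loop on the same message. A common mitigation is to cap attempts and dead-letter the message afterwards, assuming a DLX-based retry topology where each failure is rejected into a delay queue that routes back; RabbitMQ appends to the x-death header on every such cycle. The helper below is our own sketch, not code from the article's repository:

```python
from typing import Optional

MAX_RETRIES = 5

def should_requeue(headers: Optional[dict], max_retries: int = MAX_RETRIES) -> bool:
    """Return True while the message still has retry budget left.

    Assumes a DLX retry loop: RabbitMQ appends an entry to the x-death
    header each time the message is dead-lettered, so summing the counts
    gives the number of failed cycles so far.
    """
    deaths = (headers or {}).get("x-death") or []
    attempts = sum(entry.get("count", 0) for entry in deaths)
    return attempts < max_retries
```

Inside process_message, the worker would call should_requeue(properties.headers) and, once the budget is spent, basic_reject with requeue=False so the DLX routes the message to a quarantine queue for inspection.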
Phase 6: The Jenkins Declarative Pipeline
The final step is instructing Jenkins to unpack the “Fat Event”. We configured the Pipeline to expect a single PAYLOAD_JSON text parameter. Using Groovy’s native JsonSlurperClassic, we parse the string back into a manageable object.
Note: Using native Groovy libraries in pipelines triggers the Jenkins Script Security Sandbox. An administrator must explicitly approve the new groovy.json.JsonSlurperClassic signature in the “In-process Script Approval” settings before the pipeline can execute.
import groovy.json.JsonSlurperClassic

pipeline {
    agent any
    parameters {
        text(name: 'PAYLOAD_JSON', defaultValue: '{}', description: 'Raw JSON payload from RabbitMQ')
    }
    stages {
        stage('Process Payload') {
            steps {
                echo 'Parsing raw JSON payload natively...'
                script {
                    def slurper = new JsonSlurperClassic()
                    def jsonProps = slurper.parseText(params.PAYLOAD_JSON)
                    echo "--- DEPLOYMENT PAYLOAD DETAILS ---"
                    echo "Target Application : ${jsonProps.application}"
                    echo "Target Environment : ${jsonProps.environment}"
                    echo "Requested Version  : ${jsonProps.version}"
                    echo "Trigger Author     : ${jsonProps.author}"
                    echo "----------------------------------"
                }
            }
        }
    }
}
Conclusion
By utilizing RabbitMQ as an intermediary, we decoupled our trigger sources from the deployment engine. This architecture solves the brittleness of direct HTTP webhooks, providing fault tolerance, message persistence, and a highly customizable data conduit. The combination of Kubernetes NodePorts for raw TCP traffic and a dedicated Python worker ensures that legacy and cloud-native systems can communicate reliably.
The Road Ahead: From Automation to a Deployment Platform
While our current implementation serves as a robust Proof of Concept (PoC), this architecture is designed to scale into a full-fledged Internal Developer Platform (IDP). By moving away from hardcoded variables and embracing a Dynamic Event Router pattern, we can unlock several enterprise-grade features:
- Dynamic Routing via YAML: Instead of a single-purpose script, the Worker can be evolved to read a central routes.yaml configuration. This allows teams to map dozens of different RabbitMQ queues to specific Jenkins pipelines dynamically, without touching the core Python code.
- Observability and Dead Letter Exchanges (DLX): In production, not every message succeeds. By implementing DLX patterns, messages that fail after a certain number of retries can be automatically routed to a “quarantine” queue, triggering alerts in Slack or PagerDuty to notify the DevOps team of stalled deployments.
- Parallel Execution at Scale: Utilizing Python’s asyncio or threading capabilities, a single Worker can process multiple high-traffic queues in parallel, ensuring that a large frontend deployment doesn’t block an urgent database migration.
- Security Hardening: The next evolutionary step involves moving secrets (like the JENKINS_API_TOKEN and RabbitMQ credentials) out of the code and into a secure vaulting solution like HashiCorp Vault or Kubernetes Secrets, injected at runtime via environment variables.
Example
# routes.yaml
rabbit_connection:
  host: rabbitmq-amqp.devops-db.internal
  port: 31572
  credentials_secret_path: /vault/secrets/rabbitmq
pipelines:
  - queue_name: frontend_deploy_queue
    jenkins_target: job/frontend/job/deploy-prod
    max_retries: 3
    timeout: 30
  - queue_name: backend_db_migration_queue
    jenkins_target: job/database/job/flyway-migrate
    max_retries: 1
    timeout: 60

By implementing this bridge, we aren’t just running a script; we are laying the foundation for a resilient, observable, and scalable delivery ecosystem.
