Topic Based Exchange - RabbitMQ

Working of RabbitMQ

Before diving directly into what topic-based exchanges are and how they work, it is obligatory to have a sound understanding of how messages are published and subscribed.
The first thing to understand is that messages are not published directly to a Queue. Instead, the Producer sends messages through an Exchange.
You can think of an Exchange as a mail delivery person ensuring that the message ends up in the correct Queue.

How a message is routed depends on several things including:

  • the exchange type - specifies routing rules
  • routing keys and
  • header attributes.

These all act as addresses for messages.

From a Queue's perspective, you can check exchanges and routing rules
that are linked to this specific Queue. These links are called Bindings. While the Routing Key is like an address for the message, A Binding links the Queue to an Exchange.

In RabbitMQ there are four main types of Exchanges: Direct, Topic, Fanout, and Headers

This is mainly what the exchange looks for when deciding how to route the message to Queues:

Credits: Bikram Kundu

For local development, Existing exchanges, queues, channels, and types can be seen in the management interface or through "rabbitmqadmin."

Screenshot-2022-09-16-at-6.03.49-PM

Local Interface: http://localhost:15672/#/exchanges

Publishing & Subscribing using a topic-based exchange

Topic exchanges route messages to one or more queues based on the match between a message routing key and the pattern that was used to link a queue to an exchange.

Let's demonstrate the pub/sub procedure in python:

We'll use Pika python AMQP Client Library for the forthcoming operations. Install it using:

pip install pika

publisher.py

import pika


class Publisher:
    def __init__(self, config):
        self.config = config

    def publish(self, routing_key, message):
        connection = self.create_connection()
        channel = connection.channel()

        """
        Creates/verifies an exchange.
        """
        channel.exchange_declare(
            exchange=self.config["exchange"], exchange_type="topic"
        )

        # Publish 'message' to the 'exchange' matching
        # the provided routing key
        channel.basic_publish(
            exchange=self.config["exchange"], routing_key=routing_key, 
            body=message
        )
        
        print(f" [x] Sent message {message} for {routing_key})

    # Create new connection
    def create_connection(self):
        param = pika.ConnectionParameters(
            host=self.config["host"], port=self.config["port"]
        )
        return pika.BlockingConnection(param)


config = {"host": "localhost", "port": 5672, "exchange": "my_exchange"}
publisher = Publisher(config)
publisher.publish('black', 'single word routing key')
publisher.publish('black.mamba', 'one more word added')
publisher.publish('black.abc.xyz', 'one or more words')

The above script helps us achieve:

  • Create connection and channel to RMQ host.
  • Declaring an exchange: my_exchange.
  • Publish different messages using various patterns of routing keys.

Execute for declaring exchange:

> python publisher.py

subscriber.py

import pika
import sys
class Subscriber:
    def __init__(self, queueName, bindingKey, config):
        self.queueName = queueName
        self.bindingKey = bindingKey
        self.config = config
        self.connection = self._create_connection()

    def __del__(self):
        self.connection.close()
        
    def _create_connection(self):
        parameters=pika.ConnectionParameters(host=self.config['host'],    
        port = self.config['port'])
        return pika.BlockingConnection(parameters)

    def on_message_callback(self, channel, method, properties, body):
        print("body -", body)

    def setup(self):
        print("setup started")
        channel = self.connection.channel()
        
        # This method creates or checks a queue
        channel.queue_declare(queue=self.queueName)
        print("Queue declared")

        # Binds the queue to the specified exchange
        channel.queue_bind(queue=self.queueName,exchange=self.config
        ['exchange'],routing_key=self.bindingKey)

        channel.basic_consume(queue=self.queueName,
        on_message_callback=self.on_message_callback, auto_ack=True)

        print(" [*] Waiting for data for "  + self.queueName + ". To exit press CTRL+C")
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
            
config = { 'host': 'localhost','port': 5672, 'exchange' : 'my_exchange'}

if len(sys.argv) < 2:
    print('Usage: ' + __file__ + ' <QueueName> <BindingKey>')
    sys.exit()
else:
    queueName = sys.argv[1]
    #key in the form exchange.*
    key = sys.argv[2]
    subscriber = Subscriber(queueName, key, config)
    subscriber.setup()

Execute it using:

python subscriber.py testQueue black.#

The above subscriber.py script helps us achieve:

  • Create connection and channel to RMQ host.
  • Bind queue:testQueue to exchange my_exchange using binding key black.#.
  • Consume data and trigger function on_message_callback when the publisher emits the message.

Re-execute publisher for publishing data:

> python publisher.py
[x] Sent message 'single word routing key' for 'black'
[x] Sent message 'One more word added' for 'black.mamba'
[x] Sent message 'one or more words' for 'black.abc.xyz'

Subscriber's output:

setup started
Queue declared
 [*] Waiting for data for testQueue. To exit press CTRL+C
body - b'single word routing key'
body - b'One more word added'
body - b'one or more words'
Ending Note: Topic-based exchange in RabbitMQ can be used for solving complex use cases like triggering and executing long-running background tasks, communicating among services, scaling up-down during peak hours, to make reliable backups, etc.
Kartik Kapgate

Kartik Kapgate

Software Engineer | Backend