Search⌘ K
AI Features

Creating a RabbitMQ Consumer for the Core App

Explore how to create a RabbitMQ consumer in a Python microservices core app with pika. Learn to connect to the message broker, handle message types for creating, updating, and deleting database records, and ensure smooth message consumption using callbacks and automatic acknowledgments.

We'll cover the following...

After creating the RabbitMQ producer for the Core app, we must also create a RabbitMQ consumer for it.

Creating a consumer in the Core app

To create a RabbitMQ consumer in the Core app, we'll create a file in the backendservice2 app directory folder named consumer.py. Then, we'll open the file and import pika and json, like so:

import pika, json

We also have to import our House model and db from our core.py, as follows:

from core import House, db

After doing this, we can use pika to create a connection and define a callback function to consume our messages, as shown below:

Python
import pika, json
from core import House, db
params = pika.URLParameters('{{Your_AMQP_URL}}')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='core')
def callback(ch, method, properties, body):
print('Received in core')
data = json.loads(body)
print(data)
if properties.content_type == 'house_created':
house = House(id=data['id'], name=data['name'], image=data['image'], description=data['description'])
db.session.add(house)
db.session.commit()
print('House Created')
elif properties.content_type == 'house_updated':
house = House.query.get(data['id'])
house.name = data['name']
house.image = data['image']
house.description = data['description']
db.session.commit()
print('House Updated')
elif properties.content_type == 'product_deleted':
house = House.query.get(data)
db.session.delete(house)
db.session.commit()
print('House Deleted')
channel.basic_consume(queue='core', on_message_callback=callback, auto_ack=True)
print('Started Consuming')
channel.start_consuming()
channel.close()

  • Line 7: We create a variable, params, which is set to pika.URLParameters('{{Your_AMQP_URL}}'). This means we want to connect to a message broker, in this ...