Amqplib – RabbitMQ the easy way

If you have already worked with the RabbitMQ message broker in Node.js or TypeScript, you have probably met amqplib, a widely used package on NPM. The library itself is very well documented, and its documentation is worth reading. However, as my programming background is mainly in modern PHP, I wanted to work with the RabbitMQ connection, publisher and consumer structures in an object-oriented way. I tried to fill this gap by creating a library built on top of amqplib. You can find it on NPM under the name amqplib-plus.

The goal of the amqplib-plus library is to provide classes for Connection, Publisher and Consumer. The whole library is written in TypeScript, so you can easily integrate it into your own TypeScript projects, or you can use the compiled ES6 JavaScript.

Amqplib-plus key features are:

  • connection auto-reconnect feature
  • easy to use object oriented publisher
  • easy to use object oriented consumer that runs user defined function on every consumed message

You can easily install it by running npm install amqplib-plus

Publisher

The Publisher object is responsible for sending (publishing) messages to the RabbitMQ message broker. When creating the publisher, you pass it the connection instance and a prepareFn. The prepareFn is called whenever the connection is opened, that is, both on the first connect and after every automatic reconnect. Use it to ensure that the queues, exchanges, bindings etc. you need exist.
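
To make the reconnect behaviour concrete, here is a minimal sketch that runs without a broker. FakeChannel is a hypothetical in-memory stand-in, not part of amqplib or amqplib-plus, and the sketch does not show how the library reconnects internally. It only illustrates why the prepareFn should contain idempotent assert calls: running it again after a reconnect leaves the declared queues unchanged.

```javascript
// Hypothetical stand-in for an amqplib channel, used only for illustration
class FakeChannel {
	constructor(broker) {
		this.broker = broker; // shared state that survives "reconnects"
	}

	async assertQueue(name, options = {}) {
		// declaring an already existing queue again changes nothing
		if (!this.broker.queues.has(name)) {
			this.broker.queues.set(name, options);
		}
	}
}

const broker = { queues: new Map() };
let prepareCalls = 0;

const prepareFn = async (ch) => {
	prepareCalls += 1;
	await ch.assertQueue("target-queue", { durable: false });
};

// Simulates the first connect followed by one automatic reconnect:
// every time a fresh channel is opened, prepareFn runs again
async function simulate() {
	await prepareFn(new FakeChannel(broker)); // initial connection
	await prepareFn(new FakeChannel(broker)); // after a reconnect
	return { prepareCalls, queueCount: broker.queues.size };
}
```

The first call to simulate() resolves with prepareCalls equal to 2 and queueCount equal to 1: the function ran twice, but the queue was declared only once.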

Publisher implements the IPublisher interface, which offers the following methods:

  • publish(exchange: string, routKey: string, content: Buffer, options: {}): Promise<void>;
  • sendToQueue(queue: string, content: Buffer, options: {}): Promise<void>;

Use the publish method to publish to an exchange (the message will be routed by the given routing key). To publish directly to a known queue, use the sendToQueue method. The options are the standard amqplib publishing options described in the official amqplib documentation.

const conn = require("amqplib-plus/dist/lib/Connection");
const pub = require("amqplib-plus/dist/lib/Publisher");

const options = {
	host: "localhost",
	port: 5672,
	user: "guest",
	pass: "guest",
	vhost: "/",
	heartbeat: 60,
};

// create the Connection instance that manages reconnecting
const connection = new conn.Connection(options);

async function run() {
	await connection.connect();

	// this function is called once the connection is opened and again after every reconnect
	// ch is the classic Channel object from amqplib
	const preparePublisher = async (ch) => {
		await ch.assertQueue("target-queue", { durable: false });
		await ch.assertExchange("target-exchange", "direct");
		await ch.bindQueue("target-queue", "target-exchange", "routKey");
		console.log("Publisher ready");
	};

	// Creates new publisher instance
	// The preparePublisher fn is called once the connection is opened
	const publisher = new pub.Publisher(connection, preparePublisher);

	// Send messages to message broker
	// It waits until connection is opened and preparePublisher function is called

	// send directly to queue
	// Buffer.from replaces the deprecated new Buffer() constructor
	await publisher.sendToQueue("target-queue", Buffer.from("message content"), {});

	// send to exchange
	await publisher.publish("target-exchange", "routKey", Buffer.from("another content"), {});

	console.log("Two messages sent.");
}

// log errors instead of leaving the rejected promise unhandled
run().catch(console.error);
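
Because the content argument must be a Buffer, structured payloads have to be serialized before publishing. The following is a small sketch of one way to do that. buildJsonMessage is a hypothetical helper, not something amqplib-plus provides. It serializes an object to JSON and pairs it with the standard amqplib publish options contentType, persistent and headers.

```javascript
// Hypothetical helper (not provided by amqplib-plus): turns a plain object
// into the content Buffer and the options expected by publish()/sendToQueue()
function buildJsonMessage(payload, headers = {}) {
	return {
		content: Buffer.from(JSON.stringify(payload), "utf8"),
		options: {
			contentType: "application/json", // tells consumers how to decode the body
			persistent: true, // asks the broker to store the message on disk
			headers,
		},
	};
}

const message = buildJsonMessage({ orderId: 42 }, { source: "example" });

// Inside run() from the example above, it could be sent like this:
// await publisher.sendToQueue("target-queue", message.content, message.options);
```

Note that persistent only protects messages across a broker restart when the queue itself is durable, which the queue in the example above (durable: false) is not.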

Consumer

The Consumer instance is responsible for reading messages from a defined RabbitMQ queue. It wraps the original amqplib consume function with reconnection and channel setup logic. The example below shows how to use it.

First, define your custom consumer behaviour. Create a class that extends the amqplib-plus Consumer. You must implement the abstract processMessage(msg, channel): void; method, where you handle the message the way you want (for example by passing it to another service) and then ack or nack it. The connection instance ensures that if the connection to the broker fails, it is renewed automatically, so you don’t have to deal with it on your own. Save the file as CustomConsumer.js.

const Consumer = require("amqplib-plus/dist/lib/Consumer");

class CustomConsumer extends Consumer.Consumer {

	constructor(conn, prepareFn) {
		super(conn, prepareFn);
	}

	processMessage(msg, channel) {
		// Your own message processing logic
		console.log("Message headers:", JSON.stringify(msg.properties.headers));
		console.log("Message body:", msg.content.toString());

		// Your own condition to decide whether to ack/nack/reject
		if (msg.content.toString().length > 10) {
			return channel.nack(msg);
		}

		channel.ack(msg);
	}

}

module.exports = CustomConsumer;
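
One detail worth knowing: in amqplib, channel.nack(msg) requeues the message by default, so a message that always fails your condition would be delivered again and again. The sketch below shows one possible way to structure the decision, assuming the message bodies are JSON. handleMessage and createRecordingChannel are hypothetical names used only for illustration. The recording channel lets you dry-run the logic without a broker.

```javascript
// Hypothetical handler that could be called from processMessage(msg, channel)
function handleMessage(msg, channel) {
	let payload;
	try {
		payload = JSON.parse(msg.content.toString());
	} catch (err) {
		// nack(message, allUpTo, requeue): requeue = false drops the message
		// (or dead-letters it if the queue has a dead-letter exchange configured)
		channel.nack(msg, false, false);
		return null;
	}
	channel.ack(msg);
	return payload;
}

// Stand-in channel that only records which methods were called
function createRecordingChannel() {
	const calls = [];
	return {
		calls,
		ack: (msg) => calls.push(["ack"]),
		nack: (msg, allUpTo, requeue) => calls.push(["nack", allUpTo, requeue]),
	};
}

const recorder = createRecordingChannel();
handleMessage({ content: Buffer.from('{"ok":true}') }, recorder);
handleMessage({ content: Buffer.from("not json") }, recorder);
console.log(recorder.calls);
```

The valid message is acked, while the malformed one is nacked with requeue set to false, so it does not come back to the consumer in an endless loop.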

Then use your own CustomConsumer…

const conn = require("amqplib-plus/dist/lib/Connection");
const CustomConsumer = require("./CustomConsumer");

const options = {
	host: "localhost",
	port: 5672,
	user: "guest",
	pass: "guest",
	vhost: "/",
	heartbeat: 60,
};

const connection = new conn.Connection(options);

async function runCustomConsumer() {
	await connection.connect();

	// Function called once the connection is opened and after every connection renewal
	// You should at least assert the queue you want to consume here and ensure it is created with the expected settings
	const prepareConsumer = async (ch) => {
		await ch.assertQueue("source-queue", { durable: false });
		await ch.prefetch(5);
	};
	// instantiate the class we created above
	const customConsumer = new CustomConsumer(connection, prepareConsumer);
	// start the consumption
	// consume the same queue that prepareConsumer asserted above
	customConsumer.consume("source-queue", {});
}

// log errors instead of leaving the rejected promise unhandled
runCustomConsumer().catch(console.error);

I hope this library helps you work with AMQP in Node.js and makes it easier for newcomers to start communicating over message queues. Special thanks to the original amqplib library, which is really awesome!

If you have any code improvements, suggestions or questions, don’t hesitate to create a PR on GitHub or text me.
