There are multiple ways to have two services communicate with each other, such as REST APIs, GraphQL, Publish/Subscribe models, and Message Queues.
In this article, we will explore RabbitMQ, a popular open-source message queue tool.
RabbitMQ is an open-source, production-capable, message queue. It’s typically used between services to allow for asynchronous communication.
Asynchronous communication is a method of communication that does not pause subsequent functions or commands while waiting for the result of the request. The flow of commands continues while the request is being processed by the other service. Once the result is received from the other service, the flow for that result proceeds, without having to stall the other functions of the system.
Why and When to Use Asynchronous Communication? #
The main reason why Asynchronous functions are implemented, is to allow the other functions that are not reliant on the asynchronous one, proceed to execute.
For example, say you had a webpage that displays the employees and departments in a company. Each of these is a separate function that does not rely on the other:
function getEmployees();
function getDepartments();
For the sake of this example, let’s say each function takes 3 seconds to complete processing. If the requests are synchronous, it would take the getEmployees function 3 seconds to complete, then it would take an additional 3 seconds for the getDepartments function to complete. Totalling up to 6 seconds.
Now in our case, the getDepartments function does not need the getEmployees function to complete before executing. Therefore, we turn our function from synchronous, to asynchronous:
await getEmployees();
await getDepartments();
In the above code, the getEmployees function will start and not stop the flow, it will run separately. Right away, the getDepartments function starts running as well. Now instead of waiting the full 3 seconds for a function to complete before the next one can start, they are running at the same time.
This reduces our loading times from 6 seconds to as low as 3 seconds.
On a small scale and with fast functions, you might not feel the difference as much. But imagine if you had 10s of functions, the load times will be in minutes!
Now that we got that out of the way, we’ll proceed to install and set up RabbitMQ.
Prerequisites #
To go through this tutorial, you will need the following:
- Linux System
- Docker
- Node JS
Installation and Set Up #
To run RabbitMQ, we can use Docker:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management
This will run an instance of RabbitMQ in a docker container. There are two ports that are used and exposed:
5672- This port is used to communicate with RabbitMQ. It will be used by our app15672- This port is the admin dashboard, displaying information about the RabbitMQ app that is running
You can access the dashboard by going to localhost:15672. You will be asked to sign in, the default username and password are guest.
IMPORTANT: In a real-world environment, make sure to change the default username and password to something more secure
Once you access the system, you will see multiple tabs. We’ll go through the ones relevant for this tutorial later.
Setting Up The Apps #
We’ll create two Node JS applications for this tutorial. One will be used to send data to the queue, and the other will be receiving that data.
To start, make a new directory and set up create a package.json file inside:
mkdir rabbitmq_tutorial
npm init -y
Next, install the AMQP package:
npm install amqplib
AMQP is a protocol, like HTTP, that is used for messaging
Next, create two files, sender.js and receiver.js:
touch sender.js receiver.js
We’ll start by building the sender functionality, the flow is as follows:
- Import the package
- Connect to our RabbitMQ instance
- Create the channel for communication
- Create the queue we will send data to
- Send some data to the queue
- Close the connection
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(connectionError, connection) {
if(connectionError){
throw connectionError
}
connection.createChannel(function(channelError, channel) {
if(channelError) {
throw channelError
}
let queueName = 'tutorial'
let data = Math.random().toString(10)
console.log(`Data that will be sent: ${data}`)
channel.assertQueue(queueName)
channel.sendToQueue(queueName, Buffer.from(data))
})
setTimeout(function () {
connection.close()
process.exit(0)
}, 500)
})
We create a random number as our data, which will be printed after we run the command.
We also have a setTimeout that waits for half a second before closing the connection and exiting the process.
You may have noticed that we did not send the data right away, but we placed the data in a Buffer.from(data) function. This function converts the data to a byte array, this is to allow us to send any data type to the queue, such as a String, JSON Object, or even a File (not recommended).
You’ll also notice that we gave the queue a name, which is tutorial. We will use the name in the receiver functionality.
We can run the sender.js file and see the data value that will be sent:
$ node sender.js
Data that will be sent: 0.22235865247646225
Assuming you face no issues, the command will exit right away and not print anything.
You can head back to the dashboard at localhost:15672, click on the Queues and Streams tab. You should see a queue there with the name tutorial. Click on the name to view more details.
You should see a dropdown Get Messages. Open it, and click on Get Message(s). It should display the message there, including the Payload which is the data we sent.
We can start writing the receiver.js file. The flow will be as follows:
- Import the package
- Connect to our RabbitMQ instance
- Create the channel for communication
- Connect to the specific queue we created
- Consume the content that is coming to the queue
- Close the connection
let amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(connectionError, connection) {
if(connectionError){
throw connectionError
}
connection.createChannel(function(channelError, channel) {
if(channelError) {
throw channelError
}
let queueName = 'tutorial'
channel.assertQueue(queueName)
channel.consume(queueName, function(message) {
console.log(message.content.toString())
}, {
noAck: true
})
})
})
You can see an object as the second argument of the
channel.consume()function with the key-valuenoAck: true, we will explain this in the next section
If you run the receiver file, you should get the data that is sitting in the queue:
$ node receiver.js
0.22235865247646225
You can keep the receiver file running in one terminal and open another terminal to send more data, which will be received by the receiver because it is still consuming the content of the channel:
$ node sender.js
Data that will be sent: 0.22235865247646225
% node receiver.js
0.22235865247646225
0.0199977246627685
Acknowledgment in Queues #
When a message is sent to a queue, the only way to remove it is for the receiving service to “acknowledgment”. This is a mechanism to ensure that the message has been successfully processed by the receiving service.
For example, say you have a receiver that reads messages from queues and sends email based on the content. Once a message is received, the data is extracted and sent to the mail service. While processing one of the messages, there was failure, say the mail service went down. Instead of the message being deleted from the queue, the queue waits until it receives an acknowledgment.
The code would look more like this:
...
channel.consume(queueName, function(message) {
// Process message
// Send Mail and receive confirmation
channel.ack(message)
}, {
noAck: false
})
In the code above, we set the noAck to false to disable automatic acknowledgment. Inside the function, we process the received message. Once all the processing is complete, we acknowledgment the message by calling the channel.ack(message) function.
Conclusion #
RabbitMQ is a great open-source tool for asynchronous communication between multiple services and in an Event-Driven Architecture.
It allows for more flexibility for the sender since it does not wait for the service to complete the processing, and for the receiver since it can process the messages as they come.
It also keeps the messages in the queue until they are processed. If the receiving service is offline, the message will be in the queue until the service is up and ready to process the message.
Thank you for reading this article, hope you enjoyed it. See you in the next one!
References: