Introduction to RabbitMQ
Decoupling of software components is one of the most important parts of software design. One way of achieving this is using messaging systems, which provide an asynchronous way of communication between components (services). In this article, we will cover one such system: RabbitMQ.
RabbitMQ is a message broker that implements Advanced Message Queuing Protocol (AMQP). It provides client libraries for major programming languages.
Besides being used for decoupling software components, RabbitMQ can be used for:
- Performing background operations
- Performing asynchronous operation
Making RabbitMQ easier to understand
Think of RabbitMQ as backend AJAX calls. Think of an ajax call like:
$.post({
url: "/discover/bar",
dataType: "json",
data: {
some: "paylaod",
goes: "here",
more: ["stuff", "things", "etc"]
}
});
This is a perfect example of how we’ve been already using most of the concepts that RabbitMQ encapsulates.
Instead of dealing with HTTP requests that may be exposed to the internet, RabbitMQ is more often used for back-end services.
There are some key differences, of course. But this is less an analogy than it is a direct parallel—a different implementation of the same basic idea.
Some of these parallels and differences include the following:
AJAX |
RABBITMQ |
HTTP |
AMQP |
jQuery.ajax |
RMQ message “producer” (SDK / API) |
HTML form encoded data |
JSON documents |
Web Server |
RMQ Server / Message Broker |
API Endpoint / URL |
Exchange, Routing Key, Queue |
Route / request handler (e.g. MVC controller / action) |
RMQ message “consumer” (SDK / API) |
There’s more subtlety and stark contrast in this comparison then I am explaining in this simple table, but this should give you an idea of how to start thinking about RabbitMQ.
There’s also a lot of new terminology to learn with RabbitMQ (and distributed systems), as with any tech that is new to you. But most of these terms and specifics don’t matter right now.
The one thing that does matter though, is the message broker.
Terminologies
- Producing means nothing more than sending. A program that sends messages is a producer.
- A queue is the name for a post box which lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can only be stored inside a queue. A queue is only bound by the host's memory & disk limits, it's essentially a large message buffer. Many producers can send messages that go to one queue, and many consumers can try to receive data from one queue.
- Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages.
Setup
To begin, let’s run RabbitMQ using the official setup guide here.
We’ll naturally use the Java client for interacting with the RabbitMQ server; the Maven dependency for this client is:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
After running the RabbitMQ broker using the official guide, we need to connect to it using java client:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
We use the ConnectionFactory to setup the connection with the server—it takes care of the protocol (AMQP) and authentication as well. Here we connect to the server on localhost and we can modify the host-name by using the setHost function.
We can use setPort to set the port if the default port is not used by the RabbitMQ Server; the default port for RabbitMQ is 15672:
factory.setPort(15678);
We can set the username and password:
factory.setUsername("user1"Copy of Introduction to RabbitMQ);
factory.setPassword("MyPassword");
Further, we will use this connection for publishing and consuming messages.
Producer
Consider a simple scenario where a web application allows users to add new products to a website. Any time a new product is added, we need to send an email to customers.
First, let’s define a queue:
channel.queueDeclare("products_queue", false, false, false, null);
Each time when a user adds a new product, we will publish a message to a queue:
String message = "product details";
channel.basicPublish("", "products_queue", null, message.getBytes());
Lastly, we close the channel and the connection:
channel.close();
connection.close();
Consumer
Let’s see what we can implement on the consumer side; we’re going to declare the same queue:
channel.queueDeclare("products_queue", false, false, false, null);
Here’s how we define the consumer that will process messages from a queue asynchronously:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
// process the message
}
};
channel.basicConsume("products_queue", true, consumer);
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way we can scale easily.
Message acknowledgment
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies when it’s only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack (short for acknowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed, and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the autoAck=true flag. It's time to set this flag to false and send a proper acknowledgment from the worker, once we're done with a task.
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
Conclusion
This simple article covered basic concepts of RabbitMQ and discussed a simple example using it.
Recent Stories
Top DiscoverSDK Experts
Compare Products
Select up to three two products to compare by clicking on the compare icon () of each product.
{{compareToolModel.Error}}
{{CommentsModel.TotalCount}} Comments
Your Comment