Introduction:
In today’s distributed system landscape, efficient communication between microservices is crucial for maintaining scalability, reliability, and performance. In this technical blog, we’ll look at how MassTransit and RabbitMQ can be combined to streamline communication within a microservices architecture.
Introduction to RabbitMQ:
Overview of RabbitMQ:
RabbitMQ is a highly reliable message broker that acts as an intermediary for communication between distributed applications. It implements the Advanced Message Queuing Protocol (AMQP), a standardized messaging protocol, making it interoperable with a wide range of platforms and languages.
Message Queues, Exchanges, and Bindings:
- Message Queues: Queues store the messages sent by producers. Messages stay in a queue until they are delivered to a consumer.
- Exchanges: Exchanges receive messages from producers and route them to the appropriate queues based on routing rules.
- Bindings: Bindings define the relationship between exchanges and queues, determining how messages are routed.
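How the three pieces fit together can be sketched in C# with the RabbitMQ.Client package. This is a minimal sketch that assumes RabbitMQ.Client 6.x and a broker on localhost with the default guest account; the names orders-exchange, orders-queue, and order.created are hypothetical and chosen only for illustration.

```csharp
// Sketch only: assumes RabbitMQ.Client 6.x and a local broker (guest/guest).
using System;
using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory { Uri = new Uri("amqp://guest:guest@localhost:5672") };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Exchange: the entry point producers publish to (hypothetical name).
channel.ExchangeDeclare("orders-exchange", ExchangeType.Direct,
    durable: true, autoDelete: false, arguments: null);

// Queue: stores messages until a consumer takes them (hypothetical name).
channel.QueueDeclare("orders-queue",
    durable: true, exclusive: false, autoDelete: false, arguments: null);

// Binding: messages with routing key "order.created" go to orders-queue.
channel.QueueBind(queue: "orders-queue", exchange: "orders-exchange",
    routingKey: "order.created", arguments: null);

// Publish through the exchange; the binding decides where the message lands.
var body = Encoding.UTF8.GetBytes("order 42 created");
channel.BasicPublish("orders-exchange", "order.created", null, body);
```

With a direct exchange, a message is routed only to queues whose binding key equals the routing key exactly.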
Why Use RabbitMQ:
Reliable Messaging:
RabbitMQ supports reliable message delivery through consumer acknowledgments, publisher confirms, message persistence (durable queues combined with persistent messages), and redelivery of messages that were not acknowledged.
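On the publishing side, these guarantees are opt-in. The following is a minimal sketch, assuming RabbitMQ.Client 6.x and a local broker, of publishing a persistent message to a durable queue and waiting for the broker to confirm it. The 5-second timeout is an arbitrary value picked for the example.

```csharp
// Sketch only: assumes RabbitMQ.Client 6.x and a local broker (guest/guest).
using System;
using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory { Uri = new Uri("amqp://guest:guest@localhost:5672") };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// A durable queue survives a broker restart.
channel.QueueDeclare("demo-queue",
    durable: true, exclusive: false, autoDelete: false, arguments: null);

// Enable publisher confirms on this channel.
channel.ConfirmSelect();

// Mark the message as persistent so it is written to disk.
var props = channel.CreateBasicProperties();
props.Persistent = true;

var body = Encoding.UTF8.GetBytes("Hello!");
channel.BasicPublish("", "demo-queue", props, body);

// Blocks until the broker confirms the message; throws if the message
// is nacked or the timeout expires.
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
Console.WriteLine("Message confirmed by the broker");
```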
Message Routing and Filtering:
RabbitMQ supports flexible message routing and filtering based on message attributes and patterns. This allows messages to be selectively routed to specific queues or consumers based on predefined criteria.
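Pattern-based routing is what a topic exchange provides. The sketch below, which assumes RabbitMQ.Client 6.x and a local broker, binds a queue with the pattern order.* so that only matching routing keys reach it. The exchange name, queue name, and routing keys are hypothetical.

```csharp
// Sketch only: assumes RabbitMQ.Client 6.x and a local broker (guest/guest).
using System;
using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory { Uri = new Uri("amqp://guest:guest@localhost:5672") };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// A topic exchange matches routing keys against binding patterns.
channel.ExchangeDeclare("events", ExchangeType.Topic,
    durable: true, autoDelete: false, arguments: null);
channel.QueueDeclare("order-events",
    durable: true, exclusive: false, autoDelete: false, arguments: null);

// "*" matches exactly one word, so "order.*" matches "order.created".
channel.QueueBind(queue: "order-events", exchange: "events",
    routingKey: "order.*", arguments: null);

// Routed to order-events: the key matches the binding pattern.
channel.BasicPublish("events", "order.created", null,
    Encoding.UTF8.GetBytes("order created"));

// Not routed to order-events: no binding matches, so the broker drops it.
channel.BasicPublish("events", "payment.received", null,
    Encoding.UTF8.GetBytes("payment received"));
```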
Installing RabbitMQ through Docker:
To install RabbitMQ locally using Docker, you can follow these steps:
- Install Docker:
Make sure you have Docker installed on your machine. You can download and install it from the official Docker website.
- Pull RabbitMQ Image:
Open a terminal or command prompt and run the following command to pull the official RabbitMQ Docker image from Docker Hub. Use the management tag, because the plain rabbitmq image does not enable the management plugin that the Management Console depends on:
docker pull rabbitmq:management
- Run RabbitMQ Container:
Once the image is downloaded, you can run a RabbitMQ container using the following command:
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:management
- -d: Run the container in the background.
- --name my-rabbit: Assign a name to the container (you can choose any name).
- -p 5672:5672: Map the RabbitMQ default port for AMQP (5672).
- -p 15672:15672: Map the port for the RabbitMQ management plugin (15672).
- Access RabbitMQ Management Console:
You can access the RabbitMQ Management Console by opening your web browser and navigating to http://localhost:15672. The default credentials are:
- Username: guest
- Password: guest
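Besides the browser UI, the management plugin also serves an HTTP API on the same port, which gives a quick way to confirm that the container is up. The following is a small sketch that calls the /api/overview endpoint with the default credentials; it assumes the container was started with the management plugin enabled and the ports mapped as shown above.

```csharp
// Sketch only: assumes the management plugin is reachable on localhost:15672.
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;

using var http = new HttpClient { BaseAddress = new Uri("http://localhost:15672") };

// The management API uses HTTP basic authentication.
var token = Convert.ToBase64String(Encoding.UTF8.GetBytes("guest:guest"));
http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", token);

// /api/overview returns general information about the broker as JSON.
var response = await http.GetAsync("/api/overview");
Console.WriteLine($"Management API status: {(int)response.StatusCode}");
```

A 200 status means the broker and the plugin are running; a 401 points to wrong credentials.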
Practical Example of Message Passing using RabbitMQ
Develop two console applications: one serving as the producer and the other as the consumer. The producer application sends messages to RabbitMQ, and the consumer application retrieves them.
Note: Both projects need the RabbitMQ.Client NuGet package, plus Newtonsoft.Json for JSON serialization. The code uses the synchronous API (CreateConnection, CreateModel) of RabbitMQ.Client 6.x; version 7 replaced it with an async API.
The following producer code publishes a message:
RabbitMQproducer/Program.cs
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Text;

namespace RabitMQ.Producer
{
    static class Program
    {
        static void Main(string[] args)
        {
            // Connection settings for the local broker (default guest account).
            var factory = new ConnectionFactory()
            {
                Uri = new Uri("amqp://guest:guest@localhost:5672")
            };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            // Declare the queue so it exists before the message is published.
            channel.QueueDeclare("demo-queue",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Serialize the message to JSON and encode it as UTF-8 bytes.
            var message = new { Name = "Producer", Message = "Hello!", Date = DateTime.Now };
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

            // Publish to the default exchange (""), using the queue name as the routing key.
            channel.BasicPublish("", "demo-queue", null, body);
        }
    }
}
ConnectionFactory:
- An instance of ConnectionFactory is created to configure the RabbitMQ connection parameters. It specifies the URI of the RabbitMQ server (amqp://guest:guest@localhost:5672).
Connection and Channel Creation:
- Using the connection factory, a connection to the RabbitMQ server is established (CreateConnection()). The using statement ensures that the connection is automatically disposed of when it’s no longer needed.
- A channel is created on the connection (CreateModel()), which is used for most of the API calls.
Queue Declaration:
- The QueueDeclare method is used to declare the queue named “demo-queue”. It specifies options such as durability, exclusivity, and autodeletion.
Message Preparation:
- A message object is created using an anonymous type with properties for the name of the producer, a message, and the current date/time.
- The message object is serialized to JSON using SerializeObject, and the result is encoded as UTF-8 bytes.
Message Publishing:
- The BasicPublish method sends the serialized message body to “demo-queue”. The "" parameter specifies the default exchange, and the “demo-queue” parameter specifies the routing key (queue name). The null parameter indicates no message properties are set.
The following consumer code receives the message:
RabbitMQconsumer/Program.cs
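using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabitMQ.Consumer
{
    static class Program
    {
        static void Main(string[] args)
        {
            // Connection settings for the local broker (default guest account).
            var factory = new ConnectionFactory()
            {
                Uri = new Uri("amqp://guest:guest@localhost:5672")
            };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            // Declare the same queue as the producer, with the same settings.
            channel.QueueDeclare("demo-queue",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Raise the Received event for every delivered message.
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(message);
            };

            // Start consuming; true means messages are acknowledged automatically.
            channel.BasicConsume("demo-queue", true, consumer);

            // Keep the application alive until the user presses Enter.
            Console.ReadLine();
        }
    }
}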
ConnectionFactory, Connection, Channel, and Queue Declaration:
- These steps are the same as in the producer: a ConnectionFactory is configured with the server URI, a connection and a channel are created, and the “demo-queue” queue is declared with the same options. Declaring the queue on both sides means it exists no matter which application starts first.
EventingBasicConsumer:
- An instance of EventingBasicConsumer is created, which is a consumer that raises events upon message delivery.
- The Received event handler is attached to the consumer. When a message is received, the handler extracts the message body, decodes it as a UTF-8 string, and prints it to the console.
BasicConsume:
- The BasicConsume method starts consuming messages from the “demo-queue”. It specifies the queue name, whether messages should be acknowledged automatically (true), and the consumer instance to handle received messages.
Console.ReadLine:
- The ReadLine statement keeps the application running so that it continues to listen for messages until the user presses Enter.
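Automatic acknowledgment, as used in the BasicConsume call described above, removes a message from the queue as soon as it is delivered, even if the handler then fails. A minimal sketch of the alternative, manual acknowledgment, is shown here. It assumes RabbitMQ.Client 6.x and a local broker; the prefetch count of 1 is an arbitrary choice for the example.

```csharp
// Sketch only: assumes RabbitMQ.Client 6.x and a local broker (guest/guest).
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory { Uri = new Uri("amqp://guest:guest@localhost:5672") };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare("demo-queue",
    durable: true, exclusive: false, autoDelete: false, arguments: null);

// Deliver at most one unacknowledged message to this consumer at a time.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    try
    {
        var text = Encoding.UTF8.GetString(e.Body.ToArray());
        Console.WriteLine(text);

        // Tell the broker the message was processed and can be removed.
        channel.BasicAck(e.DeliveryTag, multiple: false);
    }
    catch (Exception)
    {
        // Put the message back on the queue for another attempt.
        // Note: a message that always fails would be redelivered repeatedly.
        channel.BasicNack(e.DeliveryTag, multiple: false, requeue: true);
    }
};

// autoAck: false switches the consumer to manual acknowledgment.
channel.BasicConsume("demo-queue", autoAck: false, consumer: consumer);
Console.ReadLine();
```

If the consumer disconnects before acknowledging, the broker makes the message available for delivery again.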
Now run both console applications together. The producer publishes the Hello! message, and the consumer receives it from the RabbitMQ queue and prints it.
The spike in the demo-queue graph in the RabbitMQ Management Console shows that the message reached the queue.
The consumer application received the message and printed it.
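Why Use MassTransit:
MassTransit is an open-source distributed application framework for .NET that works on top of message brokers such as RabbitMQ. Its main benefits are:
- Message-based Communication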
- Fault Tolerance
- Scalability
- Integration Capabilities
Practical Example of Message Passing using MassTransit
Getting Started
Create two Web API projects, one as the Order Service and the other as the Inventory Service:
- Order Service sends messages through MassTransit, using RabbitMQ as the message broker.
- Inventory Service consumes the Order data from the queue through MassTransit.
- A shared Order model is used as the data model by both services.
Installation
Install the following NuGet packages in your .NET project:
- MassTransit
- MassTransit.AspNetCore
- MassTransit.RabbitMQ
- MassTransit.Extensions.DependencyInjection
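1- Configure MassTransit with RabbitMQ in Startup.cs of OrderService.
OrderService/Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    // Register MassTransit and point it at the local RabbitMQ broker.
    services.AddMassTransit(config => {
        config.UsingRabbitMq((ctx, cfg) => {
            cfg.Host("amqp://guest:guest@localhost:5672");
        });
    });

    // Starts and stops the bus with the host (MassTransit 7; version 8 does this automatically).
    services.AddMassTransitHostedService();
}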
2- Create a new controller, OrderController.cs, in the Controllers folder.
OrderController.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using Model;

// For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860
namespace OrderService.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrderController : ControllerBase
    {
        // Injected by MassTransit; publishes messages to the broker.
        private readonly IPublishEndpoint publishEndpoint;

        public OrderController(IPublishEndpoint publishEndpoint)
        {
            this.publishEndpoint = publishEndpoint;
        }

        // GET: api/<OrderController>
        [HttpGet]
        public IEnumerable<string> Get()
        {
            return new string[] { "value1", "value2" };
        }

        // GET api/<OrderController>/5
        [HttpGet("{id}")]
        public string Get(int id)
        {
            return "value";
        }

        // POST api/<OrderController>
        // Publishes the received order so that subscribed services can consume it.
        [HttpPost]
        public async Task<IActionResult> Post([FromBody] Order order)
        {
            await publishEndpoint.Publish(order);
            return Ok();
        }

        // PUT api/<OrderController>/5
        [HttpPut("{id}")]
        public void Put(int id, [FromBody] string value)
        {
        }

        // DELETE api/<OrderController>/5
        [HttpDelete("{id}")]
        public void Delete(int id)
        {
        }
    }
}
3- Configure the RabbitMQ consumer in Startup.cs of InventoryService.
InventoryService/Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    services.AddMassTransit(config => {
        // Register the consumer type with the container.
        config.AddConsumer<OrderConsumer>();
        config.UsingRabbitMq((ctx, cfg) => {
            cfg.Host("amqp://guest:guest@localhost:5672");

            // Messages arriving on order-queue are handled by OrderConsumer.
            cfg.ReceiveEndpoint("order-queue", c => {
                c.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
    services.AddMassTransitHostedService();
}
4- Create the OrderConsumer class in InventoryService.
InventoryService/OrderConsumer.cs
using MassTransit;
using Microsoft.Extensions.Logging;
using Model;
using System;
using System.Threading.Tasks;

namespace InventoryService
{
    // Handles Order messages delivered to the order-queue receive endpoint.
    internal class OrderConsumer : IConsumer<Order>
    {
        private readonly ILogger<OrderConsumer> logger;

        public OrderConsumer(ILogger<OrderConsumer> logger)
        {
            this.logger = logger;
        }

        public async Task Consume(ConsumeContext<Order> context)
        {
            await Console.Out.WriteLineAsync(context.Message.Name);
            logger.LogInformation($"Got new message {context.Message.Name}");
        }
    }
}
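Fault tolerance, one of the benefits listed earlier, is mostly a matter of configuration in MassTransit. The sketch below adds a retry policy to the receive endpoint so that a failing Consume call is attempted again before the message is given up on. It assumes MassTransit 7 (where the Interval extension comes from the GreenPipes namespace) and the OrderConsumer class from this article; the retry count and interval are arbitrary example values.

```csharp
// Sketch only: assumes MassTransit 7 and the OrderConsumer class shown above.
using System;
using GreenPipes; // Needed in MassTransit 7; in MassTransit 8 this using can be dropped.
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace InventoryService
{
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddMassTransit(config =>
            {
                config.AddConsumer<OrderConsumer>();
                config.UsingRabbitMq((ctx, cfg) =>
                {
                    cfg.Host("amqp://guest:guest@localhost:5672");
                    cfg.ReceiveEndpoint("order-queue", c =>
                    {
                        // Retry a failed Consume call up to 3 times, 5 seconds apart.
                        c.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
                        c.ConfigureConsumer<OrderConsumer>(ctx);
                    });
                });
            });
            services.AddMassTransitHostedService();
        }
    }
}
```

The retry middleware is added before ConfigureConsumer so that it wraps the consumer. Once the retries are used up, MassTransit by default moves the message to an error queue named after the endpoint (order-queue_error here).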
5- Add the shared Order model project as a reference in both Web API projects.
Model/Order.cs
using System;
namespace Model
{
public class Order
{
public string Name { get; set; }
}
}
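6- Run both services and send a POST request with an Order to the api/Order endpoint of OrderService. The consumer in InventoryService consumes the message from order-queue and prints it.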
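To trigger the flow, an Order has to be posted to the POST action of OrderController. One way to do that is a small console client like the sketch below. The base address http://localhost:5000 is an assumption; use whatever address OrderService listens on in your launch settings. The order name is also just sample data.

```csharp
// Sketch only: the OrderService address below is hypothetical.
using System;
using System.Net.Http;
using System.Text;

using var http = new HttpClient();

// JSON body matching the shared Order model (a single Name property).
var json = "{\"name\":\"Keyboard\"}";
using var content = new StringContent(json, Encoding.UTF8, "application/json");

// The controller route is api/[controller], which resolves to api/order.
var response = await http.PostAsync("http://localhost:5000/api/order", content);
Console.WriteLine($"OrderService responded with {(int)response.StatusCode}");
```

If the request succeeds, InventoryService should print the order name shortly afterwards.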
Thank You!