Build Microservices using MassTransit and RabbitMQ with .NET
Using MassTransit, RabbitMQ and .NET to create distributed communication
Intro
In this short post we’re going to take a look at MassTransit, an open-source solution for building distributed systems with .NET. In a nutshell, it’s a service bus that handles messaging between different parts of a single application or between completely separate systems. We’re also going to be using RabbitMQ, a popular free and open-source message broker.
You can find the source code for this post on GitHub; the link is at the end of the post.
Architecture Overview
We’re going to build a very basic microservices system for an e-commerce application. Here is how the pieces fit together.
User Interaction:
Products Page: This is where the user interaction begins. Users access the products page to browse through the available items. This page communicates with the Product Catalog Microservice to fetch product details.
Order: After selecting a product and a quantity, the user sends the order to the Order Microservice. The service stores the order with a status of "Pending" and then publishes an "order-created" event to the message broker.
Inventory Microservice: This service subscribes to the "order-created" queue. Upon receiving a message containing the details of the order (product id and quantity), it checks product availability against the current inventory. Once done, it sends a new message to the "order-processed" queue containing the status of the order, which would be either "To be delivered" or "Not enough inventory" depending on the product availability.
Order Status: Once the order has been processed, the final step involves the Order Microservice reacting to the "order-processed" message by updating the order's status, which the user then sees on the Orders page.
This architecture ensures that each microservice operates independently but communicates effectively through defined messages and queues, which are managed by RabbitMQ. This decouples the services by abstracting direct dependencies among them, allowing for easier scaling and maintenance.
RabbitMQ does more than keep the services decoupled. Because messages wait in a queue until a consumer is ready for them, the system can absorb bursts of orders, several consumer instances can share the load, and messages that fail can be retried.
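To make the flow concrete before bringing in a broker, here is a minimal in-process sketch of the same two-step exchange. The TinyBus class is a hypothetical stand-in for RabbitMQ and MassTransit (it is not part of either library), the stock numbers are made up, and the two message records only mirror the ones we define later in the post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the broker.
var bus = new TinyBus();
var stock = new Dictionary<int, int> { [1] = 5 };   // productId -> units in stock (made-up data)
var statuses = new Dictionary<string, string>();     // orderId -> latest status

// "Inventory" side: reacts to new orders and reports the outcome.
bus.Subscribe<OrderReceived>(msg =>
{
    bool available = stock.TryGetValue(msg.ProductId, out var units) && units >= msg.Quantity;
    bus.Publish(new OrderProcessed(msg.OrderId, available ? "To be delivered" : "Not enough inventory"));
});

// "Order" side: reacts to processed orders and updates the stored status.
bus.Subscribe<OrderProcessed>(msg => statuses[msg.OrderId] = msg.Status);

// Placing an order: record it as pending, announce it, then return the current status.
string PlaceOrder(string orderId, int productId, int quantity)
{
    statuses[orderId] = "Pending";
    bus.Publish(new OrderReceived(orderId, productId, quantity));
    return statuses[orderId];
}

Console.WriteLine(PlaceOrder("A", 1, 2));   // To be delivered
Console.WriteLine(PlaceOrder("B", 1, 10));  // Not enough inventory

record OrderReceived(string OrderId, int ProductId, int Quantity);
record OrderProcessed(string OrderId, string Status);

// One list of handlers per message type; Publish calls them synchronously.
class TinyBus
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers = new();

    public void Subscribe<T>(Action<T> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
            _handlers[typeof(T)] = list = new List<Action<object>>();
        list.Add(m => handler((T)m));
    }

    public void Publish<T>(T message) where T : notnull
    {
        if (_handlers.TryGetValue(typeof(T), out var list))
            foreach (var handle in list) handle(message);
    }
}
```

The sketch delivers messages synchronously, so the status is final by the time PlaceOrder returns. With a real broker the reply arrives later, which is why a new order starts out as "Pending".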
Let’s Build
To follow along you need to install RabbitMQ. The "Installing RabbitMQ" page in the official RabbitMQ documentation explains how.
Create a new project folder and solution
mkdir MassTransitExample
cd MassTransitExample
dotnet new sln -n MassTransitExample
Create SharedContracts class library
We’re going to use this library to store our entities, messages and in-memory databases for orders and products, since a real database would be overkill for our case.
dotnet new classlib -n SharedContracts
You can now remove the Class1.cs file that was created as we’re not going to use it. Let’s now create all the classes in it.
Product.cs
Our product record.
public record Product(int ProductId, string Name, decimal Price, int Quantity);
ProductsDatabase.cs
Our in-memory database with products.
namespace SharedContracts;
public class ProductsDatabase
{
public static List<Product> Products = new List<Product>
{
new Product(1, "MacBook Pro", 2999.99M, 5),
new Product(2, "Harry Potter and The Order of The Phoenix", 19.99M, 3),
new Product(3, "Leather Jacket", 699.99M, 1),
};
}
Order.cs
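The model for creating new orders. It receives a product id and a quantity, and the status of every new order is set to "Pending" by default. OrderId uses an init accessor rather than a get-only property so that the ID survives JSON serialization between the UI and the API.
public record Order(int ProductId, int Quantity)
{
    // init (not get-only) so System.Text.Json can restore the ID when deserializing
    public string OrderId { get; init; } = Guid.NewGuid().ToString();
    public string Status { get; set; } = "Pending";
}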
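Orders cross the wire as JSON (the UI posts them to Order.API and reads them back), so it helps to see how System.Text.Json treats a property declared in a record body. The sketch below uses two hypothetical records, GetOnlyOrder and InitOrder, which are not part of the project. A get-only property is serialized but ignored on deserialization, so the receiving side ends up with a freshly generated ID, whereas an init accessor lets the ID survive the round trip.

```csharp
using System;
using System.Text.Json;

// Serialize each record, read it back, and compare the IDs.
var first = new GetOnlyOrder(1, 2);
var firstCopy = JsonSerializer.Deserialize<GetOnlyOrder>(JsonSerializer.Serialize(first))!;
Console.WriteLine(first.OrderId == firstCopy.OrderId);   // False: the copy generated a new Guid

var second = new InitOrder(1, 2);
var secondCopy = JsonSerializer.Deserialize<InitOrder>(JsonSerializer.Serialize(second))!;
Console.WriteLine(second.OrderId == secondCopy.OrderId); // True: the ID round-trips

// Hypothetical record with a get-only ID (no setter for the serializer to call).
record GetOnlyOrder(int ProductId, int Quantity)
{
    public string OrderId { get; } = Guid.NewGuid().ToString();
}

// Same shape, but the init accessor lets System.Text.Json assign the ID.
record InitOrder(int ProductId, int Quantity)
{
    public string OrderId { get; init; } = Guid.NewGuid().ToString();
}
```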
OrdersDatabase.cs
Our in-memory database with orders.
namespace SharedContracts;
public class OrdersDatabase
{
public static List<Order> Orders = new();
}
OrderReceived.cs
The message we’ll use for received orders.
namespace Messaging;
public record OrderReceived(string OrderId, int ProductId, int Quantity);
OrderProcessed.cs
The message we’ll use for processed orders.
namespace Messaging;
public record OrderProcessed(string OrderId, string Status);
We’re done with SharedContracts. Let’s now create the rest of our system.
Create ProductCatalog.API
This will be our microservice that’s responsible for showing products to the user.
Go back to the root of the project.
Create a new API project named ProductCatalog.API
dotnet new webapi -n ProductCatalog.API -minimal
Add reference to our SharedContracts library.
dotnet add ProductCatalog.API/ProductCatalog.API.csproj reference SharedContracts/SharedContracts.csproj
Replace Program.cs with the following:
using SharedContracts;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddCors(options =>
{
options.AddDefaultPolicy(
policy =>
{
policy.AllowAnyOrigin()
.AllowAnyMethod()
.AllowAnyHeader();
});
});
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseHttpsRedirection();
app.UseCors();
// In memory database of products
app.MapGet("/products", () =>
{
return ProductsDatabase.Products;
});
app.Run();
Here we add a GET endpoint to get all of the products from the database. Also, we’ve added CORS, otherwise we’d get errors down the road when we add our UI project.
Now go back to the root folder.
Create Order.API
This will be our microservice that’s responsible for handling new orders.
1. Create a new API project named Order.API
dotnet new webapi -n Order.API -minimal
2. Add reference to our SharedContracts library.
dotnet add Order.API/Order.API.csproj reference SharedContracts/SharedContracts.csproj
3. Install MassTransit & MassTransit.RabbitMQ
We need both packages so we can utilize MassTransit and RabbitMQ.
dotnet add Order.API/Order.API.csproj package MassTransit
dotnet add Order.API/Order.API.csproj package MassTransit.RabbitMQ
4. Create Consumers folder inside the Order.API project
We’ll use this folder for our OrderProcessedConsumer.cs. This consumer receives the processed-order message from Inventory.API (which we’ll create soon) and updates the status of the order in the Orders database. If there is enough inventory the status will be “To be delivered”; if not, it will be “Not enough inventory”. The user will be able to see this on their Orders page later on.
5. Create OrderProcessedConsumer.cs inside Consumers folder and add the following code:
using MassTransit;
using Messaging;
using SharedContracts;
namespace OrderAPI.Consumers;
public class OrderProcessedConsumer : IConsumer<OrderProcessed>
{
public Task Consume(ConsumeContext<OrderProcessed> context)
{
var message = context.Message;
var order = OrdersDatabase.Orders.SingleOrDefault(o => o.OrderId == message.OrderId);
if (order != null)
{
order.Status = message.Status;
}
return Task.CompletedTask;
}
}
Here we receive the OrderProcessed message, look up the matching order by its ID, and copy the status onto it.
6. Program.cs
The last step is to update our Program.cs to use MassTransit with RabbitMQ and to register the OrderProcessedConsumer. We also add a POST endpoint for creating new orders, a GET endpoint for listing them, and CORS, just like before. Here’s the code:
using MassTransit;
using Messaging;
using OrderAPI.Consumers;
using SharedContracts;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddMassTransit(config =>
{
config.AddConsumer<OrderProcessedConsumer>();
config.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h => {
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("order-processed-queue", e =>
{
e.ConfigureConsumer<OrderProcessedConsumer>(context);
});
});
});
builder.Services.AddCors(options =>
{
options.AddDefaultPolicy(
policy =>
{
policy.AllowAnyOrigin()
.AllowAnyMethod()
.AllowAnyHeader();
});
});
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseHttpsRedirection();
app.UseCors();
// Create new order
app.MapPost("/orders", async (Order order, IBus bus) =>
{
// Add to in-memory storage
OrdersDatabase.Orders.Add(order);
Console.WriteLine("Order added to db.");
// Publish to message broker
// NOTE: In a real app we might have to use outbox pattern
await bus.Publish(new OrderReceived(order.OrderId, order.ProductId, order.Quantity));
Console.WriteLine("Order published to message broker.");
});
// Get all orders
app.MapGet("/orders", () =>
{
return OrdersDatabase.Orders;
});
app.Run();
Hopefully the code is easy to follow: the POST endpoint stores the order and publishes an OrderReceived message, and the OrderProcessedConsumer listens on the "order-processed-queue" queue for the results.
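The comment in the POST endpoint mentions the outbox pattern. The risk it addresses is that the order gets saved but the publish fails, so Inventory never hears about it. Below is a minimal in-memory sketch of the idea, not the project's code and not MassTransit's own outbox support: the lists and the SaveOrder and DispatchOutbox helpers are hypothetical names, and a real implementation would write the order and the outbox row in one database transaction.

```csharp
using System;
using System.Collections.Generic;

var orders = new List<string>();     // stands in for the orders table
var outbox = new Queue<string>();    // stands in for an outbox table in the same database
var delivered = new List<string>();  // what actually reached the "broker"

// Step 1: save the order and its pending message together.
void SaveOrder(string orderId)
{
    orders.Add(orderId);
    outbox.Enqueue($"OrderReceived:{orderId}");
}

// Step 2: a dispatcher drains the outbox; a message is removed only after publishing succeeds.
int DispatchOutbox(Action<string> publish)
{
    int sent = 0;
    while (outbox.Count > 0)
    {
        try
        {
            publish(outbox.Peek());
        }
        catch (Exception)
        {
            break; // broker unavailable: keep the message and retry on the next run
        }
        outbox.Dequeue();
        sent++;
    }
    return sent;
}

SaveOrder("A");
Console.WriteLine(DispatchOutbox(_ => throw new InvalidOperationException("broker down"))); // 0
Console.WriteLine(DispatchOutbox(delivered.Add)); // 1
Console.WriteLine(delivered[0]);                  // OrderReceived:A
```

The first dispatch fails and leaves the message in place; the second one delivers it. A message can therefore be delayed, but it is not lost just because the broker was briefly unavailable.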
We’re done with Order.API. Let’s now move to Inventory.API, the microservice responsible for checking whether we have enough products in stock to fulfil an order.
Create Inventory.API
This will be our microservice that’s responsible for handling product availability.
1. Create a new API project named Inventory.API
dotnet new webapi -n Inventory.API -minimal
2. Add reference to our SharedContracts library
dotnet add Inventory.API/Inventory.API.csproj reference SharedContracts/SharedContracts.csproj
3. Install MassTransit & MassTransit.RabbitMQ
dotnet add Inventory.API/Inventory.API.csproj package MassTransit
dotnet add Inventory.API/Inventory.API.csproj package MassTransit.RabbitMQ
4. Create Consumers folder
Just like with Order.API, we’ll create a Consumers folder and put our OrderReceivedConsumer.cs file in it to handle the received orders. Inventory’s job is to check the stock for each received order. If there’s enough of the requested product, it publishes an OrderProcessed message with a status of “To be delivered”, which Order.API picks up (see above); otherwise it publishes the same message with a status of “Not enough inventory”.
5. OrderReceivedConsumer.cs
using MassTransit;
using Messaging;
using SharedContracts;
namespace Inventory.API.Consumers;
public class OrderReceivedConsumer : IConsumer<OrderReceived>
{
    public async Task Consume(ConsumeContext<OrderReceived> context)
    {
        var message = context.Message;
        Console.WriteLine($"Received order ID: {message.OrderId}. Checking availability...");
        // Simulate checking availability (the stock is only read here, never deducted)
        bool available = ProductsDatabase.Products.Any(p => p.ProductId == message.ProductId && p.Quantity >= message.Quantity);
        if (available)
        {
            Console.WriteLine("Product is available. Processing order...");
            // Publish through the ConsumeContext so MassTransit can relate the reply to the consumed message
            await context.Publish(new OrderProcessed(message.OrderId, "To be delivered"));
        }
        else
        {
            Console.WriteLine("Product is not available. Cancelling order...");
            await context.Publish(new OrderProcessed(message.OrderId, "Not enough inventory"));
        }
    }
}
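The availability rule in the consumer above is small enough to try out on its own. Here is a sketch that applies the same condition to the stock levels from ProductsDatabase; StatusFor is a hypothetical helper introduced only for this illustration.

```csharp
using System;
using System.Linq;

// Same stock levels as ProductsDatabase (product ID, quantity in stock).
var products = new[]
{
    (ProductId: 1, Quantity: 5),
    (ProductId: 2, Quantity: 3),
    (ProductId: 3, Quantity: 1),
};

// Hypothetical helper: maps an order to the status the consumer would publish.
string StatusFor(int productId, int quantity) =>
    products.Any(p => p.ProductId == productId && p.Quantity >= quantity)
        ? "To be delivered"
        : "Not enough inventory";

Console.WriteLine(StatusFor(1, 2));   // To be delivered
Console.WriteLine(StatusFor(1, 10));  // Not enough inventory
Console.WriteLine(StatusFor(99, 1));  // Not enough inventory (unknown product)
```

Note that the check only reads the stock level. Nothing is deducted, so repeated orders for the same product keep succeeding, which is fine for a demo but not for a real inventory service.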
6. Program.cs
The code is pretty straightforward and similar to what we have in Order.API. The service receives new messages on the "order-created-queue" queue, where they are handled by the OrderReceivedConsumer above.
using MassTransit;
using Inventory.API.Consumers;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderReceivedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("order-created-queue", e =>
{
e.ConfigureConsumer<OrderReceivedConsumer>(context);
});
});
});
builder.Services.AddCors(options =>
{
options.AddDefaultPolicy(
policy =>
{
policy.AllowAnyOrigin()
.AllowAnyMethod()
.AllowAnyHeader();
});
});
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseHttpsRedirection();
app.UseCors();
app.Run();
Now, what’s left is to create our UI. For that we’ll use a simple Blazor WebAssembly project.
Create Blazor.UI
1. Go back to the solution root folder and create new Blazor WebAssembly project
dotnet new blazorwasm -o Blazor.UI
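2. Make sure you have the Microsoft.Extensions.Http package and add a reference to our SharedContracts library, since the pages use its Product and Order types
dotnet add Blazor.UI/Blazor.UI.csproj package Microsoft.Extensions.Http
dotnet add Blazor.UI/Blazor.UI.csproj reference SharedContracts/SharedContracts.csproj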
3. Pages. We’ll have two pages - one for listing all products and another one for listing all of our orders.
Home.razor
@page "/"
@using Messaging
@inject IHttpClientFactory ClientFactory
<PageTitle>Products</PageTitle>
<h1>All products</h1>
@if (products == null)
{
<p>Loading products...</p>
}
else if (products.Length == 0)
{
<p>No products available.</p>
}
else
{
<div style="display: flex; flex-wrap: wrap; gap: 20px;">
@foreach (var product in products)
{
<div style="border: 1px solid gray; padding: 10px; width: 300px;">
<h3>@product.Name</h3>
<p>Price: @product.Price.ToString("C")</p>
<select @bind="productQuantities[product.ProductId]">
@for (int qty = 1; qty <= 10; qty++)
{
<option value="@qty">@qty</option>
}
</select>
<button @onclick="@(async () => await Purchase(product.ProductId))">Purchase</button>
</div>
}
</div>
}
<NavLink href="/orders">Orders</NavLink>
@code {
private Product[]? products;
private HttpClient? productsApi;
private HttpClient? ordersApi;
private Dictionary<int, int> productQuantities = new(); // Stores quantities for each product
protected override async Task OnInitializedAsync()
{
productsApi = ClientFactory.CreateClient("ProductsAPI");
ordersApi = ClientFactory.CreateClient("OrdersAPI");
products = await productsApi.GetFromJsonAsync<Product[]>("products") ?? Array.Empty<Product>();
InitializeQuantities();
}
private void InitializeQuantities()
{
productQuantities = new Dictionary<int, int>();
foreach (var product in products)
{
productQuantities[product.ProductId] = 1; // Initialize each product's quantity to 1
}
}
private async Task Purchase(int productId)
{
int qty = productQuantities[productId]; // Fetch the selected quantity for this product
await ordersApi.PostAsJsonAsync("orders", new Order(productId, qty));
}
}
Orders.razor
@inject IHttpClientFactory ClientFactory
@page "/orders"
<h3>Orders</h3>
@if (orders == null)
{
<p>Loading orders...</p>
}
else if (orders.Length == 0)
{
<p>No orders available.</p>
}
else
{
<table>
<thead>
<tr>
<th>Order ID</th>
<th>Product ID</th>
<th>Quantity</th>
<th>Status</th>
</tr>
</thead>
<tbody>
@foreach (var order in orders)
{
<tr>
<td>@order.OrderId</td>
<td>@order.ProductId</td>
<td>@order.Quantity</td>
<td>@order.Status</td>
</tr>
}
</tbody>
</table>
}
@code {
private HttpClient? ordersApi;
private Order[]? orders;
protected override async Task OnInitializedAsync()
{
ordersApi = ClientFactory.CreateClient("OrdersAPI");
orders = await ordersApi.GetFromJsonAsync<Order[]>("orders") ?? Array.Empty<Order>();
}
}
Both pages inject IHttpClientFactory and use it to create a named HTTP client for each service they call (you’ll see how the clients are registered below).
Program.cs
using Blazor.UI;
using Microsoft.AspNetCore.Components.Web;
using Microsoft.AspNetCore.Components.WebAssembly.Hosting;
var builder = WebAssemblyHostBuilder.CreateDefault(args);
builder.RootComponents.Add<App>("#app");
builder.RootComponents.Add<HeadOutlet>("head::after");
builder.Services.AddHttpClient("ProductsAPI", client =>
client.BaseAddress = new Uri("https://localhost:7272/"));
builder.Services.AddHttpClient("OrdersAPI", client =>
client.BaseAddress = new Uri("https://localhost:7030/"));
await builder.Build().RunAsync();
Here we add two HTTP clients, one for our products API and another one for our orders API. Make sure to change the ports, as your projects will probably be using different ones (you can find them in each API’s Properties/launchSettings.json).
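The pages pass relative paths such as "products" and "orders" to these clients, and each path is resolved against the client's BaseAddress. The sketch below shows that resolution with System.Uri. Resolve is a hypothetical helper, and the "/api" addresses are made up to show why the trailing slash on a base address matters.

```csharp
using System;

// Hypothetical helper: resolves a relative request path against a base address.
Uri Resolve(string baseAddress, string relative) => new Uri(new Uri(baseAddress), relative);

Console.WriteLine(Resolve("https://localhost:7272/", "products"));      // https://localhost:7272/products
Console.WriteLine(Resolve("https://localhost:7272/api/", "products"));  // https://localhost:7272/api/products
Console.WriteLine(Resolve("https://localhost:7272/api", "products"));   // https://localhost:7272/products
Console.WriteLine(Resolve("https://localhost:7272/api/", "/products")); // https://localhost:7272/products
```

With the base addresses used in this post (root URLs ending in a slash) the relative paths resolve as expected. The last two lines show how a missing trailing slash or a leading slash in the path would silently drop a path segment.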
That’s it! Let’s now see what this project does.
First, make sure RabbitMQ is running and start all four projects (ProductCatalog.API, Order.API, Inventory.API and Blazor.UI).
Here’s our amazing UI. It lists all products, with a link to the Orders page at the bottom.
If you go to Orders page now you will see that we don’t have any orders yet.
Let’s now try to purchase 2 MacBook Pros.
Select a quantity of 2 and click Purchase.
In the Order.API console we get "Order added to db." followed by "Order published to message broker."
So now the Order microservice pushed the order to the Inventory microservice so it can check if it has enough MacBooks in stock.
The Inventory.API console logs the received order ID, followed by "Product is available. Processing order..."
So, as we see, it turns out that we have enough MacBooks to fulfill this order. Once the Inventory microservice is done it sends a message back to the Order microservice to notify it.
Now let’s go to the Orders page and see the result: the order is listed with a status of “To be delivered”.
Let’s now try to purchase 10 MacBook Pros (we only have 5 in stock).
Again, the Order microservice publishes an event announcing the new order, the Inventory microservice checks whether the requested quantity is in stock, and now…
The product is not available, so we cancel the order.
Now, if you go to the Orders page you’ll see a status of “Not enough inventory” against it.
Hope this was useful to someone :)
Links
Source code: https://github.com/kaldren/MassTransitExample