Project Structure
Commands : Define and handle commands.
Events : Define and handle events.
Queries : Define and handle queries.
Repositories : Access data for the read and write databases.
Synchronization : Synchronize between the read and write databases using RabbitMQ.
Example Code
1. Setting Up RabbitMQ
Docker Compose for RabbitMQ
Copy version: '3'
services:
rabbitmq:
image: "rabbitmq:3-management"
ports:
- "5672:5672"
- "15672:15672"
2. Commands
Commands/CreateOrderCommand.cs
Copy public class CreateOrderCommand : IRequest<int>
{
public string ProductName { get; set; }
public decimal Price { get; set; }
}
Handlers/CreateOrderCommandHandler.cs
Copy public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, int>
{
private readonly IOrderWriteRepository _orderWriteRepository;
private readonly IEventPublisher _eventPublisher;
public CreateOrderCommandHandler(IOrderWriteRepository orderWriteRepository, IEventPublisher eventPublisher)
{
_orderWriteRepository = orderWriteRepository;
_eventPublisher = eventPublisher;
}
public async Task<int> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
{
var order = new Order { ProductName = request.ProductName, Price = request.Price };
_orderWriteRepository.Add(order);
await _orderWriteRepository.SaveChangesAsync();
var orderCreatedEvent = new OrderCreatedEvent(order.Id, order.ProductName, order.Price);
await _eventPublisher.Publish(orderCreatedEvent);
return order.Id;
}
}
3. Events
Events/OrderCreatedEvent.cs
Copy public class OrderCreatedEvent
{
public int OrderId { get; }
public string ProductName { get; }
public decimal Price { get; }
public OrderCreatedEvent(int orderId, string productName, decimal price)
{
OrderId = orderId;
ProductName = productName;
Price = price;
}
}
4. Event Publisher Interface
Services/IEventPublisher.cs
Copy public interface IEventPublisher
{
Task Publish<T>(T @event) where T : class;
}
5. RabbitMQ Event Publisher
Services/RabbitMQEventPublisher.cs
Copy public class RabbitMQEventPublisher : IEventPublisher
{
private readonly IConnection _connection;
private readonly IModel _channel;
public RabbitMQEventPublisher()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "order_exchange", type: ExchangeType.Fanout);
}
public Task Publish<T>(T @event) where T : class
{
var message = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: "order_exchange", routingKey: "", basicProperties: null, body: body);
return Task.CompletedTask;
}
}
6. Event Consumer
Services/OrderCreatedEventConsumer.cs
Copy public class OrderCreatedEventConsumer : BackgroundService
{
private readonly IOrderReadRepository _orderReadRepository;
private readonly IConnection _connection;
private readonly IModel _channel;
public OrderCreatedEventConsumer(IOrderReadRepository orderReadRepository)
{
_orderReadRepository = orderReadRepository;
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "order_exchange", type: ExchangeType.Fanout);
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName, exchange: "order_exchange", routingKey: "");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var orderCreatedEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(message);
var order = new OrderReadModel
{
Id = orderCreatedEvent.OrderId,
ProductName = orderCreatedEvent.ProductName,
Price = orderCreatedEvent.Price
};
_orderReadRepository.Add(order);
await _orderReadRepository.SaveChangesAsync();
};
_channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.CompletedTask;
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}
7. Startup Configuration
Startup.cs
Copy public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
// Register MediatR
services.AddMediatR(typeof(Startup));
// Register handlers
services.AddTransient<IRequestHandler<CreateOrderCommand, int>, CreateOrderCommandHandler>();
// Register repositories
services.AddScoped<IOrderWriteRepository, OrderWriteRepository>();
services.AddScoped<IOrderReadRepository, OrderReadRepository>();
// Register RabbitMQ event publisher and consumer
services.AddSingleton<IEventPublisher, RabbitMQEventPublisher>();
services.AddHostedService<OrderCreatedEventConsumer>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
8. Repositories (Assuming)
Repositories/IOrderWriteRepository.cs
Copy public interface IOrderWriteRepository
{
void Add(Order order);
Task SaveChangesAsync();
}
Repositories/IOrderReadRepository.cs
Copy public interface IOrderReadRepository
{
void Add(OrderReadModel order);
Task SaveChangesAsync();
Task<OrderReadModel> GetByIdAsync(int id);
}
9. Controllers
Controllers/OrderController.cs
Copy [ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
private readonly IMediator _mediator;
public OrderController(IMediator mediator)
{
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderCommand command)
{
var orderId = await _mediator.Send(command);
return Ok(new { OrderId = orderId });
}
[HttpGet("{id}")]
public async Task<IActionResult> GetOrderById(int id)
{
var order = await _mediator.Send(new GetOrderByIdQuery(id));
if (order == null)
{
return NotFound();
}
return Ok(order);
}
}
Conclusion
By using RabbitMQ for event handling, the CQRS system can efficiently synchronize the read and write databases. RabbitMQ acts as a reliable intermediary that ensures events are disseminated and state is synchronized across system components. MediatR helps manage commands and events, creating a flexible and scalable architecture.
Last updated 8 months ago