CQRS + MediatR + EDA + RabbitMQ

Project Structure

  1. Commands: Define and handle commands.

  2. Events: Define and handle events.

  3. Queries: Define and handle queries.

  4. Repositories: Access data for the read and write databases.

  5. Synchronization: Synchronize between the read and write databases using RabbitMQ.

Example Code

1. Setting Up RabbitMQ

Docker Compose for RabbitMQ

version: '3'
services:
  rabbitmq:
    image: "rabbitmq:3-management"
    ports:
      - "5672:5672"
      - "15672:15672"

2. Commands

Commands/CreateOrderCommand.cs

public class CreateOrderCommand : IRequest<int>
{
    public string ProductName { get; set; }
    public decimal Price { get; set; }
}

Handlers/CreateOrderCommandHandler.cs

public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, int>
{
    private readonly IOrderWriteRepository _orderWriteRepository;
    private readonly IEventPublisher _eventPublisher;

    public CreateOrderCommandHandler(IOrderWriteRepository orderWriteRepository, IEventPublisher eventPublisher)
    {
        _orderWriteRepository = orderWriteRepository;
        _eventPublisher = eventPublisher;
    }

    public async Task<int> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order { ProductName = request.ProductName, Price = request.Price };
        _orderWriteRepository.Add(order);
        await _orderWriteRepository.SaveChangesAsync();

        var orderCreatedEvent = new OrderCreatedEvent(order.Id, order.ProductName, order.Price);
        await _eventPublisher.Publish(orderCreatedEvent);

        return order.Id;
    }
}

3. Events

Events/OrderCreatedEvent.cs

public class OrderCreatedEvent
{
    public int OrderId { get; }
    public string ProductName { get; }
    public decimal Price { get; }

    public OrderCreatedEvent(int orderId, string productName, decimal price)
    {
        OrderId = orderId;
        ProductName = productName;
        Price = price;
    }
}

4. Event Publisher Interface

Services/IEventPublisher.cs

public interface IEventPublisher
{
    Task Publish<T>(T @event) where T : class;
}

5. RabbitMQ Event Publisher

Services/RabbitMQEventPublisher.cs

public class RabbitMQEventPublisher : IEventPublisher
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public RabbitMQEventPublisher()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.ExchangeDeclare(exchange: "order_exchange", type: ExchangeType.Fanout);
    }

    public Task Publish<T>(T @event) where T : class
    {
        var message = JsonSerializer.Serialize(@event);
        var body = Encoding.UTF8.GetBytes(message);

        _channel.BasicPublish(exchange: "order_exchange", routingKey: "", basicProperties: null, body: body);
        return Task.CompletedTask;
    }
}

6. Event Consumer

Services/OrderCreatedEventConsumer.cs

public class OrderCreatedEventConsumer : BackgroundService
{
    private readonly IOrderReadRepository _orderReadRepository;
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public OrderCreatedEventConsumer(IOrderReadRepository orderReadRepository)
    {
        _orderReadRepository = orderReadRepository;
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.ExchangeDeclare(exchange: "order_exchange", type: ExchangeType.Fanout);
        var queueName = _channel.QueueDeclare().QueueName;
        _channel.QueueBind(queue: queueName, exchange: "order_exchange", routingKey: "");

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            var orderCreatedEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(message);

            var order = new OrderReadModel
            {
                Id = orderCreatedEvent.OrderId,
                ProductName = orderCreatedEvent.ProductName,
                Price = orderCreatedEvent.Price
            };

            _orderReadRepository.Add(order);
            await _orderReadRepository.SaveChangesAsync();
        };
        _channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _channel.Close();
        _connection.Close();
        base.Dispose();
    }
}

7. Startup Configuration

Startup.cs

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();
        
        // Register MediatR
        services.AddMediatR(typeof(Startup));

        // Register handlers
        services.AddTransient<IRequestHandler<CreateOrderCommand, int>, CreateOrderCommandHandler>();

        // Register repositories
        services.AddScoped<IOrderWriteRepository, OrderWriteRepository>();
        services.AddScoped<IOrderReadRepository, OrderReadRepository>();

        // Register RabbitMQ event publisher and consumer
        services.AddSingleton<IEventPublisher, RabbitMQEventPublisher>();
        services.AddHostedService<OrderCreatedEventConsumer>();
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });
    }
}

8. Repositories (Assuming)

Repositories/IOrderWriteRepository.cs

public interface IOrderWriteRepository
{
    void Add(Order order);
    Task SaveChangesAsync();
}

Repositories/IOrderReadRepository.cs

public interface IOrderReadRepository
{
    void Add(OrderReadModel order);
    Task SaveChangesAsync();
    Task<OrderReadModel> GetByIdAsync(int id);
}

9. Controllers

Controllers/OrderController.cs

[ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
    private readonly IMediator _mediator;

    public OrderController(IMediator mediator)
    {
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderCommand command)
    {
        var orderId = await _mediator.Send(command);
        return Ok(new { OrderId = orderId });
    }

    [HttpGet("{id}")]
    public async Task<IActionResult> GetOrderById(int id)
    {
        var order = await _mediator.Send(new GetOrderByIdQuery(id));
        if (order == null)
        {
            return NotFound();
        }
        return Ok(order);
    }
}

Conclusion

By using RabbitMQ for event handling, the CQRS system can efficiently synchronize the read and write databases. RabbitMQ acts as a reliable intermediary that ensures events are disseminated and state is synchronized across system components. MediatR helps manage commands and events, creating a flexible and scalable architecture.

Last updated