Project Structure
Commands : Define and handle commands.
Events : Define and handle events.
Queries : Define and handle queries.
Repositories : Access data for the read and write databases.
Synchronization : Synchronize between the read and write databases using RabbitMQ.
Example Code
1. Setting Up RabbitMQ
Docker Compose for RabbitMQ
# Docker Compose definition for a local RabbitMQ broker (management image).
version: '3'
services:
  rabbitmq:
    image: "rabbitmq:3-management"
    ports:
      - "5672:5672"    # AMQP port the publisher and consumer connect to
      - "15672:15672"  # management UI
2. Commands
Commands/CreateOrderCommand.cs
/// <summary>
/// Command requesting the creation of an order. The response type is
/// <see cref="int"/>; CreateOrderCommandHandler returns the new order's id.
/// </summary>
public class CreateOrderCommand : IRequest<int>
{
    /// <summary>Name of the product being ordered.</summary>
    public string ProductName { get; set; }

    /// <summary>Price of the order.</summary>
    public decimal Price { get; set; }
}
Handlers/CreateOrderCommandHandler.cs
/// <summary>
/// Handles <see cref="CreateOrderCommand"/>: persists a new order through the
/// write repository, then publishes an <see cref="OrderCreatedEvent"/>.
/// </summary>
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, int>
{
    private readonly IOrderWriteRepository _orderWriteRepository;
    private readonly IEventPublisher _eventPublisher;

    /// <summary>Creates the handler.</summary>
    /// <param name="orderWriteRepository">Repository the new order is saved through.</param>
    /// <param name="eventPublisher">Publisher used to announce the created order.</param>
    /// <exception cref="ArgumentNullException">Either dependency is null.</exception>
    public CreateOrderCommandHandler(IOrderWriteRepository orderWriteRepository, IEventPublisher eventPublisher)
    {
        _orderWriteRepository = orderWriteRepository ?? throw new ArgumentNullException(nameof(orderWriteRepository));
        _eventPublisher = eventPublisher ?? throw new ArgumentNullException(nameof(eventPublisher));
    }

    /// <summary>Saves the order and publishes the corresponding event.</summary>
    /// <param name="request">Command carrying the product name and price.</param>
    /// <param name="cancellationToken">Checked once before any work starts.</param>
    /// <returns>The id of the saved order.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="OperationCanceledException">Cancellation was requested before work began.</exception>
    public async Task<int> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        if (request == null)
        {
            throw new ArgumentNullException(nameof(request));
        }

        // SaveChangesAsync() and Publish() take no token, so cancellation can
        // only be honoured here, before anything has been written.
        cancellationToken.ThrowIfCancellationRequested();

        var order = new Order { ProductName = request.ProductName, Price = request.Price };
        _orderWriteRepository.Add(order);
        await _orderWriteRepository.SaveChangesAsync();

        // The id is read after the save; presumably the store assigns it - verify against Order.
        // NOTE(review): saving and publishing are two separate steps. If Publish
        // throws, the order is already stored but no event reaches the read side.
        var orderCreatedEvent = new OrderCreatedEvent(order.Id, order.ProductName, order.Price);
        await _eventPublisher.Publish(orderCreatedEvent);

        return order.Id;
    }
}
3. Events
Events/OrderCreatedEvent.cs
/// <summary>
/// Immutable event describing an order that has been created.
/// </summary>
public class OrderCreatedEvent
{
    /// <summary>Identifier of the created order.</summary>
    public int OrderId { get; }

    /// <summary>Name of the ordered product.</summary>
    public string ProductName { get; }

    /// <summary>Price of the order.</summary>
    public decimal Price { get; }

    /// <summary>Creates the event from the order's values.</summary>
    /// <param name="orderId">Identifier of the created order.</param>
    /// <param name="productName">Name of the ordered product.</param>
    /// <param name="price">Price of the order.</param>
    public OrderCreatedEvent(int orderId, string productName, decimal price)
    {
        OrderId = orderId;
        ProductName = productName;
        Price = price;
    }
}
4. Event Publisher Interface
Services/IEventPublisher.cs
/// <summary>
/// Abstraction for publishing events.
/// </summary>
public interface IEventPublisher
{
    /// <summary>Publishes the given event.</summary>
    /// <typeparam name="T">Event type; must be a reference type.</typeparam>
    /// <param name="event">The event instance to publish.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task Publish<T>(T @event) where T : class;
}
5. RabbitMQ Event Publisher
Services/RabbitMQEventPublisher.cs
/// <summary>
/// <see cref="IEventPublisher"/> that serializes events to JSON and publishes
/// them to the "order_exchange" fanout exchange on a RabbitMQ broker at localhost.
/// </summary>
public class RabbitMQEventPublisher : IEventPublisher, IDisposable
{
    private const string ExchangeName = "order_exchange";

    // This class is registered as a singleton in Startup, so one channel is
    // shared by all callers. RabbitMQ channels must not be used from several
    // threads at once, so publishing is serialized.
    private readonly object _publishLock = new object();
    private readonly IConnection _connection;
    private readonly IModel _channel;

    /// <summary>
    /// Opens a connection and channel to the broker on localhost and declares
    /// the fanout exchange.
    /// </summary>
    public RabbitMQEventPublisher()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        _connection = factory.CreateConnection();
        try
        {
            _channel = _connection.CreateModel();
            _channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Fanout);
        }
        catch
        {
            // Do not leak the connection (or channel) when setup fails.
            _channel?.Dispose();
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>Serializes the event as UTF-8 JSON and publishes it to the exchange.</summary>
    /// <typeparam name="T">Event type; must be a reference type.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <returns>A completed task; the publish call itself is synchronous.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public Task Publish<T>(T @event) where T : class
    {
        if (@event == null)
        {
            // A null event would otherwise go out as the JSON literal "null".
            throw new ArgumentNullException(nameof(@event));
        }

        var message = JsonSerializer.Serialize(@event);
        var body = Encoding.UTF8.GetBytes(message);

        lock (_publishLock)
        {
            // Fanout exchange: the routing key is left empty.
            _channel.BasicPublish(exchange: ExchangeName, routingKey: "", basicProperties: null, body: body);
        }

        return Task.CompletedTask;
    }

    /// <summary>Releases the channel and the connection.</summary>
    public void Dispose()
    {
        _channel?.Dispose();
        _connection?.Dispose();
    }
}
6. Event Consumer
Services/OrderCreatedEventConsumer.cs
/// <summary>
/// Background service that consumes <see cref="OrderCreatedEvent"/> messages
/// from the "order_exchange" fanout exchange and stores each one as an
/// <see cref="OrderReadModel"/> through the read repository.
/// </summary>
public class OrderCreatedEventConsumer : BackgroundService
{
    private const string ExchangeName = "order_exchange";

    // NOTE(review): Startup registers IOrderReadRepository as scoped, while a
    // hosted service lives for the whole application. Creating a scope per
    // message via IServiceScopeFactory would avoid holding one repository
    // forever, but that needs a constructor change, so it is only flagged here.
    private readonly IOrderReadRepository _orderReadRepository;
    private readonly IConnection _connection;
    private readonly IModel _channel;

    /// <summary>
    /// Connects to the broker on localhost, declares the exchange, binds a
    /// server-named queue to it and starts consuming with manual acknowledgements.
    /// </summary>
    /// <param name="orderReadRepository">Repository the read models are written to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="orderReadRepository"/> is null.</exception>
    public OrderCreatedEventConsumer(IOrderReadRepository orderReadRepository)
    {
        _orderReadRepository = orderReadRepository ?? throw new ArgumentNullException(nameof(orderReadRepository));

        // DispatchConsumersAsync is required for AsyncEventingBasicConsumer, whose
        // handlers return Task. An async lambda on EventingBasicConsumer is async void.
        var factory = new ConnectionFactory { HostName = "localhost", DispatchConsumersAsync = true };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Fanout);

        // The parameterless QueueDeclare() yields a server-named queue.
        var queueName = _channel.QueueDeclare().QueueName;
        _channel.QueueBind(queue: queueName, exchange: ExchangeName, routingKey: "");

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += HandleMessageAsync;

        // Manual acknowledgement: a message is acked only after it has been saved.
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

    /// <summary>
    /// Deserializes one delivery, saves it as a read model and acknowledges it.
    /// Undeserializable messages are rejected without requeue; any other
    /// failure requeues the message and rethrows.
    /// </summary>
    private async Task HandleMessageAsync(object sender, BasicDeliverEventArgs ea)
    {
        try
        {
            // Copy the payload before the first await.
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            var orderCreatedEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(message);

            if (orderCreatedEvent == null)
            {
                // The payload was the JSON literal "null"; retrying cannot help.
                _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
                return;
            }

            var order = new OrderReadModel
            {
                Id = orderCreatedEvent.OrderId,
                ProductName = orderCreatedEvent.ProductName,
                Price = orderCreatedEvent.Price
            };
            _orderReadRepository.Add(order);
            await _orderReadRepository.SaveChangesAsync();

            _channel.BasicAck(ea.DeliveryTag, multiple: false);
        }
        catch (JsonException)
        {
            // Malformed payload: redelivery would fail the same way.
            _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
        }
        catch (Exception)
        {
            // Possibly transient failure (e.g. the save): requeue so the event is
            // not lost, and rethrow so the failure is not silently swallowed.
            // NOTE(review): a permanently failing message is redelivered forever;
            // consider a retry limit or dead-letter exchange.
            _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
            throw;
        }
    }

    /// <summary>
    /// Returns a completed task; consumption is driven by the consumer
    /// registered in the constructor.
    /// </summary>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Closes the channel and connection, then disposes the base service.</summary>
    public override void Dispose()
    {
        _channel.Close();
        _connection.Close();
        base.Dispose();
    }
}
7. Startup Configuration
Startup.cs
/// <summary>
/// Application startup: registers services and configures the HTTP pipeline.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers controllers, MediatR, the command handler, the repositories,
    /// and the RabbitMQ publisher and consumer.
    /// </summary>
    /// <param name="services">Service collection to add registrations to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        // Register MediatR
        services.AddMediatR(typeof(Startup));

        // Register handlers
        // NOTE(review): presumably AddMediatR's assembly scan already finds this
        // handler if it lives in the same assembly as Startup - confirm before removing.
        services.AddTransient<IRequestHandler<CreateOrderCommand, int>, CreateOrderCommandHandler>();

        // Register repositories
        services.AddScoped<IOrderWriteRepository, OrderWriteRepository>();
        services.AddScoped<IOrderReadRepository, OrderReadRepository>();

        // Register RabbitMQ event publisher and consumer
        services.AddSingleton<IEventPublisher, RabbitMQEventPublisher>();
        services.AddHostedService<OrderCreatedEventConsumer>();
    }

    /// <summary>
    /// Configures the pipeline: developer exception page in development,
    /// then routing and controller endpoints.
    /// </summary>
    /// <param name="app">Application builder for the request pipeline.</param>
    /// <param name="env">Hosting environment, used to detect development.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });
    }
}
8. Repositories (Assumed Interfaces)
Repositories/IOrderWriteRepository.cs
/// <summary>
/// Repository for the write side: accepts new <see cref="Order"/> instances.
/// </summary>
public interface IOrderWriteRepository
{
    /// <summary>Adds an order to the repository.</summary>
    /// <param name="order">The order to add.</param>
    void Add(Order order);

    /// <summary>Saves pending changes.</summary>
    /// <returns>A task that completes when the save has finished.</returns>
    Task SaveChangesAsync();
}
Repositories/IOrderReadRepository.cs
/// <summary>
/// Repository for the read side: stores and retrieves <see cref="OrderReadModel"/> instances.
/// </summary>
public interface IOrderReadRepository
{
    /// <summary>Adds a read model to the repository.</summary>
    /// <param name="order">The read model to add.</param>
    void Add(OrderReadModel order);

    /// <summary>Saves pending changes.</summary>
    /// <returns>A task that completes when the save has finished.</returns>
    Task SaveChangesAsync();

    /// <summary>Looks up a read model by id.</summary>
    /// <param name="id">Identifier of the order to look up.</param>
    /// <returns>
    /// A task producing the matching read model. What is returned when no order
    /// matches depends on the implementation, which is not shown here.
    /// </returns>
    Task<OrderReadModel> GetByIdAsync(int id);
}
9. Controllers
Controllers/OrderController.cs
/// <summary>
/// HTTP API for orders; every action delegates to MediatR.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
    private readonly IMediator _mediator;

    /// <summary>Creates the controller.</summary>
    /// <param name="mediator">Mediator that commands and queries are sent through.</param>
    public OrderController(IMediator mediator)
    {
        _mediator = mediator;
    }

    /// <summary>Creates an order by sending the command through the mediator.</summary>
    /// <param name="command">Command bound from the request body.</param>
    /// <returns>200 OK with an object whose <c>OrderId</c> is the mediator's response.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderCommand command)
    {
        var orderId = await _mediator.Send(command);
        return Ok(new { OrderId = orderId });
    }

    /// <summary>Fetches an order by sending a GetOrderByIdQuery through the mediator.</summary>
    /// <param name="id">Order identifier taken from the route.</param>
    /// <returns>404 Not Found when the query result is null; otherwise 200 OK with the result.</returns>
    [HttpGet("{id}")]
    public async Task<IActionResult> GetOrderById(int id)
    {
        // NOTE(review): GetOrderByIdQuery and its handler are not shown in this document.
        var order = await _mediator.Send(new GetOrderByIdQuery(id));
        if (order == null)
        {
            return NotFound();
        }

        return Ok(order);
    }
}
Conclusion
By publishing events through RabbitMQ, this CQRS system keeps the read database synchronized with the write database: the command handler saves the order and publishes an OrderCreatedEvent, and the background consumer applies that event to the read model. RabbitMQ acts as the intermediary that distributes events across system components, while MediatR dispatches commands and queries to their handlers, creating a flexible and scalable architecture.
Last updated 7 months ago