Project Structure
Commands: Define and handle commands.
Events: Define and handle events.
Queries: Define and handle queries.
Repositories: Access data for the read and write databases.
Synchronization: Synchronize the read database with the write database using RabbitMQ.
Example Code
1. Setting Up RabbitMQ
Docker Compose for RabbitMQ
# Single RabbitMQ broker using the image variant that bundles the management plugin.
version: '3'
services:
  rabbitmq:
    image: "rabbitmq:3-management"
    ports:
      # AMQP port used by client connections.
      - "5672:5672"
      # HTTP port of the management UI.
      - "15672:15672"
2. Commands
Commands/CreateOrderCommand.cs
/// <summary>
/// Command asking for a new order to be created. It is a MediatR request whose
/// response type is <c>int</c>.
/// </summary>
public class CreateOrderCommand : IRequest<int>
{
    /// <summary>Name of the product being ordered.</summary>
    public string ProductName { get; set; }

    /// <summary>Price to record for the order.</summary>
    public decimal Price { get; set; }
}
Handlers/CreateOrderCommandHandler.cs
/// <summary>
/// Handles <see cref="CreateOrderCommand"/>: stores a new order through the write
/// repository and then publishes an <see cref="OrderCreatedEvent"/> describing it.
/// </summary>
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, int>
{
    private readonly IOrderWriteRepository _orderWriteRepository;
    private readonly IEventPublisher _eventPublisher;

    /// <summary>Creates the handler with its write repository and event publisher.</summary>
    /// <exception cref="System.ArgumentNullException">Either dependency is <c>null</c>.</exception>
    public CreateOrderCommandHandler(IOrderWriteRepository orderWriteRepository, IEventPublisher eventPublisher)
    {
        _orderWriteRepository = orderWriteRepository
            ?? throw new System.ArgumentNullException(nameof(orderWriteRepository));
        _eventPublisher = eventPublisher
            ?? throw new System.ArgumentNullException(nameof(eventPublisher));
    }

    /// <summary>Persists the order, publishes the created event and returns the order's id.</summary>
    /// <param name="request">Command carrying the product name and price.</param>
    /// <param name="cancellationToken">Checked once before any work starts.</param>
    /// <returns>The <c>Id</c> of the order as read after it was saved.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
    public async Task<int> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        if (request is null)
        {
            throw new System.ArgumentNullException(nameof(request));
        }

        // Only honoured before the write: cancelling between the save and the publish
        // would leave a stored order with no event.
        cancellationToken.ThrowIfCancellationRequested();

        var order = new Order { ProductName = request.ProductName, Price = request.Price };
        _orderWriteRepository.Add(order);
        await _orderWriteRepository.SaveChangesAsync();

        // The event is built after the save so it carries the id the order has at that point.
        // NOTE(review): save and publish are not atomic; if Publish throws, the order is
        // stored but no event is emitted. Confirm whether an outbox or retry is required.
        var orderCreatedEvent = new OrderCreatedEvent(order.Id, order.ProductName, order.Price);
        await _eventPublisher.Publish(orderCreatedEvent);

        return order.Id;
    }
}
3. Events
Events/OrderCreatedEvent.cs
/// <summary>
/// Immutable event describing an order that has been created.
/// </summary>
/// <remarks>
/// NOTE(review): OrderCreatedEventConsumer rebuilds this type with
/// <c>JsonSerializer.Deserialize</c>. With get-only properties that relies on the
/// serializer binding JSON properties to the constructor parameters, so the parameter
/// names should keep matching the property names. Confirm against the
/// System.Text.Json version in use.
/// </remarks>
public class OrderCreatedEvent
{
    /// <summary>Identifier of the created order.</summary>
    public int OrderId { get; }

    /// <summary>Name of the ordered product.</summary>
    public string ProductName { get; }

    /// <summary>Price recorded for the order.</summary>
    public decimal Price { get; }

    /// <summary>Creates the event from the order's id, product name and price.</summary>
    public OrderCreatedEvent(int orderId, string productName, decimal price)
    {
        OrderId = orderId;
        ProductName = productName;
        Price = price;
    }
}
4. Event Publisher Interface
Services/IEventPublisher.cs
/// <summary>
/// Abstraction for publishing events.
/// </summary>
public interface IEventPublisher
{
    /// <summary>Publishes the given event.</summary>
    /// <typeparam name="T">Reference type of the event being published.</typeparam>
    /// <param name="event">The event instance to publish.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task Publish<T>(T @event) where T : class;
}
5. RabbitMQ Event Publisher
Services/RabbitMQEventPublisher.cs
/// <summary>
/// <see cref="IEventPublisher"/> that serializes events to JSON and publishes them to
/// the fanout exchange "order_exchange" on a RabbitMQ broker at "localhost".
/// </summary>
/// <remarks>
/// One connection and one channel are opened in the constructor and kept for the
/// lifetime of the instance. Publishing is serialized with a lock because the RabbitMQ
/// .NET client does not support concurrent use of a single channel, and this class is
/// registered as a singleton. The instance is disposable so the connection and channel
/// are released when it is disposed.
/// </remarks>
public class RabbitMQEventPublisher : IEventPublisher, System.IDisposable
{
    private const string ExchangeName = "order_exchange";

    private readonly object _channelLock = new object();
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private bool _disposed;

    /// <summary>Connects to the broker, opens a channel and declares the fanout exchange.</summary>
    public RabbitMQEventPublisher()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        _connection = factory.CreateConnection();
        try
        {
            _channel = _connection.CreateModel();
            _channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Fanout);
        }
        catch
        {
            // Do not leak the connection (or channel) when setup fails half-way.
            _channel?.Dispose();
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>Serializes <paramref name="event"/> to UTF-8 JSON and publishes it to the exchange.</summary>
    /// <typeparam name="T">Reference type of the event.</typeparam>
    /// <param name="event">The event to publish; must not be <c>null</c>.</param>
    /// <returns>A completed task; the publish call itself is synchronous.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="event"/> is <c>null</c>.</exception>
    /// <exception cref="System.ObjectDisposedException">The publisher has been disposed.</exception>
    public Task Publish<T>(T @event) where T : class
    {
        if (@event is null)
        {
            throw new System.ArgumentNullException(nameof(@event));
        }

        var message = JsonSerializer.Serialize(@event);
        var body = Encoding.UTF8.GetBytes(message);

        lock (_channelLock)
        {
            if (_disposed)
            {
                throw new System.ObjectDisposedException(nameof(RabbitMQEventPublisher));
            }

            // Fanout exchanges ignore the routing key, hence the empty string.
            _channel.BasicPublish(exchange: ExchangeName, routingKey: "", basicProperties: null, body: body);
        }

        return Task.CompletedTask;
    }

    /// <summary>Releases the channel and the connection. Safe to call more than once.</summary>
    public void Dispose()
    {
        lock (_channelLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            _channel.Dispose();
            _connection.Dispose();
        }
    }
}
6. Event Consumer
Services/OrderCreatedEventConsumer.cs
/// <summary>
/// Background service that consumes <see cref="OrderCreatedEvent"/> messages from the
/// fanout exchange "order_exchange" and stores a matching <see cref="OrderReadModel"/>
/// through the read repository.
/// </summary>
/// <remarks>
/// The connection, channel, queue binding and consumer are all set up in the
/// constructor; <see cref="ExecuteAsync"/> has no work of its own.
/// Deliveries are handled by an asynchronous consumer and acknowledged manually, so a
/// message is only acknowledged after the read model has been saved.
/// NOTE(review): Startup registers IOrderReadRepository as scoped, while a hosted
/// service lives for the whole application. Confirm that the repository may be held
/// this long, or resolve it per message from a service scope.
/// </remarks>
public class OrderCreatedEventConsumer : BackgroundService
{
    private const string ExchangeName = "order_exchange";

    private readonly IOrderReadRepository _orderReadRepository;
    private readonly IConnection _connection;
    private readonly IModel _channel;

    /// <summary>
    /// Connects to the broker at "localhost", binds a server-named queue to the exchange
    /// and starts consuming from it.
    /// </summary>
    /// <param name="orderReadRepository">Repository that receives the read models.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="orderReadRepository"/> is <c>null</c>.</exception>
    public OrderCreatedEventConsumer(IOrderReadRepository orderReadRepository)
    {
        _orderReadRepository = orderReadRepository
            ?? throw new System.ArgumentNullException(nameof(orderReadRepository));

        // DispatchConsumersAsync is required for AsyncEventingBasicConsumer: the client
        // then awaits each handler instead of invoking it as fire-and-forget.
        var factory = new ConnectionFactory { HostName = "localhost", DispatchConsumersAsync = true };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Fanout);

        var queueName = _channel.QueueDeclare().QueueName;
        _channel.QueueBind(queue: queueName, exchange: ExchangeName, routingKey: "");

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += HandleDeliveryAsync;

        // Manual acknowledgements: a delivery is confirmed only once it has been stored.
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

    /// <summary>Nothing to run here; consumption was started in the constructor.</summary>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Releases the channel and connection, then the base service.</summary>
    public override void Dispose()
    {
        try
        {
            _channel.Dispose();
            _connection.Dispose();
        }
        finally
        {
            base.Dispose();
        }
    }

    // Turns one delivery into a read model and acknowledges, rejects or requeues it.
    private async Task HandleDeliveryAsync(object sender, BasicDeliverEventArgs ea)
    {
        OrderCreatedEvent orderCreatedEvent;
        try
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            orderCreatedEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(message);
        }
        catch (JsonException)
        {
            orderCreatedEvent = null;
        }

        if (orderCreatedEvent is null)
        {
            // A payload that is malformed (or the JSON literal null) can never succeed,
            // so it is rejected without requeueing instead of being retried forever.
            _channel.BasicReject(ea.DeliveryTag, requeue: false);
            return;
        }

        try
        {
            var order = new OrderReadModel
            {
                Id = orderCreatedEvent.OrderId,
                ProductName = orderCreatedEvent.ProductName,
                Price = orderCreatedEvent.Price
            };
            _orderReadRepository.Add(order);
            await _orderReadRepository.SaveChangesAsync();
        }
        catch (System.Exception)
        {
            // Retry once: requeue on the first failure, drop on a redelivery so a
            // poison message cannot loop indefinitely. The exception is rethrown so
            // the client library can surface it.
            _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: !ea.Redelivered);
            throw;
        }

        _channel.BasicAck(ea.DeliveryTag, multiple: false);
    }
}
7. Startup Configuration
Startup.cs
/// <summary>
/// Application startup: registers services and configures the HTTP request pipeline.
/// </summary>
public class Startup
{
    /// <summary>Registers controllers, MediatR, repositories and the RabbitMQ publisher/consumer.</summary>
    /// <param name="services">Service collection to add registrations to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        // Register MediatR
        services.AddMediatR(typeof(Startup));

        // Register handlers
        // NOTE(review): AddMediatR above is given this assembly; if the handler lives in
        // the same assembly this explicit registration is presumably redundant. Confirm.
        services.AddTransient<IRequestHandler<CreateOrderCommand, int>, CreateOrderCommandHandler>();

        // Register repositories
        services.AddScoped<IOrderWriteRepository, OrderWriteRepository>();
        services.AddScoped<IOrderReadRepository, OrderReadRepository>();

        // Register RabbitMQ event publisher and consumer
        // NOTE(review): OrderCreatedEventConsumer takes IOrderReadRepository in its
        // constructor, which is registered as scoped above, while hosted services live
        // for the whole application. Confirm this passes DI scope validation.
        services.AddSingleton<IEventPublisher, RabbitMQEventPublisher>();
        services.AddHostedService<OrderCreatedEventConsumer>();
    }

    /// <summary>Builds the request pipeline: developer exception page (development only), routing, controllers.</summary>
    /// <param name="app">Application builder used to add middleware.</param>
    /// <param name="env">Hosting environment, used to detect development mode.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });
    }
}
8. Repositories (Assumed Interfaces)
Repositories/IOrderWriteRepository.cs
/// <summary>
/// Write-side repository for <see cref="Order"/> entities.
/// </summary>
public interface IOrderWriteRepository
{
    /// <summary>Adds an order to the repository.</summary>
    /// <param name="order">The order to add.</param>
    void Add(Order order);

    /// <summary>Asynchronously saves pending changes.</summary>
    /// <returns>A task that completes when the save has finished.</returns>
    Task SaveChangesAsync();
}
Repositories/IOrderReadRepository.cs
/// <summary>
/// Read-side repository for <see cref="OrderReadModel"/> entries.
/// </summary>
public interface IOrderReadRepository
{
    /// <summary>Adds a read model to the repository.</summary>
    /// <param name="order">The read model to add.</param>
    void Add(OrderReadModel order);

    /// <summary>Asynchronously saves pending changes.</summary>
    /// <returns>A task that completes when the save has finished.</returns>
    Task SaveChangesAsync();

    /// <summary>Asynchronously looks up a read model by its identifier.</summary>
    /// <param name="id">Identifier of the order to fetch.</param>
    /// <returns>A task producing the read model for <paramref name="id"/>.</returns>
    Task<OrderReadModel> GetByIdAsync(int id);
}
9. Controllers
Controllers/OrderController.cs
/// <summary>
/// HTTP endpoints for orders; each action forwards its work to MediatR.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
    private readonly IMediator _mediator;

    /// <summary>Creates the controller with the mediator used to dispatch requests.</summary>
    public OrderController(IMediator mediator)
    {
        _mediator = mediator;
    }

    /// <summary>Sends the posted command through the mediator and returns the resulting order id.</summary>
    /// <param name="command">Command bound from the request body.</param>
    /// <returns>200 OK with an object containing <c>OrderId</c>.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderCommand command)
    {
        var orderId = await _mediator.Send(command);
        return Ok(new { OrderId = orderId });
    }

    /// <summary>Fetches a single order by sending a <c>GetOrderByIdQuery</c> through the mediator.</summary>
    /// <param name="id">Identifier taken from the route.</param>
    /// <returns>200 OK with the order, or 404 Not Found when the query yields <c>null</c>.</returns>
    [HttpGet("{id}")]
    public async Task<IActionResult> GetOrderById(int id)
    {
        var order = await _mediator.Send(new GetOrderByIdQuery(id));
        if (order is null)
        {
            return NotFound();
        }

        return Ok(order);
    }
}
Conclusion
By using RabbitMQ for event handling, the CQRS system can keep the read database synchronized with the write database. RabbitMQ acts as the intermediary that distributes events to subscribers, so state changes are propagated across system components and the read model becomes eventually consistent with the write model. MediatR dispatches commands and queries to their handlers, creating a flexible and scalable architecture.
Last updated 6 months ago