Copy public class OrderCreatedEventConsumer : BackgroundService
{
private readonly IOrderReadRepository _orderReadRepository;
private readonly IConnection _connection;
private readonly IModel _channel;
public OrderCreatedEventConsumer(IOrderReadRepository orderReadRepository)
{
_orderReadRepository = orderReadRepository;
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "order_exchange", type: ExchangeType.Fanout);
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName, exchange: "order_exchange", routingKey: "");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var orderCreatedEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(message);
var order = new OrderReadModel
{
Id = orderCreatedEvent.OrderId,
ProductName = orderCreatedEvent.ProductName,
Price = orderCreatedEvent.Price
};
_orderReadRepository.Add(order);
await _orderReadRepository.SaveChangesAsync();
};
_channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.CompletedTask;
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}