Uso de RabbitMQ en .NET
docker pull rabbitmq
docker network create myrabbit
docker run -d --network myrabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
docker exec rabbit rabbitmq-plugins enable rabbitmq_management
/// <summary>
/// Builds a <see cref="RabbitConsumer"/> for one queue: initialises its channel via
/// <c>InitChannel</c>, attaches an async event consumer wired to <c>Consumer_Received</c>,
/// and starts consuming with automatic acknowledgement disabled.
/// </summary>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <returns>The populated consumer holder (queue name, channel and event consumer set).</returns>
private RabbitConsumer CreateConsumerForQueue(string queueName)
{
    var holder = new RabbitConsumer();
    holder.QueueName = queueName;

    // Opens the connection/channel and declares the queue before anything subscribes to it.
    InitChannel(holder, queueName);

    var eventConsumer = new AsyncEventingBasicConsumer(holder.Channel);
    holder.Consumer = eventConsumer;
    eventConsumer.Received += Consumer_Received;

    // autoAck is off: Consumer_Received acks or nacks every delivery explicitly.
    holder.Channel.BasicConsume(
        queue: queueName,
        autoAck: false,
        consumer: eventConsumer);

    return holder;
}
/// <summary>
/// Opens a connection and channel for <paramref name="rabbitConsumer"/>, applies the configured
/// prefetch count, and declares the work queue together with a dead-letter exchange and
/// dead-letter queue derived from <paramref name="queueName"/>.
/// </summary>
/// <param name="rabbitConsumer">Holder that receives the created connection and channel.</param>
/// <param name="queueName">Name of the work queue; also used to build the dead-letter names.</param>
private void InitChannel(RabbitConsumer rabbitConsumer, string queueName)
{
    string deadLetterExchange = $"DeadLetterExchange_{queueName}";
    string deadLetterQueue = $"DeadLetter_{queueName}";

    var factory = new ConnectionFactory();
    factory.DispatchConsumersAsync = true; // required for the async event consumer used by the caller
    factory.HostName = _rabbitConfig.HostName;
    factory.Port = _rabbitConfig.HostPort;

    rabbitConsumer.Connection = factory.CreateConnection();
    rabbitConsumer.Channel = rabbitConsumer.Connection.CreateModel();

    var channel = rabbitConsumer.Channel;
    channel.BasicQos(0, _rabbitConfig.PrefetchCount, false);

    // Dead-letter topology: a fanout exchange bound to a dedicated durable queue.
    // NOTE(review): the exchange uses this overload's default durability while both queues are
    // declared durable - confirm that mismatch is intended.
    channel.ExchangeDeclare(deadLetterExchange, ExchangeType.Fanout);
    channel.QueueDeclare(
        queue: deadLetterQueue,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);
    channel.QueueBind(deadLetterQueue, deadLetterExchange, "");

    // The work queue points at the dead-letter exchange through its declaration arguments.
    var queueArguments = new Dictionary<string, object>
    {
        ["x-dead-letter-exchange"] = deadLetterExchange
    };
    channel.QueueDeclare(
        queue: queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: queueArguments);
}
/// <summary>
/// Handles one delivery: decodes the body as UTF-8, runs <c>ProcessMessage</c> with up to
/// <c>_rabbitConfig.NumOfRetries</c> additional attempts, then acknowledges the delivery or
/// rejects it without requeue.
/// </summary>
/// <remarks>
/// The ack/nack decision is taken from the outcome of the last attempt, not from the retry
/// counter, so a message that succeeds on its final attempt is acknowledged. If processing
/// throws, the delivery is rejected (best effort) instead of being left unacknowledged.
/// </remarks>
/// <param name="sender">Event source; not used.</param>
/// <param name="basicDeliverEventArgs">Delivery data: body, routing key and delivery tag.</param>
/// <returns>A completed task; all work in this handler is synchronous.</returns>
private Task Consumer_Received(object sender, BasicDeliverEventArgs basicDeliverEventArgs)
{
    RabbitConsumer rabbitConsumer = null;
    bool settlementAttempted = false;

    try
    {
        // The consumer is looked up by routing key, so this relies on the routing key being
        // equal to the queue name.
        rabbitConsumer = _rabbitConsumers.SingleOrDefault(rc => rc.QueueName.Equals(basicDeliverEventArgs.RoutingKey));
        if (rabbitConsumer == null)
        {
            // Without a channel the delivery cannot be acked, so do not process it at all.
            _logger.LogError($"No consumer registered for queue {basicDeliverEventArgs.RoutingKey}; message left unacknowledged");
            return Task.CompletedTask;
        }

        byte[] bodyBytes = basicDeliverEventArgs.Body.ToArray();
        string message = Encoding.UTF8.GetString(bodyBytes);
        this._logger.LogDebug(" [x] Received {0}", message);

        // NOTE(review): as in the original loop, a true return from ProcessMessage is treated
        // as a failed attempt (the log text says "failed") - confirm against ProcessMessage.
        int retriesCount = 0;
        bool failed = ProcessMessage(message, basicDeliverEventArgs.RoutingKey);
        while (failed && retriesCount < _rabbitConfig.NumOfRetries)
        {
            this._logger.LogError($"ProcessMessage failed with message {message}");
            retriesCount++;
            _logger.LogError($"Message {message} thrown back at queue for retry. New retry count: {retriesCount}");
            failed = ProcessMessage(message, basicDeliverEventArgs.RoutingKey);
        }

        settlementAttempted = true;
        if (failed)
        {
            // Retries exhausted: reject without requeue.
            this._logger.LogCritical($"Rejected message {message}");
            rabbitConsumer.Channel.BasicNack(basicDeliverEventArgs.DeliveryTag, false, false);
        }
        else
        {
            rabbitConsumer.Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"Error when receive message from queue {basicDeliverEventArgs.RoutingKey}");

        // Do not leave the delivery unacknowledged: it would keep occupying a prefetch slot.
        // Skipped when the ack/nack call itself was what failed.
        if (rabbitConsumer != null && !settlementAttempted)
        {
            try
            {
                rabbitConsumer.Channel.BasicNack(basicDeliverEventArgs.DeliveryTag, false, false);
            }
            catch (Exception nackEx)
            {
                _logger.LogError(nackEx, $"Could not reject message from queue {basicDeliverEventArgs.RoutingKey}");
            }
        }
    }

    return Task.CompletedTask;
}
comments powered by Disqus