Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Managing Queue Messages Using RabbitMQ Client

DZone's Guide to

Managing Queue Messages Using RabbitMQ Client

In this article, I am going to talk about how we can use the RabbitMQ Client library to manage queue messages and provide code snippets to explain the same.

· Database Zone ·
Free Resource

Databases are better when they can run themselves. CockroachDB is a SQL database that automates scaling and recovery. Check it out here.

Often, we encounter issues with error queues or dead letter queues being flooded with messages. And since they are the lowest priority for any application, the general tendency is to forget about these queues. However, these queues are as important as any other queues and we need to periodically clean the messages from these queues or they will eat up the space allotted to RabbitMQ.

Now, when we think of cleaning error queue messages, what are the options we have? We can’t just blindly delete all the messages there, as some of them might be important. For example, in a critical scenario like banking, a message not processed successfully by the batch process can lead to landing that message in the error queue. The error may be due to transient failures (e.g. network error, database deadlock, service unavailable, etc.), so we need to rerun that message again from the error queue. We should be able to view the message before making the decision to purge or move the message back to the original queue for processing. Also, there should be a functionality to selectively purge/move messages or bulk purge/move messages. So, to manage any error queue message or any queue message, we should be looking for these features:

  • Delete/purge all messages from the queue.
  • Selectively delete/purge messages from the queue.
  • Move all messages from the queue to the original queue (or any other queue).
  • Selectively move messages from the queue to the original queue (or any other queue).

Implementation

Using AMQPS protocol has some limitations regarding the information we can get and operations we can perform using the RabbitMQ client. However, RabbitMQ has an HTTP Client that enables you to perform all operations, but the network security team in most of the organizations won’t be opening that firewall port for you for security reasons.

I will be using the RabbitMQ Client for this article. The RabbitMQ .NET client is an implementation of an AMQP 0-9-1 client library for C# (and, implicitly, other .NET languages). Starting with the 4.0 release, it supports .NET Core and .NET 4.5.1+.

The library is open-source and is dual-licensed under the Apache License v2 and the Mozilla Public License v1.1. The current release of the RabbitMQ .NET/C# client library is 5.0.1. It is recommended to use the NuGet package. Release notes can be found on the GitHub releases page.

User Interface

For the Move All and Purge All options, we are looking for this interface:

Image title

We should create a JSON file containing the Error Queue Name and the Original Queue Name along with the application and application group. The JSON will look something like this:

{
        "ApplicationGroup": "Group1",
        "ApplicationName": "Application1",
        "OriginalQueueName": "OriginalQueue1",           
        "ErrorQueueName": "Queue1"          
}

Clicking on the Queue Name will take us to another screen that will allow us to selectively delete or move the messages after viewing.

Image title

Code Snippets

Let’s walk through the code snippets for the methods we need to perform the required operations:

View All Queue Info

This method will get all the message count information from RabbitMQ. This will return a list of message objects. The code is straightforward:

[Serializable]
    public class Message
    {        
      private readonly string serializableBody;
      public Message ( string serializableBody )
      {
        this.serializableBody = serializableBody;
      }
      public MessageHeader Header
      {
        get;
        private set;
      }
      public string MessageBody
      {
        get 
        {
          return this.serializableBody;
        }           
      }
      public byte[] GetBody()
      {
        return (byte[])((object)Encoding.ASCII.GetBytes(this.MessageBody));              
      }
}

[Serializable]
    public class MessageHeader
    {
        public MessageHeader()
        {
            this.MessageId = Guid.NewGuid().ToString();
            this.Properties = new Dictionary<string, string>();
            this.IsPersistent = true;
        }
        public string AppId { get; set; }   
        public string MessageId { get; set; }
        public string MessageName { get; set; }
        public DateTime GeneratedAtUtc { get; set; }
        public string ExpirationInMilliseconds { get; set; }
        public bool IsPersistent { get; set; }
        public bool Delete { get; set; }
        public bool Move { get; set; }  
        public IDictionary<string, string> Properties { get; private set; }

    }

  public class QueueInfoModel
  {        
    public string Environment { get; set; }
    public string ApplicationGroup { get; set; }
    public string ApplicationName { get; set; }
    public string ErrorQueueName { get; set; 
    public string OriginalQueueName { get; set; }  
    public int MessageCount { get; set;}
  }

public class RabbitMQQueueManager
{
  private readonly string environment;
  private readonly log4net.ILog logger; 
  private const string DELETE_KEY = "delete";
  private const string MOVE_KEY = "move";
  private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
  private const string ERROR_QUEUENAME_KEY = "errorQueueName";
  private const string EXCHANGE_KEY = "originalQueueExchange";
  private const ushort PREFETCH_SIZE = 50;
  public List<QueueInfoModel> GetAllQueueData ( string appgroup = "", string appname = "", string queuename = "" )
{
    List<QueueInfoModel> queueDetails;
    List<QueueInfoModel> queueInfoList = new List<QueueInfoModel> ();
    try
    {
      queueDetails = ReadAllQueueInfoFromJsonConfig ();
      if (!string.IsNullOrEmpty (appgroup))
      {
        queueDetails = queueDetails.Where (x => x.ApplicationGroup.Contains (appgroup, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      if (!string.IsNullOrEmpty (appname))
      {
        queueDetails = queueDetails.Where (x => x.ApplicationName.Contains (appname, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      if (!string.IsNullOrEmpty (queuename))
      {
        queueDetails = queueDetails.Where (x => x.ErrorQueueName.Contains (queuename, StringComparison.CurrentCultureIgnoreCase) || x.OriginalQueueName.Contains (queuename, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      using (var connection = GetRabbitMqConnection ())
      {
        using (var channel = connection.CreateModel ())
        {
          foreach (var queueDetail in queueDetails)
          {
            try
            {
              queueDetail.MessageCount = (int)channel.MessageCount (queueDetail.ErrorQueueName);
            }
            catch (Exception ex)
            {
              this.logger.Error (string.Format ("Error Occured while getting message count for the queue {0}, skipping this queue", queueDetail.ErrorQueueName), ex);
            }                            
            queueInfoList.Add (queueDetail);                           
          }
        }
      }
    }
    catch (Exception)
    {
      throw;
    }
    finally
    {
      queueDetails = null;
    }
    return queueInfoList;
  }
  private List<QueueInfoModel> ReadAllQueueInfoFromJsonConfig ()
  {
    var fullpath = AppDomain.CurrentDomain.BaseDirectory + @"\ErrorMessageQueues.Json";
    var queueDetails = JsonConvert.DeserializeObject<List<QueueInfoModel>> (File.ReadAllText (fullpath));

    return queueDetails;
  }

  private IConnection GetRabbitMqConnection ()
  {
    try
    {               
      ConnectionFactory connectionFactory = new ConnectionFactory ();
      connectionFactory.Uri = "Your URI";
      connectionFactory.Protocol = Protocols.DefaultProtocol;
      connectionFactory.AutomaticRecoveryEnabled = true;
      IConnection conn = connectionFactory.CreateConnection ();
      return conn;
    }
    catch (Exception)
    {               
      throw;
    }
}
}

Delete/Purge All Messages

This is another straightforward operation. The RabbitMQ client provides a purge method.

using (var connection = GetRabbitMqConnection ())
{
  using (var channel = connection.CreateModel ())
  {
    channel.QueuePurge (queueName);
  }
}

Move All Messages From the Queue to Original Queue (or Any Other Queue)

In this operation, we will be reading through each message in the queue and then publishing them to the destination queue. We need to take care of any exception cases that may result in message loss. The RabbitMQ Client does not provide peek functionality out-of-the-box, so we will first read the message and publish it to the destination queue. Then, we will ACK (acknowledge) the message to be removed from the queue. In case of any exception in publishing the message, we need to NACK (no acknowledge) the message. This will put the message back in the error queue.

public List<string> MoveAllMessagesToQueue ( string errorQueueName, string originalQueueName )
{
  QueueingBasicConsumer consumer = null;            
  List<string> messageList = new List<string> ();
  BasicDeliverEventArgs result = null;
  using (var rmqConnection = GetRabbitMqConnection ())
  {
    using (var channel = rmqConnection.CreateModel ())
    {
      try
      {
        var queueMessageCount = (ushort)channel.MessageCount (errorQueueName);
        //Check if =queue message count is less than prefetch count, 
        //if yes then set the prefetch count to queue message count.
        var pfCount = queueMessageCount >= PREFETCH_SIZE ? PREFETCH_SIZE : queueMessageCount;

        channel.BasicQos (0, pfCount, false);
        consumer = new QueueingBasicConsumer (channel);
        channel.BasicConsume (errorQueueName, false, consumer);
        for (int i = 0; i < queueMessageCount; i++)
        {
          if (!channel.IsOpen)
          {
            throw new ApplicationException ("Channel is closed");
          }
          result = consumer.Queue.Dequeue ();
          try
          {
            channel.BasicPublish (string.Empty, originalQueueName, true, result.BasicProperties, result.Body);
            channel.BasicAck (result.DeliveryTag, false);                                
            messageList.Add (result.BasicProperties.MessageId);
          }
          catch (Exception ex)
          {
            ////Nack the message in case of any exception while reading the message.
            channel.BasicNack (result.DeliveryTag, false, true);
            this.logger.Warn ("Error Occured while performing delete operation for message ID: " + result.BasicProperties.MessageId, ex);
          }
        }
      }
      catch (Exception)
      {
        ////Nack the message back to queue in case of exception
        if (result != null)
        {
          channel.BasicNack (result.DeliveryTag, false, true);
        }
        throw;
      }
    }
  }

  this.logger.Info (string.Format ("Successfully moved all messages from error queue {0} to destination queue {1}", errorQueueName, originalQueueName));

  return messageList;
}

Get Messages From the Queue for Viewing

This method will be called once we click on any queue to see the messages. For this, we will deque each message and NACK them to be put back to the queue. This is equivalent to PEEK functionality, but since the RabbitMQ Client does not provide this out-of-the-box, we have to do this workaround. We only need to view this data and then decide the operation we need to perform (purge/move). Also, we are passing the number of messages we need to view — this will be helpful in paging.

public List<Message> GetMessagesFromQueueNoAck ( string queueName, int messageCount = -1 )
        {
            QueueingBasicConsumer consumer = null;          
            var responseMessages = new List<Message> ();
            BasicDeliverEventArgs result = null;
            using (var rmqConnection = GetRabbitMqConnection ())
            {
                using (var channel = rmqConnection.CreateModel ())
                {
                    try
                    {
                        var queueMessageCount = (int)channel.MessageCount (queueName);
                        var queueInfo = GetAllQueueData ("", "", queueName).FirstOrDefault ();
                        var count = messageCount > -1 ? messageCount <= queueMessageCount ? messageCount : queueMessageCount : queueMessageCount;
                        var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
                        channel.BasicQos (0, (ushort)pfCount, false);
                        consumer = new QueueingBasicConsumer (channel);
                        channel.BasicConsume (queueName, false, consumer);
                        for (int i = 0; i < pfCount; i++)
                        {
                            if (!channel.IsOpen)
                            {
                                throw new ApplicationException ("Channel is closed");
                            }
                            result = consumer.Queue.Dequeue ();
                            try
                            {
                                string messageData = System.Text.Encoding.UTF8.GetString (result.Body);
                                var rMessage = new Message (messageData);
                                RmqHeaderHandler.ReadRmqMessageProperties (result.BasicProperties, rMessage);
                                channel.BasicNack (result.DeliveryTag, false, true);
                                ////Set Message properties
                                Type t = queueInfo.GetType ();
                                foreach (PropertyInfo pi in t.GetProperties ())
                                {
                                    rMessage.Header.Properties.Add (pi.Name, pi.GetValue (queueInfo, null).ToString ());
                                }
                                responseMessages.Add (rMessage);
                            }
                            catch (Exception ex)
                            {
                                ////Nack the message in case of any exception while reading the message.
                                channel.BasicNack (result.DeliveryTag, false, true);
                                this.logger.Warn ("Error Occured while getting message for message ID" + result.BasicProperties.MessageId, ex);
                            }
                        }
                    }
                    catch (Exception)
                    {
                        ////Nack the message back to queue in case of exception
                        if (result != null)
                        {
                            channel.BasicNack (result.DeliveryTag, false, true);
                        }
                        throw;
                    }
                }
            }
            return responseMessages;
        }

public static class RmqHeaderHandler
    {
        private const byte NonPersistentDeliveryMode = 1;
        private const byte PersistentDeliveryMode = 2;
        private const string SecurityTokenKey = "SecurityToken";
        private const string Properties = "properties";
        private const string MessageNameKey = "MessageName";
        private const string SystemPropertiesKey = "SystemProperties";
        private const string ApplicationPropertiesKey = "ApplicationProperties";
        #region Public Methods
        public static void ReadDynamicMessageProperties(dynamic messageProperties, Message message)
        {
            try
            {                   
                message.Header.AppId = messageProperties.appId;
                message.Header.MessageId = messageProperties.messageId;  
                message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
                message.Header.ExpirationInMilliseconds = messageProperties.expirationInMilliseconds;
                message.Header.IsPersistent = messageProperties.isPersistent;
                message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
                message.Header.Move = Convert.ToBoolean(messageProperties.move);
                if (messageProperties.ContainsKey ("messageName"))
                {
                    message.Header.MessageName = messageProperties.messageName;
                }                
                if (messageProperties.ContainsKey("properties"))
                {  
                    var customProperties = Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>(Convert.ToString(messageProperties.properties));
                    foreach (var propPair in customProperties)
                    {
                        message.Header.Properties.Add(propPair.Key, propPair.Value);
                    }
                }
            }
            catch(Exception)
            {
                throw;
            }
        }

        public static void ReadRmqMessageProperties(IBasicProperties messageProperties, Message message)
        {
            message.Header.AppId = messageProperties.AppId;
            message.Header.MessageId = messageProperties.MessageId;
            message.Header.GeneratedAtUtc = new DateTime(messageProperties.Timestamp.UnixTime);
            message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
            message.Header.IsPersistent = messageProperties.DeliveryMode == PersistentDeliveryMode;
            if (messageProperties.Headers.ContainsKey (SystemPropertiesKey))
            {
                var systemProperties = DeserializeMessageProperties ((byte[])messageProperties.Headers[SystemPropertiesKey]);
                if (systemProperties.ContainsKey (MessageNameKey))
                {
                    message.Header.MessageName = systemProperties[MessageNameKey];
                }
            }
            if (messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
            {
                var applicationProperties = DeserializeMessageProperties((byte[])messageProperties.Headers[ApplicationPropertiesKey]);
                foreach (var propPair in applicationProperties)
                {
                    message.Header.Properties.Add(propPair.Key, propPair.Value);
                }
            }
        }

        #endregion
        #region private methods        
        private static Dictionary<string, string> DeserializeMessageProperties(byte[] properties)
        {
            var serializer = new JsonMessageSerializer();
            var serializedText = serializer.Serialize(properties);
            return serializer.Deserialize<Dictionary<string, string>>(serializedText);
        }
        #endregion
}

Perform Selected Delete/Move on the Queue Messages

This method will take care of three scenarios:

  1. Move the selected message to the original queue: For this, we will look for the message marked with the "move" attribute. We will first deque a batch of messages and then match the message IDs with the provided messages. If there's a match, we will pick only those messages and publish to the original queue. Once published, we will ACK the message to be removed from the error queue. In case of an exception, we will NACK the message to be put back to the error queue.

  2. Delete selected messages: For this, we will look for messages marked with the "delete" attribute. We will first deque a batch of messages and then match the message IDs with the provided messages. If there's a match, we will pick only those messages and NACK them. This will remove them from error queue. 

  3. Push messages to the back of the queue: This scenario will happen when the user has not made a selection of "purge" or "move" for any queue message or messages. In this case, we will just publish these messages back to the same queue. This will push these messages to the back of the queue. We will be able to browse to the next set of messages. For this, we will match the batch of dequeued messages without any deletion of the move attribute. Then, we will publish to the same queue and ACK the message. This can be done in a transaction, as well, to prevent message loss.

Databases should be easy to deploy, easy to use, and easy to scale. If you agree, you should check out CockroachDB, a scalable SQL database built for businesses of every size. Check it out here. 

Topics:
rabbitmq ,queue management ,database ,tutorial ,client library ,c# ,queue messages

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}