DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Last call! Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • Feature Flags in .NET 8 and Azure
  • Implementing CRUD Operations With NLP Using Microsoft.Extensions.AI
  • Obfuscation vs Encryption: How to Protect Your .NET Code the Right Way
  • Zero to AI Hero, Part 3: Unleashing the Power of Agents in Semantic Kernel

Trending

  • Building Scalable and Resilient Data Pipelines With Apache Airflow
  • Power BI Embedded Analytics — Part 2: Power BI Embedded Overview
  • *You* Can Shape Trend Reports: Join DZone's Software Supply Chain Security Research
  • Scalable System Design: Core Concepts for Building Reliable Software
  1. DZone
  2. Coding
  3. Languages
  4. Managing Queue Messages Using RabbitMQ Client

Managing Queue Messages Using RabbitMQ Client

In this article, I am going to talk about how we can use the RabbitMQ Client library to manage queue messages and provide code snippets to explain the same.

By 
Abhijit Mandal user avatar
Abhijit Mandal
·
May. 08, 18 · Tutorial
Likes (3)
Comment
Save
Tweet
Share
27.1K Views

Join the DZone community and get the full member experience.

Join For Free

Often, we encounter issues with error queues or dead letter queues being flooded with messages. And since they are the lowest priority for any application, the general tendency is to forget about these queues. However, these queues are as important as any other queues and we need to periodically clean the messages from these queues or they will eat up the space allotted to RabbitMQ.

Now, when we think of cleaning error queue messages, what are the options we have? We can’t just blindly delete all the messages there, as some of them might be important. For example, in a critical scenario like banking, a message not processed successfully by the batch process can lead to landing that message in the error queue. The error may be due to transient failures (e.g. network error, database deadlock, service unavailable, etc.), so we need to rerun that message again from the error queue. We should be able to view the message before making the decision to purge or move the message back to the original queue for processing. Also, there should be a functionality to selectively purge/move messages or bulk purge/move messages. So, to manage any error queue message or any queue message, we should be looking for these features:

  • Delete/purge all messages from the queue.
  • Selectively delete/purge messages from the queue.
  • Move all messages from the queue to the original queue (or any other queue).
  • Selectively move messages from the queue to the original queue (or any other queue).

Implementation

Using AMQPS protocol has some limitations regarding the information we can get and operations we can perform using the RabbitMQ client. However, RabbitMQ has an HTTP Client that enables you to perform all operations, but the network security team in most of the organizations won’t be opening that firewall port for you for security reasons.

I will be using the RabbitMQ Client for this article. The RabbitMQ .NET client is an implementation of an AMQP 0-9-1 client library for C# (and, implicitly, other .NET languages). Starting with the 4.0 release, it supports .NET Core and .NET 4.5.1+.

The library is open-source and is dual-licensed under the Apache License v2 and the Mozilla Public License v1.1. The current release of the RabbitMQ .NET/C# client library is 5.0.1. It is recommended to use the NuGet package. Release notes can be found on the GitHub releases page.

User Interface

For the Move All and Purge All options, we are looking for this interface:

Image title

We should create a JSON file containing the Error Queue Name and the Original Queue Name along with the application and application group. The JSON will look something like this:

{
        "ApplicationGroup": "Group1",
        "ApplicationName": "Application1",
        "OriginalQueueName": "OriginalQueue1",           
        "ErrorQueueName": "Queue1"          
}

Clicking on the Queue Name will take us to another screen that will allow us to selectively delete or move the messages after viewing.

Image title

Code Snippets

Let’s walk through the code snippets for the methods we need to perform the required operations:

View All Queue Info

This method will get all the message count information from RabbitMQ. This will return a list of message objects. The code is straightforward:

[Serializable]
    public class Message
    {        
      private readonly string serializableBody;
      public Message ( string serializableBody )
      {
        this.serializableBody = serializableBody;
      }
      public MessageHeader Header
      {
        get;
        private set;
      }
      public string MessageBody
      {
        get 
        {
          return this.serializableBody;
        }           
      }
      public byte[] GetBody()
      {
        return (byte[])((object)Encoding.ASCII.GetBytes(this.MessageBody));              
      }
}

[Serializable]
    public class MessageHeader
    {
        public MessageHeader()
        {
            this.MessageId = Guid.NewGuid().ToString();
            this.Properties = new Dictionary<string, string>();
            this.IsPersistent = true;
        }
        public string AppId { get; set; }   
        public string MessageId { get; set; }
        public string MessageName { get; set; }
        public DateTime GeneratedAtUtc { get; set; }
        public string ExpirationInMilliseconds { get; set; }
        public bool IsPersistent { get; set; }
        public bool Delete { get; set; }
        public bool Move { get; set; }  
        public IDictionary<string, string> Properties { get; private set; }

    }

  public class QueueInfoModel
  {        
    public string Environment { get; set; }
    public string ApplicationGroup { get; set; }
    public string ApplicationName { get; set; }
    public string ErrorQueueName { get; set; 
    public string OriginalQueueName { get; set; }  
    public int MessageCount { get; set;}
  }

public class RabbitMQQueueManager
{
  private readonly string environment;
  private readonly log4net.ILog logger; 
  private const string DELETE_KEY = "delete";
  private const string MOVE_KEY = "move";
  private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
  private const string ERROR_QUEUENAME_KEY = "errorQueueName";
  private const string EXCHANGE_KEY = "originalQueueExchange";
  private const ushort PREFETCH_SIZE = 50;
  public List<QueueInfoModel> GetAllQueueData ( string appgroup = "", string appname = "", string queuename = "" )
{
    List<QueueInfoModel> queueDetails;
    List<QueueInfoModel> queueInfoList = new List<QueueInfoModel> ();
    try
    {
      queueDetails = ReadAllQueueInfoFromJsonConfig ();
      if (!string.IsNullOrEmpty (appgroup))
      {
        queueDetails = queueDetails.Where (x => x.ApplicationGroup.Contains (appgroup, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      if (!string.IsNullOrEmpty (appname))
      {
        queueDetails = queueDetails.Where (x => x.ApplicationName.Contains (appname, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      if (!string.IsNullOrEmpty (queuename))
      {
        queueDetails = queueDetails.Where (x => x.ErrorQueueName.Contains (queuename, StringComparison.CurrentCultureIgnoreCase) || x.OriginalQueueName.Contains (queuename, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      using (var connection = GetRabbitMqConnection ())
      {
        using (var channel = connection.CreateModel ())
        {
          foreach (var queueDetail in queueDetails)
          {
            try
            {
              queueDetail.MessageCount = (int)channel.MessageCount (queueDetail.ErrorQueueName);
            }
            catch (Exception ex)
            {
              this.logger.Error (string.Format ("Error Occured while getting message count for the queue {0}, skipping this queue", queueDetail.ErrorQueueName), ex);
            }                            
            queueInfoList.Add (queueDetail);                           
          }
        }
      }
    }
    catch (Exception)
    {
      throw;
    }
    finally
    {
      queueDetails = null;
    }
    return queueInfoList;
  }
  private List<QueueInfoModel> ReadAllQueueInfoFromJsonConfig ()
  {
    var fullpath = AppDomain.CurrentDomain.BaseDirectory + @"\ErrorMessageQueues.Json";
    var queueDetails = JsonConvert.DeserializeObject<List<QueueInfoModel>> (File.ReadAllText (fullpath));

    return queueDetails;
  }

  private IConnection GetRabbitMqConnection ()
  {
    try
    {               
      ConnectionFactory connectionFactory = new ConnectionFactory ();
      connectionFactory.Uri = "Your URI";
      connectionFactory.Protocol = Protocols.DefaultProtocol;
      connectionFactory.AutomaticRecoveryEnabled = true;
      IConnection conn = connectionFactory.CreateConnection ();
      return conn;
    }
    catch (Exception)
    {               
      throw;
    }
}
}

Delete/Purge All Messages

This is another straightforward operation. The RabbitMQ client provides a purge method.

using (var connection = GetRabbitMqConnection ())
{
  using (var channel = connection.CreateModel ())
  {
    channel.QueuePurge (queueName);
  }
}

Move All Messages From the Queue to Original Queue (or Any Other Queue)

In this operation, we will be reading through each message in the queue and then publishing them to the destination queue. We need to take care of any exception cases that may result in message loss. The RabbitMQ Client does not provide peek functionality out-of-the-box, so we will first read the message and publish it to the destination queue. Then, we will ACK (acknowledge) the message to be removed from the queue. In case of any exception in publishing the message, we need to NACK (no acknowledge) the message. This will put the message back in the error queue.

public List<string> MoveAllMessagesToQueue ( string errorQueueName, string originalQueueName )
{
  QueueingBasicConsumer consumer = null;            
  List<string> messageList = new List<string> ();
  BasicDeliverEventArgs result = null;
  using (var rmqConnection = GetRabbitMqConnection ())
  {
    using (var channel = rmqConnection.CreateModel ())
    {
      try
      {
        var queueMessageCount = (ushort)channel.MessageCount (errorQueueName);
        //Check if =queue message count is less than prefetch count, 
        //if yes then set the prefetch count to queue message count.
        var pfCount = queueMessageCount >= PREFETCH_SIZE ? PREFETCH_SIZE : queueMessageCount;

        channel.BasicQos (0, pfCount, false);
        consumer = new QueueingBasicConsumer (channel);
        channel.BasicConsume (errorQueueName, false, consumer);
        for (int i = 0; i < queueMessageCount; i++)
        {
          if (!channel.IsOpen)
          {
            throw new ApplicationException ("Channel is closed");
          }
          result = consumer.Queue.Dequeue ();
          try
          {
            channel.BasicPublish (string.Empty, originalQueueName, true, result.BasicProperties, result.Body);
            channel.BasicAck (result.DeliveryTag, false);                                
            messageList.Add (result.BasicProperties.MessageId);
          }
          catch (Exception ex)
          {
            ////Nack the message in case of any exception while reading the message.
            channel.BasicNack (result.DeliveryTag, false, true);
            this.logger.Warn ("Error Occured while performing delete operation for message ID: " + result.BasicProperties.MessageId, ex);
          }
        }
      }
      catch (Exception)
      {
        ////Nack the message back to queue in case of exception
        if (result != null)
        {
          channel.BasicNack (result.DeliveryTag, false, true);
        }
        throw;
      }
    }
  }

  this.logger.Info (string.Format ("Successfully moved all messages from error queue {0} to destination queue {1}", errorQueueName, originalQueueName));

  return messageList;
}

Get Messages From the Queue for Viewing

This method will be called once we click on any queue to see the messages. For this, we will deque each message and NACK them to be put back to the queue. This is equivalent to PEEK functionality, but since the RabbitMQ Client does not provide this out-of-the-box, we have to do this workaround. We only need to view this data and then decide the operation we need to perform (purge/move). Also, we are passing the number of messages we need to view — this will be helpful in paging.

public List<Message> GetMessagesFromQueueNoAck ( string queueName, int messageCount = -1 )
        {
            QueueingBasicConsumer consumer = null;          
            var responseMessages = new List<Message> ();
            BasicDeliverEventArgs result = null;
            using (var rmqConnection = GetRabbitMqConnection ())
            {
                using (var channel = rmqConnection.CreateModel ())
                {
                    try
                    {
                        var queueMessageCount = (int)channel.MessageCount (queueName);
                        var queueInfo = GetAllQueueData ("", "", queueName).FirstOrDefault ();
                        var count = messageCount > -1 ? messageCount <= queueMessageCount ? messageCount : queueMessageCount : queueMessageCount;
                        var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
                        channel.BasicQos (0, (ushort)pfCount, false);
                        consumer = new QueueingBasicConsumer (channel);
                        channel.BasicConsume (queueName, false, consumer);
                        for (int i = 0; i < pfCount; i++)
                        {
                            if (!channel.IsOpen)
                            {
                                throw new ApplicationException ("Channel is closed");
                            }
                            result = consumer.Queue.Dequeue ();
                            try
                            {
                                string messageData = System.Text.Encoding.UTF8.GetString (result.Body);
                                var rMessage = new Message (messageData);
                                RmqHeaderHandler.ReadRmqMessageProperties (result.BasicProperties, rMessage);
                                channel.BasicNack (result.DeliveryTag, false, true);
                                ////Set Message properties
                                Type t = queueInfo.GetType ();
                                foreach (PropertyInfo pi in t.GetProperties ())
                                {
                                    rMessage.Header.Properties.Add (pi.Name, pi.GetValue (queueInfo, null).ToString ());
                                }
                                responseMessages.Add (rMessage);
                            }
                            catch (Exception ex)
                            {
                                ////Nack the message in case of any exception while reading the message.
                                channel.BasicNack (result.DeliveryTag, false, true);
                                this.logger.Warn ("Error Occured while getting message for message ID" + result.BasicProperties.MessageId, ex);
                            }
                        }
                    }
                    catch (Exception)
                    {
                        ////Nack the message back to queue in case of exception
                        if (result != null)
                        {
                            channel.BasicNack (result.DeliveryTag, false, true);
                        }
                        throw;
                    }
                }
            }
            return responseMessages;
        }

public static class RmqHeaderHandler
    {
        private const byte NonPersistentDeliveryMode = 1;
        private const byte PersistentDeliveryMode = 2;
        private const string SecurityTokenKey = "SecurityToken";
        private const string Properties = "properties";
        private const string MessageNameKey = "MessageName";
        private const string SystemPropertiesKey = "SystemProperties";
        private const string ApplicationPropertiesKey = "ApplicationProperties";
        #region Public Methods
        public static void ReadDynamicMessageProperties(dynamic messageProperties, Message message)
        {
            try
            {                   
                message.Header.AppId = messageProperties.appId;
                message.Header.MessageId = messageProperties.messageId;  
                message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
                message.Header.ExpirationInMilliseconds = messageProperties.expirationInMilliseconds;
                message.Header.IsPersistent = messageProperties.isPersistent;
                message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
                message.Header.Move = Convert.ToBoolean(messageProperties.move);
                if (messageProperties.ContainsKey ("messageName"))
                {
                    message.Header.MessageName = messageProperties.messageName;
                }                
                if (messageProperties.ContainsKey("properties"))
                {  
                    var customProperties = Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>(Convert.ToString(messageProperties.properties));
                    foreach (var propPair in customProperties)
                    {
                        message.Header.Properties.Add(propPair.Key, propPair.Value);
                    }
                }
            }
            catch(Exception)
            {
                throw;
            }
        }

        public static void ReadRmqMessageProperties(IBasicProperties messageProperties, Message message)
        {
            message.Header.AppId = messageProperties.AppId;
            message.Header.MessageId = messageProperties.MessageId;
            message.Header.GeneratedAtUtc = new DateTime(messageProperties.Timestamp.UnixTime);
            message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
            message.Header.IsPersistent = messageProperties.DeliveryMode == PersistentDeliveryMode;
            if (messageProperties.Headers.ContainsKey (SystemPropertiesKey))
            {
                var systemProperties = DeserializeMessageProperties ((byte[])messageProperties.Headers[SystemPropertiesKey]);
                if (systemProperties.ContainsKey (MessageNameKey))
                {
                    message.Header.MessageName = systemProperties[MessageNameKey];
                }
            }
            if (messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
            {
                var applicationProperties = DeserializeMessageProperties((byte[])messageProperties.Headers[ApplicationPropertiesKey]);
                foreach (var propPair in applicationProperties)
                {
                    message.Header.Properties.Add(propPair.Key, propPair.Value);
                }
            }
        }

        #endregion
        #region private methods        
        private static Dictionary<string, string> DeserializeMessageProperties(byte[] properties)
        {
            var serializer = new JsonMessageSerializer();
            var serializedText = serializer.Serialize(properties);
            return serializer.Deserialize<Dictionary<string, string>>(serializedText);
        }
        #endregion
}

Perform Selected Delete/Move on the Queue Messages

This method will take care of three scenarios:

  1. Move the selected message to the original queue: For this, we will look for the message marked with the "move" attribute. We will first deque a batch of messages and then match the message IDs with the provided messages. If there's a match, we will pick only those messages and publish to the original queue. Once published, we will ACK the message to be removed from the error queue. In case of an exception, we will NACK the message to be put back to the error queue.

  2. Delete selected messages: For this, we will look for messages marked with the "delete" attribute. We will first deque a batch of messages and then match the message IDs with the provided messages. If there's a match, we will pick only those messages and NACK them. This will remove them from error queue. 

  3. Push messages to the back of the queue: This scenario will happen when the user has not made a selection of "purge" or "move" for any queue message or messages. In this case, we will just publish these messages back to the same queue. This will push these messages to the back of the queue. We will be able to browse to the next set of messages. For this, we will match the batch of dequeued messages without any deletion of the move attribute. Then, we will publish to the same queue and ACK the message. This can be done in a transaction, as well, to prevent message loss.

.NET

Opinions expressed by DZone contributors are their own.

Related

  • Feature Flags in .NET 8 and Azure
  • Implementing CRUD Operations With NLP Using Microsoft.Extensions.AI
  • Obfuscation vs Encryption: How to Protect Your .NET Code the Right Way
  • Zero to AI Hero, Part 3: Unleashing the Power of Agents in Semantic Kernel

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!