Intro

There are many ways to interact with Azure Storage Queue; one of them is from C# code using the Windows Azure Storage library. This client library enables working with the Microsoft Azure storage services, which include the blob and file services for storing binary and text data, the table service for storing structured non-relational data, and the queue service for storing messages that may be accessed by a client. In this post I will show how to work with the Azure Storage Queue service.

Prerequisites

Refer to previous posts to get started with Azure Storage Queue Service.

Code

Create a new console application and install the Azure Storage library using NuGet from the Package Manager Console:

Install-Package WindowsAzure.Storage

Get Azure Storage Account keys using PowerShell:

PS C:\WINDOWS\system32> Get-AzureRmStorageAccountKey -ResourceGroupName Default -Name default748347

KeyName Value                                                                                    Permissions
------- -----                                                                                    -----------
key1    YmSWW+PHrBH9CrMlEV9hcY2d9k4rhBReFjlKk171Aa8074liULNLdUL+RhFkaPQNNJ+lkA1AGGjXfLfOMJgy0w==        FULL
key2    /1JI+XVkghkaW/JqlnmVAXe7Zo/DRs/U8B8qoqATKnPSoqjh+vh3vjm+624rJv5uHtL8l0HUUnMq/yuqCheZ8w==        FULL

Use one of these keys and the storage account name to create a connection string for the default748347 storage account:

var connectionString = @"DefaultEndpointsProtocol=https;AccountName=default748347;AccountKey=YmSWW+PHrBH9CrMlEV9hcY2d9k4rhBReFjlKk171Aa8074liULNLdUL+RhFkaPQNNJ+lkA1AGGjXfLfOMJgy0w==";

The connection string consists of three segments:

  • DefaultEndpointsProtocol - we use https to communicate with the storage account
  • AccountName - storage account name
  • AccountKey - one of the keys from above
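Hard-coding an account key in source code makes it easy to leak, so one option is to assemble the same three segments from configuration at run time. The following is a minimal sketch, assuming the account name and key are stored in environment variables. The variable names STORAGE_ACCOUNT_NAME and STORAGE_ACCOUNT_KEY and the StorageConnection helper are hypothetical and are not part of the Azure Storage library:

```csharp
using System;

public static class StorageConnection
{
    // Hypothetical environment variable names, chosen for this sketch only.
    public const string AccountNameVariable = "STORAGE_ACCOUNT_NAME";
    public const string AccountKeyVariable = "STORAGE_ACCOUNT_KEY";

    // Joins the three segments (protocol, account name, account key)
    // into a single connection string.
    public static string Build(string accountName, string accountKey)
    {
        if (string.IsNullOrWhiteSpace(accountName))
            throw new ArgumentException("Account name is required.", nameof(accountName));
        if (string.IsNullOrWhiteSpace(accountKey))
            throw new ArgumentException("Account key is required.", nameof(accountKey));

        return "DefaultEndpointsProtocol=https;"
             + $"AccountName={accountName};"
             + $"AccountKey={accountKey}";
    }

    // Reads both values from the environment; Build throws if either is missing.
    public static string FromEnvironment()
    {
        return Build(
            Environment.GetEnvironmentVariable(AccountNameVariable),
            Environment.GetEnvironmentVariable(AccountKeyVariable));
    }
}
```

The string returned by StorageConnection.FromEnvironment() can be passed to CloudStorageAccount.Parse in the same way as a literal connection string.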

Execute the following code in a console application:

var connectionString = @"DefaultEndpointsProtocol=https;AccountName=default748347;AccountKey=YmSWW+PHrBH9CrMlEV9hcY2d9k4rhBReFjlKk171Aa8074liULNLdUL+RhFkaPQNNJ+lkA1AGGjXfLfOMJgy0w==";
var storageAccount = CloudStorageAccount.Parse(connectionString);
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference("test");
Console.WriteLine("Exists : {0}", queue.Exists());

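The code is straightforward. We create a CloudStorageAccount object from connectionString, use it to create a queue client, and ask the client for a reference to a queue named test. GetQueueReference only builds a local reference, so the queue doesn't exist yet and Exists() returns false.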

To create the queue if it doesn't exist yet, add the following line:

queue.CreateIfNotExists();

Or delete it if it exists:

queue.DeleteIfExists();

Ordinary Create and Delete operations are also available. It is also possible to Clear the queue, which removes all of its messages but keeps the queue itself.
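The unconditional operations can be sketched as follows. This is only an illustration: the QueueLifecycle class, the CreateClearDelete method and the queue name scratch are hypothetical, and the code needs the WindowsAzure.Storage package and a real storage account to run:

```csharp
using System;
using Microsoft.WindowsAzure.Storage.Queue;

public static class QueueLifecycle
{
    // Hypothetical helper: walks one throwaway queue through
    // Create, Clear and Delete.
    public static void CreateClearDelete(CloudQueueClient queueClient)
    {
        // "scratch" is an assumed queue name used only for this sketch.
        var scratch = queueClient.GetQueueReference("scratch");

        scratch.Create();   // unconditional create
        Console.WriteLine("Exists after Create: {0}", scratch.Exists());

        scratch.Clear();    // removes all messages; the queue itself remains
        Console.WriteLine("Exists after Clear: {0}", scratch.Exists());

        scratch.Delete();   // unconditional delete
        Console.WriteLine("Exists after Delete: {0}", scratch.Exists());
    }
}
```

Unlike DeleteIfExists, a plain Delete is expected to fail with a StorageException when the queue is missing, so the conditional variants are usually more convenient in setup and teardown code.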

These lines will add a message to the queue:

var message = new CloudQueueMessage("test");
queue.AddMessage(message);

To read a message from the front of the queue without removing it, do the following:

var peekedMessage = queue.PeekMessage();
Console.WriteLine(peekedMessage.AsString);

This code reads only visible messages.

To read the message from the queue and remove it:

var message = queue.GetMessage();
Console.WriteLine(message.AsString);
queue.DeleteMessage(message);

GetMessage also reads only visible messages. A message returned from GetMessage becomes invisible to any other code reading messages from this queue. By default, the message stays invisible for 30 seconds. DeleteMessage removes the message from the queue completely.
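When processing may take longer than the default 30 seconds, GetMessage accepts an optional visibilityTimeout argument. The sketch below assumes a two-minute timeout; the SlowConsumer class, the ProcessOne method and the handle callback are hypothetical names, and the code needs the WindowsAzure.Storage package and a real queue to run:

```csharp
using System;
using Microsoft.WindowsAzure.Storage.Queue;

public static class SlowConsumer
{
    // Hypothetical helper: processes at most one message and reports
    // whether a message was available.
    public static bool ProcessOne(CloudQueue queue, Action<string> handle)
    {
        // Hide the message from other readers for two minutes
        // instead of the default 30 seconds.
        var message = queue.GetMessage(TimeSpan.FromMinutes(2));
        if (message == null)
        {
            return false; // no visible message right now
        }

        handle(message.AsString);

        // Delete only after the handler has finished successfully.
        queue.DeleteMessage(message);
        return true;
    }
}
```

If handle throws, DeleteMessage is never reached, so the message becomes visible again once the timeout elapses and another reader can pick it up.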

The following code throws a NullReferenceException:

queue.Clear();
var message1 = new CloudQueueMessage("test");
queue.AddMessage(message1);

var message2 = queue.GetMessage();
Console.WriteLine(message2.AsString);

var peekedMessage = queue.PeekMessage();
Console.WriteLine(peekedMessage.AsString);

Once message2 has been retrieved from the queue it becomes invisible, so PeekMessage returns null and reading peekedMessage.AsString throws.

This code works fine:

queue.Clear();
var message1 = new CloudQueueMessage("test");
queue.AddMessage(message1);

var message2 = queue.GetMessage();
Console.WriteLine(message2.AsString);

Thread.Sleep(TimeSpan.FromSeconds(30));

var peekedMessage = queue.PeekMessage();
Console.WriteLine(peekedMessage.AsString);

After 30 seconds the message becomes visible again.

The AddMessage method has optional parameters that change the behaviour of the message. For example, timeToLive is the second parameter:

var message1 = new CloudQueueMessage("test");
queue.AddMessage(message1, TimeSpan.FromSeconds(1));

Thread.Sleep(TimeSpan.FromSeconds(2));

var message2 = queue.GetMessage();
Console.WriteLine(message2.AsString);

This code throws a NullReferenceException: we instructed the queue to expire the message after 1 second, so the call to GetMessage returns null and message2.AsString fails.

The third parameter of AddMessage is initialVisibilityDelay. Consider the following example:

var message1 = new CloudQueueMessage("test");
queue.AddMessage(message1, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10));

Thread.Sleep(TimeSpan.FromSeconds(2));

var message2 = queue.GetMessage();
if (message2 != null)
{
    Console.WriteLine($"After 2 seconds: {message2.AsString}");
}

Thread.Sleep(TimeSpan.FromSeconds(10));

message2 = queue.GetMessage();
if (message2 != null)
{
    Console.WriteLine($"After 10 seconds: {message2.AsString}");
}

It outputs After 10 seconds: test. The message is invisible for the first 10 seconds, so GetMessage returns null after 2 seconds and the first if block is skipped. After waiting another 10 seconds the delay has passed and the message can be retrieved from the queue.

The following code dequeues the message multiple times and shows all info about the message:

var message1 = new CloudQueueMessage(DateTime.Now.Ticks.ToString());
queue.AddMessage(message1);

while (true)
{
    var message2 = queue.GetMessage();
    if (message2 == null)
    {
        // The message is still invisible after the last update; wait and retry
        Thread.Sleep(TimeSpan.FromSeconds(1));
        continue;
    }

    Console.WriteLine($"InsertionTime: {message2.InsertionTime}, ExpirationTime: {message2.ExpirationTime}, NextVisibleTime: {message2.NextVisibleTime}, Content: {message2.AsString}, PopReceipt: {message2.PopReceipt}, DequeueCount: {message2.DequeueCount}");
    message2.SetMessageContent(DateTime.Now.Ticks.ToString());
    queue.UpdateMessage(message2, TimeSpan.FromSeconds(1), MessageUpdateFields.Visibility | MessageUpdateFields.Content);

    Thread.Sleep(TimeSpan.FromSeconds(1));
}

All message properties are read-only, but it is possible to set the message content and visibility using dedicated methods. DequeueCount is incremented automatically each time the message is dequeued. InsertionTime and ExpirationTime can't be changed after the message has been added to the queue; the only way to change them is to delete the message and add it again with different parameters. Another limitation is that you always need to specify visibilityTimeout and MessageUpdateFields.Visibility, otherwise you will get an exception:

"System.ArgumentException: Calls to UpdateMessage must include the Visibility flag."

To read all messages from the queue in a loop, consider the following technique:

for (var i = 0; i < 100; i++)
{
    queue.AddMessage(new CloudQueueMessage($"message-{i}"));
}

CloudQueueMessage message;
while((message = queue.GetMessage()) != null)
{
    Console.WriteLine(message.AsString);
    queue.DeleteMessage(message);
}

Console.WriteLine("All Messages Processed!");

It is also possible to read messages in batches. Consider the following example:

for (var i = 0; i < 100; i++)
{
    queue.AddMessage(new CloudQueueMessage($"message-{i}"));
}

var count = 0;
IEnumerable<CloudQueueMessage> messages;
while ((messages = queue.GetMessages(10)) != null && messages.Count() != 0)
{
    Console.WriteLine("-- Batch Start --");
    foreach (var message in messages)
    {
        Console.WriteLine(message.AsString);
        queue.DeleteMessage(message);
        count++;
    }

    Console.WriteLine("-- Batch End --");
}

Console.WriteLine("Processed {0} messages", count);
Console.WriteLine("All Messages Processed!");

This code reads all messages from the queue in batches and processes them. It stops when there are no more messages in the queue. It is not possible to read more than 32 messages in a single batch, so keep this in mind when working with queues.
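The 32-message limit can be guarded in code before calling GetMessages. Below is a minimal sketch that assumes the valid range for a batch is 1 to 32; the QueueBatching class and its ClampBatchSize and MinCallsNeeded methods are hypothetical helpers, not part of the Azure Storage library:

```csharp
using System;

public static class QueueBatching
{
    // Largest batch a single GetMessages call may request, as stated above.
    public const int MaxBatchSize = 32;

    // Forces a requested batch size into the assumed valid range 1..32.
    public static int ClampBatchSize(int requested)
    {
        return Math.Max(1, Math.Min(requested, MaxBatchSize));
    }

    // Lower bound on the number of GetMessages calls needed to read
    // 'total' messages; a real queue may return fewer messages per call.
    public static int MinCallsNeeded(int total, int requestedBatchSize)
    {
        if (total <= 0)
        {
            return 0;
        }

        var size = ClampBatchSize(requestedBatchSize);
        return (total + size - 1) / size; // integer ceiling division
    }
}
```

For example, queue.GetMessages(QueueBatching.ClampBatchSize(50)) requests 32 messages instead of passing an out-of-range value, and reading the 100 messages from the example above takes at least 4 calls at that size.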

The following example shows how the producer-consumer pattern can be implemented for basic tests:

var messagesCount = 10;
var workersCount = 3;
Parallel.For(0, messagesCount, i =>
{
    // Create work items in parallel, to simulate random input from many producers
    Console.WriteLine($"Add message-{i}");
    queue.AddMessage(new CloudQueueMessage($"message-{i}"));
});

var workers = new List<Task>();
for (var i = 0; i < workersCount; i++)
{
    var workerId = i; // copy the loop variable so each lambda captures its own id
    // Random is not thread-safe, so each worker gets its own instance
    var random = new Random((int)DateTime.Now.Ticks + workerId);

    // Task.Run returns a task that completes only when the async lambda finishes
    workers.Add(Task.Run(async () =>
    {
        CloudQueueMessage message;
        while ((message = queue.GetMessage()) != null)
        {
            Console.WriteLine($"T{workerId}: Processing {message.AsString}");
            await Task.Delay(TimeSpan.FromSeconds(random.Next(10)));
            queue.DeleteMessage(message);
        }

        Console.WriteLine($"T{workerId}: All Messages Processed!");
    }));
}

// Block until every worker has drained the queue
Task.WaitAll(workers.ToArray());
Console.ReadLine();

This code adds 10 messages to the queue, and 3 workers process them one by one. It is possible to increase the number of workers to process messages faster. This example shows the concept of horizontal scaling: the more workers you have, the faster the work gets done.

Summary

In this post I have shown a few samples of communication with Azure Storage Queue. I used C# and the Azure Storage library to accomplish basic tasks. Azure Storage Queue is a simple yet powerful service.

