Azure queues – updating messages

The default invisibility timeout of 30 seconds may not suffice on occasions. For those situations, you can change the timeout so that you get more time to do something with your message prior to deleting it.

void DeQueueMessagesWithTimeOut(std::chrono::seconds timeout)
{
  auto storage_account = cloud_storage_account::parse(
      U("UseDevelopmentStorage=true"));
  auto queue_client = storage_account.create_cloud_queue_client();
  auto queue = queue_client.get_queue_reference(
      U("sample"));
  bool created = queue.create_if_not_exists();

  queue.download_attributes();
  int count = queue.approximate_message_count();
  ucout << U("Approx count = ") << count << endl;

  queue_request_options options;
  operation_context op_context;
  auto messages = queue.get_messages(
      count, timeout, options, op_context);
  for (auto message : messages)
  {
    // process the message here
    queue.delete_message(message);
    ucout << message.content_as_string() << endl;
  }

  queue.download_attributes();
  count = queue.approximate_message_count();
  ucout << U("Approx count = ") << count << endl;
}

You can also update an existing message. The code snippet below shows how that can be done.

void UpdateQueueMessage()
{
  auto storage_account = cloud_storage_account::parse(
      U("UseDevelopmentStorage=true"));
  auto queue_client = storage_account.create_cloud_queue_client();
  auto queue = queue_client.get_queue_reference(
      U("sample"));
  bool created = queue.create_if_not_exists(); 

  auto message = queue.get_message();
  message.set_content(U("Updated content"));
  queue.update_message(
      message, std::chrono::seconds(0), true);
}

One thing to be aware of is that the position of the message in the queue is lost. The updated content goes back to the back of the queue. This shouldn’t matter though for most practical scenarios as the order of items in the queue is not expected to be depended on by consumers. This might be a little different to how you used queues in your programming classes, where it was a guaranteed FIFO structure. But remember, those queues did not allow in-place edits either.

Using Azure queues from C++

Azure queues are used to store a large number of items (referred to as messages). Each message can go up to 64 Kb. A typical use for queues is intra-app communication, for example your website might add messages to the queue for a background service to process later on. The C++ Azure storage SDK maintains its consistent API when it comes to queues as well (so if you know how to use blobs and tables, you are good). Here’s a code snippet that shows how to insert messages into a queue.

void InsertQueueMessages()
{
  auto storage_account = cloud_storage_account::parse(
      U("UseDevelopmentStorage=true"));
  auto queue_client = storage_account
      .create_cloud_queue_client();
  auto queue = queue_client.get_queue_reference(
      U("sample"));
  bool created = queue.create_if_not_exists();

  queue.add_message(cloud_queue_message(
      U("queue message 01")));
  queue.add_message(cloud_queue_message(
      U("queue message 02")));
  queue.add_message(cloud_queue_message(
      U("queue message 03")));
}

Like any decent queue, you can either peek or dequeue a message. Here’s an example snippet that shows how to peek into a queue.

void PeekQueueMessage()
{
  auto storage_account = cloud_storage_account::parse(
      U("UseDevelopmentStorage=true"));
  auto queue_client = storage_account
      .create_cloud_queue_client();
  auto queue = queue_client.get_queue_reference(
      U("sample"));
  bool created = queue.create_if_not_exists();

  queue.download_attributes();
  int count = queue.approximate_message_count();
  ucout << U("Approx count = ") << count << endl;

  // Peek the next 2 messages (assumes at least 2 present)
  auto messages = queue.peek_messages(2);
  for (auto message : messages)
  {
    ucout << message.content_as_string() << endl;
  }
}

The count method has ‘approximate‘ in the name because the count returned is not refreshed until the next call to download_attributes.

Here’s a code snippet that shows how to dequeue items from a queue.

void DeQueueMessages()
{
  auto storage_account = cloud_storage_account::parse(
      U("UseDevelopmentStorage=true"));
  auto queue_client = storage_account
      .create_cloud_queue_client();
  auto queue = queue_client.get_queue_reference(
      U("sample"));
  bool created = queue.create_if_not_exists();

  queue.download_attributes();
  int count = queue.approximate_message_count();
  ucout << U("Approx count = ") << count << endl;

  for (size_t i = 0; i < count; i++)
  {
    auto message = queue.get_message();
    queue.delete_message(message);
    ucout << message.content_as_string() << endl;
  }

  queue.download_attributes();
  count = queue.approximate_message_count();
  ucout << U("Approx count = ") << count << endl;
}

Notice how I call delete_message after calling get_message. This is because get_message only hides the item from other callers for a default time out (30 seconds) after which the message is back. This allows you to rollback changes in case something goes wrong. So you’d get a message, process it, and when you are done processing it, you can call delete_message. This assumes you do your processing in under 30 seconds. If you want to increase (or decrease) that time out, you can do that too. I’ll cover that in the next blog entry along with showing how to update a message in the queue.

Downloading blob content

I missed this in the previous blog entry. The SDK makes it really trivial to extract blob data. For text content you can directly get the text, and for non-text content you can either download to a file or to a stream. The code snippet below shows how it’s done.

void DownloadBlobData()
{
  auto storage_account = cloud_storage_account::parse(
      U("UseDevelopmentStorage=true"));
  auto blob_client = storage_account.create_cloud_blob_client();
  auto container = blob_client.get_container_reference(
      U("textdata"));
  bool created = container.create_if_not_exists();

  // Read the text content directly
  auto text_blob1 = container.get_block_blob_reference(
      U("texts/text1"));
  auto text = text_blob1.download_text();
  ucout << text << endl;

  // Download the blob data to a file
  auto text_blob2 = container.get_block_blob_reference(
      U("texts/text2"));
  text_blob2.download_to_file(
      U("d:\\tmp\\blobdata.txt"));

  // Download the blob data to an ostream
  stringstreambuf buffer;
  concurrency::streams::ostream output(buffer);
  text_blob2.download_to_stream(output);
  cout << buffer.collection() << endl;
}