Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Playing with Azure Event Hub

DZone's Guide to

Playing with Azure Event Hub

One developer shares with us his experience in poking around Azure Event Hub and helps in getting started with using it.

· Cloud Zone ·
Free Resource

Discover a centralized approach to monitor your virtual infrastructure, on-premise IT environment, and cloud infrastructure – all on a single platform.

I've recently been playing with the Azure Event Hub. This is basically a way of transmitting large amounts* of data between systems. In a later post, I may try and test these limits by designing some kind of game based on this.

As a quick disclaimer, it's worth bearing in mind that I am playing with this technology, and so much of the content of this post can be found in the links at the bottom of this post — you won't find anything original here, just a record of my findings. You may find more (and more accurate) information in those.

The first step, as with many Azure services, is to create a namespace:

For a healthy amount of data transference, you'll pay around £10 per month.

Finally, we'll create event hub within the namespace:

When you create the event hub, it asks how many partitions you need. This basically splits the message delivery, and it's clever enough to work out that if you have 3 partitions and two listeners you should have two slots, and one listener needs only one slot:

We'll need an access policy so that we have permission to listen:

We'll need to create two applications: a producer and a consumer.

Let's start with a producer. Create a new console app and add this NuGet library.

Here's the code:

class Program {
 private static EventHubClient eventHubClient;
 private
 const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Publisher;SharedAccessKey=key;EntityPath=pcm-eventhub1";
 private
 const string EhEntityPath = "pcm-eventhub1";

 public static async Task Main(string[] args) {
  EventHubsConnectionStringBuilder connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString) {
   EntityPath = EhEntityPath
  };

  eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

  while (true) {
   Console.Write("Please enter message to send: ");
   string message = Console.ReadLine();
   if (string.IsNullOrWhiteSpace(message)) break;

   await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
  }

  await eventHubClient.CloseAsync();

  Console.WriteLine("Press ENTER to exit.");
  Console.ReadLine();
 }
}

Consumer

Next we'll create a consumer, so the first thing we'll need is to grant permissions for listening:

We'll create a second new console application with this same library and the processor library, too.

class Program {
 private
 const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Listener;SharedAccessKey=key;EntityPath=pcm-eventhub1";
 private
 const string EhEntityPath = "pcm-eventhub1";
 private
 const string StorageContainerName = "eventhub";
 private
 const string StorageAccountName = "pcmeventhubstorage";
 private
 const string StorageAccountKey = "key";

 private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);

 static async Task Main(string[] args) {
  Console.WriteLine("Registering EventProcessor...");

  var eventProcessorHost = new EventProcessorHost(
   EhEntityPath,
   PartitionReceiver.DefaultConsumerGroupName,
   EhConnectionString,
   StorageConnectionString,
   StorageContainerName);

  // Registers the Event Processor Host and starts receiving messages
  await eventProcessorHost.RegisterEventProcessorAsync < EventsProcessor > ();

  Console.WriteLine("Receiving. Press ENTER to stop worker.");
  Console.ReadLine();

  // Disposes of the Event Processor Host
  await eventProcessorHost.UnregisterEventProcessorAsync();
 }
}

class EventsProcessor: IEventProcessor {
 public Task CloseAsync(PartitionContext context, CloseReason reason) {
  Console.WriteLine($ "Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
  return Task.CompletedTask;
 }

 public Task OpenAsync(PartitionContext context) {
  Console.WriteLine($ "SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
  return Task.CompletedTask;
 }

 public Task ProcessErrorAsync(PartitionContext context, Exception error) {
  Console.WriteLine($ "Error on Partition: {context.PartitionId}, Error: {error.Message}");
  return Task.CompletedTask;
 }

 public Task ProcessEventsAsync(PartitionContext context, IEnumerable < EventData > messages) {
  foreach(var eventData in messages) {
   var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
   Console.WriteLine($ "Message received. Partition: '{context.PartitionId}', Data: '{data}'");
  }

  return context.CheckpointAsync();
 }
}

As you can see, we can now transmit data through the Event Hub into client applications:

Footnotes

*Large, in terms of frequency, rather than volume — for example, transmitting a small message twice a second, rather than uploading a petabyte of data.

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-send

Learn how to auto-discover your containers and monitor their performance, capture Docker host and container metrics to allocate host resources, and provision containers.

Topics:
azure ,event ,hub ,cloud ,integration ,c# ,code ,producer application ,consumer application

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}