Playing with Azure Event Hub
One developer shares with us his experience in poking around Azure Event Hub and helps in getting started with using it.
Join the DZone community and get the full member experience.
Join For FreeI've recently been playing with the Azure Event Hub. This is basically a way of transmitting large amounts* of data between systems. In a later post, I may try and test these limits by designing some kind of game based on this.
As a quick disclaimer, it's worth bearing in mind that I am playing with this technology, and so much of the content of this post can be found in the links at the bottom of this post — you won't find anything original here, just a record of my findings. You may find more (and more accurate) information in those.
The first step, as with many Azure services, is to create a namespace:

For a healthy amount of data transference, you'll pay around £10 per month.

Finally, we'll create event hub within the namespace:

When you create the event hub, it asks how many partitions you need. This basically splits the message delivery, and it's clever enough to work out that if you have 3 partitions and two listeners you should have two slots, and one listener needs only one slot:

We'll need an access policy so that we have permission to listen:

We'll need to create two applications: a producer and a consumer.
Let's start with a producer. Create a new console app and add this NuGet library.
Here's the code:
class Program {
private static EventHubClient eventHubClient;
private
const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Publisher;SharedAccessKey=key;EntityPath=pcm-eventhub1";
private
const string EhEntityPath = "pcm-eventhub1";
public static async Task Main(string[] args) {
EventHubsConnectionStringBuilder connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString) {
EntityPath = EhEntityPath
};
eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
while (true) {
Console.Write("Please enter message to send: ");
string message = Console.ReadLine();
if (string.IsNullOrWhiteSpace(message)) break;
await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
}
await eventHubClient.CloseAsync();
Console.WriteLine("Press ENTER to exit.");
Console.ReadLine();
}
}
Consumer
Next we'll create a consumer, so the first thing we'll need is to grant permissions for listening:

We'll create a second new console application with this same library and the processor library, too.
class Program {
private
const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Listener;SharedAccessKey=key;EntityPath=pcm-eventhub1";
private
const string EhEntityPath = "pcm-eventhub1";
private
const string StorageContainerName = "eventhub";
private
const string StorageAccountName = "pcmeventhubstorage";
private
const string StorageAccountKey = "key";
private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
static async Task Main(string[] args) {
Console.WriteLine("Registering EventProcessor...");
var eventProcessorHost = new EventProcessorHost(
EhEntityPath,
PartitionReceiver.DefaultConsumerGroupName,
EhConnectionString,
StorageConnectionString,
StorageContainerName);
// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync < EventsProcessor > ();
Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();
// Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
}
}
class EventsProcessor: IEventProcessor {
public Task CloseAsync(PartitionContext context, CloseReason reason) {
Console.WriteLine($ "Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
}
public Task OpenAsync(PartitionContext context) {
Console.WriteLine($ "SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
}
public Task ProcessErrorAsync(PartitionContext context, Exception error) {
Console.WriteLine($ "Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable < EventData > messages) {
foreach(var eventData in messages) {
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($ "Message received. Partition: '{context.PartitionId}', Data: '{data}'");
}
return context.CheckpointAsync();
}
}
As you can see, we can now transmit data through the Event Hub into client applications:

Footnotes
*Large, in terms of frequency, rather than volume — for example, transmitting a small message twice a second, rather than uploading a petabyte of data.
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-sendPublished at DZone with permission of Paul Michaels, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments