IEventProcessor not reading from Event Hub

I am implementing an Event Hub reader using EventProcessorHost and a simple IEventProcessor implementation. Using Paolo Salvatori's excellent Service Bus Explorer, I have confirmed that telemetry data is being written into the Event Hub. I have configured the EventProcessorHost to use a storage account for leases and checkpoints, and I can see the Event Hub data files in that storage account. The problem is that the IEventProcessor implementation does not appear to read anything from the Event Hub.

I am not receiving any exceptions, and the test console app connects to the storage account without issue. However, the logging statement I added to the SimpleEventProcessor constructor never executes, so it looks like the processor is never actually created. I feel like I am missing something simple. Can anyone help me determine what I have missed? Thank you!

IEventProcessor Implementation:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus.Messaging;

    namespace Receiver
    {
        internal class SimpleEventProcessor : IEventProcessor
        {
            // Tracks the time elapsed since the last checkpoint.
            private Stopwatch _checkPointStopwatch;

            public SimpleEventProcessor()
            {
                Console.WriteLine("SimpleEventProcessor created");
            }

            #region Implementation of IEventProcessor

            public Task OpenAsync(PartitionContext context)
            {
                Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'",
                    context.Lease.PartitionId, context.Lease.Offset);
                _checkPointStopwatch = new Stopwatch();
                _checkPointStopwatch.Start();
                return Task.FromResult<object>(null);
            }

            public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                // Decode each event body as UTF-8 text and print it.
                foreach (var data in messages.Select(eventData => Encoding.UTF8.GetString(eventData.GetBytes())))
                {
                    Console.WriteLine("Message received. Partition: '{0}', Data: '{1}'",
                        context.Lease.PartitionId, data);
                }

                // Checkpoint at most once every 30 seconds.
                if (_checkPointStopwatch.Elapsed > TimeSpan.FromSeconds(30))
                {
                    await context.CheckpointAsync();
                    _checkPointStopwatch.Restart();
                }
            }

            public async Task CloseAsync(PartitionContext context, CloseReason reason)
            {
                Console.WriteLine("Processor shutting down. Partition '{0}', Reason: {1}",
                    context.Lease.PartitionId, reason);

                // Save progress only on a clean shutdown.
                if (reason == CloseReason.Shutdown)
                {
                    await context.CheckpointAsync();
                }
            }

            #endregion
        }
    }

Test Console Code:

    using System;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus.Messaging;
    using Receiver; // SimpleEventProcessor (internal, so assumed to be in the same assembly)

    namespace EventHubTestConsole
    {
        internal class Program
        {
            private static void Main(string[] args)
            {
                // AsyncPump is not part of the framework; it is assumed to be
                // a helper class defined elsewhere in the project.
                AsyncPump.Run((Func<Task>) MainAsync);
            }

            private static async Task MainAsync()
            {
                const string eventHubConnectionString = "Endpoint=<EH endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
                const string eventHubName = "<event hub name>";
                const string storageAccountName = "<storage account name>";
                const string storageAccountKey = "<valid storage key>";

                var storageConnectionString = string.Format(
                    "DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                    storageAccountName, storageAccountKey);
                Console.WriteLine("Connecting to storage account with ConnectionString: {0}", storageConnectionString);

                var eventProcessorHostName = Guid.NewGuid().ToString();
                var eventProcessorHost = new EventProcessorHost(
                    eventProcessorHostName,
                    eventHubName,
                    EventHubConsumerGroup.DefaultGroupName,
                    eventHubConnectionString,
                    storageConnectionString);

                var epo = new EventProcessorOptions
                {
                    MaxBatchSize = 100,
                    PrefetchCount = 1,
                    ReceiveTimeOut = TimeSpan.FromSeconds(20),
                    InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
                };
                epo.ExceptionReceived += OnExceptionReceived;

                await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo);

                Console.WriteLine("Receiving. Please enter to stop worker.");
                Console.ReadLine();
            }

            public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
            {
                Console.WriteLine("Event Hub exception received: {0}", args.Exception.Message);
            }
        }
    }

Category:c# Views:3 Time:2019-03-12
