diff --git a/dotnet/PublisherConfirms/PublisherConfirms.cs b/dotnet/PublisherConfirms/PublisherConfirms.cs index cd159e6..5fbb1a1 100644 --- a/dotnet/PublisherConfirms/PublisherConfirms.cs +++ b/dotnet/PublisherConfirms/PublisherConfirms.cs @@ -1,7 +1,7 @@ -using System.Buffers.Binary; +using RabbitMQ.Client; +using System.Buffers.Binary; using System.Diagnostics; using System.Text; -using RabbitMQ.Client; const ushort MAX_OUTSTANDING_CONFIRMS = 256; @@ -94,29 +94,36 @@ async Task PublishMessagesInBatchAsync() ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props); publishTasks.Add(publishTask); - // NOTE: [publishTasks] should be published after the final message has been added, - // even if the # of tasks it contains isn't equal to [batchSize]. - if (publishTasks.Count == batchSize || i+1 == MESSAGE_COUNT) - { - foreach (ValueTask pt in publishTasks) - { - try - { - await pt; - } - catch (Exception ex) - { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); - } - } - publishTasks.Clear(); - } + await MaybeAwaitPublishes(publishTasks, batchSize); } + // Await any remaining tasks in case message count was not + // evenly divisible by batch size. + await MaybeAwaitPublishes(publishTasks, 0); + sw.Stop(); Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms"); } +static async Task MaybeAwaitPublishes(List publishTasks, int batchSize) +{ + if (publishTasks.Count >= batchSize) + { + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); + } + } + publishTasks.Clear(); + } +} + async Task HandlePublishConfirmsAsynchronously() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");