Avoid somewhat complicated if logic.

This commit is contained in:
Luke Bakken 2025-02-05 11:07:15 -08:00
parent f60fa9fbfe
commit f8095d8690
No known key found for this signature in database
GPG Key ID: D99DE30E43EAE440

View File

@ -1,7 +1,7 @@
using System.Buffers.Binary;
using RabbitMQ.Client;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Text;
using RabbitMQ.Client;
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
@ -94,29 +94,36 @@ async Task PublishMessagesInBatchAsync()
ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props);
publishTasks.Add(publishTask);
// NOTE: [publishTasks] should be published after the final message has been added,
// even if the # of tasks it contains isn't equal to [batchSize].
if (publishTasks.Count == batchSize || i+1 == MESSAGE_COUNT)
{
foreach (ValueTask pt in publishTasks)
{
try
{
await pt;
}
catch (Exception ex)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
}
}
publishTasks.Clear();
}
await MaybeAwaitPublishes(publishTasks, batchSize);
}
// Await any remaining tasks in case message count was not
// evenly divisible by batch size.
await MaybeAwaitPublishes(publishTasks, 0);
sw.Stop();
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
}
static async Task MaybeAwaitPublishes(List<ValueTask> publishTasks, int batchSize)
{
if (publishTasks.Count >= batchSize)
{
foreach (ValueTask pt in publishTasks)
{
try
{
await pt;
}
catch (Exception ex)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
}
}
publishTasks.Clear();
}
}
async Task HandlePublishConfirmsAsynchronously()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");