From 4eaef68cbeba1ea53817f073667c18fc54c0965e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 9 Jul 2024 16:18:20 +0200 Subject: [PATCH] Start stream tutorial 2 for .NET --- .../OffsetTrackingSend/OffsetTrackingSend.cs | 31 +++++++++++++++++++ .../OffsetTrackingSend.csproj | 14 +++++++++ go-stream/offset_tracking_send.go | 2 +- .../src/main/java/OffsetTrackingSend.java | 2 +- 4 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.cs create mode 100644 dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.csproj diff --git a/dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.cs b/dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.cs new file mode 100644 index 0000000..9fe16d2 --- /dev/null +++ b/dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.cs @@ -0,0 +1,31 @@ +using System.Text; +using System.Threading.Tasks; +using RabbitMQ.Stream.Client; +using RabbitMQ.Stream.Client.Reliable; + +var streamSystem = await StreamSystem.Create(new StreamSystemConfig()); + +var stream = "stream-offset-tracking-dotnet"; +await streamSystem.CreateStream(new StreamSpec(stream)); + +var messageCount = 100; +var confirmedCde = new CountdownEvent(messageCount); +var producer = await Producer.Create(new ProducerConfig(streamSystem, stream) { + ConfirmationHandler = async confirmation => { + if (confirmation.Status == ConfirmationStatus.Confirmed) { + confirmedCde.Signal(); + } + await Task.CompletedTask.ConfigureAwait(false); + } +}); + +Console.WriteLine("Publishing {0} messages...", messageCount); +for (int i = 0; i < messageCount; i++) { + var body = i == messageCount - 1 ? "marker" : "hello"; + await producer.Send(new Message(Encoding.UTF8.GetBytes(body))); +} + +confirmedCde.Wait(); +Console.WriteLine("Messages confirmed."); +await producer.Close(); +await streamSystem.Close(); diff --git a/dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.csproj b/dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.csproj new file mode 100644 index 0000000..f700c37 --- /dev/null +++ b/dotnet-stream/OffsetTrackingSend/OffsetTrackingSend.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/go-stream/offset_tracking_send.go b/go-stream/offset_tracking_send.go index 0314e46..84d2358 100644 --- a/go-stream/offset_tracking_send.go +++ b/go-stream/offset_tracking_send.go @@ -32,7 +32,7 @@ func main() { ch := make(chan bool) handlePublishConfirm(chPublishConfirm, messageCount, ch) - fmt.Printf("Publishing %d messages\n", messageCount) + fmt.Printf("Publishing %d messages...\n", messageCount) for i := 0; i < messageCount; i++ { var body string if i == messageCount-1 { diff --git a/java-stream-mvn/src/main/java/OffsetTrackingSend.java b/java-stream-mvn/src/main/java/OffsetTrackingSend.java index c2724db..11ec162 100644 --- a/java-stream-mvn/src/main/java/OffsetTrackingSend.java +++ b/java-stream-mvn/src/main/java/OffsetTrackingSend.java @@ -19,7 +19,7 @@ public class OffsetTrackingSend { int messageCount = 100; CountDownLatch confirmedLatch = new CountDownLatch(messageCount); - System.out.printf("Publishing %d messages%n", messageCount); + System.out.printf("Publishing %d messages...%n", messageCount); IntStream.range(0, messageCount).forEach(i -> { String body = i == messageCount - 1 ? "marker" : "hello"; producer.send(producer.messageBuilder().addData(body.getBytes(UTF_8)).build(),