Start stream tutorial 2 for .NET

This commit is contained in:
Arnaud Cogoluègnes 2024-07-09 16:18:20 +02:00
parent 2cf71a3d70
commit 4eaef68cbe
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
4 changed files with 47 additions and 2 deletions

View File

@ -0,0 +1,31 @@
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
var streamSystem = await StreamSystem.Create(new StreamSystemConfig());
var stream = "stream-offset-tracking-dotnet";
await streamSystem.CreateStream(new StreamSpec(stream));
var messageCount = 100;
var confirmedCde = new CountdownEvent(messageCount);
var producer = await Producer.Create(new ProducerConfig(streamSystem, stream) {
ConfirmationHandler = async confirmation => {
if (confirmation.Status == ConfirmationStatus.Confirmed) {
confirmedCde.Signal();
}
await Task.CompletedTask.ConfigureAwait(false);
}
});
Console.WriteLine("Publishing {0} messages...", messageCount);
for (int i = 0; i < messageCount; i++) {
var body = i == messageCount - 1 ? "marker" : "hello";
await producer.Send(new Message(Encoding.UTF8.GetBytes(body)));
}
confirmedCde.Wait();
Console.WriteLine("Messages confirmed.");
await producer.Close();
await streamSystem.Close();

View File

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="RabbitMQ.Stream.Client" Version="1.8.7" />
</ItemGroup>
</Project>

View File

@ -32,7 +32,7 @@ func main() {
ch := make(chan bool)
handlePublishConfirm(chPublishConfirm, messageCount, ch)
fmt.Printf("Publishing %d messages\n", messageCount)
fmt.Printf("Publishing %d messages...\n", messageCount)
for i := 0; i < messageCount; i++ {
var body string
if i == messageCount-1 {

View File

@ -19,7 +19,7 @@ public class OffsetTrackingSend {
int messageCount = 100;
CountDownLatch confirmedLatch = new CountDownLatch(messageCount);
System.out.printf("Publishing %d messages%n", messageCount);
System.out.printf("Publishing %d messages...%n", messageCount);
IntStream.range(0, messageCount).forEach(i -> {
String body = i == messageCount - 1 ? "marker" : "hello";
producer.send(producer.messageBuilder().addData(body.getBytes(UTF_8)).build(),