rabbitmq-tutorials/javascript-nodejs-stream/offset_tracking_receive.js
Michele Agnello 514aa10587
fix: example beahviour (#467)
Co-authored-by: magne <magnello@coders51.com>
2024-08-26 12:14:41 +02:00

60 lines
1.8 KiB
JavaScript

const rabbit = require("rabbitmq-stream-js-client");
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
async function main() {
console.log("Connecting...");
const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: "guest",
password: "guest",
vhost: "/",
});
console.log("Making sure the stream exists...");
const streamName = "stream-offset-tracking-javascript";
await client.createStream({ stream: streamName, arguments: {} });
const consumerRef = "offset-tracking-tutorial";
let firstOffset = undefined;
let offsetSpecification = rabbit.Offset.first();
try {
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName });
offsetSpecification = rabbit.Offset.offset(offset + 1n);
} catch (e) {}
let lastOffset = offsetSpecification.value;
let messageCount = 0;
const consumer = await client.declareConsumer(
{ stream: streamName, offset: offsetSpecification, consumerRef },
async (message) => {
messageCount++;
if (!firstOffset && messageCount === 1) {
firstOffset = message.offset;
console.log("First message received");
}
if (messageCount % 10 === 0) {
await consumer.storeOffset(message.offset);
}
if (message.content.toString() === "marker") {
console.log("Marker found");
lastOffset = message.offset;
await consumer.storeOffset(message.offset);
await consumer.close(true);
}
}
);
console.log(`Start consuming...`);
await sleep(2000);
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
}
main()
.then(async () => process.exit(0))
.catch((res) => {
console.log("Error while receiving message!", res);
process.exit(-1);
});