60 lines
1.8 KiB
JavaScript
60 lines
1.8 KiB
JavaScript
const rabbit = require("rabbitmq-stream-js-client");
|
|
|
|
/**
 * Returns a promise that is resolved by a timer after the given delay.
 *
 * @param {number} ms - Delay handed to `setTimeout`, in milliseconds.
 * @returns {Promise<void>} Promise that fulfills (with no value) once the timer fires.
 */
const sleep = (ms) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
|
|
|
|
/**
 * Consumes the tutorial stream, resuming after the offset previously stored
 * for this consumer reference, and tracks its progress on the server.
 *
 * Flow:
 *  1. Connect to the stream port on localhost and make sure the stream exists.
 *  2. Ask the server for the offset stored under `consumerRef`; when one is
 *     returned, start at the following offset, otherwise start at the first one.
 *  3. While consuming, store the offset every 10 messages; when a message
 *     whose content is "marker" arrives, store its offset and close the consumer.
 *  4. Wait 2 seconds, then print the first and last offsets seen.
 *
 * @returns {Promise<void>} Resolves after the final summary has been printed.
 *   Rejects if connecting, creating the stream, or declaring the consumer fails.
 */
async function main() {
  console.log("Connecting...");
  const client = await rabbit.connect({
    hostname: "localhost",
    port: 5552,
    username: "guest",
    password: "guest",
    vhost: "/",
  });

  console.log("Making sure the stream exists...");
  const streamName = "stream-offset-tracking-javascript";
  await client.createStream({ stream: streamName, arguments: {} });

  const consumerRef = "offset-tracking-tutorial";
  let firstOffset;
  let offsetSpecification = rabbit.Offset.first();
  try {
    const storedOffset = await client.queryOffset({ reference: consumerRef, stream: streamName });
    // Resume with the message right after the last one recorded as processed.
    offsetSpecification = rabbit.Offset.offset(storedOffset + 1n);
  } catch (err) {
    // Best effort: keep the "start from the first message" default, but report
    // why the lookup failed instead of hiding it.
    // NOTE(review): presumably the usual cause is that no offset has been
    // stored yet for this reference - confirm against the client documentation.
    const reason = err instanceof Error ? err.message : err;
    console.log(`Could not read a stored offset for "${consumerRef}", starting from the first message:`, reason);
  }

  // NOTE(review): `value` is whatever the offset specification exposes; it may
  // be undefined for Offset.first() - confirm against the client library.
  let lastOffset = offsetSpecification.value;
  let messageCount = 0;
  const consumer = await client.declareConsumer(
    { stream: streamName, offset: offsetSpecification, consumerRef },
    async (message) => {
      messageCount++;
      // Explicit undefined check: offsets are BigInt values and 0n is falsy,
      // so a truthiness test is the wrong tool for "not recorded yet".
      if (firstOffset === undefined && messageCount === 1) {
        firstOffset = message.offset;
        console.log("First message received");
      }
      // Persist progress periodically rather than on every message.
      if (messageCount % 10 === 0) {
        await consumer.storeOffset(message.offset);
      }
      if (message.content.toString() === "marker") {
        console.log("Marker found");
        lastOffset = message.offset;
        // Record the marker's offset before closing the consumer.
        await consumer.storeOffset(message.offset);
        await consumer.close(true);
      }
    }
  );

  console.log("Start consuming...");
  // Fixed consumption window; the summary is printed whether or not the marker was seen.
  await sleep(2000);
  console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
}
|
|
|
|
// Script entry point: run main() and turn its outcome into the process exit
// status (0 when it completes, -1 after logging the error when it fails).
(async () => {
  try {
    await main();
    process.exit(0);
  } catch (failure) {
    console.log("Error while receiving message!", failure);
    process.exit(-1);
  }
})();
|