add nodejs-stream offset-tracking tutorials

This commit is contained in:
magne 2024-08-08 11:33:43 +02:00
parent db0a9ce16b
commit 0d25d323d0
4 changed files with 107 additions and 8 deletions

View File

@ -0,0 +1,61 @@
const rabbit = require("rabbitmq-stream-js-client");
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
async function main() {
const streamName = "stream-offset-tracking-javascript";
console.log("Connecting...");
const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: "guest",
password: "guest",
vhost: "/",
});
const consumerRef = "offset-tracking-tutorial";
let firstOffset = undefined;
let offsetSpecification = rabbit.Offset.first();
try {
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName });
firstOffset = offset + 1n;
offsetSpecification = rabbit.Offset.offset(firstOffset);
} catch (e) {}
let lastOffset = offsetSpecification.value;
let messageCount = 0;
const consumer = await client.declareConsumer(
{ stream: streamName, offset: offsetSpecification, consumerRef },
async (message) => {
messageCount++;
if (!firstOffset && messageCount === 1) {
firstOffset = message.offset;
console.log("First message received");
}
if (messageCount % 10 === 0) {
console.log("Storing offset");
await consumer.storeOffset(message.offset);
}
if (message.content.toString() === "marker") {
console.log("Marker found");
lastOffset = message.offset;
await consumer.storeOffset(message.offset);
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
await consumer.close(true);
}
}
);
console.log(`Start consuming...`);
await sleep(2000);
}
main()
.then(async () => {
await new Promise(function () {});
})
.catch((res) => {
console.log("Error while receiving message!", res);
process.exit(-1);
});

View File

@ -0,0 +1,36 @@
const rabbit = require("rabbitmq-stream-js-client");
async function main() {
console.log("Connecting...");
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "localhost",
username: "guest",
password: "guest",
});
console.log("Making sure the stream exists...");
const streamName = "stream-offset-tracking-javascript";
await client.createStream({ stream: streamName, arguments: {} });
console.log("Creating the publisher...");
const publisher = await client.declarePublisher({ stream: streamName });
const messageCount = 100;
console.log(`Publishing ${messageCount} messages`);
for (let i = 0; i < messageCount; i++) {
const body = i === messageCount - 1 ? "marker" : `hello ${i}`;
await publisher.send(Buffer.from(body));
}
console.log("Closing the connection...");
await client.close();
}
main()
.then(() => console.log("done!"))
.catch((res) => {
console.log("Error in publishing message!", res);
process.exit(-1);
});

View File

@ -8,7 +8,7 @@
"name": "rabbitmq-stream-node-tutorial",
"version": "1.0.0",
"dependencies": {
"rabbitmq-stream-js-client": "^0.3.1"
"rabbitmq-stream-js-client": "^0.4.1"
}
},
"node_modules/lru-cache": {
@ -23,9 +23,9 @@
}
},
"node_modules/rabbitmq-stream-js-client": {
"version": "0.3.1",
"resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.3.1.tgz",
"integrity": "sha512-x2xfH+otHquRNPzzClWMuMa2njvqgbrG0YRY/AE51aL0PFCXlv0NZ9OXR7Y73X63+9kUzoYLDWBX4bw1rTfX8Q==",
"version": "0.4.1",
"resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.4.1.tgz",
"integrity": "sha512-Dny3vFup/TQMcWXIKQUl3hdQQC1/ixeUEf4uEgzvwaFK/dIaUhsBT4J7i0mD581TUbCNhXFw4uWEXle9bXdmtA==",
"dependencies": {
"semver": "^7.5.4"
}
@ -60,9 +60,9 @@
}
},
"rabbitmq-stream-js-client": {
"version": "0.3.1",
"resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.3.1.tgz",
"integrity": "sha512-x2xfH+otHquRNPzzClWMuMa2njvqgbrG0YRY/AE51aL0PFCXlv0NZ9OXR7Y73X63+9kUzoYLDWBX4bw1rTfX8Q==",
"version": "0.4.1",
"resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.4.1.tgz",
"integrity": "sha512-Dny3vFup/TQMcWXIKQUl3hdQQC1/ixeUEf4uEgzvwaFK/dIaUhsBT4J7i0mD581TUbCNhXFw4uWEXle9bXdmtA==",
"requires": {
"semver": "^7.5.4"
}

View File

@ -3,10 +3,12 @@
"version": "1.0.0",
"description": "Tutorial for the nodejs RabbitMQ stream client",
"scripts": {
"offset-tracking-publish": "node offset_tracking_send.js",
"offset-tracking-receive": "node offset_tracking_receive.js",
"send": "node send.js",
"receive": "node receive.js"
},
"dependencies": {
"rabbitmq-stream-js-client": "^0.3.1"
"rabbitmq-stream-js-client": "^0.4.1"
}
}