From 0d25d323d0cde38506eb58752f7a691f818519a0 Mon Sep 17 00:00:00 2001 From: magne Date: Thu, 8 Aug 2024 11:33:43 +0200 Subject: [PATCH] add nodejs-stream offset-tracking tutorials --- .../offset_tracking_receive.js | 61 +++++++++++++++++++ .../offset_tracking_send.js | 36 +++++++++++ javascript-nodejs-stream/package-lock.json | 14 ++--- javascript-nodejs-stream/package.json | 4 +- 4 files changed, 107 insertions(+), 8 deletions(-) create mode 100644 javascript-nodejs-stream/offset_tracking_receive.js create mode 100644 javascript-nodejs-stream/offset_tracking_send.js diff --git a/javascript-nodejs-stream/offset_tracking_receive.js b/javascript-nodejs-stream/offset_tracking_receive.js new file mode 100644 index 0000000..0985418 --- /dev/null +++ b/javascript-nodejs-stream/offset_tracking_receive.js @@ -0,0 +1,61 @@ +const rabbit = require("rabbitmq-stream-js-client"); + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +async function main() { + const streamName = "stream-offset-tracking-javascript"; + + console.log("Connecting..."); + const client = await rabbit.connect({ + hostname: "localhost", + port: 5552, + username: "guest", + password: "guest", + vhost: "/", + }); + + const consumerRef = "offset-tracking-tutorial"; + let firstOffset = undefined; + let offsetSpecification = rabbit.Offset.first(); + try { + const offset = await client.queryOffset({ reference: consumerRef, stream: streamName }); + firstOffset = offset + 1n; + offsetSpecification = rabbit.Offset.offset(firstOffset); + } catch (e) {} + + let lastOffset = offsetSpecification.value; + let messageCount = 0; + const consumer = await client.declareConsumer( + { stream: streamName, offset: offsetSpecification, consumerRef }, + async (message) => { + messageCount++; + if (!firstOffset && messageCount === 1) { + firstOffset = message.offset; + console.log("First message received"); + } + if (messageCount % 10 === 0) { + console.log("Storing offset"); + await consumer.storeOffset(message.offset); + } + if (message.content.toString() === "marker") { + console.log("Marker found"); + lastOffset = message.offset; + await consumer.storeOffset(message.offset); + console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`); + await consumer.close(true); + } + } + ); + + console.log(`Start consuming...`); + await sleep(2000); +} + +main() + .then(async () => { + await new Promise(function () {}); + }) + .catch((res) => { + console.log("Error while receiving message!", res); + process.exit(-1); + }); diff --git a/javascript-nodejs-stream/offset_tracking_send.js b/javascript-nodejs-stream/offset_tracking_send.js new file mode 100644 index 0000000..5211387 --- /dev/null +++ b/javascript-nodejs-stream/offset_tracking_send.js @@ -0,0 +1,36 @@ +const rabbit = require("rabbitmq-stream-js-client"); + +async function main() { + console.log("Connecting..."); + const client = await rabbit.connect({ + vhost: "/", + port: 5552, + hostname: "localhost", + username: "guest", + password: "guest", + }); + + console.log("Making sure the stream exists..."); + const streamName = "stream-offset-tracking-javascript"; + await client.createStream({ stream: streamName, arguments: {} }); + + console.log("Creating the publisher..."); + const publisher = await client.declarePublisher({ stream: streamName }); + + const messageCount = 100; + console.log(`Publishing ${messageCount} messages`); + for (let i = 0; i < messageCount; i++) { + const body = i === messageCount - 1 ? "marker" : `hello ${i}`; + await publisher.send(Buffer.from(body)); + } + + console.log("Closing the connection..."); + await client.close(); +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("Error in publishing message!", res); + process.exit(-1); + }); diff --git a/javascript-nodejs-stream/package-lock.json b/javascript-nodejs-stream/package-lock.json index 30d2569..96b1091 100644 --- a/javascript-nodejs-stream/package-lock.json +++ b/javascript-nodejs-stream/package-lock.json @@ -8,7 +8,7 @@ "name": "rabbitmq-stream-node-tutorial", "version": "1.0.0", "dependencies": { - "rabbitmq-stream-js-client": "^0.3.1" + "rabbitmq-stream-js-client": "^0.4.1" } }, "node_modules/lru-cache": { @@ -23,9 +23,9 @@ } }, "node_modules/rabbitmq-stream-js-client": { - "version": "0.3.1", - "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.3.1.tgz", - "integrity": "sha512-x2xfH+otHquRNPzzClWMuMa2njvqgbrG0YRY/AE51aL0PFCXlv0NZ9OXR7Y73X63+9kUzoYLDWBX4bw1rTfX8Q==", + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.4.1.tgz", + "integrity": "sha512-Dny3vFup/TQMcWXIKQUl3hdQQC1/ixeUEf4uEgzvwaFK/dIaUhsBT4J7i0mD581TUbCNhXFw4uWEXle9bXdmtA==", "dependencies": { "semver": "^7.5.4" } @@ -60,9 +60,9 @@ } }, "rabbitmq-stream-js-client": { - "version": "0.3.1", - "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.3.1.tgz", - "integrity": "sha512-x2xfH+otHquRNPzzClWMuMa2njvqgbrG0YRY/AE51aL0PFCXlv0NZ9OXR7Y73X63+9kUzoYLDWBX4bw1rTfX8Q==", + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.4.1.tgz", + "integrity": "sha512-Dny3vFup/TQMcWXIKQUl3hdQQC1/ixeUEf4uEgzvwaFK/dIaUhsBT4J7i0mD581TUbCNhXFw4uWEXle9bXdmtA==", "requires": { "semver": "^7.5.4" } diff --git a/javascript-nodejs-stream/package.json b/javascript-nodejs-stream/package.json index d7fcb4c..3d1a354 100644 --- a/javascript-nodejs-stream/package.json +++ b/javascript-nodejs-stream/package.json @@ -3,10 +3,12 @@ "version": "1.0.0", "description": "Tutorial for the nodejs RabbitMQ stream client", "scripts": { + "offset-tracking-publish": "node offset_tracking_send.js", + "offset-tracking-receive": "node offset_tracking_receive.js", "send": "node send.js", "receive": "node receive.js" }, "dependencies": { - "rabbitmq-stream-js-client": "^0.3.1" + "rabbitmq-stream-js-client": "^0.4.1" } }