refactored rpc_server.js

This commit is contained in:
Giuseppe Privitera 2015-08-06 16:51:52 +01:00
parent bd9f582bf3
commit 38411ac07f

View File

@ -1,42 +1,32 @@
#!/usr/bin/env node
var amqp = require('amqplib');
var amqp = require('amqplib/callback_api');
var conn = amqp.connect('amqp://localhost');
conn.then(createChannel).then(null, console.warn);
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'rpc_queue';
function createChannel(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(consume);
}
function consume(ch) {
var ok = ch.assertQueue('rpc_queue', {durable: false});
ok = ok.then(function() {
ch.assertQueue(q, {durable: false});
ch.prefetch(1);
return ch.consume('rpc_queue', reply);
});
return ok.then(function(_ignore) {
console.log(' [x] Awaiting RPC requests');
});
ch.consume(q, function reply(msg) {
var n = parseInt(msg.content.toString());
var r = fibonacci(n);
function reply(msg) {
var n = parseInt(msg.content.toString());
console.log(' [.] fib(%d)', n);
var response = fib(n);
ch.sendToQueue( msg.properties.replyTo,
new Buffer(response.toString()),
console.log(" [.] fib(%d)", n);
ch.sendToQueue(msg.properties.replyTo,
new Buffer(r.toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
}
}
function fib(n) {
if(n == 0)
return 0;
else if(n == 1)
return 1;
ch.ack(msg);
});
});
});
function fibonacci(n) {
if (n == 0 || n == 1)
return n;
else
return fib(n-1) + fib(n-2);
return fibonacci(n - 1) + fibonacci(n - 2);
}