channel(); $channel->queue_declare('rpc_queue', false, false, false, false); function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2); } echo " [x] Awaiting RPC requests\n"; $callback = function($req) { $n = intval($req->body); echo " [.] fib(", $n, ")\n"; $msg = new AMQPMessage( (string) fib($n), array('correlation_id' => $req->properties['correlation_id']) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->properties['reply_to']); $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>