connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' ); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false ); $this->channel->basic_consume( $this->callback_queue, '', false, true, false, false, array( $this, 'onResponse' ) ); } public function onResponse($rep) { if ($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->getBody(); } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array( 'correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue ) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while (!$this->response) { $this->channel->wait(); } return intval($this->response); } } $fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo ' [.] Got ', $response, "\n"; ?>