diff --git a/ruby/rpc_client.rb b/ruby/rpc_client.rb new file mode 100755 index 0000000..91d8c24 --- /dev/null +++ b/ruby/rpc_client.rb @@ -0,0 +1,66 @@ +#!/usr/bin/env ruby +# encoding: utf-8 + +# Note: This is just proof of concept. For +# real-world usage, you are strongly advised +# to use https://github.com/ruby-amqp/rpc +# or some other RPC library. + +require "amqp" + +class FibonacciRpcClient + def initialize + subscribe_to_callback_queue + end + + def connection + @connection ||= AMQP.connect(:host => "localhost") + end + + def channel + @channel ||= AMQP::Channel.new(self.connection) + end + + def callback_queue + @callback_queue ||= self.channel.queue("", :exclusive => true) + end + + def requests + @requests ||= Hash.new + end + + def call(n, &block) + corr_id = rand(10_000_000).to_s + self.requests[corr_id] = nil + self.callback_queue.append_callback(:declare) do + AMQP::Exchange.default.publish(n.to_s, :routing_key => "rpc_queue", :reply_to => self.callback_queue.name, :correlation_id => corr_id) + + EM.add_periodic_timer(0.1) do + # p self.requests + if result = self.requests[corr_id] + block.call(result.to_i) + EM.stop + end + end + end + end + + private + def subscribe_to_callback_queue + self.callback_queue.subscribe do |header, body| + corr_id = header.correlation_id + unless self.requests[corr_id] + self.requests[corr_id] = body + end + end + end +end + +EM.run do + fibonacci_rpc = FibonacciRpcClient.new() + + puts " [x] Requesting fib(30)" + fibonacci_rpc.call(30) do |response| + puts " [.] Got #{response}" + end +end diff --git a/ruby/rpc_server.rb b/ruby/rpc_server.rb new file mode 100755 index 0000000..c7f6837 --- /dev/null +++ b/ruby/rpc_server.rb @@ -0,0 +1,28 @@ +#!/usr/bin/env ruby +# encoding: utf-8 + +require "amqp" + +def fib(n) + return n if n == 0 || n == 1 + return fib(n - 1) + fib(n - 2) +end + +AMQP.start(:host => "localhost") do |connection| + channel = AMQP::Channel.new(connection) + queue = channel.queue("rpc_queue") + + channel.prefetch(1) + + queue.subscribe(:ack => true) do |header, body| + n = body.to_i + + puts " [.] fib(#{n})" + response = fib(n) + + AMQP::Exchange.default.publish(response.to_s, :routing_key => header.reply_to, :correlation_id => header.correlation_id) + header.ack + end + + puts " [x] Awaiting RPC requests" +end