Update ruby tutorial six
This commit is contained in:
parent
d241706976
commit
1f9dbc5a1b
@ -1,66 +1,70 @@
|
||||
#!/usr/bin/env ruby
|
||||
# encoding: utf-8
|
||||
|
||||
require "bunny"
|
||||
require "thread"
|
||||
|
||||
conn = Bunny.new(:automatically_recover => false)
|
||||
conn.start
|
||||
|
||||
ch = conn.create_channel
|
||||
|
||||
require 'bunny'
|
||||
require 'thread'
|
||||
|
||||
class FibonacciClient
|
||||
attr_reader :reply_queue
|
||||
attr_accessor :response, :call_id
|
||||
attr_reader :lock, :condition
|
||||
attr_accessor :call_id, :response, :lock, :condition, :connection,
|
||||
:channel, :server_queue_name, :reply_queue, :exchange
|
||||
|
||||
def initialize(ch, server_queue)
|
||||
@ch = ch
|
||||
@x = ch.default_exchange
|
||||
def initialize(server_queue_name)
|
||||
@connection = Bunny.new(automatically_recover: false)
|
||||
@connection.start
|
||||
|
||||
@server_queue = server_queue
|
||||
@reply_queue = ch.queue("", :exclusive => true)
|
||||
@channel = connection.create_channel
|
||||
@exchange = channel.default_exchange
|
||||
@server_queue_name = server_queue_name
|
||||
|
||||
setup_reply_queue
|
||||
end
|
||||
|
||||
@lock = Mutex.new
|
||||
def call(n)
|
||||
@call_id = generate_uuid
|
||||
|
||||
exchange.publish(n.to_s,
|
||||
routing_key: server_queue_name,
|
||||
correlation_id: call_id,
|
||||
reply_to: reply_queue.name)
|
||||
|
||||
# wait for the signal to continue the execution
|
||||
lock.synchronize { condition.wait(lock) }
|
||||
|
||||
response
|
||||
end
|
||||
|
||||
def stop
|
||||
channel.close
|
||||
connection.close
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def setup_reply_queue
|
||||
@lock = Mutex.new
|
||||
@condition = ConditionVariable.new
|
||||
that = self
|
||||
that = self
|
||||
@reply_queue = channel.queue('', exclusive: true)
|
||||
|
||||
@reply_queue.subscribe do |delivery_info, properties, payload|
|
||||
reply_queue.subscribe do |_delivery_info, properties, payload|
|
||||
if properties[:correlation_id] == that.call_id
|
||||
that.response = payload.to_i
|
||||
that.lock.synchronize{that.condition.signal}
|
||||
|
||||
# sends the signal to continue the execution of #call
|
||||
that.lock.synchronize { that.condition.signal }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def call(n)
|
||||
self.call_id = self.generate_uuid
|
||||
|
||||
@x.publish(n.to_s,
|
||||
:routing_key => @server_queue,
|
||||
:correlation_id => call_id,
|
||||
:reply_to => @reply_queue.name)
|
||||
|
||||
lock.synchronize{condition.wait(lock)}
|
||||
response
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def generate_uuid
|
||||
# very naive but good enough for code
|
||||
# examples
|
||||
# very naive but good enough for code examples
|
||||
"#{rand}#{rand}#{rand}"
|
||||
end
|
||||
end
|
||||
|
||||
client = FibonacciClient.new('rpc_queue')
|
||||
|
||||
client = FibonacciClient.new(ch, "rpc_queue")
|
||||
puts " [x] Requesting fib(30)"
|
||||
puts ' [x] Requesting fib(30)'
|
||||
response = client.call(30)
|
||||
|
||||
puts " [.] Got #{response}"
|
||||
|
||||
ch.close
|
||||
conn.close
|
||||
client.stop
|
||||
|
@ -1,51 +1,52 @@
|
||||
#!/usr/bin/env ruby
|
||||
# encoding: utf-8
|
||||
|
||||
require "bunny"
|
||||
|
||||
conn = Bunny.new(:automatically_recover => false)
|
||||
conn.start
|
||||
|
||||
ch = conn.create_channel
|
||||
require 'bunny'
|
||||
|
||||
class FibonacciServer
|
||||
|
||||
def initialize(ch)
|
||||
@ch = ch
|
||||
def initialize
|
||||
@connection = Bunny.new
|
||||
@connection.start
|
||||
@channel = @connection.create_channel
|
||||
end
|
||||
|
||||
def start(queue_name)
|
||||
@q = @ch.queue(queue_name)
|
||||
@x = @ch.default_exchange
|
||||
@queue = channel.queue(queue_name)
|
||||
@exchange = channel.default_exchange
|
||||
subscribe_to_queue
|
||||
end
|
||||
|
||||
@q.subscribe(:block => true) do |delivery_info, properties, payload|
|
||||
n = payload.to_i
|
||||
r = self.class.fib(n)
|
||||
def stop
|
||||
channel.close
|
||||
connection.close
|
||||
end
|
||||
|
||||
puts " [.] fib(#{n})"
|
||||
private
|
||||
|
||||
@x.publish(r.to_s, :routing_key => properties.reply_to, :correlation_id => properties.correlation_id)
|
||||
attr_reader :channel, :exchange, :queue, :connection
|
||||
|
||||
def subscribe_to_queue
|
||||
queue.subscribe(block: true) do |_delivery_info, properties, payload|
|
||||
result = fibonacci(payload.to_i)
|
||||
|
||||
exchange.publish(
|
||||
result.to_s,
|
||||
routing_key: properties.reply_to,
|
||||
correlation_id: properties.correlation_id
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def fibonacci(value)
|
||||
return value if value.zero? || value == 1
|
||||
|
||||
def self.fib(n)
|
||||
case n
|
||||
when 0 then 0
|
||||
when 1 then 1
|
||||
else
|
||||
fib(n - 1) + fib(n - 2)
|
||||
end
|
||||
fibonacci(value - 1) + fibonacci(value - 2)
|
||||
end
|
||||
end
|
||||
|
||||
begin
|
||||
server = FibonacciServer.new(ch)
|
||||
puts " [x] Awaiting RPC requests"
|
||||
server.start("rpc_queue")
|
||||
rescue Interrupt => _
|
||||
ch.close
|
||||
conn.close
|
||||
server = FibonacciServer.new
|
||||
|
||||
exit(0)
|
||||
puts ' [x] Awaiting RPC requests'
|
||||
server.start('rpc_queue')
|
||||
rescue Interrupt => _
|
||||
server.stop
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user