Use condition variable to be notified of RPC responses
Earlier version suffered from a race condition under load that caused Bunny continuation to never unblock.
This commit is contained in:
parent
c2c2e9fa8f
commit
a864a54c38
@ -2,14 +2,18 @@
|
|||||||
# encoding: utf-8
|
# encoding: utf-8
|
||||||
|
|
||||||
require "bunny"
|
require "bunny"
|
||||||
|
require "thread"
|
||||||
|
|
||||||
conn = Bunny.new(:automatically_recover => false)
|
conn = Bunny.new(:automatically_recover => false)
|
||||||
conn.start
|
conn.start
|
||||||
|
|
||||||
ch = conn.create_channel
|
ch = conn.create_channel
|
||||||
|
|
||||||
|
|
||||||
class FibonacciClient
|
class FibonacciClient
|
||||||
attr_reader :reply_queue
|
attr_reader :reply_queue
|
||||||
|
attr_accessor :response, :call_id
|
||||||
|
attr_reader :lock, :condition
|
||||||
|
|
||||||
def initialize(ch, server_queue)
|
def initialize(ch, server_queue)
|
||||||
@ch = ch
|
@ch = ch
|
||||||
@ -17,25 +21,29 @@ class FibonacciClient
|
|||||||
|
|
||||||
@server_queue = server_queue
|
@server_queue = server_queue
|
||||||
@reply_queue = ch.queue("", :exclusive => true)
|
@reply_queue = ch.queue("", :exclusive => true)
|
||||||
|
|
||||||
|
|
||||||
|
@lock = Mutex.new
|
||||||
|
@condition = ConditionVariable.new
|
||||||
|
that = self
|
||||||
|
|
||||||
|
@reply_queue.subscribe do |delivery_info, properties, payload|
|
||||||
|
if properties[:correlation_id] == that.call_id
|
||||||
|
that.response = payload.to_i
|
||||||
|
that.lock.synchronize{that.condition.signal}
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def call(n)
|
def call(n)
|
||||||
correlation_id = self.generate_uuid
|
self.call_id = self.generate_uuid
|
||||||
|
|
||||||
@x.publish(n.to_s,
|
@x.publish(n.to_s,
|
||||||
:routing_key => @server_queue,
|
:routing_key => @server_queue,
|
||||||
:correlation_id => correlation_id,
|
:correlation_id => call_id,
|
||||||
:reply_to => @reply_queue.name)
|
:reply_to => @reply_queue.name)
|
||||||
|
|
||||||
response = nil
|
lock.synchronize{condition.wait(lock)}
|
||||||
@reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
|
|
||||||
if properties[:correlation_id] == correlation_id
|
|
||||||
response = payload.to_i
|
|
||||||
|
|
||||||
delivery_info.consumer.cancel
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
response
|
response
|
||||||
end
|
end
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user