Merge pull request #79 from Ferada/master

Add Clojure RPC tutorial code.
This commit is contained in:
Michael Klishin 2015-11-11 01:54:09 +03:00
commit 8081b8f670
3 changed files with 82 additions and 1 deletions

View File

@ -40,6 +40,7 @@ Code examples are executed via `lein run`:
[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-java.html)
TBD
lein run -m rabbitmq.tutorials.rpc-server
lein run -m rabbitmq.tutorials.rpc-client
To learn more, visit [Langohr documentation](http://clojurerabbitmq.info) site.

View File

@ -0,0 +1,45 @@
;; note: this is example code and shouldn't be run in production as-is
(ns rabbitmq.tutorials.rpc-client
(:require [langohr.core :as lc]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.basic :as lb]
[langohr.consumers :as lcons]))
(def ^{:const true} q "rpc_queue")
(defn correlation-id-equals?
[correlation-id d]
(= (.getCorrelationId (.getProperties d)) correlation-id))
(defrecord FibonacciClient [conn ch cbq consumer]
clojure.lang.IFn
(invoke [this n]
(let [correlation-id (str (java.util.UUID/randomUUID))]
(lb/publish ch "" q (str n) {:reply-to cbq
:correlation-id correlation-id})
(lb/consume ch cbq consumer)
(-> (first (filter (partial correlation-id-equals? correlation-id)
(lcons/deliveries-seq consumer)))
.getBody
(String. "UTF-8")
(read-string))))
java.io.Closeable
(close [this]
(.close conn)))
(defn make-fibonacci-rpc-client
[]
(let [conn (lc/connect)
ch (lch/open conn)
cbq (lq/declare ch "" {:auto-delete false :exclusive true})
consumer (lcons/create-queueing ch {})]
(->FibonacciClient conn ch (:queue cbq) consumer)))
(defn -main
[& args]
(with-open [fibonacci-rpc (make-fibonacci-rpc-client)]
(println " [x] Requesting fib(30)")
(let [response (fibonacci-rpc 30)]
(println (format " [.] Got %s" response)))))

View File

@ -0,0 +1,35 @@
(ns rabbitmq.tutorials.rpc-server
(:require [langohr.core :as lc]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.basic :as lb]
[langohr.consumers :as lcons]))
(def ^{:const true} q "rpc_queue")
(defn fib
[n]
(if (zero? n)
0
(if (= n 1)
1
(+ (fib (- n 1))
(fib (- n 2))))))
(defn handle-delivery
"Handles message delivery"
[ch {:keys [delivery-tag reply-to correlation-id]} payload]
(let [n (read-string (String. payload "UTF-8"))]
(println (format " [.] fib(%s)" n))
(let [response (fib n)]
(lb/publish ch "" reply-to (str response) {:correlation-id correlation-id})
(lb/ack ch delivery-tag))))
(defn -main
[& args]
(with-open [conn (lc/connect)]
(let [ch (lch/open conn)]
(lq/declare ch q {:auto-delete false})
(lb/qos ch 1)
(println " [x] Awaiting RPC requests")
(lcons/blocking-subscribe ch "rpc_queue" handle-delivery))))