diff --git a/clojure/src/rabbitmq/tutorials/new_task.clj b/clojure/src/rabbitmq/tutorials/new_task.clj new file mode 100644 index 0000000..eb713d5 --- /dev/null +++ b/clojure/src/rabbitmq/tutorials/new_task.clj @@ -0,0 +1,18 @@ +(ns rabbitmq.tutorials.new-task + (:require [langohr.core :as lc] + [langohr.channel :as lch] + [langohr.queue :as lq] + [langohr.basic :as lb] + [clojure.string :as s])) + + +(defn -main + [& args] + (with-open [conn (lc/connect)] + (let [ch (lch/open conn) + payload (if (empty? args) + "Hello, world!" + (s/join " " args))] + (lq/declare ch "task_queue" :durable true :auto-delete false) + (lb/publish ch "" "task_queue" payload :persistent true) + (println (format " [x] Sent %s" payload))))) diff --git a/clojure/src/rabbitmq/tutorials/worker.clj b/clojure/src/rabbitmq/tutorials/worker.clj new file mode 100644 index 0000000..c1bd84d --- /dev/null +++ b/clojure/src/rabbitmq/tutorials/worker.clj @@ -0,0 +1,34 @@ +(ns rabbitmq.tutorials.worker + (:require [langohr.core :as lc] + [langohr.channel :as lch] + [langohr.queue :as lq] + [langohr.basic :as lb] + [langohr.consumers :as lcons] + [clojure.string :as s])) + +(def ^{:const true} q "task_queue") + +(defn ^:private occurences-of + [^String s ^Character c] + (let [chars (map identity s)] + (count (filter (fn [x] (= x c)) chars)))) + +(defn handle-delivery + "Handles message delivery" + [ch {:keys [delivery-tag]} payload] + (let [s (String. payload "UTF-8") + n (occurences-of s \.)] + (println (format " [x] Received %s" s)) + (Thread/sleep ^double (* 1000 n)) + (println " [x] Done") + (lb/ack ch delivery-tag))) + + +(defn -main + [& args] + (with-open [conn (lc/connect)] + (let [ch (lch/open conn)] + (lq/declare ch q :durable true :auto-delete false) + (lb/qos ch 1) + (println " [*] Waiting for messages. To exit press CTRL+C") + (lcons/blocking-subscribe ch q handle-delivery))))