From 1f5fd76014afa55e7a28852f6aef36f07b6c20a6 Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Wed, 6 Jul 2011 13:51:53 +0100 Subject: [PATCH] Updated puka tutorial samples. --- python-puka/emit_log.py | 18 ++++++++++++++++++ python-puka/new_task.py | 19 +++++++++++-------- python-puka/receive.py | 16 ++++++++-------- python-puka/receive_logs.py | 24 ++++++++++++++++++++++++ python-puka/send.py | 17 +++++++++-------- python-puka/worker.py | 17 +++++++---------- 6 files changed, 77 insertions(+), 34 deletions(-) create mode 100755 python-puka/emit_log.py create mode 100755 python-puka/receive_logs.py diff --git a/python-puka/emit_log.py b/python-puka/emit_log.py new file mode 100755 index 0000000..7fe34d5 --- /dev/null +++ b/python-puka/emit_log.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python +import puka +import sys + +client = puka.Client("amqp://localhost/") +promise = client.connect() +client.wait(promise) + + +promise = client.exchange_declare(exchange='logs', type='fanout') +client.wait(promise) + +message = ' '.join(sys.argv[1:]) or "info: Hello World!" +promise = client.basic_publish(exchange='logs', routing_key='', body=message) +client.wait(promise) + +print " [x] Sent %r" % (message,) +client.close() diff --git a/python-puka/new_task.py b/python-puka/new_task.py index 783e13d..0d294e7 100755 --- a/python-puka/new_task.py +++ b/python-puka/new_task.py @@ -3,16 +3,19 @@ import puka import sys client = puka.Client("amqp://localhost/") -ticket = client.connect() -client.wait(ticket) +promise = client.connect() +client.wait(promise) -ticket = client.queue_declare(queue='task_queue') -client.wait(ticket) + +promise = client.queue_declare(queue='task_queue', durable=True) +client.wait(promise) message = ' '.join(sys.argv[1:]) or "Hello World!" -ticket = client.basic_publish(exchange='', - routing_key='task_queue', - body=message) -client.wait(ticket) +promise = client.basic_publish(exchange='', + routing_key='task_queue', + body=message, + headers={'delivery_mode': 2}) +client.wait(promise) print " [x] Sent %r" % (message,) +client.close() diff --git a/python-puka/receive.py b/python-puka/receive.py index 41ac2ff..30e639b 100755 --- a/python-puka/receive.py +++ b/python-puka/receive.py @@ -2,17 +2,17 @@ import puka client = puka.Client("amqp://localhost/") -ticket = client.connect() -client.wait(ticket) +promise = client.connect() +client.wait(promise) -ticket = client.queue_declare(queue='hello') -client.wait(ticket) +promise = client.queue_declare(queue='hello') +client.wait(promise) + print ' [*] Waiting for messages. To exit press CTRL+C' -consume_ticket = client.basic_consume(queue='hello', - no_ack=True) +consume_promise = client.basic_consume(queue='hello', no_ack=True) while True: - msg_result = client.wait(consume_ticket) - print " [x] Received %r %r" % (msg_result['body'], msg_result,) + msg_result = client.wait(consume_promise) + print " [x] Received %r" % (msg_result['body'],) diff --git a/python-puka/receive_logs.py b/python-puka/receive_logs.py new file mode 100755 index 0000000..613ea53 --- /dev/null +++ b/python-puka/receive_logs.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +import puka + +client = puka.Client("amqp://localhost/") +promise = client.connect() +client.wait(promise) + + +promise = client.exchange_declare(exchange='logs', type='fanout') +client.wait(promise) + +promise = client.queue_declare(exclusive=True) +queue_name = client.wait(promise)['queue'] + +promise = client.queue_bind(exchange='logs', queue=queue_name) +client.wait(promise) + + +print ' [*] Waiting for logs. To exit press CTRL+C' + +consume_promise = client.basic_consume(queue=queue_name, no_ack=True) +while True: + msg_result = client.wait(consume_promise) + print " [x] %r" % (msg_result['body'],) diff --git a/python-puka/send.py b/python-puka/send.py index e9387c0..e01d8fe 100755 --- a/python-puka/send.py +++ b/python-puka/send.py @@ -2,16 +2,17 @@ import puka client = puka.Client("amqp://localhost/") -ticket = client.connect() -client.wait(ticket) +promise = client.connect() +client.wait(promise) -ticket = client.queue_declare(queue='hello') -client.wait(ticket) -ticket = client.basic_publish(exchange='', - routing_key='hello', - body="Hello world!") -client.wait(ticket) +promise = client.queue_declare(queue='hello') +client.wait(promise) +promise = client.basic_publish(exchange='', + routing_key='hello', + body="Hello World!") +client.wait(promise) print " [x] Sent 'Hello World!'" +client.close() diff --git a/python-puka/worker.py b/python-puka/worker.py index e44c70f..e8d1b10 100755 --- a/python-puka/worker.py +++ b/python-puka/worker.py @@ -3,21 +3,18 @@ import puka import time client = puka.Client("amqp://localhost/") -ticket = client.connect() -client.wait(ticket) +promise = client.connect() +client.wait(promise) -ticket = client.queue_declare(queue='task_queue') -client.wait(ticket) +promise = client.queue_declare(queue='task_queue', durable=True) +client.wait(promise) print ' [*] Waiting for messages. To exit press CTRL+C' -consume_ticket = client.basic_consume(queue='task_queue', - prefetch_count=1) +consume_promise = client.basic_consume(queue='task_queue', prefetch_count=1) while True: - msg_result = client.wait(consume_ticket) - body = msg_result['body'] - print " [x] Received %r" % (body,) + msg_result = client.wait(consume_promise) + print " [x] Received %r" % (msg_result['body'],) time.sleep( body.count('.') ) print " [x] Done" client.basic_ack(msg_result) -