Merge pull request #374 from rabbitmq/rabbitmq-tutorials-373

Consistently format python tutorial files
This commit is contained in:
Luke Bakken 2024-01-10 13:34:50 -08:00 committed by GitHub
commit 14a28058d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 127 additions and 78 deletions

View File

@ -4,12 +4,14 @@ import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.exchange_declare(exchange="logs", exchange_type="fanout")
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
message = " ".join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange="logs", routing_key="", body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -4,14 +4,19 @@ import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
severity = sys.argv[1] if len(sys.argv) > 2 else "info"
message = " ".join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
exchange="direct_logs",
routing_key=severity,
body=message,
)
print(f" [x] Sent {severity}:{message}")
connection.close()

View File

@ -4,14 +4,19 @@ import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
routing_key = sys.argv[1] if len(sys.argv) > 2 else "anonymous.info"
message = " ".join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
exchange="topic_logs",
routing_key=routing_key,
body=message,
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

View File

@ -4,18 +4,21 @@ import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.queue_declare(queue="task_queue", durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
message = " ".join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
exchange="",
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
))
),
)
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,29 +1,36 @@
#!/usr/bin/env python
import os
import pika
import sys
import pika
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.queue_declare(queue="hello")
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.basic_consume(
queue="hello",
on_message_callback=callback,
auto_ack=True,
)
print(' [*] Waiting for messages. To exit press CTRL+C')
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
if __name__ == '__main__':
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print('Interrupted')
print("Interrupted")
try:
sys.exit(0)
except SystemExit:

View File

@ -1,36 +1,41 @@
#!/usr/bin/env python
import os
import pika
import sys
import pika
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.exchange_declare(exchange="logs", exchange_type="fanout")
result = channel.queue_declare(queue='', exclusive=True)
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
channel.queue_bind(exchange="logs", queue=queue_name)
def callback(ch, method, properties, body):
print(f" [x] {body.decode()}")
print(' [*] Waiting for logs. To exit press CTRL+C')
print(" [*] Waiting for logs. To exit press CTRL+C")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
queue=queue_name,
on_message_callback=callback,
auto_ack=True,
)
channel.start_consuming()
if __name__ == '__main__':
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print('Interrupted')
print("Interrupted")
try:
sys.exit(0)
except SystemExit:

View File

@ -1,17 +1,19 @@
#!/usr/bin/env python
import os
import pika
import sys
import pika
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
result = channel.queue_declare(queue='', exclusive=True)
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
@ -21,24 +23,30 @@ def main():
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
exchange="direct_logs",
queue=queue_name,
routing_key=severity,
)
print(' [*] Waiting for logs. To exit press CTRL+C')
print(" [*] Waiting for logs. To exit press CTRL+C")
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
queue=queue_name,
on_message_callback=callback,
auto_ack=True,
)
channel.start_consuming()
if __name__ == '__main__':
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print('Interrupted')
print("Interrupted")
try:
sys.exit(0)
except SystemExit:

View File

@ -1,17 +1,19 @@
#!/usr/bin/env python
import os
import pika
import sys
import pika
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")
result = channel.queue_declare(queue='', exclusive=True)
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
@ -21,24 +23,30 @@ def main():
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
exchange="topic_logs",
queue=queue_name,
routing_key=binding_key,
)
print(' [*] Waiting for logs. To exit press CTRL+C')
print(" [*] Waiting for logs. To exit press CTRL+C")
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
queue=queue_name,
on_message_callback=callback,
auto_ack=True,
)
channel.start_consuming()
if __name__ == '__main__':
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print('Interrupted')
print("Interrupted")
try:
sys.exit(0)
except SystemExit:

View File

@ -5,20 +5,21 @@ import pika
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
result = self.channel.queue_declare(queue="", exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
auto_ack=True,
)
self.response = None
self.corr_id = None
@ -31,13 +32,14 @@ class FibonacciRpcClient(object):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
exchange="",
routing_key="rpc_queue",
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
body=str(n),
)
self.connection.process_data_events(time_limit=None)
return int(self.response)

View File

@ -2,11 +2,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.queue_declare(queue="rpc_queue")
def fib(n):
@ -24,16 +24,17 @@ def on_request(ch, method, props, body):
print(f" [.] fib({n})")
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= \
props.correlation_id),
body=str(response))
ch.basic_publish(
exchange="",
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response),
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
channel.basic_consume(queue="rpc_queue", on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()

View File

@ -2,11 +2,13 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.queue_declare(queue="hello")
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
channel.basic_publish(exchange="", routing_key="hello", body="Hello World!")
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@ -4,21 +4,22 @@ import time
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
pika.ConnectionParameters(host="localhost"),
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.queue_declare(queue="task_queue", durable=True)
print(" [*] Waiting for messages. To exit press CTRL+C")
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
time.sleep(body.count(b"."))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.basic_consume(queue="task_queue", on_message_callback=callback)
channel.start_consuming()