Rust: Tutorial 4 - Routing

This commit is contained in:
Adel Vilkov 2019-06-04 20:41:22 +03:00
parent afc16d8b77
commit 06b8e15c5f
6 changed files with 1439 additions and 2 deletions

View File

@ -34,7 +34,6 @@ fn main() {
.and_then(|c| {
let channel = c.clone();
// generate an queue with a random name which deletes itself after use
// let queue_name = format!("amq.gen-{:?}", Uuid::new_v4());
let queue_options = QueueDeclareOptions {
durable: false,
exclusive: true,

1250
rust/04-routing/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,16 @@
[package]
name = "tutorial-04"
version = "0.1.0"
authors = ["Adel Vilkov <vilkov.adel@gmail.com>"]
edition = "2018"
[dependencies]
failure = "^0.1"
futures = "^0.1"
itertools = "^0.8.0"
lapin-futures = "^0.18"
tokio = "^0.1.8"
tokio-codec = "^0.1"
tokio-io = "^0.1"
tokio-sync = "^0.1"
tokio-timer = "^0.2"

View File

@ -0,0 +1,62 @@
use crate::lapin::channel::{BasicProperties, BasicPublishOptions, ExchangeDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use itertools::free::join;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
let args: Vec<_> = std::env::args().skip(1).collect();
let severity = match args.len() {
0 => "info".to_string(),
_ => args.first().unwrap().clone(),
};
let message = match args.len() {
x if x < 2 => "Hello, world!".to_string(),
_ => join(args[1..].iter(), " "),
};
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, _)| client.create_channel().map_err(Error::from)) // create a channel
.and_then(move |c| {
let channel = c.clone();
channel
// declare a new exchange
.exchange_declare(
"direct_logs",
"direct",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
.and_then(move |ch| {
// if successful, send a message
ch.basic_publish(
"direct_logs",
&severity,
message.as_bytes().to_vec(),
BasicPublishOptions::default(),
BasicProperties::default(),
)
.map(|_| println!("Sent a message"))
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

View File

@ -0,0 +1,103 @@
use crate::lapin::channel::{
BasicConsumeOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, heartbeat)| {
// do a heartbeat on a dedicated thread to keep us connected
tokio::spawn(heartbeat.map_err(|_| ()));
// create a channel
client.create_channel().map_err(Error::from)
})
.and_then(|c| {
let channel = c.clone();
// generate an queue with a random name which deletes itself after use
let queue_options = QueueDeclareOptions {
durable: false,
exclusive: true,
auto_delete: true,
nowait: false,
passive: false,
ticket: 0u16,
};
channel
.exchange_declare(
"direct_logs",
"direct",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
// declare a queue
.and_then(move |ch| {
// declare a queue using specified options
// if name is empty it will be generated
ch.queue_declare("", queue_options, FieldTable::new())
.map(move |queue| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// bind our queue to declared exchange
let name = queue.name();
let c = ch.clone();
let args: Vec<_> = std::env::args().skip(1).collect();
let severities = match args.len() {
0 => vec!["info".to_string()],
_ => args,
};
let binds = severities.into_iter().map(move |severity| {
c.queue_bind(
&name,
"direct_logs",
&severity,
QueueBindOptions::default(),
FieldTable::new(),
)
});
futures::future::join_all(binds).map(move |_| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// create a message receiver
ch.basic_consume(
&queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::new(),
)
.map(move |s| (ch.clone(), s))
})
.and_then(move |(ch, stream)| {
// print received messages
stream.for_each(move |message| {
let severity = &message.routing_key;
let text = std::str::from_utf8(&message.data).unwrap();
println!("Received: [{:?}] {:?}", severity, text);
ch.basic_ack(message.delivery_tag, false)
})
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

View File

@ -46,7 +46,14 @@ cargo run --bin emit "hi" # specify a custom message
```
#### [Tutorial four: Routing](https://www.rabbitmq.com/tutorial-four-dotnet.html)
// TODO
```
cd 04-routing
```
Start receiver and emitter in separate shells:
```
cargo run --bin receive info error # specify log levels
cargo run --bin emit-direct error "help!" # specify severity and custom message
```
#### [Tutorial five: Topics](https://www.rabbitmq.com/tutorial-five-dotnet.html)
// TODO