Merge pull request #278 from mmalek/master

Rust: add tutorial 6; use async; consistent file layout
This commit is contained in:
Michael Klishin 2020-11-20 15:39:24 +03:00 committed by GitHub
commit e8ad99adc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1806 additions and 7137 deletions

79
rust/.gitignore vendored
View File

@ -1,78 +1 @@
# Override the root-level .gitignore
!*
# Build output
**/build/**
**/target/**
# Clib dependencies
**/deps/**
# VS Code
**/.vscode/**
# Test binaries
*.test
# Test coverage information files
*.gcda
*.gcda.info
*.gcno
# Debugger outputs
*.bin
*.cap
*.trace
# Output screenshots
/*.bmp
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf
*.log
*.swp
!bin/

File diff suppressed because it is too large Load Diff

View File

@ -1,15 +0,0 @@
[package]
name = "tutorial-01"
version = "0.1.0"
authors = ["Adel Vilkov <vilkov.adel@gmail.com>"]
edition = "2018"
[dependencies]
failure = "^0.1"
futures = "^0.1"
lapin-futures = "^0.18"
tokio = "^0.1.8"
tokio-codec = "^0.1"
tokio-io = "^0.1"
tokio-sync = "^0.1"
tokio-timer = "^0.2"

View File

@ -1,60 +0,0 @@
use crate::lapin::channel::{BasicConsumeOptions, QueueDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, heartbeat)| {
// do a heartbeat on a dedicated thread to keep us connected
tokio::spawn(heartbeat.map_err(|_| ()));
// create a channel
client.create_channel().map_err(Error::from)
})
.and_then(|channel| {
let ch = channel.clone();
channel
// declare a queue
.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::new())
.and_then(move |queue| {
// create a message receiver
channel.basic_consume(
&queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::new(),
)
})
.and_then(|stream| {
// print received messages
stream.for_each(move |message| {
println!(
"Received: {:?}",
std::str::from_utf8(&message.data).unwrap()
);
ch.basic_ack(message.delivery_tag, false)
})
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

View File

@ -1,46 +0,0 @@
use crate::lapin::channel::{BasicProperties, BasicPublishOptions, QueueDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, _)| client.create_channel().map_err(Error::from)) // create a channel
.and_then(|channel| {
channel
// declare a new queue
.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::new())
.and_then(move |_| {
// if successful, send a message
channel
.basic_publish(
"",
"hello",
b"Hello World!".to_vec(),
BasicPublishOptions::default(),
BasicProperties::default(),
)
.map(|_| println!("Sent a message"))
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +0,0 @@
[package]
name = "tutorial-02"
version = "0.1.0"
authors = ["Adel Vilkov <vilkov.adel@gmail.com>"]
edition = "2018"
[dependencies]
failure = "^0.1"
futures = "^0.1"
itertools = "^0.8.0"
lapin-futures = "^0.18"
tokio = "^0.1.8"
tokio-codec = "^0.1"
tokio-io = "^0.1"
tokio-sync = "^0.1"
tokio-timer = "^0.2"

View File

@ -1,52 +0,0 @@
use crate::lapin::channel::{BasicProperties, BasicPublishOptions, QueueDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use itertools::free::join;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
let args: Vec<_> = std::env::args().skip(1).collect();
let message = match args.len() {
0 => "hello".to_string(),
_ => join(args, " "),
};
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, _)| client.create_channel().map_err(Error::from)) // create a channel
.and_then(|channel| {
channel
// declare a new queue
.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::new())
.and_then(move |_| {
// if successful, send a message
channel
.basic_publish(
"",
"hello",
message.as_bytes().to_vec(),
BasicPublishOptions::default(),
BasicProperties::default(),
)
.map(|_| println!("Sent a message"))
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

View File

@ -1,63 +0,0 @@
use crate::lapin::channel::{BasicConsumeOptions, QueueDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use std::thread;
use std::time::Duration;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, heartbeat)| {
// do a heartbeat on a dedicated thread to keep us connected
tokio::spawn(heartbeat.map_err(|_| ()));
// create a channel
client.create_channel().map_err(Error::from)
})
.and_then(|channel| {
let ch = channel.clone();
channel
// declare a queue
.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::new())
.and_then(move |queue| {
// create a message receiver
channel.basic_consume(
&queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::new(),
)
})
.and_then(|stream| {
// print received messages
stream.for_each(move |message| {
let text = std::str::from_utf8(&message.data).unwrap();
println!("Received: {:?}", text);
// Imitate a second of work per one symbol in message
thread::sleep(Duration::from_secs(text.len() as u64));
println!("Done");
ch.basic_ack(message.delivery_tag, false)
})
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +0,0 @@
[package]
name = "tutorial-03"
version = "0.1.0"
authors = ["Adel Vilkov <vilkov.adel@gmail.com>"]
edition = "2018"
[dependencies]
failure = "^0.1"
futures = "^0.1"
itertools = "^0.8.0"
lapin-futures = "^0.18"
tokio = "^0.1.8"
tokio-codec = "^0.1"
tokio-io = "^0.1"
tokio-sync = "^0.1"
tokio-timer = "^0.2"

View File

@ -1,58 +0,0 @@
use crate::lapin::channel::{BasicProperties, BasicPublishOptions, ExchangeDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use itertools::free::join;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
let args: Vec<_> = std::env::args().skip(1).collect();
let message = match args.len() {
0 => "hello".to_string(),
_ => join(args, " "),
};
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, _)| client.create_channel().map_err(Error::from)) // create a channel
.and_then(move |c| {
let channel = c.clone();
channel
// declare a new exchange
.exchange_declare(
"logs",
"fanout",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
.and_then(move |ch| {
// if successful, send a message
ch.basic_publish(
"logs",
"",
message.as_bytes().to_vec(),
BasicPublishOptions::default(),
BasicProperties::default(),
)
.map(|_| println!("Sent a message"))
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

View File

@ -1,94 +0,0 @@
use crate::lapin::channel::{
BasicConsumeOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, heartbeat)| {
// do a heartbeat on a dedicated thread to keep us connected
tokio::spawn(heartbeat.map_err(|_| ()));
// create a channel
client.create_channel().map_err(Error::from)
})
.and_then(|c| {
let channel = c.clone();
// generate an queue with a random name which deletes itself after use
let queue_options = QueueDeclareOptions {
durable: false,
exclusive: true,
auto_delete: true,
nowait: false,
passive: false,
ticket: 0u16,
};
channel
.exchange_declare(
"logs",
"fanout",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
// declare a queue
.and_then(move |ch| {
// declare a queue using specified options
// if name is empty it will be generated
ch.queue_declare("", queue_options, FieldTable::new())
.map(move |queue| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// bind our queue to declared exchange
let name = queue.name();
ch.queue_bind(
&name,
"logs",
"",
QueueBindOptions::default(),
FieldTable::new(),
)
.map(move |_| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// create a message receiver
ch.basic_consume(
&queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::new(),
)
.map(move |s| (ch.clone(), s))
})
.and_then(move |(ch, stream)| {
// print received messages
stream.for_each(move |message| {
let text = std::str::from_utf8(&message.data).unwrap();
println!("Received: {:?}", text);
ch.basic_ack(message.delivery_tag, false)
})
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +0,0 @@
[package]
name = "tutorial-04"
version = "0.1.0"
authors = ["Adel Vilkov <vilkov.adel@gmail.com>"]
edition = "2018"
[dependencies]
failure = "^0.1"
futures = "^0.1"
itertools = "^0.8.0"
lapin-futures = "^0.18"
tokio = "^0.1.8"
tokio-codec = "^0.1"
tokio-io = "^0.1"
tokio-sync = "^0.1"
tokio-timer = "^0.2"

View File

@ -1,62 +0,0 @@
use crate::lapin::channel::{BasicProperties, BasicPublishOptions, ExchangeDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use itertools::free::join;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
let args: Vec<_> = std::env::args().skip(1).collect();
let severity = match args.len() {
0 => "info".to_string(),
_ => args.first().unwrap().clone(),
};
let message = match args.len() {
x if x < 2 => "Hello, world!".to_string(),
_ => join(args[1..].iter(), " "),
};
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, _)| client.create_channel().map_err(Error::from)) // create a channel
.and_then(move |c| {
let channel = c.clone();
channel
// declare a new exchange
.exchange_declare(
"direct_logs",
"direct",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
.and_then(move |ch| {
// if successful, send a message
ch.basic_publish(
"direct_logs",
&severity,
message.as_bytes().to_vec(),
BasicPublishOptions::default(),
BasicProperties::default(),
)
.map(|_| println!("Sent a message"))
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

View File

@ -1,103 +0,0 @@
use crate::lapin::channel::{
BasicConsumeOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, heartbeat)| {
// do a heartbeat on a dedicated thread to keep us connected
tokio::spawn(heartbeat.map_err(|_| ()));
// create a channel
client.create_channel().map_err(Error::from)
})
.and_then(|c| {
let channel = c.clone();
// generate an queue with a random name which deletes itself after use
let queue_options = QueueDeclareOptions {
durable: false,
exclusive: true,
auto_delete: true,
nowait: false,
passive: false,
ticket: 0u16,
};
channel
.exchange_declare(
"direct_logs",
"direct",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
// declare a queue
.and_then(move |ch| {
// declare a queue using specified options
// if name is empty it will be generated
ch.queue_declare("", queue_options, FieldTable::new())
.map(move |queue| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// bind our queue to declared exchange
let name = queue.name();
let c = ch.clone();
let args: Vec<_> = std::env::args().skip(1).collect();
let severities = match args.len() {
0 => vec!["info".to_string()],
_ => args,
};
let binds = severities.into_iter().map(move |severity| {
c.queue_bind(
&name,
"direct_logs",
&severity,
QueueBindOptions::default(),
FieldTable::new(),
)
});
futures::future::join_all(binds).map(move |_| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// create a message receiver
ch.basic_consume(
&queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::new(),
)
.map(move |s| (ch.clone(), s))
})
.and_then(move |(ch, stream)| {
// print received messages
stream.for_each(move |message| {
let severity = &message.routing_key;
let text = std::str::from_utf8(&message.data).unwrap();
println!("Received: [{:?}] {:?}", severity, text);
ch.basic_ack(message.delivery_tag, false)
})
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

1250
rust/05-topics/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +0,0 @@
[package]
name = "tutorial-05"
version = "0.1.0"
authors = ["Adel Vilkov <vilkov.adel@gmail.com>"]
edition = "2018"
[dependencies]
failure = "^0.1"
futures = "^0.1"
itertools = "^0.8.0"
lapin-futures = "^0.18"
tokio = "^0.1.8"
tokio-codec = "^0.1"
tokio-io = "^0.1"
tokio-sync = "^0.1"
tokio-timer = "^0.2"

View File

@ -1,62 +0,0 @@
use crate::lapin::channel::{BasicProperties, BasicPublishOptions, ExchangeDeclareOptions};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use itertools::free::join;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
let args: Vec<_> = std::env::args().skip(1).collect();
let topic = match args.len() {
0 => "anonymous.info".to_string(),
_ => args.first().unwrap().clone(),
};
let message = match args.len() {
x if x < 2 => "Hello, world!".to_string(),
_ => join(args[1..].iter(), " "),
};
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, _)| client.create_channel().map_err(Error::from)) // create a channel
.and_then(move |c| {
let channel = c.clone();
channel
// declare a new exchange
.exchange_declare(
"topic_logs",
"topic",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
.and_then(move |ch| {
// if successful, send a message
ch.basic_publish(
"topic_logs",
&topic,
message.as_bytes().to_vec(),
BasicPublishOptions::default(),
BasicProperties::default(),
)
.map(|_| println!("Sent a message"))
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

View File

@ -1,103 +0,0 @@
use crate::lapin::channel::{
BasicConsumeOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use crate::lapin::client::ConnectionOptions;
use crate::lapin::types::FieldTable;
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use lapin_futures as lapin;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use lapin::client::Client as AMQPClient;
fn main() {
let addr = "127.0.0.1:5672".parse().unwrap();
Runtime::new()
.unwrap()
.block_on_all(
TcpStream::connect(&addr) // try to initiate a TCP connection
.map_err(Error::from)
.and_then(|stream| {
// if successful, pass it to AMQP client
AMQPClient::connect(stream, ConnectionOptions::default()).map_err(Error::from)
})
.and_then(|(client, heartbeat)| {
// do a heartbeat on a dedicated thread to keep us connected
tokio::spawn(heartbeat.map_err(|_| ()));
// create a channel
client.create_channel().map_err(Error::from)
})
.and_then(|c| {
let channel = c.clone();
// generate an queue with a random name which deletes itself after use
let queue_options = QueueDeclareOptions {
durable: false,
exclusive: true,
auto_delete: true,
nowait: false,
passive: false,
ticket: 0u16,
};
channel
.exchange_declare(
"topic_logs",
"topic",
ExchangeDeclareOptions::default(),
FieldTable::new(),
)
.map(move |_| channel.clone())
// declare a queue
.and_then(move |ch| {
// declare a queue using specified options
// if name is empty it will be generated
ch.queue_declare("", queue_options, FieldTable::new())
.map(move |queue| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// bind our queue to declared exchange
let name = queue.name();
let c = ch.clone();
let args: Vec<_> = std::env::args().skip(1).collect();
let topics = match args.len() {
0 => vec!["anonymous.info".to_string()],
_ => args,
};
let binds = topics.into_iter().map(move |topic| {
c.queue_bind(
&name,
"topic_logs",
&topic,
QueueBindOptions::default(),
FieldTable::new(),
)
});
futures::future::join_all(binds).map(move |_| (ch.clone(), queue))
})
.and_then(move |(ch, queue)| {
// create a message receiver
ch.basic_consume(
&queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::new(),
)
.map(move |s| (ch.clone(), s))
})
.and_then(move |(ch, stream)| {
// print received messages
stream.for_each(move |message| {
let topic = &message.routing_key;
let text = std::str::from_utf8(&message.data).unwrap();
println!("Received: [{:?}] {:?}", topic, text);
ch.basic_ack(message.delivery_tag, false)
})
})
.map_err(Error::from)
}),
)
.expect("Failed to create tokio runtime");
}

1065
rust/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

11
rust/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "rabbitmq-tutorials"
version = "1.0.0"
authors = ["Michal Malek <michalm@fastmail.fm>"]
edition = "2018"
[dependencies]
futures = "0.3.7"
lapin = "1.5"
tokio = { version = "0.3.3", features = ["macros", "rt-multi-thread", "stream"] }
uuid = { version = "0.8.1", features = ["v4"] }

View File

@ -3,7 +3,7 @@
Here you can find the Rust code examples for [RabbitMQ
tutorials](https://www.rabbitmq.com/getstarted.html).
The examples use [lapin](https://github.com/sozu-proxy/lapin) client library.
The examples use [lapin](https://github.com/CleverCloud/lapin) client library.
You should have a RabbitMQ server running on default port.
@ -12,51 +12,34 @@ You should have a RabbitMQ server running on default port.
* [Rust and Cargo](https://www.rust-lang.org/tools/install)
## Code
Each tutorial is a separate crate where each source file corresponds to a
binary executable. Each cargo command should be launched in a separate shell.
Each cargo command should be launched in a separate shell.
#### [Tutorial one: "Hello World!"](https://www.rabbitmq.com/tutorial-one-dotnet.html)
```
cd 01-hello-world
```
```
cargo run --bin receive
cargo run --bin send
```
#### [Tutorial one: "Hello World!"](https://www.rabbitmq.com/tutorial-one-python.html)
#### [Tutorial two: Work Queues](https://www.rabbitmq.com/tutorial-two-dotnet.html)
```
cd 02-work-queues
```
```
cargo run --bin worker
cargo run --bin new-task "hi" # specify a custom message
```
cargo run --bin receive
cargo run --bin send
#### [Tutorial three: Publish/Subscribe](https://www.rabbitmq.com/tutorial-three-dotnet.html)
```
cd 03-publish-subscribe
```
```
cargo run --bin subscribe
cargo run --bin publish "hi" # specify a custom message
```
#### [Tutorial two: Work Queues](https://www.rabbitmq.com/tutorial-two-python.html)
#### [Tutorial four: Routing](https://www.rabbitmq.com/tutorial-four-dotnet.html)
```
cd 04-routing
```
```
cargo run --bin receive-direct info error # specify log levels
cargo run --bin emit-direct error "help!" # specify severity and custom message
```
cargo run --bin worker
cargo run --bin new_task "hi" # specify a custom message
#### [Tutorial five: Topics](https://www.rabbitmq.com/tutorial-five-dotnet.html)
```
cd 05-topics
```
```
cargo run --bin receive-topic kern.* # specify topic filter
cargo run --bin emit-topic kern.mem "No memory left!" # specify topic and message
```
#### [Tutorial three: Publish/Subscribe](https://www.rabbitmq.com/tutorial-three-python.html)
cargo run --bin receive_logs
cargo run --bin emit_log "hi" # specify a custom message
#### [Tutorial four: Routing](https://www.rabbitmq.com/tutorial-four-python.html)
cargo run --bin receive_logs_direct info error # specify log levels
cargo run --bin emit_log_direct error "help!" # specify severity and custom message
#### [Tutorial five: Topics](https://www.rabbitmq.com/tutorial-five-python.html)
cargo run --bin receive_logs_topic kern.* # specify topic filter
cargo run --bin emit_log_topic kern.mem "No memory left!" # specify topic and message
#### [Tutorial six: RPC](https://www.rabbitmq.com/tutorial-six-python.html)
cargo run --bin rpc_server
cargo run --bin rpc_client

41
rust/src/bin/emit_log.rs Normal file
View File

@ -0,0 +1,41 @@
use lapin::{
options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties, ExchangeKind,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<_> = std::env::args().skip(1).collect();
let message = match args.len() {
0 => b"hello".to_vec(),
_ => args.join(" ").into_bytes(),
};
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.exchange_declare(
"logs",
ExchangeKind::Fanout,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
channel
.basic_publish(
"logs",
"",
BasicPublishOptions::default(),
message.clone(),
BasicProperties::default(),
)
.await?;
println!(" [x] Sent {:?}", std::str::from_utf8(&message)?);
conn.close(0, "").await?;
Ok(())
}

View File

@ -0,0 +1,46 @@
use lapin::{
options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties, ExchangeKind,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<_> = std::env::args().skip(1).collect();
let severity = args.first().map(String::as_str).unwrap_or("info");
let message = match args.len() {
x if x < 2 => b"Hello, world!".to_vec(),
_ => args[1..].join(" ").into_bytes(),
};
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.exchange_declare(
"direct_logs",
ExchangeKind::Direct,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
channel
.basic_publish(
"direct_logs",
severity,
BasicPublishOptions::default(),
message.clone(),
BasicProperties::default(),
)
.await?;
println!(
" [x] Sent {}:{:?}",
severity,
std::str::from_utf8(&message)?
);
conn.close(0, "").await?;
Ok(())
}

View File

@ -0,0 +1,46 @@
use lapin::{
options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties, ExchangeKind,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<_> = std::env::args().skip(1).collect();
let routing_key = args.first().map(String::as_str).unwrap_or("anonymous.info");
let message = match args.len() {
x if x < 2 => b"Hello, world!".to_vec(),
_ => args[1..].join(" ").into_bytes(),
};
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.exchange_declare(
"topic_logs",
ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
channel
.basic_publish(
"topic_logs",
routing_key,
BasicPublishOptions::default(),
message.clone(),
BasicProperties::default(),
)
.await?;
println!(
" [x] Sent {}:{:?}",
routing_key,
std::str::from_utf8(&message)?
);
conn.close(0, "").await?;
Ok(())
}

38
rust/src/bin/new_task.rs Normal file
View File

@ -0,0 +1,38 @@
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<_> = std::env::args().skip(1).collect();
let message = match args.len() {
0 => b"hello".to_vec(),
_ => args.join(" ").into_bytes(),
};
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.queue_declare(
"task_queue",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
channel
.basic_publish(
"",
"task_queue",
BasicPublishOptions::default(),
message.clone(),
BasicProperties::default(),
)
.await?;
println!(" [x] Sent {:?}", std::str::from_utf8(&message)?);
conn.close(0, "").await?;
Ok(())
}

37
rust/src/bin/receive.rs Normal file
View File

@ -0,0 +1,37 @@
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.queue_declare(
"hello",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let consumer = channel
.basic_consume(
"hello",
"consumer",
BasicConsumeOptions {
no_ack: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
println!(" [*] Waiting for messages. To exit press CTRL+C");
for delivery in consumer {
let (_, delivery) = delivery?;
println!(" [x] Received {:?}", std::str::from_utf8(&delivery.data)?);
}
Ok(())
}

View File

@ -0,0 +1,59 @@
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.exchange_declare(
"logs",
ExchangeKind::Fanout,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let queue = channel
.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
channel
.queue_bind(
queue.name().as_str(),
"logs",
"",
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
let consumer = channel
.basic_consume(
queue.name().as_str(),
"consumer",
BasicConsumeOptions {
no_ack: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
println!(" [*] Waiting for logs. To exit press CTRL+C");
for delivery in consumer {
let (_, delivery) = delivery?;
println!(" [x] {:?}", std::str::from_utf8(&delivery.data)?);
}
Ok(())
}

View File

@ -0,0 +1,73 @@
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let severities: Vec<_> = std::env::args().skip(1).collect();
if severities.is_empty() {
eprintln!(
"Usage: {} [info] [warning] [error]\n",
std::env::args().next().unwrap_or_else(|| "receive-direct".into())
);
std::process::exit(1);
}
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.exchange_declare(
"direct_logs",
ExchangeKind::Direct,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let queue = channel
.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
futures::future::join_all(severities.into_iter().map(|severity| {
channel.queue_bind(
queue.name().as_str(),
"direct_logs",
&severity,
QueueBindOptions::default(),
FieldTable::default(),
)
}))
.await;
let consumer = channel
.basic_consume(
queue.name().as_str(),
"consumer",
BasicConsumeOptions {
no_ack: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
println!(" [*] Waiting for logs. To exit press CTRL+C");
for delivery in consumer {
let (_, delivery) = delivery?;
println!(
" [x] {}:{:?}",
delivery.routing_key,
std::str::from_utf8(&delivery.data)?
);
}
Ok(())
}

View File

@ -0,0 +1,73 @@
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let binding_keys: Vec<_> = std::env::args().skip(1).collect();
if binding_keys.is_empty() {
eprintln!(
"Usage: {} [binding_key]...\n",
std::env::args().next().unwrap_or_else(|| "receive-topic".into())
);
std::process::exit(1);
}
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.exchange_declare(
"topic_logs",
ExchangeKind::Topic,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let queue = channel
.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
futures::future::join_all(binding_keys.into_iter().map(|binding_key| {
channel.queue_bind(
queue.name().as_str(),
"topic_logs",
&binding_key,
QueueBindOptions::default(),
FieldTable::default(),
)
}))
.await;
let consumer = channel
.basic_consume(
queue.name().as_str(),
"consumer",
BasicConsumeOptions {
no_ack: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
println!(" [*] Waiting for logs. To exit press CTRL+C");
for delivery in consumer {
let (_, delivery) = delivery?;
println!(
" [x] {}:{:?}",
delivery.routing_key,
std::str::from_utf8(&delivery.data)?
);
}
Ok(())
}

117
rust/src/bin/rpc_client.rs Normal file
View File

@ -0,0 +1,117 @@
use lapin::{
options::*, types::FieldTable, types::ShortString, BasicProperties, Channel, Connection,
ConnectionProperties, Consumer, Queue,
};
use std::convert::TryInto;
use std::fmt::Display;
use tokio::stream::StreamExt;
use uuid::Uuid;
#[derive(Debug)]
enum Error {
CannotDecodeReply,
NoReply,
}
impl std::error::Error for Error {}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Error::CannotDecodeReply => write!(f, "Cannot decode reply"),
Error::NoReply => write!(f, "No reply arrived"),
}
}
}
struct FibonacciRpcClient {
conn: Connection,
channel: Channel,
callback_queue: Queue,
consumer: Consumer,
correlation_id: ShortString,
}
impl FibonacciRpcClient {
async fn new() -> Result<Self, lapin::Error> {
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let callback_queue = channel
.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
let consumer = channel
.basic_consume(
callback_queue.name().as_str(),
"rpc_client",
BasicConsumeOptions {
no_ack: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
let correlation_id = Uuid::new_v4().to_string().into();
Ok(Self {
conn,
channel,
callback_queue,
consumer,
correlation_id,
})
}
async fn call(&mut self, n: u64) -> Result<u64, Box<dyn std::error::Error>> {
self.channel
.basic_publish(
"",
"rpc_queue",
BasicPublishOptions::default(),
n.to_le_bytes().to_vec(),
BasicProperties::default()
.with_reply_to(self.callback_queue.name().clone())
.with_correlation_id(self.correlation_id.clone()),
)
.await?
.await?;
while let Some(delivery) = self.consumer.next().await {
let (_, reply) = delivery?;
if reply.properties.correlation_id().as_ref() == Some(&self.correlation_id) {
return Ok(u64::from_le_bytes(
reply
.data
.as_slice()
.try_into()
.map_err(|_| Error::CannotDecodeReply)?,
));
}
}
Err(Box::new(Error::NoReply))
}
async fn close(&self) -> Result<(), lapin::Error> {
self.conn.close(0, "").await
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut fibonacci_rpc = FibonacciRpcClient::new().await?;
println!(" [x] Requesting fib(30)");
let response = fibonacci_rpc.call(30).await?;
println!(" [.] Got {}", response);
fibonacci_rpc.close().await?;
Ok(())
}

100
rust/src/bin/rpc_server.rs Normal file
View File

@ -0,0 +1,100 @@
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use std::convert::TryInto;
use std::fmt::Display;
#[derive(Debug)]
enum Error {
CannotDecodeArg,
MissingReplyTo,
MissingCorrelationId,
}
impl std::error::Error for Error {}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Error::CannotDecodeArg => write!(f, "Cannot decode argument"),
Error::MissingReplyTo => write!(f, "Missing 'reply to' property"),
Error::MissingCorrelationId => write!(f, "Missing 'correlation id' property"),
}
}
}
fn fib(n: u64) -> u64 {
if n < 2 {
n
} else {
fib(n - 1) + fib(n - 2)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.queue_declare(
"rpc_queue",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
channel.basic_qos(1, BasicQosOptions::default()).await?;
let consumer = channel
.basic_consume(
"rpc_queue",
"rpc_server",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
println!(" [x] Awaiting RPC requests");
for delivery in consumer {
let (channel, delivery) = delivery?;
let n = u64::from_le_bytes(
delivery
.data
.as_slice()
.try_into()
.map_err(|_| Error::CannotDecodeArg)?,
);
println!(" [.] fib({})", n);
let response = fib(n);
let routing_key = delivery
.properties
.reply_to()
.as_ref()
.ok_or(Error::MissingReplyTo)?
.as_str();
let correlation_id = delivery
.properties
.correlation_id()
.clone()
.ok_or(Error::MissingCorrelationId)?;
channel
.basic_publish(
"",
routing_key,
BasicPublishOptions::default(),
response.to_le_bytes().to_vec(),
BasicProperties::default().with_correlation_id(correlation_id),
)
.await?;
channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await?;
}
Ok(())
}

32
rust/src/bin/send.rs Normal file
View File

@ -0,0 +1,32 @@
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.queue_declare(
"hello",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
channel
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
b"Hello World!".to_vec(),
BasicProperties::default(),
)
.await?;
println!(" [x] Sent \"Hello World!\"");
conn.close(0, "").await?;
Ok(())
}

41
rust/src/bin/worker.rs Normal file
View File

@ -0,0 +1,41 @@
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use std::thread;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.queue_declare(
"task_queue",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let consumer = channel
.basic_consume(
"task_queue",
"consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
println!(" [*] Waiting for messages. To exit press CTRL+C");
for delivery in consumer {
let (channel, delivery) = delivery?;
println!(" [x] Received {:?}", std::str::from_utf8(&delivery.data)?);
thread::sleep(Duration::from_secs(delivery.data.len() as u64));
println!(" [x] Done");
channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await?;
}
Ok(())
}