Merge branch 'Matrixbirds-master'

This commit is contained in:
Michael Klishin 2018-12-28 21:34:25 +03:00
commit 8f075b0c57
No known key found for this signature in database
GPG Key ID: 2C0DA45F4F944489
16 changed files with 491 additions and 0 deletions

1
.gitignore vendored
View File

@ -31,3 +31,4 @@ target/
.vs/
*.log
.packages

53
dart/README.md Normal file
View File

@ -0,0 +1,53 @@
# Dart code for RabbitMQ tutorials
Here you can find an [Dart](https://www.dartlang.org/) port of
[RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html).
## Requirements
To run this code you need a [Dart 2 server platform installed](https://www.dartlang.org/tools/sdk#install)
### Dart 2.0+
These tutorials use [dart_amqp](https://github.com/achilleasa/dart_amqp).
To install dependencies with pub, run:
pub get
## Code
To run the examples, use `dart source_file.dart`.
Tutorial one: "Hello World!":
dart receive.dart
dart send.dart
Tutorial two: Work Queues
dart worker.dart
dart new_task.dart
Tutorial three: Publish/Subscribe
dart receive_logs.dart
dart emit_log.dart
Tutorial four: Routing
dart receive_logs_direct.dart info warning
dart emit_log_direct.dart info "A message"
dart emit_log_direct.dart warning "A warning"
Tutorial five: Topics
dart receive_logs_topic.dart "info.*" "warn.*"
dart emit_log_topic.dart "info.connections" "Connected"
dart emit_log_topic.dart "warn.connecctions" "A warning"
Tutorial six: RPC (Request/Response)
dart rpc_server.dart
dart rpc_client.dart

21
dart/emit_log.dart Normal file
View File

@ -0,0 +1,21 @@
import "package:dart_amqp/dart_amqp.dart";
void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
String msg = arguments.isEmpty ? "Hello World!" : arguments[0];
client
.channel()
.then((Channel channel) =>
channel.exchange("logs", ExchangeType.FANOUT, durable: false))
.then((Exchange exchange) {
exchange.publish(msg, null);
print(" [x] Sent ${msg}");
return client.close();
});
}

23
dart/emit_log_direct.dart Normal file
View File

@ -0,0 +1,23 @@
import "package:dart_amqp/dart_amqp.dart";
void main(List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
String routingKey = arguments.length < 1 ? "info" : arguments[0];
String msg = arguments.length < 2 ? "Hello World!" : arguments[1];
client
.channel()
.then((Channel channel) =>
channel.exchange("direct_logs", ExchangeType.DIRECT,
durable: false))
.then((Exchange exchange) {
exchange.publish(msg, routingKey);
print(" [x] Sent ${routingKey}: ${msg}");
return client.close();
});
}

23
dart/emit_log_topic.dart Normal file
View File

@ -0,0 +1,23 @@
import "package:dart_amqp/dart_amqp.dart";
void main(List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
String routingKey = arguments.length < 1 ? "anonymous.info" : arguments[0];
String msg = arguments.length < 2 ? "Hello World!" : arguments[1];
client
.channel()
.then((Channel channel) =>
channel.exchange("topic_logs", ExchangeType.TOPIC,
durable: false))
.then((Exchange exchange) {
exchange.publish(msg, routingKey);
print(" [x] Sent ${routingKey}: ${msg}");
return client.close();
});
}

24
dart/new_task.dart Normal file
View File

@ -0,0 +1,24 @@
import "package:dart_amqp/dart_amqp.dart";
void main(List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
String consumeTag = "task_queue";
String msg = arguments.isEmpty ? "Hello World!" : arguments[0];
MessageProperties properties = new MessageProperties.persistentMessage();
client
.channel()
.then((Channel channel) =>
channel.queue(consumeTag, durable: true))
.then((Queue queue) {
queue.publish(msg, properties: properties);
print(" [x] Sent ${msg}");
return client.close();
});
}

19
dart/pubspec.lock Normal file
View File

@ -0,0 +1,19 @@
# Generated by pub
# See https://www.dartlang.org/tools/pub/glossary#lockfile
packages:
dart_amqp:
dependency: "direct main"
description:
name: dart_amqp
url: "https://pub.dartlang.org"
source: hosted
version: "0.1.1"
logging:
dependency: transitive
description:
name: logging
url: "https://pub.dartlang.org"
source: hosted
version: "0.11.3+2"
sdks:
dart: ">=2.0.0-dev <3.0.0"

3
dart/pubspec.yaml Normal file
View File

@ -0,0 +1,3 @@
name: dart_rabbitmq_example
dependencies:
dart_amqp: 0.1.1

34
dart/receive.dart Normal file
View File

@ -0,0 +1,34 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";
void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});
String msg = arguments.isEmpty ? "Hello World!": arguments[0];
String queueTag = "hello";
client
.channel()
.then((Channel channel) => channel.queue(queueTag, durable: false))
.then((Queue queue) {
print(" [*] Waiting for messages in ${queueTag}. To Exit press CTRL+C");
return queue.consume(consumerTag: queueTag, noAck: true);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] Received ${event.payloadAsString}");
});
});
}

34
dart/receive_logs.dart Normal file
View File

@ -0,0 +1,34 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";
void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});
String msg = arguments.isEmpty ? "Hello World!": arguments[0];
client
.channel()
.then((Channel channel) {
return channel.exchange("logs", ExchangeType.FANOUT, durable: false);
})
.then((Exchange exchange) {
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
return exchange.bindPrivateQueueConsumer(null);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] Received ${event.payloadAsString}");
});
});
}

View File

@ -0,0 +1,40 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";
void main (List<String> arguments) {
if (arguments.isEmpty) {
print("Usage: receive_logs_direct.dart [info] [warning] [error]");
return;
}
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});
List<String> routingKeys = arguments.sublist(0, 2);
client
.channel()
.then((Channel channel) {
return channel.exchange("direct_logs", ExchangeType.DIRECT, durable: false);
})
.then((Exchange exchange) {
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
return exchange.bindPrivateQueueConsumer(routingKeys,
consumerTag: "direct_logs", noAck: true
);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] ${event.routingKey}:'${event.payloadAsString}'");
});
});
}

View File

@ -0,0 +1,40 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";
void main (List<String> arguments) {
if (arguments.isEmpty) {
print("Usage: receive_logs_direct.dart <topic> [<topic>, ...]");
return;
}
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});
List<String> routingKeys = arguments.sublist(0);
client
.channel()
.then((Channel channel) {
return channel.exchange("topic_logs", ExchangeType.TOPIC, durable: false);
})
.then((Exchange exchange) {
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
return exchange.bindPrivateQueueConsumer(routingKeys,
consumerTag: "topic_logs", noAck: true
);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] ${event.routingKey}:'${event.payloadAsString}'");
});
});
}

80
dart/rpc_client.dart Normal file
View File

@ -0,0 +1,80 @@
import "dart:io";
import "dart:async";
import "dart:math";
import "package:dart_amqp/dart_amqp.dart";
var UUID = () => "${(new Random()).nextDouble()}";
class RPCClient {
Client client;
String queueTag;
String _replyQueueTag;
Completer contextChannel;
Map<String, Completer> _channels = new Map<String, Completer>();
Queue _queue;
RPCClient() : client = new Client(),
queueTag = "rpc_queue" {
contextChannel = new Completer();
client
.channel()
.then((Channel channel) => channel.queue(queueTag))
.then((Queue rpcQueue) {
_queue = rpcQueue;
return rpcQueue.channel.privateQueue();
})
.then((Queue rpcQueue) {
rpcQueue.consume(noAck: true)
.then((Consumer consumer) {
_replyQueueTag = consumer.queue.name;
consumer.listen(handler);
contextChannel.complete();
});
});
}
void handler (AmqpMessage event) {
if (!_channels
.containsKey(
event.properties.corellationId)) return;
print(" [.] Got ${event.payloadAsString}");
_channels
.remove(event.properties.corellationId)
.complete(event.payloadAsString);
}
Future<String> call(int n) {
return contextChannel.future
.then((_) {
String uuid = UUID();
Completer<String> channel = new Completer<String>();
MessageProperties properties = new MessageProperties()
..replyTo = _replyQueueTag
..corellationId = uuid;
_channels[uuid] = channel;
print(" [x] Requesting ${n}");
_queue.publish({"n": n}, properties: properties);
return channel.future;
});
}
Future close() {
_channels.forEach((_, var next) => next.complete("RPC client closed"));
_channels.clear();
client.close();
}
}
void main(List<String> arguments) {
if (arguments.isEmpty) {
print("Usage: rpc_client.dart num");
return;
}
RPCClient client = new RPCClient();
int n = arguments.isEmpty ? 30 : num.parse(arguments[0]);
client.call(n)
.then((String res) {
print(" [x] fib(${n}) = ${res}");
})
.then((_) => client.close())
.then((_) => null);
}

36
dart/rpc_server.dart Normal file
View File

@ -0,0 +1,36 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";
// Slow implementation of fib
int fib(int n) {
if (n >= 0 && n <= 1) {
return n;
} else
return fib(n - 1) + fib(n - 2);
}
void main(List<String> args) {
Client client = new Client();
// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
exit(0);
});
});
client
.channel()
.then((Channel channel) => channel.qos(0, 1))
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
print(" [x] Awaiting RPC request");
consumer.listen((AmqpMessage message) {
var n = message.payloadAsJson["n"];
print(" [.] fib(${n})");
message.reply(fib(n).toString());
});
});
}

23
dart/send.dart Normal file
View File

@ -0,0 +1,23 @@
import "package:dart_amqp/dart_amqp.dart";
void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
String consumeTag = "hello";
String msg = "hello";
client
.channel()
.then((Channel channel) {
return channel.queue(consumeTag, durable: false);
})
.then((Queue queue) {
queue.publish(msg);
print(" [x] Sent ${msg}");
client.close();
});
}

37
dart/worker.dart Normal file
View File

@ -0,0 +1,37 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";
void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);
Client client = new Client(settings: settings);
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});
String consumeTag = "task_queue";
client
.channel()
.then((Channel channel) {
return channel.qos(0, 1)
.then((Channel channel) =>
channel.queue(consumeTag, durable: true));
})
.then((Queue queue) => queue.consume(noAck: false))
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
String payload = event.payloadAsString;
print(" [x] Received ${payload}");
sleep(new Duration(seconds : payload.split(".").length));
print(" [x] Done");
event.ack();
});
});
}