More consistent rpc client, more wrapping
This commit is contained in:
parent
b47b345937
commit
f9b06ed777
@ -11,7 +11,8 @@ public static void Main(string[] args) {
|
||||
channel.ExchangeDeclare("direct_logs", "direct");
|
||||
|
||||
string severity = (args.Length > 0) ? args[0] : "info";
|
||||
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray())
|
||||
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1)
|
||||
.ToArray())
|
||||
: "Hello World!";
|
||||
byte[] body = System.Text.Encoding.UTF8.GetBytes(message);
|
||||
channel.BasicPublish("direct_logs", severity, null, body);
|
||||
|
@ -11,7 +11,8 @@ public static void Main(string[] args) {
|
||||
channel.ExchangeDeclare("topic_logs", "topic");
|
||||
|
||||
string routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
|
||||
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray())
|
||||
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1)
|
||||
.ToArray())
|
||||
: "Hello World!";
|
||||
byte[] body = System.Text.Encoding.UTF8.GetBytes(message);
|
||||
channel.BasicPublish("topic_logs", routingKey, null, body);
|
||||
|
@ -2,41 +2,53 @@
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
class RPCClient {
|
||||
private static string RpcCall(string message) {
|
||||
string response = null;
|
||||
class RPCClient : IDisposable {
|
||||
private IConnection connection;
|
||||
private IModel channel;
|
||||
private string replyQueueName;
|
||||
private QueueingBasicConsumer consumer;
|
||||
|
||||
public RPCClient() {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.HostName = "localhost";
|
||||
using (IConnection connection = factory.CreateConnection())
|
||||
using (IModel channel = connection.CreateModel()) {
|
||||
string replyQueueName = channel.QueueDeclare();
|
||||
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
|
||||
channel.BasicConsume(replyQueueName, false, consumer);
|
||||
connection = factory.CreateConnection();
|
||||
channel = connection.CreateModel();
|
||||
replyQueueName = channel.QueueDeclare();
|
||||
consumer = new QueueingBasicConsumer(channel);
|
||||
channel.BasicConsume(replyQueueName, false, consumer);
|
||||
}
|
||||
|
||||
string corrId = Guid.NewGuid().ToString();
|
||||
IBasicProperties props = channel.CreateBasicProperties();
|
||||
props.ReplyTo = replyQueueName;
|
||||
props.CorrelationId = corrId;
|
||||
public string Call(string message) {
|
||||
string response = null;
|
||||
string corrId = Guid.NewGuid().ToString();
|
||||
IBasicProperties props = channel.CreateBasicProperties();
|
||||
props.ReplyTo = replyQueueName;
|
||||
props.CorrelationId = corrId;
|
||||
|
||||
byte[] messageBytes = System.Text.Encoding.UTF8.GetBytes(message);
|
||||
channel.BasicPublish("", "rpc_queue", props, messageBytes);
|
||||
byte[] messageBytes = System.Text.Encoding.UTF8.GetBytes(message);
|
||||
channel.BasicPublish("", "rpc_queue", props, messageBytes);
|
||||
|
||||
while (true) {
|
||||
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
|
||||
if (ea.BasicProperties.CorrelationId == corrId) {
|
||||
byte[] body = ea.Body;
|
||||
response = System.Text.Encoding.UTF8.GetString(body);
|
||||
channel.BasicCancel(consumer.ConsumerTag);
|
||||
break;
|
||||
}
|
||||
while (true) {
|
||||
BasicDeliverEventArgs ea =
|
||||
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
|
||||
if (ea.BasicProperties.CorrelationId == corrId) {
|
||||
byte[] body = ea.Body;
|
||||
response = System.Text.Encoding.UTF8.GetString(body);
|
||||
channel.BasicCancel(consumer.ConsumerTag);
|
||||
break;
|
||||
}
|
||||
return response;
|
||||
}
|
||||
return response;
|
||||
}
|
||||
public void Dispose() {
|
||||
connection.Close();
|
||||
}
|
||||
|
||||
public static void Main() {
|
||||
Console.WriteLine(" [x] Requesting fib(30)");
|
||||
string response = RpcCall("30");
|
||||
Console.WriteLine(" [.] Got '{0}'", response);
|
||||
using (RPCClient rpcClient = new RPCClient()) {
|
||||
string response = rpcClient.Call("30");
|
||||
Console.WriteLine(" [.] Got '{0}'", response);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,8 @@ public static void Main() {
|
||||
|
||||
while(true) {
|
||||
string response = null;
|
||||
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
|
||||
BasicDeliverEventArgs ea =
|
||||
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
|
||||
|
||||
byte[] body = ea.Body;
|
||||
IBasicProperties props = ea.BasicProperties;
|
||||
@ -32,8 +33,10 @@ public static void Main() {
|
||||
Console.WriteLine(" [.] " + e);
|
||||
response = "";
|
||||
} finally {
|
||||
byte[] responseBytes = System.Text.Encoding.UTF8.GetBytes(response);
|
||||
channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes);
|
||||
byte[] responseBytes =
|
||||
System.Text.Encoding.UTF8.GetBytes(response);
|
||||
channel.BasicPublish("", props.ReplyTo, replyProps,
|
||||
responseBytes);
|
||||
channel.BasicAck(ea.DeliveryTag, false);
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,8 @@ public static void Main(string[] args) {
|
||||
channel.QueueBind(queue_name, "direct_logs", severity);
|
||||
}
|
||||
|
||||
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
|
||||
Console.WriteLine(" [*] Waiting for messages. " +
|
||||
"To exit press CTRL+C");
|
||||
|
||||
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
|
||||
channel.BasicConsume(queue_name, true, consumer);
|
||||
@ -35,7 +36,8 @@ public static void Main(string[] args) {
|
||||
byte[] body = ea.Body;
|
||||
string message = System.Text.Encoding.UTF8.GetString(body);
|
||||
string routingKey = ea.RoutingKey;
|
||||
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
|
||||
Console.WriteLine(" [x] Received '{0}':'{1}'",
|
||||
routingKey, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,9 +22,11 @@ public static void Main(string[] args) {
|
||||
channel.QueueBind(queue_name, "topic_logs", bindingKey);
|
||||
}
|
||||
|
||||
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
|
||||
Console.WriteLine(" [*] Waiting for messages. " +
|
||||
"To exit press CTRL+C");
|
||||
|
||||
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
|
||||
QueueingBasicConsumer consumer =
|
||||
new QueueingBasicConsumer(channel);
|
||||
channel.BasicConsume(queue_name, true, consumer);
|
||||
|
||||
while(true) {
|
||||
@ -34,7 +36,8 @@ public static void Main(string[] args) {
|
||||
byte[] body = ea.Body;
|
||||
string message = System.Text.Encoding.UTF8.GetString(body);
|
||||
string routingKey = ea.RoutingKey;
|
||||
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
|
||||
Console.WriteLine(" [x] Received '{0}':'{1}'",
|
||||
routingKey, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user