Simplified RPC examples
This commit is contained in:
parent
e4fee05c96
commit
a04cfcc233
@ -5,9 +5,7 @@ import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
public class RPCClient implements AutoCloseable {
|
||||
|
||||
@ -31,12 +29,12 @@ public class RPCClient implements AutoCloseable {
|
||||
String response = fibonacciRpc.call(i_str);
|
||||
System.out.println(" [.] Got '" + response + "'");
|
||||
}
|
||||
} catch (IOException | TimeoutException | InterruptedException e) {
|
||||
} catch (IOException | TimeoutException | InterruptedException | ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public String call(String message) throws IOException, InterruptedException {
|
||||
public String call(String message) throws IOException, InterruptedException, ExecutionException {
|
||||
final String corrId = UUID.randomUUID().toString();
|
||||
|
||||
String replyQueueName = channel.queueDeclare().getQueue();
|
||||
@ -48,16 +46,16 @@ public class RPCClient implements AutoCloseable {
|
||||
|
||||
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
|
||||
|
||||
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
|
||||
final CompletableFuture<String> response = new CompletableFuture<>();
|
||||
|
||||
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
|
||||
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
|
||||
response.offer(new String(delivery.getBody(), "UTF-8"));
|
||||
response.complete(new String(delivery.getBody(), "UTF-8"));
|
||||
}
|
||||
}, consumerTag -> {
|
||||
});
|
||||
|
||||
String result = response.take();
|
||||
String result = response.get();
|
||||
channel.basicCancel(ctag);
|
||||
return result;
|
||||
}
|
||||
|
@ -14,53 +14,36 @@ public class RPCServer {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
|
||||
channel.queuePurge(RPC_QUEUE_NAME);
|
||||
Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
|
||||
channel.queuePurge(RPC_QUEUE_NAME);
|
||||
|
||||
channel.basicQos(1);
|
||||
channel.basicQos(1);
|
||||
|
||||
System.out.println(" [x] Awaiting RPC requests");
|
||||
System.out.println(" [x] Awaiting RPC requests");
|
||||
|
||||
Object monitor = new Object();
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
|
||||
.Builder()
|
||||
.correlationId(delivery.getProperties().getCorrelationId())
|
||||
.build();
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
|
||||
.Builder()
|
||||
.correlationId(delivery.getProperties().getCorrelationId())
|
||||
.build();
|
||||
|
||||
String response = "";
|
||||
String response = "";
|
||||
try {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
int n = Integer.parseInt(message);
|
||||
|
||||
try {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
int n = Integer.parseInt(message);
|
||||
|
||||
System.out.println(" [.] fib(" + message + ")");
|
||||
response += fib(n);
|
||||
} catch (RuntimeException e) {
|
||||
System.out.println(" [.] " + e.toString());
|
||||
} finally {
|
||||
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
// RabbitMq consumer worker thread notifies the RPC server owner thread
|
||||
synchronized (monitor) {
|
||||
monitor.notify();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
|
||||
// Wait and be prepared to consume the message from RPC client.
|
||||
while (true) {
|
||||
synchronized (monitor) {
|
||||
try {
|
||||
monitor.wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
System.out.println(" [.] fib(" + message + ")");
|
||||
response += fib(n);
|
||||
} catch (RuntimeException e) {
|
||||
System.out.println(" [.] " + e);
|
||||
} finally {
|
||||
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user