Merge branch 'rockie-yang-master'

This commit is contained in:
Michael Klishin 2016-12-18 22:14:20 +03:00
commit d312d5e2a9
No known key found for this signature in database
GPG Key ID: D1A1B77724CE35D5
17 changed files with 715 additions and 0 deletions

4
.gitignore vendored
View File

@ -23,3 +23,7 @@ java*/.idea/encodings.xml
.vscode/
obj/
bin/
target/
.DS_Store
*.iml
.idea/

57
scala/README.md Normal file
View File

@ -0,0 +1,57 @@
# RabbitMQ Tutorials in Scala
This is a minimalistic Scala port of the RabbitMQ tutorials in Java.
The port is admittedly quite close to Java in terms of code style.
This is primarily to the fact that RabbitMQ Java client still supports
JDK 6 and doesn't have a lambda-friendly API.
## Compiling the Code
mvn compile
## Running Examples
### Hello World
Execute the following command to receive a hello world:
mvn exec:java -Dexec.mainClass="Recv"
Execute the following in a separate shell to send a hello world:
mvn exec:java -Dexec.mainClass="Send"
### Work Queues
Send a message which will be finshed immediately:
mvn exec:java -Dexec.mainClass="NewTask"
Send a message which need some second to execute each . is one second.
mvn exec:java -Dexec.mainClass="NewTask" -Dexec.args="rabbit1 ...."
To start a worker (run in a separate shell):
mvn exec:java -Dexec.mainClass="Worker"
Add more workers to the same queue, message will be distributed in the
round robin manner.
### Publish and Subscriber
mvn exec:java -Dexec.mainClass="EmitLog" -Dexec.args="rabbit1 msg1"
mvn exec:java -Dexec.mainClass="ReceiveLogs"
### RPC
In one shell:
mvn exec:java -Dexec.mainClass="RPCServer"
In another shell:
mvn exec:java -Dexec.mainClass="RPCClient"

91
scala/pom.xml Normal file
View File

@ -0,0 +1,91 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>explore</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.1-SNAPSHOT</version>
<properties>
<scala.version>2.11.8</scala.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>

View File

@ -0,0 +1,34 @@
import com.rabbitmq.client.ConnectionFactory
object EmitLog {
private val EXCHANGE_NAME = "logs"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
val message = getMessage(argv)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"))
println(" [x] Sent '" + message + "'")
channel.close()
connection.close()
}
private def getMessage(strings: Array[String]): String = {
if (strings.length < 1) return "info: Hello World!"
joinStrings(strings, " ")
}
private def joinStrings(strings: Array[String], delimiter: String): String = {
val length = strings.length
if (length == 0) return ""
val words = new StringBuilder(strings(0))
for (i <- 1 until length) {
words.append(delimiter).append(strings(i))
}
words.toString
}
}

View File

@ -0,0 +1,42 @@
import com.rabbitmq.client.ConnectionFactory
object EmitLogDirect {
private val EXCHANGE_NAME = "direct_logs"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "direct")
val severity = getSeverity(argv)
val message = getMessage(argv)
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"))
println(" [x] Sent '" + severity + "':'" + message + "'")
channel.close()
connection.close()
}
private def getSeverity(strings: Array[String]): String = {
if (strings.length < 1) return "info"
strings(0)
}
private def getMessage(strings: Array[String]): String = {
if (strings.length < 2) return "Hello World!"
joinStrings(strings, " ", 1)
}
private def joinStrings(strings: Array[String], delimiter: String, startIndex: Int): String = {
val length = strings.length
if (length == 0) return ""
if (length < startIndex) return ""
val words = new StringBuilder(strings(startIndex))
for (i <- startIndex + 1 until length) {
words.append(delimiter).append(strings(i))
}
words.toString
}
}

View File

@ -0,0 +1,37 @@
import java.util.HashMap
import com.rabbitmq.client._
//remove if not needed
object EmitLogHeader {
private val EXCHANGE_NAME = "header_test"
def main(argv: Array[String]) {
if (argv.length < 1) {
System.err.println("Usage: EmitLogHeader message queueName [headers]...")
System.exit(1)
}
val routingKey = "ourTestRoutingKey"
val message = argv(0)
val headers = new HashMap[String, Object]()
for (i <- 1 until argv.length by 2) {
println("Adding header " + argv(i) + " with value " + argv(i + 1) +
" to Map")
headers.put(argv(i), argv(i + 1))
}
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "headers")
val builder = new AMQP.BasicProperties.Builder()
builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode)
builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority)
builder.headers(headers)
val theProps = builder.build()
channel.basicPublish(EXCHANGE_NAME, routingKey, theProps, message.getBytes("UTF-8"))
println(" [x] Sent message: '" + message + "'")
}
}

View File

@ -0,0 +1,53 @@
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
object EmitLogTopic {
private val EXCHANGE_NAME = "topic_logs"
def main(argv: Array[String]) {
var connection: Connection = null
var channel: Channel = null
try {
val factory = new ConnectionFactory()
factory.setHost("localhost")
connection = factory.newConnection()
channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "topic")
val routingKey = getRouting(argv)
val message = getMessage(argv)
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"))
println(" [x] Sent '" + routingKey + "':'" + message + "'")
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
try {
connection.close()
} catch {
case ignore: Exception =>
}
}
}
}
private def getRouting(strings: Array[String]): String = {
if (strings.length < 1) return "anonymous.info"
strings(0)
}
private def getMessage(strings: Array[String]): String = {
if (strings.length < 2) return "Hello World!"
joinStrings(strings, " ", 1)
}
private def joinStrings(strings: Array[String], delimiter: String, startIndex: Int): String = {
val length = strings.length
if (length == 0) return ""
if (length < startIndex) return ""
val words = new StringBuilder(strings(startIndex))
for (i <- startIndex + 1 until length) {
words.append(delimiter).append(strings(i))
}
words.toString
}
}

View File

@ -0,0 +1,35 @@
import com.rabbitmq.client.{ConnectionFactory, MessageProperties}
object NewTask {
private val TASK_QUEUE_NAME = "task_queue"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null)
val message = getMessage(argv)
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
println(" [x] Sent '" + message + "'")
channel.close()
connection.close()
}
private def getMessage(strings: Array[String]): String = {
if (strings.length < 1) return "Hello World!"
joinStrings(strings, " ")
}
private def joinStrings(strings: Array[String], delimiter: String): String = {
val length = strings.length
if (length == 0) return ""
val words = new StringBuilder(strings(0))
for (i <- 1 until length) {
words.append(delimiter).append(strings(i))
}
words.toString
}
}

View File

@ -0,0 +1,64 @@
import java.util.UUID
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, QueueingConsumer}
class RPCClient(host: String) {
val factory = new ConnectionFactory()
factory.setHost(host)
val connection: Connection = factory.newConnection()
val channel: Channel = connection.createChannel()
val requestQueueName: String = "rpc_queue"
val replyQueueName: String = channel.queueDeclare().getQueue
val consumer: QueueingConsumer = new QueueingConsumer(channel)
channel.basicConsume(replyQueueName, true, consumer)
def call(message: String): String = {
var response: String = null
val corrId = UUID.randomUUID().toString
val props = new BasicProperties.Builder().correlationId(corrId)
.replyTo(replyQueueName)
.build()
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"))
while (response == null) {
val delivery = consumer.nextDelivery()
if (delivery.getProperties.getCorrelationId == corrId) {
response = new String(delivery.getBody, "UTF-8")
}
}
response
}
def close() {
connection.close()
}
}
object RPCClient {
def main(argv: Array[String]) {
var fibonacciRpc: RPCClient = null
var response: String = null
try {
val host = if (argv.isEmpty) "localhost" else argv(0)
fibonacciRpc = new RPCClient(host)
println(" [x] Requesting fib(30)")
response = fibonacciRpc.call("30")
println(" [.] Got '" + response + "'")
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close()
} catch {
case ignore: Exception =>
}
}
}
}
}

View File

@ -0,0 +1,60 @@
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, QueueingConsumer}
object RPCServer {
private val RPC_QUEUE_NAME = "rpc_queue"
def main(argv: Array[String]) {
var connection: Connection = null
var channel: Channel = null
try {
val factory = new ConnectionFactory()
factory.setHost("localhost")
connection = factory.newConnection()
channel = connection.createChannel()
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null)
channel.basicQos(1)
val consumer = new QueueingConsumer(channel)
channel.basicConsume(RPC_QUEUE_NAME, false, consumer)
println(" [x] Awaiting RPC requests")
while (true) {
var response: String = null
val delivery = consumer.nextDelivery()
val props = delivery.getProperties
val replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId)
.build()
try {
val message = new String(delivery.getBody, "UTF-8")
val n = java.lang.Integer.parseInt(message)
println(" [.] fib(" + message + ")")
response = "" + fib(n)
} catch {
case e: Exception => {
println(" [.] " + e.toString)
response = ""
}
} finally {
channel.basicPublish("", props.getReplyTo, replyProps, response.getBytes("UTF-8"))
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
try {
connection.close()
} catch {
case ignore: Exception =>
}
}
}
}
private def fib(n: Int): Int = {
if (n == 0) return 0
if (n == 1) return 1
fib(n - 1) + fib(n - 2)
}
}

View File

@ -0,0 +1,46 @@
import java.util.HashMap
import com.rabbitmq.client._
//remove if not needed
object ReceiveLogHeader {
private val EXCHANGE_NAME = "header_test"
def main(argv: Array[String]) {
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsHeader queueName [headers]...")
System.exit(1)
}
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "headers")
val routingKeyFromUser = "ourTestRoutingKey"
val queueInputName = argv(0)
val headers = new HashMap[String, Object]()
for (i <- 1 until argv.length by 2) {
headers.put(argv(i), argv(i + 1))
println("Binding header " + argv(i) + " and value " + argv(i + 1) +
" to queue " +
queueInputName)
}
val queueName = channel.queueDeclare(queueInputName, true, false, false, null)
.getQueue
channel.queueBind(queueName, EXCHANGE_NAME, routingKeyFromUser, headers)
println(" [*] Waiting for messages. To exit press CTRL+C")
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: Array[Byte]) {
val message = new String(body, "UTF-8")
println(" [x] Received '" + envelope.getRoutingKey + "':'" + message +
"'")
}
}
channel.basicConsume(queueName, true, consumer)
}
}

View File

@ -0,0 +1,29 @@
import com.rabbitmq.client._
object ReceiveLogs {
private val EXCHANGE_NAME = "logs"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
val queueName = channel.queueDeclare().getQueue
channel.queueBind(queueName, EXCHANGE_NAME, "")
println(" [*] Waiting for messages. To exit press CTRL+C")
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: Array[Byte]) {
val message = new String(body, "UTF-8")
println(" [x] Received '" + message + "'")
}
}
channel.basicConsume(queueName, true, consumer)
}
}

View File

@ -0,0 +1,35 @@
import com.rabbitmq.client._
object ReceiveLogsDirect {
private val EXCHANGE_NAME = "direct_logs"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "direct")
val queueName = channel.queueDeclare().getQueue
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]")
System.exit(1)
}
for (severity <- argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity)
}
println(" [*] Waiting for messages. To exit press CTRL+C")
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: Array[Byte]) {
val message = new String(body, "UTF-8")
println(" [x] Received '" + envelope.getRoutingKey + "':'" + message +
"'")
}
}
channel.basicConsume(queueName, true, consumer)
}
}

View File

@ -0,0 +1,35 @@
import com.rabbitmq.client._
object ReceiveLogsTopic {
private val EXCHANGE_NAME = "topic_logs"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "topic")
val queueName = channel.queueDeclare().getQueue
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...")
System.exit(1)
}
for (bindingKey <- argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey)
}
println(" [*] Waiting for messages. To exit press CTRL+C")
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: Array[Byte]) {
val message = new String(body, "UTF-8")
println(" [x] Received '" + envelope.getRoutingKey + "':'" + message +
"'")
}
}
channel.basicConsume(queueName, true, consumer)
}
}

View File

@ -0,0 +1,27 @@
import com.rabbitmq.client._
object Recv {
private val QUEUE_NAME = "hello"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
println(" [*] Waiting for messages. To exit press CTRL+C")
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: Array[Byte]) {
var message = new String(body, "UTF-8")
println(" [x] Received '" + message + "'")
}
}
channel.basicConsume(QUEUE_NAME, true, consumer)
}
}

View File

@ -0,0 +1,20 @@
import com.rabbitmq.client.ConnectionFactory
object Send {
private val QUEUE_NAME = "hello"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
val message = "Hello World!"
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"))
println(" [x] Sent '" + message + "'")
channel.close()
connection.close()
}
}

View File

@ -0,0 +1,46 @@
import com.rabbitmq.client._
object Worker {
private val TASK_QUEUE_NAME = "task_queue"
def main(argv: Array[String]) {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null)
println(" [*] Waiting for messages. To exit press CTRL+C")
channel.basicQos(1)
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: Array[Byte]) {
val message = new String(body, "UTF-8")
println(" [x] Received '" + message + "'")
try {
doWork(message)
} finally {
println(" Done")
channel.basicAck(envelope.getDeliveryTag, false)
}
}
}
channel.basicConsume(TASK_QUEUE_NAME, false, consumer)
}
private def doWork(task: String) {
print(" [x] Processing ")
for (ch <- task.toCharArray() if ch == '.') {
try {
print(".")
Thread.sleep(1000)
} catch {
case _ignored: InterruptedException => Thread.currentThread().interrupt()
}
}
}
}