From b66137315cbe816686041d46486b4da3588c99b4 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 17 Oct 2013 23:43:57 +0400 Subject: [PATCH] Port tutorial 4 to Go --- go/emit_log_direct.go | 78 +++++++++++++++++++++++++++++++++++++++ go/receive_logs_direct.go | 78 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 go/emit_log_direct.go create mode 100644 go/receive_logs_direct.go diff --git a/go/emit_log_direct.go b/go/emit_log_direct.go new file mode 100644 index 0000000..636bce8 --- /dev/null +++ b/go/emit_log_direct.go @@ -0,0 +1,78 @@ +package main + +import ( + "github.com/streadway/amqp" + "log" + "os" + "fmt" + "strings" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + panic(fmt.Sprintf("%s: %s", msg, err)) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + err = ch.ExchangeDeclare( + "logs_direct", // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // noWait + nil, // arguments + ) + failOnError(err, "Failed to declare an exchange") + + body := bodyFrom(os.Args) + err = ch.Publish( + "logs_direct", // exchange + severityFrom(os.Args), // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(body), + }) + + failOnError(err, "Failed to publish a message") + log.Printf(" [x] Sent %s", body) + + os.Exit(0) +} + +func bodyFrom(args []string) string { + var s string + if (len(args) < 2) || os.Args[2] == "" { + s = "hello" + } else { + s = strings.Join(args[1:], " ") + + } + + return s +} + +func severityFrom(args []string) string { + var s string + + if (len(args) < 1) || os.Args[1] == "" { + s = "info" + } else { + s = os.Args[1] + + } + + return s +} \ No newline at end of file diff --git a/go/receive_logs_direct.go b/go/receive_logs_direct.go new file mode 100644 index 0000000..ef2c715 --- /dev/null +++ b/go/receive_logs_direct.go @@ -0,0 +1,78 @@ +package main + +import ( + "github.com/streadway/amqp" + "log" + "os" + "fmt" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + panic(fmt.Sprintf("%s: %s", msg, err)) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + err = ch.ExchangeDeclare( + "logs_direct", // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // noWait + nil, // arguments + ) + failOnError(err, "Failed to declare an exchange") + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when usused + false, // exclusive + false, // noWait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + var s string + for _, s = range os.Args { + log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) + err = ch.QueueBind( + q.Name, // queue name + s, // routing key + "logs_direct", // exchange + false, + nil) + failOnError(err, "Failed to bind a queue") + } + + msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + + done := make(chan bool) + var d amqp.Delivery + + go func() { + for d = range msgs { + log.Printf(" [x] %s", d.Body) + done <- true + } + }() + + log.Printf(" [*] Waiting for logs. To exit press CTRL+C") + select { + case <-done: + break + } + log.Printf("Done") + + os.Exit(0) +}