diff options
Diffstat (limited to 'tools/consume.c')
-rw-r--r-- | tools/consume.c | 119 |
1 files changed, 111 insertions, 8 deletions
diff --git a/tools/consume.c b/tools/consume.c index 7ede926..40b61d1 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -51,17 +51,105 @@ #include "config.h" #include <stdio.h> +#include <stdlib.h> #include <popt.h> #include "common.h" -#include "common_consume.h" -static void do_consume(amqp_connection_state_t conn, int no_ack, - const char * const *argv) +/* Convert a amqp_bytes_t to an escaped string form for printing. We + use the same escaping conventions as rabbitmqctl. */ +static char *stringify_bytes(amqp_bytes_t bytes) { - if (!amqp_basic_consume(conn, 1, setup_queue(conn), - AMQP_EMPTY_BYTES, 0, no_ack, 0)) + /* We will need up to 4 chars per byte, plus the terminating 0 */ + char *res = malloc(bytes.len * 4 + 1); + uint8_t *data = bytes.bytes; + char *p = res; + size_t i; + + for (i = 0; i < bytes.len; i++) { + if (data[i] >= 32 && data[i] != 127) { + *p++ = data[i]; + } + else { + *p++ = '\\'; + *p++ = '0' + (data[i] >> 6); + *p++ = '0' + (data[i] >> 3 & 0x7); + *p++ = '0' + (data[i] & 0x7); + } + } + + *p = 0; + return res; +} + +static amqp_bytes_t setup_queue(amqp_connection_state_t conn, + char *queue, char *exchange, + char *exchange_type, char *routing_key) +{ + amqp_bytes_t queue_bytes; + amqp_queue_declare_ok_t *res; + + /* if an exchange name wasn't provided, check that we don't + have options that require it. */ + if (!exchange) { + char *opt = NULL; + if (routing_key) + opt = "--routing-key"; + else if (exchange_type) + opt = "--exchange-type"; + + if (opt) { + fprintf(stderr, + "%s option requires an exchange name to be " + "provided with --exchange\n", opt); + exit(1); + } + } + + /* Declare the queue as auto-delete. If the queue already + exists, this won't have any effect. */ + queue_bytes = cstring_bytes(queue); + res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1, + AMQP_EMPTY_TABLE); + if (!res) + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + + if (!queue) { + /* the server should have provided a queue name */ + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", sq); + free(sq); + } + + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + + if (exchange_type) { + /* we should create the exchange */ + if (!amqp_exchange_declare(conn, 1, eb, + amqp_cstring_bytes(exchange_type), + 0, 0, 1, AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), "exchange.declare"); + } + + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); + } + + return queue_bytes; +} + +static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, + int no_ack, const char * const *argv) +{ + if (!amqp_basic_consume(conn, 1, queue, AMQP_EMPTY_BYTES, 0, no_ack, + 0)) die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); for (;;) { @@ -97,10 +185,23 @@ int main(int argc, const char **argv) int no_ack; amqp_connection_state_t conn; const char * const *cmd_argv; - + char *queue = NULL; + char *exchange = NULL; + char *exchange_type = NULL; + char *routing_key = NULL; + amqp_bytes_t queue_bytes; + struct poptOption options[] = { INCLUDE_OPTIONS(connect_options), - INCLUDE_OPTIONS(consume_queue_options), + {"queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue to consume from", "queue"}, + {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "bind the queue to this exchange", "exchange"}, + {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0, + "create auto-delete exchange of this type for binding", + "type"}, + {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to bind with", "routing key"}, {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode", NULL}, POPT_AUTOHELP @@ -118,7 +219,9 @@ int main(int argc, const char **argv) } conn = make_connection(); - do_consume(conn, no_ack, cmd_argv); + queue_bytes = setup_queue(conn, queue, exchange, exchange_type, + routing_key); + do_consume(conn, queue_bytes, no_ack, cmd_argv); close_connection(conn); return 0; |