summaryrefslogtreecommitdiff
path: root/tools/consume.c
diff options
context:
space:
mode:
authorDavid Wragg <dpw@lshift.net>2010-02-20 22:58:40 +0000
committerDavid Wragg <dpw@lshift.net>2010-02-20 22:58:40 +0000
commitc90336ef0b6c8507a2f409db7e33dd1844b25517 (patch)
treefe58dc22a21ac75341dd1064fb48ba60a5c2e70c /tools/consume.c
parent55ac202750859482c4319addb8c54368b2369455 (diff)
downloadrabbitmq-c-github-ask-c90336ef0b6c8507a2f409db7e33dd1844b25517.tar.gz
Command line AMQP tools based on rabbitmq-c
Diffstat (limited to 'tools/consume.c')
-rw-r--r--tools/consume.c77
1 files changed, 77 insertions, 0 deletions
diff --git a/tools/consume.c b/tools/consume.c
new file mode 100644
index 0000000..6ca00e0
--- /dev/null
+++ b/tools/consume.c
@@ -0,0 +1,77 @@
+#include "config.h"
+
+#include <stdio.h>
+
+#include <popt.h>
+
+#include "common.h"
+#include "common_consume.h"
+
+static void do_consume(amqp_connection_state_t conn, int no_ack,
+ const char * const *argv)
+{
+ if (!amqp_basic_consume(conn, 1, setup_queue(conn),
+ AMQP_EMPTY_BYTES, 0, no_ack, 0))
+ die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
+
+ for (;;) {
+ amqp_frame_t frame;
+ struct pipeline pl;
+ uint64_t delivery_tag;
+ int res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for header frame");
+
+ if (frame.frame_type != AMQP_FRAME_METHOD
+ || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ continue;
+
+ amqp_basic_deliver_t *deliver
+ = (amqp_basic_deliver_t *)frame.payload.method.decoded;
+ delivery_tag = deliver->delivery_tag;
+
+ pipeline(argv, &pl);
+ copy_body(conn, pl.infd);
+
+ if (finish_pipeline(&pl) && !no_ack)
+ die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0),
+ "basic.ack");
+
+ amqp_maybe_release_buffers(conn);
+ }
+}
+
+int main(int argc, const char **argv)
+{
+ poptContext opts;
+ int no_ack;
+ amqp_connection_state_t conn;
+ const char * const *cmd_argv;
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ INCLUDE_OPTIONS(consume_queue_options),
+ {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
+ "consume in no-ack mode", NULL},
+ POPT_AUTOHELP
+ { NULL, 0, 0, NULL, 0 }
+ };
+
+ opts = process_options(argc, argv, options,
+ "[OPTIONS]... <command> <args>");
+
+ cmd_argv = poptGetArgs(opts);
+ if (!cmd_argv[0]) {
+ fprintf(stderr, "consuming command not specified");
+ goto error;
+ }
+
+ conn = make_connection();
+ do_consume(conn, no_ack, cmd_argv);
+ close_connection(conn);
+ return 0;
+
+error:
+ poptFreeContext(opts);
+ return 1;
+}