diff options
author | Tony Garnock-Jones <tonygarnockjones@gmail.com> | 2010-02-25 15:13:06 +1300 |
---|---|---|
committer | Tony Garnock-Jones <tonygarnockjones@gmail.com> | 2010-02-25 15:13:06 +1300 |
commit | 83b5975e97bdf9ab2f8dc78f9dafe02ed18b183a (patch) | |
tree | 38377891634f0eb23bbf6163dadc0bc7106deb58 | |
parent | 9cbb7acf50d3893439c69799d599031ac06a1474 (diff) | |
parent | 44d5648dcc34c35b4ba2df15315d0c14b5e026d5 (diff) | |
download | rabbitmq-c-github-ask-83b5975e97bdf9ab2f8dc78f9dafe02ed18b183a.tar.gz |
Merge bug22346 into defaultlibrabbitmq-0.1-amqp_0_8
-rw-r--r-- | .hgignore | 5 | ||||
-rw-r--r-- | Makefile.am | 10 | ||||
-rw-r--r-- | configure.ac | 27 | ||||
-rw-r--r-- | tools/Makefile.am | 12 | ||||
-rw-r--r-- | tools/common.c | 413 | ||||
-rw-r--r-- | tools/common.h | 93 | ||||
-rw-r--r-- | tools/common_consume.c | 160 | ||||
-rw-r--r-- | tools/common_consume.h | 54 | ||||
-rw-r--r-- | tools/consume.c | 128 | ||||
-rw-r--r-- | tools/get.c | 91 | ||||
-rw-r--r-- | tools/publish.c | 140 |
11 files changed, 1130 insertions, 3 deletions
@@ -18,7 +18,7 @@ ^librabbitmq/amqp_framing\.[ch]$ -^(|librabbitmq/|tests/|examples/)Makefile(\.in)?$ +^(|librabbitmq/|tests/|examples/|tools/)Makefile(\.in)?$ ^tests/test_tables$ ^examples/amqp_sendstring$ ^examples/amqp_exchange_declare$ @@ -28,3 +28,6 @@ ^examples/amqp_unbind$ ^examples/amqp_bind$ ^examples/amqp_listenq$ +^tools/amqp-publish$ +^tools/amqp-get$ +^tools/amqp-consume$ diff --git a/Makefile.am b/Makefile.am index 3153ad9..c5ff6e7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,7 +1,13 @@ -SUBDIRS=librabbitmq tests examples +if TOOLS +TOOLS_SUBDIR=tools +else +TOOLS_SUBDIR= +endif + +SUBDIRS=librabbitmq tests examples $(TOOLS_SUBDIR) squeakyclean: maintainer-clean - rm -f Makefile.in librabbitmq/Makefile.in tests/Makefile.in examples/Makefile.in + rm -f Makefile.in librabbitmq/Makefile.in tests/Makefile.in examples/Makefile.in tools/Makefile.in rm -f aclocal.m4 rm -f config.guess config.h.in* config.sub configure rm -f depcomp install-sh ltmain.sh missing diff --git a/configure.ac b/configure.ac index cb835e8..30b7d05 100644 --- a/configure.ac +++ b/configure.ac @@ -5,6 +5,7 @@ AM_INIT_AUTOMAKE AC_CONFIG_HEADER([config.h]) dnl Program checks +AC_GNU_SOURCE AC_PROG_CC dnl Library checks @@ -63,9 +64,35 @@ AC_SUBST(AMQP_CODEGEN_DIR) AC_SUBST(AMQP_SPEC_JSON_PATH) AC_SUBST(PYTHON) +# Check for libpopt, which we need to build the tools + +AC_ARG_WITH([popt], + [AS_HELP_STRING([--with-popt], [use the popt library. Needed for tools])], + [], + [with_popt=check]) + +LIBPOPT= +AS_IF([test "x$with_popt" != xno], + [AC_CHECK_LIB([popt], [poptGetContext], + [AC_SUBST([LIBPOPT], ["-lpopt"]) + AC_DEFINE([HAVE_LIBPOPT], [1], [Define if you have libpopt]) + ], + [if test "x$with_popt" != xcheck; then + AC_MSG_FAILURE([--with-popt was given, but test for libpopt failed]) + fi + ])]) + +AS_IF([test "x$LIBPOPT" != "x"], + [AC_CHECK_HEADER([popt.h], [], + [AC_MSG_FAILURE([You have libpopt, but could not find the popt.h header])]) + ]) + +AM_CONDITIONAL(TOOLS, test "x$LIBPOPT" != "x") + AC_OUTPUT( Makefile librabbitmq/Makefile tests/Makefile examples/Makefile +tools/Makefile ) diff --git a/tools/Makefile.am b/tools/Makefile.am new file mode 100644 index 0000000..f67d46b --- /dev/null +++ b/tools/Makefile.am @@ -0,0 +1,12 @@ +bin_PROGRAMS = amqp-publish amqp-get amqp-consume + +AM_CFLAGS = -I$(top_srcdir)/librabbitmq +AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la + +LDADD=$(LIBPOPT) + +noinst_HEADERS = common.h + +amqp_publish_SOURCES = publish.c common.c +amqp_get_SOURCES = get.c common.c common_consume.c +amqp_consume_SOURCES = consume.c common.c common_consume.c diff --git a/tools/common.c b/tools/common.c new file mode 100644 index 0000000..0772738 --- /dev/null +++ b/tools/common.c @@ -0,0 +1,413 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include "config.h" + +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <string.h> + +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <spawn.h> +#include <sys/wait.h> + +#include <popt.h> + +#include "common.h" + +extern char **environ; + +void die(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + +void die_errno(int err, const char *fmt, ...) +{ + if (err == 0) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", strerror(err)); + exit(1); +} + +char *amqp_server_exception_string(amqp_rpc_reply_t r) +{ + int res; + char *s; + + switch (r.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m + = (amqp_connection_close_t *)r.reply.decoded; + res = asprintf(&s, "server connection error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m + = (amqp_channel_close_t *)r.reply.decoded; + res = asprintf(&s, "server channel error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + default: + res = asprintf(&s, "unknown server error, method id 0x%08X", + r.reply.id); + break; + } + + return res >= 0 ? s : NULL; +} + +char *amqp_rpc_reply_string(amqp_rpc_reply_t r) +{ + const char *s; + + switch (r.reply_type) { + case AMQP_RESPONSE_NORMAL: + s = "normal response"; + break; + + case AMQP_RESPONSE_NONE: + s = "missing RPC reply type"; + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + if (r.library_errno) + s = strerror(r.library_errno); + else + s = "end of stream"; + + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + return amqp_server_exception_string(r); + + default: + abort(); + } + + return strdup(s); +} + +void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) +{ + if (r.reply_type == AMQP_RESPONSE_NORMAL) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r)); + exit(1); +} + +void set_cloexec(int fd) +{ + int flags; + + flags = fcntl(fd, F_GETFD); + if (flags == -1 + || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) + die_errno(errno, "set_cloexec"); +} + +static char *amqp_server = "localhost"; +static char *amqp_vhost = "/"; +static char *amqp_username = "guest"; +static char *amqp_password = "guest"; + +const char *connect_options_title = "Connection options"; +struct poptOption connect_options[] = { + {"server", 's', POPT_ARG_STRING, &amqp_server, 0, + "the AMQP server to connect to", "server"}, + {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, + "the vhost to use when connecting", "vhost"}, + {"username", 0, POPT_ARG_STRING, &amqp_username, 0, + "the username to login with", "username"}, + {"password", 0, POPT_ARG_STRING, &amqp_password, 0, + "the password to login with", "password"}, + { NULL, 0, 0, NULL, 0 } +}; + +amqp_connection_state_t make_connection(void) +{ + int s; + amqp_connection_state_t conn; + char *host = amqp_server; + int port = 0; + + /* parse the server string into a hostname and a port */ + char *colon = strchr(amqp_server, ':'); + if (colon) { + char *port_end; + size_t host_len = colon - amqp_server; + host = malloc(host_len + 1); + memcpy(host, amqp_server, host_len); + host[host_len] = 0; + + port = strtol(colon+1, &port_end, 10); + if (port < 0 + || port > 65535 + || port_end == colon+1 + || *port_end != 0) + die("bad server port number in %s", amqp_server); + } + + s = amqp_open_socket(host, port ? port : 5672); + if (s < 0) { + if (s == -ENOENT) + die("unknown host %s", host); + else + die_errno(-s, "opening socket to %s", amqp_server); + } + + set_cloexec(s); + + conn = amqp_new_connection(); + amqp_set_sockfd(conn, s); + + die_rpc(amqp_login(conn, amqp_vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, + amqp_username, amqp_password), + "logging in to AMQP server"); + + if (!amqp_channel_open(conn, 1)) + die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + + return conn; +} + +void close_connection(amqp_connection_state_t conn) +{ + int s = amqp_get_sockfd(conn); + + die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "closing channel"); + die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "closing connection"); + amqp_destroy_connection(conn); + + if (close(s) < 0) + die_errno(errno, "closing socket"); +} + +amqp_bytes_t read_all(int fd) +{ + size_t space = 4096; + amqp_bytes_t bytes; + + bytes.bytes = malloc(space); + bytes.len = 0; + + for (;;) { + ssize_t res = read(fd, bytes.bytes+bytes.len, space-bytes.len); + if (res == 0) + break; + + if (res < 0) { + if (errno == EINTR) + continue; + + die_errno(errno, "reading"); + } + + bytes.len += res; + if (bytes.len == space) { + space *= 2; + bytes.bytes = realloc(bytes.bytes, space); + } + } + + return bytes; +} + +void write_all(int fd, amqp_bytes_t data) +{ + while (data.len > 0) { + ssize_t res = write(fd, data.bytes, data.len); + if (res < 0) + die_errno(errno, "write"); + + data.len -= res; + data.bytes += res; + } +} + +void copy_body(amqp_connection_state_t conn, int fd) +{ + size_t body_remaining; + amqp_frame_t frame; + + int res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for header frame"); + if (frame.frame_type != AMQP_FRAME_HEADER) + die("expected header, got frame type 0x%X", + frame.frame_type); + + body_remaining = frame.payload.properties.body_size; + while (body_remaining) { + res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for body frame"); + if (frame.frame_type != AMQP_FRAME_BODY) + die("expected body, got frame type 0x%X", + frame.frame_type); + + write_all(fd, frame.payload.body_fragment); + body_remaining -= frame.payload.body_fragment.len; + } +} + +void pipeline(const char * const *argv, struct pipeline *pl) +{ + posix_spawn_file_actions_t file_acts; + + int pipefds[2]; + if (pipe(pipefds)) + die_errno(errno, "pipe"); + + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); + + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); + + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); + + if (close(pipefds[0])) + die_errno(errno, "close"); + + pl->infd = pipefds[1]; +} + +int finish_pipeline(struct pipeline *pl) +{ + int status; + + if (close(pl->infd)) + die_errno(errno, "close"); + if (waitpid(pl->pid, &status, 0) < 0) + die_errno(errno, "waitpid"); + return WIFEXITED(status) && WEXITSTATUS(status) == 0; +} + +poptContext process_options(int argc, const char **argv, + struct poptOption *options, + const char *help) +{ + int c; + poptContext opts = poptGetContext(NULL, argc, argv, options, 0); + poptSetOtherOptionHelp(opts, help); + + while ((c = poptGetNextOpt(opts)) >= 0) { + // no options require explicit handling + } + + if (c < -1) { + fprintf(stderr, "%s: %s\n", + poptBadOption(opts, POPT_BADOPTION_NOALIAS), + poptStrerror(c)); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + return opts; +} + +void process_all_options(int argc, const char **argv, + struct poptOption *options) +{ + poptContext opts = process_options(argc, argv, options, + "[OPTIONS]..."); + const char *opt = poptPeekArg(opts); + + if (opt) { + fprintf(stderr, "unexpected operand: %s\n", opt); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + poptFreeContext(opts); +} + +amqp_bytes_t cstring_bytes(const char *str) +{ + return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES; +} diff --git a/tools/common.h b/tools/common.h new file mode 100644 index 0000000..09a9242 --- /dev/null +++ b/tools/common.h @@ -0,0 +1,93 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <stdint.h> + +#include <amqp.h> +#include <amqp_framing.h> + +extern char *amqp_server_exception_string(amqp_rpc_reply_t r); +extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r); + +extern void die(const char *fmt, ...) + __attribute__ ((format (printf, 1, 2))); +extern void die_errno(int err, const char *fmt, ...) + __attribute__ ((format (printf, 2, 3))); +extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) + __attribute__ ((format (printf, 2, 3))); + +extern const char *connect_options_title; +extern struct poptOption connect_options[]; +extern amqp_connection_state_t make_connection(void); +extern void close_connection(amqp_connection_state_t conn); + +extern amqp_bytes_t read_all(int fd); +extern void write_all(int fd, amqp_bytes_t data); + +extern void copy_body(amqp_connection_state_t conn, int fd); + +struct pipeline { + int pid; + int infd; +}; + +extern void pipeline(const char * const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); + +#define INCLUDE_OPTIONS(options) \ + {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} + +extern poptContext process_options(int argc, const char **argv, + struct poptOption *options, + const char *help); +extern void process_all_options(int argc, const char **argv, + struct poptOption *options); + +extern amqp_bytes_t cstring_bytes(const char *str); diff --git a/tools/common_consume.c b/tools/common_consume.c new file mode 100644 index 0000000..318fa2c --- /dev/null +++ b/tools/common_consume.c @@ -0,0 +1,160 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include "config.h" + +#include <stdio.h> +#include <stdlib.h> + +#include <popt.h> + +#include "common.h" + +static char *queue; +static char *exchange; +static char *exchange_type; +static char *routing_key; + +const char *consume_queue_options_title = "Source queue options"; +struct poptOption consume_queue_options[] = { + {"queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue to consume from", "queue"}, + {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "bind the queue to this exchange", "exchange"}, + {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0, + "create auto-delete exchange of this type for binding", "type"}, + {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to bind with", "routing key"}, + { NULL, 0, 0, NULL, 0 } +}; + +/* Convert a amqp_bytes_t to an escaped string form for printing. We + use the same escaping conventions as rabbitmqctl. */ +static char *stringify_bytes(amqp_bytes_t bytes) +{ + /* W will need up to 4 chars per byte, plus the terminating 0 */ + char *res = malloc(bytes.len * 4 + 1); + uint8_t *data = bytes.bytes; + char *p = res; + size_t i; + + for (i = 0; i < bytes.len; i++) { + if (data[i] >= 32 && data[i] != 127) { + *p++ = data[i]; + } + else { + *p++ = '\\'; + *p++ = '0' + (data[i] >> 6); + *p++ = '0' + (data[i] >> 3 & 0x7); + *p++ = '0' + (data[i] & 0x7); + } + } + + *p = 0; + return res; +} + +amqp_bytes_t setup_queue(amqp_connection_state_t conn) +{ + /* if an exchange name wasn't provided, check that we don't + have options that require it. */ + if (!exchange) { + char *opt = NULL; + if (routing_key) + opt = "--routing-key"; + else if (exchange_type) + opt = "--exchange-type"; + + if (opt) { + fprintf(stderr, + "%s option requires an exchange name to be " + "provided with --exchange\n", opt); + exit(1); + } + } + + /* Declare the queue. If the queue already exists, this won't have + any effect. */ + amqp_bytes_t queue_bytes = cstring_bytes(queue); + amqp_queue_declare_ok_t *res + = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1, + AMQP_EMPTY_TABLE); + if (!res) + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + + if (!queue) { + // the server should have provided a queue name + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", sq); + free(sq); + } + + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + + if (exchange_type) { + // we should create the exchange + if (!amqp_exchange_declare(conn, 1, eb, + amqp_cstring_bytes(exchange_type), + 0, 0, 1, AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), "exchange.declare"); + } + + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); + } + + return queue_bytes; +} diff --git a/tools/common_consume.h b/tools/common_consume.h new file mode 100644 index 0000000..703f4bd --- /dev/null +++ b/tools/common_consume.h @@ -0,0 +1,54 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +extern const char *consume_queue_options_title; +extern struct poptOption consume_queue_options[]; +extern amqp_bytes_t setup_queue(amqp_connection_state_t conn); + diff --git a/tools/consume.c b/tools/consume.c new file mode 100644 index 0000000..7ede926 --- /dev/null +++ b/tools/consume.c @@ -0,0 +1,128 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include "config.h" + +#include <stdio.h> + +#include <popt.h> + +#include "common.h" +#include "common_consume.h" + +static void do_consume(amqp_connection_state_t conn, int no_ack, + const char * const *argv) +{ + if (!amqp_basic_consume(conn, 1, setup_queue(conn), + AMQP_EMPTY_BYTES, 0, no_ack, 0)) + die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); + + for (;;) { + amqp_frame_t frame; + struct pipeline pl; + uint64_t delivery_tag; + int res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for header frame"); + + if (frame.frame_type != AMQP_FRAME_METHOD + || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + + amqp_basic_deliver_t *deliver + = (amqp_basic_deliver_t *)frame.payload.method.decoded; + delivery_tag = deliver->delivery_tag; + + pipeline(argv, &pl); + copy_body(conn, pl.infd); + + if (finish_pipeline(&pl) && !no_ack) + die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0), + "basic.ack"); + + amqp_maybe_release_buffers(conn); + } +} + +int main(int argc, const char **argv) +{ + poptContext opts; + int no_ack; + amqp_connection_state_t conn; + const char * const *cmd_argv; + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + INCLUDE_OPTIONS(consume_queue_options), + {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, + "consume in no-ack mode", NULL}, + POPT_AUTOHELP + { NULL, 0, 0, NULL, 0 } + }; + + opts = process_options(argc, argv, options, + "[OPTIONS]... <command> <args>"); + + cmd_argv = poptGetArgs(opts); + if (!cmd_argv || !cmd_argv[0]) { + fprintf(stderr, "consuming command not specified\n"); + poptPrintUsage(opts, stderr, 0); + goto error; + } + + conn = make_connection(); + do_consume(conn, no_ack, cmd_argv); + close_connection(conn); + return 0; + +error: + poptFreeContext(opts); + return 1; +} diff --git a/tools/get.c b/tools/get.c new file mode 100644 index 0000000..b433e2b --- /dev/null +++ b/tools/get.c @@ -0,0 +1,91 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include "config.h" + +#include <stdio.h> + +#include <popt.h> + +#include "common.h" +#include "common_consume.h" + +static int do_get(amqp_connection_state_t conn) +{ + amqp_rpc_reply_t r + = amqp_basic_get(conn, 1, setup_queue(conn), 1); + die_rpc(r, "basic.get"); + + if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) + return 0; + + copy_body(conn, 1); + return 1; +} + +int main(int argc, const char **argv) +{ + amqp_connection_state_t conn; + int got_something; + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + INCLUDE_OPTIONS(consume_queue_options), + POPT_AUTOHELP + { NULL, 0, 0, NULL, 0 } + }; + + process_all_options(argc, argv, options); + + conn = make_connection(); + got_something = do_get(conn); + close_connection(conn); + return got_something ? 0 : 2; +} diff --git a/tools/publish.c b/tools/publish.c new file mode 100644 index 0000000..a5b86b2 --- /dev/null +++ b/tools/publish.c @@ -0,0 +1,140 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include "config.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <popt.h> + +#include "common.h" + +static void do_publish(amqp_connection_state_t conn, + char *exchange, char *routing_key, + amqp_basic_properties_t *props, amqp_bytes_t body) +{ + int res = amqp_basic_publish(conn, 1, + cstring_bytes(exchange), + cstring_bytes(routing_key), + 0, 0, props, body); + if (res != 0) + die_errno(-res, "basic.publish"); +} + +int main(int argc, const char **argv) +{ + amqp_connection_state_t conn; + char *exchange = NULL; + char *routing_key = NULL; + char *content_type = NULL; + char *content_encoding = NULL; + char *body = NULL; + amqp_basic_properties_t props; + amqp_bytes_t body_bytes; + int delivery = 1; /* non-persistent by default */ + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "the exchange to publish to", "exchange"}, + {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to publish with", "routing key"}, + {"persistent", 'p', POPT_ARG_VAL, &delivery, 2, + "use the persistent delivery mode", NULL}, + {"content-type", 'C', POPT_ARG_STRING, &content_type, 0, + "the content-type for the message", "content type"}, + {"content-encoding", 'E', POPT_ARG_STRING, + &content_encoding, 0, + "the content-encoding for the message", "content encoding"}, + {"body", 'b', POPT_ARG_STRING, &body, 0, + "specify the message body", "body"}, + POPT_AUTOHELP + { NULL, 0, 0, NULL, 0 } + }; + + process_all_options(argc, argv, options); + + if (!exchange && !routing_key) { + fprintf(stderr, + "neither exchange nor routing key specified\n"); + return 1; + } + + memset(&props, 0, sizeof props); + props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; + props.delivery_mode = 2; // persistent delivery mode + + if (content_type) { + props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes(content_type); + } + + if (content_encoding) { + props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; + props.content_encoding = amqp_cstring_bytes(content_encoding); + } + + conn = make_connection(); + + if (body) + body_bytes = amqp_cstring_bytes(body); + else + body_bytes = read_all(0); + + do_publish(conn, exchange, routing_key, &props, body_bytes); + + if (!body) + free(body_bytes.bytes); + + close_connection(conn); + return 0; +} |