diff options
author | David Wragg <david@rabbitmq.com> | 2010-09-03 10:18:55 +0100 |
---|---|---|
committer | David Wragg <david@rabbitmq.com> | 2010-09-03 10:18:55 +0100 |
commit | b85fa81a4076536048ea374094dccbd580a5fe6e (patch) | |
tree | 7985515d5dd7925706eaa82623449b6eaa053b7a | |
parent | b339e621a8a85fbd749fdb499161320abed5ebb3 (diff) | |
parent | 1b1340ad50e18edc194f26a7156cab44b8a1bba0 (diff) | |
download | rabbitmq-c-github-ask-b85fa81a4076536048ea374094dccbd580a5fe6e.tar.gz |
Merge amqp_0_9_1 into bug22951 to remove headbug22951
43 files changed, 1458 insertions, 421 deletions
@@ -3,22 +3,17 @@ ## Introduction This is a C-language AMQP client library for use with AMQP servers -speaking protocol versions 0-8 and 0-9-1. +speaking protocol versions 0-9-1. - <http://www.rabbitmq.com/> - <http://www.amqp.org/> - <http://hg.rabbitmq.com/rabbitmq-c> -*NB*: This library's source code supports *either* 0-8 *or* 0-9-1, not -both simultaneously. Please check carefully that you have the variant -you require. - Announcements regarding the library are periodically made on the -RabbitMQ mailing list and on LShift's blog. +RabbitMQ mailing list and on the RabbitMQ blog. - <http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss> - - <http://www.lshift.net/blog/> - - <http://www.lshift.net/blog/category/lshift-sw/rabbitmq/> + - <http://www.rabbitmq.com/blog/> ## Retrieving the code @@ -91,25 +86,3 @@ access an AMQP connection or any of its channels from more than one thread, it is entirely responsible for designing and implementing an appropriate locking scheme. It will generally be much simpler to have a connection exclusive to each thread that needs AMQP service. - -### Switching the codebase to the 0-9-1 protocol version - -In current releases, the default (trunk) branch of the mercurial -repository hosting the `rabbitmq-c` code is set up for AMQP 0-8 -support, with AMQP 0-9-1 support living on a separate mercurial -branch. - -NOTE: Protocol versions 0-8 and 0-9-1 are incompatible on the -wire. You cannot use a client library that speaks one protocol version -to communication with a server that speaks the other. - -To switch your checked-out copy of the source code to 0-9-1 -support, - - (cd rabbitmq-codegen; hg up amqp_0_9_1) - (cd rabbitmq-c; hg up amqp_0_9_1) - -before building the code. If you switch branches after having compiled -the code, make sure to rerun `autoreconf`, `configure`, `make clean` -and `make` after switching branches. - diff --git a/README.windows b/README.windows new file mode 100644 index 0000000..33c8f06 --- /dev/null +++ b/README.windows @@ -0,0 +1,85 @@ +# rabbitmq-c and Windows + +rabbitmq-c can now be built on Windows using the MinGW/MSYS ports of +the GNU toolchain and miscellaneous utilities. This includes the +example programs and tools. + +The results are native Windows DLLs and EXEs, and can be used without +having MinGW installed. But the librabbitmq header files currently +use GCC extensions, and for this reason it is still not possible to +use Microsoft's C/C++ to build applications against the librabbitmq +DLL. Hopefully this will get fixed before long. + + +# Building rabbitmq-c + +rabbitmq-c is built on Windows using MinGW and MSYS. In brief, MinGW +is a native port of the GNU toolchain to Windows; MSYS is a set of +ports of common GNU utilities to run under Windows, so that typical +autotools-based builds will work there. MinGW/MSYS can be used to +build native Windows applications and DLLs, which do not depend on +MinGW/MSYS to run. + +So to build rabbitmq-c on Windows, you need to download and install +the relevant parts of MinGW/MSYS. This can be fairly time consuming - +there are dozens of files to be downloaded and unpacked. To make it +easier, we provide a bash script that automates this process, in +rabbitmq-c/etc/install-mingw.sh. You can run this under cygwin, or +under Linux and copy the results over or put them on a shared drive. +Some MinGW packages are .tar.lzma files, so it requires a system with +the xz compression utility and a tar that supports the -J option. + +Run install-mingw.sh specifying the destination directory, e.g. + + $ etc/install-mingw.sh mingw + +Python is needed for the rabbitmq-c build, so you will also need to +install python under Windows. The Windows installer from python.org +will do fine. + +You will need to copy the source code for rabbitmq-c and +rabbitmq-codegen somewhere under your mingw directory. + +Open a cmd window, and ensure that both the MinGW bin directory and +the python install directory are in the path, e.g. + + C:\>set PATH=%PATH%;C:\mingw\bin;C:\Python26 + +Then start bash, and run the following mount command (substituting the +Windows path of your MinGW install): + + C:\>bash + bash-3.1$ mount 'C:\mingw' /mingw + +Then go to the rabbitmq-c directory. If you got the rabbitmq-c +directory from Mercurial (which is the only way to get it at the +moment), you will need to run autoreconf to produce the configuration +scripts: + + bash-3.1$ autoreconf -i + +This will produce a few lines of informational output while it runs, +but as long as it doesn't mention any errors, you are ok. + +Finally, configure and make: + + bash-3.1$ ./configure && make + [...] + + +# Running the tools without mingw + +You can run the resulting tools EXEs without the rest of MinGW. To do +this, copy the following files into a directory: + +- rabbitmq-c/tools/.libs/*.exe + +- rabbitmq-c/librabbitmq/.libs/librabbitmq-0.dll + +- /bin/libpopt-0.dll + +- /bin/libiconv-2.dll + +- /bin/libintl-8.dll + + diff --git a/configure.ac b/configure.ac index 593b309..be7e695 100644 --- a/configure.ac +++ b/configure.ac @@ -9,6 +9,7 @@ AC_GNU_SOURCE AC_PROG_CC dnl Library checks +AC_LIBTOOL_WIN32_DLL AM_PROG_LIBTOOL dnl Header-file checks @@ -21,6 +22,26 @@ if test "x$GCC" = "xyes"; then fi fi +dnl Detect the kind of host we're building for +AC_CANONICAL_HOST +windows=no +case "${host}" in +*-*-mingw*) + windows=yes + ;; +esac +AM_CONDITIONAL(WINDOWS, test "x$windows" = xyes) +AS_IF([test "x$windows" = xyes], + [AC_DEFINE([WINDOWS], [1], [Define to 1 if on Windows.])] +) + +dnl Decide which API abstraction layer to use +PLATFORM_DIR=unix +if test "x$windows" = xyes ; then + PLATFORM_DIR=windows +fi +AC_SUBST(PLATFORM_DIR) + dnl Enable -m64 if we were asked to do so AC_ARG_ENABLE(64-bit, [ --enable-64-bit produce 64-bit library], @@ -47,7 +68,8 @@ checkPython() { return fi PYTHON=$1 - if $PYTHON -c 'import simplejson' 2>/dev/null + if $PYTHON -c 'import json' 2>/dev/null \ + || $PYTHON -c 'import simplejson' 2>/dev/null then found_python=yes AC_MSG_RESULT($PYTHON) @@ -65,8 +87,12 @@ AC_SUBST(AMQP_CODEGEN_DIR) AC_SUBST(AMQP_SPEC_JSON_PATH) AC_SUBST(PYTHON) -# Check for libpopt, which we need to build the tools +dnl Decide which extra win32 libs we need +EXTRA_LIBS= +AS_IF([test "x$windows" = xyes], [EXTRA_LIBS="-lws2_32 $EXTRA_LIBS"]) +AC_SUBST(EXTRA_LIBS) +dnl Check for libpopt, which we need to build the tools AC_ARG_WITH([popt], [AS_HELP_STRING([--with-popt], [use the popt library. Needed for tools.])], [], diff --git a/etc/install-mingw.sh b/etc/install-mingw.sh new file mode 100755 index 0000000..af70576 --- /dev/null +++ b/etc/install-mingw.sh @@ -0,0 +1,78 @@ +#!/bin/bash + +if [ $# -ne 1 ] ; then + echo "usage: install-mingw.sh <destination directory>" 1>&2 + exit 1 +fi + +unpack_dir=$1 + +if [ -e "$unpack_dir" ] ; then + echo "Destination directory already exists; please delete it if you are sure" 1>&2 + exit 1 +fi + +set -e + +download_dir=/tmp/install-mingw.$$ +mkdir -p $download_dir $unpack_dir + +while read f ; do + wget -P $download_dir -N http://switch.dl.sourceforge.net/project/mingw/$f +done <<EOF +MinGW/mpc/mpc-0.8.1-1/libmpc-0.8.1-1-mingw32-dll-2.tar.lzma +MinGW/BaseSystem/GCC/Version4/gcc-4.5.0-1/gcc-core-4.5.0-1-mingw32-bin.tar.lzma +MinGW/BaseSystem/GCC/Version4/gcc-4.5.0-1/libgcc-4.5.0-1-mingw32-dll-1.tar.lzma +MSYS/BaseSystem/msys-core/msys-1.0.14-1/msysCORE-1.0.14-1-msys-1.0.14-bin.tar.lzma +MinGW/BaseSystem/GNU-Binutils/binutils-2.20.1/binutils-2.20.1-2-mingw32-bin.tar.gz +MinGW/BaseSystem/RuntimeLibrary/MinGW-RT/mingwrt-3.18/mingwrt-3.18-mingw32-dll.tar.gz +MinGW/BaseSystem/RuntimeLibrary/MinGW-RT/mingwrt-3.18/mingwrt-3.18-mingw32-dev.tar.gz +MinGW/pthreads-w32/pthreads-w32-2.8.0-3/libpthread-2.8.0-3-mingw32-dll-2.tar.lzma +MinGW/mpfr/mpfr-2.4.1-1/libmpfr-2.4.1-1-mingw32-dll-1.tar.lzma +MinGW/gmp/gmp-5.0.1-1/libgmpxx-5.0.1-1-mingw32-dll-4.tar.lzma +MinGW/gmp/gmp-5.0.1-1/libgmp-5.0.1-1-mingw32-dll-10.tar.lzma +MinGW/BaseSystem/RuntimeLibrary/Win32-API/w32api-3.14/w32api-3.14-mingw32-dev.tar.gz +MSYS/make/make-3.81-2/make-3.81-2-msys-1.0.11-bin.tar.lzma +MSYS/BaseSystem/bash/bash-3.1.17-2/bash-3.1.17-2-msys-1.0.11-bin.tar.lzma +MSYS/BaseSystem/coreutils/coreutils-5.97-2/coreutils-5.97-2-msys-1.0.11-bin.tar.lzma +MinGW/popt/popt-1.15-1/libpopt-1.15-1-mingw32-dll-0.tar.lzma +MinGW/popt/popt-1.15-1/libpopt-1.15-1-mingw32-dev.tar.lzma +MSYS/BaseSystem/diffutils/diffutils-2.8.7.20071206cvs-2/diffutils-2.8.7.20071206cvs-2-msys-1.0.11-bin.tar.lzma +MSYS/BaseSystem/gawk/gawk-3.1.7-1/gawk-3.1.7-1-msys-1.0.11-bin.tar.lzma +MSYS/BaseSystem/grep/grep-2.5.4-1/grep-2.5.4-1-msys-1.0.11-bin.tar.lzma +MSYS/BaseSystem/sed/sed-4.2.1-1/sed-4.2.1-1-msys-1.0.11-bin.tar.lzma +MSYS/libtool/libtool-2.2.7a-2/libtool-2.2.7a-2-msys-1.0.13-bin.tar.lzma +MinGW/gettext/gettext-0.17-1/libintl-0.17-1-mingw32-dll-8.tar.lzma +MinGW/gettext/gettext-0.17-1/gettext-0.17-1-mingw32-dev.tar.lzma +MinGW/libiconv/libiconv-1.13.1-1/libiconv-1.13.1-1-mingw32-dll-2.tar.lzma +MinGW/libiconv/libiconv-1.13.1-1/libiconv-1.13.1-1-mingw32-dev.tar.lzma +MinGW/libiconv/libiconv-1.13.1-1/libcharset-1.13.1-1-mingw32-dll-1.tar.lzma +MSYS/autoconf/autoconf-2.65-1/autoconf-2.65-1-msys-1.0.13-bin.tar.lzma +MSYS/automake/automake-1.11.1-1/automake-1.11.1-1-msys-1.0.13-bin.tar.lzma +MSYS/m4/m4-1.4.14-1/m4-1.4.14-1-msys-1.0.13-bin.tar.lzma +MSYS/BaseSystem/tar/tar-1.23-1/tar-1.23-1-msys-1.0.13-bin.tar.lzma +MSYS/BaseSystem/regex/regex-1.20090805-2/libregex-1.20090805-2-msys-1.0.13-dll-1.tar.lzma +MSYS/BaseSystem/libiconv/libiconv-1.13.1-2/libiconv-1.13.1-2-msys-1.0.13-dll-2.tar.lzma +MSYS/BaseSystem/gettext/gettext-0.17-2/libintl-0.17-2-msys-dll-8.tar.lzma +MSYS/perl/perl-5.6.1_2-2/perl-5.6.1_2-2-msys-1.0.13-bin.tar.lzma +MSYS/crypt/crypt-1.1_1-3/libcrypt-1.1_1-3-msys-1.0.13-dll-0.tar.lzma +EOF + +for f in $download_dir/* ; do + case $f in + *.tar.gz) + tar -C $unpack_dir -xzf $f + ;; + + *.tar.lzma) + tar -C $unpack_dir -xJf $f + ;; + + *) + echo "Don't know how to unpack $f" 1>&2 + exit 1 + ;; + esac +done + +rm -rf $download_dir diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c index 697df2a..1f183a5 100644 --- a/examples/amqp_bind.c +++ b/examples/amqp_bind.c @@ -99,7 +99,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index a8ed9e3..b1754f5 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -51,7 +51,6 @@ #include <stdlib.h> #include <stdio.h> #include <string.h> -#include <errno.h> #include <stdint.h> #include <amqp.h> @@ -84,8 +83,8 @@ static void run(amqp_connection_state_t conn) if (now > next_summary_time) { int countOverInterval = received - previous_received; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); - printf("%lld ms: Received %d - %d since last report (%d Hz)\n", - (now - start_time) / 1000, received, countOverInterval, (int) intervalRate); + printf("%d ms: Received %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); previous_received = received; previous_report_time = now; @@ -94,7 +93,8 @@ static void run(amqp_connection_state_t conn) amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) return; + if (result < 0) + return; if (frame.frame_type != AMQP_FRAME_METHOD) continue; @@ -103,7 +103,9 @@ static void run(amqp_connection_state_t conn) continue; result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) return; + if (result < 0) + return; + if (frame.frame_type != AMQP_FRAME_HEADER) { fprintf(stderr, "Expected header!"); abort(); @@ -114,7 +116,8 @@ static void run(amqp_connection_state_t conn) while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) return; + if (result < 0) + return; if (frame.frame_type != AMQP_FRAME_BODY) { fprintf(stderr, "Expected body!"); @@ -165,7 +168,8 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { - die_on_error(-ENOMEM, "Copying queue name"); + fprintf(stderr, "Out of memory while copying queue name"); + return 1; } } @@ -180,8 +184,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c index 14bc163..e77ac52 100644 --- a/examples/amqp_exchange_declare.c +++ b/examples/amqp_exchange_declare.c @@ -89,12 +89,11 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype), - 0, 0, 0, AMQP_EMPTY_TABLE); + 0, 0, AMQP_EMPTY_TABLE); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index 448ced1..f208652 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -51,7 +51,6 @@ #include <stdlib.h> #include <stdio.h> #include <string.h> -#include <errno.h> #include <stdint.h> #include <amqp.h> @@ -101,7 +100,8 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { - die_on_error(-ENOMEM, "Copying queue name"); + fprintf(stderr, "Out of memory while copying queue name"); + return 1; } } @@ -125,7 +125,7 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result <= 0) + if (result < 0) break; printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); @@ -143,7 +143,7 @@ int main(int argc, char const * const *argv) { (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_HEADER) { @@ -162,7 +162,7 @@ int main(int argc, char const * const *argv) { while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_BODY) { @@ -187,8 +187,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index 0057826..98c389f 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -51,7 +51,6 @@ #include <stdlib.h> #include <stdio.h> #include <string.h> -#include <errno.h> #include <stdint.h> #include <amqp.h> @@ -107,7 +106,7 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result <= 0) + if (result < 0) break; printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); @@ -125,7 +124,7 @@ int main(int argc, char const * const *argv) { (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_HEADER) { @@ -144,7 +143,7 @@ int main(int argc, char const * const *argv) { while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result <= 0) + if (result < 0) break; if (frame.frame_type != AMQP_FRAME_BODY) { @@ -171,8 +170,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c index ac6eebc..b83e030 100644 --- a/examples/amqp_producer.c +++ b/examples/amqp_producer.c @@ -95,8 +95,8 @@ static void send_batch(amqp_connection_state_t conn, if (now > next_summary_time) { int countOverInterval = sent - previous_sent; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); - printf("%lld ms: Sent %d - %d since last report (%d Hz)\n", - (now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); + printf("%d ms: Sent %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); previous_sent = sent; previous_report_time = now; @@ -111,10 +111,10 @@ static void send_batch(amqp_connection_state_t conn, { long long stop_time = now_microseconds(); - long long total_delta = stop_time - start_time; + int total_delta = stop_time - start_time; printf("PRODUCER - Message count: %d\n", message_count); - printf("Total time, milliseconds: %lld\n", total_delta / 1000); + printf("Total time, milliseconds: %d\n", total_delta / 1000); printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0))); } } @@ -151,7 +151,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index 6e8e0b6..ccd3866 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -108,7 +108,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c index 27df916..4b92e12 100644 --- a/examples/amqp_unbind.c +++ b/examples/amqp_unbind.c @@ -99,7 +99,6 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - amqp_destroy_connection(conn); - die_on_error(close(sockfd), "Closing socket"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; } diff --git a/examples/example_utils.c b/examples/example_utils.c index 628572c..48f21f9 100644 --- a/examples/example_utils.c +++ b/examples/example_utils.c @@ -61,7 +61,9 @@ void die_on_error(int x, char const *context) { if (x < 0) { - fprintf(stderr, "%s: %s\n", context, strerror(-x)); + char *errstr = amqp_error_string(-x); + fprintf(stderr, "%s: %s\n", context, errstr); + free(errstr); exit(1); } } @@ -76,8 +78,7 @@ void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: - fprintf(stderr, "%s: %s\n", context, - x.library_errno ? strerror(x.library_errno) : "(end-of-stream)"); + fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); break; case AMQP_RESPONSE_SERVER_EXCEPTION: diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am index b4c8843..82b9f30 100644 --- a/librabbitmq/Makefile.am +++ b/librabbitmq/Makefile.am @@ -1,12 +1,18 @@ lib_LTLIBRARIES = librabbitmq.la -librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c +AM_CFLAGS = -I$(srcdir)/$(PLATFORM_DIR) +librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c $(PLATFORM_DIR)/socket.c +librabbitmq_la_LDFLAGS = -no-undefined +librabbitmq_la_LIBADD = $(EXTRA_LIBS) nodist_librabbitmq_la_SOURCES = amqp_framing.c include_HEADERS = amqp_framing.h amqp.h -noinst_HEADERS = amqp_private.h +noinst_HEADERS = amqp_private.h $(PLATFORM_DIR)/socket.h BUILT_SOURCES = amqp_framing.h amqp_framing.c CLEANFILES = amqp_framing.h amqp_framing.c -EXTRA_DIST = codegen.py +EXTRA_DIST = \ + codegen.py \ + unix/socket.c unix/socket.h \ + windows/socket.c windows/socket.h CODEGEN_PY=$(srcdir)/codegen.py diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index ad9b3e4..40c8292 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -263,7 +263,7 @@ typedef enum amqp_response_type_enum_ { typedef struct amqp_rpc_reply_t_ { amqp_response_type_enum reply_type; amqp_method_t reply; - int library_errno; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */ + int library_error; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */ } amqp_rpc_reply_t; typedef enum amqp_sasl_method_enum_ { @@ -308,8 +308,8 @@ extern int amqp_tune_connection(amqp_connection_state_t state, int channel_max, int frame_max, int heartbeat); -int amqp_get_channel_max(amqp_connection_state_t state); -extern void amqp_destroy_connection(amqp_connection_state_t state); +extern int amqp_get_channel_max(amqp_connection_state_t state); +extern int amqp_destroy_connection(amqp_connection_state_t state); extern int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, @@ -412,7 +412,6 @@ extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, - amqp_boolean_t auto_delete, amqp_table_t arguments); extern struct amqp_queue_declare_ok_t_ *amqp_queue_declare(amqp_connection_state_t state, @@ -441,7 +440,7 @@ extern struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind(amqp_connection_state_t amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, - amqp_bytes_t binding_key, + amqp_bytes_t routing_key, amqp_table_t arguments); extern struct amqp_basic_consume_ok_t_ *amqp_basic_consume(amqp_connection_state_t state, @@ -501,6 +500,14 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); */ extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state); +/* + * Get the error string for the given error code. + * + * The returned string resides on the heap; the caller is responsible + * for freeing it. + */ +extern char *amqp_error_string(int err); + #ifdef __cplusplus } #endif diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 592ab58..b2793ff 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -52,7 +52,6 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> #include "amqp.h" #include "amqp_framing.h" @@ -60,6 +59,40 @@ #include <assert.h> +static const char *client_error_strings[ERROR_MAX] = { + "could not allocate memory", /* ERROR_NO_MEMORY */ + "received bad AMQP data", /* ERROR_BAD_AQMP_DATA */ + "unknown AMQP class id", /* ERROR_UNKOWN_CLASS */ + "unknown AMQP method id", /* ERROR_UNKOWN_METHOD */ + "unknown host", /* ERROR_GETHOSTBYNAME_FAILED */ + "incompatible AMQP version", /* ERROR_INCOMPATIBLE_AMQP_VERSION */ + "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */ +}; + +char *amqp_error_string(int err) +{ + const char *str; + int category = (err & ERROR_CATEGORY_MASK); + err = (err & ~ERROR_CATEGORY_MASK); + + switch (category) { + case ERROR_CATEGORY_CLIENT: + if (err < 1 || err > ERROR_MAX) + str = "(undefined librabbitmq error)"; + else + str = client_error_strings[err - 1]; + break; + + case ERROR_CATEGORY_OS: + return amqp_os_error_string(err); + + default: + str = "(undefined error category)"; + } + + return strdup(str); +} + #define RPC_REPLY(replytype) \ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ ? (replytype *) state->most_recent_api_result.reply.decoded \ @@ -163,13 +196,12 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, - amqp_boolean_t auto_delete, amqp_table_t arguments) { state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK, amqp_exchange_declare_t, - 0, exchange, type, passive, durable, auto_delete, 0, 0, arguments); + 0, exchange, type, passive, durable, 0, 0, 0, arguments); return RPC_REPLY(amqp_exchange_declare_ok_t); } @@ -220,13 +252,13 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, - amqp_bytes_t binding_key, + amqp_bytes_t routing_key, amqp_table_t arguments) { state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK, amqp_queue_unbind_t, - 0, queue, exchange, binding_key, arguments); + 0, queue, exchange, routing_key, arguments); return RPC_REPLY(amqp_queue_unbind_ok_t); } diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 8623eed..3d95e98 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -52,17 +52,13 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> - -#include <unistd.h> -#include <sys/uio.h> -#include <sys/types.h> +#include <assert.h> #include "amqp.h" #include "amqp_framing.h" #include "amqp_private.h" -#include <assert.h> +#include "socket.h" #define INITIAL_FRAME_POOL_PAGE_SIZE 65536 #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 @@ -151,7 +147,7 @@ int amqp_tune_connection(amqp_connection_state_t state, newbuf = realloc(state->outbound_buffer.bytes, frame_max); if (newbuf == NULL) { amqp_destroy_connection(state); - return -ENOMEM; + return -ERROR_NO_MEMORY; } state->outbound_buffer.bytes = newbuf; @@ -162,12 +158,19 @@ int amqp_get_channel_max(amqp_connection_state_t state) { return state->channel_max; } -void amqp_destroy_connection(amqp_connection_state_t state) { +int amqp_destroy_connection(amqp_connection_state_t state) { + int s = state->sockfd; + empty_amqp_pool(&state->frame_pool); empty_amqp_pool(&state->decoding_pool); free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); free(state); + + if (s >= 0 && amqp_socket_close(s) < 0) + return -amqp_socket_error(); + else + return 0; } static void return_to_idle(amqp_connection_state_t state) { @@ -199,7 +202,7 @@ int amqp_handle_input(amqp_connection_state_t state, /* state->inbound_buffer.len is always nonzero, because it corresponds to frame_max, which is not permitted to be less than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ - return -ENOMEM; + return -ERROR_NO_MEMORY; } state->state = CONNECTION_STATE_WAITING_FOR_HEADER; } @@ -246,7 +249,7 @@ int amqp_handle_input(amqp_connection_state_t state, /* Check frame end marker (footer) */ if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) { - return -EINVAL; + return -ERROR_BAD_AMQP_DATA; } decoded_frame->channel = D_16(state->inbound_buffer, 1); @@ -392,7 +395,7 @@ static int inner_send_frame(amqp_connection_state_t state, break; default: - return -EINVAL; + abort(); } E_32(state->outbound_buffer, 3, *payload_len); @@ -419,16 +422,14 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame) { amqp_bytes_t encoded; - int payload_len; - int separate_body; + int payload_len, res; - separate_body = inner_send_frame(state, frame, &encoded, &payload_len); - switch (separate_body) { + res = inner_send_frame(state, frame, &encoded, &payload_len); + switch (res) { case 0: - AMQP_CHECK_RESULT(write(state->sockfd, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); - return 0; + res = send(state->sockfd, state->outbound_buffer.bytes, + payload_len + (HEADER_SIZE + FOOTER_SIZE), 0); + break; case 1: { struct iovec iov[3]; @@ -440,13 +441,18 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; assert(FOOTER_SIZE == 1); iov[2].iov_len = FOOTER_SIZE; - AMQP_CHECK_RESULT(writev(state->sockfd, &iov[0], 3)); - return 0; + res = amqp_socket_writev(state->sockfd, &iov[0], 3); + break; } default: - return separate_body; + return res; } + + if (res < 0) + return -amqp_socket_error(); + else + return 0; } int amqp_send_frame_to(amqp_connection_state_t state, diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 6e52dc8..021151a 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -53,7 +53,6 @@ #include <string.h> #include <stdint.h> #include <sys/types.h> -#include <errno.h> #include <assert.h> #include "amqp.h" @@ -102,25 +101,24 @@ void empty_amqp_pool(amqp_pool_t *pool) { empty_blocklist(&pool->pages); } +/* Returns 1 on success, 0 on failure */ static int record_pool_block(amqp_pool_blocklist_t *x, void *block) { size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1); if (x->blocklist == NULL) { x->blocklist = malloc(blocklistlength); - if (x->blocklist == NULL) { - return -ENOMEM; - } + if (x->blocklist == NULL) + return 0; } else { void *newbl = realloc(x->blocklist, blocklistlength); - if (newbl == NULL) { - return -ENOMEM; - } + if (newbl == NULL) + return 0; x->blocklist = newbl; } x->blocklist[x->num_blocks] = block; x->num_blocks++; - return 0; + return 1; } void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { @@ -135,9 +133,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (result == NULL) { return NULL; } - if (record_pool_block(&pool->large_blocks, result) != 0) { + if (!record_pool_block(&pool->large_blocks, result)) return NULL; - } return result; } @@ -156,9 +153,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (pool->alloc_block == NULL) { return NULL; } - if (record_pool_block(&pool->pages, pool->alloc_block) != 0) { + if (!record_pool_block(&pool->pages, pool->alloc_block)) return NULL; - } pool->next_page = pool->pages.num_blocks; } else { pool->alloc_block = pool->pages.blocklist[pool->next_page]; diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 3985619..c30663a 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -55,7 +55,28 @@ extern "C" { #endif -#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */ +/* Error numbering: Because of differences in error numbering on + * different platforms, we want to keep error numbers opaque for + * client code. Internally, we encode the category of an error + * (i.e. where its number comes from) in the top bits of the number + * (assuming that an int has at least 32 bits). + */ +#define ERROR_CATEGORY_MASK (1 << 29) + +#define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */ +#define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */ + +/* librabbitmq error codes */ +#define ERROR_NO_MEMORY 1 +#define ERROR_BAD_AMQP_DATA 2 +#define ERROR_UNKNOWN_CLASS 3 +#define ERROR_UNKNOWN_METHOD 4 +#define ERROR_GETHOSTBYNAME_FAILED 5 +#define ERROR_INCOMPATIBLE_AMQP_VERSION 6 +#define ERROR_CONNECTION_CLOSED 7 +#define ERROR_MAX 7 + +extern char *amqp_os_error_string(int err); /* * Connection states: @@ -125,7 +146,7 @@ struct amqp_connection_state_t_ { amqp_rpc_reply_t most_recent_api_result; }; -#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); }) +#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -ERROR_BAD_AMQP_DATA; } (v); }) #define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o])) #define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o)) @@ -176,13 +197,6 @@ extern int amqp_encode_table(amqp_bytes_t encoded, #define AMQP_CHECK_RESULT(expr) AMQP_CHECK_RESULT_CLEANUP(expr, ) -#define AMQP_CHECK_EOF_RESULT(expr) \ - ({ \ - int _result = (expr); \ - if (_result <= 0) return _result; \ - _result; \ - }) - #ifndef NDEBUG extern void amqp_dump(void const *buffer, size_t len); #else diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 17805fa..13f6376 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -52,47 +52,47 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> #include <stdarg.h> +#include <assert.h> #include "amqp.h" #include "amqp_framing.h" #include "amqp_private.h" -#include <sys/types.h> -#include <sys/uio.h> -#include <unistd.h> -#include <sys/socket.h> -#include <netdb.h> -#include <netinet/in.h> +#include "socket.h" -#include <assert.h> int amqp_open_socket(char const *hostname, int portnumber) { - int sockfd; + int sockfd, res; struct sockaddr_in addr; struct hostent *he; + int one = 1; /* used as a buffer by setsockopt below */ + + res = amqp_socket_init(); + if (res) + return res; he = gethostbyname(hostname); - if (he == NULL) { - return -ENOENT; - } + if (he == NULL) + return -ERROR_GETHOSTBYNAME_FAILED; addr.sin_family = AF_INET; addr.sin_port = htons(portnumber); addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0]; sockfd = socket(PF_INET, SOCK_STREAM, 0); - if (sockfd == -1) { - return -errno; - } + if (sockfd == -1) + return -amqp_socket_error(); - if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { - int result = -errno; - close(sockfd); - return result; + if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, + sizeof(one)) < 0 + || connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) + { + res = -amqp_socket_error(); + amqp_socket_close(sockfd); + return res; } return sockfd; @@ -104,22 +104,15 @@ static char *header() { header[1] = 'M'; header[2] = 'Q'; header[3] = 'P'; -#ifndef USE_MODERN_AMQP_PROTOCOL_HEADER - header[4] = 1; - header[5] = 1; - header[6] = AMQP_PROTOCOL_VERSION_MAJOR; - header[7] = AMQP_PROTOCOL_VERSION_MINOR; -#else header[4] = 0; header[5] = AMQP_PROTOCOL_VERSION_MAJOR; header[6] = AMQP_PROTOCOL_VERSION_MINOR; header[7] = AMQP_PROTOCOL_VERSION_REVISION; -#endif return header; } int amqp_send_header(amqp_connection_state_t state) { - return write(state->sockfd, header(), 8); + return send(state->sockfd, header(), 8, 0); } int amqp_send_header_to(amqp_connection_state_t state, @@ -194,24 +187,21 @@ static int wait_frame_inner(amqp_connection_state_t state, AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame))); state->sock_inbound_offset += result; - if (decoded_frame->frame_type != 0) { + if (decoded_frame->frame_type != 0) /* Complete frame was read. Return it. */ - return 1; - } + return 0; /* Incomplete or ignored frame. Keep processing input. */ assert(result != 0); } - result = read(state->sockfd, - state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len); - if (result < 0) { - return -errno; - } - if (result == 0) { - /* EOF. */ - return 0; + result = recv(state->sockfd, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0); + if (result <= 0) { + if (result == 0) + return -ERROR_CONNECTION_CLOSED; + else + return -amqp_socket_error(); } state->sock_inbound_limit = result; @@ -229,7 +219,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state, state->last_queued_frame = NULL; } *decoded_frame = *f; - return 1; + return 0; } else { return wait_frame_inner(state, decoded_frame); } @@ -241,8 +231,10 @@ int amqp_simple_wait_method(amqp_connection_state_t state, amqp_method_t *output) { amqp_frame_t frame; - - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame)); + int res = amqp_simple_wait_frame(state, &frame); + if (res < 0) + return res; + amqp_assert(frame.channel == expected_channel, "Expected 0x%08X method frame on channel %d, got frame on channel %d", expected_method, @@ -259,7 +251,7 @@ int amqp_simple_wait_method(amqp_connection_state_t state, expected_channel, frame.payload.method.id); *output = frame.payload.method; - return 1; + return 0; } int amqp_send_method(amqp_connection_state_t state, @@ -299,7 +291,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, status = amqp_send_method(state, channel, request_id, decoded_request_method); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -308,9 +300,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, retry: status = wait_frame_inner(state, &frame); - if (status <= 0) { + if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -335,7 +327,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, if (frame_copy == NULL || link == NULL) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = ENOMEM; + result.library_error = ERROR_NO_MEMORY; return result; } @@ -370,6 +362,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_sasl_method_enum sasl_method, va_list vl) { + int res; amqp_method_t method; uint32_t server_frame_max; uint16_t server_channel_max; @@ -377,12 +370,16 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_send_header(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method)); + res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, + &method); + if (res < 0) + return res; + { amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { - return -EPROTOTYPE; + return -ERROR_INCOMPATIBLE_AMQP_VERSION; } /* TODO: check that our chosen SASL mechanism is in the list of @@ -394,7 +391,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl); amqp_connection_start_ok_t s; if (response_bytes.bytes == NULL) { - return -ENOMEM; + return -ERROR_NO_MEMORY; } s = (amqp_connection_start_ok_t) { @@ -408,7 +405,11 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_release_buffers(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method)); + res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, + &method); + if (res < 0) + return res; + { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; server_channel_max = s->channel_max; @@ -442,7 +443,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_release_buffers(state); - return 1; + return 0; } amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, @@ -460,11 +461,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, va_start(vl, sasl_method); status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl); - if (status <= 0) { + if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; result.reply.id = 0; result.reply.decoded = NULL; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -492,6 +493,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, result.reply_type = AMQP_RESPONSE_NORMAL; result.reply.id = 0; result.reply.decoded = NULL; - result.library_errno = 0; + result.library_error = 0; return result; } diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c index 25c5932..3f5eb61 100644 --- a/librabbitmq/amqp_table.c +++ b/librabbitmq/amqp_table.c @@ -52,10 +52,10 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> #include "amqp.h" #include "amqp_private.h" +#include "socket.h" #include <assert.h> @@ -86,7 +86,7 @@ static int amqp_decode_array(amqp_bytes_t encoded, int limit; if (entries == NULL) { - return -ENOMEM; + return -ERROR_NO_MEMORY; } offset += 4; @@ -99,7 +99,7 @@ static int amqp_decode_array(amqp_bytes_t encoded, newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t)); if (newentries == NULL) { free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } entries = newentries; } @@ -117,7 +117,7 @@ static int amqp_decode_array(amqp_bytes_t encoded, if (output->entries == NULL && num_entries > 0) { /* NULL is legitimate if we requested a zero-length block. */ free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t)); @@ -140,7 +140,7 @@ int amqp_decode_table(amqp_bytes_t encoded, int limit; if (entries == NULL) { - return -ENOMEM; + return -ERROR_NO_MEMORY; } offset += 4; @@ -159,7 +159,7 @@ int amqp_decode_table(amqp_bytes_t encoded, newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t)); if (newentries == NULL) { free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } entries = newentries; } @@ -182,7 +182,7 @@ int amqp_decode_table(amqp_bytes_t encoded, if (output->entries == NULL && num_entries > 0) { /* NULL is legitimate if we requested a zero-length block. */ free(entries); - return -ENOMEM; + return -ERROR_NO_MEMORY; } memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t)); @@ -274,7 +274,7 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_VOID: break; default: - return -EINVAL; + return -ERROR_BAD_AMQP_DATA; } *offsetptr = offset; @@ -410,7 +410,7 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_VOID: break; default: - return -EINVAL; + abort(); } *offsetptr = offset; diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index 792a6cf..f911966 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -170,7 +170,7 @@ def genErl(spec): if m.arguments: print " %s *m = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \ (m.structName(), m.structName(), m.structName()) - print " if (m == NULL) { return -ENOMEM; }" + print " if (m == NULL) { return -ERROR_NO_MEMORY; }" else: print " %s *m = NULL; /* no fields */" % (m.structName(),) bitindex = None @@ -197,7 +197,7 @@ def genErl(spec): print " case %d: {" % (c.index,) print " %s *p = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \ (c.structName(), c.structName(), c.structName()) - print " if (p == NULL) { return -ENOMEM; }" + print " if (p == NULL) { return -ERROR_NO_MEMORY; }" print " p->_flags = flags;" for f in c.fields: if spec.resolveDomain(f.domain) == 'bit': @@ -261,12 +261,11 @@ def genErl(spec): print '#include <stdint.h>' print '#include <string.h>' print '#include <stdio.h>' - print '#include <errno.h>' - print '#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */' print print '#include "amqp.h"' print '#include "amqp_framing.h"' print '#include "amqp_private.h"' + print '#include "socket.h"' print """ char const *amqp_constant_name(int constantNumber) { @@ -317,7 +316,7 @@ int amqp_decode_method(amqp_method_number_t methodNumber, switch (methodNumber) {""" for m in methods: genDecodeMethodFields(m) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_METHOD; } }""" @@ -343,7 +342,7 @@ int amqp_decode_properties(uint16_t class_id, switch (class_id) {""" for c in spec.allClasses(): genDecodeProperties(c) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_CLASS; } }""" @@ -358,7 +357,7 @@ int amqp_encode_method(amqp_method_number_t methodNumber, switch (methodNumber) {""" for m in methods: genEncodeMethodFields(m) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_METHOD; } }""" @@ -390,7 +389,7 @@ int amqp_encode_properties(uint16_t class_id, switch (class_id) {""" for c in spec.allClasses(): genEncodeProperties(c) - print """ default: return -ENOENT; + print """ default: return -ERROR_UNKNOWN_CLASS; } }""" diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c new file mode 100644 index 0000000..9d37dfc --- /dev/null +++ b/librabbitmq/unix/socket.c @@ -0,0 +1,85 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <sys/socket.h> +#include <unistd.h> +#include <fcntl.h> +#include <stdint.h> +#include <string.h> + +#include "amqp.h" +#include "amqp_private.h" +#include "socket.h" + +int amqp_socket_socket(int domain, int type, int proto) +{ + int flags; + + int s = socket(domain, type, proto); + if (s < 0) + return s; + + /* Always enable CLOEXEC on the socket */ + flags = fcntl(s, F_GETFD); + if (flags == -1 + || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { + int e = errno; + close(s); + errno = e; + return -1; + } + + return s; +} + +char *amqp_os_error_string(int err) +{ + return strdup(strerror(err)); +} diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h new file mode 100644 index 0000000..5cb37f1 --- /dev/null +++ b/librabbitmq/unix/socket.h @@ -0,0 +1,79 @@ +#ifndef librabbitmq_unix_socket_h +#define librabbitmq_unix_socket_h + +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <errno.h> +#include <sys/types.h> +#include <unistd.h> +#include <sys/uio.h> +#include <sys/socket.h> +#include <netdb.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +static inline int amqp_socket_init(void) +{ + return 0; +} + +extern int amqp_socket_socket(int domain, int type, int proto); + +#define amqp_socket_setsockopt setsockopt +#define amqp_socket_close close +#define amqp_socket_writev writev + +static inline int amqp_socket_error() +{ + return errno | ERROR_CATEGORY_OS; +} + +#endif diff --git a/librabbitmq/windows/socket.c b/librabbitmq/windows/socket.c new file mode 100644 index 0000000..9c026bd --- /dev/null +++ b/librabbitmq/windows/socket.c @@ -0,0 +1,88 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <windows.h> +#include <stdint.h> + +#include "amqp.h" +#include "amqp_private.h" +#include "socket.h" + +static int called_wsastartup; + +int amqp_socket_init(void) +{ + if (!called_wsastartup) { + WSADATA data; + int res = WSAStartup(0x0202, &data); + if (res) + return -res; + + called_wsastartup = 1; + } + + return 0; +} + +char *amqp_os_error_string(int err) +{ + char *msg, *copy; + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, err, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) + return strdup("(error retrieving Windows error message)"); + + copy = strdup(msg); + LocalFree(msg); + return copy; +} diff --git a/librabbitmq/windows/socket.h b/librabbitmq/windows/socket.h new file mode 100644 index 0000000..3e0a378 --- /dev/null +++ b/librabbitmq/windows/socket.h @@ -0,0 +1,89 @@ +#ifndef librabbitmq_windows_socket_h +#define librabbitmq_windows_socket_h + +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <winsock2.h> + +extern int amqp_socket_init(void); + +#define amqp_socket_socket socket +#define amqp_socket_close closesocket + +static inline int amqp_socket_setsockopt(int sock, int level, int optname, + const void *optval, size_t optlen) +{ + /* the winsock setsockopt function has its 4th argument as a + const char * */ + return setsockopt(sock, level, optname, (const char *)optval, optlen); +} + +/* same as WSABUF */ +struct iovec { + u_long iov_len; + char *iov_base; +}; + +static inline int amqp_socket_writev(int sock, struct iovec *iov, int nvecs) +{ + DWORD ret; + if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) + return ret; + else + return -1; +} + +static inline int amqp_socket_error() +{ + return WSAGetLastError() | ERROR_CATEGORY_OS; +} + +#endif diff --git a/tests/Makefile.am b/tests/Makefile.am index 1ac6faf..7c8a4fe 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -1,4 +1,4 @@ noinst_PROGRAMS = test_tables -AM_CFLAGS = -I$(top_srcdir)/librabbitmq +AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(top_srcdir)/librabbitmq/$(PLATFORM_DIR) AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la diff --git a/tests/test_tables.c b/tests/test_tables.c index e620443..be1994b 100644 --- a/tests/test_tables.c +++ b/tests/test_tables.c @@ -52,9 +52,9 @@ #include <stdio.h> #include <string.h> #include <time.h> -#include <errno.h> -#include <stdint.h> +#include <inttypes.h> + #include <amqp.h> #include <amqp_framing.h> #include <amqp_private.h> @@ -75,13 +75,13 @@ static void dump_value(int indent, amqp_field_value_t v) { putchar(' '); switch (v.kind) { case AMQP_FIELD_KIND_BOOLEAN: puts(v.value.boolean ? "true" : "false"); break; - case AMQP_FIELD_KIND_I8: printf("%d\n", v.value.i8); break; - case AMQP_FIELD_KIND_U8: printf("%d\n", v.value.u8); break; - case AMQP_FIELD_KIND_I16: printf("%d\n", v.value.i16); break; - case AMQP_FIELD_KIND_U16: printf("%d\n", v.value.u16); break; - case AMQP_FIELD_KIND_I32: printf("%ld\n", (long) v.value.i32); break; - case AMQP_FIELD_KIND_U32: printf("%lu\n", (unsigned long) v.value.u32); break; - case AMQP_FIELD_KIND_I64: printf("%lld\n", (long long) v.value.i64); break; + case AMQP_FIELD_KIND_I8: printf("%"PRId8"\n", v.value.i8); break; + case AMQP_FIELD_KIND_U8: printf("%"PRIu8"\n", v.value.u8); break; + case AMQP_FIELD_KIND_I16: printf("%"PRId16"\n", v.value.i16); break; + case AMQP_FIELD_KIND_U16: printf("%"PRIu16"\n", v.value.u16); break; + case AMQP_FIELD_KIND_I32: printf("%"PRId32"\n", v.value.i32); break; + case AMQP_FIELD_KIND_U32: printf("%"PRIu32"\n", v.value.u32); break; + case AMQP_FIELD_KIND_I64: printf("%"PRId64"\n", v.value.i64); break; case AMQP_FIELD_KIND_F32: printf("%g\n", (double) v.value.f32); break; case AMQP_FIELD_KIND_F64: printf("%g\n", v.value.f64); break; case AMQP_FIELD_KIND_DECIMAL: @@ -106,7 +106,7 @@ static void dump_value(int indent, amqp_field_value_t v) { } } break; - case AMQP_FIELD_KIND_TIMESTAMP: printf("%llu\n", (unsigned long long) v.value.u64); break; + case AMQP_FIELD_KIND_TIMESTAMP: printf("%"PRIu64"\n", v.value.u64); break; case AMQP_FIELD_KIND_TABLE: putchar('\n'); { @@ -209,7 +209,9 @@ static void test_table_codec(void) { int decoding_offset = 0; result = amqp_decode_table(decoding_bytes, &pool, &decoded, &decoding_offset); if (result < 0) { - printf("Table decoding failed: %d (%s)\n", result, strerror(-result)); + char *errstr = amqp_error_string(-result); + printf("Table decoding failed: %d (%s)\n", result, errstr); + free(errstr); abort(); } printf("BBBBBBBBBB\n"); @@ -227,7 +229,9 @@ static void test_table_codec(void) { result = amqp_encode_table(encoding_result, &table, &offset); if (result < 0) { - printf("Table encoding failed: %d (%s)\n", result, strerror(-result)); + char *errstr = amqp_error_string(-result); + printf("Table encoding failed: %d (%s)\n", result, errstr); + free(errstr); abort(); } @@ -272,7 +276,7 @@ int main(int argc, char const * const *argv) { if ((sizeof(float) != 4) || (vi.i != 0x40490fdb)) { printf("*** ERROR: single floating point encoding does not work as expected\n"); printf("sizeof float is %lu, float is %g, u32 is 0x%08lx\n", - sizeof(float), + (unsigned long)sizeof(float), vi.f, (unsigned long) vi.i); } @@ -280,10 +284,9 @@ int main(int argc, char const * const *argv) { vl.d = M_PI; if ((sizeof(double) != 8) || (vl.l != 0x400921fb54442d18L)) { printf("*** ERROR: double floating point encoding does not work as expected\n"); - printf("sizeof double is %lu, double is %g, u64 is 0x%16llx\n", - sizeof(double), - vl.d, - (unsigned long long) vl.l); + printf("sizeof double is %lu, double is %g, u64 is 0x%16"PRIx64"\n", + (unsigned long)sizeof(double), + vl.d, vl.l); } test_table_codec(); diff --git a/tools/Makefile.am b/tools/Makefile.am index 2c47385..ccd36ca 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -2,15 +2,26 @@ SUBDIRS=doc bin_PROGRAMS = amqp-publish amqp-get amqp-consume amqp-declare-queue amqp-delete-queue -AM_CFLAGS = -I$(top_srcdir)/librabbitmq +AM_CFLAGS = -I$(top_srcdir)/librabbitmq -I$(srcdir)/$(PLATFORM_DIR) AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la LDADD=$(LIBPOPT) -noinst_HEADERS = common.h +noinst_HEADERS = common.h $(PLATFORM_DIR)/process.h -amqp_publish_SOURCES = publish.c common.c -amqp_get_SOURCES = get.c common.c -amqp_consume_SOURCES = consume.c common.c -amqp_declare_queue_SOURCES = declare_queue.c common.c -amqp_delete_queue_SOURCES = delete_queue.c common.c +COMMON_SOURCES = common.c + +if WINDOWS +COMMON_SOURCES += windows/compat.c +endif + +amqp_publish_SOURCES = publish.c $(COMMON_SOURCES) +amqp_get_SOURCES = get.c $(COMMON_SOURCES) +amqp_consume_SOURCES = consume.c $(PLATFORM_DIR)/process.c $(COMMON_SOURCES) +amqp_declare_queue_SOURCES = declare_queue.c $(COMMON_SOURCES) +amqp_delete_queue_SOURCES = delete_queue.c $(COMMON_SOURCES) + +EXTRA_DIST = \ + unix/process.c unix/process.h \ + windows/process.c windows/process.h \ + windows/compat.c windows/compat.h diff --git a/tools/common.c b/tools/common.c index 6a38a95..c5bda77 100644 --- a/tools/common.c +++ b/tools/common.c @@ -58,14 +58,12 @@ #include <unistd.h> #include <fcntl.h> #include <errno.h> -#include <spawn.h> -#include <sys/wait.h> - -#include <popt.h> #include "common.h" -extern char **environ; +#ifdef WINDOWS +#include "compat.h" +#endif void die(const char *fmt, ...) { @@ -79,14 +77,31 @@ void die(const char *fmt, ...) void die_errno(int err, const char *fmt, ...) { + va_list ap; + if (err == 0) return; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", strerror(errno)); + exit(1); +} + +void die_amqp_error(int err, const char *fmt, ...) +{ va_list ap; + char *errstr; + + if (err <= 0) + return; + va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); - fprintf(stderr, ": %s\n", strerror(err)); + fprintf(stderr, ": %s\n", errstr = amqp_error_string(err)); + free(errstr); exit(1); } @@ -127,24 +142,15 @@ char *amqp_server_exception_string(amqp_rpc_reply_t r) char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { - const char *s; - switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: - s = "normal response"; - break; + return strdup("normal response"); case AMQP_RESPONSE_NONE: - s = "missing RPC reply type"; - break; + return strdup("missing RPC reply type"); case AMQP_RESPONSE_LIBRARY_EXCEPTION: - if (r.library_errno) - s = strerror(r.library_errno); - else - s = "end of stream"; - - break; + return amqp_error_string(r.library_error); case AMQP_RESPONSE_SERVER_EXCEPTION: return amqp_server_exception_string(r); @@ -152,33 +158,24 @@ char *amqp_rpc_reply_string(amqp_rpc_reply_t r) default: abort(); } - - return strdup(s); } void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) { + va_list ap; + char *errstr; + if (r.reply_type == AMQP_RESPONSE_NORMAL) return; - va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); - fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r)); + fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r)); + free(errstr); exit(1); } -void set_cloexec(int fd) -{ - int flags; - - flags = fcntl(fd, F_GETFD); - if (flags == -1 - || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) - die_errno(errno, "set_cloexec"); -} - static char *amqp_server = "localhost"; static char *amqp_vhost = "/"; static char *amqp_username = "guest"; @@ -222,14 +219,7 @@ amqp_connection_state_t make_connection(void) } s = amqp_open_socket(host, port ? port : 5672); - if (s < 0) { - if (s == -ENOENT) - die("unknown host %s", host); - else - die_errno(-s, "opening socket to %s", amqp_server); - } - - set_cloexec(s); + die_amqp_error(-s, "opening socket to %s", amqp_server); conn = amqp_new_connection(); amqp_set_sockfd(conn, s); @@ -247,16 +237,14 @@ amqp_connection_state_t make_connection(void) void close_connection(amqp_connection_state_t conn) { - int s = amqp_get_sockfd(conn); - + int res; die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel"); die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "closing connection"); - amqp_destroy_connection(conn); - - if (close(s) < 0) - die_errno(errno, "closing socket"); + + res = amqp_destroy_connection(conn); + die_amqp_error(-res, "closing connection"); } amqp_bytes_t read_all(int fd) @@ -307,8 +295,7 @@ void copy_body(amqp_connection_state_t conn, int fd) amqp_frame_t frame; int res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for header frame"); + die_amqp_error(-res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_HEADER) die("expected header, got frame type 0x%X", frame.frame_type); @@ -316,8 +303,7 @@ void copy_body(amqp_connection_state_t conn, int fd) body_remaining = frame.payload.properties.body_size; while (body_remaining) { res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for body frame"); + die_amqp_error(-res, "waiting for body frame"); if (frame.frame_type != AMQP_FRAME_BODY) die("expected body, got frame type 0x%X", frame.frame_type); @@ -327,47 +313,6 @@ void copy_body(amqp_connection_state_t conn, int fd) } } -void pipeline(const char * const *argv, struct pipeline *pl) -{ - posix_spawn_file_actions_t file_acts; - - int pipefds[2]; - if (pipe(pipefds)) - die_errno(errno, "pipe"); - - die_errno(posix_spawn_file_actions_init(&file_acts), - "posix_spawn_file_actions_init"); - die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), - "posix_spawn_file_actions_adddup2"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), - "posix_spawn_file_actions_addclose"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), - "posix_spawn_file_actions_addclose"); - - die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, - (char * const *)argv, environ), - "posix_spawnp: %s", argv[0]); - - die_errno(posix_spawn_file_actions_destroy(&file_acts), - "posix_spawn_file_actions_destroy"); - - if (close(pipefds[0])) - die_errno(errno, "close"); - - pl->infd = pipefds[1]; -} - -int finish_pipeline(struct pipeline *pl) -{ - int status; - - if (close(pl->infd)) - die_errno(errno, "close"); - if (waitpid(pl->pid, &status, 0) < 0) - die_errno(errno, "waitpid"); - return WIFEXITED(status) && WEXITSTATUS(status) == 0; -} - poptContext process_options(int argc, const char **argv, struct poptOption *options, const char *help) diff --git a/tools/common.h b/tools/common.h index 09a9242..84889d3 100644 --- a/tools/common.h +++ b/tools/common.h @@ -50,6 +50,8 @@ #include <stdint.h> +#include <popt.h> + #include <amqp.h> #include <amqp_framing.h> @@ -60,6 +62,8 @@ extern void die(const char *fmt, ...) __attribute__ ((format (printf, 1, 2))); extern void die_errno(int err, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); +extern void die_amqp_error(int err, const char *fmt, ...) + __attribute__ ((format (printf, 2, 3))); extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); @@ -73,14 +77,6 @@ extern void write_all(int fd, amqp_bytes_t data); extern void copy_body(amqp_connection_state_t conn, int fd); -struct pipeline { - int pid; - int infd; -}; - -extern void pipeline(const char * const *argv, struct pipeline *pl); -extern int finish_pipeline(struct pipeline *pl); - #define INCLUDE_OPTIONS(options) \ {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} diff --git a/tools/consume.c b/tools/consume.c index 7725ba8..34037d9 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -53,9 +53,8 @@ #include <stdio.h> #include <stdlib.h> -#include <popt.h> - #include "common.h" +#include "process.h" /* Convert a amqp_bytes_t to an escaped string form for printing. We use the same escaping conventions as rabbitmqctl. */ @@ -85,61 +84,45 @@ static char *stringify_bytes(amqp_bytes_t bytes) static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *queue, char *exchange, - char *exchange_type, char *routing_key) + char *routing_key, int declare) { - amqp_bytes_t queue_bytes; - amqp_queue_declare_ok_t *res; + amqp_bytes_t queue_bytes = cstring_bytes(queue); /* if an exchange name wasn't provided, check that we don't have options that require it. */ - if (!exchange) { - char *opt = NULL; - if (routing_key) - opt = "--routing-key"; - else if (exchange_type) - opt = "--exchange-type"; - - if (opt) { - fprintf(stderr, - "%s option requires an exchange name to be " - "provided with --exchange\n", opt); - exit(1); - } + if (!exchange && routing_key) { + fprintf(stderr, "--routing-key option requires an exchange" + " name to be provided with --exchange\n"); + exit(1); } - /* Declare the queue as auto-delete. If the queue already - exists, this won't have any effect. */ - queue_bytes = cstring_bytes(queue); - res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1, - AMQP_EMPTY_TABLE); - if (!res) - die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); - - if (!queue) { - /* the server should have provided a queue name */ - char *sq; - queue_bytes = amqp_bytes_malloc_dup(res->queue); - sq = stringify_bytes(queue_bytes); - fprintf(stderr, "Server provided queue name: %s\n", sq); - free(sq); - } + if (!queue || exchange || declare) { + /* Declare the queue as auto-delete. */ + amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, + queue_bytes, 0, 0, 1, 1, + AMQP_EMPTY_TABLE); + if (!res) + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + + if (!queue) { + /* the server should have provided a queue name */ + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", + sq); + free(sq); + } - /* Bind to an exchange if requested */ - if (exchange) { - amqp_bytes_t eb = amqp_cstring_bytes(exchange); - - if (exchange_type) { - /* we should create the exchange */ - if (!amqp_exchange_declare(conn, 1, eb, - amqp_cstring_bytes(exchange_type), - 0, 0, 1, AMQP_EMPTY_TABLE)) - die_rpc(amqp_get_rpc_reply(conn), "exchange.declare"); + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), + "queue.bind"); } - - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - AMQP_EMPTY_TABLE)) - die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); } return queue_bytes; @@ -157,8 +140,7 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, struct pipeline pl; uint64_t delivery_tag; int res = amqp_simple_wait_frame(conn, &frame); - if (res < 0) - die_errno(-res, "waiting for header frame"); + die_amqp_error(res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_METHOD || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) @@ -172,8 +154,9 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, copy_body(conn, pl.infd); if (finish_pipeline(&pl) && !no_ack) - die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0), - "basic.ack"); + die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, + 0), + "basic.ack"); amqp_maybe_release_buffers(conn); } @@ -182,13 +165,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, int main(int argc, const char **argv) { poptContext opts; - int no_ack; amqp_connection_state_t conn; const char * const *cmd_argv; char *queue = NULL; char *exchange = NULL; - char *exchange_type = NULL; char *routing_key = NULL; + int declare = 0; + int no_ack = 0; amqp_bytes_t queue_bytes; struct poptOption options[] = { @@ -197,11 +180,10 @@ int main(int argc, const char **argv) "the queue to consume from", "queue"}, {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, "bind the queue to this exchange", "exchange"}, - {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0, - "create auto-delete exchange of this type for binding", - "type"}, {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, "the routing key to bind with", "routing key"}, + {"declare", 'd', POPT_ARG_NONE, &declare, 0, + "declare an exclusive queue", NULL}, {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode", NULL}, POPT_AUTOHELP @@ -219,8 +201,7 @@ int main(int argc, const char **argv) } conn = make_connection(); - queue_bytes = setup_queue(conn, queue, exchange, exchange_type, - routing_key); + queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); do_consume(conn, queue_bytes, no_ack, cmd_argv); close_connection(conn); return 0; diff --git a/tools/declare_queue.c b/tools/declare_queue.c index 662531f..3536455 100644 --- a/tools/declare_queue.c +++ b/tools/declare_queue.c @@ -55,8 +55,6 @@ #include <string.h> #include <unistd.h> -#include <popt.h> - #include "common.h" int main(int argc, const char **argv) diff --git a/tools/delete_queue.c b/tools/delete_queue.c index 41d0d13..ccd157e 100644 --- a/tools/delete_queue.c +++ b/tools/delete_queue.c @@ -55,8 +55,6 @@ #include <string.h> #include <unistd.h> -#include <popt.h> - #include "common.h" int main(int argc, const char **argv) diff --git a/tools/doc/consume.xml b/tools/doc/consume.xml index 448ade6..16d61ad 100644 --- a/tools/doc/consume.xml +++ b/tools/doc/consume.xml @@ -50,8 +50,7 @@ <para> <command>amqp-consume</command> can consume from an existing queue, or it can create a new queue. It can - optionally bind the queue to an existing exchange, or to a - newly created exchange. + optionally bind the queue to an existing exchange. </para> <para> By default, messages will be consumed with explicit @@ -72,13 +71,16 @@ <listitem> <para> The name of the queue to consume messages - from. If the specified queue does not exist, - an auto-delete queue is created with the given - name. If this option is omitted, a new - auto-delete queue will be created, with a - unique name assigned to the queue by the AMQP - server; that unique name will be displayed on - stderr. + from. + </para> + + <para> + If the <option>--queue</option> option is + omitted, the AMQP server will assign a unique + name to the queue, and that server-assigned + name will be dixsplayed on stderr; this case + implies that an exclusive queue should be + declared. </para> </listitem> </varlistentry> @@ -87,34 +89,36 @@ <term><option>--exchange</option>=<replaceable class="parameter">exchange name</replaceable></term> <listitem> <para> - The name of the exchange to bind the queue to. - If omitted, binding is not performed. The - specified exchange should already exist unless - the <option>--exchange-type</option> option is - used to request the creation of an exchange. + Specifies that an exclusive queue should + be declared, and bound to the given exchange. + The specified exchange should already exist + unless the <option>--exchange-type</option> + option is used to request the creation of an + exchange. </para> </listitem> </varlistentry> <varlistentry> - <term><option>-t</option></term> - <term><option>--exchange-type</option>=<replaceable class="parameter">type</replaceable></term> + <term><option>-r</option></term> + <term><option>--routing-key</option>=<replaceable class="parameter">routing key</replaceable></term> <listitem> <para> - This option indicates that an auto-delete - exchange of the specified type should be - created. The name of the exchange should be - given by the <option>--exchange</option> - option. + The routing key for binding. If omitted, an + empty routing key is assumed. </para> </listitem> </varlistentry> <varlistentry> - <term><option>-r</option></term> - <term><option>--routing-key</option>=<replaceable class="parameter">routing key</replaceable></term> + <term><option>-d</option></term> + <term><option>--declare</option></term> <listitem> <para> - The routing key for the binding. If omitted, - an empty routing key is assumed. + Forces an exclusive queue to be declared, + even when it otherwise would not. That is, + when a queue name is specified with the + <option>--queue</option> option, but no + binding to an exchange is requested with the + <option>--exchange</option> option. </para> </listitem> </varlistentry> @@ -138,7 +142,7 @@ <title>Examples</title> <variablelist> <varlistentry> - <term>Consume messages from the queue + <term>Consume messages from an existing queue <quote><systemitem class="resource">myqueue</systemitem></quote>, and output the message bodies on standard output via @@ -149,7 +153,7 @@ </varlistentry> <varlistentry> - <term>Bind a newly created auto-delete queue to an + <term>Bind a new exclusive queue to an exchange <quote><systemitem class="resource">myexch</systemitem></quote>, and send each message body to the script diff --git a/tools/get.c b/tools/get.c index f746fd1..8f8e0d0 100644 --- a/tools/get.c +++ b/tools/get.c @@ -52,8 +52,6 @@ #include <stdio.h> -#include <popt.h> - #include "common.h" static int do_get(amqp_connection_state_t conn, char *queue) diff --git a/tools/publish.c b/tools/publish.c index 21314b2..0917dae 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -54,8 +54,6 @@ #include <stdlib.h> #include <string.h> -#include <popt.h> - #include "common.h" static void do_publish(amqp_connection_state_t conn, @@ -66,8 +64,7 @@ static void do_publish(amqp_connection_state_t conn, cstring_bytes(exchange), cstring_bytes(routing_key), 0, 0, props, body); - if (res != 0) - die_errno(-res, "basic.publish"); + die_amqp_error(res, "basic.publish"); } int main(int argc, const char **argv) diff --git a/tools/unix/process.c b/tools/unix/process.c new file mode 100644 index 0000000..8a02afb --- /dev/null +++ b/tools/unix/process.c @@ -0,0 +1,100 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <unistd.h> +#include <errno.h> +#include <spawn.h> +#include <sys/wait.h> + +#include "common.h" +#include "process.h" + +extern char **environ; + +void pipeline(const char *const *argv, struct pipeline *pl) +{ + posix_spawn_file_actions_t file_acts; + + int pipefds[2]; + if (pipe(pipefds)) + die_errno(errno, "pipe"); + + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); + + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); + + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); + + if (close(pipefds[0])) + die_errno(errno, "close"); + + pl->infd = pipefds[1]; +} + +int finish_pipeline(struct pipeline *pl) +{ + int status; + + if (close(pl->infd)) + die_errno(errno, "close"); + if (waitpid(pl->pid, &status, 0) < 0) + die_errno(errno, "waitpid"); + return WIFEXITED(status) && WEXITSTATUS(status) == 0; +} diff --git a/tools/unix/process.h b/tools/unix/process.h new file mode 100644 index 0000000..ac2939d --- /dev/null +++ b/tools/unix/process.h @@ -0,0 +1,57 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +struct pipeline { + int pid; + int infd; +}; + +extern void pipeline(const char *const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); diff --git a/tools/windows/compat.c b/tools/windows/compat.c new file mode 100644 index 0000000..f0508b2 --- /dev/null +++ b/tools/windows/compat.c @@ -0,0 +1,73 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> + +#include "compat.h" + +int asprintf(char **strp, const char *fmt, ...) +{ + va_list ap; + int len; + + va_start(ap, fmt); + len = _vscprintf(fmt, ap); + *strp = malloc(len+1); + if (!*strp) + return -1; + + len = vsprintf(*strp, fmt, ap); + *strp[len] = 0; + + va_end(ap); + return len; +} diff --git a/tools/windows/compat.h b/tools/windows/compat.h new file mode 100644 index 0000000..8211b37 --- /dev/null +++ b/tools/windows/compat.h @@ -0,0 +1,51 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +extern int asprintf(char **strp, const char *fmt, ...); diff --git a/tools/windows/process.c b/tools/windows/process.c new file mode 100644 index 0000000..0a005bd --- /dev/null +++ b/tools/windows/process.c @@ -0,0 +1,206 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <stdio.h> +#include <io.h> +#include <windows.h> + +#include "common.h" +#include "process.h" + +void die_windows_error(const char *fmt, ...) +{ + char *msg; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) + msg = "(failed to retrieve Windows error message)"; + + fprintf(stderr, ": %s\n", msg); + exit(1); +} + +static char *make_command_line(const char *const *argv) +{ + int i; + size_t len = 1; /* initial quotes */ + char *buf; + char *dest; + + /* calculate the length of the required buffer, making worst + case assumptions for simplicity */ + for (i = 0;;) { + len += strlen(argv[i]) * 2; + + if (!argv[++i]) + break; + + len += 3; /* quotes, space, quotes */ + } + + len += 2; /* final quotes and the terminating zero */ + + dest = buf = malloc(len); + if (!buf) + die("allocating memory for subprocess command line"); + + *dest++ = '\"'; + + for (i = 0;;) { + const char *src = argv[i]; + for (;;) { + switch (*src) { + case 0: + goto done; + + case '\"': + case '\\': + *dest++ = '\\'; + /* fall through */ + + default: + *dest++ = *src++; + break; + } + } + done: + + if (!argv[++i]) + break; + + *dest++ = '\"'; + *dest++ = ' '; + *dest++ = '\"'; + } + + *dest++ = '\"'; + *dest++ = 0; + return buf; +} + +void pipeline(const char *const *argv, struct pipeline *pl) +{ + HANDLE in_read_handle, in_write_handle; + SECURITY_ATTRIBUTES sec_attr; + PROCESS_INFORMATION proc_info; + STARTUPINFO start_info; + char *cmdline = make_command_line(argv); + + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = TRUE; + sec_attr.lpSecurityDescriptor = NULL; + + if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) + die_windows_error("CreatePipe"); + + if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) + die_windows_error("SetHandleInformation"); + + /* when in Rome... */ + ZeroMemory(&proc_info, sizeof proc_info); + ZeroMemory(&start_info, sizeof start_info); + + start_info.cb = sizeof start_info; + start_info.dwFlags |= STARTF_USESTDHANDLES; + + if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) + == INVALID_HANDLE_VALUE + || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) + == INVALID_HANDLE_VALUE) + die_windows_error("GetStdHandle"); + + start_info.hStdInput = in_read_handle; + + if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, + NULL, NULL, &start_info, &proc_info)) + die_windows_error("CreateProcess"); + + free(cmdline); + + if (!CloseHandle(proc_info.hThread)) + die_windows_error("CloseHandle for thread"); + if (!CloseHandle(in_read_handle)) + die_windows_error("CloseHandle"); + + pl->proc_handle = proc_info.hProcess; + pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); +} + +int finish_pipeline(struct pipeline *pl) +{ + DWORD code; + + if (close(pl->infd)) + die_errno(errno, "close"); + + for (;;) { + if (!GetExitCodeProcess(pl->proc_handle, &code)) + die_windows_error("GetExitCodeProcess"); + if (code != STILL_ACTIVE) + break; + + if (WaitForSingleObject(pl->proc_handle, INFINITE) + == WAIT_FAILED) + die_windows_error("WaitForSingleObject"); + } + + if (!CloseHandle(pl->proc_handle)) + die_windows_error("CloseHandle for process"); + + return code; +} diff --git a/tools/windows/process.h b/tools/windows/process.h new file mode 100644 index 0000000..df276a7 --- /dev/null +++ b/tools/windows/process.h @@ -0,0 +1,59 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2010 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2010 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include <windef.h> + +struct pipeline { + HANDLE proc_handle; + int infd; +}; + +extern void pipeline(const char *const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); |