diff options
author | Ask Solem <ask@celeryproject.org> | 2013-07-04 14:51:55 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-07-04 14:51:55 +0100 |
commit | 0839a7dd23cd197607d6efe699d24945484a3003 (patch) | |
tree | f784748fc8b2980401498d70667f78fb65c687ff | |
parent | 906505c11f8b026a34c937939c7ac581782cba7a (diff) | |
download | librabbitmq-rabbitc-latest.tar.gz |
Trying to udpate code to new changes in rabbitmq-crabbitc-latest
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | Modules/_librabbitmq/connection.c | 111 | ||||
-rw-r--r-- | Modules/_librabbitmq/connection.h | 3 | ||||
-rw-r--r-- | setup.py | 11 |
4 files changed, 82 insertions, 46 deletions
@@ -1,7 +1,8 @@ RABBIT_DIR=rabbitmq-c CODEGEN_DIR=rabbitmq-codegen RABBIT_TARGET=clib -RABBIT_DIST=librabbitmq-0.2.0 +RABBIT_VERSION=0.3.0 +RABBIT_DIST=librabbitmq-$(RABBIT_VERSION) CONFIGURE_ARGS=--disable-tools --disable-docs --enable-regen-amqp-framing diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 29f5076..9b4f14f 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -9,6 +9,7 @@ #include <sys/time.h> #include <amqp.h> +#include <amqp_tcp_socket.h> #include "connection.h" #include "distmeta.h" @@ -71,7 +72,8 @@ basic_properties_to_PyDict(amqp_basic_properties_t*, PyObject*); _PYRMQ_INLINE int PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *, - amqp_connection_state_t ); + amqp_connection_state_t, + amqp_pool_t *); _PYRMQ_INLINE void amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t, @@ -99,8 +101,10 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int, void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*); int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *); -static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *); -static amqp_array_t PyList_ToAMQArray(amqp_connection_state_t, PyObject *); +static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, + amqp_pool_t *); +static amqp_array_t PyList_ToAMQArray(amqp_connection_state_t, PyObject *, + amqp_pool_t *); static PyObject* AMQTable_toPyDict(amqp_table_t *table); static PyObject* AMQArray_toPyList(amqp_array_t *array); @@ -213,17 +217,18 @@ AMQArray_SetIntValue(amqp_array_t *array, int value) } static amqp_table_t -PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src) +PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *channel_pool) { PY_SIZE_TYPE pos = 0; PY_SIZE_TYPE size = PyDict_Size(src); /* allocate new table */ - amqp_table_t dst = {.num_entries = 0, + /*amqp_table_t dst = {.num_entries = 0, .entries=amqp_pool_alloc(&conn->frame_pool, - size * sizeof(amqp_table_entry_t))}; + size * sizeof(amqp_table_entry_t))};*/ + amqp_table_t dst; dst.num_entries = 0; - dst.entries = amqp_pool_alloc(&conn->frame_pool, + dst.entries = amqp_pool_alloc(&channel_pool, size * sizeof(amqp_table_entry_t)); PyObject *dkey = NULL; @@ -234,13 +239,13 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src) /* Dict */ AMQTable_SetTableValue(&dst, PyString_AS_AMQBYTES(dkey), - PyDict_ToAMQTable(conn, dvalue)); + PyDict_ToAMQTable(conn, dvalue, channel_pool)); } else if (PyList_Check(dvalue)) { /* List */ AMQTable_SetArrayValue(&dst, PyString_AS_AMQBYTES(dkey), - PyList_ToAMQArray(conn, dvalue)); + PyList_ToAMQArray(conn, dvalue, channel_pool)); } else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) { /* Int | Long */ @@ -293,11 +298,11 @@ error: } static amqp_array_t -PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src) +PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *channel_pool) { PY_SIZE_TYPE size = PyList_Size(src); amqp_array_t dst = {.num_entries = 0, - .entries=amqp_pool_alloc(&conn->frame_pool, + .entries=amqp_pool_alloc(&channel_pool, size * sizeof(amqp_field_value_t))}; for (PY_SIZE_TYPE pos = 0; pos < size; ++pos) { @@ -306,12 +311,12 @@ PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src) if (PyDict_Check(dvalue)) { /* Dict */ AMQArray_SetTableValue(&dst, - PyDict_ToAMQTable(conn, dvalue)); + PyDict_ToAMQTable(conn, dvalue, channel_pool)); } else if (PyList_Check(dvalue)) { /* List */ AMQArray_SetArrayValue(&dst, - PyList_ToAMQArray(conn, dvalue)); + PyList_ToAMQArray(conn, dvalue, channel_pool)); } else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) { /* Int | Long */ @@ -619,7 +624,8 @@ AMQArray_toPyList(amqp_array_t *array) _PYRMQ_INLINE int PyDict_to_basic_properties(PyObject *p, amqp_basic_properties_t *props, - amqp_connection_state_t conn) + amqp_connection_state_t conn, + amqp_pool_t* channel_pool) { PyObject *value = NULL; props->headers = AMQP_EMPTY_TABLE; @@ -684,7 +690,7 @@ PyDict_to_basic_properties(PyObject *p, } if ((value = PyDict_GetItemString(p, "headers")) != NULL) { - props->headers = PyDict_ToAMQTable(conn, value); + props->headers = PyDict_ToAMQTable(conn, value, channel_pool); if (PyErr_Occurred()) return -1; } @@ -772,7 +778,7 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *self, unsigned int channel, snprintf(errorstr, sizeof(errorstr), "%s: %s", context, reply.library_error - ? amqp_error_string(reply.library_error) + ? amqp_error_string2(reply.library_error) : "(end-of-stream)"); goto connerror; @@ -950,12 +956,24 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected"); goto bail; } - Py_BEGIN_ALLOW_THREADS; self->conn = amqp_new_connection(); - self->sockfd = amqp_open_socket(self->hostname, self->port); + amqp_socket_t *socket = amqp_tcp_socket_new(); + if (!socket) goto error; + + int status = 0; + Py_BEGIN_ALLOW_THREADS; + status = amqp_socket_open(socket, self->hostname, self->port); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleError(self->sockfd, "Error opening socket")) + if (status) { + PyRabbitMQ_HandleError(status, "Error opening socket"); goto error; + } + + Py_BEGIN_ALLOW_THREADS; + amqp_set_socket(self->conn, socket); + /* req by rabbitmq-c 0.3.0, 0 is default pagesize */ + Py_END_ALLOW_THREADS; + self->sockfd = amqp_get_sockfd(self->conn); amqp_table_entry_t client_properties[1]; amqp_table_entry_t capabilities[1]; @@ -965,20 +983,23 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) capabilities[0].key = amqp_cstring_bytes("consumer_cancel_notify"); capabilities[0].value.kind = AMQP_FIELD_KIND_BOOLEAN; capabilities[0].value.value.boolean = 1; - capability_table.num_entries = 1; + + capability_table.num_entries = sizeof(capabilities) / sizeof(amqp_table_entry_t); capability_table.entries = capabilities; client_properties[0].key = amqp_cstring_bytes("capabilities"); client_properties[0].value.kind = AMQP_FIELD_KIND_TABLE; client_properties[0].value.value.table = capability_table; - client_property_table.num_entries = 1; + client_property_table.num_entries = sizeof(client_properties) / sizeof(amqp_table_entry_t); client_property_table.entries = client_properties; + /*printf("CLIENT PROPERTY TABLE %s\n", + client_property_table.entries[0].value.value.table.entries[0].key.bytes);*/ + Py_BEGIN_ALLOW_THREADS; - amqp_set_sockfd(self->conn, self->sockfd); reply = amqp_login_with_properties( self->conn, self->virtual_host, self->channel_max, self->frame_max, self->heartbeat, - &client_properties, + &client_property_table, AMQP_SASL_METHOD_PLAIN, self->userid, self->password ); Py_END_ALLOW_THREADS; @@ -1066,7 +1087,7 @@ PyRabbitMQ_Connection_destroy_channel(PyRabbitMQ_Connection *self, amqp_rpc_reply_t reply; Py_BEGIN_ALLOW_THREADS; reply = amqp_channel_close(self->conn, channel, AMQP_REPLY_SUCCESS); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; return PyRabbitMQ_HandleAMQError(self, channel, reply, "Couldn't close channel"); @@ -1190,7 +1211,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, amqp_frame_t frame; if (!piggyback) { Py_BEGIN_ALLOW_THREADS; - amqp_maybe_release_buffers(conn); + amqp_maybe_release_buffers_on_channel(conn, channel); retval = amqp_simple_wait_frame(conn, &frame); Py_END_ALLOW_THREADS; if (retval < 0) { @@ -1333,7 +1354,9 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; - amqp_table_t bargs = PyDict_ToAMQTable(self->conn, arguments); + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel); + if (channel_pool == NULL) goto bail; + amqp_table_t bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; @@ -1344,7 +1367,7 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, PyString_AS_AMQBYTES(routing_key), bargs); reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind")) @@ -1379,7 +1402,9 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; - amqp_table_t uargs = PyDict_ToAMQTable(self->conn, arguments); + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel); + if (channel_pool == NULL) goto bail; + amqp_table_t uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; @@ -1390,7 +1415,7 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, PyString_AS_AMQBYTES(routing_key), uargs); reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind")) @@ -1430,7 +1455,7 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self, (amqp_boolean_t)if_empty); if (ok == NULL) reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (ok == NULL && PyRabbitMQ_HandleAMQError(self, channel, @@ -1468,7 +1493,9 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, &exclusive, &auto_delete, &arguments)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; - amqp_table_t qargs = PyDict_ToAMQTable(self->conn, arguments); + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel); + if (channel_pool == NULL) goto bail; + amqp_table_t qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; @@ -1523,7 +1550,7 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self, ok = amqp_queue_purge(self->conn, channel, PyString_AS_AMQBYTES(queue)); reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.purge")) @@ -1562,7 +1589,9 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, goto bail; if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; if ((type = Maybe_Unicode(type)) == NULL) goto bail; - amqp_table_t eargs = PyDict_ToAMQTable(self->conn, arguments); + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel); + if (channel_pool == NULL) goto bail; + amqp_table_t eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; @@ -1608,7 +1637,7 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self, PyString_AS_AMQBYTES(exchange), (amqp_boolean_t)if_unused); reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete")) @@ -1649,7 +1678,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, Py_INCREF(propdict); amqp_basic_properties_t props; memset(&props, 0, sizeof(props)); - if (!PyDict_to_basic_properties(propdict, &props, self->conn)) + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel); + if (channel_pool == NULL) goto bail; + if (!PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool)) goto bail; Py_DECREF(propdict); @@ -1766,7 +1797,7 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self, amqp_basic_cancel(self->conn, channel, PyString_AS_AMQBYTES(consumer_tag)); reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.cancel")) @@ -1806,7 +1837,9 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail; - amqp_table_t cargs = PyDict_ToAMQTable(self->conn, arguments); + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel); + if (channel_pool == NULL) goto bail; + amqp_table_t cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; Py_BEGIN_ALLOW_THREADS; @@ -1878,7 +1911,7 @@ PyRabbitMQ_Connection_flow(PyRabbitMQ_Connection *self, Py_BEGIN_ALLOW_THREADS; amqp_channel_flow(self->conn, channel, (amqp_boolean_t)active); reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "channel.flow")) @@ -1909,7 +1942,7 @@ PyRabbitMQ_Connection_basic_recover(PyRabbitMQ_Connection *self, Py_BEGIN_ALLOW_THREADS; amqp_basic_recover(self->conn, channel, (amqp_boolean_t)requeue); reply = amqp_get_rpc_reply(self->conn); - amqp_maybe_release_buffers(self->conn); + amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.recover")) diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h index f7dc8c1..89dc72d 100644 --- a/Modules/_librabbitmq/connection.h +++ b/Modules/_librabbitmq/connection.h @@ -153,7 +153,8 @@ typedef struct { int PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *, - amqp_connection_state_t); + amqp_connection_state_t, + amqp_pool_t *); /* Connection method sigs */ static PyRabbitMQ_Connection* @@ -81,21 +81,22 @@ def create_builder(): ]) librabbit_files = map(LRMQSRC, [ 'amqp_api.c', - 'amqp_mem.c', - 'amqp_url.c', 'amqp_connection.c', - 'amqp_socket.c', 'amqp_framing.c', + 'amqp_mem.c', + 'amqp_socket.c', 'amqp_table.c', + 'amqp_tcp_socket.c', + 'amqp_timer.c', + 'amqp_url.c', ]) incdirs.append(LRMQDIST()) # for config.h if platform.system() == 'Windows': incdirs.append(LRMQSRC('windows')) - librabbit_files.append(LRMQSRC('windows', 'socket.c')) else: incdirs.append(LRMQSRC('unix')) - librabbit_files.append(LRMQSRC('unix', 'socket.c')) + #librabbit_files.append(LRMQSRC('unix', 'socket.c')) librabbitmq_ext = Extension( '_librabbitmq', |