summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-07-04 14:51:55 +0100
committerAsk Solem <ask@celeryproject.org>2013-07-04 14:51:55 +0100
commit0839a7dd23cd197607d6efe699d24945484a3003 (patch)
treef784748fc8b2980401498d70667f78fb65c687ff
parent906505c11f8b026a34c937939c7ac581782cba7a (diff)
downloadlibrabbitmq-rabbitc-latest.tar.gz
Trying to udpate code to new changes in rabbitmq-crabbitc-latest
-rw-r--r--Makefile3
-rw-r--r--Modules/_librabbitmq/connection.c111
-rw-r--r--Modules/_librabbitmq/connection.h3
-rw-r--r--setup.py11
4 files changed, 82 insertions, 46 deletions
diff --git a/Makefile b/Makefile
index c32cdd7..9c7ef91 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,8 @@
RABBIT_DIR=rabbitmq-c
CODEGEN_DIR=rabbitmq-codegen
RABBIT_TARGET=clib
-RABBIT_DIST=librabbitmq-0.2.0
+RABBIT_VERSION=0.3.0
+RABBIT_DIST=librabbitmq-$(RABBIT_VERSION)
CONFIGURE_ARGS=--disable-tools --disable-docs --enable-regen-amqp-framing
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 29f5076..9b4f14f 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -9,6 +9,7 @@
#include <sys/time.h>
#include <amqp.h>
+#include <amqp_tcp_socket.h>
#include "connection.h"
#include "distmeta.h"
@@ -71,7 +72,8 @@ basic_properties_to_PyDict(amqp_basic_properties_t*, PyObject*);
_PYRMQ_INLINE int
PyDict_to_basic_properties(PyObject *,
amqp_basic_properties_t *,
- amqp_connection_state_t );
+ amqp_connection_state_t,
+ amqp_pool_t *);
_PYRMQ_INLINE void
amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t,
@@ -99,8 +101,10 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int,
void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*);
int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *);
-static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *);
-static amqp_array_t PyList_ToAMQArray(amqp_connection_state_t, PyObject *);
+static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *,
+ amqp_pool_t *);
+static amqp_array_t PyList_ToAMQArray(amqp_connection_state_t, PyObject *,
+ amqp_pool_t *);
static PyObject* AMQTable_toPyDict(amqp_table_t *table);
static PyObject* AMQArray_toPyList(amqp_array_t *array);
@@ -213,17 +217,18 @@ AMQArray_SetIntValue(amqp_array_t *array, int value)
}
static amqp_table_t
-PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src)
+PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *channel_pool)
{
PY_SIZE_TYPE pos = 0;
PY_SIZE_TYPE size = PyDict_Size(src);
/* allocate new table */
- amqp_table_t dst = {.num_entries = 0,
+ /*amqp_table_t dst = {.num_entries = 0,
.entries=amqp_pool_alloc(&conn->frame_pool,
- size * sizeof(amqp_table_entry_t))};
+ size * sizeof(amqp_table_entry_t))};*/
+ amqp_table_t dst;
dst.num_entries = 0;
- dst.entries = amqp_pool_alloc(&conn->frame_pool,
+ dst.entries = amqp_pool_alloc(&channel_pool,
size * sizeof(amqp_table_entry_t));
PyObject *dkey = NULL;
@@ -234,13 +239,13 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src)
/* Dict */
AMQTable_SetTableValue(&dst,
PyString_AS_AMQBYTES(dkey),
- PyDict_ToAMQTable(conn, dvalue));
+ PyDict_ToAMQTable(conn, dvalue, channel_pool));
}
else if (PyList_Check(dvalue)) {
/* List */
AMQTable_SetArrayValue(&dst,
PyString_AS_AMQBYTES(dkey),
- PyList_ToAMQArray(conn, dvalue));
+ PyList_ToAMQArray(conn, dvalue, channel_pool));
}
else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) {
/* Int | Long */
@@ -293,11 +298,11 @@ error:
}
static amqp_array_t
-PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src)
+PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *channel_pool)
{
PY_SIZE_TYPE size = PyList_Size(src);
amqp_array_t dst = {.num_entries = 0,
- .entries=amqp_pool_alloc(&conn->frame_pool,
+ .entries=amqp_pool_alloc(&channel_pool,
size * sizeof(amqp_field_value_t))};
for (PY_SIZE_TYPE pos = 0; pos < size; ++pos) {
@@ -306,12 +311,12 @@ PyList_ToAMQArray(amqp_connection_state_t conn, PyObject *src)
if (PyDict_Check(dvalue)) {
/* Dict */
AMQArray_SetTableValue(&dst,
- PyDict_ToAMQTable(conn, dvalue));
+ PyDict_ToAMQTable(conn, dvalue, channel_pool));
}
else if (PyList_Check(dvalue)) {
/* List */
AMQArray_SetArrayValue(&dst,
- PyList_ToAMQArray(conn, dvalue));
+ PyList_ToAMQArray(conn, dvalue, channel_pool));
}
else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) {
/* Int | Long */
@@ -619,7 +624,8 @@ AMQArray_toPyList(amqp_array_t *array)
_PYRMQ_INLINE int
PyDict_to_basic_properties(PyObject *p,
amqp_basic_properties_t *props,
- amqp_connection_state_t conn)
+ amqp_connection_state_t conn,
+ amqp_pool_t* channel_pool)
{
PyObject *value = NULL;
props->headers = AMQP_EMPTY_TABLE;
@@ -684,7 +690,7 @@ PyDict_to_basic_properties(PyObject *p,
}
if ((value = PyDict_GetItemString(p, "headers")) != NULL) {
- props->headers = PyDict_ToAMQTable(conn, value);
+ props->headers = PyDict_ToAMQTable(conn, value, channel_pool);
if (PyErr_Occurred())
return -1;
}
@@ -772,7 +778,7 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *self, unsigned int channel,
snprintf(errorstr, sizeof(errorstr), "%s: %s",
context,
reply.library_error
- ? amqp_error_string(reply.library_error)
+ ? amqp_error_string2(reply.library_error)
: "(end-of-stream)");
goto connerror;
@@ -950,12 +956,24 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected");
goto bail;
}
- Py_BEGIN_ALLOW_THREADS;
self->conn = amqp_new_connection();
- self->sockfd = amqp_open_socket(self->hostname, self->port);
+ amqp_socket_t *socket = amqp_tcp_socket_new();
+ if (!socket) goto error;
+
+ int status = 0;
+ Py_BEGIN_ALLOW_THREADS;
+ status = amqp_socket_open(socket, self->hostname, self->port);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleError(self->sockfd, "Error opening socket"))
+ if (status) {
+ PyRabbitMQ_HandleError(status, "Error opening socket");
goto error;
+ }
+
+ Py_BEGIN_ALLOW_THREADS;
+ amqp_set_socket(self->conn, socket);
+ /* req by rabbitmq-c 0.3.0, 0 is default pagesize */
+ Py_END_ALLOW_THREADS;
+ self->sockfd = amqp_get_sockfd(self->conn);
amqp_table_entry_t client_properties[1];
amqp_table_entry_t capabilities[1];
@@ -965,20 +983,23 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
capabilities[0].key = amqp_cstring_bytes("consumer_cancel_notify");
capabilities[0].value.kind = AMQP_FIELD_KIND_BOOLEAN;
capabilities[0].value.value.boolean = 1;
- capability_table.num_entries = 1;
+
+ capability_table.num_entries = sizeof(capabilities) / sizeof(amqp_table_entry_t);
capability_table.entries = capabilities;
client_properties[0].key = amqp_cstring_bytes("capabilities");
client_properties[0].value.kind = AMQP_FIELD_KIND_TABLE;
client_properties[0].value.value.table = capability_table;
- client_property_table.num_entries = 1;
+ client_property_table.num_entries = sizeof(client_properties) / sizeof(amqp_table_entry_t);
client_property_table.entries = client_properties;
+ /*printf("CLIENT PROPERTY TABLE %s\n",
+ client_property_table.entries[0].value.value.table.entries[0].key.bytes);*/
+
Py_BEGIN_ALLOW_THREADS;
- amqp_set_sockfd(self->conn, self->sockfd);
reply = amqp_login_with_properties(
self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
- &client_properties,
+ &client_property_table,
AMQP_SASL_METHOD_PLAIN, self->userid, self->password
);
Py_END_ALLOW_THREADS;
@@ -1066,7 +1087,7 @@ PyRabbitMQ_Connection_destroy_channel(PyRabbitMQ_Connection *self,
amqp_rpc_reply_t reply;
Py_BEGIN_ALLOW_THREADS;
reply = amqp_channel_close(self->conn, channel, AMQP_REPLY_SUCCESS);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
return PyRabbitMQ_HandleAMQError(self, channel, reply, "Couldn't close channel");
@@ -1190,7 +1211,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
amqp_frame_t frame;
if (!piggyback) {
Py_BEGIN_ALLOW_THREADS;
- amqp_maybe_release_buffers(conn);
+ amqp_maybe_release_buffers_on_channel(conn, channel);
retval = amqp_simple_wait_frame(conn, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) {
@@ -1333,7 +1354,9 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
- amqp_table_t bargs = PyDict_ToAMQTable(self->conn, arguments);
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel);
+ if (channel_pool == NULL) goto bail;
+ amqp_table_t bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
if (PyErr_Occurred())
goto bail;
@@ -1344,7 +1367,7 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
PyString_AS_AMQBYTES(routing_key),
bargs);
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind"))
@@ -1379,7 +1402,9 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
- amqp_table_t uargs = PyDict_ToAMQTable(self->conn, arguments);
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel);
+ if (channel_pool == NULL) goto bail;
+ amqp_table_t uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
if (PyErr_Occurred())
goto bail;
@@ -1390,7 +1415,7 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
PyString_AS_AMQBYTES(routing_key),
uargs);
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind"))
@@ -1430,7 +1455,7 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self,
(amqp_boolean_t)if_empty);
if (ok == NULL)
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (ok == NULL && PyRabbitMQ_HandleAMQError(self, channel,
@@ -1468,7 +1493,9 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
&exclusive, &auto_delete, &arguments))
goto bail;
if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
- amqp_table_t qargs = PyDict_ToAMQTable(self->conn, arguments);
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel);
+ if (channel_pool == NULL) goto bail;
+ amqp_table_t qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
if (PyErr_Occurred())
goto bail;
@@ -1523,7 +1550,7 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self,
ok = amqp_queue_purge(self->conn, channel,
PyString_AS_AMQBYTES(queue));
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.purge"))
@@ -1562,7 +1589,9 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
goto bail;
if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
if ((type = Maybe_Unicode(type)) == NULL) goto bail;
- amqp_table_t eargs = PyDict_ToAMQTable(self->conn, arguments);
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel);
+ if (channel_pool == NULL) goto bail;
+ amqp_table_t eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
if (PyErr_Occurred())
goto bail;
@@ -1608,7 +1637,7 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self,
PyString_AS_AMQBYTES(exchange),
(amqp_boolean_t)if_unused);
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete"))
@@ -1649,7 +1678,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
Py_INCREF(propdict);
amqp_basic_properties_t props;
memset(&props, 0, sizeof(props));
- if (!PyDict_to_basic_properties(propdict, &props, self->conn))
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel);
+ if (channel_pool == NULL) goto bail;
+ if (!PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool))
goto bail;
Py_DECREF(propdict);
@@ -1766,7 +1797,7 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self,
amqp_basic_cancel(self->conn, channel,
PyString_AS_AMQBYTES(consumer_tag));
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.cancel"))
@@ -1806,7 +1837,9 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail;
- amqp_table_t cargs = PyDict_ToAMQTable(self->conn, arguments);
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(self->conn, channel);
+ if (channel_pool == NULL) goto bail;
+ amqp_table_t cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
if (PyErr_Occurred()) goto bail;
Py_BEGIN_ALLOW_THREADS;
@@ -1878,7 +1911,7 @@ PyRabbitMQ_Connection_flow(PyRabbitMQ_Connection *self,
Py_BEGIN_ALLOW_THREADS;
amqp_channel_flow(self->conn, channel, (amqp_boolean_t)active);
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "channel.flow"))
@@ -1909,7 +1942,7 @@ PyRabbitMQ_Connection_basic_recover(PyRabbitMQ_Connection *self,
Py_BEGIN_ALLOW_THREADS;
amqp_basic_recover(self->conn, channel, (amqp_boolean_t)requeue);
reply = amqp_get_rpc_reply(self->conn);
- amqp_maybe_release_buffers(self->conn);
+ amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.recover"))
diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h
index f7dc8c1..89dc72d 100644
--- a/Modules/_librabbitmq/connection.h
+++ b/Modules/_librabbitmq/connection.h
@@ -153,7 +153,8 @@ typedef struct {
int
PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *,
- amqp_connection_state_t);
+ amqp_connection_state_t,
+ amqp_pool_t *);
/* Connection method sigs */
static PyRabbitMQ_Connection*
diff --git a/setup.py b/setup.py
index 6778dde..30236d8 100644
--- a/setup.py
+++ b/setup.py
@@ -81,21 +81,22 @@ def create_builder():
])
librabbit_files = map(LRMQSRC, [
'amqp_api.c',
- 'amqp_mem.c',
- 'amqp_url.c',
'amqp_connection.c',
- 'amqp_socket.c',
'amqp_framing.c',
+ 'amqp_mem.c',
+ 'amqp_socket.c',
'amqp_table.c',
+ 'amqp_tcp_socket.c',
+ 'amqp_timer.c',
+ 'amqp_url.c',
])
incdirs.append(LRMQDIST()) # for config.h
if platform.system() == 'Windows':
incdirs.append(LRMQSRC('windows'))
- librabbit_files.append(LRMQSRC('windows', 'socket.c'))
else:
incdirs.append(LRMQSRC('unix'))
- librabbit_files.append(LRMQSRC('unix', 'socket.c'))
+ #librabbit_files.append(LRMQSRC('unix', 'socket.c'))
librabbitmq_ext = Extension(
'_librabbitmq',