summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenry Tang <henryykt@gmail.com>2020-03-23 21:08:37 +0800
committerAsif Saif Uddin <auvipy@gmail.com>2020-03-28 18:52:49 +0600
commit13a49b0e26b073f6c5045c353d70901b64e7cc78 (patch)
tree7de0a30fdeff967be01d7815d7290a9c0a3a1a7e
parent9af556dda3f24eb0dd6357fd93c3a2d9af6b4d1d (diff)
downloadlibrabbitmq-13a49b0e26b073f6c5045c353d70901b64e7cc78.tar.gz
Fix memory leaks from "Maybe_Unicode" (fixes #27 #125)
-rw-r--r--Modules/_librabbitmq/connection.c298
-rw-r--r--Modules/_librabbitmq/connection.h15
2 files changed, 262 insertions, 51 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 3ace2e3..4b57a1b 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -82,7 +82,8 @@ _PYRMQ_INLINE int
PyDict_to_basic_properties(PyObject *,
amqp_basic_properties_t *,
amqp_connection_state_t,
- amqp_pool_t *);
+ amqp_pool_t *,
+ pyobject_pool_t *);
_PYRMQ_INLINE void
amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t,
@@ -110,8 +111,8 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int,
void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*);
int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *);
-static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *);
-static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *);
+static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *);
+static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *);
static PyObject* AMQTable_toPyDict(amqp_table_t *table);
static PyObject* AMQArray_toPyList(amqp_array_t *array);
@@ -128,6 +129,26 @@ int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *self)
return 0;
}
+// Keep track of PyObject references with increased reference count
+// Entries are stored in fixed size array.
+#define PYOBJECT_ARRAY_MAX 5
+typedef struct pyobject_array_t {
+ int num_entries;
+ PyObject *entries[PYOBJECT_ARRAY_MAX];
+} pyobject_array_t;
+
+
+static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *);
+static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *, PyObject *);
+static void PyObjectPool_XDECREF(pyobject_pool_t *array);
+
+_PYRMQ_INLINE PyObject* PyObjectPool_Maybe_Unicode(PyObject *, pyobject_pool_t *);
+
+static void PyObjectArray_XDECREF(pyobject_array_t *array);
+
+_PYRMQ_INLINE PyObject* PyObjectArray_AddEntry(pyobject_array_t *, PyObject *obj);
+_PYRMQ_INLINE PyObject* PyObjectArray_Maybe_Unicode(PyObject *, pyobject_array_t *);
+
/* ------: AMQP Utils :--------------------------------------------------- */
_PYRMQ_INLINE amqp_table_entry_t*
@@ -247,7 +268,7 @@ AMQArray_SetIntValue(amqp_array_t *array, int value)
}
static amqp_table_t
-PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool)
+PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool)
{
PyObject *dkey = NULL;
PyObject *dvalue = NULL;
@@ -264,7 +285,8 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
dst.num_entries = 0;
dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_table_entry_t));
while (PyDict_Next(src, &pos, &dkey, &dvalue)) {
- dkey = Maybe_Unicode(dkey);
+ dkey = PyObjectPool_Maybe_Unicode(dkey, pyobj_pool);
+
if (dvalue == Py_None) {
/* None */
AMQTable_SetNilValue(&dst, PyString_AS_AMQBYTES(dkey));
@@ -273,13 +295,13 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
/* Dict */
AMQTable_SetTableValue(&dst,
PyString_AS_AMQBYTES(dkey),
- PyDict_ToAMQTable(conn, dvalue, pool));
+ PyDict_ToAMQTable(conn, dvalue, pool, pyobj_pool));
}
else if (PyList_Check(dvalue) || PyTuple_Check(dvalue)) {
/* List */
AMQTable_SetArrayValue(&dst,
PyString_AS_AMQBYTES(dkey),
- PyIter_ToAMQArray(conn, dvalue, pool));
+ PyIter_ToAMQArray(conn, dvalue, pool, pyobj_pool));
}
else if (PyBool_Check(dvalue)) {
/* Bool */
@@ -322,6 +344,7 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
if (is_unicode) {
if ((dvalue = PyUnicode_AsEncodedString(dvalue, "utf-8", "strict")) == NULL)
goto error;
+ PyObjectPool_AddEntry(pyobj_pool, dvalue);
}
AMQTable_SetStringValue(&dst,
PyString_AS_AMQBYTES(dkey),
@@ -344,7 +367,7 @@ error:
}
static amqp_array_t
-PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool)
+PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool)
{
Py_ssize_t pos = 0;
uint64_t clong_value = 0;
@@ -370,12 +393,12 @@ PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
else if (PyDict_Check(item)) {
/* Dict */
AMQArray_SetTableValue(
- &dst, PyDict_ToAMQTable(conn, item, pool));
+ &dst, PyDict_ToAMQTable(conn, item, pool, pyobj_pool));
}
else if (PyList_Check(item) || PyTuple_Check(item)) {
/* List */
AMQArray_SetArrayValue(
- &dst, PyIter_ToAMQArray(conn, item, pool));
+ &dst, PyIter_ToAMQArray(conn, item, pool, pyobj_pool));
}
else if (PyLong_Check(item) || PyInt_Check(item)) {
/* Int | Long */
@@ -389,6 +412,7 @@ PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
if (is_unicode) {
if ((item = PyUnicode_AsASCIIString(item)) == NULL)
goto item_error;
+ PyObjectPool_AddEntry(pyobj_pool, item);
}
AMQArray_SetStringValue(
&dst, PyString_AS_AMQBYTES(item));
@@ -681,54 +705,55 @@ _PYRMQ_INLINE int
PyDict_to_basic_properties(PyObject *p,
amqp_basic_properties_t *props,
amqp_connection_state_t conn,
- amqp_pool_t *pool)
+ amqp_pool_t *pool,
+ pyobject_pool_t *pyobj_pool)
{
PyObject *value = NULL;
props->headers = amqp_empty_table;
props->_flags = AMQP_BASIC_HEADERS_FLAG;
if ((value = PyDict_GetItemString(p, "content_type")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->content_type = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
}
if ((value = PyDict_GetItemString(p, "content_encoding")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->content_encoding = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
}
if ((value = PyDict_GetItemString(p, "correlation_id")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->correlation_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
}
if ((value = PyDict_GetItemString(p, "reply_to")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->reply_to = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_REPLY_TO_FLAG;
}
if ((value = PyDict_GetItemString(p, "expiration")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->expiration = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_EXPIRATION_FLAG;
}
if ((value = PyDict_GetItemString(p, "message_id")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->message_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_MESSAGE_ID_FLAG;
}
if ((value = PyDict_GetItemString(p, "type")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->type = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_TYPE_FLAG;
}
if ((value = PyDict_GetItemString(p, "user_id")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->user_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_USER_ID_FLAG;
}
if ((value = PyDict_GetItemString(p, "app_id")) != NULL) {
- if ((value = Maybe_Unicode(value)) == NULL) return -1;
+ if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
props->app_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_APP_ID_FLAG;
}
@@ -746,7 +771,7 @@ PyDict_to_basic_properties(PyObject *p,
}
if ((value = PyDict_GetItemString(p, "headers")) != NULL) {
- props->headers = PyDict_ToAMQTable(conn, value, pool);
+ props->headers = PyDict_ToAMQTable(conn, value, pool, pyobj_pool);
if (PyErr_Occurred()) return -1;
}
return 1;
@@ -780,6 +805,93 @@ amqp_basic_deliver_to_PyDict(PyObject *dest,
return;
}
+/* ------: Keep track of increased reference counts :------------------------ */
+
+_PYRMQ_INLINE PyObject*
+PyObjectArray_Maybe_Unicode(PyObject *s, pyobject_array_t *pyobj_pool)
+{
+ if (PyUnicode_Check(s)) {
+ return PyObjectArray_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s));
+ }
+ return s;
+}
+
+_PYRMQ_INLINE PyObject*
+PyObjectPool_Maybe_Unicode(PyObject *s, pyobject_pool_t *pyobj_pool)
+{
+ if (PyUnicode_Check(s)) {
+ return PyObjectPool_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s));
+ }
+ return s;
+}
+
+_PYRMQ_INLINE PyObject*
+PyObjectArray_AddEntry(pyobject_array_t *array, PyObject *obj)
+{
+ if (array->num_entries < PYOBJECT_ARRAY_MAX) {
+ array->entries[array->num_entries] = obj;
+ array->num_entries++;
+ }
+
+ return obj;
+}
+
+static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *array, PyObject *obj)
+{
+ if (array->num_entries == PYOBJECT_POOL_MAX) {
+ if (!array->next)
+ array->next = PyObjectPool_New(array->pool);
+
+ PyObjectPool_AddEntry(array->next, obj);
+ } else {
+ array->entries[array->num_entries] = obj;
+ array->num_entries++;
+ }
+
+ return obj;
+}
+
+static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *pool)
+{
+ pyobject_pool_t *array = amqp_pool_alloc(pool, sizeof(pyobject_pool_t));
+ array->num_entries = 0;
+ array->entries = amqp_pool_alloc(
+ pool, PYOBJECT_POOL_MAX * sizeof(PyObject *));
+ array->pool = pool;
+ array->next = (pyobject_pool_t *) 0;
+
+ return array;
+}
+
+static void PyObjectPool_XDECREF(pyobject_pool_t *array)
+{
+ int i;
+
+ if (!array)
+ return;
+
+ for (i = 0; i < array->num_entries; ++i) {
+ Py_XDECREF(array->entries[i]);
+ }
+
+ array->num_entries = 0;
+
+ if (array->next) {
+ PyObjectPool_XDECREF(array->next);
+ }
+}
+
+static void PyObjectArray_XDECREF(pyobject_array_t *array)
+{
+ int i;
+
+ for (i = 0; i < array->num_entries; ++i) {
+ Py_XDECREF(array->entries[i]);
+ }
+
+ array->num_entries = 0;
+}
+
/* ------: Error Handlers :----------------------------------------------- */
int PyRabbitMQ_HandleError(int ret, char const *context)
@@ -1051,6 +1163,8 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
amqp_pool_t pool;
amqp_table_t properties;
+ pyobject_pool_t *pyobj_pool = NULL;
+
if (self->connected) {
PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected");
goto bail;
@@ -1076,12 +1190,14 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
if (self->client_properties != NULL && PyDict_Check(self->client_properties)) {
init_amqp_pool(&pool, self->frame_max);
- properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool);
+ pyobj_pool = PyObjectPool_New(&pool);
+ properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool, pyobj_pool);
reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
&properties,
AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
+ PyObjectPool_XDECREF(pyobj_pool);
} else {
reply = amqp_login(self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
@@ -1103,8 +1219,9 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
error:
PyRabbitMQ_Connection_close(self);
bail:
- return 0;
+ PyObjectPool_XDECREF(pyobj_pool);
+ return 0;
}
@@ -1473,22 +1590,26 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
amqp_pool_t *channel_pool = NULL;
amqp_rpc_reply_t reply;
+ pyobject_array_t pyobj_array = {0};
+ pyobject_pool_t *pyobj_pool = NULL;
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
if (!PyArg_ParseTuple(args, "IOOOO",
&channel, &queue, &exchange, &routing_key, &arguments))
goto bail;
- if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
- if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
- if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
+ if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail;
+ if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail;
+ if ((routing_key = PyObjectArray_Maybe_Unicode(routing_key, &pyobj_array)) == NULL) goto bail;
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
if (channel_pool == NULL) {
PyErr_NoMemory();
goto bail;
}
- bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
+ pyobj_pool = PyObjectPool_New(channel_pool);
+ bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
if (PyErr_Occurred())
goto bail;
@@ -1499,7 +1620,9 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
PyString_AS_AMQBYTES(routing_key),
bargs);
reply = amqp_get_rpc_reply(self->conn);
+ PyObjectPool_XDECREF(pyobj_pool);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind"))
@@ -1507,6 +1630,9 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
Py_RETURN_NONE;
bail:
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -1528,22 +1654,26 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
amqp_pool_t *channel_pool = NULL;
amqp_rpc_reply_t reply;
+ pyobject_array_t pyobj_array = {0};
+ pyobject_pool_t *pyobj_pool = NULL;
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
if (!PyArg_ParseTuple(args, "IOOOO",
&channel, &queue, &exchange, &routing_key, &arguments))
goto bail;
- if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
- if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
- if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
+ if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail;
+ if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail;
+ if ((routing_key = PyObjectArray_Maybe_Unicode(routing_key, &pyobj_array)) == NULL) goto bail;
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
if (channel_pool == NULL) {
PyErr_NoMemory();
goto bail;
}
- uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
+ pyobj_pool = PyObjectPool_New(channel_pool);
+ uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
if (PyErr_Occurred())
goto bail;
@@ -1554,7 +1684,9 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
PyString_AS_AMQBYTES(routing_key),
uargs);
reply = amqp_get_rpc_reply(self->conn);
+ PyObjectPool_XDECREF(pyobj_pool);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind"))
@@ -1562,6 +1694,9 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
Py_RETURN_NONE;
bail:
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -1580,13 +1715,15 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self,
amqp_queue_delete_ok_t *ok;
amqp_rpc_reply_t reply;
+ pyobject_array_t pyobj_array = {0};
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
if (!PyArg_ParseTuple(args, "IOII",
&channel, &queue, &if_unused, &if_empty))
goto bail;
- if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
+ if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail;
Py_BEGIN_ALLOW_THREADS;
ok = amqp_queue_delete(self->conn, channel,
@@ -1596,6 +1733,7 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self,
if (ok == NULL)
reply = amqp_get_rpc_reply(self->conn);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (ok == NULL && PyRabbitMQ_HandleAMQError(self, channel,
@@ -1604,6 +1742,7 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self,
return PyInt_FromLong((long)ok->message_count);
bail:
+ PyObjectArray_XDECREF(&pyobj_array);
return 0;
}
@@ -1629,6 +1768,9 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
amqp_table_t qargs = amqp_empty_table;
PyObject *ret = NULL;
+ pyobject_array_t pyobj_array = {0};
+ pyobject_pool_t *pyobj_pool = NULL;
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -1636,13 +1778,14 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
&channel, &queue, &passive, &durable,
&exclusive, &auto_delete, &arguments))
goto bail;
- if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
+ if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail;
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
if (channel_pool == NULL) {
PyErr_NoMemory();
goto bail;
}
- qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
+ pyobj_pool = PyObjectPool_New(channel_pool);
+ qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
if (PyErr_Occurred())
goto bail;
@@ -1656,6 +1799,8 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
qargs
);
reply = amqp_get_rpc_reply(self->conn);
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.declare"))
@@ -1667,6 +1812,9 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
PyTuple_SET_ITEM(ret, 2, PyInt_FromLong((long)ok->consumer_count));
return ret;
bail:
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -1685,18 +1833,21 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self,
amqp_queue_purge_ok_t *ok;
amqp_rpc_reply_t reply;
+ pyobject_array_t pyobj_array = {0};
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
if (!PyArg_ParseTuple(args, "IOI", &channel, &queue, &no_wait))
goto bail;
- if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
+ if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail;
Py_BEGIN_ALLOW_THREADS;
ok = amqp_queue_purge(self->conn, channel,
PyString_AS_AMQBYTES(queue));
reply = amqp_get_rpc_reply(self->conn);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.purge"))
@@ -1704,6 +1855,7 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self,
return PyInt_FromLong((long)ok->message_count);
bail:
+ PyObjectArray_XDECREF(&pyobj_array);
return 0;
}
@@ -1729,6 +1881,9 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
amqp_pool_t *channel_pool = NULL;
amqp_rpc_reply_t reply;
+ pyobject_array_t pyobj_array = {0};
+ pyobject_pool_t *pyobj_pool = NULL;
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -1736,14 +1891,17 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
&channel, &exchange, &type, &passive,
&durable, &auto_delete, &arguments))
goto bail;
- if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
- if ((type = Maybe_Unicode(type)) == NULL) goto bail;
+ if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail;
+ if ((type = PyObjectArray_Maybe_Unicode(type, &pyobj_array)) == NULL) goto bail;
+
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
if (channel_pool == NULL) {
PyErr_NoMemory();
goto bail;
}
- eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
+
+ pyobj_pool = PyObjectPool_New(channel_pool);
+ eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
if (PyErr_Occurred())
goto bail;
@@ -1756,12 +1914,17 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
0, 0, eargs
);
reply = amqp_get_rpc_reply(self->conn);
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.declare"))
goto bail;
Py_RETURN_NONE;
bail:
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -1778,12 +1941,14 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self,
amqp_rpc_reply_t reply;
+ pyobject_array_t pyobj_array = {0};
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
if (!PyArg_ParseTuple(args, "IOI", &channel, &exchange, &if_unused))
goto bail;
- if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
+ if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail;
Py_BEGIN_ALLOW_THREADS;
amqp_exchange_delete(self->conn, channel,
@@ -1791,6 +1956,7 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self,
(amqp_boolean_t)if_unused);
reply = amqp_get_rpc_reply(self->conn);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete"))
@@ -1798,6 +1964,8 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self,
Py_RETURN_NONE;
bail:
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -1824,6 +1992,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
amqp_pool_t *channel_pool = NULL;
memset(&props, 0, sizeof(props));
+ pyobject_array_t pyobj_array = {0};
+ pyobject_pool_t *pyobj_pool = NULL;
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -1831,12 +2002,15 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
&channel, &body_buf, &body_size, &exchange, &routing_key,
&propdict, &mandatory, &immediate))
goto bail;
- if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail;
- if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail;
+
+ if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail;
+ if ((routing_key = PyObjectArray_Maybe_Unicode(routing_key, &pyobj_array)) == NULL) goto bail;
Py_INCREF(propdict);
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
- if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool) < 1) {
+
+ pyobj_pool = PyObjectPool_New(channel_pool);
+ if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool, pyobj_pool) < 1) {
goto bail;
}
Py_DECREF(propdict);
@@ -1852,7 +2026,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
(amqp_boolean_t)immediate,
&props,
bytes);
+ PyObjectPool_XDECREF(pyobj_pool);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (!PyRabbitMQ_HandleError(ret, "basic.publish")) {
@@ -1863,6 +2039,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
error:
PyRabbitMQ_revive_channel(self, channel);
bail:
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -1949,18 +2128,21 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self,
amqp_basic_cancel_ok_t *ok;
amqp_rpc_reply_t reply;
+ pyobject_array_t pyobj_array = {0};
+
if (PyRabbitMQ_Not_Connected(self))
goto ok;
if (!PyArg_ParseTuple(args, "IO", &channel, &consumer_tag))
goto bail;
- if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail;
+ if ((consumer_tag = PyObjectArray_Maybe_Unicode(consumer_tag, &pyobj_array)) == NULL) goto bail;
Py_BEGIN_ALLOW_THREADS;
ok = amqp_basic_cancel(self->conn, channel,
PyString_AS_AMQBYTES(consumer_tag));
reply = amqp_get_rpc_reply(self->conn);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.cancel"))
@@ -1969,6 +2151,8 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self,
ok:
Py_RETURN_NONE;
bail:
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -1993,6 +2177,9 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
amqp_pool_t *channel_pool = NULL;
amqp_table_t cargs = amqp_empty_table;
+ pyobject_array_t pyobj_array = {0};
+ pyobject_pool_t *pyobj_pool = NULL;
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -2000,16 +2187,19 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
&channel, &queue, &consumer_tag, &no_local,
&no_ack, &exclusive, &arguments))
goto bail;
- if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
- if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail;
+ if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail;
+ if ((consumer_tag = PyObjectArray_Maybe_Unicode(consumer_tag, &pyobj_array)) == NULL) goto bail;
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
if (channel_pool == NULL) {
PyErr_NoMemory();
goto bail;
}
- cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool);
- if (PyErr_Occurred()) goto bail;
+
+ pyobj_pool = PyObjectPool_New(channel_pool);
+ cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
+ if (PyErr_Occurred())
+ goto bail;
Py_BEGIN_ALLOW_THREADS;
ok = amqp_basic_consume(self->conn, channel,
@@ -2020,6 +2210,8 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
exclusive,
cargs);
reply = amqp_get_rpc_reply(self->conn);
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.consume"))
@@ -2027,6 +2219,9 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
return PySTRING_FROM_AMQBYTES(ok->consumer_tag);
bail:
+ PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
}
@@ -2192,17 +2387,20 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self,
PyObject *delivery_info = NULL;
PyObject *message_count = NULL;
+ pyobject_array_t pyobj_array = {0};
+
if (PyRabbitMQ_Not_Connected(self))
goto bail;
if (!PyArg_ParseTuple(args, "IOI", &channel, &queue, &no_ack))
goto bail;
- if ((queue = Maybe_Unicode(queue)) == NULL) goto bail;
+ if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail;
Py_BEGIN_ALLOW_THREADS;
reply = amqp_basic_get(self->conn, channel,
PyString_AS_AMQBYTES(queue),
(amqp_boolean_t)no_ack);
+ PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.get"))
@@ -2225,7 +2423,7 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self,
message_count = PyLong_FromLong(ok->message_count);
PyDict_SetItemString(delivery_info, "message_count", message_count);
Py_XDECREF(message_count);
-
+
if (amqp_data_in_buffer(self->conn)) {
if (PyRabbitMQ_recv(self, p, self->conn, 1) < 0) {
if (!PyErr_Occurred())
@@ -2240,6 +2438,8 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self,
error:
PyRabbitMQ_Connection_close(self);
bail:
+ PyObjectArray_XDECREF(&pyobj_array);
+
return 0;
empty:
Py_RETURN_NONE;
diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h
index 273ae75..c7e52e4 100644
--- a/Modules/_librabbitmq/connection.h
+++ b/Modules/_librabbitmq/connection.h
@@ -19,7 +19,7 @@
#else
#define PYRABBITMQ_MOD_INIT(name) PyMODINIT_FUNC init##name(void)
#endif
-
+
#if PY_VERSION_HEX >= 0x03000000 /* 3.0 and up */
# define BUILD_METHOD_NAME PyUnicode_FromString
@@ -172,9 +172,20 @@ typedef struct {
PyObject *weakreflist;
} PyRabbitMQ_Connection;
+// Keep track of PyObject references with increased reference count.
+// Entries are stored in the channel pool.
+#define PYOBJECT_POOL_MAX 100
+typedef struct pyobject_pool_t_ {
+ int num_entries;
+ PyObject **entries;
+ amqp_pool_t *pool;
+ struct pyobject_pool_t_ *next;
+} pyobject_pool_t;
+
int
PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *,
- amqp_connection_state_t, amqp_pool_t *);
+ amqp_connection_state_t, amqp_pool_t *,
+ pyobject_pool_t *);
/* Connection method sigs */
static PyRabbitMQ_Connection*