From 13a49b0e26b073f6c5045c353d70901b64e7cc78 Mon Sep 17 00:00:00 2001 From: Henry Tang Date: Mon, 23 Mar 2020 21:08:37 +0800 Subject: Fix memory leaks from "Maybe_Unicode" (fixes #27 #125) --- Modules/_librabbitmq/connection.c | 298 +++++++++++++++++++++++++++++++------- Modules/_librabbitmq/connection.h | 15 +- 2 files changed, 262 insertions(+), 51 deletions(-) diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 3ace2e3..4b57a1b 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -82,7 +82,8 @@ _PYRMQ_INLINE int PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *, amqp_connection_state_t, - amqp_pool_t *); + amqp_pool_t *, + pyobject_pool_t *); _PYRMQ_INLINE void amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t, @@ -110,8 +111,8 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int, void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*); int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *); -static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *); -static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *); +static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *); +static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *); static PyObject* AMQTable_toPyDict(amqp_table_t *table); static PyObject* AMQArray_toPyList(amqp_array_t *array); @@ -128,6 +129,26 @@ int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *self) return 0; } +// Keep track of PyObject references with increased reference count +// Entries are stored in fixed size array. +#define PYOBJECT_ARRAY_MAX 5 +typedef struct pyobject_array_t { + int num_entries; + PyObject *entries[PYOBJECT_ARRAY_MAX]; +} pyobject_array_t; + + +static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *); +static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *, PyObject *); +static void PyObjectPool_XDECREF(pyobject_pool_t *array); + +_PYRMQ_INLINE PyObject* PyObjectPool_Maybe_Unicode(PyObject *, pyobject_pool_t *); + +static void PyObjectArray_XDECREF(pyobject_array_t *array); + +_PYRMQ_INLINE PyObject* PyObjectArray_AddEntry(pyobject_array_t *, PyObject *obj); +_PYRMQ_INLINE PyObject* PyObjectArray_Maybe_Unicode(PyObject *, pyobject_array_t *); + /* ------: AMQP Utils :--------------------------------------------------- */ _PYRMQ_INLINE amqp_table_entry_t* @@ -247,7 +268,7 @@ AMQArray_SetIntValue(amqp_array_t *array, int value) } static amqp_table_t -PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool) +PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool) { PyObject *dkey = NULL; PyObject *dvalue = NULL; @@ -264,7 +285,8 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool dst.num_entries = 0; dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_table_entry_t)); while (PyDict_Next(src, &pos, &dkey, &dvalue)) { - dkey = Maybe_Unicode(dkey); + dkey = PyObjectPool_Maybe_Unicode(dkey, pyobj_pool); + if (dvalue == Py_None) { /* None */ AMQTable_SetNilValue(&dst, PyString_AS_AMQBYTES(dkey)); @@ -273,13 +295,13 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool /* Dict */ AMQTable_SetTableValue(&dst, PyString_AS_AMQBYTES(dkey), - PyDict_ToAMQTable(conn, dvalue, pool)); + PyDict_ToAMQTable(conn, dvalue, pool, pyobj_pool)); } else if (PyList_Check(dvalue) || PyTuple_Check(dvalue)) { /* List */ AMQTable_SetArrayValue(&dst, PyString_AS_AMQBYTES(dkey), - PyIter_ToAMQArray(conn, dvalue, pool)); + PyIter_ToAMQArray(conn, dvalue, pool, pyobj_pool)); } else if (PyBool_Check(dvalue)) { /* Bool */ @@ -322,6 +344,7 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool if (is_unicode) { if ((dvalue = PyUnicode_AsEncodedString(dvalue, "utf-8", "strict")) == NULL) goto error; + PyObjectPool_AddEntry(pyobj_pool, dvalue); } AMQTable_SetStringValue(&dst, PyString_AS_AMQBYTES(dkey), @@ -344,7 +367,7 @@ error: } static amqp_array_t -PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool) +PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool) { Py_ssize_t pos = 0; uint64_t clong_value = 0; @@ -370,12 +393,12 @@ PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool else if (PyDict_Check(item)) { /* Dict */ AMQArray_SetTableValue( - &dst, PyDict_ToAMQTable(conn, item, pool)); + &dst, PyDict_ToAMQTable(conn, item, pool, pyobj_pool)); } else if (PyList_Check(item) || PyTuple_Check(item)) { /* List */ AMQArray_SetArrayValue( - &dst, PyIter_ToAMQArray(conn, item, pool)); + &dst, PyIter_ToAMQArray(conn, item, pool, pyobj_pool)); } else if (PyLong_Check(item) || PyInt_Check(item)) { /* Int | Long */ @@ -389,6 +412,7 @@ PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool if (is_unicode) { if ((item = PyUnicode_AsASCIIString(item)) == NULL) goto item_error; + PyObjectPool_AddEntry(pyobj_pool, item); } AMQArray_SetStringValue( &dst, PyString_AS_AMQBYTES(item)); @@ -681,54 +705,55 @@ _PYRMQ_INLINE int PyDict_to_basic_properties(PyObject *p, amqp_basic_properties_t *props, amqp_connection_state_t conn, - amqp_pool_t *pool) + amqp_pool_t *pool, + pyobject_pool_t *pyobj_pool) { PyObject *value = NULL; props->headers = amqp_empty_table; props->_flags = AMQP_BASIC_HEADERS_FLAG; if ((value = PyDict_GetItemString(p, "content_type")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->content_type = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; } if ((value = PyDict_GetItemString(p, "content_encoding")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->content_encoding = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; } if ((value = PyDict_GetItemString(p, "correlation_id")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->correlation_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CORRELATION_ID_FLAG; } if ((value = PyDict_GetItemString(p, "reply_to")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->reply_to = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_REPLY_TO_FLAG; } if ((value = PyDict_GetItemString(p, "expiration")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->expiration = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_EXPIRATION_FLAG; } if ((value = PyDict_GetItemString(p, "message_id")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->message_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_MESSAGE_ID_FLAG; } if ((value = PyDict_GetItemString(p, "type")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->type = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_TYPE_FLAG; } if ((value = PyDict_GetItemString(p, "user_id")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->user_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_USER_ID_FLAG; } if ((value = PyDict_GetItemString(p, "app_id")) != NULL) { - if ((value = Maybe_Unicode(value)) == NULL) return -1; + if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; props->app_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_APP_ID_FLAG; } @@ -746,7 +771,7 @@ PyDict_to_basic_properties(PyObject *p, } if ((value = PyDict_GetItemString(p, "headers")) != NULL) { - props->headers = PyDict_ToAMQTable(conn, value, pool); + props->headers = PyDict_ToAMQTable(conn, value, pool, pyobj_pool); if (PyErr_Occurred()) return -1; } return 1; @@ -780,6 +805,93 @@ amqp_basic_deliver_to_PyDict(PyObject *dest, return; } +/* ------: Keep track of increased reference counts :------------------------ */ + +_PYRMQ_INLINE PyObject* +PyObjectArray_Maybe_Unicode(PyObject *s, pyobject_array_t *pyobj_pool) +{ + if (PyUnicode_Check(s)) { + return PyObjectArray_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s)); + } + return s; +} + +_PYRMQ_INLINE PyObject* +PyObjectPool_Maybe_Unicode(PyObject *s, pyobject_pool_t *pyobj_pool) +{ + if (PyUnicode_Check(s)) { + return PyObjectPool_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s)); + } + return s; +} + +_PYRMQ_INLINE PyObject* +PyObjectArray_AddEntry(pyobject_array_t *array, PyObject *obj) +{ + if (array->num_entries < PYOBJECT_ARRAY_MAX) { + array->entries[array->num_entries] = obj; + array->num_entries++; + } + + return obj; +} + +static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *array, PyObject *obj) +{ + if (array->num_entries == PYOBJECT_POOL_MAX) { + if (!array->next) + array->next = PyObjectPool_New(array->pool); + + PyObjectPool_AddEntry(array->next, obj); + } else { + array->entries[array->num_entries] = obj; + array->num_entries++; + } + + return obj; +} + +static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *pool) +{ + pyobject_pool_t *array = amqp_pool_alloc(pool, sizeof(pyobject_pool_t)); + array->num_entries = 0; + array->entries = amqp_pool_alloc( + pool, PYOBJECT_POOL_MAX * sizeof(PyObject *)); + array->pool = pool; + array->next = (pyobject_pool_t *) 0; + + return array; +} + +static void PyObjectPool_XDECREF(pyobject_pool_t *array) +{ + int i; + + if (!array) + return; + + for (i = 0; i < array->num_entries; ++i) { + Py_XDECREF(array->entries[i]); + } + + array->num_entries = 0; + + if (array->next) { + PyObjectPool_XDECREF(array->next); + } +} + +static void PyObjectArray_XDECREF(pyobject_array_t *array) +{ + int i; + + for (i = 0; i < array->num_entries; ++i) { + Py_XDECREF(array->entries[i]); + } + + array->num_entries = 0; +} + /* ------: Error Handlers :----------------------------------------------- */ int PyRabbitMQ_HandleError(int ret, char const *context) @@ -1051,6 +1163,8 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) amqp_pool_t pool; amqp_table_t properties; + pyobject_pool_t *pyobj_pool = NULL; + if (self->connected) { PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected"); goto bail; @@ -1076,12 +1190,14 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) if (self->client_properties != NULL && PyDict_Check(self->client_properties)) { init_amqp_pool(&pool, self->frame_max); - properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool); + pyobj_pool = PyObjectPool_New(&pool); + properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool, pyobj_pool); reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max, self->frame_max, self->heartbeat, &properties, AMQP_SASL_METHOD_PLAIN, self->userid, self->password); + PyObjectPool_XDECREF(pyobj_pool); } else { reply = amqp_login(self->conn, self->virtual_host, self->channel_max, self->frame_max, self->heartbeat, @@ -1103,8 +1219,9 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) error: PyRabbitMQ_Connection_close(self); bail: - return 0; + PyObjectPool_XDECREF(pyobj_pool); + return 0; } @@ -1473,22 +1590,26 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, amqp_pool_t *channel_pool = NULL; amqp_rpc_reply_t reply; + pyobject_array_t pyobj_array = {0}; + pyobject_pool_t *pyobj_pool = NULL; + if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOOOO", &channel, &queue, &exchange, &routing_key, &arguments)) goto bail; - if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; - if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; - if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; + if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail; + if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail; + if ((routing_key = PyObjectArray_Maybe_Unicode(routing_key, &pyobj_array)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } - bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); + pyobj_pool = PyObjectPool_New(channel_pool); + bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); if (PyErr_Occurred()) goto bail; @@ -1499,7 +1620,9 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, PyString_AS_AMQBYTES(routing_key), bargs); reply = amqp_get_rpc_reply(self->conn); + PyObjectPool_XDECREF(pyobj_pool); amqp_maybe_release_buffers_on_channel(self->conn, channel); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind")) @@ -1507,6 +1630,9 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, Py_RETURN_NONE; bail: + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -1528,22 +1654,26 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, amqp_pool_t *channel_pool = NULL; amqp_rpc_reply_t reply; + pyobject_array_t pyobj_array = {0}; + pyobject_pool_t *pyobj_pool = NULL; + if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOOOO", &channel, &queue, &exchange, &routing_key, &arguments)) goto bail; - if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; - if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; - if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; + if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail; + if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail; + if ((routing_key = PyObjectArray_Maybe_Unicode(routing_key, &pyobj_array)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } - uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); + pyobj_pool = PyObjectPool_New(channel_pool); + uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); if (PyErr_Occurred()) goto bail; @@ -1554,7 +1684,9 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, PyString_AS_AMQBYTES(routing_key), uargs); reply = amqp_get_rpc_reply(self->conn); + PyObjectPool_XDECREF(pyobj_pool); amqp_maybe_release_buffers_on_channel(self->conn, channel); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind")) @@ -1562,6 +1694,9 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, Py_RETURN_NONE; bail: + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -1580,13 +1715,15 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self, amqp_queue_delete_ok_t *ok; amqp_rpc_reply_t reply; + pyobject_array_t pyobj_array = {0}; + if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOII", &channel, &queue, &if_unused, &if_empty)) goto bail; - if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; + if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_queue_delete(self->conn, channel, @@ -1596,6 +1733,7 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self, if (ok == NULL) reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (ok == NULL && PyRabbitMQ_HandleAMQError(self, channel, @@ -1604,6 +1742,7 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self, return PyInt_FromLong((long)ok->message_count); bail: + PyObjectArray_XDECREF(&pyobj_array); return 0; } @@ -1629,6 +1768,9 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, amqp_table_t qargs = amqp_empty_table; PyObject *ret = NULL; + pyobject_array_t pyobj_array = {0}; + pyobject_pool_t *pyobj_pool = NULL; + if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -1636,13 +1778,14 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, &channel, &queue, &passive, &durable, &exclusive, &auto_delete, &arguments)) goto bail; - if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; + if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } - qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); + pyobj_pool = PyObjectPool_New(channel_pool); + qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); if (PyErr_Occurred()) goto bail; @@ -1656,6 +1799,8 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, qargs ); reply = amqp_get_rpc_reply(self->conn); + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.declare")) @@ -1667,6 +1812,9 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, PyTuple_SET_ITEM(ret, 2, PyInt_FromLong((long)ok->consumer_count)); return ret; bail: + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -1685,18 +1833,21 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self, amqp_queue_purge_ok_t *ok; amqp_rpc_reply_t reply; + pyobject_array_t pyobj_array = {0}; + if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOI", &channel, &queue, &no_wait)) goto bail; - if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; + if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_queue_purge(self->conn, channel, PyString_AS_AMQBYTES(queue)); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.purge")) @@ -1704,6 +1855,7 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self, return PyInt_FromLong((long)ok->message_count); bail: + PyObjectArray_XDECREF(&pyobj_array); return 0; } @@ -1729,6 +1881,9 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, amqp_pool_t *channel_pool = NULL; amqp_rpc_reply_t reply; + pyobject_array_t pyobj_array = {0}; + pyobject_pool_t *pyobj_pool = NULL; + if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -1736,14 +1891,17 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, &channel, &exchange, &type, &passive, &durable, &auto_delete, &arguments)) goto bail; - if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; - if ((type = Maybe_Unicode(type)) == NULL) goto bail; + if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail; + if ((type = PyObjectArray_Maybe_Unicode(type, &pyobj_array)) == NULL) goto bail; + channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } - eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); + + pyobj_pool = PyObjectPool_New(channel_pool); + eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); if (PyErr_Occurred()) goto bail; @@ -1756,12 +1914,17 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, 0, 0, eargs ); reply = amqp_get_rpc_reply(self->conn); + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.declare")) goto bail; Py_RETURN_NONE; bail: + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -1778,12 +1941,14 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self, amqp_rpc_reply_t reply; + pyobject_array_t pyobj_array = {0}; + if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOI", &channel, &exchange, &if_unused)) goto bail; - if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; + if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; amqp_exchange_delete(self->conn, channel, @@ -1791,6 +1956,7 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self, (amqp_boolean_t)if_unused); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete")) @@ -1798,6 +1964,8 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self, Py_RETURN_NONE; bail: + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -1824,6 +1992,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, amqp_pool_t *channel_pool = NULL; memset(&props, 0, sizeof(props)); + pyobject_array_t pyobj_array = {0}; + pyobject_pool_t *pyobj_pool = NULL; + if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -1831,12 +2002,15 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, &channel, &body_buf, &body_size, &exchange, &routing_key, &propdict, &mandatory, &immediate)) goto bail; - if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; - if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; + + if ((exchange = PyObjectArray_Maybe_Unicode(exchange, &pyobj_array)) == NULL) goto bail; + if ((routing_key = PyObjectArray_Maybe_Unicode(routing_key, &pyobj_array)) == NULL) goto bail; Py_INCREF(propdict); channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); - if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool) < 1) { + + pyobj_pool = PyObjectPool_New(channel_pool); + if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool, pyobj_pool) < 1) { goto bail; } Py_DECREF(propdict); @@ -1852,7 +2026,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, (amqp_boolean_t)immediate, &props, bytes); + PyObjectPool_XDECREF(pyobj_pool); amqp_maybe_release_buffers_on_channel(self->conn, channel); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (!PyRabbitMQ_HandleError(ret, "basic.publish")) { @@ -1863,6 +2039,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, error: PyRabbitMQ_revive_channel(self, channel); bail: + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -1949,18 +2128,21 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self, amqp_basic_cancel_ok_t *ok; amqp_rpc_reply_t reply; + pyobject_array_t pyobj_array = {0}; + if (PyRabbitMQ_Not_Connected(self)) goto ok; if (!PyArg_ParseTuple(args, "IO", &channel, &consumer_tag)) goto bail; - if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail; + if ((consumer_tag = PyObjectArray_Maybe_Unicode(consumer_tag, &pyobj_array)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_basic_cancel(self->conn, channel, PyString_AS_AMQBYTES(consumer_tag)); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.cancel")) @@ -1969,6 +2151,8 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self, ok: Py_RETURN_NONE; bail: + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -1993,6 +2177,9 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, amqp_pool_t *channel_pool = NULL; amqp_table_t cargs = amqp_empty_table; + pyobject_array_t pyobj_array = {0}; + pyobject_pool_t *pyobj_pool = NULL; + if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -2000,16 +2187,19 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, &channel, &queue, &consumer_tag, &no_local, &no_ack, &exclusive, &arguments)) goto bail; - if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; - if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail; + if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail; + if ((consumer_tag = PyObjectArray_Maybe_Unicode(consumer_tag, &pyobj_array)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } - cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); - if (PyErr_Occurred()) goto bail; + + pyobj_pool = PyObjectPool_New(channel_pool); + cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); + if (PyErr_Occurred()) + goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_basic_consume(self->conn, channel, @@ -2020,6 +2210,8 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, exclusive, cargs); reply = amqp_get_rpc_reply(self->conn); + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.consume")) @@ -2027,6 +2219,9 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, return PySTRING_FROM_AMQBYTES(ok->consumer_tag); bail: + PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); + return 0; } @@ -2192,17 +2387,20 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self, PyObject *delivery_info = NULL; PyObject *message_count = NULL; + pyobject_array_t pyobj_array = {0}; + if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOI", &channel, &queue, &no_ack)) goto bail; - if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; + if ((queue = PyObjectArray_Maybe_Unicode(queue, &pyobj_array)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; reply = amqp_basic_get(self->conn, channel, PyString_AS_AMQBYTES(queue), (amqp_boolean_t)no_ack); + PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.get")) @@ -2225,7 +2423,7 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self, message_count = PyLong_FromLong(ok->message_count); PyDict_SetItemString(delivery_info, "message_count", message_count); Py_XDECREF(message_count); - + if (amqp_data_in_buffer(self->conn)) { if (PyRabbitMQ_recv(self, p, self->conn, 1) < 0) { if (!PyErr_Occurred()) @@ -2240,6 +2438,8 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self, error: PyRabbitMQ_Connection_close(self); bail: + PyObjectArray_XDECREF(&pyobj_array); + return 0; empty: Py_RETURN_NONE; diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h index 273ae75..c7e52e4 100644 --- a/Modules/_librabbitmq/connection.h +++ b/Modules/_librabbitmq/connection.h @@ -19,7 +19,7 @@ #else #define PYRABBITMQ_MOD_INIT(name) PyMODINIT_FUNC init##name(void) #endif - + #if PY_VERSION_HEX >= 0x03000000 /* 3.0 and up */ # define BUILD_METHOD_NAME PyUnicode_FromString @@ -172,9 +172,20 @@ typedef struct { PyObject *weakreflist; } PyRabbitMQ_Connection; +// Keep track of PyObject references with increased reference count. +// Entries are stored in the channel pool. +#define PYOBJECT_POOL_MAX 100 +typedef struct pyobject_pool_t_ { + int num_entries; + PyObject **entries; + amqp_pool_t *pool; + struct pyobject_pool_t_ *next; +} pyobject_pool_t; + int PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *, - amqp_connection_state_t, amqp_pool_t *); + amqp_connection_state_t, amqp_pool_t *, + pyobject_pool_t *); /* Connection method sigs */ static PyRabbitMQ_Connection* -- cgit v1.2.1