From 6917f2789903da5cbc497fc77a56e1d639294697 Mon Sep 17 00:00:00 2001 From: Henry Tang Date: Sat, 18 Apr 2020 15:44:10 +0800 Subject: Simplify memory leak fix (avoid using amqp_pool_alloc) --- Modules/_librabbitmq/connection.c | 214 +++++++++++++------------------------- Modules/_librabbitmq/connection.h | 25 ----- 2 files changed, 74 insertions(+), 165 deletions(-) diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index a7bf338..c9e7b35 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -78,12 +78,27 @@ _PYRMQ_INLINE int RabbitMQ_wait_timeout(int, double); _PYRMQ_INLINE void basic_properties_to_PyDict(amqp_basic_properties_t*, PyObject*); -_PYRMQ_INLINE int -PyDict_to_basic_properties(PyObject *, +// Keep track of PyObject references with increased reference count +// Entries are stored in fixed size array. +#define PYOBJECT_ARRAY_MAX 5 +typedef struct pyobject_array_t { + int num_entries; + PyObject *entries[PYOBJECT_ARRAY_MAX]; + struct pyobject_array_t *next; +} pyobject_array_t; + + +static void PyObjectArray_XDECREF(pyobject_array_t *array); + +_PYRMQ_INLINE PyObject* PyObjectArray_AddEntry(pyobject_array_t *, PyObject *obj); +_PYRMQ_INLINE PyObject* PyObjectArray_Maybe_Unicode(PyObject *, pyobject_array_t *); + + +static int PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *, amqp_connection_state_t, amqp_pool_t *, - pyobject_pool_t *); + pyobject_array_t *); _PYRMQ_INLINE void amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t, @@ -111,8 +126,8 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int, void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*); int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *); -static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *); -static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *); +static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_array_t *); +static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_array_t *); static PyObject* AMQTable_toPyDict(amqp_table_t *table); static PyObject* AMQArray_toPyList(amqp_array_t *array); @@ -129,26 +144,6 @@ int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *self) return 0; } -// Keep track of PyObject references with increased reference count -// Entries are stored in fixed size array. -#define PYOBJECT_ARRAY_MAX 5 -typedef struct pyobject_array_t { - int num_entries; - PyObject *entries[PYOBJECT_ARRAY_MAX]; -} pyobject_array_t; - - -static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *); -static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *, PyObject *); -static void PyObjectPool_XDECREF(pyobject_pool_t *array); - -_PYRMQ_INLINE PyObject* PyObjectPool_Maybe_Unicode(PyObject *, pyobject_pool_t *); - -static void PyObjectArray_XDECREF(pyobject_array_t *array); - -_PYRMQ_INLINE PyObject* PyObjectArray_AddEntry(pyobject_array_t *, PyObject *obj); -_PYRMQ_INLINE PyObject* PyObjectArray_Maybe_Unicode(PyObject *, pyobject_array_t *); - /* ------: AMQP Utils :--------------------------------------------------- */ _PYRMQ_INLINE amqp_table_entry_t* @@ -268,7 +263,7 @@ AMQArray_SetIntValue(amqp_array_t *array, int value) } static amqp_table_t -PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool) +PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_array_t *pyobj_array) { PyObject *dkey = NULL; PyObject *dvalue = NULL; @@ -285,7 +280,7 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool dst.num_entries = 0; dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_table_entry_t)); while (PyDict_Next(src, &pos, &dkey, &dvalue)) { - dkey = PyObjectPool_Maybe_Unicode(dkey, pyobj_pool); + dkey = PyObjectArray_Maybe_Unicode(dkey, pyobj_array); if (dvalue == Py_None) { /* None */ @@ -295,13 +290,13 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool /* Dict */ AMQTable_SetTableValue(&dst, PyString_AS_AMQBYTES(dkey), - PyDict_ToAMQTable(conn, dvalue, pool, pyobj_pool)); + PyDict_ToAMQTable(conn, dvalue, pool, pyobj_array)); } else if (PyList_Check(dvalue) || PyTuple_Check(dvalue)) { /* List */ AMQTable_SetArrayValue(&dst, PyString_AS_AMQBYTES(dkey), - PyIter_ToAMQArray(conn, dvalue, pool, pyobj_pool)); + PyIter_ToAMQArray(conn, dvalue, pool, pyobj_array)); } else if (PyBool_Check(dvalue)) { /* Bool */ @@ -344,7 +339,7 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool if (is_unicode) { if ((dvalue = PyUnicode_AsEncodedString(dvalue, "utf-8", "strict")) == NULL) goto error; - PyObjectPool_AddEntry(pyobj_pool, dvalue); + PyObjectArray_AddEntry(pyobj_array, dvalue); } AMQTable_SetStringValue(&dst, PyString_AS_AMQBYTES(dkey), @@ -367,7 +362,7 @@ error: } static amqp_array_t -PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool) +PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_array_t *pyobj_array) { Py_ssize_t pos = 0; uint64_t clong_value = 0; @@ -394,12 +389,12 @@ PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool else if (PyDict_Check(item)) { /* Dict */ AMQArray_SetTableValue( - &dst, PyDict_ToAMQTable(conn, item, pool, pyobj_pool)); + &dst, PyDict_ToAMQTable(conn, item, pool, pyobj_array)); } else if (PyList_Check(item) || PyTuple_Check(item)) { /* List */ AMQArray_SetArrayValue( - &dst, PyIter_ToAMQArray(conn, item, pool, pyobj_pool)); + &dst, PyIter_ToAMQArray(conn, item, pool, pyobj_array)); } else if (PyLong_Check(item) || PyInt_Check(item)) { /* Int | Long */ @@ -705,59 +700,58 @@ AMQArray_toPyList(amqp_array_t *array) return list; } -_PYRMQ_INLINE int -PyDict_to_basic_properties(PyObject *p, +static int PyDict_to_basic_properties(PyObject *p, amqp_basic_properties_t *props, amqp_connection_state_t conn, amqp_pool_t *pool, - pyobject_pool_t *pyobj_pool) + pyobject_array_t *pyobj_array) { PyObject *value = NULL; props->headers = amqp_empty_table; props->_flags = AMQP_BASIC_HEADERS_FLAG; if ((value = PyDict_GetItemString(p, "content_type")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->content_type = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; } if ((value = PyDict_GetItemString(p, "content_encoding")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->content_encoding = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; } if ((value = PyDict_GetItemString(p, "correlation_id")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->correlation_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CORRELATION_ID_FLAG; } if ((value = PyDict_GetItemString(p, "reply_to")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->reply_to = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_REPLY_TO_FLAG; } if ((value = PyDict_GetItemString(p, "expiration")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->expiration = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_EXPIRATION_FLAG; } if ((value = PyDict_GetItemString(p, "message_id")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->message_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_MESSAGE_ID_FLAG; } if ((value = PyDict_GetItemString(p, "type")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->type = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_TYPE_FLAG; } if ((value = PyDict_GetItemString(p, "user_id")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->user_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_USER_ID_FLAG; } if ((value = PyDict_GetItemString(p, "app_id")) != NULL) { - if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1; + if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1; props->app_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_APP_ID_FLAG; } @@ -775,7 +769,7 @@ PyDict_to_basic_properties(PyObject *p, } if ((value = PyDict_GetItemString(p, "headers")) != NULL) { - props->headers = PyDict_ToAMQTable(conn, value, pool, pyobj_pool); + props->headers = PyDict_ToAMQTable(conn, value, pool, pyobj_array); if (PyErr_Occurred()) return -1; } return 1; @@ -812,19 +806,10 @@ amqp_basic_deliver_to_PyDict(PyObject *dest, /* ------: Keep track of increased reference counts :------------------------ */ _PYRMQ_INLINE PyObject* -PyObjectArray_Maybe_Unicode(PyObject *s, pyobject_array_t *pyobj_pool) -{ - if (PyUnicode_Check(s)) { - return PyObjectArray_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s)); - } - return s; -} - -_PYRMQ_INLINE PyObject* -PyObjectPool_Maybe_Unicode(PyObject *s, pyobject_pool_t *pyobj_pool) +PyObjectArray_Maybe_Unicode(PyObject *s, pyobject_array_t *array) { if (PyUnicode_Check(s)) { - return PyObjectPool_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s)); + return PyObjectArray_AddEntry(array, PyUnicode_AsASCIIString(s)); } return s; } @@ -832,64 +817,38 @@ PyObjectPool_Maybe_Unicode(PyObject *s, pyobject_pool_t *pyobj_pool) _PYRMQ_INLINE PyObject* PyObjectArray_AddEntry(pyobject_array_t *array, PyObject *obj) { - if (obj && array->num_entries < PYOBJECT_ARRAY_MAX) { - array->entries[array->num_entries] = obj; - array->num_entries++; + if (!obj) { + return obj; } - return obj; -} - -static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *array, PyObject *obj) -{ - if (obj) { - if (array->num_entries == PYOBJECT_POOL_MAX) { - if (!array->next) - array->next = PyObjectPool_New(array->pool); - - PyObjectPool_AddEntry(array->next, obj); - } else { - array->entries[array->num_entries] = obj; - array->num_entries++; + if (array->num_entries == PYOBJECT_ARRAY_MAX) { + if (!array->next) { + array->next = (pyobject_array_t *) calloc(1, sizeof(pyobject_array_t)); } - } - return obj; -} + return PyObjectArray_AddEntry(array->next, obj); + } -static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *pool) -{ - pyobject_pool_t *array = amqp_pool_alloc(pool, sizeof(pyobject_pool_t)); - array->num_entries = 0; - array->entries = amqp_pool_alloc( - pool, PYOBJECT_POOL_MAX * sizeof(PyObject *)); - array->pool = pool; - array->next = (pyobject_pool_t *) 0; + array->entries[array->num_entries] = obj; + array->num_entries++; - return array; + return obj; } -static void PyObjectPool_XDECREF(pyobject_pool_t *array) +static void PyObjectArray_XDECREF(pyobject_array_t *array) { int i; - if (!array) + if (!array) { return; - - for (i = 0; i < array->num_entries; ++i) { - Py_XDECREF(array->entries[i]); } - array->num_entries = 0; - if (array->next) { - PyObjectPool_XDECREF(array->next); + PyObjectArray_XDECREF(array->next); + free(array->next); + array->next = (pyobject_array_t *) 0; } -} -static void PyObjectArray_XDECREF(pyobject_array_t *array) -{ - int i; for (i = 0; i < array->num_entries; ++i) { Py_XDECREF(array->entries[i]); @@ -1169,7 +1128,7 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) amqp_pool_t pool; amqp_table_t properties; - pyobject_pool_t *pyobj_pool = NULL; + pyobject_array_t pyobj_array = {0}; if (self->connected) { PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected"); @@ -1196,14 +1155,13 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) if (self->client_properties != NULL && PyDict_Check(self->client_properties)) { init_amqp_pool(&pool, self->frame_max); - pyobj_pool = PyObjectPool_New(&pool); - properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool, pyobj_pool); + properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool, &pyobj_array); reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max, self->frame_max, self->heartbeat, &properties, AMQP_SASL_METHOD_PLAIN, self->userid, self->password); - PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); } else { reply = amqp_login(self->conn, self->virtual_host, self->channel_max, self->frame_max, self->heartbeat, @@ -1225,7 +1183,7 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) error: PyRabbitMQ_Connection_close(self); bail: - PyObjectPool_XDECREF(pyobj_pool); + PyObjectArray_XDECREF(&pyobj_array); return 0; } @@ -1597,7 +1555,6 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, amqp_rpc_reply_t reply; pyobject_array_t pyobj_array = {0}; - pyobject_pool_t *pyobj_pool = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -1614,8 +1571,7 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, PyErr_NoMemory(); goto bail; } - pyobj_pool = PyObjectPool_New(channel_pool); - bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); + bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array); if (PyErr_Occurred()) goto bail; @@ -1626,17 +1582,15 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, PyString_AS_AMQBYTES(routing_key), bargs); reply = amqp_get_rpc_reply(self->conn); - PyObjectPool_XDECREF(pyobj_pool); amqp_maybe_release_buffers_on_channel(self->conn, channel); - PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; + PyObjectArray_XDECREF(&pyobj_array); if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind")) goto bail; Py_RETURN_NONE; bail: - PyObjectPool_XDECREF(pyobj_pool); PyObjectArray_XDECREF(&pyobj_array); return 0; @@ -1661,7 +1615,6 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, amqp_rpc_reply_t reply; pyobject_array_t pyobj_array = {0}; - pyobject_pool_t *pyobj_pool = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -1678,8 +1631,7 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, PyErr_NoMemory(); goto bail; } - pyobj_pool = PyObjectPool_New(channel_pool); - uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); + uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array); if (PyErr_Occurred()) goto bail; @@ -1690,17 +1642,15 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, PyString_AS_AMQBYTES(routing_key), uargs); reply = amqp_get_rpc_reply(self->conn); - PyObjectPool_XDECREF(pyobj_pool); amqp_maybe_release_buffers_on_channel(self->conn, channel); - PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; + PyObjectArray_XDECREF(&pyobj_array); if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind")) goto bail; Py_RETURN_NONE; bail: - PyObjectPool_XDECREF(pyobj_pool); PyObjectArray_XDECREF(&pyobj_array); return 0; @@ -1775,7 +1725,6 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, PyObject *ret = NULL; pyobject_array_t pyobj_array = {0}; - pyobject_pool_t *pyobj_pool = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -1790,8 +1739,7 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, PyErr_NoMemory(); goto bail; } - pyobj_pool = PyObjectPool_New(channel_pool); - qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); + qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array); if (PyErr_Occurred()) goto bail; @@ -1805,9 +1753,8 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, qargs ); reply = amqp_get_rpc_reply(self->conn); - PyObjectPool_XDECREF(pyobj_pool); - PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; + PyObjectArray_XDECREF(&pyobj_array); if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.declare")) goto bail; @@ -1818,7 +1765,6 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, PyTuple_SET_ITEM(ret, 2, PyInt_FromLong((long)ok->consumer_count)); return ret; bail: - PyObjectPool_XDECREF(pyobj_pool); PyObjectArray_XDECREF(&pyobj_array); return 0; @@ -1888,7 +1834,6 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, amqp_rpc_reply_t reply; pyobject_array_t pyobj_array = {0}; - pyobject_pool_t *pyobj_pool = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -1906,8 +1851,7 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, goto bail; } - pyobj_pool = PyObjectPool_New(channel_pool); - eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); + eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array); if (PyErr_Occurred()) goto bail; @@ -1920,15 +1864,13 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, 0, 0, eargs ); reply = amqp_get_rpc_reply(self->conn); - PyObjectPool_XDECREF(pyobj_pool); - PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; + PyObjectArray_XDECREF(&pyobj_array); if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.declare")) goto bail; Py_RETURN_NONE; bail: - PyObjectPool_XDECREF(pyobj_pool); PyObjectArray_XDECREF(&pyobj_array); return 0; @@ -1962,8 +1904,8 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self, (amqp_boolean_t)if_unused); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); - PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; + PyObjectArray_XDECREF(&pyobj_array); if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete")) goto bail; @@ -1999,7 +1941,6 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, memset(&props, 0, sizeof(props)); pyobject_array_t pyobj_array = {0}; - pyobject_pool_t *pyobj_pool = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -2015,8 +1956,7 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, Py_INCREF(propdict); channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); - pyobj_pool = PyObjectPool_New(channel_pool); - if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool, pyobj_pool) < 1) { + if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool, &pyobj_array) < 1) { goto bail; } Py_DECREF(propdict); @@ -2032,10 +1972,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, (amqp_boolean_t)immediate, &props, bytes); - PyObjectPool_XDECREF(pyobj_pool); amqp_maybe_release_buffers_on_channel(self->conn, channel); - PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; + PyObjectArray_XDECREF(&pyobj_array); if (!PyRabbitMQ_HandleError(ret, "basic.publish")) { goto error; @@ -2045,7 +1984,6 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, error: PyRabbitMQ_revive_channel(self, channel); bail: - PyObjectPool_XDECREF(pyobj_pool); PyObjectArray_XDECREF(&pyobj_array); return 0; @@ -2184,7 +2122,6 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, amqp_table_t cargs = amqp_empty_table; pyobject_array_t pyobj_array = {0}; - pyobject_pool_t *pyobj_pool = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -2202,8 +2139,7 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, goto bail; } - pyobj_pool = PyObjectPool_New(channel_pool); - cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool); + cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array); if (PyErr_Occurred()) goto bail; @@ -2216,16 +2152,14 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, exclusive, cargs); reply = amqp_get_rpc_reply(self->conn); - PyObjectPool_XDECREF(pyobj_pool); - PyObjectArray_XDECREF(&pyobj_array); Py_END_ALLOW_THREADS; + PyObjectArray_XDECREF(&pyobj_array); if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.consume")) goto bail; return PySTRING_FROM_AMQBYTES(ok->consumer_tag); bail: - PyObjectPool_XDECREF(pyobj_pool); PyObjectArray_XDECREF(&pyobj_array); return 0; diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h index 9dea28d..9595085 100644 --- a/Modules/_librabbitmq/connection.h +++ b/Modules/_librabbitmq/connection.h @@ -104,8 +104,6 @@ buffer_toMemoryView(char *buf, Py_ssize_t buf_len) { PySTRING_FROM_AMQBYTES(table->headers.entries[i].key), \ stmt); \ -_PYRMQ_INLINE PyObject* Maybe_Unicode(PyObject *); - #if defined(__C99__) || defined(__GNUC__) # define PyString_AS_AMQBYTES(s) \ (amqp_bytes_t){Py_SIZE(s), (void *)PyBytes_AS_STRING(s)} @@ -122,14 +120,6 @@ PyString_AS_AMQBYTES(PyObject *s) } #endif -_PYRMQ_INLINE PyObject* -Maybe_Unicode(PyObject *s) -{ - if (PyUnicode_Check(s)) - return PyUnicode_AsASCIIString(s); - return s; -} - #define PYRMQ_IS_TIMEOUT(t) (t > 0.0) #define PYRMQ_IS_NONBLOCK(t) (t == -1) #define PYRMQ_SHOULD_POLL(t) (PYRMQ_IS_TIMEOUT(t) || PYRMQ_IS_NONBLOCK(t)) @@ -172,21 +162,6 @@ typedef struct { PyObject *weakreflist; } PyRabbitMQ_Connection; -// Keep track of PyObject references with increased reference count. -// Entries are stored in the channel pool. -#define PYOBJECT_POOL_MAX 100 -typedef struct pyobject_pool_t_ { - int num_entries; - PyObject **entries; - amqp_pool_t *pool; - struct pyobject_pool_t_ *next; -} pyobject_pool_t; - -int -PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *, - amqp_connection_state_t, amqp_pool_t *, - pyobject_pool_t *); - /* Connection method sigs */ static PyRabbitMQ_Connection* PyRabbitMQ_ConnectionType_new(PyTypeObject *, PyObject *, PyObject *); -- cgit v1.2.1