summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenry Tang <henryykt@gmail.com>2020-04-18 15:44:10 +0800
committerAsif Saif Uddin <auvipy@gmail.com>2020-04-18 14:57:44 +0600
commit6917f2789903da5cbc497fc77a56e1d639294697 (patch)
tree1dc5950f8e46fd3e5b77f999e2d44810ee886d6c
parentb93ba31757105510b33c293fd664fbed09c533ff (diff)
downloadlibrabbitmq-6917f2789903da5cbc497fc77a56e1d639294697.tar.gz
Simplify memory leak fix (avoid using amqp_pool_alloc)
-rw-r--r--Modules/_librabbitmq/connection.c214
-rw-r--r--Modules/_librabbitmq/connection.h25
2 files changed, 74 insertions, 165 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index a7bf338..c9e7b35 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -78,12 +78,27 @@ _PYRMQ_INLINE int RabbitMQ_wait_timeout(int, double);
_PYRMQ_INLINE void
basic_properties_to_PyDict(amqp_basic_properties_t*, PyObject*);
-_PYRMQ_INLINE int
-PyDict_to_basic_properties(PyObject *,
+// Keep track of PyObject references with increased reference count
+// Entries are stored in fixed size array.
+#define PYOBJECT_ARRAY_MAX 5
+typedef struct pyobject_array_t {
+ int num_entries;
+ PyObject *entries[PYOBJECT_ARRAY_MAX];
+ struct pyobject_array_t *next;
+} pyobject_array_t;
+
+
+static void PyObjectArray_XDECREF(pyobject_array_t *array);
+
+_PYRMQ_INLINE PyObject* PyObjectArray_AddEntry(pyobject_array_t *, PyObject *obj);
+_PYRMQ_INLINE PyObject* PyObjectArray_Maybe_Unicode(PyObject *, pyobject_array_t *);
+
+
+static int PyDict_to_basic_properties(PyObject *,
amqp_basic_properties_t *,
amqp_connection_state_t,
amqp_pool_t *,
- pyobject_pool_t *);
+ pyobject_array_t *);
_PYRMQ_INLINE void
amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t,
@@ -111,8 +126,8 @@ int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int,
void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*);
int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *);
-static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *);
-static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_pool_t *);
+static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_array_t *);
+static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *, pyobject_array_t *);
static PyObject* AMQTable_toPyDict(amqp_table_t *table);
static PyObject* AMQArray_toPyList(amqp_array_t *array);
@@ -129,26 +144,6 @@ int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *self)
return 0;
}
-// Keep track of PyObject references with increased reference count
-// Entries are stored in fixed size array.
-#define PYOBJECT_ARRAY_MAX 5
-typedef struct pyobject_array_t {
- int num_entries;
- PyObject *entries[PYOBJECT_ARRAY_MAX];
-} pyobject_array_t;
-
-
-static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *);
-static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *, PyObject *);
-static void PyObjectPool_XDECREF(pyobject_pool_t *array);
-
-_PYRMQ_INLINE PyObject* PyObjectPool_Maybe_Unicode(PyObject *, pyobject_pool_t *);
-
-static void PyObjectArray_XDECREF(pyobject_array_t *array);
-
-_PYRMQ_INLINE PyObject* PyObjectArray_AddEntry(pyobject_array_t *, PyObject *obj);
-_PYRMQ_INLINE PyObject* PyObjectArray_Maybe_Unicode(PyObject *, pyobject_array_t *);
-
/* ------: AMQP Utils :--------------------------------------------------- */
_PYRMQ_INLINE amqp_table_entry_t*
@@ -268,7 +263,7 @@ AMQArray_SetIntValue(amqp_array_t *array, int value)
}
static amqp_table_t
-PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool)
+PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_array_t *pyobj_array)
{
PyObject *dkey = NULL;
PyObject *dvalue = NULL;
@@ -285,7 +280,7 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
dst.num_entries = 0;
dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_table_entry_t));
while (PyDict_Next(src, &pos, &dkey, &dvalue)) {
- dkey = PyObjectPool_Maybe_Unicode(dkey, pyobj_pool);
+ dkey = PyObjectArray_Maybe_Unicode(dkey, pyobj_array);
if (dvalue == Py_None) {
/* None */
@@ -295,13 +290,13 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
/* Dict */
AMQTable_SetTableValue(&dst,
PyString_AS_AMQBYTES(dkey),
- PyDict_ToAMQTable(conn, dvalue, pool, pyobj_pool));
+ PyDict_ToAMQTable(conn, dvalue, pool, pyobj_array));
}
else if (PyList_Check(dvalue) || PyTuple_Check(dvalue)) {
/* List */
AMQTable_SetArrayValue(&dst,
PyString_AS_AMQBYTES(dkey),
- PyIter_ToAMQArray(conn, dvalue, pool, pyobj_pool));
+ PyIter_ToAMQArray(conn, dvalue, pool, pyobj_array));
}
else if (PyBool_Check(dvalue)) {
/* Bool */
@@ -344,7 +339,7 @@ PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
if (is_unicode) {
if ((dvalue = PyUnicode_AsEncodedString(dvalue, "utf-8", "strict")) == NULL)
goto error;
- PyObjectPool_AddEntry(pyobj_pool, dvalue);
+ PyObjectArray_AddEntry(pyobj_array, dvalue);
}
AMQTable_SetStringValue(&dst,
PyString_AS_AMQBYTES(dkey),
@@ -367,7 +362,7 @@ error:
}
static amqp_array_t
-PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_pool_t *pyobj_pool)
+PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool, pyobject_array_t *pyobj_array)
{
Py_ssize_t pos = 0;
uint64_t clong_value = 0;
@@ -394,12 +389,12 @@ PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool
else if (PyDict_Check(item)) {
/* Dict */
AMQArray_SetTableValue(
- &dst, PyDict_ToAMQTable(conn, item, pool, pyobj_pool));
+ &dst, PyDict_ToAMQTable(conn, item, pool, pyobj_array));
}
else if (PyList_Check(item) || PyTuple_Check(item)) {
/* List */
AMQArray_SetArrayValue(
- &dst, PyIter_ToAMQArray(conn, item, pool, pyobj_pool));
+ &dst, PyIter_ToAMQArray(conn, item, pool, pyobj_array));
}
else if (PyLong_Check(item) || PyInt_Check(item)) {
/* Int | Long */
@@ -705,59 +700,58 @@ AMQArray_toPyList(amqp_array_t *array)
return list;
}
-_PYRMQ_INLINE int
-PyDict_to_basic_properties(PyObject *p,
+static int PyDict_to_basic_properties(PyObject *p,
amqp_basic_properties_t *props,
amqp_connection_state_t conn,
amqp_pool_t *pool,
- pyobject_pool_t *pyobj_pool)
+ pyobject_array_t *pyobj_array)
{
PyObject *value = NULL;
props->headers = amqp_empty_table;
props->_flags = AMQP_BASIC_HEADERS_FLAG;
if ((value = PyDict_GetItemString(p, "content_type")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->content_type = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
}
if ((value = PyDict_GetItemString(p, "content_encoding")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->content_encoding = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
}
if ((value = PyDict_GetItemString(p, "correlation_id")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->correlation_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
}
if ((value = PyDict_GetItemString(p, "reply_to")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->reply_to = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_REPLY_TO_FLAG;
}
if ((value = PyDict_GetItemString(p, "expiration")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->expiration = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_EXPIRATION_FLAG;
}
if ((value = PyDict_GetItemString(p, "message_id")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->message_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_MESSAGE_ID_FLAG;
}
if ((value = PyDict_GetItemString(p, "type")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->type = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_TYPE_FLAG;
}
if ((value = PyDict_GetItemString(p, "user_id")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->user_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_USER_ID_FLAG;
}
if ((value = PyDict_GetItemString(p, "app_id")) != NULL) {
- if ((value = PyObjectPool_Maybe_Unicode(value, pyobj_pool)) == NULL) return -1;
+ if ((value = PyObjectArray_Maybe_Unicode(value, pyobj_array)) == NULL) return -1;
props->app_id = PyString_AS_AMQBYTES(value);
props->_flags |= AMQP_BASIC_APP_ID_FLAG;
}
@@ -775,7 +769,7 @@ PyDict_to_basic_properties(PyObject *p,
}
if ((value = PyDict_GetItemString(p, "headers")) != NULL) {
- props->headers = PyDict_ToAMQTable(conn, value, pool, pyobj_pool);
+ props->headers = PyDict_ToAMQTable(conn, value, pool, pyobj_array);
if (PyErr_Occurred()) return -1;
}
return 1;
@@ -812,19 +806,10 @@ amqp_basic_deliver_to_PyDict(PyObject *dest,
/* ------: Keep track of increased reference counts :------------------------ */
_PYRMQ_INLINE PyObject*
-PyObjectArray_Maybe_Unicode(PyObject *s, pyobject_array_t *pyobj_pool)
-{
- if (PyUnicode_Check(s)) {
- return PyObjectArray_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s));
- }
- return s;
-}
-
-_PYRMQ_INLINE PyObject*
-PyObjectPool_Maybe_Unicode(PyObject *s, pyobject_pool_t *pyobj_pool)
+PyObjectArray_Maybe_Unicode(PyObject *s, pyobject_array_t *array)
{
if (PyUnicode_Check(s)) {
- return PyObjectPool_AddEntry(pyobj_pool, PyUnicode_AsASCIIString(s));
+ return PyObjectArray_AddEntry(array, PyUnicode_AsASCIIString(s));
}
return s;
}
@@ -832,64 +817,38 @@ PyObjectPool_Maybe_Unicode(PyObject *s, pyobject_pool_t *pyobj_pool)
_PYRMQ_INLINE PyObject*
PyObjectArray_AddEntry(pyobject_array_t *array, PyObject *obj)
{
- if (obj && array->num_entries < PYOBJECT_ARRAY_MAX) {
- array->entries[array->num_entries] = obj;
- array->num_entries++;
+ if (!obj) {
+ return obj;
}
- return obj;
-}
-
-static PyObject *PyObjectPool_AddEntry(pyobject_pool_t *array, PyObject *obj)
-{
- if (obj) {
- if (array->num_entries == PYOBJECT_POOL_MAX) {
- if (!array->next)
- array->next = PyObjectPool_New(array->pool);
-
- PyObjectPool_AddEntry(array->next, obj);
- } else {
- array->entries[array->num_entries] = obj;
- array->num_entries++;
+ if (array->num_entries == PYOBJECT_ARRAY_MAX) {
+ if (!array->next) {
+ array->next = (pyobject_array_t *) calloc(1, sizeof(pyobject_array_t));
}
- }
- return obj;
-}
+ return PyObjectArray_AddEntry(array->next, obj);
+ }
-static pyobject_pool_t *PyObjectPool_New(amqp_pool_t *pool)
-{
- pyobject_pool_t *array = amqp_pool_alloc(pool, sizeof(pyobject_pool_t));
- array->num_entries = 0;
- array->entries = amqp_pool_alloc(
- pool, PYOBJECT_POOL_MAX * sizeof(PyObject *));
- array->pool = pool;
- array->next = (pyobject_pool_t *) 0;
+ array->entries[array->num_entries] = obj;
+ array->num_entries++;
- return array;
+ return obj;
}
-static void PyObjectPool_XDECREF(pyobject_pool_t *array)
+static void PyObjectArray_XDECREF(pyobject_array_t *array)
{
int i;
- if (!array)
+ if (!array) {
return;
-
- for (i = 0; i < array->num_entries; ++i) {
- Py_XDECREF(array->entries[i]);
}
- array->num_entries = 0;
-
if (array->next) {
- PyObjectPool_XDECREF(array->next);
+ PyObjectArray_XDECREF(array->next);
+ free(array->next);
+ array->next = (pyobject_array_t *) 0;
}
-}
-static void PyObjectArray_XDECREF(pyobject_array_t *array)
-{
- int i;
for (i = 0; i < array->num_entries; ++i) {
Py_XDECREF(array->entries[i]);
@@ -1169,7 +1128,7 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
amqp_pool_t pool;
amqp_table_t properties;
- pyobject_pool_t *pyobj_pool = NULL;
+ pyobject_array_t pyobj_array = {0};
if (self->connected) {
PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected");
@@ -1196,14 +1155,13 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
if (self->client_properties != NULL && PyDict_Check(self->client_properties)) {
init_amqp_pool(&pool, self->frame_max);
- pyobj_pool = PyObjectPool_New(&pool);
- properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool, pyobj_pool);
+ properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool, &pyobj_array);
reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
&properties,
AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
- PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
} else {
reply = amqp_login(self->conn, self->virtual_host, self->channel_max,
self->frame_max, self->heartbeat,
@@ -1225,7 +1183,7 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
error:
PyRabbitMQ_Connection_close(self);
bail:
- PyObjectPool_XDECREF(pyobj_pool);
+ PyObjectArray_XDECREF(&pyobj_array);
return 0;
}
@@ -1597,7 +1555,6 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
amqp_rpc_reply_t reply;
pyobject_array_t pyobj_array = {0};
- pyobject_pool_t *pyobj_pool = NULL;
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -1614,8 +1571,7 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
PyErr_NoMemory();
goto bail;
}
- pyobj_pool = PyObjectPool_New(channel_pool);
- bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
+ bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array);
if (PyErr_Occurred())
goto bail;
@@ -1626,17 +1582,15 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
PyString_AS_AMQBYTES(routing_key),
bargs);
reply = amqp_get_rpc_reply(self->conn);
- PyObjectPool_XDECREF(pyobj_pool);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
- PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
+ PyObjectArray_XDECREF(&pyobj_array);
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind"))
goto bail;
Py_RETURN_NONE;
bail:
- PyObjectPool_XDECREF(pyobj_pool);
PyObjectArray_XDECREF(&pyobj_array);
return 0;
@@ -1661,7 +1615,6 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
amqp_rpc_reply_t reply;
pyobject_array_t pyobj_array = {0};
- pyobject_pool_t *pyobj_pool = NULL;
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -1678,8 +1631,7 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
PyErr_NoMemory();
goto bail;
}
- pyobj_pool = PyObjectPool_New(channel_pool);
- uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
+ uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array);
if (PyErr_Occurred())
goto bail;
@@ -1690,17 +1642,15 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
PyString_AS_AMQBYTES(routing_key),
uargs);
reply = amqp_get_rpc_reply(self->conn);
- PyObjectPool_XDECREF(pyobj_pool);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
- PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
+ PyObjectArray_XDECREF(&pyobj_array);
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind"))
goto bail;
Py_RETURN_NONE;
bail:
- PyObjectPool_XDECREF(pyobj_pool);
PyObjectArray_XDECREF(&pyobj_array);
return 0;
@@ -1775,7 +1725,6 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
PyObject *ret = NULL;
pyobject_array_t pyobj_array = {0};
- pyobject_pool_t *pyobj_pool = NULL;
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -1790,8 +1739,7 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
PyErr_NoMemory();
goto bail;
}
- pyobj_pool = PyObjectPool_New(channel_pool);
- qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
+ qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array);
if (PyErr_Occurred())
goto bail;
@@ -1805,9 +1753,8 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
qargs
);
reply = amqp_get_rpc_reply(self->conn);
- PyObjectPool_XDECREF(pyobj_pool);
- PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
+ PyObjectArray_XDECREF(&pyobj_array);
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.declare"))
goto bail;
@@ -1818,7 +1765,6 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
PyTuple_SET_ITEM(ret, 2, PyInt_FromLong((long)ok->consumer_count));
return ret;
bail:
- PyObjectPool_XDECREF(pyobj_pool);
PyObjectArray_XDECREF(&pyobj_array);
return 0;
@@ -1888,7 +1834,6 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
amqp_rpc_reply_t reply;
pyobject_array_t pyobj_array = {0};
- pyobject_pool_t *pyobj_pool = NULL;
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -1906,8 +1851,7 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
goto bail;
}
- pyobj_pool = PyObjectPool_New(channel_pool);
- eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
+ eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array);
if (PyErr_Occurred())
goto bail;
@@ -1920,15 +1864,13 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
0, 0, eargs
);
reply = amqp_get_rpc_reply(self->conn);
- PyObjectPool_XDECREF(pyobj_pool);
- PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
+ PyObjectArray_XDECREF(&pyobj_array);
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.declare"))
goto bail;
Py_RETURN_NONE;
bail:
- PyObjectPool_XDECREF(pyobj_pool);
PyObjectArray_XDECREF(&pyobj_array);
return 0;
@@ -1962,8 +1904,8 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self,
(amqp_boolean_t)if_unused);
reply = amqp_get_rpc_reply(self->conn);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
- PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
+ PyObjectArray_XDECREF(&pyobj_array);
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete"))
goto bail;
@@ -1999,7 +1941,6 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
memset(&props, 0, sizeof(props));
pyobject_array_t pyobj_array = {0};
- pyobject_pool_t *pyobj_pool = NULL;
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -2015,8 +1956,7 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
Py_INCREF(propdict);
channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel);
- pyobj_pool = PyObjectPool_New(channel_pool);
- if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool, pyobj_pool) < 1) {
+ if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool, &pyobj_array) < 1) {
goto bail;
}
Py_DECREF(propdict);
@@ -2032,10 +1972,9 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
(amqp_boolean_t)immediate,
&props,
bytes);
- PyObjectPool_XDECREF(pyobj_pool);
amqp_maybe_release_buffers_on_channel(self->conn, channel);
- PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
+ PyObjectArray_XDECREF(&pyobj_array);
if (!PyRabbitMQ_HandleError(ret, "basic.publish")) {
goto error;
@@ -2045,7 +1984,6 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
error:
PyRabbitMQ_revive_channel(self, channel);
bail:
- PyObjectPool_XDECREF(pyobj_pool);
PyObjectArray_XDECREF(&pyobj_array);
return 0;
@@ -2184,7 +2122,6 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
amqp_table_t cargs = amqp_empty_table;
pyobject_array_t pyobj_array = {0};
- pyobject_pool_t *pyobj_pool = NULL;
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -2202,8 +2139,7 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
goto bail;
}
- pyobj_pool = PyObjectPool_New(channel_pool);
- cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, pyobj_pool);
+ cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool, &pyobj_array);
if (PyErr_Occurred())
goto bail;
@@ -2216,16 +2152,14 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
exclusive,
cargs);
reply = amqp_get_rpc_reply(self->conn);
- PyObjectPool_XDECREF(pyobj_pool);
- PyObjectArray_XDECREF(&pyobj_array);
Py_END_ALLOW_THREADS;
+ PyObjectArray_XDECREF(&pyobj_array);
if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.consume"))
goto bail;
return PySTRING_FROM_AMQBYTES(ok->consumer_tag);
bail:
- PyObjectPool_XDECREF(pyobj_pool);
PyObjectArray_XDECREF(&pyobj_array);
return 0;
diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h
index 9dea28d..9595085 100644
--- a/Modules/_librabbitmq/connection.h
+++ b/Modules/_librabbitmq/connection.h
@@ -104,8 +104,6 @@ buffer_toMemoryView(char *buf, Py_ssize_t buf_len) {
PySTRING_FROM_AMQBYTES(table->headers.entries[i].key), \
stmt); \
-_PYRMQ_INLINE PyObject* Maybe_Unicode(PyObject *);
-
#if defined(__C99__) || defined(__GNUC__)
# define PyString_AS_AMQBYTES(s) \
(amqp_bytes_t){Py_SIZE(s), (void *)PyBytes_AS_STRING(s)}
@@ -122,14 +120,6 @@ PyString_AS_AMQBYTES(PyObject *s)
}
#endif
-_PYRMQ_INLINE PyObject*
-Maybe_Unicode(PyObject *s)
-{
- if (PyUnicode_Check(s))
- return PyUnicode_AsASCIIString(s);
- return s;
-}
-
#define PYRMQ_IS_TIMEOUT(t) (t > 0.0)
#define PYRMQ_IS_NONBLOCK(t) (t == -1)
#define PYRMQ_SHOULD_POLL(t) (PYRMQ_IS_TIMEOUT(t) || PYRMQ_IS_NONBLOCK(t))
@@ -172,21 +162,6 @@ typedef struct {
PyObject *weakreflist;
} PyRabbitMQ_Connection;
-// Keep track of PyObject references with increased reference count.
-// Entries are stored in the channel pool.
-#define PYOBJECT_POOL_MAX 100
-typedef struct pyobject_pool_t_ {
- int num_entries;
- PyObject **entries;
- amqp_pool_t *pool;
- struct pyobject_pool_t_ *next;
-} pyobject_pool_t;
-
-int
-PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *,
- amqp_connection_state_t, amqp_pool_t *,
- pyobject_pool_t *);
-
/* Connection method sigs */
static PyRabbitMQ_Connection*
PyRabbitMQ_ConnectionType_new(PyTypeObject *, PyObject *, PyObject *);