From aea11bc941479abdcc7532bc142fcd4109a36934 Mon Sep 17 00:00:00 2001 From: Amichai Schreiber Date: Thu, 17 Aug 2017 23:56:58 +0300 Subject: in recv, make sure all frames are read from the same channel. seems to fix #97 --- Modules/_librabbitmq/connection.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 6626e47..10b0ddf 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -1302,6 +1302,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, amqp_connection_state_t conn, int piggyback) { amqp_frame_t frame; + amqp_channel_t cur_channel = 0; amqp_basic_deliver_t *deliver; amqp_basic_properties_t *props; PY_SIZE_TYPE body_target; @@ -1330,6 +1331,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, if (frame.frame_type != AMQP_FRAME_METHOD) continue; if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe; + cur_channel = frame.channel; + delivery_info = PyDict_New(); deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded; /* need consumer tag for later. @@ -1348,7 +1351,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, } Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = cur_channel == 0 ? + amqp_simple_wait_frame(conn, &frame) : + amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; @@ -1358,6 +1363,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, goto finally; } + /* if piggybacked, 'channel' is still 0 at this point */ + cur_channel = frame.channel; + /* channel */ channel = PyInt_FromLong((unsigned long)frame.channel); @@ -1371,7 +1379,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, for (i = 0; body_received < body_target; i++) { Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; -- cgit v1.2.1