summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Modules/_librabbitmq/connection.c12
1 files changed, 10 insertions, 2 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 29a0e52..54f2427 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -1305,6 +1305,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
amqp_connection_state_t conn, int piggyback)
{
amqp_frame_t frame;
+ amqp_channel_t cur_channel = 0;
amqp_basic_deliver_t *deliver;
amqp_basic_properties_t *props;
Py_ssize_t body_target;
@@ -1333,6 +1334,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
if (frame.frame_type != AMQP_FRAME_METHOD) continue;
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe;
+ cur_channel = frame.channel;
+
delivery_info = PyDict_New();
deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded;
/* need consumer tag for later.
@@ -1351,7 +1354,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
}
Py_BEGIN_ALLOW_THREADS;
- retval = amqp_simple_wait_frame(conn, &frame);
+ retval = cur_channel == 0 ?
+ amqp_simple_wait_frame(conn, &frame) :
+ amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) break;
@@ -1361,6 +1366,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
goto finally;
}
+ /* if piggybacked, 'channel' is still 0 at this point */
+ cur_channel = frame.channel;
+
/* channel */
channel = PyInt_FromLong((unsigned long)frame.channel);
@@ -1374,7 +1382,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
for (i = 0; body_received < body_target; i++) {
Py_BEGIN_ALLOW_THREADS;
- retval = amqp_simple_wait_frame(conn, &frame);
+ retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) break;