diff options
-rw-r--r-- | Modules/_librabbitmq/connection.c | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 29a0e52..54f2427 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -1305,6 +1305,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, amqp_connection_state_t conn, int piggyback) { amqp_frame_t frame; + amqp_channel_t cur_channel = 0; amqp_basic_deliver_t *deliver; amqp_basic_properties_t *props; Py_ssize_t body_target; @@ -1333,6 +1334,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, if (frame.frame_type != AMQP_FRAME_METHOD) continue; if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe; + cur_channel = frame.channel; + delivery_info = PyDict_New(); deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded; /* need consumer tag for later. @@ -1351,7 +1354,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, } Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = cur_channel == 0 ? + amqp_simple_wait_frame(conn, &frame) : + amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; @@ -1361,6 +1366,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, goto finally; } + /* if piggybacked, 'channel' is still 0 at this point */ + cur_channel = frame.channel; + /* channel */ channel = PyInt_FromLong((unsigned long)frame.channel); @@ -1374,7 +1382,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, for (i = 0; body_received < body_target; i++) { Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; |