summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmichai Schreiber <amichai@workey.co>2017-08-17 23:56:58 +0300
committerAmichai Schreiber <amichai@workey.co>2017-08-17 23:56:58 +0300
commitaea11bc941479abdcc7532bc142fcd4109a36934 (patch)
tree38298f225ccd800ddb623175d30d1ed3e3acce8a
parent91100605c60bab96c67330352f984a9bdf72d45c (diff)
downloadlibrabbitmq-aea11bc941479abdcc7532bc142fcd4109a36934.tar.gz
in recv, make sure all frames are read from the same channel. seems to fix #97
-rw-r--r--Modules/_librabbitmq/connection.c12
1 files changed, 10 insertions, 2 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 6626e47..10b0ddf 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -1302,6 +1302,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
amqp_connection_state_t conn, int piggyback)
{
amqp_frame_t frame;
+ amqp_channel_t cur_channel = 0;
amqp_basic_deliver_t *deliver;
amqp_basic_properties_t *props;
PY_SIZE_TYPE body_target;
@@ -1330,6 +1331,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
if (frame.frame_type != AMQP_FRAME_METHOD) continue;
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe;
+ cur_channel = frame.channel;
+
delivery_info = PyDict_New();
deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded;
/* need consumer tag for later.
@@ -1348,7 +1351,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
}
Py_BEGIN_ALLOW_THREADS;
- retval = amqp_simple_wait_frame(conn, &frame);
+ retval = cur_channel == 0 ?
+ amqp_simple_wait_frame(conn, &frame) :
+ amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) break;
@@ -1358,6 +1363,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
goto finally;
}
+ /* if piggybacked, 'channel' is still 0 at this point */
+ cur_channel = frame.channel;
+
/* channel */
channel = PyInt_FromLong((unsigned long)frame.channel);
@@ -1371,7 +1379,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
for (i = 0; body_received < body_target; i++) {
Py_BEGIN_ALLOW_THREADS;
- retval = amqp_simple_wait_frame(conn, &frame);
+ retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) break;