diff options
author | Amichai Schreiber <amichai@workey.co> | 2017-08-17 23:56:58 +0300 |
---|---|---|
committer | Amichai Schreiber <amichai@workey.co> | 2017-08-17 23:56:58 +0300 |
commit | aea11bc941479abdcc7532bc142fcd4109a36934 (patch) | |
tree | 38298f225ccd800ddb623175d30d1ed3e3acce8a | |
parent | 91100605c60bab96c67330352f984a9bdf72d45c (diff) | |
download | librabbitmq-aea11bc941479abdcc7532bc142fcd4109a36934.tar.gz |
in recv, make sure all frames are read from the same channel. seems to fix #97
-rw-r--r-- | Modules/_librabbitmq/connection.c | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 6626e47..10b0ddf 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -1302,6 +1302,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, amqp_connection_state_t conn, int piggyback) { amqp_frame_t frame; + amqp_channel_t cur_channel = 0; amqp_basic_deliver_t *deliver; amqp_basic_properties_t *props; PY_SIZE_TYPE body_target; @@ -1330,6 +1331,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, if (frame.frame_type != AMQP_FRAME_METHOD) continue; if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe; + cur_channel = frame.channel; + delivery_info = PyDict_New(); deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded; /* need consumer tag for later. @@ -1348,7 +1351,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, } Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = cur_channel == 0 ? + amqp_simple_wait_frame(conn, &frame) : + amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; @@ -1358,6 +1363,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, goto finally; } + /* if piggybacked, 'channel' is still 0 at this point */ + cur_channel = frame.channel; + /* channel */ channel = PyInt_FromLong((unsigned long)frame.channel); @@ -1371,7 +1379,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, for (i = 0; body_received < body_target; i++) { Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; |