diff options
author | Asif Saifuddin Auvi <auvipy@users.noreply.github.com> | 2018-01-15 16:55:14 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-15 16:55:14 +0600 |
commit | 5b9bf23124be4f007907483d6c35bf0c28597162 (patch) | |
tree | 7af1897c4672d2f28042cfbb457466b8b87a5bac | |
parent | 3e73ff8071788d06ff7ccb557d7877df0a18983a (diff) | |
parent | aea11bc941479abdcc7532bc142fcd4109a36934 (diff) | |
download | librabbitmq-5b9bf23124be4f007907483d6c35bf0c28597162.tar.gz |
Merge pull request #98 from merutak/recv-same-channel
in recv, make sure all frames are read from the same channel
-rw-r--r-- | Modules/_librabbitmq/connection.c | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 29a0e52..54f2427 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -1305,6 +1305,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, amqp_connection_state_t conn, int piggyback) { amqp_frame_t frame; + amqp_channel_t cur_channel = 0; amqp_basic_deliver_t *deliver; amqp_basic_properties_t *props; Py_ssize_t body_target; @@ -1333,6 +1334,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, if (frame.frame_type != AMQP_FRAME_METHOD) continue; if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe; + cur_channel = frame.channel; + delivery_info = PyDict_New(); deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded; /* need consumer tag for later. @@ -1351,7 +1354,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, } Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = cur_channel == 0 ? + amqp_simple_wait_frame(conn, &frame) : + amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; @@ -1361,6 +1366,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, goto finally; } + /* if piggybacked, 'channel' is still 0 at this point */ + cur_channel = frame.channel; + /* channel */ channel = PyInt_FromLong((unsigned long)frame.channel); @@ -1374,7 +1382,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, for (i = 0; body_received < body_target; i++) { Py_BEGIN_ALLOW_THREADS; - retval = amqp_simple_wait_frame(conn, &frame); + retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; |