summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsif Saifuddin Auvi <auvipy@users.noreply.github.com>2018-01-15 16:55:14 +0600
committerGitHub <noreply@github.com>2018-01-15 16:55:14 +0600
commit5b9bf23124be4f007907483d6c35bf0c28597162 (patch)
tree7af1897c4672d2f28042cfbb457466b8b87a5bac
parent3e73ff8071788d06ff7ccb557d7877df0a18983a (diff)
parentaea11bc941479abdcc7532bc142fcd4109a36934 (diff)
downloadlibrabbitmq-5b9bf23124be4f007907483d6c35bf0c28597162.tar.gz
Merge pull request #98 from merutak/recv-same-channel
in recv, make sure all frames are read from the same channel
-rw-r--r--Modules/_librabbitmq/connection.c12
1 files changed, 10 insertions, 2 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 29a0e52..54f2427 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -1305,6 +1305,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
amqp_connection_state_t conn, int piggyback)
{
amqp_frame_t frame;
+ amqp_channel_t cur_channel = 0;
amqp_basic_deliver_t *deliver;
amqp_basic_properties_t *props;
Py_ssize_t body_target;
@@ -1333,6 +1334,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
if (frame.frame_type != AMQP_FRAME_METHOD) continue;
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe;
+ cur_channel = frame.channel;
+
delivery_info = PyDict_New();
deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded;
/* need consumer tag for later.
@@ -1351,7 +1354,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
}
Py_BEGIN_ALLOW_THREADS;
- retval = amqp_simple_wait_frame(conn, &frame);
+ retval = cur_channel == 0 ?
+ amqp_simple_wait_frame(conn, &frame) :
+ amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) break;
@@ -1361,6 +1366,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
goto finally;
}
+ /* if piggybacked, 'channel' is still 0 at this point */
+ cur_channel = frame.channel;
+
/* channel */
channel = PyInt_FromLong((unsigned long)frame.channel);
@@ -1374,7 +1382,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
for (i = 0; body_received < body_target; i++) {
Py_BEGIN_ALLOW_THREADS;
- retval = amqp_simple_wait_frame(conn, &frame);
+ retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) break;