diff options
author | Ask Solem <ask@celeryproject.org> | 2014-05-28 17:23:49 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-05-28 17:23:49 +0100 |
commit | 530829374136d14da1758879830d4f784ab07b87 (patch) | |
tree | 18e81f6d8ec93dbbb2fe20adbfcf6f5deab54a2d | |
parent | 6b79a8917cbce0c3e431bb0866ba2729e6f9c77a (diff) | |
download | librabbitmq-530829374136d14da1758879830d4f784ab07b87.tar.gz |
_recv now handles CONNECTION_CLOSE and CHANNEL_CLOSE frames. Closes #30
-rw-r--r-- | Modules/_librabbitmq/connection.c | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 400144c..850e6c4 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -1185,6 +1185,40 @@ finally: return retval; } +void +PyRabbitMQ_SetErr_ReceivedFrame(PyRabbitMQ_Connection *self, amqp_frame_t* frame) +{ + static char errstr[512]; + + switch(frame->payload.method.id) { + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *chanm = (amqp_channel_close_t *)frame->payload.method.decoded; + snprintf(errstr, sizeof(errstr), + "channel error %d, message: %.*s", + chanm->reply_code, + (int) chanm->reply_text.len, + (char *) chanm->reply_text.bytes); + PyErr_SetString(PyRabbitMQExc_ChannelError, errstr); + PyRabbitMQ_revive_channel(self, frame->channel); + break; + } + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *connm = (amqp_connection_close_t *)frame->payload.method.decoded; + snprintf(errstr, sizeof(errstr), + "server connection error %d message: %.*s", + connm->reply_code, + (int) connm->reply_text.len, + (char *) connm->reply_text.bytes); + PyErr_SetString(PyRabbitMQExc_ConnectionError, errstr); + PyRabbitMQ_Connection_close(self); + break; + } + default: { + PyErr_SetString(PyRabbitMQExc_ConnectionError, "Bad frame read"); + break; + } + } +} int PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, @@ -1216,9 +1250,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, retval = amqp_simple_wait_frame(conn, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; - if (frame.frame_type != AMQP_FRAME_METHOD - || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) - continue; + if (frame.frame_type != AMQP_FRAME_METHOD) continue; + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe; delivery_info = PyDict_New(); deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded; @@ -1242,6 +1275,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, Py_END_ALLOW_THREADS; if (retval < 0) break; + if (frame.frame_type == AMQP_FRAME_METHOD) goto altframe; if (frame.frame_type != AMQP_FRAME_HEADER) { PyRabbitMQ_SetErr_UnexpectedHeader(&frame); goto finally; @@ -1264,6 +1298,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, Py_END_ALLOW_THREADS; if (retval < 0) break; + if (frame.frame_type == AMQP_FRAME_METHOD) goto altframe; if (frame.frame_type != AMQP_FRAME_BODY) { PyErr_SetString(PyRabbitMQExc_ChannelError, "Expected body, got unexpected frame"); @@ -1313,6 +1348,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, } goto finally; +altframe: + PyRabbitMQ_SetErr_ReceivedFrame(self, &frame); error: retval = -1; |