summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-05-28 17:23:49 +0100
committerAsk Solem <ask@celeryproject.org>2014-05-28 17:23:49 +0100
commit530829374136d14da1758879830d4f784ab07b87 (patch)
tree18e81f6d8ec93dbbb2fe20adbfcf6f5deab54a2d
parent6b79a8917cbce0c3e431bb0866ba2729e6f9c77a (diff)
downloadlibrabbitmq-530829374136d14da1758879830d4f784ab07b87.tar.gz
_recv now handles CONNECTION_CLOSE and CHANNEL_CLOSE frames. Closes #30
-rw-r--r--Modules/_librabbitmq/connection.c43
1 files changed, 40 insertions, 3 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 400144c..850e6c4 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -1185,6 +1185,40 @@ finally:
return retval;
}
+void
+PyRabbitMQ_SetErr_ReceivedFrame(PyRabbitMQ_Connection *self, amqp_frame_t* frame)
+{
+ static char errstr[512];
+
+ switch(frame->payload.method.id) {
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *chanm = (amqp_channel_close_t *)frame->payload.method.decoded;
+ snprintf(errstr, sizeof(errstr),
+ "channel error %d, message: %.*s",
+ chanm->reply_code,
+ (int) chanm->reply_text.len,
+ (char *) chanm->reply_text.bytes);
+ PyErr_SetString(PyRabbitMQExc_ChannelError, errstr);
+ PyRabbitMQ_revive_channel(self, frame->channel);
+ break;
+ }
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *connm = (amqp_connection_close_t *)frame->payload.method.decoded;
+ snprintf(errstr, sizeof(errstr),
+ "server connection error %d message: %.*s",
+ connm->reply_code,
+ (int) connm->reply_text.len,
+ (char *) connm->reply_text.bytes);
+ PyErr_SetString(PyRabbitMQExc_ConnectionError, errstr);
+ PyRabbitMQ_Connection_close(self);
+ break;
+ }
+ default: {
+ PyErr_SetString(PyRabbitMQExc_ConnectionError, "Bad frame read");
+ break;
+ }
+ }
+}
int
PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
@@ -1216,9 +1250,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
retval = amqp_simple_wait_frame(conn, &frame);
Py_END_ALLOW_THREADS;
if (retval < 0) break;
- if (frame.frame_type != AMQP_FRAME_METHOD
- || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
- continue;
+ if (frame.frame_type != AMQP_FRAME_METHOD) continue;
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe;
delivery_info = PyDict_New();
deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded;
@@ -1242,6 +1275,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
Py_END_ALLOW_THREADS;
if (retval < 0) break;
+ if (frame.frame_type == AMQP_FRAME_METHOD) goto altframe;
if (frame.frame_type != AMQP_FRAME_HEADER) {
PyRabbitMQ_SetErr_UnexpectedHeader(&frame);
goto finally;
@@ -1264,6 +1298,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
Py_END_ALLOW_THREADS;
if (retval < 0) break;
+ if (frame.frame_type == AMQP_FRAME_METHOD) goto altframe;
if (frame.frame_type != AMQP_FRAME_BODY) {
PyErr_SetString(PyRabbitMQExc_ChannelError,
"Expected body, got unexpected frame");
@@ -1313,6 +1348,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
}
goto finally;
+altframe:
+ PyRabbitMQ_SetErr_ReceivedFrame(self, &frame);
error:
retval = -1;