diff options
author | Ask Solem <ask@celeryproject.org> | 2012-10-22 15:59:12 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-11-02 16:24:30 +0000 |
commit | 999dd7176f60b1cd18aa6ace86763903fe8b3cc1 (patch) | |
tree | c29cd3bcb3447aee88cedc2da095ebad0682e348 | |
parent | 4eed981f89faa4cf683a31a2c14f35d3131432ec (diff) | |
download | librabbitmq-999dd7176f60b1cd18aa6ace86763903fe8b3cc1.tar.gz |
Channel exceptions now restores channel instead of closing the connection
-rw-r--r-- | Modules/_librabbitmq/connection.c | 199 | ||||
-rw-r--r-- | librabbitmq/__init__.py | 13 | ||||
-rw-r--r-- | pavement.py | 5 |
3 files changed, 121 insertions, 96 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 7ca5b40..77f18bb 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -11,6 +11,9 @@ #include "distmeta.h" #include "_amqstate.h" +#define PYRABBITMQ_CONNECTION_ERROR 0x10 +#define PYRABBITMQ_CHANNEL_ERROR 0x20 + /* ------: Private Prototypes :------------------------------------------- */ PyMODINIT_FUNC init_librabbitmq(void); @@ -62,9 +65,16 @@ int PyRabbitMQ_recv(PyRabbitMQ_Connection *, PyObject *, amqp_connection_state_t, int); +unsigned int +PyRabbitMQ_revive_channel(PyRabbitMQ_Connection *, unsigned int); + + unsigned int +PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *, unsigned int); + int PyRabbitMQ_HandleError(int, char const *); _PYRMQ_INLINE int PyRabbitMQ_HandlePollError(int); -int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t, PyObject *, const char *); +int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int, + amqp_rpc_reply_t, const char *); void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*); int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *); @@ -503,19 +513,19 @@ PyRabbitMQ_HandlePollError(int ready) } -int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply, - PyObject *exc_cls, const char *context) +int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *self, unsigned int channel, + amqp_rpc_reply_t reply, const char *context) { char errorstr[1024]; switch (reply.reply_type) { case AMQP_RESPONSE_NORMAL: - return 1; + return 0; case AMQP_RESPONSE_NONE: snprintf(errorstr, sizeof(errorstr), "%s: missing RPC reply type!", context); - break; + goto connerror; case AMQP_RESPONSE_LIBRARY_EXCEPTION: snprintf(errorstr, sizeof(errorstr), "%s: %s", @@ -523,7 +533,7 @@ int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply, reply.library_error ? amqp_error_string(reply.library_error) : "(end-of-stream)"); - break; + goto connerror; case AMQP_RESPONSE_SERVER_EXCEPTION: @@ -535,7 +545,7 @@ int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply, context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; + goto connerror; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *) reply.reply.decoded; @@ -543,18 +553,24 @@ int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply, "%s: server channel error %d, message: %.*s", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; + goto chanerror; } default: snprintf(errorstr, sizeof(errorstr), "%s: unknown server error, method id 0x%08X", context, reply.reply.id); - break; + goto connerror; } break; } - PyErr_SetString(exc_cls, errorstr); - return 0; +connerror: + PyErr_SetString(PyRabbitMQExc_ConnectionError, errorstr); + PyRabbitMQ_Connection_close(self); + return PYRABBITMQ_CONNECTION_ERROR; +chanerror: + PyErr_SetString(PyRabbitMQExc_ChannelError, errorstr); + PyRabbitMQ_revive_channel(self, channel); + return PYRABBITMQ_CHANNEL_ERROR; } @@ -566,6 +582,21 @@ void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t* frame) PyErr_SetString(PyRabbitMQExc_ChannelError, errorstr); } +unsigned int +PyRabbitMQ_revive_channel(PyRabbitMQ_Connection *self, unsigned int channel) +{ + int status = -1; + amqp_channel_close_ok_t req; + status = amqp_send_method(self->conn, (amqp_channel_t)channel, + AMQP_CHANNEL_CLOSE_OK_METHOD, &req); + if (status < 0) { + PyErr_SetString(PyRabbitMQExc_ConnectionError, "Couldn't revive channel"); + PyRabbitMQ_Connection_close(self); + return 1; + } + return PyRabbitMQ_Connection_create_channel(self, channel); +} + /* ------: Connection :--------------------------------------------------- */ /* @@ -691,9 +722,8 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) self->frame_max, self->heartbeat, AMQP_SASL_METHOD_PLAIN, self->userid, self->password); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ConnectionError, "Couldn't log in")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't log in")) + goto bail; /* after tune */ self->connected = 1; @@ -730,6 +760,20 @@ PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self) } +unsigned int +PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *self, unsigned int channel) +{ + amqp_rpc_reply_t reply; + + Py_BEGIN_ALLOW_THREADS; + amqp_channel_open(self->conn, channel); + reply = amqp_get_rpc_reply(self->conn); + Py_END_ALLOW_THREADS; + + return PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't create channel"); +} + + /* * Connection._channel_open */ @@ -737,7 +781,6 @@ static PyObject * PyRabbitMQ_Connection_channel_open(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel; - amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; @@ -745,19 +788,10 @@ PyRabbitMQ_Connection_channel_open(PyRabbitMQ_Connection *self, PyObject *args) if (!PyArg_ParseTuple(args, "I", &channel)) goto bail; - Py_BEGIN_ALLOW_THREADS; - amqp_channel_open(self->conn, channel); - reply = amqp_get_rpc_reply(self->conn); - Py_END_ALLOW_THREADS; - - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "Couldn't create channel")) - goto error; + if (PyRabbitMQ_Connection_create_channel(self, channel)) + goto bail;; Py_RETURN_NONE; - -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -766,12 +800,25 @@ bail: /* * Connection._channel_close */ + +unsigned int +PyRabbitMQ_Connection_destroy_channel(PyRabbitMQ_Connection *self, + unsigned int channel) +{ + amqp_rpc_reply_t reply; + Py_BEGIN_ALLOW_THREADS; + reply = amqp_channel_close(self->conn, channel, AMQP_REPLY_SUCCESS); + amqp_maybe_release_buffers(self->conn); + Py_END_ALLOW_THREADS; + + return PyRabbitMQ_HandleAMQError(self, channel, reply, "Couldn't close channel"); +} + static PyObject* PyRabbitMQ_Connection_channel_close(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel = 0; - amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto error; @@ -779,13 +826,7 @@ PyRabbitMQ_Connection_channel_close(PyRabbitMQ_Connection *self, if (!PyArg_ParseTuple(args, "I", &channel)) goto error; - Py_BEGIN_ALLOW_THREADS; - reply = amqp_channel_close(self->conn, channel, AMQP_REPLY_SUCCESS); - amqp_maybe_release_buffers(self->conn); - Py_END_ALLOW_THREADS; - - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "Couldn't close channel")) + if (PyRabbitMQ_Connection_destroy_channel(self, channel)) goto error; Py_RETURN_NONE; @@ -1031,13 +1072,10 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "queue.bind")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind")) + goto bail; Py_RETURN_NONE; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1082,13 +1120,10 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "queue.unbind")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind")) + goto bail; Py_RETURN_NONE; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1126,13 +1161,11 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (ok == NULL && !PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "queue.delete")) - goto error; + if (ok == NULL && PyRabbitMQ_HandleAMQError(self, channel, + reply, "queue.delete")) + goto bail; return PyInt_FromLong((long)ok->message_count); -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1182,9 +1215,8 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, reply = amqp_get_rpc_reply(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "queue.declare")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.declare")) + goto bail; if ((ret = PyTuple_New(3)) == NULL) goto bail; PyTuple_SET_ITEM(ret, 0, PyString_FromStringAndSize(ok->queue.bytes, @@ -1192,8 +1224,6 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, PyTuple_SET_ITEM(ret, 1, PyInt_FromLong((long)ok->message_count)); PyTuple_SET_ITEM(ret, 2, PyInt_FromLong((long)ok->consumer_count)); return ret; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1227,13 +1257,10 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "queue.purge")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.purge")) + goto bail; return PyInt_FromLong((long)ok->message_count); -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1283,12 +1310,9 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, reply = amqp_get_rpc_reply(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "exchange.declare")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.declare")) + goto bail; Py_RETURN_NONE; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1321,13 +1345,10 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "exchange.delete")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete")) + goto bail; Py_RETURN_NONE; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1387,7 +1408,7 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, Py_RETURN_NONE; error: - PyRabbitMQ_Connection_close(self); + PyRabbitMQ_revive_channel(self, channel); bail: return 0; } @@ -1422,7 +1443,7 @@ PyRabbitMQ_Connection_basic_ack(PyRabbitMQ_Connection *self, Py_RETURN_NONE; error: - PyRabbitMQ_Connection_close(self); + PyRabbitMQ_revive_channel(self, channel); bail: return 0; } @@ -1456,7 +1477,7 @@ static PyObject *PyRabbitMQ_Connection_basic_reject(PyRabbitMQ_Connection *self, Py_RETURN_NONE; error: - PyRabbitMQ_Connection_close(self); + PyRabbitMQ_revive_channel(self, channel); bail: return 0; } @@ -1489,14 +1510,11 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "basic.cancel")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.cancel")) + goto bail; ok: Py_RETURN_NONE; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1545,13 +1563,10 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, reply = amqp_get_rpc_reply(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "basic.consume")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.consume")) + goto bail; return PySTRING_FROM_AMQBYTES(ok->consumer_tag); -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1612,13 +1627,10 @@ PyRabbitMQ_Connection_flow(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "channel.flow")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "channel.flow")) + goto bail; Py_RETURN_NONE; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1648,13 +1660,10 @@ PyRabbitMQ_Connection_basic_recover(PyRabbitMQ_Connection *self, amqp_maybe_release_buffers(self->conn); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "basic.recover")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.recover")) + goto bail; Py_RETURN_NONE; -error: - PyRabbitMQ_Connection_close(self); bail: return 0; } @@ -1736,9 +1745,8 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self, (amqp_boolean_t)no_ack); Py_END_ALLOW_THREADS; - if (!PyRabbitMQ_HandleAMQError(reply, - PyRabbitMQExc_ChannelError, "basic.get")) - goto error; + if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.get")) + goto bail; if (reply.reply.id != AMQP_BASIC_GET_OK_METHOD) goto empty; @@ -1764,7 +1772,6 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self, } } return p; - error: PyRabbitMQ_Connection_close(self); bail: diff --git a/librabbitmq/__init__.py b/librabbitmq/__init__.py index cce3a08..c6fedd7 100644 --- a/librabbitmq/__init__.py +++ b/librabbitmq/__init__.py @@ -33,6 +33,7 @@ class Message(object): def reject(self): return self.channel.basic_reject(self.delivery_info['delivery_tag']) + class Channel(object): Message = Message is_open = False @@ -43,6 +44,12 @@ class Channel(object): self.next_consumer_tag = itertools.count(1).next self.no_ack_consumers = set() + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + def basic_qos(self, prefetch_size=0, prefetch_count=0, _global=False): return self.connection._basic_qos(self.channel_id, prefetch_size, prefetch_count, _global) @@ -169,6 +176,12 @@ class Connection(_librabbitmq.Connection): if not lazy: self.connect() + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + def reconnect(self): self.close() self.connect() diff --git a/pavement.py b/pavement.py index 4ba2f56..16cc9fb 100644 --- a/pavement.py +++ b/pavement.py @@ -105,6 +105,11 @@ def test(options): @task +def funtest(options): + sh('make install && (cd funtests; python setup.py test)') + + +@task @cmdopts([ ("noerror", "E", "Ignore errors"), ]) |