summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-10-22 15:59:12 +0100
committerAsk Solem <ask@celeryproject.org>2012-11-02 16:24:30 +0000
commit999dd7176f60b1cd18aa6ace86763903fe8b3cc1 (patch)
treec29cd3bcb3447aee88cedc2da095ebad0682e348
parent4eed981f89faa4cf683a31a2c14f35d3131432ec (diff)
downloadlibrabbitmq-999dd7176f60b1cd18aa6ace86763903fe8b3cc1.tar.gz
Channel exceptions now restores channel instead of closing the connection
-rw-r--r--Modules/_librabbitmq/connection.c199
-rw-r--r--librabbitmq/__init__.py13
-rw-r--r--pavement.py5
3 files changed, 121 insertions, 96 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 7ca5b40..77f18bb 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -11,6 +11,9 @@
#include "distmeta.h"
#include "_amqstate.h"
+#define PYRABBITMQ_CONNECTION_ERROR 0x10
+#define PYRABBITMQ_CHANNEL_ERROR 0x20
+
/* ------: Private Prototypes :------------------------------------------- */
PyMODINIT_FUNC init_librabbitmq(void);
@@ -62,9 +65,16 @@ int
PyRabbitMQ_recv(PyRabbitMQ_Connection *, PyObject *,
amqp_connection_state_t, int);
+unsigned int
+PyRabbitMQ_revive_channel(PyRabbitMQ_Connection *, unsigned int);
+
+ unsigned int
+PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *, unsigned int);
+
int PyRabbitMQ_HandleError(int, char const *);
_PYRMQ_INLINE int PyRabbitMQ_HandlePollError(int);
-int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t, PyObject *, const char *);
+int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int,
+ amqp_rpc_reply_t, const char *);
void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*);
int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *);
@@ -503,19 +513,19 @@ PyRabbitMQ_HandlePollError(int ready)
}
-int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply,
- PyObject *exc_cls, const char *context)
+int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *self, unsigned int channel,
+ amqp_rpc_reply_t reply, const char *context)
{
char errorstr[1024];
switch (reply.reply_type) {
case AMQP_RESPONSE_NORMAL:
- return 1;
+ return 0;
case AMQP_RESPONSE_NONE:
snprintf(errorstr, sizeof(errorstr),
"%s: missing RPC reply type!", context);
- break;
+ goto connerror;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
snprintf(errorstr, sizeof(errorstr), "%s: %s",
@@ -523,7 +533,7 @@ int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply,
reply.library_error
? amqp_error_string(reply.library_error)
: "(end-of-stream)");
- break;
+ goto connerror;
case AMQP_RESPONSE_SERVER_EXCEPTION:
@@ -535,7 +545,7 @@ int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply,
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
- break;
+ goto connerror;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) reply.reply.decoded;
@@ -543,18 +553,24 @@ int PyRabbitMQ_HandleAMQError(amqp_rpc_reply_t reply,
"%s: server channel error %d, message: %.*s",
context, m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
- break;
+ goto chanerror;
}
default:
snprintf(errorstr, sizeof(errorstr),
"%s: unknown server error, method id 0x%08X",
context, reply.reply.id);
- break;
+ goto connerror;
}
break;
}
- PyErr_SetString(exc_cls, errorstr);
- return 0;
+connerror:
+ PyErr_SetString(PyRabbitMQExc_ConnectionError, errorstr);
+ PyRabbitMQ_Connection_close(self);
+ return PYRABBITMQ_CONNECTION_ERROR;
+chanerror:
+ PyErr_SetString(PyRabbitMQExc_ChannelError, errorstr);
+ PyRabbitMQ_revive_channel(self, channel);
+ return PYRABBITMQ_CHANNEL_ERROR;
}
@@ -566,6 +582,21 @@ void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t* frame)
PyErr_SetString(PyRabbitMQExc_ChannelError, errorstr);
}
+unsigned int
+PyRabbitMQ_revive_channel(PyRabbitMQ_Connection *self, unsigned int channel)
+{
+ int status = -1;
+ amqp_channel_close_ok_t req;
+ status = amqp_send_method(self->conn, (amqp_channel_t)channel,
+ AMQP_CHANNEL_CLOSE_OK_METHOD, &req);
+ if (status < 0) {
+ PyErr_SetString(PyRabbitMQExc_ConnectionError, "Couldn't revive channel");
+ PyRabbitMQ_Connection_close(self);
+ return 1;
+ }
+ return PyRabbitMQ_Connection_create_channel(self, channel);
+}
+
/* ------: Connection :--------------------------------------------------- */
/*
@@ -691,9 +722,8 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
self->frame_max, self->heartbeat,
AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ConnectionError, "Couldn't log in"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't log in"))
+ goto bail;
/* after tune */
self->connected = 1;
@@ -730,6 +760,20 @@ PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self)
}
+unsigned int
+PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *self, unsigned int channel)
+{
+ amqp_rpc_reply_t reply;
+
+ Py_BEGIN_ALLOW_THREADS;
+ amqp_channel_open(self->conn, channel);
+ reply = amqp_get_rpc_reply(self->conn);
+ Py_END_ALLOW_THREADS;
+
+ return PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't create channel");
+}
+
+
/*
* Connection._channel_open
*/
@@ -737,7 +781,6 @@ static PyObject *
PyRabbitMQ_Connection_channel_open(PyRabbitMQ_Connection *self, PyObject *args)
{
unsigned int channel;
- amqp_rpc_reply_t reply;
if (PyRabbitMQ_Not_Connected(self))
goto bail;
@@ -745,19 +788,10 @@ PyRabbitMQ_Connection_channel_open(PyRabbitMQ_Connection *self, PyObject *args)
if (!PyArg_ParseTuple(args, "I", &channel))
goto bail;
- Py_BEGIN_ALLOW_THREADS;
- amqp_channel_open(self->conn, channel);
- reply = amqp_get_rpc_reply(self->conn);
- Py_END_ALLOW_THREADS;
-
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "Couldn't create channel"))
- goto error;
+ if (PyRabbitMQ_Connection_create_channel(self, channel))
+ goto bail;;
Py_RETURN_NONE;
-
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -766,12 +800,25 @@ bail:
/*
* Connection._channel_close
*/
+
+unsigned int
+PyRabbitMQ_Connection_destroy_channel(PyRabbitMQ_Connection *self,
+ unsigned int channel)
+{
+ amqp_rpc_reply_t reply;
+ Py_BEGIN_ALLOW_THREADS;
+ reply = amqp_channel_close(self->conn, channel, AMQP_REPLY_SUCCESS);
+ amqp_maybe_release_buffers(self->conn);
+ Py_END_ALLOW_THREADS;
+
+ return PyRabbitMQ_HandleAMQError(self, channel, reply, "Couldn't close channel");
+}
+
static PyObject*
PyRabbitMQ_Connection_channel_close(PyRabbitMQ_Connection *self,
PyObject *args)
{
unsigned int channel = 0;
- amqp_rpc_reply_t reply;
if (PyRabbitMQ_Not_Connected(self))
goto error;
@@ -779,13 +826,7 @@ PyRabbitMQ_Connection_channel_close(PyRabbitMQ_Connection *self,
if (!PyArg_ParseTuple(args, "I", &channel))
goto error;
- Py_BEGIN_ALLOW_THREADS;
- reply = amqp_channel_close(self->conn, channel, AMQP_REPLY_SUCCESS);
- amqp_maybe_release_buffers(self->conn);
- Py_END_ALLOW_THREADS;
-
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "Couldn't close channel"))
+ if (PyRabbitMQ_Connection_destroy_channel(self, channel))
goto error;
Py_RETURN_NONE;
@@ -1031,13 +1072,10 @@ PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "queue.bind"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind"))
+ goto bail;
Py_RETURN_NONE;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1082,13 +1120,10 @@ PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "queue.unbind"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind"))
+ goto bail;
Py_RETURN_NONE;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1126,13 +1161,11 @@ PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (ok == NULL && !PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "queue.delete"))
- goto error;
+ if (ok == NULL && PyRabbitMQ_HandleAMQError(self, channel,
+ reply, "queue.delete"))
+ goto bail;
return PyInt_FromLong((long)ok->message_count);
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1182,9 +1215,8 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
reply = amqp_get_rpc_reply(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "queue.declare"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.declare"))
+ goto bail;
if ((ret = PyTuple_New(3)) == NULL) goto bail;
PyTuple_SET_ITEM(ret, 0, PyString_FromStringAndSize(ok->queue.bytes,
@@ -1192,8 +1224,6 @@ PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self,
PyTuple_SET_ITEM(ret, 1, PyInt_FromLong((long)ok->message_count));
PyTuple_SET_ITEM(ret, 2, PyInt_FromLong((long)ok->consumer_count));
return ret;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1227,13 +1257,10 @@ PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "queue.purge"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.purge"))
+ goto bail;
return PyInt_FromLong((long)ok->message_count);
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1283,12 +1310,9 @@ PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self,
reply = amqp_get_rpc_reply(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "exchange.declare"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.declare"))
+ goto bail;
Py_RETURN_NONE;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1321,13 +1345,10 @@ PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "exchange.delete"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete"))
+ goto bail;
Py_RETURN_NONE;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1387,7 +1408,7 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
Py_RETURN_NONE;
error:
- PyRabbitMQ_Connection_close(self);
+ PyRabbitMQ_revive_channel(self, channel);
bail:
return 0;
}
@@ -1422,7 +1443,7 @@ PyRabbitMQ_Connection_basic_ack(PyRabbitMQ_Connection *self,
Py_RETURN_NONE;
error:
- PyRabbitMQ_Connection_close(self);
+ PyRabbitMQ_revive_channel(self, channel);
bail:
return 0;
}
@@ -1456,7 +1477,7 @@ static PyObject *PyRabbitMQ_Connection_basic_reject(PyRabbitMQ_Connection *self,
Py_RETURN_NONE;
error:
- PyRabbitMQ_Connection_close(self);
+ PyRabbitMQ_revive_channel(self, channel);
bail:
return 0;
}
@@ -1489,14 +1510,11 @@ PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "basic.cancel"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.cancel"))
+ goto bail;
ok:
Py_RETURN_NONE;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1545,13 +1563,10 @@ PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self,
reply = amqp_get_rpc_reply(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "basic.consume"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.consume"))
+ goto bail;
return PySTRING_FROM_AMQBYTES(ok->consumer_tag);
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1612,13 +1627,10 @@ PyRabbitMQ_Connection_flow(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "channel.flow"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "channel.flow"))
+ goto bail;
Py_RETURN_NONE;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1648,13 +1660,10 @@ PyRabbitMQ_Connection_basic_recover(PyRabbitMQ_Connection *self,
amqp_maybe_release_buffers(self->conn);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "basic.recover"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.recover"))
+ goto bail;
Py_RETURN_NONE;
-error:
- PyRabbitMQ_Connection_close(self);
bail:
return 0;
}
@@ -1736,9 +1745,8 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self,
(amqp_boolean_t)no_ack);
Py_END_ALLOW_THREADS;
- if (!PyRabbitMQ_HandleAMQError(reply,
- PyRabbitMQExc_ChannelError, "basic.get"))
- goto error;
+ if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.get"))
+ goto bail;
if (reply.reply.id != AMQP_BASIC_GET_OK_METHOD)
goto empty;
@@ -1764,7 +1772,6 @@ PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self,
}
}
return p;
-
error:
PyRabbitMQ_Connection_close(self);
bail:
diff --git a/librabbitmq/__init__.py b/librabbitmq/__init__.py
index cce3a08..c6fedd7 100644
--- a/librabbitmq/__init__.py
+++ b/librabbitmq/__init__.py
@@ -33,6 +33,7 @@ class Message(object):
def reject(self):
return self.channel.basic_reject(self.delivery_info['delivery_tag'])
+
class Channel(object):
Message = Message
is_open = False
@@ -43,6 +44,12 @@ class Channel(object):
self.next_consumer_tag = itertools.count(1).next
self.no_ack_consumers = set()
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *exc_info):
+ self.close()
+
def basic_qos(self, prefetch_size=0, prefetch_count=0, _global=False):
return self.connection._basic_qos(self.channel_id,
prefetch_size, prefetch_count, _global)
@@ -169,6 +176,12 @@ class Connection(_librabbitmq.Connection):
if not lazy:
self.connect()
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *exc_info):
+ self.close()
+
def reconnect(self):
self.close()
self.connect()
diff --git a/pavement.py b/pavement.py
index 4ba2f56..16cc9fb 100644
--- a/pavement.py
+++ b/pavement.py
@@ -105,6 +105,11 @@ def test(options):
@task
+def funtest(options):
+ sh('make install && (cd funtests; python setup.py test)')
+
+
+@task
@cmdopts([
("noerror", "E", "Ignore errors"),
])