diff options
author | Roger Hu <roger.hu@gmail.com> | 2014-06-16 08:01:15 -0700 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-10-30 12:17:28 -0700 |
commit | f5634808758139265656d2a840f38060ee952336 (patch) | |
tree | fbae2aa3ce82f61b4b4f4493d81ebeeb266197ae | |
parent | 948e2bfea9d496299e0e1cb1be01cb4358263c38 (diff) | |
download | librabbitmq-f5634808758139265656d2a840f38060ee952336.tar.gz |
Allow AMQP client properties to be exposed when connecting to RMQ broker.
We can leverage RMQ extensions similar to https://github.com/ruby-amqp/bunny/tree/master/examples/connection and
https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/session.rb#L56-65.
Add ability to set boolean parameters too.
Switching back to simply args.
Uses __init__() to pass in client_properties instead of the connect() parameter.
-rw-r--r-- | Modules/_librabbitmq/connection.c | 36 | ||||
-rw-r--r-- | Modules/_librabbitmq/connection.h | 1 | ||||
-rw-r--r-- | librabbitmq/__init__.py | 4 |
3 files changed, 32 insertions, 9 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 2ca4933..8b26a10 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -936,6 +936,7 @@ PyRabbitMQ_ConnectionType_dealloc(PyRabbitMQ_Connection *self) if (self->weakreflist != NULL) PyObject_ClearWeakRefs((PyObject*)self); Py_XDECREF(self->callbacks); + Py_XDECREF(self->client_properties); Py_XDECREF(self->server_properties); self->ob_type->tp_free(self); } @@ -957,6 +958,7 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self, "channel_max", "frame_max", "heartbeat", + "client_properties", NULL }; char *hostname = "localhost"; @@ -967,10 +969,11 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self, int frame_max = 131072; int heartbeat = 0; int port = 5672; + PyObject *client_properties = NULL; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiii", kwlist, - &hostname, &userid, &password, &virtual_host, &port, - &channel_max, &frame_max, &heartbeat)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist, + &hostname, &userid, &password, &virtual_host, &port, + &channel_max, &frame_max, &heartbeat, &client_properties)) { return -1; } @@ -985,8 +988,10 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self, self->weakreflist = NULL; self->callbacks = PyDict_New(); if (self->callbacks == NULL) return -1; - self->server_properties = NULL; + Py_XINCREF(client_properties); + self->client_properties = client_properties; + self->server_properties = NULL; return 0; } @@ -1016,6 +1021,8 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) int status; amqp_socket_t *socket = NULL; amqp_rpc_reply_t reply; + amqp_pool_t pool; + amqp_table_t properties; if (self->connected) { PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected"); @@ -1039,12 +1046,25 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) Py_BEGIN_ALLOW_THREADS; self->sockfd = amqp_socket_get_sockfd(socket); - reply = amqp_login(self->conn, self->virtual_host, self->channel_max, - self->frame_max, self->heartbeat, - AMQP_SASL_METHOD_PLAIN, self->userid, self->password); + + if (self->client_properties != NULL && PyDict_Check(self->client_properties)) { + init_amqp_pool(&pool, self->frame_max); + properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool); + + reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max, + self->frame_max, self->heartbeat, + &properties, + AMQP_SASL_METHOD_PLAIN, self->userid, self->password); + } else { + reply = amqp_login(self->conn, self->virtual_host, self->channel_max, + self->frame_max, self->heartbeat, + AMQP_SASL_METHOD_PLAIN, self->userid, self->password); + } + Py_END_ALLOW_THREADS; + if (PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't log in")) - goto bail; + goto bail; /* after tune */ self->connected = 1; diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h index a8b3fa9..98de26c 100644 --- a/Modules/_librabbitmq/connection.h +++ b/Modules/_librabbitmq/connection.h @@ -140,6 +140,7 @@ typedef struct { int sockfd; int connected; + PyObject *client_properties; PyObject *server_properties; PyObject *callbacks; /* {channel_id: {consumer_tag:callback}} */ diff --git a/librabbitmq/__init__.py b/librabbitmq/__init__.py index 169e63f..c3c831c 100644 --- a/librabbitmq/__init__.py +++ b/librabbitmq/__init__.py @@ -185,13 +185,15 @@ class Connection(_librabbitmq.Connection): def __init__(self, host='localhost', userid='guest', password='guest', virtual_host='/', port=5672, channel_max=0xffff, - frame_max=131072, heartbeat=0, lazy=False, **kwargs): + frame_max=131072, heartbeat=0, lazy=False, + client_properties=None, **kwargs): if ':' in host: host, port = host.split(':') super(Connection, self).__init__( hostname=host, port=int(port), userid=userid, password=password, virtual_host=virtual_host, channel_max=channel_max, frame_max=frame_max, heartbeat=heartbeat, + client_properties=client_properties, ) self.channels = {} self._avail_channel_ids = array('H', xrange(self.channel_max, 0, -1)) |