summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoger Hu <roger.hu@gmail.com>2014-06-16 08:01:15 -0700
committerAsk Solem <ask@celeryproject.org>2015-10-30 12:17:28 -0700
commitf5634808758139265656d2a840f38060ee952336 (patch)
treefbae2aa3ce82f61b4b4f4493d81ebeeb266197ae
parent948e2bfea9d496299e0e1cb1be01cb4358263c38 (diff)
downloadlibrabbitmq-f5634808758139265656d2a840f38060ee952336.tar.gz
Allow AMQP client properties to be exposed when connecting to RMQ broker.
We can leverage RMQ extensions similar to https://github.com/ruby-amqp/bunny/tree/master/examples/connection and https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/session.rb#L56-65. Add ability to set boolean parameters too. Switching back to simply args. Uses __init__() to pass in client_properties instead of the connect() parameter.
-rw-r--r--Modules/_librabbitmq/connection.c36
-rw-r--r--Modules/_librabbitmq/connection.h1
-rw-r--r--librabbitmq/__init__.py4
3 files changed, 32 insertions, 9 deletions
diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c
index 2ca4933..8b26a10 100644
--- a/Modules/_librabbitmq/connection.c
+++ b/Modules/_librabbitmq/connection.c
@@ -936,6 +936,7 @@ PyRabbitMQ_ConnectionType_dealloc(PyRabbitMQ_Connection *self)
if (self->weakreflist != NULL)
PyObject_ClearWeakRefs((PyObject*)self);
Py_XDECREF(self->callbacks);
+ Py_XDECREF(self->client_properties);
Py_XDECREF(self->server_properties);
self->ob_type->tp_free(self);
}
@@ -957,6 +958,7 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
"channel_max",
"frame_max",
"heartbeat",
+ "client_properties",
NULL
};
char *hostname = "localhost";
@@ -967,10 +969,11 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
int frame_max = 131072;
int heartbeat = 0;
int port = 5672;
+ PyObject *client_properties = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiii", kwlist,
- &hostname, &userid, &password, &virtual_host, &port,
- &channel_max, &frame_max, &heartbeat)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist,
+ &hostname, &userid, &password, &virtual_host, &port,
+ &channel_max, &frame_max, &heartbeat, &client_properties)) {
return -1;
}
@@ -985,8 +988,10 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
self->weakreflist = NULL;
self->callbacks = PyDict_New();
if (self->callbacks == NULL) return -1;
- self->server_properties = NULL;
+ Py_XINCREF(client_properties);
+ self->client_properties = client_properties;
+ self->server_properties = NULL;
return 0;
}
@@ -1016,6 +1021,8 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
int status;
amqp_socket_t *socket = NULL;
amqp_rpc_reply_t reply;
+ amqp_pool_t pool;
+ amqp_table_t properties;
if (self->connected) {
PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected");
@@ -1039,12 +1046,25 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
Py_BEGIN_ALLOW_THREADS;
self->sockfd = amqp_socket_get_sockfd(socket);
- reply = amqp_login(self->conn, self->virtual_host, self->channel_max,
- self->frame_max, self->heartbeat,
- AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
+
+ if (self->client_properties != NULL && PyDict_Check(self->client_properties)) {
+ init_amqp_pool(&pool, self->frame_max);
+ properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool);
+
+ reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max,
+ self->frame_max, self->heartbeat,
+ &properties,
+ AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
+ } else {
+ reply = amqp_login(self->conn, self->virtual_host, self->channel_max,
+ self->frame_max, self->heartbeat,
+ AMQP_SASL_METHOD_PLAIN, self->userid, self->password);
+ }
+
Py_END_ALLOW_THREADS;
+
if (PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't log in"))
- goto bail;
+ goto bail;
/* after tune */
self->connected = 1;
diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h
index a8b3fa9..98de26c 100644
--- a/Modules/_librabbitmq/connection.h
+++ b/Modules/_librabbitmq/connection.h
@@ -140,6 +140,7 @@ typedef struct {
int sockfd;
int connected;
+ PyObject *client_properties;
PyObject *server_properties;
PyObject *callbacks; /* {channel_id: {consumer_tag:callback}} */
diff --git a/librabbitmq/__init__.py b/librabbitmq/__init__.py
index 169e63f..c3c831c 100644
--- a/librabbitmq/__init__.py
+++ b/librabbitmq/__init__.py
@@ -185,13 +185,15 @@ class Connection(_librabbitmq.Connection):
def __init__(self, host='localhost', userid='guest', password='guest',
virtual_host='/', port=5672, channel_max=0xffff,
- frame_max=131072, heartbeat=0, lazy=False, **kwargs):
+ frame_max=131072, heartbeat=0, lazy=False,
+ client_properties=None, **kwargs):
if ':' in host:
host, port = host.split(':')
super(Connection, self).__init__(
hostname=host, port=int(port), userid=userid, password=password,
virtual_host=virtual_host, channel_max=channel_max,
frame_max=frame_max, heartbeat=heartbeat,
+ client_properties=client_properties,
)
self.channels = {}
self._avail_channel_ids = array('H', xrange(self.channel_max, 0, -1))