diff options
author | Ask Solem <ask@celeryproject.org> | 2012-07-24 14:33:59 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-07-24 14:33:59 +0100 |
commit | f78af4fd2cb22b2433f8147c0673476d28cdccc6 (patch) | |
tree | 5476fb97e58dba4287b0cb8dd12b19c20ec3a839 /kombu/transport/pyamqp.py | |
parent | 8041a3e06e192f72daeba59f2b1adf76d3c33e94 (diff) | |
download | kombu-f78af4fd2cb22b2433f8147c0673476d28cdccc6.tar.gz |
Adds py-amqp transport (pyamqp)
Diffstat (limited to 'kombu/transport/pyamqp.py')
-rw-r--r-- | kombu/transport/pyamqp.py | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py new file mode 100644 index 00000000..4875adb7 --- /dev/null +++ b/kombu/transport/pyamqp.py @@ -0,0 +1,133 @@ +""" +kombu.transport.pyamqp +====================== + +pure python amqp transport. + +:copyright: (c) 2009 - 2012 by Ask Solem. +:license: BSD, see LICENSE for more details. + +""" +from __future__ import absolute_import + +import amqp +import socket + +from kombu.exceptions import StdChannelError +from kombu.utils.encoding import str_to_bytes +from kombu.utils.amq_manager import get_manager + +from . import base + +DEFAULT_PORT = 5672 + + +class Message(base.Message): + + def __init__(self, channel, msg, **kwargs): + props = msg.properties + super(Message, self).__init__(channel, + body=msg.body, + delivery_tag=msg.delivery_tag, + content_type=props.get('content_type'), + content_encoding=props.get('content_encoding'), + delivery_info=msg.delivery_info, + properties=msg.properties, + headers=props.get('application_headers') or {}, + **kwargs) + + +class Channel(amqp.Channel, base.StdChannel): + Message = Message + + def prepare_message(self, message_data, priority=None, + content_type=None, content_encoding=None, headers=None, + properties=None): + """Encapsulate data into a AMQP message.""" + return amqp.Message(message_data, priority=priority, + content_type=content_type, + content_encoding=content_encoding, + application_headers=headers, + **properties) + + def message_to_python(self, raw_message): + """Convert encoded message body back to a Python value.""" + return self.Message(self, raw_message) + + +class Connection(amqp.Connection): + Channel = Channel + + +class Transport(base.Transport): + Connection = Connection + + default_port = DEFAULT_PORT + + # it's very annoying that pyamqp sometimes raises AttributeError + # if the connection is lost, but nothing we can do about that here. + connection_errors = amqp.Connection.connection_errors + channel_errors = (StdChannelError, ) + amqp.Connection.channel_errors + + nb_keep_draining = True + driver_name = "py-amqp" + driver_type = "amqp" + + def __init__(self, client, **kwargs): + self.client = client + self.default_port = kwargs.get("default_port") or self.default_port + + def create_channel(self, connection): + return connection.channel() + + def drain_events(self, connection, **kwargs): + return connection.drain_events(**kwargs) + + def establish_connection(self): + """Establish connection to the AMQP broker.""" + conninfo = self.client + for name, default_value in self.default_connection_params.items(): + if not getattr(conninfo, name, None): + setattr(conninfo, name, default_value) + if conninfo.hostname == 'localhost': + conninfo.hostname = '127.0.0.1' + conn = self.Connection(host=conninfo.host, + userid=conninfo.userid, + password=conninfo.password, + login_method=conninfo.login_method, + virtual_host=conninfo.virtual_host, + insist=conninfo.insist, + ssl=conninfo.ssl, + connect_timeout=conninfo.connect_timeout, + heartbeat=conninfo.heartbeat) + conn.client = self.client + return conn + + def close_connection(self, connection): + """Close the AMQP broker connection.""" + connection.client = None + connection.close() + + def is_alive(self, connection): + return connection.is_alive() + + def verify_connection(self, connection): + return connection.channels is not None and self.is_alive(connection) + + def eventmap(self, connection): + return {connection.sock: self.client.drain_nowait} + + def on_poll_init(self, poller): + pass + + def on_poll_start(self): + return {} + + @property + def default_connection_params(self): + return {'userid': 'guest', 'password': 'guest', + 'port': self.default_port, + 'hostname': 'localhost', 'login_method': 'AMQPLAIN'} + + def get_manager(self, *args, **kwargs): + return get_manager(self.client, *args, **kwargs) |