1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
"""
kombu.transport.librabbitmq
===========================
`librabbitmq`_ transport.
.. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq/
:copyright: (c) 2010 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import socket
try:
import librabbitmq as amqp
from librabbitmq import ChannelError, ConnectionError
except ImportError:
try:
import pylibrabbitmq as amqp # noqa
from pylibrabbitmq import ChannelError, ConnectionError # noqa
except ImportError:
raise ImportError("No module named librabbitmq")
from kombu.exceptions import StdChannelError
from kombu.utils.amq_manager import get_manager
from . import base
# Port that Transport falls back to when it is constructed without a
# truthy ``default_port`` keyword argument.
DEFAULT_PORT = 5672
class Message(base.Message):
    """Adapt a message delivered by librabbitmq to the ``base.Message`` API."""

    def __init__(self, channel, props, info, body):
        """Forward the raw delivery to ``base.Message.__init__``.

        :param channel: channel the message was received on.
        :param props: mapping of message properties; must support ``.get``.
        :param info: mapping of delivery information; must contain
            ``'delivery_tag'``.
        :param body: message payload, passed through unchanged.

        :raises KeyError: if ``info`` has no ``'delivery_tag'`` entry.
        """
        super(Message, self).__init__(
            channel,
            body=body,
            delivery_info=info,
            properties=props,
            delivery_tag=info['delivery_tag'],
            # These properties are looked up with ``.get`` (like
            # ``headers``) so that a message published without them is
            # still constructed, with ``None`` passed on, instead of
            # failing with KeyError.
            content_type=props.get('content_type'),
            content_encoding=props.get('content_encoding'),
            headers=props.get('headers'))
class Channel(amqp.Channel, base.StdChannel):
    """librabbitmq channel combined with kombu's ``base.StdChannel``."""

    # Message class associated with this channel type.
    Message = Message

    def prepare_message(self, body, priority=None,
                        content_type=None, content_encoding=None,
                        headers=None, properties=None):
        """Encapsulate data into a AMQP message.

        :param body: payload, returned unchanged.
        :param priority: stored under ``'priority'``.
        :param content_type: stored under ``'content_type'``.
        :param content_encoding: stored under ``'content_encoding'``.
        :param headers: stored under ``'headers'``.
        :param properties: optional dict of extra properties.  When given
            it is updated in place and returned; otherwise a new dict is
            created.

        :returns: ``(body, properties)`` tuple.
        """
        if properties is None:
            properties = {}
        # The explicit arguments always win over same-named keys that were
        # already present in ``properties``.
        message_fields = {'content_type': content_type,
                          'content_encoding': content_encoding,
                          'headers': headers,
                          'priority': priority}
        properties.update(message_fields)
        return body, properties
class Connection(amqp.Connection):
    """librabbitmq connection bound to this module's classes.

    The two attributes below point the connection at the ``Message`` and
    ``Channel`` classes defined in this module.
    """

    Message = Message
    Channel = Channel
class Transport(base.Transport):
    """kombu transport backed by the librabbitmq client library."""

    # Connection class instantiated by establish_connection().
    Connection = Connection

    default_port = DEFAULT_PORT

    # Exception types grouped as connection-level / channel-level errors.
    connection_errors = (
        ConnectionError,
        socket.error,
        IOError,
        OSError,
    )
    channel_errors = (
        StdChannelError,
        ChannelError,
    )

    driver_type = 'amqp'
    driver_name = 'librabbitmq'

    # NOTE(review): not read anywhere in this file; presumably consumed by
    # the non-blocking event loop -- confirm against the caller.
    nb_keep_draining = True

    def __init__(self, client, **kwargs):
        """Remember ``client`` and resolve the default port.

        :param client: object holding the connection settings; it is also
            the object that establish_connection() fills in and annotates.
        :keyword default_port: overrides the class-level ``default_port``
            when truthy.
        """
        self.client = client
        port_override = kwargs.get('default_port')
        self.default_port = (
            port_override if port_override else self.default_port
        )

    def driver_version(self):
        """Return the ``__version__`` of the imported librabbitmq module."""
        return amqp.__version__

    def create_channel(self, connection):
        """Return a new channel obtained from ``connection.channel()``."""
        return connection.channel()

    def drain_events(self, connection, **kwargs):
        """Delegate to ``connection.drain_events`` with the same kwargs."""
        return connection.drain_events(**kwargs)

    def establish_connection(self):
        """Establish connection to the AMQP broker.

        Any attribute named in ``default_connection_params`` that is
        missing or falsy on the client is first set to its default.  The
        new connection gets a ``client`` attribute, and the client's
        ``drain_events`` is pointed at the connection's ``drain_events``.

        :returns: the newly created ``Connection`` instance.
        """
        client = self.client
        for attr, fallback in self.default_connection_params.items():
            if getattr(client, attr, None):
                continue  # already configured by the caller
            setattr(client, attr, fallback)

        connect_args = {
            'host': client.host,
            'userid': client.userid,
            'password': client.password,
            'virtual_host': client.virtual_host,
            'login_method': client.login_method,
            'insist': client.insist,
            'ssl': client.ssl,
            'connect_timeout': client.connect_timeout,
        }
        conn = self.Connection(**connect_args)
        conn.client = self.client
        self.client.drain_events = conn.drain_events
        return conn

    def close_connection(self, connection):
        """Close the AMQP broker connection."""
        connection.close()

    def on_poll_init(self, poller):
        """Hook taking the poller; this transport does nothing here."""
        pass

    def on_poll_start(self):
        """Hook for the start of polling; always returns an empty dict."""
        return {}

    def eventmap(self, connection):
        """Map the connection's file descriptor to ``client.drain_nowait``."""
        fd = connection.fileno()
        return {fd: self.client.drain_nowait}

    def get_manager(self, *args, **kwargs):
        """Call the module-level ``get_manager`` helper for this client."""
        return get_manager(self.client, *args, **kwargs)

    @property
    def default_connection_params(self):
        """Defaults applied to the client by establish_connection()."""
        defaults = {
            'userid': 'guest',
            'password': 'guest',
            'port': self.default_port,
            'hostname': 'localhost',
            'login_method': 'AMQPLAIN',
        }
        return defaults
|