1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
"""
kombu.transport.librabbitmq
===========================
pylibrabbitmq transport.
:copyright: (c) 2010 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
import socket
import pylibrabbitmq as amqp
from pylibrabbitmq import ChannelError, ConnectionError
from kombu.transport import base
# Port number that Transport.default_port falls back to when no
# "default_port" keyword is supplied to the transport.
DEFAULT_PORT = 5672
class Message(base.Message):
    """A message received by the broker.

    .. attribute:: body

        The message body.

    .. attribute:: delivery_tag

        The message delivery tag, uniquely identifying this message.

    .. attribute:: channel

        The channel instance the message was received on.

    """

    def __init__(self, channel, message, **kwargs):
        """Unpack a raw transport message into the base-class fields.

        :param channel: channel the message was received on.
        :param message: raw message object exposing ``body``,
            ``properties`` and ``delivery_info``.
        :param kwargs: forwarded unchanged to the base class initializer.

        :raises KeyError: if ``delivery_info`` has no ``"delivery_tag"``.

        """
        props = message.properties
        info = message.delivery_info
        super(Message, self).__init__(
            channel,
            body=message.body,
            delivery_info=info,
            properties=props,
            delivery_tag=info["delivery_tag"],
            # These properties may be absent on a message; look them up
            # leniently (as already done for the headers) so a missing
            # key yields None instead of raising KeyError.
            content_type=props.get("content_type"),
            content_encoding=props.get("content_encoding"),
            headers=props.get("application_headers"),
            **kwargs)
class Channel(amqp.Channel, base.StdChannel):
    """Channel that builds outgoing messages and wraps incoming ones."""

    # Class used by message_to_python() to wrap raw messages.
    Message = Message

    def prepare_message(self, body, priority=None,
                        content_type=None, content_encoding=None,
                        headers=None, properties=None):
        """Build an ``amqp.Message`` from a body and its metadata.

        The named arguments supply the base property values; any entries
        in `properties` are applied on top and win on key conflicts.

        :returns: the new ``amqp.Message`` instance.

        """
        base_properties = {
            "content_type": content_type,
            "content_encoding": content_encoding,
            "application_headers": headers,
            "priority": priority,
        }
        overrides = properties if properties else {}
        merged = dict(base_properties, **overrides)
        return amqp.Message(body, properties=merged)

    def message_to_python(self, raw_message):
        """Wrap a raw message in this channel's :attr:`Message` class."""
        message_cls = self.Message
        return message_cls(self, raw_message)
class Connection(amqp.Connection):
    """``amqp.Connection`` subclass bound to this module's channel class."""

    # NOTE(review): presumably consulted by amqp.Connection when it
    # opens channels -- confirm against the pylibrabbitmq sources.
    Channel = Channel
class Transport(base.Transport):
    """Transport that opens connections through :class:`Connection`."""

    Connection = Connection

    default_port = DEFAULT_PORT

    # Exception types grouped by whether they concern the connection
    # as a whole or a single channel.
    connection_errors = (
        ConnectionError,
        socket.error,
        IOError,
        OSError,
    )
    channel_errors = (ChannelError, )

    def __init__(self, client, **kwargs):
        """Remember `client` and an optional ``default_port`` override."""
        self.client = client
        port_override = kwargs.get("default_port")
        # A missing or falsy override keeps the current default port.
        self.default_port = (port_override if port_override
                             else self.default_port)

    def create_channel(self, connection):
        """Return a new channel obtained from `connection`."""
        channel = connection.channel()
        return channel

    def drain_events(self, connection, **kwargs):
        """Delegate to ``connection.drain_events``, passing `kwargs` on."""
        drained = connection.drain_events(**kwargs)
        return drained

    def establish_connection(self):
        """Establish connection to the AMQP broker.

        Any attribute of the client named in
        :attr:`default_connection_params` that is missing or falsy is
        first set on the client to its default value.  The connection is
        then created from the client's settings and tagged with the
        client before being returned.

        """
        client = self.client
        defaults = self.default_connection_params
        for attr_name in defaults:
            if getattr(client, attr_name, None):
                # Already configured with a truthy value; leave it alone.
                continue
            setattr(client, attr_name, defaults[attr_name])

        options = {
            "host": client.host,
            "userid": client.userid,
            "password": client.password,
            "virtual_host": client.virtual_host,
            "login_method": client.login_method,
            "insist": client.insist,
            "ssl": client.ssl,
            "connect_timeout": client.connect_timeout,
        }
        connection = self.Connection(**options)
        connection.client = self.client
        return connection

    def close_connection(self, connection):
        """Close the given broker connection."""
        connection.close()

    @property
    def default_connection_params(self):
        """Mapping of client attribute names to their fallback values."""
        return dict(userid="guest",
                    password="guest",
                    port=self.default_port,
                    hostname="localhost",
                    login_method="AMQPLAIN")
|