summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitry Malinovsky <dmalinovsky@thumbtack.net>2014-04-04 17:57:27 +0700
committerDmitry Malinovsky <dmalinovsky@thumbtack.net>2014-04-04 17:57:27 +0700
commite85702c95b755e704b7c48f9e4bbddcdd8618a04 (patch)
treeb1549bd202a3682a161eaf2739f917838f33a34c
parent627ee117f0223c46334c5d30eb2f8f34ece4a6ee (diff)
downloadkombu-e85702c95b755e704b7c48f9e4bbddcdd8618a04.tar.gz
Added serialization support
-rw-r--r--kombu/pidbox.py21
1 files changed, 16 insertions, 5 deletions
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index cbd68317..5c70a382 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -135,7 +135,8 @@ class Node(object):
def reply(self, data, exchange, routing_key, ticket, **kwargs):
self.mailbox._publish_reply(data, exchange, routing_key, ticket,
- channel=self.channel)
+ channel=self.channel,
+ serializer=self.mailbox.serializer)
class Mailbox(object):
@@ -161,8 +162,12 @@ class Mailbox(object):
#: Only accepts json messages by default.
accept = ['json']
+ #: Message serializer
+ serializer = None
+
def __init__(self, namespace,
- type='direct', connection=None, clock=None, accept=None):
+ type='direct', connection=None, clock=None,
+ accept=None, serializer=None):
self.namespace = namespace
self.connection = connection
self.type = type
@@ -172,6 +177,7 @@ class Mailbox(object):
self._tls = local()
self.unclaimed = defaultdict(deque)
self.accept = self.accept if accept is None else accept
+ self.serializer = self.serializer if serializer is None else serializer
def __call__(self, connection):
bound = copy(self)
@@ -242,7 +248,8 @@ class Mailbox(object):
pass # queue probably deleted and no one is expecting a reply.
def _publish(self, type, arguments, destination=None,
- reply_ticket=None, channel=None, timeout=None):
+ reply_ticket=None, channel=None, timeout=None,
+ serializer=None):
message = {'method': type,
'arguments': arguments,
'destination': destination}
@@ -253,16 +260,18 @@ class Mailbox(object):
message.update(ticket=reply_ticket,
reply_to={'exchange': self.reply_exchange.name,
'routing_key': self.oid})
+ serializer = serializer or self.serializer
producer = Producer(chan, auto_declare=False)
producer.publish(
message, exchange=exchange.name, declare=[exchange],
headers={'clock': self.clock.forward(),
'expires': time() + timeout if timeout else 0},
+ serializer=serializer,
)
def _broadcast(self, command, arguments=None, destination=None,
reply=False, timeout=1, limit=None,
- callback=None, channel=None):
+ callback=None, channel=None, serializer=None):
if destination is not None and \
not isinstance(destination, (list, tuple)):
raise ValueError(
@@ -277,10 +286,12 @@ class Mailbox(object):
if limit is None and destination:
limit = destination and len(destination) or None
+ serializer = serializer or self.serializer
self._publish(command, arguments, destination=destination,
reply_ticket=reply_ticket,
channel=chan,
- timeout=timeout)
+ timeout=timeout,
+ serializer=serializer)
if reply_ticket:
return self._collect(reply_ticket, limit=limit,