diff options
author | Dmitry Malinovsky <dmalinovsky@thumbtack.net> | 2014-04-04 17:57:27 +0700 |
---|---|---|
committer | Dmitry Malinovsky <dmalinovsky@thumbtack.net> | 2014-04-04 17:57:27 +0700 |
commit | e85702c95b755e704b7c48f9e4bbddcdd8618a04 (patch) | |
tree | b1549bd202a3682a161eaf2739f917838f33a34c | |
parent | 627ee117f0223c46334c5d30eb2f8f34ece4a6ee (diff) | |
download | kombu-e85702c95b755e704b7c48f9e4bbddcdd8618a04.tar.gz |
Added serialization support
-rw-r--r-- | kombu/pidbox.py | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/kombu/pidbox.py b/kombu/pidbox.py index cbd68317..5c70a382 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -135,7 +135,8 @@ class Node(object): def reply(self, data, exchange, routing_key, ticket, **kwargs): self.mailbox._publish_reply(data, exchange, routing_key, ticket, - channel=self.channel) + channel=self.channel, + serializer=self.mailbox.serializer) class Mailbox(object): @@ -161,8 +162,12 @@ class Mailbox(object): #: Only accepts json messages by default. accept = ['json'] + #: Message serializer + serializer = None + def __init__(self, namespace, - type='direct', connection=None, clock=None, accept=None): + type='direct', connection=None, clock=None, + accept=None, serializer=None): self.namespace = namespace self.connection = connection self.type = type @@ -172,6 +177,7 @@ class Mailbox(object): self._tls = local() self.unclaimed = defaultdict(deque) self.accept = self.accept if accept is None else accept + self.serializer = self.serializer if serializer is None else serializer def __call__(self, connection): bound = copy(self) @@ -242,7 +248,8 @@ class Mailbox(object): pass # queue probably deleted and no one is expecting a reply. def _publish(self, type, arguments, destination=None, - reply_ticket=None, channel=None, timeout=None): + reply_ticket=None, channel=None, timeout=None, + serializer=None): message = {'method': type, 'arguments': arguments, 'destination': destination} @@ -253,16 +260,18 @@ class Mailbox(object): message.update(ticket=reply_ticket, reply_to={'exchange': self.reply_exchange.name, 'routing_key': self.oid}) + serializer = serializer or self.serializer producer = Producer(chan, auto_declare=False) producer.publish( message, exchange=exchange.name, declare=[exchange], headers={'clock': self.clock.forward(), 'expires': time() + timeout if timeout else 0}, + serializer=serializer, ) def _broadcast(self, command, arguments=None, destination=None, reply=False, timeout=1, limit=None, - callback=None, channel=None): + callback=None, channel=None, serializer=None): if destination is not None and \ not isinstance(destination, (list, tuple)): raise ValueError( @@ -277,10 +286,12 @@ class Mailbox(object): if limit is None and destination: limit = destination and len(destination) or None + serializer = serializer or self.serializer self._publish(command, arguments, destination=destination, reply_ticket=reply_ticket, channel=chan, - timeout=timeout) + timeout=timeout, + serializer=serializer) if reply_ticket: return self._collect(reply_ticket, limit=limit, |