diff options
author | Ask Solem <ask@celeryproject.org> | 2011-09-12 10:47:22 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2011-09-12 10:47:22 +0100 |
commit | ac1af44fbaff0c743f3259e9fb3c8ca2047a5109 (patch) | |
tree | 84eb7c2ec51656f6108152f2ca5d79d94112836b /kombu/mixins.py | |
parent | 3e82865f7f131c9e89a7cd762f251586ad594a7b (diff) | |
download | kombu-ac1af44fbaff0c743f3259e9fb3c8ca2047a5109.tar.gz |
Task queue example working
Diffstat (limited to 'kombu/mixins.py')
-rw-r--r-- | kombu/mixins.py | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/kombu/mixins.py b/kombu/mixins.py index 51a0b3e3..b4b2f589 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -20,6 +20,7 @@ from itertools import count from .messaging import Consumer from .log import LogMixin from .utils import cached_property, nested +from .utils.encoding import safe_repr from .utils.limits import TokenBucket __all__ = ["ConsumerMixin"] @@ -88,9 +89,10 @@ class ConsumerMixin(LogMixin): self.on_connection_revived() self.info("Connected to %s", conn.as_uri()) channel = conn.default_channel - with self._consume_from(*self.get_consumers( - partial(Consumer, channel), channel)) as consumers: - yield conn, channel, consumers + cls = partial(Consumer, channel, + on_decode_error=self.on_decode_error) + with self._consume_from(*self.get_consumers(cls, channel)) as c: + yield conn, channel, c @contextmanager def _consume_from(self, *consumers): @@ -103,3 +105,10 @@ class ConsumerMixin(LogMixin): # poses problems for the too often restarts protection # in Connection.ensure_connection return TokenBucket(1) + + def on_decode_error(self, message, exc): + self.critical( + "Can't decode message body: %r (type:%r encoding:%r raw:%r')", + exc, message.content_type, message.content_encoding, + safe_repr(message.body)) + message.ack() |