summaryrefslogtreecommitdiff
path: root/kombu/mixins.py
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-09-12 10:47:22 +0100
committerAsk Solem <ask@celeryproject.org>2011-09-12 10:47:22 +0100
commitac1af44fbaff0c743f3259e9fb3c8ca2047a5109 (patch)
tree84eb7c2ec51656f6108152f2ca5d79d94112836b /kombu/mixins.py
parent3e82865f7f131c9e89a7cd762f251586ad594a7b (diff)
downloadkombu-ac1af44fbaff0c743f3259e9fb3c8ca2047a5109.tar.gz
Task queue example working
Diffstat (limited to 'kombu/mixins.py')
-rw-r--r--kombu/mixins.py15
1 files changed, 12 insertions, 3 deletions
diff --git a/kombu/mixins.py b/kombu/mixins.py
index 51a0b3e3..b4b2f589 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -20,6 +20,7 @@ from itertools import count
from .messaging import Consumer
from .log import LogMixin
from .utils import cached_property, nested
+from .utils.encoding import safe_repr
from .utils.limits import TokenBucket
__all__ = ["ConsumerMixin"]
@@ -88,9 +89,10 @@ class ConsumerMixin(LogMixin):
self.on_connection_revived()
self.info("Connected to %s", conn.as_uri())
channel = conn.default_channel
- with self._consume_from(*self.get_consumers(
- partial(Consumer, channel), channel)) as consumers:
- yield conn, channel, consumers
+ cls = partial(Consumer, channel,
+ on_decode_error=self.on_decode_error)
+ with self._consume_from(*self.get_consumers(cls, channel)) as c:
+ yield conn, channel, c
@contextmanager
def _consume_from(self, *consumers):
@@ -103,3 +105,10 @@ class ConsumerMixin(LogMixin):
# poses problems for the too often restarts protection
# in Connection.ensure_connection
return TokenBucket(1)
+
+ def on_decode_error(self, message, exc):
+ self.critical(
+ "Can't decode message body: %r (type:%r encoding:%r raw:%r')",
+ exc, message.content_type, message.content_encoding,
+ safe_repr(message.body))
+ message.ack()