diff options
-rw-r--r-- | kombu/transport/SLMQ.py | 4 | ||||
-rw-r--r-- | kombu/transport/SQS.py | 4 | ||||
-rw-r--r-- | kombu/transport/beanstalk.py | 3 | ||||
-rw-r--r-- | kombu/transport/couchdb.py | 3 | ||||
-rw-r--r-- | kombu/transport/django/__init__.py | 3 | ||||
-rw-r--r-- | kombu/transport/mongodb.py | 5 | ||||
-rw-r--r-- | kombu/transport/redis.py | 6 | ||||
-rw-r--r-- | kombu/transport/sqlalchemy/__init__.py | 3 | ||||
-rw-r--r-- | kombu/transport/zookeeper.py | 3 |
9 files changed, 20 insertions, 14 deletions
diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py index 0a79deec..d6589f66 100644 --- a/kombu/transport/SLMQ.py +++ b/kombu/transport/SLMQ.py @@ -16,7 +16,7 @@ import os from kombu.five import Empty, text_t from kombu.utils import cached_property # , uuid -from kombu.utils.encoding import safe_str +from kombu.utils.encoding import bytes_to_str, safe_str from . import virtual @@ -96,7 +96,7 @@ class Channel(virtual.Channel): rs = q.pop(1) if rs['items']: m = rs['items'][0] - payload = loads(m['body']) + payload = loads(bytes_to_str(m['body'])) if queue in self._noack_queues: q.message(m['id']).delete() else: diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 7b4d360b..4ddfcc09 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -23,7 +23,7 @@ from boto.sqs.message import Message from kombu.five import Empty, range, text_t from kombu.utils import cached_property, uuid -from kombu.utils.encoding import safe_str +from kombu.utils.encoding import bytes_to_str, safe_str from . import virtual @@ -250,7 +250,7 @@ class Channel(virtual.Channel): rs = q.get_messages(1) if rs: m = rs[0] - payload = loads(rs[0].get_body()) + payload = loads(bytes_to_str(rs[0].get_body())) if queue in self._noack_queues: q.delete_message(m) else: diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index f583899e..9dff8b49 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -16,6 +16,7 @@ import socket from anyjson import loads, dumps from kombu.five import Empty +from kombu.utils.encoding import bytes_to_str from . import virtual @@ -31,7 +32,7 @@ class Channel(virtual.Channel): item, dest = None, None if job: try: - item = loads(job.body) + item = loads(bytes_to_str(job.body)) dest = job.stats()['tube'] except Exception: job.bury() diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index 42864cfd..009dbbdc 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -17,6 +17,7 @@ from anyjson import loads, dumps from kombu.five import Empty from kombu.utils import uuid4 +from kombu.utils.encoding import bytes_to_str from . import virtual @@ -56,7 +57,7 @@ class Channel(virtual.Channel): item = result.rows[0].value self.client.delete(item) - return loads(item['payload']) + return loads(bytes_to_str(item['payload'])) def _purge(self, queue): result = self._query(queue) diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py index 7c4ec6a4..67bfa576 100644 --- a/kombu/transport/django/__init__.py +++ b/kombu/transport/django/__init__.py @@ -8,6 +8,7 @@ from django.core import exceptions as errors from kombu.five import Empty from kombu.transport import virtual +from kombu.utils.encoding import bytes_to_str from .models import Queue @@ -37,7 +38,7 @@ class Channel(virtual.Channel): #self.refresh_connection() m = Queue.objects.fetch(queue) if m: - return loads(m) + return loads(bytes_to_str(m)) raise Empty() def _size(self, queue): diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 01005042..b8c078f3 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -18,6 +18,7 @@ from pymongo import MongoClient from kombu.five import Empty from kombu.syn import _detect_environment +from kombu.utils.encoding import bytes_to_str from . import virtual @@ -50,7 +51,7 @@ class Channel(virtual.Channel): if queue in self._fanout_queues: msg = next(self._queue_cursors[queue]) self._queue_readcounts[queue] += 1 - return loads(msg['payload']) + return loads(bytes_to_str(msg['payload'])) else: msg = self.client.command( 'findandmodify', 'messages', @@ -67,7 +68,7 @@ class Channel(virtual.Channel): # as of mongo 2.0 empty results won't raise an error if msg['value'] is None: raise Empty() - return loads(msg['value']['payload']) + return loads(bytes_to_str(msg['value']['payload'])) def _size(self, queue): if queue in self._fanout_queues: diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index df39cb46..2283f93b 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -162,7 +162,7 @@ class QoS(virtual.QoS): p, _, _ = self._remove_from_indices( tag, client.pipeline().hget(self.unacked_key, tag)).execute() if p: - M, EX, RK = loads(p) + M, EX, RK = loads(bytes_to_str(p)) # json is unicode self.channel._do_restore_message(M, EX, RK, client, leftmost) @cached_property @@ -419,7 +419,7 @@ class Channel(virtual.Channel): .hdel(self.unacked_key, tag) \ .execute() if P: - M, EX, RK = loads(P) + M, EX, RK = loads(bytes_to_str(P)) # json is unicode self._do_restore_message(M, EX, RK, client, leftmost) def _restore_at_beginning(self, message): @@ -533,7 +533,7 @@ class Channel(virtual.Channel): for pri in PRIORITY_STEPS: item = client.rpop(self._q_for_pri(queue, pri)) if item: - return loads(item) + return loads(bytes_to_str(item)) raise Empty() def _size(self, queue): diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index 90f6ac57..c085b469 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -12,6 +12,7 @@ from sqlalchemy.orm import sessionmaker from kombu.five import Empty from kombu.transport import virtual from kombu.utils import cached_property +from kombu.utils.encoding import bytes_to_str from .models import (ModelBase, Queue as QueueBase, Message as MessageBase, class_registry, metadata) @@ -102,7 +103,7 @@ class Channel(virtual.Channel): .first() if msg: msg.visible = False - return loads(msg.payload) + return loads(bytes_to_str(msg.payload)) raise Empty() finally: self.session.commit() diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index 0d524e25..2d1c8abc 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -33,6 +33,7 @@ import socket from anyjson import loads, dumps from kombu.five import Empty +from kombu.utils.encoding import bytes_to_str from . import virtual @@ -117,7 +118,7 @@ class Channel(virtual.Channel): if msg is None: raise Empty() - return loads(msg) + return loads(bytes_to_str(msg)) def _purge(self, queue): count = 0 |