summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-11-15 12:53:55 +0000
committerAsk Solem <ask@celeryproject.org>2013-11-15 12:53:55 +0000
commit5003fa22e3aa8af3ba896dc4b703dc60763b07f1 (patch)
tree4df9e600de578a80d3e02e3ae92c4f8aff8e4afb /kombu
parente8c1c4e3843bb14b1027bb4832520aaf8476f6c3 (diff)
downloadkombu-5003fa22e3aa8af3ba896dc4b703dc60763b07f1.tar.gz
[Py3] anyjson.loads requires string argument. Closes #270
Diffstat (limited to 'kombu')
-rw-r--r--kombu/transport/SLMQ.py4
-rw-r--r--kombu/transport/SQS.py4
-rw-r--r--kombu/transport/beanstalk.py3
-rw-r--r--kombu/transport/couchdb.py3
-rw-r--r--kombu/transport/django/__init__.py3
-rw-r--r--kombu/transport/mongodb.py5
-rw-r--r--kombu/transport/redis.py6
-rw-r--r--kombu/transport/sqlalchemy/__init__.py3
-rw-r--r--kombu/transport/zookeeper.py3
9 files changed, 20 insertions, 14 deletions
diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py
index 0a79deec..d6589f66 100644
--- a/kombu/transport/SLMQ.py
+++ b/kombu/transport/SLMQ.py
@@ -16,7 +16,7 @@ import os
from kombu.five import Empty, text_t
from kombu.utils import cached_property # , uuid
-from kombu.utils.encoding import safe_str
+from kombu.utils.encoding import bytes_to_str, safe_str
from . import virtual
@@ -96,7 +96,7 @@ class Channel(virtual.Channel):
rs = q.pop(1)
if rs['items']:
m = rs['items'][0]
- payload = loads(m['body'])
+ payload = loads(bytes_to_str(m['body']))
if queue in self._noack_queues:
q.message(m['id']).delete()
else:
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 7b4d360b..4ddfcc09 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -23,7 +23,7 @@ from boto.sqs.message import Message
from kombu.five import Empty, range, text_t
from kombu.utils import cached_property, uuid
-from kombu.utils.encoding import safe_str
+from kombu.utils.encoding import bytes_to_str, safe_str
from . import virtual
@@ -250,7 +250,7 @@ class Channel(virtual.Channel):
rs = q.get_messages(1)
if rs:
m = rs[0]
- payload = loads(rs[0].get_body())
+ payload = loads(bytes_to_str(rs[0].get_body()))
if queue in self._noack_queues:
q.delete_message(m)
else:
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index f583899e..9dff8b49 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -16,6 +16,7 @@ import socket
from anyjson import loads, dumps
from kombu.five import Empty
+from kombu.utils.encoding import bytes_to_str
from . import virtual
@@ -31,7 +32,7 @@ class Channel(virtual.Channel):
item, dest = None, None
if job:
try:
- item = loads(job.body)
+ item = loads(bytes_to_str(job.body))
dest = job.stats()['tube']
except Exception:
job.bury()
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index 42864cfd..009dbbdc 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -17,6 +17,7 @@ from anyjson import loads, dumps
from kombu.five import Empty
from kombu.utils import uuid4
+from kombu.utils.encoding import bytes_to_str
from . import virtual
@@ -56,7 +57,7 @@ class Channel(virtual.Channel):
item = result.rows[0].value
self.client.delete(item)
- return loads(item['payload'])
+ return loads(bytes_to_str(item['payload']))
def _purge(self, queue):
result = self._query(queue)
diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py
index 7c4ec6a4..67bfa576 100644
--- a/kombu/transport/django/__init__.py
+++ b/kombu/transport/django/__init__.py
@@ -8,6 +8,7 @@ from django.core import exceptions as errors
from kombu.five import Empty
from kombu.transport import virtual
+from kombu.utils.encoding import bytes_to_str
from .models import Queue
@@ -37,7 +38,7 @@ class Channel(virtual.Channel):
#self.refresh_connection()
m = Queue.objects.fetch(queue)
if m:
- return loads(m)
+ return loads(bytes_to_str(m))
raise Empty()
def _size(self, queue):
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 01005042..b8c078f3 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -18,6 +18,7 @@ from pymongo import MongoClient
from kombu.five import Empty
from kombu.syn import _detect_environment
+from kombu.utils.encoding import bytes_to_str
from . import virtual
@@ -50,7 +51,7 @@ class Channel(virtual.Channel):
if queue in self._fanout_queues:
msg = next(self._queue_cursors[queue])
self._queue_readcounts[queue] += 1
- return loads(msg['payload'])
+ return loads(bytes_to_str(msg['payload']))
else:
msg = self.client.command(
'findandmodify', 'messages',
@@ -67,7 +68,7 @@ class Channel(virtual.Channel):
# as of mongo 2.0 empty results won't raise an error
if msg['value'] is None:
raise Empty()
- return loads(msg['value']['payload'])
+ return loads(bytes_to_str(msg['value']['payload']))
def _size(self, queue):
if queue in self._fanout_queues:
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index df39cb46..2283f93b 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -162,7 +162,7 @@ class QoS(virtual.QoS):
p, _, _ = self._remove_from_indices(
tag, client.pipeline().hget(self.unacked_key, tag)).execute()
if p:
- M, EX, RK = loads(p)
+ M, EX, RK = loads(bytes_to_str(p)) # json is unicode
self.channel._do_restore_message(M, EX, RK, client, leftmost)
@cached_property
@@ -419,7 +419,7 @@ class Channel(virtual.Channel):
.hdel(self.unacked_key, tag) \
.execute()
if P:
- M, EX, RK = loads(P)
+ M, EX, RK = loads(bytes_to_str(P)) # json is unicode
self._do_restore_message(M, EX, RK, client, leftmost)
def _restore_at_beginning(self, message):
@@ -533,7 +533,7 @@ class Channel(virtual.Channel):
for pri in PRIORITY_STEPS:
item = client.rpop(self._q_for_pri(queue, pri))
if item:
- return loads(item)
+ return loads(bytes_to_str(item))
raise Empty()
def _size(self, queue):
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index 90f6ac57..c085b469 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -12,6 +12,7 @@ from sqlalchemy.orm import sessionmaker
from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils import cached_property
+from kombu.utils.encoding import bytes_to_str
from .models import (ModelBase, Queue as QueueBase, Message as MessageBase,
class_registry, metadata)
@@ -102,7 +103,7 @@ class Channel(virtual.Channel):
.first()
if msg:
msg.visible = False
- return loads(msg.payload)
+ return loads(bytes_to_str(msg.payload))
raise Empty()
finally:
self.session.commit()
diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py
index 0d524e25..2d1c8abc 100644
--- a/kombu/transport/zookeeper.py
+++ b/kombu/transport/zookeeper.py
@@ -33,6 +33,7 @@ import socket
from anyjson import loads, dumps
from kombu.five import Empty
+from kombu.utils.encoding import bytes_to_str
from . import virtual
@@ -117,7 +118,7 @@ class Channel(virtual.Channel):
if msg is None:
raise Empty()
- return loads(msg)
+ return loads(bytes_to_str(msg))
def _purge(self, queue):
count = 0