summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
Diffstat (limited to 'kombu')
-rw-r--r--kombu/tests/__init__.py18
-rw-r--r--kombu/tests/mocks.py2
-rw-r--r--kombu/tests/test_messaging.py8
-rw-r--r--kombu/tests/test_pools.py1
-rw-r--r--kombu/tests/test_transport_redis.py6
-rw-r--r--kombu/transport/SQS.py6
-rw-r--r--kombu/transport/beanstalk.py6
-rw-r--r--kombu/transport/couchdb.py6
-rw-r--r--kombu/transport/django/__init__.py11
-rw-r--r--kombu/transport/mongodb.py6
-rw-r--r--kombu/transport/redis.py12
-rw-r--r--kombu/transport/sqlalchemy/__init__.py6
-rw-r--r--kombu/utils/log.py27
13 files changed, 49 insertions, 66 deletions
diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py
index 0abda411..efd71436 100644
--- a/kombu/tests/__init__.py
+++ b/kombu/tests/__init__.py
@@ -12,30 +12,40 @@ except ImportError:
anyjson.force_implementation("simplejson")
moduleindex = ("kombu.abstract",
- "kombu.compat",
- "kombu.common",
"kombu.clocks",
+ "kombu.common",
+ "kombu.compat",
"kombu.compression",
"kombu.connection",
"kombu.entity",
"kombu.exceptions",
+ "kombu.log",
"kombu.messaging",
+ "kombu.mixins",
"kombu.pidbox",
"kombu.pools",
"kombu.serialization",
"kombu.simple",
"kombu.utils",
"kombu.utils.compat",
+ "kombu.utils.debug",
+ "kombu.utils.encoding",
+ "kombu.utils.functional",
+ "kombu.utils.limits",
"kombu.transport",
+ "kombu.transport.amqplib",
"kombu.transport.base",
"kombu.transport.beanstalk",
+ "kombu.transport.couchdb",
+ "kombu.transport.django",
+ "kombu.transport.django.managers",
+ "kombu.transport.django.models",
"kombu.transport.memory",
"kombu.transport.mongodb",
- "kombu.transport.amqplib",
- "kombu.transport.couchdb",
"kombu.transport.pika",
"kombu.transport.redis",
"kombu.transport.SQS",
+ "kombu.transport.sqlalchemy",
"kombu.transport.virtual",
"kombu.transport.virtual.exchange",
"kombu.transport.virtual.scheduling")
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py
index ac94c4b9..ce0a3897 100644
--- a/kombu/tests/mocks.py
+++ b/kombu/tests/mocks.py
@@ -96,7 +96,7 @@ class Channel(base.StdChannel):
def message_to_python(self, message, *args, **kwargs):
self._called("message_to_python")
- return Message(self, body=anyjson.serialize(message),
+ return Message(self, body=anyjson.dumps(message),
delivery_tag=self.deliveries(),
throw_decode_error=self.throw_decode_error,
content_type="application/json", content_encoding="utf-8")
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 13ba513f..85ebd65c 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -61,7 +61,7 @@ class test_Producer(TestCase):
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer="json")
m, ctype, cencoding = p._prepare(message, headers={})
- self.assertDictEqual(message, anyjson.deserialize(m))
+ self.assertDictEqual(message, anyjson.loads(m))
self.assertEqual(ctype, "application/json")
self.assertEqual(cencoding, "utf-8")
@@ -76,7 +76,7 @@ class test_Producer(TestCase):
self.assertEqual(cencoding, "utf-8")
self.assertEqual(headers["compression"], "application/x-gzip")
import zlib
- self.assertEqual(anyjson.deserialize(
+ self.assertEqual(anyjson.loads(
zlib.decompress(m).decode("utf-8")), message)
def test_prepare_custom_content_type(self):
@@ -155,7 +155,7 @@ class test_Producer(TestCase):
self.assertIn("basic_publish", channel)
m, exc, rkey = ret
- self.assertDictEqual(message, anyjson.deserialize(m["body"]))
+ self.assertDictEqual(message, anyjson.loads(m["body"]))
self.assertDictContainsSubset({"content_type": "application/json",
"content_encoding": "utf-8",
"priority": 0}, m)
@@ -476,7 +476,7 @@ class test_Consumer(TestCase):
self.assertTrue(thrown)
m, exc = thrown[0]
- self.assertEqual(anyjson.deserialize(m), {"foo": "bar"})
+ self.assertEqual(anyjson.loads(m), {"foo": "bar"})
self.assertIsInstance(exc, ValueError)
def test_recover(self):
diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py
index 8556926f..ccc80d8f 100644
--- a/kombu/tests/test_pools.py
+++ b/kombu/tests/test_pools.py
@@ -178,7 +178,6 @@ class test_PoolGroup(TestCase):
pools.set_limit(pools.get_limit())
-
class test_fun_PoolGroup(TestCase):
def test_connections_behavior(self):
diff --git a/kombu/tests/test_transport_redis.py b/kombu/tests/test_transport_redis.py
index bae6b23a..4291a15b 100644
--- a/kombu/tests/test_transport_redis.py
+++ b/kombu/tests/test_transport_redis.py
@@ -4,7 +4,7 @@ from __future__ import with_statement
import socket
import types
-from anyjson import serialize
+from anyjson import dumps
from itertools import count
from Queue import Empty, Queue as _Queue
@@ -265,7 +265,7 @@ class test_Channel(TestCase):
s = self.channel.subclient = Mock()
self.channel._fanout_to_queue["a"] = "b"
s.parse_response.return_value = ["message", "a",
- serialize({"hello": "world"})]
+ dumps({"hello": "world"})]
payload, queue = self.channel._receive()
self.assertDictEqual(payload, {"hello": "world"})
self.assertEqual(queue, "b")
@@ -325,7 +325,7 @@ class test_Channel(TestCase):
body = {"hello": "world"}
self.channel._put_fanout("exchange", body)
- c.publish.assert_called_with("exchange", serialize(body))
+ c.publish.assert_called_with("exchange", dumps(body))
def test_delete(self):
x = self.channel
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index fd3fc2fb..4ec202ef 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -15,7 +15,7 @@ import string
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from boto import exception
from boto import sdb as _sdb
@@ -214,7 +214,7 @@ class Channel(virtual.Channel):
"""Put message onto queue."""
q = self._new_queue(queue)
m = Message()
- m.set_body(serialize(message))
+ m.set_body(dumps(message))
q.write(m)
def _put_fanout(self, exchange, message, **kwargs):
@@ -228,7 +228,7 @@ class Channel(virtual.Channel):
rs = q.get_messages(1)
if rs:
m = rs[0]
- payload = deserialize(rs[0].get_body())
+ payload = loads(rs[0].get_body())
if queue in self._noack_queues:
q.delete_message(m)
else:
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index e84ab88b..ae15b119 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -14,7 +14,7 @@ import socket
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from beanstalkc import Connection, BeanstalkcException, SocketError
from . import virtual
@@ -31,7 +31,7 @@ class Channel(virtual.Channel):
item, dest = None, None
if job:
try:
- item = deserialize(job.body)
+ item = loads(job.body)
dest = job.stats()["tube"]
except Exception:
job.bury()
@@ -44,7 +44,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
priority = message["properties"]["delivery_info"]["priority"]
self.client.use(queue)
- self.client.put(serialize(message), priority=priority)
+ self.client.put(dumps(message), priority=priority)
def _get(self, queue):
if queue not in self.client.watching():
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index a964f731..014afee4 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -15,7 +15,7 @@ from Queue import Empty
import socket
import couchdb
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from ..utils import uuid4
from . import virtual
@@ -47,7 +47,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
self.client.save({'_id': uuid4().hex,
'queue': queue,
- 'payload': serialize(message)})
+ 'payload': dumps(message)})
def _get(self, queue):
result = self._query(queue, limit=1)
@@ -56,7 +56,7 @@ class Channel(virtual.Channel):
item = result.rows[0].value
self.client.delete(item)
- return deserialize(item["payload"])
+ return loads(item["payload"])
def _purge(self, queue):
result = self._query(queue)
diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py
index b7ced9e8..84e5fcbe 100644
--- a/kombu/transport/django/__init__.py
+++ b/kombu/transport/django/__init__.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from django.conf import settings
from django.core import exceptions as errors
@@ -12,10 +12,11 @@ from .. import virtual
from .models import Queue
-VERSION = (0, 9, 4)
+VERSION = (1, 0, 0)
__version__ = ".".join(map(str, VERSION))
-POLLING_INTERVAL = getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0)
+POLLING_INTERVAL = getattr(settings, "KOMBU_POLLING_INTERVAL",
+ getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0))
class Channel(virtual.Channel):
@@ -24,7 +25,7 @@ class Channel(virtual.Channel):
Queue.objects.get_or_create(name=queue)
def _put(self, queue, message, **kwargs):
- Queue.objects.publish(queue, serialize(message))
+ Queue.objects.publish(queue, dumps(message))
def basic_consume(self, queue, *args, **kwargs):
qinfo = self.state.bindings[queue]
@@ -37,7 +38,7 @@ class Channel(virtual.Channel):
#self.refresh_connection()
m = Queue.objects.fetch(queue)
if m:
- return deserialize(m)
+ return loads(m)
raise Empty()
def _size(self, queue):
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 55b30c29..bb649c90 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -14,7 +14,7 @@ from Queue import Empty
import pymongo
from pymongo import errors
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from pymongo.connection import Connection
from . import virtual
@@ -43,13 +43,13 @@ class Channel(virtual.Channel):
# as of mongo 2.0 empty results won't raise an error
if msg['value'] is None:
raise Empty()
- return deserialize(msg["value"]["payload"])
+ return loads(msg["value"]["payload"])
def _size(self, queue):
return self.client.find({"queue": queue}).count()
def _put(self, queue, message, **kwargs):
- self.client.insert({"payload": serialize(message), "queue": queue})
+ self.client.insert({"payload": dumps(message), "queue": queue})
def _purge(self, queue):
size = self._size(queue)
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 2985cb07..23d6ef5d 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -12,7 +12,7 @@ from __future__ import absolute_import
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from ..exceptions import VersionMismatch
from ..utils import eventio, cached_property
@@ -197,7 +197,7 @@ class Channel(virtual.Channel):
if response is not None:
payload = self._handle_message(c, response)
if payload["type"] == "message":
- return (deserialize(payload["data"]),
+ return (loads(payload["data"]),
self._fanout_to_queue[payload["channel"]])
raise Empty()
@@ -222,7 +222,7 @@ class Channel(virtual.Channel):
raise Empty()
if dest__item:
dest, item = dest__item
- return deserialize(item), dest
+ return loads(item), dest
else:
raise Empty()
finally:
@@ -244,7 +244,7 @@ class Channel(virtual.Channel):
"""
item = self._avail_client.rpop(queue)
if item:
- return deserialize(item)
+ return loads(item)
raise Empty()
def _size(self, queue):
@@ -252,11 +252,11 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
"""Deliver message."""
- self._avail_client.lpush(queue, serialize(message))
+ self._avail_client.lpush(queue, dumps(message))
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message."""
- self._avail_client.publish(exchange, serialize(message))
+ self._avail_client.publish(exchange, dumps(message))
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == "fanout":
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index 6c8e73e5..8f7c25a2 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -2,7 +2,7 @@
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker
@@ -59,7 +59,7 @@ class Channel(virtual.Channel):
def _put(self, queue, payload, **kwargs):
obj = self._get_or_create(queue)
- message = Message(serialize(payload), obj)
+ message = Message(dumps(payload), obj)
self.session.add(message)
try:
self.session.commit()
@@ -81,7 +81,7 @@ class Channel(virtual.Channel):
.first()
if msg:
msg.visible = False
- return deserialize(msg.payload)
+ return loads(msg.payload)
raise Empty()
finally:
self.session.commit()
diff --git a/kombu/utils/log.py b/kombu/utils/log.py
deleted file mode 100644
index f8c694bc..00000000
--- a/kombu/utils/log.py
+++ /dev/null
@@ -1,27 +0,0 @@
-"""
-kombu.utils.log
-===============
-
-Logging utilities.
-
-:copyright: (c) 2009 - 2012 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
-"""
-from __future__ import absolute_import
-
-import logging
-
-
-class NullHandler(logging.Handler):
-
- def emit(self, record):
- pass
-
-
-def get_logger(logger):
- if isinstance(logger, basestring):
- logger = logging.getLogger(logger)
- if not logger.handlers:
- logger.addHandler(NullHandler())
- return logger