summaryrefslogtreecommitdiff
path: root/kombu/transport
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-01-15 18:34:47 +0000
committerAsk Solem <ask@celeryproject.org>2012-01-15 18:35:21 +0000
commit8507bd8dffc8ff42c8b456a2d3b7b0f5d63cbb65 (patch)
treefa38e776578df3ac3b3f1a4ae3ade4b67c66a459 /kombu/transport
parent2a0bfebdf703839f29068fb9638f580f2619369d (diff)
downloadkombu-8507bd8dffc8ff42c8b456a2d3b7b0f5d63cbb65.tar.gz
Cosmetics
Diffstat (limited to 'kombu/transport')
-rw-r--r--kombu/transport/SQS.py6
-rw-r--r--kombu/transport/beanstalk.py6
-rw-r--r--kombu/transport/couchdb.py6
-rw-r--r--kombu/transport/django/__init__.py11
-rw-r--r--kombu/transport/mongodb.py6
-rw-r--r--kombu/transport/redis.py12
-rw-r--r--kombu/transport/sqlalchemy/__init__.py6
7 files changed, 27 insertions, 26 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index fd3fc2fb..4ec202ef 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -15,7 +15,7 @@ import string
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from boto import exception
from boto import sdb as _sdb
@@ -214,7 +214,7 @@ class Channel(virtual.Channel):
"""Put message onto queue."""
q = self._new_queue(queue)
m = Message()
- m.set_body(serialize(message))
+ m.set_body(dumps(message))
q.write(m)
def _put_fanout(self, exchange, message, **kwargs):
@@ -228,7 +228,7 @@ class Channel(virtual.Channel):
rs = q.get_messages(1)
if rs:
m = rs[0]
- payload = deserialize(rs[0].get_body())
+ payload = loads(rs[0].get_body())
if queue in self._noack_queues:
q.delete_message(m)
else:
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index e84ab88b..ae15b119 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -14,7 +14,7 @@ import socket
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from beanstalkc import Connection, BeanstalkcException, SocketError
from . import virtual
@@ -31,7 +31,7 @@ class Channel(virtual.Channel):
item, dest = None, None
if job:
try:
- item = deserialize(job.body)
+ item = loads(job.body)
dest = job.stats()["tube"]
except Exception:
job.bury()
@@ -44,7 +44,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
priority = message["properties"]["delivery_info"]["priority"]
self.client.use(queue)
- self.client.put(serialize(message), priority=priority)
+ self.client.put(dumps(message), priority=priority)
def _get(self, queue):
if queue not in self.client.watching():
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index a964f731..014afee4 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -15,7 +15,7 @@ from Queue import Empty
import socket
import couchdb
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from ..utils import uuid4
from . import virtual
@@ -47,7 +47,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
self.client.save({'_id': uuid4().hex,
'queue': queue,
- 'payload': serialize(message)})
+ 'payload': dumps(message)})
def _get(self, queue):
result = self._query(queue, limit=1)
@@ -56,7 +56,7 @@ class Channel(virtual.Channel):
item = result.rows[0].value
self.client.delete(item)
- return deserialize(item["payload"])
+ return loads(item["payload"])
def _purge(self, queue):
result = self._query(queue)
diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py
index b7ced9e8..84e5fcbe 100644
--- a/kombu/transport/django/__init__.py
+++ b/kombu/transport/django/__init__.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from django.conf import settings
from django.core import exceptions as errors
@@ -12,10 +12,11 @@ from .. import virtual
from .models import Queue
-VERSION = (0, 9, 4)
+VERSION = (1, 0, 0)
__version__ = ".".join(map(str, VERSION))
-POLLING_INTERVAL = getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0)
+POLLING_INTERVAL = getattr(settings, "KOMBU_POLLING_INTERVAL",
+ getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0))
class Channel(virtual.Channel):
@@ -24,7 +25,7 @@ class Channel(virtual.Channel):
Queue.objects.get_or_create(name=queue)
def _put(self, queue, message, **kwargs):
- Queue.objects.publish(queue, serialize(message))
+ Queue.objects.publish(queue, dumps(message))
def basic_consume(self, queue, *args, **kwargs):
qinfo = self.state.bindings[queue]
@@ -37,7 +38,7 @@ class Channel(virtual.Channel):
#self.refresh_connection()
m = Queue.objects.fetch(queue)
if m:
- return deserialize(m)
+ return loads(m)
raise Empty()
def _size(self, queue):
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 55b30c29..bb649c90 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -14,7 +14,7 @@ from Queue import Empty
import pymongo
from pymongo import errors
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from pymongo.connection import Connection
from . import virtual
@@ -43,13 +43,13 @@ class Channel(virtual.Channel):
# as of mongo 2.0 empty results won't raise an error
if msg['value'] is None:
raise Empty()
- return deserialize(msg["value"]["payload"])
+ return loads(msg["value"]["payload"])
def _size(self, queue):
return self.client.find({"queue": queue}).count()
def _put(self, queue, message, **kwargs):
- self.client.insert({"payload": serialize(message), "queue": queue})
+ self.client.insert({"payload": dumps(message), "queue": queue})
def _purge(self, queue):
size = self._size(queue)
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 2985cb07..23d6ef5d 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -12,7 +12,7 @@ from __future__ import absolute_import
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from ..exceptions import VersionMismatch
from ..utils import eventio, cached_property
@@ -197,7 +197,7 @@ class Channel(virtual.Channel):
if response is not None:
payload = self._handle_message(c, response)
if payload["type"] == "message":
- return (deserialize(payload["data"]),
+ return (loads(payload["data"]),
self._fanout_to_queue[payload["channel"]])
raise Empty()
@@ -222,7 +222,7 @@ class Channel(virtual.Channel):
raise Empty()
if dest__item:
dest, item = dest__item
- return deserialize(item), dest
+ return loads(item), dest
else:
raise Empty()
finally:
@@ -244,7 +244,7 @@ class Channel(virtual.Channel):
"""
item = self._avail_client.rpop(queue)
if item:
- return deserialize(item)
+ return loads(item)
raise Empty()
def _size(self, queue):
@@ -252,11 +252,11 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
"""Deliver message."""
- self._avail_client.lpush(queue, serialize(message))
+ self._avail_client.lpush(queue, dumps(message))
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message."""
- self._avail_client.publish(exchange, serialize(message))
+ self._avail_client.publish(exchange, dumps(message))
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == "fanout":
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index 6c8e73e5..8f7c25a2 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -2,7 +2,7 @@
from Queue import Empty
-from anyjson import serialize, deserialize
+from anyjson import loads, dumps
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker
@@ -59,7 +59,7 @@ class Channel(virtual.Channel):
def _put(self, queue, payload, **kwargs):
obj = self._get_or_create(queue)
- message = Message(serialize(payload), obj)
+ message = Message(dumps(payload), obj)
self.session.add(message)
try:
self.session.commit()
@@ -81,7 +81,7 @@ class Channel(virtual.Channel):
.first()
if msg:
msg.visible = False
- return deserialize(msg.payload)
+ return loads(msg.payload)
raise Empty()
finally:
self.session.commit()