diff options
author | Ask Solem <ask@celeryproject.org> | 2012-01-15 18:34:47 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-01-15 18:35:21 +0000 |
commit | 8507bd8dffc8ff42c8b456a2d3b7b0f5d63cbb65 (patch) | |
tree | fa38e776578df3ac3b3f1a4ae3ade4b67c66a459 /kombu/transport | |
parent | 2a0bfebdf703839f29068fb9638f580f2619369d (diff) | |
download | kombu-8507bd8dffc8ff42c8b456a2d3b7b0f5d63cbb65.tar.gz |
Cosmetics
Diffstat (limited to 'kombu/transport')
-rw-r--r-- | kombu/transport/SQS.py | 6 | ||||
-rw-r--r-- | kombu/transport/beanstalk.py | 6 | ||||
-rw-r--r-- | kombu/transport/couchdb.py | 6 | ||||
-rw-r--r-- | kombu/transport/django/__init__.py | 11 | ||||
-rw-r--r-- | kombu/transport/mongodb.py | 6 | ||||
-rw-r--r-- | kombu/transport/redis.py | 12 | ||||
-rw-r--r-- | kombu/transport/sqlalchemy/__init__.py | 6 |
7 files changed, 27 insertions, 26 deletions
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index fd3fc2fb..4ec202ef 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -15,7 +15,7 @@ import string from Queue import Empty -from anyjson import serialize, deserialize +from anyjson import loads, dumps from boto import exception from boto import sdb as _sdb @@ -214,7 +214,7 @@ class Channel(virtual.Channel): """Put message onto queue.""" q = self._new_queue(queue) m = Message() - m.set_body(serialize(message)) + m.set_body(dumps(message)) q.write(m) def _put_fanout(self, exchange, message, **kwargs): @@ -228,7 +228,7 @@ class Channel(virtual.Channel): rs = q.get_messages(1) if rs: m = rs[0] - payload = deserialize(rs[0].get_body()) + payload = loads(rs[0].get_body()) if queue in self._noack_queues: q.delete_message(m) else: diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index e84ab88b..ae15b119 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -14,7 +14,7 @@ import socket from Queue import Empty -from anyjson import serialize, deserialize +from anyjson import loads, dumps from beanstalkc import Connection, BeanstalkcException, SocketError from . import virtual @@ -31,7 +31,7 @@ class Channel(virtual.Channel): item, dest = None, None if job: try: - item = deserialize(job.body) + item = loads(job.body) dest = job.stats()["tube"] except Exception: job.bury() @@ -44,7 +44,7 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): priority = message["properties"]["delivery_info"]["priority"] self.client.use(queue) - self.client.put(serialize(message), priority=priority) + self.client.put(dumps(message), priority=priority) def _get(self, queue): if queue not in self.client.watching(): diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index a964f731..014afee4 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -15,7 +15,7 @@ from Queue import Empty import socket import couchdb -from anyjson import serialize, deserialize +from anyjson import loads, dumps from ..utils import uuid4 from . import virtual @@ -47,7 +47,7 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): self.client.save({'_id': uuid4().hex, 'queue': queue, - 'payload': serialize(message)}) + 'payload': dumps(message)}) def _get(self, queue): result = self._query(queue, limit=1) @@ -56,7 +56,7 @@ class Channel(virtual.Channel): item = result.rows[0].value self.client.delete(item) - return deserialize(item["payload"]) + return loads(item["payload"]) def _purge(self, queue): result = self._query(queue) diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py index b7ced9e8..84e5fcbe 100644 --- a/kombu/transport/django/__init__.py +++ b/kombu/transport/django/__init__.py @@ -3,7 +3,7 @@ from __future__ import absolute_import from Queue import Empty -from anyjson import serialize, deserialize +from anyjson import loads, dumps from django.conf import settings from django.core import exceptions as errors @@ -12,10 +12,11 @@ from .. import virtual from .models import Queue -VERSION = (0, 9, 4) +VERSION = (1, 0, 0) __version__ = ".".join(map(str, VERSION)) -POLLING_INTERVAL = getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0) +POLLING_INTERVAL = getattr(settings, "KOMBU_POLLING_INTERVAL", + getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0)) class Channel(virtual.Channel): @@ -24,7 +25,7 @@ class Channel(virtual.Channel): Queue.objects.get_or_create(name=queue) def _put(self, queue, message, **kwargs): - Queue.objects.publish(queue, serialize(message)) + Queue.objects.publish(queue, dumps(message)) def basic_consume(self, queue, *args, **kwargs): qinfo = self.state.bindings[queue] @@ -37,7 +38,7 @@ class Channel(virtual.Channel): #self.refresh_connection() m = Queue.objects.fetch(queue) if m: - return deserialize(m) + return loads(m) raise Empty() def _size(self, queue): diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 55b30c29..bb649c90 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -14,7 +14,7 @@ from Queue import Empty import pymongo from pymongo import errors -from anyjson import serialize, deserialize +from anyjson import loads, dumps from pymongo.connection import Connection from . import virtual @@ -43,13 +43,13 @@ class Channel(virtual.Channel): # as of mongo 2.0 empty results won't raise an error if msg['value'] is None: raise Empty() - return deserialize(msg["value"]["payload"]) + return loads(msg["value"]["payload"]) def _size(self, queue): return self.client.find({"queue": queue}).count() def _put(self, queue, message, **kwargs): - self.client.insert({"payload": serialize(message), "queue": queue}) + self.client.insert({"payload": dumps(message), "queue": queue}) def _purge(self, queue): size = self._size(queue) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 2985cb07..23d6ef5d 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -12,7 +12,7 @@ from __future__ import absolute_import from Queue import Empty -from anyjson import serialize, deserialize +from anyjson import loads, dumps from ..exceptions import VersionMismatch from ..utils import eventio, cached_property @@ -197,7 +197,7 @@ class Channel(virtual.Channel): if response is not None: payload = self._handle_message(c, response) if payload["type"] == "message": - return (deserialize(payload["data"]), + return (loads(payload["data"]), self._fanout_to_queue[payload["channel"]]) raise Empty() @@ -222,7 +222,7 @@ class Channel(virtual.Channel): raise Empty() if dest__item: dest, item = dest__item - return deserialize(item), dest + return loads(item), dest else: raise Empty() finally: @@ -244,7 +244,7 @@ class Channel(virtual.Channel): """ item = self._avail_client.rpop(queue) if item: - return deserialize(item) + return loads(item) raise Empty() def _size(self, queue): @@ -252,11 +252,11 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): """Deliver message.""" - self._avail_client.lpush(queue, serialize(message)) + self._avail_client.lpush(queue, dumps(message)) def _put_fanout(self, exchange, message, **kwargs): """Deliver fanout message.""" - self._avail_client.publish(exchange, serialize(message)) + self._avail_client.publish(exchange, dumps(message)) def _queue_bind(self, exchange, routing_key, pattern, queue): if self.typeof(exchange).type == "fanout": diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index 6c8e73e5..8f7c25a2 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -2,7 +2,7 @@ from Queue import Empty -from anyjson import serialize, deserialize +from anyjson import loads, dumps from sqlalchemy import create_engine from sqlalchemy.exc import OperationalError from sqlalchemy.orm import sessionmaker @@ -59,7 +59,7 @@ class Channel(virtual.Channel): def _put(self, queue, payload, **kwargs): obj = self._get_or_create(queue) - message = Message(serialize(payload), obj) + message = Message(dumps(payload), obj) self.session.add(message) try: self.session.commit() @@ -81,7 +81,7 @@ class Channel(virtual.Channel): .first() if msg: msg.visible = False - return deserialize(msg.payload) + return loads(msg.payload) raise Empty() finally: self.session.commit() |