diff options
author | Ask Solem <ask@celeryproject.org> | 2012-02-03 15:38:45 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-02-03 15:38:45 +0000 |
commit | d1cbcef94cca5ed3bef92a2ac708759dac35f9f6 (patch) | |
tree | c9eafbfcfed3130cc3b7a286dfac42fdab7d2f63 | |
parent | 7b706c40abd8ef128e938acfdbf7b36d8838f1a3 (diff) | |
download | kombu-d1cbcef94cca5ed3bef92a2ac708759dac35f9f6.tar.gz |
PEP8ify + pyflakes
-rw-r--r-- | kombu/tests/test_transport_mongodb.py | 5 | ||||
-rw-r--r-- | kombu/transport/mongodb.py | 79 |
2 files changed, 46 insertions, 38 deletions
diff --git a/kombu/tests/test_transport_mongodb.py b/kombu/tests/test_transport_mongodb.py index bd1c06d9..57fa9f14 100644 --- a/kombu/tests/test_transport_mongodb.py +++ b/kombu/tests/test_transport_mongodb.py @@ -1,16 +1,12 @@ from __future__ import absolute_import from __future__ import with_statement -import socket - from ..connection import BrokerConnection from ..entity import Exchange, Queue from ..messaging import Consumer, Producer from .utils import TestCase -import sys - class test_MongoDBTransport(TestCase): @@ -23,7 +19,6 @@ class test_MongoDBTransport(TestCase): self.q2 = Queue("test_transport_memory2", exchange=self.e, routing_key="test_transport_mongodb2") - def test_produce_consume(self): channel = self.c.channel() diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 2e98fc0b..d924e465 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -13,6 +13,7 @@ from __future__ import absolute_import from Queue import Empty import pymongo + from pymongo import errors from anyjson import loads, dumps from pymongo.connection import Connection @@ -22,18 +23,21 @@ from . import virtual DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 27017 -__author__ = "Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>; Scott Lyons <scottalyons@gmail.com>" +__author__ = """\ +Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>;\ +Scott Lyons <scottalyons@gmail.com>;\ +""" class Channel(virtual.Channel): _client = None - supports_fanout=True + supports_fanout = True _fanout_queues = {} - + def __init__(self, *vargs, **kwargs): super_ = super(Channel, self) super_.__init__(*vargs, **kwargs) - + self._queue_cursors = {} self._queue_readcounts = {} @@ -44,7 +48,7 @@ class Channel(virtual.Channel): try: if queue in self._fanout_queues: msg = self._queue_cursors[queue].next() - self._queue_readcounts[queue]+=1 + self._queue_readcounts[queue] += 1 return loads(msg["payload"]) else: msg = self.client.command("findandmodify", "messages", @@ -56,20 +60,22 @@ class Channel(virtual.Channel): raise except StopIteration: raise Empty() - + # as of mongo 2.0 empty results won't raise an error - if msg['value'] is None: + if msg["value"] is None: raise Empty() return loads(msg["value"]["payload"]) def _size(self, queue): if queue in self._fanout_queues: - return self._queue_cursors[queue].count() - self._queue_readcounts[queue] - + return (self._queue_cursors[queue].count() - + self._queue_readcounts[queue]) + return self.client.messages.find({"queue": queue}).count() def _put(self, queue, message, **kwargs): - self.client.messages.insert({"payload": dumps(message), "queue": queue}) + self.client.messages.insert({"payload": dumps(message), + "queue": queue}) def _purge(self, queue): size = self._size(queue) @@ -100,55 +106,62 @@ class Channel(virtual.Channel): database = getattr(mongoconn, dbname) if conninfo.userid: database.authenticate(conninfo.userid, conninfo.password) - + self.db = database col = database.messages col.ensure_index([("queue", 1)]) - + if "messages.broadcast" not in database.collection_names(): capsize = conninfo.capped_queue_size or 100000 - database.create_collection("messages.broadcast", size=capsize, capped=True) - + database.create_collection("messages.broadcast", size=capsize, + capped=True) + self.bcast = getattr(database, "messages.broadcast") self.bcast.ensure_index([("queue", 1)]) - + self.routing = getattr(database, "messages.routing") self.routing.ensure_index([("queue", 1), ("exchange", 1)]) return database - + def get_table(self, exchange): - """Get table of bindings for `exchange`.""" - brokerRoutes = self.client.messages.routing.find({"exchange":exchange}) + """Get table of bindings for ``exchange``.""" + brokerRoutes = self.client.messages.routing.find({ + "exchange": exchange}) localRoutes = self.state.exchanges[exchange]["table"] for route in brokerRoutes: - localRoutes.append((route["routing_key"], route["pattern"], route["queue"])) + localRoutes.append((route["routing_key"], + route["pattern"], + route["queue"])) return set(localRoutes) - + def _put_fanout(self, exchange, message, **kwargs): """Deliver fanout message.""" - self.client.messages.broadcast.insert({"payload": dumps(message), "queue": exchange}) - + self.client.messages.broadcast.insert({"payload": dumps(message), + "queue": exchange}) + def _queue_bind(self, exchange, routing_key, pattern, queue): if self.typeof(exchange).type == "fanout": - cursor = self.bcast.find(query={"queue":exchange}, sort=[("$natural", 1)], tailable=True) + cursor = self.bcast.find(query={"queue": exchange}, + sort=[("$natural", 1)], tailable=True) # Fast forward the cursor past old events self._queue_cursors[queue] = cursor.skip(cursor.count()) self._queue_readcounts[queue] = cursor.count() self._fanout_queues[queue] = exchange - - meta = dict(exchange=exchange, queue=queue, routing_key=routing_key, pattern=pattern) + + meta = {"exchange": exchange, + "queue": queue, + "routing_key": routing_key, + "pattern": pattern} self.client.messages.routing.update(meta, meta, upsert=True) - - - def queue_delete(self, queue, if_unusued=False, if_empty=False, **kwargs): - self.routing.remove({"queue":queue}) - super(Channel, self).queue_delete(queue, if_unusued, if_empty, **kwargs) + + def queue_delete(self, queue, **kwargs): + self.routing.remove({"queue": queue}) + super(Channel, self).queue_delete(queue, **kwargs) if queue in self._fanout_queues: self._queue_cursors[queue].close() - del self._queue_cursors[queue] - del self._fanout_queues[queue] - + self._queue_cursors.pop(queue, None) + self._fanout_queues.pop(queue, None) @property def client(self): |