summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-02-03 15:38:45 +0000
committerAsk Solem <ask@celeryproject.org>2012-02-03 15:38:45 +0000
commitd1cbcef94cca5ed3bef92a2ac708759dac35f9f6 (patch)
treec9eafbfcfed3130cc3b7a286dfac42fdab7d2f63
parent7b706c40abd8ef128e938acfdbf7b36d8838f1a3 (diff)
downloadkombu-d1cbcef94cca5ed3bef92a2ac708759dac35f9f6.tar.gz
PEP8ify + pyflakes
-rw-r--r--kombu/tests/test_transport_mongodb.py5
-rw-r--r--kombu/transport/mongodb.py79
2 files changed, 46 insertions, 38 deletions
diff --git a/kombu/tests/test_transport_mongodb.py b/kombu/tests/test_transport_mongodb.py
index bd1c06d9..57fa9f14 100644
--- a/kombu/tests/test_transport_mongodb.py
+++ b/kombu/tests/test_transport_mongodb.py
@@ -1,16 +1,12 @@
from __future__ import absolute_import
from __future__ import with_statement
-import socket
-
from ..connection import BrokerConnection
from ..entity import Exchange, Queue
from ..messaging import Consumer, Producer
from .utils import TestCase
-import sys
-
class test_MongoDBTransport(TestCase):
@@ -23,7 +19,6 @@ class test_MongoDBTransport(TestCase):
self.q2 = Queue("test_transport_memory2",
exchange=self.e,
routing_key="test_transport_mongodb2")
-
def test_produce_consume(self):
channel = self.c.channel()
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 2e98fc0b..d924e465 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -13,6 +13,7 @@ from __future__ import absolute_import
from Queue import Empty
import pymongo
+
from pymongo import errors
from anyjson import loads, dumps
from pymongo.connection import Connection
@@ -22,18 +23,21 @@ from . import virtual
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 27017
-__author__ = "Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>; Scott Lyons <scottalyons@gmail.com>"
+__author__ = """\
+Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>;\
+Scott Lyons <scottalyons@gmail.com>;\
+"""
class Channel(virtual.Channel):
_client = None
- supports_fanout=True
+ supports_fanout = True
_fanout_queues = {}
-
+
def __init__(self, *vargs, **kwargs):
super_ = super(Channel, self)
super_.__init__(*vargs, **kwargs)
-
+
self._queue_cursors = {}
self._queue_readcounts = {}
@@ -44,7 +48,7 @@ class Channel(virtual.Channel):
try:
if queue in self._fanout_queues:
msg = self._queue_cursors[queue].next()
- self._queue_readcounts[queue]+=1
+ self._queue_readcounts[queue] += 1
return loads(msg["payload"])
else:
msg = self.client.command("findandmodify", "messages",
@@ -56,20 +60,22 @@ class Channel(virtual.Channel):
raise
except StopIteration:
raise Empty()
-
+
# as of mongo 2.0 empty results won't raise an error
- if msg['value'] is None:
+ if msg["value"] is None:
raise Empty()
return loads(msg["value"]["payload"])
def _size(self, queue):
if queue in self._fanout_queues:
- return self._queue_cursors[queue].count() - self._queue_readcounts[queue]
-
+ return (self._queue_cursors[queue].count() -
+ self._queue_readcounts[queue])
+
return self.client.messages.find({"queue": queue}).count()
def _put(self, queue, message, **kwargs):
- self.client.messages.insert({"payload": dumps(message), "queue": queue})
+ self.client.messages.insert({"payload": dumps(message),
+ "queue": queue})
def _purge(self, queue):
size = self._size(queue)
@@ -100,55 +106,62 @@ class Channel(virtual.Channel):
database = getattr(mongoconn, dbname)
if conninfo.userid:
database.authenticate(conninfo.userid, conninfo.password)
-
+
self.db = database
col = database.messages
col.ensure_index([("queue", 1)])
-
+
if "messages.broadcast" not in database.collection_names():
capsize = conninfo.capped_queue_size or 100000
- database.create_collection("messages.broadcast", size=capsize, capped=True)
-
+ database.create_collection("messages.broadcast", size=capsize,
+ capped=True)
+
self.bcast = getattr(database, "messages.broadcast")
self.bcast.ensure_index([("queue", 1)])
-
+
self.routing = getattr(database, "messages.routing")
self.routing.ensure_index([("queue", 1), ("exchange", 1)])
return database
-
+
def get_table(self, exchange):
- """Get table of bindings for `exchange`."""
- brokerRoutes = self.client.messages.routing.find({"exchange":exchange})
+ """Get table of bindings for ``exchange``."""
+ brokerRoutes = self.client.messages.routing.find({
+ "exchange": exchange})
localRoutes = self.state.exchanges[exchange]["table"]
for route in brokerRoutes:
- localRoutes.append((route["routing_key"], route["pattern"], route["queue"]))
+ localRoutes.append((route["routing_key"],
+ route["pattern"],
+ route["queue"]))
return set(localRoutes)
-
+
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message."""
- self.client.messages.broadcast.insert({"payload": dumps(message), "queue": exchange})
-
+ self.client.messages.broadcast.insert({"payload": dumps(message),
+ "queue": exchange})
+
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == "fanout":
- cursor = self.bcast.find(query={"queue":exchange}, sort=[("$natural", 1)], tailable=True)
+ cursor = self.bcast.find(query={"queue": exchange},
+ sort=[("$natural", 1)], tailable=True)
# Fast forward the cursor past old events
self._queue_cursors[queue] = cursor.skip(cursor.count())
self._queue_readcounts[queue] = cursor.count()
self._fanout_queues[queue] = exchange
-
- meta = dict(exchange=exchange, queue=queue, routing_key=routing_key, pattern=pattern)
+
+ meta = {"exchange": exchange,
+ "queue": queue,
+ "routing_key": routing_key,
+ "pattern": pattern}
self.client.messages.routing.update(meta, meta, upsert=True)
-
-
- def queue_delete(self, queue, if_unusued=False, if_empty=False, **kwargs):
- self.routing.remove({"queue":queue})
- super(Channel, self).queue_delete(queue, if_unusued, if_empty, **kwargs)
+
+ def queue_delete(self, queue, **kwargs):
+ self.routing.remove({"queue": queue})
+ super(Channel, self).queue_delete(queue, **kwargs)
if queue in self._fanout_queues:
self._queue_cursors[queue].close()
- del self._queue_cursors[queue]
- del self._fanout_queues[queue]
-
+ self._queue_cursors.pop(queue, None)
+ self._fanout_queues.pop(queue, None)
@property
def client(self):