summaryrefslogtreecommitdiff
path: root/kombu/transport/mongodb.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r--kombu/transport/mongodb.py94
1 files changed, 47 insertions, 47 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index c4283d74..e8342425 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -22,7 +22,7 @@ from kombu.exceptions import StdChannelError
from . import virtual
-DEFAULT_HOST = "127.0.0.1"
+DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 27017
__author__ = """\
@@ -51,33 +51,33 @@ class Channel(virtual.Channel):
if queue in self._fanout_queues:
msg = self._queue_cursors[queue].next()
self._queue_readcounts[queue] += 1
- return loads(msg["payload"])
+ return loads(msg['payload'])
else:
- msg = self.client.command("findandmodify", "messages",
- query={"queue": queue},
- sort={"_id": pymongo.ASCENDING}, remove=True)
+ msg = self.client.command('findandmodify', 'messages',
+ query={'queue': queue},
+ sort={'_id': pymongo.ASCENDING}, remove=True)
except errors.OperationFailure, exc:
- if "No matching object found" in exc.args[0]:
+ if 'No matching object found' in exc.args[0]:
raise Empty()
raise
except StopIteration:
raise Empty()
# as of mongo 2.0 empty results won't raise an error
- if msg["value"] is None:
+ if msg['value'] is None:
raise Empty()
- return loads(msg["value"]["payload"])
+ return loads(msg['value']['payload'])
def _size(self, queue):
if queue in self._fanout_queues:
return (self._queue_cursors[queue].count() -
self._queue_readcounts[queue])
- return self.client.messages.find({"queue": queue}).count()
+ return self.client.messages.find({'queue': queue}).count()
def _put(self, queue, message, **kwargs):
- self.client.messages.insert({"payload": dumps(message),
- "queue": queue})
+ self.client.messages.insert({'payload': dumps(message),
+ 'queue': queue})
def _purge(self, queue):
size = self._size(queue)
@@ -86,7 +86,7 @@ class Channel(virtual.Channel):
cursor.rewind()
self._queue_cursors[queue] = cursor.skip(cursor.count())
else:
- self.client.messages.remove({"queue": queue})
+ self.client.messages.remove({'queue': queue})
return size
def close(self):
@@ -107,18 +107,18 @@ class Channel(virtual.Channel):
if not conninfo.hostname:
conninfo.hostname = DEFAULT_HOST
- for part in conninfo.hostname.split("/"):
+ for part in conninfo.hostname.split('/'):
if not hostname:
- hostname = "mongodb://" + part
+ hostname = 'mongodb://' + part
continue
dbname = part
- if "?" in part:
+ if '?' in part:
# In case someone is passing options
# to the mongodb connection. Right now
# it is not permitted by kombu
- dbname, options = part.split("?")
- hostname += "/?" + options
+ dbname, options = part.split('?')
+ hostname += '/?' + options
# At this point we expect the hostname to be something like
# (considering replica set form too):
@@ -126,14 +126,14 @@ class Channel(virtual.Channel):
# mongodb://[username:password@]host1[:port1][,host2[:port2],
# ...[,hostN[:portN]]][/[?options]]
mongoconn = Connection(host=hostname)
- version = mongoconn.server_info()["version"]
- if tuple(map(int, version.split(".")[:2])) < (1, 3):
+ version = mongoconn.server_info()['version']
+ if tuple(map(int, version.split('.')[:2])) < (1, 3):
raise NotImplementedError(
- "Kombu requires MongoDB version 1.3+, but connected to %s" % (
+ 'Kombu requires MongoDB version 1.3+, but connected to %s' % (
version, ))
- if not dbname or dbname == "/":
- dbname = "kombu_default"
+ if not dbname or dbname == '/':
+ dbname = 'kombu_default'
database = getattr(mongoconn, dbname)
@@ -143,56 +143,56 @@ class Channel(virtual.Channel):
self.db = database
col = database.messages
- col.ensure_index([("queue", 1)])
+ col.ensure_index([('queue', 1)])
- if "messages.broadcast" not in database.collection_names():
+ if 'messages.broadcast' not in database.collection_names():
capsize = conninfo.transport_options.get(
- "capped_queue_size") or 100000
- database.create_collection("messages.broadcast", size=capsize,
+ 'capped_queue_size') or 100000
+ database.create_collection('messages.broadcast', size=capsize,
capped=True)
- self.bcast = getattr(database, "messages.broadcast")
- self.bcast.ensure_index([("queue", 1)])
+ self.bcast = getattr(database, 'messages.broadcast')
+ self.bcast.ensure_index([('queue', 1)])
- self.routing = getattr(database, "messages.routing")
- self.routing.ensure_index([("queue", 1), ("exchange", 1)])
+ self.routing = getattr(database, 'messages.routing')
+ self.routing.ensure_index([('queue', 1), ('exchange', 1)])
return database
#TODO: Store a more complete exchange metatable in the routing collection
def get_table(self, exchange):
"""Get table of bindings for ``exchange``."""
brokerRoutes = self.client.messages.routing.find({
- "exchange": exchange})
+ 'exchange': exchange})
- localRoutes = self.state.exchanges[exchange]["table"]
+ localRoutes = self.state.exchanges[exchange]['table']
for route in brokerRoutes:
- localRoutes.append((route["routing_key"],
- route["pattern"],
- route["queue"]))
+ localRoutes.append((route['routing_key'],
+ route['pattern'],
+ route['queue']))
return set(localRoutes)
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message."""
- self.client.messages.broadcast.insert({"payload": dumps(message),
- "queue": exchange})
+ self.client.messages.broadcast.insert({'payload': dumps(message),
+ 'queue': exchange})
def _queue_bind(self, exchange, routing_key, pattern, queue):
- if self.typeof(exchange).type == "fanout":
- cursor = self.bcast.find(query={"queue": exchange},
- sort=[("$natural", 1)], tailable=True)
+ if self.typeof(exchange).type == 'fanout':
+ cursor = self.bcast.find(query={'queue': exchange},
+ sort=[('$natural', 1)], tailable=True)
# Fast forward the cursor past old events
self._queue_cursors[queue] = cursor.skip(cursor.count())
self._queue_readcounts[queue] = cursor.count()
self._fanout_queues[queue] = exchange
- meta = {"exchange": exchange,
- "queue": queue,
- "routing_key": routing_key,
- "pattern": pattern}
+ meta = {'exchange': exchange,
+ 'queue': queue,
+ 'routing_key': routing_key,
+ 'pattern': pattern}
self.client.messages.routing.update(meta, meta, upsert=True)
def queue_delete(self, queue, **kwargs):
- self.routing.remove({"queue": queue})
+ self.routing.remove({'queue': queue})
super(Channel, self).queue_delete(queue, **kwargs)
if queue in self._fanout_queues:
self._queue_cursors[queue].close()
@@ -215,8 +215,8 @@ class Transport(virtual.Transport):
channel_errors = (StdChannelError,
errors.ConnectionFailure,
errors.OperationFailure, )
- driver_type = "mongodb"
- driver_name = "pymongo"
+ driver_type = 'mongodb'
+ driver_name = 'pymongo'
def driver_version(self):
return pymongo.version