diff options
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r-- | kombu/transport/mongodb.py | 94 |
1 files changed, 47 insertions, 47 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index c4283d74..e8342425 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -22,7 +22,7 @@ from kombu.exceptions import StdChannelError from . import virtual -DEFAULT_HOST = "127.0.0.1" +DEFAULT_HOST = '127.0.0.1' DEFAULT_PORT = 27017 __author__ = """\ @@ -51,33 +51,33 @@ class Channel(virtual.Channel): if queue in self._fanout_queues: msg = self._queue_cursors[queue].next() self._queue_readcounts[queue] += 1 - return loads(msg["payload"]) + return loads(msg['payload']) else: - msg = self.client.command("findandmodify", "messages", - query={"queue": queue}, - sort={"_id": pymongo.ASCENDING}, remove=True) + msg = self.client.command('findandmodify', 'messages', + query={'queue': queue}, + sort={'_id': pymongo.ASCENDING}, remove=True) except errors.OperationFailure, exc: - if "No matching object found" in exc.args[0]: + if 'No matching object found' in exc.args[0]: raise Empty() raise except StopIteration: raise Empty() # as of mongo 2.0 empty results won't raise an error - if msg["value"] is None: + if msg['value'] is None: raise Empty() - return loads(msg["value"]["payload"]) + return loads(msg['value']['payload']) def _size(self, queue): if queue in self._fanout_queues: return (self._queue_cursors[queue].count() - self._queue_readcounts[queue]) - return self.client.messages.find({"queue": queue}).count() + return self.client.messages.find({'queue': queue}).count() def _put(self, queue, message, **kwargs): - self.client.messages.insert({"payload": dumps(message), - "queue": queue}) + self.client.messages.insert({'payload': dumps(message), + 'queue': queue}) def _purge(self, queue): size = self._size(queue) @@ -86,7 +86,7 @@ class Channel(virtual.Channel): cursor.rewind() self._queue_cursors[queue] = cursor.skip(cursor.count()) else: - self.client.messages.remove({"queue": queue}) + self.client.messages.remove({'queue': queue}) return size def close(self): @@ -107,18 +107,18 @@ class Channel(virtual.Channel): if not conninfo.hostname: conninfo.hostname = DEFAULT_HOST - for part in conninfo.hostname.split("/"): + for part in conninfo.hostname.split('/'): if not hostname: - hostname = "mongodb://" + part + hostname = 'mongodb://' + part continue dbname = part - if "?" in part: + if '?' in part: # In case someone is passing options # to the mongodb connection. Right now # it is not permitted by kombu - dbname, options = part.split("?") - hostname += "/?" + options + dbname, options = part.split('?') + hostname += '/?' + options # At this point we expect the hostname to be something like # (considering replica set form too): @@ -126,14 +126,14 @@ class Channel(virtual.Channel): # mongodb://[username:password@]host1[:port1][,host2[:port2], # ...[,hostN[:portN]]][/[?options]] mongoconn = Connection(host=hostname) - version = mongoconn.server_info()["version"] - if tuple(map(int, version.split(".")[:2])) < (1, 3): + version = mongoconn.server_info()['version'] + if tuple(map(int, version.split('.')[:2])) < (1, 3): raise NotImplementedError( - "Kombu requires MongoDB version 1.3+, but connected to %s" % ( + 'Kombu requires MongoDB version 1.3+, but connected to %s' % ( version, )) - if not dbname or dbname == "/": - dbname = "kombu_default" + if not dbname or dbname == '/': + dbname = 'kombu_default' database = getattr(mongoconn, dbname) @@ -143,56 +143,56 @@ class Channel(virtual.Channel): self.db = database col = database.messages - col.ensure_index([("queue", 1)]) + col.ensure_index([('queue', 1)]) - if "messages.broadcast" not in database.collection_names(): + if 'messages.broadcast' not in database.collection_names(): capsize = conninfo.transport_options.get( - "capped_queue_size") or 100000 - database.create_collection("messages.broadcast", size=capsize, + 'capped_queue_size') or 100000 + database.create_collection('messages.broadcast', size=capsize, capped=True) - self.bcast = getattr(database, "messages.broadcast") - self.bcast.ensure_index([("queue", 1)]) + self.bcast = getattr(database, 'messages.broadcast') + self.bcast.ensure_index([('queue', 1)]) - self.routing = getattr(database, "messages.routing") - self.routing.ensure_index([("queue", 1), ("exchange", 1)]) + self.routing = getattr(database, 'messages.routing') + self.routing.ensure_index([('queue', 1), ('exchange', 1)]) return database #TODO: Store a more complete exchange metatable in the routing collection def get_table(self, exchange): """Get table of bindings for ``exchange``.""" brokerRoutes = self.client.messages.routing.find({ - "exchange": exchange}) + 'exchange': exchange}) - localRoutes = self.state.exchanges[exchange]["table"] + localRoutes = self.state.exchanges[exchange]['table'] for route in brokerRoutes: - localRoutes.append((route["routing_key"], - route["pattern"], - route["queue"])) + localRoutes.append((route['routing_key'], + route['pattern'], + route['queue'])) return set(localRoutes) def _put_fanout(self, exchange, message, **kwargs): """Deliver fanout message.""" - self.client.messages.broadcast.insert({"payload": dumps(message), - "queue": exchange}) + self.client.messages.broadcast.insert({'payload': dumps(message), + 'queue': exchange}) def _queue_bind(self, exchange, routing_key, pattern, queue): - if self.typeof(exchange).type == "fanout": - cursor = self.bcast.find(query={"queue": exchange}, - sort=[("$natural", 1)], tailable=True) + if self.typeof(exchange).type == 'fanout': + cursor = self.bcast.find(query={'queue': exchange}, + sort=[('$natural', 1)], tailable=True) # Fast forward the cursor past old events self._queue_cursors[queue] = cursor.skip(cursor.count()) self._queue_readcounts[queue] = cursor.count() self._fanout_queues[queue] = exchange - meta = {"exchange": exchange, - "queue": queue, - "routing_key": routing_key, - "pattern": pattern} + meta = {'exchange': exchange, + 'queue': queue, + 'routing_key': routing_key, + 'pattern': pattern} self.client.messages.routing.update(meta, meta, upsert=True) def queue_delete(self, queue, **kwargs): - self.routing.remove({"queue": queue}) + self.routing.remove({'queue': queue}) super(Channel, self).queue_delete(queue, **kwargs) if queue in self._fanout_queues: self._queue_cursors[queue].close() @@ -215,8 +215,8 @@ class Transport(virtual.Transport): channel_errors = (StdChannelError, errors.ConnectionFailure, errors.OperationFailure, ) - driver_type = "mongodb" - driver_name = "pymongo" + driver_type = 'mongodb' + driver_name = 'pymongo' def driver_version(self): return pymongo.version |