diff options
author | Flaper Fesp <flaper87@gmail.com> | 2012-09-09 15:35:27 +0200 |
---|---|---|
committer | Flaper Fesp <flaper87@gmail.com> | 2012-09-09 15:35:27 +0200 |
commit | a8f740cd7de9c7a3e719863907d3a51c2b5c6e94 (patch) | |
tree | ac440b7a46720c31ddffe884b5466b5ae3b8dcff | |
parent | 08dddbe3a425d5594c5bfd097915c6a9b93bebbe (diff) | |
download | kombu-feature/coll_queue.tar.gz |
Starting to migrate fanout queues toofeature/coll_queue
-rw-r--r-- | kombu/transport/mongodb.py | 66 |
1 files changed, 40 insertions, 26 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 5fcbc4fe..0d6d22cc 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -36,6 +36,9 @@ class Channel(virtual.Channel): supports_fanout = True _fanout_queues = {} + _bcasts = {} + _routings = {} + def __init__(self, *vargs, **kwargs): super_ = super(Channel, self) super_.__init__(*vargs, **kwargs) @@ -138,33 +141,46 @@ class Channel(virtual.Channel): 'Kombu requires MongoDB version 1.3+, but connected to %s' % ( version, )) - database = getattr(mongoconn, dbname) - # This is done by the connection uri # if conninfo.userid: # database.authenticate(conninfo.userid, conninfo.password) - self.db = database - col = database.messages - col.ensure_index([('queue', 1)]) + return getattr(mongoconn, dbname) + + def _broadcast(self, exchange): + """ + """ + bcast = self._bcasts.get(exchange) - if 'messages.broadcast' not in database.collection_names(): - capsize = conninfo.transport_options.get( - 'capped_queue_size') or 100000 - database.create_collection('messages.broadcast', size=capsize, - capped=True) + if bcast: + return bcast + + name = '%s.broadcast' % exchange + if not name in self.client.collection_names(): + conninfo = self.connection.client + capsize = conninfo.transport_options.get('capped_queue_size', 100000) + self.client.create_collection(name, size=capsize, capped=True) + + bcast = getattr(self.client, name) + self._bcasts[exchange] = bcast + return bcast + + def _routing(self, exchange): + """ + """ + routing = self._routings.get(exchange) - self.bcast = getattr(database, 'messages.broadcast') - self.bcast.ensure_index([('queue', 1)]) + if routing: + return routing - self.routing = getattr(database, 'messages.routing') - self.routing.ensure_index([('queue', 1), ('exchange', 1)]) - return database + routing = getattr(self.client, '%s.routing' % exchange) + routing.ensure_index([('queue', 1)]) + self._routings[exchange] = routing + return routing #TODO: Store a more complete exchange metatable in the routing collection def get_table(self, exchange): """Get table of bindings for ``exchange``.""" - brokerRoutes = self.client.messages.routing.find({ - 'exchange': exchange}) + brokerRoutes = self._routing(exchange).find() localRoutes = self.state.exchanges[exchange]['table'] for route in brokerRoutes: @@ -175,23 +191,21 @@ class Channel(virtual.Channel): def _put_fanout(self, exchange, message, **kwargs): """Deliver fanout message.""" - self.client.messages.broadcast.insert({'payload': dumps(message), - 'queue': exchange}) + self._broadcast(exchange).insert({'payload': dumps(message)}) def _queue_bind(self, exchange, routing_key, pattern, queue): if self.typeof(exchange).type == 'fanout': - cursor = self.bcast.find(query={'queue': exchange}, - sort=[('$natural', 1)], tailable=True) + cursor = self._broadcast(exchange).find(query={}, sort=[('$natural', 1)], tailable=True) # Fast forward the cursor past old events self._queue_cursors[queue] = cursor.skip(cursor.count()) self._queue_readcounts[queue] = cursor.count() self._fanout_queues[queue] = exchange - meta = {'exchange': exchange, - 'queue': queue, - 'routing_key': routing_key, - 'pattern': pattern} - self.client.messages.routing.update(meta, meta, upsert=True) + meta = {'queue': queue, + # 'exchange': exchange, + 'pattern': pattern, + 'routing_key': routing_key} + self._routing(queue).update(meta, meta, upsert=True) def queue_delete(self, queue, **kwargs): self.routing.remove({'queue': queue}) |