summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlaper Fesp <flaper87@gmail.com>2012-09-09 15:35:27 +0200
committerFlaper Fesp <flaper87@gmail.com>2012-09-09 15:35:27 +0200
commita8f740cd7de9c7a3e719863907d3a51c2b5c6e94 (patch)
treeac440b7a46720c31ddffe884b5466b5ae3b8dcff
parent08dddbe3a425d5594c5bfd097915c6a9b93bebbe (diff)
downloadkombu-feature/coll_queue.tar.gz
Starting to migrate fanout queues toofeature/coll_queue
-rw-r--r--kombu/transport/mongodb.py66
1 files changed, 40 insertions, 26 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 5fcbc4fe..0d6d22cc 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -36,6 +36,9 @@ class Channel(virtual.Channel):
supports_fanout = True
_fanout_queues = {}
+ _bcasts = {}
+ _routings = {}
+
def __init__(self, *vargs, **kwargs):
super_ = super(Channel, self)
super_.__init__(*vargs, **kwargs)
@@ -138,33 +141,46 @@ class Channel(virtual.Channel):
'Kombu requires MongoDB version 1.3+, but connected to %s' % (
version, ))
- database = getattr(mongoconn, dbname)
-
# This is done by the connection uri
# if conninfo.userid:
# database.authenticate(conninfo.userid, conninfo.password)
- self.db = database
- col = database.messages
- col.ensure_index([('queue', 1)])
+ return getattr(mongoconn, dbname)
+
+ def _broadcast(self, exchange):
+ """
+ """
+ bcast = self._bcasts.get(exchange)
- if 'messages.broadcast' not in database.collection_names():
- capsize = conninfo.transport_options.get(
- 'capped_queue_size') or 100000
- database.create_collection('messages.broadcast', size=capsize,
- capped=True)
+ if bcast:
+ return bcast
+
+ name = '%s.broadcast' % exchange
+ if not name in self.client.collection_names():
+ conninfo = self.connection.client
+ capsize = conninfo.transport_options.get('capped_queue_size', 100000)
+ self.client.create_collection(name, size=capsize, capped=True)
+
+ bcast = getattr(self.client, name)
+ self._bcasts[exchange] = bcast
+ return bcast
+
+ def _routing(self, exchange):
+ """
+ """
+ routing = self._routings.get(exchange)
- self.bcast = getattr(database, 'messages.broadcast')
- self.bcast.ensure_index([('queue', 1)])
+ if routing:
+ return routing
- self.routing = getattr(database, 'messages.routing')
- self.routing.ensure_index([('queue', 1), ('exchange', 1)])
- return database
+ routing = getattr(self.client, '%s.routing' % exchange)
+ routing.ensure_index([('queue', 1)])
+ self._routings[exchange] = routing
+ return routing
#TODO: Store a more complete exchange metatable in the routing collection
def get_table(self, exchange):
"""Get table of bindings for ``exchange``."""
- brokerRoutes = self.client.messages.routing.find({
- 'exchange': exchange})
+ brokerRoutes = self._routing(exchange).find()
localRoutes = self.state.exchanges[exchange]['table']
for route in brokerRoutes:
@@ -175,23 +191,21 @@ class Channel(virtual.Channel):
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message."""
- self.client.messages.broadcast.insert({'payload': dumps(message),
- 'queue': exchange})
+ self._broadcast(exchange).insert({'payload': dumps(message)})
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == 'fanout':
- cursor = self.bcast.find(query={'queue': exchange},
- sort=[('$natural', 1)], tailable=True)
+ cursor = self._broadcast(exchange).find(query={}, sort=[('$natural', 1)], tailable=True)
# Fast forward the cursor past old events
self._queue_cursors[queue] = cursor.skip(cursor.count())
self._queue_readcounts[queue] = cursor.count()
self._fanout_queues[queue] = exchange
- meta = {'exchange': exchange,
- 'queue': queue,
- 'routing_key': routing_key,
- 'pattern': pattern}
- self.client.messages.routing.update(meta, meta, upsert=True)
+ meta = {'queue': queue,
+ # 'exchange': exchange,
+ 'pattern': pattern,
+ 'routing_key': routing_key}
+ self._routing(queue).update(meta, meta, upsert=True)
def queue_delete(self, queue, **kwargs):
self.routing.remove({'queue': queue})