summaryrefslogtreecommitdiff
path: root/kombu/transport/mongodb.py
diff options
context:
space:
mode:
authorAlex Koshelev <daevaorn@gmail.com>2015-11-07 18:56:09 +0300
committerAsk Solem <ask@celeryproject.org>2015-12-09 11:34:54 -0800
commitd480221a6edcb44072ef8ad5ea5fc32dc58651f3 (patch)
treeab9c9681cb11b38dc9774bbc86375710379f1917 /kombu/transport/mongodb.py
parentf2e8b9791f51984cbae12e5a08b6d729fec57791 (diff)
downloadkombu-d480221a6edcb44072ef8ad5ea5fc32dc58651f3.tar.gz
Rearrange methods at MongoDB channel class
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r--kombu/transport/mongodb.py99
1 files changed, 51 insertions, 48 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index b072dd4b..44d9cf46 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -106,6 +106,8 @@ class Channel(virtual.Channel):
# Evaluate connection
self._create_client()
+ # AbstractChannel/Channel interface implementation
+
def _new_queue(self, queue, **kwargs):
if self.ttl:
self.get_queues().update({'_id': queue},
@@ -164,6 +166,54 @@ class Channel(virtual.Channel):
return size
+ def get_table(self, exchange):
+ """Get table of bindings for ``exchange``."""
+ localRoutes = frozenset(self.state.exchanges[exchange]['table'])
+ brokerRoutes = self.get_messages().routing.find(
+ {'exchange': exchange}
+ )
+
+ return localRoutes | frozenset((r['routing_key'],
+ r['pattern'],
+ r['queue']) for r in brokerRoutes)
+
+ def _queue_bind(self, exchange, routing_key, pattern, queue):
+ if self.typeof(exchange).type == 'fanout':
+ self.create_broadcast_cursor(exchange, routing_key, pattern, queue)
+ self._fanout_queues[queue] = exchange
+
+ lookup = {'exchange': exchange,
+ 'queue': queue,
+ 'routing_key': routing_key,
+ 'pattern': pattern}
+
+ data = lookup.copy()
+
+ if self.ttl:
+ data['expire_at'] = self.get_expire(queue, 'x-expires')
+
+ self.get_routing().update(lookup, data, upsert=True)
+
+ def queue_delete(self, queue, **kwargs):
+ self.get_routing().remove({'queue': queue})
+
+ if self.ttl:
+ self.get_queues().remove({'_id': queue})
+
+ super(Channel, self).queue_delete(queue, **kwargs)
+
+ if queue in self._fanout_queues:
+ try:
+ cursor = self._broadcast_cursors.pop(queue)
+ except KeyError:
+ pass
+ else:
+ cursor.close()
+
+ self._fanout_queues.pop(queue)
+
+ # Implementation details
+
def _parse_uri(self, scheme='mongodb://'):
# See mongodb uri documentation:
# http://docs.mongodb.org/manual/reference/connection-string/
@@ -266,60 +316,13 @@ class Channel(virtual.Channel):
self.get_queues().ensure_index([('expire_at', 1)], expireAfterSeconds=0)
- def get_table(self, exchange):
- """Get table of bindings for ``exchange``."""
- # TODO Store a more complete exchange metatable in the
- # routing collection
- localRoutes = frozenset(self.state.exchanges[exchange]['table'])
- brokerRoutes = self.get_messages().routing.find(
- {'exchange': exchange}
- )
-
- return localRoutes | frozenset((r['routing_key'],
- r['pattern'],
- r['queue']) for r in brokerRoutes)
-
def _put_fanout(self, exchange, message, routing_key, **kwargs):
"""Deliver fanout message."""
self.get_broadcast().insert({'payload': dumps(message),
'queue': exchange})
- def _queue_bind(self, exchange, routing_key, pattern, queue):
- if self.typeof(exchange).type == 'fanout':
- self.create_broadcast_cursor(exchange, routing_key, pattern, queue)
- self._fanout_queues[queue] = exchange
-
- lookup = {'exchange': exchange,
- 'queue': queue,
- 'routing_key': routing_key,
- 'pattern': pattern}
-
- data = lookup.copy()
-
- if self.ttl:
- data['expire_at'] = self.get_expire(queue, 'x-expires')
-
- self.get_routing().update(lookup, data, upsert=True)
-
- def queue_delete(self, queue, **kwargs):
- self.get_routing().remove({'queue': queue})
-
- if self.ttl:
- self.get_queues().remove({'_id': queue})
-
- super(Channel, self).queue_delete(queue, **kwargs)
-
- if queue in self._fanout_queues:
- try:
- cursor = self._broadcast_cursors.pop(queue)
- except KeyError:
- pass
- else:
- cursor.close()
-
- self._fanout_queues.pop(queue)
-
def _create_client(self):
+ '''Actualy creates connection'''
self._open()
self._ensure_indexes()