diff options
author | Alex Koshelev <daevaorn@gmail.com> | 2015-11-07 18:56:09 +0300 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-12-09 11:34:54 -0800 |
commit | d480221a6edcb44072ef8ad5ea5fc32dc58651f3 (patch) | |
tree | ab9c9681cb11b38dc9774bbc86375710379f1917 /kombu/transport/mongodb.py | |
parent | f2e8b9791f51984cbae12e5a08b6d729fec57791 (diff) | |
download | kombu-d480221a6edcb44072ef8ad5ea5fc32dc58651f3.tar.gz |
Rearrange methods at MongoDB channel class
Diffstat (limited to 'kombu/transport/mongodb.py')
-rw-r--r-- | kombu/transport/mongodb.py | 99 |
1 files changed, 51 insertions, 48 deletions
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index b072dd4b..44d9cf46 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -106,6 +106,8 @@ class Channel(virtual.Channel): # Evaluate connection self._create_client() + # AbstractChannel/Channel interface implementation + def _new_queue(self, queue, **kwargs): if self.ttl: self.get_queues().update({'_id': queue}, @@ -164,6 +166,54 @@ class Channel(virtual.Channel): return size + def get_table(self, exchange): + """Get table of bindings for ``exchange``.""" + localRoutes = frozenset(self.state.exchanges[exchange]['table']) + brokerRoutes = self.get_messages().routing.find( + {'exchange': exchange} + ) + + return localRoutes | frozenset((r['routing_key'], + r['pattern'], + r['queue']) for r in brokerRoutes) + + def _queue_bind(self, exchange, routing_key, pattern, queue): + if self.typeof(exchange).type == 'fanout': + self.create_broadcast_cursor(exchange, routing_key, pattern, queue) + self._fanout_queues[queue] = exchange + + lookup = {'exchange': exchange, + 'queue': queue, + 'routing_key': routing_key, + 'pattern': pattern} + + data = lookup.copy() + + if self.ttl: + data['expire_at'] = self.get_expire(queue, 'x-expires') + + self.get_routing().update(lookup, data, upsert=True) + + def queue_delete(self, queue, **kwargs): + self.get_routing().remove({'queue': queue}) + + if self.ttl: + self.get_queues().remove({'_id': queue}) + + super(Channel, self).queue_delete(queue, **kwargs) + + if queue in self._fanout_queues: + try: + cursor = self._broadcast_cursors.pop(queue) + except KeyError: + pass + else: + cursor.close() + + self._fanout_queues.pop(queue) + + # Implementation details + def _parse_uri(self, scheme='mongodb://'): # See mongodb uri documentation: # http://docs.mongodb.org/manual/reference/connection-string/ @@ -266,60 +316,13 @@ class Channel(virtual.Channel): self.get_queues().ensure_index([('expire_at', 1)], expireAfterSeconds=0) - def get_table(self, exchange): - """Get table of bindings for ``exchange``.""" - # TODO Store a more complete exchange metatable in the - # routing collection - localRoutes = frozenset(self.state.exchanges[exchange]['table']) - brokerRoutes = self.get_messages().routing.find( - {'exchange': exchange} - ) - - return localRoutes | frozenset((r['routing_key'], - r['pattern'], - r['queue']) for r in brokerRoutes) - def _put_fanout(self, exchange, message, routing_key, **kwargs): """Deliver fanout message.""" self.get_broadcast().insert({'payload': dumps(message), 'queue': exchange}) - def _queue_bind(self, exchange, routing_key, pattern, queue): - if self.typeof(exchange).type == 'fanout': - self.create_broadcast_cursor(exchange, routing_key, pattern, queue) - self._fanout_queues[queue] = exchange - - lookup = {'exchange': exchange, - 'queue': queue, - 'routing_key': routing_key, - 'pattern': pattern} - - data = lookup.copy() - - if self.ttl: - data['expire_at'] = self.get_expire(queue, 'x-expires') - - self.get_routing().update(lookup, data, upsert=True) - - def queue_delete(self, queue, **kwargs): - self.get_routing().remove({'queue': queue}) - - if self.ttl: - self.get_queues().remove({'_id': queue}) - - super(Channel, self).queue_delete(queue, **kwargs) - - if queue in self._fanout_queues: - try: - cursor = self._broadcast_cursors.pop(queue) - except KeyError: - pass - else: - cursor.close() - - self._fanout_queues.pop(queue) - def _create_client(self): + '''Actualy creates connection''' self._open() self._ensure_indexes() |