diff options
author | Ask Solem <ask@celeryproject.org> | 2011-11-27 17:03:55 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2011-11-27 17:03:55 +0000 |
commit | 196ba98d617a237b1cc41813f7f0b8f6a8de9cbf (patch) | |
tree | 4bfee90b67736027d8f8f55acd383cf408028304 | |
parent | 2ee489777a333db223fea2da9764fa825af31c6d (diff) | |
download | kombu-196ba98d617a237b1cc41813f7f0b8f6a8de9cbf.tar.gz |
syn.blocking is useless, so remove usage of it
-rw-r--r-- | docs/reference/kombu.syn.rst | 4 | ||||
-rw-r--r-- | kombu/entity.py | 65 | ||||
-rw-r--r-- | kombu/messaging.py | 9 | ||||
-rw-r--r-- | kombu/syn.py | 56 |
4 files changed, 40 insertions, 94 deletions
diff --git a/docs/reference/kombu.syn.rst b/docs/reference/kombu.syn.rst index d3e1d23c..f5c650be 100644 --- a/docs/reference/kombu.syn.rst +++ b/docs/reference/kombu.syn.rst @@ -7,5 +7,5 @@ .. currentmodule:: kombu.syn .. automodule:: kombu.syn - :members: - :undoc-members: + + .. autofunction:: detect_environment diff --git a/kombu/entity.py b/kombu/entity.py index 08a12b9e..b7ab322e 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -9,7 +9,6 @@ Exchange and Queue declarations. """ from kombu.abstract import MaybeChannelBound -from kombu.syn import blocking as _SYN TRANSIENT_DELIVERY_MODE = 1 PERSISTENT_DELIVERY_MODE = 2 @@ -146,12 +145,12 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ - return _SYN(self.channel.exchange_declare, exchange=self.name, - type=self.type, - durable=self.durable, - auto_delete=self.auto_delete, - arguments=self.arguments, - nowait=nowait) + return self.channel.exchange_declare(exchange=self.name, + type=self.type, + durable=self.durable, + auto_delete=self.auto_delete, + arguments=self.arguments, + nowait=nowait) def Message(self, body, delivery_mode=None, priority=None, content_type=None, content_encoding=None, properties=None, @@ -220,9 +219,9 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ - return _SYN(self.channel.exchange_delete, exchange=self.name, - if_unused=if_unused, - nowait=nowait) + return self.channel.exchange_delete(exchange=self.name, + if_unused=if_unused, + nowait=nowait) def __eq__(self, other): if isinstance(other, Exchange): @@ -392,13 +391,13 @@ class Queue(MaybeChannelBound): without modifying the server state. """ - ret = _SYN(self.channel.queue_declare, queue=self.name, - passive=passive, - durable=self.durable, - exclusive=self.exclusive, - auto_delete=self.auto_delete, - arguments=self.queue_arguments, - nowait=nowait) + ret = self.channel.queue_declare(queue=self.name, + passive=passive, + durable=self.durable, + exclusive=self.exclusive, + auto_delete=self.auto_delete, + arguments=self.queue_arguments, + nowait=nowait) if not self.name: self.name = ret[0] return ret @@ -409,11 +408,11 @@ class Queue(MaybeChannelBound): :keyword nowait: Do not wait for a reply. """ - return _SYN(self.channel.queue_bind, queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments, - nowait=nowait) + return self.channel.queue_bind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key, + arguments=self.binding_arguments, + nowait=nowait) def get(self, no_ack=None): """Poll the server for a new message. @@ -430,14 +429,14 @@ class Queue(MaybeChannelBound): is more important than performance. """ - message = _SYN(self.channel.basic_get, queue=self.name, no_ack=no_ack) + message = self.channel.basic_get(queue=self.name, no_ack=no_ack) if message is not None: return self.channel.message_to_python(message) def purge(self, nowait=False): """Remove all messages from the queue.""" - return _SYN(self.channel.queue_purge, queue=self.name, - nowait=nowait) or 0 + return self.channel.queue_purge(queue=self.name, + nowait=nowait) or 0 def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False): @@ -484,17 +483,17 @@ class Queue(MaybeChannelBound): :keyword nowait: Do not wait for a reply. """ - return _SYN(self.channel.queue_delete, queue=self.name, - if_unused=if_unused, - if_empty=if_empty, - nowait=nowait) + return self.channel.queue_delete(queue=self.name, + if_unused=if_unused, + if_empty=if_empty, + nowait=nowait) def unbind(self): """Delete the binding on the server.""" - return _SYN(self.channel.queue_unbind, queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments) + return self.channel.queue_unbind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key, + arguments=self.binding_arguments) def __eq__(self, other): if isinstance(other, Queue): diff --git a/kombu/messaging.py b/kombu/messaging.py index 0d81e7a5..e9207e4d 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -13,7 +13,6 @@ from itertools import count from kombu import entity from kombu.compression import compress from kombu.serialization import encode -from kombu.syn import blocking as _SYN from kombu.utils import maybe_list @@ -383,9 +382,9 @@ class Consumer(object): Currently not supported by RabbitMQ. """ - return _SYN(self.channel.basic_qos, prefetch_size, - prefetch_count, - apply_global) + return self.channel.basic_qos(prefetch_size, + prefetch_count, + apply_global) def recover(self, requeue=False): """Redeliver unacknowledged messages. @@ -399,7 +398,7 @@ class Consumer(object): delivering it to an alternative subscriber. """ - return _SYN(self.channel.basic_recover, requeue=requeue) + return self.channel.basic_recover(requeue=requeue) def receive(self, body, message): """Method called when a message is received. diff --git a/kombu/syn.py b/kombu/syn.py index a80801aa..f2da7b12 100644 --- a/kombu/syn.py +++ b/kombu/syn.py @@ -1,64 +1,12 @@ -""" -kombu.syn -========= - -Thread synchronization. - -:copyright: (c) 2009 - 2011 by Ask Solem. -:license: BSD, see LICENSE for more details. - -""" import sys -#: current blocking method -__sync_current = None - def blocking(fun, *args, **kwargs): - """Make sure function is called by blocking and waiting for the result, - even if we're currently in a monkey patched eventlet/gevent - environment.""" - if __sync_current is None: - select_blocking_method(detect_environment()) - return __sync_current(fun, *args, **kwargs) + return fun(*args, **kwargs) def select_blocking_method(type): - """Select blocking method, where `type` is one of default - gevent or eventlet.""" - global __sync_current - __sync_current = {"eventlet": _sync_eventlet, - "gevent": _sync_gevent, - "default": _sync_default}[type]() - - -def _sync_default(): - """Create blocking primitive.""" - - def __blocking__(fun, *args, **kwargs): - return fun(*args, **kwargs) - - return __blocking__ - - -def _sync_eventlet(): - """Create Eventlet blocking primitive.""" - from eventlet import spawn - - def __eblocking__(fun, *args, **kwargs): - return spawn(fun, *args, **kwargs).wait() - - return __eblocking__ - - -def _sync_gevent(): - """Create gevent blocking primitive.""" - from gevent import Greenlet - - def __gblocking__(fun, *args, **kwargs): - return Greenlet.spawn(fun, *args, **kwargs).get() - - return __gblocking__ + pass def detect_environment(): |