summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-11-27 17:03:55 +0000
committerAsk Solem <ask@celeryproject.org>2011-11-27 17:03:55 +0000
commit196ba98d617a237b1cc41813f7f0b8f6a8de9cbf (patch)
tree4bfee90b67736027d8f8f55acd383cf408028304
parent2ee489777a333db223fea2da9764fa825af31c6d (diff)
downloadkombu-196ba98d617a237b1cc41813f7f0b8f6a8de9cbf.tar.gz
syn.blocking is useless, so remove usage of it
-rw-r--r--docs/reference/kombu.syn.rst4
-rw-r--r--kombu/entity.py65
-rw-r--r--kombu/messaging.py9
-rw-r--r--kombu/syn.py56
4 files changed, 40 insertions, 94 deletions
diff --git a/docs/reference/kombu.syn.rst b/docs/reference/kombu.syn.rst
index d3e1d23c..f5c650be 100644
--- a/docs/reference/kombu.syn.rst
+++ b/docs/reference/kombu.syn.rst
@@ -7,5 +7,5 @@
.. currentmodule:: kombu.syn
.. automodule:: kombu.syn
- :members:
- :undoc-members:
+
+ .. autofunction:: detect_environment
diff --git a/kombu/entity.py b/kombu/entity.py
index 08a12b9e..b7ab322e 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -9,7 +9,6 @@ Exchange and Queue declarations.
"""
from kombu.abstract import MaybeChannelBound
-from kombu.syn import blocking as _SYN
TRANSIENT_DELIVERY_MODE = 1
PERSISTENT_DELIVERY_MODE = 2
@@ -146,12 +145,12 @@ class Exchange(MaybeChannelBound):
response will not be waited for. Default is :const:`False`.
"""
- return _SYN(self.channel.exchange_declare, exchange=self.name,
- type=self.type,
- durable=self.durable,
- auto_delete=self.auto_delete,
- arguments=self.arguments,
- nowait=nowait)
+ return self.channel.exchange_declare(exchange=self.name,
+ type=self.type,
+ durable=self.durable,
+ auto_delete=self.auto_delete,
+ arguments=self.arguments,
+ nowait=nowait)
def Message(self, body, delivery_mode=None, priority=None,
content_type=None, content_encoding=None, properties=None,
@@ -220,9 +219,9 @@ class Exchange(MaybeChannelBound):
response will not be waited for. Default is :const:`False`.
"""
- return _SYN(self.channel.exchange_delete, exchange=self.name,
- if_unused=if_unused,
- nowait=nowait)
+ return self.channel.exchange_delete(exchange=self.name,
+ if_unused=if_unused,
+ nowait=nowait)
def __eq__(self, other):
if isinstance(other, Exchange):
@@ -392,13 +391,13 @@ class Queue(MaybeChannelBound):
without modifying the server state.
"""
- ret = _SYN(self.channel.queue_declare, queue=self.name,
- passive=passive,
- durable=self.durable,
- exclusive=self.exclusive,
- auto_delete=self.auto_delete,
- arguments=self.queue_arguments,
- nowait=nowait)
+ ret = self.channel.queue_declare(queue=self.name,
+ passive=passive,
+ durable=self.durable,
+ exclusive=self.exclusive,
+ auto_delete=self.auto_delete,
+ arguments=self.queue_arguments,
+ nowait=nowait)
if not self.name:
self.name = ret[0]
return ret
@@ -409,11 +408,11 @@ class Queue(MaybeChannelBound):
:keyword nowait: Do not wait for a reply.
"""
- return _SYN(self.channel.queue_bind, queue=self.name,
- exchange=self.exchange.name,
- routing_key=self.routing_key,
- arguments=self.binding_arguments,
- nowait=nowait)
+ return self.channel.queue_bind(queue=self.name,
+ exchange=self.exchange.name,
+ routing_key=self.routing_key,
+ arguments=self.binding_arguments,
+ nowait=nowait)
def get(self, no_ack=None):
"""Poll the server for a new message.
@@ -430,14 +429,14 @@ class Queue(MaybeChannelBound):
is more important than performance.
"""
- message = _SYN(self.channel.basic_get, queue=self.name, no_ack=no_ack)
+ message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
if message is not None:
return self.channel.message_to_python(message)
def purge(self, nowait=False):
"""Remove all messages from the queue."""
- return _SYN(self.channel.queue_purge, queue=self.name,
- nowait=nowait) or 0
+ return self.channel.queue_purge(queue=self.name,
+ nowait=nowait) or 0
def consume(self, consumer_tag='', callback=None, no_ack=None,
nowait=False):
@@ -484,17 +483,17 @@ class Queue(MaybeChannelBound):
:keyword nowait: Do not wait for a reply.
"""
- return _SYN(self.channel.queue_delete, queue=self.name,
- if_unused=if_unused,
- if_empty=if_empty,
- nowait=nowait)
+ return self.channel.queue_delete(queue=self.name,
+ if_unused=if_unused,
+ if_empty=if_empty,
+ nowait=nowait)
def unbind(self):
"""Delete the binding on the server."""
- return _SYN(self.channel.queue_unbind, queue=self.name,
- exchange=self.exchange.name,
- routing_key=self.routing_key,
- arguments=self.binding_arguments)
+ return self.channel.queue_unbind(queue=self.name,
+ exchange=self.exchange.name,
+ routing_key=self.routing_key,
+ arguments=self.binding_arguments)
def __eq__(self, other):
if isinstance(other, Queue):
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 0d81e7a5..e9207e4d 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -13,7 +13,6 @@ from itertools import count
from kombu import entity
from kombu.compression import compress
from kombu.serialization import encode
-from kombu.syn import blocking as _SYN
from kombu.utils import maybe_list
@@ -383,9 +382,9 @@ class Consumer(object):
Currently not supported by RabbitMQ.
"""
- return _SYN(self.channel.basic_qos, prefetch_size,
- prefetch_count,
- apply_global)
+ return self.channel.basic_qos(prefetch_size,
+ prefetch_count,
+ apply_global)
def recover(self, requeue=False):
"""Redeliver unacknowledged messages.
@@ -399,7 +398,7 @@ class Consumer(object):
delivering it to an alternative subscriber.
"""
- return _SYN(self.channel.basic_recover, requeue=requeue)
+ return self.channel.basic_recover(requeue=requeue)
def receive(self, body, message):
"""Method called when a message is received.
diff --git a/kombu/syn.py b/kombu/syn.py
index a80801aa..f2da7b12 100644
--- a/kombu/syn.py
+++ b/kombu/syn.py
@@ -1,64 +1,12 @@
-"""
-kombu.syn
-=========
-
-Thread synchronization.
-
-:copyright: (c) 2009 - 2011 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
-"""
import sys
-#: current blocking method
-__sync_current = None
-
def blocking(fun, *args, **kwargs):
- """Make sure function is called by blocking and waiting for the result,
- even if we're currently in a monkey patched eventlet/gevent
- environment."""
- if __sync_current is None:
- select_blocking_method(detect_environment())
- return __sync_current(fun, *args, **kwargs)
+ return fun(*args, **kwargs)
def select_blocking_method(type):
- """Select blocking method, where `type` is one of default
- gevent or eventlet."""
- global __sync_current
- __sync_current = {"eventlet": _sync_eventlet,
- "gevent": _sync_gevent,
- "default": _sync_default}[type]()
-
-
-def _sync_default():
- """Create blocking primitive."""
-
- def __blocking__(fun, *args, **kwargs):
- return fun(*args, **kwargs)
-
- return __blocking__
-
-
-def _sync_eventlet():
- """Create Eventlet blocking primitive."""
- from eventlet import spawn
-
- def __eblocking__(fun, *args, **kwargs):
- return spawn(fun, *args, **kwargs).wait()
-
- return __eblocking__
-
-
-def _sync_gevent():
- """Create gevent blocking primitive."""
- from gevent import Greenlet
-
- def __gblocking__(fun, *args, **kwargs):
- return Greenlet.spawn(fun, *args, **kwargs).get()
-
- return __gblocking__
+ pass
def detect_environment():