summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakub Stasiak <jakub@stasiak.at>2014-10-05 21:25:46 +0100
committerJakub Stasiak <jakub@stasiak.at>2014-10-10 08:53:11 +0100
commit75a1ab8cafb2bd3aa9ab96503b54fbddb068f58d (patch)
tree703c1c3837ae70d0f7ce3790d6dcf94edc339026
parent5f5c137676044f0b3d9459a6d487ec80eac14b26 (diff)
downloadeventlet-75a1ab8cafb2bd3aa9ab96503b54fbddb068f58d.tar.gz
Remove most of the deprecated code
Closes GH #144
-rw-r--r--NEWS5
-rw-r--r--doc/ssl.rst6
-rw-r--r--eventlet/api.py224
-rw-r--r--eventlet/coros.py272
-rw-r--r--eventlet/green/_socket_nodns.py16
-rw-r--r--eventlet/pool.py321
-rw-r--r--eventlet/proc.py739
-rw-r--r--eventlet/processes.py169
-rw-r--r--eventlet/twistedutil/__init__.py2
-rw-r--r--eventlet/twistedutil/protocol.py2
-rw-r--r--eventlet/util.py104
-rw-r--r--tests/api_test.py73
-rw-r--r--tests/processes_test.py110
-rw-r--r--tests/ssl_test.py28
-rw-r--r--tests/stdlib/test_thread__boundedsem.py6
-rw-r--r--tests/test__coros_queue.py259
-rw-r--r--tests/test__event.py2
-rw-r--r--tests/test__pool.py321
-rw-r--r--tests/test__proc.py401
-rw-r--r--tests/test__socket_errors.py1
-rw-r--r--tests/test__twistedutil_protocol.py2
-rw-r--r--tox.ini3
22 files changed, 66 insertions, 3000 deletions
diff --git a/NEWS b/NEWS
index 1ff4aaa..a25e4ab 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,8 @@
+0.16.0 (not released yet)
+=========================
+
+* removed deprecated modules: api, most of coros, pool, proc, processes and util
+
0.15.2
======
greenio: fixed memory leak, introduced in 0.15.1; Thanks to Michael Kerrin, Tushar Gohad
diff --git a/doc/ssl.rst b/doc/ssl.rst
index 0d47364..225ce56 100644
--- a/doc/ssl.rst
+++ b/doc/ssl.rst
@@ -6,8 +6,8 @@ Eventlet makes it easy to use non-blocking SSL sockets. If you're using Python
In either case, the the ``green`` modules handle SSL sockets transparently, just like their standard counterparts. As an example, :mod:`eventlet.green.urllib2` can be used to fetch https urls in as non-blocking a fashion as you please::
from eventlet.green import urllib2
- from eventlet import coros
- bodies = [coros.execute(urllib2.urlopen, url)
+ from eventlet import spawn
+ bodies = [spawn(urllib2.urlopen, url)
for url in ("https://secondlife.com","https://google.com")]
for b in bodies:
print(b.wait().read())
@@ -55,4 +55,4 @@ Here's an example of a server::
client_conn.close()
connection.close()
-.. _pyOpenSSL: https://launchpad.net/pyopenssl \ No newline at end of file
+.. _pyOpenSSL: https://launchpad.net/pyopenssl
diff --git a/eventlet/api.py b/eventlet/api.py
deleted file mode 100644
index ca1c1ba..0000000
--- a/eventlet/api.py
+++ /dev/null
@@ -1,224 +0,0 @@
-import errno
-import sys
-import socket
-import string
-import linecache
-import inspect
-import warnings
-
-from eventlet.support import greenlets as greenlet
-from eventlet import hubs
-from eventlet import greenthread
-from eventlet import debug
-from eventlet import Timeout
-
-__all__ = [
- 'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub',
- 'GreenletExit', 'kill', 'sleep', 'spawn', 'spew', 'switch',
- 'ssl_listener', 'tcp_listener', 'trampoline',
- 'unspew', 'use_hub', 'with_timeout', 'timeout']
-
-warnings.warn(
- "eventlet.api is deprecated! Nearly everything in it has moved "
- "to the eventlet module.", DeprecationWarning, stacklevel=2)
-
-
-def get_hub(*a, **kw):
- warnings.warn(
- "eventlet.api.get_hub has moved to eventlet.hubs.get_hub",
- DeprecationWarning, stacklevel=2)
- return hubs.get_hub(*a, **kw)
-
-
-def get_default_hub(*a, **kw):
- warnings.warn(
- "eventlet.api.get_default_hub has moved to"
- " eventlet.hubs.get_default_hub",
- DeprecationWarning, stacklevel=2)
- return hubs.get_default_hub(*a, **kw)
-
-
-def use_hub(*a, **kw):
- warnings.warn(
- "eventlet.api.use_hub has moved to eventlet.hubs.use_hub",
- DeprecationWarning, stacklevel=2)
- return hubs.use_hub(*a, **kw)
-
-
-def switch(coro, result=None, exc=None):
- if exc is not None:
- return coro.throw(exc)
- return coro.switch(result)
-
-Greenlet = greenlet.greenlet
-
-
-def tcp_listener(address, backlog=50):
- """
- Listen on the given ``(ip, port)`` *address* with a TCP socket. Returns a
- socket object on which one should call ``accept()`` to accept a connection
- on the newly bound socket.
- """
- warnings.warn(
- """eventlet.api.tcp_listener is deprecated. Please use eventlet.listen instead.""",
- DeprecationWarning, stacklevel=2)
-
- from eventlet import greenio, util
- socket = greenio.GreenSocket(util.tcp_socket())
- util.socket_bind_and_listen(socket, address, backlog=backlog)
- return socket
-
-
-def ssl_listener(address, certificate, private_key):
- """Listen on the given (ip, port) *address* with a TCP socket that
- can do SSL. Primarily useful for unit tests, don't use in production.
-
- *certificate* and *private_key* should be the filenames of the appropriate
- certificate and private key files to use with the SSL socket.
-
- Returns a socket object on which one should call ``accept()`` to
- accept a connection on the newly bound socket.
- """
- warnings.warn("""eventlet.api.ssl_listener is deprecated. Please use eventlet.wrap_ssl(eventlet.listen(
- )) instead.""",
- DeprecationWarning, stacklevel=2)
- from eventlet import util
- import socket
-
- socket = util.wrap_ssl(socket.socket(), certificate, private_key, True)
- socket.bind(address)
- socket.listen(50)
- return socket
-
-
-def connect_tcp(address, localaddr=None):
- """
- Create a TCP connection to address ``(host, port)`` and return the socket.
- Optionally, bind to localaddr ``(host, port)`` first.
- """
- warnings.warn(
- """eventlet.api.connect_tcp is deprecated. Please use eventlet.connect instead.""",
- DeprecationWarning, stacklevel=2)
-
- from eventlet import greenio, util
- desc = greenio.GreenSocket(util.tcp_socket())
- if localaddr is not None:
- desc.bind(localaddr)
- desc.connect(address)
- return desc
-
-TimeoutError = greenthread.TimeoutError
-
-trampoline = hubs.trampoline
-
-spawn = greenthread.spawn
-spawn_n = greenthread.spawn_n
-
-
-kill = greenthread.kill
-
-call_after = greenthread.call_after
-call_after_local = greenthread.call_after_local
-call_after_global = greenthread.call_after_global
-
-
-class _SilentException(BaseException):
- pass
-
-
-class FakeTimer(object):
- def cancel(self):
- pass
-
-
-class timeout(object):
- """Raise an exception in the block after timeout.
-
- Example::
-
- with timeout(10):
- urllib2.open('http://example.com')
-
- Assuming code block is yielding (i.e. gives up control to the hub),
- an exception provided in *exc* argument will be raised
- (:class:`~eventlet.api.TimeoutError` if *exc* is omitted)::
-
- try:
- with timeout(10, MySpecialError, error_arg_1):
- urllib2.open('http://example.com')
- except MySpecialError as e:
- print("special error received")
-
- When *exc* is ``None``, code block is interrupted silently.
- """
-
- def __init__(self, seconds, *throw_args):
- self.seconds = seconds
- if seconds is None:
- return
- if not throw_args:
- self.throw_args = (TimeoutError(), )
- elif throw_args == (None, ):
- self.throw_args = (_SilentException(), )
- else:
- self.throw_args = throw_args
-
- def __enter__(self):
- if self.seconds is None:
- self.timer = FakeTimer()
- else:
- self.timer = exc_after(self.seconds, *self.throw_args)
- return self.timer
-
- def __exit__(self, typ, value, tb):
- self.timer.cancel()
- if typ is _SilentException and value in self.throw_args:
- return True
-
-
-with_timeout = greenthread.with_timeout
-
-exc_after = greenthread.exc_after
-
-sleep = greenthread.sleep
-
-getcurrent = greenlet.getcurrent
-GreenletExit = greenlet.GreenletExit
-
-spew = debug.spew
-unspew = debug.unspew
-
-
-def named(name):
- """Return an object given its name.
-
- The name uses a module-like syntax, eg::
-
- os.path.join
-
- or::
-
- mulib.mu.Resource
- """
- toimport = name
- obj = None
- import_err_strings = []
- while toimport:
- try:
- obj = __import__(toimport)
- break
- except ImportError as err:
- # print('Import error on %s: %s' % (toimport, err)) # debugging spam
- import_err_strings.append(err.__str__())
- toimport = '.'.join(toimport.split('.')[:-1])
- if obj is None:
- raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings))
- for seg in name.split('.')[1:]:
- try:
- obj = getattr(obj, seg)
- except AttributeError:
- dirobj = dir(obj)
- dirobj.sort()
- raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % (
- seg, obj, dirobj, name, import_err_strings))
- return obj
diff --git a/eventlet/coros.py b/eventlet/coros.py
index 7407ad5..431e6f0 100644
--- a/eventlet/coros.py
+++ b/eventlet/coros.py
@@ -1,78 +1,21 @@
from __future__ import print_function
-import collections
-import traceback
-import warnings
-
-import eventlet
from eventlet import event as _event
-from eventlet import hubs
-from eventlet import greenthread
-from eventlet import semaphore as semaphoremod
-
-
-class NOT_USED:
- def __repr__(self):
- return 'NOT_USED'
-
-NOT_USED = NOT_USED()
-
-
-def Event(*a, **kw):
- warnings.warn("The Event class has been moved to the event module! "
- "Please construct event.Event objects instead.",
- DeprecationWarning, stacklevel=2)
- return _event.Event(*a, **kw)
-
-
-def event(*a, **kw):
- warnings.warn(
- "The event class has been capitalized and moved! Please "
- "construct event.Event objects instead.",
- DeprecationWarning, stacklevel=2)
- return _event.Event(*a, **kw)
-
-
-def Semaphore(count):
- warnings.warn(
- "The Semaphore class has moved! Please "
- "use semaphore.Semaphore instead.",
- DeprecationWarning, stacklevel=2)
- return semaphoremod.Semaphore(count)
-
-
-def BoundedSemaphore(count):
- warnings.warn(
- "The BoundedSemaphore class has moved! Please "
- "use semaphore.BoundedSemaphore instead.",
- DeprecationWarning, stacklevel=2)
- return semaphoremod.BoundedSemaphore(count)
-
-
-def semaphore(count=0, limit=None):
- warnings.warn(
- "coros.semaphore is deprecated. Please use either "
- "semaphore.Semaphore or semaphore.BoundedSemaphore instead.",
- DeprecationWarning, stacklevel=2)
- if limit is None:
- return Semaphore(count)
- else:
- return BoundedSemaphore(count)
class metaphore(object):
"""This is sort of an inverse semaphore: a counter that starts at 0 and
waits only if nonzero. It's used to implement a "wait for all" scenario.
- >>> from eventlet import api, coros
+ >>> from eventlet import coros, spawn_n
>>> count = coros.metaphore()
>>> count.wait()
>>> def decrementer(count, id):
... print("{0} decrementing".format(id))
... count.dec()
...
- >>> _ = eventlet.spawn(decrementer, count, 'A')
- >>> _ = eventlet.spawn(decrementer, count, 'B')
+ >>> _ = spawn_n(decrementer, count, 'A')
+ >>> _ = spawn_n(decrementer, count, 'B')
>>> count.inc(2)
>>> count.wait()
A decrementing
@@ -116,212 +59,3 @@ class metaphore(object):
resume the caller once the count decrements to zero again.
"""
self.event.wait()
-
-
-def execute(func, *args, **kw):
- """ Executes an operation asynchronously in a new coroutine, returning
- an event to retrieve the return value.
-
- This has the same api as the :meth:`eventlet.coros.CoroutinePool.execute`
- method; the only difference is that this one creates a new coroutine
- instead of drawing from a pool.
-
- >>> from eventlet import coros
- >>> evt = coros.execute(lambda a: ('foo', a), 1)
- >>> evt.wait()
- ('foo', 1)
- """
- warnings.warn(
- "Coros.execute is deprecated. Please use eventlet.spawn "
- "instead.", DeprecationWarning, stacklevel=2)
- return greenthread.spawn(func, *args, **kw)
-
-
-def CoroutinePool(*args, **kwargs):
- warnings.warn(
- "CoroutinePool is deprecated. Please use "
- "eventlet.GreenPool instead.", DeprecationWarning, stacklevel=2)
- from eventlet.pool import Pool
- return Pool(*args, **kwargs)
-
-
-class Queue(object):
-
- def __init__(self):
- warnings.warn(
- "coros.Queue is deprecated. Please use "
- "eventlet.queue.Queue instead.",
- DeprecationWarning, stacklevel=2)
- self.items = collections.deque()
- self._waiters = set()
-
- def __nonzero__(self):
- return len(self.items) > 0
-
- __bool__ = __nonzero__
-
- def __len__(self):
- return len(self.items)
-
- def __repr__(self):
- params = (self.__class__.__name__, hex(id(self)),
- len(self.items), len(self._waiters))
- return '<%s at %s items[%d] _waiters[%s]>' % params
-
- def send(self, result=None, exc=None):
- if exc is not None and not isinstance(exc, tuple):
- exc = (exc, )
- self.items.append((result, exc))
- if self._waiters:
- hubs.get_hub().schedule_call_global(0, self._do_send)
-
- def send_exception(self, *args):
- # the arguments are the same as for greenlet.throw
- return self.send(exc=args)
-
- def _do_send(self):
- if self._waiters and self.items:
- waiter = self._waiters.pop()
- result, exc = self.items.popleft()
- waiter.switch((result, exc))
-
- def wait(self):
- if self.items:
- result, exc = self.items.popleft()
- if exc is None:
- return result
- else:
- eventlet.getcurrent().throw(*exc)
- else:
- self._waiters.add(eventlet.getcurrent())
- try:
- result, exc = hubs.get_hub().switch()
- if exc is None:
- return result
- else:
- eventlet.getcurrent().throw(*exc)
- finally:
- self._waiters.discard(eventlet.getcurrent())
-
- def ready(self):
- return len(self.items) > 0
-
- def full(self):
- # for consistency with Channel
- return False
-
- def waiting(self):
- return len(self._waiters)
-
- def __iter__(self):
- return self
-
- def next(self):
- return self.wait()
-
-
-class Channel(object):
-
- def __init__(self, max_size=0):
- warnings.warn(
- "coros.Channel is deprecated. Please use "
- "eventlet.queue.Queue(0) instead.",
- DeprecationWarning, stacklevel=2)
- self.max_size = max_size
- self.items = collections.deque()
- self._waiters = set()
- self._senders = set()
-
- def __nonzero__(self):
- return len(self.items) > 0
-
- __bool__ = __nonzero__
-
- def __len__(self):
- return len(self.items)
-
- def __repr__(self):
- params = (self.__class__.__name__, hex(id(self)),
- self.max_size, len(self.items),
- len(self._waiters), len(self._senders))
- return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params
-
- def send(self, result=None, exc=None):
- if exc is not None and not isinstance(exc, tuple):
- exc = (exc, )
- if eventlet.getcurrent() is hubs.get_hub().greenlet:
- self.items.append((result, exc))
- if self._waiters:
- hubs.get_hub().schedule_call_global(0, self._do_switch)
- else:
- self.items.append((result, exc))
- # note that send() does not work well with timeouts. if your timeout fires
- # after this point, the item will remain in the queue
- if self._waiters:
- hubs.get_hub().schedule_call_global(0, self._do_switch)
- if len(self.items) > self.max_size:
- self._senders.add(eventlet.getcurrent())
- try:
- hubs.get_hub().switch()
- finally:
- self._senders.discard(eventlet.getcurrent())
-
- def send_exception(self, *args):
- # the arguments are the same as for greenlet.throw
- return self.send(exc=args)
-
- def _do_switch(self):
- while True:
- if self._waiters and self.items:
- waiter = self._waiters.pop()
- result, exc = self.items.popleft()
- try:
- waiter.switch((result, exc))
- except:
- traceback.print_exc()
- elif self._senders and len(self.items) <= self.max_size:
- sender = self._senders.pop()
- try:
- sender.switch()
- except:
- traceback.print_exc()
- else:
- break
-
- def wait(self):
- if self.items:
- result, exc = self.items.popleft()
- if len(self.items) <= self.max_size:
- hubs.get_hub().schedule_call_global(0, self._do_switch)
- if exc is None:
- return result
- else:
- eventlet.getcurrent().throw(*exc)
- else:
- if self._senders:
- hubs.get_hub().schedule_call_global(0, self._do_switch)
- self._waiters.add(eventlet.getcurrent())
- try:
- result, exc = hubs.get_hub().switch()
- if exc is None:
- return result
- else:
- eventlet.getcurrent().throw(*exc)
- finally:
- self._waiters.discard(eventlet.getcurrent())
-
- def ready(self):
- return len(self.items) > 0
-
- def full(self):
- return len(self.items) >= self.max_size
-
- def waiting(self):
- return max(0, len(self._waiters) - len(self.items))
-
-
-def queue(max_size=None):
- if max_size is None:
- return Queue()
- else:
- return Channel(max_size)
diff --git a/eventlet/green/_socket_nodns.py b/eventlet/green/_socket_nodns.py
index df83737..373c140 100644
--- a/eventlet/green/_socket_nodns.py
+++ b/eventlet/green/_socket_nodns.py
@@ -9,7 +9,6 @@ slurp_properties(__socket, globals(),
os = __import__('os')
import sys
-import warnings
from eventlet.hubs import get_hub
from eventlet.greenio import GreenSocket as socket
from eventlet.greenio import SSL as _SSL # for exceptions
@@ -86,18 +85,3 @@ class GreenSSLObject(object):
for debugging purposes; do not parse the content of this string because its
format can't be parsed unambiguously."""
return str(self.connection.get_peer_certificate().get_issuer())
-
-
-try:
- from eventlet.green import ssl as ssl_module
- sslerror = __socket.sslerror
- __socket.ssl
-except AttributeError:
- # if the real socket module doesn't have the ssl method or sslerror
- # exception, we can't emulate them
- pass
-else:
- def ssl(sock, certificate=None, private_key=None):
- warnings.warn("socket.ssl() is deprecated. Use ssl.wrap_socket() instead.",
- DeprecationWarning, stacklevel=2)
- return ssl_module.sslwrap_simple(sock, private_key, certificate)
diff --git a/eventlet/pool.py b/eventlet/pool.py
deleted file mode 100644
index be9db8f..0000000
--- a/eventlet/pool.py
+++ /dev/null
@@ -1,321 +0,0 @@
-from __future__ import print_function
-
-from eventlet import coros, proc, api
-from eventlet.semaphore import Semaphore
-from eventlet.support import six
-
-import warnings
-warnings.warn(
- "The pool module is deprecated. Please use the "
- "eventlet.GreenPool and eventlet.GreenPile classes instead.",
- DeprecationWarning, stacklevel=2)
-
-
-class Pool(object):
- def __init__(self, min_size=0, max_size=4, track_events=False):
- if min_size > max_size:
- raise ValueError('min_size cannot be bigger than max_size')
- self.max_size = max_size
- self.sem = Semaphore(max_size)
- self.procs = proc.RunningProcSet()
- if track_events:
- self.results = coros.queue()
- else:
- self.results = None
-
- def resize(self, new_max_size):
- """ Change the :attr:`max_size` of the pool.
-
- If the pool gets resized when there are more than *new_max_size*
- coroutines checked out, when they are returned to the pool they will be
- discarded. The return value of :meth:`free` will be negative in this
- situation.
- """
- max_size_delta = new_max_size - self.max_size
- self.sem.counter += max_size_delta
- self.max_size = new_max_size
-
- @property
- def current_size(self):
- """ The number of coroutines that are currently executing jobs. """
- return len(self.procs)
-
- def free(self):
- """ Returns the number of coroutines that are available for doing
- work."""
- return self.sem.counter
-
- def execute(self, func, *args, **kwargs):
- """Execute func in one of the coroutines maintained
- by the pool, when one is free.
-
- Immediately returns a :class:`~eventlet.proc.Proc` object which can be
- queried for the func's result.
-
- >>> pool = Pool()
- >>> task = pool.execute(lambda a: ('foo', a), 1)
- >>> task.wait()
- ('foo', 1)
- """
- # if reentering an empty pool, don't try to wait on a coroutine freeing
- # itself -- instead, just execute in the current coroutine
- if self.sem.locked() and api.getcurrent() in self.procs:
- p = proc.spawn(func, *args, **kwargs)
- try:
- p.wait()
- except:
- pass
- else:
- self.sem.acquire()
- p = self.procs.spawn(func, *args, **kwargs)
- # assuming the above line cannot raise
- p.link(lambda p: self.sem.release())
- if self.results is not None:
- p.link(self.results)
- return p
-
- execute_async = execute
-
- def _execute(self, evt, func, args, kw):
- p = self.execute(func, *args, **kw)
- p.link(evt)
- return p
-
- def waitall(self):
- """ Calling this function blocks until every coroutine
- completes its work (i.e. there are 0 running coroutines)."""
- return self.procs.waitall()
-
- wait_all = waitall
-
- def wait(self):
- """Wait for the next execute in the pool to complete,
- and return the result."""
- return self.results.wait()
-
- def waiting(self):
- """Return the number of coroutines waiting to execute.
- """
- if self.sem.balance < 0:
- return -self.sem.balance
- else:
- return 0
-
- def killall(self):
- """ Kill every running coroutine as immediately as possible."""
- return self.procs.killall()
-
- def launch_all(self, function, iterable):
- """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
- in its own coroutine -- like ``itertools.starmap()``, but in parallel.
- Discard values returned by ``function()``. You should call
- ``wait_all()`` to wait for all coroutines, newly-launched plus any
- previously-submitted :meth:`execute` or :meth:`execute_async` calls, to
- complete.
-
- >>> pool = Pool()
- >>> def saw(x):
- ... print("I saw %s!" % x)
- ...
- >>> pool.launch_all(saw, "ABC")
- >>> pool.wait_all()
- I saw A!
- I saw B!
- I saw C!
- """
- for tup in iterable:
- self.execute(function, *tup)
-
- def process_all(self, function, iterable):
- """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
- in its own coroutine -- like ``itertools.starmap()``, but in parallel.
- Discard values returned by ``function()``. Don't return until all
- coroutines, newly-launched plus any previously-submitted :meth:`execute()`
- or :meth:`execute_async` calls, have completed.
-
- >>> from eventlet import coros
- >>> pool = coros.CoroutinePool()
- >>> def saw(x): print("I saw %s!" % x)
- ...
- >>> pool.process_all(saw, "DEF")
- I saw D!
- I saw E!
- I saw F!
- """
- self.launch_all(function, iterable)
- self.wait_all()
-
- def generate_results(self, function, iterable, qsize=None):
- """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
- in its own coroutine -- like ``itertools.starmap()``, but in parallel.
- Yield each of the values returned by ``function()``, in the order
- they're completed rather than the order the coroutines were launched.
-
- Iteration stops when we've yielded results for each arguments tuple in
- *iterable*. Unlike :meth:`wait_all` and :meth:`process_all`, this
- function does not wait for any previously-submitted :meth:`execute` or
- :meth:`execute_async` calls.
-
- Results are temporarily buffered in a queue. If you pass *qsize=*, this
- value is used to limit the max size of the queue: an attempt to buffer
- too many results will suspend the completed :class:`CoroutinePool`
- coroutine until the requesting coroutine (the caller of
- :meth:`generate_results`) has retrieved one or more results by calling
- this generator-iterator's ``next()``.
-
- If any coroutine raises an uncaught exception, that exception will
- propagate to the requesting coroutine via the corresponding ``next()``
- call.
-
- What I particularly want these tests to illustrate is that using this
- generator function::
-
- for result in generate_results(function, iterable):
- # ... do something with result ...
- pass
-
- executes coroutines at least as aggressively as the classic eventlet
- idiom::
-
- events = [pool.execute(function, *args) for args in iterable]
- for event in events:
- result = event.wait()
- # ... do something with result ...
-
- even without a distinct event object for every arg tuple in *iterable*,
- and despite the funny flow control from interleaving launches of new
- coroutines with yields of completed coroutines' results.
-
- (The use case that makes this function preferable to the classic idiom
- above is when the *iterable*, which may itself be a generator, produces
- millions of items.)
-
- >>> from eventlet import coros
- >>> from eventlet.support import six
- >>> import string
- >>> pool = coros.CoroutinePool(max_size=5)
- >>> pausers = [coros.Event() for x in range(2)]
- >>> def longtask(evt, desc):
- ... print("%s woke up with %s" % (desc, evt.wait()))
- ...
- >>> pool.launch_all(longtask, zip(pausers, "AB"))
- >>> def quicktask(desc):
- ... print("returning %s" % desc)
- ... return desc
- ...
-
- (Instead of using a ``for`` loop, step through :meth:`generate_results`
- items individually to illustrate timing)
-
- >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
- >>> print(six.next(step))
- returning a
- returning b
- returning c
- a
- >>> print(six.next(step))
- b
- >>> print(six.next(step))
- c
- >>> print(six.next(step))
- returning d
- returning e
- returning f
- d
- >>> pausers[0].send("A")
- >>> print(six.next(step))
- e
- >>> print(six.next(step))
- f
- >>> print(six.next(step))
- A woke up with A
- returning g
- returning h
- returning i
- g
- >>> print("".join([six.next(step) for x in range(3)]))
- returning j
- returning k
- returning l
- returning m
- hij
- >>> pausers[1].send("B")
- >>> print("".join([six.next(step) for x in range(4)]))
- B woke up with B
- returning n
- returning o
- returning p
- returning q
- klmn
- """
- # Get an iterator because of our funny nested loop below. Wrap the
- # iterable in enumerate() so we count items that come through.
- tuples = iter(enumerate(iterable))
- # If the iterable is empty, this whole function is a no-op, and we can
- # save ourselves some grief by just quitting out. In particular, once
- # we enter the outer loop below, we're going to wait on the queue --
- # but if we launched no coroutines with that queue as the destination,
- # we could end up waiting a very long time.
- try:
- index, args = six.next(tuples)
- except StopIteration:
- return
- # From this point forward, 'args' is the current arguments tuple and
- # 'index+1' counts how many such tuples we've seen.
- # This implementation relies on the fact that _execute() accepts an
- # event-like object, and -- unless it's None -- the completed
- # coroutine calls send(result). We slyly pass a queue rather than an
- # event -- the same queue instance for all coroutines. This is why our
- # queue interface intentionally resembles the event interface.
- q = coros.queue(max_size=qsize)
- # How many results have we yielded so far?
- finished = 0
- # This first loop is only until we've launched all the coroutines. Its
- # complexity is because if iterable contains more args tuples than the
- # size of our pool, attempting to _execute() the (poolsize+1)th
- # coroutine would suspend until something completes and send()s its
- # result to our queue. But to keep down queue overhead and to maximize
- # responsiveness to our caller, we'd rather suspend on reading the
- # queue. So we stuff the pool as full as we can, then wait for
- # something to finish, then stuff more coroutines into the pool.
- try:
- while True:
- # Before each yield, start as many new coroutines as we can fit.
- # (The self.free() test isn't 100% accurate: if we happen to be
- # executing in one of the pool's coroutines, we could _execute()
- # without waiting even if self.free() reports 0. See _execute().)
- # The point is that we don't want to wait in the _execute() call,
- # we want to wait in the q.wait() call.
- # IMPORTANT: at start, and whenever we've caught up with all
- # coroutines we've launched so far, we MUST iterate this inner
- # loop at least once, regardless of self.free() -- otherwise the
- # q.wait() call below will deadlock!
- # Recall that index is the index of the NEXT args tuple that we
- # haven't yet launched. Therefore it counts how many args tuples
- # we've launched so far.
- while self.free() > 0 or finished == index:
- # Just like the implementation of execute_async(), save that
- # we're passing our queue instead of None as the "event" to
- # which to send() the result.
- self._execute(q, function, args, {})
- # We've consumed that args tuple, advance to next.
- index, args = six.next(tuples)
- # Okay, we've filled up the pool again, yield a result -- which
- # will probably wait for a coroutine to complete. Although we do
- # have q.ready(), so we could iterate without waiting, we avoid
- # that because every yield could involve considerable real time.
- # We don't know how long it takes to return from yield, so every
- # time we do, take the opportunity to stuff more requests into the
- # pool before yielding again.
- yield q.wait()
- # Be sure to count results so we know when to stop!
- finished += 1
- except StopIteration:
- pass
- # Here we've exhausted the input iterable. index+1 is the total number
- # of coroutines we've launched. We probably haven't yielded that many
- # results yet. Wait for the rest of the results, yielding them as they
- # arrive.
- while finished < index + 1:
- yield q.wait()
- finished += 1
diff --git a/eventlet/proc.py b/eventlet/proc.py
deleted file mode 100644
index 52e98d8..0000000
--- a/eventlet/proc.py
+++ /dev/null
@@ -1,739 +0,0 @@
-"""
-This module provides means to spawn, kill and link coroutines. Linking means
-subscribing to the coroutine's result, either in form of return value or
-unhandled exception.
-
-To create a linkable coroutine use spawn function provided by this module:
-
- >>> def demofunc(x, y):
- ... return x / y
- >>> p = spawn(demofunc, 6, 2)
-
-The return value of :func:`spawn` is an instance of :class:`Proc` class that
-you can "link":
-
- * ``p.link(obj)`` - notify *obj* when the coroutine is finished
-
-What "notify" means here depends on the type of *obj*: a callable is simply
-called, an :class:`~eventlet.coros.Event` or a :class:`~eventlet.coros.queue`
-is notified using ``send``/``send_exception`` methods and if *obj* is another
-greenlet it's killed with :class:`LinkedExited` exception.
-
-Here's an example:
-
->>> event = coros.Event()
->>> _ = p.link(event)
->>> event.wait()
-3
-
-Now, even though *p* is finished it's still possible to link it. In this
-case the notification is performed immediatelly:
-
->>> try:
-... p.link()
-... except LinkedCompleted:
-... print('LinkedCompleted')
-LinkedCompleted
-
-(Without an argument, the link is created to the current greenlet)
-
-There are also :meth:`~eventlet.proc.Source.link_value` and
-:func:`link_exception` methods that only deliver a return value and an
-unhandled exception respectively (plain :meth:`~eventlet.proc.Source.link`
-delivers both). Suppose we want to spawn a greenlet to do an important part of
-the task; if it fails then there's no way to complete the task so the parent
-must fail as well; :meth:`~eventlet.proc.Source.link_exception` is useful here:
-
->>> p = spawn(demofunc, 1, 0)
->>> _ = p.link_exception()
->>> try:
-... api.sleep(1)
-... except LinkedFailed:
-... print('LinkedFailed')
-LinkedFailed
-
-One application of linking is :func:`waitall` function: link to a bunch of
-coroutines and wait for all them to complete. Such a function is provided by
-this module.
-"""
-import sys
-
-from eventlet import api, coros, hubs
-from eventlet.support import six
-
-import warnings
-warnings.warn(
- "The proc module is deprecated! Please use the greenthread "
- "module, or any of the many other Eventlet cross-coroutine "
- "primitives, instead.",
- DeprecationWarning, stacklevel=2)
-
-__all__ = ['LinkedExited',
- 'LinkedFailed',
- 'LinkedCompleted',
- 'LinkedKilled',
- 'ProcExit',
- 'Link',
- 'waitall',
- 'killall',
- 'Source',
- 'Proc',
- 'spawn',
- 'spawn_link',
- 'spawn_link_value',
- 'spawn_link_exception']
-
-
-class LinkedExited(Exception):
- """Raised when a linked proc exits"""
- msg = "%r exited"
-
- def __init__(self, name=None, msg=None):
- self.name = name
- if msg is None:
- msg = self.msg % self.name
- Exception.__init__(self, msg)
-
-
-class LinkedCompleted(LinkedExited):
- """Raised when a linked proc finishes the execution cleanly"""
-
- msg = "%r completed successfully"
-
-
-class LinkedFailed(LinkedExited):
- """Raised when a linked proc dies because of unhandled exception"""
- msg = "%r failed with %s"
-
- def __init__(self, name, typ, value=None, tb=None):
- msg = self.msg % (name, typ.__name__)
- LinkedExited.__init__(self, name, msg)
-
-
-class LinkedKilled(LinkedFailed):
- """Raised when a linked proc dies because of unhandled GreenletExit
- (i.e. it was killed)
- """
- msg = """%r was killed with %s"""
-
-
-def getLinkedFailed(name, typ, value=None, tb=None):
- if issubclass(typ, api.GreenletExit):
- return LinkedKilled(name, typ, value, tb)
- return LinkedFailed(name, typ, value, tb)
-
-
-class ProcExit(api.GreenletExit):
- """Raised when this proc is killed."""
-
-
-class Link(object):
- """
- A link to a greenlet, triggered when the greenlet exits.
- """
-
- def __init__(self, listener):
- self.listener = listener
-
- def cancel(self):
- self.listener = None
-
- def __enter__(self):
- pass
-
- def __exit__(self, *args):
- self.cancel()
-
-
-class LinkToEvent(Link):
-
- def __call__(self, source):
- if self.listener is None:
- return
- if source.has_value():
- self.listener.send(source.value)
- else:
- self.listener.send_exception(*source.exc_info())
-
-
-class LinkToGreenlet(Link):
-
- def __call__(self, source):
- if source.has_value():
- self.listener.throw(LinkedCompleted(source.name))
- else:
- self.listener.throw(getLinkedFailed(source.name, *source.exc_info()))
-
-
-class LinkToCallable(Link):
-
- def __call__(self, source):
- self.listener(source)
-
-
-def waitall(lst, trap_errors=False, queue=None):
- if queue is None:
- queue = coros.queue()
- index = -1
- for (index, linkable) in enumerate(lst):
- linkable.link(decorate_send(queue, index))
- len = index + 1
- results = [None] * len
- count = 0
- while count < len:
- try:
- index, value = queue.wait()
- except Exception:
- if not trap_errors:
- raise
- else:
- results[index] = value
- count += 1
- return results
-
-
-class decorate_send(object):
-
- def __init__(self, event, tag):
- self._event = event
- self._tag = tag
-
- def __repr__(self):
- params = (type(self).__name__, self._tag, self._event)
- return '<%s tag=%r event=%r>' % params
-
- def __getattr__(self, name):
- assert name != '_event'
- return getattr(self._event, name)
-
- def send(self, value):
- self._event.send((self._tag, value))
-
-
-def killall(procs, *throw_args, **kwargs):
- if not throw_args:
- throw_args = (ProcExit, )
- wait = kwargs.pop('wait', False)
- if kwargs:
- raise TypeError('Invalid keyword argument for proc.killall(): %s' % ', '.join(kwargs.keys()))
- for g in procs:
- if not g.dead:
- hubs.get_hub().schedule_call_global(0, g.throw, *throw_args)
- if wait and api.getcurrent() is not hubs.get_hub().greenlet:
- api.sleep(0)
-
-
-class NotUsed(object):
-
- def __str__(self):
- return '<Source instance does not hold a value or an exception>'
-
- __repr__ = __str__
-
-_NOT_USED = NotUsed()
-
-
-def spawn_greenlet(function, *args):
- """Create a new greenlet that will run ``function(*args)``.
- The current greenlet won't be unscheduled. Keyword arguments aren't
- supported (limitation of greenlet), use :func:`spawn` to work around that.
- """
- g = api.Greenlet(function)
- g.parent = hubs.get_hub().greenlet
- hubs.get_hub().schedule_call_global(0, g.switch, *args)
- return g
-
-
-class Source(object):
- """Maintain a set of links to the listeners. Delegate the sent value or
- the exception to all of them.
-
- To set up a link, use :meth:`link_value`, :meth:`link_exception` or
- :meth:`link` method. The latter establishes both "value" and "exception"
- link. It is possible to link to events, queues, greenlets and callables.
-
- >>> source = Source()
- >>> event = coros.Event()
- >>> _ = source.link(event)
-
- Once source's :meth:`send` or :meth:`send_exception` method is called, all
- the listeners with the right type of link will be notified ("right type"
- means that exceptions won't be delivered to "value" links and values won't
- be delivered to "exception" links). Once link has been fired it is removed.
-
- Notifying listeners is performed in the **mainloop** greenlet. Under the
- hood notifying a link means executing a callback, see :class:`Link` class
- for details. Notification *must not* attempt to switch to the hub, i.e.
- call any blocking functions.
-
- >>> source.send('hello')
- >>> event.wait()
- 'hello'
-
- Any error happened while sending will be logged as a regular unhandled
- exception. This won't prevent other links from being fired.
-
- There 3 kinds of listeners supported:
-
- 1. If *listener* is a greenlet (regardless if it's a raw greenlet or an
- extension like :class:`Proc`), a subclass of :class:`LinkedExited`
- exception is raised in it.
-
- 2. If *listener* is something with send/send_exception methods (event,
- queue, :class:`Source` but not :class:`Proc`) the relevant method is
- called.
-
- 3. If *listener* is a callable, it is called with 1 argument (the result)
- for "value" links and with 3 arguments ``(typ, value, tb)`` for
- "exception" links.
- """
-
- def __init__(self, name=None):
- self.name = name
- self._value_links = {}
- self._exception_links = {}
- self.value = _NOT_USED
- self._exc = None
-
- def _repr_helper(self):
- result = []
- result.append(repr(self.name))
- if self.value is not _NOT_USED:
- if self._exc is None:
- res = repr(self.value)
- if len(res) > 50:
- res = res[:50] + '...'
- result.append('result=%s' % res)
- else:
- result.append('raised=%s' % (self._exc, ))
- result.append('{%s:%s}' % (len(self._value_links), len(self._exception_links)))
- return result
-
- def __repr__(self):
- klass = type(self).__name__
- return '<%s at %s %s>' % (klass, hex(id(self)), ' '.join(self._repr_helper()))
-
- def ready(self):
- return self.value is not _NOT_USED
-
- def has_value(self):
- return self.value is not _NOT_USED and self._exc is None
-
- def has_exception(self):
- return self.value is not _NOT_USED and self._exc is not None
-
- def exc_info(self):
- if not self._exc:
- return (None, None, None)
- elif len(self._exc) == 3:
- return self._exc
- elif len(self._exc) == 1:
- if isinstance(self._exc[0], type):
- return self._exc[0], None, None
- else:
- return self._exc[0].__class__, self._exc[0], None
- elif len(self._exc) == 2:
- return self._exc[0], self._exc[1], None
- else:
- return self._exc
-
- def link_value(self, listener=None, link=None):
- if self.ready() and self._exc is not None:
- return
- if listener is None:
- listener = api.getcurrent()
- if link is None:
- link = self.getLink(listener)
- if self.ready() and listener is api.getcurrent():
- link(self)
- else:
- self._value_links[listener] = link
- if self.value is not _NOT_USED:
- self._start_send()
- return link
-
- def link_exception(self, listener=None, link=None):
- if self.value is not _NOT_USED and self._exc is None:
- return
- if listener is None:
- listener = api.getcurrent()
- if link is None:
- link = self.getLink(listener)
- if self.ready() and listener is api.getcurrent():
- link(self)
- else:
- self._exception_links[listener] = link
- if self.value is not _NOT_USED:
- self._start_send_exception()
- return link
-
- def link(self, listener=None, link=None):
- if listener is None:
- listener = api.getcurrent()
- if link is None:
- link = self.getLink(listener)
- if self.ready() and listener is api.getcurrent():
- if self._exc is None:
- link(self)
- else:
- link(self)
- else:
- self._value_links[listener] = link
- self._exception_links[listener] = link
- if self.value is not _NOT_USED:
- if self._exc is None:
- self._start_send()
- else:
- self._start_send_exception()
- return link
-
- def unlink(self, listener=None):
- if listener is None:
- listener = api.getcurrent()
- self._value_links.pop(listener, None)
- self._exception_links.pop(listener, None)
-
- @staticmethod
- def getLink(listener):
- if hasattr(listener, 'throw'):
- return LinkToGreenlet(listener)
- if hasattr(listener, 'send'):
- return LinkToEvent(listener)
- elif hasattr(listener, '__call__'):
- return LinkToCallable(listener)
- else:
- raise TypeError("Don't know how to link to %r" % (listener, ))
-
- def send(self, value):
- assert not self.ready(), "%s has been fired already" % self
- self.value = value
- self._exc = None
- self._start_send()
-
- def _start_send(self):
- links_items = list(six.iteritems(self._value_links))
- hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._value_links)
-
- def send_exception(self, *throw_args):
- assert not self.ready(), "%s has been fired already" % self
- self.value = None
- self._exc = throw_args
- self._start_send_exception()
-
- def _start_send_exception(self):
- links_items = list(six.iteritems(self._exception_links))
- hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._exception_links)
-
- def _do_send(self, links, consult):
- while links:
- listener, link = links.pop()
- try:
- if listener in consult:
- try:
- link(self)
- finally:
- consult.pop(listener, None)
- except:
- hubs.get_hub().schedule_call_global(0, self._do_send, links, consult)
- raise
-
- def wait(self, timeout=None, *throw_args):
- """Wait until :meth:`send` or :meth:`send_exception` is called or
- *timeout* has expired. Return the argument of :meth:`send` or raise the
- argument of :meth:`send_exception`. If *timeout* has expired, ``None``
- is returned.
-
- The arguments, when provided, specify how many seconds to wait and what
- to do when *timeout* has expired. They are treated the same way as
- :func:`~eventlet.api.timeout` treats them.
- """
- if self.value is not _NOT_USED:
- if self._exc is None:
- return self.value
- else:
- api.getcurrent().throw(*self._exc)
- if timeout is not None:
- timer = api.timeout(timeout, *throw_args)
- timer.__enter__()
- if timeout == 0:
- if timer.__exit__(None, None, None):
- return
- else:
- try:
- api.getcurrent().throw(*timer.throw_args)
- except:
- if not timer.__exit__(*sys.exc_info()):
- raise
- return
- EXC = True
- try:
- try:
- waiter = Waiter()
- self.link(waiter)
- try:
- return waiter.wait()
- finally:
- self.unlink(waiter)
- except:
- EXC = False
- if timeout is None or not timer.__exit__(*sys.exc_info()):
- raise
- finally:
- if timeout is not None and EXC:
- timer.__exit__(None, None, None)
-
-
-class Waiter(object):
-
- def __init__(self):
- self.greenlet = None
-
- def send(self, value):
- """Wake up the greenlet that is calling wait() currently (if there is one).
- Can only be called from get_hub().greenlet.
- """
- assert api.getcurrent() is hubs.get_hub().greenlet
- if self.greenlet is not None:
- self.greenlet.switch(value)
-
- def send_exception(self, *throw_args):
- """Make greenlet calling wait() wake up (if there is a wait()).
- Can only be called from get_hub().greenlet.
- """
- assert api.getcurrent() is hubs.get_hub().greenlet
- if self.greenlet is not None:
- self.greenlet.throw(*throw_args)
-
- def wait(self):
- """Wait until send or send_exception is called. Return value passed
- into send() or raise exception passed into send_exception().
- """
- assert self.greenlet is None
- current = api.getcurrent()
- assert current is not hubs.get_hub().greenlet
- self.greenlet = current
- try:
- return hubs.get_hub().switch()
- finally:
- self.greenlet = None
-
-
-class Proc(Source):
- """A linkable coroutine based on Source.
- Upon completion, delivers coroutine's result to the listeners.
- """
-
- def __init__(self, name=None):
- self.greenlet = None
- Source.__init__(self, name)
-
- def _repr_helper(self):
- if self.greenlet is not None and self.greenlet.dead:
- dead = '(dead)'
- else:
- dead = ''
- return ['%r%s' % (self.greenlet, dead)] + Source._repr_helper(self)
-
- def __repr__(self):
- klass = type(self).__name__
- return '<%s %s>' % (klass, ' '.join(self._repr_helper()))
-
- def __nonzero__(self):
- if self.ready():
- # with current _run this does not makes any difference
- # still, let keep it there
- return False
- # otherwise bool(proc) is the same as bool(greenlet)
- if self.greenlet is not None:
- return bool(self.greenlet)
-
- __bool__ = __nonzero__
-
- @property
- def dead(self):
- return self.ready() or self.greenlet.dead
-
- @classmethod
- def spawn(cls, function, *args, **kwargs):
- """Return a new :class:`Proc` instance that is scheduled to execute
- ``function(*args, **kwargs)`` upon the next hub iteration.
- """
- proc = cls()
- proc.run(function, *args, **kwargs)
- return proc
-
- def run(self, function, *args, **kwargs):
- """Create a new greenlet to execute ``function(*args, **kwargs)``.
- The created greenlet is scheduled to run upon the next hub iteration.
- """
- assert self.greenlet is None, "'run' can only be called once per instance"
- if self.name is None:
- self.name = str(function)
- self.greenlet = spawn_greenlet(self._run, function, args, kwargs)
-
- def _run(self, function, args, kwargs):
- """Internal top level function.
- Execute *function* and send its result to the listeners.
- """
- try:
- result = function(*args, **kwargs)
- except:
- self.send_exception(*sys.exc_info())
- raise # let mainloop log the exception
- else:
- self.send(result)
-
- def throw(self, *throw_args):
- """Used internally to raise the exception.
-
- Behaves exactly like greenlet's 'throw' with the exception that
- :class:`ProcExit` is raised by default. Do not use this function as it
- leaves the current greenlet unscheduled forever. Use :meth:`kill`
- method instead.
- """
- if not self.dead:
- if not throw_args:
- throw_args = (ProcExit, )
- self.greenlet.throw(*throw_args)
-
- def kill(self, *throw_args):
- """
- Raise an exception in the greenlet. Unschedule the current greenlet so
- that this :class:`Proc` can handle the exception (or die).
-
- The exception can be specified with *throw_args*. By default,
- :class:`ProcExit` is raised.
- """
- if not self.dead:
- if not throw_args:
- throw_args = (ProcExit, )
- hubs.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args)
- if api.getcurrent() is not hubs.get_hub().greenlet:
- api.sleep(0)
-
- # QQQ maybe Proc should not inherit from Source (because its send() and send_exception()
- # QQQ methods are for internal use only)
-
-
-spawn = Proc.spawn
-
-
-def spawn_link(function, *args, **kwargs):
- p = spawn(function, *args, **kwargs)
- p.link()
- return p
-
-
-def spawn_link_value(function, *args, **kwargs):
- p = spawn(function, *args, **kwargs)
- p.link_value()
- return p
-
-
-def spawn_link_exception(function, *args, **kwargs):
- p = spawn(function, *args, **kwargs)
- p.link_exception()
- return p
-
-
-class wrap_errors(object):
- """Helper to make function return an exception, rather than raise it.
-
- Because every exception that is unhandled by greenlet will be logged by the hub,
- it is desirable to prevent non-error exceptions from leaving a greenlet.
- This can done with simple try/except construct:
-
- def func1(*args, **kwargs):
- try:
- return func(*args, **kwargs)
- except (A, B, C) as ex:
- return ex
-
- wrap_errors provides a shortcut to write that in one line:
-
- func1 = wrap_errors((A, B, C), func)
-
- It also preserves __str__ and __repr__ of the original function.
- """
-
- def __init__(self, errors, func):
- """Make a new function from `func', such that it catches `errors' (an
- Exception subclass, or a tuple of Exception subclasses) and return
- it as a value.
- """
- self.errors = errors
- self.func = func
-
- def __call__(self, *args, **kwargs):
- try:
- return self.func(*args, **kwargs)
- except self.errors as ex:
- return ex
-
- def __str__(self):
- return str(self.func)
-
- def __repr__(self):
- return repr(self.func)
-
- def __getattr__(self, item):
- return getattr(self.func, item)
-
-
-class RunningProcSet(object):
- """
- Maintain a set of :class:`Proc` s that are still running, that is,
- automatically remove a proc when it's finished. Provide a way to wait/kill
- all of them
- """
-
- def __init__(self, *args):
- self.procs = set(*args)
- if args:
- for p in self.args[0]:
- p.link(lambda p: self.procs.discard(p))
-
- def __len__(self):
- return len(self.procs)
-
- def __contains__(self, item):
- if isinstance(item, api.Greenlet):
- # special case for "api.getcurrent() in running_proc_set" to work
- for x in self.procs:
- if x.greenlet == item:
- return True
- else:
- return item in self.procs
-
- def __iter__(self):
- return iter(self.procs)
-
- def add(self, p):
- self.procs.add(p)
- p.link(lambda p: self.procs.discard(p))
-
- def spawn(self, func, *args, **kwargs):
- p = spawn(func, *args, **kwargs)
- self.add(p)
- return p
-
- def waitall(self, trap_errors=False):
- while self.procs:
- waitall(self.procs, trap_errors=trap_errors)
-
- def killall(self, *throw_args, **kwargs):
- return killall(self.procs, *throw_args, **kwargs)
-
-
-class Pool(object):
-
- linkable_class = Proc
-
- def __init__(self, limit):
- self.semaphore = coros.Semaphore(limit)
-
- def allocate(self):
- self.semaphore.acquire()
- g = self.linkable_class()
- g.link(lambda *_args: self.semaphore.release())
- return g
diff --git a/eventlet/processes.py b/eventlet/processes.py
deleted file mode 100644
index 1b5dfd6..0000000
--- a/eventlet/processes.py
+++ /dev/null
@@ -1,169 +0,0 @@
-import warnings
-warnings.warn("eventlet.processes is deprecated in favor of "
- "eventlet.green.subprocess, which is API-compatible with the standard "
- " library subprocess module.",
- DeprecationWarning, stacklevel=2)
-
-import errno
-import os
-import signal
-
-import eventlet
-from eventlet import greenio, pools
-from eventlet.green import subprocess
-
-
-class DeadProcess(RuntimeError):
- pass
-
-
-def cooperative_wait(pobj, check_interval=0.01):
- """ Waits for a child process to exit, returning the status
- code.
-
- Unlike ``os.wait``, :func:`cooperative_wait` does not block the entire
- process, only the calling coroutine. If the child process does not die,
- :func:`cooperative_wait` could wait forever.
-
- The argument *check_interval* is the amount of time, in seconds, that
- :func:`cooperative_wait` will sleep between calls to ``os.waitpid``.
- """
- try:
- while True:
- status = pobj.poll()
- if status >= 0:
- return status
- eventlet.sleep(check_interval)
- except OSError as e:
- if e.errno == errno.ECHILD:
- # no child process, this happens if the child process
- # already died and has been cleaned up, or if you just
- # called with a random pid value
- return -1
- else:
- raise
-
-
-class Process(object):
- """Construct Process objects, then call read, and write on them."""
- process_number = 0
-
- def __init__(self, command, args, dead_callback=None):
- self.process_number = self.process_number + 1
- Process.process_number = self.process_number
- self.command = command
- self.args = args
- self._dead_callback = dead_callback
- self.run()
-
- def run(self):
- self.dead = False
- self.started = False
- self.proc = None
-
- args = [self.command]
- args.extend(self.args)
- self.proc = subprocess.Popen(
- args=args,
- shell=False,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- close_fds=True,
- )
- self.child_stdout_stderr = self.proc.stdout
- self.child_stdin = self.proc.stdin
-
- self.sendall = self.child_stdin.write
- self.send = self.child_stdin.write
- self.recv = self.child_stdout_stderr.read
- self.readline = self.child_stdout_stderr.readline
- self._read_first_result = False
-
- def wait(self):
- return cooperative_wait(self.proc)
-
- def dead_callback(self):
- self.wait()
- self.dead = True
- if self._dead_callback:
- self._dead_callback()
-
- def makefile(self, mode, *arg):
- if mode.startswith('r'):
- return self.child_stdout_stderr
- if mode.startswith('w'):
- return self.child_stdin
- raise RuntimeError("Unknown mode", mode)
-
- def read(self, amount=None):
- """Reads from the stdout and stderr of the child process.
- The first call to read() will return a string; subsequent
- calls may raise a DeadProcess when EOF occurs on the pipe.
- """
- result = self.child_stdout_stderr.read(amount)
- if result == '' and self._read_first_result:
- # This process is dead.
- self.dead_callback()
- raise DeadProcess
- else:
- self._read_first_result = True
- return result
-
- def write(self, stuff):
- written = 0
- try:
- written = self.child_stdin.write(stuff)
- self.child_stdin.flush()
- except ValueError as e:
- # File was closed
- assert str(e) == 'I/O operation on closed file'
- if written == 0:
- self.dead_callback()
- raise DeadProcess
-
- def flush(self):
- self.child_stdin.flush()
-
- def close(self):
- self.child_stdout_stderr.close()
- self.child_stdin.close()
- self.dead_callback()
-
- def close_stdin(self):
- self.child_stdin.close()
-
- def kill(self, sig=None):
- if sig is None:
- sig = signal.SIGTERM
- pid = self.getpid()
- os.kill(pid, sig)
-
- def getpid(self):
- return self.proc.pid
-
-
-class ProcessPool(pools.Pool):
- def __init__(self, command, args=None, min_size=0, max_size=4):
- """*command*
- the command to run
- """
- self.command = command
- if args is None:
- args = []
- self.args = args
- pools.Pool.__init__(self, min_size, max_size)
-
- def create(self):
- """Generate a process
- """
- def dead_callback():
- self.current_size -= 1
- return Process(self.command, self.args, dead_callback)
-
- def put(self, item):
- if not item.dead:
- if item.proc.poll() != -1:
- item.dead_callback()
- else:
- pools.Pool.put(self, item)
diff --git a/eventlet/twistedutil/__init__.py b/eventlet/twistedutil/__init__.py
index a5b8613..4ff50aa 100644
--- a/eventlet/twistedutil/__init__.py
+++ b/eventlet/twistedutil/__init__.py
@@ -62,7 +62,7 @@ if __name__=='__main__':
test()
elif num==1:
spawn(test)
- from eventlet.api import sleep
+ from eventlet import sleep
print('sleeping..')
sleep(5)
print('done sleeping..')
diff --git a/eventlet/twistedutil/protocol.py b/eventlet/twistedutil/protocol.py
index 60d43ad..4549bd0 100644
--- a/eventlet/twistedutil/protocol.py
+++ b/eventlet/twistedutil/protocol.py
@@ -8,8 +8,8 @@ from twisted.python import failure
from eventlet import greenthread
from eventlet import getcurrent
-from eventlet.coros import Queue
from eventlet.event import Event as BaseEvent
+from eventlet.queue import Queue
class ValueQueue(Queue):
diff --git a/eventlet/util.py b/eventlet/util.py
deleted file mode 100644
index ea0a464..0000000
--- a/eventlet/util.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import socket
-import warnings
-
-
-__original_socket__ = socket.socket
-
-
-def tcp_socket():
- warnings.warn(
- "eventlet.util.tcp_socket is deprecated. "
- "Please use the standard socket technique for this instead: "
- "sock = socket.socket()",
- DeprecationWarning, stacklevel=2)
- s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM)
- return s
-
-
-# if ssl is available, use eventlet.green.ssl for our ssl implementation
-from eventlet.green import ssl
-
-
-def wrap_ssl(sock, certificate=None, private_key=None, server_side=False):
- warnings.warn(
- "eventlet.util.wrap_ssl is deprecated. "
- "Please use the eventlet.green.ssl.wrap_socket()",
- DeprecationWarning, stacklevel=2)
- return ssl.wrap_socket(
- sock,
- keyfile=private_key,
- certfile=certificate,
- server_side=server_side,
- )
-
-
-def wrap_socket_with_coroutine_socket(use_thread_pool=None):
- warnings.warn(
- "eventlet.util.wrap_socket_with_coroutine_socket() is now "
- "eventlet.patcher.monkey_patch(all=False, socket=True)",
- DeprecationWarning, stacklevel=2)
- from eventlet import patcher
- patcher.monkey_patch(all=False, socket=True)
-
-
-def wrap_pipes_with_coroutine_pipes():
- warnings.warn(
- "eventlet.util.wrap_pipes_with_coroutine_pipes() is now "
- "eventlet.patcher.monkey_patch(all=False, os=True)",
- DeprecationWarning, stacklevel=2)
- from eventlet import patcher
- patcher.monkey_patch(all=False, os=True)
-
-
-def wrap_select_with_coroutine_select():
- warnings.warn(
- "eventlet.util.wrap_select_with_coroutine_select() is now "
- "eventlet.patcher.monkey_patch(all=False, select=True)",
- DeprecationWarning, stacklevel=2)
- from eventlet import patcher
- patcher.monkey_patch(all=False, select=True)
-
-
-def wrap_threading_local_with_coro_local():
- """
- monkey patch ``threading.local`` with something that is greenlet aware.
- Since greenlets cannot cross threads, so this should be semantically
- identical to ``threadlocal.local``
- """
- warnings.warn(
- "eventlet.util.wrap_threading_local_with_coro_local() is now "
- "eventlet.patcher.monkey_patch(all=False, thread=True) -- though"
- "note that more than just _local is patched now.",
- DeprecationWarning, stacklevel=2)
-
- from eventlet import patcher
- patcher.monkey_patch(all=False, thread=True)
-
-
-def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):
- warnings.warn(
- "eventlet.util.socket_bind_and_listen is deprecated."
- "Please use the standard socket methodology for this instead:"
- "sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)"
- "sock.bind(addr)"
- "sock.listen(backlog)",
- DeprecationWarning, stacklevel=2)
- set_reuse_addr(descriptor)
- descriptor.bind(addr)
- descriptor.listen(backlog)
- return descriptor
-
-
-def set_reuse_addr(descriptor):
- warnings.warn(
- "eventlet.util.set_reuse_addr is deprecated."
- "Please use the standard socket methodology for this instead:"
- "sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)",
- DeprecationWarning, stacklevel=2)
- try:
- descriptor.setsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR,
- descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1)
- except socket.error:
- pass
diff --git a/tests/api_test.py b/tests/api_test.py
index 5bee32d..8cb7410 100644
--- a/tests/api_test.py
+++ b/tests/api_test.py
@@ -1,21 +1,17 @@
import os
import socket
from unittest import TestCase, main
-import warnings
import eventlet
-from eventlet import greenio, util, hubs, greenthread, spawn
+from eventlet import greenio, hubs, greenthread, spawn
+from eventlet.green import ssl
from tests import skip_if_no_ssl
-warnings.simplefilter('ignore', DeprecationWarning)
-from eventlet import api
-warnings.simplefilter('default', DeprecationWarning)
-
def check_hub():
# Clear through the descriptor queue
- api.sleep(0)
- api.sleep(0)
+ eventlet.sleep(0)
+ eventlet.sleep(0)
hub = hubs.get_hub()
for nm in 'get_readers', 'get_writers':
dct = getattr(hub, nm)()
@@ -50,7 +46,7 @@ class TestApi(TestCase):
listenfd.close()
server = eventlet.listen(('0.0.0.0', 0))
- api.spawn(accept_once, server)
+ eventlet.spawn_n(accept_once, server)
client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile('rb')
@@ -74,13 +70,16 @@ class TestApi(TestCase):
greenio.shutdown_safe(listenfd)
listenfd.close()
- server = api.ssl_listener(('0.0.0.0', 0),
- self.certificate_file,
- self.private_key_file)
- api.spawn(accept_once, server)
+ server = eventlet.wrap_ssl(
+ eventlet.listen(('0.0.0.0', 0)),
+ self.private_key_file,
+ self.certificate_file,
+ server_side=True
+ )
+ eventlet.spawn_n(accept_once, server)
raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
- client = util.wrap_ssl(raw_client)
+ client = ssl.wrap_socket(raw_client)
fd = socket._fileobject(client, 'rb', 8192)
assert fd.readline() == b'hello\r\n'
@@ -100,13 +99,13 @@ class TestApi(TestCase):
def server(sock):
client, addr = sock.accept()
- api.sleep(0.1)
+ eventlet.sleep(0.1)
server_evt = spawn(server, server_sock)
- api.sleep(0)
+ eventlet.sleep(0)
try:
desc = eventlet.connect(('127.0.0.1', bound_port))
- api.trampoline(desc, read=True, write=False, timeout=0.001)
- except api.TimeoutError:
+ hubs.trampoline(desc, read=True, write=False, timeout=0.001)
+ except eventlet.TimeoutError:
pass # test passed
else:
assert False, "Didn't timeout"
@@ -128,8 +127,8 @@ class TestApi(TestCase):
def go():
desc = eventlet.connect(('127.0.0.1', bound_port))
try:
- api.trampoline(desc, read=True, timeout=0.1)
- except api.TimeoutError:
+ hubs.trampoline(desc, read=True, timeout=0.1)
+ except eventlet.TimeoutError:
assert False, "Timed out"
server.close()
@@ -138,21 +137,13 @@ class TestApi(TestCase):
greenthread.spawn_after_local(0, go)
- server_coro = api.spawn(client_closer, server)
+ server_coro = eventlet.spawn(client_closer, server)
while not done[0]:
- api.sleep(0)
- api.kill(server_coro)
+ eventlet.sleep(0)
+ eventlet.kill(server_coro)
check_hub()
- def test_named(self):
- named_foo = api.named('tests.api_test.Foo')
- self.assertEqual(named_foo.__name__, "Foo")
-
- def test_naming_missing_class(self):
- self.assertRaises(
- ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo')
-
def test_killing_dormant(self):
DELAY = 0.1
state = []
@@ -160,33 +151,33 @@ class TestApi(TestCase):
def test():
try:
state.append('start')
- api.sleep(DELAY)
+ eventlet.sleep(DELAY)
except:
state.append('except')
# catching GreenletExit
pass
# when switching to hub, hub makes itself the parent of this greenlet,
# thus after the function's done, the control will go to the parent
- api.sleep(0)
+ eventlet.sleep(0)
state.append('finished')
- g = api.spawn(test)
- api.sleep(DELAY / 2)
+ g = eventlet.spawn(test)
+ eventlet.sleep(DELAY / 2)
self.assertEqual(state, ['start'])
- api.kill(g)
+ eventlet.kill(g)
# will not get there, unless switching is explicitly scheduled by kill
self.assertEqual(state, ['start', 'except'])
- api.sleep(DELAY)
+ eventlet.sleep(DELAY)
self.assertEqual(state, ['start', 'except', 'finished'])
def test_nested_with_timeout(self):
def func():
- return api.with_timeout(0.2, api.sleep, 2, timeout_value=1)
+ return eventlet.with_timeout(0.2, eventlet.sleep, 2, timeout_value=1)
try:
- api.with_timeout(0.1, func)
- self.fail(u'Expected api.TimeoutError')
- except api.TimeoutError:
+ eventlet.with_timeout(0.1, func)
+ self.fail(u'Expected TimeoutError')
+ except eventlet.TimeoutError:
pass
diff --git a/tests/processes_test.py b/tests/processes_test.py
deleted file mode 100644
index 889c824..0000000
--- a/tests/processes_test.py
+++ /dev/null
@@ -1,110 +0,0 @@
-import sys
-import warnings
-from tests import LimitedTestCase, main, skip_on_windows
-
-warnings.simplefilter('ignore', DeprecationWarning)
-from eventlet import processes, api
-warnings.simplefilter('default', DeprecationWarning)
-
-
-class TestEchoPool(LimitedTestCase):
- def setUp(self):
- super(TestEchoPool, self).setUp()
- self.pool = processes.ProcessPool('echo', ["hello"])
-
- @skip_on_windows
- def test_echo(self):
- result = None
-
- proc = self.pool.get()
- try:
- result = proc.read()
- finally:
- self.pool.put(proc)
- self.assertEqual(result, 'hello\n')
-
- @skip_on_windows
- def test_read_eof(self):
- proc = self.pool.get()
- try:
- proc.read()
- self.assertRaises(processes.DeadProcess, proc.read)
- finally:
- self.pool.put(proc)
-
- @skip_on_windows
- def test_empty_echo(self):
- p = processes.Process('echo', ['-n'])
- self.assertEqual('', p.read())
- self.assertRaises(processes.DeadProcess, p.read)
-
-
-class TestCatPool(LimitedTestCase):
- def setUp(self):
- super(TestCatPool, self).setUp()
- api.sleep(0)
- self.pool = processes.ProcessPool('cat')
-
- @skip_on_windows
- def test_cat(self):
- result = None
-
- proc = self.pool.get()
- try:
- proc.write('goodbye')
- proc.close_stdin()
- result = proc.read()
- finally:
- self.pool.put(proc)
-
- self.assertEqual(result, 'goodbye')
-
- @skip_on_windows
- def test_write_to_dead(self):
- result = None
-
- proc = self.pool.get()
- try:
- proc.write('goodbye')
- proc.close_stdin()
- result = proc.read()
- self.assertRaises(processes.DeadProcess, proc.write, 'foo')
- finally:
- self.pool.put(proc)
-
- @skip_on_windows
- def test_close(self):
- result = None
-
- proc = self.pool.get()
- try:
- proc.write('hello')
- proc.close()
- self.assertRaises(processes.DeadProcess, proc.write, 'goodbye')
- finally:
- self.pool.put(proc)
-
-
-class TestDyingProcessesLeavePool(LimitedTestCase):
- def setUp(self):
- super(TestDyingProcessesLeavePool, self).setUp()
- self.pool = processes.ProcessPool('echo', ['hello'], max_size=1)
-
- @skip_on_windows
- def test_dead_process_not_inserted_into_pool(self):
- proc = self.pool.get()
- try:
- try:
- result = proc.read()
- self.assertEqual(result, 'hello\n')
- result = proc.read()
- except processes.DeadProcess:
- pass
- finally:
- self.pool.put(proc)
- proc2 = self.pool.get()
- assert proc is not proc2
-
-
-if __name__ == '__main__':
- main()
diff --git a/tests/ssl_test.py b/tests/ssl_test.py
index 73b9b43..621ff47 100644
--- a/tests/ssl_test.py
+++ b/tests/ssl_test.py
@@ -3,9 +3,9 @@ import warnings
from unittest import main
import eventlet
-from eventlet import util, greenio
+from eventlet import greenio
try:
- from eventlet.green.socket import ssl
+ from eventlet.green import ssl
except ImportError:
pass
from tests import (
@@ -15,8 +15,8 @@ from tests import (
def listen_ssl_socket(address=('127.0.0.1', 0)):
- sock = util.wrap_ssl(socket.socket(), certificate_file,
- private_key_file, True)
+ sock = ssl.wrap_socket(
+ socket.socket(), private_key_file, certificate_file, server_side=True)
sock.bind(address)
sock.listen(50)
@@ -44,7 +44,8 @@ class SSLTest(LimitedTestCase):
server_coro = eventlet.spawn(serve, sock)
- client = util.wrap_ssl(eventlet.connect(('127.0.0.1', sock.getsockname()[1])))
+ client = ssl.wrap_socket(
+ eventlet.connect(('127.0.0.1', sock.getsockname()[1])))
client.write(b'line 1\r\nline 2\r\n\r\n')
self.assertEqual(client.read(8192), b'response')
server_coro.wait()
@@ -64,7 +65,7 @@ class SSLTest(LimitedTestCase):
server_coro = eventlet.spawn(serve, sock)
raw_client = eventlet.connect(('127.0.0.1', sock.getsockname()[1]))
- client = util.wrap_ssl(raw_client)
+ client = ssl.wrap_socket(raw_client)
client.write(b'X')
greenio.shutdown_safe(client)
client.close()
@@ -79,7 +80,7 @@ class SSLTest(LimitedTestCase):
server_coro = eventlet.spawn(serve, sock)
raw_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- ssl_client = util.wrap_ssl(raw_client)
+ ssl_client = ssl.wrap_socket(raw_client)
ssl_client.connect(('127.0.0.1', sock.getsockname()[1]))
ssl_client.write(b'abc')
greenio.shutdown_safe(ssl_client)
@@ -91,8 +92,8 @@ class SSLTest(LimitedTestCase):
def serve():
sock, addr = listener.accept()
self.assertEqual(sock.recv(6), b'before')
- sock_ssl = util.wrap_ssl(sock, certificate_file, private_key_file,
- server_side=True)
+ sock_ssl = ssl.wrap_socket(sock, private_key_file, certificate_file,
+ server_side=True)
sock_ssl.do_handshake()
self.assertEqual(sock_ssl.read(6), b'during')
sock2 = sock_ssl.unwrap()
@@ -103,7 +104,7 @@ class SSLTest(LimitedTestCase):
server_coro = eventlet.spawn(serve)
client = eventlet.connect((listener.getsockname()))
client.send(b'before')
- client_ssl = util.wrap_ssl(client)
+ client_ssl = ssl.wrap_socket(client)
client_ssl.do_handshake()
client_ssl.write(b'during')
client2 = client_ssl.unwrap()
@@ -142,7 +143,7 @@ class SSLTest(LimitedTestCase):
client_sock = eventlet.connect(server_sock.getsockname())
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFFER_SIZE)
- client = util.wrap_ssl(client_sock)
+ client = ssl.wrap_socket(client_sock)
client.write(b'request')
self.assertEqual(client.read(8), b'response')
stage_1.send()
@@ -159,7 +160,8 @@ class SSLTest(LimitedTestCase):
sock.close()
listener = listen_ssl_socket(('', 0))
eventlet.spawn(serve, listener)
- client = ssl(eventlet.connect(('localhost', listener.getsockname()[1])))
+ client = ssl.wrap_socket(
+ eventlet.connect(('localhost', listener.getsockname()[1])))
self.assertEqual(client.read(1024), b'content')
self.assertEqual(client.read(1024), b'')
@@ -176,7 +178,7 @@ class SSLTest(LimitedTestCase):
listener = listen_ssl_socket(('', 0))
eventlet.spawn(serve, listener)
- ssl(eventlet.connect(('localhost', listener.getsockname()[1])))
+ ssl.wrap_socket(eventlet.connect(('localhost', listener.getsockname()[1])))
if __name__ == '__main__':
main()
diff --git a/tests/stdlib/test_thread__boundedsem.py b/tests/stdlib/test_thread__boundedsem.py
index 02bd95c..9e99dc4 100644
--- a/tests/stdlib/test_thread__boundedsem.py
+++ b/tests/stdlib/test_thread__boundedsem.py
@@ -1,15 +1,15 @@
"""Test that BoundedSemaphore with a very high bound is as good as unbounded one"""
-from eventlet import coros
+from eventlet import semaphore
from eventlet.green import thread
def allocate_lock():
- return coros.semaphore(1, 9999)
+ return semaphore.Semaphore(1, 9999)
original_allocate_lock = thread.allocate_lock
thread.allocate_lock = allocate_lock
original_LockType = thread.LockType
-thread.LockType = coros.CappedSemaphore
+thread.LockType = semaphore.CappedSemaphore
try:
import os.path
diff --git a/tests/test__coros_queue.py b/tests/test__coros_queue.py
deleted file mode 100644
index d4ee257..0000000
--- a/tests/test__coros_queue.py
+++ /dev/null
@@ -1,259 +0,0 @@
-from tests import LimitedTestCase, silence_warnings
-from unittest import main
-import eventlet
-from eventlet import coros, spawn, sleep
-from eventlet.event import Event
-
-
-class TestQueue(LimitedTestCase):
-
- @silence_warnings
- def test_send_first(self):
- q = coros.queue()
- q.send('hi')
- self.assertEqual(q.wait(), 'hi')
-
- @silence_warnings
- def test_send_exception_first(self):
- q = coros.queue()
- q.send(exc=RuntimeError())
- self.assertRaises(RuntimeError, q.wait)
-
- @silence_warnings
- def test_send_last(self):
- q = coros.queue()
-
- def waiter(q):
- timer = eventlet.Timeout(0.1)
- self.assertEqual(q.wait(), 'hi2')
- timer.cancel()
-
- spawn(waiter, q)
- sleep(0)
- sleep(0)
- q.send('hi2')
-
- @silence_warnings
- def test_max_size(self):
- q = coros.queue(2)
- results = []
-
- def putter(q):
- q.send('a')
- results.append('a')
- q.send('b')
- results.append('b')
- q.send('c')
- results.append('c')
-
- spawn(putter, q)
- sleep(0)
- self.assertEqual(results, ['a', 'b'])
- self.assertEqual(q.wait(), 'a')
- sleep(0)
- self.assertEqual(results, ['a', 'b', 'c'])
- self.assertEqual(q.wait(), 'b')
- self.assertEqual(q.wait(), 'c')
-
- @silence_warnings
- def test_zero_max_size(self):
- q = coros.queue(0)
-
- def sender(evt, q):
- q.send('hi')
- evt.send('done')
-
- def receiver(evt, q):
- x = q.wait()
- evt.send(x)
-
- e1 = Event()
- e2 = Event()
-
- spawn(sender, e1, q)
- sleep(0)
- assert not e1.ready()
- spawn(receiver, e2, q)
- self.assertEqual(e2.wait(), 'hi')
- self.assertEqual(e1.wait(), 'done')
-
- @silence_warnings
- def test_multiple_waiters(self):
- # tests that multiple waiters get their results back
- q = coros.queue()
-
- sendings = ['1', '2', '3', '4']
- gts = [eventlet.spawn(q.wait)
- for x in sendings]
-
- eventlet.sleep(0.01) # get 'em all waiting
-
- q.send(sendings[0])
- q.send(sendings[1])
- q.send(sendings[2])
- q.send(sendings[3])
- results = set()
- for i, gt in enumerate(gts):
- results.add(gt.wait())
- self.assertEqual(results, set(sendings))
-
- @silence_warnings
- def test_waiters_that_cancel(self):
- q = coros.queue()
-
- def do_receive(q, evt):
- eventlet.Timeout(0, RuntimeError())
- try:
- result = q.wait()
- evt.send(result)
- except RuntimeError:
- evt.send('timed out')
-
- evt = Event()
- spawn(do_receive, q, evt)
- self.assertEqual(evt.wait(), 'timed out')
-
- q.send('hi')
- self.assertEqual(q.wait(), 'hi')
-
- @silence_warnings
- def test_senders_that_die(self):
- q = coros.queue()
-
- def do_send(q):
- q.send('sent')
-
- spawn(do_send, q)
- self.assertEqual(q.wait(), 'sent')
-
- @silence_warnings
- def test_two_waiters_one_dies(self):
- def waiter(q, evt):
- evt.send(q.wait())
-
- def do_receive(q, evt):
- eventlet.Timeout(0, RuntimeError())
- try:
- result = q.wait()
- evt.send(result)
- except RuntimeError:
- evt.send('timed out')
-
- q = coros.queue()
- dying_evt = Event()
- waiting_evt = Event()
- spawn(do_receive, q, dying_evt)
- spawn(waiter, q, waiting_evt)
- sleep(0)
- q.send('hi')
- self.assertEqual(dying_evt.wait(), 'timed out')
- self.assertEqual(waiting_evt.wait(), 'hi')
-
- @silence_warnings
- def test_two_bogus_waiters(self):
- def do_receive(q, evt):
- eventlet.Timeout(0, RuntimeError())
- try:
- result = q.wait()
- evt.send(result)
- except RuntimeError:
- evt.send('timed out')
-
- q = coros.queue()
- e1 = Event()
- e2 = Event()
- spawn(do_receive, q, e1)
- spawn(do_receive, q, e2)
- sleep(0)
- q.send('sent')
- self.assertEqual(e1.wait(), 'timed out')
- self.assertEqual(e2.wait(), 'timed out')
- self.assertEqual(q.wait(), 'sent')
-
- @silence_warnings
- def test_waiting(self):
- def do_wait(q, evt):
- result = q.wait()
- evt.send(result)
-
- q = coros.queue()
- e1 = Event()
- spawn(do_wait, q, e1)
- sleep(0)
- self.assertEqual(1, q.waiting())
- q.send('hi')
- sleep(0)
- self.assertEqual(0, q.waiting())
- self.assertEqual('hi', e1.wait())
- self.assertEqual(0, q.waiting())
-
-
-class TestChannel(LimitedTestCase):
-
- @silence_warnings
- def test_send(self):
- sleep(0.1)
- channel = coros.queue(0)
-
- events = []
-
- def another_greenlet():
- events.append(channel.wait())
- events.append(channel.wait())
-
- spawn(another_greenlet)
-
- events.append('sending')
- channel.send('hello')
- events.append('sent hello')
- channel.send('world')
- events.append('sent world')
-
- self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
-
- @silence_warnings
- def test_wait(self):
- sleep(0.1)
- channel = coros.queue(0)
- events = []
-
- def another_greenlet():
- events.append('sending hello')
- channel.send('hello')
- events.append('sending world')
- channel.send('world')
- events.append('sent world')
-
- spawn(another_greenlet)
-
- events.append('waiting')
- events.append(channel.wait())
- events.append(channel.wait())
-
- self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
- sleep(0)
- self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
-
- @silence_warnings
- def test_waiters(self):
- c = coros.Channel()
- w1 = eventlet.spawn(c.wait)
- w2 = eventlet.spawn(c.wait)
- w3 = eventlet.spawn(c.wait)
- sleep(0)
- self.assertEqual(c.waiting(), 3)
- s1 = eventlet.spawn(c.send, 1)
- s2 = eventlet.spawn(c.send, 2)
- s3 = eventlet.spawn(c.send, 3)
- sleep(0) # this gets all the sends into a waiting state
- self.assertEqual(c.waiting(), 0)
-
- s1.wait()
- s2.wait()
- s3.wait()
- # NOTE: we don't guarantee that waiters are served in order
- results = sorted([w1.wait(), w2.wait(), w3.wait()])
- self.assertEqual(results, [1, 2, 3])
-
-if __name__ == '__main__':
- main()
diff --git a/tests/test__event.py b/tests/test__event.py
index 8bc7219..065e13b 100644
--- a/tests/test__event.py
+++ b/tests/test__event.py
@@ -1,6 +1,6 @@
import unittest
+from eventlet import spawn, sleep, with_timeout
from eventlet.event import Event
-from eventlet.api import spawn, sleep, with_timeout
import eventlet
from tests import LimitedTestCase
diff --git a/tests/test__pool.py b/tests/test__pool.py
deleted file mode 100644
index 4191625..0000000
--- a/tests/test__pool.py
+++ /dev/null
@@ -1,321 +0,0 @@
-import eventlet
-import warnings
-warnings.simplefilter('ignore', DeprecationWarning)
-from eventlet import pool, coros, api, hubs, timeout
-warnings.simplefilter('default', DeprecationWarning)
-from eventlet import event as _event
-from eventlet.support import six
-from tests import LimitedTestCase
-from unittest import main
-
-
-class TestCoroutinePool(LimitedTestCase):
- klass = pool.Pool
-
- def test_execute_async(self):
- done = _event.Event()
-
- def some_work():
- done.send()
- pool = self.klass(0, 2)
- pool.execute_async(some_work)
- done.wait()
-
- def test_execute(self):
- value = 'return value'
-
- def some_work():
- return value
- pool = self.klass(0, 2)
- worker = pool.execute(some_work)
- self.assertEqual(value, worker.wait())
-
- def test_waiting(self):
- pool = self.klass(0, 1)
- done = _event.Event()
-
- def consume():
- done.wait()
-
- def waiter(pool):
- evt = pool.execute(consume)
- evt.wait()
-
- waiters = []
- waiters.append(eventlet.spawn(waiter, pool))
- api.sleep(0)
- self.assertEqual(pool.waiting(), 0)
- waiters.append(eventlet.spawn(waiter, pool))
- api.sleep(0)
- self.assertEqual(pool.waiting(), 1)
- waiters.append(eventlet.spawn(waiter, pool))
- api.sleep(0)
- self.assertEqual(pool.waiting(), 2)
- done.send(None)
- for w in waiters:
- w.wait()
- self.assertEqual(pool.waiting(), 0)
-
- def test_multiple_coros(self):
- evt = _event.Event()
- results = []
-
- def producer():
- results.append('prod')
- evt.send()
-
- def consumer():
- results.append('cons1')
- evt.wait()
- results.append('cons2')
-
- pool = self.klass(0, 2)
- done = pool.execute(consumer)
- pool.execute_async(producer)
- done.wait()
- self.assertEqual(['cons1', 'prod', 'cons2'], results)
-
- def test_timer_cancel(self):
- # this test verifies that local timers are not fired
- # outside of the context of the execute method
- timer_fired = []
-
- def fire_timer():
- timer_fired.append(True)
-
- def some_work():
- hubs.get_hub().schedule_call_local(0, fire_timer)
- pool = self.klass(0, 2)
- worker = pool.execute(some_work)
- worker.wait()
- api.sleep(0)
- self.assertEqual(timer_fired, [])
-
- def test_reentrant(self):
- pool = self.klass(0, 1)
-
- def reenter():
- waiter = pool.execute(lambda a: a, 'reenter')
- self.assertEqual('reenter', waiter.wait())
-
- outer_waiter = pool.execute(reenter)
- outer_waiter.wait()
-
- evt = _event.Event()
-
- def reenter_async():
- pool.execute_async(lambda a: a, 'reenter')
- evt.send('done')
-
- pool.execute_async(reenter_async)
- evt.wait()
-
- def assert_pool_has_free(self, pool, num_free):
- def wait_long_time(e):
- e.wait()
- timer = timeout.Timeout(1, api.TimeoutError)
- try:
- evt = _event.Event()
- for x in six.moves.range(num_free):
- pool.execute(wait_long_time, evt)
- # if the pool has fewer free than we expect,
- # then we'll hit the timeout error
- finally:
- timer.cancel()
-
- # if the runtime error is not raised it means the pool had
- # some unexpected free items
- timer = timeout.Timeout(0, RuntimeError)
- self.assertRaises(RuntimeError, pool.execute, wait_long_time, evt)
-
- # clean up by causing all the wait_long_time functions to return
- evt.send(None)
- api.sleep(0)
- api.sleep(0)
-
- def test_resize(self):
- pool = self.klass(max_size=2)
- evt = _event.Event()
-
- def wait_long_time(e):
- e.wait()
- pool.execute(wait_long_time, evt)
- pool.execute(wait_long_time, evt)
- self.assertEqual(pool.free(), 0)
- self.assert_pool_has_free(pool, 0)
-
- # verify that the pool discards excess items put into it
- pool.resize(1)
-
- # cause the wait_long_time functions to return, which will
- # trigger puts to the pool
- evt.send(None)
- api.sleep(0)
- api.sleep(0)
-
- self.assertEqual(pool.free(), 1)
- self.assert_pool_has_free(pool, 1)
-
- # resize larger and assert that there are more free items
- pool.resize(2)
- self.assertEqual(pool.free(), 2)
- self.assert_pool_has_free(pool, 2)
-
- def test_stderr_raising(self):
- # testing that really egregious errors in the error handling code
- # (that prints tracebacks to stderr) don't cause the pool to lose
- # any members
- import sys
- pool = self.klass(min_size=1, max_size=1)
-
- def crash(*args, **kw):
- raise RuntimeError("Whoa")
-
- class FakeFile(object):
- write = crash
-
- # we're going to do this by causing the traceback.print_exc in
- # safe_apply to raise an exception and thus exit _main_loop
- normal_err = sys.stderr
- try:
- sys.stderr = FakeFile()
- waiter = pool.execute(crash)
- self.assertRaises(RuntimeError, waiter.wait)
- # the pool should have something free at this point since the
- # waiter returned
- # pool.Pool change: if an exception is raised during execution of a link,
- # the rest of the links are scheduled to be executed on the next hub iteration
- # this introduces a delay in updating pool.sem which makes pool.free() report 0
- # therefore, sleep:
- api.sleep(0)
- self.assertEqual(pool.free(), 1)
- # shouldn't block when trying to get
- t = timeout.Timeout(0.1)
- try:
- pool.execute(api.sleep, 1)
- finally:
- t.cancel()
- finally:
- sys.stderr = normal_err
-
- def test_track_events(self):
- pool = self.klass(track_events=True)
- for x in range(6):
- pool.execute(lambda n: n, x)
- for y in range(6):
- pool.wait()
-
- def test_track_slow_event(self):
- pool = self.klass(track_events=True)
-
- def slow():
- api.sleep(0.1)
- return 'ok'
- pool.execute(slow)
- self.assertEqual(pool.wait(), 'ok')
-
- def test_pool_smash(self):
- # The premise is that a coroutine in a Pool tries to get a token out
- # of a token pool but times out before getting the token. We verify
- # that neither pool is adversely affected by this situation.
- from eventlet import pools
- pool = self.klass(min_size=1, max_size=1)
- tp = pools.TokenPool(max_size=1)
- token = tp.get() # empty pool
-
- def do_receive(tp):
- timeout.Timeout(0, RuntimeError())
- try:
- t = tp.get()
- self.fail("Shouldn't have recieved anything from the pool")
- except RuntimeError:
- return 'timed out'
-
- # the execute makes the token pool expect that coroutine, but then
- # immediately cuts bait
- e1 = pool.execute(do_receive, tp)
- self.assertEqual(e1.wait(), 'timed out')
-
- # the pool can get some random item back
- def send_wakeup(tp):
- tp.put('wakeup')
- api.spawn(send_wakeup, tp)
-
- # now we ask the pool to run something else, which should not
- # be affected by the previous send at all
- def resume():
- return 'resumed'
- e2 = pool.execute(resume)
- self.assertEqual(e2.wait(), 'resumed')
-
- # we should be able to get out the thing we put in there, too
- self.assertEqual(tp.get(), 'wakeup')
-
-
-class PoolBasicTests(LimitedTestCase):
- klass = pool.Pool
-
- def test_execute_async(self):
- p = self.klass(max_size=2)
- self.assertEqual(p.free(), 2)
- r = []
-
- def foo(a):
- r.append(a)
- evt = p.execute(foo, 1)
- self.assertEqual(p.free(), 1)
- evt.wait()
- self.assertEqual(r, [1])
- api.sleep(0)
- self.assertEqual(p.free(), 2)
-
- # Once the pool is exhausted, calling an execute forces a yield.
-
- p.execute_async(foo, 2)
- self.assertEqual(1, p.free())
- self.assertEqual(r, [1])
-
- p.execute_async(foo, 3)
- self.assertEqual(0, p.free())
- self.assertEqual(r, [1])
-
- p.execute_async(foo, 4)
- self.assertEqual(r, [1, 2, 3])
- api.sleep(0)
- self.assertEqual(r, [1, 2, 3, 4])
-
- def test_execute(self):
- p = self.klass()
- evt = p.execute(lambda a: ('foo', a), 1)
- self.assertEqual(evt.wait(), ('foo', 1))
-
- def test_with_intpool(self):
- from eventlet import pools
-
- class IntPool(pools.Pool):
- def create(self):
- self.current_integer = getattr(self, 'current_integer', 0) + 1
- return self.current_integer
-
- def subtest(intpool_size, pool_size, num_executes):
- def run(int_pool):
- token = int_pool.get()
- api.sleep(0.0001)
- int_pool.put(token)
- return token
-
- int_pool = IntPool(max_size=intpool_size)
- pool = self.klass(max_size=pool_size)
- for ix in six.moves.range(num_executes):
- pool.execute(run, int_pool)
- pool.waitall()
-
- subtest(4, 7, 7)
- subtest(50, 75, 100)
- for isize in (20, 30, 40, 50):
- for psize in (25, 35, 50):
- subtest(isize, psize, psize)
-
-
-if __name__ == '__main__':
- main()
diff --git a/tests/test__proc.py b/tests/test__proc.py
deleted file mode 100644
index fb71a64..0000000
--- a/tests/test__proc.py
+++ /dev/null
@@ -1,401 +0,0 @@
-import sys
-import unittest
-import warnings
-warnings.simplefilter('ignore', DeprecationWarning)
-from eventlet import proc
-warnings.simplefilter('default', DeprecationWarning)
-from eventlet import coros
-from eventlet import event as _event
-from eventlet import Timeout, sleep, getcurrent, with_timeout
-from tests import LimitedTestCase, skipped, silence_warnings
-
-DELAY = 0.01
-
-
-class ExpectedError(Exception):
- pass
-
-
-class TestLink_Signal(LimitedTestCase):
-
- @silence_warnings
- def test_send(self):
- s = proc.Source()
- q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
- s.link_value(q1)
- self.assertRaises(Timeout, s.wait, 0)
- assert s.wait(0, None) is None
- assert s.wait(0.001, None) is None
- self.assertRaises(Timeout, s.wait, 0.001)
- s.send(1)
- assert not q1.ready()
- assert s.wait() == 1
- sleep(0)
- assert q1.ready()
- s.link_exception(q2)
- s.link(q3)
- assert not q2.ready()
- sleep(0)
- assert q3.ready()
- assert s.wait() == 1
-
- @silence_warnings
- def test_send_exception(self):
- s = proc.Source()
- q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
- s.link_exception(q1)
- s.send_exception(OSError('hello'))
- sleep(0)
- assert q1.ready()
- s.link_value(q2)
- s.link(q3)
- assert not q2.ready()
- sleep(0)
- assert q3.ready()
- self.assertRaises(OSError, q1.wait)
- self.assertRaises(OSError, q3.wait)
- self.assertRaises(OSError, s.wait)
-
-
-class TestProc(LimitedTestCase):
-
- def test_proc(self):
- p = proc.spawn(lambda: 100)
- receiver = proc.spawn(sleep, 1)
- p.link(receiver)
- self.assertRaises(proc.LinkedCompleted, receiver.wait)
- receiver2 = proc.spawn(sleep, 1)
- p.link(receiver2)
- self.assertRaises(proc.LinkedCompleted, receiver2.wait)
-
- def test_event(self):
- p = proc.spawn(lambda: 100)
- event = _event.Event()
- p.link(event)
- self.assertEqual(event.wait(), 100)
-
- for i in range(3):
- event2 = _event.Event()
- p.link(event2)
- self.assertEqual(event2.wait(), 100)
-
- def test_current(self):
- p = proc.spawn(lambda: 100)
- p.link()
- self.assertRaises(proc.LinkedCompleted, sleep, 0.1)
-
-
-class TestCase(LimitedTestCase):
-
- def link(self, p, listener=None):
- getattr(p, self.link_method)(listener)
-
- def tearDown(self):
- LimitedTestCase.tearDown(self)
- self.p.unlink()
-
- def set_links(self, p, first_time, kill_exc_type):
- event = _event.Event()
- self.link(p, event)
-
- proc_flag = []
-
- def receiver():
- sleep(DELAY)
- proc_flag.append('finished')
- receiver = proc.spawn(receiver)
- self.link(p, receiver)
-
- queue = coros.queue(1)
- self.link(p, queue)
-
- try:
- self.link(p)
- except kill_exc_type:
- if first_time:
- raise
- else:
- assert first_time, 'not raising here only first time'
-
- callback_flag = ['initial']
- self.link(p, lambda *args: callback_flag.remove('initial'))
-
- for _ in range(10):
- self.link(p, _event.Event())
- self.link(p, coros.queue(1))
- return event, receiver, proc_flag, queue, callback_flag
-
- def set_links_timeout(self, link):
- # stuff that won't be touched
- event = _event.Event()
- link(event)
-
- proc_finished_flag = []
-
- def myproc():
- sleep(10)
- proc_finished_flag.append('finished')
- return 555
- myproc = proc.spawn(myproc)
- link(myproc)
-
- queue = coros.queue(0)
- link(queue)
- return event, myproc, proc_finished_flag, queue
-
- def check_timed_out(self, event, myproc, proc_finished_flag, queue):
- X = object()
- assert with_timeout(DELAY, event.wait, timeout_value=X) is X
- assert with_timeout(DELAY, queue.wait, timeout_value=X) is X
- assert with_timeout(DELAY, proc.waitall, [myproc], timeout_value=X) is X
- assert proc_finished_flag == [], proc_finished_flag
-
-
-class TestReturn_link(TestCase):
- link_method = 'link'
-
- def test_return(self):
- def return25():
- return 25
- p = self.p = proc.spawn(return25)
- self._test_return(p, True, 25, proc.LinkedCompleted, lambda: sleep(0))
- # repeating the same with dead process
- for _ in range(3):
- self._test_return(p, False, 25, proc.LinkedCompleted, lambda: sleep(0))
-
- def _test_return(self, p, first_time, result, kill_exc_type, action):
- event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
-
- # stuff that will time out because there's no unhandled exception:
- xxxxx = self.set_links_timeout(p.link_exception)
-
- try:
- sleep(DELAY * 2)
- except kill_exc_type:
- assert first_time, 'raising here only first time'
- else:
- assert not first_time, 'Should not raise LinkedKilled here after first time'
-
- assert not p, p
-
- self.assertEqual(event.wait(), result)
- self.assertEqual(queue.wait(), result)
- self.assertRaises(kill_exc_type, receiver.wait)
- self.assertRaises(kill_exc_type, proc.waitall, [receiver])
-
- sleep(DELAY)
- assert not proc_flag, proc_flag
- assert not callback_flag, callback_flag
-
- self.check_timed_out(*xxxxx)
-
-
-class TestReturn_link_value(TestReturn_link):
- sync = False
- link_method = 'link_value'
-
-
-class TestRaise_link(TestCase):
- link_method = 'link'
-
- def _test_raise(self, p, first_time, kill_exc_type):
- event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
- xxxxx = self.set_links_timeout(p.link_value)
-
- try:
- sleep(DELAY)
- except kill_exc_type:
- assert first_time, 'raising here only first time'
- else:
- assert not first_time, 'Should not raise LinkedKilled here after first time'
-
- assert not p, p
-
- self.assertRaises(ExpectedError, event.wait)
- self.assertRaises(ExpectedError, queue.wait)
- self.assertRaises(kill_exc_type, receiver.wait)
- self.assertRaises(kill_exc_type, proc.waitall, [receiver])
- sleep(DELAY)
- assert not proc_flag, proc_flag
- assert not callback_flag, callback_flag
-
- self.check_timed_out(*xxxxx)
-
- @silence_warnings
- def test_raise(self):
- p = self.p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_raise')))
- self._test_raise(p, True, proc.LinkedFailed)
- # repeating the same with dead process
- for _ in range(3):
- self._test_raise(p, False, proc.LinkedFailed)
-
- def _test_kill(self, p, first_time, kill_exc_type):
- event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
- xxxxx = self.set_links_timeout(p.link_value)
-
- p.kill()
- try:
- sleep(DELAY)
- except kill_exc_type:
- assert first_time, 'raising here only first time'
- else:
- assert not first_time, 'Should not raise LinkedKilled here after first time'
-
- assert not p, p
-
- self.assertRaises(proc.ProcExit, event.wait)
- self.assertRaises(proc.ProcExit, queue.wait)
- self.assertRaises(kill_exc_type, proc.waitall, [receiver])
- self.assertRaises(kill_exc_type, receiver.wait)
-
- sleep(DELAY)
- assert not proc_flag, proc_flag
- assert not callback_flag, callback_flag
-
- self.check_timed_out(*xxxxx)
-
- @silence_warnings
- def test_kill(self):
- p = self.p = proc.spawn(sleep, DELAY)
- self._test_kill(p, True, proc.LinkedKilled)
- # repeating the same with dead process
- for _ in range(3):
- self._test_kill(p, False, proc.LinkedKilled)
-
-
-class TestRaise_link_exception(TestRaise_link):
- link_method = 'link_exception'
-
-
-class TestStuff(LimitedTestCase):
-
- def test_wait_noerrors(self):
- x = proc.spawn(lambda: 1)
- y = proc.spawn(lambda: 2)
- z = proc.spawn(lambda: 3)
- self.assertEqual(proc.waitall([x, y, z]), [1, 2, 3])
- e = _event.Event()
- x.link(e)
- self.assertEqual(e.wait(), 1)
- x.unlink(e)
- e = _event.Event()
- x.link(e)
- self.assertEqual(e.wait(), 1)
- self.assertEqual([proc.waitall([X]) for X in [x, y, z]], [[1], [2], [3]])
-
- # this test is timing-sensitive
- @skipped
- def test_wait_error(self):
- def x():
- sleep(DELAY)
- return 1
- x = proc.spawn(x)
- z = proc.spawn(lambda: 3)
- y = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_wait_error')))
- y.link(x)
- x.link(y)
- y.link(z)
- z.link(y)
- self.assertRaises(ExpectedError, proc.waitall, [x, y, z])
- self.assertRaises(proc.LinkedFailed, proc.waitall, [x])
- self.assertEqual(proc.waitall([z]), [3])
- self.assertRaises(ExpectedError, proc.waitall, [y])
-
- def test_wait_all_exception_order(self):
- # if there're several exceptions raised, the earliest one must be raised by wait
- def first():
- sleep(0.1)
- raise ExpectedError('first')
- a = proc.spawn(first)
- b = proc.spawn(lambda: getcurrent().throw(ExpectedError('second')))
- try:
- proc.waitall([a, b])
- except ExpectedError as ex:
- assert 'second' in str(ex), repr(str(ex))
- sleep(0.2) # sleep to ensure that the other timer is raised
-
- def test_multiple_listeners_error(self):
- # if there was an error while calling a callback
- # it should not prevent the other listeners from being called
- # also, all of the errors should be logged, check the output
- # manually that they are
- p = proc.spawn(lambda: 5)
- results = []
-
- def listener1(*args):
- results.append(10)
- raise ExpectedError('listener1')
-
- def listener2(*args):
- results.append(20)
- raise ExpectedError('listener2')
-
- def listener3(*args):
- raise ExpectedError('listener3')
- p.link(listener1)
- p.link(listener2)
- p.link(listener3)
- sleep(DELAY * 10)
- assert results in [[10, 20], [20, 10]], results
-
- p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_multiple_listeners_error')))
- results = []
- p.link(listener1)
- p.link(listener2)
- p.link(listener3)
- sleep(DELAY * 10)
- assert results in [[10, 20], [20, 10]], results
-
- def _test_multiple_listeners_error_unlink(self, p):
- # notification must not happen after unlink even
- # though notification process has been already started
- results = []
-
- def listener1(*args):
- p.unlink(listener2)
- results.append(5)
- raise ExpectedError('listener1')
-
- def listener2(*args):
- p.unlink(listener1)
- results.append(5)
- raise ExpectedError('listener2')
-
- def listener3(*args):
- raise ExpectedError('listener3')
- p.link(listener1)
- p.link(listener2)
- p.link(listener3)
- sleep(DELAY * 10)
- assert results == [5], results
-
- def test_multiple_listeners_error_unlink_Proc(self):
- p = proc.spawn(lambda: 5)
- self._test_multiple_listeners_error_unlink(p)
-
- def test_multiple_listeners_error_unlink_Source(self):
- p = proc.Source()
- proc.spawn(p.send, 6)
- self._test_multiple_listeners_error_unlink(p)
-
- def test_killing_unlinked(self):
- e = _event.Event()
-
- def func():
- try:
- raise ExpectedError('test_killing_unlinked')
- except:
- e.send_exception(*sys.exc_info())
- p = proc.spawn_link(func)
- try:
- try:
- e.wait()
- except ExpectedError:
- pass
- finally:
- p.unlink() # this disables LinkedCompleted that otherwise would be raised by the next line
- sleep(DELAY)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test__socket_errors.py b/tests/test__socket_errors.py
index 5228ea7..7832de0 100644
--- a/tests/test__socket_errors.py
+++ b/tests/test__socket_errors.py
@@ -1,6 +1,5 @@
import unittest
import socket as _original_sock
-from eventlet import api
from eventlet.green import socket
diff --git a/tests/test__twistedutil_protocol.py b/tests/test__twistedutil_protocol.py
index 29a26cf..337cdbb 100644
--- a/tests/test__twistedutil_protocol.py
+++ b/tests/test__twistedutil_protocol.py
@@ -19,7 +19,7 @@ except ImportError:
pass
from eventlet import spawn, sleep, with_timeout, spawn_after
-from eventlet.coros import Event
+from eventlet.event import Event
try:
from eventlet.green import socket
diff --git a/tox.ini b/tox.ini
index 3dc5f0b..75aace4 100644
--- a/tox.ini
+++ b/tox.ini
@@ -33,8 +33,7 @@ deps =
commands =
nosetests --verbose tests/
nosetests --verbose --with-doctest eventlet/coros.py eventlet/event.py \
- eventlet/pool.py eventlet/pools.py eventlet/proc.py \
- eventlet/queue.py eventlet/timeout.py
+ eventlet/pools.py eventlet/queue.py eventlet/timeout.py
[testenv:pep8]
basepython = python2.7