diff options
| author | Jakub Stasiak <jakub@stasiak.at> | 2014-10-05 21:25:46 +0100 |
|---|---|---|
| committer | Jakub Stasiak <jakub@stasiak.at> | 2014-10-10 08:53:11 +0100 |
| commit | 75a1ab8cafb2bd3aa9ab96503b54fbddb068f58d (patch) | |
| tree | 703c1c3837ae70d0f7ce3790d6dcf94edc339026 | |
| parent | 5f5c137676044f0b3d9459a6d487ec80eac14b26 (diff) | |
| download | eventlet-75a1ab8cafb2bd3aa9ab96503b54fbddb068f58d.tar.gz | |
Remove most of the deprecated code
Closes GH #144
| -rw-r--r-- | NEWS | 5 | ||||
| -rw-r--r-- | doc/ssl.rst | 6 | ||||
| -rw-r--r-- | eventlet/api.py | 224 | ||||
| -rw-r--r-- | eventlet/coros.py | 272 | ||||
| -rw-r--r-- | eventlet/green/_socket_nodns.py | 16 | ||||
| -rw-r--r-- | eventlet/pool.py | 321 | ||||
| -rw-r--r-- | eventlet/proc.py | 739 | ||||
| -rw-r--r-- | eventlet/processes.py | 169 | ||||
| -rw-r--r-- | eventlet/twistedutil/__init__.py | 2 | ||||
| -rw-r--r-- | eventlet/twistedutil/protocol.py | 2 | ||||
| -rw-r--r-- | eventlet/util.py | 104 | ||||
| -rw-r--r-- | tests/api_test.py | 73 | ||||
| -rw-r--r-- | tests/processes_test.py | 110 | ||||
| -rw-r--r-- | tests/ssl_test.py | 28 | ||||
| -rw-r--r-- | tests/stdlib/test_thread__boundedsem.py | 6 | ||||
| -rw-r--r-- | tests/test__coros_queue.py | 259 | ||||
| -rw-r--r-- | tests/test__event.py | 2 | ||||
| -rw-r--r-- | tests/test__pool.py | 321 | ||||
| -rw-r--r-- | tests/test__proc.py | 401 | ||||
| -rw-r--r-- | tests/test__socket_errors.py | 1 | ||||
| -rw-r--r-- | tests/test__twistedutil_protocol.py | 2 | ||||
| -rw-r--r-- | tox.ini | 3 |
22 files changed, 66 insertions, 3000 deletions
@@ -1,3 +1,8 @@ +0.16.0 (not released yet) +========================= + +* removed deprecated modules: api, most of coros, pool, proc, processes and util + 0.15.2 ====== greenio: fixed memory leak, introduced in 0.15.1; Thanks to Michael Kerrin, Tushar Gohad diff --git a/doc/ssl.rst b/doc/ssl.rst index 0d47364..225ce56 100644 --- a/doc/ssl.rst +++ b/doc/ssl.rst @@ -6,8 +6,8 @@ Eventlet makes it easy to use non-blocking SSL sockets. If you're using Python In either case, the the ``green`` modules handle SSL sockets transparently, just like their standard counterparts. As an example, :mod:`eventlet.green.urllib2` can be used to fetch https urls in as non-blocking a fashion as you please:: from eventlet.green import urllib2 - from eventlet import coros - bodies = [coros.execute(urllib2.urlopen, url) + from eventlet import spawn + bodies = [spawn(urllib2.urlopen, url) for url in ("https://secondlife.com","https://google.com")] for b in bodies: print(b.wait().read()) @@ -55,4 +55,4 @@ Here's an example of a server:: client_conn.close() connection.close() -.. _pyOpenSSL: https://launchpad.net/pyopenssl
\ No newline at end of file +.. _pyOpenSSL: https://launchpad.net/pyopenssl diff --git a/eventlet/api.py b/eventlet/api.py deleted file mode 100644 index ca1c1ba..0000000 --- a/eventlet/api.py +++ /dev/null @@ -1,224 +0,0 @@ -import errno -import sys -import socket -import string -import linecache -import inspect -import warnings - -from eventlet.support import greenlets as greenlet -from eventlet import hubs -from eventlet import greenthread -from eventlet import debug -from eventlet import Timeout - -__all__ = [ - 'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub', - 'GreenletExit', 'kill', 'sleep', 'spawn', 'spew', 'switch', - 'ssl_listener', 'tcp_listener', 'trampoline', - 'unspew', 'use_hub', 'with_timeout', 'timeout'] - -warnings.warn( - "eventlet.api is deprecated! Nearly everything in it has moved " - "to the eventlet module.", DeprecationWarning, stacklevel=2) - - -def get_hub(*a, **kw): - warnings.warn( - "eventlet.api.get_hub has moved to eventlet.hubs.get_hub", - DeprecationWarning, stacklevel=2) - return hubs.get_hub(*a, **kw) - - -def get_default_hub(*a, **kw): - warnings.warn( - "eventlet.api.get_default_hub has moved to" - " eventlet.hubs.get_default_hub", - DeprecationWarning, stacklevel=2) - return hubs.get_default_hub(*a, **kw) - - -def use_hub(*a, **kw): - warnings.warn( - "eventlet.api.use_hub has moved to eventlet.hubs.use_hub", - DeprecationWarning, stacklevel=2) - return hubs.use_hub(*a, **kw) - - -def switch(coro, result=None, exc=None): - if exc is not None: - return coro.throw(exc) - return coro.switch(result) - -Greenlet = greenlet.greenlet - - -def tcp_listener(address, backlog=50): - """ - Listen on the given ``(ip, port)`` *address* with a TCP socket. Returns a - socket object on which one should call ``accept()`` to accept a connection - on the newly bound socket. - """ - warnings.warn( - """eventlet.api.tcp_listener is deprecated. Please use eventlet.listen instead.""", - DeprecationWarning, stacklevel=2) - - from eventlet import greenio, util - socket = greenio.GreenSocket(util.tcp_socket()) - util.socket_bind_and_listen(socket, address, backlog=backlog) - return socket - - -def ssl_listener(address, certificate, private_key): - """Listen on the given (ip, port) *address* with a TCP socket that - can do SSL. Primarily useful for unit tests, don't use in production. - - *certificate* and *private_key* should be the filenames of the appropriate - certificate and private key files to use with the SSL socket. - - Returns a socket object on which one should call ``accept()`` to - accept a connection on the newly bound socket. - """ - warnings.warn("""eventlet.api.ssl_listener is deprecated. Please use eventlet.wrap_ssl(eventlet.listen( - )) instead.""", - DeprecationWarning, stacklevel=2) - from eventlet import util - import socket - - socket = util.wrap_ssl(socket.socket(), certificate, private_key, True) - socket.bind(address) - socket.listen(50) - return socket - - -def connect_tcp(address, localaddr=None): - """ - Create a TCP connection to address ``(host, port)`` and return the socket. - Optionally, bind to localaddr ``(host, port)`` first. - """ - warnings.warn( - """eventlet.api.connect_tcp is deprecated. Please use eventlet.connect instead.""", - DeprecationWarning, stacklevel=2) - - from eventlet import greenio, util - desc = greenio.GreenSocket(util.tcp_socket()) - if localaddr is not None: - desc.bind(localaddr) - desc.connect(address) - return desc - -TimeoutError = greenthread.TimeoutError - -trampoline = hubs.trampoline - -spawn = greenthread.spawn -spawn_n = greenthread.spawn_n - - -kill = greenthread.kill - -call_after = greenthread.call_after -call_after_local = greenthread.call_after_local -call_after_global = greenthread.call_after_global - - -class _SilentException(BaseException): - pass - - -class FakeTimer(object): - def cancel(self): - pass - - -class timeout(object): - """Raise an exception in the block after timeout. - - Example:: - - with timeout(10): - urllib2.open('http://example.com') - - Assuming code block is yielding (i.e. gives up control to the hub), - an exception provided in *exc* argument will be raised - (:class:`~eventlet.api.TimeoutError` if *exc* is omitted):: - - try: - with timeout(10, MySpecialError, error_arg_1): - urllib2.open('http://example.com') - except MySpecialError as e: - print("special error received") - - When *exc* is ``None``, code block is interrupted silently. - """ - - def __init__(self, seconds, *throw_args): - self.seconds = seconds - if seconds is None: - return - if not throw_args: - self.throw_args = (TimeoutError(), ) - elif throw_args == (None, ): - self.throw_args = (_SilentException(), ) - else: - self.throw_args = throw_args - - def __enter__(self): - if self.seconds is None: - self.timer = FakeTimer() - else: - self.timer = exc_after(self.seconds, *self.throw_args) - return self.timer - - def __exit__(self, typ, value, tb): - self.timer.cancel() - if typ is _SilentException and value in self.throw_args: - return True - - -with_timeout = greenthread.with_timeout - -exc_after = greenthread.exc_after - -sleep = greenthread.sleep - -getcurrent = greenlet.getcurrent -GreenletExit = greenlet.GreenletExit - -spew = debug.spew -unspew = debug.unspew - - -def named(name): - """Return an object given its name. - - The name uses a module-like syntax, eg:: - - os.path.join - - or:: - - mulib.mu.Resource - """ - toimport = name - obj = None - import_err_strings = [] - while toimport: - try: - obj = __import__(toimport) - break - except ImportError as err: - # print('Import error on %s: %s' % (toimport, err)) # debugging spam - import_err_strings.append(err.__str__()) - toimport = '.'.join(toimport.split('.')[:-1]) - if obj is None: - raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings)) - for seg in name.split('.')[1:]: - try: - obj = getattr(obj, seg) - except AttributeError: - dirobj = dir(obj) - dirobj.sort() - raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % ( - seg, obj, dirobj, name, import_err_strings)) - return obj diff --git a/eventlet/coros.py b/eventlet/coros.py index 7407ad5..431e6f0 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -1,78 +1,21 @@ from __future__ import print_function -import collections -import traceback -import warnings - -import eventlet from eventlet import event as _event -from eventlet import hubs -from eventlet import greenthread -from eventlet import semaphore as semaphoremod - - -class NOT_USED: - def __repr__(self): - return 'NOT_USED' - -NOT_USED = NOT_USED() - - -def Event(*a, **kw): - warnings.warn("The Event class has been moved to the event module! " - "Please construct event.Event objects instead.", - DeprecationWarning, stacklevel=2) - return _event.Event(*a, **kw) - - -def event(*a, **kw): - warnings.warn( - "The event class has been capitalized and moved! Please " - "construct event.Event objects instead.", - DeprecationWarning, stacklevel=2) - return _event.Event(*a, **kw) - - -def Semaphore(count): - warnings.warn( - "The Semaphore class has moved! Please " - "use semaphore.Semaphore instead.", - DeprecationWarning, stacklevel=2) - return semaphoremod.Semaphore(count) - - -def BoundedSemaphore(count): - warnings.warn( - "The BoundedSemaphore class has moved! Please " - "use semaphore.BoundedSemaphore instead.", - DeprecationWarning, stacklevel=2) - return semaphoremod.BoundedSemaphore(count) - - -def semaphore(count=0, limit=None): - warnings.warn( - "coros.semaphore is deprecated. Please use either " - "semaphore.Semaphore or semaphore.BoundedSemaphore instead.", - DeprecationWarning, stacklevel=2) - if limit is None: - return Semaphore(count) - else: - return BoundedSemaphore(count) class metaphore(object): """This is sort of an inverse semaphore: a counter that starts at 0 and waits only if nonzero. It's used to implement a "wait for all" scenario. - >>> from eventlet import api, coros + >>> from eventlet import coros, spawn_n >>> count = coros.metaphore() >>> count.wait() >>> def decrementer(count, id): ... print("{0} decrementing".format(id)) ... count.dec() ... - >>> _ = eventlet.spawn(decrementer, count, 'A') - >>> _ = eventlet.spawn(decrementer, count, 'B') + >>> _ = spawn_n(decrementer, count, 'A') + >>> _ = spawn_n(decrementer, count, 'B') >>> count.inc(2) >>> count.wait() A decrementing @@ -116,212 +59,3 @@ class metaphore(object): resume the caller once the count decrements to zero again. """ self.event.wait() - - -def execute(func, *args, **kw): - """ Executes an operation asynchronously in a new coroutine, returning - an event to retrieve the return value. - - This has the same api as the :meth:`eventlet.coros.CoroutinePool.execute` - method; the only difference is that this one creates a new coroutine - instead of drawing from a pool. - - >>> from eventlet import coros - >>> evt = coros.execute(lambda a: ('foo', a), 1) - >>> evt.wait() - ('foo', 1) - """ - warnings.warn( - "Coros.execute is deprecated. Please use eventlet.spawn " - "instead.", DeprecationWarning, stacklevel=2) - return greenthread.spawn(func, *args, **kw) - - -def CoroutinePool(*args, **kwargs): - warnings.warn( - "CoroutinePool is deprecated. Please use " - "eventlet.GreenPool instead.", DeprecationWarning, stacklevel=2) - from eventlet.pool import Pool - return Pool(*args, **kwargs) - - -class Queue(object): - - def __init__(self): - warnings.warn( - "coros.Queue is deprecated. Please use " - "eventlet.queue.Queue instead.", - DeprecationWarning, stacklevel=2) - self.items = collections.deque() - self._waiters = set() - - def __nonzero__(self): - return len(self.items) > 0 - - __bool__ = __nonzero__ - - def __len__(self): - return len(self.items) - - def __repr__(self): - params = (self.__class__.__name__, hex(id(self)), - len(self.items), len(self._waiters)) - return '<%s at %s items[%d] _waiters[%s]>' % params - - def send(self, result=None, exc=None): - if exc is not None and not isinstance(exc, tuple): - exc = (exc, ) - self.items.append((result, exc)) - if self._waiters: - hubs.get_hub().schedule_call_global(0, self._do_send) - - def send_exception(self, *args): - # the arguments are the same as for greenlet.throw - return self.send(exc=args) - - def _do_send(self): - if self._waiters and self.items: - waiter = self._waiters.pop() - result, exc = self.items.popleft() - waiter.switch((result, exc)) - - def wait(self): - if self.items: - result, exc = self.items.popleft() - if exc is None: - return result - else: - eventlet.getcurrent().throw(*exc) - else: - self._waiters.add(eventlet.getcurrent()) - try: - result, exc = hubs.get_hub().switch() - if exc is None: - return result - else: - eventlet.getcurrent().throw(*exc) - finally: - self._waiters.discard(eventlet.getcurrent()) - - def ready(self): - return len(self.items) > 0 - - def full(self): - # for consistency with Channel - return False - - def waiting(self): - return len(self._waiters) - - def __iter__(self): - return self - - def next(self): - return self.wait() - - -class Channel(object): - - def __init__(self, max_size=0): - warnings.warn( - "coros.Channel is deprecated. Please use " - "eventlet.queue.Queue(0) instead.", - DeprecationWarning, stacklevel=2) - self.max_size = max_size - self.items = collections.deque() - self._waiters = set() - self._senders = set() - - def __nonzero__(self): - return len(self.items) > 0 - - __bool__ = __nonzero__ - - def __len__(self): - return len(self.items) - - def __repr__(self): - params = (self.__class__.__name__, hex(id(self)), - self.max_size, len(self.items), - len(self._waiters), len(self._senders)) - return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params - - def send(self, result=None, exc=None): - if exc is not None and not isinstance(exc, tuple): - exc = (exc, ) - if eventlet.getcurrent() is hubs.get_hub().greenlet: - self.items.append((result, exc)) - if self._waiters: - hubs.get_hub().schedule_call_global(0, self._do_switch) - else: - self.items.append((result, exc)) - # note that send() does not work well with timeouts. if your timeout fires - # after this point, the item will remain in the queue - if self._waiters: - hubs.get_hub().schedule_call_global(0, self._do_switch) - if len(self.items) > self.max_size: - self._senders.add(eventlet.getcurrent()) - try: - hubs.get_hub().switch() - finally: - self._senders.discard(eventlet.getcurrent()) - - def send_exception(self, *args): - # the arguments are the same as for greenlet.throw - return self.send(exc=args) - - def _do_switch(self): - while True: - if self._waiters and self.items: - waiter = self._waiters.pop() - result, exc = self.items.popleft() - try: - waiter.switch((result, exc)) - except: - traceback.print_exc() - elif self._senders and len(self.items) <= self.max_size: - sender = self._senders.pop() - try: - sender.switch() - except: - traceback.print_exc() - else: - break - - def wait(self): - if self.items: - result, exc = self.items.popleft() - if len(self.items) <= self.max_size: - hubs.get_hub().schedule_call_global(0, self._do_switch) - if exc is None: - return result - else: - eventlet.getcurrent().throw(*exc) - else: - if self._senders: - hubs.get_hub().schedule_call_global(0, self._do_switch) - self._waiters.add(eventlet.getcurrent()) - try: - result, exc = hubs.get_hub().switch() - if exc is None: - return result - else: - eventlet.getcurrent().throw(*exc) - finally: - self._waiters.discard(eventlet.getcurrent()) - - def ready(self): - return len(self.items) > 0 - - def full(self): - return len(self.items) >= self.max_size - - def waiting(self): - return max(0, len(self._waiters) - len(self.items)) - - -def queue(max_size=None): - if max_size is None: - return Queue() - else: - return Channel(max_size) diff --git a/eventlet/green/_socket_nodns.py b/eventlet/green/_socket_nodns.py index df83737..373c140 100644 --- a/eventlet/green/_socket_nodns.py +++ b/eventlet/green/_socket_nodns.py @@ -9,7 +9,6 @@ slurp_properties(__socket, globals(), os = __import__('os') import sys -import warnings from eventlet.hubs import get_hub from eventlet.greenio import GreenSocket as socket from eventlet.greenio import SSL as _SSL # for exceptions @@ -86,18 +85,3 @@ class GreenSSLObject(object): for debugging purposes; do not parse the content of this string because its format can't be parsed unambiguously.""" return str(self.connection.get_peer_certificate().get_issuer()) - - -try: - from eventlet.green import ssl as ssl_module - sslerror = __socket.sslerror - __socket.ssl -except AttributeError: - # if the real socket module doesn't have the ssl method or sslerror - # exception, we can't emulate them - pass -else: - def ssl(sock, certificate=None, private_key=None): - warnings.warn("socket.ssl() is deprecated. Use ssl.wrap_socket() instead.", - DeprecationWarning, stacklevel=2) - return ssl_module.sslwrap_simple(sock, private_key, certificate) diff --git a/eventlet/pool.py b/eventlet/pool.py deleted file mode 100644 index be9db8f..0000000 --- a/eventlet/pool.py +++ /dev/null @@ -1,321 +0,0 @@ -from __future__ import print_function - -from eventlet import coros, proc, api -from eventlet.semaphore import Semaphore -from eventlet.support import six - -import warnings -warnings.warn( - "The pool module is deprecated. Please use the " - "eventlet.GreenPool and eventlet.GreenPile classes instead.", - DeprecationWarning, stacklevel=2) - - -class Pool(object): - def __init__(self, min_size=0, max_size=4, track_events=False): - if min_size > max_size: - raise ValueError('min_size cannot be bigger than max_size') - self.max_size = max_size - self.sem = Semaphore(max_size) - self.procs = proc.RunningProcSet() - if track_events: - self.results = coros.queue() - else: - self.results = None - - def resize(self, new_max_size): - """ Change the :attr:`max_size` of the pool. - - If the pool gets resized when there are more than *new_max_size* - coroutines checked out, when they are returned to the pool they will be - discarded. The return value of :meth:`free` will be negative in this - situation. - """ - max_size_delta = new_max_size - self.max_size - self.sem.counter += max_size_delta - self.max_size = new_max_size - - @property - def current_size(self): - """ The number of coroutines that are currently executing jobs. """ - return len(self.procs) - - def free(self): - """ Returns the number of coroutines that are available for doing - work.""" - return self.sem.counter - - def execute(self, func, *args, **kwargs): - """Execute func in one of the coroutines maintained - by the pool, when one is free. - - Immediately returns a :class:`~eventlet.proc.Proc` object which can be - queried for the func's result. - - >>> pool = Pool() - >>> task = pool.execute(lambda a: ('foo', a), 1) - >>> task.wait() - ('foo', 1) - """ - # if reentering an empty pool, don't try to wait on a coroutine freeing - # itself -- instead, just execute in the current coroutine - if self.sem.locked() and api.getcurrent() in self.procs: - p = proc.spawn(func, *args, **kwargs) - try: - p.wait() - except: - pass - else: - self.sem.acquire() - p = self.procs.spawn(func, *args, **kwargs) - # assuming the above line cannot raise - p.link(lambda p: self.sem.release()) - if self.results is not None: - p.link(self.results) - return p - - execute_async = execute - - def _execute(self, evt, func, args, kw): - p = self.execute(func, *args, **kw) - p.link(evt) - return p - - def waitall(self): - """ Calling this function blocks until every coroutine - completes its work (i.e. there are 0 running coroutines).""" - return self.procs.waitall() - - wait_all = waitall - - def wait(self): - """Wait for the next execute in the pool to complete, - and return the result.""" - return self.results.wait() - - def waiting(self): - """Return the number of coroutines waiting to execute. - """ - if self.sem.balance < 0: - return -self.sem.balance - else: - return 0 - - def killall(self): - """ Kill every running coroutine as immediately as possible.""" - return self.procs.killall() - - def launch_all(self, function, iterable): - """For each tuple (sequence) in *iterable*, launch ``function(*tuple)`` - in its own coroutine -- like ``itertools.starmap()``, but in parallel. - Discard values returned by ``function()``. You should call - ``wait_all()`` to wait for all coroutines, newly-launched plus any - previously-submitted :meth:`execute` or :meth:`execute_async` calls, to - complete. - - >>> pool = Pool() - >>> def saw(x): - ... print("I saw %s!" % x) - ... - >>> pool.launch_all(saw, "ABC") - >>> pool.wait_all() - I saw A! - I saw B! - I saw C! - """ - for tup in iterable: - self.execute(function, *tup) - - def process_all(self, function, iterable): - """For each tuple (sequence) in *iterable*, launch ``function(*tuple)`` - in its own coroutine -- like ``itertools.starmap()``, but in parallel. - Discard values returned by ``function()``. Don't return until all - coroutines, newly-launched plus any previously-submitted :meth:`execute()` - or :meth:`execute_async` calls, have completed. - - >>> from eventlet import coros - >>> pool = coros.CoroutinePool() - >>> def saw(x): print("I saw %s!" % x) - ... - >>> pool.process_all(saw, "DEF") - I saw D! - I saw E! - I saw F! - """ - self.launch_all(function, iterable) - self.wait_all() - - def generate_results(self, function, iterable, qsize=None): - """For each tuple (sequence) in *iterable*, launch ``function(*tuple)`` - in its own coroutine -- like ``itertools.starmap()``, but in parallel. - Yield each of the values returned by ``function()``, in the order - they're completed rather than the order the coroutines were launched. - - Iteration stops when we've yielded results for each arguments tuple in - *iterable*. Unlike :meth:`wait_all` and :meth:`process_all`, this - function does not wait for any previously-submitted :meth:`execute` or - :meth:`execute_async` calls. - - Results are temporarily buffered in a queue. If you pass *qsize=*, this - value is used to limit the max size of the queue: an attempt to buffer - too many results will suspend the completed :class:`CoroutinePool` - coroutine until the requesting coroutine (the caller of - :meth:`generate_results`) has retrieved one or more results by calling - this generator-iterator's ``next()``. - - If any coroutine raises an uncaught exception, that exception will - propagate to the requesting coroutine via the corresponding ``next()`` - call. - - What I particularly want these tests to illustrate is that using this - generator function:: - - for result in generate_results(function, iterable): - # ... do something with result ... - pass - - executes coroutines at least as aggressively as the classic eventlet - idiom:: - - events = [pool.execute(function, *args) for args in iterable] - for event in events: - result = event.wait() - # ... do something with result ... - - even without a distinct event object for every arg tuple in *iterable*, - and despite the funny flow control from interleaving launches of new - coroutines with yields of completed coroutines' results. - - (The use case that makes this function preferable to the classic idiom - above is when the *iterable*, which may itself be a generator, produces - millions of items.) - - >>> from eventlet import coros - >>> from eventlet.support import six - >>> import string - >>> pool = coros.CoroutinePool(max_size=5) - >>> pausers = [coros.Event() for x in range(2)] - >>> def longtask(evt, desc): - ... print("%s woke up with %s" % (desc, evt.wait())) - ... - >>> pool.launch_all(longtask, zip(pausers, "AB")) - >>> def quicktask(desc): - ... print("returning %s" % desc) - ... return desc - ... - - (Instead of using a ``for`` loop, step through :meth:`generate_results` - items individually to illustrate timing) - - >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase)) - >>> print(six.next(step)) - returning a - returning b - returning c - a - >>> print(six.next(step)) - b - >>> print(six.next(step)) - c - >>> print(six.next(step)) - returning d - returning e - returning f - d - >>> pausers[0].send("A") - >>> print(six.next(step)) - e - >>> print(six.next(step)) - f - >>> print(six.next(step)) - A woke up with A - returning g - returning h - returning i - g - >>> print("".join([six.next(step) for x in range(3)])) - returning j - returning k - returning l - returning m - hij - >>> pausers[1].send("B") - >>> print("".join([six.next(step) for x in range(4)])) - B woke up with B - returning n - returning o - returning p - returning q - klmn - """ - # Get an iterator because of our funny nested loop below. Wrap the - # iterable in enumerate() so we count items that come through. - tuples = iter(enumerate(iterable)) - # If the iterable is empty, this whole function is a no-op, and we can - # save ourselves some grief by just quitting out. In particular, once - # we enter the outer loop below, we're going to wait on the queue -- - # but if we launched no coroutines with that queue as the destination, - # we could end up waiting a very long time. - try: - index, args = six.next(tuples) - except StopIteration: - return - # From this point forward, 'args' is the current arguments tuple and - # 'index+1' counts how many such tuples we've seen. - # This implementation relies on the fact that _execute() accepts an - # event-like object, and -- unless it's None -- the completed - # coroutine calls send(result). We slyly pass a queue rather than an - # event -- the same queue instance for all coroutines. This is why our - # queue interface intentionally resembles the event interface. - q = coros.queue(max_size=qsize) - # How many results have we yielded so far? - finished = 0 - # This first loop is only until we've launched all the coroutines. Its - # complexity is because if iterable contains more args tuples than the - # size of our pool, attempting to _execute() the (poolsize+1)th - # coroutine would suspend until something completes and send()s its - # result to our queue. But to keep down queue overhead and to maximize - # responsiveness to our caller, we'd rather suspend on reading the - # queue. So we stuff the pool as full as we can, then wait for - # something to finish, then stuff more coroutines into the pool. - try: - while True: - # Before each yield, start as many new coroutines as we can fit. - # (The self.free() test isn't 100% accurate: if we happen to be - # executing in one of the pool's coroutines, we could _execute() - # without waiting even if self.free() reports 0. See _execute().) - # The point is that we don't want to wait in the _execute() call, - # we want to wait in the q.wait() call. - # IMPORTANT: at start, and whenever we've caught up with all - # coroutines we've launched so far, we MUST iterate this inner - # loop at least once, regardless of self.free() -- otherwise the - # q.wait() call below will deadlock! - # Recall that index is the index of the NEXT args tuple that we - # haven't yet launched. Therefore it counts how many args tuples - # we've launched so far. - while self.free() > 0 or finished == index: - # Just like the implementation of execute_async(), save that - # we're passing our queue instead of None as the "event" to - # which to send() the result. - self._execute(q, function, args, {}) - # We've consumed that args tuple, advance to next. - index, args = six.next(tuples) - # Okay, we've filled up the pool again, yield a result -- which - # will probably wait for a coroutine to complete. Although we do - # have q.ready(), so we could iterate without waiting, we avoid - # that because every yield could involve considerable real time. - # We don't know how long it takes to return from yield, so every - # time we do, take the opportunity to stuff more requests into the - # pool before yielding again. - yield q.wait() - # Be sure to count results so we know when to stop! - finished += 1 - except StopIteration: - pass - # Here we've exhausted the input iterable. index+1 is the total number - # of coroutines we've launched. We probably haven't yielded that many - # results yet. Wait for the rest of the results, yielding them as they - # arrive. - while finished < index + 1: - yield q.wait() - finished += 1 diff --git a/eventlet/proc.py b/eventlet/proc.py deleted file mode 100644 index 52e98d8..0000000 --- a/eventlet/proc.py +++ /dev/null @@ -1,739 +0,0 @@ -""" -This module provides means to spawn, kill and link coroutines. Linking means -subscribing to the coroutine's result, either in form of return value or -unhandled exception. - -To create a linkable coroutine use spawn function provided by this module: - - >>> def demofunc(x, y): - ... return x / y - >>> p = spawn(demofunc, 6, 2) - -The return value of :func:`spawn` is an instance of :class:`Proc` class that -you can "link": - - * ``p.link(obj)`` - notify *obj* when the coroutine is finished - -What "notify" means here depends on the type of *obj*: a callable is simply -called, an :class:`~eventlet.coros.Event` or a :class:`~eventlet.coros.queue` -is notified using ``send``/``send_exception`` methods and if *obj* is another -greenlet it's killed with :class:`LinkedExited` exception. - -Here's an example: - ->>> event = coros.Event() ->>> _ = p.link(event) ->>> event.wait() -3 - -Now, even though *p* is finished it's still possible to link it. In this -case the notification is performed immediatelly: - ->>> try: -... p.link() -... except LinkedCompleted: -... print('LinkedCompleted') -LinkedCompleted - -(Without an argument, the link is created to the current greenlet) - -There are also :meth:`~eventlet.proc.Source.link_value` and -:func:`link_exception` methods that only deliver a return value and an -unhandled exception respectively (plain :meth:`~eventlet.proc.Source.link` -delivers both). Suppose we want to spawn a greenlet to do an important part of -the task; if it fails then there's no way to complete the task so the parent -must fail as well; :meth:`~eventlet.proc.Source.link_exception` is useful here: - ->>> p = spawn(demofunc, 1, 0) ->>> _ = p.link_exception() ->>> try: -... api.sleep(1) -... except LinkedFailed: -... print('LinkedFailed') -LinkedFailed - -One application of linking is :func:`waitall` function: link to a bunch of -coroutines and wait for all them to complete. Such a function is provided by -this module. -""" -import sys - -from eventlet import api, coros, hubs -from eventlet.support import six - -import warnings -warnings.warn( - "The proc module is deprecated! Please use the greenthread " - "module, or any of the many other Eventlet cross-coroutine " - "primitives, instead.", - DeprecationWarning, stacklevel=2) - -__all__ = ['LinkedExited', - 'LinkedFailed', - 'LinkedCompleted', - 'LinkedKilled', - 'ProcExit', - 'Link', - 'waitall', - 'killall', - 'Source', - 'Proc', - 'spawn', - 'spawn_link', - 'spawn_link_value', - 'spawn_link_exception'] - - -class LinkedExited(Exception): - """Raised when a linked proc exits""" - msg = "%r exited" - - def __init__(self, name=None, msg=None): - self.name = name - if msg is None: - msg = self.msg % self.name - Exception.__init__(self, msg) - - -class LinkedCompleted(LinkedExited): - """Raised when a linked proc finishes the execution cleanly""" - - msg = "%r completed successfully" - - -class LinkedFailed(LinkedExited): - """Raised when a linked proc dies because of unhandled exception""" - msg = "%r failed with %s" - - def __init__(self, name, typ, value=None, tb=None): - msg = self.msg % (name, typ.__name__) - LinkedExited.__init__(self, name, msg) - - -class LinkedKilled(LinkedFailed): - """Raised when a linked proc dies because of unhandled GreenletExit - (i.e. it was killed) - """ - msg = """%r was killed with %s""" - - -def getLinkedFailed(name, typ, value=None, tb=None): - if issubclass(typ, api.GreenletExit): - return LinkedKilled(name, typ, value, tb) - return LinkedFailed(name, typ, value, tb) - - -class ProcExit(api.GreenletExit): - """Raised when this proc is killed.""" - - -class Link(object): - """ - A link to a greenlet, triggered when the greenlet exits. - """ - - def __init__(self, listener): - self.listener = listener - - def cancel(self): - self.listener = None - - def __enter__(self): - pass - - def __exit__(self, *args): - self.cancel() - - -class LinkToEvent(Link): - - def __call__(self, source): - if self.listener is None: - return - if source.has_value(): - self.listener.send(source.value) - else: - self.listener.send_exception(*source.exc_info()) - - -class LinkToGreenlet(Link): - - def __call__(self, source): - if source.has_value(): - self.listener.throw(LinkedCompleted(source.name)) - else: - self.listener.throw(getLinkedFailed(source.name, *source.exc_info())) - - -class LinkToCallable(Link): - - def __call__(self, source): - self.listener(source) - - -def waitall(lst, trap_errors=False, queue=None): - if queue is None: - queue = coros.queue() - index = -1 - for (index, linkable) in enumerate(lst): - linkable.link(decorate_send(queue, index)) - len = index + 1 - results = [None] * len - count = 0 - while count < len: - try: - index, value = queue.wait() - except Exception: - if not trap_errors: - raise - else: - results[index] = value - count += 1 - return results - - -class decorate_send(object): - - def __init__(self, event, tag): - self._event = event - self._tag = tag - - def __repr__(self): - params = (type(self).__name__, self._tag, self._event) - return '<%s tag=%r event=%r>' % params - - def __getattr__(self, name): - assert name != '_event' - return getattr(self._event, name) - - def send(self, value): - self._event.send((self._tag, value)) - - -def killall(procs, *throw_args, **kwargs): - if not throw_args: - throw_args = (ProcExit, ) - wait = kwargs.pop('wait', False) - if kwargs: - raise TypeError('Invalid keyword argument for proc.killall(): %s' % ', '.join(kwargs.keys())) - for g in procs: - if not g.dead: - hubs.get_hub().schedule_call_global(0, g.throw, *throw_args) - if wait and api.getcurrent() is not hubs.get_hub().greenlet: - api.sleep(0) - - -class NotUsed(object): - - def __str__(self): - return '<Source instance does not hold a value or an exception>' - - __repr__ = __str__ - -_NOT_USED = NotUsed() - - -def spawn_greenlet(function, *args): - """Create a new greenlet that will run ``function(*args)``. - The current greenlet won't be unscheduled. Keyword arguments aren't - supported (limitation of greenlet), use :func:`spawn` to work around that. - """ - g = api.Greenlet(function) - g.parent = hubs.get_hub().greenlet - hubs.get_hub().schedule_call_global(0, g.switch, *args) - return g - - -class Source(object): - """Maintain a set of links to the listeners. Delegate the sent value or - the exception to all of them. - - To set up a link, use :meth:`link_value`, :meth:`link_exception` or - :meth:`link` method. The latter establishes both "value" and "exception" - link. It is possible to link to events, queues, greenlets and callables. - - >>> source = Source() - >>> event = coros.Event() - >>> _ = source.link(event) - - Once source's :meth:`send` or :meth:`send_exception` method is called, all - the listeners with the right type of link will be notified ("right type" - means that exceptions won't be delivered to "value" links and values won't - be delivered to "exception" links). Once link has been fired it is removed. - - Notifying listeners is performed in the **mainloop** greenlet. Under the - hood notifying a link means executing a callback, see :class:`Link` class - for details. Notification *must not* attempt to switch to the hub, i.e. - call any blocking functions. - - >>> source.send('hello') - >>> event.wait() - 'hello' - - Any error happened while sending will be logged as a regular unhandled - exception. This won't prevent other links from being fired. - - There 3 kinds of listeners supported: - - 1. If *listener* is a greenlet (regardless if it's a raw greenlet or an - extension like :class:`Proc`), a subclass of :class:`LinkedExited` - exception is raised in it. - - 2. If *listener* is something with send/send_exception methods (event, - queue, :class:`Source` but not :class:`Proc`) the relevant method is - called. - - 3. If *listener* is a callable, it is called with 1 argument (the result) - for "value" links and with 3 arguments ``(typ, value, tb)`` for - "exception" links. - """ - - def __init__(self, name=None): - self.name = name - self._value_links = {} - self._exception_links = {} - self.value = _NOT_USED - self._exc = None - - def _repr_helper(self): - result = [] - result.append(repr(self.name)) - if self.value is not _NOT_USED: - if self._exc is None: - res = repr(self.value) - if len(res) > 50: - res = res[:50] + '...' - result.append('result=%s' % res) - else: - result.append('raised=%s' % (self._exc, )) - result.append('{%s:%s}' % (len(self._value_links), len(self._exception_links))) - return result - - def __repr__(self): - klass = type(self).__name__ - return '<%s at %s %s>' % (klass, hex(id(self)), ' '.join(self._repr_helper())) - - def ready(self): - return self.value is not _NOT_USED - - def has_value(self): - return self.value is not _NOT_USED and self._exc is None - - def has_exception(self): - return self.value is not _NOT_USED and self._exc is not None - - def exc_info(self): - if not self._exc: - return (None, None, None) - elif len(self._exc) == 3: - return self._exc - elif len(self._exc) == 1: - if isinstance(self._exc[0], type): - return self._exc[0], None, None - else: - return self._exc[0].__class__, self._exc[0], None - elif len(self._exc) == 2: - return self._exc[0], self._exc[1], None - else: - return self._exc - - def link_value(self, listener=None, link=None): - if self.ready() and self._exc is not None: - return - if listener is None: - listener = api.getcurrent() - if link is None: - link = self.getLink(listener) - if self.ready() and listener is api.getcurrent(): - link(self) - else: - self._value_links[listener] = link - if self.value is not _NOT_USED: - self._start_send() - return link - - def link_exception(self, listener=None, link=None): - if self.value is not _NOT_USED and self._exc is None: - return - if listener is None: - listener = api.getcurrent() - if link is None: - link = self.getLink(listener) - if self.ready() and listener is api.getcurrent(): - link(self) - else: - self._exception_links[listener] = link - if self.value is not _NOT_USED: - self._start_send_exception() - return link - - def link(self, listener=None, link=None): - if listener is None: - listener = api.getcurrent() - if link is None: - link = self.getLink(listener) - if self.ready() and listener is api.getcurrent(): - if self._exc is None: - link(self) - else: - link(self) - else: - self._value_links[listener] = link - self._exception_links[listener] = link - if self.value is not _NOT_USED: - if self._exc is None: - self._start_send() - else: - self._start_send_exception() - return link - - def unlink(self, listener=None): - if listener is None: - listener = api.getcurrent() - self._value_links.pop(listener, None) - self._exception_links.pop(listener, None) - - @staticmethod - def getLink(listener): - if hasattr(listener, 'throw'): - return LinkToGreenlet(listener) - if hasattr(listener, 'send'): - return LinkToEvent(listener) - elif hasattr(listener, '__call__'): - return LinkToCallable(listener) - else: - raise TypeError("Don't know how to link to %r" % (listener, )) - - def send(self, value): - assert not self.ready(), "%s has been fired already" % self - self.value = value - self._exc = None - self._start_send() - - def _start_send(self): - links_items = list(six.iteritems(self._value_links)) - hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._value_links) - - def send_exception(self, *throw_args): - assert not self.ready(), "%s has been fired already" % self - self.value = None - self._exc = throw_args - self._start_send_exception() - - def _start_send_exception(self): - links_items = list(six.iteritems(self._exception_links)) - hubs.get_hub().schedule_call_global(0, self._do_send, links_items, self._exception_links) - - def _do_send(self, links, consult): - while links: - listener, link = links.pop() - try: - if listener in consult: - try: - link(self) - finally: - consult.pop(listener, None) - except: - hubs.get_hub().schedule_call_global(0, self._do_send, links, consult) - raise - - def wait(self, timeout=None, *throw_args): - """Wait until :meth:`send` or :meth:`send_exception` is called or - *timeout* has expired. Return the argument of :meth:`send` or raise the - argument of :meth:`send_exception`. If *timeout* has expired, ``None`` - is returned. - - The arguments, when provided, specify how many seconds to wait and what - to do when *timeout* has expired. They are treated the same way as - :func:`~eventlet.api.timeout` treats them. - """ - if self.value is not _NOT_USED: - if self._exc is None: - return self.value - else: - api.getcurrent().throw(*self._exc) - if timeout is not None: - timer = api.timeout(timeout, *throw_args) - timer.__enter__() - if timeout == 0: - if timer.__exit__(None, None, None): - return - else: - try: - api.getcurrent().throw(*timer.throw_args) - except: - if not timer.__exit__(*sys.exc_info()): - raise - return - EXC = True - try: - try: - waiter = Waiter() - self.link(waiter) - try: - return waiter.wait() - finally: - self.unlink(waiter) - except: - EXC = False - if timeout is None or not timer.__exit__(*sys.exc_info()): - raise - finally: - if timeout is not None and EXC: - timer.__exit__(None, None, None) - - -class Waiter(object): - - def __init__(self): - self.greenlet = None - - def send(self, value): - """Wake up the greenlet that is calling wait() currently (if there is one). - Can only be called from get_hub().greenlet. - """ - assert api.getcurrent() is hubs.get_hub().greenlet - if self.greenlet is not None: - self.greenlet.switch(value) - - def send_exception(self, *throw_args): - """Make greenlet calling wait() wake up (if there is a wait()). - Can only be called from get_hub().greenlet. - """ - assert api.getcurrent() is hubs.get_hub().greenlet - if self.greenlet is not None: - self.greenlet.throw(*throw_args) - - def wait(self): - """Wait until send or send_exception is called. Return value passed - into send() or raise exception passed into send_exception(). - """ - assert self.greenlet is None - current = api.getcurrent() - assert current is not hubs.get_hub().greenlet - self.greenlet = current - try: - return hubs.get_hub().switch() - finally: - self.greenlet = None - - -class Proc(Source): - """A linkable coroutine based on Source. - Upon completion, delivers coroutine's result to the listeners. - """ - - def __init__(self, name=None): - self.greenlet = None - Source.__init__(self, name) - - def _repr_helper(self): - if self.greenlet is not None and self.greenlet.dead: - dead = '(dead)' - else: - dead = '' - return ['%r%s' % (self.greenlet, dead)] + Source._repr_helper(self) - - def __repr__(self): - klass = type(self).__name__ - return '<%s %s>' % (klass, ' '.join(self._repr_helper())) - - def __nonzero__(self): - if self.ready(): - # with current _run this does not makes any difference - # still, let keep it there - return False - # otherwise bool(proc) is the same as bool(greenlet) - if self.greenlet is not None: - return bool(self.greenlet) - - __bool__ = __nonzero__ - - @property - def dead(self): - return self.ready() or self.greenlet.dead - - @classmethod - def spawn(cls, function, *args, **kwargs): - """Return a new :class:`Proc` instance that is scheduled to execute - ``function(*args, **kwargs)`` upon the next hub iteration. - """ - proc = cls() - proc.run(function, *args, **kwargs) - return proc - - def run(self, function, *args, **kwargs): - """Create a new greenlet to execute ``function(*args, **kwargs)``. - The created greenlet is scheduled to run upon the next hub iteration. - """ - assert self.greenlet is None, "'run' can only be called once per instance" - if self.name is None: - self.name = str(function) - self.greenlet = spawn_greenlet(self._run, function, args, kwargs) - - def _run(self, function, args, kwargs): - """Internal top level function. - Execute *function* and send its result to the listeners. - """ - try: - result = function(*args, **kwargs) - except: - self.send_exception(*sys.exc_info()) - raise # let mainloop log the exception - else: - self.send(result) - - def throw(self, *throw_args): - """Used internally to raise the exception. - - Behaves exactly like greenlet's 'throw' with the exception that - :class:`ProcExit` is raised by default. Do not use this function as it - leaves the current greenlet unscheduled forever. Use :meth:`kill` - method instead. - """ - if not self.dead: - if not throw_args: - throw_args = (ProcExit, ) - self.greenlet.throw(*throw_args) - - def kill(self, *throw_args): - """ - Raise an exception in the greenlet. Unschedule the current greenlet so - that this :class:`Proc` can handle the exception (or die). - - The exception can be specified with *throw_args*. By default, - :class:`ProcExit` is raised. - """ - if not self.dead: - if not throw_args: - throw_args = (ProcExit, ) - hubs.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args) - if api.getcurrent() is not hubs.get_hub().greenlet: - api.sleep(0) - - # QQQ maybe Proc should not inherit from Source (because its send() and send_exception() - # QQQ methods are for internal use only) - - -spawn = Proc.spawn - - -def spawn_link(function, *args, **kwargs): - p = spawn(function, *args, **kwargs) - p.link() - return p - - -def spawn_link_value(function, *args, **kwargs): - p = spawn(function, *args, **kwargs) - p.link_value() - return p - - -def spawn_link_exception(function, *args, **kwargs): - p = spawn(function, *args, **kwargs) - p.link_exception() - return p - - -class wrap_errors(object): - """Helper to make function return an exception, rather than raise it. - - Because every exception that is unhandled by greenlet will be logged by the hub, - it is desirable to prevent non-error exceptions from leaving a greenlet. - This can done with simple try/except construct: - - def func1(*args, **kwargs): - try: - return func(*args, **kwargs) - except (A, B, C) as ex: - return ex - - wrap_errors provides a shortcut to write that in one line: - - func1 = wrap_errors((A, B, C), func) - - It also preserves __str__ and __repr__ of the original function. - """ - - def __init__(self, errors, func): - """Make a new function from `func', such that it catches `errors' (an - Exception subclass, or a tuple of Exception subclasses) and return - it as a value. - """ - self.errors = errors - self.func = func - - def __call__(self, *args, **kwargs): - try: - return self.func(*args, **kwargs) - except self.errors as ex: - return ex - - def __str__(self): - return str(self.func) - - def __repr__(self): - return repr(self.func) - - def __getattr__(self, item): - return getattr(self.func, item) - - -class RunningProcSet(object): - """ - Maintain a set of :class:`Proc` s that are still running, that is, - automatically remove a proc when it's finished. Provide a way to wait/kill - all of them - """ - - def __init__(self, *args): - self.procs = set(*args) - if args: - for p in self.args[0]: - p.link(lambda p: self.procs.discard(p)) - - def __len__(self): - return len(self.procs) - - def __contains__(self, item): - if isinstance(item, api.Greenlet): - # special case for "api.getcurrent() in running_proc_set" to work - for x in self.procs: - if x.greenlet == item: - return True - else: - return item in self.procs - - def __iter__(self): - return iter(self.procs) - - def add(self, p): - self.procs.add(p) - p.link(lambda p: self.procs.discard(p)) - - def spawn(self, func, *args, **kwargs): - p = spawn(func, *args, **kwargs) - self.add(p) - return p - - def waitall(self, trap_errors=False): - while self.procs: - waitall(self.procs, trap_errors=trap_errors) - - def killall(self, *throw_args, **kwargs): - return killall(self.procs, *throw_args, **kwargs) - - -class Pool(object): - - linkable_class = Proc - - def __init__(self, limit): - self.semaphore = coros.Semaphore(limit) - - def allocate(self): - self.semaphore.acquire() - g = self.linkable_class() - g.link(lambda *_args: self.semaphore.release()) - return g diff --git a/eventlet/processes.py b/eventlet/processes.py deleted file mode 100644 index 1b5dfd6..0000000 --- a/eventlet/processes.py +++ /dev/null @@ -1,169 +0,0 @@ -import warnings -warnings.warn("eventlet.processes is deprecated in favor of " - "eventlet.green.subprocess, which is API-compatible with the standard " - " library subprocess module.", - DeprecationWarning, stacklevel=2) - -import errno -import os -import signal - -import eventlet -from eventlet import greenio, pools -from eventlet.green import subprocess - - -class DeadProcess(RuntimeError): - pass - - -def cooperative_wait(pobj, check_interval=0.01): - """ Waits for a child process to exit, returning the status - code. - - Unlike ``os.wait``, :func:`cooperative_wait` does not block the entire - process, only the calling coroutine. If the child process does not die, - :func:`cooperative_wait` could wait forever. - - The argument *check_interval* is the amount of time, in seconds, that - :func:`cooperative_wait` will sleep between calls to ``os.waitpid``. - """ - try: - while True: - status = pobj.poll() - if status >= 0: - return status - eventlet.sleep(check_interval) - except OSError as e: - if e.errno == errno.ECHILD: - # no child process, this happens if the child process - # already died and has been cleaned up, or if you just - # called with a random pid value - return -1 - else: - raise - - -class Process(object): - """Construct Process objects, then call read, and write on them.""" - process_number = 0 - - def __init__(self, command, args, dead_callback=None): - self.process_number = self.process_number + 1 - Process.process_number = self.process_number - self.command = command - self.args = args - self._dead_callback = dead_callback - self.run() - - def run(self): - self.dead = False - self.started = False - self.proc = None - - args = [self.command] - args.extend(self.args) - self.proc = subprocess.Popen( - args=args, - shell=False, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - close_fds=True, - ) - self.child_stdout_stderr = self.proc.stdout - self.child_stdin = self.proc.stdin - - self.sendall = self.child_stdin.write - self.send = self.child_stdin.write - self.recv = self.child_stdout_stderr.read - self.readline = self.child_stdout_stderr.readline - self._read_first_result = False - - def wait(self): - return cooperative_wait(self.proc) - - def dead_callback(self): - self.wait() - self.dead = True - if self._dead_callback: - self._dead_callback() - - def makefile(self, mode, *arg): - if mode.startswith('r'): - return self.child_stdout_stderr - if mode.startswith('w'): - return self.child_stdin - raise RuntimeError("Unknown mode", mode) - - def read(self, amount=None): - """Reads from the stdout and stderr of the child process. - The first call to read() will return a string; subsequent - calls may raise a DeadProcess when EOF occurs on the pipe. - """ - result = self.child_stdout_stderr.read(amount) - if result == '' and self._read_first_result: - # This process is dead. - self.dead_callback() - raise DeadProcess - else: - self._read_first_result = True - return result - - def write(self, stuff): - written = 0 - try: - written = self.child_stdin.write(stuff) - self.child_stdin.flush() - except ValueError as e: - # File was closed - assert str(e) == 'I/O operation on closed file' - if written == 0: - self.dead_callback() - raise DeadProcess - - def flush(self): - self.child_stdin.flush() - - def close(self): - self.child_stdout_stderr.close() - self.child_stdin.close() - self.dead_callback() - - def close_stdin(self): - self.child_stdin.close() - - def kill(self, sig=None): - if sig is None: - sig = signal.SIGTERM - pid = self.getpid() - os.kill(pid, sig) - - def getpid(self): - return self.proc.pid - - -class ProcessPool(pools.Pool): - def __init__(self, command, args=None, min_size=0, max_size=4): - """*command* - the command to run - """ - self.command = command - if args is None: - args = [] - self.args = args - pools.Pool.__init__(self, min_size, max_size) - - def create(self): - """Generate a process - """ - def dead_callback(): - self.current_size -= 1 - return Process(self.command, self.args, dead_callback) - - def put(self, item): - if not item.dead: - if item.proc.poll() != -1: - item.dead_callback() - else: - pools.Pool.put(self, item) diff --git a/eventlet/twistedutil/__init__.py b/eventlet/twistedutil/__init__.py index a5b8613..4ff50aa 100644 --- a/eventlet/twistedutil/__init__.py +++ b/eventlet/twistedutil/__init__.py @@ -62,7 +62,7 @@ if __name__=='__main__': test() elif num==1: spawn(test) - from eventlet.api import sleep + from eventlet import sleep print('sleeping..') sleep(5) print('done sleeping..') diff --git a/eventlet/twistedutil/protocol.py b/eventlet/twistedutil/protocol.py index 60d43ad..4549bd0 100644 --- a/eventlet/twistedutil/protocol.py +++ b/eventlet/twistedutil/protocol.py @@ -8,8 +8,8 @@ from twisted.python import failure from eventlet import greenthread from eventlet import getcurrent -from eventlet.coros import Queue from eventlet.event import Event as BaseEvent +from eventlet.queue import Queue class ValueQueue(Queue): diff --git a/eventlet/util.py b/eventlet/util.py deleted file mode 100644 index ea0a464..0000000 --- a/eventlet/util.py +++ /dev/null @@ -1,104 +0,0 @@ -import socket -import warnings - - -__original_socket__ = socket.socket - - -def tcp_socket(): - warnings.warn( - "eventlet.util.tcp_socket is deprecated. " - "Please use the standard socket technique for this instead: " - "sock = socket.socket()", - DeprecationWarning, stacklevel=2) - s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM) - return s - - -# if ssl is available, use eventlet.green.ssl for our ssl implementation -from eventlet.green import ssl - - -def wrap_ssl(sock, certificate=None, private_key=None, server_side=False): - warnings.warn( - "eventlet.util.wrap_ssl is deprecated. " - "Please use the eventlet.green.ssl.wrap_socket()", - DeprecationWarning, stacklevel=2) - return ssl.wrap_socket( - sock, - keyfile=private_key, - certfile=certificate, - server_side=server_side, - ) - - -def wrap_socket_with_coroutine_socket(use_thread_pool=None): - warnings.warn( - "eventlet.util.wrap_socket_with_coroutine_socket() is now " - "eventlet.patcher.monkey_patch(all=False, socket=True)", - DeprecationWarning, stacklevel=2) - from eventlet import patcher - patcher.monkey_patch(all=False, socket=True) - - -def wrap_pipes_with_coroutine_pipes(): - warnings.warn( - "eventlet.util.wrap_pipes_with_coroutine_pipes() is now " - "eventlet.patcher.monkey_patch(all=False, os=True)", - DeprecationWarning, stacklevel=2) - from eventlet import patcher - patcher.monkey_patch(all=False, os=True) - - -def wrap_select_with_coroutine_select(): - warnings.warn( - "eventlet.util.wrap_select_with_coroutine_select() is now " - "eventlet.patcher.monkey_patch(all=False, select=True)", - DeprecationWarning, stacklevel=2) - from eventlet import patcher - patcher.monkey_patch(all=False, select=True) - - -def wrap_threading_local_with_coro_local(): - """ - monkey patch ``threading.local`` with something that is greenlet aware. - Since greenlets cannot cross threads, so this should be semantically - identical to ``threadlocal.local`` - """ - warnings.warn( - "eventlet.util.wrap_threading_local_with_coro_local() is now " - "eventlet.patcher.monkey_patch(all=False, thread=True) -- though" - "note that more than just _local is patched now.", - DeprecationWarning, stacklevel=2) - - from eventlet import patcher - patcher.monkey_patch(all=False, thread=True) - - -def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50): - warnings.warn( - "eventlet.util.socket_bind_and_listen is deprecated." - "Please use the standard socket methodology for this instead:" - "sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)" - "sock.bind(addr)" - "sock.listen(backlog)", - DeprecationWarning, stacklevel=2) - set_reuse_addr(descriptor) - descriptor.bind(addr) - descriptor.listen(backlog) - return descriptor - - -def set_reuse_addr(descriptor): - warnings.warn( - "eventlet.util.set_reuse_addr is deprecated." - "Please use the standard socket methodology for this instead:" - "sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)", - DeprecationWarning, stacklevel=2) - try: - descriptor.setsockopt( - socket.SOL_SOCKET, - socket.SO_REUSEADDR, - descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1) - except socket.error: - pass diff --git a/tests/api_test.py b/tests/api_test.py index 5bee32d..8cb7410 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -1,21 +1,17 @@ import os import socket from unittest import TestCase, main -import warnings import eventlet -from eventlet import greenio, util, hubs, greenthread, spawn +from eventlet import greenio, hubs, greenthread, spawn +from eventlet.green import ssl from tests import skip_if_no_ssl -warnings.simplefilter('ignore', DeprecationWarning) -from eventlet import api -warnings.simplefilter('default', DeprecationWarning) - def check_hub(): # Clear through the descriptor queue - api.sleep(0) - api.sleep(0) + eventlet.sleep(0) + eventlet.sleep(0) hub = hubs.get_hub() for nm in 'get_readers', 'get_writers': dct = getattr(hub, nm)() @@ -50,7 +46,7 @@ class TestApi(TestCase): listenfd.close() server = eventlet.listen(('0.0.0.0', 0)) - api.spawn(accept_once, server) + eventlet.spawn_n(accept_once, server) client = eventlet.connect(('127.0.0.1', server.getsockname()[1])) fd = client.makefile('rb') @@ -74,13 +70,16 @@ class TestApi(TestCase): greenio.shutdown_safe(listenfd) listenfd.close() - server = api.ssl_listener(('0.0.0.0', 0), - self.certificate_file, - self.private_key_file) - api.spawn(accept_once, server) + server = eventlet.wrap_ssl( + eventlet.listen(('0.0.0.0', 0)), + self.private_key_file, + self.certificate_file, + server_side=True + ) + eventlet.spawn_n(accept_once, server) raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1])) - client = util.wrap_ssl(raw_client) + client = ssl.wrap_socket(raw_client) fd = socket._fileobject(client, 'rb', 8192) assert fd.readline() == b'hello\r\n' @@ -100,13 +99,13 @@ class TestApi(TestCase): def server(sock): client, addr = sock.accept() - api.sleep(0.1) + eventlet.sleep(0.1) server_evt = spawn(server, server_sock) - api.sleep(0) + eventlet.sleep(0) try: desc = eventlet.connect(('127.0.0.1', bound_port)) - api.trampoline(desc, read=True, write=False, timeout=0.001) - except api.TimeoutError: + hubs.trampoline(desc, read=True, write=False, timeout=0.001) + except eventlet.TimeoutError: pass # test passed else: assert False, "Didn't timeout" @@ -128,8 +127,8 @@ class TestApi(TestCase): def go(): desc = eventlet.connect(('127.0.0.1', bound_port)) try: - api.trampoline(desc, read=True, timeout=0.1) - except api.TimeoutError: + hubs.trampoline(desc, read=True, timeout=0.1) + except eventlet.TimeoutError: assert False, "Timed out" server.close() @@ -138,21 +137,13 @@ class TestApi(TestCase): greenthread.spawn_after_local(0, go) - server_coro = api.spawn(client_closer, server) + server_coro = eventlet.spawn(client_closer, server) while not done[0]: - api.sleep(0) - api.kill(server_coro) + eventlet.sleep(0) + eventlet.kill(server_coro) check_hub() - def test_named(self): - named_foo = api.named('tests.api_test.Foo') - self.assertEqual(named_foo.__name__, "Foo") - - def test_naming_missing_class(self): - self.assertRaises( - ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo') - def test_killing_dormant(self): DELAY = 0.1 state = [] @@ -160,33 +151,33 @@ class TestApi(TestCase): def test(): try: state.append('start') - api.sleep(DELAY) + eventlet.sleep(DELAY) except: state.append('except') # catching GreenletExit pass # when switching to hub, hub makes itself the parent of this greenlet, # thus after the function's done, the control will go to the parent - api.sleep(0) + eventlet.sleep(0) state.append('finished') - g = api.spawn(test) - api.sleep(DELAY / 2) + g = eventlet.spawn(test) + eventlet.sleep(DELAY / 2) self.assertEqual(state, ['start']) - api.kill(g) + eventlet.kill(g) # will not get there, unless switching is explicitly scheduled by kill self.assertEqual(state, ['start', 'except']) - api.sleep(DELAY) + eventlet.sleep(DELAY) self.assertEqual(state, ['start', 'except', 'finished']) def test_nested_with_timeout(self): def func(): - return api.with_timeout(0.2, api.sleep, 2, timeout_value=1) + return eventlet.with_timeout(0.2, eventlet.sleep, 2, timeout_value=1) try: - api.with_timeout(0.1, func) - self.fail(u'Expected api.TimeoutError') - except api.TimeoutError: + eventlet.with_timeout(0.1, func) + self.fail(u'Expected TimeoutError') + except eventlet.TimeoutError: pass diff --git a/tests/processes_test.py b/tests/processes_test.py deleted file mode 100644 index 889c824..0000000 --- a/tests/processes_test.py +++ /dev/null @@ -1,110 +0,0 @@ -import sys -import warnings -from tests import LimitedTestCase, main, skip_on_windows - -warnings.simplefilter('ignore', DeprecationWarning) -from eventlet import processes, api -warnings.simplefilter('default', DeprecationWarning) - - -class TestEchoPool(LimitedTestCase): - def setUp(self): - super(TestEchoPool, self).setUp() - self.pool = processes.ProcessPool('echo', ["hello"]) - - @skip_on_windows - def test_echo(self): - result = None - - proc = self.pool.get() - try: - result = proc.read() - finally: - self.pool.put(proc) - self.assertEqual(result, 'hello\n') - - @skip_on_windows - def test_read_eof(self): - proc = self.pool.get() - try: - proc.read() - self.assertRaises(processes.DeadProcess, proc.read) - finally: - self.pool.put(proc) - - @skip_on_windows - def test_empty_echo(self): - p = processes.Process('echo', ['-n']) - self.assertEqual('', p.read()) - self.assertRaises(processes.DeadProcess, p.read) - - -class TestCatPool(LimitedTestCase): - def setUp(self): - super(TestCatPool, self).setUp() - api.sleep(0) - self.pool = processes.ProcessPool('cat') - - @skip_on_windows - def test_cat(self): - result = None - - proc = self.pool.get() - try: - proc.write('goodbye') - proc.close_stdin() - result = proc.read() - finally: - self.pool.put(proc) - - self.assertEqual(result, 'goodbye') - - @skip_on_windows - def test_write_to_dead(self): - result = None - - proc = self.pool.get() - try: - proc.write('goodbye') - proc.close_stdin() - result = proc.read() - self.assertRaises(processes.DeadProcess, proc.write, 'foo') - finally: - self.pool.put(proc) - - @skip_on_windows - def test_close(self): - result = None - - proc = self.pool.get() - try: - proc.write('hello') - proc.close() - self.assertRaises(processes.DeadProcess, proc.write, 'goodbye') - finally: - self.pool.put(proc) - - -class TestDyingProcessesLeavePool(LimitedTestCase): - def setUp(self): - super(TestDyingProcessesLeavePool, self).setUp() - self.pool = processes.ProcessPool('echo', ['hello'], max_size=1) - - @skip_on_windows - def test_dead_process_not_inserted_into_pool(self): - proc = self.pool.get() - try: - try: - result = proc.read() - self.assertEqual(result, 'hello\n') - result = proc.read() - except processes.DeadProcess: - pass - finally: - self.pool.put(proc) - proc2 = self.pool.get() - assert proc is not proc2 - - -if __name__ == '__main__': - main() diff --git a/tests/ssl_test.py b/tests/ssl_test.py index 73b9b43..621ff47 100644 --- a/tests/ssl_test.py +++ b/tests/ssl_test.py @@ -3,9 +3,9 @@ import warnings from unittest import main import eventlet -from eventlet import util, greenio +from eventlet import greenio try: - from eventlet.green.socket import ssl + from eventlet.green import ssl except ImportError: pass from tests import ( @@ -15,8 +15,8 @@ from tests import ( def listen_ssl_socket(address=('127.0.0.1', 0)): - sock = util.wrap_ssl(socket.socket(), certificate_file, - private_key_file, True) + sock = ssl.wrap_socket( + socket.socket(), private_key_file, certificate_file, server_side=True) sock.bind(address) sock.listen(50) @@ -44,7 +44,8 @@ class SSLTest(LimitedTestCase): server_coro = eventlet.spawn(serve, sock) - client = util.wrap_ssl(eventlet.connect(('127.0.0.1', sock.getsockname()[1]))) + client = ssl.wrap_socket( + eventlet.connect(('127.0.0.1', sock.getsockname()[1]))) client.write(b'line 1\r\nline 2\r\n\r\n') self.assertEqual(client.read(8192), b'response') server_coro.wait() @@ -64,7 +65,7 @@ class SSLTest(LimitedTestCase): server_coro = eventlet.spawn(serve, sock) raw_client = eventlet.connect(('127.0.0.1', sock.getsockname()[1])) - client = util.wrap_ssl(raw_client) + client = ssl.wrap_socket(raw_client) client.write(b'X') greenio.shutdown_safe(client) client.close() @@ -79,7 +80,7 @@ class SSLTest(LimitedTestCase): server_coro = eventlet.spawn(serve, sock) raw_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ssl_client = util.wrap_ssl(raw_client) + ssl_client = ssl.wrap_socket(raw_client) ssl_client.connect(('127.0.0.1', sock.getsockname()[1])) ssl_client.write(b'abc') greenio.shutdown_safe(ssl_client) @@ -91,8 +92,8 @@ class SSLTest(LimitedTestCase): def serve(): sock, addr = listener.accept() self.assertEqual(sock.recv(6), b'before') - sock_ssl = util.wrap_ssl(sock, certificate_file, private_key_file, - server_side=True) + sock_ssl = ssl.wrap_socket(sock, private_key_file, certificate_file, + server_side=True) sock_ssl.do_handshake() self.assertEqual(sock_ssl.read(6), b'during') sock2 = sock_ssl.unwrap() @@ -103,7 +104,7 @@ class SSLTest(LimitedTestCase): server_coro = eventlet.spawn(serve) client = eventlet.connect((listener.getsockname())) client.send(b'before') - client_ssl = util.wrap_ssl(client) + client_ssl = ssl.wrap_socket(client) client_ssl.do_handshake() client_ssl.write(b'during') client2 = client_ssl.unwrap() @@ -142,7 +143,7 @@ class SSLTest(LimitedTestCase): client_sock = eventlet.connect(server_sock.getsockname()) client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFFER_SIZE) - client = util.wrap_ssl(client_sock) + client = ssl.wrap_socket(client_sock) client.write(b'request') self.assertEqual(client.read(8), b'response') stage_1.send() @@ -159,7 +160,8 @@ class SSLTest(LimitedTestCase): sock.close() listener = listen_ssl_socket(('', 0)) eventlet.spawn(serve, listener) - client = ssl(eventlet.connect(('localhost', listener.getsockname()[1]))) + client = ssl.wrap_socket( + eventlet.connect(('localhost', listener.getsockname()[1]))) self.assertEqual(client.read(1024), b'content') self.assertEqual(client.read(1024), b'') @@ -176,7 +178,7 @@ class SSLTest(LimitedTestCase): listener = listen_ssl_socket(('', 0)) eventlet.spawn(serve, listener) - ssl(eventlet.connect(('localhost', listener.getsockname()[1]))) + ssl.wrap_socket(eventlet.connect(('localhost', listener.getsockname()[1]))) if __name__ == '__main__': main() diff --git a/tests/stdlib/test_thread__boundedsem.py b/tests/stdlib/test_thread__boundedsem.py index 02bd95c..9e99dc4 100644 --- a/tests/stdlib/test_thread__boundedsem.py +++ b/tests/stdlib/test_thread__boundedsem.py @@ -1,15 +1,15 @@ """Test that BoundedSemaphore with a very high bound is as good as unbounded one""" -from eventlet import coros +from eventlet import semaphore from eventlet.green import thread def allocate_lock(): - return coros.semaphore(1, 9999) + return semaphore.Semaphore(1, 9999) original_allocate_lock = thread.allocate_lock thread.allocate_lock = allocate_lock original_LockType = thread.LockType -thread.LockType = coros.CappedSemaphore +thread.LockType = semaphore.CappedSemaphore try: import os.path diff --git a/tests/test__coros_queue.py b/tests/test__coros_queue.py deleted file mode 100644 index d4ee257..0000000 --- a/tests/test__coros_queue.py +++ /dev/null @@ -1,259 +0,0 @@ -from tests import LimitedTestCase, silence_warnings -from unittest import main -import eventlet -from eventlet import coros, spawn, sleep -from eventlet.event import Event - - -class TestQueue(LimitedTestCase): - - @silence_warnings - def test_send_first(self): - q = coros.queue() - q.send('hi') - self.assertEqual(q.wait(), 'hi') - - @silence_warnings - def test_send_exception_first(self): - q = coros.queue() - q.send(exc=RuntimeError()) - self.assertRaises(RuntimeError, q.wait) - - @silence_warnings - def test_send_last(self): - q = coros.queue() - - def waiter(q): - timer = eventlet.Timeout(0.1) - self.assertEqual(q.wait(), 'hi2') - timer.cancel() - - spawn(waiter, q) - sleep(0) - sleep(0) - q.send('hi2') - - @silence_warnings - def test_max_size(self): - q = coros.queue(2) - results = [] - - def putter(q): - q.send('a') - results.append('a') - q.send('b') - results.append('b') - q.send('c') - results.append('c') - - spawn(putter, q) - sleep(0) - self.assertEqual(results, ['a', 'b']) - self.assertEqual(q.wait(), 'a') - sleep(0) - self.assertEqual(results, ['a', 'b', 'c']) - self.assertEqual(q.wait(), 'b') - self.assertEqual(q.wait(), 'c') - - @silence_warnings - def test_zero_max_size(self): - q = coros.queue(0) - - def sender(evt, q): - q.send('hi') - evt.send('done') - - def receiver(evt, q): - x = q.wait() - evt.send(x) - - e1 = Event() - e2 = Event() - - spawn(sender, e1, q) - sleep(0) - assert not e1.ready() - spawn(receiver, e2, q) - self.assertEqual(e2.wait(), 'hi') - self.assertEqual(e1.wait(), 'done') - - @silence_warnings - def test_multiple_waiters(self): - # tests that multiple waiters get their results back - q = coros.queue() - - sendings = ['1', '2', '3', '4'] - gts = [eventlet.spawn(q.wait) - for x in sendings] - - eventlet.sleep(0.01) # get 'em all waiting - - q.send(sendings[0]) - q.send(sendings[1]) - q.send(sendings[2]) - q.send(sendings[3]) - results = set() - for i, gt in enumerate(gts): - results.add(gt.wait()) - self.assertEqual(results, set(sendings)) - - @silence_warnings - def test_waiters_that_cancel(self): - q = coros.queue() - - def do_receive(q, evt): - eventlet.Timeout(0, RuntimeError()) - try: - result = q.wait() - evt.send(result) - except RuntimeError: - evt.send('timed out') - - evt = Event() - spawn(do_receive, q, evt) - self.assertEqual(evt.wait(), 'timed out') - - q.send('hi') - self.assertEqual(q.wait(), 'hi') - - @silence_warnings - def test_senders_that_die(self): - q = coros.queue() - - def do_send(q): - q.send('sent') - - spawn(do_send, q) - self.assertEqual(q.wait(), 'sent') - - @silence_warnings - def test_two_waiters_one_dies(self): - def waiter(q, evt): - evt.send(q.wait()) - - def do_receive(q, evt): - eventlet.Timeout(0, RuntimeError()) - try: - result = q.wait() - evt.send(result) - except RuntimeError: - evt.send('timed out') - - q = coros.queue() - dying_evt = Event() - waiting_evt = Event() - spawn(do_receive, q, dying_evt) - spawn(waiter, q, waiting_evt) - sleep(0) - q.send('hi') - self.assertEqual(dying_evt.wait(), 'timed out') - self.assertEqual(waiting_evt.wait(), 'hi') - - @silence_warnings - def test_two_bogus_waiters(self): - def do_receive(q, evt): - eventlet.Timeout(0, RuntimeError()) - try: - result = q.wait() - evt.send(result) - except RuntimeError: - evt.send('timed out') - - q = coros.queue() - e1 = Event() - e2 = Event() - spawn(do_receive, q, e1) - spawn(do_receive, q, e2) - sleep(0) - q.send('sent') - self.assertEqual(e1.wait(), 'timed out') - self.assertEqual(e2.wait(), 'timed out') - self.assertEqual(q.wait(), 'sent') - - @silence_warnings - def test_waiting(self): - def do_wait(q, evt): - result = q.wait() - evt.send(result) - - q = coros.queue() - e1 = Event() - spawn(do_wait, q, e1) - sleep(0) - self.assertEqual(1, q.waiting()) - q.send('hi') - sleep(0) - self.assertEqual(0, q.waiting()) - self.assertEqual('hi', e1.wait()) - self.assertEqual(0, q.waiting()) - - -class TestChannel(LimitedTestCase): - - @silence_warnings - def test_send(self): - sleep(0.1) - channel = coros.queue(0) - - events = [] - - def another_greenlet(): - events.append(channel.wait()) - events.append(channel.wait()) - - spawn(another_greenlet) - - events.append('sending') - channel.send('hello') - events.append('sent hello') - channel.send('world') - events.append('sent world') - - self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events) - - @silence_warnings - def test_wait(self): - sleep(0.1) - channel = coros.queue(0) - events = [] - - def another_greenlet(): - events.append('sending hello') - channel.send('hello') - events.append('sending world') - channel.send('world') - events.append('sent world') - - spawn(another_greenlet) - - events.append('waiting') - events.append(channel.wait()) - events.append(channel.wait()) - - self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events) - sleep(0) - self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events) - - @silence_warnings - def test_waiters(self): - c = coros.Channel() - w1 = eventlet.spawn(c.wait) - w2 = eventlet.spawn(c.wait) - w3 = eventlet.spawn(c.wait) - sleep(0) - self.assertEqual(c.waiting(), 3) - s1 = eventlet.spawn(c.send, 1) - s2 = eventlet.spawn(c.send, 2) - s3 = eventlet.spawn(c.send, 3) - sleep(0) # this gets all the sends into a waiting state - self.assertEqual(c.waiting(), 0) - - s1.wait() - s2.wait() - s3.wait() - # NOTE: we don't guarantee that waiters are served in order - results = sorted([w1.wait(), w2.wait(), w3.wait()]) - self.assertEqual(results, [1, 2, 3]) - -if __name__ == '__main__': - main() diff --git a/tests/test__event.py b/tests/test__event.py index 8bc7219..065e13b 100644 --- a/tests/test__event.py +++ b/tests/test__event.py @@ -1,6 +1,6 @@ import unittest +from eventlet import spawn, sleep, with_timeout from eventlet.event import Event -from eventlet.api import spawn, sleep, with_timeout import eventlet from tests import LimitedTestCase diff --git a/tests/test__pool.py b/tests/test__pool.py deleted file mode 100644 index 4191625..0000000 --- a/tests/test__pool.py +++ /dev/null @@ -1,321 +0,0 @@ -import eventlet -import warnings -warnings.simplefilter('ignore', DeprecationWarning) -from eventlet import pool, coros, api, hubs, timeout -warnings.simplefilter('default', DeprecationWarning) -from eventlet import event as _event -from eventlet.support import six -from tests import LimitedTestCase -from unittest import main - - -class TestCoroutinePool(LimitedTestCase): - klass = pool.Pool - - def test_execute_async(self): - done = _event.Event() - - def some_work(): - done.send() - pool = self.klass(0, 2) - pool.execute_async(some_work) - done.wait() - - def test_execute(self): - value = 'return value' - - def some_work(): - return value - pool = self.klass(0, 2) - worker = pool.execute(some_work) - self.assertEqual(value, worker.wait()) - - def test_waiting(self): - pool = self.klass(0, 1) - done = _event.Event() - - def consume(): - done.wait() - - def waiter(pool): - evt = pool.execute(consume) - evt.wait() - - waiters = [] - waiters.append(eventlet.spawn(waiter, pool)) - api.sleep(0) - self.assertEqual(pool.waiting(), 0) - waiters.append(eventlet.spawn(waiter, pool)) - api.sleep(0) - self.assertEqual(pool.waiting(), 1) - waiters.append(eventlet.spawn(waiter, pool)) - api.sleep(0) - self.assertEqual(pool.waiting(), 2) - done.send(None) - for w in waiters: - w.wait() - self.assertEqual(pool.waiting(), 0) - - def test_multiple_coros(self): - evt = _event.Event() - results = [] - - def producer(): - results.append('prod') - evt.send() - - def consumer(): - results.append('cons1') - evt.wait() - results.append('cons2') - - pool = self.klass(0, 2) - done = pool.execute(consumer) - pool.execute_async(producer) - done.wait() - self.assertEqual(['cons1', 'prod', 'cons2'], results) - - def test_timer_cancel(self): - # this test verifies that local timers are not fired - # outside of the context of the execute method - timer_fired = [] - - def fire_timer(): - timer_fired.append(True) - - def some_work(): - hubs.get_hub().schedule_call_local(0, fire_timer) - pool = self.klass(0, 2) - worker = pool.execute(some_work) - worker.wait() - api.sleep(0) - self.assertEqual(timer_fired, []) - - def test_reentrant(self): - pool = self.klass(0, 1) - - def reenter(): - waiter = pool.execute(lambda a: a, 'reenter') - self.assertEqual('reenter', waiter.wait()) - - outer_waiter = pool.execute(reenter) - outer_waiter.wait() - - evt = _event.Event() - - def reenter_async(): - pool.execute_async(lambda a: a, 'reenter') - evt.send('done') - - pool.execute_async(reenter_async) - evt.wait() - - def assert_pool_has_free(self, pool, num_free): - def wait_long_time(e): - e.wait() - timer = timeout.Timeout(1, api.TimeoutError) - try: - evt = _event.Event() - for x in six.moves.range(num_free): - pool.execute(wait_long_time, evt) - # if the pool has fewer free than we expect, - # then we'll hit the timeout error - finally: - timer.cancel() - - # if the runtime error is not raised it means the pool had - # some unexpected free items - timer = timeout.Timeout(0, RuntimeError) - self.assertRaises(RuntimeError, pool.execute, wait_long_time, evt) - - # clean up by causing all the wait_long_time functions to return - evt.send(None) - api.sleep(0) - api.sleep(0) - - def test_resize(self): - pool = self.klass(max_size=2) - evt = _event.Event() - - def wait_long_time(e): - e.wait() - pool.execute(wait_long_time, evt) - pool.execute(wait_long_time, evt) - self.assertEqual(pool.free(), 0) - self.assert_pool_has_free(pool, 0) - - # verify that the pool discards excess items put into it - pool.resize(1) - - # cause the wait_long_time functions to return, which will - # trigger puts to the pool - evt.send(None) - api.sleep(0) - api.sleep(0) - - self.assertEqual(pool.free(), 1) - self.assert_pool_has_free(pool, 1) - - # resize larger and assert that there are more free items - pool.resize(2) - self.assertEqual(pool.free(), 2) - self.assert_pool_has_free(pool, 2) - - def test_stderr_raising(self): - # testing that really egregious errors in the error handling code - # (that prints tracebacks to stderr) don't cause the pool to lose - # any members - import sys - pool = self.klass(min_size=1, max_size=1) - - def crash(*args, **kw): - raise RuntimeError("Whoa") - - class FakeFile(object): - write = crash - - # we're going to do this by causing the traceback.print_exc in - # safe_apply to raise an exception and thus exit _main_loop - normal_err = sys.stderr - try: - sys.stderr = FakeFile() - waiter = pool.execute(crash) - self.assertRaises(RuntimeError, waiter.wait) - # the pool should have something free at this point since the - # waiter returned - # pool.Pool change: if an exception is raised during execution of a link, - # the rest of the links are scheduled to be executed on the next hub iteration - # this introduces a delay in updating pool.sem which makes pool.free() report 0 - # therefore, sleep: - api.sleep(0) - self.assertEqual(pool.free(), 1) - # shouldn't block when trying to get - t = timeout.Timeout(0.1) - try: - pool.execute(api.sleep, 1) - finally: - t.cancel() - finally: - sys.stderr = normal_err - - def test_track_events(self): - pool = self.klass(track_events=True) - for x in range(6): - pool.execute(lambda n: n, x) - for y in range(6): - pool.wait() - - def test_track_slow_event(self): - pool = self.klass(track_events=True) - - def slow(): - api.sleep(0.1) - return 'ok' - pool.execute(slow) - self.assertEqual(pool.wait(), 'ok') - - def test_pool_smash(self): - # The premise is that a coroutine in a Pool tries to get a token out - # of a token pool but times out before getting the token. We verify - # that neither pool is adversely affected by this situation. - from eventlet import pools - pool = self.klass(min_size=1, max_size=1) - tp = pools.TokenPool(max_size=1) - token = tp.get() # empty pool - - def do_receive(tp): - timeout.Timeout(0, RuntimeError()) - try: - t = tp.get() - self.fail("Shouldn't have recieved anything from the pool") - except RuntimeError: - return 'timed out' - - # the execute makes the token pool expect that coroutine, but then - # immediately cuts bait - e1 = pool.execute(do_receive, tp) - self.assertEqual(e1.wait(), 'timed out') - - # the pool can get some random item back - def send_wakeup(tp): - tp.put('wakeup') - api.spawn(send_wakeup, tp) - - # now we ask the pool to run something else, which should not - # be affected by the previous send at all - def resume(): - return 'resumed' - e2 = pool.execute(resume) - self.assertEqual(e2.wait(), 'resumed') - - # we should be able to get out the thing we put in there, too - self.assertEqual(tp.get(), 'wakeup') - - -class PoolBasicTests(LimitedTestCase): - klass = pool.Pool - - def test_execute_async(self): - p = self.klass(max_size=2) - self.assertEqual(p.free(), 2) - r = [] - - def foo(a): - r.append(a) - evt = p.execute(foo, 1) - self.assertEqual(p.free(), 1) - evt.wait() - self.assertEqual(r, [1]) - api.sleep(0) - self.assertEqual(p.free(), 2) - - # Once the pool is exhausted, calling an execute forces a yield. - - p.execute_async(foo, 2) - self.assertEqual(1, p.free()) - self.assertEqual(r, [1]) - - p.execute_async(foo, 3) - self.assertEqual(0, p.free()) - self.assertEqual(r, [1]) - - p.execute_async(foo, 4) - self.assertEqual(r, [1, 2, 3]) - api.sleep(0) - self.assertEqual(r, [1, 2, 3, 4]) - - def test_execute(self): - p = self.klass() - evt = p.execute(lambda a: ('foo', a), 1) - self.assertEqual(evt.wait(), ('foo', 1)) - - def test_with_intpool(self): - from eventlet import pools - - class IntPool(pools.Pool): - def create(self): - self.current_integer = getattr(self, 'current_integer', 0) + 1 - return self.current_integer - - def subtest(intpool_size, pool_size, num_executes): - def run(int_pool): - token = int_pool.get() - api.sleep(0.0001) - int_pool.put(token) - return token - - int_pool = IntPool(max_size=intpool_size) - pool = self.klass(max_size=pool_size) - for ix in six.moves.range(num_executes): - pool.execute(run, int_pool) - pool.waitall() - - subtest(4, 7, 7) - subtest(50, 75, 100) - for isize in (20, 30, 40, 50): - for psize in (25, 35, 50): - subtest(isize, psize, psize) - - -if __name__ == '__main__': - main() diff --git a/tests/test__proc.py b/tests/test__proc.py deleted file mode 100644 index fb71a64..0000000 --- a/tests/test__proc.py +++ /dev/null @@ -1,401 +0,0 @@ -import sys -import unittest -import warnings -warnings.simplefilter('ignore', DeprecationWarning) -from eventlet import proc -warnings.simplefilter('default', DeprecationWarning) -from eventlet import coros -from eventlet import event as _event -from eventlet import Timeout, sleep, getcurrent, with_timeout -from tests import LimitedTestCase, skipped, silence_warnings - -DELAY = 0.01 - - -class ExpectedError(Exception): - pass - - -class TestLink_Signal(LimitedTestCase): - - @silence_warnings - def test_send(self): - s = proc.Source() - q1, q2, q3 = coros.queue(), coros.queue(), coros.queue() - s.link_value(q1) - self.assertRaises(Timeout, s.wait, 0) - assert s.wait(0, None) is None - assert s.wait(0.001, None) is None - self.assertRaises(Timeout, s.wait, 0.001) - s.send(1) - assert not q1.ready() - assert s.wait() == 1 - sleep(0) - assert q1.ready() - s.link_exception(q2) - s.link(q3) - assert not q2.ready() - sleep(0) - assert q3.ready() - assert s.wait() == 1 - - @silence_warnings - def test_send_exception(self): - s = proc.Source() - q1, q2, q3 = coros.queue(), coros.queue(), coros.queue() - s.link_exception(q1) - s.send_exception(OSError('hello')) - sleep(0) - assert q1.ready() - s.link_value(q2) - s.link(q3) - assert not q2.ready() - sleep(0) - assert q3.ready() - self.assertRaises(OSError, q1.wait) - self.assertRaises(OSError, q3.wait) - self.assertRaises(OSError, s.wait) - - -class TestProc(LimitedTestCase): - - def test_proc(self): - p = proc.spawn(lambda: 100) - receiver = proc.spawn(sleep, 1) - p.link(receiver) - self.assertRaises(proc.LinkedCompleted, receiver.wait) - receiver2 = proc.spawn(sleep, 1) - p.link(receiver2) - self.assertRaises(proc.LinkedCompleted, receiver2.wait) - - def test_event(self): - p = proc.spawn(lambda: 100) - event = _event.Event() - p.link(event) - self.assertEqual(event.wait(), 100) - - for i in range(3): - event2 = _event.Event() - p.link(event2) - self.assertEqual(event2.wait(), 100) - - def test_current(self): - p = proc.spawn(lambda: 100) - p.link() - self.assertRaises(proc.LinkedCompleted, sleep, 0.1) - - -class TestCase(LimitedTestCase): - - def link(self, p, listener=None): - getattr(p, self.link_method)(listener) - - def tearDown(self): - LimitedTestCase.tearDown(self) - self.p.unlink() - - def set_links(self, p, first_time, kill_exc_type): - event = _event.Event() - self.link(p, event) - - proc_flag = [] - - def receiver(): - sleep(DELAY) - proc_flag.append('finished') - receiver = proc.spawn(receiver) - self.link(p, receiver) - - queue = coros.queue(1) - self.link(p, queue) - - try: - self.link(p) - except kill_exc_type: - if first_time: - raise - else: - assert first_time, 'not raising here only first time' - - callback_flag = ['initial'] - self.link(p, lambda *args: callback_flag.remove('initial')) - - for _ in range(10): - self.link(p, _event.Event()) - self.link(p, coros.queue(1)) - return event, receiver, proc_flag, queue, callback_flag - - def set_links_timeout(self, link): - # stuff that won't be touched - event = _event.Event() - link(event) - - proc_finished_flag = [] - - def myproc(): - sleep(10) - proc_finished_flag.append('finished') - return 555 - myproc = proc.spawn(myproc) - link(myproc) - - queue = coros.queue(0) - link(queue) - return event, myproc, proc_finished_flag, queue - - def check_timed_out(self, event, myproc, proc_finished_flag, queue): - X = object() - assert with_timeout(DELAY, event.wait, timeout_value=X) is X - assert with_timeout(DELAY, queue.wait, timeout_value=X) is X - assert with_timeout(DELAY, proc.waitall, [myproc], timeout_value=X) is X - assert proc_finished_flag == [], proc_finished_flag - - -class TestReturn_link(TestCase): - link_method = 'link' - - def test_return(self): - def return25(): - return 25 - p = self.p = proc.spawn(return25) - self._test_return(p, True, 25, proc.LinkedCompleted, lambda: sleep(0)) - # repeating the same with dead process - for _ in range(3): - self._test_return(p, False, 25, proc.LinkedCompleted, lambda: sleep(0)) - - def _test_return(self, p, first_time, result, kill_exc_type, action): - event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) - - # stuff that will time out because there's no unhandled exception: - xxxxx = self.set_links_timeout(p.link_exception) - - try: - sleep(DELAY * 2) - except kill_exc_type: - assert first_time, 'raising here only first time' - else: - assert not first_time, 'Should not raise LinkedKilled here after first time' - - assert not p, p - - self.assertEqual(event.wait(), result) - self.assertEqual(queue.wait(), result) - self.assertRaises(kill_exc_type, receiver.wait) - self.assertRaises(kill_exc_type, proc.waitall, [receiver]) - - sleep(DELAY) - assert not proc_flag, proc_flag - assert not callback_flag, callback_flag - - self.check_timed_out(*xxxxx) - - -class TestReturn_link_value(TestReturn_link): - sync = False - link_method = 'link_value' - - -class TestRaise_link(TestCase): - link_method = 'link' - - def _test_raise(self, p, first_time, kill_exc_type): - event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) - xxxxx = self.set_links_timeout(p.link_value) - - try: - sleep(DELAY) - except kill_exc_type: - assert first_time, 'raising here only first time' - else: - assert not first_time, 'Should not raise LinkedKilled here after first time' - - assert not p, p - - self.assertRaises(ExpectedError, event.wait) - self.assertRaises(ExpectedError, queue.wait) - self.assertRaises(kill_exc_type, receiver.wait) - self.assertRaises(kill_exc_type, proc.waitall, [receiver]) - sleep(DELAY) - assert not proc_flag, proc_flag - assert not callback_flag, callback_flag - - self.check_timed_out(*xxxxx) - - @silence_warnings - def test_raise(self): - p = self.p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_raise'))) - self._test_raise(p, True, proc.LinkedFailed) - # repeating the same with dead process - for _ in range(3): - self._test_raise(p, False, proc.LinkedFailed) - - def _test_kill(self, p, first_time, kill_exc_type): - event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) - xxxxx = self.set_links_timeout(p.link_value) - - p.kill() - try: - sleep(DELAY) - except kill_exc_type: - assert first_time, 'raising here only first time' - else: - assert not first_time, 'Should not raise LinkedKilled here after first time' - - assert not p, p - - self.assertRaises(proc.ProcExit, event.wait) - self.assertRaises(proc.ProcExit, queue.wait) - self.assertRaises(kill_exc_type, proc.waitall, [receiver]) - self.assertRaises(kill_exc_type, receiver.wait) - - sleep(DELAY) - assert not proc_flag, proc_flag - assert not callback_flag, callback_flag - - self.check_timed_out(*xxxxx) - - @silence_warnings - def test_kill(self): - p = self.p = proc.spawn(sleep, DELAY) - self._test_kill(p, True, proc.LinkedKilled) - # repeating the same with dead process - for _ in range(3): - self._test_kill(p, False, proc.LinkedKilled) - - -class TestRaise_link_exception(TestRaise_link): - link_method = 'link_exception' - - -class TestStuff(LimitedTestCase): - - def test_wait_noerrors(self): - x = proc.spawn(lambda: 1) - y = proc.spawn(lambda: 2) - z = proc.spawn(lambda: 3) - self.assertEqual(proc.waitall([x, y, z]), [1, 2, 3]) - e = _event.Event() - x.link(e) - self.assertEqual(e.wait(), 1) - x.unlink(e) - e = _event.Event() - x.link(e) - self.assertEqual(e.wait(), 1) - self.assertEqual([proc.waitall([X]) for X in [x, y, z]], [[1], [2], [3]]) - - # this test is timing-sensitive - @skipped - def test_wait_error(self): - def x(): - sleep(DELAY) - return 1 - x = proc.spawn(x) - z = proc.spawn(lambda: 3) - y = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_wait_error'))) - y.link(x) - x.link(y) - y.link(z) - z.link(y) - self.assertRaises(ExpectedError, proc.waitall, [x, y, z]) - self.assertRaises(proc.LinkedFailed, proc.waitall, [x]) - self.assertEqual(proc.waitall([z]), [3]) - self.assertRaises(ExpectedError, proc.waitall, [y]) - - def test_wait_all_exception_order(self): - # if there're several exceptions raised, the earliest one must be raised by wait - def first(): - sleep(0.1) - raise ExpectedError('first') - a = proc.spawn(first) - b = proc.spawn(lambda: getcurrent().throw(ExpectedError('second'))) - try: - proc.waitall([a, b]) - except ExpectedError as ex: - assert 'second' in str(ex), repr(str(ex)) - sleep(0.2) # sleep to ensure that the other timer is raised - - def test_multiple_listeners_error(self): - # if there was an error while calling a callback - # it should not prevent the other listeners from being called - # also, all of the errors should be logged, check the output - # manually that they are - p = proc.spawn(lambda: 5) - results = [] - - def listener1(*args): - results.append(10) - raise ExpectedError('listener1') - - def listener2(*args): - results.append(20) - raise ExpectedError('listener2') - - def listener3(*args): - raise ExpectedError('listener3') - p.link(listener1) - p.link(listener2) - p.link(listener3) - sleep(DELAY * 10) - assert results in [[10, 20], [20, 10]], results - - p = proc.spawn(lambda: getcurrent().throw(ExpectedError('test_multiple_listeners_error'))) - results = [] - p.link(listener1) - p.link(listener2) - p.link(listener3) - sleep(DELAY * 10) - assert results in [[10, 20], [20, 10]], results - - def _test_multiple_listeners_error_unlink(self, p): - # notification must not happen after unlink even - # though notification process has been already started - results = [] - - def listener1(*args): - p.unlink(listener2) - results.append(5) - raise ExpectedError('listener1') - - def listener2(*args): - p.unlink(listener1) - results.append(5) - raise ExpectedError('listener2') - - def listener3(*args): - raise ExpectedError('listener3') - p.link(listener1) - p.link(listener2) - p.link(listener3) - sleep(DELAY * 10) - assert results == [5], results - - def test_multiple_listeners_error_unlink_Proc(self): - p = proc.spawn(lambda: 5) - self._test_multiple_listeners_error_unlink(p) - - def test_multiple_listeners_error_unlink_Source(self): - p = proc.Source() - proc.spawn(p.send, 6) - self._test_multiple_listeners_error_unlink(p) - - def test_killing_unlinked(self): - e = _event.Event() - - def func(): - try: - raise ExpectedError('test_killing_unlinked') - except: - e.send_exception(*sys.exc_info()) - p = proc.spawn_link(func) - try: - try: - e.wait() - except ExpectedError: - pass - finally: - p.unlink() # this disables LinkedCompleted that otherwise would be raised by the next line - sleep(DELAY) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test__socket_errors.py b/tests/test__socket_errors.py index 5228ea7..7832de0 100644 --- a/tests/test__socket_errors.py +++ b/tests/test__socket_errors.py @@ -1,6 +1,5 @@ import unittest import socket as _original_sock -from eventlet import api from eventlet.green import socket diff --git a/tests/test__twistedutil_protocol.py b/tests/test__twistedutil_protocol.py index 29a26cf..337cdbb 100644 --- a/tests/test__twistedutil_protocol.py +++ b/tests/test__twistedutil_protocol.py @@ -19,7 +19,7 @@ except ImportError: pass from eventlet import spawn, sleep, with_timeout, spawn_after -from eventlet.coros import Event +from eventlet.event import Event try: from eventlet.green import socket @@ -33,8 +33,7 @@ deps = commands = nosetests --verbose tests/ nosetests --verbose --with-doctest eventlet/coros.py eventlet/event.py \ - eventlet/pool.py eventlet/pools.py eventlet/proc.py \ - eventlet/queue.py eventlet/timeout.py + eventlet/pools.py eventlet/queue.py eventlet/timeout.py [testenv:pep8] basepython = python2.7 |
