diff options
| author | Sergey Shepelev <temotor@gmail.com> | 2016-12-23 01:36:36 +0300 |
|---|---|---|
| committer | Sergey Shepelev <temotor@gmail.com> | 2016-12-23 01:36:36 +0300 |
| commit | c3e46981863502502fc7a11faa266f925250be4c (patch) | |
| tree | 406da21c545676296d025a8a53123edb25b164c2 | |
| parent | e1bb2fee1db118a617b674821d1f73add9e4f77f (diff) | |
| download | eventlet-issue-364.tar.gz | |
Type check Semaphore, GreenPool arguments; Thanks to Matthew D. Pagelissue-364
- export Event, *Semaphore in `eventlet.` top level namespace
https://github.com/eventlet/eventlet/issues/364
| -rw-r--r-- | eventlet/__init__.py | 38 | ||||
| -rw-r--r-- | eventlet/greenpool.py | 36 | ||||
| -rw-r--r-- | eventlet/semaphore.py | 9 | ||||
| -rw-r--r-- | tests/greenpool_test.py | 82 | ||||
| -rw-r--r-- | tests/semaphore_test.py | 28 |
5 files changed, 111 insertions, 82 deletions
diff --git a/eventlet/__init__.py b/eventlet/__init__.py index efbdbe6..fbf3ef4 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -7,36 +7,44 @@ __version__ = '.'.join(map(str, version_info)) # errors of greenlet so that the packager can still at least # access the version. Also this makes easy_install a little quieter if os.environ.get('EVENTLET_IMPORT_VERSION_ONLY') != '1': - from eventlet import greenthread + from eventlet import convenience + from eventlet import event from eventlet import greenpool + from eventlet import greenthread + from eventlet import patcher from eventlet import queue + from eventlet import semaphore from eventlet import timeout - from eventlet import patcher - from eventlet import convenience import greenlet + connect = convenience.connect + listen = convenience.listen + serve = convenience.serve + StopServe = convenience.StopServe + wrap_ssl = convenience.wrap_ssl + + Event = event.Event + + GreenPool = greenpool.GreenPool + GreenPile = greenpool.GreenPile + sleep = greenthread.sleep spawn = greenthread.spawn spawn_n = greenthread.spawn_n spawn_after = greenthread.spawn_after kill = greenthread.kill - Timeout = timeout.Timeout - with_timeout = timeout.with_timeout - - GreenPool = greenpool.GreenPool - GreenPile = greenpool.GreenPile + import_patched = patcher.import_patched + monkey_patch = patcher.monkey_patch Queue = queue.Queue - import_patched = patcher.import_patched - monkey_patch = patcher.monkey_patch + Semaphore = semaphore.Semaphore + CappedSemaphore = semaphore.CappedSemaphore + BoundedSemaphore = semaphore.BoundedSemaphore - connect = convenience.connect - listen = convenience.listen - serve = convenience.serve - StopServe = convenience.StopServe - wrap_ssl = convenience.wrap_ssl + Timeout = timeout.Timeout + with_timeout = timeout.with_timeout getcurrent = greenlet.greenlet.getcurrent diff --git a/eventlet/greenpool.py b/eventlet/greenpool.py index a2503fc..1a9c0ad 100644 --- a/eventlet/greenpool.py +++ b/eventlet/greenpool.py @@ -1,9 +1,7 @@ import traceback -from eventlet import event -from eventlet import greenthread +import eventlet from eventlet import queue -from eventlet import semaphore from eventlet.support import greenlets as greenlet from eventlet.support import six @@ -17,10 +15,16 @@ class GreenPool(object): """ def __init__(self, size=1000): + try: + size = int(size) + except ValueError as e: + raise TypeError('GreenPool() expect size :: int, actual: {0} {1}'.format(type(size), str(e))) + if size < 0: + raise ValueError('GreenPool() expect size >= 0, actual: {0}'.format(repr(size))) self.size = size self.coroutines_running = set() - self.sem = semaphore.Semaphore(size) - self.no_coros_running = event.Event() + self.sem = eventlet.Semaphore(size) + self.no_coros_running = eventlet.Event() def resize(self, new_size): """ Change the max number of greenthreads doing work at any given time. @@ -49,7 +53,7 @@ class GreenPool(object): def spawn(self, function, *args, **kwargs): """Run the *function* with its arguments in its own green thread. - Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>` + Returns the :class:`GreenThread <eventlet.GreenThread>` object that is running the function, which can be used to retrieve the results. @@ -61,17 +65,17 @@ class GreenPool(object): """ # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine - current = greenthread.getcurrent() + current = eventlet.getcurrent() if self.sem.locked() and current in self.coroutines_running: # a bit hacky to use the GT without switching to it - gt = greenthread.GreenThread(current) + gt = eventlet.GreenThread(current) gt.main(function, args, kwargs) return gt else: self.sem.acquire() - gt = greenthread.spawn(function, *args, **kwargs) + gt = eventlet.spawn(function, *args, **kwargs) if not self.coroutines_running: - self.no_coros_running = event.Event() + self.no_coros_running = eventlet.Event() self.coroutines_running.add(gt) gt.link(self._spawn_done) return gt @@ -89,7 +93,7 @@ class GreenPool(object): if coro is None: return else: - coro = greenthread.getcurrent() + coro = eventlet.getcurrent() self._spawn_done(coro) def spawn_n(self, function, *args, **kwargs): @@ -99,21 +103,21 @@ class GreenPool(object): """ # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine - current = greenthread.getcurrent() + current = eventlet.getcurrent() if self.sem.locked() and current in self.coroutines_running: self._spawn_n_impl(function, args, kwargs, None) else: self.sem.acquire() - g = greenthread.spawn_n( + g = eventlet.spawn_n( self._spawn_n_impl, function, args, kwargs, True) if not self.coroutines_running: - self.no_coros_running = event.Event() + self.no_coros_running = eventlet.Event() self.coroutines_running.add(g) def waitall(self): """Waits until all greenthreads in the pool are finished working.""" - assert greenthread.getcurrent() not in self.coroutines_running, \ + assert eventlet.getcurrent() not in self.coroutines_running, \ "Calling waitall() from within one of the " \ "GreenPool's greenthreads will never terminate." if self.running(): @@ -151,7 +155,7 @@ class GreenPool(object): if function is None: function = lambda *a: a gi = GreenMap(self.size) - greenthread.spawn_n(self._do_map, function, iterable, gi) + eventlet.spawn_n(self._do_map, function, iterable, gi) return gi def imap(self, function, *iterables): diff --git a/eventlet/semaphore.py b/eventlet/semaphore.py index b2ef9d3..8cac803 100644 --- a/eventlet/semaphore.py +++ b/eventlet/semaphore.py @@ -34,10 +34,13 @@ class Semaphore(object): """ def __init__(self, value=1): - self.counter = value + try: + value = int(value) + except ValueError as e: + raise TypeError('Semaphore() expect value :: int, actual: {0} {1}'.format(type(value), str(e))) if value < 0: - raise ValueError("Semaphore must be initialized with a positive " - "number, got %s" % value) + raise ValueError('Semaphore() expect value >= 0, actual: {0}'.format(repr(value))) + self.counter = value self._waiters = collections.deque() def __repr__(self): diff --git a/tests/greenpool_test.py b/tests/greenpool_test.py index ff56262..0da0331 100644 --- a/tests/greenpool_test.py +++ b/tests/greenpool_test.py @@ -1,9 +1,8 @@ import gc -import os import random import eventlet -from eventlet import hubs, greenpool, event, pools +from eventlet import hubs, pools from eventlet.support import greenlets as greenlet, six import tests @@ -24,7 +23,7 @@ def raiser(exc): class GreenPool(tests.LimitedTestCase): def test_spawn(self): - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) waiters = [] for i in range(10): waiters.append(p.spawn(passthru, i)) @@ -32,7 +31,7 @@ class GreenPool(tests.LimitedTestCase): self.assertEqual(results, list(range(10))) def test_spawn_n(self): - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) results_closure = [] def do_something(a): @@ -45,8 +44,8 @@ class GreenPool(tests.LimitedTestCase): self.assertEqual(results_closure, list(range(10))) def test_waiting(self): - pool = greenpool.GreenPool(1) - done = event.Event() + pool = eventlet.GreenPool(1) + done = eventlet.Event() def consume(): done.wait() @@ -74,7 +73,7 @@ class GreenPool(tests.LimitedTestCase): self.assertEqual(pool.running(), 0) def test_multiple_coros(self): - evt = event.Event() + evt = eventlet.Event() results = [] def producer(): @@ -86,7 +85,7 @@ class GreenPool(tests.LimitedTestCase): evt.wait() results.append('cons2') - pool = greenpool.GreenPool(2) + pool = eventlet.GreenPool(2) done = pool.spawn(consumer) pool.spawn_n(producer) done.wait() @@ -103,7 +102,7 @@ class GreenPool(tests.LimitedTestCase): def some_work(): hubs.get_hub().schedule_call_local(0, fire_timer) - pool = greenpool.GreenPool(2) + pool = eventlet.GreenPool(2) worker = pool.spawn(some_work) worker.wait() eventlet.sleep(0) @@ -111,7 +110,7 @@ class GreenPool(tests.LimitedTestCase): self.assertEqual(timer_fired, []) def test_reentrant(self): - pool = greenpool.GreenPool(1) + pool = eventlet.GreenPool(1) def reenter(): waiter = pool.spawn(lambda a: a, 'reenter') @@ -120,7 +119,7 @@ class GreenPool(tests.LimitedTestCase): outer_waiter = pool.spawn(reenter) outer_waiter.wait() - evt = event.Event() + evt = eventlet.Event() def reenter_async(): pool.spawn_n(lambda a: a, 'reenter') @@ -137,7 +136,7 @@ class GreenPool(tests.LimitedTestCase): timer = eventlet.Timeout(1) try: - evt = event.Event() + evt = eventlet.Event() for x in six.moves.range(num_free): pool.spawn(wait_long_time, evt) # if the pool has fewer free than we expect, @@ -159,8 +158,8 @@ class GreenPool(tests.LimitedTestCase): eventlet.sleep(0) def test_resize(self): - pool = greenpool.GreenPool(2) - evt = event.Event() + pool = eventlet.GreenPool(2) + evt = eventlet.Event() def wait_long_time(e): e.wait() @@ -194,7 +193,7 @@ class GreenPool(tests.LimitedTestCase): # The premise is that a coroutine in a Pool tries to get a token out # of a token pool but times out before getting the token. We verify # that neither pool is adversely affected by this situation. - pool = greenpool.GreenPool(1) + pool = eventlet.GreenPool(1) tp = pools.TokenPool(max_size=1) tp.get() # empty out the pool @@ -230,7 +229,7 @@ class GreenPool(tests.LimitedTestCase): gt.wait() def test_spawn_n_2(self): - p = greenpool.GreenPool(2) + p = eventlet.GreenPool(2) self.assertEqual(p.free(), 2) r = [] @@ -259,7 +258,7 @@ class GreenPool(tests.LimitedTestCase): self.assertEqual(set(r), set([1, 2, 3, 4])) def test_exceptions(self): - p = greenpool.GreenPool(2) + p = eventlet.GreenPool(2) for m in (p.spawn, p.spawn_n): self.assert_pool_has_free(p, 2) m(raiser, RuntimeError()) @@ -272,22 +271,22 @@ class GreenPool(tests.LimitedTestCase): self.assert_pool_has_free(p, 2) def test_imap(self): - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) result_list = list(p.imap(passthru, range(10))) self.assertEqual(result_list, list(range(10))) def test_empty_imap(self): - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) result_iter = p.imap(passthru, []) self.assertRaises(StopIteration, result_iter.next) def test_imap_nonefunc(self): - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) result_list = list(p.imap(None, range(10))) self.assertEqual(result_list, [(x,) for x in range(10)]) def test_imap_multi_args(self): - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) result_list = list(p.imap(passthru2, range(10), range(10, 20))) self.assertEqual(result_list, list(zip(range(10), range(10, 20)))) @@ -295,7 +294,7 @@ class GreenPool(tests.LimitedTestCase): # testing the case where the function raises an exception; # both that the caller sees that exception, and that the iterator # continues to be usable to get the rest of the items - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) def raiser(item): if item == 1 or item == 7: @@ -315,30 +314,30 @@ class GreenPool(tests.LimitedTestCase): self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9]) def test_starmap(self): - p = greenpool.GreenPool(4) + p = eventlet.GreenPool(4) result_list = list(p.starmap(passthru, [(x,) for x in range(10)])) self.assertEqual(result_list, list(range(10))) def test_waitall_on_nothing(self): - p = greenpool.GreenPool() + p = eventlet.GreenPool() p.waitall() def test_recursive_waitall(self): - p = greenpool.GreenPool() + p = eventlet.GreenPool() gt = p.spawn(p.waitall) self.assertRaises(AssertionError, gt.wait) class GreenPile(tests.LimitedTestCase): def test_pile(self): - p = greenpool.GreenPile(4) + p = eventlet.GreenPile(4) for i in range(10): p.spawn(passthru, i) result_list = list(p) self.assertEqual(result_list, list(range(10))) def test_pile_spawn_times_out(self): - p = greenpool.GreenPile(4) + p = eventlet.GreenPile(4) for i in range(4): p.spawn(passthru, i) # now it should be full and this should time out @@ -351,9 +350,9 @@ class GreenPile(tests.LimitedTestCase): self.assertEqual(list(p), list(range(10))) def test_constructing_from_pool(self): - pool = greenpool.GreenPool(2) - pile1 = greenpool.GreenPile(pool) - pile2 = greenpool.GreenPile(pool) + pool = eventlet.GreenPool(2) + pile1 = eventlet.GreenPile(pool) + pile2 = eventlet.GreenPile(pool) def bunch_of_work(pile, unique): for i in range(10): @@ -366,6 +365,17 @@ class GreenPile(tests.LimitedTestCase): self.assertEqual(list(pile1), list(range(10))) +def test_greenpool_type_check(): + eventlet.GreenPool(0) + eventlet.GreenPool(1) + eventlet.GreenPool(1e3) + + with tests.assert_raises(TypeError): + eventlet.GreenPool('foo') + with tests.assert_raises(ValueError): + eventlet.GreenPool(-1) + + class StressException(Exception): pass @@ -391,10 +401,9 @@ class Stress(tests.LimitedTestCase): # tests will take extra-long TEST_TIMEOUT = 60 - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def spawn_order_check(self, concurrency): # checks that piles are strictly ordered - p = greenpool.GreenPile(concurrency) + p = eventlet.GreenPile(concurrency) def makework(count, unique): for i in six.moves.range(count): @@ -425,18 +434,16 @@ class Stress(tests.LimitedTestCase): for l in latest[1:]: self.assertEqual(l, iters - 1) - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_ordering_5(self): self.spawn_order_check(5) - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_ordering_50(self): self.spawn_order_check(50) def imap_memory_check(self, concurrency): # checks that imap is strictly # ordered and consumes a constant amount of memory - p = greenpool.GreenPool(concurrency) + p = eventlet.GreenPool(concurrency) count = 1000 it = p.imap(passthru, six.moves.range(count)) latest = -1 @@ -460,15 +467,12 @@ class Stress(tests.LimitedTestCase): # make sure we got to the end self.assertEqual(latest, count - 1) - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_imap_50(self): self.imap_memory_check(50) - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_imap_500(self): self.imap_memory_check(500) - @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_with_intpool(self): class IntPool(pools.Pool): def create(self): @@ -483,7 +487,7 @@ class Stress(tests.LimitedTestCase): return token int_pool = IntPool(max_size=intpool_size) - pool = greenpool.GreenPool(pool_size) + pool = eventlet.GreenPool(pool_size) for ix in six.moves.range(num_executes): pool.spawn(run, int_pool) pool.waitall() diff --git a/tests/semaphore_test.py b/tests/semaphore_test.py index daf83f6..d6c11d1 100644 --- a/tests/semaphore_test.py +++ b/tests/semaphore_test.py @@ -1,14 +1,13 @@ import time import eventlet -from eventlet import semaphore -from tests import LimitedTestCase +import tests -class TestSemaphore(LimitedTestCase): +class TestSemaphore(tests.LimitedTestCase): def test_bounded(self): - sem = semaphore.CappedSemaphore(2, limit=3) + sem = eventlet.CappedSemaphore(2, limit=3) self.assertEqual(sem.acquire(), True) self.assertEqual(sem.acquire(), True) gt1 = eventlet.spawn(sem.release) @@ -24,28 +23,28 @@ class TestSemaphore(LimitedTestCase): gt2.wait() def test_bounded_with_zero_limit(self): - sem = semaphore.CappedSemaphore(0, 0) + sem = eventlet.CappedSemaphore(0, 0) gt = eventlet.spawn(sem.acquire) sem.release() gt.wait() def test_non_blocking(self): - sem = semaphore.Semaphore(0) + sem = eventlet.Semaphore(0) self.assertEqual(sem.acquire(blocking=False), False) def test_timeout(self): - sem = semaphore.Semaphore(0) + sem = eventlet.Semaphore(0) start = time.time() self.assertEqual(sem.acquire(timeout=0.1), False) self.assertTrue(time.time() - start >= 0.1) def test_timeout_non_blocking(self): - sem = semaphore.Semaphore() + sem = eventlet.Semaphore() self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1) def test_semaphore_contention(): - g_mutex = semaphore.Semaphore() + g_mutex = eventlet.Semaphore() counts = [0, 0] def worker(no): @@ -61,3 +60,14 @@ def test_semaphore_contention(): t2.kill() assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts + + +def test_semaphore_type_check(): + eventlet.Semaphore(0) + eventlet.Semaphore(1) + eventlet.Semaphore(1e2) + + with tests.assert_raises(TypeError): + eventlet.Semaphore('foo') + with tests.assert_raises(ValueError): + eventlet.Semaphore(-1) |
