summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Shepelev <temotor@gmail.com>2016-12-23 01:36:36 +0300
committerSergey Shepelev <temotor@gmail.com>2016-12-23 01:36:36 +0300
commitc3e46981863502502fc7a11faa266f925250be4c (patch)
tree406da21c545676296d025a8a53123edb25b164c2
parente1bb2fee1db118a617b674821d1f73add9e4f77f (diff)
downloadeventlet-issue-364.tar.gz
Type check Semaphore, GreenPool arguments; Thanks to Matthew D. Pagelissue-364
- export Event, *Semaphore in `eventlet.` top level namespace https://github.com/eventlet/eventlet/issues/364
-rw-r--r--eventlet/__init__.py38
-rw-r--r--eventlet/greenpool.py36
-rw-r--r--eventlet/semaphore.py9
-rw-r--r--tests/greenpool_test.py82
-rw-r--r--tests/semaphore_test.py28
5 files changed, 111 insertions, 82 deletions
diff --git a/eventlet/__init__.py b/eventlet/__init__.py
index efbdbe6..fbf3ef4 100644
--- a/eventlet/__init__.py
+++ b/eventlet/__init__.py
@@ -7,36 +7,44 @@ __version__ = '.'.join(map(str, version_info))
# errors of greenlet so that the packager can still at least
# access the version. Also this makes easy_install a little quieter
if os.environ.get('EVENTLET_IMPORT_VERSION_ONLY') != '1':
- from eventlet import greenthread
+ from eventlet import convenience
+ from eventlet import event
from eventlet import greenpool
+ from eventlet import greenthread
+ from eventlet import patcher
from eventlet import queue
+ from eventlet import semaphore
from eventlet import timeout
- from eventlet import patcher
- from eventlet import convenience
import greenlet
+ connect = convenience.connect
+ listen = convenience.listen
+ serve = convenience.serve
+ StopServe = convenience.StopServe
+ wrap_ssl = convenience.wrap_ssl
+
+ Event = event.Event
+
+ GreenPool = greenpool.GreenPool
+ GreenPile = greenpool.GreenPile
+
sleep = greenthread.sleep
spawn = greenthread.spawn
spawn_n = greenthread.spawn_n
spawn_after = greenthread.spawn_after
kill = greenthread.kill
- Timeout = timeout.Timeout
- with_timeout = timeout.with_timeout
-
- GreenPool = greenpool.GreenPool
- GreenPile = greenpool.GreenPile
+ import_patched = patcher.import_patched
+ monkey_patch = patcher.monkey_patch
Queue = queue.Queue
- import_patched = patcher.import_patched
- monkey_patch = patcher.monkey_patch
+ Semaphore = semaphore.Semaphore
+ CappedSemaphore = semaphore.CappedSemaphore
+ BoundedSemaphore = semaphore.BoundedSemaphore
- connect = convenience.connect
- listen = convenience.listen
- serve = convenience.serve
- StopServe = convenience.StopServe
- wrap_ssl = convenience.wrap_ssl
+ Timeout = timeout.Timeout
+ with_timeout = timeout.with_timeout
getcurrent = greenlet.greenlet.getcurrent
diff --git a/eventlet/greenpool.py b/eventlet/greenpool.py
index a2503fc..1a9c0ad 100644
--- a/eventlet/greenpool.py
+++ b/eventlet/greenpool.py
@@ -1,9 +1,7 @@
import traceback
-from eventlet import event
-from eventlet import greenthread
+import eventlet
from eventlet import queue
-from eventlet import semaphore
from eventlet.support import greenlets as greenlet
from eventlet.support import six
@@ -17,10 +15,16 @@ class GreenPool(object):
"""
def __init__(self, size=1000):
+ try:
+ size = int(size)
+ except ValueError as e:
+ raise TypeError('GreenPool() expect size :: int, actual: {0} {1}'.format(type(size), str(e)))
+ if size < 0:
+ raise ValueError('GreenPool() expect size >= 0, actual: {0}'.format(repr(size)))
self.size = size
self.coroutines_running = set()
- self.sem = semaphore.Semaphore(size)
- self.no_coros_running = event.Event()
+ self.sem = eventlet.Semaphore(size)
+ self.no_coros_running = eventlet.Event()
def resize(self, new_size):
""" Change the max number of greenthreads doing work at any given time.
@@ -49,7 +53,7 @@ class GreenPool(object):
def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
- Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
+ Returns the :class:`GreenThread <eventlet.GreenThread>`
object that is running the function, which can be used to retrieve the
results.
@@ -61,17 +65,17 @@ class GreenPool(object):
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
- current = greenthread.getcurrent()
+ current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
# a bit hacky to use the GT without switching to it
- gt = greenthread.GreenThread(current)
+ gt = eventlet.GreenThread(current)
gt.main(function, args, kwargs)
return gt
else:
self.sem.acquire()
- gt = greenthread.spawn(function, *args, **kwargs)
+ gt = eventlet.spawn(function, *args, **kwargs)
if not self.coroutines_running:
- self.no_coros_running = event.Event()
+ self.no_coros_running = eventlet.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done)
return gt
@@ -89,7 +93,7 @@ class GreenPool(object):
if coro is None:
return
else:
- coro = greenthread.getcurrent()
+ coro = eventlet.getcurrent()
self._spawn_done(coro)
def spawn_n(self, function, *args, **kwargs):
@@ -99,21 +103,21 @@ class GreenPool(object):
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
- current = greenthread.getcurrent()
+ current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
self._spawn_n_impl(function, args, kwargs, None)
else:
self.sem.acquire()
- g = greenthread.spawn_n(
+ g = eventlet.spawn_n(
self._spawn_n_impl,
function, args, kwargs, True)
if not self.coroutines_running:
- self.no_coros_running = event.Event()
+ self.no_coros_running = eventlet.Event()
self.coroutines_running.add(g)
def waitall(self):
"""Waits until all greenthreads in the pool are finished working."""
- assert greenthread.getcurrent() not in self.coroutines_running, \
+ assert eventlet.getcurrent() not in self.coroutines_running, \
"Calling waitall() from within one of the " \
"GreenPool's greenthreads will never terminate."
if self.running():
@@ -151,7 +155,7 @@ class GreenPool(object):
if function is None:
function = lambda *a: a
gi = GreenMap(self.size)
- greenthread.spawn_n(self._do_map, function, iterable, gi)
+ eventlet.spawn_n(self._do_map, function, iterable, gi)
return gi
def imap(self, function, *iterables):
diff --git a/eventlet/semaphore.py b/eventlet/semaphore.py
index b2ef9d3..8cac803 100644
--- a/eventlet/semaphore.py
+++ b/eventlet/semaphore.py
@@ -34,10 +34,13 @@ class Semaphore(object):
"""
def __init__(self, value=1):
- self.counter = value
+ try:
+ value = int(value)
+ except ValueError as e:
+ raise TypeError('Semaphore() expect value :: int, actual: {0} {1}'.format(type(value), str(e)))
if value < 0:
- raise ValueError("Semaphore must be initialized with a positive "
- "number, got %s" % value)
+ raise ValueError('Semaphore() expect value >= 0, actual: {0}'.format(repr(value)))
+ self.counter = value
self._waiters = collections.deque()
def __repr__(self):
diff --git a/tests/greenpool_test.py b/tests/greenpool_test.py
index ff56262..0da0331 100644
--- a/tests/greenpool_test.py
+++ b/tests/greenpool_test.py
@@ -1,9 +1,8 @@
import gc
-import os
import random
import eventlet
-from eventlet import hubs, greenpool, event, pools
+from eventlet import hubs, pools
from eventlet.support import greenlets as greenlet, six
import tests
@@ -24,7 +23,7 @@ def raiser(exc):
class GreenPool(tests.LimitedTestCase):
def test_spawn(self):
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
waiters = []
for i in range(10):
waiters.append(p.spawn(passthru, i))
@@ -32,7 +31,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(results, list(range(10)))
def test_spawn_n(self):
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
results_closure = []
def do_something(a):
@@ -45,8 +44,8 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(results_closure, list(range(10)))
def test_waiting(self):
- pool = greenpool.GreenPool(1)
- done = event.Event()
+ pool = eventlet.GreenPool(1)
+ done = eventlet.Event()
def consume():
done.wait()
@@ -74,7 +73,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(pool.running(), 0)
def test_multiple_coros(self):
- evt = event.Event()
+ evt = eventlet.Event()
results = []
def producer():
@@ -86,7 +85,7 @@ class GreenPool(tests.LimitedTestCase):
evt.wait()
results.append('cons2')
- pool = greenpool.GreenPool(2)
+ pool = eventlet.GreenPool(2)
done = pool.spawn(consumer)
pool.spawn_n(producer)
done.wait()
@@ -103,7 +102,7 @@ class GreenPool(tests.LimitedTestCase):
def some_work():
hubs.get_hub().schedule_call_local(0, fire_timer)
- pool = greenpool.GreenPool(2)
+ pool = eventlet.GreenPool(2)
worker = pool.spawn(some_work)
worker.wait()
eventlet.sleep(0)
@@ -111,7 +110,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(timer_fired, [])
def test_reentrant(self):
- pool = greenpool.GreenPool(1)
+ pool = eventlet.GreenPool(1)
def reenter():
waiter = pool.spawn(lambda a: a, 'reenter')
@@ -120,7 +119,7 @@ class GreenPool(tests.LimitedTestCase):
outer_waiter = pool.spawn(reenter)
outer_waiter.wait()
- evt = event.Event()
+ evt = eventlet.Event()
def reenter_async():
pool.spawn_n(lambda a: a, 'reenter')
@@ -137,7 +136,7 @@ class GreenPool(tests.LimitedTestCase):
timer = eventlet.Timeout(1)
try:
- evt = event.Event()
+ evt = eventlet.Event()
for x in six.moves.range(num_free):
pool.spawn(wait_long_time, evt)
# if the pool has fewer free than we expect,
@@ -159,8 +158,8 @@ class GreenPool(tests.LimitedTestCase):
eventlet.sleep(0)
def test_resize(self):
- pool = greenpool.GreenPool(2)
- evt = event.Event()
+ pool = eventlet.GreenPool(2)
+ evt = eventlet.Event()
def wait_long_time(e):
e.wait()
@@ -194,7 +193,7 @@ class GreenPool(tests.LimitedTestCase):
# The premise is that a coroutine in a Pool tries to get a token out
# of a token pool but times out before getting the token. We verify
# that neither pool is adversely affected by this situation.
- pool = greenpool.GreenPool(1)
+ pool = eventlet.GreenPool(1)
tp = pools.TokenPool(max_size=1)
tp.get() # empty out the pool
@@ -230,7 +229,7 @@ class GreenPool(tests.LimitedTestCase):
gt.wait()
def test_spawn_n_2(self):
- p = greenpool.GreenPool(2)
+ p = eventlet.GreenPool(2)
self.assertEqual(p.free(), 2)
r = []
@@ -259,7 +258,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(set(r), set([1, 2, 3, 4]))
def test_exceptions(self):
- p = greenpool.GreenPool(2)
+ p = eventlet.GreenPool(2)
for m in (p.spawn, p.spawn_n):
self.assert_pool_has_free(p, 2)
m(raiser, RuntimeError())
@@ -272,22 +271,22 @@ class GreenPool(tests.LimitedTestCase):
self.assert_pool_has_free(p, 2)
def test_imap(self):
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
result_list = list(p.imap(passthru, range(10)))
self.assertEqual(result_list, list(range(10)))
def test_empty_imap(self):
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
result_iter = p.imap(passthru, [])
self.assertRaises(StopIteration, result_iter.next)
def test_imap_nonefunc(self):
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
result_list = list(p.imap(None, range(10)))
self.assertEqual(result_list, [(x,) for x in range(10)])
def test_imap_multi_args(self):
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
result_list = list(p.imap(passthru2, range(10), range(10, 20)))
self.assertEqual(result_list, list(zip(range(10), range(10, 20))))
@@ -295,7 +294,7 @@ class GreenPool(tests.LimitedTestCase):
# testing the case where the function raises an exception;
# both that the caller sees that exception, and that the iterator
# continues to be usable to get the rest of the items
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
def raiser(item):
if item == 1 or item == 7:
@@ -315,30 +314,30 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9])
def test_starmap(self):
- p = greenpool.GreenPool(4)
+ p = eventlet.GreenPool(4)
result_list = list(p.starmap(passthru, [(x,) for x in range(10)]))
self.assertEqual(result_list, list(range(10)))
def test_waitall_on_nothing(self):
- p = greenpool.GreenPool()
+ p = eventlet.GreenPool()
p.waitall()
def test_recursive_waitall(self):
- p = greenpool.GreenPool()
+ p = eventlet.GreenPool()
gt = p.spawn(p.waitall)
self.assertRaises(AssertionError, gt.wait)
class GreenPile(tests.LimitedTestCase):
def test_pile(self):
- p = greenpool.GreenPile(4)
+ p = eventlet.GreenPile(4)
for i in range(10):
p.spawn(passthru, i)
result_list = list(p)
self.assertEqual(result_list, list(range(10)))
def test_pile_spawn_times_out(self):
- p = greenpool.GreenPile(4)
+ p = eventlet.GreenPile(4)
for i in range(4):
p.spawn(passthru, i)
# now it should be full and this should time out
@@ -351,9 +350,9 @@ class GreenPile(tests.LimitedTestCase):
self.assertEqual(list(p), list(range(10)))
def test_constructing_from_pool(self):
- pool = greenpool.GreenPool(2)
- pile1 = greenpool.GreenPile(pool)
- pile2 = greenpool.GreenPile(pool)
+ pool = eventlet.GreenPool(2)
+ pile1 = eventlet.GreenPile(pool)
+ pile2 = eventlet.GreenPile(pool)
def bunch_of_work(pile, unique):
for i in range(10):
@@ -366,6 +365,17 @@ class GreenPile(tests.LimitedTestCase):
self.assertEqual(list(pile1), list(range(10)))
+def test_greenpool_type_check():
+ eventlet.GreenPool(0)
+ eventlet.GreenPool(1)
+ eventlet.GreenPool(1e3)
+
+ with tests.assert_raises(TypeError):
+ eventlet.GreenPool('foo')
+ with tests.assert_raises(ValueError):
+ eventlet.GreenPool(-1)
+
+
class StressException(Exception):
pass
@@ -391,10 +401,9 @@ class Stress(tests.LimitedTestCase):
# tests will take extra-long
TEST_TIMEOUT = 60
- @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def spawn_order_check(self, concurrency):
# checks that piles are strictly ordered
- p = greenpool.GreenPile(concurrency)
+ p = eventlet.GreenPile(concurrency)
def makework(count, unique):
for i in six.moves.range(count):
@@ -425,18 +434,16 @@ class Stress(tests.LimitedTestCase):
for l in latest[1:]:
self.assertEqual(l, iters - 1)
- @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_5(self):
self.spawn_order_check(5)
- @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_50(self):
self.spawn_order_check(50)
def imap_memory_check(self, concurrency):
# checks that imap is strictly
# ordered and consumes a constant amount of memory
- p = greenpool.GreenPool(concurrency)
+ p = eventlet.GreenPool(concurrency)
count = 1000
it = p.imap(passthru, six.moves.range(count))
latest = -1
@@ -460,15 +467,12 @@ class Stress(tests.LimitedTestCase):
# make sure we got to the end
self.assertEqual(latest, count - 1)
- @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_50(self):
self.imap_memory_check(50)
- @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_500(self):
self.imap_memory_check(500)
- @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_with_intpool(self):
class IntPool(pools.Pool):
def create(self):
@@ -483,7 +487,7 @@ class Stress(tests.LimitedTestCase):
return token
int_pool = IntPool(max_size=intpool_size)
- pool = greenpool.GreenPool(pool_size)
+ pool = eventlet.GreenPool(pool_size)
for ix in six.moves.range(num_executes):
pool.spawn(run, int_pool)
pool.waitall()
diff --git a/tests/semaphore_test.py b/tests/semaphore_test.py
index daf83f6..d6c11d1 100644
--- a/tests/semaphore_test.py
+++ b/tests/semaphore_test.py
@@ -1,14 +1,13 @@
import time
import eventlet
-from eventlet import semaphore
-from tests import LimitedTestCase
+import tests
-class TestSemaphore(LimitedTestCase):
+class TestSemaphore(tests.LimitedTestCase):
def test_bounded(self):
- sem = semaphore.CappedSemaphore(2, limit=3)
+ sem = eventlet.CappedSemaphore(2, limit=3)
self.assertEqual(sem.acquire(), True)
self.assertEqual(sem.acquire(), True)
gt1 = eventlet.spawn(sem.release)
@@ -24,28 +23,28 @@ class TestSemaphore(LimitedTestCase):
gt2.wait()
def test_bounded_with_zero_limit(self):
- sem = semaphore.CappedSemaphore(0, 0)
+ sem = eventlet.CappedSemaphore(0, 0)
gt = eventlet.spawn(sem.acquire)
sem.release()
gt.wait()
def test_non_blocking(self):
- sem = semaphore.Semaphore(0)
+ sem = eventlet.Semaphore(0)
self.assertEqual(sem.acquire(blocking=False), False)
def test_timeout(self):
- sem = semaphore.Semaphore(0)
+ sem = eventlet.Semaphore(0)
start = time.time()
self.assertEqual(sem.acquire(timeout=0.1), False)
self.assertTrue(time.time() - start >= 0.1)
def test_timeout_non_blocking(self):
- sem = semaphore.Semaphore()
+ sem = eventlet.Semaphore()
self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1)
def test_semaphore_contention():
- g_mutex = semaphore.Semaphore()
+ g_mutex = eventlet.Semaphore()
counts = [0, 0]
def worker(no):
@@ -61,3 +60,14 @@ def test_semaphore_contention():
t2.kill()
assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
+
+
+def test_semaphore_type_check():
+ eventlet.Semaphore(0)
+ eventlet.Semaphore(1)
+ eventlet.Semaphore(1e2)
+
+ with tests.assert_raises(TypeError):
+ eventlet.Semaphore('foo')
+ with tests.assert_raises(ValueError):
+ eventlet.Semaphore(-1)