summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShaun Stanworth <shaun.stanworth@onefinestay.com>2014-11-09 16:35:25 +0000
committerSergey Shepelev <temotor@gmail.com>2015-02-21 14:46:45 +0300
commitbab11168093d63db8cd11b47321264b3ae91411c (patch)
tree6842737a63b136fa551221787c107983e7144f20
parenta6ce444265d36fb23361809d40da73caf4864487 (diff)
downloadeventlet-tm.tar.gz
semaphore: Don't hog a semaphore if someone else is waiting for ittm
https://github.com/eventlet/eventlet/issues/136 https://github.com/eventlet/eventlet/pull/163 (comment by Sergey Shepelev) _waiters is now deque, watch out for O(N) acquire()
-rw-r--r--eventlet/semaphore.py28
-rw-r--r--tests/semaphore_test.py21
2 files changed, 42 insertions, 7 deletions
diff --git a/eventlet/semaphore.py b/eventlet/semaphore.py
index 73dbbc1..962b304 100644
--- a/eventlet/semaphore.py
+++ b/eventlet/semaphore.py
@@ -1,4 +1,7 @@
from __future__ import with_statement
+
+import collections
+
from eventlet import greenthread
from eventlet import hubs
from eventlet.timeout import Timeout
@@ -35,7 +38,7 @@ class Semaphore(object):
if value < 0:
raise ValueError("Semaphore must be initialized with a positive "
"number, got %s" % value)
- self._waiters = set()
+ self._waiters = collections.deque()
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)),
@@ -80,8 +83,12 @@ class Semaphore(object):
raise ValueError("can't specify timeout for non-blocking acquire")
if not blocking and self.locked():
return False
- if self.counter <= 0:
- self._waiters.add(greenthread.getcurrent())
+
+ current_thread = greenthread.getcurrent()
+
+ if self.counter <= 0 or self._waiters:
+ if current_thread not in self._waiters:
+ self._waiters.append(current_thread)
try:
if timeout is not None:
ok = False
@@ -92,10 +99,19 @@ class Semaphore(object):
if not ok:
return False
else:
- while self.counter <= 0:
+ # If someone else is already in this wait loop, give them
+ # a chance to get out.
+ while True:
hubs.get_hub().switch()
+ if self.counter > 0:
+ break
finally:
- self._waiters.discard(greenthread.getcurrent())
+ try:
+ self._waiters.remove(current_thread)
+ except ValueError:
+ # Fine if its already been dropped.
+ pass
+
self.counter -= 1
return True
@@ -117,7 +133,7 @@ class Semaphore(object):
def _do_acquire(self):
if self._waiters and self.counter > 0:
- waiter = self._waiters.pop()
+ waiter = self._waiters.popleft()
waiter.switch()
def __exit__(self, typ, val, tb):
diff --git a/tests/semaphore_test.py b/tests/semaphore_test.py
index 1316330..ced9136 100644
--- a/tests/semaphore_test.py
+++ b/tests/semaphore_test.py
@@ -45,5 +45,24 @@ class TestSemaphore(LimitedTestCase):
self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1)
+def test_semaphore_contention():
+ g_mutex = semaphore.Semaphore()
+ counts = [0, 0]
+
+ def worker(no):
+ while min(counts) < 200:
+ with g_mutex:
+ counts[no - 1] += 1
+ eventlet.sleep(0.001)
+
+ t1 = eventlet.spawn(worker, no=1)
+ t2 = eventlet.spawn(worker, no=2)
+ eventlet.sleep(0.5)
+ t1.kill()
+ t2.kill()
+
+ assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
+
+
if __name__ == '__main__':
- unittest.main()
+ unittest.main() \ No newline at end of file