summaryrefslogtreecommitdiff
path: root/Lib/test/test_asyncio/test_locks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_locks.py')
-rw-r--r--Lib/test/test_asyncio/test_locks.py70
1 files changed, 57 insertions, 13 deletions
diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py
index 441adeea8f..b2492c1acf 100644
--- a/Lib/test/test_asyncio/test_locks.py
+++ b/Lib/test/test_asyncio/test_locks.py
@@ -720,24 +720,68 @@ class ConditionTests(test_utils.TestCase):
self.loop.run_until_complete(f())
def test_explicit_lock(self):
- lock = asyncio.Lock()
- cond = asyncio.Condition(lock)
+ async def f(lock=None, cond=None):
+ if lock is None:
+ lock = asyncio.Lock()
+ if cond is None:
+ cond = asyncio.Condition(lock)
+ self.assertIs(cond._lock, lock)
+ self.assertFalse(lock.locked())
+ self.assertFalse(cond.locked())
+ async with cond:
+ self.assertTrue(lock.locked())
+ self.assertTrue(cond.locked())
+ self.assertFalse(lock.locked())
+ self.assertFalse(cond.locked())
+ async with lock:
+ self.assertTrue(lock.locked())
+ self.assertTrue(cond.locked())
+ self.assertFalse(lock.locked())
+ self.assertFalse(cond.locked())
- self.assertIs(cond._lock, lock)
- self.assertIs(cond._loop, lock._loop)
+ # All should work in the same way.
+ self.loop.run_until_complete(f())
+ self.loop.run_until_complete(f(asyncio.Lock()))
+ lock = asyncio.Lock()
+ self.loop.run_until_complete(f(lock, asyncio.Condition(lock)))
def test_ambiguous_loops(self):
- loop = self.new_test_loop()
+ loop = asyncio.new_event_loop()
self.addCleanup(loop.close)
- lock = asyncio.Lock()
- lock._loop = loop
-
- async def _create_condition():
- with self.assertRaises(ValueError):
- asyncio.Condition(lock)
-
- self.loop.run_until_complete(_create_condition())
+ async def wrong_loop_in_lock():
+ with self.assertRaises(TypeError):
+ asyncio.Lock(loop=loop) # actively disallowed since 3.10
+ lock = asyncio.Lock()
+ lock._loop = loop # use private API for testing
+ async with lock:
+ # acquired immediately via the fast-path
+ # without interaction with any event loop.
+ cond = asyncio.Condition(lock)
+ # cond.acquire() will trigger waiting on the lock
+ # and it will discover the event loop mismatch.
+ with self.assertRaisesRegex(
+ RuntimeError,
+ "is bound to a different event loop",
+ ):
+ await cond.acquire()
+
+ async def wrong_loop_in_cond():
+ # Same analogy here with the condition's loop.
+ lock = asyncio.Lock()
+ async with lock:
+ with self.assertRaises(TypeError):
+ asyncio.Condition(lock, loop=loop)
+ cond = asyncio.Condition(lock)
+ cond._loop = loop
+ with self.assertRaisesRegex(
+ RuntimeError,
+ "is bound to a different event loop",
+ ):
+ await cond.wait()
+
+ self.loop.run_until_complete(wrong_loop_in_lock())
+ self.loop.run_until_complete(wrong_loop_in_cond())
def test_timeout_in_block(self):
loop = asyncio.new_event_loop()