summaryrefslogtreecommitdiff
path: root/tests/semaphore_test.py
blob: cf6a29dafafaf35f51d386359fd9d4ceb03a26c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import time

import eventlet
import tests


class TestSemaphore(tests.LimitedTestCase):
    """Tests for ``eventlet.Semaphore`` and ``eventlet.CappedSemaphore``.

    Covers the ``balance`` bookkeeping of a capped semaphore, non-blocking
    and timed ``acquire`` calls, and ``_at_fork_reinit()``.
    """

    def test_bounded(self):
        # NOTE(review): presumably the spawned greenthreads only run once
        # this test yields, so the order of the calls below is significant;
        # do not reorder them.
        capped = eventlet.CappedSemaphore(2, limit=3)
        for _ in range(2):
            self.assertEqual(capped.acquire(), True)
        releaser = eventlet.spawn(capped.release)
        self.assertEqual(capped.acquire(), True)
        self.assertEqual(-3, capped.balance)
        for _ in range(3):
            capped.release()
        acquirer = eventlet.spawn(capped.acquire)
        capped.release()
        self.assertEqual(3, capped.balance)
        # Let both helper greenthreads finish before the test returns.
        for green_thread in (releaser, acquirer):
            green_thread.wait()

    def test_bounded_with_zero_limit(self):
        # A capped semaphore built with count 0 and limit 0 must still let
        # one acquire/release pair complete.
        capped = eventlet.CappedSemaphore(0, 0)
        acquirer = eventlet.spawn(capped.acquire)
        capped.release()
        acquirer.wait()

    def test_non_blocking(self):
        # With a zero counter, a non-blocking acquire reports failure
        # instead of waiting.
        empty = eventlet.Semaphore(0)
        acquired = empty.acquire(blocking=False)
        self.assertEqual(acquired, False)

    def test_timeout(self):
        # A timed acquire on an empty semaphore returns False, and not
        # before the requested timeout has elapsed.
        empty = eventlet.Semaphore(0)
        started_at = time.time()
        acquired = empty.acquire(timeout=0.1)
        self.assertEqual(acquired, False)
        elapsed = time.time() - started_at
        self.assertTrue(elapsed >= 0.1)

    def test_timeout_non_blocking(self):
        # Combining blocking=False with a timeout is rejected.
        sem = eventlet.Semaphore()
        with self.assertRaises(ValueError):
            sem.acquire(blocking=False, timeout=1)

    def test_reinit(self):
        # py39+ expects locks to provide an _at_fork_reinit() method; since
        # eventlet.green.thread patches in Semaphores, they need it, too.

        # Default semaphore, acquired before the reinit: exactly one
        # acquire succeeds afterwards.
        sem = eventlet.Semaphore()
        sem.acquire()
        sem._at_fork_reinit()
        for expected in (True, False):
            self.assertEqual(sem.acquire(blocking=False), expected)

        # Semaphore(0), released before the reinit: nothing can be
        # acquired afterwards.
        sem = eventlet.Semaphore(0)
        sem.release()
        sem._at_fork_reinit()
        self.assertEqual(sem.acquire(blocking=False), False)

        # Semaphore(2), acquired once before the reinit: exactly two
        # acquires succeed afterwards.
        sem = eventlet.Semaphore(2)
        sem.acquire()
        sem._at_fork_reinit()
        for expected in (True, True, False):
            self.assertEqual(sem.acquire(blocking=False), expected)


def test_semaphore_contention():
    """Check that two greenthreads contending for one semaphore share it fairly.

    Each worker increments its own tally while holding the semaphore. After
    half a second both workers are killed, and the two tallies must differ
    by less than 10% of the smaller one.
    """
    mutex = eventlet.Semaphore()
    tallies = [0, 0]

    def contend(slot):
        # Keep competing until both workers have reached 200 increments.
        while min(tallies) < 200:
            with mutex:
                tallies[slot] += 1
                # Sleep while still holding the semaphore.
                eventlet.sleep(0.001)

    workers = [eventlet.spawn(contend, slot=index) for index in range(2)]
    eventlet.sleep(0.5)
    for green_thread in workers:
        green_thread.kill()

    first, second = tallies
    tolerance = int(min(tallies) * 0.1)
    assert abs(first - second) < tolerance, tallies


def test_semaphore_type_check():
    """Check which initial values the ``eventlet.Semaphore`` constructor accepts.

    ``0``, ``1`` and ``1e2`` must be accepted; a string must raise
    ``TypeError`` and a negative number must raise ``ValueError``.
    """
    for accepted in (0, 1, 1e2):
        eventlet.Semaphore(accepted)

    rejected = (
        (TypeError, 'foo'),
        (ValueError, -1),
    )
    for expected_error, bad_value in rejected:
        with tests.assert_raises(expected_error):
            eventlet.Semaphore(bad_value)