summaryrefslogtreecommitdiff
path: root/tests/hub_test.py
blob: 3f403b1ffbd5f302b801e0b5851bb2ad65c9b9bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
import errno
import fcntl
import os
import sys
import time

import tests
from tests import skip_if_no_itimer, skip_unless
import eventlet
from eventlet import debug, hubs
from eventlet.support import greenlets
import six


DELAY = 0.001


def noop():
    pass


class TestTimerCleanup(tests.LimitedTestCase):
    TEST_TIMEOUT = 2

    def test_cancel_immediate(self):
        hub = hubs.get_hub()
        stimers = hub.get_timers_count()
        scanceled = hub.timers_canceled
        for i in six.moves.range(2000):
            t = hubs.get_hub().schedule_call_global(60, noop)
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
        # there should be fewer than 1000 new timers and canceled
        self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
        self.assert_less_than_equal(hub.timers_canceled, 1000)

    def test_cancel_accumulated(self):
        hub = hubs.get_hub()
        stimers = hub.get_timers_count()
        scanceled = hub.timers_canceled
        for i in six.moves.range(2000):
            t = hubs.get_hub().schedule_call_global(60, noop)
            eventlet.sleep()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1, hub.timers)
        # there should be fewer than 1000 new timers and canceled
        self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
        self.assert_less_than_equal(hub.timers_canceled, 1000)

    def test_cancel_proportion(self):
        # if fewer than half the pending timers are canceled, it should
        # not clean them out
        hub = hubs.get_hub()
        uncanceled_timers = []
        stimers = hub.get_timers_count()
        scanceled = hub.timers_canceled
        for i in six.moves.range(1000):
            # 2/3rds of new timers are uncanceled
            t = hubs.get_hub().schedule_call_global(60, noop)
            t2 = hubs.get_hub().schedule_call_global(60, noop)
            t3 = hubs.get_hub().schedule_call_global(60, noop)
            eventlet.sleep()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            uncanceled_timers.append(t2)
            uncanceled_timers.append(t3)
        # 3000 new timers, plus a few extras
        self.assert_less_than_equal(stimers + 3000,
                                    stimers + hub.get_timers_count())
        self.assertEqual(hub.timers_canceled, 1000)
        for t in uncanceled_timers:
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count())
        eventlet.sleep()


class TestMultipleListenersCleanup(tests.LimitedTestCase):
    def setUp(self):
        super(TestMultipleListenersCleanup, self).setUp()
        debug.hub_prevent_multiple_readers(False)
        debug.hub_exceptions(False)

    def tearDown(self):
        super(TestMultipleListenersCleanup, self).tearDown()
        debug.hub_prevent_multiple_readers(True)
        debug.hub_exceptions(True)

    def test_cleanup(self):
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)

        fcntl.fcntl(r, fcntl.F_SETFL,
                    fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)

        def readfd(fd):
            while True:
                try:
                    return os.read(fd, 1)
                except OSError as e:
                    if e.errno != errno.EAGAIN:
                        raise
                    hubs.trampoline(fd, read=True)

        first_listener = eventlet.spawn(readfd, r)
        eventlet.sleep()

        second_listener = eventlet.spawn(readfd, r)
        eventlet.sleep()

        hubs.get_hub().schedule_call_global(0, second_listener.throw,
                                            eventlet.Timeout(None))
        eventlet.sleep()

        os.write(w, b'.')
        self.assertEqual(first_listener.wait(), b'.')


class TestScheduleCall(tests.LimitedTestCase):

    def test_local(self):
        lst = [1]
        eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
        eventlet.sleep(0)
        eventlet.sleep(DELAY * 2)
        assert lst == [1], lst

    def test_global(self):
        lst = [1]
        eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
        eventlet.sleep(0)
        eventlet.sleep(DELAY * 2)
        assert lst == [], lst

    def test_ordering(self):
        lst = []
        hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
        hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
        hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
        while len(lst) < 3:
            eventlet.sleep(DELAY)
        self.assertEqual(lst, [1, 2, 3])


class TestDebug(tests.LimitedTestCase):

    def test_debug_listeners(self):
        hubs.get_hub().set_debug_listeners(True)
        hubs.get_hub().set_debug_listeners(False)

    def test_timer_exceptions(self):
        hubs.get_hub().set_timer_exceptions(True)
        hubs.get_hub().set_timer_exceptions(False)


class TestExceptionInMainloop(tests.LimitedTestCase):

    def test_sleep(self):
        # even if there was an error in the mainloop, the hub should continue
        # to work
        start = time.time()
        eventlet.sleep(DELAY)
        delay = time.time() - start

        assert delay >= DELAY * \
            0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
                delay, DELAY)

        def fail():
            1 // 0

        hubs.get_hub().schedule_call_global(0, fail)

        start = time.time()
        eventlet.sleep(DELAY)
        delay = time.time() - start

        assert delay >= DELAY * \
            0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
                delay, DELAY)


class TestExceptionInGreenthread(tests.LimitedTestCase):

    @tests.skip_unless(greenlets.preserves_excinfo)
    def test_exceptionpreservation(self):
        # events for controlling execution order
        gt1event = eventlet.Event()
        gt2event = eventlet.Event()

        def test_gt1():
            try:
                raise KeyError()
            except KeyError:
                gt1event.send('exception')
                gt2event.wait()
                assert sys.exc_info()[0] is KeyError
                gt1event.send('test passed')

        def test_gt2():
            gt1event.wait()
            gt1event.reset()
            assert sys.exc_info()[0] is None
            try:
                raise ValueError()
            except ValueError:
                gt2event.send('exception')
                gt1event.wait()
                assert sys.exc_info()[0] is ValueError

        g1 = eventlet.spawn(test_gt1)
        g2 = eventlet.spawn(test_gt2)
        try:
            g1.wait()
            g2.wait()
        finally:
            g1.kill()
            g2.kill()

    def test_exceptionleaks(self):
        # tests expected behaviour with all versions of greenlet
        def test_gt(sem):
            try:
                raise KeyError()
            except KeyError:
                sem.release()
                hubs.get_hub().switch()

        # semaphores for controlling execution order
        sem = eventlet.Semaphore()
        sem.acquire()
        g = eventlet.spawn(test_gt, sem)
        try:
            sem.acquire()
            assert sys.exc_info()[0] is None
        finally:
            g.kill()


class TestHubBlockingDetector(tests.LimitedTestCase):
    TEST_TIMEOUT = 10

    def test_block_detect(self):
        def look_im_blocking():
            import time
            time.sleep(2)
        from eventlet import debug
        debug.hub_blocking_detection(True)
        gt = eventlet.spawn(look_im_blocking)
        self.assertRaises(RuntimeError, gt.wait)
        debug.hub_blocking_detection(False)

    @tests.skip_if_no_itimer
    def test_block_detect_with_itimer(self):
        def look_im_blocking():
            import time
            time.sleep(0.5)

        from eventlet import debug
        debug.hub_blocking_detection(True, resolution=0.1)
        gt = eventlet.spawn(look_im_blocking)
        self.assertRaises(RuntimeError, gt.wait)
        debug.hub_blocking_detection(False)


class TestSuspend(tests.LimitedTestCase):
    TEST_TIMEOUT = 4
    longMessage = True
    maxDiff = None

    def test_suspend_doesnt_crash(self):
        import os
        import shutil
        import signal
        import subprocess
        import sys
        import tempfile
        self.tempdir = tempfile.mkdtemp('test_suspend')
        filename = os.path.join(self.tempdir, 'test_suspend.py')
        fd = open(filename, "w")
        fd.write("""import eventlet
eventlet.Timeout(0.5)
try:
   eventlet.listen(("127.0.0.1", 0)).accept()
except eventlet.Timeout:
   print("exited correctly")
""")
        fd.close()
        python_path = os.pathsep.join(sys.path + [self.tempdir])
        new_env = os.environ.copy()
        new_env['PYTHONPATH'] = python_path
        p = subprocess.Popen([sys.executable,
                              os.path.join(self.tempdir, filename)],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
        eventlet.sleep(0.4)  # wait for process to hit accept
        os.kill(p.pid, signal.SIGSTOP)  # suspend and resume to generate EINTR
        os.kill(p.pid, signal.SIGCONT)
        output, _ = p.communicate()
        lines = output.decode('utf-8', 'replace').splitlines()
        assert "exited correctly" in lines[-1], output
        shutil.rmtree(self.tempdir)


def test_repeated_select_bad_fd():
    from eventlet.green import select

    def once():
        try:
            select.select([-1], [], [])
            assert False, 'Expected ValueError'
        except ValueError:
            pass

    once()
    once()


def test_fork():
    tests.run_isolated('hub_fork.py')


def test_fork_simple():
    tests.run_isolated('hub_fork_simple.py')


class TestDeadRunLoop(tests.LimitedTestCase):
    TEST_TIMEOUT = 2

    class CustomException(Exception):
        pass

    def test_kill(self):
        """ Checks that killing a process after the hub runloop dies does
        not immediately return to hub greenlet's parent and schedule a
        redundant timer. """
        hub = hubs.get_hub()

        def dummyproc():
            hub.switch()

        g = eventlet.spawn(dummyproc)
        eventlet.sleep(0)  # let dummyproc run
        assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
        self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                          KeyboardInterrupt())

        # kill dummyproc, this schedules a timer to return execution to
        # this greenlet before throwing an exception in dummyproc.
        # it is from this timer that execution should be returned to this
        # greenlet, and not by propagating of the terminating greenlet.
        g.kill()
        with eventlet.Timeout(0.5, self.CustomException()):
            # we now switch to the hub, there should be no existing timers
            # that switch back to this greenlet and so this hub.switch()
            # call should block indefinitely.
            self.assertRaises(self.CustomException, hub.switch)

    def test_parent(self):
        """ Checks that a terminating greenthread whose parent
        was a previous, now-defunct hub greenlet returns execution to
        the hub runloop and not the hub greenlet's parent. """
        hub = hubs.get_hub()

        def dummyproc():
            pass

        g = eventlet.spawn(dummyproc)
        assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
        self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                          KeyboardInterrupt())

        assert not g.dead  # check dummyproc hasn't completed
        with eventlet.Timeout(0.5, self.CustomException()):
            # we now switch to the hub which will allow
            # completion of dummyproc.
            # this should return execution back to the runloop and not
            # this greenlet so that hub.switch() would block indefinitely.
            self.assertRaises(self.CustomException, hub.switch)
        assert g.dead  # sanity check that dummyproc has completed


def test_use_hub_class():
    tests.run_isolated('hub_use_hub_class.py')


def test_kqueue_unsupported():
    # https://github.com/eventlet/eventlet/issues/38
    # get_hub on windows broken by kqueue
    tests.run_isolated('hub_kqueue_unsupported.py')