summaryrefslogtreecommitdiff
path: root/tests/test_executors_py35.py
blob: 7849eb028b949ad2f8e41cfa6a92b5055d6aa8c2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""Contains test functions using Python 3.3+ syntax."""
from asyncio import CancelledError
from datetime import datetime

import pytest
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.executors.tornado import TornadoExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.tornado import TornadoScheduler
from pytz import utc


@pytest.fixture
def asyncio_scheduler(event_loop):
    scheduler = AsyncIOScheduler(event_loop=event_loop)
    scheduler.start(paused=True)
    yield scheduler
    scheduler.shutdown(False)


@pytest.fixture
def asyncio_executor(asyncio_scheduler):
    executor = AsyncIOExecutor()
    executor.start(asyncio_scheduler, 'default')
    yield executor
    executor.shutdown()


@pytest.fixture
def tornado_scheduler(io_loop):
    scheduler = TornadoScheduler(io_loop=io_loop)
    scheduler.start(paused=True)
    yield scheduler
    scheduler.shutdown(False)


@pytest.fixture
def tornado_executor(tornado_scheduler):
    executor = TornadoExecutor()
    executor.start(tornado_scheduler, 'default')
    yield executor
    executor.shutdown()


async def waiter(sleep, exception):
    await sleep(0.1)
    if exception:
        raise Exception('dummy error')
    else:
        return True


@pytest.mark.parametrize('exception', [False, True])
@pytest.mark.asyncio
async def test_run_coroutine_job(asyncio_scheduler, asyncio_executor, exception):
    from asyncio import Future, sleep

    future = Future()
    job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception])
    asyncio_executor._run_job_success = lambda job_id, events: future.set_result(events)
    asyncio_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc)
    asyncio_executor.submit_job(job, [datetime.now(utc)])
    events = await future
    assert len(events) == 1
    if exception:
        assert str(events[0].exception) == 'dummy error'
    else:
        assert events[0].retval is True


@pytest.mark.parametrize('exception', [False, True])
@pytest.mark.gen_test
async def test_run_coroutine_job_tornado(tornado_scheduler, tornado_executor, exception):
    from tornado.concurrent import Future
    from tornado.gen import sleep

    future = Future()
    job = tornado_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception])
    tornado_executor._run_job_success = lambda job_id, events: future.set_result(events)
    tornado_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc)
    tornado_executor.submit_job(job, [datetime.now(utc)])
    events = await future
    assert len(events) == 1
    if exception:
        assert str(events[0].exception) == 'dummy error'
    else:
        assert events[0].retval is True


@pytest.mark.asyncio
async def test_asyncio_executor_shutdown(asyncio_scheduler, asyncio_executor):
    """Test that the AsyncIO executor cancels its pending tasks on shutdown."""
    from asyncio import sleep

    job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, None])
    asyncio_executor.submit_job(job, [datetime.now(utc)])
    futures = asyncio_executor._pending_futures.copy()
    assert len(futures) == 1

    asyncio_executor.shutdown()
    with pytest.raises(CancelledError):
        await futures.pop()