diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-11 21:14:14 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-11 21:14:14 +0300 |
commit | 56afe91d5dc338db3440b2e9ecdea3e522dba30f (patch) | |
tree | 311380b0d953f09919d7e8c4c0a340507e5d0dc5 /tests | |
parent | 7248a78e7e787b728b083aaa8199eeba3a3f3023 (diff) | |
download | apscheduler-56afe91d5dc338db3440b2e9ecdea3e522dba30f.tar.gz |
Implemented a pluggable event broker system
Diffstat (limited to 'tests')
-rw-r--r-- | tests/conftest.py | 1 | ||||
-rw-r--r-- | tests/test_datastores.py | 4 | ||||
-rw-r--r-- | tests/test_eventbrokers.py | 279 | ||||
-rw-r--r-- | tests/test_events.py | 135 |
4 files changed, 281 insertions, 138 deletions
diff --git a/tests/conftest.py b/tests/conftest.py index bc5bab2..135c18d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,7 +24,6 @@ def timezone() -> ZoneInfo: @pytest.fixture(params=[ - pytest.param(None, id='none'), pytest.param(PickleSerializer, id='pickle'), pytest.param(CBORSerializer, id='cbor'), pytest.param(JSONSerializer, id='json') diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 52f8349..74db6e7 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -40,11 +40,11 @@ async def capture_events( events.append(event) if len(events) == limit: limit_event.set() - store.unsubscribe(token) + store.events.unsubscribe(token) events: List[Event] = [] limit_event = anyio.Event() - token = store.subscribe(listener, event_types) + token = store.events.subscribe(listener, event_types) yield events if limit: with anyio.fail_after(3): diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py new file mode 100644 index 0000000..6a0f45d --- /dev/null +++ b/tests/test_eventbrokers.py @@ -0,0 +1,279 @@ +from concurrent.futures import Future +from datetime import datetime, timezone +from queue import Empty, Queue +from typing import Callable + +import pytest +from _pytest.fixtures import FixtureRequest +from _pytest.logging import LogCaptureFixture +from anyio import create_memory_object_stream, fail_after + +from apscheduler.abc import AsyncEventBroker, EventBroker, Serializer +from apscheduler.events import Event, ScheduleAdded + + +@pytest.fixture +def local_broker() -> EventBroker: + from apscheduler.eventbrokers.local import LocalEventBroker + + return LocalEventBroker() + + +@pytest.fixture +def local_async_broker() -> AsyncEventBroker: + from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker + + return LocalAsyncEventBroker() + + +@pytest.fixture +def redis_broker(serializer: Serializer) -> EventBroker: + from apscheduler.eventbrokers.redis import RedisEventBroker + + broker = RedisEventBroker.from_url('redis://localhost:6379') + broker.serializer = serializer + return broker + + +@pytest.fixture +def mqtt_broker(serializer: Serializer) -> EventBroker: + from paho.mqtt.client import Client + + from apscheduler.eventbrokers.mqtt import MQTTEventBroker + + return MQTTEventBroker(Client(), serializer=serializer) + + +@pytest.fixture +async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker: + from asyncpg import create_pool + + from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker + + pool = await create_pool('postgres://postgres:secret@localhost:5432/testdb') + broker = AsyncpgEventBroker.from_asyncpg_pool(pool) + broker.serializer = serializer + yield broker + await pool.close() + + +@pytest.fixture(params=[ + pytest.param(pytest.lazy_fixture('local_broker'), id='local'), + pytest.param(pytest.lazy_fixture('redis_broker'), id='redis'), + pytest.param(pytest.lazy_fixture('mqtt_broker'), id='mqtt') +]) +def broker(request: FixtureRequest) -> Callable[[], EventBroker]: + return request.param + + +@pytest.fixture(params=[ + pytest.param(pytest.lazy_fixture('local_async_broker'), id='local'), + pytest.param(pytest.lazy_fixture('asyncpg_broker'), id='asyncpg') +]) +def async_broker(request: FixtureRequest) -> Callable[[], AsyncEventBroker]: + return request.param + + +class TestEventBroker: + def test_publish_subscribe(self, broker: EventBroker) -> None: + def subscriber1(event) -> None: + queue.put_nowait(event) + + def subscriber2(event) -> None: + queue.put_nowait(event) + + queue = Queue() + with broker: + broker.subscribe(subscriber1) + broker.subscribe(subscriber2) + event = ScheduleAdded( + schedule_id='schedule1', + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)) + broker.publish(event) + event1 = queue.get(timeout=3) + event2 = queue.get(timeout=1) + + assert event1 == event2 + assert isinstance(event1, ScheduleAdded) + assert isinstance(event1.timestamp, datetime) + assert event1.schedule_id == 'schedule1' + assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc) + + def test_unsubscribe(self, broker: EventBroker, caplog) -> None: + queue = Queue() + with broker: + token = broker.subscribe(queue.put_nowait) + broker.publish(Event()) + queue.get(timeout=3) + + broker.unsubscribe(token) + broker.publish(Event()) + with pytest.raises(Empty): + queue.get(timeout=0.1) + + def test_publish_no_subscribers(self, broker: EventBroker, caplog: LogCaptureFixture) -> None: + with broker: + broker.publish(Event()) + + assert not caplog.text + + def test_publish_exception(self, broker: EventBroker, caplog: LogCaptureFixture) -> None: + def bad_subscriber(event: Event) -> None: + raise Exception('foo') + + timestamp = datetime.now(timezone.utc) + event_future: Future[Event] = Future() + with broker: + broker.subscribe(bad_subscriber) + broker.subscribe(event_future.set_result) + broker.publish(Event(timestamp=timestamp)) + + event = event_future.result(3) + assert isinstance(event, Event) + assert event.timestamp == timestamp + assert 'Error delivering Event' in caplog.text + + +@pytest.mark.anyio +class TestAsyncEventBroker: + async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None: + def subscriber1(event) -> None: + send.send_nowait(event) + + async def subscriber2(event) -> None: + await send.send(event) + + send, receive = create_memory_object_stream(2) + async with async_broker: + async_broker.subscribe(subscriber1) + async_broker.subscribe(subscriber2) + event = ScheduleAdded( + schedule_id='schedule1', + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)) + await async_broker.publish(event) + + with fail_after(3): + event1 = await receive.receive() + event2 = await receive.receive() + + assert event1 == event2 + assert isinstance(event1, ScheduleAdded) + assert isinstance(event1.timestamp, datetime) + assert event1.schedule_id == 'schedule1' + assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc) + + async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None: + send, receive = create_memory_object_stream() + async with async_broker: + token = async_broker.subscribe(send.send) + await async_broker.publish(Event()) + with fail_after(3): + await receive.receive() + + async_broker.unsubscribe(token) + await async_broker.publish(Event()) + with pytest.raises(TimeoutError), fail_after(0.1): + await receive.receive() + + async def test_publish_no_subscribers(self, async_broker: AsyncEventBroker, + caplog: LogCaptureFixture) -> None: + async with async_broker: + await async_broker.publish(Event()) + + assert not caplog.text + + async def test_publish_exception(self, async_broker: AsyncEventBroker, + caplog: LogCaptureFixture) -> None: + def bad_subscriber(event: Event) -> None: + raise Exception('foo') + + timestamp = datetime.now(timezone.utc) + events = [] + async with async_broker: + async_broker.subscribe(bad_subscriber) + async_broker.subscribe(events.append) + await async_broker.publish(Event(timestamp=timestamp)) + + assert isinstance(events[0], Event) + assert events[0].timestamp == timestamp + assert 'Error delivering Event' in caplog.text +# +# def test_subscribe_coroutine_callback(self) -> None: +# async def callback(event: Event) -> None: +# pass +# +# with EventBroker() as eventhub: +# with pytest.raises(ValueError, match='Coroutine functions are not supported'): +# eventhub.subscribe(callback) +# +# def test_relay_events(self) -> None: +# timestamp = datetime.now(timezone.utc) +# events = [] +# with EventBroker() as eventhub1, EventBroker() as eventhub2: +# eventhub2.relay_events_from(eventhub1) +# eventhub2.subscribe(events.append) +# eventhub1.publish(Event(timestamp=timestamp)) +# +# assert isinstance(events[0], Event) +# assert events[0].timestamp == timestamp +# +# +# @pytest.mark.anyio +# class TestAsyncEventHub: +# async def test_publish(self) -> None: +# async def async_setitem(event: Event) -> None: +# events[1] = event +# +# timestamp = datetime.now(timezone.utc) +# events: List[Optional[Event]] = [None, None] +# async with AsyncEventBroker() as eventhub: +# eventhub.subscribe(partial(setitem, events, 0)) +# eventhub.subscribe(async_setitem) +# eventhub.publish(Event(timestamp=timestamp)) +# +# assert events[0] is events[1] +# assert isinstance(events[0], Event) +# assert events[0].timestamp == timestamp +# +# async def test_unsubscribe(self) -> None: +# timestamp = datetime.now(timezone.utc) +# events = [] +# async with AsyncEventBroker() as eventhub: +# token = eventhub.subscribe(events.append) +# eventhub.publish(Event(timestamp=timestamp)) +# eventhub.unsubscribe(token) +# eventhub.publish(Event(timestamp=timestamp)) +# +# assert len(events) == 1 +# +# async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None: +# async with AsyncEventBroker() as eventhub: +# eventhub.publish(Event(timestamp=datetime.now(timezone.utc))) +# +# assert not caplog.text +# +# async def test_publish_exception(self, caplog: LogCaptureFixture) -> None: +# def bad_subscriber(event: Event) -> None: +# raise Exception('foo') +# +# timestamp = datetime.now(timezone.utc) +# events = [] +# async with AsyncEventBroker() as eventhub: +# eventhub.subscribe(bad_subscriber) +# eventhub.subscribe(events.append) +# eventhub.publish(Event(timestamp=timestamp)) +# +# assert isinstance(events[0], Event) +# assert events[0].timestamp == timestamp +# assert 'Error delivering Event' in caplog.text +# +# async def test_relay_events(self) -> None: +# timestamp = datetime.now(timezone.utc) +# events = [] +# async with AsyncEventBroker() as eventhub1, AsyncEventBroker() as eventhub2: +# eventhub1.relay_events_from(eventhub2) +# eventhub1.subscribe(events.append) +# eventhub2.publish(Event(timestamp=timestamp)) +# +# assert isinstance(events[0], Event) +# assert events[0].timestamp == timestamp diff --git a/tests/test_events.py b/tests/test_events.py deleted file mode 100644 index bbe344f..0000000 --- a/tests/test_events.py +++ /dev/null @@ -1,135 +0,0 @@ -from datetime import datetime, timezone -from functools import partial -from operator import setitem -from typing import List, Optional - -import pytest -from _pytest.logging import LogCaptureFixture - -from apscheduler.events import AsyncEventHub, Event, EventHub - - -class TestEventHub: - def test_publish(self) -> None: - timestamp = datetime.now(timezone.utc) - events: List[Optional[Event]] = [None, None] - with EventHub() as eventhub: - eventhub.subscribe(partial(setitem, events, 0)) - eventhub.subscribe(partial(setitem, events, 1)) - eventhub.publish(Event(timestamp=timestamp)) - - assert events[0] is events[1] - assert isinstance(events[0], Event) - assert events[0].timestamp == timestamp - - def test_unsubscribe(self) -> None: - timestamp = datetime.now(timezone.utc) - events = [] - with EventHub() as eventhub: - token = eventhub.subscribe(events.append) - eventhub.publish(Event(timestamp=timestamp)) - eventhub.unsubscribe(token) - eventhub.publish(Event(timestamp=timestamp)) - - assert len(events) == 1 - - def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None: - with EventHub() as eventhub: - eventhub.publish(Event(timestamp=datetime.now(timezone.utc))) - - assert not caplog.text - - def test_publish_exception(self, caplog: LogCaptureFixture) -> None: - def bad_subscriber(event: Event) -> None: - raise Exception('foo') - - timestamp = datetime.now(timezone.utc) - events = [] - with EventHub() as eventhub: - eventhub.subscribe(bad_subscriber) - eventhub.subscribe(events.append) - eventhub.publish(Event(timestamp=timestamp)) - - assert isinstance(events[0], Event) - assert events[0].timestamp == timestamp - assert 'Error delivering Event' in caplog.text - - def test_subscribe_coroutine_callback(self) -> None: - async def callback(event: Event) -> None: - pass - - with EventHub() as eventhub: - with pytest.raises(ValueError, match='Coroutine functions are not supported'): - eventhub.subscribe(callback) - - def test_relay_events(self) -> None: - timestamp = datetime.now(timezone.utc) - events = [] - with EventHub() as eventhub1, EventHub() as eventhub2: - eventhub2.relay_events_from(eventhub1) - eventhub2.subscribe(events.append) - eventhub1.publish(Event(timestamp=timestamp)) - - assert isinstance(events[0], Event) - assert events[0].timestamp == timestamp - - -@pytest.mark.anyio -class TestAsyncEventHub: - async def test_publish(self) -> None: - async def async_setitem(event: Event) -> None: - events[1] = event - - timestamp = datetime.now(timezone.utc) - events: List[Optional[Event]] = [None, None] - async with AsyncEventHub() as eventhub: - eventhub.subscribe(partial(setitem, events, 0)) - eventhub.subscribe(async_setitem) - eventhub.publish(Event(timestamp=timestamp)) - - assert events[0] is events[1] - assert isinstance(events[0], Event) - assert events[0].timestamp == timestamp - - async def test_unsubscribe(self) -> None: - timestamp = datetime.now(timezone.utc) - events = [] - async with AsyncEventHub() as eventhub: - token = eventhub.subscribe(events.append) - eventhub.publish(Event(timestamp=timestamp)) - eventhub.unsubscribe(token) - eventhub.publish(Event(timestamp=timestamp)) - - assert len(events) == 1 - - async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None: - async with AsyncEventHub() as eventhub: - eventhub.publish(Event(timestamp=datetime.now(timezone.utc))) - - assert not caplog.text - - async def test_publish_exception(self, caplog: LogCaptureFixture) -> None: - def bad_subscriber(event: Event) -> None: - raise Exception('foo') - - timestamp = datetime.now(timezone.utc) - events = [] - async with AsyncEventHub() as eventhub: - eventhub.subscribe(bad_subscriber) - eventhub.subscribe(events.append) - eventhub.publish(Event(timestamp=timestamp)) - - assert isinstance(events[0], Event) - assert events[0].timestamp == timestamp - assert 'Error delivering Event' in caplog.text - - async def test_relay_events(self) -> None: - timestamp = datetime.now(timezone.utc) - events = [] - async with AsyncEventHub() as eventhub1, AsyncEventHub() as eventhub2: - eventhub1.relay_events_from(eventhub2) - eventhub1.subscribe(events.append) - eventhub2.publish(Event(timestamp=timestamp)) - - assert isinstance(events[0], Event) - assert events[0].timestamp == timestamp |