summaryrefslogtreecommitdiff
path: root/tests/test_eventbrokers.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_eventbrokers.py')
-rw-r--r--tests/test_eventbrokers.py279
1 files changed, 279 insertions, 0 deletions
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
new file mode 100644
index 0000000..6a0f45d
--- /dev/null
+++ b/tests/test_eventbrokers.py
@@ -0,0 +1,279 @@
+from concurrent.futures import Future
+from datetime import datetime, timezone
+from queue import Empty, Queue
+from typing import Callable
+
+import pytest
+from _pytest.fixtures import FixtureRequest
+from _pytest.logging import LogCaptureFixture
+from anyio import create_memory_object_stream, fail_after
+
+from apscheduler.abc import AsyncEventBroker, EventBroker, Serializer
+from apscheduler.events import Event, ScheduleAdded
+
+
+@pytest.fixture
+def local_broker() -> EventBroker:
+ from apscheduler.eventbrokers.local import LocalEventBroker
+
+ return LocalEventBroker()
+
+
+@pytest.fixture
+def local_async_broker() -> AsyncEventBroker:
+ from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker
+
+ return LocalAsyncEventBroker()
+
+
+@pytest.fixture
+def redis_broker(serializer: Serializer) -> EventBroker:
+ from apscheduler.eventbrokers.redis import RedisEventBroker
+
+ broker = RedisEventBroker.from_url('redis://localhost:6379')
+ broker.serializer = serializer
+ return broker
+
+
+@pytest.fixture
+def mqtt_broker(serializer: Serializer) -> EventBroker:
+ from paho.mqtt.client import Client
+
+ from apscheduler.eventbrokers.mqtt import MQTTEventBroker
+
+ return MQTTEventBroker(Client(), serializer=serializer)
+
+
+@pytest.fixture
+async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker:
+ from asyncpg import create_pool
+
+ from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
+
+ pool = await create_pool('postgres://postgres:secret@localhost:5432/testdb')
+ broker = AsyncpgEventBroker.from_asyncpg_pool(pool)
+ broker.serializer = serializer
+ yield broker
+ await pool.close()
+
+
+@pytest.fixture(params=[
+ pytest.param(pytest.lazy_fixture('local_broker'), id='local'),
+ pytest.param(pytest.lazy_fixture('redis_broker'), id='redis'),
+ pytest.param(pytest.lazy_fixture('mqtt_broker'), id='mqtt')
+])
+def broker(request: FixtureRequest) -> Callable[[], EventBroker]:
+ return request.param
+
+
+@pytest.fixture(params=[
+ pytest.param(pytest.lazy_fixture('local_async_broker'), id='local'),
+ pytest.param(pytest.lazy_fixture('asyncpg_broker'), id='asyncpg')
+])
+def async_broker(request: FixtureRequest) -> Callable[[], AsyncEventBroker]:
+ return request.param
+
+
+class TestEventBroker:
+ def test_publish_subscribe(self, broker: EventBroker) -> None:
+ def subscriber1(event) -> None:
+ queue.put_nowait(event)
+
+ def subscriber2(event) -> None:
+ queue.put_nowait(event)
+
+ queue = Queue()
+ with broker:
+ broker.subscribe(subscriber1)
+ broker.subscribe(subscriber2)
+ event = ScheduleAdded(
+ schedule_id='schedule1',
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ broker.publish(event)
+ event1 = queue.get(timeout=3)
+ event2 = queue.get(timeout=1)
+
+ assert event1 == event2
+ assert isinstance(event1, ScheduleAdded)
+ assert isinstance(event1.timestamp, datetime)
+ assert event1.schedule_id == 'schedule1'
+ assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+
+ def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
+ queue = Queue()
+ with broker:
+ token = broker.subscribe(queue.put_nowait)
+ broker.publish(Event())
+ queue.get(timeout=3)
+
+ broker.unsubscribe(token)
+ broker.publish(Event())
+ with pytest.raises(Empty):
+ queue.get(timeout=0.1)
+
+ def test_publish_no_subscribers(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ with broker:
+ broker.publish(Event())
+
+ assert not caplog.text
+
+ def test_publish_exception(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ def bad_subscriber(event: Event) -> None:
+ raise Exception('foo')
+
+ timestamp = datetime.now(timezone.utc)
+ event_future: Future[Event] = Future()
+ with broker:
+ broker.subscribe(bad_subscriber)
+ broker.subscribe(event_future.set_result)
+ broker.publish(Event(timestamp=timestamp))
+
+ event = event_future.result(3)
+ assert isinstance(event, Event)
+ assert event.timestamp == timestamp
+ assert 'Error delivering Event' in caplog.text
+
+
+@pytest.mark.anyio
+class TestAsyncEventBroker:
+ async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None:
+ def subscriber1(event) -> None:
+ send.send_nowait(event)
+
+ async def subscriber2(event) -> None:
+ await send.send(event)
+
+ send, receive = create_memory_object_stream(2)
+ async with async_broker:
+ async_broker.subscribe(subscriber1)
+ async_broker.subscribe(subscriber2)
+ event = ScheduleAdded(
+ schedule_id='schedule1',
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ await async_broker.publish(event)
+
+ with fail_after(3):
+ event1 = await receive.receive()
+ event2 = await receive.receive()
+
+ assert event1 == event2
+ assert isinstance(event1, ScheduleAdded)
+ assert isinstance(event1.timestamp, datetime)
+ assert event1.schedule_id == 'schedule1'
+ assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+
+ async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
+ send, receive = create_memory_object_stream()
+ async with async_broker:
+ token = async_broker.subscribe(send.send)
+ await async_broker.publish(Event())
+ with fail_after(3):
+ await receive.receive()
+
+ async_broker.unsubscribe(token)
+ await async_broker.publish(Event())
+ with pytest.raises(TimeoutError), fail_after(0.1):
+ await receive.receive()
+
+ async def test_publish_no_subscribers(self, async_broker: AsyncEventBroker,
+ caplog: LogCaptureFixture) -> None:
+ async with async_broker:
+ await async_broker.publish(Event())
+
+ assert not caplog.text
+
+ async def test_publish_exception(self, async_broker: AsyncEventBroker,
+ caplog: LogCaptureFixture) -> None:
+ def bad_subscriber(event: Event) -> None:
+ raise Exception('foo')
+
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ async with async_broker:
+ async_broker.subscribe(bad_subscriber)
+ async_broker.subscribe(events.append)
+ await async_broker.publish(Event(timestamp=timestamp))
+
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
+ assert 'Error delivering Event' in caplog.text
+#
+# def test_subscribe_coroutine_callback(self) -> None:
+# async def callback(event: Event) -> None:
+# pass
+#
+# with EventBroker() as eventhub:
+# with pytest.raises(ValueError, match='Coroutine functions are not supported'):
+# eventhub.subscribe(callback)
+#
+# def test_relay_events(self) -> None:
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# with EventBroker() as eventhub1, EventBroker() as eventhub2:
+# eventhub2.relay_events_from(eventhub1)
+# eventhub2.subscribe(events.append)
+# eventhub1.publish(Event(timestamp=timestamp))
+#
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp
+#
+#
+# @pytest.mark.anyio
+# class TestAsyncEventHub:
+# async def test_publish(self) -> None:
+# async def async_setitem(event: Event) -> None:
+# events[1] = event
+#
+# timestamp = datetime.now(timezone.utc)
+# events: List[Optional[Event]] = [None, None]
+# async with AsyncEventBroker() as eventhub:
+# eventhub.subscribe(partial(setitem, events, 0))
+# eventhub.subscribe(async_setitem)
+# eventhub.publish(Event(timestamp=timestamp))
+#
+# assert events[0] is events[1]
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp
+#
+# async def test_unsubscribe(self) -> None:
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# async with AsyncEventBroker() as eventhub:
+# token = eventhub.subscribe(events.append)
+# eventhub.publish(Event(timestamp=timestamp))
+# eventhub.unsubscribe(token)
+# eventhub.publish(Event(timestamp=timestamp))
+#
+# assert len(events) == 1
+#
+# async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None:
+# async with AsyncEventBroker() as eventhub:
+# eventhub.publish(Event(timestamp=datetime.now(timezone.utc)))
+#
+# assert not caplog.text
+#
+# async def test_publish_exception(self, caplog: LogCaptureFixture) -> None:
+# def bad_subscriber(event: Event) -> None:
+# raise Exception('foo')
+#
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# async with AsyncEventBroker() as eventhub:
+# eventhub.subscribe(bad_subscriber)
+# eventhub.subscribe(events.append)
+# eventhub.publish(Event(timestamp=timestamp))
+#
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp
+# assert 'Error delivering Event' in caplog.text
+#
+# async def test_relay_events(self) -> None:
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# async with AsyncEventBroker() as eventhub1, AsyncEventBroker() as eventhub2:
+# eventhub1.relay_events_from(eventhub2)
+# eventhub1.subscribe(events.append)
+# eventhub2.publish(Event(timestamp=timestamp))
+#
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp