summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
commit48722053dfb43de077df18a139abb16b0a7f7e24 (patch)
treebd55e709e0d4c02619ef0ec54390a8f792da2f74 /tests
parenta58fca290e0831d377d496a69101e5e3dc4c604e (diff)
downloadapscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz
Improved the event subscription system
The subscribe() method now returns a subscription which has the unsubscribe() method in itself.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_datastores.py4
-rw-r--r--tests/test_eventbrokers.py8
-rw-r--r--tests/test_schedulers.py4
-rw-r--r--tests/test_workers.py8
4 files changed, 12 insertions, 12 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 4389069..e5b50a7 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -151,11 +151,11 @@ async def capture_events(
events.append(event)
if len(events) == limit:
limit_event.set()
- datastore.events.unsubscribe(token)
+ subscription.unsubscribe()
events: List[Event] = []
limit_event = anyio.Event()
- token = datastore.events.subscribe(listener, event_types)
+ subscription = datastore.events.subscribe(listener, event_types)
yield events
if limit:
with anyio.fail_after(3):
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index f1f88c0..7097001 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -105,11 +105,11 @@ class TestEventBroker:
def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
queue = Queue()
with broker:
- token = broker.subscribe(queue.put_nowait)
+ subscription = broker.subscribe(queue.put_nowait)
broker.publish(Event())
queue.get(timeout=3)
- broker.unsubscribe(token)
+ subscription.unsubscribe()
broker.publish(Event())
with pytest.raises(Empty):
queue.get(timeout=0.1)
@@ -168,12 +168,12 @@ class TestAsyncEventBroker:
async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream()
async with async_broker:
- token = async_broker.subscribe(send.send)
+ subscription = async_broker.subscribe(send.send)
await async_broker.publish(Event())
with fail_after(3):
await receive.receive()
- async_broker.unsubscribe(token)
+ subscription.unsubscribe()
await async_broker.publish(Event())
with pytest.raises(TimeoutError), fail_after(0.1):
await receive.receive()
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 990cfec..63729bb 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -33,7 +33,7 @@ class TestAsyncScheduler:
received_events: List[Event] = []
event = anyio.Event()
scheduler = AsyncScheduler(start_worker=False)
- scheduler.subscribe(listener)
+ scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
async with scheduler:
await scheduler.add_schedule(dummy_async_job, trigger, id='foo')
@@ -84,7 +84,7 @@ class TestSyncScheduler:
received_events: List[Event] = []
event = threading.Event()
scheduler = Scheduler(start_worker=False)
- scheduler.subscribe(listener)
+ scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
with scheduler:
scheduler.add_schedule(dummy_sync_job, trigger, id='foo')
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 1a98801..f8522f3 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -49,7 +49,7 @@ class TestAsyncWorker:
event = anyio.Event()
data_store = MemoryDataStore()
worker = AsyncWorker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
async with worker:
await worker.data_store.add_task(Task(id='task_id', func=target_func))
job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
@@ -108,7 +108,7 @@ class TestAsyncWorker:
event = anyio.Event()
data_store = MemoryDataStore()
worker = AsyncWorker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
async with worker:
await worker.data_store.add_task(Task(id='task_id', func=fail_func))
job = Job(task_id='task_id', schedule_id='foo',
@@ -162,7 +162,7 @@ class TestSyncWorker:
event = threading.Event()
data_store = MemoryDataStore()
worker = Worker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
with worker:
worker.data_store.add_task(Task(id='task_id', func=sync_func))
job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
@@ -220,7 +220,7 @@ class TestSyncWorker:
event = threading.Event()
data_store = MemoryDataStore()
worker = Worker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
with worker:
worker.data_store.add_task(Task(id='task_id', func=fail_func))
job = Job(task_id='task_id', schedule_id='foo',