diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
commit | 48722053dfb43de077df18a139abb16b0a7f7e24 (patch) | |
tree | bd55e709e0d4c02619ef0ec54390a8f792da2f74 /tests | |
parent | a58fca290e0831d377d496a69101e5e3dc4c604e (diff) | |
download | apscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz |
Improved the event subscription system
The subscribe() method now returns a subscription which has the unsubscribe() method in itself.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_datastores.py | 4 | ||||
-rw-r--r-- | tests/test_eventbrokers.py | 8 | ||||
-rw-r--r-- | tests/test_schedulers.py | 4 | ||||
-rw-r--r-- | tests/test_workers.py | 8 |
4 files changed, 12 insertions, 12 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 4389069..e5b50a7 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -151,11 +151,11 @@ async def capture_events( events.append(event) if len(events) == limit: limit_event.set() - datastore.events.unsubscribe(token) + subscription.unsubscribe() events: List[Event] = [] limit_event = anyio.Event() - token = datastore.events.subscribe(listener, event_types) + subscription = datastore.events.subscribe(listener, event_types) yield events if limit: with anyio.fail_after(3): diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py index f1f88c0..7097001 100644 --- a/tests/test_eventbrokers.py +++ b/tests/test_eventbrokers.py @@ -105,11 +105,11 @@ class TestEventBroker: def test_unsubscribe(self, broker: EventBroker, caplog) -> None: queue = Queue() with broker: - token = broker.subscribe(queue.put_nowait) + subscription = broker.subscribe(queue.put_nowait) broker.publish(Event()) queue.get(timeout=3) - broker.unsubscribe(token) + subscription.unsubscribe() broker.publish(Event()) with pytest.raises(Empty): queue.get(timeout=0.1) @@ -168,12 +168,12 @@ class TestAsyncEventBroker: async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None: send, receive = create_memory_object_stream() async with async_broker: - token = async_broker.subscribe(send.send) + subscription = async_broker.subscribe(send.send) await async_broker.publish(Event()) with fail_after(3): await receive.receive() - async_broker.unsubscribe(token) + subscription.unsubscribe() await async_broker.publish(Event()) with pytest.raises(TimeoutError), fail_after(0.1): await receive.receive() diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 990cfec..63729bb 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -33,7 +33,7 @@ class TestAsyncScheduler: received_events: List[Event] = [] event = anyio.Event() scheduler = AsyncScheduler(start_worker=False) - scheduler.subscribe(listener) + scheduler.events.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) async with scheduler: await scheduler.add_schedule(dummy_async_job, trigger, id='foo') @@ -84,7 +84,7 @@ class TestSyncScheduler: received_events: List[Event] = [] event = threading.Event() scheduler = Scheduler(start_worker=False) - scheduler.subscribe(listener) + scheduler.events.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) with scheduler: scheduler.add_schedule(dummy_sync_job, trigger, id='foo') diff --git a/tests/test_workers.py b/tests/test_workers.py index 1a98801..f8522f3 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -49,7 +49,7 @@ class TestAsyncWorker: event = anyio.Event() data_store = MemoryDataStore() worker = AsyncWorker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) async with worker: await worker.data_store.add_task(Task(id='task_id', func=target_func)) job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) @@ -108,7 +108,7 @@ class TestAsyncWorker: event = anyio.Event() data_store = MemoryDataStore() worker = AsyncWorker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) async with worker: await worker.data_store.add_task(Task(id='task_id', func=fail_func)) job = Job(task_id='task_id', schedule_id='foo', @@ -162,7 +162,7 @@ class TestSyncWorker: event = threading.Event() data_store = MemoryDataStore() worker = Worker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) with worker: worker.data_store.add_task(Task(id='task_id', func=sync_func)) job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) @@ -220,7 +220,7 @@ class TestSyncWorker: event = threading.Event() data_store = MemoryDataStore() worker = Worker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) with worker: worker.data_store.add_task(Task(id='task_id', func=fail_func)) job = Job(task_id='task_id', schedule_id='foo', |