diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-22 00:19:37 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-22 00:19:37 +0300 |
commit | 40cf1621c6574cdbff9aaffa7b6ae83e1bf349d9 (patch) | |
tree | cd779fa0e27393fce724af241deb904fe8cdf1ed | |
parent | 3734b110e33cac99074c1fde2b240395d0739cf2 (diff) | |
download | apscheduler-40cf1621c6574cdbff9aaffa7b6ae83e1bf349d9.tar.gz |
Replaced the separate scheduler/worker example with a more practical alternative
-rw-r--r-- | docs/userguide.rst | 131 | ||||
-rw-r--r-- | examples/separate_worker/async_scheduler.py | 19 | ||||
-rw-r--r-- | examples/separate_worker/async_worker.py | 34 | ||||
-rw-r--r-- | examples/separate_worker/sync_scheduler.py | 27 | ||||
-rw-r--r-- | examples/separate_worker/sync_worker.py | 28 |
5 files changed, 93 insertions, 146 deletions
diff --git a/docs/userguide.rst b/docs/userguide.rst index fd88d1c..5d9cc04 100644 --- a/docs/userguide.rst +++ b/docs/userguide.rst @@ -27,25 +27,25 @@ Introduction The core concept of APScheduler is to give the user the ability to queue Python code to be executed, either as soon as possible, later at a given time, or on a recurring -schedule. To make this happen, APScheduler has two types of components: *schedulers* and -*workers*. - -A scheduler is the user-facing interface of the system. When running, it asks its -associated *data store* for *schedules* due to be run. For each such schedule, it then -uses the schedule's associated *trigger* to calculate run times up to the present. For -each run time, the scheduler creates a *job* in the data store, containing the -designated run time and the identifier of the schedule it was derived from. - -A worker asks the data store for jobs, and then starts running those jobs. If the data -store signals that it has new jobs, the worker will try to acquire those jobs if it is -capable of accommodating more jobs. When a worker completes a job, it will then also ask -the data store for as many more jobs as it can handle. - -By default, each scheduler starts an internal worker to simplify use, but in more -complex use cases you may wish to run them in separate processes, or even on separate -nodes. For this, you'll need both a persistent data store and an *event broker*, shared -by both the scheduler(s) and worker(s). For more information, see the section below on -running schedulers and workers separately. +schedule. + +The *scheduler* is the user-facing interface of the system. When it's running, it does +two things concurrently. The first is processing *schedules*. From its *data store*, +it fetches *schedules* due to be run. For each such schedule, it then uses the +schedule's *trigger* to calculate run times up to the present. For each run time, the +scheduler creates a *job* in the data store, containing the designated run time and the +identifier of the schedule it was derived from. + +The second role of the scheduler is running jobs. The scheduler asks the data store for +jobs, and then starts running those jobs. If the data store signals that it has new +jobs, the scheduler will try to acquire those jobs if it is capable of accommodating +more. When a scheduler completes a job, it will then also ask the data store for as many +more jobs as it can handle. + +By default, schedulers operate in both of these roles, but can be configured to only +process schedules or run jobs if deemed necessary. It may even be desirable to use the +scheduler only as an interface to an external data store while leaving schedule and job +processing to other scheduler instances running elsewhere. Basic concepts / glossary ========================= @@ -69,12 +69,6 @@ directly request a task to be run. A *data store* is used to store *schedules* and *jobs*, and to keep track of tasks. -A *scheduler* fetches schedules due for their next runs from its associated data store -and then creates new jobs accordingly. - -A *worker* fetches jobs from its data store, runs them and pushes the results back to -the data store. - An *event broker* delivers published events to all interested parties. It facilitates the cooperation between schedulers and workers by notifying them of new or updated schedules or jobs. @@ -291,11 +285,11 @@ Controlling how much a job can be started late ---------------------------------------------- Some tasks are time sensitive, and should not be run at all if it fails to be started on -time (like, for example, if the worker(s) were down while they were supposed to be +time (like, for example, if the scheduler(s) were down while they were supposed to be running the scheduled jobs). You can control this time limit with the ``misfire_grace_time`` option passed to -:meth:`~apscheduler.schedulers.sync.Scheduler.add_schedule`. A worker that acquires the -job then checks if the current time is later than the deadline +:meth:`~apscheduler.schedulers.sync.Scheduler.add_schedule`. A scheduler that acquires +the job then checks if the current time is later than the deadline (run time + misfire grace time) and if it is, it skips the execution of the job and releases it with the outcome of :data:`~apscheduler.JobOutcome.` @@ -334,11 +328,10 @@ affects the newly queued job. Context variables ================= -Schedulers and workers provide certain `context variables`_ available to the tasks being -run: +Schedulers provide certain `context variables`_ available to the tasks being run: -* The current scheduler: :data:`~apscheduler.current_scheduler` -* The current worker: :data:`~apscheduler.current_worker` +* The current (synchronous) scheduler: :data:`~apscheduler.current_scheduler` +* The current asynchronous scheduler: :data:`~apscheduler.current_async_scheduler` * Information about the job being currently run: :data:`~apscheduler.current_job` Here's an example:: @@ -359,9 +352,9 @@ Here's an example:: Subscribing to events ===================== -Schedulers and workers have the ability to notify listeners when some event occurs in -the scheduler system. Examples of such events would be schedulers or workers starting up -or shutting down, or schedules or jobs being created or removed from the data store. +Schedulers have the ability to notify listeners when some event occurs in the scheduler +system. Examples of such events would be schedulers or workers starting up or shutting +down, or schedules or jobs being created or removed from the data store. To listen to events, you need a callable that takes a single positional argument which is the event object. Then, you need to decide which events you're interested in: @@ -375,7 +368,7 @@ is the event object. Then, you need to decide which events you're interested in: def listener(event: Event) -> None: print(f"Received {event.__class__.__name__}") - scheduler.events.subscribe(listener, {JobAcquired, JobReleased}) + scheduler.subscribe(listener, {JobAcquired, JobReleased}) .. code-tab:: python Asynchronous @@ -384,7 +377,7 @@ is the event object. Then, you need to decide which events you're interested in: async def listener(event: Event) -> None: print(f"Received {event.__class__.__name__}") - scheduler.events.subscribe(listener, {JobAcquired, JobReleased}) + scheduler.subscribe(listener, {JobAcquired, JobReleased}) This example subscribes to the :class:`~apscheduler.JobAcquired` and :class:`~apscheduler.JobAcquired` event types. The callback will receive an event of @@ -456,66 +449,34 @@ Using multiple schedulers There are several situations in which you would want to run several schedulers against the same data store at once: -* Running a server application (usually a web app) with multiple workers +* Running a server application (usually a web app) with multiple worker processes * You need fault tolerance (scheduling will continue even if a node or process running a scheduler goes down) -When you have multiple schedulers (or workers; see the next section) running at once, -they need to be able to coordinate their efforts so that the schedules don't get -processed more than once and the schedulers know when to wake up even if another -scheduler added the next due schedule to the data store. To this end, a shared -*event broker* must be configured. +When you have multiple schedulers running at once, they need to be able to coordinate +their efforts so that the schedules don't get processed more than once and the +schedulers know when to wake up even if another scheduler added the next due schedule to +the data store. To this end, a shared *event broker* must be configured. .. seealso:: You can find practical examples of data store sharing in the :file:`examples/web` directory. -Running schedulers and workers separately ------------------------------------------ - -Some deployment scenarios may warrant running workers separately from the schedulers. -For example, if you want to set up a scalable worker pool, you can run just the workers -in that pool and the schedulers elsewhere without the internal workers. To prevent the -scheduler from starting an internal worker, you need to pass it the -``start_worker=False`` option. - -Starting a worker without a scheduler looks very similar to the procedure to start a -scheduler: - -.. tabs:: - - .. code-tab: python Synchronous - - from apscheduler.workers.sync import Worker +Using a scheduler without running it +------------------------------------ +Some deployment scenarios may warrant the use of a scheduler for only interfacing with +an external data store, for things like configuring tasks, adding schedules or queuing +jobs. One such practical use case is a web application that needs to run heavy +computations elsewhere so they don't cause performance issues with the web application +itself. - data_store = ... - event_broker = ... - worker = Worker(data_store, event_broker) - worker.run_until_stopped() +You can then run one or more schedulers against the same data store and event broker +elsewhere where they don't disturb the web application. These schedulers will do all the +heavy lifting like processing schedules and running jobs. - .. code-tab: python asyncio - - import asyncio - - from apscheduler.workers.async_ import AsyncWorker - - - async def main(): - data_store = ... - event_broker = ... - async with AsyncWorker(data_store, event_broker) as worker: - await worker.wait_until_stopped() - - asyncio.run(main()) - -There is one significant matter to take into consideration if you do this. The scheduler -object, usually available from :data:`~apscheduler.current_scheduler`, will not be set -since there is no scheduler running in the current thread/task. - -.. seealso:: A practical example of separate schedulers and workers can be found in the +.. seealso:: A practical example of this separation of concerns can be found in the :file:`examples/separate_worker` directory. - .. _troubleshooting: Troubleshooting diff --git a/examples/separate_worker/async_scheduler.py b/examples/separate_worker/async_scheduler.py index 59c294c..570ff28 100644 --- a/examples/separate_worker/async_scheduler.py +++ b/examples/separate_worker/async_scheduler.py @@ -1,14 +1,12 @@ """ -Example demonstrating the separation of scheduler and worker. -This script runs the scheduler part. You need to be running both this and the worker -script simultaneously in order for the scheduled task to be run. +This is an example demonstrating the use of the scheduler as only an interface to the +scheduling system. This script adds or updates a single schedule and then exits. To see +the schedule acted on, you need to run the corresponding worker script (either +async_worker.py or sync_worker.py). -Requires the "postgresql" service to be running. +This script requires the "postgresql" service to be running. To install prerequisites: pip install sqlalchemy asyncpg To run: python async_scheduler.py - -When run together with async_worker.py, it should print a line on the console -on a one-second interval. """ from __future__ import annotations @@ -19,7 +17,6 @@ import logging from example_tasks import tick from sqlalchemy.ext.asyncio import create_async_engine -from apscheduler import SchedulerRole from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker from apscheduler.schedulers.async_ import AsyncScheduler @@ -37,11 +34,9 @@ async def main(): # from apscheduler.eventbrokers.redis import RedisEventBroker # event_broker = RedisEventBroker.from_url("redis://localhost") - async with AsyncScheduler( - data_store, event_broker, role=SchedulerRole.scheduler - ) as scheduler: + async with AsyncScheduler(data_store, event_broker) as scheduler: await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") - await scheduler.run_until_stopped() + # Note: we don't actually start the scheduler here! logging.basicConfig(level=logging.INFO) diff --git a/examples/separate_worker/async_worker.py b/examples/separate_worker/async_worker.py index a4e0ef9..0fa1579 100644 --- a/examples/separate_worker/async_worker.py +++ b/examples/separate_worker/async_worker.py @@ -1,14 +1,13 @@ """ -Example demonstrating the separation of scheduler and worker. -This script runs the scheduler part. You need to be running both this and the worker -script simultaneously in order for the scheduled task to be run. +This is an example demonstrating how to run a scheduler to process schedules added by +another scheduler elsewhere. Prior to starting this script, you need to run the script +(either async_scheduler.py or sync_scheduler.py) that adds or updates a schedule to the +data store. This script will then pick up that schedule and start spawning jobs that +will print a line on the console on one-second intervals. -Requires the "postgresql" service to be running. +This script requires the "postgresql" service to be running. To install prerequisites: pip install sqlalchemy asyncpg To run: python async_worker.py - -When run together with async_scheduler.py, it should print a line on the console -on a one-second interval. """ from __future__ import annotations @@ -18,26 +17,23 @@ import logging from sqlalchemy.ext.asyncio import create_async_engine -from apscheduler import SchedulerRole from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker from apscheduler.schedulers.async_ import AsyncScheduler async def main(): - engine = create_async_engine( - "postgresql+asyncpg://postgres:secret@localhost/testdb" - ) - data_store = SQLAlchemyDataStore(engine) - event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine) + async with AsyncScheduler(data_store, event_broker) as scheduler: + await scheduler.run_until_stopped() - # Uncomment the next two lines to use the Redis event broker instead - # from apscheduler.eventbrokers.redis import RedisEventBroker - # event_broker = RedisEventBroker.from_url("redis://localhost") - scheduler = AsyncScheduler(data_store, event_broker, role=SchedulerRole.worker) - await scheduler.run_until_stopped() +logging.basicConfig(level=logging.INFO) +engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb") +data_store = SQLAlchemyDataStore(engine) +event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine) +# Uncomment the next two lines to use the Redis event broker instead +# from apscheduler.eventbrokers.redis import RedisEventBroker +# event_broker = RedisEventBroker.from_url("redis://localhost") -logging.basicConfig(level=logging.INFO) asyncio.run(main()) diff --git a/examples/separate_worker/sync_scheduler.py b/examples/separate_worker/sync_scheduler.py index 45d337d..4699018 100644 --- a/examples/separate_worker/sync_scheduler.py +++ b/examples/separate_worker/sync_scheduler.py @@ -1,14 +1,12 @@ """ -Example demonstrating the separation of scheduler and worker. -This script runs the scheduler part. You need to be running both this and the worker -script simultaneously in order for the scheduled task to be run. +This is an example demonstrating the use of the scheduler as only an interface to the +scheduling system. This script adds or updates a single schedule and then exits. To see +the schedule acted on, you need to run the corresponding worker script (either +async_worker.py or sync_worker.py). -Requires the "postgresql" and "redis" services to be running. -To install prerequisites: pip install sqlalchemy psycopg2 redis +This script requires the "postgresql" service to be running. +To install prerequisites: pip install sqlalchemy asyncpg To run: python sync_scheduler.py - -When run together with sync_worker.py, it should print a line on the console -on a one-second interval. """ from __future__ import annotations @@ -16,23 +14,22 @@ from __future__ import annotations import logging from example_tasks import tick -from sqlalchemy.future import create_engine +from sqlalchemy.ext.asyncio import create_async_engine -from apscheduler import SchedulerRole from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore -from apscheduler.eventbrokers.redis import RedisEventBroker +from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.interval import IntervalTrigger logging.basicConfig(level=logging.INFO) -engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb") +engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb") data_store = SQLAlchemyDataStore(engine) -event_broker = RedisEventBroker.from_url("redis://localhost") +event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine) # Uncomment the next two lines to use the MQTT event broker instead # from apscheduler.eventbrokers.mqtt import MQTTEventBroker # event_broker = MQTTEventBroker() -with Scheduler(data_store, event_broker, role=SchedulerRole.scheduler) as scheduler: +with Scheduler(data_store, event_broker) as scheduler: scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") - scheduler.run_until_stopped() + # Note: we don't actually start the scheduler here! diff --git a/examples/separate_worker/sync_worker.py b/examples/separate_worker/sync_worker.py index 24018ba..27d66b0 100644 --- a/examples/separate_worker/sync_worker.py +++ b/examples/separate_worker/sync_worker.py @@ -1,35 +1,33 @@ """ -Example demonstrating a scheduler that only runs jobs but does not process schedules. -You need to be running both this and the scheduler script simultaneously in order for -the scheduled task to be run. - -Requires the "postgresql" and "redis" services to be running. -To install prerequisites: pip install sqlalchemy psycopg2 redis +This is an example demonstrating how to run a scheduler to process schedules added by +another scheduler elsewhere. Prior to starting this script, you need to run the script +(either async_scheduler.py or sync_scheduler.py) that adds or updates a schedule to the +data store. This script will then pick up that schedule and start spawning jobs that +will print a line on the console on one-second intervals. + +This script requires the "postgresql" service to be running. +To install prerequisites: pip install sqlalchemy asyncpg To run: python sync_worker.py - -When run together with sync_scheduler.py, it should print a line on the -console on a one-second interval. """ from __future__ import annotations import logging -from sqlalchemy.future import create_engine +from sqlalchemy.ext.asyncio import create_async_engine -from apscheduler import SchedulerRole from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore -from apscheduler.eventbrokers.redis import RedisEventBroker +from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker from apscheduler.schedulers.sync import Scheduler logging.basicConfig(level=logging.INFO) -engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb") +engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb") data_store = SQLAlchemyDataStore(engine) -event_broker = RedisEventBroker.from_url("redis://localhost") +event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine) # Uncomment the next two lines to use the MQTT event broker instead # from apscheduler.eventbrokers.mqtt import MQTTEventBroker # event_broker = MQTTEventBroker() -with Scheduler(data_store, event_broker, role=SchedulerRole.worker) as scheduler: +with Scheduler(data_store, event_broker) as scheduler: scheduler.run_until_stopped() |