author     Alex Grönholm <alex.gronholm@nextday.fi>    2022-09-22 00:19:37 +0300
committer  Alex Grönholm <alex.gronholm@nextday.fi>    2022-09-22 00:19:37 +0300
commit     40cf1621c6574cdbff9aaffa7b6ae83e1bf349d9 (patch)
tree       cd779fa0e27393fce724af241deb904fe8cdf1ed
parent     3734b110e33cac99074c1fde2b240395d0739cf2 (diff)
download   apscheduler-40cf1621c6574cdbff9aaffa7b6ae83e1bf349d9.tar.gz
Replaced the separate scheduler/worker example with a more practical alternative
-rw-r--r--  docs/userguide.rst                             131
-rw-r--r--  examples/separate_worker/async_scheduler.py     19
-rw-r--r--  examples/separate_worker/async_worker.py        34
-rw-r--r--  examples/separate_worker/sync_scheduler.py      27
-rw-r--r--  examples/separate_worker/sync_worker.py         28
5 files changed, 93 insertions, 146 deletions
diff --git a/docs/userguide.rst b/docs/userguide.rst
index fd88d1c..5d9cc04 100644
--- a/docs/userguide.rst
+++ b/docs/userguide.rst
@@ -27,25 +27,25 @@ Introduction
The core concept of APScheduler is to give the user the ability to queue Python code to
be executed, either as soon as possible, later at a given time, or on a recurring
-schedule. To make this happen, APScheduler has two types of components: *schedulers* and
-*workers*.
-
-A scheduler is the user-facing interface of the system. When running, it asks its
-associated *data store* for *schedules* due to be run. For each such schedule, it then
-uses the schedule's associated *trigger* to calculate run times up to the present. For
-each run time, the scheduler creates a *job* in the data store, containing the
-designated run time and the identifier of the schedule it was derived from.
-
-A worker asks the data store for jobs, and then starts running those jobs. If the data
-store signals that it has new jobs, the worker will try to acquire those jobs if it is
-capable of accommodating more jobs. When a worker completes a job, it will then also ask
-the data store for as many more jobs as it can handle.
-
-By default, each scheduler starts an internal worker to simplify use, but in more
-complex use cases you may wish to run them in separate processes, or even on separate
-nodes. For this, you'll need both a persistent data store and an *event broker*, shared
-by both the scheduler(s) and worker(s). For more information, see the section below on
-running schedulers and workers separately.
+schedule.
+
+The *scheduler* is the user-facing interface of the system. When it's running, it does
+two things concurrently. The first is processing *schedules*. From its *data store*,
+it fetches *schedules* due to be run. For each such schedule, it then uses the
+schedule's *trigger* to calculate run times up to the present. For each run time, the
+scheduler creates a *job* in the data store, containing the designated run time and the
+identifier of the schedule it was derived from.
+
+The second role of the scheduler is running jobs. The scheduler asks the data store for
+jobs, and then starts running those jobs. If the data store signals that it has new
+jobs, the scheduler will try to acquire those jobs if it is capable of accommodating
+more. When a scheduler completes a job, it will then also ask the data store for as many
+more jobs as it can handle.
+
+By default, schedulers operate in both of these roles, but can be configured to only
+process schedules or run jobs if deemed necessary. It may even be desirable to use the
+scheduler only as an interface to an external data store while leaving schedule and job
+processing to other scheduler instances running elsewhere.
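As a rough illustration of restricting a scheduler to a single role, the following
sketch reuses the ``SchedulerRole`` enum and the PostgreSQL data store and asyncpg
event broker seen in the example scripts further down; treat it as an
assumption-laden sketch rather than canonical usage::

    from sqlalchemy.ext.asyncio import create_async_engine

    from apscheduler import SchedulerRole
    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
    from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
    from apscheduler.schedulers.sync import Scheduler

    # A persistent data store and event broker shared with other scheduler
    # instances (the connection URL is just a placeholder)
    engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
    data_store = SQLAlchemyDataStore(engine)
    event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)

    # role=SchedulerRole.worker limits this instance to running jobs;
    # role=SchedulerRole.scheduler would limit it to processing schedules instead
    with Scheduler(data_store, event_broker, role=SchedulerRole.worker) as scheduler:
        scheduler.run_until_stopped()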
Basic concepts / glossary
=========================
@@ -69,12 +69,6 @@ directly request a task to be run.
A *data store* is used to store *schedules* and *jobs*, and to keep track of tasks.
-A *scheduler* fetches schedules due for their next runs from its associated data store
-and then creates new jobs accordingly.
-
-A *worker* fetches jobs from its data store, runs them and pushes the results back to
-the data store.
-
An *event broker* delivers published events to all interested parties. It facilitates
the cooperation between schedulers and workers by notifying them of new or updated
schedules or jobs.
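For a concrete picture, the event brokers referenced elsewhere in this changeset can
be constructed roughly as follows (a sketch assuming the corresponding PostgreSQL,
Redis or MQTT services are reachable)::

    from sqlalchemy.ext.asyncio import create_async_engine

    from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

    # PostgreSQL-backed broker that shares the data store's async engine
    engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
    event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)

    # Alternatively, a Redis-backed broker:
    # from apscheduler.eventbrokers.redis import RedisEventBroker
    # event_broker = RedisEventBroker.from_url("redis://localhost")

    # ...or an MQTT-backed broker:
    # from apscheduler.eventbrokers.mqtt import MQTTEventBroker
    # event_broker = MQTTEventBroker()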
@@ -291,11 +285,11 @@ Controlling how much a job can be started late
----------------------------------------------
Some tasks are time sensitive, and should not be run at all if they fail to be started on
-time (like, for example, if the worker(s) were down while they were supposed to be
+time (like, for example, if the scheduler(s) were down while they were supposed to be
running the scheduled jobs). You can control this time limit with the
``misfire_grace_time`` option passed to
-:meth:`~apscheduler.schedulers.sync.Scheduler.add_schedule`. A worker that acquires the
-job then checks if the current time is later than the deadline
+:meth:`~apscheduler.schedulers.sync.Scheduler.add_schedule`. A scheduler that acquires
+the job then checks if the current time is later than the deadline
(run time + misfire grace time) and if it is, it skips the execution of the job and
releases it with the outcome of :data:`~apscheduler.JobOutcome.missed_start_deadline`.
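A minimal sketch of setting that limit, assuming ``misfire_grace_time`` accepts a
``timedelta`` and that the scheduler falls back to an in-memory data store when
constructed without arguments (neither detail is shown in this changeset)::

    from datetime import datetime, timedelta

    from apscheduler.schedulers.sync import Scheduler
    from apscheduler.triggers.interval import IntervalTrigger


    def tick() -> None:
        # Stand-in task used purely for illustration
        print("Hello, the time is", datetime.now())


    # Assumption: Scheduler() without arguments uses an in-memory data store
    with Scheduler() as scheduler:
        # Any run that cannot be started within 10 seconds of its designated
        # run time is skipped and released with a "missed" outcome
        scheduler.add_schedule(
            tick,
            IntervalTrigger(minutes=1),
            id="tick",
            misfire_grace_time=timedelta(seconds=10),
        )
        scheduler.run_until_stopped()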
@@ -334,11 +328,10 @@ affects the newly queued job.
Context variables
=================
-Schedulers and workers provide certain `context variables`_ available to the tasks being
-run:
+Schedulers provide certain `context variables`_ available to the tasks being run:
-* The current scheduler: :data:`~apscheduler.current_scheduler`
-* The current worker: :data:`~apscheduler.current_worker`
+* The current (synchronous) scheduler: :data:`~apscheduler.current_scheduler`
+* The current asynchronous scheduler: :data:`~apscheduler.current_async_scheduler`
* Information about the job being currently run: :data:`~apscheduler.current_job`
Here's an example::
@@ -359,9 +352,9 @@ Here's an example::
Subscribing to events
=====================
-Schedulers and workers have the ability to notify listeners when some event occurs in
-the scheduler system. Examples of such events would be schedulers or workers starting up
-or shutting down, or schedules or jobs being created or removed from the data store.
+Schedulers have the ability to notify listeners when some event occurs in the scheduler
+system. Examples of such events would be schedulers or workers starting up or shutting
+down, or schedules or jobs being created or removed from the data store.
To listen to events, you need a callable that takes a single positional argument which
is the event object. Then, you need to decide which events you're interested in:
@@ -375,7 +368,7 @@ is the event object. Then, you need to decide which events you're interested in:
def listener(event: Event) -> None:
print(f"Received {event.__class__.__name__}")
- scheduler.events.subscribe(listener, {JobAcquired, JobReleased})
+ scheduler.subscribe(listener, {JobAcquired, JobReleased})
.. code-tab:: python Asynchronous
@@ -384,7 +377,7 @@ is the event object. Then, you need to decide which events you're interested in:
async def listener(event: Event) -> None:
print(f"Received {event.__class__.__name__}")
- scheduler.events.subscribe(listener, {JobAcquired, JobReleased})
+ scheduler.subscribe(listener, {JobAcquired, JobReleased})
This example subscribes to the :class:`~apscheduler.JobAcquired` and
:class:`~apscheduler.JobReleased` event types. The callback will receive an event of
@@ -456,66 +449,34 @@ Using multiple schedulers
There are several situations in which you would want to run several schedulers against
the same data store at once:
-* Running a server application (usually a web app) with multiple workers
+* Running a server application (usually a web app) with multiple worker processes
* You need fault tolerance (scheduling will continue even if a node or process running
a scheduler goes down)
-When you have multiple schedulers (or workers; see the next section) running at once,
-they need to be able to coordinate their efforts so that the schedules don't get
-processed more than once and the schedulers know when to wake up even if another
-scheduler added the next due schedule to the data store. To this end, a shared
-*event broker* must be configured.
+When you have multiple schedulers running at once, they need to be able to coordinate
+their efforts so that the schedules don't get processed more than once and the
+schedulers know when to wake up even if another scheduler added the next due schedule to
+the data store. To this end, a shared *event broker* must be configured.
.. seealso:: You can find practical examples of data store sharing in the
:file:`examples/web` directory.
-Running schedulers and workers separately
------------------------------------------
-
-Some deployment scenarios may warrant running workers separately from the schedulers.
-For example, if you want to set up a scalable worker pool, you can run just the workers
-in that pool and the schedulers elsewhere without the internal workers. To prevent the
-scheduler from starting an internal worker, you need to pass it the
-``start_worker=False`` option.
-
-Starting a worker without a scheduler looks very similar to the procedure to start a
-scheduler:
-
-.. tabs::
-
- .. code-tab: python Synchronous
-
- from apscheduler.workers.sync import Worker
+Using a scheduler without running it
+------------------------------------
+Some deployment scenarios may warrant the use of a scheduler for only interfacing with
+an external data store, for things like configuring tasks, adding schedules or queuing
+jobs. One such practical use case is a web application that needs to run heavy
+computations elsewhere so they don't cause performance issues with the web application
+itself.
- data_store = ...
- event_broker = ...
- worker = Worker(data_store, event_broker)
- worker.run_until_stopped()
+You can then run one or more schedulers against the same data store and event broker
+elsewhere, where they don't disturb the web application. These schedulers will do all
+the heavy lifting like processing schedules and running jobs.
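To sketch the web application side of this split: assuming the scheduler exposes an
``add_job`` method for queuing one-off jobs (that method is not shown in this
changeset), a request handler could hand work off without ever starting the
scheduler::

    from sqlalchemy.ext.asyncio import create_async_engine

    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
    from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
    from apscheduler.schedulers.sync import Scheduler


    def heavy_computation() -> None:
        # Hypothetical placeholder for the real work done elsewhere
        ...


    engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
    data_store = SQLAlchemyDataStore(engine)
    event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)

    # The scheduler is only used as an interface to the data store here;
    # run_until_stopped() is never called, so nothing is processed in this process
    with Scheduler(data_store, event_broker) as scheduler:
        # Assumption: add_job queues a single run of the task for one of the
        # scheduler instances running elsewhere to pick up
        scheduler.add_job(heavy_computation)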
- .. code-tab: python asyncio
-
- import asyncio
-
- from apscheduler.workers.async_ import AsyncWorker
-
-
- async def main():
- data_store = ...
- event_broker = ...
- async with AsyncWorker(data_store, event_broker) as worker:
- await worker.wait_until_stopped()
-
- asyncio.run(main())
-
-There is one significant matter to take into consideration if you do this. The scheduler
-object, usually available from :data:`~apscheduler.current_scheduler`, will not be set
-since there is no scheduler running in the current thread/task.
-
-.. seealso:: A practical example of separate schedulers and workers can be found in the
+.. seealso:: A practical example of this separation of concerns can be found in the
:file:`examples/separate_worker` directory.
-
.. _troubleshooting:
Troubleshooting
diff --git a/examples/separate_worker/async_scheduler.py b/examples/separate_worker/async_scheduler.py
index 59c294c..570ff28 100644
--- a/examples/separate_worker/async_scheduler.py
+++ b/examples/separate_worker/async_scheduler.py
@@ -1,14 +1,12 @@
"""
-Example demonstrating the separation of scheduler and worker.
-This script runs the scheduler part. You need to be running both this and the worker
-script simultaneously in order for the scheduled task to be run.
+This is an example demonstrating the use of the scheduler as only an interface to the
+scheduling system. This script adds or updates a single schedule and then exits. To see
+the schedule acted on, you need to run the corresponding worker script (either
+async_worker.py or sync_worker.py).
-Requires the "postgresql" service to be running.
+This script requires the "postgresql" service to be running.
To install prerequisites: pip install sqlalchemy asyncpg
To run: python async_scheduler.py
-
-When run together with async_worker.py, it should print a line on the console
-on a one-second interval.
"""
from __future__ import annotations
@@ -19,7 +17,6 @@ import logging
from example_tasks import tick
from sqlalchemy.ext.asyncio import create_async_engine
-from apscheduler import SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.schedulers.async_ import AsyncScheduler
@@ -37,11 +34,9 @@ async def main():
# from apscheduler.eventbrokers.redis import RedisEventBroker
# event_broker = RedisEventBroker.from_url("redis://localhost")
- async with AsyncScheduler(
- data_store, event_broker, role=SchedulerRole.scheduler
- ) as scheduler:
+ async with AsyncScheduler(data_store, event_broker) as scheduler:
await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
- await scheduler.run_until_stopped()
+ # Note: we don't actually start the scheduler here!
logging.basicConfig(level=logging.INFO)
diff --git a/examples/separate_worker/async_worker.py b/examples/separate_worker/async_worker.py
index a4e0ef9..0fa1579 100644
--- a/examples/separate_worker/async_worker.py
+++ b/examples/separate_worker/async_worker.py
@@ -1,14 +1,13 @@
"""
-Example demonstrating the separation of scheduler and worker.
-This script runs the scheduler part. You need to be running both this and the worker
-script simultaneously in order for the scheduled task to be run.
+This is an example demonstrating how to run a scheduler to process schedules added by
+another scheduler elsewhere. Prior to starting this script, you need to run the script
+(either async_scheduler.py or sync_scheduler.py) that adds or updates a schedule to the
+data store. This script will then pick up that schedule and start spawning jobs that
+will print a line on the console on one-second intervals.
-Requires the "postgresql" service to be running.
+This script requires the "postgresql" service to be running.
To install prerequisites: pip install sqlalchemy asyncpg
To run: python async_worker.py
-
-When run together with async_scheduler.py, it should print a line on the console
-on a one-second interval.
"""
from __future__ import annotations
@@ -18,26 +17,23 @@ import logging
from sqlalchemy.ext.asyncio import create_async_engine
-from apscheduler import SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.schedulers.async_ import AsyncScheduler
async def main():
- engine = create_async_engine(
- "postgresql+asyncpg://postgres:secret@localhost/testdb"
- )
- data_store = SQLAlchemyDataStore(engine)
- event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
+ async with AsyncScheduler(data_store, event_broker) as scheduler:
+ await scheduler.run_until_stopped()
- # Uncomment the next two lines to use the Redis event broker instead
- # from apscheduler.eventbrokers.redis import RedisEventBroker
- # event_broker = RedisEventBroker.from_url("redis://localhost")
- scheduler = AsyncScheduler(data_store, event_broker, role=SchedulerRole.worker)
- await scheduler.run_until_stopped()
+logging.basicConfig(level=logging.INFO)
+engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
+data_store = SQLAlchemyDataStore(engine)
+event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
+# Uncomment the next two lines to use the Redis event broker instead
+# from apscheduler.eventbrokers.redis import RedisEventBroker
+# event_broker = RedisEventBroker.from_url("redis://localhost")
-logging.basicConfig(level=logging.INFO)
asyncio.run(main())
diff --git a/examples/separate_worker/sync_scheduler.py b/examples/separate_worker/sync_scheduler.py
index 45d337d..4699018 100644
--- a/examples/separate_worker/sync_scheduler.py
+++ b/examples/separate_worker/sync_scheduler.py
@@ -1,14 +1,12 @@
"""
-Example demonstrating the separation of scheduler and worker.
-This script runs the scheduler part. You need to be running both this and the worker
-script simultaneously in order for the scheduled task to be run.
+This is an example demonstrating the use of the scheduler as only an interface to the
+scheduling system. This script adds or updates a single schedule and then exits. To see
+the schedule acted on, you need to run the corresponding worker script (either
+async_worker.py or sync_worker.py).
-Requires the "postgresql" and "redis" services to be running.
-To install prerequisites: pip install sqlalchemy psycopg2 redis
+This script requires the "postgresql" service to be running.
+To install prerequisites: pip install sqlalchemy asyncpg
To run: python sync_scheduler.py
-
-When run together with sync_worker.py, it should print a line on the console
-on a one-second interval.
"""
from __future__ import annotations
@@ -16,23 +14,22 @@ from __future__ import annotations
import logging
from example_tasks import tick
-from sqlalchemy.future import create_engine
+from sqlalchemy.ext.asyncio import create_async_engine
-from apscheduler import SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
-from apscheduler.eventbrokers.redis import RedisEventBroker
+from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.interval import IntervalTrigger
logging.basicConfig(level=logging.INFO)
-engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb")
+engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
data_store = SQLAlchemyDataStore(engine)
-event_broker = RedisEventBroker.from_url("redis://localhost")
+event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
# Uncomment the next two lines to use the MQTT event broker instead
# from apscheduler.eventbrokers.mqtt import MQTTEventBroker
# event_broker = MQTTEventBroker()
-with Scheduler(data_store, event_broker, role=SchedulerRole.scheduler) as scheduler:
+with Scheduler(data_store, event_broker) as scheduler:
scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
- scheduler.run_until_stopped()
+ # Note: we don't actually start the scheduler here!
diff --git a/examples/separate_worker/sync_worker.py b/examples/separate_worker/sync_worker.py
index 24018ba..27d66b0 100644
--- a/examples/separate_worker/sync_worker.py
+++ b/examples/separate_worker/sync_worker.py
@@ -1,35 +1,33 @@
"""
-Example demonstrating a scheduler that only runs jobs but does not process schedules.
-You need to be running both this and the scheduler script simultaneously in order for
-the scheduled task to be run.
-
-Requires the "postgresql" and "redis" services to be running.
-To install prerequisites: pip install sqlalchemy psycopg2 redis
+This is an example demonstrating how to run a scheduler to process schedules added by
+another scheduler elsewhere. Prior to starting this script, you need to run the script
+(either async_scheduler.py or sync_scheduler.py) that adds or updates a schedule to the
+data store. This script will then pick up that schedule and start spawning jobs that
+will print a line on the console on one-second intervals.
+
+This script requires the "postgresql" service to be running.
+To install prerequisites: pip install sqlalchemy asyncpg
To run: python sync_worker.py
-
-When run together with sync_scheduler.py, it should print a line on the
-console on a one-second interval.
"""
from __future__ import annotations
import logging
-from sqlalchemy.future import create_engine
+from sqlalchemy.ext.asyncio import create_async_engine
-from apscheduler import SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
-from apscheduler.eventbrokers.redis import RedisEventBroker
+from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.schedulers.sync import Scheduler
logging.basicConfig(level=logging.INFO)
-engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb")
+engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
data_store = SQLAlchemyDataStore(engine)
-event_broker = RedisEventBroker.from_url("redis://localhost")
+event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
# Uncomment the next two lines to use the MQTT event broker instead
# from apscheduler.eventbrokers.mqtt import MQTTEventBroker
# event_broker = MQTTEventBroker()
-with Scheduler(data_store, event_broker, role=SchedulerRole.worker) as scheduler:
+with Scheduler(data_store, event_broker) as scheduler:
scheduler.run_until_stopped()