summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2014-06-15 17:06:29 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2014-06-15 21:34:26 +0300
commit89ac8d637367ee62094433b17694162ea5118b17 (patch)
tree4ed64f87c5050241221d52a52208b94efee725a6
parent7fd9d4bdbc31e8daea240229f0385109b58659a0 (diff)
downloadapscheduler-89ac8d637367ee62094433b17694162ea5118b17.tar.gz
Separated the two pool executors to different classes and moved the debug executor to its own module
-rw-r--r--apscheduler/executors/debug.py15
-rw-r--r--apscheduler/executors/pool.py63
-rw-r--r--apscheduler/schedulers/base.py4
-rw-r--r--docs/modules/executors/debug.rst10
-rw-r--r--docs/modules/executors/pool.rst4
-rw-r--r--docs/userguide.rst36
-rw-r--r--examples/executors/processpool.py4
-rw-r--r--tests/test_executors.py18
-rw-r--r--tests/test_schedulers.py10
9 files changed, 96 insertions, 68 deletions
diff --git a/apscheduler/executors/debug.py b/apscheduler/executors/debug.py
new file mode 100644
index 0000000..f9e5959
--- /dev/null
+++ b/apscheduler/executors/debug.py
@@ -0,0 +1,15 @@
+import sys
+
+from apscheduler.executors.base import BaseExecutor, run_job
+
+
+class DebugExecutor(BaseExecutor):
+ """A special executor that executes the target callable directly instead of deferring it to a thread or process."""
+
+ def _do_submit_job(self, job, run_times):
+ try:
+ events = run_job(job, job._jobstore_alias, run_times, self._logger.name)
+ except:
+ self._run_job_error(job.id, *sys.exc_info())
+ else:
+ self._run_job_success(job.id, events)
diff --git a/apscheduler/executors/pool.py b/apscheduler/executors/pool.py
index f3353a5..38bb7a5 100644
--- a/apscheduler/executors/pool.py
+++ b/apscheduler/executors/pool.py
@@ -1,52 +1,43 @@
+from abc import abstractmethod
import concurrent.futures
from apscheduler.executors.base import BaseExecutor, run_job
-class DebugExecutor(concurrent.futures.Executor):
- """A special executor that executes the target callable directly instead of deferring it to a thread or process."""
+class BasePoolExecutor(BaseExecutor):
+ @abstractmethod
+ def __init__(self, pool):
+ super(BasePoolExecutor, self).__init__()
+ self._pool = pool
- def submit(self, fn, *args, **kwargs):
- f = concurrent.futures.Future()
- try:
- retval = fn(*args, **kwargs)
- except Exception as e:
- f.set_exception(e)
- else:
- f.set_result(retval)
+ def _do_submit_job(self, job, run_times):
+ f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
+ callback = lambda f: self._run_job_success(job.id, f.result())
+ f.add_done_callback(callback)
- return f
+ def shutdown(self, wait=True):
+ self._pool.shutdown(wait)
-class PoolExecutor(BaseExecutor):
+class ThreadPoolExecutor(BasePoolExecutor):
"""
- An executor that runs jobs in a concurrent.futures thread or process pool.
+ An executor that runs jobs in a concurrent.futures thread pool.
- :param str pool_type:
- The type of the pool to create:
-
- * ``thread``: create a thread pool
- * ``process``: create a process pool
- * ``debug``: run jobs directly in the calling thread
- :param max_workers: the size of the thread/process pool. Ignored for pool_type=debug.
+ :param max_workers: the maximum number of spawned threads.
"""
- def __init__(self, pool_type, max_workers=10):
- super(PoolExecutor, self).__init__()
+ def __init__(self, max_workers=10):
+ pool = concurrent.futures.ThreadPoolExecutor(max_workers)
+ super(ThreadPoolExecutor, self).__init__(pool)
- if pool_type == 'thread':
- self._pool = concurrent.futures.ThreadPoolExecutor(max_workers)
- elif pool_type == 'process':
- self._pool = concurrent.futures.ProcessPoolExecutor(max_workers)
- elif pool_type == 'debug':
- self._pool = DebugExecutor()
- else:
- raise ValueError('Unknown pool type: %s' % pool_type)
- def _do_submit_job(self, job, run_times):
- f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
- callback = lambda f: self._run_job_success(job.id, f.result())
- f.add_done_callback(callback)
+class ProcessPoolExecutor(BasePoolExecutor):
+ """
+ An executor that runs jobs in a concurrent.futures process pool.
- def shutdown(self, wait=True):
- self._pool.shutdown(wait)
+ :param max_workers: the maximum number of spawned processes.
+ """
+
+ def __init__(self, max_workers=10):
+ pool = concurrent.futures.ProcessPoolExecutor(max_workers)
+ super(ProcessPoolExecutor, self).__init__(pool)
diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py
index eb2b5b0..bae8e37 100644
--- a/apscheduler/schedulers/base.py
+++ b/apscheduler/schedulers/base.py
@@ -12,7 +12,7 @@ import six
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
-from apscheduler.executors.pool import PoolExecutor
+from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError, BaseJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.job import Job
@@ -581,7 +581,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
def _create_default_executor(self):
"""Creates a default executor store, specific to the particular scheduler type."""
- return PoolExecutor('thread')
+ return ThreadPoolExecutor()
def _create_default_jobstore(self):
"""Creates a default job store, specific to the particular scheduler type."""
diff --git a/docs/modules/executors/debug.rst b/docs/modules/executors/debug.rst
new file mode 100644
index 0000000..eadc94c
--- /dev/null
+++ b/docs/modules/executors/debug.rst
@@ -0,0 +1,10 @@
+:mod:`apscheduler.executors.debug`
+==================================
+
+.. automodule:: apscheduler.executors.debug
+
+Module Contents
+---------------
+
+.. autoclass:: DebugExecutor
+ :members:
diff --git a/docs/modules/executors/pool.rst b/docs/modules/executors/pool.rst
index 39bce1b..e1397cf 100644
--- a/docs/modules/executors/pool.rst
+++ b/docs/modules/executors/pool.rst
@@ -6,6 +6,8 @@
Module Contents
---------------
-.. autoclass:: PoolExecutor
+.. autoclass:: ThreadPoolExecutor
:members:
+.. autoclass:: ProcessPoolExecutor
+ :members:
diff --git a/docs/userguide.rst b/docs/userguide.rst
index 92764ce..385c4b5 100644
--- a/docs/userguide.rst
+++ b/docs/userguide.rst
@@ -89,10 +89,10 @@ If, however, you are in the position to choose freely, then
the recommended choice due to its strong data integrity protection.
Likewise, the choice of executors is usually made for you if you use one of the frameworks above.
-Otherwise, the default :class:`~apscheduler.executors.pool.PoolExecutor` should be good enough for most purposes.
-If your workload involves CPU intensive operations, you should configure your PoolExecutor to use process pooling
-instead of thread pooling to make use of multiple CPU cores. You can add a second PoolExecutor for this purpose, and
-only configure one of them for process pooling.
+Otherwise, the default :class:`~apscheduler.executors.pool.ThreadPoolExecutor` should be good enough for most purposes.
+If your workload involves CPU intensive operations, you should consider using
+:class:`~apscheduler.executors.pool.ProcessPoolExecutor` instead to make use of multiple CPU cores.
+You could even use both at once, adding the process pool executor as a secondary executor.
.. _scheduler-config:
@@ -118,8 +118,8 @@ Let's say you want to run BackgroundScheduler in your application with the defau
# Initialize the rest of the application here, or before the scheduler initialization
-This will get you a BackgroundScheduler with a MemoryJobStore named "default" and a PoolExecutor named "default" with a
-default maximum thread count of 10.
+This will get you a BackgroundScheduler with a MemoryJobStore named "default" and a ThreadPoolExecutor named "default"
+with a default maximum thread count of 10.
Now, suppose you want more. You want to have *two* job stores using *two* executors and you also want to tweak the
default values for new jobs and set a different timezone.
@@ -127,8 +127,8 @@ The following three examples are completely equivalent, and will get you:
* a MongoDBJobStore named "mongo"
* an SQLAlchemyJobStore named "default" (using SQLite)
-* a PoolExecutor using threads, named "default", with a worker count of 20
-* a PoolExecutor using subprocesses, named "processpool", with a worker count of 5
+* a ThreadPoolExecutor named "default", with a worker count of 20
+* a ProcessPoolExecutor named "processpool", with a worker count of 5
* UTC as the scheduler's timezone
* coalescing turned off for new jobs by default
* a default maximum instance limit of 3 for new jobs
@@ -140,7 +140,7 @@ Method 1::
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
- from apscheduler.executors.pool import PoolExecutor
+ from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
@@ -148,8 +148,8 @@ Method 1::
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
- 'default': PoolExecutor('thread', 20),
- 'processpool': PoolExecutor('process', 5)
+ 'default': ThreadPoolExecutor(20),
+ 'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
@@ -172,13 +172,11 @@ Method 2::
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
- 'class': 'apscheduler.executors.pool:PoolExecutor',
- 'type': 'thread',
+ 'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
- 'class': 'apscheduler.executors.pool:PoolExecutor',
- 'type': 'process',
+ 'class': 'apscheduler.executors.pool:ProcessPoolExecutor',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
@@ -193,7 +191,7 @@ Method 3::
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
- from apscheduler.executors.pool import PoolExecutor
+ from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
@@ -201,8 +199,8 @@ Method 3::
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
- 'default': PoolExecutor('thread', max_workers=20),
- 'processpool': PoolExecutor('process', max_workers=5)
+ 'default': ThreadPoolExecutor(max_workers=20),
+ 'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
@@ -253,7 +251,7 @@ requirements on your job:
#. Any arguments to the callable must be serializable
Of the builtin job stores, only MemoryJobStore doesn't serialize jobs.
-Of the builtin executors, only a PoolExecutor configured for process pooling will serialize jobs.
+Of the builtin executors, only ProcessPoolExecutor will serialize jobs.
.. important:: If you schedule jobs in a persistent job store during your application's initialization, you **MUST**
define an explicit ID for the job and use ``replace_existing=True`` or you will get a new copy of the job every time
diff --git a/examples/executors/processpool.py b/examples/executors/processpool.py
index b8988df..6bff793 100644
--- a/examples/executors/processpool.py
+++ b/examples/executors/processpool.py
@@ -6,7 +6,7 @@ from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.executors.pool import PoolExecutor
+from apscheduler.executors.pool import ProcessPoolExecutor
def tick():
@@ -15,7 +15,7 @@ def tick():
if __name__ == '__main__':
scheduler = BlockingScheduler()
- scheduler.add_executor(PoolExecutor('process'))
+ scheduler.add_executor(ProcessPoolExecutor())
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
diff --git a/tests/test_executors.py b/tests/test_executors.py
index a2048e4..c3c9ae3 100644
--- a/tests/test_executors.py
+++ b/tests/test_executors.py
@@ -3,7 +3,7 @@ import time
import pytest
from apscheduler.executors.base import MaxInstancesReachedError
-from apscheduler.executors.pool import PoolExecutor
+from apscheduler.executors.pool import BasePoolExecutor
try:
@@ -19,9 +19,21 @@ def mock_scheduler():
return scheduler_
-@pytest.fixture(params=['thread', 'process'])
+@pytest.fixture
+def threadpoolexecutor(request):
+ from apscheduler.executors.pool import ThreadPoolExecutor
+ return ThreadPoolExecutor()
+
+
+@pytest.fixture
+def processpoolexecutor(request):
+ from apscheduler.executors.pool import ProcessPoolExecutor
+ return ProcessPoolExecutor()
+
+
+@pytest.fixture(params=[threadpoolexecutor, processpoolexecutor], ids=['threadpool', 'processpool'])
def executor(request, mock_scheduler):
- executor_ = PoolExecutor(request.param)
+ executor_ = request.param(request)
executor_.start(mock_scheduler, 'dummy')
request.addfinalizer(executor_.shutdown)
return executor_
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 7197821..aa8a932 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -7,7 +7,7 @@ import pytest
import six
from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError
-from apscheduler.executors.pool import PoolExecutor
+from apscheduler.executors.debug import DebugExecutor
from apscheduler.job import Job
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.jobstores.memory import MemoryJobStore
@@ -254,7 +254,7 @@ class TestBaseScheduler(object):
@pytest.mark.parametrize('stopped', [True, False], ids=['stopped=True', 'stopped=False'])
def test_add_executor(self, scheduler, stopped):
scheduler._stopped = stopped
- executor = PoolExecutor('debug')
+ executor = DebugExecutor()
executor.start = MagicMock()
scheduler.add_executor(executor)
@@ -265,13 +265,13 @@ class TestBaseScheduler(object):
assert executor.start.call_count == 0
def test_add_executor_already_exists(self, scheduler):
- executor = PoolExecutor('debug')
+ executor = DebugExecutor()
scheduler.add_executor(executor)
exc = pytest.raises(KeyError, scheduler.add_executor, executor)
assert exc.value.message == 'This scheduler already has an executor by the alias of "default"'
def test_remove_executor(self, scheduler):
- scheduler.add_executor(PoolExecutor('debug'), 'foo')
+ scheduler.add_executor(DebugExecutor(), 'foo')
scheduler._dispatch_event = MagicMock()
scheduler.remove_executor('foo')
@@ -821,7 +821,7 @@ class TestProcessJobs(object):
class SchedulerImplementationTestBase(object):
@pytest.fixture(autouse=True)
def executor(self, scheduler):
- scheduler.add_executor(PoolExecutor('debug'))
+ scheduler.add_executor(DebugExecutor())
@pytest.fixture
def start_scheduler(self, request, scheduler):