summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-01-17 18:08:52 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-01-17 18:08:52 +0200
commit616b2e60a03984e059bcbfc8200a9f4a9ce75457 (patch)
treef8be6f3c50bee22995ad29a42f0bcf23b70840fb
parenta114d56baa9acbe4f4113a4eef7040619bd2e4be (diff)
downloadapscheduler-616b2e60a03984e059bcbfc8200a9f4a9ce75457.tar.gz
Fixed broken process pool executor issue
Fixes #362.
-rw-r--r--apscheduler/executors/pool.py9
-rw-r--r--docs/versionhistory.rst2
-rw-r--r--tests/test_executors.py34
3 files changed, 43 insertions, 2 deletions
diff --git a/apscheduler/executors/pool.py b/apscheduler/executors/pool.py
index 2f4ef45..302d4bd 100644
--- a/apscheduler/executors/pool.py
+++ b/apscheduler/executors/pool.py
@@ -1,4 +1,5 @@
from abc import abstractmethod
+from concurrent.futures.process import BrokenProcessPool
import concurrent.futures
from apscheduler.executors.base import BaseExecutor, run_job
@@ -19,7 +20,13 @@ class BasePoolExecutor(BaseExecutor):
else:
self._run_job_success(job.id, f.result())
- f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
+ try:
+ f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
+ except BrokenProcessPool:
+ self._logger.warning('Process pool is broken; replacing pool with a fresh instance')
+ self._pool = self._pool.__class__(self._pool._max_workers)
+ f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
+
f.add_done_callback(callback)
def shutdown(self, wait=True):
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 772b520..8cca199 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -29,6 +29,8 @@ APScheduler, see the :doc:`migration section <migration>`.
one search condition
* Fixed a problem where bound methods added as jobs via textual references were called with an
unwanted extra ``self`` argument (PR by Pengjie Song)
+* Fixed ``BrokenPoolError`` in ``ProcessPoolExecutor`` so that it will automatically replace the
+ broken pool with a fresh instance
3.6.3
diff --git a/tests/test_executors.py b/tests/test_executors.py
index 16defd2..dbc8b36 100644
--- a/tests/test_executors.py
+++ b/tests/test_executors.py
@@ -2,14 +2,18 @@ from datetime import datetime
from threading import Event
from types import TracebackType
import gc
+import os
+import signal
import time
import pytest
from pytz import UTC
-from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED
+from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
from apscheduler.executors.base import MaxInstancesReachedError, run_job
+from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.job import Job
+from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import BaseScheduler
try:
@@ -144,3 +148,31 @@ def test_run_job_memory_leak():
foos = [x for x in gc.get_objects() if type(x) is FooBar]
assert len(foos) == 0
+
+
+def test_broken_pool():
+ def listener(evt):
+ nonlocal pid
+ pid = evt.retval
+ event.set()
+
+ pid = None
+ event = Event()
+ scheduler = BackgroundScheduler(executors={'default': ProcessPoolExecutor(1)})
+ scheduler.add_listener(listener, EVENT_JOB_EXECUTED)
+ scheduler.add_job(os.getpid, 'date', run_date=datetime.now(UTC))
+ scheduler.start()
+
+ event.wait(3)
+ killed_pid = pid
+ os.kill(pid, signal.SIGTERM)
+ try:
+ os.waitpid(pid, 0)
+ except OSError:
+ pass
+
+ event.clear()
+ scheduler.add_job(os.getpid, 'date', run_date=datetime.now(UTC))
+ event.wait(3)
+ assert pid != killed_pid
+ scheduler.shutdown(True)