summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-24 00:05:17 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-24 00:05:31 +0300
commit7c7faaa69f9b84945f9eb02ce09af65c3b9d6a15 (patch)
tree52b3d464ce68b4ed7611b1e0da177d742f067b96
parent642d0ee75d7bd5b475cdbd86efcefec7d026dbb3 (diff)
downloadapscheduler-7c7faaa69f9b84945f9eb02ce09af65c3b9d6a15.tar.gz
Fixed MongoDB data store running blocking operations on the event loop thread
-rw-r--r--src/apscheduler/datastores/mongodb.py59
1 files changed, 35 insertions, 24 deletions
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 0846e14..1846353 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -9,6 +9,7 @@ from uuid import UUID
import attrs
import pymongo
+from anyio import to_thread
from attrs.validators import instance_of
from bson import CodecOptions, UuidRepresentation
from bson.codec_options import TypeEncoder, TypeRegistry
@@ -105,11 +106,26 @@ class MongoDBDataStore(BaseExternalDataStore):
client = MongoClient(uri)
return cls(client, **options)
+ def _initialize(self) -> None:
+ with self.client.start_session() as session:
+ if self.start_from_scratch:
+ self._tasks.delete_many({}, session=session)
+ self._schedules.delete_many({}, session=session)
+ self._jobs.delete_many({}, session=session)
+ self._jobs_results.delete_many({}, session=session)
+
+ self._schedules.create_index("next_fire_time", session=session)
+ self._jobs.create_index("task_id", session=session)
+ self._jobs.create_index("created_at", session=session)
+ self._jobs.create_index("tags", session=session)
+ self._jobs_results.create_index("finished_at", session=session)
+ self._jobs_results.create_index("expires_at", session=session)
+
async def start(
self, exit_stack: AsyncExitStack, event_broker: EventBroker
) -> None:
await super().start(exit_stack, event_broker)
- server_info = self.client.server_info()
+ server_info = await to_thread.run_sync(self.client.server_info)
if server_info["versionArray"] < [4, 0]:
raise RuntimeError(
f"MongoDB server must be at least v4.0; current version = "
@@ -117,30 +133,21 @@ class MongoDBDataStore(BaseExternalDataStore):
)
async for attempt in self._retry():
- with attempt, self.client.start_session() as session:
- if self.start_from_scratch:
- self._tasks.delete_many({}, session=session)
- self._schedules.delete_many({}, session=session)
- self._jobs.delete_many({}, session=session)
- self._jobs_results.delete_many({}, session=session)
-
- self._schedules.create_index("next_fire_time", session=session)
- self._jobs.create_index("task_id", session=session)
- self._jobs.create_index("created_at", session=session)
- self._jobs.create_index("tags", session=session)
- self._jobs_results.create_index("finished_at", session=session)
- self._jobs_results.create_index("expires_at", session=session)
+ with attempt:
+ await to_thread.run_sync(self._initialize)
async def add_task(self, task: Task) -> None:
async for attempt in self._retry():
with attempt:
- previous = self._tasks.find_one_and_update(
- {"_id": task.id},
- {
- "$set": task.marshal(self.serializer),
- "$setOnInsert": {"running_jobs": 0},
- },
- upsert=True,
+ previous = await to_thread.run_sync(
+ lambda: self._tasks.find_one_and_update(
+ {"_id": task.id},
+ {
+ "$set": task.marshal(self.serializer),
+ "$setOnInsert": {"running_jobs": 0},
+ },
+ upsert=True,
+ )
)
if previous:
@@ -151,7 +158,9 @@ class MongoDBDataStore(BaseExternalDataStore):
async def remove_task(self, task_id: str) -> None:
async for attempt in self._retry():
with attempt:
- if not self._tasks.find_one_and_delete({"_id": task_id}):
+ if not await to_thread.run_sync(
+ self._tasks.find_one_and_delete, {"_id": task_id}
+ ):
raise TaskLookupError(task_id)
await self._event_broker.publish(TaskRemoved(task_id=task_id))
@@ -159,8 +168,10 @@ class MongoDBDataStore(BaseExternalDataStore):
async def get_task(self, task_id: str) -> Task:
async for attempt in self._retry():
with attempt:
- document = self._tasks.find_one(
- {"_id": task_id}, projection=self._task_attrs
+ document = await to_thread.run_sync(
+ lambda: self._tasks.find_one(
+ {"_id": task_id}, projection=self._task_attrs
+ )
)
if not document: