summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-23 00:29:05 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-24 00:05:30 +0300
commit642d0ee75d7bd5b475cdbd86efcefec7d026dbb3 (patch)
tree246936c5aa962734d4a7bcd18437389c883256dd
parent631c78e7161cfe8457bf1121eb355e3a3d19c35d (diff)
downloadapscheduler-642d0ee75d7bd5b475cdbd86efcefec7d026dbb3.tar.gz
Fixed MongoDB data store retrying synchronously
-rw-r--r--src/apscheduler/datastores/mongodb.py32
1 files changed, 16 insertions, 16 deletions
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 4d47878..0846e14 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -132,7 +132,7 @@ class MongoDBDataStore(BaseExternalDataStore):
self._jobs_results.create_index("expires_at", session=session)
async def add_task(self, task: Task) -> None:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
previous = self._tasks.find_one_and_update(
{"_id": task.id},
@@ -149,7 +149,7 @@ class MongoDBDataStore(BaseExternalDataStore):
await self._event_broker.publish(TaskAdded(task_id=task.id))
async def remove_task(self, task_id: str) -> None:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
if not self._tasks.find_one_and_delete({"_id": task_id}):
raise TaskLookupError(task_id)
@@ -157,7 +157,7 @@ class MongoDBDataStore(BaseExternalDataStore):
await self._event_broker.publish(TaskRemoved(task_id=task_id))
async def get_task(self, task_id: str) -> Task:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
document = self._tasks.find_one(
{"_id": task_id}, projection=self._task_attrs
@@ -171,7 +171,7 @@ class MongoDBDataStore(BaseExternalDataStore):
return task
async def get_tasks(self) -> list[Task]:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
tasks: list[Task] = []
for document in self._tasks.find(
@@ -184,7 +184,7 @@ class MongoDBDataStore(BaseExternalDataStore):
async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
filters = {"_id": {"$in": list(ids)}} if ids is not None else {}
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
schedules: list[Schedule] = []
cursor = self._schedules.find(filters).sort("_id")
@@ -209,14 +209,14 @@ class MongoDBDataStore(BaseExternalDataStore):
document = schedule.marshal(self.serializer)
document["_id"] = document.pop("id")
try:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
self._schedules.insert_one(document)
except DuplicateKeyError:
if conflict_policy is ConflictPolicy.exception:
raise ConflictingIdError(schedule.id) from None
elif conflict_policy is ConflictPolicy.replace:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
self._schedules.replace_one(
{"_id": schedule.id}, document, True
@@ -234,7 +234,7 @@ class MongoDBDataStore(BaseExternalDataStore):
async def remove_schedules(self, ids: Iterable[str]) -> None:
filters = {"_id": {"$in": list(ids)}} if ids is not None else {}
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt, self.client.start_session() as session:
cursor = self._schedules.find(
filters, projection=["_id"], session=session
@@ -247,7 +247,7 @@ class MongoDBDataStore(BaseExternalDataStore):
await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id))
async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt, self.client.start_session() as session:
schedules: list[Schedule] = []
cursor = (
@@ -324,7 +324,7 @@ class MongoDBDataStore(BaseExternalDataStore):
finished_schedule_ids.append(schedule.id)
if requests:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt, self.client.start_session() as session:
self._schedules.bulk_write(
requests, ordered=False, session=session
@@ -340,7 +340,7 @@ class MongoDBDataStore(BaseExternalDataStore):
await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id))
async def get_next_schedule_run_time(self) -> datetime | None:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
document = self._schedules.find_one(
{"next_run_time": {"$ne": None}},
@@ -356,7 +356,7 @@ class MongoDBDataStore(BaseExternalDataStore):
async def add_job(self, job: Job) -> None:
document = job.marshal(self.serializer)
document["_id"] = document.pop("id")
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
self._jobs.insert_one(document)
@@ -370,7 +370,7 @@ class MongoDBDataStore(BaseExternalDataStore):
async def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:
filters = {"_id": {"$in": list(ids)}} if ids is not None else {}
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
jobs: list[Job] = []
cursor = self._jobs.find(filters).sort("_id")
@@ -389,7 +389,7 @@ class MongoDBDataStore(BaseExternalDataStore):
return jobs
async def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt, self.client.start_session() as session:
cursor = self._jobs.find(
{
@@ -466,7 +466,7 @@ class MongoDBDataStore(BaseExternalDataStore):
async def release_job(
self, worker_id: str, task_id: str, result: JobResult
) -> None:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt, self.client.start_session() as session:
# Record the job result
if result.expires_at > result.finished_at:
@@ -483,7 +483,7 @@ class MongoDBDataStore(BaseExternalDataStore):
self._jobs.delete_one({"_id": result.job_id}, session=session)
async def get_job_result(self, job_id: UUID) -> JobResult | None:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
document = self._jobs_results.find_one_and_delete({"_id": job_id})