summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream/_scheduler/scheduler.py')
-rw-r--r--buildstream/_scheduler/scheduler.py49
1 files changed, 13 insertions, 36 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 176900b33..b7cd5356d 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -32,6 +32,11 @@ from .jobs import JobStatus, CacheSizeJob, CleanupJob
from .._profile import Topics, profile_start, profile_end
+import sys
+def debug(*args):
+ print("➤➤➤", *args, file=sys.stderr)
+
+
# A decent return code for Scheduler.run()
class SchedStatus():
SUCCESS = 0
@@ -429,42 +434,14 @@ class Scheduler():
# and process anything that is ready.
#
def _sched_queue_jobs(self):
- ready = []
- process_queues = True
-
- while self._queue_jobs and process_queues:
-
- # Pull elements forward through queues
- elements = []
- for queue in self.queues:
- queue.enqueue(elements)
- elements = list(queue.dequeue())
-
- # Kickoff whatever processes can be processed at this time
- #
- # We start by queuing from the last queue first, because
- # we want to give priority to queues later in the
- # scheduling process in the case that multiple queues
- # share the same token type.
- #
- # This avoids starvation situations where we dont move on
- # to fetch tasks for elements which failed to pull, and
- # thus need all the pulls to complete before ever starting
- # a build
- ready.extend(chain.from_iterable(
- q.harvest_jobs() for q in reversed(self.queues)
- ))
-
- # harvest_jobs() may have decided to skip some jobs, making
- # them eligible for promotion to the next queue as a side effect.
- #
- # If that happens, do another round.
- process_queues = any(q.dequeue_ready() for q in self.queues)
-
- # Spawn the jobs
- #
- for job in ready:
- self._spawn_job(job)
+ debug("Scheduling queues")
+ for queue in self.queues:
+ try:
+ while True:
+ job = queue.pop()
+ self._spawn_job(job)
+ except IndexError:
+ pass
# _sched()
#