summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhil Dawson <phil.dawson@codethink.co.uk>2018-11-21 09:10:05 +0000
committerPhil Dawson <phil.dawson@codethink.co.uk>2018-11-21 09:10:05 +0000
commit34395d069f1fe78c0d50af6255d2a62c68fbde95 (patch)
tree7ba181f62b7eef3248dd35c1554b8f77dc93c227
parentea2de561bb06683f9c2742e2ba46386c17788563 (diff)
downloadbuildstream-phil/712-priority-queue.tar.gz
WIP: use priority queue in schedularphil/712-priority-queue
-rw-r--r--buildstream/_scheduler/jobs/cachesizejob.py3
-rw-r--r--buildstream/_scheduler/jobs/cleanupjob.py3
-rw-r--r--buildstream/_scheduler/jobs/elementjob.py10
-rw-r--r--buildstream/_scheduler/jobs/job.py4
-rw-r--r--buildstream/_scheduler/scheduler.py5
-rwxr-xr-xsetup.py1
6 files changed, 24 insertions, 2 deletions
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index d46fd4c16..17ccb0a36 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -39,3 +39,6 @@ class CacheSizeJob(Job):
def child_process_data(self):
return {}
+
+ def key(self):
+ return (100, 'cache-size')
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index 8bdbba0ed..3e860e28c 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -32,3 +32,6 @@ class CleanupJob(Job):
def parent_complete(self, success, result):
if success:
self._artifacts.set_cache_size(result)
+
+ def key(self):
+ return (0, 'cleanup')
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 8ce5c062f..1d6305cc5 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -22,6 +22,13 @@ from ..._message import Message, MessageType
from .job import Job
+_ACTIONS = {
+ "Build": 10,
+ "Fetch": 20,
+ "Pull": 30,
+ "Push": 40,
+ "Track": 50,
+}
# ElementJob()
#
@@ -113,3 +120,6 @@ class ElementJob(Job):
data['workspace'] = workspace.to_dict()
return data
+
+ def key(self):
+ return (_ACTIONS.get(self.action_name, 100), self._element.name)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 60ae0d001..ff0ea8340 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -353,6 +353,10 @@ class Job():
def child_process_data(self):
return {}
+ def key(self):
+ raise ImplError("Job '{kind}' does not implement key()"
+ .format(kind=type(self).__name__))
+
#######################################################
# Local Private Methods #
#######################################################
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c7308e..6b849f359 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -25,6 +25,7 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
+from sortedcontainers import SortedList
# Local imports
from .resources import Resources, ResourceType
@@ -72,7 +73,7 @@ class Scheduler():
# Public members
#
self.active_jobs = [] # Jobs currently being run in the scheduler
- self.waiting_jobs = [] # Jobs waiting for resources
+ self.waiting_jobs = SortedList([], key=lambda job: job.key()) # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -222,7 +223,7 @@ class Scheduler():
#
def schedule_jobs(self, jobs):
for job in jobs:
- self.waiting_jobs.append(job)
+ self.waiting_jobs.add(job)
# job_completed():
#
diff --git a/setup.py b/setup.py
index 76610f0ef..95d920406 100755
--- a/setup.py
+++ b/setup.py
@@ -343,6 +343,7 @@ setup(name='BuildStream',
'jinja2 >= 2.10',
'protobuf >= 3.5',
'grpcio >= 1.10',
+ 'sortedcontainers >= 1.5.7',
],
entry_points=bst_install_entry_points,
tests_require=dev_requires,