summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2018-01-23 10:54:08 +0000
committerJürg Billeter <j@bitron.ch>2018-01-23 12:03:37 +0000
commit68cef134a22f1df01f0c33c024a99555319f86df (patch)
tree22997885b961876b08a435b43f0c9e25313b548a /buildstream/_scheduler
parent4df5d035f62ecee62d679323e4e770625957827c (diff)
downloadbuildstream-68cef134a22f1df01f0c33c024a99555319f86df.tar.gz
_scheduler: Replace skip() and ready() with status()
Queue sometimes called ready() before skip() and sometimes it didn't. This change consolidates the two methods to ensure we always determine the correct status.
Diffstat (limited to 'buildstream/_scheduler')
-rw-r--r--buildstream/_scheduler/__init__.py2
-rw-r--r--buildstream/_scheduler/buildqueue.py13
-rw-r--r--buildstream/_scheduler/fetchqueue.py11
-rw-r--r--buildstream/_scheduler/pullqueue.py14
-rw-r--r--buildstream/_scheduler/pushqueue.py9
-rw-r--r--buildstream/_scheduler/queue.py47
-rw-r--r--buildstream/_scheduler/trackqueue.py9
7 files changed, 59 insertions, 46 deletions
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index 1ad6ec79b..14cdebf8e 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -18,7 +18,7 @@
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
-from .queue import Queue, QueueType
+from .queue import Queue, QueueStatus, QueueType
from .fetchqueue import FetchQueue
from .trackqueue import TrackQueue
diff --git a/buildstream/_scheduler/buildqueue.py b/buildstream/_scheduler/buildqueue.py
index 6d1857313..74a7544c0 100644
--- a/buildstream/_scheduler/buildqueue.py
+++ b/buildstream/_scheduler/buildqueue.py
@@ -19,7 +19,7 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
-from . import Queue, QueueType
+from . import Queue, QueueStatus, QueueType
# A queue which assembles elements
@@ -34,14 +34,17 @@ class BuildQueue(Queue):
element._assemble()
return element._get_unique_id()
- def ready(self, element):
+ def status(self, element):
# state of dependencies may have changed, recalculate element state
element._update_state()
- return element._buildable()
+ if element._cached():
+ return QueueStatus.SKIP
- def skip(self, element):
- return element._cached()
+ if not element._buildable():
+ return QueueStatus.WAIT
+
+ return QueueStatus.READY
def done(self, element, result, returncode):
# Elements are cached after they are successfully assembled
diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/fetchqueue.py
index b284d5b35..ebc1fa17a 100644
--- a/buildstream/_scheduler/fetchqueue.py
+++ b/buildstream/_scheduler/fetchqueue.py
@@ -23,7 +23,7 @@
from .. import Consistency
# Local imports
-from . import Queue, QueueType
+from . import Queue, QueueStatus, QueueType
# A queue which fetches element sources
@@ -43,14 +43,17 @@ class FetchQueue(Queue):
for source in element.sources():
source._fetch()
- def skip(self, element):
+ def status(self, element):
# Optionally skip elements that are already in the artifact cache
if self.skip_cached and element._cached():
- return True
+ return QueueStatus.SKIP
# This will automatically skip elements which
# have no sources.
- return element._consistency() == Consistency.CACHED
+ if element._consistency() == Consistency.CACHED:
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
def done(self, element, result, returncode):
diff --git a/buildstream/_scheduler/pullqueue.py b/buildstream/_scheduler/pullqueue.py
index b8091e24d..3ab5ab7f4 100644
--- a/buildstream/_scheduler/pullqueue.py
+++ b/buildstream/_scheduler/pullqueue.py
@@ -23,7 +23,7 @@
from ..element import _KeyStrength
# Local imports
-from . import Queue, QueueType
+from . import Queue, QueueStatus, QueueType
# A queue which pulls element artifacts
@@ -38,20 +38,20 @@ class PullQueue(Queue):
# returns whether an artifact was downloaded or not
return element._pull()
- def skip(self, element):
+ def status(self, element):
if element._cached(strength=_KeyStrength.STRONG):
- return True
+ return QueueStatus.SKIP
elif element._remotely_cached(strength=_KeyStrength.STRONG):
# pull artifact using strong key
- return False
+ return QueueStatus.READY
elif element._cached():
- return True
+ return QueueStatus.SKIP
elif element._remotely_cached():
# pull artifact using weak key
- return False
+ return QueueStatus.READY
else:
# nothing to pull
- return True
+ return QueueStatus.SKIP
def done(self, element, result, returncode):
diff --git a/buildstream/_scheduler/pushqueue.py b/buildstream/_scheduler/pushqueue.py
index 6ffaf7949..6c7c95094 100644
--- a/buildstream/_scheduler/pushqueue.py
+++ b/buildstream/_scheduler/pushqueue.py
@@ -20,7 +20,7 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# Local imports
-from . import Queue, QueueType
+from . import Queue, QueueStatus, QueueType
# A queue which pushes element artifacts
@@ -35,8 +35,11 @@ class PushQueue(Queue):
# returns whether an artifact was uploaded or not
return element._push()
- def skip(self, element):
- return element._skip_push()
+ def status(self, element):
+ if element._skip_push():
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
def done(self, element, result, returncode):
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py
index 4642edaa7..46640677f 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queue.py
@@ -21,6 +21,7 @@
# System imports
from collections import deque
+from enum import Enum
# Local imports
from .job import Job
@@ -40,6 +41,20 @@ class QueueType():
PUSH = 3
+# Queue status for a given element
+#
+#
+class QueueStatus(Enum):
+ # The element is waiting for dependencies.
+ WAIT = 1
+
+ # The element can skip this queue.
+ SKIP = 2
+
+ # The element is ready for processing in this queue.
+ READY = 3
+
+
# Queue()
#
#
@@ -89,33 +104,18 @@ class Queue():
def process(self, element):
pass
- # ready()
- #
- # Abstract method for reporting whether an element
- # is ready for processing in this queue or not.
- #
- # Args:
- # element (Element): An element to process
- #
- # Returns:
- # (bool): Whether the element is ready for processing
- #
- def ready(self, element):
- return True
-
- # skip()
+ # status()
#
- # Abstract method for reporting whether an element
- # can be skipped for this phase.
+ # Abstract method for reporting the status of an element.
#
# Args:
# element (Element): An element to process
#
# Returns:
- # (bool): Whether the element can be skipped
+ # (QueueStatus): The element status
#
- def skip(self, element):
- return False
+ def status(self, element):
+ return QueueStatus.READY
# done()
#
@@ -149,7 +149,7 @@ class Queue():
# Place skipped elements directly on the done queue
elts = list(elts)
- skip = [elt for elt in elts if self.skip(elt)]
+ skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
wait = [elt for elt in elts if elt not in skip]
self.wait_queue.extend(wait)
@@ -167,11 +167,12 @@ class Queue():
while len(self.wait_queue) > 0 and scheduler.get_job_token(self.queue_type):
element = self.wait_queue.popleft()
- if not self.ready(element):
+ status = self.status(element)
+ if status == QueueStatus.WAIT:
scheduler.put_job_token(self.queue_type)
unready.append(element)
continue
- elif self.skip(element):
+ elif status == QueueStatus.SKIP:
scheduler.put_job_token(self.queue_type)
self.done_queue.append(element)
self.skipped_elements.append(element)
diff --git a/buildstream/_scheduler/trackqueue.py b/buildstream/_scheduler/trackqueue.py
index 3f35aff47..f78479c49 100644
--- a/buildstream/_scheduler/trackqueue.py
+++ b/buildstream/_scheduler/trackqueue.py
@@ -27,7 +27,7 @@ from ..plugin import _plugin_lookup
from .. import _yaml
# Local imports
-from . import Queue, QueueType
+from . import Queue, QueueStatus, QueueType
# A queue which tracks sources
@@ -45,9 +45,12 @@ class TrackQueue(Queue):
def process(self, element):
return element._track()
- def skip(self, element):
+ def status(self, element):
# We can skip elements entirely if they have no sources.
- return len(list(element.sources())) == 0
+ if len(list(element.sources())) == 0:
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
def done(self, element, result, returncode):