author      Benjamin Schubert <ben.c.schubert@gmail.com>    2019-03-22 18:39:28 +0000
committer   Benjamin Schubert <ben.c.schubert@gmail.com>    2019-03-22 18:39:28 +0000
commit      6d9fe61f57de76f3462a4d65e53810e4609312a2 (patch)
tree        666629b7e5e4a1681a46d0010074d9ceb04b515a
parent      5754ac64c01321b25aa84acbdafae4efb62db240 (diff)
download    buildstream-bschubert/refactor-pipeline.tar.gz
-rw-r--r--  buildstream/_pipeline.py                       6
-rw-r--r--  buildstream/_scheduler/queues/buildqueue.py   28
-rw-r--r--  buildstream/_scheduler/queues/fetchqueue.py   45
-rw-r--r--  buildstream/_scheduler/queues/queue.py       119
-rw-r--r--  buildstream/_scheduler/queues/trackqueue.py   11
-rw-r--r--  buildstream/_scheduler/scheduler.py           49
-rw-r--r--  buildstream/_stream.py                        28
-rw-r--r--  buildstream/element.py                        38
8 files changed, 153 insertions, 171 deletions
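
At a high level, this branch replaces the pull-based enqueue()/dequeue()/harvest_jobs()
queue machinery with a push-based model: each queue owns a priority queue keyed by a
per-element _pipeline_id, skips are decided eagerly at push time, and finished or skipped
elements are promoted directly into a next_queue. A minimal sketch of that flow — the
names mirror the hunks below, but the class itself is illustrative and not part of the
branch:

    import heapq

    class MiniQueue:
        def __init__(self, name, next_queue=None):
            self.name = name
            self._heap = []               # (priority, element) pairs
            self._next_queue = next_queue
            self.skipped = []

        def push(self, priority, element):
            heapq.heappush(self._heap, (priority, element))

        def pop(self):
            if not self._heap:
                raise IndexError()        # scheduler reads this as "nothing ready"
            return heapq.heappop(self._heap)[1]

        def skip(self, element):
            # A skipped element still flows onward through the pipeline.
            self.skipped.append(element)
            if self._next_queue is not None:
                self._next_queue.push(0, element)

        def done(self, element):
            # Successful completion also promotes to the next stage.
            if self._next_queue is not None:
                self._next_queue.push(0, element)

    build = MiniQueue("build")
    fetch = MiniQueue("fetch", next_queue=build)
    fetch.push(0, "base.bst")
    fetch.done(fetch.pop())               # fetched, now queued for building
    assert build.pop() == "base.bst"
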
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 5dec9001d..524eb5c97 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -31,7 +31,9 @@ from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from . import Scope, Consistency
from ._project import ProjectRefStorage
-
+import sys
+def debug(*args):
+ print("➤➤➤", *args, file=sys.stderr)
# PipelineSelection()
#
@@ -134,6 +136,7 @@ class Pipeline():
def resolve_elements(self, targets):
with self._context.timed_activity("Resolving cached state", silent_nested=True):
for element in self.dependencies(targets, Scope.ALL):
+ debug("Element: ", str(element))
# Preflight
element._preflight()
@@ -220,6 +223,7 @@ class Pipeline():
elements = list(self.dependencies(targets, scope))
+ debug("GET_SELECTION", mode, [str(e) for e in elements])
return elements
# except_elements():
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index aa489f381..a5712f911 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -66,22 +66,30 @@ class BuildQueue(Queue):
return super().enqueue(to_queue)
+    def push(self, element):
+        if element._cached_success():
+            self.skip(element)
+            return
+
+        assert element._buildable()
+
+        super().push(element)
+
def process(self, element):
return element._assemble()
- def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
+ # def status(self, element):
+ # if not element._is_required():
+ # # Artifact is not currently required but it may be requested later.
+ # # Keep it in the queue.
+ # return QueueStatus.WAIT
- if element._cached_success():
- return QueueStatus.SKIP
+ # if element._cached_success():
+ # return QueueStatus.SKIP
- if not element._buildable():
- return QueueStatus.WAIT
+ # if not element._buildable():
+ # return QueueStatus.WAIT
- return QueueStatus.READY
+ # return QueueStatus.READY
def _check_cache_size(self, job, element, artifact_size):
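
The commented-out status() above maps directly onto the new push() contract, and the
same translation recurs in the fetch and track queues below: WAIT becomes a precondition
the upstream queue must guarantee (hence the assert), SKIP becomes an eager skip() at
push time, and READY is the plain super().push(). A self-contained sketch of the
pattern — precondition() and should_skip() are hypothetical stand-ins for the per-queue
checks, not part of the branch:

    class PatternQueue:
        def __init__(self, next_queue=None):
            self.items = []               # stands in for the priority queue
            self.skipped = []
            self._next_queue = next_queue

        def precondition(self, element):
            return True                   # e.g. element._buildable()

        def should_skip(self, element):
            return False                  # e.g. element._cached_success()

        def skip(self, element):
            self.skipped.append(element)
            if self._next_queue is not None:
                self._next_queue.push(element)

        def push(self, element):
            assert self.precondition(element)   # old WAIT, now guaranteed upstream
            if self.should_skip(element):
                self.skip(element)              # old SKIP, handled eagerly
                return
            self.items.append(element)          # old READY
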
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 546c65b65..c49cbaf79 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -35,8 +35,8 @@ class FetchQueue(Queue):
complete_name = "Fetched"
resources = [ResourceType.DOWNLOAD]
- def __init__(self, scheduler, skip_cached=False, fetch_original=False):
- super().__init__(scheduler)
+ def __init__(self, scheduler, next_queue=None, skip_cached=False, fetch_original=False):
+ super().__init__(scheduler, next_queue)
self._skip_cached = skip_cached
self._fetch_original = fetch_original
@@ -44,27 +44,36 @@ class FetchQueue(Queue):
def process(self, element):
element._fetch(fetch_original=self._fetch_original)
- def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
+    def push(self, element):
+        assert element._is_required()
+        assert element._can_query_cache()
-        # Optionally skip elements that are already in the artifact cache
-        if self._skip_cached:
-            if not element._can_query_cache():
-                return QueueStatus.WAIT
+        if element._cached() or not element._should_fetch(self._fetch_original):
+            self.skip(element)
+            return
-        if element._cached():
-            return QueueStatus.SKIP
+        super().push(element)
- # This will automatically skip elements which
- # have no sources.
+ # def status(self, element):
+ # if not element._is_required():
+ # # Artifact is not currently required but it may be requested later.
+ # # Keep it in the queue.
+ # return QueueStatus.WAIT
- if not element._should_fetch(self._fetch_original):
- return QueueStatus.SKIP
+ # # Optionally skip elements that are already in the artifact cache
+ # if self._skip_cached:
+ # if not element._can_query_cache():
+ # return QueueStatus.WAIT
- return QueueStatus.READY
+ # if element._cached():
+ # return QueueStatus.SKIP
+
+ # # This will automatically skip elements which
+ # # have no sources.
+
+ # if not element._should_fetch(self._fetch_original):
+ # return QueueStatus.SKIP
+
+ # return QueueStatus.READY
def done(self, _, element, result, status):
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 1efcffc16..5ca80f823 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -31,7 +31,11 @@ from ..resources import ResourceType
# BuildStream toplevel imports
from ..._exceptions import BstError, set_last_task_error
from ..._message import Message, MessageType
+from ...types import _UniquePriorityQueue
+import sys
+def debug(*args):
+ print("➤➤➤", *args, file=sys.stderr)
# Queue status for a given element
#
@@ -59,7 +63,7 @@ class Queue():
complete_name = None
    resources = []                  # Resources this queue's jobs want
- def __init__(self, scheduler):
+ def __init__(self, scheduler, next_queue=None):
#
# Public members
@@ -73,10 +77,11 @@ class Queue():
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
- self._wait_queue = deque() # Ready / Waiting elements
- self._done_queue = deque() # Processed / Skipped elements
+ self._queue = _UniquePriorityQueue() # Ready / Waiting elements
self._max_retries = 0
+ self._next_queue = next_queue
+
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
@@ -101,7 +106,7 @@ class Queue():
#
#
def process(self, element):
- pass
+ raise NotImplementedError()
# status()
#
@@ -114,7 +119,7 @@ class Queue():
# (QueueStatus): The element status
#
def status(self, element):
- return QueueStatus.READY
+ raise NotImplementedError()
# done()
#
@@ -140,86 +145,34 @@ class Queue():
-    # Args:
-    #    elts (list): A list of Elements
+    # Args:
+    #    element (Element): The element to push
#
- def enqueue(self, elts):
- if not elts:
- return
-
- # Place skipped elements on the done queue right away.
- #
- # The remaining ready and waiting elements must remain in the
- # same queue, and ready status must be determined at the moment
- # which the scheduler is asking for the next job.
- #
- skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
- wait = [elt for elt in elts if elt not in skip]
-
- self.skipped_elements.extend(skip) # Public record of skipped elements
- self._done_queue.extend(skip) # Elements to be processed
- self._wait_queue.extend(wait) # Elements eligible to be dequeued
-
- # dequeue()
- #
- # A generator which dequeues the elements which
- # are ready to exit the queue.
- #
- # Yields:
- # (Element): Elements being dequeued
- #
- def dequeue(self):
- while self._done_queue:
- yield self._done_queue.popleft()
-
- # dequeue_ready()
- #
- # Reports whether any elements can be promoted to other queues
- #
- # Returns:
- # (bool): Whether there are elements ready
- #
- def dequeue_ready(self):
- return any(self._done_queue)
+ def push(self, element):
+ debug("Adding element", element, "to", self.action_name)
+ self._queue.push(element._pipeline_id, element)
- # harvest_jobs()
- #
- # Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and creating as many jobs for which resources
- # can be reserved.
- #
- # Returns:
- # ([Job]): A list of jobs which can be run now
- #
- def harvest_jobs(self):
- unready = []
- ready = []
+ def skip(self, element):
+ self.skipped_elements.append(element)
+ if self._next_queue:
+ self._next_queue.push(element)
- while self._wait_queue:
- if not self._resources.reserve(self.resources, peek=True):
- break
+ def pop(self):
+ debug("Popping", self.action_name, [(str(x[1]), x[1]._unique_id) for x in self._queue._heap])
- element = self._wait_queue.popleft()
- status = self.status(element)
+        # Check for emptiness before reserving, so an empty queue does
+        # not claim resource tokens it will never use.
+        if not self._queue or not self._resources.reserve(self.resources):
+            raise IndexError()
- if status == QueueStatus.WAIT:
- unready.append(element)
- elif status == QueueStatus.SKIP:
- self._done_queue.append(element)
- self.skipped_elements.append(element)
- else:
- reserved = self._resources.reserve(self.resources)
- assert reserved
- ready.append(element)
-
- self._wait_queue.extendleft(unready)
-
- return [
- ElementJob(self._scheduler, self.action_name,
- self._element_log_path(element),
- element=element, queue=self,
- action_cb=self.process,
- complete_cb=self._job_done,
- max_retries=self._max_retries)
- for element in ready
- ]
+        element = self._queue.pop()
+
+ return ElementJob(
+ self._scheduler,
+ self.action_name,
+ self._element_log_path(element),
+ element=element,
+ queue=self,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries,
+ )
#####################################################
# Private Methods #
@@ -301,8 +254,8 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
- # All elements get placed on the done queue for later processing.
- self._done_queue.append(element)
+ if self._next_queue:
+ self._next_queue.push(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED or job.get_terminated():
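
queue.py now depends on _UniquePriorityQueue from buildstream's types module. The hunks
only exercise a small surface — push(key, item), pop(), truthiness, and a _heap of
(key, element) tuples (hence the x[1] indexing in the debug line). A minimal stand-in
with that interface, assuming push() deduplicates on the key and pop() returns the item
with the smallest key:

    import heapq

    class UniquePriorityQueue:
        def __init__(self):
            self._heap = []               # (key, element) tuples, smallest key first
            self._keys = set()

        def push(self, key, item):
            # Duplicate pushes of the same key are ignored.
            if key not in self._keys:
                self._keys.add(key)
                heapq.heappush(self._heap, (key, item))

        def pop(self):
            key, item = heapq.heappop(self._heap)   # IndexError when empty
            self._keys.remove(key)
            return item

        def __len__(self):
            return len(self._heap)
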
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index d7e6546f3..ae4106f00 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -38,15 +38,12 @@ class TrackQueue(Queue):
def process(self, element):
return element._track()
- def status(self, element):
- # We can skip elements entirely if they have no sources.
- if not list(element.sources()):
-
- # But we still have to mark them as tracked
+ def push(self, element):
+ if not any(element.sources()):
element._tracking_done()
- return QueueStatus.SKIP
+            self.skip(element)
+            return
- return QueueStatus.READY
+ super().push(element)
def done(self, _, element, result, status):
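
A detail in push(): any(element.sources()) replaces list(element.sources()) as the
emptiness test, consuming at most one item from the generator instead of materializing
it all. Note that any() tests item truthiness rather than mere presence, so this relies
on Source objects always being truthy (Python objects are, unless they define
__bool__ or __len__). A quick check:

    def sources():
        yield "patch.diff"

    assert any(sources())                 # at least one source: track it
    assert not any(iter(()))              # no sources: mark tracked and skip
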
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 176900b33..b7cd5356d 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -32,6 +32,11 @@ from .jobs import JobStatus, CacheSizeJob, CleanupJob
from .._profile import Topics, profile_start, profile_end
+import sys
+def debug(*args):
+ print("➤➤➤", *args, file=sys.stderr)
+
+
# A decent return code for Scheduler.run()
class SchedStatus():
SUCCESS = 0
@@ -429,42 +434,14 @@ class Scheduler():
# and process anything that is ready.
#
def _sched_queue_jobs(self):
- ready = []
- process_queues = True
-
- while self._queue_jobs and process_queues:
-
- # Pull elements forward through queues
- elements = []
- for queue in self.queues:
- queue.enqueue(elements)
- elements = list(queue.dequeue())
-
- # Kickoff whatever processes can be processed at this time
- #
- # We start by queuing from the last queue first, because
- # we want to give priority to queues later in the
- # scheduling process in the case that multiple queues
- # share the same token type.
- #
-        # This avoids starvation situations where we don't move on
- # to fetch tasks for elements which failed to pull, and
- # thus need all the pulls to complete before ever starting
- # a build
- ready.extend(chain.from_iterable(
- q.harvest_jobs() for q in reversed(self.queues)
- ))
-
- # harvest_jobs() may have decided to skip some jobs, making
- # them eligible for promotion to the next queue as a side effect.
- #
- # If that happens, do another round.
- process_queues = any(q.dequeue_ready() for q in self.queues)
-
- # Spawn the jobs
- #
- for job in ready:
- self._spawn_job(job)
+ debug("Scheduling queues")
+ for queue in self.queues:
+ try:
+ while True:
+ job = queue.pop()
+ self._spawn_job(job)
+ except IndexError:
+ pass
# _sched()
#
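
pop() doubles as the readiness check here: it raises IndexError both when the queue is
empty and when resource tokens cannot be reserved, so the scheduler can drain every
queue with a bare loop. The control flow, condensed — spawn stands in for _spawn_job:

    def schedule_ready_jobs(queues, spawn):
        # Drain each queue until it signals "nothing ready" via IndexError,
        # either because it is empty or resources could not be reserved.
        for queue in queues:
            while True:
                try:
                    job = queue.pop()
                except IndexError:
                    break
                spawn(job)

One observable difference: the removed code iterated reversed(self.queues) precisely to
avoid starvation between queues sharing a token type, while the new loop drains
front-to-back, so that concern now rests on the per-queue priorities instead.
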
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 262b38852..87c2d172f 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -41,6 +41,9 @@ from .types import _KeyStrength
from . import utils, _yaml, _site
from . import Scope, Consistency
+import sys
+def debug(*args):
+ print("➤➤➤", *args, file=sys.stderr)
# Stream()
#
@@ -152,6 +155,7 @@ class Stream():
isolate=False,
command=None,
usebuildtree=None):
+ raise NotImplementedError()
# Assert we have everything we need built, unless the directory is specified
# in which case we just blindly trust the directory, using the element
@@ -258,8 +262,10 @@ class Stream():
if self._artifacts.has_fetch_remotes():
self._add_queue(PullQueue(self._scheduler))
- self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
- self._add_queue(BuildQueue(self._scheduler))
+ build_queue = BuildQueue(self._scheduler)
+
+ self._add_queue(FetchQueue(self._scheduler, next_queue=build_queue, skip_cached=True))
+ self._add_queue(build_queue)
if self._artifacts.has_push_remotes():
self._add_queue(PushQueue(self._scheduler))
@@ -359,7 +365,6 @@ class Stream():
selection=PipelineSelection.NONE,
ignore_junction_targets=False,
remote=None):
-
use_config = True
if remote:
use_config = False
@@ -400,6 +405,7 @@ class Stream():
selection=PipelineSelection.NONE,
ignore_junction_targets=False,
remote=None):
+ raise NotImplementedError()
use_config = True
if remote:
@@ -1022,6 +1028,8 @@ class Stream():
rewritable=rewritable,
fetch_subprojects=fetch_subprojects)
+ debug("Elements order", [str(e) for e in elements])
+
# Obtain the ArtifactElement objects
artifacts = [self._project.create_artifact_element(ref) for ref in target_artifacts]
@@ -1093,6 +1101,8 @@ class Stream():
selected,
except_elements)
+ debug("SELECTED", [str(e) for e in selected])
+
# Set the "required" artifacts that should not be removed
# while this pipeline is active
#
@@ -1107,6 +1117,10 @@ class Stream():
itertools.chain.from_iterable(
[element.sources() for element in required_elements()]))
+ counter = itertools.count()
+ for element in selected:
+ element._pipeline_id = next(counter)
+ debug("Setting id to", element._pipeline_id, "for", element)
if selection == PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
# others are requested dynamically as needed.
@@ -1153,7 +1167,10 @@ class Stream():
def _enqueue_plan(self, plan, *, queue=None):
queue = queue or self._first_non_track_queue
- queue.enqueue(plan)
+ debug("Plan is: ", [str(e) for e in plan])
+
+ for element in plan:
+ queue.push(element)
self.session_elements += plan
# _run()
@@ -1188,10 +1205,11 @@ class Stream():
# fetch_original (Bool): Whether to fetch original unstaged
#
def _fetch(self, elements, *, track_elements=None, fetch_original=False):
-
if track_elements is None:
track_elements = []
+ debug("FETCH", [str(e) for e in elements], [str(e) for e in track_elements])
+
# Subtract the track elements from the fetch elements, they will be added separately
fetch_plan = self._pipeline.subtract_elements(elements, track_elements)
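
The _pipeline_id assigned in the selection loop above is what _UniquePriorityQueue
orders on. Assuming the selection is dependency-ordered (dependencies before the
elements that depend on them), as BuildStream's dependency traversal yields it, lower
ids mean deeper elements, so pop() prefers them. A toy illustration with a hand-ordered
list:

    import itertools

    selected = ["base.bst", "lib.bst", "app.bst"]    # dependency order
    counter = itertools.count()
    ids = {name: next(counter) for name in selected}
    assert ids["base.bst"] < ids["app.bst"]          # dependencies pop first
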
diff --git a/buildstream/element.py b/buildstream/element.py
index f1f0273f6..9d728bfd6 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -963,9 +963,36 @@ class Element(Plugin):
if meta in cls.__instantiated_elements:
return cls.__instantiated_elements[meta]
+ build_deps = [
+ Element._new_from_meta(meta_dep)
+ for meta_dep in meta.build_dependencies
+ ]
+
+ runtime_deps = [
+ Element._new_from_meta(meta_dep)
+ for meta_dep in meta.dependencies
+ ]
+
+ # # Instantiate dependencies
+ # for meta_dep in meta.dependencies:
+ # dependency = Element._new_from_meta(meta_dep)
+ # element.__runtime_dependencies.append(dependency)
+ # dependency.__reverse_dependencies.add(element)
+
+ # for meta_dep in meta.build_dependencies:
+ # dependency = Element._new_from_meta(meta_dep)
+ # element.__build_dependencies.append(dependency)
+ # dependency.__reverse_dependencies.add(element)
+
element = meta.project.create_element(meta, first_pass=meta.first_pass)
cls.__instantiated_elements[meta] = element
+ element.__runtime_dependencies = runtime_deps
+ element.__build_dependencies = build_deps
+
+ for dep in chain(build_deps, runtime_deps):
+ dep.__reverse_dependencies.add(element)
+
# Instantiate sources and generate their keys
for meta_source in meta.sources:
meta_source.first_pass = meta.kind == "junction"
@@ -979,17 +1006,6 @@ class Element(Plugin):
if redundant_ref is not None:
cls.__redundant_source_refs.append((source, redundant_ref))
- # Instantiate dependencies
- for meta_dep in meta.dependencies:
- dependency = Element._new_from_meta(meta_dep)
- element.__runtime_dependencies.append(dependency)
- dependency.__reverse_dependencies.add(element)
-
- for meta_dep in meta.build_dependencies:
- dependency = Element._new_from_meta(meta_dep)
- element.__build_dependencies.append(dependency)
- dependency.__reverse_dependencies.add(element)
-
return element
# _clear_meta_elements_cache()
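
The element.py hunk reorders instantiation: dependencies are now created before the
element itself, rather than after it has been cached. A reduced model of the memoized
depth-first recursion — Meta and Elem are illustrative stand-ins, and the sketch assumes
the metadata graph is acyclic, as the loader is expected to have rejected circular
dependencies earlier:

    class Meta:
        def __init__(self, name, build_deps=(), runtime_deps=()):
            self.name = name
            self.build_dependencies = list(build_deps)
            self.dependencies = list(runtime_deps)

    class Elem:
        def __init__(self, name):
            self.name = name
            self.build_dependencies = []
            self.runtime_dependencies = []
            self.reverse_dependencies = set()

    _instantiated = {}

    def new_from_meta(meta):
        if meta in _instantiated:
            return _instantiated[meta]
        # Depth-first: dependencies exist before their parent is created.
        build_deps = [new_from_meta(m) for m in meta.build_dependencies]
        runtime_deps = [new_from_meta(m) for m in meta.dependencies]
        element = Elem(meta.name)
        _instantiated[meta] = element
        element.build_dependencies = build_deps
        element.runtime_dependencies = runtime_deps
        for dep in build_deps + runtime_deps:
            dep.reverse_dependencies.add(element)
        return element

    base = Meta("base.bst")
    app = Meta("app.bst", build_deps=[base])
    new_from_meta(app)
    assert new_from_meta(app) in _instantiated[base].reverse_dependencies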