From 6d9fe61f57de76f3462a4d65e53810e4609312a2 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Fri, 22 Mar 2019 18:39:28 +0000 Subject: WIP --- buildstream/_pipeline.py | 6 +- buildstream/_scheduler/queues/buildqueue.py | 28 ++++--- buildstream/_scheduler/queues/fetchqueue.py | 45 ++++++----- buildstream/_scheduler/queues/queue.py | 119 +++++++++------------------- buildstream/_scheduler/queues/trackqueue.py | 11 +-- buildstream/_scheduler/scheduler.py | 49 +++--------- buildstream/_stream.py | 28 +++++-- buildstream/element.py | 38 ++++++--- 8 files changed, 153 insertions(+), 171 deletions(-) diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 5dec9001d..524eb5c97 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -31,7 +31,9 @@ from ._message import Message, MessageType from ._profile import Topics, profile_start, profile_end from . import Scope, Consistency from ._project import ProjectRefStorage - +import sys +def debug(*args): + print("➤➤➤", *args, file=sys.stderr) # PipelineSelection() # @@ -134,6 +136,7 @@ class Pipeline(): def resolve_elements(self, targets): with self._context.timed_activity("Resolving cached state", silent_nested=True): for element in self.dependencies(targets, Scope.ALL): + debug("Element: ", str(element)) # Preflight element._preflight() @@ -220,6 +223,7 @@ class Pipeline(): elements = list(self.dependencies(targets, scope)) + debug("GET_SELECTION", mode, [str(e) for e in elements]) return elements # except_elements(): diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index aa489f381..a5712f911 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -66,22 +66,30 @@ class BuildQueue(Queue): return super().enqueue(to_queue) + def push(self, element): + if element._cached_success(): + self.skip(element) + + assert element._buildable() + + super().push(element) + def process(self, element): return element._assemble() - def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT + # def status(self, element): + # if not element._is_required(): + # # Artifact is not currently required but it may be requested later. + # # Keep it in the queue. + # return QueueStatus.WAIT - if element._cached_success(): - return QueueStatus.SKIP + # if element._cached_success(): + # return QueueStatus.SKIP - if not element._buildable(): - return QueueStatus.WAIT + # if not element._buildable(): + # return QueueStatus.WAIT - return QueueStatus.READY + # return QueueStatus.READY def _check_cache_size(self, job, element, artifact_size): diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 546c65b65..c49cbaf79 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -35,8 +35,8 @@ class FetchQueue(Queue): complete_name = "Fetched" resources = [ResourceType.DOWNLOAD] - def __init__(self, scheduler, skip_cached=False, fetch_original=False): - super().__init__(scheduler) + def __init__(self, scheduler, next_queue=None, skip_cached=False, fetch_original=False): + super().__init__(scheduler, next_queue) self._skip_cached = skip_cached self._fetch_original = fetch_original @@ -44,27 +44,36 @@ class FetchQueue(Queue): def process(self, element): element._fetch(fetch_original=self._fetch_original) - def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT + def push(self, element): + assert element._is_required() + assert element._can_query_cache() - # Optionally skip elements that are already in the artifact cache - if self._skip_cached: - if not element._can_query_cache(): - return QueueStatus.WAIT + if element._cached() or not element._should_fetch(self._fetch_original): + self.skip(element) - if element._cached(): - return QueueStatus.SKIP + super().push(element) - # This will automatically skip elements which - # have no sources. + # def status(self, element): + # if not element._is_required(): + # # Artifact is not currently required but it may be requested later. + # # Keep it in the queue. + # return QueueStatus.WAIT - if not element._should_fetch(self._fetch_original): - return QueueStatus.SKIP + # # Optionally skip elements that are already in the artifact cache + # if self._skip_cached: + # if not element._can_query_cache(): + # return QueueStatus.WAIT - return QueueStatus.READY + # if element._cached(): + # return QueueStatus.SKIP + + # # This will automatically skip elements which + # # have no sources. + + # if not element._should_fetch(self._fetch_original): + # return QueueStatus.SKIP + + # return QueueStatus.READY def done(self, _, element, result, status): diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 1efcffc16..5ca80f823 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -31,7 +31,11 @@ from ..resources import ResourceType # BuildStream toplevel imports from ..._exceptions import BstError, set_last_task_error from ..._message import Message, MessageType +from ...types import _UniquePriorityQueue +import sys +def debug(*args): + print("➤➤➤", *args, file=sys.stderr) # Queue status for a given element # @@ -59,7 +63,7 @@ class Queue(): complete_name = None resources = [] # Resources this queues' jobs want - def __init__(self, scheduler): + def __init__(self, scheduler, next_queue=None): # # Public members @@ -73,10 +77,11 @@ class Queue(): # self._scheduler = scheduler self._resources = scheduler.resources # Shared resource pool - self._wait_queue = deque() # Ready / Waiting elements - self._done_queue = deque() # Processed / Skipped elements + self._queue = _UniquePriorityQueue() # Ready / Waiting elements self._max_retries = 0 + self._next_queue = next_queue + # Assert the subclass has setup class data assert self.action_name is not None assert self.complete_name is not None @@ -101,7 +106,7 @@ class Queue(): # # def process(self, element): - pass + raise NotImplementedError() # status() # @@ -114,7 +119,7 @@ class Queue(): # (QueueStatus): The element status # def status(self, element): - return QueueStatus.READY + raise NotImplementedError() # done() # @@ -140,86 +145,34 @@ class Queue(): # Args: # elts (list): A list of Elements # - def enqueue(self, elts): - if not elts: - return - - # Place skipped elements on the done queue right away. - # - # The remaining ready and waiting elements must remain in the - # same queue, and ready status must be determined at the moment - # which the scheduler is asking for the next job. - # - skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] - wait = [elt for elt in elts if elt not in skip] - - self.skipped_elements.extend(skip) # Public record of skipped elements - self._done_queue.extend(skip) # Elements to be processed - self._wait_queue.extend(wait) # Elements eligible to be dequeued - - # dequeue() - # - # A generator which dequeues the elements which - # are ready to exit the queue. - # - # Yields: - # (Element): Elements being dequeued - # - def dequeue(self): - while self._done_queue: - yield self._done_queue.popleft() - - # dequeue_ready() - # - # Reports whether any elements can be promoted to other queues - # - # Returns: - # (bool): Whether there are elements ready - # - def dequeue_ready(self): - return any(self._done_queue) + def push(self, element): + debug("Adding element", element, "to", self.action_name) + self._queue.push(element._pipeline_id, element) - # harvest_jobs() - # - # Process elements in the queue, moving elements which were enqueued - # into the dequeue pool, and creating as many jobs for which resources - # can be reserved. - # - # Returns: - # ([Job]): A list of jobs which can be run now - # - def harvest_jobs(self): - unready = [] - ready = [] + def skip(self, element): + self.skipped_elements.append(element) + if self._next_queue: + self._next_queue.push(element) - while self._wait_queue: - if not self._resources.reserve(self.resources, peek=True): - break + def pop(self): + debug("Popping", self.action_name, [(str(x[1]), x[1]._unique_id) for x in self._queue._heap]) - element = self._wait_queue.popleft() - status = self.status(element) + if not self._resources.reserve(self.resources) or not self._queue: + raise IndexError() - if status == QueueStatus.WAIT: - unready.append(element) - elif status == QueueStatus.SKIP: - self._done_queue.append(element) - self.skipped_elements.append(element) - else: - reserved = self._resources.reserve(self.resources) - assert reserved - ready.append(element) - - self._wait_queue.extendleft(unready) - - return [ - ElementJob(self._scheduler, self.action_name, - self._element_log_path(element), - element=element, queue=self, - action_cb=self.process, - complete_cb=self._job_done, - max_retries=self._max_retries) - for element in ready - ] + else: + element = self._queue.pop() + + return ElementJob( + self._scheduler, + self.action_name, + self._element_log_path(element), + element=element, + queue=self, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries, + ) ##################################################### # Private Methods # @@ -301,8 +254,8 @@ class Queue(): detail=traceback.format_exc()) self.failed_elements.append(element) else: - # All elements get placed on the done queue for later processing. - self._done_queue.append(element) + if self._next_queue: + self._next_queue.push(element) # These lists are for bookkeeping purposes for the UI and logging. if status == JobStatus.SKIPPED or job.get_terminated(): diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index d7e6546f3..ae4106f00 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -38,15 +38,12 @@ class TrackQueue(Queue): def process(self, element): return element._track() - def status(self, element): - # We can skip elements entirely if they have no sources. - if not list(element.sources()): - - # But we still have to mark them as tracked + def push(self, element): + if not any(element.sources()): element._tracking_done() - return QueueStatus.SKIP + self.skip(element) - return QueueStatus.READY + super().push(element) def done(self, _, element, result, status): diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 176900b33..b7cd5356d 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -32,6 +32,11 @@ from .jobs import JobStatus, CacheSizeJob, CleanupJob from .._profile import Topics, profile_start, profile_end +import sys +def debug(*args): + print("➤➤➤", *args, file=sys.stderr) + + # A decent return code for Scheduler.run() class SchedStatus(): SUCCESS = 0 @@ -429,42 +434,14 @@ class Scheduler(): # and process anything that is ready. # def _sched_queue_jobs(self): - ready = [] - process_queues = True - - while self._queue_jobs and process_queues: - - # Pull elements forward through queues - elements = [] - for queue in self.queues: - queue.enqueue(elements) - elements = list(queue.dequeue()) - - # Kickoff whatever processes can be processed at this time - # - # We start by queuing from the last queue first, because - # we want to give priority to queues later in the - # scheduling process in the case that multiple queues - # share the same token type. - # - # This avoids starvation situations where we dont move on - # to fetch tasks for elements which failed to pull, and - # thus need all the pulls to complete before ever starting - # a build - ready.extend(chain.from_iterable( - q.harvest_jobs() for q in reversed(self.queues) - )) - - # harvest_jobs() may have decided to skip some jobs, making - # them eligible for promotion to the next queue as a side effect. - # - # If that happens, do another round. - process_queues = any(q.dequeue_ready() for q in self.queues) - - # Spawn the jobs - # - for job in ready: - self._spawn_job(job) + debug("Scheduling queues") + for queue in self.queues: + try: + while True: + job = queue.pop() + self._spawn_job(job) + except IndexError: + pass # _sched() # diff --git a/buildstream/_stream.py b/buildstream/_stream.py index 262b38852..87c2d172f 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -41,6 +41,9 @@ from .types import _KeyStrength from . import utils, _yaml, _site from . import Scope, Consistency +import sys +def debug(*args): + print("➤➤➤", *args, file=sys.stderr) # Stream() # @@ -152,6 +155,7 @@ class Stream(): isolate=False, command=None, usebuildtree=None): + raise NotImplementedError() # Assert we have everything we need built, unless the directory is specified # in which case we just blindly trust the directory, using the element @@ -258,8 +262,10 @@ class Stream(): if self._artifacts.has_fetch_remotes(): self._add_queue(PullQueue(self._scheduler)) - self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) - self._add_queue(BuildQueue(self._scheduler)) + build_queue = BuildQueue(self._scheduler) + + self._add_queue(FetchQueue(self._scheduler, next_queue=build_queue, skip_cached=True)) + self._add_queue(build_queue) if self._artifacts.has_push_remotes(): self._add_queue(PushQueue(self._scheduler)) @@ -359,7 +365,6 @@ class Stream(): selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None): - use_config = True if remote: use_config = False @@ -400,6 +405,7 @@ class Stream(): selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None): + raise NotImplementedError() use_config = True if remote: @@ -1022,6 +1028,8 @@ class Stream(): rewritable=rewritable, fetch_subprojects=fetch_subprojects) + debug("Elements order", [str(e) for e in elements]) + # Obtain the ArtifactElement objects artifacts = [self._project.create_artifact_element(ref) for ref in target_artifacts] @@ -1093,6 +1101,8 @@ class Stream(): selected, except_elements) + debug("SELECTED", [str(e) for e in selected]) + # Set the "required" artifacts that should not be removed # while this pipeline is active # @@ -1107,6 +1117,10 @@ class Stream(): itertools.chain.from_iterable( [element.sources() for element in required_elements()])) + counter = itertools.count() + for element in selected: + element._pipeline_id = next(counter) + debug("Setting id to", element._pipeline_id, "for", element) if selection == PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, # others are requested dynamically as needed. @@ -1153,7 +1167,10 @@ class Stream(): def _enqueue_plan(self, plan, *, queue=None): queue = queue or self._first_non_track_queue - queue.enqueue(plan) + debug("Plan is: ", [str(e) for e in plan]) + + for element in plan: + queue.push(element) self.session_elements += plan # _run() @@ -1188,10 +1205,11 @@ class Stream(): # fetch_original (Bool): Whether to fetch original unstaged # def _fetch(self, elements, *, track_elements=None, fetch_original=False): - if track_elements is None: track_elements = [] + debug("FETCH", [str(e) for e in elements], [str(e) for e in track_elements]) + # Subtract the track elements from the fetch elements, they will be added separately fetch_plan = self._pipeline.subtract_elements(elements, track_elements) diff --git a/buildstream/element.py b/buildstream/element.py index f1f0273f6..9d728bfd6 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -963,9 +963,36 @@ class Element(Plugin): if meta in cls.__instantiated_elements: return cls.__instantiated_elements[meta] + build_deps = [ + Element._new_from_meta(meta_dep) + for meta_dep in meta.build_dependencies + ] + + runtime_deps = [ + Element._new_from_meta(meta_dep) + for meta_dep in meta.dependencies + ] + + # # Instantiate dependencies + # for meta_dep in meta.dependencies: + # dependency = Element._new_from_meta(meta_dep) + # element.__runtime_dependencies.append(dependency) + # dependency.__reverse_dependencies.add(element) + + # for meta_dep in meta.build_dependencies: + # dependency = Element._new_from_meta(meta_dep) + # element.__build_dependencies.append(dependency) + # dependency.__reverse_dependencies.add(element) + element = meta.project.create_element(meta, first_pass=meta.first_pass) cls.__instantiated_elements[meta] = element + element.__runtime_dependencies = runtime_deps + element.__build_dependencies = build_deps + + for dep in chain(build_deps, runtime_deps): + dep.__reverse_dependencies.add(element) + # Instantiate sources and generate their keys for meta_source in meta.sources: meta_source.first_pass = meta.kind == "junction" @@ -979,17 +1006,6 @@ class Element(Plugin): if redundant_ref is not None: cls.__redundant_source_refs.append((source, redundant_ref)) - # Instantiate dependencies - for meta_dep in meta.dependencies: - dependency = Element._new_from_meta(meta_dep) - element.__runtime_dependencies.append(dependency) - dependency.__reverse_dependencies.add(element) - - for meta_dep in meta.build_dependencies: - dependency = Element._new_from_meta(meta_dep) - element.__build_dependencies.append(dependency) - dependency.__reverse_dependencies.add(element) - return element # _clear_meta_elements_cache() -- cgit v1.2.1