diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 15:44:48 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 15:44:48 +0900 |
commit | 3b44740589ee5f6634d877bf2e20b189b7624cff (patch) | |
tree | bfc834dee0817b50c359143508bad4ec33fe3569 /buildstream | |
parent | 494fdd65ac25cbc806fe9a369f31c595abf344e7 (diff) | |
download | buildstream-3b44740589ee5f6634d877bf2e20b189b7624cff.tar.gz |
_scheduler/queue.py: Adhere to policy on private symbols
And change the API contract a bit more, now the Queue object
takes the scheduler in the constructor and the Queue.attach()
method is removed.
This is a part of issue #285
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_pipeline.py | 20 | ||||
-rw-r--r-- | buildstream/_scheduler/fetchqueue.py | 4 | ||||
-rw-r--r-- | buildstream/_scheduler/queue.py | 157 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 4 |
4 files changed, 123 insertions, 62 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 80b0e01ac..e4e8377c2 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -242,7 +242,7 @@ class Pipeline(): # are rewritten inline. # def track(self, scheduler): - track = TrackQueue() + track = TrackQueue(scheduler) track.enqueue(self._track_elements) self.session_elements = len(self._track_elements) @@ -278,10 +278,10 @@ class Pipeline(): self.session_elements = len(self._track_elements) + len(fetch_plan) - fetch = FetchQueue() + fetch = FetchQueue(scheduler) fetch.enqueue(fetch_plan) if self._track_elements: - track = TrackQueue() + track = TrackQueue(scheduler) track.enqueue(self._track_elements) queues = [track, fetch] else: @@ -318,22 +318,22 @@ class Pipeline(): # track_plan will be made consistent) self._assert_consistent(plan) - fetch = FetchQueue(skip_cached=True) - build = BuildQueue() + fetch = FetchQueue(scheduler, skip_cached=True) + build = BuildQueue(scheduler) track = None pull = None push = None queues = [] if self._track_elements: - track = TrackQueue() + track = TrackQueue(scheduler) queues.append(track) if self._artifacts.has_fetch_remotes(): - pull = PullQueue() + pull = PullQueue(scheduler) queues.append(pull) queues.append(fetch) queues.append(build) if self._artifacts.has_push_remotes(): - push = PushQueue() + push = PushQueue(scheduler) queues.append(push) # If we're going to track, tracking elements go into the first queue @@ -434,7 +434,7 @@ class Pipeline(): self._assert_consistent(plan) self.session_elements = len(plan) - pull = PullQueue() + pull = PullQueue(scheduler) pull.enqueue(plan) queues = [pull] @@ -461,7 +461,7 @@ class Pipeline(): self._assert_consistent(plan) self.session_elements = len(plan) - push = PushQueue() + push = PushQueue(scheduler) push.enqueue(plan) queues = [push] diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/fetchqueue.py index 19e53e009..2e27f8df0 100644 --- a/buildstream/_scheduler/fetchqueue.py +++ b/buildstream/_scheduler/fetchqueue.py @@ -34,8 +34,8 @@ class FetchQueue(Queue): complete_name = "Fetched" queue_type = QueueType.FETCH - def __init__(self, skip_cached=False): - super().__init__() + def __init__(self, scheduler, skip_cached=False): + super().__init__(scheduler) self.skip_cached = skip_cached diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py index d5f2fd216..7c4ad6919 100644 --- a/buildstream/_scheduler/queue.py +++ b/buildstream/_scheduler/queue.py @@ -62,6 +62,8 @@ class QueueStatus(Enum): # Queue() # +# Args: +# scheduler (Scheduler): The Scheduler # class Queue(): @@ -70,20 +72,25 @@ class Queue(): complete_name = None queue_type = None - def __init__(self): - self.scheduler = None - self.wait_queue = deque() - self.done_queue = deque() - self.active_jobs = [] - self.max_retries = 0 + def __init__(self, scheduler): - # For the frontend to know how many elements - # were successfully processed, failed, or skipped - # as they did not require processing. # - self.failed_elements = [] - self.processed_elements = [] - self.skipped_elements = [] + # Public members + # + self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation + self.failed_elements = [] # List of failed elements, for the frontend + self.processed_elements = [] # List of processed elements, for the frontend + self.skipped_elements = [] # List of skipped elements, for the frontend + + # + # Private members + # + self._scheduler = scheduler + self._wait_queue = deque() + self._done_queue = deque() + self._max_retries = 0 + if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH: + self._max_retries = scheduler.context.sched_network_retries # Assert the subclass has setup class data assert self.action_name is not None @@ -150,15 +157,16 @@ class Queue(): pass ##################################################### - # Queue internals and Scheduler facing APIs # + # Scheduler / Pipeline facing APIs # ##################################################### - # Attach to the scheduler - def attach(self, scheduler): - self.scheduler = scheduler - if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH: - self.max_retries = scheduler.context.sched_network_retries - + # enqueue() + # + # Enqueues some elements + # + # Args: + # elts (list): A list of Elements + # def enqueue(self, elts): if not elts: return @@ -168,23 +176,56 @@ class Queue(): skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] wait = [elt for elt in elts if elt not in skip] - self.wait_queue.extend(wait) - self.done_queue.extend(skip) + self._wait_queue.extend(wait) + self._done_queue.extend(skip) self.skipped_elements.extend(skip) + # dequeue() + # + # A generator which dequeues the elements which + # are ready to exit the queue. + # + # Yields: + # (Element): Elements being dequeued + # def dequeue(self): - while self.done_queue: - yield self.done_queue.popleft() + while self._done_queue: + yield self._done_queue.popleft() + # dequeue_ready() + # + # Reports whether there are any elements to dequeue + # + # Returns: + # (bool): Whether there are elements to dequeue + # def dequeue_ready(self): - return any(self.done_queue) + return any(self._done_queue) + # process_ready() + # + # Process elements in the queue, moving elements which were enqueued + # into the dequeue pool, and processing them if necessary. + # + # This will have different results for elements depending + # on the Queue.status() implementation. + # + # o Elements which are QueueStatus.WAIT will not be effected + # + # o Elements which are QueueStatus.READY will be processed + # and added to the Queue.active_jobs list as a result, + # given that the scheduler allows the Queue enough tokens + # for the given queue's job type + # + # o Elements which are QueueStatus.SKIP will move directly + # to the dequeue pool + # def process_ready(self): - scheduler = self.scheduler + scheduler = self._scheduler unready = [] - while self.wait_queue and scheduler.get_job_token(self.queue_type): - element = self.wait_queue.popleft() + while self._wait_queue and scheduler.get_job_token(self.queue_type): + element = self._wait_queue.popleft() status = self.status(element) if status == QueueStatus.WAIT: @@ -193,15 +234,15 @@ class Queue(): continue elif status == QueueStatus.SKIP: scheduler.put_job_token(self.queue_type) - self.done_queue.append(element) + self._done_queue.append(element) self.skipped_elements.append(element) continue self.prepare(element) job = Job(scheduler, element, self.action_name, - self.process, self.job_done, - max_retries=self.max_retries) + self.process, self._job_done, + max_retries=self._max_retries) scheduler.job_starting(job) job.spawn() @@ -209,9 +250,22 @@ class Queue(): # These were not ready but were in the beginning, give em # first priority again next time around - self.wait_queue.extendleft(unready) + self._wait_queue.extendleft(unready) + + ##################################################### + # Private Methods # + ##################################################### - def update_workspaces(self, element, job): + # _update_workspaces() + # + # Updates and possibly saves the workspaces in the + # main data model in the main process after a job completes. + # + # Args: + # element (Element): The element which completed + # job (Job): The job which completed + # + def _update_workspaces(self, element, job): # Handle any workspace modifications now # if job.workspace_dict: @@ -220,19 +274,28 @@ class Queue(): try: project.workspaces.save_config() except BstError as e: - self.message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e)) + self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e)) except Exception as e: # pylint: disable=broad-except - self.message(element, MessageType.BUG, - "Unhandled exception while saving workspaces", - detail=traceback.format_exc()) + self._message(element, MessageType.BUG, + "Unhandled exception while saving workspaces", + detail=traceback.format_exc()) - def job_done(self, job, element, success, result): + # _job_done() + # + # A callback reported by the Job() when a job completes + # + # This will call the Queue implementation specific Queue.done() + # implementation and trigger the scheduler to reschedule. + # + # See the Job object for an explanation of the call signature + # + def _job_done(self, job, element, success, result): # Remove from our jobs self.active_jobs.remove(job) # Update workspaces in the main task before calling any queue implementation - self.update_workspaces(element, job) + self._update_workspaces(element, job) # Give the result of the job to the Queue implementor, # and determine if it should be considered as processed @@ -244,7 +307,7 @@ class Queue(): # Report error and mark as failed # - self.message(element, MessageType.ERROR, "Post processing error", detail=str(e)) + self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) self.failed_elements.append(element) # Treat this as a task error as it's related to a task @@ -258,16 +321,16 @@ class Queue(): # Report unhandled exceptions and mark as failed # - self.message(element, MessageType.BUG, - "Unhandled exception in post processing", - detail=traceback.format_exc()) + self._message(element, MessageType.BUG, + "Unhandled exception in post processing", + detail=traceback.format_exc()) self.failed_elements.append(element) else: # No exception occured, handle the success/failure state in the normal way # if success: - self.done_queue.append(element) + self._done_queue.append(element) if processed: self.processed_elements.append(element) else: @@ -277,16 +340,16 @@ class Queue(): # Give the token for this job back to the scheduler # immediately before invoking another round of scheduling - self.scheduler.put_job_token(self.queue_type) + self._scheduler.put_job_token(self.queue_type) # Notify frontend - self.scheduler.job_completed(self, job, success) + self._scheduler.job_completed(self, job, success) - self.scheduler.sched() + self._scheduler.sched() # Convenience wrapper for Queue implementations to send # a message for the element they are processing - def message(self, element, message_type, brief, **kwargs): + def _message(self, element, message_type, brief, **kwargs): context = element._get_context() message = Message(element._get_unique_id(), message_type, brief, **kwargs) context.message(message) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 01df0996b..0ab8ace7c 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -104,10 +104,8 @@ class Scheduler(): # def run(self, queues): - # Attach the queues + # Hold on to the queues to process self.queues = queues - for queue in queues: - queue.attach(self) # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. |