author    Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-04-18 15:44:48 +0900
committer Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-04-18 15:44:48 +0900
commit    3b44740589ee5f6634d877bf2e20b189b7624cff (patch)
tree      bfc834dee0817b50c359143508bad4ec33fe3569 /buildstream
parent    494fdd65ac25cbc806fe9a369f31c595abf344e7 (diff)
_scheduler/queue.py: Adhere to policy on private symbols
This also changes the API contract a bit further: the Queue object now takes the scheduler in its constructor, and the Queue.attach() method is removed. This is a part of issue #285.
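For illustration, a minimal sketch of the contract change; `scheduler` is assumed to be an already-constructed Scheduler instance:

    # Old contract: queues were created bare, then attached to the scheduler
    fetch = FetchQueue(skip_cached=True)
    fetch.attach(scheduler)   # attach() is removed by this commit

    # New contract: the scheduler is passed at construction time
    fetch = FetchQueue(scheduler, skip_cached=True)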
Diffstat (limited to 'buildstream')
-rw-r--r--  buildstream/_pipeline.py               20
-rw-r--r--  buildstream/_scheduler/fetchqueue.py    4
-rw-r--r--  buildstream/_scheduler/queue.py       157
-rw-r--r--  buildstream/_scheduler/scheduler.py     4
4 files changed, 123 insertions, 62 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 80b0e01ac..e4e8377c2 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -242,7 +242,7 @@ class Pipeline():
# are rewritten inline.
#
def track(self, scheduler):
- track = TrackQueue()
+ track = TrackQueue(scheduler)
track.enqueue(self._track_elements)
self.session_elements = len(self._track_elements)
@@ -278,10 +278,10 @@ class Pipeline():
self.session_elements = len(self._track_elements) + len(fetch_plan)
- fetch = FetchQueue()
+ fetch = FetchQueue(scheduler)
fetch.enqueue(fetch_plan)
if self._track_elements:
- track = TrackQueue()
+ track = TrackQueue(scheduler)
track.enqueue(self._track_elements)
queues = [track, fetch]
else:
@@ -318,22 +318,22 @@ class Pipeline():
# track_plan will be made consistent)
self._assert_consistent(plan)
- fetch = FetchQueue(skip_cached=True)
- build = BuildQueue()
+ fetch = FetchQueue(scheduler, skip_cached=True)
+ build = BuildQueue(scheduler)
track = None
pull = None
push = None
queues = []
if self._track_elements:
- track = TrackQueue()
+ track = TrackQueue(scheduler)
queues.append(track)
if self._artifacts.has_fetch_remotes():
- pull = PullQueue()
+ pull = PullQueue(scheduler)
queues.append(pull)
queues.append(fetch)
queues.append(build)
if self._artifacts.has_push_remotes():
- push = PushQueue()
+ push = PushQueue(scheduler)
queues.append(push)
# If we're going to track, tracking elements go into the first queue
@@ -434,7 +434,7 @@ class Pipeline():
self._assert_consistent(plan)
self.session_elements = len(plan)
- pull = PullQueue()
+ pull = PullQueue(scheduler)
pull.enqueue(plan)
queues = [pull]
@@ -461,7 +461,7 @@ class Pipeline():
self._assert_consistent(plan)
self.session_elements = len(plan)
- push = PushQueue()
+ push = PushQueue(scheduler)
push.enqueue(plan)
queues = [push]
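Taken together, the build() path above now assembles its queues roughly like this (a condensed sketch of the hunks shown, not the verbatim method):

    fetch = FetchQueue(scheduler, skip_cached=True)
    build = BuildQueue(scheduler)
    queues = []
    if self._track_elements:
        queues.append(TrackQueue(scheduler))      # tracking always runs first
    if self._artifacts.has_fetch_remotes():
        queues.append(PullQueue(scheduler))
    queues.append(fetch)
    queues.append(build)
    if self._artifacts.has_push_remotes():
        queues.append(PushQueue(scheduler))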
diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/fetchqueue.py
index 19e53e009..2e27f8df0 100644
--- a/buildstream/_scheduler/fetchqueue.py
+++ b/buildstream/_scheduler/fetchqueue.py
@@ -34,8 +34,8 @@ class FetchQueue(Queue):
complete_name = "Fetched"
queue_type = QueueType.FETCH
- def __init__(self, skip_cached=False):
- super().__init__()
+ def __init__(self, scheduler, skip_cached=False):
+ super().__init__(scheduler)
self.skip_cached = skip_cached
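FetchQueue shows the pattern every Queue subclass now follows: accept the scheduler as the first argument and forward it to the base class before handling any subclass-specific options. A hypothetical subclass for illustration (the class data values here are made up; the Queue base class asserts that this class data is set up):

    class ExampleQueue(Queue):
        # Required class data, checked by the Queue base class
        action_name = "Example"
        complete_name = "Examples done"
        queue_type = QueueType.BUILD  # assumed member of QueueType

        def __init__(self, scheduler, some_option=False):
            super().__init__(scheduler)
            self.some_option = some_option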
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py
index d5f2fd216..7c4ad6919 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queue.py
@@ -62,6 +62,8 @@ class QueueStatus(Enum):
# Queue()
#
+# Args:
+# scheduler (Scheduler): The Scheduler
#
class Queue():
@@ -70,20 +72,25 @@ class Queue():
complete_name = None
queue_type = None
- def __init__(self):
- self.scheduler = None
- self.wait_queue = deque()
- self.done_queue = deque()
- self.active_jobs = []
- self.max_retries = 0
+ def __init__(self, scheduler):
- # For the frontend to know how many elements
- # were successfully processed, failed, or skipped
- # as they did not require processing.
#
- self.failed_elements = []
- self.processed_elements = []
- self.skipped_elements = []
+ # Public members
+ #
+ self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation
+ self.failed_elements = [] # List of failed elements, for the frontend
+ self.processed_elements = [] # List of processed elements, for the frontend
+ self.skipped_elements = [] # List of skipped elements, for the frontend
+
+ #
+ # Private members
+ #
+ self._scheduler = scheduler
+ self._wait_queue = deque()
+ self._done_queue = deque()
+ self._max_retries = 0
+ if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH:
+ self._max_retries = scheduler.context.sched_network_retries
# Assert the subclass has setup class data
assert self.action_name is not None
@@ -150,15 +157,16 @@ class Queue():
pass
#####################################################
- # Queue internals and Scheduler facing APIs #
+ # Scheduler / Pipeline facing APIs #
#####################################################
- # Attach to the scheduler
- def attach(self, scheduler):
- self.scheduler = scheduler
- if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH:
- self.max_retries = scheduler.context.sched_network_retries
-
+ # enqueue()
+ #
+ # Enqueues some elements
+ #
+ # Args:
+ # elts (list): A list of Elements
+ #
def enqueue(self, elts):
if not elts:
return
@@ -168,23 +176,56 @@ class Queue():
skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
wait = [elt for elt in elts if elt not in skip]
- self.wait_queue.extend(wait)
- self.done_queue.extend(skip)
+ self._wait_queue.extend(wait)
+ self._done_queue.extend(skip)
self.skipped_elements.extend(skip)
+ # dequeue()
+ #
+ # A generator which dequeues the elements which
+ # are ready to exit the queue.
+ #
+ # Yields:
+ # (Element): Elements being dequeued
+ #
def dequeue(self):
- while self.done_queue:
- yield self.done_queue.popleft()
+ while self._done_queue:
+ yield self._done_queue.popleft()
+ # dequeue_ready()
+ #
+ # Reports whether there are any elements to dequeue
+ #
+ # Returns:
+ # (bool): Whether there are elements to dequeue
+ #
def dequeue_ready(self):
- return any(self.done_queue)
+ return any(self._done_queue)
+ # process_ready()
+ #
+ # Process elements in the queue, moving elements which were enqueued
+ # into the dequeue pool, and processing them if necessary.
+ #
+ # This will have different results for elements depending
+ # on the Queue.status() implementation.
+ #
+ # o Elements which are QueueStatus.WAIT will not be affected
+ #
+ # o Elements which are QueueStatus.READY will be processed
+ # and added to the Queue.active_jobs list as a result,
+ # given that the scheduler allows the Queue enough tokens
+ # for the given queue's job type
+ #
+ # o Elements which are QueueStatus.SKIP will move directly
+ # to the dequeue pool
+ #
def process_ready(self):
- scheduler = self.scheduler
+ scheduler = self._scheduler
unready = []
- while self.wait_queue and scheduler.get_job_token(self.queue_type):
- element = self.wait_queue.popleft()
+ while self._wait_queue and scheduler.get_job_token(self.queue_type):
+ element = self._wait_queue.popleft()
status = self.status(element)
if status == QueueStatus.WAIT:
@@ -193,15 +234,15 @@ class Queue():
continue
elif status == QueueStatus.SKIP:
scheduler.put_job_token(self.queue_type)
- self.done_queue.append(element)
+ self._done_queue.append(element)
self.skipped_elements.append(element)
continue
self.prepare(element)
job = Job(scheduler, element, self.action_name,
- self.process, self.job_done,
- max_retries=self.max_retries)
+ self.process, self._job_done,
+ max_retries=self._max_retries)
scheduler.job_starting(job)
job.spawn()
@@ -209,9 +250,22 @@ class Queue():
# These were not ready but were in the beginning, give em
# first priority again next time around
- self.wait_queue.extendleft(unready)
+ self._wait_queue.extendleft(unready)
+
+ #####################################################
+ # Private Methods #
+ #####################################################
- def update_workspaces(self, element, job):
+ # _update_workspaces()
+ #
+ # Updates and possibly saves the workspaces in the
+ # main data model in the main process after a job completes.
+ #
+ # Args:
+ # element (Element): The element which completed
+ # job (Job): The job which completed
+ #
+ def _update_workspaces(self, element, job):
# Handle any workspace modifications now
#
if job.workspace_dict:
@@ -220,19 +274,28 @@ class Queue():
try:
project.workspaces.save_config()
except BstError as e:
- self.message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
+ self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
except Exception as e: # pylint: disable=broad-except
- self.message(element, MessageType.BUG,
- "Unhandled exception while saving workspaces",
- detail=traceback.format_exc())
+ self._message(element, MessageType.BUG,
+ "Unhandled exception while saving workspaces",
+ detail=traceback.format_exc())
- def job_done(self, job, element, success, result):
+ # _job_done()
+ #
+ # A callback reported by the Job() when a job completes
+ #
+ # This will call the Queue implementation specific Queue.done()
+ # implementation and trigger the scheduler to reschedule.
+ #
+ # See the Job object for an explanation of the call signature
+ #
+ def _job_done(self, job, element, success, result):
# Remove from our jobs
self.active_jobs.remove(job)
# Update workspaces in the main task before calling any queue implementation
- self.update_workspaces(element, job)
+ self._update_workspaces(element, job)
# Give the result of the job to the Queue implementor,
# and determine if it should be considered as processed
@@ -244,7 +307,7 @@ class Queue():
# Report error and mark as failed
#
- self.message(element, MessageType.ERROR, "Post processing error", detail=str(e))
+ self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
self.failed_elements.append(element)
# Treat this as a task error as it's related to a task
@@ -258,16 +321,16 @@ class Queue():
# Report unhandled exceptions and mark as failed
#
- self.message(element, MessageType.BUG,
- "Unhandled exception in post processing",
- detail=traceback.format_exc())
+ self._message(element, MessageType.BUG,
+ "Unhandled exception in post processing",
+ detail=traceback.format_exc())
self.failed_elements.append(element)
else:
# No exception occurred, handle the success/failure state in the normal way
#
if success:
- self.done_queue.append(element)
+ self._done_queue.append(element)
if processed:
self.processed_elements.append(element)
else:
@@ -277,16 +340,16 @@ class Queue():
# Give the token for this job back to the scheduler
# immediately before invoking another round of scheduling
- self.scheduler.put_job_token(self.queue_type)
+ self._scheduler.put_job_token(self.queue_type)
# Notify frontend
- self.scheduler.job_completed(self, job, success)
+ self._scheduler.job_completed(self, job, success)
- self.scheduler.sched()
+ self._scheduler.sched()
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
- def message(self, element, message_type, brief, **kwargs):
+ def _message(self, element, message_type, brief, **kwargs):
context = element._get_context()
message = Message(element._get_unique_id(), message_type, brief, **kwargs)
context.message(message)
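With these renames, everything the scheduler and frontend touch remains public (active_jobs, enqueue(), dequeue(), dequeue_ready(), process_ready()), while the queue's working state is private. A rough sketch of how one scheduling pass might drive queues through that public surface (simplified; the real scheduler also manages job tokens and the event loop):

    # Move finished elements from each queue into the next one downstream
    for queue, next_queue in zip(queues, queues[1:]):
        if queue.dequeue_ready():
            next_queue.enqueue(list(queue.dequeue()))

    # Then let every queue spawn jobs for elements that are now ready
    for queue in queues:
        queue.process_ready()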
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 01df0996b..0ab8ace7c 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -104,10 +104,8 @@ class Scheduler():
#
def run(self, queues):
- # Attach the queues
+ # Hold on to the queues to process
self.queues = queues
- for queue in queues:
- queue.attach(self)
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
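From the caller's perspective the simplification is invisible: queues arrive at run() already bound to the scheduler, so no per-queue attach step is needed. A minimal sketch of a call site under the new contract (`scheduler` and `plan`, a list of Elements, are assumed to exist):

    fetch = FetchQueue(scheduler)
    build = BuildQueue(scheduler)
    fetch.enqueue(plan)            # elements flow from fetch into build
    scheduler.run([fetch, build])  # run() simply holds on to the queues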