diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 13:49:40 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 13:49:40 +0900 |
commit | 835e5475ddf2cdc3c9196072384d2f58a7df23fb (patch) | |
tree | 72429f92c9f2065017ea8d397100db1877e36903 | |
parent | 974bd246deae896b574b5c45a549af0a60cdc63f (diff) | |
download | buildstream-835e5475ddf2cdc3c9196072384d2f58a7df23fb.tar.gz |
_scheduler: Slightly changing the Job / Queue API contracts
For the Job:
  o The action_cb, complete_cb and max_retries are now passed
    into the constructor, and the Job.spawn() API takes no arguments.
  o The complete_cb() signature has changed to take a success
    boolean instead of a returncode integer, and also now receives
    the result directly.
For the Queue:
  o Adapted to the new Job API contract changes
  o Changed the Queue.done() method to now take a success boolean
    instead of a returncode integer
Updated Queue implementations to handle the success boolean instead
of the returncode integer.
-rw-r--r-- | buildstream/_scheduler/buildqueue.py | 5 |
-rw-r--r-- | buildstream/_scheduler/fetchqueue.py | 4 |
-rw-r--r-- | buildstream/_scheduler/job.py | 105 |
-rw-r--r-- | buildstream/_scheduler/pullqueue.py | 4 |
-rw-r--r-- | buildstream/_scheduler/pushqueue.py | 4 |
-rw-r--r-- | buildstream/_scheduler/queue.py | 19 |
-rw-r--r-- | buildstream/_scheduler/trackqueue.py | 4 |
7 files changed, 89 insertions, 56 deletions
diff --git a/buildstream/_scheduler/buildqueue.py b/buildstream/_scheduler/buildqueue.py index 6802c7f70..46ce72ca7 100644 --- a/buildstream/_scheduler/buildqueue.py +++ b/buildstream/_scheduler/buildqueue.py @@ -46,8 +46,9 @@ class BuildQueue(Queue): return QueueStatus.READY - def done(self, element, result, returncode): - if returncode == 0: + def done(self, element, result, success): + + if success: # Inform element in main process that assembly is done element._assemble_done() diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/fetchqueue.py index 159d122aa..19e53e009 100644 --- a/buildstream/_scheduler/fetchqueue.py +++ b/buildstream/_scheduler/fetchqueue.py @@ -62,9 +62,9 @@ class FetchQueue(Queue): return QueueStatus.READY - def done(self, element, result, returncode): + def done(self, element, result, success): - if returncode != 0: + if not success: return False element._update_state() diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py index 86f3b87a5..c70e2d607 100644 --- a/buildstream/_scheduler/job.py +++ b/buildstream/_scheduler/job.py @@ -53,14 +53,48 @@ class Process(multiprocessing.Process): # Job() # +# The Job object represents a parallel task, when calling Job.spawn(), +# the given `action_cb` will be called in parallel to the calling process, +# and `complete_cb` will be called with the action result in the calling +# process when the job completes. 
+# # Args: # scheduler (Scheduler): The scheduler # element (Element): The element to operate on # action_name (str): The queue action name +# action_cb (callable): The action function +# complete_cb (callable): The function to call when complete +# max_retries (int): The maximum number of retries +# +# Here is the calling signature of the action_cb: +# +# action_cb(): +# +# This function will be called in the child task +# +# Args: +# element (Element): The element passed to the Job() constructor +# +# Returns: +# (object): Any abstract simple python object, including a string, int, +# bool, list or dict, this must be a simple serializable object. +# +# Here is the calling signature of the complete_cb: +# +# complete_cb(): +# +# This function will be called when the child task completes +# +# Args: +# job (Job): The job object which completed +# element (Element): The element passed to the Job() constructor +# success (bool): True if the action_cb did not raise an exception +# result (object): The deserialized object returned by the `action_cb`, or None +# if `success` is False # class Job(): - def __init__(self, scheduler, element, action_name): + def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0): # Shared with child process self.scheduler = scheduler # The scheduler @@ -68,32 +102,24 @@ class Job(): self.process = None # The Process object self.watcher = None # Child process watcher self.action_name = action_name # The action name for the Queue - self.action = None # The action callable function - self.complete = None # The complete callable function + self.action_cb = action_cb # The action callable function + self.complete_cb = complete_cb # The complete callable function self.element = element # The element we're processing self.listening = False # Whether the parent is currently listening self.suspended = False # Whether this job is currently suspended + self.max_retries = max_retries # Maximum number of automatic 
retries # Only relevant in parent process after spawning self.pid = None # The child's pid in the parent self.result = None # Return value of child action in the parent self.workspace_dict = None # A serialized Workspace object, after any modifications - - self.tries = 0 + self.tries = 0 # Try count, for retryable jobs # spawn() # - # Args: - # action (callable): The action function - # complete (callable): The function to call when complete - # max_retries (int): The maximum number of retries - # - def spawn(self, action, complete, max_retries=0): - self.action = action - self.complete = complete + def spawn(self): self.tries += 1 - self.max_retries = max_retries self.parent_start_listening() @@ -115,16 +141,7 @@ class Job(): # Wait for it to complete self.watcher = asyncio.get_child_watcher() - self.watcher.add_child_handler(self.pid, self.child_complete, self.element) - - # shutdown() - # - # Should be called after the job completes - # - def shutdown(self): - # Make sure we've read everything we need and then stop listening - self.parent_process_queue() - self.parent_stop_listening() + self.watcher.add_child_handler(self.pid, self.parent_child_completed, self.element) # terminate() # @@ -264,7 +281,7 @@ class Job(): try: # Try the task action - result = self.action(element) + result = self.action_cb(element) except BstError as e: elapsed = datetime.datetime.now() - starttime @@ -299,10 +316,7 @@ class Job(): else: # No exception occurred in the action self.child_send_workspace(element) - - if result is not None: - envelope = Envelope('result', result) - self.queue.put(envelope) + self.child_send_result(result) elapsed = datetime.datetime.now() - starttime self.message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed, @@ -327,21 +341,17 @@ class Job(): }) self.queue.put(envelope) + def child_send_result(self, result): + if result is not None: + envelope = Envelope('result', result) + self.queue.put(envelope) + def child_send_workspace(self, 
element): workspace = element._get_workspace() if workspace: envelope = Envelope('workspace', workspace.to_dict()) self.queue.put(envelope) - def child_complete(self, pid, returncode, element): - self.shutdown() - - if returncode != 0 and self.tries <= self.max_retries: - self.spawn(self.action, self.complete, self.max_retries) - return - - self.complete(self, returncode, element) - def child_shutdown(self, exit_code): self.queue.close() sys.exit(exit_code) @@ -405,6 +415,25 @@ class Job(): ####################################################### # Parent Process # ####################################################### + + # shutdown() + # + # Should be called after the job completes + # + def parent_shutdown(self): + # Make sure we've read everything we need and then stop listening + self.parent_process_queue() + self.parent_stop_listening() + + def parent_child_completed(self, pid, returncode, element): + self.parent_shutdown() + + if returncode != 0 and self.tries <= self.max_retries: + self.spawn() + return + + self.complete_cb(self, element, returncode == 0, self.result) + def parent_process_envelope(self, envelope): if not self.listening: return diff --git a/buildstream/_scheduler/pullqueue.py b/buildstream/_scheduler/pullqueue.py index 3413d2783..5630ef7c0 100644 --- a/buildstream/_scheduler/pullqueue.py +++ b/buildstream/_scheduler/pullqueue.py @@ -47,9 +47,9 @@ class PullQueue(Queue): else: return QueueStatus.SKIP - def done(self, element, result, returncode): + def done(self, element, result, success): - if returncode != 0: + if not success: return False if not result: diff --git a/buildstream/_scheduler/pushqueue.py b/buildstream/_scheduler/pushqueue.py index 6c7c95094..8a68d5953 100644 --- a/buildstream/_scheduler/pushqueue.py +++ b/buildstream/_scheduler/pushqueue.py @@ -41,9 +41,9 @@ class PushQueue(Queue): return QueueStatus.READY - def done(self, element, result, returncode): + def done(self, element, result, success): - if returncode != 0: + if not 
success: return False # Element._push() returns True if it uploaded an artifact, diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py index b060af368..6aa12af1c 100644 --- a/buildstream/_scheduler/queue.py +++ b/buildstream/_scheduler/queue.py @@ -139,13 +139,14 @@ class Queue(): # Args: # element (Element): The element which completed processing # result (any): The return value of the process() implementation - # returncode (int): The process return code, 0 = success + # success (bool): True if the process() implementation did not + # raise any exception # # Returns: # (bool): True if the element should appear to be processsed, # Otherwise False will count the element as "skipped" # - def done(self, element, result, returncode): + def done(self, element, result, success): pass ##################################################### @@ -195,10 +196,12 @@ class Queue(): self.prepare(element) - job = Job(scheduler, element, self.action_name) + job = Job(scheduler, element, self.action_name, + self.process, self.job_done, + max_retries=self.max_retries) scheduler.job_starting(job) - job.spawn(self.process, self.job_done, self.max_retries) + job.spawn() self.active_jobs.append(job) # These were not ready but were in the beginning, give em @@ -220,7 +223,7 @@ class Queue(): "Unhandled exception while saving workspaces", detail=traceback.format_exc()) - def job_done(self, job, returncode, element): + def job_done(self, job, element, success, result): # Remove from our jobs self.active_jobs.remove(job) @@ -232,7 +235,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. 
try: - processed = self.done(element, job.result, returncode) + processed = self.done(element, result, success) except BstError as e: @@ -260,7 +263,7 @@ class Queue(): # No exception occured, handle the success/failure state in the normal way # - if returncode == 0: + if success: self.done_queue.append(element) if processed: self.processed_elements.append(element) @@ -274,7 +277,7 @@ class Queue(): self.scheduler.put_job_token(self.queue_type) # Notify frontend - self.scheduler.job_completed(self, job, returncode == 0) + self.scheduler.job_completed(self, job, success) self.scheduler.sched() diff --git a/buildstream/_scheduler/trackqueue.py b/buildstream/_scheduler/trackqueue.py index fe5843148..2e7bc8b97 100644 --- a/buildstream/_scheduler/trackqueue.py +++ b/buildstream/_scheduler/trackqueue.py @@ -48,9 +48,9 @@ class TrackQueue(Queue): return QueueStatus.READY - def done(self, element, result, returncode): + def done(self, element, result, success): - if returncode != 0: + if not success: return False changed = False |