summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-18 13:49:40 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-18 13:49:40 +0900
commit835e5475ddf2cdc3c9196072384d2f58a7df23fb (patch)
tree72429f92c9f2065017ea8d397100db1877e36903
parent974bd246deae896b574b5c45a549af0a60cdc63f (diff)
downloadbuildstream-835e5475ddf2cdc3c9196072384d2f58a7df23fb.tar.gz
_scheduler: Slightly changing the Job / Queue API contracts
For the Job: o The action_cb, complete_cb and max_retries are now passed into the constructor, and the Job.spawn() API has no arguments. o The complete_cb() signature has changed to take a success boolean instead of a returncode integer, and also now receives the result directly. For the Queue: o Adapted to new Job API contract changes o Changed the Queue.done() method to now take a success boolean instead of a returncode integer Updated Queue implementations to handle the success boolean instead of the returncode integer.
-rw-r--r--buildstream/_scheduler/buildqueue.py5
-rw-r--r--buildstream/_scheduler/fetchqueue.py4
-rw-r--r--buildstream/_scheduler/job.py105
-rw-r--r--buildstream/_scheduler/pullqueue.py4
-rw-r--r--buildstream/_scheduler/pushqueue.py4
-rw-r--r--buildstream/_scheduler/queue.py19
-rw-r--r--buildstream/_scheduler/trackqueue.py4
7 files changed, 89 insertions, 56 deletions
diff --git a/buildstream/_scheduler/buildqueue.py b/buildstream/_scheduler/buildqueue.py
index 6802c7f70..46ce72ca7 100644
--- a/buildstream/_scheduler/buildqueue.py
+++ b/buildstream/_scheduler/buildqueue.py
@@ -46,8 +46,9 @@ class BuildQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, returncode):
- if returncode == 0:
+ def done(self, element, result, success):
+
+ if success:
# Inform element in main process that assembly is done
element._assemble_done()
diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/fetchqueue.py
index 159d122aa..19e53e009 100644
--- a/buildstream/_scheduler/fetchqueue.py
+++ b/buildstream/_scheduler/fetchqueue.py
@@ -62,9 +62,9 @@ class FetchQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, returncode):
+ def done(self, element, result, success):
- if returncode != 0:
+ if not success:
return False
element._update_state()
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py
index 86f3b87a5..c70e2d607 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/job.py
@@ -53,14 +53,48 @@ class Process(multiprocessing.Process):
# Job()
#
+# The Job object represents a parallel task, when calling Job.spawn(),
+# the given `action_cb` will be called in parallel to the calling process,
+# and `complete_cb` will be called with the action result in the calling
+# process when the job completes.
+#
# Args:
# scheduler (Scheduler): The scheduler
# element (Element): The element to operate on
# action_name (str): The queue action name
+# action_cb (callable): The action function
+# complete_cb (callable): The function to call when complete
+# max_retries (int): The maximum number of retries
+#
+# Here is the calling signature of the action_cb:
+#
+# action_cb():
+#
+# This function will be called in the child task
+#
+# Args:
+# element (Element): The element passed to the Job() constructor
+#
+# Returns:
+# (object): Any abstract simple python object, including a string, int,
+# bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+# complete_cb():
+#
+# This function will be called when the child task completes
+#
+# Args:
+# job (Job): The job object which completed
+# element (Element): The element passed to the Job() constructor
+# success (bool): True if the action_cb did not raise an exception
+# result (object): The deserialized object returned by the `action_cb`, or None
+# if `success` is False
#
class Job():
- def __init__(self, scheduler, element, action_name):
+ def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
# Shared with child process
self.scheduler = scheduler # The scheduler
@@ -68,32 +102,24 @@ class Job():
self.process = None # The Process object
self.watcher = None # Child process watcher
self.action_name = action_name # The action name for the Queue
- self.action = None # The action callable function
- self.complete = None # The complete callable function
+ self.action_cb = action_cb # The action callable function
+ self.complete_cb = complete_cb # The complete callable function
self.element = element # The element we're processing
self.listening = False # Whether the parent is currently listening
self.suspended = False # Whether this job is currently suspended
+ self.max_retries = max_retries # Maximum number of automatic retries
# Only relevant in parent process after spawning
self.pid = None # The child's pid in the parent
self.result = None # Return value of child action in the parent
self.workspace_dict = None # A serialized Workspace object, after any modifications
-
- self.tries = 0
+ self.tries = 0 # Try count, for retryable jobs
# spawn()
#
- # Args:
- # action (callable): The action function
- # complete (callable): The function to call when complete
- # max_retries (int): The maximum number of retries
- #
- def spawn(self, action, complete, max_retries=0):
- self.action = action
- self.complete = complete
+ def spawn(self):
self.tries += 1
- self.max_retries = max_retries
self.parent_start_listening()
@@ -115,16 +141,7 @@ class Job():
# Wait for it to complete
self.watcher = asyncio.get_child_watcher()
- self.watcher.add_child_handler(self.pid, self.child_complete, self.element)
-
- # shutdown()
- #
- # Should be called after the job completes
- #
- def shutdown(self):
- # Make sure we've read everything we need and then stop listening
- self.parent_process_queue()
- self.parent_stop_listening()
+ self.watcher.add_child_handler(self.pid, self.parent_child_completed, self.element)
# terminate()
#
@@ -264,7 +281,7 @@ class Job():
try:
# Try the task action
- result = self.action(element)
+ result = self.action_cb(element)
except BstError as e:
elapsed = datetime.datetime.now() - starttime
@@ -299,10 +316,7 @@ class Job():
else:
# No exception occurred in the action
self.child_send_workspace(element)
-
- if result is not None:
- envelope = Envelope('result', result)
- self.queue.put(envelope)
+ self.child_send_result(result)
elapsed = datetime.datetime.now() - starttime
self.message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
@@ -327,21 +341,17 @@ class Job():
})
self.queue.put(envelope)
+ def child_send_result(self, result):
+ if result is not None:
+ envelope = Envelope('result', result)
+ self.queue.put(envelope)
+
def child_send_workspace(self, element):
workspace = element._get_workspace()
if workspace:
envelope = Envelope('workspace', workspace.to_dict())
self.queue.put(envelope)
- def child_complete(self, pid, returncode, element):
- self.shutdown()
-
- if returncode != 0 and self.tries <= self.max_retries:
- self.spawn(self.action, self.complete, self.max_retries)
- return
-
- self.complete(self, returncode, element)
-
def child_shutdown(self, exit_code):
self.queue.close()
sys.exit(exit_code)
@@ -405,6 +415,25 @@ class Job():
#######################################################
# Parent Process #
#######################################################
+
+ # shutdown()
+ #
+ # Should be called after the job completes
+ #
+ def parent_shutdown(self):
+ # Make sure we've read everything we need and then stop listening
+ self.parent_process_queue()
+ self.parent_stop_listening()
+
+ def parent_child_completed(self, pid, returncode, element):
+ self.parent_shutdown()
+
+ if returncode != 0 and self.tries <= self.max_retries:
+ self.spawn()
+ return
+
+ self.complete_cb(self, element, returncode == 0, self.result)
+
def parent_process_envelope(self, envelope):
if not self.listening:
return
diff --git a/buildstream/_scheduler/pullqueue.py b/buildstream/_scheduler/pullqueue.py
index 3413d2783..5630ef7c0 100644
--- a/buildstream/_scheduler/pullqueue.py
+++ b/buildstream/_scheduler/pullqueue.py
@@ -47,9 +47,9 @@ class PullQueue(Queue):
else:
return QueueStatus.SKIP
- def done(self, element, result, returncode):
+ def done(self, element, result, success):
- if returncode != 0:
+ if not success:
return False
if not result:
diff --git a/buildstream/_scheduler/pushqueue.py b/buildstream/_scheduler/pushqueue.py
index 6c7c95094..8a68d5953 100644
--- a/buildstream/_scheduler/pushqueue.py
+++ b/buildstream/_scheduler/pushqueue.py
@@ -41,9 +41,9 @@ class PushQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, returncode):
+ def done(self, element, result, success):
- if returncode != 0:
+ if not success:
return False
# Element._push() returns True if it uploaded an artifact,
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py
index b060af368..6aa12af1c 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queue.py
@@ -139,13 +139,14 @@ class Queue():
# Args:
# element (Element): The element which completed processing
# result (any): The return value of the process() implementation
- # returncode (int): The process return code, 0 = success
+ # success (bool): True if the process() implementation did not
+ # raise any exception
#
# Returns:
# (bool): True if the element should appear to be processed,
# Otherwise False will count the element as "skipped"
#
- def done(self, element, result, returncode):
+ def done(self, element, result, success):
pass
#####################################################
@@ -195,10 +196,12 @@ class Queue():
self.prepare(element)
- job = Job(scheduler, element, self.action_name)
+ job = Job(scheduler, element, self.action_name,
+ self.process, self.job_done,
+ max_retries=self.max_retries)
scheduler.job_starting(job)
- job.spawn(self.process, self.job_done, self.max_retries)
+ job.spawn()
self.active_jobs.append(job)
# These were not ready but were in the beginning, give em
@@ -220,7 +223,7 @@ class Queue():
"Unhandled exception while saving workspaces",
detail=traceback.format_exc())
- def job_done(self, job, returncode, element):
+ def job_done(self, job, element, success, result):
# Remove from our jobs
self.active_jobs.remove(job)
@@ -232,7 +235,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
- processed = self.done(element, job.result, returncode)
+ processed = self.done(element, result, success)
except BstError as e:
@@ -260,7 +263,7 @@ class Queue():
# No exception occurred, handle the success/failure state in the normal way
#
- if returncode == 0:
+ if success:
self.done_queue.append(element)
if processed:
self.processed_elements.append(element)
@@ -274,7 +277,7 @@ class Queue():
self.scheduler.put_job_token(self.queue_type)
# Notify frontend
- self.scheduler.job_completed(self, job, returncode == 0)
+ self.scheduler.job_completed(self, job, success)
self.scheduler.sched()
diff --git a/buildstream/_scheduler/trackqueue.py b/buildstream/_scheduler/trackqueue.py
index fe5843148..2e7bc8b97 100644
--- a/buildstream/_scheduler/trackqueue.py
+++ b/buildstream/_scheduler/trackqueue.py
@@ -48,9 +48,9 @@ class TrackQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, returncode):
+ def done(self, element, result, success):
- if returncode != 0:
+ if not success:
return False
changed = False