summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-07-04 15:39:49 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-07-04 15:39:49 +0000
commit40a2ebe05bf2d4a922ac11f453c79a3d74b196ca (patch)
treee48f0d4894c6f96fdb23bb3636a9a4107bbe4ac2
parenta80adc03bfa6cd2ba7a7fe3981f00556e5e1f5da (diff)
parent1ff7c902f5df37415b2c30ee8800295e4ba81709 (diff)
downloadbuildstream-40a2ebe05bf2d4a922ac11f453c79a3d74b196ca.tar.gz
Merge branch 'aevri/smallerjobs2' into 'master'
_scheduler: don't pass whole queue to child job See merge request BuildStream/buildstream!1408
-rw-r--r--src/buildstream/_scheduler/queues/artifactpushqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py10
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py21
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/queue.py29
-rw-r--r--src/buildstream/_scheduler/queues/sourcepushqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/trackqueue.py8
7 files changed, 67 insertions, 34 deletions
diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py
index b861d4fc7..3b45a7154 100644
--- a/src/buildstream/_scheduler/queues/artifactpushqueue.py
+++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py
@@ -32,13 +32,16 @@ class ArtifactPushQueue(Queue):
complete_name = "Pushed"
resources = [ResourceType.UPLOAD]
- def process(self, element):
- # returns whether an artifact was uploaded or not
- if not element._push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return ArtifactPushQueue._push_or_skip
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+ @staticmethod
+ def _push_or_skip(element):
+ if not element._push():
+ raise SkipJob(ArtifactPushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index b280661cc..1dd45607b 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -57,7 +57,7 @@ class BuildQueue(Queue):
logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
self._done_queue.append(element)
@@ -66,8 +66,8 @@ class BuildQueue(Queue):
return super().enqueue(to_queue)
- def process(self, element):
- return element._assemble()
+ def get_process_func(self):
+ return BuildQueue._assemble_element
def status(self, element):
if element._cached_success():
@@ -115,3 +115,7 @@ class BuildQueue(Queue):
# Set a "buildable" callback for an element not yet ready
# to be processed in the build queue.
element._set_buildable_callback(self._enqueue_element)
+
+ @staticmethod
+ def _assemble_element(element):
+ return element._assemble()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index bbb3b3d78..af2a69444 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -39,10 +39,13 @@ class FetchQueue(Queue):
super().__init__(scheduler)
self._skip_cached = skip_cached
- self._fetch_original = fetch_original
+ self._should_fetch_original = fetch_original
- def process(self, element):
- element._fetch(fetch_original=self._fetch_original)
+ def get_process_func(self):
+ if self._should_fetch_original:
+ return FetchQueue._fetch_original
+ else:
+ return FetchQueue._fetch_not_original
def status(self, element):
# Optionally skip elements that are already in the artifact cache
@@ -56,7 +59,7 @@ class FetchQueue(Queue):
# This will automatically skip elements which
# have no sources.
- if not element._should_fetch(self._fetch_original):
+ if not element._should_fetch(self._should_fetch_original):
return QueueStatus.SKIP
return QueueStatus.READY
@@ -69,7 +72,7 @@ class FetchQueue(Queue):
element._fetch_done()
# Successful fetch, we must be CACHED or in the sourcecache
- if self._fetch_original:
+ if self._should_fetch_original:
assert element._get_consistency() == Consistency.CACHED
else:
assert element._source_cached()
@@ -78,3 +81,11 @@ class FetchQueue(Queue):
# Set a "can_query_cache" callback for an element not yet ready
# to be processed in the fetch queue.
element._set_can_query_cache_callback(self._enqueue_element)
+
+ @staticmethod
+ def _fetch_not_original(element):
+ element._fetch(fetch_original=False)
+
+ @staticmethod
+ def _fetch_original(element):
+ element._fetch(fetch_original=True)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 245293342..2c46cd2fd 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,10 +33,8 @@ class PullQueue(Queue):
complete_name = "Pulled"
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
- def process(self, element):
- # returns whether an artifact was downloaded or not
- if not element._pull():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return PullQueue._pull_or_skip
def status(self, element):
if not element._can_query_cache():
@@ -65,3 +63,8 @@ class PullQueue(Queue):
# immediately ready to query the artifact cache so that it
# may be pulled.
element._set_can_query_cache_callback(self._enqueue_element)
+
+ @staticmethod
+ def _pull_or_skip(element):
+ if not element._pull():
+ raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 9a07f633c..f2cefd5d2 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -91,20 +91,25 @@ class Queue():
# Abstract Methods for Queue implementations #
#####################################################
- # process()
+ # get_process_func()
#
- # Abstract method for processing an element
+ # Abstract method, returns a callable for processing an element.
#
- # Args:
- # element (Element): An element to process
+ # The callable should fit the signature `process(element: Element) -> any`.
#
- # Returns:
- # (any): An optional something to be returned
- # for every element successfully processed
+ # Note that the callable may be executed in a child process, so the return
+ # value should be a simple object (must be pickle-able, i.e. strings,
+ # lists, dicts, numbers, but not Element instances). This is sent to back
+ # to the main process.
#
+ # This method is the only way for a queue to affect elements, and so is
+ # not optional to implement.
#
- def process(self, element):
- pass
+ # Returns:
+ # (Callable[[Element], Any]): The callable for processing elements.
+ #
+ def get_process_func(self):
+ raise NotImplementedError()
# status()
#
@@ -218,7 +223,7 @@ class Queue():
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready
@@ -259,7 +264,7 @@ class Queue():
workspaces.save_config()
except BstError as e:
self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
- except Exception as e: # pylint: disable=broad-except
+ except Exception: # pylint: disable=broad-except
self._message(element, MessageType.BUG,
"Unhandled exception while saving workspaces",
detail=traceback.format_exc())
@@ -302,7 +307,7 @@ class Queue():
#
set_last_task_error(e.domain, e.reason)
- except Exception as e: # pylint: disable=broad-except
+ except Exception: # pylint: disable=broad-except
# Report unhandled exceptions and mark as failed
#
diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py
index c38460e6a..4b4b96e2b 100644
--- a/src/buildstream/_scheduler/queues/sourcepushqueue.py
+++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py
@@ -30,13 +30,16 @@ class SourcePushQueue(Queue):
complete_name = "Sources pushed"
resources = [ResourceType.UPLOAD]
- def process(self, element):
- # Returns whether a source was pushed or not
- if not element._source_push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return SourcePushQueue._push_or_skip
def status(self, element):
if element._skip_source_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+ @staticmethod
+ def _push_or_skip(element):
+ if not element._source_push():
+ raise SkipJob(SourcePushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 194bb7e1d..6bdf838f9 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -35,8 +35,8 @@ class TrackQueue(Queue):
complete_name = "Tracked"
resources = [ResourceType.DOWNLOAD]
- def process(self, element):
- return element._track()
+ def get_process_func(self):
+ return TrackQueue._track_element
def status(self, element):
# We can skip elements entirely if they have no sources.
@@ -60,3 +60,7 @@ class TrackQueue(Queue):
source._set_ref(new_ref, save=True)
element._tracking_done()
+
+ @staticmethod
+ def _track_element(element):
+ return element._track()