summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Angelos Evripiotis <jevripiotis@bloomberg.net> 2019-04-09 13:31:39 +0100
committer: bst-marge-bot <marge-bot@buildstream.build> 2019-07-04 15:01:50 +0000
commit: 1ff7c902f5df37415b2c30ee8800295e4ba81709 (patch)
tree: e48f0d4894c6f96fdb23bb3636a9a4107bbe4ac2
parent: 866f114ca2a9d51e285e2887c9dd4359206851e1 (diff)
download: buildstream-1ff7c902f5df37415b2c30ee8800295e4ba81709.tar.gz
_scheduler: don't pass whole queue to child job
Stop passing the scheduler's job queues across to child jobs via the 'action_cb' parameter. Instead pass a module-level function, which will pickle nicely. This isn't much of a problem while we are in the 'fork' multiprocessing model. As we move towards supporting the 'spawn' model for win32, we need to consider what we will be pickling and unpickling to cross the process boundary.
-rw-r--r-- src/buildstream/_scheduler/queues/artifactpushqueue.py 11
-rw-r--r-- src/buildstream/_scheduler/queues/buildqueue.py 10
-rw-r--r-- src/buildstream/_scheduler/queues/fetchqueue.py 21
-rw-r--r-- src/buildstream/_scheduler/queues/pullqueue.py 11
-rw-r--r-- src/buildstream/_scheduler/queues/queue.py 25
-rw-r--r-- src/buildstream/_scheduler/queues/sourcepushqueue.py 11
-rw-r--r-- src/buildstream/_scheduler/queues/trackqueue.py 8
7 files changed, 65 insertions, 32 deletions
diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py
index b861d4fc7..3b45a7154 100644
--- a/src/buildstream/_scheduler/queues/artifactpushqueue.py
+++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py
@@ -32,13 +32,16 @@ class ArtifactPushQueue(Queue):
complete_name = "Pushed"
resources = [ResourceType.UPLOAD]
- def process(self, element):
- # returns whether an artifact was uploaded or not
- if not element._push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return ArtifactPushQueue._push_or_skip
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+ @staticmethod
+ def _push_or_skip(element):
+ if not element._push():
+ raise SkipJob(ArtifactPushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index b280661cc..1dd45607b 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -57,7 +57,7 @@ class BuildQueue(Queue):
logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
self._done_queue.append(element)
@@ -66,8 +66,8 @@ class BuildQueue(Queue):
return super().enqueue(to_queue)
- def process(self, element):
- return element._assemble()
+ def get_process_func(self):
+ return BuildQueue._assemble_element
def status(self, element):
if element._cached_success():
@@ -115,3 +115,7 @@ class BuildQueue(Queue):
# Set a "buildable" callback for an element not yet ready
# to be processed in the build queue.
element._set_buildable_callback(self._enqueue_element)
+
+ @staticmethod
+ def _assemble_element(element):
+ return element._assemble()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index bbb3b3d78..af2a69444 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -39,10 +39,13 @@ class FetchQueue(Queue):
super().__init__(scheduler)
self._skip_cached = skip_cached
- self._fetch_original = fetch_original
+ self._should_fetch_original = fetch_original
- def process(self, element):
- element._fetch(fetch_original=self._fetch_original)
+ def get_process_func(self):
+ if self._should_fetch_original:
+ return FetchQueue._fetch_original
+ else:
+ return FetchQueue._fetch_not_original
def status(self, element):
# Optionally skip elements that are already in the artifact cache
@@ -56,7 +59,7 @@ class FetchQueue(Queue):
# This will automatically skip elements which
# have no sources.
- if not element._should_fetch(self._fetch_original):
+ if not element._should_fetch(self._should_fetch_original):
return QueueStatus.SKIP
return QueueStatus.READY
@@ -69,7 +72,7 @@ class FetchQueue(Queue):
element._fetch_done()
# Successful fetch, we must be CACHED or in the sourcecache
- if self._fetch_original:
+ if self._should_fetch_original:
assert element._get_consistency() == Consistency.CACHED
else:
assert element._source_cached()
@@ -78,3 +81,11 @@ class FetchQueue(Queue):
# Set a "can_query_cache" callback for an element not yet ready
# to be processed in the fetch queue.
element._set_can_query_cache_callback(self._enqueue_element)
+
+ @staticmethod
+ def _fetch_not_original(element):
+ element._fetch(fetch_original=False)
+
+ @staticmethod
+ def _fetch_original(element):
+ element._fetch(fetch_original=True)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 245293342..2c46cd2fd 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,10 +33,8 @@ class PullQueue(Queue):
complete_name = "Pulled"
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
- def process(self, element):
- # returns whether an artifact was downloaded or not
- if not element._pull():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return PullQueue._pull_or_skip
def status(self, element):
if not element._can_query_cache():
@@ -65,3 +63,8 @@ class PullQueue(Queue):
# immediately ready to query the artifact cache so that it
# may be pulled.
element._set_can_query_cache_callback(self._enqueue_element)
+
+ @staticmethod
+ def _pull_or_skip(element):
+ if not element._pull():
+ raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 7c577e7bd..f2cefd5d2 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -91,20 +91,25 @@ class Queue():
# Abstract Methods for Queue implementations #
#####################################################
- # process()
+ # get_process_func()
#
- # Abstract method for processing an element
+ # Abstract method, returns a callable for processing an element.
#
- # Args:
- # element (Element): An element to process
+ # The callable should fit the signature `process(element: Element) -> any`.
#
- # Returns:
- # (any): An optional something to be returned
- # for every element successfully processed
+ # Note that the callable may be executed in a child process, so the return
+ # value should be a simple object (must be pickle-able, i.e. strings,
+ # lists, dicts, numbers, but not Element instances). This is sent back
+ # to the main process.
#
+ # This method is the only way for a queue to affect elements, and so is
+ # not optional to implement.
#
- def process(self, element):
- pass
+ # Returns:
+ # (Callable[[Element], Any]): The callable for processing elements.
+ #
+ def get_process_func(self):
+ raise NotImplementedError()
# status()
#
@@ -218,7 +223,7 @@ class Queue():
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready
diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py
index c38460e6a..4b4b96e2b 100644
--- a/src/buildstream/_scheduler/queues/sourcepushqueue.py
+++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py
@@ -30,13 +30,16 @@ class SourcePushQueue(Queue):
complete_name = "Sources pushed"
resources = [ResourceType.UPLOAD]
- def process(self, element):
- # Returns whether a source was pushed or not
- if not element._source_push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return SourcePushQueue._push_or_skip
def status(self, element):
if element._skip_source_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+ @staticmethod
+ def _push_or_skip(element):
+ if not element._source_push():
+ raise SkipJob(SourcePushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 194bb7e1d..6bdf838f9 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -35,8 +35,8 @@ class TrackQueue(Queue):
complete_name = "Tracked"
resources = [ResourceType.DOWNLOAD]
- def process(self, element):
- return element._track()
+ def get_process_func(self):
+ return TrackQueue._track_element
def status(self, element):
# We can skip elements entirely if they have no sources.
@@ -60,3 +60,7 @@ class TrackQueue(Queue):
source._set_ref(new_ref, save=True)
element._tracking_done()
+
+ @staticmethod
+ def _track_element(element):
+ return element._track()