summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildstream/_loader/loader.py25
-rw-r--r--src/buildstream/_messenger.py29
-rw-r--r--src/buildstream/_plugincontext.py49
-rw-r--r--src/buildstream/_scheduler/jobs/jobpickler.py132
-rw-r--r--src/buildstream/_scheduler/scheduler.py15
-rw-r--r--src/buildstream/_stream.py15
-rw-r--r--src/buildstream/element.py37
-rw-r--r--src/buildstream/plugin.py25
-rw-r--r--src/buildstream/source.py26
9 files changed, 352 insertions, 1 deletions
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index cb37d299c..cdfc1dd56 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -146,6 +146,31 @@ class Loader():
return ret
+ # get_state_for_child_job_pickling(self)
+ #
+ # Return data necessary to reconstruct this object in a child job process.
+ #
+ # This should be implemented the same as __getstate__(). We define this
+ # method instead as it is child job specific.
+ #
+ # Returns:
+ # (dict): This `state` is what we want `self.__dict__` to be restored to
+ # after instantiation in the child process.
+ #
+ def get_state_for_child_job_pickling(self):
+ state = self.__dict__.copy()
+
+ # When pickling a Loader over to the ChildJob, we don't want to bring
+ # the whole Stream over with it. The _fetch_subprojects member is a
+ # method of the Stream. We also don't want to remove it in the main
+ # process. If we remove it in the child process then we will already be
+ # too late. The only time that seems just right is here, when preparing
+ # the child process' copy of the Loader.
+ #
+ del state['_fetch_subprojects']
+
+ return state
+
# clean_caches()
#
# Clean internal loader caches, recursively
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 7dec93994..d83b464ff 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -283,3 +283,32 @@ class Messenger():
# Write to the open log file
self._log_handle.write('{}\n'.format(text))
self._log_handle.flush()
+
+ # get_state_for_child_job_pickling(self)
+ #
+ # Return data necessary to reconstruct this object in a child job process.
+ #
+ # This should be implemented the same as __getstate__(). We define this
+ # method instead as it is child job specific.
+ #
+ # Returns:
+ # (dict): This `state` is what we want `self.__dict__` to be restored to
+ # after instantiation in the child process.
+ #
+ def get_state_for_child_job_pickling(self):
+ state = self.__dict__.copy()
+
+ # When pickling a Messenger over to the ChildJob, we don't want to bring
+ # the whole _message_handler over with it. We also don't want to remove it
+ # in the main process. If we remove it in the child process then we will
+ # already be too late. The only time that seems just right is here, when
+ # preparing the child process' copy of the Messenger.
+ #
+ # Another approach might be to use a context manager on the Messenger,
+ # which removes and restores the _message_handler. This wouldn't require
+ # access to private details of Messenger, but it would open up a window
+ # where messages wouldn't be handled as expected.
+ #
+ del state['_message_handler']
+
+ return state
diff --git a/src/buildstream/_plugincontext.py b/src/buildstream/_plugincontext.py
index 162b6fe40..9e32f4992 100644
--- a/src/buildstream/_plugincontext.py
+++ b/src/buildstream/_plugincontext.py
@@ -58,10 +58,50 @@ class PluginContext():
# The PluginSource object
self._plugin_base = plugin_base
- self._site_source = plugin_base.make_plugin_source(searchpath=site_plugin_path)
+ self._site_plugin_path = site_plugin_path
+ self._site_source = plugin_base.make_plugin_source(
+ searchpath=self._site_plugin_path,
+ )
self._alternate_sources = {}
self._format_versions = format_versions
+ def __getstate__(self):
+ state = self.__dict__.copy()
+
+ # PluginSource is not a picklable type, so we must reconstruct this one
+ # as best we can when unpickling.
+ #
+ # Since the values of `_types` depend on the PluginSource, we must also
+ # get rid of those. It is only a cache - we will automatically recreate
+ # them on demand.
+ #
+ # Similarly we must clear out the `_alternate_sources` cache.
+ #
+ # Note that this method of referring to members is error-prone in that
+ # a later 'search and replace' renaming might miss these. Guard against
+ # this by making sure we are not creating new members, only clearing
+ # existing ones.
+ #
+ del state['_site_source']
+ assert '_types' in state
+ state['_types'] = {}
+ assert '_alternate_sources' in state
+ state['_alternate_sources'] = {}
+
+ return state
+
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ # Note that in order to enable plugins to be unpickled along with this
+ # PluginSource, we would also have to set and restore the 'identifier'
+ # of the PluginSource. We would also have to recreate `_types` as it
+ # was before unpickling them. We are not using this method in
+ # BuildStream, so the identifier is not restored here.
+ self._site_source = self._plugin_base.make_plugin_source(
+ searchpath=self._site_plugin_path,
+ )
+
# lookup():
#
# Fetches a type loaded from a plugin in this plugin context
@@ -76,6 +116,13 @@ class PluginContext():
def lookup(self, kind):
return self._ensure_plugin(kind)
+ # all_loaded_plugins():
+ #
+ # Returns: an iterable over all the loaded plugins.
+ #
+ def all_loaded_plugins(self):
+ return self._types.values()
+
def _get_local_plugin_source(self, path):
if ('local', path) not in self._alternate_sources:
# key by a tuple to avoid collision
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
new file mode 100644
index 000000000..0edf88c10
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -0,0 +1,132 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Angelos Evripiotis <jevripiotis@bloomberg.net>
+
+
+import copyreg
+import io
+import pickle
+
+from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
+
+# BuildStream toplevel imports
+from ..._loader import Loader
+from ..._messenger import Messenger
+
+
+# pickle_child_job()
+#
+# Perform the special case pickling required to pickle a child job for
+# unpickling in a child process.
+#
+# Note that we don't need an `unpickle_child_job`, as regular `pickle.load()`
+# will do everything required.
+#
+# Args:
+# child_job (ChildJob): The job to be pickled.
+# projects (List[Project]): The list of loaded projects, so we can get the
+# relevant factories.
+#
+# Returns:
+# An `io.BytesIO`, with the pickled contents of the ChildJob and everything it
+# transitively refers to.
+#
+# Some types require special handling when pickling to send to another process.
+# We register overrides for those special cases:
+#
+# o Very stateful objects: Some things carry much more state than they need for
+# pickling over to the child job process. This extra state brings
+# complication of supporting pickling of more types, and the performance
+# penalty of the actual pickling. Use private knowledge of these objects to
+# safely reduce the pickled state.
+#
+# o gRPC objects: These don't pickle, but they do have their own serialization
+# mechanism, which we use instead. To avoid modifying generated code, we
+# instead register overrides here.
+#
+# o Plugins: These cannot be unpickled unless the factory which created them
+# has been unpickled first, with the same identifier as before. See note
+# below. Some state in plugins is not necessary for child jobs, and comes
+# with a heavy cost; we also need to remove this before pickling.
+#
+def pickle_child_job(child_job, projects):
+
+ element_classes = [
+ cls
+ for p in projects
+ if p.config.element_factory is not None
+ for cls, _ in p.config.element_factory.all_loaded_plugins()
+ ]
+ source_classes = [
+ cls
+ for p in projects
+ if p.config.source_factory is not None
+ for cls, _ in p.config.source_factory.all_loaded_plugins()
+ ]
+
+ data = io.BytesIO()
+ pickler = pickle.Pickler(data)
+ pickler.dispatch_table = copyreg.dispatch_table.copy()
+
+ for cls in element_classes:
+ pickler.dispatch_table[cls] = _reduce_plugin
+ for cls in source_classes:
+ pickler.dispatch_table[cls] = _reduce_plugin
+ pickler.dispatch_table[ArtifactProto] = _reduce_artifact_proto
+ pickler.dispatch_table[Loader] = _reduce_object
+ pickler.dispatch_table[Messenger] = _reduce_object
+
+ pickler.dump(child_job)
+ data.seek(0)
+
+ return data
+
+
+def _reduce_object(instance):
+ cls = type(instance)
+ state = instance.get_state_for_child_job_pickling()
+ return (cls.__new__, (cls,), state)
+
+
+def _reduce_artifact_proto(instance):
+ assert isinstance(instance, ArtifactProto)
+ data = instance.SerializeToString()
+ return (_new_artifact_proto_from_reduction_args, (data,))
+
+
+def _new_artifact_proto_from_reduction_args(data):
+ instance = ArtifactProto()
+ instance.ParseFromString(data)
+ return instance
+
+
+def _reduce_plugin(plugin):
+ factory, meta_kind, state = plugin._get_args_for_child_job_pickling()
+ args = (factory, meta_kind)
+ return (_new_plugin_from_reduction_args, args, state)
+
+
+def _new_plugin_from_reduction_args(factory, meta_kind):
+ cls, _ = factory.lookup(meta_kind)
+ plugin = cls.__new__(cls)
+
+ # Note that we rely on the `__project` member of the Plugin to keep
+ # `factory` alive after the scope of this function. If `factory` were to be
+ # GC'd then we would see undefined behaviour.
+
+ return plugin
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 00d61140e..2dea1d48b 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -601,3 +601,18 @@ class Scheduler():
def _tick(self):
self._ticker_callback()
self.loop.call_later(1, self._tick)
+
+ def __getstate__(self):
+ # The only use-cases for pickling in BuildStream at the time of writing
+ # are enabling the 'spawn' method of starting child processes, and
+ # saving jobs to disk for replays.
+ #
+ # In both of these use-cases, a common mistake is that something being
+ # pickled indirectly holds a reference to the Scheduler, which in turn
+ # holds lots of things that are not pickleable.
+ #
+ # Make this situation easier to debug by failing early, in the
+ # Scheduler itself. Pickling this is almost certainly a mistake, unless
+ # a new use-case arises.
+ #
+ raise TypeError("Scheduler objects should not be pickled.")
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d791449cc..0f320c569 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1555,3 +1555,18 @@ class Stream():
self._message(MessageType.WARN, "No artifacts found for globs: {}".format(', '.join(artifact_globs)))
return element_targets, artifact_refs
+
+ def __getstate__(self):
+ # The only use-cases for pickling in BuildStream at the time of writing
+ # are enabling the 'spawn' method of starting child processes, and
+ # saving jobs to disk for replays.
+ #
+ # In both of these use-cases, a common mistake is that something being
+ # pickled indirectly holds a reference to the Stream, which in turn
+ # holds lots of things that are not pickleable.
+ #
+ # Make this situation easier to debug by failing early, in the
+ # Stream itself. Pickling this is almost certainly a mistake, unless
+ # a new use-case arises.
+ #
+ raise TypeError("Stream objects should not be pickled.")
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 396a72fb2..efa876c73 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -240,6 +240,7 @@ class Element(Plugin):
self._build_log_path = None # The path of the build log for this Element
self.__artifact = None # Artifact class for direct artifact composite interaction
self.__strict_artifact = None # Artifact for strict cache key
+ self.__meta_kind = meta.kind # The kind of this element, required for unpickling
# the index of the last source in this element that requires previous
# sources for staging
@@ -2314,6 +2315,42 @@ class Element(Plugin):
rdep.__buildable_callback(rdep)
rdep.__buildable_callback = None
+ # _get_args_for_child_job_pickling(self)
+ #
+ # Return data necessary to reconstruct this object in a child job process.
+ #
+ # Returns:
+ # (PluginContext, str, dict): A tuple of (factory, meta_kind, state),
+ # where `factory` is an object that can use `meta_kind` to create an
+ # instance of the same type as `self`. `state` is what we want
+ # `self.__dict__` to be restored to after instantiation in the child
+ # process.
+ #
+ def _get_args_for_child_job_pickling(self):
+ state = self.__dict__.copy()
+
+ # These are called in the main process to notify the scheduler about
+ # certain things. They carry a reference to the scheduler, which we
+ # don't want in the child process, so clear them.
+ #
+ # Note that this method of referring to members is error-prone in that
+ # a later 'search and replace' renaming might miss these. Guard against
+ # this by making sure we are not creating new members, only clearing
+ # existing ones.
+ #
+ assert "_Element__can_query_cache_callback" in state
+ state["_Element__can_query_cache_callback"] = None
+ assert "_Element__buildable_callback" in state
+ state["_Element__buildable_callback"] = None
+
+ # This callback is not even read in the child process, so delete it.
+ # If this assumption is invalidated, we will get an attribute error to
+ # let us know, and we will need to update accordingly.
+ del state["_Element__required_callback"]
+
+ factory = self._get_project().config.element_factory
+ return factory, self.__meta_kind, state
+
#############################################################
# Private Local Methods #
#############################################################
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 9a322ab81..d9639161d 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -235,7 +235,13 @@ class Plugin():
self._unique_id = unique_id
self.__context = context # The Context object
+
+ # Note that when pickling jobs over to a child process, we rely on this
+ # reference to the Project, it keeps the plugin factory alive. If the
+ # factory were to be GC'd then we would see undefined behaviour. Make
+ # sure to test plugin pickling if this reference is to be removed.
self.__project = project # The Project object
+
self.__provenance = provenance # The Provenance information
self.__type_tag = type_tag # The type of plugin (element or source)
self.__configuring = False # Whether we are currently configuring
@@ -666,6 +672,25 @@ class Plugin():
def _preflight(self):
self.preflight()
+ # _get_args_for_child_job_pickling(self)
+ #
+ # Return data necessary to reconstruct this object in a child job process.
+ #
+ # Returns:
+ # (PluginContext, str, dict): A tuple of (factory, meta_kind, state),
+ # where `factory` is an object that can use `meta_kind` to create an
+ # instance of the same type as `self`. `state` is what we want
+ # `self.__dict__` to be restored to after instantiation in the child
+ # process.
+ #
+ def _get_args_for_child_job_pickling(self):
+ # Note that this is only to be implemented as a BuildStream internal,
+ # so it's not an ImplError - those apply to custom plugins. Direct
+ # descendants of Plugin must implement this, e.g. Element and Source.
+ # Raise NotImplementedError as this would be an internal bug.
+ raise NotImplementedError("{tag} plugin '{kind}' does not implement _get_args_for_child_job_pickling()".format(
+ tag=self.__type_tag, kind=self.get_kind()))
+
#############################################################
# Local Private Methods #
#############################################################
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index 03c1301c5..b513fdb2a 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -316,6 +316,7 @@ class Source(Plugin):
self.__element_kind = meta.element_kind # The kind of the element owning this source
self.__directory = meta.directory # Staging relative directory
self.__consistency = Consistency.INCONSISTENT # Cached consistency state
+ self.__meta_kind = meta.kind # The kind of this source, required for unpickling
self.__key = None # Cache key for source
@@ -1075,6 +1076,31 @@ class Source(Plugin):
length = min(len(key), context.log_key_length)
return key[:length]
+ # _get_args_for_child_job_pickling(self)
+ #
+ # Return data necessary to reconstruct this object in a child job process.
+ #
+ # Returns:
+ # (PluginContext, str, dict): A tuple of (factory, meta_kind, state),
+ # where `factory` is an object that can use `meta_kind` to create an
+ # instance of the same type as `self`. `state` is what we want
+ # `self.__dict__` to be restored to after instantiation in the child
+ # process.
+ #
+ def _get_args_for_child_job_pickling(self):
+ factory = self._get_project().config.source_factory
+
+ # In case you're wondering, note that it doesn't seem to be necessary
+ # to make a copy of `self.__dict__` here, because:
+ #
+ # o It seems that the default implementation of `_PyObject_GetState`
+ # in `typeobject.c` currently works this way, in CPython.
+ #
+ # o The code sketch of how pickling works also returns `self.__dict__`:
+ # https://docs.python.org/3/library/pickle.html#pickling-class-instances
+ #
+ return factory, self.__meta_kind, self.__dict__
+
#############################################################
# Local Private Methods #
#############################################################