diff options
-rw-r--r-- | src/buildstream/_loader/loader.py | 25 | ||||
-rw-r--r-- | src/buildstream/_messenger.py | 29 | ||||
-rw-r--r-- | src/buildstream/_plugincontext.py | 49 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/jobpickler.py | 132 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 15 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 15 | ||||
-rw-r--r-- | src/buildstream/element.py | 37 | ||||
-rw-r--r-- | src/buildstream/plugin.py | 25 | ||||
-rw-r--r-- | src/buildstream/source.py | 26 |
9 files changed, 352 insertions, 1 deletions
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py index cb37d299c..cdfc1dd56 100644 --- a/src/buildstream/_loader/loader.py +++ b/src/buildstream/_loader/loader.py @@ -146,6 +146,31 @@ class Loader(): return ret + # get_state_for_child_job_pickling(self) + # + # Return data necessary to reconstruct this object in a child job process. + # + # This should be implemented the same as __getstate__(). We define this + # method instead as it is child job specific. + # + # Returns: + # (dict): This `state` is what we want `self.__dict__` to be restored to + # after instantiation in the child process. + # + def get_state_for_child_job_pickling(self): + state = self.__dict__.copy() + + # When pickling a Loader over to the ChildJob, we don't want to bring + # the whole Stream over with it. The _fetch_subprojects member is a + # method of the Stream. We also don't want to remove it in the main + # process. If we remove it in the child process then we will already be + # too late. The only time that seems just right is here, when preparing + # the child process' copy of the Loader. + # + del state['_fetch_subprojects'] + + return state + # clean_caches() # # Clean internal loader caches, recursively diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 7dec93994..d83b464ff 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -283,3 +283,32 @@ class Messenger(): # Write to the open log file self._log_handle.write('{}\n'.format(text)) self._log_handle.flush() + + # get_state_for_child_job_pickling(self) + # + # Return data necessary to reconstruct this object in a child job process. + # + # This should be implemented the same as __getstate__(). We define this + # method instead as it is child job specific. + # + # Returns: + # (dict): This `state` is what we want `self.__dict__` to be restored to + # after instantiation in the child process. 
+ # + def get_state_for_child_job_pickling(self): + state = self.__dict__.copy() + + # When pickling a Messenger over to the ChildJob, we don't want to bring + # the whole _message_handler over with it. We also don't want to remove it + # in the main process. If we remove it in the child process then we will + # already be too late. The only time that seems just right is here, when + # preparing the child process' copy of the Messenger. + # + # Another approach might be to use a context manager on the Messenger, + # which removes and restores the _message_handler. This wouldn't require + # access to private details of Messenger, but it would open up a window + # where messages wouldn't be handled as expected. + # + del state['_message_handler'] + + return state diff --git a/src/buildstream/_plugincontext.py b/src/buildstream/_plugincontext.py index 162b6fe40..9e32f4992 100644 --- a/src/buildstream/_plugincontext.py +++ b/src/buildstream/_plugincontext.py @@ -58,10 +58,50 @@ class PluginContext(): # The PluginSource object self._plugin_base = plugin_base - self._site_source = plugin_base.make_plugin_source(searchpath=site_plugin_path) + self._site_plugin_path = site_plugin_path + self._site_source = plugin_base.make_plugin_source( + searchpath=self._site_plugin_path, + ) self._alternate_sources = {} self._format_versions = format_versions + + def __getstate__(self): + state = self.__dict__.copy() + + # PluginSource is not a picklable type, so we must reconstruct this one + # as best we can when unpickling. + # + # Since the values of `_types` depend on the PluginSource, we must also + # get rid of those. It is only a cache - we will automatically recreate + # them on demand. + # + # Similarly we must clear out the `_alternate_sources` cache. + # + # Note that this method of referring to members is error-prone in that + # a later 'search and replace' renaming might miss these.
Guard against + # this by making sure we are not creating new members, only clearing + # existing ones. + # + del state['_site_source'] + assert '_types' in state + state['_types'] = {} + assert '_alternate_sources' in state + state['_alternate_sources'] = {} + + return state + + def __setstate__(self, state): + self.__dict__.update(state) + + # Note that in order to enable plugins to be unpickled along with this + # PluginSource, we would also have to set and restore the 'identifier' + # of the PluginSource. We would also have to recreate `_types` as it + # was before unpickling them. We are not using this method in + # BuildStream, so the identifier is not restored here. + self._site_source = self._plugin_base.make_plugin_source( + searchpath=self._site_plugin_path, + ) + # lookup(): # # Fetches a type loaded from a plugin in this plugin context @@ -76,6 +116,13 @@ class PluginContext(): def lookup(self, kind): return self._ensure_plugin(kind) + # all_loaded_plugins(): + # + # Returns: an iterable over all the loaded plugins. + # + def all_loaded_plugins(self): + return self._types.values() + def _get_local_plugin_source(self, path): if ('local', path) not in self._alternate_sources: # key by a tuple to avoid collision diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py new file mode 100644 index 000000000..0edf88c10 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -0,0 +1,132 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Angelos Evripiotis <jevripiotis@bloomberg.net> + + +import copyreg +import io +import pickle + +from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto + +# BuildStream toplevel imports +from ..._loader import Loader +from ..._messenger import Messenger + + +# pickle_child_job() +# +# Perform the special case pickling required to pickle a child job for +# unpickling in a child process. +# +# Note that we don't need an `unpickle_child_job`, as regular `pickle.load()` +# will do everything required. +# +# Args: +# child_job (ChildJob): The job to be pickled. +# projects (List[Project]): The list of loaded projects, so we can get the +# relevant factories. +# +# Returns: +# An `io.BytesIO`, with the pickled contents of the ChildJob and everything it +# transitively refers to. +# +# Some types require special handling when pickling to send to another process. +# We register overrides for those special cases: +# +# o Very stateful objects: Some things carry much more state than they need for +# pickling over to the child job process. This extra state brings +# complication of supporting pickling of more types, and the performance +# penalty of the actual pickling. Use private knowledge of these objects to +# safely reduce the pickled state. +# +# o gRPC objects: These don't pickle, but they do have their own serialization +# mechanism, which we use instead. To avoid modifying generated code, we +# instead register overrides here. +# +# o Plugins: These cannot be unpickled unless the factory which created them +# has been unpickled first, with the same identifier as before. See note +# below. Some state in plugins is not necessary for child jobs, and comes +# with a heavy cost; we also need to remove this before pickling. 
+# +def pickle_child_job(child_job, projects): + + element_classes = [ + cls + for p in projects + if p.config.element_factory is not None + for cls, _ in p.config.element_factory.all_loaded_plugins() + ] + source_classes = [ + cls + for p in projects + if p.config.source_factory is not None + for cls, _ in p.config.source_factory.all_loaded_plugins() + ] + + data = io.BytesIO() + pickler = pickle.Pickler(data) + pickler.dispatch_table = copyreg.dispatch_table.copy() + + for cls in element_classes: + pickler.dispatch_table[cls] = _reduce_plugin + for cls in source_classes: + pickler.dispatch_table[cls] = _reduce_plugin + pickler.dispatch_table[ArtifactProto] = _reduce_artifact_proto + pickler.dispatch_table[Loader] = _reduce_object + pickler.dispatch_table[Messenger] = _reduce_object + + pickler.dump(child_job) + data.seek(0) + + return data + + +def _reduce_object(instance): + cls = type(instance) + state = instance.get_state_for_child_job_pickling() + return (cls.__new__, (cls,), state) + + +def _reduce_artifact_proto(instance): + assert isinstance(instance, ArtifactProto) + data = instance.SerializeToString() + return (_new_artifact_proto_from_reduction_args, (data,)) + + +def _new_artifact_proto_from_reduction_args(data): + instance = ArtifactProto() + instance.ParseFromString(data) + return instance + + +def _reduce_plugin(plugin): + factory, meta_kind, state = plugin._get_args_for_child_job_pickling() + args = (factory, meta_kind) + return (_new_plugin_from_reduction_args, args, state) + + +def _new_plugin_from_reduction_args(factory, meta_kind): + cls, _ = factory.lookup(meta_kind) + plugin = cls.__new__(cls) + + # Note that we rely on the `__project` member of the Plugin to keep + # `factory` alive after the scope of this function. If `factory` were to be + # GC'd then we would see undefined behaviour. 
+ + return plugin diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 00d61140e..2dea1d48b 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -601,3 +601,18 @@ class Scheduler(): def _tick(self): self._ticker_callback() self.loop.call_later(1, self._tick) + + def __getstate__(self): + # The only use-cases for pickling in BuildStream at the time of writing + # are enabling the 'spawn' method of starting child processes, and + # saving jobs to disk for replays. + # + # In both of these use-cases, a common mistake is that something being + # pickled indirectly holds a reference to the Scheduler, which in turn + # holds lots of things that are not pickleable. + # + # Make this situation easier to debug by failing early, in the + # Scheduler itself. Pickling this is almost certainly a mistake, unless + # a new use-case arises. + # + raise TypeError("Scheduler objects should not be pickled.") diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index d791449cc..0f320c569 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1555,3 +1555,18 @@ class Stream(): self._message(MessageType.WARN, "No artifacts found for globs: {}".format(', '.join(artifact_globs))) return element_targets, artifact_refs + + def __getstate__(self): + # The only use-cases for pickling in BuildStream at the time of writing + # are enabling the 'spawn' method of starting child processes, and + # saving jobs to disk for replays. + # + # In both of these use-cases, a common mistake is that something being + # pickled indirectly holds a reference to the Stream, which in turn + # holds lots of things that are not pickleable. + # + # Make this situation easier to debug by failing early, in the + # Stream itself. Pickling this is almost certainly a mistake, unless + # a new use-case arises. 
+ # + raise TypeError("Stream objects should not be pickled.") diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 396a72fb2..efa876c73 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -240,6 +240,7 @@ class Element(Plugin): self._build_log_path = None # The path of the build log for this Element self.__artifact = None # Artifact class for direct artifact composite interaction self.__strict_artifact = None # Artifact for strict cache key + self.__meta_kind = meta.kind # The kind of this source, required for unpickling # the index of the last source in this element that requires previous # sources for staging @@ -2314,6 +2315,42 @@ class Element(Plugin): rdep.__buildable_callback(rdep) rdep.__buildable_callback = None + # _get_args_for_child_job_pickling(self) + # + # Return data necessary to reconstruct this object in a child job process. + # + # Returns: + # (PluginContext, str, dict): A tuple of (factory, meta_kind, state), + # where `factory` is an object that can use `meta_kind` to create an + # instance of the same type as `self`. `state` is what we want + # `self.__dict__` to be restored to after instantiation in the child + # process. + # + def _get_args_for_child_job_pickling(self): + state = self.__dict__.copy() + + # These are called in the main process to notify the scheduler about + # certain things. They carry a reference to the scheduler, which we + # don't want in the child process, so clear them. + # + # Note that this method of referring to members is error-prone in that + # a later 'search and replace' renaming might miss these. Guard against + # this by making sure we are not creating new members, only clearing + # existing ones. 
+ # + assert "_Element__can_query_cache_callback" in state + state["_Element__can_query_cache_callback"] = None + assert "_Element__buildable_callback" in state + state["_Element__buildable_callback"] = None + + # This callback is not even read in the child process, so delete it. + # If this assumption is invalidated, we will get an attribute error to + # let us know, and we will need to update accordingly. + del state["_Element__required_callback"] + + factory = self._get_project().config.element_factory + return factory, self.__meta_kind, state + ############################################################# # Private Local Methods # ############################################################# diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index 9a322ab81..d9639161d 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -235,7 +235,13 @@ class Plugin(): self._unique_id = unique_id self.__context = context # The Context object + + # Note that when pickling jobs over to a child process, we rely on this + # reference to the Project, it keeps the plugin factory alive. If the + # factory were to be GC'd then we would see undefined behaviour. Make + # sure to test plugin pickling if this reference is to be removed. self.__project = project # The Project object + self.__provenance = provenance # The Provenance information self.__type_tag = type_tag # The type of plugin (element or source) self.__configuring = False # Whether we are currently configuring @@ -666,6 +672,25 @@ class Plugin(): def _preflight(self): self.preflight() + # _get_args_for_child_job_pickling(self) + # + # Return data necessary to reconstruct this object in a child job process. + # + # Returns: + # (PluginContext, str, dict): A tuple of (factory, meta_kind, state), + # where `factory` is an object that can use `meta_kind` to create an + # instance of the same type as `self`. 
`state` is what we want + # `self.__dict__` to be restored to after instantiation in the child + # process. + # + def _get_args_for_child_job_pickling(self): + # Note that this is only to be implemented as a BuildStream internal, + # so it's not an ImplError - those apply to custom plugins. Direct + # descendants of Plugin must implement this, e.g. Element and Source. + # Raise NotImplementedError as this would be an internal bug. + raise NotImplementedError("{tag} plugin '{kind}' does not implement _get_args_for_child_job_pickling()".format( + tag=self.__type_tag, kind=self.get_kind())) + ############################################################# # Local Private Methods # ############################################################# diff --git a/src/buildstream/source.py b/src/buildstream/source.py index 03c1301c5..b513fdb2a 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -316,6 +316,7 @@ class Source(Plugin): self.__element_kind = meta.element_kind # The kind of the element owning this source self.__directory = meta.directory # Staging relative directory self.__consistency = Consistency.INCONSISTENT # Cached consistency state + self.__meta_kind = meta.kind # The kind of this source, required for unpickling self.__key = None # Cache key for source @@ -1075,6 +1076,31 @@ class Source(Plugin): length = min(len(key), context.log_key_length) return key[:length] + # _get_args_for_child_job_pickling(self) + # + # Return data necessary to reconstruct this object in a child job process. + # + # Returns: + # (PluginContext, str, dict): A tuple of (factory, meta_kind, state), + # where `factory` is an object that can use `meta_kind` to create an + # instance of the same type as `self`. `state` is what we want + # `self.__dict__` to be restored to after instantiation in the child + # process. 
+ # + def _get_args_for_child_job_pickling(self): + factory = self._get_project().config.source_factory + + # In case you're wondering, note that it doesn't seem to be necessary + # to make a copy of `self.__dict__` here, because: + # + # o It seems that the default implementation of `_PyObject_GetState` + # in `typeobject.c` currently works this way, in CPython. + # + # o The code sketch of how pickling works also returns `self.__dict__`: + # https://docs.python.org/3/library/pickle.html#pickling-class-instances + # + return factory, self.__meta_kind, self.__dict__ + ############################################################# # Local Private Methods # ############################################################# |