# # Copyright (C) 2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # # Authors: # Angelos Evripiotis import copyreg import io import pickle from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest as DigestProto # BuildStream toplevel imports from ..._loader import Loader from ..._messenger import Messenger from ... import utils, node # Note that `str(type(proto_class))` results in `GeneratedProtocolMessageType` # instead of the concrete type, so we come up with our own names here. _NAME_TO_PROTO_CLASS = { "artifact": ArtifactProto, "digest": DigestProto, } _PROTO_CLASS_TO_NAME = {cls: name for name, cls in _NAME_TO_PROTO_CLASS.items()} # pickle_child_job() # # Perform the special case pickling required to pickle a child job for # unpickling in a child process. # # Args: # child_job (ChildJob): The job to pickle. # projects (List[Project]): The list of loaded projects, so we can get the # relevant factories. # def pickle_child_job(child_job, projects): # Note that we need to consider all the state of the program that's # necessary for the job, this includes e.g. the global state of the node # module. node_module_state = node._get_state_for_pickling() return _pickle_child_job_data((child_job, node_module_state), projects,) # do_pickled_child_job() # # Unpickle the supplied 'pickled' job and call 'child_action' on it. # # This is expected to be run in a subprocess started from the main process, as # such it will fixup any globals to be in the expected state. # # Args: # pickled (BytesIO): The pickled data, and job to execute. # *child_args (any) : Any parameters to be passed to `child_action`. # def do_pickled_child_job(pickled, *child_args): utils._is_main_process = _not_main_process child_job, node_module_state = pickle.load(pickled) node._set_state_from_pickling(node_module_state) return child_job.child_action(*child_args) # _not_main_process() # # A function to replace `utils._is_main_process` when we're running in a # subprocess that was not forked - the inheritance of the main process id will # not work in this case. # # Note that we'll always not be the main process by definition. # def _not_main_process(): return False # _pickle_child_job_data() # # Perform the special case pickling required to pickle a child job for # unpickling in a child process. # # Note that this just enables the pickling of things that contain ChildJob-s, # the thing to be pickled doesn't have to be a ChildJob. # # Note that we don't need an `unpickle_child_job_data`, as regular # `pickle.load()` will do everything required. # # Args: # child_job_data (ChildJob): The job to be pickled. # projects (List[Project]): The list of loaded projects, so we can get the # relevant factories. # # Returns: # An `io.BytesIO`, with the pickled contents of the ChildJob and everything it # transitively refers to. # # Some types require special handling when pickling to send to another process. # We register overrides for those special cases: # # o Very stateful objects: Some things carry much more state than they need for # pickling over to the child job process. This extra state brings # complication of supporting pickling of more types, and the performance # penalty of the actual pickling. Use private knowledge of these objects to # safely reduce the pickled state. # # o gRPC objects: These don't pickle, but they do have their own serialization # mechanism, which we use instead. To avoid modifying generated code, we # instead register overrides here. # # o Plugins: These cannot be unpickled unless the factory which created them # has been unpickled first, with the same identifier as before. See note # below. Some state in plugins is not necessary for child jobs, and comes # with a heavy cost; we also need to remove this before pickling. # def _pickle_child_job_data(child_job_data, projects): factory_list = [ factory for p in projects for factory in [ p.config.element_factory, p.first_pass_config.element_factory, p.config.source_factory, p.first_pass_config.source_factory, ] ] plugin_class_to_factory = { cls: factory for factory in factory_list if factory is not None for cls, _ in factory.all_loaded_plugins() } pickled_data = io.BytesIO() pickler = pickle.Pickler(pickled_data) pickler.dispatch_table = copyreg.dispatch_table.copy() def reduce_plugin(plugin): return _reduce_plugin_with_factory_dict(plugin, plugin_class_to_factory) for cls in plugin_class_to_factory: pickler.dispatch_table[cls] = reduce_plugin pickler.dispatch_table[ArtifactProto] = _reduce_proto pickler.dispatch_table[DigestProto] = _reduce_proto pickler.dispatch_table[Loader] = _reduce_object pickler.dispatch_table[Messenger] = _reduce_object pickler.dump(child_job_data) pickled_data.seek(0) return pickled_data def _reduce_object(instance): cls = type(instance) state = instance.get_state_for_child_job_pickling() return (cls.__new__, (cls,), state) def _reduce_proto(instance): name = _PROTO_CLASS_TO_NAME[type(instance)] data = instance.SerializeToString() return (_new_proto_from_reduction_args, (name, data)) def _new_proto_from_reduction_args(name, data): cls = _NAME_TO_PROTO_CLASS[name] instance = cls() instance.ParseFromString(data) return instance def _reduce_plugin_with_factory_dict(plugin, plugin_class_to_factory): meta_kind, state = plugin._get_args_for_child_job_pickling() assert meta_kind factory = plugin_class_to_factory[type(plugin)] args = (factory, meta_kind) return (_new_plugin_from_reduction_args, args, state) def _new_plugin_from_reduction_args(factory, meta_kind): cls, _ = factory.lookup(None, meta_kind, None) plugin = cls.__new__(cls) # Note that we rely on the `__project` member of the Plugin to keep # `factory` alive after the scope of this function. If `factory` were to be # GC'd then we would see undefined behaviour. return plugin