diff options
author | Chandan Singh <csingh43@bloomberg.net> | 2019-11-11 17:07:09 +0000 |
---|---|---|
committer | Chandan Singh <chandan@chandansingh.net> | 2019-11-14 21:21:06 +0000 |
commit | 122177153b14664a0e4fed85aa4f22b87cfabf56 (patch) | |
tree | 032c2e46825af91f6fe27f22b5b567eea2b7935d /src/buildstream/_scheduler/jobs | |
parent | a3ee349558f36a220f79665873b36c1b0f990c8e (diff) | |
download | buildstream-122177153b14664a0e4fed85aa4f22b87cfabf56.tar.gz |
Reformat code using Black
As discussed over the mailing list, reformat code using Black. This is a
one-off change to reformat all our codebase. Moving forward, we
shouldn't expect such blanket reformats. Rather, we expect each change
to already comply with the Black formatting style.
Diffstat (limited to 'src/buildstream/_scheduler/jobs')
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 145 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/jobpickler.py | 14 |
3 files changed, 71 insertions, 100 deletions
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index 246eb75c6..6e035be9c 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -69,9 +69,9 @@ class ElementJob(Job): super().__init__(*args, **kwargs) self.set_name(element._get_full_name()) self.queue = queue - self._element = element # Set the Element pertaining to the job - self._action_cb = action_cb # The action callable function - self._complete_cb = complete_cb # The complete callable function + self._element = element # Set the Element pertaining to the job + self._action_cb = action_cb # The action callable function + self._complete_cb = complete_cb # The complete callable function # Set the plugin element name & key for logging purposes self.set_message_element_name(self.name) @@ -97,9 +97,7 @@ class ChildElementJob(ChildJob): # This should probably be omitted for non-build tasks but it's harmless here elt_env = self._element.get_environment() env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) - self.message(MessageType.LOG, - "Build environment for element {}".format(self._element.name), - detail=env_dump) + self.message(MessageType.LOG, "Build environment for element {}".format(self._element.name), detail=env_dump) # Run the action return self._action_cb(self._element) @@ -109,6 +107,6 @@ class ChildElementJob(ChildJob): workspace = self._element._get_workspace() if workspace is not None: - data['workspace'] = workspace.to_dict() + data["workspace"] = workspace.to_dict() return data diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 3363d7b60..e7866bcd4 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -66,7 +66,7 @@ class JobStatus(FastEnum): # Used to distinguish between status messages and return values -class _Envelope(): +class _Envelope: def __init__(self, message_type, message): self.message_type = message_type self.message = message @@ -115,35 +115,34 @@ class _MessageType(FastEnum): # that should be used - should contain {pid}. # max_retries (int): The maximum number of retries # -class Job(): - +class Job: def __init__(self, scheduler, action_name, logfile, *, max_retries=0): # # Public members # - self.name = None # The name of the job, set by the job's subclass - self.action_name = action_name # The action name for the Queue - self.child_data = None # Data to be sent to the main process + self.name = None # The name of the job, set by the job's subclass + self.action_name = action_name # The action name for the Queue + self.child_data = None # Data to be sent to the main process # # Private members # - self._scheduler = scheduler # The scheduler - self._queue = None # A message passing queue - self._process = None # The Process object - self._watcher = None # Child process watcher - self._listening = False # Whether the parent is currently listening - self._suspended = False # Whether this job is currently suspended - self._max_retries = max_retries # Maximum number of automatic retries - self._result = None # Return value of child action in the parent - self._tries = 0 # Try count, for retryable jobs - self._terminated = False # Whether this job has been explicitly terminated + self._scheduler = scheduler # The scheduler + self._queue = None # A message passing queue + self._process = None # The Process object + self._watcher = None # Child process watcher + self._listening = False # Whether the parent is currently listening + self._suspended = False # Whether this job is currently suspended + self._max_retries = max_retries # Maximum number of automatic retries + self._result = None # Return value of child action in the parent + self._tries = 0 # Try count, for retryable jobs + self._terminated = False # Whether this job has been explicitly terminated self._logfile = logfile - self._message_element_name = None # The plugin instance element name for messaging - self._message_element_key = None # The element key for messaging - self._element = None # The Element() passed to the Job() constructor, if applicable + self._message_element_name = None # The plugin instance element name for messaging + self._message_element_key = None # The element key for messaging + self._element = None # The Element() passed to the Job() constructor, if applicable # set_name() # @@ -170,23 +169,16 @@ class Job(): self._max_retries, self._tries, self._message_element_name, - self._message_element_key + self._message_element_key, ) if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): - pickled = pickle_child_job( - child_job, - self._scheduler.context.get_projects(), - ) + pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),) self._process = _multiprocessing.AsyncioSafeProcess( - target=do_pickled_child_job, - args=[pickled, self._queue], + target=do_pickled_child_job, args=[pickled, self._queue], ) else: - self._process = _multiprocessing.AsyncioSafeProcess( - target=child_job.child_action, - args=[self._queue], - ) + self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[self._queue],) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main @@ -257,8 +249,7 @@ class Job(): # def kill(self): # Force kill - self.message(MessageType.WARN, - "{} did not terminate gracefully, killing".format(self.action_name)) + self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name)) utils._kill_process_tree(self._process.pid) # suspend() @@ -267,8 +258,7 @@ class Job(): # def suspend(self): if not self._suspended: - self.message(MessageType.STATUS, - "{} suspending".format(self.action_name)) + self.message(MessageType.STATUS, "{} suspending".format(self.action_name)) try: # Use SIGTSTP so that child processes may handle and propagate @@ -292,8 +282,7 @@ class Job(): def resume(self, silent=False): if self._suspended: if not silent and not self._scheduler.terminated: - self.message(MessageType.STATUS, - "{} resuming".format(self.action_name)) + self.message(MessageType.STATUS, "{} resuming".format(self.action_name)) os.kill(self._process.pid, signal.SIGCONT) self._suspended = False @@ -335,7 +324,7 @@ class Job(): # override 'element_name' and 'element_key' this way. # def message(self, message_type, message, element_name=None, element_key=None, **kwargs): - kwargs['scheduler'] = True + kwargs["scheduler"] = True # If default name & key values not provided, set as given job attributes if element_name is None: element_name = self._message_element_name @@ -373,8 +362,7 @@ class Job(): # lists, dicts, numbers, but not Element instances). # def handle_message(self, message): - raise ImplError("Job '{kind}' does not implement handle_message()" - .format(kind=type(self).__name__)) + raise ImplError("Job '{kind}' does not implement handle_message()".format(kind=type(self).__name__)) # parent_complete() # @@ -386,8 +374,7 @@ class Job(): # result (any): The result returned by child_process(). # def parent_complete(self, status, result): - raise ImplError("Job '{kind}' does not implement parent_complete()" - .format(kind=type(self).__name__)) + raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__)) # create_child_job() # @@ -405,8 +392,7 @@ class Job(): # (ChildJob): An instance of a subclass of ChildJob. # def create_child_job(self, *args, **kwargs): - raise ImplError("Job '{kind}' does not implement create_child_job()" - .format(kind=type(self).__name__)) + raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__)) ####################################################### # Local Private Methods # @@ -437,9 +423,11 @@ class Job(): returncode = _ReturnCode(returncode) except ValueError: # An unexpected return code was returned; fail permanently and report - self.message(MessageType.ERROR, - "Internal job process unexpectedly died with exit code {}".format(returncode), - logfile=self._logfile) + self.message( + MessageType.ERROR, + "Internal job process unexpectedly died with exit code {}".format(returncode), + logfile=self._logfile, + ) returncode = _ReturnCode.PERM_FAIL # We don't want to retry if we got OK or a permanent fail. @@ -503,8 +491,7 @@ class Job(): # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope.message['domain'], - envelope.message['reason']) + set_last_task_error(envelope.message["domain"], envelope.message["reason"]) elif envelope.message_type is _MessageType.RESULT: assert self._result is None self._result = envelope.message @@ -514,8 +501,7 @@ class Job(): elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE: self.handle_message(envelope.message) else: - assert False, "Unhandled message type '{}': {}".format( - envelope.message_type, envelope.message) + assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) # _parent_process_queue() # @@ -552,8 +538,7 @@ class Job(): # http://bugs.python.org/issue3831 # if not self._listening: - self._scheduler.loop.add_reader( - self._queue._reader.fileno(), self._parent_recv) + self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv) self._listening = True # _parent_stop_listening() @@ -589,11 +574,10 @@ class Job(): # message_element_key (tuple): None, or the element display key tuple # to be supplied to the Message() constructor. # -class ChildJob(): - +class ChildJob: def __init__( - self, action_name, messenger, logdir, logfile, max_retries, tries, - message_element_name, message_element_key): + self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key + ): self.action_name = action_name @@ -624,14 +608,15 @@ class ChildJob(): # overriden here. # def message(self, message_type, message, element_name=None, element_key=None, **kwargs): - kwargs['scheduler'] = True + kwargs["scheduler"] = True # If default name & key values not provided, set as given job attributes if element_name is None: element_name = self._message_element_name if element_key is None: element_key = self._message_element_key - self._messenger.message(Message(message_type, message, element_name=element_name, - element_key=element_key, **kwargs)) + self._messenger.message( + Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) + ) # send_message() # @@ -668,8 +653,7 @@ class ChildJob(): # the result of the Job. # def child_process(self): - raise ImplError("ChildJob '{kind}' does not implement child_process()" - .format(kind=type(self).__name__)) + raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__)) # child_process_data() # @@ -723,12 +707,13 @@ class ChildJob(): def resume_time(): nonlocal stopped_time nonlocal starttime - starttime += (datetime.datetime.now() - stopped_time) + starttime += datetime.datetime.now() - stopped_time # Time, log and and run the action function # - with _signals.suspendable(stop_time, resume_time), \ - self._messenger.recorded_messages(self._logfile, self._logdir) as filename: + with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages( + self._logfile, self._logdir + ) as filename: # Graciously handle sigterms. def handle_sigterm(_signum, _sigframe): @@ -743,8 +728,7 @@ class ChildJob(): result = self.child_process() # pylint: disable=assignment-from-no-return except SkipJob as e: elapsed = datetime.datetime.now() - starttime - self.message(MessageType.SKIPPED, str(e), - elapsed=elapsed, logfile=filename) + self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) # Alert parent of skip by return code self._child_shutdown(_ReturnCode.SKIPPED) @@ -753,13 +737,16 @@ class ChildJob(): retry_flag = e.temporary if retry_flag and (self._tries <= self._max_retries): - self.message(MessageType.FAIL, - "Try #{} failed, retrying".format(self._tries), - elapsed=elapsed, logfile=filename) + self.message( + MessageType.FAIL, + "Try #{} failed, retrying".format(self._tries), + elapsed=elapsed, + logfile=filename, + ) else: - self.message(MessageType.FAIL, str(e), - elapsed=elapsed, detail=e.detail, - logfile=filename, sandbox=e.sandbox) + self.message( + MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox + ) self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) @@ -770,7 +757,7 @@ class ChildJob(): # self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) - except Exception: # pylint: disable=broad-except + except Exception: # pylint: disable=broad-except # If an unhandled (not normalized to BstError) occurs, that's a bug, # send the traceback and formatted exception back to the frontend @@ -779,9 +766,7 @@ class ChildJob(): elapsed = datetime.datetime.now() - starttime detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) - self.message(MessageType.BUG, self.action_name, - elapsed=elapsed, detail=detail, - logfile=filename) + self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) # Unhandled exceptions should permenantly fail self._child_shutdown(_ReturnCode.PERM_FAIL) @@ -791,8 +776,7 @@ class ChildJob(): self._child_send_result(result) elapsed = datetime.datetime.now() - starttime - self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, - logfile=filename) + self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process @@ -831,10 +815,7 @@ class ChildJob(): domain = e.domain reason = e.reason - self._send_message(_MessageType.ERROR, { - 'domain': domain, - 'reason': reason - }) + self._send_message(_MessageType.ERROR, {"domain": domain, "reason": reason}) # _child_send_result() # diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py index b0465ec9e..1d47f67db 100644 --- a/src/buildstream/_scheduler/jobs/jobpickler.py +++ b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -37,9 +37,7 @@ _NAME_TO_PROTO_CLASS = { "digest": DigestProto, } -_PROTO_CLASS_TO_NAME = { - cls: name for name, cls in _NAME_TO_PROTO_CLASS.items() -} +_PROTO_CLASS_TO_NAME = {cls: name for name, cls in _NAME_TO_PROTO_CLASS.items()} # pickle_child_job() @@ -57,10 +55,7 @@ def pickle_child_job(child_job, projects): # necessary for the job, this includes e.g. the global state of the node # module. node_module_state = node._get_state_for_pickling() - return _pickle_child_job_data( - (child_job, node_module_state), - projects, - ) + return _pickle_child_job_data((child_job, node_module_state), projects,) # do_pickled_child_job() @@ -146,10 +141,7 @@ def _pickle_child_job_data(child_job_data, projects): ] plugin_class_to_factory = { - cls: factory - for factory in factory_list - if factory is not None - for cls, _ in factory.all_loaded_plugins() + cls: factory for factory in factory_list if factory is not None for cls, _ in factory.all_loaded_plugins() } pickled_data = io.BytesIO() |