author     Chandan Singh <csingh43@bloomberg.net>    2019-11-11 17:07:09 +0000
committer  Chandan Singh <chandan@chandansingh.net>  2019-11-14 21:21:06 +0000
commit     122177153b14664a0e4fed85aa4f22b87cfabf56 (patch)
tree       032c2e46825af91f6fe27f22b5b567eea2b7935d /src/buildstream/_scheduler/jobs
parent     a3ee349558f36a220f79665873b36c1b0f990c8e (diff)
download   buildstream-122177153b14664a0e4fed85aa4f22b87cfabf56.tar.gz
Reformat code using Black
As discussed over the mailing list, reformat code using Black. This is a one-off change to reformat all our codebase. Moving forward, we shouldn't expect such blanket reformats. Rather, we expect each change to already comply with the Black formatting style.
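For illustration, the snippet below shows the kind of rewrite Black applies. The function is hypothetical, not taken from the BuildStream codebase, but it exercises the same rules that dominate this diff: strings normalized to double quotes, exactly two spaces before inline comments, and multi-line calls joined onto one line when they fit within the line-length limit.

    # Hypothetical snippet, before Black:
    def describe(element, key):
        return '{}: {}'.format(element,
                               key) # build a display string

    # The same snippet after Black:
    def describe(element, key):
        return "{}: {}".format(element, key)  # build a display string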
Diffstat (limited to 'src/buildstream/_scheduler/jobs')
-rw-r--r--  src/buildstream/_scheduler/jobs/elementjob.py  |  12
-rw-r--r--  src/buildstream/_scheduler/jobs/job.py         | 145
-rw-r--r--  src/buildstream/_scheduler/jobs/jobpickler.py  |  14
3 files changed, 71 insertions(+), 100 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 246eb75c6..6e035be9c 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -69,9 +69,9 @@ class ElementJob(Job):
super().__init__(*args, **kwargs)
self.set_name(element._get_full_name())
self.queue = queue
- self._element = element # Set the Element pertaining to the job
- self._action_cb = action_cb # The action callable function
- self._complete_cb = complete_cb # The complete callable function
+ self._element = element # Set the Element pertaining to the job
+ self._action_cb = action_cb # The action callable function
+ self._complete_cb = complete_cb # The complete callable function
# Set the plugin element name & key for logging purposes
self.set_message_element_name(self.name)
@@ -97,9 +97,7 @@ class ChildElementJob(ChildJob):
# This should probably be omitted for non-build tasks but it's harmless here
elt_env = self._element.get_environment()
env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self.message(MessageType.LOG,
- "Build environment for element {}".format(self._element.name),
- detail=env_dump)
+ self.message(MessageType.LOG, "Build environment for element {}".format(self._element.name), detail=env_dump)
# Run the action
return self._action_cb(self._element)
@@ -109,6 +107,6 @@ class ChildElementJob(ChildJob):
workspace = self._element._get_workspace()
if workspace is not None:
- data['workspace'] = workspace.to_dict()
+ data["workspace"] = workspace.to_dict()
return data
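Since the commit message asks that every future change already comply with Black's style, compliance can be checked mechanically. A minimal sketch using Black's format_str() API; the line length here is an assumption for illustration, not necessarily this project's configured value:

    import black

    source = "x = { 'a':1 }\n"

    # Reformat a string in memory; FileMode carries options such as
    # the maximum line length.
    formatted = black.format_str(source, mode=black.FileMode(line_length=119))
    print(formatted)  # x = {"a": 1}

In practice the same check runs against files on disk with `black --check .`, which exits non-zero whenever anything would be reformatted.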
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 3363d7b60..e7866bcd4 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -66,7 +66,7 @@ class JobStatus(FastEnum):
# Used to distinguish between status messages and return values
-class _Envelope():
+class _Envelope:
def __init__(self, message_type, message):
self.message_type = message_type
self.message = message
@@ -115,35 +115,34 @@ class _MessageType(FastEnum):
# that should be used - should contain {pid}.
# max_retries (int): The maximum number of retries
#
-class Job():
-
+class Job:
def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
#
# Public members
#
- self.name = None # The name of the job, set by the job's subclass
- self.action_name = action_name # The action name for the Queue
- self.child_data = None # Data to be sent to the main process
+ self.name = None # The name of the job, set by the job's subclass
+ self.action_name = action_name # The action name for the Queue
+ self.child_data = None # Data to be sent to the main process
#
# Private members
#
- self._scheduler = scheduler # The scheduler
- self._queue = None # A message passing queue
- self._process = None # The Process object
- self._watcher = None # Child process watcher
- self._listening = False # Whether the parent is currently listening
- self._suspended = False # Whether this job is currently suspended
- self._max_retries = max_retries # Maximum number of automatic retries
- self._result = None # Return value of child action in the parent
- self._tries = 0 # Try count, for retryable jobs
- self._terminated = False # Whether this job has been explicitly terminated
+ self._scheduler = scheduler # The scheduler
+ self._queue = None # A message passing queue
+ self._process = None # The Process object
+ self._watcher = None # Child process watcher
+ self._listening = False # Whether the parent is currently listening
+ self._suspended = False # Whether this job is currently suspended
+ self._max_retries = max_retries # Maximum number of automatic retries
+ self._result = None # Return value of child action in the parent
+ self._tries = 0 # Try count, for retryable jobs
+ self._terminated = False # Whether this job has been explicitly terminated
self._logfile = logfile
- self._message_element_name = None # The plugin instance element name for messaging
- self._message_element_key = None # The element key for messaging
- self._element = None # The Element() passed to the Job() constructor, if applicable
+ self._message_element_name = None # The plugin instance element name for messaging
+ self._message_element_key = None # The element key for messaging
+ self._element = None # The Element() passed to the Job() constructor, if applicable
# set_name()
#
@@ -170,23 +169,16 @@ class Job():
self._max_retries,
self._tries,
self._message_element_name,
- self._message_element_key
+ self._message_element_key,
)
if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
- pickled = pickle_child_job(
- child_job,
- self._scheduler.context.get_projects(),
- )
+ pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),)
self._process = _multiprocessing.AsyncioSafeProcess(
- target=do_pickled_child_job,
- args=[pickled, self._queue],
+ target=do_pickled_child_job, args=[pickled, self._queue],
)
else:
- self._process = _multiprocessing.AsyncioSafeProcess(
- target=child_job.child_action,
- args=[self._queue],
- )
+ self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[self._queue],)
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -257,8 +249,7 @@ class Job():
#
def kill(self):
# Force kill
- self.message(MessageType.WARN,
- "{} did not terminate gracefully, killing".format(self.action_name))
+ self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
utils._kill_process_tree(self._process.pid)
# suspend()
@@ -267,8 +258,7 @@ class Job():
#
def suspend(self):
if not self._suspended:
- self.message(MessageType.STATUS,
- "{} suspending".format(self.action_name))
+ self.message(MessageType.STATUS, "{} suspending".format(self.action_name))
try:
# Use SIGTSTP so that child processes may handle and propagate
@@ -292,8 +282,7 @@ class Job():
def resume(self, silent=False):
if self._suspended:
if not silent and not self._scheduler.terminated:
- self.message(MessageType.STATUS,
- "{} resuming".format(self.action_name))
+ self.message(MessageType.STATUS, "{} resuming".format(self.action_name))
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
@@ -335,7 +324,7 @@ class Job():
# override 'element_name' and 'element_key' this way.
#
def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
- kwargs['scheduler'] = True
+ kwargs["scheduler"] = True
# If default name & key values not provided, set as given job attributes
if element_name is None:
element_name = self._message_element_name
@@ -373,8 +362,7 @@ class Job():
# lists, dicts, numbers, but not Element instances).
#
def handle_message(self, message):
- raise ImplError("Job '{kind}' does not implement handle_message()"
- .format(kind=type(self).__name__))
+ raise ImplError("Job '{kind}' does not implement handle_message()".format(kind=type(self).__name__))
# parent_complete()
#
@@ -386,8 +374,7 @@ class Job():
# result (any): The result returned by child_process().
#
def parent_complete(self, status, result):
- raise ImplError("Job '{kind}' does not implement parent_complete()"
- .format(kind=type(self).__name__))
+ raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))
# create_child_job()
#
@@ -405,8 +392,7 @@ class Job():
# (ChildJob): An instance of a subclass of ChildJob.
#
def create_child_job(self, *args, **kwargs):
- raise ImplError("Job '{kind}' does not implement create_child_job()"
- .format(kind=type(self).__name__))
+ raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
@@ -437,9 +423,11 @@ class Job():
returncode = _ReturnCode(returncode)
except ValueError:
# An unexpected return code was returned; fail permanently and report
- self.message(MessageType.ERROR,
- "Internal job process unexpectedly died with exit code {}".format(returncode),
- logfile=self._logfile)
+ self.message(
+ MessageType.ERROR,
+ "Internal job process unexpectedly died with exit code {}".format(returncode),
+ logfile=self._logfile,
+ )
returncode = _ReturnCode.PERM_FAIL
# We don't want to retry if we got OK or a permanent fail.
@@ -503,8 +491,7 @@ class Job():
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope.message['domain'],
- envelope.message['reason'])
+ set_last_task_error(envelope.message["domain"], envelope.message["reason"])
elif envelope.message_type is _MessageType.RESULT:
assert self._result is None
self._result = envelope.message
@@ -514,8 +501,7 @@ class Job():
elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE:
self.handle_message(envelope.message)
else:
- assert False, "Unhandled message type '{}': {}".format(
- envelope.message_type, envelope.message)
+ assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
@@ -552,8 +538,7 @@ class Job():
# http://bugs.python.org/issue3831
#
if not self._listening:
- self._scheduler.loop.add_reader(
- self._queue._reader.fileno(), self._parent_recv)
+ self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv)
self._listening = True
# _parent_stop_listening()
@@ -589,11 +574,10 @@ class Job():
# message_element_key (tuple): None, or the element display key tuple
# to be supplied to the Message() constructor.
#
-class ChildJob():
-
+class ChildJob:
def __init__(
- self, action_name, messenger, logdir, logfile, max_retries, tries,
- message_element_name, message_element_key):
+ self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key
+ ):
self.action_name = action_name
@@ -624,14 +608,15 @@ class ChildJob():
# overriden here.
#
def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
- kwargs['scheduler'] = True
+ kwargs["scheduler"] = True
# If default name & key values not provided, set as given job attributes
if element_name is None:
element_name = self._message_element_name
if element_key is None:
element_key = self._message_element_key
- self._messenger.message(Message(message_type, message, element_name=element_name,
- element_key=element_key, **kwargs))
+ self._messenger.message(
+ Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
+ )
# send_message()
#
@@ -668,8 +653,7 @@ class ChildJob():
# the result of the Job.
#
def child_process(self):
- raise ImplError("ChildJob '{kind}' does not implement child_process()"
- .format(kind=type(self).__name__))
+ raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__))
# child_process_data()
#
@@ -723,12 +707,13 @@ class ChildJob():
def resume_time():
nonlocal stopped_time
nonlocal starttime
- starttime += (datetime.datetime.now() - stopped_time)
+ starttime += datetime.datetime.now() - stopped_time
# Time, log and and run the action function
#
- with _signals.suspendable(stop_time, resume_time), \
- self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
+ with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+ self._logfile, self._logdir
+ ) as filename:
# Graciously handle sigterms.
def handle_sigterm(_signum, _sigframe):
@@ -743,8 +728,7 @@ class ChildJob():
result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
elapsed = datetime.datetime.now() - starttime
- self.message(MessageType.SKIPPED, str(e),
- elapsed=elapsed, logfile=filename)
+ self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
self._child_shutdown(_ReturnCode.SKIPPED)
@@ -753,13 +737,16 @@ class ChildJob():
retry_flag = e.temporary
if retry_flag and (self._tries <= self._max_retries):
- self.message(MessageType.FAIL,
- "Try #{} failed, retrying".format(self._tries),
- elapsed=elapsed, logfile=filename)
+ self.message(
+ MessageType.FAIL,
+ "Try #{} failed, retrying".format(self._tries),
+ elapsed=elapsed,
+ logfile=filename,
+ )
else:
- self.message(MessageType.FAIL, str(e),
- elapsed=elapsed, detail=e.detail,
- logfile=filename, sandbox=e.sandbox)
+ self.message(
+ MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox
+ )
self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
@@ -770,7 +757,7 @@ class ChildJob():
#
self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
- except Exception: # pylint: disable=broad-except
+ except Exception: # pylint: disable=broad-except
# If an unhandled (not normalized to BstError) occurs, that's a bug,
# send the traceback and formatted exception back to the frontend
@@ -779,9 +766,7 @@ class ChildJob():
elapsed = datetime.datetime.now() - starttime
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self.message(MessageType.BUG, self.action_name,
- elapsed=elapsed, detail=detail,
- logfile=filename)
+ self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
# Unhandled exceptions should permenantly fail
self._child_shutdown(_ReturnCode.PERM_FAIL)
@@ -791,8 +776,7 @@ class ChildJob():
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
- logfile=filename)
+ self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
@@ -831,10 +815,7 @@ class ChildJob():
domain = e.domain
reason = e.reason
- self._send_message(_MessageType.ERROR, {
- 'domain': domain,
- 'reason': reason
- })
+ self._send_message(_MessageType.ERROR, {"domain": domain, "reason": reason})
# _child_send_result()
#
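A thread running through job.py above is that everything a child process reports travels as an _Envelope over a message queue, and the parent dispatches on the envelope's type. A standalone sketch of that pattern; the class mirrors _Envelope from the diff, while the surrounding scaffolding is invented for illustration:

    import multiprocessing

    class Envelope:
        # Tags a payload so the receiver can tell message kinds apart,
        # mirroring _Envelope(message_type, message) in job.py.
        def __init__(self, message_type, message):
            self.message_type = message_type
            self.message = message

    def child(queue):
        queue.put(Envelope("child_data", {"workspace": None}))
        queue.put(Envelope("result", 42))

    if __name__ == "__main__":
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=child, args=[queue])
        process.start()
        for _ in range(2):
            envelope = queue.get()
            # Dispatch on the tag, as _parent_process_envelope() does
            # for CHILD_DATA, ERROR and RESULT messages.
            print(envelope.message_type, envelope.message)
        process.join()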
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
index b0465ec9e..1d47f67db 100644
--- a/src/buildstream/_scheduler/jobs/jobpickler.py
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -37,9 +37,7 @@ _NAME_TO_PROTO_CLASS = {
"digest": DigestProto,
}
-_PROTO_CLASS_TO_NAME = {
- cls: name for name, cls in _NAME_TO_PROTO_CLASS.items()
-}
+_PROTO_CLASS_TO_NAME = {cls: name for name, cls in _NAME_TO_PROTO_CLASS.items()}
# pickle_child_job()
@@ -57,10 +55,7 @@ def pickle_child_job(child_job, projects):
# necessary for the job, this includes e.g. the global state of the node
# module.
node_module_state = node._get_state_for_pickling()
- return _pickle_child_job_data(
- (child_job, node_module_state),
- projects,
- )
+ return _pickle_child_job_data((child_job, node_module_state), projects,)
# do_pickled_child_job()
@@ -146,10 +141,7 @@ def _pickle_child_job_data(child_job_data, projects):
]
plugin_class_to_factory = {
- cls: factory
- for factory in factory_list
- if factory is not None
- for cls, _ in factory.all_loaded_plugins()
+ cls: factory for factory in factory_list if factory is not None for cls, _ in factory.all_loaded_plugins()
}
pickled_data = io.BytesIO()
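After reformatting, the two comprehensions in this file each fit on a single dense line. As a reading aid, here is what they expand to as plain loops; the Factory class and the mappings below are stand-ins, not BuildStream's real types:

    # Stand-in for a plugin factory; the real ones expose the plugin
    # classes they have loaded.
    class Factory:
        def __init__(self, plugins):
            self._plugins = plugins

        def all_loaded_plugins(self):
            # Returns (class, metadata) pairs, like the real factories.
            return [(cls, None) for cls in self._plugins]

    name_to_class = {"file": str, "digest": bytes}

    # Expanded form of _PROTO_CLASS_TO_NAME: invert the mapping.
    class_to_name = {}
    for name, cls in name_to_class.items():
        class_to_name[cls] = name

    # Expanded form of plugin_class_to_factory: one entry per loaded
    # plugin class, skipping factories that are None.
    factory_list = [Factory([int, float]), None, Factory([str])]
    plugin_class_to_factory = {}
    for factory in factory_list:
        if factory is not None:
            for cls, _ in factory.all_loaded_plugins():
                plugin_class_to_factory[cls] = factory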