summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler
diff options
context:
space:
mode:
authorChandan Singh <csingh43@bloomberg.net>2019-11-05 13:40:03 +0000
committerChandan Singh <csingh43@bloomberg.net>2019-11-05 13:40:03 +0000
commitab707e87f53249d7f2aac17683254b54196f90ce (patch)
treed1d2898c6561a8ca362419dce92a6f808d45b4e6 /src/buildstream/_scheduler
parente06c2295b063245dbdb2397e5bd8c4d0a79ba10d (diff)
downloadbuildstream-ab707e87f53249d7f2aac17683254b54196f90ce.tar.gz
Use 119 line lengthchandan/black
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py12
-rw-r--r--src/buildstream/_scheduler/jobs/job.py136
-rw-r--r--src/buildstream/_scheduler/jobs/jobpickler.py9
-rw-r--r--src/buildstream/_scheduler/queues/queue.py37
-rw-r--r--src/buildstream/_scheduler/resources.py5
-rw-r--r--src/buildstream/_scheduler/scheduler.py34
6 files changed, 51 insertions, 182 deletions
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 2a9f935b5..f3136104f 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -81,9 +81,7 @@ class ElementJob(Job):
self._complete_cb(self, self._element, status, self._result)
def create_child_job(self, *args, **kwargs):
- return ChildElementJob(
- *args, element=self._element, action_cb=self._action_cb, **kwargs
- )
+ return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs)
class ChildElementJob(ChildJob):
@@ -98,13 +96,9 @@ class ChildElementJob(ChildJob):
#
# This should probably be omitted for non-build tasks but it's harmless here
elt_env = self._element.get_environment()
- env_dump = yaml.round_trip_dump(
- elt_env, default_flow_style=False, allow_unicode=True
- )
+ env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
self.message(
- MessageType.LOG,
- "Build environment for element {}".format(self._element.name),
- detail=env_dump,
+ MessageType.LOG, "Build environment for element {}".format(self._element.name), detail=env_dump,
)
# Run the action
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 3a5694a71..8baf8fe1b 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -146,13 +146,9 @@ class Job:
self._terminated = False # Whether this job has been explicitly terminated
self._logfile = logfile
- self._message_element_name = (
- None # The plugin instance element name for messaging
- )
+ self._message_element_name = None # The plugin instance element name for messaging
self._message_element_key = None # The element key for messaging
- self._element = (
- None # The Element() passed to the Job() constructor, if applicable
- )
+ self._element = None # The Element() passed to the Job() constructor, if applicable
# set_name()
#
@@ -182,15 +178,9 @@ class Job:
self._message_element_key,
)
- if (
- self._scheduler.context.platform.does_multiprocessing_start_require_pickling()
- ):
- pickled = pickle_child_job(
- child_job, self._scheduler.context.get_projects(),
- )
- self._process = Process(
- target=do_pickled_child_job, args=[pickled, self._queue],
- )
+ if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
+ pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),)
+ self._process = Process(target=do_pickled_child_job, args=[pickled, self._queue],)
else:
self._process = Process(target=child_job.child_action, args=[self._queue],)
@@ -198,9 +188,7 @@ class Job:
# the child process does not inherit the parent's state, but the main
# process will be notified of any signal after we launch the child.
#
- with _signals.blocked(
- [signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False
- ):
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
self._process.start()
# Wait for the child task to complete.
@@ -282,8 +270,7 @@ class Job:
def kill(self):
# Force kill
self.message(
- MessageType.WARN,
- "{} did not terminate gracefully, killing".format(self.action_name),
+ MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name),
)
utils._kill_process_tree(self._process.pid)
@@ -358,22 +345,14 @@ class Job:
# kwargs: Remaining Message() constructor arguments, note that you can
# override 'element_name' and 'element_key' this way.
#
- def message(
- self, message_type, message, element_name=None, element_key=None, **kwargs
- ):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
kwargs["scheduler"] = True
# If default name & key values not provided, set as given job attributes
if element_name is None:
element_name = self._message_element_name
if element_key is None:
element_key = self._message_element_key
- message = Message(
- message_type,
- message,
- element_name=element_name,
- element_key=element_key,
- **kwargs
- )
+ message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
self._scheduler.notify_messenger(message)
# get_element()
@@ -405,11 +384,7 @@ class Job:
# lists, dicts, numbers, but not Element instances).
#
def handle_message(self, message):
- raise ImplError(
- "Job '{kind}' does not implement handle_message()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("Job '{kind}' does not implement handle_message()".format(kind=type(self).__name__))
# parent_complete()
#
@@ -421,11 +396,7 @@ class Job:
# result (any): The result returned by child_process().
#
def parent_complete(self, status, result):
- raise ImplError(
- "Job '{kind}' does not implement parent_complete()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))
# create_child_job()
#
@@ -443,11 +414,7 @@ class Job:
# (ChildJob): An instance of a subclass of ChildJob.
#
def create_child_job(self, *args, **kwargs):
- raise ImplError(
- "Job '{kind}' does not implement create_child_job()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
@@ -480,9 +447,7 @@ class Job:
# An unexpected return code was returned; fail permanently and report
self.message(
MessageType.ERROR,
- "Internal job process unexpectedly died with exit code {}".format(
- returncode
- ),
+ "Internal job process unexpectedly died with exit code {}".format(returncode),
logfile=self._logfile,
)
returncode = _ReturnCode.PERM_FAIL
@@ -490,11 +455,7 @@ class Job:
# We don't want to retry if we got OK or a permanent fail.
retry_flag = returncode == _ReturnCode.FAIL
- if (
- retry_flag
- and (self._tries <= self._max_retries)
- and not self._scheduler.terminated
- ):
+ if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.start()
return
@@ -548,9 +509,7 @@ class Job:
elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE:
self.handle_message(envelope.message)
else:
- assert False, "Unhandled message type '{}': {}".format(
- envelope.message_type, envelope.message
- )
+ assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
@@ -587,9 +546,7 @@ class Job:
# http://bugs.python.org/issue3831
#
if not self._listening:
- self._scheduler.loop.add_reader(
- self._queue._reader.fileno(), self._parent_recv
- )
+ self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv)
self._listening = True
# _parent_stop_listening()
@@ -627,15 +584,7 @@ class Job:
#
class ChildJob:
def __init__(
- self,
- action_name,
- messenger,
- logdir,
- logfile,
- max_retries,
- tries,
- message_element_name,
- message_element_key,
+ self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key,
):
self.action_name = action_name
@@ -666,9 +615,7 @@ class ChildJob:
# for front end display if not already set or explicitly
# overriden here.
#
- def message(
- self, message_type, message, element_name=None, element_key=None, **kwargs
- ):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
kwargs["scheduler"] = True
# If default name & key values not provided, set as given job attributes
if element_name is None:
@@ -676,13 +623,7 @@ class ChildJob:
if element_key is None:
element_key = self._message_element_key
self._messenger.message(
- Message(
- message_type,
- message,
- element_name=element_name,
- element_key=element_key,
- **kwargs
- )
+ Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
)
# send_message()
@@ -720,11 +661,7 @@ class ChildJob:
# the result of the Job.
#
def child_process(self):
- raise ImplError(
- "ChildJob '{kind}' does not implement child_process()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__))
# child_process_data()
#
@@ -782,22 +719,18 @@ class ChildJob:
# Time, log and and run the action function
#
- with _signals.suspendable(
- stop_time, resume_time
- ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
+ with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+ self._logfile, self._logdir
+ ) as filename:
self.message(MessageType.START, self.action_name, logfile=filename)
try:
# Try the task action
- result = (
- self.child_process()
- ) # pylint: disable=assignment-from-no-return
+ result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
elapsed = datetime.datetime.now() - starttime
- self.message(
- MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename
- )
+ self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
self._child_shutdown(_ReturnCode.SKIPPED)
@@ -829,9 +762,7 @@ class ChildJob:
# Set return code based on whether or not the error was temporary.
#
- self._child_shutdown(
- _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
- )
+ self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
except Exception: # pylint: disable=broad-except
@@ -840,16 +771,10 @@ class ChildJob:
# and print it to the log file.
#
elapsed = datetime.datetime.now() - starttime
- detail = "An unhandled exception occured:\n\n{}".format(
- traceback.format_exc()
- )
+ detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
self.message(
- MessageType.BUG,
- self.action_name,
- elapsed=elapsed,
- detail=detail,
- logfile=filename,
+ MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename,
)
# Unhandled exceptions should permenantly fail
self._child_shutdown(_ReturnCode.PERM_FAIL)
@@ -861,10 +786,7 @@ class ChildJob:
elapsed = datetime.datetime.now() - starttime
self.message(
- MessageType.SUCCESS,
- self.action_name,
- elapsed=elapsed,
- logfile=filename,
+ MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename,
)
# Shutdown needs to stay outside of the above context manager,
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
index 0b482d080..1d47f67db 100644
--- a/src/buildstream/_scheduler/jobs/jobpickler.py
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -23,9 +23,7 @@ import io
import pickle
from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
-from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
- Digest as DigestProto,
-)
+from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest as DigestProto
# BuildStream toplevel imports
from ..._loader import Loader
@@ -143,10 +141,7 @@ def _pickle_child_job_data(child_job_data, projects):
]
plugin_class_to_factory = {
- cls: factory
- for factory in factory_list
- if factory is not None
- for cls, _ in factory.all_loaded_plugins()
+ cls: factory for factory in factory_list if factory is not None for cls, _ in factory.all_loaded_plugins()
}
pickled_data = io.BytesIO()
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 62ebcc003..d812a48d6 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -76,23 +76,16 @@ class Queue:
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
- self._required_element_check = (
- False # Whether we should check that elements are required before enqueuing
- )
+ self._required_element_check = False # Whether we should check that elements are required before enqueuing
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
- if (
- ResourceType.UPLOAD in self.resources
- or ResourceType.DOWNLOAD in self.resources
- ):
+ if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
self._max_retries = scheduler.context.sched_network_retries
- self._task_group = self._scheduler._state.add_task_group(
- self.action_name, self.complete_name
- )
+ self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name)
# destroy()
#
@@ -169,11 +162,7 @@ class Queue:
# element (Element): The element waiting to be pushed into the queue
#
def register_pending_element(self, element):
- raise ImplError(
- "Queue type: {} does not implement register_pending_element()".format(
- self.action_name
- )
- )
+ raise ImplError("Queue type: {} does not implement register_pending_element()".format(self.action_name))
#####################################################
# Scheduler / Pipeline facing APIs #
@@ -293,10 +282,7 @@ class Queue:
workspaces.save_config()
except BstError as e:
self._message(
- element,
- MessageType.ERROR,
- "Error saving workspaces",
- detail=str(e),
+ element, MessageType.ERROR, "Error saving workspaces", detail=str(e),
)
except Exception: # pylint: disable=broad-except
self._message(
@@ -334,9 +320,7 @@ class Queue:
# Report error and mark as failed
#
- self._message(
- element, MessageType.ERROR, "Post processing error", detail=str(e)
- )
+ self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
self._task_group.add_failed_task(element._get_full_name())
# Treat this as a task error as it's related to a task
@@ -351,10 +335,7 @@ class Queue:
# Report unhandled exceptions and mark as failed
#
self._message(
- element,
- MessageType.BUG,
- "Unhandled exception in post processing",
- detail=traceback.format_exc(),
+ element, MessageType.BUG, "Unhandled exception in post processing", detail=traceback.format_exc(),
)
self._task_group.add_failed_task(element._get_full_name())
else:
@@ -372,9 +353,7 @@ class Queue:
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
- message = Message(
- message_type, brief, element_name=element._get_full_name(), **kwargs
- )
+ message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
self._scheduler.notify_messenger(message)
def _element_log_path(self, element):
diff --git a/src/buildstream/_scheduler/resources.py b/src/buildstream/_scheduler/resources.py
index 946a7f0b1..e76158779 100644
--- a/src/buildstream/_scheduler/resources.py
+++ b/src/buildstream/_scheduler/resources.py
@@ -90,10 +90,7 @@ class Resources:
# available. If we don't have enough, the job cannot be
# scheduled.
for resource in resources:
- if (
- self._max_resources[resource] > 0
- and self._used_resources[resource] >= self._max_resources[resource]
- ):
+ if self._max_resources[resource] > 0 and self._used_resources[resource] >= self._max_resources[resource]:
return False
# Now we register the fact that our job is using the resources
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0555b1103..6268ec169 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -122,16 +122,12 @@ class Scheduler:
#
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
- self.terminated = (
- False # Whether the scheduler was asked to terminate or has terminated
- )
+ self.terminated = False # Whether the scheduler was asked to terminate or has terminated
self.suspended = False # Whether the scheduler is currently suspended
# These are shared with the Job, but should probably be removed or made private in some way.
self.loop = None # Shared for Job access to observe the message queue
- self.internal_stops = (
- 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
- )
+ self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
#
# Private members
@@ -147,9 +143,7 @@ class Scheduler:
self._notification_queue = notification_queue
self._notifier = notifier
- self.resources = Resources(
- context.sched_builders, context.sched_fetchers, context.sched_pushers
- )
+ self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
# run()
#
@@ -191,9 +185,7 @@ class Scheduler:
_watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
# Start the profiler
- with PROFILER.profile(
- Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)
- ):
+ with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
# Run the queues
self._sched()
self.loop.run_forever()
@@ -349,9 +341,7 @@ class Scheduler:
# returncode (int): the return code with which buildbox-casd exited
#
def _abort_on_casd_failure(self, pid, returncode):
- message = Message(
- MessageType.BUG, "buildbox-casd died while the pipeline was active."
- )
+ message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
self._notify(Notification(NotificationType.MESSAGE, message=message))
self._casd_process.returncode = returncode
@@ -407,9 +397,7 @@ class Scheduler:
# to fetch tasks for elements which failed to pull, and
# thus need all the pulls to complete before ever starting
# a build
- ready.extend(
- chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues))
- )
+ ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues)))
# harvest_jobs() may have decided to skip some jobs, making
# them eligible for promotion to the next queue as a side effect.
@@ -419,11 +407,7 @@ class Scheduler:
# Make sure fork is allowed before starting jobs
if not self.context.prepare_fork():
- message = Message(
- MessageType.BUG,
- "Fork is not allowed",
- detail="Background threads are active",
- )
+ message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active",)
self._notify(Notification(NotificationType.MESSAGE, message=message))
self.terminate_jobs()
return
@@ -484,9 +468,7 @@ class Scheduler:
# Notify that we're unsuspended
self._notify(Notification(NotificationType.SUSPENDED))
self._starttime += datetime.datetime.now() - self._suspendtime
- self._notify(
- Notification(NotificationType.SCHED_START_TIME, time=self._starttime)
- )
+ self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
self._suspendtime = None
# _interrupt_event():