diff options
author | Chandan Singh <csingh43@bloomberg.net> | 2019-11-05 13:40:03 +0000 |
---|---|---|
committer | Chandan Singh <csingh43@bloomberg.net> | 2019-11-05 13:40:03 +0000 |
commit | ab707e87f53249d7f2aac17683254b54196f90ce (patch) | |
tree | d1d2898c6561a8ca362419dce92a6f808d45b4e6 /src/buildstream/_scheduler | |
parent | e06c2295b063245dbdb2397e5bd8c4d0a79ba10d (diff) | |
download | buildstream-ab707e87f53249d7f2aac17683254b54196f90ce.tar.gz |
Use 119 line lengthchandan/black
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 136 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/jobpickler.py | 9 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 37 | ||||
-rw-r--r-- | src/buildstream/_scheduler/resources.py | 5 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 34 |
6 files changed, 51 insertions, 182 deletions
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index 2a9f935b5..f3136104f 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -81,9 +81,7 @@ class ElementJob(Job): self._complete_cb(self, self._element, status, self._result) def create_child_job(self, *args, **kwargs): - return ChildElementJob( - *args, element=self._element, action_cb=self._action_cb, **kwargs - ) + return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs) class ChildElementJob(ChildJob): @@ -98,13 +96,9 @@ class ChildElementJob(ChildJob): # # This should probably be omitted for non-build tasks but it's harmless here elt_env = self._element.get_environment() - env_dump = yaml.round_trip_dump( - elt_env, default_flow_style=False, allow_unicode=True - ) + env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) self.message( - MessageType.LOG, - "Build environment for element {}".format(self._element.name), - detail=env_dump, + MessageType.LOG, "Build environment for element {}".format(self._element.name), detail=env_dump, ) # Run the action diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 3a5694a71..8baf8fe1b 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -146,13 +146,9 @@ class Job: self._terminated = False # Whether this job has been explicitly terminated self._logfile = logfile - self._message_element_name = ( - None # The plugin instance element name for messaging - ) + self._message_element_name = None # The plugin instance element name for messaging self._message_element_key = None # The element key for messaging - self._element = ( - None # The Element() passed to the Job() constructor, if applicable - ) + self._element = None # The Element() passed to the Job() constructor, if applicable # set_name() # @@ -182,15 +178,9 @@ class Job: self._message_element_key, ) - if ( - self._scheduler.context.platform.does_multiprocessing_start_require_pickling() - ): - pickled = pickle_child_job( - child_job, self._scheduler.context.get_projects(), - ) - self._process = Process( - target=do_pickled_child_job, args=[pickled, self._queue], - ) + if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): + pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),) + self._process = Process(target=do_pickled_child_job, args=[pickled, self._queue],) else: self._process = Process(target=child_job.child_action, args=[self._queue],) @@ -198,9 +188,7 @@ class Job: # the child process does not inherit the parent's state, but the main # process will be notified of any signal after we launch the child. # - with _signals.blocked( - [signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False - ): + with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): self._process.start() # Wait for the child task to complete. @@ -282,8 +270,7 @@ class Job: def kill(self): # Force kill self.message( - MessageType.WARN, - "{} did not terminate gracefully, killing".format(self.action_name), + MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name), ) utils._kill_process_tree(self._process.pid) @@ -358,22 +345,14 @@ class Job: # kwargs: Remaining Message() constructor arguments, note that you can # override 'element_name' and 'element_key' this way. # - def message( - self, message_type, message, element_name=None, element_key=None, **kwargs - ): + def message(self, message_type, message, element_name=None, element_key=None, **kwargs): kwargs["scheduler"] = True # If default name & key values not provided, set as given job attributes if element_name is None: element_name = self._message_element_name if element_key is None: element_key = self._message_element_key - message = Message( - message_type, - message, - element_name=element_name, - element_key=element_key, - **kwargs - ) + message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) self._scheduler.notify_messenger(message) # get_element() @@ -405,11 +384,7 @@ class Job: # lists, dicts, numbers, but not Element instances). # def handle_message(self, message): - raise ImplError( - "Job '{kind}' does not implement handle_message()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("Job '{kind}' does not implement handle_message()".format(kind=type(self).__name__)) # parent_complete() # @@ -421,11 +396,7 @@ class Job: # result (any): The result returned by child_process(). # def parent_complete(self, status, result): - raise ImplError( - "Job '{kind}' does not implement parent_complete()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__)) # create_child_job() # @@ -443,11 +414,7 @@ class Job: # (ChildJob): An instance of a subclass of ChildJob. # def create_child_job(self, *args, **kwargs): - raise ImplError( - "Job '{kind}' does not implement create_child_job()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__)) ####################################################### # Local Private Methods # @@ -480,9 +447,7 @@ class Job: # An unexpected return code was returned; fail permanently and report self.message( MessageType.ERROR, - "Internal job process unexpectedly died with exit code {}".format( - returncode - ), + "Internal job process unexpectedly died with exit code {}".format(returncode), logfile=self._logfile, ) returncode = _ReturnCode.PERM_FAIL @@ -490,11 +455,7 @@ class Job: # We don't want to retry if we got OK or a permanent fail. retry_flag = returncode == _ReturnCode.FAIL - if ( - retry_flag - and (self._tries <= self._max_retries) - and not self._scheduler.terminated - ): + if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.start() return @@ -548,9 +509,7 @@ class Job: elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE: self.handle_message(envelope.message) else: - assert False, "Unhandled message type '{}': {}".format( - envelope.message_type, envelope.message - ) + assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) # _parent_process_queue() # @@ -587,9 +546,7 @@ class Job: # http://bugs.python.org/issue3831 # if not self._listening: - self._scheduler.loop.add_reader( - self._queue._reader.fileno(), self._parent_recv - ) + self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv) self._listening = True # _parent_stop_listening() @@ -627,15 +584,7 @@ class Job: # class ChildJob: def __init__( - self, - action_name, - messenger, - logdir, - logfile, - max_retries, - tries, - message_element_name, - message_element_key, + self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key, ): self.action_name = action_name @@ -666,9 +615,7 @@ class ChildJob: # for front end display if not already set or explicitly # overriden here. # - def message( - self, message_type, message, element_name=None, element_key=None, **kwargs - ): + def message(self, message_type, message, element_name=None, element_key=None, **kwargs): kwargs["scheduler"] = True # If default name & key values not provided, set as given job attributes if element_name is None: @@ -676,13 +623,7 @@ class ChildJob: if element_key is None: element_key = self._message_element_key self._messenger.message( - Message( - message_type, - message, - element_name=element_name, - element_key=element_key, - **kwargs - ) + Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) ) # send_message() @@ -720,11 +661,7 @@ class ChildJob: # the result of the Job. # def child_process(self): - raise ImplError( - "ChildJob '{kind}' does not implement child_process()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__)) # child_process_data() # @@ -782,22 +719,18 @@ class ChildJob: # Time, log and and run the action function # - with _signals.suspendable( - stop_time, resume_time - ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename: + with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages( + self._logfile, self._logdir + ) as filename: self.message(MessageType.START, self.action_name, logfile=filename) try: # Try the task action - result = ( - self.child_process() - ) # pylint: disable=assignment-from-no-return + result = self.child_process() # pylint: disable=assignment-from-no-return except SkipJob as e: elapsed = datetime.datetime.now() - starttime - self.message( - MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename - ) + self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) # Alert parent of skip by return code self._child_shutdown(_ReturnCode.SKIPPED) @@ -829,9 +762,7 @@ class ChildJob: # Set return code based on whether or not the error was temporary. # - self._child_shutdown( - _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL - ) + self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) except Exception: # pylint: disable=broad-except @@ -840,16 +771,10 @@ class ChildJob: # and print it to the log file. # elapsed = datetime.datetime.now() - starttime - detail = "An unhandled exception occured:\n\n{}".format( - traceback.format_exc() - ) + detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) self.message( - MessageType.BUG, - self.action_name, - elapsed=elapsed, - detail=detail, - logfile=filename, + MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename, ) # Unhandled exceptions should permenantly fail self._child_shutdown(_ReturnCode.PERM_FAIL) @@ -861,10 +786,7 @@ class ChildJob: elapsed = datetime.datetime.now() - starttime self.message( - MessageType.SUCCESS, - self.action_name, - elapsed=elapsed, - logfile=filename, + MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename, ) # Shutdown needs to stay outside of the above context manager, diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py index 0b482d080..1d47f67db 100644 --- a/src/buildstream/_scheduler/jobs/jobpickler.py +++ b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -23,9 +23,7 @@ import io import pickle from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto -from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ( - Digest as DigestProto, -) +from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest as DigestProto # BuildStream toplevel imports from ..._loader import Loader @@ -143,10 +141,7 @@ def _pickle_child_job_data(child_job_data, projects): ] plugin_class_to_factory = { - cls: factory - for factory in factory_list - if factory is not None - for cls, _ in factory.all_loaded_plugins() + cls: factory for factory in factory_list if factory is not None for cls, _ in factory.all_loaded_plugins() } pickled_data = io.BytesIO() diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 62ebcc003..d812a48d6 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -76,23 +76,16 @@ class Queue: self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 - self._required_element_check = ( - False # Whether we should check that elements are required before enqueuing - ) + self._required_element_check = False # Whether we should check that elements are required before enqueuing # Assert the subclass has setup class data assert self.action_name is not None assert self.complete_name is not None - if ( - ResourceType.UPLOAD in self.resources - or ResourceType.DOWNLOAD in self.resources - ): + if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: self._max_retries = scheduler.context.sched_network_retries - self._task_group = self._scheduler._state.add_task_group( - self.action_name, self.complete_name - ) + self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name) # destroy() # @@ -169,11 +162,7 @@ class Queue: # element (Element): The element waiting to be pushed into the queue # def register_pending_element(self, element): - raise ImplError( - "Queue type: {} does not implement register_pending_element()".format( - self.action_name - ) - ) + raise ImplError("Queue type: {} does not implement register_pending_element()".format(self.action_name)) ##################################################### # Scheduler / Pipeline facing APIs # @@ -293,10 +282,7 @@ class Queue: workspaces.save_config() except BstError as e: self._message( - element, - MessageType.ERROR, - "Error saving workspaces", - detail=str(e), + element, MessageType.ERROR, "Error saving workspaces", detail=str(e), ) except Exception: # pylint: disable=broad-except self._message( @@ -334,9 +320,7 @@ class Queue: # Report error and mark as failed # - self._message( - element, MessageType.ERROR, "Post processing error", detail=str(e) - ) + self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) self._task_group.add_failed_task(element._get_full_name()) # Treat this as a task error as it's related to a task @@ -351,10 +335,7 @@ class Queue: # Report unhandled exceptions and mark as failed # self._message( - element, - MessageType.BUG, - "Unhandled exception in post processing", - detail=traceback.format_exc(), + element, MessageType.BUG, "Unhandled exception in post processing", detail=traceback.format_exc(), ) self._task_group.add_failed_task(element._get_full_name()) else: @@ -372,9 +353,7 @@ class Queue: # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): - message = Message( - message_type, brief, element_name=element._get_full_name(), **kwargs - ) + message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) self._scheduler.notify_messenger(message) def _element_log_path(self, element): diff --git a/src/buildstream/_scheduler/resources.py b/src/buildstream/_scheduler/resources.py index 946a7f0b1..e76158779 100644 --- a/src/buildstream/_scheduler/resources.py +++ b/src/buildstream/_scheduler/resources.py @@ -90,10 +90,7 @@ class Resources: # available. If we don't have enough, the job cannot be # scheduled. for resource in resources: - if ( - self._max_resources[resource] > 0 - and self._used_resources[resource] >= self._max_resources[resource] - ): + if self._max_resources[resource] > 0 and self._used_resources[resource] >= self._max_resources[resource]: return False # Now we register the fact that our job is using the resources diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 0555b1103..6268ec169 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -122,16 +122,12 @@ class Scheduler: # self.queues = None # Exposed for the frontend to print summaries self.context = context # The Context object shared with Queues - self.terminated = ( - False # Whether the scheduler was asked to terminate or has terminated - ) + self.terminated = False # Whether the scheduler was asked to terminate or has terminated self.suspended = False # Whether the scheduler is currently suspended # These are shared with the Job, but should probably be removed or made private in some way. self.loop = None # Shared for Job access to observe the message queue - self.internal_stops = ( - 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py - ) + self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py # # Private members @@ -147,9 +143,7 @@ class Scheduler: self._notification_queue = notification_queue self._notifier = notifier - self.resources = Resources( - context.sched_builders, context.sched_fetchers, context.sched_pushers - ) + self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) # run() # @@ -191,9 +185,7 @@ class Scheduler: _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure) # Start the profiler - with PROFILER.profile( - Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues) - ): + with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): # Run the queues self._sched() self.loop.run_forever() @@ -349,9 +341,7 @@ class Scheduler: # returncode (int): the return code with which buildbox-casd exited # def _abort_on_casd_failure(self, pid, returncode): - message = Message( - MessageType.BUG, "buildbox-casd died while the pipeline was active." - ) + message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.") self._notify(Notification(NotificationType.MESSAGE, message=message)) self._casd_process.returncode = returncode @@ -407,9 +397,7 @@ class Scheduler: # to fetch tasks for elements which failed to pull, and # thus need all the pulls to complete before ever starting # a build - ready.extend( - chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues)) - ) + ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues))) # harvest_jobs() may have decided to skip some jobs, making # them eligible for promotion to the next queue as a side effect. @@ -419,11 +407,7 @@ class Scheduler: # Make sure fork is allowed before starting jobs if not self.context.prepare_fork(): - message = Message( - MessageType.BUG, - "Fork is not allowed", - detail="Background threads are active", - ) + message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active",) self._notify(Notification(NotificationType.MESSAGE, message=message)) self.terminate_jobs() return @@ -484,9 +468,7 @@ class Scheduler: # Notify that we're unsuspended self._notify(Notification(NotificationType.SUSPENDED)) self._starttime += datetime.datetime.now() - self._suspendtime - self._notify( - Notification(NotificationType.SCHED_START_TIME, time=self._starttime) - ) + self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) self._suspendtime = None # _interrupt_event(): |