diff options
author | Jonathan Maw <jonathan.maw@codethink.co.uk> | 2019-06-10 12:47:32 +0100 |
---|---|---|
committer | Jonathan Maw <jonathan.maw@codethink.co.uk> | 2019-07-09 13:20:45 +0100 |
commit | b1aa1c7da26dcce3654e570ae69a78c6a5de0adf (patch) | |
tree | 557d417037aba2381dc1d7f072352c27db12ad94 | |
parent | da2283a20a3c61babe99515878dcfcce3e2bf7ca (diff) | |
download | buildstream-jonathan/frontend-separation.tar.gz |
Store core state for the frontend separatelyjonathan/frontend-separation
-rw-r--r-- | src/buildstream/_frontend/app.py | 65 | ||||
-rw-r--r-- | src/buildstream/_frontend/status.py | 116 | ||||
-rw-r--r-- | src/buildstream/_frontend/widget.py | 31 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cachesizejob.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cleanupjob.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 11 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 36 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 30 | ||||
-rw-r--r-- | src/buildstream/_state.py | 310 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 37 |
12 files changed, 503 insertions, 143 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 9550fea40..cf4ac2b8d 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -31,6 +31,7 @@ from .. import Scope # Import various buildstream internals from .._context import Context +from ..plugin import Plugin from .._platform import Platform from .._project import Project from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError @@ -38,7 +39,6 @@ from .._message import Message, MessageType, unconditional_messages from .._stream import Stream from .._versions import BST_FORMAT_VERSION from .. import _yaml -from .._scheduler import ElementJob, JobStatus # Import frontend assets from .profile import Profile @@ -82,6 +82,7 @@ class App(): self._interactive_failures = None # Whether to handle failures interactively self._started = False # Whether a session has started self._set_project_dir = False # Whether -C option was used + self._state = None # Frontend reads this and registers callbacks # UI Colors Profiles self._content_profile = Profile(fg='yellow') @@ -203,8 +204,19 @@ class App(): except BstError as e: self._error_exit(e, "Error instantiating platform") + # Create the stream right away, we'll need to pass it around. + self.stream = Stream(self.context, self._session_start, + session_start_callback=self.session_start_cb, + interrupt_callback=self._interrupt_handler, + ticker_callback=self._tick) + + self._state = self.stream.get_state() + + # Register callbacks with the State + self._state.register_task_failed_callback(self._job_failed) + # Create the logger right before setting the message handler - self.logger = LogLine(self.context, + self.logger = LogLine(self.context, self._state, self._content_profile, self._format_profile, self._success_profile, @@ -226,16 +238,11 @@ class App(): # we can override the global exception hook. sys.excepthook = self._global_exception_handler - # Create the stream right away, we'll need to pass it around - self.stream = Stream(self.context, self._session_start, - session_start_callback=self.session_start_cb, - interrupt_callback=self._interrupt_handler, - ticker_callback=self._tick, - job_start_callback=self._job_started, - job_complete_callback=self._job_completed) + # Initialize the parts of Stream that have side-effects + self.stream.init() # Create our status printer, only available in interactive - self._status = Status(self.context, + self._status = Status(self.context, self._state, self._content_profile, self._format_profile, self._success_profile, self._error_profile, self.stream, colors=self.colors) @@ -536,21 +543,31 @@ class App(): def _tick(self): self._maybe_render_status() - def _job_started(self, job): - self._status.add_job(job) - self._maybe_render_status() - - def _job_completed(self, job, status): - self._status.remove_job(job) - self._maybe_render_status() - + # Callback that a job has failed + # + # XXX: This accesses the core directly, which is discouraged. + # Removing use of the core would require delegating to Shell + # the creation of an interactive shell, and the retrying of jobs. + # + # Args: + # action_name (str): The name of the action being performed, + # same as the task group, if it exists + # full_name (str): The name of this specific task, e.g. the element name + # unique_id (int): If an element job failed, the unique ID of the element. + # + def _job_failed(self, action_name, full_name, unique_id=None): # Dont attempt to handle a failure if the user has already opted to # terminate - if status is JobStatus.FAIL and not self.stream.terminated: + if not self.stream.terminated: + if unique_id: + # look-up queue + for q in self.stream.queues: + if q.action_name == action_name: + queue = q + assert queue, "Job action {} does not have a corresponding queue".format(action_name) - if isinstance(job, ElementJob): - element = job.element - queue = job.queue + # look-up element + element = Plugin._lookup(unique_id) # Get the last failure message for additional context failure = self._fail_messages.get(element._unique_id) @@ -564,6 +581,7 @@ class App(): .format(element), err=True) else: self._handle_failure(element, queue, failure) + else: click.echo("\nTerminating all jobs\n", err=True) self.stream.terminate() @@ -643,7 +661,8 @@ class App(): click.echo("\nContinuing with other non failing elements\n", err=True) elif choice == 'retry': click.echo("\nRetrying failed job\n", err=True) - queue.failed_elements.remove(element) + # FIXME: Outstandingly nasty modification of core state + queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) # diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py index 8a3de75f3..f0fb22639 100644 --- a/src/buildstream/_frontend/status.py +++ b/src/buildstream/_frontend/status.py @@ -23,7 +23,6 @@ import click # Import a widget internal for formatting time codes from .widget import TimeCode -from .._scheduler import ElementJob # Status() @@ -36,6 +35,7 @@ from .._scheduler import ElementJob # # Args: # context (Context): The Context +# state (State): The state data from the Core # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text @@ -52,12 +52,13 @@ class Status(): 'clear_eol': 'el' } - def __init__(self, context, + def __init__(self, context, state, content_profile, format_profile, success_profile, error_profile, stream, colors=False): self._context = context + self._state = state self._content_profile = content_profile self._format_profile = format_profile self._success_profile = success_profile @@ -67,7 +68,7 @@ class Status(): self._last_lines = 0 # Number of status lines we last printed to console self._spacing = 1 self._colors = colors - self._header = _StatusHeader(context, + self._header = _StatusHeader(context, state, content_profile, format_profile, success_profile, error_profile, stream) @@ -79,41 +80,8 @@ class Status(): self._need_alloc = True self._term_caps = self._init_terminal() - # add_job() - # - # Adds a job to track in the status area - # - # Args: - # element (Element): The element of the job to track - # action_name (str): The action name for this job - # - def add_job(self, job): - elapsed = self._stream.elapsed_time - job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed) - self._jobs.append(job) - self._need_alloc = True - - # remove_job() - # - # Removes a job currently being tracked in the status area - # - # Args: - # element (Element): The element of the job to track - # action_name (str): The action name for this job - # - def remove_job(self, job): - action_name = job.action_name - if not isinstance(job, ElementJob): - element = None - else: - element = job.element - - self._jobs = [ - job for job in self._jobs - if not (job.element is element and - job.action_name == action_name) - ] - self._need_alloc = True + state.register_task_added_callback(self._add_job) + state.register_task_removed_callback(self._remove_job) # clear() # @@ -314,6 +282,39 @@ class Status(): return lines, column_widths + # _add_job() + # + # Adds a job to track in the status area + # + # Args: + # action_name (str): The action name for this job + # full_name (str): The name of this specific job (e.g. element name) + # + def _add_job(self, action_name, full_name): + task = self._state.tasks[(action_name, full_name)] + start_time = task.start_time + job = _StatusJob(self._context, action_name, full_name, + self._content_profile, self._format_profile, + start_time) + self._jobs.append(job) + self._need_alloc = True + + # _remove_job() + # + # Removes a job currently being tracked in the status area + # + # Args: + # action_name (str): The action name for this job + # full_name (str): The name of this specific job (e.g. element name) + # + def _remove_job(self, action_name, full_name): + self._jobs = [ + job for job in self._jobs + if not (job.full_name == full_name and + job.action_name == action_name) + ] + self._need_alloc = True + # _StatusHeader() # @@ -329,7 +330,7 @@ class Status(): # class _StatusHeader(): - def __init__(self, context, + def __init__(self, context, state, content_profile, format_profile, success_profile, error_profile, stream): @@ -347,6 +348,7 @@ class _StatusHeader(): self._success_profile = success_profile self._error_profile = error_profile self._stream = stream + self._state = state self._time_code = TimeCode(context, content_profile, format_profile) self._context = context @@ -386,16 +388,16 @@ class _StatusHeader(): text = '' # Format and calculate size for each queue progress - for queue in self._stream.queues: + for index, task_group in enumerate(self._state.task_groups.values()): # Add spacing - if self._stream.queues.index(queue) > 0: + if index > 0: size += 2 text += self._format_profile.fmt('→ ') - queue_text, queue_size = self._render_queue(queue) - size += queue_size - text += queue_text + group_text, group_size = self._render_task_group(task_group) + size += group_size + text += group_text line2 = self._centered(text, size, line_length, ' ') @@ -428,16 +430,16 @@ class _StatusHeader(): ################################################### # Private Methods # ################################################### - def _render_queue(self, queue): - processed = str(queue.processed_elements_count) - skipped = str(queue.skipped_elements_count) - failed = str(len(queue.failed_elements)) + def _render_task_group(self, group): + processed = str(group.processed_tasks) + skipped = str(group.skipped_tasks) + failed = str(len(group.failed_tasks)) size = 5 # Space for the formatting '[', ':', ' ', ' ' and ']' - size += len(queue.complete_name) + size += len(group.name) size += len(processed) + len(skipped) + len(failed) text = self._format_profile.fmt("(") + \ - self._content_profile.fmt(queue.complete_name) + \ + self._content_profile.fmt(group.name) + \ self._format_profile.fmt(":") + \ self._success_profile.fmt(processed) + ' ' + \ self._content_profile.fmt(skipped) + ' ' + \ @@ -463,27 +465,21 @@ class _StatusHeader(): # # Args: # context (Context): The Context -# job (Job): The job being processed +# action_name (str): The action performed +# full_name (str): The name of the job # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # elapsed (datetime): The offset into the session when this job is created # class _StatusJob(): - def __init__(self, context, job, content_profile, format_profile, elapsed): - action_name = job.action_name - if not isinstance(job, ElementJob): - element = None - else: - element = job.element - + def __init__(self, context, action_name, full_name, content_profile, format_profile, elapsed): # # Public members # - self.element = element # The Element self.action_name = action_name # The action name self.size = None # The number of characters required to render - self.full_name = element._get_full_name() if element else action_name + self.full_name = full_name # # Private members diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py index 20ea80d0f..769c95e75 100644 --- a/src/buildstream/_frontend/widget.py +++ b/src/buildstream/_frontend/widget.py @@ -300,6 +300,7 @@ class MessageOrLogFile(Widget): # # Args: # context (Context): The Context +# state (State): The state data from the Core # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text @@ -309,7 +310,7 @@ class MessageOrLogFile(Widget): # class LogLine(Widget): - def __init__(self, context, + def __init__(self, context, state, content_profile, format_profile, success_profile, @@ -327,6 +328,7 @@ class LogLine(Widget): self._log_lines = context.log_error_lines self._message_lines = context.log_message_lines self._resolved_keys = None + self._state = state self._space_widget = Space(context, content_profile, format_profile) self._logfile_widget = LogFile(context, content_profile, format_profile, err_profile) @@ -550,7 +552,7 @@ class LogLine(Widget): # Early silent return if there are no queues, can happen # only in the case that the stream early returned due to # an inconsistent pipeline state. - if not stream.queues: + if not self._state.task_groups: return text = '' @@ -566,9 +568,12 @@ class LogLine(Widget): values = OrderedDict() for element, messages in sorted(self._failure_messages.items(), key=lambda x: x[0].name): - for queue in stream.queues: - if any(el.name == element.name for el in queue.failed_elements): + for group in self._state.task_groups.values(): + # Exclude the failure messages if the job didn't ultimately fail + # (e.g. succeeded on retry) + if element.name in group.failed_tasks: values[element.name] = ''.join(self._render(v) for v in messages) + if values: text += self.content_profile.fmt("Failure Summary\n", bold=True) text += self._format_values(values, style_value=False) @@ -582,15 +587,15 @@ class LogLine(Widget): processed_maxlen = 1 skipped_maxlen = 1 failed_maxlen = 1 - for queue in stream.queues: - processed_maxlen = max(len(str(queue.processed_elements_count)), processed_maxlen) - skipped_maxlen = max(len(str(queue.skipped_elements_count)), skipped_maxlen) - failed_maxlen = max(len(str(len(queue.failed_elements))), failed_maxlen) + for group in self._state.task_groups.values(): + processed_maxlen = max(len(str(group.processed_tasks)), processed_maxlen) + skipped_maxlen = max(len(str(group.skipped_tasks)), skipped_maxlen) + failed_maxlen = max(len(str(len(group.failed_tasks))), failed_maxlen) - for queue in stream.queues: - processed = str(queue.processed_elements_count) - skipped = str(queue.skipped_elements_count) - failed = str(len(queue.failed_elements)) + for group in self._state.task_groups.values(): + processed = str(group.processed_tasks) + skipped = str(group.skipped_tasks) + failed = str(len(group.failed_tasks)) processed_align = ' ' * (processed_maxlen - len(processed)) skipped_align = ' ' * (skipped_maxlen - len(skipped)) @@ -606,7 +611,7 @@ class LogLine(Widget): status_text += self.content_profile.fmt("failed ") + \ self._err_profile.fmt(failed) + ' ' + failed_align - values["{} Queue".format(queue.action_name)] = status_text + values["{} Queue".format(group.name)] = status_text text += self._format_values(values, style_value=False) diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py index f36c30190..581101c07 100644 --- a/src/buildstream/_scheduler/jobs/cachesizejob.py +++ b/src/buildstream/_scheduler/jobs/cachesizejob.py @@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob class CacheSizeJob(Job): def __init__(self, *args, complete_cb, **kwargs): super().__init__(*args, **kwargs) + self.set_name(self.action_name) self._complete_cb = complete_cb context = self._scheduler.context diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py index 85722c83f..3e9a8ff47 100644 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob class CleanupJob(Job): def __init__(self, *args, complete_cb, **kwargs): super().__init__(*args, **kwargs) + self.set_name(self.action_name) self._complete_cb = complete_cb context = self._scheduler.context diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index a535f55db..138448685 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -67,6 +67,7 @@ from .job import Job, ChildJob class ElementJob(Job): def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs): super().__init__(*args, **kwargs) + self.set_name(element._get_full_name()) self.queue = queue self._element = element self._action_cb = action_cb # The action callable function diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 1c2726b5f..87f461939 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -129,6 +129,7 @@ class Job(): # # Public members # + self.name = None # The name of the job, set by the job's subclass self.action_name = action_name # The action name for the Queue self.child_data = None # Data to be sent to the main process @@ -150,6 +151,12 @@ class Job(): self._message_unique_id = None self._task_id = None + # set_name() + # + # Sets the name of this job + def set_name(self, name): + self.name = name + # start() # # Starts the job. diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index 1dd45607b..1be3f7cd0 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -21,7 +21,7 @@ from datetime import timedelta from . import Queue, QueueStatus -from ..jobs import ElementJob, JobStatus +from ..jobs import JobStatus from ..resources import ResourceType from ..._message import MessageType @@ -55,14 +55,9 @@ class BuildQueue(Queue): detail=detail, action_name=self.action_name, elapsed=timedelta(seconds=0), logfile=logfile) - job = ElementJob(self._scheduler, self.action_name, - logfile, element=element, queue=self, - action_cb=self.get_process_func(), - complete_cb=self._job_done, - max_retries=self._max_retries) self._done_queue.append(element) - self.failed_elements.append(element) - self._scheduler._job_complete_callback(job, False) + element_name = element._get_full_name() + self._task_group.add_failed_task(element_name) return super().enqueue(to_queue) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index adc1cd467..538b2b9d1 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -63,13 +63,6 @@ class Queue(): def __init__(self, scheduler): # - # Public members - # - self.failed_elements = [] # List of failed elements, for the frontend - self.processed_elements_count = 0 # Number of processed elements, for the frontend - self.skipped_elements_count = 0 # Number of skipped elements, for the frontend - - # # Private members # self._scheduler = scheduler @@ -87,6 +80,17 @@ class Queue(): if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: self._max_retries = scheduler.context.sched_network_retries + self._task_group = self._scheduler._state.add_task_group(self.action_name) + + # destroy() + # + # Explicitly performs all cleanup tasks for this queue + # + # Note: Doing this explicitly is preferred to a __del__ method because + # it is not at the mercy of the garbage collector + def destroy(self): + self._scheduler._state.remove_task_group(self.action_name) + ##################################################### # Abstract Methods for Queue implementations # ##################################################### @@ -236,6 +240,12 @@ class Queue(): def set_required_element_check(self): self._required_element_check = True + # any_failed_elements() + # + # Returns whether any elements in this queue have failed their jobs + def any_failed_elements(self): + return any(self._task_group.failed_tasks) + ##################################################### # Private Methods # ##################################################### @@ -298,7 +308,7 @@ class Queue(): # Report error and mark as failed # self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) # Treat this as a task error as it's related to a task # even though it did not occur in the task context @@ -314,18 +324,18 @@ class Queue(): self._message(element, MessageType.BUG, "Unhandled exception in post processing", detail=traceback.format_exc()) - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) else: # All elements get placed on the done queue for later processing. self._done_queue.append(element) # These lists are for bookkeeping purposes for the UI and logging. if status == JobStatus.SKIPPED or job.get_terminated(): - self.skipped_elements_count += 1 + self._task_group.add_skipped_task() elif status == JobStatus.OK: - self.processed_elements_count += 1 + self._task_group.add_processed_task() else: - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) # Convenience wrapper for Queue implementations to send # a message for the element they are processing @@ -357,7 +367,7 @@ class Queue(): status = self.status(element) if status == QueueStatus.SKIP: # Place skipped elements into the done queue immediately - self.skipped_elements_count += 1 # Public record of skipped elements + self._task_group.add_skipped_task() self._done_queue.append(element) # Elements to proceed to the next queue elif status == QueueStatus.READY: # Push elements which are ready to be processed immediately into the queue diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 17d655ce2..d2ea2741e 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -28,7 +28,7 @@ from contextlib import contextmanager # Local imports from .resources import Resources, ResourceType -from .jobs import JobStatus, CacheSizeJob, CleanupJob +from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob from .._profile import Topics, PROFILER @@ -61,19 +61,16 @@ _ACTION_NAME_CACHE_SIZE = 'size' # Args: # context: The Context in the parent scheduling process # start_time: The time at which the session started +# state: The state that can be made available to the frontend # interrupt_callback: A callback to handle ^C # ticker_callback: A callback call once per second -# job_start_callback: A callback call when each job starts -# job_complete_callback: A callback call when each job completes # class Scheduler(): def __init__(self, context, - start_time, + start_time, state, interrupt_callback=None, - ticker_callback=None, - job_start_callback=None, - job_complete_callback=None): + ticker_callback=None): # # Public members @@ -94,6 +91,7 @@ class Scheduler(): self._starttime = start_time # Initial application start time self._suspendtime = None # Session time compensation for suspended state self._queue_jobs = True # Whether we should continue to queue jobs + self._state = state # State of cache management related jobs self._cache_size_scheduled = False # Whether we have a cache size job scheduled @@ -104,8 +102,6 @@ class Scheduler(): # Callbacks to report back to the Scheduler owner self._interrupt_callback = interrupt_callback self._ticker_callback = ticker_callback - self._job_start_callback = job_start_callback - self._job_complete_callback = job_complete_callback # Whether our exclusive jobs, like 'cleanup' are currently already # waiting or active. @@ -163,7 +159,7 @@ class Scheduler(): # Stop handling unix signals self._disconnect_signals() - failed = any(any(queue.failed_elements) for queue in self.queues) + failed = any(queue.any_failed_elements() for queue in self.queues) self.loop = None if failed: @@ -184,6 +180,9 @@ class Scheduler(): # def clear_queues(self): if self.queues: + for queue in self.queues: + queue.destroy() + self.queues.clear() # terminate_jobs() @@ -261,8 +260,12 @@ class Scheduler(): # Remove from the active jobs list self._active_jobs.remove(job) - # Scheduler owner facing callback - self._job_complete_callback(job, status) + self._state.remove_task(job.action_name, job.name) + if status == JobStatus.FAIL: + unique_id = None + if isinstance(job, ElementJob): + unique_id = job._element._unique_id + self._state.fail_task(job.action_name, job.name, unique_id) # Now check for more jobs self._sched() @@ -321,8 +324,7 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - if self._job_start_callback: - self._job_start_callback(job) + self._state.add_task(job.action_name, job.name, self.elapsed_time()) job.start() # Callback for the cache size job diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py new file mode 100644 index 000000000..b2f0b705d --- /dev/null +++ b/src/buildstream/_state.py @@ -0,0 +1,310 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# + + +from collections import OrderedDict + + +# TaskGroup +# +# The state data stored for a group of tasks (usually scheduler queues) +# +# Args: +# name (str): The name of the Task Group, e.g. 'build' +# state (State): The state object +# +class TaskGroup(): + def __init__(self, name, state): + self.name = name + self.processed_tasks = 0 + self.skipped_tasks = 0 + # NOTE: failed_tasks is a list of strings instead of an integer count + # because the frontend requires the full list of failed tasks to + # know whether to print failure messages for a given element. + self.failed_tasks = [] + + self._state = state + self._update_task_group_cbs = [] + + ########################################### + # Core-facing APIs to drive notifications # + ########################################### + + # add_processed_task() + # + # Update the TaskGroup's count of processed tasks and notify of changes + # + # This is a core-facing API and should not be called from the frontend + # + def add_processed_task(self): + self.processed_tasks += 1 + for cb in self._state._task_groups_changed_cbs: + cb() + + # add_skipped_task() + # + # Update the TaskGroup's count of skipped tasks and notify of changes + # + # This is a core-facing API and should not be called from the frontend + # + def add_skipped_task(self): + self.skipped_tasks += 1 + + for cb in self._state._task_groups_changed_cbs: + cb() + + # add_failed_task() + # + # Update the TaskGroup's list of failed tasks and notify of changes + # + # Args: + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # + # This is a core-facing API and should not be called from the frontend + # + def add_failed_task(self, full_name): + self.failed_tasks.append(full_name) + + for cb in self._state._task_groups_changed_cbs: + cb() + + +# State +# +# The state data that is stored for the purpose of sharing with the frontend. +# +# BuildStream's Core is responsible for making changes to this data. +# BuildStream's Frontend may register callbacks with State to be notified +# when parts of State change, and read State to know what has changed. +class State(): + def __init__(self): + self.task_groups = OrderedDict() # key is TaskGroup name + + # Note: A Task's full_name is technically unique, but only accidentally. + self.tasks = OrderedDict() # key is a tuple of action_name and full_name + + self._task_added_cbs = [] + self._task_removed_cbs = [] + self._task_groups_changed_cbs = [] + self._task_failed_cbs = [] + + ##################################### + # Frontend-facing notification APIs # + ##################################### + + # register_task_added_callback() + # + # Registers a callback to be notified when a task is added + # + # Args: + # callback (function): The callback to be notified + # + # Callback Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # + def register_task_added_callback(self, callback): + self._task_added_cbs.append(callback) + + # unregister_task_added_callback() + # + # Unregisters a callback previously registered by + # register_task_added_callback() + # + # Args: + # callback (function): The callback to be removed + # + def unregister_task_added_callback(self, callback): + self._task_added_cbs.remove(callback) + + # register_task_removed_callback() + # + # Registers a callback to be notified when a task is removed + # + # Args: + # callback (function): The callback to be notified + # + # Callback Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # + def register_task_removed_callback(self, callback): + self._task_removed_cbs.append(callback) + + # unregister_task_removed_callback() + # + # Unregisters a callback previously registered by + # register_task_removed_callback() + # + # Args: + # callback (function): The callback to be notified + # + def unregister_task_removed_callback(self, callback): + self._task_removed_cbs.remove(callback) + + # register_task_failed_callback() + # + # Registers a callback to be notified when a task has failed + # + # Args: + # callback (function): The callback to be notified + # + # Callback Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # unique_id (int): (optionally) the element's unique ID, if the failure + # came from an element + # + def register_task_failed_callback(self, callback): + self._task_failed_cbs.append(callback) + + # unregister_task_failed_callback() + # + # Unregisters a callback previously registered by + # register_task_failed_callback() + # + # Args: + # callback (function): The callback to be removed + # + def unregister_task_failed_callback(self, callback): + self._task_failed_cbs.remove(callback) + + ############################################## + # Core-facing APIs for driving notifications # + ############################################## + + # add_task_group() + # + # Notification that a new task group has been added + # + # This is a core-facing API and should not be called from the frontend + # + # Args: + # name (str): The name of the task group, e.g. 'build' + # + # Returns: + # TaskGroup: The task group created + # + def add_task_group(self, name): + assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups) + group = TaskGroup(name, self) + self.task_groups[name] = group + + return group + + # remove_task_group() + # + # Notification that a task group has been removed + # + # This is a core-facing API and should not be called from the frontend + # + # Args: + # name (str): The name of the task group, e.g. 'build' + # + def remove_task_group(self, name): + # Rely on 'del' to raise an error when removing nonexistent task groups + del self.task_groups[name] + + # add_task() + # + # Add a task and send appropriate notifications + # + # This is a core-facing API and should not be called from the frontend + # + # Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # start_time (timedelta): The time the task started, relative to + # buildstream's start time. + # + def add_task(self, action_name, full_name, start_time): + task_key = (action_name, full_name) + assert task_key not in self.tasks, \ + "Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks) + + task = _Task(action_name, full_name, start_time) + self.tasks[task_key] = task + + for cb in self._task_added_cbs: + cb(action_name, full_name) + + # remove_task() + # + # Remove the task and send appropriate notifications + # + # This is a core-facing API and should not be called from the frontend + # + # Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # + def remove_task(self, action_name, full_name): + # Rely on 'del' to raise an error when removing nonexistent tasks + del self.tasks[(action_name, full_name)] + + for cb in self._task_removed_cbs: + cb(action_name, full_name) + + # fail_task() + # + # Notify all registered callbacks that a task has failed. + # + # This is separate from the tasks changed callbacks because a failed task + # requires the frontend to intervene to decide what happens next. + # + # This is a core-facing API and should not be called from the frontend + # + # Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # unique_id (int): (optionally) the element's unique ID, if the failure came from an element + # + def fail_task(self, action_name, full_name, unique_id=None): + for cb in self._task_failed_cbs: + cb(action_name, full_name, unique_id) + + +# _Task +# +# The state data stored for an individual task +# +# Args: +# action_name (str): The name of the action, e.g. 'build' +# full_name (str): The full name of the task, distinguishing +# it from other tasks with the same action name +# e.g. an element's name. +# start_time (timedelta): The time the task started, relative to +# buildstream's start time. +class _Task(): + def __init__(self, action_name, full_name, start_time): + self.action_name = action_name + self.full_name = full_name + self.start_time = start_time diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index fcd40c3b4..2ad1a4fee 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -38,6 +38,7 @@ from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER +from ._state import State from .types import _KeyStrength from . import utils, _yaml, _site from . import Scope, Consistency @@ -53,17 +54,13 @@ from . import Scope, Consistency # session_start_callback (callable): A callback to invoke when the session starts # interrupt_callback (callable): A callback to invoke when we get interrupted # ticker_callback (callable): Invoked every second while running the scheduler -# job_start_callback (callable): Called when a job starts -# job_complete_callback (callable): Called when a job completes # class Stream(): def __init__(self, context, session_start, *, session_start_callback=None, interrupt_callback=None, - ticker_callback=None, - job_start_callback=None, - job_complete_callback=None): + ticker_callback=None): # # Public members @@ -76,19 +73,27 @@ class Stream(): # # Private members # - self._artifacts = context.artifactcache - self._sourcecache = context.sourcecache self._context = context + self._artifacts = None + self._sourcecache = None self._project = None self._pipeline = None - self._scheduler = Scheduler(context, session_start, + self._state = State() # Owned by Stream, used by Core to set state + self._scheduler = Scheduler(context, session_start, self._state, interrupt_callback=interrupt_callback, - ticker_callback=ticker_callback, - job_start_callback=job_start_callback, - job_complete_callback=job_complete_callback) + ticker_callback=ticker_callback) self._first_non_track_queue = None self._session_start_callback = session_start_callback + # init() + # + # Initialization of Stream that has side-effects that require it to be + # performed after the Stream is created. + # + def init(self): + self._artifacts = self._context.artifactcache + self._sourcecache = self._context.sourcecache + # cleanup() # # Cleans up application state @@ -972,6 +977,15 @@ class Stream(): self.queues = [queue] self._run() + # get_state() + # + # Get the State object owned by Stream + # + # Returns: + # State: The State object + def get_state(self): + return self._state + ############################################################# # Scheduler API forwarding # ############################################################# @@ -1213,7 +1227,6 @@ class Stream(): # def _add_queue(self, queue, *, track=False): self.queues.append(queue) - if not (track or self._first_non_track_queue): self._first_non_track_queue = queue self._first_non_track_queue.set_required_element_check() |