summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Maw <jonathan.maw@codethink.co.uk>2019-06-10 12:47:32 +0100
committerJonathan Maw <jonathan.maw@codethink.co.uk>2019-07-09 13:20:45 +0100
commitb1aa1c7da26dcce3654e570ae69a78c6a5de0adf (patch)
tree557d417037aba2381dc1d7f072352c27db12ad94
parentda2283a20a3c61babe99515878dcfcce3e2bf7ca (diff)
downloadbuildstream-jonathan/frontend-separation.tar.gz
Store core state for the frontend separatelyjonathan/frontend-separation
-rw-r--r--src/buildstream/_frontend/app.py65
-rw-r--r--src/buildstream/_frontend/status.py116
-rw-r--r--src/buildstream/_frontend/widget.py31
-rw-r--r--src/buildstream/_scheduler/jobs/cachesizejob.py1
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py1
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py1
-rw-r--r--src/buildstream/_scheduler/jobs/job.py7
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/queue.py36
-rw-r--r--src/buildstream/_scheduler/scheduler.py30
-rw-r--r--src/buildstream/_state.py310
-rw-r--r--src/buildstream/_stream.py37
12 files changed, 503 insertions, 143 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 9550fea40..cf4ac2b8d 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -31,6 +31,7 @@ from .. import Scope
# Import various buildstream internals
from .._context import Context
+from ..plugin import Plugin
from .._platform import Platform
from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
@@ -38,7 +39,6 @@ from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
from .._versions import BST_FORMAT_VERSION
from .. import _yaml
-from .._scheduler import ElementJob, JobStatus
# Import frontend assets
from .profile import Profile
@@ -82,6 +82,7 @@ class App():
self._interactive_failures = None # Whether to handle failures interactively
self._started = False # Whether a session has started
self._set_project_dir = False # Whether -C option was used
+ self._state = None # Frontend reads this and registers callbacks
# UI Colors Profiles
self._content_profile = Profile(fg='yellow')
@@ -203,8 +204,19 @@ class App():
except BstError as e:
self._error_exit(e, "Error instantiating platform")
+ # Create the stream right away, we'll need to pass it around.
+ self.stream = Stream(self.context, self._session_start,
+ session_start_callback=self.session_start_cb,
+ interrupt_callback=self._interrupt_handler,
+ ticker_callback=self._tick)
+
+ self._state = self.stream.get_state()
+
+ # Register callbacks with the State
+ self._state.register_task_failed_callback(self._job_failed)
+
# Create the logger right before setting the message handler
- self.logger = LogLine(self.context,
+ self.logger = LogLine(self.context, self._state,
self._content_profile,
self._format_profile,
self._success_profile,
@@ -226,16 +238,11 @@ class App():
# we can override the global exception hook.
sys.excepthook = self._global_exception_handler
- # Create the stream right away, we'll need to pass it around
- self.stream = Stream(self.context, self._session_start,
- session_start_callback=self.session_start_cb,
- interrupt_callback=self._interrupt_handler,
- ticker_callback=self._tick,
- job_start_callback=self._job_started,
- job_complete_callback=self._job_completed)
+ # Initialize the parts of Stream that have side-effects
+ self.stream.init()
# Create our status printer, only available in interactive
- self._status = Status(self.context,
+ self._status = Status(self.context, self._state,
self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
self.stream, colors=self.colors)
@@ -536,21 +543,31 @@ class App():
def _tick(self):
self._maybe_render_status()
- def _job_started(self, job):
- self._status.add_job(job)
- self._maybe_render_status()
-
- def _job_completed(self, job, status):
- self._status.remove_job(job)
- self._maybe_render_status()
-
+ # Callback that a job has failed
+ #
+ # XXX: This accesses the core directly, which is discouraged.
+ # Removing use of the core would require delegating to Shell
+ # the creation of an interactive shell, and the retrying of jobs.
+ #
+ # Args:
+ # action_name (str): The name of the action being performed,
+ # same as the task group, if it exists
+ # full_name (str): The name of this specific task, e.g. the element name
+ # unique_id (int): If an element job failed, the unique ID of the element.
+ #
+ def _job_failed(self, action_name, full_name, unique_id=None):
# Dont attempt to handle a failure if the user has already opted to
# terminate
- if status is JobStatus.FAIL and not self.stream.terminated:
+ if not self.stream.terminated:
+ if unique_id:
+ # look-up queue
+ for q in self.stream.queues:
+ if q.action_name == action_name:
+ queue = q
+ assert queue, "Job action {} does not have a corresponding queue".format(action_name)
- if isinstance(job, ElementJob):
- element = job.element
- queue = job.queue
+ # look-up element
+ element = Plugin._lookup(unique_id)
# Get the last failure message for additional context
failure = self._fail_messages.get(element._unique_id)
@@ -564,6 +581,7 @@ class App():
.format(element), err=True)
else:
self._handle_failure(element, queue, failure)
+
else:
click.echo("\nTerminating all jobs\n", err=True)
self.stream.terminate()
@@ -643,7 +661,8 @@ class App():
click.echo("\nContinuing with other non failing elements\n", err=True)
elif choice == 'retry':
click.echo("\nRetrying failed job\n", err=True)
- queue.failed_elements.remove(element)
+ # FIXME: Outstandingly nasty modification of core state
+ queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])
#
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index 8a3de75f3..f0fb22639 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -23,7 +23,6 @@ import click
# Import a widget internal for formatting time codes
from .widget import TimeCode
-from .._scheduler import ElementJob
# Status()
@@ -36,6 +35,7 @@ from .._scheduler import ElementJob
#
# Args:
# context (Context): The Context
+# state (State): The state data from the Core
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
@@ -52,12 +52,13 @@ class Status():
'clear_eol': 'el'
}
- def __init__(self, context,
+ def __init__(self, context, state,
content_profile, format_profile,
success_profile, error_profile,
stream, colors=False):
self._context = context
+ self._state = state
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
@@ -67,7 +68,7 @@ class Status():
self._last_lines = 0 # Number of status lines we last printed to console
self._spacing = 1
self._colors = colors
- self._header = _StatusHeader(context,
+ self._header = _StatusHeader(context, state,
content_profile, format_profile,
success_profile, error_profile,
stream)
@@ -79,41 +80,8 @@ class Status():
self._need_alloc = True
self._term_caps = self._init_terminal()
- # add_job()
- #
- # Adds a job to track in the status area
- #
- # Args:
- # element (Element): The element of the job to track
- # action_name (str): The action name for this job
- #
- def add_job(self, job):
- elapsed = self._stream.elapsed_time
- job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed)
- self._jobs.append(job)
- self._need_alloc = True
-
- # remove_job()
- #
- # Removes a job currently being tracked in the status area
- #
- # Args:
- # element (Element): The element of the job to track
- # action_name (str): The action name for this job
- #
- def remove_job(self, job):
- action_name = job.action_name
- if not isinstance(job, ElementJob):
- element = None
- else:
- element = job.element
-
- self._jobs = [
- job for job in self._jobs
- if not (job.element is element and
- job.action_name == action_name)
- ]
- self._need_alloc = True
+ state.register_task_added_callback(self._add_job)
+ state.register_task_removed_callback(self._remove_job)
# clear()
#
@@ -314,6 +282,39 @@ class Status():
return lines, column_widths
+ # _add_job()
+ #
+ # Adds a job to track in the status area
+ #
+ # Args:
+ # action_name (str): The action name for this job
+ # full_name (str): The name of this specific job (e.g. element name)
+ #
+ def _add_job(self, action_name, full_name):
+ task = self._state.tasks[(action_name, full_name)]
+ start_time = task.start_time
+ job = _StatusJob(self._context, action_name, full_name,
+ self._content_profile, self._format_profile,
+ start_time)
+ self._jobs.append(job)
+ self._need_alloc = True
+
+ # _remove_job()
+ #
+ # Removes a job currently being tracked in the status area
+ #
+ # Args:
+ # action_name (str): The action name for this job
+ # full_name (str): The name of this specific job (e.g. element name)
+ #
+ def _remove_job(self, action_name, full_name):
+ self._jobs = [
+ job for job in self._jobs
+ if not (job.full_name == full_name and
+ job.action_name == action_name)
+ ]
+ self._need_alloc = True
+
# _StatusHeader()
#
@@ -329,7 +330,7 @@ class Status():
#
class _StatusHeader():
- def __init__(self, context,
+ def __init__(self, context, state,
content_profile, format_profile,
success_profile, error_profile,
stream):
@@ -347,6 +348,7 @@ class _StatusHeader():
self._success_profile = success_profile
self._error_profile = error_profile
self._stream = stream
+ self._state = state
self._time_code = TimeCode(context, content_profile, format_profile)
self._context = context
@@ -386,16 +388,16 @@ class _StatusHeader():
text = ''
# Format and calculate size for each queue progress
- for queue in self._stream.queues:
+ for index, task_group in enumerate(self._state.task_groups.values()):
# Add spacing
- if self._stream.queues.index(queue) > 0:
+ if index > 0:
size += 2
text += self._format_profile.fmt('→ ')
- queue_text, queue_size = self._render_queue(queue)
- size += queue_size
- text += queue_text
+ group_text, group_size = self._render_task_group(task_group)
+ size += group_size
+ text += group_text
line2 = self._centered(text, size, line_length, ' ')
@@ -428,16 +430,16 @@ class _StatusHeader():
###################################################
# Private Methods #
###################################################
- def _render_queue(self, queue):
- processed = str(queue.processed_elements_count)
- skipped = str(queue.skipped_elements_count)
- failed = str(len(queue.failed_elements))
+ def _render_task_group(self, group):
+ processed = str(group.processed_tasks)
+ skipped = str(group.skipped_tasks)
+ failed = str(len(group.failed_tasks))
size = 5 # Space for the formatting '[', ':', ' ', ' ' and ']'
- size += len(queue.complete_name)
+ size += len(group.name)
size += len(processed) + len(skipped) + len(failed)
text = self._format_profile.fmt("(") + \
- self._content_profile.fmt(queue.complete_name) + \
+ self._content_profile.fmt(group.name) + \
self._format_profile.fmt(":") + \
self._success_profile.fmt(processed) + ' ' + \
self._content_profile.fmt(skipped) + ' ' + \
@@ -463,27 +465,21 @@ class _StatusHeader():
#
# Args:
# context (Context): The Context
-# job (Job): The job being processed
+# action_name (str): The action performed
+# full_name (str): The name of the job
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# elapsed (datetime): The offset into the session when this job is created
#
class _StatusJob():
- def __init__(self, context, job, content_profile, format_profile, elapsed):
- action_name = job.action_name
- if not isinstance(job, ElementJob):
- element = None
- else:
- element = job.element
-
+ def __init__(self, context, action_name, full_name, content_profile, format_profile, elapsed):
#
# Public members
#
- self.element = element # The Element
self.action_name = action_name # The action name
self.size = None # The number of characters required to render
- self.full_name = element._get_full_name() if element else action_name
+ self.full_name = full_name
#
# Private members
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 20ea80d0f..769c95e75 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -300,6 +300,7 @@ class MessageOrLogFile(Widget):
#
# Args:
# context (Context): The Context
+# state (State): The state data from the Core
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
@@ -309,7 +310,7 @@ class MessageOrLogFile(Widget):
#
class LogLine(Widget):
- def __init__(self, context,
+ def __init__(self, context, state,
content_profile,
format_profile,
success_profile,
@@ -327,6 +328,7 @@ class LogLine(Widget):
self._log_lines = context.log_error_lines
self._message_lines = context.log_message_lines
self._resolved_keys = None
+ self._state = state
self._space_widget = Space(context, content_profile, format_profile)
self._logfile_widget = LogFile(context, content_profile, format_profile, err_profile)
@@ -550,7 +552,7 @@ class LogLine(Widget):
# Early silent return if there are no queues, can happen
# only in the case that the stream early returned due to
# an inconsistent pipeline state.
- if not stream.queues:
+ if not self._state.task_groups:
return
text = ''
@@ -566,9 +568,12 @@ class LogLine(Widget):
values = OrderedDict()
for element, messages in sorted(self._failure_messages.items(), key=lambda x: x[0].name):
- for queue in stream.queues:
- if any(el.name == element.name for el in queue.failed_elements):
+ for group in self._state.task_groups.values():
+ # Exclude the failure messages if the job didn't ultimately fail
+ # (e.g. succeeded on retry)
+ if element.name in group.failed_tasks:
values[element.name] = ''.join(self._render(v) for v in messages)
+
if values:
text += self.content_profile.fmt("Failure Summary\n", bold=True)
text += self._format_values(values, style_value=False)
@@ -582,15 +587,15 @@ class LogLine(Widget):
processed_maxlen = 1
skipped_maxlen = 1
failed_maxlen = 1
- for queue in stream.queues:
- processed_maxlen = max(len(str(queue.processed_elements_count)), processed_maxlen)
- skipped_maxlen = max(len(str(queue.skipped_elements_count)), skipped_maxlen)
- failed_maxlen = max(len(str(len(queue.failed_elements))), failed_maxlen)
+ for group in self._state.task_groups.values():
+ processed_maxlen = max(len(str(group.processed_tasks)), processed_maxlen)
+ skipped_maxlen = max(len(str(group.skipped_tasks)), skipped_maxlen)
+ failed_maxlen = max(len(str(len(group.failed_tasks))), failed_maxlen)
- for queue in stream.queues:
- processed = str(queue.processed_elements_count)
- skipped = str(queue.skipped_elements_count)
- failed = str(len(queue.failed_elements))
+ for group in self._state.task_groups.values():
+ processed = str(group.processed_tasks)
+ skipped = str(group.skipped_tasks)
+ failed = str(len(group.failed_tasks))
processed_align = ' ' * (processed_maxlen - len(processed))
skipped_align = ' ' * (skipped_maxlen - len(skipped))
@@ -606,7 +611,7 @@ class LogLine(Widget):
status_text += self.content_profile.fmt("failed ") + \
self._err_profile.fmt(failed) + ' ' + failed_align
- values["{} Queue".format(queue.action_name)] = status_text
+ values["{} Queue".format(group.name)] = status_text
text += self._format_values(values, style_value=False)
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index f36c30190..581101c07 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob
class CacheSizeJob(Job):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
+ self.set_name(self.action_name)
self._complete_cb = complete_cb
context = self._scheduler.context
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 85722c83f..3e9a8ff47 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob
class CleanupJob(Job):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
+ self.set_name(self.action_name)
self._complete_cb = complete_cb
context = self._scheduler.context
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index a535f55db..138448685 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -67,6 +67,7 @@ from .job import Job, ChildJob
class ElementJob(Job):
def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
+ self.set_name(element._get_full_name())
self.queue = queue
self._element = element
self._action_cb = action_cb # The action callable function
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 1c2726b5f..87f461939 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -129,6 +129,7 @@ class Job():
#
# Public members
#
+ self.name = None # The name of the job, set by the job's subclass
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
@@ -150,6 +151,12 @@ class Job():
self._message_unique_id = None
self._task_id = None
+ # set_name()
+ #
+ # Sets the name of this job
+ def set_name(self, name):
+ self.name = name
+
# start()
#
# Starts the job.
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 1dd45607b..1be3f7cd0 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -21,7 +21,7 @@
from datetime import timedelta
from . import Queue, QueueStatus
-from ..jobs import ElementJob, JobStatus
+from ..jobs import JobStatus
from ..resources import ResourceType
from ..._message import MessageType
@@ -55,14 +55,9 @@ class BuildQueue(Queue):
detail=detail, action_name=self.action_name,
elapsed=timedelta(seconds=0),
logfile=logfile)
- job = ElementJob(self._scheduler, self.action_name,
- logfile, element=element, queue=self,
- action_cb=self.get_process_func(),
- complete_cb=self._job_done,
- max_retries=self._max_retries)
self._done_queue.append(element)
- self.failed_elements.append(element)
- self._scheduler._job_complete_callback(job, False)
+ element_name = element._get_full_name()
+ self._task_group.add_failed_task(element_name)
return super().enqueue(to_queue)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index adc1cd467..538b2b9d1 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -63,13 +63,6 @@ class Queue():
def __init__(self, scheduler):
#
- # Public members
- #
- self.failed_elements = [] # List of failed elements, for the frontend
- self.processed_elements_count = 0 # Number of processed elements, for the frontend
- self.skipped_elements_count = 0 # Number of skipped elements, for the frontend
-
- #
# Private members
#
self._scheduler = scheduler
@@ -87,6 +80,17 @@ class Queue():
if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
self._max_retries = scheduler.context.sched_network_retries
+ self._task_group = self._scheduler._state.add_task_group(self.action_name)
+
+ # destroy()
+ #
+ # Explicitly performs all cleanup tasks for this queue
+ #
+ # Note: Doing this explicitly is preferred to a __del__ method because
+ # it is not at the mercy of the garbage collector
+ def destroy(self):
+ self._scheduler._state.remove_task_group(self.action_name)
+
#####################################################
# Abstract Methods for Queue implementations #
#####################################################
@@ -236,6 +240,12 @@ class Queue():
def set_required_element_check(self):
self._required_element_check = True
+ # any_failed_elements()
+ #
+ # Returns whether any elements in this queue have failed their jobs
+ def any_failed_elements(self):
+ return any(self._task_group.failed_tasks)
+
#####################################################
# Private Methods #
#####################################################
@@ -298,7 +308,7 @@ class Queue():
# Report error and mark as failed
#
self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
# Treat this as a task error as it's related to a task
# even though it did not occur in the task context
@@ -314,18 +324,18 @@ class Queue():
self._message(element, MessageType.BUG,
"Unhandled exception in post processing",
detail=traceback.format_exc())
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
else:
# All elements get placed on the done queue for later processing.
self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED or job.get_terminated():
- self.skipped_elements_count += 1
+ self._task_group.add_skipped_task()
elif status == JobStatus.OK:
- self.processed_elements_count += 1
+ self._task_group.add_processed_task()
else:
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
@@ -357,7 +367,7 @@ class Queue():
status = self.status(element)
if status == QueueStatus.SKIP:
# Place skipped elements into the done queue immediately
- self.skipped_elements_count += 1 # Public record of skipped elements
+ self._task_group.add_skipped_task()
self._done_queue.append(element) # Elements to proceed to the next queue
elif status == QueueStatus.READY:
# Push elements which are ready to be processed immediately into the queue
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 17d655ce2..d2ea2741e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob, CleanupJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob
from .._profile import Topics, PROFILER
@@ -61,19 +61,16 @@ _ACTION_NAME_CACHE_SIZE = 'size'
# Args:
# context: The Context in the parent scheduling process
# start_time: The time at which the session started
+# state: The state that can be made available to the frontend
# interrupt_callback: A callback to handle ^C
# ticker_callback: A callback call once per second
-# job_start_callback: A callback call when each job starts
-# job_complete_callback: A callback call when each job completes
#
class Scheduler():
def __init__(self, context,
- start_time,
+ start_time, state,
interrupt_callback=None,
- ticker_callback=None,
- job_start_callback=None,
- job_complete_callback=None):
+ ticker_callback=None):
#
# Public members
@@ -94,6 +91,7 @@ class Scheduler():
self._starttime = start_time # Initial application start time
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
+ self._state = state
# State of cache management related jobs
self._cache_size_scheduled = False # Whether we have a cache size job scheduled
@@ -104,8 +102,6 @@ class Scheduler():
# Callbacks to report back to the Scheduler owner
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
- self._job_start_callback = job_start_callback
- self._job_complete_callback = job_complete_callback
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -163,7 +159,7 @@ class Scheduler():
# Stop handling unix signals
self._disconnect_signals()
- failed = any(any(queue.failed_elements) for queue in self.queues)
+ failed = any(queue.any_failed_elements() for queue in self.queues)
self.loop = None
if failed:
@@ -184,6 +180,9 @@ class Scheduler():
#
def clear_queues(self):
if self.queues:
+ for queue in self.queues:
+ queue.destroy()
+
self.queues.clear()
# terminate_jobs()
@@ -261,8 +260,12 @@ class Scheduler():
# Remove from the active jobs list
self._active_jobs.remove(job)
- # Scheduler owner facing callback
- self._job_complete_callback(job, status)
+ self._state.remove_task(job.action_name, job.name)
+ if status == JobStatus.FAIL:
+ unique_id = None
+ if isinstance(job, ElementJob):
+ unique_id = job._element._unique_id
+ self._state.fail_task(job.action_name, job.name, unique_id)
# Now check for more jobs
self._sched()
@@ -321,8 +324,7 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- if self._job_start_callback:
- self._job_start_callback(job)
+ self._state.add_task(job.action_name, job.name, self.elapsed_time())
job.start()
# Callback for the cache size job
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
new file mode 100644
index 000000000..b2f0b705d
--- /dev/null
+++ b/src/buildstream/_state.py
@@ -0,0 +1,310 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+
+from collections import OrderedDict
+
+
+# TaskGroup
+#
+# The state data stored for a group of tasks (usually scheduler queues)
+#
+# Args:
+# name (str): The name of the Task Group, e.g. 'build'
+# state (State): The state object
+#
+class TaskGroup():
+ def __init__(self, name, state):
+ self.name = name
+ self.processed_tasks = 0
+ self.skipped_tasks = 0
+ # NOTE: failed_tasks is a list of strings instead of an integer count
+ # because the frontend requires the full list of failed tasks to
+ # know whether to print failure messages for a given element.
+ self.failed_tasks = []
+
+ self._state = state
+ self._update_task_group_cbs = []
+
+ ###########################################
+ # Core-facing APIs to drive notifications #
+ ###########################################
+
+ # add_processed_task()
+ #
+ # Update the TaskGroup's count of processed tasks and notify of changes
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ def add_processed_task(self):
+ self.processed_tasks += 1
+ for cb in self._state._task_groups_changed_cbs:
+ cb()
+
+ # add_skipped_task()
+ #
+ # Update the TaskGroup's count of skipped tasks and notify of changes
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ def add_skipped_task(self):
+ self.skipped_tasks += 1
+
+ for cb in self._state._task_groups_changed_cbs:
+ cb()
+
+ # add_failed_task()
+ #
+ # Update the TaskGroup's list of failed tasks and notify of changes
+ #
+ # Args:
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ def add_failed_task(self, full_name):
+ self.failed_tasks.append(full_name)
+
+ for cb in self._state._task_groups_changed_cbs:
+ cb()
+
+
+# State
+#
+# The state data that is stored for the purpose of sharing with the frontend.
+#
+# BuildStream's Core is responsible for making changes to this data.
+# BuildStream's Frontend may register callbacks with State to be notified
+# when parts of State change, and read State to know what has changed.
+class State():
+ def __init__(self):
+ self.task_groups = OrderedDict() # key is TaskGroup name
+
+ # Note: A Task's full_name is technically unique, but only accidentally.
+ self.tasks = OrderedDict() # key is a tuple of action_name and full_name
+
+ self._task_added_cbs = []
+ self._task_removed_cbs = []
+ self._task_groups_changed_cbs = []
+ self._task_failed_cbs = []
+
+ #####################################
+ # Frontend-facing notification APIs #
+ #####################################
+
+ # register_task_added_callback()
+ #
+ # Registers a callback to be notified when a task is added
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ #
+ # Callback Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ #
+ def register_task_added_callback(self, callback):
+ self._task_added_cbs.append(callback)
+
+ # unregister_task_added_callback()
+ #
+ # Unregisters a callback previously registered by
+ # register_task_added_callback()
+ #
+ # Args:
+ # callback (function): The callback to be removed
+ #
+ def unregister_task_added_callback(self, callback):
+ self._task_added_cbs.remove(callback)
+
+ # register_task_removed_callback()
+ #
+ # Registers a callback to be notified when a task is removed
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ #
+ # Callback Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ #
+ def register_task_removed_callback(self, callback):
+ self._task_removed_cbs.append(callback)
+
+ # unregister_task_removed_callback()
+ #
+ # Unregisters a callback previously registered by
+ # register_task_removed_callback()
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ #
+ def unregister_task_removed_callback(self, callback):
+ self._task_removed_cbs.remove(callback)
+
+ # register_task_failed_callback()
+ #
+ # Registers a callback to be notified when a task has failed
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ #
+ # Callback Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ # unique_id (int): (optionally) the element's unique ID, if the failure
+ # came from an element
+ #
+ def register_task_failed_callback(self, callback):
+ self._task_failed_cbs.append(callback)
+
+ # unregister_task_failed_callback()
+ #
+ # Unregisters a callback previously registered by
+ # register_task_failed_callback()
+ #
+ # Args:
+ # callback (function): The callback to be removed
+ #
+ def unregister_task_failed_callback(self, callback):
+ self._task_failed_cbs.remove(callback)
+
+ ##############################################
+ # Core-facing APIs for driving notifications #
+ ##############################################
+
+ # add_task_group()
+ #
+ # Notification that a new task group has been added
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ # Args:
+ # name (str): The name of the task group, e.g. 'build'
+ #
+ # Returns:
+ # TaskGroup: The task group created
+ #
+ def add_task_group(self, name):
+ assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups)
+ group = TaskGroup(name, self)
+ self.task_groups[name] = group
+
+ return group
+
+ # remove_task_group()
+ #
+ # Notification that a task group has been removed
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ # Args:
+ # name (str): The name of the task group, e.g. 'build'
+ #
+ def remove_task_group(self, name):
+ # Rely on 'del' to raise an error when removing nonexistent task groups
+ del self.task_groups[name]
+
+ # add_task()
+ #
+ # Add a task and send appropriate notifications
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ # Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ # start_time (timedelta): The time the task started, relative to
+ # buildstream's start time.
+ #
+ def add_task(self, action_name, full_name, start_time):
+ task_key = (action_name, full_name)
+ assert task_key not in self.tasks, \
+ "Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks)
+
+ task = _Task(action_name, full_name, start_time)
+ self.tasks[task_key] = task
+
+ for cb in self._task_added_cbs:
+ cb(action_name, full_name)
+
+ # remove_task()
+ #
+ # Remove the task and send appropriate notifications
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ # Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ #
+ def remove_task(self, action_name, full_name):
+ # Rely on 'del' to raise an error when removing nonexistent tasks
+ del self.tasks[(action_name, full_name)]
+
+ for cb in self._task_removed_cbs:
+ cb(action_name, full_name)
+
+ # fail_task()
+ #
+ # Notify all registered callbacks that a task has failed.
+ #
+ # This is separate from the tasks changed callbacks because a failed task
+ # requires the frontend to intervene to decide what happens next.
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ # Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ # unique_id (int): (optionally) the element's unique ID, if the failure came from an element
+ #
+ def fail_task(self, action_name, full_name, unique_id=None):
+ for cb in self._task_failed_cbs:
+ cb(action_name, full_name, unique_id)
+
+
+# _Task
+#
+# The state data stored for an individual task
+#
+# Args:
+# action_name (str): The name of the action, e.g. 'build'
+# full_name (str): The full name of the task, distinguishing
+# it from other tasks with the same action name
+# e.g. an element's name.
+# start_time (timedelta): The time the task started, relative to
+# buildstream's start time.
+class _Task():
+ def __init__(self, action_name, full_name, start_time):
+ self.action_name = action_name
+ self.full_name = full_name
+ self.start_time = start_time
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index fcd40c3b4..2ad1a4fee 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,6 +38,7 @@ from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
+from ._state import State
from .types import _KeyStrength
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -53,17 +54,13 @@ from . import Scope, Consistency
# session_start_callback (callable): A callback to invoke when the session starts
# interrupt_callback (callable): A callback to invoke when we get interrupted
# ticker_callback (callable): Invoked every second while running the scheduler
-# job_start_callback (callable): Called when a job starts
-# job_complete_callback (callable): Called when a job completes
#
class Stream():
def __init__(self, context, session_start, *,
session_start_callback=None,
interrupt_callback=None,
- ticker_callback=None,
- job_start_callback=None,
- job_complete_callback=None):
+ ticker_callback=None):
#
# Public members
@@ -76,19 +73,27 @@ class Stream():
#
# Private members
#
- self._artifacts = context.artifactcache
- self._sourcecache = context.sourcecache
self._context = context
+ self._artifacts = None
+ self._sourcecache = None
self._project = None
self._pipeline = None
- self._scheduler = Scheduler(context, session_start,
+ self._state = State() # Owned by Stream, used by Core to set state
+ self._scheduler = Scheduler(context, session_start, self._state,
interrupt_callback=interrupt_callback,
- ticker_callback=ticker_callback,
- job_start_callback=job_start_callback,
- job_complete_callback=job_complete_callback)
+ ticker_callback=ticker_callback)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ # init()
+ #
+ # Initialization of Stream that has side-effects that require it to be
+ # performed after the Stream is created.
+ #
+ def init(self):
+ self._artifacts = self._context.artifactcache
+ self._sourcecache = self._context.sourcecache
+
# cleanup()
#
# Cleans up application state
@@ -972,6 +977,15 @@ class Stream():
self.queues = [queue]
self._run()
+ # get_state()
+ #
+ # Get the State object owned by Stream
+ #
+ # Returns:
+ # State: The State object
+ def get_state(self):
+ return self._state
+
#############################################################
# Scheduler API forwarding #
#############################################################
@@ -1213,7 +1227,6 @@ class Stream():
#
def _add_queue(self, queue, *, track=False):
self.queues.append(queue)
-
if not (track or self._first_non_track_queue):
self._first_non_track_queue = queue
self._first_non_track_queue.set_required_element_check()