summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-07-31 16:31:47 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-08-08 13:52:36 +0000
commit0026e37918998addb61d7cefcbb9b5f7f6f7b4f4 (patch)
treea756eaf5ba545882f8ec69204759ac22b8112d23 /src/buildstream/_scheduler
parent1701aa1239f472317edfc6f675378dc91b1fcd61 (diff)
downloadbuildstream-0026e37918998addb61d7cefcbb9b5f7f6f7b4f4.tar.gz
_message.py: Use element_name & element_key instead of unique_idtpollard/messageobject
Adding the element full name and display key into all element related messages removes the need to look up the plugintable via a plugin unique_id just to retrieve the same values for logging and widget frontend display. Relying on plugintable state is also incompatible if the frontend will be running in a different process, as it will exist in multiple states. The element full name is now displayed instead of the unique_id, such as in the debugging widget. It is also displayed in place of 'name' (i.e including any junction prepend) to be more informative.
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py12
-rw-r--r--src/buildstream/_scheduler/jobs/job.py117
-rw-r--r--src/buildstream/_scheduler/queues/queue.py2
-rw-r--r--src/buildstream/_scheduler/scheduler.py11
4 files changed, 81 insertions, 61 deletions
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 138448685..246eb75c6 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -69,17 +69,13 @@ class ElementJob(Job):
super().__init__(*args, **kwargs)
self.set_name(element._get_full_name())
self.queue = queue
- self._element = element
+ self._element = element # Set the Element pertaining to the job
self._action_cb = action_cb # The action callable function
self._complete_cb = complete_cb # The complete callable function
- # Set the ID for logging purposes
- self.set_message_unique_id(element._unique_id)
- self.set_task_id(element._unique_id)
-
- @property
- def element(self):
- return self._element
+ # Set the plugin element name & key for logging purposes
+ self.set_message_element_name(self.name)
+ self.set_message_element_key(self._element._get_display_key())
def parent_complete(self, status, result):
self._complete_cb(self, self._element, status, self._result)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 2e6c7bb32..32c5559fb 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -147,8 +147,9 @@ class Job():
self._terminated = False # Whether this job has been explicitly terminated
self._logfile = logfile
- self._message_unique_id = None
- self._task_id = None
+ self._message_element_name = None # The plugin instance element name for messaging
+ self._message_element_key = None # The element key for messaging
+ self._element = None # The Element() passed to the Job() constructor, if applicable
# set_name()
#
@@ -174,8 +175,8 @@ class Job():
self._logfile,
self._max_retries,
self._tries,
- self._message_unique_id,
- self._task_id,
+ self._message_element_name,
+ self._message_element_key
)
# Make sure that picklability doesn't break, by exercising it during
@@ -311,56 +312,62 @@ class Job():
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
- # set_message_unique_id()
+ # set_message_element_name()
#
- # This is called by Job subclasses to set the plugin ID
- # issuing the message (if an element is related to the Job).
+ # This is called by Job subclasses to set the plugin instance element
+ # name issuing the message (if an element is related to the Job).
#
# Args:
- # unique_id (int): The id to be supplied to the Message() constructor
+ # element_name (int): The element_name to be supplied to the Message() constructor
#
- def set_message_unique_id(self, unique_id):
- self._message_unique_id = unique_id
+ def set_message_element_name(self, element_name):
+ self._message_element_name = element_name
- # set_task_id()
+ # set_message_element_key()
#
- # This is called by Job subclasses to set a plugin ID
- # associated with the task at large (if any element is related
- # to the task).
- #
- # This will only be used in the child process running the task.
- #
- # The task ID helps keep messages in the frontend coherent
- # in the case that multiple plugins log in the context of
- # a single task (e.g. running integration commands should appear
- # in the frontend for the element being built, not the element
- # running the integration commands).
+ # This is called by Job subclasses to set the element
+ # key for for the issuing message (if an element is related to the Job).
#
# Args:
- # task_id (int): The plugin identifier for this task
+ # element_key (tuple): The element_key tuple to be supplied to the Message() constructor
#
- def set_task_id(self, task_id):
- self._task_id = task_id
+ def set_message_element_key(self, element_key):
+ self._message_element_key = element_key
# message():
#
# Logs a message, this will be logged in the task's logfile and
# conditionally also be sent to the frontend.
#
+ # XXX: Note no calls to message() currently override the default
+ # name & key (previously unique_id), potential to be removed.
+ #
# Args:
# message_type (MessageType): The type of message to send
# message (str): The message
# kwargs: Remaining Message() constructor arguments, note that you can
- # override 'unique_id' this way.
+ # override 'element_name' and 'element_key' this way.
#
- def message(self, message_type, message, **kwargs):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
kwargs['scheduler'] = True
- unique_id = self._message_unique_id
- if "unique_id" in kwargs:
- unique_id = kwargs["unique_id"]
- del kwargs["unique_id"]
+ # If default name & key values not provided, set as given job attributes
+ if element_name is None:
+ element_name = self._message_element_name
+ if element_key is None:
+ element_key = self._message_element_key
self._scheduler.context.messenger.message(
- Message(unique_id, message_type, message, **kwargs))
+ Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs))
+
+ # get_element()
+ #
+ # Get the Element() related to the job, if jobtype (i.e ElementJob) is
+ # applicable, default None.
+ #
+ # Returns:
+ # (Element): The Element() instance pertaining to the Job, else None.
+ #
+ def get_element(self):
+ return self._element
#######################################################
# Abstract Methods #
@@ -573,13 +580,16 @@ class Job():
# that should be used - should contain {pid}.
# max_retries (int): The maximum number of retries.
# tries (int): The number of retries so far.
-# message_unique_id (int): None, or the id to be supplied to the Message() constructor.
-# task_id (int): None, or the plugin identifier for this job.
+# message_element_name (str): None, or the plugin instance element name
+# to be supplied to the Message() constructor.
+# message_element_key (tuple): None, or the element display key tuple
+# to be supplied to the Message() constructor.
#
class ChildJob():
def __init__(
- self, action_name, messenger, logdir, logfile, max_retries, tries, message_unique_id, task_id):
+ self, action_name, messenger, logdir, logfile, max_retries, tries,
+ message_element_name, message_element_key):
self.action_name = action_name
@@ -588,8 +598,8 @@ class ChildJob():
self._logfile = logfile
self._max_retries = max_retries
self._tries = tries
- self._message_unique_id = message_unique_id
- self._task_id = task_id
+ self._message_element_name = message_element_name
+ self._message_element_key = message_element_key
self._queue = None
@@ -598,20 +608,26 @@ class ChildJob():
# Logs a message, this will be logged in the task's logfile and
# conditionally also be sent to the frontend.
#
+ # XXX: Note no calls to message() currently override the default
+ # name & key (previously unique_id), potential to be removed.
+ #
# Args:
# message_type (MessageType): The type of message to send
# message (str): The message
- # kwargs: Remaining Message() constructor arguments, note that you can
- # override 'unique_id' this way.
+ # kwargs: Remaining Message() constructor arguments, note
+ # element_key is set in _child_message_handler
+ # for front end display if not already set or explicitly
+ # overriden here.
#
- def message(self, message_type, message, **kwargs):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
kwargs['scheduler'] = True
- unique_id = self._message_unique_id
- if "unique_id" in kwargs:
- unique_id = kwargs["unique_id"]
- del kwargs["unique_id"]
- self._messenger.message(
- Message(unique_id, message_type, message, **kwargs))
+ # If default name & key values not provided, set as given job attributes
+ if element_name is None:
+ element_name = self._message_element_name
+ if element_key is None:
+ element_key = self._message_element_key
+ self._messenger.message(Message(message_type, message, element_name=element_name,
+ element_key=element_key, **kwargs))
# send_message()
#
@@ -844,6 +860,8 @@ class ChildJob():
# frontend's main message handler in the context of a child task
# and performs local logging to the local log file before sending
# the message back to the parent process for further propagation.
+ # The related element display key is added to the message for
+ # widget rendering if not already set for an element childjob.
#
# Args:
# message (Message): The message to log
@@ -852,7 +870,12 @@ class ChildJob():
def _child_message_handler(self, message, is_silenced):
message.action_name = self.action_name
- message.task_id = self._task_id
+
+ # If no key has been set at this point, and the element job has
+ # a related key, set it. This is needed for messages going
+ # straight to the message handler from the child process.
+ if message.element_key is None and self._message_element_key:
+ message.element_key = self._message_element_key
# Send to frontend if appropriate
if is_silenced and (message.message_type not in unconditional_messages):
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 79f1fa44f..8c81ce21d 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -341,7 +341,7 @@ class Queue():
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
context = element._get_context()
- message = Message(element._unique_id, message_type, brief, **kwargs)
+ message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
context.messenger.message(message)
def _element_log_path(self, element):
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2dea1d48b..9d7cf5d09 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob
from .._profile import Topics, PROFILER
@@ -253,10 +253,11 @@ class Scheduler():
self._state.remove_task(job.action_name, job.name)
if status == JobStatus.FAIL:
- unique_id = None
- if isinstance(job, ElementJob):
- unique_id = job._element._unique_id
- self._state.fail_task(job.action_name, job.name, unique_id)
+ # If it's an elementjob, we want to compare against the failure messages
+ # and send the Element() instance. Note this will change if the frontend
+ # is run in a separate process for pickling
+ element = job.get_element()
+ self._state.fail_task(job.action_name, job.name, element=element)
# Now check for more jobs
self._sched()