diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2020-09-10 16:50:03 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-09-10 16:50:03 +0000 |
commit | 7ce327e063da7f29ae16f1384d612242e622bc39 (patch) | |
tree | 2aeb941d6dde5c0bdea4a038ec5c8bd8f5d5dbf5 | |
parent | eca24ae1fbc03503ffcf60ca2600dbc8fefc484f (diff) | |
parent | a1252b72e9605c09519fd247ab6b90fea106cd54 (diff) | |
download | buildstream-7ce327e063da7f29ae16f1384d612242e622bc39.tar.gz |
Merge branch 'juerg/duplicate-artifact-job' into 'master'
_state.py: Use separate task identifier
See merge request BuildStream/buildstream!2062
-rw-r--r-- | src/buildstream/_frontend/app.py | 19 | ||||
-rw-r--r-- | src/buildstream/_frontend/status.py | 30 | ||||
-rw-r--r-- | src/buildstream/_messenger.py | 4 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 12 | ||||
-rw-r--r-- | src/buildstream/_state.py | 78 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 6 | ||||
-rw-r--r-- | tests/frontend/push.py | 30 |
8 files changed, 103 insertions, 83 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 320ae32ee..b25a421c3 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -612,18 +612,18 @@ class App: # the creation of an interactive shell, and the retrying of jobs. # # Args: - # action_name (str): The name of the action being performed, - # same as the task group, if it exists - # full_name (str): The name of this specific task, e.g. the element full name + # task_id (str): The unique identifier of the task # element (tuple): If an element job failed a tuple of Element instance unique_id & display key # - def _job_failed(self, action_name, full_name, element=None): + def _job_failed(self, task_id, element=None): + task = self._state.tasks[task_id] + # Dont attempt to handle a failure if the user has already opted to # terminate if not self.stream.terminated: if element: # Get the last failure message for additional context - failure = self._fail_messages.get(full_name) + failure = self._fail_messages.get(task.full_name) # XXX This is dangerous, sometimes we get the job completed *before* # the failure message reaches us ?? @@ -631,18 +631,19 @@ class App: self._status.clear() click.echo( "\n\n\nBUG: Message handling out of sync, " - + "unable to retrieve failure message for element {}\n\n\n\n\n".format(full_name), + + "unable to retrieve failure message for element {}\n\n\n\n\n".format(task.full_name), err=True, ) else: - self._handle_failure(element, action_name, failure, full_name) + self._handle_failure(element, task, failure) else: # Not an element_job, we don't handle the failure click.echo("\nTerminating all jobs\n", err=True) self.stream.terminate() - def _handle_failure(self, element, action_name, failure, full_name): + def _handle_failure(self, element, task, failure): + full_name = task.full_name # Handle non interactive mode setting of what to do when a job fails. if not self._interactive_failures: @@ -726,7 +727,7 @@ class App: elif choice == "retry": click.echo("\nRetrying failed job\n", err=True) unique_id = element[0] - self.stream._failure_retry(action_name, unique_id) + self.stream._failure_retry(task.id, unique_id) # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py index e0505b45b..6d11cf63f 100644 --- a/src/buildstream/_frontend/status.py +++ b/src/buildstream/_frontend/status.py @@ -159,13 +159,11 @@ class Status: # Reacts to a specified job being changed # # Args: - # action_name (str): The action name for this job - # full_name (str): The name of this specific job (e.g. element name) + # task_id (str): The unique identifier of the task # - def _job_changed(self, action_name, full_name): - job_key = (action_name, full_name) - task = self._state.tasks[job_key] - job = self._jobs[job_key] + def _job_changed(self, task_id): + task = self._state.tasks[task_id] + job = self._jobs[task_id] if job.update(task): self._need_alloc = True @@ -293,14 +291,15 @@ class Status: # Adds a job to track in the status area # # Args: - # action_name (str): The action name for this job - # full_name (str): The name of this specific job (e.g. element name) + # task_id (str): The unique identifier of the task # - def _add_job(self, action_name, full_name): - task = self._state.tasks[(action_name, full_name)] + def _add_job(self, task_id): + task = self._state.tasks[task_id] elapsed = task.elapsed_offset - job = _StatusJob(self._context, action_name, full_name, self._content_profile, self._format_profile, elapsed) - self._jobs[(action_name, full_name)] = job + job = _StatusJob( + self._context, task.action_name, task.full_name, self._content_profile, self._format_profile, elapsed + ) + self._jobs[task_id] = job self._need_alloc = True # _remove_job() @@ -308,11 +307,10 @@ class Status: # Removes a job currently being tracked in the status area # # Args: - # action_name (str): The action name for this job - # full_name (str): The name of this specific job (e.g. element name) + # task_id (str): The unique identifier of the task # - def _remove_job(self, action_name, full_name): - del self._jobs[(action_name, full_name)] + def _remove_job(self, task_id): + del self._jobs[task_id] self._need_alloc = True diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 805f56b5b..b9ceb2107 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -209,7 +209,7 @@ class Messenger: message = Message(MessageType.START, activity_name, element_name=element_name) self.message(message) - task = self._state.add_task(activity_name, full_name) + task = self._state.add_task(full_name, activity_name, full_name) task.set_render_cb(self._render_status) self._active_simple_tasks += 1 if not self._next_render: @@ -224,7 +224,7 @@ class Messenger: self.message(message) raise finally: - self._state.remove_task(activity_name, full_name) + self._state.remove_task(full_name) self._active_simple_tasks -= 1 if self._active_simple_tasks == 0: self._next_render = None diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index fd4f7720d..c8ff853ed 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -23,6 +23,7 @@ # System imports import asyncio import datetime +import itertools import multiprocessing import os import signal @@ -109,11 +110,17 @@ class _MessageType(FastEnum): # max_retries (int): The maximum number of retries # class Job: + # Unique id generator for jobs + # + # This is used to identify tasks in the `State` class + _id_generator = itertools.count(1) + def __init__(self, scheduler, action_name, logfile, *, max_retries=0): # # Public members # + self.id = "{}-{}".format(action_name, next(Job._id_generator)) self.name = None # The name of the job, set by the job's subclass self.action_name = action_name # The action name for the Queue self.child_data = None # Data to be sent to the main process diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 3e6bf1f92..5f0d69a06 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -261,9 +261,9 @@ class Scheduler: else: element_info = None - self._state.fail_task(job.action_name, job.name, element_info) + self._state.fail_task(job.id, element_info) - self._state.remove_task(job.action_name, job.name) + self._state.remove_task(job.id) self._sched() @@ -306,7 +306,7 @@ class Scheduler: self._active_jobs.append(job) job.start() - self._state.add_task(job.action_name, job.name, self._state.elapsed_time()) + self._state.add_task(job.id, job.action_name, job.name, self._state.elapsed_time()) # _sched_queue_jobs() # @@ -497,10 +497,12 @@ class Scheduler: self._ticker_callback() self.loop.call_later(1, self._tick) - def _failure_retry(self, action_name, unique_id): + def _failure_retry(self, task_id, unique_id): + task = self._state.tasks[task_id] + queue = None for q in self.queues: - if q.action_name == action_name: + if q.action_name == task.action_name: queue = q break # Assert queue found, we should only be retrying a queued job diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index 6e08c004d..4a552b0b9 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -126,10 +126,7 @@ class State: # callback (function): The callback to be notified # # Callback Args: - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. + # task_id (str): The unique identifier of the task # def register_task_added_callback(self, callback): self._task_added_cbs.append(callback) @@ -153,10 +150,7 @@ class State: # callback (function): The callback to be notified # # Callback Args: - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. + # task_id (str): The unique identifier of the task # def register_task_removed_callback(self, callback): self._task_removed_cbs.append(callback) @@ -180,10 +174,7 @@ class State: # callback (function): The callback to be notified # # Callback Args: - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. + # task_id (str): The unique identifier of the task # def register_task_changed_callback(self, callback): self._task_changed_cbs.append(callback) @@ -207,11 +198,9 @@ class State: # callback (function): The callback to be notified # # Callback Args: - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. - # element_job (bool): (optionally) If an element job failed. + # task_id (str): The unique identifier of the task + # element (tuple): (optionally) The element unique_id and display keys if an + # element job # def register_task_failed_callback(self, callback): self._task_failed_cbs.append(callback) @@ -235,11 +224,8 @@ class State: # callback (function): The callback to be notified # # Callback Args: - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. - # element_job (bool): (optionally) If an element job failed. + # task_id (str): The unique identifier of the task + # unique_id: The unique id of the plugin instance to look up # def register_task_retry_callback(self, callback): self._task_retry_cbs.append(callback) @@ -288,6 +274,7 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: + # task_id (str): The unique identifier of the task # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name @@ -297,20 +284,19 @@ class State: # use this as they don't report relative to wallclock time # if the Scheduler has been suspended. # - def add_task(self, action_name, full_name, elapsed_offset=None): - task_key = (action_name, full_name) - assert task_key not in self.tasks, "Trying to add task '{}:{}' to '{}'".format( - action_name, full_name, self.tasks + def add_task(self, task_id, action_name, full_name, elapsed_offset=None): + assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format( + action_name, full_name, task_id, self.tasks ) if not elapsed_offset: elapsed_offset = self.elapsed_time() - task = _Task(self, action_name, full_name, elapsed_offset) - self.tasks[task_key] = task + task = _Task(self, task_id, action_name, full_name, elapsed_offset) + self.tasks[task_id] = task for cb in self._task_added_cbs: - cb(action_name, full_name) + cb(task_id) return task @@ -321,17 +307,14 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. + # task_id (str): The unique identifier of the task # - def remove_task(self, action_name, full_name): + def remove_task(self, task_id): # Rely on 'del' to raise an error when removing nonexistent tasks - del self.tasks[(action_name, full_name)] + del self.tasks[task_id] for cb in self._task_removed_cbs: - cb(action_name, full_name) + cb(task_id) # fail_task() # @@ -343,16 +326,13 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. + # task_id (str): The unique identifier of the task # element (tuple): (optionally) The element unique_id and display keys if an # element job # - def fail_task(self, action_name, full_name, element=None): + def fail_task(self, task_id, element=None): for cb in self._task_failed_cbs: - cb(action_name, full_name, element) + cb(task_id, element) # retry_task() # @@ -361,12 +341,12 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # action_name: The name of the action, e.g. 'build' + # task_id (str): The unique identifier of the task # unique_id: The unique id of the plugin instance to look up # - def retry_task(self, action_name: str, unique_id: str) -> None: + def retry_task(self, task_id: str, unique_id: str) -> None: for cb in self._task_retry_cbs: - cb(action_name, unique_id) + cb(task_id, unique_id) # elapsed_time() # @@ -407,6 +387,7 @@ class State: # # Args: # state (State): The State object +# task_id (str): The unique identifier of the task # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name @@ -414,8 +395,9 @@ class State: # elapsed_offset (timedelta): The time the task started, relative to # buildstream's start time. class _Task: - def __init__(self, state, action_name, full_name, elapsed_offset): + def __init__(self, state, task_id, action_name, full_name, elapsed_offset): self._state = state + self.id = task_id self.action_name = action_name self.full_name = full_name self.elapsed_offset = elapsed_offset @@ -437,14 +419,14 @@ class _Task: def set_current_progress(self, progress): self.current_progress = progress for cb in self._state._task_changed_cbs: - cb(self.action_name, self.full_name) + cb(self.id) if self._render_cb: self._render_cb() def set_maximum_progress(self, progress): self.maximum_progress = progress for cb in self._state._task_changed_cbs: - cb(self.action_name, self.full_name) + cb(self.id) if self._render_cb: self._render_cb() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 82bd4ee6f..fbb2fca69 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1346,11 +1346,11 @@ class Stream: # failed task from the tasks group. # # Args: - # action_name: The name of the action being performed + # task_id (str): The unique identifier of the task # unique_id: A unique_id to load an Element instance # - def _failure_retry(self, action_name: str, unique_id: str) -> None: - self._state.retry_task(action_name, unique_id) + def _failure_retry(self, task_id: str, unique_id: str) -> None: + self._state.retry_task(task_id, unique_id) # _run() # diff --git a/tests/frontend/push.py b/tests/frontend/push.py index 3a0afbd87..ffcc166f9 100644 --- a/tests/frontend/push.py +++ b/tests/frontend/push.py @@ -142,6 +142,36 @@ def test_push_artifact(cli, tmpdir, datafiles): assert_shared(cli, share, project, element) +@pytest.mark.datafiles(DATA_DIR) +def test_push_artifact_glob(cli, tmpdir, datafiles): + project = str(datafiles) + element = "target.bst" + + # Configure a local cache + local_cache = os.path.join(str(tmpdir), "cache") + cli.configure({"cachedir": local_cache}) + + with create_artifact_share(os.path.join(str(tmpdir), "artifactshare")) as share: + + # First build it without the artifact cache configured + result = cli.run(project=project, args=["build", element]) + result.assert_success() + + # Assert that the *artifact* is cached locally + cache_key = cli.get_element_key(project, element) + artifact_ref = os.path.join("test", os.path.splitext(element)[0], cache_key) + assert os.path.exists(os.path.join(local_cache, "artifacts", "refs", artifact_ref)) + + # Configure artifact share + cli.configure({"artifacts": {"url": share.repo, "push": True}}) + + # Run bst artifact push with a wildcard. + # This matches two artifact refs (weak and strong cache keys). + result = cli.run(project=project, args=["artifact", "push", "test/target/*"]) + result.assert_success() + assert len(result.get_pushed_elements()) == 2 + + # Tests that: # # * `bst artifact push` fails if the element is not cached locally |