author     bst-marge-bot <marge-bot@buildstream.build>  2020-09-10 16:50:03 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2020-09-10 16:50:03 +0000
commit     7ce327e063da7f29ae16f1384d612242e622bc39 (patch)
tree       2aeb941d6dde5c0bdea4a038ec5c8bd8f5d5dbf5
parent     eca24ae1fbc03503ffcf60ca2600dbc8fefc484f (diff)
parent     a1252b72e9605c09519fd247ab6b90fea106cd54 (diff)
Merge branch 'juerg/duplicate-artifact-job' into 'master'
_state.py: Use separate task identifier

See merge request BuildStream/buildstream!2062
-rw-r--r--  src/buildstream/_frontend/app.py          19
-rw-r--r--  src/buildstream/_frontend/status.py       30
-rw-r--r--  src/buildstream/_messenger.py              4
-rw-r--r--  src/buildstream/_scheduler/jobs/job.py     7
-rw-r--r--  src/buildstream/_scheduler/scheduler.py   12
-rw-r--r--  src/buildstream/_state.py                 78
-rw-r--r--  src/buildstream/_stream.py                 6
-rw-r--r--  tests/frontend/push.py                    30
8 files changed, 103 insertions, 83 deletions
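
Before the diff itself, a note on the shape of the change: tasks were previously keyed by an (action_name, full_name) tuple, which cannot distinguish two concurrent jobs that share both names (the situation the 'duplicate-artifact-job' branch name hints at; the old State.add_task asserted on the duplicate key). The patch keys everything by a unique string id instead. A minimal sketch of the new keying, using a plain dict rather than the real State class:

import itertools

# Old scheme: tasks keyed by an (action_name, full_name) tuple. Two
# concurrent jobs for the same artifact produce the same key, and the
# real State asserted on the duplicate rather than clobbering it.
old_key = ("pull", "test/target")

# New scheme: a unique id minted per job, matching the
# "{action_name}-{count}" format introduced in job.py below.
_id_generator = itertools.count(1)

def new_task_id(action_name):
    return "{}-{}".format(action_name, next(_id_generator))

tasks = {}
tasks[new_task_id("pull")] = "first job"   # key "pull-1"
tasks[new_task_id("pull")] = "second job"  # key "pull-2", no collision

print(sorted(tasks))  # ['pull-1', 'pull-2']
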
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 320ae32ee..b25a421c3 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -612,18 +612,18 @@ class App:
# the creation of an interactive shell, and the retrying of jobs.
#
# Args:
- # action_name (str): The name of the action being performed,
- # same as the task group, if it exists
- # full_name (str): The name of this specific task, e.g. the element full name
+ # task_id (str): The unique identifier of the task
# element (tuple): If an element job failed a tuple of Element instance unique_id & display key
#
- def _job_failed(self, action_name, full_name, element=None):
+ def _job_failed(self, task_id, element=None):
+ task = self._state.tasks[task_id]
+
# Dont attempt to handle a failure if the user has already opted to
# terminate
if not self.stream.terminated:
if element:
# Get the last failure message for additional context
- failure = self._fail_messages.get(full_name)
+ failure = self._fail_messages.get(task.full_name)
# XXX This is dangerous, sometimes we get the job completed *before*
# the failure message reaches us ??
@@ -631,18 +631,19 @@ class App:
self._status.clear()
click.echo(
"\n\n\nBUG: Message handling out of sync, "
- + "unable to retrieve failure message for element {}\n\n\n\n\n".format(full_name),
+ + "unable to retrieve failure message for element {}\n\n\n\n\n".format(task.full_name),
err=True,
)
else:
- self._handle_failure(element, action_name, failure, full_name)
+ self._handle_failure(element, task, failure)
else:
# Not an element_job, we don't handle the failure
click.echo("\nTerminating all jobs\n", err=True)
self.stream.terminate()
- def _handle_failure(self, element, action_name, failure, full_name):
+ def _handle_failure(self, element, task, failure):
+ full_name = task.full_name
# Handle non interactive mode setting of what to do when a job fails.
if not self._interactive_failures:
@@ -726,7 +727,7 @@ class App:
elif choice == "retry":
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- self.stream._failure_retry(action_name, unique_id)
+ self.stream._failure_retry(task.id, unique_id)
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index e0505b45b..6d11cf63f 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -159,13 +159,11 @@ class Status:
# Reacts to a specified job being changed
#
# Args:
- # action_name (str): The action name for this job
- # full_name (str): The name of this specific job (e.g. element name)
+ # task_id (str): The unique identifier of the task
#
- def _job_changed(self, action_name, full_name):
- job_key = (action_name, full_name)
- task = self._state.tasks[job_key]
- job = self._jobs[job_key]
+ def _job_changed(self, task_id):
+ task = self._state.tasks[task_id]
+ job = self._jobs[task_id]
if job.update(task):
self._need_alloc = True
@@ -293,14 +291,15 @@ class Status:
# Adds a job to track in the status area
#
# Args:
- # action_name (str): The action name for this job
- # full_name (str): The name of this specific job (e.g. element name)
+ # task_id (str): The unique identifier of the task
#
- def _add_job(self, action_name, full_name):
- task = self._state.tasks[(action_name, full_name)]
+ def _add_job(self, task_id):
+ task = self._state.tasks[task_id]
elapsed = task.elapsed_offset
- job = _StatusJob(self._context, action_name, full_name, self._content_profile, self._format_profile, elapsed)
- self._jobs[(action_name, full_name)] = job
+ job = _StatusJob(
+ self._context, task.action_name, task.full_name, self._content_profile, self._format_profile, elapsed
+ )
+ self._jobs[task_id] = job
self._need_alloc = True
# _remove_job()
@@ -308,11 +307,10 @@ class Status:
# Removes a job currently being tracked in the status area
#
# Args:
- # action_name (str): The action name for this job
- # full_name (str): The name of this specific job (e.g. element name)
+ # task_id (str): The unique identifier of the task
#
- def _remove_job(self, action_name, full_name):
- del self._jobs[(action_name, full_name)]
+ def _remove_job(self, task_id):
+ del self._jobs[task_id]
self._need_alloc = True
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 805f56b5b..b9ceb2107 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -209,7 +209,7 @@ class Messenger:
message = Message(MessageType.START, activity_name, element_name=element_name)
self.message(message)
- task = self._state.add_task(activity_name, full_name)
+ task = self._state.add_task(full_name, activity_name, full_name)
task.set_render_cb(self._render_status)
self._active_simple_tasks += 1
if not self._next_render:
@@ -224,7 +224,7 @@ class Messenger:
self.message(message)
raise
finally:
- self._state.remove_task(activity_name, full_name)
+ self._state.remove_task(full_name)
self._active_simple_tasks -= 1
if self._active_simple_tasks == 0:
self._next_render = None
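
One subtlety in the Messenger change above: simple tasks have no Job behind them, so they reuse full_name as their task id, whereas scheduler jobs pass the counter-based Job.id introduced in job.py below. A small sketch of the two styles side by side (the task names are hypothetical, and add_task is a stand-in for State.add_task):

def add_task(tasks, task_id, action_name, full_name):
    # Same duplicate-id assertion as State.add_task in this patch.
    assert task_id not in tasks, "duplicate task id"
    tasks[task_id] = (action_name, full_name)

tasks = {}

# Simple task (Messenger): full_name doubles as the id, so only one
# simple task with a given name can be active at a time.
add_task(tasks, "Loading elements", "Loading elements", "Loading elements")

# Scheduler job: the counter-based Job.id keeps concurrent jobs on the
# same element distinct.
add_task(tasks, "build-7", "build", "target.bst")
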
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index fd4f7720d..c8ff853ed 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -23,6 +23,7 @@
# System imports
import asyncio
import datetime
+import itertools
import multiprocessing
import os
import signal
@@ -109,11 +110,17 @@ class _MessageType(FastEnum):
# max_retries (int): The maximum number of retries
#
class Job:
+ # Unique id generator for jobs
+ #
+ # This is used to identify tasks in the `State` class
+ _id_generator = itertools.count(1)
+
def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
#
# Public members
#
+ self.id = "{}-{}".format(action_name, next(Job._id_generator))
self.name = None # The name of the job, set by the job's subclass
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
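
The generator added above is a class attribute, so every Job instance, whatever its subclass or action name, draws from one shared counter, and ids stay unique across the whole session. The pattern in isolation:

import itertools

class Job:
    # Shared, class-level counter: every instantiation draws from the
    # same sequence, so no two jobs ever receive the same id.
    _id_generator = itertools.count(1)

    def __init__(self, action_name):
        self.id = "{}-{}".format(action_name, next(Job._id_generator))

print(Job("pull").id)   # pull-1
print(Job("pull").id)   # pull-2
print(Job("build").id)  # build-3 (the counter is global, not per action)
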
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3e6bf1f92..5f0d69a06 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -261,9 +261,9 @@ class Scheduler:
else:
element_info = None
- self._state.fail_task(job.action_name, job.name, element_info)
+ self._state.fail_task(job.id, element_info)
- self._state.remove_task(job.action_name, job.name)
+ self._state.remove_task(job.id)
self._sched()
@@ -306,7 +306,7 @@ class Scheduler:
self._active_jobs.append(job)
job.start()
- self._state.add_task(job.action_name, job.name, self._state.elapsed_time())
+ self._state.add_task(job.id, job.action_name, job.name, self._state.elapsed_time())
# _sched_queue_jobs()
#
@@ -497,10 +497,12 @@ class Scheduler:
self._ticker_callback()
self.loop.call_later(1, self._tick)
- def _failure_retry(self, action_name, unique_id):
+ def _failure_retry(self, task_id, unique_id):
+ task = self._state.tasks[task_id]
+
queue = None
for q in self.queues:
- if q.action_name == action_name:
+ if q.action_name == task.action_name:
queue = q
break
# Assert queue found, we should only be retrying a queued job
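
With only a task id in hand, the retry path now resolves the task record first and then finds the owning queue by action name. Roughly, with stub classes standing in for the real Task and Queue:

class Task:
    def __init__(self, action_name):
        self.action_name = action_name

class Queue:
    def __init__(self, action_name):
        self.action_name = action_name

tasks = {"build-3": Task("build")}
queues = [Queue("fetch"), Queue("build")]

def find_queue(task_id):
    # Mirrors Scheduler._failure_retry: look up the task, then scan
    # the queues for one whose action name matches.
    task = tasks[task_id]
    for q in queues:
        if q.action_name == task.action_name:
            return q
    return None

assert find_queue("build-3") is queues[1]
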
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 6e08c004d..4a552b0b9 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -126,10 +126,7 @@ class State:
# callback (function): The callback to be notified
#
# Callback Args:
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
+ # task_id (str): The unique identifier of the task
#
def register_task_added_callback(self, callback):
self._task_added_cbs.append(callback)
@@ -153,10 +150,7 @@ class State:
# callback (function): The callback to be notified
#
# Callback Args:
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
+ # task_id (str): The unique identifier of the task
#
def register_task_removed_callback(self, callback):
self._task_removed_cbs.append(callback)
@@ -180,10 +174,7 @@ class State:
# callback (function): The callback to be notified
#
# Callback Args:
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
+ # task_id (str): The unique identifier of the task
#
def register_task_changed_callback(self, callback):
self._task_changed_cbs.append(callback)
@@ -207,11 +198,9 @@ class State:
# callback (function): The callback to be notified
#
# Callback Args:
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
- # element_job (bool): (optionally) If an element job failed.
+ # task_id (str): The unique identifier of the task
+ # element (tuple): (optionally) The element unique_id and display keys if an
+ # element job
#
def register_task_failed_callback(self, callback):
self._task_failed_cbs.append(callback)
@@ -235,11 +224,8 @@ class State:
# callback (function): The callback to be notified
#
# Callback Args:
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
- # element_job (bool): (optionally) If an element job failed.
+ # task_id (str): The unique identifier of the task
+ # unique_id: The unique id of the plugin instance to look up
#
def register_task_retry_callback(self, callback):
self._task_retry_cbs.append(callback)
@@ -288,6 +274,7 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
+ # task_id (str): The unique identifier of the task
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
@@ -297,20 +284,19 @@ class State:
# use this as they don't report relative to wallclock time
# if the Scheduler has been suspended.
#
- def add_task(self, action_name, full_name, elapsed_offset=None):
- task_key = (action_name, full_name)
- assert task_key not in self.tasks, "Trying to add task '{}:{}' to '{}'".format(
- action_name, full_name, self.tasks
+ def add_task(self, task_id, action_name, full_name, elapsed_offset=None):
+ assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format(
+ action_name, full_name, task_id, self.tasks
)
if not elapsed_offset:
elapsed_offset = self.elapsed_time()
- task = _Task(self, action_name, full_name, elapsed_offset)
- self.tasks[task_key] = task
+ task = _Task(self, task_id, action_name, full_name, elapsed_offset)
+ self.tasks[task_id] = task
for cb in self._task_added_cbs:
- cb(action_name, full_name)
+ cb(task_id)
return task
@@ -321,17 +307,14 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
+ # task_id (str): The unique identifier of the task
#
- def remove_task(self, action_name, full_name):
+ def remove_task(self, task_id):
# Rely on 'del' to raise an error when removing nonexistent tasks
- del self.tasks[(action_name, full_name)]
+ del self.tasks[task_id]
for cb in self._task_removed_cbs:
- cb(action_name, full_name)
+ cb(task_id)
# fail_task()
#
@@ -343,16 +326,13 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
+ # task_id (str): The unique identifier of the task
# element (tuple): (optionally) The element unique_id and display keys if an
# element job
#
- def fail_task(self, action_name, full_name, element=None):
+ def fail_task(self, task_id, element=None):
for cb in self._task_failed_cbs:
- cb(action_name, full_name, element)
+ cb(task_id, element)
# retry_task()
#
@@ -361,12 +341,12 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # action_name: The name of the action, e.g. 'build'
+ # task_id (str): The unique identifier of the task
# unique_id: The unique id of the plugin instance to look up
#
- def retry_task(self, action_name: str, unique_id: str) -> None:
+ def retry_task(self, task_id: str, unique_id: str) -> None:
for cb in self._task_retry_cbs:
- cb(action_name, unique_id)
+ cb(task_id, unique_id)
# elapsed_time()
#
@@ -407,6 +387,7 @@ class State:
#
# Args:
# state (State): The State object
+# task_id (str): The unique identifier of the task
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
@@ -414,8 +395,9 @@ class State:
# elapsed_offset (timedelta): The time the task started, relative to
# buildstream's start time.
class _Task:
- def __init__(self, state, action_name, full_name, elapsed_offset):
+ def __init__(self, state, task_id, action_name, full_name, elapsed_offset):
self._state = state
+ self.id = task_id
self.action_name = action_name
self.full_name = full_name
self.elapsed_offset = elapsed_offset
@@ -437,14 +419,14 @@ class _Task:
def set_current_progress(self, progress):
self.current_progress = progress
for cb in self._state._task_changed_cbs:
- cb(self.action_name, self.full_name)
+ cb(self.id)
if self._render_cb:
self._render_cb()
def set_maximum_progress(self, progress):
self.maximum_progress = progress
for cb in self._state._task_changed_cbs:
- cb(self.action_name, self.full_name)
+ cb(self.id)
if self._render_cb:
self._render_cb()
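
Taken together, the State changes key every lifecycle operation and callback by task_id. A compact, runnable stub of the new surface (a stub only; the real class also tracks task groups, progress, and render callbacks):

class StubState:
    """Minimal mock of the task_id-keyed State API from this patch."""

    def __init__(self):
        self.tasks = {}
        self._task_added_cbs = []
        self._task_removed_cbs = []

    def register_task_added_callback(self, callback):
        self._task_added_cbs.append(callback)

    def register_task_removed_callback(self, callback):
        self._task_removed_cbs.append(callback)

    def add_task(self, task_id, action_name, full_name, elapsed_offset=None):
        assert task_id not in self.tasks
        self.tasks[task_id] = (action_name, full_name)
        for cb in self._task_added_cbs:
            cb(task_id)  # frontends now receive only the id

    def remove_task(self, task_id):
        del self.tasks[task_id]  # 'del' raises KeyError on unknown ids
        for cb in self._task_removed_cbs:
            cb(task_id)

state = StubState()
state.register_task_added_callback(
    lambda task_id: print("added:", task_id, state.tasks[task_id]))
state.register_task_removed_callback(
    lambda task_id: print("removed:", task_id))
state.add_task("build-1", "build", "target.bst")
state.remove_task("build-1")
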
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 82bd4ee6f..fbb2fca69 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1346,11 +1346,11 @@ class Stream:
# failed task from the tasks group.
#
# Args:
- # action_name: The name of the action being performed
+ # task_id (str): The unique identifier of the task
# unique_id: A unique_id to load an Element instance
#
- def _failure_retry(self, action_name: str, unique_id: str) -> None:
- self._state.retry_task(action_name, unique_id)
+ def _failure_retry(self, task_id: str, unique_id: str) -> None:
+ self._state.retry_task(task_id, unique_id)
# _run()
#
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index 3a0afbd87..ffcc166f9 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -142,6 +142,36 @@ def test_push_artifact(cli, tmpdir, datafiles):
assert_shared(cli, share, project, element)
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_artifact_glob(cli, tmpdir, datafiles):
+ project = str(datafiles)
+ element = "target.bst"
+
+ # Configure a local cache
+ local_cache = os.path.join(str(tmpdir), "cache")
+ cli.configure({"cachedir": local_cache})
+
+ with create_artifact_share(os.path.join(str(tmpdir), "artifactshare")) as share:
+
+ # First build it without the artifact cache configured
+ result = cli.run(project=project, args=["build", element])
+ result.assert_success()
+
+ # Assert that the *artifact* is cached locally
+ cache_key = cli.get_element_key(project, element)
+ artifact_ref = os.path.join("test", os.path.splitext(element)[0], cache_key)
+ assert os.path.exists(os.path.join(local_cache, "artifacts", "refs", artifact_ref))
+
+ # Configure artifact share
+ cli.configure({"artifacts": {"url": share.repo, "push": True}})
+
+ # Run bst artifact push with a wildcard.
+ # This matches two artifact refs (weak and strong cache keys).
+ result = cli.run(project=project, args=["artifact", "push", "test/target/*"])
+ result.assert_success()
+ assert len(result.get_pushed_elements()) == 2
+
+
# Tests that:
#
# * `bst artifact push` fails if the element is not cached locally