summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-07-16 16:48:38 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-08-06 14:54:14 +0100
commitdc863015320f1ff9b749a678a2a1ce4b83c006e0 (patch)
treec432cf41d5edb05a1bea96e30493ac6d6ff3540d
parent07bb725bbf4817a4982f3eaeb5a4268bade3e9c7 (diff)
downloadbuildstream-phil/ui-split-refactor.tar.gz
Add workarounds for queue querying in main processphil/ui-split-refactor
-rw-r--r--src/buildstream/_frontend/app.py3
-rw-r--r--src/buildstream/_scheduler/scheduler.py29
-rw-r--r--src/buildstream/_stream.py13
3 files changed, 26 insertions, 19 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 90070af48..7fd10b4b6 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -570,8 +570,9 @@ class App():
if not self.stream.terminated:
if element_job:
# look-up queue
+ # Issue with pickling a queue object, so for now only pass action names
for q in self.stream.queues:
- if q.action_name == action_name:
+ if q == action_name:
queue = q
assert queue, "Job action {} does not have a corresponding queue".format(action_name)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 664986534..44ebeef5f 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
+import queue
# Local imports
from .resources import Resources, ResourceType
@@ -55,6 +56,7 @@ class NotificationType(enum.Enum):
EXCEPTION = "exception"
TASK_ERROR = "task_error"
SCHED_TERMINATE = "sched_terminate"
+ QUEUES = "queues"
class Notification:
@@ -70,7 +72,8 @@ class Notification:
element=None,
exception=None,
domain=None,
- reason=None):
+ reason=None,
+ queues=None):
self.notification_type = notification_type
self.full_name = full_name
@@ -82,6 +85,7 @@ class Notification:
self.exception = exception
self.domain = domain
self.reason = reason
+ self.queues = queues
# Scheduler()
@@ -175,6 +179,14 @@ class Scheduler():
# Hold on to the queues to process
self.queues = queues
+ # Report to the main process which queues are in session,
+ # for now a list of action_names as pickling queues is
+ # causing errors. Will need actual queue object or bidirectional
+ # notification queue for error handling later.
+ queue_list = [q.action_name for q in self.queues]
+ notifcation = Notification(NotificationType.QUEUES, queues=queue_list)
+ self._notify(notifcation)
+
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
self.loop = asyncio.new_event_loop()
@@ -294,20 +306,17 @@ class Scheduler():
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# status (JobStatus): The status of the completed job
- # process_jobs (bool): If the scheduler should also process the
- # job, else just generate the notification
#
- def job_completed(self, job, status, process_jobs=True):
+ def job_completed(self, job, status):
- if process_jobs:
- # Remove from the active jobs list
- self._active_jobs.remove(job)
+ self._active_jobs.remove(job)
+ element = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance if interactive failure handling. Note
# this may change if the frontend is run in a separate process for pickling
- element = job._element if (job.element_job and self._interactive_failure) else None
+ element = job._element if (job.element_job and self._interactive_failure) else element
notification = Notification(NotificationType.JOB_COMPLETE,
full_name=job.name,
@@ -317,9 +326,7 @@ class Scheduler():
element=element)
self._notify(notification)
- if process_jobs:
- # Now check for more jobs
- self._sched()
+ self._sched()
# check_cache_size():
#
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4de975e9b..e0f0842af 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,7 +38,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus, Notification
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -114,7 +114,6 @@ class Stream():
try:
func(*args, **kwargs)
except Exception as e:
- from ._scheduler.scheduler import Notification, NotificationType
queue.put(Notification(NotificationType.EXCEPTION, exception=e))
def run_in_subprocess(self, func, *args, **kwargs):
@@ -367,6 +366,7 @@ class Stream():
if track_elements:
self._enqueue_plan(track_elements, queue=track_queue)
self._enqueue_plan(elements)
+
self._run()
# fetch()
@@ -1646,15 +1646,14 @@ class Stream():
elif notification.notification_type == NotificationType.JOB_COMPLETE:
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
- if notification.failed_element:
- unique_id = notification.full_name
- else:
- unique_id = None
- self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ self._state.fail_task(notification.job_action, notification.full_name,
+ notification.failed_element, notification.element)
elif notification.notification_type == NotificationType.EXCEPTION:
raise notification.exception
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(notification.domain, notification.reason)
+ elif notification.notification_type == NotificationType.QUEUES:
+ self.queues = notification.queues
else:
raise StreamError("Unreccognised notification type recieved")