summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Ipsum <richard.ipsum@codethink.co.uk>2014-04-24 16:25:12 +0000
committerRichard Ipsum <richard.ipsum@codethink.co.uk>2014-04-24 16:25:12 +0000
commit2c682e6192b86eccaa869c23d82cde81b4ea42d4 (patch)
tree92b4710b4903c21a31e4cacc38130d4b73d50429
parentb8f2100e3f7fbff990d4f28c71f9c7a178b5d8f0 (diff)
parent4c3c32ed36388137897bf8eade030991a18092b3 (diff)
downloadmorph-2c682e6192b86eccaa869c23d82cde81b4ea42d4.tar.gz
Merge branch 'baserock/richardipsum/distbuild_jobs'
Reviewed by: Sam Thursfield Richard Maw Lars Wirzenius
-rw-r--r--distbuild/__init__.py5
-rw-r--r--distbuild/build_controller.py55
-rw-r--r--distbuild/initiator.py7
-rw-r--r--distbuild/initiator_connection.py22
-rw-r--r--distbuild/protocol.py5
-rw-r--r--distbuild/worker_build_scheduler.py271
6 files changed, 262 insertions, 103 deletions
diff --git a/distbuild/__init__.py b/distbuild/__init__.py
index 57aaccaf..7274f6a9 100644
--- a/distbuild/__init__.py
+++ b/distbuild/__init__.py
@@ -44,11 +44,14 @@ from worker_build_scheduler import (WorkerBuildQueuer,
WorkerCancelPending,
WorkerBuildOutput,
WorkerBuildCaching,
+ WorkerBuildStepAlreadyStarted,
+ WorkerBuildWaiting,
WorkerBuildFinished,
WorkerBuildFailed,
WorkerBuildStepStarted)
from build_controller import (BuildController, BuildFailed, BuildProgress,
- BuildSteps, BuildStepStarted, BuildOutput,
+ BuildSteps, BuildStepStarted,
+ BuildStepAlreadyStarted, BuildOutput,
BuildStepFinished, BuildStepFailed,
BuildFinished, BuildCancel,
build_step_name, map_build_graph)
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py
index 1f91cd79..a3911586 100644
--- a/distbuild/build_controller.py
+++ b/distbuild/build_controller.py
@@ -93,7 +93,12 @@ class BuildStepStarted(object):
self.id = request_id
self.step_name = step_name
self.worker_name = worker_name
-
+
+class BuildStepAlreadyStarted(BuildStepStarted):
+
+ def __init__(self, request_id, step_name, worker_name):
+ super(BuildStepAlreadyStarted, self).__init__(
+ request_id, step_name, worker_name)
class BuildOutput(object):
@@ -206,6 +211,7 @@ class BuildController(distbuild.StateMachine):
# specific to WorkerConnection instances that our currently
# building for us, but the state machines are not intended to
# behave that way).
+
('building', distbuild.WorkerConnection,
distbuild.WorkerBuildStepStarted, 'building',
self._maybe_relay_build_step_started),
@@ -216,6 +222,12 @@ class BuildController(distbuild.StateMachine):
distbuild.WorkerBuildCaching, 'building',
self._maybe_relay_build_caching),
('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildStepAlreadyStarted, 'building',
+ self._maybe_relay_build_step_already_started),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildWaiting, 'building',
+ self._maybe_relay_build_waiting_for_worker),
+ ('building', distbuild.WorkerConnection,
distbuild.WorkerBuildFinished, 'building',
self._maybe_check_result_and_queue_more_builds),
('building', distbuild.WorkerConnection,
@@ -432,13 +444,29 @@ class BuildController(distbuild.StateMachine):
cancel = BuildCancel(disconnect.id)
self.mainloop.queue_event(BuildController, cancel)
+ def _maybe_relay_build_waiting_for_worker(self, event_source, event):
+ if event.initiator_id != self._request['id']:
+ return # not for us
+
+ artifact = self._find_artifact(event.artifact_cache_key)
+ if artifact is None:
+ # This is not the event you are looking for.
+ return
+
+ progress = BuildProgress(
+ self._request['id'],
+ 'Ready to build %s: waiting for a worker to become available'
+ % artifact.name)
+ self.mainloop.queue_event(BuildController, progress)
+
def _maybe_relay_build_step_started(self, event_source, event):
distbuild.crash_point()
- if event.initiator_id != self._request['id']:
+ if self._request['id'] not in event.initiators:
return # not for us
logging.debug(
'BC: _relay_build_step_started: %s' % event.artifact_cache_key)
+
artifact = self._find_artifact(event.artifact_cache_key)
if artifact is None:
# This is not the event you are looking for.
@@ -450,9 +478,21 @@ class BuildController(distbuild.StateMachine):
self.mainloop.queue_event(BuildController, started)
logging.debug('BC: emitted %s' % repr(started))
+ def _maybe_relay_build_step_already_started(self, event_source, event):
+ if event.initiator_id != self._request['id']:
+ return # not for us
+
+ artifact = self._find_artifact(event.artifact_cache_key)
+
+ logging.debug('BC: got build step already started: %s' % artifact.name)
+ started = BuildStepAlreadyStarted(
+ self._request['id'], build_step_name(artifact), event.worker_name)
+ self.mainloop.queue_event(BuildController, started)
+ logging.debug('BC: emitted %s' % repr(started))
+
def _maybe_relay_build_output(self, event_source, event):
distbuild.crash_point()
- if event.msg['id'] != self._request['id']:
+ if self._request['id'] not in event.msg['ids']:
return # not for us
logging.debug('BC: got output: %s' % repr(event.msg))
@@ -470,7 +510,8 @@ class BuildController(distbuild.StateMachine):
def _maybe_relay_build_caching(self, event_source, event):
distbuild.crash_point()
- if event.initiator_id != self._request['id']:
+
+ if self._request['id'] not in event.initiators:
return # not for us
artifact = self._find_artifact(event.artifact_cache_key)
@@ -493,7 +534,7 @@ class BuildController(distbuild.StateMachine):
def _maybe_check_result_and_queue_more_builds(self, event_source, event):
distbuild.crash_point()
- if event.msg['id'] != self._request['id']:
+ if self._request['id'] not in event.msg['ids']:
return # not for us
artifact = self._find_artifact(event.artifact_cache_key)
@@ -534,8 +575,8 @@ class BuildController(distbuild.StateMachine):
def _maybe_notify_build_failed(self, event_source, event):
distbuild.crash_point()
- if event.msg['id'] != self._request['id']:
- return
+ if self._request['id'] not in event.msg['ids']:
+ return # not for us
artifact = self._find_artifact(event.artifact_cache_key)
diff --git a/distbuild/initiator.py b/distbuild/initiator.py
index 6e4ca65a..67f37764 100644
--- a/distbuild/initiator.py
+++ b/distbuild/initiator.py
@@ -94,6 +94,7 @@ class Initiator(distbuild.StateMachine):
'build-progress': self._handle_build_progress_message,
'build-steps': self._handle_build_steps_message,
'step-started': self._handle_step_started_message,
+ 'step-already-started': self._handle_step_already_started_message,
'step-output': self._handle_step_output_message,
'step-finished': self._handle_step_finished_message,
'step-failed': self._handle_step_failed_message,
@@ -127,6 +128,12 @@ class Initiator(distbuild.StateMachine):
self._step_outputs[msg['step_name']].close()
del self._step_outputs[msg['step_name']]
+ def _handle_step_already_started_message(self, msg):
+ self._app.status(
+ msg='%s is already building on %s' % (msg['step_name'],
+ msg['worker_name']))
+ self._open_output(msg)
+
def _handle_step_started_message(self, msg):
self._app.status(
msg='Started building %(step_name)s on %(worker_name)s',
diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py
index 34f2bdaa..b3769d7c 100644
--- a/distbuild/initiator_connection.py
+++ b/distbuild/initiator_connection.py
@@ -81,6 +81,9 @@ class InitiatorConnection(distbuild.StateMachine):
'idle', self._send_build_steps_message),
('idle', distbuild.BuildController, distbuild.BuildStepStarted,
'idle', self._send_build_step_started_message),
+ ('idle', distbuild.BuildController,
+ distbuild.BuildStepAlreadyStarted, 'idle',
+ self._send_build_step_already_started_message),
('idle', distbuild.BuildController, distbuild.BuildOutput,
'idle', self._send_build_output_message),
('idle', distbuild.BuildController, distbuild.BuildStepFinished,
@@ -113,7 +116,6 @@ class InitiatorConnection(distbuild.StateMachine):
self.initiator_name, str(id))
self.mainloop.queue_event(InitiatorConnection,
InitiatorDisconnect(id))
- # TODO should this clear our_ids?
self.mainloop.queue_event(self, _Close(event_source))
def _close(self, event_source, event):
@@ -183,6 +185,9 @@ class InitiatorConnection(distbuild.StateMachine):
self._log_send(msg)
def _send_build_step_started_message(self, event_source, event):
+ logging.debug('InitiatorConnection: build_step_started: '
+ 'id=%s step_name=%s worker_name=%s' %
+ (event.id, event.step_name, event.worker_name))
if event.id in self.our_ids:
msg = distbuild.message('step-started',
id=self._route_map.get_incoming_id(event.id),
@@ -191,6 +196,19 @@ class InitiatorConnection(distbuild.StateMachine):
self.jm.send(msg)
self._log_send(msg)
+ def _send_build_step_already_started_message(self, event_source, event):
+ logging.debug('InitiatorConnection: build_step_already_started: '
+ 'id=%s step_name=%s worker_name=%s' % (event.id, event.step_name,
+ event.worker_name))
+
+ if event.id in self.our_ids:
+ msg = distbuild.message('step-already-started',
+ id=self._route_map.get_incoming_id(event.id),
+ step_name=event.step_name,
+ worker_name=event.worker_name)
+ self.jm.send(msg)
+ self._log_send(msg)
+
def _send_build_output_message(self, event_source, event):
logging.debug('InitiatorConnection: build_output: '
'id=%s stdout=%s stderr=%s' %
@@ -205,6 +223,8 @@ class InitiatorConnection(distbuild.StateMachine):
self._log_send(msg)
def _send_build_step_finished_message(self, event_source, event):
+ logging.debug('heard built step finished: event.id: %s our_ids: %s'
+ % (str(event.id), str(self.our_ids)))
if event.id in self.our_ids:
msg = distbuild.message('step-finished',
id=self._route_map.get_incoming_id(event.id),
diff --git a/distbuild/protocol.py b/distbuild/protocol.py
index 9a4c362e..561201bb 100644
--- a/distbuild/protocol.py
+++ b/distbuild/protocol.py
@@ -39,6 +39,11 @@ _types = {
'step_name',
'worker_name',
],
+ 'step-already-started': [
+ 'id',
+ 'step_name',
+ 'worker_name',
+ ],
'step-output': [
'id',
'step_name',
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index fc5849b3..83873d74 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -33,20 +33,30 @@ class WorkerBuildRequest(object):
self.artifact = artifact
self.initiator_id = initiator_id
-
class WorkerCancelPending(object):
def __init__(self, initiator_id):
self.initiator_id = initiator_id
-
class WorkerBuildStepStarted(object):
+ def __init__(self, initiators, cache_key, worker_name):
+ self.initiators = initiators
+ self.artifact_cache_key = cache_key
+ self.worker_name = worker_name
+
+class WorkerBuildStepAlreadyStarted(object):
+
def __init__(self, initiator_id, cache_key, worker_name):
self.initiator_id = initiator_id
self.artifact_cache_key = cache_key
self.worker_name = worker_name
+class WorkerBuildWaiting(object):
+
+ def __init__(self, initiator_id, cache_key):
+ self.initiator_id = initiator_id
+ self.artifact_cache_key = cache_key
class WorkerBuildOutput(object):
@@ -54,21 +64,18 @@ class WorkerBuildOutput(object):
self.msg = msg
self.artifact_cache_key = cache_key
-
class WorkerBuildCaching(object):
- def __init__(self, initiator_id, cache_key):
- self.initiator_id = initiator_id
+ def __init__(self, initiators, cache_key):
+ self.initiators = initiators
self.artifact_cache_key = cache_key
-
class WorkerBuildFinished(object):
def __init__(self, msg, cache_key):
self.msg = msg
self.artifact_cache_key = cache_key
-
class WorkerBuildFailed(object):
def __init__(self, msg, cache_key):
@@ -84,21 +91,60 @@ class _NeedJob(object):
class _HaveAJob(object):
- def __init__(self, artifact, initiator_id):
+ def __init__(self, job):
+ self.job = job
+
+class Job(object):
+
+ def __init__(self, job_id, artifact, initiator_id):
+ self.id = job_id
self.artifact = artifact
- self.initiator_id = initiator_id
-
-
-class _JobIsFinished(object):
+ self.initiators = [initiator_id]
+ self.who = None # we don't know who's going to do this yet
+ self.is_building = False
- def __init__(self, msg):
- self.msg = msg
-
+ def add_initiator(self, initiator_id):
+ self.initiators.append(initiator_id)
+
+class Jobs(object):
+
+ def __init__(self, idgen):
+ self._idgen = idgen
+ self._jobs = {}
+
+ def get(self, artifact_basename):
+ return (self._jobs[artifact_basename]
+ if artifact_basename in self._jobs else None)
+
+ def create(self, artifact, initiator_id):
+ job = Job(self._idgen.next(), artifact, initiator_id)
+ self._jobs[job.artifact.basename()] = job
+ return job
+
+ def remove(self, job):
+ del self._jobs[job.artifact.basename()]
+
+ def exists(self, artifact_basename):
+ return artifact_basename in self._jobs
+
+ def get_next_job(self):
+ # for now just return the first thing we find that's not being built
+ waiting = [job for (_, job) in
+ self._jobs.iteritems() if job.who == None]
+
+ return waiting.pop() if len(waiting) > 0 else None
+
+ def __repr__(self):
+ return str([job.artifact.basename()
+ for (_, job) in self._jobs.iteritems()])
-class _JobFailed(object):
+class _BuildFinished(object):
+
+ pass
+
+class _BuildFailed(object):
pass
-
class _Cached(object):
@@ -123,8 +169,9 @@ class WorkerBuildQueuer(distbuild.StateMachine):
distbuild.crash_point()
logging.debug('WBQ: Setting up %s' % self)
- self._request_queue = []
self._available_workers = []
+ self._jobs = Jobs(
+ distbuild.IdentifierGenerator('WorkerBuildQueuerJob'))
spec = [
# state, source, event_class, new_state, callback
@@ -136,45 +183,86 @@ class WorkerBuildQueuer(distbuild.StateMachine):
]
self.add_transitions(spec)
+
+
def _handle_request(self, event_source, event):
distbuild.crash_point()
- logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name)
- self._request_queue.append(event)
- logging.debug(
- 'WBQ: %d available workers and %d requests queued' %
- (len(self._available_workers),
- len(self._request_queue)))
- if self._available_workers:
- self._give_job()
+ logging.debug('Handling build request for %s' % event.initiator_id)
+ logging.debug('Current jobs: %s' % self._jobs)
+ logging.debug('Workers available: %d' % len(self._available_workers))
+
+ # Have we already made a job for this thing?
+ # If so, add our initiator id to the existing job
+ # If not, create a job
+
+ if self._jobs.exists(event.artifact.basename()):
+ job = self._jobs.get(event.artifact.basename())
+ job.initiators.append(event.initiator_id)
+
+ if job.is_building:
+ logging.debug('Worker build step already started: %s' %
+ event.artifact.basename())
+ progress = WorkerBuildStepAlreadyStarted(event.initiator_id,
+ event.artifact.cache_key, job.who.name())
+ else:
+ logging.debug('Job created but not building yet '
+ '(waiting for a worker to become available): %s' %
+ event.artifact.basename())
+ progress = WorkerBuildWaiting(event.initiator_id,
+ event.artifact.cache_key)
+
+ self.mainloop.queue_event(WorkerConnection, progress)
+ else:
+ logging.debug('WBQ: Creating job for: %s' % event.artifact.name)
+ job = self._jobs.create(event.artifact, event.initiator_id)
+
+ if self._available_workers:
+ self._give_job(job)
+ else:
+ progress = WorkerBuildWaiting(event.initiator_id,
+ event.artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, progress)
def _handle_cancel(self, event_source, worker_cancel_pending):
- for request in [r for r in self._request_queue if
- r.initiator_id == worker_cancel_pending.initiator_id]:
- logging.debug('WBQ: Removing request from queue: %s',
- request.artifact.name)
- self._request_queue.remove(request)
+ # TODO: this probably needs to check whether any initiators
+ # care about this thing
+
+ pass
def _handle_worker(self, event_source, event):
distbuild.crash_point()
+ who = event.who
+ last_job = who.job() # the job this worker's just completed
+
+ if last_job:
+ logging.debug('%s wants new job, just did %s' %
+ (who.name(), last_job.artifact.basename()))
+
+ self._jobs.remove(last_job)
+ else:
+ logging.debug('%s wants its first job' % who.name())
+
logging.debug('WBQ: Adding worker to queue: %s' % event.who)
self._available_workers.append(event)
- logging.debug(
- 'WBQ: %d available workers and %d requests queued' %
- (len(self._available_workers),
- len(self._request_queue)))
- if self._request_queue:
- self._give_job()
+ logging.debug('Current jobs: %s' % self._jobs)
+ logging.debug('Workers available: %d' % len(self._available_workers))
+
+ job = self._jobs.get_next_job()
+
+ if job:
+ self._give_job(job)
- def _give_job(self):
- request = self._request_queue.pop(0)
+ def _give_job(self, job):
worker = self._available_workers.pop(0)
+ job.who = worker.who
+
logging.debug(
'WBQ: Giving %s to %s' %
- (request.artifact.name, worker.who.name()))
- self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact,
- request.initiator_id))
+ (job.artifact.name, worker.who.name()))
+
+ self.mainloop.queue_event(worker.who, _HaveAJob(job))
class WorkerConnection(distbuild.StateMachine):
@@ -194,6 +282,9 @@ class WorkerConnection(distbuild.StateMachine):
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
self._helper_id = None
+ self._job = None
+ self._exec_response_msg = None
+ self._debug_json = False
addr, port = self._conn.getpeername()
name = socket.getfqdn(addr)
@@ -202,6 +293,9 @@ class WorkerConnection(distbuild.StateMachine):
def name(self):
return self._worker_name
+ def job(self):
+ return self._job
+
def setup(self):
distbuild.crash_point()
@@ -221,14 +315,14 @@ class WorkerConnection(distbuild.StateMachine):
('building', self._jm, distbuild.JsonEof, None, self._reconnect),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
- ('building', self, _JobFailed, 'idle', self._request_job),
- ('building', self, _JobIsFinished, 'caching',
+ ('building', self, _BuildFailed, 'idle', self._request_job),
+ ('building', self, _BuildFinished, 'caching',
self._request_caching),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
- ('caching', self, _JobFailed, 'idle', self._request_job),
+ ('caching', self, _BuildFailed, 'idle', self._request_job),
]
self.add_transitions(spec)
@@ -237,13 +331,8 @@ class WorkerConnection(distbuild.StateMachine):
def _maybe_cancel(self, event_source, build_cancel):
logging.debug('WC: BuildController %r requested a cancel' %
event_source)
- if build_cancel.id == self._initiator_id:
- distbuild.crash_point()
- for id in self._initiator_request_map[self._initiator_id]:
- logging.debug('WC: Cancelling exec %s' % id)
- msg = distbuild.message('exec-cancel', id=id)
- self._jm.send(msg)
+ # TODO: implement cancel
def _reconnect(self, event_source, event):
distbuild.crash_point()
@@ -254,31 +343,34 @@ class WorkerConnection(distbuild.StateMachine):
def _start_build(self, event_source, event):
distbuild.crash_point()
- self._artifact = event.artifact
- self._initiator_id = event.initiator_id
+ self._job = event.job
+ self._helper_id = None
+ self._exec_response_msg = None
+
logging.debug('WC: starting build: %s for %s' %
- (self._artifact.name, self._initiator_id))
+ (self._job.artifact.name, self._job.initiators))
argv = [
self._morph_instance,
'worker-build',
- self._artifact.name,
+ self._job.artifact.name,
]
msg = distbuild.message('exec-request',
- id=self._request_ids.next(),
+ id=self._job.id,
argv=argv,
- stdin_contents=distbuild.serialise_artifact(self._artifact),
+ stdin_contents=distbuild.serialise_artifact(self._job.artifact),
)
self._jm.send(msg)
- logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg))
- self._route_map.add(self._initiator_id, msg['id'])
- self._initiator_request_map[self._initiator_id].add(msg['id'])
- logging.debug(
- 'WC: route map from %s to %s',
- self._artifact.cache_key, msg['id'])
- started = WorkerBuildStepStarted(
- self._initiator_id, self._artifact.cache_key, self.name())
+ if self._debug_json:
+ logging.debug('WC: sent to worker %s: %r'
+ % (self._worker_name, msg))
+
+ started = WorkerBuildStepStarted(self._job.initiators,
+ self._job.artifact.cache_key, self.name())
+
+ self._job.is_building = True
+
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -299,30 +391,29 @@ class WorkerConnection(distbuild.StateMachine):
def _handle_exec_output(self, msg):
new = dict(msg)
- new['id'] = self._route_map.get_incoming_id(msg['id'])
+ new['ids'] = self._job.initiators
logging.debug('WC: emitting: %s', repr(new))
self.mainloop.queue_event(
WorkerConnection,
- WorkerBuildOutput(new, self._artifact.cache_key))
+ WorkerBuildOutput(new, self._job.artifact.cache_key))
def _handle_exec_response(self, msg):
- logging.debug('WC: finished building: %s' % self._artifact.name)
+ logging.debug('WC: finished building: %s' % self._job.artifact.name)
+ logging.debug('initiators that need to know: %s'
+ % self._job.initiators)
new = dict(msg)
- new['id'] = self._route_map.get_incoming_id(msg['id'])
- self._route_map.remove(msg['id'])
- self._initiator_request_map[self._initiator_id].remove(msg['id'])
+ new['ids'] = self._job.initiators
if new['exit'] != 0:
# Build failed.
- new_event = WorkerBuildFailed(new, self._artifact.cache_key)
+ new_event = WorkerBuildFailed(new, self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(self, _JobFailed())
- self._artifact = None
- self._initiator_id = None
+ self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
- self.mainloop.queue_event(self, _JobIsFinished(new))
+ self.mainloop.queue_event(self, _BuildFinished())
+ self._exec_response_msg = new
def _request_job(self, event_source, event):
distbuild.crash_point()
@@ -333,14 +424,14 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- kind = self._artifact.source.morphology['kind']
+ kind = self._job.artifact.source.morphology['kind']
if kind == 'chunk':
- source_artifacts = self._artifact.source.artifacts
+ source_artifacts = self._job.artifact.source.artifacts
suffixes = ['%s.%s' % (kind, name) for name in source_artifacts]
else:
- filename = '%s.%s' % (kind, self._artifact.name)
+ filename = '%s.%s' % (kind, self._job.artifact.name)
suffixes = [filename]
if kind == 'stratum':
@@ -360,7 +451,7 @@ class WorkerConnection(distbuild.StateMachine):
'/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
(urllib.quote(worker_host),
self._worker_cache_server_port,
- urllib.quote(self._artifact.cache_key),
+ urllib.quote(self._job.artifact.cache_key),
suffixes))
msg = distbuild.message(
@@ -370,12 +461,9 @@ class WorkerConnection(distbuild.StateMachine):
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
- progress = WorkerBuildCaching(
- self._initiator_id, self._artifact.cache_key)
+ progress = WorkerBuildCaching(self._job.initiators,
+ self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
-
- self._initiator_id = None
- self._finished_msg = event.msg
def _maybe_handle_helper_result(self, event_source, event):
if event.msg['id'] == self._helper_id:
@@ -384,21 +472,16 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('caching: event.msg: %s' % repr(event.msg))
if event.msg['status'] == httplib.OK:
logging.debug('Shared artifact cache population done')
+
new_event = WorkerBuildFinished(
- self._finished_msg, self._artifact.cache_key)
+ self._exec_response_msg, self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self._finished_msg = None
- self._helper_id = None
self.mainloop.queue_event(self, _Cached())
else:
logging.error(
'Failed to populate artifact cache: %s %s' %
(event.msg['status'], event.msg['body']))
new_event = WorkerBuildFailed(
- self._finished_msg, self._artifact.cache_key)
+ self._exec_response_msg, self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self._finished_msg = None
- self._helper_id = None
- self.mainloop.queue_event(self, _JobFailed())
-
- self._artifact = None
+ self.mainloop.queue_event(self, _BuildFailed())