author     Adam Coldrick <adam.coldrick@codethink.co.uk>  2015-03-19 09:34:58 +0000
committer  Morph (on behalf of Adam Coldrick) <adam.coldrick@codethink.co.uk>  2015-03-19 09:34:58 +0000
commit     7db4ee53fb5398dd8f4ae8f56778735fe6531178 (patch)
tree       01513d77326acd03b2da356ec2cd7f4761901b6b /distbuild/worker_build_scheduler.py
parent     211d6317d22bace089da58875d280ae5e54d5d54 (diff)
download   morph-7db4ee53fb5398dd8f4ae8f56778735fe6531178.tar.gz
Morph build 2ee8190abe87461992f5b7ed85fe2ee9
System branch: master
Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r--  distbuild/worker_build_scheduler.py | 190
1 file changed, 123 insertions, 67 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index be732153..d00b0290 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -1,6 +1,6 @@
# distbuild/worker_build_scheduler.py -- schedule worker-builds on workers
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -12,8 +12,7 @@
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+# with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
@@ -93,6 +92,12 @@ class _HaveAJob(object):
def __init__(self, job):
self.job = job
+class _Disconnected(object):
+
+ def __init__(self, who):
+ self.who = who
+
+
class Job(object):
def __init__(self, job_id, artifact, initiator_id):
@@ -220,7 +225,10 @@ class WorkerBuildQueuer(distbuild.StateMachine):
('idle', WorkerConnection, _JobFinished, 'idle',
self._set_job_finished),
('idle', WorkerConnection, _JobFailed, 'idle',
- self._set_job_failed)
+ self._set_job_failed),
+
+ ('idle', WorkerConnection, _Disconnected, 'idle',
+ self._handle_worker_disconnected),
]
self.add_transitions(spec)
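
The new transition registers _Disconnected events from WorkerConnection so the queuer can react when a worker drops. Each entry in the spec is a (state, source, event_class, new_state, callback) tuple, and callbacks take the event source plus the event itself. A toy sketch of that pairing, using made-up names rather than the real distbuild.StateMachine machinery:

class _Disconnected(object):
    # Mirrors the event class added above: it only records which connection went away.
    def __init__(self, who):
        self.who = who

class ToyQueuer(object):
    def setup(self):
        # state, source, event_class, new_state, callback
        self.spec = [
            ('idle', 'WorkerConnection', _Disconnected, 'idle',
             self._handle_worker_disconnected),
        ]

    def _handle_worker_disconnected(self, event_source, event):
        # Callbacks receive the registered source and the event instance.
        print('worker %r disconnected' % event.who)
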
@@ -262,13 +270,13 @@ class WorkerBuildQueuer(distbuild.StateMachine):
logging.debug('Worker build step already started: %s' %
event.artifact.basename())
progress = WorkerBuildStepAlreadyStarted(event.initiator_id,
- event.artifact.source.cache_key, job.who.name())
+ event.artifact.cache_key, job.who.name())
else:
logging.debug('Job created but not building yet '
'(waiting for a worker to become available): %s' %
event.artifact.basename())
progress = WorkerBuildWaiting(event.initiator_id,
- event.artifact.source.cache_key)
+ event.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
else:
@@ -279,7 +287,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
self._give_job(job)
else:
progress = WorkerBuildWaiting(event.initiator_id,
- event.artifact.source.cache_key)
+ event.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
def _handle_cancel(self, event_source, event):
@@ -323,7 +331,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
distbuild.crash_point()
who = event.who
- last_job = who.job() # the job this worker's just completed
+ last_job = who.current_job() # the job this worker's just completed
if last_job:
logging.debug('%s wants new job, just did %s',
@@ -355,8 +363,22 @@ class WorkerBuildQueuer(distbuild.StateMachine):
(job.artifact.name, worker.who.name()))
self.mainloop.queue_event(worker.who, _HaveAJob(job))
-
-
+
+    def _handle_worker_disconnected(self, event_source, event):
+        self._remove_worker(event.who)
+
+ def _remove_worker(self, worker):
+ logging.debug('WBQ: Removing worker %s from queue', worker.name())
+
+        # There should only be one entry in _available_workers per worker
+        # connection. But anything can happen in space! So we take care to
+        # remove every entry that came from the disconnected worker, not
+        # just the first one.
+ self._available_workers = filter(
+ lambda worker_msg: worker_msg.who != worker,
+ self._available_workers)
+
+
class WorkerConnection(distbuild.StateMachine):
'''Communicate with a single worker.'''
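
The comment in _remove_worker above states the key safety property of the new disconnect handling: every queued request from the dead connection is dropped, not just the first match. A minimal standalone sketch of that filter, using a hypothetical request object rather than the real distbuild message classes:

class _WorkerIsFree(object):
    # Hypothetical stand-in for whatever request object sits in _available_workers.
    def __init__(self, who):
        self.who = who

def remove_worker(available_workers, worker):
    # Keep only requests from other workers; any duplicates from the
    # disconnected worker are removed too, not just the first occurrence.
    return [msg for msg in available_workers if msg.who is not worker]
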
@@ -372,9 +394,12 @@ class WorkerConnection(distbuild.StateMachine):
self._writeable_cache_server = writeable_cache_server
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
- self._helper_id = None
- self._job = None
- self._exec_response_msg = None
+
+ self._active_jobs = dict()
+ self._current_job = None
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None
+
self._debug_json = False
addr, port = self._conn.getpeername()
@@ -384,8 +409,8 @@ class WorkerConnection(distbuild.StateMachine):
def name(self):
return self._worker_name
- def job(self):
- return self._job
+ def current_job(self):
+ return self._current_job
def setup(self):
distbuild.crash_point()
@@ -397,14 +422,15 @@ class WorkerConnection(distbuild.StateMachine):
spec = [
# state, source, event_class, new_state, callback
- ('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('idle', self._jm, distbuild.JsonEof, None, self._disconnected),
('idle', self, _HaveAJob, 'building', self._start_build),
('building', distbuild.BuildController,
distbuild.BuildCancel, 'building',
self._maybe_cancel),
- ('building', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('building', self._jm, distbuild.JsonEof, None,
+ self._disconnected),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
('building', self, _BuildFailed, 'idle', self._request_job),
@@ -412,6 +438,7 @@ class WorkerConnection(distbuild.StateMachine):
('building', self, _BuildFinished, 'caching',
self._request_caching),
+ ('caching', self._jm, distbuild.JsonEof, None, self._disconnected),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
@@ -423,58 +450,69 @@ class WorkerConnection(distbuild.StateMachine):
def _maybe_cancel(self, event_source, build_cancel):
- if build_cancel.id not in self._job.initiators:
+ if build_cancel.id not in self._current_job.initiators:
return # event not relevant
logging.debug('WC: BuildController %r requested a cancel',
event_source)
- if (len(self._job.initiators) == 1):
+ job = self._current_job
+ if (len(job.initiators) == 1):
logging.debug('WC: Cancelling running job %s '
'with job id %s running on %s',
- self._job.artifact.basename(),
- self._job.id,
+ job.artifact.basename(), job.id,
self.name())
- msg = distbuild.message('exec-cancel', id=self._job.id)
+ msg = distbuild.message('exec-cancel', id=job.id)
self._jm.send(msg)
self.mainloop.queue_event(self, _BuildCancelled())
else:
logging.debug('WC: Not cancelling running job %s with job id %s, '
'other initiators want it done: %s',
- self._job.artifact.basename(),
- self._job.id,
- [i for i in self._job.initiators
- if i != build_cancel.id])
+ job.artifact.basename(),
+ job.id,
+ [i for i in job.initiators if i != build_cancel.id])
- self._job.initiators.remove(build_cancel.id)
+ job.initiators.remove(build_cancel.id)
- def _reconnect(self, event_source, event):
+ def _disconnected(self, event_source, event):
distbuild.crash_point()
- logging.debug('WC: Triggering reconnect')
+ logging.debug('WC: Disconnected from worker %s' % self.name())
+        self.mainloop.queue_event(WorkerConnection, _Disconnected(self))
+
self.mainloop.queue_event(self._cm, distbuild.Reconnect())
def _start_build(self, event_source, event):
distbuild.crash_point()
- self._job = event.job
- self._helper_id = None
- self._exec_response_msg = None
+ job = event.job
+
+ if job.id in self._active_jobs:
+ logging.warn('Duplicate job %s for worker %s', job.id, self.name())
+
+ if self._current_job_exec_response or self._current_job_cache_request:
+ logging.warn('Caching not finished for %s', self._current_job.id)
+
+ self._active_jobs[job.id] = job
+ self._current_job = job
logging.debug('WC: starting build: %s for %s' %
- (self._job.artifact.name, self._job.initiators))
+ (job.artifact.name, job.initiators))
argv = [
self._morph_instance,
'worker-build',
'--build-log-on-stdout',
- self._job.artifact.name,
+ job.artifact.name,
]
+
msg = distbuild.message('exec-request',
- id=self._job.id,
+ id=job.id,
argv=argv,
- stdin_contents=distbuild.serialise_artifact(self._job.artifact),
+ stdin_contents=distbuild.serialise_artifact(job.artifact,
+ job.artifact.repo,
+ job.artifact.ref),
)
self._jm.send(msg)
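
The cancellation logic above only sends exec-cancel when the last interested initiator gives up; as long as other initiators still want the artifact, the cancelling initiator is simply dropped from the job. A rough sketch of that rule in isolation, with send_cancel standing in for sending the exec-cancel message:

def handle_cancel(job, cancelling_id, send_cancel):
    # Sketch only: 'job' needs an 'id' and an 'initiators' list; 'send_cancel'
    # stands in for sending the exec-cancel message to the worker.
    if cancelling_id not in job.initiators:
        return  # the cancel was for some other job
    if len(job.initiators) == 1:
        send_cancel(job.id)  # nobody else wants this build any more
    else:
        job.initiators.remove(cancelling_id)  # keep building for the others
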
@@ -482,10 +520,10 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('WC: sent to worker %s: %r'
% (self._worker_name, msg))
- started = WorkerBuildStepStarted(self._job.initiators,
- self._job.artifact.source.cache_key, self.name())
+ started = WorkerBuildStepStarted(job.initiators,
+ job.artifact.cache_key, self.name())
- self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -500,37 +538,50 @@ class WorkerConnection(distbuild.StateMachine):
'exec-output': self._handle_exec_output,
'exec-response': self._handle_exec_response,
}
-
+
handler = handlers[event.msg['type']]
- handler(event.msg)
+ job = self._active_jobs.get(event.msg['id'])
+
+ if job:
+ handler(event.msg, job)
+ else:
+ logging.warn('Received %s for unknown job %s',
+ event.msg['type'], event.msg['id'])
+
+ def _handle_exec_output(self, msg, job):
+ '''Handle output from a job that the worker is or was running.'''
- def _handle_exec_output(self, msg):
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
+
logging.debug('WC: emitting: %s', repr(new))
self.mainloop.queue_event(
WorkerConnection,
- WorkerBuildOutput(new, self._job.artifact.source.cache_key))
+ WorkerBuildOutput(new, job.artifact.cache_key))
- def _handle_exec_response(self, msg):
- logging.debug('WC: finished building: %s' % self._job.artifact.name)
- logging.debug('initiators that need to know: %s'
- % self._job.initiators)
+ def _handle_exec_response(self, msg, job):
+ '''Handle completion of a job that the worker is or was running.'''
+
+ logging.debug('WC: finished building: %s' % job.artifact.name)
+ logging.debug('initiators that need to know: %s' % job.initiators)
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
if new['exit'] != 0:
# Build failed.
- new_event = WorkerBuildFailed(new,
- self._job.artifact.source.cache_key)
+ new_event = WorkerBuildFailed(new, job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobFailed(job))
self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
self.mainloop.queue_event(self, _BuildFinished())
- self._exec_response_msg = new
+ self._current_job_exec_response = new
+
+ # The job is no longer considered active, because the worker is
+ # finished with it so we won't receive any more messages about it.
+ del self._active_jobs[job.id]
def _request_job(self, event_source, event):
distbuild.crash_point()
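
The rewritten _handle_json_message above no longer assumes a single job per worker: each incoming message carries a job id, and the handler is looked up by message type but invoked with the job that id maps to in _active_jobs. A condensed sketch of that dispatch, with the handler table left abstract:

import logging

def dispatch_worker_message(active_jobs, handlers, msg):
    # 'active_jobs' maps job id -> job; 'handlers' maps message type -> callable.
    # Messages about jobs we no longer track (e.g. already finished) are logged
    # and otherwise ignored.
    handler = handlers[msg['type']]
    job = active_jobs.get(msg['id'])
    if job is not None:
        handler(msg, job)
    else:
        logging.warning('Received %s for unknown job %s', msg['type'], msg['id'])
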
@@ -544,15 +595,16 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- kind = self._job.artifact.source.morphology['kind']
+ job = self._current_job
+ kind = job.artifact.kind
if kind == 'chunk':
- source_artifacts = self._job.artifact.source.artifacts
+ source_artifacts = job.artifact.source_artifacts
suffixes = ['%s.%s' % (kind, name) for name in source_artifacts]
suffixes.append('build-log')
else:
- filename = '%s.%s' % (kind, self._job.artifact.name)
+ filename = '%s.%s' % (kind, job.artifact.name)
suffixes = [filename]
if kind == 'stratum':
@@ -568,22 +620,22 @@ class WorkerConnection(distbuild.StateMachine):
'/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
(urllib.quote(worker_host),
self._worker_cache_server_port,
- urllib.quote(self._job.artifact.source.cache_key),
+ urllib.quote(job.artifact.cache_key),
suffixes))
msg = distbuild.message(
'http-request', id=self._request_ids.next(), url=url,
method='GET', body=None, headers=None)
- self._helper_id = msg['id']
+ self._current_job_cache_request = msg['id']
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
- progress = WorkerBuildCaching(self._job.initiators,
- self._job.artifact.source.cache_key)
+ progress = WorkerBuildCaching(job.initiators,
+ job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
def _maybe_handle_helper_result(self, event_source, event):
- if event.msg['id'] == self._helper_id:
+ if event.msg['id'] == self._current_job_cache_request:
distbuild.crash_point()
logging.debug('caching: event.msg: %s' % repr(event.msg))
@@ -591,8 +643,8 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Shared artifact cache population done')
new_event = WorkerBuildFinished(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _Cached())
else:
@@ -607,13 +659,17 @@ class WorkerConnection(distbuild.StateMachine):
# The BuildController will not try to cancel jobs that have
# been marked as failed.
self.mainloop.queue_event(WorkerConnection,
- _JobFailed(self._job))
+ _JobFailed(self._current_job))
new_event = WorkerBuildFailed(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _BuildFailed())
- self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job))
+ self.mainloop.queue_event(WorkerConnection,
+ _JobFinished(self._current_job))
+
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None
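
Taken together, the patch replaces the old _job / _helper_id / _exec_response_msg trio with a dictionary of active jobs plus a little "current job" state that only matters during the caching phase. A simplified model of that bookkeeping, written against plain Python objects rather than the real WorkerConnection state machine:

import logging

class WorkerJobState(object):
    '''Simplified model of the per-connection job bookkeeping in this patch.'''

    def __init__(self):
        self.active_jobs = {}                  # job id -> job, one per exec-request
        self.current_job = None                # the job started most recently
        self.current_job_exec_response = None  # held until caching finishes
        self.current_job_cache_request = None  # id of the outstanding cache fetch

    def start(self, job):
        if job.id in self.active_jobs:
            logging.warning('Duplicate job %s', job.id)
        self.active_jobs[job.id] = job
        self.current_job = job

    def finished_building(self, job, exec_response):
        # The worker is done with this job, so no more messages will mention it;
        # only the caching of its artifacts remains.
        self.current_job_exec_response = exec_response
        del self.active_jobs[job.id]

    def caching_done(self):
        self.current_job_exec_response = None
        self.current_job_cache_request = None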