diff options
author | Adam Coldrick <adam.coldrick@codethink.co.uk> | 2015-03-19 09:34:58 +0000 |
---|---|---|
committer | Morph (on behalf of Adam Coldrick) <adam.coldrick@codethink.co.uk> | 2015-03-19 09:34:58 +0000 |
commit | 7db4ee53fb5398dd8f4ae8f56778735fe6531178 (patch) | |
tree | 01513d77326acd03b2da356ec2cd7f4761901b6b /distbuild/worker_build_scheduler.py | |
parent | 211d6317d22bace089da58875d280ae5e54d5d54 (diff) | |
download | morph-7db4ee53fb5398dd8f4ae8f56778735fe6531178.tar.gz |
Morph build 2ee8190abe87461992f5b7ed85fe2ee9
System branch: master
Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r-- | distbuild/worker_build_scheduler.py | 190 |
1 files changed, 123 insertions, 67 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index be732153..d00b0290 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -1,6 +1,6 @@ # distbuild/worker_build_scheduler.py -- schedule worker-builds on workers # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -12,8 +12,7 @@ # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. +# with this program. If not, see <http://www.gnu.org/licenses/>. import collections @@ -93,6 +92,12 @@ class _HaveAJob(object): def __init__(self, job): self.job = job +class _Disconnected(object): + + def __init__(self, who): + self.who = who + + class Job(object): def __init__(self, job_id, artifact, initiator_id): @@ -220,7 +225,10 @@ class WorkerBuildQueuer(distbuild.StateMachine): ('idle', WorkerConnection, _JobFinished, 'idle', self._set_job_finished), ('idle', WorkerConnection, _JobFailed, 'idle', - self._set_job_failed) + self._set_job_failed), + + ('idle', WorkerConnection, _Disconnected, 'idle', + self._handle_worker_disconnected), ] self.add_transitions(spec) @@ -262,13 +270,13 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('Worker build step already started: %s' % event.artifact.basename()) progress = WorkerBuildStepAlreadyStarted(event.initiator_id, - event.artifact.source.cache_key, job.who.name()) + event.artifact.cache_key, job.who.name()) else: logging.debug('Job created but not building yet ' '(waiting for a worker to become available): %s' % event.artifact.basename()) progress = WorkerBuildWaiting(event.initiator_id, - event.artifact.source.cache_key) + event.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) else: @@ -279,7 +287,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._give_job(job) else: progress = WorkerBuildWaiting(event.initiator_id, - event.artifact.source.cache_key) + event.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) def _handle_cancel(self, event_source, event): @@ -323,7 +331,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): distbuild.crash_point() who = event.who - last_job = who.job() # the job this worker's just completed + last_job = who.current_job() # the job this worker's just completed if last_job: logging.debug('%s wants new job, just did %s', @@ -355,8 +363,22 @@ class WorkerBuildQueuer(distbuild.StateMachine): (job.artifact.name, worker.who.name())) self.mainloop.queue_event(worker.who, _HaveAJob(job)) - - + + def _handle_worker_disconnected(self, event): + self._remove_worker(self, event.who) + + def _remove_worker(self, worker): + logging.debug('WBQ: Removing worker %s from queue', worker.name()) + + # There should only be one InitiatorConnection instance per worker in + # the _available_workers list. But anything can happen in space! So we + # take care to remove all GiveJob messages in the list that came from + # the disconnected worker, not the first. + self._available_workers = filter( + lambda worker_msg: worker_msg.who != worker, + self._available_workers) + + class WorkerConnection(distbuild.StateMachine): '''Communicate with a single worker.''' @@ -372,9 +394,12 @@ class WorkerConnection(distbuild.StateMachine): self._writeable_cache_server = writeable_cache_server self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance - self._helper_id = None - self._job = None - self._exec_response_msg = None + + self._active_jobs = dict() + self._current_job = None + self._current_job_exec_response = None + self._current_job_cache_request = None + self._debug_json = False addr, port = self._conn.getpeername() @@ -384,8 +409,8 @@ class WorkerConnection(distbuild.StateMachine): def name(self): return self._worker_name - def job(self): - return self._job + def current_job(self): + return self._current_job def setup(self): distbuild.crash_point() @@ -397,14 +422,15 @@ class WorkerConnection(distbuild.StateMachine): spec = [ # state, source, event_class, new_state, callback - ('idle', self._jm, distbuild.JsonEof, None, self._reconnect), + ('idle', self._jm, distbuild.JsonEof, None, self._disconnected), ('idle', self, _HaveAJob, 'building', self._start_build), ('building', distbuild.BuildController, distbuild.BuildCancel, 'building', self._maybe_cancel), - ('building', self._jm, distbuild.JsonEof, None, self._reconnect), + ('building', self._jm, distbuild.JsonEof, None, + self._disconnected), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), ('building', self, _BuildFailed, 'idle', self._request_job), @@ -412,6 +438,7 @@ class WorkerConnection(distbuild.StateMachine): ('building', self, _BuildFinished, 'caching', self._request_caching), + ('caching', self._jm, distbuild.JsonEof, None, self._disconnected), ('caching', distbuild.HelperRouter, distbuild.HelperResult, 'caching', self._maybe_handle_helper_result), ('caching', self, _Cached, 'idle', self._request_job), @@ -423,58 +450,69 @@ class WorkerConnection(distbuild.StateMachine): def _maybe_cancel(self, event_source, build_cancel): - if build_cancel.id not in self._job.initiators: + if build_cancel.id not in self._current_job.initiators: return # event not relevant logging.debug('WC: BuildController %r requested a cancel', event_source) - if (len(self._job.initiators) == 1): + job = self._current_job + if (len(job.initiators) == 1): logging.debug('WC: Cancelling running job %s ' 'with job id %s running on %s', - self._job.artifact.basename(), - self._job.id, + job.artifact.basename(), job.id, self.name()) - msg = distbuild.message('exec-cancel', id=self._job.id) + msg = distbuild.message('exec-cancel', id=job.id) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) else: logging.debug('WC: Not cancelling running job %s with job id %s, ' 'other initiators want it done: %s', - self._job.artifact.basename(), - self._job.id, - [i for i in self._job.initiators - if i != build_cancel.id]) + job.artifact.basename(), + job.id, + [i for i in job.initiators if i != build_cancel.id]) - self._job.initiators.remove(build_cancel.id) + job.initiators.remove(build_cancel.id) - def _reconnect(self, event_source, event): + def _disconnected(self, event_source, event): distbuild.crash_point() - logging.debug('WC: Triggering reconnect') + logging.debug('WC: Disconnected from worker %s' % self.name()) + self.mainloop.queue_event(InitiatorConnection, _Disconnected(self)) + self.mainloop.queue_event(self._cm, distbuild.Reconnect()) def _start_build(self, event_source, event): distbuild.crash_point() - self._job = event.job - self._helper_id = None - self._exec_response_msg = None + job = event.job + + if job.id in self._active_jobs: + logging.warn('Duplicate job %s for worker %s', job.id, self.name()) + + if self._current_job_exec_response or self._current_job_cache_request: + logging.warn('Caching not finished for %s', self._current_job.id) + + self._active_jobs[job.id] = job + self._current_job = job logging.debug('WC: starting build: %s for %s' % - (self._job.artifact.name, self._job.initiators)) + (job.artifact.name, job.initiators)) argv = [ self._morph_instance, 'worker-build', '--build-log-on-stdout', - self._job.artifact.name, + job.artifact.name, ] + msg = distbuild.message('exec-request', - id=self._job.id, + id=job.id, argv=argv, - stdin_contents=distbuild.serialise_artifact(self._job.artifact), + stdin_contents=distbuild.serialise_artifact(job.artifact, + job.artifact.repo, + job.artifact.ref), ) self._jm.send(msg) @@ -482,10 +520,10 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg)) - started = WorkerBuildStepStarted(self._job.initiators, - self._job.artifact.source.cache_key, self.name()) + started = WorkerBuildStepStarted(job.initiators, + job.artifact.cache_key, self.name()) - self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobStarted(job)) self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -500,37 +538,50 @@ class WorkerConnection(distbuild.StateMachine): 'exec-output': self._handle_exec_output, 'exec-response': self._handle_exec_response, } - + handler = handlers[event.msg['type']] - handler(event.msg) + job = self._active_jobs.get(event.msg['id']) + + if job: + handler(event.msg, job) + else: + logging.warn('Received %s for unknown job %s', + event.msg['type'], event.msg['id']) + + def _handle_exec_output(self, msg, job): + '''Handle output from a job that the worker is or was running.''' - def _handle_exec_output(self, msg): new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators + logging.debug('WC: emitting: %s', repr(new)) self.mainloop.queue_event( WorkerConnection, - WorkerBuildOutput(new, self._job.artifact.source.cache_key)) + WorkerBuildOutput(new, job.artifact.cache_key)) - def _handle_exec_response(self, msg): - logging.debug('WC: finished building: %s' % self._job.artifact.name) - logging.debug('initiators that need to know: %s' - % self._job.initiators) + def _handle_exec_response(self, msg, job): + '''Handle completion of a job that the worker is or was running.''' + + logging.debug('WC: finished building: %s' % job.artifact.name) + logging.debug('initiators that need to know: %s' % job.initiators) new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators if new['exit'] != 0: # Build failed. - new_event = WorkerBuildFailed(new, - self._job.artifact.source.cache_key) + new_event = WorkerBuildFailed(new, job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobFailed(job)) self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. self.mainloop.queue_event(self, _BuildFinished()) - self._exec_response_msg = new + self._current_job_exec_response = new + + # The job is no longer considered active, because the worker is + # finished with it so we won't receive any more messages about it. + del self._active_jobs[job.id] def _request_job(self, event_source, event): distbuild.crash_point() @@ -544,15 +595,16 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') - kind = self._job.artifact.source.morphology['kind'] + job = self._current_job + kind = job.artifact.kind if kind == 'chunk': - source_artifacts = self._job.artifact.source.artifacts + source_artifacts = job.artifact.source_artifacts suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] suffixes.append('build-log') else: - filename = '%s.%s' % (kind, self._job.artifact.name) + filename = '%s.%s' % (kind, job.artifact.name) suffixes = [filename] if kind == 'stratum': @@ -568,22 +620,22 @@ class WorkerConnection(distbuild.StateMachine): '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % (urllib.quote(worker_host), self._worker_cache_server_port, - urllib.quote(self._job.artifact.source.cache_key), + urllib.quote(job.artifact.cache_key), suffixes)) msg = distbuild.message( 'http-request', id=self._request_ids.next(), url=url, method='GET', body=None, headers=None) - self._helper_id = msg['id'] + self._current_job_cache_request = msg['id'] req = distbuild.HelperRequest(msg) self.mainloop.queue_event(distbuild.HelperRouter, req) - progress = WorkerBuildCaching(self._job.initiators, - self._job.artifact.source.cache_key) + progress = WorkerBuildCaching(job.initiators, + job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) def _maybe_handle_helper_result(self, event_source, event): - if event.msg['id'] == self._helper_id: + if event.msg['id'] == self._current_job_cache_request: distbuild.crash_point() logging.debug('caching: event.msg: %s' % repr(event.msg)) @@ -591,8 +643,8 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Shared artifact cache population done') new_event = WorkerBuildFinished( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _Cached()) else: @@ -607,13 +659,17 @@ class WorkerConnection(distbuild.StateMachine): # The BuildController will not try to cancel jobs that have # been marked as failed. self.mainloop.queue_event(WorkerConnection, - _JobFailed(self._job)) + _JobFailed(self._current_job)) new_event = WorkerBuildFailed( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _BuildFailed()) - self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job)) + self.mainloop.queue_event(WorkerConnection, + _JobFinished(self._current_job)) + + self._current_job_exec_response = None + self._current_job_cache_request = None |