diff options
Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r-- | distbuild/worker_build_scheduler.py | 138 |
1 files changed, 82 insertions, 56 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 4f7ff98f..e58059b2 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -12,8 +12,7 @@ # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. +# with this program. If not, see <http://www.gnu.org/licenses/>. import collections @@ -332,7 +331,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): distbuild.crash_point() who = event.who - last_job = who.job() # the job this worker's just completed + last_job = who.current_job() # the job this worker's just completed if last_job: logging.debug('%s wants new job, just did %s', @@ -395,9 +394,12 @@ class WorkerConnection(distbuild.StateMachine): self._writeable_cache_server = writeable_cache_server self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance - self._helper_id = None - self._job = None - self._exec_response_msg = None + + self._active_jobs = dict() + self._current_job = None + self._current_job_exec_response = None + self._current_job_cache_request = None + self._debug_json = False addr, port = self._conn.getpeername() @@ -407,8 +409,8 @@ class WorkerConnection(distbuild.StateMachine): def name(self): return self._worker_name - def job(self): - return self._job + def current_job(self): + return self._current_job def setup(self): distbuild.crash_point() @@ -448,31 +450,30 @@ class WorkerConnection(distbuild.StateMachine): def _maybe_cancel(self, event_source, build_cancel): - if build_cancel.id not in self._job.initiators: + if build_cancel.id not in self._current_job.initiators: return # event not relevant logging.debug('WC: BuildController %r requested a cancel', event_source) - if (len(self._job.initiators) == 1): + job = self._current_job + if (len(job.initiators) == 1): logging.debug('WC: Cancelling running job %s ' 'with job id %s running on %s', - self._job.artifact.basename(), - self._job.id, + job.artifact.basename(), job.id, self.name()) - msg = distbuild.message('exec-cancel', id=self._job.id) + msg = distbuild.message('exec-cancel', id=job.id) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) else: logging.debug('WC: Not cancelling running job %s with job id %s, ' 'other initiators want it done: %s', - self._job.artifact.basename(), - self._job.id, - [i for i in self._job.initiators - if i != build_cancel.id]) + job.artifact.basename(), + job.id, + [i for i in job.initiators if i != build_cancel.id]) - self._job.initiators.remove(build_cancel.id) + job.initiators.remove(build_cancel.id) def _disconnected(self, event_source, event): distbuild.crash_point() @@ -485,23 +486,30 @@ class WorkerConnection(distbuild.StateMachine): def _start_build(self, event_source, event): distbuild.crash_point() - self._job = event.job - self._helper_id = None - self._exec_response_msg = None + job = event.job + + if job.id in self._active_jobs: + logging.warn('Duplicate job %s for worker %s', job.id, self.name()) + + if self._current_job_exec_response or self._current_job_cache_request: + logging.warn('Caching not finished for %s', self._current_job.id) + + self._active_jobs[job.id] = job + self._current_job = job logging.debug('WC: starting build: %s for %s' % - (self._job.artifact.name, self._job.initiators)) + (job.artifact.name, job.initiators)) argv = [ self._morph_instance, 'worker-build', '--build-log-on-stdout', - self._job.artifact.name, + job.artifact.name, ] msg = distbuild.message('exec-request', - id=self._job.id, + id=job.id, argv=argv, - stdin_contents=distbuild.serialise_artifact(self._job.artifact), + stdin_contents=distbuild.serialise_artifact(job.artifact), ) self._jm.send(msg) @@ -509,10 +517,10 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg)) - started = WorkerBuildStepStarted(self._job.initiators, - self._job.artifact.source.cache_key, self.name()) + started = WorkerBuildStepStarted(job.initiators, + job.artifact.source.cache_key, self.name()) - self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobStarted(job)) self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -527,37 +535,50 @@ class WorkerConnection(distbuild.StateMachine): 'exec-output': self._handle_exec_output, 'exec-response': self._handle_exec_response, } - + handler = handlers[event.msg['type']] - handler(event.msg) + job = self._active_jobs.get(event.msg['id']) + + if job: + handler(event.msg, job) + else: + logging.warn('Received %s for unknown job %s', + event.msg['type'], event.msg['id']) + + def _handle_exec_output(self, msg, job): + '''Handle output from a job that the worker is or was running.''' - def _handle_exec_output(self, msg): new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators + logging.debug('WC: emitting: %s', repr(new)) self.mainloop.queue_event( WorkerConnection, - WorkerBuildOutput(new, self._job.artifact.source.cache_key)) + WorkerBuildOutput(new, job.artifact.source.cache_key)) - def _handle_exec_response(self, msg): - logging.debug('WC: finished building: %s' % self._job.artifact.name) - logging.debug('initiators that need to know: %s' - % self._job.initiators) + def _handle_exec_response(self, msg, job): + '''Handle completion of a job that the worker is or was running.''' + + logging.debug('WC: finished building: %s' % job.artifact.name) + logging.debug('initiators that need to know: %s' % job.initiators) new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators if new['exit'] != 0: # Build failed. - new_event = WorkerBuildFailed(new, - self._job.artifact.source.cache_key) + new_event = WorkerBuildFailed(new, job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobFailed(job)) self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. self.mainloop.queue_event(self, _BuildFinished()) - self._exec_response_msg = new + self._current_job_exec_response = new + + # The job is no longer considered active, because the worker is + # finished with it so we won't receive any more messages about it. + del self._active_jobs[job.id] def _request_job(self, event_source, event): distbuild.crash_point() @@ -571,15 +592,16 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') - kind = self._job.artifact.source.morphology['kind'] + job = self._current_job + kind = job.artifact.source.morphology['kind'] if kind == 'chunk': - source_artifacts = self._job.artifact.source.artifacts + source_artifacts = job.artifact.source.artifacts suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] suffixes.append('build-log') else: - filename = '%s.%s' % (kind, self._job.artifact.name) + filename = '%s.%s' % (kind, job.artifact.name) suffixes = [filename] if kind == 'stratum': @@ -595,22 +617,22 @@ class WorkerConnection(distbuild.StateMachine): '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % (urllib.quote(worker_host), self._worker_cache_server_port, - urllib.quote(self._job.artifact.source.cache_key), + urllib.quote(job.artifact.source.cache_key), suffixes)) msg = distbuild.message( 'http-request', id=self._request_ids.next(), url=url, method='GET', body=None, headers=None) - self._helper_id = msg['id'] + self._current_job_cache_request = msg['id'] req = distbuild.HelperRequest(msg) self.mainloop.queue_event(distbuild.HelperRouter, req) - progress = WorkerBuildCaching(self._job.initiators, - self._job.artifact.source.cache_key) + progress = WorkerBuildCaching(job.initiators, + job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, progress) def _maybe_handle_helper_result(self, event_source, event): - if event.msg['id'] == self._helper_id: + if event.msg['id'] == self._current_job_cache_request: distbuild.crash_point() logging.debug('caching: event.msg: %s' % repr(event.msg)) @@ -618,8 +640,8 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Shared artifact cache population done') new_event = WorkerBuildFinished( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _Cached()) else: @@ -634,13 +656,17 @@ class WorkerConnection(distbuild.StateMachine): # The BuildController will not try to cancel jobs that have # been marked as failed. self.mainloop.queue_event(WorkerConnection, - _JobFailed(self._job)) + _JobFailed(self._current_job)) new_event = WorkerBuildFailed( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _BuildFailed()) - self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job)) + self.mainloop.queue_event(WorkerConnection, + _JobFinished(self._current_job)) + + self._current_job_exec_response = None + self._current_job_cache_request = None |