summary | refs | log | tree | commit | diff
path: root/distbuild/worker_build_scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r--  distbuild/worker_build_scheduler.py  135
1 files changed, 81 insertions, 54 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 4f7ff98f..81c961e1 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -332,7 +332,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
distbuild.crash_point()
who = event.who
- last_job = who.job() # the job this worker's just completed
+ last_job = who.current_job() # the job this worker's just completed
if last_job:
logging.debug('%s wants new job, just did %s',
@@ -395,9 +395,12 @@ class WorkerConnection(distbuild.StateMachine):
self._writeable_cache_server = writeable_cache_server
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
- self._helper_id = None
- self._job = None
- self._exec_response_msg = None
+
+ self._active_jobs = dict()
+ self._current_job = None
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None
+
self._debug_json = False
addr, port = self._conn.getpeername()
@@ -407,8 +410,8 @@ class WorkerConnection(distbuild.StateMachine):
def name(self):
return self._worker_name
- def job(self):
- return self._job
+ def current_job(self):
+ return self._current_job
def setup(self):
distbuild.crash_point()
@@ -448,31 +451,30 @@ class WorkerConnection(distbuild.StateMachine):
def _maybe_cancel(self, event_source, build_cancel):
- if build_cancel.id not in self._job.initiators:
+ if build_cancel.id not in self._current_job.initiators:
return # event not relevant
logging.debug('WC: BuildController %r requested a cancel',
event_source)
- if (len(self._job.initiators) == 1):
+ job = self._current_job
+ if (len(job.initiators) == 1):
logging.debug('WC: Cancelling running job %s '
'with job id %s running on %s',
- self._job.artifact.basename(),
- self._job.id,
+ job.artifact.basename(), job.id,
self.name())
- msg = distbuild.message('exec-cancel', id=self._job.id)
+ msg = distbuild.message('exec-cancel', id=job.id)
self._jm.send(msg)
self.mainloop.queue_event(self, _BuildCancelled())
else:
logging.debug('WC: Not cancelling running job %s with job id %s, '
'other initiators want it done: %s',
- self._job.artifact.basename(),
- self._job.id,
- [i for i in self._job.initiators
- if i != build_cancel.id])
+ job.artifact.basename(),
+ job.id,
+ [i for i in job.initiators if i != build_cancel.id])
- self._job.initiators.remove(build_cancel.id)
+ job.initiators.remove(build_cancel.id)
def _disconnected(self, event_source, event):
distbuild.crash_point()
@@ -485,23 +487,30 @@ class WorkerConnection(distbuild.StateMachine):
def _start_build(self, event_source, event):
distbuild.crash_point()
- self._job = event.job
- self._helper_id = None
- self._exec_response_msg = None
+ job = event.job
+
+ if job.id in self._active_jobs:
+ logging.warn('Duplicate job %s for worker %s', job.id, self.name())
+
+ if self._current_job_exec_response or self._current_job_cache_request:
+ logging.warn('Caching not finished for %s', self._current_job.id)
+
+ self._active_jobs[job.id] = job
+ self._current_job = job
logging.debug('WC: starting build: %s for %s' %
- (self._job.artifact.name, self._job.initiators))
+ (job.artifact.name, job.initiators))
argv = [
self._morph_instance,
'worker-build',
'--build-log-on-stdout',
- self._job.artifact.name,
+ job.artifact.name,
]
msg = distbuild.message('exec-request',
- id=self._job.id,
+ id=job.id,
argv=argv,
- stdin_contents=distbuild.serialise_artifact(self._job.artifact),
+ stdin_contents=distbuild.serialise_artifact(job.artifact),
)
self._jm.send(msg)
@@ -509,10 +518,10 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('WC: sent to worker %s: %r'
% (self._worker_name, msg))
- started = WorkerBuildStepStarted(self._job.initiators,
- self._job.artifact.source.cache_key, self.name())
+ started = WorkerBuildStepStarted(job.initiators,
+ job.artifact.source.cache_key, self.name())
- self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -527,37 +536,50 @@ class WorkerConnection(distbuild.StateMachine):
'exec-output': self._handle_exec_output,
'exec-response': self._handle_exec_response,
}
-
+
handler = handlers[event.msg['type']]
- handler(event.msg)
+ job = self._active_jobs.get(event.msg['id'])
+
+ if job:
+ handler(event.msg, job)
+ else:
+ logging.warn('Received %s for unknown job %s',
+ event.msg['type'], event.msg['id'])
+
+ def _handle_exec_output(self, msg, job):
+ '''Handle output from a job that the worker is or was running.'''
- def _handle_exec_output(self, msg):
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
+
logging.debug('WC: emitting: %s', repr(new))
self.mainloop.queue_event(
WorkerConnection,
- WorkerBuildOutput(new, self._job.artifact.source.cache_key))
+ WorkerBuildOutput(new, job.artifact.source.cache_key))
- def _handle_exec_response(self, msg):
- logging.debug('WC: finished building: %s' % self._job.artifact.name)
- logging.debug('initiators that need to know: %s'
- % self._job.initiators)
+ def _handle_exec_response(self, msg, job):
+ '''Handle completion of a job that the worker is or was running.'''
+
+ logging.debug('WC: finished building: %s' % job.artifact.name)
+ logging.debug('initiators that need to know: %s' % job.initiators)
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
if new['exit'] != 0:
# Build failed.
- new_event = WorkerBuildFailed(new,
- self._job.artifact.source.cache_key)
+ new_event = WorkerBuildFailed(new, job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobFailed(job))
self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
self.mainloop.queue_event(self, _BuildFinished())
- self._exec_response_msg = new
+ self._current_job_exec_response = new
+
+ # The job is no longer considered active, because the worker is
+ # finished with it so we won't receive any more messages about it.
+ del self._active_jobs[job.id]
def _request_job(self, event_source, event):
distbuild.crash_point()
@@ -571,15 +593,16 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- kind = self._job.artifact.source.morphology['kind']
+ job = self._current_job
+ kind = job.artifact.source.morphology['kind']
if kind == 'chunk':
- source_artifacts = self._job.artifact.source.artifacts
+ source_artifacts = job.artifact.source.artifacts
suffixes = ['%s.%s' % (kind, name) for name in source_artifacts]
suffixes.append('build-log')
else:
- filename = '%s.%s' % (kind, self._job.artifact.name)
+ filename = '%s.%s' % (kind, job.artifact.name)
suffixes = [filename]
if kind == 'stratum':
@@ -595,22 +618,22 @@ class WorkerConnection(distbuild.StateMachine):
'/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
(urllib.quote(worker_host),
self._worker_cache_server_port,
- urllib.quote(self._job.artifact.source.cache_key),
+ urllib.quote(job.artifact.source.cache_key),
suffixes))
msg = distbuild.message(
'http-request', id=self._request_ids.next(), url=url,
method='GET', body=None, headers=None)
- self._helper_id = msg['id']
+ self._current_job_cache_request = msg['id']
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
- progress = WorkerBuildCaching(self._job.initiators,
- self._job.artifact.source.cache_key)
+ progress = WorkerBuildCaching(job.initiators,
+ job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
def _maybe_handle_helper_result(self, event_source, event):
- if event.msg['id'] == self._helper_id:
+ if event.msg['id'] == self._current_job_cache_request:
distbuild.crash_point()
logging.debug('caching: event.msg: %s' % repr(event.msg))
@@ -618,8 +641,8 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Shared artifact cache population done')
new_event = WorkerBuildFinished(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _Cached())
else:
@@ -634,13 +657,17 @@ class WorkerConnection(distbuild.StateMachine):
# The BuildController will not try to cancel jobs that have
# been marked as failed.
self.mainloop.queue_event(WorkerConnection,
- _JobFailed(self._job))
+ _JobFailed(self._current_job))
new_event = WorkerBuildFailed(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _BuildFailed())
- self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job))
+ self.mainloop.queue_event(WorkerConnection,
+ _JobFinished(self._current_job))
+
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None