Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r--  distbuild/worker_build_scheduler.py  |  271
1 file changed, 177 insertions(+), 94 deletions(-)
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index fc5849b3..83873d74 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -33,20 +33,30 @@ class WorkerBuildRequest(object):
self.artifact = artifact
self.initiator_id = initiator_id
-
class WorkerCancelPending(object):
def __init__(self, initiator_id):
self.initiator_id = initiator_id
-
class WorkerBuildStepStarted(object):
+ def __init__(self, initiators, cache_key, worker_name):
+ self.initiators = initiators
+ self.artifact_cache_key = cache_key
+ self.worker_name = worker_name
+
+class WorkerBuildStepAlreadyStarted(object):
+
def __init__(self, initiator_id, cache_key, worker_name):
self.initiator_id = initiator_id
self.artifact_cache_key = cache_key
self.worker_name = worker_name
+class WorkerBuildWaiting(object):
+
+ def __init__(self, initiator_id, cache_key):
+ self.initiator_id = initiator_id
+ self.artifact_cache_key = cache_key
class WorkerBuildOutput(object):
@@ -54,21 +64,18 @@ class WorkerBuildOutput(object):
self.msg = msg
self.artifact_cache_key = cache_key
-
class WorkerBuildCaching(object):
- def __init__(self, initiator_id, cache_key):
- self.initiator_id = initiator_id
+ def __init__(self, initiators, cache_key):
+ self.initiators = initiators
self.artifact_cache_key = cache_key
-
class WorkerBuildFinished(object):
def __init__(self, msg, cache_key):
self.msg = msg
self.artifact_cache_key = cache_key
-
class WorkerBuildFailed(object):
def __init__(self, msg, cache_key):
@@ -84,21 +91,60 @@ class _NeedJob(object):
class _HaveAJob(object):
- def __init__(self, artifact, initiator_id):
+ def __init__(self, job):
+ self.job = job
+
+class Job(object):
+
+ def __init__(self, job_id, artifact, initiator_id):
+ self.id = job_id
self.artifact = artifact
- self.initiator_id = initiator_id
-
-
-class _JobIsFinished(object):
+ self.initiators = [initiator_id]
+ self.who = None # we don't know who's going to do this yet
+ self.is_building = False
- def __init__(self, msg):
- self.msg = msg
-
+ def add_initiator(self, initiator_id):
+ self.initiators.append(initiator_id)
+
+class Jobs(object):
+
+ def __init__(self, idgen):
+ self._idgen = idgen
+ self._jobs = {}
+
+ def get(self, artifact_basename):
+ return (self._jobs[artifact_basename]
+ if artifact_basename in self._jobs else None)
+
+ def create(self, artifact, initiator_id):
+ job = Job(self._idgen.next(), artifact, initiator_id)
+ self._jobs[job.artifact.basename()] = job
+ return job
+
+ def remove(self, job):
+ del self._jobs[job.artifact.basename()]
+
+ def exists(self, artifact_basename):
+ return artifact_basename in self._jobs
+
+ def get_next_job(self):
+ # for now just return the first thing we find that's not being built
+ waiting = [job for (_, job) in
+ self._jobs.iteritems() if job.who == None]
+
+ return waiting.pop() if len(waiting) > 0 else None
+
+ def __repr__(self):
+ return str([job.artifact.basename()
+ for (_, job) in self._jobs.iteritems()])
-class _JobFailed(object):
+class _BuildFinished(object):
+
+ pass
+
+class _BuildFailed(object):
pass
-
class _Cached(object):
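A minimal usage sketch of the Job/Jobs container added above (not part of the patch; _FakeIdGen, _FakeArtifact and the initiator ids below are hypothetical stand-ins for distbuild.IdentifierGenerator and a real artifact object):

    import itertools

    class _FakeIdGen(object):
        # Illustrative stand-in for distbuild.IdentifierGenerator.
        def __init__(self, prefix):
            self._prefix = prefix
            self._counter = itertools.count(1)

        def next(self):
            return '%s-%d' % (self._prefix, next(self._counter))

    class _FakeArtifact(object):
        # Illustrative stand-in for a distbuild artifact.
        def __init__(self, name, cache_key):
            self.name = name
            self.cache_key = cache_key

        def basename(self):
            return '%s.%s' % (self.cache_key, self.name)

    jobs = Jobs(_FakeIdGen('WorkerBuildQueuerJob'))
    artifact = _FakeArtifact('example-chunk', 'cafef00d')

    job = jobs.create(artifact, 'initiator-1')      # first request creates the job
    if jobs.exists(artifact.basename()):            # a duplicate request...
        jobs.get(artifact.basename()).add_initiator('initiator-2')   # ...joins it

    assert jobs.get_next_job() is job   # no worker assigned yet, so it is next
    jobs.remove(job)                    # dropped once its worker reports back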
@@ -123,8 +169,9 @@ class WorkerBuildQueuer(distbuild.StateMachine):
distbuild.crash_point()
logging.debug('WBQ: Setting up %s' % self)
- self._request_queue = []
self._available_workers = []
+ self._jobs = Jobs(
+ distbuild.IdentifierGenerator('WorkerBuildQueuerJob'))
spec = [
# state, source, event_class, new_state, callback
@@ -136,45 +183,86 @@ class WorkerBuildQueuer(distbuild.StateMachine):
]
self.add_transitions(spec)
+
+
def _handle_request(self, event_source, event):
distbuild.crash_point()
- logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name)
- self._request_queue.append(event)
- logging.debug(
- 'WBQ: %d available workers and %d requests queued' %
- (len(self._available_workers),
- len(self._request_queue)))
- if self._available_workers:
- self._give_job()
+ logging.debug('Handling build request for %s' % event.initiator_id)
+ logging.debug('Current jobs: %s' % self._jobs)
+ logging.debug('Workers available: %d' % len(self._available_workers))
+
+ # Have we already made a job for this thing?
+ # If so, add our initiator id to the existing job
+ # If not, create a job
+
+ if self._jobs.exists(event.artifact.basename()):
+ job = self._jobs.get(event.artifact.basename())
+ job.initiators.append(event.initiator_id)
+
+ if job.is_building:
+ logging.debug('Worker build step already started: %s' %
+ event.artifact.basename())
+ progress = WorkerBuildStepAlreadyStarted(event.initiator_id,
+ event.artifact.cache_key, job.who.name())
+ else:
+ logging.debug('Job created but not building yet '
+ '(waiting for a worker to become available): %s' %
+ event.artifact.basename())
+ progress = WorkerBuildWaiting(event.initiator_id,
+ event.artifact.cache_key)
+
+ self.mainloop.queue_event(WorkerConnection, progress)
+ else:
+ logging.debug('WBQ: Creating job for: %s' % event.artifact.name)
+ job = self._jobs.create(event.artifact, event.initiator_id)
+
+ if self._available_workers:
+ self._give_job(job)
+ else:
+ progress = WorkerBuildWaiting(event.initiator_id,
+ event.artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, progress)
def _handle_cancel(self, event_source, worker_cancel_pending):
- for request in [r for r in self._request_queue if
- r.initiator_id == worker_cancel_pending.initiator_id]:
- logging.debug('WBQ: Removing request from queue: %s',
- request.artifact.name)
- self._request_queue.remove(request)
+ # TODO: this probably needs to check whether any initiators
+ # care about this thing
+
+ pass
def _handle_worker(self, event_source, event):
distbuild.crash_point()
+ who = event.who
+ last_job = who.job() # the job this worker's just completed
+
+ if last_job:
+ logging.debug('%s wants new job, just did %s' %
+ (who.name(), last_job.artifact.basename()))
+
+ self._jobs.remove(last_job)
+ else:
+ logging.debug('%s wants its first job' % who.name())
+
logging.debug('WBQ: Adding worker to queue: %s' % event.who)
self._available_workers.append(event)
- logging.debug(
- 'WBQ: %d available workers and %d requests queued' %
- (len(self._available_workers),
- len(self._request_queue)))
- if self._request_queue:
- self._give_job()
+ logging.debug('Current jobs: %s' % self._jobs)
+ logging.debug('Workers available: %d' % len(self._available_workers))
+
+ job = self._jobs.get_next_job()
+
+ if job:
+ self._give_job(job)
- def _give_job(self):
- request = self._request_queue.pop(0)
+ def _give_job(self, job):
worker = self._available_workers.pop(0)
+ job.who = worker.who
+
logging.debug(
'WBQ: Giving %s to %s' %
- (request.artifact.name, worker.who.name()))
- self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact,
- request.initiator_id))
+ (job.artifact.name, worker.who.name()))
+
+ self.mainloop.queue_event(worker.who, _HaveAJob(job))
class WorkerConnection(distbuild.StateMachine):
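The scheduling rule the queuer now follows, reduced to a plain function for illustration (a sketch only; pick_and_assign does not exist in distbuild, and available_workers is assumed to be a FIFO list of worker connections):

    def pick_and_assign(jobs, available_workers):
        # Mirrors _handle_worker/_give_job above: take the first job with no
        # worker assigned and hand it to the longest-idle available worker.
        job = jobs.get_next_job()
        if job is None or not available_workers:
            return None
        worker = available_workers.pop(0)
        job.who = worker    # the job now remembers who is building it
        return job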
@@ -194,6 +282,9 @@ class WorkerConnection(distbuild.StateMachine):
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
self._helper_id = None
+ self._job = None
+ self._exec_response_msg = None
+ self._debug_json = False
addr, port = self._conn.getpeername()
name = socket.getfqdn(addr)
@@ -202,6 +293,9 @@ class WorkerConnection(distbuild.StateMachine):
def name(self):
return self._worker_name
+ def job(self):
+ return self._job
+
def setup(self):
distbuild.crash_point()
@@ -221,14 +315,14 @@ class WorkerConnection(distbuild.StateMachine):
('building', self._jm, distbuild.JsonEof, None, self._reconnect),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
- ('building', self, _JobFailed, 'idle', self._request_job),
- ('building', self, _JobIsFinished, 'caching',
+ ('building', self, _BuildFailed, 'idle', self._request_job),
+ ('building', self, _BuildFinished, 'caching',
self._request_caching),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
- ('caching', self, _JobFailed, 'idle', self._request_job),
+ ('caching', self, _BuildFailed, 'idle', self._request_job),
]
self.add_transitions(spec)
@@ -237,13 +331,8 @@ class WorkerConnection(distbuild.StateMachine):
def _maybe_cancel(self, event_source, build_cancel):
logging.debug('WC: BuildController %r requested a cancel' %
event_source)
- if build_cancel.id == self._initiator_id:
- distbuild.crash_point()
- for id in self._initiator_request_map[self._initiator_id]:
- logging.debug('WC: Cancelling exec %s' % id)
- msg = distbuild.message('exec-cancel', id=id)
- self._jm.send(msg)
+ # TODO: implement cancel
def _reconnect(self, event_source, event):
distbuild.crash_point()
@@ -254,31 +343,34 @@ class WorkerConnection(distbuild.StateMachine):
def _start_build(self, event_source, event):
distbuild.crash_point()
- self._artifact = event.artifact
- self._initiator_id = event.initiator_id
+ self._job = event.job
+ self._helper_id = None
+ self._exec_response_msg = None
+
logging.debug('WC: starting build: %s for %s' %
- (self._artifact.name, self._initiator_id))
+ (self._job.artifact.name, self._job.initiators))
argv = [
self._morph_instance,
'worker-build',
- self._artifact.name,
+ self._job.artifact.name,
]
msg = distbuild.message('exec-request',
- id=self._request_ids.next(),
+ id=self._job.id,
argv=argv,
- stdin_contents=distbuild.serialise_artifact(self._artifact),
+ stdin_contents=distbuild.serialise_artifact(self._job.artifact),
)
self._jm.send(msg)
- logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg))
- self._route_map.add(self._initiator_id, msg['id'])
- self._initiator_request_map[self._initiator_id].add(msg['id'])
- logging.debug(
- 'WC: route map from %s to %s',
- self._artifact.cache_key, msg['id'])
- started = WorkerBuildStepStarted(
- self._initiator_id, self._artifact.cache_key, self.name())
+ if self._debug_json:
+ logging.debug('WC: sent to worker %s: %r'
+ % (self._worker_name, msg))
+
+ started = WorkerBuildStepStarted(self._job.initiators,
+ self._job.artifact.cache_key, self.name())
+
+ self._job.is_building = True
+
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -299,30 +391,29 @@ class WorkerConnection(distbuild.StateMachine):
def _handle_exec_output(self, msg):
new = dict(msg)
- new['id'] = self._route_map.get_incoming_id(msg['id'])
+ new['ids'] = self._job.initiators
logging.debug('WC: emitting: %s', repr(new))
self.mainloop.queue_event(
WorkerConnection,
- WorkerBuildOutput(new, self._artifact.cache_key))
+ WorkerBuildOutput(new, self._job.artifact.cache_key))
def _handle_exec_response(self, msg):
- logging.debug('WC: finished building: %s' % self._artifact.name)
+ logging.debug('WC: finished building: %s' % self._job.artifact.name)
+ logging.debug('initiators that need to know: %s'
+ % self._job.initiators)
new = dict(msg)
- new['id'] = self._route_map.get_incoming_id(msg['id'])
- self._route_map.remove(msg['id'])
- self._initiator_request_map[self._initiator_id].remove(msg['id'])
+ new['ids'] = self._job.initiators
if new['exit'] != 0:
# Build failed.
- new_event = WorkerBuildFailed(new, self._artifact.cache_key)
+ new_event = WorkerBuildFailed(new, self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(self, _JobFailed())
- self._artifact = None
- self._initiator_id = None
+ self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
- self.mainloop.queue_event(self, _JobIsFinished(new))
+ self.mainloop.queue_event(self, _BuildFinished())
+ self._exec_response_msg = new
def _request_job(self, event_source, event):
distbuild.crash_point()
@@ -333,14 +424,14 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- kind = self._artifact.source.morphology['kind']
+ kind = self._job.artifact.source.morphology['kind']
if kind == 'chunk':
- source_artifacts = self._artifact.source.artifacts
+ source_artifacts = self._job.artifact.source.artifacts
suffixes = ['%s.%s' % (kind, name) for name in source_artifacts]
else:
- filename = '%s.%s' % (kind, self._artifact.name)
+ filename = '%s.%s' % (kind, self._job.artifact.name)
suffixes = [filename]
if kind == 'stratum':
@@ -360,7 +451,7 @@ class WorkerConnection(distbuild.StateMachine):
'/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
(urllib.quote(worker_host),
self._worker_cache_server_port,
- urllib.quote(self._artifact.cache_key),
+ urllib.quote(self._job.artifact.cache_key),
suffixes))
msg = distbuild.message(
@@ -370,12 +461,9 @@ class WorkerConnection(distbuild.StateMachine):
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
- progress = WorkerBuildCaching(
- self._initiator_id, self._artifact.cache_key)
+ progress = WorkerBuildCaching(self._job.initiators,
+ self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
-
- self._initiator_id = None
- self._finished_msg = event.msg
def _maybe_handle_helper_result(self, event_source, event):
if event.msg['id'] == self._helper_id:
@@ -384,21 +472,16 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('caching: event.msg: %s' % repr(event.msg))
if event.msg['status'] == httplib.OK:
logging.debug('Shared artifact cache population done')
+
new_event = WorkerBuildFinished(
- self._finished_msg, self._artifact.cache_key)
+ self._exec_response_msg, self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self._finished_msg = None
- self._helper_id = None
self.mainloop.queue_event(self, _Cached())
else:
logging.error(
'Failed to populate artifact cache: %s %s' %
(event.msg['status'], event.msg['body']))
new_event = WorkerBuildFailed(
- self._finished_msg, self._artifact.cache_key)
+ self._exec_response_msg, self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self._finished_msg = None
- self._helper_id = None
- self.mainloop.queue_event(self, _JobFailed())
-
- self._artifact = None
+ self.mainloop.queue_event(self, _BuildFailed())
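With this change the exec output and response messages carry all waiting initiators in an 'ids' list instead of a single routed 'id'. A sketch of how a consumer of these events might fan one message out to every initiator (send_to_initiator is a hypothetical callable, not distbuild API):

    def forward_to_initiators(msg, send_to_initiator):
        # msg is the dict built in _handle_exec_output/_handle_exec_response:
        # the worker's message plus an 'ids' list of initiator ids.
        for initiator_id in msg.get('ids', []):
            payload = dict(msg)
            payload['id'] = initiator_id   # per-initiator copy with a single id
            send_to_initiator(initiator_id, payload)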