diff options
-rw-r--r-- | distbuild/worker_build_scheduler.py | 43 | ||||
-rw-r--r-- | morphlib/app.py | 20 | ||||
-rw-r--r-- | morphlib/buildcommand.py | 35 | ||||
-rwxr-xr-x | morphlib/exts/kvm.check | 71 | ||||
-rw-r--r-- | morphlib/localrepocache.py | 76 |
5 files changed, 170 insertions, 75 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index be732153..4f7ff98f 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -1,6 +1,6 @@ # distbuild/worker_build_scheduler.py -- schedule worker-builds on workers # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -93,6 +93,12 @@ class _HaveAJob(object): def __init__(self, job): self.job = job +class _Disconnected(object): + + def __init__(self, who): + self.who = who + + class Job(object): def __init__(self, job_id, artifact, initiator_id): @@ -220,7 +226,10 @@ class WorkerBuildQueuer(distbuild.StateMachine): ('idle', WorkerConnection, _JobFinished, 'idle', self._set_job_finished), ('idle', WorkerConnection, _JobFailed, 'idle', - self._set_job_failed) + self._set_job_failed), + + ('idle', WorkerConnection, _Disconnected, 'idle', + self._handle_worker_disconnected), ] self.add_transitions(spec) @@ -355,8 +364,22 @@ class WorkerBuildQueuer(distbuild.StateMachine): (job.artifact.name, worker.who.name())) self.mainloop.queue_event(worker.who, _HaveAJob(job)) - - + + def _handle_worker_disconnected(self, event): + self._remove_worker(self, event.who) + + def _remove_worker(self, worker): + logging.debug('WBQ: Removing worker %s from queue', worker.name()) + + # There should only be one InitiatorConnection instance per worker in + # the _available_workers list. But anything can happen in space! So we + # take care to remove all GiveJob messages in the list that came from + # the disconnected worker, not the first. + self._available_workers = filter( + lambda worker_msg: worker_msg.who != worker, + self._available_workers) + + class WorkerConnection(distbuild.StateMachine): '''Communicate with a single worker.''' @@ -397,14 +420,15 @@ class WorkerConnection(distbuild.StateMachine): spec = [ # state, source, event_class, new_state, callback - ('idle', self._jm, distbuild.JsonEof, None, self._reconnect), + ('idle', self._jm, distbuild.JsonEof, None, self._disconnected), ('idle', self, _HaveAJob, 'building', self._start_build), ('building', distbuild.BuildController, distbuild.BuildCancel, 'building', self._maybe_cancel), - ('building', self._jm, distbuild.JsonEof, None, self._reconnect), + ('building', self._jm, distbuild.JsonEof, None, + self._disconnected), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), ('building', self, _BuildFailed, 'idle', self._request_job), @@ -412,6 +436,7 @@ class WorkerConnection(distbuild.StateMachine): ('building', self, _BuildFinished, 'caching', self._request_caching), + ('caching', self._jm, distbuild.JsonEof, None, self._disconnected), ('caching', distbuild.HelperRouter, distbuild.HelperResult, 'caching', self._maybe_handle_helper_result), ('caching', self, _Cached, 'idle', self._request_job), @@ -449,10 +474,12 @@ class WorkerConnection(distbuild.StateMachine): self._job.initiators.remove(build_cancel.id) - def _reconnect(self, event_source, event): + def _disconnected(self, event_source, event): distbuild.crash_point() - logging.debug('WC: Triggering reconnect') + logging.debug('WC: Disconnected from worker %s' % self.name()) + self.mainloop.queue_event(InitiatorConnection, _Disconnected(self)) + self.mainloop.queue_event(self._cm, distbuild.Reconnect()) def _start_build(self, event_source, event): diff --git a/morphlib/app.py b/morphlib/app.py index 0c87f814..b8bae850 100644 --- a/morphlib/app.py +++ b/morphlib/app.py @@ -297,26 +297,6 @@ class Morph(cliapp.Application): morphlib.util.sanitise_morphology_path(args[2])) args = args[3:] - def cache_repo_and_submodules(self, cache, url, ref, done): - subs_to_process = set() - subs_to_process.add((url, ref)) - while subs_to_process: - url, ref = subs_to_process.pop() - done.add((url, ref)) - cached_repo = cache.cache_repo(url) - cached_repo.update() - - try: - submodules = morphlib.git.Submodules(self, cached_repo.path, - ref) - submodules.load() - except morphlib.git.NoModulesFileError: - pass - else: - for submod in submodules: - if (submod.url, submod.commit) not in done: - subs_to_process.add((submod.url, submod.commit)) - def _write_status(self, text): timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) self.output.write('%s %s\n' % (timestamp, text)) diff --git a/morphlib/buildcommand.py b/morphlib/buildcommand.py index 0aa50a3b..8572450d 100644 --- a/morphlib/buildcommand.py +++ b/morphlib/buildcommand.py @@ -387,39 +387,8 @@ class BuildCommand(object): '''Update the local git repository cache with the sources.''' repo_name = source.repo_name - if self.app.settings['no-git-update']: - self.app.status(msg='Not updating existing git repository ' - '%(repo_name)s ' - 'because of no-git-update being set', - chatty=True, - repo_name=repo_name) - source.repo = self.lrc.get_repo(repo_name) - return - - if self.lrc.has_repo(repo_name): - source.repo = self.lrc.get_repo(repo_name) - try: - sha1 = source.sha1 - source.repo.resolve_ref_to_commit(sha1) - self.app.status(msg='Not updating git repository ' - '%(repo_name)s because it ' - 'already contains sha1 %(sha1)s', - chatty=True, repo_name=repo_name, - sha1=sha1) - except morphlib.gitdir.InvalidRefError: - self.app.status(msg='Updating %(repo_name)s', - repo_name=repo_name) - source.repo.update() - else: - self.app.status(msg='Cloning %(repo_name)s', - repo_name=repo_name) - source.repo = self.lrc.cache_repo(repo_name) - - # Update submodules. - done = set() - self.app.cache_repo_and_submodules( - self.lrc, source.repo.url, - source.sha1, done) + source.repo = self.lrc.get_updated_repo(repo_name, ref=source.sha1) + self.lrc.ensure_submodules(source.repo, source.sha1) def cache_artifacts_locally(self, artifacts): '''Get artifacts missing from local cache from remote cache.''' diff --git a/morphlib/exts/kvm.check b/morphlib/exts/kvm.check index 1bb4007a..b8877a89 100755 --- a/morphlib/exts/kvm.check +++ b/morphlib/exts/kvm.check @@ -1,5 +1,5 @@ #!/usr/bin/python -# Copyright (C) 2014 Codethink Limited +# Copyright (C) 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -17,6 +17,7 @@ '''Preparatory checks for Morph 'kvm' write extension''' import cliapp +import os import re import urlparse @@ -43,8 +44,10 @@ class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension): ssh_host, vm_name, vm_path = self.check_and_parse_location(location) self.check_ssh_connectivity(ssh_host) + self.check_can_create_file_at_given_path(ssh_host, vm_path) self.check_no_existing_libvirt_vm(ssh_host, vm_name) self.check_extra_disks_exist(ssh_host, self.parse_attach_disks()) + self.check_virtual_networks_are_started(ssh_host) def check_and_parse_location(self, location): '''Check and parse the location argument to get relevant data.''' @@ -73,6 +76,26 @@ class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension): 'write extension to deploy upgrades to existing machines.' % (ssh_host, vm_name)) + def check_can_create_file_at_given_path(self, ssh_host, vm_path): + + def check_can_write_to_given_path(): + try: + cliapp.ssh_runcmd(ssh_host, ['touch', vm_path]) + except cliapp.AppException as e: + raise cliapp.AppException("Can't write to location %s on %s" + % (vm_path, ssh_host)) + else: + cliapp.ssh_runcmd(ssh_host, ['rm', vm_path]) + + try: + cliapp.ssh_runcmd(ssh_host, ['test', '-e', vm_path]) + except cliapp.AppException as e: + # vm_path doesn't already exist, so let's test we can write + check_can_write_to_given_path() + else: + raise cliapp.AppException('%s already exists on %s' + % (vm_path, ssh_host)) + def check_extra_disks_exist(self, ssh_host, filename_list): for filename in filename_list: try: @@ -81,4 +104,50 @@ class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension): raise cliapp.AppException('Did not find file %s on host %s' % (filename, ssh_host)) + def check_virtual_networks_are_started(self, ssh_host): + + def check_virtual_network_is_started(network_name): + cmd = ['virsh', '-c', 'qemu:///system', 'net-info', network_name] + net_info = cliapp.ssh_runcmd(ssh_host, cmd).split('\n') + + def pretty_concat(lines): + return '\n'.join(['\t%s' % line for line in lines]) + + for line in net_info: + m = re.match('^Active:\W*(\w+)\W*', line) + if m: + break + else: + raise cliapp.AppException( + "Got unexpected output parsing output of `%s':\n%s" + % (' '.join(cmd), pretty_concat(net_info))) + + network_active = m.group(1) == 'yes' + + if not network_active: + raise cliapp.AppException("Network '%s' is not started" + % network_name) + + def name(nic_entry): + if ',' in nic_entry: + # NETWORK_NAME,mac=12:34,model=e1000... + return nic_entry[:nic_entry.find(',')] + else: + return nic_entry # NETWORK_NAME + + if 'NIC_CONFIG' in os.environ: + nics = os.environ['NIC_CONFIG'].split() + + # --network bridge= is used to specify a bridge + # --network user is used to specify a form of NAT + # (see the virt-install(1) man page) + networks = [name(n) for n in nics if not n.startswith('bridge=') + and not n.startswith('user')] + else: + networks = ['default'] + + for network in networks: + check_virtual_network_is_started(network) + + KvmPlusSshCheckExtension().run() diff --git a/morphlib/localrepocache.py b/morphlib/localrepocache.py index 39fbd200..1565b913 100644 --- a/morphlib/localrepocache.py +++ b/morphlib/localrepocache.py @@ -14,10 +14,7 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import logging import os -import re -import urllib2 import urlparse import string import sys @@ -246,15 +243,68 @@ class LocalRepoCache(object): return repo raise NotCached(reponame) - def get_updated_repo(self, reponame): # pragma: no cover - '''Return object representing cached repository, which is updated.''' + def get_updated_repo(self, repo_name, ref=None): # pragma: no cover + '''Return object representing cached repository. - if not self._app.settings['no-git-update']: - cached_repo = self.cache_repo(reponame) - self._app.status( - msg='Updating git repository %s in cache' % reponame) - cached_repo.update() - else: - cached_repo = self.get_repo(reponame) - return cached_repo + If 'ref' is None, the repo will be updated unless + app.settings['no-git-update'] is set. + + If 'ref' is set to a SHA1, the repo will only be updated if 'ref' isn't + already available locally. + ''' + + if self._app.settings['no-git-update']: + self._app.status(msg='Not updating existing git repository ' + '%(repo_name)s ' + 'because of no-git-update being set', + chatty=True, + repo_name=repo_name) + return self.get_repo(repo_name) + + if self.has_repo(repo_name): + repo = self.get_repo(repo_name) + if ref and morphlib.git.is_valid_sha1(ref): + try: + repo.resolve_ref_to_commit(ref) + self._app.status(msg='Not updating git repository ' + '%(repo_name)s because it ' + 'already contains sha1 %(sha1)s', + chatty=True, repo_name=repo_name, + sha1=ref) + return repo + except morphlib.gitdir.InvalidRefError: + pass + + self._app.status(msg='Updating %(repo_name)s', + repo_name=repo_name) + repo.update() + return repo + else: + self._app.status(msg='Cloning %(repo_name)s', + repo_name=repo_name) + return self.cache_repo(repo_name) + + def ensure_submodules(self, toplevel_repo, + toplevel_ref): # pragma: no cover + '''Ensure any submodules of a given repo are cached and up to date.''' + + def submodules_for_repo(repo_path, ref): + try: + submodules = morphlib.git.Submodules(self._app, repo_path, ref) + submodules.load() + return [(submod.url, submod.commit) for submod in submodules] + except morphlib.git.NoModulesFileError: + return [] + + done = set() + subs_to_process = submodules_for_repo(toplevel_repo.path, toplevel_ref) + while subs_to_process: + url, ref = subs_to_process.pop() + done.add((url, ref)) + + cached_repo = self.get_updated_repo(url, ref=ref) + + for submod in submodules_for_repo(cached_repo.path, ref): + if (submod.url, submod.commit) not in done: + subs_to_process.add((submod.url, submod.commit)) |