summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--distbuild/worker_build_scheduler.py43
-rw-r--r--morphlib/app.py20
-rw-r--r--morphlib/buildcommand.py35
-rwxr-xr-xmorphlib/exts/kvm.check71
-rw-r--r--morphlib/localrepocache.py76
5 files changed, 170 insertions, 75 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index be732153..4f7ff98f 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -1,6 +1,6 @@
# distbuild/worker_build_scheduler.py -- schedule worker-builds on workers
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -93,6 +93,12 @@ class _HaveAJob(object):
def __init__(self, job):
self.job = job
+class _Disconnected(object):
+
+ def __init__(self, who):
+ self.who = who
+
+
class Job(object):
def __init__(self, job_id, artifact, initiator_id):
@@ -220,7 +226,10 @@ class WorkerBuildQueuer(distbuild.StateMachine):
('idle', WorkerConnection, _JobFinished, 'idle',
self._set_job_finished),
('idle', WorkerConnection, _JobFailed, 'idle',
- self._set_job_failed)
+ self._set_job_failed),
+
+ ('idle', WorkerConnection, _Disconnected, 'idle',
+ self._handle_worker_disconnected),
]
self.add_transitions(spec)
@@ -355,8 +364,22 @@ class WorkerBuildQueuer(distbuild.StateMachine):
(job.artifact.name, worker.who.name()))
self.mainloop.queue_event(worker.who, _HaveAJob(job))
-
-
+
+ def _handle_worker_disconnected(self, event):
+ self._remove_worker(self, event.who)
+
+ def _remove_worker(self, worker):
+ logging.debug('WBQ: Removing worker %s from queue', worker.name())
+
+ # There should only be one InitiatorConnection instance per worker in
+ # the _available_workers list. But anything can happen in space! So we
+ # take care to remove all GiveJob messages in the list that came from
+ # the disconnected worker, not the first.
+ self._available_workers = filter(
+ lambda worker_msg: worker_msg.who != worker,
+ self._available_workers)
+
+
class WorkerConnection(distbuild.StateMachine):
'''Communicate with a single worker.'''
@@ -397,14 +420,15 @@ class WorkerConnection(distbuild.StateMachine):
spec = [
# state, source, event_class, new_state, callback
- ('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('idle', self._jm, distbuild.JsonEof, None, self._disconnected),
('idle', self, _HaveAJob, 'building', self._start_build),
('building', distbuild.BuildController,
distbuild.BuildCancel, 'building',
self._maybe_cancel),
- ('building', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('building', self._jm, distbuild.JsonEof, None,
+ self._disconnected),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
('building', self, _BuildFailed, 'idle', self._request_job),
@@ -412,6 +436,7 @@ class WorkerConnection(distbuild.StateMachine):
('building', self, _BuildFinished, 'caching',
self._request_caching),
+ ('caching', self._jm, distbuild.JsonEof, None, self._disconnected),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
@@ -449,10 +474,12 @@ class WorkerConnection(distbuild.StateMachine):
self._job.initiators.remove(build_cancel.id)
- def _reconnect(self, event_source, event):
+ def _disconnected(self, event_source, event):
distbuild.crash_point()
- logging.debug('WC: Triggering reconnect')
+ logging.debug('WC: Disconnected from worker %s' % self.name())
+ self.mainloop.queue_event(InitiatorConnection, _Disconnected(self))
+
self.mainloop.queue_event(self._cm, distbuild.Reconnect())
def _start_build(self, event_source, event):
diff --git a/morphlib/app.py b/morphlib/app.py
index 0c87f814..b8bae850 100644
--- a/morphlib/app.py
+++ b/morphlib/app.py
@@ -297,26 +297,6 @@ class Morph(cliapp.Application):
morphlib.util.sanitise_morphology_path(args[2]))
args = args[3:]
- def cache_repo_and_submodules(self, cache, url, ref, done):
- subs_to_process = set()
- subs_to_process.add((url, ref))
- while subs_to_process:
- url, ref = subs_to_process.pop()
- done.add((url, ref))
- cached_repo = cache.cache_repo(url)
- cached_repo.update()
-
- try:
- submodules = morphlib.git.Submodules(self, cached_repo.path,
- ref)
- submodules.load()
- except morphlib.git.NoModulesFileError:
- pass
- else:
- for submod in submodules:
- if (submod.url, submod.commit) not in done:
- subs_to_process.add((submod.url, submod.commit))
-
def _write_status(self, text):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
self.output.write('%s %s\n' % (timestamp, text))
diff --git a/morphlib/buildcommand.py b/morphlib/buildcommand.py
index 0aa50a3b..8572450d 100644
--- a/morphlib/buildcommand.py
+++ b/morphlib/buildcommand.py
@@ -387,39 +387,8 @@ class BuildCommand(object):
'''Update the local git repository cache with the sources.'''
repo_name = source.repo_name
- if self.app.settings['no-git-update']:
- self.app.status(msg='Not updating existing git repository '
- '%(repo_name)s '
- 'because of no-git-update being set',
- chatty=True,
- repo_name=repo_name)
- source.repo = self.lrc.get_repo(repo_name)
- return
-
- if self.lrc.has_repo(repo_name):
- source.repo = self.lrc.get_repo(repo_name)
- try:
- sha1 = source.sha1
- source.repo.resolve_ref_to_commit(sha1)
- self.app.status(msg='Not updating git repository '
- '%(repo_name)s because it '
- 'already contains sha1 %(sha1)s',
- chatty=True, repo_name=repo_name,
- sha1=sha1)
- except morphlib.gitdir.InvalidRefError:
- self.app.status(msg='Updating %(repo_name)s',
- repo_name=repo_name)
- source.repo.update()
- else:
- self.app.status(msg='Cloning %(repo_name)s',
- repo_name=repo_name)
- source.repo = self.lrc.cache_repo(repo_name)
-
- # Update submodules.
- done = set()
- self.app.cache_repo_and_submodules(
- self.lrc, source.repo.url,
- source.sha1, done)
+ source.repo = self.lrc.get_updated_repo(repo_name, ref=source.sha1)
+ self.lrc.ensure_submodules(source.repo, source.sha1)
def cache_artifacts_locally(self, artifacts):
'''Get artifacts missing from local cache from remote cache.'''
diff --git a/morphlib/exts/kvm.check b/morphlib/exts/kvm.check
index 1bb4007a..b8877a89 100755
--- a/morphlib/exts/kvm.check
+++ b/morphlib/exts/kvm.check
@@ -1,5 +1,5 @@
#!/usr/bin/python
-# Copyright (C) 2014 Codethink Limited
+# Copyright (C) 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -17,6 +17,7 @@
'''Preparatory checks for Morph 'kvm' write extension'''
import cliapp
+import os
import re
import urlparse
@@ -43,8 +44,10 @@ class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension):
ssh_host, vm_name, vm_path = self.check_and_parse_location(location)
self.check_ssh_connectivity(ssh_host)
+ self.check_can_create_file_at_given_path(ssh_host, vm_path)
self.check_no_existing_libvirt_vm(ssh_host, vm_name)
self.check_extra_disks_exist(ssh_host, self.parse_attach_disks())
+ self.check_virtual_networks_are_started(ssh_host)
def check_and_parse_location(self, location):
'''Check and parse the location argument to get relevant data.'''
@@ -73,6 +76,26 @@ class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension):
'write extension to deploy upgrades to existing machines.' %
(ssh_host, vm_name))
+ def check_can_create_file_at_given_path(self, ssh_host, vm_path):
+
+ def check_can_write_to_given_path():
+ try:
+ cliapp.ssh_runcmd(ssh_host, ['touch', vm_path])
+ except cliapp.AppException as e:
+ raise cliapp.AppException("Can't write to location %s on %s"
+ % (vm_path, ssh_host))
+ else:
+ cliapp.ssh_runcmd(ssh_host, ['rm', vm_path])
+
+ try:
+ cliapp.ssh_runcmd(ssh_host, ['test', '-e', vm_path])
+ except cliapp.AppException as e:
+ # vm_path doesn't already exist, so let's test we can write
+ check_can_write_to_given_path()
+ else:
+ raise cliapp.AppException('%s already exists on %s'
+ % (vm_path, ssh_host))
+
def check_extra_disks_exist(self, ssh_host, filename_list):
for filename in filename_list:
try:
@@ -81,4 +104,50 @@ class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension):
raise cliapp.AppException('Did not find file %s on host %s' %
(filename, ssh_host))
+ def check_virtual_networks_are_started(self, ssh_host):
+
+ def check_virtual_network_is_started(network_name):
+ cmd = ['virsh', '-c', 'qemu:///system', 'net-info', network_name]
+ net_info = cliapp.ssh_runcmd(ssh_host, cmd).split('\n')
+
+ def pretty_concat(lines):
+ return '\n'.join(['\t%s' % line for line in lines])
+
+ for line in net_info:
+ m = re.match('^Active:\W*(\w+)\W*', line)
+ if m:
+ break
+ else:
+ raise cliapp.AppException(
+ "Got unexpected output parsing output of `%s':\n%s"
+ % (' '.join(cmd), pretty_concat(net_info)))
+
+ network_active = m.group(1) == 'yes'
+
+ if not network_active:
+ raise cliapp.AppException("Network '%s' is not started"
+ % network_name)
+
+ def name(nic_entry):
+ if ',' in nic_entry:
+ # NETWORK_NAME,mac=12:34,model=e1000...
+ return nic_entry[:nic_entry.find(',')]
+ else:
+ return nic_entry # NETWORK_NAME
+
+ if 'NIC_CONFIG' in os.environ:
+ nics = os.environ['NIC_CONFIG'].split()
+
+ # --network bridge= is used to specify a bridge
+ # --network user is used to specify a form of NAT
+ # (see the virt-install(1) man page)
+ networks = [name(n) for n in nics if not n.startswith('bridge=')
+ and not n.startswith('user')]
+ else:
+ networks = ['default']
+
+ for network in networks:
+ check_virtual_network_is_started(network)
+
+
KvmPlusSshCheckExtension().run()
diff --git a/morphlib/localrepocache.py b/morphlib/localrepocache.py
index 39fbd200..1565b913 100644
--- a/morphlib/localrepocache.py
+++ b/morphlib/localrepocache.py
@@ -14,10 +14,7 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import logging
import os
-import re
-import urllib2
import urlparse
import string
import sys
@@ -246,15 +243,68 @@ class LocalRepoCache(object):
return repo
raise NotCached(reponame)
- def get_updated_repo(self, reponame): # pragma: no cover
- '''Return object representing cached repository, which is updated.'''
+ def get_updated_repo(self, repo_name, ref=None): # pragma: no cover
+ '''Return object representing cached repository.
- if not self._app.settings['no-git-update']:
- cached_repo = self.cache_repo(reponame)
- self._app.status(
- msg='Updating git repository %s in cache' % reponame)
- cached_repo.update()
- else:
- cached_repo = self.get_repo(reponame)
- return cached_repo
+ If 'ref' is None, the repo will be updated unless
+ app.settings['no-git-update'] is set.
+
+ If 'ref' is set to a SHA1, the repo will only be updated if 'ref' isn't
+ already available locally.
+ '''
+
+ if self._app.settings['no-git-update']:
+ self._app.status(msg='Not updating existing git repository '
+ '%(repo_name)s '
+ 'because of no-git-update being set',
+ chatty=True,
+ repo_name=repo_name)
+ return self.get_repo(repo_name)
+
+ if self.has_repo(repo_name):
+ repo = self.get_repo(repo_name)
+ if ref and morphlib.git.is_valid_sha1(ref):
+ try:
+ repo.resolve_ref_to_commit(ref)
+ self._app.status(msg='Not updating git repository '
+ '%(repo_name)s because it '
+ 'already contains sha1 %(sha1)s',
+ chatty=True, repo_name=repo_name,
+ sha1=ref)
+ return repo
+ except morphlib.gitdir.InvalidRefError:
+ pass
+
+ self._app.status(msg='Updating %(repo_name)s',
+ repo_name=repo_name)
+ repo.update()
+ return repo
+ else:
+ self._app.status(msg='Cloning %(repo_name)s',
+ repo_name=repo_name)
+ return self.cache_repo(repo_name)
+
+ def ensure_submodules(self, toplevel_repo,
+ toplevel_ref): # pragma: no cover
+ '''Ensure any submodules of a given repo are cached and up to date.'''
+
+ def submodules_for_repo(repo_path, ref):
+ try:
+ submodules = morphlib.git.Submodules(self._app, repo_path, ref)
+ submodules.load()
+ return [(submod.url, submod.commit) for submod in submodules]
+ except morphlib.git.NoModulesFileError:
+ return []
+
+ done = set()
+ subs_to_process = submodules_for_repo(toplevel_repo.path, toplevel_ref)
+ while subs_to_process:
+ url, ref = subs_to_process.pop()
+ done.add((url, ref))
+
+ cached_repo = self.get_updated_repo(url, ref=ref)
+
+ for submod in submodules_for_repo(cached_repo.path, ref):
+ if (submod.url, submod.commit) not in done:
+ subs_to_process.add((submod.url, submod.commit))