#!/usr/bin/python
#
# Copyright (C) 2014 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


''' Baserock test suite for Trove upgrades.

Trove is the most complex system that we currently have in Baserock, so we
test the Baserock toolset's upgrade functionality using a Trove system. This
will one day be a MUSTARD Loom Yarn.

This test must be run on a Baserock devel system which has passwordless SSH
access to the KVM host specified as 'DEPLOY_URL' in the 'config' module. The
tests deploy a VM named 'brtests-$host', so a single KVM host can be used by
multiple test machines, as long as each test machine has only one test
running at a time.

Ideas for improvement:

  - use https://github.com/paramiko/paramiko (an SSH library for Python)

How to fit this into Yarn:

  - I don't know! We would need to pass state in a totally different way
    (via the environment).
  - It should be enough to break the tests into function calls that save
    state via pickle or the environment. An annoying extra layer of
    indirection on an already complex codebase, though.

Helpful advice:

  - There is a '--reuse-fixture' option which reuses 'GIVEN' state for a
    test suite instead of deploying a new VM (which takes ~5 minutes).
  - Deployment is broken into separate create_config(), run_build() and
    run_deploy() steps, so that you can comment out calls to one or more of
    these when iterating on a specific test.

'''
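
# Example invocation (illustrative only -- the script filename below is
# whatever this file is saved as; '--reuse-fixture' is the option defined by
# SimpleTestRunner.add_settings() at the bottom of this file):
#
#   python ./trove-upgrade-tests.py --reuse-fixture=/path/to/fixture-workspace
#
# Without '--reuse-fixture', a fresh fixture workspace is created under
# config.BUILD_TEMPDIR, the old test VM (if any) is deleted, and a new one
# is deployed from scratch.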

import cliapp
import contextlib
import distutils.version
import os
import shutil
import socket
import subprocess
import sys
import tempfile
import time
import urlparse

import yaml

# In general, tests should treat Morph as a black box rather than calling
# functions in morphlib to do stuff. It's better than copying and pasting
# stuff, though!
import morphlib

import config
import util
from util import read_file, write_file
from util import remote_runcmd
from util import run_git, run_morph
from util import set_directory


class Deployment(object):
    ''' Base class for the context of an initial deployment or upgrade.

    Creating config, building the system and doing the deployment are
    deliberately separated because building and deploying are currently slow
    operations even when repeating an identical build or deployment, so it's
    often necessary during development to comment out these steps so that
    the edit-test cycle is not impossibly long.

    '''

    def __init__(self, system_morph_name, deploy_morph_name, systembranch):
        self.branch = systembranch
        self.name = config.DEPLOY_NAME
        self.system_morph_name = system_morph_name
        self.deploy_morph_name = deploy_morph_name

        loader = morphlib.morphloader.MorphologyLoader()
        system_morph_file = os.path.join(
            self.branch.morphs_dir, system_morph_name + '.morph')
        self.system_morph = loader.load_from_file(system_morph_file)
        self.arch = self.system_morph['arch']

    def create_config(self):
        pass

    def run_build(self):
        controller = config.DISTBUILD_INITIATORS[self.arch]
        command = [
            'distbuild',
            '--controller-initiator-address=%s' % controller,
            self.system_morph_name]
        run_morph(command, cwd=self.branch.branch_dir)

    def run_deploy(self, autostart=False):
        autostart_param = \
            '%s.AUTOSTART=%s' % (self.name, 'yes' if autostart else 'no')
        run_morph(
            ['deploy', self.deploy_morph_name, autostart_param],
            cwd=self.branch.branch_dir)

    def create_ssh_key(self, key_name):
        file_path = os.path.join(self.branch.morphs_dir, '%s.key' % key_name)

        # The '-N ""' is very important here: otherwise ssh-keygen will block
        # waiting for input. If you try to pass this command to
        # cliapp.runcmd() as a list it will mangle the quotes.
        comment = "Generated by Baserock automated tests for '%s'" % key_name
        keygen_cmd = 'ssh-keygen -N "" -t rsa -b 2048 -f %s -C "%s"' % \
            (file_path, comment)
        cliapp.runcmd(['sh', '-c', keygen_cmd])

    def ensure_configure_extension_enabled(self, extension_name):
        system_morph = os.path.join(
            self.branch.morphs_dir, '%s.morph' % self.system_morph_name)
        morph = yaml.load(read_file(system_morph))
        if extension_name not in morph['configuration-extensions']:
            morph['configuration-extensions'].append(extension_name)
            write_file(system_morph, yaml.dump(morph))

    def add_root_ssh_authorized_key(self, public_key_text):
        self.ensure_configure_extension_enabled('install-files')

        deploy_files_dir = os.path.join(
            self.branch.morphs_dir, '%s-files' % self.deploy_morph_name)
        root_ssh_dir = os.path.join(deploy_files_dir, 'root', '.ssh')
        os.makedirs(root_ssh_dir)

        with open(os.path.join(root_ssh_dir, 'authorized_keys'), 'w') as f:
            f.write("# Added by Baserock automated test runner\n")
            f.write(public_key_text)

        with open(os.path.join(deploy_files_dir, 'manifest'), 'a') as f:
            f.write('0040700 0 0 /root/.ssh/\n')
            f.write('0100644 0 0 /root/.ssh/authorized_keys\n')

    def wait_for_ssh(self, timeout=config.BOOT_TIMEOUT, **kwargs):
        print "Waiting for machine to respond over SSH ..."

        start_time = time.time()
        while True:
            try:
                print self.runcmd(['whoami'], **kwargs)
                break
            except cliapp.AppException as e:
                if time.time() < start_time + timeout:
                    # Assume that this is because sshd hasn't started yet.
                    pass
                else:
                    print("Waited > %s seconds for host %s to respond over "
                          "SSH" % (timeout, self.name))
                    raise
            time.sleep(0.5)

    def runcmd(self, command, **kwargs):
        url = 'ssh://root@%s/' % self.name
        return remote_runcmd(url, command, **kwargs)


class TroveInitialDeployment(Deployment):
    ''' Wraps creating the necessary files for a Trove deployment.

    Attribute 'deploy_morph_name' contains a value to be passed to
    'morph deploy'.

    '''
    def __init__(self, systembranch):
        super(TroveInitialDeployment, self).__init__(
            'trove-system-x86_64', 'trove-test-deploy', systembranch)

    def create_config(self, initial_deploy_type='kvm'):
        self.create_ssh_key('lorry')
        self.create_ssh_key('mason')
        self.create_ssh_key('worker')
        self.create_ssh_key('testuser')

        if initial_deploy_type == 'kvm':
            deploy_location = ''.join(
                [config.DEPLOY_URL, self.name, config.DEPLOY_PATH,
                 '%s.img' % self.name])
        else:
            raise NotImplementedError()

        self.create_trove_deployment_morph(
            deploy_type='kvm', location=deploy_location)

        testuser_public_key = read_file(
            os.path.join(self.branch.morphs_dir, 'testuser.key.pub'))
        self.add_root_ssh_authorized_key(testuser_public_key)

        self.admin_id = os.path.join(self.branch.morphs_dir, 'testuser.key')

    def create_trove_deployment_morph(self, deploy_type=None, location=None):
        trove_config = dict(
            type=deploy_type,
            location=location,

            DISK_SIZE='3G',
            VERSION_LABEL='trove-old',

            INSTALL_FILES='%s-files/manifest' % self.deploy_morph_name,

            TROVE_ID=self.name,
            TROVE_COMPANY='Codethink',
            #UPSTREAM_TROVE='git.baserock.org'
            #UPSTREAM_TROVE_USER='nobody'
            #UPSTREAM_TROVE_EMAIL='nobody@example.com'
            TROVE_ADMIN_USER='testuser',
            TROVE_ADMIN_EMAIL='test@example.com',
            TROVE_ADMIN_NAME='Automated Test Gitano Admin User',
            TROVE_ADMIN_SSH_PUBKEY='testuser.key.pub',

            LORRY_SSH_KEY='lorry.key',
            WORKER_SSH_PUBKEY='worker.key.pub',
            MASON_SSH_PUBKEY='mason.key.pub',
        )

        cluster_morph = dict(
            name=self.deploy_morph_name,
            kind='cluster',
            description='Generated by Baserock automated tests',
            systems=[
                dict(
                    morph=self.system_morph_name,
                    deploy={
                        self.name: trove_config
                    }
                )
            ]
        )

        text = yaml.dump(cluster_morph)
        file_path = os.path.join(
            self.branch.morphs_dir, '%s.morph' % self.deploy_morph_name)
        with open(file_path, 'w') as f:
            f.write(text)
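
# For reference, the cluster morphology written out by
# TroveInitialDeployment.create_trove_deployment_morph() above looks roughly
# like this (key order and exact formatting depend on yaml.dump(); the
# location string is assembled from config.DEPLOY_URL, config.DEPLOY_NAME
# and config.DEPLOY_PATH):
#
#   name: trove-test-deploy
#   kind: cluster
#   description: Generated by Baserock automated tests
#   systems:
#   - morph: trove-system-x86_64
#     deploy:
#       <DEPLOY_NAME>:
#         type: kvm
#         location: <DEPLOY_URL><DEPLOY_NAME><DEPLOY_PATH><DEPLOY_NAME>.img
#         VERSION_LABEL: trove-old
#         DISK_SIZE: 3G
#         INSTALL_FILES: trove-test-deploy-files/manifest
#         TROVE_ID: <DEPLOY_NAME>
#         ...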

class TroveUpgrade(Deployment):
    def __init__(self, systembranch):
        super(TroveUpgrade, self).__init__(
            'trove-system-x86_64', 'trove-test-upgrade', systembranch)

    def run_deploy(self, autostart=False):
        autostart_param = \
            '%s.AUTOSTART=%s' % (self.name, 'yes' if autostart else 'no')
        run_morph(
            ['deploy', self.deploy_morph_name, '--upgrade', autostart_param],
            cwd=self.branch.branch_dir)

    def create_config(self, initial_deployment, upgrade_method='ssh-rsync',
                      version_label='trove-current'):
        self.ensure_configure_extension_enabled('install-files')

        if upgrade_method == 'ssh-rsync':
            location = 'root@%s' % self.name
        else:
            raise NotImplementedError()

        self.create_trove_upgrade_morph(
            initial_deployment, upgrade_method=upgrade_method,
            location=location, version_label=version_label)

    def create_trove_upgrade_morph(self, initial_deployment, upgrade_method,
                                   location, version_label):
        ''' FIXME: this is totally wrong! Instead of having to provide
        exactly the config that the initial deployment used, we should avoid
        configuration extensions for upgrades entirely and propagate the
        deploy-time configuration using baserock-system-config-sync.

        '''
        def copy_file_from_initial_deployment(filename, dest_filename=None):
            src = os.path.join(initial_deployment.branch.morphs_dir,
                               filename)
            dest = os.path.join(self.branch.morphs_dir,
                                dest_filename or filename)
            shutil.copyfile(src, dest)

        def copy_dir_from_initial_deployment(dirname):
            src = os.path.join(initial_deployment.branch.morphs_dir, dirname)
            dest = os.path.join(self.branch.morphs_dir, dirname)
            shutil.copytree(src, dest)

        for key in ['testuser', 'lorry', 'worker', 'mason']:
            copy_file_from_initial_deployment('%s.key' % key)
            copy_file_from_initial_deployment('%s.key.pub' % key)

        copy_file_from_initial_deployment(
            '%s.morph' % initial_deployment.deploy_morph_name,
            '%s.morph' % self.deploy_morph_name)
        copy_dir_from_initial_deployment(
            '%s-files' % initial_deployment.deploy_morph_name)

        deploy_morph_file = os.path.join(
            self.branch.morphs_dir, '%s.morph' % self.deploy_morph_name)
        deploy_morph = yaml.load(read_file(deploy_morph_file))

        deploy_morph['name'] = self.deploy_morph_name
        system_config = deploy_morph['systems'][0]['deploy'][self.name]
        system_config['type'] = upgrade_method
        system_config['location'] = location
        system_config['VERSION_LABEL'] = version_label

        write_file(deploy_morph_file, yaml.dump(deploy_morph))


class SystemTestBranch():
    '''Morph system-branch abstraction for use in Baserock system tests.'''

    def __init__(self, workspace_dir, name):
        self.workspace_dir = workspace_dir
        self.branch_dir = os.path.join(workspace_dir, name)
        self.morphs_dir = os.path.join(
            self.branch_dir, 'baserock', 'baserock', 'definitions')
        self.sysbranch = morphlib.sysbranchdir.open(self.branch_dir)

    def get_repo_and_ref_for_chunk(self, stratum, chunk):
        stratum_morph_file = os.path.join(
            self.morphs_dir, '%s.morph' % stratum)
        stratum_morph = yaml.load(read_file(stratum_morph_file))

        for chunkref in stratum_morph['chunks']:
            if chunkref['name'] == chunk:
                return chunkref['repo'], chunkref['ref']
        else:
            raise Exception(
                "Chunk %s not found in stratum %s" % (chunk, stratum))

    def _copy_chunk_morph_if_missing(self, chunk_dir, chunk, original_ref):
        chunk_morph_name = '%s.morph' % chunk
        with set_directory(chunk_dir):
            if os.path.exists(chunk_morph_name):
                return

            output = run_git(['ls-tree', original_ref, chunk_morph_name])
            original_ref_has_chunk_morph = (len(output) > 0)
            if not original_ref_has_chunk_morph:
                return

            object_name = original_ref + ':' + chunk_morph_name
            with open(chunk_morph_name, 'w') as f:
                run_git(['cat-file', 'blob', object_name], stdout=f)

            message = 'Add chunk from branch %s' % config.BRANCH
            run_git(['add', chunk_morph_name])
            run_git(['commit', '-m', message])

    def set_chunk_version(self, system, stratum, chunk,
                          ref='baserock/morph'):
        ''' Force a specific version of a chunk to be built in a system.

        Uses `morph edit` and `git reset` to achieve this.

        '''
        chunk_repo_url, original_ref = self.get_repo_and_ref_for_chunk(
            stratum, chunk)

        with set_directory(self.morphs_dir):
            run_morph(['edit', system, stratum, chunk])
            run_git(['add', '%s.morph' % stratum])
            run_git(['commit', '-m', 'Edit %s chunk' % chunk])

        chunk_dir = self.sysbranch.get_git_directory_name(chunk_repo_url)
        run_git(['reset', '--hard', ref], cwd=chunk_dir)

        self._copy_chunk_morph_if_missing(chunk_dir, chunk, original_ref)


class TestInitialDeployment(object):
    ''' FIXME: this is out of date! Make it use the 'BaseTestSuite' class
    instead!

    '''

    def initial_deploy(self, branch, **deploy_kwargs):
        ''' Initial deployment of trove-system-x86_64 to a newly-created VM.

        Returns a context with the following things tied to it:

          - the VM itself (FIXME: it isn't actually deleted when the context
            exits)
          - an SSH identity added to the machine's SSH agent that provides
            root access to the deployed VM

        '''
        if self.settings['reuse-workspace'] is not None:
            # Hack to reuse an existing workspace and running VM, because
            # `morph deploy` currently takes several minutes.
            class ReuseTroveDeploy(TroveDeployment):
                def __init__(self, systembranch):
                    self.branch = systembranch
                    self.set_conveniences()
            trove_deploy = ReuseTroveDeploy(branch)
        else:
            trove_deploy = TroveDeployment(branch, **deploy_kwargs)

    def test_rawdisk_upgrade(self, workspace_dir):
        branch = self.create_system_branch(workspace_dir, 'testbranch')

        # Script should:
        #  - deploy trove
        #  - apply patch in system branch
        #  - deploy trove as an upgrade
        # FIXME: doesn't perform an upgrade, yet. Should we keep the rawdisk
        # upgrade path?

        run_morph(
            ['build', 'trove-system-x86_64'], cwd=branch.branch_dir)

        image_path = os.path.join(
            branch.workspace_dir, 'deployed-system.img')
        trove_deploy = TroveDeployment(
            branch, deploy_type='rawdisk', location=image_path)

        run_morph(['deploy', trove_deploy.deploy_morph_name],
                  cwd=branch.branch_dir)


class TimeoutError(Exception):
    pass


class BaseTestSuite(object):
    def wait_for_hostname_to_appear(self, hostname, timeout=10):
        ''' Block until the given hostname resolves successfully.

        Raises TimeoutError if the hostname has not appeared within
        'timeout' seconds.

        '''
        start_time = time.time()
        while True:
            try:
                socket.gethostbyname(hostname)
                return time.time() - start_time
            except socket.gaierror as e:
                pass
            if time.time() > start_time + timeout:
                raise TimeoutError(
                    "Host %s did not appear after %i seconds" %
                    (hostname, timeout))
            time.sleep(0.5)

    def wait_for_machine_to_boot(self, instance):
        wait_time = self.wait_for_hostname_to_appear(
            instance.name, timeout=config.BOOT_TIMEOUT)
        if config.VERBOSE:
            print "Host %s appeared after %0.1f seconds" % \
                (instance.name, wait_time)

        # Remove the machine from 'known_hosts', as its identity has
        # probably changed.
        cliapp.runcmd(['ssh-keygen', '-R', instance.name])

        instance.wait_for_ssh(timeout=config.BOOT_TIMEOUT - wait_time)

    def create_system_branch(self, workspace_dir, name,
                             parent=config.BRANCH):
        run_morph(
            ['branch', 'baserock:baserock/definitions', name, parent],
            cwd=workspace_dir)
        return SystemTestBranch(workspace_dir, name)


class TestUpgrades(BaseTestSuite):
    ''' IMPORTANT NOTE: a lot of these tests involve downgrading lighttpd to
    version 1.3.14; this version of lighttpd cannot parse the
    /etc/lighttpd.conf file that is in use on a modern Trove! So you will
    find lighttpd fails on the old system but not on the new system! This
    behaviour should either be incorporated into the tests, or we should
    downgrade a different chunk so as not to break things!

    '''

    def upgrade_to_latest_trove(self, workspace_dir, instance):
        branch = self.create_system_branch(workspace_dir, 'current')

        upgrade = TroveUpgrade(branch)
        upgrade.create_config(instance, upgrade_method='ssh-rsync',
                              version_label='trove-current-2')
        upgrade.run_build()
        upgrade.run_deploy(autostart=True)

        self.wait_for_machine_to_boot(instance)

    def rollback_system_to_factory(self, instance, version_label):
        try:
            instance.runcmd(
                ['system-version-manager', 'set-default', version_label])
            instance.runcmd(['reboot'])
        except cliapp.AppException:
            # Bit of a hack because we get disconnected before the command
            # exits, so SSH returns failure.
            pass

        self.wait_for_machine_to_boot(instance)

    def get_lighttpd_version(self, instance, expected_start=None):
        text = instance.runcmd(['lighttpd', '-v'])

        version_string = text.split(' ')[0]
        version = distutils.version.LooseVersion(
            version_string[len('lighttpd-'):])

        if config.VERBOSE:
            print "lighttpd output: %s (%s)" % (text, version.version)
        if expected_start is not None:
            assert text.startswith(expected_start)
        return version

    @contextlib.contextmanager
    def given_out_of_date_trove_instance(self, fixture_dir, reuse=False):
        ''' GIVEN a running current Trove system but with lighttpd version
        1.3.14

        '''
        if reuse:
            branch = SystemTestBranch(fixture_dir, 'old')
            instance = TroveInitialDeployment(branch)
            instance.admin_id = os.path.join(branch.morphs_dir,
                                             'testuser.key')
        else:
            branch = self.create_system_branch(fixture_dir, 'old')
            branch.set_chunk_version(
                system='trove-system-x86_64', stratum='trove',
                chunk='lighttpd', ref='lighttpd-1.3.14')

            instance = TroveInitialDeployment(branch)
            instance.create_config()
            instance.run_build()
            instance.run_deploy(autostart=True)

        cliapp.runcmd(['ssh-add', instance.admin_id])
        try:
            self.wait_for_machine_to_boot(instance)
            self.get_lighttpd_version(instance,
                                      expected_start='lighttpd-1.3.14')
            yield instance
        finally:
            # Should pass the .pub file really ...
            cliapp.runcmd(['ssh-add', '-d', instance.admin_id])

    def test_scenario_trove_upgrade(self, fixture_dir, workspace_dir,
                                    reuse_fixture=False):
        ''' We want to be able to upgrade an old Trove system to the latest
        Trove system.

        While in the real world the user would only want to deploy a
        released Trove system, that would preclude using this test in
        continuous integration: we would only notice breakages *after* we
        had made a release, and the tests would need to be updated for every
        release. Better to test that deploying 'master' of Trove still
        works.

        We artificially create an 'out of date' Trove system because we need
        to know what to test for (and there is only one public release of
        Trove at the time of writing). This is more useful than just
        deploying an upgrade and assuming that if there were no errors from
        the Baserock deployment tool then it must have been successful.
        Lighttpd is used in the test because it triggers very few rebuilds.

        Access to Trove is via SSH, so this also tests that the authorized
        SSH keys for the root account are correctly shared between the
        system versions. If they are not, the test will not be able to
        access the upgraded machine.

        SCENARIO Bob upgrades his Trove (vague version)
        GIVEN a running, out-of-date Trove system
        WHEN Bob builds and upgrades to the current version of Trove with
             AUTOSTART=1
        THEN the Trove is at the new version

        SCENARIO Bob upgrades his Trove (specific version)
        GIVEN a running current Trove system but with lighttpd version
              1.3.14
        WHEN Bob upgrades to the current version of Trove and sets it to
             autostart immediately
        THEN the Trove uses a newer version of lighttpd than 1.3.14
        WHEN Bob rolls back to the old version of Trove
        THEN the Trove uses lighttpd version 1.3.14

        '''
        with self.given_out_of_date_trove_instance(
                fixture_dir, reuse=reuse_fixture) as instance:
            old_lighttpd_version = self.get_lighttpd_version(instance)

            self.upgrade_to_latest_trove(workspace_dir, instance)
            new_lighttpd_version = self.get_lighttpd_version(instance)

            # 'trove-old' is the VERSION_LABEL given to the initial
            # deployment in create_trove_deployment_morph().
            self.rollback_system_to_factory(instance, 'trove-old')
            rollback_lighttpd_version = self.get_lighttpd_version(instance)

            if config.VERBOSE:
                print "Base system lighttpd version: %s" % \
                    old_lighttpd_version
                print "Upgraded system lighttpd version: %s" % \
                    new_lighttpd_version
                print "Lighttpd version after rollback: %s" % \
                    rollback_lighttpd_version

            assert new_lighttpd_version > old_lighttpd_version
            assert old_lighttpd_version == rollback_lighttpd_version

    def get_linux_version(self, instance, expected_start=None):
        text = instance.runcmd(['uname', '--kernel-release'])
        version = distutils.version.LooseVersion(text)

        if config.VERBOSE:
            print "uname output: %s (%s)" % (text, version.version)
        if expected_start is not None:
            assert text.startswith(expected_start)
        return version

    @contextlib.contextmanager
    def given_trove_instance_with_old_kernel(self, fixture_dir, reuse=False):
        ''' GIVEN a running current Trove system but with Linux 3.6

        '''
        if reuse:
            branch = SystemTestBranch(fixture_dir, 'old')
            instance = TroveInitialDeployment(branch)
            instance.admin_id = os.path.join(branch.morphs_dir,
                                             'testuser.key')
        else:
            branch = self.create_system_branch(fixture_dir, 'old')
            branch.set_chunk_version(
                system='trove-system-x86_64', stratum='bsp-x86_64-generic',
                chunk='linux', ref='v3.6')

            instance = TroveInitialDeployment(branch)
            instance.create_config()
            instance.run_build()
            instance.run_deploy(autostart=True)

        cliapp.runcmd(['ssh-add', instance.admin_id])
        try:
            self.wait_for_machine_to_boot(instance)
            self.get_linux_version(instance, expected_start='3.6')
            yield instance
        finally:
            # Should pass the .pub file really ...
            cliapp.runcmd(['ssh-add', '-d', instance.admin_id])

    def test_scenario_trove_kernel_upgrade(self, fixture_dir, workspace_dir,
                                           reuse_fixture=False):
        ''' The kernel requires special treatment by the upgrade mechanism
        because it is the first thing that loads.

        SCENARIO Bob upgrades his Trove (vague version)
        GIVEN a Trove system with Linux 3.6
        WHEN Bob builds and upgrades to the current version of Trove with
             AUTOSTART=1
        THEN the Trove uses the standard version of Linux used in Baserock
             master
        WHEN Bob rolls back to the old version of the system
        THEN the Trove uses Linux 3.6

        '''
        with self.given_trove_instance_with_old_kernel(
                fixture_dir, reuse=reuse_fixture) as instance:
            old_linux_version = self.get_linux_version(instance)

            self.upgrade_to_latest_trove(workspace_dir, instance)
            new_linux_version = self.get_linux_version(instance)

            self.rollback_system_to_factory(instance, 'trove-old')
            rollback_linux_version = self.get_linux_version(
                instance, expected_start='3.6')

            if config.VERBOSE:
                print "Base system linux version: %s" % old_linux_version
                print "Upgraded system linux version: %s" % \
                    new_linux_version
                print "Rollback linux version: %s" % rollback_linux_version

            assert old_linux_version == rollback_linux_version
            assert new_linux_version > old_linux_version

    def test_scenario_trove_upgrade_shared_user_data(
            self, fixture_dir, workspace_dir, reuse_fixture=False):
        ''' Ensure that the user does not lose data when upgrading in the
        brave new world of separating the base OS from the OS state and the
        user data.

        There are some holes at the moment:

          - Anything the user puts in / directly will be lost on upgrade.
          - Anything the user puts in /usr, /lib, /bin or /sbin will be lost
            on upgrade.

        This test is simple because currently the method used to share the
        state is to mount a shared subvolume, so the only thing that can go
        wrong is for all of a directory's contents to be missing.

        SCENARIO Bob upgrades his Trove
        GIVEN a running, out-of-date Trove system
        AND files in /opt, /srv, /home, /root and /var
        WHEN Bob builds and upgrades to the current version of Trove with
             AUTOSTART=1
        THEN the files are present in the new Trove

        '''
        statedirs = ['/home', '/opt', '/root', '/srv', '/var']

        with self.given_out_of_date_trove_instance(
                fixture_dir, reuse=reuse_fixture) as instance:
            for statedir in statedirs:
                cmd = '\"echo \\"Test user data\\" > %s/TEST_FILE\"' % \
                    statedir
                instance.runcmd(['sh', '-c', cmd])

            self.upgrade_to_latest_trove(workspace_dir, instance)

            for statedir in statedirs:
                test_file = '%s/TEST_FILE' % statedir
                content = instance.runcmd(['cat', test_file])
                assert content == "Test user data\n"

    def test_scenario_trove_upgrade_user_accounts(
            self, fixture_dir, workspace_dir, reuse_fixture=False):
        ''' Ensure a user account added in the original system version still
        works in the new system version.

        This tests /etc syncing, although all tests exercise this to a
        certain degree, because if e.g. /etc/hostname is not correct in the
        upgraded system then the test will lose SSH access and fail.

        SCENARIO Bob upgrades his Trove
        GIVEN a running, out-of-date Trove system
        AND a user account 'bob'
        WHEN Bob builds and upgrades to the current version of Trove with
             AUTOSTART=1
        THEN the user account 'bob' is present and functional

        '''
        with self.given_out_of_date_trove_instance(
                fixture_dir, reuse=reuse_fixture) as instance:
            # Set up Bob's account without a password.
            instance.runcmd(['adduser', '-D', 'bob'])

            whoami = instance.runcmd(['su', 'bob', '-c', 'whoami'])
            home = instance.runcmd(['su', 'bob', '-c', '\"cd && pwd\"'])
            assert whoami == "bob\n"
            assert home == "/home/bob\n"

            self.upgrade_to_latest_trove(workspace_dir, instance)

            whoami = instance.runcmd(['su', 'bob', '-c', 'whoami'])
            home = instance.runcmd(['su', 'bob', '-c', '\"cd && pwd\"'])
            assert whoami == "bob\n"
            assert home == "/home/bob\n"


class SimpleTestRunner(cliapp.Application):
    ''' Run a Baserock system test suite.

    There is a test-suite-wide Morph workspace provided, which should be
    shared by all prerequisite ('GIVEN') implementations. This is called the
    'fixture_dir'. Multiple GIVEN implementations in a single test suite
    should use differently-named system branches to avoid conflicting with
    each other. It is up to the test suite's GIVEN implementations to deal
    with a directory that already contains their data, without failing.

    Each test gets another workspace, the 'workspace_dir'. This is per-test
    and should be used by the 'WHEN' implementations.

    Since initial deployments currently take several minutes, it is very
    useful to reuse 'GIVEN' state instead of recreating it each time when
    working on a specific test suite.

    '''
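
    # Illustrative layout (not enforced anywhere; the branch names are the
    # ones hard-coded in TestUpgrades above):
    #
    #   <fixture_dir>/old/        system branch for the out-of-date Trove
    #                             ('GIVEN' state, reusable via
    #                             '--reuse-fixture')
    #   <workspace_dir>/current/  system branch used to build and deploy
    #                             the upgrade ('WHEN' state, per test run)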

    def check_access_to_deploy_host(self):
        # From: https://stackoverflow.com/questions/3830508/check-if-passwordless-access-has-been-setup
        deploy_url = urlparse.urlsplit(config.DEPLOY_URL)
        assert deploy_url[0] == 'kvm+ssh'
        try:
            cliapp.runcmd(
                ['ssh', '-o', 'NumberOfPasswordPrompts=0', deploy_url[1],
                 'whoami'])
        except cliapp.AppException:
            raise cliapp.AppException(
                "No passwordless access to deploy host '%s'. Check the SSH "
                "authorized keys for the remote account." % deploy_url[1])

    def maybe_delete_vm(self, vm_name):
        # FIXME: it would be better if this checked whether the machine was
        # running before destroying it, and whether it exists before
        # undefining it, rather than just ignoring exceptions.
        def run_virsh(args):
            try:
                remote_runcmd(
                    config.DEPLOY_URL,
                    ['virsh', '-c', 'qemu:///system'] + args)
            except cliapp.AppException as e:
                pass

        run_virsh(['destroy', vm_name])
        run_virsh(['undefine', vm_name])

    def add_settings(self):
        self.settings.string(
            ['reuse-fixture', 'r'],
            'use an existing deployment from a test fixture instead of '
            'building a clean one, to avoid repeating a slow morph deploy',
            metavar='DIR', default=None)

    def process_args(self, args):
        self.check_access_to_deploy_host()

        if self.settings['reuse-fixture'] is None:
            self.maybe_delete_vm(config.DEPLOY_NAME)

        self.run_test()

    def run_test(self):
        #test = TestUpgrades().test_scenario_trove_upgrade
        #test = TestUpgrades().test_scenario_trove_kernel_upgrade
        #test = TestUpgrades().test_scenario_trove_upgrade_shared_user_data
        test = TestUpgrades().test_scenario_trove_upgrade_user_accounts

        if self.settings['reuse-fixture'] is not None:
            fixture_dir = self.settings['reuse-fixture']
        else:
            fixture_dir = cliapp.runcmd(
                ['mktemp', '-d', '-p', config.BUILD_TEMPDIR]).strip()
            run_morph(['init', fixture_dir])

        # `morph init` expects the workspace dir to be empty, but once it's
        # created we can put the logs in there too. Why not.
        config.log_dir = fixture_dir

        try:
            print "Running %s" % test

            workspace_dir = cliapp.runcmd(
                ['mktemp', '-d', '-p', config.BUILD_TEMPDIR]).strip()
            try:
                run_morph(['init', workspace_dir])

                reuse_fixture = self.settings['reuse-fixture'] is not None
                test(fixture_dir, workspace_dir,
                     reuse_fixture=reuse_fixture)
            finally:
                print "Workspace kept in %s" % workspace_dir
                #cliapp.runcmd(['rm', '-r', workspace_dir])
        except Exception as e:
            import pdb
            print 'Exception: ', e
            pdb.post_mortem(sys.exc_traceback)
        finally:
            # Careful now!
            print "Fixture kept in %s" % fixture_dir
            #cliapp.runcmd(['rm', '-r', fixture_dir])


if __name__ == '__main__':
    SimpleTestRunner().run()