summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/test-minimal-system103
1 files changed, 103 insertions, 0 deletions
diff --git a/scripts/test-minimal-system b/scripts/test-minimal-system
new file mode 100755
index 00000000..1d9f7ca0
--- /dev/null
+++ b/scripts/test-minimal-system
@@ -0,0 +1,103 @@
+#!/usr/bin/python3
+# Copyright (C) 2017 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+'''test-minimal-system: Boots a disk image in QEMU and tests that it works.'''
+
+import argparse
+import asyncio
+import asyncio.subprocess
+import locale
+import logging
+import sys
+import time
+
+
+QEMU = 'qemu-system-x86_64'
+
+FAILURE_TIMEOUT = 60 # seconds
+
+
+def argument_parser():
+ parser = argparse.ArgumentParser(
+ description="Test that a minimal-system VM image works as expected")
+ parser.add_argument("sda", help="Path to disk image file")
+ return parser
+
+
+async def await_line(stream, marker):
+ '''Read from 'stream' until a line appears that starts with 'marker'.'''
+ marker = marker
+ async for line in stream:
+ decoded_line = line.decode('unicode-escape')
+ sys.stdout.write(decoded_line)
+ if decoded_line.strip().startswith(marker):
+ logging.debug("Matched line with marker: %s", decoded_line)
+ return decoded_line
+
+
+async def run_qemu_test(sda):
+ command = [QEMU, '-drive', 'file=%s,format=raw' % sda, '-nographic']
+
+ logging.debug("Starting process: %s", command)
+ process = await asyncio.create_subprocess_exec(
+ *command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+
+ success = False
+ try:
+ init_banner = await await_line(process.stdout, "init started: BusyBox")
+ print("Got BusyBox init banner:", init_banner)
+ assert init_banner != None
+
+ process.stdin.write('\nuname -a\n'.encode('ascii'))
+
+ uname = await await_line(process.stdout, "Linux")
+ print("Got `uname -a` output:", uname)
+ assert uname != None
+
+ print("Test successful")
+ success = True
+ except asyncio.CancelledError:
+ # Move straight to process.kill()
+ pass
+ finally:
+ process.kill()
+ await process.wait()
+ return success
+
+
+def fail_timeout(qemu_task):
+ sys.stderr.write("Test failed as timeout of %i seconds was reached.\n" %
+ FAILURE_TIMEOUT)
+ qemu_task.cancel()
+
+
+def main():
+ args = argument_parser().parse_args()
+
+ loop = asyncio.get_event_loop()
+ qemu_task = loop.create_task(run_qemu_test(args.sda))
+ loop.call_later(FAILURE_TIMEOUT, fail_timeout, qemu_task)
+ loop.run_until_complete(qemu_task)
+ loop.close()
+
+ if qemu_task.result():
+ return 0
+ else:
+ return 1
+
+
+result = main()
+sys.exit(result)