summaryrefslogtreecommitdiff
path: root/scripts/test-minimal-system
blob: 1d9f7ca085144fef33936f5e26f102a605278ece (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/python3
# Copyright (C) 2017 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

'''test-minimal-system: Boots a disk image in QEMU and tests that it works.'''

import argparse
import asyncio
import asyncio.subprocess
import locale
import logging
import sys
import time


QEMU = 'qemu-system-x86_64'

FAILURE_TIMEOUT = 60   # seconds


def argument_parser():
    parser = argparse.ArgumentParser(
        description="Test that a minimal-system VM image works as expected")
    parser.add_argument("sda", help="Path to disk image file")
    return parser


async def await_line(stream, marker):
    '''Read from 'stream' until a line appears that starts with 'marker'.'''
    marker = marker
    async for line in stream:
        decoded_line = line.decode('unicode-escape')
        sys.stdout.write(decoded_line)
        if decoded_line.strip().startswith(marker):
            logging.debug("Matched line with marker: %s", decoded_line)
            return decoded_line


async def run_qemu_test(sda):
    command = [QEMU, '-drive', 'file=%s,format=raw' % sda, '-nographic']

    logging.debug("Starting process: %s", command)
    process = await asyncio.create_subprocess_exec(
        *command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)

    success = False
    try:
        init_banner = await await_line(process.stdout, "init started: BusyBox")
        print("Got BusyBox init banner:", init_banner)
        assert init_banner != None

        process.stdin.write('\nuname -a\n'.encode('ascii'))

        uname = await await_line(process.stdout, "Linux")
        print("Got `uname -a` output:", uname)
        assert uname != None

        print("Test successful")
        success = True
    except asyncio.CancelledError:
        # Move straight to process.kill()
        pass
    finally:
        process.kill()
    await process.wait()
    return success


def fail_timeout(qemu_task):
    sys.stderr.write("Test failed as timeout of %i seconds was reached.\n" %
                     FAILURE_TIMEOUT)
    qemu_task.cancel()


def main():
    args = argument_parser().parse_args()

    loop = asyncio.get_event_loop()
    qemu_task = loop.create_task(run_qemu_test(args.sda))
    loop.call_later(FAILURE_TIMEOUT, fail_timeout, qemu_task)
    loop.run_until_complete(qemu_task)
    loop.close()

    if qemu_task.result():
        return 0
    else:
        return 1


result = main()
sys.exit(result)