summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <lars.wirzenius@codethink.co.uk>2011-12-09 17:03:04 +0000
committerLars Wirzenius <lars.wirzenius@codethink.co.uk>2011-12-09 17:14:56 +0000
commitd441caf6b9620dbe10eb51d31d6d6055c9161ae0 (patch)
treedf2e78c151ee0404fa467e6755bbe78517cd65e6
parentc90af8cff7819d40d0fdd14f1ea9c162f7d5d7d2 (diff)
downloadmorph-d441caf6b9620dbe10eb51d31d6d6055c9161ae0.tar.gz
add a "morph test" command
-rwxr-xr-xmorph50
-rw-r--r--morphlib/__init__.py1
-rw-r--r--morphlib/tester.py156
3 files changed, 207 insertions, 0 deletions
diff --git a/morph b/morph
index eda5cf58..84ccefbc 100755
--- a/morph
+++ b/morph
@@ -55,7 +55,27 @@ class Morph(cliapp.Application):
'DANGEROUS and will install stuff on your '
'system')
+ self.settings.boolean(['test-console'],
+ 'show what the system outputs on the serial '
+ 'console during tests')
+ self.settings.integer(['test-timeout'],
+ 'abort test if system doesn\'t produce '
+ 'expected output in TIMEOUT seconds '
+ '(default: %default)',
+ metavar='TIMEOUT',
+ default=10)
+
def cmd_build(self, args):
+ '''Build a binary from a morphology.
+
+ Command line arguments are the repository, git tree-ish reference,
+ and morphology filename. The binary gets put into the cache.
+
+ (The triplet of command line arguments may be repeated as many
+ times as necessary.)
+
+ '''
+
tempdir = morphlib.tempdir.Tempdir()
builder = morphlib.builder.Builder(tempdir, self)
@@ -73,6 +93,36 @@ class Morph(cliapp.Application):
if args:
raise cliapp.AppException('Extra args on command line: %s' % args)
+ def cmd_testsysimg(self, args):
+ '''Run tests for a built system image.
+
+ Command line arguments are the filename of the system image,
+ and the filenames of the Python modules that contain the test
+ "stories". Each module must have a variable called "story",
+ which is a list of tuples. Each tuple is either two strings
+ (one to send, the other a regular expression for what is expected
+ in return), or two strings and a timeout in seconds.
+
+ testsysimg runs the image under KVM, and accesses it via a
+ serial console, and runs the test stories, one by one.
+
+ '''
+
+ if not args:
+ raise cliapp.AppException('Missing command line arguments. '
+ 'Run with --help to see usage.')
+
+ system = morphlib.tester.KvmSystem(args[0],
+ verbose=self.settings['test-console'],
+ timeout=self.settings['test-timeout'])
+
+ for filename in args[1:]:
+ self.msg('Running %s' % filename)
+ module = morphlib.tester.load_module(filename)
+ story_steps = getattr(module, 'story')
+ story = morphlib.tester.TestStory(system, story_steps, self.msg)
+ story.run()
+ self.msg('Finished OK.')
def msg(self, msg):
'''Show a message to the user about what is going on.'''
diff --git a/morphlib/__init__.py b/morphlib/__init__.py
index c4a1992d..5d8568b4 100644
--- a/morphlib/__init__.py
+++ b/morphlib/__init__.py
@@ -28,4 +28,5 @@ import git
import morphology
import stopwatch
import tempdir
+import tester
import util
diff --git a/morphlib/tester.py b/morphlib/tester.py
new file mode 100644
index 00000000..e6e17031
--- /dev/null
+++ b/morphlib/tester.py
@@ -0,0 +1,156 @@
+# Copyright (C) 2011 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import imp
+import logging
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+
+class Timeout(Exception):
+
+ def __str__(self):
+ buf, regexp = self.args
+ return 'Cannot find %s in %s' % (repr(regexp), repr(buf))
+
+
+class System(object):
+
+ '''Abstract interface to a system.
+
+ The interface allows starting and stopping the system, and interacting
+ with its via a serial console.
+
+ '''
+
+ def start(self):
+ '''Start the system.'''
+ raise NotImplementedError()
+
+ def stop(self):
+ '''Stop the system.'''
+ raise NotImplementedError()
+
+ def waitfor(self, regexp, timeout):
+ '''Wait system to output a match for ``regexp`` on the serial console.
+
+ If timeout is exceeded, raise Timeout.
+
+ '''
+ raise NotImplementedError()
+
+ def send(self, text):
+ '''Send TEXT via the serial console to the system.'''
+ raise NotImplementedError()
+
+
+class KvmSystem(System):
+
+ '''A system running under KVM.'''
+
+ def __init__(self, image_filename, verbose=False, timeout=None):
+ self.image_filename = image_filename
+ self.verbose = verbose
+ self.timeout = timeout
+ self.p = None
+ self.stdin_buf = ''
+ self.stdout_buf = ''
+
+ def start(self):
+ self.p = subprocess.Popen(['kvm', '-nographic', self.image_filename],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ def stop(self):
+ self.p.terminate()
+ self.p.wait()
+
+ def _io(self, timeout):
+ r, w, x = select.select([self.p.stdout], [self.p.stdin], [], timeout)
+ if self.p.stdout in r:
+ byte = self.p.stdout.read(1)
+ if byte:
+ if self.verbose:
+ sys.stdout.write(byte)
+ sys.stdout.flush()
+ self.stdout_buf += byte
+ elif self.p.stdin in w:
+ if self.stdin_buf:
+ byte = self.stdin_buf[0]
+ self.p.stdin.write(byte)
+ self.p.stdin.flush()
+ self.stdin_buf = self.stdin_buf[1:]
+ else:
+ if self.verbose:
+ sys.stdout.write('_')
+ sys.stdout.flush()
+
+ def waitfor(self, regexp, timeout=None):
+ pat = re.compile(regexp, re.DOTALL | re.MULTILINE)
+ started = time.time()
+ timeout = timeout if timeout is not None else self.timeout
+ remaining = timeout
+ while not pat.search(self.stdout_buf) and remaining > 0:
+ self._io(remaining)
+ remaining = (started + timeout) - time.time()
+ m = pat.search(self.stdout_buf)
+ if not m:
+ raise Timeout(self.stdout_buf, regexp)
+ self.stdout_buf = self.stdout_buf[m.end():]
+
+ def send(self, text):
+ self.stdin_buf += text
+
+
+class TestStory(object):
+
+ '''Execute a test story.'''
+
+ def __init__(self, system, steps, msg):
+ self.system = system
+ self.steps = steps
+ self.msg = msg
+
+ def run(self):
+ self.system.start()
+ for t in self.steps:
+ if len(t) == 2:
+ send, expect = t
+ timeout = None
+ else:
+ assert len(t) == 3
+ send, expect, timeout = t
+ self.msg('Sending: %s' % repr(send))
+ self.msg('Expecting: %s' % repr(expect))
+ self.system.send(send)
+ self.system.waitfor(expect, timeout=timeout)
+ self.system.stop()
+
+
+def load_module(filename):
+ for t in imp.get_suffixes():
+ suffix, mode, kind = t
+ if filename.endswith(suffix):
+ module_name = os.path.basename(filename[:-len(suffix)])
+ with open(filename, mode) as f:
+ return imp.load_module(module_name, f, filename, t)
+ raise Exception("Unknown module: %s" % filename)
+