summaryrefslogtreecommitdiff
path: root/deps/gyp/test/lib/TestCmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'deps/gyp/test/lib/TestCmd.py')
-rw-r--r--deps/gyp/test/lib/TestCmd.py1597
1 files changed, 0 insertions, 1597 deletions
diff --git a/deps/gyp/test/lib/TestCmd.py b/deps/gyp/test/lib/TestCmd.py
deleted file mode 100644
index 71403614b9..0000000000
--- a/deps/gyp/test/lib/TestCmd.py
+++ /dev/null
@@ -1,1597 +0,0 @@
-"""
-TestCmd.py: a testing framework for commands and scripts.
-
-The TestCmd module provides a framework for portable automated testing
-of executable commands and scripts (in any language, not just Python),
-especially commands and scripts that require file system interaction.
-
-In addition to running tests and evaluating conditions, the TestCmd
-module manages and cleans up one or more temporary workspace
-directories, and provides methods for creating files and directories in
-those workspace directories from in-line data, here-documents), allowing
-tests to be completely self-contained.
-
-A TestCmd environment object is created via the usual invocation:
-
- import TestCmd
- test = TestCmd.TestCmd()
-
-There are a bunch of keyword arguments available at instantiation:
-
- test = TestCmd.TestCmd(description = 'string',
- program = 'program_or_script_to_test',
- interpreter = 'script_interpreter',
- workdir = 'prefix',
- subdir = 'subdir',
- verbose = Boolean,
- match = default_match_function,
- diff = default_diff_function,
- combine = Boolean)
-
-There are a bunch of methods that let you do different things:
-
- test.verbose_set(1)
-
- test.description_set('string')
-
- test.program_set('program_or_script_to_test')
-
- test.interpreter_set('script_interpreter')
- test.interpreter_set(['script_interpreter', 'arg'])
-
- test.workdir_set('prefix')
- test.workdir_set('')
-
- test.workpath('file')
- test.workpath('subdir', 'file')
-
- test.subdir('subdir', ...)
-
- test.rmdir('subdir', ...)
-
- test.write('file', "contents\n")
- test.write(['subdir', 'file'], "contents\n")
-
- test.read('file')
- test.read(['subdir', 'file'])
- test.read('file', mode)
- test.read(['subdir', 'file'], mode)
-
- test.writable('dir', 1)
- test.writable('dir', None)
-
- test.preserve(condition, ...)
-
- test.cleanup(condition)
-
- test.command_args(program = 'program_or_script_to_run',
- interpreter = 'script_interpreter',
- arguments = 'arguments to pass to program')
-
- test.run(program = 'program_or_script_to_run',
- interpreter = 'script_interpreter',
- arguments = 'arguments to pass to program',
- chdir = 'directory_to_chdir_to',
- stdin = 'input to feed to the program\n')
- universal_newlines = True)
-
- p = test.start(program = 'program_or_script_to_run',
- interpreter = 'script_interpreter',
- arguments = 'arguments to pass to program',
- universal_newlines = None)
-
- test.finish(self, p)
-
- test.pass_test()
- test.pass_test(condition)
- test.pass_test(condition, function)
-
- test.fail_test()
- test.fail_test(condition)
- test.fail_test(condition, function)
- test.fail_test(condition, function, skip)
-
- test.no_result()
- test.no_result(condition)
- test.no_result(condition, function)
- test.no_result(condition, function, skip)
-
- test.stdout()
- test.stdout(run)
-
- test.stderr()
- test.stderr(run)
-
- test.symlink(target, link)
-
- test.banner(string)
- test.banner(string, width)
-
- test.diff(actual, expected)
-
- test.match(actual, expected)
-
- test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
- test.match_exact(["actual 1\n", "actual 2\n"],
- ["expected 1\n", "expected 2\n"])
-
- test.match_re("actual 1\nactual 2\n", regex_string)
- test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
-
- test.match_re_dotall("actual 1\nactual 2\n", regex_string)
- test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
-
- test.tempdir()
- test.tempdir('temporary-directory')
-
- test.sleep()
- test.sleep(seconds)
-
- test.where_is('foo')
- test.where_is('foo', 'PATH1:PATH2')
- test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
-
- test.unlink('file')
- test.unlink('subdir', 'file')
-
-The TestCmd module provides pass_test(), fail_test(), and no_result()
-unbound functions that report test results for use with the Aegis change
-management system. These methods terminate the test immediately,
-reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
-status 0 (success), 1 or 2 respectively. This allows for a distinction
-between an actual failed test and a test that could not be properly
-evaluated because of an external condition (such as a full file system
-or incorrect permissions).
-
- import TestCmd
-
- TestCmd.pass_test()
- TestCmd.pass_test(condition)
- TestCmd.pass_test(condition, function)
-
- TestCmd.fail_test()
- TestCmd.fail_test(condition)
- TestCmd.fail_test(condition, function)
- TestCmd.fail_test(condition, function, skip)
-
- TestCmd.no_result()
- TestCmd.no_result(condition)
- TestCmd.no_result(condition, function)
- TestCmd.no_result(condition, function, skip)
-
-The TestCmd module also provides unbound functions that handle matching
-in the same way as the match_*() methods described above.
-
- import TestCmd
-
- test = TestCmd.TestCmd(match = TestCmd.match_exact)
-
- test = TestCmd.TestCmd(match = TestCmd.match_re)
-
- test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
-
-The TestCmd module provides unbound functions that can be used for the
-"diff" argument to TestCmd.TestCmd instantiation:
-
- import TestCmd
-
- test = TestCmd.TestCmd(match = TestCmd.match_re,
- diff = TestCmd.diff_re)
-
- test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
-
-The "diff" argument can also be used with standard difflib functions:
-
- import difflib
-
- test = TestCmd.TestCmd(diff = difflib.context_diff)
-
- test = TestCmd.TestCmd(diff = difflib.unified_diff)
-
-Lastly, the where_is() method also exists in an unbound function
-version.
-
- import TestCmd
-
- TestCmd.where_is('foo')
- TestCmd.where_is('foo', 'PATH1:PATH2')
- TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
-"""
-
-# Copyright 2000-2010 Steven Knight
-# This module is free software, and you may redistribute it and/or modify
-# it under the same terms as Python itself, so long as this copyright message
-# and disclaimer are retained in their original form.
-#
-# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
-# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
-# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
-# DAMAGE.
-#
-# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
-# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
-# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
-
-__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
-__version__ = "0.37"
-
-import errno
-import os
-import os.path
-import re
-import shutil
-import stat
-import string
-import sys
-import tempfile
-import time
-import traceback
-import types
-import UserList
-
-__all__ = [
- 'diff_re',
- 'fail_test',
- 'no_result',
- 'pass_test',
- 'match_exact',
- 'match_re',
- 'match_re_dotall',
- 'python_executable',
- 'TestCmd'
-]
-
-try:
- import difflib
-except ImportError:
- __all__.append('simple_diff')
-
-def is_List(e):
- return type(e) is types.ListType \
- or isinstance(e, UserList.UserList)
-
-try:
- from UserString import UserString
-except ImportError:
- class UserString:
- pass
-
-if hasattr(types, 'UnicodeType'):
- def is_String(e):
- return type(e) is types.StringType \
- or type(e) is types.UnicodeType \
- or isinstance(e, UserString)
-else:
- def is_String(e):
- return type(e) is types.StringType or isinstance(e, UserString)
-
-tempfile.template = 'testcmd.'
-if os.name in ('posix', 'nt'):
- tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
-else:
- tempfile.template = 'testcmd.'
-
-re_space = re.compile('\s')
-
-_Cleanup = []
-
-_chain_to_exitfunc = None
-
-def _clean():
- global _Cleanup
- cleanlist = filter(None, _Cleanup)
- del _Cleanup[:]
- cleanlist.reverse()
- for test in cleanlist:
- test.cleanup()
- if _chain_to_exitfunc:
- _chain_to_exitfunc()
-
-try:
- import atexit
-except ImportError:
- # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
- try:
- _chain_to_exitfunc = sys.exitfunc
- except AttributeError:
- pass
- sys.exitfunc = _clean
-else:
- atexit.register(_clean)
-
-try:
- zip
-except NameError:
- def zip(*lists):
- result = []
- for i in xrange(min(map(len, lists))):
- result.append(tuple(map(lambda l, i=i: l[i], lists)))
- return result
-
-class Collector:
- def __init__(self, top):
- self.entries = [top]
- def __call__(self, arg, dirname, names):
- pathjoin = lambda n, d=dirname: os.path.join(d, n)
- self.entries.extend(map(pathjoin, names))
-
-def _caller(tblist, skip):
- string = ""
- arr = []
- for file, line, name, text in tblist:
- if file[-10:] == "TestCmd.py":
- break
- arr = [(file, line, name, text)] + arr
- atfrom = "at"
- for file, line, name, text in arr[skip:]:
- if name in ("?", "<module>"):
- name = ""
- else:
- name = " (" + name + ")"
- string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
- atfrom = "\tfrom"
- return string
-
-def fail_test(self = None, condition = 1, function = None, skip = 0):
- """Cause the test to fail.
-
- By default, the fail_test() method reports that the test FAILED
- and exits with a status of 1. If a condition argument is supplied,
- the test fails only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- of = ""
- desc = ""
- sep = " "
- if not self is None:
- if self.program:
- of = " of " + self.program
- sep = "\n\t"
- if self.description:
- desc = " [" + self.description + "]"
- sep = "\n\t"
-
- at = _caller(traceback.extract_stack(), skip)
- sys.stderr.write("FAILED test" + of + desc + sep + at)
-
- sys.exit(1)
-
-def no_result(self = None, condition = 1, function = None, skip = 0):
- """Causes a test to exit with no valid result.
-
- By default, the no_result() method reports NO RESULT for the test
- and exits with a status of 2. If a condition argument is supplied,
- the test fails only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- of = ""
- desc = ""
- sep = " "
- if not self is None:
- if self.program:
- of = " of " + self.program
- sep = "\n\t"
- if self.description:
- desc = " [" + self.description + "]"
- sep = "\n\t"
-
- if os.environ.get('TESTCMD_DEBUG_SKIPS'):
- at = _caller(traceback.extract_stack(), skip)
- sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
- else:
- sys.stderr.write("NO RESULT\n")
-
- sys.exit(2)
-
-def pass_test(self = None, condition = 1, function = None):
- """Causes a test to pass.
-
- By default, the pass_test() method reports PASSED for the test
- and exits with a status of 0. If a condition argument is supplied,
- the test passes only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- sys.stderr.write("PASSED\n")
- sys.exit(0)
-
-def match_exact(lines = None, matches = None):
- """
- """
- if not is_List(lines):
- lines = string.split(lines, "\n")
- if not is_List(matches):
- matches = string.split(matches, "\n")
- if len(lines) != len(matches):
- return
- for i in range(len(lines)):
- if lines[i] != matches[i]:
- return
- return 1
-
-def match_re(lines = None, res = None):
- """
- """
- if not is_List(lines):
- lines = string.split(lines, "\n")
- if not is_List(res):
- res = string.split(res, "\n")
- if len(lines) != len(res):
- return
- for i in range(len(lines)):
- s = "^" + res[i] + "$"
- try:
- expr = re.compile(s)
- except re.error, e:
- msg = "Regular expression error in %s: %s"
- raise re.error, msg % (repr(s), e[0])
- if not expr.search(lines[i]):
- return
- return 1
-
-def match_re_dotall(lines = None, res = None):
- """
- """
- if not type(lines) is type(""):
- lines = string.join(lines, "\n")
- if not type(res) is type(""):
- res = string.join(res, "\n")
- s = "^" + res + "$"
- try:
- expr = re.compile(s, re.DOTALL)
- except re.error, e:
- msg = "Regular expression error in %s: %s"
- raise re.error, msg % (repr(s), e[0])
- if expr.match(lines):
- return 1
-
-try:
- import difflib
-except ImportError:
- pass
-else:
- def simple_diff(a, b, fromfile='', tofile='',
- fromfiledate='', tofiledate='', n=3, lineterm='\n'):
- """
- A function with the same calling signature as difflib.context_diff
- (diff -c) and difflib.unified_diff (diff -u) but which prints
- output like the simple, unadorned 'diff" command.
- """
- sm = difflib.SequenceMatcher(None, a, b)
- def comma(x1, x2):
- return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
- result = []
- for op, a1, a2, b1, b2 in sm.get_opcodes():
- if op == 'delete':
- result.append("%sd%d" % (comma(a1, a2), b1))
- result.extend(map(lambda l: '< ' + l, a[a1:a2]))
- elif op == 'insert':
- result.append("%da%s" % (a1, comma(b1, b2)))
- result.extend(map(lambda l: '> ' + l, b[b1:b2]))
- elif op == 'replace':
- result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
- result.extend(map(lambda l: '< ' + l, a[a1:a2]))
- result.append('---')
- result.extend(map(lambda l: '> ' + l, b[b1:b2]))
- return result
-
-def diff_re(a, b, fromfile='', tofile='',
- fromfiledate='', tofiledate='', n=3, lineterm='\n'):
- """
- A simple "diff" of two sets of lines when the expected lines
- are regular expressions. This is a really dumb thing that
- just compares each line in turn, so it doesn't look for
- chunks of matching lines and the like--but at least it lets
- you know exactly which line first didn't compare correctl...
- """
- result = []
- diff = len(a) - len(b)
- if diff < 0:
- a = a + ['']*(-diff)
- elif diff > 0:
- b = b + ['']*diff
- i = 0
- for aline, bline in zip(a, b):
- s = "^" + aline + "$"
- try:
- expr = re.compile(s)
- except re.error, e:
- msg = "Regular expression error in %s: %s"
- raise re.error, msg % (repr(s), e[0])
- if not expr.search(bline):
- result.append("%sc%s" % (i+1, i+1))
- result.append('< ' + repr(a[i]))
- result.append('---')
- result.append('> ' + repr(b[i]))
- i = i+1
- return result
-
-if os.name == 'java':
-
- python_executable = os.path.join(sys.prefix, 'jython')
-
-else:
-
- python_executable = sys.executable
-
-if sys.platform == 'win32':
-
- default_sleep_seconds = 2
-
- def where_is(file, path=None, pathext=None):
- if path is None:
- path = os.environ['PATH']
- if is_String(path):
- path = string.split(path, os.pathsep)
- if pathext is None:
- pathext = os.environ['PATHEXT']
- if is_String(pathext):
- pathext = string.split(pathext, os.pathsep)
- for ext in pathext:
- if string.lower(ext) == string.lower(file[-len(ext):]):
- pathext = ['']
- break
- for dir in path:
- f = os.path.join(dir, file)
- for ext in pathext:
- fext = f + ext
- if os.path.isfile(fext):
- return fext
- return None
-
-else:
-
- def where_is(file, path=None, pathext=None):
- if path is None:
- path = os.environ['PATH']
- if is_String(path):
- path = string.split(path, os.pathsep)
- for dir in path:
- f = os.path.join(dir, file)
- if os.path.isfile(f):
- try:
- st = os.stat(f)
- except OSError:
- continue
- if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
- return f
- return None
-
- default_sleep_seconds = 1
-
-
-
-try:
- import subprocess
-except ImportError:
- # The subprocess module doesn't exist in this version of Python,
- # so we're going to cobble up something that looks just enough
- # like its API for our purposes below.
- import new
-
- subprocess = new.module('subprocess')
-
- subprocess.PIPE = 'PIPE'
- subprocess.STDOUT = 'STDOUT'
- subprocess.mswindows = (sys.platform == 'win32')
-
- try:
- import popen2
- popen2.Popen3
- except AttributeError:
- class Popen3:
- universal_newlines = 1
- def __init__(self, command, **kw):
- if sys.platform == 'win32' and command[0] == '"':
- command = '"' + command + '"'
- (stdin, stdout, stderr) = os.popen3(' ' + command)
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- def close_output(self):
- self.stdout.close()
- self.resultcode = self.stderr.close()
- def wait(self):
- resultcode = self.resultcode
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
-
- else:
- try:
- popen2.Popen4
- except AttributeError:
- # A cribbed Popen4 class, with some retrofitted code from
- # the Python 1.5 Popen3 class methods to do certain things
- # by hand.
- class Popen4(popen2.Popen3):
- childerr = None
-
- def __init__(self, cmd, bufsize=-1):
- p2cread, p2cwrite = os.pipe()
- c2pread, c2pwrite = os.pipe()
- self.pid = os.fork()
- if self.pid == 0:
- # Child
- os.dup2(p2cread, 0)
- os.dup2(c2pwrite, 1)
- os.dup2(c2pwrite, 2)
- for i in range(3, popen2.MAXFD):
- try:
- os.close(i)
- except: pass
- try:
- os.execvp(cmd[0], cmd)
- finally:
- os._exit(1)
- # Shouldn't come here, I guess
- os._exit(1)
- os.close(p2cread)
- self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
- os.close(c2pwrite)
- self.fromchild = os.fdopen(c2pread, 'r', bufsize)
- popen2._active.append(self)
-
- popen2.Popen4 = Popen4
-
- class Popen3(popen2.Popen3, popen2.Popen4):
- universal_newlines = 1
- def __init__(self, command, **kw):
- if kw.get('stderr') == 'STDOUT':
- apply(popen2.Popen4.__init__, (self, command, 1))
- else:
- apply(popen2.Popen3.__init__, (self, command, 1))
- self.stdin = self.tochild
- self.stdout = self.fromchild
- self.stderr = self.childerr
- def wait(self, *args, **kw):
- resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
-
- subprocess.Popen = Popen3
-
-
-
-# From Josiah Carlson,
-# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
-# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
-
-PIPE = subprocess.PIPE
-
-if subprocess.mswindows:
- from win32file import ReadFile, WriteFile
- from win32pipe import PeekNamedPipe
- import msvcrt
-else:
- import select
- import fcntl
-
- try: fcntl.F_GETFL
- except AttributeError: fcntl.F_GETFL = 3
-
- try: fcntl.F_SETFL
- except AttributeError: fcntl.F_SETFL = 4
-
-class Popen(subprocess.Popen):
- def recv(self, maxsize=None):
- return self._recv('stdout', maxsize)
-
- def recv_err(self, maxsize=None):
- return self._recv('stderr', maxsize)
-
- def send_recv(self, input='', maxsize=None):
- return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
-
- def get_conn_maxsize(self, which, maxsize):
- if maxsize is None:
- maxsize = 1024
- elif maxsize < 1:
- maxsize = 1
- return getattr(self, which), maxsize
-
- def _close(self, which):
- getattr(self, which).close()
- setattr(self, which, None)
-
- if subprocess.mswindows:
- def send(self, input):
- if not self.stdin:
- return None
-
- try:
- x = msvcrt.get_osfhandle(self.stdin.fileno())
- (errCode, written) = WriteFile(x, input)
- except ValueError:
- return self._close('stdin')
- except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
- return self._close('stdin')
- raise
-
- return written
-
- def _recv(self, which, maxsize):
- conn, maxsize = self.get_conn_maxsize(which, maxsize)
- if conn is None:
- return None
-
- try:
- x = msvcrt.get_osfhandle(conn.fileno())
- (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
- if maxsize < nAvail:
- nAvail = maxsize
- if nAvail > 0:
- (errCode, read) = ReadFile(x, nAvail, None)
- except ValueError:
- return self._close(which)
- except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
- return self._close(which)
- raise
-
- #if self.universal_newlines:
- # read = self._translate_newlines(read)
- return read
-
- else:
- def send(self, input):
- if not self.stdin:
- return None
-
- if not select.select([], [self.stdin], [], 0)[1]:
- return 0
-
- try:
- written = os.write(self.stdin.fileno(), input)
- except OSError, why:
- if why[0] == errno.EPIPE: #broken pipe
- return self._close('stdin')
- raise
-
- return written
-
- def _recv(self, which, maxsize):
- conn, maxsize = self.get_conn_maxsize(which, maxsize)
- if conn is None:
- return None
-
- try:
- flags = fcntl.fcntl(conn, fcntl.F_GETFL)
- except TypeError:
- flags = None
- else:
- if not conn.closed:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
-
- try:
- if not select.select([conn], [], [], 0)[0]:
- return ''
-
- r = conn.read(maxsize)
- if not r:
- return self._close(which)
-
- #if self.universal_newlines:
- # r = self._translate_newlines(r)
- return r
- finally:
- if not conn.closed and not flags is None:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags)
-
-disconnect_message = "Other end disconnected!"
-
-def recv_some(p, t=.1, e=1, tr=5, stderr=0):
- if tr < 1:
- tr = 1
- x = time.time()+t
- y = []
- r = ''
- pr = p.recv
- if stderr:
- pr = p.recv_err
- while time.time() < x or r:
- r = pr()
- if r is None:
- if e:
- raise Exception(disconnect_message)
- else:
- break
- elif r:
- y.append(r)
- else:
- time.sleep(max((x-time.time())/tr, 0))
- return ''.join(y)
-
-# TODO(3.0: rewrite to use memoryview()
-def send_all(p, data):
- while len(data):
- sent = p.send(data)
- if sent is None:
- raise Exception(disconnect_message)
- data = buffer(data, sent)
-
-
-
-try:
- object
-except NameError:
- class object:
- pass
-
-
-
-class TestCmd(object):
- """Class TestCmd
- """
-
- def __init__(self, description = None,
- program = None,
- interpreter = None,
- workdir = None,
- subdir = None,
- verbose = None,
- match = None,
- diff = None,
- combine = 0,
- universal_newlines = 1):
- self._cwd = os.getcwd()
- self.description_set(description)
- self.program_set(program)
- self.interpreter_set(interpreter)
- if verbose is None:
- try:
- verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
- except ValueError:
- verbose = 0
- self.verbose_set(verbose)
- self.combine = combine
- self.universal_newlines = universal_newlines
- if match is not None:
- self.match_function = match
- else:
- self.match_function = match_re
- if diff is not None:
- self.diff_function = diff
- else:
- try:
- difflib
- except NameError:
- pass
- else:
- self.diff_function = simple_diff
- #self.diff_function = difflib.context_diff
- #self.diff_function = difflib.unified_diff
- self._dirlist = []
- self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
- if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
- self._preserve['pass_test'] = os.environ['PRESERVE']
- self._preserve['fail_test'] = os.environ['PRESERVE']
- self._preserve['no_result'] = os.environ['PRESERVE']
- else:
- try:
- self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
- except KeyError:
- pass
- try:
- self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
- except KeyError:
- pass
- try:
- self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
- except KeyError:
- pass
- self._stdout = []
- self._stderr = []
- self.status = None
- self.condition = 'no_result'
- self.workdir_set(workdir)
- self.subdir(subdir)
-
- def __del__(self):
- self.cleanup()
-
- def __repr__(self):
- return "%x" % id(self)
-
- banner_char = '='
- banner_width = 80
-
- def banner(self, s, width=None):
- if width is None:
- width = self.banner_width
- return s + self.banner_char * (width - len(s))
-
- if os.name == 'posix':
-
- def escape(self, arg):
- "escape shell special characters"
- slash = '\\'
- special = '"$'
-
- arg = string.replace(arg, slash, slash+slash)
- for c in special:
- arg = string.replace(arg, c, slash+c)
-
- if re_space.search(arg):
- arg = '"' + arg + '"'
- return arg
-
- else:
-
- # Windows does not allow special characters in file names
- # anyway, so no need for an escape function, we will just quote
- # the arg.
- def escape(self, arg):
- if re_space.search(arg):
- arg = '"' + arg + '"'
- return arg
-
- def canonicalize(self, path):
- if is_List(path):
- path = apply(os.path.join, tuple(path))
- if not os.path.isabs(path):
- path = os.path.join(self.workdir, path)
- return path
-
- def chmod(self, path, mode):
- """Changes permissions on the specified file or directory
- path name."""
- path = self.canonicalize(path)
- os.chmod(path, mode)
-
- def cleanup(self, condition = None):
- """Removes any temporary working directories for the specified
- TestCmd environment. If the environment variable PRESERVE was
- set when the TestCmd environment was created, temporary working
- directories are not removed. If any of the environment variables
- PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
- when the TestCmd environment was created, then temporary working
- directories are not removed if the test passed, failed, or had
- no result, respectively. Temporary working directories are also
- preserved for conditions specified via the preserve method.
-
- Typically, this method is not called directly, but is used when
- the script exits to clean up temporary working directories as
- appropriate for the exit status.
- """
- if not self._dirlist:
- return
- os.chdir(self._cwd)
- self.workdir = None
- if condition is None:
- condition = self.condition
- if self._preserve[condition]:
- for dir in self._dirlist:
- print "Preserved directory", dir
- else:
- list = self._dirlist[:]
- list.reverse()
- for dir in list:
- self.writable(dir, 1)
- shutil.rmtree(dir, ignore_errors = 1)
- self._dirlist = []
-
- try:
- global _Cleanup
- _Cleanup.remove(self)
- except (AttributeError, ValueError):
- pass
-
- def command_args(self, program = None,
- interpreter = None,
- arguments = None):
- if program:
- if type(program) == type('') and not os.path.isabs(program):
- program = os.path.join(self._cwd, program)
- else:
- program = self.program
- if not interpreter:
- interpreter = self.interpreter
- if not type(program) in [type([]), type(())]:
- program = [program]
- cmd = list(program)
- if interpreter:
- if not type(interpreter) in [type([]), type(())]:
- interpreter = [interpreter]
- cmd = list(interpreter) + cmd
- if arguments:
- if type(arguments) == type(''):
- arguments = string.split(arguments)
- cmd.extend(arguments)
- return cmd
-
- def description_set(self, description):
- """Set the description of the functionality being tested.
- """
- self.description = description
-
- try:
- difflib
- except NameError:
- def diff(self, a, b, name, *args, **kw):
- print self.banner('Expected %s' % name)
- print a
- print self.banner('Actual %s' % name)
- print b
- else:
- def diff(self, a, b, name, *args, **kw):
- print self.banner(name)
- args = (a.splitlines(), b.splitlines()) + args
- lines = apply(self.diff_function, args, kw)
- for l in lines:
- print l
-
- def fail_test(self, condition = 1, function = None, skip = 0):
- """Cause the test to fail.
- """
- if not condition:
- return
- self.condition = 'fail_test'
- fail_test(self = self,
- condition = condition,
- function = function,
- skip = skip)
-
- def interpreter_set(self, interpreter):
- """Set the program to be used to interpret the program
- under test as a script.
- """
- self.interpreter = interpreter
-
- def match(self, lines, matches):
- """Compare actual and expected file contents.
- """
- return self.match_function(lines, matches)
-
- def match_exact(self, lines, matches):
- """Compare actual and expected file contents.
- """
- return match_exact(lines, matches)
-
- def match_re(self, lines, res):
- """Compare actual and expected file contents.
- """
- return match_re(lines, res)
-
- def match_re_dotall(self, lines, res):
- """Compare actual and expected file contents.
- """
- return match_re_dotall(lines, res)
-
- def no_result(self, condition = 1, function = None, skip = 0):
- """Report that the test could not be run.
- """
- if not condition:
- return
- self.condition = 'no_result'
- no_result(self = self,
- condition = condition,
- function = function,
- skip = skip)
-
- def pass_test(self, condition = 1, function = None):
- """Cause the test to pass.
- """
- if not condition:
- return
- self.condition = 'pass_test'
- pass_test(self = self, condition = condition, function = function)
-
- def preserve(self, *conditions):
- """Arrange for the temporary working directories for the
- specified TestCmd environment to be preserved for one or more
- conditions. If no conditions are specified, arranges for
- the temporary working directories to be preserved for all
- conditions.
- """
- if conditions is ():
- conditions = ('pass_test', 'fail_test', 'no_result')
- for cond in conditions:
- self._preserve[cond] = 1
-
- def program_set(self, program):
- """Set the executable program or script to be tested.
- """
- if program and not os.path.isabs(program):
- program = os.path.join(self._cwd, program)
- self.program = program
-
- def read(self, file, mode = 'rb'):
- """Reads and returns the contents of the specified file name.
- The file name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The file is
- assumed to be under the temporary working directory unless it
- is an absolute path name. The I/O mode for the file may
- be specified; it must begin with an 'r'. The default is
- 'rb' (binary read).
- """
- file = self.canonicalize(file)
- if mode[0] != 'r':
- raise ValueError, "mode must begin with 'r'"
- with open(file, mode) as f:
- result = f.read()
- return result
-
- def rmdir(self, dir):
- """Removes the specified dir name.
- The dir name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The dir is
- assumed to be under the temporary working directory unless it
- is an absolute path name.
- The dir must be empty.
- """
- dir = self.canonicalize(dir)
- os.rmdir(dir)
-
- def start(self, program = None,
- interpreter = None,
- arguments = None,
- universal_newlines = None,
- **kw):
- """
- Starts a program or script for the test environment.
-
- The specified program will have the original directory
- prepended unless it is enclosed in a [list].
- """
- cmd = self.command_args(program, interpreter, arguments)
- cmd_string = string.join(map(self.escape, cmd), ' ')
- if self.verbose:
- sys.stderr.write(cmd_string + "\n")
- if universal_newlines is None:
- universal_newlines = self.universal_newlines
-
- # On Windows, if we make stdin a pipe when we plan to send
- # no input, and the test program exits before
- # Popen calls msvcrt.open_osfhandle, that call will fail.
- # So don't use a pipe for stdin if we don't need one.
- stdin = kw.get('stdin', None)
- if stdin is not None:
- stdin = subprocess.PIPE
-
- combine = kw.get('combine', self.combine)
- if combine:
- stderr_value = subprocess.STDOUT
- else:
- stderr_value = subprocess.PIPE
-
- return Popen(cmd,
- stdin=stdin,
- stdout=subprocess.PIPE,
- stderr=stderr_value,
- universal_newlines=universal_newlines)
-
- def finish(self, popen, **kw):
- """
- Finishes and waits for the process being run under control of
- the specified popen argument, recording the exit status,
- standard output and error output.
- """
- popen.stdin.close()
- self.status = popen.wait()
- if not self.status:
- self.status = 0
- self._stdout.append(popen.stdout.read())
- if popen.stderr:
- stderr = popen.stderr.read()
- else:
- stderr = ''
- self._stderr.append(stderr)
-
- def run(self, program = None,
- interpreter = None,
- arguments = None,
- chdir = None,
- stdin = None,
- universal_newlines = None):
- """Runs a test of the program or script for the test
- environment. Standard output and error output are saved for
- future retrieval via the stdout() and stderr() methods.
-
- The specified program will have the original directory
- prepended unless it is enclosed in a [list].
- """
- if chdir:
- oldcwd = os.getcwd()
- if not os.path.isabs(chdir):
- chdir = os.path.join(self.workpath(chdir))
- if self.verbose:
- sys.stderr.write("chdir(" + chdir + ")\n")
- os.chdir(chdir)
- p = self.start(program,
- interpreter,
- arguments,
- universal_newlines,
- stdin=stdin)
- if stdin:
- if is_List(stdin):
- for line in stdin:
- p.stdin.write(line)
- else:
- p.stdin.write(stdin)
- p.stdin.close()
-
- out = p.stdout.read()
- if p.stderr is None:
- err = ''
- else:
- err = p.stderr.read()
- try:
- close_output = p.close_output
- except AttributeError:
- p.stdout.close()
- if not p.stderr is None:
- p.stderr.close()
- else:
- close_output()
-
- self._stdout.append(out)
- self._stderr.append(err)
-
- self.status = p.wait()
- if not self.status:
- self.status = 0
-
- if chdir:
- os.chdir(oldcwd)
- if self.verbose >= 2:
- write = sys.stdout.write
- write('============ STATUS: %d\n' % self.status)
- out = self.stdout()
- if out or self.verbose >= 3:
- write('============ BEGIN STDOUT (len=%d):\n' % len(out))
- write(out)
- write('============ END STDOUT\n')
- err = self.stderr()
- if err or self.verbose >= 3:
- write('============ BEGIN STDERR (len=%d)\n' % len(err))
- write(err)
- write('============ END STDERR\n')
-
- def sleep(self, seconds = default_sleep_seconds):
- """Sleeps at least the specified number of seconds. If no
- number is specified, sleeps at least the minimum number of
- seconds necessary to advance file time stamps on the current
- system. Sleeping more seconds is all right.
- """
- time.sleep(seconds)
-
- def stderr(self, run = None):
- """Returns the error output from the specified run number.
- If there is no specified run number, then returns the error
- output of the last run. If the run number is less than zero,
- then returns the error output from that many runs back from the
- current run.
- """
- if not run:
- run = len(self._stderr)
- elif run < 0:
- run = len(self._stderr) + run
- run = run - 1
- return self._stderr[run]
-
- def stdout(self, run = None):
- """Returns the standard output from the specified run number.
- If there is no specified run number, then returns the standard
- output of the last run. If the run number is less than zero,
- then returns the standard output from that many runs back from
- the current run.
- """
- if not run:
- run = len(self._stdout)
- elif run < 0:
- run = len(self._stdout) + run
- run = run - 1
- return self._stdout[run]
-
- def subdir(self, *subdirs):
- """Create new subdirectories under the temporary working
- directory, one for each argument. An argument may be a list,
- in which case the list elements are concatenated using the
- os.path.join() method. Subdirectories multiple levels deep
- must be created using a separate argument for each level:
-
- test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
-
- Returns the number of subdirectories actually created.
- """
- count = 0
- for sub in subdirs:
- if sub is None:
- continue
- if is_List(sub):
- sub = apply(os.path.join, tuple(sub))
- new = os.path.join(self.workdir, sub)
- try:
- os.mkdir(new)
- except OSError:
- pass
- else:
- count = count + 1
- return count
-
- def symlink(self, target, link):
- """Creates a symlink to the specified target.
- The link name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The link is
- assumed to be under the temporary working directory unless it
- is an absolute path name. The target is *not* assumed to be
- under the temporary working directory.
- """
- link = self.canonicalize(link)
- os.symlink(target, link)
-
- def tempdir(self, path=None):
- """Creates a temporary directory.
- A unique directory name is generated if no path name is specified.
- The directory is created, and will be removed when the TestCmd
- object is destroyed.
- """
- if path is None:
- try:
- path = tempfile.mktemp(prefix=tempfile.template)
- except TypeError:
- path = tempfile.mktemp()
- os.mkdir(path)
-
- # Symlinks in the path will report things
- # differently from os.getcwd(), so chdir there
- # and back to fetch the canonical path.
- cwd = os.getcwd()
- try:
- os.chdir(path)
- path = os.getcwd()
- finally:
- os.chdir(cwd)
-
- # Uppercase the drive letter since the case of drive
- # letters is pretty much random on win32:
- drive,rest = os.path.splitdrive(path)
- if drive:
- path = string.upper(drive) + rest
-
- #
- self._dirlist.append(path)
- global _Cleanup
- try:
- _Cleanup.index(self)
- except ValueError:
- _Cleanup.append(self)
-
- return path
-
- def touch(self, path, mtime=None):
- """Updates the modification time on the specified file or
- directory path name. The default is to update to the
- current time if no explicit modification time is specified.
- """
- path = self.canonicalize(path)
- atime = os.path.getatime(path)
- if mtime is None:
- mtime = time.time()
- os.utime(path, (atime, mtime))
-
- def unlink(self, file):
- """Unlinks the specified file name.
- The file name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The file is
- assumed to be under the temporary working directory unless it
- is an absolute path name.
- """
- file = self.canonicalize(file)
- os.unlink(file)
-
- def verbose_set(self, verbose):
- """Set the verbose level.
- """
- self.verbose = verbose
-
- def where_is(self, file, path=None, pathext=None):
- """Find an executable file.
- """
- if is_List(file):
- file = apply(os.path.join, tuple(file))
- if not os.path.isabs(file):
- file = where_is(file, path, pathext)
- return file
-
- def workdir_set(self, path):
- """Creates a temporary working directory with the specified
- path name. If the path is a null string (''), a unique
- directory name is created.
- """
- if (path != None):
- if path == '':
- path = None
- path = self.tempdir(path)
- self.workdir = path
-
- def workpath(self, *args):
- """Returns the absolute path name to a subdirectory or file
- within the current temporary working directory. Concatenates
- the temporary working directory name with the specified
- arguments using the os.path.join() method.
- """
- return apply(os.path.join, (self.workdir,) + tuple(args))
-
- def readable(self, top, read=1):
- """Make the specified directory tree readable (read == 1)
- or not (read == None).
-
- This method has no effect on Windows systems, which use a
- completely different mechanism to control file readability.
- """
-
- if sys.platform == 'win32':
- return
-
- if read:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
- else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
-
- if os.path.isfile(top):
- # If it's a file, that's easy, just chmod it.
- do_chmod(top)
- elif read:
- # It's a directory and we're trying to turn on read
- # permission, so it's also pretty easy, just chmod the
- # directory and then chmod every entry on our walk down the
- # tree. Because os.path.walk() is top-down, we'll enable
- # read permission on any directories that have it disabled
- # before os.path.walk() tries to list their contents.
- do_chmod(top)
-
- def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
- for n in names:
- do_chmod(os.path.join(dirname, n))
-
- os.path.walk(top, chmod_entries, None)
- else:
- # It's a directory and we're trying to turn off read
- # permission, which means we have to chmod the directoreis
- # in the tree bottom-up, lest disabling read permission from
- # the top down get in the way of being able to get at lower
- # parts of the tree. But os.path.walk() visits things top
- # down, so we just use an object to collect a list of all
- # of the entries in the tree, reverse the list, and then
- # chmod the reversed (bottom-up) list.
- col = Collector(top)
- os.path.walk(top, col, None)
- col.entries.reverse()
- for d in col.entries: do_chmod(d)
-
- def writable(self, top, write=1):
- """Make the specified directory tree writable (write == 1)
- or not (write == None).
- """
-
- if sys.platform == 'win32':
-
- if write:
- def do_chmod(fname):
- try: os.chmod(fname, stat.S_IWRITE)
- except OSError: pass
- else:
- def do_chmod(fname):
- try: os.chmod(fname, stat.S_IREAD)
- except OSError: pass
-
- else:
-
- if write:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
- else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
-
- if os.path.isfile(top):
- do_chmod(top)
- else:
- col = Collector(top)
- os.path.walk(top, col, None)
- for d in col.entries: do_chmod(d)
-
- def executable(self, top, execute=1):
- """Make the specified directory tree executable (execute == 1)
- or not (execute == None).
-
- This method has no effect on Windows systems, which use a
- completely different mechanism to control file executability.
- """
-
- if sys.platform == 'win32':
- return
-
- if execute:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
- else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
-
- if os.path.isfile(top):
- # If it's a file, that's easy, just chmod it.
- do_chmod(top)
- elif execute:
- # It's a directory and we're trying to turn on execute
- # permission, so it's also pretty easy, just chmod the
- # directory and then chmod every entry on our walk down the
- # tree. Because os.path.walk() is top-down, we'll enable
- # execute permission on any directories that have it disabled
- # before os.path.walk() tries to list their contents.
- do_chmod(top)
-
- def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
- for n in names:
- do_chmod(os.path.join(dirname, n))
-
- os.path.walk(top, chmod_entries, None)
- else:
- # It's a directory and we're trying to turn off execute
- # permission, which means we have to chmod the directories
- # in the tree bottom-up, lest disabling execute permission from
- # the top down get in the way of being able to get at lower
- # parts of the tree. But os.path.walk() visits things top
- # down, so we just use an object to collect a list of all
- # of the entries in the tree, reverse the list, and then
- # chmod the reversed (bottom-up) list.
- col = Collector(top)
- os.path.walk(top, col, None)
- col.entries.reverse()
- for d in col.entries: do_chmod(d)
-
- def write(self, file, content, mode = 'wb'):
- """Writes the specified content text (second argument) to the
- specified file name (first argument). The file name may be
- a list, in which case the elements are concatenated with the
- os.path.join() method. The file is created under the temporary
- working directory. Any subdirectories in the path must already
- exist. The I/O mode for the file may be specified; it must
- begin with a 'w'. The default is 'wb' (binary write).
- """
- file = self.canonicalize(file)
- if mode[0] != 'w':
- raise ValueError, "mode must begin with 'w'"
- with open(file, mode) as f:
- f.write(content)
-
-# Local Variables:
-# tab-width:4
-# indent-tabs-mode:nil
-# End:
-# vim: set expandtab tabstop=4 shiftwidth=4: