summaryrefslogtreecommitdiff
path: root/webtest/ext.py
blob: e75e7473382200f76ce8cd1279c329e60240ef52 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
__doc__ = '''Allow to run an external process to test your application'''
from webtest import app as testapp
from webtest.http import StopableWSGIServer
from contextlib import contextmanager
from six import binary_type
import subprocess
import logging
import tempfile
import shutil
import time
import sys
import re
import os

log = logging.getLogger('nose.casperjs')
stderr = sys.stderr


class TestApp(testapp.TestApp):
    """Run the test application in a separate thread to allow to access it via
    http"""

    def __init__(self, app=None, url=None, timeout=30000,
                 extra_environ=None, relative_to=None, **kwargs):
        super(TestApp, self).__init__(app, relative_to=relative_to)
        self.server = StopableWSGIServer.create(app)
        self.server.wait()
        self.application_url = self.server.application_url
        os.environ['APPLICATION_URL'] = self.application_url
        self.extra_environ = extra_environ or {}
        self.timeout = timeout
        self.test_app = self

    def get_binary(self, name):
        if os.path.isfile(name):
            return name
        for path in (os.getcwd(), '/usr/local', '/usr', '/opt'):
            filename = os.path.join(path, 'bin', name)
            if os.path.isfile(filename):
                return filename
        return None

    def close(self):
        """Close WSGI server if needed"""
        if self.server:
            self.server.shutdown()

_re_result = re.compile(
            r'.*([0-9]+ tests executed, [0-9]+ passed, ([0-9]+) failed).*')


@contextmanager
def casperjs(test_app, timeout=60):
    """A context manager to run a test with a :class:`webtest.ext.TestApp`"""
    app = TestApp(test_app.app)
    binary = app.get_binary('casperjs')
    tempdir = tempfile.mkdtemp(prefix='casperjs')

    def run(script, *args):
        dirname = os.path.dirname(sys._getframe(1).f_code.co_filename)
        log = os.path.join(tempdir, script + '.log')
        script = os.path.join(dirname, script)
        if binary:
            stdout = open(log, 'ab+')
            cmd = [binary, 'test'] + list(args) + [script]
            p = subprocess.Popen(cmd,
                    stdout=stdout,
                    stderr=subprocess.PIPE)
            end = time.time() + timeout
            while time.time() < end:
                ret = p.poll()
                if ret is not None:
                    end = time.time() + 100
                    break
                time.sleep(.3)
            if time.time() < end:
                try:
                    p.kill()
                except OSError:
                    pass

            if os.path.isfile(log):
                with open(log) as fd:
                    output = fd.read()

            if isinstance(output, binary_type):
                output = output.decode('utf8', 'replace')

            fail = True
            match = _re_result.match(output.replace('\n', ' '))
            if match is not None:
                text, failed = match.groups()
                if int(failed):
                    fail = True
                else:
                    fail = False
                    stderr.write(text + ' ')

            if fail:
                print(output)
                raise AssertionError(
                    'Failure while running %s' % ' '.join(cmd))

    try:
        yield run
    finally:
        shutil.rmtree(tempdir)
        app.close()