summaryrefslogtreecommitdiff
path: root/yarnlib/scenario_runner.py
blob: cbe2fad8ba4d2e6f5b888542c2599072b8e81985 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# Copyright 2014  Lars Wirzenius and Codethink Limited
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import cliapp
import os

import yarnlib


def default_pre_step(scenario, step, step_number, step_env):
    '''No-op callback used by ScenarioRunner before each step.

    This is the default value of ScenarioRunner's `pre_step_cb` and
    documents the signature such a callback is invoked with.

    Parameters:

    * `scenario`: The yarnlib.Scenario the upcoming step belongs to.
    * `step`: The yarnlib.ScenarioStep that is about to be run.
    * `step_number`: Per-scenario counter of the step that is about to
                     be run, starting from 1.
    * `step_env`: Environment the step is going to be run with.
    * `return`: Whatever is returned gets handed to `post_step_cb`;
                this default returns None.

    Every argument is supplied by keyword, so a replacement callback may
    list its parameters in any order and may swallow the ones it does
    not care about with **kwargs.

    '''
    return None


def default_post_step(scenario, step, step_number, step_env,
                      exit, stdout, stderr, pre_step_userdata):
    '''No-op callback used by ScenarioRunner after each step.

    This is the default value of ScenarioRunner's `post_step_cb` and
    documents the signature such a callback is invoked with.

    Parameters:

    * `scenario`: The yarnlib.Scenario the finished step belongs to.
    * `step`: The yarnlib.ScenarioStep that has just been run.
    * `step_number`: Per-scenario counter of the step that has just
                     been run, starting from 1.
    * `step_env`: Environment the step was run with.
    * `exit`: Return code of the step.
    * `stdout`: Standard output produced by the step.
    * `stderr`: Standard error produced by the step.
    * `pre_step_userdata`: Whatever `pre_step_cb` returned for this step.

    Every argument is supplied by keyword, so a replacement callback may
    list its parameters in any order and may swallow the ones it does
    not care about with **kwargs.

    '''
    return None


# Arguably this could just be the run_scenario method with all the
# constructor's parameters passed in, but a method with that many
# parameters reeks of poor design.
# However, decoupling the ScenarioRunner from the scenarios allows the
# same scenarios to be run against different projects that aim to satisfy
# the same requirements, or multiple yarn suites on the same project,
# if the project aims to satisfy multiple sets of requirements.
# While this use is far less likely than having 1 yarn suite per project,
# it's a useful metric for deciding how to split the arguments between the
# constructor and the run_scenario method.
class ScenarioRunner(object):
    '''Sets up an environment to run scenarios on a Project.

    Parameters:

    * `shell_prelude`: Prefix to all shell scripts run, e.g. shell libraries.
    * `srcdir`: Path to the root of the source tree.
                This is available as the $SRCDIR environment variable
                and is the directory the scenarios are run from.
    * `extra_env`: dict, or iterable of pairs of variables to add to
                   the clean environment provided for every command.
    * `pre_step_cb`: Callback run before each step in a scenario.
                     See default_pre_step for an explanation of its signature.
                     Its return value is passed to `post_step_cb`.
    * `post_step_cb`: Callback run after each step in a scenario.
                     See default_post_step for an explanation of its signature.
    * `pre_scenario_cb`: Callback run by `setup_scenario` once the
                         scenario's directories exist. Called
                         positionally as
                         `pre_scenario_cb(scenario, datadir, homedir)`.
                         Its return value is passed to `post_scenario_cb`.
    * `post_scenario_cb`: Callback run by `run_scenarios` after each
                          scenario. Called positionally as
                          `post_scenario_cb(scenario, datadir, homedir,
                          userdata)`, where `userdata` is the return
                          value of `pre_scenario_cb`.
    * `cmdrunner`: Function used to run step commands in place of
                   `cliapp.runcmd_unchecked`, intended to be replaceable
                   by unit tests. It is called with an argv list plus
                   `env` and `cwd` keyword arguments and must return an
                   `(exit, stdout, stderr)` triple.
    * `testdir`: Directory under which `setup_scenario` creates one
                 directory per scenario. Only needed when
                 `setup_scenario` or `run_scenarios` is used.

    '''

    def __init__(self, shell_prelude, srcdir, extra_env=(),
                 pre_step_cb=default_pre_step, post_step_cb=default_post_step,
                 pre_scenario_cb=lambda *x: None,
                 post_scenario_cb=lambda *x: None,
                 cmdrunner=cliapp.runcmd_unchecked, testdir=None):
        self.shell_prelude = shell_prelude
        self.srcdir = srcdir
        # Base environment shared by every step of every scenario.
        self.env = self.clean_env(extra_env, SRCDIR=srcdir)
        self.pre_step_cb = pre_step_cb
        self.post_step_cb = post_step_cb
        self.pre_scenario_cb = pre_scenario_cb
        self.post_scenario_cb = post_scenario_cb
        self.cmdrunner = cmdrunner
        self.testdir = testdir

    def setup_scenario(self, scenario): # pragma: no cover
        '''Create the directories for a scenario and run `pre_scenario_cb`.

        Creates `<testdir>/<nice scenario name>`, a `datadir` inside it
        and a `HOME` inside that. `os.mkdir` is used, so an already
        existing directory raises OSError.

        Returns a `(scenario_dir, datadir, homedir, userdata)` tuple,
        where `userdata` is whatever `pre_scenario_cb` returned.

        '''
        scenario_dir = self.scenario_dir(self.testdir, scenario)
        os.mkdir(scenario_dir)
        datadir = self.datadir(scenario_dir)
        os.mkdir(datadir)
        homedir = self.homedir(datadir)
        os.mkdir(homedir)

        ud = self.pre_scenario_cb(scenario, datadir, homedir)
        return scenario_dir, datadir, homedir, ud

    def run_scenarios(self, scenarios): # pragma: no cover
        '''Set up and run every scenario; return the list of failed ones.

        `post_scenario_cb` is called after each scenario, whether it
        passed or failed.

        '''
        failed = []
        for scenario in scenarios:
            scenario_dir, datadir, homedir, ud = self.setup_scenario(scenario)
            if not self.run_scenario(scenario, datadir, homedir):
                failed.append(scenario)
            self.post_scenario_cb(scenario, datadir, homedir, ud)
        return failed

    @staticmethod
    def partition_steps(scenario):
        '''Split a scenario's steps by kind, preserving their order.

        Returns `(assuming, cleanup, normal)`: the ASSUMING steps, the
        FINALLY steps, and every other step.

        '''
        assuming = [s for s in scenario.steps if s.what == 'ASSUMING']
        cleanup = [s for s in scenario.steps if s.what == 'FINALLY']
        normal = [s for s in scenario.steps
                  if s.what not in ('ASSUMING', 'FINALLY')]
        return assuming, cleanup, normal

    def run_scenario(self, scenario, datadir, homedir):
        '''Run all steps of one scenario.

        ASSUMING steps run first; if one of them exits non-zero the
        rest of the scenario is skipped and the scenario is not counted
        as failed. Otherwise the normal steps run until the first
        failure, and then the FINALLY steps run (even if a normal step
        failed) until the first failure among them.

        Steps are numbered from 1 in the order they are executed.

        Returns True if the scenario passed or was skipped, False if a
        normal or FINALLY step exited non-zero.

        '''
        assuming, cleanup, normal = self.partition_steps(scenario)
        scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir)

        assumptions_hold, next_number = self._run_steps(
            scenario, assuming, scenario_env, 1)
        if not assumptions_hold:
            # A failed assumption means "skip", not "fail", and nothing
            # else (not even FINALLY steps) is run.
            return True

        normal_ok, next_number = self._run_steps(
            scenario, normal, scenario_env, next_number)
        # Cleanup always runs once the assumptions held.
        cleanup_ok, next_number = self._run_steps(
            scenario, cleanup, scenario_env, next_number)

        return normal_ok and cleanup_ok

    def _run_steps(self, scenario, steps, scenario_env, first_number):
        '''Run `steps` in order, stopping at the first non-zero exit.

        `first_number` is the step number given to the first step.
        Returns `(ok, next_number)`: whether every step exited with 0,
        and the step number to use for the next step to be run.

        '''
        step_number = first_number
        for step in steps:
            exit_code = self.run_step(
                scenario, step, scenario_env, step_number)
            step_number += 1
            if exit_code != 0:
                return False, step_number
        return True, step_number

    def setup_step(self, step, scenario_env, scenario, step_number):
        '''Prepare a step for execution and run `pre_step_cb`.

        Returns `(step_env, pre_step_userdata, shell_script)`: the
        scenario environment extended with MATCH_1, MATCH_2, ...
        variables holding the groups captured when matching the step
        against its implementation, the return value of `pre_step_cb`,
        and the shell script to execute.

        Raises AssertionError if the step does not match its own
        implementation.

        '''
        m = yarnlib.implements_matches_step(step.implementation, step)
        # Raised explicitly rather than via `assert` so the check still
        # happens, with a useful message, when running under python -O.
        if m is None: # pragma: no cover
            raise AssertionError(
                'step %r does not match its implementation' % (step,))
        step_env = dict(scenario_env)
        # Groups that did not take part in the match become ''.
        step_env.update(('MATCH_%d' % i, match) for (i, match)
                        in enumerate(m.groups(''), 1))

        # All parameters passed as keyword-arguments, so that the callback
        # may declare parameters in any order, and ignore any parameters
        # by specifying **kwargs
        pre_step_userdata = self.pre_step_cb(scenario=scenario, step=step,
                                             step_number=step_number,
                                             step_env=step_env)

        shell_script = '%s\n\n%s\n' % (
            self.shell_prelude, step.implementation.shell)
        return step_env, pre_step_userdata, shell_script

    def run_step(self, scenario, step, scenario_env, step_number):
        '''Run a single step and return its exit code.

        The step's shell script is run with `sh -xeuc` through
        `cmdrunner`, using the step environment and `srcdir` as the
        working directory. `post_step_cb` is called afterwards with the
        result.

        '''
        step_env, pre_step_userdata, shell_script = self.setup_step(
                        step=step, scenario_env=scenario_env,
                        scenario=scenario, step_number=step_number)
        exit_code, stdout, stderr = self.cmdrunner(
            ['sh', '-xeuc', shell_script], env=step_env, cwd=self.srcdir)

        # All parameters passed as keyword-arguments, so that the callback
        # may declare parameters in any order, and ignore any parameters
        # by specifying **kwargs
        self.post_step_cb(scenario=scenario, step=step,
                          step_number=step_number, step_env=step_env,
                          exit=exit_code, stdout=stdout, stderr=stderr,
                          pre_step_userdata=pre_step_userdata)

        return exit_code

    @staticmethod
    def clean_env(extra_env, **kwarg_env):
        '''Return a clean environment for running tests.

        The result is built from, in increasing order of precedence:
        the whitelisted variables copied from `os.environ` (if set),
        a set of hardcoded values, `extra_env` (a dict or an iterable
        of pairs), and finally the keyword arguments.

        '''

        whitelisted = [
            'PATH',
        ]

        hardcoded = {
            'TERM': 'dumb',
            'SHELL': '/bin/sh',
            'LC_ALL': 'C',
            'USER': 'tomjon',
            'USERNAME': 'tomjon',
            'LOGNAME': 'tomjon',
        }

        env = {}

        for key in whitelisted:
            if key in os.environ:
                env[key] = os.environ[key]

        env.update(hardcoded)
        env.update(extra_env)
        env.update(kwarg_env)

        return env

    @classmethod
    def scenario_dir(cls, testdir, scenario): # pragma: no cover
        '''Return the directory for `scenario` underneath `testdir`.'''
        return os.path.join(testdir, cls.nice(scenario.name))

    @staticmethod
    def datadir(scenario_dir): # pragma: no cover
        '''Return the data directory inside a scenario directory.'''
        return os.path.join(scenario_dir, 'datadir')

    @staticmethod
    def homedir(datadir): # pragma: no cover
        '''Return the HOME directory inside a data directory.'''
        return os.path.join(datadir, 'HOME')

    @staticmethod
    def nice(name): # pragma: no cover
        '''Quote a scenario or step name so it forms a nice filename.

        ASCII letters, digits, '-' and '.' are kept. Every run of other
        characters (including '_' itself) is replaced by a single '_'.

        '''
        nice_chars = "abcdefghijklmnopqrstuvwxyz"
        nice_chars += nice_chars.upper()
        nice_chars += "0123456789-."

        quoted = []
        for c in name:
            if c in nice_chars:
                quoted.append(c)
            elif not quoted or quoted[-1] != '_':
                # Collapse consecutive unwanted characters into one '_'.
                quoted.append('_')
        return ''.join(quoted)