summaryrefslogtreecommitdiff
path: root/lib/testresources/__init__.py
blob: 859584bc1dbe6d6abc8cec3565d763d99d610fb1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
#  testresources: extensions to python unittest to allow declaritive use
#  of resources by test cases.
#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#

"""TestResources: declarative management of external resources for tests."""

import inspect
import unittest


def test_suite():
    import testresources.tests
    return testresources.tests.test_suite()


def split_by_resources(tests):
    """Split a list of tests by the resources that the tests use.

    :return: a dictionary mapping sets of resources to lists of tests
    using that combination of resources.  The dictionary always
    contains an entry for "no resources".
    """
    no_resources = frozenset()
    resource_set_tests = {no_resources: []}
    for test in tests:
        resources = getattr(test, "resources", ())
        all_resources = list(resource.neededResources() for _, resource in  resources)
        resource_set = set()
        for resource_list in all_resources:
            resource_set.update(resource_list)
        resource_set_tests.setdefault(frozenset(resource_set), []).append(test)
    return resource_set_tests


class OptimisingTestSuite(unittest.TestSuite):
    """A resource creation optimising TestSuite."""

    def adsorbSuite(self, test_case_or_suite):
        """Deprecated. Use addTest instead."""
        self.addTest(test_case_or_suite)

    def addTest(self, test_case_or_suite):
        """Add `test_case_or_suite`, unwrapping standard TestSuites.

        This means that any containing unittest.TestSuites will be removed,
        while any custom test suites will be 'distributed' across their
        members. Thus addTest(CustomSuite([a, b])) will result in
        CustomSuite([a]) and CustomSuite([b]) being added to this suite.
        """
        try:
            tests = iter(test_case_or_suite)
        except TypeError:
            unittest.TestSuite.addTest(self, test_case_or_suite)
            return
        if test_case_or_suite.__class__ in (unittest.TestSuite, OptimisingTestSuite):
            for test in tests:
                self.adsorbSuite(test)
        else:
            for test in tests:
                unittest.TestSuite.addTest(
                    self, test_case_or_suite.__class__([test]))

    def cost_of_switching(self, old_resource_set, new_resource_set):
        """Cost of switching from 'old_resource_set' to 'new_resource_set'.

        This is calculated by adding the cost of tearing down unnecessary
        resources to the cost of setting up the newly-needed resources.

        Note that resources which are always dirtied may skew the predicted
        skew the cost of switching because they are considered common, even
        when reusing them may actually be equivalent to a teardown+setup
        operation.
        """
        new_resources = new_resource_set - old_resource_set
        gone_resources = old_resource_set - new_resource_set
        return (sum(resource.setUpCost for resource in new_resources) +
            sum(resource.tearDownCost for resource in gone_resources))

    def switch(self, old_resource_set, new_resource_set, result):
        """Switch from 'old_resource_set' to 'new_resource_set'.

        Tear down resources in old_resource_set that aren't in
        new_resource_set and set up resources that are in new_resource_set but
        not in old_resource_set.

        :param result: TestResult object to report activity on.
        """
        new_resources = new_resource_set - old_resource_set
        old_resources = old_resource_set - new_resource_set
        for resource in old_resources:
            resource.finishedWith(resource._currentResource, result)
        for resource in new_resources:
            resource.getResource(result)

    def run(self, result):
        self.sortTests()
        current_resources = set()
        for test in self._tests:
            if result.shouldStop:
                break
            resources = getattr(test, 'resources', [])
            new_resources = set()
            for name, resource in resources:
                new_resources.update(resource.neededResources())
            self.switch(current_resources, new_resources, result)
            current_resources = new_resources
            test(result)
        self.switch(current_resources, set(), result)
        return result

    def sortTests(self):
        """Attempt to topographically sort the contained tests.

        Feel free to override to improve the sort behaviour.
        """
        # We group the tests by the resource combinations they use,
        # since there will usually be fewer resource combinations than
        # actual tests and there can never be more.
        resource_set_tests = split_by_resources(self._tests)

        graph = self._getGraph(resource_set_tests.keys())
        no_resources = frozenset()
        # Recursive visit-all-nodes all-permutations.
        def cost(from_set, resource_sets):
            """Get the cost of resource traversal for resource sets.

            :return: cost, order
            """
            if not resource_sets:
                # tear down last resources
                return graph[from_set][no_resources], []
            costs = []
            for to_set in resource_sets:
                child_cost, child_order = cost(
                    to_set, resource_sets - set([to_set]))
                costs.append((graph[from_set][to_set] + child_cost,
                              [to_set] + child_order))
            return min(costs)
        _, order = cost(no_resources,
                        set(resource_set_tests) - set([no_resources]))
        order.append(no_resources)
        self._tests = sum(
            (resource_set_tests[resource_set] for resource_set in order), [])

    def _getGraph(self, resource_sets):
        """Build a graph of the resource-using nodes.

        :return: A complete directed graph of the switching costs
            between each resource combination.
        """
        graph = {}
        for from_set in resource_sets:
            graph[from_set] = {}
            for to_set in resource_sets:
                if from_set is to_set:
                    graph[from_set][to_set] = 0
                else:
                    graph[from_set][to_set] = self.cost_of_switching(
                        from_set, to_set)
        return graph


class TestLoader(unittest.TestLoader):
    """Custom TestLoader to set the right TestSuite class."""
    suiteClass = OptimisingTestSuite


class TestResource(object):
    """A resource that can be shared across tests.

    Resources can report activity to a TestResult. The methods
     - startCleanResource(resource)
     - stopCleanResource(resource)
     - startMakeResource(resource)
     - stopMakeResource(resource)
    will be looked for and if present invoked before and after cleaning or
    creation of resource objects takes place.

    :cvar resources: The same as the resources list on an instance, the default
        constructor will look for the class instance and copy it. This is a
        convenience to avoid needing to define __init__ solely to alter the
        dependencies list.
    :ivar resources: The resources that this resource needs. Calling
        neededResources will return the closure of this resource and its needed
        resources. The resources list is in the same format as resources on a 
        test case - a list of tuples (attribute_name, resource).
    :ivar setUpCost: The relative cost to construct a resource of this type.
         One good approach is to set this to the number of seconds it normally
         takes to set up the resource.
    :ivar tearDownCost: The relative cost to tear down a resource of this
         type. One good approach is to set this to the number of seconds it
         normally takes to tear down the resource.
    """

    setUpCost = 1
    tearDownCost = 1

    def __init__(self):
        """Create a TestResource object."""
        self._dirty = False
        self._uses = 0
        self._currentResource = None
        self.resources = list(getattr(self.__class__, "resources", []))

    def _call_result_method_if_exists(self, result, methodname, *args):
        """Call a method on a TestResult that may exist."""
        method = getattr(result, methodname, None)
        if callable(method):
            method(*args)

    def _clean_all(self, resource, result):
        """Clean the dependencies from resource, and then resource itself."""
        self._call_result_method_if_exists(result, "startCleanResource", self)
        self.clean(resource)
        for name, manager in self.resources:
            manager.finishedWith(getattr(resource, name))
        self._call_result_method_if_exists(result, "stopCleanResource", self)

    def clean(self, resource):
        """Override this to class method to hook into resource removal."""

    def dirtied(self, resource):
        """Mark the resource as having been 'dirtied'.

        A resource is dirty when it is no longer suitable for use by other
        tests.

        e.g. a shared database that has had rows changed.
        """
        self._dirty = True

    def finishedWith(self, resource, result=None):
        """Indicate that 'resource' has one less user.

        If there are no more registered users of 'resource' then we trigger
        the `clean` hook, which should do any resource-specific
        cleanup.

        :param resource: A resource returned by `TestResource.getResource`.
        :param result: An optional TestResult to report resource changes to.
        """
        self._uses -= 1
        if self._uses == 0:
            self._clean_all(resource, result)
            self._setResource(None)

    def getResource(self, result=None):
        """Get the resource for this class and record that it's being used.

        The resource is constructed using the `make` hook.

        Once done with the resource, pass it to `finishedWith` to indicated
        that it is no longer needed.
        :param result: An optional TestResult to report resource changes to.
        """
        if self._uses == 0:
            self._setResource(self._make_all(result))
        elif self.isDirty():
            self._setResource(self.reset(self._currentResource, result))
        self._uses += 1
        return self._currentResource

    def isDirty(self):
        """Return True if this managers cached resource is dirty.
        
        Calling when the resource is not currently held has undefined
        behaviour.
        """
        if self._dirty:
            return True
        for name, mgr in self.resources:
            if mgr.isDirty():
                return True
            res = mgr.getResource()
            try:
                if res is not getattr(self._currentResource, name):
                    return True
            finally:
                mgr.finishedWith(res)

    def _make_all(self, result):
        """Make the dependencies of this resource and this resource."""
        self._call_result_method_if_exists(result, "startMakeResource", self)
        dependency_resources = {}
        for name, resource in self.resources:
            dependency_resources[name] = resource.getResource()
        resource = self.make(dependency_resources)
        for name, value in dependency_resources.items():
            setattr(resource, name, value)
        self._call_result_method_if_exists(result, "stopMakeResource", self)
        return resource

    def make(self, dependency_resources):
        """Override this to construct resources.
        
        :param dependency_resources: A dict mapping name -> resource instance
            for the resources specified as dependencies.
        """
        raise NotImplementedError(
            "Override make to construct resources.")

    def neededResources(self):
        """Return the resources needed for this resource, including self.
        
        :return: A list of needed resources, in topological deepest-first
            order.
        """
        seen = set([self])
        result = []
        for name, resource in self.resources:
            for resource in resource.neededResources():
                if resource in seen:
                    continue
                seen.add(resource)
                result.append(resource)
        result.append(self)
        return result

    def reset(self, old_resource, result=None):
        """Overridable method to return a clean version of old_resource.

        By default, the resource will be cleaned then remade if it had
        previously been `dirtied`. 

        This function needs to take the dependent resource stack into
        consideration as _make_all and _clean_all do.

        :return: The new resource.
        :param result: An optional TestResult to report resource changes to.
        """
        if self._dirty:
            self._clean_all(old_resource, result)
            resource = self._make_all(result)
        else:
            resource = old_resource
        return resource

    def _setResource(self, new_resource):
        """Set the current resource to a new value."""
        self._currentResource = new_resource
        self._dirty = False


class GenericResource(TestResource):
    """A TestResource that decorates an external helper of some kind.

    GenericResource can be used to adapt an external resource so that 
    testresources can use it. By default the setUp and tearDown methods are
    called when making and cleaning the resource, and the resource is 
    considered permanently dirty, so it is torn down and brought up again
    between every use.

    The constructor method is called with the dependency resources dict::
        resource_factory(**dependency_resources)
    This permits naming those resources to match the contract of the setUp
    method.
    """

    def __init__(self, resource_factory, setup_method_name='setUp',
        teardown_method_name='tearDown'):
        """Create a GenericResource

        :param resource_factory: A factory to create a new resource.
        :param setup_method_name: Optional method name to call to setup the
            resource. Defaults to 'setUp'.
        :param teardown_method_name: Optional method name to call to tear down
            the resource. Defaults to 'tearDown'.
        """
        TestResource.__init__(self)
        self.resource_factory = resource_factory
        self.setup_method_name = setup_method_name
        self.teardown_method_name = teardown_method_name

    def clean(self, resource):
        getattr(resource, self.teardown_method_name)()

    def make(self, dependency_resources):
        result = self.resource_factory(**dependency_resources)
        getattr(result, self.setup_method_name)()
        return result

    def isDirty(self):
        return True


class ResourcedTestCase(unittest.TestCase):
    """A TestCase parent or utility that enables cross-test resource usage.

    :ivar resources: A list of (name, resource) pairs, where 'resource' is a
        subclass of `TestResource` and 'name' is the name of the attribute
        that the resource should be stored on.
    """

    resources = []

    def __get_result(self):
        # unittest hides the result. This forces us to look up the stack.
        # The result is passed to a run() or a __call__ method 4 or more frames
        # up: that method is what calls setUp and tearDown, and they call their
        # parent setUp etc. Its not guaranteed that the parameter to run will
        # be calls result as its not required to be a keyword parameter in 
        # TestCase. However, in practice, this works.
        stack = inspect.stack()
        for frame in stack[3:]:
            if frame[3] in ('run', '__call__'):
                # Not all frames called 'run' will be unittest. It could be a
                # reactor in trial, for instance.
                result = frame[0].f_locals.get('result')
                if (result is not None and
                    getattr(result, 'startTest', None) is not None):
                    return result

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.setUpResources()

    def setUpResources(self):
        """Set up any resources that this test needs."""
        result = self.__get_result()
        for resource in self.resources:
            setattr(self, resource[0], resource[1].getResource(result))

    def tearDown(self):
        self.tearDownResources()
        unittest.TestCase.tearDown(self)

    def tearDownResources(self):
        """Tear down any resources that this test declares."""
        result = self.__get_result()
        for resource in self.resources:
            resource[1].finishedWith(getattr(self, resource[0]), result)
            delattr(self, resource[0])