summaryrefslogtreecommitdiff
path: root/openstack_dashboard/test/helpers.py
blob: 324810cedb0018d00e0b84ed45733006f934af21 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
# Copyright 2012 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2012 Nebula, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
import copy
from functools import wraps  # noqa
import json
import os


from ceilometerclient.v2 import client as ceilometer_client
from cinderclient import client as cinder_client
from django.conf import settings
from django.contrib.messages.storage import default_storage  # noqa
from django.core.handlers import wsgi
from django.core import urlresolvers
from django.test.client import RequestFactory  # noqa
from django.test import utils as django_test_utils
from django.utils.importlib import import_module  # noqa
from django.utils import unittest
import glanceclient
from heatclient import client as heat_client
import httplib2
from keystoneclient.v2_0 import client as keystone_client
import mock
import mox
from neutronclient.v2_0 import client as neutron_client
from novaclient.v2 import client as nova_client
from openstack_auth import user
from openstack_auth import utils
from saharaclient import client as sahara_client
from swiftclient import client as swift_client
from troveclient import client as trove_client

from horizon import base
from horizon import conf
from horizon.test import helpers as horizon_helpers
from openstack_dashboard import api
from openstack_dashboard import context_processors
from openstack_dashboard.test.test_data import utils as test_utils


# Makes output of failing mox tests much easier to read.
wsgi.WSGIRequest.__repr__ = lambda self: "<class 'django.http.HttpRequest'>"


def create_stubs(stubs_to_create={}):
    """decorator to simplify setting up multiple stubs at once via mox

    :param stubs_to_create: methods to stub in one or more modules
    :type stubs_to_create: dict

    The keys are python paths to the module containing the methods to mock.

    To mock a method in openstack_dashboard/api/nova.py, the key is::

        api.nova

    The values are either a tuple of list of methods to mock in the module
    indicated by the key.

    For example::

        ('server_list',)
            -or-
        ('flavor_list', 'server_list',)
            -or-
        ['flavor_list', 'server_list']

    Additionally, multiple modules can be mocked at once::

        {
            api.nova: ('flavor_list', 'server_list'),
            api.glance: ('image_list_detailed',),
        }

    """

    if not isinstance(stubs_to_create, dict):
        raise TypeError("create_stub must be passed a dict, but a %s was "
                        "given." % type(stubs_to_create).__name__)

    def inner_stub_out(fn):
        @wraps(fn)
        def instance_stub_out(self, *args, **kwargs):
            for key in stubs_to_create:
                if not (isinstance(stubs_to_create[key], tuple) or
                        isinstance(stubs_to_create[key], list)):
                    raise TypeError("The values of the create_stub "
                                    "dict must be lists or tuples, but "
                                    "is a %s."
                                    % type(stubs_to_create[key]).__name__)

                for value in stubs_to_create[key]:
                    self.mox.StubOutWithMock(key, value)
            return fn(self, *args, **kwargs)
        return instance_stub_out
    return inner_stub_out


class RequestFactoryWithMessages(RequestFactory):
    def get(self, *args, **kwargs):
        req = super(RequestFactoryWithMessages, self).get(*args, **kwargs)
        req.user = utils.get_user(req)
        req.session = []
        req._messages = default_storage(req)
        return req

    def post(self, *args, **kwargs):
        req = super(RequestFactoryWithMessages, self).post(*args, **kwargs)
        req.user = utils.get_user(req)
        req.session = []
        req._messages = default_storage(req)
        return req


@unittest.skipIf(os.environ.get('SKIP_UNITTESTS', False),
                 "The SKIP_UNITTESTS env variable is set.")
class TestCase(horizon_helpers.TestCase):
    """Specialized base test case class for Horizon.

    It gives access to numerous additional features:

      * A full suite of test data through various attached objects and
        managers (e.g. ``self.servers``, ``self.user``, etc.). See the
        docs for
        :class:`~openstack_dashboard.test.test_data.utils.TestData`
        for more information.
      * The ``mox`` mocking framework via ``self.mox``.
      * A set of request context data via ``self.context``.
      * A ``RequestFactory`` class which supports Django's ``contrib.messages``
        framework via ``self.factory``.
      * A ready-to-go request object via ``self.request``.
      * The ability to override specific time data controls for easier testing.
      * Several handy additional assertion methods.
    """
    def setUp(self):
        def fake_conn_request(*args, **kwargs):
            raise Exception("An external URI request tried to escape through "
                            "an httplib2 client. Args: %s, kwargs: %s"
                            % (args, kwargs))

        self._real_conn_request = httplib2.Http._conn_request
        httplib2.Http._conn_request = fake_conn_request

        self._real_context_processor = context_processors.openstack
        context_processors.openstack = lambda request: self.context

        self.patchers = {}
        self.add_panel_mocks()

        super(TestCase, self).setUp()

    def _setup_test_data(self):
        super(TestCase, self)._setup_test_data()
        test_utils.load_test_data(self)
        self.context = {'authorized_tenants': self.tenants.list()}

    def _setup_factory(self):
        # For some magical reason we need a copy of this here.
        self.factory = RequestFactoryWithMessages()

    def _setup_user(self):
        self._real_get_user = utils.get_user
        tenants = self.context['authorized_tenants']
        self.setActiveUser(id=self.user.id,
                           token=self.token,
                           username=self.user.name,
                           domain_id=self.domain.id,
                           tenant_id=self.tenant.id,
                           service_catalog=self.service_catalog,
                           authorized_tenants=tenants)

    def _setup_request(self):
        super(TestCase, self)._setup_request()
        self.request.session['token'] = self.token.id

    def add_panel_mocks(self):
        """Global mocks on panels that get called on all views."""
        self.patchers['aggregates'] = mock.patch(
            'openstack_dashboard.dashboards.admin'
            '.aggregates.panel.Aggregates.can_access',
            mock.Mock(return_value=True))
        self.patchers['aggregates'].start()

    def tearDown(self):
        httplib2.Http._conn_request = self._real_conn_request
        context_processors.openstack = self._real_context_processor
        utils.get_user = self._real_get_user
        mock.patch.stopall()
        super(TestCase, self).tearDown()

    def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
                      service_catalog=None, tenant_name=None, roles=None,
                      authorized_tenants=None, enabled=True, domain_id=None):
        def get_user(request):
            return user.User(id=id,
                             token=token,
                             user=username,
                             domain_id=domain_id,
                             tenant_id=tenant_id,
                             service_catalog=service_catalog,
                             roles=roles,
                             enabled=enabled,
                             authorized_tenants=authorized_tenants,
                             endpoint=settings.OPENSTACK_KEYSTONE_URL)
        utils.get_user = get_user

    def assertRedirectsNoFollow(self, response, expected_url):
        """Check for redirect.

        Asserts that the given response issued a 302 redirect without
        processing the view which is redirected to.
        """
        assert (response.status_code / 100 == 3), \
            "The response did not return a redirect."
        self.assertEqual(response._headers.get('location', None),
                         ('Location', settings.TESTSERVER + expected_url))
        self.assertEqual(response.status_code, 302)

    def assertNoFormErrors(self, response, context_name="form"):
        """Checks for no form errors.

        Asserts that the response either does not contain a form in its
        context, or that if it does, that form has no errors.
        """
        context = getattr(response, "context", {})
        if not context or context_name not in context:
            return True
        errors = response.context[context_name]._errors
        assert len(errors) == 0, \
            "Unexpected errors were found on the form: %s" % errors

    def assertFormErrors(self, response, count=0, message=None,
                         context_name="form"):
        """Check for form errors.

        Asserts that the response does contain a form in its
        context, and that form has errors, if count were given,
        it must match the exact numbers of errors
        """
        context = getattr(response, "context", {})
        assert (context and context_name in context), \
            "The response did not contain a form."
        errors = response.context[context_name]._errors
        if count:
            assert len(errors) == count, \
                "%d errors were found on the form, %d expected" % \
                (len(errors), count)
            if message and message not in unicode(errors):
                self.fail("Expected message not found, instead found: %s"
                          % ["%s: %s" % (key, [e for e in field_errors]) for
                             (key, field_errors) in errors.items()])
        else:
            assert len(errors) > 0, "No errors were found on the form"

    def assertStatusCode(self, response, expected_code):
        """Validates an expected status code.

        Matches camel case of other assert functions
        """
        if response.status_code == expected_code:
            return
        self.fail('status code %r != %r: %s' % (response.status_code,
                                                expected_code,
                                                response.content))

    def assertItemsCollectionEqual(self, response, items_list):
        self.assertEqual(response.content,
                         '{"items": ' + json.dumps(items_list) + "}")

    @staticmethod
    def mock_rest_request(**args):
        mock_args = {
            'user.is_authenticated.return_value': True,
            'is_ajax.return_value': True,
            'policy.check.return_value': True,
            'body': ''
        }
        mock_args.update(args)
        return mock.Mock(**mock_args)


class BaseAdminViewTests(TestCase):
    """Sets an active user with the "admin" role.

    For testing admin-only views and functionality.
    """
    def setActiveUser(self, *args, **kwargs):
        if "roles" not in kwargs:
            kwargs['roles'] = [self.roles.admin._info]
        super(BaseAdminViewTests, self).setActiveUser(*args, **kwargs)

    def setSessionValues(self, **kwargs):
        settings.SESSION_ENGINE = 'django.contrib.sessions.backends.file'
        engine = import_module(settings.SESSION_ENGINE)
        store = engine.SessionStore()
        for key in kwargs:
            store[key] = kwargs[key]
            self.request.session[key] = kwargs[key]
        store.save()
        self.session = store
        self.client.cookies[settings.SESSION_COOKIE_NAME] = store.session_key


class APITestCase(TestCase):
    """Testing APIs.

    For use with tests which deal with the underlying clients rather than
    stubbing out the openstack_dashboard.api.* methods.
    """
    def setUp(self):
        super(APITestCase, self).setUp()
        utils.patch_middleware_get_user()

        def fake_keystoneclient(request, admin=False):
            """Returns the stub keystoneclient.

            Only necessary because the function takes too many arguments to
            conveniently be a lambda.
            """
            return self.stub_keystoneclient()

        # Store the original clients
        self._original_glanceclient = api.glance.glanceclient
        self._original_keystoneclient = api.keystone.keystoneclient
        self._original_novaclient = api.nova.novaclient
        self._original_neutronclient = api.neutron.neutronclient
        self._original_cinderclient = api.cinder.cinderclient
        self._original_heatclient = api.heat.heatclient
        self._original_ceilometerclient = api.ceilometer.ceilometerclient
        self._original_troveclient = api.trove.troveclient
        self._original_saharaclient = api.sahara.client

        # Replace the clients with our stubs.
        api.glance.glanceclient = lambda request: self.stub_glanceclient()
        api.keystone.keystoneclient = fake_keystoneclient
        api.nova.novaclient = lambda request: self.stub_novaclient()
        api.neutron.neutronclient = lambda request: self.stub_neutronclient()
        api.cinder.cinderclient = lambda request: self.stub_cinderclient()
        api.heat.heatclient = (lambda request, password=None:
                               self.stub_heatclient())
        api.ceilometer.ceilometerclient = (lambda request:
                                           self.stub_ceilometerclient())
        api.trove.troveclient = lambda request: self.stub_troveclient()
        api.sahara.client = lambda request: self.stub_saharaclient()

    def tearDown(self):
        super(APITestCase, self).tearDown()
        api.glance.glanceclient = self._original_glanceclient
        api.nova.novaclient = self._original_novaclient
        api.keystone.keystoneclient = self._original_keystoneclient
        api.neutron.neutronclient = self._original_neutronclient
        api.cinder.cinderclient = self._original_cinderclient
        api.heat.heatclient = self._original_heatclient
        api.ceilometer.ceilometerclient = self._original_ceilometerclient
        api.trove.troveclient = self._original_troveclient
        api.sahara.client = self._original_saharaclient

    def stub_novaclient(self):
        if not hasattr(self, "novaclient"):
            self.mox.StubOutWithMock(nova_client, 'Client')
            self.novaclient = self.mox.CreateMock(nova_client.Client)
        return self.novaclient

    def stub_cinderclient(self):
        if not hasattr(self, "cinderclient"):
            self.mox.StubOutWithMock(cinder_client, 'Client')
            self.cinderclient = self.mox.CreateMock(cinder_client.Client)
        return self.cinderclient

    def stub_keystoneclient(self):
        if not hasattr(self, "keystoneclient"):
            self.mox.StubOutWithMock(keystone_client, 'Client')
            # NOTE(saschpe): Mock properties, MockObject.__init__ ignores them:
            keystone_client.Client.auth_token = 'foo'
            keystone_client.Client.service_catalog = None
            keystone_client.Client.tenant_id = '1'
            keystone_client.Client.tenant_name = 'tenant_1'
            keystone_client.Client.management_url = ""
            keystone_client.Client.__dir__ = lambda: []
            self.keystoneclient = self.mox.CreateMock(keystone_client.Client)
        return self.keystoneclient

    def stub_glanceclient(self):
        if not hasattr(self, "glanceclient"):
            self.mox.StubOutWithMock(glanceclient, 'Client')
            self.glanceclient = self.mox.CreateMock(glanceclient.Client)
        return self.glanceclient

    def stub_neutronclient(self):
        if not hasattr(self, "neutronclient"):
            self.mox.StubOutWithMock(neutron_client, 'Client')
            self.neutronclient = self.mox.CreateMock(neutron_client.Client)
        return self.neutronclient

    def stub_swiftclient(self, expected_calls=1):
        if not hasattr(self, "swiftclient"):
            self.mox.StubOutWithMock(swift_client, 'Connection')
            self.swiftclient = self.mox.CreateMock(swift_client.Connection)
            while expected_calls:
                swift_client.Connection(None,
                                        mox.IgnoreArg(),
                                        None,
                                        preauthtoken=mox.IgnoreArg(),
                                        preauthurl=mox.IgnoreArg(),
                                        cacert=None,
                                        insecure=False,
                                        auth_version="2.0") \
                            .AndReturn(self.swiftclient)
                expected_calls -= 1
        return self.swiftclient

    def stub_heatclient(self):
        if not hasattr(self, "heatclient"):
            self.mox.StubOutWithMock(heat_client, 'Client')
            self.heatclient = self.mox.CreateMock(heat_client.Client)
        return self.heatclient

    def stub_ceilometerclient(self):
        if not hasattr(self, "ceilometerclient"):
            self.mox.StubOutWithMock(ceilometer_client, 'Client')
            self.ceilometerclient = self.mox.\
                CreateMock(ceilometer_client.Client)
        return self.ceilometerclient

    def stub_troveclient(self):
        if not hasattr(self, "troveclient"):
            self.mox.StubOutWithMock(trove_client, 'Client')
            self.troveclient = self.mox.CreateMock(trove_client.Client)
        return self.troveclient

    def stub_saharaclient(self):
        if not hasattr(self, "saharaclient"):
            self.mox.StubOutWithMock(sahara_client, 'Client')
            self.saharaclient = self.mox.CreateMock(sahara_client.Client)
        return self.saharaclient


@unittest.skipUnless(os.environ.get('WITH_SELENIUM', False),
                     "The WITH_SELENIUM env variable is not set.")
class SeleniumTestCase(horizon_helpers.SeleniumTestCase):

    def setUp(self):
        super(SeleniumTestCase, self).setUp()

        test_utils.load_test_data(self)
        self.mox = mox.Mox()

        self._real_get_user = utils.get_user
        self.setActiveUser(id=self.user.id,
                           token=self.token,
                           username=self.user.name,
                           tenant_id=self.tenant.id,
                           service_catalog=self.service_catalog,
                           authorized_tenants=self.tenants.list())
        self.patchers = {}
        self.patchers['aggregates'] = mock.patch(
            'openstack_dashboard.dashboards.admin'
            '.aggregates.panel.Aggregates.can_access',
            mock.Mock(return_value=True))
        self.patchers['aggregates'].start()
        os.environ["HORIZON_TEST_RUN"] = "True"

    def tearDown(self):
        self.mox.UnsetStubs()
        utils.get_user = self._real_get_user
        mock.patch.stopall()
        self.mox.VerifyAll()
        del os.environ["HORIZON_TEST_RUN"]

    def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
                      service_catalog=None, tenant_name=None, roles=None,
                      authorized_tenants=None, enabled=True):
        def get_user(request):
            return user.User(id=id,
                             token=token,
                             user=username,
                             tenant_id=tenant_id,
                             service_catalog=service_catalog,
                             roles=roles,
                             enabled=enabled,
                             authorized_tenants=authorized_tenants,
                             endpoint=settings.OPENSTACK_KEYSTONE_URL)
        utils.get_user = get_user


class SeleniumAdminTestCase(SeleniumTestCase):
    """Version of AdminTestCase for Selenium.

    Sets an active user with the "admin" role for testing admin-only views and
    functionality.
    """
    def setActiveUser(self, *args, **kwargs):
        if "roles" not in kwargs:
            kwargs['roles'] = [self.roles.admin._info]
        super(SeleniumAdminTestCase, self).setActiveUser(*args, **kwargs)


def my_custom_sort(flavor):
    sort_order = {
        'm1.secret': 0,
        'm1.tiny': 1,
        'm1.massive': 2,
        'm1.metadata': 3,
    }
    return sort_order[flavor.name]


class PluginTestCase(TestCase):
    """Test case for testing plugin system of Horizon.

    For use with tests which deal with the pluggable dashboard and panel
    configuration, it takes care of backing up and restoring the Horizon
    configuration.
    """
    def setUp(self):
        super(PluginTestCase, self).setUp()
        self.old_horizon_config = conf.HORIZON_CONFIG
        conf.HORIZON_CONFIG = conf.LazySettings()
        base.Horizon._urls()
        # Trigger discovery, registration, and URLconf generation if it
        # hasn't happened yet.
        self.client.get("/")
        # Store our original dashboards
        self._discovered_dashboards = base.Horizon._registry.keys()
        # Gather up and store our original panels for each dashboard
        self._discovered_panels = {}
        for dash in self._discovered_dashboards:
            panels = base.Horizon._registry[dash]._registry.keys()
            self._discovered_panels[dash] = panels

    def tearDown(self):
        super(PluginTestCase, self).tearDown()
        conf.HORIZON_CONFIG = self.old_horizon_config
        # Destroy our singleton and re-create it.
        base.HorizonSite._instance = None
        del base.Horizon
        base.Horizon = base.HorizonSite()
        # Reload the convenience references to Horizon stored in __init__
        reload(import_module("horizon"))
        # Re-register our original dashboards and panels.
        # This is necessary because autodiscovery only works on the first
        # import, and calling reload introduces innumerable additional
        # problems. Manual re-registration is the only good way for testing.
        for dash in self._discovered_dashboards:
            base.Horizon.register(dash)
            for panel in self._discovered_panels[dash]:
                dash.register(panel)
        self._reload_urls()

    def _reload_urls(self):
        """CLeans up URLs.

        Clears out the URL caches, reloads the root urls module, and
        re-triggers the autodiscovery mechanism for Horizon. Allows URLs
        to be re-calculated after registering new dashboards. Useful
        only for testing and should never be used on a live site.
        """
        urlresolvers.clear_url_caches()
        reload(import_module(settings.ROOT_URLCONF))
        base.Horizon._urls()


class update_settings(django_test_utils.override_settings):
    """override_settings which allows override an item in dict.

    django original override_settings replaces a dict completely,
    however OpenStack dashboard setting has many dictionary configuration
    and there are test case where we want to override only one item in
    a dictionary and keep other items in the dictionary.
    This version of override_settings allows this if keep_dict is True.

    If keep_dict False is specified, the original behavior of
    Django override_settings is used.
    """

    def __init__(self, keep_dict=True, **kwargs):
        if keep_dict:
            for key, new_value in kwargs.items():
                value = getattr(settings, key, None)
                if (isinstance(new_value, collections.Mapping) and
                        isinstance(value, collections.Mapping)):
                    copied = copy.copy(value)
                    copied.update(new_value)
                    kwargs[key] = copied
        super(update_settings, self).__init__(**kwargs)