summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-27 16:08:27 +0000
committerGerrit Code Review <review@openstack.org>2014-09-27 16:08:27 +0000
commitf36e7b83d6c27600cc99a432b3dc2242e9bfef4e (patch)
tree035165294ea316da8d406c24dba73797a704b256
parente3d6e576a4081317a92ede799ab55c96cf70e9a1 (diff)
parent2de5c0f66d938a320bb79093180bef3fb5983b4a (diff)
downloadtrove-f36e7b83d6c27600cc99a432b3dc2242e9bfef4e.tar.gz
Merge "Isolate unit tests from integration tests data"
-rw-r--r--trove/tests/unittests/instance/test_instance_status.py117
-rw-r--r--trove/tests/unittests/mgmt/test_models.py349
2 files changed, 234 insertions, 232 deletions
diff --git a/trove/tests/unittests/instance/test_instance_status.py b/trove/tests/unittests/instance/test_instance_status.py
index bca9299f..a563254f 100644
--- a/trove/tests/unittests/instance/test_instance_status.py
+++ b/trove/tests/unittests/instance/test_instance_status.py
@@ -13,12 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
#
+
+import uuid
+
from testtools import TestCase
from trove.common.instance import ServiceStatuses
+from trove.datastore import models
from trove.instance.models import InstanceStatus
from trove.instance.models import InstanceServiceStatus
from trove.instance.models import SimpleInstance
-from trove.tests.util import test_config
+from trove.tests.unittests.util import util
class FakeInstanceTask(object):
@@ -31,108 +35,105 @@ class FakeInstanceTask(object):
class FakeDBInstance(object):
def __init__(self):
- self.id = None
+ self.id = str(uuid.uuid4())
self.deleted = False
- self.datastore_version_id = test_config.dbaas_datastore_version_id
+ self.datastore_version_id = str(uuid.uuid4())
self.server_status = "ACTIVE"
self.task_status = FakeInstanceTask()
-class InstanceStatusTest(TestCase):
+class BaseInstanceStatusTestCase(TestCase):
def setUp(self):
- super(InstanceStatusTest, self).setUp()
+ util.init_db()
+ self.db_info = FakeDBInstance()
+ self.status = InstanceServiceStatus(
+ ServiceStatuses.RUNNING)
+ self.datastore = models.DBDatastore.create(
+ id=str(uuid.uuid4()),
+ name='mysql',
+ default_version_id=self.db_info.datastore_version_id
+ )
+ self.version = models.DBDatastoreVersion.create(
+ id=self.db_info.datastore_version_id,
+ datastore_id=self.datastore.id,
+ name='5.5',
+ manager='mysql',
+ image_id=str(uuid.uuid4()),
+ active=1,
+ packages="mysql-server-5.5"
+ )
+ super(BaseInstanceStatusTestCase, self).setUp()
def tearDown(self):
- super(InstanceStatusTest, self).tearDown()
+ self.datastore.delete()
+ self.version.delete()
+ super(BaseInstanceStatusTestCase, self).tearDown()
+
+
+class InstanceStatusTest(BaseInstanceStatusTestCase):
def test_task_status_error_reports_error(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.task_status.is_error = True
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.task_status.is_error = True
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.ERROR, instance.status)
def test_task_status_action_building_reports_build(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.task_status.action = "BUILDING"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.task_status.action = "BUILDING"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.BUILD, instance.status)
def test_task_status_action_rebooting_reports_reboot(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.task_status.action = "REBOOTING"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.task_status.action = "REBOOTING"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.REBOOT, instance.status)
def test_task_status_action_resizing_reports_resize(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.task_status.action = "RESIZING"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.task_status.action = "RESIZING"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.RESIZE, instance.status)
def test_task_status_action_deleting_reports_shutdown(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.task_status.action = "DELETING"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.task_status.action = "DELETING"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.SHUTDOWN, instance.status)
def test_nova_server_build_reports_build(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.server_status = "BUILD"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.server_status = "BUILD"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.BUILD, instance.status)
def test_nova_server_error_reports_error(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.server_status = "ERROR"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.server_status = "ERROR"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.ERROR, instance.status)
def test_nova_server_reboot_reports_reboot(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.server_status = "REBOOT"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.server_status = "REBOOT"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.REBOOT, instance.status)
def test_nova_server_resize_reports_resize(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.server_status = "RESIZE"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.server_status = "RESIZE"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.RESIZE, instance.status)
def test_nova_server_verify_resize_reports_resize(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_db_info.server_status = "VERIFY_RESIZE"
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.db_info.server_status = "VERIFY_RESIZE"
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.RESIZE, instance.status)
def test_service_status_paused_reports_reboot(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_status.set_status(ServiceStatuses.PAUSED)
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.status.set_status(ServiceStatuses.PAUSED)
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.REBOOT, instance.status)
def test_service_status_new_reports_build(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_status.set_status(ServiceStatuses.NEW)
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.status.set_status(ServiceStatuses.NEW)
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.BUILD, instance.status)
def test_service_status_running_reports_active(self):
- fake_db_info = FakeDBInstance()
- fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
- fake_status.set_status(ServiceStatuses.RUNNING)
- instance = SimpleInstance('dummy context', fake_db_info, fake_status)
+ self.status.set_status(ServiceStatuses.RUNNING)
+ instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.ACTIVE, instance.status)
diff --git a/trove/tests/unittests/mgmt/test_models.py b/trove/tests/unittests/mgmt/test_models.py
index 5d92b882..c2821c53 100644
--- a/trove/tests/unittests/mgmt/test_models.py
+++ b/trove/tests/unittests/mgmt/test_models.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
+import uuid
from mock import MagicMock, patch, ANY
from testtools import TestCase
from testtools.matchers import Equals, Is, Not
@@ -25,21 +26,40 @@ from trove.backup.models import Backup
from trove.common.context import TroveContext
from trove.common import instance as rd_instance
from trove.datastore import models as datastore_models
-from trove.db.models import DatabaseModelBase
from trove.instance.models import DBInstance
from trove.instance.models import InstanceServiceStatus
from trove.instance.tasks import InstanceTasks
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.openstack.common.notifier import api as notifier
from trove.common import remote
-from trove.tests.util import test_config
+from trove.tests.unittests.util import util
CONF = cfg.CONF
class MockMgmtInstanceTest(TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ util.init_db()
+ cls.version_id = str(uuid.uuid4())
+ cls.datastore = datastore_models.DBDatastore.create(
+ id=str(uuid.uuid4()),
+ name='mysql',
+ default_version_id=cls.version_id
+ )
+ cls.version = datastore_models.DBDatastoreVersion.create(
+ id=cls.version_id,
+ datastore_id=cls.datastore.id,
+ name='5.5',
+ manager='mysql',
+ image_id=str(uuid.uuid4()),
+ active=1,
+ packages="mysql-server-5.5"
+ )
+ super(MockMgmtInstanceTest, cls).setUpClass()
+
def setUp(self):
- super(MockMgmtInstanceTest, self).setUp()
self.context = TroveContext()
self.context.auth_token = 'some_secret_password'
self.client = MagicMock(spec=Client)
@@ -53,63 +73,59 @@ class MockMgmtInstanceTest(TestCase):
CONF.set_override('report_interval', 20)
CONF.set_override('notification_service_id', {'mysql': '123'})
- def tearDown(self):
- super(MockMgmtInstanceTest, self).tearDown()
+ super(MockMgmtInstanceTest, self).setUp()
- @staticmethod
- def build_db_instance(status, task_status=InstanceTasks.DELETING):
- return DBInstance(task_status,
- created='xyz',
- name='test_name',
- id='1',
- flavor_id='flavor_1',
- datastore_version_id=
- test_config.dbaas_datastore_version_id,
- compute_instance_id='compute_id_1',
- server_id='server_id_1',
- tenant_id='tenant_id_1',
- server_status=status)
+ def do_cleanup(self, instance, status):
+ instance.delete()
+ status.delete()
+
+ def build_db_instance(self, status, task_status=InstanceTasks.NONE):
+ version = datastore_models.DBDatastoreVersion.get_by(name='5.5')
+ instance = DBInstance(InstanceTasks.NONE,
+ name='test_name',
+ id=str(uuid.uuid4()),
+ flavor_id='flavor_1',
+ datastore_version_id=
+ version.id,
+ compute_instance_id='compute_id_1',
+ server_id='server_id_1',
+ tenant_id='tenant_id_1',
+ server_status=
+ rd_instance.ServiceStatuses.
+ BUILDING.api_status,
+ deleted=False)
+ instance.save()
+ service_status = InstanceServiceStatus(
+ rd_instance.ServiceStatuses.RUNNING,
+ id=str(uuid.uuid4()),
+ instance_id=instance.id,
+ )
+ service_status.save()
+ instance.set_task_status(task_status)
+ instance.server_status = status
+ instance.save()
+ return instance, service_status
class TestNotificationTransformer(MockMgmtInstanceTest):
+ @classmethod
+ def setUpClass(cls):
+ super(TestNotificationTransformer, cls).setUpClass()
+
def test_tranformer(self):
- transformer = mgmtmodels.NotificationTransformer(context=self.context)
status = rd_instance.ServiceStatuses.BUILDING.api_status
- db_instance = MockMgmtInstanceTest.build_db_instance(
+ instance, service_status = self.build_db_instance(
status, InstanceTasks.BUILDING)
-
- with patch.object(DatabaseModelBase, 'find_all',
- return_value=[db_instance]):
- stub_dsv_db_info = MagicMock(
- spec=datastore_models.DBDatastoreVersion)
- stub_dsv_db_info.id = "test_datastore_version"
- stub_dsv_db_info.datastore_id = "mysql_test_version"
- stub_dsv_db_info.name = "test_datastore_name"
- stub_dsv_db_info.image_id = "test_datastore_image_id"
- stub_dsv_db_info.packages = "test_datastore_pacakges"
- stub_dsv_db_info.active = 1
- stub_dsv_db_info.manager = "mysql"
- stub_datastore_version = datastore_models.DatastoreVersion(
- stub_dsv_db_info)
-
- def side_effect_func(*args, **kwargs):
- if 'instance_id' in kwargs:
- return InstanceServiceStatus(
- rd_instance.ServiceStatuses.BUILDING)
- else:
- return stub_datastore_version
-
- with patch.object(DatabaseModelBase, 'find_by',
- side_effect=side_effect_func):
- payloads = transformer()
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(1))
- payload = payloads[0]
- self.assertThat(payload['audit_period_beginning'],
- Not(Is(None)))
- self.assertThat(payload['audit_period_ending'], Not(Is(None)))
- self.assertThat(payload['state'], Equals(status.lower()))
+ payloads = mgmtmodels.NotificationTransformer(
+ context=self.context)()
+ self.assertIsNotNone(payloads)
+ payload = payloads[0]
+ self.assertThat(payload['audit_period_beginning'],
+ Not(Is(None)))
+ self.assertThat(payload['audit_period_ending'], Not(Is(None)))
+ self.assertTrue(status.lower() in [db['state'] for db in payloads])
+ self.addCleanup(self.do_cleanup, instance, service_status)
def test_get_service_id(self):
id_map = {
@@ -131,6 +147,11 @@ class TestNotificationTransformer(MockMgmtInstanceTest):
class TestNovaNotificationTransformer(MockMgmtInstanceTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNovaNotificationTransformer, cls).setUpClass()
+
def test_transformer_cache(self):
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
@@ -145,9 +166,9 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
def test_lookup_flavor(self):
flavor = MagicMock(spec=Flavor)
flavor.name = 'flav_1'
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
with patch.object(self.flavor_mgr, 'get', side_effect=[flavor, None]):
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
self.assertThat(transformer._lookup_flavor('1'),
Equals(flavor.name))
self.assertThat(transformer._lookup_flavor('2'),
@@ -155,188 +176,160 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
def test_tranformer(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
- db_instance = MockMgmtInstanceTest.build_db_instance(
- status, task_status=InstanceTasks.BUILDING)
-
- stub_dsv_db_info = MagicMock(spec=datastore_models.DBDatastoreVersion)
- stub_dsv_db_info.id = "test_datastore_version"
- stub_dsv_db_info.datastore_id = "mysql_test_version"
- stub_dsv_db_info.name = "test_datastore_name"
- stub_dsv_db_info.image_id = "test_datastore_image_id"
- stub_dsv_db_info.packages = "test_datastore_pacakges"
- stub_dsv_db_info.active = 1
- stub_dsv_db_info.manager = "mysql"
- stub_datastore_version = datastore_models.DatastoreVersion(
- stub_dsv_db_info)
+ instance, service_status = self.build_db_instance(
+ status, InstanceTasks.BUILDING)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
- db_instance,
+ instance,
server,
- None)
-
- with patch.object(DatabaseModelBase, 'find_by',
- return_value=stub_datastore_version):
+ service_status)
- with patch.object(mgmtmodels, 'load_mgmt_instances',
- return_value=[mgmt_instance]):
-
- with patch.object(self.flavor_mgr, 'get', return_value=flavor):
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+ with patch.object(self.flavor_mgr, 'get', return_value=flavor):
- # invocation
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- payloads = transformer()
+ payloads = transformer()
- # assertions
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(1))
- payload = payloads[0]
- self.assertThat(payload['audit_period_beginning'],
- Not(Is(None)))
- self.assertThat(payload['audit_period_ending'],
- Not(Is(None)))
- self.assertThat(payload['state'], Equals(status.lower()))
- self.assertThat(payload['instance_type'],
- Equals('db.small'))
- self.assertThat(payload['instance_type_id'],
- Equals('flavor_1'))
- self.assertThat(payload['user_id'], Equals('test_user_id'))
- self.assertThat(payload['service_id'], Equals('123'))
+ self.assertIsNotNone(payloads)
+ payload = payloads[0]
+ self.assertThat(payload['audit_period_beginning'],
+ Not(Is(None)))
+ self.assertThat(payload['audit_period_ending'],
+ Not(Is(None)))
+ self.assertThat(payload['state'], Not(Is(None)))
+ self.assertThat(payload['instance_type'],
+ Equals('db.small'))
+ self.assertThat(payload['instance_type_id'],
+ Equals('flavor_1'))
+ self.assertThat(payload['user_id'], Equals('test_user_id'))
+ self.assertThat(payload['service_id'], Equals('123'))
+ self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_invalid_datastore_manager(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
- db_instance = MockMgmtInstanceTest.build_db_instance(
- status, task_status=InstanceTasks.BUILDING)
-
+ instance, service_status = self.build_db_instance(
+ status, InstanceTasks.BUILDING)
+ version = datastore_models.DBDatastoreVersion.get_by(
+ id=instance.datastore_version_id)
+ version.update(manager='something invalid')
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
- stub_datastore_version = MagicMock()
- stub_datastore_version.id = "stub_datastore_version"
- stub_datastore_version.manager = "m0ng0"
- stub_datastore = MagicMock()
- stub_datastore.default_datastore_version = "stub_datastore_version"
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
- with patch.object(datastore_models.DatastoreVersion, 'load',
- return_value=stub_datastore_version):
- with patch.object(datastore_models.DatastoreVersion,
- 'load_by_uuid',
- return_value=stub_datastore_version):
- with patch.object(datastore_models.Datastore, 'load',
- return_value=stub_datastore):
- mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
- db_instance,
- server,
- None)
- with patch.object(mgmtmodels, 'load_mgmt_instances',
- return_value=[mgmt_instance]):
- with patch.object(self.flavor_mgr,
- 'get', return_value=flavor):
-
- # invocation
- transformer = (
- mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- )
-
- payloads = transformer()
- # assertions
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(1))
- payload = payloads[0]
- self.assertThat(payload['audit_period_beginning'],
- Not(Is(None)))
- self.assertThat(payload['audit_period_ending'],
- Not(Is(None)))
- self.assertThat(payload['state'],
- Equals(status.lower()))
- self.assertThat(payload['instance_type'],
- Equals('db.small'))
- self.assertThat(payload['instance_type_id'],
- Equals('flavor_1'))
- self.assertThat(payload['user_id'],
- Equals('test_user_id'))
- self.assertThat(payload['service_id'],
- Equals('unknown-service-id-error'))
+ mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
+ instance,
+ server,
+ service_status)
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+ with patch.object(self.flavor_mgr,
+ 'get', return_value=flavor):
+ payloads = transformer()
+ # assertions
+ self.assertIsNotNone(payloads)
+ payload = payloads[0]
+ self.assertThat(payload['audit_period_beginning'],
+ Not(Is(None)))
+ self.assertThat(payload['audit_period_ending'],
+ Not(Is(None)))
+ self.assertIn(status.lower(),
+ [db['state']
+ for db in payloads])
+ self.assertThat(payload['instance_type'],
+ Equals('db.small'))
+ self.assertThat(payload['instance_type_id'],
+ Equals('flavor_1'))
+ self.assertThat(payload['user_id'],
+ Equals('test_user_id'))
+ self.assertThat(payload['service_id'],
+ Equals('unknown-service-id-error'))
+ version.update(manager='mysql')
+ self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_shutdown_instance(self):
status = rd_instance.ServiceStatuses.SHUTDOWN.api_status
- db_instance = self.build_db_instance(status)
-
+ instance, service_status = self.build_db_instance(status)
+ service_status.set_status(rd_instance.ServiceStatuses.SHUTDOWN)
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
+
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
- db_instance,
+ instance,
server,
- None)
+ service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
-
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
with patch.object(Backup, 'running', return_value=None):
self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
- # invocation
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
payloads = transformer()
# assertion that SHUTDOWN instances are not reported
self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(0))
+ self.assertNotIn(status.lower(),
+ [db['status']
+ for db in payloads])
+ self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_no_nova_instance(self):
status = rd_instance.ServiceStatuses.SHUTDOWN.api_status
- db_instance = MockMgmtInstanceTest.build_db_instance(status)
-
+ instance, service_status = self.build_db_instance(status)
+ service_status.set_status(rd_instance.ServiceStatuses.SHUTDOWN)
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
- db_instance,
+ instance,
None,
- None)
+ service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
-
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
with patch.object(Backup, 'running', return_value=None):
self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
- # invocation
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
payloads = transformer()
# assertion that SHUTDOWN instances are not reported
self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(0))
+ self.assertNotIn(status.lower(),
+ [db['status']
+ for db in payloads])
+ self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_flavor_cache(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
- db_instance = MockMgmtInstanceTest.build_db_instance(
+ instance, service_status = self.build_db_instance(
status, InstanceTasks.BUILDING)
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
- db_instance,
+ instance,
server,
- None)
+ service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
-
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
+
transformer()
- # call twice ensure client.flavor invoked once
payloads = transformer()
self.assertIsNotNone(payloads)
self.assertThat(len(payloads), Equals(1))
@@ -344,27 +337,34 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'], Not(Is(None)))
- self.assertThat(payload['state'], Equals(status.lower()))
+ self.assertIn(status.lower(),
+ [db['state']
+ for db in payloads])
self.assertThat(payload['instance_type'], Equals('db.small'))
self.assertThat(payload['instance_type_id'],
Equals('flavor_1'))
self.assertThat(payload['user_id'], Equals('test_user_id'))
# ensure cache was used to get flavor second time
self.flavor_mgr.get.assert_any_call('flavor_1')
+ self.addCleanup(self.do_cleanup, instance, service_status)
class TestMgmtInstanceTasks(MockMgmtInstanceTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestMgmtInstanceTasks, cls).setUpClass()
+
def test_public_exists_events(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
- db_instance = MockMgmtInstanceTest.build_db_instance(
+ instance, service_status = self.build_db_instance(
status, task_status=InstanceTasks.BUILDING)
-
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
- db_instance,
+ instance,
server,
- None)
+ service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
@@ -387,3 +387,4 @@ class TestMgmtInstanceTasks(MockMgmtInstanceTest):
'INFO',
ANY)
self.assertThat(self.context.auth_token, Is(None))
+ self.addCleanup(self.do_cleanup, instance, service_status)