diff options
Diffstat (limited to 'openstackclient/tests')
19 files changed, 2244 insertions, 296 deletions
diff --git a/openstackclient/tests/functional/compute/v2/test_server_group.py b/openstackclient/tests/functional/compute/v2/test_server_group.py index 3dff3dcd..daeecd2d 100644 --- a/openstackclient/tests/functional/compute/v2/test_server_group.py +++ b/openstackclient/tests/functional/compute/v2/test_server_group.py @@ -33,8 +33,8 @@ class ServerGroupTests(base.TestCase): cmd_output['name'] ) self.assertEqual( - ['affinity'], - cmd_output['policies'] + 'affinity', + cmd_output['policy'] ) cmd_output = json.loads(self.openstack( @@ -47,8 +47,8 @@ class ServerGroupTests(base.TestCase): cmd_output['name'] ) self.assertEqual( - ['anti-affinity'], - cmd_output['policies'] + 'anti-affinity', + cmd_output['policy'] ) del_output = self.openstack( @@ -60,7 +60,7 @@ class ServerGroupTests(base.TestCase): name1 = uuid.uuid4().hex name2 = uuid.uuid4().hex - # test server gorup show + # test server group show cmd_output = json.loads(self.openstack( 'server group create -f json ' + '--policy affinity ' + @@ -74,8 +74,8 @@ class ServerGroupTests(base.TestCase): cmd_output['name'] ) self.assertEqual( - ['affinity'], - cmd_output['policies'] + 'affinity', + cmd_output['policy'] ) cmd_output = json.loads(self.openstack( @@ -91,8 +91,8 @@ class ServerGroupTests(base.TestCase): cmd_output['name'] ) self.assertEqual( - ['anti-affinity'], - cmd_output['policies'] + 'anti-affinity', + cmd_output['policy'] ) # test server group list @@ -101,6 +101,6 @@ class ServerGroupTests(base.TestCase): names = [x["Name"] for x in cmd_output] self.assertIn(name1, names) self.assertIn(name2, names) - policies = [x["Policies"] for x in cmd_output] - self.assertIn(['affinity'], policies) - self.assertIn(['anti-affinity'], policies) + policies = [x["Policy"] for x in cmd_output] + self.assertIn('affinity', policies) + self.assertIn('anti-affinity', policies) diff --git a/openstackclient/tests/functional/network/v2/test_network_ndp_proxy.py b/openstackclient/tests/functional/network/v2/test_network_ndp_proxy.py new file mode 100644 index 00000000..a10aef6b --- /dev/null +++ b/openstackclient/tests/functional/network/v2/test_network_ndp_proxy.py @@ -0,0 +1,201 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import json + +from openstackclient.tests.functional.network.v2 import common + + +class L3NDPProxyTests(common.NetworkTests): + + def setUp(self): + super().setUp() + # Nothing in this class works with Nova Network + if not self.haz_network: + self.skipTest("No Network service present") + if not self.is_extension_enabled('l3-ndp-proxy'): + self.skipTest("No l3-ndp-proxy extension present") + + self.ROT_NAME = self.getUniqueString() + self.EXT_NET_NAME = self.getUniqueString() + self.EXT_SUB_NAME = self.getUniqueString() + self.INT_NET_NAME = self.getUniqueString() + self.INT_SUB_NAME = self.getUniqueString() + self.INT_PORT_NAME = self.getUniqueString() + self.ADDR_SCOPE_NAME = self.getUniqueString() + self.SUBNET_P_NAME = self.getUniqueString() + self.created_ndp_proxies = [] + + json_output = json.loads( + self.openstack( + 'address scope create -f json --ip-version 6 ' + '%(address_s_name)s' % { + 'address_s_name': self.ADDR_SCOPE_NAME})) + self.assertIsNotNone(json_output['id']) + self.ADDRESS_SCOPE_ID = json_output['id'] + json_output = json.loads( + self.openstack( + 'subnet pool create -f json %(subnet_p_name)s ' + '--address-scope %(address_scope)s ' + '--pool-prefix 2001:db8::/96 --default-prefix-length 112' % { + 'subnet_p_name': self.SUBNET_P_NAME, + 'address_scope': self.ADDRESS_SCOPE_ID})) + self.assertIsNotNone(json_output['id']) + self.SUBNET_POOL_ID = json_output['id'] + json_output = json.loads( + self.openstack('network create -f json ' + '--external ' + self.EXT_NET_NAME)) + self.assertIsNotNone(json_output['id']) + self.EXT_NET_ID = json_output['id'] + json_output = json.loads( + self.openstack( + 'subnet create -f json --ip-version 6 --subnet-pool ' + '%(subnet_pool)s --network %(net_id)s %(sub_name)s' % { + 'subnet_pool': self.SUBNET_POOL_ID, + 'net_id': self.EXT_NET_ID, + 'sub_name': self.EXT_SUB_NAME})) + self.assertIsNotNone(json_output['id']) + self.EXT_SUB_ID = json_output['id'] + json_output = json.loads( + self.openstack('router create -f json ' + self.ROT_NAME)) + self.assertIsNotNone(json_output['id']) + self.ROT_ID = json_output['id'] + output = self.openstack( + 'router set %(router_id)s --external-gateway %(net_id)s' % { + 'router_id': self.ROT_ID, + 'net_id': self.EXT_NET_ID}) + self.assertEqual('', output) + output = self.openstack('router set --enable-ndp-proxy ' + self.ROT_ID) + self.assertEqual('', output) + json_output = json.loads( + self.openstack( + 'router show -f json -c enable_ndp_proxy ' + self.ROT_ID)) + self.assertTrue(json_output['enable_ndp_proxy']) + json_output = json.loads( + self.openstack('network create -f json ' + self.INT_NET_NAME)) + self.assertIsNotNone(json_output['id']) + self.INT_NET_ID = json_output['id'] + json_output = json.loads( + self.openstack( + 'subnet create -f json --ip-version 6 --subnet-pool ' + '%(subnet_pool)s --network %(net_id)s %(sub_name)s' % { + 'subnet_pool': self.SUBNET_POOL_ID, + 'net_id': self.INT_NET_ID, + 'sub_name': self.INT_SUB_NAME})) + self.assertIsNotNone(json_output['id']) + self.INT_SUB_ID = json_output['id'] + json_output = json.loads( + self.openstack( + 'port create -f json --network %(net_id)s ' + '%(port_name)s' % { + 'net_id': self.INT_NET_ID, + 'port_name': self.INT_PORT_NAME})) + self.assertIsNotNone(json_output['id']) + self.INT_PORT_ID = json_output['id'] + self.INT_PORT_ADDRESS = json_output['fixed_ips'][0]['ip_address'] + output = self.openstack( + 'router add subnet ' + self.ROT_ID + ' ' + self.INT_SUB_ID) + self.assertEqual('', output) + + def tearDown(self): + for ndp_proxy in self.created_ndp_proxies: + output = self.openstack( + 'router ndp proxy delete ' + ndp_proxy['id']) + self.assertEqual('', output) + output = self.openstack('port delete ' + self.INT_PORT_ID) + self.assertEqual('', output) + output = self.openstack( + 'router set --disable-ndp-proxy ' + self.ROT_ID) + self.assertEqual('', output) + output = self.openstack( + 'router remove subnet ' + self.ROT_ID + ' ' + self.INT_SUB_ID) + self.assertEqual('', output) + output = self.openstack('subnet delete ' + self.INT_SUB_ID) + self.assertEqual('', output) + output = self.openstack('network delete ' + self.INT_NET_ID) + self.assertEqual('', output) + output = self.openstack( + 'router unset ' + self.ROT_ID + ' ' + '--external-gateway') + self.assertEqual('', output) + output = self.openstack('router delete ' + self.ROT_ID) + self.assertEqual('', output) + output = self.openstack('subnet delete ' + self.EXT_SUB_ID) + self.assertEqual('', output) + output = self.openstack('network delete ' + self.EXT_NET_ID) + self.assertEqual('', output) + output = self.openstack('subnet pool delete ' + self.SUBNET_POOL_ID) + self.assertEqual('', output) + output = self.openstack('address scope delete ' + + self.ADDRESS_SCOPE_ID) + self.assertEqual('', output) + super().tearDown() + + def _create_ndp_proxies(self, ndp_proxies): + for ndp_proxy in ndp_proxies: + output = json.loads( + self.openstack( + 'router ndp proxy create %(router)s --name %(name)s ' + '--port %(port)s --ip-address %(address)s -f json' % { + 'router': ndp_proxy['router_id'], + 'name': ndp_proxy['name'], + 'port': ndp_proxy['port_id'], + 'address': ndp_proxy['address']})) + self.assertEqual(ndp_proxy['router_id'], output['router_id']) + self.assertEqual(ndp_proxy['port_id'], output['port_id']) + self.assertEqual(ndp_proxy['address'], output['ip_address']) + self.created_ndp_proxies.append(output) + + def test_create_ndp_proxy(self): + ndp_proxies = [ + { + 'name': self.getUniqueString(), + 'router_id': self.ROT_ID, + 'port_id': self.INT_PORT_ID, + 'address': self.INT_PORT_ADDRESS + } + ] + self._create_ndp_proxies(ndp_proxies) + + def test_ndp_proxy_list(self): + ndp_proxies = { + 'name': self.getUniqueString(), + 'router_id': self.ROT_ID, + 'port_id': self.INT_PORT_ID, + 'address': self.INT_PORT_ADDRESS} + self._create_ndp_proxies([ndp_proxies]) + ndp_proxy = json.loads(self.openstack( + 'router ndp proxy list -f json'))[0] + self.assertEqual(ndp_proxies['name'], ndp_proxy['Name']) + self.assertEqual(ndp_proxies['router_id'], ndp_proxy['Router ID']) + self.assertEqual(ndp_proxies['address'], ndp_proxy['IP Address']) + + def test_ndp_proxy_set_and_show(self): + ndp_proxies = { + 'name': self.getUniqueString(), + 'router_id': self.ROT_ID, + 'port_id': self.INT_PORT_ID, + 'address': self.INT_PORT_ADDRESS} + description = 'balala' + self._create_ndp_proxies([ndp_proxies]) + ndp_proxy_id = self.created_ndp_proxies[0]['id'] + output = self.openstack( + 'router ndp proxy set --description %s %s' % ( + description, ndp_proxy_id)) + self.assertEqual('', output) + json_output = json.loads( + self.openstack('router ndp proxy show -f json ' + ndp_proxy_id)) + self.assertEqual(ndp_proxies['name'], json_output['name']) + self.assertEqual(ndp_proxies['router_id'], json_output['router_id']) + self.assertEqual(ndp_proxies['port_id'], json_output['port_id']) + self.assertEqual(ndp_proxies['address'], json_output['ip_address']) + self.assertEqual(description, json_output['description']) diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py index 98e588e8..fd411b35 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py @@ -85,6 +85,73 @@ class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests): self.assertEqual(7500, cmd_output['min_kbps']) +class NetworkQosRuleTestsMinimumPacketRate(common.NetworkTests): + """Functional tests for QoS minimum packet rate rule""" + + def setUp(self): + super(NetworkQosRuleTestsMinimumPacketRate, self).setUp() + # Nothing in this class works with Nova Network + if not self.haz_network: + self.skipTest("No Network service present") + + self.QOS_POLICY_NAME = 'qos_policy_%s' % uuid.uuid4().hex + + self.openstack( + 'network qos policy create %s' % self.QOS_POLICY_NAME + ) + self.addCleanup(self.openstack, + 'network qos policy delete %s' % self.QOS_POLICY_NAME) + cmd_output = json.loads(self.openstack( + 'network qos rule create -f json ' + '--type minimum-packet-rate ' + '--min-kpps 2800 ' + '--egress %s' % + self.QOS_POLICY_NAME + )) + self.RULE_ID = cmd_output['id'] + self.addCleanup(self.openstack, + 'network qos rule delete %s %s' % + (self.QOS_POLICY_NAME, self.RULE_ID)) + self.assertTrue(self.RULE_ID) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos rule delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json %s' % policy_name) + self.addCleanup(self.openstack, + 'network qos policy delete %s' % policy_name) + rule = json.loads(self.openstack( + 'network qos rule create -f json ' + '--type minimum-packet-rate ' + '--min-kpps 2800 ' + '--egress %s' % policy_name + )) + raw_output = self.openstack( + 'network qos rule delete %s %s' % + (policy_name, rule['id'])) + self.assertEqual('', raw_output) + + def test_qos_rule_list(self): + cmd_output = json.loads(self.openstack( + 'network qos rule list -f json %s' % self.QOS_POLICY_NAME)) + self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output]) + + def test_qos_rule_show(self): + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json %s %s' % + (self.QOS_POLICY_NAME, self.RULE_ID))) + self.assertEqual(self.RULE_ID, cmd_output['id']) + + def test_qos_rule_set(self): + self.openstack('network qos rule set --min-kpps 7500 --any %s %s' % + (self.QOS_POLICY_NAME, self.RULE_ID)) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json %s %s' % + (self.QOS_POLICY_NAME, self.RULE_ID))) + self.assertEqual(7500, cmd_output['min_kpps']) + self.assertEqual('any', cmd_output['direction']) + + class NetworkQosRuleTestsDSCPMarking(common.NetworkTests): """Functional tests for QoS DSCP marking rule""" diff --git a/openstackclient/tests/unit/common/test_configuration.py b/openstackclient/tests/unit/common/test_configuration.py index bdd3debf..148228ec 100644 --- a/openstackclient/tests/unit/common/test_configuration.py +++ b/openstackclient/tests/unit/common/test_configuration.py @@ -35,11 +35,14 @@ class TestConfiguration(utils.TestCommand): fakes.REGION_NAME, ) - opts = [mock.Mock(secret=True, dest="password"), - mock.Mock(secret=True, dest="token")] + opts = [ + mock.Mock(secret=True, dest="password"), + mock.Mock(secret=True, dest="token"), + ] - @mock.patch("keystoneauth1.loading.base.get_plugin_options", - return_value=opts) + @mock.patch( + "keystoneauth1.loading.base.get_plugin_options", return_value=opts + ) def test_show(self, m_get_plugin_opts): arglist = [] verifylist = [('mask', True)] @@ -51,12 +54,14 @@ class TestConfiguration(utils.TestCommand): self.assertEqual(self.columns, columns) self.assertEqual(self.datalist, data) - @mock.patch("keystoneauth1.loading.base.get_plugin_options", - return_value=opts) + @mock.patch( + "keystoneauth1.loading.base.get_plugin_options", return_value=opts + ) def test_show_unmask(self, m_get_plugin_opts): arglist = ['--unmask'] verifylist = [('mask', False)] cmd = configuration.ShowConfiguration(self.app, None) + parsed_args = self.check_parser(cmd, arglist, verifylist) columns, data = cmd.take_action(parsed_args) @@ -71,15 +76,49 @@ class TestConfiguration(utils.TestCommand): ) self.assertEqual(datalist, data) - @mock.patch("keystoneauth1.loading.base.get_plugin_options", - return_value=opts) - def test_show_mask(self, m_get_plugin_opts): + @mock.patch( + "keystoneauth1.loading.base.get_plugin_options", return_value=opts + ) + def test_show_mask_with_cloud_config(self, m_get_plugin_opts): arglist = ['--mask'] verifylist = [('mask', True)] + self.app.client_manager.configuration_type = "cloud_config" cmd = configuration.ShowConfiguration(self.app, None) + parsed_args = self.check_parser(cmd, arglist, verifylist) columns, data = cmd.take_action(parsed_args) self.assertEqual(self.columns, columns) self.assertEqual(self.datalist, data) + + @mock.patch( + "keystoneauth1.loading.base.get_plugin_options", return_value=opts + ) + def test_show_mask_with_global_env(self, m_get_plugin_opts): + arglist = ['--mask'] + verifylist = [('mask', True)] + self.app.client_manager.configuration_type = "global_env" + column_list = ( + 'identity_api_version', + 'password', + 'region', + 'token', + 'username', + ) + datalist = ( + fakes.VERSION, + configuration.REDACTED, + fakes.REGION_NAME, + configuration.REDACTED, + fakes.USERNAME, + ) + + cmd = configuration.ShowConfiguration(self.app, None) + + parsed_args = self.check_parser(cmd, arglist, verifylist) + + columns, data = cmd.take_action(parsed_args) + + self.assertEqual(column_list, columns) + self.assertEqual(datalist, data) diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py index a1e7754a..d77797ab 100644 --- a/openstackclient/tests/unit/compute/v2/fakes.py +++ b/openstackclient/tests/unit/compute/v2/fakes.py @@ -21,6 +21,7 @@ import uuid from novaclient import api_versions from openstack.compute.v2 import flavor as _flavor from openstack.compute.v2 import server +from openstack.compute.v2 import server_group as _server_group from openstack.compute.v2 import server_interface as _server_interface from openstack.compute.v2 import service from openstack.compute.v2 import volume_attachment @@ -1290,72 +1291,6 @@ class FakeHost(object): return host_info -class FakeServerGroup(object): - """Fake one server group""" - - @staticmethod - def _create_one_server_group(attrs=None): - """Create a fake server group - - :param dict attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id and other attributes - """ - if attrs is None: - attrs = {} - - # Set default attributes. - server_group_info = { - 'id': 'server-group-id-' + uuid.uuid4().hex, - 'members': [], - 'metadata': {}, - 'name': 'server-group-name-' + uuid.uuid4().hex, - 'project_id': 'server-group-project-id-' + uuid.uuid4().hex, - 'user_id': 'server-group-user-id-' + uuid.uuid4().hex, - } - - # Overwrite default attributes. - server_group_info.update(attrs) - - server_group = fakes.FakeResource( - info=copy.deepcopy(server_group_info), - loaded=True) - return server_group - - @staticmethod - def create_one_server_group(attrs=None): - """Create a fake server group - - :param dict attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id and other attributes - """ - if attrs is None: - attrs = {} - attrs.setdefault('policies', ['policy1', 'policy2']) - return FakeServerGroup._create_one_server_group(attrs) - - -class FakeServerGroupV264(object): - """Fake one server group for API >= 2.64""" - - @staticmethod - def create_one_server_group(attrs=None): - """Create a fake server group - - :param dict attrs: - A dictionary with all attributes - :return: - A FakeResource object, with id and other attributes - """ - if attrs is None: - attrs = {} - attrs.setdefault('policy', 'policy1') - return FakeServerGroup._create_one_server_group(attrs) - - class FakeUsage(object): """Fake one or more usage.""" @@ -1860,6 +1795,34 @@ class FakeVolumeAttachment(object): return volume_attachments +def create_one_server_group(attrs=None): + """Create a fake server group + + :param dict attrs: + A dictionary with all attributes + :return: + A fake ServerGroup object, with id and other attributes + """ + if attrs is None: + attrs = {} + + # Set default attributes. + server_group_info = { + 'id': 'server-group-id-' + uuid.uuid4().hex, + 'member_ids': '', + 'metadata': {}, + 'name': 'server-group-name-' + uuid.uuid4().hex, + 'project_id': 'server-group-project-id-' + uuid.uuid4().hex, + 'user_id': 'server-group-user-id-' + uuid.uuid4().hex, + } + + # Overwrite default attributes. + server_group_info.update(attrs) + + server_group = _server_group.ServerGroup(**server_group_info) + return server_group + + def create_one_server_interface(attrs=None): """Create a fake SDK ServerInterface. diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py index 004f3a05..2e64e071 100644 --- a/openstackclient/tests/unit/compute/v2/test_server.py +++ b/openstackclient/tests/unit/compute/v2/test_server.py @@ -161,6 +161,10 @@ class TestServer(compute_fakes.TestComputev2): return volumes def run_method_with_servers(self, method_name, server_count): + # Starting with v2.91, the nova api needs to be call with a sentinel + # as availability_zone=None will unpin the server az. + _sentinel = object() + servers = self.setup_servers_mock(server_count) arglist = [] @@ -183,7 +187,11 @@ class TestServer(compute_fakes.TestComputev2): method.assert_called_with(reason=None) elif method_name == 'unshelve': version = self.app.client_manager.compute.api_version - if version >= api_versions.APIVersion('2.77'): + if version >= api_versions.APIVersion('2.91'): + method.assert_called_with(availability_zone=_sentinel, + host=None) + elif (version >= api_versions.APIVersion('2.77') and + version < api_versions.APIVersion('2.91')): method.assert_called_with(availability_zone=None) else: method.assert_called_with() @@ -4441,8 +4449,8 @@ class TestServerList(_TestServerList): columns, data = self.cmd.take_action(parsed_args) self.servers_mock.list.assert_called_with(**self.kwargs) - self.assertEqual(0, self.images_mock.list.call_count) - self.assertEqual(0, self.flavors_mock.list.call_count) + self.images_mock.assert_not_called() + self.flavors_mock.list.assert_not_called() self.assertEqual(self.columns, columns) self.assertEqual(self.data, tuple(data)) @@ -4465,7 +4473,8 @@ class TestServerList(_TestServerList): getattr(s, 'OS-EXT-AZ:availability_zone'), getattr(s, 'OS-EXT-SRV-ATTR:host'), s.Metadata, - ) for s in self.servers) + ) for s in self.servers + ) arglist = [ '--long', ] @@ -4477,6 +4486,11 @@ class TestServerList(_TestServerList): columns, data = self.cmd.take_action(parsed_args) self.servers_mock.list.assert_called_with(**self.kwargs) + image_ids = {s.image['id'] for s in self.servers if s.image} + self.images_mock.assert_called_once_with( + id=f'in:{",".join(image_ids)}', + ) + self.flavors_mock.list.assert_called_once_with(is_public=None) self.assertEqual(self.columns_long, columns) self.assertEqual(self.data, tuple(data)) @@ -4540,6 +4554,8 @@ class TestServerList(_TestServerList): columns, data = self.cmd.take_action(parsed_args) self.servers_mock.list.assert_called_with(**self.kwargs) + self.images_mock.assert_not_called() + self.flavors_mock.list.assert_not_called() self.assertEqual(self.columns, columns) self.assertEqual(self.data, tuple(data)) @@ -4568,6 +4584,8 @@ class TestServerList(_TestServerList): columns, data = self.cmd.take_action(parsed_args) self.servers_mock.list.assert_called_with(**self.kwargs) + self.images_mock.assert_not_called() + self.flavors_mock.list.assert_not_called() self.assertEqual(self.columns, columns) self.assertEqual(self.data, tuple(data)) @@ -4585,8 +4603,8 @@ class TestServerList(_TestServerList): columns, data = self.cmd.take_action(parsed_args) self.servers_mock.list.assert_called_with(**self.kwargs) - self.assertFalse(self.images_mock.list.call_count) - self.assertFalse(self.flavors_mock.list.call_count) + self.images_mock.assert_not_called() + self.flavors_mock.list.assert_not_called() self.get_image_mock.assert_called() self.flavors_mock.get.assert_called() @@ -4610,6 +4628,8 @@ class TestServerList(_TestServerList): self.search_opts['image'] = self.image.id self.servers_mock.list.assert_called_with(**self.kwargs) + self.images_mock.assert_not_called() + self.flavors_mock.list.assert_called_once() self.assertEqual(self.columns, columns) self.assertEqual(self.data, tuple(data)) @@ -4626,10 +4646,12 @@ class TestServerList(_TestServerList): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.flavors_mock.get.has_calls(self.flavor.id) + self.flavors_mock.get.assert_has_calls([mock.call(self.flavor.id)]) self.search_opts['flavor'] = self.flavor.id self.servers_mock.list.assert_called_with(**self.kwargs) + self.images_mock.assert_called_once() + self.flavors_mock.list.assert_not_called() self.assertEqual(self.columns, columns) self.assertEqual(self.data, tuple(data)) @@ -5093,8 +5115,8 @@ class TestServerListV273(_TestServerList): self.search_opts['locked'] = True self.servers_mock.list.assert_called_with(**self.kwargs) - self.assertItemsEqual(self.columns, columns) - self.assertItemsEqual(self.data, tuple(data)) + self.assertCountEqual(self.columns, columns) + self.assertCountEqual(self.data, tuple(data)) def test_server_list_with_unlocked_v273(self): @@ -5113,8 +5135,8 @@ class TestServerListV273(_TestServerList): self.search_opts['locked'] = False self.servers_mock.list.assert_called_with(**self.kwargs) - self.assertItemsEqual(self.columns, columns) - self.assertItemsEqual(self.data, tuple(data)) + self.assertCountEqual(self.columns, columns) + self.assertCountEqual(self.data, tuple(data)) def test_server_list_with_locked_and_unlocked(self): @@ -5154,8 +5176,8 @@ class TestServerListV273(_TestServerList): self.servers_mock.list.assert_called_with(**self.kwargs) - self.assertItemsEqual(self.columns, columns) - self.assertItemsEqual(self.data, tuple(data)) + self.assertCountEqual(self.columns, columns) + self.assertCountEqual(self.data, tuple(data)) @mock.patch.object(iso8601, 'parse_date', side_effect=iso8601.ParseError) def test_server_list_with_invalid_changes_before( @@ -5773,6 +5795,25 @@ class TestServerRebuild(TestServer): self.get_image_mock.assert_called_with(self.image.id) self.server.rebuild.assert_called_with(self.image, None) + def test_rebuild_with_volume_backed_server_no_image(self): + # the volume-backed server will have the image attribute set to an + # empty string, not null/None + self.server.image = '' + + arglist = [ + self.server.id, + ] + verifylist = [ + ('server', self.server.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + exc = self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args) + self.assertIn('The --image option is required', str(exc)) + def test_rebuild_with_name(self): name = 'test-server-xxx' arglist = [ @@ -6267,6 +6308,103 @@ class TestServerRebuild(TestServer): parsed_args) +class TestServerRebuildVolumeBacked(TestServer): + + def setUp(self): + super().setUp() + + self.new_image = image_fakes.create_one_image() + self.find_image_mock.return_value = self.new_image + + attrs = { + 'image': '', + 'networks': {}, + 'adminPass': 'passw0rd', + } + new_server = compute_fakes.FakeServer.create_one_server(attrs=attrs) + + # Fake the server to be rebuilt. The IDs of them should be the same. + attrs['id'] = new_server.id + methods = { + 'rebuild': new_server, + } + self.server = compute_fakes.FakeServer.create_one_server( + attrs=attrs, + methods=methods + ) + + # Return value for utils.find_resource for server. + self.servers_mock.get.return_value = self.server + + self.cmd = server.RebuildServer(self.app, None) + + def test_rebuild_with_reimage_boot_volume(self): + self.app.client_manager.compute.api_version = \ + api_versions.APIVersion('2.93') + + arglist = [ + self.server.id, + '--reimage-boot-volume', + '--image', self.new_image.id + ] + verifylist = [ + ('server', self.server.id), + ('reimage_boot_volume', True), + ('image', self.new_image.id) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(self.server.id) + self.server.rebuild.assert_called_with( + self.new_image, None) + + def test_rebuild_with_no_reimage_boot_volume(self): + self.app.client_manager.compute.api_version = \ + api_versions.APIVersion('2.93') + + arglist = [ + self.server.id, + '--no-reimage-boot-volume', + '--image', self.new_image.id + ] + verifylist = [ + ('server', self.server.id), + ('reimage_boot_volume', False), + ('image', self.new_image.id) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + exc = self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args) + self.assertIn('--reimage-boot-volume is required', str(exc)) + + def test_rebuild_with_reimage_boot_volume_pre_v293(self): + self.app.client_manager.compute.api_version = \ + api_versions.APIVersion('2.92') + + arglist = [ + self.server.id, + '--reimage-boot-volume', + '--image', self.new_image.id + ] + verifylist = [ + ('server', self.server.id), + ('reimage_boot_volume', True) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + exc = self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, + parsed_args) + self.assertIn( + '--os-compute-api-version 2.93 or greater is required', str(exc)) + + class TestEvacuateServer(TestServer): def setUp(self): @@ -8204,7 +8342,23 @@ class TestServerUnshelve(TestServer): def test_unshelve_multi_servers(self): self.run_method_with_servers('unshelve', 3) - def test_unshelve_with_specified_az(self): + def test_unshelve_v277(self): + self.app.client_manager.compute.api_version = \ + api_versions.APIVersion('2.77') + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = [server.id] + verifylist = [('server', [server.id])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(server.id) + server.unshelve.assert_called_with() + + def test_unshelve_with_specified_az_v277(self): self.app.client_manager.compute.api_version = \ api_versions.APIVersion('2.77') @@ -8248,6 +8402,157 @@ class TestServerUnshelve(TestServer): self.assertIn( '--os-compute-api-version 2.77 or greater is required', str(ex)) + def test_unshelve_v291(self): + self.app.client_manager.compute.api_version = ( + api_versions.APIVersion('2.91')) + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = [server.id] + verifylist = [('server', [server.id])] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(server.id) + server.unshelve.assert_called_with() + + def test_unshelve_with_specified_az_v291(self): + self.app.client_manager.compute.api_version = ( + api_versions.APIVersion('2.91')) + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = [ + '--availability-zone', "foo-az", + server.id, + ] + verifylist = [ + ('availability_zone', "foo-az"), + ('server', [server.id]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(server.id) + server.unshelve.assert_called_with(availability_zone="foo-az") + + def test_unshelve_with_specified_host_v291(self): + self.app.client_manager.compute.api_version = ( + api_versions.APIVersion('2.91')) + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = [ + '--host', "server1", + server.id, + ] + verifylist = [ + ('host', "server1"), + ('server', [server.id]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(server.id) + server.unshelve.assert_called_with(host="server1") + + def test_unshelve_with_unpin_az_v291(self): + self.app.client_manager.compute.api_version = ( + api_versions.APIVersion('2.91')) + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = ['--no-availability-zone', server.id] + verifylist = [ + ('no_availability_zone', True), + ('server', [server.id]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(server.id) + server.unshelve.assert_called_with(availability_zone=None) + + def test_unshelve_with_specified_az_and_host_v291(self): + self.app.client_manager.compute.api_version = ( + api_versions.APIVersion('2.91')) + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = [ + '--host', "server1", + '--availability-zone', "foo-az", + server.id, + ] + verifylist = [ + ('host', "server1"), + ('availability_zone', "foo-az"), + ('server', [server.id]) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(server.id) + + def test_unshelve_with_unpin_az_and_host_v291(self): + self.app.client_manager.compute.api_version = ( + api_versions.APIVersion('2.91')) + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = [ + '--host', "server1", + '--no-availability-zone', + server.id, + ] + verifylist = [ + ('host', "server1"), + ('no_availability_zone', True), + ('server', [server.id]) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.servers_mock.get.assert_called_with(server.id) + + def test_unshelve_fails_with_unpin_az_and_az_v291(self): + self.app.client_manager.compute.api_version = ( + api_versions.APIVersion('2.91')) + + server = compute_fakes.FakeServer.create_one_server( + attrs=self.attrs, methods=self.methods) + self.servers_mock.get.return_value = server + arglist = [ + '--availability-zone', "foo-az", + '--no-availability-zone', + server.id, + ] + verifylist = [ + ('availability_zone', "foo-az"), + ('no_availability_zone', True), + ('server', [server.id]) + ] + + ex = self.assertRaises(utils.ParserException, + self.check_parser, + self.cmd, arglist, verifylist) + self.assertIn('argument --no-availability-zone: not allowed ' + 'with argument --availability-zone', str(ex)) + @mock.patch.object(common_utils, 'wait_for_status', return_value=True) def test_unshelve_with_wait(self, mock_wait_for_status): server = compute_fakes.FakeServer.create_one_server( diff --git a/openstackclient/tests/unit/compute/v2/test_server_group.py b/openstackclient/tests/unit/compute/v2/test_server_group.py index 3ed19e27..655366a8 100644 --- a/openstackclient/tests/unit/compute/v2/test_server_group.py +++ b/openstackclient/tests/unit/compute/v2/test_server_group.py @@ -15,10 +15,9 @@ from unittest import mock -from novaclient import api_versions +from openstack import utils as sdk_utils from osc_lib.cli import format_columns from osc_lib import exceptions -from osc_lib import utils from openstackclient.compute.v2 import server_group from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes @@ -27,38 +26,7 @@ from openstackclient.tests.unit import utils as tests_utils class TestServerGroup(compute_fakes.TestComputev2): - fake_server_group = compute_fakes.FakeServerGroup.create_one_server_group() - - columns = ( - 'id', - 'members', - 'name', - 'policies', - 'project_id', - 'user_id', - ) - - data = ( - fake_server_group.id, - format_columns.ListColumn(fake_server_group.members), - fake_server_group.name, - format_columns.ListColumn(fake_server_group.policies), - fake_server_group.project_id, - fake_server_group.user_id, - ) - - def setUp(self): - super(TestServerGroup, self).setUp() - - # Get a shortcut to the ServerGroupsManager Mock - self.server_groups_mock = self.app.client_manager.compute.server_groups - self.server_groups_mock.reset_mock() - - -class TestServerGroupV264(TestServerGroup): - - fake_server_group = \ - compute_fakes.FakeServerGroupV264.create_one_server_group() + fake_server_group = compute_fakes.create_one_server_group() columns = ( 'id', @@ -66,31 +34,40 @@ class TestServerGroupV264(TestServerGroup): 'name', 'policy', 'project_id', + 'rules', 'user_id', ) data = ( fake_server_group.id, - format_columns.ListColumn(fake_server_group.members), + format_columns.ListColumn(fake_server_group.member_ids), fake_server_group.name, fake_server_group.policy, fake_server_group.project_id, + format_columns.DictColumn(fake_server_group.rules), fake_server_group.user_id, ) def setUp(self): - super(TestServerGroupV264, self).setUp() + super().setUp() + + # Create and get a shortcut to the compute client mock + self.app.client_manager.sdk_connection = mock.Mock() + self.sdk_client = self.app.client_manager.sdk_connection.compute + self.sdk_client.reset_mock() class TestServerGroupCreate(TestServerGroup): def setUp(self): - super(TestServerGroupCreate, self).setUp() + super().setUp() - self.server_groups_mock.create.return_value = self.fake_server_group + self.sdk_client.create_server_group.return_value = \ + self.fake_server_group self.cmd = server_group.CreateServerGroup(self.app, None) - def test_server_group_create(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_create(self, sm_mock): arglist = [ '--policy', 'anti-affinity', 'affinity_group', @@ -101,18 +78,16 @@ class TestServerGroupCreate(TestServerGroup): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.server_groups_mock.create.assert_called_once_with( + self.sdk_client.create_server_group.assert_called_once_with( name=parsed_args.name, - policies=[parsed_args.policy], + policy=parsed_args.policy, ) self.assertCountEqual(self.columns, columns) self.assertCountEqual(self.data, data) - def test_server_group_create_with_soft_policies(self): - self.app.client_manager.compute.api_version = api_versions.APIVersion( - '2.15') - + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_create_with_soft_policies(self, sm_mock): arglist = [ '--policy', 'soft-anti-affinity', 'affinity_group', @@ -123,18 +98,16 @@ class TestServerGroupCreate(TestServerGroup): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.server_groups_mock.create.assert_called_once_with( + self.sdk_client.create_server_group.assert_called_once_with( name=parsed_args.name, - policies=[parsed_args.policy], + policy=parsed_args.policy, ) self.assertCountEqual(self.columns, columns) self.assertCountEqual(self.data, data) - def test_server_group_create_with_soft_policies_pre_v215(self): - self.app.client_manager.compute.api_version = api_versions.APIVersion( - '2.14') - + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=False) + def test_server_group_create_with_soft_policies_pre_v215(self, sm_mock): arglist = [ '--policy', 'soft-anti-affinity', 'affinity_group', @@ -152,10 +125,8 @@ class TestServerGroupCreate(TestServerGroup): '--os-compute-api-version 2.15 or greater is required', str(ex)) - def test_server_group_create_with_rules(self): - self.app.client_manager.compute.api_version = api_versions.APIVersion( - '2.64') - + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_create_with_rules(self, sm_mock): arglist = [ '--policy', 'soft-anti-affinity', '--rule', 'max_server_per_host=2', @@ -168,19 +139,18 @@ class TestServerGroupCreate(TestServerGroup): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.server_groups_mock.create.assert_called_once_with( + self.sdk_client.create_server_group.assert_called_once_with( name=parsed_args.name, - policy=parsed_args.policy, # should be 'policy', not 'policies' + policy=parsed_args.policy, rules=parsed_args.rules, ) self.assertCountEqual(self.columns, columns) self.assertCountEqual(self.data, data) - def test_server_group_create_with_rules_pre_v264(self): - self.app.client_manager.compute.api_version = api_versions.APIVersion( - '2.63') - + @mock.patch.object( + sdk_utils, 'supports_microversion', side_effect=[True, False]) + def test_server_group_create_with_rules_pre_v264(self, sm_mock): arglist = [ '--policy', 'soft-anti-affinity', '--rule', 'max_server_per_host=2', @@ -205,9 +175,9 @@ class TestServerGroupCreate(TestServerGroup): class TestServerGroupDelete(TestServerGroup): def setUp(self): - super(TestServerGroupDelete, self).setUp() + super().setUp() - self.server_groups_mock.get.return_value = self.fake_server_group + self.sdk_client.find_server_group.return_value = self.fake_server_group self.cmd = server_group.DeleteServerGroup(self.app, None) def test_server_group_delete(self): @@ -219,8 +189,10 @@ class TestServerGroupDelete(TestServerGroup): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.server_groups_mock.get.assert_called_once_with('affinity_group') - self.server_groups_mock.delete.assert_called_once_with( + self.sdk_client.find_server_group.assert_called_once_with( + 'affinity_group' + ) + self.sdk_client.delete_server_group.assert_called_once_with( self.fake_server_group.id ) self.assertIsNone(result) @@ -235,13 +207,15 @@ class TestServerGroupDelete(TestServerGroup): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.server_groups_mock.get.assert_any_call('affinity_group') - self.server_groups_mock.get.assert_any_call('anti_affinity_group') - self.server_groups_mock.delete.assert_called_with( + self.sdk_client.find_server_group.assert_any_call('affinity_group') + self.sdk_client.find_server_group.assert_any_call( + 'anti_affinity_group' + ) + self.sdk_client.delete_server_group.assert_called_with( self.fake_server_group.id ) - self.assertEqual(2, self.server_groups_mock.get.call_count) - self.assertEqual(2, self.server_groups_mock.delete.call_count) + self.assertEqual(2, self.sdk_client.find_server_group.call_count) + self.assertEqual(2, self.sdk_client.delete_server_group.call_count) self.assertIsNone(result) def test_server_group_delete_no_input(self): @@ -262,25 +236,23 @@ class TestServerGroupDelete(TestServerGroup): ('server_group', ['affinity_group', 'anti_affinity_group']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) - find_mock_result = [self.fake_server_group, exceptions.CommandError] - with mock.patch.object(utils, 'find_resource', - side_effect=find_mock_result) as find_mock: - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 server groups failed to delete.', - str(e)) - - find_mock.assert_any_call(self.server_groups_mock, - 'affinity_group') - find_mock.assert_any_call(self.server_groups_mock, - 'anti_affinity_group') - - self.assertEqual(2, find_mock.call_count) - self.server_groups_mock.delete.assert_called_once_with( - self.fake_server_group.id - ) + + self.sdk_client.find_server_group.side_effect = [ + self.fake_server_group, exceptions.CommandError] + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 server groups failed to delete.', str(e)) + + self.sdk_client.find_server_group.assert_any_call('affinity_group') + self.sdk_client.find_server_group.assert_any_call( + 'anti_affinity_group' + ) + self.assertEqual(2, self.sdk_client.find_server_group.call_count) + self.sdk_client.delete_server_group.assert_called_once_with( + self.fake_server_group.id + ) class TestServerGroupList(TestServerGroup): @@ -300,28 +272,67 @@ class TestServerGroupList(TestServerGroup): 'User Id', ) + list_columns_v264 = ( + 'ID', + 'Name', + 'Policy', + ) + + list_columns_v264_long = ( + 'ID', + 'Name', + 'Policy', + 'Members', + 'Project Id', + 'User Id', + ) + list_data = (( TestServerGroup.fake_server_group.id, TestServerGroup.fake_server_group.name, - format_columns.ListColumn(TestServerGroup.fake_server_group.policies), + format_columns.ListColumn( + TestServerGroup.fake_server_group.policies + ), ),) list_data_long = (( TestServerGroup.fake_server_group.id, TestServerGroup.fake_server_group.name, - format_columns.ListColumn(TestServerGroup.fake_server_group.policies), - format_columns.ListColumn(TestServerGroup.fake_server_group.members), + format_columns.ListColumn( + TestServerGroup.fake_server_group.policies + ), + format_columns.ListColumn( + TestServerGroup.fake_server_group.member_ids + ), + TestServerGroup.fake_server_group.project_id, + TestServerGroup.fake_server_group.user_id, + ),) + + list_data_v264 = (( + TestServerGroup.fake_server_group.id, + TestServerGroup.fake_server_group.name, + TestServerGroup.fake_server_group.policy, + ),) + + list_data_v264_long = (( + TestServerGroup.fake_server_group.id, + TestServerGroup.fake_server_group.name, + TestServerGroup.fake_server_group.policy, + format_columns.ListColumn( + TestServerGroup.fake_server_group.member_ids + ), TestServerGroup.fake_server_group.project_id, TestServerGroup.fake_server_group.user_id, ),) def setUp(self): - super(TestServerGroupList, self).setUp() + super().setUp() - self.server_groups_mock.list.return_value = [self.fake_server_group] + self.sdk_client.server_groups.return_value = [self.fake_server_group] self.cmd = server_group.ListServerGroup(self.app, None) - def test_server_group_list(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=False) + def test_server_group_list(self, sm_mock): arglist = [] verifylist = [ ('all_projects', False), @@ -332,12 +343,13 @@ class TestServerGroupList(TestServerGroup): parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.server_groups_mock.list.assert_called_once_with() + self.sdk_client.server_groups.assert_called_once_with() self.assertCountEqual(self.list_columns, columns) self.assertCountEqual(self.list_data, tuple(data)) - def test_server_group_list_with_all_projects_and_long(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=False) + def test_server_group_list_with_all_projects_and_long(self, sm_mock): arglist = [ '--all-projects', '--long', @@ -350,13 +362,14 @@ class TestServerGroupList(TestServerGroup): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.server_groups_mock.list.assert_called_once_with( + self.sdk_client.server_groups.assert_called_once_with( all_projects=True) self.assertCountEqual(self.list_columns_long, columns) self.assertCountEqual(self.list_data_long, tuple(data)) - def test_server_group_list_with_limit(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_list_with_limit(self, sm_mock): arglist = [ '--limit', '1', ] @@ -370,9 +383,10 @@ class TestServerGroupList(TestServerGroup): parsed_args = self.check_parser(self.cmd, arglist, verifylist) self.cmd.take_action(parsed_args) - self.server_groups_mock.list.assert_called_once_with(limit=1) + self.sdk_client.server_groups.assert_called_once_with(limit=1) - def test_server_group_list_with_offset(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_list_with_offset(self, sm_mock): arglist = [ '--offset', '5', ] @@ -386,51 +400,10 @@ class TestServerGroupList(TestServerGroup): parsed_args = self.check_parser(self.cmd, arglist, verifylist) self.cmd.take_action(parsed_args) - self.server_groups_mock.list.assert_called_once_with(offset=5) - - -class TestServerGroupListV264(TestServerGroupV264): - - list_columns = ( - 'ID', - 'Name', - 'Policy', - ) + self.sdk_client.server_groups.assert_called_once_with(offset=5) - list_columns_long = ( - 'ID', - 'Name', - 'Policy', - 'Members', - 'Project Id', - 'User Id', - ) - - list_data = (( - TestServerGroupV264.fake_server_group.id, - TestServerGroupV264.fake_server_group.name, - TestServerGroupV264.fake_server_group.policy, - ),) - - list_data_long = (( - TestServerGroupV264.fake_server_group.id, - TestServerGroupV264.fake_server_group.name, - TestServerGroupV264.fake_server_group.policy, - format_columns.ListColumn( - TestServerGroupV264.fake_server_group.members), - TestServerGroupV264.fake_server_group.project_id, - TestServerGroupV264.fake_server_group.user_id, - ),) - - def setUp(self): - super(TestServerGroupListV264, self).setUp() - - self.server_groups_mock.list.return_value = [self.fake_server_group] - self.cmd = server_group.ListServerGroup(self.app, None) - self.app.client_manager.compute.api_version = api_versions.APIVersion( - '2.64') - - def test_server_group_list(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_list_v264(self, sm_mock): arglist = [] verifylist = [ ('all_projects', False), @@ -438,12 +411,13 @@ class TestServerGroupListV264(TestServerGroupV264): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.server_groups_mock.list.assert_called_once_with() + self.sdk_client.server_groups.assert_called_once_with() - self.assertCountEqual(self.list_columns, columns) - self.assertCountEqual(self.list_data, tuple(data)) + self.assertCountEqual(self.list_columns_v264, columns) + self.assertCountEqual(self.list_data_v264, tuple(data)) - def test_server_group_list_with_all_projects_and_long(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_list_with_all_projects_and_long_v264(self, sm_mock): arglist = [ '--all-projects', '--long', @@ -454,22 +428,23 @@ class TestServerGroupListV264(TestServerGroupV264): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) columns, data = self.cmd.take_action(parsed_args) - self.server_groups_mock.list.assert_called_once_with( + self.sdk_client.server_groups.assert_called_once_with( all_projects=True) - self.assertCountEqual(self.list_columns_long, columns) - self.assertCountEqual(self.list_data_long, tuple(data)) + self.assertCountEqual(self.list_columns_v264_long, columns) + self.assertCountEqual(self.list_data_v264_long, tuple(data)) class TestServerGroupShow(TestServerGroup): def setUp(self): - super(TestServerGroupShow, self).setUp() + super().setUp() - self.server_groups_mock.get.return_value = self.fake_server_group + self.sdk_client.find_server_group.return_value = self.fake_server_group self.cmd = server_group.ShowServerGroup(self.app, None) - def test_server_group_show(self): + @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True) + def test_server_group_show(self, sm_mock): arglist = [ 'affinity_group', ] diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py index 00e0c129..086c2466 100644 --- a/openstackclient/tests/unit/fakes.py +++ b/openstackclient/tests/unit/fakes.py @@ -11,7 +11,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# import json import sys @@ -49,21 +48,6 @@ TEST_RESPONSE_DICT_V3.set_project_scope() TEST_VERSIONS = fixture.DiscoveryList(href=AUTH_URL) -def to_unicode_dict(catalog_dict): - """Converts dict to unicode dict - - """ - if isinstance(catalog_dict, dict): - return {to_unicode_dict(key): to_unicode_dict(value) - for key, value in catalog_dict.items()} - elif isinstance(catalog_dict, list): - return [to_unicode_dict(element) for element in catalog_dict] - elif isinstance(catalog_dict, str): - return catalog_dict + u"" - else: - return catalog_dict - - class FakeStdout(object): def __init__(self): @@ -142,18 +126,30 @@ class FakeClientManager(object): self.network_endpoint_enabled = True self.compute_endpoint_enabled = True self.volume_endpoint_enabled = True + # The source of configuration. This is either 'cloud_config' (a + # clouds.yaml file) or 'global_env' ('OS_'-prefixed envvars) + self.configuration_type = 'cloud_config' def get_configuration(self): - return { - 'auth': { - 'username': USERNAME, - 'password': PASSWORD, - 'token': AUTH_TOKEN, - }, + + config = { 'region': REGION_NAME, 'identity_api_version': VERSION, } + if self.configuration_type == 'cloud_config': + config['auth'] = { + 'username': USERNAME, + 'password': PASSWORD, + 'token': AUTH_TOKEN, + } + elif self.configuration_type == 'global_env': + config['username'] = USERNAME + config['password'] = PASSWORD + config['token'] = AUTH_TOKEN + + return config + def is_network_endpoint_enabled(self): return self.network_endpoint_enabled diff --git a/openstackclient/tests/unit/identity/v3/test_identity_provider.py b/openstackclient/tests/unit/identity/v3/test_identity_provider.py index 1a9a7991..480bae59 100644 --- a/openstackclient/tests/unit/identity/v3/test_identity_provider.py +++ b/openstackclient/tests/unit/identity/v3/test_identity_provider.py @@ -15,9 +15,12 @@ import copy from unittest import mock +from osc_lib import exceptions + from openstackclient.identity.v3 import identity_provider from openstackclient.tests.unit import fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit import utils as test_utils class TestIdentityProvider(identity_fakes.TestFederatedIdentity): @@ -308,6 +311,86 @@ class TestIdentityProviderCreate(TestIdentityProvider): self.assertEqual(self.columns, columns) self.assertCountEqual(self.datalist, data) + def test_create_identity_provider_authttl_positive(self): + arglist = [ + '--authorization-ttl', '60', + identity_fakes.idp_id, + ] + verifylist = [ + ('identity_provider_id', identity_fakes.idp_id), + ('authorization_ttl', 60), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'remote_ids': None, + 'description': None, + 'domain_id': None, + 'enabled': True, + 'authorization_ttl': 60, + } + + self.identity_providers_mock.create.assert_called_with( + id=identity_fakes.idp_id, + **kwargs + ) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.datalist, data) + + def test_create_identity_provider_authttl_zero(self): + arglist = [ + '--authorization-ttl', '0', + identity_fakes.idp_id, + ] + verifylist = [ + ('identity_provider_id', identity_fakes.idp_id), + ('authorization_ttl', 0), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'remote_ids': None, + 'description': None, + 'domain_id': None, + 'enabled': True, + 'authorization_ttl': 0, + } + + self.identity_providers_mock.create.assert_called_with( + id=identity_fakes.idp_id, + **kwargs + ) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.datalist, data) + + def test_create_identity_provider_authttl_negative(self): + arglist = [ + '--authorization-ttl', '-60', + identity_fakes.idp_id, + ] + verifylist = [ + ('identity_provider_id', identity_fakes.idp_id), + ('authorization_ttl', -60), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_create_identity_provider_authttl_not_int(self): + arglist = [ + '--authorization-ttl', 'spam', + identity_fakes.idp_id, + ] + verifylist = [] + self.assertRaises(test_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + class TestIdentityProviderDelete(TestIdentityProvider): @@ -678,6 +761,93 @@ class TestIdentityProviderSet(TestIdentityProvider): self.cmd.take_action(parsed_args) + def test_identity_provider_set_authttl_positive(self): + def prepare(self): + """Prepare fake return objects before the test is executed""" + updated_idp = copy.deepcopy(identity_fakes.IDENTITY_PROVIDER) + updated_idp['authorization_ttl'] = 60 + resources = fakes.FakeResource( + None, + updated_idp, + loaded=True + ) + self.identity_providers_mock.update.return_value = resources + + prepare(self) + arglist = [ + '--authorization-ttl', '60', + identity_fakes.idp_id + ] + verifylist = [ + ('identity_provider', identity_fakes.idp_id), + ('enable', False), + ('disable', False), + ('remote_id', None), + ('authorization_ttl', 60), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + self.identity_providers_mock.update.assert_called_with( + identity_fakes.idp_id, + authorization_ttl=60, + ) + + def test_identity_provider_set_authttl_zero(self): + def prepare(self): + """Prepare fake return objects before the test is executed""" + updated_idp = copy.deepcopy(identity_fakes.IDENTITY_PROVIDER) + updated_idp['authorization_ttl'] = 0 + resources = fakes.FakeResource( + None, + updated_idp, + loaded=True + ) + self.identity_providers_mock.update.return_value = resources + + prepare(self) + arglist = [ + '--authorization-ttl', '0', + identity_fakes.idp_id + ] + verifylist = [ + ('identity_provider', identity_fakes.idp_id), + ('enable', False), + ('disable', False), + ('remote_id', None), + ('authorization_ttl', 0), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + self.identity_providers_mock.update.assert_called_with( + identity_fakes.idp_id, + authorization_ttl=0, + ) + + def test_identity_provider_set_authttl_negative(self): + arglist = [ + '--authorization-ttl', '-1', + identity_fakes.idp_id + ] + verifylist = [ + ('identity_provider', identity_fakes.idp_id), + ('enable', False), + ('disable', False), + ('remote_id', None), + ('authorization_ttl', -1), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_identity_provider_set_authttl_not_int(self): + arglist = [ + '--authorization-ttl', 'spam', + identity_fakes.idp_id + ] + verifylist = [] + self.assertRaises(test_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + class TestIdentityProviderShow(TestIdentityProvider): diff --git a/openstackclient/tests/unit/identity/v3/test_trust.py b/openstackclient/tests/unit/identity/v3/test_trust.py index d8cfc59f..d530adf5 100644 --- a/openstackclient/tests/unit/identity/v3/test_trust.py +++ b/openstackclient/tests/unit/identity/v3/test_trust.py @@ -206,7 +206,113 @@ class TestTrustList(TestTrust): # containing the data to be listed. columns, data = self.cmd.take_action(parsed_args) - self.trusts_mock.list.assert_called_with() + self.trusts_mock.list.assert_called_with( + trustor_user=None, + trustee_user=None, + ) + + collist = ('ID', 'Expires At', 'Impersonation', 'Project ID', + 'Trustee User ID', 'Trustor User ID') + self.assertEqual(collist, columns) + datalist = (( + identity_fakes.trust_id, + identity_fakes.trust_expires, + identity_fakes.trust_impersonation, + identity_fakes.project_id, + identity_fakes.user_id, + identity_fakes.user_id + ), ) + self.assertEqual(datalist, tuple(data)) + + def test_trust_list_auth_user(self): + auth_ref = self.app.client_manager.auth_ref = mock.Mock() + auth_ref.user_id.return_value = identity_fakes.user_id + + arglist = ['--auth-user'] + verifylist = [ + ('trustor', None), + ('trustee', None), + ('authuser', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + self.trusts_mock.list.assert_any_call( + trustor_user=self.users_mock.get() + ) + self.trusts_mock.list.assert_any_call( + trustee_user=self.users_mock.get() + ) + + collist = ('ID', 'Expires At', 'Impersonation', 'Project ID', + 'Trustee User ID', 'Trustor User ID') + self.assertEqual(collist, columns) + datalist = (( + identity_fakes.trust_id, + identity_fakes.trust_expires, + identity_fakes.trust_impersonation, + identity_fakes.project_id, + identity_fakes.user_id, + identity_fakes.user_id + ), ) + self.assertEqual(datalist, tuple(data)) + + def test_trust_list_trustee(self): + arglist = ['--trustee', identity_fakes.user_name] + verifylist = [ + ('trustor', None), + ('trustee', identity_fakes.user_name), + ('authuser', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + print(self.trusts_mock.list.call_args_list) + self.trusts_mock.list.assert_any_call( + trustee_user=self.users_mock.get(), + trustor_user=None, + ) + + collist = ('ID', 'Expires At', 'Impersonation', 'Project ID', + 'Trustee User ID', 'Trustor User ID') + self.assertEqual(collist, columns) + datalist = (( + identity_fakes.trust_id, + identity_fakes.trust_expires, + identity_fakes.trust_impersonation, + identity_fakes.project_id, + identity_fakes.user_id, + identity_fakes.user_id + ), ) + self.assertEqual(datalist, tuple(data)) + + def test_trust_list_trustor(self): + arglist = ['--trustor', identity_fakes.user_name] + verifylist = [ + ('trustee', None), + ('trustor', identity_fakes.user_name), + ('authuser', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class Lister in cliff, abstract method take_action() + # returns a tuple containing the column names and an iterable + # containing the data to be listed. + columns, data = self.cmd.take_action(parsed_args) + + print(self.trusts_mock.list.call_args_list) + self.trusts_mock.list.assert_any_call( + trustor_user=self.users_mock.get(), + trustee_user=None, + ) collist = ('ID', 'Expires At', 'Impersonation', 'Project ID', 'Trustee User ID', 'Trustor User ID') diff --git a/openstackclient/tests/unit/image/v2/fakes.py b/openstackclient/tests/unit/image/v2/fakes.py index a0eda6d2..f2015450 100644 --- a/openstackclient/tests/unit/image/v2/fakes.py +++ b/openstackclient/tests/unit/image/v2/fakes.py @@ -18,6 +18,7 @@ import uuid from openstack.image.v2 import image from openstack.image.v2 import member +from openstack.image.v2 import task from openstackclient.tests.unit import fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes @@ -44,10 +45,16 @@ class FakeImagev2Client: self.remove_tag = mock.Mock() + self.tasks = mock.Mock() + self.get_task = mock.Mock() + self.auth_token = kwargs['token'] self.management_url = kwargs['endpoint'] self.version = 2.0 + self.tasks = mock.Mock() + self.tasks.resource_class = fakes.FakeResource(None, {}) + class TestImagev2(utils.TestCommand): @@ -129,3 +136,69 @@ def create_one_image_member(attrs=None): image_member_info.update(attrs) return member.Member(**image_member_info) + + +def create_one_task(attrs=None): + """Create a fake task. + + :param attrs: A dictionary with all attributes of task + :type attrs: dict + :return: A fake Task object. + :rtype: `openstack.image.v2.task.Task` + """ + attrs = attrs or {} + + # Set default attribute + task_info = { + 'created_at': '2016-06-29T16:13:07Z', + 'expires_at': '2016-07-01T16:13:07Z', + 'id': str(uuid.uuid4()), + 'input': { + 'image_properties': { + 'container_format': 'ovf', + 'disk_format': 'vhd' + }, + 'import_from': 'https://apps.openstack.org/excellent-image', + 'import_from_format': 'qcow2' + }, + 'message': '', + 'owner': str(uuid.uuid4()), + 'result': { + 'image_id': str(uuid.uuid4()), + }, + 'schema': '/v2/schemas/task', + 'status': random.choice( + [ + 'pending', + 'processing', + 'success', + 'failure', + ] + ), + # though not documented, the API only allows 'import' + # https://github.com/openstack/glance/blob/24.0.0/glance/api/v2/tasks.py#L186-L190 + 'type': 'import', + 'updated_at': '2016-06-29T16:13:07Z', + } + + # Overwrite default attributes if there are some attributes set + task_info.update(attrs) + + return task.Task(**task_info) + + +def create_tasks(attrs=None, count=2): + """Create multiple fake tasks. + + :param attrs: A dictionary with all attributes of Task + :type attrs: dict + :param count: The number of tasks to be faked + :type count: int + :return: A list of fake Task objects + :rtype: list + """ + tasks = [] + for n in range(0, count): + tasks.append(create_one_task(attrs)) + + return tasks diff --git a/openstackclient/tests/unit/image/v2/test_image.py b/openstackclient/tests/unit/image/v2/test_image.py index 02f7b411..f2c11364 100644 --- a/openstackclient/tests/unit/image/v2/test_image.py +++ b/openstackclient/tests/unit/image/v2/test_image.py @@ -1090,7 +1090,7 @@ class TestImageSet(TestImage): self.assertIsNone(result) # we'll have called this but not set anything - self.app.client_manager.image.update_image.called_once_with( + self.app.client_manager.image.update_image.assert_called_once_with( self._image.id, ) diff --git a/openstackclient/tests/unit/image/v2/test_task.py b/openstackclient/tests/unit/image/v2/test_task.py new file mode 100644 index 00000000..e077e2b1 --- /dev/null +++ b/openstackclient/tests/unit/image/v2/test_task.py @@ -0,0 +1,187 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from osc_lib.cli import format_columns + +from openstackclient.image.v2 import task +from openstackclient.tests.unit.image.v2 import fakes as image_fakes + + +class TestTask(image_fakes.TestImagev2): + def setUp(self): + super().setUp() + + # Get shortcuts to mocked image client + self.client = self.app.client_manager.image + + +class TestTaskShow(TestTask): + + task = image_fakes.create_one_task() + + columns = ( + 'created_at', + 'expires_at', + 'id', + 'input', + 'message', + 'owner_id', + 'properties', + 'result', + 'status', + 'type', + 'updated_at', + ) + data = ( + task.created_at, + task.expires_at, + task.id, + task.input, + task.message, + task.owner_id, + format_columns.DictColumn({}), + task.result, + task.status, + task.type, + task.updated_at, + ) + + def setUp(self): + super().setUp() + + self.client.get_task.return_value = self.task + + # Get the command object to test + self.cmd = task.ShowTask(self.app, None) + + def test_task_show(self): + arglist = [self.task.id] + verifylist = [ + ('task', self.task.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # In base command class ShowOne in cliff, abstract method take_action() + # returns a two-part tuple with a tuple of column names and a tuple of + # data to be shown. + columns, data = self.cmd.take_action(parsed_args) + self.client.get_task.assert_called_with(self.task.id) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, data) + + +class TestTaskList(TestTask): + + tasks = image_fakes.create_tasks() + + columns = ( + 'ID', + 'Type', + 'Status', + 'Owner', + ) + datalist = [ + ( + task.id, + task.type, + task.status, + task.owner_id, + ) + for task in tasks + ] + + def setUp(self): + super().setUp() + + self.client.tasks.side_effect = [self.tasks, []] + + # Get the command object to test + self.cmd = task.ListTask(self.app, None) + + def test_task_list_no_options(self): + arglist = [] + verifylist = [ + ('sort_key', None), + ('sort_dir', None), + ('limit', None), + ('marker', None), + ('type', None), + ('status', None), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.client.tasks.assert_called_with() + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.datalist, data) + + def test_task_list_sort_key_option(self): + arglist = ['--sort-key', 'created_at'] + verifylist = [('sort_key', 'created_at')] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.client.tasks.assert_called_with( + sort_key=parsed_args.sort_key, + ) + + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.datalist, data) + + def test_task_list_sort_dir_option(self): + arglist = ['--sort-dir', 'desc'] + verifylist = [('sort_dir', 'desc')] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.client.tasks.assert_called_with( + sort_dir=parsed_args.sort_dir, + ) + + def test_task_list_pagination_options(self): + arglist = ['--limit', '1', '--marker', self.tasks[0].id] + verifylist = [('limit', 1), ('marker', self.tasks[0].id)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.client.tasks.assert_called_with( + limit=parsed_args.limit, + marker=parsed_args.marker, + ) + + def test_task_list_type_option(self): + arglist = ['--type', self.tasks[0].type] + verifylist = [('type', self.tasks[0].type)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.client.tasks.assert_called_with( + type=self.tasks[0].type, + ) + + def test_task_list_status_option(self): + arglist = ['--status', self.tasks[0].status] + verifylist = [('status', self.tasks[0].status)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + self.client.tasks.assert_called_with( + status=self.tasks[0].status, + ) diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py index 1a1a6beb..4d029a0e 100644 --- a/openstackclient/tests/unit/network/v2/fakes.py +++ b/openstackclient/tests/unit/network/v2/fakes.py @@ -26,6 +26,7 @@ from openstack.network.v2 import availability_zone as _availability_zone from openstack.network.v2 import flavor as _flavor from openstack.network.v2 import local_ip as _local_ip from openstack.network.v2 import local_ip_association as _local_ip_association +from openstack.network.v2 import ndp_proxy as _ndp_proxy from openstack.network.v2 import network as _network from openstack.network.v2 import network_ip_availability as _ip_availability from openstack.network.v2 import network_segment_range as _segment_range @@ -58,9 +59,11 @@ QUOTA = { RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit' RULE_TYPE_DSCP_MARKING = 'dscp-marking' RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth' +RULE_TYPE_MINIMUM_PACKET_RATE = 'minimum-packet-rate' VALID_QOS_RULES = [RULE_TYPE_BANDWIDTH_LIMIT, RULE_TYPE_DSCP_MARKING, - RULE_TYPE_MINIMUM_BANDWIDTH] + RULE_TYPE_MINIMUM_BANDWIDTH, + RULE_TYPE_MINIMUM_PACKET_RATE] VALID_DSCP_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 46, 48, 56] @@ -274,6 +277,9 @@ class FakeNetworkQosRule(object): elif type == RULE_TYPE_MINIMUM_BANDWIDTH: qos_rule_attrs['min_kbps'] = randint(1, 10000) qos_rule_attrs['direction'] = 'egress' + elif type == RULE_TYPE_MINIMUM_PACKET_RATE: + qos_rule_attrs['min_kpps'] = randint(1, 10000) + qos_rule_attrs['direction'] = 'egress' # Overwrite default attributes. qos_rule_attrs.update(attrs) @@ -2074,3 +2080,75 @@ def get_local_ip_associations(local_ip_associations=None, count=2): local_ip_associations = create_local_ip_associations(count) return mock.Mock(side_effect=local_ip_associations) + + +def create_one_ndp_proxy(attrs=None): + """Create a fake NDP proxy. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A FakeResource object with router_id, port_id, etc. + """ + attrs = attrs or {} + router_id = ( + attrs.get('router_id') or 'router-id-' + uuid.uuid4().hex + ) + port_id = ( + attrs.get('port_id') or 'port-id-' + uuid.uuid4().hex + ) + # Set default attributes. + np_attrs = { + 'id': uuid.uuid4().hex, + 'name': 'ndp-proxy-name-' + uuid.uuid4().hex, + 'router_id': router_id, + 'port_id': port_id, + 'ip_address': '2001::1:2', + 'description': 'ndp-proxy-description-' + uuid.uuid4().hex, + 'project_id': 'project-id-' + uuid.uuid4().hex, + 'location': 'MUNCHMUNCHMUNCH', + } + + # Overwrite default attributes. + np_attrs.update(attrs) + + return _ndp_proxy.NDPProxy(**np_attrs) + + +def create_ndp_proxies(attrs=None, count=2): + """Create multiple fake NDP proxies. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of NDP proxxy to fake + :return: + A list of FakeResource objects faking the NDP proxies + """ + ndp_proxies = [] + for i in range(0, count): + ndp_proxies.append( + create_one_ndp_proxy(attrs) + ) + return ndp_proxies + + +def get_ndp_proxies(ndp_proxies=None, count=2): + """Get a list of faked NDP proxies. + + If ndp_proxy list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List ndp_proxies: + A list of FakeResource objects faking ndp proxy + :param int count: + The number of ndp proxy to fake + :return: + An iterable Mock object with side_effect set to a list of faked + ndp proxy + """ + if ndp_proxies is None: + ndp_proxies = ( + create_ndp_proxies(count) + ) + return mock.Mock(side_effect=ndp_proxies) diff --git a/openstackclient/tests/unit/network/v2/test_local_ip.py b/openstackclient/tests/unit/network/v2/test_local_ip.py index 97e246df..be23365e 100644 --- a/openstackclient/tests/unit/network/v2/test_local_ip.py +++ b/openstackclient/tests/unit/network/v2/test_local_ip.py @@ -96,7 +96,7 @@ class TestCreateLocalIP(TestLocalIP): self.network.create_local_ip.assert_called_once_with(**{}) self.assertEqual(set(self.columns), set(columns)) - self.assertItemsEqual(self.data, data) + self.assertCountEqual(self.data, data) def test_create_all_options(self): arglist = [ @@ -130,7 +130,7 @@ class TestCreateLocalIP(TestLocalIP): 'ip_mode': self.new_local_ip.ip_mode, }) self.assertEqual(set(self.columns), set(columns)) - self.assertItemsEqual(self.data, data) + self.assertCountEqual(self.data, data) class TestDeleteLocalIP(TestLocalIP): @@ -263,7 +263,7 @@ class TestListLocalIP(TestLocalIP): self.network.local_ips.assert_called_once_with(**{}) self.assertEqual(self.columns, columns) - self.assertItemsEqual(self.data, list(data)) + self.assertCountEqual(self.data, list(data)) def test_local_ip_list_name(self): arglist = [ @@ -278,7 +278,7 @@ class TestListLocalIP(TestLocalIP): self.network.local_ips.assert_called_once_with( **{'name': self.local_ips[0].name}) self.assertEqual(self.columns, columns) - self.assertItemsEqual(self.data, list(data)) + self.assertCountEqual(self.data, list(data)) def test_local_ip_list_project(self): project = identity_fakes_v3.FakeProject.create_one_project() @@ -295,7 +295,7 @@ class TestListLocalIP(TestLocalIP): self.network.local_ips.assert_called_once_with( **{'project_id': project.id}) self.assertEqual(self.columns, columns) - self.assertItemsEqual(self.data, list(data)) + self.assertCountEqual(self.data, list(data)) def test_local_ip_project_domain(self): project = identity_fakes_v3.FakeProject.create_one_project() @@ -314,7 +314,7 @@ class TestListLocalIP(TestLocalIP): self.network.local_ips.assert_called_once_with(**filters) self.assertEqual(self.columns, columns) - self.assertItemsEqual(self.data, list(data)) + self.assertCountEqual(self.data, list(data)) def test_local_ip_list_network(self): arglist = [ @@ -477,4 +477,4 @@ class TestShowLocalIP(TestLocalIP): self.network.find_local_ip.assert_called_once_with( self._local_ip.name, ignore_missing=False) self.assertEqual(set(self.columns), set(columns)) - self.assertItemsEqual(self.data, list(data)) + self.assertCountEqual(self.data, list(data)) diff --git a/openstackclient/tests/unit/network/v2/test_ndp_proxy.py b/openstackclient/tests/unit/network/v2/test_ndp_proxy.py new file mode 100644 index 00000000..48c5deb2 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_ndp_proxy.py @@ -0,0 +1,454 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from unittest import mock +from unittest.mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import ndp_proxy +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestNDPProxy(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestNDPProxy, self).setUp() + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + # Get a shortcut to the DomainManager Mock + self.domains_mock = self.app.client_manager.identity.domains + # Get a shortcut to the network client + self.network = self.app.client_manager.network + self.router = network_fakes.FakeRouter.create_one_router( + {'id': 'fake-router-id'}) + self.network.find_router = mock.Mock(return_value=self.router) + self.port = network_fakes.create_one_port() + self.network.find_port = mock.Mock(return_value=self.port) + + +class TestCreateNDPProxy(TestNDPProxy): + def setUp(self): + super(TestCreateNDPProxy, self).setUp() + attrs = {'router_id': self.router.id, 'port_id': self.port.id} + self.ndp_proxy = ( + network_fakes.create_one_ndp_proxy( + attrs)) + self.columns = ( + 'created_at', + 'description', + 'id', + 'ip_address', + 'name', + 'port_id', + 'project_id', + 'revision_number', + 'router_id', + 'updated_at') + + self.data = ( + self.ndp_proxy.created_at, + self.ndp_proxy.description, + self.ndp_proxy.id, + self.ndp_proxy.ip_address, + self.ndp_proxy.name, + self.ndp_proxy.port_id, + self.ndp_proxy.project_id, + self.ndp_proxy.revision_number, + self.ndp_proxy.router_id, + self.ndp_proxy.updated_at + ) + self.network.create_ndp_proxy = mock.Mock( + return_value=self.ndp_proxy) + + # Get the command object to test + self.cmd = ndp_proxy.CreateNDPProxy(self.app, self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_all_options(self): + arglist = [ + self.ndp_proxy.router_id, + '--name', self.ndp_proxy.name, + '--port', self.ndp_proxy.port_id, + '--ip-address', self.ndp_proxy.ip_address, + '--description', self.ndp_proxy.description, + ] + verifylist = [ + ('name', self.ndp_proxy.name), + ('router', self.ndp_proxy.router_id), + ('port', self.ndp_proxy.port_id), + ('ip_address', self.ndp_proxy.ip_address), + ('description', self.ndp_proxy.description), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_ndp_proxy.assert_called_once_with( + **{'name': self.ndp_proxy.name, + 'router_id': self.ndp_proxy.router_id, + 'ip_address': self.ndp_proxy.ip_address, + 'port_id': self.ndp_proxy.port_id, + 'description': self.ndp_proxy.description}) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteNDPProxy(TestNDPProxy): + + def setUp(self): + super(TestDeleteNDPProxy, self).setUp() + attrs = {'router_id': self.router.id, 'port_id': self.port.id} + self.ndp_proxies = ( + network_fakes.create_ndp_proxies(attrs)) + self.ndp_proxy = self.ndp_proxies[0] + self.network.delete_ndp_proxy = mock.Mock( + return_value=None) + self.network.find_ndp_proxy = mock.Mock( + return_value=self.ndp_proxy) + + # Get the command object to test + self.cmd = ndp_proxy.DeleteNDPProxy(self.app, self.namespace) + + def test_delete(self): + arglist = [ + self.ndp_proxy.id + ] + verifylist = [ + ('ndp_proxy', [self.ndp_proxy.id]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.network.delete_ndp_proxy.assert_called_once_with(self.ndp_proxy) + self.assertIsNone(result) + + def test_delete_error(self): + arglist = [ + self.ndp_proxy.id, + ] + verifylist = [ + ('ndp_proxy', [self.ndp_proxy.id]) + ] + self.network.delete_ndp_proxy.side_effect = Exception( + 'Error message') + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises( + exceptions.CommandError, + self.cmd.take_action, parsed_args) + + def test_multi_ndp_proxies_delete(self): + arglist = [] + np_id = [] + + for a in self.ndp_proxies: + arglist.append(a.id) + np_id.append(a.id) + + verifylist = [ + ('ndp_proxy', np_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.network.delete_ndp_proxy.assert_has_calls( + [call(self.ndp_proxy), call(self.ndp_proxy)]) + self.assertIsNone(result) + + +class TestListNDPProxy(TestNDPProxy): + + def setUp(self): + super(TestListNDPProxy, self).setUp() + attrs = {'router_id': self.router.id, 'port_id': self.port.id} + ndp_proxies = ( + network_fakes.create_ndp_proxies(attrs, count=3)) + self.columns = ( + 'ID', + 'Name', + 'Router ID', + 'IP Address', + 'Project', + ) + self.data = [] + for np in ndp_proxies: + self.data.append(( + np.id, + np.name, + np.router_id, + np.ip_address, + np.project_id, + )) + + self.network.ndp_proxies = mock.Mock( + return_value=ndp_proxies) + + # Get the command object to test + self.cmd = ndp_proxy.ListNDPProxy(self.app, self.namespace) + + def test_ndp_proxy_list(self): + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.ndp_proxies.assert_called_once_with() + self.assertEqual(self.columns, columns) + list_data = list(data) + self.assertEqual(len(self.data), len(list_data)) + for index in range(len(list_data)): + self.assertEqual(self.data[index], list_data[index]) + + def test_ndp_proxy_list_router(self): + arglist = [ + '--router', 'fake-router-name', + ] + + verifylist = [ + ('router', 'fake-router-name') + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ndp_proxies.assert_called_once_with(**{ + 'router_id': 'fake-router-id'}) + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, list(data)) + + def test_ndp_proxy_list_port(self): + arglist = [ + '--port', self.port.id, + ] + + verifylist = [ + ('port', self.port.id) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ndp_proxies.assert_called_once_with(**{ + 'port_id': self.port.id}) + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, list(data)) + + def test_ndp_proxy_list_name(self): + arglist = [ + '--name', 'fake-ndp-proxy-name', + ] + + verifylist = [ + ('name', 'fake-ndp-proxy-name') + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ndp_proxies.assert_called_once_with(**{ + 'name': 'fake-ndp-proxy-name'}) + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, list(data)) + + def test_ndp_proxy_list_ip_address(self): + arglist = [ + '--ip-address', '2001::1:2', + ] + + verifylist = [ + ('ip_address', '2001::1:2') + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.ndp_proxies.assert_called_once_with(**{ + 'ip_address': '2001::1:2'}) + self.assertEqual(self.columns, columns) + self.assertCountEqual(self.data, list(data)) + + def test_ndp_proxy_list_project(self): + project = identity_fakes_v3.FakeProject.create_one_project() + self.projects_mock.get.return_value = project + arglist = [ + '--project', project.id, + ] + verifylist = [ + ('project', project.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.ndp_proxies.assert_called_once_with( + **{'project_id': project.id}) + self.assertEqual(self.columns, columns) + self.assertItemsEqual(self.data, list(data)) + + def test_ndp_proxy_list_project_domain(self): + project = identity_fakes_v3.FakeProject.create_one_project() + self.projects_mock.get.return_value = project + arglist = [ + '--project', project.id, + '--project-domain', project.domain_id, + ] + verifylist = [ + ('project', project.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + filters = {'project_id': project.id} + + self.network.ndp_proxies.assert_called_once_with(**filters) + self.assertEqual(self.columns, columns) + self.assertItemsEqual(self.data, list(data)) + + +class TestSetNDPProxy(TestNDPProxy): + + def setUp(self): + super(TestSetNDPProxy, self).setUp() + attrs = {'router_id': self.router.id, 'port_id': self.port.id} + self.ndp_proxy = ( + network_fakes.create_one_ndp_proxy(attrs)) + self.network.update_ndp_proxy = mock.Mock(return_value=None) + self.network.find_ndp_proxy = mock.Mock( + return_value=self.ndp_proxy) + + # Get the command object to test + self.cmd = ndp_proxy.SetNDPProxy(self.app, self.namespace) + + def test_set_nothing(self): + arglist = [ + self.ndp_proxy.id, + ] + verifylist = [ + ('ndp_proxy', self.ndp_proxy.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = (self.cmd.take_action(parsed_args)) + + self.network.update_ndp_proxy.assert_called_once_with( + self.ndp_proxy) + self.assertIsNone(result) + + def test_set_name(self): + arglist = [ + self.ndp_proxy.id, + '--name', 'fake-name', + ] + verifylist = [ + ('ndp_proxy', self.ndp_proxy.id), + ('name', 'fake-name'), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = (self.cmd.take_action(parsed_args)) + + self.network.update_ndp_proxy.assert_called_once_with( + self.ndp_proxy, name='fake-name') + self.assertIsNone(result) + + def test_set_description(self): + arglist = [ + self.ndp_proxy.id, + '--description', 'balala', + ] + verifylist = [ + ('ndp_proxy', self.ndp_proxy.id), + ('description', 'balala'), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = (self.cmd.take_action(parsed_args)) + + self.network.update_ndp_proxy.assert_called_once_with( + self.ndp_proxy, description='balala') + self.assertIsNone(result) + + +class TestShowNDPProxy(TestNDPProxy): + + def setUp(self): + super(TestShowNDPProxy, self).setUp() + attrs = {'router_id': self.router.id, 'port_id': self.port.id} + self.ndp_proxy = ( + network_fakes.create_one_ndp_proxy(attrs)) + + self.columns = ( + 'created_at', + 'description', + 'id', + 'ip_address', + 'name', + 'port_id', + 'project_id', + 'revision_number', + 'router_id', + 'updated_at') + + self.data = ( + self.ndp_proxy.created_at, + self.ndp_proxy.description, + self.ndp_proxy.id, + self.ndp_proxy.ip_address, + self.ndp_proxy.name, + self.ndp_proxy.port_id, + self.ndp_proxy.project_id, + self.ndp_proxy.revision_number, + self.ndp_proxy.router_id, + self.ndp_proxy.updated_at + ) + self.network.get_ndp_proxy = mock.Mock(return_value=self.ndp_proxy) + self.network.find_ndp_proxy = mock.Mock(return_value=self.ndp_proxy) + + # Get the command object to test + self.cmd = ndp_proxy.ShowNDPProxy(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_default_options(self): + arglist = [ + self.ndp_proxy.id, + ] + verifylist = [ + ('ndp_proxy', self.ndp_proxy.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.find_ndp_proxy.assert_called_once_with( + self.ndp_proxy.id, ignore_missing=False) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_network_qos_rule.py b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py index 217e481e..c7de8160 100644 --- a/openstackclient/tests/unit/network/v2/test_network_qos_rule.py +++ b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py @@ -25,6 +25,7 @@ from openstackclient.tests.unit import utils as tests_utils RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit' RULE_TYPE_DSCP_MARKING = 'dscp-marking' RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth' +RULE_TYPE_MINIMUM_PACKET_RATE = 'minimum-packet-rate' DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 46, 48, 56] @@ -126,8 +127,101 @@ class TestCreateNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): try: self.cmd.take_action(parsed_args) except exceptions.CommandError as e: - msg = ('"Create" rule command for type "minimum-bandwidth" ' - 'requires arguments: direction, min_kbps') + msg = ('Failed to create Network QoS rule: "Create" rule command ' + 'for type "minimum-bandwidth" requires arguments: ' + 'direction, min_kbps') + self.assertEqual(msg, str(e)) + + +class TestCreateNetworkQosRuleMinimumPacketRate(TestNetworkQosRule): + + def test_check_type_parameters(self): + pass + + def setUp(self): + super(TestCreateNetworkQosRuleMinimumPacketRate, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_PACKET_RATE} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.columns = ( + 'direction', + 'id', + 'min_kpps', + 'project_id', + 'qos_policy_id', + 'type' + ) + + self.data = ( + self.new_rule.direction, + self.new_rule.id, + self.new_rule.min_kpps, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + self.network.create_qos_minimum_packet_rate_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.CreateNetworkQosRule(self.app, + self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + '--type', RULE_TYPE_MINIMUM_PACKET_RATE, + '--min-kpps', str(self.new_rule.min_kpps), + '--egress', + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_MINIMUM_PACKET_RATE), + ('min_kpps', self.new_rule.min_kpps), + ('egress', True), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_qos_minimum_packet_rate_rule.\ + assert_called_once_with( + self.qos_policy.id, + **{'min_kpps': self.new_rule.min_kpps, + 'direction': self.new_rule.direction}) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_wrong_options(self): + arglist = [ + '--type', RULE_TYPE_MINIMUM_PACKET_RATE, + '--min-kbps', '10000', + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_MINIMUM_PACKET_RATE), + ('min_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to create Network QoS rule: "Create" rule command ' + 'for type "minimum-packet-rate" requires arguments: ' + 'direction, min_kpps') self.assertEqual(msg, str(e)) @@ -212,8 +306,8 @@ class TestCreateNetworkQosRuleDSCPMarking(TestNetworkQosRule): try: self.cmd.take_action(parsed_args) except exceptions.CommandError as e: - msg = ('"Create" rule command for type "dscp-marking" ' - 'requires arguments: dscp_mark') + msg = ('Failed to create Network QoS rule: "Create" rule command ' + 'for type "dscp-marking" requires arguments: dscp_mark') self.assertEqual(msg, str(e)) @@ -351,8 +445,8 @@ class TestCreateNetworkQosRuleBandwidtLimit(TestNetworkQosRule): try: self.cmd.take_action(parsed_args) except exceptions.CommandError as e: - msg = ('"Create" rule command for type "bandwidth-limit" ' - 'requires arguments: max_kbps') + msg = ('Failed to create Network QoS rule: "Create" rule command ' + 'for type "bandwidth-limit" requires arguments: max_kbps') self.assertEqual(msg, str(e)) @@ -415,6 +509,65 @@ class TestDeleteNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): self.assertEqual(msg, str(e)) +class TestDeleteNetworkQosRuleMinimumPacketRate(TestNetworkQosRule): + + def setUp(self): + super(TestDeleteNetworkQosRuleMinimumPacketRate, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_PACKET_RATE} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.network.delete_qos_minimum_packet_rate_rule = mock.Mock( + return_value=None) + self.network.find_qos_minimum_packet_rate_rule = ( + network_fakes.FakeNetworkQosRule.get_qos_rules( + qos_rules=self.new_rule) + ) + + # Get the command object to test + self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app, + self.namespace) + + def test_qos_policy_delete(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.network.find_qos_policy.assert_called_once_with( + self.qos_policy.id, ignore_missing=False) + self.network.delete_qos_minimum_packet_rate_rule.\ + assert_called_once_with(self.new_rule.id, self.qos_policy.id) + self.assertIsNone(result) + + def test_qos_policy_delete_error(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + self.network.delete_qos_minimum_packet_rate_rule.side_effect = \ + Exception('Error message') + try: + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' % + {'rule': self.new_rule.id, 'e': 'Error message'}) + self.assertEqual(msg, str(e)) + + class TestDeleteNetworkQosRuleDSCPMarking(TestNetworkQosRule): def setUp(self): @@ -627,6 +780,100 @@ class TestSetNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): self.assertEqual(msg, str(e)) +class TestSetNetworkQosRuleMinimumPacketRate(TestNetworkQosRule): + + def setUp(self): + super(TestSetNetworkQosRuleMinimumPacketRate, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_PACKET_RATE} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs=attrs) + self.qos_policy.rules = [self.new_rule] + self.network.update_qos_minimum_packet_rate_rule = mock.Mock( + return_value=None) + self.network.find_qos_minimum_packet_rate_rule = mock.Mock( + return_value=self.new_rule) + self.network.find_qos_policy = mock.Mock( + return_value=self.qos_policy) + + # Get the command object to test + self.cmd = (network_qos_rule.SetNetworkQosRule(self.app, + self.namespace)) + + def test_set_nothing(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.network.update_qos_minimum_packet_rate_rule.assert_called_with( + self.new_rule, self.qos_policy.id) + self.assertIsNone(result) + + def test_set_min_kpps(self): + self._set_min_kpps() + + def test_set_min_kpps_to_zero(self): + self._set_min_kpps(min_kpps=0) + + def _set_min_kpps(self, min_kpps=None): + if min_kpps: + previous_min_kpps = self.new_rule.min_kpps + self.new_rule.min_kpps = min_kpps + + arglist = [ + '--min-kpps', str(self.new_rule.min_kpps), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('min_kpps', self.new_rule.min_kpps), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'min_kpps': self.new_rule.min_kpps, + } + self.network.update_qos_minimum_packet_rate_rule.assert_called_with( + self.new_rule, self.qos_policy.id, **attrs) + self.assertIsNone(result) + + if min_kpps: + self.new_rule.min_kpps = previous_min_kpps + + def test_set_wrong_options(self): + arglist = [ + '--min-kbps', str(10000), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('min_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type ' + '"minimum-packet-rate" only requires arguments: direction, ' + 'min_kpps' % {'rule': self.new_rule.id}) + self.assertEqual(msg, str(e)) + + class TestSetNetworkQosRuleDSCPMarking(TestNetworkQosRule): def setUp(self): @@ -893,6 +1140,9 @@ class TestListNetworkQosRule(TestNetworkQosRule): 'type': RULE_TYPE_MINIMUM_BANDWIDTH} self.new_rule_min_bw = (network_fakes.FakeNetworkQosRule. create_one_qos_rule(attrs=attrs)) + attrs['type'] = RULE_TYPE_MINIMUM_PACKET_RATE + self.new_rule_min_pps = (network_fakes.FakeNetworkQosRule. + create_one_qos_rule(attrs=attrs)) attrs['type'] = RULE_TYPE_DSCP_MARKING self.new_rule_dscp_mark = (network_fakes.FakeNetworkQosRule. create_one_qos_rule(attrs=attrs)) @@ -900,10 +1150,13 @@ class TestListNetworkQosRule(TestNetworkQosRule): self.new_rule_max_bw = (network_fakes.FakeNetworkQosRule. create_one_qos_rule(attrs=attrs)) self.qos_policy.rules = [self.new_rule_min_bw, + self.new_rule_min_pps, self.new_rule_dscp_mark, self.new_rule_max_bw] self.network.find_qos_minimum_bandwidth_rule = mock.Mock( return_value=self.new_rule_min_bw) + self.network.find_qos_minimum_packet_rate_rule = mock.Mock( + return_value=self.new_rule_min_pps) self.network.find_qos_dscp_marking_rule = mock.Mock( return_value=self.new_rule_dscp_mark) self.network.find_qos_bandwidth_limit_rule = mock.Mock( @@ -915,6 +1168,7 @@ class TestListNetworkQosRule(TestNetworkQosRule): 'Max Kbps', 'Max Burst Kbits', 'Min Kbps', + 'Min Kpps', 'DSCP mark', 'Direction', ) @@ -927,6 +1181,7 @@ class TestListNetworkQosRule(TestNetworkQosRule): getattr(self.qos_policy.rules[index], 'max_kbps', ''), getattr(self.qos_policy.rules[index], 'max_burst_kbps', ''), getattr(self.qos_policy.rules[index], 'min_kbps', ''), + getattr(self.qos_policy.rules[index], 'min_kpps', ''), getattr(self.qos_policy.rules[index], 'dscp_mark', ''), getattr(self.qos_policy.rules[index], 'direction', ''), )) @@ -1014,6 +1269,66 @@ class TestShowNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): self.assertEqual(list(self.data), list(data)) +class TestShowNetworkQosRuleMinimumPacketRate(TestNetworkQosRule): + + def setUp(self): + super(TestShowNetworkQosRuleMinimumPacketRate, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_PACKET_RATE} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.columns = ( + 'direction', + 'id', + 'min_kpps', + 'project_id', + 'qos_policy_id', + 'type' + ) + self.data = ( + self.new_rule.direction, + self.new_rule.id, + self.new_rule.min_kpps, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + + self.network.get_qos_minimum_packet_rate_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.ShowNetworkQosRule(self.app, + self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.get_qos_minimum_packet_rate_rule.assert_called_once_with( + self.new_rule.id, self.qos_policy.id) + self.assertEqual(self.columns, columns) + self.assertEqual(list(self.data), list(data)) + + class TestShowNetworkQosDSCPMarking(TestNetworkQosRule): def setUp(self): diff --git a/openstackclient/tests/unit/network/v2/test_network_rbac.py b/openstackclient/tests/unit/network/v2/test_network_rbac.py index b0bc7e86..7ce25205 100644 --- a/openstackclient/tests/unit/network/v2/test_network_rbac.py +++ b/openstackclient/tests/unit/network/v2/test_network_rbac.py @@ -405,6 +405,9 @@ class TestListNetworkRABC(TestNetworkRBAC): self.network.rbac_policies = mock.Mock(return_value=self.rbac_policies) + self.project = identity_fakes_v3.FakeProject.create_one_project() + self.projects_mock.get.return_value = self.project + def test_network_rbac_list(self): arglist = [] verifylist = [] @@ -466,6 +469,22 @@ class TestListNetworkRABC(TestNetworkRBAC): self.assertEqual(self.columns_long, columns) self.assertEqual(self.data_long, list(data)) + def test_network_rbac_list_target_project_opt(self): + arglist = [ + '--target-project', self.rbac_policies[0].target_project_id, ] + verifylist = [ + ('target_project', self.rbac_policies[0].target_project_id)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.network.rbac_policies.assert_called_with(**{ + 'target_project_id': self.project.id + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + class TestSetNetworkRBAC(TestNetworkRBAC): diff --git a/openstackclient/tests/unit/network/v2/test_subnet.py b/openstackclient/tests/unit/network/v2/test_subnet.py index 6b3ab2cc..7aaa583d 100644 --- a/openstackclient/tests/unit/network/v2/test_subnet.py +++ b/openstackclient/tests/unit/network/v2/test_subnet.py @@ -918,7 +918,7 @@ class TestListSubnet(TestSubnet): self.network.subnets.assert_called_once_with(**filters) self.assertEqual(self.columns, columns) - self.assertItemsEqual(self.data, list(data)) + self.assertCountEqual(self.data, list(data)) def test_subnet_list_subnetpool_by_id(self): subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool() @@ -939,7 +939,7 @@ class TestListSubnet(TestSubnet): self.network.subnets.assert_called_once_with(**filters) self.assertEqual(self.columns, columns) - self.assertItemsEqual(self.data, list(data)) + self.assertCountEqual(self.data, list(data)) def test_list_with_tag_options(self): arglist = [ |
