summaryrefslogtreecommitdiff
path: root/openstackclient/tests
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient/tests')
-rw-r--r--openstackclient/tests/functional/compute/v2/test_server_group.py24
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_ndp_proxy.py201
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule.py67
-rw-r--r--openstackclient/tests/unit/common/test_configuration.py57
-rw-r--r--openstackclient/tests/unit/compute/v2/fakes.py95
-rw-r--r--openstackclient/tests/unit/compute/v2/test_server.py333
-rw-r--r--openstackclient/tests/unit/compute/v2/test_server_group.py285
-rw-r--r--openstackclient/tests/unit/fakes.py40
-rw-r--r--openstackclient/tests/unit/identity/v3/test_identity_provider.py170
-rw-r--r--openstackclient/tests/unit/identity/v3/test_trust.py108
-rw-r--r--openstackclient/tests/unit/image/v2/fakes.py73
-rw-r--r--openstackclient/tests/unit/image/v2/test_image.py2
-rw-r--r--openstackclient/tests/unit/image/v2/test_task.py187
-rw-r--r--openstackclient/tests/unit/network/v2/fakes.py80
-rw-r--r--openstackclient/tests/unit/network/v2/test_local_ip.py14
-rw-r--r--openstackclient/tests/unit/network/v2/test_ndp_proxy.py454
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_qos_rule.py327
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_rbac.py19
-rw-r--r--openstackclient/tests/unit/network/v2/test_subnet.py4
19 files changed, 2244 insertions, 296 deletions
diff --git a/openstackclient/tests/functional/compute/v2/test_server_group.py b/openstackclient/tests/functional/compute/v2/test_server_group.py
index 3dff3dcd..daeecd2d 100644
--- a/openstackclient/tests/functional/compute/v2/test_server_group.py
+++ b/openstackclient/tests/functional/compute/v2/test_server_group.py
@@ -33,8 +33,8 @@ class ServerGroupTests(base.TestCase):
cmd_output['name']
)
self.assertEqual(
- ['affinity'],
- cmd_output['policies']
+ 'affinity',
+ cmd_output['policy']
)
cmd_output = json.loads(self.openstack(
@@ -47,8 +47,8 @@ class ServerGroupTests(base.TestCase):
cmd_output['name']
)
self.assertEqual(
- ['anti-affinity'],
- cmd_output['policies']
+ 'anti-affinity',
+ cmd_output['policy']
)
del_output = self.openstack(
@@ -60,7 +60,7 @@ class ServerGroupTests(base.TestCase):
name1 = uuid.uuid4().hex
name2 = uuid.uuid4().hex
- # test server gorup show
+ # test server group show
cmd_output = json.loads(self.openstack(
'server group create -f json ' +
'--policy affinity ' +
@@ -74,8 +74,8 @@ class ServerGroupTests(base.TestCase):
cmd_output['name']
)
self.assertEqual(
- ['affinity'],
- cmd_output['policies']
+ 'affinity',
+ cmd_output['policy']
)
cmd_output = json.loads(self.openstack(
@@ -91,8 +91,8 @@ class ServerGroupTests(base.TestCase):
cmd_output['name']
)
self.assertEqual(
- ['anti-affinity'],
- cmd_output['policies']
+ 'anti-affinity',
+ cmd_output['policy']
)
# test server group list
@@ -101,6 +101,6 @@ class ServerGroupTests(base.TestCase):
names = [x["Name"] for x in cmd_output]
self.assertIn(name1, names)
self.assertIn(name2, names)
- policies = [x["Policies"] for x in cmd_output]
- self.assertIn(['affinity'], policies)
- self.assertIn(['anti-affinity'], policies)
+ policies = [x["Policy"] for x in cmd_output]
+ self.assertIn('affinity', policies)
+ self.assertIn('anti-affinity', policies)
diff --git a/openstackclient/tests/functional/network/v2/test_network_ndp_proxy.py b/openstackclient/tests/functional/network/v2/test_network_ndp_proxy.py
new file mode 100644
index 00000000..a10aef6b
--- /dev/null
+++ b/openstackclient/tests/functional/network/v2/test_network_ndp_proxy.py
@@ -0,0 +1,201 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import json
+
+from openstackclient.tests.functional.network.v2 import common
+
+
+class L3NDPProxyTests(common.NetworkTests):
+
+ def setUp(self):
+ super().setUp()
+ # Nothing in this class works with Nova Network
+ if not self.haz_network:
+ self.skipTest("No Network service present")
+ if not self.is_extension_enabled('l3-ndp-proxy'):
+ self.skipTest("No l3-ndp-proxy extension present")
+
+ self.ROT_NAME = self.getUniqueString()
+ self.EXT_NET_NAME = self.getUniqueString()
+ self.EXT_SUB_NAME = self.getUniqueString()
+ self.INT_NET_NAME = self.getUniqueString()
+ self.INT_SUB_NAME = self.getUniqueString()
+ self.INT_PORT_NAME = self.getUniqueString()
+ self.ADDR_SCOPE_NAME = self.getUniqueString()
+ self.SUBNET_P_NAME = self.getUniqueString()
+ self.created_ndp_proxies = []
+
+ json_output = json.loads(
+ self.openstack(
+ 'address scope create -f json --ip-version 6 '
+ '%(address_s_name)s' % {
+ 'address_s_name': self.ADDR_SCOPE_NAME}))
+ self.assertIsNotNone(json_output['id'])
+ self.ADDRESS_SCOPE_ID = json_output['id']
+ json_output = json.loads(
+ self.openstack(
+ 'subnet pool create -f json %(subnet_p_name)s '
+ '--address-scope %(address_scope)s '
+ '--pool-prefix 2001:db8::/96 --default-prefix-length 112' % {
+ 'subnet_p_name': self.SUBNET_P_NAME,
+ 'address_scope': self.ADDRESS_SCOPE_ID}))
+ self.assertIsNotNone(json_output['id'])
+ self.SUBNET_POOL_ID = json_output['id']
+ json_output = json.loads(
+ self.openstack('network create -f json '
+ '--external ' + self.EXT_NET_NAME))
+ self.assertIsNotNone(json_output['id'])
+ self.EXT_NET_ID = json_output['id']
+ json_output = json.loads(
+ self.openstack(
+ 'subnet create -f json --ip-version 6 --subnet-pool '
+ '%(subnet_pool)s --network %(net_id)s %(sub_name)s' % {
+ 'subnet_pool': self.SUBNET_POOL_ID,
+ 'net_id': self.EXT_NET_ID,
+ 'sub_name': self.EXT_SUB_NAME}))
+ self.assertIsNotNone(json_output['id'])
+ self.EXT_SUB_ID = json_output['id']
+ json_output = json.loads(
+ self.openstack('router create -f json ' + self.ROT_NAME))
+ self.assertIsNotNone(json_output['id'])
+ self.ROT_ID = json_output['id']
+ output = self.openstack(
+ 'router set %(router_id)s --external-gateway %(net_id)s' % {
+ 'router_id': self.ROT_ID,
+ 'net_id': self.EXT_NET_ID})
+ self.assertEqual('', output)
+ output = self.openstack('router set --enable-ndp-proxy ' + self.ROT_ID)
+ self.assertEqual('', output)
+ json_output = json.loads(
+ self.openstack(
+ 'router show -f json -c enable_ndp_proxy ' + self.ROT_ID))
+ self.assertTrue(json_output['enable_ndp_proxy'])
+ json_output = json.loads(
+ self.openstack('network create -f json ' + self.INT_NET_NAME))
+ self.assertIsNotNone(json_output['id'])
+ self.INT_NET_ID = json_output['id']
+ json_output = json.loads(
+ self.openstack(
+ 'subnet create -f json --ip-version 6 --subnet-pool '
+ '%(subnet_pool)s --network %(net_id)s %(sub_name)s' % {
+ 'subnet_pool': self.SUBNET_POOL_ID,
+ 'net_id': self.INT_NET_ID,
+ 'sub_name': self.INT_SUB_NAME}))
+ self.assertIsNotNone(json_output['id'])
+ self.INT_SUB_ID = json_output['id']
+ json_output = json.loads(
+ self.openstack(
+ 'port create -f json --network %(net_id)s '
+ '%(port_name)s' % {
+ 'net_id': self.INT_NET_ID,
+ 'port_name': self.INT_PORT_NAME}))
+ self.assertIsNotNone(json_output['id'])
+ self.INT_PORT_ID = json_output['id']
+ self.INT_PORT_ADDRESS = json_output['fixed_ips'][0]['ip_address']
+ output = self.openstack(
+ 'router add subnet ' + self.ROT_ID + ' ' + self.INT_SUB_ID)
+ self.assertEqual('', output)
+
+ def tearDown(self):
+ for ndp_proxy in self.created_ndp_proxies:
+ output = self.openstack(
+ 'router ndp proxy delete ' + ndp_proxy['id'])
+ self.assertEqual('', output)
+ output = self.openstack('port delete ' + self.INT_PORT_ID)
+ self.assertEqual('', output)
+ output = self.openstack(
+ 'router set --disable-ndp-proxy ' + self.ROT_ID)
+ self.assertEqual('', output)
+ output = self.openstack(
+ 'router remove subnet ' + self.ROT_ID + ' ' + self.INT_SUB_ID)
+ self.assertEqual('', output)
+ output = self.openstack('subnet delete ' + self.INT_SUB_ID)
+ self.assertEqual('', output)
+ output = self.openstack('network delete ' + self.INT_NET_ID)
+ self.assertEqual('', output)
+ output = self.openstack(
+ 'router unset ' + self.ROT_ID + ' ' + '--external-gateway')
+ self.assertEqual('', output)
+ output = self.openstack('router delete ' + self.ROT_ID)
+ self.assertEqual('', output)
+ output = self.openstack('subnet delete ' + self.EXT_SUB_ID)
+ self.assertEqual('', output)
+ output = self.openstack('network delete ' + self.EXT_NET_ID)
+ self.assertEqual('', output)
+ output = self.openstack('subnet pool delete ' + self.SUBNET_POOL_ID)
+ self.assertEqual('', output)
+ output = self.openstack('address scope delete ' +
+ self.ADDRESS_SCOPE_ID)
+ self.assertEqual('', output)
+ super().tearDown()
+
+ def _create_ndp_proxies(self, ndp_proxies):
+ for ndp_proxy in ndp_proxies:
+ output = json.loads(
+ self.openstack(
+ 'router ndp proxy create %(router)s --name %(name)s '
+ '--port %(port)s --ip-address %(address)s -f json' % {
+ 'router': ndp_proxy['router_id'],
+ 'name': ndp_proxy['name'],
+ 'port': ndp_proxy['port_id'],
+ 'address': ndp_proxy['address']}))
+ self.assertEqual(ndp_proxy['router_id'], output['router_id'])
+ self.assertEqual(ndp_proxy['port_id'], output['port_id'])
+ self.assertEqual(ndp_proxy['address'], output['ip_address'])
+ self.created_ndp_proxies.append(output)
+
+ def test_create_ndp_proxy(self):
+ ndp_proxies = [
+ {
+ 'name': self.getUniqueString(),
+ 'router_id': self.ROT_ID,
+ 'port_id': self.INT_PORT_ID,
+ 'address': self.INT_PORT_ADDRESS
+ }
+ ]
+ self._create_ndp_proxies(ndp_proxies)
+
+ def test_ndp_proxy_list(self):
+ ndp_proxies = {
+ 'name': self.getUniqueString(),
+ 'router_id': self.ROT_ID,
+ 'port_id': self.INT_PORT_ID,
+ 'address': self.INT_PORT_ADDRESS}
+ self._create_ndp_proxies([ndp_proxies])
+ ndp_proxy = json.loads(self.openstack(
+ 'router ndp proxy list -f json'))[0]
+ self.assertEqual(ndp_proxies['name'], ndp_proxy['Name'])
+ self.assertEqual(ndp_proxies['router_id'], ndp_proxy['Router ID'])
+ self.assertEqual(ndp_proxies['address'], ndp_proxy['IP Address'])
+
+ def test_ndp_proxy_set_and_show(self):
+ ndp_proxies = {
+ 'name': self.getUniqueString(),
+ 'router_id': self.ROT_ID,
+ 'port_id': self.INT_PORT_ID,
+ 'address': self.INT_PORT_ADDRESS}
+ description = 'balala'
+ self._create_ndp_proxies([ndp_proxies])
+ ndp_proxy_id = self.created_ndp_proxies[0]['id']
+ output = self.openstack(
+ 'router ndp proxy set --description %s %s' % (
+ description, ndp_proxy_id))
+ self.assertEqual('', output)
+ json_output = json.loads(
+ self.openstack('router ndp proxy show -f json ' + ndp_proxy_id))
+ self.assertEqual(ndp_proxies['name'], json_output['name'])
+ self.assertEqual(ndp_proxies['router_id'], json_output['router_id'])
+ self.assertEqual(ndp_proxies['port_id'], json_output['port_id'])
+ self.assertEqual(ndp_proxies['address'], json_output['ip_address'])
+ self.assertEqual(description, json_output['description'])
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
index 98e588e8..fd411b35 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
@@ -85,6 +85,73 @@ class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests):
self.assertEqual(7500, cmd_output['min_kbps'])
+class NetworkQosRuleTestsMinimumPacketRate(common.NetworkTests):
+ """Functional tests for QoS minimum packet rate rule"""
+
+ def setUp(self):
+ super(NetworkQosRuleTestsMinimumPacketRate, self).setUp()
+ # Nothing in this class works with Nova Network
+ if not self.haz_network:
+ self.skipTest("No Network service present")
+
+ self.QOS_POLICY_NAME = 'qos_policy_%s' % uuid.uuid4().hex
+
+ self.openstack(
+ 'network qos policy create %s' % self.QOS_POLICY_NAME
+ )
+ self.addCleanup(self.openstack,
+ 'network qos policy delete %s' % self.QOS_POLICY_NAME)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule create -f json '
+ '--type minimum-packet-rate '
+ '--min-kpps 2800 '
+ '--egress %s' %
+ self.QOS_POLICY_NAME
+ ))
+ self.RULE_ID = cmd_output['id']
+ self.addCleanup(self.openstack,
+ 'network qos rule delete %s %s' %
+ (self.QOS_POLICY_NAME, self.RULE_ID))
+ self.assertTrue(self.RULE_ID)
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos rule delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json %s' % policy_name)
+ self.addCleanup(self.openstack,
+ 'network qos policy delete %s' % policy_name)
+ rule = json.loads(self.openstack(
+ 'network qos rule create -f json '
+ '--type minimum-packet-rate '
+ '--min-kpps 2800 '
+ '--egress %s' % policy_name
+ ))
+ raw_output = self.openstack(
+ 'network qos rule delete %s %s' %
+ (policy_name, rule['id']))
+ self.assertEqual('', raw_output)
+
+ def test_qos_rule_list(self):
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule list -f json %s' % self.QOS_POLICY_NAME))
+ self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
+
+ def test_qos_rule_show(self):
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json %s %s' %
+ (self.QOS_POLICY_NAME, self.RULE_ID)))
+ self.assertEqual(self.RULE_ID, cmd_output['id'])
+
+ def test_qos_rule_set(self):
+ self.openstack('network qos rule set --min-kpps 7500 --any %s %s' %
+ (self.QOS_POLICY_NAME, self.RULE_ID))
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json %s %s' %
+ (self.QOS_POLICY_NAME, self.RULE_ID)))
+ self.assertEqual(7500, cmd_output['min_kpps'])
+ self.assertEqual('any', cmd_output['direction'])
+
+
class NetworkQosRuleTestsDSCPMarking(common.NetworkTests):
"""Functional tests for QoS DSCP marking rule"""
diff --git a/openstackclient/tests/unit/common/test_configuration.py b/openstackclient/tests/unit/common/test_configuration.py
index bdd3debf..148228ec 100644
--- a/openstackclient/tests/unit/common/test_configuration.py
+++ b/openstackclient/tests/unit/common/test_configuration.py
@@ -35,11 +35,14 @@ class TestConfiguration(utils.TestCommand):
fakes.REGION_NAME,
)
- opts = [mock.Mock(secret=True, dest="password"),
- mock.Mock(secret=True, dest="token")]
+ opts = [
+ mock.Mock(secret=True, dest="password"),
+ mock.Mock(secret=True, dest="token"),
+ ]
- @mock.patch("keystoneauth1.loading.base.get_plugin_options",
- return_value=opts)
+ @mock.patch(
+ "keystoneauth1.loading.base.get_plugin_options", return_value=opts
+ )
def test_show(self, m_get_plugin_opts):
arglist = []
verifylist = [('mask', True)]
@@ -51,12 +54,14 @@ class TestConfiguration(utils.TestCommand):
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, data)
- @mock.patch("keystoneauth1.loading.base.get_plugin_options",
- return_value=opts)
+ @mock.patch(
+ "keystoneauth1.loading.base.get_plugin_options", return_value=opts
+ )
def test_show_unmask(self, m_get_plugin_opts):
arglist = ['--unmask']
verifylist = [('mask', False)]
cmd = configuration.ShowConfiguration(self.app, None)
+
parsed_args = self.check_parser(cmd, arglist, verifylist)
columns, data = cmd.take_action(parsed_args)
@@ -71,15 +76,49 @@ class TestConfiguration(utils.TestCommand):
)
self.assertEqual(datalist, data)
- @mock.patch("keystoneauth1.loading.base.get_plugin_options",
- return_value=opts)
- def test_show_mask(self, m_get_plugin_opts):
+ @mock.patch(
+ "keystoneauth1.loading.base.get_plugin_options", return_value=opts
+ )
+ def test_show_mask_with_cloud_config(self, m_get_plugin_opts):
arglist = ['--mask']
verifylist = [('mask', True)]
+ self.app.client_manager.configuration_type = "cloud_config"
cmd = configuration.ShowConfiguration(self.app, None)
+
parsed_args = self.check_parser(cmd, arglist, verifylist)
columns, data = cmd.take_action(parsed_args)
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, data)
+
+ @mock.patch(
+ "keystoneauth1.loading.base.get_plugin_options", return_value=opts
+ )
+ def test_show_mask_with_global_env(self, m_get_plugin_opts):
+ arglist = ['--mask']
+ verifylist = [('mask', True)]
+ self.app.client_manager.configuration_type = "global_env"
+ column_list = (
+ 'identity_api_version',
+ 'password',
+ 'region',
+ 'token',
+ 'username',
+ )
+ datalist = (
+ fakes.VERSION,
+ configuration.REDACTED,
+ fakes.REGION_NAME,
+ configuration.REDACTED,
+ fakes.USERNAME,
+ )
+
+ cmd = configuration.ShowConfiguration(self.app, None)
+
+ parsed_args = self.check_parser(cmd, arglist, verifylist)
+
+ columns, data = cmd.take_action(parsed_args)
+
+ self.assertEqual(column_list, columns)
+ self.assertEqual(datalist, data)
diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py
index a1e7754a..d77797ab 100644
--- a/openstackclient/tests/unit/compute/v2/fakes.py
+++ b/openstackclient/tests/unit/compute/v2/fakes.py
@@ -21,6 +21,7 @@ import uuid
from novaclient import api_versions
from openstack.compute.v2 import flavor as _flavor
from openstack.compute.v2 import server
+from openstack.compute.v2 import server_group as _server_group
from openstack.compute.v2 import server_interface as _server_interface
from openstack.compute.v2 import service
from openstack.compute.v2 import volume_attachment
@@ -1290,72 +1291,6 @@ class FakeHost(object):
return host_info
-class FakeServerGroup(object):
- """Fake one server group"""
-
- @staticmethod
- def _create_one_server_group(attrs=None):
- """Create a fake server group
-
- :param dict attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id and other attributes
- """
- if attrs is None:
- attrs = {}
-
- # Set default attributes.
- server_group_info = {
- 'id': 'server-group-id-' + uuid.uuid4().hex,
- 'members': [],
- 'metadata': {},
- 'name': 'server-group-name-' + uuid.uuid4().hex,
- 'project_id': 'server-group-project-id-' + uuid.uuid4().hex,
- 'user_id': 'server-group-user-id-' + uuid.uuid4().hex,
- }
-
- # Overwrite default attributes.
- server_group_info.update(attrs)
-
- server_group = fakes.FakeResource(
- info=copy.deepcopy(server_group_info),
- loaded=True)
- return server_group
-
- @staticmethod
- def create_one_server_group(attrs=None):
- """Create a fake server group
-
- :param dict attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id and other attributes
- """
- if attrs is None:
- attrs = {}
- attrs.setdefault('policies', ['policy1', 'policy2'])
- return FakeServerGroup._create_one_server_group(attrs)
-
-
-class FakeServerGroupV264(object):
- """Fake one server group for API >= 2.64"""
-
- @staticmethod
- def create_one_server_group(attrs=None):
- """Create a fake server group
-
- :param dict attrs:
- A dictionary with all attributes
- :return:
- A FakeResource object, with id and other attributes
- """
- if attrs is None:
- attrs = {}
- attrs.setdefault('policy', 'policy1')
- return FakeServerGroup._create_one_server_group(attrs)
-
-
class FakeUsage(object):
"""Fake one or more usage."""
@@ -1860,6 +1795,34 @@ class FakeVolumeAttachment(object):
return volume_attachments
+def create_one_server_group(attrs=None):
+ """Create a fake server group
+
+ :param dict attrs:
+ A dictionary with all attributes
+ :return:
+ A fake ServerGroup object, with id and other attributes
+ """
+ if attrs is None:
+ attrs = {}
+
+ # Set default attributes.
+ server_group_info = {
+ 'id': 'server-group-id-' + uuid.uuid4().hex,
+ 'member_ids': '',
+ 'metadata': {},
+ 'name': 'server-group-name-' + uuid.uuid4().hex,
+ 'project_id': 'server-group-project-id-' + uuid.uuid4().hex,
+ 'user_id': 'server-group-user-id-' + uuid.uuid4().hex,
+ }
+
+ # Overwrite default attributes.
+ server_group_info.update(attrs)
+
+ server_group = _server_group.ServerGroup(**server_group_info)
+ return server_group
+
+
def create_one_server_interface(attrs=None):
"""Create a fake SDK ServerInterface.
diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py
index 004f3a05..2e64e071 100644
--- a/openstackclient/tests/unit/compute/v2/test_server.py
+++ b/openstackclient/tests/unit/compute/v2/test_server.py
@@ -161,6 +161,10 @@ class TestServer(compute_fakes.TestComputev2):
return volumes
def run_method_with_servers(self, method_name, server_count):
+ # Starting with v2.91, the nova api needs to be call with a sentinel
+ # as availability_zone=None will unpin the server az.
+ _sentinel = object()
+
servers = self.setup_servers_mock(server_count)
arglist = []
@@ -183,7 +187,11 @@ class TestServer(compute_fakes.TestComputev2):
method.assert_called_with(reason=None)
elif method_name == 'unshelve':
version = self.app.client_manager.compute.api_version
- if version >= api_versions.APIVersion('2.77'):
+ if version >= api_versions.APIVersion('2.91'):
+ method.assert_called_with(availability_zone=_sentinel,
+ host=None)
+ elif (version >= api_versions.APIVersion('2.77') and
+ version < api_versions.APIVersion('2.91')):
method.assert_called_with(availability_zone=None)
else:
method.assert_called_with()
@@ -4441,8 +4449,8 @@ class TestServerList(_TestServerList):
columns, data = self.cmd.take_action(parsed_args)
self.servers_mock.list.assert_called_with(**self.kwargs)
- self.assertEqual(0, self.images_mock.list.call_count)
- self.assertEqual(0, self.flavors_mock.list.call_count)
+ self.images_mock.assert_not_called()
+ self.flavors_mock.list.assert_not_called()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, tuple(data))
@@ -4465,7 +4473,8 @@ class TestServerList(_TestServerList):
getattr(s, 'OS-EXT-AZ:availability_zone'),
getattr(s, 'OS-EXT-SRV-ATTR:host'),
s.Metadata,
- ) for s in self.servers)
+ ) for s in self.servers
+ )
arglist = [
'--long',
]
@@ -4477,6 +4486,11 @@ class TestServerList(_TestServerList):
columns, data = self.cmd.take_action(parsed_args)
self.servers_mock.list.assert_called_with(**self.kwargs)
+ image_ids = {s.image['id'] for s in self.servers if s.image}
+ self.images_mock.assert_called_once_with(
+ id=f'in:{",".join(image_ids)}',
+ )
+ self.flavors_mock.list.assert_called_once_with(is_public=None)
self.assertEqual(self.columns_long, columns)
self.assertEqual(self.data, tuple(data))
@@ -4540,6 +4554,8 @@ class TestServerList(_TestServerList):
columns, data = self.cmd.take_action(parsed_args)
self.servers_mock.list.assert_called_with(**self.kwargs)
+ self.images_mock.assert_not_called()
+ self.flavors_mock.list.assert_not_called()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, tuple(data))
@@ -4568,6 +4584,8 @@ class TestServerList(_TestServerList):
columns, data = self.cmd.take_action(parsed_args)
self.servers_mock.list.assert_called_with(**self.kwargs)
+ self.images_mock.assert_not_called()
+ self.flavors_mock.list.assert_not_called()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, tuple(data))
@@ -4585,8 +4603,8 @@ class TestServerList(_TestServerList):
columns, data = self.cmd.take_action(parsed_args)
self.servers_mock.list.assert_called_with(**self.kwargs)
- self.assertFalse(self.images_mock.list.call_count)
- self.assertFalse(self.flavors_mock.list.call_count)
+ self.images_mock.assert_not_called()
+ self.flavors_mock.list.assert_not_called()
self.get_image_mock.assert_called()
self.flavors_mock.get.assert_called()
@@ -4610,6 +4628,8 @@ class TestServerList(_TestServerList):
self.search_opts['image'] = self.image.id
self.servers_mock.list.assert_called_with(**self.kwargs)
+ self.images_mock.assert_not_called()
+ self.flavors_mock.list.assert_called_once()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, tuple(data))
@@ -4626,10 +4646,12 @@ class TestServerList(_TestServerList):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.flavors_mock.get.has_calls(self.flavor.id)
+ self.flavors_mock.get.assert_has_calls([mock.call(self.flavor.id)])
self.search_opts['flavor'] = self.flavor.id
self.servers_mock.list.assert_called_with(**self.kwargs)
+ self.images_mock.assert_called_once()
+ self.flavors_mock.list.assert_not_called()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, tuple(data))
@@ -5093,8 +5115,8 @@ class TestServerListV273(_TestServerList):
self.search_opts['locked'] = True
self.servers_mock.list.assert_called_with(**self.kwargs)
- self.assertItemsEqual(self.columns, columns)
- self.assertItemsEqual(self.data, tuple(data))
+ self.assertCountEqual(self.columns, columns)
+ self.assertCountEqual(self.data, tuple(data))
def test_server_list_with_unlocked_v273(self):
@@ -5113,8 +5135,8 @@ class TestServerListV273(_TestServerList):
self.search_opts['locked'] = False
self.servers_mock.list.assert_called_with(**self.kwargs)
- self.assertItemsEqual(self.columns, columns)
- self.assertItemsEqual(self.data, tuple(data))
+ self.assertCountEqual(self.columns, columns)
+ self.assertCountEqual(self.data, tuple(data))
def test_server_list_with_locked_and_unlocked(self):
@@ -5154,8 +5176,8 @@ class TestServerListV273(_TestServerList):
self.servers_mock.list.assert_called_with(**self.kwargs)
- self.assertItemsEqual(self.columns, columns)
- self.assertItemsEqual(self.data, tuple(data))
+ self.assertCountEqual(self.columns, columns)
+ self.assertCountEqual(self.data, tuple(data))
@mock.patch.object(iso8601, 'parse_date', side_effect=iso8601.ParseError)
def test_server_list_with_invalid_changes_before(
@@ -5773,6 +5795,25 @@ class TestServerRebuild(TestServer):
self.get_image_mock.assert_called_with(self.image.id)
self.server.rebuild.assert_called_with(self.image, None)
+ def test_rebuild_with_volume_backed_server_no_image(self):
+ # the volume-backed server will have the image attribute set to an
+ # empty string, not null/None
+ self.server.image = ''
+
+ arglist = [
+ self.server.id,
+ ]
+ verifylist = [
+ ('server', self.server.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+ self.assertIn('The --image option is required', str(exc))
+
def test_rebuild_with_name(self):
name = 'test-server-xxx'
arglist = [
@@ -6267,6 +6308,103 @@ class TestServerRebuild(TestServer):
parsed_args)
+class TestServerRebuildVolumeBacked(TestServer):
+
+ def setUp(self):
+ super().setUp()
+
+ self.new_image = image_fakes.create_one_image()
+ self.find_image_mock.return_value = self.new_image
+
+ attrs = {
+ 'image': '',
+ 'networks': {},
+ 'adminPass': 'passw0rd',
+ }
+ new_server = compute_fakes.FakeServer.create_one_server(attrs=attrs)
+
+ # Fake the server to be rebuilt. The IDs of them should be the same.
+ attrs['id'] = new_server.id
+ methods = {
+ 'rebuild': new_server,
+ }
+ self.server = compute_fakes.FakeServer.create_one_server(
+ attrs=attrs,
+ methods=methods
+ )
+
+ # Return value for utils.find_resource for server.
+ self.servers_mock.get.return_value = self.server
+
+ self.cmd = server.RebuildServer(self.app, None)
+
+ def test_rebuild_with_reimage_boot_volume(self):
+ self.app.client_manager.compute.api_version = \
+ api_versions.APIVersion('2.93')
+
+ arglist = [
+ self.server.id,
+ '--reimage-boot-volume',
+ '--image', self.new_image.id
+ ]
+ verifylist = [
+ ('server', self.server.id),
+ ('reimage_boot_volume', True),
+ ('image', self.new_image.id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(self.server.id)
+ self.server.rebuild.assert_called_with(
+ self.new_image, None)
+
+ def test_rebuild_with_no_reimage_boot_volume(self):
+ self.app.client_manager.compute.api_version = \
+ api_versions.APIVersion('2.93')
+
+ arglist = [
+ self.server.id,
+ '--no-reimage-boot-volume',
+ '--image', self.new_image.id
+ ]
+ verifylist = [
+ ('server', self.server.id),
+ ('reimage_boot_volume', False),
+ ('image', self.new_image.id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+ self.assertIn('--reimage-boot-volume is required', str(exc))
+
+ def test_rebuild_with_reimage_boot_volume_pre_v293(self):
+ self.app.client_manager.compute.api_version = \
+ api_versions.APIVersion('2.92')
+
+ arglist = [
+ self.server.id,
+ '--reimage-boot-volume',
+ '--image', self.new_image.id
+ ]
+ verifylist = [
+ ('server', self.server.id),
+ ('reimage_boot_volume', True)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-compute-api-version 2.93 or greater is required', str(exc))
+
+
class TestEvacuateServer(TestServer):
def setUp(self):
@@ -8204,7 +8342,23 @@ class TestServerUnshelve(TestServer):
def test_unshelve_multi_servers(self):
self.run_method_with_servers('unshelve', 3)
- def test_unshelve_with_specified_az(self):
+ def test_unshelve_v277(self):
+ self.app.client_manager.compute.api_version = \
+ api_versions.APIVersion('2.77')
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = [server.id]
+ verifylist = [('server', [server.id])]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(server.id)
+ server.unshelve.assert_called_with()
+
+ def test_unshelve_with_specified_az_v277(self):
self.app.client_manager.compute.api_version = \
api_versions.APIVersion('2.77')
@@ -8248,6 +8402,157 @@ class TestServerUnshelve(TestServer):
self.assertIn(
'--os-compute-api-version 2.77 or greater is required', str(ex))
+ def test_unshelve_v291(self):
+ self.app.client_manager.compute.api_version = (
+ api_versions.APIVersion('2.91'))
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = [server.id]
+ verifylist = [('server', [server.id])]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(server.id)
+ server.unshelve.assert_called_with()
+
+ def test_unshelve_with_specified_az_v291(self):
+ self.app.client_manager.compute.api_version = (
+ api_versions.APIVersion('2.91'))
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = [
+ '--availability-zone', "foo-az",
+ server.id,
+ ]
+ verifylist = [
+ ('availability_zone', "foo-az"),
+ ('server', [server.id])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(server.id)
+ server.unshelve.assert_called_with(availability_zone="foo-az")
+
+ def test_unshelve_with_specified_host_v291(self):
+ self.app.client_manager.compute.api_version = (
+ api_versions.APIVersion('2.91'))
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = [
+ '--host', "server1",
+ server.id,
+ ]
+ verifylist = [
+ ('host', "server1"),
+ ('server', [server.id])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(server.id)
+ server.unshelve.assert_called_with(host="server1")
+
+ def test_unshelve_with_unpin_az_v291(self):
+ self.app.client_manager.compute.api_version = (
+ api_versions.APIVersion('2.91'))
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = ['--no-availability-zone', server.id]
+ verifylist = [
+ ('no_availability_zone', True),
+ ('server', [server.id])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(server.id)
+ server.unshelve.assert_called_with(availability_zone=None)
+
+ def test_unshelve_with_specified_az_and_host_v291(self):
+ self.app.client_manager.compute.api_version = (
+ api_versions.APIVersion('2.91'))
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = [
+ '--host', "server1",
+ '--availability-zone', "foo-az",
+ server.id,
+ ]
+ verifylist = [
+ ('host', "server1"),
+ ('availability_zone', "foo-az"),
+ ('server', [server.id])
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(server.id)
+
+ def test_unshelve_with_unpin_az_and_host_v291(self):
+ self.app.client_manager.compute.api_version = (
+ api_versions.APIVersion('2.91'))
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = [
+ '--host', "server1",
+ '--no-availability-zone',
+ server.id,
+ ]
+ verifylist = [
+ ('host', "server1"),
+ ('no_availability_zone', True),
+ ('server', [server.id])
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.servers_mock.get.assert_called_with(server.id)
+
+ def test_unshelve_fails_with_unpin_az_and_az_v291(self):
+ self.app.client_manager.compute.api_version = (
+ api_versions.APIVersion('2.91'))
+
+ server = compute_fakes.FakeServer.create_one_server(
+ attrs=self.attrs, methods=self.methods)
+ self.servers_mock.get.return_value = server
+ arglist = [
+ '--availability-zone', "foo-az",
+ '--no-availability-zone',
+ server.id,
+ ]
+ verifylist = [
+ ('availability_zone', "foo-az"),
+ ('no_availability_zone', True),
+ ('server', [server.id])
+ ]
+
+ ex = self.assertRaises(utils.ParserException,
+ self.check_parser,
+ self.cmd, arglist, verifylist)
+ self.assertIn('argument --no-availability-zone: not allowed '
+ 'with argument --availability-zone', str(ex))
+
@mock.patch.object(common_utils, 'wait_for_status', return_value=True)
def test_unshelve_with_wait(self, mock_wait_for_status):
server = compute_fakes.FakeServer.create_one_server(
diff --git a/openstackclient/tests/unit/compute/v2/test_server_group.py b/openstackclient/tests/unit/compute/v2/test_server_group.py
index 3ed19e27..655366a8 100644
--- a/openstackclient/tests/unit/compute/v2/test_server_group.py
+++ b/openstackclient/tests/unit/compute/v2/test_server_group.py
@@ -15,10 +15,9 @@
from unittest import mock
-from novaclient import api_versions
+from openstack import utils as sdk_utils
from osc_lib.cli import format_columns
from osc_lib import exceptions
-from osc_lib import utils
from openstackclient.compute.v2 import server_group
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
@@ -27,38 +26,7 @@ from openstackclient.tests.unit import utils as tests_utils
class TestServerGroup(compute_fakes.TestComputev2):
- fake_server_group = compute_fakes.FakeServerGroup.create_one_server_group()
-
- columns = (
- 'id',
- 'members',
- 'name',
- 'policies',
- 'project_id',
- 'user_id',
- )
-
- data = (
- fake_server_group.id,
- format_columns.ListColumn(fake_server_group.members),
- fake_server_group.name,
- format_columns.ListColumn(fake_server_group.policies),
- fake_server_group.project_id,
- fake_server_group.user_id,
- )
-
- def setUp(self):
- super(TestServerGroup, self).setUp()
-
- # Get a shortcut to the ServerGroupsManager Mock
- self.server_groups_mock = self.app.client_manager.compute.server_groups
- self.server_groups_mock.reset_mock()
-
-
-class TestServerGroupV264(TestServerGroup):
-
- fake_server_group = \
- compute_fakes.FakeServerGroupV264.create_one_server_group()
+ fake_server_group = compute_fakes.create_one_server_group()
columns = (
'id',
@@ -66,31 +34,40 @@ class TestServerGroupV264(TestServerGroup):
'name',
'policy',
'project_id',
+ 'rules',
'user_id',
)
data = (
fake_server_group.id,
- format_columns.ListColumn(fake_server_group.members),
+ format_columns.ListColumn(fake_server_group.member_ids),
fake_server_group.name,
fake_server_group.policy,
fake_server_group.project_id,
+ format_columns.DictColumn(fake_server_group.rules),
fake_server_group.user_id,
)
def setUp(self):
- super(TestServerGroupV264, self).setUp()
+ super().setUp()
+
+ # Create and get a shortcut to the compute client mock
+ self.app.client_manager.sdk_connection = mock.Mock()
+ self.sdk_client = self.app.client_manager.sdk_connection.compute
+ self.sdk_client.reset_mock()
class TestServerGroupCreate(TestServerGroup):
def setUp(self):
- super(TestServerGroupCreate, self).setUp()
+ super().setUp()
- self.server_groups_mock.create.return_value = self.fake_server_group
+ self.sdk_client.create_server_group.return_value = \
+ self.fake_server_group
self.cmd = server_group.CreateServerGroup(self.app, None)
- def test_server_group_create(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_create(self, sm_mock):
arglist = [
'--policy', 'anti-affinity',
'affinity_group',
@@ -101,18 +78,16 @@ class TestServerGroupCreate(TestServerGroup):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.server_groups_mock.create.assert_called_once_with(
+ self.sdk_client.create_server_group.assert_called_once_with(
name=parsed_args.name,
- policies=[parsed_args.policy],
+ policy=parsed_args.policy,
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
- def test_server_group_create_with_soft_policies(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.15')
-
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_create_with_soft_policies(self, sm_mock):
arglist = [
'--policy', 'soft-anti-affinity',
'affinity_group',
@@ -123,18 +98,16 @@ class TestServerGroupCreate(TestServerGroup):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.server_groups_mock.create.assert_called_once_with(
+ self.sdk_client.create_server_group.assert_called_once_with(
name=parsed_args.name,
- policies=[parsed_args.policy],
+ policy=parsed_args.policy,
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
- def test_server_group_create_with_soft_policies_pre_v215(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.14')
-
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
+ def test_server_group_create_with_soft_policies_pre_v215(self, sm_mock):
arglist = [
'--policy', 'soft-anti-affinity',
'affinity_group',
@@ -152,10 +125,8 @@ class TestServerGroupCreate(TestServerGroup):
'--os-compute-api-version 2.15 or greater is required',
str(ex))
- def test_server_group_create_with_rules(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.64')
-
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_create_with_rules(self, sm_mock):
arglist = [
'--policy', 'soft-anti-affinity',
'--rule', 'max_server_per_host=2',
@@ -168,19 +139,18 @@ class TestServerGroupCreate(TestServerGroup):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.server_groups_mock.create.assert_called_once_with(
+ self.sdk_client.create_server_group.assert_called_once_with(
name=parsed_args.name,
- policy=parsed_args.policy, # should be 'policy', not 'policies'
+ policy=parsed_args.policy,
rules=parsed_args.rules,
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
- def test_server_group_create_with_rules_pre_v264(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.63')
-
+ @mock.patch.object(
+ sdk_utils, 'supports_microversion', side_effect=[True, False])
+ def test_server_group_create_with_rules_pre_v264(self, sm_mock):
arglist = [
'--policy', 'soft-anti-affinity',
'--rule', 'max_server_per_host=2',
@@ -205,9 +175,9 @@ class TestServerGroupCreate(TestServerGroup):
class TestServerGroupDelete(TestServerGroup):
def setUp(self):
- super(TestServerGroupDelete, self).setUp()
+ super().setUp()
- self.server_groups_mock.get.return_value = self.fake_server_group
+ self.sdk_client.find_server_group.return_value = self.fake_server_group
self.cmd = server_group.DeleteServerGroup(self.app, None)
def test_server_group_delete(self):
@@ -219,8 +189,10 @@ class TestServerGroupDelete(TestServerGroup):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.server_groups_mock.get.assert_called_once_with('affinity_group')
- self.server_groups_mock.delete.assert_called_once_with(
+ self.sdk_client.find_server_group.assert_called_once_with(
+ 'affinity_group'
+ )
+ self.sdk_client.delete_server_group.assert_called_once_with(
self.fake_server_group.id
)
self.assertIsNone(result)
@@ -235,13 +207,15 @@ class TestServerGroupDelete(TestServerGroup):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.server_groups_mock.get.assert_any_call('affinity_group')
- self.server_groups_mock.get.assert_any_call('anti_affinity_group')
- self.server_groups_mock.delete.assert_called_with(
+ self.sdk_client.find_server_group.assert_any_call('affinity_group')
+ self.sdk_client.find_server_group.assert_any_call(
+ 'anti_affinity_group'
+ )
+ self.sdk_client.delete_server_group.assert_called_with(
self.fake_server_group.id
)
- self.assertEqual(2, self.server_groups_mock.get.call_count)
- self.assertEqual(2, self.server_groups_mock.delete.call_count)
+ self.assertEqual(2, self.sdk_client.find_server_group.call_count)
+ self.assertEqual(2, self.sdk_client.delete_server_group.call_count)
self.assertIsNone(result)
def test_server_group_delete_no_input(self):
@@ -262,25 +236,23 @@ class TestServerGroupDelete(TestServerGroup):
('server_group', ['affinity_group', 'anti_affinity_group']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- find_mock_result = [self.fake_server_group, exceptions.CommandError]
- with mock.patch.object(utils, 'find_resource',
- side_effect=find_mock_result) as find_mock:
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 server groups failed to delete.',
- str(e))
-
- find_mock.assert_any_call(self.server_groups_mock,
- 'affinity_group')
- find_mock.assert_any_call(self.server_groups_mock,
- 'anti_affinity_group')
-
- self.assertEqual(2, find_mock.call_count)
- self.server_groups_mock.delete.assert_called_once_with(
- self.fake_server_group.id
- )
+
+ self.sdk_client.find_server_group.side_effect = [
+ self.fake_server_group, exceptions.CommandError]
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 server groups failed to delete.', str(e))
+
+ self.sdk_client.find_server_group.assert_any_call('affinity_group')
+ self.sdk_client.find_server_group.assert_any_call(
+ 'anti_affinity_group'
+ )
+ self.assertEqual(2, self.sdk_client.find_server_group.call_count)
+ self.sdk_client.delete_server_group.assert_called_once_with(
+ self.fake_server_group.id
+ )
class TestServerGroupList(TestServerGroup):
@@ -300,28 +272,67 @@ class TestServerGroupList(TestServerGroup):
'User Id',
)
+ list_columns_v264 = (
+ 'ID',
+ 'Name',
+ 'Policy',
+ )
+
+ list_columns_v264_long = (
+ 'ID',
+ 'Name',
+ 'Policy',
+ 'Members',
+ 'Project Id',
+ 'User Id',
+ )
+
list_data = ((
TestServerGroup.fake_server_group.id,
TestServerGroup.fake_server_group.name,
- format_columns.ListColumn(TestServerGroup.fake_server_group.policies),
+ format_columns.ListColumn(
+ TestServerGroup.fake_server_group.policies
+ ),
),)
list_data_long = ((
TestServerGroup.fake_server_group.id,
TestServerGroup.fake_server_group.name,
- format_columns.ListColumn(TestServerGroup.fake_server_group.policies),
- format_columns.ListColumn(TestServerGroup.fake_server_group.members),
+ format_columns.ListColumn(
+ TestServerGroup.fake_server_group.policies
+ ),
+ format_columns.ListColumn(
+ TestServerGroup.fake_server_group.member_ids
+ ),
+ TestServerGroup.fake_server_group.project_id,
+ TestServerGroup.fake_server_group.user_id,
+ ),)
+
+ list_data_v264 = ((
+ TestServerGroup.fake_server_group.id,
+ TestServerGroup.fake_server_group.name,
+ TestServerGroup.fake_server_group.policy,
+ ),)
+
+ list_data_v264_long = ((
+ TestServerGroup.fake_server_group.id,
+ TestServerGroup.fake_server_group.name,
+ TestServerGroup.fake_server_group.policy,
+ format_columns.ListColumn(
+ TestServerGroup.fake_server_group.member_ids
+ ),
TestServerGroup.fake_server_group.project_id,
TestServerGroup.fake_server_group.user_id,
),)
def setUp(self):
- super(TestServerGroupList, self).setUp()
+ super().setUp()
- self.server_groups_mock.list.return_value = [self.fake_server_group]
+ self.sdk_client.server_groups.return_value = [self.fake_server_group]
self.cmd = server_group.ListServerGroup(self.app, None)
- def test_server_group_list(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
+ def test_server_group_list(self, sm_mock):
arglist = []
verifylist = [
('all_projects', False),
@@ -332,12 +343,13 @@ class TestServerGroupList(TestServerGroup):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.server_groups_mock.list.assert_called_once_with()
+ self.sdk_client.server_groups.assert_called_once_with()
self.assertCountEqual(self.list_columns, columns)
self.assertCountEqual(self.list_data, tuple(data))
- def test_server_group_list_with_all_projects_and_long(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=False)
+ def test_server_group_list_with_all_projects_and_long(self, sm_mock):
arglist = [
'--all-projects',
'--long',
@@ -350,13 +362,14 @@ class TestServerGroupList(TestServerGroup):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.server_groups_mock.list.assert_called_once_with(
+ self.sdk_client.server_groups.assert_called_once_with(
all_projects=True)
self.assertCountEqual(self.list_columns_long, columns)
self.assertCountEqual(self.list_data_long, tuple(data))
- def test_server_group_list_with_limit(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_list_with_limit(self, sm_mock):
arglist = [
'--limit', '1',
]
@@ -370,9 +383,10 @@ class TestServerGroupList(TestServerGroup):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
- self.server_groups_mock.list.assert_called_once_with(limit=1)
+ self.sdk_client.server_groups.assert_called_once_with(limit=1)
- def test_server_group_list_with_offset(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_list_with_offset(self, sm_mock):
arglist = [
'--offset', '5',
]
@@ -386,51 +400,10 @@ class TestServerGroupList(TestServerGroup):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
- self.server_groups_mock.list.assert_called_once_with(offset=5)
-
-
-class TestServerGroupListV264(TestServerGroupV264):
-
- list_columns = (
- 'ID',
- 'Name',
- 'Policy',
- )
+ self.sdk_client.server_groups.assert_called_once_with(offset=5)
- list_columns_long = (
- 'ID',
- 'Name',
- 'Policy',
- 'Members',
- 'Project Id',
- 'User Id',
- )
-
- list_data = ((
- TestServerGroupV264.fake_server_group.id,
- TestServerGroupV264.fake_server_group.name,
- TestServerGroupV264.fake_server_group.policy,
- ),)
-
- list_data_long = ((
- TestServerGroupV264.fake_server_group.id,
- TestServerGroupV264.fake_server_group.name,
- TestServerGroupV264.fake_server_group.policy,
- format_columns.ListColumn(
- TestServerGroupV264.fake_server_group.members),
- TestServerGroupV264.fake_server_group.project_id,
- TestServerGroupV264.fake_server_group.user_id,
- ),)
-
- def setUp(self):
- super(TestServerGroupListV264, self).setUp()
-
- self.server_groups_mock.list.return_value = [self.fake_server_group]
- self.cmd = server_group.ListServerGroup(self.app, None)
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.64')
-
- def test_server_group_list(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_list_v264(self, sm_mock):
arglist = []
verifylist = [
('all_projects', False),
@@ -438,12 +411,13 @@ class TestServerGroupListV264(TestServerGroupV264):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.server_groups_mock.list.assert_called_once_with()
+ self.sdk_client.server_groups.assert_called_once_with()
- self.assertCountEqual(self.list_columns, columns)
- self.assertCountEqual(self.list_data, tuple(data))
+ self.assertCountEqual(self.list_columns_v264, columns)
+ self.assertCountEqual(self.list_data_v264, tuple(data))
- def test_server_group_list_with_all_projects_and_long(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_list_with_all_projects_and_long_v264(self, sm_mock):
arglist = [
'--all-projects',
'--long',
@@ -454,22 +428,23 @@ class TestServerGroupListV264(TestServerGroupV264):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.server_groups_mock.list.assert_called_once_with(
+ self.sdk_client.server_groups.assert_called_once_with(
all_projects=True)
- self.assertCountEqual(self.list_columns_long, columns)
- self.assertCountEqual(self.list_data_long, tuple(data))
+ self.assertCountEqual(self.list_columns_v264_long, columns)
+ self.assertCountEqual(self.list_data_v264_long, tuple(data))
class TestServerGroupShow(TestServerGroup):
def setUp(self):
- super(TestServerGroupShow, self).setUp()
+ super().setUp()
- self.server_groups_mock.get.return_value = self.fake_server_group
+ self.sdk_client.find_server_group.return_value = self.fake_server_group
self.cmd = server_group.ShowServerGroup(self.app, None)
- def test_server_group_show(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion', return_value=True)
+ def test_server_group_show(self, sm_mock):
arglist = [
'affinity_group',
]
diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py
index 00e0c129..086c2466 100644
--- a/openstackclient/tests/unit/fakes.py
+++ b/openstackclient/tests/unit/fakes.py
@@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-#
import json
import sys
@@ -49,21 +48,6 @@ TEST_RESPONSE_DICT_V3.set_project_scope()
TEST_VERSIONS = fixture.DiscoveryList(href=AUTH_URL)
-def to_unicode_dict(catalog_dict):
- """Converts dict to unicode dict
-
- """
- if isinstance(catalog_dict, dict):
- return {to_unicode_dict(key): to_unicode_dict(value)
- for key, value in catalog_dict.items()}
- elif isinstance(catalog_dict, list):
- return [to_unicode_dict(element) for element in catalog_dict]
- elif isinstance(catalog_dict, str):
- return catalog_dict + u""
- else:
- return catalog_dict
-
-
class FakeStdout(object):
def __init__(self):
@@ -142,18 +126,30 @@ class FakeClientManager(object):
self.network_endpoint_enabled = True
self.compute_endpoint_enabled = True
self.volume_endpoint_enabled = True
+ # The source of configuration. This is either 'cloud_config' (a
+ # clouds.yaml file) or 'global_env' ('OS_'-prefixed envvars)
+ self.configuration_type = 'cloud_config'
def get_configuration(self):
- return {
- 'auth': {
- 'username': USERNAME,
- 'password': PASSWORD,
- 'token': AUTH_TOKEN,
- },
+
+ config = {
'region': REGION_NAME,
'identity_api_version': VERSION,
}
+ if self.configuration_type == 'cloud_config':
+ config['auth'] = {
+ 'username': USERNAME,
+ 'password': PASSWORD,
+ 'token': AUTH_TOKEN,
+ }
+ elif self.configuration_type == 'global_env':
+ config['username'] = USERNAME
+ config['password'] = PASSWORD
+ config['token'] = AUTH_TOKEN
+
+ return config
+
def is_network_endpoint_enabled(self):
return self.network_endpoint_enabled
diff --git a/openstackclient/tests/unit/identity/v3/test_identity_provider.py b/openstackclient/tests/unit/identity/v3/test_identity_provider.py
index 1a9a7991..480bae59 100644
--- a/openstackclient/tests/unit/identity/v3/test_identity_provider.py
+++ b/openstackclient/tests/unit/identity/v3/test_identity_provider.py
@@ -15,9 +15,12 @@
import copy
from unittest import mock
+from osc_lib import exceptions
+
from openstackclient.identity.v3 import identity_provider
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
+from openstackclient.tests.unit import utils as test_utils
class TestIdentityProvider(identity_fakes.TestFederatedIdentity):
@@ -308,6 +311,86 @@ class TestIdentityProviderCreate(TestIdentityProvider):
self.assertEqual(self.columns, columns)
self.assertCountEqual(self.datalist, data)
+ def test_create_identity_provider_authttl_positive(self):
+ arglist = [
+ '--authorization-ttl', '60',
+ identity_fakes.idp_id,
+ ]
+ verifylist = [
+ ('identity_provider_id', identity_fakes.idp_id),
+ ('authorization_ttl', 60),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # Set expected values
+ kwargs = {
+ 'remote_ids': None,
+ 'description': None,
+ 'domain_id': None,
+ 'enabled': True,
+ 'authorization_ttl': 60,
+ }
+
+ self.identity_providers_mock.create.assert_called_with(
+ id=identity_fakes.idp_id,
+ **kwargs
+ )
+
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.datalist, data)
+
+ def test_create_identity_provider_authttl_zero(self):
+ arglist = [
+ '--authorization-ttl', '0',
+ identity_fakes.idp_id,
+ ]
+ verifylist = [
+ ('identity_provider_id', identity_fakes.idp_id),
+ ('authorization_ttl', 0),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # Set expected values
+ kwargs = {
+ 'remote_ids': None,
+ 'description': None,
+ 'domain_id': None,
+ 'enabled': True,
+ 'authorization_ttl': 0,
+ }
+
+ self.identity_providers_mock.create.assert_called_with(
+ id=identity_fakes.idp_id,
+ **kwargs
+ )
+
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.datalist, data)
+
+ def test_create_identity_provider_authttl_negative(self):
+ arglist = [
+ '--authorization-ttl', '-60',
+ identity_fakes.idp_id,
+ ]
+ verifylist = [
+ ('identity_provider_id', identity_fakes.idp_id),
+ ('authorization_ttl', -60),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_create_identity_provider_authttl_not_int(self):
+ arglist = [
+ '--authorization-ttl', 'spam',
+ identity_fakes.idp_id,
+ ]
+ verifylist = []
+ self.assertRaises(test_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
class TestIdentityProviderDelete(TestIdentityProvider):
@@ -678,6 +761,93 @@ class TestIdentityProviderSet(TestIdentityProvider):
self.cmd.take_action(parsed_args)
+ def test_identity_provider_set_authttl_positive(self):
+ def prepare(self):
+ """Prepare fake return objects before the test is executed"""
+ updated_idp = copy.deepcopy(identity_fakes.IDENTITY_PROVIDER)
+ updated_idp['authorization_ttl'] = 60
+ resources = fakes.FakeResource(
+ None,
+ updated_idp,
+ loaded=True
+ )
+ self.identity_providers_mock.update.return_value = resources
+
+ prepare(self)
+ arglist = [
+ '--authorization-ttl', '60',
+ identity_fakes.idp_id
+ ]
+ verifylist = [
+ ('identity_provider', identity_fakes.idp_id),
+ ('enable', False),
+ ('disable', False),
+ ('remote_id', None),
+ ('authorization_ttl', 60),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ self.identity_providers_mock.update.assert_called_with(
+ identity_fakes.idp_id,
+ authorization_ttl=60,
+ )
+
+ def test_identity_provider_set_authttl_zero(self):
+ def prepare(self):
+ """Prepare fake return objects before the test is executed"""
+ updated_idp = copy.deepcopy(identity_fakes.IDENTITY_PROVIDER)
+ updated_idp['authorization_ttl'] = 0
+ resources = fakes.FakeResource(
+ None,
+ updated_idp,
+ loaded=True
+ )
+ self.identity_providers_mock.update.return_value = resources
+
+ prepare(self)
+ arglist = [
+ '--authorization-ttl', '0',
+ identity_fakes.idp_id
+ ]
+ verifylist = [
+ ('identity_provider', identity_fakes.idp_id),
+ ('enable', False),
+ ('disable', False),
+ ('remote_id', None),
+ ('authorization_ttl', 0),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ self.identity_providers_mock.update.assert_called_with(
+ identity_fakes.idp_id,
+ authorization_ttl=0,
+ )
+
+ def test_identity_provider_set_authttl_negative(self):
+ arglist = [
+ '--authorization-ttl', '-1',
+ identity_fakes.idp_id
+ ]
+ verifylist = [
+ ('identity_provider', identity_fakes.idp_id),
+ ('enable', False),
+ ('disable', False),
+ ('remote_id', None),
+ ('authorization_ttl', -1),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_identity_provider_set_authttl_not_int(self):
+ arglist = [
+ '--authorization-ttl', 'spam',
+ identity_fakes.idp_id
+ ]
+ verifylist = []
+ self.assertRaises(test_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
class TestIdentityProviderShow(TestIdentityProvider):
diff --git a/openstackclient/tests/unit/identity/v3/test_trust.py b/openstackclient/tests/unit/identity/v3/test_trust.py
index d8cfc59f..d530adf5 100644
--- a/openstackclient/tests/unit/identity/v3/test_trust.py
+++ b/openstackclient/tests/unit/identity/v3/test_trust.py
@@ -206,7 +206,113 @@ class TestTrustList(TestTrust):
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
- self.trusts_mock.list.assert_called_with()
+ self.trusts_mock.list.assert_called_with(
+ trustor_user=None,
+ trustee_user=None,
+ )
+
+ collist = ('ID', 'Expires At', 'Impersonation', 'Project ID',
+ 'Trustee User ID', 'Trustor User ID')
+ self.assertEqual(collist, columns)
+ datalist = ((
+ identity_fakes.trust_id,
+ identity_fakes.trust_expires,
+ identity_fakes.trust_impersonation,
+ identity_fakes.project_id,
+ identity_fakes.user_id,
+ identity_fakes.user_id
+ ), )
+ self.assertEqual(datalist, tuple(data))
+
+ def test_trust_list_auth_user(self):
+ auth_ref = self.app.client_manager.auth_ref = mock.Mock()
+ auth_ref.user_id.return_value = identity_fakes.user_id
+
+ arglist = ['--auth-user']
+ verifylist = [
+ ('trustor', None),
+ ('trustee', None),
+ ('authuser', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.trusts_mock.list.assert_any_call(
+ trustor_user=self.users_mock.get()
+ )
+ self.trusts_mock.list.assert_any_call(
+ trustee_user=self.users_mock.get()
+ )
+
+ collist = ('ID', 'Expires At', 'Impersonation', 'Project ID',
+ 'Trustee User ID', 'Trustor User ID')
+ self.assertEqual(collist, columns)
+ datalist = ((
+ identity_fakes.trust_id,
+ identity_fakes.trust_expires,
+ identity_fakes.trust_impersonation,
+ identity_fakes.project_id,
+ identity_fakes.user_id,
+ identity_fakes.user_id
+ ), )
+ self.assertEqual(datalist, tuple(data))
+
+ def test_trust_list_trustee(self):
+ arglist = ['--trustee', identity_fakes.user_name]
+ verifylist = [
+ ('trustor', None),
+ ('trustee', identity_fakes.user_name),
+ ('authuser', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ print(self.trusts_mock.list.call_args_list)
+ self.trusts_mock.list.assert_any_call(
+ trustee_user=self.users_mock.get(),
+ trustor_user=None,
+ )
+
+ collist = ('ID', 'Expires At', 'Impersonation', 'Project ID',
+ 'Trustee User ID', 'Trustor User ID')
+ self.assertEqual(collist, columns)
+ datalist = ((
+ identity_fakes.trust_id,
+ identity_fakes.trust_expires,
+ identity_fakes.trust_impersonation,
+ identity_fakes.project_id,
+ identity_fakes.user_id,
+ identity_fakes.user_id
+ ), )
+ self.assertEqual(datalist, tuple(data))
+
+ def test_trust_list_trustor(self):
+ arglist = ['--trustor', identity_fakes.user_name]
+ verifylist = [
+ ('trustee', None),
+ ('trustor', identity_fakes.user_name),
+ ('authuser', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ print(self.trusts_mock.list.call_args_list)
+ self.trusts_mock.list.assert_any_call(
+ trustor_user=self.users_mock.get(),
+ trustee_user=None,
+ )
collist = ('ID', 'Expires At', 'Impersonation', 'Project ID',
'Trustee User ID', 'Trustor User ID')
diff --git a/openstackclient/tests/unit/image/v2/fakes.py b/openstackclient/tests/unit/image/v2/fakes.py
index a0eda6d2..f2015450 100644
--- a/openstackclient/tests/unit/image/v2/fakes.py
+++ b/openstackclient/tests/unit/image/v2/fakes.py
@@ -18,6 +18,7 @@ import uuid
from openstack.image.v2 import image
from openstack.image.v2 import member
+from openstack.image.v2 import task
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
@@ -44,10 +45,16 @@ class FakeImagev2Client:
self.remove_tag = mock.Mock()
+ self.tasks = mock.Mock()
+ self.get_task = mock.Mock()
+
self.auth_token = kwargs['token']
self.management_url = kwargs['endpoint']
self.version = 2.0
+ self.tasks = mock.Mock()
+ self.tasks.resource_class = fakes.FakeResource(None, {})
+
class TestImagev2(utils.TestCommand):
@@ -129,3 +136,69 @@ def create_one_image_member(attrs=None):
image_member_info.update(attrs)
return member.Member(**image_member_info)
+
+
+def create_one_task(attrs=None):
+ """Create a fake task.
+
+ :param attrs: A dictionary with all attributes of task
+ :type attrs: dict
+ :return: A fake Task object.
+ :rtype: `openstack.image.v2.task.Task`
+ """
+ attrs = attrs or {}
+
+ # Set default attribute
+ task_info = {
+ 'created_at': '2016-06-29T16:13:07Z',
+ 'expires_at': '2016-07-01T16:13:07Z',
+ 'id': str(uuid.uuid4()),
+ 'input': {
+ 'image_properties': {
+ 'container_format': 'ovf',
+ 'disk_format': 'vhd'
+ },
+ 'import_from': 'https://apps.openstack.org/excellent-image',
+ 'import_from_format': 'qcow2'
+ },
+ 'message': '',
+ 'owner': str(uuid.uuid4()),
+ 'result': {
+ 'image_id': str(uuid.uuid4()),
+ },
+ 'schema': '/v2/schemas/task',
+ 'status': random.choice(
+ [
+ 'pending',
+ 'processing',
+ 'success',
+ 'failure',
+ ]
+ ),
+ # though not documented, the API only allows 'import'
+ # https://github.com/openstack/glance/blob/24.0.0/glance/api/v2/tasks.py#L186-L190
+ 'type': 'import',
+ 'updated_at': '2016-06-29T16:13:07Z',
+ }
+
+ # Overwrite default attributes if there are some attributes set
+ task_info.update(attrs)
+
+ return task.Task(**task_info)
+
+
+def create_tasks(attrs=None, count=2):
+ """Create multiple fake tasks.
+
+ :param attrs: A dictionary with all attributes of Task
+ :type attrs: dict
+ :param count: The number of tasks to be faked
+ :type count: int
+ :return: A list of fake Task objects
+ :rtype: list
+ """
+ tasks = []
+ for n in range(0, count):
+ tasks.append(create_one_task(attrs))
+
+ return tasks
diff --git a/openstackclient/tests/unit/image/v2/test_image.py b/openstackclient/tests/unit/image/v2/test_image.py
index 02f7b411..f2c11364 100644
--- a/openstackclient/tests/unit/image/v2/test_image.py
+++ b/openstackclient/tests/unit/image/v2/test_image.py
@@ -1090,7 +1090,7 @@ class TestImageSet(TestImage):
self.assertIsNone(result)
# we'll have called this but not set anything
- self.app.client_manager.image.update_image.called_once_with(
+ self.app.client_manager.image.update_image.assert_called_once_with(
self._image.id,
)
diff --git a/openstackclient/tests/unit/image/v2/test_task.py b/openstackclient/tests/unit/image/v2/test_task.py
new file mode 100644
index 00000000..e077e2b1
--- /dev/null
+++ b/openstackclient/tests/unit/image/v2/test_task.py
@@ -0,0 +1,187 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from osc_lib.cli import format_columns
+
+from openstackclient.image.v2 import task
+from openstackclient.tests.unit.image.v2 import fakes as image_fakes
+
+
+class TestTask(image_fakes.TestImagev2):
+ def setUp(self):
+ super().setUp()
+
+ # Get shortcuts to mocked image client
+ self.client = self.app.client_manager.image
+
+
+class TestTaskShow(TestTask):
+
+ task = image_fakes.create_one_task()
+
+ columns = (
+ 'created_at',
+ 'expires_at',
+ 'id',
+ 'input',
+ 'message',
+ 'owner_id',
+ 'properties',
+ 'result',
+ 'status',
+ 'type',
+ 'updated_at',
+ )
+ data = (
+ task.created_at,
+ task.expires_at,
+ task.id,
+ task.input,
+ task.message,
+ task.owner_id,
+ format_columns.DictColumn({}),
+ task.result,
+ task.status,
+ task.type,
+ task.updated_at,
+ )
+
+ def setUp(self):
+ super().setUp()
+
+ self.client.get_task.return_value = self.task
+
+ # Get the command object to test
+ self.cmd = task.ShowTask(self.app, None)
+
+ def test_task_show(self):
+ arglist = [self.task.id]
+ verifylist = [
+ ('task', self.task.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class ShowOne in cliff, abstract method take_action()
+ # returns a two-part tuple with a tuple of column names and a tuple of
+ # data to be shown.
+ columns, data = self.cmd.take_action(parsed_args)
+ self.client.get_task.assert_called_with(self.task.id)
+
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, data)
+
+
+class TestTaskList(TestTask):
+
+ tasks = image_fakes.create_tasks()
+
+ columns = (
+ 'ID',
+ 'Type',
+ 'Status',
+ 'Owner',
+ )
+ datalist = [
+ (
+ task.id,
+ task.type,
+ task.status,
+ task.owner_id,
+ )
+ for task in tasks
+ ]
+
+ def setUp(self):
+ super().setUp()
+
+ self.client.tasks.side_effect = [self.tasks, []]
+
+ # Get the command object to test
+ self.cmd = task.ListTask(self.app, None)
+
+ def test_task_list_no_options(self):
+ arglist = []
+ verifylist = [
+ ('sort_key', None),
+ ('sort_dir', None),
+ ('limit', None),
+ ('marker', None),
+ ('type', None),
+ ('status', None),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.client.tasks.assert_called_with()
+
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.datalist, data)
+
+ def test_task_list_sort_key_option(self):
+ arglist = ['--sort-key', 'created_at']
+ verifylist = [('sort_key', 'created_at')]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.client.tasks.assert_called_with(
+ sort_key=parsed_args.sort_key,
+ )
+
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.datalist, data)
+
+ def test_task_list_sort_dir_option(self):
+ arglist = ['--sort-dir', 'desc']
+ verifylist = [('sort_dir', 'desc')]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.client.tasks.assert_called_with(
+ sort_dir=parsed_args.sort_dir,
+ )
+
+ def test_task_list_pagination_options(self):
+ arglist = ['--limit', '1', '--marker', self.tasks[0].id]
+ verifylist = [('limit', 1), ('marker', self.tasks[0].id)]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.client.tasks.assert_called_with(
+ limit=parsed_args.limit,
+ marker=parsed_args.marker,
+ )
+
+ def test_task_list_type_option(self):
+ arglist = ['--type', self.tasks[0].type]
+ verifylist = [('type', self.tasks[0].type)]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.client.tasks.assert_called_with(
+ type=self.tasks[0].type,
+ )
+
+ def test_task_list_status_option(self):
+ arglist = ['--status', self.tasks[0].status]
+ verifylist = [('status', self.tasks[0].status)]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ self.client.tasks.assert_called_with(
+ status=self.tasks[0].status,
+ )
diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py
index 1a1a6beb..4d029a0e 100644
--- a/openstackclient/tests/unit/network/v2/fakes.py
+++ b/openstackclient/tests/unit/network/v2/fakes.py
@@ -26,6 +26,7 @@ from openstack.network.v2 import availability_zone as _availability_zone
from openstack.network.v2 import flavor as _flavor
from openstack.network.v2 import local_ip as _local_ip
from openstack.network.v2 import local_ip_association as _local_ip_association
+from openstack.network.v2 import ndp_proxy as _ndp_proxy
from openstack.network.v2 import network as _network
from openstack.network.v2 import network_ip_availability as _ip_availability
from openstack.network.v2 import network_segment_range as _segment_range
@@ -58,9 +59,11 @@ QUOTA = {
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
RULE_TYPE_DSCP_MARKING = 'dscp-marking'
RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
+RULE_TYPE_MINIMUM_PACKET_RATE = 'minimum-packet-rate'
VALID_QOS_RULES = [RULE_TYPE_BANDWIDTH_LIMIT,
RULE_TYPE_DSCP_MARKING,
- RULE_TYPE_MINIMUM_BANDWIDTH]
+ RULE_TYPE_MINIMUM_BANDWIDTH,
+ RULE_TYPE_MINIMUM_PACKET_RATE]
VALID_DSCP_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
34, 36, 38, 40, 46, 48, 56]
@@ -274,6 +277,9 @@ class FakeNetworkQosRule(object):
elif type == RULE_TYPE_MINIMUM_BANDWIDTH:
qos_rule_attrs['min_kbps'] = randint(1, 10000)
qos_rule_attrs['direction'] = 'egress'
+ elif type == RULE_TYPE_MINIMUM_PACKET_RATE:
+ qos_rule_attrs['min_kpps'] = randint(1, 10000)
+ qos_rule_attrs['direction'] = 'egress'
# Overwrite default attributes.
qos_rule_attrs.update(attrs)
@@ -2074,3 +2080,75 @@ def get_local_ip_associations(local_ip_associations=None, count=2):
local_ip_associations = create_local_ip_associations(count)
return mock.Mock(side_effect=local_ip_associations)
+
+
+def create_one_ndp_proxy(attrs=None):
+ """Create a fake NDP proxy.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object with router_id, port_id, etc.
+ """
+ attrs = attrs or {}
+ router_id = (
+ attrs.get('router_id') or 'router-id-' + uuid.uuid4().hex
+ )
+ port_id = (
+ attrs.get('port_id') or 'port-id-' + uuid.uuid4().hex
+ )
+ # Set default attributes.
+ np_attrs = {
+ 'id': uuid.uuid4().hex,
+ 'name': 'ndp-proxy-name-' + uuid.uuid4().hex,
+ 'router_id': router_id,
+ 'port_id': port_id,
+ 'ip_address': '2001::1:2',
+ 'description': 'ndp-proxy-description-' + uuid.uuid4().hex,
+ 'project_id': 'project-id-' + uuid.uuid4().hex,
+ 'location': 'MUNCHMUNCHMUNCH',
+ }
+
+ # Overwrite default attributes.
+ np_attrs.update(attrs)
+
+ return _ndp_proxy.NDPProxy(**np_attrs)
+
+
+def create_ndp_proxies(attrs=None, count=2):
+ """Create multiple fake NDP proxies.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of NDP proxxy to fake
+ :return:
+ A list of FakeResource objects faking the NDP proxies
+ """
+ ndp_proxies = []
+ for i in range(0, count):
+ ndp_proxies.append(
+ create_one_ndp_proxy(attrs)
+ )
+ return ndp_proxies
+
+
+def get_ndp_proxies(ndp_proxies=None, count=2):
+ """Get a list of faked NDP proxies.
+
+ If ndp_proxy list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List ndp_proxies:
+ A list of FakeResource objects faking ndp proxy
+ :param int count:
+ The number of ndp proxy to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ ndp proxy
+ """
+ if ndp_proxies is None:
+ ndp_proxies = (
+ create_ndp_proxies(count)
+ )
+ return mock.Mock(side_effect=ndp_proxies)
diff --git a/openstackclient/tests/unit/network/v2/test_local_ip.py b/openstackclient/tests/unit/network/v2/test_local_ip.py
index 97e246df..be23365e 100644
--- a/openstackclient/tests/unit/network/v2/test_local_ip.py
+++ b/openstackclient/tests/unit/network/v2/test_local_ip.py
@@ -96,7 +96,7 @@ class TestCreateLocalIP(TestLocalIP):
self.network.create_local_ip.assert_called_once_with(**{})
self.assertEqual(set(self.columns), set(columns))
- self.assertItemsEqual(self.data, data)
+ self.assertCountEqual(self.data, data)
def test_create_all_options(self):
arglist = [
@@ -130,7 +130,7 @@ class TestCreateLocalIP(TestLocalIP):
'ip_mode': self.new_local_ip.ip_mode,
})
self.assertEqual(set(self.columns), set(columns))
- self.assertItemsEqual(self.data, data)
+ self.assertCountEqual(self.data, data)
class TestDeleteLocalIP(TestLocalIP):
@@ -263,7 +263,7 @@ class TestListLocalIP(TestLocalIP):
self.network.local_ips.assert_called_once_with(**{})
self.assertEqual(self.columns, columns)
- self.assertItemsEqual(self.data, list(data))
+ self.assertCountEqual(self.data, list(data))
def test_local_ip_list_name(self):
arglist = [
@@ -278,7 +278,7 @@ class TestListLocalIP(TestLocalIP):
self.network.local_ips.assert_called_once_with(
**{'name': self.local_ips[0].name})
self.assertEqual(self.columns, columns)
- self.assertItemsEqual(self.data, list(data))
+ self.assertCountEqual(self.data, list(data))
def test_local_ip_list_project(self):
project = identity_fakes_v3.FakeProject.create_one_project()
@@ -295,7 +295,7 @@ class TestListLocalIP(TestLocalIP):
self.network.local_ips.assert_called_once_with(
**{'project_id': project.id})
self.assertEqual(self.columns, columns)
- self.assertItemsEqual(self.data, list(data))
+ self.assertCountEqual(self.data, list(data))
def test_local_ip_project_domain(self):
project = identity_fakes_v3.FakeProject.create_one_project()
@@ -314,7 +314,7 @@ class TestListLocalIP(TestLocalIP):
self.network.local_ips.assert_called_once_with(**filters)
self.assertEqual(self.columns, columns)
- self.assertItemsEqual(self.data, list(data))
+ self.assertCountEqual(self.data, list(data))
def test_local_ip_list_network(self):
arglist = [
@@ -477,4 +477,4 @@ class TestShowLocalIP(TestLocalIP):
self.network.find_local_ip.assert_called_once_with(
self._local_ip.name, ignore_missing=False)
self.assertEqual(set(self.columns), set(columns))
- self.assertItemsEqual(self.data, list(data))
+ self.assertCountEqual(self.data, list(data))
diff --git a/openstackclient/tests/unit/network/v2/test_ndp_proxy.py b/openstackclient/tests/unit/network/v2/test_ndp_proxy.py
new file mode 100644
index 00000000..48c5deb2
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_ndp_proxy.py
@@ -0,0 +1,454 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import mock
+from unittest.mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import ndp_proxy
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestNDPProxy(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestNDPProxy, self).setUp()
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ self.router = network_fakes.FakeRouter.create_one_router(
+ {'id': 'fake-router-id'})
+ self.network.find_router = mock.Mock(return_value=self.router)
+ self.port = network_fakes.create_one_port()
+ self.network.find_port = mock.Mock(return_value=self.port)
+
+
+class TestCreateNDPProxy(TestNDPProxy):
+ def setUp(self):
+ super(TestCreateNDPProxy, self).setUp()
+ attrs = {'router_id': self.router.id, 'port_id': self.port.id}
+ self.ndp_proxy = (
+ network_fakes.create_one_ndp_proxy(
+ attrs))
+ self.columns = (
+ 'created_at',
+ 'description',
+ 'id',
+ 'ip_address',
+ 'name',
+ 'port_id',
+ 'project_id',
+ 'revision_number',
+ 'router_id',
+ 'updated_at')
+
+ self.data = (
+ self.ndp_proxy.created_at,
+ self.ndp_proxy.description,
+ self.ndp_proxy.id,
+ self.ndp_proxy.ip_address,
+ self.ndp_proxy.name,
+ self.ndp_proxy.port_id,
+ self.ndp_proxy.project_id,
+ self.ndp_proxy.revision_number,
+ self.ndp_proxy.router_id,
+ self.ndp_proxy.updated_at
+ )
+ self.network.create_ndp_proxy = mock.Mock(
+ return_value=self.ndp_proxy)
+
+ # Get the command object to test
+ self.cmd = ndp_proxy.CreateNDPProxy(self.app, self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_all_options(self):
+ arglist = [
+ self.ndp_proxy.router_id,
+ '--name', self.ndp_proxy.name,
+ '--port', self.ndp_proxy.port_id,
+ '--ip-address', self.ndp_proxy.ip_address,
+ '--description', self.ndp_proxy.description,
+ ]
+ verifylist = [
+ ('name', self.ndp_proxy.name),
+ ('router', self.ndp_proxy.router_id),
+ ('port', self.ndp_proxy.port_id),
+ ('ip_address', self.ndp_proxy.ip_address),
+ ('description', self.ndp_proxy.description),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_ndp_proxy.assert_called_once_with(
+ **{'name': self.ndp_proxy.name,
+ 'router_id': self.ndp_proxy.router_id,
+ 'ip_address': self.ndp_proxy.ip_address,
+ 'port_id': self.ndp_proxy.port_id,
+ 'description': self.ndp_proxy.description})
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteNDPProxy(TestNDPProxy):
+
+ def setUp(self):
+ super(TestDeleteNDPProxy, self).setUp()
+ attrs = {'router_id': self.router.id, 'port_id': self.port.id}
+ self.ndp_proxies = (
+ network_fakes.create_ndp_proxies(attrs))
+ self.ndp_proxy = self.ndp_proxies[0]
+ self.network.delete_ndp_proxy = mock.Mock(
+ return_value=None)
+ self.network.find_ndp_proxy = mock.Mock(
+ return_value=self.ndp_proxy)
+
+ # Get the command object to test
+ self.cmd = ndp_proxy.DeleteNDPProxy(self.app, self.namespace)
+
+ def test_delete(self):
+ arglist = [
+ self.ndp_proxy.id
+ ]
+ verifylist = [
+ ('ndp_proxy', [self.ndp_proxy.id])
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.delete_ndp_proxy.assert_called_once_with(self.ndp_proxy)
+ self.assertIsNone(result)
+
+ def test_delete_error(self):
+ arglist = [
+ self.ndp_proxy.id,
+ ]
+ verifylist = [
+ ('ndp_proxy', [self.ndp_proxy.id])
+ ]
+ self.network.delete_ndp_proxy.side_effect = Exception(
+ 'Error message')
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action, parsed_args)
+
+ def test_multi_ndp_proxies_delete(self):
+ arglist = []
+ np_id = []
+
+ for a in self.ndp_proxies:
+ arglist.append(a.id)
+ np_id.append(a.id)
+
+ verifylist = [
+ ('ndp_proxy', np_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.delete_ndp_proxy.assert_has_calls(
+ [call(self.ndp_proxy), call(self.ndp_proxy)])
+ self.assertIsNone(result)
+
+
+class TestListNDPProxy(TestNDPProxy):
+
+ def setUp(self):
+ super(TestListNDPProxy, self).setUp()
+ attrs = {'router_id': self.router.id, 'port_id': self.port.id}
+ ndp_proxies = (
+ network_fakes.create_ndp_proxies(attrs, count=3))
+ self.columns = (
+ 'ID',
+ 'Name',
+ 'Router ID',
+ 'IP Address',
+ 'Project',
+ )
+ self.data = []
+ for np in ndp_proxies:
+ self.data.append((
+ np.id,
+ np.name,
+ np.router_id,
+ np.ip_address,
+ np.project_id,
+ ))
+
+ self.network.ndp_proxies = mock.Mock(
+ return_value=ndp_proxies)
+
+ # Get the command object to test
+ self.cmd = ndp_proxy.ListNDPProxy(self.app, self.namespace)
+
+ def test_ndp_proxy_list(self):
+ arglist = []
+ verifylist = []
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ndp_proxies.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ list_data = list(data)
+ self.assertEqual(len(self.data), len(list_data))
+ for index in range(len(list_data)):
+ self.assertEqual(self.data[index], list_data[index])
+
+ def test_ndp_proxy_list_router(self):
+ arglist = [
+ '--router', 'fake-router-name',
+ ]
+
+ verifylist = [
+ ('router', 'fake-router-name')
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ndp_proxies.assert_called_once_with(**{
+ 'router_id': 'fake-router-id'})
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, list(data))
+
+ def test_ndp_proxy_list_port(self):
+ arglist = [
+ '--port', self.port.id,
+ ]
+
+ verifylist = [
+ ('port', self.port.id)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ndp_proxies.assert_called_once_with(**{
+ 'port_id': self.port.id})
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, list(data))
+
+ def test_ndp_proxy_list_name(self):
+ arglist = [
+ '--name', 'fake-ndp-proxy-name',
+ ]
+
+ verifylist = [
+ ('name', 'fake-ndp-proxy-name')
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ndp_proxies.assert_called_once_with(**{
+ 'name': 'fake-ndp-proxy-name'})
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, list(data))
+
+ def test_ndp_proxy_list_ip_address(self):
+ arglist = [
+ '--ip-address', '2001::1:2',
+ ]
+
+ verifylist = [
+ ('ip_address', '2001::1:2')
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ndp_proxies.assert_called_once_with(**{
+ 'ip_address': '2001::1:2'})
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, list(data))
+
+ def test_ndp_proxy_list_project(self):
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ self.projects_mock.get.return_value = project
+ arglist = [
+ '--project', project.id,
+ ]
+ verifylist = [
+ ('project', project.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ndp_proxies.assert_called_once_with(
+ **{'project_id': project.id})
+ self.assertEqual(self.columns, columns)
+ self.assertItemsEqual(self.data, list(data))
+
+ def test_ndp_proxy_list_project_domain(self):
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ self.projects_mock.get.return_value = project
+ arglist = [
+ '--project', project.id,
+ '--project-domain', project.domain_id,
+ ]
+ verifylist = [
+ ('project', project.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'project_id': project.id}
+
+ self.network.ndp_proxies.assert_called_once_with(**filters)
+ self.assertEqual(self.columns, columns)
+ self.assertItemsEqual(self.data, list(data))
+
+
+class TestSetNDPProxy(TestNDPProxy):
+
+ def setUp(self):
+ super(TestSetNDPProxy, self).setUp()
+ attrs = {'router_id': self.router.id, 'port_id': self.port.id}
+ self.ndp_proxy = (
+ network_fakes.create_one_ndp_proxy(attrs))
+ self.network.update_ndp_proxy = mock.Mock(return_value=None)
+ self.network.find_ndp_proxy = mock.Mock(
+ return_value=self.ndp_proxy)
+
+ # Get the command object to test
+ self.cmd = ndp_proxy.SetNDPProxy(self.app, self.namespace)
+
+ def test_set_nothing(self):
+ arglist = [
+ self.ndp_proxy.id,
+ ]
+ verifylist = [
+ ('ndp_proxy', self.ndp_proxy.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = (self.cmd.take_action(parsed_args))
+
+ self.network.update_ndp_proxy.assert_called_once_with(
+ self.ndp_proxy)
+ self.assertIsNone(result)
+
+ def test_set_name(self):
+ arglist = [
+ self.ndp_proxy.id,
+ '--name', 'fake-name',
+ ]
+ verifylist = [
+ ('ndp_proxy', self.ndp_proxy.id),
+ ('name', 'fake-name'),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = (self.cmd.take_action(parsed_args))
+
+ self.network.update_ndp_proxy.assert_called_once_with(
+ self.ndp_proxy, name='fake-name')
+ self.assertIsNone(result)
+
+ def test_set_description(self):
+ arglist = [
+ self.ndp_proxy.id,
+ '--description', 'balala',
+ ]
+ verifylist = [
+ ('ndp_proxy', self.ndp_proxy.id),
+ ('description', 'balala'),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = (self.cmd.take_action(parsed_args))
+
+ self.network.update_ndp_proxy.assert_called_once_with(
+ self.ndp_proxy, description='balala')
+ self.assertIsNone(result)
+
+
+class TestShowNDPProxy(TestNDPProxy):
+
+ def setUp(self):
+ super(TestShowNDPProxy, self).setUp()
+ attrs = {'router_id': self.router.id, 'port_id': self.port.id}
+ self.ndp_proxy = (
+ network_fakes.create_one_ndp_proxy(attrs))
+
+ self.columns = (
+ 'created_at',
+ 'description',
+ 'id',
+ 'ip_address',
+ 'name',
+ 'port_id',
+ 'project_id',
+ 'revision_number',
+ 'router_id',
+ 'updated_at')
+
+ self.data = (
+ self.ndp_proxy.created_at,
+ self.ndp_proxy.description,
+ self.ndp_proxy.id,
+ self.ndp_proxy.ip_address,
+ self.ndp_proxy.name,
+ self.ndp_proxy.port_id,
+ self.ndp_proxy.project_id,
+ self.ndp_proxy.revision_number,
+ self.ndp_proxy.router_id,
+ self.ndp_proxy.updated_at
+ )
+ self.network.get_ndp_proxy = mock.Mock(return_value=self.ndp_proxy)
+ self.network.find_ndp_proxy = mock.Mock(return_value=self.ndp_proxy)
+
+ # Get the command object to test
+ self.cmd = ndp_proxy.ShowNDPProxy(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_default_options(self):
+ arglist = [
+ self.ndp_proxy.id,
+ ]
+ verifylist = [
+ ('ndp_proxy', self.ndp_proxy.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.find_ndp_proxy.assert_called_once_with(
+ self.ndp_proxy.id, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_network_qos_rule.py b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py
index 217e481e..c7de8160 100644
--- a/openstackclient/tests/unit/network/v2/test_network_qos_rule.py
+++ b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py
@@ -25,6 +25,7 @@ from openstackclient.tests.unit import utils as tests_utils
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
RULE_TYPE_DSCP_MARKING = 'dscp-marking'
RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
+RULE_TYPE_MINIMUM_PACKET_RATE = 'minimum-packet-rate'
DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
34, 36, 38, 40, 46, 48, 56]
@@ -126,8 +127,101 @@ class TestCreateNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
- msg = ('"Create" rule command for type "minimum-bandwidth" '
- 'requires arguments: direction, min_kbps')
+ msg = ('Failed to create Network QoS rule: "Create" rule command '
+ 'for type "minimum-bandwidth" requires arguments: '
+ 'direction, min_kbps')
+ self.assertEqual(msg, str(e))
+
+
+class TestCreateNetworkQosRuleMinimumPacketRate(TestNetworkQosRule):
+
+ def test_check_type_parameters(self):
+ pass
+
+ def setUp(self):
+ super(TestCreateNetworkQosRuleMinimumPacketRate, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_PACKET_RATE}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.columns = (
+ 'direction',
+ 'id',
+ 'min_kpps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+
+ self.data = (
+ self.new_rule.direction,
+ self.new_rule.id,
+ self.new_rule.min_kpps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+ self.network.create_qos_minimum_packet_rate_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--type', RULE_TYPE_MINIMUM_PACKET_RATE,
+ '--min-kpps', str(self.new_rule.min_kpps),
+ '--egress',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_MINIMUM_PACKET_RATE),
+ ('min_kpps', self.new_rule.min_kpps),
+ ('egress', True),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_qos_minimum_packet_rate_rule.\
+ assert_called_once_with(
+ self.qos_policy.id,
+ **{'min_kpps': self.new_rule.min_kpps,
+ 'direction': self.new_rule.direction})
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_wrong_options(self):
+ arglist = [
+ '--type', RULE_TYPE_MINIMUM_PACKET_RATE,
+ '--min-kbps', '10000',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_MINIMUM_PACKET_RATE),
+ ('min_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to create Network QoS rule: "Create" rule command '
+ 'for type "minimum-packet-rate" requires arguments: '
+ 'direction, min_kpps')
self.assertEqual(msg, str(e))
@@ -212,8 +306,8 @@ class TestCreateNetworkQosRuleDSCPMarking(TestNetworkQosRule):
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
- msg = ('"Create" rule command for type "dscp-marking" '
- 'requires arguments: dscp_mark')
+ msg = ('Failed to create Network QoS rule: "Create" rule command '
+ 'for type "dscp-marking" requires arguments: dscp_mark')
self.assertEqual(msg, str(e))
@@ -351,8 +445,8 @@ class TestCreateNetworkQosRuleBandwidtLimit(TestNetworkQosRule):
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
- msg = ('"Create" rule command for type "bandwidth-limit" '
- 'requires arguments: max_kbps')
+ msg = ('Failed to create Network QoS rule: "Create" rule command '
+ 'for type "bandwidth-limit" requires arguments: max_kbps')
self.assertEqual(msg, str(e))
@@ -415,6 +509,65 @@ class TestDeleteNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
self.assertEqual(msg, str(e))
+class TestDeleteNetworkQosRuleMinimumPacketRate(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestDeleteNetworkQosRuleMinimumPacketRate, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_PACKET_RATE}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.delete_qos_minimum_packet_rate_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_minimum_packet_rate_rule = (
+ network_fakes.FakeNetworkQosRule.get_qos_rules(
+ qos_rules=self.new_rule)
+ )
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_policy_delete(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.network.delete_qos_minimum_packet_rate_rule.\
+ assert_called_once_with(self.new_rule.id, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_qos_policy_delete_error(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ self.network.delete_qos_minimum_packet_rate_rule.side_effect = \
+ Exception('Error message')
+ try:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
+ {'rule': self.new_rule.id, 'e': 'Error message'})
+ self.assertEqual(msg, str(e))
+
+
class TestDeleteNetworkQosRuleDSCPMarking(TestNetworkQosRule):
def setUp(self):
@@ -627,6 +780,100 @@ class TestSetNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
self.assertEqual(msg, str(e))
+class TestSetNetworkQosRuleMinimumPacketRate(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestSetNetworkQosRuleMinimumPacketRate, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_PACKET_RATE}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs=attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.update_qos_minimum_packet_rate_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_minimum_packet_rate_rule = mock.Mock(
+ return_value=self.new_rule)
+ self.network.find_qos_policy = mock.Mock(
+ return_value=self.qos_policy)
+
+ # Get the command object to test
+ self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
+ self.namespace))
+
+ def test_set_nothing(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_qos_minimum_packet_rate_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_set_min_kpps(self):
+ self._set_min_kpps()
+
+ def test_set_min_kpps_to_zero(self):
+ self._set_min_kpps(min_kpps=0)
+
+ def _set_min_kpps(self, min_kpps=None):
+ if min_kpps:
+ previous_min_kpps = self.new_rule.min_kpps
+ self.new_rule.min_kpps = min_kpps
+
+ arglist = [
+ '--min-kpps', str(self.new_rule.min_kpps),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('min_kpps', self.new_rule.min_kpps),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'min_kpps': self.new_rule.min_kpps,
+ }
+ self.network.update_qos_minimum_packet_rate_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ if min_kpps:
+ self.new_rule.min_kpps = previous_min_kpps
+
+ def test_set_wrong_options(self):
+ arglist = [
+ '--min-kbps', str(10000),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('min_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
+ '"minimum-packet-rate" only requires arguments: direction, '
+ 'min_kpps' % {'rule': self.new_rule.id})
+ self.assertEqual(msg, str(e))
+
+
class TestSetNetworkQosRuleDSCPMarking(TestNetworkQosRule):
def setUp(self):
@@ -893,6 +1140,9 @@ class TestListNetworkQosRule(TestNetworkQosRule):
'type': RULE_TYPE_MINIMUM_BANDWIDTH}
self.new_rule_min_bw = (network_fakes.FakeNetworkQosRule.
create_one_qos_rule(attrs=attrs))
+ attrs['type'] = RULE_TYPE_MINIMUM_PACKET_RATE
+ self.new_rule_min_pps = (network_fakes.FakeNetworkQosRule.
+ create_one_qos_rule(attrs=attrs))
attrs['type'] = RULE_TYPE_DSCP_MARKING
self.new_rule_dscp_mark = (network_fakes.FakeNetworkQosRule.
create_one_qos_rule(attrs=attrs))
@@ -900,10 +1150,13 @@ class TestListNetworkQosRule(TestNetworkQosRule):
self.new_rule_max_bw = (network_fakes.FakeNetworkQosRule.
create_one_qos_rule(attrs=attrs))
self.qos_policy.rules = [self.new_rule_min_bw,
+ self.new_rule_min_pps,
self.new_rule_dscp_mark,
self.new_rule_max_bw]
self.network.find_qos_minimum_bandwidth_rule = mock.Mock(
return_value=self.new_rule_min_bw)
+ self.network.find_qos_minimum_packet_rate_rule = mock.Mock(
+ return_value=self.new_rule_min_pps)
self.network.find_qos_dscp_marking_rule = mock.Mock(
return_value=self.new_rule_dscp_mark)
self.network.find_qos_bandwidth_limit_rule = mock.Mock(
@@ -915,6 +1168,7 @@ class TestListNetworkQosRule(TestNetworkQosRule):
'Max Kbps',
'Max Burst Kbits',
'Min Kbps',
+ 'Min Kpps',
'DSCP mark',
'Direction',
)
@@ -927,6 +1181,7 @@ class TestListNetworkQosRule(TestNetworkQosRule):
getattr(self.qos_policy.rules[index], 'max_kbps', ''),
getattr(self.qos_policy.rules[index], 'max_burst_kbps', ''),
getattr(self.qos_policy.rules[index], 'min_kbps', ''),
+ getattr(self.qos_policy.rules[index], 'min_kpps', ''),
getattr(self.qos_policy.rules[index], 'dscp_mark', ''),
getattr(self.qos_policy.rules[index], 'direction', ''),
))
@@ -1014,6 +1269,66 @@ class TestShowNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
self.assertEqual(list(self.data), list(data))
+class TestShowNetworkQosRuleMinimumPacketRate(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestShowNetworkQosRuleMinimumPacketRate, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_PACKET_RATE}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.columns = (
+ 'direction',
+ 'id',
+ 'min_kpps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+ self.data = (
+ self.new_rule.direction,
+ self.new_rule.id,
+ self.new_rule.min_kpps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+
+ self.network.get_qos_minimum_packet_rate_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_qos_minimum_packet_rate_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
+
+
class TestShowNetworkQosDSCPMarking(TestNetworkQosRule):
def setUp(self):
diff --git a/openstackclient/tests/unit/network/v2/test_network_rbac.py b/openstackclient/tests/unit/network/v2/test_network_rbac.py
index b0bc7e86..7ce25205 100644
--- a/openstackclient/tests/unit/network/v2/test_network_rbac.py
+++ b/openstackclient/tests/unit/network/v2/test_network_rbac.py
@@ -405,6 +405,9 @@ class TestListNetworkRABC(TestNetworkRBAC):
self.network.rbac_policies = mock.Mock(return_value=self.rbac_policies)
+ self.project = identity_fakes_v3.FakeProject.create_one_project()
+ self.projects_mock.get.return_value = self.project
+
def test_network_rbac_list(self):
arglist = []
verifylist = []
@@ -466,6 +469,22 @@ class TestListNetworkRABC(TestNetworkRBAC):
self.assertEqual(self.columns_long, columns)
self.assertEqual(self.data_long, list(data))
+ def test_network_rbac_list_target_project_opt(self):
+ arglist = [
+ '--target-project', self.rbac_policies[0].target_project_id, ]
+ verifylist = [
+ ('target_project', self.rbac_policies[0].target_project_id)]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.rbac_policies.assert_called_with(**{
+ 'target_project_id': self.project.id
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
class TestSetNetworkRBAC(TestNetworkRBAC):
diff --git a/openstackclient/tests/unit/network/v2/test_subnet.py b/openstackclient/tests/unit/network/v2/test_subnet.py
index 6b3ab2cc..7aaa583d 100644
--- a/openstackclient/tests/unit/network/v2/test_subnet.py
+++ b/openstackclient/tests/unit/network/v2/test_subnet.py
@@ -918,7 +918,7 @@ class TestListSubnet(TestSubnet):
self.network.subnets.assert_called_once_with(**filters)
self.assertEqual(self.columns, columns)
- self.assertItemsEqual(self.data, list(data))
+ self.assertCountEqual(self.data, list(data))
def test_subnet_list_subnetpool_by_id(self):
subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
@@ -939,7 +939,7 @@ class TestListSubnet(TestSubnet):
self.network.subnets.assert_called_once_with(**filters)
self.assertEqual(self.columns, columns)
- self.assertItemsEqual(self.data, list(data))
+ self.assertCountEqual(self.data, list(data))
def test_list_with_tag_options(self):
arglist = [