diff options
Diffstat (limited to 'openstackclient/tests')
29 files changed, 2330 insertions, 227 deletions
diff --git a/openstackclient/tests/functional/base.py b/openstackclient/tests/functional/base.py index 885abc02..fb78ea62 100644 --- a/openstackclient/tests/functional/base.py +++ b/openstackclient/tests/functional/base.py @@ -77,6 +77,11 @@ class TestCase(testtools.TestCase): if expected not in actual: raise Exception(expected + ' not in ' + actual) + @classmethod + def assertsOutputNotNone(cls, observed): + if observed is None: + raise Exception('No output observed') + def assert_table_structure(self, items, field_names): """Verify that all items have keys listed in field_names.""" for item in items: diff --git a/openstackclient/tests/functional/common/test_quota.py b/openstackclient/tests/functional/common/test_quota.py index fbb8e563..b2b198af 100644 --- a/openstackclient/tests/functional/common/test_quota.py +++ b/openstackclient/tests/functional/common/test_quota.py @@ -35,19 +35,16 @@ class QuotaTests(base.TestCase): raw_output = self.openstack('quota show ' + self.PROJECT_NAME + opts) self.assertEqual("11\n11\n11\n", raw_output) - @testtools.skip('broken SDK testing') def test_quota_show(self): raw_output = self.openstack('quota show ' + self.PROJECT_NAME) for expected_field in self.EXPECTED_FIELDS: self.assertIn(expected_field, raw_output) - @testtools.skip('broken SDK testing') def test_quota_show_default_project(self): raw_output = self.openstack('quota show') for expected_field in self.EXPECTED_FIELDS: self.assertIn(expected_field, raw_output) - @testtools.skip('broken SDK testing') def test_quota_show_with_default_option(self): raw_output = self.openstack('quota show --default') for expected_field in self.EXPECTED_FIELDS: diff --git a/openstackclient/tests/functional/image/v2/test_image.py b/openstackclient/tests/functional/image/v2/test_image.py index 3f432b02..6faff94a 100644 --- a/openstackclient/tests/functional/image/v2/test_image.py +++ b/openstackclient/tests/functional/image/v2/test_image.py @@ -74,3 +74,25 @@ class ImageTests(base.TestCase): self.openstack('image unset --property a --property c ' + self.NAME) raw_output = self.openstack('image show ' + self.NAME + opts) self.assertEqual(self.NAME + "\n\n", raw_output) + + def test_image_members(self): + opts = self.get_opts(['project_id']) + my_project_id = self.openstack('token issue' + opts).strip() + self.openstack( + 'image add project {} {}'.format(self.NAME, my_project_id)) + + self.openstack( + 'image set --accept ' + self.NAME) + shared_img_list = self.parse_listing( + self.openstack('image list --shared', self.get_opts(['name'])) + ) + self.assertIn(self.NAME, [img['Name'] for img in shared_img_list]) + + self.openstack( + 'image set --reject ' + self.NAME) + shared_img_list = self.parse_listing( + self.openstack('image list --shared', self.get_opts(['name'])) + ) + + self.openstack( + 'image remove project {} {}'.format(self.NAME, my_project_id)) diff --git a/openstackclient/tests/functional/network/v2/test_floating_ip.py b/openstackclient/tests/functional/network/v2/test_floating_ip.py index 5f642f04..fa9607a0 100644 --- a/openstackclient/tests/functional/network/v2/test_floating_ip.py +++ b/openstackclient/tests/functional/network/v2/test_floating_ip.py @@ -14,8 +14,6 @@ import random import re import uuid -import testtools - from openstackclient.tests.functional import base @@ -25,7 +23,6 @@ class FloatingIpTests(base.TestCase): NETWORK_NAME = uuid.uuid4().hex @classmethod - @testtools.skip('broken SDK testing') def setUpClass(cls): # Set up some regex for matching below cls.re_id = re.compile("id\s+\|\s+(\S+)") @@ -37,7 +34,7 @@ class FloatingIpTests(base.TestCase): # Make a random subnet cls.subnet = ".".join(map( str, - (random.randint(0, 255) for _ in range(3)) + (random.randint(0, 223) for _ in range(3)) )) + ".0/26" # Create a network for the floating ip @@ -62,7 +59,6 @@ class FloatingIpTests(base.TestCase): raw_output = cls.openstack('network delete ' + cls.NETWORK_NAME) cls.assertOutput('', raw_output) - @testtools.skip('broken SDK testing') def test_floating_ip_delete(self): """Test create, delete multiple""" raw_output = self.openstack( @@ -93,7 +89,6 @@ class FloatingIpTests(base.TestCase): raw_output = self.openstack('floating ip delete ' + ip1 + ' ' + ip2) self.assertOutput('', raw_output) - @testtools.skip('broken SDK testing') def test_floating_ip_list(self): """Test create defaults, list filters, delete""" raw_output = self.openstack( @@ -135,7 +130,6 @@ class FloatingIpTests(base.TestCase): # TODO(dtroyer): add more filter tests - @testtools.skip('broken SDK testing') def test_floating_ip_show(self): """Test show""" raw_output = self.openstack( diff --git a/openstackclient/tests/functional/network/v2/test_network.py b/openstackclient/tests/functional/network/v2/test_network.py index ef42dcce..c55d70f9 100644 --- a/openstackclient/tests/functional/network/v2/test_network.py +++ b/openstackclient/tests/functional/network/v2/test_network.py @@ -10,164 +10,173 @@ # License for the specific language governing permissions and limitations # under the License. -import re +import json import uuid -import testtools - from openstackclient.tests.functional import base class NetworkTests(base.TestCase): """Functional tests for network""" - @classmethod - def setUpClass(cls): - # Set up some regex for matching below - cls.re_id = re.compile("id\s+\|\s+(\S+)") - cls.re_description = re.compile("description\s+\|\s+([^|]+?)\s+\|") - cls.re_enabled = re.compile("admin_state_up\s+\|\s+(\S+)") - cls.re_shared = re.compile("shared\s+\|\s+(\S+)") - cls.re_external = re.compile("router:external\s+\|\s+(\S+)") - cls.re_default = re.compile("is_default\s+\|\s+(\S+)") - cls.re_port_security = re.compile( - "port_security_enabled\s+\|\s+(\S+)" - ) - def test_network_delete(self): """Test create, delete multiple""" name1 = uuid.uuid4().hex - raw_output = self.openstack( - 'network create ' + + cmd_output = json.loads(self.openstack( + 'network create -f json ' + '--description aaaa ' + name1 - ) + )) + self.assertIsNotNone(cmd_output["id"]) self.assertEqual( 'aaaa', - re.search(self.re_description, raw_output).group(1), + cmd_output["description"], ) + name2 = uuid.uuid4().hex - raw_output = self.openstack( - 'network create ' + + cmd_output = json.loads(self.openstack( + 'network create -f json ' + '--description bbbb ' + name2 - ) + )) + self.assertIsNotNone(cmd_output["id"]) self.assertEqual( 'bbbb', - re.search(self.re_description, raw_output).group(1), + cmd_output["description"], ) del_output = self.openstack('network delete ' + name1 + ' ' + name2) self.assertOutput('', del_output) - @testtools.skip('broken SDK testing') def test_network_list(self): """Test create defaults, list filters, delete""" name1 = uuid.uuid4().hex - raw_output = self.openstack( - 'network create ' + + cmd_output = json.loads(self.openstack( + 'network create -f json ' + '--description aaaa ' + name1 - ) + )) self.addCleanup(self.openstack, 'network delete ' + name1) + self.assertIsNotNone(cmd_output["id"]) self.assertEqual( 'aaaa', - re.search(self.re_description, raw_output).group(1), + cmd_output["description"], ) # Check the default values self.assertEqual( 'UP', - re.search(self.re_enabled, raw_output).group(1), + cmd_output["admin_state_up"], ) self.assertEqual( - 'False', - re.search(self.re_shared, raw_output).group(1), + False, + cmd_output["shared"], ) self.assertEqual( 'Internal', - re.search(self.re_external, raw_output).group(1), + cmd_output["router:external"], ) # NOTE(dtroyer): is_default is not present in the create output # so make sure it stays that way. - self.assertIsNone(re.search(self.re_default, raw_output)) + # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer, + # but the value seems to always be None, regardless + # of the --default or --no-default value. + # self.assertEqual('x', cmd_output) + if ('is_default' in cmd_output): + self.assertEqual( + None, + cmd_output["is_default"], + ) self.assertEqual( - 'True', - re.search(self.re_port_security, raw_output).group(1), + True, + cmd_output["port_security_enabled"], ) name2 = uuid.uuid4().hex - raw_output = self.openstack( - 'network create ' + + cmd_output = json.loads(self.openstack( + 'network create -f json ' + '--description bbbb ' + '--disable ' + '--share ' + name2 - ) + )) self.addCleanup(self.openstack, 'network delete ' + name2) + self.assertIsNotNone(cmd_output["id"]) self.assertEqual( 'bbbb', - re.search(self.re_description, raw_output).group(1), + cmd_output["description"], ) self.assertEqual( 'DOWN', - re.search(self.re_enabled, raw_output).group(1), + cmd_output["admin_state_up"], ) self.assertEqual( - 'True', - re.search(self.re_shared, raw_output).group(1), + True, + cmd_output["shared"], + ) + if ('is_default' in cmd_output): + self.assertEqual( + None, + cmd_output["is_default"], + ) + self.assertEqual( + True, + cmd_output["port_security_enabled"], ) # Test list --long - raw_output = self.openstack('network list --long') - self.assertIsNotNone( - re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output) - ) - self.assertIsNotNone( - re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output) - ) + cmd_output = json.loads(self.openstack( + "network list -f json " + + "--long" + )) + col_name = [x["Name"] for x in cmd_output] + self.assertIn(name1, col_name) + self.assertIn(name2, col_name) # Test list --long --enable - raw_output = self.openstack('network list --long --enable') - self.assertIsNotNone( - re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output) - ) - self.assertIsNone( - re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output) - ) + cmd_output = json.loads(self.openstack( + "network list -f json " + + "--enable " + + "--long" + )) + col_name = [x["Name"] for x in cmd_output] + self.assertIn(name1, col_name) + self.assertNotIn(name2, col_name) # Test list --long --disable - raw_output = self.openstack('network list --long --disable') - self.assertIsNone( - re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output) - ) - self.assertIsNotNone( - re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output) - ) + cmd_output = json.loads(self.openstack( + "network list -f json " + + "--disable " + + "--long" + )) + col_name = [x["Name"] for x in cmd_output] + self.assertNotIn(name1, col_name) + self.assertIn(name2, col_name) # Test list --long --share - raw_output = self.openstack('network list --long --share') - self.assertIsNone( - re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output) - ) - self.assertIsNotNone( - re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output) - ) + cmd_output = json.loads(self.openstack( + "network list -f json " + + "--share " + + "--long" + )) + col_name = [x["Name"] for x in cmd_output] + self.assertNotIn(name1, col_name) + self.assertIn(name2, col_name) # Test list --long --no-share - raw_output = self.openstack('network list --long --no-share') - self.assertIsNotNone( - re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output) - ) - self.assertIsNone( - re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output) - ) + cmd_output = json.loads(self.openstack( + "network list -f json " + + "--no-share " + + "--long" + )) + col_name = [x["Name"] for x in cmd_output] + self.assertIn(name1, col_name) + self.assertNotIn(name2, col_name) - @testtools.skip('broken SDK testing') def test_network_set(self): """Tests create options, set, show, delete""" name = uuid.uuid4().hex - raw_output = self.openstack( - 'network create ' + + cmd_output = json.loads(self.openstack( + 'network create -f json ' + '--description aaaa ' + '--enable ' + '--no-share ' + @@ -175,30 +184,38 @@ class NetworkTests(base.TestCase): '--no-default ' + '--enable-port-security ' + name - ) + )) self.addCleanup(self.openstack, 'network delete ' + name) + self.assertIsNotNone(cmd_output["id"]) self.assertEqual( 'aaaa', - re.search(self.re_description, raw_output).group(1), + cmd_output["description"], ) self.assertEqual( 'UP', - re.search(self.re_enabled, raw_output).group(1), + cmd_output["admin_state_up"], ) self.assertEqual( - 'False', - re.search(self.re_shared, raw_output).group(1), + False, + cmd_output["shared"], ) self.assertEqual( 'Internal', - re.search(self.re_external, raw_output).group(1), + cmd_output["router:external"], ) # NOTE(dtroyer): is_default is not present in the create output # so make sure it stays that way. - self.assertIsNone(re.search(self.re_default, raw_output)) + # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer, + # but the value seems to always be None, regardless + # of the --default or --no-default value. + if ('is_default' in cmd_output): + self.assertEqual( + None, + cmd_output["is_default"], + ) self.assertEqual( - 'True', - re.search(self.re_port_security, raw_output).group(1), + True, + cmd_output["port_security_enabled"], ) raw_output = self.openstack( @@ -212,32 +229,34 @@ class NetworkTests(base.TestCase): ) self.assertOutput('', raw_output) - raw_output = self.openstack('network show ' + name) + cmd_output = json.loads(self.openstack( + 'network show -f json ' + name + )) self.assertEqual( 'cccc', - re.search(self.re_description, raw_output).group(1), + cmd_output["description"], ) self.assertEqual( 'DOWN', - re.search(self.re_enabled, raw_output).group(1), + cmd_output["admin_state_up"], ) self.assertEqual( - 'True', - re.search(self.re_shared, raw_output).group(1), + True, + cmd_output["shared"], ) self.assertEqual( 'External', - re.search(self.re_external, raw_output).group(1), + cmd_output["router:external"], ) # why not 'None' like above?? self.assertEqual( - 'False', - re.search(self.re_default, raw_output).group(1), + False, + cmd_output["is_default"], ) self.assertEqual( - 'False', - re.search(self.re_port_security, raw_output).group(1), + False, + cmd_output["port_security_enabled"], ) # NOTE(dtroyer): There is ambiguity around is_default in that @@ -252,14 +271,16 @@ class NetworkTests(base.TestCase): ) self.assertOutput('', raw_output) - raw_output = self.openstack('network show ' + name) + cmd_output = json.loads(self.openstack( + 'network show -f json ' + name + )) self.assertEqual( 'cccc', - re.search(self.re_description, raw_output).group(1), + cmd_output["description"], ) # NOTE(dtroyer): This should be 'True' self.assertEqual( - 'False', - re.search(self.re_default, raw_output).group(1), + False, + cmd_output["port_security_enabled"], ) diff --git a/openstackclient/tests/functional/network/v2/test_network_agent.py b/openstackclient/tests/functional/network/v2/test_network_agent.py index e99dcef6..dd6112e7 100644 --- a/openstackclient/tests/functional/network/v2/test_network_agent.py +++ b/openstackclient/tests/functional/network/v2/test_network_agent.py @@ -10,8 +10,6 @@ # License for the specific language governing permissions and limitations # under the License. -import testtools - from openstackclient.tests.functional import base @@ -28,13 +26,11 @@ class NetworkAgentTests(base.TestCase): # get the list of network agent IDs. cls.IDs = raw_output.split('\n') - @testtools.skip('broken SDK testing') def test_network_agent_show(self): opts = self.get_opts(self.FIELDS) raw_output = self.openstack('network agent show ' + self.IDs[0] + opts) self.assertEqual(self.IDs[0] + "\n", raw_output) - @testtools.skip('broken SDK testing') def test_network_agent_set(self): opts = self.get_opts(['admin_state_up']) self.openstack('network agent set --disable ' + self.IDs[0]) diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py new file mode 100644 index 00000000..af0c9bac --- /dev/null +++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py @@ -0,0 +1,181 @@ +# Copyright (c) 2016, Intel Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from openstackclient.tests.functional import base + + +class NetworkQosRuleTestsMinimumBandwidth(base.TestCase): + """Functional tests for QoS minimum bandwidth rule.""" + RULE_ID = None + QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + MIN_KBPS = 2800 + MIN_KBPS_MODIFIED = 7500 + DIRECTION = '--egress' + HEADERS = ['ID'] + FIELDS = ['id'] + TYPE = 'minimum-bandwidth' + + @classmethod + def setUpClass(cls): + opts = cls.get_opts(cls.FIELDS) + cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME) + cls.RULE_ID = cls.openstack('network qos rule create --type ' + + cls.TYPE + ' --min-kbps ' + + str(cls.MIN_KBPS) + ' ' + cls.DIRECTION + + ' ' + cls.QOS_POLICY_NAME + opts) + cls.assertsOutputNotNone(cls.RULE_ID) + + @classmethod + def tearDownClass(cls): + raw_output = cls.openstack('network qos rule delete ' + + cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID) + cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME) + cls.assertOutput('', raw_output) + + def test_qos_policy_list(self): + opts = self.get_opts(self.HEADERS) + raw_output = self.openstack('network qos rule list ' + + self.QOS_POLICY_NAME + opts) + self.assertIn(self.RULE_ID, raw_output) + + def test_qos_policy_show(self): + opts = self.get_opts(self.FIELDS) + raw_output = self.openstack('network qos rule show ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID + + opts) + self.assertEqual(self.RULE_ID, raw_output) + + def test_qos_policy_set(self): + self.openstack('network qos rule set --min-kbps ' + + str(self.MIN_KBPS_MODIFIED) + ' ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) + opts = self.get_opts(['min_kbps']) + raw_output = self.openstack('network qos rule show ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID + + opts) + self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output) + + +class NetworkQosRuleTestsDSCPMarking(base.TestCase): + """Functional tests for QoS DSCP marking rule.""" + RULE_ID = None + QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + DSCP_MARK = 8 + DSCP_MARK_MODIFIED = 32 + HEADERS = ['ID'] + FIELDS = ['id'] + TYPE = 'dscp-marking' + + @classmethod + def setUpClass(cls): + opts = cls.get_opts(cls.FIELDS) + cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME) + cls.RULE_ID = cls.openstack('network qos rule create --type ' + + cls.TYPE + ' --dscp-mark ' + + str(cls.DSCP_MARK) + ' ' + + cls.QOS_POLICY_NAME + opts) + cls.assertsOutputNotNone(cls.RULE_ID) + + @classmethod + def tearDownClass(cls): + raw_output = cls.openstack('network qos rule delete ' + + cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID) + cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME) + cls.assertOutput('', raw_output) + + def test_qos_policy_list(self): + opts = self.get_opts(self.HEADERS) + raw_output = self.openstack('network qos rule list ' + + self.QOS_POLICY_NAME + opts) + self.assertIn(self.RULE_ID, raw_output) + + def test_qos_policy_show(self): + opts = self.get_opts(self.FIELDS) + raw_output = self.openstack('network qos rule show ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID + + opts) + self.assertEqual(self.RULE_ID, raw_output) + + def test_qos_policy_set(self): + self.openstack('network qos rule set --dscp-mark ' + + str(self.DSCP_MARK_MODIFIED) + ' ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) + opts = self.get_opts(['dscp_mark']) + raw_output = self.openstack('network qos rule show ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID + + opts) + self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output) + + +class NetworkQosRuleTestsBandwidthLimit(base.TestCase): + """Functional tests for QoS bandwidth limit rule.""" + RULE_ID = None + QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + MAX_KBPS = 10000 + MAX_KBPS_MODIFIED = 15000 + MAX_BURST_KBITS = 1400 + MAX_BURST_KBITS_MODIFIED = 1800 + HEADERS = ['ID'] + FIELDS = ['id'] + TYPE = 'bandwidth-limit' + + @classmethod + def setUpClass(cls): + opts = cls.get_opts(cls.FIELDS) + cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME) + cls.RULE_ID = cls.openstack('network qos rule create --type ' + + cls.TYPE + ' --max-kbps ' + + str(cls.MAX_KBPS) + ' --max-burst-kbits ' + + str(cls.MAX_BURST_KBITS) + ' ' + + cls.QOS_POLICY_NAME + opts) + cls.assertsOutputNotNone(cls.RULE_ID) + + @classmethod + def tearDownClass(cls): + raw_output = cls.openstack('network qos rule delete ' + + cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID) + cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME) + cls.assertOutput('', raw_output) + + def test_qos_policy_list(self): + opts = self.get_opts(self.HEADERS) + raw_output = self.openstack('network qos rule list ' + + self.QOS_POLICY_NAME + opts) + self.assertIn(self.RULE_ID, raw_output) + + def test_qos_policy_show(self): + opts = self.get_opts(self.FIELDS) + raw_output = self.openstack('network qos rule show ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID + + opts) + self.assertEqual(self.RULE_ID, raw_output) + + def test_qos_policy_set(self): + self.openstack('network qos rule set --max-kbps ' + + str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' + + str(self.MAX_BURST_KBITS_MODIFIED) + ' ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) + opts = self.get_opts(['max_kbps']) + raw_output = self.openstack('network qos rule show ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID + + opts) + self.assertEqual(str(self.MAX_KBPS_MODIFIED) + "\n", raw_output) + opts = self.get_opts(['max_burst_kbps']) + raw_output = self.openstack('network qos rule show ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID + + opts) + self.assertEqual(str(self.MAX_BURST_KBITS_MODIFIED) + "\n", raw_output) diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py index 2bb04a9d..7dff0cbd 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import testtools - from openstackclient.tests.functional import base @@ -22,10 +20,8 @@ class NetworkQosRuleTypeTests(base.TestCase): """Functional tests for Network QoS rule type. """ AVAILABLE_RULE_TYPES = ['dscp_marking', - 'bandwidth_limit', - 'minimum_bandwidth'] + 'bandwidth_limit'] - @testtools.skip('broken SDK testing') def test_qos_rule_type_list(self): raw_output = self.openstack('network qos rule type list') for rule_type in self.AVAILABLE_RULE_TYPES: diff --git a/openstackclient/tests/functional/network/v2/test_port.py b/openstackclient/tests/functional/network/v2/test_port.py index 976fbedb..decd9553 100644 --- a/openstackclient/tests/functional/network/v2/test_port.py +++ b/openstackclient/tests/functional/network/v2/test_port.py @@ -12,8 +12,6 @@ import uuid -import testtools - from openstackclient.tests.functional import base @@ -25,7 +23,6 @@ class PortTests(base.TestCase): FIELDS = ['name'] @classmethod - @testtools.skip('broken SDK testing') def setUpClass(cls): # Create a network for the subnet. cls.openstack('network create ' + cls.NETWORK_NAME) diff --git a/openstackclient/tests/functional/network/v2/test_security_group_rule.py b/openstackclient/tests/functional/network/v2/test_security_group_rule.py index ec3731eb..c91de1a5 100644 --- a/openstackclient/tests/functional/network/v2/test_security_group_rule.py +++ b/openstackclient/tests/functional/network/v2/test_security_group_rule.py @@ -12,8 +12,6 @@ import uuid -import testtools - from openstackclient.tests.functional import base @@ -54,7 +52,6 @@ class SecurityGroupRuleTests(base.TestCase): cls.SECURITY_GROUP_NAME) cls.assertOutput('', raw_output) - @testtools.skip('broken SDK testing') def test_security_group_rule_list(self): opts = self.get_opts(self.ID_HEADER) raw_output = self.openstack('security group rule list ' + diff --git a/openstackclient/tests/functional/post_test_hook_tips.sh b/openstackclient/tests/functional/post_test_hook_tips.sh new file mode 100755 index 00000000..28ab9580 --- /dev/null +++ b/openstackclient/tests/functional/post_test_hook_tips.sh @@ -0,0 +1,47 @@ +#!/bin/bash + +# This is a script that kicks off a series of functional tests against an +# OpenStack cloud. It will attempt to create an instance if one is not +# available. Do not run this script unless you know what you're doing. +# For more information refer to: +# http://docs.openstack.org/developer/python-openstackclient/ + +# This particular script differs from the normal post_test_hook because +# it installs the master (tip) version of osc-lib, os-client-config +# and openstacksdk, OSCs most important dependencies. + +function generate_testr_results { + if [ -f .testrepository/0 ]; then + sudo .tox/functional-tips/bin/testr last --subunit > $WORKSPACE/testrepository.subunit + sudo mv $WORKSPACE/testrepository.subunit $BASE/logs/testrepository.subunit + sudo .tox/functional-tips/bin/subunit2html $BASE/logs/testrepository.subunit $BASE/logs/testr_results.html + sudo gzip -9 $BASE/logs/testrepository.subunit + sudo gzip -9 $BASE/logs/testr_results.html + sudo chown jenkins:jenkins $BASE/logs/testrepository.subunit.gz $BASE/logs/testr_results.html.gz + sudo chmod a+r $BASE/logs/testrepository.subunit.gz $BASE/logs/testr_results.html.gz + fi +} + +export OPENSTACKCLIENT_DIR="$BASE/new/python-openstackclient" +sudo chown -R jenkins:stack $OPENSTACKCLIENT_DIR + +# Go to the openstackclient dir +cd $OPENSTACKCLIENT_DIR + +# Run tests +echo "Running openstackclient functional-tips test suite" +set +e + +# Source environment variables to kick things off +source ~stack/devstack/openrc admin admin +echo 'Running tests with:' +env | grep OS + +# Preserve env for OS_ credentials +sudo -E -H -u jenkins tox -e functional-tips +EXIT_CODE=$? +set -e + +# Collect and parse result +generate_testr_results +exit $EXIT_CODE diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py index ca6b1d31..626b466d 100644 --- a/openstackclient/tests/unit/fakes.py +++ b/openstackclient/tests/unit/fakes.py @@ -219,6 +219,12 @@ class FakeResource(object): def info(self): return self._info + def __getitem__(self, item): + return self._info.get(item) + + def get(self, item, default=None): + return self._info.get(item, default) + class FakeResponse(requests.Response): diff --git a/openstackclient/tests/unit/identity/v2_0/test_project.py b/openstackclient/tests/unit/identity/v2_0/test_project.py index c1f00762..4e1077db 100644 --- a/openstackclient/tests/unit/identity/v2_0/test_project.py +++ b/openstackclient/tests/unit/identity/v2_0/test_project.py @@ -13,8 +13,11 @@ # under the License. # +import mock + from keystoneauth1 import exceptions as ks_exc from osc_lib import exceptions +from osc_lib import utils from openstackclient.identity.v2_0 import project from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes @@ -302,6 +305,32 @@ class TestProjectDelete(TestProject): ) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_projects_with_exception(self, find_mock): + find_mock.side_effect = [self.fake_project, + exceptions.CommandError] + arglist = [ + self.fake_project.id, + 'unexist_project', + ] + verifylist = [ + ('projects', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 projects failed to delete.', + str(e)) + + find_mock.assert_any_call(self.projects_mock, self.fake_project.id) + find_mock.assert_any_call(self.projects_mock, 'unexist_project') + + self.assertEqual(2, find_mock.call_count) + self.projects_mock.delete.assert_called_once_with(self.fake_project.id) + class TestProjectList(TestProject): diff --git a/openstackclient/tests/unit/identity/v2_0/test_role.py b/openstackclient/tests/unit/identity/v2_0/test_role.py index 68ebf141..684ce803 100644 --- a/openstackclient/tests/unit/identity/v2_0/test_role.py +++ b/openstackclient/tests/unit/identity/v2_0/test_role.py @@ -17,6 +17,7 @@ import mock from keystoneauth1 import exceptions as ks_exc from osc_lib import exceptions +from osc_lib import utils from openstackclient.identity.v2_0 import role from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes @@ -240,6 +241,32 @@ class TestRoleDelete(TestRole): ) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_roles_with_exception(self, find_mock): + find_mock.side_effect = [self.fake_role, + exceptions.CommandError] + arglist = [ + self.fake_role.id, + 'unexist_role', + ] + verifylist = [ + ('roles', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 roles failed to delete.', + str(e)) + + find_mock.assert_any_call(self.roles_mock, self.fake_role.id) + find_mock.assert_any_call(self.roles_mock, 'unexist_role') + + self.assertEqual(2, find_mock.call_count) + self.roles_mock.delete.assert_called_once_with(self.fake_role.id) + class TestRoleList(TestRole): diff --git a/openstackclient/tests/unit/identity/v2_0/test_user.py b/openstackclient/tests/unit/identity/v2_0/test_user.py index 765f8559..a8b9497e 100644 --- a/openstackclient/tests/unit/identity/v2_0/test_user.py +++ b/openstackclient/tests/unit/identity/v2_0/test_user.py @@ -17,6 +17,7 @@ import mock from keystoneauth1 import exceptions as ks_exc from osc_lib import exceptions +from osc_lib import utils from openstackclient.identity.v2_0 import user from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes @@ -411,6 +412,32 @@ class TestUserDelete(TestUser): ) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_users_with_exception(self, find_mock): + find_mock.side_effect = [self.fake_user, + exceptions.CommandError] + arglist = [ + self.fake_user.id, + 'unexist_user', + ] + verifylist = [ + ('users', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 users failed to delete.', + str(e)) + + find_mock.assert_any_call(self.users_mock, self.fake_user.id) + find_mock.assert_any_call(self.users_mock, 'unexist_user') + + self.assertEqual(2, find_mock.call_count) + self.users_mock.delete.assert_called_once_with(self.fake_user.id) + class TestUserList(TestUser): diff --git a/openstackclient/tests/unit/identity/v3/test_group.py b/openstackclient/tests/unit/identity/v3/test_group.py index eb50adb5..8558de95 100644 --- a/openstackclient/tests/unit/identity/v3/test_group.py +++ b/openstackclient/tests/unit/identity/v3/test_group.py @@ -16,6 +16,7 @@ from mock import call from keystoneauth1 import exceptions as ks_exc from osc_lib import exceptions +from osc_lib import utils from openstackclient.identity.v3 import group from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes @@ -257,6 +258,32 @@ class TestGroupDelete(TestGroup): self.groups_mock.delete.assert_called_once_with(self.groups[0].id) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_groups_with_exception(self, find_mock): + find_mock.side_effect = [self.groups[0], + exceptions.CommandError] + arglist = [ + self.groups[0].id, + 'unexist_group', + ] + verifylist = [ + ('groups', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 groups failed to delete.', + str(e)) + + find_mock.assert_any_call(self.groups_mock, self.groups[0].id) + find_mock.assert_any_call(self.groups_mock, 'unexist_group') + + self.assertEqual(2, find_mock.call_count) + self.groups_mock.delete.assert_called_once_with(self.groups[0].id) + class TestGroupList(TestGroup): diff --git a/openstackclient/tests/unit/identity/v3/test_project.py b/openstackclient/tests/unit/identity/v3/test_project.py index 702d9209..2b898090 100644 --- a/openstackclient/tests/unit/identity/v3/test_project.py +++ b/openstackclient/tests/unit/identity/v3/test_project.py @@ -16,6 +16,7 @@ import mock from osc_lib import exceptions +from osc_lib import utils from openstackclient.identity.v3 import project from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes @@ -445,6 +446,32 @@ class TestProjectDelete(TestProject): ) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_projects_with_exception(self, find_mock): + find_mock.side_effect = [self.project, + exceptions.CommandError] + arglist = [ + self.project.id, + 'unexist_project', + ] + verifylist = [ + ('projects', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 projects failed to delete.', + str(e)) + + find_mock.assert_any_call(self.projects_mock, self.project.id) + find_mock.assert_any_call(self.projects_mock, 'unexist_project') + + self.assertEqual(2, find_mock.call_count) + self.projects_mock.delete.assert_called_once_with(self.project.id) + class TestProjectList(TestProject): diff --git a/openstackclient/tests/unit/identity/v3/test_role.py b/openstackclient/tests/unit/identity/v3/test_role.py index 448e18d3..c0b68bdf 100644 --- a/openstackclient/tests/unit/identity/v3/test_role.py +++ b/openstackclient/tests/unit/identity/v3/test_role.py @@ -14,6 +14,10 @@ # import copy +import mock + +from osc_lib import exceptions +from osc_lib import utils from openstackclient.identity.v3 import role from openstackclient.tests.unit import fakes @@ -428,6 +432,36 @@ class TestRoleDelete(TestRole): ) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_roles_with_exception(self, find_mock): + find_mock.side_effect = [self.roles_mock.get.return_value, + exceptions.CommandError] + arglist = [ + identity_fakes.role_name, + 'unexist_role', + ] + verifylist = [ + ('roles', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 roles failed to delete.', + str(e)) + + find_mock.assert_any_call(self.roles_mock, + identity_fakes.role_name, + domain_id=None) + find_mock.assert_any_call(self.roles_mock, + 'unexist_role', + domain_id=None) + + self.assertEqual(2, find_mock.call_count) + self.roles_mock.delete.assert_called_once_with(identity_fakes.role_id) + class TestRoleList(TestRole): diff --git a/openstackclient/tests/unit/identity/v3/test_trust.py b/openstackclient/tests/unit/identity/v3/test_trust.py index 4eeb8bfe..93e8f63d 100644 --- a/openstackclient/tests/unit/identity/v3/test_trust.py +++ b/openstackclient/tests/unit/identity/v3/test_trust.py @@ -12,6 +12,10 @@ # import copy +import mock + +from osc_lib import exceptions +from osc_lib import utils from openstackclient.identity.v3 import trust from openstackclient.tests.unit import fakes @@ -148,6 +152,33 @@ class TestTrustDelete(TestTrust): ) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_trusts_with_exception(self, find_mock): + find_mock.side_effect = [self.trusts_mock.get.return_value, + exceptions.CommandError] + arglist = [ + identity_fakes.trust_id, + 'unexist_trust', + ] + verifylist = [ + ('trust', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 trusts failed to delete.', + str(e)) + + find_mock.assert_any_call(self.trusts_mock, identity_fakes.trust_id) + find_mock.assert_any_call(self.trusts_mock, 'unexist_trust') + + self.assertEqual(2, find_mock.call_count) + self.trusts_mock.delete.assert_called_once_with( + identity_fakes.trust_id) + class TestTrustList(TestTrust): diff --git a/openstackclient/tests/unit/identity/v3/test_user.py b/openstackclient/tests/unit/identity/v3/test_user.py index 6150a5f3..3c1f49a6 100644 --- a/openstackclient/tests/unit/identity/v3/test_user.py +++ b/openstackclient/tests/unit/identity/v3/test_user.py @@ -16,6 +16,9 @@ import contextlib import mock +from osc_lib import exceptions +from osc_lib import utils + from openstackclient.identity.v3 import user from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes @@ -465,6 +468,32 @@ class TestUserDelete(TestUser): ) self.assertIsNone(result) + @mock.patch.object(utils, 'find_resource') + def test_delete_multi_users_with_exception(self, find_mock): + find_mock.side_effect = [self.user, + exceptions.CommandError] + arglist = [ + self.user.id, + 'unexist_user', + ] + verifylist = [ + ('users', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 users failed to delete.', + str(e)) + + find_mock.assert_any_call(self.users_mock, self.user.id) + find_mock.assert_any_call(self.users_mock, 'unexist_user') + + self.assertEqual(2, find_mock.call_count) + self.users_mock.delete.assert_called_once_with(self.user.id) + class TestUserList(TestUser): diff --git a/openstackclient/tests/unit/image/v2/test_image.py b/openstackclient/tests/unit/image/v2/test_image.py index a054e513..164185df 100644 --- a/openstackclient/tests/unit/image/v2/test_image.py +++ b/openstackclient/tests/unit/image/v2/test_image.py @@ -829,6 +829,11 @@ class TestImageSet(TestImage): self.images_mock.get.return_value = self.model(**image_fakes.IMAGE) self.images_mock.update.return_value = self.model(**image_fakes.IMAGE) + + self.app.client_manager.auth_ref = mock.Mock( + project_id=self.project.id, + ) + # Get the command object to test self.cmd = image.SetImage(self.app, None) @@ -845,6 +850,101 @@ class TestImageSet(TestImage): self.assertIsNone(result) + self.image_members_mock.update.assert_not_called() + + def test_image_set_membership_option_accept(self): + membership = image_fakes.FakeImage.create_one_image_member( + attrs={'image_id': image_fakes.image_id, + 'member_id': self.project.id} + ) + self.image_members_mock.update.return_value = membership + + arglist = [ + '--accept', + image_fakes.image_id, + ] + verifylist = [ + ('accept', True), + ('reject', False), + ('pending', False), + ('image', image_fakes.image_id) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + + self.image_members_mock.update.assert_called_once_with( + image_fakes.image_id, + self.app.client_manager.auth_ref.project_id, + 'accepted', + ) + + # Assert that the 'update image" route is also called, in addition to + # the 'update membership' route. + self.images_mock.update.assert_called_with(image_fakes.image_id) + + def test_image_set_membership_option_reject(self): + membership = image_fakes.FakeImage.create_one_image_member( + attrs={'image_id': image_fakes.image_id, + 'member_id': self.project.id} + ) + self.image_members_mock.update.return_value = membership + + arglist = [ + '--reject', + image_fakes.image_id, + ] + verifylist = [ + ('accept', False), + ('reject', True), + ('pending', False), + ('image', image_fakes.image_id) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + + self.image_members_mock.update.assert_called_once_with( + image_fakes.image_id, + self.app.client_manager.auth_ref.project_id, + 'rejected', + ) + + # Assert that the 'update image" route is also called, in addition to + # the 'update membership' route. + self.images_mock.update.assert_called_with(image_fakes.image_id) + + def test_image_set_membership_option_pending(self): + membership = image_fakes.FakeImage.create_one_image_member( + attrs={'image_id': image_fakes.image_id, + 'member_id': self.project.id} + ) + self.image_members_mock.update.return_value = membership + + arglist = [ + '--pending', + image_fakes.image_id, + ] + verifylist = [ + ('accept', False), + ('reject', False), + ('pending', True), + ('image', image_fakes.image_id) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + + self.image_members_mock.update.assert_called_once_with( + image_fakes.image_id, + self.app.client_manager.auth_ref.project_id, + 'pending', + ) + + # Assert that the 'update image" route is also called, in addition to + # the 'update membership' route. + self.images_mock.update.assert_called_with(image_fakes.image_id) + def test_image_set_options(self): arglist = [ '--name', 'new-name', diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py index b931cb55..524285ab 100644 --- a/openstackclient/tests/unit/network/v2/fakes.py +++ b/openstackclient/tests/unit/network/v2/fakes.py @@ -14,6 +14,8 @@ import argparse import copy import mock +from random import choice +from random import randint import uuid from openstackclient.tests.unit import fakes @@ -37,10 +39,20 @@ QUOTA = { "l7policy": 5, } +RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit' +RULE_TYPE_DSCP_MARKING = 'dscp-marking' +RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth' +VALID_QOS_RULES = [RULE_TYPE_BANDWIDTH_LIMIT, + RULE_TYPE_DSCP_MARKING, + RULE_TYPE_MINIMUM_BANDWIDTH] +VALID_DSCP_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, + 34, 36, 38, 40, 46, 48, 56] + class FakeNetworkV2Client(object): def __init__(self, **kwargs): + self.session = mock.Mock() self.extensions = mock.Mock() self.extensions.resource_class = fakes.FakeResource(None, {}) @@ -553,6 +565,8 @@ class FakeNetworkAgent(object): agent_attrs.update(attrs) agent = fakes.FakeResource(info=copy.deepcopy(agent_attrs), loaded=True) + agent.is_admin_state_up = agent_attrs['admin_state_up'] + agent.is_alive = agent_attrs['alive'] return agent @staticmethod @@ -662,83 +676,90 @@ class FakeNetworkRBAC(object): return mock.Mock(side_effect=rbac_policies) -class FakeNetworkQosBandwidthLimitRule(object): - """Fake one or more QoS bandwidth limit rules.""" +class FakeNetworkQosPolicy(object): + """Fake one or more QoS policies.""" @staticmethod - def create_one_qos_bandwidth_limit_rule(attrs=None): - """Create a fake QoS bandwidth limit rule. + def create_one_qos_policy(attrs=None): + """Create a fake QoS policy. :param Dictionary attrs: A dictionary with all attributes :return: - A FakeResource object with id, qos_policy_id, max_kbps and - max_burst_kbps attributes. + A FakeResource object with name, id, etc. """ attrs = attrs or {} + qos_id = attrs.get('id') or 'qos-policy-id-' + uuid.uuid4().hex + rule_attrs = {'qos_policy_id': qos_id} + rules = [FakeNetworkQosRule.create_one_qos_rule(rule_attrs)] # Set default attributes. - qos_bandwidth_limit_rule_attrs = { - 'id': 'qos-bandwidth-limit-rule-id-' + uuid.uuid4().hex, - 'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex, - 'max_kbps': 1500, - 'max_burst_kbps': 1200, + qos_policy_attrs = { + 'name': 'qos-policy-name-' + uuid.uuid4().hex, + 'id': qos_id, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'shared': False, + 'description': 'qos-policy-description-' + uuid.uuid4().hex, + 'rules': rules, } # Overwrite default attributes. - qos_bandwidth_limit_rule_attrs.update(attrs) + qos_policy_attrs.update(attrs) - qos_bandwidth_limit_rule = fakes.FakeResource( - info=copy.deepcopy(qos_bandwidth_limit_rule_attrs), + qos_policy = fakes.FakeResource( + info=copy.deepcopy(qos_policy_attrs), loaded=True) - return qos_bandwidth_limit_rule + # Set attributes with special mapping in OpenStack SDK. + qos_policy.is_shared = qos_policy_attrs['shared'] + qos_policy.project_id = qos_policy_attrs['tenant_id'] + + return qos_policy @staticmethod - def create_qos_bandwidth_limit_rules(attrs=None, count=2): - """Create multiple fake QoS bandwidth limit rules. + def create_qos_policies(attrs=None, count=2): + """Create multiple fake QoS policies. :param Dictionary attrs: A dictionary with all attributes :param int count: - The number of QoS bandwidth limit rules to fake + The number of QoS policies to fake :return: - A list of FakeResource objects faking the QoS bandwidth limit rules + A list of FakeResource objects faking the QoS policies """ qos_policies = [] for i in range(0, count): - qos_policies.append(FakeNetworkQosBandwidthLimitRule. - create_one_qos_bandwidth_limit_rule(attrs)) + qos_policies.append( + FakeNetworkQosPolicy.create_one_qos_policy(attrs)) return qos_policies @staticmethod - def get_qos_bandwidth_limit_rules(qos_rules=None, count=2): - """Get a list of faked QoS bandwidth limit rules. + def get_qos_policies(qos_policies=None, count=2): + """Get an iterable MagicMock object with a list of faked QoS policies. - If QoS bandwidth limit rules list is provided, then initialize the - Mock object with the list. Otherwise create one. + If qos policies list is provided, then initialize the Mock object + with the list. Otherwise create one. :param List address scopes: - A list of FakeResource objects faking QoS bandwidth limit rules + A list of FakeResource objects faking qos policies :param int count: - The number of QoS bandwidth limit rules to fake + The number of QoS policies to fake :return: An iterable Mock object with side_effect set to a list of faked - qos bandwidth limit rules + QoS policies """ - if qos_rules is None: - qos_rules = (FakeNetworkQosBandwidthLimitRule. - create_qos_bandwidth_limit_rules(count)) - return mock.Mock(side_effect=qos_rules) + if qos_policies is None: + qos_policies = FakeNetworkQosPolicy.create_qos_policies(count) + return mock.Mock(side_effect=qos_policies) -class FakeNetworkQosPolicy(object): - """Fake one or more QoS policies.""" +class FakeNetworkQosRule(object): + """Fake one or more Network QoS rules.""" @staticmethod - def create_one_qos_policy(attrs=None): - """Create a fake QoS policy. + def create_one_qos_rule(attrs=None): + """Create a fake Network QoS rule. :param Dictionary attrs: A dictionary with all attributes @@ -746,71 +767,69 @@ class FakeNetworkQosPolicy(object): A FakeResource object with name, id, etc. """ attrs = attrs or {} - qos_id = attrs.get('id') or 'qos-policy-id-' + uuid.uuid4().hex - rule_attrs = {'qos_policy_id': qos_id} - rules = [ - FakeNetworkQosBandwidthLimitRule. - create_one_qos_bandwidth_limit_rule(rule_attrs)] # Set default attributes. - qos_policy_attrs = { - 'name': 'qos-policy-name-' + uuid.uuid4().hex, - 'id': qos_id, + type = attrs.get('type') or choice(VALID_QOS_RULES) + qos_rule_attrs = { + 'id': 'qos-rule-id-' + uuid.uuid4().hex, + 'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex, 'tenant_id': 'project-id-' + uuid.uuid4().hex, - 'shared': False, - 'description': 'qos-policy-description-' + uuid.uuid4().hex, - 'rules': rules, + 'type': type, } + if type == RULE_TYPE_BANDWIDTH_LIMIT: + qos_rule_attrs['max_kbps'] = randint(1, 10000) + qos_rule_attrs['max_burst_kbits'] = randint(1, 10000) + elif type == RULE_TYPE_DSCP_MARKING: + qos_rule_attrs['dscp_mark'] = choice(VALID_DSCP_MARKS) + elif type == RULE_TYPE_MINIMUM_BANDWIDTH: + qos_rule_attrs['min_kbps'] = randint(1, 10000) + qos_rule_attrs['direction'] = 'egress' # Overwrite default attributes. - qos_policy_attrs.update(attrs) + qos_rule_attrs.update(attrs) - qos_policy = fakes.FakeResource( - info=copy.deepcopy(qos_policy_attrs), - loaded=True) + qos_rule = fakes.FakeResource(info=copy.deepcopy(qos_rule_attrs), + loaded=True) # Set attributes with special mapping in OpenStack SDK. - qos_policy.is_shared = qos_policy_attrs['shared'] - qos_policy.project_id = qos_policy_attrs['tenant_id'] + qos_rule.project_id = qos_rule['tenant_id'] - return qos_policy + return qos_rule @staticmethod - def create_qos_policies(attrs=None, count=2): - """Create multiple fake QoS policies. + def create_qos_rules(attrs=None, count=2): + """Create multiple fake Network QoS rules. :param Dictionary attrs: A dictionary with all attributes :param int count: - The number of QoS policies to fake + The number of Network QoS rule to fake :return: - A list of FakeResource objects faking the QoS policies + A list of FakeResource objects faking the Network QoS rules """ - qos_policies = [] + qos_rules = [] for i in range(0, count): - qos_policies.append( - FakeNetworkQosPolicy.create_one_qos_policy(attrs)) - - return qos_policies + qos_rules.append(FakeNetworkQosRule.create_one_qos_rule(attrs)) + return qos_rules @staticmethod - def get_qos_policies(qos_policies=None, count=2): - """Get an iterable MagicMock object with a list of faked QoS policies. + def get_qos_rules(qos_rules=None, count=2): + """Get a list of faked Network QoS rules. - If qos policies list is provided, then initialize the Mock object - with the list. Otherwise create one. + If Network QoS rules list is provided, then initialize the Mock + object with the list. Otherwise create one. :param List address scopes: - A list of FakeResource objects faking qos policies + A list of FakeResource objects faking Network QoS rules :param int count: - The number of QoS policies to fake + The number of QoS minimum bandwidth rules to fake :return: An iterable Mock object with side_effect set to a list of faked - QoS policies + qos minimum bandwidth rules """ - if qos_policies is None: - qos_policies = FakeNetworkQosPolicy.create_qos_policies(count) - return mock.Mock(side_effect=qos_policies) + if qos_rules is None: + qos_rules = (FakeNetworkQosRule.create_qos_rules(count)) + return mock.Mock(side_effect=qos_rules) class FakeNetworkQosRuleType(object): diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip.py b/openstackclient/tests/unit/network/v2/test_floating_ip.py index 63d22bf8..e395300d 100644 --- a/openstackclient/tests/unit/network/v2/test_floating_ip.py +++ b/openstackclient/tests/unit/network/v2/test_floating_ip.py @@ -208,13 +208,19 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork): super(TestDeleteFloatingIPNetwork, self).setUp() self.network.delete_ip = mock.Mock(return_value=None) - self.network.find_ip = ( - network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) # Get the command object to test self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace) - def test_floating_ip_delete(self): + @mock.patch( + "openstackclient.tests.unit.network.v2.test_floating_ip." + + "floating_ip._find_floating_ip" + ) + def test_floating_ip_delete(self, find_floating_ip_mock): + find_floating_ip_mock.side_effect = [ + (self.floating_ips[0], []), + (self.floating_ips[1], []), + ] arglist = [ self.floating_ips[0].id, ] @@ -225,12 +231,24 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork): result = self.cmd.take_action(parsed_args) - self.network.find_ip.assert_called_once_with( - self.floating_ips[0].id, ignore_missing=False) + find_floating_ip_mock.assert_called_once_with( + mock.ANY, + [], + self.floating_ips[0].id, + ignore_missing=False, + ) self.network.delete_ip.assert_called_once_with(self.floating_ips[0]) self.assertIsNone(result) - def test_multi_floating_ips_delete(self): + @mock.patch( + "openstackclient.tests.unit.network.v2.test_floating_ip." + + "floating_ip._find_floating_ip" + ) + def test_floating_ip_delete_multi(self, find_floating_ip_mock): + find_floating_ip_mock.side_effect = [ + (self.floating_ips[0], []), + (self.floating_ips[1], []), + ] arglist = [] verifylist = [] @@ -243,13 +261,37 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork): result = self.cmd.take_action(parsed_args) + calls = [ + call( + mock.ANY, + [], + self.floating_ips[0].id, + ignore_missing=False, + ), + call( + mock.ANY, + [], + self.floating_ips[1].id, + ignore_missing=False, + ), + ] + find_floating_ip_mock.assert_has_calls(calls) + calls = [] for f in self.floating_ips: calls.append(call(f)) self.network.delete_ip.assert_has_calls(calls) self.assertIsNone(result) - def test_multi_floating_ips_delete_with_exception(self): + @mock.patch( + "openstackclient.tests.unit.network.v2.test_floating_ip." + + "floating_ip._find_floating_ip" + ) + def test_floating_ip_delete_multi_exception(self, find_floating_ip_mock): + find_floating_ip_mock.side_effect = [ + (self.floating_ips[0], []), + exceptions.CommandError, + ] arglist = [ self.floating_ips[0].id, 'unexist_floating_ip', @@ -260,21 +302,24 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) - find_mock_result = [self.floating_ips[0], exceptions.CommandError] - self.network.find_ip = ( - mock.Mock(side_effect=find_mock_result) - ) - try: self.cmd.take_action(parsed_args) self.fail('CommandError should be raised.') except exceptions.CommandError as e: self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) - self.network.find_ip.assert_any_call( - self.floating_ips[0].id, ignore_missing=False) - self.network.find_ip.assert_any_call( - 'unexist_floating_ip', ignore_missing=False) + find_floating_ip_mock.assert_any_call( + mock.ANY, + [], + self.floating_ips[0].id, + ignore_missing=False, + ) + find_floating_ip_mock.assert_any_call( + mock.ANY, + [], + 'unexist_floating_ip', + ignore_missing=False, + ) self.network.delete_ip.assert_called_once_with( self.floating_ips[0] ) @@ -534,7 +579,12 @@ class TestShowFloatingIPNetwork(TestFloatingIPNetwork): # Get the command object to test self.cmd = floating_ip.ShowFloatingIP(self.app, self.namespace) - def test_floating_ip_show(self): + @mock.patch( + "openstackclient.tests.unit.network.v2.test_floating_ip." + + "floating_ip._find_floating_ip" + ) + def test_floating_ip_show(self, find_floating_ip_mock): + find_floating_ip_mock.return_value = (self.floating_ip, []) arglist = [ self.floating_ip.id, ] @@ -545,9 +595,11 @@ class TestShowFloatingIPNetwork(TestFloatingIPNetwork): columns, data = self.cmd.take_action(parsed_args) - self.network.find_ip.assert_called_once_with( + find_floating_ip_mock.assert_called_once_with( + mock.ANY, + [], self.floating_ip.id, - ignore_missing=False + ignore_missing=False, ) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_network_agent.py b/openstackclient/tests/unit/network/v2/test_network_agent.py index 9fd395b4..2fc0c043 100644 --- a/openstackclient/tests/unit/network/v2/test_network_agent.py +++ b/openstackclient/tests/unit/network/v2/test_network_agent.py @@ -195,6 +195,8 @@ class TestListNetworkAgent(TestNetworkAgent): self.assertEqual(self.data, list(data)) +# TODO(huanxuan): Also update by the new attribute name +# "is_admin_state_up" after sdk 0.9.12 class TestSetNetworkAgent(TestNetworkAgent): _network_agent = ( @@ -324,6 +326,6 @@ class TestShowNetworkAgent(TestNetworkAgent): columns, data = self.cmd.take_action(parsed_args) self.network.get_agent.assert_called_once_with( - self._network_agent.id, ignore_missing=False) + self._network_agent.id) self.assertEqual(self.columns, columns) self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/unit/network/v2/test_network_qos_rule.py b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py new file mode 100644 index 00000000..41ccae32 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py @@ -0,0 +1,1049 @@ +# Copyright (c) 2016, Intel Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from osc_lib import exceptions + +from openstackclient.network.v2 import network_qos_rule +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit' +RULE_TYPE_DSCP_MARKING = 'dscp-marking' +RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth' +DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, + 34, 36, 38, 40, 46, 48, 56] + + +class TestNetworkQosRule(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestNetworkQosRule, self).setUp() + # Get a shortcut to the network client + self.network = self.app.client_manager.network + self.qos_policy = (network_fakes.FakeNetworkQosPolicy. + create_one_qos_policy()) + self.network.find_qos_policy = mock.Mock(return_value=self.qos_policy) + + +class TestCreateNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): + + def test_check_type_parameters(self): + pass + + def setUp(self): + super(TestCreateNetworkQosRuleMinimumBandwidth, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_BANDWIDTH} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.columns = ( + 'direction', + 'id', + 'min_kbps', + 'project_id', + 'qos_policy_id', + 'type' + ) + + self.data = ( + self.new_rule.direction, + self.new_rule.id, + self.new_rule.min_kbps, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + self.network.create_qos_minimum_bandwidth_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.CreateNetworkQosRule(self.app, + self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + '--type', RULE_TYPE_MINIMUM_BANDWIDTH, + '--min-kbps', str(self.new_rule.min_kbps), + '--egress', + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_MINIMUM_BANDWIDTH), + ('min_kbps', self.new_rule.min_kbps), + ('egress', True), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_qos_minimum_bandwidth_rule.assert_called_once_with( + self.qos_policy.id, + **{'min_kbps': self.new_rule.min_kbps, + 'direction': self.new_rule.direction} + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_wrong_options(self): + arglist = [ + '--type', RULE_TYPE_MINIMUM_BANDWIDTH, + '--max-kbps', '10000', + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_MINIMUM_BANDWIDTH), + ('max_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('"Create" rule command for type "minimum-bandwidth" ' + 'requires arguments min_kbps, direction') + self.assertEqual(msg, str(e)) + + +class TestCreateNetworkQosRuleDSCPMarking(TestNetworkQosRule): + + def test_check_type_parameters(self): + pass + + def setUp(self): + super(TestCreateNetworkQosRuleDSCPMarking, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_DSCP_MARKING} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.columns = ( + 'dscp_mark', + 'id', + 'project_id', + 'qos_policy_id', + 'type' + ) + + self.data = ( + self.new_rule.dscp_mark, + self.new_rule.id, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + self.network.create_qos_dscp_marking_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.CreateNetworkQosRule(self.app, + self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + '--type', RULE_TYPE_DSCP_MARKING, + '--dscp-mark', str(self.new_rule.dscp_mark), + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_DSCP_MARKING), + ('dscp_mark', self.new_rule.dscp_mark), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_qos_dscp_marking_rule.assert_called_once_with( + self.qos_policy.id, + **{'dscp_mark': self.new_rule.dscp_mark} + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_wrong_options(self): + arglist = [ + '--type', RULE_TYPE_DSCP_MARKING, + '--max-kbps', '10000', + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_DSCP_MARKING), + ('max_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('"Create" rule command for type "dscp-marking" ' + 'requires arguments dscp_mark') + self.assertEqual(msg, str(e)) + + +class TestCreateNetworkQosRuleBandwidtLimit(TestNetworkQosRule): + + def test_check_type_parameters(self): + pass + + def setUp(self): + super(TestCreateNetworkQosRuleBandwidtLimit, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_BANDWIDTH_LIMIT} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.columns = ( + 'id', + 'max_burst_kbits', + 'max_kbps', + 'project_id', + 'qos_policy_id', + 'type' + ) + + self.data = ( + self.new_rule.id, + self.new_rule.max_burst_kbits, + self.new_rule.max_kbps, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + self.network.create_qos_bandwidth_limit_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.CreateNetworkQosRule(self.app, + self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + '--type', RULE_TYPE_BANDWIDTH_LIMIT, + '--max-kbps', str(self.new_rule.max_kbps), + '--max-burst-kbits', str(self.new_rule.max_burst_kbits), + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_BANDWIDTH_LIMIT), + ('max_kbps', self.new_rule.max_kbps), + ('max_burst_kbits', self.new_rule.max_burst_kbits), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_qos_bandwidth_limit_rule.assert_called_once_with( + self.qos_policy.id, + **{'max_kbps': self.new_rule.max_kbps, + 'max_burst_kbps': self.new_rule.max_burst_kbits} + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_wrong_options(self): + arglist = [ + '--type', RULE_TYPE_BANDWIDTH_LIMIT, + '--min-kbps', '10000', + self.new_rule.qos_policy_id, + ] + + verifylist = [ + ('type', RULE_TYPE_BANDWIDTH_LIMIT), + ('min_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('"Create" rule command for type "bandwidth-limit" ' + 'requires arguments max_kbps, max_burst_kbps') + self.assertEqual(msg, str(e)) + + +class TestDeleteNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): + + def setUp(self): + super(TestDeleteNetworkQosRuleMinimumBandwidth, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_BANDWIDTH} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.network.delete_qos_minimum_bandwidth_rule = mock.Mock( + return_value=None) + self.network.find_qos_minimum_bandwidth_rule = ( + network_fakes.FakeNetworkQosRule.get_qos_rules( + qos_rules=self.new_rule) + ) + + # Get the command object to test + self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app, + self.namespace) + + def test_qos_policy_delete(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.network.find_qos_policy.assert_called_once_with( + self.qos_policy.id, ignore_missing=False) + self.network.delete_qos_minimum_bandwidth_rule.assert_called_once_with( + self.new_rule.id, self.qos_policy.id) + self.assertIsNone(result) + + def test_qos_policy_delete_error(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + self.network.delete_qos_minimum_bandwidth_rule.side_effect = \ + Exception('Error message') + try: + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' % + {'rule': self.new_rule.id, 'e': 'Error message'}) + self.assertEqual(msg, str(e)) + + +class TestDeleteNetworkQosRuleDSCPMarking(TestNetworkQosRule): + + def setUp(self): + super(TestDeleteNetworkQosRuleDSCPMarking, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_DSCP_MARKING} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.network.delete_qos_dscp_marking_rule = mock.Mock( + return_value=None) + self.network.find_qos_dscp_marking_rule = ( + network_fakes.FakeNetworkQosRule.get_qos_rules( + qos_rules=self.new_rule) + ) + + # Get the command object to test + self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app, + self.namespace) + + def test_qos_policy_delete(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.network.find_qos_policy.assert_called_once_with( + self.qos_policy.id, ignore_missing=False) + self.network.delete_qos_dscp_marking_rule.assert_called_once_with( + self.new_rule.id, self.qos_policy.id) + self.assertIsNone(result) + + def test_qos_policy_delete_error(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + self.network.delete_qos_dscp_marking_rule.side_effect = \ + Exception('Error message') + try: + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' % + {'rule': self.new_rule.id, 'e': 'Error message'}) + self.assertEqual(msg, str(e)) + + +class TestDeleteNetworkQosRuleBandwidthLimit(TestNetworkQosRule): + + def setUp(self): + super(TestDeleteNetworkQosRuleBandwidthLimit, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_BANDWIDTH_LIMIT} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.network.delete_qos_bandwidth_limit_rule = mock.Mock( + return_value=None) + self.network.find_qos_bandwidth_limit_rule = ( + network_fakes.FakeNetworkQosRule.get_qos_rules( + qos_rules=self.new_rule) + ) + + # Get the command object to test + self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app, + self.namespace) + + def test_qos_policy_delete(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + self.network.find_qos_policy.assert_called_once_with( + self.qos_policy.id, ignore_missing=False) + self.network.delete_qos_bandwidth_limit_rule.assert_called_once_with( + self.new_rule.id, self.qos_policy.id) + self.assertIsNone(result) + + def test_qos_policy_delete_error(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + self.network.delete_qos_bandwidth_limit_rule.side_effect = \ + Exception('Error message') + try: + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' % + {'rule': self.new_rule.id, 'e': 'Error message'}) + self.assertEqual(msg, str(e)) + + +class TestSetNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): + + def setUp(self): + super(TestSetNetworkQosRuleMinimumBandwidth, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_BANDWIDTH} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs=attrs) + self.qos_policy.rules = [self.new_rule] + self.network.update_qos_minimum_bandwidth_rule = mock.Mock( + return_value=None) + self.network.find_qos_minimum_bandwidth_rule = mock.Mock( + return_value=self.new_rule) + self.network.find_qos_policy = mock.Mock( + return_value=self.qos_policy) + + # Get the command object to test + self.cmd = (network_qos_rule.SetNetworkQosRule(self.app, + self.namespace)) + + def test_set_nothing(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.network.update_qos_minimum_bandwidth_rule.assert_called_with( + self.new_rule, self.qos_policy.id) + self.assertIsNone(result) + + def test_set_min_kbps(self): + self._set_min_kbps() + + def test_set_min_kbps_to_zero(self): + self._set_min_kbps(min_kbps=0) + + def _set_min_kbps(self, min_kbps=None): + if min_kbps: + previous_min_kbps = self.new_rule.min_kbps + self.new_rule.min_kbps = min_kbps + + arglist = [ + '--min-kbps', str(self.new_rule.min_kbps), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('min_kbps', self.new_rule.min_kbps), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'min_kbps': self.new_rule.min_kbps, + } + self.network.update_qos_minimum_bandwidth_rule.assert_called_with( + self.new_rule, self.qos_policy.id, **attrs) + self.assertIsNone(result) + + if min_kbps: + self.new_rule.min_kbps = previous_min_kbps + + def test_set_wrong_options(self): + arglist = [ + '--max-kbps', str(10000), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('max_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type ' + '"minimum-bandwidth" only requires arguments min_kbps, ' + 'direction' % {'rule': self.new_rule.id}) + self.assertEqual(msg, str(e)) + + +class TestSetNetworkQosRuleDSCPMarking(TestNetworkQosRule): + + def setUp(self): + super(TestSetNetworkQosRuleDSCPMarking, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_DSCP_MARKING} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs=attrs) + self.qos_policy.rules = [self.new_rule] + self.network.update_qos_dscp_marking_rule = mock.Mock( + return_value=None) + self.network.find_qos_dscp_marking_rule = mock.Mock( + return_value=self.new_rule) + self.network.find_qos_policy = mock.Mock( + return_value=self.qos_policy) + + # Get the command object to test + self.cmd = (network_qos_rule.SetNetworkQosRule(self.app, + self.namespace)) + + def test_set_nothing(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.network.update_qos_dscp_marking_rule.assert_called_with( + self.new_rule, self.qos_policy.id) + self.assertIsNone(result) + + def test_set_dscp_mark(self): + self._set_dscp_mark() + + def test_set_dscp_mark_to_zero(self): + self._set_dscp_mark(dscp_mark=0) + + def _set_dscp_mark(self, dscp_mark=None): + if dscp_mark: + previous_dscp_mark = self.new_rule.dscp_mark + self.new_rule.dscp_mark = dscp_mark + + arglist = [ + '--dscp-mark', str(self.new_rule.dscp_mark), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('dscp_mark', self.new_rule.dscp_mark), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'dscp_mark': self.new_rule.dscp_mark, + } + self.network.update_qos_dscp_marking_rule.assert_called_with( + self.new_rule, self.qos_policy.id, **attrs) + self.assertIsNone(result) + + if dscp_mark: + self.new_rule.dscp_mark = previous_dscp_mark + + def test_set_wrong_options(self): + arglist = [ + '--max-kbps', str(10000), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('max_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type ' + '"dscp-marking" only requires arguments dscp_mark' % + {'rule': self.new_rule.id}) + self.assertEqual(msg, str(e)) + + +class TestSetNetworkQosRuleBandwidthLimit(TestNetworkQosRule): + + def setUp(self): + super(TestSetNetworkQosRuleBandwidthLimit, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_BANDWIDTH_LIMIT} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs=attrs) + self.qos_policy.rules = [self.new_rule] + self.network.update_qos_bandwidth_limit_rule = mock.Mock( + return_value=None) + self.network.find_qos_bandwidth_limit_rule = mock.Mock( + return_value=self.new_rule) + self.network.find_qos_policy = mock.Mock( + return_value=self.qos_policy) + + # Get the command object to test + self.cmd = (network_qos_rule.SetNetworkQosRule(self.app, + self.namespace)) + + def test_set_nothing(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.network.update_qos_bandwidth_limit_rule.assert_called_with( + self.new_rule, self.qos_policy.id) + self.assertIsNone(result) + + def test_set_max_kbps(self): + self._set_max_kbps() + + def test_set_max_kbps_to_zero(self): + self._set_max_kbps(max_kbps=0) + + def _set_max_kbps(self, max_kbps=None): + if max_kbps: + previous_max_kbps = self.new_rule.max_kbps + self.new_rule.max_kbps = max_kbps + + arglist = [ + '--max-kbps', str(self.new_rule.max_kbps), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('max_kbps', self.new_rule.max_kbps), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'max_kbps': self.new_rule.max_kbps, + } + self.network.update_qos_bandwidth_limit_rule.assert_called_with( + self.new_rule, self.qos_policy.id, **attrs) + self.assertIsNone(result) + + if max_kbps: + self.new_rule.max_kbps = previous_max_kbps + + def test_set_max_burst_kbits(self): + self._set_max_burst_kbits() + + def test_set_max_burst_kbits_to_zero(self): + self._set_max_burst_kbits(max_burst_kbits=0) + + def _set_max_burst_kbits(self, max_burst_kbits=None): + if max_burst_kbits: + previous_max_burst_kbits = self.new_rule.max_burst_kbits + self.new_rule.max_burst_kbits = max_burst_kbits + + arglist = [ + '--max-burst-kbits', str(self.new_rule.max_burst_kbits), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('max_burst_kbits', self.new_rule.max_burst_kbits), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = { + 'max_burst_kbps': self.new_rule.max_burst_kbits, + } + self.network.update_qos_bandwidth_limit_rule.assert_called_with( + self.new_rule, self.qos_policy.id, **attrs) + self.assertIsNone(result) + + if max_burst_kbits: + self.new_rule.max_burst_kbits = previous_max_burst_kbits + + def test_set_wrong_options(self): + arglist = [ + '--min-kbps', str(10000), + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('min_kbps', 10000), + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + except exceptions.CommandError as e: + msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type ' + '"bandwidth-limit" only requires arguments max_kbps, ' + 'max_burst_kbps' % {'rule': self.new_rule.id}) + self.assertEqual(msg, str(e)) + + +class TestListNetworkQosRule(TestNetworkQosRule): + + def setUp(self): + super(TestListNetworkQosRule, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_BANDWIDTH} + self.new_rule_min_bw = (network_fakes.FakeNetworkQosRule. + create_one_qos_rule(attrs=attrs)) + attrs['type'] = RULE_TYPE_DSCP_MARKING + self.new_rule_dscp_mark = (network_fakes.FakeNetworkQosRule. + create_one_qos_rule(attrs=attrs)) + attrs['type'] = RULE_TYPE_BANDWIDTH_LIMIT + self.new_rule_max_bw = (network_fakes.FakeNetworkQosRule. + create_one_qos_rule(attrs=attrs)) + self.qos_policy.rules = [self.new_rule_min_bw, + self.new_rule_dscp_mark, + self.new_rule_max_bw] + self.network.find_qos_minimum_bandwidth_rule = mock.Mock( + return_value=self.new_rule_min_bw) + self.network.find_qos_dscp_marking_rule = mock.Mock( + return_value=self.new_rule_dscp_mark) + self.network.find_qos_bandwidth_limit_rule = mock.Mock( + return_value=self.new_rule_max_bw) + self.columns = ( + 'ID', + 'QoS Policy ID', + 'Type', + 'Max Kbps', + 'Max Burst Kbits', + 'Min Kbps', + 'DSCP mark', + 'Direction', + ) + self.data = [] + for index in range(len(self.qos_policy.rules)): + self.data.append(( + self.qos_policy.rules[index].id, + self.qos_policy.rules[index].qos_policy_id, + self.qos_policy.rules[index].type, + getattr(self.qos_policy.rules[index], 'max_kbps', ''), + getattr(self.qos_policy.rules[index], 'max_burst_kbps', ''), + getattr(self.qos_policy.rules[index], 'min_kbps', ''), + getattr(self.qos_policy.rules[index], 'dscp_mark', ''), + getattr(self.qos_policy.rules[index], 'direction', ''), + )) + # Get the command object to test + self.cmd = network_qos_rule.ListNetworkQosRule(self.app, + self.namespace) + + def test_qos_rule_list(self): + arglist = [ + self.qos_policy.id + ] + verifylist = [ + ('qos_policy', self.qos_policy.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.find_qos_policy.assert_called_once_with( + self.qos_policy.id, ignore_missing=False) + self.assertEqual(self.columns, columns) + list_data = list(data) + self.assertEqual(len(self.data), len(list_data)) + for index in range(len(list_data)): + self.assertEqual(self.data[index], list_data[index]) + + +class TestShowNetworkQosRuleMinimumBandwidth(TestNetworkQosRule): + + def setUp(self): + super(TestShowNetworkQosRuleMinimumBandwidth, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_MINIMUM_BANDWIDTH} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.columns = ( + 'direction', + 'id', + 'min_kbps', + 'project_id', + 'qos_policy_id', + 'type' + ) + self.data = ( + self.new_rule.direction, + self.new_rule.id, + self.new_rule.min_kbps, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + + self.network.get_qos_minimum_bandwidth_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.ShowNetworkQosRule(self.app, + self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.get_qos_minimum_bandwidth_rule.assert_called_once_with( + self.new_rule.id, self.qos_policy.id) + self.assertEqual(self.columns, columns) + self.assertEqual(list(self.data), list(data)) + + +class TestShowNetworkQosDSCPMarking(TestNetworkQosRule): + + def setUp(self): + super(TestShowNetworkQosDSCPMarking, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_DSCP_MARKING} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.columns = ( + 'dscp_mark', + 'id', + 'project_id', + 'qos_policy_id', + 'type' + ) + self.data = ( + self.new_rule.dscp_mark, + self.new_rule.id, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + + self.network.get_qos_dscp_marking_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.ShowNetworkQosRule(self.app, + self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.get_qos_dscp_marking_rule.assert_called_once_with( + self.new_rule.id, self.qos_policy.id) + self.assertEqual(self.columns, columns) + self.assertEqual(list(self.data), list(data)) + + +class TestShowNetworkQosBandwidthLimit(TestNetworkQosRule): + + def setUp(self): + super(TestShowNetworkQosBandwidthLimit, self).setUp() + attrs = {'qos_policy_id': self.qos_policy.id, + 'type': RULE_TYPE_BANDWIDTH_LIMIT} + self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule( + attrs) + self.qos_policy.rules = [self.new_rule] + self.columns = ( + 'id', + 'max_burst_kbits', + 'max_kbps', + 'project_id', + 'qos_policy_id', + 'type' + ) + self.data = ( + self.new_rule.id, + self.new_rule.max_burst_kbits, + self.new_rule.max_kbps, + self.new_rule.project_id, + self.new_rule.qos_policy_id, + self.new_rule.type, + ) + + self.network.get_qos_bandwidth_limit_rule = mock.Mock( + return_value=self.new_rule) + + # Get the command object to test + self.cmd = network_qos_rule.ShowNetworkQosRule(self.app, + self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + # Missing required args should bail here + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self.new_rule.qos_policy_id, + self.new_rule.id, + ] + verifylist = [ + ('qos_policy', self.new_rule.qos_policy_id), + ('id', self.new_rule.id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + self.network.get_qos_bandwidth_limit_rule.assert_called_once_with( + self.new_rule.id, self.qos_policy.id) + self.assertEqual(self.columns, columns) + self.assertEqual(list(self.data), list(data)) diff --git a/openstackclient/tests/unit/volume/v2/fakes.py b/openstackclient/tests/unit/volume/v2/fakes.py index d5cd72ec..a6676403 100644 --- a/openstackclient/tests/unit/volume/v2/fakes.py +++ b/openstackclient/tests/unit/volume/v2/fakes.py @@ -903,3 +903,23 @@ class FakeType(object): volume_types.append(volume_type) return volume_types + + @staticmethod + def get_types(types=None, count=2): + """Get an iterable MagicMock object with a list of faked types. + + If types list is provided, then initialize the Mock object with the + list. Otherwise create one. + + :param List types: + A list of FakeResource objects faking types + :param Integer count: + The number of types to be faked + :return + An iterable Mock object with side_effect set to a list of faked + types + """ + if types is None: + types = FakeType.create_types(count) + + return mock.Mock(side_effect=types) diff --git a/openstackclient/tests/unit/volume/v2/test_consistency_group.py b/openstackclient/tests/unit/volume/v2/test_consistency_group.py index bc99ca8d..6eeeae39 100644 --- a/openstackclient/tests/unit/volume/v2/test_consistency_group.py +++ b/openstackclient/tests/unit/volume/v2/test_consistency_group.py @@ -36,10 +36,115 @@ class TestConsistencyGroup(volume_fakes.TestVolume): self.app.client_manager.volume.cgsnapshots) self.cgsnapshots_mock.reset_mock() + self.volumes_mock = ( + self.app.client_manager.volume.volumes) + self.volumes_mock.reset_mock() + self.types_mock = self.app.client_manager.volume.volume_types self.types_mock.reset_mock() +class TestConsistencyGroupAddVolume(TestConsistencyGroup): + + _consistency_group = ( + volume_fakes.FakeConsistencyGroup.create_one_consistency_group()) + + def setUp(self): + super(TestConsistencyGroupAddVolume, self).setUp() + + self.consistencygroups_mock.get.return_value = ( + self._consistency_group) + # Get the command object to test + self.cmd = \ + consistency_group.AddVolumeToConsistencyGroup(self.app, None) + + def test_add_one_volume_to_consistency_group(self): + volume = volume_fakes.FakeVolume.create_one_volume() + self.volumes_mock.get.return_value = volume + arglist = [ + self._consistency_group.id, + volume.id, + ] + verifylist = [ + ('consistency_group', self._consistency_group.id), + ('volumes', [volume.id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'add_volumes': volume.id, + } + self.consistencygroups_mock.update.assert_called_once_with( + self._consistency_group.id, + **kwargs + ) + self.assertIsNone(result) + + def test_add_multiple_volumes_to_consistency_group(self): + volumes = volume_fakes.FakeVolume.create_volumes(count=2) + self.volumes_mock.get = volume_fakes.FakeVolume.get_volumes(volumes) + arglist = [ + self._consistency_group.id, + volumes[0].id, + volumes[1].id, + ] + verifylist = [ + ('consistency_group', self._consistency_group.id), + ('volumes', [volumes[0].id, volumes[1].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'add_volumes': volumes[0].id + ',' + volumes[1].id, + } + self.consistencygroups_mock.update.assert_called_once_with( + self._consistency_group.id, + **kwargs + ) + self.assertIsNone(result) + + @mock.patch.object(consistency_group.LOG, 'error') + def test_add_multiple_volumes_to_consistency_group_with_exception( + self, mock_error): + volume = volume_fakes.FakeVolume.create_one_volume() + arglist = [ + self._consistency_group.id, + volume.id, + 'unexist_volume', + ] + verifylist = [ + ('consistency_group', self._consistency_group.id), + ('volumes', [volume.id, 'unexist_volume']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [volume, + exceptions.CommandError, + self._consistency_group] + with mock.patch.object(utils, 'find_resource', + side_effect=find_mock_result) as find_mock: + result = self.cmd.take_action(parsed_args) + mock_error.assert_called_with("1 of 2 volumes failed to add.") + self.assertIsNone(result) + find_mock.assert_any_call(self.consistencygroups_mock, + self._consistency_group.id) + find_mock.assert_any_call(self.volumes_mock, + volume.id) + find_mock.assert_any_call(self.volumes_mock, + 'unexist_volume') + self.assertEqual(3, find_mock.call_count) + self.consistencygroups_mock.update.assert_called_once_with( + self._consistency_group.id, add_volumes=volume.id + ) + + class TestConsistencyGroupCreate(TestConsistencyGroup): volume_type = volume_fakes.FakeType.create_one_type() @@ -394,6 +499,107 @@ class TestConsistencyGroupList(TestConsistencyGroup): self.assertEqual(self.data_long, list(data)) +class TestConsistencyGroupRemoveVolume(TestConsistencyGroup): + + _consistency_group = ( + volume_fakes.FakeConsistencyGroup.create_one_consistency_group()) + + def setUp(self): + super(TestConsistencyGroupRemoveVolume, self).setUp() + + self.consistencygroups_mock.get.return_value = ( + self._consistency_group) + # Get the command object to test + self.cmd = \ + consistency_group.RemoveVolumeFromConsistencyGroup(self.app, None) + + def test_remove_one_volume_from_consistency_group(self): + volume = volume_fakes.FakeVolume.create_one_volume() + self.volumes_mock.get.return_value = volume + arglist = [ + self._consistency_group.id, + volume.id, + ] + verifylist = [ + ('consistency_group', self._consistency_group.id), + ('volumes', [volume.id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'remove_volumes': volume.id, + } + self.consistencygroups_mock.update.assert_called_once_with( + self._consistency_group.id, + **kwargs + ) + self.assertIsNone(result) + + def test_remove_multi_volumes_from_consistency_group(self): + volumes = volume_fakes.FakeVolume.create_volumes(count=2) + self.volumes_mock.get = volume_fakes.FakeVolume.get_volumes(volumes) + arglist = [ + self._consistency_group.id, + volumes[0].id, + volumes[1].id, + ] + verifylist = [ + ('consistency_group', self._consistency_group.id), + ('volumes', [volumes[0].id, volumes[1].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'remove_volumes': volumes[0].id + ',' + volumes[1].id, + } + self.consistencygroups_mock.update.assert_called_once_with( + self._consistency_group.id, + **kwargs + ) + self.assertIsNone(result) + + @mock.patch.object(consistency_group.LOG, 'error') + def test_remove_multiple_volumes_from_consistency_group_with_exception( + self, mock_error): + volume = volume_fakes.FakeVolume.create_one_volume() + arglist = [ + self._consistency_group.id, + volume.id, + 'unexist_volume', + ] + verifylist = [ + ('consistency_group', self._consistency_group.id), + ('volumes', [volume.id, 'unexist_volume']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [volume, + exceptions.CommandError, + self._consistency_group] + with mock.patch.object(utils, 'find_resource', + side_effect=find_mock_result) as find_mock: + result = self.cmd.take_action(parsed_args) + mock_error.assert_called_with("1 of 2 volumes failed to remove.") + self.assertIsNone(result) + find_mock.assert_any_call(self.consistencygroups_mock, + self._consistency_group.id) + find_mock.assert_any_call(self.volumes_mock, + volume.id) + find_mock.assert_any_call(self.volumes_mock, + 'unexist_volume') + self.assertEqual(3, find_mock.call_count) + self.consistencygroups_mock.update.assert_called_once_with( + self._consistency_group.id, remove_volumes=volume.id + ) + + class TestConsistencyGroupSet(TestConsistencyGroup): consistency_group = ( diff --git a/openstackclient/tests/unit/volume/v2/test_type.py b/openstackclient/tests/unit/volume/v2/test_type.py index 0d556e13..cec01bd8 100644 --- a/openstackclient/tests/unit/volume/v2/test_type.py +++ b/openstackclient/tests/unit/volume/v2/test_type.py @@ -13,6 +13,7 @@ # import mock +from mock import call from osc_lib import exceptions from osc_lib import utils @@ -133,12 +134,13 @@ class TestTypeCreate(TestType): class TestTypeDelete(TestType): - volume_type = volume_fakes.FakeType.create_one_type() + volume_types = volume_fakes.FakeType.create_types(count=2) def setUp(self): super(TestTypeDelete, self).setUp() - self.types_mock.get.return_value = self.volume_type + self.types_mock.get = volume_fakes.FakeType.get_types( + self.volume_types) self.types_mock.delete.return_value = None # Get the command object to mock @@ -146,18 +148,64 @@ class TestTypeDelete(TestType): def test_type_delete(self): arglist = [ - self.volume_type.id + self.volume_types[0].id ] verifylist = [ - ("volume_types", [self.volume_type.id]) + ("volume_types", [self.volume_types[0].id]) ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.types_mock.delete.assert_called_with(self.volume_type) + self.types_mock.delete.assert_called_with(self.volume_types[0]) self.assertIsNone(result) + def test_delete_multiple_types(self): + arglist = [] + for t in self.volume_types: + arglist.append(t.id) + verifylist = [ + ('volume_types', arglist), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + calls = [] + for t in self.volume_types: + calls.append(call(t)) + self.types_mock.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_delete_multiple_types_with_exception(self): + arglist = [ + self.volume_types[0].id, + 'unexist_type', + ] + verifylist = [ + ('volume_types', arglist), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.volume_types[0], exceptions.CommandError] + with mock.patch.object(utils, 'find_resource', + side_effect=find_mock_result) as find_mock: + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 volume types failed to delete.', + str(e)) + find_mock.assert_any_call( + self.types_mock, self.volume_types[0].id) + find_mock.assert_any_call(self.types_mock, 'unexist_type') + + self.assertEqual(2, find_mock.call_count) + self.types_mock.delete.assert_called_once_with( + self.volume_types[0] + ) + class TestTypeList(TestType): diff --git a/openstackclient/tests/unit/volume/v2/test_volume.py b/openstackclient/tests/unit/volume/v2/test_volume.py index 1529c2e2..4fef9dd9 100644 --- a/openstackclient/tests/unit/volume/v2/test_volume.py +++ b/openstackclient/tests/unit/volume/v2/test_volume.py @@ -823,6 +823,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': False, + 'project_id': None, + 'user_id': None, + 'display_name': None, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] @@ -853,6 +866,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': True, + 'project_id': self.project.id, + 'user_id': None, + 'display_name': None, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] @@ -885,6 +911,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': True, + 'project_id': self.project.id, + 'user_id': None, + 'display_name': None, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] @@ -915,6 +954,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': False, + 'project_id': None, + 'user_id': self.user.id, + 'display_name': None, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] device = self.mock_volume.attachments[0]['device'] @@ -946,6 +998,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': False, + 'project_id': None, + 'user_id': self.user.id, + 'display_name': None, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] @@ -976,6 +1041,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': False, + 'project_id': None, + 'user_id': None, + 'display_name': self.mock_volume.name, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] @@ -1006,6 +1084,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': False, + 'project_id': None, + 'user_id': None, + 'display_name': None, + 'status': self.mock_volume.status, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] @@ -1036,6 +1127,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': True, + 'project_id': None, + 'user_id': None, + 'display_name': None, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + self.assertEqual(self.columns, columns) server = self.mock_volume.attachments[0]['server_id'] @@ -1067,6 +1171,19 @@ class TestVolumeList(TestVolume): columns, data = self.cmd.take_action(parsed_args) + search_opts = { + 'all_tenants': False, + 'project_id': None, + 'user_id': None, + 'display_name': None, + 'status': None, + } + self.volumes_mock.list.assert_called_once_with( + search_opts=search_opts, + marker=None, + limit=None, + ) + collist = [ 'ID', 'Display Name', |
