summaryrefslogtreecommitdiff
path: root/openstackclient/tests
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient/tests')
-rw-r--r--openstackclient/tests/functional/base.py5
-rw-r--r--openstackclient/tests/functional/common/test_quota.py3
-rw-r--r--openstackclient/tests/functional/image/v2/test_image.py22
-rw-r--r--openstackclient/tests/functional/network/v2/test_floating_ip.py8
-rw-r--r--openstackclient/tests/functional/network/v2/test_network.py229
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_agent.py4
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule.py181
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py6
-rw-r--r--openstackclient/tests/functional/network/v2/test_port.py3
-rw-r--r--openstackclient/tests/functional/network/v2/test_security_group_rule.py3
-rwxr-xr-xopenstackclient/tests/functional/post_test_hook_tips.sh47
-rw-r--r--openstackclient/tests/unit/fakes.py6
-rw-r--r--openstackclient/tests/unit/identity/v2_0/test_project.py29
-rw-r--r--openstackclient/tests/unit/identity/v2_0/test_role.py27
-rw-r--r--openstackclient/tests/unit/identity/v2_0/test_user.py27
-rw-r--r--openstackclient/tests/unit/identity/v3/test_group.py27
-rw-r--r--openstackclient/tests/unit/identity/v3/test_project.py27
-rw-r--r--openstackclient/tests/unit/identity/v3/test_role.py34
-rw-r--r--openstackclient/tests/unit/identity/v3/test_trust.py31
-rw-r--r--openstackclient/tests/unit/identity/v3/test_user.py29
-rw-r--r--openstackclient/tests/unit/image/v2/test_image.py100
-rw-r--r--openstackclient/tests/unit/network/v2/fakes.py165
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip.py90
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_agent.py4
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_qos_rule.py1049
-rw-r--r--openstackclient/tests/unit/volume/v2/fakes.py20
-rw-r--r--openstackclient/tests/unit/volume/v2/test_consistency_group.py206
-rw-r--r--openstackclient/tests/unit/volume/v2/test_type.py58
-rw-r--r--openstackclient/tests/unit/volume/v2/test_volume.py117
29 files changed, 2330 insertions, 227 deletions
diff --git a/openstackclient/tests/functional/base.py b/openstackclient/tests/functional/base.py
index 885abc02..fb78ea62 100644
--- a/openstackclient/tests/functional/base.py
+++ b/openstackclient/tests/functional/base.py
@@ -77,6 +77,11 @@ class TestCase(testtools.TestCase):
if expected not in actual:
raise Exception(expected + ' not in ' + actual)
+ @classmethod
+ def assertsOutputNotNone(cls, observed):
+ if observed is None:
+ raise Exception('No output observed')
+
def assert_table_structure(self, items, field_names):
"""Verify that all items have keys listed in field_names."""
for item in items:
diff --git a/openstackclient/tests/functional/common/test_quota.py b/openstackclient/tests/functional/common/test_quota.py
index fbb8e563..b2b198af 100644
--- a/openstackclient/tests/functional/common/test_quota.py
+++ b/openstackclient/tests/functional/common/test_quota.py
@@ -35,19 +35,16 @@ class QuotaTests(base.TestCase):
raw_output = self.openstack('quota show ' + self.PROJECT_NAME + opts)
self.assertEqual("11\n11\n11\n", raw_output)
- @testtools.skip('broken SDK testing')
def test_quota_show(self):
raw_output = self.openstack('quota show ' + self.PROJECT_NAME)
for expected_field in self.EXPECTED_FIELDS:
self.assertIn(expected_field, raw_output)
- @testtools.skip('broken SDK testing')
def test_quota_show_default_project(self):
raw_output = self.openstack('quota show')
for expected_field in self.EXPECTED_FIELDS:
self.assertIn(expected_field, raw_output)
- @testtools.skip('broken SDK testing')
def test_quota_show_with_default_option(self):
raw_output = self.openstack('quota show --default')
for expected_field in self.EXPECTED_FIELDS:
diff --git a/openstackclient/tests/functional/image/v2/test_image.py b/openstackclient/tests/functional/image/v2/test_image.py
index 3f432b02..6faff94a 100644
--- a/openstackclient/tests/functional/image/v2/test_image.py
+++ b/openstackclient/tests/functional/image/v2/test_image.py
@@ -74,3 +74,25 @@ class ImageTests(base.TestCase):
self.openstack('image unset --property a --property c ' + self.NAME)
raw_output = self.openstack('image show ' + self.NAME + opts)
self.assertEqual(self.NAME + "\n\n", raw_output)
+
+ def test_image_members(self):
+ opts = self.get_opts(['project_id'])
+ my_project_id = self.openstack('token issue' + opts).strip()
+ self.openstack(
+ 'image add project {} {}'.format(self.NAME, my_project_id))
+
+ self.openstack(
+ 'image set --accept ' + self.NAME)
+ shared_img_list = self.parse_listing(
+ self.openstack('image list --shared', self.get_opts(['name']))
+ )
+ self.assertIn(self.NAME, [img['Name'] for img in shared_img_list])
+
+ self.openstack(
+ 'image set --reject ' + self.NAME)
+ shared_img_list = self.parse_listing(
+ self.openstack('image list --shared', self.get_opts(['name']))
+ )
+
+ self.openstack(
+ 'image remove project {} {}'.format(self.NAME, my_project_id))
diff --git a/openstackclient/tests/functional/network/v2/test_floating_ip.py b/openstackclient/tests/functional/network/v2/test_floating_ip.py
index 5f642f04..fa9607a0 100644
--- a/openstackclient/tests/functional/network/v2/test_floating_ip.py
+++ b/openstackclient/tests/functional/network/v2/test_floating_ip.py
@@ -14,8 +14,6 @@ import random
import re
import uuid
-import testtools
-
from openstackclient.tests.functional import base
@@ -25,7 +23,6 @@ class FloatingIpTests(base.TestCase):
NETWORK_NAME = uuid.uuid4().hex
@classmethod
- @testtools.skip('broken SDK testing')
def setUpClass(cls):
# Set up some regex for matching below
cls.re_id = re.compile("id\s+\|\s+(\S+)")
@@ -37,7 +34,7 @@ class FloatingIpTests(base.TestCase):
# Make a random subnet
cls.subnet = ".".join(map(
str,
- (random.randint(0, 255) for _ in range(3))
+ (random.randint(0, 223) for _ in range(3))
)) + ".0/26"
# Create a network for the floating ip
@@ -62,7 +59,6 @@ class FloatingIpTests(base.TestCase):
raw_output = cls.openstack('network delete ' + cls.NETWORK_NAME)
cls.assertOutput('', raw_output)
- @testtools.skip('broken SDK testing')
def test_floating_ip_delete(self):
"""Test create, delete multiple"""
raw_output = self.openstack(
@@ -93,7 +89,6 @@ class FloatingIpTests(base.TestCase):
raw_output = self.openstack('floating ip delete ' + ip1 + ' ' + ip2)
self.assertOutput('', raw_output)
- @testtools.skip('broken SDK testing')
def test_floating_ip_list(self):
"""Test create defaults, list filters, delete"""
raw_output = self.openstack(
@@ -135,7 +130,6 @@ class FloatingIpTests(base.TestCase):
# TODO(dtroyer): add more filter tests
- @testtools.skip('broken SDK testing')
def test_floating_ip_show(self):
"""Test show"""
raw_output = self.openstack(
diff --git a/openstackclient/tests/functional/network/v2/test_network.py b/openstackclient/tests/functional/network/v2/test_network.py
index ef42dcce..c55d70f9 100644
--- a/openstackclient/tests/functional/network/v2/test_network.py
+++ b/openstackclient/tests/functional/network/v2/test_network.py
@@ -10,164 +10,173 @@
# License for the specific language governing permissions and limitations
# under the License.
-import re
+import json
import uuid
-import testtools
-
from openstackclient.tests.functional import base
class NetworkTests(base.TestCase):
"""Functional tests for network"""
- @classmethod
- def setUpClass(cls):
- # Set up some regex for matching below
- cls.re_id = re.compile("id\s+\|\s+(\S+)")
- cls.re_description = re.compile("description\s+\|\s+([^|]+?)\s+\|")
- cls.re_enabled = re.compile("admin_state_up\s+\|\s+(\S+)")
- cls.re_shared = re.compile("shared\s+\|\s+(\S+)")
- cls.re_external = re.compile("router:external\s+\|\s+(\S+)")
- cls.re_default = re.compile("is_default\s+\|\s+(\S+)")
- cls.re_port_security = re.compile(
- "port_security_enabled\s+\|\s+(\S+)"
- )
-
def test_network_delete(self):
"""Test create, delete multiple"""
name1 = uuid.uuid4().hex
- raw_output = self.openstack(
- 'network create ' +
+ cmd_output = json.loads(self.openstack(
+ 'network create -f json ' +
'--description aaaa ' +
name1
- )
+ ))
+ self.assertIsNotNone(cmd_output["id"])
self.assertEqual(
'aaaa',
- re.search(self.re_description, raw_output).group(1),
+ cmd_output["description"],
)
+
name2 = uuid.uuid4().hex
- raw_output = self.openstack(
- 'network create ' +
+ cmd_output = json.loads(self.openstack(
+ 'network create -f json ' +
'--description bbbb ' +
name2
- )
+ ))
+ self.assertIsNotNone(cmd_output["id"])
self.assertEqual(
'bbbb',
- re.search(self.re_description, raw_output).group(1),
+ cmd_output["description"],
)
del_output = self.openstack('network delete ' + name1 + ' ' + name2)
self.assertOutput('', del_output)
- @testtools.skip('broken SDK testing')
def test_network_list(self):
"""Test create defaults, list filters, delete"""
name1 = uuid.uuid4().hex
- raw_output = self.openstack(
- 'network create ' +
+ cmd_output = json.loads(self.openstack(
+ 'network create -f json ' +
'--description aaaa ' +
name1
- )
+ ))
self.addCleanup(self.openstack, 'network delete ' + name1)
+ self.assertIsNotNone(cmd_output["id"])
self.assertEqual(
'aaaa',
- re.search(self.re_description, raw_output).group(1),
+ cmd_output["description"],
)
# Check the default values
self.assertEqual(
'UP',
- re.search(self.re_enabled, raw_output).group(1),
+ cmd_output["admin_state_up"],
)
self.assertEqual(
- 'False',
- re.search(self.re_shared, raw_output).group(1),
+ False,
+ cmd_output["shared"],
)
self.assertEqual(
'Internal',
- re.search(self.re_external, raw_output).group(1),
+ cmd_output["router:external"],
)
# NOTE(dtroyer): is_default is not present in the create output
# so make sure it stays that way.
- self.assertIsNone(re.search(self.re_default, raw_output))
+ # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer,
+ # but the value seems to always be None, regardless
+ # of the --default or --no-default value.
+ # self.assertEqual('x', cmd_output)
+ if ('is_default' in cmd_output):
+ self.assertEqual(
+ None,
+ cmd_output["is_default"],
+ )
self.assertEqual(
- 'True',
- re.search(self.re_port_security, raw_output).group(1),
+ True,
+ cmd_output["port_security_enabled"],
)
name2 = uuid.uuid4().hex
- raw_output = self.openstack(
- 'network create ' +
+ cmd_output = json.loads(self.openstack(
+ 'network create -f json ' +
'--description bbbb ' +
'--disable ' +
'--share ' +
name2
- )
+ ))
self.addCleanup(self.openstack, 'network delete ' + name2)
+ self.assertIsNotNone(cmd_output["id"])
self.assertEqual(
'bbbb',
- re.search(self.re_description, raw_output).group(1),
+ cmd_output["description"],
)
self.assertEqual(
'DOWN',
- re.search(self.re_enabled, raw_output).group(1),
+ cmd_output["admin_state_up"],
)
self.assertEqual(
- 'True',
- re.search(self.re_shared, raw_output).group(1),
+ True,
+ cmd_output["shared"],
+ )
+ if ('is_default' in cmd_output):
+ self.assertEqual(
+ None,
+ cmd_output["is_default"],
+ )
+ self.assertEqual(
+ True,
+ cmd_output["port_security_enabled"],
)
# Test list --long
- raw_output = self.openstack('network list --long')
- self.assertIsNotNone(
- re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output)
- )
- self.assertIsNotNone(
- re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output)
- )
+ cmd_output = json.loads(self.openstack(
+ "network list -f json " +
+ "--long"
+ ))
+ col_name = [x["Name"] for x in cmd_output]
+ self.assertIn(name1, col_name)
+ self.assertIn(name2, col_name)
# Test list --long --enable
- raw_output = self.openstack('network list --long --enable')
- self.assertIsNotNone(
- re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output)
- )
- self.assertIsNone(
- re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output)
- )
+ cmd_output = json.loads(self.openstack(
+ "network list -f json " +
+ "--enable " +
+ "--long"
+ ))
+ col_name = [x["Name"] for x in cmd_output]
+ self.assertIn(name1, col_name)
+ self.assertNotIn(name2, col_name)
# Test list --long --disable
- raw_output = self.openstack('network list --long --disable')
- self.assertIsNone(
- re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output)
- )
- self.assertIsNotNone(
- re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output)
- )
+ cmd_output = json.loads(self.openstack(
+ "network list -f json " +
+ "--disable " +
+ "--long"
+ ))
+ col_name = [x["Name"] for x in cmd_output]
+ self.assertNotIn(name1, col_name)
+ self.assertIn(name2, col_name)
# Test list --long --share
- raw_output = self.openstack('network list --long --share')
- self.assertIsNone(
- re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output)
- )
- self.assertIsNotNone(
- re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output)
- )
+ cmd_output = json.loads(self.openstack(
+ "network list -f json " +
+ "--share " +
+ "--long"
+ ))
+ col_name = [x["Name"] for x in cmd_output]
+ self.assertNotIn(name1, col_name)
+ self.assertIn(name2, col_name)
# Test list --long --no-share
- raw_output = self.openstack('network list --long --no-share')
- self.assertIsNotNone(
- re.search("\|\s+" + name1 + "\s+\|\s+ACTIVE", raw_output)
- )
- self.assertIsNone(
- re.search("\|\s+" + name2 + "\s+\|\s+ACTIVE", raw_output)
- )
+ cmd_output = json.loads(self.openstack(
+ "network list -f json " +
+ "--no-share " +
+ "--long"
+ ))
+ col_name = [x["Name"] for x in cmd_output]
+ self.assertIn(name1, col_name)
+ self.assertNotIn(name2, col_name)
- @testtools.skip('broken SDK testing')
def test_network_set(self):
"""Tests create options, set, show, delete"""
name = uuid.uuid4().hex
- raw_output = self.openstack(
- 'network create ' +
+ cmd_output = json.loads(self.openstack(
+ 'network create -f json ' +
'--description aaaa ' +
'--enable ' +
'--no-share ' +
@@ -175,30 +184,38 @@ class NetworkTests(base.TestCase):
'--no-default ' +
'--enable-port-security ' +
name
- )
+ ))
self.addCleanup(self.openstack, 'network delete ' + name)
+ self.assertIsNotNone(cmd_output["id"])
self.assertEqual(
'aaaa',
- re.search(self.re_description, raw_output).group(1),
+ cmd_output["description"],
)
self.assertEqual(
'UP',
- re.search(self.re_enabled, raw_output).group(1),
+ cmd_output["admin_state_up"],
)
self.assertEqual(
- 'False',
- re.search(self.re_shared, raw_output).group(1),
+ False,
+ cmd_output["shared"],
)
self.assertEqual(
'Internal',
- re.search(self.re_external, raw_output).group(1),
+ cmd_output["router:external"],
)
# NOTE(dtroyer): is_default is not present in the create output
# so make sure it stays that way.
- self.assertIsNone(re.search(self.re_default, raw_output))
+ # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer,
+ # but the value seems to always be None, regardless
+ # of the --default or --no-default value.
+ if ('is_default' in cmd_output):
+ self.assertEqual(
+ None,
+ cmd_output["is_default"],
+ )
self.assertEqual(
- 'True',
- re.search(self.re_port_security, raw_output).group(1),
+ True,
+ cmd_output["port_security_enabled"],
)
raw_output = self.openstack(
@@ -212,32 +229,34 @@ class NetworkTests(base.TestCase):
)
self.assertOutput('', raw_output)
- raw_output = self.openstack('network show ' + name)
+ cmd_output = json.loads(self.openstack(
+ 'network show -f json ' + name
+ ))
self.assertEqual(
'cccc',
- re.search(self.re_description, raw_output).group(1),
+ cmd_output["description"],
)
self.assertEqual(
'DOWN',
- re.search(self.re_enabled, raw_output).group(1),
+ cmd_output["admin_state_up"],
)
self.assertEqual(
- 'True',
- re.search(self.re_shared, raw_output).group(1),
+ True,
+ cmd_output["shared"],
)
self.assertEqual(
'External',
- re.search(self.re_external, raw_output).group(1),
+ cmd_output["router:external"],
)
# why not 'None' like above??
self.assertEqual(
- 'False',
- re.search(self.re_default, raw_output).group(1),
+ False,
+ cmd_output["is_default"],
)
self.assertEqual(
- 'False',
- re.search(self.re_port_security, raw_output).group(1),
+ False,
+ cmd_output["port_security_enabled"],
)
# NOTE(dtroyer): There is ambiguity around is_default in that
@@ -252,14 +271,16 @@ class NetworkTests(base.TestCase):
)
self.assertOutput('', raw_output)
- raw_output = self.openstack('network show ' + name)
+ cmd_output = json.loads(self.openstack(
+ 'network show -f json ' + name
+ ))
self.assertEqual(
'cccc',
- re.search(self.re_description, raw_output).group(1),
+ cmd_output["description"],
)
# NOTE(dtroyer): This should be 'True'
self.assertEqual(
- 'False',
- re.search(self.re_default, raw_output).group(1),
+ False,
+ cmd_output["port_security_enabled"],
)
diff --git a/openstackclient/tests/functional/network/v2/test_network_agent.py b/openstackclient/tests/functional/network/v2/test_network_agent.py
index e99dcef6..dd6112e7 100644
--- a/openstackclient/tests/functional/network/v2/test_network_agent.py
+++ b/openstackclient/tests/functional/network/v2/test_network_agent.py
@@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import testtools
-
from openstackclient.tests.functional import base
@@ -28,13 +26,11 @@ class NetworkAgentTests(base.TestCase):
# get the list of network agent IDs.
cls.IDs = raw_output.split('\n')
- @testtools.skip('broken SDK testing')
def test_network_agent_show(self):
opts = self.get_opts(self.FIELDS)
raw_output = self.openstack('network agent show ' + self.IDs[0] + opts)
self.assertEqual(self.IDs[0] + "\n", raw_output)
- @testtools.skip('broken SDK testing')
def test_network_agent_set(self):
opts = self.get_opts(['admin_state_up'])
self.openstack('network agent set --disable ' + self.IDs[0])
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
new file mode 100644
index 00000000..af0c9bac
--- /dev/null
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
@@ -0,0 +1,181 @@
+# Copyright (c) 2016, Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from openstackclient.tests.functional import base
+
+
+class NetworkQosRuleTestsMinimumBandwidth(base.TestCase):
+ """Functional tests for QoS minimum bandwidth rule."""
+ RULE_ID = None
+ QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ MIN_KBPS = 2800
+ MIN_KBPS_MODIFIED = 7500
+ DIRECTION = '--egress'
+ HEADERS = ['ID']
+ FIELDS = ['id']
+ TYPE = 'minimum-bandwidth'
+
+ @classmethod
+ def setUpClass(cls):
+ opts = cls.get_opts(cls.FIELDS)
+ cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
+ cls.RULE_ID = cls.openstack('network qos rule create --type ' +
+ cls.TYPE + ' --min-kbps ' +
+ str(cls.MIN_KBPS) + ' ' + cls.DIRECTION +
+ ' ' + cls.QOS_POLICY_NAME + opts)
+ cls.assertsOutputNotNone(cls.RULE_ID)
+
+ @classmethod
+ def tearDownClass(cls):
+ raw_output = cls.openstack('network qos rule delete ' +
+ cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
+ cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
+ cls.assertOutput('', raw_output)
+
+ def test_qos_policy_list(self):
+ opts = self.get_opts(self.HEADERS)
+ raw_output = self.openstack('network qos rule list '
+ + self.QOS_POLICY_NAME + opts)
+ self.assertIn(self.RULE_ID, raw_output)
+
+ def test_qos_policy_show(self):
+ opts = self.get_opts(self.FIELDS)
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(self.RULE_ID, raw_output)
+
+ def test_qos_policy_set(self):
+ self.openstack('network qos rule set --min-kbps ' +
+ str(self.MIN_KBPS_MODIFIED) + ' ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ opts = self.get_opts(['min_kbps'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output)
+
+
+class NetworkQosRuleTestsDSCPMarking(base.TestCase):
+ """Functional tests for QoS DSCP marking rule."""
+ RULE_ID = None
+ QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ DSCP_MARK = 8
+ DSCP_MARK_MODIFIED = 32
+ HEADERS = ['ID']
+ FIELDS = ['id']
+ TYPE = 'dscp-marking'
+
+ @classmethod
+ def setUpClass(cls):
+ opts = cls.get_opts(cls.FIELDS)
+ cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
+ cls.RULE_ID = cls.openstack('network qos rule create --type ' +
+ cls.TYPE + ' --dscp-mark ' +
+ str(cls.DSCP_MARK) + ' ' +
+ cls.QOS_POLICY_NAME + opts)
+ cls.assertsOutputNotNone(cls.RULE_ID)
+
+ @classmethod
+ def tearDownClass(cls):
+ raw_output = cls.openstack('network qos rule delete ' +
+ cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
+ cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
+ cls.assertOutput('', raw_output)
+
+ def test_qos_policy_list(self):
+ opts = self.get_opts(self.HEADERS)
+ raw_output = self.openstack('network qos rule list '
+ + self.QOS_POLICY_NAME + opts)
+ self.assertIn(self.RULE_ID, raw_output)
+
+ def test_qos_policy_show(self):
+ opts = self.get_opts(self.FIELDS)
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(self.RULE_ID, raw_output)
+
+ def test_qos_policy_set(self):
+ self.openstack('network qos rule set --dscp-mark ' +
+ str(self.DSCP_MARK_MODIFIED) + ' ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ opts = self.get_opts(['dscp_mark'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output)
+
+
+class NetworkQosRuleTestsBandwidthLimit(base.TestCase):
+ """Functional tests for QoS bandwidth limit rule."""
+ RULE_ID = None
+ QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ MAX_KBPS = 10000
+ MAX_KBPS_MODIFIED = 15000
+ MAX_BURST_KBITS = 1400
+ MAX_BURST_KBITS_MODIFIED = 1800
+ HEADERS = ['ID']
+ FIELDS = ['id']
+ TYPE = 'bandwidth-limit'
+
+ @classmethod
+ def setUpClass(cls):
+ opts = cls.get_opts(cls.FIELDS)
+ cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
+ cls.RULE_ID = cls.openstack('network qos rule create --type ' +
+ cls.TYPE + ' --max-kbps ' +
+ str(cls.MAX_KBPS) + ' --max-burst-kbits ' +
+ str(cls.MAX_BURST_KBITS) + ' ' +
+ cls.QOS_POLICY_NAME + opts)
+ cls.assertsOutputNotNone(cls.RULE_ID)
+
+ @classmethod
+ def tearDownClass(cls):
+ raw_output = cls.openstack('network qos rule delete ' +
+ cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
+ cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
+ cls.assertOutput('', raw_output)
+
+ def test_qos_policy_list(self):
+ opts = self.get_opts(self.HEADERS)
+ raw_output = self.openstack('network qos rule list '
+ + self.QOS_POLICY_NAME + opts)
+ self.assertIn(self.RULE_ID, raw_output)
+
+ def test_qos_policy_show(self):
+ opts = self.get_opts(self.FIELDS)
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(self.RULE_ID, raw_output)
+
+ def test_qos_policy_set(self):
+ self.openstack('network qos rule set --max-kbps ' +
+ str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' +
+ str(self.MAX_BURST_KBITS_MODIFIED) + ' ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ opts = self.get_opts(['max_kbps'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.MAX_KBPS_MODIFIED) + "\n", raw_output)
+ opts = self.get_opts(['max_burst_kbps'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.MAX_BURST_KBITS_MODIFIED) + "\n", raw_output)
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
index 2bb04a9d..7dff0cbd 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
@@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import testtools
-
from openstackclient.tests.functional import base
@@ -22,10 +20,8 @@ class NetworkQosRuleTypeTests(base.TestCase):
"""Functional tests for Network QoS rule type. """
AVAILABLE_RULE_TYPES = ['dscp_marking',
- 'bandwidth_limit',
- 'minimum_bandwidth']
+ 'bandwidth_limit']
- @testtools.skip('broken SDK testing')
def test_qos_rule_type_list(self):
raw_output = self.openstack('network qos rule type list')
for rule_type in self.AVAILABLE_RULE_TYPES:
diff --git a/openstackclient/tests/functional/network/v2/test_port.py b/openstackclient/tests/functional/network/v2/test_port.py
index 976fbedb..decd9553 100644
--- a/openstackclient/tests/functional/network/v2/test_port.py
+++ b/openstackclient/tests/functional/network/v2/test_port.py
@@ -12,8 +12,6 @@
import uuid
-import testtools
-
from openstackclient.tests.functional import base
@@ -25,7 +23,6 @@ class PortTests(base.TestCase):
FIELDS = ['name']
@classmethod
- @testtools.skip('broken SDK testing')
def setUpClass(cls):
# Create a network for the subnet.
cls.openstack('network create ' + cls.NETWORK_NAME)
diff --git a/openstackclient/tests/functional/network/v2/test_security_group_rule.py b/openstackclient/tests/functional/network/v2/test_security_group_rule.py
index ec3731eb..c91de1a5 100644
--- a/openstackclient/tests/functional/network/v2/test_security_group_rule.py
+++ b/openstackclient/tests/functional/network/v2/test_security_group_rule.py
@@ -12,8 +12,6 @@
import uuid
-import testtools
-
from openstackclient.tests.functional import base
@@ -54,7 +52,6 @@ class SecurityGroupRuleTests(base.TestCase):
cls.SECURITY_GROUP_NAME)
cls.assertOutput('', raw_output)
- @testtools.skip('broken SDK testing')
def test_security_group_rule_list(self):
opts = self.get_opts(self.ID_HEADER)
raw_output = self.openstack('security group rule list ' +
diff --git a/openstackclient/tests/functional/post_test_hook_tips.sh b/openstackclient/tests/functional/post_test_hook_tips.sh
new file mode 100755
index 00000000..28ab9580
--- /dev/null
+++ b/openstackclient/tests/functional/post_test_hook_tips.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+
+# This is a script that kicks off a series of functional tests against an
+# OpenStack cloud. It will attempt to create an instance if one is not
+# available. Do not run this script unless you know what you're doing.
+# For more information refer to:
+# http://docs.openstack.org/developer/python-openstackclient/
+
+# This particular script differs from the normal post_test_hook because
+# it installs the master (tip) version of osc-lib, os-client-config
+# and openstacksdk, OSCs most important dependencies.
+
+function generate_testr_results {
+ if [ -f .testrepository/0 ]; then
+ sudo .tox/functional-tips/bin/testr last --subunit > $WORKSPACE/testrepository.subunit
+ sudo mv $WORKSPACE/testrepository.subunit $BASE/logs/testrepository.subunit
+ sudo .tox/functional-tips/bin/subunit2html $BASE/logs/testrepository.subunit $BASE/logs/testr_results.html
+ sudo gzip -9 $BASE/logs/testrepository.subunit
+ sudo gzip -9 $BASE/logs/testr_results.html
+ sudo chown jenkins:jenkins $BASE/logs/testrepository.subunit.gz $BASE/logs/testr_results.html.gz
+ sudo chmod a+r $BASE/logs/testrepository.subunit.gz $BASE/logs/testr_results.html.gz
+ fi
+}
+
+export OPENSTACKCLIENT_DIR="$BASE/new/python-openstackclient"
+sudo chown -R jenkins:stack $OPENSTACKCLIENT_DIR
+
+# Go to the openstackclient dir
+cd $OPENSTACKCLIENT_DIR
+
+# Run tests
+echo "Running openstackclient functional-tips test suite"
+set +e
+
+# Source environment variables to kick things off
+source ~stack/devstack/openrc admin admin
+echo 'Running tests with:'
+env | grep OS
+
+# Preserve env for OS_ credentials
+sudo -E -H -u jenkins tox -e functional-tips
+EXIT_CODE=$?
+set -e
+
+# Collect and parse result
+generate_testr_results
+exit $EXIT_CODE
diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py
index ca6b1d31..626b466d 100644
--- a/openstackclient/tests/unit/fakes.py
+++ b/openstackclient/tests/unit/fakes.py
@@ -219,6 +219,12 @@ class FakeResource(object):
def info(self):
return self._info
+ def __getitem__(self, item):
+ return self._info.get(item)
+
+ def get(self, item, default=None):
+ return self._info.get(item, default)
+
class FakeResponse(requests.Response):
diff --git a/openstackclient/tests/unit/identity/v2_0/test_project.py b/openstackclient/tests/unit/identity/v2_0/test_project.py
index c1f00762..4e1077db 100644
--- a/openstackclient/tests/unit/identity/v2_0/test_project.py
+++ b/openstackclient/tests/unit/identity/v2_0/test_project.py
@@ -13,8 +13,11 @@
# under the License.
#
+import mock
+
from keystoneauth1 import exceptions as ks_exc
from osc_lib import exceptions
+from osc_lib import utils
from openstackclient.identity.v2_0 import project
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
@@ -302,6 +305,32 @@ class TestProjectDelete(TestProject):
)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_projects_with_exception(self, find_mock):
+ find_mock.side_effect = [self.fake_project,
+ exceptions.CommandError]
+ arglist = [
+ self.fake_project.id,
+ 'unexist_project',
+ ]
+ verifylist = [
+ ('projects', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 projects failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.projects_mock, self.fake_project.id)
+ find_mock.assert_any_call(self.projects_mock, 'unexist_project')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.projects_mock.delete.assert_called_once_with(self.fake_project.id)
+
class TestProjectList(TestProject):
diff --git a/openstackclient/tests/unit/identity/v2_0/test_role.py b/openstackclient/tests/unit/identity/v2_0/test_role.py
index 68ebf141..684ce803 100644
--- a/openstackclient/tests/unit/identity/v2_0/test_role.py
+++ b/openstackclient/tests/unit/identity/v2_0/test_role.py
@@ -17,6 +17,7 @@ import mock
from keystoneauth1 import exceptions as ks_exc
from osc_lib import exceptions
+from osc_lib import utils
from openstackclient.identity.v2_0 import role
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
@@ -240,6 +241,32 @@ class TestRoleDelete(TestRole):
)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_roles_with_exception(self, find_mock):
+ find_mock.side_effect = [self.fake_role,
+ exceptions.CommandError]
+ arglist = [
+ self.fake_role.id,
+ 'unexist_role',
+ ]
+ verifylist = [
+ ('roles', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 roles failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.roles_mock, self.fake_role.id)
+ find_mock.assert_any_call(self.roles_mock, 'unexist_role')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.roles_mock.delete.assert_called_once_with(self.fake_role.id)
+
class TestRoleList(TestRole):
diff --git a/openstackclient/tests/unit/identity/v2_0/test_user.py b/openstackclient/tests/unit/identity/v2_0/test_user.py
index 765f8559..a8b9497e 100644
--- a/openstackclient/tests/unit/identity/v2_0/test_user.py
+++ b/openstackclient/tests/unit/identity/v2_0/test_user.py
@@ -17,6 +17,7 @@ import mock
from keystoneauth1 import exceptions as ks_exc
from osc_lib import exceptions
+from osc_lib import utils
from openstackclient.identity.v2_0 import user
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
@@ -411,6 +412,32 @@ class TestUserDelete(TestUser):
)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_users_with_exception(self, find_mock):
+ find_mock.side_effect = [self.fake_user,
+ exceptions.CommandError]
+ arglist = [
+ self.fake_user.id,
+ 'unexist_user',
+ ]
+ verifylist = [
+ ('users', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 users failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.users_mock, self.fake_user.id)
+ find_mock.assert_any_call(self.users_mock, 'unexist_user')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.users_mock.delete.assert_called_once_with(self.fake_user.id)
+
class TestUserList(TestUser):
diff --git a/openstackclient/tests/unit/identity/v3/test_group.py b/openstackclient/tests/unit/identity/v3/test_group.py
index eb50adb5..8558de95 100644
--- a/openstackclient/tests/unit/identity/v3/test_group.py
+++ b/openstackclient/tests/unit/identity/v3/test_group.py
@@ -16,6 +16,7 @@ from mock import call
from keystoneauth1 import exceptions as ks_exc
from osc_lib import exceptions
+from osc_lib import utils
from openstackclient.identity.v3 import group
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
@@ -257,6 +258,32 @@ class TestGroupDelete(TestGroup):
self.groups_mock.delete.assert_called_once_with(self.groups[0].id)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_groups_with_exception(self, find_mock):
+ find_mock.side_effect = [self.groups[0],
+ exceptions.CommandError]
+ arglist = [
+ self.groups[0].id,
+ 'unexist_group',
+ ]
+ verifylist = [
+ ('groups', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 groups failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.groups_mock, self.groups[0].id)
+ find_mock.assert_any_call(self.groups_mock, 'unexist_group')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.groups_mock.delete.assert_called_once_with(self.groups[0].id)
+
class TestGroupList(TestGroup):
diff --git a/openstackclient/tests/unit/identity/v3/test_project.py b/openstackclient/tests/unit/identity/v3/test_project.py
index 702d9209..2b898090 100644
--- a/openstackclient/tests/unit/identity/v3/test_project.py
+++ b/openstackclient/tests/unit/identity/v3/test_project.py
@@ -16,6 +16,7 @@
import mock
from osc_lib import exceptions
+from osc_lib import utils
from openstackclient.identity.v3 import project
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
@@ -445,6 +446,32 @@ class TestProjectDelete(TestProject):
)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_projects_with_exception(self, find_mock):
+ find_mock.side_effect = [self.project,
+ exceptions.CommandError]
+ arglist = [
+ self.project.id,
+ 'unexist_project',
+ ]
+ verifylist = [
+ ('projects', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 projects failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.projects_mock, self.project.id)
+ find_mock.assert_any_call(self.projects_mock, 'unexist_project')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.projects_mock.delete.assert_called_once_with(self.project.id)
+
class TestProjectList(TestProject):
diff --git a/openstackclient/tests/unit/identity/v3/test_role.py b/openstackclient/tests/unit/identity/v3/test_role.py
index 448e18d3..c0b68bdf 100644
--- a/openstackclient/tests/unit/identity/v3/test_role.py
+++ b/openstackclient/tests/unit/identity/v3/test_role.py
@@ -14,6 +14,10 @@
#
import copy
+import mock
+
+from osc_lib import exceptions
+from osc_lib import utils
from openstackclient.identity.v3 import role
from openstackclient.tests.unit import fakes
@@ -428,6 +432,36 @@ class TestRoleDelete(TestRole):
)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_roles_with_exception(self, find_mock):
+ find_mock.side_effect = [self.roles_mock.get.return_value,
+ exceptions.CommandError]
+ arglist = [
+ identity_fakes.role_name,
+ 'unexist_role',
+ ]
+ verifylist = [
+ ('roles', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 roles failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.roles_mock,
+ identity_fakes.role_name,
+ domain_id=None)
+ find_mock.assert_any_call(self.roles_mock,
+ 'unexist_role',
+ domain_id=None)
+
+ self.assertEqual(2, find_mock.call_count)
+ self.roles_mock.delete.assert_called_once_with(identity_fakes.role_id)
+
class TestRoleList(TestRole):
diff --git a/openstackclient/tests/unit/identity/v3/test_trust.py b/openstackclient/tests/unit/identity/v3/test_trust.py
index 4eeb8bfe..93e8f63d 100644
--- a/openstackclient/tests/unit/identity/v3/test_trust.py
+++ b/openstackclient/tests/unit/identity/v3/test_trust.py
@@ -12,6 +12,10 @@
#
import copy
+import mock
+
+from osc_lib import exceptions
+from osc_lib import utils
from openstackclient.identity.v3 import trust
from openstackclient.tests.unit import fakes
@@ -148,6 +152,33 @@ class TestTrustDelete(TestTrust):
)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_trusts_with_exception(self, find_mock):
+ find_mock.side_effect = [self.trusts_mock.get.return_value,
+ exceptions.CommandError]
+ arglist = [
+ identity_fakes.trust_id,
+ 'unexist_trust',
+ ]
+ verifylist = [
+ ('trust', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 trusts failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.trusts_mock, identity_fakes.trust_id)
+ find_mock.assert_any_call(self.trusts_mock, 'unexist_trust')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.trusts_mock.delete.assert_called_once_with(
+ identity_fakes.trust_id)
+
class TestTrustList(TestTrust):
diff --git a/openstackclient/tests/unit/identity/v3/test_user.py b/openstackclient/tests/unit/identity/v3/test_user.py
index 6150a5f3..3c1f49a6 100644
--- a/openstackclient/tests/unit/identity/v3/test_user.py
+++ b/openstackclient/tests/unit/identity/v3/test_user.py
@@ -16,6 +16,9 @@
import contextlib
import mock
+from osc_lib import exceptions
+from osc_lib import utils
+
from openstackclient.identity.v3 import user
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
@@ -465,6 +468,32 @@ class TestUserDelete(TestUser):
)
self.assertIsNone(result)
+ @mock.patch.object(utils, 'find_resource')
+ def test_delete_multi_users_with_exception(self, find_mock):
+ find_mock.side_effect = [self.user,
+ exceptions.CommandError]
+ arglist = [
+ self.user.id,
+ 'unexist_user',
+ ]
+ verifylist = [
+ ('users', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 users failed to delete.',
+ str(e))
+
+ find_mock.assert_any_call(self.users_mock, self.user.id)
+ find_mock.assert_any_call(self.users_mock, 'unexist_user')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.users_mock.delete.assert_called_once_with(self.user.id)
+
class TestUserList(TestUser):
diff --git a/openstackclient/tests/unit/image/v2/test_image.py b/openstackclient/tests/unit/image/v2/test_image.py
index a054e513..164185df 100644
--- a/openstackclient/tests/unit/image/v2/test_image.py
+++ b/openstackclient/tests/unit/image/v2/test_image.py
@@ -829,6 +829,11 @@ class TestImageSet(TestImage):
self.images_mock.get.return_value = self.model(**image_fakes.IMAGE)
self.images_mock.update.return_value = self.model(**image_fakes.IMAGE)
+
+ self.app.client_manager.auth_ref = mock.Mock(
+ project_id=self.project.id,
+ )
+
# Get the command object to test
self.cmd = image.SetImage(self.app, None)
@@ -845,6 +850,101 @@ class TestImageSet(TestImage):
self.assertIsNone(result)
+ self.image_members_mock.update.assert_not_called()
+
+ def test_image_set_membership_option_accept(self):
+ membership = image_fakes.FakeImage.create_one_image_member(
+ attrs={'image_id': image_fakes.image_id,
+ 'member_id': self.project.id}
+ )
+ self.image_members_mock.update.return_value = membership
+
+ arglist = [
+ '--accept',
+ image_fakes.image_id,
+ ]
+ verifylist = [
+ ('accept', True),
+ ('reject', False),
+ ('pending', False),
+ ('image', image_fakes.image_id)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+
+ self.image_members_mock.update.assert_called_once_with(
+ image_fakes.image_id,
+ self.app.client_manager.auth_ref.project_id,
+ 'accepted',
+ )
+
+ # Assert that the 'update image" route is also called, in addition to
+ # the 'update membership' route.
+ self.images_mock.update.assert_called_with(image_fakes.image_id)
+
+ def test_image_set_membership_option_reject(self):
+ membership = image_fakes.FakeImage.create_one_image_member(
+ attrs={'image_id': image_fakes.image_id,
+ 'member_id': self.project.id}
+ )
+ self.image_members_mock.update.return_value = membership
+
+ arglist = [
+ '--reject',
+ image_fakes.image_id,
+ ]
+ verifylist = [
+ ('accept', False),
+ ('reject', True),
+ ('pending', False),
+ ('image', image_fakes.image_id)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+
+ self.image_members_mock.update.assert_called_once_with(
+ image_fakes.image_id,
+ self.app.client_manager.auth_ref.project_id,
+ 'rejected',
+ )
+
+ # Assert that the 'update image" route is also called, in addition to
+ # the 'update membership' route.
+ self.images_mock.update.assert_called_with(image_fakes.image_id)
+
+ def test_image_set_membership_option_pending(self):
+ membership = image_fakes.FakeImage.create_one_image_member(
+ attrs={'image_id': image_fakes.image_id,
+ 'member_id': self.project.id}
+ )
+ self.image_members_mock.update.return_value = membership
+
+ arglist = [
+ '--pending',
+ image_fakes.image_id,
+ ]
+ verifylist = [
+ ('accept', False),
+ ('reject', False),
+ ('pending', True),
+ ('image', image_fakes.image_id)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+
+ self.image_members_mock.update.assert_called_once_with(
+ image_fakes.image_id,
+ self.app.client_manager.auth_ref.project_id,
+ 'pending',
+ )
+
+ # Assert that the 'update image" route is also called, in addition to
+ # the 'update membership' route.
+ self.images_mock.update.assert_called_with(image_fakes.image_id)
+
def test_image_set_options(self):
arglist = [
'--name', 'new-name',
diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py
index b931cb55..524285ab 100644
--- a/openstackclient/tests/unit/network/v2/fakes.py
+++ b/openstackclient/tests/unit/network/v2/fakes.py
@@ -14,6 +14,8 @@
import argparse
import copy
import mock
+from random import choice
+from random import randint
import uuid
from openstackclient.tests.unit import fakes
@@ -37,10 +39,20 @@ QUOTA = {
"l7policy": 5,
}
+RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
+RULE_TYPE_DSCP_MARKING = 'dscp-marking'
+RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
+VALID_QOS_RULES = [RULE_TYPE_BANDWIDTH_LIMIT,
+ RULE_TYPE_DSCP_MARKING,
+ RULE_TYPE_MINIMUM_BANDWIDTH]
+VALID_DSCP_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
+ 34, 36, 38, 40, 46, 48, 56]
+
class FakeNetworkV2Client(object):
def __init__(self, **kwargs):
+ self.session = mock.Mock()
self.extensions = mock.Mock()
self.extensions.resource_class = fakes.FakeResource(None, {})
@@ -553,6 +565,8 @@ class FakeNetworkAgent(object):
agent_attrs.update(attrs)
agent = fakes.FakeResource(info=copy.deepcopy(agent_attrs),
loaded=True)
+ agent.is_admin_state_up = agent_attrs['admin_state_up']
+ agent.is_alive = agent_attrs['alive']
return agent
@staticmethod
@@ -662,83 +676,90 @@ class FakeNetworkRBAC(object):
return mock.Mock(side_effect=rbac_policies)
-class FakeNetworkQosBandwidthLimitRule(object):
- """Fake one or more QoS bandwidth limit rules."""
+class FakeNetworkQosPolicy(object):
+ """Fake one or more QoS policies."""
@staticmethod
- def create_one_qos_bandwidth_limit_rule(attrs=None):
- """Create a fake QoS bandwidth limit rule.
+ def create_one_qos_policy(attrs=None):
+ """Create a fake QoS policy.
:param Dictionary attrs:
A dictionary with all attributes
:return:
- A FakeResource object with id, qos_policy_id, max_kbps and
- max_burst_kbps attributes.
+ A FakeResource object with name, id, etc.
"""
attrs = attrs or {}
+ qos_id = attrs.get('id') or 'qos-policy-id-' + uuid.uuid4().hex
+ rule_attrs = {'qos_policy_id': qos_id}
+ rules = [FakeNetworkQosRule.create_one_qos_rule(rule_attrs)]
# Set default attributes.
- qos_bandwidth_limit_rule_attrs = {
- 'id': 'qos-bandwidth-limit-rule-id-' + uuid.uuid4().hex,
- 'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex,
- 'max_kbps': 1500,
- 'max_burst_kbps': 1200,
+ qos_policy_attrs = {
+ 'name': 'qos-policy-name-' + uuid.uuid4().hex,
+ 'id': qos_id,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ 'shared': False,
+ 'description': 'qos-policy-description-' + uuid.uuid4().hex,
+ 'rules': rules,
}
# Overwrite default attributes.
- qos_bandwidth_limit_rule_attrs.update(attrs)
+ qos_policy_attrs.update(attrs)
- qos_bandwidth_limit_rule = fakes.FakeResource(
- info=copy.deepcopy(qos_bandwidth_limit_rule_attrs),
+ qos_policy = fakes.FakeResource(
+ info=copy.deepcopy(qos_policy_attrs),
loaded=True)
- return qos_bandwidth_limit_rule
+ # Set attributes with special mapping in OpenStack SDK.
+ qos_policy.is_shared = qos_policy_attrs['shared']
+ qos_policy.project_id = qos_policy_attrs['tenant_id']
+
+ return qos_policy
@staticmethod
- def create_qos_bandwidth_limit_rules(attrs=None, count=2):
- """Create multiple fake QoS bandwidth limit rules.
+ def create_qos_policies(attrs=None, count=2):
+ """Create multiple fake QoS policies.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
- The number of QoS bandwidth limit rules to fake
+ The number of QoS policies to fake
:return:
- A list of FakeResource objects faking the QoS bandwidth limit rules
+ A list of FakeResource objects faking the QoS policies
"""
qos_policies = []
for i in range(0, count):
- qos_policies.append(FakeNetworkQosBandwidthLimitRule.
- create_one_qos_bandwidth_limit_rule(attrs))
+ qos_policies.append(
+ FakeNetworkQosPolicy.create_one_qos_policy(attrs))
return qos_policies
@staticmethod
- def get_qos_bandwidth_limit_rules(qos_rules=None, count=2):
- """Get a list of faked QoS bandwidth limit rules.
+ def get_qos_policies(qos_policies=None, count=2):
+ """Get an iterable MagicMock object with a list of faked QoS policies.
- If QoS bandwidth limit rules list is provided, then initialize the
- Mock object with the list. Otherwise create one.
+ If qos policies list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
:param List address scopes:
- A list of FakeResource objects faking QoS bandwidth limit rules
+ A list of FakeResource objects faking qos policies
:param int count:
- The number of QoS bandwidth limit rules to fake
+ The number of QoS policies to fake
:return:
An iterable Mock object with side_effect set to a list of faked
- qos bandwidth limit rules
+ QoS policies
"""
- if qos_rules is None:
- qos_rules = (FakeNetworkQosBandwidthLimitRule.
- create_qos_bandwidth_limit_rules(count))
- return mock.Mock(side_effect=qos_rules)
+ if qos_policies is None:
+ qos_policies = FakeNetworkQosPolicy.create_qos_policies(count)
+ return mock.Mock(side_effect=qos_policies)
-class FakeNetworkQosPolicy(object):
- """Fake one or more QoS policies."""
+class FakeNetworkQosRule(object):
+ """Fake one or more Network QoS rules."""
@staticmethod
- def create_one_qos_policy(attrs=None):
- """Create a fake QoS policy.
+ def create_one_qos_rule(attrs=None):
+ """Create a fake Network QoS rule.
:param Dictionary attrs:
A dictionary with all attributes
@@ -746,71 +767,69 @@ class FakeNetworkQosPolicy(object):
A FakeResource object with name, id, etc.
"""
attrs = attrs or {}
- qos_id = attrs.get('id') or 'qos-policy-id-' + uuid.uuid4().hex
- rule_attrs = {'qos_policy_id': qos_id}
- rules = [
- FakeNetworkQosBandwidthLimitRule.
- create_one_qos_bandwidth_limit_rule(rule_attrs)]
# Set default attributes.
- qos_policy_attrs = {
- 'name': 'qos-policy-name-' + uuid.uuid4().hex,
- 'id': qos_id,
+ type = attrs.get('type') or choice(VALID_QOS_RULES)
+ qos_rule_attrs = {
+ 'id': 'qos-rule-id-' + uuid.uuid4().hex,
+ 'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex,
'tenant_id': 'project-id-' + uuid.uuid4().hex,
- 'shared': False,
- 'description': 'qos-policy-description-' + uuid.uuid4().hex,
- 'rules': rules,
+ 'type': type,
}
+ if type == RULE_TYPE_BANDWIDTH_LIMIT:
+ qos_rule_attrs['max_kbps'] = randint(1, 10000)
+ qos_rule_attrs['max_burst_kbits'] = randint(1, 10000)
+ elif type == RULE_TYPE_DSCP_MARKING:
+ qos_rule_attrs['dscp_mark'] = choice(VALID_DSCP_MARKS)
+ elif type == RULE_TYPE_MINIMUM_BANDWIDTH:
+ qos_rule_attrs['min_kbps'] = randint(1, 10000)
+ qos_rule_attrs['direction'] = 'egress'
# Overwrite default attributes.
- qos_policy_attrs.update(attrs)
+ qos_rule_attrs.update(attrs)
- qos_policy = fakes.FakeResource(
- info=copy.deepcopy(qos_policy_attrs),
- loaded=True)
+ qos_rule = fakes.FakeResource(info=copy.deepcopy(qos_rule_attrs),
+ loaded=True)
# Set attributes with special mapping in OpenStack SDK.
- qos_policy.is_shared = qos_policy_attrs['shared']
- qos_policy.project_id = qos_policy_attrs['tenant_id']
+ qos_rule.project_id = qos_rule['tenant_id']
- return qos_policy
+ return qos_rule
@staticmethod
- def create_qos_policies(attrs=None, count=2):
- """Create multiple fake QoS policies.
+ def create_qos_rules(attrs=None, count=2):
+ """Create multiple fake Network QoS rules.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
- The number of QoS policies to fake
+ The number of Network QoS rule to fake
:return:
- A list of FakeResource objects faking the QoS policies
+ A list of FakeResource objects faking the Network QoS rules
"""
- qos_policies = []
+ qos_rules = []
for i in range(0, count):
- qos_policies.append(
- FakeNetworkQosPolicy.create_one_qos_policy(attrs))
-
- return qos_policies
+ qos_rules.append(FakeNetworkQosRule.create_one_qos_rule(attrs))
+ return qos_rules
@staticmethod
- def get_qos_policies(qos_policies=None, count=2):
- """Get an iterable MagicMock object with a list of faked QoS policies.
+ def get_qos_rules(qos_rules=None, count=2):
+ """Get a list of faked Network QoS rules.
- If qos policies list is provided, then initialize the Mock object
- with the list. Otherwise create one.
+ If Network QoS rules list is provided, then initialize the Mock
+ object with the list. Otherwise create one.
:param List address scopes:
- A list of FakeResource objects faking qos policies
+ A list of FakeResource objects faking Network QoS rules
:param int count:
- The number of QoS policies to fake
+ The number of QoS minimum bandwidth rules to fake
:return:
An iterable Mock object with side_effect set to a list of faked
- QoS policies
+ qos minimum bandwidth rules
"""
- if qos_policies is None:
- qos_policies = FakeNetworkQosPolicy.create_qos_policies(count)
- return mock.Mock(side_effect=qos_policies)
+ if qos_rules is None:
+ qos_rules = (FakeNetworkQosRule.create_qos_rules(count))
+ return mock.Mock(side_effect=qos_rules)
class FakeNetworkQosRuleType(object):
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip.py b/openstackclient/tests/unit/network/v2/test_floating_ip.py
index 63d22bf8..e395300d 100644
--- a/openstackclient/tests/unit/network/v2/test_floating_ip.py
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip.py
@@ -208,13 +208,19 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
super(TestDeleteFloatingIPNetwork, self).setUp()
self.network.delete_ip = mock.Mock(return_value=None)
- self.network.find_ip = (
- network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
# Get the command object to test
self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace)
- def test_floating_ip_delete(self):
+ @mock.patch(
+ "openstackclient.tests.unit.network.v2.test_floating_ip." +
+ "floating_ip._find_floating_ip"
+ )
+ def test_floating_ip_delete(self, find_floating_ip_mock):
+ find_floating_ip_mock.side_effect = [
+ (self.floating_ips[0], []),
+ (self.floating_ips[1], []),
+ ]
arglist = [
self.floating_ips[0].id,
]
@@ -225,12 +231,24 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
result = self.cmd.take_action(parsed_args)
- self.network.find_ip.assert_called_once_with(
- self.floating_ips[0].id, ignore_missing=False)
+ find_floating_ip_mock.assert_called_once_with(
+ mock.ANY,
+ [],
+ self.floating_ips[0].id,
+ ignore_missing=False,
+ )
self.network.delete_ip.assert_called_once_with(self.floating_ips[0])
self.assertIsNone(result)
- def test_multi_floating_ips_delete(self):
+ @mock.patch(
+ "openstackclient.tests.unit.network.v2.test_floating_ip." +
+ "floating_ip._find_floating_ip"
+ )
+ def test_floating_ip_delete_multi(self, find_floating_ip_mock):
+ find_floating_ip_mock.side_effect = [
+ (self.floating_ips[0], []),
+ (self.floating_ips[1], []),
+ ]
arglist = []
verifylist = []
@@ -243,13 +261,37 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
result = self.cmd.take_action(parsed_args)
+ calls = [
+ call(
+ mock.ANY,
+ [],
+ self.floating_ips[0].id,
+ ignore_missing=False,
+ ),
+ call(
+ mock.ANY,
+ [],
+ self.floating_ips[1].id,
+ ignore_missing=False,
+ ),
+ ]
+ find_floating_ip_mock.assert_has_calls(calls)
+
calls = []
for f in self.floating_ips:
calls.append(call(f))
self.network.delete_ip.assert_has_calls(calls)
self.assertIsNone(result)
- def test_multi_floating_ips_delete_with_exception(self):
+ @mock.patch(
+ "openstackclient.tests.unit.network.v2.test_floating_ip." +
+ "floating_ip._find_floating_ip"
+ )
+ def test_floating_ip_delete_multi_exception(self, find_floating_ip_mock):
+ find_floating_ip_mock.side_effect = [
+ (self.floating_ips[0], []),
+ exceptions.CommandError,
+ ]
arglist = [
self.floating_ips[0].id,
'unexist_floating_ip',
@@ -260,21 +302,24 @@ class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- find_mock_result = [self.floating_ips[0], exceptions.CommandError]
- self.network.find_ip = (
- mock.Mock(side_effect=find_mock_result)
- )
-
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
- self.network.find_ip.assert_any_call(
- self.floating_ips[0].id, ignore_missing=False)
- self.network.find_ip.assert_any_call(
- 'unexist_floating_ip', ignore_missing=False)
+ find_floating_ip_mock.assert_any_call(
+ mock.ANY,
+ [],
+ self.floating_ips[0].id,
+ ignore_missing=False,
+ )
+ find_floating_ip_mock.assert_any_call(
+ mock.ANY,
+ [],
+ 'unexist_floating_ip',
+ ignore_missing=False,
+ )
self.network.delete_ip.assert_called_once_with(
self.floating_ips[0]
)
@@ -534,7 +579,12 @@ class TestShowFloatingIPNetwork(TestFloatingIPNetwork):
# Get the command object to test
self.cmd = floating_ip.ShowFloatingIP(self.app, self.namespace)
- def test_floating_ip_show(self):
+ @mock.patch(
+ "openstackclient.tests.unit.network.v2.test_floating_ip." +
+ "floating_ip._find_floating_ip"
+ )
+ def test_floating_ip_show(self, find_floating_ip_mock):
+ find_floating_ip_mock.return_value = (self.floating_ip, [])
arglist = [
self.floating_ip.id,
]
@@ -545,9 +595,11 @@ class TestShowFloatingIPNetwork(TestFloatingIPNetwork):
columns, data = self.cmd.take_action(parsed_args)
- self.network.find_ip.assert_called_once_with(
+ find_floating_ip_mock.assert_called_once_with(
+ mock.ANY,
+ [],
self.floating_ip.id,
- ignore_missing=False
+ ignore_missing=False,
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_network_agent.py b/openstackclient/tests/unit/network/v2/test_network_agent.py
index 9fd395b4..2fc0c043 100644
--- a/openstackclient/tests/unit/network/v2/test_network_agent.py
+++ b/openstackclient/tests/unit/network/v2/test_network_agent.py
@@ -195,6 +195,8 @@ class TestListNetworkAgent(TestNetworkAgent):
self.assertEqual(self.data, list(data))
+# TODO(huanxuan): Also update by the new attribute name
+# "is_admin_state_up" after sdk 0.9.12
class TestSetNetworkAgent(TestNetworkAgent):
_network_agent = (
@@ -324,6 +326,6 @@ class TestShowNetworkAgent(TestNetworkAgent):
columns, data = self.cmd.take_action(parsed_args)
self.network.get_agent.assert_called_once_with(
- self._network_agent.id, ignore_missing=False)
+ self._network_agent.id)
self.assertEqual(self.columns, columns)
self.assertEqual(list(self.data), list(data))
diff --git a/openstackclient/tests/unit/network/v2/test_network_qos_rule.py b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py
new file mode 100644
index 00000000..41ccae32
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py
@@ -0,0 +1,1049 @@
+# Copyright (c) 2016, Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import network_qos_rule
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
+RULE_TYPE_DSCP_MARKING = 'dscp-marking'
+RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
+DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
+ 34, 36, 38, 40, 46, 48, 56]
+
+
+class TestNetworkQosRule(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestNetworkQosRule, self).setUp()
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ self.qos_policy = (network_fakes.FakeNetworkQosPolicy.
+ create_one_qos_policy())
+ self.network.find_qos_policy = mock.Mock(return_value=self.qos_policy)
+
+
+class TestCreateNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def test_check_type_parameters(self):
+ pass
+
+ def setUp(self):
+ super(TestCreateNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.columns = (
+ 'direction',
+ 'id',
+ 'min_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+
+ self.data = (
+ self.new_rule.direction,
+ self.new_rule.id,
+ self.new_rule.min_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+ self.network.create_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--type', RULE_TYPE_MINIMUM_BANDWIDTH,
+ '--min-kbps', str(self.new_rule.min_kbps),
+ '--egress',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_MINIMUM_BANDWIDTH),
+ ('min_kbps', self.new_rule.min_kbps),
+ ('egress', True),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_qos_minimum_bandwidth_rule.assert_called_once_with(
+ self.qos_policy.id,
+ **{'min_kbps': self.new_rule.min_kbps,
+ 'direction': self.new_rule.direction}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_wrong_options(self):
+ arglist = [
+ '--type', RULE_TYPE_MINIMUM_BANDWIDTH,
+ '--max-kbps', '10000',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_MINIMUM_BANDWIDTH),
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('"Create" rule command for type "minimum-bandwidth" '
+ 'requires arguments min_kbps, direction')
+ self.assertEqual(msg, str(e))
+
+
+class TestCreateNetworkQosRuleDSCPMarking(TestNetworkQosRule):
+
+ def test_check_type_parameters(self):
+ pass
+
+ def setUp(self):
+ super(TestCreateNetworkQosRuleDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.columns = (
+ 'dscp_mark',
+ 'id',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+
+ self.data = (
+ self.new_rule.dscp_mark,
+ self.new_rule.id,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+ self.network.create_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--type', RULE_TYPE_DSCP_MARKING,
+ '--dscp-mark', str(self.new_rule.dscp_mark),
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_DSCP_MARKING),
+ ('dscp_mark', self.new_rule.dscp_mark),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_qos_dscp_marking_rule.assert_called_once_with(
+ self.qos_policy.id,
+ **{'dscp_mark': self.new_rule.dscp_mark}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_wrong_options(self):
+ arglist = [
+ '--type', RULE_TYPE_DSCP_MARKING,
+ '--max-kbps', '10000',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_DSCP_MARKING),
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('"Create" rule command for type "dscp-marking" '
+ 'requires arguments dscp_mark')
+ self.assertEqual(msg, str(e))
+
+
+class TestCreateNetworkQosRuleBandwidtLimit(TestNetworkQosRule):
+
+ def test_check_type_parameters(self):
+ pass
+
+ def setUp(self):
+ super(TestCreateNetworkQosRuleBandwidtLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.columns = (
+ 'id',
+ 'max_burst_kbits',
+ 'max_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+
+ self.data = (
+ self.new_rule.id,
+ self.new_rule.max_burst_kbits,
+ self.new_rule.max_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+ self.network.create_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--type', RULE_TYPE_BANDWIDTH_LIMIT,
+ '--max-kbps', str(self.new_rule.max_kbps),
+ '--max-burst-kbits', str(self.new_rule.max_burst_kbits),
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_BANDWIDTH_LIMIT),
+ ('max_kbps', self.new_rule.max_kbps),
+ ('max_burst_kbits', self.new_rule.max_burst_kbits),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_qos_bandwidth_limit_rule.assert_called_once_with(
+ self.qos_policy.id,
+ **{'max_kbps': self.new_rule.max_kbps,
+ 'max_burst_kbps': self.new_rule.max_burst_kbits}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_wrong_options(self):
+ arglist = [
+ '--type', RULE_TYPE_BANDWIDTH_LIMIT,
+ '--min-kbps', '10000',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_BANDWIDTH_LIMIT),
+ ('min_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('"Create" rule command for type "bandwidth-limit" '
+ 'requires arguments max_kbps, max_burst_kbps')
+ self.assertEqual(msg, str(e))
+
+
+class TestDeleteNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestDeleteNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.delete_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_minimum_bandwidth_rule = (
+ network_fakes.FakeNetworkQosRule.get_qos_rules(
+ qos_rules=self.new_rule)
+ )
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_policy_delete(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.network.delete_qos_minimum_bandwidth_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_qos_policy_delete_error(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ self.network.delete_qos_minimum_bandwidth_rule.side_effect = \
+ Exception('Error message')
+ try:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
+ {'rule': self.new_rule.id, 'e': 'Error message'})
+ self.assertEqual(msg, str(e))
+
+
+class TestDeleteNetworkQosRuleDSCPMarking(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestDeleteNetworkQosRuleDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.delete_qos_dscp_marking_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_dscp_marking_rule = (
+ network_fakes.FakeNetworkQosRule.get_qos_rules(
+ qos_rules=self.new_rule)
+ )
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_policy_delete(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.network.delete_qos_dscp_marking_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_qos_policy_delete_error(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ self.network.delete_qos_dscp_marking_rule.side_effect = \
+ Exception('Error message')
+ try:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
+ {'rule': self.new_rule.id, 'e': 'Error message'})
+ self.assertEqual(msg, str(e))
+
+
+class TestDeleteNetworkQosRuleBandwidthLimit(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestDeleteNetworkQosRuleBandwidthLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.delete_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_bandwidth_limit_rule = (
+ network_fakes.FakeNetworkQosRule.get_qos_rules(
+ qos_rules=self.new_rule)
+ )
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_policy_delete(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.network.delete_qos_bandwidth_limit_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_qos_policy_delete_error(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ self.network.delete_qos_bandwidth_limit_rule.side_effect = \
+ Exception('Error message')
+ try:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
+ {'rule': self.new_rule.id, 'e': 'Error message'})
+ self.assertEqual(msg, str(e))
+
+
+class TestSetNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestSetNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs=attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.update_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule)
+ self.network.find_qos_policy = mock.Mock(
+ return_value=self.qos_policy)
+
+ # Get the command object to test
+ self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
+ self.namespace))
+
+ def test_set_nothing(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_qos_minimum_bandwidth_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_set_min_kbps(self):
+ self._set_min_kbps()
+
+ def test_set_min_kbps_to_zero(self):
+ self._set_min_kbps(min_kbps=0)
+
+ def _set_min_kbps(self, min_kbps=None):
+ if min_kbps:
+ previous_min_kbps = self.new_rule.min_kbps
+ self.new_rule.min_kbps = min_kbps
+
+ arglist = [
+ '--min-kbps', str(self.new_rule.min_kbps),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('min_kbps', self.new_rule.min_kbps),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'min_kbps': self.new_rule.min_kbps,
+ }
+ self.network.update_qos_minimum_bandwidth_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ if min_kbps:
+ self.new_rule.min_kbps = previous_min_kbps
+
+ def test_set_wrong_options(self):
+ arglist = [
+ '--max-kbps', str(10000),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
+ '"minimum-bandwidth" only requires arguments min_kbps, '
+ 'direction' % {'rule': self.new_rule.id})
+ self.assertEqual(msg, str(e))
+
+
+class TestSetNetworkQosRuleDSCPMarking(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestSetNetworkQosRuleDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs=attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.update_qos_dscp_marking_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule)
+ self.network.find_qos_policy = mock.Mock(
+ return_value=self.qos_policy)
+
+ # Get the command object to test
+ self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
+ self.namespace))
+
+ def test_set_nothing(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_qos_dscp_marking_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_set_dscp_mark(self):
+ self._set_dscp_mark()
+
+ def test_set_dscp_mark_to_zero(self):
+ self._set_dscp_mark(dscp_mark=0)
+
+ def _set_dscp_mark(self, dscp_mark=None):
+ if dscp_mark:
+ previous_dscp_mark = self.new_rule.dscp_mark
+ self.new_rule.dscp_mark = dscp_mark
+
+ arglist = [
+ '--dscp-mark', str(self.new_rule.dscp_mark),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('dscp_mark', self.new_rule.dscp_mark),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'dscp_mark': self.new_rule.dscp_mark,
+ }
+ self.network.update_qos_dscp_marking_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ if dscp_mark:
+ self.new_rule.dscp_mark = previous_dscp_mark
+
+ def test_set_wrong_options(self):
+ arglist = [
+ '--max-kbps', str(10000),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
+ '"dscp-marking" only requires arguments dscp_mark' %
+ {'rule': self.new_rule.id})
+ self.assertEqual(msg, str(e))
+
+
+class TestSetNetworkQosRuleBandwidthLimit(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestSetNetworkQosRuleBandwidthLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs=attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.update_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule)
+ self.network.find_qos_policy = mock.Mock(
+ return_value=self.qos_policy)
+
+ # Get the command object to test
+ self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
+ self.namespace))
+
+ def test_set_nothing(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_qos_bandwidth_limit_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_set_max_kbps(self):
+ self._set_max_kbps()
+
+ def test_set_max_kbps_to_zero(self):
+ self._set_max_kbps(max_kbps=0)
+
+ def _set_max_kbps(self, max_kbps=None):
+ if max_kbps:
+ previous_max_kbps = self.new_rule.max_kbps
+ self.new_rule.max_kbps = max_kbps
+
+ arglist = [
+ '--max-kbps', str(self.new_rule.max_kbps),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_kbps', self.new_rule.max_kbps),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'max_kbps': self.new_rule.max_kbps,
+ }
+ self.network.update_qos_bandwidth_limit_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ if max_kbps:
+ self.new_rule.max_kbps = previous_max_kbps
+
+ def test_set_max_burst_kbits(self):
+ self._set_max_burst_kbits()
+
+ def test_set_max_burst_kbits_to_zero(self):
+ self._set_max_burst_kbits(max_burst_kbits=0)
+
+ def _set_max_burst_kbits(self, max_burst_kbits=None):
+ if max_burst_kbits:
+ previous_max_burst_kbits = self.new_rule.max_burst_kbits
+ self.new_rule.max_burst_kbits = max_burst_kbits
+
+ arglist = [
+ '--max-burst-kbits', str(self.new_rule.max_burst_kbits),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_burst_kbits', self.new_rule.max_burst_kbits),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'max_burst_kbps': self.new_rule.max_burst_kbits,
+ }
+ self.network.update_qos_bandwidth_limit_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ if max_burst_kbits:
+ self.new_rule.max_burst_kbits = previous_max_burst_kbits
+
+ def test_set_wrong_options(self):
+ arglist = [
+ '--min-kbps', str(10000),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('min_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
+ '"bandwidth-limit" only requires arguments max_kbps, '
+ 'max_burst_kbps' % {'rule': self.new_rule.id})
+ self.assertEqual(msg, str(e))
+
+
+class TestListNetworkQosRule(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestListNetworkQosRule, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule_min_bw = (network_fakes.FakeNetworkQosRule.
+ create_one_qos_rule(attrs=attrs))
+ attrs['type'] = RULE_TYPE_DSCP_MARKING
+ self.new_rule_dscp_mark = (network_fakes.FakeNetworkQosRule.
+ create_one_qos_rule(attrs=attrs))
+ attrs['type'] = RULE_TYPE_BANDWIDTH_LIMIT
+ self.new_rule_max_bw = (network_fakes.FakeNetworkQosRule.
+ create_one_qos_rule(attrs=attrs))
+ self.qos_policy.rules = [self.new_rule_min_bw,
+ self.new_rule_dscp_mark,
+ self.new_rule_max_bw]
+ self.network.find_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule_min_bw)
+ self.network.find_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule_dscp_mark)
+ self.network.find_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule_max_bw)
+ self.columns = (
+ 'ID',
+ 'QoS Policy ID',
+ 'Type',
+ 'Max Kbps',
+ 'Max Burst Kbits',
+ 'Min Kbps',
+ 'DSCP mark',
+ 'Direction',
+ )
+ self.data = []
+ for index in range(len(self.qos_policy.rules)):
+ self.data.append((
+ self.qos_policy.rules[index].id,
+ self.qos_policy.rules[index].qos_policy_id,
+ self.qos_policy.rules[index].type,
+ getattr(self.qos_policy.rules[index], 'max_kbps', ''),
+ getattr(self.qos_policy.rules[index], 'max_burst_kbps', ''),
+ getattr(self.qos_policy.rules[index], 'min_kbps', ''),
+ getattr(self.qos_policy.rules[index], 'dscp_mark', ''),
+ getattr(self.qos_policy.rules[index], 'direction', ''),
+ ))
+ # Get the command object to test
+ self.cmd = network_qos_rule.ListNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_rule_list(self):
+ arglist = [
+ self.qos_policy.id
+ ]
+ verifylist = [
+ ('qos_policy', self.qos_policy.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ list_data = list(data)
+ self.assertEqual(len(self.data), len(list_data))
+ for index in range(len(list_data)):
+ self.assertEqual(self.data[index], list_data[index])
+
+
+class TestShowNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestShowNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.columns = (
+ 'direction',
+ 'id',
+ 'min_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+ self.data = (
+ self.new_rule.direction,
+ self.new_rule.id,
+ self.new_rule.min_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+
+ self.network.get_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_qos_minimum_bandwidth_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
+
+
+class TestShowNetworkQosDSCPMarking(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestShowNetworkQosDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.columns = (
+ 'dscp_mark',
+ 'id',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+ self.data = (
+ self.new_rule.dscp_mark,
+ self.new_rule.id,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+
+ self.network.get_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_qos_dscp_marking_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
+
+
+class TestShowNetworkQosBandwidthLimit(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestShowNetworkQosBandwidthLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.columns = (
+ 'id',
+ 'max_burst_kbits',
+ 'max_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+ self.data = (
+ self.new_rule.id,
+ self.new_rule.max_burst_kbits,
+ self.new_rule.max_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+
+ self.network.get_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_qos_bandwidth_limit_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
diff --git a/openstackclient/tests/unit/volume/v2/fakes.py b/openstackclient/tests/unit/volume/v2/fakes.py
index d5cd72ec..a6676403 100644
--- a/openstackclient/tests/unit/volume/v2/fakes.py
+++ b/openstackclient/tests/unit/volume/v2/fakes.py
@@ -903,3 +903,23 @@ class FakeType(object):
volume_types.append(volume_type)
return volume_types
+
+ @staticmethod
+ def get_types(types=None, count=2):
+ """Get an iterable MagicMock object with a list of faked types.
+
+ If types list is provided, then initialize the Mock object with the
+ list. Otherwise create one.
+
+ :param List types:
+ A list of FakeResource objects faking types
+ :param Integer count:
+ The number of types to be faked
+ :return
+ An iterable Mock object with side_effect set to a list of faked
+ types
+ """
+ if types is None:
+ types = FakeType.create_types(count)
+
+ return mock.Mock(side_effect=types)
diff --git a/openstackclient/tests/unit/volume/v2/test_consistency_group.py b/openstackclient/tests/unit/volume/v2/test_consistency_group.py
index bc99ca8d..6eeeae39 100644
--- a/openstackclient/tests/unit/volume/v2/test_consistency_group.py
+++ b/openstackclient/tests/unit/volume/v2/test_consistency_group.py
@@ -36,10 +36,115 @@ class TestConsistencyGroup(volume_fakes.TestVolume):
self.app.client_manager.volume.cgsnapshots)
self.cgsnapshots_mock.reset_mock()
+ self.volumes_mock = (
+ self.app.client_manager.volume.volumes)
+ self.volumes_mock.reset_mock()
+
self.types_mock = self.app.client_manager.volume.volume_types
self.types_mock.reset_mock()
+class TestConsistencyGroupAddVolume(TestConsistencyGroup):
+
+ _consistency_group = (
+ volume_fakes.FakeConsistencyGroup.create_one_consistency_group())
+
+ def setUp(self):
+ super(TestConsistencyGroupAddVolume, self).setUp()
+
+ self.consistencygroups_mock.get.return_value = (
+ self._consistency_group)
+ # Get the command object to test
+ self.cmd = \
+ consistency_group.AddVolumeToConsistencyGroup(self.app, None)
+
+ def test_add_one_volume_to_consistency_group(self):
+ volume = volume_fakes.FakeVolume.create_one_volume()
+ self.volumes_mock.get.return_value = volume
+ arglist = [
+ self._consistency_group.id,
+ volume.id,
+ ]
+ verifylist = [
+ ('consistency_group', self._consistency_group.id),
+ ('volumes', [volume.id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ # Set expected values
+ kwargs = {
+ 'add_volumes': volume.id,
+ }
+ self.consistencygroups_mock.update.assert_called_once_with(
+ self._consistency_group.id,
+ **kwargs
+ )
+ self.assertIsNone(result)
+
+ def test_add_multiple_volumes_to_consistency_group(self):
+ volumes = volume_fakes.FakeVolume.create_volumes(count=2)
+ self.volumes_mock.get = volume_fakes.FakeVolume.get_volumes(volumes)
+ arglist = [
+ self._consistency_group.id,
+ volumes[0].id,
+ volumes[1].id,
+ ]
+ verifylist = [
+ ('consistency_group', self._consistency_group.id),
+ ('volumes', [volumes[0].id, volumes[1].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ # Set expected values
+ kwargs = {
+ 'add_volumes': volumes[0].id + ',' + volumes[1].id,
+ }
+ self.consistencygroups_mock.update.assert_called_once_with(
+ self._consistency_group.id,
+ **kwargs
+ )
+ self.assertIsNone(result)
+
+ @mock.patch.object(consistency_group.LOG, 'error')
+ def test_add_multiple_volumes_to_consistency_group_with_exception(
+ self, mock_error):
+ volume = volume_fakes.FakeVolume.create_one_volume()
+ arglist = [
+ self._consistency_group.id,
+ volume.id,
+ 'unexist_volume',
+ ]
+ verifylist = [
+ ('consistency_group', self._consistency_group.id),
+ ('volumes', [volume.id, 'unexist_volume']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [volume,
+ exceptions.CommandError,
+ self._consistency_group]
+ with mock.patch.object(utils, 'find_resource',
+ side_effect=find_mock_result) as find_mock:
+ result = self.cmd.take_action(parsed_args)
+ mock_error.assert_called_with("1 of 2 volumes failed to add.")
+ self.assertIsNone(result)
+ find_mock.assert_any_call(self.consistencygroups_mock,
+ self._consistency_group.id)
+ find_mock.assert_any_call(self.volumes_mock,
+ volume.id)
+ find_mock.assert_any_call(self.volumes_mock,
+ 'unexist_volume')
+ self.assertEqual(3, find_mock.call_count)
+ self.consistencygroups_mock.update.assert_called_once_with(
+ self._consistency_group.id, add_volumes=volume.id
+ )
+
+
class TestConsistencyGroupCreate(TestConsistencyGroup):
volume_type = volume_fakes.FakeType.create_one_type()
@@ -394,6 +499,107 @@ class TestConsistencyGroupList(TestConsistencyGroup):
self.assertEqual(self.data_long, list(data))
+class TestConsistencyGroupRemoveVolume(TestConsistencyGroup):
+
+ _consistency_group = (
+ volume_fakes.FakeConsistencyGroup.create_one_consistency_group())
+
+ def setUp(self):
+ super(TestConsistencyGroupRemoveVolume, self).setUp()
+
+ self.consistencygroups_mock.get.return_value = (
+ self._consistency_group)
+ # Get the command object to test
+ self.cmd = \
+ consistency_group.RemoveVolumeFromConsistencyGroup(self.app, None)
+
+ def test_remove_one_volume_from_consistency_group(self):
+ volume = volume_fakes.FakeVolume.create_one_volume()
+ self.volumes_mock.get.return_value = volume
+ arglist = [
+ self._consistency_group.id,
+ volume.id,
+ ]
+ verifylist = [
+ ('consistency_group', self._consistency_group.id),
+ ('volumes', [volume.id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ # Set expected values
+ kwargs = {
+ 'remove_volumes': volume.id,
+ }
+ self.consistencygroups_mock.update.assert_called_once_with(
+ self._consistency_group.id,
+ **kwargs
+ )
+ self.assertIsNone(result)
+
+ def test_remove_multi_volumes_from_consistency_group(self):
+ volumes = volume_fakes.FakeVolume.create_volumes(count=2)
+ self.volumes_mock.get = volume_fakes.FakeVolume.get_volumes(volumes)
+ arglist = [
+ self._consistency_group.id,
+ volumes[0].id,
+ volumes[1].id,
+ ]
+ verifylist = [
+ ('consistency_group', self._consistency_group.id),
+ ('volumes', [volumes[0].id, volumes[1].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ # Set expected values
+ kwargs = {
+ 'remove_volumes': volumes[0].id + ',' + volumes[1].id,
+ }
+ self.consistencygroups_mock.update.assert_called_once_with(
+ self._consistency_group.id,
+ **kwargs
+ )
+ self.assertIsNone(result)
+
+ @mock.patch.object(consistency_group.LOG, 'error')
+ def test_remove_multiple_volumes_from_consistency_group_with_exception(
+ self, mock_error):
+ volume = volume_fakes.FakeVolume.create_one_volume()
+ arglist = [
+ self._consistency_group.id,
+ volume.id,
+ 'unexist_volume',
+ ]
+ verifylist = [
+ ('consistency_group', self._consistency_group.id),
+ ('volumes', [volume.id, 'unexist_volume']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [volume,
+ exceptions.CommandError,
+ self._consistency_group]
+ with mock.patch.object(utils, 'find_resource',
+ side_effect=find_mock_result) as find_mock:
+ result = self.cmd.take_action(parsed_args)
+ mock_error.assert_called_with("1 of 2 volumes failed to remove.")
+ self.assertIsNone(result)
+ find_mock.assert_any_call(self.consistencygroups_mock,
+ self._consistency_group.id)
+ find_mock.assert_any_call(self.volumes_mock,
+ volume.id)
+ find_mock.assert_any_call(self.volumes_mock,
+ 'unexist_volume')
+ self.assertEqual(3, find_mock.call_count)
+ self.consistencygroups_mock.update.assert_called_once_with(
+ self._consistency_group.id, remove_volumes=volume.id
+ )
+
+
class TestConsistencyGroupSet(TestConsistencyGroup):
consistency_group = (
diff --git a/openstackclient/tests/unit/volume/v2/test_type.py b/openstackclient/tests/unit/volume/v2/test_type.py
index 0d556e13..cec01bd8 100644
--- a/openstackclient/tests/unit/volume/v2/test_type.py
+++ b/openstackclient/tests/unit/volume/v2/test_type.py
@@ -13,6 +13,7 @@
#
import mock
+from mock import call
from osc_lib import exceptions
from osc_lib import utils
@@ -133,12 +134,13 @@ class TestTypeCreate(TestType):
class TestTypeDelete(TestType):
- volume_type = volume_fakes.FakeType.create_one_type()
+ volume_types = volume_fakes.FakeType.create_types(count=2)
def setUp(self):
super(TestTypeDelete, self).setUp()
- self.types_mock.get.return_value = self.volume_type
+ self.types_mock.get = volume_fakes.FakeType.get_types(
+ self.volume_types)
self.types_mock.delete.return_value = None
# Get the command object to mock
@@ -146,18 +148,64 @@ class TestTypeDelete(TestType):
def test_type_delete(self):
arglist = [
- self.volume_type.id
+ self.volume_types[0].id
]
verifylist = [
- ("volume_types", [self.volume_type.id])
+ ("volume_types", [self.volume_types[0].id])
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.types_mock.delete.assert_called_with(self.volume_type)
+ self.types_mock.delete.assert_called_with(self.volume_types[0])
self.assertIsNone(result)
+ def test_delete_multiple_types(self):
+ arglist = []
+ for t in self.volume_types:
+ arglist.append(t.id)
+ verifylist = [
+ ('volume_types', arglist),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for t in self.volume_types:
+ calls.append(call(t))
+ self.types_mock.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_delete_multiple_types_with_exception(self):
+ arglist = [
+ self.volume_types[0].id,
+ 'unexist_type',
+ ]
+ verifylist = [
+ ('volume_types', arglist),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.volume_types[0], exceptions.CommandError]
+ with mock.patch.object(utils, 'find_resource',
+ side_effect=find_mock_result) as find_mock:
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 volume types failed to delete.',
+ str(e))
+ find_mock.assert_any_call(
+ self.types_mock, self.volume_types[0].id)
+ find_mock.assert_any_call(self.types_mock, 'unexist_type')
+
+ self.assertEqual(2, find_mock.call_count)
+ self.types_mock.delete.assert_called_once_with(
+ self.volume_types[0]
+ )
+
class TestTypeList(TestType):
diff --git a/openstackclient/tests/unit/volume/v2/test_volume.py b/openstackclient/tests/unit/volume/v2/test_volume.py
index 1529c2e2..4fef9dd9 100644
--- a/openstackclient/tests/unit/volume/v2/test_volume.py
+++ b/openstackclient/tests/unit/volume/v2/test_volume.py
@@ -823,6 +823,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': False,
+ 'project_id': None,
+ 'user_id': None,
+ 'display_name': None,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
@@ -853,6 +866,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': True,
+ 'project_id': self.project.id,
+ 'user_id': None,
+ 'display_name': None,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
@@ -885,6 +911,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': True,
+ 'project_id': self.project.id,
+ 'user_id': None,
+ 'display_name': None,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
@@ -915,6 +954,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': False,
+ 'project_id': None,
+ 'user_id': self.user.id,
+ 'display_name': None,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
device = self.mock_volume.attachments[0]['device']
@@ -946,6 +998,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': False,
+ 'project_id': None,
+ 'user_id': self.user.id,
+ 'display_name': None,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
@@ -976,6 +1041,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': False,
+ 'project_id': None,
+ 'user_id': None,
+ 'display_name': self.mock_volume.name,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
@@ -1006,6 +1084,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': False,
+ 'project_id': None,
+ 'user_id': None,
+ 'display_name': None,
+ 'status': self.mock_volume.status,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
@@ -1036,6 +1127,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': True,
+ 'project_id': None,
+ 'user_id': None,
+ 'display_name': None,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
self.assertEqual(self.columns, columns)
server = self.mock_volume.attachments[0]['server_id']
@@ -1067,6 +1171,19 @@ class TestVolumeList(TestVolume):
columns, data = self.cmd.take_action(parsed_args)
+ search_opts = {
+ 'all_tenants': False,
+ 'project_id': None,
+ 'user_id': None,
+ 'display_name': None,
+ 'status': None,
+ }
+ self.volumes_mock.list.assert_called_once_with(
+ search_opts=search_opts,
+ marker=None,
+ limit=None,
+ )
+
collist = [
'ID',
'Display Name',