summaryrefslogtreecommitdiff
path: root/openstackclient/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient/tests/unit')
-rw-r--r--openstackclient/tests/unit/common/test_project_cleanup.py26
-rw-r--r--openstackclient/tests/unit/compute/v2/fakes.py329
-rw-r--r--openstackclient/tests/unit/compute/v2/test_host.py105
-rw-r--r--openstackclient/tests/unit/compute/v2/test_server.py3
-rw-r--r--openstackclient/tests/unit/compute/v2/test_server_migration.py187
-rw-r--r--openstackclient/tests/unit/compute/v2/test_server_volume.py186
-rw-r--r--openstackclient/tests/unit/image/v2/test_image.py25
-rw-r--r--openstackclient/tests/unit/network/v2/fakes.py107
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip_port_forwarding.py231
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_qos_policy.py2
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_qos_rule_type.py34
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_trunk.py851
-rw-r--r--openstackclient/tests/unit/volume/v1/test_volume.py51
-rw-r--r--openstackclient/tests/unit/volume/v2/test_consistency_group.py4
-rw-r--r--openstackclient/tests/unit/volume/v2/test_volume.py53
-rw-r--r--openstackclient/tests/unit/volume/v3/fakes.py89
-rw-r--r--openstackclient/tests/unit/volume/v3/test_block_storage_cleanup.py178
-rw-r--r--openstackclient/tests/unit/volume/v3/test_block_storage_log_level.py233
-rw-r--r--openstackclient/tests/unit/volume/v3/test_block_storage_manage.py411
-rw-r--r--openstackclient/tests/unit/volume/v3/test_volume.py179
-rw-r--r--openstackclient/tests/unit/volume/v3/test_volume_group.py180
21 files changed, 2991 insertions, 473 deletions
diff --git a/openstackclient/tests/unit/common/test_project_cleanup.py b/openstackclient/tests/unit/common/test_project_cleanup.py
index d235aeb0..50c434b9 100644
--- a/openstackclient/tests/unit/common/test_project_cleanup.py
+++ b/openstackclient/tests/unit/common/test_project_cleanup.py
@@ -85,6 +85,32 @@ class TestProjectCleanup(TestProjectCleanupBase):
self.assertIsNone(result)
+ def test_project_cleanup_with_auto_approve(self):
+ arglist = [
+ '--project', self.project.id,
+ '--auto-approve',
+ ]
+ verifylist = [
+ ('dry_run', False),
+ ('auth_project', False),
+ ('project', self.project.id),
+ ('auto_approve', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = None
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.sdk_connect_as_project_mock.assert_called_with(
+ self.project)
+ calls = [
+ mock.call(dry_run=True, status_queue=mock.ANY, filters={}),
+ mock.call(dry_run=False, status_queue=mock.ANY, filters={})
+ ]
+ self.project_cleanup_mock.assert_has_calls(calls)
+
+ self.assertIsNone(result)
+
def test_project_cleanup_with_project(self):
arglist = [
'--project', self.project.id,
diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py
index b2702128..f7f07509 100644
--- a/openstackclient/tests/unit/compute/v2/fakes.py
+++ b/openstackclient/tests/unit/compute/v2/fakes.py
@@ -21,9 +21,11 @@ import uuid
from novaclient import api_versions
from openstack.compute.v2 import flavor as _flavor
from openstack.compute.v2 import hypervisor as _hypervisor
+from openstack.compute.v2 import migration as _migration
from openstack.compute.v2 import server as _server
from openstack.compute.v2 import server_group as _server_group
from openstack.compute.v2 import server_interface as _server_interface
+from openstack.compute.v2 import server_migration as _server_migration
from openstack.compute.v2 import service
from openstack.compute.v2 import volume_attachment
@@ -1433,242 +1435,155 @@ class FakeRateLimit(object):
self.next_available = next_available
-class FakeMigration(object):
- """Fake one or more migrations."""
+def create_one_migration(attrs=None):
+ """Create a fake migration.
- @staticmethod
- def create_one_migration(attrs=None, methods=None):
- """Create a fake migration.
-
- :param dict attrs:
- A dictionary with all attributes
- :param dict methods:
- A dictionary with all methods
- :return:
- A FakeResource object, with id, type, and so on
- """
- attrs = attrs or {}
- methods = methods or {}
+ :param dict attrs: A dictionary with all attributes
+ :return: A fake openstack.compute.v2.migration.Migration object
+ """
+ attrs = attrs or {}
- # Set default attributes.
- migration_info = {
- "dest_host": "10.0.2.15",
- "status": "migrating",
- "migration_type": "migration",
- "updated_at": "2017-01-31T08:03:25.000000",
- "created_at": "2017-01-31T08:03:21.000000",
- "dest_compute": "compute-" + uuid.uuid4().hex,
- "id": random.randint(1, 999),
- "source_node": "node-" + uuid.uuid4().hex,
- "instance_uuid": uuid.uuid4().hex,
- "dest_node": "node-" + uuid.uuid4().hex,
- "source_compute": "compute-" + uuid.uuid4().hex,
- "uuid": uuid.uuid4().hex,
- "old_instance_type_id": uuid.uuid4().hex,
- "new_instance_type_id": uuid.uuid4().hex,
- "project_id": uuid.uuid4().hex,
- "user_id": uuid.uuid4().hex
- }
+ # Set default attributes.
+ migration_info = {
+ "created_at": "2017-01-31T08:03:21.000000",
+ "dest_compute": "compute-" + uuid.uuid4().hex,
+ "dest_host": "10.0.2.15",
+ "dest_node": "node-" + uuid.uuid4().hex,
+ "id": random.randint(1, 999),
+ "migration_type": "migration",
+ "new_flavor_id": uuid.uuid4().hex,
+ "old_flavor_id": uuid.uuid4().hex,
+ "project_id": uuid.uuid4().hex,
+ "server_id": uuid.uuid4().hex,
+ "source_compute": "compute-" + uuid.uuid4().hex,
+ "source_node": "node-" + uuid.uuid4().hex,
+ "status": "migrating",
+ "updated_at": "2017-01-31T08:03:25.000000",
+ "user_id": uuid.uuid4().hex,
+ "uuid": uuid.uuid4().hex,
+ }
- # Overwrite default attributes.
- migration_info.update(attrs)
+ # Overwrite default attributes.
+ migration_info.update(attrs)
- migration = fakes.FakeResource(info=copy.deepcopy(migration_info),
- methods=methods,
- loaded=True)
- return migration
+ migration = _migration.Migration(**migration_info)
+ return migration
- @staticmethod
- def create_migrations(attrs=None, methods=None, count=2):
- """Create multiple fake migrations.
- :param dict attrs:
- A dictionary with all attributes
- :param dict methods:
- A dictionary with all methods
- :param int count:
- The number of migrations to fake
- :return:
- A list of FakeResource objects faking the migrations
- """
- migrations = []
- for i in range(0, count):
- migrations.append(
- FakeMigration.create_one_migration(
- attrs, methods))
+def create_migrations(attrs=None, count=2):
+ """Create multiple fake migrations.
- return migrations
+ :param dict attrs: A dictionary with all attributes
+ :param int count: The number of migrations to fake
+ :return: A list of fake openstack.compute.v2.migration.Migration objects
+ """
+ migrations = []
+ for i in range(0, count):
+ migrations.append(create_one_migration(attrs))
+ return migrations
-class FakeServerMigration(object):
- """Fake one or more server migrations."""
- @staticmethod
- def create_one_server_migration(attrs=None, methods=None):
- """Create a fake server migration.
-
- :param dict attrs:
- A dictionary with all attributes
- :param dict methods:
- A dictionary with all methods
- :return:
- A FakeResource object, with id, type, and so on
- """
- attrs = attrs or {}
- methods = methods or {}
+def create_one_server_migration(attrs=None):
+ """Create a fake server migration.
- # Set default attributes.
+ :param dict attrs: A dictionary with all attributes
+ :return A fake openstack.compute.v2.server_migration.ServerMigration object
+ """
+ attrs = attrs or {}
- migration_info = {
- "created_at": "2016-01-29T13:42:02.000000",
- "dest_compute": "compute2",
- "dest_host": "1.2.3.4",
- "dest_node": "node2",
- "id": random.randint(1, 999),
- "server_uuid": uuid.uuid4().hex,
- "source_compute": "compute1",
- "source_node": "node1",
- "status": "running",
- "memory_total_bytes": random.randint(1, 99999),
- "memory_processed_bytes": random.randint(1, 99999),
- "memory_remaining_bytes": random.randint(1, 99999),
- "disk_total_bytes": random.randint(1, 99999),
- "disk_processed_bytes": random.randint(1, 99999),
- "disk_remaining_bytes": random.randint(1, 99999),
- "updated_at": "2016-01-29T13:42:02.000000",
- # added in 2.59
- "uuid": uuid.uuid4().hex,
- # added in 2.80
- "user_id": uuid.uuid4().hex,
- "project_id": uuid.uuid4().hex,
- }
+ # Set default attributes.
- # Overwrite default attributes.
- migration_info.update(attrs)
+ migration_info = {
+ "created_at": "2016-01-29T13:42:02.000000",
+ "dest_compute": "compute2",
+ "dest_host": "1.2.3.4",
+ "dest_node": "node2",
+ "id": random.randint(1, 999),
+ "server_uuid": uuid.uuid4().hex,
+ "source_compute": "compute1",
+ "source_node": "node1",
+ "status": "running",
+ "memory_total_bytes": random.randint(1, 99999),
+ "memory_processed_bytes": random.randint(1, 99999),
+ "memory_remaining_bytes": random.randint(1, 99999),
+ "disk_total_bytes": random.randint(1, 99999),
+ "disk_processed_bytes": random.randint(1, 99999),
+ "disk_remaining_bytes": random.randint(1, 99999),
+ "updated_at": "2016-01-29T13:42:02.000000",
+ # added in 2.59
+ "uuid": uuid.uuid4().hex,
+ # added in 2.80
+ "user_id": uuid.uuid4().hex,
+ "project_id": uuid.uuid4().hex,
+ }
- migration = fakes.FakeResource(
- info=copy.deepcopy(migration_info),
- methods=methods,
- loaded=True)
- return migration
+ # Overwrite default attributes.
+ migration_info.update(attrs)
+ migration = _server_migration.ServerMigration(**migration_info)
+ return migration
-class FakeVolumeAttachment(object):
- """Fake one or more volume attachments (BDMs)."""
- @staticmethod
- def create_one_volume_attachment(attrs=None, methods=None):
- """Create a fake volume attachment.
+def create_server_migrations(attrs=None, methods=None, count=2):
+ """Create multiple server migrations.
- :param dict attrs:
- A dictionary with all attributes
- :param dict methods:
- A dictionary with all methods
- :return:
- A FakeResource object, with id, device, and so on
- """
- attrs = attrs or {}
- methods = methods or {}
-
- # Set default attributes.
- volume_attachment_info = {
- "id": uuid.uuid4().hex,
- "device": "/dev/sdb",
- "serverId": uuid.uuid4().hex,
- "volumeId": uuid.uuid4().hex,
- # introduced in API microversion 2.70
- "tag": "foo",
- # introduced in API microversion 2.79
- "delete_on_termination": True,
- # introduced in API microversion 2.89
- "attachment_id": uuid.uuid4().hex,
- "bdm_uuid": uuid.uuid4().hex
- }
-
- # Overwrite default attributes.
- volume_attachment_info.update(attrs)
-
- volume_attachment = fakes.FakeResource(
- info=copy.deepcopy(volume_attachment_info),
- methods=methods,
- loaded=True)
- return volume_attachment
+ :param dict attrs: A dictionary with all attributes
+ :param int count: The number of server migrations to fake
+ :return A list of fake
+ openstack.compute.v2.server_migration.ServerMigration objects
+ """
+ migrations = []
+ for i in range(0, count):
+ migrations.append(
+ create_one_server_migration(attrs, methods))
- @staticmethod
- def create_volume_attachments(attrs=None, methods=None, count=2):
- """Create multiple fake volume attachments (BDMs).
+ return migrations
- :param dict attrs:
- A dictionary with all attributes
- :param dict methods:
- A dictionary with all methods
- :param int count:
- The number of volume attachments to fake
- :return:
- A list of FakeResource objects faking the volume attachments.
- """
- volume_attachments = []
- for i in range(0, count):
- volume_attachments.append(
- FakeVolumeAttachment.create_one_volume_attachment(
- attrs, methods))
- return volume_attachments
+def create_one_volume_attachment(attrs=None):
+ """Create a fake volume attachment.
- @staticmethod
- def create_one_sdk_volume_attachment(attrs=None, methods=None):
- """Create a fake sdk VolumeAttachment.
+ :param dict attrs: A dictionary with all attributes
+ :return: A fake openstack.compute.v2.volume_attachment.VolumeAttachment
+ object
+ """
+ attrs = attrs or {}
- :param dict attrs:
- A dictionary with all attributes
- :param dict methods:
- A dictionary with all methods
- :return:
- A fake VolumeAttachment object, with id, device, and so on
- """
- attrs = attrs or {}
- methods = methods or {}
+ # Set default attributes.
+ volume_attachment_info = {
+ "id": uuid.uuid4().hex,
+ "device": "/dev/sdb",
+ "server_id": uuid.uuid4().hex,
+ "volume_id": uuid.uuid4().hex,
+ # introduced in API microversion 2.70
+ "tag": "foo",
+ # introduced in API microversion 2.79
+ "delete_on_termination": True,
+ # introduced in API microversion 2.89
+ "attachment_id": uuid.uuid4().hex,
+ "bdm_id": uuid.uuid4().hex,
+ }
- # Set default attributes.
- volume_attachment_info = {
- "id": uuid.uuid4().hex,
- "device": "/dev/sdb",
- "server_id": uuid.uuid4().hex,
- "volume_id": uuid.uuid4().hex,
- # introduced in API microversion 2.70
- "tag": "foo",
- # introduced in API microversion 2.79
- "delete_on_termination": True,
- # introduced in API microversion 2.89
- "attachment_id": uuid.uuid4().hex,
- "bdm_uuid": uuid.uuid4().hex
- }
+ # Overwrite default attributes.
+ volume_attachment_info.update(attrs)
- # Overwrite default attributes.
- volume_attachment_info.update(attrs)
+ return volume_attachment.VolumeAttachment(**volume_attachment_info)
- return volume_attachment.VolumeAttachment(**volume_attachment_info)
- @staticmethod
- def create_sdk_volume_attachments(attrs=None, methods=None, count=2):
- """Create multiple fake VolumeAttachment objects (BDMs).
+def create_volume_attachments(attrs=None, count=2):
+ """Create multiple fake volume attachments.
- :param dict attrs:
- A dictionary with all attributes
- :param dict methods:
- A dictionary with all methods
- :param int count:
- The number of volume attachments to fake
- :return:
- A list of VolumeAttachment objects faking the volume attachments.
- """
- volume_attachments = []
- for i in range(0, count):
- volume_attachments.append(
- FakeVolumeAttachment.create_one_sdk_volume_attachment(
- attrs, methods))
+ :param dict attrs: A dictionary with all attributes
+ :param int count: The number of volume attachments to fake
+ :return: A list of fake
+ openstack.compute.v2.volume_attachment.VolumeAttachment objects
+ """
+ volume_attachments = []
+ for i in range(0, count):
+ volume_attachments.append(create_one_volume_attachment(attrs))
- return volume_attachments
+ return volume_attachments
def create_one_hypervisor(attrs=None):
diff --git a/openstackclient/tests/unit/compute/v2/test_host.py b/openstackclient/tests/unit/compute/v2/test_host.py
index 4e1b5ad1..ec91b37a 100644
--- a/openstackclient/tests/unit/compute/v2/test_host.py
+++ b/openstackclient/tests/unit/compute/v2/test_host.py
@@ -17,6 +17,7 @@ from unittest import mock
from openstackclient.compute.v2 import host
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit import fakes
from openstackclient.tests.unit import utils as tests_utils
@@ -26,7 +27,10 @@ class TestHost(compute_fakes.TestComputev2):
super(TestHost, self).setUp()
# Get a shortcut to the compute client
- self.compute = self.app.client_manager.compute
+ self.app.client_manager.sdk_connection = mock.Mock()
+ self.app.client_manager.sdk_connection.compute = mock.Mock()
+ self.sdk_client = self.app.client_manager.sdk_connection.compute
+ self.sdk_client.get = mock.Mock()
@mock.patch(
@@ -34,27 +38,29 @@ class TestHost(compute_fakes.TestComputev2):
)
class TestHostList(TestHost):
- host = compute_fakes.FakeHost.create_one_host()
-
- columns = (
- 'Host Name',
- 'Service',
- 'Zone',
- )
-
- data = [(
- host['host_name'],
- host['service'],
- host['zone'],
- )]
+ _host = compute_fakes.FakeHost.create_one_host()
def setUp(self):
super(TestHostList, self).setUp()
+ self.sdk_client.get.return_value = fakes.FakeResponse(
+ data={'hosts': [self._host]}
+ )
+
+ self.columns = (
+ 'Host Name', 'Service', 'Zone'
+ )
+
+ self.data = [(
+ self._host['host_name'],
+ self._host['service'],
+ self._host['zone'],
+ )]
+
self.cmd = host.ListHost(self.app, None)
def test_host_list_no_option(self, h_mock):
- h_mock.return_value = [self.host]
+ h_mock.return_value = [self._host]
arglist = []
verifylist = []
@@ -62,24 +68,24 @@ class TestHostList(TestHost):
columns, data = self.cmd.take_action(parsed_args)
- h_mock.assert_called_with(None)
+ self.sdk_client.get.assert_called_with('/os-hosts', microversion='2.1')
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
def test_host_list_with_option(self, h_mock):
- h_mock.return_value = [self.host]
+ h_mock.return_value = [self._host]
arglist = [
- '--zone', self.host['zone'],
+ '--zone', self._host['zone'],
]
verifylist = [
- ('zone', self.host['zone']),
+ ('zone', self._host['zone']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- h_mock.assert_called_with(self.host['zone'])
+ self.sdk_client.get.assert_called_with('/os-hosts', microversion='2.1')
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
@@ -141,31 +147,43 @@ class TestHostSet(TestHost):
)
class TestHostShow(TestHost):
- host = compute_fakes.FakeHost.create_one_host()
-
- columns = (
- 'Host',
- 'Project',
- 'CPU',
- 'Memory MB',
- 'Disk GB',
- )
-
- data = [(
- host['host'],
- host['project'],
- host['cpu'],
- host['memory_mb'],
- host['disk_gb'],
- )]
+ _host = compute_fakes.FakeHost.create_one_host()
def setUp(self):
super(TestHostShow, self).setUp()
+ output_data = {"resource": {
+ "host": self._host['host'],
+ "project": self._host['project'],
+ "cpu": self._host['cpu'],
+ "memory_mb": self._host['memory_mb'],
+ "disk_gb": self._host['disk_gb']
+ }}
+
+ self.sdk_client.get.return_value = fakes.FakeResponse(
+ data={'host': [output_data]}
+ )
+
+ self.columns = (
+ 'Host',
+ 'Project',
+ 'CPU',
+ 'Memory MB',
+ 'Disk GB',
+ )
+
+ self.data = [(
+ self._host['host'],
+ self._host['project'],
+ self._host['cpu'],
+ self._host['memory_mb'],
+ self._host['disk_gb'],
+ )]
+
self.cmd = host.ShowHost(self.app, None)
def test_host_show_no_option(self, h_mock):
- h_mock.host_show.return_value = [self.host]
+ h_mock.host_show.return_value = [self._host]
arglist = []
verifylist = []
@@ -174,18 +192,21 @@ class TestHostShow(TestHost):
self.cmd, arglist, verifylist)
def test_host_show_with_option(self, h_mock):
- h_mock.return_value = [self.host]
+ h_mock.return_value = [self._host]
arglist = [
- self.host['host_name'],
+ self._host['host_name'],
]
verifylist = [
- ('host', self.host['host_name']),
+ ('host', self._host['host_name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- h_mock.assert_called_with(self.host['host_name'])
+ self.sdk_client.get.assert_called_with(
+ '/os-hosts/' + self._host['host_name'],
+ microversion='2.1'
+ )
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py
index 01c71b6c..511cac02 100644
--- a/openstackclient/tests/unit/compute/v2/test_server.py
+++ b/openstackclient/tests/unit/compute/v2/test_server.py
@@ -927,8 +927,7 @@ class TestServerVolume(TestServer):
'volume_id': self.volumes[0].id,
}
self.volume_attachment = \
- compute_fakes.FakeVolumeAttachment.\
- create_one_sdk_volume_attachment(attrs=attrs)
+ compute_fakes.create_one_volume_attachment(attrs=attrs)
self.sdk_client.create_volume_attachment.return_value = \
self.volume_attachment
diff --git a/openstackclient/tests/unit/compute/v2/test_server_migration.py b/openstackclient/tests/unit/compute/v2/test_server_migration.py
index 93c1865a..afe868d9 100644
--- a/openstackclient/tests/unit/compute/v2/test_server_migration.py
+++ b/openstackclient/tests/unit/compute/v2/test_server_migration.py
@@ -40,6 +40,18 @@ class TestServerMigration(compute_fakes.TestComputev2):
self.app.client_manager.sdk_connection.compute = mock.Mock()
self.sdk_client = self.app.client_manager.sdk_connection.compute
+ patcher = mock.patch.object(
+ sdk_utils, 'supports_microversion', return_value=True)
+ self.addCleanup(patcher.stop)
+ self.supports_microversion_mock = patcher.start()
+
+ def _set_mock_microversion(self, mock_v):
+ """Set a specific microversion for the mock supports_microversion()."""
+ self.supports_microversion_mock.reset_mock(return_value=True)
+ self.supports_microversion_mock.side_effect = (
+ lambda _, v:
+ api_versions.APIVersion(v) <= api_versions.APIVersion(mock_v))
+
class TestListMigration(TestServerMigration):
"""Test fetch all migrations."""
@@ -51,19 +63,20 @@ class TestListMigration(TestServerMigration):
]
MIGRATION_FIELDS = [
- 'source_node', 'dest_node', 'source_compute', 'dest_compute',
- 'dest_host', 'status', 'server_id', 'old_flavor_id',
+ 'source_node', 'dest_node', 'source_compute',
+ 'dest_compute', 'dest_host', 'status', 'server_id', 'old_flavor_id',
'new_flavor_id', 'created_at', 'updated_at'
]
def setUp(self):
super().setUp()
- self.server = compute_fakes.FakeServer.create_one_server()
+ self._set_mock_microversion('2.1')
+
+ self.server = compute_fakes.FakeServer.create_one_sdk_server()
self.sdk_client.find_server.return_value = self.server
- self.migrations = compute_fakes.FakeMigration.create_migrations(
- count=3)
+ self.migrations = compute_fakes.create_migrations(count=3)
self.sdk_client.migrations.return_value = self.migrations
self.data = (common_utils.get_item_properties(
@@ -72,20 +85,6 @@ class TestListMigration(TestServerMigration):
# Get the command object to test
self.cmd = server_migration.ListMigration(self.app, None)
- patcher = mock.patch.object(
- sdk_utils, 'supports_microversion', return_value=True)
- self.addCleanup(patcher.stop)
- self.supports_microversion_mock = patcher.start()
- self._set_mock_microversion(
- self.app.client_manager.compute.api_version.get_string())
-
- def _set_mock_microversion(self, mock_v):
- """Set a specific microversion for the mock supports_microversion()."""
- self.supports_microversion_mock.reset_mock(return_value=True)
- self.supports_microversion_mock.side_effect = (
- lambda _, v:
- api_versions.APIVersion(v) <= api_versions.APIVersion(mock_v))
-
def test_server_migration_list_no_options(self):
arglist = []
verifylist = []
@@ -601,12 +600,15 @@ class TestServerMigrationShow(TestServerMigration):
def setUp(self):
super().setUp()
- self.server = compute_fakes.FakeServer.create_one_server()
- self.servers_mock.get.return_value = self.server
+ self.server = compute_fakes.FakeServer.create_one_sdk_server()
+ self.sdk_client.find_server.return_value = self.server
- self.server_migration = compute_fakes.FakeServerMigration\
- .create_one_server_migration()
- self.server_migrations_mock.get.return_value = self.server_migration
+ self.server_migration = compute_fakes.create_one_server_migration()
+ self.sdk_client.get_server_migration.return_value =\
+ self.server_migration
+ self.sdk_client.server_migrations.return_value = iter(
+ [self.server_migration]
+ )
self.columns = (
'ID',
@@ -629,7 +631,7 @@ class TestServerMigrationShow(TestServerMigration):
self.data = (
self.server_migration.id,
- self.server_migration.server_uuid,
+ self.server_migration.server_id,
self.server_migration.status,
self.server_migration.source_compute,
self.server_migration.source_node,
@@ -662,19 +664,18 @@ class TestServerMigrationShow(TestServerMigration):
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
- self.servers_mock.get.assert_called_with(self.server.id)
- self.server_migrations_mock.get.assert_called_with(
- self.server.id, '2',)
+ self.sdk_client.find_server.assert_called_with(
+ self.server.id, ignore_missing=False)
+ self.sdk_client.get_server_migration.assert_called_with(
+ self.server.id, '2', ignore_missing=False)
def test_server_migration_show(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.24')
+ self._set_mock_microversion('2.24')
self._test_server_migration_show()
def test_server_migration_show_v259(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.59')
+ self._set_mock_microversion('2.59')
self.columns += ('UUID',)
self.data += (self.server_migration.uuid,)
@@ -682,8 +683,7 @@ class TestServerMigrationShow(TestServerMigration):
self._test_server_migration_show()
def test_server_migration_show_v280(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.80')
+ self._set_mock_microversion('2.80')
self.columns += ('UUID', 'User ID', 'Project ID')
self.data += (
@@ -695,8 +695,7 @@ class TestServerMigrationShow(TestServerMigration):
self._test_server_migration_show()
def test_server_migration_show_pre_v224(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.23')
+ self._set_mock_microversion('2.23')
arglist = [
self.server.id,
@@ -714,9 +713,11 @@ class TestServerMigrationShow(TestServerMigration):
str(ex))
def test_server_migration_show_by_uuid(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.59')
- self.server_migrations_mock.list.return_value = [self.server_migration]
+ self._set_mock_microversion('2.59')
+
+ self.sdk_client.server_migrations.return_value = iter(
+ [self.server_migration]
+ )
self.columns += ('UUID',)
self.data += (self.server_migration.uuid,)
@@ -733,14 +734,14 @@ class TestServerMigrationShow(TestServerMigration):
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
- self.servers_mock.get.assert_called_with(self.server.id)
- self.server_migrations_mock.list.assert_called_with(self.server.id)
- self.server_migrations_mock.get.assert_not_called()
+ self.sdk_client.find_server.assert_called_with(
+ self.server.id, ignore_missing=False)
+ self.sdk_client.server_migrations.assert_called_with(self.server.id)
+ self.sdk_client.get_server_migration.assert_not_called()
def test_server_migration_show_by_uuid_no_matches(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.59')
- self.server_migrations_mock.list.return_value = []
+ self._set_mock_microversion('2.59')
+ self.sdk_client.server_migrations.return_value = iter([])
arglist = [
self.server.id,
@@ -758,8 +759,7 @@ class TestServerMigrationShow(TestServerMigration):
str(ex))
def test_server_migration_show_by_uuid_pre_v259(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.58')
+ self._set_mock_microversion('2.58')
arglist = [
self.server.id,
@@ -777,8 +777,7 @@ class TestServerMigrationShow(TestServerMigration):
str(ex))
def test_server_migration_show_invalid_id(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.24')
+ self._set_mock_microversion('2.24')
arglist = [
self.server.id,
@@ -801,17 +800,16 @@ class TestServerMigrationAbort(TestServerMigration):
def setUp(self):
super().setUp()
- self.server = compute_fakes.FakeServer.create_one_server()
+ self.server = compute_fakes.FakeServer.create_one_sdk_server()
# Return value for utils.find_resource for server.
- self.servers_mock.get.return_value = self.server
+ self.sdk_client.find_server.return_value = self.server
# Get the command object to test
self.cmd = server_migration.AbortMigration(self.app, None)
def test_migration_abort(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.24')
+ self._set_mock_microversion('2.24')
arglist = [
self.server.id,
@@ -822,14 +820,14 @@ class TestServerMigrationAbort(TestServerMigration):
result = self.cmd.take_action(parsed_args)
- self.servers_mock.get.assert_called_with(self.server.id)
- self.server_migrations_mock.live_migration_abort.assert_called_with(
- self.server.id, '2',)
+ self.sdk_client.find_server.assert_called_with(
+ self.server.id, ignore_missing=False)
+ self.sdk_client.abort_server_migration.assert_called_with(
+ '2', self.server.id, ignore_missing=False)
self.assertIsNone(result)
def test_migration_abort_pre_v224(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.23')
+ self._set_mock_microversion('2.23')
arglist = [
self.server.id,
@@ -847,12 +845,12 @@ class TestServerMigrationAbort(TestServerMigration):
str(ex))
def test_server_migration_abort_by_uuid(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.59')
+ self._set_mock_microversion('2.59')
- self.server_migration = compute_fakes.FakeServerMigration\
- .create_one_server_migration()
- self.server_migrations_mock.list.return_value = [self.server_migration]
+ self.server_migration = compute_fakes.create_one_server_migration()
+ self.sdk_client.server_migrations.return_value = iter(
+ [self.server_migration]
+ )
arglist = [
self.server.id,
@@ -863,17 +861,19 @@ class TestServerMigrationAbort(TestServerMigration):
result = self.cmd.take_action(parsed_args)
- self.servers_mock.get.assert_called_with(self.server.id)
- self.server_migrations_mock.list.assert_called_with(self.server.id)
- self.server_migrations_mock.live_migration_abort.assert_called_with(
- self.server.id, self.server_migration.id)
+ self.sdk_client.find_server.assert_called_with(
+ self.server.id, ignore_missing=False)
+ self.sdk_client.server_migrations.assert_called_with(self.server.id)
+ self.sdk_client.abort_server_migration.assert_called_with(
+ self.server_migration.id, self.server.id, ignore_missing=False)
self.assertIsNone(result)
def test_server_migration_abort_by_uuid_no_matches(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.59')
+ self._set_mock_microversion('2.59')
- self.server_migrations_mock.list.return_value = []
+ self.sdk_client.server_migrations.return_value = iter(
+ []
+ )
arglist = [
self.server.id,
@@ -891,8 +891,7 @@ class TestServerMigrationAbort(TestServerMigration):
str(ex))
def test_server_migration_abort_by_uuid_pre_v259(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.58')
+ self._set_mock_microversion('2.58')
arglist = [
self.server.id,
@@ -915,17 +914,16 @@ class TestServerMigrationForceComplete(TestServerMigration):
def setUp(self):
super().setUp()
- self.server = compute_fakes.FakeServer.create_one_server()
+ self.server = compute_fakes.FakeServer.create_one_sdk_server()
# Return value for utils.find_resource for server.
- self.servers_mock.get.return_value = self.server
+ self.sdk_client.find_server.return_value = self.server
# Get the command object to test
self.cmd = server_migration.ForceCompleteMigration(self.app, None)
def test_migration_force_complete(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.22')
+ self._set_mock_microversion('2.22')
arglist = [
self.server.id,
@@ -936,14 +934,14 @@ class TestServerMigrationForceComplete(TestServerMigration):
result = self.cmd.take_action(parsed_args)
- self.servers_mock.get.assert_called_with(self.server.id)
- self.server_migrations_mock.live_migrate_force_complete\
- .assert_called_with(self.server.id, '2',)
+ self.sdk_client.find_server.assert_called_with(
+ self.server.id, ignore_missing=False)
+ self.sdk_client.force_complete_server_migration\
+ .assert_called_with('2', self.server.id)
self.assertIsNone(result)
def test_migration_force_complete_pre_v222(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.21')
+ self._set_mock_microversion('2.21')
arglist = [
self.server.id,
@@ -961,12 +959,12 @@ class TestServerMigrationForceComplete(TestServerMigration):
str(ex))
def test_server_migration_force_complete_by_uuid(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.59')
+ self._set_mock_microversion('2.59')
- self.server_migration = compute_fakes.FakeServerMigration\
- .create_one_server_migration()
- self.server_migrations_mock.list.return_value = [self.server_migration]
+ self.server_migration = compute_fakes.create_one_server_migration()
+ self.sdk_client.server_migrations.return_value = iter(
+ [self.server_migration]
+ )
arglist = [
self.server.id,
@@ -977,17 +975,17 @@ class TestServerMigrationForceComplete(TestServerMigration):
result = self.cmd.take_action(parsed_args)
- self.servers_mock.get.assert_called_with(self.server.id)
- self.server_migrations_mock.list.assert_called_with(self.server.id)
- self.server_migrations_mock.live_migrate_force_complete\
- .assert_called_with(self.server.id, self.server_migration.id)
+ self.sdk_client.find_server.assert_called_with(
+ self.server.id, ignore_missing=False)
+ self.sdk_client.server_migrations.assert_called_with(self.server.id)
+ self.sdk_client.force_complete_server_migration.\
+ assert_called_with(self.server_migration.id, self.server.id)
self.assertIsNone(result)
def test_server_migration_force_complete_by_uuid_no_matches(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.59')
+ self._set_mock_microversion('2.59')
- self.server_migrations_mock.list.return_value = []
+ self.sdk_client.server_migrations.return_value = iter([])
arglist = [
self.server.id,
@@ -1005,8 +1003,7 @@ class TestServerMigrationForceComplete(TestServerMigration):
str(ex))
def test_server_migration_force_complete_by_uuid_pre_v259(self):
- self.app.client_manager.compute.api_version = api_versions.APIVersion(
- '2.58')
+ self._set_mock_microversion('2.58')
arglist = [
self.server.id,
diff --git a/openstackclient/tests/unit/compute/v2/test_server_volume.py b/openstackclient/tests/unit/compute/v2/test_server_volume.py
index 02d378f8..f86bc7dd 100644
--- a/openstackclient/tests/unit/compute/v2/test_server_volume.py
+++ b/openstackclient/tests/unit/compute/v2/test_server_volume.py
@@ -11,11 +11,15 @@
# under the License.
#
+from unittest import mock
+
from novaclient import api_versions
+from openstack import utils as sdk_utils
from osc_lib import exceptions
from openstackclient.compute.v2 import server_volume
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit.volume.v2 import fakes as volume_fakes
class TestServerVolume(compute_fakes.TestComputev2):
@@ -23,13 +27,11 @@ class TestServerVolume(compute_fakes.TestComputev2):
def setUp(self):
super().setUp()
- # Get a shortcut to the compute client ServerManager Mock
- self.servers_mock = self.app.client_manager.compute.servers
- self.servers_mock.reset_mock()
-
- # Get a shortcut to the compute client VolumeManager mock
- self.servers_volumes_mock = self.app.client_manager.compute.volumes
- self.servers_volumes_mock.reset_mock()
+ self.app.client_manager.sdk_connection = mock.Mock()
+ self.app.client_manager.sdk_connection.compute = mock.Mock()
+ self.app.client_manager.sdk_connection.volume = mock.Mock()
+ self.compute_client = self.app.client_manager.sdk_connection.compute
+ self.volume_client = self.app.client_manager.sdk_connection.volume
class TestServerVolumeList(TestServerVolume):
@@ -37,20 +39,21 @@ class TestServerVolumeList(TestServerVolume):
def setUp(self):
super().setUp()
- self.server = compute_fakes.FakeServer.create_one_server()
- self.volume_attachments = (
- compute_fakes.FakeVolumeAttachment.create_volume_attachments())
+ self.server = compute_fakes.FakeServer.create_one_sdk_server()
+ self.volume_attachments = compute_fakes.create_volume_attachments()
- self.servers_mock.get.return_value = self.server
- self.servers_volumes_mock.get_server_volumes.return_value = (
+ self.compute_client.find_server.return_value = self.server
+ self.compute_client.volume_attachments.return_value = (
self.volume_attachments)
# Get the command object to test
self.cmd = server_volume.ListServerVolume(self.app, None)
- def test_server_volume_list(self):
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_list(self, sm_mock):
self.app.client_manager.compute.api_version = \
api_versions.APIVersion('2.1')
+ sm_mock.side_effect = [False, False, False, False]
arglist = [
self.server.id,
@@ -68,24 +71,25 @@ class TestServerVolumeList(TestServerVolume):
(
self.volume_attachments[0].id,
self.volume_attachments[0].device,
- self.volume_attachments[0].serverId,
- self.volume_attachments[0].volumeId,
+ self.volume_attachments[0].server_id,
+ self.volume_attachments[0].volume_id,
),
(
self.volume_attachments[1].id,
self.volume_attachments[1].device,
- self.volume_attachments[1].serverId,
- self.volume_attachments[1].volumeId,
+ self.volume_attachments[1].server_id,
+ self.volume_attachments[1].volume_id,
),
),
tuple(data),
)
- self.servers_volumes_mock.get_server_volumes.assert_called_once_with(
- self.server.id)
+ self.compute_client.volume_attachments.assert_called_once_with(
+ self.server,
+ )
- def test_server_volume_list_with_tags(self):
- self.app.client_manager.compute.api_version = \
- api_versions.APIVersion('2.70')
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_list_with_tags(self, sm_mock):
+ sm_mock.side_effect = [False, True, False, False]
arglist = [
self.server.id,
@@ -105,27 +109,27 @@ class TestServerVolumeList(TestServerVolume):
(
self.volume_attachments[0].id,
self.volume_attachments[0].device,
- self.volume_attachments[0].serverId,
- self.volume_attachments[0].volumeId,
+ self.volume_attachments[0].server_id,
+ self.volume_attachments[0].volume_id,
self.volume_attachments[0].tag,
),
(
self.volume_attachments[1].id,
self.volume_attachments[1].device,
- self.volume_attachments[1].serverId,
- self.volume_attachments[1].volumeId,
+ self.volume_attachments[1].server_id,
+ self.volume_attachments[1].volume_id,
self.volume_attachments[1].tag,
),
),
tuple(data),
)
- self.servers_volumes_mock.get_server_volumes.assert_called_once_with(
- self.server.id)
-
- def test_server_volume_list_with_delete_on_attachment(self):
- self.app.client_manager.compute.api_version = \
- api_versions.APIVersion('2.79')
+ self.compute_client.volume_attachments.assert_called_once_with(
+ self.server,
+ )
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_list_with_delete_on_attachment(self, sm_mock):
+ sm_mock.side_effect = [False, True, True, False]
arglist = [
self.server.id,
]
@@ -148,29 +152,30 @@ class TestServerVolumeList(TestServerVolume):
(
self.volume_attachments[0].id,
self.volume_attachments[0].device,
- self.volume_attachments[0].serverId,
- self.volume_attachments[0].volumeId,
+ self.volume_attachments[0].server_id,
+ self.volume_attachments[0].volume_id,
self.volume_attachments[0].tag,
self.volume_attachments[0].delete_on_termination,
),
(
self.volume_attachments[1].id,
self.volume_attachments[1].device,
- self.volume_attachments[1].serverId,
- self.volume_attachments[1].volumeId,
+ self.volume_attachments[1].server_id,
+ self.volume_attachments[1].volume_id,
self.volume_attachments[1].tag,
self.volume_attachments[1].delete_on_termination,
),
),
tuple(data),
)
- self.servers_volumes_mock.get_server_volumes.assert_called_once_with(
- self.server.id)
+ self.compute_client.volume_attachments.assert_called_once_with(
+ self.server,
+ )
- def test_server_volume_list_with_attachment_ids(self):
- self.app.client_manager.compute.api_version = \
- api_versions.APIVersion('2.89')
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_list_with_attachment_ids(self, sm_mock):
+ sm_mock.side_effect = [True, True, True, True]
arglist = [
self.server.id,
]
@@ -193,28 +198,29 @@ class TestServerVolumeList(TestServerVolume):
(
(
self.volume_attachments[0].device,
- self.volume_attachments[0].serverId,
- self.volume_attachments[0].volumeId,
+ self.volume_attachments[0].server_id,
+ self.volume_attachments[0].volume_id,
self.volume_attachments[0].tag,
self.volume_attachments[0].delete_on_termination,
self.volume_attachments[0].attachment_id,
- self.volume_attachments[0].bdm_uuid
+ self.volume_attachments[0].bdm_id
),
(
self.volume_attachments[1].device,
- self.volume_attachments[1].serverId,
- self.volume_attachments[1].volumeId,
+ self.volume_attachments[1].server_id,
+ self.volume_attachments[1].volume_id,
self.volume_attachments[1].tag,
self.volume_attachments[1].delete_on_termination,
self.volume_attachments[1].attachment_id,
- self.volume_attachments[1].bdm_uuid
+ self.volume_attachments[1].bdm_id
),
),
tuple(data),
)
- self.servers_volumes_mock.get_server_volumes.assert_called_once_with(
- self.server.id)
+ self.compute_client.volume_attachments.assert_called_once_with(
+ self.server,
+ )
class TestServerVolumeUpdate(TestServerVolume):
@@ -222,21 +228,23 @@ class TestServerVolumeUpdate(TestServerVolume):
def setUp(self):
super().setUp()
- self.server = compute_fakes.FakeServer.create_one_server()
- self.servers_mock.get.return_value = self.server
+ self.server = compute_fakes.FakeServer.create_one_sdk_server()
+ self.compute_client.find_server.return_value = self.server
+
+ self.volume = volume_fakes.create_one_sdk_volume()
+ self.volume_client.find_volume.return_value = self.volume
# Get the command object to test
self.cmd = server_volume.UpdateServerVolume(self.app, None)
def test_server_volume_update(self):
-
arglist = [
self.server.id,
- 'foo',
+ self.volume.id,
]
verifylist = [
('server', self.server.id),
- ('volume', 'foo'),
+ ('volume', self.volume.id),
('delete_on_termination', None),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -244,67 +252,73 @@ class TestServerVolumeUpdate(TestServerVolume):
result = self.cmd.take_action(parsed_args)
# This is a no-op
- self.servers_volumes_mock.update_server_volume.assert_not_called()
+ self.compute_client.update_volume_attachment.assert_not_called()
self.assertIsNone(result)
- def test_server_volume_update_with_delete_on_termination(self):
- self.app.client_manager.compute.api_version = \
- api_versions.APIVersion('2.85')
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_update_with_delete_on_termination(self, sm_mock):
+ sm_mock.return_value = True
arglist = [
self.server.id,
- 'foo',
+ self.volume.id,
'--delete-on-termination',
]
verifylist = [
('server', self.server.id),
- ('volume', 'foo'),
+ ('volume', self.volume.id),
('delete_on_termination', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.servers_volumes_mock.update_server_volume.assert_called_once_with(
- self.server.id, 'foo', 'foo',
- delete_on_termination=True)
+ self.compute_client.update_volume_attachment.assert_called_once_with(
+ self.server,
+ self.volume,
+ delete_on_termination=True,
+ )
self.assertIsNone(result)
- def test_server_volume_update_with_preserve_on_termination(self):
- self.app.client_manager.compute.api_version = \
- api_versions.APIVersion('2.85')
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_update_with_preserve_on_termination(self, sm_mock):
+ sm_mock.return_value = True
arglist = [
self.server.id,
- 'foo',
+ self.volume.id,
'--preserve-on-termination',
]
verifylist = [
('server', self.server.id),
- ('volume', 'foo'),
+ ('volume', self.volume.id),
('delete_on_termination', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.servers_volumes_mock.update_server_volume.assert_called_once_with(
- self.server.id, 'foo', 'foo',
- delete_on_termination=False)
+ self.compute_client.update_volume_attachment.assert_called_once_with(
+ self.server,
+ self.volume,
+ delete_on_termination=False
+ )
self.assertIsNone(result)
- def test_server_volume_update_with_delete_on_termination_pre_v285(self):
- self.app.client_manager.compute.api_version = \
- api_versions.APIVersion('2.84')
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_update_with_delete_on_termination_pre_v285(
+ self, sm_mock,
+ ):
+ sm_mock.return_value = False
arglist = [
self.server.id,
- 'foo',
+ self.volume.id,
'--delete-on-termination',
]
verifylist = [
('server', self.server.id),
- ('volume', 'foo'),
+ ('volume', self.volume.id),
('delete_on_termination', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -312,20 +326,24 @@ class TestServerVolumeUpdate(TestServerVolume):
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
- parsed_args)
+ parsed_args,
+ )
+ self.compute_client.update_volume_attachment.assert_not_called()
- def test_server_volume_update_with_preserve_on_termination_pre_v285(self):
- self.app.client_manager.compute.api_version = \
- api_versions.APIVersion('2.84')
+ @mock.patch.object(sdk_utils, 'supports_microversion')
+ def test_server_volume_update_with_preserve_on_termination_pre_v285(
+ self, sm_mock,
+ ):
+ sm_mock.return_value = False
arglist = [
self.server.id,
- 'foo',
+ self.volume.id,
'--preserve-on-termination',
]
verifylist = [
('server', self.server.id),
- ('volume', 'foo'),
+ ('volume', self.volume.id),
('delete_on_termination', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -333,4 +351,6 @@ class TestServerVolumeUpdate(TestServerVolume):
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
- parsed_args)
+ parsed_args,
+ )
+ self.compute_client.update_volume_attachment.assert_not_called()
diff --git a/openstackclient/tests/unit/image/v2/test_image.py b/openstackclient/tests/unit/image/v2/test_image.py
index 010c4a9d..019b4d9d 100644
--- a/openstackclient/tests/unit/image/v2/test_image.py
+++ b/openstackclient/tests/unit/image/v2/test_image.py
@@ -905,7 +905,10 @@ class TestImageList(TestImage):
marker=self._image.id,
)
- self.client.find_image.assert_called_with('graven')
+ self.client.find_image.assert_called_with(
+ 'graven',
+ ignore_missing=False,
+ )
def test_image_list_name_option(self):
arglist = [
@@ -1856,6 +1859,10 @@ class TestImageImport(TestImage):
self.client.import_image.assert_called_once_with(
self.image,
method='glance-direct',
+ uri=None,
+ remote_region=None,
+ remote_image=None,
+ remote_service_interface=None,
stores=None,
all_stores=None,
all_stores_must_succeed=False,
@@ -1880,7 +1887,10 @@ class TestImageImport(TestImage):
self.client.import_image.assert_called_once_with(
self.image,
method='web-download',
- # uri='https://example.com/',
+ uri='https://example.com/',
+ remote_region=None,
+ remote_image=None,
+ remote_service_interface=None,
stores=None,
all_stores=None,
all_stores_must_succeed=False,
@@ -1978,6 +1988,10 @@ class TestImageImport(TestImage):
self.client.import_image.assert_called_once_with(
self.image,
method='copy-image',
+ uri=None,
+ remote_region=None,
+ remote_image=None,
+ remote_service_interface=None,
stores=['fast'],
all_stores=None,
all_stores_must_succeed=False,
@@ -2005,9 +2019,10 @@ class TestImageImport(TestImage):
self.client.import_image.assert_called_once_with(
self.image,
method='glance-download',
- # remote_region='eu/dublin',
- # remote_image='remote-image-id',
- # remote_service_interface='private',
+ uri=None,
+ remote_region='eu/dublin',
+ remote_image='remote-image-id',
+ remote_service_interface='private',
stores=None,
all_stores=None,
all_stores_must_succeed=False,
diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py
index 4d029a0e..6d922008 100644
--- a/openstackclient/tests/unit/network/v2/fakes.py
+++ b/openstackclient/tests/unit/network/v2/fakes.py
@@ -34,6 +34,7 @@ from openstack.network.v2 import port as _port
from openstack.network.v2 import rbac_policy as network_rbac
from openstack.network.v2 import segment as _segment
from openstack.network.v2 import service_profile as _flavor_profile
+from openstack.network.v2 import trunk as _trunk
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
@@ -1065,11 +1066,13 @@ class FakeFloatingIPPortForwarding(object):
""""Fake one or more Port forwarding"""
@staticmethod
- def create_one_port_forwarding(attrs=None):
+ def create_one_port_forwarding(attrs=None, use_range=False):
"""Create a fake Port Forwarding.
:param Dictionary attrs:
A dictionary with all attributes
+ :param Boolean use_range:
+ A boolean which defines if we will use ranges or not
:return:
A FakeResource object with name, id, etc.
"""
@@ -1083,13 +1086,29 @@ class FakeFloatingIPPortForwarding(object):
'floatingip_id': floatingip_id,
'internal_port_id': 'internal-port-id-' + uuid.uuid4().hex,
'internal_ip_address': '192.168.1.2',
- 'internal_port': randint(1, 65535),
- 'external_port': randint(1, 65535),
'protocol': 'tcp',
'description': 'some description',
'location': 'MUNCHMUNCHMUNCH',
}
+ if use_range:
+ port_range = randint(0, 100)
+ internal_start = randint(1, 65535 - port_range)
+ internal_end = internal_start + port_range
+ internal_range = ':'.join(map(str, [internal_start, internal_end]))
+ external_start = randint(1, 65535 - port_range)
+ external_end = external_start + port_range
+ external_range = ':'.join(map(str, [external_start, external_end]))
+ port_forwarding_attrs['internal_port_range'] = internal_range
+ port_forwarding_attrs['external_port_range'] = external_range
+ port_forwarding_attrs['internal_port'] = None
+ port_forwarding_attrs['external_port'] = None
+ else:
+ port_forwarding_attrs['internal_port'] = randint(1, 65535)
+ port_forwarding_attrs['external_port'] = randint(1, 65535)
+ port_forwarding_attrs['internal_port_range'] = ''
+ port_forwarding_attrs['external_port_range'] = ''
+
# Overwrite default attributes.
port_forwarding_attrs.update(attrs)
@@ -1100,25 +1119,28 @@ class FakeFloatingIPPortForwarding(object):
return port_forwarding
@staticmethod
- def create_port_forwardings(attrs=None, count=2):
+ def create_port_forwardings(attrs=None, count=2, use_range=False):
"""Create multiple fake Port Forwarding.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of Port Forwarding rule to fake
+ :param Boolean use_range:
+ A boolean which defines if we will use ranges or not
:return:
A list of FakeResource objects faking the Port Forwardings
"""
port_forwardings = []
for i in range(0, count):
port_forwardings.append(
- FakeFloatingIPPortForwarding.create_one_port_forwarding(attrs)
+ FakeFloatingIPPortForwarding.create_one_port_forwarding(
+ attrs, use_range=use_range)
)
return port_forwardings
@staticmethod
- def get_port_forwardings(port_forwardings=None, count=2):
+ def get_port_forwardings(port_forwardings=None, count=2, use_range=False):
"""Get a list of faked Port Forwardings.
If port forwardings list is provided, then initialize the Mock object
@@ -1128,13 +1150,16 @@ class FakeFloatingIPPortForwarding(object):
A list of FakeResource objects faking port forwardings
:param int count:
The number of Port Forwardings to fake
+ :param Boolean use_range:
+ A boolean which defines if we will use ranges or not
:return:
An iterable Mock object with side_effect set to a list of faked
Port Forwardings
"""
if port_forwardings is None:
port_forwardings = (
- FakeFloatingIPPortForwarding.create_port_forwardings(count)
+ FakeFloatingIPPortForwarding.create_port_forwardings(
+ count, use_range=use_range)
)
return mock.Mock(side_effect=port_forwardings)
@@ -2152,3 +2177,71 @@ def get_ndp_proxies(ndp_proxies=None, count=2):
create_ndp_proxies(count)
)
return mock.Mock(side_effect=ndp_proxies)
+
+
+def create_one_trunk(attrs=None):
+ """Create a fake trunk.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object with name, id, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ trunk_attrs = {
+ 'id': 'trunk-id-' + uuid.uuid4().hex,
+ 'name': 'trunk-name-' + uuid.uuid4().hex,
+ 'description': '',
+ 'port_id': 'port-' + uuid.uuid4().hex,
+ 'admin_state_up': True,
+ 'project_id': 'project-id-' + uuid.uuid4().hex,
+ 'status': 'ACTIVE',
+ 'sub_ports': [{'port_id': 'subport-' +
+ uuid.uuid4().hex,
+ 'segmentation_type': 'vlan',
+ 'segmentation_id': 100}],
+ }
+ # Overwrite default attributes.
+ trunk_attrs.update(attrs)
+
+ trunk = _trunk.Trunk(**trunk_attrs)
+
+ return trunk
+
+
+def create_trunks(attrs=None, count=2):
+ """Create multiple fake trunks.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of trunks to fake
+ :return:
+ A list of FakeResource objects faking the trunks
+ """
+ trunks = []
+ for i in range(0, count):
+ trunks.append(create_one_trunk(attrs))
+
+ return trunks
+
+
+def get_trunks(trunks=None, count=2):
+ """Get an iterable Mock object with a list of faked trunks.
+
+ If trunk list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List trunks:
+ A list of FakeResource objects faking trunks
+ :param int count:
+ The number of trunks to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ trunks
+ """
+ if trunks is None:
+ trunks = create_trunks(count)
+ return mock.Mock(side_effect=trunks)
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_port_forwarding.py b/openstackclient/tests/unit/network/v2/test_floating_ip_port_forwarding.py
index 97399f43..d0f5af8c 100644
--- a/openstackclient/tests/unit/network/v2/test_floating_ip_port_forwarding.py
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip_port_forwarding.py
@@ -49,6 +49,18 @@ class TestCreateFloatingIPPortForwarding(TestFloatingIPPortForwarding):
}
)
)
+
+ self.new_port_forwarding_with_ranges = (
+ network_fakes.FakeFloatingIPPortForwarding.
+ create_one_port_forwarding(
+ use_range=True,
+ attrs={
+ 'internal_port_id': self.port.id,
+ 'floatingip_id': self.floating_ip.id,
+ }
+ )
+ )
+
self.network.create_floating_ip_port_forwarding = mock.Mock(
return_value=self.new_port_forwarding)
@@ -63,22 +75,26 @@ class TestCreateFloatingIPPortForwarding(TestFloatingIPPortForwarding):
self.columns = (
'description',
'external_port',
+ 'external_port_range',
'floatingip_id',
'id',
'internal_ip_address',
'internal_port',
'internal_port_id',
+ 'internal_port_range',
'protocol'
)
self.data = (
self.new_port_forwarding.description,
self.new_port_forwarding.external_port,
+ self.new_port_forwarding.external_port_range,
self.new_port_forwarding.floatingip_id,
self.new_port_forwarding.id,
self.new_port_forwarding.internal_ip_address,
self.new_port_forwarding.internal_port,
self.new_port_forwarding.internal_port_id,
+ self.new_port_forwarding.internal_port_range,
self.new_port_forwarding.protocol,
)
@@ -90,6 +106,160 @@ class TestCreateFloatingIPPortForwarding(TestFloatingIPPortForwarding):
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
+ def test_create_all_options_with_range(self):
+ arglist = [
+ '--port', self.new_port_forwarding_with_ranges.internal_port_id,
+ '--internal-protocol-port',
+ self.new_port_forwarding_with_ranges.internal_port_range,
+ '--external-protocol-port',
+ self.new_port_forwarding_with_ranges.external_port_range,
+ '--protocol', self.new_port_forwarding_with_ranges.protocol,
+ self.new_port_forwarding_with_ranges.floatingip_id,
+ '--internal-ip-address',
+ self.new_port_forwarding_with_ranges.internal_ip_address,
+ '--description',
+ self.new_port_forwarding_with_ranges.description,
+ ]
+ verifylist = [
+ ('port', self.new_port_forwarding_with_ranges.internal_port_id),
+ ('internal_protocol_port',
+ self.new_port_forwarding_with_ranges.internal_port_range),
+ ('external_protocol_port',
+ self.new_port_forwarding_with_ranges.external_port_range),
+ ('protocol', self.new_port_forwarding_with_ranges.protocol),
+ ('floating_ip',
+ self.new_port_forwarding_with_ranges.floatingip_id),
+ ('internal_ip_address', self.new_port_forwarding_with_ranges.
+ internal_ip_address),
+ ('description', self.new_port_forwarding_with_ranges.description),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_floating_ip_port_forwarding.\
+ assert_called_once_with(
+ self.new_port_forwarding.floatingip_id,
+ **{
+ 'external_port_range':
+ self.new_port_forwarding_with_ranges.
+ external_port_range,
+ 'internal_ip_address':
+ self.new_port_forwarding_with_ranges.
+ internal_ip_address,
+ 'internal_port_range':
+ self.new_port_forwarding_with_ranges.
+ internal_port_range,
+ 'internal_port_id':
+ self.new_port_forwarding_with_ranges.internal_port_id,
+ 'protocol': self.new_port_forwarding_with_ranges.protocol,
+ 'description':
+ self.new_port_forwarding_with_ranges.description,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_all_options_with_range_invalid_port_exception(self):
+ invalid_port_range = '999999:999999'
+ arglist = [
+ '--port', self.new_port_forwarding_with_ranges.internal_port_id,
+ '--internal-protocol-port', invalid_port_range,
+ '--external-protocol-port', invalid_port_range,
+ '--protocol', self.new_port_forwarding_with_ranges.protocol,
+ self.new_port_forwarding_with_ranges.floatingip_id,
+ '--internal-ip-address',
+ self.new_port_forwarding_with_ranges.internal_ip_address,
+ '--description',
+ self.new_port_forwarding_with_ranges.description,
+ ]
+ verifylist = [
+ ('port', self.new_port_forwarding_with_ranges.internal_port_id),
+ ('internal_protocol_port', invalid_port_range),
+ ('external_protocol_port', invalid_port_range),
+ ('protocol', self.new_port_forwarding_with_ranges.protocol),
+ ('floating_ip',
+ self.new_port_forwarding_with_ranges.floatingip_id),
+ ('internal_ip_address', self.new_port_forwarding_with_ranges.
+ internal_ip_address),
+ ('description', self.new_port_forwarding_with_ranges.description),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ msg = 'The port number range is <1-65535>'
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual(msg, str(e))
+ self.network.create_floating_ip_port_forwarding.assert_not_called()
+
+ def test_create_all_options_with_invalid_range_exception(self):
+ invalid_port_range = '80:70'
+ arglist = [
+ '--port', self.new_port_forwarding_with_ranges.internal_port_id,
+ '--internal-protocol-port', invalid_port_range,
+ '--external-protocol-port', invalid_port_range,
+ '--protocol', self.new_port_forwarding_with_ranges.protocol,
+ self.new_port_forwarding_with_ranges.floatingip_id,
+ '--internal-ip-address',
+ self.new_port_forwarding_with_ranges.internal_ip_address,
+ '--description',
+ self.new_port_forwarding_with_ranges.description,
+ ]
+ verifylist = [
+ ('port', self.new_port_forwarding_with_ranges.internal_port_id),
+ ('internal_protocol_port', invalid_port_range),
+ ('external_protocol_port', invalid_port_range),
+ ('protocol', self.new_port_forwarding_with_ranges.protocol),
+ ('floating_ip',
+ self.new_port_forwarding_with_ranges.floatingip_id),
+ ('internal_ip_address', self.new_port_forwarding_with_ranges.
+ internal_ip_address),
+ ('description', self.new_port_forwarding_with_ranges.description),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ msg = 'The last number in port range must be greater or equal to ' \
+ 'the first'
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual(msg, str(e))
+ self.network.create_floating_ip_port_forwarding.assert_not_called()
+
+ def test_create_all_options_with_unmatch_ranges_exception(self):
+ internal_range = '80:90'
+ external_range = '8080:8100'
+ arglist = [
+ '--port', self.new_port_forwarding_with_ranges.internal_port_id,
+ '--internal-protocol-port', internal_range,
+ '--external-protocol-port', external_range,
+ '--protocol', self.new_port_forwarding_with_ranges.protocol,
+ self.new_port_forwarding_with_ranges.floatingip_id,
+ '--internal-ip-address',
+ self.new_port_forwarding_with_ranges.internal_ip_address,
+ '--description',
+ self.new_port_forwarding_with_ranges.description,
+ ]
+ verifylist = [
+ ('port', self.new_port_forwarding_with_ranges.internal_port_id),
+ ('internal_protocol_port', internal_range),
+ ('external_protocol_port', external_range),
+ ('protocol', self.new_port_forwarding_with_ranges.protocol),
+ ('floating_ip',
+ self.new_port_forwarding_with_ranges.floatingip_id),
+ ('internal_ip_address', self.new_port_forwarding_with_ranges.
+ internal_ip_address),
+ ('description', self.new_port_forwarding_with_ranges.description),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ msg = "The relation between internal and external ports does not " \
+ "match the pattern 1:N and N:N"
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual(msg, str(e))
+ self.network.create_floating_ip_port_forwarding.assert_not_called()
+
def test_create_all_options(self):
arglist = [
'--port', self.new_port_forwarding.internal_port_id,
@@ -106,8 +276,10 @@ class TestCreateFloatingIPPortForwarding(TestFloatingIPPortForwarding):
]
verifylist = [
('port', self.new_port_forwarding.internal_port_id),
- ('internal_protocol_port', self.new_port_forwarding.internal_port),
- ('external_protocol_port', self.new_port_forwarding.external_port),
+ ('internal_protocol_port',
+ str(self.new_port_forwarding.internal_port)),
+ ('external_protocol_port',
+ str(self.new_port_forwarding.external_port)),
('protocol', self.new_port_forwarding.protocol),
('floating_ip', self.new_port_forwarding.floatingip_id),
('internal_ip_address', self.new_port_forwarding.
@@ -253,7 +425,9 @@ class TestListFloatingIPPortForwarding(TestFloatingIPPortForwarding):
'Internal Port ID',
'Internal IP Address',
'Internal Port',
+ 'Internal Port Range',
'External Port',
+ 'External Port Range',
'Protocol',
'Description',
)
@@ -275,7 +449,9 @@ class TestListFloatingIPPortForwarding(TestFloatingIPPortForwarding):
port_forwarding.internal_port_id,
port_forwarding.internal_ip_address,
port_forwarding.internal_port,
+ port_forwarding.internal_port_range,
port_forwarding.external_port,
+ port_forwarding.external_port_range,
port_forwarding.protocol,
port_forwarding.description,
))
@@ -330,7 +506,7 @@ class TestListFloatingIPPortForwarding(TestFloatingIPPortForwarding):
query = {
'internal_port_id': self.port_forwardings[0].internal_port_id,
- 'external_port': str(self.port_forwardings[0].external_port),
+ 'external_port': self.port_forwardings[0].external_port,
'protocol': self.port_forwardings[0].protocol,
}
@@ -392,7 +568,7 @@ class TestSetFloatingIPPortForwarding(TestFloatingIPPortForwarding):
self.assertIsNone(result)
def test_set_all_thing(self):
- arglist = [
+ arglist_single = [
'--port', self.port.id,
'--internal-ip-address', 'new_internal_ip_address',
'--internal-protocol-port', '100',
@@ -402,21 +578,23 @@ class TestSetFloatingIPPortForwarding(TestFloatingIPPortForwarding):
self._port_forwarding.floatingip_id,
self._port_forwarding.id,
]
- verifylist = [
+ arglist_range = list(arglist_single)
+ arglist_range[5] = '100:110'
+ arglist_range[7] = '200:210'
+ verifylist_single = [
('port', self.port.id),
('internal_ip_address', 'new_internal_ip_address'),
- ('internal_protocol_port', 100),
- ('external_protocol_port', 200),
+ ('internal_protocol_port', '100'),
+ ('external_protocol_port', '200'),
('protocol', 'tcp'),
('description', 'some description'),
('floating_ip', self._port_forwarding.floatingip_id),
('port_forwarding_id', self._port_forwarding.id),
]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- attrs = {
+ verifylist_range = list(verifylist_single)
+ verifylist_range[2] = ('internal_protocol_port', '100:110')
+ verifylist_range[3] = ('external_protocol_port', '200:210')
+ attrs_single = {
'internal_port_id': self.port.id,
'internal_ip_address': 'new_internal_ip_address',
'internal_port': 100,
@@ -424,12 +602,25 @@ class TestSetFloatingIPPortForwarding(TestFloatingIPPortForwarding):
'protocol': 'tcp',
'description': 'some description',
}
- self.network.update_floating_ip_port_forwarding.assert_called_with(
- self._port_forwarding.floatingip_id,
- self._port_forwarding.id,
- **attrs
- )
- self.assertIsNone(result)
+ attrs_range = dict(attrs_single, internal_port_range='100:110',
+ external_port_range='200:210')
+ attrs_range.pop('internal_port')
+ attrs_range.pop('external_port')
+
+ def run_and_validate(arglist, verifylist, attrs):
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_floating_ip_port_forwarding.assert_called_with(
+ self._port_forwarding.floatingip_id,
+ self._port_forwarding.id,
+ **attrs
+ )
+ self.assertIsNone(result)
+
+ run_and_validate(arglist_single, verifylist_single, attrs_single)
+ run_and_validate(arglist_range, verifylist_range, attrs_range)
class TestShowFloatingIPPortForwarding(TestFloatingIPPortForwarding):
@@ -438,11 +629,13 @@ class TestShowFloatingIPPortForwarding(TestFloatingIPPortForwarding):
columns = (
'description',
'external_port',
+ 'external_port_range',
'floatingip_id',
'id',
'internal_ip_address',
'internal_port',
'internal_port_id',
+ 'internal_port_range',
'protocol',
)
@@ -459,11 +652,13 @@ class TestShowFloatingIPPortForwarding(TestFloatingIPPortForwarding):
self.data = (
self._port_forwarding.description,
self._port_forwarding.external_port,
+ self._port_forwarding.external_port_range,
self._port_forwarding.floatingip_id,
self._port_forwarding.id,
self._port_forwarding.internal_ip_address,
self._port_forwarding.internal_port,
self._port_forwarding.internal_port_id,
+ self._port_forwarding.internal_port_range,
self._port_forwarding.protocol,
)
self.network.find_floating_ip_port_forwarding = mock.Mock(
diff --git a/openstackclient/tests/unit/network/v2/test_network_qos_policy.py b/openstackclient/tests/unit/network/v2/test_network_qos_policy.py
index af4cb3fb..ca21ccf7 100644
--- a/openstackclient/tests/unit/network/v2/test_network_qos_policy.py
+++ b/openstackclient/tests/unit/network/v2/test_network_qos_policy.py
@@ -432,7 +432,7 @@ class TestShowNetworkQosPolicy(TestQosPolicy):
_qos_policy.is_default,
_qos_policy.name,
_qos_policy.project_id,
- _qos_policy.rules,
+ network_qos_policy.RulesColumn(_qos_policy.rules),
_qos_policy.shared,
)
diff --git a/openstackclient/tests/unit/network/v2/test_network_qos_rule_type.py b/openstackclient/tests/unit/network/v2/test_network_qos_rule_type.py
index 08a83fab..3aae822e 100644
--- a/openstackclient/tests/unit/network/v2/test_network_qos_rule_type.py
+++ b/openstackclient/tests/unit/network/v2/test_network_qos_rule_type.py
@@ -115,3 +115,37 @@ class TestListNetworkQosRuleType(TestNetworkQosRuleType):
self.network.qos_rule_types.assert_called_once_with(**{})
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
+
+ def test_qos_rule_type_list_all_supported(self):
+ arglist = [
+ '--all-supported'
+ ]
+ verifylist = [
+ ('all_supported', True),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.qos_rule_types.assert_called_once_with(
+ **{'all_supported': True}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_qos_rule_type_list_all_rules(self):
+ arglist = [
+ '--all-rules'
+ ]
+ verifylist = [
+ ('all_rules', True),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.qos_rule_types.assert_called_once_with(
+ **{'all_rules': True}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
diff --git a/openstackclient/tests/unit/network/v2/test_network_trunk.py b/openstackclient/tests/unit/network/v2/test_network_trunk.py
new file mode 100644
index 00000000..fae70fb0
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_network_trunk.py
@@ -0,0 +1,851 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import copy
+from unittest import mock
+from unittest.mock import call
+
+from osc_lib.cli import format_columns
+from osc_lib import exceptions
+import testtools
+
+from openstackclient.network.v2 import network_trunk
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+# Tests for Neutron trunks
+#
+class TestNetworkTrunk(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super().setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+
+
+class TestCreateNetworkTrunk(TestNetworkTrunk):
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ trunk_networks = network_fakes.create_networks(count=2)
+ parent_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[0]['id']})
+ sub_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[1]['id']})
+
+ new_trunk = network_fakes.create_one_trunk(
+ attrs={'project_id': project.id,
+ 'port_id': parent_port['id'],
+ 'sub_ports': {
+ 'port_id': sub_port['id'],
+ 'segmentation_id': 42,
+ 'segmentation_type': 'vlan'}
+ })
+
+ columns = (
+ 'description',
+ 'id',
+ 'is_admin_state_up',
+ 'name',
+ 'port_id',
+ 'project_id',
+ 'status',
+ 'sub_ports',
+ 'tags'
+ )
+ data = (
+ new_trunk.description,
+ new_trunk.id,
+ new_trunk.is_admin_state_up,
+ new_trunk.name,
+ new_trunk.port_id,
+ new_trunk.project_id,
+ new_trunk.status,
+ format_columns.ListDictColumn(new_trunk.sub_ports),
+ [],
+ )
+
+ def setUp(self):
+ super().setUp()
+ self.network.create_trunk = mock.Mock(return_value=self.new_trunk)
+ self.network.find_port = mock.Mock(
+ side_effect=[self.parent_port, self.sub_port])
+
+ # Get the command object to test
+ self.cmd = network_trunk.CreateNetworkTrunk(self.app, self.namespace)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ "--parent-port", self.new_trunk['port_id'],
+ self.new_trunk['name'],
+ ]
+ verifylist = [
+ ('parent_port', self.new_trunk['port_id']),
+ ('name', self.new_trunk['name']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_trunk.assert_called_once_with(**{
+ 'name': self.new_trunk['name'],
+ 'admin_state_up': self.new_trunk['admin_state_up'],
+ 'port_id': self.new_trunk['port_id'],
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_full_options(self):
+ self.new_trunk['description'] = 'foo description'
+ subport = self.new_trunk.sub_ports[0]
+ arglist = [
+ "--disable",
+ "--description", self.new_trunk.description,
+ "--parent-port", self.new_trunk.port_id,
+ "--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,'
+ 'segmentation-id=%(seg_id)s' % {
+ 'seg_id': subport['segmentation_id'],
+ 'seg_type': subport['segmentation_type'],
+ 'port': subport['port_id']},
+ self.new_trunk.name,
+ ]
+ verifylist = [
+ ('name', self.new_trunk.name),
+ ('description', self.new_trunk.description),
+ ('parent_port', self.new_trunk.port_id),
+ ('add_subports', [{
+ 'port': subport['port_id'],
+ 'segmentation-id': str(subport['segmentation_id']),
+ 'segmentation-type': subport['segmentation_type']}]),
+ ('disable', True),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_trunk.assert_called_once_with(**{
+ 'name': self.new_trunk.name,
+ 'description': self.new_trunk.description,
+ 'admin_state_up': False,
+ 'port_id': self.new_trunk.port_id,
+ 'sub_ports': [subport],
+ })
+ self.assertEqual(self.columns, columns)
+ data_with_desc = list(self.data)
+ data_with_desc[0] = self.new_trunk['description']
+ data_with_desc = tuple(data_with_desc)
+ self.assertEqual(data_with_desc, data)
+
+ def test_create_trunk_with_subport_invalid_segmentation_id_fail(self):
+ subport = self.new_trunk.sub_ports[0]
+ arglist = [
+ "--parent-port", self.new_trunk.port_id,
+ "--subport", "port=%(port)s,segmentation-type=%(seg_type)s,"
+ "segmentation-id=boom" % {
+ 'seg_type': subport['segmentation_type'],
+ 'port': subport['port_id']},
+ self.new_trunk.name,
+ ]
+ verifylist = [
+ ('name', self.new_trunk.name),
+ ('parent_port', self.new_trunk.port_id),
+ ('add_subports', [{
+ 'port': subport['port_id'],
+ 'segmentation-id': 'boom',
+ 'segmentation-type': subport['segmentation_type']}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ with testtools.ExpectedException(exceptions.CommandError) as e:
+ self.cmd.take_action(parsed_args)
+ self.assertEqual("Segmentation-id 'boom' is not an integer",
+ str(e))
+
+ def test_create_network_trunk_subports_without_optional_keys(self):
+ subport = copy.copy(self.new_trunk.sub_ports[0])
+ # Pop out the segmentation-id and segmentation-type
+ subport.pop('segmentation_type')
+ subport.pop('segmentation_id')
+ arglist = [
+ '--parent-port', self.new_trunk.port_id,
+ '--subport', 'port=%(port)s' % {'port': subport['port_id']},
+ self.new_trunk.name,
+ ]
+ verifylist = [
+ ('name', self.new_trunk.name),
+ ('parent_port', self.new_trunk.port_id),
+ ('add_subports', [{
+ 'port': subport['port_id']}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_trunk.assert_called_once_with(**{
+ 'name': self.new_trunk.name,
+ 'admin_state_up': True,
+ 'port_id': self.new_trunk.port_id,
+ 'sub_ports': [subport],
+ })
+ self.assertEqual(self.columns, columns)
+ data_with_desc = list(self.data)
+ data_with_desc[0] = self.new_trunk['description']
+ data_with_desc = tuple(data_with_desc)
+ self.assertEqual(data_with_desc, data)
+
+ def test_create_network_trunk_subports_without_required_key_fail(self):
+ subport = self.new_trunk.sub_ports[0]
+ arglist = [
+ '--parent-port', self.new_trunk.port_id,
+ '--subport', 'segmentation-type=%(seg_type)s,'
+ 'segmentation-id=%(seg_id)s' % {
+ 'seg_id': subport['segmentation_id'],
+ 'seg_type': subport['segmentation_type']},
+ self.new_trunk.name,
+ ]
+ verifylist = [
+ ('name', self.new_trunk.name),
+ ('parent_port', self.new_trunk.port_id),
+ ('add_subports', [{
+ 'segmentation_id': str(subport['segmentation_id']),
+ 'segmentation_type': subport['segmentation_type']}]),
+ ]
+
+ with testtools.ExpectedException(argparse.ArgumentTypeError):
+ self.check_parser(self.cmd, arglist, verifylist)
+
+
+class TestDeleteNetworkTrunk(TestNetworkTrunk):
+ # The trunk to be deleted.
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ trunk_networks = network_fakes.create_networks(count=2)
+ parent_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[0]['id']})
+ sub_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[1]['id']})
+
+ new_trunks = network_fakes.create_trunks(
+ attrs={'project_id': project.id,
+ 'port_id': parent_port['id'],
+ 'sub_ports': {
+ 'port_id': sub_port['id'],
+ 'segmentation_id': 42,
+ 'segmentation_type': 'vlan'}
+ })
+
+ def setUp(self):
+ super().setUp()
+ self.network.find_trunk = mock.Mock(
+ side_effect=[self.new_trunks[0], self.new_trunks[1]])
+ self.network.delete_trunk = mock.Mock(return_value=None)
+ self.network.find_port = mock.Mock(
+ side_effect=[self.parent_port, self.sub_port])
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ # Get the command object to test
+ self.cmd = network_trunk.DeleteNetworkTrunk(self.app, self.namespace)
+
+ def test_delete_trunkx(self):
+ arglist = [
+ self.new_trunks[0].name,
+ ]
+ verifylist = [
+ ('trunk', [self.new_trunks[0].name]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.delete_trunk.assert_called_once_with(
+ self.new_trunks[0].id)
+ self.assertIsNone(result)
+
+ def test_delete_trunk_multiple(self):
+ arglist = []
+ verifylist = []
+
+ for t in self.new_trunks:
+ arglist.append(t['name'])
+ verifylist = [
+ ('trunk', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for t in self.new_trunks:
+ calls.append(call(t.id))
+ self.network.delete_trunk.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_delete_trunk_multiple_with_exception(self):
+ arglist = [
+ self.new_trunks[0].name,
+ 'unexist_trunk',
+ ]
+ verifylist = [
+ ('trunk',
+ [self.new_trunks[0].name, 'unexist_trunk']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.network.find_trunk = mock.Mock(
+ side_effect=[self.new_trunks[0], exceptions.CommandError])
+ with testtools.ExpectedException(exceptions.CommandError) as e:
+ self.cmd.take_action(parsed_args)
+ self.assertEqual('1 of 2 trunks failed to delete.', str(e))
+ self.network.delete_trunk.assert_called_once_with(
+ self.new_trunks[0].id
+ )
+
+
+class TestShowNetworkTrunk(TestNetworkTrunk):
+
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ # The trunk to set.
+ new_trunk = network_fakes.create_one_trunk()
+ columns = (
+ 'description',
+ 'id',
+ 'is_admin_state_up',
+ 'name',
+ 'port_id',
+ 'project_id',
+ 'status',
+ 'sub_ports',
+ 'tags'
+ )
+ data = (
+ new_trunk.description,
+ new_trunk.id,
+ new_trunk.is_admin_state_up,
+ new_trunk.name,
+ new_trunk.port_id,
+ new_trunk.project_id,
+ new_trunk.status,
+ format_columns.ListDictColumn(new_trunk.sub_ports),
+ [],
+ )
+
+ def setUp(self):
+ super().setUp()
+ self.network.find_trunk = mock.Mock(return_value=self.new_trunk)
+ self.network.get_trunk = mock.Mock(return_value=self.new_trunk)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ # Get the command object to test
+ self.cmd = network_trunk.ShowNetworkTrunk(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_trunk.id,
+ ]
+ verifylist = [
+ ('trunk', self.new_trunk.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_trunk.assert_called_once_with(self.new_trunk.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestListNetworkTrunk(TestNetworkTrunk):
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ # Create trunks to be listed.
+ new_trunks = network_fakes.create_trunks(
+ {'created_at': '2001-01-01 00:00:00',
+ 'updated_at': '2001-01-01 00:00:00'}, count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Parent Port',
+ 'Description'
+ )
+ columns_long = columns + (
+ 'Status',
+ 'State',
+ 'Created At',
+ 'Updated At'
+ )
+ data = []
+ for t in new_trunks:
+ data.append((
+ t['id'],
+ t['name'],
+ t['port_id'],
+ t['description']
+ ))
+ data_long = []
+ for t in new_trunks:
+ data_long.append((
+ t['id'],
+ t['name'],
+ t['port_id'],
+ t['description'],
+ t['status'],
+ network_trunk.AdminStateColumn(''),
+ '2001-01-01 00:00:00',
+ '2001-01-01 00:00:00',
+ ))
+
+ def setUp(self):
+ super().setUp()
+ self.network.trunks = mock.Mock(return_value=self.new_trunks)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ # Get the command object to test
+ self.cmd = network_trunk.ListNetworkTrunk(self.app, self.namespace)
+
+ def test_trunk_list_no_option(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.trunks.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_trunk_list_long(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.trunks.assert_called_once_with()
+ self.assertEqual(self.columns_long, columns)
+ self.assertEqual(self.data_long, list(data))
+
+
+class TestSetNetworkTrunk(TestNetworkTrunk):
+
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ trunk_networks = network_fakes.create_networks(count=2)
+ parent_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[0]['id']})
+ sub_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[1]['id']})
+ # Create trunks to be listed.
+ _trunk = network_fakes.create_one_trunk(
+ attrs={'project_id': project.id,
+ 'port_id': parent_port['id'],
+ 'sub_ports': {
+ 'port_id': sub_port['id'],
+ 'segmentation_id': 42,
+ 'segmentation_type': 'vlan'}
+ })
+ columns = (
+ 'admin_state_up',
+ 'id',
+ 'name',
+ 'description',
+ 'port_id',
+ 'project_id',
+ 'status',
+ 'sub_ports',
+ )
+ data = (
+ _trunk.id,
+ _trunk.name,
+ _trunk.description,
+ _trunk.port_id,
+ _trunk.project_id,
+ _trunk.status,
+ format_columns.ListDictColumn(_trunk.sub_ports),
+ )
+
+ def setUp(self):
+ super().setUp()
+ self.network.update_trunk = mock.Mock(return_value=self._trunk)
+ self.network.add_trunk_subports = mock.Mock(return_value=self._trunk)
+ self.network.find_trunk = mock.Mock(return_value=self._trunk)
+ self.network.find_port = mock.Mock(
+ side_effect=[self.sub_port, self.sub_port])
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ # Get the command object to test
+ self.cmd = network_trunk.SetNetworkTrunk(self.app, self.namespace)
+
+ def _test_set_network_trunk_attr(self, attr, value):
+ arglist = [
+ '--%s' % attr, value,
+ self._trunk[attr],
+ ]
+ verifylist = [
+ (attr, value),
+ ('trunk', self._trunk[attr]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ attr: value,
+ }
+ self.network.update_trunk.assert_called_once_with(
+ self._trunk, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_network_trunk_name(self):
+ self._test_set_network_trunk_attr('name', 'trunky')
+
+ def test_set_network_trunk_description(self):
+ self._test_set_network_trunk_attr('description', 'description')
+
+ def test_set_network_trunk_admin_state_up_disable(self):
+ arglist = [
+ '--disable',
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('disable', True),
+ ('trunk', self._trunk['name']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': False,
+ }
+ self.network.update_trunk.assert_called_once_with(
+ self._trunk, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_network_trunk_admin_state_up_enable(self):
+ arglist = [
+ '--enable',
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('enable', True),
+ ('trunk', self._trunk['name']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': True,
+ }
+ self.network.update_trunk.assert_called_once_with(
+ self._trunk, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_network_trunk_nothing(self):
+ arglist = [self._trunk['name'], ]
+ verifylist = [('trunk', self._trunk['name']), ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_trunk.assert_called_once_with(
+ self._trunk, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_network_trunk_subports(self):
+ subport = self._trunk['sub_ports'][0]
+ arglist = [
+ '--subport', 'port=%(port)s,segmentation-type=%(seg_type)s,'
+ 'segmentation-id=%(seg_id)s' % {
+ 'seg_id': subport['segmentation_id'],
+ 'seg_type': subport['segmentation_type'],
+ 'port': subport['port_id']},
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ('set_subports', [{
+ 'port': subport['port_id'],
+ 'segmentation-id': str(subport['segmentation_id']),
+ 'segmentation-type': subport['segmentation_type']}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.add_trunk_subports.assert_called_once_with(
+ self._trunk, [subport])
+ self.assertIsNone(result)
+
+ def test_set_network_trunk_subports_without_optional_keys(self):
+ subport = copy.copy(self._trunk['sub_ports'][0])
+ # Pop out the segmentation-id and segmentation-type
+ subport.pop('segmentation_type')
+ subport.pop('segmentation_id')
+ arglist = [
+ '--subport', 'port=%(port)s' % {'port': subport['port_id']},
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ('set_subports', [{
+ 'port': subport['port_id']}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.add_trunk_subports.assert_called_once_with(
+ self._trunk, [subport])
+ self.assertIsNone(result)
+
+ def test_set_network_trunk_subports_without_required_key_fail(self):
+ subport = self._trunk['sub_ports'][0]
+ arglist = [
+ '--subport', 'segmentation-type=%(seg_type)s,'
+ 'segmentation-id=%(seg_id)s' % {
+ 'seg_id': subport['segmentation_id'],
+ 'seg_type': subport['segmentation_type']},
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ('set_subports', [{
+ 'segmentation-id': str(subport['segmentation_id']),
+ 'segmentation-type': subport['segmentation_type']}]),
+ ]
+
+ with testtools.ExpectedException(argparse.ArgumentTypeError):
+ self.check_parser(self.cmd, arglist, verifylist)
+
+ self.network.add_trunk_subports.assert_not_called()
+
+ def test_set_trunk_attrs_with_exception(self):
+ arglist = [
+ '--name', 'reallylongname',
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ('name', 'reallylongname'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.network.update_trunk = (
+ mock.Mock(side_effect=exceptions.CommandError)
+ )
+ with testtools.ExpectedException(exceptions.CommandError) as e:
+ self.cmd.take_action(parsed_args)
+ self.assertEqual(
+ "Failed to set trunk '%s': " % self._trunk['name'],
+ str(e))
+ attrs = {'name': 'reallylongname'}
+ self.network.update_trunk.assert_called_once_with(
+ self._trunk, **attrs)
+ self.network.add_trunk_subports.assert_not_called()
+
+ def test_set_trunk_add_subport_with_exception(self):
+ arglist = [
+ '--subport', 'port=invalid_subport',
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ('set_subports', [{'port': 'invalid_subport'}]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.network.add_trunk_subports = (
+ mock.Mock(side_effect=exceptions.CommandError)
+ )
+ self.network.find_port = (mock.Mock(
+ return_value={'id': 'invalid_subport'}))
+ with testtools.ExpectedException(exceptions.CommandError) as e:
+ self.cmd.take_action(parsed_args)
+ self.assertEqual(
+ "Failed to add subports to trunk '%s': " % self._trunk['name'],
+ str(e))
+ self.network.update_trunk.assert_called_once_with(
+ self._trunk)
+ self.network.add_trunk_subports.assert_called_once_with(
+ self._trunk, [{'port_id': 'invalid_subport'}])
+
+
+class TestListNetworkSubport(TestNetworkTrunk):
+
+ _trunk = network_fakes.create_one_trunk()
+ _subports = _trunk['sub_ports']
+
+ columns = (
+ 'Port',
+ 'Segmentation Type',
+ 'Segmentation ID',
+ )
+ data = []
+ for s in _subports:
+ data.append((
+ s['port_id'],
+ s['segmentation_type'],
+ s['segmentation_id'],
+ ))
+
+ def setUp(self):
+ super().setUp()
+
+ self.network.find_trunk = mock.Mock(return_value=self._trunk)
+ self.network.get_trunk_subports = mock.Mock(
+ return_value={network_trunk.SUB_PORTS: self._subports})
+
+ # Get the command object to test
+ self.cmd = network_trunk.ListNetworkSubport(self.app, self.namespace)
+
+ def test_subport_list(self):
+ arglist = [
+ '--trunk', self._trunk['name'],
+ ]
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_trunk_subports.assert_called_once_with(self._trunk)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestUnsetNetworkTrunk(TestNetworkTrunk):
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ trunk_networks = network_fakes.create_networks(count=2)
+ parent_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[0]['id']})
+ sub_port = network_fakes.create_one_port(
+ attrs={'project_id': project.id,
+ 'network_id': trunk_networks[1]['id']})
+ _trunk = network_fakes.create_one_trunk(
+ attrs={'project_id': project.id,
+ 'port_id': parent_port['id'],
+ 'sub_ports': {
+ 'port_id': sub_port['id'],
+ 'segmentation_id': 42,
+ 'segmentation_type': 'vlan'}
+ })
+
+ columns = (
+ 'admin_state_up',
+ 'id',
+ 'name',
+ 'port_id',
+ 'project_id',
+ 'status',
+ 'sub_ports',
+ )
+ data = (
+ network_trunk.AdminStateColumn(_trunk['admin_state_up']),
+ _trunk['id'],
+ _trunk['name'],
+ _trunk['port_id'],
+ _trunk['project_id'],
+ _trunk['status'],
+ format_columns.ListDictColumn(_trunk['sub_ports']),
+ )
+
+ def setUp(self):
+ super().setUp()
+
+ self.network.find_trunk = mock.Mock(return_value=self._trunk)
+ self.network.find_port = mock.Mock(
+ side_effect=[self.sub_port, self.sub_port])
+ self.network.delete_trunk_subports = mock.Mock(return_value=None)
+
+ # Get the command object to test
+ self.cmd = network_trunk.UnsetNetworkTrunk(self.app, self.namespace)
+
+ def test_unset_network_trunk_subport(self):
+ subport = self._trunk['sub_ports'][0]
+ arglist = [
+ "--subport", subport['port_id'],
+ self._trunk['name'],
+ ]
+
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ('unset_subports', [subport['port_id']]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.delete_trunk_subports.assert_called_once_with(
+ self._trunk,
+ [{'port_id': subport['port_id']}]
+ )
+ self.assertIsNone(result)
+
+ def test_unset_subport_no_arguments_fail(self):
+ arglist = [
+ self._trunk['name'],
+ ]
+ verifylist = [
+ ('trunk', self._trunk['name']),
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, verifylist)
diff --git a/openstackclient/tests/unit/volume/v1/test_volume.py b/openstackclient/tests/unit/volume/v1/test_volume.py
index 9f16b398..b46a608d 100644
--- a/openstackclient/tests/unit/volume/v1/test_volume.py
+++ b/openstackclient/tests/unit/volume/v1/test_volume.py
@@ -430,7 +430,8 @@ class TestVolumeCreate(TestVolume):
self.assertEqual(self.columns, columns)
self.assertCountEqual(self.datalist, data)
- def test_volume_create_with_bootable_and_readonly(self):
+ @mock.patch.object(utils, 'wait_for_status', return_value=True)
+ def test_volume_create_with_bootable_and_readonly(self, mock_wait):
arglist = [
'--bootable',
'--read-only',
@@ -472,7 +473,8 @@ class TestVolumeCreate(TestVolume):
self.volumes_mock.update_readonly_flag.assert_called_with(
self.new_volume.id, True)
- def test_volume_create_with_nonbootable_and_readwrite(self):
+ @mock.patch.object(utils, 'wait_for_status', return_value=True)
+ def test_volume_create_with_nonbootable_and_readwrite(self, mock_wait):
arglist = [
'--non-bootable',
'--read-write',
@@ -515,8 +517,9 @@ class TestVolumeCreate(TestVolume):
self.new_volume.id, False)
@mock.patch.object(volume.LOG, 'error')
+ @mock.patch.object(utils, 'wait_for_status', return_value=True)
def test_volume_create_with_bootable_and_readonly_fail(
- self, mock_error):
+ self, mock_wait, mock_error):
self.volumes_mock.set_bootable.side_effect = (
exceptions.CommandError())
@@ -566,6 +569,48 @@ class TestVolumeCreate(TestVolume):
self.volumes_mock.update_readonly_flag.assert_called_with(
self.new_volume.id, True)
+ @mock.patch.object(volume.LOG, 'error')
+ @mock.patch.object(utils, 'wait_for_status', return_value=False)
+ def test_volume_create_non_available_with_readonly(
+ self, mock_wait, mock_error):
+ arglist = [
+ '--non-bootable',
+ '--read-only',
+ '--size', str(self.new_volume.size),
+ self.new_volume.display_name,
+ ]
+ verifylist = [
+ ('bootable', False),
+ ('non_bootable', True),
+ ('read_only', True),
+ ('read_write', False),
+ ('size', self.new_volume.size),
+ ('name', self.new_volume.display_name),
+ ]
+
+ parsed_args = self.check_parser(
+ self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.volumes_mock.create.assert_called_with(
+ self.new_volume.size,
+ None,
+ None,
+ self.new_volume.display_name,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ )
+
+ self.assertEqual(2, mock_error.call_count)
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.datalist, data)
+
def test_volume_create_without_size(self):
arglist = [
self.new_volume.display_name,
diff --git a/openstackclient/tests/unit/volume/v2/test_consistency_group.py b/openstackclient/tests/unit/volume/v2/test_consistency_group.py
index 7ef4a08e..c5537ed8 100644
--- a/openstackclient/tests/unit/volume/v2/test_consistency_group.py
+++ b/openstackclient/tests/unit/volume/v2/test_consistency_group.py
@@ -257,7 +257,7 @@ class TestConsistencyGroupCreate(TestConsistencyGroup):
self.new_consistency_group.name,
]
verifylist = [
- ('consistency_group_source', self.new_consistency_group.id),
+ ('source', self.new_consistency_group.id),
('description', self.new_consistency_group.description),
('name', self.new_consistency_group.name),
]
@@ -285,7 +285,7 @@ class TestConsistencyGroupCreate(TestConsistencyGroup):
self.new_consistency_group.name,
]
verifylist = [
- ('consistency_group_snapshot', self.consistency_group_snapshot.id),
+ ('snapshot', self.consistency_group_snapshot.id),
('description', self.new_consistency_group.description),
('name', self.new_consistency_group.name),
]
diff --git a/openstackclient/tests/unit/volume/v2/test_volume.py b/openstackclient/tests/unit/volume/v2/test_volume.py
index c930002f..0419acef 100644
--- a/openstackclient/tests/unit/volume/v2/test_volume.py
+++ b/openstackclient/tests/unit/volume/v2/test_volume.py
@@ -435,7 +435,8 @@ class TestVolumeCreate(TestVolume):
self.assertEqual(self.columns, columns)
self.assertCountEqual(self.datalist, data)
- def test_volume_create_with_bootable_and_readonly(self):
+ @mock.patch.object(utils, 'wait_for_status', return_value=True)
+ def test_volume_create_with_bootable_and_readonly(self, mock_wait):
arglist = [
'--bootable',
'--read-only',
@@ -478,7 +479,8 @@ class TestVolumeCreate(TestVolume):
self.volumes_mock.update_readonly_flag.assert_called_with(
self.new_volume.id, True)
- def test_volume_create_with_nonbootable_and_readwrite(self):
+ @mock.patch.object(utils, 'wait_for_status', return_value=True)
+ def test_volume_create_with_nonbootable_and_readwrite(self, mock_wait):
arglist = [
'--non-bootable',
'--read-write',
@@ -522,8 +524,9 @@ class TestVolumeCreate(TestVolume):
self.new_volume.id, False)
@mock.patch.object(volume.LOG, 'error')
+ @mock.patch.object(utils, 'wait_for_status', return_value=True)
def test_volume_create_with_bootable_and_readonly_fail(
- self, mock_error):
+ self, mock_wait, mock_error):
self.volumes_mock.set_bootable.side_effect = (
exceptions.CommandError())
@@ -574,6 +577,50 @@ class TestVolumeCreate(TestVolume):
self.volumes_mock.update_readonly_flag.assert_called_with(
self.new_volume.id, True)
+ @mock.patch.object(volume.LOG, 'error')
+ @mock.patch.object(utils, 'wait_for_status', return_value=False)
+ def test_volume_create_non_available_with_readonly(
+ self, mock_wait, mock_error,
+ ):
+ arglist = [
+ '--non-bootable',
+ '--read-only',
+ '--size', str(self.new_volume.size),
+ self.new_volume.name,
+ ]
+ verifylist = [
+ ('bootable', False),
+ ('non_bootable', True),
+ ('read_only', True),
+ ('read_write', False),
+ ('size', self.new_volume.size),
+ ('name', self.new_volume.name),
+ ]
+
+ parsed_args = self.check_parser(
+ self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.volumes_mock.create.assert_called_with(
+ size=self.new_volume.size,
+ snapshot_id=None,
+ name=self.new_volume.name,
+ description=None,
+ volume_type=None,
+ availability_zone=None,
+ metadata=None,
+ imageRef=None,
+ source_volid=None,
+ consistencygroup_id=None,
+ scheduler_hints=None,
+ backup_id=None,
+ )
+
+ self.assertEqual(2, mock_error.call_count)
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.datalist, data)
+
def test_volume_create_without_size(self):
arglist = [
self.new_volume.name,
diff --git a/openstackclient/tests/unit/volume/v3/fakes.py b/openstackclient/tests/unit/volume/v3/fakes.py
index 3e3a05fa..62383580 100644
--- a/openstackclient/tests/unit/volume/v3/fakes.py
+++ b/openstackclient/tests/unit/volume/v3/fakes.py
@@ -47,6 +47,10 @@ class FakeVolumeClient:
self.volumes.resource_class = fakes.FakeResource(None, {})
self.volume_types = mock.Mock()
self.volume_types.resource_class = fakes.FakeResource(None, {})
+ self.services = mock.Mock()
+ self.services.resource_class = fakes.FakeResource(None, {})
+ self.workers = mock.Mock()
+ self.workers.resource_class = fakes.FakeResource(None, {})
class TestVolume(utils.TestCommand):
@@ -436,3 +440,88 @@ def get_volume_attachments(attachments=None, count=2):
attachments = create_volume_attachments(count)
return mock.Mock(side_effect=attachments)
+
+
+def create_service_log_level_entry(attrs=None):
+ service_log_level_info = {
+ 'host': 'host_test',
+ 'binary': 'cinder-api',
+ 'prefix': 'cinder.api.common',
+ 'level': 'DEBUG',
+ }
+ # Overwrite default attributes if there are some attributes set
+ attrs = attrs or {}
+
+ service_log_level_info.update(attrs)
+
+ service_log_level = fakes.FakeResource(
+ None, service_log_level_info, loaded=True)
+ return service_log_level
+
+
+def create_cleanup_records():
+ """Create fake service cleanup records.
+
+ :return: A list of FakeResource objects
+ """
+ cleaning_records = []
+ unavailable_records = []
+ cleaning_work_info = {
+ 'id': 1,
+ 'host': 'devstack@fakedriver-1',
+ 'binary': 'cinder-volume',
+ 'cluster_name': 'fake_cluster',
+ }
+ unavailable_work_info = {
+ 'id': 2,
+ 'host': 'devstack@fakedriver-2',
+ 'binary': 'cinder-scheduler',
+ 'cluster_name': 'new_cluster',
+ }
+ cleaning_records.append(cleaning_work_info)
+ unavailable_records.append(unavailable_work_info)
+
+ cleaning = [fakes.FakeResource(
+ None, obj, loaded=True) for obj in cleaning_records]
+ unavailable = [fakes.FakeResource(
+ None, obj, loaded=True) for obj in unavailable_records]
+
+ return cleaning, unavailable
+
+
+def create_one_manage_record(attrs=None, snapshot=False):
+ manage_dict = {
+ 'reference': {'source-name': 'fake-volume'},
+ 'size': '1',
+ 'safe_to_manage': False,
+ 'reason_not_safe': 'already managed',
+ 'cinder_id': 'fake-volume',
+ 'extra_info': None,
+ }
+ if snapshot:
+ manage_dict['source_reference'] = {'source-name': 'fake-source'}
+
+ # Overwrite default attributes if there are some attributes set
+ attrs = attrs or {}
+
+ manage_dict.update(attrs)
+ manage_record = fakes.FakeResource(None, manage_dict, loaded=True)
+ return manage_record
+
+
+def create_volume_manage_list_records(count=2):
+ volume_manage_list = []
+ for i in range(count):
+ volume_manage_list.append(
+ create_one_manage_record({'size': str(i + 1)}))
+
+ return volume_manage_list
+
+
+def create_snapshot_manage_list_records(count=2):
+ snapshot_manage_list = []
+ for i in range(count):
+ snapshot_manage_list.append(
+ create_one_manage_record({'size': str(i + 1)}, snapshot=True))
+
+ return snapshot_manage_list
diff --git a/openstackclient/tests/unit/volume/v3/test_block_storage_cleanup.py b/openstackclient/tests/unit/volume/v3/test_block_storage_cleanup.py
new file mode 100644
index 00000000..b48ce2f9
--- /dev/null
+++ b/openstackclient/tests/unit/volume/v3/test_block_storage_cleanup.py
@@ -0,0 +1,178 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from cinderclient import api_versions
+from osc_lib import exceptions
+
+from openstackclient.tests.unit.volume.v3 import fakes as volume_fakes
+from openstackclient.volume.v3 import block_storage_cleanup
+
+
+class TestBlockStorage(volume_fakes.TestVolume):
+
+ def setUp(self):
+ super().setUp()
+
+ # Get a shortcut to the BlockStorageWorkerManager Mock
+ self.worker_mock = self.app.client_manager.volume.workers
+ self.worker_mock.reset_mock()
+
+
+class TestBlockStorageCleanup(TestBlockStorage):
+
+ cleaning, unavailable = volume_fakes.create_cleanup_records()
+
+ def setUp(self):
+ super().setUp()
+
+ self.worker_mock.clean.return_value = (self.cleaning, self.unavailable)
+
+ # Get the command object to test
+ self.cmd = \
+ block_storage_cleanup.BlockStorageCleanup(self.app, None)
+
+ def test_cleanup(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.24')
+
+ arglist = [
+ ]
+ verifylist = [
+ ('cluster', None),
+ ('host', None),
+ ('binary', None),
+ ('is_up', None),
+ ('disabled', None),
+ ('resource_id', None),
+ ('resource_type', None),
+ ('service_id', None),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ expected_columns = ('ID', 'Cluster Name', 'Host', 'Binary', 'Status')
+ cleaning_data = tuple(
+ (
+ obj.id,
+ obj.cluster_name,
+ obj.host,
+ obj.binary,
+ 'Cleaning'
+ ) for obj in self.cleaning
+ )
+ unavailable_data = tuple(
+ (
+ obj.id,
+ obj.cluster_name,
+ obj.host,
+ obj.binary,
+ 'Unavailable'
+ ) for obj in self.unavailable
+ )
+ expected_data = cleaning_data + unavailable_data
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, tuple(data))
+
+ # checking if proper call was made to cleanup resources
+ # Since we ignore all parameters with None value, we don't
+ # have any arguments passed to the API
+ self.worker_mock.clean.assert_called_once_with()
+
+ def test_block_storage_cleanup_pre_324(self):
+ arglist = [
+ ]
+ verifylist = [
+ ('cluster', None),
+ ('host', None),
+ ('binary', None),
+ ('is_up', None),
+ ('disabled', None),
+ ('resource_id', None),
+ ('resource_type', None),
+ ('service_id', None),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ exc = self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.24 or greater is required', str(exc))
+
+ def test_cleanup_with_args(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.24')
+
+ fake_cluster = 'fake-cluster'
+ fake_host = 'fake-host'
+ fake_binary = 'fake-service'
+ fake_resource_id = str(uuid.uuid4())
+ fake_resource_type = 'Volume'
+ fake_service_id = 1
+ arglist = [
+ '--cluster', fake_cluster,
+ '--host', fake_host,
+ '--binary', fake_binary,
+ '--down',
+ '--enabled',
+ '--resource-id', fake_resource_id,
+ '--resource-type', fake_resource_type,
+ '--service-id', str(fake_service_id),
+ ]
+ verifylist = [
+ ('cluster', fake_cluster),
+ ('host', fake_host),
+ ('binary', fake_binary),
+ ('is_up', False),
+ ('disabled', False),
+ ('resource_id', fake_resource_id),
+ ('resource_type', fake_resource_type),
+ ('service_id', fake_service_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ expected_columns = ('ID', 'Cluster Name', 'Host', 'Binary', 'Status')
+ cleaning_data = tuple(
+ (
+ obj.id,
+ obj.cluster_name,
+ obj.host,
+ obj.binary,
+ 'Cleaning'
+ ) for obj in self.cleaning
+ )
+ unavailable_data = tuple(
+ (
+ obj.id,
+ obj.cluster_name,
+ obj.host,
+ obj.binary,
+ 'Unavailable'
+ ) for obj in self.unavailable
+ )
+ expected_data = cleaning_data + unavailable_data
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, tuple(data))
+
+ # checking if proper call was made to cleanup resources
+ self.worker_mock.clean.assert_called_once_with(
+ cluster_name=fake_cluster,
+ host=fake_host,
+ binary=fake_binary,
+ is_up=False,
+ disabled=False,
+ resource_id=fake_resource_id,
+ resource_type=fake_resource_type,
+ service_id=fake_service_id)
diff --git a/openstackclient/tests/unit/volume/v3/test_block_storage_log_level.py b/openstackclient/tests/unit/volume/v3/test_block_storage_log_level.py
new file mode 100644
index 00000000..35ea6274
--- /dev/null
+++ b/openstackclient/tests/unit/volume/v3/test_block_storage_log_level.py
@@ -0,0 +1,233 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from cinderclient import api_versions
+import ddt
+from osc_lib import exceptions
+
+from openstackclient.tests.unit import utils as tests_utils
+from openstackclient.tests.unit.volume.v3 import fakes as volume_fakes
+from openstackclient.volume.v3 import block_storage_log_level as service
+
+
+class TestService(volume_fakes.TestVolume):
+
+ def setUp(self):
+ super().setUp()
+
+ # Get a shortcut to the ServiceManager Mock
+ self.service_mock = self.app.client_manager.volume.services
+ self.service_mock.reset_mock()
+
+
+class TestBlockStorageLogLevelList(TestService):
+
+ service_log = volume_fakes.create_service_log_level_entry()
+
+ def setUp(self):
+ super().setUp()
+
+ self.service_mock.get_log_levels.return_value = [self.service_log]
+
+ # Get the command object to test
+ self.cmd = service.BlockStorageLogLevelList(self.app, None)
+
+ def test_block_storage_log_level_list(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.32')
+ arglist = [
+ '--host', self.service_log.host,
+ '--service', self.service_log.binary,
+ '--log-prefix', self.service_log.prefix,
+ ]
+ verifylist = [
+ ('host', self.service_log.host),
+ ('service', self.service_log.binary),
+ ('log_prefix', self.service_log.prefix),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ expected_columns = [
+ 'Binary',
+ 'Host',
+ 'Prefix',
+ 'Level',
+ ]
+
+ # confirming if all expected columns are present in the result.
+ self.assertEqual(expected_columns, columns)
+
+ datalist = ((
+ self.service_log.binary,
+ self.service_log.host,
+ self.service_log.prefix,
+ self.service_log.level,
+ ), )
+
+ # confirming if all expected values are present in the result.
+ self.assertEqual(datalist, tuple(data))
+
+ # checking if proper call was made to get log level of services
+ self.service_mock.get_log_levels.assert_called_with(
+ server=self.service_log.host,
+ binary=self.service_log.binary,
+ prefix=self.service_log.prefix,
+ )
+
+ def test_block_storage_log_level_list_pre_332(self):
+ arglist = [
+ '--host', self.service_log.host,
+ '--service', 'cinder-api',
+ '--log-prefix', 'cinder_test.api.common',
+ ]
+ verifylist = [
+ ('host', self.service_log.host),
+ ('service', 'cinder-api'),
+ ('log_prefix', 'cinder_test.api.common'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.32 or greater is required', str(exc))
+
+ def test_block_storage_log_level_list_invalid_service_name(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.32')
+ arglist = [
+ '--host', self.service_log.host,
+ '--service', 'nova-api',
+ '--log-prefix', 'cinder_test.api.common',
+ ]
+ verifylist = [
+ ('host', self.service_log.host),
+ ('service', 'nova-api'),
+ ('log_prefix', 'cinder_test.api.common'),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+
+@ddt.ddt
+class TestBlockStorageLogLevelSet(TestService):
+
+ service_log = volume_fakes.create_service_log_level_entry()
+
+ def setUp(self):
+ super().setUp()
+
+ # Get the command object to test
+ self.cmd = service.BlockStorageLogLevelSet(self.app, None)
+
+ def test_block_storage_log_level_set(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.32')
+ arglist = [
+ 'ERROR',
+ '--host', self.service_log.host,
+ '--service', self.service_log.binary,
+ '--log-prefix', self.service_log.prefix,
+ ]
+ verifylist = [
+ ('level', 'ERROR'),
+ ('host', self.service_log.host),
+ ('service', self.service_log.binary),
+ ('log_prefix', self.service_log.prefix),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ # checking if proper call was made to set log level of services
+ self.service_mock.set_log_levels.assert_called_with(
+ level='ERROR',
+ server=self.service_log.host,
+ binary=self.service_log.binary,
+ prefix=self.service_log.prefix,
+ )
+
+ def test_block_storage_log_level_set_pre_332(self):
+ arglist = [
+ 'ERROR',
+ '--host', self.service_log.host,
+ '--service', 'cinder-api',
+ '--log-prefix', 'cinder_test.api.common',
+ ]
+ verifylist = [
+ ('level', 'ERROR'),
+ ('host', self.service_log.host),
+ ('service', 'cinder-api'),
+ ('log_prefix', 'cinder_test.api.common'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.32 or greater is required', str(exc))
+
+ def test_block_storage_log_level_set_invalid_service_name(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.32')
+ arglist = [
+ 'ERROR',
+ '--host', self.service_log.host,
+ '--service', 'nova-api',
+ '--log-prefix', 'cinder.api.common',
+ ]
+ verifylist = [
+ ('level', 'ERROR'),
+ ('host', self.service_log.host),
+ ('service', 'nova-api'),
+ ('log_prefix', 'cinder.api.common'),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ @ddt.data('WARNING', 'info', 'Error', 'debuG', 'fake-log-level')
+ def test_block_storage_log_level_set_log_level(self, log_level):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.32')
+ arglist = [
+ log_level,
+ '--host', self.service_log.host,
+ '--service', 'cinder-api',
+ '--log-prefix', 'cinder.api.common',
+ ]
+ verifylist = [
+ ('level', log_level.upper()),
+ ('host', self.service_log.host),
+ ('service', 'cinder-api'),
+ ('log_prefix', 'cinder.api.common'),
+ ]
+
+ if log_level == 'fake-log-level':
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+ else:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.cmd.take_action(parsed_args)
+
+ # checking if proper call was made to set log level of services
+ self.service_mock.set_log_levels.assert_called_with(
+ level=log_level.upper(),
+ server=self.service_log.host,
+ binary=self.service_log.binary,
+ prefix=self.service_log.prefix)
diff --git a/openstackclient/tests/unit/volume/v3/test_block_storage_manage.py b/openstackclient/tests/unit/volume/v3/test_block_storage_manage.py
new file mode 100644
index 00000000..afd0fd35
--- /dev/null
+++ b/openstackclient/tests/unit/volume/v3/test_block_storage_manage.py
@@ -0,0 +1,411 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from cinderclient import api_versions
+from osc_lib import exceptions
+
+from openstackclient.tests.unit import utils as tests_utils
+from openstackclient.tests.unit.volume.v2 import fakes as v2_volume_fakes
+from openstackclient.tests.unit.volume.v3 import fakes as volume_fakes
+from openstackclient.volume.v3 import block_storage_manage
+
+
+class TestBlockStorageManage(v2_volume_fakes.TestVolume):
+
+ def setUp(self):
+ super().setUp()
+
+ self.volumes_mock = self.app.client_manager.volume.volumes
+ self.volumes_mock.reset_mock()
+ self.snapshots_mock = self.app.client_manager.volume.volume_snapshots
+ self.snapshots_mock.reset_mock()
+
+
+class TestBlockStorageVolumeManage(TestBlockStorageManage):
+
+ volume_manage_list = volume_fakes.create_volume_manage_list_records()
+
+ def setUp(self):
+ super().setUp()
+
+ self.volumes_mock.list_manageable.return_value = (
+ self.volume_manage_list)
+
+ # Get the command object to test
+ self.cmd = block_storage_manage.BlockStorageManageVolumes(
+ self.app, None)
+
+ def test_block_storage_volume_manage_list(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.8')
+ host = 'fake_host'
+ arglist = [
+ host,
+ ]
+ verifylist = [
+ ('host', host),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ expected_columns = [
+ 'reference',
+ 'size',
+ 'safe_to_manage',
+ 'reason_not_safe',
+ 'cinder_id',
+ 'extra_info',
+ ]
+
+ # confirming if all expected columns are present in the result.
+ self.assertEqual(expected_columns, columns)
+
+ datalist = []
+ for volume_record in self.volume_manage_list:
+ manage_details = (
+ volume_record.reference,
+ volume_record.size,
+ volume_record.safe_to_manage,
+ volume_record.reason_not_safe,
+ volume_record.cinder_id,
+ volume_record.extra_info,
+ )
+ datalist.append(manage_details)
+ datalist = tuple(datalist)
+
+ # confirming if all expected values are present in the result.
+ self.assertEqual(datalist, tuple(data))
+
+ # checking if proper call was made to get volume manageable list
+ self.volumes_mock.list_manageable.assert_called_with(
+ host=parsed_args.host,
+ detailed=parsed_args.detailed,
+ marker=parsed_args.marker,
+ limit=parsed_args.limit,
+ offset=parsed_args.offset,
+ sort=parsed_args.sort,
+ cluster=parsed_args.cluster,
+ )
+
+ def test_block_storage_volume_manage_pre_38(self):
+ host = 'fake_host'
+ arglist = [
+ host,
+ ]
+ verifylist = [
+ ('host', host),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.8 or greater is required', str(exc))
+
+ def test_block_storage_volume_manage_pre_317(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.16')
+ cluster = 'fake_cluster'
+ arglist = [
+ '--cluster', cluster,
+ ]
+ verifylist = [
+ ('cluster', cluster),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.17 or greater is required', str(exc))
+ self.assertIn('--cluster', str(exc))
+
+ def test_block_storage_volume_manage_host_and_cluster(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.17')
+ host = 'fake_host'
+ cluster = 'fake_cluster'
+ arglist = [
+ host,
+ '--cluster', cluster,
+ ]
+ verifylist = [
+ ('host', host),
+ ('cluster', cluster),
+ ]
+ exc = self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd,
+ arglist, verifylist)
+ self.assertIn(
+ 'argument --cluster: not allowed with argument <host>', str(exc))
+
+ def test_block_storage_volume_manage_list_all_args(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.8')
+ host = 'fake_host'
+ detailed = True
+ marker = 'fake_marker'
+ limit = '5'
+ offset = '3'
+ sort = 'size:asc'
+ arglist = [
+ host,
+ '--detailed', str(detailed),
+ '--marker', marker,
+ '--limit', limit,
+ '--offset', offset,
+ '--sort', sort,
+ ]
+ verifylist = [
+ ('host', host),
+ ('detailed', str(detailed)),
+ ('marker', marker),
+ ('limit', limit),
+ ('offset', offset),
+ ('sort', sort),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ expected_columns = [
+ 'reference',
+ 'size',
+ 'safe_to_manage',
+ 'reason_not_safe',
+ 'cinder_id',
+ 'extra_info',
+ ]
+
+ # confirming if all expected columns are present in the result.
+ self.assertEqual(expected_columns, columns)
+
+ datalist = []
+ for volume_record in self.volume_manage_list:
+ manage_details = (
+ volume_record.reference,
+ volume_record.size,
+ volume_record.safe_to_manage,
+ volume_record.reason_not_safe,
+ volume_record.cinder_id,
+ volume_record.extra_info,
+ )
+ datalist.append(manage_details)
+ datalist = tuple(datalist)
+
+ # confirming if all expected values are present in the result.
+ self.assertEqual(datalist, tuple(data))
+
+ # checking if proper call was made to get volume manageable list
+ self.volumes_mock.list_manageable.assert_called_with(
+ host=host,
+ detailed=detailed,
+ marker=marker,
+ limit=limit,
+ offset=offset,
+ sort=sort,
+ cluster=parsed_args.cluster,
+ )
+
+
+class TestBlockStorageSnapshotManage(TestBlockStorageManage):
+
+ snapshot_manage_list = volume_fakes.create_snapshot_manage_list_records()
+
+ def setUp(self):
+ super().setUp()
+
+ self.snapshots_mock.list_manageable.return_value = (
+ self.snapshot_manage_list)
+
+ # Get the command object to test
+ self.cmd = block_storage_manage.BlockStorageManageSnapshots(
+ self.app, None)
+
+ def test_block_storage_snapshot_manage_list(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.8')
+ host = 'fake_host'
+ arglist = [
+ host,
+ ]
+ verifylist = [
+ ('host', host),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ expected_columns = [
+ 'reference',
+ 'size',
+ 'safe_to_manage',
+ 'source_reference',
+ 'reason_not_safe',
+ 'cinder_id',
+ 'extra_info',
+ ]
+
+ # confirming if all expected columns are present in the result.
+ self.assertEqual(expected_columns, columns)
+
+ datalist = []
+ for snapshot_record in self.snapshot_manage_list:
+ manage_details = (
+ snapshot_record.reference,
+ snapshot_record.size,
+ snapshot_record.safe_to_manage,
+ snapshot_record.source_reference,
+ snapshot_record.reason_not_safe,
+ snapshot_record.cinder_id,
+ snapshot_record.extra_info,
+ )
+ datalist.append(manage_details)
+ datalist = tuple(datalist)
+
+ # confirming if all expected values are present in the result.
+ self.assertEqual(datalist, tuple(data))
+
+ # checking if proper call was made to get snapshot manageable list
+ self.snapshots_mock.list_manageable.assert_called_with(
+ host=parsed_args.host,
+ detailed=parsed_args.detailed,
+ marker=parsed_args.marker,
+ limit=parsed_args.limit,
+ offset=parsed_args.offset,
+ sort=parsed_args.sort,
+ cluster=parsed_args.cluster,
+ )
+
+ def test_block_storage_volume_manage_pre_38(self):
+ host = 'fake_host'
+ arglist = [
+ host,
+ ]
+ verifylist = [
+ ('host', host),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.8 or greater is required', str(exc))
+
+ def test_block_storage_volume_manage_pre_317(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.16')
+ cluster = 'fake_cluster'
+ arglist = [
+ '--cluster', cluster,
+ ]
+ verifylist = [
+ ('cluster', cluster),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.17 or greater is required', str(exc))
+ self.assertIn('--cluster', str(exc))
+
+ def test_block_storage_volume_manage_host_and_cluster(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.17')
+ host = 'fake_host'
+ cluster = 'fake_cluster'
+ arglist = [
+ host,
+ '--cluster', cluster,
+ ]
+ verifylist = [
+ ('host', host),
+ ('cluster', cluster),
+ ]
+ exc = self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd,
+ arglist, verifylist)
+ self.assertIn(
+ 'argument --cluster: not allowed with argument <host>', str(exc))
+
+ def test_block_storage_volume_manage_list_all_args(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.8')
+ host = 'fake_host'
+ detailed = True
+ marker = 'fake_marker'
+ limit = '5'
+ offset = '3'
+ sort = 'size:asc'
+ arglist = [
+ host,
+ '--detailed', str(detailed),
+ '--marker', marker,
+ '--limit', limit,
+ '--offset', offset,
+ '--sort', sort,
+ ]
+ verifylist = [
+ ('host', host),
+ ('detailed', str(detailed)),
+ ('marker', marker),
+ ('limit', limit),
+ ('offset', offset),
+ ('sort', sort),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ expected_columns = [
+ 'reference',
+ 'size',
+ 'safe_to_manage',
+ 'source_reference',
+ 'reason_not_safe',
+ 'cinder_id',
+ 'extra_info',
+ ]
+
+ # confirming if all expected columns are present in the result.
+ self.assertEqual(expected_columns, columns)
+
+ datalist = []
+ for snapshot_record in self.snapshot_manage_list:
+ manage_details = (
+ snapshot_record.reference,
+ snapshot_record.size,
+ snapshot_record.safe_to_manage,
+ snapshot_record.source_reference,
+ snapshot_record.reason_not_safe,
+ snapshot_record.cinder_id,
+ snapshot_record.extra_info,
+ )
+ datalist.append(manage_details)
+ datalist = tuple(datalist)
+
+ # confirming if all expected values are present in the result.
+ self.assertEqual(datalist, tuple(data))
+
+ # checking if proper call was made to get snapshot manageable list
+ self.snapshots_mock.list_manageable.assert_called_with(
+ host=host,
+ detailed=detailed,
+ marker=marker,
+ limit=limit,
+ offset=offset,
+ sort=sort,
+ cluster=parsed_args.cluster,
+ )
diff --git a/openstackclient/tests/unit/volume/v3/test_volume.py b/openstackclient/tests/unit/volume/v3/test_volume.py
new file mode 100644
index 00000000..ed72bfa1
--- /dev/null
+++ b/openstackclient/tests/unit/volume/v3/test_volume.py
@@ -0,0 +1,179 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import copy
+from unittest import mock
+
+from cinderclient import api_versions
+from osc_lib.cli import format_columns
+from osc_lib import exceptions
+from osc_lib import utils
+
+from openstackclient.tests.unit.volume.v2 import fakes as volume_fakes
+from openstackclient.volume.v3 import volume
+
+
+class TestVolumeSummary(volume_fakes.TestVolume):
+
+ columns = [
+ 'Total Count',
+ 'Total Size',
+ ]
+
+ def setUp(self):
+ super().setUp()
+
+ self.volumes_mock = self.app.client_manager.volume.volumes
+ self.volumes_mock.reset_mock()
+ self.mock_vol_1 = volume_fakes.create_one_volume()
+ self.mock_vol_2 = volume_fakes.create_one_volume()
+ self.return_dict = {
+ 'volume-summary': {
+ 'total_count': 2,
+ 'total_size': self.mock_vol_1.size + self.mock_vol_2.size}}
+ self.volumes_mock.summary.return_value = self.return_dict
+
+ # Get the command object to test
+ self.cmd = volume.VolumeSummary(self.app, None)
+
+ def test_volume_summary(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.12')
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.volumes_mock.summary.assert_called_once_with(
+ all_tenants=True,
+ )
+
+ self.assertEqual(self.columns, columns)
+
+ datalist = (
+ 2,
+ self.mock_vol_1.size + self.mock_vol_2.size)
+ self.assertCountEqual(datalist, tuple(data))
+
+ def test_volume_summary_pre_312(self):
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.12 or greater is required',
+ str(exc))
+
+ def test_volume_summary_with_metadata(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.36')
+
+ combine_meta = {**self.mock_vol_1.metadata, **self.mock_vol_2.metadata}
+ meta_dict = copy.deepcopy(self.return_dict)
+ meta_dict['volume-summary']['metadata'] = combine_meta
+ self.volumes_mock.summary.return_value = meta_dict
+
+ new_cols = copy.deepcopy(self.columns)
+ new_cols.extend(['Metadata'])
+
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.volumes_mock.summary.assert_called_once_with(
+ all_tenants=True,
+ )
+
+ self.assertEqual(new_cols, columns)
+
+ datalist = (
+ 2,
+ self.mock_vol_1.size + self.mock_vol_2.size,
+ format_columns.DictColumn(combine_meta))
+ self.assertCountEqual(datalist, tuple(data))
+
+
+class TestVolumeRevertToSnapshot(volume_fakes.TestVolume):
+
+ def setUp(self):
+ super().setUp()
+
+ self.volumes_mock = self.app.client_manager.volume.volumes
+ self.volumes_mock.reset_mock()
+ self.snapshots_mock = self.app.client_manager.volume.volume_snapshots
+ self.snapshots_mock.reset_mock()
+ self.mock_volume = volume_fakes.create_one_volume()
+ self.mock_snapshot = volume_fakes.create_one_snapshot(
+ attrs={'volume_id': self.volumes_mock.id})
+
+ # Get the command object to test
+ self.cmd = volume.VolumeRevertToSnapshot(self.app, None)
+
+ def test_volume_revert_to_snapshot_pre_340(self):
+ arglist = [
+ self.mock_snapshot.id,
+ ]
+ verifylist = [
+ ('snapshot', self.mock_snapshot.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.40 or greater is required',
+ str(exc))
+
+ def test_volume_revert_to_snapshot(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.40')
+ arglist = [
+ self.mock_snapshot.id,
+ ]
+ verifylist = [
+ ('snapshot', self.mock_snapshot.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.mock_snapshot, self.mock_volume]
+ with mock.patch.object(utils, 'find_resource',
+ side_effect=find_mock_result) as find_mock:
+ self.cmd.take_action(parsed_args)
+
+ self.volumes_mock.revert_to_snapshot.assert_called_once_with(
+ volume=self.mock_volume,
+ snapshot=self.mock_snapshot,
+ )
+ self.assertEqual(2, find_mock.call_count)
diff --git a/openstackclient/tests/unit/volume/v3/test_volume_group.py b/openstackclient/tests/unit/volume/v3/test_volume_group.py
index 96079a08..78717de8 100644
--- a/openstackclient/tests/unit/volume/v3/test_volume_group.py
+++ b/openstackclient/tests/unit/volume/v3/test_volume_group.py
@@ -10,9 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
+from unittest import mock
+
from cinderclient import api_versions
from osc_lib import exceptions
+from openstackclient.tests.unit import utils as tests_utils
from openstackclient.tests.unit.volume.v3 import fakes as volume_fakes
from openstackclient.volume.v3 import volume_group
@@ -32,6 +35,10 @@ class TestVolumeGroup(volume_fakes.TestVolume):
self.volume_types_mock = self.app.client_manager.volume.volume_types
self.volume_types_mock.reset_mock()
+ self.volume_group_snapshots_mock = \
+ self.app.client_manager.volume.group_snapshots
+ self.volume_group_snapshots_mock.reset_mock()
+
class TestVolumeGroupCreate(TestVolumeGroup):
@@ -43,6 +50,8 @@ class TestVolumeGroupCreate(TestVolumeGroup):
'volume_types': [fake_volume_type.id],
},
)
+ fake_volume_group_snapshot = \
+ volume_fakes.create_one_volume_group_snapshot()
columns = (
'ID',
@@ -79,6 +88,10 @@ class TestVolumeGroupCreate(TestVolumeGroup):
self.fake_volume_group_type
self.volume_groups_mock.create.return_value = self.fake_volume_group
self.volume_groups_mock.get.return_value = self.fake_volume_group
+ self.volume_groups_mock.create_from_src.return_value = \
+ self.fake_volume_group
+ self.volume_group_snapshots_mock.get.return_value = \
+ self.fake_volume_group_snapshot
self.cmd = volume_group.CreateVolumeGroup(self.app, None)
@@ -87,8 +100,8 @@ class TestVolumeGroupCreate(TestVolumeGroup):
api_versions.APIVersion('3.13')
arglist = [
- self.fake_volume_group_type.id,
- self.fake_volume_type.id,
+ '--volume-group-type', self.fake_volume_group_type.id,
+ '--volume-type', self.fake_volume_type.id,
]
verifylist = [
('volume_group_type', self.fake_volume_group_type.id),
@@ -115,13 +128,75 @@ class TestVolumeGroupCreate(TestVolumeGroup):
self.assertEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
- def test_volume_group_create_with_options(self):
+ def test_volume_group_create__legacy(self):
self.app.client_manager.volume.api_version = \
api_versions.APIVersion('3.13')
arglist = [
self.fake_volume_group_type.id,
self.fake_volume_type.id,
+ ]
+ verifylist = [
+ ('volume_group_type_legacy', self.fake_volume_group_type.id),
+ ('volume_types_legacy', [self.fake_volume_type.id]),
+ ('name', None),
+ ('description', None),
+ ('availability_zone', None),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ with mock.patch.object(self.cmd.log, 'warning') as mock_warning:
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.volume_group_types_mock.get.assert_called_once_with(
+ self.fake_volume_group_type.id)
+ self.volume_types_mock.get.assert_called_once_with(
+ self.fake_volume_type.id)
+ self.volume_groups_mock.create.assert_called_once_with(
+ self.fake_volume_group_type.id,
+ self.fake_volume_type.id,
+ None,
+ None,
+ availability_zone=None,
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, data)
+ mock_warning.assert_called_once()
+ self.assertIn(
+ 'Passing volume group type and volume types as positional ',
+ str(mock_warning.call_args[0][0]),
+ )
+
+ def test_volume_group_create_no_volume_type(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.13')
+
+ arglist = [
+ '--volume-group-type', self.fake_volume_group_type.id,
+ ]
+ verifylist = [
+ ('volume_group_type', self.fake_volume_group_type.id),
+ ('name', None),
+ ('description', None),
+ ('availability_zone', None),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--volume-types is a required argument when creating ',
+ str(exc))
+
+ def test_volume_group_create_with_options(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.13')
+
+ arglist = [
+ '--volume-group-type', self.fake_volume_group_type.id,
+ '--volume-type', self.fake_volume_type.id,
'--name', 'foo',
'--description', 'hello, world',
'--availability-zone', 'bar',
@@ -156,8 +231,8 @@ class TestVolumeGroupCreate(TestVolumeGroup):
api_versions.APIVersion('3.12')
arglist = [
- self.fake_volume_group_type.id,
- self.fake_volume_type.id,
+ '--volume-group-type', self.fake_volume_group_type.id,
+ '--volume-type', self.fake_volume_type.id,
]
verifylist = [
('volume_group_type', self.fake_volume_group_type.id),
@@ -176,6 +251,101 @@ class TestVolumeGroupCreate(TestVolumeGroup):
'--os-volume-api-version 3.13 or greater is required',
str(exc))
+ def test_volume_group_create_from_source_group(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.14')
+
+ arglist = [
+ '--source-group', self.fake_volume_group.id,
+ ]
+ verifylist = [
+ ('source_group', self.fake_volume_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.volume_groups_mock.get.assert_has_calls(
+ [mock.call(self.fake_volume_group.id),
+ mock.call(self.fake_volume_group.id)])
+ self.volume_groups_mock.create_from_src.assert_called_once_with(
+ None,
+ self.fake_volume_group.id,
+ None,
+ None,
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, data)
+
+ def test_volume_group_create_from_group_snapshot(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.14')
+
+ arglist = [
+ '--group-snapshot', self.fake_volume_group_snapshot.id,
+ ]
+ verifylist = [
+ ('group_snapshot', self.fake_volume_group_snapshot.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.volume_group_snapshots_mock.get.assert_called_once_with(
+ self.fake_volume_group_snapshot.id)
+ self.volume_groups_mock.get.assert_called_once_with(
+ self.fake_volume_group.id)
+ self.volume_groups_mock.create_from_src.assert_called_once_with(
+ self.fake_volume_group_snapshot.id,
+ None,
+ None,
+ None,
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertCountEqual(self.data, data)
+
+ def test_volume_group_create_from_src_pre_v314(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.13')
+
+ arglist = [
+ '--source-group', self.fake_volume_group.id,
+ ]
+ verifylist = [
+ ('source_group', self.fake_volume_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ exc = self.assertRaises(
+ exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+ self.assertIn(
+ '--os-volume-api-version 3.14 or greater is required',
+ str(exc))
+
+ def test_volume_group_create_from_src_source_group_group_snapshot(self):
+ self.app.client_manager.volume.api_version = \
+ api_versions.APIVersion('3.14')
+
+ arglist = [
+ '--source-group', self.fake_volume_group.id,
+ '--group-snapshot', self.fake_volume_group_snapshot.id,
+ ]
+ verifylist = [
+ ('source_group', self.fake_volume_group.id),
+ ('group_snapshot', self.fake_volume_group_snapshot.id),
+ ]
+
+ exc = self.assertRaises(tests_utils.ParserException,
+ self.check_parser,
+ self.cmd,
+ arglist,
+ verifylist)
+ self.assertIn(
+ '--group-snapshot: not allowed with argument --source-group',
+ str(exc))
+
class TestVolumeGroupDelete(TestVolumeGroup):