diff options
Diffstat (limited to 'openstackclient/tests')
14 files changed, 1584 insertions, 1232 deletions
diff --git a/openstackclient/tests/functional/compute/v2/test_server.py b/openstackclient/tests/functional/compute/v2/test_server.py index f152de80..dd257e9a 100644 --- a/openstackclient/tests/functional/compute/v2/test_server.py +++ b/openstackclient/tests/functional/compute/v2/test_server.py @@ -371,7 +371,7 @@ class ServerTests(common.ComputeTestCase): server_name = uuid.uuid4().hex server = json.loads(self.openstack( # auto/none enable in nova micro version (v2.37+) - '--os-compute-api-version 2.latest ' + + '--os-compute-api-version 2.37 ' + 'server create -f json ' + '--flavor ' + self.flavor_name + ' ' + '--image ' + self.image_name + ' ' + @@ -394,7 +394,7 @@ class ServerTests(common.ComputeTestCase): try: self.openstack( # auto/none enable in nova micro version (v2.37+) - '--os-compute-api-version 2.latest ' + + '--os-compute-api-version 2.37 ' + 'server create -f json ' + '--flavor ' + self.flavor_name + ' ' + '--image ' + self.image_name + ' ' + diff --git a/openstackclient/tests/unit/api/test_compute_v2.py b/openstackclient/tests/unit/api/test_compute_v2.py new file mode 100644 index 00000000..f443e810 --- /dev/null +++ b/openstackclient/tests/unit/api/test_compute_v2.py @@ -0,0 +1,228 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Compute v2 API Library Tests""" + +from requests_mock.contrib import fixture + +from keystoneclient import session +from openstackclient.api import compute_v2 as compute +from openstackclient.tests.unit import utils +from osc_lib import exceptions as osc_lib_exceptions + + +FAKE_PROJECT = 'xyzpdq' +FAKE_URL = 'http://gopher.com/v2' + + +class TestComputeAPIv2(utils.TestCase): + + def setUp(self): + super(TestComputeAPIv2, self).setUp() + sess = session.Session() + self.api = compute.APIv2(session=sess, endpoint=FAKE_URL) + self.requests_mock = self.useFixture(fixture.Fixture()) + + +class TestSecurityGroup(TestComputeAPIv2): + + FAKE_SECURITY_GROUP_RESP = { + 'id': '1', + 'name': 'sg1', + 'description': 'test security group', + 'tenant_id': '0123456789', + 'rules': [] + } + FAKE_SECURITY_GROUP_RESP_2 = { + 'id': '2', + 'name': 'sg2', + 'description': 'another test security group', + 'tenant_id': '0123456789', + 'rules': [] + } + LIST_SECURITY_GROUP_RESP = [ + FAKE_SECURITY_GROUP_RESP_2, + FAKE_SECURITY_GROUP_RESP, + ] + + def test_security_group_create_default(self): + self.requests_mock.register_uri( + 'POST', + FAKE_URL + '/os-security-groups', + json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, + status_code=200, + ) + ret = self.api.security_group_create('sg1') + self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) + + def test_security_group_create_options(self): + self.requests_mock.register_uri( + 'POST', + FAKE_URL + '/os-security-groups', + json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, + status_code=200, + ) + ret = self.api.security_group_create( + name='sg1', + description='desc', + ) + self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) + + def test_security_group_delete_id(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/1', + json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, + status_code=200, + ) + self.requests_mock.register_uri( + 'DELETE', + FAKE_URL + '/os-security-groups/1', + status_code=202, + ) + ret = self.api.security_group_delete('1') + self.assertEqual(202, ret.status_code) + self.assertEqual("", ret.text) + + def test_security_group_delete_name(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/sg1', + status_code=404, + ) + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups', + json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, + status_code=200, + ) + self.requests_mock.register_uri( + 'DELETE', + FAKE_URL + '/os-security-groups/1', + status_code=202, + ) + ret = self.api.security_group_delete('sg1') + self.assertEqual(202, ret.status_code) + self.assertEqual("", ret.text) + + def test_security_group_delete_not_found(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/sg3', + status_code=404, + ) + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups', + json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, + status_code=200, + ) + self.assertRaises( + osc_lib_exceptions.NotFound, + self.api.security_group_delete, + 'sg3', + ) + + def test_security_group_find_id(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/1', + json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, + status_code=200, + ) + ret = self.api.security_group_find('1') + self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) + + def test_security_group_find_name(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/sg2', + status_code=404, + ) + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups', + json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, + status_code=200, + ) + ret = self.api.security_group_find('sg2') + self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret) + + def test_security_group_find_not_found(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/sg3', + status_code=404, + ) + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups', + json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, + status_code=200, + ) + self.assertRaises( + osc_lib_exceptions.NotFound, + self.api.security_group_find, + 'sg3', + ) + + def test_security_group_list_no_options(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups', + json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, + status_code=200, + ) + ret = self.api.security_group_list() + self.assertEqual(self.LIST_SECURITY_GROUP_RESP, ret) + + def test_security_group_set_options_id(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/1', + json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, + status_code=200, + ) + self.requests_mock.register_uri( + 'PUT', + FAKE_URL + '/os-security-groups/1', + json={'security_group': self.FAKE_SECURITY_GROUP_RESP}, + status_code=200, + ) + ret = self.api.security_group_set( + security_group='1', + description='desc2') + self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret) + + def test_security_group_set_options_name(self): + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups/sg2', + status_code=404, + ) + self.requests_mock.register_uri( + 'GET', + FAKE_URL + '/os-security-groups', + json={'security_groups': self.LIST_SECURITY_GROUP_RESP}, + status_code=200, + ) + self.requests_mock.register_uri( + 'PUT', + FAKE_URL + '/os-security-groups/2', + json={'security_group': self.FAKE_SECURITY_GROUP_RESP_2}, + status_code=200, + ) + ret = self.api.security_group_set( + security_group='sg2', + description='desc2') + self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret) diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py index 4a194859..05cb5076 100644 --- a/openstackclient/tests/unit/compute/v2/fakes.py +++ b/openstackclient/tests/unit/compute/v2/fakes.py @@ -17,6 +17,7 @@ import copy import mock import uuid +from openstackclient.api import compute_v2 from openstackclient.tests.unit import fakes from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes from openstackclient.tests.unit.image.v2 import fakes as image_fakes @@ -180,9 +181,6 @@ class FakeComputev2Client(object): self.hypervisors_stats = mock.Mock() self.hypervisors_stats.resource_class = fakes.FakeResource(None, {}) - self.security_groups = mock.Mock() - self.security_groups.resource_class = fakes.FakeResource(None, {}) - self.security_group_rules = mock.Mock() self.security_group_rules.resource_class = fakes.FakeResource(None, {}) @@ -222,6 +220,11 @@ class TestComputev2(utils.TestCommand): token=fakes.AUTH_TOKEN, ) + self.app.client_manager.compute.api = compute_v2.APIv2( + session=self.app.client_manager.session, + endpoint=fakes.AUTH_URL, + ) + self.app.client_manager.identity = identity_fakes.FakeIdentityv2Client( endpoint=fakes.AUTH_URL, token=fakes.AUTH_TOKEN, @@ -485,11 +488,7 @@ class FakeSecurityGroup(object): # Overwrite default attributes. security_group_attrs.update(attrs) - - security_group = fakes.FakeResource( - info=copy.deepcopy(security_group_attrs), - loaded=True) - return security_group + return security_group_attrs @staticmethod def create_security_groups(attrs=None, count=2): diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py index fed847f1..71288a31 100644 --- a/openstackclient/tests/unit/compute/v2/test_server.py +++ b/openstackclient/tests/unit/compute/v2/test_server.py @@ -41,11 +41,6 @@ class TestServer(compute_fakes.TestComputev2): self.flavors_mock = self.app.client_manager.compute.flavors self.flavors_mock.reset_mock() - # Get a shortcut to the compute client SecurityGroupManager Mock - self.security_groups_mock = \ - self.app.client_manager.compute.security_groups - self.security_groups_mock.reset_mock() - # Get a shortcut to the image client ImageManager Mock self.images_mock = self.app.client_manager.image.images self.images_mock.reset_mock() @@ -232,6 +227,9 @@ class TestServerAddPort(TestServer): self.find_port.assert_not_called() +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_find' +) class TestServerAddSecurityGroup(TestServer): def setUp(self): @@ -239,11 +237,9 @@ class TestServerAddSecurityGroup(TestServer): self.security_group = \ compute_fakes.FakeSecurityGroup.create_one_security_group() - # This is the return value for utils.find_resource() for security group - self.security_groups_mock.get.return_value = self.security_group attrs = { - 'security_groups': [{'name': self.security_group.id}] + 'security_groups': [{'name': self.security_group['id']}] } methods = { 'add_security_group': None, @@ -259,23 +255,24 @@ class TestServerAddSecurityGroup(TestServer): # Get the command object to test self.cmd = server.AddServerSecurityGroup(self.app, None) - def test_server_add_security_group(self): + def test_server_add_security_group(self, sg_find_mock): + sg_find_mock.return_value = self.security_group arglist = [ self.server.id, - self.security_group.id + self.security_group['id'] ] verifylist = [ ('server', self.server.id), - ('group', self.security_group.id), + ('group', self.security_group['id']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.security_groups_mock.get.assert_called_with( - self.security_group.id, + sg_find_mock.assert_called_with( + self.security_group['id'], ) self.servers_mock.get.assert_called_with(self.server.id) self.server.add_security_group.assert_called_with( - self.security_group.id, + self.security_group['id'], ) self.assertIsNone(result) @@ -1716,6 +1713,9 @@ class TestServerRemovePort(TestServer): self.find_port.assert_not_called() +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_find' +) class TestServerRemoveSecurityGroup(TestServer): def setUp(self): @@ -1723,11 +1723,9 @@ class TestServerRemoveSecurityGroup(TestServer): self.security_group = \ compute_fakes.FakeSecurityGroup.create_one_security_group() - # This is the return value for utils.find_resource() for security group - self.security_groups_mock.get.return_value = self.security_group attrs = { - 'security_groups': [{'name': self.security_group.id}] + 'security_groups': [{'name': self.security_group['id']}] } methods = { 'remove_security_group': None, @@ -1743,23 +1741,24 @@ class TestServerRemoveSecurityGroup(TestServer): # Get the command object to test self.cmd = server.RemoveServerSecurityGroup(self.app, None) - def test_server_remove_security_group(self): + def test_server_remove_security_group(self, sg_find_mock): + sg_find_mock.return_value = self.security_group arglist = [ self.server.id, - self.security_group.id + self.security_group['id'] ] verifylist = [ ('server', self.server.id), - ('group', self.security_group.id), + ('group', self.security_group['id']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.security_groups_mock.get.assert_called_with( - self.security_group.id, + sg_find_mock.assert_called_with( + self.security_group['id'], ) self.servers_mock.get.assert_called_with(self.server.id) self.server.remove_security_group.assert_called_with( - self.security_group.id, + self.security_group['id'], ) self.assertIsNone(result) diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py index f28f9103..999694b7 100644 --- a/openstackclient/tests/unit/fakes.py +++ b/openstackclient/tests/unit/fakes.py @@ -228,6 +228,9 @@ class FakeResource(object): def get(self, item, default=None): return self._info.get(item, default) + def pop(self, key, default_value=None): + return self.info.pop(key, default_value) + class FakeResponse(requests.Response): diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py b/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py new file mode 100644 index 00000000..23cd82d2 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py @@ -0,0 +1,265 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import floating_ip as fip +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit import utils as tests_utils + + +# Tests for Nova network + +class TestFloatingIPCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestFloatingIPCompute, self).setUp() + + # Get a shortcut to the compute client + self.compute = self.app.client_manager.compute + + +class TestCreateFloatingIPCompute(TestFloatingIPCompute): + + # The floating ip to be deleted. + floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() + + columns = ( + 'fixed_ip', + 'id', + 'instance_id', + 'ip', + 'pool', + ) + + data = ( + floating_ip.fixed_ip, + floating_ip.id, + floating_ip.instance_id, + floating_ip.ip, + floating_ip.pool, + ) + + def setUp(self): + super(TestCreateFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.floating_ips.create.return_value = self.floating_ip + + # Get the command object to test + self.cmd = fip.CreateFloatingIP(self.app, None) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + self.floating_ip.pool, + ] + verifylist = [ + ('network', self.floating_ip.pool), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.floating_ips.create.assert_called_once_with( + self.floating_ip.pool) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteFloatingIPCompute(TestFloatingIPCompute): + + # The floating ips to be deleted. + floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2) + + def setUp(self): + super(TestDeleteFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.floating_ips.delete.return_value = None + + # Return value of utils.find_resource() + self.compute.floating_ips.get = ( + compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) + + # Get the command object to test + self.cmd = fip.DeleteFloatingIP(self.app, None) + + def test_floating_ip_delete(self): + arglist = [ + self.floating_ips[0].id, + ] + verifylist = [ + ('floating_ip', [self.floating_ips[0].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.compute.floating_ips.delete.assert_called_once_with( + self.floating_ips[0].id + ) + self.assertIsNone(result) + + def test_multi_floating_ips_delete(self): + arglist = [] + verifylist = [] + + for f in self.floating_ips: + arglist.append(f.id) + verifylist = [ + ('floating_ip', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for f in self.floating_ips: + calls.append(call(f.id)) + self.compute.floating_ips.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_floating_ips_delete_with_exception(self): + arglist = [ + self.floating_ips[0].id, + 'unexist_floating_ip', + ] + verifylist = [ + ('floating_ip', + [self.floating_ips[0].id, 'unexist_floating_ip']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.floating_ips[0], exceptions.CommandError] + self.compute.floating_ips.get = ( + mock.Mock(side_effect=find_mock_result) + ) + self.compute.floating_ips.find.side_effect = exceptions.NotFound(None) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) + + self.compute.floating_ips.get.assert_any_call( + self.floating_ips[0].id) + self.compute.floating_ips.get.assert_any_call( + 'unexist_floating_ip') + self.compute.floating_ips.delete.assert_called_once_with( + self.floating_ips[0].id + ) + + +class TestListFloatingIPCompute(TestFloatingIPCompute): + + # The floating ips to be list up + floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3) + + columns = ( + 'ID', + 'Floating IP Address', + 'Fixed IP Address', + 'Server', + 'Pool', + ) + + data = [] + for ip in floating_ips: + data.append(( + ip.id, + ip.ip, + ip.fixed_ip, + ip.instance_id, + ip.pool, + )) + + def setUp(self): + super(TestListFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.floating_ips.list.return_value = self.floating_ips + + # Get the command object to test + self.cmd = fip.ListFloatingIP(self.app, None) + + def test_floating_ip_list(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.floating_ips.list.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestShowFloatingIPCompute(TestFloatingIPCompute): + + # The floating ip to display. + floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() + + columns = ( + 'fixed_ip', + 'id', + 'instance_id', + 'ip', + 'pool', + ) + + data = ( + floating_ip.fixed_ip, + floating_ip.id, + floating_ip.instance_id, + floating_ip.ip, + floating_ip.pool, + ) + + def setUp(self): + super(TestShowFloatingIPCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Return value of utils.find_resource() + self.compute.floating_ips.get.return_value = self.floating_ip + + # Get the command object to test + self.cmd = fip.ShowFloatingIP(self.app, None) + + def test_floating_ip_show(self): + arglist = [ + self.floating_ip.id, + ] + verifylist = [ + ('floating_ip', self.floating_ip.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip.py b/openstackclient/tests/unit/network/v2/test_floating_ip_network.py index 69fb1419..4fbbc822 100644 --- a/openstackclient/tests/unit/network/v2/test_floating_ip.py +++ b/openstackclient/tests/unit/network/v2/test_floating_ip_network.py @@ -17,14 +17,13 @@ from mock import call from osc_lib import exceptions from openstackclient.network.v2 import floating_ip as fip -from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3 from openstackclient.tests.unit.network.v2 import fakes as network_fakes from openstackclient.tests.unit import utils as tests_utils # Tests for Neutron network -# + class TestFloatingIPNetwork(network_fakes.TestNetworkV2): def setUp(self): @@ -612,7 +611,7 @@ class TestSetFloatingIP(TestFloatingIPNetwork): self.cmd = fip.SetFloatingIP(self.app, self.namespace) @mock.patch( - "openstackclient.tests.unit.network.v2.test_floating_ip." + + "openstackclient.tests.unit.network.v2.test_floating_ip_network." + "fip._find_floating_ip" ) def test_port_option(self, find_floating_ip_mock): @@ -645,7 +644,7 @@ class TestSetFloatingIP(TestFloatingIPNetwork): self.floating_ip, **attrs) @mock.patch( - "openstackclient.tests.unit.network.v2.test_floating_ip." + + "openstackclient.tests.unit.network.v2.test_floating_ip_network." + "fip._find_floating_ip" ) def test_fixed_ip_option(self, find_floating_ip_mock): @@ -702,7 +701,7 @@ class TestUnsetFloatingIP(TestFloatingIPNetwork): self.cmd = fip.UnsetFloatingIP(self.app, self.namespace) @mock.patch( - "openstackclient.tests.unit.network.v2.test_floating_ip." + + "openstackclient.tests.unit.network.v2.test_floating_ip_network." + "fip._find_floating_ip" ) def test_floating_ip_unset_port(self, find_floating_ip_mock): @@ -733,247 +732,3 @@ class TestUnsetFloatingIP(TestFloatingIPNetwork): self.floating_ip, **attrs) self.assertIsNone(result) - - -# Tests for Nova network -# -class TestFloatingIPCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestFloatingIPCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - -class TestCreateFloatingIPCompute(TestFloatingIPCompute): - - # The floating ip to be deleted. - floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'fixed_ip', - 'id', - 'instance_id', - 'ip', - 'pool', - ) - - data = ( - floating_ip.fixed_ip, - floating_ip.id, - floating_ip.instance_id, - floating_ip.ip, - floating_ip.pool, - ) - - def setUp(self): - super(TestCreateFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.create.return_value = self.floating_ip - - # Get the command object to test - self.cmd = fip.CreateFloatingIP(self.app, None) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - self.floating_ip.pool, - ] - verifylist = [ - ('network', self.floating_ip.pool), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.create.assert_called_once_with( - self.floating_ip.pool) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - -class TestDeleteFloatingIPCompute(TestFloatingIPCompute): - - # The floating ips to be deleted. - floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2) - - def setUp(self): - super(TestDeleteFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.delete.return_value = None - - # Return value of utils.find_resource() - self.compute.floating_ips.get = ( - compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips)) - - # Get the command object to test - self.cmd = fip.DeleteFloatingIP(self.app, None) - - def test_floating_ip_delete(self): - arglist = [ - self.floating_ips[0].id, - ] - verifylist = [ - ('floating_ip', [self.floating_ips[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.delete.assert_called_once_with( - self.floating_ips[0].id - ) - self.assertIsNone(result) - - def test_multi_floating_ips_delete(self): - arglist = [] - verifylist = [] - - for f in self.floating_ips: - arglist.append(f.id) - verifylist = [ - ('floating_ip', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for f in self.floating_ips: - calls.append(call(f.id)) - self.compute.floating_ips.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_floating_ips_delete_with_exception(self): - arglist = [ - self.floating_ips[0].id, - 'unexist_floating_ip', - ] - verifylist = [ - ('floating_ip', - [self.floating_ips[0].id, 'unexist_floating_ip']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self.floating_ips[0], exceptions.CommandError] - self.compute.floating_ips.get = ( - mock.Mock(side_effect=find_mock_result) - ) - self.compute.floating_ips.find.side_effect = exceptions.NotFound(None) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 floating_ips failed to delete.', str(e)) - - self.compute.floating_ips.get.assert_any_call( - self.floating_ips[0].id) - self.compute.floating_ips.get.assert_any_call( - 'unexist_floating_ip') - self.compute.floating_ips.delete.assert_called_once_with( - self.floating_ips[0].id - ) - - -class TestListFloatingIPCompute(TestFloatingIPCompute): - - # The floating ips to be list up - floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3) - - columns = ( - 'ID', - 'Floating IP Address', - 'Fixed IP Address', - 'Server', - 'Pool', - ) - - data = [] - for ip in floating_ips: - data.append(( - ip.id, - ip.ip, - ip.fixed_ip, - ip.instance_id, - ip.pool, - )) - - def setUp(self): - super(TestListFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.floating_ips.list.return_value = self.floating_ips - - # Get the command object to test - self.cmd = fip.ListFloatingIP(self.app, None) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.floating_ips.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestShowFloatingIPCompute(TestFloatingIPCompute): - - # The floating ip to display. - floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip() - - columns = ( - 'fixed_ip', - 'id', - 'instance_id', - 'ip', - 'pool', - ) - - data = ( - floating_ip.fixed_ip, - floating_ip.id, - floating_ip.instance_id, - floating_ip.ip, - floating_ip.pool, - ) - - def setUp(self): - super(TestShowFloatingIPCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Return value of utils.find_resource() - self.compute.floating_ips.get.return_value = self.floating_ip - - # Get the command object to test - self.cmd = fip.ShowFloatingIP(self.app, None) - - def test_floating_ip_show(self): - arglist = [ - self.floating_ip.id, - ] - verifylist = [ - ('floating_ip', self.floating_ip.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py index 11d01d36..8db21430 100644 --- a/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py +++ b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py @@ -11,44 +11,12 @@ # under the License. # -from osc_lib import exceptions - from openstackclient.network.v2 import floating_ip_pool from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes -from openstackclient.tests.unit.network.v2 import fakes as network_fakes - - -# Tests for Network API v2 -# -class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2): - - def setUp(self): - super(TestFloatingIPPoolNetwork, self).setUp() - - # Get a shortcut to the network client - self.network = self.app.client_manager.network - - -class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork): - - def setUp(self): - super(TestListFloatingIPPoolNetwork, self).setUp() - - # Get the command object to test - self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, - self.namespace) - - def test_floating_ip_list(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - self.assertRaises(exceptions.CommandError, self.cmd.take_action, - parsed_args) # Tests for Compute network -# + class TestFloatingIPPoolCompute(compute_fakes.TestComputev2): def setUp(self): diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_pool_network.py b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_network.py new file mode 100644 index 00000000..95ff5549 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_network.py @@ -0,0 +1,46 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from osc_lib import exceptions + +from openstackclient.network.v2 import floating_ip_pool +from openstackclient.tests.unit.network.v2 import fakes as network_fakes + + +# Tests for Network API v2 + +class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2): + + def setUp(self): + super(TestFloatingIPPoolNetwork, self).setUp() + + # Get a shortcut to the network client + self.network = self.app.client_manager.network + + +class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork): + + def setUp(self): + super(TestListFloatingIPPoolNetwork, self).setUp() + + # Get the command object to test + self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, + self.namespace) + + def test_floating_ip_list(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) diff --git a/openstackclient/tests/unit/network/v2/test_router.py b/openstackclient/tests/unit/network/v2/test_router.py index a4f91997..02e0be94 100644 --- a/openstackclient/tests/unit/network/v2/test_router.py +++ b/openstackclient/tests/unit/network/v2/test_router.py @@ -211,6 +211,35 @@ class TestCreateRouter(TestRouter): def test_create_with_no_ha_option(self): self._test_create_with_ha_options('--no-ha', False) + def _test_create_with_distributed_options(self, option, distributed): + arglist = [ + option, + self.new_router.name, + ] + verifylist = [ + ('name', self.new_router.name), + ('enable', True), + ('distributed', distributed), + ('centralized', not distributed), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.network.create_router.assert_called_once_with(**{ + 'admin_state_up': True, + 'name': self.new_router.name, + 'distributed': distributed, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_with_distributed_option(self): + self._test_create_with_distributed_options('--distributed', True) + + def test_create_with_centralized_option(self): + self._test_create_with_distributed_options('--centralized', False) + def test_create_with_AZ_hints(self): arglist = [ self.new_router.name, diff --git a/openstackclient/tests/unit/network/v2/test_security_group_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_compute.py new file mode 100644 index 00000000..db9831bb --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_security_group_compute.py @@ -0,0 +1,405 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import security_group +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSecurityGroupCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestSecurityGroupCompute, self).setUp() + + # Get a shortcut to the compute client + self.compute = self.app.client_manager.compute + + +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_create' +) +class TestCreateSecurityGroupCompute(TestSecurityGroupCompute): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + + # The security group to be shown. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group['description'], + _security_group['id'], + _security_group['name'], + _security_group['tenant_id'], + '', + ) + + def setUp(self): + super(TestCreateSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Get the command object to test + self.cmd = security_group.CreateSecurityGroup(self.app, None) + + def test_security_group_create_no_options(self, sg_mock): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_security_group_create_min_options(self, sg_mock): + sg_mock.return_value = self._security_group + arglist = [ + self._security_group['name'], + ] + verifylist = [ + ('name', self._security_group['name']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + sg_mock.assert_called_once_with( + self._security_group['name'], + self._security_group['name'], + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_security_group_create_all_options(self, sg_mock): + sg_mock.return_value = self._security_group + arglist = [ + '--description', self._security_group['description'], + self._security_group['name'], + ] + verifylist = [ + ('description', self._security_group['description']), + ('name', self._security_group['name']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + sg_mock.assert_called_once_with( + self._security_group['name'], + self._security_group['description'], + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_delete' +) +class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute): + + # The security groups to be deleted. + _security_groups = \ + compute_fakes.FakeSecurityGroup.create_security_groups() + + def setUp(self): + super(TestDeleteSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.api.security_group_find = ( + compute_fakes.FakeSecurityGroup.get_security_groups( + self._security_groups) + ) + + # Get the command object to test + self.cmd = security_group.DeleteSecurityGroup(self.app, None) + + def test_security_group_delete(self, sg_mock): + sg_mock.return_value = mock.Mock(return_value=None) + arglist = [ + self._security_groups[0]['id'], + ] + verifylist = [ + ('group', [self._security_groups[0]['id']]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + sg_mock.assert_called_once_with( + self._security_groups[0]['id'], + ) + self.assertIsNone(result) + + def test_security_group_multi_delete(self, sg_mock): + sg_mock.return_value = mock.Mock(return_value=None) + arglist = [] + verifylist = [] + + for s in self._security_groups: + arglist.append(s['id']) + verifylist = [ + ('group', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_groups: + calls.append(call(s['id'])) + sg_mock.assert_has_calls(calls) + self.assertIsNone(result) + + def test_security_group_multi_delete_with_exception(self, sg_mock): + sg_mock.return_value = mock.Mock(return_value=None) + sg_mock.side_effect = ([ + mock.Mock(return_value=None), + exceptions.CommandError, + ]) + arglist = [ + self._security_groups[0]['id'], + 'unexist_security_group', + ] + verifylist = [ + ('group', + [self._security_groups[0]['id'], 'unexist_security_group']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 groups failed to delete.', str(e)) + + sg_mock.assert_any_call(self._security_groups[0]['id']) + sg_mock.assert_any_call('unexist_security_group') + + +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_list' +) +class TestListSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group to be listed. + _security_groups = \ + compute_fakes.FakeSecurityGroup.create_security_groups(count=3) + + columns = ( + 'ID', + 'Name', + 'Description', + ) + columns_all_projects = ( + 'ID', + 'Name', + 'Description', + 'Project', + ) + + data = [] + for grp in _security_groups: + data.append(( + grp['id'], + grp['name'], + grp['description'], + )) + data_all_projects = [] + for grp in _security_groups: + data_all_projects.append(( + grp['id'], + grp['name'], + grp['description'], + grp['tenant_id'], + )) + + def setUp(self): + super(TestListSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Get the command object to test + self.cmd = security_group.ListSecurityGroup(self.app, None) + + def test_security_group_list_no_options(self, sg_mock): + sg_mock.return_value = self._security_groups + arglist = [] + verifylist = [ + ('all_projects', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + kwargs = {'search_opts': {'all_tenants': False}} + sg_mock.assert_called_once_with(**kwargs) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_security_group_list_all_projects(self, sg_mock): + sg_mock.return_value = self._security_groups + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + kwargs = {'search_opts': {'all_tenants': True}} + sg_mock.assert_called_once_with(**kwargs) + self.assertEqual(self.columns_all_projects, columns) + self.assertEqual(self.data_all_projects, list(data)) + + +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_set' +) +class TestSetSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group to be set. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + def setUp(self): + super(TestSetSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.api.security_group_find = mock.Mock( + return_value=self._security_group) + + # Get the command object to test + self.cmd = security_group.SetSecurityGroup(self.app, None) + + def test_security_group_set_no_options(self, sg_mock): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_security_group_set_no_updates(self, sg_mock): + sg_mock.return_value = mock.Mock(return_value=None) + arglist = [ + self._security_group['name'], + ] + verifylist = [ + ('group', self._security_group['name']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + sg_mock.assert_called_once_with( + self._security_group, + self._security_group['name'], + self._security_group['description'], + ) + self.assertIsNone(result) + + def test_security_group_set_all_options(self, sg_mock): + sg_mock.return_value = mock.Mock(return_value=None) + new_name = 'new-' + self._security_group['name'] + new_description = 'new-' + self._security_group['description'] + arglist = [ + '--name', new_name, + '--description', new_description, + self._security_group['name'], + ] + verifylist = [ + ('description', new_description), + ('group', self._security_group['name']), + ('name', new_name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + sg_mock.assert_called_once_with( + self._security_group, + new_name, + new_description + ) + self.assertIsNone(result) + + +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_find' +) +class TestShowSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group rule to be shown with the group. + _security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + # The security group to be shown. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group( + attrs={'rules': [_security_group_rule._info]} + ) + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group['description'], + _security_group['id'], + _security_group['name'], + _security_group['tenant_id'], + security_group._format_compute_security_group_rules( + [_security_group_rule._info]), + ) + + def setUp(self): + super(TestShowSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Get the command object to test + self.cmd = security_group.ShowSecurityGroup(self.app, None) + + def test_security_group_show_no_options(self, sg_mock): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_security_group_show_all_options(self, sg_mock): + sg_mock.return_value = self._security_group + arglist = [ + self._security_group['id'], + ] + verifylist = [ + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + sg_mock.assert_called_once_with(self._security_group['id']) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group.py b/openstackclient/tests/unit/network/v2/test_security_group_network.py index 66d357f9..35b7e366 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_network.py @@ -17,7 +17,6 @@ from mock import call from osc_lib import exceptions from openstackclient.network.v2 import security_group -from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit.network.v2 import fakes as network_fakes from openstackclient.tests.unit import utils as tests_utils @@ -36,15 +35,6 @@ class TestSecurityGroupNetwork(network_fakes.TestNetworkV2): self.domains_mock = self.app.client_manager.identity.domains -class TestSecurityGroupCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork): project = identity_fakes.FakeProject.create_one_project() @@ -129,90 +119,6 @@ class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork): self.assertEqual(self.data, data) -class TestCreateSecurityGroupCompute(TestSecurityGroupCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - '', - ) - - def setUp(self): - super(TestCreateSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.create.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.CreateSecurityGroup(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_network_options(self): - arglist = [ - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.name, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_min_options(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.name) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--description', self._security_group.description, - self._security_group.name, - ] - verifylist = [ - ('description', self._security_group.description), - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.description) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork): # The security groups to be deleted. @@ -297,94 +203,6 @@ class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork): ) -class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute): - - # The security groups to be deleted. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups() - - def setUp(self): - super(TestDeleteSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.delete = mock.Mock(return_value=None) - - self.compute.security_groups.get = ( - compute_fakes.FakeSecurityGroup.get_security_groups( - self._security_groups) - ) - - # Get the command object to test - self.cmd = security_group.DeleteSecurityGroup(self.app, None) - - def test_security_group_delete(self): - arglist = [ - self._security_groups[0].id, - ] - verifylist = [ - ('group', [self._security_groups[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id) - self.assertIsNone(result) - - def test_multi_security_groups_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_groups: - arglist.append(s.id) - verifylist = [ - ('group', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_groups: - calls.append(call(s.id)) - self.compute.security_groups.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_groups_delete_with_exception(self): - arglist = [ - self._security_groups[0].id, - 'unexist_security_group', - ] - verifylist = [ - ('group', - [self._security_groups[0].id, 'unexist_security_group']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._security_groups[0], exceptions.CommandError] - self.compute.security_groups.get = ( - mock.Mock(side_effect=find_mock_result) - ) - self.compute.security_groups.find.side_effect = ( - exceptions.NotFound(None)) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 groups failed to delete.', str(e)) - - self.compute.security_groups.get.assert_any_call( - self._security_groups[0].id) - self.compute.security_groups.get.assert_any_call( - 'unexist_security_group') - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id - ) - - class TestListSecurityGroupNetwork(TestSecurityGroupNetwork): # The security group to be listed. @@ -483,80 +301,6 @@ class TestListSecurityGroupNetwork(TestSecurityGroupNetwork): self.assertEqual(self.data, list(data)) -class TestListSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be listed. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups(count=3) - - columns = ( - 'ID', - 'Name', - 'Description', - ) - columns_all_projects = ( - 'ID', - 'Name', - 'Description', - 'Project', - ) - - data = [] - for grp in _security_groups: - data.append(( - grp.id, - grp.name, - grp.description, - )) - data_all_projects = [] - for grp in _security_groups: - data_all_projects.append(( - grp.id, - grp.name, - grp.description, - grp.tenant_id, - )) - - def setUp(self): - super(TestListSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - self.compute.security_groups.list.return_value = self._security_groups - - # Get the command object to test - self.cmd = security_group.ListSecurityGroup(self.app, None) - - def test_security_group_list_no_options(self): - arglist = [] - verifylist = [ - ('all_projects', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': False}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_security_group_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': True}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns_all_projects, columns) - self.assertEqual(self.data_all_projects, list(data)) - - class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork): # The security group to be set. @@ -623,72 +367,6 @@ class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork): self.assertIsNone(result) -class TestSetSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be set. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def setUp(self): - super(TestSetSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.update = mock.Mock(return_value=None) - - self.compute.security_groups.get = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.SetSecurityGroup(self.app, None) - - def test_set_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_set_no_updates(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('group', self._security_group.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - self._security_group.name, - self._security_group.description - ) - self.assertIsNone(result) - - def test_set_all_options(self): - new_name = 'new-' + self._security_group.name - new_description = 'new-' + self._security_group.description - arglist = [ - '--name', new_name, - '--description', new_description, - self._security_group.name, - ] - verifylist = [ - ('description', new_description), - ('group', self._security_group.name), - ('name', new_name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - new_name, - new_description - ) - self.assertIsNone(result) - - class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork): # The security group rule to be shown with the group. @@ -746,63 +424,3 @@ class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork): self._security_group.id, ignore_missing=False) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) - - -class TestShowSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group rule to be shown with the group. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group( - attrs={'rules': [_security_group_rule._info]} - ) - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - security_group._format_compute_security_group_rules( - [_security_group_rule._info]), - ) - - def setUp(self): - super(TestShowSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.ShowSecurityGroup(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py new file mode 100644 index 00000000..06a7a25d --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py @@ -0,0 +1,572 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network import utils as network_utils +from openstackclient.network.v2 import security_group_rule +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestSecurityGroupRuleCompute, self).setUp() + + # Get a shortcut to the network client + self.compute = self.app.client_manager.compute + + +class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + + # The security group rule to be created. + _security_group_rule = None + + # The security group that will contain the rule created. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + def _setup_security_group_rule(self, attrs=None): + self._security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( + attrs) + self.compute.security_group_rules.create.return_value = \ + self._security_group_rule + expected_columns, expected_data = \ + security_group_rule._format_security_group_rule_show( + self._security_group_rule._info) + return expected_columns, expected_data + + def setUp(self): + super(TestCreateSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.api.security_group_find = mock.Mock( + return_value=self._security_group, + ) + + # Get the command object to test + self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) + + def test_security_group_rule_create_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_security_group_rule_create_all_source_options(self): + arglist = [ + '--src-ip', '10.10.0.0/24', + '--src-group', self._security_group['id'], + self._security_group['id'], + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_security_group_rule_create_all_remote_options(self): + arglist = [ + '--remote-ip', '10.10.0.0/24', + '--remote-group', self._security_group['id'], + self._security_group['id'], + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_security_group_rule_create_bad_protocol(self): + arglist = [ + '--protocol', 'foo', + self._security_group['id'], + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_security_group_rule_create_all_protocol_options(self): + arglist = [ + '--protocol', 'tcp', + '--proto', 'tcp', + self._security_group['id'], + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_security_group_rule_create_network_options(self): + arglist = [ + '--ingress', + '--ethertype', 'IPv4', + '--icmp-type', '3', + '--icmp-code', '11', + '--project', self.project.name, + '--project-domain', self.domain.name, + self._security_group['id'], + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_security_group_rule_create_default_rule(self): + expected_columns, expected_data = self._setup_security_group_rule() + dst_port = str(self._security_group_rule.from_port) + ':' + \ + str(self._security_group_rule.to_port) + arglist = [ + '--dst-port', dst_port, + self._security_group['id'], + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # TODO(dtroyer): save this for the security group rule changes + # self.compute.api.security_group_rule_create.assert_called_once_with( + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group['id'], + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_security_group_rule_create_source_group(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'from_port': 22, + 'to_port': 22, + 'group': {'name': self._security_group['name']}, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.from_port), + '--src-group', self._security_group['name'], + self._security_group['id'], + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('src_group', self._security_group['name']), + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # TODO(dtroyer): save this for the security group rule changes + # self.compute.api.security_group_rule_create.assert_called_once_with( + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group['id'], + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + self._security_group['id'], + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_security_group_rule_create_remote_group(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'from_port': 22, + 'to_port': 22, + 'group': {'name': self._security_group['name']}, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.from_port), + '--remote-group', self._security_group['name'], + self._security_group['id'], + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('remote_group', self._security_group['name']), + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # TODO(dtroyer): save this for the security group rule changes + # self.compute.api.security_group_rule_create.assert_called_once_with( + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group['id'], + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + self._security_group['id'], + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_security_group_rule_create_source_ip(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--protocol', self._security_group_rule.ip_protocol, + '--src-ip', self._security_group_rule.ip_range['cidr'], + self._security_group['id'], + ] + verifylist = [ + ('protocol', self._security_group_rule.ip_protocol), + ('src_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # TODO(dtroyer): save this for the security group rule changes + # self.compute.api.security_group_rule_create.assert_called_once_with( + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group['id'], + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_security_group_rule_create_remote_ip(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--protocol', self._security_group_rule.ip_protocol, + '--remote-ip', self._security_group_rule.ip_range['cidr'], + self._security_group['id'], + ] + verifylist = [ + ('protocol', self._security_group_rule.ip_protocol), + ('remote_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # TODO(dtroyer): save this for the security group rule changes + # self.compute.api.security_group_rule_create.assert_called_once_with( + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group['id'], + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_security_group_rule_create_proto_option(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--proto', self._security_group_rule.ip_protocol, + '--src-ip', self._security_group_rule.ip_range['cidr'], + self._security_group['id'], + ] + verifylist = [ + ('proto', self._security_group_rule.ip_protocol), + ('protocol', None), + ('src_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # TODO(dtroyer): save this for the security group rule changes + # self.compute.api.security_group_rule_create.assert_called_once_with( + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group['id'], + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + +class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group rule to be deleted. + _security_group_rules = \ + compute_fakes.FakeSecurityGroupRule.create_security_group_rules( + count=2) + + def setUp(self): + super(TestDeleteSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Get the command object to test + self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) + + def test_security_group_rule_delete(self): + arglist = [ + self._security_group_rules[0].id, + ] + verifylist = [ + ('rule', [self._security_group_rules[0].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.delete.assert_called_once_with( + self._security_group_rules[0].id) + self.assertIsNone(result) + + def test_security_group_rule_multi_delete(self): + arglist = [] + verifylist = [] + + for s in self._security_group_rules: + arglist.append(s.id) + verifylist = [ + ('rule', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_group_rules: + calls.append(call(s.id)) + self.compute.security_group_rules.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_security_group_rule_multi_delete_with_exception(self): + arglist = [ + self._security_group_rules[0].id, + 'unexist_rule', + ] + verifylist = [ + ('rule', + [self._security_group_rules[0].id, 'unexist_rule']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [None, exceptions.CommandError] + self.compute.security_group_rules.delete = ( + mock.Mock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 rules failed to delete.', str(e)) + + self.compute.security_group_rules.delete.assert_any_call( + self._security_group_rules[0].id) + self.compute.security_group_rules.delete.assert_any_call( + 'unexist_rule') + + +class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group to hold the rules. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + # The security group rule to be listed. + _security_group_rule_tcp = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'ip_protocol': 'tcp', + 'from_port': 80, + 'to_port': 80, + 'group': {'name': _security_group['name']}, + }) + _security_group_rule_icmp = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + 'group': {'name': _security_group['name']}, + }) + _security_group['rules'] = [ + _security_group_rule_tcp._info, + _security_group_rule_icmp._info, + ] + + expected_columns_with_group = ( + 'ID', + 'IP Protocol', + 'IP Range', + 'Port Range', + 'Remote Security Group', + ) + expected_columns_no_group = \ + expected_columns_with_group + ('Security Group',) + + expected_data_with_group = [] + expected_data_no_group = [] + for _security_group_rule in _security_group['rules']: + rule = network_utils.transform_compute_security_group_rule( + _security_group_rule + ) + expected_rule_with_group = ( + rule['id'], + rule['ip_protocol'], + rule['ip_range'], + rule['port_range'], + rule['remote_security_group'], + ) + expected_rule_no_group = expected_rule_with_group + \ + (_security_group_rule['parent_group_id'],) + expected_data_with_group.append(expected_rule_with_group) + expected_data_no_group.append(expected_rule_no_group) + + def setUp(self): + super(TestListSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.api.security_group_find = mock.Mock( + return_value=self._security_group, + ) + self.compute.api.security_group_list = mock.Mock( + return_value=[self._security_group], + ) + + # Get the command object to test + self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None) + + def test_security_group_rule_list_default(self): + parsed_args = self.check_parser(self.cmd, [], []) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.api.security_group_list.assert_called_once_with( + search_opts={'all_tenants': False} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + def test_security_group_rule_list_with_group(self): + arglist = [ + self._security_group['id'], + ] + verifylist = [ + ('group', self._security_group['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.api.security_group_find.assert_called_once_with( + self._security_group['id'] + ) + self.assertEqual(self.expected_columns_with_group, columns) + self.assertEqual(self.expected_data_with_group, list(data)) + + def test_security_group_rule_list_all_projects(self): + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.api.security_group_list.assert_called_once_with( + search_opts={'all_tenants': True} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + def test_security_group_rule_list_with_ignored_options(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.api.security_group_list.assert_called_once_with( + search_opts={'all_tenants': False} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + +class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group rule to be shown. + _security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + columns, data = \ + security_group_rule._format_security_group_rule_show( + _security_group_rule._info) + + def setUp(self): + super(TestShowSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Build a security group fake customized for this test. + security_group_rules = [self._security_group_rule._info] + security_group = {'rules': security_group_rules} + self.compute.api.security_group_list = mock.Mock( + return_value=[security_group], + ) + + # Get the command object to test + self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None) + + def test_security_group_rule_show_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_security_group_rule_show_all_options(self): + arglist = [ + self._security_group_rule.id, + ] + verifylist = [ + ('rule', self._security_group_rule.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.api.security_group_list.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py index e3538d5f..5d9d03e9 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_rule.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py @@ -11,16 +11,12 @@ # under the License. # -import copy import mock from mock import call from osc_lib import exceptions -from openstackclient.network import utils as network_utils from openstackclient.network.v2 import security_group_rule -from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes -from openstackclient.tests.unit import fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit.network.v2 import fakes as network_fakes from openstackclient.tests.unit import utils as tests_utils @@ -39,15 +35,6 @@ class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2): self.domains_mock = self.app.client_manager.identity.domains -class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupRuleCompute, self).setUp() - - # Get a shortcut to the network client - self.compute = self.app.client_manager.compute - - class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): project = identity_fakes.FakeProject.create_one_project() @@ -548,280 +535,6 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self.assertEqual(self.expected_data, data) -class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group rule to be created. - _security_group_rule = None - - # The security group that will contain the rule created. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def _setup_security_group_rule(self, attrs=None): - self._security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( - attrs) - self.compute.security_group_rules.create.return_value = \ - self._security_group_rule - expected_columns, expected_data = \ - security_group_rule._format_security_group_rule_show( - self._security_group_rule._info) - return expected_columns, expected_data - - def setUp(self): - super(TestCreateSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_all_source_options(self): - arglist = [ - '--src-ip', '10.10.0.0/24', - '--src-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_remote_options(self): - arglist = [ - '--remote-ip', '10.10.0.0/24', - '--remote-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_bad_protocol(self): - arglist = [ - '--protocol', 'foo', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_protocol_options(self): - arglist = [ - '--protocol', 'tcp', - '--proto', 'tcp', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_network_options(self): - arglist = [ - '--ingress', - '--ethertype', 'IPv4', - '--icmp-type', '3', - '--icmp-code', '11', - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_default_rule(self): - expected_columns, expected_data = self._setup_security_group_rule() - dst_port = str(self._security_group_rule.from_port) + ':' + \ - str(self._security_group_rule.to_port) - arglist = [ - '--dst-port', dst_port, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_group(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'from_port': 22, - 'to_port': 22, - 'group': {'name': self._security_group.name}, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.from_port), - '--src-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('src_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_remote_group(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'from_port': 22, - 'to_port': 22, - 'group': {'name': self._security_group.name}, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.from_port), - '--remote-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('remote_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_ip(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_remote_ip(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--remote-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('remote_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_proto_option(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--proto', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('proto', self._security_group_rule.ip_protocol), - ('protocol', None), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): # The security group rules to be deleted. @@ -909,83 +622,6 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): ) -class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be deleted. - _security_group_rules = \ - compute_fakes.FakeSecurityGroupRule.create_security_group_rules( - count=2) - - def setUp(self): - super(TestDeleteSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Get the command object to test - self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) - - def test_security_group_rule_delete(self): - arglist = [ - self._security_group_rules[0].id, - ] - verifylist = [ - ('rule', [self._security_group_rules[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.delete.assert_called_once_with( - self._security_group_rules[0].id) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_group_rules: - arglist.append(s.id) - verifylist = [ - ('rule', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_group_rules: - calls.append(call(s.id)) - self.compute.security_group_rules.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete_with_exception(self): - arglist = [ - self._security_group_rules[0].id, - 'unexist_rule', - ] - verifylist = [ - ('rule', - [self._security_group_rules[0].id, 'unexist_rule']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [None, exceptions.CommandError] - self.compute.security_group_rules.delete = ( - mock.Mock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 rules failed to delete.', str(e)) - - self.compute.security_group_rules.delete.assert_any_call( - self._security_group_rules[0].id) - self.compute.security_group_rules.delete.assert_any_call( - 'unexist_rule') - - class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): # The security group to hold the rules. @@ -1165,131 +801,6 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self.assertEqual(self.expected_data_no_group, list(data)) -class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group to hold the rules. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - # The security group rule to be listed. - _security_group_rule_tcp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'tcp', - 'from_port': 80, - 'to_port': 80, - 'group': {'name': _security_group.name}, - }) - _security_group_rule_icmp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - 'group': {'name': _security_group.name}, - }) - _security_group.rules = [_security_group_rule_tcp._info, - _security_group_rule_icmp._info] - - expected_columns_with_group = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Remote Security Group', - ) - expected_columns_no_group = \ - expected_columns_with_group + ('Security Group',) - - expected_data_with_group = [] - expected_data_no_group = [] - for _security_group_rule in _security_group.rules: - rule = network_utils.transform_compute_security_group_rule( - _security_group_rule - ) - expected_rule_with_group = ( - rule['id'], - rule['ip_protocol'], - rule['ip_range'], - rule['port_range'], - rule['remote_security_group'], - ) - expected_rule_no_group = expected_rule_with_group + \ - (_security_group_rule['parent_group_id'],) - expected_data_with_group.append(expected_rule_with_group) - expected_data_no_group.append(expected_rule_no_group) - - def setUp(self): - super(TestListSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = \ - self._security_group - self.compute.security_groups.list.return_value = \ - [self._security_group] - - # Get the command object to test - self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None) - - def test_list_default(self): - parsed_args = self.check_parser(self.cmd, [], []) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_group(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id - ) - self.assertEqual(self.expected_columns_with_group, columns) - self.assertEqual(self.expected_data_with_group, list(data)) - - def test_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': True} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_ignored_options(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): # The security group rule to be shown. @@ -1353,49 +864,3 @@ class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self._security_group_rule.id, ignore_missing=False) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) - - -class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be shown. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - columns, data = \ - security_group_rule._format_security_group_rule_show( - _security_group_rule._info) - - def setUp(self): - super(TestShowSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Build a security group fake customized for this test. - security_group_rules = [self._security_group_rule._info] - security_group = fakes.FakeResource( - info=copy.deepcopy({'rules': security_group_rules}), - loaded=True) - security_group.rules = security_group_rules - self.compute.security_groups.list.return_value = [security_group] - - # Get the command object to test - self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group_rule.id, - ] - verifylist = [ - ('rule', self._security_group_rule.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) |
