diff options
Diffstat (limited to 'openstackclient/tests')
| -rw-r--r-- | openstackclient/tests/compute/v2/fakes.py | 19 | ||||
| -rw-r--r-- | openstackclient/tests/image/v2/test_image.py | 91 | ||||
| -rw-r--r-- | openstackclient/tests/network/v2/fakes.py | 10 | ||||
| -rw-r--r-- | openstackclient/tests/network/v2/test_network.py | 169 | ||||
| -rw-r--r-- | openstackclient/tests/network/v2/test_router.py | 36 | ||||
| -rw-r--r-- | openstackclient/tests/network/v2/test_security_group_rule.py | 242 |
6 files changed, 486 insertions, 81 deletions
diff --git a/openstackclient/tests/compute/v2/fakes.py b/openstackclient/tests/compute/v2/fakes.py index b3f3fb49..70fc386f 100644 --- a/openstackclient/tests/compute/v2/fakes.py +++ b/openstackclient/tests/compute/v2/fakes.py @@ -859,6 +859,25 @@ class FakeNetwork(object): return networks + @staticmethod + def get_networks(networks=None, count=2): + """Get an iterable MagicMock object with a list of faked networks. + + If networks list is provided, then initialize the Mock object with the + list. Otherwise create one. + + :param List networks: + A list of FakeResource objects faking networks + :param int count: + The number of networks to fake + :return: + An iterable Mock object with side_effect set to a list of faked + networks + """ + if networks is None: + networks = FakeNetwork.create_networks(count=count) + return mock.Mock(side_effect=networks) + class FakeHost(object): """Fake one host.""" diff --git a/openstackclient/tests/image/v2/test_image.py b/openstackclient/tests/image/v2/test_image.py index 0248f30b..33f25331 100644 --- a/openstackclient/tests/image/v2/test_image.py +++ b/openstackclient/tests/image/v2/test_image.py @@ -20,6 +20,7 @@ import warlock from glanceclient.v2 import schemas from openstackclient.common import exceptions +from openstackclient.common import utils as common_utils from openstackclient.image.v2 import image from openstackclient.tests import fakes from openstackclient.tests.identity.v3 import fakes as identity_fakes @@ -341,28 +342,31 @@ class TestImageCreate(TestImage): class TestAddProjectToImage(TestImage): + _image = image_fakes.FakeImage.create_one_image() + columns = ( 'image_id', 'member_id', 'status', ) + datalist = ( - image_fakes.image_id, + _image.id, identity_fakes.project_id, - image_fakes.member_status, + image_fakes.member_status ) def setUp(self): super(TestAddProjectToImage, self).setUp() # This is the return value for utils.find_resource() - self.images_mock.get.return_value = fakes.FakeResource( - None, - copy.deepcopy(image_fakes.IMAGE), - loaded=True, - ) + self.images_mock.get.return_value = self._image + + # Update the image_id in the MEMBER dict + self.new_member = copy.deepcopy(image_fakes.MEMBER) + self.new_member['image_id'] = self._image.id self.image_members_mock.create.return_value = fakes.FakeModel( - copy.deepcopy(image_fakes.MEMBER), + self.new_member, ) self.project_mock.get.return_value = fakes.FakeResource( None, @@ -379,11 +383,11 @@ class TestAddProjectToImage(TestImage): def test_add_project_to_image_no_option(self): arglist = [ - image_fakes.image_id, + self._image.id, identity_fakes.project_id, ] verifylist = [ - ('image', image_fakes.image_id), + ('image', self._image.id), ('project', identity_fakes.project_id), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -393,20 +397,21 @@ class TestAddProjectToImage(TestImage): # data to be shown. columns, data = self.cmd.take_action(parsed_args) self.image_members_mock.create.assert_called_with( - image_fakes.image_id, + self._image.id, identity_fakes.project_id ) + self.assertEqual(self.columns, columns) self.assertEqual(self.datalist, data) def test_add_project_to_image_with_option(self): arglist = [ - image_fakes.image_id, + self._image.id, identity_fakes.project_id, '--project-domain', identity_fakes.domain_id, ] verifylist = [ - ('image', image_fakes.image_id), + ('image', self._image.id), ('project', identity_fakes.project_id), ('project_domain', identity_fakes.domain_id), ] @@ -417,7 +422,7 @@ class TestAddProjectToImage(TestImage): # data to be shown. columns, data = self.cmd.take_action(parsed_args) self.image_members_mock.create.assert_called_with( - image_fakes.image_id, + self._image.id, identity_fakes.project_id ) self.assertEqual(self.columns, columns) @@ -468,25 +473,26 @@ class TestImageDelete(TestImage): class TestImageList(TestImage): + _image = image_fakes.FakeImage.create_one_image() + columns = ( 'ID', 'Name', 'Status', ) + datalist = ( - ( - image_fakes.image_id, - image_fakes.image_name, - '', - ), - ) + _image.id, + _image.name, + '', + ), def setUp(self): super(TestImageList, self).setUp() self.api_mock = mock.Mock() self.api_mock.image_list.side_effect = [ - [copy.deepcopy(image_fakes.IMAGE)], [], + [image_fakes.FakeImage.get_image_info(self._image)], [], ] self.app.client_manager.image.api = self.api_mock @@ -611,23 +617,24 @@ class TestImageList(TestImage): self.assertEqual(collist, columns) datalist = (( - image_fakes.image_id, - image_fakes.image_name, + self._image.id, + self._image.name, '', '', '', '', - 'public', - False, - image_fakes.image_owner, - '', + self._image.visibility, + self._image.protected, + self._image.owner, + common_utils.format_list(self._image.tags), ), ) self.assertEqual(datalist, tuple(data)) @mock.patch('openstackclient.api.utils.simple_filter') def test_image_list_property_option(self, sf_mock): sf_mock.return_value = [ - copy.deepcopy(image_fakes.IMAGE), + copy.deepcopy( + image_fakes.FakeImage.get_image_info(self._image)), ] arglist = [ @@ -644,7 +651,7 @@ class TestImageList(TestImage): columns, data = self.cmd.take_action(parsed_args) self.api_mock.image_list.assert_called_with() sf_mock.assert_called_with( - [image_fakes.IMAGE], + [image_fakes.FakeImage.get_image_info(self._image)], attr='a', value='1', property_field='properties', @@ -656,7 +663,8 @@ class TestImageList(TestImage): @mock.patch('openstackclient.common.utils.sort_items') def test_image_list_sort_option(self, si_mock): si_mock.return_value = [ - copy.deepcopy(image_fakes.IMAGE) + copy.deepcopy( + image_fakes.FakeImage.get_image_info(self._image)) ] arglist = ['--sort', 'name:asc'] @@ -669,10 +677,9 @@ class TestImageList(TestImage): columns, data = self.cmd.take_action(parsed_args) self.api_mock.image_list.assert_called_with() si_mock.assert_called_with( - [image_fakes.IMAGE], + [image_fakes.FakeImage.get_image_info(self._image)], 'name:asc' ) - self.assertEqual(self.columns, columns) self.assertEqual(self.datalist, tuple(data)) @@ -720,12 +727,10 @@ class TestRemoveProjectImage(TestImage): def setUp(self): super(TestRemoveProjectImage, self).setUp() + self._image = image_fakes.FakeImage.create_one_image() # This is the return value for utils.find_resource() - self.images_mock.get.return_value = fakes.FakeResource( - None, - copy.deepcopy(image_fakes.IMAGE), - loaded=True, - ) + self.images_mock.get.return_value = self._image + self.project_mock.get.return_value = fakes.FakeResource( None, copy.deepcopy(identity_fakes.PROJECT), @@ -742,11 +747,11 @@ class TestRemoveProjectImage(TestImage): def test_remove_project_image_no_options(self): arglist = [ - image_fakes.image_id, + self._image.id, identity_fakes.project_id, ] verifylist = [ - ('image', image_fakes.image_id), + ('image', self._image.id), ('project', identity_fakes.project_id), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -754,19 +759,19 @@ class TestRemoveProjectImage(TestImage): result = self.cmd.take_action(parsed_args) self.image_members_mock.delete.assert_called_with( - image_fakes.image_id, + self._image.id, identity_fakes.project_id, ) self.assertIsNone(result) def test_remove_project_image_with_options(self): arglist = [ - image_fakes.image_id, + self._image.id, identity_fakes.project_id, '--project-domain', identity_fakes.domain_id, ] verifylist = [ - ('image', image_fakes.image_id), + ('image', self._image.id), ('project', identity_fakes.project_id), ('project_domain', identity_fakes.domain_id), ] @@ -775,7 +780,7 @@ class TestRemoveProjectImage(TestImage): result = self.cmd.take_action(parsed_args) self.image_members_mock.delete.assert_called_with( - image_fakes.image_id, + self._image.id, identity_fakes.project_id, ) self.assertIsNone(result) diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py index 7c4604bd..84ede381 100644 --- a/openstackclient/tests/network/v2/fakes.py +++ b/openstackclient/tests/network/v2/fakes.py @@ -184,8 +184,7 @@ class FakeNetwork(object): :param Dictionary attrs: A dictionary with all attributes :return: - A FakeResource object, with id, name, admin_state_up, - router_external, status, subnets, tenant_id + A FakeResource object, with id, name, etc. """ attrs = attrs or {} @@ -199,7 +198,7 @@ class FakeNetwork(object): 'shared': False, 'subnets': ['a', 'b'], 'provider_network_type': 'vlan', - 'router_external': True, + 'router:external': True, 'availability_zones': [], 'availability_zone_hints': [], 'is_default': False, @@ -213,6 +212,7 @@ class FakeNetwork(object): # Set attributes with special mapping in OpenStack SDK. network.project_id = network_attrs['tenant_id'] + network.is_router_external = network_attrs['router:external'] return network @@ -496,8 +496,8 @@ class FakeSecurityGroupRule(object): 'direction': 'ingress', 'ethertype': 'IPv4', 'id': 'security-group-rule-id-' + uuid.uuid4().hex, - 'port_range_max': 0, - 'port_range_min': 0, + 'port_range_max': None, + 'port_range_min': None, 'protocol': 'tcp', 'remote_group_id': None, 'remote_ip_prefix': '0.0.0.0/0', diff --git a/openstackclient/tests/network/v2/test_network.py b/openstackclient/tests/network/v2/test_network.py index 1269b0a1..ba810f16 100644 --- a/openstackclient/tests/network/v2/test_network.py +++ b/openstackclient/tests/network/v2/test_network.py @@ -14,6 +14,7 @@ import copy import mock +from mock import call from openstackclient.common import exceptions from openstackclient.common import utils from openstackclient.network.v2 import network @@ -55,7 +56,7 @@ class TestCreateNetworkIdentityV3(TestNetwork): 'name', 'project_id', 'provider_network_type', - 'router_external', + 'router:external', 'shared', 'status', 'subnets', @@ -70,7 +71,7 @@ class TestCreateNetworkIdentityV3(TestNetwork): _network.name, _network.project_id, _network.provider_network_type, - network._format_router_external(_network.router_external), + network._format_router_external(_network.is_router_external), _network.shared, _network.status, utils.format_list(_network.subnets), @@ -148,6 +149,7 @@ class TestCreateNetworkIdentityV3(TestNetwork): "--provider-network-type", "vlan", "--provider-physical-network", "physnet1", "--provider-segment", "400", + "--transparent-vlan", self._network.name, ] verifylist = [ @@ -161,6 +163,7 @@ class TestCreateNetworkIdentityV3(TestNetwork): ('provider_network_type', 'vlan'), ('physical_network', 'physnet1'), ('segmentation_id', '400'), + ('transparent_vlan', True), ('name', self._network.name), ] @@ -178,6 +181,7 @@ class TestCreateNetworkIdentityV3(TestNetwork): 'provider:network_type': 'vlan', 'provider:physical_network': 'physnet1', 'provider:segmentation_id': '400', + 'vlan_transparent': True, }) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) @@ -223,7 +227,7 @@ class TestCreateNetworkIdentityV2(TestNetwork): 'name', 'project_id', 'provider_network_type', - 'router_external', + 'router:external', 'shared', 'status', 'subnets', @@ -238,7 +242,7 @@ class TestCreateNetworkIdentityV2(TestNetwork): _network.name, _network.project_id, _network.provider_network_type, - network._format_router_external(_network.router_external), + network._format_router_external(_network.is_router_external), _network.shared, _network.status, utils.format_list(_network.subnets), @@ -320,33 +324,88 @@ class TestCreateNetworkIdentityV2(TestNetwork): class TestDeleteNetwork(TestNetwork): - # The network to delete. - _network = network_fakes.FakeNetwork.create_one_network() - def setUp(self): super(TestDeleteNetwork, self).setUp() + # The networks to delete + self._networks = network_fakes.FakeNetwork.create_networks(count=3) + self.network.delete_network = mock.Mock(return_value=None) - self.network.find_network = mock.Mock(return_value=self._network) + self.network.find_network = network_fakes.FakeNetwork.get_networks( + networks=self._networks) # Get the command object to test self.cmd = network.DeleteNetwork(self.app, self.namespace) - def test_delete(self): + def test_delete_one_network(self): arglist = [ - self._network.name, + self._networks[0].name, ] verifylist = [ - ('network', [self._network.name]), + ('network', [self._networks[0].name]), ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.delete_network.assert_called_once_with(self._networks[0]) + self.assertIsNone(result) + + def test_delete_multiple_networks(self): + arglist = [] + for n in self._networks: + arglist.append(n.id) + verifylist = [ + ('network', arglist), + ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) - self.network.delete_network.assert_called_once_with(self._network) + calls = [] + for n in self._networks: + calls.append(call(n)) + self.network.delete_network.assert_has_calls(calls) self.assertIsNone(result) + def test_delete_multiple_networks_exception(self): + arglist = [ + self._networks[0].id, + 'xxxx-yyyy-zzzz', + self._networks[1].id, + ] + verifylist = [ + ('network', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # Fake exception in find_network() + ret_find = [ + self._networks[0], + exceptions.NotFound('404'), + self._networks[1], + ] + self.network.find_network = mock.Mock(side_effect=ret_find) + + # Fake exception in delete_network() + ret_delete = [ + None, + exceptions.NotFound('404'), + ] + self.network.delete_network = mock.Mock(side_effect=ret_delete) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + # The second call of find_network() should fail. So delete_network() + # was only called twice. + calls = [ + call(self._networks[0]), + call(self._networks[1]), + ] + self.network.delete_network.assert_has_calls(calls) + class TestListNetwork(TestNetwork): @@ -390,7 +449,7 @@ class TestListNetwork(TestNetwork): net.shared, utils.format_list(net.subnets), net.provider_network_type, - network._format_router_external(net.router_external), + network._format_router_external(net.is_router_external), utils.format_list(net.availability_zones), )) @@ -486,6 +545,7 @@ class TestSetNetwork(TestNetwork): '--provider-network-type', 'vlan', '--provider-physical-network', 'physnet1', '--provider-segment', '400', + '--no-transparent-vlan', ] verifylist = [ ('network', self._network.name), @@ -497,6 +557,7 @@ class TestSetNetwork(TestNetwork): ('provider_network_type', 'vlan'), ('physical_network', 'physnet1'), ('segmentation_id', '400'), + ('no_transparent_vlan', True), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -511,6 +572,7 @@ class TestSetNetwork(TestNetwork): 'provider:network_type': 'vlan', 'provider:physical_network': 'physnet1', 'provider:segmentation_id': '400', + 'vlan_transparent': False, } self.network.update_network.assert_called_once_with( self._network, **attrs) @@ -565,7 +627,7 @@ class TestShowNetwork(TestNetwork): 'name', 'project_id', 'provider_network_type', - 'router_external', + 'router:external', 'shared', 'status', 'subnets', @@ -580,7 +642,7 @@ class TestShowNetwork(TestNetwork): _network.name, _network.project_id, _network.provider_network_type, - network._format_router_external(_network.router_external), + network._format_router_external(_network.is_router_external), _network.shared, _network.status, utils.format_list(_network.subnets), @@ -746,36 +808,97 @@ class TestCreateNetworkCompute(TestNetworkCompute): class TestDeleteNetworkCompute(TestNetworkCompute): - # The network to delete. - _network = compute_fakes.FakeNetwork.create_one_network() - def setUp(self): super(TestDeleteNetworkCompute, self).setUp() self.app.client_manager.network_endpoint_enabled = False + # The networks to delete + self._networks = compute_fakes.FakeNetwork.create_networks(count=3) + self.compute.networks.delete.return_value = None # Return value of utils.find_resource() - self.compute.networks.get.return_value = self._network + self.compute.networks.get = \ + compute_fakes.FakeNetwork.get_networks(networks=self._networks) # Get the command object to test self.cmd = network.DeleteNetwork(self.app, None) - def test_network_delete(self): + def test_delete_one_network(self): arglist = [ - self._network.label, + self._networks[0].label, ] verifylist = [ - ('network', [self._network.label]), + ('network', [self._networks[0].label]), ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.compute.networks.delete.assert_called_once_with( + self._networks[0].id) + self.assertIsNone(result) + + def test_delete_multiple_networks(self): + arglist = [] + for n in self._networks: + arglist.append(n.label) + verifylist = [ + ('network', arglist), + ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) - self.compute.networks.delete.assert_called_once_with(self._network.id) + calls = [] + for n in self._networks: + calls.append(call(n.id)) + self.compute.networks.delete.assert_has_calls(calls) self.assertIsNone(result) + def test_delete_multiple_networks_exception(self): + arglist = [ + self._networks[0].id, + 'xxxx-yyyy-zzzz', + self._networks[1].id, + ] + verifylist = [ + ('network', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # Fake exception in utils.find_resource() + # In compute v2, we use utils.find_resource() to find a network. + # It calls get() several times, but find() only one time. So we + # choose to fake get() always raise exception, then pass through. + # And fake find() to find the real network or not. + self.compute.networks.get.side_effect = Exception() + ret_find = [ + self._networks[0], + Exception(), + self._networks[1], + ] + self.compute.networks.find.side_effect = ret_find + + # Fake exception in delete() + ret_delete = [ + None, + Exception(), + ] + self.compute.networks.delete = mock.Mock(side_effect=ret_delete) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + # The second call of utils.find_resource() should fail. So delete() + # was only called twice. + calls = [ + call(self._networks[0].id), + call(self._networks[1].id), + ] + self.compute.networks.delete.assert_has_calls(calls) + class TestListNetworkCompute(TestNetworkCompute): diff --git a/openstackclient/tests/network/v2/test_router.py b/openstackclient/tests/network/v2/test_router.py index 655e86c9..99b41d2d 100644 --- a/openstackclient/tests/network/v2/test_router.py +++ b/openstackclient/tests/network/v2/test_router.py @@ -495,6 +495,42 @@ class TestSetRouter(TestRouter): self._router, **attrs) self.assertIsNone(result) + def test_set_no_route(self): + arglist = [ + self._router.name, + '--no-route', + ] + verifylist = [ + ('router', self._router.name), + ('no_route', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'routes': [], + } + self.network.update_router.assert_called_once_with( + self._router, **attrs) + self.assertIsNone(result) + + def test_set_route_no_route(self): + arglist = [ + self._router.name, + '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1', + '--no-route', + ] + verifylist = [ + ('router', self._router.name), + ('routes', [{'destination': '10.20.30.0/24', + 'gateway': '10.20.30.1'}]), + ('no_route', True), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + def test_set_clear_routes(self): arglist = [ self._router.name, diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py index bd903f9e..2a64b884 100644 --- a/openstackclient/tests/network/v2/test_security_group_rule.py +++ b/openstackclient/tests/network/v2/test_security_group_rule.py @@ -14,6 +14,7 @@ import copy import mock +from openstackclient.common import exceptions from openstackclient.network import utils as network_utils from openstackclient.network.v2 import security_group_rule from openstackclient.tests.compute.v2 import fakes as compute_fakes @@ -131,22 +132,40 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) - def test_create_bad_protocol(self): + def test_create_bad_ethertype(self): arglist = [ - '--protocol', 'foo', + '--ethertype', 'foo', self._security_group.id, ] self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) - def test_create_bad_ethertype(self): + def test_create_all_protocol_options(self): arglist = [ - '--ethertype', 'foo', + '--protocol', 'tcp', + '--proto', 'tcp', self._security_group.id, ] self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) + def test_create_all_port_range_options(self): + arglist = [ + '--dst-port', '80:80', + '--icmp-type', '3', + '--icmp-code', '1', + self._security_group.id, + ] + verifylist = [ + ('dst_port', (80, 80)), + ('icmp_type', 3), + ('icmp_code', 1), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + def test_create_default_rule(self): self._setup_security_group_rule({ 'port_range_max': 443, @@ -177,6 +196,36 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self.assertEqual(self.expected_columns, columns) self.assertEqual(self.expected_data, data) + def test_create_proto_option(self): + self._setup_security_group_rule({ + 'protocol': 'icmp', + 'remote_ip_prefix': '10.0.2.0/24', + }) + arglist = [ + '--proto', self._security_group_rule.protocol, + '--src-ip', self._security_group_rule.remote_ip_prefix, + self._security_group.id, + ] + verifylist = [ + ('proto', self._security_group_rule.protocol), + ('protocol', None), + ('src_ip', self._security_group_rule.remote_ip_prefix), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'protocol': self._security_group_rule.protocol, + 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + def test_create_source_group(self): self._setup_security_group_rule({ 'port_range_max': 22, @@ -215,17 +264,15 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): def test_create_source_ip(self): self._setup_security_group_rule({ 'protocol': 'icmp', - 'port_range_max': -1, - 'port_range_min': -1, 'remote_ip_prefix': '10.0.2.0/24', }) arglist = [ - '--proto', self._security_group_rule.protocol, + '--protocol', self._security_group_rule.protocol, '--src-ip', self._security_group_rule.remote_ip_prefix, self._security_group.id, ] verifylist = [ - ('proto', self._security_group_rule.protocol), + ('protocol', self._security_group_rule.protocol), ('src_ip', self._security_group_rule.remote_ip_prefix), ('group', self._security_group.id), ] @@ -249,6 +296,7 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): 'ethertype': 'IPv6', 'port_range_max': 443, 'port_range_min': 443, + 'protocol': '6', 'remote_group_id': None, 'remote_ip_prefix': None, }) @@ -258,6 +306,7 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): '--ethertype', self._security_group_rule.ethertype, '--project', identity_fakes.project_name, '--project-domain', identity_fakes.domain_name, + '--protocol', self._security_group_rule.protocol, self._security_group.id, ] verifylist = [ @@ -267,6 +316,7 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): ('ethertype', self._security_group_rule.ethertype), ('project', identity_fakes.project_name), ('project_domain', identity_fakes.domain_name), + ('protocol', self._security_group_rule.protocol), ('group', self._security_group.id), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -285,6 +335,136 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self.assertEqual(self.expected_columns, columns) self.assertEqual(self.expected_data, data) + def test_create_tcp_with_icmp_type(self): + arglist = [ + '--protocol', 'tcp', + '--icmp-type', '15', + self._security_group.id, + ] + verifylist = [ + ('protocol', 'tcp'), + ('icmp_type', 15), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_create_icmp_code(self): + arglist = [ + '--protocol', '1', + '--icmp-code', '1', + self._security_group.id, + ] + verifylist = [ + ('protocol', '1'), + ('icmp_code', 1), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + def test_create_icmp_type(self): + self._setup_security_group_rule({ + 'port_range_min': 15, + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + }) + arglist = [ + '--icmp-type', str(self._security_group_rule.port_range_min), + '--protocol', self._security_group_rule.protocol, + self._security_group.id, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._security_group_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._security_group_rule.protocol), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_min': self._security_group_rule.port_range_min, + 'protocol': self._security_group_rule.protocol, + 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_ipv6_icmp_type_code(self): + self._setup_security_group_rule({ + 'ethertype': 'IPv6', + 'port_range_min': 139, + 'port_range_max': 2, + 'protocol': 'ipv6-icmp', + }) + arglist = [ + '--icmp-type', str(self._security_group_rule.port_range_min), + '--icmp-code', str(self._security_group_rule.port_range_max), + '--protocol', self._security_group_rule.protocol, + self._security_group.id, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._security_group_rule.port_range_min), + ('icmp_code', self._security_group_rule.port_range_max), + ('protocol', self._security_group_rule.protocol), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_min': self._security_group_rule.port_range_min, + 'port_range_max': self._security_group_rule.port_range_max, + 'protocol': self._security_group_rule.protocol, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmpv6_type(self): + self._setup_security_group_rule({ + 'ethertype': 'IPv6', + 'port_range_min': 139, + 'protocol': 'icmpv6', + }) + arglist = [ + '--icmp-type', str(self._security_group_rule.port_range_min), + '--protocol', self._security_group_rule.protocol, + self._security_group.id, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._security_group_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._security_group_rule.protocol), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_security_group_rule.assert_called_once_with(**{ + 'direction': self._security_group_rule.direction, + 'ethertype': self._security_group_rule.ethertype, + 'port_range_min': self._security_group_rule.port_range_min, + 'protocol': self._security_group_rule.protocol, + 'security_group_id': self._security_group.id, + }) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): @@ -337,10 +517,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) + def test_create_all_protocol_options(self): + arglist = [ + '--protocol', 'tcp', + '--proto', 'tcp', + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + def test_create_network_options(self): arglist = [ '--ingress', '--ethertype', 'IPv4', + '--icmp-type', '3', + '--icmp-code', '11', '--project', identity_fakes.project_name, '--project-domain', identity_fakes.domain_name, self._security_group.id, @@ -416,12 +607,45 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): 'ip_range': {'cidr': '10.0.2.0/24'}, }) arglist = [ + '--protocol', self._security_group_rule.ip_protocol, + '--src-ip', self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ] + verifylist = [ + ('protocol', self._security_group_rule.ip_protocol), + ('src_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_proto_option(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ '--proto', self._security_group_rule.ip_protocol, '--src-ip', self._security_group_rule.ip_range['cidr'], self._security_group.id, ] verifylist = [ ('proto', self._security_group_rule.ip_protocol), + ('protocol', None), ('src_ip', self._security_group_rule.ip_range['cidr']), ('group', self._security_group.id), ] @@ -522,8 +746,6 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): _security_group_rule_icmp = \ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ 'protocol': 'icmp', - 'port_range_max': -1, - 'port_range_min': -1, 'remote_ip_prefix': '10.0.2.0/24', 'security_group_id': _security_group.id, }) |
