summaryrefslogtreecommitdiff
path: root/openstackclient/tests
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient/tests')
-rw-r--r--openstackclient/tests/compute/v2/fakes.py19
-rw-r--r--openstackclient/tests/image/v2/test_image.py91
-rw-r--r--openstackclient/tests/network/v2/fakes.py10
-rw-r--r--openstackclient/tests/network/v2/test_network.py169
-rw-r--r--openstackclient/tests/network/v2/test_router.py36
-rw-r--r--openstackclient/tests/network/v2/test_security_group_rule.py242
6 files changed, 486 insertions, 81 deletions
diff --git a/openstackclient/tests/compute/v2/fakes.py b/openstackclient/tests/compute/v2/fakes.py
index b3f3fb49..70fc386f 100644
--- a/openstackclient/tests/compute/v2/fakes.py
+++ b/openstackclient/tests/compute/v2/fakes.py
@@ -859,6 +859,25 @@ class FakeNetwork(object):
return networks
+ @staticmethod
+ def get_networks(networks=None, count=2):
+ """Get an iterable MagicMock object with a list of faked networks.
+
+ If networks list is provided, then initialize the Mock object with the
+ list. Otherwise create one.
+
+ :param List networks:
+ A list of FakeResource objects faking networks
+ :param int count:
+ The number of networks to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ networks
+ """
+ if networks is None:
+ networks = FakeNetwork.create_networks(count=count)
+ return mock.Mock(side_effect=networks)
+
class FakeHost(object):
"""Fake one host."""
diff --git a/openstackclient/tests/image/v2/test_image.py b/openstackclient/tests/image/v2/test_image.py
index 0248f30b..33f25331 100644
--- a/openstackclient/tests/image/v2/test_image.py
+++ b/openstackclient/tests/image/v2/test_image.py
@@ -20,6 +20,7 @@ import warlock
from glanceclient.v2 import schemas
from openstackclient.common import exceptions
+from openstackclient.common import utils as common_utils
from openstackclient.image.v2 import image
from openstackclient.tests import fakes
from openstackclient.tests.identity.v3 import fakes as identity_fakes
@@ -341,28 +342,31 @@ class TestImageCreate(TestImage):
class TestAddProjectToImage(TestImage):
+ _image = image_fakes.FakeImage.create_one_image()
+
columns = (
'image_id',
'member_id',
'status',
)
+
datalist = (
- image_fakes.image_id,
+ _image.id,
identity_fakes.project_id,
- image_fakes.member_status,
+ image_fakes.member_status
)
def setUp(self):
super(TestAddProjectToImage, self).setUp()
# This is the return value for utils.find_resource()
- self.images_mock.get.return_value = fakes.FakeResource(
- None,
- copy.deepcopy(image_fakes.IMAGE),
- loaded=True,
- )
+ self.images_mock.get.return_value = self._image
+
+ # Update the image_id in the MEMBER dict
+ self.new_member = copy.deepcopy(image_fakes.MEMBER)
+ self.new_member['image_id'] = self._image.id
self.image_members_mock.create.return_value = fakes.FakeModel(
- copy.deepcopy(image_fakes.MEMBER),
+ self.new_member,
)
self.project_mock.get.return_value = fakes.FakeResource(
None,
@@ -379,11 +383,11 @@ class TestAddProjectToImage(TestImage):
def test_add_project_to_image_no_option(self):
arglist = [
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id,
]
verifylist = [
- ('image', image_fakes.image_id),
+ ('image', self._image.id),
('project', identity_fakes.project_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -393,20 +397,21 @@ class TestAddProjectToImage(TestImage):
# data to be shown.
columns, data = self.cmd.take_action(parsed_args)
self.image_members_mock.create.assert_called_with(
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id
)
+
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, data)
def test_add_project_to_image_with_option(self):
arglist = [
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id,
'--project-domain', identity_fakes.domain_id,
]
verifylist = [
- ('image', image_fakes.image_id),
+ ('image', self._image.id),
('project', identity_fakes.project_id),
('project_domain', identity_fakes.domain_id),
]
@@ -417,7 +422,7 @@ class TestAddProjectToImage(TestImage):
# data to be shown.
columns, data = self.cmd.take_action(parsed_args)
self.image_members_mock.create.assert_called_with(
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id
)
self.assertEqual(self.columns, columns)
@@ -468,25 +473,26 @@ class TestImageDelete(TestImage):
class TestImageList(TestImage):
+ _image = image_fakes.FakeImage.create_one_image()
+
columns = (
'ID',
'Name',
'Status',
)
+
datalist = (
- (
- image_fakes.image_id,
- image_fakes.image_name,
- '',
- ),
- )
+ _image.id,
+ _image.name,
+ '',
+ ),
def setUp(self):
super(TestImageList, self).setUp()
self.api_mock = mock.Mock()
self.api_mock.image_list.side_effect = [
- [copy.deepcopy(image_fakes.IMAGE)], [],
+ [image_fakes.FakeImage.get_image_info(self._image)], [],
]
self.app.client_manager.image.api = self.api_mock
@@ -611,23 +617,24 @@ class TestImageList(TestImage):
self.assertEqual(collist, columns)
datalist = ((
- image_fakes.image_id,
- image_fakes.image_name,
+ self._image.id,
+ self._image.name,
'',
'',
'',
'',
- 'public',
- False,
- image_fakes.image_owner,
- '',
+ self._image.visibility,
+ self._image.protected,
+ self._image.owner,
+ common_utils.format_list(self._image.tags),
), )
self.assertEqual(datalist, tuple(data))
@mock.patch('openstackclient.api.utils.simple_filter')
def test_image_list_property_option(self, sf_mock):
sf_mock.return_value = [
- copy.deepcopy(image_fakes.IMAGE),
+ copy.deepcopy(
+ image_fakes.FakeImage.get_image_info(self._image)),
]
arglist = [
@@ -644,7 +651,7 @@ class TestImageList(TestImage):
columns, data = self.cmd.take_action(parsed_args)
self.api_mock.image_list.assert_called_with()
sf_mock.assert_called_with(
- [image_fakes.IMAGE],
+ [image_fakes.FakeImage.get_image_info(self._image)],
attr='a',
value='1',
property_field='properties',
@@ -656,7 +663,8 @@ class TestImageList(TestImage):
@mock.patch('openstackclient.common.utils.sort_items')
def test_image_list_sort_option(self, si_mock):
si_mock.return_value = [
- copy.deepcopy(image_fakes.IMAGE)
+ copy.deepcopy(
+ image_fakes.FakeImage.get_image_info(self._image))
]
arglist = ['--sort', 'name:asc']
@@ -669,10 +677,9 @@ class TestImageList(TestImage):
columns, data = self.cmd.take_action(parsed_args)
self.api_mock.image_list.assert_called_with()
si_mock.assert_called_with(
- [image_fakes.IMAGE],
+ [image_fakes.FakeImage.get_image_info(self._image)],
'name:asc'
)
-
self.assertEqual(self.columns, columns)
self.assertEqual(self.datalist, tuple(data))
@@ -720,12 +727,10 @@ class TestRemoveProjectImage(TestImage):
def setUp(self):
super(TestRemoveProjectImage, self).setUp()
+ self._image = image_fakes.FakeImage.create_one_image()
# This is the return value for utils.find_resource()
- self.images_mock.get.return_value = fakes.FakeResource(
- None,
- copy.deepcopy(image_fakes.IMAGE),
- loaded=True,
- )
+ self.images_mock.get.return_value = self._image
+
self.project_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT),
@@ -742,11 +747,11 @@ class TestRemoveProjectImage(TestImage):
def test_remove_project_image_no_options(self):
arglist = [
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id,
]
verifylist = [
- ('image', image_fakes.image_id),
+ ('image', self._image.id),
('project', identity_fakes.project_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -754,19 +759,19 @@ class TestRemoveProjectImage(TestImage):
result = self.cmd.take_action(parsed_args)
self.image_members_mock.delete.assert_called_with(
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id,
)
self.assertIsNone(result)
def test_remove_project_image_with_options(self):
arglist = [
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id,
'--project-domain', identity_fakes.domain_id,
]
verifylist = [
- ('image', image_fakes.image_id),
+ ('image', self._image.id),
('project', identity_fakes.project_id),
('project_domain', identity_fakes.domain_id),
]
@@ -775,7 +780,7 @@ class TestRemoveProjectImage(TestImage):
result = self.cmd.take_action(parsed_args)
self.image_members_mock.delete.assert_called_with(
- image_fakes.image_id,
+ self._image.id,
identity_fakes.project_id,
)
self.assertIsNone(result)
diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py
index 7c4604bd..84ede381 100644
--- a/openstackclient/tests/network/v2/fakes.py
+++ b/openstackclient/tests/network/v2/fakes.py
@@ -184,8 +184,7 @@ class FakeNetwork(object):
:param Dictionary attrs:
A dictionary with all attributes
:return:
- A FakeResource object, with id, name, admin_state_up,
- router_external, status, subnets, tenant_id
+ A FakeResource object, with id, name, etc.
"""
attrs = attrs or {}
@@ -199,7 +198,7 @@ class FakeNetwork(object):
'shared': False,
'subnets': ['a', 'b'],
'provider_network_type': 'vlan',
- 'router_external': True,
+ 'router:external': True,
'availability_zones': [],
'availability_zone_hints': [],
'is_default': False,
@@ -213,6 +212,7 @@ class FakeNetwork(object):
# Set attributes with special mapping in OpenStack SDK.
network.project_id = network_attrs['tenant_id']
+ network.is_router_external = network_attrs['router:external']
return network
@@ -496,8 +496,8 @@ class FakeSecurityGroupRule(object):
'direction': 'ingress',
'ethertype': 'IPv4',
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
- 'port_range_max': 0,
- 'port_range_min': 0,
+ 'port_range_max': None,
+ 'port_range_min': None,
'protocol': 'tcp',
'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0',
diff --git a/openstackclient/tests/network/v2/test_network.py b/openstackclient/tests/network/v2/test_network.py
index 1269b0a1..ba810f16 100644
--- a/openstackclient/tests/network/v2/test_network.py
+++ b/openstackclient/tests/network/v2/test_network.py
@@ -14,6 +14,7 @@
import copy
import mock
+from mock import call
from openstackclient.common import exceptions
from openstackclient.common import utils
from openstackclient.network.v2 import network
@@ -55,7 +56,7 @@ class TestCreateNetworkIdentityV3(TestNetwork):
'name',
'project_id',
'provider_network_type',
- 'router_external',
+ 'router:external',
'shared',
'status',
'subnets',
@@ -70,7 +71,7 @@ class TestCreateNetworkIdentityV3(TestNetwork):
_network.name,
_network.project_id,
_network.provider_network_type,
- network._format_router_external(_network.router_external),
+ network._format_router_external(_network.is_router_external),
_network.shared,
_network.status,
utils.format_list(_network.subnets),
@@ -148,6 +149,7 @@ class TestCreateNetworkIdentityV3(TestNetwork):
"--provider-network-type", "vlan",
"--provider-physical-network", "physnet1",
"--provider-segment", "400",
+ "--transparent-vlan",
self._network.name,
]
verifylist = [
@@ -161,6 +163,7 @@ class TestCreateNetworkIdentityV3(TestNetwork):
('provider_network_type', 'vlan'),
('physical_network', 'physnet1'),
('segmentation_id', '400'),
+ ('transparent_vlan', True),
('name', self._network.name),
]
@@ -178,6 +181,7 @@ class TestCreateNetworkIdentityV3(TestNetwork):
'provider:network_type': 'vlan',
'provider:physical_network': 'physnet1',
'provider:segmentation_id': '400',
+ 'vlan_transparent': True,
})
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
@@ -223,7 +227,7 @@ class TestCreateNetworkIdentityV2(TestNetwork):
'name',
'project_id',
'provider_network_type',
- 'router_external',
+ 'router:external',
'shared',
'status',
'subnets',
@@ -238,7 +242,7 @@ class TestCreateNetworkIdentityV2(TestNetwork):
_network.name,
_network.project_id,
_network.provider_network_type,
- network._format_router_external(_network.router_external),
+ network._format_router_external(_network.is_router_external),
_network.shared,
_network.status,
utils.format_list(_network.subnets),
@@ -320,33 +324,88 @@ class TestCreateNetworkIdentityV2(TestNetwork):
class TestDeleteNetwork(TestNetwork):
- # The network to delete.
- _network = network_fakes.FakeNetwork.create_one_network()
-
def setUp(self):
super(TestDeleteNetwork, self).setUp()
+ # The networks to delete
+ self._networks = network_fakes.FakeNetwork.create_networks(count=3)
+
self.network.delete_network = mock.Mock(return_value=None)
- self.network.find_network = mock.Mock(return_value=self._network)
+ self.network.find_network = network_fakes.FakeNetwork.get_networks(
+ networks=self._networks)
# Get the command object to test
self.cmd = network.DeleteNetwork(self.app, self.namespace)
- def test_delete(self):
+ def test_delete_one_network(self):
arglist = [
- self._network.name,
+ self._networks[0].name,
]
verifylist = [
- ('network', [self._network.name]),
+ ('network', [self._networks[0].name]),
]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.delete_network.assert_called_once_with(self._networks[0])
+ self.assertIsNone(result)
+
+ def test_delete_multiple_networks(self):
+ arglist = []
+ for n in self._networks:
+ arglist.append(n.id)
+ verifylist = [
+ ('network', arglist),
+ ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
result = self.cmd.take_action(parsed_args)
- self.network.delete_network.assert_called_once_with(self._network)
+ calls = []
+ for n in self._networks:
+ calls.append(call(n))
+ self.network.delete_network.assert_has_calls(calls)
self.assertIsNone(result)
+ def test_delete_multiple_networks_exception(self):
+ arglist = [
+ self._networks[0].id,
+ 'xxxx-yyyy-zzzz',
+ self._networks[1].id,
+ ]
+ verifylist = [
+ ('network', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # Fake exception in find_network()
+ ret_find = [
+ self._networks[0],
+ exceptions.NotFound('404'),
+ self._networks[1],
+ ]
+ self.network.find_network = mock.Mock(side_effect=ret_find)
+
+ # Fake exception in delete_network()
+ ret_delete = [
+ None,
+ exceptions.NotFound('404'),
+ ]
+ self.network.delete_network = mock.Mock(side_effect=ret_delete)
+
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ # The second call of find_network() should fail. So delete_network()
+ # was only called twice.
+ calls = [
+ call(self._networks[0]),
+ call(self._networks[1]),
+ ]
+ self.network.delete_network.assert_has_calls(calls)
+
class TestListNetwork(TestNetwork):
@@ -390,7 +449,7 @@ class TestListNetwork(TestNetwork):
net.shared,
utils.format_list(net.subnets),
net.provider_network_type,
- network._format_router_external(net.router_external),
+ network._format_router_external(net.is_router_external),
utils.format_list(net.availability_zones),
))
@@ -486,6 +545,7 @@ class TestSetNetwork(TestNetwork):
'--provider-network-type', 'vlan',
'--provider-physical-network', 'physnet1',
'--provider-segment', '400',
+ '--no-transparent-vlan',
]
verifylist = [
('network', self._network.name),
@@ -497,6 +557,7 @@ class TestSetNetwork(TestNetwork):
('provider_network_type', 'vlan'),
('physical_network', 'physnet1'),
('segmentation_id', '400'),
+ ('no_transparent_vlan', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -511,6 +572,7 @@ class TestSetNetwork(TestNetwork):
'provider:network_type': 'vlan',
'provider:physical_network': 'physnet1',
'provider:segmentation_id': '400',
+ 'vlan_transparent': False,
}
self.network.update_network.assert_called_once_with(
self._network, **attrs)
@@ -565,7 +627,7 @@ class TestShowNetwork(TestNetwork):
'name',
'project_id',
'provider_network_type',
- 'router_external',
+ 'router:external',
'shared',
'status',
'subnets',
@@ -580,7 +642,7 @@ class TestShowNetwork(TestNetwork):
_network.name,
_network.project_id,
_network.provider_network_type,
- network._format_router_external(_network.router_external),
+ network._format_router_external(_network.is_router_external),
_network.shared,
_network.status,
utils.format_list(_network.subnets),
@@ -746,36 +808,97 @@ class TestCreateNetworkCompute(TestNetworkCompute):
class TestDeleteNetworkCompute(TestNetworkCompute):
- # The network to delete.
- _network = compute_fakes.FakeNetwork.create_one_network()
-
def setUp(self):
super(TestDeleteNetworkCompute, self).setUp()
self.app.client_manager.network_endpoint_enabled = False
+ # The networks to delete
+ self._networks = compute_fakes.FakeNetwork.create_networks(count=3)
+
self.compute.networks.delete.return_value = None
# Return value of utils.find_resource()
- self.compute.networks.get.return_value = self._network
+ self.compute.networks.get = \
+ compute_fakes.FakeNetwork.get_networks(networks=self._networks)
# Get the command object to test
self.cmd = network.DeleteNetwork(self.app, None)
- def test_network_delete(self):
+ def test_delete_one_network(self):
arglist = [
- self._network.label,
+ self._networks[0].label,
]
verifylist = [
- ('network', [self._network.label]),
+ ('network', [self._networks[0].label]),
]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.compute.networks.delete.assert_called_once_with(
+ self._networks[0].id)
+ self.assertIsNone(result)
+
+ def test_delete_multiple_networks(self):
+ arglist = []
+ for n in self._networks:
+ arglist.append(n.label)
+ verifylist = [
+ ('network', arglist),
+ ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
result = self.cmd.take_action(parsed_args)
- self.compute.networks.delete.assert_called_once_with(self._network.id)
+ calls = []
+ for n in self._networks:
+ calls.append(call(n.id))
+ self.compute.networks.delete.assert_has_calls(calls)
self.assertIsNone(result)
+ def test_delete_multiple_networks_exception(self):
+ arglist = [
+ self._networks[0].id,
+ 'xxxx-yyyy-zzzz',
+ self._networks[1].id,
+ ]
+ verifylist = [
+ ('network', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # Fake exception in utils.find_resource()
+ # In compute v2, we use utils.find_resource() to find a network.
+ # It calls get() several times, but find() only one time. So we
+ # choose to fake get() always raise exception, then pass through.
+ # And fake find() to find the real network or not.
+ self.compute.networks.get.side_effect = Exception()
+ ret_find = [
+ self._networks[0],
+ Exception(),
+ self._networks[1],
+ ]
+ self.compute.networks.find.side_effect = ret_find
+
+ # Fake exception in delete()
+ ret_delete = [
+ None,
+ Exception(),
+ ]
+ self.compute.networks.delete = mock.Mock(side_effect=ret_delete)
+
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ # The second call of utils.find_resource() should fail. So delete()
+ # was only called twice.
+ calls = [
+ call(self._networks[0].id),
+ call(self._networks[1].id),
+ ]
+ self.compute.networks.delete.assert_has_calls(calls)
+
class TestListNetworkCompute(TestNetworkCompute):
diff --git a/openstackclient/tests/network/v2/test_router.py b/openstackclient/tests/network/v2/test_router.py
index 655e86c9..99b41d2d 100644
--- a/openstackclient/tests/network/v2/test_router.py
+++ b/openstackclient/tests/network/v2/test_router.py
@@ -495,6 +495,42 @@ class TestSetRouter(TestRouter):
self._router, **attrs)
self.assertIsNone(result)
+ def test_set_no_route(self):
+ arglist = [
+ self._router.name,
+ '--no-route',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('no_route', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'routes': [],
+ }
+ self.network.update_router.assert_called_once_with(
+ self._router, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_route_no_route(self):
+ arglist = [
+ self._router.name,
+ '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1',
+ '--no-route',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('routes', [{'destination': '10.20.30.0/24',
+ 'gateway': '10.20.30.1'}]),
+ ('no_route', True),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
def test_set_clear_routes(self):
arglist = [
self._router.name,
diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py
index bd903f9e..2a64b884 100644
--- a/openstackclient/tests/network/v2/test_security_group_rule.py
+++ b/openstackclient/tests/network/v2/test_security_group_rule.py
@@ -14,6 +14,7 @@
import copy
import mock
+from openstackclient.common import exceptions
from openstackclient.network import utils as network_utils
from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.compute.v2 import fakes as compute_fakes
@@ -131,22 +132,40 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
- def test_create_bad_protocol(self):
+ def test_create_bad_ethertype(self):
arglist = [
- '--protocol', 'foo',
+ '--ethertype', 'foo',
self._security_group.id,
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
- def test_create_bad_ethertype(self):
+ def test_create_all_protocol_options(self):
arglist = [
- '--ethertype', 'foo',
+ '--protocol', 'tcp',
+ '--proto', 'tcp',
self._security_group.id,
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
+ def test_create_all_port_range_options(self):
+ arglist = [
+ '--dst-port', '80:80',
+ '--icmp-type', '3',
+ '--icmp-code', '1',
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (80, 80)),
+ ('icmp_type', 3),
+ ('icmp_code', 1),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
def test_create_default_rule(self):
self._setup_security_group_rule({
'port_range_max': 443,
@@ -177,6 +196,36 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
self.assertEqual(self.expected_columns, columns)
self.assertEqual(self.expected_data, data)
+ def test_create_proto_option(self):
+ self._setup_security_group_rule({
+ 'protocol': 'icmp',
+ 'remote_ip_prefix': '10.0.2.0/24',
+ })
+ arglist = [
+ '--proto', self._security_group_rule.protocol,
+ '--src-ip', self._security_group_rule.remote_ip_prefix,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('proto', self._security_group_rule.protocol),
+ ('protocol', None),
+ ('src_ip', self._security_group_rule.remote_ip_prefix),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
def test_create_source_group(self):
self._setup_security_group_rule({
'port_range_max': 22,
@@ -215,17 +264,15 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
def test_create_source_ip(self):
self._setup_security_group_rule({
'protocol': 'icmp',
- 'port_range_max': -1,
- 'port_range_min': -1,
'remote_ip_prefix': '10.0.2.0/24',
})
arglist = [
- '--proto', self._security_group_rule.protocol,
+ '--protocol', self._security_group_rule.protocol,
'--src-ip', self._security_group_rule.remote_ip_prefix,
self._security_group.id,
]
verifylist = [
- ('proto', self._security_group_rule.protocol),
+ ('protocol', self._security_group_rule.protocol),
('src_ip', self._security_group_rule.remote_ip_prefix),
('group', self._security_group.id),
]
@@ -249,6 +296,7 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
'ethertype': 'IPv6',
'port_range_max': 443,
'port_range_min': 443,
+ 'protocol': '6',
'remote_group_id': None,
'remote_ip_prefix': None,
})
@@ -258,6 +306,7 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
'--ethertype', self._security_group_rule.ethertype,
'--project', identity_fakes.project_name,
'--project-domain', identity_fakes.domain_name,
+ '--protocol', self._security_group_rule.protocol,
self._security_group.id,
]
verifylist = [
@@ -267,6 +316,7 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
('ethertype', self._security_group_rule.ethertype),
('project', identity_fakes.project_name),
('project_domain', identity_fakes.domain_name),
+ ('protocol', self._security_group_rule.protocol),
('group', self._security_group.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -285,6 +335,136 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
self.assertEqual(self.expected_columns, columns)
self.assertEqual(self.expected_data, data)
+ def test_create_tcp_with_icmp_type(self):
+ arglist = [
+ '--protocol', 'tcp',
+ '--icmp-type', '15',
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('protocol', 'tcp'),
+ ('icmp_type', 15),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_create_icmp_code(self):
+ arglist = [
+ '--protocol', '1',
+ '--icmp-code', '1',
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('protocol', '1'),
+ ('icmp_code', 1),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_create_icmp_type(self):
+ self._setup_security_group_rule({
+ 'port_range_min': 15,
+ 'protocol': 'icmp',
+ 'remote_ip_prefix': '0.0.0.0/0',
+ })
+ arglist = [
+ '--icmp-type', str(self._security_group_rule.port_range_min),
+ '--protocol', self._security_group_rule.protocol,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', None),
+ ('icmp_type', self._security_group_rule.port_range_min),
+ ('icmp_code', None),
+ ('protocol', self._security_group_rule.protocol),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_ipv6_icmp_type_code(self):
+ self._setup_security_group_rule({
+ 'ethertype': 'IPv6',
+ 'port_range_min': 139,
+ 'port_range_max': 2,
+ 'protocol': 'ipv6-icmp',
+ })
+ arglist = [
+ '--icmp-type', str(self._security_group_rule.port_range_min),
+ '--icmp-code', str(self._security_group_rule.port_range_max),
+ '--protocol', self._security_group_rule.protocol,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', None),
+ ('icmp_type', self._security_group_rule.port_range_min),
+ ('icmp_code', self._security_group_rule.port_range_max),
+ ('protocol', self._security_group_rule.protocol),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'port_range_max': self._security_group_rule.port_range_max,
+ 'protocol': self._security_group_rule.protocol,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_icmpv6_type(self):
+ self._setup_security_group_rule({
+ 'ethertype': 'IPv6',
+ 'port_range_min': 139,
+ 'protocol': 'icmpv6',
+ })
+ arglist = [
+ '--icmp-type', str(self._security_group_rule.port_range_min),
+ '--protocol', self._security_group_rule.protocol,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', None),
+ ('icmp_type', self._security_group_rule.port_range_min),
+ ('icmp_code', None),
+ ('protocol', self._security_group_rule.protocol),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
@@ -337,10 +517,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
+ def test_create_all_protocol_options(self):
+ arglist = [
+ '--protocol', 'tcp',
+ '--proto', 'tcp',
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
def test_create_network_options(self):
arglist = [
'--ingress',
'--ethertype', 'IPv4',
+ '--icmp-type', '3',
+ '--icmp-code', '11',
'--project', identity_fakes.project_name,
'--project-domain', identity_fakes.domain_name,
self._security_group.id,
@@ -416,12 +607,45 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
'ip_range': {'cidr': '10.0.2.0/24'},
})
arglist = [
+ '--protocol', self._security_group_rule.ip_protocol,
+ '--src-ip', self._security_group_rule.ip_range['cidr'],
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('protocol', self._security_group_rule.ip_protocol),
+ ('src_ip', self._security_group_rule.ip_range['cidr']),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_create_proto_option(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ })
+ arglist = [
'--proto', self._security_group_rule.ip_protocol,
'--src-ip', self._security_group_rule.ip_range['cidr'],
self._security_group.id,
]
verifylist = [
('proto', self._security_group_rule.ip_protocol),
+ ('protocol', None),
('src_ip', self._security_group_rule.ip_range['cidr']),
('group', self._security_group.id),
]
@@ -522,8 +746,6 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
_security_group_rule_icmp = \
network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
'protocol': 'icmp',
- 'port_range_max': -1,
- 'port_range_min': -1,
'remote_ip_prefix': '10.0.2.0/24',
'security_group_id': _security_group.id,
})