summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-11-28 22:21:58 +0000
committerGerrit Code Review <review@openstack.org>2014-11-28 22:21:59 +0000
commit181310428c8781fe5ffedd80d08b6955cb26524f (patch)
treea53e5e914e90c1b599734ec35c7f953ebb73e9c3
parent4f36c5a468e0f7fe26ebb8f1bc942d40d3cb107e (diff)
parent93f2536be5fe6f136e24caa11a5cea2282fb332d (diff)
downloadneutron-181310428c8781fe5ffedd80d08b6955cb26524f.tar.gz
Merge "Fix handling of CIDR in allowed address pairs" into stable/juno
-rw-r--r--neutron/db/securitygroups_rpc_base.py2
-rw-r--r--neutron/tests/unit/test_security_groups_rpc.py100
2 files changed, 67 insertions, 35 deletions
diff --git a/neutron/db/securitygroups_rpc_base.py b/neutron/db/securitygroups_rpc_base.py
index 2f6e2b7fdb..3c00b6d0b9 100644
--- a/neutron/db/securitygroups_rpc_base.py
+++ b/neutron/db/securitygroups_rpc_base.py
@@ -214,7 +214,7 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
context, sg_info['sg_member_ips'].keys())
for sg_id, member_ips in ips.items():
for ip in member_ips:
- ethertype = 'IPv%d' % netaddr.IPAddress(ip).version
+ ethertype = 'IPv%d' % netaddr.IPNetwork(ip).version
if (ethertype in sg_info['sg_member_ips'][sg_id]
and ip not in sg_info['sg_member_ips'][sg_id][ethertype]):
sg_info['sg_member_ips'][sg_id][ethertype].append(ip)
diff --git a/neutron/tests/unit/test_security_groups_rpc.py b/neutron/tests/unit/test_security_groups_rpc.py
index 3acc378b1d..62ecd195d8 100644
--- a/neutron/tests/unit/test_security_groups_rpc.py
+++ b/neutron/tests/unit/test_security_groups_rpc.py
@@ -222,21 +222,23 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
expected)
self._delete('ports', port_id1)
- def test_security_group_rules_for_devices_ipv4_ingress_addr_pair(self):
+ @contextlib.contextmanager
+ def _port_with_addr_pairs_and_security_group(self):
plugin_obj = manager.NeutronManager.get_plugin()
if ('allowed-address-pairs'
not in plugin_obj.supported_extension_aliases):
self.skipTest("Test depeneds on allowed-address-pairs extension")
fake_prefix = FAKE_PREFIX['IPv4']
with self.network() as n:
- with contextlib.nested(self.subnet(n),
- self.security_group()) as (subnet_v4,
- sg1):
+ with contextlib.nested(
+ self.subnet(n),
+ self.security_group()
+ ) as (subnet_v4, sg1):
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
'ingress', 'tcp', '22',
- '22')
+ '22', remote_group_id=sg1_id)
rule2 = self._build_security_group_rule(
sg1_id,
'ingress', 'tcp', '23',
@@ -248,7 +250,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
address_pairs = [{'mac_address': '00:00:00:00:00:01',
- 'ip_address': '10.0.0.0/24'},
+ 'ip_address': '10.0.1.0/24'},
{'mac_address': '00:00:00:00:00:01',
'ip_address': '11.0.0.1'}]
res1 = self._create_port(
@@ -256,34 +258,64 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
security_groups=[sg1_id],
arg_list=(addr_pair.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
- ports_rest1 = self.deserialize(self.fmt, res1)
- port_id1 = ports_rest1['port']['id']
- self.rpc.devices = {port_id1: ports_rest1['port']}
- devices = [port_id1, 'no_exist_device']
- ctx = context.get_admin_context()
- ports_rpc = self.rpc.security_group_rules_for_devices(
- ctx, devices=devices)
- port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': 'IPv4',
- 'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
- 'security_group_id': sg1_id},
- {'direction': 'ingress',
- 'protocol': 'tcp', 'ethertype': 'IPv4',
- 'port_range_max': 22,
- 'security_group_id': sg1_id,
- 'port_range_min': 22},
- {'direction': 'ingress', 'protocol': 'tcp',
- 'ethertype': 'IPv4',
- 'port_range_max': 23, 'security_group_id': sg1_id,
- 'port_range_min': 23,
- 'source_ip_prefix': fake_prefix},
- ]
- self.assertEqual(port_rpc['security_group_rules'],
- expected)
- self.assertEqual(port_rpc['allowed_address_pairs'],
- address_pairs)
- self._delete('ports', port_id1)
+ yield self.deserialize(self.fmt, res1)
+
+ def test_security_group_info_for_devices_ipv4_addr_pair(self):
+ with self._port_with_addr_pairs_and_security_group() as port:
+ port_id = port['port']['id']
+ sg_id = port['port']['security_groups'][0]
+ devices = [port_id, 'no_exist_device']
+ ctx = context.get_admin_context()
+ # verify that address pairs are included in remote SG IPs
+ sg_member_ips = self.rpc.security_group_info_for_devices(
+ ctx, devices=devices)['sg_member_ips']
+ expected_member_ips = [
+ '10.0.1.0/24', '11.0.0.1',
+ port['port']['fixed_ips'][0]['ip_address']]
+ self.assertEqual(sorted(expected_member_ips),
+ sorted(sg_member_ips[sg_id]['IPv4']))
+ self._delete('ports', port_id)
+
+ def test_security_group_rules_for_devices_ipv4_ingress_addr_pair(self):
+ fake_prefix = FAKE_PREFIX[const.IPv4]
+ with self._port_with_addr_pairs_and_security_group() as port:
+ port_id = port['port']['id']
+ sg_id = port['port']['security_groups'][0]
+ devices = [port_id, 'no_exist_device']
+ ctx = context.get_admin_context()
+ ports_rpc = self.rpc.security_group_rules_for_devices(
+ ctx, devices=devices)
+
+ port_rpc = ports_rpc[port_id]
+ expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ 'security_group_id': sg_id},
+ {'direction': 'egress', 'ethertype': 'IPv6',
+ 'security_group_id': sg_id},
+ {'direction': 'ingress',
+ 'protocol': 'tcp', 'ethertype': 'IPv4',
+ 'port_range_max': 22,
+ 'remote_group_id': sg_id,
+ 'security_group_id': sg_id,
+ 'source_ip_prefix': '11.0.0.1/32',
+ 'port_range_min': 22},
+ {'direction': 'ingress',
+ 'protocol': 'tcp', 'ethertype': 'IPv4',
+ 'port_range_max': 22,
+ 'remote_group_id': sg_id,
+ 'security_group_id': sg_id,
+ 'source_ip_prefix': '10.0.1.0/24',
+ 'port_range_min': 22},
+ {'direction': 'ingress', 'protocol': 'tcp',
+ 'ethertype': 'IPv4',
+ 'port_range_max': 23, 'security_group_id': sg_id,
+ 'port_range_min': 23,
+ 'source_ip_prefix': fake_prefix},
+ ]
+ self.assertEqual(expected,
+ port_rpc['security_group_rules'])
+ self.assertEqual(port['port']['allowed_address_pairs'],
+ port_rpc['allowed_address_pairs'])
+ self._delete('ports', port_id)
def test_security_group_rules_for_devices_ipv4_egress(self):
fake_prefix = FAKE_PREFIX[const.IPv4]