diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-07-07 15:12:49 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-07-07 15:12:49 +0000 |
commit | b4b09a68844ab06921609de63346179112075f81 (patch) | |
tree | 5043bb33d2c0866a2f13e7def0e645e8eb331eb9 | |
parent | be0c1d17beaa154b09d8fe13104f6f4bdd8f3ab7 (diff) | |
parent | ac971fdf6aefdb5bdb1854c2112ed17df5f9e871 (diff) | |
download | neutron-b4b09a68844ab06921609de63346179112075f81.tar.gz |
Merge "Improve handling of security group updates" into stable/havana
-rw-r--r-- | neutron/agent/securitygroups_rpc.py | 82 | ||||
-rw-r--r-- | neutron/plugins/openvswitch/agent/ovs_neutron_agent.py | 186 | ||||
-rw-r--r-- | neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py | 206 | ||||
-rw-r--r-- | neutron/tests/unit/openvswitch/test_ovs_tunnel.py | 19 | ||||
-rw-r--r-- | neutron/tests/unit/test_security_groups_rpc.py | 215 |
5 files changed, 520 insertions, 188 deletions
diff --git a/neutron/agent/securitygroups_rpc.py b/neutron/agent/securitygroups_rpc.py index a38e4355bf..8ebf3bfe25 100644 --- a/neutron/agent/securitygroups_rpc.py +++ b/neutron/agent/securitygroups_rpc.py @@ -107,10 +107,18 @@ class SecurityGroupAgentRpcMixin(object): support in agent implementations. """ - def init_firewall(self): + def init_firewall(self, defer_refresh_firewall=False): firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver) self.firewall = importutils.import_object(firewall_driver) + # The following flag will be set to true if port filter must not be + # applied as soon as a rule or membership notification is received + self.defer_refresh_firewall = defer_refresh_firewall + # Stores devices for which firewall should be refreshed when + # deferred refresh is enabled. + self.devices_to_refilter = set() + # Flag raised when a global refresh is needed + self.global_refresh_firewall = False def prepare_devices_filter(self, device_ids): if not device_ids: @@ -141,14 +149,25 @@ class SecurityGroupAgentRpcMixin(object): sec_grp_set = set(security_groups) for device in self.firewall.ports.values(): if sec_grp_set & set(device.get(attribute, [])): - devices.append(device) - + devices.append(device['device']) if devices: - self.refresh_firewall(devices) + if self.defer_refresh_firewall: + LOG.debug(_("Adding %s devices to the list of devices " + "for which firewall needs to be refreshed"), + devices) + self.devices_to_refilter |= set(devices) + else: + self.refresh_firewall(devices) def security_groups_provider_updated(self): LOG.info(_("Provider rule updated")) - self.refresh_firewall() + if self.defer_refresh_firewall: + # NOTE(salv-orlando): A 'global refresh' might not be + # necessary if the subnet for which the provider rules + # were updated is known + self.global_refresh_firewall = True + else: + self.refresh_firewall() def remove_devices_filter(self, device_ids): if not device_ids: @@ -161,16 +180,13 @@ class SecurityGroupAgentRpcMixin(object): continue self.firewall.remove_port_filter(device) - def refresh_firewall(self, devices=None): + def refresh_firewall(self, device_ids=None): LOG.info(_("Refresh firewall rules")) - - if devices: - device_ids = [d['device'] for d in devices] - else: - device_ids = self.firewall.ports.keys() if not device_ids: - LOG.info(_("No ports here to refresh firewall")) - return + device_ids = self.firewall.ports.keys() + if not device_ids: + LOG.info(_("No ports here to refresh firewall")) + return devices = self.plugin_rpc.security_group_rules_for_devices( self.context, device_ids) with self.firewall.defer_apply(): @@ -178,6 +194,46 @@ class SecurityGroupAgentRpcMixin(object): LOG.debug(_("Update port filter for %s"), device['device']) self.firewall.update_port_filter(device) + def firewall_refresh_needed(self): + return self.global_refresh_firewall or self.devices_to_refilter + + def setup_port_filters(self, new_devices, updated_devices): + """Configure port filters for devices. + + This routine applies filters for new devices and refreshes firewall + rules when devices have been updated, or when there are changes in + security group membership or rules. + + :param new_devices: set containing identifiers for new devices + :param updated_devices: set containining identifiers for + updated devices + """ + if new_devices: + LOG.debug(_("Preparing device filters for %d new devices"), + len(new_devices)) + self.prepare_devices_filter(new_devices) + # These data structures are cleared here in order to avoid + # losing updates occurring during firewall refresh + devices_to_refilter = self.devices_to_refilter + global_refresh_firewall = self.global_refresh_firewall + self.devices_to_refilter = set() + self.global_refresh_firewall = False + # TODO(salv-orlando): Avoid if possible ever performing the global + # refresh providing a precise list of devices for which firewall + # should be refreshed + if global_refresh_firewall: + LOG.debug(_("Refreshing firewall for all filtered devices")) + self.refresh_firewall() + else: + # If a device is both in new and updated devices + # avoid reprocessing it + updated_devices = ((updated_devices | devices_to_refilter) - + new_devices) + if updated_devices: + LOG.debug(_("Refreshing firewall for %d devices"), + len(updated_devices)) + self.refresh_firewall(updated_devices) + class SecurityGroupAgentRpcApiMixin(object): diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index ca91e05cd7..4eba1e3c0f 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -42,10 +42,8 @@ from neutron.common import legacy from neutron.common import topics from neutron.common import utils as q_utils from neutron import context -from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import common as rpc_common from neutron.openstack.common.rpc import dispatcher from neutron.plugins.openvswitch.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -118,7 +116,7 @@ class OVSSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin): self.context = context self.plugin_rpc = plugin_rpc self.root_helper = root_helper - self.init_firewall() + self.init_firewall(defer_refresh_firewall=True) class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, @@ -228,6 +226,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.sg_agent = OVSSecurityGroupAgent(self.context, self.plugin_rpc, root_helper) + # Stores port update notifications for processing in main rpc loop + self.updated_ports = set() def _check_ovs_version(self): if constants.TYPE_VXLAN in self.tunnel_types: @@ -290,34 +290,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.debug(_("Network %s not used on agent."), network_id) def port_update(self, context, **kwargs): - LOG.debug(_("port_update received")) port = kwargs.get('port') - # Validate that port is on OVS - vif_port = self.int_br.get_vif_port_by_id(port['id']) - if not vif_port: - return - - if ext_sg.SECURITYGROUPS in port: - self.sg_agent.refresh_firewall() - network_type = kwargs.get('network_type') - segmentation_id = kwargs.get('segmentation_id') - physical_network = kwargs.get('physical_network') - self.treat_vif_port(vif_port, port['id'], port['network_id'], - network_type, physical_network, - segmentation_id, port['admin_state_up']) - try: - if port['admin_state_up']: - # update plugin about port status - self.plugin_rpc.update_device_up(self.context, port['id'], - self.agent_id, - cfg.CONF.host) - else: - # update plugin about port status - self.plugin_rpc.update_device_down(self.context, port['id'], - self.agent_id, - cfg.CONF.host) - except rpc_common.Timeout: - LOG.error(_("RPC timeout while updating port %s"), port['id']) + # Put the port identifier in the updated_ports set. + # Even if full port details might be provided to this call, + # they are not used since there is no guarantee the notifications + # are processed in the same order as the relevant API requests + self.updated_ports.add(port['id']) + LOG.debug(_("port_update message processed for port %s"), port['id']) def tunnel_update(self, context, **kwargs): LOG.debug(_("tunnel_update received")) @@ -827,16 +806,29 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, int_veth.link.set_mtu(self.veth_mtu) phys_veth.link.set_mtu(self.veth_mtu) - def update_ports(self, registered_ports): - ports = self.int_br.get_vif_port_set() - if ports == registered_ports: - return - self.int_br_device_count = len(ports) - added = ports - registered_ports - removed = registered_ports - ports - return {'current': ports, - 'added': added, - 'removed': removed} + def scan_ports(self, registered_ports, updated_ports=None): + cur_ports = self.int_br.get_vif_port_set() + self.int_br_device_count = len(cur_ports) + port_info = {'current': cur_ports} + if updated_ports: + # Some updated ports might have been removed in the + # meanwhile, and therefore should not be processed. + # In this case the updated port won't be found among + # current ports. + updated_ports &= cur_ports + if updated_ports: + port_info['updated'] = updated_ports + + # FIXME(salv-orlando): It's not really necessary to return early + # if nothing has changed. + if cur_ports == registered_ports: + # No added or removed ports to set, just return here + return port_info + + port_info['added'] = cur_ports - registered_ports + # Remove all the known ports not found on the integration bridge + port_info['removed'] = registered_ports - cur_ports + return port_info def update_ancillary_ports(self, registered_ports): ports = set() @@ -908,12 +900,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.tun_br.delete_port(port_name) self.tun_br_ofports[tunnel_type].pop(remote_ip, None) - def treat_devices_added(self, devices): + def treat_devices_added_or_updated(self, devices): resync = False - self.sg_agent.prepare_devices_filter(devices) for device in devices: - LOG.info(_("Port %s added"), device) + LOG.debug(_("Processing port:%s"), device) try: + # TODO(salv-orlando): Provide bulk API for retrieving + # details for all devices in one call details = self.plugin_rpc.get_device_details(self.context, device, self.agent_id) @@ -933,12 +926,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, details['physical_network'], details['segmentation_id'], details['admin_state_up']) - # update plugin about port status - self.plugin_rpc.update_device_up(self.context, - device, - self.agent_id, - cfg.CONF.host) + if details.get('admin_state_up'): + LOG.debug(_("Setting status for %s to UP"), device) + self.plugin_rpc.update_device_up( + self.context, device, self.agent_id, cfg.CONF.host) + else: + LOG.debug(_("Setting status for %s to DOWN"), device) + self.plugin_rpc.update_device_down( + self.context, device, self.agent_id, cfg.CONF.host) else: LOG.debug(_("Device %s not defined on plugin"), device) if (port and int(port.ofport) != -1): @@ -1008,8 +1004,28 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def process_network_ports(self, port_info): resync_a = False resync_b = False - if 'added' in port_info: - resync_a = self.treat_devices_added(port_info['added']) + # TODO(salv-orlando): consider a solution for ensuring notifications + # are processed exactly in the same order in which they were + # received. This is tricky because there are two notification + # sources: the neutron server, and the ovs db monitor process + # If there is an exception while processing security groups ports + # will not be wired anyway, and a resync will be triggered + # TODO(salv-orlando): Optimize avoiding applying filters unnecessarily + # (eg: when there are no IP address changes) + self.sg_agent.setup_port_filters(port_info.get('added', set()), + port_info.get('updated', set())) + # VIF wiring needs to be performed always for 'new' devices. + # For updated ports, re-wiring is not needed in most cases, but needs + # to be performed anyway when the admin state of a device is changed. + # TODO(salv-orlando): Optimize for avoiding unnecessary VIF + # processing for updated ports whose admin state is left unchanged + # A device might be both in the 'added' and 'updated' + # list at the same time; avoid processing it twice. + devices_added_updated = (port_info.get('added', set()) | + port_info.get('updated', set())) + if devices_added_updated: + resync_a = self.treat_devices_added_or_updated( + devices_added_updated) if 'removed' in port_info: resync_b = self.treat_devices_removed(port_info['removed']) # If one of the above opertaions fails => resync with plugin @@ -1048,36 +1064,59 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, resync = True return resync + def _agent_has_updates(self, polling_manager): + return (polling_manager.is_polling_required or + self.updated_ports or + self.sg_agent.firewall_refresh_needed()) + + def _port_info_has_changes(self, port_info): + return (port_info.get('added') or + port_info.get('removed') or + port_info.get('updated')) + def rpc_loop(self, polling_manager=None): if not polling_manager: polling_manager = polling.AlwaysPoll() sync = True ports = set() + updated_ports_copy = set() ancillary_ports = set() tunnel_sync = True while True: - try: - start = time.time() - if sync: - LOG.info(_("Agent out of sync with plugin!")) - ports.clear() - ancillary_ports.clear() - sync = False - polling_manager.force_polling() - - # Notify the plugin of tunnel IP - if self.enable_tunneling and tunnel_sync: - LOG.info(_("Agent tunnel out of sync with plugin!")) + start = time.time() + if sync: + LOG.info(_("Agent out of sync with plugin!")) + ports.clear() + ancillary_ports.clear() + sync = False + polling_manager.force_polling() + + # Notify the plugin of tunnel IP + if self.enable_tunneling and tunnel_sync: + LOG.info(_("Agent tunnel out of sync with plugin!")) + try: tunnel_sync = self.tunnel_sync() - - if polling_manager.is_polling_required: - port_info = self.update_ports(ports) - - # notify plugin about port deltas - if port_info: - LOG.debug(_("Agent loop has new devices!")) + except Exception: + LOG.exception(_("Error while synchronizing tunnels")) + tunnel_sync = True + if self._agent_has_updates(polling_manager): + try: + # Save updated ports dict to perform rollback in + # case resync would be needed, and then clear + # self.updated_ports. As the greenthread should not yield + # between these two statements, this will be thread-safe + updated_ports_copy = self.updated_ports + self.updated_ports = set() + port_info = self.scan_ports(ports, updated_ports_copy) + + # Secure and wire/unwire VIFs and update their status + # on Neutron server + if (self._port_info_has_changes(port_info) or + self.sg_agent.firewall_refresh_needed()): + LOG.debug(_("Starting to process devices in:%s"), + port_info) # If treat devices fails - must resync with plugin sync = self.process_network_ports(port_info) ports = port_info['current'] @@ -1094,10 +1133,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, polling_manager.polling_completed() - except Exception: - LOG.exception(_("Error in agent event loop")) - sync = True - tunnel_sync = True + except Exception: + LOG.exception(_("Error while processing VIF ports")) + # Put the ports back in self.updated_port + self.updated_ports |= updated_ports_copy + sync = True # sleep till end of polling interval elapsed = (time.time() - start) diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index 69879119ab..5cb70cd46b 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -25,7 +25,6 @@ from neutron.agent.linux import ip_lib from neutron.agent.linux import ovs_lib from neutron.agent.linux import utils from neutron.common import constants as n_const -from neutron.openstack.common.rpc import common as rpc_common from neutron.plugins.openvswitch.agent import ovs_neutron_agent from neutron.plugins.openvswitch.common import constants from neutron.tests import base @@ -153,28 +152,69 @@ class TestOvsNeutronAgent(base.BaseTestCase): self.agent.port_dead(mock.Mock()) self.assertTrue(add_flow_func.called) - def mock_update_ports(self, vif_port_set=None, registered_ports=None): + def mock_scan_ports(self, vif_port_set=None, registered_ports=None, + updated_ports=None): with mock.patch.object(self.agent.int_br, 'get_vif_port_set', return_value=vif_port_set): - return self.agent.update_ports(registered_ports) + return self.agent.scan_ports(registered_ports, updated_ports) - def test_update_ports_returns_none_for_unchanged_ports(self): - self.assertIsNone(self.mock_update_ports()) + def test_scan_ports_returns_current_only_for_unchanged_ports(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 3]) + expected = {'current': vif_port_set} + actual = self.mock_scan_ports(vif_port_set, registered_ports) + self.assertEqual(expected, actual) - def test_update_ports_returns_port_changes(self): + def test_scan_ports_returns_port_changes(self): vif_port_set = set([1, 3]) registered_ports = set([1, 2]) expected = dict(current=vif_port_set, added=set([3]), removed=set([2])) - actual = self.mock_update_ports(vif_port_set, registered_ports) + actual = self.mock_scan_ports(vif_port_set, registered_ports) + self.assertEqual(expected, actual) + + def _test_scan_ports_with_updated_ports(self, updated_ports): + vif_port_set = set([1, 3, 4]) + registered_ports = set([1, 2, 4]) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([4])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) + self.assertEqual(expected, actual) + + def test_scan_ports_finds_known_updated_ports(self): + self._test_scan_ports_with_updated_ports(set([4])) + + def test_scan_ports_ignores_unknown_updated_ports(self): + # the port '5' was not seen on current ports. Hence it has either + # never been wired or already removed and should be ignored + self._test_scan_ports_with_updated_ports(set([4, 5])) + + def test_scan_ports_ignores_updated_port_if_removed(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 2]) + updated_ports = set([1, 2]) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([1])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) + self.assertEqual(expected, actual) + + def test_scan_ports_no_vif_changes_returns_updated_port_only(self): + vif_port_set = set([1, 2, 3]) + registered_ports = set([1, 2, 3]) + updated_ports = set([2]) + expected = dict(current=vif_port_set, updated=set([2])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) self.assertEqual(expected, actual) def test_treat_devices_added_returns_true_for_missing_device(self): with mock.patch.object(self.agent.plugin_rpc, 'get_device_details', side_effect=Exception()): - self.assertTrue(self.agent.treat_devices_added([{}])) + self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) - def _mock_treat_devices_added(self, details, port, func_name): - """Mock treat devices added. + def _mock_treat_devices_added_updated(self, details, port, func_name): + """Mock treat devices added or updated. :param details: the details to return for the device :param port: the port that get_vif_port_by_id should return @@ -187,29 +227,51 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=port), mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) - ) as (get_dev_fn, get_vif_func, upd_dev_up, func): - self.assertFalse(self.agent.treat_devices_added([{}])) + ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) return func.called - def test_treat_devices_added_ignores_invalid_ofport(self): + def test_treat_devices_added_updated_ignores_invalid_ofport(self): port = mock.Mock() port.ofport = -1 - self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port, - 'port_dead')) + self.assertFalse(self._mock_treat_devices_added_updated( + mock.MagicMock(), port, 'port_dead')) - def test_treat_devices_added_marks_unknown_port_as_dead(self): + def test_treat_devices_added_updated_marks_unknown_port_as_dead(self): port = mock.Mock() port.ofport = 1 - self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port, - 'port_dead')) + self.assertTrue(self._mock_treat_devices_added_updated( + mock.MagicMock(), port, 'port_dead')) - def test_treat_devices_added_updates_known_port(self): + def test_treat_devices_added_updated_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True - self.assertTrue(self._mock_treat_devices_added(details, - mock.Mock(), - 'treat_vif_port')) + self.assertTrue(self._mock_treat_devices_added_updated( + details, mock.Mock(), 'treat_vif_port')) + + def test_treat_devices_added_updated_put_port_down(self): + fake_details_dict = {'admin_state_up': False, + 'port_id': 'xxx', + 'device': 'xxx', + 'network_id': 'yyy', + 'physical_network': 'foo', + 'segmentation_id': 'bar', + 'network_type': 'baz'} + with contextlib.nested( + mock.patch.object(self.agent.plugin_rpc, 'get_device_details', + return_value=fake_details_dict), + mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', + return_value=mock.MagicMock()), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), + mock.patch.object(self.agent, 'treat_vif_port') + ) as (get_dev_fn, get_vif_func, upd_dev_up, + upd_dev_down, treat_vif_port): + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) + self.assertTrue(treat_vif_port.called) + self.assertTrue(upd_dev_down.called) def test_treat_devices_removed_returns_true_for_missing_device(self): with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', @@ -230,17 +292,33 @@ class TestOvsNeutronAgent(base.BaseTestCase): def test_treat_devices_removed_ignores_missing_port(self): self._mock_treat_devices_removed(False) + def _test_process_network_ports(self, port_info): + with contextlib.nested( + mock.patch.object(self.agent.sg_agent, "setup_port_filters"), + mock.patch.object(self.agent, "treat_devices_added_or_updated", + return_value=False), + mock.patch.object(self.agent, "treat_devices_removed", + return_value=False) + ) as (setup_port_filters, device_added_updated, device_removed): + self.assertFalse(self.agent.process_network_ports(port_info)) + setup_port_filters.assert_called_once_with( + port_info['added'], port_info.get('updated', set())) + device_added_updated.assert_called_once_with( + port_info['added'] | port_info.get('updated', set())) + device_removed.assert_called_once_with(port_info['removed']) + def test_process_network_ports(self): - reply = {'current': set(['tap0']), - 'removed': set(['eth0']), - 'added': set(['eth1'])} - with mock.patch.object(self.agent, 'treat_devices_added', - return_value=False) as device_added: - with mock.patch.object(self.agent, 'treat_devices_removed', - return_value=False) as device_removed: - self.assertFalse(self.agent.process_network_ports(reply)) - self.assertTrue(device_added.called) - self.assertTrue(device_removed.called) + self._test_process_network_ports( + {'current': set(['tap0']), + 'removed': set(['eth0']), + 'added': set(['eth1'])}) + + def test_process_network_port_with_updated_ports(self): + self._test_process_network_ports( + {'current': set(['tap0', 'tap1']), + 'updated': set(['tap1', 'eth1']), + 'removed': set(['eth0']), + 'added': set(['eth1'])}) def test_report_state(self): with mock.patch.object(self.agent.state_rpc, @@ -270,61 +348,15 @@ class TestOvsNeutronAgent(base.BaseTestCase): recl_fn.assert_called_with("123") def test_port_update(self): - with contextlib.nested( - mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), - mock.patch.object(self.agent, "treat_vif_port"), - mock.patch.object(self.agent.plugin_rpc, "update_device_up"), - mock.patch.object(self.agent.plugin_rpc, "update_device_down") - ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn): - port = {"id": "123", - "network_id": "124", - "admin_state_up": False} - getvif_fn.return_value = "vif_port_obj" - self.agent.port_update("unused_context", - port=port, - network_type="vlan", - segmentation_id="1", - physical_network="physnet") - treatvif_fn.assert_called_with("vif_port_obj", "123", - "124", "vlan", "physnet", - "1", False) - upddown_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id, - cfg.CONF.host) - - port["admin_state_up"] = True - self.agent.port_update("unused_context", - port=port, - network_type="vlan", - segmentation_id="1", - physical_network="physnet") - updup_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id, - cfg.CONF.host) - - def test_port_update_plugin_rpc_failed(self): - port = {'id': 1, - 'network_id': 1, - 'admin_state_up': True} - with contextlib.nested( - mock.patch.object(ovs_neutron_agent.LOG, 'error'), - mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), - mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), - mock.patch.object(self.agent, 'port_bound'), - mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), - mock.patch.object(self.agent, 'port_dead') - ) as (log, _, device_up, _, device_down, _): - device_up.side_effect = rpc_common.Timeout - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(device_up.called) - self.assertEqual(log.call_count, 1) - - log.reset_mock() - port['admin_state_up'] = False - device_down.side_effect = rpc_common.Timeout - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(device_down.called) - self.assertEqual(log.call_count, 1) + port = {"id": "123", + "network_id": "124", + "admin_state_up": False} + self.agent.port_update("unused_context", + port=port, + network_type="vlan", + segmentation_id="1", + physical_network="physnet") + self.assertEqual(set(['123']), self.agent.updated_ports) def test_setup_physical_bridges(self): with contextlib.nested( diff --git a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py index d5bd080929..22e346212e 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py +++ b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py @@ -426,12 +426,12 @@ class TunnelTest(base.BaseTestCase): def testDaemonLoop(self): reply2 = {'current': set(['tap0']), - 'added': set([]), + 'added': set(['tap2']), 'removed': set([])} reply3 = {'current': set(['tap2']), 'added': set([]), - 'removed': set([])} + 'removed': set(['tap0'])} self.mox.StubOutWithMock(log.ContextAdapter, 'exception') log.ContextAdapter.exception( @@ -439,19 +439,20 @@ class TunnelTest(base.BaseTestCase): Exception('Fake exception to get out of the loop')) self.mox.StubOutWithMock( - ovs_neutron_agent.OVSNeutronAgent, 'update_ports') - ovs_neutron_agent.OVSNeutronAgent.update_ports(set()).AndReturn(reply2) - ovs_neutron_agent.OVSNeutronAgent.update_ports( - set(['tap0'])).AndReturn(reply3) + ovs_neutron_agent.OVSNeutronAgent, 'scan_ports') + ovs_neutron_agent.OVSNeutronAgent.scan_ports( + set(), set()).AndReturn(reply2) + ovs_neutron_agent.OVSNeutronAgent.scan_ports( + set(['tap0']), set()).AndReturn(reply3) self.mox.StubOutWithMock( ovs_neutron_agent.OVSNeutronAgent, 'process_network_ports') ovs_neutron_agent.OVSNeutronAgent.process_network_ports( {'current': set(['tap0']), 'removed': set([]), - 'added': set([])}).AndReturn(False) + 'added': set(['tap2'])}).AndReturn(False) ovs_neutron_agent.OVSNeutronAgent.process_network_ports( - {'current': set(['tap0']), - 'removed': set([]), + {'current': set(['tap2']), + 'removed': set(['tap0']), 'added': set([])}).AndRaise( Exception('Fake exception to get out of the loop')) self.mox.ReplayAll() diff --git a/neutron/tests/unit/test_security_groups_rpc.py b/neutron/tests/unit/test_security_groups_rpc.py index ca98a55be9..3c85676a2e 100644 --- a/neutron/tests/unit/test_security_groups_rpc.py +++ b/neutron/tests/unit/test_security_groups_rpc.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +from contextlib import contextmanager from contextlib import nested import mock @@ -501,14 +502,14 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase): class SecurityGroupAgentRpcTestCase(base.BaseTestCase): - def setUp(self): + def setUp(self, defer_refresh_firewall=False): super(SecurityGroupAgentRpcTestCase, self).setUp() self.agent = sg_rpc.SecurityGroupAgentRpcMixin() self.agent.context = None self.addCleanup(mock.patch.stopall) mock.patch('neutron.agent.linux.iptables_manager').start() self.agent.root_helper = 'sudo' - self.agent.init_firewall() + self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall) self.firewall = mock.Mock() firewall_object = firewall_base.FirewallDriver() self.firewall.defer_apply.side_effect = firewall_object.defer_apply @@ -543,7 +544,7 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): self.agent.prepare_devices_filter(['fake_port_id']) self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3']) self.agent.refresh_firewall.assert_has_calls( - [call.refresh_firewall([self.fake_device])]) + [call.refresh_firewall([self.fake_device['device']])]) def test_security_groups_rule_not_updated(self): self.agent.refresh_firewall = mock.Mock() @@ -556,7 +557,7 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): self.agent.prepare_devices_filter(['fake_port_id']) self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3']) self.agent.refresh_firewall.assert_has_calls( - [call.refresh_firewall([self.fake_device])]) + [call.refresh_firewall([self.fake_device['device']])]) def test_security_groups_member_not_updated(self): self.agent.refresh_firewall = mock.Mock() @@ -593,6 +594,208 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): self.firewall.assert_has_calls([]) +class SecurityGroupAgentRpcWithDeferredRefreshTestCase( + SecurityGroupAgentRpcTestCase): + + def setUp(self): + super(SecurityGroupAgentRpcWithDeferredRefreshTestCase, self).setUp( + defer_refresh_firewall=True) + + @contextmanager + def add_fake_device(self, device, sec_groups, source_sec_groups=None): + fake_device = {'device': device, + 'security_groups': sec_groups, + 'security_group_source_groups': source_sec_groups or [], + 'security_group_rules': [{'security_group_id': + 'fake_sgid1', + 'remote_group_id': + 'fake_sgid2'}]} + self.firewall.ports[device] = fake_device + yield + del self.firewall.ports[device] + + def test_security_groups_rule_updated(self): + self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + + def test_multiple_security_groups_rule_updated_same_port(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgidX']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_rule_updated(['fake_sgid1']) + self.agent.security_groups_rule_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertNotIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_rule_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid2']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_rule_updated(['fake_sgid1', + 'fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_multiple_security_groups_rule_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid2']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_rule_updated(['fake_sgid1']) + self.agent.security_groups_rule_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_member_updated(self): + self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + + def test_multiple_security_groups_member_updated_same_port(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid1', 'fake_sgid1B'], + source_sec_groups=['fake_sgidX']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_member_updated(['fake_sgid1', + 'fake_sgid3']) + self.agent.security_groups_member_updated(['fake_sgid2', + 'fake_sgid3']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertNotIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_member_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid1', 'fake_sgid1B'], + source_sec_groups=['fake_sgid2']): + self.agent.security_groups_member_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_multiple_security_groups_member_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid1', 'fake_sgid1B'], + source_sec_groups=['fake_sgid1B']): + self.agent.security_groups_member_updated(['fake_sgid1B']) + self.agent.security_groups_member_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_provider_updated(self): + self.agent.security_groups_provider_updated() + self.assertTrue(self.agent.global_refresh_firewall) + + def test_setup_port_filters_new_ports_only(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(['fake_new_device']), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.assertFalse(self.agent.refresh_firewall.called) + + def test_setup_port_filters_updated_ports_only(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(), set(['fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_updated_device'])) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filter_new_and_updated_ports(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(['fake_new_device']), + set(['fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_updated_device'])) + + def test_setup_port_filters_sg_updates_only(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device'])) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filters_sg_updates_and_new_ports(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(['fake_new_device']), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device'])) + + def test_setup_port_filters_sg_updates_and_updated_ports(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters( + set(), set(['fake_device', 'fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device', 'fake_device_2', 'fake_updated_device'])) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filters_all_updates(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters( + set(['fake_new_device']), + set(['fake_device', 'fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device', 'fake_device_2', 'fake_updated_device'])) + + def test_setup_port_filters_no_update(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.assertFalse(self.agent.refresh_firewall.called) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filters_with_global_refresh(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = True + self.agent.setup_port_filters(set(), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with() + self.assertFalse(self.agent.prepare_devices_filter.called) + + class FakeSGRpcApi(agent_rpc.PluginApi, sg_rpc.SecurityGroupServerRpcApiMixin): pass @@ -1210,7 +1413,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase): PHYSDEV_INGRESS = 'physdev-out' PHYSDEV_EGRESS = 'physdev-in' - def setUp(self): + def setUp(self, defer_refresh_firewall=False): super(TestSecurityGroupAgentWithIptables, self).setUp() self.mox = mox.Mox() cfg.CONF.set_override( @@ -1225,7 +1428,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase): self.root_helper = 'sudo' self.agent.root_helper = 'sudo' - self.agent.init_firewall() + self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall) self.iptables = self.agent.firewall.iptables self.mox.StubOutWithMock(self.iptables, "execute") |