summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-07-07 15:12:49 +0000
committerGerrit Code Review <review@openstack.org>2014-07-07 15:12:49 +0000
commitb4b09a68844ab06921609de63346179112075f81 (patch)
tree5043bb33d2c0866a2f13e7def0e645e8eb331eb9
parentbe0c1d17beaa154b09d8fe13104f6f4bdd8f3ab7 (diff)
parentac971fdf6aefdb5bdb1854c2112ed17df5f9e871 (diff)
downloadneutron-b4b09a68844ab06921609de63346179112075f81.tar.gz
Merge "Improve handling of security group updates" into stable/havana
-rw-r--r--neutron/agent/securitygroups_rpc.py82
-rw-r--r--neutron/plugins/openvswitch/agent/ovs_neutron_agent.py186
-rw-r--r--neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py206
-rw-r--r--neutron/tests/unit/openvswitch/test_ovs_tunnel.py19
-rw-r--r--neutron/tests/unit/test_security_groups_rpc.py215
5 files changed, 520 insertions, 188 deletions
diff --git a/neutron/agent/securitygroups_rpc.py b/neutron/agent/securitygroups_rpc.py
index a38e4355bf..8ebf3bfe25 100644
--- a/neutron/agent/securitygroups_rpc.py
+++ b/neutron/agent/securitygroups_rpc.py
@@ -107,10 +107,18 @@ class SecurityGroupAgentRpcMixin(object):
support in agent implementations.
"""
- def init_firewall(self):
+ def init_firewall(self, defer_refresh_firewall=False):
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
self.firewall = importutils.import_object(firewall_driver)
+ # The following flag will be set to true if port filter must not be
+ # applied as soon as a rule or membership notification is received
+ self.defer_refresh_firewall = defer_refresh_firewall
+ # Stores devices for which firewall should be refreshed when
+ # deferred refresh is enabled.
+ self.devices_to_refilter = set()
+ # Flag raised when a global refresh is needed
+ self.global_refresh_firewall = False
def prepare_devices_filter(self, device_ids):
if not device_ids:
@@ -141,14 +149,25 @@ class SecurityGroupAgentRpcMixin(object):
sec_grp_set = set(security_groups)
for device in self.firewall.ports.values():
if sec_grp_set & set(device.get(attribute, [])):
- devices.append(device)
-
+ devices.append(device['device'])
if devices:
- self.refresh_firewall(devices)
+ if self.defer_refresh_firewall:
+ LOG.debug(_("Adding %s devices to the list of devices "
+ "for which firewall needs to be refreshed"),
+ devices)
+ self.devices_to_refilter |= set(devices)
+ else:
+ self.refresh_firewall(devices)
def security_groups_provider_updated(self):
LOG.info(_("Provider rule updated"))
- self.refresh_firewall()
+ if self.defer_refresh_firewall:
+ # NOTE(salv-orlando): A 'global refresh' might not be
+ # necessary if the subnet for which the provider rules
+ # were updated is known
+ self.global_refresh_firewall = True
+ else:
+ self.refresh_firewall()
def remove_devices_filter(self, device_ids):
if not device_ids:
@@ -161,16 +180,13 @@ class SecurityGroupAgentRpcMixin(object):
continue
self.firewall.remove_port_filter(device)
- def refresh_firewall(self, devices=None):
+ def refresh_firewall(self, device_ids=None):
LOG.info(_("Refresh firewall rules"))
-
- if devices:
- device_ids = [d['device'] for d in devices]
- else:
- device_ids = self.firewall.ports.keys()
if not device_ids:
- LOG.info(_("No ports here to refresh firewall"))
- return
+ device_ids = self.firewall.ports.keys()
+ if not device_ids:
+ LOG.info(_("No ports here to refresh firewall"))
+ return
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, device_ids)
with self.firewall.defer_apply():
@@ -178,6 +194,46 @@ class SecurityGroupAgentRpcMixin(object):
LOG.debug(_("Update port filter for %s"), device['device'])
self.firewall.update_port_filter(device)
+ def firewall_refresh_needed(self):
+ return self.global_refresh_firewall or self.devices_to_refilter
+
+ def setup_port_filters(self, new_devices, updated_devices):
+ """Configure port filters for devices.
+
+ This routine applies filters for new devices and refreshes firewall
+ rules when devices have been updated, or when there are changes in
+ security group membership or rules.
+
+ :param new_devices: set containing identifiers for new devices
+ :param updated_devices: set containining identifiers for
+ updated devices
+ """
+ if new_devices:
+ LOG.debug(_("Preparing device filters for %d new devices"),
+ len(new_devices))
+ self.prepare_devices_filter(new_devices)
+ # These data structures are cleared here in order to avoid
+ # losing updates occurring during firewall refresh
+ devices_to_refilter = self.devices_to_refilter
+ global_refresh_firewall = self.global_refresh_firewall
+ self.devices_to_refilter = set()
+ self.global_refresh_firewall = False
+ # TODO(salv-orlando): Avoid if possible ever performing the global
+ # refresh providing a precise list of devices for which firewall
+ # should be refreshed
+ if global_refresh_firewall:
+ LOG.debug(_("Refreshing firewall for all filtered devices"))
+ self.refresh_firewall()
+ else:
+ # If a device is both in new and updated devices
+ # avoid reprocessing it
+ updated_devices = ((updated_devices | devices_to_refilter) -
+ new_devices)
+ if updated_devices:
+ LOG.debug(_("Refreshing firewall for %d devices"),
+ len(updated_devices))
+ self.refresh_firewall(updated_devices)
+
class SecurityGroupAgentRpcApiMixin(object):
diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
index ca91e05cd7..4eba1e3c0f 100644
--- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
+++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
@@ -42,10 +42,8 @@ from neutron.common import legacy
from neutron.common import topics
from neutron.common import utils as q_utils
from neutron import context
-from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.openvswitch.common import config # noqa
from neutron.plugins.openvswitch.common import constants
@@ -118,7 +116,7 @@ class OVSSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
self.context = context
self.plugin_rpc = plugin_rpc
self.root_helper = root_helper
- self.init_firewall()
+ self.init_firewall(defer_refresh_firewall=True)
class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
@@ -228,6 +226,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.sg_agent = OVSSecurityGroupAgent(self.context,
self.plugin_rpc,
root_helper)
+ # Stores port update notifications for processing in main rpc loop
+ self.updated_ports = set()
def _check_ovs_version(self):
if constants.TYPE_VXLAN in self.tunnel_types:
@@ -290,34 +290,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.debug(_("Network %s not used on agent."), network_id)
def port_update(self, context, **kwargs):
- LOG.debug(_("port_update received"))
port = kwargs.get('port')
- # Validate that port is on OVS
- vif_port = self.int_br.get_vif_port_by_id(port['id'])
- if not vif_port:
- return
-
- if ext_sg.SECURITYGROUPS in port:
- self.sg_agent.refresh_firewall()
- network_type = kwargs.get('network_type')
- segmentation_id = kwargs.get('segmentation_id')
- physical_network = kwargs.get('physical_network')
- self.treat_vif_port(vif_port, port['id'], port['network_id'],
- network_type, physical_network,
- segmentation_id, port['admin_state_up'])
- try:
- if port['admin_state_up']:
- # update plugin about port status
- self.plugin_rpc.update_device_up(self.context, port['id'],
- self.agent_id,
- cfg.CONF.host)
- else:
- # update plugin about port status
- self.plugin_rpc.update_device_down(self.context, port['id'],
- self.agent_id,
- cfg.CONF.host)
- except rpc_common.Timeout:
- LOG.error(_("RPC timeout while updating port %s"), port['id'])
+ # Put the port identifier in the updated_ports set.
+ # Even if full port details might be provided to this call,
+ # they are not used since there is no guarantee the notifications
+ # are processed in the same order as the relevant API requests
+ self.updated_ports.add(port['id'])
+ LOG.debug(_("port_update message processed for port %s"), port['id'])
def tunnel_update(self, context, **kwargs):
LOG.debug(_("tunnel_update received"))
@@ -827,16 +806,29 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
int_veth.link.set_mtu(self.veth_mtu)
phys_veth.link.set_mtu(self.veth_mtu)
- def update_ports(self, registered_ports):
- ports = self.int_br.get_vif_port_set()
- if ports == registered_ports:
- return
- self.int_br_device_count = len(ports)
- added = ports - registered_ports
- removed = registered_ports - ports
- return {'current': ports,
- 'added': added,
- 'removed': removed}
+ def scan_ports(self, registered_ports, updated_ports=None):
+ cur_ports = self.int_br.get_vif_port_set()
+ self.int_br_device_count = len(cur_ports)
+ port_info = {'current': cur_ports}
+ if updated_ports:
+ # Some updated ports might have been removed in the
+ # meanwhile, and therefore should not be processed.
+ # In this case the updated port won't be found among
+ # current ports.
+ updated_ports &= cur_ports
+ if updated_ports:
+ port_info['updated'] = updated_ports
+
+ # FIXME(salv-orlando): It's not really necessary to return early
+ # if nothing has changed.
+ if cur_ports == registered_ports:
+ # No added or removed ports to set, just return here
+ return port_info
+
+ port_info['added'] = cur_ports - registered_ports
+ # Remove all the known ports not found on the integration bridge
+ port_info['removed'] = registered_ports - cur_ports
+ return port_info
def update_ancillary_ports(self, registered_ports):
ports = set()
@@ -908,12 +900,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.tun_br.delete_port(port_name)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
- def treat_devices_added(self, devices):
+ def treat_devices_added_or_updated(self, devices):
resync = False
- self.sg_agent.prepare_devices_filter(devices)
for device in devices:
- LOG.info(_("Port %s added"), device)
+ LOG.debug(_("Processing port:%s"), device)
try:
+ # TODO(salv-orlando): Provide bulk API for retrieving
+ # details for all devices in one call
details = self.plugin_rpc.get_device_details(self.context,
device,
self.agent_id)
@@ -933,12 +926,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
details['physical_network'],
details['segmentation_id'],
details['admin_state_up'])
-
# update plugin about port status
- self.plugin_rpc.update_device_up(self.context,
- device,
- self.agent_id,
- cfg.CONF.host)
+ if details.get('admin_state_up'):
+ LOG.debug(_("Setting status for %s to UP"), device)
+ self.plugin_rpc.update_device_up(
+ self.context, device, self.agent_id, cfg.CONF.host)
+ else:
+ LOG.debug(_("Setting status for %s to DOWN"), device)
+ self.plugin_rpc.update_device_down(
+ self.context, device, self.agent_id, cfg.CONF.host)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1):
@@ -1008,8 +1004,28 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
- if 'added' in port_info:
- resync_a = self.treat_devices_added(port_info['added'])
+ # TODO(salv-orlando): consider a solution for ensuring notifications
+ # are processed exactly in the same order in which they were
+ # received. This is tricky because there are two notification
+ # sources: the neutron server, and the ovs db monitor process
+ # If there is an exception while processing security groups ports
+ # will not be wired anyway, and a resync will be triggered
+ # TODO(salv-orlando): Optimize avoiding applying filters unnecessarily
+ # (eg: when there are no IP address changes)
+ self.sg_agent.setup_port_filters(port_info.get('added', set()),
+ port_info.get('updated', set()))
+ # VIF wiring needs to be performed always for 'new' devices.
+ # For updated ports, re-wiring is not needed in most cases, but needs
+ # to be performed anyway when the admin state of a device is changed.
+ # TODO(salv-orlando): Optimize for avoiding unnecessary VIF
+ # processing for updated ports whose admin state is left unchanged
+ # A device might be both in the 'added' and 'updated'
+ # list at the same time; avoid processing it twice.
+ devices_added_updated = (port_info.get('added', set()) |
+ port_info.get('updated', set()))
+ if devices_added_updated:
+ resync_a = self.treat_devices_added_or_updated(
+ devices_added_updated)
if 'removed' in port_info:
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
@@ -1048,36 +1064,59 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
resync = True
return resync
+ def _agent_has_updates(self, polling_manager):
+ return (polling_manager.is_polling_required or
+ self.updated_ports or
+ self.sg_agent.firewall_refresh_needed())
+
+ def _port_info_has_changes(self, port_info):
+ return (port_info.get('added') or
+ port_info.get('removed') or
+ port_info.get('updated'))
+
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.AlwaysPoll()
sync = True
ports = set()
+ updated_ports_copy = set()
ancillary_ports = set()
tunnel_sync = True
while True:
- try:
- start = time.time()
- if sync:
- LOG.info(_("Agent out of sync with plugin!"))
- ports.clear()
- ancillary_ports.clear()
- sync = False
- polling_manager.force_polling()
-
- # Notify the plugin of tunnel IP
- if self.enable_tunneling and tunnel_sync:
- LOG.info(_("Agent tunnel out of sync with plugin!"))
+ start = time.time()
+ if sync:
+ LOG.info(_("Agent out of sync with plugin!"))
+ ports.clear()
+ ancillary_ports.clear()
+ sync = False
+ polling_manager.force_polling()
+
+ # Notify the plugin of tunnel IP
+ if self.enable_tunneling and tunnel_sync:
+ LOG.info(_("Agent tunnel out of sync with plugin!"))
+ try:
tunnel_sync = self.tunnel_sync()
-
- if polling_manager.is_polling_required:
- port_info = self.update_ports(ports)
-
- # notify plugin about port deltas
- if port_info:
- LOG.debug(_("Agent loop has new devices!"))
+ except Exception:
+ LOG.exception(_("Error while synchronizing tunnels"))
+ tunnel_sync = True
+ if self._agent_has_updates(polling_manager):
+ try:
+ # Save updated ports dict to perform rollback in
+ # case resync would be needed, and then clear
+ # self.updated_ports. As the greenthread should not yield
+ # between these two statements, this will be thread-safe
+ updated_ports_copy = self.updated_ports
+ self.updated_ports = set()
+ port_info = self.scan_ports(ports, updated_ports_copy)
+
+ # Secure and wire/unwire VIFs and update their status
+ # on Neutron server
+ if (self._port_info_has_changes(port_info) or
+ self.sg_agent.firewall_refresh_needed()):
+ LOG.debug(_("Starting to process devices in:%s"),
+ port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
ports = port_info['current']
@@ -1094,10 +1133,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
polling_manager.polling_completed()
- except Exception:
- LOG.exception(_("Error in agent event loop"))
- sync = True
- tunnel_sync = True
+ except Exception:
+ LOG.exception(_("Error while processing VIF ports"))
+ # Put the ports back in self.updated_port
+ self.updated_ports |= updated_ports_copy
+ sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
index 69879119ab..5cb70cd46b 100644
--- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
+++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
@@ -25,7 +25,6 @@ from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
-from neutron.openstack.common.rpc import common as rpc_common
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
from neutron.plugins.openvswitch.common import constants
from neutron.tests import base
@@ -153,28 +152,69 @@ class TestOvsNeutronAgent(base.BaseTestCase):
self.agent.port_dead(mock.Mock())
self.assertTrue(add_flow_func.called)
- def mock_update_ports(self, vif_port_set=None, registered_ports=None):
+ def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
+ updated_ports=None):
with mock.patch.object(self.agent.int_br, 'get_vif_port_set',
return_value=vif_port_set):
- return self.agent.update_ports(registered_ports)
+ return self.agent.scan_ports(registered_ports, updated_ports)
- def test_update_ports_returns_none_for_unchanged_ports(self):
- self.assertIsNone(self.mock_update_ports())
+ def test_scan_ports_returns_current_only_for_unchanged_ports(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 3])
+ expected = {'current': vif_port_set}
+ actual = self.mock_scan_ports(vif_port_set, registered_ports)
+ self.assertEqual(expected, actual)
- def test_update_ports_returns_port_changes(self):
+ def test_scan_ports_returns_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
- actual = self.mock_update_ports(vif_port_set, registered_ports)
+ actual = self.mock_scan_ports(vif_port_set, registered_ports)
+ self.assertEqual(expected, actual)
+
+ def _test_scan_ports_with_updated_ports(self, updated_ports):
+ vif_port_set = set([1, 3, 4])
+ registered_ports = set([1, 2, 4])
+ expected = dict(current=vif_port_set, added=set([3]),
+ removed=set([2]), updated=set([4]))
+ actual = self.mock_scan_ports(vif_port_set, registered_ports,
+ updated_ports)
+ self.assertEqual(expected, actual)
+
+ def test_scan_ports_finds_known_updated_ports(self):
+ self._test_scan_ports_with_updated_ports(set([4]))
+
+ def test_scan_ports_ignores_unknown_updated_ports(self):
+ # the port '5' was not seen on current ports. Hence it has either
+ # never been wired or already removed and should be ignored
+ self._test_scan_ports_with_updated_ports(set([4, 5]))
+
+ def test_scan_ports_ignores_updated_port_if_removed(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 2])
+ updated_ports = set([1, 2])
+ expected = dict(current=vif_port_set, added=set([3]),
+ removed=set([2]), updated=set([1]))
+ actual = self.mock_scan_ports(vif_port_set, registered_ports,
+ updated_ports)
+ self.assertEqual(expected, actual)
+
+ def test_scan_ports_no_vif_changes_returns_updated_port_only(self):
+ vif_port_set = set([1, 2, 3])
+ registered_ports = set([1, 2, 3])
+ updated_ports = set([2])
+ expected = dict(current=vif_port_set, updated=set([2]))
+ actual = self.mock_scan_ports(vif_port_set, registered_ports,
+ updated_ports)
self.assertEqual(expected, actual)
def test_treat_devices_added_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
side_effect=Exception()):
- self.assertTrue(self.agent.treat_devices_added([{}]))
+ self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
- def _mock_treat_devices_added(self, details, port, func_name):
- """Mock treat devices added.
+ def _mock_treat_devices_added_updated(self, details, port, func_name):
+ """Mock treat devices added or updated.
:param details: the details to return for the device
:param port: the port that get_vif_port_by_id should return
@@ -187,29 +227,51 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=port),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
- ) as (get_dev_fn, get_vif_func, upd_dev_up, func):
- self.assertFalse(self.agent.treat_devices_added([{}]))
+ ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
+ self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
return func.called
- def test_treat_devices_added_ignores_invalid_ofport(self):
+ def test_treat_devices_added_updated_ignores_invalid_ofport(self):
port = mock.Mock()
port.ofport = -1
- self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port,
- 'port_dead'))
+ self.assertFalse(self._mock_treat_devices_added_updated(
+ mock.MagicMock(), port, 'port_dead'))
- def test_treat_devices_added_marks_unknown_port_as_dead(self):
+ def test_treat_devices_added_updated_marks_unknown_port_as_dead(self):
port = mock.Mock()
port.ofport = 1
- self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port,
- 'port_dead'))
+ self.assertTrue(self._mock_treat_devices_added_updated(
+ mock.MagicMock(), port, 'port_dead'))
- def test_treat_devices_added_updates_known_port(self):
+ def test_treat_devices_added_updated_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
- self.assertTrue(self._mock_treat_devices_added(details,
- mock.Mock(),
- 'treat_vif_port'))
+ self.assertTrue(self._mock_treat_devices_added_updated(
+ details, mock.Mock(), 'treat_vif_port'))
+
+ def test_treat_devices_added_updated_put_port_down(self):
+ fake_details_dict = {'admin_state_up': False,
+ 'port_id': 'xxx',
+ 'device': 'xxx',
+ 'network_id': 'yyy',
+ 'physical_network': 'foo',
+ 'segmentation_id': 'bar',
+ 'network_type': 'baz'}
+ with contextlib.nested(
+ mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+ return_value=fake_details_dict),
+ mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+ return_value=mock.MagicMock()),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
+ mock.patch.object(self.agent, 'treat_vif_port')
+ ) as (get_dev_fn, get_vif_func, upd_dev_up,
+ upd_dev_down, treat_vif_port):
+ self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+ self.assertTrue(treat_vif_port.called)
+ self.assertTrue(upd_dev_down.called)
def test_treat_devices_removed_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
@@ -230,17 +292,33 @@ class TestOvsNeutronAgent(base.BaseTestCase):
def test_treat_devices_removed_ignores_missing_port(self):
self._mock_treat_devices_removed(False)
+ def _test_process_network_ports(self, port_info):
+ with contextlib.nested(
+ mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
+ mock.patch.object(self.agent, "treat_devices_added_or_updated",
+ return_value=False),
+ mock.patch.object(self.agent, "treat_devices_removed",
+ return_value=False)
+ ) as (setup_port_filters, device_added_updated, device_removed):
+ self.assertFalse(self.agent.process_network_ports(port_info))
+ setup_port_filters.assert_called_once_with(
+ port_info['added'], port_info.get('updated', set()))
+ device_added_updated.assert_called_once_with(
+ port_info['added'] | port_info.get('updated', set()))
+ device_removed.assert_called_once_with(port_info['removed'])
+
def test_process_network_ports(self):
- reply = {'current': set(['tap0']),
- 'removed': set(['eth0']),
- 'added': set(['eth1'])}
- with mock.patch.object(self.agent, 'treat_devices_added',
- return_value=False) as device_added:
- with mock.patch.object(self.agent, 'treat_devices_removed',
- return_value=False) as device_removed:
- self.assertFalse(self.agent.process_network_ports(reply))
- self.assertTrue(device_added.called)
- self.assertTrue(device_removed.called)
+ self._test_process_network_ports(
+ {'current': set(['tap0']),
+ 'removed': set(['eth0']),
+ 'added': set(['eth1'])})
+
+ def test_process_network_port_with_updated_ports(self):
+ self._test_process_network_ports(
+ {'current': set(['tap0', 'tap1']),
+ 'updated': set(['tap1', 'eth1']),
+ 'removed': set(['eth0']),
+ 'added': set(['eth1'])})
def test_report_state(self):
with mock.patch.object(self.agent.state_rpc,
@@ -270,61 +348,15 @@ class TestOvsNeutronAgent(base.BaseTestCase):
recl_fn.assert_called_with("123")
def test_port_update(self):
- with contextlib.nested(
- mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
- mock.patch.object(self.agent, "treat_vif_port"),
- mock.patch.object(self.agent.plugin_rpc, "update_device_up"),
- mock.patch.object(self.agent.plugin_rpc, "update_device_down")
- ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn):
- port = {"id": "123",
- "network_id": "124",
- "admin_state_up": False}
- getvif_fn.return_value = "vif_port_obj"
- self.agent.port_update("unused_context",
- port=port,
- network_type="vlan",
- segmentation_id="1",
- physical_network="physnet")
- treatvif_fn.assert_called_with("vif_port_obj", "123",
- "124", "vlan", "physnet",
- "1", False)
- upddown_fn.assert_called_with(self.agent.context,
- "123", self.agent.agent_id,
- cfg.CONF.host)
-
- port["admin_state_up"] = True
- self.agent.port_update("unused_context",
- port=port,
- network_type="vlan",
- segmentation_id="1",
- physical_network="physnet")
- updup_fn.assert_called_with(self.agent.context,
- "123", self.agent.agent_id,
- cfg.CONF.host)
-
- def test_port_update_plugin_rpc_failed(self):
- port = {'id': 1,
- 'network_id': 1,
- 'admin_state_up': True}
- with contextlib.nested(
- mock.patch.object(ovs_neutron_agent.LOG, 'error'),
- mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
- mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
- mock.patch.object(self.agent, 'port_bound'),
- mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
- mock.patch.object(self.agent, 'port_dead')
- ) as (log, _, device_up, _, device_down, _):
- device_up.side_effect = rpc_common.Timeout
- self.agent.port_update(mock.Mock(), port=port)
- self.assertTrue(device_up.called)
- self.assertEqual(log.call_count, 1)
-
- log.reset_mock()
- port['admin_state_up'] = False
- device_down.side_effect = rpc_common.Timeout
- self.agent.port_update(mock.Mock(), port=port)
- self.assertTrue(device_down.called)
- self.assertEqual(log.call_count, 1)
+ port = {"id": "123",
+ "network_id": "124",
+ "admin_state_up": False}
+ self.agent.port_update("unused_context",
+ port=port,
+ network_type="vlan",
+ segmentation_id="1",
+ physical_network="physnet")
+ self.assertEqual(set(['123']), self.agent.updated_ports)
def test_setup_physical_bridges(self):
with contextlib.nested(
diff --git a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py
index d5bd080929..22e346212e 100644
--- a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py
+++ b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py
@@ -426,12 +426,12 @@ class TunnelTest(base.BaseTestCase):
def testDaemonLoop(self):
reply2 = {'current': set(['tap0']),
- 'added': set([]),
+ 'added': set(['tap2']),
'removed': set([])}
reply3 = {'current': set(['tap2']),
'added': set([]),
- 'removed': set([])}
+ 'removed': set(['tap0'])}
self.mox.StubOutWithMock(log.ContextAdapter, 'exception')
log.ContextAdapter.exception(
@@ -439,19 +439,20 @@ class TunnelTest(base.BaseTestCase):
Exception('Fake exception to get out of the loop'))
self.mox.StubOutWithMock(
- ovs_neutron_agent.OVSNeutronAgent, 'update_ports')
- ovs_neutron_agent.OVSNeutronAgent.update_ports(set()).AndReturn(reply2)
- ovs_neutron_agent.OVSNeutronAgent.update_ports(
- set(['tap0'])).AndReturn(reply3)
+ ovs_neutron_agent.OVSNeutronAgent, 'scan_ports')
+ ovs_neutron_agent.OVSNeutronAgent.scan_ports(
+ set(), set()).AndReturn(reply2)
+ ovs_neutron_agent.OVSNeutronAgent.scan_ports(
+ set(['tap0']), set()).AndReturn(reply3)
self.mox.StubOutWithMock(
ovs_neutron_agent.OVSNeutronAgent, 'process_network_ports')
ovs_neutron_agent.OVSNeutronAgent.process_network_ports(
{'current': set(['tap0']),
'removed': set([]),
- 'added': set([])}).AndReturn(False)
+ 'added': set(['tap2'])}).AndReturn(False)
ovs_neutron_agent.OVSNeutronAgent.process_network_ports(
- {'current': set(['tap0']),
- 'removed': set([]),
+ {'current': set(['tap2']),
+ 'removed': set(['tap0']),
'added': set([])}).AndRaise(
Exception('Fake exception to get out of the loop'))
self.mox.ReplayAll()
diff --git a/neutron/tests/unit/test_security_groups_rpc.py b/neutron/tests/unit/test_security_groups_rpc.py
index ca98a55be9..3c85676a2e 100644
--- a/neutron/tests/unit/test_security_groups_rpc.py
+++ b/neutron/tests/unit/test_security_groups_rpc.py
@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+from contextlib import contextmanager
from contextlib import nested
import mock
@@ -501,14 +502,14 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
- def setUp(self):
+ def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentRpcTestCase, self).setUp()
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
self.addCleanup(mock.patch.stopall)
mock.patch('neutron.agent.linux.iptables_manager').start()
self.agent.root_helper = 'sudo'
- self.agent.init_firewall()
+ self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
@@ -543,7 +544,7 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
- [call.refresh_firewall([self.fake_device])])
+ [call.refresh_firewall([self.fake_device['device']])])
def test_security_groups_rule_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
@@ -556,7 +557,7 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
- [call.refresh_firewall([self.fake_device])])
+ [call.refresh_firewall([self.fake_device['device']])])
def test_security_groups_member_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
@@ -593,6 +594,208 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
self.firewall.assert_has_calls([])
+class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
+ SecurityGroupAgentRpcTestCase):
+
+ def setUp(self):
+ super(SecurityGroupAgentRpcWithDeferredRefreshTestCase, self).setUp(
+ defer_refresh_firewall=True)
+
+ @contextmanager
+ def add_fake_device(self, device, sec_groups, source_sec_groups=None):
+ fake_device = {'device': device,
+ 'security_groups': sec_groups,
+ 'security_group_source_groups': source_sec_groups or [],
+ 'security_group_rules': [{'security_group_id':
+ 'fake_sgid1',
+ 'remote_group_id':
+ 'fake_sgid2'}]}
+ self.firewall.ports[device] = fake_device
+ yield
+ del self.firewall.ports[device]
+
+ def test_security_groups_rule_updated(self):
+ self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_rule_updated_same_port(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgidX']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_rule_updated(['fake_sgid1'])
+ self.agent.security_groups_rule_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_rule_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid2']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_rule_updated(['fake_sgid1',
+ 'fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_rule_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid2']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_rule_updated(['fake_sgid1'])
+ self.agent.security_groups_rule_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_member_updated(self):
+ self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_member_updated_same_port(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid1', 'fake_sgid1B'],
+ source_sec_groups=['fake_sgidX']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_member_updated(['fake_sgid1',
+ 'fake_sgid3'])
+ self.agent.security_groups_member_updated(['fake_sgid2',
+ 'fake_sgid3'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_member_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid1', 'fake_sgid1B'],
+ source_sec_groups=['fake_sgid2']):
+ self.agent.security_groups_member_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_member_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid1', 'fake_sgid1B'],
+ source_sec_groups=['fake_sgid1B']):
+ self.agent.security_groups_member_updated(['fake_sgid1B'])
+ self.agent.security_groups_member_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_provider_updated(self):
+ self.agent.security_groups_provider_updated()
+ self.assertTrue(self.agent.global_refresh_firewall)
+
+ def test_setup_port_filters_new_ports_only(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(['fake_new_device']), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.assertFalse(self.agent.refresh_firewall.called)
+
+ def test_setup_port_filters_updated_ports_only(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(), set(['fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_updated_device']))
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filter_new_and_updated_ports(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(['fake_new_device']),
+ set(['fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_updated_device']))
+
+ def test_setup_port_filters_sg_updates_only(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device']))
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filters_sg_updates_and_new_ports(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(['fake_new_device']), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device']))
+
+ def test_setup_port_filters_sg_updates_and_updated_ports(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(
+ set(), set(['fake_device', 'fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device', 'fake_device_2', 'fake_updated_device']))
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filters_all_updates(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(
+ set(['fake_new_device']),
+ set(['fake_device', 'fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device', 'fake_device_2', 'fake_updated_device']))
+
+ def test_setup_port_filters_no_update(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.assertFalse(self.agent.refresh_firewall.called)
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filters_with_global_refresh(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = True
+ self.agent.setup_port_filters(set(), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with()
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+
class FakeSGRpcApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
pass
@@ -1210,7 +1413,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
- def setUp(self):
+ def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentWithIptables, self).setUp()
self.mox = mox.Mox()
cfg.CONF.set_override(
@@ -1225,7 +1428,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.root_helper = 'sudo'
self.agent.root_helper = 'sudo'
- self.agent.init_firewall()
+ self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.iptables = self.agent.firewall.iptables
self.mox.StubOutWithMock(self.iptables, "execute")