diff options
author | Zuul <zuul@review.opendev.org> | 2021-09-27 14:33:15 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2021-09-27 14:33:15 +0000 |
commit | bace3d1890c73ba81011d500e2fdd0fffb95a4ff (patch) | |
tree | 4b07f1ab015b6b709f12b7af8a050fab5486d2d8 | |
parent | 9f3b83f4dd4a2cd318b9afc636c54622be762bb0 (diff) | |
parent | e2d5d65a41209423e2d980db407b0567f97b903d (diff) | |
download | neutron-stable/rocky.tar.gz |
Merge "[L3] Use processing queue for network update events" into stable/rocky (branch: stable/rocky)
-rw-r--r-- | neutron/agent/l3/agent.py | 149 | ||||
-rw-r--r-- | neutron/tests/functional/agent/l3/test_legacy_router.py | 2 | ||||
-rw-r--r-- | neutron/tests/unit/agent/l3/test_agent.py | 46 |
3 files changed, 125 insertions, 72 deletions
diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index f5830e9826..02a22bd524 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -78,6 +78,7 @@ DELETE_RELATED_ROUTER = 2 ADD_UPDATE_ROUTER = 3 ADD_UPDATE_RELATED_ROUTER = 4 PD_UPDATE = 5 +UPDATE_NETWORK = 6 RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER, ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER} @@ -501,16 +502,26 @@ class L3NATAgent(ha.AgentMixin, network_id = kwargs['network']['id'] LOG.debug("Got network %s update", network_id) for ri in self.router_info.values(): - LOG.debug("Checking if router %s is plugged to the network %s", - ri, network_id) - ports = list(ri.internal_ports) - if ri.ex_gw_port: - ports.append(ri.ex_gw_port) - port_belongs = lambda p: p['network_id'] == network_id - if any(port_belongs(p) for p in ports): - update = queue.ResourceUpdate( - ri.router_id, PRIORITY_SYNC_ROUTERS_TASK) - self._resync_router(update) + update = queue.ResourceUpdate(ri.router_id, + PRIORITY_RPC, + action=UPDATE_NETWORK, + resource=network_id) + self._queue.add(update) + + def _process_network_update(self, router_id, network_id): + ri = self.router_info.get(router_id) + if not ri: + return + LOG.debug("Checking if router %s is plugged to the network %s", + ri, network_id) + ports = list(ri.internal_ports) + if ri.ex_gw_port: + ports.append(ri.ex_gw_port) + port_belongs = lambda p: p['network_id'] == network_id + if any(port_belongs(p) for p in ports): + update = queue.ResourceUpdate( + ri.router_id, PRIORITY_SYNC_ROUTERS_TASK) + self._resync_router(update) def _process_router_if_compatible(self, router): if (self.conf.external_network_bridge and @@ -611,69 +622,83 @@ class L3NATAgent(ha.AgentMixin, router_update.resource = None # Force the agent to resync the router self._queue.add(router_update) - def _process_router_update(self): + def _process_update(self): + for rp, update in self._queue.each_update_to_next_resource(): - LOG.info("Starting router update for 
%s, action %s, priority %s, " + LOG.info("Starting processing update %s, action %s, priority %s, " "update_id %s. Wait time elapsed: %.3f", update.id, update.action, update.priority, update.update_id, update.time_elapsed_since_create) - if update.action == PD_UPDATE: - self.pd.process_prefix_update() - LOG.info("Finished a router update for %s IPv6 PD, " - "update_id. %s. Time elapsed: %.3f", - update.id, update.update_id, - update.time_elapsed_since_start) - continue - - routers = [update.resource] if update.resource else [] + if update.action == UPDATE_NETWORK: + self._process_network_update( + router_id=update.id, + network_id=update.resource) + else: + self._process_router_update(rp, update) + + def _process_router_update(self, rp, update): + LOG.info("Starting router update for %s, action %s, priority %s, " + "update_id %s. Wait time elapsed: %.3f", + update.id, update.action, update.priority, + update.update_id, + update.time_elapsed_since_create) + if update.action == PD_UPDATE: + self.pd.process_prefix_update() + LOG.info("Finished a router update for %s IPv6 PD, " + "update_id. %s. Time elapsed: %.3f", + update.id, update.update_id, + update.time_elapsed_since_start) + return - not_delete_no_routers = (update.action != DELETE_ROUTER and - not routers) - related_action = update.action in (DELETE_RELATED_ROUTER, - ADD_UPDATE_RELATED_ROUTER) - if not_delete_no_routers or related_action: - try: - update.timestamp = timeutils.utcnow() - routers = self.plugin_rpc.get_routers(self.context, - [update.id]) - except Exception: - msg = "Failed to fetch router information for '%s'" - LOG.exception(msg, update.id) - self._resync_router(update) - continue - - # For a related action, verify the router is still hosted here, - # since it could have just been deleted and we don't want to - # add it back. 
- if related_action: - routers = [r for r in routers if r['id'] == update.id] - - if not routers: - removed = self._safe_router_removed(update.id) - if not removed: - self._resync_router(update) - else: - # need to update timestamp of removed router in case - # there are older events for the same router in the - # processing queue (like events from fullsync) in order to - # prevent deleted router re-creation - rp.fetched_and_processed(update.timestamp) - LOG.info("Finished a router update for %s, update_id %s. " - "Time elapsed: %.3f", - update.id, update.update_id, - update.time_elapsed_since_start) - continue + routers = [update.resource] if update.resource else [] - if not self._process_routers_if_compatible(routers, update): + not_delete_no_routers = (update.action != DELETE_ROUTER and + not routers) + related_action = update.action in (DELETE_RELATED_ROUTER, + ADD_UPDATE_RELATED_ROUTER) + if not_delete_no_routers or related_action: + try: + update.timestamp = timeutils.utcnow() + routers = self.plugin_rpc.get_routers(self.context, + [update.id]) + except Exception: + msg = "Failed to fetch router information for '%s'" + LOG.exception(msg, update.id) self._resync_router(update) - continue + return + + # For a related action, verify the router is still hosted here, + # since it could have just been deleted and we don't want to + # add it back. + if related_action: + routers = [r for r in routers if r['id'] == update.id] - rp.fetched_and_processed(update.timestamp) - LOG.info("Finished a router update for %s, update_id %s. " + if not routers: + removed = self._safe_router_removed(update.id) + if not removed: + self._resync_router(update) + else: + # need to update timestamp of removed router in case + # there are older events for the same router in the + # processing queue (like events from fullsync) in order to + # prevent deleted router re-creation + rp.fetched_and_processed(update.timestamp) + LOG.info("Finished a router delete for %s, update_id %s. 
" "Time elapsed: %.3f", update.id, update.update_id, update.time_elapsed_since_start) + return + + if not self._process_routers_if_compatible(routers, update): + self._resync_router(update) + return + + rp.fetched_and_processed(update.timestamp) + LOG.info("Finished a router update for %s, update_id %s. " + "Time elapsed: %.3f", + update.id, update.update_id, + update.time_elapsed_since_start) def _process_routers_if_compatible(self, routers, update): process_result = True @@ -722,7 +747,7 @@ class L3NATAgent(ha.AgentMixin, def _process_routers_loop(self): LOG.debug("Starting _process_routers_loop") while True: - self._pool.spawn_n(self._process_router_update) + self._pool.spawn_n(self._process_update) # NOTE(kevinbenton): this is set to 1 second because the actual interval # is controlled by a FixedIntervalLoopingCall in neutron/service.py that diff --git a/neutron/tests/functional/agent/l3/test_legacy_router.py b/neutron/tests/functional/agent/l3/test_legacy_router.py index bc696afe9a..27733b2d51 100644 --- a/neutron/tests/functional/agent/l3/test_legacy_router.py +++ b/neutron/tests/functional/agent/l3/test_legacy_router.py @@ -239,7 +239,7 @@ class L3AgentTestCase(framework.L3AgentTestFramework): # make sure all events are processed while not self.agent._queue._queue.empty(): - self.agent._process_router_update() + self.agent._process_update() for r in routers_to_keep: self.assertIn(r['id'], self.agent.router_info) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index f8162a181d..273062fdc6 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -2203,11 +2203,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): ri.process = mock.Mock() ri.initialize = mock.Mock(side_effect=RuntimeError()) agent._create_router = mock.Mock(return_value=ri) - agent._process_router_update() + agent._process_update() log_exception.assert_has_calls(calls) 
ri.initialize.side_effect = None - agent._process_router_update() + agent._process_update() self.assertTrue(ri.delete.called) self.assertEqual(2, ri.initialize.call_count) self.assertEqual(2, agent._create_router.call_count) @@ -2519,6 +2519,17 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): self.assertFalse(agent._queue.add.called) def test_network_update(self): + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + agent.router_info = { + _uuid(): mock.Mock(), + _uuid(): mock.Mock()} + network_id = _uuid() + agent._queue = mock.Mock() + network = {'id': network_id} + agent.network_update(None, network=network) + self.assertEqual(2, agent._queue.add.call_count) + + def test__process_network_update(self): router = l3_test_common.prepare_router_data(num_internal_ports=2) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) agent._process_added_router(router) @@ -2527,10 +2538,27 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): internal_ports = ri.router.get(lib_constants.INTERFACE_KEY, []) network_id = internal_ports[0]['network_id'] agent._queue = mock.Mock() - network = {'id': network_id} - agent.network_update(None, network=network) + agent._process_network_update(ri.router_id, network_id) self.assertEqual(1, agent._queue.add.call_count) + def test__process_network_update_no_router_info_found(self): + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + network_id = _uuid() + agent._queue = mock.Mock() + agent._process_network_update(_uuid(), network_id) + agent._queue.add.assert_not_called() + + def test__process_network_update_not_connected_to_router(self): + router = l3_test_common.prepare_router_data(num_internal_ports=2) + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + agent._process_added_router(router) + ri = l3router.RouterInfo(agent, router['id'], + router, **self.ri_kwargs) + network_id = _uuid() + agent._queue = mock.Mock() + agent._process_network_update(ri.router_id, network_id) + 
agent._queue.add.assert_not_called() + def test_create_router_namespace(self): self.mock_ip.ensure_namespace.return_value = self.mock_ip agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) @@ -2660,7 +2688,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): update.resource = None agent._queue.each_update_to_next_resource.side_effect = [ [(None, update)]] - agent._process_router_update() + agent._process_update() self.assertFalse(agent.fullsync) self.assertEqual(ext_net_call, agent._process_router_if_compatible.called) @@ -2687,13 +2715,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): resource=router, timestamp=timeutils.utcnow()) agent._queue.add(update) - agent._process_router_update() + agent._process_update() # The update contained the router object, get_routers won't be called self.assertFalse(agent.plugin_rpc.get_routers.called) # The update failed, assert that get_routers was called - agent._process_router_update() + agent._process_update() self.assertTrue(agent.plugin_rpc.get_routers.called) def test_process_routers_update_rpc_timeout_on_get_ext_net(self): @@ -2717,7 +2745,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): agent.plugin_rpc.get_routers.side_effect = ( Exception("Failed to get router info")) # start test - agent._process_router_update() + agent._process_update() router_info.delete.assert_not_called() self.assertFalse(router_info.delete.called) self.assertTrue(agent.router_info) @@ -2740,7 +2768,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): agent._safe_router_removed = mock.Mock() if error: agent._safe_router_removed.return_value = False - agent._process_router_update() + agent._process_update() if error: self.assertFalse(router_processor.fetched_and_processed.called) agent._resync_router.assert_called_with(update) |