summaryrefslogtreecommitdiff
path: root/neutron/tests/unit/test_security_groups_rpc.py
diff options
context:
space:
mode:
Diffstat (limited to 'neutron/tests/unit/test_security_groups_rpc.py')
-rw-r--r--neutron/tests/unit/test_security_groups_rpc.py45
1 files changed, 45 insertions, 0 deletions
diff --git a/neutron/tests/unit/test_security_groups_rpc.py b/neutron/tests/unit/test_security_groups_rpc.py
index 7f20c7d9ea..3acc378b1d 100644
--- a/neutron/tests/unit/test_security_groups_rpc.py
+++ b/neutron/tests/unit/test_security_groups_rpc.py
@@ -1021,6 +1021,7 @@ class BaseSecurityGroupAgentRpcTestCase(base.BaseTestCase):
self.agent.root_helper = 'sudo'
self.agent.plugin_rpc = mock.Mock()
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
+ self.default_firewall = self.agent.firewall
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
@@ -1057,6 +1058,26 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
self.fake_device),
])
+ def test_prepare_devices_filter_with_noopfirewall(self):
+ self.agent.firewall = self.default_firewall
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.prepare_devices_filter(['fake_device'])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+
+ def test_prepare_devices_filter_with_firewall_disabled(self):
+ cfg.CONF.set_override('enable_security_group', False, 'SECURITYGROUP')
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.prepare_devices_filter(['fake_device'])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+
def test_security_groups_rule_updated(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
@@ -1111,6 +1132,30 @@ class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
self.agent.refresh_firewall([])
self.assertFalse(self.firewall.called)
+ def test_refresh_firewall_with_firewall_disabled(self):
+ cfg.CONF.set_override('enable_security_group', False, 'SECURITYGROUP')
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.firewall.defer_apply = mock.Mock()
+ self.agent.refresh_firewall([self.fake_device])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+ self.assertFalse(self.agent.firewall.defer_apply.called)
+
+ def test_refresh_firewall_with_noopfirewall(self):
+ self.agent.firewall = self.default_firewall
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.firewall.defer_apply = mock.Mock()
+ self.agent.refresh_firewall([self.fake_device])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+ self.assertFalse(self.agent.firewall.defer_apply.called)
+
class SecurityGroupAgentEnhancedRpcTestCase(
BaseSecurityGroupAgentRpcTestCase):