diff options
Diffstat (limited to 'neutron/tests/unit/ml2/test_security_group.py')
-rw-r--r-- | neutron/tests/unit/ml2/test_security_group.py | 107 |
1 files changed, 82 insertions, 25 deletions
diff --git a/neutron/tests/unit/ml2/test_security_group.py b/neutron/tests/unit/ml2/test_security_group.py index 39c3cc2bae..cc8468ae23 100644 --- a/neutron/tests/unit/ml2/test_security_group.py +++ b/neutron/tests/unit/ml2/test_security_group.py @@ -14,11 +14,15 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib +import math import mock from neutron.api.v2 import attributes +from neutron.common import constants as const from neutron.extensions import securitygroup as ext_sg from neutron import manager +from neutron.tests.unit import test_api_v2 from neutron.tests.unit import test_extension_security_group as test_sg from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc @@ -55,38 +59,91 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase, plugin = manager.NeutronManager.get_plugin() plugin.start_rpc_listeners() - def test_security_group_get_port_from_device(self): + def _make_port_with_new_sec_group(self, net_id): + sg = self._make_security_group(self.fmt, 'name', 'desc') + port = self._make_port( + self.fmt, net_id, security_groups=[sg['security_group']['id']]) + return port['port'] + + def test_security_group_get_ports_from_devices(self): with self.network() as n: with self.subnet(n): - with self.security_group() as sg: - security_group_id = sg['security_group']['id'] - res = self._create_port(self.fmt, n['network']['id']) - port = self.deserialize(self.fmt, res) - fixed_ips = port['port']['fixed_ips'] - data = {'port': {'fixed_ips': fixed_ips, - 'name': port['port']['name'], - ext_sg.SECURITYGROUPS: - [security_group_id]}} - - req = self.new_update_request('ports', data, - port['port']['id']) - res = self.deserialize(self.fmt, - req.get_response(self.api)) - port_id = res['port']['id'] - plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.get_port_from_device(port_id) - self.assertEqual(port_id, port_dict['id']) - self.assertEqual([security_group_id], + port1 = self._make_port_with_new_sec_group(n['network']['id']) + port2 = self._make_port_with_new_sec_group(n['network']['id']) + plugin = manager.NeutronManager.get_plugin() + # should match full ID and starting chars + ports = plugin.get_ports_from_devices( + [port1['id'], port2['id'][0:8]]) + self.assertEqual(2, len(ports)) + for port_dict in ports: + p = port1 if port1['id'] == port_dict['id'] else port2 + self.assertEqual(p['id'], port_dict['id']) + self.assertEqual(p['security_groups'], port_dict[ext_sg.SECURITYGROUPS]) self.assertEqual([], port_dict['security_group_rules']) - self.assertEqual([fixed_ips[0]['ip_address']], + self.assertEqual([p['fixed_ips'][0]['ip_address']], port_dict['fixed_ips']) - self._delete('ports', port_id) + self._delete('ports', p['id']) + + def test_security_group_get_ports_from_devices_with_bad_id(self): + plugin = manager.NeutronManager.get_plugin() + ports = plugin.get_ports_from_devices(['bad_device_id']) + self.assertFalse(ports) - def test_security_group_get_port_from_device_with_no_port(self): + def test_security_group_no_db_calls_with_no_ports(self): + plugin = manager.NeutronManager.get_plugin() + with mock.patch( + 'neutron.plugins.ml2.db.get_sg_ids_grouped_by_port' + ) as get_mock: + self.assertFalse(plugin.get_ports_from_devices([])) + self.assertFalse(get_mock.called) + + def test_large_port_count_broken_into_parts(self): + plugin = manager.NeutronManager.get_plugin() + max_ports_per_query = 5 + ports_to_query = 73 + for max_ports_per_query in (1, 2, 5, 7, 9, 31): + with contextlib.nested( + mock.patch('neutron.plugins.ml2.db.MAX_PORTS_PER_QUERY', + new=max_ports_per_query), + mock.patch('neutron.plugins.ml2.db.get_sg_ids_grouped_by_port', + return_value={}), + ) as (max_mock, get_mock): + plugin.get_ports_from_devices( + ['%s%s' % (const.TAP_DEVICE_PREFIX, i) + for i in range(ports_to_query)]) + all_call_args = map(lambda x: x[1][0], get_mock.mock_calls) + last_call_args = all_call_args.pop() + # all but last should be getting MAX_PORTS_PER_QUERY ports + self.assertTrue( + all(map(lambda x: len(x) == max_ports_per_query, + all_call_args)) + ) + remaining = ports_to_query % max_ports_per_query + if remaining: + self.assertEqual(remaining, len(last_call_args)) + # should be broken into ceil(total/MAX_PORTS_PER_QUERY) calls + self.assertEqual( + math.ceil(ports_to_query / float(max_ports_per_query)), + get_mock.call_count + ) + + def test_full_uuids_skip_port_id_lookup(self): plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.get_port_from_device('bad_device_id') - self.assertIsNone(port_dict) + # when full UUIDs are provided, the _or statement should only + # have one matching 'IN' critiera for all of the IDs + with contextlib.nested( + mock.patch('neutron.plugins.ml2.db.or_'), + mock.patch('neutron.plugins.ml2.db.db_api.get_session') + ) as (or_mock, sess_mock): + fmock = sess_mock.query.return_value.outerjoin.return_value.filter + # return no ports to exit the method early since we are mocking + # the query + fmock.return_value.all.return_value = [] + plugin.get_ports_from_devices([test_api_v2._uuid(), + test_api_v2._uuid()]) + # the or_ function should only have one argument + or_mock.assert_called_once_with(mock.ANY) class TestMl2SecurityGroupsXML(TestMl2SecurityGroups): |