summaryrefslogtreecommitdiff
path: root/neutron/tests/unit/ml2/test_security_group.py
diff options
context:
space:
mode:
Diffstat (limited to 'neutron/tests/unit/ml2/test_security_group.py')
-rw-r--r--neutron/tests/unit/ml2/test_security_group.py107
1 files changed, 82 insertions, 25 deletions
diff --git a/neutron/tests/unit/ml2/test_security_group.py b/neutron/tests/unit/ml2/test_security_group.py
index 39c3cc2bae..cc8468ae23 100644
--- a/neutron/tests/unit/ml2/test_security_group.py
+++ b/neutron/tests/unit/ml2/test_security_group.py
@@ -14,11 +14,15 @@
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
+import math
import mock
from neutron.api.v2 import attributes
+from neutron.common import constants as const
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
+from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_extension_security_group as test_sg
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
@@ -55,38 +59,91 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
plugin = manager.NeutronManager.get_plugin()
plugin.start_rpc_listeners()
- def test_security_group_get_port_from_device(self):
+ def _make_port_with_new_sec_group(self, net_id):
+ sg = self._make_security_group(self.fmt, 'name', 'desc')
+ port = self._make_port(
+ self.fmt, net_id, security_groups=[sg['security_group']['id']])
+ return port['port']
+
+ def test_security_group_get_ports_from_devices(self):
with self.network() as n:
with self.subnet(n):
- with self.security_group() as sg:
- security_group_id = sg['security_group']['id']
- res = self._create_port(self.fmt, n['network']['id'])
- port = self.deserialize(self.fmt, res)
- fixed_ips = port['port']['fixed_ips']
- data = {'port': {'fixed_ips': fixed_ips,
- 'name': port['port']['name'],
- ext_sg.SECURITYGROUPS:
- [security_group_id]}}
-
- req = self.new_update_request('ports', data,
- port['port']['id'])
- res = self.deserialize(self.fmt,
- req.get_response(self.api))
- port_id = res['port']['id']
- plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.get_port_from_device(port_id)
- self.assertEqual(port_id, port_dict['id'])
- self.assertEqual([security_group_id],
+ port1 = self._make_port_with_new_sec_group(n['network']['id'])
+ port2 = self._make_port_with_new_sec_group(n['network']['id'])
+ plugin = manager.NeutronManager.get_plugin()
+ # should match full ID and starting chars
+ ports = plugin.get_ports_from_devices(
+ [port1['id'], port2['id'][0:8]])
+ self.assertEqual(2, len(ports))
+ for port_dict in ports:
+ p = port1 if port1['id'] == port_dict['id'] else port2
+ self.assertEqual(p['id'], port_dict['id'])
+ self.assertEqual(p['security_groups'],
port_dict[ext_sg.SECURITYGROUPS])
self.assertEqual([], port_dict['security_group_rules'])
- self.assertEqual([fixed_ips[0]['ip_address']],
+ self.assertEqual([p['fixed_ips'][0]['ip_address']],
port_dict['fixed_ips'])
- self._delete('ports', port_id)
+ self._delete('ports', p['id'])
+
+ def test_security_group_get_ports_from_devices_with_bad_id(self):
+ plugin = manager.NeutronManager.get_plugin()
+ ports = plugin.get_ports_from_devices(['bad_device_id'])
+ self.assertFalse(ports)
- def test_security_group_get_port_from_device_with_no_port(self):
+ def test_security_group_no_db_calls_with_no_ports(self):
+ plugin = manager.NeutronManager.get_plugin()
+ with mock.patch(
+ 'neutron.plugins.ml2.db.get_sg_ids_grouped_by_port'
+ ) as get_mock:
+ self.assertFalse(plugin.get_ports_from_devices([]))
+ self.assertFalse(get_mock.called)
+
+ def test_large_port_count_broken_into_parts(self):
+ plugin = manager.NeutronManager.get_plugin()
+ max_ports_per_query = 5
+ ports_to_query = 73
+ for max_ports_per_query in (1, 2, 5, 7, 9, 31):
+ with contextlib.nested(
+ mock.patch('neutron.plugins.ml2.db.MAX_PORTS_PER_QUERY',
+ new=max_ports_per_query),
+ mock.patch('neutron.plugins.ml2.db.get_sg_ids_grouped_by_port',
+ return_value={}),
+ ) as (max_mock, get_mock):
+ plugin.get_ports_from_devices(
+ ['%s%s' % (const.TAP_DEVICE_PREFIX, i)
+ for i in range(ports_to_query)])
+ all_call_args = map(lambda x: x[1][0], get_mock.mock_calls)
+ last_call_args = all_call_args.pop()
+ # all but last should be getting MAX_PORTS_PER_QUERY ports
+ self.assertTrue(
+ all(map(lambda x: len(x) == max_ports_per_query,
+ all_call_args))
+ )
+ remaining = ports_to_query % max_ports_per_query
+ if remaining:
+ self.assertEqual(remaining, len(last_call_args))
+ # should be broken into ceil(total/MAX_PORTS_PER_QUERY) calls
+ self.assertEqual(
+ math.ceil(ports_to_query / float(max_ports_per_query)),
+ get_mock.call_count
+ )
+
+ def test_full_uuids_skip_port_id_lookup(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.get_port_from_device('bad_device_id')
- self.assertIsNone(port_dict)
+ # when full UUIDs are provided, the _or statement should only
+ # have one matching 'IN' critiera for all of the IDs
+ with contextlib.nested(
+ mock.patch('neutron.plugins.ml2.db.or_'),
+ mock.patch('neutron.plugins.ml2.db.db_api.get_session')
+ ) as (or_mock, sess_mock):
+ fmock = sess_mock.query.return_value.outerjoin.return_value.filter
+ # return no ports to exit the method early since we are mocking
+ # the query
+ fmock.return_value.all.return_value = []
+ plugin.get_ports_from_devices([test_api_v2._uuid(),
+ test_api_v2._uuid()])
+ # the or_ function should only have one argument
+ or_mock.assert_called_once_with(mock.ANY)
class TestMl2SecurityGroupsXML(TestMl2SecurityGroups):