From 68cbf56f9ccf9ecd4a53df5c684c2036d78a8612 Mon Sep 17 00:00:00 2001 From: elajkat Date: Fri, 6 Jan 2023 13:10:16 +0100 Subject: Move network trunk commands from python-neutronclient The depends-on patch adds trunk commands to OSC, as we can long consider trunk operations as core Networking operations. Change-Id: Ie557a5d541cf117d20f3f2b548620a74dbadb383 Depends-On: https://review.opendev.org/c/openstack/python-openstackclient/+/869447 Related-Bug: #1999774 --- doc/source/cli/osc/v2/network-trunk.rst | 16 - neutronclient/osc/v2/trunk/__init__.py | 0 neutronclient/osc/v2/trunk/network_trunk.py | 393 ----------- neutronclient/tests/unit/osc/v2/trunk/__init__.py | 0 neutronclient/tests/unit/osc/v2/trunk/fakes.py | 87 --- .../tests/unit/osc/v2/trunk/test_network_trunk.py | 769 --------------------- setup.cfg | 7 - 7 files changed, 1272 deletions(-) delete mode 100644 doc/source/cli/osc/v2/network-trunk.rst delete mode 100644 neutronclient/osc/v2/trunk/__init__.py delete mode 100644 neutronclient/osc/v2/trunk/network_trunk.py delete mode 100644 neutronclient/tests/unit/osc/v2/trunk/__init__.py delete mode 100644 neutronclient/tests/unit/osc/v2/trunk/fakes.py delete mode 100644 neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py diff --git a/doc/source/cli/osc/v2/network-trunk.rst b/doc/source/cli/osc/v2/network-trunk.rst deleted file mode 100644 index 22144a4..0000000 --- a/doc/source/cli/osc/v2/network-trunk.rst +++ /dev/null @@ -1,16 +0,0 @@ -============= -network trunk -============= - -A **network trunk** is a container to group logical ports from different -networks and provide a single trunked vNIC for servers. It consists of -one parent port which is a regular VIF and multiple subports which allow -the server to connect to more networks. - -Network v2 - -.. autoprogram-cliff:: openstack.neutronclient.v2 - :command: network subport list - -.. autoprogram-cliff:: openstack.neutronclient.v2 - :command: network trunk * diff --git a/neutronclient/osc/v2/trunk/__init__.py b/neutronclient/osc/v2/trunk/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/neutronclient/osc/v2/trunk/network_trunk.py b/neutronclient/osc/v2/trunk/network_trunk.py deleted file mode 100644 index 26cb52b..0000000 --- a/neutronclient/osc/v2/trunk/network_trunk.py +++ /dev/null @@ -1,393 +0,0 @@ -# Copyright 2016 ZTE Corporation. -# All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -"""Network trunk and subports action implementations""" -import logging - -from osc_lib.cli import format_columns -from osc_lib.cli import parseractions -from osc_lib.command import command -from osc_lib import exceptions -from osc_lib import utils as osc_utils - -from neutronclient._i18n import _ -from neutronclient.osc import utils as nc_osc_utils -from neutronclient.osc.v2 import utils as v2_utils - -LOG = logging.getLogger(__name__) - -TRUNK = 'trunk' -TRUNKS = 'trunks' -SUB_PORTS = 'sub_ports' - - -class CreateNetworkTrunk(command.ShowOne): - """Create a network trunk for a given project""" - - def get_parser(self, prog_name): - parser = super(CreateNetworkTrunk, self).get_parser(prog_name) - parser.add_argument( - 'name', - metavar='', - help=_("Name of the trunk to create") - ) - parser.add_argument( - '--description', - metavar='', - help=_("A description of the trunk") - ) - parser.add_argument( - '--parent-port', - metavar='', - required=True, - help=_("Parent port belonging to this trunk (name or ID)") - ) - parser.add_argument( - '--subport', - metavar='', - action=parseractions.MultiKeyValueAction, dest='add_subports', - optional_keys=['segmentation-id', 'segmentation-type'], - required_keys=['port'], - help=_("Subport to add. Subport is of form " - "\'port=,segmentation-type=,segmentation-ID=\' " - "(--subport) option can be repeated") - ) - admin_group = parser.add_mutually_exclusive_group() - admin_group.add_argument( - '--enable', - action='store_true', - default=True, - help=_("Enable trunk (default)") - ) - admin_group.add_argument( - '--disable', - action='store_true', - help=_("Disable trunk") - ) - nc_osc_utils.add_project_owner_option_to_parser(parser) - return parser - - def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - attrs = _get_attrs_for_trunk(self.app.client_manager, - parsed_args) - body = {TRUNK: attrs} - obj = client.create_trunk(body) - columns = _get_columns(obj[TRUNK]) - data = osc_utils.get_dict_properties(obj[TRUNK], columns, - formatters=_formatters) - return columns, data - - -class DeleteNetworkTrunk(command.Command): - """Delete a given network trunk""" - - def get_parser(self, prog_name): - parser = super(DeleteNetworkTrunk, self).get_parser(prog_name) - parser.add_argument( - 'trunk', - metavar="", - nargs="+", - help=_("Trunk(s) to delete (name or ID)") - ) - return parser - - def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - result = 0 - for trunk in parsed_args.trunk: - try: - trunk_id = _get_id(client, trunk, TRUNK) - client.delete_trunk(trunk_id) - except Exception as e: - result += 1 - LOG.error(_("Failed to delete trunk with name " - "or ID '%(trunk)s': %(e)s"), - {'trunk': trunk, 'e': e}) - if result > 0: - total = len(parsed_args.trunk) - msg = (_("%(result)s of %(total)s trunks failed " - "to delete.") % {'result': result, 'total': total}) - raise exceptions.CommandError(msg) - - -class ListNetworkTrunk(command.Lister): - """List all network trunks""" - - def get_parser(self, prog_name): - parser = super(ListNetworkTrunk, self).get_parser(prog_name) - parser.add_argument( - '--long', - action='store_true', - default=False, - help=_("List additional fields in output") - ) - return parser - - def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - data = client.list_trunks() - headers = ( - 'ID', - 'Name', - 'Parent Port', - 'Description' - ) - columns = ( - 'id', - 'name', - 'port_id', - 'description' - ) - if parsed_args.long: - headers += ( - 'Status', - 'State', - 'Created At', - 'Updated At', - ) - columns += ( - 'status', - 'admin_state_up', - 'created_at', - 'updated_at' - ) - return (headers, - (osc_utils.get_dict_properties( - s, columns, - formatters=_formatters, - ) for s in data[TRUNKS])) - - -class SetNetworkTrunk(command.Command): - """Set network trunk properties""" - - def get_parser(self, prog_name): - parser = super(SetNetworkTrunk, self).get_parser(prog_name) - parser.add_argument( - 'trunk', - metavar="", - help=_("Trunk to modify (name or ID)") - ) - parser.add_argument( - '--name', - metavar="", - help=_("Set trunk name") - ) - parser.add_argument( - '--description', - metavar='', - help=_("A description of the trunk") - ) - parser.add_argument( - '--subport', - metavar='', - action=parseractions.MultiKeyValueAction, dest='set_subports', - optional_keys=['segmentation-id', 'segmentation-type'], - required_keys=['port'], - help=_("Subport to add. Subport is of form " - "\'port=,segmentation-type=,segmentation-ID=\'" - "(--subport) option can be repeated") - ) - admin_group = parser.add_mutually_exclusive_group() - admin_group.add_argument( - '--enable', - action='store_true', - help=_("Enable trunk") - ) - admin_group.add_argument( - '--disable', - action='store_true', - help=_("Disable trunk") - ) - return parser - - def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - trunk_id = _get_id(client, parsed_args.trunk, TRUNK) - attrs = _get_attrs_for_trunk(self.app.client_manager, parsed_args) - body = {TRUNK: attrs} - try: - client.update_trunk(trunk_id, body) - except Exception as e: - msg = (_("Failed to set trunk '%(t)s': %(e)s") - % {'t': parsed_args.trunk, 'e': e}) - raise exceptions.CommandError(msg) - if parsed_args.set_subports: - subport_attrs = _get_attrs_for_subports(self.app.client_manager, - parsed_args) - try: - client.trunk_add_subports(trunk_id, subport_attrs) - except Exception as e: - msg = (_("Failed to add subports to trunk '%(t)s': %(e)s") - % {'t': parsed_args.trunk, 'e': e}) - raise exceptions.CommandError(msg) - - -class ShowNetworkTrunk(command.ShowOne): - """Show information of a given network trunk""" - def get_parser(self, prog_name): - parser = super(ShowNetworkTrunk, self).get_parser(prog_name) - parser.add_argument( - 'trunk', - metavar="", - help=_("Trunk to display (name or ID)") - ) - return parser - - def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - trunk_id = _get_id(client, parsed_args.trunk, TRUNK) - obj = client.show_trunk(trunk_id) - columns = _get_columns(obj[TRUNK]) - data = osc_utils.get_dict_properties(obj[TRUNK], columns, - formatters=_formatters) - return columns, data - - -class ListNetworkSubport(command.Lister): - """List all subports for a given network trunk""" - - def get_parser(self, prog_name): - parser = super(ListNetworkSubport, self).get_parser(prog_name) - parser.add_argument( - '--trunk', - required=True, - metavar="", - help=_("List subports belonging to this trunk (name or ID)") - ) - return parser - - def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - trunk_id = _get_id(client, parsed_args.trunk, TRUNK) - data = client.trunk_get_subports(trunk_id) - headers = ('Port', 'Segmentation Type', 'Segmentation ID') - columns = ('port_id', 'segmentation_type', 'segmentation_id') - return (headers, - (osc_utils.get_dict_properties( - s, columns, - ) for s in data[SUB_PORTS])) - - -class UnsetNetworkTrunk(command.Command): - """Unset subports from a given network trunk""" - - def get_parser(self, prog_name): - parser = super(UnsetNetworkTrunk, self).get_parser(prog_name) - parser.add_argument( - 'trunk', - metavar="", - help=_("Unset subports from this trunk (name or ID)") - ) - parser.add_argument( - '--subport', - metavar="", - required=True, - action='append', dest='unset_subports', - help=_("Subport to delete (name or ID of the port) " - "(--subport) option can be repeated") - ) - return parser - - def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - attrs = _get_attrs_for_subports(self.app.client_manager, parsed_args) - trunk_id = _get_id(client, parsed_args.trunk, TRUNK) - client.trunk_remove_subports(trunk_id, attrs) - - -_formatters = { - 'admin_state_up': v2_utils.AdminStateColumn, - 'sub_ports': format_columns.ListDictColumn, -} - - -def _get_columns(item): - return tuple(sorted(list(item.keys()))) - - -def _get_attrs_for_trunk(client_manager, parsed_args): - attrs = {} - if parsed_args.name is not None: - attrs['name'] = str(parsed_args.name) - if parsed_args.description is not None: - attrs['description'] = str(parsed_args.description) - if parsed_args.enable: - attrs['admin_state_up'] = True - if parsed_args.disable: - attrs['admin_state_up'] = False - if 'parent_port' in parsed_args and parsed_args.parent_port is not None: - port_id = _get_id(client_manager.neutronclient, - parsed_args.parent_port, 'port') - attrs['port_id'] = port_id - if 'add_subports' in parsed_args and parsed_args.add_subports is not None: - attrs[SUB_PORTS] = _format_subports(client_manager, - parsed_args.add_subports) - - # "trunk set" command doesn't support setting project. - if 'project' in parsed_args and parsed_args.project is not None: - identity_client = client_manager.identity - project_id = nc_osc_utils.find_project( - identity_client, - parsed_args.project, - parsed_args.project_domain, - ).id - attrs['tenant_id'] = project_id - - return attrs - - -def _format_subports(client_manager, subports): - attrs = [] - for subport in subports: - subport_attrs = {} - if subport.get('port'): - port_id = _get_id(client_manager.neutronclient, - subport['port'], 'port') - subport_attrs['port_id'] = port_id - if subport.get('segmentation-id'): - try: - subport_attrs['segmentation_id'] = int( - subport['segmentation-id']) - except ValueError: - msg = (_("Segmentation-id '%s' is not an integer") % - subport['segmentation-id']) - raise exceptions.CommandError(msg) - if subport.get('segmentation-type'): - subport_attrs['segmentation_type'] = subport['segmentation-type'] - attrs.append(subport_attrs) - return attrs - - -def _get_attrs_for_subports(client_manager, parsed_args): - attrs = {} - if 'set_subports' in parsed_args and parsed_args.set_subports is not None: - attrs[SUB_PORTS] = _format_subports(client_manager, - parsed_args.set_subports) - if ('unset_subports' in parsed_args and - parsed_args.unset_subports is not None): - subports_list = [] - for subport in parsed_args.unset_subports: - port_id = _get_id(client_manager.neutronclient, - subport, 'port') - subports_list.append({'port_id': port_id}) - attrs[SUB_PORTS] = subports_list - return attrs - - -def _get_id(client, id_or_name, resource): - return client.find_resource(resource, str(id_or_name))['id'] diff --git a/neutronclient/tests/unit/osc/v2/trunk/__init__.py b/neutronclient/tests/unit/osc/v2/trunk/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/neutronclient/tests/unit/osc/v2/trunk/fakes.py b/neutronclient/tests/unit/osc/v2/trunk/fakes.py deleted file mode 100644 index a52aa30..0000000 --- a/neutronclient/tests/unit/osc/v2/trunk/fakes.py +++ /dev/null @@ -1,87 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -from unittest import mock - -from oslo_utils import uuidutils - - -class FakeTrunk(object): - """Fake one or more trunks.""" - @staticmethod - def create_one_trunk(attrs=None): - """Create a fake trunk. - - :param Dictionary attrs: - A dictionary with all attributes - :return: - A Dictionary with id, name, description, admin_state_up, port_id, - sub_ports, status and project_id - """ - attrs = attrs or {} - - # Set default attributes. - trunk_attrs = { - 'id': 'trunk-id-' + uuidutils.generate_uuid(dashed=False), - 'name': 'trunk-name-' + uuidutils.generate_uuid(dashed=False), - 'description': '', - 'port_id': 'port-' + uuidutils.generate_uuid(dashed=False), - 'admin_state_up': True, - 'project_id': 'project-id-' + - uuidutils.generate_uuid(dashed=False), - 'status': 'ACTIVE', - 'sub_ports': [{'port_id': 'subport-' + - uuidutils.generate_uuid(dashed=False), - 'segmentation_type': 'vlan', - 'segmentation_id': 100}], - } - - # Overwrite default attributes. - trunk_attrs.update(attrs) - return copy.deepcopy(trunk_attrs) - - @staticmethod - def create_trunks(attrs=None, count=2): - """Create multiple fake trunks. - - :param Dictionary attrs: - A dictionary with all attributes - :param int count: - The number of routers to fake - :return: - A list of dictionaries faking the trunks - """ - trunks = [] - for i in range(0, count): - trunks.append(FakeTrunk.create_one_trunk(attrs)) - - return trunks - - @staticmethod - def get_trunks(trunks=None, count=2): - """Get an iterable Mock object with a list of faked trunks. - - If trunks list is provided, then initialize the Mock object with the - list. Otherwise create one. - - :param List trunks: - A list of FakeResource objects faking trunks - :param int count: - The number of trunks to fake - :return: - An iterable Mock object with side_effect set to a list of faked - trunks - """ - if trunks is None: - trunks = FakeTrunk.create_trunks(count) - return mock.Mock(side_effect=trunks) diff --git a/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py b/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py deleted file mode 100644 index 1e16c1f..0000000 --- a/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py +++ /dev/null @@ -1,769 +0,0 @@ -# Copyright 2016 ZTE Corporation. -# All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import copy -from unittest import mock -from unittest.mock import call - -from osc_lib.cli import format_columns -from osc_lib import exceptions -from osc_lib.tests import utils as tests_utils -import testtools - -from neutronclient.osc.v2.trunk import network_trunk as trunk -from neutronclient.osc.v2 import utils as v2_utils -from neutronclient.tests.unit.osc.v2 import fakes as test_fakes -from neutronclient.tests.unit.osc.v2.trunk import fakes - - -def _get_id(client, id_or_name, resource): - return id_or_name - - -class TestCreateNetworkTrunk(test_fakes.TestNeutronClientOSCV2): - # The new trunk created - _trunk = fakes.FakeTrunk.create_one_trunk() - - columns = ( - 'admin_state_up', - 'description', - 'id', - 'name', - 'port_id', - 'project_id', - 'status', - 'sub_ports', - ) - - def get_data(self): - return ( - v2_utils.AdminStateColumn(self._trunk['admin_state_up']), - self._trunk['description'], - self._trunk['id'], - self._trunk['name'], - self._trunk['port_id'], - self._trunk['project_id'], - self._trunk['status'], - format_columns.ListDictColumn(self._trunk['sub_ports']), - ) - - def setUp(self): - super(TestCreateNetworkTrunk, self).setUp() - mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', - new=_get_id).start() - self.neutronclient.create_trunk = mock.Mock( - return_value={trunk.TRUNK: self._trunk}) - self.data = self.get_data() - - # Get the command object to test - self.cmd = trunk.CreateNetworkTrunk(self.app, self.namespace) - - def test_create_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_create_default_options(self): - arglist = [ - "--parent-port", self._trunk['port_id'], - self._trunk['name'], - ] - verifylist = [ - ('parent_port', self._trunk['port_id']), - ('name', self._trunk['name']), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.neutronclient.create_trunk.assert_called_once_with({ - trunk.TRUNK: {'name': self._trunk['name'], - 'admin_state_up': self._trunk['admin_state_up'], - 'port_id': self._trunk['port_id']} - }) - self.assertEqual(self.columns, columns) - self.assertItemEqual(self.data, data) - - def test_create_full_options(self): - self._trunk['description'] = 'foo description' - self.data = self.get_data() - subport = self._trunk['sub_ports'][0] - arglist = [ - "--disable", - "--description", self._trunk['description'], - "--parent-port", self._trunk['port_id'], - "--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,' - 'segmentation-id=%(seg_id)s' % { - 'seg_id': subport['segmentation_id'], - 'seg_type': subport['segmentation_type'], - 'port': subport['port_id']}, - self._trunk['name'], - ] - verifylist = [ - ('name', self._trunk['name']), - ('description', self._trunk['description']), - ('parent_port', self._trunk['port_id']), - ('add_subports', [{ - 'port': subport['port_id'], - 'segmentation-id': str(subport['segmentation_id']), - 'segmentation-type': subport['segmentation_type']}]), - ('disable', True), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = (self.cmd.take_action(parsed_args)) - - self.neutronclient.create_trunk.assert_called_once_with({ - trunk.TRUNK: {'name': self._trunk['name'], - 'description': self._trunk['description'], - 'admin_state_up': False, - 'sub_ports': [subport], - 'port_id': self._trunk['port_id']} - }) - self.assertEqual(self.columns, columns) - self.assertItemEqual(self.data, data) - - def test_create_trunk_with_subport_invalid_segmentation_id_fail(self): - subport = self._trunk['sub_ports'][0] - arglist = [ - "--parent-port", self._trunk['port_id'], - "--subport", "port=%(port)s,segmentation-type=%(seg_type)s," - "segmentation-id=boom" % { - 'seg_type': subport['segmentation_type'], - 'port': subport['port_id']}, - self._trunk['name'], - ] - verifylist = [ - ('name', self._trunk['name']), - ('parent_port', self._trunk['port_id']), - ('add_subports', [{ - 'port': subport['port_id'], - 'segmentation-id': 'boom', - 'segmentation-type': subport['segmentation_type']}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - with testtools.ExpectedException(exceptions.CommandError) as e: - self.cmd.take_action(parsed_args) - self.assertEqual("Segmentation-id 'boom' is not an integer", - str(e)) - - def test_create_network_trunk_subports_without_optional_keys(self): - subport = copy.copy(self._trunk['sub_ports'][0]) - # Pop out the segmentation-id and segmentation-type - subport.pop('segmentation_type') - subport.pop('segmentation_id') - arglist = [ - '--parent-port', self._trunk['port_id'], - '--subport', 'port=%(port)s' % {'port': subport['port_id']}, - self._trunk['name'], - ] - verifylist = [ - ('name', self._trunk['name']), - ('parent_port', self._trunk['port_id']), - ('add_subports', [{ - 'port': subport['port_id']}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - columns, data = (self.cmd.take_action(parsed_args)) - - self.neutronclient.create_trunk.assert_called_once_with({ - trunk.TRUNK: {'name': self._trunk['name'], - 'admin_state_up': True, - 'sub_ports': [subport], - 'port_id': self._trunk['port_id']} - }) - self.assertEqual(self.columns, columns) - self.assertItemEqual(self.data, data) - - def test_create_network_trunk_subports_without_required_key_fail(self): - subport = self._trunk['sub_ports'][0] - arglist = [ - '--parent-port', self._trunk['port_id'], - '--subport', 'segmentation-type=%(seg_type)s,' - 'segmentation-id=%(seg_id)s' % { - 'seg_id': subport['segmentation_id'], - 'seg_type': subport['segmentation_type']}, - self._trunk['name'], - ] - verifylist = [ - ('name', self._trunk['name']), - ('parent_port', self._trunk['port_id']), - ('add_subports', [{ - 'segmentation-id': str(subport['segmentation_id']), - 'segmentation-type': subport['segmentation_type']}]), - ] - - with testtools.ExpectedException(argparse.ArgumentTypeError): - self.check_parser(self.cmd, arglist, verifylist) - - -class TestDeleteNetworkTrunk(test_fakes.TestNeutronClientOSCV2): - # The trunk to be deleted. - _trunks = fakes.FakeTrunk.create_trunks(count=2) - - def setUp(self): - super(TestDeleteNetworkTrunk, self).setUp() - - mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', - new=_get_id).start() - self.neutronclient.delete_trunk = mock.Mock(return_value=None) - - # Get the command object to test - self.cmd = trunk.DeleteNetworkTrunk(self.app, self.namespace) - - def test_delete_trunk(self): - arglist = [ - self._trunks[0]['name'], - ] - verifylist = [ - ('trunk', [self._trunks[0]['name']]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - self.neutronclient.delete_trunk.assert_called_once_with( - self._trunks[0]['name']) - self.assertIsNone(result) - - def test_delete_trunk_multiple(self): - arglist = [] - verifylist = [] - - for t in self._trunks: - arglist.append(t['name']) - verifylist = [ - ('trunk', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for t in self._trunks: - calls.append(call(t['name'])) - self.neutronclient.delete_trunk.assert_has_calls(calls) - self.assertIsNone(result) - - def test_delete_trunk_multiple_with_exception(self): - arglist = [ - self._trunks[0]['name'], - 'unexist_trunk', - ] - verifylist = [ - ('trunk', - [self._trunks[0]['name'], 'unexist_trunk']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - get_mock_result = [self._trunks[0], exceptions.CommandError] - trunk._get_id = ( - mock.Mock(side_effect=get_mock_result) - ) - with testtools.ExpectedException(exceptions.CommandError) as e: - self.cmd.take_action(parsed_args) - self.assertEqual('1 of 2 trunks failed to delete.', str(e)) - self.neutronclient.delete_trunk.assert_called_once_with( - self._trunks[0] - ) - - -class TestShowNetworkTrunk(test_fakes.TestNeutronClientOSCV2): - - # The trunk to set. - _trunk = fakes.FakeTrunk.create_one_trunk() - - columns = ( - 'admin_state_up', - 'description', - 'id', - 'name', - 'port_id', - 'project_id', - 'status', - 'sub_ports', - ) - data = ( - v2_utils.AdminStateColumn(_trunk['admin_state_up']), - _trunk['description'], - _trunk['id'], - _trunk['name'], - _trunk['port_id'], - _trunk['project_id'], - _trunk['status'], - format_columns.ListDictColumn(_trunk['sub_ports']), - ) - - def setUp(self): - super(TestShowNetworkTrunk, self).setUp() - - mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', - new=_get_id).start() - self.neutronclient.show_trunk = mock.Mock( - return_value={trunk.TRUNK: self._trunk}) - - # Get the command object to test - self.cmd = trunk.ShowNetworkTrunk(self.app, self.namespace) - - def test_show_no_options(self): - arglist = [] - verifylist = [] - - self.assertRaises(tests_utils.ParserException, self.check_parser, - self.cmd, arglist, verifylist) - - def test_show_all_options(self): - arglist = [ - self._trunk['id'], - ] - verifylist = [ - ('trunk', self._trunk['id']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.neutronclient.show_trunk.assert_called_once_with( - self._trunk['id']) - self.assertEqual(self.columns, columns) - self.assertItemEqual(self.data, data) - - -class TestListNetworkTrunk(test_fakes.TestNeutronClientOSCV2): - # Create trunks to be listed. - _trunks = fakes.FakeTrunk.create_trunks( - {'created_at': '2001-01-01 00:00:00', - 'updated_at': '2001-01-01 00:00:00'}, count=3) - - columns = ( - 'ID', - 'Name', - 'Parent Port', - 'Description' - ) - columns_long = columns + ( - 'Status', - 'State', - 'Created At', - 'Updated At' - ) - data = [] - for t in _trunks: - data.append(( - t['id'], - t['name'], - t['port_id'], - t['description'] - )) - data_long = [] - for t in _trunks: - data_long.append(( - t['id'], - t['name'], - t['port_id'], - t['description'], - t['status'], - v2_utils.AdminStateColumn(t['admin_state_up']), - '2001-01-01 00:00:00', - '2001-01-01 00:00:00', - )) - - def setUp(self): - super(TestListNetworkTrunk, self).setUp() - mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', - new=_get_id).start() - self.neutronclient.list_trunks = mock.Mock( - return_value={trunk.TRUNKS: self._trunks}) - - # Get the command object to test - self.cmd = trunk.ListNetworkTrunk(self.app, self.namespace) - - def test_trunk_list_no_option(self): - arglist = [] - verifylist = [] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.neutronclient.list_trunks.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertListItemEqual(self.data, list(data)) - - def test_trunk_list_long(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.neutronclient.list_trunks.assert_called_once_with() - self.assertEqual(self.columns_long, columns) - self.assertListItemEqual(self.data_long, list(data)) - - -class TestSetNetworkTrunk(test_fakes.TestNeutronClientOSCV2): - # Create trunks to be listed. - _trunk = fakes.FakeTrunk.create_one_trunk() - - columns = ( - 'admin_state_up', - 'id', - 'name', - 'description', - 'port_id', - 'project_id', - 'status', - 'sub_ports', - ) - data = ( - v2_utils.AdminStateColumn(_trunk['admin_state_up']), - _trunk['id'], - _trunk['name'], - _trunk['description'], - _trunk['port_id'], - _trunk['project_id'], - _trunk['status'], - format_columns.ListDictColumn(_trunk['sub_ports']), - ) - - def setUp(self): - super(TestSetNetworkTrunk, self).setUp() - mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', - new=_get_id).start() - self.neutronclient.update_trunk = mock.Mock( - return_value={trunk.TRUNK: self._trunk}) - self.neutronclient.trunk_add_subports = mock.Mock( - return_value=self._trunk) - - # Get the command object to test - self.cmd = trunk.SetNetworkTrunk(self.app, self.namespace) - - def _test_set_network_trunk_attr(self, attr, value): - arglist = [ - '--%s' % attr, value, - self._trunk[attr], - ] - verifylist = [ - (attr, value), - ('trunk', self._trunk[attr]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - attr: value, - } - self.neutronclient.update_trunk.assert_called_once_with( - self._trunk[attr], {trunk.TRUNK: attrs}) - self.assertIsNone(result) - - def test_set_network_trunk_name(self): - self._test_set_network_trunk_attr('name', 'trunky') - - def test_test_set_network_trunk_description(self): - self._test_set_network_trunk_attr('description', 'description') - - def test_set_network_trunk_admin_state_up_disable(self): - arglist = [ - '--disable', - self._trunk['name'], - ] - verifylist = [ - ('disable', True), - ('trunk', self._trunk['name']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': False, - } - self.neutronclient.update_trunk.assert_called_once_with( - self._trunk['name'], {trunk.TRUNK: attrs}) - self.assertIsNone(result) - - def test_set_network_trunk_admin_state_up_enable(self): - arglist = [ - '--enable', - self._trunk['name'], - ] - verifylist = [ - ('enable', True), - ('trunk', self._trunk['name']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - attrs = { - 'admin_state_up': True, - } - self.neutronclient.update_trunk.assert_called_once_with( - self._trunk['name'], {trunk.TRUNK: attrs}) - self.assertIsNone(result) - - def test_set_network_trunk_nothing(self): - arglist = [self._trunk['name'], ] - verifylist = [('trunk', self._trunk['name']), ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - attrs = {} - self.neutronclient.update_trunk.assert_called_once_with( - self._trunk['name'], {trunk.TRUNK: attrs}) - self.assertIsNone(result) - - def test_set_network_trunk_subports(self): - subport = self._trunk['sub_ports'][0] - arglist = [ - '--subport', 'port=%(port)s,segmentation-type=%(seg_type)s,' - 'segmentation-id=%(seg_id)s' % { - 'seg_id': subport['segmentation_id'], - 'seg_type': subport['segmentation_type'], - 'port': subport['port_id']}, - self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ('set_subports', [{ - 'port': subport['port_id'], - 'segmentation-id': str(subport['segmentation_id']), - 'segmentation-type': subport['segmentation_type']}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.neutronclient.trunk_add_subports.assert_called_once_with( - self._trunk['name'], {'sub_ports': [subport]} - ) - self.assertIsNone(result) - - def test_set_network_trunk_subports_without_optional_keys(self): - subport = copy.copy(self._trunk['sub_ports'][0]) - # Pop out the segmentation-id and segmentation-type - subport.pop('segmentation_type') - subport.pop('segmentation_id') - arglist = [ - '--subport', 'port=%(port)s' % {'port': subport['port_id']}, - self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ('set_subports', [{ - 'port': subport['port_id']}]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.neutronclient.trunk_add_subports.assert_called_once_with( - self._trunk['name'], {'sub_ports': [subport]} - ) - self.assertIsNone(result) - - def test_set_network_trunk_subports_without_required_key_fail(self): - subport = self._trunk['sub_ports'][0] - arglist = [ - '--subport', 'segmentation-type=%(seg_type)s,' - 'segmentation-id=%(seg_id)s' % { - 'seg_id': subport['segmentation_id'], - 'seg_type': subport['segmentation_type']}, - self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ('set_subports', [{ - 'segmentation-id': str(subport['segmentation_id']), - 'segmentation-type': subport['segmentation_type']}]), - ] - - with testtools.ExpectedException(argparse.ArgumentTypeError): - self.check_parser(self.cmd, arglist, verifylist) - - self.neutronclient.trunk_add_subports.assert_not_called() - - def test_set_trunk_attrs_with_exception(self): - arglist = [ - '--name', 'reallylongname', - self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ('name', 'reallylongname'), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - self.neutronclient.update_trunk = ( - mock.Mock(side_effect=exceptions.CommandError) - ) - with testtools.ExpectedException(exceptions.CommandError) as e: - self.cmd.take_action(parsed_args) - self.assertEqual( - "Failed to set trunk '%s': " % self._trunk['name'], - str(e)) - attrs = {'name': 'reallylongname'} - self.neutronclient.update_trunk.assert_called_once_with( - self._trunk['name'], {trunk.TRUNK: attrs}) - self.neutronclient.trunk_add_subports.assert_not_called() - - def test_set_trunk_add_subport_with_exception(self): - arglist = [ - '--subport', 'port=invalid_subport', - self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ('set_subports', [{'port': 'invalid_subport'}]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - self.neutronclient.trunk_add_subports = ( - mock.Mock(side_effect=exceptions.CommandError) - ) - with testtools.ExpectedException(exceptions.CommandError) as e: - self.cmd.take_action(parsed_args) - self.assertEqual( - "Failed to add subports to trunk '%s': " % self._trunk['name'], - str(e)) - self.neutronclient.update_trunk.assert_called_once_with( - self._trunk['name'], {trunk.TRUNK: {}}) - self.neutronclient.trunk_add_subports.assert_called_once_with( - self._trunk['name'], - {'sub_ports': [{'port_id': 'invalid_subport'}]} - ) - - -class TestListNetworkSubport(test_fakes.TestNeutronClientOSCV2): - - _trunk = fakes.FakeTrunk.create_one_trunk() - _subports = _trunk['sub_ports'] - - columns = ( - 'Port', - 'Segmentation Type', - 'Segmentation ID', - ) - data = [] - for s in _subports: - data.append(( - s['port_id'], - s['segmentation_type'], - s['segmentation_id'], - )) - - def setUp(self): - super(TestListNetworkSubport, self).setUp() - mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', - new=_get_id).start() - self.neutronclient.trunk_get_subports = mock.Mock( - return_value={trunk.SUB_PORTS: self._subports}) - - # Get the command object to test - self.cmd = trunk.ListNetworkSubport(self.app, self.namespace) - - def test_subport_list(self): - arglist = [ - '--trunk', self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.neutronclient.trunk_get_subports.assert_called_once_with( - self._trunk['name']) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - -class TestUnsetNetworkTrunk(test_fakes.TestNeutronClientOSCV2): - - _trunk = fakes.FakeTrunk.create_one_trunk() - - columns = ( - 'admin_state_up', - 'id', - 'name', - 'port_id', - 'project_id', - 'status', - 'sub_ports', - ) - data = ( - v2_utils.AdminStateColumn(_trunk['admin_state_up']), - _trunk['id'], - _trunk['name'], - _trunk['port_id'], - _trunk['project_id'], - _trunk['status'], - format_columns.ListDictColumn(_trunk['sub_ports']), - ) - - def setUp(self): - super(TestUnsetNetworkTrunk, self).setUp() - - mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', - new=_get_id).start() - self.neutronclient.trunk_remove_subports = mock.Mock( - return_value=None) - - # Get the command object to test - self.cmd = trunk.UnsetNetworkTrunk(self.app, self.namespace) - - def test_unset_network_trunk_subport(self): - subport = self._trunk['sub_ports'][0] - arglist = [ - "--subport", subport['port_id'], - self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ('unset_subports', [subport['port_id']]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.neutronclient.trunk_remove_subports.assert_called_once_with( - self._trunk['name'], - {trunk.SUB_PORTS: [{'port_id': subport['port_id']}]} - ) - self.assertIsNone(result) - - def test_unset_subport_no_arguments_fail(self): - arglist = [ - self._trunk['name'], - ] - verifylist = [ - ('trunk', self._trunk['name']), - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, verifylist) diff --git a/setup.cfg b/setup.cfg index 5c62570..432ec68 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,13 +33,6 @@ openstack.cli.extension = neutronclient = neutronclient.osc.plugin openstack.neutronclient.v2 = - network_subport_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkSubport - network_trunk_create = neutronclient.osc.v2.trunk.network_trunk:CreateNetworkTrunk - network_trunk_delete = neutronclient.osc.v2.trunk.network_trunk:DeleteNetworkTrunk - network_trunk_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkTrunk - network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk - network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk - network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk sfc_flow_classifier_create = neutronclient.osc.v2.sfc.sfc_flow_classifier:CreateSfcFlowClassifier sfc_flow_classifier_delete = neutronclient.osc.v2.sfc.sfc_flow_classifier:DeleteSfcFlowClassifier sfc_flow_classifier_list = neutronclient.osc.v2.sfc.sfc_flow_classifier:ListSfcFlowClassifier -- cgit v1.2.1