summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorelajkat <lajos.katona@est.tech>2023-01-06 13:10:16 +0100
committerelajkat <lajos.katona@est.tech>2023-01-16 14:18:27 +0100
commit68cbf56f9ccf9ecd4a53df5c684c2036d78a8612 (patch)
tree193427a20a8a08189139f0a793b30c855e5eaf3a
parent33f1c89a840262ad378bf82fde5b42a07ad271db (diff)
downloadpython-neutronclient-68cbf56f9ccf9ecd4a53df5c684c2036d78a8612.tar.gz
Move network trunk commands from python-neutronclient
The depends-on patch adds trunk commands to OSC, as we can long consider trunk operations as core Networking operations. Change-Id: Ie557a5d541cf117d20f3f2b548620a74dbadb383 Depends-On: https://review.opendev.org/c/openstack/python-openstackclient/+/869447 Related-Bug: #1999774
-rw-r--r--doc/source/cli/osc/v2/network-trunk.rst16
-rw-r--r--neutronclient/osc/v2/trunk/__init__.py0
-rw-r--r--neutronclient/osc/v2/trunk/network_trunk.py393
-rw-r--r--neutronclient/tests/unit/osc/v2/trunk/__init__.py0
-rw-r--r--neutronclient/tests/unit/osc/v2/trunk/fakes.py87
-rw-r--r--neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py769
-rw-r--r--setup.cfg7
7 files changed, 0 insertions, 1272 deletions
diff --git a/doc/source/cli/osc/v2/network-trunk.rst b/doc/source/cli/osc/v2/network-trunk.rst
deleted file mode 100644
index 22144a4..0000000
--- a/doc/source/cli/osc/v2/network-trunk.rst
+++ /dev/null
@@ -1,16 +0,0 @@
-=============
-network trunk
-=============
-
-A **network trunk** is a container to group logical ports from different
-networks and provide a single trunked vNIC for servers. It consists of
-one parent port which is a regular VIF and multiple subports which allow
-the server to connect to more networks.
-
-Network v2
-
-.. autoprogram-cliff:: openstack.neutronclient.v2
- :command: network subport list
-
-.. autoprogram-cliff:: openstack.neutronclient.v2
- :command: network trunk *
diff --git a/neutronclient/osc/v2/trunk/__init__.py b/neutronclient/osc/v2/trunk/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutronclient/osc/v2/trunk/__init__.py
+++ /dev/null
diff --git a/neutronclient/osc/v2/trunk/network_trunk.py b/neutronclient/osc/v2/trunk/network_trunk.py
deleted file mode 100644
index 26cb52b..0000000
--- a/neutronclient/osc/v2/trunk/network_trunk.py
+++ /dev/null
@@ -1,393 +0,0 @@
-# Copyright 2016 ZTE Corporation.
-# All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-"""Network trunk and subports action implementations"""
-import logging
-
-from osc_lib.cli import format_columns
-from osc_lib.cli import parseractions
-from osc_lib.command import command
-from osc_lib import exceptions
-from osc_lib import utils as osc_utils
-
-from neutronclient._i18n import _
-from neutronclient.osc import utils as nc_osc_utils
-from neutronclient.osc.v2 import utils as v2_utils
-
-LOG = logging.getLogger(__name__)
-
-TRUNK = 'trunk'
-TRUNKS = 'trunks'
-SUB_PORTS = 'sub_ports'
-
-
-class CreateNetworkTrunk(command.ShowOne):
- """Create a network trunk for a given project"""
-
- def get_parser(self, prog_name):
- parser = super(CreateNetworkTrunk, self).get_parser(prog_name)
- parser.add_argument(
- 'name',
- metavar='<name>',
- help=_("Name of the trunk to create")
- )
- parser.add_argument(
- '--description',
- metavar='<description>',
- help=_("A description of the trunk")
- )
- parser.add_argument(
- '--parent-port',
- metavar='<parent-port>',
- required=True,
- help=_("Parent port belonging to this trunk (name or ID)")
- )
- parser.add_argument(
- '--subport',
- metavar='<port=,segmentation-type=,segmentation-id=>',
- action=parseractions.MultiKeyValueAction, dest='add_subports',
- optional_keys=['segmentation-id', 'segmentation-type'],
- required_keys=['port'],
- help=_("Subport to add. Subport is of form "
- "\'port=<name or ID>,segmentation-type=,segmentation-ID=\' "
- "(--subport) option can be repeated")
- )
- admin_group = parser.add_mutually_exclusive_group()
- admin_group.add_argument(
- '--enable',
- action='store_true',
- default=True,
- help=_("Enable trunk (default)")
- )
- admin_group.add_argument(
- '--disable',
- action='store_true',
- help=_("Disable trunk")
- )
- nc_osc_utils.add_project_owner_option_to_parser(parser)
- return parser
-
- def take_action(self, parsed_args):
- client = self.app.client_manager.neutronclient
- attrs = _get_attrs_for_trunk(self.app.client_manager,
- parsed_args)
- body = {TRUNK: attrs}
- obj = client.create_trunk(body)
- columns = _get_columns(obj[TRUNK])
- data = osc_utils.get_dict_properties(obj[TRUNK], columns,
- formatters=_formatters)
- return columns, data
-
-
-class DeleteNetworkTrunk(command.Command):
- """Delete a given network trunk"""
-
- def get_parser(self, prog_name):
- parser = super(DeleteNetworkTrunk, self).get_parser(prog_name)
- parser.add_argument(
- 'trunk',
- metavar="<trunk>",
- nargs="+",
- help=_("Trunk(s) to delete (name or ID)")
- )
- return parser
-
- def take_action(self, parsed_args):
- client = self.app.client_manager.neutronclient
- result = 0
- for trunk in parsed_args.trunk:
- try:
- trunk_id = _get_id(client, trunk, TRUNK)
- client.delete_trunk(trunk_id)
- except Exception as e:
- result += 1
- LOG.error(_("Failed to delete trunk with name "
- "or ID '%(trunk)s': %(e)s"),
- {'trunk': trunk, 'e': e})
- if result > 0:
- total = len(parsed_args.trunk)
- msg = (_("%(result)s of %(total)s trunks failed "
- "to delete.") % {'result': result, 'total': total})
- raise exceptions.CommandError(msg)
-
-
-class ListNetworkTrunk(command.Lister):
- """List all network trunks"""
-
- def get_parser(self, prog_name):
- parser = super(ListNetworkTrunk, self).get_parser(prog_name)
- parser.add_argument(
- '--long',
- action='store_true',
- default=False,
- help=_("List additional fields in output")
- )
- return parser
-
- def take_action(self, parsed_args):
- client = self.app.client_manager.neutronclient
- data = client.list_trunks()
- headers = (
- 'ID',
- 'Name',
- 'Parent Port',
- 'Description'
- )
- columns = (
- 'id',
- 'name',
- 'port_id',
- 'description'
- )
- if parsed_args.long:
- headers += (
- 'Status',
- 'State',
- 'Created At',
- 'Updated At',
- )
- columns += (
- 'status',
- 'admin_state_up',
- 'created_at',
- 'updated_at'
- )
- return (headers,
- (osc_utils.get_dict_properties(
- s, columns,
- formatters=_formatters,
- ) for s in data[TRUNKS]))
-
-
-class SetNetworkTrunk(command.Command):
- """Set network trunk properties"""
-
- def get_parser(self, prog_name):
- parser = super(SetNetworkTrunk, self).get_parser(prog_name)
- parser.add_argument(
- 'trunk',
- metavar="<trunk>",
- help=_("Trunk to modify (name or ID)")
- )
- parser.add_argument(
- '--name',
- metavar="<name>",
- help=_("Set trunk name")
- )
- parser.add_argument(
- '--description',
- metavar='<description>',
- help=_("A description of the trunk")
- )
- parser.add_argument(
- '--subport',
- metavar='<port=,segmentation-type=,segmentation-id=>',
- action=parseractions.MultiKeyValueAction, dest='set_subports',
- optional_keys=['segmentation-id', 'segmentation-type'],
- required_keys=['port'],
- help=_("Subport to add. Subport is of form "
- "\'port=<name or ID>,segmentation-type=,segmentation-ID=\'"
- "(--subport) option can be repeated")
- )
- admin_group = parser.add_mutually_exclusive_group()
- admin_group.add_argument(
- '--enable',
- action='store_true',
- help=_("Enable trunk")
- )
- admin_group.add_argument(
- '--disable',
- action='store_true',
- help=_("Disable trunk")
- )
- return parser
-
- def take_action(self, parsed_args):
- client = self.app.client_manager.neutronclient
- trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
- attrs = _get_attrs_for_trunk(self.app.client_manager, parsed_args)
- body = {TRUNK: attrs}
- try:
- client.update_trunk(trunk_id, body)
- except Exception as e:
- msg = (_("Failed to set trunk '%(t)s': %(e)s")
- % {'t': parsed_args.trunk, 'e': e})
- raise exceptions.CommandError(msg)
- if parsed_args.set_subports:
- subport_attrs = _get_attrs_for_subports(self.app.client_manager,
- parsed_args)
- try:
- client.trunk_add_subports(trunk_id, subport_attrs)
- except Exception as e:
- msg = (_("Failed to add subports to trunk '%(t)s': %(e)s")
- % {'t': parsed_args.trunk, 'e': e})
- raise exceptions.CommandError(msg)
-
-
-class ShowNetworkTrunk(command.ShowOne):
- """Show information of a given network trunk"""
- def get_parser(self, prog_name):
- parser = super(ShowNetworkTrunk, self).get_parser(prog_name)
- parser.add_argument(
- 'trunk',
- metavar="<trunk>",
- help=_("Trunk to display (name or ID)")
- )
- return parser
-
- def take_action(self, parsed_args):
- client = self.app.client_manager.neutronclient
- trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
- obj = client.show_trunk(trunk_id)
- columns = _get_columns(obj[TRUNK])
- data = osc_utils.get_dict_properties(obj[TRUNK], columns,
- formatters=_formatters)
- return columns, data
-
-
-class ListNetworkSubport(command.Lister):
- """List all subports for a given network trunk"""
-
- def get_parser(self, prog_name):
- parser = super(ListNetworkSubport, self).get_parser(prog_name)
- parser.add_argument(
- '--trunk',
- required=True,
- metavar="<trunk>",
- help=_("List subports belonging to this trunk (name or ID)")
- )
- return parser
-
- def take_action(self, parsed_args):
- client = self.app.client_manager.neutronclient
- trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
- data = client.trunk_get_subports(trunk_id)
- headers = ('Port', 'Segmentation Type', 'Segmentation ID')
- columns = ('port_id', 'segmentation_type', 'segmentation_id')
- return (headers,
- (osc_utils.get_dict_properties(
- s, columns,
- ) for s in data[SUB_PORTS]))
-
-
-class UnsetNetworkTrunk(command.Command):
- """Unset subports from a given network trunk"""
-
- def get_parser(self, prog_name):
- parser = super(UnsetNetworkTrunk, self).get_parser(prog_name)
- parser.add_argument(
- 'trunk',
- metavar="<trunk>",
- help=_("Unset subports from this trunk (name or ID)")
- )
- parser.add_argument(
- '--subport',
- metavar="<subport>",
- required=True,
- action='append', dest='unset_subports',
- help=_("Subport to delete (name or ID of the port) "
- "(--subport) option can be repeated")
- )
- return parser
-
- def take_action(self, parsed_args):
- client = self.app.client_manager.neutronclient
- attrs = _get_attrs_for_subports(self.app.client_manager, parsed_args)
- trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
- client.trunk_remove_subports(trunk_id, attrs)
-
-
-_formatters = {
- 'admin_state_up': v2_utils.AdminStateColumn,
- 'sub_ports': format_columns.ListDictColumn,
-}
-
-
-def _get_columns(item):
- return tuple(sorted(list(item.keys())))
-
-
-def _get_attrs_for_trunk(client_manager, parsed_args):
- attrs = {}
- if parsed_args.name is not None:
- attrs['name'] = str(parsed_args.name)
- if parsed_args.description is not None:
- attrs['description'] = str(parsed_args.description)
- if parsed_args.enable:
- attrs['admin_state_up'] = True
- if parsed_args.disable:
- attrs['admin_state_up'] = False
- if 'parent_port' in parsed_args and parsed_args.parent_port is not None:
- port_id = _get_id(client_manager.neutronclient,
- parsed_args.parent_port, 'port')
- attrs['port_id'] = port_id
- if 'add_subports' in parsed_args and parsed_args.add_subports is not None:
- attrs[SUB_PORTS] = _format_subports(client_manager,
- parsed_args.add_subports)
-
- # "trunk set" command doesn't support setting project.
- if 'project' in parsed_args and parsed_args.project is not None:
- identity_client = client_manager.identity
- project_id = nc_osc_utils.find_project(
- identity_client,
- parsed_args.project,
- parsed_args.project_domain,
- ).id
- attrs['tenant_id'] = project_id
-
- return attrs
-
-
-def _format_subports(client_manager, subports):
- attrs = []
- for subport in subports:
- subport_attrs = {}
- if subport.get('port'):
- port_id = _get_id(client_manager.neutronclient,
- subport['port'], 'port')
- subport_attrs['port_id'] = port_id
- if subport.get('segmentation-id'):
- try:
- subport_attrs['segmentation_id'] = int(
- subport['segmentation-id'])
- except ValueError:
- msg = (_("Segmentation-id '%s' is not an integer") %
- subport['segmentation-id'])
- raise exceptions.CommandError(msg)
- if subport.get('segmentation-type'):
- subport_attrs['segmentation_type'] = subport['segmentation-type']
- attrs.append(subport_attrs)
- return attrs
-
-
-def _get_attrs_for_subports(client_manager, parsed_args):
- attrs = {}
- if 'set_subports' in parsed_args and parsed_args.set_subports is not None:
- attrs[SUB_PORTS] = _format_subports(client_manager,
- parsed_args.set_subports)
- if ('unset_subports' in parsed_args and
- parsed_args.unset_subports is not None):
- subports_list = []
- for subport in parsed_args.unset_subports:
- port_id = _get_id(client_manager.neutronclient,
- subport, 'port')
- subports_list.append({'port_id': port_id})
- attrs[SUB_PORTS] = subports_list
- return attrs
-
-
-def _get_id(client, id_or_name, resource):
- return client.find_resource(resource, str(id_or_name))['id']
diff --git a/neutronclient/tests/unit/osc/v2/trunk/__init__.py b/neutronclient/tests/unit/osc/v2/trunk/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutronclient/tests/unit/osc/v2/trunk/__init__.py
+++ /dev/null
diff --git a/neutronclient/tests/unit/osc/v2/trunk/fakes.py b/neutronclient/tests/unit/osc/v2/trunk/fakes.py
deleted file mode 100644
index a52aa30..0000000
--- a/neutronclient/tests/unit/osc/v2/trunk/fakes.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-from unittest import mock
-
-from oslo_utils import uuidutils
-
-
-class FakeTrunk(object):
- """Fake one or more trunks."""
- @staticmethod
- def create_one_trunk(attrs=None):
- """Create a fake trunk.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :return:
- A Dictionary with id, name, description, admin_state_up, port_id,
- sub_ports, status and project_id
- """
- attrs = attrs or {}
-
- # Set default attributes.
- trunk_attrs = {
- 'id': 'trunk-id-' + uuidutils.generate_uuid(dashed=False),
- 'name': 'trunk-name-' + uuidutils.generate_uuid(dashed=False),
- 'description': '',
- 'port_id': 'port-' + uuidutils.generate_uuid(dashed=False),
- 'admin_state_up': True,
- 'project_id': 'project-id-' +
- uuidutils.generate_uuid(dashed=False),
- 'status': 'ACTIVE',
- 'sub_ports': [{'port_id': 'subport-' +
- uuidutils.generate_uuid(dashed=False),
- 'segmentation_type': 'vlan',
- 'segmentation_id': 100}],
- }
-
- # Overwrite default attributes.
- trunk_attrs.update(attrs)
- return copy.deepcopy(trunk_attrs)
-
- @staticmethod
- def create_trunks(attrs=None, count=2):
- """Create multiple fake trunks.
-
- :param Dictionary attrs:
- A dictionary with all attributes
- :param int count:
- The number of routers to fake
- :return:
- A list of dictionaries faking the trunks
- """
- trunks = []
- for i in range(0, count):
- trunks.append(FakeTrunk.create_one_trunk(attrs))
-
- return trunks
-
- @staticmethod
- def get_trunks(trunks=None, count=2):
- """Get an iterable Mock object with a list of faked trunks.
-
- If trunks list is provided, then initialize the Mock object with the
- list. Otherwise create one.
-
- :param List trunks:
- A list of FakeResource objects faking trunks
- :param int count:
- The number of trunks to fake
- :return:
- An iterable Mock object with side_effect set to a list of faked
- trunks
- """
- if trunks is None:
- trunks = FakeTrunk.create_trunks(count)
- return mock.Mock(side_effect=trunks)
diff --git a/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py b/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py
deleted file mode 100644
index 1e16c1f..0000000
--- a/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py
+++ /dev/null
@@ -1,769 +0,0 @@
-# Copyright 2016 ZTE Corporation.
-# All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import argparse
-import copy
-from unittest import mock
-from unittest.mock import call
-
-from osc_lib.cli import format_columns
-from osc_lib import exceptions
-from osc_lib.tests import utils as tests_utils
-import testtools
-
-from neutronclient.osc.v2.trunk import network_trunk as trunk
-from neutronclient.osc.v2 import utils as v2_utils
-from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
-from neutronclient.tests.unit.osc.v2.trunk import fakes
-
-
-def _get_id(client, id_or_name, resource):
- return id_or_name
-
-
-class TestCreateNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
- # The new trunk created
- _trunk = fakes.FakeTrunk.create_one_trunk()
-
- columns = (
- 'admin_state_up',
- 'description',
- 'id',
- 'name',
- 'port_id',
- 'project_id',
- 'status',
- 'sub_ports',
- )
-
- def get_data(self):
- return (
- v2_utils.AdminStateColumn(self._trunk['admin_state_up']),
- self._trunk['description'],
- self._trunk['id'],
- self._trunk['name'],
- self._trunk['port_id'],
- self._trunk['project_id'],
- self._trunk['status'],
- format_columns.ListDictColumn(self._trunk['sub_ports']),
- )
-
- def setUp(self):
- super(TestCreateNetworkTrunk, self).setUp()
- mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
- new=_get_id).start()
- self.neutronclient.create_trunk = mock.Mock(
- return_value={trunk.TRUNK: self._trunk})
- self.data = self.get_data()
-
- # Get the command object to test
- self.cmd = trunk.CreateNetworkTrunk(self.app, self.namespace)
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- "--parent-port", self._trunk['port_id'],
- self._trunk['name'],
- ]
- verifylist = [
- ('parent_port', self._trunk['port_id']),
- ('name', self._trunk['name']),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.neutronclient.create_trunk.assert_called_once_with({
- trunk.TRUNK: {'name': self._trunk['name'],
- 'admin_state_up': self._trunk['admin_state_up'],
- 'port_id': self._trunk['port_id']}
- })
- self.assertEqual(self.columns, columns)
- self.assertItemEqual(self.data, data)
-
- def test_create_full_options(self):
- self._trunk['description'] = 'foo description'
- self.data = self.get_data()
- subport = self._trunk['sub_ports'][0]
- arglist = [
- "--disable",
- "--description", self._trunk['description'],
- "--parent-port", self._trunk['port_id'],
- "--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,'
- 'segmentation-id=%(seg_id)s' % {
- 'seg_id': subport['segmentation_id'],
- 'seg_type': subport['segmentation_type'],
- 'port': subport['port_id']},
- self._trunk['name'],
- ]
- verifylist = [
- ('name', self._trunk['name']),
- ('description', self._trunk['description']),
- ('parent_port', self._trunk['port_id']),
- ('add_subports', [{
- 'port': subport['port_id'],
- 'segmentation-id': str(subport['segmentation_id']),
- 'segmentation-type': subport['segmentation_type']}]),
- ('disable', True),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.neutronclient.create_trunk.assert_called_once_with({
- trunk.TRUNK: {'name': self._trunk['name'],
- 'description': self._trunk['description'],
- 'admin_state_up': False,
- 'sub_ports': [subport],
- 'port_id': self._trunk['port_id']}
- })
- self.assertEqual(self.columns, columns)
- self.assertItemEqual(self.data, data)
-
- def test_create_trunk_with_subport_invalid_segmentation_id_fail(self):
- subport = self._trunk['sub_ports'][0]
- arglist = [
- "--parent-port", self._trunk['port_id'],
- "--subport", "port=%(port)s,segmentation-type=%(seg_type)s,"
- "segmentation-id=boom" % {
- 'seg_type': subport['segmentation_type'],
- 'port': subport['port_id']},
- self._trunk['name'],
- ]
- verifylist = [
- ('name', self._trunk['name']),
- ('parent_port', self._trunk['port_id']),
- ('add_subports', [{
- 'port': subport['port_id'],
- 'segmentation-id': 'boom',
- 'segmentation-type': subport['segmentation_type']}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- with testtools.ExpectedException(exceptions.CommandError) as e:
- self.cmd.take_action(parsed_args)
- self.assertEqual("Segmentation-id 'boom' is not an integer",
- str(e))
-
- def test_create_network_trunk_subports_without_optional_keys(self):
- subport = copy.copy(self._trunk['sub_ports'][0])
- # Pop out the segmentation-id and segmentation-type
- subport.pop('segmentation_type')
- subport.pop('segmentation_id')
- arglist = [
- '--parent-port', self._trunk['port_id'],
- '--subport', 'port=%(port)s' % {'port': subport['port_id']},
- self._trunk['name'],
- ]
- verifylist = [
- ('name', self._trunk['name']),
- ('parent_port', self._trunk['port_id']),
- ('add_subports', [{
- 'port': subport['port_id']}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- columns, data = (self.cmd.take_action(parsed_args))
-
- self.neutronclient.create_trunk.assert_called_once_with({
- trunk.TRUNK: {'name': self._trunk['name'],
- 'admin_state_up': True,
- 'sub_ports': [subport],
- 'port_id': self._trunk['port_id']}
- })
- self.assertEqual(self.columns, columns)
- self.assertItemEqual(self.data, data)
-
- def test_create_network_trunk_subports_without_required_key_fail(self):
- subport = self._trunk['sub_ports'][0]
- arglist = [
- '--parent-port', self._trunk['port_id'],
- '--subport', 'segmentation-type=%(seg_type)s,'
- 'segmentation-id=%(seg_id)s' % {
- 'seg_id': subport['segmentation_id'],
- 'seg_type': subport['segmentation_type']},
- self._trunk['name'],
- ]
- verifylist = [
- ('name', self._trunk['name']),
- ('parent_port', self._trunk['port_id']),
- ('add_subports', [{
- 'segmentation-id': str(subport['segmentation_id']),
- 'segmentation-type': subport['segmentation_type']}]),
- ]
-
- with testtools.ExpectedException(argparse.ArgumentTypeError):
- self.check_parser(self.cmd, arglist, verifylist)
-
-
-class TestDeleteNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
- # The trunk to be deleted.
- _trunks = fakes.FakeTrunk.create_trunks(count=2)
-
- def setUp(self):
- super(TestDeleteNetworkTrunk, self).setUp()
-
- mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
- new=_get_id).start()
- self.neutronclient.delete_trunk = mock.Mock(return_value=None)
-
- # Get the command object to test
- self.cmd = trunk.DeleteNetworkTrunk(self.app, self.namespace)
-
- def test_delete_trunk(self):
- arglist = [
- self._trunks[0]['name'],
- ]
- verifylist = [
- ('trunk', [self._trunks[0]['name']]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
- self.neutronclient.delete_trunk.assert_called_once_with(
- self._trunks[0]['name'])
- self.assertIsNone(result)
-
- def test_delete_trunk_multiple(self):
- arglist = []
- verifylist = []
-
- for t in self._trunks:
- arglist.append(t['name'])
- verifylist = [
- ('trunk', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for t in self._trunks:
- calls.append(call(t['name']))
- self.neutronclient.delete_trunk.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_delete_trunk_multiple_with_exception(self):
- arglist = [
- self._trunks[0]['name'],
- 'unexist_trunk',
- ]
- verifylist = [
- ('trunk',
- [self._trunks[0]['name'], 'unexist_trunk']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- get_mock_result = [self._trunks[0], exceptions.CommandError]
- trunk._get_id = (
- mock.Mock(side_effect=get_mock_result)
- )
- with testtools.ExpectedException(exceptions.CommandError) as e:
- self.cmd.take_action(parsed_args)
- self.assertEqual('1 of 2 trunks failed to delete.', str(e))
- self.neutronclient.delete_trunk.assert_called_once_with(
- self._trunks[0]
- )
-
-
-class TestShowNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
-
- # The trunk to set.
- _trunk = fakes.FakeTrunk.create_one_trunk()
-
- columns = (
- 'admin_state_up',
- 'description',
- 'id',
- 'name',
- 'port_id',
- 'project_id',
- 'status',
- 'sub_ports',
- )
- data = (
- v2_utils.AdminStateColumn(_trunk['admin_state_up']),
- _trunk['description'],
- _trunk['id'],
- _trunk['name'],
- _trunk['port_id'],
- _trunk['project_id'],
- _trunk['status'],
- format_columns.ListDictColumn(_trunk['sub_ports']),
- )
-
- def setUp(self):
- super(TestShowNetworkTrunk, self).setUp()
-
- mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
- new=_get_id).start()
- self.neutronclient.show_trunk = mock.Mock(
- return_value={trunk.TRUNK: self._trunk})
-
- # Get the command object to test
- self.cmd = trunk.ShowNetworkTrunk(self.app, self.namespace)
-
- def test_show_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_show_all_options(self):
- arglist = [
- self._trunk['id'],
- ]
- verifylist = [
- ('trunk', self._trunk['id']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.neutronclient.show_trunk.assert_called_once_with(
- self._trunk['id'])
- self.assertEqual(self.columns, columns)
- self.assertItemEqual(self.data, data)
-
-
-class TestListNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
- # Create trunks to be listed.
- _trunks = fakes.FakeTrunk.create_trunks(
- {'created_at': '2001-01-01 00:00:00',
- 'updated_at': '2001-01-01 00:00:00'}, count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Parent Port',
- 'Description'
- )
- columns_long = columns + (
- 'Status',
- 'State',
- 'Created At',
- 'Updated At'
- )
- data = []
- for t in _trunks:
- data.append((
- t['id'],
- t['name'],
- t['port_id'],
- t['description']
- ))
- data_long = []
- for t in _trunks:
- data_long.append((
- t['id'],
- t['name'],
- t['port_id'],
- t['description'],
- t['status'],
- v2_utils.AdminStateColumn(t['admin_state_up']),
- '2001-01-01 00:00:00',
- '2001-01-01 00:00:00',
- ))
-
- def setUp(self):
- super(TestListNetworkTrunk, self).setUp()
- mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
- new=_get_id).start()
- self.neutronclient.list_trunks = mock.Mock(
- return_value={trunk.TRUNKS: self._trunks})
-
- # Get the command object to test
- self.cmd = trunk.ListNetworkTrunk(self.app, self.namespace)
-
- def test_trunk_list_no_option(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.neutronclient.list_trunks.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertListItemEqual(self.data, list(data))
-
- def test_trunk_list_long(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.neutronclient.list_trunks.assert_called_once_with()
- self.assertEqual(self.columns_long, columns)
- self.assertListItemEqual(self.data_long, list(data))
-
-
-class TestSetNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
- # Create trunks to be listed.
- _trunk = fakes.FakeTrunk.create_one_trunk()
-
- columns = (
- 'admin_state_up',
- 'id',
- 'name',
- 'description',
- 'port_id',
- 'project_id',
- 'status',
- 'sub_ports',
- )
- data = (
- v2_utils.AdminStateColumn(_trunk['admin_state_up']),
- _trunk['id'],
- _trunk['name'],
- _trunk['description'],
- _trunk['port_id'],
- _trunk['project_id'],
- _trunk['status'],
- format_columns.ListDictColumn(_trunk['sub_ports']),
- )
-
- def setUp(self):
- super(TestSetNetworkTrunk, self).setUp()
- mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
- new=_get_id).start()
- self.neutronclient.update_trunk = mock.Mock(
- return_value={trunk.TRUNK: self._trunk})
- self.neutronclient.trunk_add_subports = mock.Mock(
- return_value=self._trunk)
-
- # Get the command object to test
- self.cmd = trunk.SetNetworkTrunk(self.app, self.namespace)
-
- def _test_set_network_trunk_attr(self, attr, value):
- arglist = [
- '--%s' % attr, value,
- self._trunk[attr],
- ]
- verifylist = [
- (attr, value),
- ('trunk', self._trunk[attr]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- attr: value,
- }
- self.neutronclient.update_trunk.assert_called_once_with(
- self._trunk[attr], {trunk.TRUNK: attrs})
- self.assertIsNone(result)
-
- def test_set_network_trunk_name(self):
- self._test_set_network_trunk_attr('name', 'trunky')
-
- def test_test_set_network_trunk_description(self):
- self._test_set_network_trunk_attr('description', 'description')
-
- def test_set_network_trunk_admin_state_up_disable(self):
- arglist = [
- '--disable',
- self._trunk['name'],
- ]
- verifylist = [
- ('disable', True),
- ('trunk', self._trunk['name']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': False,
- }
- self.neutronclient.update_trunk.assert_called_once_with(
- self._trunk['name'], {trunk.TRUNK: attrs})
- self.assertIsNone(result)
-
- def test_set_network_trunk_admin_state_up_enable(self):
- arglist = [
- '--enable',
- self._trunk['name'],
- ]
- verifylist = [
- ('enable', True),
- ('trunk', self._trunk['name']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- attrs = {
- 'admin_state_up': True,
- }
- self.neutronclient.update_trunk.assert_called_once_with(
- self._trunk['name'], {trunk.TRUNK: attrs})
- self.assertIsNone(result)
-
- def test_set_network_trunk_nothing(self):
- arglist = [self._trunk['name'], ]
- verifylist = [('trunk', self._trunk['name']), ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- attrs = {}
- self.neutronclient.update_trunk.assert_called_once_with(
- self._trunk['name'], {trunk.TRUNK: attrs})
- self.assertIsNone(result)
-
- def test_set_network_trunk_subports(self):
- subport = self._trunk['sub_ports'][0]
- arglist = [
- '--subport', 'port=%(port)s,segmentation-type=%(seg_type)s,'
- 'segmentation-id=%(seg_id)s' % {
- 'seg_id': subport['segmentation_id'],
- 'seg_type': subport['segmentation_type'],
- 'port': subport['port_id']},
- self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ('set_subports', [{
- 'port': subport['port_id'],
- 'segmentation-id': str(subport['segmentation_id']),
- 'segmentation-type': subport['segmentation_type']}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.neutronclient.trunk_add_subports.assert_called_once_with(
- self._trunk['name'], {'sub_ports': [subport]}
- )
- self.assertIsNone(result)
-
- def test_set_network_trunk_subports_without_optional_keys(self):
- subport = copy.copy(self._trunk['sub_ports'][0])
- # Pop out the segmentation-id and segmentation-type
- subport.pop('segmentation_type')
- subport.pop('segmentation_id')
- arglist = [
- '--subport', 'port=%(port)s' % {'port': subport['port_id']},
- self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ('set_subports', [{
- 'port': subport['port_id']}]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.neutronclient.trunk_add_subports.assert_called_once_with(
- self._trunk['name'], {'sub_ports': [subport]}
- )
- self.assertIsNone(result)
-
- def test_set_network_trunk_subports_without_required_key_fail(self):
- subport = self._trunk['sub_ports'][0]
- arglist = [
- '--subport', 'segmentation-type=%(seg_type)s,'
- 'segmentation-id=%(seg_id)s' % {
- 'seg_id': subport['segmentation_id'],
- 'seg_type': subport['segmentation_type']},
- self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ('set_subports', [{
- 'segmentation-id': str(subport['segmentation_id']),
- 'segmentation-type': subport['segmentation_type']}]),
- ]
-
- with testtools.ExpectedException(argparse.ArgumentTypeError):
- self.check_parser(self.cmd, arglist, verifylist)
-
- self.neutronclient.trunk_add_subports.assert_not_called()
-
- def test_set_trunk_attrs_with_exception(self):
- arglist = [
- '--name', 'reallylongname',
- self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ('name', 'reallylongname'),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- self.neutronclient.update_trunk = (
- mock.Mock(side_effect=exceptions.CommandError)
- )
- with testtools.ExpectedException(exceptions.CommandError) as e:
- self.cmd.take_action(parsed_args)
- self.assertEqual(
- "Failed to set trunk '%s': " % self._trunk['name'],
- str(e))
- attrs = {'name': 'reallylongname'}
- self.neutronclient.update_trunk.assert_called_once_with(
- self._trunk['name'], {trunk.TRUNK: attrs})
- self.neutronclient.trunk_add_subports.assert_not_called()
-
- def test_set_trunk_add_subport_with_exception(self):
- arglist = [
- '--subport', 'port=invalid_subport',
- self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ('set_subports', [{'port': 'invalid_subport'}]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- self.neutronclient.trunk_add_subports = (
- mock.Mock(side_effect=exceptions.CommandError)
- )
- with testtools.ExpectedException(exceptions.CommandError) as e:
- self.cmd.take_action(parsed_args)
- self.assertEqual(
- "Failed to add subports to trunk '%s': " % self._trunk['name'],
- str(e))
- self.neutronclient.update_trunk.assert_called_once_with(
- self._trunk['name'], {trunk.TRUNK: {}})
- self.neutronclient.trunk_add_subports.assert_called_once_with(
- self._trunk['name'],
- {'sub_ports': [{'port_id': 'invalid_subport'}]}
- )
-
-
-class TestListNetworkSubport(test_fakes.TestNeutronClientOSCV2):
-
- _trunk = fakes.FakeTrunk.create_one_trunk()
- _subports = _trunk['sub_ports']
-
- columns = (
- 'Port',
- 'Segmentation Type',
- 'Segmentation ID',
- )
- data = []
- for s in _subports:
- data.append((
- s['port_id'],
- s['segmentation_type'],
- s['segmentation_id'],
- ))
-
- def setUp(self):
- super(TestListNetworkSubport, self).setUp()
- mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
- new=_get_id).start()
- self.neutronclient.trunk_get_subports = mock.Mock(
- return_value={trunk.SUB_PORTS: self._subports})
-
- # Get the command object to test
- self.cmd = trunk.ListNetworkSubport(self.app, self.namespace)
-
- def test_subport_list(self):
- arglist = [
- '--trunk', self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.neutronclient.trunk_get_subports.assert_called_once_with(
- self._trunk['name'])
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestUnsetNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
-
- _trunk = fakes.FakeTrunk.create_one_trunk()
-
- columns = (
- 'admin_state_up',
- 'id',
- 'name',
- 'port_id',
- 'project_id',
- 'status',
- 'sub_ports',
- )
- data = (
- v2_utils.AdminStateColumn(_trunk['admin_state_up']),
- _trunk['id'],
- _trunk['name'],
- _trunk['port_id'],
- _trunk['project_id'],
- _trunk['status'],
- format_columns.ListDictColumn(_trunk['sub_ports']),
- )
-
- def setUp(self):
- super(TestUnsetNetworkTrunk, self).setUp()
-
- mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
- new=_get_id).start()
- self.neutronclient.trunk_remove_subports = mock.Mock(
- return_value=None)
-
- # Get the command object to test
- self.cmd = trunk.UnsetNetworkTrunk(self.app, self.namespace)
-
- def test_unset_network_trunk_subport(self):
- subport = self._trunk['sub_ports'][0]
- arglist = [
- "--subport", subport['port_id'],
- self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ('unset_subports', [subport['port_id']]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.neutronclient.trunk_remove_subports.assert_called_once_with(
- self._trunk['name'],
- {trunk.SUB_PORTS: [{'port_id': subport['port_id']}]}
- )
- self.assertIsNone(result)
-
- def test_unset_subport_no_arguments_fail(self):
- arglist = [
- self._trunk['name'],
- ]
- verifylist = [
- ('trunk', self._trunk['name']),
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, verifylist)
diff --git a/setup.cfg b/setup.cfg
index 5c62570..432ec68 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -33,13 +33,6 @@ openstack.cli.extension =
neutronclient = neutronclient.osc.plugin
openstack.neutronclient.v2 =
- network_subport_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkSubport
- network_trunk_create = neutronclient.osc.v2.trunk.network_trunk:CreateNetworkTrunk
- network_trunk_delete = neutronclient.osc.v2.trunk.network_trunk:DeleteNetworkTrunk
- network_trunk_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkTrunk
- network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk
- network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk
- network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk
sfc_flow_classifier_create = neutronclient.osc.v2.sfc.sfc_flow_classifier:CreateSfcFlowClassifier
sfc_flow_classifier_delete = neutronclient.osc.v2.sfc.sfc_flow_classifier:DeleteSfcFlowClassifier
sfc_flow_classifier_list = neutronclient.osc.v2.sfc.sfc_flow_classifier:ListSfcFlowClassifier