diff options
Diffstat (limited to 'neutronclient')
11 files changed, 1785 insertions, 0 deletions
diff --git a/neutronclient/osc/v2/networking_bgpvpn/__init__.py b/neutronclient/osc/v2/networking_bgpvpn/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/neutronclient/osc/v2/networking_bgpvpn/__init__.py diff --git a/neutronclient/osc/v2/networking_bgpvpn/bgpvpn.py b/neutronclient/osc/v2/networking_bgpvpn/bgpvpn.py new file mode 100644 index 0000000..1e5dbb0 --- /dev/null +++ b/neutronclient/osc/v2/networking_bgpvpn/bgpvpn.py @@ -0,0 +1,385 @@ +# Copyright (c) 2016 Juniper Networks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import logging + +from osc_lib.cli.parseractions import KeyValueAction +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils as osc_utils + +from neutronclient._i18n import _ +from neutronclient._i18n import _LE +from neutronclient._i18n import _LW +from neutronclient.osc import utils as nc_osc_utils +from neutronclient.osc.v2.networking_bgpvpn import constants + +LOG = logging.getLogger(__name__) + +_attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('tenant_id', 'Project ID', nc_osc_utils.LIST_LONG_ONLY), + ('name', 'Name', nc_osc_utils.LIST_BOTH), + ('type', 'Type', nc_osc_utils.LIST_BOTH), + ('route_targets', 'Route Targets', nc_osc_utils.LIST_LONG_ONLY), + ('import_targets', 'Import Targets', nc_osc_utils.LIST_LONG_ONLY), + ('export_targets', 'Export Targets', nc_osc_utils.LIST_LONG_ONLY), + ('route_distinguishers', 'Route Distinguishers', + nc_osc_utils.LIST_LONG_ONLY), + ('networks', 'Associated Networks', nc_osc_utils.LIST_LONG_ONLY), + ('routers', 'Associated Routers', nc_osc_utils.LIST_LONG_ONLY), +) +_formatters = { + 'route_targets': osc_utils.format_list, + 'import_targets': osc_utils.format_list, + 'export_targets': osc_utils.format_list, + 'route_distinguishers': osc_utils.format_list, + 'networks': osc_utils.format_list, + 'routers': osc_utils.format_list, +} + + +def _get_common_parser(parser, update=None): + """Adds to parser arguments common to create, set and unset commands. + + :params ArgumentParser parser: argparse object contains all command's + arguments + :params string update: Determines if it is a create command (value: None), + it is a set command (value: 'set') or if it is an + unset command (value: 'unset') + """ + ADD_RT = _("Add Route Target to import/export list") + REMOVE_RT = _("Remove Route Target from import/export list") + ADD_IMPORT_RT = _("Add Route Target to import list") + REMOVE_IMPORT_RT = _("Remove Route Target from import list") + ADD_EXPORT_RT = _("Add Route Target to export list") + REMOVE_EXPORT_RT = _("Remove Route Target from export list") + ADD_RD = _("Add Route Distinguisher to the list of Route Distinguishers " + "from which a Route Distinguishers will be picked from to " + "advertise a VPN route") + REMOVE_RD = _("Remove Route Distinguisher from the list of Route " + "Distinguishers from which a Route Distinguishers will be " + "picked from to advertise a VPN route") + REPEAT_RT = _("repeat option for multiple Route Targets") + REPEAT_RD = _("repeat option for multiple Route Distinguishers") + + def is_appended(): + return update is None or update == 'set' + + if update is None or update == 'set': + parser.add_argument( + '--name', + metavar="<name>", + help=_("Name of the BGP VPN"), + ) + parser.add_argument( + '--route-target', + dest='route_targets', + action='append', + metavar="<route-target>", + help="%s (%s)" % ((ADD_RT if is_appended() else REMOVE_RT), REPEAT_RT), + ) + if update: + parser.add_argument( + '--no-route-target' if update == 'set' else '--all-route-target', + dest='purge_route_target', + action='store_true', + help=_('Empty route target list'), + ) + parser.add_argument( + '--import-target', + dest='import_targets', + action='append', + metavar="<import-target>", + help="%s (%s)" % ((ADD_IMPORT_RT if is_appended() else + REMOVE_IMPORT_RT), REPEAT_RT), + ) + if update: + parser.add_argument( + '--no-import-target' if update == 'set' else '--all-import-target', + dest='purge_import_target', + action='store_true', + help=_('Empty import route target list'), + ) + parser.add_argument( + '--export-target', + dest='export_targets', + action='append', + metavar="<export-target>", + help="%s (%s)" % ((ADD_EXPORT_RT if is_appended() else + REMOVE_EXPORT_RT), REPEAT_RT), + ) + if update: + parser.add_argument( + '--no-export-target' if update == 'set' else + '--all-export-target', + dest='purge_export_target', + action='store_true', + help=_('Empty export route target list'), + ) + parser.add_argument( + '--route-distinguisher', + dest='route_distinguishers', + action='append', + metavar="<route-distinguisher>", + help="%s (%s)" % ((ADD_RD if is_appended() else REMOVE_RD), REPEAT_RD), + ) + if update: + parser.add_argument( + '--no-route-distinguisher' if update == 'set' else + '--all-route-distinguisher', + dest='purge_route_distinguisher', + action='store_true', + help=_('Empty route distinguisher list'), + ) + + +def _args2body(client_manager, id, action, args): + + if (not (args.purge_route_target and args.purge_import_target and + args.purge_export_target and args.purge_route_distinguisher) and + (args.route_targets or args.import_targets or + args.export_targets or args.route_distinguishers)): + bgpvpn = client_manager.neutronclient.show_bgpvpn(id)['bgpvpn'] + + attrs = {} + + if 'name' in args and args.name is not None: + attrs['name'] = str(args.name) + + if args.purge_route_target: + attrs['route_targets'] = [] + elif args.route_targets: + if action == 'set': + attrs['route_targets'] = list(set(bgpvpn['route_targets']) | + set(args.route_targets)) + elif action == 'unset': + attrs['route_targets'] = list(set(bgpvpn['route_targets']) - + set(args.route_targets)) + + if args.purge_import_target: + attrs['import_targets'] = [] + elif args.import_targets: + if action == 'set': + attrs['import_targets'] = list(set(bgpvpn['import_targets']) | + set(args.import_targets)) + elif action == 'unset': + attrs['import_targets'] = list(set(bgpvpn['import_targets']) - + set(args.import_targets)) + + if args.purge_export_target: + attrs['export_targets'] = [] + elif args.export_targets: + if action == 'set': + attrs['export_targets'] = list(set(bgpvpn['export_targets']) | + set(args.export_targets)) + elif action == 'unset': + attrs['export_targets'] = list(set(bgpvpn['export_targets']) - + set(args.export_targets)) + + if args.purge_route_distinguisher: + attrs['route_distinguishers'] = [] + elif args.route_distinguishers: + if action == 'set': + attrs['route_distinguishers'] = list( + set(bgpvpn['route_distinguishers']) | + set(args.route_distinguishers)) + elif action == 'unset': + attrs['route_distinguishers'] = list( + set(bgpvpn['route_distinguishers']) - + set(args.route_distinguishers)) + + return {constants.BGPVPN: attrs} + + +class CreateBgpvpn(command.ShowOne): + _description = _("Create BGP VPN resource") + + def get_parser(self, prog_name): + parser = super(CreateBgpvpn, self).get_parser(prog_name) + nc_osc_utils.add_project_owner_option_to_parser(parser) + _get_common_parser(parser) + parser.add_argument( + '--type', + default='l3', + choices=['l2', 'l3'], + help=_("BGP VPN type selection between IP VPN (l3) and Ethernet " + "VPN (l2) (default: %(default)s)"), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = {} + if parsed_args.name is not None: + attrs['name'] = str(parsed_args.name) + if parsed_args.type is not None: + attrs['type'] = parsed_args.type + if parsed_args.route_targets is not None: + attrs['route_targets'] = parsed_args.route_targets + if parsed_args.import_targets is not None: + attrs['import_targets'] = parsed_args.import_targets + if parsed_args.export_targets is not None: + attrs['export_targets'] = parsed_args.export_targets + if parsed_args.route_distinguishers is not None: + attrs['route_distinguishers'] = parsed_args.route_distinguishers + if 'project' in parsed_args and parsed_args.project is not None: + project_id = nc_osc_utils.find_project( + self.app.client_manager.identity, + parsed_args.project, + parsed_args.project_domain, + ).id + attrs['tenant_id'] = project_id + body = {constants.BGPVPN: attrs} + obj = client.create_bgpvpn(body)[constants.BGPVPN] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = osc_utils.get_dict_properties(obj, columns, + formatters=_formatters) + return display_columns, data + + +class SetBgpvpn(command.Command): + _description = _("Set BGP VPN properties") + + def get_parser(self, prog_name): + parser = super(SetBgpvpn, self).get_parser(prog_name) + parser.add_argument( + 'bgpvpn', + metavar="<bgpvpn>", + help=_("BGP VPN to update (name or ID)"), + ) + _get_common_parser(parser, update='set') + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + id = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)['id'] + body = _args2body(self.app.client_manager, id, 'set', parsed_args) + client.update_bgpvpn(id, body) + + +class UnsetBgpvpn(command.Command): + _description = _("Unset BGP VPN properties") + + def get_parser(self, prog_name): + parser = super(UnsetBgpvpn, self).get_parser(prog_name) + parser.add_argument( + 'bgpvpn', + metavar="<bgpvpn>", + help=_("BGP VPN to update (name or ID)"), + ) + _get_common_parser(parser, update='unset') + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + id = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)['id'] + body = _args2body(self.app.client_manager, id, 'unset', parsed_args) + client.update_bgpvpn(id, body) + + +class DeleteBgpvpn(command.Command): + _description = _("Delete BGP VPN resource(s)") + + def get_parser(self, prog_name): + parser = super(DeleteBgpvpn, self).get_parser(prog_name) + parser.add_argument( + 'bgpvpns', + metavar="<bgpvpn>", + nargs="+", + help=_("BGP VPN(s) to delete (name or ID)"), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fails = 0 + for id_or_name in parsed_args.bgpvpns: + try: + id = client.find_resource(constants.BGPVPN, id_or_name)['id'] + client.delete_bgpvpn(id) + LOG.warning(_LW("BGP VPN %(id)s deleted"), {'id': id}) + except Exception as e: + fails += 1 + LOG.error(_LE("Failed to delete BGP VPN with name or ID " + "'%(id_or_name)s': %(e)s"), + {'id_or_name': id_or_name, 'e': e}) + if fails > 0: + msg = (_("Failed to delete %(fails)s of %(total)s BGP VPN.") % + {'fails': fails, 'total': len(parsed_args.bgpvpns)}) + raise exceptions.CommandError(msg) + + +class ListBgpvpn(command.Lister): + _description = _("List BGP VPN resources") + + def get_parser(self, prog_name): + parser = super(ListBgpvpn, self).get_parser(prog_name) + nc_osc_utils.add_project_owner_option_to_parser(parser) + parser.add_argument( + '--long', + action='store_true', + help=_("List additional fields in output"), + ) + parser.add_argument( + '--property', + metavar="<key=value>", + default=dict(), + help=_("Filter property to apply on returned BGP VPNs (repeat to " + "filter on multiple properties)"), + action=KeyValueAction, + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + params = {} + if parsed_args.project is not None: + project_id = nc_osc_utils.find_project( + self.app.client_manager.identity, + parsed_args.project, + parsed_args.project_domain, + ).id + params['tenant_id'] = project_id + if parsed_args.property: + params.update(parsed_args.property) + objs = client.list_bgpvpns(**params)[constants.BGPVPNS] + headers, columns = nc_osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, (osc_utils.get_dict_properties( + s, columns, formatters=_formatters) for s in objs)) + + +class ShowBgpvpn(command.ShowOne): + _description = _("Show information of a given BGP VPN") + + def get_parser(self, prog_name): + parser = super(ShowBgpvpn, self).get_parser(prog_name) + parser.add_argument( + 'bgpvpn', + metavar="<bgpvpn>", + help=_("BGP VPN to display (name or ID)"), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + id = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn)['id'] + obj = client.show_bgpvpn(id)[constants.BGPVPN] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = osc_utils.get_dict_properties(obj, columns, + formatters=_formatters) + return display_columns, data diff --git a/neutronclient/osc/v2/networking_bgpvpn/constants.py b/neutronclient/osc/v2/networking_bgpvpn/constants.py new file mode 100644 index 0000000..775721e --- /dev/null +++ b/neutronclient/osc/v2/networking_bgpvpn/constants.py @@ -0,0 +1,26 @@ +# Copyright (c) 2016 Juniper Networks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +BGPVPN = 'bgpvpn' +BGPVPNS = '%ss' % BGPVPN + +NETWORK_RESOURCE_NAME = 'network' +NETWORK_ASSOC = '%s_association' % NETWORK_RESOURCE_NAME +NETWORK_ASSOCS = '%ss' % NETWORK_ASSOC + +ROUTER_RESOURCE_NAME = 'router' +ROUTER_ASSOC = '%s_association' % ROUTER_RESOURCE_NAME +ROUTER_ASSOCS = '%ss' % ROUTER_ASSOC diff --git a/neutronclient/osc/v2/networking_bgpvpn/network_association.py b/neutronclient/osc/v2/networking_bgpvpn/network_association.py new file mode 100644 index 0000000..4799aa5 --- /dev/null +++ b/neutronclient/osc/v2/networking_bgpvpn/network_association.py @@ -0,0 +1,63 @@ +# Copyright (c) 2016 Juniper Networks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +from neutronclient._i18n import _ +from neutronclient.osc import utils as nc_osc_utils +from neutronclient.osc.v2.networking_bgpvpn import constants +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + CreateBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + DeleteBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + ListBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + ShowBgpvpnResAssoc + + +class BgpvpnNetAssoc(object): + _assoc_res_name = constants.NETWORK_RESOURCE_NAME + _resource = constants.NETWORK_ASSOC + _resource_plural = constants.NETWORK_ASSOCS + + _attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('tenant_id', 'Project ID', nc_osc_utils.LIST_LONG_ONLY), + ('%s_id' % _assoc_res_name, '%s ID' % _assoc_res_name.capitalize(), + nc_osc_utils.LIST_BOTH), + ) + _formatters = {} + + +class CreateBgpvpnNetAssoc(BgpvpnNetAssoc, CreateBgpvpnResAssoc): + _description = _("Create a BGP VPN network association") + pass + + +class DeleteBgpvpnNetAssoc(BgpvpnNetAssoc, DeleteBgpvpnResAssoc): + _description = _("Delete a BGP VPN network association(s) for a given BGP " + "VPN") + pass + + +class ListBgpvpnNetAssoc(BgpvpnNetAssoc, ListBgpvpnResAssoc): + _description = _("List BGP VPN network associations for a given BGP VPN") + pass + + +class ShowBgpvpnNetAssoc(BgpvpnNetAssoc, ShowBgpvpnResAssoc): + _description = _("Show information of a given BGP VPN network association") + pass diff --git a/neutronclient/osc/v2/networking_bgpvpn/resource_association.py b/neutronclient/osc/v2/networking_bgpvpn/resource_association.py new file mode 100644 index 0000000..98f54cd --- /dev/null +++ b/neutronclient/osc/v2/networking_bgpvpn/resource_association.py @@ -0,0 +1,190 @@ +# Copyright (c) 2016 Juniper Networks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import logging + +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils as osc_utils + +from neutronclient._i18n import _ +from neutronclient._i18n import _LE +from neutronclient._i18n import _LW +from neutronclient.osc import utils as nc_osc_utils +from neutronclient.osc.v2.networking_bgpvpn import constants + +LOG = logging.getLogger(__name__) + + +class CreateBgpvpnResAssoc(command.ShowOne): + """Create a BGP VPN resource association""" + + def get_parser(self, prog_name): + parser = super(CreateBgpvpnResAssoc, self).get_parser(prog_name) + nc_osc_utils.add_project_owner_option_to_parser(parser) + parser.add_argument( + 'bgpvpn', + metavar="<bgpvpn>", + help=(_("BGP VPN to apply the %s association (name or ID)") % + self._assoc_res_name), + ) + parser.add_argument( + 'resource', + metavar="<%s>" % self._assoc_res_name, + help=(_("%s to associate the BGP VPN (name or ID)") % + self._assoc_res_name.capitalize()), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + create_method = getattr( + client, 'create_bgpvpn_%s_assoc' % self._assoc_res_name) + bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn) + assoc_res = client.find_resource(self._assoc_res_name, + parsed_args.resource) + body = { + self._resource: { + '%s_id' % self._assoc_res_name: assoc_res['id'], + }, + } + if 'project' in parsed_args and parsed_args.project is not None: + project_id = nc_osc_utils.find_project( + self.app.client_manager.identity, + parsed_args.project, + parsed_args.project_domain, + ).id + body[self._resource]['tenant_id'] = project_id + obj = create_method(bgpvpn['id'], body)[self._resource] + columns, display_columns = nc_osc_utils.get_columns(obj, + self._attr_map) + data = osc_utils.get_dict_properties(obj, columns, + formatters=self._formatters) + return display_columns, data + + +class DeleteBgpvpnResAssoc(command.Command): + """Remove a BGP VPN resource association(s) for a given BGP VPN""" + + def get_parser(self, prog_name): + parser = super(DeleteBgpvpnResAssoc, self).get_parser(prog_name) + parser.add_argument( + 'resource_association_ids', + metavar="<%s association ID>" % self._assoc_res_name, + nargs="+", + help=(_("%s association ID(s) to remove") % + self._assoc_res_name.capitalize()), + ) + parser.add_argument( + 'bgpvpn', + metavar="<bgpvpn>", + help=_("BGP VPN the association belongs to (name or ID)"), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + delete_method = getattr( + client, 'delete_bgpvpn_%s_assoc' % self._assoc_res_name) + bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn) + fails = 0 + for id in parsed_args.resource_association_ids: + try: + delete_method(bgpvpn['id'], id) + LOG.warning( + _LW("%(assoc_res_name)s association %(id)s deleted"), + {'assoc_res_name': self._assoc_res_name.capitalize(), + 'id': id}) + except Exception as e: + fails += 1 + LOG.error(_LE("Failed to delete %(assoc_res_name)s " + "association with ID '%(id)s': %(e)s"), + {'assoc_res_name': self._assoc_res_name, + 'id': id, + 'e': e}) + if fails > 0: + msg = (_("Failed to delete %(fails)s of %(total)s " + "%(assoc_res_name)s BGP VPN association(s).") % + {'fails': fails, + 'total': len(parsed_args.resource_association_ids), + 'assoc_res_name': self._assoc_res_name}) + raise exceptions.CommandError(msg) + + +class ListBgpvpnResAssoc(command.Lister): + """List BGP VPN resource associations for a given BGP VPN""" + + def get_parser(self, prog_name): + parser = super(ListBgpvpnResAssoc, self).get_parser(prog_name) + parser.add_argument( + 'bgpvpn', + metavar="<bgpvpn>", + help=_("BGP VPN listed associations belong to (name or ID)"), + ) + parser.add_argument( + '--long', + action='store_true', + help=_("List additional fields in output"), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + list_method = getattr(client, + 'list_bgpvpn_%s_assocs' % self._assoc_res_name) + bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn) + objs = list_method(bgpvpn['id'], + retrieve_all=True)[self._resource_plural] + headers, columns = nc_osc_utils.get_column_definitions( + self._attr_map, long_listing=parsed_args.long) + return (headers, (osc_utils.get_dict_properties( + s, columns, formatters=self._formatters) for s in objs)) + + +class ShowBgpvpnResAssoc(command.ShowOne): + """Show information of a given BGP VPN resource association""" + + def get_parser(self, prog_name): + parser = super(ShowBgpvpnResAssoc, self).get_parser(prog_name) + parser.add_argument( + 'resource_association_id', + metavar="<%s association ID>" % self._assoc_res_name, + help=(_("%s association ID to look up") % + self._assoc_res_name.capitalize()), + ) + parser.add_argument( + 'bgpvpn', + metavar="<bgpvpn>", + help=_("BGP VPN the association belongs to (name or ID)"), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + show_method = getattr(client, + 'show_bgpvpn_%s_assoc' % self._assoc_res_name) + bgpvpn = client.find_resource(constants.BGPVPN, parsed_args.bgpvpn) + assoc = client.find_resource_by_id( + self._resource, + parsed_args.resource_association_id, + cmd_resource='bgpvpn_%s_assoc' % self._assoc_res_name, + parent_id=bgpvpn['id']) + obj = show_method(bgpvpn['id'], assoc['id'])[self._resource] + columns, display_columns = nc_osc_utils.get_columns(obj, + self._attr_map) + data = osc_utils.get_dict_properties(obj, columns, + formatters=self._formatters) + return display_columns, data diff --git a/neutronclient/osc/v2/networking_bgpvpn/router_association.py b/neutronclient/osc/v2/networking_bgpvpn/router_association.py new file mode 100644 index 0000000..dd3ce2a --- /dev/null +++ b/neutronclient/osc/v2/networking_bgpvpn/router_association.py @@ -0,0 +1,63 @@ +# Copyright (c) 2016 Juniper Routerworks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +from neutronclient._i18n import _ +from neutronclient.osc import utils as nc_osc_utils +from neutronclient.osc.v2.networking_bgpvpn import constants +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + CreateBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + DeleteBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + ListBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + ShowBgpvpnResAssoc + + +class BgpvpnRouterAssoc(object): + _assoc_res_name = constants.ROUTER_RESOURCE_NAME + _resource = constants.ROUTER_ASSOC + _resource_plural = constants.ROUTER_ASSOCS + + _attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('tenant_id', 'Project ID', nc_osc_utils.LIST_LONG_ONLY), + ('%s_id' % _assoc_res_name, '%s ID' % _assoc_res_name.capitalize(), + nc_osc_utils.LIST_BOTH), + ) + _formatters = {} + + +class CreateBgpvpnRouterAssoc(BgpvpnRouterAssoc, CreateBgpvpnResAssoc): + _description = _("Create a BGP VPN router association") + pass + + +class DeleteBgpvpnRouterAssoc(BgpvpnRouterAssoc, DeleteBgpvpnResAssoc): + _description = _("Delete a BGP VPN router association(s) for a given BGP " + "VPN") + pass + + +class ListBgpvpnRouterAssoc(BgpvpnRouterAssoc, ListBgpvpnResAssoc): + _description = _("List BGP VPN router associations for a given BGP VPN") + pass + + +class ShowBgpvpnRouterAssoc(BgpvpnRouterAssoc, ShowBgpvpnResAssoc): + _description = _("Show information of a given BGP VPN router association") + pass diff --git a/neutronclient/tests/unit/osc/v2/networking_bgpvpn/__init__.py b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/__init__.py diff --git a/neutronclient/tests/unit/osc/v2/networking_bgpvpn/fakes.py b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/fakes.py new file mode 100644 index 0000000..2c3bcfa --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/fakes.py @@ -0,0 +1,183 @@ +# Copyright (c) 2016 Juniper Networks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock + +from neutronclient.osc import utils as nc_osc_utils +from neutronclient.osc.v2.networking_bgpvpn import constants +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + CreateBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + DeleteBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + ListBgpvpnResAssoc +from neutronclient.osc.v2.networking_bgpvpn.resource_association import\ + ShowBgpvpnResAssoc +from neutronclient.tests.unit.osc.v2 import fakes as test_fakes + + +class TestNeutronClientBgpvpn(test_fakes.TestNeutronClientOSCV2): + + def setUp(self): + super(TestNeutronClientBgpvpn, self).setUp() + self.neutronclient.find_resource = mock.Mock( + side_effect=lambda resource, name_or_id, project_id=None, + cmd_resource=None, parent_id=None, fields=None: + {'id': name_or_id}) + self.neutronclient.find_resource_by_id = mock.Mock( + side_effect=lambda resource, resource_id, cmd_resource=None, + parent_id=None, fields=None: + {'id': resource_id}) + nc_osc_utils.find_project = mock.Mock( + side_effect=lambda _, name_or_id, __: mock.Mock(id=name_or_id)) + + +class FakeBgpvpn(object): + """Fake BGP VPN with attributes.""" + + @staticmethod + def create_one_bgpvpn(attrs=None): + """Create a fake BGP VPN.""" + + attrs = attrs or {} + + # Set default attributes. + bgpvpn_attrs = { + 'id': 'fake_bgpvpn_id', + 'tenant_id': 'fake_project_id', + 'name': '', + 'type': 'l3', + 'route_targets': [], + 'import_targets': [], + 'export_targets': [], + 'route_distinguishers': [], + 'networks': [], + 'routers': [], + } + + # Overwrite default attributes. + bgpvpn_attrs.update(attrs) + return copy.deepcopy(bgpvpn_attrs) + + @staticmethod + def create_bgpvpns(attrs=None, count=1): + """Create multiple fake BGP VPN.""" + + bgpvpns = [] + for i in range(0, count): + if attrs is None: + attrs = {'id': 'fake_id%d' % i} + elif getattr(attrs, 'id', None) is None: + attrs['id'] = 'fake_id%d' % i + bgpvpns.append(FakeBgpvpn.create_one_bgpvpn(attrs)) + + return {constants.BGPVPNS: bgpvpns} + + +class BgpvpnFakeAssoc(object): + _assoc_res_name = 'fake_resource' + _resource = '%s_association' % _assoc_res_name + _resource_plural = '%ss' % _resource + + _attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('tenant_id', 'Project ID', nc_osc_utils.LIST_LONG_ONLY), + ('%s_id' % _assoc_res_name, '%s ID' % _assoc_res_name.capitalize(), + nc_osc_utils.LIST_BOTH), + ) + _formatters = {} + + +class CreateBgpvpnFakeResAssoc(BgpvpnFakeAssoc, CreateBgpvpnResAssoc): + pass + + +class DeleteBgpvpnFakeResAssoc(BgpvpnFakeAssoc, DeleteBgpvpnResAssoc): + pass + + +class ListBgpvpnFakeResAssoc(BgpvpnFakeAssoc, ListBgpvpnResAssoc): + pass + + +class ShowBgpvpnFakeResAssoc(BgpvpnFakeAssoc, ShowBgpvpnResAssoc): + pass + + +class FakeResource(object): + """Fake resource with minimal attributes.""" + + @staticmethod + def create_one_resource(attrs=None): + """Create a fake resource.""" + + attrs = attrs or {} + + # Set default attributes. + res_attrs = { + 'id': 'fake_resource_id', + 'tenant_id': 'fake_project_id', + } + + # Overwrite default attributes. + res_attrs.update(attrs) + return copy.deepcopy(res_attrs) + + @staticmethod + def create_resources(attrs=None, count=1): + """Create multiple fake resources.""" + + resources = [] + for i in range(0, count): + if attrs is None: + attrs = {'id': 'fake_id%d' % i} + elif getattr(attrs, 'id', None) is None: + attrs['id'] = 'fake_id%d' % i + resources.append(FakeResource.create_one_resource(attrs)) + + return {'%ss' % BgpvpnFakeAssoc._assoc_res_name: resources} + + +class FakeResAssoc(object): + """Fake resource association with minimal attributes.""" + + @staticmethod + def create_one_resource_association(resource): + """Create a fake resource association.""" + + res_assoc_attrs = { + 'id': 'fake_association_id', + 'tenant_id': resource['tenant_id'], + 'fake_resource_id': resource['id'], + } + return copy.deepcopy(res_assoc_attrs) + + @staticmethod + def create_resource_associations(resources): + """Create multiple fake resource associations.""" + + res_assocs = [] + for idx, resource in enumerate( + resources['%ss' % BgpvpnFakeAssoc._assoc_res_name]): + res_assoc_attrs = { + 'id': 'fake_association_id%d' % idx, + 'tenant_id': resource['tenant_id'], + 'fake_resource_id': resource['id'], + } + res_assocs.append(copy.deepcopy(res_assoc_attrs)) + + return {BgpvpnFakeAssoc._resource_plural: res_assocs} diff --git a/neutronclient/tests/unit/osc/v2/networking_bgpvpn/test_bgpvpn.py b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/test_bgpvpn.py new file mode 100644 index 0000000..3d83488 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/test_bgpvpn.py @@ -0,0 +1,515 @@ +# Copyright (c) 2016 Juniper Networks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock +import operator + +from osc_lib import exceptions +from osc_lib import utils as osc_utils + +from neutronclient.osc import utils as nc_osc_utils +from neutronclient.osc.v2.networking_bgpvpn import bgpvpn +from neutronclient.osc.v2.networking_bgpvpn import constants +from neutronclient.tests.unit.osc.v2.networking_bgpvpn import fakes + + +columns_short = tuple(col for col, _, listing_mode in bgpvpn._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_SHORT_ONLY)) +columns_long = tuple(col for col, _, listing_mode in bgpvpn._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_LONG_ONLY)) +headers_short = tuple(head for _, head, listing_mode in bgpvpn._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_SHORT_ONLY)) +headers_long = tuple(head for _, head, listing_mode in bgpvpn._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_LONG_ONLY)) +sorted_attr_map = sorted(bgpvpn._attr_map, key=operator.itemgetter(1)) +sorted_columns = tuple(col for col, _, _ in sorted_attr_map) +sorted_headers = tuple(head for _, head, _ in sorted_attr_map) + + +def _get_data(attrs, columns=sorted_columns): + return osc_utils.get_dict_properties(attrs, columns, + formatters=bgpvpn._formatters) + + +class TestCreateBgpvpn(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestCreateBgpvpn, self).setUp() + self.cmd = bgpvpn.CreateBgpvpn(self.app, self.namespace) + + def test_create_bgpvpn_with_no_args(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + self.neutronclient.create_bgpvpn = mock.Mock( + return_value={constants.BGPVPN: fake_bgpvpn}) + arglist = [] + verifylist = [ + ('project', None), + ('name', None), + ('type', 'l3'), + ('route_targets', None), + ('import_targets', None), + ('export_targets', None), + ('route_distinguishers', None), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + cols, data = self.cmd.take_action(parsed_args) + + self.neutronclient.create_bgpvpn.assert_called_once_with( + {constants.BGPVPN: {'type': 'l3'}}) + self.assertEqual(sorted_headers, cols) + self.assertEqual(_get_data(fake_bgpvpn), data) + + def test_create_bgpvpn_with_all_args(self): + attrs = { + 'tenant_id': 'new_fake_project_id', + 'name': 'fake_name', + 'type': 'l2', + 'route_targets': ['fake_rt1', 'fake_rt2', 'fake_rt3'], + 'import_targets': ['fake_irt1', 'fake_irt2', 'fake_irt3'], + 'export_targets': ['fake_ert1', 'fake_ert2', 'fake_ert3'], + 'route_distinguishers': ['fake_rd1', 'fake_rd2', 'fake_rd3'], + } + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn(attrs) + self.neutronclient.create_bgpvpn = mock.Mock( + return_value={constants.BGPVPN: fake_bgpvpn}) + arglist = [ + '--project', fake_bgpvpn['tenant_id'], + '--name', fake_bgpvpn['name'], + '--type', fake_bgpvpn['type'], + ] + for rt in fake_bgpvpn['route_targets']: + arglist.extend(['--route-target', rt]) + for rt in fake_bgpvpn['import_targets']: + arglist.extend(['--import-target', rt]) + for rt in fake_bgpvpn['export_targets']: + arglist.extend(['--export-target', rt]) + for rd in fake_bgpvpn['route_distinguishers']: + arglist.extend(['--route-distinguisher', rd]) + verifylist = [ + ('project', fake_bgpvpn['tenant_id']), + ('name', fake_bgpvpn['name']), + ('type', fake_bgpvpn['type']), + ('route_targets', fake_bgpvpn['route_targets']), + ('import_targets', fake_bgpvpn['import_targets']), + ('export_targets', fake_bgpvpn['export_targets']), + ('route_distinguishers', fake_bgpvpn['route_distinguishers']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + cols, data = self.cmd.take_action(parsed_args) + + fake_bgpvpn_call = copy.deepcopy(fake_bgpvpn) + fake_bgpvpn_call.pop('id') + fake_bgpvpn_call.pop('networks') + fake_bgpvpn_call.pop('routers') + self.neutronclient.create_bgpvpn.assert_called_once_with( + {constants.BGPVPN: fake_bgpvpn_call}) + self.assertEqual(sorted_headers, cols) + self.assertEqual(_get_data(fake_bgpvpn), data) + + +class TestSetBgpvpn(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestSetBgpvpn, self).setUp() + self.cmd = bgpvpn.SetBgpvpn(self.app, self.namespace) + + def test_set_bgpvpn(self): + attrs = { + 'route_targets': ['set_rt1', 'set_rt2', 'set_rt3'], + 'import_targets': ['set_irt1', 'set_irt2', 'set_irt3'], + 'export_targets': ['set_ert1', 'set_ert2', 'set_ert3'], + 'route_distinguishers': ['set_rd1', 'set_rd2', 'set_rd3'], + } + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn(attrs) + self.neutronclient.show_bgpvpn = mock.Mock( + return_value={constants.BGPVPN: fake_bgpvpn}) + self.neutronclient.update_bgpvpn = mock.Mock() + arglist = [ + fake_bgpvpn['id'], + '--name', 'set_name', + '--route-target', 'set_rt1', + '--import-target', 'set_irt1', + '--export-target', 'set_ert1', + '--route-distinguisher', 'set_rd1', + ] + verifylist = [ + ('bgpvpn', fake_bgpvpn['id']), + ('name', 'set_name'), + ('route_targets', ['set_rt1']), + ('purge_route_target', False), + ('import_targets', ['set_irt1']), + ('purge_import_target', False), + ('export_targets', ['set_ert1']), + ('purge_export_target', False), + ('route_distinguishers', ['set_rd1']), + ('purge_route_distinguisher', False), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'name': 'set_name', + 'route_targets': list(set(fake_bgpvpn['route_targets']) | + set(['set_rt1'])), + 'import_targets': list(set(fake_bgpvpn['import_targets']) | + set(['set_irt1'])), + 'export_targets': list(set(fake_bgpvpn['export_targets']) | + set(['set_ert1'])), + 'route_distinguishers': list( + set(fake_bgpvpn['route_distinguishers']) | set(['set_rd1'])), + } + self.neutronclient.update_bgpvpn.assert_called_once_with( + fake_bgpvpn['id'], {constants.BGPVPN: attrs}) + self.assertIsNone(result) + + def test_set_bgpvpn_with_purge_list(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + self.neutronclient.show_bgpvpn = mock.Mock( + return_value={constants.BGPVPN: fake_bgpvpn}) + self.neutronclient.update_bgpvpn = mock.Mock() + arglist = [ + fake_bgpvpn['id'], + '--route-target', 'set_rt1', + '--no-route-target', + '--import-target', 'set_irt1', + '--no-import-target', + '--export-target', 'set_ert1', + '--no-export-target', + '--route-distinguisher', 'set_rd1', + '--no-route-distinguisher', + ] + verifylist = [ + ('bgpvpn', fake_bgpvpn['id']), + ('route_targets', ['set_rt1']), + ('purge_route_target', True), + ('import_targets', ['set_irt1']), + ('purge_import_target', True), + ('export_targets', ['set_ert1']), + ('purge_export_target', True), + ('route_distinguishers', ['set_rd1']), + ('purge_route_distinguisher', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'route_targets': [], + 'import_targets': [], + 'export_targets': [], + 'route_distinguishers': [], + } + self.neutronclient.update_bgpvpn.assert_called_once_with( + fake_bgpvpn['id'], {constants.BGPVPN: attrs}) + self.assertIsNone(result) + + +class TestUnsetBgpvpn(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestUnsetBgpvpn, self).setUp() + self.cmd = bgpvpn.UnsetBgpvpn(self.app, self.namespace) + + def test_unset_bgpvpn(self): + attrs = { + 'route_targets': ['unset_rt1', 'unset_rt2', 'unset_rt3'], + 'import_targets': ['unset_irt1', 'unset_irt2', 'unset_irt3'], + 'export_targets': ['unset_ert1', 'unset_ert2', 'unset_ert3'], + 'route_distinguishers': ['unset_rd1', 'unset_rd2', 'unset_rd3'], + } + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn(attrs) + self.neutronclient.show_bgpvpn = mock.Mock( + return_value={constants.BGPVPN: fake_bgpvpn}) + self.neutronclient.update_bgpvpn = mock.Mock() + arglist = [ + fake_bgpvpn['id'], + '--route-target', 'unset_rt1', + '--import-target', 'unset_irt1', + '--export-target', 'unset_ert1', + '--route-distinguisher', 'unset_rd1', + ] + verifylist = [ + ('bgpvpn', fake_bgpvpn['id']), + ('route_targets', ['unset_rt1']), + ('purge_route_target', False), + ('import_targets', ['unset_irt1']), + ('purge_import_target', False), + ('export_targets', ['unset_ert1']), + ('purge_export_target', False), + ('route_distinguishers', ['unset_rd1']), + ('purge_route_distinguisher', False), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'route_targets': list(set(fake_bgpvpn['route_targets']) - + set(['unset_rt1'])), + 'import_targets': list(set(fake_bgpvpn['import_targets']) - + set(['unset_irt1'])), + 'export_targets': list(set(fake_bgpvpn['export_targets']) - + set(['unset_ert1'])), + 'route_distinguishers': list( + set(fake_bgpvpn['route_distinguishers']) - set(['unset_rd1'])), + } + self.neutronclient.update_bgpvpn.assert_called_once_with( + fake_bgpvpn['id'], {constants.BGPVPN: attrs}) + self.assertIsNone(result) + + def test_unset_bgpvpn_with_purge_list(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + self.neutronclient.show_bgpvpn = mock.Mock( + return_value={constants.BGPVPN: fake_bgpvpn}) + self.neutronclient.update_bgpvpn = mock.Mock() + arglist = [ + fake_bgpvpn['id'], + '--route-target', 'unset_rt1', + '--all-route-target', + '--import-target', 'unset_irt1', + '--all-import-target', + '--export-target', 'unset_ert1', + '--all-export-target', + '--route-distinguisher', 'unset_rd1', + '--all-route-distinguisher', + ] + verifylist = [ + ('bgpvpn', fake_bgpvpn['id']), + ('route_targets', ['unset_rt1']), + ('purge_route_target', True), + ('import_targets', ['unset_irt1']), + ('purge_import_target', True), + ('export_targets', ['unset_ert1']), + ('purge_export_target', True), + ('route_distinguishers', ['unset_rd1']), + ('purge_route_distinguisher', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'route_targets': [], + 'import_targets': [], + 'export_targets': [], + 'route_distinguishers': [], + } + self.neutronclient.update_bgpvpn.assert_called_once_with( + fake_bgpvpn['id'], {constants.BGPVPN: attrs}) + self.assertIsNone(result) + + +class TestDeleteBgpvpn(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestDeleteBgpvpn, self).setUp() + self.neutronclient.find_resource = mock.Mock( + side_effect=lambda _, name_or_id: {'id': name_or_id}) + self.cmd = bgpvpn.DeleteBgpvpn(self.app, self.namespace) + + def test_delete_one_bgpvpn(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + self.neutronclient.delete_bgpvpn = mock.Mock() + arglist = [ + fake_bgpvpn['id'], + ] + verifylist = [ + ('bgpvpns', [fake_bgpvpn['id']]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.neutronclient.delete_bgpvpn.assert_called_once_with( + fake_bgpvpn['id']) + self.assertIsNone(result) + + def test_delete_multi_bpgvpn(self): + fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=3) + fake_bgpvpn_ids = [fake_bgpvpn['id'] for fake_bgpvpn in + fake_bgpvpns[constants.BGPVPNS]] + self.neutronclient.delete_bgpvpn = mock.Mock() + arglist = fake_bgpvpn_ids + verifylist = [ + ('bgpvpns', fake_bgpvpn_ids), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.neutronclient.delete_bgpvpn.assert_has_calls( + [mock.call(id) for id in fake_bgpvpn_ids]) + self.assertIsNone(result) + + def test_delete_multi_bpgvpn_with_unknown(self): + count = 3 + fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count) + fake_bgpvpn_ids = [fake_bgpvpn['id'] for fake_bgpvpn in + fake_bgpvpns[constants.BGPVPNS]] + + def raise_unknonw_resource(resource_path, name_or_id): + if str(count - 2) in name_or_id: + raise Exception() + self.neutronclient.delete_bgpvpn = mock.Mock( + side_effect=raise_unknonw_resource) + arglist = fake_bgpvpn_ids + verifylist = [ + ('bgpvpns', fake_bgpvpn_ids), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + self.neutronclient.delete_bgpvpn.assert_has_calls( + [mock.call(id) for id in fake_bgpvpn_ids]) + + +class TestListBgpvpn(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestListBgpvpn, self).setUp() + self.cmd = bgpvpn.ListBgpvpn(self.app, self.namespace) + + def test_list_all_bgpvpn(self): + count = 3 + fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count) + self.neutronclient.list_bgpvpns = mock.Mock(return_value=fake_bgpvpns) + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.list_bgpvpns.assert_called_once() + self.assertEqual(headers, list(headers_short)) + self.assertEqual(list(data), + [_get_data(fake_bgpvpn, columns_short) for fake_bgpvpn + in fake_bgpvpns[constants.BGPVPNS]]) + + def test_list_all_bgpvpn_long_mode(self): + count = 3 + fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count) + self.neutronclient.list_bgpvpns = mock.Mock(return_value=fake_bgpvpns) + arglist = [ + '--long', + ] + verifylist = [ + ('long', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.list_bgpvpns.assert_called_once() + self.assertEqual(headers, list(headers_long)) + self.assertEqual(list(data), + [_get_data(fake_bgpvpn, columns_long) for fake_bgpvpn + in fake_bgpvpns[constants.BGPVPNS]]) + + def test_list_project_bgpvpn(self): + count = 3 + project_id = 'list_fake_project_id' + attrs = {'tenant_id': project_id} + fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count, + attrs=attrs) + self.neutronclient.list_bgpvpns = mock.Mock(return_value=fake_bgpvpns) + arglist = [ + '--project', project_id, + ] + verifylist = [ + ('project', project_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.list_bgpvpns.assert_called_once_with( + tenant_id=project_id) + self.assertEqual(headers, list(headers_short)) + self.assertEqual(list(data), + [_get_data(fake_bgpvpn, columns_short) for fake_bgpvpn + in fake_bgpvpns[constants.BGPVPNS]]) + + def test_list_bgpvpn_with_filters(self): + count = 3 + name = 'fake_id0' + layer_type = 'l2' + attrs = {'type': layer_type} + fake_bgpvpns = fakes.FakeBgpvpn.create_bgpvpns(count=count, + attrs=attrs) + returned_bgpvpn = fake_bgpvpns[constants.BGPVPNS][0] + self.neutronclient.list_bgpvpns = mock.Mock( + return_value={constants.BGPVPNS: [returned_bgpvpn]}) + arglist = [ + '--property', 'name=%s' % name, + '--property', 'type=%s' % layer_type, + ] + verifylist = [ + ('property', {'name': name, 'type': layer_type}), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.list_bgpvpns.assert_called_once_with( + name=name, + type=layer_type) + self.assertEqual(headers, list(headers_short)) + self.assertEqual(list(data), + [_get_data(returned_bgpvpn, columns_short)]) + + +class TestShowBgpvpn(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestShowBgpvpn, self).setUp() + self.cmd = bgpvpn.ShowBgpvpn(self.app, self.namespace) + + def test_show_bgpvpn(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + self.neutronclient.show_bgpvpn = mock.Mock( + return_value={constants.BGPVPN: fake_bgpvpn}) + arglist = [ + fake_bgpvpn['id'], + ] + verifylist = [ + ('bgpvpn', fake_bgpvpn['id']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.show_bgpvpn.assert_called_once_with( + fake_bgpvpn['id']) + self.assertEqual(sorted_headers, headers) + self.assertEqual(_get_data(fake_bgpvpn), data) diff --git a/neutronclient/tests/unit/osc/v2/networking_bgpvpn/test_resource_association.py b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/test_resource_association.py new file mode 100644 index 0000000..e7bc8ce --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/networking_bgpvpn/test_resource_association.py @@ -0,0 +1,272 @@ +# Copyright (c) 2016 Juniper Networks Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock +import operator + +from osc_lib import exceptions +from osc_lib import utils as osc_utils + +from neutronclient.osc import utils as nc_osc_utils +from neutronclient.tests.unit.osc.v2.networking_bgpvpn import fakes + + +columns_short = tuple(col for col, _, listing_mode + in fakes.BgpvpnFakeAssoc._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_SHORT_ONLY)) +columns_long = tuple(col for col, _, listing_mode + in fakes.BgpvpnFakeAssoc._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_LONG_ONLY)) +headers_short = tuple(head for _, head, listing_mode + in fakes.BgpvpnFakeAssoc._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_SHORT_ONLY)) +headers_long = tuple(head for _, head, listing_mode + in fakes.BgpvpnFakeAssoc._attr_map + if listing_mode in (nc_osc_utils.LIST_BOTH, + nc_osc_utils.LIST_LONG_ONLY)) +sorted_attr_map = sorted(fakes.BgpvpnFakeAssoc._attr_map, + key=operator.itemgetter(1)) +sorted_columns = tuple(col for col, _, _ in sorted_attr_map) +sorted_headers = tuple(head for _, head, _ in sorted_attr_map) + + +def _get_data(attrs, columns=sorted_columns): + return osc_utils.get_dict_properties( + attrs, columns, formatters=fakes.BgpvpnFakeAssoc._formatters) + + +class TestCreateResAssoc(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestCreateResAssoc, self).setUp() + self.cmd = fakes.CreateBgpvpnFakeResAssoc(self.app, self.namespace) + + def test_create_resource_association(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + fake_res = fakes.FakeResource.create_one_resource() + fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association( + fake_res) + self.neutronclient.create_bgpvpn_fake_resource_assoc = mock.Mock( + return_value={fakes.BgpvpnFakeAssoc._resource: fake_res_assoc}) + arglist = [ + fake_bgpvpn['id'], + fake_res['id'], + '--project', fake_bgpvpn['tenant_id'], + ] + verifylist = [ + ('bgpvpn', fake_bgpvpn['id']), + ('resource', fake_res['id']), + ('project', fake_bgpvpn['tenant_id']) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + cols, data = self.cmd.take_action(parsed_args) + + fake_res_assoc_call = copy.deepcopy(fake_res_assoc) + fake_res_assoc_call.pop('id') + self.neutronclient.create_bgpvpn_fake_resource_assoc.\ + assert_called_once_with( + fake_bgpvpn['id'], + {fakes.BgpvpnFakeAssoc._resource: fake_res_assoc_call}) + self.assertEqual(sorted_headers, cols) + self.assertEqual(_get_data(fake_res_assoc), data) + + +class TestDeleteResAssoc(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestDeleteResAssoc, self).setUp() + self.cmd = fakes.DeleteBgpvpnFakeResAssoc(self.app, self.namespace) + + def test_delete_one_association(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + fake_res = fakes.FakeResource.create_one_resource() + fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association( + fake_res) + self.neutronclient.delete_bgpvpn_fake_resource_assoc = mock.Mock() + arglist = [ + fake_res_assoc['id'], + fake_bgpvpn['id'], + ] + verifylist = [ + ('resource_association_ids', [fake_res_assoc['id']]), + ('bgpvpn', fake_bgpvpn['id']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.neutronclient.delete_bgpvpn_fake_resource_assoc.\ + assert_called_once_with(fake_bgpvpn['id'], fake_res_assoc['id']) + self.assertIsNone(result) + + def test_delete_multi_bpgvpn(self): + count = 3 + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + fake_res = fakes.FakeResource.create_resources(count=count) + fake_res_assocs = fakes.FakeResAssoc.create_resource_associations( + fake_res) + fake_res_assoc_ids = [ + fake_res_assoc['id'] for fake_res_assoc in + fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural] + ] + self.neutronclient.delete_bgpvpn_fake_resource_assoc = mock.Mock() + arglist = \ + fake_res_assoc_ids + [ + fake_bgpvpn['id'] + ] + verifylist = [ + ('resource_association_ids', fake_res_assoc_ids), + ('bgpvpn', fake_bgpvpn['id']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.neutronclient.delete_bgpvpn_fake_resource_assoc.assert_has_calls( + [mock.call(fake_bgpvpn['id'], id) for id in fake_res_assoc_ids]) + self.assertIsNone(result) + + def test_delete_multi_bpgvpn_with_unknown(self): + count = 3 + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + fake_res = fakes.FakeResource.create_resources(count=count) + fake_res_assocs = fakes.FakeResAssoc.create_resource_associations( + fake_res) + fake_res_assoc_ids = [ + fake_res_assoc['id'] for fake_res_assoc in + fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural] + ] + + def raise_unknonw_resource(resource_path, name_or_id): + if str(count - 2) in name_or_id: + raise Exception() + self.neutronclient.delete_bgpvpn_fake_resource_assoc = mock.Mock( + side_effect=raise_unknonw_resource) + arglist = \ + fake_res_assoc_ids + [ + fake_bgpvpn['id'] + ] + verifylist = [ + ('resource_association_ids', fake_res_assoc_ids), + ('bgpvpn', fake_bgpvpn['id']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.assertRaises(exceptions.CommandError, self.cmd.take_action, + parsed_args) + + self.neutronclient.delete_bgpvpn_fake_resource_assoc.assert_has_calls( + [mock.call(fake_bgpvpn['id'], id) for id in fake_res_assoc_ids]) + + +class TestListResAssoc(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestListResAssoc, self).setUp() + self.cmd = fakes.ListBgpvpnFakeResAssoc(self.app, self.namespace) + + def test_list_bgpvpn_associations(self): + count = 3 + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + fake_res = fakes.FakeResource.create_resources(count=count) + fake_res_assocs = fakes.FakeResAssoc.create_resource_associations( + fake_res) + self.neutronclient.list_bgpvpn_fake_resource_assocs = mock.Mock( + return_value=fake_res_assocs) + arglist = [ + fake_bgpvpn['id'], + ] + verifylist = [ + ('bgpvpn', fake_bgpvpn['id']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.list_bgpvpn_fake_resource_assocs.\ + assert_called_once_with(fake_bgpvpn['id'], retrieve_all=True) + self.assertEqual(headers, list(headers_short)) + self.assertEqual( + list(data), + [_get_data(fake_res_assoc, columns_short) for fake_res_assoc + in fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural]]) + + def test_list_bgpvpn_associations_long_mode(self): + count = 3 + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + fake_res = fakes.FakeResource.create_resources(count=count) + fake_res_assocs = fakes.FakeResAssoc.create_resource_associations( + fake_res) + self.neutronclient.list_bgpvpn_fake_resource_assocs = mock.Mock( + return_value=fake_res_assocs) + arglist = [ + '--long', + fake_bgpvpn['id'], + ] + verifylist = [ + ('long', True), + ('bgpvpn', fake_bgpvpn['id']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.list_bgpvpn_fake_resource_assocs.\ + assert_called_once_with(fake_bgpvpn['id'], retrieve_all=True) + self.assertEqual(headers, list(headers_long)) + self.assertEqual( + list(data), + [_get_data(fake_res_assoc, columns_long) for fake_res_assoc + in fake_res_assocs[fakes.BgpvpnFakeAssoc._resource_plural]]) + + +class TestShowResAssoc(fakes.TestNeutronClientBgpvpn): + def setUp(self): + super(TestShowResAssoc, self).setUp() + self.cmd = fakes.ShowBgpvpnFakeResAssoc(self.app, self.namespace) + + def test_show_resource_association(self): + fake_bgpvpn = fakes.FakeBgpvpn.create_one_bgpvpn() + fake_res = fakes.FakeResource.create_one_resource() + fake_res_assoc = fakes.FakeResAssoc.create_one_resource_association( + fake_res) + self.neutronclient.show_bgpvpn_fake_resource_assoc = mock.Mock( + return_value={fakes.BgpvpnFakeAssoc._resource: fake_res_assoc}) + arglist = [ + fake_res_assoc['id'], + fake_bgpvpn['id'], + ] + verifylist = [ + ('resource_association_id', fake_res_assoc['id']), + ('bgpvpn', fake_bgpvpn['id']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + headers, data = self.cmd.take_action(parsed_args) + + self.neutronclient.show_bgpvpn_fake_resource_assoc.\ + assert_called_once_with(fake_bgpvpn['id'], fake_res_assoc['id']) + self.assertEqual(sorted_headers, headers) + self.assertEqual(data, _get_data(fake_res_assoc)) diff --git a/neutronclient/v2_0/client.py b/neutronclient/v2_0/client.py index 728339e..759f776 100644 --- a/neutronclient/v2_0/client.py +++ b/neutronclient/v2_0/client.py @@ -630,6 +630,15 @@ class Client(ClientBase): subports_path = "/trunks/%s/get_subports" subports_add_path = "/trunks/%s/add_subports" subports_remove_path = "/trunks/%s/remove_subports" + bgpvpns_path = "/bgpvpn/bgpvpns" + bgpvpn_path = "/bgpvpn/bgpvpns/%s" + bgpvpn_network_associations_path =\ + "/bgpvpn/bgpvpns/%s/network_associations" + bgpvpn_network_association_path =\ + "/bgpvpn/bgpvpns/%s/network_associations/%s" + bgpvpn_router_associations_path = "/bgpvpn/bgpvpns/%s/router_associations" + bgpvpn_router_association_path =\ + "/bgpvpn/bgpvpns/%s/router_associations/%s" # API has no way to report plurals, so we have to hard code them EXTED_PLURALS = {'routers': 'router', @@ -680,6 +689,9 @@ class Client(ClientBase): 'bgp_peers': 'bgp_peer', 'network_ip_availabilities': 'network_ip_availability', 'trunks': 'trunk', + 'bgpvpns': 'bgpvpn', + 'network_associations': 'network_association', + 'router_associations': 'router_association', } def list_ext(self, collection, path, retrieve_all, **_params): @@ -2071,6 +2083,82 @@ class Client(ClientBase): """Fetch a list of all subports attached to given trunk.""" return self.get(self.subports_path % (trunk), params=_params) + def list_bgpvpns(self, retrieve_all=True, **_params): + """Fetches a list of all BGP VPNs for a project""" + return self.list('bgpvpns', self.bgpvpns_path, retrieve_all, **_params) + + def show_bgpvpn(self, bgpvpn, **_params): + """Fetches information of a certain BGP VPN""" + return self.get(self.bgpvpn_path % bgpvpn, params=_params) + + def create_bgpvpn(self, body=None): + """Creates a new BGP VPN""" + return self.post(self.bgpvpns_path, body=body) + + def update_bgpvpn(self, bgpvpn, body=None): + """Updates a BGP VPN""" + return self.put(self.bgpvpn_path % bgpvpn, body=body) + + def delete_bgpvpn(self, bgpvpn): + """Deletes the specified BGP VPN""" + return self.delete(self.bgpvpn_path % bgpvpn) + + def list_bgpvpn_network_assocs(self, bgpvpn, retrieve_all=True, **_params): + """Fetches a list of network associations for a given BGP VPN.""" + return self.list('network_associations', + self.bgpvpn_network_associations_path % bgpvpn, + retrieve_all, **_params) + + def show_bgpvpn_network_assoc(self, bgpvpn, net_assoc, **_params): + """Fetches information of a certain BGP VPN's network association""" + return self.get( + self.bgpvpn_network_association_path % (bgpvpn, net_assoc), + params=_params) + + def create_bgpvpn_network_assoc(self, bgpvpn, body=None): + """Creates a new BGP VPN network association""" + return self.post(self.bgpvpn_network_associations_path % bgpvpn, + body=body) + + def update_bgpvpn_network_assoc(self, bgpvpn, net_assoc, body=None): + """Updates a BGP VPN network association""" + return self.put( + self.bgpvpn_network_association_path % (bgpvpn, net_assoc), + body=body) + + def delete_bgpvpn_network_assoc(self, bgpvpn, net_assoc): + """Deletes the specified BGP VPN network association""" + return self.delete( + self.bgpvpn_network_association_path % (bgpvpn, net_assoc)) + + def list_bgpvpn_router_assocs(self, bgpvpn, retrieve_all=True, **_params): + """Fetches a list of router associations for a given BGP VPN.""" + return self.list('router_associations', + self.bgpvpn_router_associations_path % bgpvpn, + retrieve_all, **_params) + + def show_bgpvpn_router_assoc(self, bgpvpn, router_assoc, **_params): + """Fetches information of a certain BGP VPN's router association""" + return self.get( + self.bgpvpn_router_association_path % (bgpvpn, router_assoc), + params=_params) + + def create_bgpvpn_router_assoc(self, bgpvpn, body=None): + """Creates a new BGP VPN router association""" + return self.post(self.bgpvpn_router_associations_path % bgpvpn, + body=body) + + def update_bgpvpn_router_assoc(self, bgpvpn, router_assoc, body=None): + """Updates a BGP VPN router association""" + return self.put( + self.bgpvpn_router_association_path % (bgpvpn, router_assoc), + body=body) + + def delete_bgpvpn_router_assoc(self, bgpvpn, router_assoc): + """Deletes the specified BGP VPN router association""" + return self.delete( + self.bgpvpn_router_association_path % (bgpvpn, router_assoc)) + def __init__(self, **kwargs): """Initialize a new client for the Neutron v2.0 API.""" super(Client, self).__init__(**kwargs) |