summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAkihiro Motoki <motoki@da.jp.nec.com>2015-03-01 19:37:40 +0900
committerAkihiro Motoki <motoki@da.jp.nec.com>2015-03-15 16:39:11 +0900
commit4829e25b6a68df0372a3b84c3cc7c9b680ced669 (patch)
tree1cfbf7707b079ba3588622c35765c2df466921cc
parent5a6e6089265a5749641a4a91e1c4c877b9e8b314 (diff)
downloadpython-neutronclient-4829e25b6a68df0372a3b84c3cc7c9b680ced669.tar.gz
security-group-rule-list: show all info of rules briefly
Previously security-group-rule-list does not display full information of rules by default (e.g., port ranges, ether types) and users need to use security-group-rule-show to check details. It is not convenient. This commit introduces some aggregated columns ("protocol/port" and "remote") to show infomration briefly and as a result full attributes of rules will be displayed. Closes-Bug: #1182629 Change-Id: I047bf9a1ccba5b023d66f22ef5256f7786196113
-rw-r--r--neutronclient/neutron/v2_0/securitygroup.py65
-rw-r--r--neutronclient/tests/functional/test_readonly_neutron.py5
-rw-r--r--neutronclient/tests/unit/test_cli20_securitygroup.py205
3 files changed, 200 insertions, 75 deletions
diff --git a/neutronclient/neutron/v2_0/securitygroup.py b/neutronclient/neutron/v2_0/securitygroup.py
index bafa3b0..65e05c9 100644
--- a/neutronclient/neutron/v2_0/securitygroup.py
+++ b/neutronclient/neutron/v2_0/securitygroup.py
@@ -21,6 +21,16 @@ from neutronclient.i18n import _
from neutronclient.neutron import v2_0 as neutronV20
+def _get_remote(rule):
+ if rule['remote_ip_prefix']:
+ remote = '%s (CIDR)' % rule['remote_ip_prefix']
+ elif rule['remote_group_id']:
+ remote = '%s (group)' % rule['remote_group_id']
+ else:
+ remote = None
+ return remote
+
+
def _get_protocol_port(rule):
proto = rule['protocol']
port_min = rule['port_range_min']
@@ -157,10 +167,19 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
"""List security group rules that belong to a given tenant."""
resource = 'security_group_rule'
- list_columns = ['id', 'security_group_id', 'direction', 'protocol',
- 'remote_ip_prefix', 'remote_group_id']
+ list_columns = ['id', 'security_group_id', 'direction',
+ 'ethertype', 'protocol/port', 'remote']
+ # replace_rules: key is an attribute name in Neutron API and
+ # corresponding value is a display name shown by CLI.
replace_rules = {'security_group_id': 'security_group',
'remote_group_id': 'remote_group'}
+ digest_fields = {
+ 'remote': {
+ 'method': _get_remote,
+ 'depends_on': ['remote_ip_prefix', 'remote_group_id']},
+ 'protocol/port': {
+ 'method': _get_protocol_port,
+ 'depends_on': ['protocol', 'port_range_min', 'port_range_max']}}
pagination_support = True
sorting_support = True
@@ -177,19 +196,28 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
rules = dict((rules[k], k) for k in rules.keys())
return [rules.get(col, col) for col in cols]
+ def get_required_fields(self, fields):
+ fields = self.replace_columns(fields, self.replace_rules, reverse=True)
+ for field, digest_fields in self.digest_fields.items():
+ if field in fields:
+ fields += digest_fields['depends_on']
+ fields.remove(field)
+ return fields
+
def retrieve_list(self, parsed_args):
- parsed_args.fields = self.replace_columns(parsed_args.fields,
- self.replace_rules,
- reverse=True)
+ parsed_args.fields = self.get_required_fields(parsed_args.fields)
return super(ListSecurityGroupRule, self).retrieve_list(parsed_args)
- def extend_list(self, data, parsed_args):
- if parsed_args.no_nameconv:
- return
+ def _get_sg_name_dict(self, data, page_size, no_nameconv):
+ """Get names of security groups referred in the retrieved rules.
+
+ :return: a dict from secgroup ID to secgroup name
+ """
+ if no_nameconv:
+ return {}
neutron_client = self.get_client()
search_opts = {'fields': ['id', 'name']}
if self.pagination_support:
- page_size = parsed_args.page_size
if page_size:
search_opts.update({'limit': page_size})
sec_group_ids = set()
@@ -222,14 +250,28 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
secgroups.extend(
_get_sec_group_list(sec_group_ids[i: i + chunk_size]))
- sg_dict = dict([(sg['id'], sg['name'])
- for sg in secgroups if sg['name']])
+ return dict([(sg['id'], sg['name'])
+ for sg in secgroups if sg['name']])
+
+ @staticmethod
+ def _has_fileds(rule, required_fileds):
+ return all([key in rule for key in required_fileds])
+
+ def extend_list(self, data, parsed_args):
+ sg_dict = self._get_sg_name_dict(data, parsed_args.page_size,
+ parsed_args.no_nameconv)
for rule in data:
+ # Replace security group UUID with its name.
for key in self.replace_rules:
if key in rule:
rule[key] = sg_dict.get(rule[key], rule[key])
+ for field, digest_rule in self.digest_fields.items():
+ if self._has_fileds(rule, digest_rule['depends_on']):
+ rule[field] = digest_rule['method'](rule) or 'any'
def setup_columns(self, info, parsed_args):
+ # Translate the specified columns from the command line
+ # into field names used in "info".
parsed_args.columns = self.replace_columns(parsed_args.columns,
self.replace_rules,
reverse=True)
@@ -240,6 +282,7 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
parsed_args)
cols = info[0]
if not parsed_args.no_nameconv:
+ # Replace column names in the header line (in info[0])
cols = self.replace_columns(info[0], self.replace_rules)
parsed_args.columns = cols
return (cols, info[1])
diff --git a/neutronclient/tests/functional/test_readonly_neutron.py b/neutronclient/tests/functional/test_readonly_neutron.py
index eae8cd2..b436b0b 100644
--- a/neutronclient/tests/functional/test_readonly_neutron.py
+++ b/neutronclient/tests/functional/test_readonly_neutron.py
@@ -115,9 +115,8 @@ class SimpleReadOnlyNeutronClientTest(base.ClientTestBase):
security_grp = self.parser.listing(self.neutron
('security-group-rule-list'))
self.assertTableStruct(security_grp, ['id', 'security_group',
- 'direction', 'protocol',
- 'remote_ip_prefix',
- 'remote_group'])
+ 'direction', 'ethertype',
+ 'protocol/port', 'remote'])
def test_neutron_subnet_list(self):
subnet_list = self.parser.listing(self.neutron('subnet-list'))
diff --git a/neutronclient/tests/unit/test_cli20_securitygroup.py b/neutronclient/tests/unit/test_cli20_securitygroup.py
index 400e2b8..5fc715b 100644
--- a/neutronclient/tests/unit/test_cli20_securitygroup.py
+++ b/neutronclient/tests/unit/test_cli20_securitygroup.py
@@ -332,9 +332,9 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
self._test_show_resource(resource, cmd, self.test_id,
args, ['id'])
- def _test_list_security_group_rules_extend(self, data=None, expected=None,
+ def _test_list_security_group_rules_extend(self, api_data, expected,
args=(), conv=True,
- query_field=False):
+ query_fields=None):
def setup_list_stub(resources, data, query):
reses = {resources: data}
resstr = self.client.serialize(reses)
@@ -349,38 +349,22 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
headers=mox.ContainsKeyValue(
'X-Auth-Token', test_cli20.TOKEN)).AndReturn(resp)
- # Setup the default data
- _data = {'cols': ['id', 'security_group_id', 'remote_group_id'],
- 'data': [('ruleid1', 'myid1', 'myid1'),
- ('ruleid2', 'myid2', 'myid3'),
- ('ruleid3', 'myid2', 'myid2')]}
- _expected = {'cols': ['id', 'security_group', 'remote_group'],
- 'data': [('ruleid1', 'group1', 'group1'),
- ('ruleid2', 'group2', 'group3'),
- ('ruleid3', 'group2', 'group2')]}
- if data is None:
- data = _data
- list_data = [dict(zip(data['cols'], d)) for d in data['data']]
- if expected is None:
- expected = {}
- expected['cols'] = expected.get('cols', _expected['cols'])
- expected['data'] = expected.get('data', _expected['data'])
-
cmd = securitygroup.ListSecurityGroupRule(
test_cli20.MyApp(sys.stdout), None)
self.mox.StubOutWithMock(cmd, 'get_client')
self.mox.StubOutWithMock(self.client.httpclient, 'request')
cmd.get_client().AndReturn(self.client)
query = ''
- if query_field:
- query = '&'.join(['fields=' + f for f in data['cols']])
- setup_list_stub('security_group_rules', list_data, query)
+ if query_fields:
+ query = '&'.join(['fields=' + f for f in query_fields])
+ setup_list_stub('security_group_rules', api_data, query)
if conv:
cmd.get_client().AndReturn(self.client)
sec_ids = set()
- for n in data['data']:
- sec_ids.add(n[1])
- sec_ids.add(n[2])
+ for n in api_data:
+ sec_ids.add(n['security_group_id'])
+ if n.get('remote_group_id'):
+ sec_ids.add(n['remote_group_id'])
filters = ''
for id in sec_ids:
filters = filters + "&id=%s" % id
@@ -397,50 +381,131 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
self.mox.VerifyAll()
self.mox.UnsetStubs()
# Check columns
- self.assertEqual(result[0], expected['cols'])
+ self.assertEqual(expected['cols'], result[0])
# Check data
_result = [x for x in result[1]]
self.assertEqual(len(_result), len(expected['data']))
for res, exp in zip(_result, expected['data']):
- self.assertEqual(len(res), len(exp))
- self.assertEqual(res, exp)
-
- def test_list_security_group_rules_extend_source_id(self):
- self._test_list_security_group_rules_extend()
-
- def test_list_security_group_rules_extend_no_nameconv(self):
- expected = {'cols': ['id', 'security_group_id', 'remote_group_id'],
- 'data': [('ruleid1', 'myid1', 'myid1'),
- ('ruleid2', 'myid2', 'myid3'),
- ('ruleid3', 'myid2', 'myid2')]}
- args = ['--no-nameconv']
- self._test_list_security_group_rules_extend(expected=expected,
- args=args, conv=False)
-
- def test_list_security_group_rules_extend_with_columns(self):
+ self.assertEqual(len(exp), len(res))
+ self.assertEqual(exp, res)
+
+ def _test_list_security_group_rules_extend_sg_name(
+ self, expected_mode=None, args=(), conv=True, query_field=False):
+ if query_field:
+ field_filters = ['id', 'security_group_id',
+ 'remote_ip_prefix', 'remote_group_id']
+ else:
+ field_filters = None
+
+ data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
+ remote_group_id='myid1',
+ filters=field_filters),
+ self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
+ remote_group_id='myid3',
+ filters=field_filters),
+ self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
+ remote_group_id='myid2',
+ filters=field_filters),
+ ]
+
+ if expected_mode == 'noconv':
+ expected = {'cols': ['id', 'security_group_id', 'remote_group_id'],
+ 'data': [('ruleid1', 'myid1', 'myid1'),
+ ('ruleid2', 'myid2', 'myid3'),
+ ('ruleid3', 'myid2', 'myid2')]}
+ elif expected_mode == 'remote_group_id':
+ expected = {'cols': ['id', 'security_group', 'remote_group'],
+ 'data': [('ruleid1', 'group1', 'group1'),
+ ('ruleid2', 'group2', 'group3'),
+ ('ruleid3', 'group2', 'group2')]}
+ else:
+ expected = {'cols': ['id', 'security_group', 'remote'],
+ 'data': [('ruleid1', 'group1', 'group1 (group)'),
+ ('ruleid2', 'group2', 'group3 (group)'),
+ ('ruleid3', 'group2', 'group2 (group)')]}
+
+ self._test_list_security_group_rules_extend(
+ data, expected, args=args, conv=conv, query_fields=field_filters)
+
+ def test_list_security_group_rules_extend_remote_sg_name(self):
+ args = '-c id -c security_group -c remote'.split()
+ self._test_list_security_group_rules_extend_sg_name(args=args)
+
+ def test_list_security_group_rules_extend_sg_name_noconv(self):
+ args = '--no-nameconv -c id -c security_group_id -c remote_group_id'
+ args = args.split()
+ self._test_list_security_group_rules_extend_sg_name(
+ expected_mode='noconv', args=args, conv=False)
+
+ def test_list_security_group_rules_extend_sg_name_with_columns(self):
args = '-c id -c security_group_id -c remote_group_id'.split()
- self._test_list_security_group_rules_extend(args=args)
+ self._test_list_security_group_rules_extend_sg_name(
+ expected_mode='remote_group_id', args=args)
- def test_list_security_group_rules_extend_with_columns_no_id(self):
+ def test_list_security_group_rules_extend_sg_name_with_columns_no_id(self):
args = '-c id -c security_group -c remote_group'.split()
- self._test_list_security_group_rules_extend(args=args)
-
- def test_list_security_group_rules_extend_with_fields(self):
- args = '-F id -F security_group_id -F remote_group_id'.split()
- self._test_list_security_group_rules_extend(args=args,
- query_field=True)
-
- def test_list_security_group_rules_extend_with_fields_no_id(self):
- args = '-F id -F security_group -F remote_group'.split()
- self._test_list_security_group_rules_extend(args=args,
- query_field=True)
-
- def _prepare_rule(self, direction=None, ethertype=None,
+ self._test_list_security_group_rules_extend_sg_name(
+ expected_mode='remote_group_id', args=args)
+
+ def test_list_security_group_rules_extend_sg_name_with_fields(self):
+ # NOTE: remote_ip_prefix is required to show "remote" column
+ args = ('-F id -F security_group_id '
+ '-F remote_ip_prefix -F remote_group_id').split()
+ self._test_list_security_group_rules_extend_sg_name(
+ args=args, query_field=True)
+
+ def test_list_security_group_rules_extend_sg_name_with_fields_no_id(self):
+ # NOTE: remote_ip_prefix is required to show "remote" column
+ args = ('-F id -F security_group '
+ '-F remote_ip_prefix -F remote_group').split()
+ self._test_list_security_group_rules_extend_sg_name(args=args,
+ query_field=True)
+
+ def test_list_security_group_rules_extend_remote(self):
+ args = '-c id -c security_group -c remote'.split()
+
+ data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
+ remote_ip_prefix='172.16.18.0/24'),
+ self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
+ remote_ip_prefix='172.16.20.0/24'),
+ self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
+ remote_group_id='myid3')]
+ expected = {'cols': ['id', 'security_group', 'remote'],
+ 'data': [('ruleid1', 'group1', '172.16.18.0/24 (CIDR)'),
+ ('ruleid2', 'group2', '172.16.20.0/24 (CIDR)'),
+ ('ruleid3', 'group2', 'group3 (group)')]}
+ self._test_list_security_group_rules_extend(data, expected, args)
+
+ def test_list_security_group_rules_extend_proto_port(self):
+ data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
+ protocol='tcp',
+ port_range_min=22, port_range_max=22),
+ self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
+ direction='egress', ethertype='IPv6',
+ protocol='udp',
+ port_range_min=80, port_range_max=81),
+ self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
+ protocol='icmp',
+ remote_ip_prefix='10.2.0.0/16')]
+ expected = {
+ 'cols': ['id', 'security_group', 'direction', 'ethertype',
+ 'protocol/port', 'remote'],
+ 'data': [
+ ('ruleid1', 'group1', 'ingress', 'IPv4', '22/tcp', 'any'),
+ ('ruleid2', 'group2', 'egress', 'IPv6', '80-81/udp', 'any'),
+ ('ruleid3', 'group2', 'ingress', 'IPv4', 'icmp',
+ '10.2.0.0/16 (CIDR)')
+ ]}
+ self._test_list_security_group_rules_extend(data, expected)
+
+ def _prepare_rule(self, rule_id=None, sg_id=None, tenant_id=None,
+ direction=None, ethertype=None,
protocol=None, port_range_min=None, port_range_max=None,
- remote_ip_prefix=None, remote_group_id=None):
- return {'id': str(uuid.uuid4()),
- 'tenant_id': str(uuid.uuid4()),
- 'security_group_id': str(uuid.uuid4()),
+ remote_ip_prefix=None, remote_group_id=None,
+ filters=None):
+ rule = {'id': rule_id or str(uuid.uuid4()),
+ 'tenant_id': tenant_id or str(uuid.uuid4()),
+ 'security_group_id': sg_id or str(uuid.uuid4()),
'direction': direction or 'ingress',
'ethertype': ethertype or 'IPv4',
'protocol': protocol,
@@ -448,6 +513,24 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
'port_range_max': port_range_max,
'remote_ip_prefix': remote_ip_prefix,
'remote_group_id': remote_group_id}
+ if filters:
+ return dict([(k, v) for k, v in rule.items() if k in filters])
+ else:
+ return rule
+
+ def test__get_remote_both_unspecified(self):
+ sg_rule = self._prepare_rule(remote_ip_prefix=None,
+ remote_group_id=None)
+ self.assertIsNone(securitygroup._get_remote(sg_rule))
+
+ def test__get_remote_remote_ip_prefix_specified(self):
+ sg_rule = self._prepare_rule(remote_ip_prefix='172.16.18.0/24')
+ self.assertEqual('172.16.18.0/24 (CIDR)',
+ securitygroup._get_remote(sg_rule))
+
+ def test__get_remote_remote_group_specified(self):
+ sg_rule = self._prepare_rule(remote_group_id='sg_id1')
+ self.assertEqual('sg_id1 (group)', securitygroup._get_remote(sg_rule))
def test__get_protocol_port_all_none(self):
sg_rule = self._prepare_rule()