summaryrefslogtreecommitdiff
path: root/openstackclient/tests
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient/tests')
-rw-r--r--openstackclient/tests/functional/compute/v2/test_server.py4
-rw-r--r--openstackclient/tests/unit/api/test_compute_v2.py228
-rw-r--r--openstackclient/tests/unit/compute/v2/fakes.py15
-rw-r--r--openstackclient/tests/unit/compute/v2/test_server.py45
-rw-r--r--openstackclient/tests/unit/fakes.py3
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip_compute.py265
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip_network.py (renamed from openstackclient/tests/unit/network/v2/test_floating_ip.py)253
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py (renamed from openstackclient/tests/unit/network/v2/test_floating_ip_pool.py)34
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip_pool_network.py46
-rw-r--r--openstackclient/tests/unit/network/v2/test_router.py29
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group_compute.py405
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group_network.py (renamed from openstackclient/tests/unit/network/v2/test_security_group.py)382
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py572
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group_rule_network.py (renamed from openstackclient/tests/unit/network/v2/test_security_group_rule.py)535
14 files changed, 1584 insertions, 1232 deletions
diff --git a/openstackclient/tests/functional/compute/v2/test_server.py b/openstackclient/tests/functional/compute/v2/test_server.py
index f152de80..dd257e9a 100644
--- a/openstackclient/tests/functional/compute/v2/test_server.py
+++ b/openstackclient/tests/functional/compute/v2/test_server.py
@@ -371,7 +371,7 @@ class ServerTests(common.ComputeTestCase):
server_name = uuid.uuid4().hex
server = json.loads(self.openstack(
# auto/none enable in nova micro version (v2.37+)
- '--os-compute-api-version 2.latest ' +
+ '--os-compute-api-version 2.37 ' +
'server create -f json ' +
'--flavor ' + self.flavor_name + ' ' +
'--image ' + self.image_name + ' ' +
@@ -394,7 +394,7 @@ class ServerTests(common.ComputeTestCase):
try:
self.openstack(
# auto/none enable in nova micro version (v2.37+)
- '--os-compute-api-version 2.latest ' +
+ '--os-compute-api-version 2.37 ' +
'server create -f json ' +
'--flavor ' + self.flavor_name + ' ' +
'--image ' + self.image_name + ' ' +
diff --git a/openstackclient/tests/unit/api/test_compute_v2.py b/openstackclient/tests/unit/api/test_compute_v2.py
new file mode 100644
index 00000000..f443e810
--- /dev/null
+++ b/openstackclient/tests/unit/api/test_compute_v2.py
@@ -0,0 +1,228 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Compute v2 API Library Tests"""
+
+from requests_mock.contrib import fixture
+
+from keystoneclient import session
+from openstackclient.api import compute_v2 as compute
+from openstackclient.tests.unit import utils
+from osc_lib import exceptions as osc_lib_exceptions
+
+
+FAKE_PROJECT = 'xyzpdq'
+FAKE_URL = 'http://gopher.com/v2'
+
+
+class TestComputeAPIv2(utils.TestCase):
+
+ def setUp(self):
+ super(TestComputeAPIv2, self).setUp()
+ sess = session.Session()
+ self.api = compute.APIv2(session=sess, endpoint=FAKE_URL)
+ self.requests_mock = self.useFixture(fixture.Fixture())
+
+
+class TestSecurityGroup(TestComputeAPIv2):
+
+ FAKE_SECURITY_GROUP_RESP = {
+ 'id': '1',
+ 'name': 'sg1',
+ 'description': 'test security group',
+ 'tenant_id': '0123456789',
+ 'rules': []
+ }
+ FAKE_SECURITY_GROUP_RESP_2 = {
+ 'id': '2',
+ 'name': 'sg2',
+ 'description': 'another test security group',
+ 'tenant_id': '0123456789',
+ 'rules': []
+ }
+ LIST_SECURITY_GROUP_RESP = [
+ FAKE_SECURITY_GROUP_RESP_2,
+ FAKE_SECURITY_GROUP_RESP,
+ ]
+
+ def test_security_group_create_default(self):
+ self.requests_mock.register_uri(
+ 'POST',
+ FAKE_URL + '/os-security-groups',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_create('sg1')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_create_options(self):
+ self.requests_mock.register_uri(
+ 'POST',
+ FAKE_URL + '/os-security-groups',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_create(
+ name='sg1',
+ description='desc',
+ )
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_delete_id(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'DELETE',
+ FAKE_URL + '/os-security-groups/1',
+ status_code=202,
+ )
+ ret = self.api.security_group_delete('1')
+ self.assertEqual(202, ret.status_code)
+ self.assertEqual("", ret.text)
+
+ def test_security_group_delete_name(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg1',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'DELETE',
+ FAKE_URL + '/os-security-groups/1',
+ status_code=202,
+ )
+ ret = self.api.security_group_delete('sg1')
+ self.assertEqual(202, ret.status_code)
+ self.assertEqual("", ret.text)
+
+ def test_security_group_delete_not_found(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg3',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.assertRaises(
+ osc_lib_exceptions.NotFound,
+ self.api.security_group_delete,
+ 'sg3',
+ )
+
+ def test_security_group_find_id(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_find('1')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_find_name(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg2',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_find('sg2')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret)
+
+ def test_security_group_find_not_found(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg3',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.assertRaises(
+ osc_lib_exceptions.NotFound,
+ self.api.security_group_find,
+ 'sg3',
+ )
+
+ def test_security_group_list_no_options(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_list()
+ self.assertEqual(self.LIST_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_set_options_id(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'PUT',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_set(
+ security_group='1',
+ description='desc2')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_set_options_name(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg2',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'PUT',
+ FAKE_URL + '/os-security-groups/2',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP_2},
+ status_code=200,
+ )
+ ret = self.api.security_group_set(
+ security_group='sg2',
+ description='desc2')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret)
diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py
index 4a194859..05cb5076 100644
--- a/openstackclient/tests/unit/compute/v2/fakes.py
+++ b/openstackclient/tests/unit/compute/v2/fakes.py
@@ -17,6 +17,7 @@ import copy
import mock
import uuid
+from openstackclient.api import compute_v2
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
from openstackclient.tests.unit.image.v2 import fakes as image_fakes
@@ -180,9 +181,6 @@ class FakeComputev2Client(object):
self.hypervisors_stats = mock.Mock()
self.hypervisors_stats.resource_class = fakes.FakeResource(None, {})
- self.security_groups = mock.Mock()
- self.security_groups.resource_class = fakes.FakeResource(None, {})
-
self.security_group_rules = mock.Mock()
self.security_group_rules.resource_class = fakes.FakeResource(None, {})
@@ -222,6 +220,11 @@ class TestComputev2(utils.TestCommand):
token=fakes.AUTH_TOKEN,
)
+ self.app.client_manager.compute.api = compute_v2.APIv2(
+ session=self.app.client_manager.session,
+ endpoint=fakes.AUTH_URL,
+ )
+
self.app.client_manager.identity = identity_fakes.FakeIdentityv2Client(
endpoint=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN,
@@ -485,11 +488,7 @@ class FakeSecurityGroup(object):
# Overwrite default attributes.
security_group_attrs.update(attrs)
-
- security_group = fakes.FakeResource(
- info=copy.deepcopy(security_group_attrs),
- loaded=True)
- return security_group
+ return security_group_attrs
@staticmethod
def create_security_groups(attrs=None, count=2):
diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py
index fed847f1..71288a31 100644
--- a/openstackclient/tests/unit/compute/v2/test_server.py
+++ b/openstackclient/tests/unit/compute/v2/test_server.py
@@ -41,11 +41,6 @@ class TestServer(compute_fakes.TestComputev2):
self.flavors_mock = self.app.client_manager.compute.flavors
self.flavors_mock.reset_mock()
- # Get a shortcut to the compute client SecurityGroupManager Mock
- self.security_groups_mock = \
- self.app.client_manager.compute.security_groups
- self.security_groups_mock.reset_mock()
-
# Get a shortcut to the image client ImageManager Mock
self.images_mock = self.app.client_manager.image.images
self.images_mock.reset_mock()
@@ -232,6 +227,9 @@ class TestServerAddPort(TestServer):
self.find_port.assert_not_called()
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_find'
+)
class TestServerAddSecurityGroup(TestServer):
def setUp(self):
@@ -239,11 +237,9 @@ class TestServerAddSecurityGroup(TestServer):
self.security_group = \
compute_fakes.FakeSecurityGroup.create_one_security_group()
- # This is the return value for utils.find_resource() for security group
- self.security_groups_mock.get.return_value = self.security_group
attrs = {
- 'security_groups': [{'name': self.security_group.id}]
+ 'security_groups': [{'name': self.security_group['id']}]
}
methods = {
'add_security_group': None,
@@ -259,23 +255,24 @@ class TestServerAddSecurityGroup(TestServer):
# Get the command object to test
self.cmd = server.AddServerSecurityGroup(self.app, None)
- def test_server_add_security_group(self):
+ def test_server_add_security_group(self, sg_find_mock):
+ sg_find_mock.return_value = self.security_group
arglist = [
self.server.id,
- self.security_group.id
+ self.security_group['id']
]
verifylist = [
('server', self.server.id),
- ('group', self.security_group.id),
+ ('group', self.security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.security_groups_mock.get.assert_called_with(
- self.security_group.id,
+ sg_find_mock.assert_called_with(
+ self.security_group['id'],
)
self.servers_mock.get.assert_called_with(self.server.id)
self.server.add_security_group.assert_called_with(
- self.security_group.id,
+ self.security_group['id'],
)
self.assertIsNone(result)
@@ -1716,6 +1713,9 @@ class TestServerRemovePort(TestServer):
self.find_port.assert_not_called()
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_find'
+)
class TestServerRemoveSecurityGroup(TestServer):
def setUp(self):
@@ -1723,11 +1723,9 @@ class TestServerRemoveSecurityGroup(TestServer):
self.security_group = \
compute_fakes.FakeSecurityGroup.create_one_security_group()
- # This is the return value for utils.find_resource() for security group
- self.security_groups_mock.get.return_value = self.security_group
attrs = {
- 'security_groups': [{'name': self.security_group.id}]
+ 'security_groups': [{'name': self.security_group['id']}]
}
methods = {
'remove_security_group': None,
@@ -1743,23 +1741,24 @@ class TestServerRemoveSecurityGroup(TestServer):
# Get the command object to test
self.cmd = server.RemoveServerSecurityGroup(self.app, None)
- def test_server_remove_security_group(self):
+ def test_server_remove_security_group(self, sg_find_mock):
+ sg_find_mock.return_value = self.security_group
arglist = [
self.server.id,
- self.security_group.id
+ self.security_group['id']
]
verifylist = [
('server', self.server.id),
- ('group', self.security_group.id),
+ ('group', self.security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.security_groups_mock.get.assert_called_with(
- self.security_group.id,
+ sg_find_mock.assert_called_with(
+ self.security_group['id'],
)
self.servers_mock.get.assert_called_with(self.server.id)
self.server.remove_security_group.assert_called_with(
- self.security_group.id,
+ self.security_group['id'],
)
self.assertIsNone(result)
diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py
index f28f9103..999694b7 100644
--- a/openstackclient/tests/unit/fakes.py
+++ b/openstackclient/tests/unit/fakes.py
@@ -228,6 +228,9 @@ class FakeResource(object):
def get(self, item, default=None):
return self._info.get(item, default)
+ def pop(self, key, default_value=None):
+ return self.info.pop(key, default_value)
+
class FakeResponse(requests.Response):
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py b/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py
new file mode 100644
index 00000000..23cd82d2
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip_compute.py
@@ -0,0 +1,265 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import floating_ip as fip
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+# Tests for Nova network
+
+class TestFloatingIPCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestFloatingIPCompute, self).setUp()
+
+ # Get a shortcut to the compute client
+ self.compute = self.app.client_manager.compute
+
+
+class TestCreateFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ip to be deleted.
+ floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
+
+ columns = (
+ 'fixed_ip',
+ 'id',
+ 'instance_id',
+ 'ip',
+ 'pool',
+ )
+
+ data = (
+ floating_ip.fixed_ip,
+ floating_ip.id,
+ floating_ip.instance_id,
+ floating_ip.ip,
+ floating_ip.pool,
+ )
+
+ def setUp(self):
+ super(TestCreateFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.floating_ips.create.return_value = self.floating_ip
+
+ # Get the command object to test
+ self.cmd = fip.CreateFloatingIP(self.app, None)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ self.floating_ip.pool,
+ ]
+ verifylist = [
+ ('network', self.floating_ip.pool),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.floating_ips.create.assert_called_once_with(
+ self.floating_ip.pool)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ips to be deleted.
+ floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2)
+
+ def setUp(self):
+ super(TestDeleteFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.floating_ips.delete.return_value = None
+
+ # Return value of utils.find_resource()
+ self.compute.floating_ips.get = (
+ compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
+
+ # Get the command object to test
+ self.cmd = fip.DeleteFloatingIP(self.app, None)
+
+ def test_floating_ip_delete(self):
+ arglist = [
+ self.floating_ips[0].id,
+ ]
+ verifylist = [
+ ('floating_ip', [self.floating_ips[0].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.floating_ips.delete.assert_called_once_with(
+ self.floating_ips[0].id
+ )
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete(self):
+ arglist = []
+ verifylist = []
+
+ for f in self.floating_ips:
+ arglist.append(f.id)
+ verifylist = [
+ ('floating_ip', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for f in self.floating_ips:
+ calls.append(call(f.id))
+ self.compute.floating_ips.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete_with_exception(self):
+ arglist = [
+ self.floating_ips[0].id,
+ 'unexist_floating_ip',
+ ]
+ verifylist = [
+ ('floating_ip',
+ [self.floating_ips[0].id, 'unexist_floating_ip']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.floating_ips[0], exceptions.CommandError]
+ self.compute.floating_ips.get = (
+ mock.Mock(side_effect=find_mock_result)
+ )
+ self.compute.floating_ips.find.side_effect = exceptions.NotFound(None)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
+
+ self.compute.floating_ips.get.assert_any_call(
+ self.floating_ips[0].id)
+ self.compute.floating_ips.get.assert_any_call(
+ 'unexist_floating_ip')
+ self.compute.floating_ips.delete.assert_called_once_with(
+ self.floating_ips[0].id
+ )
+
+
+class TestListFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ips to be list up
+ floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3)
+
+ columns = (
+ 'ID',
+ 'Floating IP Address',
+ 'Fixed IP Address',
+ 'Server',
+ 'Pool',
+ )
+
+ data = []
+ for ip in floating_ips:
+ data.append((
+ ip.id,
+ ip.ip,
+ ip.fixed_ip,
+ ip.instance_id,
+ ip.pool,
+ ))
+
+ def setUp(self):
+ super(TestListFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.floating_ips.list.return_value = self.floating_ips
+
+ # Get the command object to test
+ self.cmd = fip.ListFloatingIP(self.app, None)
+
+ def test_floating_ip_list(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.floating_ips.list.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestShowFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ip to display.
+ floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
+
+ columns = (
+ 'fixed_ip',
+ 'id',
+ 'instance_id',
+ 'ip',
+ 'pool',
+ )
+
+ data = (
+ floating_ip.fixed_ip,
+ floating_ip.id,
+ floating_ip.instance_id,
+ floating_ip.ip,
+ floating_ip.pool,
+ )
+
+ def setUp(self):
+ super(TestShowFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Return value of utils.find_resource()
+ self.compute.floating_ips.get.return_value = self.floating_ip
+
+ # Get the command object to test
+ self.cmd = fip.ShowFloatingIP(self.app, None)
+
+ def test_floating_ip_show(self):
+ arglist = [
+ self.floating_ip.id,
+ ]
+ verifylist = [
+ ('floating_ip', self.floating_ip.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip.py b/openstackclient/tests/unit/network/v2/test_floating_ip_network.py
index 69fb1419..4fbbc822 100644
--- a/openstackclient/tests/unit/network/v2/test_floating_ip.py
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip_network.py
@@ -17,14 +17,13 @@ from mock import call
from osc_lib import exceptions
from openstackclient.network.v2 import floating_ip as fip
-from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
from openstackclient.tests.unit import utils as tests_utils
# Tests for Neutron network
-#
+
class TestFloatingIPNetwork(network_fakes.TestNetworkV2):
def setUp(self):
@@ -612,7 +611,7 @@ class TestSetFloatingIP(TestFloatingIPNetwork):
self.cmd = fip.SetFloatingIP(self.app, self.namespace)
@mock.patch(
- "openstackclient.tests.unit.network.v2.test_floating_ip." +
+ "openstackclient.tests.unit.network.v2.test_floating_ip_network." +
"fip._find_floating_ip"
)
def test_port_option(self, find_floating_ip_mock):
@@ -645,7 +644,7 @@ class TestSetFloatingIP(TestFloatingIPNetwork):
self.floating_ip, **attrs)
@mock.patch(
- "openstackclient.tests.unit.network.v2.test_floating_ip." +
+ "openstackclient.tests.unit.network.v2.test_floating_ip_network." +
"fip._find_floating_ip"
)
def test_fixed_ip_option(self, find_floating_ip_mock):
@@ -702,7 +701,7 @@ class TestUnsetFloatingIP(TestFloatingIPNetwork):
self.cmd = fip.UnsetFloatingIP(self.app, self.namespace)
@mock.patch(
- "openstackclient.tests.unit.network.v2.test_floating_ip." +
+ "openstackclient.tests.unit.network.v2.test_floating_ip_network." +
"fip._find_floating_ip"
)
def test_floating_ip_unset_port(self, find_floating_ip_mock):
@@ -733,247 +732,3 @@ class TestUnsetFloatingIP(TestFloatingIPNetwork):
self.floating_ip, **attrs)
self.assertIsNone(result)
-
-
-# Tests for Nova network
-#
-class TestFloatingIPCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestFloatingIPCompute, self).setUp()
-
- # Get a shortcut to the compute client
- self.compute = self.app.client_manager.compute
-
-
-class TestCreateFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ip to be deleted.
- floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
-
- columns = (
- 'fixed_ip',
- 'id',
- 'instance_id',
- 'ip',
- 'pool',
- )
-
- data = (
- floating_ip.fixed_ip,
- floating_ip.id,
- floating_ip.instance_id,
- floating_ip.ip,
- floating_ip.pool,
- )
-
- def setUp(self):
- super(TestCreateFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.floating_ips.create.return_value = self.floating_ip
-
- # Get the command object to test
- self.cmd = fip.CreateFloatingIP(self.app, None)
-
- def test_create_no_options(self):
- arglist = []
- verifylist = []
-
- self.assertRaises(tests_utils.ParserException, self.check_parser,
- self.cmd, arglist, verifylist)
-
- def test_create_default_options(self):
- arglist = [
- self.floating_ip.pool,
- ]
- verifylist = [
- ('network', self.floating_ip.pool),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.floating_ips.create.assert_called_once_with(
- self.floating_ip.pool)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
-class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ips to be deleted.
- floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2)
-
- def setUp(self):
- super(TestDeleteFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.floating_ips.delete.return_value = None
-
- # Return value of utils.find_resource()
- self.compute.floating_ips.get = (
- compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
-
- # Get the command object to test
- self.cmd = fip.DeleteFloatingIP(self.app, None)
-
- def test_floating_ip_delete(self):
- arglist = [
- self.floating_ips[0].id,
- ]
- verifylist = [
- ('floating_ip', [self.floating_ips[0].id]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.compute.floating_ips.delete.assert_called_once_with(
- self.floating_ips[0].id
- )
- self.assertIsNone(result)
-
- def test_multi_floating_ips_delete(self):
- arglist = []
- verifylist = []
-
- for f in self.floating_ips:
- arglist.append(f.id)
- verifylist = [
- ('floating_ip', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for f in self.floating_ips:
- calls.append(call(f.id))
- self.compute.floating_ips.delete.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_floating_ips_delete_with_exception(self):
- arglist = [
- self.floating_ips[0].id,
- 'unexist_floating_ip',
- ]
- verifylist = [
- ('floating_ip',
- [self.floating_ips[0].id, 'unexist_floating_ip']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self.floating_ips[0], exceptions.CommandError]
- self.compute.floating_ips.get = (
- mock.Mock(side_effect=find_mock_result)
- )
- self.compute.floating_ips.find.side_effect = exceptions.NotFound(None)
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
-
- self.compute.floating_ips.get.assert_any_call(
- self.floating_ips[0].id)
- self.compute.floating_ips.get.assert_any_call(
- 'unexist_floating_ip')
- self.compute.floating_ips.delete.assert_called_once_with(
- self.floating_ips[0].id
- )
-
-
-class TestListFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ips to be list up
- floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3)
-
- columns = (
- 'ID',
- 'Floating IP Address',
- 'Fixed IP Address',
- 'Server',
- 'Pool',
- )
-
- data = []
- for ip in floating_ips:
- data.append((
- ip.id,
- ip.ip,
- ip.fixed_ip,
- ip.instance_id,
- ip.pool,
- ))
-
- def setUp(self):
- super(TestListFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.floating_ips.list.return_value = self.floating_ips
-
- # Get the command object to test
- self.cmd = fip.ListFloatingIP(self.app, None)
-
- def test_floating_ip_list(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.floating_ips.list.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
-
-class TestShowFloatingIPCompute(TestFloatingIPCompute):
-
- # The floating ip to display.
- floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
-
- columns = (
- 'fixed_ip',
- 'id',
- 'instance_id',
- 'ip',
- 'pool',
- )
-
- data = (
- floating_ip.fixed_ip,
- floating_ip.id,
- floating_ip.instance_id,
- floating_ip.ip,
- floating_ip.pool,
- )
-
- def setUp(self):
- super(TestShowFloatingIPCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # Return value of utils.find_resource()
- self.compute.floating_ips.get.return_value = self.floating_ip
-
- # Get the command object to test
- self.cmd = fip.ShowFloatingIP(self.app, None)
-
- def test_floating_ip_show(self):
- arglist = [
- self.floating_ip.id,
- ]
- verifylist = [
- ('floating_ip', self.floating_ip.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py
index 11d01d36..8db21430 100644
--- a/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_compute.py
@@ -11,44 +11,12 @@
# under the License.
#
-from osc_lib import exceptions
-
from openstackclient.network.v2 import floating_ip_pool
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
-from openstackclient.tests.unit.network.v2 import fakes as network_fakes
-
-
-# Tests for Network API v2
-#
-class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2):
-
- def setUp(self):
- super(TestFloatingIPPoolNetwork, self).setUp()
-
- # Get a shortcut to the network client
- self.network = self.app.client_manager.network
-
-
-class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork):
-
- def setUp(self):
- super(TestListFloatingIPPoolNetwork, self).setUp()
-
- # Get the command object to test
- self.cmd = floating_ip_pool.ListFloatingIPPool(self.app,
- self.namespace)
-
- def test_floating_ip_list(self):
- arglist = []
- verifylist = []
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- self.assertRaises(exceptions.CommandError, self.cmd.take_action,
- parsed_args)
# Tests for Compute network
-#
+
class TestFloatingIPPoolCompute(compute_fakes.TestComputev2):
def setUp(self):
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_pool_network.py b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_network.py
new file mode 100644
index 00000000..95ff5549
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip_pool_network.py
@@ -0,0 +1,46 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import floating_ip_pool
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+
+
+# Tests for Network API v2
+
+class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestFloatingIPPoolNetwork, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+
+class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork):
+
+ def setUp(self):
+ super(TestListFloatingIPPoolNetwork, self).setUp()
+
+ # Get the command object to test
+ self.cmd = floating_ip_pool.ListFloatingIPPool(self.app,
+ self.namespace)
+
+ def test_floating_ip_list(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
diff --git a/openstackclient/tests/unit/network/v2/test_router.py b/openstackclient/tests/unit/network/v2/test_router.py
index a4f91997..02e0be94 100644
--- a/openstackclient/tests/unit/network/v2/test_router.py
+++ b/openstackclient/tests/unit/network/v2/test_router.py
@@ -211,6 +211,35 @@ class TestCreateRouter(TestRouter):
def test_create_with_no_ha_option(self):
self._test_create_with_ha_options('--no-ha', False)
+ def _test_create_with_distributed_options(self, option, distributed):
+ arglist = [
+ option,
+ self.new_router.name,
+ ]
+ verifylist = [
+ ('name', self.new_router.name),
+ ('enable', True),
+ ('distributed', distributed),
+ ('centralized', not distributed),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_router.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'name': self.new_router.name,
+ 'distributed': distributed,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_with_distributed_option(self):
+ self._test_create_with_distributed_options('--distributed', True)
+
+ def test_create_with_centralized_option(self):
+ self._test_create_with_distributed_options('--centralized', False)
+
def test_create_with_AZ_hints(self):
arglist = [
self.new_router.name,
diff --git a/openstackclient/tests/unit/network/v2/test_security_group_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_compute.py
new file mode 100644
index 00000000..db9831bb
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_security_group_compute.py
@@ -0,0 +1,405 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import security_group
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestSecurityGroupCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestSecurityGroupCompute, self).setUp()
+
+ # Get a shortcut to the compute client
+ self.compute = self.app.client_manager.compute
+
+
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_create'
+)
+class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
+
+ project = identity_fakes.FakeProject.create_one_project()
+ domain = identity_fakes.FakeDomain.create_one_domain()
+
+ # The security group to be shown.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ columns = (
+ 'description',
+ 'id',
+ 'name',
+ 'project_id',
+ 'rules',
+ )
+
+ data = (
+ _security_group['description'],
+ _security_group['id'],
+ _security_group['name'],
+ _security_group['tenant_id'],
+ '',
+ )
+
+ def setUp(self):
+ super(TestCreateSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Get the command object to test
+ self.cmd = security_group.CreateSecurityGroup(self.app, None)
+
+ def test_security_group_create_no_options(self, sg_mock):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_security_group_create_min_options(self, sg_mock):
+ sg_mock.return_value = self._security_group
+ arglist = [
+ self._security_group['name'],
+ ]
+ verifylist = [
+ ('name', self._security_group['name']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ sg_mock.assert_called_once_with(
+ self._security_group['name'],
+ self._security_group['name'],
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_security_group_create_all_options(self, sg_mock):
+ sg_mock.return_value = self._security_group
+ arglist = [
+ '--description', self._security_group['description'],
+ self._security_group['name'],
+ ]
+ verifylist = [
+ ('description', self._security_group['description']),
+ ('name', self._security_group['name']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ sg_mock.assert_called_once_with(
+ self._security_group['name'],
+ self._security_group['description'],
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_delete'
+)
+class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security groups to be deleted.
+ _security_groups = \
+ compute_fakes.FakeSecurityGroup.create_security_groups()
+
+ def setUp(self):
+ super(TestDeleteSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.api.security_group_find = (
+ compute_fakes.FakeSecurityGroup.get_security_groups(
+ self._security_groups)
+ )
+
+ # Get the command object to test
+ self.cmd = security_group.DeleteSecurityGroup(self.app, None)
+
+ def test_security_group_delete(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
+ arglist = [
+ self._security_groups[0]['id'],
+ ]
+ verifylist = [
+ ('group', [self._security_groups[0]['id']]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ sg_mock.assert_called_once_with(
+ self._security_groups[0]['id'],
+ )
+ self.assertIsNone(result)
+
+ def test_security_group_multi_delete(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
+ arglist = []
+ verifylist = []
+
+ for s in self._security_groups:
+ arglist.append(s['id'])
+ verifylist = [
+ ('group', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_groups:
+ calls.append(call(s['id']))
+ sg_mock.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_security_group_multi_delete_with_exception(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
+ sg_mock.side_effect = ([
+ mock.Mock(return_value=None),
+ exceptions.CommandError,
+ ])
+ arglist = [
+ self._security_groups[0]['id'],
+ 'unexist_security_group',
+ ]
+ verifylist = [
+ ('group',
+ [self._security_groups[0]['id'], 'unexist_security_group']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 groups failed to delete.', str(e))
+
+ sg_mock.assert_any_call(self._security_groups[0]['id'])
+ sg_mock.assert_any_call('unexist_security_group')
+
+
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_list'
+)
+class TestListSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security group to be listed.
+ _security_groups = \
+ compute_fakes.FakeSecurityGroup.create_security_groups(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Description',
+ )
+ columns_all_projects = (
+ 'ID',
+ 'Name',
+ 'Description',
+ 'Project',
+ )
+
+ data = []
+ for grp in _security_groups:
+ data.append((
+ grp['id'],
+ grp['name'],
+ grp['description'],
+ ))
+ data_all_projects = []
+ for grp in _security_groups:
+ data_all_projects.append((
+ grp['id'],
+ grp['name'],
+ grp['description'],
+ grp['tenant_id'],
+ ))
+
+ def setUp(self):
+ super(TestListSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Get the command object to test
+ self.cmd = security_group.ListSecurityGroup(self.app, None)
+
+ def test_security_group_list_no_options(self, sg_mock):
+ sg_mock.return_value = self._security_groups
+ arglist = []
+ verifylist = [
+ ('all_projects', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ kwargs = {'search_opts': {'all_tenants': False}}
+ sg_mock.assert_called_once_with(**kwargs)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_security_group_list_all_projects(self, sg_mock):
+ sg_mock.return_value = self._security_groups
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ kwargs = {'search_opts': {'all_tenants': True}}
+ sg_mock.assert_called_once_with(**kwargs)
+ self.assertEqual(self.columns_all_projects, columns)
+ self.assertEqual(self.data_all_projects, list(data))
+
+
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_set'
+)
+class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security group to be set.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ def setUp(self):
+ super(TestSetSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.api.security_group_find = mock.Mock(
+ return_value=self._security_group)
+
+ # Get the command object to test
+ self.cmd = security_group.SetSecurityGroup(self.app, None)
+
+ def test_security_group_set_no_options(self, sg_mock):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_security_group_set_no_updates(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
+ arglist = [
+ self._security_group['name'],
+ ]
+ verifylist = [
+ ('group', self._security_group['name']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ sg_mock.assert_called_once_with(
+ self._security_group,
+ self._security_group['name'],
+ self._security_group['description'],
+ )
+ self.assertIsNone(result)
+
+ def test_security_group_set_all_options(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
+ new_name = 'new-' + self._security_group['name']
+ new_description = 'new-' + self._security_group['description']
+ arglist = [
+ '--name', new_name,
+ '--description', new_description,
+ self._security_group['name'],
+ ]
+ verifylist = [
+ ('description', new_description),
+ ('group', self._security_group['name']),
+ ('name', new_name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ sg_mock.assert_called_once_with(
+ self._security_group,
+ new_name,
+ new_description
+ )
+ self.assertIsNone(result)
+
+
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_find'
+)
+class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security group rule to be shown with the group.
+ _security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ # The security group to be shown.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group(
+ attrs={'rules': [_security_group_rule._info]}
+ )
+
+ columns = (
+ 'description',
+ 'id',
+ 'name',
+ 'project_id',
+ 'rules',
+ )
+
+ data = (
+ _security_group['description'],
+ _security_group['id'],
+ _security_group['name'],
+ _security_group['tenant_id'],
+ security_group._format_compute_security_group_rules(
+ [_security_group_rule._info]),
+ )
+
+ def setUp(self):
+ super(TestShowSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Get the command object to test
+ self.cmd = security_group.ShowSecurityGroup(self.app, None)
+
+ def test_security_group_show_no_options(self, sg_mock):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_security_group_show_all_options(self, sg_mock):
+ sg_mock.return_value = self._security_group
+ arglist = [
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ sg_mock.assert_called_once_with(self._security_group['id'])
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_security_group.py b/openstackclient/tests/unit/network/v2/test_security_group_network.py
index 66d357f9..35b7e366 100644
--- a/openstackclient/tests/unit/network/v2/test_security_group.py
+++ b/openstackclient/tests/unit/network/v2/test_security_group_network.py
@@ -17,7 +17,6 @@ from mock import call
from osc_lib import exceptions
from openstackclient.network.v2 import security_group
-from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
from openstackclient.tests.unit import utils as tests_utils
@@ -36,15 +35,6 @@ class TestSecurityGroupNetwork(network_fakes.TestNetworkV2):
self.domains_mock = self.app.client_manager.identity.domains
-class TestSecurityGroupCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestSecurityGroupCompute, self).setUp()
-
- # Get a shortcut to the compute client
- self.compute = self.app.client_manager.compute
-
-
class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork):
project = identity_fakes.FakeProject.create_one_project()
@@ -129,90 +119,6 @@ class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork):
self.assertEqual(self.data, data)
-class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
-
- project = identity_fakes.FakeProject.create_one_project()
- domain = identity_fakes.FakeDomain.create_one_domain()
- # The security group to be shown.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- columns = (
- 'description',
- 'id',
- 'name',
- 'project_id',
- 'rules',
- )
-
- data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.tenant_id,
- '',
- )
-
- def setUp(self):
- super(TestCreateSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.create.return_value = self._security_group
-
- # Get the command object to test
- self.cmd = security_group.CreateSecurityGroup(self.app, None)
-
- def test_create_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_create_network_options(self):
- arglist = [
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- self._security_group.name,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_min_options(self):
- arglist = [
- self._security_group.name,
- ]
- verifylist = [
- ('name', self._security_group.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.create.assert_called_once_with(
- self._security_group.name,
- self._security_group.name)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
- def test_create_all_options(self):
- arglist = [
- '--description', self._security_group.description,
- self._security_group.name,
- ]
- verifylist = [
- ('description', self._security_group.description),
- ('name', self._security_group.name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.create.assert_called_once_with(
- self._security_group.name,
- self._security_group.description)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
-
-
class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
# The security groups to be deleted.
@@ -297,94 +203,6 @@ class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
)
-class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security groups to be deleted.
- _security_groups = \
- compute_fakes.FakeSecurityGroup.create_security_groups()
-
- def setUp(self):
- super(TestDeleteSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.delete = mock.Mock(return_value=None)
-
- self.compute.security_groups.get = (
- compute_fakes.FakeSecurityGroup.get_security_groups(
- self._security_groups)
- )
-
- # Get the command object to test
- self.cmd = security_group.DeleteSecurityGroup(self.app, None)
-
- def test_security_group_delete(self):
- arglist = [
- self._security_groups[0].id,
- ]
- verifylist = [
- ('group', [self._security_groups[0].id]),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.delete.assert_called_once_with(
- self._security_groups[0].id)
- self.assertIsNone(result)
-
- def test_multi_security_groups_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._security_groups:
- arglist.append(s.id)
- verifylist = [
- ('group', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._security_groups:
- calls.append(call(s.id))
- self.compute.security_groups.delete.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_security_groups_delete_with_exception(self):
- arglist = [
- self._security_groups[0].id,
- 'unexist_security_group',
- ]
- verifylist = [
- ('group',
- [self._security_groups[0].id, 'unexist_security_group']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [self._security_groups[0], exceptions.CommandError]
- self.compute.security_groups.get = (
- mock.Mock(side_effect=find_mock_result)
- )
- self.compute.security_groups.find.side_effect = (
- exceptions.NotFound(None))
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 groups failed to delete.', str(e))
-
- self.compute.security_groups.get.assert_any_call(
- self._security_groups[0].id)
- self.compute.security_groups.get.assert_any_call(
- 'unexist_security_group')
- self.compute.security_groups.delete.assert_called_once_with(
- self._security_groups[0].id
- )
-
-
class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
# The security group to be listed.
@@ -483,80 +301,6 @@ class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
self.assertEqual(self.data, list(data))
-class TestListSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security group to be listed.
- _security_groups = \
- compute_fakes.FakeSecurityGroup.create_security_groups(count=3)
-
- columns = (
- 'ID',
- 'Name',
- 'Description',
- )
- columns_all_projects = (
- 'ID',
- 'Name',
- 'Description',
- 'Project',
- )
-
- data = []
- for grp in _security_groups:
- data.append((
- grp.id,
- grp.name,
- grp.description,
- ))
- data_all_projects = []
- for grp in _security_groups:
- data_all_projects.append((
- grp.id,
- grp.name,
- grp.description,
- grp.tenant_id,
- ))
-
- def setUp(self):
- super(TestListSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.list.return_value = self._security_groups
-
- # Get the command object to test
- self.cmd = security_group.ListSecurityGroup(self.app, None)
-
- def test_security_group_list_no_options(self):
- arglist = []
- verifylist = [
- ('all_projects', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- kwargs = {'search_opts': {'all_tenants': False}}
- self.compute.security_groups.list.assert_called_once_with(**kwargs)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, list(data))
-
- def test_security_group_list_all_projects(self):
- arglist = [
- '--all-projects',
- ]
- verifylist = [
- ('all_projects', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- kwargs = {'search_opts': {'all_tenants': True}}
- self.compute.security_groups.list.assert_called_once_with(**kwargs)
- self.assertEqual(self.columns_all_projects, columns)
- self.assertEqual(self.data_all_projects, list(data))
-
-
class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork):
# The security group to be set.
@@ -623,72 +367,6 @@ class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork):
self.assertIsNone(result)
-class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security group to be set.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- def setUp(self):
- super(TestSetSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.update = mock.Mock(return_value=None)
-
- self.compute.security_groups.get = mock.Mock(
- return_value=self._security_group)
-
- # Get the command object to test
- self.cmd = security_group.SetSecurityGroup(self.app, None)
-
- def test_set_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_set_no_updates(self):
- arglist = [
- self._security_group.name,
- ]
- verifylist = [
- ('group', self._security_group.name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.update.assert_called_once_with(
- self._security_group,
- self._security_group.name,
- self._security_group.description
- )
- self.assertIsNone(result)
-
- def test_set_all_options(self):
- new_name = 'new-' + self._security_group.name
- new_description = 'new-' + self._security_group.description
- arglist = [
- '--name', new_name,
- '--description', new_description,
- self._security_group.name,
- ]
- verifylist = [
- ('description', new_description),
- ('group', self._security_group.name),
- ('name', new_name),
- ]
-
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.update.assert_called_once_with(
- self._security_group,
- new_name,
- new_description
- )
- self.assertIsNone(result)
-
-
class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork):
# The security group rule to be shown with the group.
@@ -746,63 +424,3 @@ class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork):
self._security_group.id, ignore_missing=False)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
-
-
-class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
-
- # The security group rule to be shown with the group.
- _security_group_rule = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
-
- # The security group to be shown.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group(
- attrs={'rules': [_security_group_rule._info]}
- )
-
- columns = (
- 'description',
- 'id',
- 'name',
- 'project_id',
- 'rules',
- )
-
- data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.tenant_id,
- security_group._format_compute_security_group_rules(
- [_security_group_rule._info]),
- )
-
- def setUp(self):
- super(TestShowSecurityGroupCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.get.return_value = self._security_group
-
- # Get the command object to test
- self.cmd = security_group.ShowSecurityGroup(self.app, None)
-
- def test_show_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_show_all_options(self):
- arglist = [
- self._security_group.id,
- ]
- verifylist = [
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.get.assert_called_once_with(
- self._security_group.id)
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py
new file mode 100644
index 00000000..06a7a25d
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py
@@ -0,0 +1,572 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network import utils as network_utils
+from openstackclient.network.v2 import security_group_rule
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestSecurityGroupRuleCompute, self).setUp()
+
+ # Get a shortcut to the network client
+ self.compute = self.app.client_manager.compute
+
+
+class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ project = identity_fakes.FakeProject.create_one_project()
+ domain = identity_fakes.FakeDomain.create_one_domain()
+
+ # The security group rule to be created.
+ _security_group_rule = None
+
+ # The security group that will contain the rule created.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ def _setup_security_group_rule(self, attrs=None):
+ self._security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
+ attrs)
+ self.compute.security_group_rules.create.return_value = \
+ self._security_group_rule
+ expected_columns, expected_data = \
+ security_group_rule._format_security_group_rule_show(
+ self._security_group_rule._info)
+ return expected_columns, expected_data
+
+ def setUp(self):
+ super(TestCreateSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.api.security_group_find = mock.Mock(
+ return_value=self._security_group,
+ )
+
+ # Get the command object to test
+ self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
+
+ def test_security_group_rule_create_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_security_group_rule_create_all_source_options(self):
+ arglist = [
+ '--src-ip', '10.10.0.0/24',
+ '--src-group', self._security_group['id'],
+ self._security_group['id'],
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_security_group_rule_create_all_remote_options(self):
+ arglist = [
+ '--remote-ip', '10.10.0.0/24',
+ '--remote-group', self._security_group['id'],
+ self._security_group['id'],
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_security_group_rule_create_bad_protocol(self):
+ arglist = [
+ '--protocol', 'foo',
+ self._security_group['id'],
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_security_group_rule_create_all_protocol_options(self):
+ arglist = [
+ '--protocol', 'tcp',
+ '--proto', 'tcp',
+ self._security_group['id'],
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_security_group_rule_create_network_options(self):
+ arglist = [
+ '--ingress',
+ '--ethertype', 'IPv4',
+ '--icmp-type', '3',
+ '--icmp-code', '11',
+ '--project', self.project.name,
+ '--project-domain', self.domain.name,
+ self._security_group['id'],
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_security_group_rule_create_default_rule(self):
+ expected_columns, expected_data = self._setup_security_group_rule()
+ dst_port = str(self._security_group_rule.from_port) + ':' + \
+ str(self._security_group_rule.to_port)
+ arglist = [
+ '--dst-port', dst_port,
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.from_port,
+ self._security_group_rule.to_port)),
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group['id'],
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_security_group_rule_create_source_group(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'from_port': 22,
+ 'to_port': 22,
+ 'group': {'name': self._security_group['name']},
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.from_port),
+ '--src-group', self._security_group['name'],
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.from_port,
+ self._security_group_rule.to_port)),
+ ('src_group', self._security_group['name']),
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group['id'],
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ self._security_group['id'],
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_security_group_rule_create_remote_group(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'from_port': 22,
+ 'to_port': 22,
+ 'group': {'name': self._security_group['name']},
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.from_port),
+ '--remote-group', self._security_group['name'],
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.from_port,
+ self._security_group_rule.to_port)),
+ ('remote_group', self._security_group['name']),
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group['id'],
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ self._security_group['id'],
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_security_group_rule_create_source_ip(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ })
+ arglist = [
+ '--protocol', self._security_group_rule.ip_protocol,
+ '--src-ip', self._security_group_rule.ip_range['cidr'],
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('protocol', self._security_group_rule.ip_protocol),
+ ('src_ip', self._security_group_rule.ip_range['cidr']),
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group['id'],
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_security_group_rule_create_remote_ip(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ })
+ arglist = [
+ '--protocol', self._security_group_rule.ip_protocol,
+ '--remote-ip', self._security_group_rule.ip_range['cidr'],
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('protocol', self._security_group_rule.ip_protocol),
+ ('remote_ip', self._security_group_rule.ip_range['cidr']),
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group['id'],
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_security_group_rule_create_proto_option(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ })
+ arglist = [
+ '--proto', self._security_group_rule.ip_protocol,
+ '--src-ip', self._security_group_rule.ip_range['cidr'],
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('proto', self._security_group_rule.ip_protocol),
+ ('protocol', None),
+ ('src_ip', self._security_group_rule.ip_range['cidr']),
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group['id'],
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+
+class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group rule to be deleted.
+ _security_group_rules = \
+ compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
+ count=2)
+
+ def setUp(self):
+ super(TestDeleteSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Get the command object to test
+ self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None)
+
+ def test_security_group_rule_delete(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ ]
+ verifylist = [
+ ('rule', [self._security_group_rules[0].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.delete.assert_called_once_with(
+ self._security_group_rules[0].id)
+ self.assertIsNone(result)
+
+ def test_security_group_rule_multi_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_group_rules:
+ arglist.append(s.id)
+ verifylist = [
+ ('rule', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_group_rules:
+ calls.append(call(s.id))
+ self.compute.security_group_rules.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_security_group_rule_multi_delete_with_exception(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ 'unexist_rule',
+ ]
+ verifylist = [
+ ('rule',
+ [self._security_group_rules[0].id, 'unexist_rule']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [None, exceptions.CommandError]
+ self.compute.security_group_rules.delete = (
+ mock.Mock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 rules failed to delete.', str(e))
+
+ self.compute.security_group_rules.delete.assert_any_call(
+ self._security_group_rules[0].id)
+ self.compute.security_group_rules.delete.assert_any_call(
+ 'unexist_rule')
+
+
+class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group to hold the rules.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ # The security group rule to be listed.
+ _security_group_rule_tcp = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
+ 'ip_protocol': 'tcp',
+ 'from_port': 80,
+ 'to_port': 80,
+ 'group': {'name': _security_group['name']},
+ })
+ _security_group_rule_icmp = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ 'group': {'name': _security_group['name']},
+ })
+ _security_group['rules'] = [
+ _security_group_rule_tcp._info,
+ _security_group_rule_icmp._info,
+ ]
+
+ expected_columns_with_group = (
+ 'ID',
+ 'IP Protocol',
+ 'IP Range',
+ 'Port Range',
+ 'Remote Security Group',
+ )
+ expected_columns_no_group = \
+ expected_columns_with_group + ('Security Group',)
+
+ expected_data_with_group = []
+ expected_data_no_group = []
+ for _security_group_rule in _security_group['rules']:
+ rule = network_utils.transform_compute_security_group_rule(
+ _security_group_rule
+ )
+ expected_rule_with_group = (
+ rule['id'],
+ rule['ip_protocol'],
+ rule['ip_range'],
+ rule['port_range'],
+ rule['remote_security_group'],
+ )
+ expected_rule_no_group = expected_rule_with_group + \
+ (_security_group_rule['parent_group_id'],)
+ expected_data_with_group.append(expected_rule_with_group)
+ expected_data_no_group.append(expected_rule_no_group)
+
+ def setUp(self):
+ super(TestListSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.api.security_group_find = mock.Mock(
+ return_value=self._security_group,
+ )
+ self.compute.api.security_group_list = mock.Mock(
+ return_value=[self._security_group],
+ )
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
+
+ def test_security_group_rule_list_default(self):
+ parsed_args = self.check_parser(self.cmd, [], [])
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.api.security_group_list.assert_called_once_with(
+ search_opts={'all_tenants': False}
+ )
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+ def test_security_group_rule_list_with_group(self):
+ arglist = [
+ self._security_group['id'],
+ ]
+ verifylist = [
+ ('group', self._security_group['id']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.api.security_group_find.assert_called_once_with(
+ self._security_group['id']
+ )
+ self.assertEqual(self.expected_columns_with_group, columns)
+ self.assertEqual(self.expected_data_with_group, list(data))
+
+ def test_security_group_rule_list_all_projects(self):
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.api.security_group_list.assert_called_once_with(
+ search_opts={'all_tenants': True}
+ )
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+ def test_security_group_rule_list_with_ignored_options(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.api.security_group_list.assert_called_once_with(
+ search_opts={'all_tenants': False}
+ )
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+
+class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group rule to be shown.
+ _security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ columns, data = \
+ security_group_rule._format_security_group_rule_show(
+ _security_group_rule._info)
+
+ def setUp(self):
+ super(TestShowSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Build a security group fake customized for this test.
+ security_group_rules = [self._security_group_rule._info]
+ security_group = {'rules': security_group_rules}
+ self.compute.api.security_group_list = mock.Mock(
+ return_value=[security_group],
+ )
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
+
+ def test_security_group_rule_show_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_security_group_rule_show_all_options(self):
+ arglist = [
+ self._security_group_rule.id,
+ ]
+ verifylist = [
+ ('rule', self._security_group_rule.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.api.security_group_list.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py
index e3538d5f..5d9d03e9 100644
--- a/openstackclient/tests/unit/network/v2/test_security_group_rule.py
+++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py
@@ -11,16 +11,12 @@
# under the License.
#
-import copy
import mock
from mock import call
from osc_lib import exceptions
-from openstackclient.network import utils as network_utils
from openstackclient.network.v2 import security_group_rule
-from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
-from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
from openstackclient.tests.unit import utils as tests_utils
@@ -39,15 +35,6 @@ class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
self.domains_mock = self.app.client_manager.identity.domains
-class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
-
- def setUp(self):
- super(TestSecurityGroupRuleCompute, self).setUp()
-
- # Get a shortcut to the network client
- self.compute = self.app.client_manager.compute
-
-
class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
project = identity_fakes.FakeProject.create_one_project()
@@ -548,280 +535,6 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
self.assertEqual(self.expected_data, data)
-class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- project = identity_fakes.FakeProject.create_one_project()
- domain = identity_fakes.FakeDomain.create_one_domain()
- # The security group rule to be created.
- _security_group_rule = None
-
- # The security group that will contain the rule created.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- def _setup_security_group_rule(self, attrs=None):
- self._security_group_rule = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
- attrs)
- self.compute.security_group_rules.create.return_value = \
- self._security_group_rule
- expected_columns, expected_data = \
- security_group_rule._format_security_group_rule_show(
- self._security_group_rule._info)
- return expected_columns, expected_data
-
- def setUp(self):
- super(TestCreateSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.get.return_value = self._security_group
-
- # Get the command object to test
- self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
-
- def test_create_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_create_all_source_options(self):
- arglist = [
- '--src-ip', '10.10.0.0/24',
- '--src-group', self._security_group.id,
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_all_remote_options(self):
- arglist = [
- '--remote-ip', '10.10.0.0/24',
- '--remote-group', self._security_group.id,
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_bad_protocol(self):
- arglist = [
- '--protocol', 'foo',
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_all_protocol_options(self):
- arglist = [
- '--protocol', 'tcp',
- '--proto', 'tcp',
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_network_options(self):
- arglist = [
- '--ingress',
- '--ethertype', 'IPv4',
- '--icmp-type', '3',
- '--icmp-code', '11',
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- self._security_group.id,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_default_rule(self):
- expected_columns, expected_data = self._setup_security_group_rule()
- dst_port = str(self._security_group_rule.from_port) + ':' + \
- str(self._security_group_rule.to_port)
- arglist = [
- '--dst-port', dst_port,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.from_port,
- self._security_group_rule.to_port)),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- None,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_source_group(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'from_port': 22,
- 'to_port': 22,
- 'group': {'name': self._security_group.name},
- })
- arglist = [
- '--dst-port', str(self._security_group_rule.from_port),
- '--src-group', self._security_group.name,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.from_port,
- self._security_group_rule.to_port)),
- ('src_group', self._security_group.name),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_remote_group(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'from_port': 22,
- 'to_port': 22,
- 'group': {'name': self._security_group.name},
- })
- arglist = [
- '--dst-port', str(self._security_group_rule.from_port),
- '--remote-group', self._security_group.name,
- self._security_group.id,
- ]
- verifylist = [
- ('dst_port', (self._security_group_rule.from_port,
- self._security_group_rule.to_port)),
- ('remote_group', self._security_group.name),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_source_ip(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'ip_range': {'cidr': '10.0.2.0/24'},
- })
- arglist = [
- '--protocol', self._security_group_rule.ip_protocol,
- '--src-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- ]
- verifylist = [
- ('protocol', self._security_group_rule.ip_protocol),
- ('src_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- None,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_remote_ip(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'ip_range': {'cidr': '10.0.2.0/24'},
- })
- arglist = [
- '--protocol', self._security_group_rule.ip_protocol,
- '--remote-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- ]
- verifylist = [
- ('protocol', self._security_group_rule.ip_protocol),
- ('remote_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- None,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
- def test_create_proto_option(self):
- expected_columns, expected_data = self._setup_security_group_rule({
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'ip_range': {'cidr': '10.0.2.0/24'},
- })
- arglist = [
- '--proto', self._security_group_rule.ip_protocol,
- '--src-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
- ]
- verifylist = [
- ('proto', self._security_group_rule.ip_protocol),
- ('protocol', None),
- ('src_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
- self._security_group_rule.ip_protocol,
- self._security_group_rule.from_port,
- self._security_group_rule.to_port,
- self._security_group_rule.ip_range['cidr'],
- None,
- )
- self.assertEqual(expected_columns, columns)
- self.assertEqual(expected_data, data)
-
-
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group rules to be deleted.
@@ -909,83 +622,6 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
)
-class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- # The security group rule to be deleted.
- _security_group_rules = \
- compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
- count=2)
-
- def setUp(self):
- super(TestDeleteSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # Get the command object to test
- self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None)
-
- def test_security_group_rule_delete(self):
- arglist = [
- self._security_group_rules[0].id,
- ]
- verifylist = [
- ('rule', [self._security_group_rules[0].id]),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- self.compute.security_group_rules.delete.assert_called_once_with(
- self._security_group_rules[0].id)
- self.assertIsNone(result)
-
- def test_multi_security_group_rules_delete(self):
- arglist = []
- verifylist = []
-
- for s in self._security_group_rules:
- arglist.append(s.id)
- verifylist = [
- ('rule', arglist),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- result = self.cmd.take_action(parsed_args)
-
- calls = []
- for s in self._security_group_rules:
- calls.append(call(s.id))
- self.compute.security_group_rules.delete.assert_has_calls(calls)
- self.assertIsNone(result)
-
- def test_multi_security_group_rules_delete_with_exception(self):
- arglist = [
- self._security_group_rules[0].id,
- 'unexist_rule',
- ]
- verifylist = [
- ('rule',
- [self._security_group_rules[0].id, 'unexist_rule']),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- find_mock_result = [None, exceptions.CommandError]
- self.compute.security_group_rules.delete = (
- mock.Mock(side_effect=find_mock_result)
- )
-
- try:
- self.cmd.take_action(parsed_args)
- self.fail('CommandError should be raised.')
- except exceptions.CommandError as e:
- self.assertEqual('1 of 2 rules failed to delete.', str(e))
-
- self.compute.security_group_rules.delete.assert_any_call(
- self._security_group_rules[0].id)
- self.compute.security_group_rules.delete.assert_any_call(
- 'unexist_rule')
-
-
class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group to hold the rules.
@@ -1165,131 +801,6 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
self.assertEqual(self.expected_data_no_group, list(data))
-class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- # The security group to hold the rules.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
-
- # The security group rule to be listed.
- _security_group_rule_tcp = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
- 'ip_protocol': 'tcp',
- 'from_port': 80,
- 'to_port': 80,
- 'group': {'name': _security_group.name},
- })
- _security_group_rule_icmp = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'ip_range': {'cidr': '10.0.2.0/24'},
- 'group': {'name': _security_group.name},
- })
- _security_group.rules = [_security_group_rule_tcp._info,
- _security_group_rule_icmp._info]
-
- expected_columns_with_group = (
- 'ID',
- 'IP Protocol',
- 'IP Range',
- 'Port Range',
- 'Remote Security Group',
- )
- expected_columns_no_group = \
- expected_columns_with_group + ('Security Group',)
-
- expected_data_with_group = []
- expected_data_no_group = []
- for _security_group_rule in _security_group.rules:
- rule = network_utils.transform_compute_security_group_rule(
- _security_group_rule
- )
- expected_rule_with_group = (
- rule['id'],
- rule['ip_protocol'],
- rule['ip_range'],
- rule['port_range'],
- rule['remote_security_group'],
- )
- expected_rule_no_group = expected_rule_with_group + \
- (_security_group_rule['parent_group_id'],)
- expected_data_with_group.append(expected_rule_with_group)
- expected_data_no_group.append(expected_rule_no_group)
-
- def setUp(self):
- super(TestListSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- self.compute.security_groups.get.return_value = \
- self._security_group
- self.compute.security_groups.list.return_value = \
- [self._security_group]
-
- # Get the command object to test
- self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
-
- def test_list_default(self):
- parsed_args = self.check_parser(self.cmd, [], [])
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
- search_opts={'all_tenants': False}
- )
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
- def test_list_with_group(self):
- arglist = [
- self._security_group.id,
- ]
- verifylist = [
- ('group', self._security_group.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.get.assert_called_once_with(
- self._security_group.id
- )
- self.assertEqual(self.expected_columns_with_group, columns)
- self.assertEqual(self.expected_data_with_group, list(data))
-
- def test_list_all_projects(self):
- arglist = [
- '--all-projects',
- ]
- verifylist = [
- ('all_projects', True),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
- search_opts={'all_tenants': True}
- )
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
- def test_list_with_ignored_options(self):
- arglist = [
- '--long',
- ]
- verifylist = [
- ('long', False),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
- search_opts={'all_tenants': False}
- )
- self.assertEqual(self.expected_columns_no_group, columns)
- self.assertEqual(self.expected_data_no_group, list(data))
-
-
class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group rule to be shown.
@@ -1353,49 +864,3 @@ class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
self._security_group_rule.id, ignore_missing=False)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
-
-
-class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
-
- # The security group rule to be shown.
- _security_group_rule = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
-
- columns, data = \
- security_group_rule._format_security_group_rule_show(
- _security_group_rule._info)
-
- def setUp(self):
- super(TestShowSecurityGroupRuleCompute, self).setUp()
-
- self.app.client_manager.network_endpoint_enabled = False
-
- # Build a security group fake customized for this test.
- security_group_rules = [self._security_group_rule._info]
- security_group = fakes.FakeResource(
- info=copy.deepcopy({'rules': security_group_rules}),
- loaded=True)
- security_group.rules = security_group_rules
- self.compute.security_groups.list.return_value = [security_group]
-
- # Get the command object to test
- self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
-
- def test_show_no_options(self):
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, [], [])
-
- def test_show_all_options(self):
- arglist = [
- self._security_group_rule.id,
- ]
- verifylist = [
- ('rule', self._security_group_rule.id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- columns, data = self.cmd.take_action(parsed_args)
-
- self.compute.security_groups.list.assert_called_once_with()
- self.assertEqual(self.columns, columns)
- self.assertEqual(self.data, data)