diff options
author | Doug Hellmann <doug@doughellmann.com> | 2015-07-13 16:11:47 +0000 |
---|---|---|
committer | Doug Hellmann <doug@doughellmann.com> | 2015-07-15 18:20:18 +0000 |
commit | 6bd180efc305e0078249e8982b2d21055851b28a (patch) | |
tree | 7e3ad9fc27157ec57acc8a0cc813e6826c3f5c4f | |
parent | 6c4509587a9494a015c4b482429ae3268d33864d (diff) | |
download | oslo-utils-6bd180efc305e0078249e8982b2d21055851b28a.tar.gz |
Remove oslo namespace package
Blueprint remove-namespace-packages
Depends-on: I4e1cb4f1caaee61683dc9fb0ffc67e8f9e06dc6d
for openstack/barbican
Depends-on: I52848a1f63cb4a1b30d9e1c32f4bb18c5d9fe80e
for openstack/castellan
Depends-on: Ia21c15e8eca6bf456f7cfe13f815f5ce068601e7
for openstack/designate
Depends-on: I695d2141c00a5ee36e042efbb9bac4e2803c1948
for openstack/ironic
Depends-on: I779d8df5872887d5d8ec44012d792b59b0cf4f64
for openstack/ironic-lib
Depends-on: I977248bec9560bcd730efe6bed741f1dc025fdf3
for openstack/keystonemiddleware
Depends-on: I75e6e15d50ef9830d0581efd4cbcceb3e626f7b7
for openstack/manila
Depends-on: I975592f3694be42d52685ebf606f6d3012caf1a8
for openstack/murano
Depends-on: Icbd2ae016553b5e5c886205e87acf38f3ccec550
for openstack/murano-dashboard
Depends-on: If8a132de65ba1e57ea93f98daac66816a3cefaa8
for openstack/neutron
Depends-on: I9891564317aa3cf84874debdc966e48fb270455e
for openstack/os-brick
Depends-on: Ie11a5d51a6471792e122942dfc9063a0d9d38399
for openstack/python-barbicanclient
Depends-on: I5d325e9397404211df00d9e39195b63d9a2f2e76
for openstack/python-muranoclient
Depends-on: I474dcd9de166b00d02b3586858a28797e97a3231
for openstack/python-neutronclient
Depends-on: I5e4746b58e11bf56957f10517d484c173335dfc9
for openstack/sahara
Change-Id: I9ef9506873605661f394476c96e4a3bbdb85fbed
-rw-r--r-- | oslo/__init__.py | 15 | ||||
-rw-r--r-- | oslo/utils/__init__.py | 26 | ||||
-rw-r--r-- | oslo/utils/encodeutils.py | 13 | ||||
-rw-r--r-- | oslo/utils/excutils.py | 13 | ||||
-rw-r--r-- | oslo/utils/importutils.py | 13 | ||||
-rw-r--r-- | oslo/utils/netutils.py | 15 | ||||
-rw-r--r-- | oslo/utils/reflection.py | 13 | ||||
-rw-r--r-- | oslo/utils/strutils.py | 13 | ||||
-rw-r--r-- | oslo/utils/timeutils.py | 13 | ||||
-rw-r--r-- | oslo/utils/units.py | 13 | ||||
-rw-r--r-- | oslo/utils/uuidutils.py | 13 | ||||
-rw-r--r-- | setup.cfg | 4 | ||||
-rw-r--r-- | tests/__init__.py | 13 | ||||
-rw-r--r-- | tests/base.py | 55 | ||||
-rw-r--r-- | tests/test_excutils.py | 197 | ||||
-rw-r--r-- | tests/test_importutils.py | 121 | ||||
-rw-r--r-- | tests/test_netutils.py | 204 | ||||
-rw-r--r-- | tests/test_reflection.py | 279 | ||||
-rw-r--r-- | tests/test_strutils.py | 594 | ||||
-rw-r--r-- | tests/test_timeutils.py | 359 | ||||
-rw-r--r-- | tests/test_uuidutils.py | 54 | ||||
-rw-r--r-- | tests/test_warning.py | 61 | ||||
-rw-r--r-- | tests/tests_encodeutils.py | 105 |
23 files changed, 0 insertions, 2206 deletions
diff --git a/oslo/__init__.py b/oslo/__init__.py deleted file mode 100644 index 594bc16..0000000 --- a/oslo/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -__import__('pkg_resources').declare_namespace(__name__) diff --git a/oslo/utils/__init__.py b/oslo/utils/__init__.py deleted file mode 100644 index 73e54f3..0000000 --- a/oslo/utils/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import warnings - - -def deprecated(): - new_name = __name__.replace('.', '_') - warnings.warn( - ('The oslo namespace package is deprecated. Please use %s instead.' % - new_name), - DeprecationWarning, - stacklevel=3, - ) - - -deprecated() diff --git a/oslo/utils/encodeutils.py b/oslo/utils/encodeutils.py deleted file mode 100644 index 0de2476..0000000 --- a/oslo/utils/encodeutils.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.encodeutils import * # noqa diff --git a/oslo/utils/excutils.py b/oslo/utils/excutils.py deleted file mode 100644 index 333c955..0000000 --- a/oslo/utils/excutils.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.excutils import * # noqa diff --git a/oslo/utils/importutils.py b/oslo/utils/importutils.py deleted file mode 100644 index 9097dc4..0000000 --- a/oslo/utils/importutils.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.importutils import * # noqa diff --git a/oslo/utils/netutils.py b/oslo/utils/netutils.py deleted file mode 100644 index 701224d..0000000 --- a/oslo/utils/netutils.py +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.netutils import * # noqa -# NOTE(dhellmann): Needed for taskflow. -from oslo_utils.netutils import _ModifiedSplitResult # noqa diff --git a/oslo/utils/reflection.py b/oslo/utils/reflection.py deleted file mode 100644 index f8b5c16..0000000 --- a/oslo/utils/reflection.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.reflection import * # noqa diff --git a/oslo/utils/strutils.py b/oslo/utils/strutils.py deleted file mode 100644 index 989e088..0000000 --- a/oslo/utils/strutils.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.strutils import * # noqa diff --git a/oslo/utils/timeutils.py b/oslo/utils/timeutils.py deleted file mode 100644 index 65b856a..0000000 --- a/oslo/utils/timeutils.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.timeutils import * # noqa diff --git a/oslo/utils/units.py b/oslo/utils/units.py deleted file mode 100644 index 741e740..0000000 --- a/oslo/utils/units.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.units import * # noqa diff --git a/oslo/utils/uuidutils.py b/oslo/utils/uuidutils.py deleted file mode 100644 index 4f98a84..0000000 --- a/oslo/utils/uuidutils.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils.uuidutils import * # noqa @@ -21,11 +21,7 @@ classifier = [files] packages = - oslo - oslo.utils oslo_utils -namespace_packages = - oslo [pbr] warnerrors = true diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index 19f5e72..0000000 --- a/tests/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/tests/base.py b/tests/base.py deleted file mode 100644 index a3069ed..0000000 --- a/tests/base.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2010-2011 OpenStack Foundation -# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os - -import fixtures -import testtools - -_TRUE_VALUES = ('true', '1', 'yes') - -# FIXME(dhellmann) Update this to use oslo.test library - - -class TestCase(testtools.TestCase): - - """Test case base class for all unit tests.""" - - def setUp(self): - """Run before each test method to initialize test environment.""" - - super(TestCase, self).setUp() - test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) - try: - test_timeout = int(test_timeout) - except ValueError: - # If timeout value is invalid do not set a timeout. - test_timeout = 0 - if test_timeout > 0: - self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) - - self.useFixture(fixtures.NestedTempfile()) - self.useFixture(fixtures.TempHomeDir()) - - if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES: - stdout = self.useFixture(fixtures.StringStream('stdout')).stream - self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) - if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES: - stderr = self.useFixture(fixtures.StringStream('stderr')).stream - self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) - - self.log_fixture = self.useFixture(fixtures.FakeLogger()) diff --git a/tests/test_excutils.py b/tests/test_excutils.py deleted file mode 100644 index 3f35e79..0000000 --- a/tests/test_excutils.py +++ /dev/null @@ -1,197 +0,0 @@ -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import time - -import mock -from oslotest import base as test_base -from oslotest import moxstubout - -from oslo.utils import excutils - - -mox = moxstubout.mox - - -class SaveAndReraiseTest(test_base.BaseTestCase): - - def test_save_and_reraise_exception(self): - e = None - msg = 'foo' - try: - try: - raise Exception(msg) - except Exception: - with excutils.save_and_reraise_exception(): - pass - except Exception as _e: - e = _e - - self.assertEqual(str(e), msg) - - @mock.patch('logging.getLogger') - def test_save_and_reraise_exception_dropped(self, get_logger_mock): - logger = get_logger_mock() - e = None - msg = 'second exception' - try: - try: - raise Exception('dropped') - except Exception: - with excutils.save_and_reraise_exception(): - raise Exception(msg) - except Exception as _e: - e = _e - self.assertEqual(str(e), msg) - self.assertTrue(logger.error.called) - - def test_save_and_reraise_exception_no_reraise(self): - """Test that suppressing the reraise works.""" - try: - raise Exception('foo') - except Exception: - with excutils.save_and_reraise_exception() as ctxt: - ctxt.reraise = False - - @mock.patch('logging.getLogger') - def test_save_and_reraise_exception_dropped_no_reraise(self, - get_logger_mock): - logger = get_logger_mock() - e = None - msg = 'second exception' - try: - try: - raise Exception('dropped') - except Exception: - with excutils.save_and_reraise_exception(reraise=False): - raise Exception(msg) - except Exception as _e: - e = _e - self.assertEqual(str(e), msg) - self.assertFalse(logger.error.called) - - -class ForeverRetryUncaughtExceptionsTest(test_base.BaseTestCase): - - def setUp(self): - super(ForeverRetryUncaughtExceptionsTest, self).setUp() - moxfixture = self.useFixture(moxstubout.MoxStubout()) - self.mox = moxfixture.mox - self.stubs = moxfixture.stubs - - @excutils.forever_retry_uncaught_exceptions - def exception_generator(self): - exc = self.exception_to_raise() - while exc is not None: - raise exc - exc = self.exception_to_raise() - - def exception_to_raise(self): - return None - - def my_time_sleep(self, arg): - pass - - def exc_retrier_common_start(self): - self.stubs.Set(time, 'sleep', self.my_time_sleep) - self.mox.StubOutWithMock(logging, 'exception') - self.mox.StubOutWithMock(time, 'time', - use_mock_anything=True) - self.mox.StubOutWithMock(self, 'exception_to_raise') - - def exc_retrier_sequence(self, exc_id=None, timestamp=None, - exc_count=None): - self.exception_to_raise().AndReturn( - Exception('unexpected %d' % exc_id)) - time.time().AndReturn(timestamp) - if exc_count != 0: - logging.exception(mox.In( - 'Unexpected exception occurred %d time(s)' % exc_count)) - - def exc_retrier_common_end(self): - self.exception_to_raise().AndReturn(None) - self.mox.ReplayAll() - self.exception_generator() - self.addCleanup(self.stubs.UnsetAll) - - def test_exc_retrier_1exc_gives_1log(self): - self.exc_retrier_common_start() - self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) - self.exc_retrier_common_end() - - def test_exc_retrier_same_10exc_1min_gives_1log(self): - self.exc_retrier_common_start() - self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) - # By design, the following exception don't get logged because they - # are within the same minute. - for i in range(2, 11): - self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0) - self.exc_retrier_common_end() - - def test_exc_retrier_same_2exc_2min_gives_2logs(self): - self.exc_retrier_common_start() - self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) - self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1) - self.exc_retrier_common_end() - - def test_exc_retrier_same_10exc_2min_gives_2logs(self): - self.exc_retrier_common_start() - self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) - self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0) - self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0) - self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0) - self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0) - # The previous 4 exceptions are counted here - self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5) - # Again, the following are not logged due to being within - # the same minute - self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0) - self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0) - self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0) - self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0) - self.exc_retrier_common_end() - - def test_exc_retrier_mixed_4exc_1min_gives_2logs(self): - self.exc_retrier_common_start() - self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) - # By design, this second 'unexpected 1' exception is not counted. This - # is likely a rare thing and is a sacrifice for code simplicity. - self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0) - self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1) - # Again, trailing exceptions within a minute are not counted. - self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0) - self.exc_retrier_common_end() - - def test_exc_retrier_mixed_4exc_2min_gives_2logs(self): - self.exc_retrier_common_start() - self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) - # Again, this second exception of the same type is not counted - # for the sake of code simplicity. - self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0) - # The difference between this and the previous case is the log - # is also triggered by more than a minute expiring. - self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1) - self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0) - self.exc_retrier_common_end() - - def test_exc_retrier_mixed_4exc_2min_gives_3logs(self): - self.exc_retrier_common_start() - self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1) - # This time the second 'unexpected 1' exception is counted due - # to the same exception occurring same when the minute expires. - self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0) - self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2) - self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1) - self.exc_retrier_common_end() diff --git a/tests/test_importutils.py b/tests/test_importutils.py deleted file mode 100644 index c82b8fa..0000000 --- a/tests/test_importutils.py +++ /dev/null @@ -1,121 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import sys - -from oslotest import base as test_base - -from oslo.utils import importutils - - -class ImportUtilsTest(test_base.BaseTestCase): - - # NOTE(jkoelker) There has GOT to be a way to test this. But mocking - # __import__ is the devil. Right now we just make - # sure we can import something from the stdlib - def test_import_class(self): - dt = importutils.import_class('datetime.datetime') - self.assertEqual(sys.modules['datetime'].datetime, dt) - - def test_import_bad_class(self): - self.assertRaises(ImportError, importutils.import_class, - 'lol.u_mad.brah') - - def test_import_module(self): - dt = importutils.import_module('datetime') - self.assertEqual(sys.modules['datetime'], dt) - - def test_import_object_optional_arg_not_present(self): - obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver') - self.assertEqual(obj.__class__.__name__, 'FakeDriver') - - def test_import_object_optional_arg_present(self): - obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver', - first_arg=False) - self.assertEqual(obj.__class__.__name__, 'FakeDriver') - - def test_import_object_required_arg_not_present(self): - # arg 1 isn't optional here - self.assertRaises(TypeError, importutils.import_object, - 'oslo_utils.tests.fake.FakeDriver2') - - def test_import_object_required_arg_present(self): - obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver2', - first_arg=False) - self.assertEqual(obj.__class__.__name__, 'FakeDriver2') - - # namespace tests - def test_import_object_ns_optional_arg_not_present(self): - obj = importutils.import_object_ns('oslo_utils', - 'tests.fake.FakeDriver') - self.assertEqual(obj.__class__.__name__, 'FakeDriver') - - def test_import_object_ns_optional_arg_present(self): - obj = importutils.import_object_ns('oslo_utils', - 'tests.fake.FakeDriver', - first_arg=False) - self.assertEqual(obj.__class__.__name__, 'FakeDriver') - - def test_import_object_ns_required_arg_not_present(self): - # arg 1 isn't optional here - self.assertRaises(TypeError, importutils.import_object_ns, - 'oslo_utils', 'tests.fake.FakeDriver2') - - def test_import_object_ns_required_arg_present(self): - obj = importutils.import_object_ns('oslo_utils', - 'tests.fake.FakeDriver2', - first_arg=False) - self.assertEqual(obj.__class__.__name__, 'FakeDriver2') - - # namespace tests - def test_import_object_ns_full_optional_arg_not_present(self): - obj = importutils.import_object_ns('tests2', - 'oslo_utils.tests.fake.FakeDriver') - self.assertEqual(obj.__class__.__name__, 'FakeDriver') - - def test_import_object_ns_full_optional_arg_present(self): - obj = importutils.import_object_ns('tests2', - 'oslo_utils.tests.fake.FakeDriver', - first_arg=False) - self.assertEqual(obj.__class__.__name__, 'FakeDriver') - - def test_import_object_ns_full_required_arg_not_present(self): - # arg 1 isn't optional here - self.assertRaises(TypeError, importutils.import_object_ns, - 'tests2', 'oslo_utils.tests.fake.FakeDriver2') - - def test_import_object_ns_full_required_arg_present(self): - obj = importutils.import_object_ns('tests2', - 'oslo_utils.tests.fake.FakeDriver2', - first_arg=False) - self.assertEqual(obj.__class__.__name__, 'FakeDriver2') - - def test_import_object(self): - dt = importutils.import_object('datetime.time') - self.assertTrue(isinstance(dt, sys.modules['datetime'].time)) - - def test_import_object_with_args(self): - dt = importutils.import_object('datetime.datetime', 2012, 4, 5) - self.assertTrue(isinstance(dt, sys.modules['datetime'].datetime)) - self.assertEqual(dt, datetime.datetime(2012, 4, 5)) - - def test_try_import(self): - dt = importutils.try_import('datetime') - self.assertEqual(sys.modules['datetime'], dt) - - def test_try_import_returns_default(self): - foo = importutils.try_import('foo.bar') - self.assertIsNone(foo) diff --git a/tests/test_netutils.py b/tests/test_netutils.py deleted file mode 100644 index 0aa59cf..0000000 --- a/tests/test_netutils.py +++ /dev/null @@ -1,204 +0,0 @@ -# Copyright 2012 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import socket - -import mock -from oslotest import base as test_base - -from oslo.utils import netutils - - -class NetworkUtilsTest(test_base.BaseTestCase): - - def test_no_host(self): - result = netutils.urlsplit('http://') - self.assertEqual('', result.netloc) - self.assertEqual(None, result.port) - self.assertEqual(None, result.hostname) - self.assertEqual('http', result.scheme) - - def test_parse_host_port(self): - self.assertEqual(('server01', 80), - netutils.parse_host_port('server01:80')) - self.assertEqual(('server01', None), - netutils.parse_host_port('server01')) - self.assertEqual(('server01', 1234), - netutils.parse_host_port('server01', - default_port=1234)) - self.assertEqual(('::1', 80), - netutils.parse_host_port('[::1]:80')) - self.assertEqual(('::1', None), - netutils.parse_host_port('[::1]')) - self.assertEqual(('::1', 1234), - netutils.parse_host_port('[::1]', - default_port=1234)) - self.assertEqual(('2001:db8:85a3::8a2e:370:7334', 1234), - netutils.parse_host_port( - '2001:db8:85a3::8a2e:370:7334', - default_port=1234)) - - def test_urlsplit(self): - result = netutils.urlsplit('rpc://myhost?someparam#somefragment') - self.assertEqual(result.scheme, 'rpc') - self.assertEqual(result.netloc, 'myhost') - self.assertEqual(result.path, '') - self.assertEqual(result.query, 'someparam') - self.assertEqual(result.fragment, 'somefragment') - - result = netutils.urlsplit( - 'rpc://myhost/mypath?someparam#somefragment', - allow_fragments=False) - self.assertEqual(result.scheme, 'rpc') - self.assertEqual(result.netloc, 'myhost') - self.assertEqual(result.path, '/mypath') - self.assertEqual(result.query, 'someparam#somefragment') - self.assertEqual(result.fragment, '') - - result = netutils.urlsplit( - 'rpc://user:pass@myhost/mypath?someparam#somefragment', - allow_fragments=False) - self.assertEqual(result.scheme, 'rpc') - self.assertEqual(result.netloc, 'user:pass@myhost') - self.assertEqual(result.path, '/mypath') - self.assertEqual(result.query, 'someparam#somefragment') - self.assertEqual(result.fragment, '') - - def test_urlsplit_ipv6(self): - ipv6_url = 'http://[::1]:443/v2.0/' - result = netutils.urlsplit(ipv6_url) - self.assertEqual(result.scheme, 'http') - self.assertEqual(result.netloc, '[::1]:443') - self.assertEqual(result.path, '/v2.0/') - self.assertEqual(result.hostname, '::1') - self.assertEqual(result.port, 443) - - ipv6_url = 'http://user:pass@[::1]/v2.0/' - result = netutils.urlsplit(ipv6_url) - self.assertEqual(result.scheme, 'http') - self.assertEqual(result.netloc, 'user:pass@[::1]') - self.assertEqual(result.path, '/v2.0/') - self.assertEqual(result.hostname, '::1') - self.assertEqual(result.port, None) - - ipv6_url = 'https://[2001:db8:85a3::8a2e:370:7334]:1234/v2.0/xy?ab#12' - result = netutils.urlsplit(ipv6_url) - self.assertEqual(result.scheme, 'https') - self.assertEqual(result.netloc, '[2001:db8:85a3::8a2e:370:7334]:1234') - self.assertEqual(result.path, '/v2.0/xy') - self.assertEqual(result.hostname, '2001:db8:85a3::8a2e:370:7334') - self.assertEqual(result.port, 1234) - self.assertEqual(result.query, 'ab') - self.assertEqual(result.fragment, '12') - - def test_urlsplit_params(self): - test_url = "http://localhost/?a=b&c=d" - result = netutils.urlsplit(test_url) - self.assertEqual({'a': 'b', 'c': 'd'}, result.params()) - self.assertEqual({'a': 'b', 'c': 'd'}, result.params(collapse=False)) - - test_url = "http://localhost/?a=b&a=c&a=d" - result = netutils.urlsplit(test_url) - self.assertEqual({'a': 'd'}, result.params()) - self.assertEqual({'a': ['b', 'c', 'd']}, result.params(collapse=False)) - - test_url = "http://localhost" - result = netutils.urlsplit(test_url) - self.assertEqual({}, result.params()) - - test_url = "http://localhost?" - result = netutils.urlsplit(test_url) - self.assertEqual({}, result.params()) - - def test_set_tcp_keepalive(self): - mock_sock = mock.Mock() - netutils.set_tcp_keepalive(mock_sock, True, 100, 10, 5) - calls = [ - mock.call.setsockopt(socket.SOL_SOCKET, - socket.SO_KEEPALIVE, True), - ] - if hasattr(socket, 'TCP_KEEPIDLE'): - calls += [ - mock.call.setsockopt(socket.IPPROTO_TCP, - socket.TCP_KEEPIDLE, 100) - ] - if hasattr(socket, 'TCP_KEEPINTVL'): - calls += [ - mock.call.setsockopt(socket.IPPROTO_TCP, - socket.TCP_KEEPINTVL, 10), - ] - if hasattr(socket, 'TCP_KEEPCNT'): - calls += [ - mock.call.setsockopt(socket.IPPROTO_TCP, - socket.TCP_KEEPCNT, 5) - ] - mock_sock.assert_has_calls(calls) - - mock_sock.reset_mock() - netutils.set_tcp_keepalive(mock_sock, False) - self.assertEqual(1, len(mock_sock.mock_calls)) - - def test_is_valid_ipv4(self): - self.assertTrue(netutils.is_valid_ipv4('42.42.42.42')) - - self.assertFalse(netutils.is_valid_ipv4('-1.11.11.11')) - - self.assertFalse(netutils.is_valid_ipv4('')) - - def test_is_valid_ipv6(self): - self.assertTrue(netutils.is_valid_ipv6('::1')) - - self.assertFalse(netutils.is_valid_ipv6( - '1fff::a88:85a3::172.31.128.1')) - - self.assertFalse(netutils.is_valid_ipv6('')) - - def test_is_valid_ip(self): - self.assertTrue(netutils.is_valid_ip('127.0.0.1')) - - self.assertTrue(netutils.is_valid_ip('2001:db8::ff00:42:8329')) - - self.assertFalse(netutils.is_valid_ip('256.0.0.0')) - - self.assertFalse(netutils.is_valid_ip('::1.2.3.')) - - self.assertFalse(netutils.is_valid_ip('')) - - def test_valid_port(self): - valid_inputs = [1, '1', 2, '3', '5', 8, 13, 21, - '80', '3246', '65535'] - for input_str in valid_inputs: - self.assertTrue(netutils.is_valid_port(input_str)) - - def test_valid_port_fail(self): - invalid_inputs = ['-32768', '0', 0, '65536', 528491, '528491', - '528.491', 'thirty-seven', None] - for input_str in invalid_inputs: - self.assertFalse(netutils.is_valid_port(input_str)) - - def test_get_my_ip(self): - sock_attrs = { - 'return_value.getsockname.return_value': ['1.2.3.4', '']} - with mock.patch('socket.socket', **sock_attrs): - addr = netutils.get_my_ipv4() - self.assertEqual(addr, '1.2.3.4') - - @mock.patch('socket.socket') - @mock.patch('oslo_utils.netutils._get_my_ipv4_address') - def test_get_my_ip_socket_error(self, ip, mock_socket): - mock_socket.side_effect = socket.error - ip.return_value = '1.2.3.4' - addr = netutils.get_my_ipv4() - self.assertEqual(addr, '1.2.3.4') diff --git a/tests/test_reflection.py b/tests/test_reflection.py deleted file mode 100644 index 24c31e1..0000000 --- a/tests/test_reflection.py +++ /dev/null @@ -1,279 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslotest import base as test_base -import six -import testtools - -from oslo.utils import reflection - - -if six.PY3: - RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception', - 'BaseException', 'object'] -else: - RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception', - 'BaseException', 'object'] - - -def dummy_decorator(f): - - @six.wraps(f) - def wrapper(*args, **kwargs): - return f(*args, **kwargs) - - return wrapper - - -def mere_function(a, b): - pass - - -def function_with_defs(a, b, optional=None): - pass - - -def function_with_kwargs(a, b, **kwargs): - pass - - -class Class(object): - - def method(self, c, d): - pass - - @staticmethod - def static_method(e, f): - pass - - @classmethod - def class_method(cls, g, h): - pass - - -class CallableClass(object): - def __call__(self, i, j): - pass - - -class ClassWithInit(object): - def __init__(self, k, l): - pass - - -class CallbackEqualityTest(test_base.BaseTestCase): - def test_different_simple_callbacks(self): - - def a(): - pass - - def b(): - pass - - self.assertFalse(reflection.is_same_callback(a, b)) - - def test_static_instance_callbacks(self): - - class A(object): - - @staticmethod - def b(a, b, c): - pass - - a = A() - b = A() - - self.assertTrue(reflection.is_same_callback(a.b, b.b)) - - def test_different_instance_callbacks(self): - - class A(object): - def b(self): - pass - - def __eq__(self, other): - return True - - b = A() - c = A() - - self.assertFalse(reflection.is_same_callback(b.b, c.b)) - self.assertTrue(reflection.is_same_callback(b.b, c.b, strict=False)) - - -class GetCallableNameTest(test_base.BaseTestCase): - - def test_mere_function(self): - name = reflection.get_callable_name(mere_function) - self.assertEqual('.'.join((__name__, 'mere_function')), name) - - def test_method(self): - name = reflection.get_callable_name(Class.method) - self.assertEqual('.'.join((__name__, 'Class', 'method')), name) - - def test_instance_method(self): - name = reflection.get_callable_name(Class().method) - self.assertEqual('.'.join((__name__, 'Class', 'method')), name) - - def test_static_method(self): - name = reflection.get_callable_name(Class.static_method) - if six.PY3: - self.assertEqual('.'.join((__name__, 'Class', 'static_method')), - name) - else: - # NOTE(imelnikov): static method are just functions, class name - # is not recorded anywhere in them. - self.assertEqual('.'.join((__name__, 'static_method')), name) - - def test_class_method(self): - name = reflection.get_callable_name(Class.class_method) - self.assertEqual('.'.join((__name__, 'Class', 'class_method')), name) - - def test_constructor(self): - name = reflection.get_callable_name(Class) - self.assertEqual('.'.join((__name__, 'Class')), name) - - def test_callable_class(self): - name = reflection.get_callable_name(CallableClass()) - self.assertEqual('.'.join((__name__, 'CallableClass')), name) - - def test_callable_class_call(self): - name = reflection.get_callable_name(CallableClass().__call__) - self.assertEqual('.'.join((__name__, 'CallableClass', - '__call__')), name) - - -# These extended/special case tests only work on python 3, due to python 2 -# being broken/incorrect with regard to these special cases... -@testtools.skipIf(not six.PY3, 'python 3.x is not currently available') -class GetCallableNameTestExtended(test_base.BaseTestCase): - # Tests items in http://legacy.python.org/dev/peps/pep-3155/ - - class InnerCallableClass(object): - def __call__(self): - pass - - def test_inner_callable_class(self): - obj = self.InnerCallableClass() - name = reflection.get_callable_name(obj.__call__) - expected_name = '.'.join((__name__, 'GetCallableNameTestExtended', - 'InnerCallableClass', '__call__')) - self.assertEqual(expected_name, name) - - def test_inner_callable_function(self): - def a(): - - def b(): - pass - - return b - - name = reflection.get_callable_name(a()) - expected_name = '.'.join((__name__, 'GetCallableNameTestExtended', - 'test_inner_callable_function', '<locals>', - 'a', '<locals>', 'b')) - self.assertEqual(expected_name, name) - - def test_inner_class(self): - obj = self.InnerCallableClass() - name = reflection.get_callable_name(obj) - expected_name = '.'.join((__name__, - 'GetCallableNameTestExtended', - 'InnerCallableClass')) - self.assertEqual(expected_name, name) - - -class GetCallableArgsTest(test_base.BaseTestCase): - - def test_mere_function(self): - result = reflection.get_callable_args(mere_function) - self.assertEqual(['a', 'b'], result) - - def test_function_with_defaults(self): - result = reflection.get_callable_args(function_with_defs) - self.assertEqual(['a', 'b', 'optional'], result) - - def test_required_only(self): - result = reflection.get_callable_args(function_with_defs, - required_only=True) - self.assertEqual(['a', 'b'], result) - - def test_method(self): - result = reflection.get_callable_args(Class.method) - self.assertEqual(['self', 'c', 'd'], result) - - def test_instance_method(self): - result = reflection.get_callable_args(Class().method) - self.assertEqual(['c', 'd'], result) - - def test_class_method(self): - result = reflection.get_callable_args(Class.class_method) - self.assertEqual(['g', 'h'], result) - - def test_class_constructor(self): - result = reflection.get_callable_args(ClassWithInit) - self.assertEqual(['k', 'l'], result) - - def test_class_with_call(self): - result = reflection.get_callable_args(CallableClass()) - self.assertEqual(['i', 'j'], result) - - def test_decorators_work(self): - @dummy_decorator - def special_fun(x, y): - pass - result = reflection.get_callable_args(special_fun) - self.assertEqual(['x', 'y'], result) - - -class AcceptsKwargsTest(test_base.BaseTestCase): - - def test_no_kwargs(self): - self.assertEqual(False, reflection.accepts_kwargs(mere_function)) - - def test_with_kwargs(self): - self.assertEqual(True, reflection.accepts_kwargs(function_with_kwargs)) - - -class GetClassNameTest(test_base.BaseTestCase): - - def test_std_exception(self): - name = reflection.get_class_name(RuntimeError) - self.assertEqual('RuntimeError', name) - - def test_class(self): - name = reflection.get_class_name(Class) - self.assertEqual('.'.join((__name__, 'Class')), name) - - def test_instance(self): - name = reflection.get_class_name(Class()) - self.assertEqual('.'.join((__name__, 'Class')), name) - - def test_int(self): - name = reflection.get_class_name(42) - self.assertEqual('int', name) - - -class GetAllClassNamesTest(test_base.BaseTestCase): - - def test_std_class(self): - names = list(reflection.get_all_class_names(RuntimeError)) - self.assertEqual(RUNTIME_ERROR_CLASSES, names) - - def test_std_class_up_to(self): - names = list(reflection.get_all_class_names(RuntimeError, - up_to=Exception)) - self.assertEqual(RUNTIME_ERROR_CLASSES[:-2], names) diff --git a/tests/test_strutils.py b/tests/test_strutils.py deleted file mode 100644 index 1ac80f9..0000000 --- a/tests/test_strutils.py +++ /dev/null @@ -1,594 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import math - -import mock -from oslotest import base as test_base -import six -import testscenarios - -from oslo.utils import strutils -from oslo.utils import units - -load_tests = testscenarios.load_tests_apply_scenarios - - -class StrUtilsTest(test_base.BaseTestCase): - - def test_bool_bool_from_string(self): - self.assertTrue(strutils.bool_from_string(True)) - self.assertFalse(strutils.bool_from_string(False)) - - def test_bool_bool_from_string_default(self): - self.assertTrue(strutils.bool_from_string('', default=True)) - self.assertFalse(strutils.bool_from_string('wibble', default=False)) - - def _test_bool_from_string(self, c): - self.assertTrue(strutils.bool_from_string(c('true'))) - self.assertTrue(strutils.bool_from_string(c('TRUE'))) - self.assertTrue(strutils.bool_from_string(c('on'))) - self.assertTrue(strutils.bool_from_string(c('On'))) - self.assertTrue(strutils.bool_from_string(c('yes'))) - self.assertTrue(strutils.bool_from_string(c('YES'))) - self.assertTrue(strutils.bool_from_string(c('yEs'))) - self.assertTrue(strutils.bool_from_string(c('1'))) - self.assertTrue(strutils.bool_from_string(c('T'))) - self.assertTrue(strutils.bool_from_string(c('t'))) - self.assertTrue(strutils.bool_from_string(c('Y'))) - self.assertTrue(strutils.bool_from_string(c('y'))) - - self.assertFalse(strutils.bool_from_string(c('false'))) - self.assertFalse(strutils.bool_from_string(c('FALSE'))) - self.assertFalse(strutils.bool_from_string(c('off'))) - self.assertFalse(strutils.bool_from_string(c('OFF'))) - self.assertFalse(strutils.bool_from_string(c('no'))) - self.assertFalse(strutils.bool_from_string(c('0'))) - self.assertFalse(strutils.bool_from_string(c('42'))) - self.assertFalse(strutils.bool_from_string(c( - 'This should not be True'))) - self.assertFalse(strutils.bool_from_string(c('F'))) - self.assertFalse(strutils.bool_from_string(c('f'))) - self.assertFalse(strutils.bool_from_string(c('N'))) - self.assertFalse(strutils.bool_from_string(c('n'))) - - # Whitespace should be stripped - self.assertTrue(strutils.bool_from_string(c(' 1 '))) - self.assertTrue(strutils.bool_from_string(c(' true '))) - self.assertFalse(strutils.bool_from_string(c(' 0 '))) - self.assertFalse(strutils.bool_from_string(c(' false '))) - - def test_bool_from_string(self): - self._test_bool_from_string(lambda s: s) - - def test_unicode_bool_from_string(self): - self._test_bool_from_string(six.text_type) - self.assertFalse(strutils.bool_from_string(u'使用', strict=False)) - - exc = self.assertRaises(ValueError, strutils.bool_from_string, - u'使用', strict=True) - expected_msg = (u"Unrecognized value '使用', acceptable values are:" - u" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on'," - u" 't', 'true', 'y', 'yes'") - self.assertEqual(expected_msg, six.text_type(exc)) - - def test_other_bool_from_string(self): - self.assertFalse(strutils.bool_from_string(None)) - self.assertFalse(strutils.bool_from_string(mock.Mock())) - - def test_int_bool_from_string(self): - self.assertTrue(strutils.bool_from_string(1)) - - self.assertFalse(strutils.bool_from_string(-1)) - self.assertFalse(strutils.bool_from_string(0)) - self.assertFalse(strutils.bool_from_string(2)) - - def test_strict_bool_from_string(self): - # None isn't allowed in strict mode - exc = self.assertRaises(ValueError, strutils.bool_from_string, None, - strict=True) - expected_msg = ("Unrecognized value 'None', acceptable values are:" - " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on'," - " 't', 'true', 'y', 'yes'") - self.assertEqual(expected_msg, str(exc)) - - # Unrecognized strings aren't allowed - self.assertFalse(strutils.bool_from_string('Other', strict=False)) - exc = self.assertRaises(ValueError, strutils.bool_from_string, 'Other', - strict=True) - expected_msg = ("Unrecognized value 'Other', acceptable values are:" - " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on'," - " 't', 'true', 'y', 'yes'") - self.assertEqual(expected_msg, str(exc)) - - # Unrecognized numbers aren't allowed - exc = self.assertRaises(ValueError, strutils.bool_from_string, 2, - strict=True) - expected_msg = ("Unrecognized value '2', acceptable values are:" - " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on'," - " 't', 'true', 'y', 'yes'") - self.assertEqual(expected_msg, str(exc)) - - # False-like values are allowed - self.assertFalse(strutils.bool_from_string('f', strict=True)) - self.assertFalse(strutils.bool_from_string('false', strict=True)) - self.assertFalse(strutils.bool_from_string('off', strict=True)) - self.assertFalse(strutils.bool_from_string('n', strict=True)) - self.assertFalse(strutils.bool_from_string('no', strict=True)) - self.assertFalse(strutils.bool_from_string('0', strict=True)) - - self.assertTrue(strutils.bool_from_string('1', strict=True)) - - # Avoid font-similarity issues (one looks like lowercase-el, zero like - # oh, etc...) - for char in ('O', 'o', 'L', 'l', 'I', 'i'): - self.assertRaises(ValueError, strutils.bool_from_string, char, - strict=True) - - def test_int_from_bool_as_string(self): - self.assertEqual(1, strutils.int_from_bool_as_string(True)) - self.assertEqual(0, strutils.int_from_bool_as_string(False)) - - def test_slugify(self): - to_slug = strutils.to_slug - self.assertRaises(TypeError, to_slug, True) - self.assertEqual(six.u("hello"), to_slug("hello")) - self.assertEqual(six.u("two-words"), to_slug("Two Words")) - self.assertEqual(six.u("ma-any-spa-ce-es"), - to_slug("Ma-any\t spa--ce- es")) - self.assertEqual(six.u("excamation"), to_slug("exc!amation!")) - self.assertEqual(six.u("ampserand"), to_slug("&ser$and")) - self.assertEqual(six.u("ju5tnum8er"), to_slug("ju5tnum8er")) - self.assertEqual(six.u("strip-"), to_slug(" strip - ")) - self.assertEqual(six.u("perche"), to_slug(six.b("perch\xc3\xa9"))) - self.assertEqual(six.u("strange"), - to_slug("\x80strange", errors="ignore")) - - -class StringToBytesTest(test_base.BaseTestCase): - - _unit_system = [ - ('si', dict(unit_system='SI')), - ('iec', dict(unit_system='IEC')), - ('invalid_unit_system', dict(unit_system='KKK', assert_error=True)), - ] - - _sign = [ - ('no_sign', dict(sign='')), - ('positive', dict(sign='+')), - ('negative', dict(sign='-')), - ('invalid_sign', dict(sign='~', assert_error=True)), - ] - - _magnitude = [ - ('integer', dict(magnitude='79')), - ('decimal', dict(magnitude='7.9')), - ('decimal_point_start', dict(magnitude='.9')), - ('decimal_point_end', dict(magnitude='79.', assert_error=True)), - ('invalid_literal', dict(magnitude='7.9.9', assert_error=True)), - ('garbage_value', dict(magnitude='asdf', assert_error=True)), - ] - - _unit_prefix = [ - ('no_unit_prefix', dict(unit_prefix='')), - ('k', dict(unit_prefix='k')), - ('K', dict(unit_prefix='K')), - ('M', dict(unit_prefix='M')), - ('G', dict(unit_prefix='G')), - ('T', dict(unit_prefix='T')), - ('Ki', dict(unit_prefix='Ki')), - ('Mi', dict(unit_prefix='Mi')), - ('Gi', dict(unit_prefix='Gi')), - ('Ti', dict(unit_prefix='Ti')), - ('invalid_unit_prefix', dict(unit_prefix='B', assert_error=True)), - ] - - _unit_suffix = [ - ('b', dict(unit_suffix='b')), - ('bit', dict(unit_suffix='bit')), - ('B', dict(unit_suffix='B')), - ('invalid_unit_suffix', dict(unit_suffix='Kg', assert_error=True)), - ] - - _return_int = [ - ('return_dec', dict(return_int=False)), - ('return_int', dict(return_int=True)), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._unit_system, - cls._sign, - cls._magnitude, - cls._unit_prefix, - cls._unit_suffix, - cls._return_int) - - def test_string_to_bytes(self): - - def _get_quantity(sign, magnitude, unit_suffix): - res = float('%s%s' % (sign, magnitude)) - if unit_suffix in ['b', 'bit']: - res /= 8 - return res - - def _get_constant(unit_prefix, unit_system): - if not unit_prefix: - return 1 - elif unit_system == 'SI': - res = getattr(units, unit_prefix) - elif unit_system == 'IEC': - if unit_prefix.endswith('i'): - res = getattr(units, unit_prefix) - else: - res = getattr(units, '%si' % unit_prefix) - return res - - text = ''.join([self.sign, self.magnitude, self.unit_prefix, - self.unit_suffix]) - err_si = self.unit_system == 'SI' and (self.unit_prefix == 'K' or - self.unit_prefix.endswith('i')) - err_iec = self.unit_system == 'IEC' and self.unit_prefix == 'k' - if getattr(self, 'assert_error', False) or err_si or err_iec: - self.assertRaises(ValueError, strutils.string_to_bytes, - text, unit_system=self.unit_system, - return_int=self.return_int) - return - quantity = _get_quantity(self.sign, self.magnitude, self.unit_suffix) - constant = _get_constant(self.unit_prefix, self.unit_system) - expected = quantity * constant - actual = strutils.string_to_bytes(text, unit_system=self.unit_system, - return_int=self.return_int) - if self.return_int: - self.assertEqual(actual, int(math.ceil(expected))) - else: - self.assertAlmostEqual(actual, expected) - -StringToBytesTest.generate_scenarios() - - -class MaskPasswordTestCase(test_base.BaseTestCase): - - def test_json(self): - # Test 'adminPass' w/o spaces - payload = """{'adminPass':'mypassword'}""" - expected = """{'adminPass':'***'}""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'adminPass' with spaces - payload = """{ 'adminPass' : 'mypassword' }""" - expected = """{ 'adminPass' : '***' }""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_pass' w/o spaces - payload = """{'admin_pass':'mypassword'}""" - expected = """{'admin_pass':'***'}""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_pass' with spaces - payload = """{ 'admin_pass' : 'mypassword' }""" - expected = """{ 'admin_pass' : '***' }""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_password' w/o spaces - payload = """{'admin_password':'mypassword'}""" - expected = """{'admin_password':'***'}""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_password' with spaces - payload = """{ 'admin_password' : 'mypassword' }""" - expected = """{ 'admin_password' : '***' }""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'password' w/o spaces - payload = """{'password':'mypassword'}""" - expected = """{'password':'***'}""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'password' with spaces - payload = """{ 'password' : 'mypassword' }""" - expected = """{ 'password' : '***' }""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'auth_password' w/o spaces - payload = """{'auth_password':'mypassword'}""" - expected = """{'auth_password':'***'}""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'auth_password' with spaces - payload = """{ 'auth_password' : 'mypassword' }""" - expected = """{ 'auth_password' : '***' }""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'secret_uuid' w/o spaces - payload = """{'secret_uuid':'myuuid'}""" - expected = """{'secret_uuid':'***'}""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'secret_uuid' with spaces - payload = """{ 'secret_uuid' : 'myuuid' }""" - expected = """{ 'secret_uuid' : '***' }""" - self.assertEqual(expected, strutils.mask_password(payload)) - - def test_xml(self): - # Test 'adminPass' w/o spaces - payload = """<adminPass>mypassword</adminPass>""" - expected = """<adminPass>***</adminPass>""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'adminPass' with spaces - payload = """<adminPass> - mypassword - </adminPass>""" - expected = """<adminPass>***</adminPass>""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_pass' w/o spaces - payload = """<admin_pass>mypassword</admin_pass>""" - expected = """<admin_pass>***</admin_pass>""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_pass' with spaces - payload = """<admin_pass> - mypassword - </admin_pass>""" - expected = """<admin_pass>***</admin_pass>""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_password' w/o spaces - payload = """<admin_password>mypassword</admin_password>""" - expected = """<admin_password>***</admin_password>""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_password' with spaces - payload = """<admin_password> - mypassword - </admin_password>""" - expected = """<admin_password>***</admin_password>""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'password' w/o spaces - payload = """<password>mypassword</password>""" - expected = """<password>***</password>""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'password' with spaces - payload = """<password> - mypassword - </password>""" - expected = """<password>***</password>""" - self.assertEqual(expected, strutils.mask_password(payload)) - - def test_xml_attribute(self): - # Test 'adminPass' w/o spaces - payload = """adminPass='mypassword'""" - expected = """adminPass='***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'adminPass' with spaces - payload = """adminPass = 'mypassword'""" - expected = """adminPass = '***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'adminPass' with double quotes - payload = """adminPass = "mypassword\"""" - expected = """adminPass = "***\"""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_pass' w/o spaces - payload = """admin_pass='mypassword'""" - expected = """admin_pass='***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_pass' with spaces - payload = """admin_pass = 'mypassword'""" - expected = """admin_pass = '***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_pass' with double quotes - payload = """admin_pass = "mypassword\"""" - expected = """admin_pass = "***\"""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_password' w/o spaces - payload = """admin_password='mypassword'""" - expected = """admin_password='***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_password' with spaces - payload = """admin_password = 'mypassword'""" - expected = """admin_password = '***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'admin_password' with double quotes - payload = """admin_password = "mypassword\"""" - expected = """admin_password = "***\"""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'password' w/o spaces - payload = """password='mypassword'""" - expected = """password='***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'password' with spaces - payload = """password = 'mypassword'""" - expected = """password = '***'""" - self.assertEqual(expected, strutils.mask_password(payload)) - # Test 'password' with double quotes - payload = """password = "mypassword\"""" - expected = """password = "***\"""" - self.assertEqual(expected, strutils.mask_password(payload)) - - def test_json_message(self): - payload = """body: {"changePassword": {"adminPass": "1234567"}}""" - expected = """body: {"changePassword": {"adminPass": "***"}}""" - self.assertEqual(expected, strutils.mask_password(payload)) - payload = """body: {"rescue": {"admin_pass": "1234567"}}""" - expected = """body: {"rescue": {"admin_pass": "***"}}""" - self.assertEqual(expected, strutils.mask_password(payload)) - payload = """body: {"rescue": {"admin_password": "1234567"}}""" - expected = """body: {"rescue": {"admin_password": "***"}}""" - self.assertEqual(expected, strutils.mask_password(payload)) - payload = """body: {"rescue": {"password": "1234567"}}""" - expected = """body: {"rescue": {"password": "***"}}""" - self.assertEqual(expected, strutils.mask_password(payload)) - - def test_xml_message(self): - payload = """<?xml version="1.0" encoding="UTF-8"?> -<rebuild - xmlns="http://docs.openstack.org/compute/api/v1.1" - name="foobar" - imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7" - accessIPv4="1.2.3.4" - accessIPv6="fe80::100" - adminPass="seekr3t"> - <metadata> - <meta key="My Server Name">Apache1</meta> - </metadata> -</rebuild>""" - expected = """<?xml version="1.0" encoding="UTF-8"?> -<rebuild - xmlns="http://docs.openstack.org/compute/api/v1.1" - name="foobar" - imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7" - accessIPv4="1.2.3.4" - accessIPv6="fe80::100" - adminPass="***"> - <metadata> - <meta key="My Server Name">Apache1</meta> - </metadata> -</rebuild>""" - self.assertEqual(expected, strutils.mask_password(payload)) - payload = """<?xml version="1.0" encoding="UTF-8"?> -<rescue xmlns="http://docs.openstack.org/compute/api/v1.1" - admin_pass="MySecretPass"/>""" - expected = """<?xml version="1.0" encoding="UTF-8"?> -<rescue xmlns="http://docs.openstack.org/compute/api/v1.1" - admin_pass="***"/>""" - self.assertEqual(expected, strutils.mask_password(payload)) - payload = """<?xml version="1.0" encoding="UTF-8"?> -<rescue xmlns="http://docs.openstack.org/compute/api/v1.1" - admin_password="MySecretPass"/>""" - expected = """<?xml version="1.0" encoding="UTF-8"?> -<rescue xmlns="http://docs.openstack.org/compute/api/v1.1" - admin_password="***"/>""" - self.assertEqual(expected, strutils.mask_password(payload)) - payload = """<?xml version="1.0" encoding="UTF-8"?> -<rescue xmlns="http://docs.openstack.org/compute/api/v1.1" - password="MySecretPass"/>""" - expected = """<?xml version="1.0" encoding="UTF-8"?> -<rescue xmlns="http://docs.openstack.org/compute/api/v1.1" - password="***"/>""" - self.assertEqual(expected, strutils.mask_password(payload)) - - def test_mask_password(self): - payload = "test = 'password' : 'aaaaaa'" - expected = "test = 'password' : '111'" - self.assertEqual(expected, - strutils.mask_password(payload, secret='111')) - - payload = 'mysqld --password "aaaaaa"' - expected = 'mysqld --password "****"' - self.assertEqual(expected, - strutils.mask_password(payload, secret='****')) - - payload = 'mysqld --password aaaaaa' - expected = 'mysqld --password ???' - self.assertEqual(expected, - strutils.mask_password(payload, secret='???')) - - payload = 'mysqld --password = "aaaaaa"' - expected = 'mysqld --password = "****"' - self.assertEqual(expected, - strutils.mask_password(payload, secret='****')) - - payload = "mysqld --password = 'aaaaaa'" - expected = "mysqld --password = '****'" - self.assertEqual(expected, - strutils.mask_password(payload, secret='****')) - - payload = "mysqld --password = aaaaaa" - expected = "mysqld --password = ****" - self.assertEqual(expected, - strutils.mask_password(payload, secret='****')) - - payload = "test = password = aaaaaa" - expected = "test = password = 111" - self.assertEqual(expected, - strutils.mask_password(payload, secret='111')) - - payload = "test = password= aaaaaa" - expected = "test = password= 111" - self.assertEqual(expected, - strutils.mask_password(payload, secret='111')) - - payload = "test = password =aaaaaa" - expected = "test = password =111" - self.assertEqual(expected, - strutils.mask_password(payload, secret='111')) - - payload = "test = password=aaaaaa" - expected = "test = password=111" - self.assertEqual(expected, - strutils.mask_password(payload, secret='111')) - - payload = 'test = "original_password" : "aaaaaaaaa"' - expected = 'test = "original_password" : "***"' - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = 'test = "param1" : "value"' - expected = 'test = "param1" : "value"' - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = """{'adminPass':'mypassword'}""" - payload = six.text_type(payload) - expected = """{'adminPass':'***'}""" - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = ("test = 'node.session.auth.password','-v','mypassword'," - "'nomask'") - expected = ("test = 'node.session.auth.password','-v','***'," - "'nomask'") - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = ("test = 'node.session.auth.password', '--password', " - "'mypassword', 'nomask'") - expected = ("test = 'node.session.auth.password', '--password', " - "'***', 'nomask'") - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = ("test = 'node.session.auth.password', '--password', " - "'mypassword'") - expected = ("test = 'node.session.auth.password', '--password', " - "'***'") - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = "test = node.session.auth.password -v mypassword nomask" - expected = "test = node.session.auth.password -v *** nomask" - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = ("test = node.session.auth.password --password mypassword " - "nomask") - expected = ("test = node.session.auth.password --password *** " - "nomask") - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = ("test = node.session.auth.password --password mypassword") - expected = ("test = node.session.auth.password --password ***") - self.assertEqual(expected, strutils.mask_password(payload)) - - payload = "test = cmd --password my\xe9\x80\x80pass" - expected = ("test = cmd --password ***") - self.assertEqual(expected, strutils.mask_password(payload)) - - -class IsIntLikeTestCase(test_base.BaseTestCase): - def test_is_int_like_true(self): - self.assertTrue(strutils.is_int_like(1)) - self.assertTrue(strutils.is_int_like("1")) - self.assertTrue(strutils.is_int_like("514")) - self.assertTrue(strutils.is_int_like("0")) - - def test_is_int_like_false(self): - self.assertFalse(strutils.is_int_like(1.1)) - self.assertFalse(strutils.is_int_like("1.1")) - self.assertFalse(strutils.is_int_like("1.1.1")) - self.assertFalse(strutils.is_int_like(None)) - self.assertFalse(strutils.is_int_like("0.")) - self.assertFalse(strutils.is_int_like("aaaaaa")) - self.assertFalse(strutils.is_int_like("....")) - self.assertFalse(strutils.is_int_like("1g")) - self.assertFalse( - strutils.is_int_like("0cc3346e-9fef-4445-abe6-5d2b2690ec64")) - self.assertFalse(strutils.is_int_like("a1")) - # NOTE(viktors): 12e3 - is a float number - self.assertFalse(strutils.is_int_like("12e3")) - # NOTE(viktors): Check integer numbers with base not 10 - self.assertFalse(strutils.is_int_like("0o51")) - self.assertFalse(strutils.is_int_like("0xDEADBEEF")) diff --git a/tests/test_timeutils.py b/tests/test_timeutils.py deleted file mode 100644 index bc13a31..0000000 --- a/tests/test_timeutils.py +++ /dev/null @@ -1,359 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import calendar -import datetime -import time - -import iso8601 -import mock -from oslotest import base as test_base -from testtools import matchers - -from oslo.utils import timeutils - - -class TimeUtilsTest(test_base.BaseTestCase): - - def setUp(self): - super(TimeUtilsTest, self).setUp() - self.skynet_self_aware_time_str = '1997-08-29T06:14:00Z' - self.skynet_self_aware_time_ms_str = '1997-08-29T06:14:00.000123Z' - self.skynet_self_aware_time = datetime.datetime(1997, 8, 29, 6, 14, 0) - self.skynet_self_aware_ms_time = datetime.datetime( - 1997, 8, 29, 6, 14, 0, 123) - self.one_minute_before = datetime.datetime(1997, 8, 29, 6, 13, 0) - self.one_minute_after = datetime.datetime(1997, 8, 29, 6, 15, 0) - self.skynet_self_aware_time_perfect_str = '1997-08-29T06:14:00.000000' - self.skynet_self_aware_time_perfect = datetime.datetime(1997, 8, 29, - 6, 14, 0) - self.addCleanup(timeutils.clear_time_override) - - def test_isotime(self): - with mock.patch('datetime.datetime') as datetime_mock: - datetime_mock.utcnow.return_value = self.skynet_self_aware_time - dt = timeutils.isotime() - self.assertEqual(dt, self.skynet_self_aware_time_str) - - def test_isotimei_micro_second_precision(self): - with mock.patch('datetime.datetime') as datetime_mock: - datetime_mock.utcnow.return_value = self.skynet_self_aware_ms_time - dt = timeutils.isotime(subsecond=True) - self.assertEqual(dt, self.skynet_self_aware_time_ms_str) - - def test_parse_isotime(self): - expect = timeutils.parse_isotime(self.skynet_self_aware_time_str) - skynet_self_aware_time_utc = self.skynet_self_aware_time.replace( - tzinfo=iso8601.iso8601.UTC) - self.assertEqual(skynet_self_aware_time_utc, expect) - - def test_parse_isotime_micro_second_precision(self): - expect = timeutils.parse_isotime(self.skynet_self_aware_time_ms_str) - skynet_self_aware_time_ms_utc = self.skynet_self_aware_ms_time.replace( - tzinfo=iso8601.iso8601.UTC) - self.assertEqual(skynet_self_aware_time_ms_utc, expect) - - def test_strtime(self): - expect = timeutils.strtime(self.skynet_self_aware_time_perfect) - self.assertEqual(self.skynet_self_aware_time_perfect_str, expect) - - def test_parse_strtime(self): - perfect_time_format = self.skynet_self_aware_time_perfect_str - expect = timeutils.parse_strtime(perfect_time_format) - self.assertEqual(self.skynet_self_aware_time_perfect, expect) - - def test_strtime_and_back(self): - orig_t = datetime.datetime(1997, 8, 29, 6, 14, 0) - s = timeutils.strtime(orig_t) - t = timeutils.parse_strtime(s) - self.assertEqual(orig_t, t) - - def _test_is_older_than(self, fn): - strptime = datetime.datetime.strptime - with mock.patch('datetime.datetime') as datetime_mock: - datetime_mock.utcnow.return_value = self.skynet_self_aware_time - datetime_mock.strptime = strptime - expect_true = timeutils.is_older_than(fn(self.one_minute_before), - 59) - self.assertTrue(expect_true) - expect_false = timeutils.is_older_than(fn(self.one_minute_before), - 60) - self.assertFalse(expect_false) - expect_false = timeutils.is_older_than(fn(self.one_minute_before), - 61) - self.assertFalse(expect_false) - - def test_is_older_than_datetime(self): - self._test_is_older_than(lambda x: x) - - def test_is_older_than_str(self): - self._test_is_older_than(timeutils.strtime) - - def test_is_older_than_aware(self): - """Tests sending is_older_than an 'aware' datetime.""" - self._test_is_older_than(lambda x: x.replace( - tzinfo=iso8601.iso8601.UTC)) - - def _test_is_newer_than(self, fn): - strptime = datetime.datetime.strptime - with mock.patch('datetime.datetime') as datetime_mock: - datetime_mock.utcnow.return_value = self.skynet_self_aware_time - datetime_mock.strptime = strptime - expect_true = timeutils.is_newer_than(fn(self.one_minute_after), - 59) - self.assertTrue(expect_true) - expect_false = timeutils.is_newer_than(fn(self.one_minute_after), - 60) - self.assertFalse(expect_false) - expect_false = timeutils.is_newer_than(fn(self.one_minute_after), - 61) - self.assertFalse(expect_false) - - def test_is_newer_than_datetime(self): - self._test_is_newer_than(lambda x: x) - - def test_is_newer_than_str(self): - self._test_is_newer_than(timeutils.strtime) - - def test_is_newer_than_aware(self): - """Tests sending is_newer_than an 'aware' datetime.""" - self._test_is_newer_than(lambda x: x.replace( - tzinfo=iso8601.iso8601.UTC)) - - def test_set_time_override_using_default(self): - now = timeutils.utcnow_ts() - - # NOTE(kgriffs): Normally it's bad form to sleep in a unit test, - # but this is the only way to test that set_time_override defaults - # to setting the override to the current time. - time.sleep(1) - - timeutils.set_time_override() - overriden_now = timeutils.utcnow_ts() - self.assertThat(now, matchers.LessThan(overriden_now)) - - def test_utcnow_ts(self): - skynet_self_aware_ts = 872835240 - skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts) - self.assertEqual(self.skynet_self_aware_time, skynet_dt) - - # NOTE(kgriffs): timeutils.utcnow_ts() uses time.time() - # IFF time override is not set. - with mock.patch('time.time') as time_mock: - time_mock.return_value = skynet_self_aware_ts - ts = timeutils.utcnow_ts() - self.assertEqual(ts, skynet_self_aware_ts) - - timeutils.set_time_override(skynet_dt) - ts = timeutils.utcnow_ts() - self.assertEqual(ts, skynet_self_aware_ts) - - def test_utcnow_ts_microsecond(self): - skynet_self_aware_ts = 872835240.000123 - skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts) - self.assertEqual(self.skynet_self_aware_ms_time, skynet_dt) - - # NOTE(kgriffs): timeutils.utcnow_ts() uses time.time() - # IFF time override is not set. - with mock.patch('time.time') as time_mock: - time_mock.return_value = skynet_self_aware_ts - ts = timeutils.utcnow_ts(microsecond=True) - self.assertEqual(ts, skynet_self_aware_ts) - - timeutils.set_time_override(skynet_dt) - ts = timeutils.utcnow_ts(microsecond=True) - self.assertEqual(ts, skynet_self_aware_ts) - - def test_utcnow(self): - timeutils.set_time_override(mock.sentinel.utcnow) - self.assertEqual(timeutils.utcnow(), mock.sentinel.utcnow) - - timeutils.clear_time_override() - self.assertFalse(timeutils.utcnow() == mock.sentinel.utcnow) - - self.assertTrue(timeutils.utcnow()) - - self.assertEqual(timeutils.utcnow(True).tzinfo, - iso8601.iso8601.UTC) - - def test_advance_time_delta(self): - timeutils.set_time_override(self.one_minute_before) - timeutils.advance_time_delta(datetime.timedelta(seconds=60)) - self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time) - - def test_advance_time_seconds(self): - timeutils.set_time_override(self.one_minute_before) - timeutils.advance_time_seconds(60) - self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time) - - def test_marshall_time(self): - now = timeutils.utcnow() - binary = timeutils.marshall_now(now) - backagain = timeutils.unmarshall_time(binary) - self.assertEqual(now, backagain) - - def test_delta_seconds(self): - before = timeutils.utcnow() - after = before + datetime.timedelta(days=7, seconds=59, - microseconds=123456) - self.assertAlmostEquals(604859.123456, - timeutils.delta_seconds(before, after)) - - def test_total_seconds(self): - delta = datetime.timedelta(days=1, hours=2, minutes=3, seconds=4.5) - self.assertAlmostEquals(93784.5, - timeutils.total_seconds(delta)) - - def test_iso8601_from_timestamp(self): - utcnow = timeutils.utcnow() - iso = timeutils.isotime(utcnow) - ts = calendar.timegm(utcnow.timetuple()) - self.assertEqual(iso, timeutils.iso8601_from_timestamp(ts)) - - def test_is_soon(self): - expires = timeutils.utcnow() + datetime.timedelta(minutes=5) - self.assertFalse(timeutils.is_soon(expires, 120)) - self.assertTrue(timeutils.is_soon(expires, 300)) - self.assertTrue(timeutils.is_soon(expires, 600)) - - with mock.patch('datetime.datetime') as datetime_mock: - datetime_mock.utcnow.return_value = self.skynet_self_aware_time - expires = timeutils.utcnow() - self.assertTrue(timeutils.is_soon(expires, 0)) - - -class TestIso8601Time(test_base.BaseTestCase): - - def _instaneous(self, timestamp, yr, mon, day, hr, minute, sec, micro): - self.assertEqual(timestamp.year, yr) - self.assertEqual(timestamp.month, mon) - self.assertEqual(timestamp.day, day) - self.assertEqual(timestamp.hour, hr) - self.assertEqual(timestamp.minute, minute) - self.assertEqual(timestamp.second, sec) - self.assertEqual(timestamp.microsecond, micro) - - def _do_test(self, time_str, yr, mon, day, hr, minute, sec, micro, shift): - DAY_SECONDS = 24 * 60 * 60 - timestamp = timeutils.parse_isotime(time_str) - self._instaneous(timestamp, yr, mon, day, hr, minute, sec, micro) - offset = timestamp.tzinfo.utcoffset(None) - self.assertEqual(offset.seconds + offset.days * DAY_SECONDS, shift) - - def test_zulu(self): - time_str = '2012-02-14T20:53:07Z' - self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, 0) - - def test_zulu_micros(self): - time_str = '2012-02-14T20:53:07.123Z' - self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 123000, 0) - - def test_offset_east(self): - time_str = '2012-02-14T20:53:07+04:30' - offset = 4.5 * 60 * 60 - self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset) - - def test_offset_east_micros(self): - time_str = '2012-02-14T20:53:07.42+04:30' - offset = 4.5 * 60 * 60 - self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 420000, offset) - - def test_offset_west(self): - time_str = '2012-02-14T20:53:07-05:30' - offset = -5.5 * 60 * 60 - self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset) - - def test_offset_west_micros(self): - time_str = '2012-02-14T20:53:07.654321-05:30' - offset = -5.5 * 60 * 60 - self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 654321, offset) - - def test_compare(self): - zulu = timeutils.parse_isotime('2012-02-14T20:53:07') - east = timeutils.parse_isotime('2012-02-14T20:53:07-01:00') - west = timeutils.parse_isotime('2012-02-14T20:53:07+01:00') - self.assertTrue(east > west) - self.assertTrue(east > zulu) - self.assertTrue(zulu > west) - - def test_compare_micros(self): - zulu = timeutils.parse_isotime('2012-02-14T20:53:07.6544') - east = timeutils.parse_isotime('2012-02-14T19:53:07.654321-01:00') - west = timeutils.parse_isotime('2012-02-14T21:53:07.655+01:00') - self.assertTrue(east < west) - self.assertTrue(east < zulu) - self.assertTrue(zulu < west) - - def test_zulu_roundtrip(self): - time_str = '2012-02-14T20:53:07Z' - zulu = timeutils.parse_isotime(time_str) - self.assertEqual(zulu.tzinfo, iso8601.iso8601.UTC) - self.assertEqual(timeutils.isotime(zulu), time_str) - - def test_east_roundtrip(self): - time_str = '2012-02-14T20:53:07-07:00' - east = timeutils.parse_isotime(time_str) - self.assertEqual(east.tzinfo.tzname(None), '-07:00') - self.assertEqual(timeutils.isotime(east), time_str) - - def test_west_roundtrip(self): - time_str = '2012-02-14T20:53:07+11:30' - west = timeutils.parse_isotime(time_str) - self.assertEqual(west.tzinfo.tzname(None), '+11:30') - self.assertEqual(timeutils.isotime(west), time_str) - - def test_now_roundtrip(self): - time_str = timeutils.isotime() - now = timeutils.parse_isotime(time_str) - self.assertEqual(now.tzinfo, iso8601.iso8601.UTC) - self.assertEqual(timeutils.isotime(now), time_str) - - def test_zulu_normalize(self): - time_str = '2012-02-14T20:53:07Z' - zulu = timeutils.parse_isotime(time_str) - normed = timeutils.normalize_time(zulu) - self._instaneous(normed, 2012, 2, 14, 20, 53, 7, 0) - - def test_east_normalize(self): - time_str = '2012-02-14T20:53:07-07:00' - east = timeutils.parse_isotime(time_str) - normed = timeutils.normalize_time(east) - self._instaneous(normed, 2012, 2, 15, 3, 53, 7, 0) - - def test_west_normalize(self): - time_str = '2012-02-14T20:53:07+21:00' - west = timeutils.parse_isotime(time_str) - normed = timeutils.normalize_time(west) - self._instaneous(normed, 2012, 2, 13, 23, 53, 7, 0) - - def test_normalize_aware_to_naive(self): - dt = datetime.datetime(2011, 2, 14, 20, 53, 7) - time_str = '2011-02-14T20:53:07+21:00' - aware = timeutils.parse_isotime(time_str) - naive = timeutils.normalize_time(aware) - self.assertTrue(naive < dt) - - def test_normalize_zulu_aware_to_naive(self): - dt = datetime.datetime(2011, 2, 14, 20, 53, 7) - time_str = '2011-02-14T19:53:07Z' - aware = timeutils.parse_isotime(time_str) - naive = timeutils.normalize_time(aware) - self.assertTrue(naive < dt) - - def test_normalize_naive(self): - dt = datetime.datetime(2011, 2, 14, 20, 53, 7) - dtn = datetime.datetime(2011, 2, 14, 19, 53, 7) - naive = timeutils.normalize_time(dtn) - self.assertTrue(naive < dt) diff --git a/tests/test_uuidutils.py b/tests/test_uuidutils.py deleted file mode 100644 index 63e5551..0000000 --- a/tests/test_uuidutils.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright (c) 2012 Intel Corporation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from oslotest import base as test_base - -from oslo.utils import uuidutils - - -class UUIDUtilsTest(test_base.BaseTestCase): - - def test_generate_uuid(self): - uuid_string = uuidutils.generate_uuid() - self.assertTrue(isinstance(uuid_string, str)) - self.assertEqual(len(uuid_string), 36) - # make sure there are 4 dashes - self.assertEqual(len(uuid_string.replace('-', '')), 32) - - def test_is_uuid_like(self): - self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()))) - self.assertTrue(uuidutils.is_uuid_like( - '{12345678-1234-5678-1234-567812345678}')) - self.assertTrue(uuidutils.is_uuid_like( - '12345678123456781234567812345678')) - self.assertTrue(uuidutils.is_uuid_like( - 'urn:uuid:12345678-1234-5678-1234-567812345678')) - self.assertTrue(uuidutils.is_uuid_like( - 'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb')) - self.assertTrue(uuidutils.is_uuid_like( - 'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb')) - self.assertTrue(uuidutils.is_uuid_like( - '{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}')) - - def test_is_uuid_like_insensitive(self): - self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()).upper())) - - def test_id_is_uuid_like(self): - self.assertFalse(uuidutils.is_uuid_like(1234567)) - - def test_name_is_uuid_like(self): - self.assertFalse(uuidutils.is_uuid_like('zhongyueluo')) diff --git a/tests/test_warning.py b/tests/test_warning.py deleted file mode 100644 index 306d93d..0000000 --- a/tests/test_warning.py +++ /dev/null @@ -1,61 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import imp -import os -import warnings - -import mock -from oslotest import base as test_base -import six - - -class DeprecationWarningTest(test_base.BaseTestCase): - - @mock.patch('warnings.warn') - def test_warning(self, mock_warn): - import oslo.utils - imp.reload(oslo.utils) - self.assertTrue(mock_warn.called) - args = mock_warn.call_args - self.assertIn('oslo_utils', args[0][0]) - self.assertIn('deprecated', args[0][0]) - self.assertTrue(issubclass(args[0][1], DeprecationWarning)) - - def test_real_warning(self): - with warnings.catch_warnings(record=True) as warning_msgs: - warnings.resetwarnings() - warnings.simplefilter('always', DeprecationWarning) - import oslo.utils - - # Use a separate function to get the stack level correct - # so we know the message points back to this file. This - # corresponds to an import or reload, which isn't working - # inside the test under Python 3.3. That may be due to a - # difference in the import implementation not triggering - # warnings properly when the module is reloaded, or - # because the warnings module is mostly implemented in C - # and something isn't cleanly resetting the global state - # used to track whether a warning needs to be - # emitted. Whatever the cause, we definitely see the - # warnings.warn() being invoked on a reload (see the test - # above) and warnings are reported on the console when we - # run the tests. A simpler test script run outside of - # testr does correctly report the warnings. - def foo(): - oslo.utils.deprecated() - - foo() - self.assertEqual(1, len(warning_msgs)) - msg = warning_msgs[0] - self.assertIn('oslo_utils', six.text_type(msg.message)) - self.assertEqual('test_warning.py', os.path.basename(msg.filename)) diff --git a/tests/tests_encodeutils.py b/tests/tests_encodeutils.py deleted file mode 100644 index 5e50d3a..0000000 --- a/tests/tests_encodeutils.py +++ /dev/null @@ -1,105 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2014 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -from oslotest import base as test_base -import six - -from oslo.utils import encodeutils - - -class EncodeUtilsTest(test_base.BaseTestCase): - - def test_safe_decode(self): - safe_decode = encodeutils.safe_decode - self.assertRaises(TypeError, safe_decode, True) - self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b("ni\xc3\xb1o"), - incoming="utf-8")) - if six.PY2: - # In Python 3, bytes.decode() doesn't support anymore - # bytes => bytes encodings like base64 - self.assertEqual(six.u("test"), safe_decode("dGVzdA==", - incoming='base64')) - - self.assertEqual(six.u("strange"), safe_decode(six.b('\x80strange'), - errors='ignore')) - - self.assertEqual(six.u('\xc0'), safe_decode(six.b('\xc0'), - incoming='iso-8859-1')) - - # Forcing incoming to ascii so it falls back to utf-8 - self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b('ni\xc3\xb1o'), - incoming='ascii')) - - self.assertEqual(six.u('foo'), safe_decode(b'foo')) - - def test_safe_encode_none_instead_of_text(self): - self.assertRaises(TypeError, encodeutils.safe_encode, None) - - def test_safe_encode_bool_instead_of_text(self): - self.assertRaises(TypeError, encodeutils.safe_encode, True) - - def test_safe_encode_int_instead_of_text(self): - self.assertRaises(TypeError, encodeutils.safe_encode, 1) - - def test_safe_encode_list_instead_of_text(self): - self.assertRaises(TypeError, encodeutils.safe_encode, []) - - def test_safe_encode_dict_instead_of_text(self): - self.assertRaises(TypeError, encodeutils.safe_encode, {}) - - def test_safe_encode_tuple_instead_of_text(self): - self.assertRaises(TypeError, encodeutils.safe_encode, ('foo', 'bar', )) - - def test_safe_encode_py2(self): - if six.PY2: - # In Python 3, str.encode() doesn't support anymore - # text => text encodings like base64 - self.assertEqual( - six.b("dGVzdA==\n"), - encodeutils.safe_encode("test", encoding='base64'), - ) - else: - self.skipTest("Requires py2.x") - - def test_safe_encode_force_incoming_utf8_to_ascii(self): - # Forcing incoming to ascii so it falls back to utf-8 - self.assertEqual( - six.b('ni\xc3\xb1o'), - encodeutils.safe_encode(six.b('ni\xc3\xb1o'), incoming='ascii'), - ) - - def test_safe_encode_same_encoding_different_cases(self): - with mock.patch.object(encodeutils, 'safe_decode', mock.Mock()): - utf8 = encodeutils.safe_encode( - six.u('foo\xf1bar'), encoding='utf-8') - self.assertEqual( - encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'), - encodeutils.safe_encode(utf8, 'utf-8', 'UTF-8'), - ) - self.assertEqual( - encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'), - encodeutils.safe_encode(utf8, 'utf-8', 'utf-8'), - ) - encodeutils.safe_decode.assert_has_calls([]) - - def test_safe_encode_different_encodings(self): - text = six.u('foo\xc3\xb1bar') - result = encodeutils.safe_encode( - text=text, incoming='utf-8', encoding='iso-8859-1') - self.assertNotEqual(text, result) - self.assertNotEqual(six.b("foo\xf1bar"), result) |