summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDoug Hellmann <doug@doughellmann.com>2015-07-13 16:11:47 +0000
committerDoug Hellmann <doug@doughellmann.com>2015-07-15 18:20:18 +0000
commit6bd180efc305e0078249e8982b2d21055851b28a (patch)
tree7e3ad9fc27157ec57acc8a0cc813e6826c3f5c4f
parent6c4509587a9494a015c4b482429ae3268d33864d (diff)
downloadoslo-utils-6bd180efc305e0078249e8982b2d21055851b28a.tar.gz
Remove oslo namespace package
Blueprint remove-namespace-packages Depends-on: I4e1cb4f1caaee61683dc9fb0ffc67e8f9e06dc6d for openstack/barbican Depends-on: I52848a1f63cb4a1b30d9e1c32f4bb18c5d9fe80e for openstack/castellan Depends-on: Ia21c15e8eca6bf456f7cfe13f815f5ce068601e7 for openstack/designate Depends-on: I695d2141c00a5ee36e042efbb9bac4e2803c1948 for openstack/ironic Depends-on: I779d8df5872887d5d8ec44012d792b59b0cf4f64 for openstack/ironic-lib Depends-on: I977248bec9560bcd730efe6bed741f1dc025fdf3 for openstack/keystonemiddleware Depends-on: I75e6e15d50ef9830d0581efd4cbcceb3e626f7b7 for openstack/manila Depends-on: I975592f3694be42d52685ebf606f6d3012caf1a8 for openstack/murano Depends-on: Icbd2ae016553b5e5c886205e87acf38f3ccec550 for openstack/murano-dashboard Depends-on: If8a132de65ba1e57ea93f98daac66816a3cefaa8 for openstack/neutron Depends-on: I9891564317aa3cf84874debdc966e48fb270455e for openstack/os-brick Depends-on: Ie11a5d51a6471792e122942dfc9063a0d9d38399 for openstack/python-barbicanclient Depends-on: I5d325e9397404211df00d9e39195b63d9a2f2e76 for openstack/python-muranoclient Depends-on: I474dcd9de166b00d02b3586858a28797e97a3231 for openstack/python-neutronclient Depends-on: I5e4746b58e11bf56957f10517d484c173335dfc9 for openstack/sahara Change-Id: I9ef9506873605661f394476c96e4a3bbdb85fbed
-rw-r--r--oslo/__init__.py15
-rw-r--r--oslo/utils/__init__.py26
-rw-r--r--oslo/utils/encodeutils.py13
-rw-r--r--oslo/utils/excutils.py13
-rw-r--r--oslo/utils/importutils.py13
-rw-r--r--oslo/utils/netutils.py15
-rw-r--r--oslo/utils/reflection.py13
-rw-r--r--oslo/utils/strutils.py13
-rw-r--r--oslo/utils/timeutils.py13
-rw-r--r--oslo/utils/units.py13
-rw-r--r--oslo/utils/uuidutils.py13
-rw-r--r--setup.cfg4
-rw-r--r--tests/__init__.py13
-rw-r--r--tests/base.py55
-rw-r--r--tests/test_excutils.py197
-rw-r--r--tests/test_importutils.py121
-rw-r--r--tests/test_netutils.py204
-rw-r--r--tests/test_reflection.py279
-rw-r--r--tests/test_strutils.py594
-rw-r--r--tests/test_timeutils.py359
-rw-r--r--tests/test_uuidutils.py54
-rw-r--r--tests/test_warning.py61
-rw-r--r--tests/tests_encodeutils.py105
23 files changed, 0 insertions, 2206 deletions
diff --git a/oslo/__init__.py b/oslo/__init__.py
deleted file mode 100644
index 594bc16..0000000
--- a/oslo/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2012 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-__import__('pkg_resources').declare_namespace(__name__)
diff --git a/oslo/utils/__init__.py b/oslo/utils/__init__.py
deleted file mode 100644
index 73e54f3..0000000
--- a/oslo/utils/__init__.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import warnings
-
-
-def deprecated():
- new_name = __name__.replace('.', '_')
- warnings.warn(
- ('The oslo namespace package is deprecated. Please use %s instead.' %
- new_name),
- DeprecationWarning,
- stacklevel=3,
- )
-
-
-deprecated()
diff --git a/oslo/utils/encodeutils.py b/oslo/utils/encodeutils.py
deleted file mode 100644
index 0de2476..0000000
--- a/oslo/utils/encodeutils.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.encodeutils import * # noqa
diff --git a/oslo/utils/excutils.py b/oslo/utils/excutils.py
deleted file mode 100644
index 333c955..0000000
--- a/oslo/utils/excutils.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.excutils import * # noqa
diff --git a/oslo/utils/importutils.py b/oslo/utils/importutils.py
deleted file mode 100644
index 9097dc4..0000000
--- a/oslo/utils/importutils.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.importutils import * # noqa
diff --git a/oslo/utils/netutils.py b/oslo/utils/netutils.py
deleted file mode 100644
index 701224d..0000000
--- a/oslo/utils/netutils.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.netutils import * # noqa
-# NOTE(dhellmann): Needed for taskflow.
-from oslo_utils.netutils import _ModifiedSplitResult # noqa
diff --git a/oslo/utils/reflection.py b/oslo/utils/reflection.py
deleted file mode 100644
index f8b5c16..0000000
--- a/oslo/utils/reflection.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.reflection import * # noqa
diff --git a/oslo/utils/strutils.py b/oslo/utils/strutils.py
deleted file mode 100644
index 989e088..0000000
--- a/oslo/utils/strutils.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.strutils import * # noqa
diff --git a/oslo/utils/timeutils.py b/oslo/utils/timeutils.py
deleted file mode 100644
index 65b856a..0000000
--- a/oslo/utils/timeutils.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.timeutils import * # noqa
diff --git a/oslo/utils/units.py b/oslo/utils/units.py
deleted file mode 100644
index 741e740..0000000
--- a/oslo/utils/units.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.units import * # noqa
diff --git a/oslo/utils/uuidutils.py b/oslo/utils/uuidutils.py
deleted file mode 100644
index 4f98a84..0000000
--- a/oslo/utils/uuidutils.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_utils.uuidutils import * # noqa
diff --git a/setup.cfg b/setup.cfg
index c558b32..e31979d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -21,11 +21,7 @@ classifier =
[files]
packages =
- oslo
- oslo.utils
oslo_utils
-namespace_packages =
- oslo
[pbr]
warnerrors = true
diff --git a/tests/__init__.py b/tests/__init__.py
deleted file mode 100644
index 19f5e72..0000000
--- a/tests/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
diff --git a/tests/base.py b/tests/base.py
deleted file mode 100644
index a3069ed..0000000
--- a/tests/base.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2010-2011 OpenStack Foundation
-# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-
-import fixtures
-import testtools
-
-_TRUE_VALUES = ('true', '1', 'yes')
-
-# FIXME(dhellmann) Update this to use oslo.test library
-
-
-class TestCase(testtools.TestCase):
-
- """Test case base class for all unit tests."""
-
- def setUp(self):
- """Run before each test method to initialize test environment."""
-
- super(TestCase, self).setUp()
- test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
- try:
- test_timeout = int(test_timeout)
- except ValueError:
- # If timeout value is invalid do not set a timeout.
- test_timeout = 0
- if test_timeout > 0:
- self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
-
- self.useFixture(fixtures.NestedTempfile())
- self.useFixture(fixtures.TempHomeDir())
-
- if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
- stdout = self.useFixture(fixtures.StringStream('stdout')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
- if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
- stderr = self.useFixture(fixtures.StringStream('stderr')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
-
- self.log_fixture = self.useFixture(fixtures.FakeLogger())
diff --git a/tests/test_excutils.py b/tests/test_excutils.py
deleted file mode 100644
index 3f35e79..0000000
--- a/tests/test_excutils.py
+++ /dev/null
@@ -1,197 +0,0 @@
-# Copyright 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import time
-
-import mock
-from oslotest import base as test_base
-from oslotest import moxstubout
-
-from oslo.utils import excutils
-
-
-mox = moxstubout.mox
-
-
-class SaveAndReraiseTest(test_base.BaseTestCase):
-
- def test_save_and_reraise_exception(self):
- e = None
- msg = 'foo'
- try:
- try:
- raise Exception(msg)
- except Exception:
- with excutils.save_and_reraise_exception():
- pass
- except Exception as _e:
- e = _e
-
- self.assertEqual(str(e), msg)
-
- @mock.patch('logging.getLogger')
- def test_save_and_reraise_exception_dropped(self, get_logger_mock):
- logger = get_logger_mock()
- e = None
- msg = 'second exception'
- try:
- try:
- raise Exception('dropped')
- except Exception:
- with excutils.save_and_reraise_exception():
- raise Exception(msg)
- except Exception as _e:
- e = _e
- self.assertEqual(str(e), msg)
- self.assertTrue(logger.error.called)
-
- def test_save_and_reraise_exception_no_reraise(self):
- """Test that suppressing the reraise works."""
- try:
- raise Exception('foo')
- except Exception:
- with excutils.save_and_reraise_exception() as ctxt:
- ctxt.reraise = False
-
- @mock.patch('logging.getLogger')
- def test_save_and_reraise_exception_dropped_no_reraise(self,
- get_logger_mock):
- logger = get_logger_mock()
- e = None
- msg = 'second exception'
- try:
- try:
- raise Exception('dropped')
- except Exception:
- with excutils.save_and_reraise_exception(reraise=False):
- raise Exception(msg)
- except Exception as _e:
- e = _e
- self.assertEqual(str(e), msg)
- self.assertFalse(logger.error.called)
-
-
-class ForeverRetryUncaughtExceptionsTest(test_base.BaseTestCase):
-
- def setUp(self):
- super(ForeverRetryUncaughtExceptionsTest, self).setUp()
- moxfixture = self.useFixture(moxstubout.MoxStubout())
- self.mox = moxfixture.mox
- self.stubs = moxfixture.stubs
-
- @excutils.forever_retry_uncaught_exceptions
- def exception_generator(self):
- exc = self.exception_to_raise()
- while exc is not None:
- raise exc
- exc = self.exception_to_raise()
-
- def exception_to_raise(self):
- return None
-
- def my_time_sleep(self, arg):
- pass
-
- def exc_retrier_common_start(self):
- self.stubs.Set(time, 'sleep', self.my_time_sleep)
- self.mox.StubOutWithMock(logging, 'exception')
- self.mox.StubOutWithMock(time, 'time',
- use_mock_anything=True)
- self.mox.StubOutWithMock(self, 'exception_to_raise')
-
- def exc_retrier_sequence(self, exc_id=None, timestamp=None,
- exc_count=None):
- self.exception_to_raise().AndReturn(
- Exception('unexpected %d' % exc_id))
- time.time().AndReturn(timestamp)
- if exc_count != 0:
- logging.exception(mox.In(
- 'Unexpected exception occurred %d time(s)' % exc_count))
-
- def exc_retrier_common_end(self):
- self.exception_to_raise().AndReturn(None)
- self.mox.ReplayAll()
- self.exception_generator()
- self.addCleanup(self.stubs.UnsetAll)
-
- def test_exc_retrier_1exc_gives_1log(self):
- self.exc_retrier_common_start()
- self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
- self.exc_retrier_common_end()
-
- def test_exc_retrier_same_10exc_1min_gives_1log(self):
- self.exc_retrier_common_start()
- self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
- # By design, the following exception don't get logged because they
- # are within the same minute.
- for i in range(2, 11):
- self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0)
- self.exc_retrier_common_end()
-
- def test_exc_retrier_same_2exc_2min_gives_2logs(self):
- self.exc_retrier_common_start()
- self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
- self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1)
- self.exc_retrier_common_end()
-
- def test_exc_retrier_same_10exc_2min_gives_2logs(self):
- self.exc_retrier_common_start()
- self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
- self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0)
- self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0)
- self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0)
- self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0)
- # The previous 4 exceptions are counted here
- self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5)
- # Again, the following are not logged due to being within
- # the same minute
- self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0)
- self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0)
- self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0)
- self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0)
- self.exc_retrier_common_end()
-
- def test_exc_retrier_mixed_4exc_1min_gives_2logs(self):
- self.exc_retrier_common_start()
- self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
- # By design, this second 'unexpected 1' exception is not counted. This
- # is likely a rare thing and is a sacrifice for code simplicity.
- self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
- self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1)
- # Again, trailing exceptions within a minute are not counted.
- self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0)
- self.exc_retrier_common_end()
-
- def test_exc_retrier_mixed_4exc_2min_gives_2logs(self):
- self.exc_retrier_common_start()
- self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
- # Again, this second exception of the same type is not counted
- # for the sake of code simplicity.
- self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
- # The difference between this and the previous case is the log
- # is also triggered by more than a minute expiring.
- self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1)
- self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0)
- self.exc_retrier_common_end()
-
- def test_exc_retrier_mixed_4exc_2min_gives_3logs(self):
- self.exc_retrier_common_start()
- self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
- # This time the second 'unexpected 1' exception is counted due
- # to the same exception occurring same when the minute expires.
- self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
- self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2)
- self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1)
- self.exc_retrier_common_end()
diff --git a/tests/test_importutils.py b/tests/test_importutils.py
deleted file mode 100644
index c82b8fa..0000000
--- a/tests/test_importutils.py
+++ /dev/null
@@ -1,121 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import sys
-
-from oslotest import base as test_base
-
-from oslo.utils import importutils
-
-
-class ImportUtilsTest(test_base.BaseTestCase):
-
- # NOTE(jkoelker) There has GOT to be a way to test this. But mocking
- # __import__ is the devil. Right now we just make
- # sure we can import something from the stdlib
- def test_import_class(self):
- dt = importutils.import_class('datetime.datetime')
- self.assertEqual(sys.modules['datetime'].datetime, dt)
-
- def test_import_bad_class(self):
- self.assertRaises(ImportError, importutils.import_class,
- 'lol.u_mad.brah')
-
- def test_import_module(self):
- dt = importutils.import_module('datetime')
- self.assertEqual(sys.modules['datetime'], dt)
-
- def test_import_object_optional_arg_not_present(self):
- obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver')
- self.assertEqual(obj.__class__.__name__, 'FakeDriver')
-
- def test_import_object_optional_arg_present(self):
- obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver',
- first_arg=False)
- self.assertEqual(obj.__class__.__name__, 'FakeDriver')
-
- def test_import_object_required_arg_not_present(self):
- # arg 1 isn't optional here
- self.assertRaises(TypeError, importutils.import_object,
- 'oslo_utils.tests.fake.FakeDriver2')
-
- def test_import_object_required_arg_present(self):
- obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver2',
- first_arg=False)
- self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
-
- # namespace tests
- def test_import_object_ns_optional_arg_not_present(self):
- obj = importutils.import_object_ns('oslo_utils',
- 'tests.fake.FakeDriver')
- self.assertEqual(obj.__class__.__name__, 'FakeDriver')
-
- def test_import_object_ns_optional_arg_present(self):
- obj = importutils.import_object_ns('oslo_utils',
- 'tests.fake.FakeDriver',
- first_arg=False)
- self.assertEqual(obj.__class__.__name__, 'FakeDriver')
-
- def test_import_object_ns_required_arg_not_present(self):
- # arg 1 isn't optional here
- self.assertRaises(TypeError, importutils.import_object_ns,
- 'oslo_utils', 'tests.fake.FakeDriver2')
-
- def test_import_object_ns_required_arg_present(self):
- obj = importutils.import_object_ns('oslo_utils',
- 'tests.fake.FakeDriver2',
- first_arg=False)
- self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
-
- # namespace tests
- def test_import_object_ns_full_optional_arg_not_present(self):
- obj = importutils.import_object_ns('tests2',
- 'oslo_utils.tests.fake.FakeDriver')
- self.assertEqual(obj.__class__.__name__, 'FakeDriver')
-
- def test_import_object_ns_full_optional_arg_present(self):
- obj = importutils.import_object_ns('tests2',
- 'oslo_utils.tests.fake.FakeDriver',
- first_arg=False)
- self.assertEqual(obj.__class__.__name__, 'FakeDriver')
-
- def test_import_object_ns_full_required_arg_not_present(self):
- # arg 1 isn't optional here
- self.assertRaises(TypeError, importutils.import_object_ns,
- 'tests2', 'oslo_utils.tests.fake.FakeDriver2')
-
- def test_import_object_ns_full_required_arg_present(self):
- obj = importutils.import_object_ns('tests2',
- 'oslo_utils.tests.fake.FakeDriver2',
- first_arg=False)
- self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
-
- def test_import_object(self):
- dt = importutils.import_object('datetime.time')
- self.assertTrue(isinstance(dt, sys.modules['datetime'].time))
-
- def test_import_object_with_args(self):
- dt = importutils.import_object('datetime.datetime', 2012, 4, 5)
- self.assertTrue(isinstance(dt, sys.modules['datetime'].datetime))
- self.assertEqual(dt, datetime.datetime(2012, 4, 5))
-
- def test_try_import(self):
- dt = importutils.try_import('datetime')
- self.assertEqual(sys.modules['datetime'], dt)
-
- def test_try_import_returns_default(self):
- foo = importutils.try_import('foo.bar')
- self.assertIsNone(foo)
diff --git a/tests/test_netutils.py b/tests/test_netutils.py
deleted file mode 100644
index 0aa59cf..0000000
--- a/tests/test_netutils.py
+++ /dev/null
@@ -1,204 +0,0 @@
-# Copyright 2012 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import socket
-
-import mock
-from oslotest import base as test_base
-
-from oslo.utils import netutils
-
-
-class NetworkUtilsTest(test_base.BaseTestCase):
-
- def test_no_host(self):
- result = netutils.urlsplit('http://')
- self.assertEqual('', result.netloc)
- self.assertEqual(None, result.port)
- self.assertEqual(None, result.hostname)
- self.assertEqual('http', result.scheme)
-
- def test_parse_host_port(self):
- self.assertEqual(('server01', 80),
- netutils.parse_host_port('server01:80'))
- self.assertEqual(('server01', None),
- netutils.parse_host_port('server01'))
- self.assertEqual(('server01', 1234),
- netutils.parse_host_port('server01',
- default_port=1234))
- self.assertEqual(('::1', 80),
- netutils.parse_host_port('[::1]:80'))
- self.assertEqual(('::1', None),
- netutils.parse_host_port('[::1]'))
- self.assertEqual(('::1', 1234),
- netutils.parse_host_port('[::1]',
- default_port=1234))
- self.assertEqual(('2001:db8:85a3::8a2e:370:7334', 1234),
- netutils.parse_host_port(
- '2001:db8:85a3::8a2e:370:7334',
- default_port=1234))
-
- def test_urlsplit(self):
- result = netutils.urlsplit('rpc://myhost?someparam#somefragment')
- self.assertEqual(result.scheme, 'rpc')
- self.assertEqual(result.netloc, 'myhost')
- self.assertEqual(result.path, '')
- self.assertEqual(result.query, 'someparam')
- self.assertEqual(result.fragment, 'somefragment')
-
- result = netutils.urlsplit(
- 'rpc://myhost/mypath?someparam#somefragment',
- allow_fragments=False)
- self.assertEqual(result.scheme, 'rpc')
- self.assertEqual(result.netloc, 'myhost')
- self.assertEqual(result.path, '/mypath')
- self.assertEqual(result.query, 'someparam#somefragment')
- self.assertEqual(result.fragment, '')
-
- result = netutils.urlsplit(
- 'rpc://user:pass@myhost/mypath?someparam#somefragment',
- allow_fragments=False)
- self.assertEqual(result.scheme, 'rpc')
- self.assertEqual(result.netloc, 'user:pass@myhost')
- self.assertEqual(result.path, '/mypath')
- self.assertEqual(result.query, 'someparam#somefragment')
- self.assertEqual(result.fragment, '')
-
- def test_urlsplit_ipv6(self):
- ipv6_url = 'http://[::1]:443/v2.0/'
- result = netutils.urlsplit(ipv6_url)
- self.assertEqual(result.scheme, 'http')
- self.assertEqual(result.netloc, '[::1]:443')
- self.assertEqual(result.path, '/v2.0/')
- self.assertEqual(result.hostname, '::1')
- self.assertEqual(result.port, 443)
-
- ipv6_url = 'http://user:pass@[::1]/v2.0/'
- result = netutils.urlsplit(ipv6_url)
- self.assertEqual(result.scheme, 'http')
- self.assertEqual(result.netloc, 'user:pass@[::1]')
- self.assertEqual(result.path, '/v2.0/')
- self.assertEqual(result.hostname, '::1')
- self.assertEqual(result.port, None)
-
- ipv6_url = 'https://[2001:db8:85a3::8a2e:370:7334]:1234/v2.0/xy?ab#12'
- result = netutils.urlsplit(ipv6_url)
- self.assertEqual(result.scheme, 'https')
- self.assertEqual(result.netloc, '[2001:db8:85a3::8a2e:370:7334]:1234')
- self.assertEqual(result.path, '/v2.0/xy')
- self.assertEqual(result.hostname, '2001:db8:85a3::8a2e:370:7334')
- self.assertEqual(result.port, 1234)
- self.assertEqual(result.query, 'ab')
- self.assertEqual(result.fragment, '12')
-
- def test_urlsplit_params(self):
- test_url = "http://localhost/?a=b&c=d"
- result = netutils.urlsplit(test_url)
- self.assertEqual({'a': 'b', 'c': 'd'}, result.params())
- self.assertEqual({'a': 'b', 'c': 'd'}, result.params(collapse=False))
-
- test_url = "http://localhost/?a=b&a=c&a=d"
- result = netutils.urlsplit(test_url)
- self.assertEqual({'a': 'd'}, result.params())
- self.assertEqual({'a': ['b', 'c', 'd']}, result.params(collapse=False))
-
- test_url = "http://localhost"
- result = netutils.urlsplit(test_url)
- self.assertEqual({}, result.params())
-
- test_url = "http://localhost?"
- result = netutils.urlsplit(test_url)
- self.assertEqual({}, result.params())
-
- def test_set_tcp_keepalive(self):
- mock_sock = mock.Mock()
- netutils.set_tcp_keepalive(mock_sock, True, 100, 10, 5)
- calls = [
- mock.call.setsockopt(socket.SOL_SOCKET,
- socket.SO_KEEPALIVE, True),
- ]
- if hasattr(socket, 'TCP_KEEPIDLE'):
- calls += [
- mock.call.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_KEEPIDLE, 100)
- ]
- if hasattr(socket, 'TCP_KEEPINTVL'):
- calls += [
- mock.call.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_KEEPINTVL, 10),
- ]
- if hasattr(socket, 'TCP_KEEPCNT'):
- calls += [
- mock.call.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_KEEPCNT, 5)
- ]
- mock_sock.assert_has_calls(calls)
-
- mock_sock.reset_mock()
- netutils.set_tcp_keepalive(mock_sock, False)
- self.assertEqual(1, len(mock_sock.mock_calls))
-
- def test_is_valid_ipv4(self):
- self.assertTrue(netutils.is_valid_ipv4('42.42.42.42'))
-
- self.assertFalse(netutils.is_valid_ipv4('-1.11.11.11'))
-
- self.assertFalse(netutils.is_valid_ipv4(''))
-
- def test_is_valid_ipv6(self):
- self.assertTrue(netutils.is_valid_ipv6('::1'))
-
- self.assertFalse(netutils.is_valid_ipv6(
- '1fff::a88:85a3::172.31.128.1'))
-
- self.assertFalse(netutils.is_valid_ipv6(''))
-
- def test_is_valid_ip(self):
- self.assertTrue(netutils.is_valid_ip('127.0.0.1'))
-
- self.assertTrue(netutils.is_valid_ip('2001:db8::ff00:42:8329'))
-
- self.assertFalse(netutils.is_valid_ip('256.0.0.0'))
-
- self.assertFalse(netutils.is_valid_ip('::1.2.3.'))
-
- self.assertFalse(netutils.is_valid_ip(''))
-
- def test_valid_port(self):
- valid_inputs = [1, '1', 2, '3', '5', 8, 13, 21,
- '80', '3246', '65535']
- for input_str in valid_inputs:
- self.assertTrue(netutils.is_valid_port(input_str))
-
- def test_valid_port_fail(self):
- invalid_inputs = ['-32768', '0', 0, '65536', 528491, '528491',
- '528.491', 'thirty-seven', None]
- for input_str in invalid_inputs:
- self.assertFalse(netutils.is_valid_port(input_str))
-
- def test_get_my_ip(self):
- sock_attrs = {
- 'return_value.getsockname.return_value': ['1.2.3.4', '']}
- with mock.patch('socket.socket', **sock_attrs):
- addr = netutils.get_my_ipv4()
- self.assertEqual(addr, '1.2.3.4')
-
- @mock.patch('socket.socket')
- @mock.patch('oslo_utils.netutils._get_my_ipv4_address')
- def test_get_my_ip_socket_error(self, ip, mock_socket):
- mock_socket.side_effect = socket.error
- ip.return_value = '1.2.3.4'
- addr = netutils.get_my_ipv4()
- self.assertEqual(addr, '1.2.3.4')
diff --git a/tests/test_reflection.py b/tests/test_reflection.py
deleted file mode 100644
index 24c31e1..0000000
--- a/tests/test_reflection.py
+++ /dev/null
@@ -1,279 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslotest import base as test_base
-import six
-import testtools
-
-from oslo.utils import reflection
-
-
-if six.PY3:
- RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception',
- 'BaseException', 'object']
-else:
- RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
- 'BaseException', 'object']
-
-
-def dummy_decorator(f):
-
- @six.wraps(f)
- def wrapper(*args, **kwargs):
- return f(*args, **kwargs)
-
- return wrapper
-
-
-def mere_function(a, b):
- pass
-
-
-def function_with_defs(a, b, optional=None):
- pass
-
-
-def function_with_kwargs(a, b, **kwargs):
- pass
-
-
-class Class(object):
-
- def method(self, c, d):
- pass
-
- @staticmethod
- def static_method(e, f):
- pass
-
- @classmethod
- def class_method(cls, g, h):
- pass
-
-
-class CallableClass(object):
- def __call__(self, i, j):
- pass
-
-
-class ClassWithInit(object):
- def __init__(self, k, l):
- pass
-
-
-class CallbackEqualityTest(test_base.BaseTestCase):
- def test_different_simple_callbacks(self):
-
- def a():
- pass
-
- def b():
- pass
-
- self.assertFalse(reflection.is_same_callback(a, b))
-
- def test_static_instance_callbacks(self):
-
- class A(object):
-
- @staticmethod
- def b(a, b, c):
- pass
-
- a = A()
- b = A()
-
- self.assertTrue(reflection.is_same_callback(a.b, b.b))
-
- def test_different_instance_callbacks(self):
-
- class A(object):
- def b(self):
- pass
-
- def __eq__(self, other):
- return True
-
- b = A()
- c = A()
-
- self.assertFalse(reflection.is_same_callback(b.b, c.b))
- self.assertTrue(reflection.is_same_callback(b.b, c.b, strict=False))
-
-
-class GetCallableNameTest(test_base.BaseTestCase):
-
- def test_mere_function(self):
- name = reflection.get_callable_name(mere_function)
- self.assertEqual('.'.join((__name__, 'mere_function')), name)
-
- def test_method(self):
- name = reflection.get_callable_name(Class.method)
- self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
-
- def test_instance_method(self):
- name = reflection.get_callable_name(Class().method)
- self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
-
- def test_static_method(self):
- name = reflection.get_callable_name(Class.static_method)
- if six.PY3:
- self.assertEqual('.'.join((__name__, 'Class', 'static_method')),
- name)
- else:
- # NOTE(imelnikov): static method are just functions, class name
- # is not recorded anywhere in them.
- self.assertEqual('.'.join((__name__, 'static_method')), name)
-
- def test_class_method(self):
- name = reflection.get_callable_name(Class.class_method)
- self.assertEqual('.'.join((__name__, 'Class', 'class_method')), name)
-
- def test_constructor(self):
- name = reflection.get_callable_name(Class)
- self.assertEqual('.'.join((__name__, 'Class')), name)
-
- def test_callable_class(self):
- name = reflection.get_callable_name(CallableClass())
- self.assertEqual('.'.join((__name__, 'CallableClass')), name)
-
- def test_callable_class_call(self):
- name = reflection.get_callable_name(CallableClass().__call__)
- self.assertEqual('.'.join((__name__, 'CallableClass',
- '__call__')), name)
-
-
-# These extended/special case tests only work on python 3, due to python 2
-# being broken/incorrect with regard to these special cases...
-@testtools.skipIf(not six.PY3, 'python 3.x is not currently available')
-class GetCallableNameTestExtended(test_base.BaseTestCase):
- # Tests items in http://legacy.python.org/dev/peps/pep-3155/
-
- class InnerCallableClass(object):
- def __call__(self):
- pass
-
- def test_inner_callable_class(self):
- obj = self.InnerCallableClass()
- name = reflection.get_callable_name(obj.__call__)
- expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
- 'InnerCallableClass', '__call__'))
- self.assertEqual(expected_name, name)
-
- def test_inner_callable_function(self):
- def a():
-
- def b():
- pass
-
- return b
-
- name = reflection.get_callable_name(a())
- expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
- 'test_inner_callable_function', '<locals>',
- 'a', '<locals>', 'b'))
- self.assertEqual(expected_name, name)
-
- def test_inner_class(self):
- obj = self.InnerCallableClass()
- name = reflection.get_callable_name(obj)
- expected_name = '.'.join((__name__,
- 'GetCallableNameTestExtended',
- 'InnerCallableClass'))
- self.assertEqual(expected_name, name)
-
-
-class GetCallableArgsTest(test_base.BaseTestCase):
-
- def test_mere_function(self):
- result = reflection.get_callable_args(mere_function)
- self.assertEqual(['a', 'b'], result)
-
- def test_function_with_defaults(self):
- result = reflection.get_callable_args(function_with_defs)
- self.assertEqual(['a', 'b', 'optional'], result)
-
- def test_required_only(self):
- result = reflection.get_callable_args(function_with_defs,
- required_only=True)
- self.assertEqual(['a', 'b'], result)
-
- def test_method(self):
- result = reflection.get_callable_args(Class.method)
- self.assertEqual(['self', 'c', 'd'], result)
-
- def test_instance_method(self):
- result = reflection.get_callable_args(Class().method)
- self.assertEqual(['c', 'd'], result)
-
- def test_class_method(self):
- result = reflection.get_callable_args(Class.class_method)
- self.assertEqual(['g', 'h'], result)
-
- def test_class_constructor(self):
- result = reflection.get_callable_args(ClassWithInit)
- self.assertEqual(['k', 'l'], result)
-
- def test_class_with_call(self):
- result = reflection.get_callable_args(CallableClass())
- self.assertEqual(['i', 'j'], result)
-
- def test_decorators_work(self):
- @dummy_decorator
- def special_fun(x, y):
- pass
- result = reflection.get_callable_args(special_fun)
- self.assertEqual(['x', 'y'], result)
-
-
-class AcceptsKwargsTest(test_base.BaseTestCase):
-
- def test_no_kwargs(self):
- self.assertEqual(False, reflection.accepts_kwargs(mere_function))
-
- def test_with_kwargs(self):
- self.assertEqual(True, reflection.accepts_kwargs(function_with_kwargs))
-
-
-class GetClassNameTest(test_base.BaseTestCase):
-
- def test_std_exception(self):
- name = reflection.get_class_name(RuntimeError)
- self.assertEqual('RuntimeError', name)
-
- def test_class(self):
- name = reflection.get_class_name(Class)
- self.assertEqual('.'.join((__name__, 'Class')), name)
-
- def test_instance(self):
- name = reflection.get_class_name(Class())
- self.assertEqual('.'.join((__name__, 'Class')), name)
-
- def test_int(self):
- name = reflection.get_class_name(42)
- self.assertEqual('int', name)
-
-
-class GetAllClassNamesTest(test_base.BaseTestCase):
-
- def test_std_class(self):
- names = list(reflection.get_all_class_names(RuntimeError))
- self.assertEqual(RUNTIME_ERROR_CLASSES, names)
-
- def test_std_class_up_to(self):
- names = list(reflection.get_all_class_names(RuntimeError,
- up_to=Exception))
- self.assertEqual(RUNTIME_ERROR_CLASSES[:-2], names)
diff --git a/tests/test_strutils.py b/tests/test_strutils.py
deleted file mode 100644
index 1ac80f9..0000000
--- a/tests/test_strutils.py
+++ /dev/null
@@ -1,594 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import math
-
-import mock
-from oslotest import base as test_base
-import six
-import testscenarios
-
-from oslo.utils import strutils
-from oslo.utils import units
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class StrUtilsTest(test_base.BaseTestCase):
-
- def test_bool_bool_from_string(self):
- self.assertTrue(strutils.bool_from_string(True))
- self.assertFalse(strutils.bool_from_string(False))
-
- def test_bool_bool_from_string_default(self):
- self.assertTrue(strutils.bool_from_string('', default=True))
- self.assertFalse(strutils.bool_from_string('wibble', default=False))
-
- def _test_bool_from_string(self, c):
- self.assertTrue(strutils.bool_from_string(c('true')))
- self.assertTrue(strutils.bool_from_string(c('TRUE')))
- self.assertTrue(strutils.bool_from_string(c('on')))
- self.assertTrue(strutils.bool_from_string(c('On')))
- self.assertTrue(strutils.bool_from_string(c('yes')))
- self.assertTrue(strutils.bool_from_string(c('YES')))
- self.assertTrue(strutils.bool_from_string(c('yEs')))
- self.assertTrue(strutils.bool_from_string(c('1')))
- self.assertTrue(strutils.bool_from_string(c('T')))
- self.assertTrue(strutils.bool_from_string(c('t')))
- self.assertTrue(strutils.bool_from_string(c('Y')))
- self.assertTrue(strutils.bool_from_string(c('y')))
-
- self.assertFalse(strutils.bool_from_string(c('false')))
- self.assertFalse(strutils.bool_from_string(c('FALSE')))
- self.assertFalse(strutils.bool_from_string(c('off')))
- self.assertFalse(strutils.bool_from_string(c('OFF')))
- self.assertFalse(strutils.bool_from_string(c('no')))
- self.assertFalse(strutils.bool_from_string(c('0')))
- self.assertFalse(strutils.bool_from_string(c('42')))
- self.assertFalse(strutils.bool_from_string(c(
- 'This should not be True')))
- self.assertFalse(strutils.bool_from_string(c('F')))
- self.assertFalse(strutils.bool_from_string(c('f')))
- self.assertFalse(strutils.bool_from_string(c('N')))
- self.assertFalse(strutils.bool_from_string(c('n')))
-
- # Whitespace should be stripped
- self.assertTrue(strutils.bool_from_string(c(' 1 ')))
- self.assertTrue(strutils.bool_from_string(c(' true ')))
- self.assertFalse(strutils.bool_from_string(c(' 0 ')))
- self.assertFalse(strutils.bool_from_string(c(' false ')))
-
- def test_bool_from_string(self):
- self._test_bool_from_string(lambda s: s)
-
- def test_unicode_bool_from_string(self):
- self._test_bool_from_string(six.text_type)
- self.assertFalse(strutils.bool_from_string(u'使用', strict=False))
-
- exc = self.assertRaises(ValueError, strutils.bool_from_string,
- u'使用', strict=True)
- expected_msg = (u"Unrecognized value '使用', acceptable values are:"
- u" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
- u" 't', 'true', 'y', 'yes'")
- self.assertEqual(expected_msg, six.text_type(exc))
-
- def test_other_bool_from_string(self):
- self.assertFalse(strutils.bool_from_string(None))
- self.assertFalse(strutils.bool_from_string(mock.Mock()))
-
- def test_int_bool_from_string(self):
- self.assertTrue(strutils.bool_from_string(1))
-
- self.assertFalse(strutils.bool_from_string(-1))
- self.assertFalse(strutils.bool_from_string(0))
- self.assertFalse(strutils.bool_from_string(2))
-
- def test_strict_bool_from_string(self):
- # None isn't allowed in strict mode
- exc = self.assertRaises(ValueError, strutils.bool_from_string, None,
- strict=True)
- expected_msg = ("Unrecognized value 'None', acceptable values are:"
- " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
- " 't', 'true', 'y', 'yes'")
- self.assertEqual(expected_msg, str(exc))
-
- # Unrecognized strings aren't allowed
- self.assertFalse(strutils.bool_from_string('Other', strict=False))
- exc = self.assertRaises(ValueError, strutils.bool_from_string, 'Other',
- strict=True)
- expected_msg = ("Unrecognized value 'Other', acceptable values are:"
- " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
- " 't', 'true', 'y', 'yes'")
- self.assertEqual(expected_msg, str(exc))
-
- # Unrecognized numbers aren't allowed
- exc = self.assertRaises(ValueError, strutils.bool_from_string, 2,
- strict=True)
- expected_msg = ("Unrecognized value '2', acceptable values are:"
- " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
- " 't', 'true', 'y', 'yes'")
- self.assertEqual(expected_msg, str(exc))
-
- # False-like values are allowed
- self.assertFalse(strutils.bool_from_string('f', strict=True))
- self.assertFalse(strutils.bool_from_string('false', strict=True))
- self.assertFalse(strutils.bool_from_string('off', strict=True))
- self.assertFalse(strutils.bool_from_string('n', strict=True))
- self.assertFalse(strutils.bool_from_string('no', strict=True))
- self.assertFalse(strutils.bool_from_string('0', strict=True))
-
- self.assertTrue(strutils.bool_from_string('1', strict=True))
-
- # Avoid font-similarity issues (one looks like lowercase-el, zero like
- # oh, etc...)
- for char in ('O', 'o', 'L', 'l', 'I', 'i'):
- self.assertRaises(ValueError, strutils.bool_from_string, char,
- strict=True)
-
- def test_int_from_bool_as_string(self):
- self.assertEqual(1, strutils.int_from_bool_as_string(True))
- self.assertEqual(0, strutils.int_from_bool_as_string(False))
-
- def test_slugify(self):
- to_slug = strutils.to_slug
- self.assertRaises(TypeError, to_slug, True)
- self.assertEqual(six.u("hello"), to_slug("hello"))
- self.assertEqual(six.u("two-words"), to_slug("Two Words"))
- self.assertEqual(six.u("ma-any-spa-ce-es"),
- to_slug("Ma-any\t spa--ce- es"))
- self.assertEqual(six.u("excamation"), to_slug("exc!amation!"))
- self.assertEqual(six.u("ampserand"), to_slug("&ampser$and"))
- self.assertEqual(six.u("ju5tnum8er"), to_slug("ju5tnum8er"))
- self.assertEqual(six.u("strip-"), to_slug(" strip - "))
- self.assertEqual(six.u("perche"), to_slug(six.b("perch\xc3\xa9")))
- self.assertEqual(six.u("strange"),
- to_slug("\x80strange", errors="ignore"))
-
-
-class StringToBytesTest(test_base.BaseTestCase):
-
- _unit_system = [
- ('si', dict(unit_system='SI')),
- ('iec', dict(unit_system='IEC')),
- ('invalid_unit_system', dict(unit_system='KKK', assert_error=True)),
- ]
-
- _sign = [
- ('no_sign', dict(sign='')),
- ('positive', dict(sign='+')),
- ('negative', dict(sign='-')),
- ('invalid_sign', dict(sign='~', assert_error=True)),
- ]
-
- _magnitude = [
- ('integer', dict(magnitude='79')),
- ('decimal', dict(magnitude='7.9')),
- ('decimal_point_start', dict(magnitude='.9')),
- ('decimal_point_end', dict(magnitude='79.', assert_error=True)),
- ('invalid_literal', dict(magnitude='7.9.9', assert_error=True)),
- ('garbage_value', dict(magnitude='asdf', assert_error=True)),
- ]
-
- _unit_prefix = [
- ('no_unit_prefix', dict(unit_prefix='')),
- ('k', dict(unit_prefix='k')),
- ('K', dict(unit_prefix='K')),
- ('M', dict(unit_prefix='M')),
- ('G', dict(unit_prefix='G')),
- ('T', dict(unit_prefix='T')),
- ('Ki', dict(unit_prefix='Ki')),
- ('Mi', dict(unit_prefix='Mi')),
- ('Gi', dict(unit_prefix='Gi')),
- ('Ti', dict(unit_prefix='Ti')),
- ('invalid_unit_prefix', dict(unit_prefix='B', assert_error=True)),
- ]
-
- _unit_suffix = [
- ('b', dict(unit_suffix='b')),
- ('bit', dict(unit_suffix='bit')),
- ('B', dict(unit_suffix='B')),
- ('invalid_unit_suffix', dict(unit_suffix='Kg', assert_error=True)),
- ]
-
- _return_int = [
- ('return_dec', dict(return_int=False)),
- ('return_int', dict(return_int=True)),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._unit_system,
- cls._sign,
- cls._magnitude,
- cls._unit_prefix,
- cls._unit_suffix,
- cls._return_int)
-
- def test_string_to_bytes(self):
-
- def _get_quantity(sign, magnitude, unit_suffix):
- res = float('%s%s' % (sign, magnitude))
- if unit_suffix in ['b', 'bit']:
- res /= 8
- return res
-
- def _get_constant(unit_prefix, unit_system):
- if not unit_prefix:
- return 1
- elif unit_system == 'SI':
- res = getattr(units, unit_prefix)
- elif unit_system == 'IEC':
- if unit_prefix.endswith('i'):
- res = getattr(units, unit_prefix)
- else:
- res = getattr(units, '%si' % unit_prefix)
- return res
-
- text = ''.join([self.sign, self.magnitude, self.unit_prefix,
- self.unit_suffix])
- err_si = self.unit_system == 'SI' and (self.unit_prefix == 'K' or
- self.unit_prefix.endswith('i'))
- err_iec = self.unit_system == 'IEC' and self.unit_prefix == 'k'
- if getattr(self, 'assert_error', False) or err_si or err_iec:
- self.assertRaises(ValueError, strutils.string_to_bytes,
- text, unit_system=self.unit_system,
- return_int=self.return_int)
- return
- quantity = _get_quantity(self.sign, self.magnitude, self.unit_suffix)
- constant = _get_constant(self.unit_prefix, self.unit_system)
- expected = quantity * constant
- actual = strutils.string_to_bytes(text, unit_system=self.unit_system,
- return_int=self.return_int)
- if self.return_int:
- self.assertEqual(actual, int(math.ceil(expected)))
- else:
- self.assertAlmostEqual(actual, expected)
-
-StringToBytesTest.generate_scenarios()
-
-
-class MaskPasswordTestCase(test_base.BaseTestCase):
-
- def test_json(self):
- # Test 'adminPass' w/o spaces
- payload = """{'adminPass':'mypassword'}"""
- expected = """{'adminPass':'***'}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'adminPass' with spaces
- payload = """{ 'adminPass' : 'mypassword' }"""
- expected = """{ 'adminPass' : '***' }"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_pass' w/o spaces
- payload = """{'admin_pass':'mypassword'}"""
- expected = """{'admin_pass':'***'}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_pass' with spaces
- payload = """{ 'admin_pass' : 'mypassword' }"""
- expected = """{ 'admin_pass' : '***' }"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_password' w/o spaces
- payload = """{'admin_password':'mypassword'}"""
- expected = """{'admin_password':'***'}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_password' with spaces
- payload = """{ 'admin_password' : 'mypassword' }"""
- expected = """{ 'admin_password' : '***' }"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'password' w/o spaces
- payload = """{'password':'mypassword'}"""
- expected = """{'password':'***'}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'password' with spaces
- payload = """{ 'password' : 'mypassword' }"""
- expected = """{ 'password' : '***' }"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'auth_password' w/o spaces
- payload = """{'auth_password':'mypassword'}"""
- expected = """{'auth_password':'***'}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'auth_password' with spaces
- payload = """{ 'auth_password' : 'mypassword' }"""
- expected = """{ 'auth_password' : '***' }"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'secret_uuid' w/o spaces
- payload = """{'secret_uuid':'myuuid'}"""
- expected = """{'secret_uuid':'***'}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'secret_uuid' with spaces
- payload = """{ 'secret_uuid' : 'myuuid' }"""
- expected = """{ 'secret_uuid' : '***' }"""
- self.assertEqual(expected, strutils.mask_password(payload))
-
- def test_xml(self):
- # Test 'adminPass' w/o spaces
- payload = """<adminPass>mypassword</adminPass>"""
- expected = """<adminPass>***</adminPass>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'adminPass' with spaces
- payload = """<adminPass>
- mypassword
- </adminPass>"""
- expected = """<adminPass>***</adminPass>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_pass' w/o spaces
- payload = """<admin_pass>mypassword</admin_pass>"""
- expected = """<admin_pass>***</admin_pass>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_pass' with spaces
- payload = """<admin_pass>
- mypassword
- </admin_pass>"""
- expected = """<admin_pass>***</admin_pass>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_password' w/o spaces
- payload = """<admin_password>mypassword</admin_password>"""
- expected = """<admin_password>***</admin_password>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_password' with spaces
- payload = """<admin_password>
- mypassword
- </admin_password>"""
- expected = """<admin_password>***</admin_password>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'password' w/o spaces
- payload = """<password>mypassword</password>"""
- expected = """<password>***</password>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'password' with spaces
- payload = """<password>
- mypassword
- </password>"""
- expected = """<password>***</password>"""
- self.assertEqual(expected, strutils.mask_password(payload))
-
- def test_xml_attribute(self):
- # Test 'adminPass' w/o spaces
- payload = """adminPass='mypassword'"""
- expected = """adminPass='***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'adminPass' with spaces
- payload = """adminPass = 'mypassword'"""
- expected = """adminPass = '***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'adminPass' with double quotes
- payload = """adminPass = "mypassword\""""
- expected = """adminPass = "***\""""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_pass' w/o spaces
- payload = """admin_pass='mypassword'"""
- expected = """admin_pass='***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_pass' with spaces
- payload = """admin_pass = 'mypassword'"""
- expected = """admin_pass = '***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_pass' with double quotes
- payload = """admin_pass = "mypassword\""""
- expected = """admin_pass = "***\""""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_password' w/o spaces
- payload = """admin_password='mypassword'"""
- expected = """admin_password='***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_password' with spaces
- payload = """admin_password = 'mypassword'"""
- expected = """admin_password = '***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'admin_password' with double quotes
- payload = """admin_password = "mypassword\""""
- expected = """admin_password = "***\""""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'password' w/o spaces
- payload = """password='mypassword'"""
- expected = """password='***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'password' with spaces
- payload = """password = 'mypassword'"""
- expected = """password = '***'"""
- self.assertEqual(expected, strutils.mask_password(payload))
- # Test 'password' with double quotes
- payload = """password = "mypassword\""""
- expected = """password = "***\""""
- self.assertEqual(expected, strutils.mask_password(payload))
-
- def test_json_message(self):
- payload = """body: {"changePassword": {"adminPass": "1234567"}}"""
- expected = """body: {"changePassword": {"adminPass": "***"}}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- payload = """body: {"rescue": {"admin_pass": "1234567"}}"""
- expected = """body: {"rescue": {"admin_pass": "***"}}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- payload = """body: {"rescue": {"admin_password": "1234567"}}"""
- expected = """body: {"rescue": {"admin_password": "***"}}"""
- self.assertEqual(expected, strutils.mask_password(payload))
- payload = """body: {"rescue": {"password": "1234567"}}"""
- expected = """body: {"rescue": {"password": "***"}}"""
- self.assertEqual(expected, strutils.mask_password(payload))
-
- def test_xml_message(self):
- payload = """<?xml version="1.0" encoding="UTF-8"?>
-<rebuild
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- name="foobar"
- imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
- accessIPv4="1.2.3.4"
- accessIPv6="fe80::100"
- adminPass="seekr3t">
- <metadata>
- <meta key="My Server Name">Apache1</meta>
- </metadata>
-</rebuild>"""
- expected = """<?xml version="1.0" encoding="UTF-8"?>
-<rebuild
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- name="foobar"
- imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
- accessIPv4="1.2.3.4"
- accessIPv6="fe80::100"
- adminPass="***">
- <metadata>
- <meta key="My Server Name">Apache1</meta>
- </metadata>
-</rebuild>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- payload = """<?xml version="1.0" encoding="UTF-8"?>
-<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
- admin_pass="MySecretPass"/>"""
- expected = """<?xml version="1.0" encoding="UTF-8"?>
-<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
- admin_pass="***"/>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- payload = """<?xml version="1.0" encoding="UTF-8"?>
-<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
- admin_password="MySecretPass"/>"""
- expected = """<?xml version="1.0" encoding="UTF-8"?>
-<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
- admin_password="***"/>"""
- self.assertEqual(expected, strutils.mask_password(payload))
- payload = """<?xml version="1.0" encoding="UTF-8"?>
-<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
- password="MySecretPass"/>"""
- expected = """<?xml version="1.0" encoding="UTF-8"?>
-<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
- password="***"/>"""
- self.assertEqual(expected, strutils.mask_password(payload))
-
- def test_mask_password(self):
- payload = "test = 'password' : 'aaaaaa'"
- expected = "test = 'password' : '111'"
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='111'))
-
- payload = 'mysqld --password "aaaaaa"'
- expected = 'mysqld --password "****"'
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='****'))
-
- payload = 'mysqld --password aaaaaa'
- expected = 'mysqld --password ???'
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='???'))
-
- payload = 'mysqld --password = "aaaaaa"'
- expected = 'mysqld --password = "****"'
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='****'))
-
- payload = "mysqld --password = 'aaaaaa'"
- expected = "mysqld --password = '****'"
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='****'))
-
- payload = "mysqld --password = aaaaaa"
- expected = "mysqld --password = ****"
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='****'))
-
- payload = "test = password = aaaaaa"
- expected = "test = password = 111"
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='111'))
-
- payload = "test = password= aaaaaa"
- expected = "test = password= 111"
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='111'))
-
- payload = "test = password =aaaaaa"
- expected = "test = password =111"
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='111'))
-
- payload = "test = password=aaaaaa"
- expected = "test = password=111"
- self.assertEqual(expected,
- strutils.mask_password(payload, secret='111'))
-
- payload = 'test = "original_password" : "aaaaaaaaa"'
- expected = 'test = "original_password" : "***"'
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = 'test = "param1" : "value"'
- expected = 'test = "param1" : "value"'
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = """{'adminPass':'mypassword'}"""
- payload = six.text_type(payload)
- expected = """{'adminPass':'***'}"""
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = ("test = 'node.session.auth.password','-v','mypassword',"
- "'nomask'")
- expected = ("test = 'node.session.auth.password','-v','***',"
- "'nomask'")
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = ("test = 'node.session.auth.password', '--password', "
- "'mypassword', 'nomask'")
- expected = ("test = 'node.session.auth.password', '--password', "
- "'***', 'nomask'")
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = ("test = 'node.session.auth.password', '--password', "
- "'mypassword'")
- expected = ("test = 'node.session.auth.password', '--password', "
- "'***'")
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = "test = node.session.auth.password -v mypassword nomask"
- expected = "test = node.session.auth.password -v *** nomask"
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = ("test = node.session.auth.password --password mypassword "
- "nomask")
- expected = ("test = node.session.auth.password --password *** "
- "nomask")
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = ("test = node.session.auth.password --password mypassword")
- expected = ("test = node.session.auth.password --password ***")
- self.assertEqual(expected, strutils.mask_password(payload))
-
- payload = "test = cmd --password my\xe9\x80\x80pass"
- expected = ("test = cmd --password ***")
- self.assertEqual(expected, strutils.mask_password(payload))
-
-
-class IsIntLikeTestCase(test_base.BaseTestCase):
- def test_is_int_like_true(self):
- self.assertTrue(strutils.is_int_like(1))
- self.assertTrue(strutils.is_int_like("1"))
- self.assertTrue(strutils.is_int_like("514"))
- self.assertTrue(strutils.is_int_like("0"))
-
- def test_is_int_like_false(self):
- self.assertFalse(strutils.is_int_like(1.1))
- self.assertFalse(strutils.is_int_like("1.1"))
- self.assertFalse(strutils.is_int_like("1.1.1"))
- self.assertFalse(strutils.is_int_like(None))
- self.assertFalse(strutils.is_int_like("0."))
- self.assertFalse(strutils.is_int_like("aaaaaa"))
- self.assertFalse(strutils.is_int_like("...."))
- self.assertFalse(strutils.is_int_like("1g"))
- self.assertFalse(
- strutils.is_int_like("0cc3346e-9fef-4445-abe6-5d2b2690ec64"))
- self.assertFalse(strutils.is_int_like("a1"))
- # NOTE(viktors): 12e3 - is a float number
- self.assertFalse(strutils.is_int_like("12e3"))
- # NOTE(viktors): Check integer numbers with base not 10
- self.assertFalse(strutils.is_int_like("0o51"))
- self.assertFalse(strutils.is_int_like("0xDEADBEEF"))
diff --git a/tests/test_timeutils.py b/tests/test_timeutils.py
deleted file mode 100644
index bc13a31..0000000
--- a/tests/test_timeutils.py
+++ /dev/null
@@ -1,359 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import calendar
-import datetime
-import time
-
-import iso8601
-import mock
-from oslotest import base as test_base
-from testtools import matchers
-
-from oslo.utils import timeutils
-
-
-class TimeUtilsTest(test_base.BaseTestCase):
-
- def setUp(self):
- super(TimeUtilsTest, self).setUp()
- self.skynet_self_aware_time_str = '1997-08-29T06:14:00Z'
- self.skynet_self_aware_time_ms_str = '1997-08-29T06:14:00.000123Z'
- self.skynet_self_aware_time = datetime.datetime(1997, 8, 29, 6, 14, 0)
- self.skynet_self_aware_ms_time = datetime.datetime(
- 1997, 8, 29, 6, 14, 0, 123)
- self.one_minute_before = datetime.datetime(1997, 8, 29, 6, 13, 0)
- self.one_minute_after = datetime.datetime(1997, 8, 29, 6, 15, 0)
- self.skynet_self_aware_time_perfect_str = '1997-08-29T06:14:00.000000'
- self.skynet_self_aware_time_perfect = datetime.datetime(1997, 8, 29,
- 6, 14, 0)
- self.addCleanup(timeutils.clear_time_override)
-
- def test_isotime(self):
- with mock.patch('datetime.datetime') as datetime_mock:
- datetime_mock.utcnow.return_value = self.skynet_self_aware_time
- dt = timeutils.isotime()
- self.assertEqual(dt, self.skynet_self_aware_time_str)
-
- def test_isotimei_micro_second_precision(self):
- with mock.patch('datetime.datetime') as datetime_mock:
- datetime_mock.utcnow.return_value = self.skynet_self_aware_ms_time
- dt = timeutils.isotime(subsecond=True)
- self.assertEqual(dt, self.skynet_self_aware_time_ms_str)
-
- def test_parse_isotime(self):
- expect = timeutils.parse_isotime(self.skynet_self_aware_time_str)
- skynet_self_aware_time_utc = self.skynet_self_aware_time.replace(
- tzinfo=iso8601.iso8601.UTC)
- self.assertEqual(skynet_self_aware_time_utc, expect)
-
- def test_parse_isotime_micro_second_precision(self):
- expect = timeutils.parse_isotime(self.skynet_self_aware_time_ms_str)
- skynet_self_aware_time_ms_utc = self.skynet_self_aware_ms_time.replace(
- tzinfo=iso8601.iso8601.UTC)
- self.assertEqual(skynet_self_aware_time_ms_utc, expect)
-
- def test_strtime(self):
- expect = timeutils.strtime(self.skynet_self_aware_time_perfect)
- self.assertEqual(self.skynet_self_aware_time_perfect_str, expect)
-
- def test_parse_strtime(self):
- perfect_time_format = self.skynet_self_aware_time_perfect_str
- expect = timeutils.parse_strtime(perfect_time_format)
- self.assertEqual(self.skynet_self_aware_time_perfect, expect)
-
- def test_strtime_and_back(self):
- orig_t = datetime.datetime(1997, 8, 29, 6, 14, 0)
- s = timeutils.strtime(orig_t)
- t = timeutils.parse_strtime(s)
- self.assertEqual(orig_t, t)
-
- def _test_is_older_than(self, fn):
- strptime = datetime.datetime.strptime
- with mock.patch('datetime.datetime') as datetime_mock:
- datetime_mock.utcnow.return_value = self.skynet_self_aware_time
- datetime_mock.strptime = strptime
- expect_true = timeutils.is_older_than(fn(self.one_minute_before),
- 59)
- self.assertTrue(expect_true)
- expect_false = timeutils.is_older_than(fn(self.one_minute_before),
- 60)
- self.assertFalse(expect_false)
- expect_false = timeutils.is_older_than(fn(self.one_minute_before),
- 61)
- self.assertFalse(expect_false)
-
- def test_is_older_than_datetime(self):
- self._test_is_older_than(lambda x: x)
-
- def test_is_older_than_str(self):
- self._test_is_older_than(timeutils.strtime)
-
- def test_is_older_than_aware(self):
- """Tests sending is_older_than an 'aware' datetime."""
- self._test_is_older_than(lambda x: x.replace(
- tzinfo=iso8601.iso8601.UTC))
-
- def _test_is_newer_than(self, fn):
- strptime = datetime.datetime.strptime
- with mock.patch('datetime.datetime') as datetime_mock:
- datetime_mock.utcnow.return_value = self.skynet_self_aware_time
- datetime_mock.strptime = strptime
- expect_true = timeutils.is_newer_than(fn(self.one_minute_after),
- 59)
- self.assertTrue(expect_true)
- expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
- 60)
- self.assertFalse(expect_false)
- expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
- 61)
- self.assertFalse(expect_false)
-
- def test_is_newer_than_datetime(self):
- self._test_is_newer_than(lambda x: x)
-
- def test_is_newer_than_str(self):
- self._test_is_newer_than(timeutils.strtime)
-
- def test_is_newer_than_aware(self):
- """Tests sending is_newer_than an 'aware' datetime."""
- self._test_is_newer_than(lambda x: x.replace(
- tzinfo=iso8601.iso8601.UTC))
-
- def test_set_time_override_using_default(self):
- now = timeutils.utcnow_ts()
-
- # NOTE(kgriffs): Normally it's bad form to sleep in a unit test,
- # but this is the only way to test that set_time_override defaults
- # to setting the override to the current time.
- time.sleep(1)
-
- timeutils.set_time_override()
- overriden_now = timeutils.utcnow_ts()
- self.assertThat(now, matchers.LessThan(overriden_now))
-
- def test_utcnow_ts(self):
- skynet_self_aware_ts = 872835240
- skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts)
- self.assertEqual(self.skynet_self_aware_time, skynet_dt)
-
- # NOTE(kgriffs): timeutils.utcnow_ts() uses time.time()
- # IFF time override is not set.
- with mock.patch('time.time') as time_mock:
- time_mock.return_value = skynet_self_aware_ts
- ts = timeutils.utcnow_ts()
- self.assertEqual(ts, skynet_self_aware_ts)
-
- timeutils.set_time_override(skynet_dt)
- ts = timeutils.utcnow_ts()
- self.assertEqual(ts, skynet_self_aware_ts)
-
- def test_utcnow_ts_microsecond(self):
- skynet_self_aware_ts = 872835240.000123
- skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts)
- self.assertEqual(self.skynet_self_aware_ms_time, skynet_dt)
-
- # NOTE(kgriffs): timeutils.utcnow_ts() uses time.time()
- # IFF time override is not set.
- with mock.patch('time.time') as time_mock:
- time_mock.return_value = skynet_self_aware_ts
- ts = timeutils.utcnow_ts(microsecond=True)
- self.assertEqual(ts, skynet_self_aware_ts)
-
- timeutils.set_time_override(skynet_dt)
- ts = timeutils.utcnow_ts(microsecond=True)
- self.assertEqual(ts, skynet_self_aware_ts)
-
- def test_utcnow(self):
- timeutils.set_time_override(mock.sentinel.utcnow)
- self.assertEqual(timeutils.utcnow(), mock.sentinel.utcnow)
-
- timeutils.clear_time_override()
- self.assertFalse(timeutils.utcnow() == mock.sentinel.utcnow)
-
- self.assertTrue(timeutils.utcnow())
-
- self.assertEqual(timeutils.utcnow(True).tzinfo,
- iso8601.iso8601.UTC)
-
- def test_advance_time_delta(self):
- timeutils.set_time_override(self.one_minute_before)
- timeutils.advance_time_delta(datetime.timedelta(seconds=60))
- self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
-
- def test_advance_time_seconds(self):
- timeutils.set_time_override(self.one_minute_before)
- timeutils.advance_time_seconds(60)
- self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
-
- def test_marshall_time(self):
- now = timeutils.utcnow()
- binary = timeutils.marshall_now(now)
- backagain = timeutils.unmarshall_time(binary)
- self.assertEqual(now, backagain)
-
- def test_delta_seconds(self):
- before = timeutils.utcnow()
- after = before + datetime.timedelta(days=7, seconds=59,
- microseconds=123456)
- self.assertAlmostEquals(604859.123456,
- timeutils.delta_seconds(before, after))
-
- def test_total_seconds(self):
- delta = datetime.timedelta(days=1, hours=2, minutes=3, seconds=4.5)
- self.assertAlmostEquals(93784.5,
- timeutils.total_seconds(delta))
-
- def test_iso8601_from_timestamp(self):
- utcnow = timeutils.utcnow()
- iso = timeutils.isotime(utcnow)
- ts = calendar.timegm(utcnow.timetuple())
- self.assertEqual(iso, timeutils.iso8601_from_timestamp(ts))
-
- def test_is_soon(self):
- expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
- self.assertFalse(timeutils.is_soon(expires, 120))
- self.assertTrue(timeutils.is_soon(expires, 300))
- self.assertTrue(timeutils.is_soon(expires, 600))
-
- with mock.patch('datetime.datetime') as datetime_mock:
- datetime_mock.utcnow.return_value = self.skynet_self_aware_time
- expires = timeutils.utcnow()
- self.assertTrue(timeutils.is_soon(expires, 0))
-
-
-class TestIso8601Time(test_base.BaseTestCase):
-
- def _instaneous(self, timestamp, yr, mon, day, hr, minute, sec, micro):
- self.assertEqual(timestamp.year, yr)
- self.assertEqual(timestamp.month, mon)
- self.assertEqual(timestamp.day, day)
- self.assertEqual(timestamp.hour, hr)
- self.assertEqual(timestamp.minute, minute)
- self.assertEqual(timestamp.second, sec)
- self.assertEqual(timestamp.microsecond, micro)
-
- def _do_test(self, time_str, yr, mon, day, hr, minute, sec, micro, shift):
- DAY_SECONDS = 24 * 60 * 60
- timestamp = timeutils.parse_isotime(time_str)
- self._instaneous(timestamp, yr, mon, day, hr, minute, sec, micro)
- offset = timestamp.tzinfo.utcoffset(None)
- self.assertEqual(offset.seconds + offset.days * DAY_SECONDS, shift)
-
- def test_zulu(self):
- time_str = '2012-02-14T20:53:07Z'
- self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, 0)
-
- def test_zulu_micros(self):
- time_str = '2012-02-14T20:53:07.123Z'
- self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 123000, 0)
-
- def test_offset_east(self):
- time_str = '2012-02-14T20:53:07+04:30'
- offset = 4.5 * 60 * 60
- self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
-
- def test_offset_east_micros(self):
- time_str = '2012-02-14T20:53:07.42+04:30'
- offset = 4.5 * 60 * 60
- self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 420000, offset)
-
- def test_offset_west(self):
- time_str = '2012-02-14T20:53:07-05:30'
- offset = -5.5 * 60 * 60
- self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
-
- def test_offset_west_micros(self):
- time_str = '2012-02-14T20:53:07.654321-05:30'
- offset = -5.5 * 60 * 60
- self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 654321, offset)
-
- def test_compare(self):
- zulu = timeutils.parse_isotime('2012-02-14T20:53:07')
- east = timeutils.parse_isotime('2012-02-14T20:53:07-01:00')
- west = timeutils.parse_isotime('2012-02-14T20:53:07+01:00')
- self.assertTrue(east > west)
- self.assertTrue(east > zulu)
- self.assertTrue(zulu > west)
-
- def test_compare_micros(self):
- zulu = timeutils.parse_isotime('2012-02-14T20:53:07.6544')
- east = timeutils.parse_isotime('2012-02-14T19:53:07.654321-01:00')
- west = timeutils.parse_isotime('2012-02-14T21:53:07.655+01:00')
- self.assertTrue(east < west)
- self.assertTrue(east < zulu)
- self.assertTrue(zulu < west)
-
- def test_zulu_roundtrip(self):
- time_str = '2012-02-14T20:53:07Z'
- zulu = timeutils.parse_isotime(time_str)
- self.assertEqual(zulu.tzinfo, iso8601.iso8601.UTC)
- self.assertEqual(timeutils.isotime(zulu), time_str)
-
- def test_east_roundtrip(self):
- time_str = '2012-02-14T20:53:07-07:00'
- east = timeutils.parse_isotime(time_str)
- self.assertEqual(east.tzinfo.tzname(None), '-07:00')
- self.assertEqual(timeutils.isotime(east), time_str)
-
- def test_west_roundtrip(self):
- time_str = '2012-02-14T20:53:07+11:30'
- west = timeutils.parse_isotime(time_str)
- self.assertEqual(west.tzinfo.tzname(None), '+11:30')
- self.assertEqual(timeutils.isotime(west), time_str)
-
- def test_now_roundtrip(self):
- time_str = timeutils.isotime()
- now = timeutils.parse_isotime(time_str)
- self.assertEqual(now.tzinfo, iso8601.iso8601.UTC)
- self.assertEqual(timeutils.isotime(now), time_str)
-
- def test_zulu_normalize(self):
- time_str = '2012-02-14T20:53:07Z'
- zulu = timeutils.parse_isotime(time_str)
- normed = timeutils.normalize_time(zulu)
- self._instaneous(normed, 2012, 2, 14, 20, 53, 7, 0)
-
- def test_east_normalize(self):
- time_str = '2012-02-14T20:53:07-07:00'
- east = timeutils.parse_isotime(time_str)
- normed = timeutils.normalize_time(east)
- self._instaneous(normed, 2012, 2, 15, 3, 53, 7, 0)
-
- def test_west_normalize(self):
- time_str = '2012-02-14T20:53:07+21:00'
- west = timeutils.parse_isotime(time_str)
- normed = timeutils.normalize_time(west)
- self._instaneous(normed, 2012, 2, 13, 23, 53, 7, 0)
-
- def test_normalize_aware_to_naive(self):
- dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
- time_str = '2011-02-14T20:53:07+21:00'
- aware = timeutils.parse_isotime(time_str)
- naive = timeutils.normalize_time(aware)
- self.assertTrue(naive < dt)
-
- def test_normalize_zulu_aware_to_naive(self):
- dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
- time_str = '2011-02-14T19:53:07Z'
- aware = timeutils.parse_isotime(time_str)
- naive = timeutils.normalize_time(aware)
- self.assertTrue(naive < dt)
-
- def test_normalize_naive(self):
- dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
- dtn = datetime.datetime(2011, 2, 14, 19, 53, 7)
- naive = timeutils.normalize_time(dtn)
- self.assertTrue(naive < dt)
diff --git a/tests/test_uuidutils.py b/tests/test_uuidutils.py
deleted file mode 100644
index 63e5551..0000000
--- a/tests/test_uuidutils.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright (c) 2012 Intel Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-from oslotest import base as test_base
-
-from oslo.utils import uuidutils
-
-
-class UUIDUtilsTest(test_base.BaseTestCase):
-
- def test_generate_uuid(self):
- uuid_string = uuidutils.generate_uuid()
- self.assertTrue(isinstance(uuid_string, str))
- self.assertEqual(len(uuid_string), 36)
- # make sure there are 4 dashes
- self.assertEqual(len(uuid_string.replace('-', '')), 32)
-
- def test_is_uuid_like(self):
- self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4())))
- self.assertTrue(uuidutils.is_uuid_like(
- '{12345678-1234-5678-1234-567812345678}'))
- self.assertTrue(uuidutils.is_uuid_like(
- '12345678123456781234567812345678'))
- self.assertTrue(uuidutils.is_uuid_like(
- 'urn:uuid:12345678-1234-5678-1234-567812345678'))
- self.assertTrue(uuidutils.is_uuid_like(
- 'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
- self.assertTrue(uuidutils.is_uuid_like(
- 'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
- self.assertTrue(uuidutils.is_uuid_like(
- '{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}'))
-
- def test_is_uuid_like_insensitive(self):
- self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()).upper()))
-
- def test_id_is_uuid_like(self):
- self.assertFalse(uuidutils.is_uuid_like(1234567))
-
- def test_name_is_uuid_like(self):
- self.assertFalse(uuidutils.is_uuid_like('zhongyueluo'))
diff --git a/tests/test_warning.py b/tests/test_warning.py
deleted file mode 100644
index 306d93d..0000000
--- a/tests/test_warning.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import imp
-import os
-import warnings
-
-import mock
-from oslotest import base as test_base
-import six
-
-
-class DeprecationWarningTest(test_base.BaseTestCase):
-
- @mock.patch('warnings.warn')
- def test_warning(self, mock_warn):
- import oslo.utils
- imp.reload(oslo.utils)
- self.assertTrue(mock_warn.called)
- args = mock_warn.call_args
- self.assertIn('oslo_utils', args[0][0])
- self.assertIn('deprecated', args[0][0])
- self.assertTrue(issubclass(args[0][1], DeprecationWarning))
-
- def test_real_warning(self):
- with warnings.catch_warnings(record=True) as warning_msgs:
- warnings.resetwarnings()
- warnings.simplefilter('always', DeprecationWarning)
- import oslo.utils
-
- # Use a separate function to get the stack level correct
- # so we know the message points back to this file. This
- # corresponds to an import or reload, which isn't working
- # inside the test under Python 3.3. That may be due to a
- # difference in the import implementation not triggering
- # warnings properly when the module is reloaded, or
- # because the warnings module is mostly implemented in C
- # and something isn't cleanly resetting the global state
- # used to track whether a warning needs to be
- # emitted. Whatever the cause, we definitely see the
- # warnings.warn() being invoked on a reload (see the test
- # above) and warnings are reported on the console when we
- # run the tests. A simpler test script run outside of
- # testr does correctly report the warnings.
- def foo():
- oslo.utils.deprecated()
-
- foo()
- self.assertEqual(1, len(warning_msgs))
- msg = warning_msgs[0]
- self.assertIn('oslo_utils', six.text_type(msg.message))
- self.assertEqual('test_warning.py', os.path.basename(msg.filename))
diff --git a/tests/tests_encodeutils.py b/tests/tests_encodeutils.py
deleted file mode 100644
index 5e50d3a..0000000
--- a/tests/tests_encodeutils.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2014 Red Hat, Inc.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-from oslotest import base as test_base
-import six
-
-from oslo.utils import encodeutils
-
-
-class EncodeUtilsTest(test_base.BaseTestCase):
-
- def test_safe_decode(self):
- safe_decode = encodeutils.safe_decode
- self.assertRaises(TypeError, safe_decode, True)
- self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b("ni\xc3\xb1o"),
- incoming="utf-8"))
- if six.PY2:
- # In Python 3, bytes.decode() doesn't support anymore
- # bytes => bytes encodings like base64
- self.assertEqual(six.u("test"), safe_decode("dGVzdA==",
- incoming='base64'))
-
- self.assertEqual(six.u("strange"), safe_decode(six.b('\x80strange'),
- errors='ignore'))
-
- self.assertEqual(six.u('\xc0'), safe_decode(six.b('\xc0'),
- incoming='iso-8859-1'))
-
- # Forcing incoming to ascii so it falls back to utf-8
- self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b('ni\xc3\xb1o'),
- incoming='ascii'))
-
- self.assertEqual(six.u('foo'), safe_decode(b'foo'))
-
- def test_safe_encode_none_instead_of_text(self):
- self.assertRaises(TypeError, encodeutils.safe_encode, None)
-
- def test_safe_encode_bool_instead_of_text(self):
- self.assertRaises(TypeError, encodeutils.safe_encode, True)
-
- def test_safe_encode_int_instead_of_text(self):
- self.assertRaises(TypeError, encodeutils.safe_encode, 1)
-
- def test_safe_encode_list_instead_of_text(self):
- self.assertRaises(TypeError, encodeutils.safe_encode, [])
-
- def test_safe_encode_dict_instead_of_text(self):
- self.assertRaises(TypeError, encodeutils.safe_encode, {})
-
- def test_safe_encode_tuple_instead_of_text(self):
- self.assertRaises(TypeError, encodeutils.safe_encode, ('foo', 'bar', ))
-
- def test_safe_encode_py2(self):
- if six.PY2:
- # In Python 3, str.encode() doesn't support anymore
- # text => text encodings like base64
- self.assertEqual(
- six.b("dGVzdA==\n"),
- encodeutils.safe_encode("test", encoding='base64'),
- )
- else:
- self.skipTest("Requires py2.x")
-
- def test_safe_encode_force_incoming_utf8_to_ascii(self):
- # Forcing incoming to ascii so it falls back to utf-8
- self.assertEqual(
- six.b('ni\xc3\xb1o'),
- encodeutils.safe_encode(six.b('ni\xc3\xb1o'), incoming='ascii'),
- )
-
- def test_safe_encode_same_encoding_different_cases(self):
- with mock.patch.object(encodeutils, 'safe_decode', mock.Mock()):
- utf8 = encodeutils.safe_encode(
- six.u('foo\xf1bar'), encoding='utf-8')
- self.assertEqual(
- encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),
- encodeutils.safe_encode(utf8, 'utf-8', 'UTF-8'),
- )
- self.assertEqual(
- encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),
- encodeutils.safe_encode(utf8, 'utf-8', 'utf-8'),
- )
- encodeutils.safe_decode.assert_has_calls([])
-
- def test_safe_encode_different_encodings(self):
- text = six.u('foo\xc3\xb1bar')
- result = encodeutils.safe_encode(
- text=text, incoming='utf-8', encoding='iso-8859-1')
- self.assertNotEqual(text, result)
- self.assertNotEqual(six.b("foo\xf1bar"), result)