diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-08-31 15:22:00 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-08-31 15:22:00 -0400 |
| commit | 3c60d3b1ca492ba77d64111f0378892acaadf36b (patch) | |
| tree | 5e2df552a7142cd3bb2ef9e5236db85e5c2859f4 /lib/sqlalchemy/testing | |
| parent | 903b0a42e71c81ff99494352760c0f92fa7a486d (diff) | |
| download | sqlalchemy-3c60d3b1ca492ba77d64111f0378892acaadf36b.tar.gz | |
- A new style of warning can be emitted which will "filter" up to
N occurrences of a parameterized string. This allows parameterized
warnings that can refer to their arguments to be delivered a fixed
number of times until allowing Python warning filters to squelch them,
and prevents memory from growing unbounded within Python's
warning registries.
fixes #3178
Diffstat (limited to 'lib/sqlalchemy/testing')
| -rw-r--r-- | lib/sqlalchemy/testing/__init__.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/assertions.py | 129 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/plugin/plugin_base.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/warnings.py | 42 |
4 files changed, 76 insertions, 101 deletions
diff --git a/lib/sqlalchemy/testing/__init__.py b/lib/sqlalchemy/testing/__init__.py index 8f8f56412..e53fb28b1 100644 --- a/lib/sqlalchemy/testing/__init__.py +++ b/lib/sqlalchemy/testing/__init__.py @@ -6,7 +6,7 @@ # the MIT License: http://www.opensource.org/licenses/mit-license.php -from .warnings import testing_warn, assert_warnings, resetwarnings +from .warnings import assert_warnings from . import config diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 79411af7e..dbe365ad5 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -9,79 +9,79 @@ from __future__ import absolute_import from . import util as testutil from sqlalchemy import pool, orm, util -from sqlalchemy.engine import default, create_engine, url -from sqlalchemy import exc as sa_exc +from sqlalchemy.engine import default, url from sqlalchemy.util import decorator from sqlalchemy import types as sqltypes, schema import warnings import re -from .warnings import resetwarnings from .exclusions import db_spec, _is_excluded from . import assertsql from . import config -import itertools from .util import fail import contextlib +from . import mock -def emits_warning(*messages): - """Mark a test as emitting a warning. +def expect_warnings(*messages): + """Context manager which expects one or more warnings. + + With no arguments, squelches all SAWarnings emitted via + sqlalchemy.util.warn and sqlalchemy.util.warn_limited. Otherwise + pass string expressions that will match selected warnings via regex; + all non-matching warnings are sent through. + + Note that the test suite sets SAWarning warnings to raise exceptions. + + """ + return _expect_warnings( + "sqlalchemy.util.deprecations.warnings.warn", messages) + + +@contextlib.contextmanager +def expect_warnings_on(db, *messages): + """Context manager which expects one or more warnings on specific + dialects. - With no arguments, squelches all SAWarning failures. Or pass one or more - strings; these will be matched to the root of the warning description by - warnings.filterwarnings(). """ - # TODO: it would be nice to assert that a named warning was - # emitted. should work with some monkeypatching of warnings, - # and may work on non-CPython if they keep to the spirit of - # warnings.showwarning's docstring. - # - update: jython looks ok, it uses cpython's module + spec = db_spec(db) + + if isinstance(db, util.string_types) and not spec(config._current): + yield + elif not _is_excluded(*db): + yield + else: + with expect_warnings(*messages): + yield + + +def emits_warning(*messages): + """Decorator form of expect_warnings().""" @decorator def decorate(fn, *args, **kw): - # todo: should probably be strict about this, too - filters = [dict(action='ignore', - category=sa_exc.SAPendingDeprecationWarning)] - if not messages: - filters.append(dict(action='ignore', - category=sa_exc.SAWarning)) - else: - filters.extend(dict(action='ignore', - message=message, - category=sa_exc.SAWarning) - for message in messages) - for f in filters: - warnings.filterwarnings(**f) - try: + with expect_warnings(*messages): return fn(*args, **kw) - finally: - resetwarnings() + return decorate -def emits_warning_on(db, *warnings): +def expect_deprecated(*messages): + return _expect_warnings( + "sqlalchemy.util.deprecations.warnings.warn", messages) + + +def emits_warning_on(db, *messages): """Mark a test as emitting a warning on a specific dialect. With no arguments, squelches all SAWarning failures. Or pass one or more strings; these will be matched to the root of the warning description by warnings.filterwarnings(). """ - spec = db_spec(db) - @decorator def decorate(fn, *args, **kw): - if isinstance(db, util.string_types): - if not spec(config._current): - return fn(*args, **kw) - else: - wrapped = emits_warning(*warnings)(fn) - return wrapped(*args, **kw) - else: - if not _is_excluded(*db): - return fn(*args, **kw) - else: - wrapped = emits_warning(*warnings)(fn) - return wrapped(*args, **kw) + with expect_warnings_on(db, *messages): + return fn(*args, **kw) + return decorate @@ -105,29 +105,24 @@ def uses_deprecated(*messages): @contextlib.contextmanager -def expect_deprecated(*messages): - # todo: should probably be strict about this, too - filters = [dict(action='ignore', - category=sa_exc.SAPendingDeprecationWarning)] - if not messages: - filters.append(dict(action='ignore', - category=sa_exc.SADeprecationWarning)) - else: - filters.extend( - [dict(action='ignore', - message=message, - category=sa_exc.SADeprecationWarning) - for message in - [(m.startswith('//') and - ('Call to deprecated function ' + m[2:]) or m) - for m in messages]]) - - for f in filters: - warnings.filterwarnings(**f) - try: +def _expect_warnings(to_patch, messages): + + filters = [re.compile(msg, re.I) for msg in messages] + + real_warn = warnings.warn + + def our_warn(msg, exception, *arg, **kw): + if not filters: + return + + for filter_ in filters: + if filter_.match(msg): + break + else: + real_warn(msg, exception, *arg, **kw) + + with mock.patch(to_patch, our_warn): yield - finally: - resetwarnings() def global_cleanup_assertions(): diff --git a/lib/sqlalchemy/testing/plugin/plugin_base.py b/lib/sqlalchemy/testing/plugin/plugin_base.py index c02f0556b..7ba31d3e3 100644 --- a/lib/sqlalchemy/testing/plugin/plugin_base.py +++ b/lib/sqlalchemy/testing/plugin/plugin_base.py @@ -181,7 +181,7 @@ def post_begin(): from sqlalchemy.testing import fixtures, engines, exclusions, \ assertions, warnings, profiling, config from sqlalchemy import util - + warnings.setup_filters() def _log(opt_str, value, parser): global logging @@ -491,13 +491,11 @@ def before_test(test, test_module_name, test_class, test_name): id_ = "%s.%s.%s" % (test_module_name, name, test_name) - warnings.resetwarnings() profiling._current_test = id_ def after_test(test): engines.testing_reaper._after_test_ctx() - warnings.resetwarnings() def _possible_configs_for_cls(cls, reasons=None): diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py index b3314de6e..47f1e1404 100644 --- a/lib/sqlalchemy/testing/warnings.py +++ b/lib/sqlalchemy/testing/warnings.py @@ -9,25 +9,11 @@ from __future__ import absolute_import import warnings from .. import exc as sa_exc -from .. import util import re -def testing_warn(msg, stacklevel=3): - """Replaces sqlalchemy.util.warn during tests.""" - - filename = "sqlalchemy.testing.warnings" - lineno = 1 - if isinstance(msg, util.string_types): - warnings.warn_explicit(msg, sa_exc.SAWarning, filename, lineno) - else: - warnings.warn_explicit(msg, filename, lineno) - - -def resetwarnings(): - """Reset warning behavior to testing defaults.""" - - util.warn = util.langhelpers.warn = testing_warn +def setup_filters(): + """Set global warning behavior for the test suite.""" warnings.filterwarnings('ignore', category=sa_exc.SAPendingDeprecationWarning) @@ -35,24 +21,20 @@ def resetwarnings(): warnings.filterwarnings('error', category=sa_exc.SAWarning) -def assert_warnings(fn, warnings, regex=False): +def assert_warnings(fn, warning_msgs, regex=False): """Assert that each of the given warnings are emitted by fn.""" - from .assertions import eq_, emits_warning + from .assertions import eq_ - canary = [] - orig_warn = util.warn + with warnings.catch_warnings(record=True) as log: + # ensure that nothing is going into __warningregistry__ + warnings.filterwarnings("always") - def capture_warnings(*args, **kw): - orig_warn(*args, **kw) - popwarn = warnings.pop(0) - canary.append(popwarn) + result = fn() + for warning in log: + popwarn = warning_msgs.pop(0) if regex: - assert re.match(popwarn, args[0]) + assert re.match(popwarn, str(warning.message)) else: - eq_(args[0], popwarn) - util.warn = util.langhelpers.warn = capture_warnings - - result = emits_warning()(fn)() - assert canary, "No warning was emitted" + eq_(popwarn, str(warning.message)) return result |
