summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-08-31 15:22:00 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2014-08-31 15:22:00 -0400
commit3c60d3b1ca492ba77d64111f0378892acaadf36b (patch)
tree5e2df552a7142cd3bb2ef9e5236db85e5c2859f4 /lib/sqlalchemy/testing
parent903b0a42e71c81ff99494352760c0f92fa7a486d (diff)
downloadsqlalchemy-3c60d3b1ca492ba77d64111f0378892acaadf36b.tar.gz
- A new style of warning can be emitted which will "filter" up to
N occurrences of a parameterized string. This allows parameterized warnings that can refer to their arguments to be delivered a fixed number of times until allowing Python warning filters to squelch them, and prevents memory from growing unbounded within Python's warning registries. fixes #3178
Diffstat (limited to 'lib/sqlalchemy/testing')
-rw-r--r--lib/sqlalchemy/testing/__init__.py2
-rw-r--r--lib/sqlalchemy/testing/assertions.py129
-rw-r--r--lib/sqlalchemy/testing/plugin/plugin_base.py4
-rw-r--r--lib/sqlalchemy/testing/warnings.py42
4 files changed, 76 insertions, 101 deletions
diff --git a/lib/sqlalchemy/testing/__init__.py b/lib/sqlalchemy/testing/__init__.py
index 8f8f56412..e53fb28b1 100644
--- a/lib/sqlalchemy/testing/__init__.py
+++ b/lib/sqlalchemy/testing/__init__.py
@@ -6,7 +6,7 @@
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-from .warnings import testing_warn, assert_warnings, resetwarnings
+from .warnings import assert_warnings
from . import config
diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py
index 79411af7e..dbe365ad5 100644
--- a/lib/sqlalchemy/testing/assertions.py
+++ b/lib/sqlalchemy/testing/assertions.py
@@ -9,79 +9,79 @@ from __future__ import absolute_import
from . import util as testutil
from sqlalchemy import pool, orm, util
-from sqlalchemy.engine import default, create_engine, url
-from sqlalchemy import exc as sa_exc
+from sqlalchemy.engine import default, url
from sqlalchemy.util import decorator
from sqlalchemy import types as sqltypes, schema
import warnings
import re
-from .warnings import resetwarnings
from .exclusions import db_spec, _is_excluded
from . import assertsql
from . import config
-import itertools
from .util import fail
import contextlib
+from . import mock
-def emits_warning(*messages):
- """Mark a test as emitting a warning.
+def expect_warnings(*messages):
+ """Context manager which expects one or more warnings.
+
+ With no arguments, squelches all SAWarnings emitted via
+ sqlalchemy.util.warn and sqlalchemy.util.warn_limited. Otherwise
+ pass string expressions that will match selected warnings via regex;
+ all non-matching warnings are sent through.
+
+ Note that the test suite sets SAWarning warnings to raise exceptions.
+
+ """
+ return _expect_warnings(
+ "sqlalchemy.util.deprecations.warnings.warn", messages)
+
+
+@contextlib.contextmanager
+def expect_warnings_on(db, *messages):
+ """Context manager which expects one or more warnings on specific
+ dialects.
- With no arguments, squelches all SAWarning failures. Or pass one or more
- strings; these will be matched to the root of the warning description by
- warnings.filterwarnings().
"""
- # TODO: it would be nice to assert that a named warning was
- # emitted. should work with some monkeypatching of warnings,
- # and may work on non-CPython if they keep to the spirit of
- # warnings.showwarning's docstring.
- # - update: jython looks ok, it uses cpython's module
+ spec = db_spec(db)
+
+ if isinstance(db, util.string_types) and not spec(config._current):
+ yield
+ elif not _is_excluded(*db):
+ yield
+ else:
+ with expect_warnings(*messages):
+ yield
+
+
+def emits_warning(*messages):
+ """Decorator form of expect_warnings()."""
@decorator
def decorate(fn, *args, **kw):
- # todo: should probably be strict about this, too
- filters = [dict(action='ignore',
- category=sa_exc.SAPendingDeprecationWarning)]
- if not messages:
- filters.append(dict(action='ignore',
- category=sa_exc.SAWarning))
- else:
- filters.extend(dict(action='ignore',
- message=message,
- category=sa_exc.SAWarning)
- for message in messages)
- for f in filters:
- warnings.filterwarnings(**f)
- try:
+ with expect_warnings(*messages):
return fn(*args, **kw)
- finally:
- resetwarnings()
+
return decorate
-def emits_warning_on(db, *warnings):
+def expect_deprecated(*messages):
+ return _expect_warnings(
+ "sqlalchemy.util.deprecations.warnings.warn", messages)
+
+
+def emits_warning_on(db, *messages):
"""Mark a test as emitting a warning on a specific dialect.
With no arguments, squelches all SAWarning failures. Or pass one or more
strings; these will be matched to the root of the warning description by
warnings.filterwarnings().
"""
- spec = db_spec(db)
-
@decorator
def decorate(fn, *args, **kw):
- if isinstance(db, util.string_types):
- if not spec(config._current):
- return fn(*args, **kw)
- else:
- wrapped = emits_warning(*warnings)(fn)
- return wrapped(*args, **kw)
- else:
- if not _is_excluded(*db):
- return fn(*args, **kw)
- else:
- wrapped = emits_warning(*warnings)(fn)
- return wrapped(*args, **kw)
+ with expect_warnings_on(db, *messages):
+ return fn(*args, **kw)
+
return decorate
@@ -105,29 +105,24 @@ def uses_deprecated(*messages):
@contextlib.contextmanager
-def expect_deprecated(*messages):
- # todo: should probably be strict about this, too
- filters = [dict(action='ignore',
- category=sa_exc.SAPendingDeprecationWarning)]
- if not messages:
- filters.append(dict(action='ignore',
- category=sa_exc.SADeprecationWarning))
- else:
- filters.extend(
- [dict(action='ignore',
- message=message,
- category=sa_exc.SADeprecationWarning)
- for message in
- [(m.startswith('//') and
- ('Call to deprecated function ' + m[2:]) or m)
- for m in messages]])
-
- for f in filters:
- warnings.filterwarnings(**f)
- try:
+def _expect_warnings(to_patch, messages):
+
+ filters = [re.compile(msg, re.I) for msg in messages]
+
+ real_warn = warnings.warn
+
+ def our_warn(msg, exception, *arg, **kw):
+ if not filters:
+ return
+
+ for filter_ in filters:
+ if filter_.match(msg):
+ break
+ else:
+ real_warn(msg, exception, *arg, **kw)
+
+ with mock.patch(to_patch, our_warn):
yield
- finally:
- resetwarnings()
def global_cleanup_assertions():
diff --git a/lib/sqlalchemy/testing/plugin/plugin_base.py b/lib/sqlalchemy/testing/plugin/plugin_base.py
index c02f0556b..7ba31d3e3 100644
--- a/lib/sqlalchemy/testing/plugin/plugin_base.py
+++ b/lib/sqlalchemy/testing/plugin/plugin_base.py
@@ -181,7 +181,7 @@ def post_begin():
from sqlalchemy.testing import fixtures, engines, exclusions, \
assertions, warnings, profiling, config
from sqlalchemy import util
-
+ warnings.setup_filters()
def _log(opt_str, value, parser):
global logging
@@ -491,13 +491,11 @@ def before_test(test, test_module_name, test_class, test_name):
id_ = "%s.%s.%s" % (test_module_name, name, test_name)
- warnings.resetwarnings()
profiling._current_test = id_
def after_test(test):
engines.testing_reaper._after_test_ctx()
- warnings.resetwarnings()
def _possible_configs_for_cls(cls, reasons=None):
diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py
index b3314de6e..47f1e1404 100644
--- a/lib/sqlalchemy/testing/warnings.py
+++ b/lib/sqlalchemy/testing/warnings.py
@@ -9,25 +9,11 @@ from __future__ import absolute_import
import warnings
from .. import exc as sa_exc
-from .. import util
import re
-def testing_warn(msg, stacklevel=3):
- """Replaces sqlalchemy.util.warn during tests."""
-
- filename = "sqlalchemy.testing.warnings"
- lineno = 1
- if isinstance(msg, util.string_types):
- warnings.warn_explicit(msg, sa_exc.SAWarning, filename, lineno)
- else:
- warnings.warn_explicit(msg, filename, lineno)
-
-
-def resetwarnings():
- """Reset warning behavior to testing defaults."""
-
- util.warn = util.langhelpers.warn = testing_warn
+def setup_filters():
+ """Set global warning behavior for the test suite."""
warnings.filterwarnings('ignore',
category=sa_exc.SAPendingDeprecationWarning)
@@ -35,24 +21,20 @@ def resetwarnings():
warnings.filterwarnings('error', category=sa_exc.SAWarning)
-def assert_warnings(fn, warnings, regex=False):
+def assert_warnings(fn, warning_msgs, regex=False):
"""Assert that each of the given warnings are emitted by fn."""
- from .assertions import eq_, emits_warning
+ from .assertions import eq_
- canary = []
- orig_warn = util.warn
+ with warnings.catch_warnings(record=True) as log:
+ # ensure that nothing is going into __warningregistry__
+ warnings.filterwarnings("always")
- def capture_warnings(*args, **kw):
- orig_warn(*args, **kw)
- popwarn = warnings.pop(0)
- canary.append(popwarn)
+ result = fn()
+ for warning in log:
+ popwarn = warning_msgs.pop(0)
if regex:
- assert re.match(popwarn, args[0])
+ assert re.match(popwarn, str(warning.message))
else:
- eq_(args[0], popwarn)
- util.warn = util.langhelpers.warn = capture_warnings
-
- result = emits_warning()(fn)()
- assert canary, "No warning was emitted"
+ eq_(popwarn, str(warning.message))
return result