diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
| commit | 07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch) | |
| tree | 050ef65db988559c60f7aa40f2d0bfe24947e548 /test/base | |
| parent | 560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff) | |
| parent | ee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff) | |
| download | sqlalchemy-ticket_2501.tar.gz | |
Merge branch 'master' into ticket_2501ticket_2501
Conflicts:
lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/base')
| -rw-r--r-- | test/base/test_events.py | 248 | ||||
| -rw-r--r-- | test/base/test_utils.py | 114 |
2 files changed, 335 insertions, 27 deletions
diff --git a/test/base/test_events.py b/test/base/test_events.py index 1e0568f27..e985f8d5b 100644 --- a/test/base/test_events.py +++ b/test/base/test_events.py @@ -6,15 +6,12 @@ from sqlalchemy import event, exc from sqlalchemy.testing import fixtures from sqlalchemy.testing.util import gc_collect from sqlalchemy.testing.mock import Mock, call - +from sqlalchemy import testing class EventsTest(fixtures.TestBase): """Test class- and instance-level event registration.""" def setUp(self): - assert 'event_one' not in event._registrars - assert 'event_two' not in event._registrars - class TargetEvents(event.Events): def event_one(self, x, y): pass @@ -30,7 +27,7 @@ class EventsTest(fixtures.TestBase): self.Target = Target def tearDown(self): - event._remove_dispatcher(self.Target.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.Target.__dict__['dispatch'].events) def test_register_class(self): def listen(x, y): @@ -84,7 +81,7 @@ class EventsTest(fixtures.TestBase): eq_(len(self.Target().dispatch.event_one), 2) eq_(len(t1.dispatch.event_one), 3) - def test_append_vs_insert(self): + def test_append_vs_insert_cls(self): def listen_one(x, y): pass @@ -103,6 +100,26 @@ class EventsTest(fixtures.TestBase): [listen_three, listen_one, listen_two] ) + def test_append_vs_insert_instance(self): + def listen_one(x, y): + pass + + def listen_two(x, y): + pass + + def listen_three(x, y): + pass + + target = self.Target() + event.listen(target, "event_one", listen_one) + event.listen(target, "event_one", listen_two) + event.listen(target, "event_one", listen_three, insert=True) + + eq_( + list(target.dispatch.event_one), + [listen_three, listen_one, listen_two] + ) + def test_decorator(self): @event.listens_for(self.Target, "event_one") def listen_one(x, y): @@ -189,7 +206,7 @@ class NamedCallTest(fixtures.TestBase): self.TargetOne = TargetOne def tearDown(self): - event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) def test_kw_accept(self): @@ -261,7 +278,7 @@ class LegacySignatureTest(fixtures.TestBase): self.TargetOne = TargetOne def tearDown(self): - event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) def test_legacy_accept(self): canary = Mock() @@ -294,6 +311,26 @@ class LegacySignatureTest(fixtures.TestBase): canary(x, y, kw) self._test_legacy_accept_kw(inst, canary) + def test_legacy_accept_partial(self): + canary = Mock() + def evt(a, x, y, **kw): + canary(a, x, y, **kw) + from functools import partial + evt_partial = partial(evt, 5) + target = self.TargetOne() + event.listen(target, "event_four", evt_partial) + # can't do legacy accept on a partial; we can't inspect it + assert_raises( + TypeError, + target.dispatch.event_four, 4, 5, 6, 7, foo="bar" + ) + target.dispatch.event_four(4, 5, foo="bar") + eq_( + canary.mock_calls, + [call(5, 4, 5, foo="bar")] + ) + + def _test_legacy_accept_kw(self, target, canary): target.dispatch.event_four(4, 5, 6, 7, foo="bar") @@ -375,7 +412,7 @@ class ClsLevelListenTest(fixtures.TestBase): def tearDown(self): - event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) def setUp(self): class TargetEventsOne(event.Events): @@ -386,7 +423,7 @@ class ClsLevelListenTest(fixtures.TestBase): self.TargetOne = TargetOne def tearDown(self): - event._remove_dispatcher( + event.base._remove_dispatcher( self.TargetOne.__dict__['dispatch'].events) def test_lis_subcalss_lis(self): @@ -473,8 +510,8 @@ class AcceptTargetsTest(fixtures.TestBase): self.TargetTwo = TargetTwo def tearDown(self): - event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) - event._remove_dispatcher(self.TargetTwo.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.TargetTwo.__dict__['dispatch'].events) def test_target_accept(self): """Test that events of the same name are routed to the correct @@ -543,7 +580,7 @@ class CustomTargetsTest(fixtures.TestBase): self.Target = Target def tearDown(self): - event._remove_dispatcher(self.Target.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.Target.__dict__['dispatch'].events) def test_indirect(self): def listen(x, y): @@ -593,14 +630,14 @@ class ListenOverrideTest(fixtures.TestBase): def setUp(self): class TargetEvents(event.Events): @classmethod - def _listen(cls, target, identifier, fn, add=False): + def _listen(cls, event_key, add=False): + fn = event_key.fn if add: def adapt(x, y): fn(x + y) - else: - adapt = fn + event_key = event_key.with_wrapper(adapt) - event.Events._listen(target, identifier, adapt) + event_key.base_listen() def event_one(self, x, y): pass @@ -610,7 +647,7 @@ class ListenOverrideTest(fixtures.TestBase): self.Target = Target def tearDown(self): - event._remove_dispatcher(self.Target.__dict__['dispatch'].events) + event.base._remove_dispatcher(self.Target.__dict__['dispatch'].events) def test_listen_override(self): listen_one = Mock() @@ -700,7 +737,7 @@ class JoinTest(fixtures.TestBase): for cls in (self.TargetElement, self.TargetFactory, self.BaseTarget): if 'dispatch' in cls.__dict__: - event._remove_dispatcher(cls.__dict__['dispatch'].events) + event.base._remove_dispatcher(cls.__dict__['dispatch'].events) def test_neither(self): element = self.TargetFactory().create() @@ -842,13 +879,19 @@ class JoinTest(fixtures.TestBase): element.run_event(2) element.run_event(3) - # c1 gets no events due to _JoinedListener - # fixing the "parent" at construction time. - # this can be changed to be "live" at the cost - # of performance. + # if _JoinedListener fixes .listeners + # at construction time, then we don't get + # the new listeners. + #eq_(l1.mock_calls, []) + + # alternatively, if _JoinedListener shares the list + # using a @property, then we get them, at the arguable + # expense of the extra method call to access the .listeners + # collection eq_( - l1.mock_calls, [] + l1.mock_calls, [call(element, 2), call(element, 3)] ) + eq_( l2.mock_calls, [call(element, 1), call(element, 2), call(element, 3)] @@ -892,3 +935,160 @@ class JoinTest(fixtures.TestBase): l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)] ) + +class RemovalTest(fixtures.TestBase): + def _fixture(self): + class TargetEvents(event.Events): + def event_one(self, x, y): + pass + + def event_two(self, x): + pass + + def event_three(self, x): + pass + + class Target(object): + dispatch = event.dispatcher(TargetEvents) + return Target + + def test_clslevel(self): + Target = self._fixture() + + m1 = Mock() + + event.listen(Target, "event_two", m1) + + t1 = Target() + t1.dispatch.event_two("x") + + event.remove(Target, "event_two", m1) + + t1.dispatch.event_two("y") + + eq_(m1.mock_calls, [call("x")]) + + def test_clslevel_subclass(self): + Target = self._fixture() + class SubTarget(Target): + pass + + m1 = Mock() + + event.listen(Target, "event_two", m1) + + t1 = SubTarget() + t1.dispatch.event_two("x") + + event.remove(Target, "event_two", m1) + + t1.dispatch.event_two("y") + + eq_(m1.mock_calls, [call("x")]) + + def test_instance(self): + Target = self._fixture() + + class Foo(object): + def __init__(self): + self.mock = Mock() + + def evt(self, arg): + self.mock(arg) + + f1 = Foo() + f2 = Foo() + + event.listen(Target, "event_one", f1.evt) + event.listen(Target, "event_one", f2.evt) + + t1 = Target() + t1.dispatch.event_one("x") + + event.remove(Target, "event_one", f1.evt) + + t1.dispatch.event_one("y") + + eq_(f1.mock.mock_calls, [call("x")]) + eq_(f2.mock.mock_calls, [call("x"), call("y")]) + + def test_propagate(self): + Target = self._fixture() + + m1 = Mock() + + t1 = Target() + t2 = Target() + + event.listen(t1, "event_one", m1, propagate=True) + event.listen(t1, "event_two", m1, propagate=False) + + t2.dispatch._update(t1.dispatch) + + t1.dispatch.event_one("t1e1x") + t1.dispatch.event_two("t1e2x") + t2.dispatch.event_one("t2e1x") + t2.dispatch.event_two("t2e2x") + + event.remove(t1, "event_one", m1) + event.remove(t1, "event_two", m1) + + t1.dispatch.event_one("t1e1y") + t1.dispatch.event_two("t1e2y") + t2.dispatch.event_one("t2e1y") + t2.dispatch.event_two("t2e2y") + + eq_(m1.mock_calls, + [call('t1e1x'), call('t1e2x'), + call('t2e1x')]) + + @testing.requires.predictable_gc + def test_listener_collection_removed_cleanup(self): + from sqlalchemy.event import registry + + Target = self._fixture() + + m1 = Mock() + + t1 = Target() + + event.listen(t1, "event_one", m1) + + key = (id(t1), "event_one", id(m1)) + + assert key in registry._key_to_collection + collection_ref = list(registry._key_to_collection[key])[0] + assert collection_ref in registry._collection_to_key + + t1.dispatch.event_one("t1") + + del t1 + + gc_collect() + + assert key not in registry._key_to_collection + assert collection_ref not in registry._collection_to_key + + def test_remove_not_listened(self): + Target = self._fixture() + + m1 = Mock() + + t1 = Target() + + event.listen(t1, "event_one", m1, propagate=True) + event.listen(t1, "event_three", m1) + + event.remove(t1, "event_one", m1) + assert_raises_message( + exc.InvalidRequestError, + r"No listeners found for event <.*Target.*> / 'event_two' / <Mock.*> ", + event.remove, t1, "event_two", m1 + ) + + event.remove(t1, "event_three", m1) + + + + + diff --git a/test/base/test_utils.py b/test/base/test_utils.py index aefc6d421..86e4b190a 100644 --- a/test/base/test_utils.py +++ b/test/base/test_utils.py @@ -1,10 +1,10 @@ import copy -from sqlalchemy import util, sql, exc +from sqlalchemy import util, sql, exc, testing from sqlalchemy.testing import assert_raises, assert_raises_message, fixtures from sqlalchemy.testing import eq_, is_, ne_, fails_if -from sqlalchemy.testing.util import picklers -from sqlalchemy.util import classproperty +from sqlalchemy.testing.util import picklers, gc_collect +from sqlalchemy.util import classproperty, WeakSequence, get_callable_argspec class KeyedTupleTest(): @@ -115,6 +115,36 @@ class KeyedTupleTest(): keyed_tuple[0] = 100 assert_raises(TypeError, should_raise) +class WeakSequenceTest(fixtures.TestBase): + @testing.requires.predictable_gc + def test_cleanout_elements(self): + class Foo(object): + pass + f1, f2, f3 = Foo(), Foo(), Foo() + w = WeakSequence([f1, f2, f3]) + eq_(len(w), 3) + eq_(len(w._storage), 3) + del f2 + gc_collect() + eq_(len(w), 2) + eq_(len(w._storage), 2) + + @testing.requires.predictable_gc + def test_cleanout_appended(self): + class Foo(object): + pass + f1, f2, f3 = Foo(), Foo(), Foo() + w = WeakSequence() + w.append(f1) + w.append(f2) + w.append(f3) + eq_(len(w), 3) + eq_(len(w._storage), 3) + del f2 + gc_collect() + eq_(len(w), 2) + eq_(len(w._storage), 2) + class OrderedDictTest(fixtures.TestBase): @@ -1154,6 +1184,33 @@ class ArgInspectionTest(fixtures.TestBase): test(f3) test(f4) + def test_callable_argspec_fn(self): + def foo(x, y, **kw): + pass + eq_( + get_callable_argspec(foo), + (['x', 'y'], None, 'kw', None) + ) + + def test_callable_argspec_method(self): + class Foo(object): + def foo(self, x, y, **kw): + pass + eq_( + get_callable_argspec(Foo.foo), + (['self', 'x', 'y'], None, 'kw', None) + ) + + def test_callable_argspec_partial(self): + from functools import partial + def foo(x, y, z, **kw): + pass + bar = partial(foo, 5) + + assert_raises( + ValueError, + get_callable_argspec, bar + ) class SymbolTest(fixtures.TestBase): @@ -1389,6 +1446,55 @@ class GenericReprTest(fixtures.TestBase): "Foo(b=5, d=7)" ) + def test_multi_kw(self): + class Foo(object): + def __init__(self, a, b, c=3, d=4): + self.a = a + self.b = b + self.c = c + self.d = d + class Bar(Foo): + def __init__(self, e, f, g=5, **kw): + self.e = e + self.f = f + self.g = g + super(Bar, self).__init__(**kw) + + eq_( + util.generic_repr( + Bar('e', 'f', g=7, a=6, b=5, d=9), + to_inspect=[Bar, Foo] + ), + "Bar('e', 'f', g=7, a=6, b=5, d=9)" + ) + + eq_( + util.generic_repr( + Bar('e', 'f', a=6, b=5), + to_inspect=[Bar, Foo] + ), + "Bar('e', 'f', a=6, b=5)" + ) + + def test_multi_kw_repeated(self): + class Foo(object): + def __init__(self, a=1, b=2): + self.a = a + self.b = b + class Bar(Foo): + def __init__(self, b=3, c=4, **kw): + self.c = c + super(Bar, self).__init__(b=b, **kw) + + eq_( + util.generic_repr( + Bar(a='a', b='b', c='c'), + to_inspect=[Bar, Foo] + ), + "Bar(b='b', c='c', a='a')" + ) + + def test_discard_vargs(self): class Foo(object): def __init__(self, a, b, *args): @@ -1586,3 +1692,5 @@ class TestClassProperty(fixtures.TestBase): return d eq_(B.something, {'foo': 1, 'bazz': 2}) + + |
