summaryrefslogtreecommitdiff
path: root/test/base
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
commit07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch)
tree050ef65db988559c60f7aa40f2d0bfe24947e548 /test/base
parent560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff)
parentee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff)
downloadsqlalchemy-ticket_2501.tar.gz
Merge branch 'master' into ticket_2501ticket_2501
Conflicts: lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/base')
-rw-r--r--test/base/test_events.py248
-rw-r--r--test/base/test_utils.py114
2 files changed, 335 insertions, 27 deletions
diff --git a/test/base/test_events.py b/test/base/test_events.py
index 1e0568f27..e985f8d5b 100644
--- a/test/base/test_events.py
+++ b/test/base/test_events.py
@@ -6,15 +6,12 @@ from sqlalchemy import event, exc
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.mock import Mock, call
-
+from sqlalchemy import testing
class EventsTest(fixtures.TestBase):
"""Test class- and instance-level event registration."""
def setUp(self):
- assert 'event_one' not in event._registrars
- assert 'event_two' not in event._registrars
-
class TargetEvents(event.Events):
def event_one(self, x, y):
pass
@@ -30,7 +27,7 @@ class EventsTest(fixtures.TestBase):
self.Target = Target
def tearDown(self):
- event._remove_dispatcher(self.Target.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.Target.__dict__['dispatch'].events)
def test_register_class(self):
def listen(x, y):
@@ -84,7 +81,7 @@ class EventsTest(fixtures.TestBase):
eq_(len(self.Target().dispatch.event_one), 2)
eq_(len(t1.dispatch.event_one), 3)
- def test_append_vs_insert(self):
+ def test_append_vs_insert_cls(self):
def listen_one(x, y):
pass
@@ -103,6 +100,26 @@ class EventsTest(fixtures.TestBase):
[listen_three, listen_one, listen_two]
)
+ def test_append_vs_insert_instance(self):
+ def listen_one(x, y):
+ pass
+
+ def listen_two(x, y):
+ pass
+
+ def listen_three(x, y):
+ pass
+
+ target = self.Target()
+ event.listen(target, "event_one", listen_one)
+ event.listen(target, "event_one", listen_two)
+ event.listen(target, "event_one", listen_three, insert=True)
+
+ eq_(
+ list(target.dispatch.event_one),
+ [listen_three, listen_one, listen_two]
+ )
+
def test_decorator(self):
@event.listens_for(self.Target, "event_one")
def listen_one(x, y):
@@ -189,7 +206,7 @@ class NamedCallTest(fixtures.TestBase):
self.TargetOne = TargetOne
def tearDown(self):
- event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
def test_kw_accept(self):
@@ -261,7 +278,7 @@ class LegacySignatureTest(fixtures.TestBase):
self.TargetOne = TargetOne
def tearDown(self):
- event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
def test_legacy_accept(self):
canary = Mock()
@@ -294,6 +311,26 @@ class LegacySignatureTest(fixtures.TestBase):
canary(x, y, kw)
self._test_legacy_accept_kw(inst, canary)
+ def test_legacy_accept_partial(self):
+ canary = Mock()
+ def evt(a, x, y, **kw):
+ canary(a, x, y, **kw)
+ from functools import partial
+ evt_partial = partial(evt, 5)
+ target = self.TargetOne()
+ event.listen(target, "event_four", evt_partial)
+ # can't do legacy accept on a partial; we can't inspect it
+ assert_raises(
+ TypeError,
+ target.dispatch.event_four, 4, 5, 6, 7, foo="bar"
+ )
+ target.dispatch.event_four(4, 5, foo="bar")
+ eq_(
+ canary.mock_calls,
+ [call(5, 4, 5, foo="bar")]
+ )
+
+
def _test_legacy_accept_kw(self, target, canary):
target.dispatch.event_four(4, 5, 6, 7, foo="bar")
@@ -375,7 +412,7 @@ class ClsLevelListenTest(fixtures.TestBase):
def tearDown(self):
- event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
def setUp(self):
class TargetEventsOne(event.Events):
@@ -386,7 +423,7 @@ class ClsLevelListenTest(fixtures.TestBase):
self.TargetOne = TargetOne
def tearDown(self):
- event._remove_dispatcher(
+ event.base._remove_dispatcher(
self.TargetOne.__dict__['dispatch'].events)
def test_lis_subcalss_lis(self):
@@ -473,8 +510,8 @@ class AcceptTargetsTest(fixtures.TestBase):
self.TargetTwo = TargetTwo
def tearDown(self):
- event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
- event._remove_dispatcher(self.TargetTwo.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.TargetTwo.__dict__['dispatch'].events)
def test_target_accept(self):
"""Test that events of the same name are routed to the correct
@@ -543,7 +580,7 @@ class CustomTargetsTest(fixtures.TestBase):
self.Target = Target
def tearDown(self):
- event._remove_dispatcher(self.Target.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.Target.__dict__['dispatch'].events)
def test_indirect(self):
def listen(x, y):
@@ -593,14 +630,14 @@ class ListenOverrideTest(fixtures.TestBase):
def setUp(self):
class TargetEvents(event.Events):
@classmethod
- def _listen(cls, target, identifier, fn, add=False):
+ def _listen(cls, event_key, add=False):
+ fn = event_key.fn
if add:
def adapt(x, y):
fn(x + y)
- else:
- adapt = fn
+ event_key = event_key.with_wrapper(adapt)
- event.Events._listen(target, identifier, adapt)
+ event_key.base_listen()
def event_one(self, x, y):
pass
@@ -610,7 +647,7 @@ class ListenOverrideTest(fixtures.TestBase):
self.Target = Target
def tearDown(self):
- event._remove_dispatcher(self.Target.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(self.Target.__dict__['dispatch'].events)
def test_listen_override(self):
listen_one = Mock()
@@ -700,7 +737,7 @@ class JoinTest(fixtures.TestBase):
for cls in (self.TargetElement,
self.TargetFactory, self.BaseTarget):
if 'dispatch' in cls.__dict__:
- event._remove_dispatcher(cls.__dict__['dispatch'].events)
+ event.base._remove_dispatcher(cls.__dict__['dispatch'].events)
def test_neither(self):
element = self.TargetFactory().create()
@@ -842,13 +879,19 @@ class JoinTest(fixtures.TestBase):
element.run_event(2)
element.run_event(3)
- # c1 gets no events due to _JoinedListener
- # fixing the "parent" at construction time.
- # this can be changed to be "live" at the cost
- # of performance.
+ # if _JoinedListener fixes .listeners
+ # at construction time, then we don't get
+ # the new listeners.
+ #eq_(l1.mock_calls, [])
+
+ # alternatively, if _JoinedListener shares the list
+ # using a @property, then we get them, at the arguable
+ # expense of the extra method call to access the .listeners
+ # collection
eq_(
- l1.mock_calls, []
+ l1.mock_calls, [call(element, 2), call(element, 3)]
)
+
eq_(
l2.mock_calls,
[call(element, 1), call(element, 2), call(element, 3)]
@@ -892,3 +935,160 @@ class JoinTest(fixtures.TestBase):
l1.mock_calls,
[call(element, 1), call(element, 2), call(element, 3)]
)
+
+class RemovalTest(fixtures.TestBase):
+ def _fixture(self):
+ class TargetEvents(event.Events):
+ def event_one(self, x, y):
+ pass
+
+ def event_two(self, x):
+ pass
+
+ def event_three(self, x):
+ pass
+
+ class Target(object):
+ dispatch = event.dispatcher(TargetEvents)
+ return Target
+
+ def test_clslevel(self):
+ Target = self._fixture()
+
+ m1 = Mock()
+
+ event.listen(Target, "event_two", m1)
+
+ t1 = Target()
+ t1.dispatch.event_two("x")
+
+ event.remove(Target, "event_two", m1)
+
+ t1.dispatch.event_two("y")
+
+ eq_(m1.mock_calls, [call("x")])
+
+ def test_clslevel_subclass(self):
+ Target = self._fixture()
+ class SubTarget(Target):
+ pass
+
+ m1 = Mock()
+
+ event.listen(Target, "event_two", m1)
+
+ t1 = SubTarget()
+ t1.dispatch.event_two("x")
+
+ event.remove(Target, "event_two", m1)
+
+ t1.dispatch.event_two("y")
+
+ eq_(m1.mock_calls, [call("x")])
+
+ def test_instance(self):
+ Target = self._fixture()
+
+ class Foo(object):
+ def __init__(self):
+ self.mock = Mock()
+
+ def evt(self, arg):
+ self.mock(arg)
+
+ f1 = Foo()
+ f2 = Foo()
+
+ event.listen(Target, "event_one", f1.evt)
+ event.listen(Target, "event_one", f2.evt)
+
+ t1 = Target()
+ t1.dispatch.event_one("x")
+
+ event.remove(Target, "event_one", f1.evt)
+
+ t1.dispatch.event_one("y")
+
+ eq_(f1.mock.mock_calls, [call("x")])
+ eq_(f2.mock.mock_calls, [call("x"), call("y")])
+
+ def test_propagate(self):
+ Target = self._fixture()
+
+ m1 = Mock()
+
+ t1 = Target()
+ t2 = Target()
+
+ event.listen(t1, "event_one", m1, propagate=True)
+ event.listen(t1, "event_two", m1, propagate=False)
+
+ t2.dispatch._update(t1.dispatch)
+
+ t1.dispatch.event_one("t1e1x")
+ t1.dispatch.event_two("t1e2x")
+ t2.dispatch.event_one("t2e1x")
+ t2.dispatch.event_two("t2e2x")
+
+ event.remove(t1, "event_one", m1)
+ event.remove(t1, "event_two", m1)
+
+ t1.dispatch.event_one("t1e1y")
+ t1.dispatch.event_two("t1e2y")
+ t2.dispatch.event_one("t2e1y")
+ t2.dispatch.event_two("t2e2y")
+
+ eq_(m1.mock_calls,
+ [call('t1e1x'), call('t1e2x'),
+ call('t2e1x')])
+
+ @testing.requires.predictable_gc
+ def test_listener_collection_removed_cleanup(self):
+ from sqlalchemy.event import registry
+
+ Target = self._fixture()
+
+ m1 = Mock()
+
+ t1 = Target()
+
+ event.listen(t1, "event_one", m1)
+
+ key = (id(t1), "event_one", id(m1))
+
+ assert key in registry._key_to_collection
+ collection_ref = list(registry._key_to_collection[key])[0]
+ assert collection_ref in registry._collection_to_key
+
+ t1.dispatch.event_one("t1")
+
+ del t1
+
+ gc_collect()
+
+ assert key not in registry._key_to_collection
+ assert collection_ref not in registry._collection_to_key
+
+ def test_remove_not_listened(self):
+ Target = self._fixture()
+
+ m1 = Mock()
+
+ t1 = Target()
+
+ event.listen(t1, "event_one", m1, propagate=True)
+ event.listen(t1, "event_three", m1)
+
+ event.remove(t1, "event_one", m1)
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"No listeners found for event <.*Target.*> / 'event_two' / <Mock.*> ",
+ event.remove, t1, "event_two", m1
+ )
+
+ event.remove(t1, "event_three", m1)
+
+
+
+
+
diff --git a/test/base/test_utils.py b/test/base/test_utils.py
index aefc6d421..86e4b190a 100644
--- a/test/base/test_utils.py
+++ b/test/base/test_utils.py
@@ -1,10 +1,10 @@
import copy
-from sqlalchemy import util, sql, exc
+from sqlalchemy import util, sql, exc, testing
from sqlalchemy.testing import assert_raises, assert_raises_message, fixtures
from sqlalchemy.testing import eq_, is_, ne_, fails_if
-from sqlalchemy.testing.util import picklers
-from sqlalchemy.util import classproperty
+from sqlalchemy.testing.util import picklers, gc_collect
+from sqlalchemy.util import classproperty, WeakSequence, get_callable_argspec
class KeyedTupleTest():
@@ -115,6 +115,36 @@ class KeyedTupleTest():
keyed_tuple[0] = 100
assert_raises(TypeError, should_raise)
+class WeakSequenceTest(fixtures.TestBase):
+ @testing.requires.predictable_gc
+ def test_cleanout_elements(self):
+ class Foo(object):
+ pass
+ f1, f2, f3 = Foo(), Foo(), Foo()
+ w = WeakSequence([f1, f2, f3])
+ eq_(len(w), 3)
+ eq_(len(w._storage), 3)
+ del f2
+ gc_collect()
+ eq_(len(w), 2)
+ eq_(len(w._storage), 2)
+
+ @testing.requires.predictable_gc
+ def test_cleanout_appended(self):
+ class Foo(object):
+ pass
+ f1, f2, f3 = Foo(), Foo(), Foo()
+ w = WeakSequence()
+ w.append(f1)
+ w.append(f2)
+ w.append(f3)
+ eq_(len(w), 3)
+ eq_(len(w._storage), 3)
+ del f2
+ gc_collect()
+ eq_(len(w), 2)
+ eq_(len(w._storage), 2)
+
class OrderedDictTest(fixtures.TestBase):
@@ -1154,6 +1184,33 @@ class ArgInspectionTest(fixtures.TestBase):
test(f3)
test(f4)
+ def test_callable_argspec_fn(self):
+ def foo(x, y, **kw):
+ pass
+ eq_(
+ get_callable_argspec(foo),
+ (['x', 'y'], None, 'kw', None)
+ )
+
+ def test_callable_argspec_method(self):
+ class Foo(object):
+ def foo(self, x, y, **kw):
+ pass
+ eq_(
+ get_callable_argspec(Foo.foo),
+ (['self', 'x', 'y'], None, 'kw', None)
+ )
+
+ def test_callable_argspec_partial(self):
+ from functools import partial
+ def foo(x, y, z, **kw):
+ pass
+ bar = partial(foo, 5)
+
+ assert_raises(
+ ValueError,
+ get_callable_argspec, bar
+ )
class SymbolTest(fixtures.TestBase):
@@ -1389,6 +1446,55 @@ class GenericReprTest(fixtures.TestBase):
"Foo(b=5, d=7)"
)
+ def test_multi_kw(self):
+ class Foo(object):
+ def __init__(self, a, b, c=3, d=4):
+ self.a = a
+ self.b = b
+ self.c = c
+ self.d = d
+ class Bar(Foo):
+ def __init__(self, e, f, g=5, **kw):
+ self.e = e
+ self.f = f
+ self.g = g
+ super(Bar, self).__init__(**kw)
+
+ eq_(
+ util.generic_repr(
+ Bar('e', 'f', g=7, a=6, b=5, d=9),
+ to_inspect=[Bar, Foo]
+ ),
+ "Bar('e', 'f', g=7, a=6, b=5, d=9)"
+ )
+
+ eq_(
+ util.generic_repr(
+ Bar('e', 'f', a=6, b=5),
+ to_inspect=[Bar, Foo]
+ ),
+ "Bar('e', 'f', a=6, b=5)"
+ )
+
+ def test_multi_kw_repeated(self):
+ class Foo(object):
+ def __init__(self, a=1, b=2):
+ self.a = a
+ self.b = b
+ class Bar(Foo):
+ def __init__(self, b=3, c=4, **kw):
+ self.c = c
+ super(Bar, self).__init__(b=b, **kw)
+
+ eq_(
+ util.generic_repr(
+ Bar(a='a', b='b', c='c'),
+ to_inspect=[Bar, Foo]
+ ),
+ "Bar(b='b', c='c', a='a')"
+ )
+
+
def test_discard_vargs(self):
class Foo(object):
def __init__(self, a, b, *args):
@@ -1586,3 +1692,5 @@ class TestClassProperty(fixtures.TestBase):
return d
eq_(B.something, {'foo': 1, 'bazz': 2})
+
+