summaryrefslogtreecommitdiff
path: root/test/orm/test_mapper.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm/test_mapper.py')
-rw-r--r--test/orm/test_mapper.py2475
1 files changed, 2475 insertions, 0 deletions
diff --git a/test/orm/test_mapper.py b/test/orm/test_mapper.py
new file mode 100644
index 000000000..025b96424
--- /dev/null
+++ b/test/orm/test_mapper.py
@@ -0,0 +1,2475 @@
+"""General mapper operations with an emphasis on selecting/loading."""
+
+from sqlalchemy.test.testing import assert_raises, assert_raises_message
+import sqlalchemy as sa
+from sqlalchemy.test import testing, pickleable
+from sqlalchemy import MetaData, Integer, String, ForeignKey, func
+from sqlalchemy.test.schema import Table
+from sqlalchemy.test.schema import Column
+from sqlalchemy.engine import default
+from sqlalchemy.orm import mapper, relation, backref, create_session, class_mapper, compile_mappers, reconstructor, validates, aliased
+from sqlalchemy.orm import defer, deferred, synonym, attributes, column_property, composite, relation, dynamic_loader, comparable_property
+from sqlalchemy.test.testing import eq_, AssertsCompiledSQL
+from test.orm import _base, _fixtures
+
+
+class MapperTest(_fixtures.FixtureTest):
+
+ @testing.resolve_artifact_names
+ def test_prop_shadow(self):
+ """A backref name may not shadow an existing property name."""
+
+ mapper(Address, addresses)
+ mapper(User, users,
+ properties={
+ 'addresses':relation(Address, backref='email_address')
+ })
+ assert_raises(sa.exc.ArgumentError, sa.orm.compile_mappers)
+
+ @testing.resolve_artifact_names
+ def test_update_attr_keys(self):
+ """test that update()/insert() use the correct key when given InstrumentedAttributes."""
+
+ mapper(User, users, properties={
+ 'foobar':users.c.name
+ })
+
+ users.insert().values({User.foobar:'name1'}).execute()
+ eq_(sa.select([User.foobar]).where(User.foobar=='name1').execute().fetchall(), [('name1',)])
+
+ users.update().values({User.foobar:User.foobar + 'foo'}).execute()
+ eq_(sa.select([User.foobar]).where(User.foobar=='name1foo').execute().fetchall(), [('name1foo',)])
+
+ @testing.resolve_artifact_names
+ def test_utils(self):
+ from sqlalchemy.orm.util import _is_mapped_class, _is_aliased_class
+
+ class Foo(object):
+ x = "something"
+ @property
+ def y(self):
+ return "somethign else"
+ m = mapper(Foo, users)
+ a1 = aliased(Foo)
+
+ f = Foo()
+
+ for fn, arg, ret in [
+ (_is_mapped_class, Foo.x, False),
+ (_is_mapped_class, Foo.y, False),
+ (_is_mapped_class, Foo, True),
+ (_is_mapped_class, f, False),
+ (_is_mapped_class, a1, True),
+ (_is_mapped_class, m, True),
+ (_is_aliased_class, a1, True),
+ (_is_aliased_class, Foo.x, False),
+ (_is_aliased_class, Foo.y, False),
+ (_is_aliased_class, Foo, False),
+ (_is_aliased_class, f, False),
+ (_is_aliased_class, a1, True),
+ (_is_aliased_class, m, False),
+ ]:
+ assert fn(arg) == ret
+
+
+
+ @testing.resolve_artifact_names
+ def test_prop_accessor(self):
+ mapper(User, users)
+ assert_raises(NotImplementedError,
+ getattr, sa.orm.class_mapper(User), 'properties')
+
+
+ @testing.resolve_artifact_names
+ def test_bad_cascade(self):
+ mapper(Address, addresses)
+ assert_raises(sa.exc.ArgumentError,
+ relation, Address, cascade="fake, all, delete-orphan")
+
+ @testing.resolve_artifact_names
+ def test_exceptions_sticky(self):
+ """test preservation of mapper compile errors raised during hasattr()."""
+
+ mapper(Address, addresses, properties={
+ 'user':relation(User)
+ })
+
+ hasattr(Address.user, 'property')
+ assert_raises_message(sa.exc.InvalidRequestError, r"suppressed within a hasattr\(\)", compile_mappers)
+
+ @testing.resolve_artifact_names
+ def test_column_prefix(self):
+ mapper(User, users, column_prefix='_', properties={
+ 'user_name': synonym('_name')
+ })
+
+ s = create_session()
+ u = s.query(User).get(7)
+ eq_(u._name, 'jack')
+ eq_(u._id,7)
+ u2 = s.query(User).filter_by(user_name='jack').one()
+ assert u is u2
+
+ @testing.resolve_artifact_names
+ def test_no_pks_1(self):
+ s = sa.select([users.c.name]).alias('foo')
+ assert_raises(sa.exc.ArgumentError, mapper, User, s)
+
+ @testing.emits_warning(
+ 'mapper Mapper|User|Select object creating an alias for '
+ 'the given selectable - use Class attributes for queries')
+ @testing.resolve_artifact_names
+ def test_no_pks_2(self):
+ s = sa.select([users.c.name])
+ assert_raises(sa.exc.ArgumentError, mapper, User, s)
+
+ @testing.resolve_artifact_names
+ def test_recompile_on_other_mapper(self):
+ """A compile trigger on an already-compiled mapper still triggers a check against all mappers."""
+ mapper(User, users)
+ sa.orm.compile_mappers()
+ assert sa.orm.mapperlib._new_mappers is False
+
+ m = mapper(Address, addresses, properties={
+ 'user': relation(User, backref="addresses")})
+
+ assert m.compiled is False
+ assert sa.orm.mapperlib._new_mappers is True
+ u = User()
+ assert User.addresses
+ assert sa.orm.mapperlib._new_mappers is False
+
+ @testing.resolve_artifact_names
+ def test_compile_on_session(self):
+ m = mapper(User, users)
+ session = create_session()
+ session.connection(m)
+
+ @testing.resolve_artifact_names
+ def test_incomplete_columns(self):
+ """Loading from a select which does not contain all columns"""
+ mapper(Address, addresses)
+ s = create_session()
+ a = s.query(Address).from_statement(
+ sa.select([addresses.c.id, addresses.c.user_id])).first()
+ eq_(a.user_id, 7)
+ eq_(a.id, 1)
+ # email address auto-defers
+ assert 'email_addres' not in a.__dict__
+ eq_(a.email_address, 'jack@bean.com')
+
+ @testing.resolve_artifact_names
+ def test_bad_constructor(self):
+ """If the construction of a mapped class fails, the instance does not get placed in the session"""
+ class Foo(object):
+ def __init__(self, one, two, _sa_session=None):
+ pass
+
+ mapper(Foo, users, extension=sa.orm.scoped_session(
+ create_session).extension)
+
+ sess = create_session()
+ assert_raises(TypeError, Foo, 'one', _sa_session=sess)
+ eq_(len(list(sess)), 0)
+ assert_raises(TypeError, Foo, 'one')
+ Foo('one', 'two', _sa_session=sess)
+ eq_(len(list(sess)), 1)
+
+ @testing.resolve_artifact_names
+ def test_constructor_exc_1(self):
+ """Exceptions raised in the mapped class are not masked by sa decorations"""
+ ex = AssertionError('oops')
+ sess = create_session()
+
+ class Foo(object):
+ def __init__(self, **kw):
+ raise ex
+ mapper(Foo, users)
+
+ try:
+ Foo()
+ assert False
+ except Exception, e:
+ assert e is ex
+
+ sa.orm.clear_mappers()
+ mapper(Foo, users, extension=sa.orm.scoped_session(
+ create_session).extension)
+ def bad_expunge(foo):
+ raise Exception("this exception should be stated as a warning")
+
+ sess.expunge = bad_expunge
+ assert_raises(sa.exc.SAWarning, Foo, _sa_session=sess)
+
+ @testing.resolve_artifact_names
+ def test_constructor_exc_2(self):
+ """TypeError is raised for illegal constructor args, whether or not explicit __init__ is present [ticket:908]."""
+
+ class Foo(object):
+ def __init__(self):
+ pass
+ class Bar(object):
+ pass
+
+ mapper(Foo, users)
+ mapper(Bar, addresses)
+ assert_raises(TypeError, Foo, x=5)
+ assert_raises(TypeError, Bar, x=5)
+
+ @testing.resolve_artifact_names
+ def test_props(self):
+ m = mapper(User, users, properties = {
+ 'addresses' : relation(mapper(Address, addresses))
+ }).compile()
+ assert User.addresses.property is m.get_property('addresses')
+
+ @testing.resolve_artifact_names
+ def test_compile_on_prop_1(self):
+ mapper(User, users, properties = {
+ 'addresses' : relation(mapper(Address, addresses))
+ })
+ User.addresses.any(Address.email_address=='foo@bar.com')
+
+ @testing.resolve_artifact_names
+ def test_compile_on_prop_2(self):
+ mapper(User, users, properties = {
+ 'addresses' : relation(mapper(Address, addresses))
+ })
+ eq_(str(User.id == 3), str(users.c.id==3))
+
+ @testing.resolve_artifact_names
+ def test_compile_on_prop_3(self):
+ class Foo(User):pass
+ mapper(User, users)
+ mapper(Foo, addresses, inherits=User)
+ assert getattr(Foo().__class__, 'name').impl is not None
+
+ @testing.resolve_artifact_names
+ def test_deferred_subclass_attribute_instrument(self):
+ class Foo(User):pass
+ mapper(User, users)
+ compile_mappers()
+ mapper(Foo, addresses, inherits=User)
+ assert getattr(Foo().__class__, 'name').impl is not None
+
+ @testing.resolve_artifact_names
+ def test_compile_on_get_props_1(self):
+ m =mapper(User, users)
+ assert not m.compiled
+ assert list(m.iterate_properties)
+ assert m.compiled
+
+ @testing.resolve_artifact_names
+ def test_compile_on_get_props_2(self):
+ m= mapper(User, users)
+ assert not m.compiled
+ assert m.get_property('name')
+ assert m.compiled
+
+ @testing.resolve_artifact_names
+ def test_add_property(self):
+ assert_col = []
+
+ class User(_base.ComparableEntity):
+ def _get_name(self):
+ assert_col.append(('get', self._name))
+ return self._name
+ def _set_name(self, name):
+ assert_col.append(('set', name))
+ self._name = name
+ name = property(_get_name, _set_name)
+
+ def _uc_name(self):
+ if self._name is None:
+ return None
+ return self._name.upper()
+ uc_name = property(_uc_name)
+ uc_name2 = property(_uc_name)
+
+ m = mapper(User, users)
+ mapper(Address, addresses)
+
+ class UCComparator(sa.orm.PropComparator):
+ __hash__ = None
+
+ def __eq__(self, other):
+ cls = self.prop.parent.class_
+ col = getattr(cls, 'name')
+ if other is None:
+ return col == None
+ else:
+ return sa.func.upper(col) == sa.func.upper(other)
+
+ m.add_property('_name', deferred(users.c.name))
+ m.add_property('name', synonym('_name'))
+ m.add_property('addresses', relation(Address))
+ m.add_property('uc_name', sa.orm.comparable_property(UCComparator))
+ m.add_property('uc_name2', sa.orm.comparable_property(
+ UCComparator, User.uc_name2))
+
+ sess = create_session(autocommit=False)
+ assert sess.query(User).get(7)
+
+ u = sess.query(User).filter_by(name='jack').one()
+
+ def go():
+ eq_(len(u.addresses),
+ len(self.static.user_address_result[0].addresses))
+ eq_(u.name, 'jack')
+ eq_(u.uc_name, 'JACK')
+ eq_(u.uc_name2, 'JACK')
+ eq_(assert_col, [('get', 'jack')], str(assert_col))
+ self.sql_count_(2, go)
+
+ u.name = 'ed'
+ u3 = User()
+ u3.name = 'some user'
+ sess.add(u3)
+ sess.flush()
+ sess.rollback()
+
+ @testing.resolve_artifact_names
+ def test_replace_property(self):
+ m = mapper(User, users)
+ m.add_property('_name',users.c.name)
+ m.add_property('name', synonym('_name', proxy=True))
+
+ sess = create_session()
+ u = sess.query(User).filter_by(name='jack').one()
+ eq_(u._name, 'jack')
+ eq_(u.name, 'jack')
+ u.name = 'jacko'
+ assert m._columntoproperty[users.c.name] is m.get_property('_name')
+
+ sa.orm.clear_mappers()
+
+ m = mapper(User, users)
+ m.add_property('name', synonym('_name', map_column=True))
+
+ sess.expunge_all()
+ u = sess.query(User).filter_by(name='jack').one()
+ eq_(u._name, 'jack')
+ eq_(u.name, 'jack')
+ u.name = 'jacko'
+ assert m._columntoproperty[users.c.name] is m.get_property('_name')
+
+ @testing.resolve_artifact_names
+ def test_synonym_replaces_backref(self):
+ assert_calls = []
+ class Address(object):
+ def _get_user(self):
+ assert_calls.append("get")
+ return self._user
+ def _set_user(self, user):
+ assert_calls.append("set")
+ self._user = user
+ user = property(_get_user, _set_user)
+
+ # synonym is created against nonexistent prop
+ mapper(Address, addresses, properties={
+ 'user':synonym('_user')
+ })
+ sa.orm.compile_mappers()
+
+ # later, backref sets up the prop
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='_user')
+ })
+
+ sess = create_session()
+ u1 = sess.query(User).get(7)
+ u2 = sess.query(User).get(8)
+ # comparaison ops need to work
+ a1 = sess.query(Address).filter(Address.user==u1).one()
+ eq_(a1.id, 1)
+ a1.user = u2
+ assert a1.user is u2
+ eq_(assert_calls, ["set", "get"])
+
+ @testing.resolve_artifact_names
+ def test_self_ref_synonym(self):
+ t = Table('nodes', MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('parent_id', Integer, ForeignKey('nodes.id')))
+
+ class Node(object):
+ pass
+
+ mapper(Node, t, properties={
+ '_children':relation(Node, backref=backref('_parent', remote_side=t.c.id)),
+ 'children':synonym('_children'),
+ 'parent':synonym('_parent')
+ })
+
+ n1 = Node()
+ n2 = Node()
+ n1.children.append(n2)
+ assert n2.parent is n2._parent is n1
+ assert n1.children[0] is n1._children[0] is n2
+ eq_(str(Node.parent == n2), ":param_1 = nodes.parent_id")
+
+ @testing.resolve_artifact_names
+ def test_illegal_non_primary(self):
+ mapper(User, users)
+ mapper(Address, addresses)
+ try:
+ mapper(User, users, non_primary=True, properties={
+ 'addresses':relation(Address)
+ }).compile()
+ assert False
+ except sa.exc.ArgumentError, e:
+ assert "Attempting to assign a new relation 'addresses' to a non-primary mapper on class 'User'" in str(e)
+
+ @testing.resolve_artifact_names
+ def test_illegal_non_primary_2(self):
+ try:
+ mapper(User, users, non_primary=True)
+ assert False
+ except sa.exc.InvalidRequestError, e:
+ assert "Configure a primary mapper first" in str(e)
+
+ @testing.resolve_artifact_names
+ def test_prop_filters(self):
+ t = Table('person', MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('type', String(128)),
+ Column('name', String(128)),
+ Column('employee_number', Integer),
+ Column('boss_id', Integer, ForeignKey('person.id')),
+ Column('vendor_id', Integer))
+
+ class Person(object): pass
+ class Vendor(Person): pass
+ class Employee(Person): pass
+ class Manager(Employee): pass
+ class Hoho(object): pass
+ class Lala(object): pass
+
+ class HasDef(object):
+ def name(self):
+ pass
+
+ p_m = mapper(Person, t, polymorphic_on=t.c.type,
+ include_properties=('id', 'type', 'name'))
+ e_m = mapper(Employee, inherits=p_m, polymorphic_identity='employee',
+ properties={
+ 'boss': relation(Manager, backref=backref('peon', ), remote_side=t.c.id)
+ },
+ exclude_properties=('vendor_id',))
+
+ m_m = mapper(Manager, inherits=e_m, polymorphic_identity='manager',
+ include_properties=('id', 'type'))
+
+ v_m = mapper(Vendor, inherits=p_m, polymorphic_identity='vendor',
+ exclude_properties=('boss_id', 'employee_number'))
+ h_m = mapper(Hoho, t, include_properties=('id', 'type', 'name'))
+ l_m = mapper(Lala, t, exclude_properties=('vendor_id', 'boss_id'),
+ column_prefix="p_")
+
+ hd_m = mapper(HasDef, t, column_prefix="h_")
+
+ p_m.compile()
+ #sa.orm.compile_mappers()
+
+ def assert_props(cls, want):
+ have = set([n for n in dir(cls) if not n.startswith('_')])
+ want = set(want)
+ eq_(have, want)
+
+ def assert_instrumented(cls, want):
+ have = set([p.key for p in class_mapper(cls).iterate_properties])
+ want = set(want)
+ eq_(have, want)
+
+ assert_props(HasDef, ['h_boss_id', 'h_employee_number', 'h_id', 'name', 'h_name', 'h_vendor_id', 'h_type'])
+ assert_props(Person, ['id', 'name', 'type'])
+ assert_instrumented(Person, ['id', 'name', 'type'])
+ assert_props(Employee, ['boss', 'boss_id', 'employee_number',
+ 'id', 'name', 'type'])
+ assert_instrumented(Employee,['boss', 'boss_id', 'employee_number',
+ 'id', 'name', 'type'])
+ assert_props(Manager, ['boss', 'boss_id', 'employee_number', 'peon',
+ 'id', 'name', 'type'])
+
+ # 'peon' and 'type' are both explicitly stated properties
+ assert_instrumented(Manager, ['peon', 'type', 'id'])
+
+ assert_props(Vendor, ['vendor_id', 'id', 'name', 'type'])
+ assert_props(Hoho, ['id', 'name', 'type'])
+ assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type'])
+
+ # excluding the discriminator column is currently not allowed
+ class Foo(Person):
+ pass
+ assert_raises(sa.exc.InvalidRequestError, mapper, Foo, inherits=Person, polymorphic_identity='foo', exclude_properties=('type',) )
+
+ @testing.resolve_artifact_names
+ def test_mapping_to_join(self):
+ """Mapping to a join"""
+ usersaddresses = sa.join(users, addresses,
+ users.c.id == addresses.c.user_id)
+ mapper(User, usersaddresses, primary_key=[users.c.id])
+ l = create_session().query(User).order_by(users.c.id).all()
+ eq_(l, self.static.user_result[:3])
+
+ @testing.resolve_artifact_names
+ def test_mapping_to_join_no_pk(self):
+ m = mapper(Address, addresses.join(email_bounces))
+ m.compile()
+ assert addresses in m._pks_by_table
+ assert email_bounces not in m._pks_by_table
+
+ sess = create_session()
+ a = Address(id=10, email_address='e1')
+ sess.add(a)
+ sess.flush()
+
+ eq_(addresses.count().scalar(), 6)
+ eq_(email_bounces.count().scalar(), 5)
+
+ @testing.resolve_artifact_names
+ def test_mapping_to_outerjoin(self):
+ """Mapping to an outer join with a nullable composite primary key."""
+
+
+ mapper(User, users.outerjoin(addresses),
+ allow_null_pks=True,
+ primary_key=[users.c.id, addresses.c.id],
+ properties=dict(
+ address_id=addresses.c.id))
+
+ session = create_session()
+ l = session.query(User).order_by(User.id, User.address_id).all()
+
+ eq_(l, [
+ User(id=7, address_id=1),
+ User(id=8, address_id=2),
+ User(id=8, address_id=3),
+ User(id=8, address_id=4),
+ User(id=9, address_id=5),
+ User(id=10, address_id=None)])
+
+ @testing.resolve_artifact_names
+ def test_custom_join(self):
+ """select_from totally replace the FROM parameters."""
+
+ mapper(Item, items)
+
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, order_items)))
+
+ mapper(User, users, properties=dict(
+ orders=relation(Order)))
+
+ session = create_session()
+ l = (session.query(User).
+ select_from(users.join(orders).
+ join(order_items).
+ join(items)).
+ filter(items.c.description == 'item 4')).all()
+
+ eq_(l, [self.static.user_result[0]])
+
+ @testing.resolve_artifact_names
+ def test_cancel_order_by(self):
+ mapper(User, users, order_by=users.c.name.desc())
+
+ assert "order by users.name desc" in str(create_session().query(User).statement).lower()
+ assert "order by" not in str(create_session().query(User).order_by(None).statement).lower()
+ assert "order by users.name asc" in str(create_session().query(User).order_by(User.name.asc()).statement).lower()
+
+ eq_(
+ create_session().query(User).all(),
+ [User(id=7, name=u'jack'), User(id=9, name=u'fred'), User(id=8, name=u'ed'), User(id=10, name=u'chuck')]
+ )
+
+ eq_(
+ create_session().query(User).order_by(User.name).all(),
+ [User(id=10, name=u'chuck'), User(id=8, name=u'ed'), User(id=9, name=u'fred'), User(id=7, name=u'jack')]
+ )
+
+ # 'Raises a "expression evaluation not supported" error at prepare time
+ @testing.fails_on('firebird', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_function(self):
+ """Mapping to a SELECT statement that has functions in it."""
+
+ s = sa.select([users,
+ (users.c.id * 2).label('concat'),
+ sa.func.count(addresses.c.id).label('count')],
+ users.c.id == addresses.c.user_id,
+ group_by=[c for c in users.c]).alias('myselect')
+
+ mapper(User, s, order_by=s.c.id)
+ sess = create_session()
+ l = sess.query(User).all()
+
+ for idx, total in enumerate((14, 16)):
+ eq_(l[idx].concat, l[idx].id * 2)
+ eq_(l[idx].concat, total)
+
+ @testing.resolve_artifact_names
+ def test_count(self):
+ """The count function on Query."""
+
+ mapper(User, users)
+
+ session = create_session()
+ q = session.query(User)
+
+ eq_(q.count(), 4)
+ eq_(q.filter(User.id.in_([8,9])).count(), 2)
+ eq_(q.filter(users.c.id.in_([8,9])).count(), 2)
+
+ eq_(session.query(User.id).count(), 4)
+ eq_(session.query(User.id).filter(User.id.in_((8, 9))).count(), 2)
+
+ @testing.resolve_artifact_names
+ def test_many_to_many_count(self):
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties=dict(
+ keywords = relation(Keyword, item_keywords, lazy=True)))
+
+ session = create_session()
+ q = (session.query(Item).
+ join('keywords').
+ distinct().
+ filter(Keyword.name == "red"))
+ eq_(q.count(), 2)
+
+ @testing.resolve_artifact_names
+ def test_override_1(self):
+ """Overriding a column raises an error."""
+ def go():
+ mapper(User, users,
+ properties=dict(
+ name=relation(mapper(Address, addresses))))
+
+ assert_raises(sa.exc.ArgumentError, go)
+
+ @testing.resolve_artifact_names
+ def test_override_2(self):
+ """exclude_properties cancels the error."""
+
+ mapper(User, users,
+ exclude_properties=['name'],
+ properties=dict(
+ name=relation(mapper(Address, addresses))))
+
+ assert bool(User.name)
+
+ @testing.resolve_artifact_names
+ def test_override_3(self):
+ """The column being named elsewhere also cancels the error,"""
+ mapper(User, users,
+ properties=dict(
+ name=relation(mapper(Address, addresses)),
+ foo=users.c.name))
+
+ @testing.resolve_artifact_names
+ def test_synonym(self):
+
+ assert_col = []
+ class extendedproperty(property):
+ attribute = 123
+ def __getitem__(self, key):
+ return 'value'
+
+ class User(object):
+ def _get_name(self):
+ assert_col.append(('get', self.name))
+ return self.name
+ def _set_name(self, name):
+ assert_col.append(('set', name))
+ self.name = name
+ uname = extendedproperty(_get_name, _set_name)
+
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=True),
+ uname = synonym('name'),
+ adlist = synonym('addresses', proxy=True),
+ adname = synonym('addresses')
+ ))
+
+ # ensure the synonym can get at the proxied comparators without
+ # an explicit compile
+ User.name == 'ed'
+ User.adname.any()
+
+ assert hasattr(User, 'adlist')
+ # as of 0.4.2, synonyms always create a property
+ assert hasattr(User, 'adname')
+
+ # test compile
+ assert not isinstance(User.uname == 'jack', bool)
+
+ assert User.uname.property
+ assert User.adlist.property
+
+ sess = create_session()
+
+ # test RowTuple names
+ row = sess.query(User.id, User.uname).first()
+ assert row.uname == row[1]
+
+ u = sess.query(User).filter(User.uname=='jack').one()
+
+ fixture = self.static.user_address_result[0].addresses
+ eq_(u.adlist, fixture)
+
+ addr = sess.query(Address).filter_by(id=fixture[0].id).one()
+ u = sess.query(User).filter(User.adname.contains(addr)).one()
+ u2 = sess.query(User).filter(User.adlist.contains(addr)).one()
+
+ assert u is u2
+
+ assert u not in sess.dirty
+ u.uname = "some user name"
+ assert len(assert_col) > 0
+ eq_(assert_col, [('set', 'some user name')])
+ eq_(u.uname, "some user name")
+ eq_(assert_col, [('set', 'some user name'), ('get', 'some user name')])
+ eq_(u.name, "some user name")
+ assert u in sess.dirty
+
+ eq_(User.uname.attribute, 123)
+ eq_(User.uname['key'], 'value')
+
+ @testing.resolve_artifact_names
+ def test_synonym_column_location(self):
+ def go():
+ mapper(User, users, properties={
+ 'not_name':synonym('_name', map_column=True)})
+
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ ("Can't compile synonym '_name': no column on table "
+ "'users' named 'not_name'"),
+ go)
+
+ @testing.resolve_artifact_names
+ def test_column_synonyms(self):
+ """Synonyms which automatically instrument properties, set up aliased column, etc."""
+
+
+ assert_col = []
+ class User(object):
+ def _get_name(self):
+ assert_col.append(('get', self._name))
+ return self._name
+ def _set_name(self, name):
+ assert_col.append(('set', name))
+ self._name = name
+ name = property(_get_name, _set_name)
+
+ mapper(Address, addresses)
+ mapper(User, users, properties = {
+ 'addresses':relation(Address, lazy=True),
+ 'name':synonym('_name', map_column=True)
+ })
+
+ # test compile
+ assert not isinstance(User.name == 'jack', bool)
+
+ assert hasattr(User, 'name')
+ assert hasattr(User, '_name')
+
+ sess = create_session()
+ u = sess.query(User).filter(User.name == 'jack').one()
+ eq_(u.name, 'jack')
+ u.name = 'foo'
+ eq_(u.name, 'foo')
+ eq_(assert_col, [('get', 'jack'), ('set', 'foo'), ('get', 'foo')])
+
+ @testing.resolve_artifact_names
+ def test_comparable(self):
+ class extendedproperty(property):
+ attribute = 123
+
+ def method1(self):
+ return "method1"
+
+ def __getitem__(self, key):
+ return 'value'
+
+ class UCComparator(sa.orm.PropComparator):
+ __hash__ = None
+
+ def method1(self):
+ return "uccmethod1"
+
+ def method2(self, other):
+ return "method2"
+
+ def __eq__(self, other):
+ cls = self.prop.parent.class_
+ col = getattr(cls, 'name')
+ if other is None:
+ return col == None
+ else:
+ return sa.func.upper(col) == sa.func.upper(other)
+
+ def map_(with_explicit_property):
+ class User(object):
+ @extendedproperty
+ def uc_name(self):
+ if self.name is None:
+ return None
+ return self.name.upper()
+ if with_explicit_property:
+ args = (UCComparator, User.uc_name)
+ else:
+ args = (UCComparator,)
+
+ mapper(User, users, properties=dict(
+ uc_name = sa.orm.comparable_property(*args)))
+ return User
+
+ for User in (map_(True), map_(False)):
+ sess = create_session()
+ sess.begin()
+ q = sess.query(User)
+
+ assert hasattr(User, 'name')
+ assert hasattr(User, 'uc_name')
+
+ eq_(User.uc_name.method1(), "method1")
+ eq_(User.uc_name.method2('x'), "method2")
+
+ assert_raises_message(
+ AttributeError,
+ "Neither 'extendedproperty' object nor 'UCComparator' object has an attribute 'nonexistent'",
+ getattr, User.uc_name, 'nonexistent')
+
+ # test compile
+ assert not isinstance(User.uc_name == 'jack', bool)
+ u = q.filter(User.uc_name=='JACK').one()
+
+ assert u.uc_name == "JACK"
+ assert u not in sess.dirty
+
+ u.name = "some user name"
+ eq_(u.name, "some user name")
+ assert u in sess.dirty
+ eq_(u.uc_name, "SOME USER NAME")
+
+ sess.flush()
+ sess.expunge_all()
+
+ q = sess.query(User)
+ u2 = q.filter(User.name=='some user name').one()
+ u3 = q.filter(User.uc_name=='SOME USER NAME').one()
+
+ assert u2 is u3
+
+ eq_(User.uc_name.attribute, 123)
+ eq_(User.uc_name['key'], 'value')
+ sess.rollback()
+
+ @testing.resolve_artifact_names
+ def test_comparable_column(self):
+ class MyComparator(sa.orm.properties.ColumnProperty.Comparator):
+ def __eq__(self, other):
+ # lower case comparison
+ return func.lower(self.__clause_element__()) == func.lower(other)
+
+ def intersects(self, other):
+ # non-standard comparator
+ return self.__clause_element__().op('&=')(other)
+
+ mapper(User, users, properties={
+ 'name':sa.orm.column_property(users.c.name, comparator_factory=MyComparator)
+ })
+
+ assert_raises_message(
+ AttributeError,
+ "Neither 'InstrumentedAttribute' object nor 'MyComparator' object has an attribute 'nonexistent'",
+ getattr, User.name, "nonexistent")
+
+ eq_(str((User.name == 'ed').compile(dialect=sa.engine.default.DefaultDialect())) , "lower(users.name) = lower(:lower_1)")
+ eq_(str((User.name.intersects('ed')).compile(dialect=sa.engine.default.DefaultDialect())), "users.name &= :name_1")
+
+
+ @testing.resolve_artifact_names
+ def test_reconstructor(self):
+ recon = []
+
+ class User(object):
+ @reconstructor
+ def reconstruct(self):
+ recon.append('go')
+
+ mapper(User, users)
+
+ User()
+ eq_(recon, [])
+ create_session().query(User).first()
+ eq_(recon, ['go'])
+
+ @testing.resolve_artifact_names
+ def test_reconstructor_inheritance(self):
+ recon = []
+ class A(object):
+ @reconstructor
+ def reconstruct(self):
+ recon.append('A')
+
+ class B(A):
+ @reconstructor
+ def reconstruct(self):
+ recon.append('B')
+
+ class C(A):
+ @reconstructor
+ def reconstruct(self):
+ recon.append('C')
+
+ mapper(A, users, polymorphic_on=users.c.name,
+ polymorphic_identity='jack')
+ mapper(B, inherits=A, polymorphic_identity='ed')
+ mapper(C, inherits=A, polymorphic_identity='chuck')
+
+ A()
+ B()
+ C()
+ eq_(recon, [])
+
+ sess = create_session()
+ sess.query(A).first()
+ sess.query(B).first()
+ sess.query(C).first()
+ eq_(recon, ['A', 'B', 'C'])
+
+ @testing.resolve_artifact_names
+ def test_unmapped_reconstructor_inheritance(self):
+ recon = []
+ class Base(object):
+ @reconstructor
+ def reconstruct(self):
+ recon.append('go')
+
+ class User(Base):
+ pass
+
+ mapper(User, users)
+
+ User()
+ eq_(recon, [])
+
+ create_session().query(User).first()
+ eq_(recon, ['go'])
+
+ @testing.resolve_artifact_names
+ def test_unmapped_error(self):
+ mapper(Address, addresses)
+ sa.orm.clear_mappers()
+
+ mapper(User, users, properties={
+ 'addresses':relation(Address)
+ })
+
+ assert_raises(sa.orm.exc.UnmappedClassError, sa.orm.compile_mappers)
+
+ @testing.resolve_artifact_names
+ def test_oldstyle_mixin(self):
+ class OldStyle:
+ pass
+ class NewStyle(object):
+ pass
+
+ class A(NewStyle, OldStyle):
+ pass
+
+ mapper(A, users)
+
+ class B(OldStyle, NewStyle):
+ pass
+
+ mapper(B, users)
+
+
+class OptionsTest(_fixtures.FixtureTest):
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_synonym_options(self):
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=True,
+ order_by=addresses.c.id),
+ adlist = synonym('addresses', proxy=True)))
+
+
+ def go():
+ sess = create_session()
+ u = (sess.query(User).
+ order_by(User.id).
+ options(sa.orm.eagerload('adlist')).
+ filter_by(name='jack')).one()
+ eq_(u.adlist,
+ [self.static.user_address_result[0].addresses[0]])
+ self.assert_sql_count(testing.db, go, 1)
+
+ @testing.resolve_artifact_names
+ def test_eager_options(self):
+ """A lazy relation can be upgraded to an eager relation."""
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses),
+ order_by=addresses.c.id)))
+
+ sess = create_session()
+ l = (sess.query(User).
+ order_by(User.id).
+ options(sa.orm.eagerload('addresses'))).all()
+
+ def go():
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(0, go)
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_eager_options_with_limit(self):
+ mapper(User, users, properties=dict(
+ addresses=relation(mapper(Address, addresses), lazy=True)))
+
+ sess = create_session()
+ u = (sess.query(User).
+ options(sa.orm.eagerload('addresses')).
+ filter_by(id=8)).one()
+
+ def go():
+ eq_(u.id, 8)
+ eq_(len(u.addresses), 3)
+ self.sql_count_(0, go)
+
+ sess.expunge_all()
+
+ u = sess.query(User).filter_by(id=8).one()
+ eq_(u.id, 8)
+ eq_(len(u.addresses), 3)
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_lazy_options_with_limit(self):
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=False)))
+
+ sess = create_session()
+ u = (sess.query(User).
+ options(sa.orm.lazyload('addresses')).
+ filter_by(id=8)).one()
+
+ def go():
+ eq_(u.id, 8)
+ eq_(len(u.addresses), 3)
+ self.sql_count_(1, go)
+
+ @testing.resolve_artifact_names
+ def test_eager_degrade(self):
+ """An eager relation automatically degrades to a lazy relation if eager columns are not available"""
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=False)))
+
+ sess = create_session()
+ # first test straight eager load, 1 statement
+ def go():
+ l = sess.query(User).order_by(User.id).all()
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(1, go)
+
+ sess.expunge_all()
+
+ # then select just from users. run it into instances.
+ # then assert the data, which will launch 3 more lazy loads
+ # (previous users in session fell out of scope and were removed from
+ # session's identity map)
+ r = users.select().order_by(users.c.id).execute()
+ def go():
+ l = list(sess.query(User).instances(r))
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(4, go)
+
+
+ @testing.resolve_artifact_names
+ def test_eager_degrade_deep(self):
+ # test with a deeper set of eager loads. when we first load the three
+ # users, they will have no addresses or orders. the number of lazy
+ # loads when traversing the whole thing will be three for the
+ # addresses and three for the orders.
+ mapper(Address, addresses)
+
+ mapper(Keyword, keywords)
+
+ mapper(Item, items, properties=dict(
+ keywords=relation(Keyword, secondary=item_keywords,
+ lazy=False,
+ order_by=item_keywords.c.keyword_id)))
+
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, secondary=order_items, lazy=False,
+ order_by=order_items.c.item_id)))
+
+ mapper(User, users, properties=dict(
+ addresses=relation(Address, lazy=False,
+ order_by=addresses.c.id),
+ orders=relation(Order, lazy=False,
+ order_by=orders.c.id)))
+
+ sess = create_session()
+
+ # first test straight eager load, 1 statement
+ def go():
+ l = sess.query(User).order_by(User.id).all()
+ eq_(l, self.static.user_all_result)
+ self.assert_sql_count(testing.db, go, 1)
+
+ sess.expunge_all()
+
+ # then select just from users. run it into instances.
+ # then assert the data, which will launch 6 more lazy loads
+ r = users.select().execute()
+ def go():
+ l = list(sess.query(User).instances(r))
+ eq_(l, self.static.user_all_result)
+ self.assert_sql_count(testing.db, go, 6)
+
+ @testing.resolve_artifact_names
+ def test_lazy_options(self):
+ """An eager relation can be upgraded to a lazy relation."""
+ mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=False)
+ ))
+
+ sess = create_session()
+ l = (sess.query(User).
+ order_by(User.id).
+ options(sa.orm.lazyload('addresses'))).all()
+
+ def go():
+ eq_(l, self.static.user_address_result)
+ self.sql_count_(4, go)
+
+
+class DeepOptionsTest(_fixtures.FixtureTest):
+ @classmethod
+ @testing.resolve_artifact_names
+ def setup_mappers(cls):
+ mapper(Keyword, keywords)
+
+ mapper(Item, items, properties=dict(
+ keywords=relation(Keyword, item_keywords,
+ order_by=item_keywords.c.item_id)))
+
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, order_items,
+ order_by=items.c.id)))
+
+ mapper(User, users, order_by=users.c.id, properties=dict(
+ orders=relation(Order, order_by=orders.c.id)))
+
+ @testing.resolve_artifact_names
+ def test_deep_options_1(self):
+ sess = create_session()
+
+ # eagerload nothing.
+ u = sess.query(User).all()
+ def go():
+ x = u[0].orders[1].items[0].keywords[1]
+ self.assert_sql_count(testing.db, go, 3)
+
+ @testing.resolve_artifact_names
+ def test_deep_options_2(self):
+ sess = create_session()
+
+ # eagerload orders.items.keywords; eagerload_all() implies eager load
+ # of orders, orders.items
+ l = (sess.query(User).
+ options(sa.orm.eagerload_all('orders.items.keywords'))).all()
+ def go():
+ x = l[0].orders[1].items[0].keywords[1]
+ self.sql_count_(0, go)
+
+
+ @testing.resolve_artifact_names
+ def test_deep_options_3(self):
+ sess = create_session()
+
+ # same thing, with separate options calls
+ q2 = (sess.query(User).
+ options(sa.orm.eagerload('orders')).
+ options(sa.orm.eagerload('orders.items')).
+ options(sa.orm.eagerload('orders.items.keywords')))
+ u = q2.all()
+ def go():
+ x = u[0].orders[1].items[0].keywords[1]
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_deep_options_4(self):
+ sess = create_session()
+
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ r"Can't find entity Mapper\|Order\|orders in Query. "
+ r"Current list: \['Mapper\|User\|users'\]",
+ sess.query(User).options, sa.orm.eagerload(Order.items))
+
+ # eagerload "keywords" on items. it will lazy load "orders", then
+ # lazy load the "items" on the order, but on "items" it will eager
+ # load the "keywords"
+ q3 = sess.query(User).options(sa.orm.eagerload('orders.items.keywords'))
+ u = q3.all()
+ def go():
+ x = u[0].orders[1].items[0].keywords[1]
+ self.sql_count_(2, go)
+
+class ValidatorTest(_fixtures.FixtureTest):
+ @testing.resolve_artifact_names
+ def test_scalar(self):
+ class User(_base.ComparableEntity):
+ @validates('name')
+ def validate_name(self, key, name):
+ assert name != 'fred'
+ return name + ' modified'
+
+ mapper(User, users)
+ sess = create_session()
+ u1 = User(name='ed')
+ eq_(u1.name, 'ed modified')
+ assert_raises(AssertionError, setattr, u1, "name", "fred")
+ eq_(u1.name, 'ed modified')
+ sess.add(u1)
+ sess.flush()
+ sess.expunge_all()
+ eq_(sess.query(User).filter_by(name='ed modified').one(), User(name='ed'))
+
+
+ @testing.resolve_artifact_names
+ def test_collection(self):
+ class User(_base.ComparableEntity):
+ @validates('addresses')
+ def validate_address(self, key, ad):
+ assert '@' in ad.email_address
+ return ad
+
+ mapper(User, users, properties={'addresses':relation(Address)})
+ mapper(Address, addresses)
+ sess = create_session()
+ u1 = User(name='edward')
+ assert_raises(AssertionError, u1.addresses.append, Address(email_address='noemail'))
+ u1.addresses.append(Address(id=15, email_address='foo@bar.com'))
+ sess.add(u1)
+ sess.flush()
+ sess.expunge_all()
+ eq_(
+ sess.query(User).filter_by(name='edward').one(),
+ User(name='edward', addresses=[Address(email_address='foo@bar.com')])
+ )
+
+class ComparatorFactoryTest(_fixtures.FixtureTest, AssertsCompiledSQL):
+ @testing.resolve_artifact_names
+ def test_kwarg_accepted(self):
+ class DummyComposite(object):
+ def __init__(self, x, y):
+ pass
+
+ from sqlalchemy.orm.interfaces import PropComparator
+
+ class MyFactory(PropComparator):
+ pass
+
+ for args in (
+ (column_property, users.c.name),
+ (deferred, users.c.name),
+ (synonym, 'name'),
+ (composite, DummyComposite, users.c.id, users.c.name),
+ (relation, Address),
+ (backref, 'address'),
+ (comparable_property, ),
+ (dynamic_loader, Address)
+ ):
+ fn = args[0]
+ args = args[1:]
+ fn(comparator_factory=MyFactory, *args)
+
+ @testing.resolve_artifact_names
+ def test_column(self):
+ from sqlalchemy.orm.properties import ColumnProperty
+
+ class MyFactory(ColumnProperty.Comparator):
+ __hash__ = None
+ def __eq__(self, other):
+ return func.foobar(self.__clause_element__()) == func.foobar(other)
+ mapper(User, users, properties={'name':column_property(users.c.name, comparator_factory=MyFactory)})
+ self.assert_compile(User.name == 'ed', "foobar(users.name) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+ self.assert_compile(aliased(User).name == 'ed', "foobar(users_1.name) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+
+ @testing.resolve_artifact_names
+ def test_synonym(self):
+ from sqlalchemy.orm.properties import ColumnProperty
+ class MyFactory(ColumnProperty.Comparator):
+ __hash__ = None
+ def __eq__(self, other):
+ return func.foobar(self.__clause_element__()) == func.foobar(other)
+ mapper(User, users, properties={'name':synonym('_name', map_column=True, comparator_factory=MyFactory)})
+ self.assert_compile(User.name == 'ed', "foobar(users.name) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+ self.assert_compile(aliased(User).name == 'ed', "foobar(users_1.name) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+
+ @testing.resolve_artifact_names
+ def test_relation(self):
+ from sqlalchemy.orm.properties import PropertyLoader
+
+ class MyFactory(PropertyLoader.Comparator):
+ __hash__ = None
+ def __eq__(self, other):
+ return func.foobar(self.__clause_element__().c.user_id) == func.foobar(other.id)
+
+ class MyFactory2(PropertyLoader.Comparator):
+ __hash__ = None
+ def __eq__(self, other):
+ return func.foobar(self.__clause_element__().c.id) == func.foobar(other.user_id)
+
+ mapper(User, users)
+ mapper(Address, addresses, properties={
+ 'user':relation(User, comparator_factory=MyFactory,
+ backref=backref("addresses", comparator_factory=MyFactory2)
+ )
+ }
+ )
+ self.assert_compile(Address.user == User(id=5), "foobar(addresses.user_id) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+ self.assert_compile(User.addresses == Address(id=5, user_id=7), "foobar(users.id) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+
+ self.assert_compile(aliased(Address).user == User(id=5), "foobar(addresses_1.user_id) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+ self.assert_compile(aliased(User).addresses == Address(id=5, user_id=7), "foobar(users_1.id) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+
+
+class DeferredTest(_fixtures.FixtureTest):
+
+ @testing.resolve_artifact_names
+ def test_basic(self):
+ """A basic deferred load."""
+
+ mapper(Order, orders, order_by=orders.c.id, properties={
+ 'description': deferred(orders.c.description)})
+
+ o = Order()
+ self.assert_(o.description is None)
+
+ q = create_session().query(Order)
+ def go():
+ l = q.all()
+ o2 = l[2]
+ x = o2.description
+
+ self.sql_eq_(go, [
+ ("SELECT orders.id AS orders_id, "
+ "orders.user_id AS orders_user_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders ORDER BY orders.id", {}),
+ ("SELECT orders.description AS orders_description "
+ "FROM orders WHERE orders.id = :param_1",
+ {'param_1':3})])
+
+ @testing.resolve_artifact_names
+ def test_unsaved(self):
+ """Deferred loading does not kick in when just PK cols are set."""
+
+ mapper(Order, orders, properties={
+ 'description': deferred(orders.c.description)})
+
+ sess = create_session()
+ o = Order()
+ sess.add(o)
+ o.id = 7
+ def go():
+ o.description = "some description"
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_synonym_group_bug(self):
+ mapper(Order, orders, properties={
+ 'isopen':synonym('_isopen', map_column=True),
+ 'description':deferred(orders.c.description, group='foo')
+ })
+
+ sess = create_session()
+ o1 = sess.query(Order).get(1)
+ eq_(o1.description, "order 1")
+
+ @testing.resolve_artifact_names
+ def test_unsaved_2(self):
+ mapper(Order, orders, properties={
+ 'description': deferred(orders.c.description)})
+
+ sess = create_session()
+ o = Order()
+ sess.add(o)
+ def go():
+ o.description = "some description"
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_unsaved_group(self):
+ """Deferred loading doesnt kick in when just PK cols are set"""
+
+ mapper(Order, orders, order_by=orders.c.id, properties=dict(
+ description=deferred(orders.c.description, group='primary'),
+ opened=deferred(orders.c.isopen, group='primary')))
+
+ sess = create_session()
+ o = Order()
+ sess.add(o)
+ o.id = 7
+ def go():
+ o.description = "some description"
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_unsaved_group_2(self):
+ mapper(Order, orders, order_by=orders.c.id, properties=dict(
+ description=deferred(orders.c.description, group='primary'),
+ opened=deferred(orders.c.isopen, group='primary')))
+
+ sess = create_session()
+ o = Order()
+ sess.add(o)
+ def go():
+ o.description = "some description"
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_save(self):
+ m = mapper(Order, orders, properties={
+ 'description': deferred(orders.c.description)})
+
+ sess = create_session()
+ o2 = sess.query(Order).get(2)
+ o2.isopen = 1
+ sess.flush()
+
+ @testing.resolve_artifact_names
+ def test_group(self):
+ """Deferred load with a group"""
+ mapper(Order, orders, properties={
+ 'userident': deferred(orders.c.user_id, group='primary'),
+ 'addrident': deferred(orders.c.address_id, group='primary'),
+ 'description': deferred(orders.c.description, group='primary'),
+ 'opened': deferred(orders.c.isopen, group='primary')
+ })
+
+ sess = create_session()
+ q = sess.query(Order).order_by(Order.id)
+ def go():
+ l = q.all()
+ o2 = l[2]
+ eq_(o2.opened, 1)
+ eq_(o2.userident, 7)
+ eq_(o2.description, 'order 3')
+
+ self.sql_eq_(go, [
+ ("SELECT orders.id AS orders_id "
+ "FROM orders ORDER BY orders.id", {}),
+ ("SELECT orders.user_id AS orders_user_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders WHERE orders.id = :param_1",
+ {'param_1':3})])
+
+ o2 = q.all()[2]
+ eq_(o2.description, 'order 3')
+ assert o2 not in sess.dirty
+ o2.description = 'order 3'
+ def go():
+ sess.flush()
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_preserve_changes(self):
+ """A deferred load operation doesn't revert modifications on attributes"""
+ mapper(Order, orders, properties = {
+ 'userident': deferred(orders.c.user_id, group='primary'),
+ 'description': deferred(orders.c.description, group='primary'),
+ 'opened': deferred(orders.c.isopen, group='primary')
+ })
+ sess = create_session()
+ o = sess.query(Order).get(3)
+ assert 'userident' not in o.__dict__
+ o.description = 'somenewdescription'
+ eq_(o.description, 'somenewdescription')
+ def go():
+ eq_(o.opened, 1)
+ self.assert_sql_count(testing.db, go, 1)
+ eq_(o.description, 'somenewdescription')
+ assert o in sess.dirty
+
+ @testing.resolve_artifact_names
+ def test_commits_state(self):
+ """
+ When deferred elements are loaded via a group, they get the proper
+ CommittedState and don't result in changes being committed
+
+ """
+ mapper(Order, orders, properties = {
+ 'userident':deferred(orders.c.user_id, group='primary'),
+ 'description':deferred(orders.c.description, group='primary'),
+ 'opened':deferred(orders.c.isopen, group='primary')})
+
+ sess = create_session()
+ o2 = sess.query(Order).get(3)
+
+ # this will load the group of attributes
+ eq_(o2.description, 'order 3')
+ assert o2 not in sess.dirty
+ # this will mark it as 'dirty', but nothing actually changed
+ o2.description = 'order 3'
+ # therefore the flush() shouldnt actually issue any SQL
+ self.assert_sql_count(testing.db, sess.flush, 0)
+
+ @testing.resolve_artifact_names
+ def test_options(self):
+ """Options on a mapper to create deferred and undeferred columns"""
+
+ mapper(Order, orders)
+
+ sess = create_session()
+ q = sess.query(Order).order_by(Order.id).options(defer('user_id'))
+
+ def go():
+ q.all()[0].user_id
+
+ self.sql_eq_(go, [
+ ("SELECT orders.id AS orders_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders ORDER BY orders.id", {}),
+ ("SELECT orders.user_id AS orders_user_id "
+ "FROM orders WHERE orders.id = :param_1",
+ {'param_1':1})])
+ sess.expunge_all()
+
+ q2 = q.options(sa.orm.undefer('user_id'))
+ self.sql_eq_(q2.all, [
+ ("SELECT orders.id AS orders_id, "
+ "orders.user_id AS orders_user_id, "
+ "orders.address_id AS orders_address_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen "
+ "FROM orders ORDER BY orders.id",
+ {})])
+
+ @testing.resolve_artifact_names
+ def test_undefer_group(self):
+ mapper(Order, orders, properties={
+ 'userident':deferred(orders.c.user_id, group='primary'),
+ 'description':deferred(orders.c.description, group='primary'),
+ 'opened':deferred(orders.c.isopen, group='primary')})
+
+ sess = create_session()
+ q = sess.query(Order).order_by(Order.id)
+ def go():
+ l = q.options(sa.orm.undefer_group('primary')).all()
+ o2 = l[2]
+ eq_(o2.opened, 1)
+ eq_(o2.userident, 7)
+ eq_(o2.description, 'order 3')
+
+ self.sql_eq_(go, [
+ ("SELECT orders.user_id AS orders_user_id, "
+ "orders.description AS orders_description, "
+ "orders.isopen AS orders_isopen, "
+ "orders.id AS orders_id, "
+ "orders.address_id AS orders_address_id "
+ "FROM orders ORDER BY orders.id",
+ {})])
+
+ @testing.resolve_artifact_names
+ def test_locates_col(self):
+ """Manually adding a column to the result undefers the column."""
+
+ mapper(Order, orders, properties={
+ 'description':deferred(orders.c.description)})
+
+ sess = create_session()
+ o1 = sess.query(Order).order_by(Order.id).first()
+ def go():
+ eq_(o1.description, 'order 1')
+ self.sql_count_(1, go)
+
+ sess = create_session()
+ o1 = (sess.query(Order).
+ order_by(Order.id).
+ add_column(orders.c.description).first())[0]
+ def go():
+ eq_(o1.description, 'order 1')
+ self.sql_count_(0, go)
+
+ @testing.resolve_artifact_names
+ def test_deep_options(self):
+ mapper(Item, items, properties=dict(
+ description=deferred(items.c.description)))
+ mapper(Order, orders, properties=dict(
+ items=relation(Item, secondary=order_items)))
+ mapper(User, users, properties=dict(
+ orders=relation(Order, order_by=orders.c.id)))
+
+ sess = create_session()
+ q = sess.query(User).order_by(User.id)
+ l = q.all()
+ item = l[0].orders[1].items[1]
+ def go():
+ eq_(item.description, 'item 4')
+ self.sql_count_(1, go)
+ eq_(item.description, 'item 4')
+
+ sess.expunge_all()
+ l = q.options(sa.orm.undefer('orders.items.description')).all()
+ item = l[0].orders[1].items[1]
+ def go():
+ eq_(item.description, 'item 4')
+ self.sql_count_(0, go)
+ eq_(item.description, 'item 4')
+
+class DeferredPopulationTest(_base.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table("thing", metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(20)))
+
+ Table("human", metadata,
+ Column("id", Integer, primary_key=True),
+ Column("thing_id", Integer, ForeignKey("thing.id")),
+ Column("name", String(20)))
+
+ @classmethod
+ @testing.resolve_artifact_names
+ def setup_mappers(cls):
+ class Human(_base.BasicEntity): pass
+ class Thing(_base.BasicEntity): pass
+
+ mapper(Human, human, properties={"thing": relation(Thing)})
+ mapper(Thing, thing, properties={"name": deferred(thing.c.name)})
+
+ @classmethod
+ @testing.resolve_artifact_names
+ def insert_data(cls):
+ thing.insert().execute([
+ {"id": 1, "name": "Chair"},
+ ])
+
+ human.insert().execute([
+ {"id": 1, "thing_id": 1, "name": "Clark Kent"},
+ ])
+
+ def _test(self, thing):
+ assert "name" in attributes.instance_state(thing).dict
+
+ @testing.resolve_artifact_names
+ def test_no_previous_query(self):
+ session = create_session()
+ thing = session.query(Thing).options(sa.orm.undefer("name")).first()
+ self._test(thing)
+
+ @testing.resolve_artifact_names
+ def test_query_twice_with_clear(self):
+ session = create_session()
+ result = session.query(Thing).first()
+ session.expunge_all()
+ thing = session.query(Thing).options(sa.orm.undefer("name")).first()
+ self._test(thing)
+
+ @testing.resolve_artifact_names
+ def test_query_twice_no_clear(self):
+ session = create_session()
+ result = session.query(Thing).first()
+ thing = session.query(Thing).options(sa.orm.undefer("name")).first()
+ self._test(thing)
+
+ @testing.resolve_artifact_names
+ def test_eagerload_with_clear(self):
+ session = create_session()
+ human = session.query(Human).options(sa.orm.eagerload("thing")).first()
+ session.expunge_all()
+ thing = session.query(Thing).options(sa.orm.undefer("name")).first()
+ self._test(thing)
+
+ @testing.resolve_artifact_names
+ def test_eagerload_no_clear(self):
+ session = create_session()
+ human = session.query(Human).options(sa.orm.eagerload("thing")).first()
+ thing = session.query(Thing).options(sa.orm.undefer("name")).first()
+ self._test(thing)
+
+ @testing.resolve_artifact_names
+ def test_join_with_clear(self):
+ session = create_session()
+ result = session.query(Human).add_entity(Thing).join("thing").first()
+ session.expunge_all()
+ thing = session.query(Thing).options(sa.orm.undefer("name")).first()
+ self._test(thing)
+
+ @testing.resolve_artifact_names
+ def test_join_no_clear(self):
+ session = create_session()
+ result = session.query(Human).add_entity(Thing).join("thing").first()
+ thing = session.query(Thing).options(sa.orm.undefer("name")).first()
+ self._test(thing)
+
+
+class CompositeTypesTest(_base.MappedTest):
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('graphs', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('version_id', Integer, primary_key=True, nullable=True),
+ Column('name', String(30)))
+
+ Table('edges', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('graph_id', Integer, nullable=False),
+ Column('graph_version_id', Integer, nullable=False),
+ Column('x1', Integer),
+ Column('y1', Integer),
+ Column('x2', Integer),
+ Column('y2', Integer),
+ sa.ForeignKeyConstraint(
+ ['graph_id', 'graph_version_id'],
+ ['graphs.id', 'graphs.version_id']))
+
+ Table('foobars', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('x1', Integer, default=2),
+ Column('x2', Integer),
+ Column('x3', Integer, default=15),
+ Column('x4', Integer)
+ )
+
+ @testing.resolve_artifact_names
+ def test_basic(self):
+ class Point(object):
+ def __init__(self, x, y):
+ self.x = x
+ self.y = y
+ def __composite_values__(self):
+ return [self.x, self.y]
+ __hash__ = None
+ def __eq__(self, other):
+ return isinstance(other, Point) and other.x == self.x and other.y == self.y
+ def __ne__(self, other):
+ return not isinstance(other, Point) or not self.__eq__(other)
+
+ class Graph(object):
+ pass
+ class Edge(object):
+ def __init__(self, start, end):
+ self.start = start
+ self.end = end
+
+ mapper(Graph, graphs, properties={
+ 'edges':relation(Edge)
+ })
+ mapper(Edge, edges, properties={
+ 'start':sa.orm.composite(Point, edges.c.x1, edges.c.y1),
+ 'end': sa.orm.composite(Point, edges.c.x2, edges.c.y2)
+ })
+
+ sess = create_session()
+ g = Graph()
+ g.id = 1
+ g.version_id=1
+ g.edges.append(Edge(Point(3, 4), Point(5, 6)))
+ g.edges.append(Edge(Point(14, 5), Point(2, 7)))
+ sess.add(g)
+ sess.flush()
+
+ sess.expunge_all()
+ g2 = sess.query(Graph).get([g.id, g.version_id])
+ for e1, e2 in zip(g.edges, g2.edges):
+ eq_(e1.start, e2.start)
+ eq_(e1.end, e2.end)
+
+ g2.edges[1].end = Point(18, 4)
+ sess.flush()
+ sess.expunge_all()
+ e = sess.query(Edge).get(g2.edges[1].id)
+ eq_(e.end, Point(18, 4))
+
+ e.end.x = 19
+ e.end.y = 5
+ sess.flush()
+ sess.expunge_all()
+ eq_(sess.query(Edge).get(g2.edges[1].id).end, Point(19, 5))
+
+ g.edges[1].end = Point(19, 5)
+
+ sess.expunge_all()
+ def go():
+ g2 = (sess.query(Graph).
+ options(sa.orm.eagerload('edges'))).get([g.id, g.version_id])
+ for e1, e2 in zip(g.edges, g2.edges):
+ eq_(e1.start, e2.start)
+ eq_(e1.end, e2.end)
+ self.assert_sql_count(testing.db, go, 1)
+
+ # test comparison of CompositeProperties to their object instances
+ g = sess.query(Graph).get([1, 1])
+ assert sess.query(Edge).filter(Edge.start==Point(3, 4)).one() is g.edges[0]
+
+ assert sess.query(Edge).filter(Edge.start!=Point(3, 4)).first() is g.edges[1]
+
+ eq_(sess.query(Edge).filter(Edge.start==None).all(), [])
+
+ # query by columns
+ eq_(sess.query(Edge.start, Edge.end).all(), [(3, 4, 5, 6), (14, 5, 19, 5)])
+
+ e = g.edges[1]
+ e.end.x = e.end.y = None
+ sess.flush()
+ eq_(sess.query(Edge.start, Edge.end).all(), [(3, 4, 5, 6), (14, 5, None, None)])
+
+
+ @testing.resolve_artifact_names
+ def test_pk(self):
+ """Using a composite type as a primary key"""
+
+ class Version(object):
+ def __init__(self, id, version):
+ self.id = id
+ self.version = version
+ def __composite_values__(self):
+ return (self.id, self.version)
+ __hash__ = None
+ def __eq__(self, other):
+ return other.id == self.id and other.version == self.version
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ class Graph(object):
+ def __init__(self, version):
+ self.version = version
+
+ mapper(Graph, graphs, allow_null_pks=True, properties={
+ 'version':sa.orm.composite(Version, graphs.c.id,
+ graphs.c.version_id)})
+
+ sess = create_session()
+ g = Graph(Version(1, 1))
+ sess.add(g)
+ sess.flush()
+
+ sess.expunge_all()
+ g2 = sess.query(Graph).get([1, 1])
+ eq_(g.version, g2.version)
+ sess.expunge_all()
+
+ g2 = sess.query(Graph).get(Version(1, 1))
+ eq_(g.version, g2.version)
+
+ # test pk mutation
+ @testing.fails_on('mssql', 'Cannot update identity columns.')
+ def update_pk():
+ g2.version = Version(2, 1)
+ sess.flush()
+ g3 = sess.query(Graph).get(Version(2, 1))
+ eq_(g2.version, g3.version)
+ update_pk()
+
+ # test pk with one column NULL
+ # TODO: can't seem to get NULL in for a PK value
+ # in either mysql or postgres, autoincrement=False etc.
+ # notwithstanding
+ @testing.fails_on_everything_except("sqlite")
+ def go():
+ g = Graph(Version(2, None))
+ sess.add(g)
+ sess.flush()
+ sess.expunge_all()
+ g2 = sess.query(Graph).filter_by(version=Version(2, None)).one()
+ eq_(g.version, g2.version)
+ go()
+
+ @testing.resolve_artifact_names
+ def test_attributes_with_defaults(self):
+ class Foobar(object):
+ pass
+
+ class FBComposite(object):
+ def __init__(self, x1, x2, x3, x4):
+ self.x1 = x1
+ self.x2 = x2
+ self.x3 = x3
+ self.x4 = x4
+ def __composite_values__(self):
+ return self.x1, self.x2, self.x3, self.x4
+ __hash__ = None
+ def __eq__(self, other):
+ return other.x1 == self.x1 and other.x2 == self.x2 and other.x3 == self.x3 and other.x4 == self.x4
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ mapper(Foobar, foobars, properties=dict(
+ foob=sa.orm.composite(FBComposite, foobars.c.x1, foobars.c.x2, foobars.c.x3, foobars.c.x4)
+ ))
+
+ sess = create_session()
+ f1 = Foobar()
+ f1.foob = FBComposite(None, 5, None, None)
+ sess.add(f1)
+ sess.flush()
+
+ assert f1.foob == FBComposite(2, 5, 15, None)
+
+
+ f2 = Foobar()
+ sess.add(f2)
+ sess.flush()
+ assert f2.foob == FBComposite(2, None, 15, None)
+
+
+ @testing.resolve_artifact_names
+ def test_set_composite_values(self):
+ class Foobar(object):
+ pass
+
+ class FBComposite(object):
+ def __init__(self, x1, x2, x3, x4):
+ self.x1val = x1
+ self.x2val = x2
+ self.x3 = x3
+ self.x4 = x4
+ def __composite_values__(self):
+ return self.x1val, self.x2val, self.x3, self.x4
+ def __set_composite_values__(self, x1, x2, x3, x4):
+ self.x1val = x1
+ self.x2val = x2
+ self.x3 = x3
+ self.x4 = x4
+ __hash__ = None
+ def __eq__(self, other):
+ return other.x1val == self.x1val and other.x2val == self.x2val and other.x3 == self.x3 and other.x4 == self.x4
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ mapper(Foobar, foobars, properties=dict(
+ foob=sa.orm.composite(FBComposite, foobars.c.x1, foobars.c.x2, foobars.c.x3, foobars.c.x4)
+ ))
+
+ sess = create_session()
+ f1 = Foobar()
+ f1.foob = FBComposite(None, 5, None, None)
+ sess.add(f1)
+ sess.flush()
+
+ assert f1.foob == FBComposite(2, 5, 15, None)
+
+ @testing.resolve_artifact_names
+ def test_save_null(self):
+ """test saving a null composite value
+
+ See google groups thread for more context:
+ http://groups.google.com/group/sqlalchemy/browse_thread/thread/0c6580a1761b2c29
+
+ """
+ class Point(object):
+ def __init__(self, x, y):
+ self.x = x
+ self.y = y
+ def __composite_values__(self):
+ return [self.x, self.y]
+ __hash__ = None
+ def __eq__(self, other):
+ return other.x == self.x and other.y == self.y
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ class Graph(object):
+ pass
+ class Edge(object):
+ def __init__(self, start, end):
+ self.start = start
+ self.end = end
+
+ mapper(Graph, graphs, properties={
+ 'edges':relation(Edge)
+ })
+ mapper(Edge, edges, properties={
+ 'start':sa.orm.composite(Point, edges.c.x1, edges.c.y1),
+ 'end':sa.orm.composite(Point, edges.c.x2, edges.c.y2)
+ })
+
+ sess = create_session()
+ g = Graph()
+ g.id = 1
+ g.version_id=1
+ e = Edge(None, None)
+ g.edges.append(e)
+
+ sess.add(g)
+ sess.flush()
+
+ sess.expunge_all()
+
+ g2 = sess.query(Graph).get([1, 1])
+ assert g2.edges[-1].start.x is None
+ assert g2.edges[-1].start.y is None
+
+
+class NoLoadTest(_fixtures.FixtureTest):
+ run_inserts = 'once'
+ run_deletes = None
+
+ @testing.resolve_artifact_names
+ def test_basic(self):
+ """A basic one-to-many lazy load"""
+ m = mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=None)
+ ))
+ q = create_session().query(m)
+ l = [None]
+ def go():
+ x = q.filter(User.id == 7).all()
+ x[0].addresses
+ l[0] = x
+ self.assert_sql_count(testing.db, go, 1)
+
+ self.assert_result(l[0], User,
+ {'id' : 7, 'addresses' : (Address, [])},
+ )
+
+ @testing.resolve_artifact_names
+ def test_options(self):
+ m = mapper(User, users, properties=dict(
+ addresses = relation(mapper(Address, addresses), lazy=None)
+ ))
+ q = create_session().query(m).options(sa.orm.lazyload('addresses'))
+ l = [None]
+ def go():
+ x = q.filter(User.id == 7).all()
+ x[0].addresses
+ l[0] = x
+ self.sql_count_(2, go)
+
+ self.assert_result(l[0], User,
+ {'id' : 7, 'addresses' : (Address, [{'id' : 1}])},
+ )
+
+
+class MapperExtensionTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ def extension(self):
+ methods = []
+
+ class Ext(sa.orm.MapperExtension):
+ def instrument_class(self, mapper, cls):
+ methods.append('instrument_class')
+ return sa.orm.EXT_CONTINUE
+
+ def init_instance(self, mapper, class_, oldinit, instance, args, kwargs):
+ methods.append('init_instance')
+ return sa.orm.EXT_CONTINUE
+
+ def init_failed(self, mapper, class_, oldinit, instance, args, kwargs):
+ methods.append('init_failed')
+ return sa.orm.EXT_CONTINUE
+
+ def translate_row(self, mapper, context, row):
+ methods.append('translate_row')
+ return sa.orm.EXT_CONTINUE
+
+ def create_instance(self, mapper, selectcontext, row, class_):
+ methods.append('create_instance')
+ return sa.orm.EXT_CONTINUE
+
+ def reconstruct_instance(self, mapper, instance):
+ methods.append('reconstruct_instance')
+ return sa.orm.EXT_CONTINUE
+
+ def append_result(self, mapper, selectcontext, row, instance, result, **flags):
+ methods.append('append_result')
+ return sa.orm.EXT_CONTINUE
+
+ def populate_instance(self, mapper, selectcontext, row, instance, **flags):
+ methods.append('populate_instance')
+ return sa.orm.EXT_CONTINUE
+
+ def before_insert(self, mapper, connection, instance):
+ methods.append('before_insert')
+ return sa.orm.EXT_CONTINUE
+
+ def after_insert(self, mapper, connection, instance):
+ methods.append('after_insert')
+ return sa.orm.EXT_CONTINUE
+
+ def before_update(self, mapper, connection, instance):
+ methods.append('before_update')
+ return sa.orm.EXT_CONTINUE
+
+ def after_update(self, mapper, connection, instance):
+ methods.append('after_update')
+ return sa.orm.EXT_CONTINUE
+
+ def before_delete(self, mapper, connection, instance):
+ methods.append('before_delete')
+ return sa.orm.EXT_CONTINUE
+
+ def after_delete(self, mapper, connection, instance):
+ methods.append('after_delete')
+ return sa.orm.EXT_CONTINUE
+
+ return Ext, methods
+
+ @testing.resolve_artifact_names
+ def test_basic(self):
+ """test that common user-defined methods get called."""
+ Ext, methods = self.extension()
+
+ mapper(User, users, extension=Ext())
+ sess = create_session()
+ u = User(name='u1')
+ sess.add(u)
+ sess.flush()
+ u = sess.query(User).populate_existing().get(u.id)
+ sess.expunge_all()
+ u = sess.query(User).get(u.id)
+ u.name = 'u1 changed'
+ sess.flush()
+ sess.delete(u)
+ sess.flush()
+ eq_(methods,
+ ['instrument_class', 'init_instance', 'before_insert',
+ 'after_insert', 'translate_row', 'populate_instance',
+ 'append_result', 'translate_row', 'create_instance',
+ 'populate_instance', 'reconstruct_instance', 'append_result',
+ 'before_update', 'after_update', 'before_delete', 'after_delete'])
+
+ @testing.resolve_artifact_names
+ def test_inheritance(self):
+ Ext, methods = self.extension()
+
+ class AdminUser(User):
+ pass
+
+ mapper(User, users, extension=Ext())
+ mapper(AdminUser, addresses, inherits=User)
+
+ sess = create_session()
+ am = AdminUser(name='au1', email_address='au1@e1')
+ sess.add(am)
+ sess.flush()
+ am = sess.query(AdminUser).populate_existing().get(am.id)
+ sess.expunge_all()
+ am = sess.query(AdminUser).get(am.id)
+ am.name = 'au1 changed'
+ sess.flush()
+ sess.delete(am)
+ sess.flush()
+ eq_(methods,
+ ['instrument_class', 'instrument_class', 'init_instance',
+ 'before_insert', 'after_insert', 'translate_row',
+ 'populate_instance', 'append_result', 'translate_row',
+ 'create_instance', 'populate_instance', 'reconstruct_instance',
+ 'append_result', 'before_update', 'after_update', 'before_delete',
+ 'after_delete'])
+
+ @testing.resolve_artifact_names
+ def test_after_with_no_changes(self):
+ """after_update is called even if no columns were updated."""
+
+ Ext, methods = self.extension()
+
+ mapper(Item, items, extension=Ext() , properties={
+ 'keywords': relation(Keyword, secondary=item_keywords)})
+ mapper(Keyword, keywords, extension=Ext())
+
+ sess = create_session()
+ i1 = Item(description="i1")
+ k1 = Keyword(name="k1")
+ sess.add(i1)
+ sess.add(k1)
+ sess.flush()
+ eq_(methods,
+ ['instrument_class', 'instrument_class', 'init_instance',
+ 'init_instance', 'before_insert', 'after_insert',
+ 'before_insert', 'after_insert'])
+
+ del methods[:]
+ i1.keywords.append(k1)
+ sess.flush()
+ eq_(methods, ['before_update', 'after_update'])
+
+
+ @testing.resolve_artifact_names
+ def test_inheritance_with_dupes(self):
+ """Inheritance with the same extension instance on both mappers."""
+ Ext, methods = self.extension()
+
+ class AdminUser(User):
+ pass
+
+ ext = Ext()
+ mapper(User, users, extension=ext)
+ mapper(AdminUser, addresses, inherits=User, extension=ext)
+
+ sess = create_session()
+ am = AdminUser(name="au1", email_address="au1@e1")
+ sess.add(am)
+ sess.flush()
+ am = sess.query(AdminUser).populate_existing().get(am.id)
+ sess.expunge_all()
+ am = sess.query(AdminUser).get(am.id)
+ am.name = 'au1 changed'
+ sess.flush()
+ sess.delete(am)
+ sess.flush()
+ eq_(methods,
+ ['instrument_class', 'instrument_class', 'init_instance',
+ 'before_insert', 'after_insert', 'translate_row',
+ 'populate_instance', 'append_result', 'translate_row',
+ 'create_instance', 'populate_instance', 'reconstruct_instance',
+ 'append_result', 'before_update', 'after_update', 'before_delete',
+ 'after_delete'])
+
+ @testing.resolve_artifact_names
+ def test_create_instance(self):
+ class CreateUserExt(sa.orm.MapperExtension):
+ def create_instance(self, mapper, selectcontext, row, class_):
+ return User.__new__(User)
+
+ mapper(User, users, extension=CreateUserExt())
+ sess = create_session()
+ u1 = User()
+ u1.name = 'ed'
+ sess.add(u1)
+ sess.flush()
+ sess.expunge_all()
+ assert sess.query(User).first()
+
+
+class RequirementsTest(_base.MappedTest):
+ """Tests the contract for user classes."""
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('ht1', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('value', String(10)))
+ Table('ht2', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('ht1_id', Integer, ForeignKey('ht1.id')),
+ Column('value', String(10)))
+ Table('ht3', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('value', String(10)))
+ Table('ht4', metadata,
+ Column('ht1_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('ht3_id', Integer, ForeignKey('ht3.id'),
+ primary_key=True))
+ Table('ht5', metadata,
+ Column('ht1_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True))
+ Table('ht6', metadata,
+ Column('ht1a_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('ht1b_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('value', String(10)))
+
+ @testing.resolve_artifact_names
+ def test_baseclass(self):
+ class OldStyle:
+ pass
+
+ assert_raises(sa.exc.ArgumentError, mapper, OldStyle, ht1)
+
+ assert_raises(sa.exc.ArgumentError, mapper, 123)
+
+ class NoWeakrefSupport(str):
+ pass
+
+ # TODO: is weakref support detectable without an instance?
+ #self.assertRaises(sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
+
+ @testing.resolve_artifact_names
+ def test_comparison_overrides(self):
+ """Simple tests to ensure users can supply comparison __methods__.
+
+ The suite-level test --options are better suited to detect
+ problems- they add selected __methods__ across the board on all
+ ORM tests. This test simply shoves a variety of operations
+ through the ORM to catch basic regressions early in a standard
+ test run.
+ """
+
+ # adding these methods directly to each class to avoid decoration
+ # by the testlib decorators.
+ class _Base(object):
+ def __init__(self, value='abc'):
+ self.value = value
+ def __nonzero__(self):
+ return False
+ def __hash__(self):
+ return hash(self.value)
+ def __eq__(self, other):
+ if isinstance(other, type(self)):
+ return self.value == other.value
+ return False
+
+ class H1(_Base):
+ pass
+ class H2(_Base):
+ pass
+ class H3(_Base):
+ pass
+ class H6(_Base):
+ pass
+
+ mapper(H1, ht1, properties={
+ 'h2s': relation(H2, backref='h1'),
+ 'h3s': relation(H3, secondary=ht4, backref='h1s'),
+ 'h1s': relation(H1, secondary=ht5, backref='parent_h1'),
+ 't6a': relation(H6, backref='h1a',
+ primaryjoin=ht1.c.id==ht6.c.ht1a_id),
+ 't6b': relation(H6, backref='h1b',
+ primaryjoin=ht1.c.id==ht6.c.ht1b_id),
+ })
+ mapper(H2, ht2)
+ mapper(H3, ht3)
+ mapper(H6, ht6)
+
+ s = create_session()
+ for i in range(3):
+ h1 = H1()
+ s.add(h1)
+
+ h1.h2s.append(H2())
+ h1.h3s.extend([H3(), H3()])
+ h1.h1s.append(H1())
+
+ s.flush()
+ eq_(ht1.count().scalar(), 4)
+
+ h6 = H6()
+ h6.h1a = h1
+ h6.h1b = h1
+
+ h6 = H6()
+ h6.h1a = h1
+ h6.h1b = x = H1()
+ assert x in s
+
+ h6.h1b.h2s.append(H2())
+
+ s.flush()
+
+ h1.h2s.extend([H2(), H2()])
+ s.flush()
+
+ h1s = s.query(H1).options(sa.orm.eagerload('h2s')).all()
+ eq_(len(h1s), 5)
+
+ self.assert_unordered_result(h1s, H1,
+ {'h2s': []},
+ {'h2s': []},
+ {'h2s': (H2, [{'value': 'abc'},
+ {'value': 'abc'},
+ {'value': 'abc'}])},
+ {'h2s': []},
+ {'h2s': (H2, [{'value': 'abc'}])})
+
+ h1s = s.query(H1).options(sa.orm.eagerload('h3s')).all()
+
+ eq_(len(h1s), 5)
+ h1s = s.query(H1).options(sa.orm.eagerload_all('t6a.h1b'),
+ sa.orm.eagerload('h2s'),
+ sa.orm.eagerload_all('h3s.h1s')).all()
+ eq_(len(h1s), 5)
+
+
+class MagicNamesTest(_base.MappedTest):
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('cartographers', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(50)),
+ Column('alias', String(50)),
+ Column('quip', String(100)))
+ Table('maps', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('cart_id', Integer,
+ ForeignKey('cartographers.id')),
+ Column('state', String(2)),
+ Column('data', sa.Text))
+
+ @classmethod
+ def setup_classes(cls):
+ class Cartographer(_base.BasicEntity):
+ pass
+
+ class Map(_base.BasicEntity):
+ pass
+
+ @testing.resolve_artifact_names
+ def test_mappish(self):
+ mapper(Cartographer, cartographers, properties=dict(
+ query=cartographers.c.quip))
+ mapper(Map, maps, properties=dict(
+ mapper=relation(Cartographer, backref='maps')))
+
+ c = Cartographer(name='Lenny', alias='The Dude',
+ query='Where be dragons?')
+ m = Map(state='AK', mapper=c)
+
+ sess = create_session()
+ sess.add(c)
+ sess.flush()
+ sess.expunge_all()
+
+ for C, M in ((Cartographer, Map),
+ (sa.orm.aliased(Cartographer), sa.orm.aliased(Map))):
+ c1 = (sess.query(C).
+ filter(C.alias=='The Dude').
+ filter(C.query=='Where be dragons?')).one()
+ m1 = sess.query(M).filter(M.mapper==c1).one()
+
+ @testing.resolve_artifact_names
+ def test_direct_stateish(self):
+ for reserved in (sa.orm.attributes.ClassManager.STATE_ATTR,
+ sa.orm.attributes.ClassManager.MANAGER_ATTR):
+ t = Table('t', sa.MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column(reserved, Integer))
+ class T(object):
+ pass
+
+ assert_raises_message(
+ KeyError,
+ ('%r: requested attribute name conflicts with '
+ 'instrumentation attribute of the same name.' % reserved),
+ mapper, T, t)
+
+ @testing.resolve_artifact_names
+ def test_indirect_stateish(self):
+ for reserved in (sa.orm.attributes.ClassManager.STATE_ATTR,
+ sa.orm.attributes.ClassManager.MANAGER_ATTR):
+ class M(object):
+ pass
+
+ assert_raises_message(
+ KeyError,
+ ('requested attribute name conflicts with '
+ 'instrumentation attribute of the same name'),
+ mapper, M, maps, properties={
+ reserved: maps.c.state})
+
+
+