summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rwxr-xr-xtest/sql/selectable.py129
1 files changed, 79 insertions, 50 deletions
diff --git a/test/sql/selectable.py b/test/sql/selectable.py
index e41165b5b..3f9464283 100755
--- a/test/sql/selectable.py
+++ b/test/sql/selectable.py
@@ -5,11 +5,12 @@ every selectable unit behaving nicely with others.."""
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
from testlib import *
-from sqlalchemy.sql import util as sql_util
+from sqlalchemy.sql import util as sql_util, visitors
from sqlalchemy import exc
+from sqlalchemy.sql import table, column
metadata = MetaData()
-table = Table('table1', metadata,
+table1 = Table('table1', metadata,
Column('col1', Integer, primary_key=True),
Column('col2', String(20)),
Column('col3', Integer),
@@ -27,16 +28,16 @@ table2 = Table('table2', metadata,
class SelectableTest(TestBase, AssertsExecutionResults):
def test_distance(self):
# same column three times
- s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')])
+ s = select([table1.c.col1.label('c2'), table1.c.col1, table1.c.col1.label('c1')])
# didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far
- #assert s.corresponding_column(table.c.col1) is s.c.col1
+ #assert s.corresponding_column(table1.c.col1) is s.c.col1
assert s.corresponding_column(s.c.col1) is s.c.col1
assert s.corresponding_column(s.c.c1) is s.c.c1
def test_join_against_self(self):
- jj = select([table.c.col1.label('bar_col1')])
- jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+ jj = select([table1.c.col1.label('bar_col1')])
+ jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
# test column directly agaisnt itself
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
@@ -45,22 +46,22 @@ class SelectableTest(TestBase, AssertsExecutionResults):
# test alias of the join, targets the column with the least
# "distance" between the requested column and the returned column
- # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than
- # there is from j2.c.bar_col1 to table.c.col1)
+ # (i.e. there is less indirection between j2.c.table1_col1 and table1.c.col1, than
+ # there is from j2.c.bar_col1 to table1.c.col1)
j2 = jjj.alias('foo')
- assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1
+ assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1
def test_select_on_table(self):
- sel = select([table, table2], use_labels=True)
- assert sel.corresponding_column(table.c.col1) is sel.c.table1_col1
- assert sel.corresponding_column(table.c.col1, require_embedded=True) is sel.c.table1_col1
- assert table.corresponding_column(sel.c.table1_col1) is table.c.col1
- assert table.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
+ sel = select([table1, table2], use_labels=True)
+ assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1
+ assert sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1
+ assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1
+ assert table1.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
def test_join_against_join(self):
- j = outerjoin(table, table2, table.c.col1==table2.c.col2)
- jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
- jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+ j = outerjoin(table1, table2, table1.c.col1==table2.c.col2)
+ jj = select([ table1.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
+ jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
j2 = jjj.alias('foo')
@@ -70,7 +71,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
def test_table_alias(self):
- a = table.alias('a')
+ a = table1.alias('a')
j = join(a, table2)
@@ -80,10 +81,10 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_union(self):
# tests that we can correspond a column in a Select statement with a certain Table, against
# a column in a Union where one of its underlying Selects matches to that same Table
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
)
- s1 = table.select(use_labels=True)
+ s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
print ["%d %s" % (id(c),c.key) for c in u.c]
c = u.corresponding_column(s1.c.table1_col2)
@@ -94,19 +95,19 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
def test_singular_union(self):
- u = union(select([table.c.col1, table.c.col2, table.c.col3]), select([table.c.col1, table.c.col2, table.c.col3]))
+ u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3]))
- u = union(select([table.c.col1, table.c.col2, table.c.col3]))
+ u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
assert u.c.col1
assert u.c.col2
assert u.c.col3
def test_alias_union(self):
# same as testunion, except its an alias of the union
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
- s1 = table.select(use_labels=True)
+ s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
@@ -115,26 +116,26 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_select_union(self):
# like testaliasunion, but off a Select off the union.
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
s = select([u])
- s1 = table.select(use_labels=True)
+ s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert s.corresponding_column(s1.c.table1_col2) is s.c.col2
assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
def test_union_against_join(self):
# same as testunion, except its an alias of the union
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
- j1 = table.join(table2)
+ j1 = table1.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
def test_join(self):
- a = join(table, table2)
+ a = join(table1, table2)
print str(a.select(use_labels=True))
b = table2.alias('b')
j = join(a, b)
@@ -143,52 +144,40 @@ class SelectableTest(TestBase, AssertsExecutionResults):
self.assert_(criterion.compare(j.onclause))
def test_select_alias(self):
- a = table.select().alias('a')
- print str(a.select())
+ a = table1.select().alias('a')
j = join(a, table2)
criterion = a.c.col1 == table2.c.col2
- print criterion
- print j.onclause
self.assert_(criterion.compare(j.onclause))
def test_select_labels(self):
- a = table.select(use_labels=True)
+ a = table1.select(use_labels=True)
print str(a.select())
j = join(a, table2)
criterion = a.c.table1_col1 == table2.c.col2
- print
- print str(j)
self.assert_(criterion.compare(j.onclause))
def test_column_labels(self):
- a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])
- print str(a)
- print [c for c in a.columns]
- print str(a.select())
+ a = select([table1.c.col1.label('acol1'), table1.c.col2.label('acol2'), table1.c.col3.label('acol3')])
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
- print str(j)
self.assert_(criterion.compare(j.onclause))
def test_labeled_select_correspoinding(self):
- l1 = select([func.max(table.c.col1)]).label('foo')
+ l1 = select([func.max(table1.c.col1)]).label('foo')
s = select([l1])
assert s.corresponding_column(l1).name == s.c.foo
- s = select([table.c.col1, l1])
+ s = select([table1.c.col1, l1])
assert s.corresponding_column(l1).name == s.c.foo
def test_select_alias_labels(self):
a = table2.select(use_labels=True).alias('a')
- print str(a.select())
- j = join(a, table)
+ j = join(a, table1)
- criterion = table.c.col1 == a.c.table2_col2
- print str(criterion)
- print str(j.onclause)
+ criterion = table1.c.col1 == a.c.table2_col2
self.assert_(criterion.compare(j.onclause))
def test_table_joined_to_select_of_table(self):
@@ -458,8 +447,6 @@ class DerivedTest(TestBase, AssertsExecutionResults):
class AnnotationsTest(TestBase):
def test_annotated_corresponding_column(self):
- from sqlalchemy.sql import table, column
-
table1 = table('table1', column("col1"))
s1 = select([table1.c.col1])
@@ -475,6 +462,48 @@ class AnnotationsTest(TestBase):
assert inner.corresponding_column(t2.c.col1, require_embedded=False) is inner.corresponding_column(t2.c.col1, require_embedded=True) is inner.c.col1
assert inner.corresponding_column(t1.c.col1, require_embedded=False) is inner.corresponding_column(t1.c.col1, require_embedded=True) is inner.c.col1
+ def test_annotated_visit(self):
+ table1 = table('table1', column("col1"), column("col2"))
+
+ bin = table1.c.col1 == bindparam('foo', value=None)
+ assert str(bin) == "table1.col1 = :foo"
+ def visit_binary(b):
+ b.right = table1.c.col2
+
+ b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary})
+ assert str(b2) == "table1.col1 = table1.col2"
+
+ b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':visit_binary})
+ assert str(b3) == "table1.col1 = table1.col2"
+
+ def visit_binary(b):
+ b.left = bindparam('bar')
+
+ b4 = visitors.cloned_traverse(b2, {}, {'binary':visit_binary})
+ assert str(b4) == ":bar = table1.col2"
+
+ b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary})
+ assert str(b5) == ":bar = table1.col2"
+
+ def test_deannotate(self):
+ table1 = table('table1', column("col1"), column("col2"))
+
+ bin = table1.c.col1 == bindparam('foo', value=None)
+
+ b2 = sql_util._deep_annotate(bin, {'_orm_adapt':True})
+ b3 = sql_util._deep_deannotate(b2)
+ b4 = sql_util._deep_deannotate(bin)
+
+ for elem in (b2._annotations, b2.left._annotations):
+ assert '_orm_adapt' in elem
+
+ for elem in (b3._annotations, b3.left._annotations, b4._annotations, b4.left._annotations):
+ assert elem == {}
+
+ assert b2.left is not bin.left
+ assert b3.left is not b2.left is not bin.left
+ assert b4.left is bin.left # since column is immutable
+ assert b4.right is not bin.right is not b2.right is not b3.right
if __name__ == "__main__":
testenv.main()