summaryrefslogtreecommitdiff
path: root/test/sql/test_selectable.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_selectable.py')
-rw-r--r--test/sql/test_selectable.py1857
1 files changed, 1024 insertions, 833 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 4b92e3e3e..023a0bc61 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -1,10 +1,12 @@
"""Test various algorithmic properties of selectables."""
-from sqlalchemy.testing import eq_, assert_raises, \
- assert_raises_message, is_
+from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, is_
from sqlalchemy import *
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
- AssertsExecutionResults
+from sqlalchemy.testing import (
+ fixtures,
+ AssertsCompiledSQL,
+ AssertsExecutionResults,
+)
from sqlalchemy.sql import elements
from sqlalchemy import testing
from sqlalchemy.sql import util as sql_util, visitors, expression
@@ -14,33 +16,37 @@ from sqlalchemy import util
from sqlalchemy.schema import Column, Table, MetaData
metadata = MetaData()
-table1 = Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', String(20)),
- Column('col3', Integer),
- Column('colx', Integer),
-
- )
-
-table2 = Table('table2', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', Integer, ForeignKey('table1.col1')),
- Column('col3', String(20)),
- Column('coly', Integer),
- )
-
-keyed = Table('keyed', metadata,
- Column('x', Integer, key='colx'),
- Column('y', Integer, key='coly'),
- Column('z', Integer),
- )
+table1 = Table(
+ "table1",
+ metadata,
+ Column("col1", Integer, primary_key=True),
+ Column("col2", String(20)),
+ Column("col3", Integer),
+ Column("colx", Integer),
+)
+
+table2 = Table(
+ "table2",
+ metadata,
+ Column("col1", Integer, primary_key=True),
+ Column("col2", Integer, ForeignKey("table1.col1")),
+ Column("col3", String(20)),
+ Column("coly", Integer),
+)
+
+keyed = Table(
+ "keyed",
+ metadata,
+ Column("x", Integer, key="colx"),
+ Column("y", Integer, key="coly"),
+ Column("z", Integer),
+)
class SelectableTest(
- fixtures.TestBase,
- AssertsExecutionResults,
- AssertsCompiledSQL):
- __dialect__ = 'default'
+ fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
+):
+ __dialect__ = "default"
def test_indirect_correspondence_on_labels(self):
# this test depends upon 'distance' to
@@ -48,8 +54,13 @@ class SelectableTest(
# same column three times
- s = select([table1.c.col1.label('c2'), table1.c.col1,
- table1.c.col1.label('c1')])
+ s = select(
+ [
+ table1.c.col1.label("c2"),
+ table1.c.col1,
+ table1.c.col1.label("c1"),
+ ]
+ )
# this tests the same thing as
# test_direct_correspondence_on_labels below -
@@ -60,25 +71,25 @@ class SelectableTest(
assert s.corresponding_column(s.c.c1) is s.c.c1
def test_labeled_subquery_twice(self):
- scalar_select = select([table1.c.col1]).label('foo')
+ scalar_select = select([table1.c.col1]).label("foo")
s1 = select([scalar_select])
s2 = select([scalar_select, scalar_select])
eq_(
s1.c.foo.proxy_set,
- set([s1.c.foo, scalar_select, scalar_select.element])
+ set([s1.c.foo, scalar_select, scalar_select.element]),
)
eq_(
s2.c.foo.proxy_set,
- set([s2.c.foo, scalar_select, scalar_select.element])
+ set([s2.c.foo, scalar_select, scalar_select.element]),
)
assert s1.corresponding_column(scalar_select) is s1.c.foo
assert s2.corresponding_column(scalar_select) is s2.c.foo
def test_label_grouped_still_corresponds(self):
- label = select([table1.c.col1]).label('foo')
+ label = select([table1.c.col1]).label("foo")
label2 = label.self_group()
s1 = select([label])
@@ -90,14 +101,14 @@ class SelectableTest(
# this test depends on labels being part
# of the proxy set to get the right result
- l1, l2 = table1.c.col1.label('foo'), table1.c.col1.label('bar')
+ l1, l2 = table1.c.col1.label("foo"), table1.c.col1.label("bar")
sel = select([l1, l2])
sel2 = sel.alias()
assert sel2.corresponding_column(l1) is sel2.c.foo
assert sel2.corresponding_column(l2) is sel2.c.bar
- sel2 = select([table1.c.col1.label('foo'), table1.c.col2.label('bar')])
+ sel2 = select([table1.c.col1.label("foo"), table1.c.col2.label("bar")])
sel3 = sel.union(sel2).alias()
assert sel3.corresponding_column(l1) is sel3.c.foo
@@ -105,9 +116,9 @@ class SelectableTest(
def test_keyed_gen(self):
s = select([keyed])
- eq_(s.c.colx.key, 'colx')
+ eq_(s.c.colx.key, "colx")
- eq_(s.c.colx.name, 'x')
+ eq_(s.c.colx.name, "x")
assert s.corresponding_column(keyed.c.colx) is s.c.colx
assert s.corresponding_column(keyed.c.coly) is s.c.coly
@@ -131,26 +142,26 @@ class SelectableTest(
assert sel2.corresponding_column(keyed.c.z) is sel2.c.keyed_z
def test_keyed_c_collection_upper(self):
- c = Column('foo', Integer, key='bar')
- t = Table('t', MetaData(), c)
+ c = Column("foo", Integer, key="bar")
+ t = Table("t", MetaData(), c)
is_(t.c.bar, c)
def test_keyed_c_collection_lower(self):
- c = column('foo')
- c.key = 'bar'
- t = table('t', c)
+ c = column("foo")
+ c.key = "bar"
+ t = table("t", c)
is_(t.c.bar, c)
def test_clone_c_proxy_key_upper(self):
- c = Column('foo', Integer, key='bar')
- t = Table('t', MetaData(), c)
+ c = Column("foo", Integer, key="bar")
+ t = Table("t", MetaData(), c)
s = select([t])._clone()
assert c in s.c.bar.proxy_set
def test_clone_c_proxy_key_lower(self):
- c = column('foo')
- c.key = 'bar'
- t = table('t', c)
+ c = column("foo")
+ c.key = "bar"
+ t = table("t", c)
s = select([t])._clone()
assert c in s.c.bar.proxy_set
@@ -160,19 +171,16 @@ class SelectableTest(
def myop(x, y):
pass
- t = table('t', column('x'), column('y'))
+ t = table("t", column("x"), column("y"))
expr = BinaryExpression(t.c.x, t.c.y, myop)
s = select([t, expr])
- eq_(
- s.c.keys(),
- ['x', 'y', expr.anon_label]
- )
+ eq_(s.c.keys(), ["x", "y", expr.anon_label])
def test_cloned_intersection(self):
- t1 = table('t1', column('x'))
- t2 = table('t2', column('x'))
+ t1 = table("t1", column("x"))
+ t2 = table("t2", column("x"))
s1 = t1.select()
s2 = t2.select()
@@ -184,15 +192,13 @@ class SelectableTest(
s3c1 = s3._clone()
eq_(
- expression._cloned_intersection(
- [s1c1, s3c1], [s2c1, s1c2]
- ),
- set([s1c1])
+ expression._cloned_intersection([s1c1, s3c1], [s2c1, s1c2]),
+ set([s1c1]),
)
def test_cloned_difference(self):
- t1 = table('t1', column('x'))
- t2 = table('t2', column('x'))
+ t1 = table("t1", column("x"))
+ t2 = table("t2", column("x"))
s1 = t1.select()
s2 = t2.select()
@@ -205,75 +211,70 @@ class SelectableTest(
s3c1 = s3._clone()
eq_(
- expression._cloned_difference(
- [s1c1, s2c1, s3c1], [s2c1, s1c2]
- ),
- set([s3c1])
+ expression._cloned_difference([s1c1, s2c1, s3c1], [s2c1, s1c2]),
+ set([s3c1]),
)
def test_distance_on_aliases(self):
- a1 = table1.alias('a1')
- for s in (select([a1, table1], use_labels=True),
- select([table1, a1], use_labels=True)):
- assert s.corresponding_column(table1.c.col1) \
- is s.c.table1_col1
+ a1 = table1.alias("a1")
+ for s in (
+ select([a1, table1], use_labels=True),
+ select([table1, a1], use_labels=True),
+ ):
+ assert s.corresponding_column(table1.c.col1) is s.c.table1_col1
assert s.corresponding_column(a1.c.col1) is s.c.a1_col1
def test_join_against_self(self):
- jj = select([table1.c.col1.label('bar_col1')])
+ jj = select([table1.c.col1.label("bar_col1")])
jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1)
# test column directly against itself
- assert jjj.corresponding_column(jjj.c.table1_col1) \
- is jjj.c.table1_col1
+ assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
# test alias of the join
- j2 = jjj.alias('foo')
- assert j2.corresponding_column(table1.c.col1) \
- is j2.c.table1_col1
+ j2 = jjj.alias("foo")
+ assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1
def test_clone_append_column(self):
- sel = select([literal_column('1').label('a')])
- eq_(list(sel.c.keys()), ['a'])
+ sel = select([literal_column("1").label("a")])
+ eq_(list(sel.c.keys()), ["a"])
cloned = visitors.ReplacingCloningVisitor().traverse(sel)
- cloned.append_column(literal_column('2').label('b'))
+ cloned.append_column(literal_column("2").label("b"))
cloned.append_column(func.foo())
- eq_(list(cloned.c.keys()), ['a', 'b', 'foo()'])
+ eq_(list(cloned.c.keys()), ["a", "b", "foo()"])
def test_append_column_after_replace_selectable(self):
- basesel = select([literal_column('1').label('a')])
- tojoin = select([
- literal_column('1').label('a'),
- literal_column('2').label('b')
- ])
- basefrom = basesel.alias('basefrom')
- joinfrom = tojoin.alias('joinfrom')
+ basesel = select([literal_column("1").label("a")])
+ tojoin = select(
+ [literal_column("1").label("a"), literal_column("2").label("b")]
+ )
+ basefrom = basesel.alias("basefrom")
+ joinfrom = tojoin.alias("joinfrom")
sel = select([basefrom.c.a])
replaced = sel.replace_selectable(
- basefrom,
- basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a)
+ basefrom, basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a)
)
self.assert_compile(
replaced,
"SELECT basefrom.a FROM (SELECT 1 AS a) AS basefrom "
"JOIN (SELECT 1 AS a, 2 AS b) AS joinfrom "
- "ON basefrom.a = joinfrom.a"
+ "ON basefrom.a = joinfrom.a",
)
replaced.append_column(joinfrom.c.b)
self.assert_compile(
replaced,
"SELECT basefrom.a, joinfrom.b FROM (SELECT 1 AS a) AS basefrom "
"JOIN (SELECT 1 AS a, 2 AS b) AS joinfrom "
- "ON basefrom.a = joinfrom.a"
+ "ON basefrom.a = joinfrom.a",
)
def test_against_cloned_non_table(self):
# test that corresponding column digs across
# clone boundaries with anonymous labeled elements
- col = func.count().label('foo')
+ col = func.count().label("foo")
sel = select([col])
sel2 = visitors.ReplacingCloningVisitor().traverse(sel)
@@ -287,14 +288,14 @@ class SelectableTest(
self.assert_compile(
s1.with_only_columns([s1]),
"SELECT (SELECT table1.col1, table1.col2, "
- "table1.col3, table1.colx FROM table1) AS anon_1"
+ "table1.col3, table1.colx FROM table1) AS anon_1",
)
def test_type_coerce_preserve_subq(self):
class MyType(TypeDecorator):
impl = Integer
- stmt = select([type_coerce(column('x'), MyType).label('foo')])
+ stmt = select([type_coerce(column("x"), MyType).label("foo")])
stmt2 = stmt.select()
assert isinstance(stmt._raw_columns[0].type, MyType)
assert isinstance(stmt.c.foo.type, MyType)
@@ -303,30 +304,32 @@ class SelectableTest(
def test_select_on_table(self):
sel = select([table1, table2], use_labels=True)
- assert sel.corresponding_column(table1.c.col1) \
+ assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1
+ assert (
+ sel.corresponding_column(table1.c.col1, require_embedded=True)
is sel.c.table1_col1
- assert sel.corresponding_column(
- table1.c.col1,
- require_embedded=True) is sel.c.table1_col1
- assert table1.corresponding_column(sel.c.table1_col1) \
- is table1.c.col1
- assert table1.corresponding_column(sel.c.table1_col1,
- require_embedded=True) is None
+ )
+ assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1
+ assert (
+ table1.corresponding_column(
+ sel.c.table1_col1, require_embedded=True
+ )
+ is None
+ )
def test_join_against_join(self):
j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2)
- jj = select([table1.c.col1.label('bar_col1')],
- from_obj=[j]).alias('foo')
+ jj = select([table1.c.col1.label("bar_col1")], from_obj=[j]).alias(
+ "foo"
+ )
jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1)
- assert jjj.corresponding_column(jjj.c.table1_col1) \
- is jjj.c.table1_col1
- j2 = jjj.alias('foo')
- assert j2.corresponding_column(jjj.c.table1_col1) \
- is j2.c.table1_col1
+ assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+ j2 = jjj.alias("foo")
+ assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
def test_table_alias(self):
- a = table1.alias('a')
+ a = table1.alias("a")
j = join(a, table2)
@@ -338,13 +341,13 @@ class SelectableTest(
# prominent w/ PostgreSQL's tuple functions
stmt = select([table1.c.col1, table1.c.col2])
- a = stmt.alias('a')
+ a = stmt.alias("a")
self.assert_compile(
select([func.foo(a)]),
"SELECT foo(SELECT table1.col1, table1.col2 FROM table1) "
"AS foo_1 FROM "
"(SELECT table1.col1 AS col1, table1.col2 AS col2 FROM table1) "
- "AS a"
+ "AS a",
)
def test_union(self):
@@ -353,15 +356,25 @@ class SelectableTest(
# with a certain Table, against a column in a Union where one of
# its underlying Selects matches to that same Table
- u = select([table1.c.col1,
- table1.c.col2,
- table1.c.col3,
- table1.c.colx,
- null().label('coly')]).union(select([table2.c.col1,
- table2.c.col2,
- table2.c.col3,
- null().label('colx'),
- table2.c.coly]))
+ u = select(
+ [
+ table1.c.col1,
+ table1.c.col2,
+ table1.c.col3,
+ table1.c.colx,
+ null().label("coly"),
+ ]
+ ).union(
+ select(
+ [
+ table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label("colx"),
+ table2.c.coly,
+ ]
+ )
+ )
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
@@ -388,8 +401,10 @@ class SelectableTest(
assert u1.corresponding_column(table1.c.col3) is u1.c.col1
def test_singular_union(self):
- u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]),
- select([table1.c.col1, table1.c.col2, table1.c.col3]))
+ u = union(
+ select([table1.c.col1, table1.c.col2, table1.c.col3]),
+ select([table1.c.col1, table1.c.col2, table1.c.col3]),
+ )
u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
assert u.c.col1 is not None
assert u.c.col2 is not None
@@ -399,14 +414,29 @@ class SelectableTest(
# same as testunion, except its an alias of the union
- u = select([table1.c.col1,
+ u = (
+ select(
+ [
+ table1.c.col1,
table1.c.col2,
table1.c.col3,
table1.c.colx,
- null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3,
- null().label('colx'), table2.c.coly])
- ).alias('analias')
+ null().label("coly"),
+ ]
+ )
+ .union(
+ select(
+ [
+ table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label("colx"),
+ table2.c.coly,
+ ]
+ )
+ )
+ .alias("analias")
+ )
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
@@ -429,7 +459,8 @@ class SelectableTest(
def test_union_of_text(self):
s1 = select([table1.c.col1, table1.c.col2])
s2 = text("select col1, col2 from foo").columns(
- column('col1'), column('col2'))
+ column("col1"), column("col2")
+ )
u1 = union(s1, s2)
assert u1.corresponding_column(s1.c.col1) is u1.c.col1
@@ -445,8 +476,10 @@ class SelectableTest(
s2 = select([table2.c.col1, table2.c.col2, table2.c.col3])
u1 = union(s1, s2)
- assert u1.corresponding_column(
- s1.c._all_columns[0]) is u1.c._all_columns[0]
+ assert (
+ u1.corresponding_column(s1.c._all_columns[0])
+ is u1.c._all_columns[0]
+ )
assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
assert u1.corresponding_column(s1.c.col2) is u1.c.col2
assert u1.corresponding_column(s2.c.col2) is u1.c.col2
@@ -462,8 +495,10 @@ class SelectableTest(
s2 = select([table2.c.col1, table2.c.col2, table2.c.col3])
u1 = union(s1, s2)
- assert u1.corresponding_column(
- s1.c._all_columns[0]) is u1.c._all_columns[0]
+ assert (
+ u1.corresponding_column(s1.c._all_columns[0])
+ is u1.c._all_columns[0]
+ )
assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
assert u1.corresponding_column(s1.c.col2) is u1.c.col2
assert u1.corresponding_column(s2.c.col2) is u1.c.col2
@@ -477,13 +512,18 @@ class SelectableTest(
@testing.emits_warning("Column 'col1'")
def test_union_alias_dupe_keys_grouped(self):
- s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]).\
- limit(1).alias()
+ s1 = (
+ select([table1.c.col1, table1.c.col2, table2.c.col1])
+ .limit(1)
+ .alias()
+ )
s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]).limit(1)
u1 = union(s1, s2)
- assert u1.corresponding_column(
- s1.c._all_columns[0]) is u1.c._all_columns[0]
+ assert (
+ u1.corresponding_column(s1.c._all_columns[0])
+ is u1.c._all_columns[0]
+ )
assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
assert u1.corresponding_column(s1.c.col2) is u1.c.col2
assert u1.corresponding_column(s2.c.col2) is u1.c.col2
@@ -499,14 +539,29 @@ class SelectableTest(
# like testaliasunion, but off a Select off the union.
- u = select([table1.c.col1,
+ u = (
+ select(
+ [
+ table1.c.col1,
table1.c.col2,
table1.c.col3,
table1.c.colx,
- null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3,
- null().label('colx'), table2.c.coly])
- ).alias('analias')
+ null().label("coly"),
+ ]
+ )
+ .union(
+ select(
+ [
+ table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label("colx"),
+ table2.c.coly,
+ ]
+ )
+ )
+ .alias("analias")
+ )
s = select([u])
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
@@ -517,14 +572,29 @@ class SelectableTest(
# same as testunion, except its an alias of the union
- u = select([table1.c.col1,
+ u = (
+ select(
+ [
+ table1.c.col1,
table1.c.col2,
table1.c.col3,
table1.c.colx,
- null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3,
- null().label('colx'), table2.c.coly])
- ).alias('analias')
+ null().label("coly"),
+ ]
+ )
+ .union(
+ select(
+ [
+ table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label("colx"),
+ table2.c.coly,
+ ]
+ )
+ )
+ .alias("analias")
+ )
j1 = table1.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
@@ -532,14 +602,14 @@ class SelectableTest(
def test_join(self):
a = join(table1, table2)
print(str(a.select(use_labels=True)))
- b = table2.alias('b')
+ b = table2.alias("b")
j = join(a, b)
print(str(j))
criterion = a.c.table1_col1 == b.c.col2
self.assert_(criterion.compare(j.onclause))
def test_select_alias(self):
- a = table1.select().alias('a')
+ a = table1.select().alias("a")
j = join(a, table2)
criterion = a.c.col1 == table2.c.col2
@@ -562,15 +632,19 @@ class SelectableTest(
is_(expr2.left, sel2)
def test_column_labels(self):
- a = select([table1.c.col1.label('acol1'),
- table1.c.col2.label('acol2'),
- table1.c.col3.label('acol3')])
+ a = select(
+ [
+ table1.c.col1.label("acol1"),
+ table1.c.col2.label("acol2"),
+ table1.c.col3.label("acol3"),
+ ]
+ )
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
def test_labeled_select_correspoinding(self):
- l1 = select([func.max(table1.c.col1)]).label('foo')
+ l1 = select([func.max(table1.c.col1)]).label("foo")
s = select([l1])
eq_(s.corresponding_column(l1), s.c.foo)
@@ -579,7 +653,7 @@ class SelectableTest(
eq_(s.corresponding_column(l1), s.c.foo)
def test_select_alias_labels(self):
- a = table2.select(use_labels=True).alias('a')
+ a = table2.select(use_labels=True).alias("a")
j = join(a, table1)
criterion = table1.c.col1 == a.c.table2_col2
@@ -587,14 +661,13 @@ class SelectableTest(
def test_table_joined_to_select_of_table(self):
metadata = MetaData()
- a = Table('a', metadata,
- Column('id', Integer, primary_key=True))
+ a = Table("a", metadata, Column("id", Integer, primary_key=True))
- j2 = select([a.c.id.label('aid')]).alias('bar')
+ j2 = select([a.c.id.label("aid")]).alias("bar")
j3 = a.join(j2, j2.c.aid == a.c.id)
- j4 = select([j3]).alias('foo')
+ j4 = select([j3]).alias("foo")
assert j4.corresponding_column(j2.c.aid) is j4.c.aid
assert j4.corresponding_column(a.c.id) is j4.c.id
@@ -602,9 +675,9 @@ class SelectableTest(
m = MetaData()
m2 = MetaData()
- t1 = Table('t1', m, Column('id', Integer), Column('id2', Integer))
- t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id')))
- t3 = Table('t3', m2, Column('id', Integer, ForeignKey('t1.id2')))
+ t1 = Table("t1", m, Column("id", Integer), Column("id2", Integer))
+ t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id")))
+ t3 = Table("t3", m2, Column("id", Integer, ForeignKey("t1.id2")))
s = select([t2, t3], use_labels=True)
@@ -612,24 +685,24 @@ class SelectableTest(
def test_multi_label_chain_naming_col(self):
# See [ticket:2167] for this one.
- l1 = table1.c.col1.label('a')
- l2 = select([l1]).label('b')
+ l1 = table1.c.col1.label("a")
+ l2 = select([l1]).label("b")
s = select([l2])
assert s.c.b is not None
self.assert_compile(
s.select(),
- "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)"
+ "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)",
)
- s2 = select([s.label('c')])
+ s2 = select([s.label("c")])
self.assert_compile(
s2.select(),
"SELECT c FROM (SELECT (SELECT ("
- "SELECT table1.col1 AS a FROM table1) AS b) AS c)"
+ "SELECT table1.col1 AS a FROM table1) AS b) AS c)",
)
def test_self_referential_select_raises(self):
- t = table('t', column('x'))
+ t = table("t", column("x"))
s = select([t])
@@ -637,140 +710,110 @@ class SelectableTest(
assert_raises_message(
exc.InvalidRequestError,
r"select\(\) construct refers to itself as a FROM",
- s.compile
+ s.compile,
)
def test_unusual_column_elements_text(self):
"""test that .c excludes text()."""
s = select([table1.c.col1, text("foo")])
- eq_(
- list(s.c),
- [s.c.col1]
- )
+ eq_(list(s.c), [s.c.col1])
def test_unusual_column_elements_clauselist(self):
"""Test that raw ClauseList is expanded into .c."""
from sqlalchemy.sql.expression import ClauseList
+
s = select([table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)])
- eq_(
- list(s.c),
- [s.c.col1, s.c.col2, s.c.col3]
- )
+ eq_(list(s.c), [s.c.col1, s.c.col2, s.c.col3])
def test_unusual_column_elements_boolean_clauselist(self):
"""test that BooleanClauseList is placed as single element in .c."""
c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4)
s = select([table1.c.col1, c2])
- eq_(
- list(s.c),
- [s.c.col1, s.corresponding_column(c2)]
- )
+ eq_(list(s.c), [s.c.col1, s.corresponding_column(c2)])
def test_from_list_deferred_constructor(self):
- c1 = Column('c1', Integer)
- c2 = Column('c2', Integer)
+ c1 = Column("c1", Integer)
+ c2 = Column("c2", Integer)
s = select([c1])
- t = Table('t', MetaData(), c1, c2)
+ t = Table("t", MetaData(), c1, c2)
eq_(c1._from_objects, [t])
eq_(c2._from_objects, [t])
- self.assert_compile(select([c1]),
- "SELECT t.c1 FROM t")
- self.assert_compile(select([c2]),
- "SELECT t.c2 FROM t")
+ self.assert_compile(select([c1]), "SELECT t.c1 FROM t")
+ self.assert_compile(select([c2]), "SELECT t.c2 FROM t")
def test_from_list_deferred_whereclause(self):
- c1 = Column('c1', Integer)
- c2 = Column('c2', Integer)
+ c1 = Column("c1", Integer)
+ c2 = Column("c2", Integer)
s = select([c1]).where(c1 == 5)
- t = Table('t', MetaData(), c1, c2)
+ t = Table("t", MetaData(), c1, c2)
eq_(c1._from_objects, [t])
eq_(c2._from_objects, [t])
- self.assert_compile(select([c1]),
- "SELECT t.c1 FROM t")
- self.assert_compile(select([c2]),
- "SELECT t.c2 FROM t")
+ self.assert_compile(select([c1]), "SELECT t.c1 FROM t")
+ self.assert_compile(select([c2]), "SELECT t.c2 FROM t")
def test_from_list_deferred_fromlist(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer))
+ t1 = Table("t1", m, Column("x", Integer))
- c1 = Column('c1', Integer)
+ c1 = Column("c1", Integer)
s = select([c1]).where(c1 == 5).select_from(t1)
- t2 = Table('t2', MetaData(), c1)
+ t2 = Table("t2", MetaData(), c1)
eq_(c1._from_objects, [t2])
- self.assert_compile(select([c1]),
- "SELECT t2.c1 FROM t2")
+ self.assert_compile(select([c1]), "SELECT t2.c1 FROM t2")
def test_from_list_deferred_cloning(self):
- c1 = Column('c1', Integer)
- c2 = Column('c2', Integer)
+ c1 = Column("c1", Integer)
+ c2 = Column("c2", Integer)
s = select([c1])
s2 = select([c2])
s3 = sql_util.ClauseAdapter(s).traverse(s2)
- Table('t', MetaData(), c1, c2)
+ Table("t", MetaData(), c1, c2)
- self.assert_compile(
- s3,
- "SELECT t.c2 FROM t"
- )
+ self.assert_compile(s3, "SELECT t.c2 FROM t")
def test_from_list_with_columns(self):
- table1 = table('t1', column('a'))
- table2 = table('t2', column('b'))
+ table1 = table("t1", column("a"))
+ table2 = table("t2", column("b"))
s1 = select([table1.c.a, table2.c.b])
- self.assert_compile(s1,
- "SELECT t1.a, t2.b FROM t1, t2"
- )
+ self.assert_compile(s1, "SELECT t1.a, t2.b FROM t1, t2")
s2 = s1.with_only_columns([table2.c.b])
- self.assert_compile(s2,
- "SELECT t2.b FROM t2"
- )
+ self.assert_compile(s2, "SELECT t2.b FROM t2")
s3 = sql_util.ClauseAdapter(table1).traverse(s1)
- self.assert_compile(s3,
- "SELECT t1.a, t2.b FROM t1, t2"
- )
+ self.assert_compile(s3, "SELECT t1.a, t2.b FROM t1, t2")
s4 = s3.with_only_columns([table2.c.b])
- self.assert_compile(s4,
- "SELECT t2.b FROM t2"
- )
+ self.assert_compile(s4, "SELECT t2.b FROM t2")
def test_from_list_warning_against_existing(self):
- c1 = Column('c1', Integer)
+ c1 = Column("c1", Integer)
s = select([c1])
# force a compile.
- self.assert_compile(
- s,
- "SELECT c1"
- )
+ self.assert_compile(s, "SELECT c1")
- Table('t', MetaData(), c1)
+ Table("t", MetaData(), c1)
- self.assert_compile(
- s,
- "SELECT t.c1 FROM t"
- )
+ self.assert_compile(s, "SELECT t.c1 FROM t")
def test_from_list_recovers_after_warning(self):
- c1 = Column('c1', Integer)
- c2 = Column('c2', Integer)
+ c1 = Column("c1", Integer)
+ c2 = Column("c2", Integer)
s = select([c1])
@@ -779,7 +822,8 @@ class SelectableTest(
@testing.emits_warning()
def go():
- return Table('t', MetaData(), c1, c2)
+ return Table("t", MetaData(), c1, c2)
+
t = go()
eq_(c1._from_objects, [t])
@@ -793,175 +837,175 @@ class SelectableTest(
self.assert_compile(select([c2]), "SELECT t.c2 FROM t")
def test_label_gen_resets_on_table(self):
- c1 = Column('c1', Integer)
+ c1 = Column("c1", Integer)
eq_(c1._label, "c1")
- Table('t1', MetaData(), c1)
+ Table("t1", MetaData(), c1)
eq_(c1._label, "t1_c1")
class RefreshForNewColTest(fixtures.TestBase):
-
def test_join_uninit(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
j = a.join(b, a.c.x == b.c.y)
- q = column('q')
+ q = column("q")
b.append_column(q)
j._refresh_for_new_column(q)
assert j.c.b_q is q
def test_join_init(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
j = a.join(b, a.c.x == b.c.y)
j.c
- q = column('q')
+ q = column("q")
b.append_column(q)
j._refresh_for_new_column(q)
assert j.c.b_q is q
def test_join_samename_init(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
j = a.join(b, a.c.x == b.c.y)
j.c
- q = column('x')
+ q = column("x")
b.append_column(q)
j._refresh_for_new_column(q)
assert j.c.b_x is q
def test_select_samename_init(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
s = select([a, b]).apply_labels()
s.c
- q = column('x')
+ q = column("x")
b.append_column(q)
s._refresh_for_new_column(q)
assert q in s.c.b_x.proxy_set
def test_aliased_select_samename_uninit(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
s = select([a, b]).apply_labels().alias()
- q = column('x')
+ q = column("x")
b.append_column(q)
s._refresh_for_new_column(q)
assert q in s.c.b_x.proxy_set
def test_aliased_select_samename_init(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
s = select([a, b]).apply_labels().alias()
s.c
- q = column('x')
+ q = column("x")
b.append_column(q)
s._refresh_for_new_column(q)
assert q in s.c.b_x.proxy_set
def test_aliased_select_irrelevant(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
- c = table('c', column('z'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
+ c = table("c", column("z"))
s = select([a, b]).apply_labels().alias()
s.c
- q = column('x')
+ q = column("x")
c.append_column(q)
s._refresh_for_new_column(q)
- assert 'c_x' not in s.c
+ assert "c_x" not in s.c
def test_aliased_select_no_cols_clause(self):
- a = table('a', column('x'))
+ a = table("a", column("x"))
s = select([a.c.x]).apply_labels().alias()
s.c
- q = column('q')
+ q = column("q")
a.append_column(q)
s._refresh_for_new_column(q)
- assert 'a_q' not in s.c
+ assert "a_q" not in s.c
def test_union_uninit(self):
- a = table('a', column('x'))
+ a = table("a", column("x"))
s1 = select([a])
s2 = select([a])
s3 = s1.union(s2)
- q = column('q')
+ q = column("q")
a.append_column(q)
s3._refresh_for_new_column(q)
assert a.c.q in s3.c.q.proxy_set
def test_union_init_raises(self):
- a = table('a', column('x'))
+ a = table("a", column("x"))
s1 = select([a])
s2 = select([a])
s3 = s1.union(s2)
s3.c
- q = column('q')
+ q = column("q")
a.append_column(q)
assert_raises_message(
NotImplementedError,
"CompoundSelect constructs don't support addition of "
"columns to underlying selectables",
- s3._refresh_for_new_column, q
+ s3._refresh_for_new_column,
+ q,
)
def test_nested_join_uninit(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
- c = table('c', column('z'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
+ c = table("c", column("z"))
j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
- q = column('q')
+ q = column("q")
b.append_column(q)
j._refresh_for_new_column(q)
assert j.c.b_q is q
def test_nested_join_init(self):
- a = table('a', column('x'))
- b = table('b', column('y'))
- c = table('c', column('z'))
+ a = table("a", column("x"))
+ b = table("b", column("y"))
+ c = table("c", column("z"))
j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
j.c
- q = column('q')
+ q = column("q")
b.append_column(q)
j._refresh_for_new_column(q)
assert j.c.b_q is q
def test_fk_table(self):
m = MetaData()
- fk = ForeignKey('x.id')
- Table('x', m, Column('id', Integer))
- a = Table('a', m, Column('x', Integer, fk))
+ fk = ForeignKey("x.id")
+ Table("x", m, Column("id", Integer))
+ a = Table("a", m, Column("x", Integer, fk))
a.c
- q = Column('q', Integer)
+ q = Column("q", Integer)
a.append_column(q)
a._refresh_for_new_column(q)
eq_(a.foreign_keys, set([fk]))
- fk2 = ForeignKey('g.id')
- p = Column('p', Integer, fk2)
+ fk2 = ForeignKey("g.id")
+ p = Column("p", Integer, fk2)
a.append_column(p)
a._refresh_for_new_column(p)
eq_(a.foreign_keys, set([fk, fk2]))
def test_fk_join(self):
m = MetaData()
- fk = ForeignKey('x.id')
- Table('x', m, Column('id', Integer))
- a = Table('a', m, Column('x', Integer, fk))
- b = Table('b', m, Column('y', Integer))
+ fk = ForeignKey("x.id")
+ Table("x", m, Column("id", Integer))
+ a = Table("a", m, Column("x", Integer, fk))
+ b = Table("b", m, Column("y", Integer))
j = a.join(b, a.c.x == b.c.y)
j.c
- q = Column('q', Integer)
+ q = Column("q", Integer)
b.append_column(q)
j._refresh_for_new_column(q)
eq_(j.foreign_keys, set([fk]))
- fk2 = ForeignKey('g.id')
- p = Column('p', Integer, fk2)
+ fk2 = ForeignKey("g.id")
+ p = Column("p", Integer, fk2)
b.append_column(p)
j._refresh_for_new_column(p)
eq_(j.foreign_keys, set([fk, fk2]))
@@ -972,18 +1016,18 @@ class AnonLabelTest(fixtures.TestBase):
"""Test behaviors fixed by [ticket:2168]."""
def test_anon_labels_named_column(self):
- c1 = column('x')
+ c1 = column("x")
assert c1.label(None) is not c1
eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
def test_anon_labels_literal_column(self):
- c1 = literal_column('x')
+ c1 = literal_column("x")
assert c1.label(None) is not c1
eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
def test_anon_labels_func(self):
- c1 = func.count('*')
+ c1 = func.count("*")
assert c1.label(None) is not c1
eq_(str(select([c1])), "SELECT count(:count_2) AS count_1")
@@ -992,76 +1036,76 @@ class AnonLabelTest(fixtures.TestBase):
eq_(str(select([c1.label(None)])), "SELECT count(:count_2) AS count_1")
def test_named_labels_named_column(self):
- c1 = column('x')
- eq_(str(select([c1.label('y')])), "SELECT x AS y")
+ c1 = column("x")
+ eq_(str(select([c1.label("y")])), "SELECT x AS y")
def test_named_labels_literal_column(self):
- c1 = literal_column('x')
- eq_(str(select([c1.label('y')])), "SELECT x AS y")
+ c1 = literal_column("x")
+ eq_(str(select([c1.label("y")])), "SELECT x AS y")
class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = "default"
def test_flat_ok_on_non_join(self):
- a = table('a', column('a'))
+ a = table("a", column("a"))
s = a.select()
self.assert_compile(
s.alias(flat=True).select(),
- "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1"
+ "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1",
)
def test_join_alias(self):
- a = table('a', column('a'))
- b = table('b', column('b'))
+ a = table("a", column("a"))
+ b = table("b", column("b"))
self.assert_compile(
a.join(b, a.c.a == b.c.b).alias(),
- "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b"
+ "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b",
)
def test_join_standalone_alias(self):
- a = table('a', column('a'))
- b = table('b', column('b'))
+ a = table("a", column("a"))
+ b = table("b", column("b"))
self.assert_compile(
alias(a.join(b, a.c.a == b.c.b)),
- "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b"
+ "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b",
)
def test_join_alias_flat(self):
- a = table('a', column('a'))
- b = table('b', column('b'))
+ a = table("a", column("a"))
+ b = table("b", column("b"))
self.assert_compile(
a.join(b, a.c.a == b.c.b).alias(flat=True),
- "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b"
+ "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b",
)
def test_join_standalone_alias_flat(self):
- a = table('a', column('a'))
- b = table('b', column('b'))
+ a = table("a", column("a"))
+ b = table("b", column("b"))
self.assert_compile(
alias(a.join(b, a.c.a == b.c.b), flat=True),
- "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b"
+ "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b",
)
def test_composed_join_alias_flat(self):
- a = table('a', column('a'))
- b = table('b', column('b'))
- c = table('c', column('c'))
- d = table('d', column('d'))
+ a = table("a", column("a"))
+ b = table("b", column("b"))
+ c = table("c", column("c"))
+ d = table("d", column("d"))
j1 = a.join(b, a.c.a == b.c.b)
j2 = c.join(d, c.c.c == d.c.d)
self.assert_compile(
j1.join(j2, b.c.b == c.c.c).alias(flat=True),
"a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b JOIN "
- "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c"
+ "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c",
)
def test_composed_join_alias(self):
- a = table('a', column('a'))
- b = table('b', column('b'))
- c = table('c', column('c'))
- d = table('d', column('d'))
+ a = table("a", column("a"))
+ b = table("b", column("b"))
+ c = table("c", column("c"))
+ d = table("d", column("d"))
j1 = a.join(b, a.c.a == b.c.b)
j2 = c.join(d, c.c.c == d.c.d)
@@ -1070,29 +1114,35 @@ class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT anon_1.a_a, anon_1.b_b, anon_1.c_c, anon_1.d_d "
"FROM (SELECT a.a AS a_a, b.b AS b_b, c.c AS c_c, d.d AS d_d "
"FROM a JOIN b ON a.a = b.b "
- "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1"
+ "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1",
)
class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = "default"
def test_join_condition(self):
m = MetaData()
- t1 = Table('t1', m, Column('id', Integer))
- t2 = Table('t2', m,
- Column('id', Integer),
- Column('t1id', ForeignKey('t1.id')))
- t3 = Table('t3', m,
- Column('id', Integer),
- Column('t1id', ForeignKey('t1.id')),
- Column('t2id', ForeignKey('t2.id')))
- t4 = Table('t4', m, Column('id', Integer),
- Column('t2id', ForeignKey('t2.id')))
- t5 = Table('t5', m,
- Column('t1id1', ForeignKey('t1.id')),
- Column('t1id2', ForeignKey('t1.id')),
- )
+ t1 = Table("t1", m, Column("id", Integer))
+ t2 = Table(
+ "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id"))
+ )
+ t3 = Table(
+ "t3",
+ m,
+ Column("id", Integer),
+ Column("t1id", ForeignKey("t1.id")),
+ Column("t2id", ForeignKey("t2.id")),
+ )
+ t4 = Table(
+ "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id"))
+ )
+ t5 = Table(
+ "t5",
+ m,
+ Column("t1id1", ForeignKey("t1.id")),
+ Column("t1id2", ForeignKey("t1.id")),
+ )
t1t2 = t1.join(t2)
t2t3 = t2.join(t3)
@@ -1108,10 +1158,8 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
(t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id),
]:
assert expected.compare(
- sql_util.join_condition(
- left,
- right,
- a_subset=a_subset))
+ sql_util.join_condition(left, right, a_subset=a_subset)
+ )
# these are ambiguous, or have no joins
for left, right, a_subset in [
@@ -1120,12 +1168,14 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
(t1, t4, None),
(t1t2, t2t3, None),
(t5, t1, None),
- (t5.select(use_labels=True), t1, None)
+ (t5.select(use_labels=True), t1, None),
]:
assert_raises(
exc.ArgumentError,
sql_util.join_condition,
- left, right, a_subset=a_subset
+ left,
+ right,
+ a_subset=a_subset,
)
als = t2t3.alias()
@@ -1138,41 +1188,38 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
(t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
(t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
(t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
- (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id)
+ (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id),
]:
- assert expected.compare(
- left.join(right).onclause
- )
+ assert expected.compare(left.join(right).onclause)
# these are right-nested joins
j = t1t2.join(t2t3)
assert j.onclause.compare(t2.c.id == t3.c.t2id)
self.assert_compile(
- j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
- "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id")
+ j,
+ "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
+ "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id",
+ )
st2t3 = t2t3.select(use_labels=True)
j = t1t2.join(st2t3)
assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id)
self.assert_compile(
- j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
+ j,
+ "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
"(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, "
"t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id "
- "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id")
+ "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id",
+ )
def test_join_multiple_equiv_fks(self):
m = MetaData()
- t1 = Table('t1', m,
- Column('id', Integer, primary_key=True)
- )
+ t1 = Table("t1", m, Column("id", Integer, primary_key=True))
t2 = Table(
- 't2',
+ "t2",
m,
- Column(
- 't1id',
- Integer,
- ForeignKey('t1.id'),
- ForeignKey('t1.id')))
+ Column("t1id", Integer, ForeignKey("t1.id"), ForeignKey("t1.id")),
+ )
assert sql_util.join_condition(t1, t2).compare(t1.c.id == t2.c.t1id)
@@ -1181,35 +1228,43 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
# bounding the "good" column with two "bad" ones is so to
# try to get coverage to get the "continue" statements
# in the loop...
- t1 = Table('t1', m,
- Column('y', Integer, ForeignKey('t22.id')),
- Column('x', Integer, ForeignKey('t2.id')),
- Column('q', Integer, ForeignKey('t22.id')),
- )
- t2 = Table('t2', m, Column('id', Integer))
+ t1 = Table(
+ "t1",
+ m,
+ Column("y", Integer, ForeignKey("t22.id")),
+ Column("x", Integer, ForeignKey("t2.id")),
+ Column("q", Integer, ForeignKey("t22.id")),
+ )
+ t2 = Table("t2", m, Column("id", Integer))
assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
def test_join_cond_no_such_unrelated_column(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')),
- Column('y', Integer, ForeignKey('t3.q')))
- t2 = Table('t2', m, Column('id', Integer))
- Table('t3', m, Column('id', Integer))
+ t1 = Table(
+ "t1",
+ m,
+ Column("x", Integer, ForeignKey("t2.id")),
+ Column("y", Integer, ForeignKey("t3.q")),
+ )
+ t2 = Table("t2", m, Column("id", Integer))
+ Table("t3", m, Column("id", Integer))
assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
def test_join_cond_no_such_related_table(self):
m1 = MetaData()
m2 = MetaData()
- t1 = Table('t1', m1, Column('x', Integer, ForeignKey('t2.id')))
- t2 = Table('t2', m2, Column('id', Integer))
+ t1 = Table("t1", m1, Column("x", Integer, ForeignKey("t2.id")))
+ t2 = Table("t2", m2, Column("id", Integer))
assert_raises_message(
exc.NoReferencedTableError,
"Foreign key associated with column 't1.x' could not find "
"table 't2' with which to generate a foreign key to "
"target column 'id'",
- sql_util.join_condition, t1, t2
+ sql_util.join_condition,
+ t1,
+ t2,
)
assert_raises_message(
@@ -1217,19 +1272,23 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
"Foreign key associated with column 't1.x' could not find "
"table 't2' with which to generate a foreign key to "
"target column 'id'",
- sql_util.join_condition, t2, t1
+ sql_util.join_condition,
+ t2,
+ t1,
)
def test_join_cond_no_such_related_column(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.q')))
- t2 = Table('t2', m, Column('id', Integer))
+ t1 = Table("t1", m, Column("x", Integer, ForeignKey("t2.q")))
+ t2 = Table("t2", m, Column("id", Integer))
assert_raises_message(
exc.NoReferencedColumnError,
"Could not initialize target column for "
"ForeignKey 't2.q' on table 't1': "
"table 't2' has no column named 'q'",
- sql_util.join_condition, t1, t2
+ sql_util.join_condition,
+ t1,
+ t2,
)
assert_raises_message(
@@ -1237,25 +1296,35 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
"Could not initialize target column for "
"ForeignKey 't2.q' on table 't1': "
"table 't2' has no column named 'q'",
- sql_util.join_condition, t2, t1
+ sql_util.join_condition,
+ t2,
+ t1,
)
class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
-
def test_join_pk_collapse_implicit(self):
"""test that redundant columns in a join get 'collapsed' into a
minimal primary key, which is the root column along a chain of
foreign key relationships."""
meta = MetaData()
- a = Table('a', meta, Column('id', Integer, primary_key=True))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
- primary_key=True))
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
- primary_key=True))
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
- primary_key=True))
+ a = Table("a", meta, Column("id", Integer, primary_key=True))
+ b = Table(
+ "b",
+ meta,
+ Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+ )
+ c = Table(
+ "c",
+ meta,
+ Column("id", Integer, ForeignKey("b.id"), primary_key=True),
+ )
+ d = Table(
+ "d",
+ meta,
+ Column("id", Integer, ForeignKey("c.id"), primary_key=True),
+ )
assert c.c.id.references(b.c.id)
assert not d.c.id.references(a.c.id)
assert list(a.join(b).primary_key) == [a.c.id]
@@ -1271,43 +1340,60 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
explicit join conditions."""
meta = MetaData()
- a = Table('a', meta, Column('id', Integer, primary_key=True),
- Column('x', Integer))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
- primary_key=True), Column('x', Integer))
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
- primary_key=True), Column('x', Integer))
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
- primary_key=True), Column('x', Integer))
+ a = Table(
+ "a",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ )
+ b = Table(
+ "b",
+ meta,
+ Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+ Column("x", Integer),
+ )
+ c = Table(
+ "c",
+ meta,
+ Column("id", Integer, ForeignKey("b.id"), primary_key=True),
+ Column("x", Integer),
+ )
+ d = Table(
+ "d",
+ meta,
+ Column("id", Integer, ForeignKey("c.id"), primary_key=True),
+ Column("x", Integer),
+ )
print(list(a.join(b, a.c.x == b.c.id).primary_key))
assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id]
assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id]
- assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) \
- == [a.c.id]
- assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) \
- == [b.c.id]
- assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \
- == [b.c.id]
+ assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) == [a.c.id]
+ assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) == [b.c.id]
+ assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) == [b.c.id]
+ assert list(
+ d.join(b, d.c.id == b.c.id).join(c, b.c.id == c.c.x).primary_key
+ ) == [b.c.id]
assert list(
- d.join(
- b,
- d.c.id == b.c.id).join(
- c,
- b.c.id == c.c.x).primary_key) == [
- b.c.id]
- assert list(a.join(b).join(c, c.c.id
- == b.c.x).join(d).primary_key) == [a.c.id]
- assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x
- == b.c.id)).primary_key) == [a.c.id]
+ a.join(b).join(c, c.c.id == b.c.x).join(d).primary_key
+ ) == [a.c.id]
+ assert list(
+ a.join(b, and_(a.c.id == b.c.id, a.c.x == b.c.id)).primary_key
+ ) == [a.c.id]
def test_init_doesnt_blowitaway(self):
meta = MetaData()
- a = Table('a', meta,
- Column('id', Integer, primary_key=True),
- Column('x', Integer))
- b = Table('b', meta,
- Column('id', Integer, ForeignKey('a.id'), primary_key=True),
- Column('x', Integer))
+ a = Table(
+ "a",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ )
+ b = Table(
+ "b",
+ meta,
+ Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+ Column("x", Integer),
+ )
j = a.join(b)
assert list(j.primary_key) == [a.c.id]
@@ -1317,12 +1403,18 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
def test_non_column_clause(self):
meta = MetaData()
- a = Table('a', meta,
- Column('id', Integer, primary_key=True),
- Column('x', Integer))
- b = Table('b', meta,
- Column('id', Integer, ForeignKey('a.id'), primary_key=True),
- Column('x', Integer, primary_key=True))
+ a = Table(
+ "a",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ )
+ b = Table(
+ "b",
+ meta,
+ Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+ Column("x", Integer, primary_key=True),
+ )
j = a.join(b, and_(a.c.id == b.c.id, b.c.x == 5))
assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j)
@@ -1331,123 +1423,156 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
def test_onclause_direction(self):
metadata = MetaData()
- employee = Table('Employee', metadata,
- Column('name', String(100)),
- Column('id', Integer, primary_key=True),
- )
+ employee = Table(
+ "Employee",
+ metadata,
+ Column("name", String(100)),
+ Column("id", Integer, primary_key=True),
+ )
- engineer = Table('Engineer', metadata,
- Column('id', Integer,
- ForeignKey('Employee.id'), primary_key=True))
+ engineer = Table(
+ "Engineer",
+ metadata,
+ Column("id", Integer, ForeignKey("Employee.id"), primary_key=True),
+ )
- eq_(util.column_set(employee.join(engineer, employee.c.id
- == engineer.c.id).primary_key),
- util.column_set([employee.c.id]))
- eq_(util.column_set(employee.join(engineer, engineer.c.id
- == employee.c.id).primary_key),
- util.column_set([employee.c.id]))
+ eq_(
+ util.column_set(
+ employee.join(
+ engineer, employee.c.id == engineer.c.id
+ ).primary_key
+ ),
+ util.column_set([employee.c.id]),
+ )
+ eq_(
+ util.column_set(
+ employee.join(
+ engineer, engineer.c.id == employee.c.id
+ ).primary_key
+ ),
+ util.column_set([employee.c.id]),
+ )
class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
-
def test_reduce(self):
meta = MetaData()
- t1 = Table('t1', meta,
- Column('t1id', Integer, primary_key=True),
- Column('t1data', String(30)))
+ t1 = Table(
+ "t1",
+ meta,
+ Column("t1id", Integer, primary_key=True),
+ Column("t1data", String(30)),
+ )
t2 = Table(
- 't2',
+ "t2",
meta,
- Column(
- 't2id',
- Integer,
- ForeignKey('t1.t1id'),
- primary_key=True),
- Column(
- 't2data',
- String(30)))
+ Column("t2id", Integer, ForeignKey("t1.t1id"), primary_key=True),
+ Column("t2data", String(30)),
+ )
t3 = Table(
- 't3',
+ "t3",
meta,
- Column(
- 't3id',
- Integer,
- ForeignKey('t2.t2id'),
- primary_key=True),
- Column(
- 't3data',
- String(30)))
-
- eq_(util.column_set(sql_util.reduce_columns([
- t1.c.t1id,
- t1.c.t1data,
- t2.c.t2id,
- t2.c.t2data,
- t3.c.t3id,
- t3.c.t3data,
- ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data,
- t3.c.t3data]))
+ Column("t3id", Integer, ForeignKey("t2.t2id"), primary_key=True),
+ Column("t3data", String(30)),
+ )
+
+ eq_(
+ util.column_set(
+ sql_util.reduce_columns(
+ [
+ t1.c.t1id,
+ t1.c.t1data,
+ t2.c.t2id,
+ t2.c.t2data,
+ t3.c.t3id,
+ t3.c.t3data,
+ ]
+ )
+ ),
+ util.column_set(
+ [t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data]
+ ),
+ )
def test_reduce_selectable(self):
metadata = MetaData()
- engineers = Table('engineers', metadata,
- Column('engineer_id', Integer, primary_key=True),
- Column('engineer_name', String(50)))
- managers = Table('managers', metadata,
- Column('manager_id', Integer, primary_key=True),
- Column('manager_name', String(50)))
- s = select([engineers,
- managers]).where(engineers.c.engineer_name
- == managers.c.manager_name)
- eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)),
- util.column_set([s.c.engineer_id, s.c.engineer_name,
- s.c.manager_id]))
+ engineers = Table(
+ "engineers",
+ metadata,
+ Column("engineer_id", Integer, primary_key=True),
+ Column("engineer_name", String(50)),
+ )
+ managers = Table(
+ "managers",
+ metadata,
+ Column("manager_id", Integer, primary_key=True),
+ Column("manager_name", String(50)),
+ )
+ s = select([engineers, managers]).where(
+ engineers.c.engineer_name == managers.c.manager_name
+ )
+ eq_(
+ util.column_set(sql_util.reduce_columns(list(s.c), s)),
+ util.column_set(
+ [s.c.engineer_id, s.c.engineer_name, s.c.manager_id]
+ ),
+ )
def test_reduce_generation(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer, primary_key=True),
- Column('y', Integer))
- t2 = Table('t2', m, Column('z', Integer, ForeignKey('t1.x')),
- Column('q', Integer))
+ t1 = Table(
+ "t1",
+ m,
+ Column("x", Integer, primary_key=True),
+ Column("y", Integer),
+ )
+ t2 = Table(
+ "t2",
+ m,
+ Column("z", Integer, ForeignKey("t1.x")),
+ Column("q", Integer),
+ )
s1 = select([t1, t2])
s2 = s1.reduce_columns(only_synonyms=False)
- eq_(
- set(s2.inner_columns),
- set([t1.c.x, t1.c.y, t2.c.q])
- )
+ eq_(set(s2.inner_columns), set([t1.c.x, t1.c.y, t2.c.q]))
s2 = s1.reduce_columns()
- eq_(
- set(s2.inner_columns),
- set([t1.c.x, t1.c.y, t2.c.z, t2.c.q])
- )
+ eq_(set(s2.inner_columns), set([t1.c.x, t1.c.y, t2.c.z, t2.c.q]))
def test_reduce_only_synonym_fk(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer, primary_key=True),
- Column('y', Integer))
- t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')),
- Column('q', Integer, ForeignKey('t1.y')))
+ t1 = Table(
+ "t1",
+ m,
+ Column("x", Integer, primary_key=True),
+ Column("y", Integer),
+ )
+ t2 = Table(
+ "t2",
+ m,
+ Column("x", Integer, ForeignKey("t1.x")),
+ Column("q", Integer, ForeignKey("t1.y")),
+ )
s1 = select([t1, t2])
s1 = s1.reduce_columns(only_synonyms=True)
- eq_(
- set(s1.c),
- set([s1.c.x, s1.c.y, s1.c.q])
- )
+ eq_(set(s1.c), set([s1.c.x, s1.c.y, s1.c.q]))
def test_reduce_only_synonym_lineage(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer, primary_key=True),
- Column('y', Integer),
- Column('z', Integer)
- )
+ t1 = Table(
+ "t1",
+ m,
+ Column("x", Integer, primary_key=True),
+ Column("y", Integer),
+ Column("z", Integer),
+ )
# test that the first appearance in the columns clause
# wins - t1 is first, t1.c.x wins
s1 = select([t1])
s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
eq_(
set(s2.reduce_columns().inner_columns),
- set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
+ set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]),
)
# reverse order, s1.c.x wins
@@ -1455,91 +1580,129 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
eq_(
set(s2.reduce_columns().inner_columns),
- set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
+ set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]),
)
def test_reduce_aliased_join(self):
metadata = MetaData()
people = Table(
- 'people', metadata, Column(
- 'person_id', Integer, Sequence(
- 'person_id_seq', optional=True), primary_key=True), Column(
- 'name', String(50)), Column(
- 'type', String(30)))
+ "people",
+ metadata,
+ Column(
+ "person_id",
+ Integer,
+ Sequence("person_id_seq", optional=True),
+ primary_key=True,
+ ),
+ Column("name", String(50)),
+ Column("type", String(30)),
+ )
engineers = Table(
- 'engineers',
+ "engineers",
metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'
- ), primary_key=True),
- Column('status', String(30)),
- Column('engineer_name', String(50)),
- Column('primary_language', String(50)),
+ Column(
+ "person_id",
+ Integer,
+ ForeignKey("people.person_id"),
+ primary_key=True,
+ ),
+ Column("status", String(30)),
+ Column("engineer_name", String(50)),
+ Column("primary_language", String(50)),
)
managers = Table(
- 'managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'),
- primary_key=True),
- Column('status', String(30)),
- Column('manager_name', String(50)))
- pjoin = \
- people.outerjoin(engineers).outerjoin(managers).\
- select(use_labels=True).alias('pjoin'
- )
- eq_(util.column_set(sql_util.reduce_columns(
- [pjoin.c.people_person_id, pjoin.c.engineers_person_id,
- pjoin.c.managers_person_id])),
- util.column_set([pjoin.c.people_person_id]))
+ "managers",
+ metadata,
+ Column(
+ "person_id",
+ Integer,
+ ForeignKey("people.person_id"),
+ primary_key=True,
+ ),
+ Column("status", String(30)),
+ Column("manager_name", String(50)),
+ )
+ pjoin = (
+ people.outerjoin(engineers)
+ .outerjoin(managers)
+ .select(use_labels=True)
+ .alias("pjoin")
+ )
+ eq_(
+ util.column_set(
+ sql_util.reduce_columns(
+ [
+ pjoin.c.people_person_id,
+ pjoin.c.engineers_person_id,
+ pjoin.c.managers_person_id,
+ ]
+ )
+ ),
+ util.column_set([pjoin.c.people_person_id]),
+ )
def test_reduce_aliased_union(self):
metadata = MetaData()
item_table = Table(
- 'item',
+ "item",
metadata,
Column(
- 'id',
- Integer,
- ForeignKey('base_item.id'),
- primary_key=True),
- Column(
- 'dummy',
- Integer,
- default=0))
+ "id", Integer, ForeignKey("base_item.id"), primary_key=True
+ ),
+ Column("dummy", Integer, default=0),
+ )
base_item_table = Table(
- 'base_item', metadata, Column(
- 'id', Integer, primary_key=True), Column(
- 'child_name', String(255), default=None))
+ "base_item",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("child_name", String(255), default=None),
+ )
from sqlalchemy.orm.util import polymorphic_union
- item_join = polymorphic_union({
- 'BaseItem':
- base_item_table.select(
- base_item_table.c.child_name
- == 'BaseItem'),
- 'Item': base_item_table.join(item_table)},
- None, 'item_join')
- eq_(util.column_set(sql_util.reduce_columns([item_join.c.id,
- item_join.c.dummy,
- item_join.c.child_name])),
- util.column_set([item_join.c.id,
- item_join.c.dummy,
- item_join.c.child_name]))
+
+ item_join = polymorphic_union(
+ {
+ "BaseItem": base_item_table.select(
+ base_item_table.c.child_name == "BaseItem"
+ ),
+ "Item": base_item_table.join(item_table),
+ },
+ None,
+ "item_join",
+ )
+ eq_(
+ util.column_set(
+ sql_util.reduce_columns(
+ [item_join.c.id, item_join.c.dummy, item_join.c.child_name]
+ )
+ ),
+ util.column_set(
+ [item_join.c.id, item_join.c.dummy, item_join.c.child_name]
+ ),
+ )
def test_reduce_aliased_union_2(self):
metadata = MetaData()
- page_table = Table('page', metadata, Column('id', Integer,
- primary_key=True))
- magazine_page_table = Table('magazine_page', metadata,
- Column('page_id', Integer,
- ForeignKey('page.id'),
- primary_key=True))
+ page_table = Table(
+ "page", metadata, Column("id", Integer, primary_key=True)
+ )
+ magazine_page_table = Table(
+ "magazine_page",
+ metadata,
+ Column(
+ "page_id", Integer, ForeignKey("page.id"), primary_key=True
+ ),
+ )
classified_page_table = Table(
- 'classified_page',
+ "classified_page",
metadata,
Column(
- 'magazine_page_id',
+ "magazine_page_id",
Integer,
- ForeignKey('magazine_page.page_id'),
- primary_key=True))
+ ForeignKey("magazine_page.page_id"),
+ primary_key=True,
+ ),
+ )
# this is essentially the union formed by the ORM's
# polymorphic_union function. we define two versions with
@@ -1549,25 +1712,33 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
# classified_page.magazine_page_id
pjoin = union(
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).
- select_from(
- page_table.join(magazine_page_table).
- join(classified_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).
- select_from(page_table.join(magazine_page_table))
- ).alias('pjoin')
- eq_(util.column_set(sql_util.reduce_columns(
- [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id]))
+ select(
+ [
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id,
+ ]
+ ).select_from(
+ page_table.join(magazine_page_table).join(
+ classified_page_table
+ )
+ ),
+ select(
+ [
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label("magazine_page_id"),
+ ]
+ ).select_from(page_table.join(magazine_page_table)),
+ ).alias("pjoin")
+ eq_(
+ util.column_set(
+ sql_util.reduce_columns(
+ [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id]
+ )
+ ),
+ util.column_set([pjoin.c.id]),
+ )
# the first selectable has a CAST, which is a placeholder for
# classified_page.magazine_page_id in the second selectable.
@@ -1576,45 +1747,70 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
# currently makes the external column look like that of the
# first selectable only.
- pjoin = union(select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).
- select_from(page_table.join(magazine_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).
- select_from(page_table.join(magazine_page_table).
- join(classified_page_table))
- ).alias('pjoin')
- eq_(util.column_set(sql_util.reduce_columns(
- [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id]))
+ pjoin = union(
+ select(
+ [
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label("magazine_page_id"),
+ ]
+ ).select_from(page_table.join(magazine_page_table)),
+ select(
+ [
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id,
+ ]
+ ).select_from(
+ page_table.join(magazine_page_table).join(
+ classified_page_table
+ )
+ ),
+ ).alias("pjoin")
+ eq_(
+ util.column_set(
+ sql_util.reduce_columns(
+ [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id]
+ )
+ ),
+ util.column_set([pjoin.c.id]),
+ )
class DerivedTest(fixtures.TestBase, AssertsExecutionResults):
-
def test_table(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
- Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
- Column('c2', String(30)))
+ t1 = Table(
+ "t1",
+ meta,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", String(30)),
+ )
+ t2 = Table(
+ "t2",
+ meta,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", String(30)),
+ )
assert t1.is_derived_from(t1)
assert not t2.is_derived_from(t1)
def test_alias(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
- Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
- Column('c2', String(30)))
+ t1 = Table(
+ "t1",
+ meta,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", String(30)),
+ )
+ t2 = Table(
+ "t2",
+ meta,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", String(30)),
+ )
assert t1.alias().is_derived_from(t1)
assert not t2.alias().is_derived_from(t1)
@@ -1624,44 +1820,43 @@ class DerivedTest(fixtures.TestBase, AssertsExecutionResults):
def test_select(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
- Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
- Column('c2', String(30)))
+ t1 = Table(
+ "t1",
+ meta,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", String(30)),
+ )
+ t2 = Table(
+ "t2",
+ meta,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", String(30)),
+ )
assert t1.select().is_derived_from(t1)
assert not t2.select().is_derived_from(t1)
assert select([t1, t2]).is_derived_from(t1)
- assert t1.select().alias('foo').is_derived_from(t1)
- assert select([t1, t2]).alias('foo').is_derived_from(t1)
- assert not t2.select().alias('foo').is_derived_from(t1)
+ assert t1.select().alias("foo").is_derived_from(t1)
+ assert select([t1, t2]).alias("foo").is_derived_from(t1)
+ assert not t2.select().alias("foo").is_derived_from(t1)
class AnnotationsTest(fixtures.TestBase):
-
def test_hashing(self):
- t = table('t', column('x'))
+ t = table("t", column("x"))
a = t.alias()
s = t.select()
s2 = a.select()
- for obj in [
- t,
- t.c.x,
- a,
- s,
- s2,
- t.c.x > 1,
- (t.c.x > 1).label(None)
- ]:
+ for obj in [t, t.c.x, a, s, s2, t.c.x > 1, (t.c.x > 1).label(None)]:
annot = obj._annotate({})
eq_(set([obj]), set([annot]))
def test_compare(self):
- t = table('t', column('x'), column('y'))
+ t = table("t", column("x"), column("y"))
x_a = t.c.x._annotate({})
assert t.c.x.compare(x_a)
assert x_a.compare(t.c.x)
@@ -1681,40 +1876,44 @@ class AnnotationsTest(fixtures.TestBase):
def test_late_name_add(self):
from sqlalchemy.schema import Column
+
c1 = Column(Integer)
c1_a = c1._annotate({"foo": "bar"})
- c1.name = 'somename'
- eq_(c1_a.name, 'somename')
+ c1.name = "somename"
+ eq_(c1_a.name, "somename")
def test_late_table_add(self):
c1 = Column("foo", Integer)
c1_a = c1._annotate({"foo": "bar"})
- t = Table('t', MetaData(), c1)
+ t = Table("t", MetaData(), c1)
is_(c1_a.table, t)
def test_basic_attrs(self):
- t = Table('t', MetaData(),
- Column('x', Integer, info={'q': 'p'}),
- Column('y', Integer, key='q'))
+ t = Table(
+ "t",
+ MetaData(),
+ Column("x", Integer, info={"q": "p"}),
+ Column("y", Integer, key="q"),
+ )
x_a = t.c.x._annotate({})
y_a = t.c.q._annotate({})
- t.c.x.info['z'] = 'h'
+ t.c.x.info["z"] = "h"
- eq_(y_a.key, 'q')
+ eq_(y_a.key, "q")
is_(x_a.table, t)
- eq_(x_a.info, {'q': 'p', 'z': 'h'})
+ eq_(x_a.info, {"q": "p", "z": "h"})
eq_(t.c.x.anon_label, x_a.anon_label)
def test_custom_constructions(self):
from sqlalchemy.schema import Column
class MyColumn(Column):
-
def __init__(self):
- Column.__init__(self, 'foo', Integer)
+ Column.__init__(self, "foo", Integer)
+
_constructor = Column
- t1 = Table('t1', MetaData(), MyColumn())
+ t1 = Table("t1", MetaData(), MyColumn())
s1 = t1.select()
assert isinstance(t1.c.foo, MyColumn)
assert isinstance(s1.c.foo, Column)
@@ -1734,8 +1933,9 @@ class AnnotationsTest(fixtures.TestBase):
pass
assert isinstance(
- MyColumn('x', Integer)._annotate({"foo": "bar"}),
- AnnotatedColumnElement)
+ MyColumn("x", Integer)._annotate({"foo": "bar"}),
+ AnnotatedColumnElement,
+ )
def test_custom_construction_correct_anno_expr(self):
# [ticket:2918]
@@ -1744,14 +1944,14 @@ class AnnotationsTest(fixtures.TestBase):
class MyColumn(Column):
pass
- col = MyColumn('x', Integer)
+ col = MyColumn("x", Integer)
binary_1 = col == 5
- col_anno = MyColumn('x', Integer)._annotate({"foo": "bar"})
+ col_anno = MyColumn("x", Integer)._annotate({"foo": "bar"})
binary_2 = col_anno == 5
eq_(binary_2.left._annotations, {"foo": "bar"})
def test_annotated_corresponding_column(self):
- table1 = table('table1', column("col1"))
+ table1 = table("table1", column("col1"))
s1 = select([table1.c.col1])
t1 = s1._annotate({})
@@ -1766,93 +1966,107 @@ class AnnotationsTest(fixtures.TestBase):
inner = select([s1])
- assert inner.corresponding_column(
- t2.c.col1,
- require_embedded=False) is inner.corresponding_column(
- t2.c.col1,
- require_embedded=True) is inner.c.col1
- assert inner.corresponding_column(
- t1.c.col1,
- require_embedded=False) is inner.corresponding_column(
- t1.c.col1,
- require_embedded=True) is inner.c.col1
+ assert (
+ inner.corresponding_column(t2.c.col1, require_embedded=False)
+ is inner.corresponding_column(t2.c.col1, require_embedded=True)
+ is inner.c.col1
+ )
+ assert (
+ inner.corresponding_column(t1.c.col1, require_embedded=False)
+ is inner.corresponding_column(t1.c.col1, require_embedded=True)
+ is inner.c.col1
+ )
def test_annotated_visit(self):
- table1 = table('table1', column("col1"), column("col2"))
+ table1 = table("table1", column("col1"), column("col2"))
- bin = table1.c.col1 == bindparam('foo', value=None)
+ bin = table1.c.col1 == bindparam("foo", value=None)
assert str(bin) == "table1.col1 = :foo"
def visit_binary(b):
b.right = table1.c.col2
- b2 = visitors.cloned_traverse(bin, {}, {'binary': visit_binary})
+ b2 = visitors.cloned_traverse(bin, {}, {"binary": visit_binary})
assert str(b2) == "table1.col1 = table1.col2"
- b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':
- visit_binary})
- assert str(b3) == 'table1.col1 = table1.col2'
+ b3 = visitors.cloned_traverse(
+ bin._annotate({}), {}, {"binary": visit_binary}
+ )
+ assert str(b3) == "table1.col1 = table1.col2"
def visit_binary(b):
- b.left = bindparam('bar')
+ b.left = bindparam("bar")
- b4 = visitors.cloned_traverse(b2, {}, {'binary': visit_binary})
+ b4 = visitors.cloned_traverse(b2, {}, {"binary": visit_binary})
assert str(b4) == ":bar = table1.col2"
- b5 = visitors.cloned_traverse(b3, {}, {'binary': visit_binary})
+ b5 = visitors.cloned_traverse(b3, {}, {"binary": visit_binary})
assert str(b5) == ":bar = table1.col2"
def test_label_accessors(self):
- t1 = table('t1', column('c1'))
+ t1 = table("t1", column("c1"))
l1 = t1.c.c1.label(None)
is_(l1._order_by_label_element, l1)
l1a = l1._annotate({"foo": "bar"})
is_(l1a._order_by_label_element, l1a)
def test_annotate_aliased(self):
- t1 = table('t1', column('c1'))
- s = select([(t1.c.c1 + 3).label('bat')])
+ t1 = table("t1", column("c1"))
+ s = select([(t1.c.c1 + 3).label("bat")])
a = s.alias()
- a = sql_util._deep_annotate(a, {'foo': 'bar'})
- eq_(a._annotations['foo'], 'bar')
- eq_(a.element._annotations['foo'], 'bar')
+ a = sql_util._deep_annotate(a, {"foo": "bar"})
+ eq_(a._annotations["foo"], "bar")
+ eq_(a.element._annotations["foo"], "bar")
def test_annotate_expressions(self):
- table1 = table('table1', column('col1'), column('col2'))
- for expr, expected in [(table1.c.col1, 'table1.col1'),
- (table1.c.col1 == 5,
- 'table1.col1 = :col1_1'),
- (table1.c.col1.in_([2, 3, 4]),
- 'table1.col1 IN (:col1_1, :col1_2, '
- ':col1_3)')]:
+ table1 = table("table1", column("col1"), column("col2"))
+ for expr, expected in [
+ (table1.c.col1, "table1.col1"),
+ (table1.c.col1 == 5, "table1.col1 = :col1_1"),
+ (
+ table1.c.col1.in_([2, 3, 4]),
+ "table1.col1 IN (:col1_1, :col1_2, " ":col1_3)",
+ ),
+ ]:
eq_(str(expr), expected)
eq_(str(expr._annotate({})), expected)
eq_(str(sql_util._deep_annotate(expr, {})), expected)
- eq_(str(sql_util._deep_annotate(
- expr, {}, exclude=[table1.c.col1])), expected)
+ eq_(
+ str(
+ sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1])
+ ),
+ expected,
+ )
def test_deannotate(self):
- table1 = table('table1', column("col1"), column("col2"))
+ table1 = table("table1", column("col1"), column("col2"))
- bin = table1.c.col1 == bindparam('foo', value=None)
+ bin = table1.c.col1 == bindparam("foo", value=None)
- b2 = sql_util._deep_annotate(bin, {'_orm_adapt': True})
+ b2 = sql_util._deep_annotate(bin, {"_orm_adapt": True})
b3 = sql_util._deep_deannotate(b2)
b4 = sql_util._deep_deannotate(bin)
for elem in (b2._annotations, b2.left._annotations):
- assert '_orm_adapt' in elem
+ assert "_orm_adapt" in elem
- for elem in b3._annotations, b3.left._annotations, \
- b4._annotations, b4.left._annotations:
+ for elem in (
+ b3._annotations,
+ b3.left._annotations,
+ b4._annotations,
+ b4.left._annotations,
+ ):
assert elem == {}
assert b2.left is not bin.left
assert b3.left is not b2.left and b2.left is not bin.left
assert b4.left is bin.left # since column is immutable
# deannotate copies the element
- assert bin.right is not b2.right and b2.right is not b3.right \
+ assert (
+ bin.right is not b2.right
+ and b2.right is not b3.right
and b3.right is not b4.right
+ )
def test_annotate_unique_traversal(self):
"""test that items are copied only once during
@@ -1864,16 +2078,14 @@ class AnnotationsTest(fixtures.TestBase):
case now, as deannotate is making
clones again in some cases.
"""
- table1 = table('table1', column('x'))
- table2 = table('table2', column('y'))
+ table1 = table("table1", column("x"))
+ table2 = table("table2", column("y"))
a1 = table1.alias()
- s = select([a1.c.x]).select_from(
- a1.join(table2, a1.c.x == table2.c.y)
- )
+ s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y))
for sel in (
sql_util._deep_deannotate(s),
visitors.cloned_traverse(s, {}, {}),
- visitors.replacement_traverse(s, {}, lambda x: None)
+ visitors.replacement_traverse(s, {}, lambda x: None),
):
# the columns clause isn't changed at all
assert sel._raw_columns[0].table is a1
@@ -1886,7 +2098,7 @@ class AnnotationsTest(fixtures.TestBase):
# when encountered.
for sel in (
sql_util._deep_deannotate(s, {"foo": "bar"}),
- sql_util._deep_annotate(s, {'foo': 'bar'}),
+ sql_util._deep_annotate(s, {"foo": "bar"}),
):
assert sel._froms[0] is not sel._froms[1].left
@@ -1899,7 +2111,7 @@ class AnnotationsTest(fixtures.TestBase):
preserving them when deep_annotate is run on them.
"""
- t1 = table('table1', column("col1"), column("col2"))
+ t1 = table("table1", column("col1"), column("col2"))
s = select([t1.c.col1._annotate({"foo": "bar"})])
s2 = select([t1.c.col1._annotate({"bat": "hoho"})])
s3 = s.union(s2)
@@ -1907,42 +2119,40 @@ class AnnotationsTest(fixtures.TestBase):
eq_(
sel.selects[0]._raw_columns[0]._annotations,
- {"foo": "bar", "new": "thing"}
+ {"foo": "bar", "new": "thing"},
)
eq_(
sel.selects[1]._raw_columns[0]._annotations,
- {"bat": "hoho", "new": "thing"}
+ {"bat": "hoho", "new": "thing"},
)
def test_deannotate_2(self):
- table1 = table('table1', column("col1"), column("col2"))
- j = table1.c.col1._annotate({"remote": True}) == \
- table1.c.col2._annotate({"local": True})
+ table1 = table("table1", column("col1"), column("col2"))
+ j = table1.c.col1._annotate(
+ {"remote": True}
+ ) == table1.c.col2._annotate({"local": True})
j2 = sql_util._deep_deannotate(j)
- eq_(
- j.left._annotations, {"remote": True}
- )
- eq_(
- j2.left._annotations, {}
- )
+ eq_(j.left._annotations, {"remote": True})
+ eq_(j2.left._annotations, {})
def test_deannotate_3(self):
- table1 = table('table1', column("col1"), column("col2"),
- column("col3"), column("col4"))
+ table1 = table(
+ "table1",
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ column("col4"),
+ )
j = and_(
- table1.c.col1._annotate({"remote": True}) ==
- table1.c.col2._annotate({"local": True}),
- table1.c.col3._annotate({"remote": True}) ==
- table1.c.col4._annotate({"local": True})
+ table1.c.col1._annotate({"remote": True})
+ == table1.c.col2._annotate({"local": True}),
+ table1.c.col3._annotate({"remote": True})
+ == table1.c.col4._annotate({"local": True}),
)
j2 = sql_util._deep_deannotate(j)
- eq_(
- j.clauses[0].left._annotations, {"remote": True}
- )
- eq_(
- j2.clauses[0].left._annotations, {}
- )
+ eq_(j.clauses[0].left._annotations, {"remote": True})
+ eq_(j2.clauses[0].left._annotations, {})
def test_annotate_fromlist_preservation(self):
"""test the FROM list in select still works
@@ -1952,37 +2162,34 @@ class AnnotationsTest(fixtures.TestBase):
#2453, continued
"""
- table1 = table('table1', column('x'))
- table2 = table('table2', column('y'))
+ table1 = table("table1", column("x"))
+ table2 = table("table2", column("y"))
a1 = table1.alias()
- s = select([a1.c.x]).select_from(
- a1.join(table2, a1.c.x == table2.c.y)
- )
+ s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y))
assert_s = select([select([s])])
for fn in (
sql_util._deep_deannotate,
- lambda s: sql_util._deep_annotate(s, {'foo': 'bar'}),
+ lambda s: sql_util._deep_annotate(s, {"foo": "bar"}),
lambda s: visitors.cloned_traverse(s, {}, {}),
- lambda s: visitors.replacement_traverse(s, {}, lambda x: None)
+ lambda s: visitors.replacement_traverse(s, {}, lambda x: None),
):
sel = fn(select([fn(select([fn(s)]))]))
eq_(str(assert_s), str(sel))
def test_bind_unique_test(self):
- table('t', column('a'), column('b'))
+ table("t", column("a"), column("b"))
b = bindparam("bind", value="x", unique=True)
# the annotation of "b" should render the
# same. The "unique" test in compiler should
# also pass, [ticket:2425]
- eq_(str(or_(b, b._annotate({"foo": "bar"}))),
- ":bind_1 OR :bind_1")
+ eq_(str(or_(b, b._annotate({"foo": "bar"}))), ":bind_1 OR :bind_1")
def test_comparators_cleaned_out_construction(self):
- c = column('a')
+ c = column("a")
comp1 = c.comparator
@@ -1991,7 +2198,7 @@ class AnnotationsTest(fixtures.TestBase):
assert comp1 is not comp2
def test_comparators_cleaned_out_reannotate(self):
- c = column('a')
+ c = column("a")
c1 = c._annotate({"foo": "bar"})
comp1 = c1.comparator
@@ -2002,7 +2209,7 @@ class AnnotationsTest(fixtures.TestBase):
assert comp1 is not comp2
def test_comparator_cleanout_integration(self):
- c = column('a')
+ c = column("a")
c1 = c._annotate({"foo": "bar"})
comp1 = c1.comparator
@@ -2018,8 +2225,8 @@ class ReprTest(fixtures.TestBase):
for obj in [
elements.Cast(1, 2),
elements.TypeClause(String()),
- elements.ColumnClause('x'),
- elements.BindParameter('q'),
+ elements.ColumnClause("x"),
+ elements.BindParameter("q"),
elements.Null(),
elements.True_(),
elements.False_(),
@@ -2027,22 +2234,21 @@ class ReprTest(fixtures.TestBase):
elements.BooleanClauseList.and_(),
elements.Tuple(),
elements.Case([]),
- elements.Extract('foo', column('x')),
- elements.UnaryExpression(column('x')),
- elements.Grouping(column('x')),
+ elements.Extract("foo", column("x")),
+ elements.UnaryExpression(column("x")),
+ elements.Grouping(column("x")),
elements.Over(func.foo()),
- elements.Label('q', column('x')),
+ elements.Label("q", column("x")),
]:
repr(obj)
class WithLabelsTest(fixtures.TestBase):
-
def _assert_labels_warning(self, s):
assert_raises_message(
exc.SAWarning,
r"replaced by Column.*, which has the same key",
- lambda: s.c
+ lambda: s.c,
)
def _assert_result_keys(self, s, keys):
@@ -2055,147 +2261,128 @@ class WithLabelsTest(fixtures.TestBase):
def _names_overlap(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer))
- t2 = Table('t2', m, Column('x', Integer))
+ t1 = Table("t1", m, Column("x", Integer))
+ t2 = Table("t2", m, Column("x", Integer))
return select([t1, t2])
def test_names_overlap_nolabel(self):
sel = self._names_overlap()
self._assert_labels_warning(sel)
- self._assert_result_keys(sel, ['x'])
+ self._assert_result_keys(sel, ["x"])
def test_names_overlap_label(self):
sel = self._names_overlap().apply_labels()
- eq_(
- list(sel.c.keys()),
- ['t1_x', 't2_x']
- )
- self._assert_result_keys(sel, ['t1_x', 't2_x'])
+ eq_(list(sel.c.keys()), ["t1_x", "t2_x"])
+ self._assert_result_keys(sel, ["t1_x", "t2_x"])
def _names_overlap_keys_dont(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer, key='a'))
- t2 = Table('t2', m, Column('x', Integer, key='b'))
+ t1 = Table("t1", m, Column("x", Integer, key="a"))
+ t2 = Table("t2", m, Column("x", Integer, key="b"))
return select([t1, t2])
def test_names_overlap_keys_dont_nolabel(self):
sel = self._names_overlap_keys_dont()
- eq_(
- list(sel.c.keys()),
- ['a', 'b']
- )
- self._assert_result_keys(sel, ['x'])
+ eq_(list(sel.c.keys()), ["a", "b"])
+ self._assert_result_keys(sel, ["x"])
def test_names_overlap_keys_dont_label(self):
sel = self._names_overlap_keys_dont().apply_labels()
- eq_(
- list(sel.c.keys()),
- ['t1_a', 't2_b']
- )
- self._assert_result_keys(sel, ['t1_x', 't2_x'])
+ eq_(list(sel.c.keys()), ["t1_a", "t2_b"])
+ self._assert_result_keys(sel, ["t1_x", "t2_x"])
def _labels_overlap(self):
m = MetaData()
- t1 = Table('t', m, Column('x_id', Integer))
- t2 = Table('t_x', m, Column('id', Integer))
+ t1 = Table("t", m, Column("x_id", Integer))
+ t2 = Table("t_x", m, Column("id", Integer))
return select([t1, t2])
def test_labels_overlap_nolabel(self):
sel = self._labels_overlap()
- eq_(
- list(sel.c.keys()),
- ['x_id', 'id']
- )
- self._assert_result_keys(sel, ['x_id', 'id'])
+ eq_(list(sel.c.keys()), ["x_id", "id"])
+ self._assert_result_keys(sel, ["x_id", "id"])
def test_labels_overlap_label(self):
sel = self._labels_overlap().apply_labels()
t2 = sel.froms[1]
- eq_(
- list(sel.c.keys()),
- ['t_x_id', t2.c.id.anon_label]
- )
- self._assert_result_keys(sel, ['t_x_id', 'id_1'])
- self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
+ eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label])
+ self._assert_result_keys(sel, ["t_x_id", "id_1"])
+ self._assert_subq_result_keys(sel, ["t_x_id", "id_1"])
def _labels_overlap_keylabels_dont(self):
m = MetaData()
- t1 = Table('t', m, Column('x_id', Integer, key='a'))
- t2 = Table('t_x', m, Column('id', Integer, key='b'))
+ t1 = Table("t", m, Column("x_id", Integer, key="a"))
+ t2 = Table("t_x", m, Column("id", Integer, key="b"))
return select([t1, t2])
def test_labels_overlap_keylabels_dont_nolabel(self):
sel = self._labels_overlap_keylabels_dont()
- eq_(list(sel.c.keys()), ['a', 'b'])
- self._assert_result_keys(sel, ['x_id', 'id'])
+ eq_(list(sel.c.keys()), ["a", "b"])
+ self._assert_result_keys(sel, ["x_id", "id"])
def test_labels_overlap_keylabels_dont_label(self):
sel = self._labels_overlap_keylabels_dont().apply_labels()
- eq_(list(sel.c.keys()), ['t_a', 't_x_b'])
- self._assert_result_keys(sel, ['t_x_id', 'id_1'])
+ eq_(list(sel.c.keys()), ["t_a", "t_x_b"])
+ self._assert_result_keys(sel, ["t_x_id", "id_1"])
def _keylabels_overlap_labels_dont(self):
m = MetaData()
- t1 = Table('t', m, Column('a', Integer, key='x_id'))
- t2 = Table('t_x', m, Column('b', Integer, key='id'))
+ t1 = Table("t", m, Column("a", Integer, key="x_id"))
+ t2 = Table("t_x", m, Column("b", Integer, key="id"))
return select([t1, t2])
def test_keylabels_overlap_labels_dont_nolabel(self):
sel = self._keylabels_overlap_labels_dont()
- eq_(list(sel.c.keys()), ['x_id', 'id'])
- self._assert_result_keys(sel, ['a', 'b'])
+ eq_(list(sel.c.keys()), ["x_id", "id"])
+ self._assert_result_keys(sel, ["a", "b"])
def test_keylabels_overlap_labels_dont_label(self):
sel = self._keylabels_overlap_labels_dont().apply_labels()
t2 = sel.froms[1]
- eq_(list(sel.c.keys()), ['t_x_id', t2.c.id.anon_label])
- self._assert_result_keys(sel, ['t_a', 't_x_b'])
- self._assert_subq_result_keys(sel, ['t_a', 't_x_b'])
+ eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label])
+ self._assert_result_keys(sel, ["t_a", "t_x_b"])
+ self._assert_subq_result_keys(sel, ["t_a", "t_x_b"])
def _keylabels_overlap_labels_overlap(self):
m = MetaData()
- t1 = Table('t', m, Column('x_id', Integer, key='x_a'))
- t2 = Table('t_x', m, Column('id', Integer, key='a'))
+ t1 = Table("t", m, Column("x_id", Integer, key="x_a"))
+ t2 = Table("t_x", m, Column("id", Integer, key="a"))
return select([t1, t2])
def test_keylabels_overlap_labels_overlap_nolabel(self):
sel = self._keylabels_overlap_labels_overlap()
- eq_(list(sel.c.keys()), ['x_a', 'a'])
- self._assert_result_keys(sel, ['x_id', 'id'])
- self._assert_subq_result_keys(sel, ['x_id', 'id'])
+ eq_(list(sel.c.keys()), ["x_a", "a"])
+ self._assert_result_keys(sel, ["x_id", "id"])
+ self._assert_subq_result_keys(sel, ["x_id", "id"])
def test_keylabels_overlap_labels_overlap_label(self):
sel = self._keylabels_overlap_labels_overlap().apply_labels()
t2 = sel.froms[1]
- eq_(list(sel.c.keys()), ['t_x_a', t2.c.a.anon_label])
- self._assert_result_keys(sel, ['t_x_id', 'id_1'])
- self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
+ eq_(list(sel.c.keys()), ["t_x_a", t2.c.a.anon_label])
+ self._assert_result_keys(sel, ["t_x_id", "id_1"])
+ self._assert_subq_result_keys(sel, ["t_x_id", "id_1"])
def _keys_overlap_names_dont(self):
m = MetaData()
- t1 = Table('t1', m, Column('a', Integer, key='x'))
- t2 = Table('t2', m, Column('b', Integer, key='x'))
+ t1 = Table("t1", m, Column("a", Integer, key="x"))
+ t2 = Table("t2", m, Column("b", Integer, key="x"))
return select([t1, t2])
def test_keys_overlap_names_dont_nolabel(self):
sel = self._keys_overlap_names_dont()
self._assert_labels_warning(sel)
- self._assert_result_keys(sel, ['a', 'b'])
+ self._assert_result_keys(sel, ["a", "b"])
def test_keys_overlap_names_dont_label(self):
sel = self._keys_overlap_names_dont().apply_labels()
- eq_(
- list(sel.c.keys()),
- ['t1_x', 't2_x']
- )
- self._assert_result_keys(sel, ['t1_a', 't2_b'])
+ eq_(list(sel.c.keys()), ["t1_x", "t2_x"])
+ self._assert_result_keys(sel, ["t1_a", "t2_b"])
class ResultMapTest(fixtures.TestBase):
-
def _fixture(self):
m = MetaData()
- t = Table('t', m, Column('x', Integer), Column('y', Integer))
+ t = Table("t", m, Column("x", Integer), Column("y", Integer))
return t
def _mapping(self, stmt):
@@ -2208,7 +2395,7 @@ class ResultMapTest(fixtures.TestBase):
def test_select_label_alt_name(self):
t = self._fixture()
- l1, l2 = t.c.x.label('a'), t.c.y.label('b')
+ l1, l2 = t.c.x.label("a"), t.c.y.label("b")
s = select([l1, l2])
mapping = self._mapping(s)
assert l1 in mapping
@@ -2217,7 +2404,7 @@ class ResultMapTest(fixtures.TestBase):
def test_select_alias_label_alt_name(self):
t = self._fixture()
- l1, l2 = t.c.x.label('a'), t.c.y.label('b')
+ l1, l2 = t.c.x.label("a"), t.c.y.label("b")
s = select([l1, l2]).alias()
mapping = self._mapping(s)
assert l1 in mapping
@@ -2253,7 +2440,7 @@ class ResultMapTest(fixtures.TestBase):
x, y = t.c.x, t.c.y
ta = t.alias()
- l1, l2 = ta.c.x.label('a'), ta.c.y.label('b')
+ l1, l2 = ta.c.x.label("a"), ta.c.y.label("b")
s = select([l1, l2])
mapping = self._mapping(s)
@@ -2268,34 +2455,40 @@ class ResultMapTest(fixtures.TestBase):
assert t.c.x not in mapping
eq_(
[type(entry[-1]) for entry in s.compile()._result_columns],
- [Boolean]
+ [Boolean],
)
def test_plain_exists(self):
expr = exists([1])
eq_(type(expr.type), Boolean)
eq_(
- [type(entry[-1]) for
- entry in select([expr]).compile()._result_columns],
- [Boolean]
+ [
+ type(entry[-1])
+ for entry in select([expr]).compile()._result_columns
+ ],
+ [Boolean],
)
def test_plain_exists_negate(self):
expr = ~exists([1])
eq_(type(expr.type), Boolean)
eq_(
- [type(entry[-1]) for
- entry in select([expr]).compile()._result_columns],
- [Boolean]
+ [
+ type(entry[-1])
+ for entry in select([expr]).compile()._result_columns
+ ],
+ [Boolean],
)
def test_plain_exists_double_negate(self):
expr = ~(~exists([1]))
eq_(type(expr.type), Boolean)
eq_(
- [type(entry[-1]) for
- entry in select([expr]).compile()._result_columns],
- [Boolean]
+ [
+ type(entry[-1])
+ for entry in select([expr]).compile()._result_columns
+ ],
+ [Boolean],
)
def test_column_subquery_plain(self):
@@ -2307,7 +2500,7 @@ class ResultMapTest(fixtures.TestBase):
assert s1 in mapping
eq_(
[type(entry[-1]) for entry in s2.compile()._result_columns],
- [Integer]
+ [Integer],
)
def test_unary_boolean(self):
@@ -2315,7 +2508,7 @@ class ResultMapTest(fixtures.TestBase):
s1 = select([not_(True)], use_labels=True)
eq_(
[type(entry[-1]) for entry in s1.compile()._result_columns],
- [Boolean]
+ [Boolean],
)
@@ -2323,19 +2516,15 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
def _assert_legacy(self, leg, read=False, nowait=False):
- t = table('t', column('c'))
+ t = table("t", column("c"))
s1 = select([t], for_update=leg)
if leg is False:
assert s1._for_update_arg is None
assert s1.for_update is None
else:
- eq_(
- s1._for_update_arg.read, read
- )
- eq_(
- s1._for_update_arg.nowait, nowait
- )
+ eq_(s1._for_update_arg.read, read)
+ eq_(s1._for_update_arg.nowait, nowait)
eq_(s1.for_update, leg)
def test_false_legacy(self):
@@ -2354,28 +2543,30 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
self._assert_legacy("read_nowait", read=True, nowait=True)
def test_legacy_setter(self):
- t = table('t', column('c'))
+ t = table("t", column("c"))
s = select([t])
- s.for_update = 'nowait'
+ s.for_update = "nowait"
eq_(s._for_update_arg.nowait, True)
def test_basic_clone(self):
- t = table('t', column('c'))
+ t = table("t", column("c"))
s = select([t]).with_for_update(read=True, of=t.c.c)
s2 = visitors.ReplacingCloningVisitor().traverse(s)
assert s2._for_update_arg is not s._for_update_arg
eq_(s2._for_update_arg.read, True)
eq_(s2._for_update_arg.of, [t.c.c])
- self.assert_compile(s2,
- "SELECT t.c FROM t FOR SHARE OF t",
- dialect="postgresql")
+ self.assert_compile(
+ s2, "SELECT t.c FROM t FOR SHARE OF t", dialect="postgresql"
+ )
def test_adapt(self):
- t = table('t', column('c'))
+ t = table("t", column("c"))
s = select([t]).with_for_update(read=True, of=t.c.c)
a = t.alias()
s2 = sql_util.ClauseAdapter(a).traverse(s)
eq_(s2._for_update_arg.of, [a.c.c])
- self.assert_compile(s2,
- "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1",
- dialect="postgresql")
+ self.assert_compile(
+ s2,
+ "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1",
+ dialect="postgresql",
+ )