diff options
Diffstat (limited to 'test/sql/test_selectable.py')
| -rw-r--r-- | test/sql/test_selectable.py | 1857 |
1 files changed, 1024 insertions, 833 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 4b92e3e3e..023a0bc61 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1,10 +1,12 @@ """Test various algorithmic properties of selectables.""" -from sqlalchemy.testing import eq_, assert_raises, \ - assert_raises_message, is_ +from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, is_ from sqlalchemy import * -from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ - AssertsExecutionResults +from sqlalchemy.testing import ( + fixtures, + AssertsCompiledSQL, + AssertsExecutionResults, +) from sqlalchemy.sql import elements from sqlalchemy import testing from sqlalchemy.sql import util as sql_util, visitors, expression @@ -14,33 +16,37 @@ from sqlalchemy import util from sqlalchemy.schema import Column, Table, MetaData metadata = MetaData() -table1 = Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20)), - Column('col3', Integer), - Column('colx', Integer), - - ) - -table2 = Table('table2', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', Integer, ForeignKey('table1.col1')), - Column('col3', String(20)), - Column('coly', Integer), - ) - -keyed = Table('keyed', metadata, - Column('x', Integer, key='colx'), - Column('y', Integer, key='coly'), - Column('z', Integer), - ) +table1 = Table( + "table1", + metadata, + Column("col1", Integer, primary_key=True), + Column("col2", String(20)), + Column("col3", Integer), + Column("colx", Integer), +) + +table2 = Table( + "table2", + metadata, + Column("col1", Integer, primary_key=True), + Column("col2", Integer, ForeignKey("table1.col1")), + Column("col3", String(20)), + Column("coly", Integer), +) + +keyed = Table( + "keyed", + metadata, + Column("x", Integer, key="colx"), + Column("y", Integer, key="coly"), + Column("z", Integer), +) class SelectableTest( - fixtures.TestBase, - AssertsExecutionResults, - AssertsCompiledSQL): - __dialect__ = 'default' + fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL +): + __dialect__ = "default" def test_indirect_correspondence_on_labels(self): # this test depends upon 'distance' to @@ -48,8 +54,13 @@ class SelectableTest( # same column three times - s = select([table1.c.col1.label('c2'), table1.c.col1, - table1.c.col1.label('c1')]) + s = select( + [ + table1.c.col1.label("c2"), + table1.c.col1, + table1.c.col1.label("c1"), + ] + ) # this tests the same thing as # test_direct_correspondence_on_labels below - @@ -60,25 +71,25 @@ class SelectableTest( assert s.corresponding_column(s.c.c1) is s.c.c1 def test_labeled_subquery_twice(self): - scalar_select = select([table1.c.col1]).label('foo') + scalar_select = select([table1.c.col1]).label("foo") s1 = select([scalar_select]) s2 = select([scalar_select, scalar_select]) eq_( s1.c.foo.proxy_set, - set([s1.c.foo, scalar_select, scalar_select.element]) + set([s1.c.foo, scalar_select, scalar_select.element]), ) eq_( s2.c.foo.proxy_set, - set([s2.c.foo, scalar_select, scalar_select.element]) + set([s2.c.foo, scalar_select, scalar_select.element]), ) assert s1.corresponding_column(scalar_select) is s1.c.foo assert s2.corresponding_column(scalar_select) is s2.c.foo def test_label_grouped_still_corresponds(self): - label = select([table1.c.col1]).label('foo') + label = select([table1.c.col1]).label("foo") label2 = label.self_group() s1 = select([label]) @@ -90,14 +101,14 @@ class SelectableTest( # this test depends on labels being part # of the proxy set to get the right result - l1, l2 = table1.c.col1.label('foo'), table1.c.col1.label('bar') + l1, l2 = table1.c.col1.label("foo"), table1.c.col1.label("bar") sel = select([l1, l2]) sel2 = sel.alias() assert sel2.corresponding_column(l1) is sel2.c.foo assert sel2.corresponding_column(l2) is sel2.c.bar - sel2 = select([table1.c.col1.label('foo'), table1.c.col2.label('bar')]) + sel2 = select([table1.c.col1.label("foo"), table1.c.col2.label("bar")]) sel3 = sel.union(sel2).alias() assert sel3.corresponding_column(l1) is sel3.c.foo @@ -105,9 +116,9 @@ class SelectableTest( def test_keyed_gen(self): s = select([keyed]) - eq_(s.c.colx.key, 'colx') + eq_(s.c.colx.key, "colx") - eq_(s.c.colx.name, 'x') + eq_(s.c.colx.name, "x") assert s.corresponding_column(keyed.c.colx) is s.c.colx assert s.corresponding_column(keyed.c.coly) is s.c.coly @@ -131,26 +142,26 @@ class SelectableTest( assert sel2.corresponding_column(keyed.c.z) is sel2.c.keyed_z def test_keyed_c_collection_upper(self): - c = Column('foo', Integer, key='bar') - t = Table('t', MetaData(), c) + c = Column("foo", Integer, key="bar") + t = Table("t", MetaData(), c) is_(t.c.bar, c) def test_keyed_c_collection_lower(self): - c = column('foo') - c.key = 'bar' - t = table('t', c) + c = column("foo") + c.key = "bar" + t = table("t", c) is_(t.c.bar, c) def test_clone_c_proxy_key_upper(self): - c = Column('foo', Integer, key='bar') - t = Table('t', MetaData(), c) + c = Column("foo", Integer, key="bar") + t = Table("t", MetaData(), c) s = select([t])._clone() assert c in s.c.bar.proxy_set def test_clone_c_proxy_key_lower(self): - c = column('foo') - c.key = 'bar' - t = table('t', c) + c = column("foo") + c.key = "bar" + t = table("t", c) s = select([t])._clone() assert c in s.c.bar.proxy_set @@ -160,19 +171,16 @@ class SelectableTest( def myop(x, y): pass - t = table('t', column('x'), column('y')) + t = table("t", column("x"), column("y")) expr = BinaryExpression(t.c.x, t.c.y, myop) s = select([t, expr]) - eq_( - s.c.keys(), - ['x', 'y', expr.anon_label] - ) + eq_(s.c.keys(), ["x", "y", expr.anon_label]) def test_cloned_intersection(self): - t1 = table('t1', column('x')) - t2 = table('t2', column('x')) + t1 = table("t1", column("x")) + t2 = table("t2", column("x")) s1 = t1.select() s2 = t2.select() @@ -184,15 +192,13 @@ class SelectableTest( s3c1 = s3._clone() eq_( - expression._cloned_intersection( - [s1c1, s3c1], [s2c1, s1c2] - ), - set([s1c1]) + expression._cloned_intersection([s1c1, s3c1], [s2c1, s1c2]), + set([s1c1]), ) def test_cloned_difference(self): - t1 = table('t1', column('x')) - t2 = table('t2', column('x')) + t1 = table("t1", column("x")) + t2 = table("t2", column("x")) s1 = t1.select() s2 = t2.select() @@ -205,75 +211,70 @@ class SelectableTest( s3c1 = s3._clone() eq_( - expression._cloned_difference( - [s1c1, s2c1, s3c1], [s2c1, s1c2] - ), - set([s3c1]) + expression._cloned_difference([s1c1, s2c1, s3c1], [s2c1, s1c2]), + set([s3c1]), ) def test_distance_on_aliases(self): - a1 = table1.alias('a1') - for s in (select([a1, table1], use_labels=True), - select([table1, a1], use_labels=True)): - assert s.corresponding_column(table1.c.col1) \ - is s.c.table1_col1 + a1 = table1.alias("a1") + for s in ( + select([a1, table1], use_labels=True), + select([table1, a1], use_labels=True), + ): + assert s.corresponding_column(table1.c.col1) is s.c.table1_col1 assert s.corresponding_column(a1.c.col1) is s.c.a1_col1 def test_join_against_self(self): - jj = select([table1.c.col1.label('bar_col1')]) + jj = select([table1.c.col1.label("bar_col1")]) jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) # test column directly against itself - assert jjj.corresponding_column(jjj.c.table1_col1) \ - is jjj.c.table1_col1 + assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 # test alias of the join - j2 = jjj.alias('foo') - assert j2.corresponding_column(table1.c.col1) \ - is j2.c.table1_col1 + j2 = jjj.alias("foo") + assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1 def test_clone_append_column(self): - sel = select([literal_column('1').label('a')]) - eq_(list(sel.c.keys()), ['a']) + sel = select([literal_column("1").label("a")]) + eq_(list(sel.c.keys()), ["a"]) cloned = visitors.ReplacingCloningVisitor().traverse(sel) - cloned.append_column(literal_column('2').label('b')) + cloned.append_column(literal_column("2").label("b")) cloned.append_column(func.foo()) - eq_(list(cloned.c.keys()), ['a', 'b', 'foo()']) + eq_(list(cloned.c.keys()), ["a", "b", "foo()"]) def test_append_column_after_replace_selectable(self): - basesel = select([literal_column('1').label('a')]) - tojoin = select([ - literal_column('1').label('a'), - literal_column('2').label('b') - ]) - basefrom = basesel.alias('basefrom') - joinfrom = tojoin.alias('joinfrom') + basesel = select([literal_column("1").label("a")]) + tojoin = select( + [literal_column("1").label("a"), literal_column("2").label("b")] + ) + basefrom = basesel.alias("basefrom") + joinfrom = tojoin.alias("joinfrom") sel = select([basefrom.c.a]) replaced = sel.replace_selectable( - basefrom, - basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a) + basefrom, basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a) ) self.assert_compile( replaced, "SELECT basefrom.a FROM (SELECT 1 AS a) AS basefrom " "JOIN (SELECT 1 AS a, 2 AS b) AS joinfrom " - "ON basefrom.a = joinfrom.a" + "ON basefrom.a = joinfrom.a", ) replaced.append_column(joinfrom.c.b) self.assert_compile( replaced, "SELECT basefrom.a, joinfrom.b FROM (SELECT 1 AS a) AS basefrom " "JOIN (SELECT 1 AS a, 2 AS b) AS joinfrom " - "ON basefrom.a = joinfrom.a" + "ON basefrom.a = joinfrom.a", ) def test_against_cloned_non_table(self): # test that corresponding column digs across # clone boundaries with anonymous labeled elements - col = func.count().label('foo') + col = func.count().label("foo") sel = select([col]) sel2 = visitors.ReplacingCloningVisitor().traverse(sel) @@ -287,14 +288,14 @@ class SelectableTest( self.assert_compile( s1.with_only_columns([s1]), "SELECT (SELECT table1.col1, table1.col2, " - "table1.col3, table1.colx FROM table1) AS anon_1" + "table1.col3, table1.colx FROM table1) AS anon_1", ) def test_type_coerce_preserve_subq(self): class MyType(TypeDecorator): impl = Integer - stmt = select([type_coerce(column('x'), MyType).label('foo')]) + stmt = select([type_coerce(column("x"), MyType).label("foo")]) stmt2 = stmt.select() assert isinstance(stmt._raw_columns[0].type, MyType) assert isinstance(stmt.c.foo.type, MyType) @@ -303,30 +304,32 @@ class SelectableTest( def test_select_on_table(self): sel = select([table1, table2], use_labels=True) - assert sel.corresponding_column(table1.c.col1) \ + assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1 + assert ( + sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1 - assert sel.corresponding_column( - table1.c.col1, - require_embedded=True) is sel.c.table1_col1 - assert table1.corresponding_column(sel.c.table1_col1) \ - is table1.c.col1 - assert table1.corresponding_column(sel.c.table1_col1, - require_embedded=True) is None + ) + assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1 + assert ( + table1.corresponding_column( + sel.c.table1_col1, require_embedded=True + ) + is None + ) def test_join_against_join(self): j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2) - jj = select([table1.c.col1.label('bar_col1')], - from_obj=[j]).alias('foo') + jj = select([table1.c.col1.label("bar_col1")], from_obj=[j]).alias( + "foo" + ) jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) - assert jjj.corresponding_column(jjj.c.table1_col1) \ - is jjj.c.table1_col1 - j2 = jjj.alias('foo') - assert j2.corresponding_column(jjj.c.table1_col1) \ - is j2.c.table1_col1 + assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + j2 = jjj.alias("foo") + assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1 def test_table_alias(self): - a = table1.alias('a') + a = table1.alias("a") j = join(a, table2) @@ -338,13 +341,13 @@ class SelectableTest( # prominent w/ PostgreSQL's tuple functions stmt = select([table1.c.col1, table1.c.col2]) - a = stmt.alias('a') + a = stmt.alias("a") self.assert_compile( select([func.foo(a)]), "SELECT foo(SELECT table1.col1, table1.col2 FROM table1) " "AS foo_1 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2 FROM table1) " - "AS a" + "AS a", ) def test_union(self): @@ -353,15 +356,25 @@ class SelectableTest( # with a certain Table, against a column in a Union where one of # its underlying Selects matches to that same Table - u = select([table1.c.col1, - table1.c.col2, - table1.c.col3, - table1.c.colx, - null().label('coly')]).union(select([table2.c.col1, - table2.c.col2, - table2.c.col3, - null().label('colx'), - table2.c.coly])) + u = select( + [ + table1.c.col1, + table1.c.col2, + table1.c.col3, + table1.c.colx, + null().label("coly"), + ] + ).union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -388,8 +401,10 @@ class SelectableTest( assert u1.corresponding_column(table1.c.col3) is u1.c.col1 def test_singular_union(self): - u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), - select([table1.c.col1, table1.c.col2, table1.c.col3])) + u = union( + select([table1.c.col1, table1.c.col2, table1.c.col3]), + select([table1.c.col1, table1.c.col2, table1.c.col3]), + ) u = union(select([table1.c.col1, table1.c.col2, table1.c.col3])) assert u.c.col1 is not None assert u.c.col2 is not None @@ -399,14 +414,29 @@ class SelectableTest( # same as testunion, except its an alias of the union - u = select([table1.c.col1, + u = ( + select( + [ + table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, - null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, - null().label('colx'), table2.c.coly]) - ).alias('analias') + null().label("coly"), + ] + ) + .union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) + .alias("analias") + ) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 @@ -429,7 +459,8 @@ class SelectableTest( def test_union_of_text(self): s1 = select([table1.c.col1, table1.c.col2]) s2 = text("select col1, col2 from foo").columns( - column('col1'), column('col2')) + column("col1"), column("col2") + ) u1 = union(s1, s2) assert u1.corresponding_column(s1.c.col1) is u1.c.col1 @@ -445,8 +476,10 @@ class SelectableTest( s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) u1 = union(s1, s2) - assert u1.corresponding_column( - s1.c._all_columns[0]) is u1.c._all_columns[0] + assert ( + u1.corresponding_column(s1.c._all_columns[0]) + is u1.c._all_columns[0] + ) assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -462,8 +495,10 @@ class SelectableTest( s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) u1 = union(s1, s2) - assert u1.corresponding_column( - s1.c._all_columns[0]) is u1.c._all_columns[0] + assert ( + u1.corresponding_column(s1.c._all_columns[0]) + is u1.c._all_columns[0] + ) assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -477,13 +512,18 @@ class SelectableTest( @testing.emits_warning("Column 'col1'") def test_union_alias_dupe_keys_grouped(self): - s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]).\ - limit(1).alias() + s1 = ( + select([table1.c.col1, table1.c.col2, table2.c.col1]) + .limit(1) + .alias() + ) s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]).limit(1) u1 = union(s1, s2) - assert u1.corresponding_column( - s1.c._all_columns[0]) is u1.c._all_columns[0] + assert ( + u1.corresponding_column(s1.c._all_columns[0]) + is u1.c._all_columns[0] + ) assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -499,14 +539,29 @@ class SelectableTest( # like testaliasunion, but off a Select off the union. - u = select([table1.c.col1, + u = ( + select( + [ + table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, - null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, - null().label('colx'), table2.c.coly]) - ).alias('analias') + null().label("coly"), + ] + ) + .union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) + .alias("analias") + ) s = select([u]) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -517,14 +572,29 @@ class SelectableTest( # same as testunion, except its an alias of the union - u = select([table1.c.col1, + u = ( + select( + [ + table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, - null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, - null().label('colx'), table2.c.coly]) - ).alias('analias') + null().label("coly"), + ] + ) + .union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) + .alias("analias") + ) j1 = table1.join(table2) assert u.corresponding_column(j1.c.table1_colx) is u.c.colx assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx @@ -532,14 +602,14 @@ class SelectableTest( def test_join(self): a = join(table1, table2) print(str(a.select(use_labels=True))) - b = table2.alias('b') + b = table2.alias("b") j = join(a, b) print(str(j)) criterion = a.c.table1_col1 == b.c.col2 self.assert_(criterion.compare(j.onclause)) def test_select_alias(self): - a = table1.select().alias('a') + a = table1.select().alias("a") j = join(a, table2) criterion = a.c.col1 == table2.c.col2 @@ -562,15 +632,19 @@ class SelectableTest( is_(expr2.left, sel2) def test_column_labels(self): - a = select([table1.c.col1.label('acol1'), - table1.c.col2.label('acol2'), - table1.c.col3.label('acol3')]) + a = select( + [ + table1.c.col1.label("acol1"), + table1.c.col2.label("acol2"), + table1.c.col3.label("acol3"), + ] + ) j = join(a, table2) criterion = a.c.acol1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) def test_labeled_select_correspoinding(self): - l1 = select([func.max(table1.c.col1)]).label('foo') + l1 = select([func.max(table1.c.col1)]).label("foo") s = select([l1]) eq_(s.corresponding_column(l1), s.c.foo) @@ -579,7 +653,7 @@ class SelectableTest( eq_(s.corresponding_column(l1), s.c.foo) def test_select_alias_labels(self): - a = table2.select(use_labels=True).alias('a') + a = table2.select(use_labels=True).alias("a") j = join(a, table1) criterion = table1.c.col1 == a.c.table2_col2 @@ -587,14 +661,13 @@ class SelectableTest( def test_table_joined_to_select_of_table(self): metadata = MetaData() - a = Table('a', metadata, - Column('id', Integer, primary_key=True)) + a = Table("a", metadata, Column("id", Integer, primary_key=True)) - j2 = select([a.c.id.label('aid')]).alias('bar') + j2 = select([a.c.id.label("aid")]).alias("bar") j3 = a.join(j2, j2.c.aid == a.c.id) - j4 = select([j3]).alias('foo') + j4 = select([j3]).alias("foo") assert j4.corresponding_column(j2.c.aid) is j4.c.aid assert j4.corresponding_column(a.c.id) is j4.c.id @@ -602,9 +675,9 @@ class SelectableTest( m = MetaData() m2 = MetaData() - t1 = Table('t1', m, Column('id', Integer), Column('id2', Integer)) - t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'))) - t3 = Table('t3', m2, Column('id', Integer, ForeignKey('t1.id2'))) + t1 = Table("t1", m, Column("id", Integer), Column("id2", Integer)) + t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id"))) + t3 = Table("t3", m2, Column("id", Integer, ForeignKey("t1.id2"))) s = select([t2, t3], use_labels=True) @@ -612,24 +685,24 @@ class SelectableTest( def test_multi_label_chain_naming_col(self): # See [ticket:2167] for this one. - l1 = table1.c.col1.label('a') - l2 = select([l1]).label('b') + l1 = table1.c.col1.label("a") + l2 = select([l1]).label("b") s = select([l2]) assert s.c.b is not None self.assert_compile( s.select(), - "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)" + "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)", ) - s2 = select([s.label('c')]) + s2 = select([s.label("c")]) self.assert_compile( s2.select(), "SELECT c FROM (SELECT (SELECT (" - "SELECT table1.col1 AS a FROM table1) AS b) AS c)" + "SELECT table1.col1 AS a FROM table1) AS b) AS c)", ) def test_self_referential_select_raises(self): - t = table('t', column('x')) + t = table("t", column("x")) s = select([t]) @@ -637,140 +710,110 @@ class SelectableTest( assert_raises_message( exc.InvalidRequestError, r"select\(\) construct refers to itself as a FROM", - s.compile + s.compile, ) def test_unusual_column_elements_text(self): """test that .c excludes text().""" s = select([table1.c.col1, text("foo")]) - eq_( - list(s.c), - [s.c.col1] - ) + eq_(list(s.c), [s.c.col1]) def test_unusual_column_elements_clauselist(self): """Test that raw ClauseList is expanded into .c.""" from sqlalchemy.sql.expression import ClauseList + s = select([table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)]) - eq_( - list(s.c), - [s.c.col1, s.c.col2, s.c.col3] - ) + eq_(list(s.c), [s.c.col1, s.c.col2, s.c.col3]) def test_unusual_column_elements_boolean_clauselist(self): """test that BooleanClauseList is placed as single element in .c.""" c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4) s = select([table1.c.col1, c2]) - eq_( - list(s.c), - [s.c.col1, s.corresponding_column(c2)] - ) + eq_(list(s.c), [s.c.col1, s.corresponding_column(c2)]) def test_from_list_deferred_constructor(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]) - t = Table('t', MetaData(), c1, c2) + t = Table("t", MetaData(), c1, c2) eq_(c1._from_objects, [t]) eq_(c2._from_objects, [t]) - self.assert_compile(select([c1]), - "SELECT t.c1 FROM t") - self.assert_compile(select([c2]), - "SELECT t.c2 FROM t") + self.assert_compile(select([c1]), "SELECT t.c1 FROM t") + self.assert_compile(select([c2]), "SELECT t.c2 FROM t") def test_from_list_deferred_whereclause(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]).where(c1 == 5) - t = Table('t', MetaData(), c1, c2) + t = Table("t", MetaData(), c1, c2) eq_(c1._from_objects, [t]) eq_(c2._from_objects, [t]) - self.assert_compile(select([c1]), - "SELECT t.c1 FROM t") - self.assert_compile(select([c2]), - "SELECT t.c2 FROM t") + self.assert_compile(select([c1]), "SELECT t.c1 FROM t") + self.assert_compile(select([c2]), "SELECT t.c2 FROM t") def test_from_list_deferred_fromlist(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer)) + t1 = Table("t1", m, Column("x", Integer)) - c1 = Column('c1', Integer) + c1 = Column("c1", Integer) s = select([c1]).where(c1 == 5).select_from(t1) - t2 = Table('t2', MetaData(), c1) + t2 = Table("t2", MetaData(), c1) eq_(c1._from_objects, [t2]) - self.assert_compile(select([c1]), - "SELECT t2.c1 FROM t2") + self.assert_compile(select([c1]), "SELECT t2.c1 FROM t2") def test_from_list_deferred_cloning(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]) s2 = select([c2]) s3 = sql_util.ClauseAdapter(s).traverse(s2) - Table('t', MetaData(), c1, c2) + Table("t", MetaData(), c1, c2) - self.assert_compile( - s3, - "SELECT t.c2 FROM t" - ) + self.assert_compile(s3, "SELECT t.c2 FROM t") def test_from_list_with_columns(self): - table1 = table('t1', column('a')) - table2 = table('t2', column('b')) + table1 = table("t1", column("a")) + table2 = table("t2", column("b")) s1 = select([table1.c.a, table2.c.b]) - self.assert_compile(s1, - "SELECT t1.a, t2.b FROM t1, t2" - ) + self.assert_compile(s1, "SELECT t1.a, t2.b FROM t1, t2") s2 = s1.with_only_columns([table2.c.b]) - self.assert_compile(s2, - "SELECT t2.b FROM t2" - ) + self.assert_compile(s2, "SELECT t2.b FROM t2") s3 = sql_util.ClauseAdapter(table1).traverse(s1) - self.assert_compile(s3, - "SELECT t1.a, t2.b FROM t1, t2" - ) + self.assert_compile(s3, "SELECT t1.a, t2.b FROM t1, t2") s4 = s3.with_only_columns([table2.c.b]) - self.assert_compile(s4, - "SELECT t2.b FROM t2" - ) + self.assert_compile(s4, "SELECT t2.b FROM t2") def test_from_list_warning_against_existing(self): - c1 = Column('c1', Integer) + c1 = Column("c1", Integer) s = select([c1]) # force a compile. - self.assert_compile( - s, - "SELECT c1" - ) + self.assert_compile(s, "SELECT c1") - Table('t', MetaData(), c1) + Table("t", MetaData(), c1) - self.assert_compile( - s, - "SELECT t.c1 FROM t" - ) + self.assert_compile(s, "SELECT t.c1 FROM t") def test_from_list_recovers_after_warning(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]) @@ -779,7 +822,8 @@ class SelectableTest( @testing.emits_warning() def go(): - return Table('t', MetaData(), c1, c2) + return Table("t", MetaData(), c1, c2) + t = go() eq_(c1._from_objects, [t]) @@ -793,175 +837,175 @@ class SelectableTest( self.assert_compile(select([c2]), "SELECT t.c2 FROM t") def test_label_gen_resets_on_table(self): - c1 = Column('c1', Integer) + c1 = Column("c1", Integer) eq_(c1._label, "c1") - Table('t1', MetaData(), c1) + Table("t1", MetaData(), c1) eq_(c1._label, "t1_c1") class RefreshForNewColTest(fixtures.TestBase): - def test_join_uninit(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) j = a.join(b, a.c.x == b.c.y) - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_join_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) j = a.join(b, a.c.x == b.c.y) j.c - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_join_samename_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) j = a.join(b, a.c.x == b.c.y) j.c - q = column('x') + q = column("x") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_x is q def test_select_samename_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) s = select([a, b]).apply_labels() s.c - q = column('x') + q = column("x") b.append_column(q) s._refresh_for_new_column(q) assert q in s.c.b_x.proxy_set def test_aliased_select_samename_uninit(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) s = select([a, b]).apply_labels().alias() - q = column('x') + q = column("x") b.append_column(q) s._refresh_for_new_column(q) assert q in s.c.b_x.proxy_set def test_aliased_select_samename_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) s = select([a, b]).apply_labels().alias() s.c - q = column('x') + q = column("x") b.append_column(q) s._refresh_for_new_column(q) assert q in s.c.b_x.proxy_set def test_aliased_select_irrelevant(self): - a = table('a', column('x')) - b = table('b', column('y')) - c = table('c', column('z')) + a = table("a", column("x")) + b = table("b", column("y")) + c = table("c", column("z")) s = select([a, b]).apply_labels().alias() s.c - q = column('x') + q = column("x") c.append_column(q) s._refresh_for_new_column(q) - assert 'c_x' not in s.c + assert "c_x" not in s.c def test_aliased_select_no_cols_clause(self): - a = table('a', column('x')) + a = table("a", column("x")) s = select([a.c.x]).apply_labels().alias() s.c - q = column('q') + q = column("q") a.append_column(q) s._refresh_for_new_column(q) - assert 'a_q' not in s.c + assert "a_q" not in s.c def test_union_uninit(self): - a = table('a', column('x')) + a = table("a", column("x")) s1 = select([a]) s2 = select([a]) s3 = s1.union(s2) - q = column('q') + q = column("q") a.append_column(q) s3._refresh_for_new_column(q) assert a.c.q in s3.c.q.proxy_set def test_union_init_raises(self): - a = table('a', column('x')) + a = table("a", column("x")) s1 = select([a]) s2 = select([a]) s3 = s1.union(s2) s3.c - q = column('q') + q = column("q") a.append_column(q) assert_raises_message( NotImplementedError, "CompoundSelect constructs don't support addition of " "columns to underlying selectables", - s3._refresh_for_new_column, q + s3._refresh_for_new_column, + q, ) def test_nested_join_uninit(self): - a = table('a', column('x')) - b = table('b', column('y')) - c = table('c', column('z')) + a = table("a", column("x")) + b = table("b", column("y")) + c = table("c", column("z")) j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z) - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_nested_join_init(self): - a = table('a', column('x')) - b = table('b', column('y')) - c = table('c', column('z')) + a = table("a", column("x")) + b = table("b", column("y")) + c = table("c", column("z")) j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z) j.c - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_fk_table(self): m = MetaData() - fk = ForeignKey('x.id') - Table('x', m, Column('id', Integer)) - a = Table('a', m, Column('x', Integer, fk)) + fk = ForeignKey("x.id") + Table("x", m, Column("id", Integer)) + a = Table("a", m, Column("x", Integer, fk)) a.c - q = Column('q', Integer) + q = Column("q", Integer) a.append_column(q) a._refresh_for_new_column(q) eq_(a.foreign_keys, set([fk])) - fk2 = ForeignKey('g.id') - p = Column('p', Integer, fk2) + fk2 = ForeignKey("g.id") + p = Column("p", Integer, fk2) a.append_column(p) a._refresh_for_new_column(p) eq_(a.foreign_keys, set([fk, fk2])) def test_fk_join(self): m = MetaData() - fk = ForeignKey('x.id') - Table('x', m, Column('id', Integer)) - a = Table('a', m, Column('x', Integer, fk)) - b = Table('b', m, Column('y', Integer)) + fk = ForeignKey("x.id") + Table("x", m, Column("id", Integer)) + a = Table("a", m, Column("x", Integer, fk)) + b = Table("b", m, Column("y", Integer)) j = a.join(b, a.c.x == b.c.y) j.c - q = Column('q', Integer) + q = Column("q", Integer) b.append_column(q) j._refresh_for_new_column(q) eq_(j.foreign_keys, set([fk])) - fk2 = ForeignKey('g.id') - p = Column('p', Integer, fk2) + fk2 = ForeignKey("g.id") + p = Column("p", Integer, fk2) b.append_column(p) j._refresh_for_new_column(p) eq_(j.foreign_keys, set([fk, fk2])) @@ -972,18 +1016,18 @@ class AnonLabelTest(fixtures.TestBase): """Test behaviors fixed by [ticket:2168].""" def test_anon_labels_named_column(self): - c1 = column('x') + c1 = column("x") assert c1.label(None) is not c1 eq_(str(select([c1.label(None)])), "SELECT x AS x_1") def test_anon_labels_literal_column(self): - c1 = literal_column('x') + c1 = literal_column("x") assert c1.label(None) is not c1 eq_(str(select([c1.label(None)])), "SELECT x AS x_1") def test_anon_labels_func(self): - c1 = func.count('*') + c1 = func.count("*") assert c1.label(None) is not c1 eq_(str(select([c1])), "SELECT count(:count_2) AS count_1") @@ -992,76 +1036,76 @@ class AnonLabelTest(fixtures.TestBase): eq_(str(select([c1.label(None)])), "SELECT count(:count_2) AS count_1") def test_named_labels_named_column(self): - c1 = column('x') - eq_(str(select([c1.label('y')])), "SELECT x AS y") + c1 = column("x") + eq_(str(select([c1.label("y")])), "SELECT x AS y") def test_named_labels_literal_column(self): - c1 = literal_column('x') - eq_(str(select([c1.label('y')])), "SELECT x AS y") + c1 = literal_column("x") + eq_(str(select([c1.label("y")])), "SELECT x AS y") class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def test_flat_ok_on_non_join(self): - a = table('a', column('a')) + a = table("a", column("a")) s = a.select() self.assert_compile( s.alias(flat=True).select(), - "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1" + "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1", ) def test_join_alias(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( a.join(b, a.c.a == b.c.b).alias(), - "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b" + "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b", ) def test_join_standalone_alias(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( alias(a.join(b, a.c.a == b.c.b)), - "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b" + "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b", ) def test_join_alias_flat(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( a.join(b, a.c.a == b.c.b).alias(flat=True), - "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b" + "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b", ) def test_join_standalone_alias_flat(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( alias(a.join(b, a.c.a == b.c.b), flat=True), - "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b" + "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b", ) def test_composed_join_alias_flat(self): - a = table('a', column('a')) - b = table('b', column('b')) - c = table('c', column('c')) - d = table('d', column('d')) + a = table("a", column("a")) + b = table("b", column("b")) + c = table("c", column("c")) + d = table("d", column("d")) j1 = a.join(b, a.c.a == b.c.b) j2 = c.join(d, c.c.c == d.c.d) self.assert_compile( j1.join(j2, b.c.b == c.c.c).alias(flat=True), "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b JOIN " - "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c" + "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c", ) def test_composed_join_alias(self): - a = table('a', column('a')) - b = table('b', column('b')) - c = table('c', column('c')) - d = table('d', column('d')) + a = table("a", column("a")) + b = table("b", column("b")) + c = table("c", column("c")) + d = table("d", column("d")) j1 = a.join(b, a.c.a == b.c.b) j2 = c.join(d, c.c.c == d.c.d) @@ -1070,29 +1114,35 @@ class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT anon_1.a_a, anon_1.b_b, anon_1.c_c, anon_1.d_d " "FROM (SELECT a.a AS a_a, b.b AS b_b, c.c AS c_c, d.d AS d_d " "FROM a JOIN b ON a.a = b.b " - "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1" + "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1", ) class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def test_join_condition(self): m = MetaData() - t1 = Table('t1', m, Column('id', Integer)) - t2 = Table('t2', m, - Column('id', Integer), - Column('t1id', ForeignKey('t1.id'))) - t3 = Table('t3', m, - Column('id', Integer), - Column('t1id', ForeignKey('t1.id')), - Column('t2id', ForeignKey('t2.id'))) - t4 = Table('t4', m, Column('id', Integer), - Column('t2id', ForeignKey('t2.id'))) - t5 = Table('t5', m, - Column('t1id1', ForeignKey('t1.id')), - Column('t1id2', ForeignKey('t1.id')), - ) + t1 = Table("t1", m, Column("id", Integer)) + t2 = Table( + "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id")) + ) + t3 = Table( + "t3", + m, + Column("id", Integer), + Column("t1id", ForeignKey("t1.id")), + Column("t2id", ForeignKey("t2.id")), + ) + t4 = Table( + "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id")) + ) + t5 = Table( + "t5", + m, + Column("t1id1", ForeignKey("t1.id")), + Column("t1id2", ForeignKey("t1.id")), + ) t1t2 = t1.join(t2) t2t3 = t2.join(t3) @@ -1108,10 +1158,8 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id), ]: assert expected.compare( - sql_util.join_condition( - left, - right, - a_subset=a_subset)) + sql_util.join_condition(left, right, a_subset=a_subset) + ) # these are ambiguous, or have no joins for left, right, a_subset in [ @@ -1120,12 +1168,14 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t1, t4, None), (t1t2, t2t3, None), (t5, t1, None), - (t5.select(use_labels=True), t1, None) + (t5.select(use_labels=True), t1, None), ]: assert_raises( exc.ArgumentError, sql_util.join_condition, - left, right, a_subset=a_subset + left, + right, + a_subset=a_subset, ) als = t2t3.alias() @@ -1138,41 +1188,38 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t2t3, t4, t2t3.c.t2_id == t4.c.t2id), (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id), (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id), - (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id) + (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id), ]: - assert expected.compare( - left.join(right).onclause - ) + assert expected.compare(left.join(right).onclause) # these are right-nested joins j = t1t2.join(t2t3) assert j.onclause.compare(t2.c.id == t3.c.t2id) self.assert_compile( - j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN " - "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id") + j, + "t1 JOIN t2 ON t1.id = t2.t1id JOIN " + "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id", + ) st2t3 = t2t3.select(use_labels=True) j = t1t2.join(st2t3) assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id) self.assert_compile( - j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN " + j, + "t1 JOIN t2 ON t1.id = t2.t1id JOIN " "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, " "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id " - "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id") + "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id", + ) def test_join_multiple_equiv_fks(self): m = MetaData() - t1 = Table('t1', m, - Column('id', Integer, primary_key=True) - ) + t1 = Table("t1", m, Column("id", Integer, primary_key=True)) t2 = Table( - 't2', + "t2", m, - Column( - 't1id', - Integer, - ForeignKey('t1.id'), - ForeignKey('t1.id'))) + Column("t1id", Integer, ForeignKey("t1.id"), ForeignKey("t1.id")), + ) assert sql_util.join_condition(t1, t2).compare(t1.c.id == t2.c.t1id) @@ -1181,35 +1228,43 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): # bounding the "good" column with two "bad" ones is so to # try to get coverage to get the "continue" statements # in the loop... - t1 = Table('t1', m, - Column('y', Integer, ForeignKey('t22.id')), - Column('x', Integer, ForeignKey('t2.id')), - Column('q', Integer, ForeignKey('t22.id')), - ) - t2 = Table('t2', m, Column('id', Integer)) + t1 = Table( + "t1", + m, + Column("y", Integer, ForeignKey("t22.id")), + Column("x", Integer, ForeignKey("t2.id")), + Column("q", Integer, ForeignKey("t22.id")), + ) + t2 = Table("t2", m, Column("id", Integer)) assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) def test_join_cond_no_such_unrelated_column(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')), - Column('y', Integer, ForeignKey('t3.q'))) - t2 = Table('t2', m, Column('id', Integer)) - Table('t3', m, Column('id', Integer)) + t1 = Table( + "t1", + m, + Column("x", Integer, ForeignKey("t2.id")), + Column("y", Integer, ForeignKey("t3.q")), + ) + t2 = Table("t2", m, Column("id", Integer)) + Table("t3", m, Column("id", Integer)) assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) def test_join_cond_no_such_related_table(self): m1 = MetaData() m2 = MetaData() - t1 = Table('t1', m1, Column('x', Integer, ForeignKey('t2.id'))) - t2 = Table('t2', m2, Column('id', Integer)) + t1 = Table("t1", m1, Column("x", Integer, ForeignKey("t2.id"))) + t2 = Table("t2", m2, Column("id", Integer)) assert_raises_message( exc.NoReferencedTableError, "Foreign key associated with column 't1.x' could not find " "table 't2' with which to generate a foreign key to " "target column 'id'", - sql_util.join_condition, t1, t2 + sql_util.join_condition, + t1, + t2, ) assert_raises_message( @@ -1217,19 +1272,23 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): "Foreign key associated with column 't1.x' could not find " "table 't2' with which to generate a foreign key to " "target column 'id'", - sql_util.join_condition, t2, t1 + sql_util.join_condition, + t2, + t1, ) def test_join_cond_no_such_related_column(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.q'))) - t2 = Table('t2', m, Column('id', Integer)) + t1 = Table("t1", m, Column("x", Integer, ForeignKey("t2.q"))) + t2 = Table("t2", m, Column("id", Integer)) assert_raises_message( exc.NoReferencedColumnError, "Could not initialize target column for " "ForeignKey 't2.q' on table 't1': " "table 't2' has no column named 'q'", - sql_util.join_condition, t1, t2 + sql_util.join_condition, + t1, + t2, ) assert_raises_message( @@ -1237,25 +1296,35 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): "Could not initialize target column for " "ForeignKey 't2.q' on table 't1': " "table 't2' has no column named 'q'", - sql_util.join_condition, t2, t1 + sql_util.join_condition, + t2, + t1, ) class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): - def test_join_pk_collapse_implicit(self): """test that redundant columns in a join get 'collapsed' into a minimal primary key, which is the root column along a chain of foreign key relationships.""" meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), - primary_key=True)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), - primary_key=True)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), - primary_key=True)) + a = Table("a", meta, Column("id", Integer, primary_key=True)) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + ) + c = Table( + "c", + meta, + Column("id", Integer, ForeignKey("b.id"), primary_key=True), + ) + d = Table( + "d", + meta, + Column("id", Integer, ForeignKey("c.id"), primary_key=True), + ) assert c.c.id.references(b.c.id) assert not d.c.id.references(a.c.id) assert list(a.join(b).primary_key) == [a.c.id] @@ -1271,43 +1340,60 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): explicit join conditions.""" meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), - Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), - primary_key=True), Column('x', Integer)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), - primary_key=True), Column('x', Integer)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), - primary_key=True), Column('x', Integer)) + a = Table( + "a", + meta, + Column("id", Integer, primary_key=True), + Column("x", Integer), + ) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + Column("x", Integer), + ) + c = Table( + "c", + meta, + Column("id", Integer, ForeignKey("b.id"), primary_key=True), + Column("x", Integer), + ) + d = Table( + "d", + meta, + Column("id", Integer, ForeignKey("c.id"), primary_key=True), + Column("x", Integer), + ) print(list(a.join(b, a.c.x == b.c.id).primary_key)) assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id] assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) \ - == [a.c.id] - assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) \ - == [b.c.id] - assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \ - == [b.c.id] + assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) == [a.c.id] + assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) == [b.c.id] + assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) == [b.c.id] + assert list( + d.join(b, d.c.id == b.c.id).join(c, b.c.id == c.c.x).primary_key + ) == [b.c.id] assert list( - d.join( - b, - d.c.id == b.c.id).join( - c, - b.c.id == c.c.x).primary_key) == [ - b.c.id] - assert list(a.join(b).join(c, c.c.id - == b.c.x).join(d).primary_key) == [a.c.id] - assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x - == b.c.id)).primary_key) == [a.c.id] + a.join(b).join(c, c.c.id == b.c.x).join(d).primary_key + ) == [a.c.id] + assert list( + a.join(b, and_(a.c.id == b.c.id, a.c.x == b.c.id)).primary_key + ) == [a.c.id] def test_init_doesnt_blowitaway(self): meta = MetaData() - a = Table('a', meta, - Column('id', Integer, primary_key=True), - Column('x', Integer)) - b = Table('b', meta, - Column('id', Integer, ForeignKey('a.id'), primary_key=True), - Column('x', Integer)) + a = Table( + "a", + meta, + Column("id", Integer, primary_key=True), + Column("x", Integer), + ) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + Column("x", Integer), + ) j = a.join(b) assert list(j.primary_key) == [a.c.id] @@ -1317,12 +1403,18 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): def test_non_column_clause(self): meta = MetaData() - a = Table('a', meta, - Column('id', Integer, primary_key=True), - Column('x', Integer)) - b = Table('b', meta, - Column('id', Integer, ForeignKey('a.id'), primary_key=True), - Column('x', Integer, primary_key=True)) + a = Table( + "a", + meta, + Column("id", Integer, primary_key=True), + Column("x", Integer), + ) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + Column("x", Integer, primary_key=True), + ) j = a.join(b, and_(a.c.id == b.c.id, b.c.x == 5)) assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j) @@ -1331,123 +1423,156 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): def test_onclause_direction(self): metadata = MetaData() - employee = Table('Employee', metadata, - Column('name', String(100)), - Column('id', Integer, primary_key=True), - ) + employee = Table( + "Employee", + metadata, + Column("name", String(100)), + Column("id", Integer, primary_key=True), + ) - engineer = Table('Engineer', metadata, - Column('id', Integer, - ForeignKey('Employee.id'), primary_key=True)) + engineer = Table( + "Engineer", + metadata, + Column("id", Integer, ForeignKey("Employee.id"), primary_key=True), + ) - eq_(util.column_set(employee.join(engineer, employee.c.id - == engineer.c.id).primary_key), - util.column_set([employee.c.id])) - eq_(util.column_set(employee.join(engineer, engineer.c.id - == employee.c.id).primary_key), - util.column_set([employee.c.id])) + eq_( + util.column_set( + employee.join( + engineer, employee.c.id == engineer.c.id + ).primary_key + ), + util.column_set([employee.c.id]), + ) + eq_( + util.column_set( + employee.join( + engineer, engineer.c.id == employee.c.id + ).primary_key + ), + util.column_set([employee.c.id]), + ) class ReduceTest(fixtures.TestBase, AssertsExecutionResults): - def test_reduce(self): meta = MetaData() - t1 = Table('t1', meta, - Column('t1id', Integer, primary_key=True), - Column('t1data', String(30))) + t1 = Table( + "t1", + meta, + Column("t1id", Integer, primary_key=True), + Column("t1data", String(30)), + ) t2 = Table( - 't2', + "t2", meta, - Column( - 't2id', - Integer, - ForeignKey('t1.t1id'), - primary_key=True), - Column( - 't2data', - String(30))) + Column("t2id", Integer, ForeignKey("t1.t1id"), primary_key=True), + Column("t2data", String(30)), + ) t3 = Table( - 't3', + "t3", meta, - Column( - 't3id', - Integer, - ForeignKey('t2.t2id'), - primary_key=True), - Column( - 't3data', - String(30))) - - eq_(util.column_set(sql_util.reduce_columns([ - t1.c.t1id, - t1.c.t1data, - t2.c.t2id, - t2.c.t2data, - t3.c.t3id, - t3.c.t3data, - ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, - t3.c.t3data])) + Column("t3id", Integer, ForeignKey("t2.t2id"), primary_key=True), + Column("t3data", String(30)), + ) + + eq_( + util.column_set( + sql_util.reduce_columns( + [ + t1.c.t1id, + t1.c.t1data, + t2.c.t2id, + t2.c.t2data, + t3.c.t3id, + t3.c.t3data, + ] + ) + ), + util.column_set( + [t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data] + ), + ) def test_reduce_selectable(self): metadata = MetaData() - engineers = Table('engineers', metadata, - Column('engineer_id', Integer, primary_key=True), - Column('engineer_name', String(50))) - managers = Table('managers', metadata, - Column('manager_id', Integer, primary_key=True), - Column('manager_name', String(50))) - s = select([engineers, - managers]).where(engineers.c.engineer_name - == managers.c.manager_name) - eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), - util.column_set([s.c.engineer_id, s.c.engineer_name, - s.c.manager_id])) + engineers = Table( + "engineers", + metadata, + Column("engineer_id", Integer, primary_key=True), + Column("engineer_name", String(50)), + ) + managers = Table( + "managers", + metadata, + Column("manager_id", Integer, primary_key=True), + Column("manager_name", String(50)), + ) + s = select([engineers, managers]).where( + engineers.c.engineer_name == managers.c.manager_name + ) + eq_( + util.column_set(sql_util.reduce_columns(list(s.c), s)), + util.column_set( + [s.c.engineer_id, s.c.engineer_name, s.c.manager_id] + ), + ) def test_reduce_generation(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer)) - t2 = Table('t2', m, Column('z', Integer, ForeignKey('t1.x')), - Column('q', Integer)) + t1 = Table( + "t1", + m, + Column("x", Integer, primary_key=True), + Column("y", Integer), + ) + t2 = Table( + "t2", + m, + Column("z", Integer, ForeignKey("t1.x")), + Column("q", Integer), + ) s1 = select([t1, t2]) s2 = s1.reduce_columns(only_synonyms=False) - eq_( - set(s2.inner_columns), - set([t1.c.x, t1.c.y, t2.c.q]) - ) + eq_(set(s2.inner_columns), set([t1.c.x, t1.c.y, t2.c.q])) s2 = s1.reduce_columns() - eq_( - set(s2.inner_columns), - set([t1.c.x, t1.c.y, t2.c.z, t2.c.q]) - ) + eq_(set(s2.inner_columns), set([t1.c.x, t1.c.y, t2.c.z, t2.c.q])) def test_reduce_only_synonym_fk(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer)) - t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), - Column('q', Integer, ForeignKey('t1.y'))) + t1 = Table( + "t1", + m, + Column("x", Integer, primary_key=True), + Column("y", Integer), + ) + t2 = Table( + "t2", + m, + Column("x", Integer, ForeignKey("t1.x")), + Column("q", Integer, ForeignKey("t1.y")), + ) s1 = select([t1, t2]) s1 = s1.reduce_columns(only_synonyms=True) - eq_( - set(s1.c), - set([s1.c.x, s1.c.y, s1.c.q]) - ) + eq_(set(s1.c), set([s1.c.x, s1.c.y, s1.c.q])) def test_reduce_only_synonym_lineage(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer), - Column('z', Integer) - ) + t1 = Table( + "t1", + m, + Column("x", Integer, primary_key=True), + Column("y", Integer), + Column("z", Integer), + ) # test that the first appearance in the columns clause # wins - t1 is first, t1.c.x wins s1 = select([t1]) s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( set(s2.reduce_columns().inner_columns), - set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) + set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]), ) # reverse order, s1.c.x wins @@ -1455,91 +1580,129 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( set(s2.reduce_columns().inner_columns), - set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) + set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]), ) def test_reduce_aliased_join(self): metadata = MetaData() people = Table( - 'people', metadata, Column( - 'person_id', Integer, Sequence( - 'person_id_seq', optional=True), primary_key=True), Column( - 'name', String(50)), Column( - 'type', String(30))) + "people", + metadata, + Column( + "person_id", + Integer, + Sequence("person_id_seq", optional=True), + primary_key=True, + ), + Column("name", String(50)), + Column("type", String(30)), + ) engineers = Table( - 'engineers', + "engineers", metadata, - Column('person_id', Integer, ForeignKey('people.person_id' - ), primary_key=True), - Column('status', String(30)), - Column('engineer_name', String(50)), - Column('primary_language', String(50)), + Column( + "person_id", + Integer, + ForeignKey("people.person_id"), + primary_key=True, + ), + Column("status", String(30)), + Column("engineer_name", String(50)), + Column("primary_language", String(50)), ) managers = Table( - 'managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), - primary_key=True), - Column('status', String(30)), - Column('manager_name', String(50))) - pjoin = \ - people.outerjoin(engineers).outerjoin(managers).\ - select(use_labels=True).alias('pjoin' - ) - eq_(util.column_set(sql_util.reduce_columns( - [pjoin.c.people_person_id, pjoin.c.engineers_person_id, - pjoin.c.managers_person_id])), - util.column_set([pjoin.c.people_person_id])) + "managers", + metadata, + Column( + "person_id", + Integer, + ForeignKey("people.person_id"), + primary_key=True, + ), + Column("status", String(30)), + Column("manager_name", String(50)), + ) + pjoin = ( + people.outerjoin(engineers) + .outerjoin(managers) + .select(use_labels=True) + .alias("pjoin") + ) + eq_( + util.column_set( + sql_util.reduce_columns( + [ + pjoin.c.people_person_id, + pjoin.c.engineers_person_id, + pjoin.c.managers_person_id, + ] + ) + ), + util.column_set([pjoin.c.people_person_id]), + ) def test_reduce_aliased_union(self): metadata = MetaData() item_table = Table( - 'item', + "item", metadata, Column( - 'id', - Integer, - ForeignKey('base_item.id'), - primary_key=True), - Column( - 'dummy', - Integer, - default=0)) + "id", Integer, ForeignKey("base_item.id"), primary_key=True + ), + Column("dummy", Integer, default=0), + ) base_item_table = Table( - 'base_item', metadata, Column( - 'id', Integer, primary_key=True), Column( - 'child_name', String(255), default=None)) + "base_item", + metadata, + Column("id", Integer, primary_key=True), + Column("child_name", String(255), default=None), + ) from sqlalchemy.orm.util import polymorphic_union - item_join = polymorphic_union({ - 'BaseItem': - base_item_table.select( - base_item_table.c.child_name - == 'BaseItem'), - 'Item': base_item_table.join(item_table)}, - None, 'item_join') - eq_(util.column_set(sql_util.reduce_columns([item_join.c.id, - item_join.c.dummy, - item_join.c.child_name])), - util.column_set([item_join.c.id, - item_join.c.dummy, - item_join.c.child_name])) + + item_join = polymorphic_union( + { + "BaseItem": base_item_table.select( + base_item_table.c.child_name == "BaseItem" + ), + "Item": base_item_table.join(item_table), + }, + None, + "item_join", + ) + eq_( + util.column_set( + sql_util.reduce_columns( + [item_join.c.id, item_join.c.dummy, item_join.c.child_name] + ) + ), + util.column_set( + [item_join.c.id, item_join.c.dummy, item_join.c.child_name] + ), + ) def test_reduce_aliased_union_2(self): metadata = MetaData() - page_table = Table('page', metadata, Column('id', Integer, - primary_key=True)) - magazine_page_table = Table('magazine_page', metadata, - Column('page_id', Integer, - ForeignKey('page.id'), - primary_key=True)) + page_table = Table( + "page", metadata, Column("id", Integer, primary_key=True) + ) + magazine_page_table = Table( + "magazine_page", + metadata, + Column( + "page_id", Integer, ForeignKey("page.id"), primary_key=True + ), + ) classified_page_table = Table( - 'classified_page', + "classified_page", metadata, Column( - 'magazine_page_id', + "magazine_page_id", Integer, - ForeignKey('magazine_page.page_id'), - primary_key=True)) + ForeignKey("magazine_page.page_id"), + primary_key=True, + ), + ) # this is essentially the union formed by the ORM's # polymorphic_union function. we define two versions with @@ -1549,25 +1712,33 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): # classified_page.magazine_page_id pjoin = union( - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]). - select_from( - page_table.join(magazine_page_table). - join(classified_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]). - select_from(page_table.join(magazine_page_table)) - ).alias('pjoin') - eq_(util.column_set(sql_util.reduce_columns( - [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id])) + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id, + ] + ).select_from( + page_table.join(magazine_page_table).join( + classified_page_table + ) + ), + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label("magazine_page_id"), + ] + ).select_from(page_table.join(magazine_page_table)), + ).alias("pjoin") + eq_( + util.column_set( + sql_util.reduce_columns( + [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id] + ) + ), + util.column_set([pjoin.c.id]), + ) # the first selectable has a CAST, which is a placeholder for # classified_page.magazine_page_id in the second selectable. @@ -1576,45 +1747,70 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): # currently makes the external column look like that of the # first selectable only. - pjoin = union(select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]). - select_from(page_table.join(magazine_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]). - select_from(page_table.join(magazine_page_table). - join(classified_page_table)) - ).alias('pjoin') - eq_(util.column_set(sql_util.reduce_columns( - [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id])) + pjoin = union( + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label("magazine_page_id"), + ] + ).select_from(page_table.join(magazine_page_table)), + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id, + ] + ).select_from( + page_table.join(magazine_page_table).join( + classified_page_table + ) + ), + ).alias("pjoin") + eq_( + util.column_set( + sql_util.reduce_columns( + [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id] + ) + ), + util.column_set([pjoin.c.id]), + ) class DerivedTest(fixtures.TestBase, AssertsExecutionResults): - def test_table(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) + t1 = Table( + "t1", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) + t2 = Table( + "t2", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) assert t1.is_derived_from(t1) assert not t2.is_derived_from(t1) def test_alias(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) + t1 = Table( + "t1", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) + t2 = Table( + "t2", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) assert t1.alias().is_derived_from(t1) assert not t2.alias().is_derived_from(t1) @@ -1624,44 +1820,43 @@ class DerivedTest(fixtures.TestBase, AssertsExecutionResults): def test_select(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) + t1 = Table( + "t1", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) + t2 = Table( + "t2", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) assert t1.select().is_derived_from(t1) assert not t2.select().is_derived_from(t1) assert select([t1, t2]).is_derived_from(t1) - assert t1.select().alias('foo').is_derived_from(t1) - assert select([t1, t2]).alias('foo').is_derived_from(t1) - assert not t2.select().alias('foo').is_derived_from(t1) + assert t1.select().alias("foo").is_derived_from(t1) + assert select([t1, t2]).alias("foo").is_derived_from(t1) + assert not t2.select().alias("foo").is_derived_from(t1) class AnnotationsTest(fixtures.TestBase): - def test_hashing(self): - t = table('t', column('x')) + t = table("t", column("x")) a = t.alias() s = t.select() s2 = a.select() - for obj in [ - t, - t.c.x, - a, - s, - s2, - t.c.x > 1, - (t.c.x > 1).label(None) - ]: + for obj in [t, t.c.x, a, s, s2, t.c.x > 1, (t.c.x > 1).label(None)]: annot = obj._annotate({}) eq_(set([obj]), set([annot])) def test_compare(self): - t = table('t', column('x'), column('y')) + t = table("t", column("x"), column("y")) x_a = t.c.x._annotate({}) assert t.c.x.compare(x_a) assert x_a.compare(t.c.x) @@ -1681,40 +1876,44 @@ class AnnotationsTest(fixtures.TestBase): def test_late_name_add(self): from sqlalchemy.schema import Column + c1 = Column(Integer) c1_a = c1._annotate({"foo": "bar"}) - c1.name = 'somename' - eq_(c1_a.name, 'somename') + c1.name = "somename" + eq_(c1_a.name, "somename") def test_late_table_add(self): c1 = Column("foo", Integer) c1_a = c1._annotate({"foo": "bar"}) - t = Table('t', MetaData(), c1) + t = Table("t", MetaData(), c1) is_(c1_a.table, t) def test_basic_attrs(self): - t = Table('t', MetaData(), - Column('x', Integer, info={'q': 'p'}), - Column('y', Integer, key='q')) + t = Table( + "t", + MetaData(), + Column("x", Integer, info={"q": "p"}), + Column("y", Integer, key="q"), + ) x_a = t.c.x._annotate({}) y_a = t.c.q._annotate({}) - t.c.x.info['z'] = 'h' + t.c.x.info["z"] = "h" - eq_(y_a.key, 'q') + eq_(y_a.key, "q") is_(x_a.table, t) - eq_(x_a.info, {'q': 'p', 'z': 'h'}) + eq_(x_a.info, {"q": "p", "z": "h"}) eq_(t.c.x.anon_label, x_a.anon_label) def test_custom_constructions(self): from sqlalchemy.schema import Column class MyColumn(Column): - def __init__(self): - Column.__init__(self, 'foo', Integer) + Column.__init__(self, "foo", Integer) + _constructor = Column - t1 = Table('t1', MetaData(), MyColumn()) + t1 = Table("t1", MetaData(), MyColumn()) s1 = t1.select() assert isinstance(t1.c.foo, MyColumn) assert isinstance(s1.c.foo, Column) @@ -1734,8 +1933,9 @@ class AnnotationsTest(fixtures.TestBase): pass assert isinstance( - MyColumn('x', Integer)._annotate({"foo": "bar"}), - AnnotatedColumnElement) + MyColumn("x", Integer)._annotate({"foo": "bar"}), + AnnotatedColumnElement, + ) def test_custom_construction_correct_anno_expr(self): # [ticket:2918] @@ -1744,14 +1944,14 @@ class AnnotationsTest(fixtures.TestBase): class MyColumn(Column): pass - col = MyColumn('x', Integer) + col = MyColumn("x", Integer) binary_1 = col == 5 - col_anno = MyColumn('x', Integer)._annotate({"foo": "bar"}) + col_anno = MyColumn("x", Integer)._annotate({"foo": "bar"}) binary_2 = col_anno == 5 eq_(binary_2.left._annotations, {"foo": "bar"}) def test_annotated_corresponding_column(self): - table1 = table('table1', column("col1")) + table1 = table("table1", column("col1")) s1 = select([table1.c.col1]) t1 = s1._annotate({}) @@ -1766,93 +1966,107 @@ class AnnotationsTest(fixtures.TestBase): inner = select([s1]) - assert inner.corresponding_column( - t2.c.col1, - require_embedded=False) is inner.corresponding_column( - t2.c.col1, - require_embedded=True) is inner.c.col1 - assert inner.corresponding_column( - t1.c.col1, - require_embedded=False) is inner.corresponding_column( - t1.c.col1, - require_embedded=True) is inner.c.col1 + assert ( + inner.corresponding_column(t2.c.col1, require_embedded=False) + is inner.corresponding_column(t2.c.col1, require_embedded=True) + is inner.c.col1 + ) + assert ( + inner.corresponding_column(t1.c.col1, require_embedded=False) + is inner.corresponding_column(t1.c.col1, require_embedded=True) + is inner.c.col1 + ) def test_annotated_visit(self): - table1 = table('table1', column("col1"), column("col2")) + table1 = table("table1", column("col1"), column("col2")) - bin = table1.c.col1 == bindparam('foo', value=None) + bin = table1.c.col1 == bindparam("foo", value=None) assert str(bin) == "table1.col1 = :foo" def visit_binary(b): b.right = table1.c.col2 - b2 = visitors.cloned_traverse(bin, {}, {'binary': visit_binary}) + b2 = visitors.cloned_traverse(bin, {}, {"binary": visit_binary}) assert str(b2) == "table1.col1 = table1.col2" - b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary': - visit_binary}) - assert str(b3) == 'table1.col1 = table1.col2' + b3 = visitors.cloned_traverse( + bin._annotate({}), {}, {"binary": visit_binary} + ) + assert str(b3) == "table1.col1 = table1.col2" def visit_binary(b): - b.left = bindparam('bar') + b.left = bindparam("bar") - b4 = visitors.cloned_traverse(b2, {}, {'binary': visit_binary}) + b4 = visitors.cloned_traverse(b2, {}, {"binary": visit_binary}) assert str(b4) == ":bar = table1.col2" - b5 = visitors.cloned_traverse(b3, {}, {'binary': visit_binary}) + b5 = visitors.cloned_traverse(b3, {}, {"binary": visit_binary}) assert str(b5) == ":bar = table1.col2" def test_label_accessors(self): - t1 = table('t1', column('c1')) + t1 = table("t1", column("c1")) l1 = t1.c.c1.label(None) is_(l1._order_by_label_element, l1) l1a = l1._annotate({"foo": "bar"}) is_(l1a._order_by_label_element, l1a) def test_annotate_aliased(self): - t1 = table('t1', column('c1')) - s = select([(t1.c.c1 + 3).label('bat')]) + t1 = table("t1", column("c1")) + s = select([(t1.c.c1 + 3).label("bat")]) a = s.alias() - a = sql_util._deep_annotate(a, {'foo': 'bar'}) - eq_(a._annotations['foo'], 'bar') - eq_(a.element._annotations['foo'], 'bar') + a = sql_util._deep_annotate(a, {"foo": "bar"}) + eq_(a._annotations["foo"], "bar") + eq_(a.element._annotations["foo"], "bar") def test_annotate_expressions(self): - table1 = table('table1', column('col1'), column('col2')) - for expr, expected in [(table1.c.col1, 'table1.col1'), - (table1.c.col1 == 5, - 'table1.col1 = :col1_1'), - (table1.c.col1.in_([2, 3, 4]), - 'table1.col1 IN (:col1_1, :col1_2, ' - ':col1_3)')]: + table1 = table("table1", column("col1"), column("col2")) + for expr, expected in [ + (table1.c.col1, "table1.col1"), + (table1.c.col1 == 5, "table1.col1 = :col1_1"), + ( + table1.c.col1.in_([2, 3, 4]), + "table1.col1 IN (:col1_1, :col1_2, " ":col1_3)", + ), + ]: eq_(str(expr), expected) eq_(str(expr._annotate({})), expected) eq_(str(sql_util._deep_annotate(expr, {})), expected) - eq_(str(sql_util._deep_annotate( - expr, {}, exclude=[table1.c.col1])), expected) + eq_( + str( + sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1]) + ), + expected, + ) def test_deannotate(self): - table1 = table('table1', column("col1"), column("col2")) + table1 = table("table1", column("col1"), column("col2")) - bin = table1.c.col1 == bindparam('foo', value=None) + bin = table1.c.col1 == bindparam("foo", value=None) - b2 = sql_util._deep_annotate(bin, {'_orm_adapt': True}) + b2 = sql_util._deep_annotate(bin, {"_orm_adapt": True}) b3 = sql_util._deep_deannotate(b2) b4 = sql_util._deep_deannotate(bin) for elem in (b2._annotations, b2.left._annotations): - assert '_orm_adapt' in elem + assert "_orm_adapt" in elem - for elem in b3._annotations, b3.left._annotations, \ - b4._annotations, b4.left._annotations: + for elem in ( + b3._annotations, + b3.left._annotations, + b4._annotations, + b4.left._annotations, + ): assert elem == {} assert b2.left is not bin.left assert b3.left is not b2.left and b2.left is not bin.left assert b4.left is bin.left # since column is immutable # deannotate copies the element - assert bin.right is not b2.right and b2.right is not b3.right \ + assert ( + bin.right is not b2.right + and b2.right is not b3.right and b3.right is not b4.right + ) def test_annotate_unique_traversal(self): """test that items are copied only once during @@ -1864,16 +2078,14 @@ class AnnotationsTest(fixtures.TestBase): case now, as deannotate is making clones again in some cases. """ - table1 = table('table1', column('x')) - table2 = table('table2', column('y')) + table1 = table("table1", column("x")) + table2 = table("table2", column("y")) a1 = table1.alias() - s = select([a1.c.x]).select_from( - a1.join(table2, a1.c.x == table2.c.y) - ) + s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y)) for sel in ( sql_util._deep_deannotate(s), visitors.cloned_traverse(s, {}, {}), - visitors.replacement_traverse(s, {}, lambda x: None) + visitors.replacement_traverse(s, {}, lambda x: None), ): # the columns clause isn't changed at all assert sel._raw_columns[0].table is a1 @@ -1886,7 +2098,7 @@ class AnnotationsTest(fixtures.TestBase): # when encountered. for sel in ( sql_util._deep_deannotate(s, {"foo": "bar"}), - sql_util._deep_annotate(s, {'foo': 'bar'}), + sql_util._deep_annotate(s, {"foo": "bar"}), ): assert sel._froms[0] is not sel._froms[1].left @@ -1899,7 +2111,7 @@ class AnnotationsTest(fixtures.TestBase): preserving them when deep_annotate is run on them. """ - t1 = table('table1', column("col1"), column("col2")) + t1 = table("table1", column("col1"), column("col2")) s = select([t1.c.col1._annotate({"foo": "bar"})]) s2 = select([t1.c.col1._annotate({"bat": "hoho"})]) s3 = s.union(s2) @@ -1907,42 +2119,40 @@ class AnnotationsTest(fixtures.TestBase): eq_( sel.selects[0]._raw_columns[0]._annotations, - {"foo": "bar", "new": "thing"} + {"foo": "bar", "new": "thing"}, ) eq_( sel.selects[1]._raw_columns[0]._annotations, - {"bat": "hoho", "new": "thing"} + {"bat": "hoho", "new": "thing"}, ) def test_deannotate_2(self): - table1 = table('table1', column("col1"), column("col2")) - j = table1.c.col1._annotate({"remote": True}) == \ - table1.c.col2._annotate({"local": True}) + table1 = table("table1", column("col1"), column("col2")) + j = table1.c.col1._annotate( + {"remote": True} + ) == table1.c.col2._annotate({"local": True}) j2 = sql_util._deep_deannotate(j) - eq_( - j.left._annotations, {"remote": True} - ) - eq_( - j2.left._annotations, {} - ) + eq_(j.left._annotations, {"remote": True}) + eq_(j2.left._annotations, {}) def test_deannotate_3(self): - table1 = table('table1', column("col1"), column("col2"), - column("col3"), column("col4")) + table1 = table( + "table1", + column("col1"), + column("col2"), + column("col3"), + column("col4"), + ) j = and_( - table1.c.col1._annotate({"remote": True}) == - table1.c.col2._annotate({"local": True}), - table1.c.col3._annotate({"remote": True}) == - table1.c.col4._annotate({"local": True}) + table1.c.col1._annotate({"remote": True}) + == table1.c.col2._annotate({"local": True}), + table1.c.col3._annotate({"remote": True}) + == table1.c.col4._annotate({"local": True}), ) j2 = sql_util._deep_deannotate(j) - eq_( - j.clauses[0].left._annotations, {"remote": True} - ) - eq_( - j2.clauses[0].left._annotations, {} - ) + eq_(j.clauses[0].left._annotations, {"remote": True}) + eq_(j2.clauses[0].left._annotations, {}) def test_annotate_fromlist_preservation(self): """test the FROM list in select still works @@ -1952,37 +2162,34 @@ class AnnotationsTest(fixtures.TestBase): #2453, continued """ - table1 = table('table1', column('x')) - table2 = table('table2', column('y')) + table1 = table("table1", column("x")) + table2 = table("table2", column("y")) a1 = table1.alias() - s = select([a1.c.x]).select_from( - a1.join(table2, a1.c.x == table2.c.y) - ) + s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y)) assert_s = select([select([s])]) for fn in ( sql_util._deep_deannotate, - lambda s: sql_util._deep_annotate(s, {'foo': 'bar'}), + lambda s: sql_util._deep_annotate(s, {"foo": "bar"}), lambda s: visitors.cloned_traverse(s, {}, {}), - lambda s: visitors.replacement_traverse(s, {}, lambda x: None) + lambda s: visitors.replacement_traverse(s, {}, lambda x: None), ): sel = fn(select([fn(select([fn(s)]))])) eq_(str(assert_s), str(sel)) def test_bind_unique_test(self): - table('t', column('a'), column('b')) + table("t", column("a"), column("b")) b = bindparam("bind", value="x", unique=True) # the annotation of "b" should render the # same. The "unique" test in compiler should # also pass, [ticket:2425] - eq_(str(or_(b, b._annotate({"foo": "bar"}))), - ":bind_1 OR :bind_1") + eq_(str(or_(b, b._annotate({"foo": "bar"}))), ":bind_1 OR :bind_1") def test_comparators_cleaned_out_construction(self): - c = column('a') + c = column("a") comp1 = c.comparator @@ -1991,7 +2198,7 @@ class AnnotationsTest(fixtures.TestBase): assert comp1 is not comp2 def test_comparators_cleaned_out_reannotate(self): - c = column('a') + c = column("a") c1 = c._annotate({"foo": "bar"}) comp1 = c1.comparator @@ -2002,7 +2209,7 @@ class AnnotationsTest(fixtures.TestBase): assert comp1 is not comp2 def test_comparator_cleanout_integration(self): - c = column('a') + c = column("a") c1 = c._annotate({"foo": "bar"}) comp1 = c1.comparator @@ -2018,8 +2225,8 @@ class ReprTest(fixtures.TestBase): for obj in [ elements.Cast(1, 2), elements.TypeClause(String()), - elements.ColumnClause('x'), - elements.BindParameter('q'), + elements.ColumnClause("x"), + elements.BindParameter("q"), elements.Null(), elements.True_(), elements.False_(), @@ -2027,22 +2234,21 @@ class ReprTest(fixtures.TestBase): elements.BooleanClauseList.and_(), elements.Tuple(), elements.Case([]), - elements.Extract('foo', column('x')), - elements.UnaryExpression(column('x')), - elements.Grouping(column('x')), + elements.Extract("foo", column("x")), + elements.UnaryExpression(column("x")), + elements.Grouping(column("x")), elements.Over(func.foo()), - elements.Label('q', column('x')), + elements.Label("q", column("x")), ]: repr(obj) class WithLabelsTest(fixtures.TestBase): - def _assert_labels_warning(self, s): assert_raises_message( exc.SAWarning, r"replaced by Column.*, which has the same key", - lambda: s.c + lambda: s.c, ) def _assert_result_keys(self, s, keys): @@ -2055,147 +2261,128 @@ class WithLabelsTest(fixtures.TestBase): def _names_overlap(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer)) - t2 = Table('t2', m, Column('x', Integer)) + t1 = Table("t1", m, Column("x", Integer)) + t2 = Table("t2", m, Column("x", Integer)) return select([t1, t2]) def test_names_overlap_nolabel(self): sel = self._names_overlap() self._assert_labels_warning(sel) - self._assert_result_keys(sel, ['x']) + self._assert_result_keys(sel, ["x"]) def test_names_overlap_label(self): sel = self._names_overlap().apply_labels() - eq_( - list(sel.c.keys()), - ['t1_x', 't2_x'] - ) - self._assert_result_keys(sel, ['t1_x', 't2_x']) + eq_(list(sel.c.keys()), ["t1_x", "t2_x"]) + self._assert_result_keys(sel, ["t1_x", "t2_x"]) def _names_overlap_keys_dont(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, key='a')) - t2 = Table('t2', m, Column('x', Integer, key='b')) + t1 = Table("t1", m, Column("x", Integer, key="a")) + t2 = Table("t2", m, Column("x", Integer, key="b")) return select([t1, t2]) def test_names_overlap_keys_dont_nolabel(self): sel = self._names_overlap_keys_dont() - eq_( - list(sel.c.keys()), - ['a', 'b'] - ) - self._assert_result_keys(sel, ['x']) + eq_(list(sel.c.keys()), ["a", "b"]) + self._assert_result_keys(sel, ["x"]) def test_names_overlap_keys_dont_label(self): sel = self._names_overlap_keys_dont().apply_labels() - eq_( - list(sel.c.keys()), - ['t1_a', 't2_b'] - ) - self._assert_result_keys(sel, ['t1_x', 't2_x']) + eq_(list(sel.c.keys()), ["t1_a", "t2_b"]) + self._assert_result_keys(sel, ["t1_x", "t2_x"]) def _labels_overlap(self): m = MetaData() - t1 = Table('t', m, Column('x_id', Integer)) - t2 = Table('t_x', m, Column('id', Integer)) + t1 = Table("t", m, Column("x_id", Integer)) + t2 = Table("t_x", m, Column("id", Integer)) return select([t1, t2]) def test_labels_overlap_nolabel(self): sel = self._labels_overlap() - eq_( - list(sel.c.keys()), - ['x_id', 'id'] - ) - self._assert_result_keys(sel, ['x_id', 'id']) + eq_(list(sel.c.keys()), ["x_id", "id"]) + self._assert_result_keys(sel, ["x_id", "id"]) def test_labels_overlap_label(self): sel = self._labels_overlap().apply_labels() t2 = sel.froms[1] - eq_( - list(sel.c.keys()), - ['t_x_id', t2.c.id.anon_label] - ) - self._assert_result_keys(sel, ['t_x_id', 'id_1']) - self._assert_subq_result_keys(sel, ['t_x_id', 'id_1']) + eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label]) + self._assert_result_keys(sel, ["t_x_id", "id_1"]) + self._assert_subq_result_keys(sel, ["t_x_id", "id_1"]) def _labels_overlap_keylabels_dont(self): m = MetaData() - t1 = Table('t', m, Column('x_id', Integer, key='a')) - t2 = Table('t_x', m, Column('id', Integer, key='b')) + t1 = Table("t", m, Column("x_id", Integer, key="a")) + t2 = Table("t_x", m, Column("id", Integer, key="b")) return select([t1, t2]) def test_labels_overlap_keylabels_dont_nolabel(self): sel = self._labels_overlap_keylabels_dont() - eq_(list(sel.c.keys()), ['a', 'b']) - self._assert_result_keys(sel, ['x_id', 'id']) + eq_(list(sel.c.keys()), ["a", "b"]) + self._assert_result_keys(sel, ["x_id", "id"]) def test_labels_overlap_keylabels_dont_label(self): sel = self._labels_overlap_keylabels_dont().apply_labels() - eq_(list(sel.c.keys()), ['t_a', 't_x_b']) - self._assert_result_keys(sel, ['t_x_id', 'id_1']) + eq_(list(sel.c.keys()), ["t_a", "t_x_b"]) + self._assert_result_keys(sel, ["t_x_id", "id_1"]) def _keylabels_overlap_labels_dont(self): m = MetaData() - t1 = Table('t', m, Column('a', Integer, key='x_id')) - t2 = Table('t_x', m, Column('b', Integer, key='id')) + t1 = Table("t", m, Column("a", Integer, key="x_id")) + t2 = Table("t_x", m, Column("b", Integer, key="id")) return select([t1, t2]) def test_keylabels_overlap_labels_dont_nolabel(self): sel = self._keylabels_overlap_labels_dont() - eq_(list(sel.c.keys()), ['x_id', 'id']) - self._assert_result_keys(sel, ['a', 'b']) + eq_(list(sel.c.keys()), ["x_id", "id"]) + self._assert_result_keys(sel, ["a", "b"]) def test_keylabels_overlap_labels_dont_label(self): sel = self._keylabels_overlap_labels_dont().apply_labels() t2 = sel.froms[1] - eq_(list(sel.c.keys()), ['t_x_id', t2.c.id.anon_label]) - self._assert_result_keys(sel, ['t_a', 't_x_b']) - self._assert_subq_result_keys(sel, ['t_a', 't_x_b']) + eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label]) + self._assert_result_keys(sel, ["t_a", "t_x_b"]) + self._assert_subq_result_keys(sel, ["t_a", "t_x_b"]) def _keylabels_overlap_labels_overlap(self): m = MetaData() - t1 = Table('t', m, Column('x_id', Integer, key='x_a')) - t2 = Table('t_x', m, Column('id', Integer, key='a')) + t1 = Table("t", m, Column("x_id", Integer, key="x_a")) + t2 = Table("t_x", m, Column("id", Integer, key="a")) return select([t1, t2]) def test_keylabels_overlap_labels_overlap_nolabel(self): sel = self._keylabels_overlap_labels_overlap() - eq_(list(sel.c.keys()), ['x_a', 'a']) - self._assert_result_keys(sel, ['x_id', 'id']) - self._assert_subq_result_keys(sel, ['x_id', 'id']) + eq_(list(sel.c.keys()), ["x_a", "a"]) + self._assert_result_keys(sel, ["x_id", "id"]) + self._assert_subq_result_keys(sel, ["x_id", "id"]) def test_keylabels_overlap_labels_overlap_label(self): sel = self._keylabels_overlap_labels_overlap().apply_labels() t2 = sel.froms[1] - eq_(list(sel.c.keys()), ['t_x_a', t2.c.a.anon_label]) - self._assert_result_keys(sel, ['t_x_id', 'id_1']) - self._assert_subq_result_keys(sel, ['t_x_id', 'id_1']) + eq_(list(sel.c.keys()), ["t_x_a", t2.c.a.anon_label]) + self._assert_result_keys(sel, ["t_x_id", "id_1"]) + self._assert_subq_result_keys(sel, ["t_x_id", "id_1"]) def _keys_overlap_names_dont(self): m = MetaData() - t1 = Table('t1', m, Column('a', Integer, key='x')) - t2 = Table('t2', m, Column('b', Integer, key='x')) + t1 = Table("t1", m, Column("a", Integer, key="x")) + t2 = Table("t2", m, Column("b", Integer, key="x")) return select([t1, t2]) def test_keys_overlap_names_dont_nolabel(self): sel = self._keys_overlap_names_dont() self._assert_labels_warning(sel) - self._assert_result_keys(sel, ['a', 'b']) + self._assert_result_keys(sel, ["a", "b"]) def test_keys_overlap_names_dont_label(self): sel = self._keys_overlap_names_dont().apply_labels() - eq_( - list(sel.c.keys()), - ['t1_x', 't2_x'] - ) - self._assert_result_keys(sel, ['t1_a', 't2_b']) + eq_(list(sel.c.keys()), ["t1_x", "t2_x"]) + self._assert_result_keys(sel, ["t1_a", "t2_b"]) class ResultMapTest(fixtures.TestBase): - def _fixture(self): m = MetaData() - t = Table('t', m, Column('x', Integer), Column('y', Integer)) + t = Table("t", m, Column("x", Integer), Column("y", Integer)) return t def _mapping(self, stmt): @@ -2208,7 +2395,7 @@ class ResultMapTest(fixtures.TestBase): def test_select_label_alt_name(self): t = self._fixture() - l1, l2 = t.c.x.label('a'), t.c.y.label('b') + l1, l2 = t.c.x.label("a"), t.c.y.label("b") s = select([l1, l2]) mapping = self._mapping(s) assert l1 in mapping @@ -2217,7 +2404,7 @@ class ResultMapTest(fixtures.TestBase): def test_select_alias_label_alt_name(self): t = self._fixture() - l1, l2 = t.c.x.label('a'), t.c.y.label('b') + l1, l2 = t.c.x.label("a"), t.c.y.label("b") s = select([l1, l2]).alias() mapping = self._mapping(s) assert l1 in mapping @@ -2253,7 +2440,7 @@ class ResultMapTest(fixtures.TestBase): x, y = t.c.x, t.c.y ta = t.alias() - l1, l2 = ta.c.x.label('a'), ta.c.y.label('b') + l1, l2 = ta.c.x.label("a"), ta.c.y.label("b") s = select([l1, l2]) mapping = self._mapping(s) @@ -2268,34 +2455,40 @@ class ResultMapTest(fixtures.TestBase): assert t.c.x not in mapping eq_( [type(entry[-1]) for entry in s.compile()._result_columns], - [Boolean] + [Boolean], ) def test_plain_exists(self): expr = exists([1]) eq_(type(expr.type), Boolean) eq_( - [type(entry[-1]) for - entry in select([expr]).compile()._result_columns], - [Boolean] + [ + type(entry[-1]) + for entry in select([expr]).compile()._result_columns + ], + [Boolean], ) def test_plain_exists_negate(self): expr = ~exists([1]) eq_(type(expr.type), Boolean) eq_( - [type(entry[-1]) for - entry in select([expr]).compile()._result_columns], - [Boolean] + [ + type(entry[-1]) + for entry in select([expr]).compile()._result_columns + ], + [Boolean], ) def test_plain_exists_double_negate(self): expr = ~(~exists([1])) eq_(type(expr.type), Boolean) eq_( - [type(entry[-1]) for - entry in select([expr]).compile()._result_columns], - [Boolean] + [ + type(entry[-1]) + for entry in select([expr]).compile()._result_columns + ], + [Boolean], ) def test_column_subquery_plain(self): @@ -2307,7 +2500,7 @@ class ResultMapTest(fixtures.TestBase): assert s1 in mapping eq_( [type(entry[-1]) for entry in s2.compile()._result_columns], - [Integer] + [Integer], ) def test_unary_boolean(self): @@ -2315,7 +2508,7 @@ class ResultMapTest(fixtures.TestBase): s1 = select([not_(True)], use_labels=True) eq_( [type(entry[-1]) for entry in s1.compile()._result_columns], - [Boolean] + [Boolean], ) @@ -2323,19 +2516,15 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" def _assert_legacy(self, leg, read=False, nowait=False): - t = table('t', column('c')) + t = table("t", column("c")) s1 = select([t], for_update=leg) if leg is False: assert s1._for_update_arg is None assert s1.for_update is None else: - eq_( - s1._for_update_arg.read, read - ) - eq_( - s1._for_update_arg.nowait, nowait - ) + eq_(s1._for_update_arg.read, read) + eq_(s1._for_update_arg.nowait, nowait) eq_(s1.for_update, leg) def test_false_legacy(self): @@ -2354,28 +2543,30 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): self._assert_legacy("read_nowait", read=True, nowait=True) def test_legacy_setter(self): - t = table('t', column('c')) + t = table("t", column("c")) s = select([t]) - s.for_update = 'nowait' + s.for_update = "nowait" eq_(s._for_update_arg.nowait, True) def test_basic_clone(self): - t = table('t', column('c')) + t = table("t", column("c")) s = select([t]).with_for_update(read=True, of=t.c.c) s2 = visitors.ReplacingCloningVisitor().traverse(s) assert s2._for_update_arg is not s._for_update_arg eq_(s2._for_update_arg.read, True) eq_(s2._for_update_arg.of, [t.c.c]) - self.assert_compile(s2, - "SELECT t.c FROM t FOR SHARE OF t", - dialect="postgresql") + self.assert_compile( + s2, "SELECT t.c FROM t FOR SHARE OF t", dialect="postgresql" + ) def test_adapt(self): - t = table('t', column('c')) + t = table("t", column("c")) s = select([t]).with_for_update(read=True, of=t.c.c) a = t.alias() s2 = sql_util.ClauseAdapter(a).traverse(s) eq_(s2._for_update_arg.of, [a.c.c]) - self.assert_compile(s2, - "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1", - dialect="postgresql") + self.assert_compile( + s2, + "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1", + dialect="postgresql", + ) |
