diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-01-06 01:14:26 -0500 |
|---|---|---|
| committer | mike bayer <mike_mp@zzzcomputing.com> | 2019-01-06 17:34:50 +0000 |
| commit | 1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch) | |
| tree | 28e725c5c8188bd0cfd133d1e268dbca9b524978 /test/sql/test_selectable.py | |
| parent | 404e69426b05a82d905cbb3ad33adafccddb00dd (diff) | |
| download | sqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz | |
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits
applied at all.
The black run will format code consistently, however in
some cases that are prevalent in SQLAlchemy code it produces
too-long lines. The too-long lines will be resolved in the
following commit that will resolve all remaining flake8 issues
including shadowed builtins, long lines, import order, unused
imports, duplicate imports, and docstring issues.
Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'test/sql/test_selectable.py')
| -rw-r--r-- | test/sql/test_selectable.py | 1857 |
1 files changed, 1024 insertions, 833 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 4b92e3e3e..023a0bc61 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1,10 +1,12 @@ """Test various algorithmic properties of selectables.""" -from sqlalchemy.testing import eq_, assert_raises, \ - assert_raises_message, is_ +from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, is_ from sqlalchemy import * -from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ - AssertsExecutionResults +from sqlalchemy.testing import ( + fixtures, + AssertsCompiledSQL, + AssertsExecutionResults, +) from sqlalchemy.sql import elements from sqlalchemy import testing from sqlalchemy.sql import util as sql_util, visitors, expression @@ -14,33 +16,37 @@ from sqlalchemy import util from sqlalchemy.schema import Column, Table, MetaData metadata = MetaData() -table1 = Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20)), - Column('col3', Integer), - Column('colx', Integer), - - ) - -table2 = Table('table2', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', Integer, ForeignKey('table1.col1')), - Column('col3', String(20)), - Column('coly', Integer), - ) - -keyed = Table('keyed', metadata, - Column('x', Integer, key='colx'), - Column('y', Integer, key='coly'), - Column('z', Integer), - ) +table1 = Table( + "table1", + metadata, + Column("col1", Integer, primary_key=True), + Column("col2", String(20)), + Column("col3", Integer), + Column("colx", Integer), +) + +table2 = Table( + "table2", + metadata, + Column("col1", Integer, primary_key=True), + Column("col2", Integer, ForeignKey("table1.col1")), + Column("col3", String(20)), + Column("coly", Integer), +) + +keyed = Table( + "keyed", + metadata, + Column("x", Integer, key="colx"), + Column("y", Integer, key="coly"), + Column("z", Integer), +) class SelectableTest( - fixtures.TestBase, - AssertsExecutionResults, - AssertsCompiledSQL): - __dialect__ = 'default' + fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL +): + __dialect__ = "default" def test_indirect_correspondence_on_labels(self): # this test depends upon 'distance' to @@ -48,8 +54,13 @@ class SelectableTest( # same column three times - s = select([table1.c.col1.label('c2'), table1.c.col1, - table1.c.col1.label('c1')]) + s = select( + [ + table1.c.col1.label("c2"), + table1.c.col1, + table1.c.col1.label("c1"), + ] + ) # this tests the same thing as # test_direct_correspondence_on_labels below - @@ -60,25 +71,25 @@ class SelectableTest( assert s.corresponding_column(s.c.c1) is s.c.c1 def test_labeled_subquery_twice(self): - scalar_select = select([table1.c.col1]).label('foo') + scalar_select = select([table1.c.col1]).label("foo") s1 = select([scalar_select]) s2 = select([scalar_select, scalar_select]) eq_( s1.c.foo.proxy_set, - set([s1.c.foo, scalar_select, scalar_select.element]) + set([s1.c.foo, scalar_select, scalar_select.element]), ) eq_( s2.c.foo.proxy_set, - set([s2.c.foo, scalar_select, scalar_select.element]) + set([s2.c.foo, scalar_select, scalar_select.element]), ) assert s1.corresponding_column(scalar_select) is s1.c.foo assert s2.corresponding_column(scalar_select) is s2.c.foo def test_label_grouped_still_corresponds(self): - label = select([table1.c.col1]).label('foo') + label = select([table1.c.col1]).label("foo") label2 = label.self_group() s1 = select([label]) @@ -90,14 +101,14 @@ class SelectableTest( # this test depends on labels being part # of the proxy set to get the right result - l1, l2 = table1.c.col1.label('foo'), table1.c.col1.label('bar') + l1, l2 = table1.c.col1.label("foo"), table1.c.col1.label("bar") sel = select([l1, l2]) sel2 = sel.alias() assert sel2.corresponding_column(l1) is sel2.c.foo assert sel2.corresponding_column(l2) is sel2.c.bar - sel2 = select([table1.c.col1.label('foo'), table1.c.col2.label('bar')]) + sel2 = select([table1.c.col1.label("foo"), table1.c.col2.label("bar")]) sel3 = sel.union(sel2).alias() assert sel3.corresponding_column(l1) is sel3.c.foo @@ -105,9 +116,9 @@ class SelectableTest( def test_keyed_gen(self): s = select([keyed]) - eq_(s.c.colx.key, 'colx') + eq_(s.c.colx.key, "colx") - eq_(s.c.colx.name, 'x') + eq_(s.c.colx.name, "x") assert s.corresponding_column(keyed.c.colx) is s.c.colx assert s.corresponding_column(keyed.c.coly) is s.c.coly @@ -131,26 +142,26 @@ class SelectableTest( assert sel2.corresponding_column(keyed.c.z) is sel2.c.keyed_z def test_keyed_c_collection_upper(self): - c = Column('foo', Integer, key='bar') - t = Table('t', MetaData(), c) + c = Column("foo", Integer, key="bar") + t = Table("t", MetaData(), c) is_(t.c.bar, c) def test_keyed_c_collection_lower(self): - c = column('foo') - c.key = 'bar' - t = table('t', c) + c = column("foo") + c.key = "bar" + t = table("t", c) is_(t.c.bar, c) def test_clone_c_proxy_key_upper(self): - c = Column('foo', Integer, key='bar') - t = Table('t', MetaData(), c) + c = Column("foo", Integer, key="bar") + t = Table("t", MetaData(), c) s = select([t])._clone() assert c in s.c.bar.proxy_set def test_clone_c_proxy_key_lower(self): - c = column('foo') - c.key = 'bar' - t = table('t', c) + c = column("foo") + c.key = "bar" + t = table("t", c) s = select([t])._clone() assert c in s.c.bar.proxy_set @@ -160,19 +171,16 @@ class SelectableTest( def myop(x, y): pass - t = table('t', column('x'), column('y')) + t = table("t", column("x"), column("y")) expr = BinaryExpression(t.c.x, t.c.y, myop) s = select([t, expr]) - eq_( - s.c.keys(), - ['x', 'y', expr.anon_label] - ) + eq_(s.c.keys(), ["x", "y", expr.anon_label]) def test_cloned_intersection(self): - t1 = table('t1', column('x')) - t2 = table('t2', column('x')) + t1 = table("t1", column("x")) + t2 = table("t2", column("x")) s1 = t1.select() s2 = t2.select() @@ -184,15 +192,13 @@ class SelectableTest( s3c1 = s3._clone() eq_( - expression._cloned_intersection( - [s1c1, s3c1], [s2c1, s1c2] - ), - set([s1c1]) + expression._cloned_intersection([s1c1, s3c1], [s2c1, s1c2]), + set([s1c1]), ) def test_cloned_difference(self): - t1 = table('t1', column('x')) - t2 = table('t2', column('x')) + t1 = table("t1", column("x")) + t2 = table("t2", column("x")) s1 = t1.select() s2 = t2.select() @@ -205,75 +211,70 @@ class SelectableTest( s3c1 = s3._clone() eq_( - expression._cloned_difference( - [s1c1, s2c1, s3c1], [s2c1, s1c2] - ), - set([s3c1]) + expression._cloned_difference([s1c1, s2c1, s3c1], [s2c1, s1c2]), + set([s3c1]), ) def test_distance_on_aliases(self): - a1 = table1.alias('a1') - for s in (select([a1, table1], use_labels=True), - select([table1, a1], use_labels=True)): - assert s.corresponding_column(table1.c.col1) \ - is s.c.table1_col1 + a1 = table1.alias("a1") + for s in ( + select([a1, table1], use_labels=True), + select([table1, a1], use_labels=True), + ): + assert s.corresponding_column(table1.c.col1) is s.c.table1_col1 assert s.corresponding_column(a1.c.col1) is s.c.a1_col1 def test_join_against_self(self): - jj = select([table1.c.col1.label('bar_col1')]) + jj = select([table1.c.col1.label("bar_col1")]) jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) # test column directly against itself - assert jjj.corresponding_column(jjj.c.table1_col1) \ - is jjj.c.table1_col1 + assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 # test alias of the join - j2 = jjj.alias('foo') - assert j2.corresponding_column(table1.c.col1) \ - is j2.c.table1_col1 + j2 = jjj.alias("foo") + assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1 def test_clone_append_column(self): - sel = select([literal_column('1').label('a')]) - eq_(list(sel.c.keys()), ['a']) + sel = select([literal_column("1").label("a")]) + eq_(list(sel.c.keys()), ["a"]) cloned = visitors.ReplacingCloningVisitor().traverse(sel) - cloned.append_column(literal_column('2').label('b')) + cloned.append_column(literal_column("2").label("b")) cloned.append_column(func.foo()) - eq_(list(cloned.c.keys()), ['a', 'b', 'foo()']) + eq_(list(cloned.c.keys()), ["a", "b", "foo()"]) def test_append_column_after_replace_selectable(self): - basesel = select([literal_column('1').label('a')]) - tojoin = select([ - literal_column('1').label('a'), - literal_column('2').label('b') - ]) - basefrom = basesel.alias('basefrom') - joinfrom = tojoin.alias('joinfrom') + basesel = select([literal_column("1").label("a")]) + tojoin = select( + [literal_column("1").label("a"), literal_column("2").label("b")] + ) + basefrom = basesel.alias("basefrom") + joinfrom = tojoin.alias("joinfrom") sel = select([basefrom.c.a]) replaced = sel.replace_selectable( - basefrom, - basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a) + basefrom, basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a) ) self.assert_compile( replaced, "SELECT basefrom.a FROM (SELECT 1 AS a) AS basefrom " "JOIN (SELECT 1 AS a, 2 AS b) AS joinfrom " - "ON basefrom.a = joinfrom.a" + "ON basefrom.a = joinfrom.a", ) replaced.append_column(joinfrom.c.b) self.assert_compile( replaced, "SELECT basefrom.a, joinfrom.b FROM (SELECT 1 AS a) AS basefrom " "JOIN (SELECT 1 AS a, 2 AS b) AS joinfrom " - "ON basefrom.a = joinfrom.a" + "ON basefrom.a = joinfrom.a", ) def test_against_cloned_non_table(self): # test that corresponding column digs across # clone boundaries with anonymous labeled elements - col = func.count().label('foo') + col = func.count().label("foo") sel = select([col]) sel2 = visitors.ReplacingCloningVisitor().traverse(sel) @@ -287,14 +288,14 @@ class SelectableTest( self.assert_compile( s1.with_only_columns([s1]), "SELECT (SELECT table1.col1, table1.col2, " - "table1.col3, table1.colx FROM table1) AS anon_1" + "table1.col3, table1.colx FROM table1) AS anon_1", ) def test_type_coerce_preserve_subq(self): class MyType(TypeDecorator): impl = Integer - stmt = select([type_coerce(column('x'), MyType).label('foo')]) + stmt = select([type_coerce(column("x"), MyType).label("foo")]) stmt2 = stmt.select() assert isinstance(stmt._raw_columns[0].type, MyType) assert isinstance(stmt.c.foo.type, MyType) @@ -303,30 +304,32 @@ class SelectableTest( def test_select_on_table(self): sel = select([table1, table2], use_labels=True) - assert sel.corresponding_column(table1.c.col1) \ + assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1 + assert ( + sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1 - assert sel.corresponding_column( - table1.c.col1, - require_embedded=True) is sel.c.table1_col1 - assert table1.corresponding_column(sel.c.table1_col1) \ - is table1.c.col1 - assert table1.corresponding_column(sel.c.table1_col1, - require_embedded=True) is None + ) + assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1 + assert ( + table1.corresponding_column( + sel.c.table1_col1, require_embedded=True + ) + is None + ) def test_join_against_join(self): j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2) - jj = select([table1.c.col1.label('bar_col1')], - from_obj=[j]).alias('foo') + jj = select([table1.c.col1.label("bar_col1")], from_obj=[j]).alias( + "foo" + ) jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) - assert jjj.corresponding_column(jjj.c.table1_col1) \ - is jjj.c.table1_col1 - j2 = jjj.alias('foo') - assert j2.corresponding_column(jjj.c.table1_col1) \ - is j2.c.table1_col1 + assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + j2 = jjj.alias("foo") + assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1 def test_table_alias(self): - a = table1.alias('a') + a = table1.alias("a") j = join(a, table2) @@ -338,13 +341,13 @@ class SelectableTest( # prominent w/ PostgreSQL's tuple functions stmt = select([table1.c.col1, table1.c.col2]) - a = stmt.alias('a') + a = stmt.alias("a") self.assert_compile( select([func.foo(a)]), "SELECT foo(SELECT table1.col1, table1.col2 FROM table1) " "AS foo_1 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2 FROM table1) " - "AS a" + "AS a", ) def test_union(self): @@ -353,15 +356,25 @@ class SelectableTest( # with a certain Table, against a column in a Union where one of # its underlying Selects matches to that same Table - u = select([table1.c.col1, - table1.c.col2, - table1.c.col3, - table1.c.colx, - null().label('coly')]).union(select([table2.c.col1, - table2.c.col2, - table2.c.col3, - null().label('colx'), - table2.c.coly])) + u = select( + [ + table1.c.col1, + table1.c.col2, + table1.c.col3, + table1.c.colx, + null().label("coly"), + ] + ).union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -388,8 +401,10 @@ class SelectableTest( assert u1.corresponding_column(table1.c.col3) is u1.c.col1 def test_singular_union(self): - u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), - select([table1.c.col1, table1.c.col2, table1.c.col3])) + u = union( + select([table1.c.col1, table1.c.col2, table1.c.col3]), + select([table1.c.col1, table1.c.col2, table1.c.col3]), + ) u = union(select([table1.c.col1, table1.c.col2, table1.c.col3])) assert u.c.col1 is not None assert u.c.col2 is not None @@ -399,14 +414,29 @@ class SelectableTest( # same as testunion, except its an alias of the union - u = select([table1.c.col1, + u = ( + select( + [ + table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, - null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, - null().label('colx'), table2.c.coly]) - ).alias('analias') + null().label("coly"), + ] + ) + .union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) + .alias("analias") + ) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 @@ -429,7 +459,8 @@ class SelectableTest( def test_union_of_text(self): s1 = select([table1.c.col1, table1.c.col2]) s2 = text("select col1, col2 from foo").columns( - column('col1'), column('col2')) + column("col1"), column("col2") + ) u1 = union(s1, s2) assert u1.corresponding_column(s1.c.col1) is u1.c.col1 @@ -445,8 +476,10 @@ class SelectableTest( s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) u1 = union(s1, s2) - assert u1.corresponding_column( - s1.c._all_columns[0]) is u1.c._all_columns[0] + assert ( + u1.corresponding_column(s1.c._all_columns[0]) + is u1.c._all_columns[0] + ) assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -462,8 +495,10 @@ class SelectableTest( s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) u1 = union(s1, s2) - assert u1.corresponding_column( - s1.c._all_columns[0]) is u1.c._all_columns[0] + assert ( + u1.corresponding_column(s1.c._all_columns[0]) + is u1.c._all_columns[0] + ) assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -477,13 +512,18 @@ class SelectableTest( @testing.emits_warning("Column 'col1'") def test_union_alias_dupe_keys_grouped(self): - s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]).\ - limit(1).alias() + s1 = ( + select([table1.c.col1, table1.c.col2, table2.c.col1]) + .limit(1) + .alias() + ) s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]).limit(1) u1 = union(s1, s2) - assert u1.corresponding_column( - s1.c._all_columns[0]) is u1.c._all_columns[0] + assert ( + u1.corresponding_column(s1.c._all_columns[0]) + is u1.c._all_columns[0] + ) assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -499,14 +539,29 @@ class SelectableTest( # like testaliasunion, but off a Select off the union. - u = select([table1.c.col1, + u = ( + select( + [ + table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, - null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, - null().label('colx'), table2.c.coly]) - ).alias('analias') + null().label("coly"), + ] + ) + .union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) + .alias("analias") + ) s = select([u]) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -517,14 +572,29 @@ class SelectableTest( # same as testunion, except its an alias of the union - u = select([table1.c.col1, + u = ( + select( + [ + table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, - null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, - null().label('colx'), table2.c.coly]) - ).alias('analias') + null().label("coly"), + ] + ) + .union( + select( + [ + table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label("colx"), + table2.c.coly, + ] + ) + ) + .alias("analias") + ) j1 = table1.join(table2) assert u.corresponding_column(j1.c.table1_colx) is u.c.colx assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx @@ -532,14 +602,14 @@ class SelectableTest( def test_join(self): a = join(table1, table2) print(str(a.select(use_labels=True))) - b = table2.alias('b') + b = table2.alias("b") j = join(a, b) print(str(j)) criterion = a.c.table1_col1 == b.c.col2 self.assert_(criterion.compare(j.onclause)) def test_select_alias(self): - a = table1.select().alias('a') + a = table1.select().alias("a") j = join(a, table2) criterion = a.c.col1 == table2.c.col2 @@ -562,15 +632,19 @@ class SelectableTest( is_(expr2.left, sel2) def test_column_labels(self): - a = select([table1.c.col1.label('acol1'), - table1.c.col2.label('acol2'), - table1.c.col3.label('acol3')]) + a = select( + [ + table1.c.col1.label("acol1"), + table1.c.col2.label("acol2"), + table1.c.col3.label("acol3"), + ] + ) j = join(a, table2) criterion = a.c.acol1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) def test_labeled_select_correspoinding(self): - l1 = select([func.max(table1.c.col1)]).label('foo') + l1 = select([func.max(table1.c.col1)]).label("foo") s = select([l1]) eq_(s.corresponding_column(l1), s.c.foo) @@ -579,7 +653,7 @@ class SelectableTest( eq_(s.corresponding_column(l1), s.c.foo) def test_select_alias_labels(self): - a = table2.select(use_labels=True).alias('a') + a = table2.select(use_labels=True).alias("a") j = join(a, table1) criterion = table1.c.col1 == a.c.table2_col2 @@ -587,14 +661,13 @@ class SelectableTest( def test_table_joined_to_select_of_table(self): metadata = MetaData() - a = Table('a', metadata, - Column('id', Integer, primary_key=True)) + a = Table("a", metadata, Column("id", Integer, primary_key=True)) - j2 = select([a.c.id.label('aid')]).alias('bar') + j2 = select([a.c.id.label("aid")]).alias("bar") j3 = a.join(j2, j2.c.aid == a.c.id) - j4 = select([j3]).alias('foo') + j4 = select([j3]).alias("foo") assert j4.corresponding_column(j2.c.aid) is j4.c.aid assert j4.corresponding_column(a.c.id) is j4.c.id @@ -602,9 +675,9 @@ class SelectableTest( m = MetaData() m2 = MetaData() - t1 = Table('t1', m, Column('id', Integer), Column('id2', Integer)) - t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'))) - t3 = Table('t3', m2, Column('id', Integer, ForeignKey('t1.id2'))) + t1 = Table("t1", m, Column("id", Integer), Column("id2", Integer)) + t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id"))) + t3 = Table("t3", m2, Column("id", Integer, ForeignKey("t1.id2"))) s = select([t2, t3], use_labels=True) @@ -612,24 +685,24 @@ class SelectableTest( def test_multi_label_chain_naming_col(self): # See [ticket:2167] for this one. - l1 = table1.c.col1.label('a') - l2 = select([l1]).label('b') + l1 = table1.c.col1.label("a") + l2 = select([l1]).label("b") s = select([l2]) assert s.c.b is not None self.assert_compile( s.select(), - "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)" + "SELECT b FROM (SELECT (SELECT table1.col1 AS a FROM table1) AS b)", ) - s2 = select([s.label('c')]) + s2 = select([s.label("c")]) self.assert_compile( s2.select(), "SELECT c FROM (SELECT (SELECT (" - "SELECT table1.col1 AS a FROM table1) AS b) AS c)" + "SELECT table1.col1 AS a FROM table1) AS b) AS c)", ) def test_self_referential_select_raises(self): - t = table('t', column('x')) + t = table("t", column("x")) s = select([t]) @@ -637,140 +710,110 @@ class SelectableTest( assert_raises_message( exc.InvalidRequestError, r"select\(\) construct refers to itself as a FROM", - s.compile + s.compile, ) def test_unusual_column_elements_text(self): """test that .c excludes text().""" s = select([table1.c.col1, text("foo")]) - eq_( - list(s.c), - [s.c.col1] - ) + eq_(list(s.c), [s.c.col1]) def test_unusual_column_elements_clauselist(self): """Test that raw ClauseList is expanded into .c.""" from sqlalchemy.sql.expression import ClauseList + s = select([table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)]) - eq_( - list(s.c), - [s.c.col1, s.c.col2, s.c.col3] - ) + eq_(list(s.c), [s.c.col1, s.c.col2, s.c.col3]) def test_unusual_column_elements_boolean_clauselist(self): """test that BooleanClauseList is placed as single element in .c.""" c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4) s = select([table1.c.col1, c2]) - eq_( - list(s.c), - [s.c.col1, s.corresponding_column(c2)] - ) + eq_(list(s.c), [s.c.col1, s.corresponding_column(c2)]) def test_from_list_deferred_constructor(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]) - t = Table('t', MetaData(), c1, c2) + t = Table("t", MetaData(), c1, c2) eq_(c1._from_objects, [t]) eq_(c2._from_objects, [t]) - self.assert_compile(select([c1]), - "SELECT t.c1 FROM t") - self.assert_compile(select([c2]), - "SELECT t.c2 FROM t") + self.assert_compile(select([c1]), "SELECT t.c1 FROM t") + self.assert_compile(select([c2]), "SELECT t.c2 FROM t") def test_from_list_deferred_whereclause(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]).where(c1 == 5) - t = Table('t', MetaData(), c1, c2) + t = Table("t", MetaData(), c1, c2) eq_(c1._from_objects, [t]) eq_(c2._from_objects, [t]) - self.assert_compile(select([c1]), - "SELECT t.c1 FROM t") - self.assert_compile(select([c2]), - "SELECT t.c2 FROM t") + self.assert_compile(select([c1]), "SELECT t.c1 FROM t") + self.assert_compile(select([c2]), "SELECT t.c2 FROM t") def test_from_list_deferred_fromlist(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer)) + t1 = Table("t1", m, Column("x", Integer)) - c1 = Column('c1', Integer) + c1 = Column("c1", Integer) s = select([c1]).where(c1 == 5).select_from(t1) - t2 = Table('t2', MetaData(), c1) + t2 = Table("t2", MetaData(), c1) eq_(c1._from_objects, [t2]) - self.assert_compile(select([c1]), - "SELECT t2.c1 FROM t2") + self.assert_compile(select([c1]), "SELECT t2.c1 FROM t2") def test_from_list_deferred_cloning(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]) s2 = select([c2]) s3 = sql_util.ClauseAdapter(s).traverse(s2) - Table('t', MetaData(), c1, c2) + Table("t", MetaData(), c1, c2) - self.assert_compile( - s3, - "SELECT t.c2 FROM t" - ) + self.assert_compile(s3, "SELECT t.c2 FROM t") def test_from_list_with_columns(self): - table1 = table('t1', column('a')) - table2 = table('t2', column('b')) + table1 = table("t1", column("a")) + table2 = table("t2", column("b")) s1 = select([table1.c.a, table2.c.b]) - self.assert_compile(s1, - "SELECT t1.a, t2.b FROM t1, t2" - ) + self.assert_compile(s1, "SELECT t1.a, t2.b FROM t1, t2") s2 = s1.with_only_columns([table2.c.b]) - self.assert_compile(s2, - "SELECT t2.b FROM t2" - ) + self.assert_compile(s2, "SELECT t2.b FROM t2") s3 = sql_util.ClauseAdapter(table1).traverse(s1) - self.assert_compile(s3, - "SELECT t1.a, t2.b FROM t1, t2" - ) + self.assert_compile(s3, "SELECT t1.a, t2.b FROM t1, t2") s4 = s3.with_only_columns([table2.c.b]) - self.assert_compile(s4, - "SELECT t2.b FROM t2" - ) + self.assert_compile(s4, "SELECT t2.b FROM t2") def test_from_list_warning_against_existing(self): - c1 = Column('c1', Integer) + c1 = Column("c1", Integer) s = select([c1]) # force a compile. - self.assert_compile( - s, - "SELECT c1" - ) + self.assert_compile(s, "SELECT c1") - Table('t', MetaData(), c1) + Table("t", MetaData(), c1) - self.assert_compile( - s, - "SELECT t.c1 FROM t" - ) + self.assert_compile(s, "SELECT t.c1 FROM t") def test_from_list_recovers_after_warning(self): - c1 = Column('c1', Integer) - c2 = Column('c2', Integer) + c1 = Column("c1", Integer) + c2 = Column("c2", Integer) s = select([c1]) @@ -779,7 +822,8 @@ class SelectableTest( @testing.emits_warning() def go(): - return Table('t', MetaData(), c1, c2) + return Table("t", MetaData(), c1, c2) + t = go() eq_(c1._from_objects, [t]) @@ -793,175 +837,175 @@ class SelectableTest( self.assert_compile(select([c2]), "SELECT t.c2 FROM t") def test_label_gen_resets_on_table(self): - c1 = Column('c1', Integer) + c1 = Column("c1", Integer) eq_(c1._label, "c1") - Table('t1', MetaData(), c1) + Table("t1", MetaData(), c1) eq_(c1._label, "t1_c1") class RefreshForNewColTest(fixtures.TestBase): - def test_join_uninit(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) j = a.join(b, a.c.x == b.c.y) - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_join_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) j = a.join(b, a.c.x == b.c.y) j.c - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_join_samename_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) j = a.join(b, a.c.x == b.c.y) j.c - q = column('x') + q = column("x") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_x is q def test_select_samename_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) s = select([a, b]).apply_labels() s.c - q = column('x') + q = column("x") b.append_column(q) s._refresh_for_new_column(q) assert q in s.c.b_x.proxy_set def test_aliased_select_samename_uninit(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) s = select([a, b]).apply_labels().alias() - q = column('x') + q = column("x") b.append_column(q) s._refresh_for_new_column(q) assert q in s.c.b_x.proxy_set def test_aliased_select_samename_init(self): - a = table('a', column('x')) - b = table('b', column('y')) + a = table("a", column("x")) + b = table("b", column("y")) s = select([a, b]).apply_labels().alias() s.c - q = column('x') + q = column("x") b.append_column(q) s._refresh_for_new_column(q) assert q in s.c.b_x.proxy_set def test_aliased_select_irrelevant(self): - a = table('a', column('x')) - b = table('b', column('y')) - c = table('c', column('z')) + a = table("a", column("x")) + b = table("b", column("y")) + c = table("c", column("z")) s = select([a, b]).apply_labels().alias() s.c - q = column('x') + q = column("x") c.append_column(q) s._refresh_for_new_column(q) - assert 'c_x' not in s.c + assert "c_x" not in s.c def test_aliased_select_no_cols_clause(self): - a = table('a', column('x')) + a = table("a", column("x")) s = select([a.c.x]).apply_labels().alias() s.c - q = column('q') + q = column("q") a.append_column(q) s._refresh_for_new_column(q) - assert 'a_q' not in s.c + assert "a_q" not in s.c def test_union_uninit(self): - a = table('a', column('x')) + a = table("a", column("x")) s1 = select([a]) s2 = select([a]) s3 = s1.union(s2) - q = column('q') + q = column("q") a.append_column(q) s3._refresh_for_new_column(q) assert a.c.q in s3.c.q.proxy_set def test_union_init_raises(self): - a = table('a', column('x')) + a = table("a", column("x")) s1 = select([a]) s2 = select([a]) s3 = s1.union(s2) s3.c - q = column('q') + q = column("q") a.append_column(q) assert_raises_message( NotImplementedError, "CompoundSelect constructs don't support addition of " "columns to underlying selectables", - s3._refresh_for_new_column, q + s3._refresh_for_new_column, + q, ) def test_nested_join_uninit(self): - a = table('a', column('x')) - b = table('b', column('y')) - c = table('c', column('z')) + a = table("a", column("x")) + b = table("b", column("y")) + c = table("c", column("z")) j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z) - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_nested_join_init(self): - a = table('a', column('x')) - b = table('b', column('y')) - c = table('c', column('z')) + a = table("a", column("x")) + b = table("b", column("y")) + c = table("c", column("z")) j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z) j.c - q = column('q') + q = column("q") b.append_column(q) j._refresh_for_new_column(q) assert j.c.b_q is q def test_fk_table(self): m = MetaData() - fk = ForeignKey('x.id') - Table('x', m, Column('id', Integer)) - a = Table('a', m, Column('x', Integer, fk)) + fk = ForeignKey("x.id") + Table("x", m, Column("id", Integer)) + a = Table("a", m, Column("x", Integer, fk)) a.c - q = Column('q', Integer) + q = Column("q", Integer) a.append_column(q) a._refresh_for_new_column(q) eq_(a.foreign_keys, set([fk])) - fk2 = ForeignKey('g.id') - p = Column('p', Integer, fk2) + fk2 = ForeignKey("g.id") + p = Column("p", Integer, fk2) a.append_column(p) a._refresh_for_new_column(p) eq_(a.foreign_keys, set([fk, fk2])) def test_fk_join(self): m = MetaData() - fk = ForeignKey('x.id') - Table('x', m, Column('id', Integer)) - a = Table('a', m, Column('x', Integer, fk)) - b = Table('b', m, Column('y', Integer)) + fk = ForeignKey("x.id") + Table("x", m, Column("id", Integer)) + a = Table("a", m, Column("x", Integer, fk)) + b = Table("b", m, Column("y", Integer)) j = a.join(b, a.c.x == b.c.y) j.c - q = Column('q', Integer) + q = Column("q", Integer) b.append_column(q) j._refresh_for_new_column(q) eq_(j.foreign_keys, set([fk])) - fk2 = ForeignKey('g.id') - p = Column('p', Integer, fk2) + fk2 = ForeignKey("g.id") + p = Column("p", Integer, fk2) b.append_column(p) j._refresh_for_new_column(p) eq_(j.foreign_keys, set([fk, fk2])) @@ -972,18 +1016,18 @@ class AnonLabelTest(fixtures.TestBase): """Test behaviors fixed by [ticket:2168].""" def test_anon_labels_named_column(self): - c1 = column('x') + c1 = column("x") assert c1.label(None) is not c1 eq_(str(select([c1.label(None)])), "SELECT x AS x_1") def test_anon_labels_literal_column(self): - c1 = literal_column('x') + c1 = literal_column("x") assert c1.label(None) is not c1 eq_(str(select([c1.label(None)])), "SELECT x AS x_1") def test_anon_labels_func(self): - c1 = func.count('*') + c1 = func.count("*") assert c1.label(None) is not c1 eq_(str(select([c1])), "SELECT count(:count_2) AS count_1") @@ -992,76 +1036,76 @@ class AnonLabelTest(fixtures.TestBase): eq_(str(select([c1.label(None)])), "SELECT count(:count_2) AS count_1") def test_named_labels_named_column(self): - c1 = column('x') - eq_(str(select([c1.label('y')])), "SELECT x AS y") + c1 = column("x") + eq_(str(select([c1.label("y")])), "SELECT x AS y") def test_named_labels_literal_column(self): - c1 = literal_column('x') - eq_(str(select([c1.label('y')])), "SELECT x AS y") + c1 = literal_column("x") + eq_(str(select([c1.label("y")])), "SELECT x AS y") class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def test_flat_ok_on_non_join(self): - a = table('a', column('a')) + a = table("a", column("a")) s = a.select() self.assert_compile( s.alias(flat=True).select(), - "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1" + "SELECT anon_1.a FROM (SELECT a.a AS a FROM a) AS anon_1", ) def test_join_alias(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( a.join(b, a.c.a == b.c.b).alias(), - "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b" + "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b", ) def test_join_standalone_alias(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( alias(a.join(b, a.c.a == b.c.b)), - "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b" + "SELECT a.a AS a_a, b.b AS b_b FROM a JOIN b ON a.a = b.b", ) def test_join_alias_flat(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( a.join(b, a.c.a == b.c.b).alias(flat=True), - "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b" + "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b", ) def test_join_standalone_alias_flat(self): - a = table('a', column('a')) - b = table('b', column('b')) + a = table("a", column("a")) + b = table("b", column("b")) self.assert_compile( alias(a.join(b, a.c.a == b.c.b), flat=True), - "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b" + "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b", ) def test_composed_join_alias_flat(self): - a = table('a', column('a')) - b = table('b', column('b')) - c = table('c', column('c')) - d = table('d', column('d')) + a = table("a", column("a")) + b = table("b", column("b")) + c = table("c", column("c")) + d = table("d", column("d")) j1 = a.join(b, a.c.a == b.c.b) j2 = c.join(d, c.c.c == d.c.d) self.assert_compile( j1.join(j2, b.c.b == c.c.c).alias(flat=True), "a AS a_1 JOIN b AS b_1 ON a_1.a = b_1.b JOIN " - "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c" + "(c AS c_1 JOIN d AS d_1 ON c_1.c = d_1.d) ON b_1.b = c_1.c", ) def test_composed_join_alias(self): - a = table('a', column('a')) - b = table('b', column('b')) - c = table('c', column('c')) - d = table('d', column('d')) + a = table("a", column("a")) + b = table("b", column("b")) + c = table("c", column("c")) + d = table("d", column("d")) j1 = a.join(b, a.c.a == b.c.b) j2 = c.join(d, c.c.c == d.c.d) @@ -1070,29 +1114,35 @@ class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT anon_1.a_a, anon_1.b_b, anon_1.c_c, anon_1.d_d " "FROM (SELECT a.a AS a_a, b.b AS b_b, c.c AS c_c, d.d AS d_d " "FROM a JOIN b ON a.a = b.b " - "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1" + "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1", ) class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def test_join_condition(self): m = MetaData() - t1 = Table('t1', m, Column('id', Integer)) - t2 = Table('t2', m, - Column('id', Integer), - Column('t1id', ForeignKey('t1.id'))) - t3 = Table('t3', m, - Column('id', Integer), - Column('t1id', ForeignKey('t1.id')), - Column('t2id', ForeignKey('t2.id'))) - t4 = Table('t4', m, Column('id', Integer), - Column('t2id', ForeignKey('t2.id'))) - t5 = Table('t5', m, - Column('t1id1', ForeignKey('t1.id')), - Column('t1id2', ForeignKey('t1.id')), - ) + t1 = Table("t1", m, Column("id", Integer)) + t2 = Table( + "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id")) + ) + t3 = Table( + "t3", + m, + Column("id", Integer), + Column("t1id", ForeignKey("t1.id")), + Column("t2id", ForeignKey("t2.id")), + ) + t4 = Table( + "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id")) + ) + t5 = Table( + "t5", + m, + Column("t1id1", ForeignKey("t1.id")), + Column("t1id2", ForeignKey("t1.id")), + ) t1t2 = t1.join(t2) t2t3 = t2.join(t3) @@ -1108,10 +1158,8 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id), ]: assert expected.compare( - sql_util.join_condition( - left, - right, - a_subset=a_subset)) + sql_util.join_condition(left, right, a_subset=a_subset) + ) # these are ambiguous, or have no joins for left, right, a_subset in [ @@ -1120,12 +1168,14 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t1, t4, None), (t1t2, t2t3, None), (t5, t1, None), - (t5.select(use_labels=True), t1, None) + (t5.select(use_labels=True), t1, None), ]: assert_raises( exc.ArgumentError, sql_util.join_condition, - left, right, a_subset=a_subset + left, + right, + a_subset=a_subset, ) als = t2t3.alias() @@ -1138,41 +1188,38 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t2t3, t4, t2t3.c.t2_id == t4.c.t2id), (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id), (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id), - (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id) + (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id), ]: - assert expected.compare( - left.join(right).onclause - ) + assert expected.compare(left.join(right).onclause) # these are right-nested joins j = t1t2.join(t2t3) assert j.onclause.compare(t2.c.id == t3.c.t2id) self.assert_compile( - j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN " - "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id") + j, + "t1 JOIN t2 ON t1.id = t2.t1id JOIN " + "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id", + ) st2t3 = t2t3.select(use_labels=True) j = t1t2.join(st2t3) assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id) self.assert_compile( - j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN " + j, + "t1 JOIN t2 ON t1.id = t2.t1id JOIN " "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, " "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id " - "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id") + "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id", + ) def test_join_multiple_equiv_fks(self): m = MetaData() - t1 = Table('t1', m, - Column('id', Integer, primary_key=True) - ) + t1 = Table("t1", m, Column("id", Integer, primary_key=True)) t2 = Table( - 't2', + "t2", m, - Column( - 't1id', - Integer, - ForeignKey('t1.id'), - ForeignKey('t1.id'))) + Column("t1id", Integer, ForeignKey("t1.id"), ForeignKey("t1.id")), + ) assert sql_util.join_condition(t1, t2).compare(t1.c.id == t2.c.t1id) @@ -1181,35 +1228,43 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): # bounding the "good" column with two "bad" ones is so to # try to get coverage to get the "continue" statements # in the loop... - t1 = Table('t1', m, - Column('y', Integer, ForeignKey('t22.id')), - Column('x', Integer, ForeignKey('t2.id')), - Column('q', Integer, ForeignKey('t22.id')), - ) - t2 = Table('t2', m, Column('id', Integer)) + t1 = Table( + "t1", + m, + Column("y", Integer, ForeignKey("t22.id")), + Column("x", Integer, ForeignKey("t2.id")), + Column("q", Integer, ForeignKey("t22.id")), + ) + t2 = Table("t2", m, Column("id", Integer)) assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) def test_join_cond_no_such_unrelated_column(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')), - Column('y', Integer, ForeignKey('t3.q'))) - t2 = Table('t2', m, Column('id', Integer)) - Table('t3', m, Column('id', Integer)) + t1 = Table( + "t1", + m, + Column("x", Integer, ForeignKey("t2.id")), + Column("y", Integer, ForeignKey("t3.q")), + ) + t2 = Table("t2", m, Column("id", Integer)) + Table("t3", m, Column("id", Integer)) assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) def test_join_cond_no_such_related_table(self): m1 = MetaData() m2 = MetaData() - t1 = Table('t1', m1, Column('x', Integer, ForeignKey('t2.id'))) - t2 = Table('t2', m2, Column('id', Integer)) + t1 = Table("t1", m1, Column("x", Integer, ForeignKey("t2.id"))) + t2 = Table("t2", m2, Column("id", Integer)) assert_raises_message( exc.NoReferencedTableError, "Foreign key associated with column 't1.x' could not find " "table 't2' with which to generate a foreign key to " "target column 'id'", - sql_util.join_condition, t1, t2 + sql_util.join_condition, + t1, + t2, ) assert_raises_message( @@ -1217,19 +1272,23 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): "Foreign key associated with column 't1.x' could not find " "table 't2' with which to generate a foreign key to " "target column 'id'", - sql_util.join_condition, t2, t1 + sql_util.join_condition, + t2, + t1, ) def test_join_cond_no_such_related_column(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.q'))) - t2 = Table('t2', m, Column('id', Integer)) + t1 = Table("t1", m, Column("x", Integer, ForeignKey("t2.q"))) + t2 = Table("t2", m, Column("id", Integer)) assert_raises_message( exc.NoReferencedColumnError, "Could not initialize target column for " "ForeignKey 't2.q' on table 't1': " "table 't2' has no column named 'q'", - sql_util.join_condition, t1, t2 + sql_util.join_condition, + t1, + t2, ) assert_raises_message( @@ -1237,25 +1296,35 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): "Could not initialize target column for " "ForeignKey 't2.q' on table 't1': " "table 't2' has no column named 'q'", - sql_util.join_condition, t2, t1 + sql_util.join_condition, + t2, + t1, ) class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): - def test_join_pk_collapse_implicit(self): """test that redundant columns in a join get 'collapsed' into a minimal primary key, which is the root column along a chain of foreign key relationships.""" meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), - primary_key=True)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), - primary_key=True)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), - primary_key=True)) + a = Table("a", meta, Column("id", Integer, primary_key=True)) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + ) + c = Table( + "c", + meta, + Column("id", Integer, ForeignKey("b.id"), primary_key=True), + ) + d = Table( + "d", + meta, + Column("id", Integer, ForeignKey("c.id"), primary_key=True), + ) assert c.c.id.references(b.c.id) assert not d.c.id.references(a.c.id) assert list(a.join(b).primary_key) == [a.c.id] @@ -1271,43 +1340,60 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): explicit join conditions.""" meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), - Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), - primary_key=True), Column('x', Integer)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), - primary_key=True), Column('x', Integer)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), - primary_key=True), Column('x', Integer)) + a = Table( + "a", + meta, + Column("id", Integer, primary_key=True), + Column("x", Integer), + ) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + Column("x", Integer), + ) + c = Table( + "c", + meta, + Column("id", Integer, ForeignKey("b.id"), primary_key=True), + Column("x", Integer), + ) + d = Table( + "d", + meta, + Column("id", Integer, ForeignKey("c.id"), primary_key=True), + Column("x", Integer), + ) print(list(a.join(b, a.c.x == b.c.id).primary_key)) assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id] assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) \ - == [a.c.id] - assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) \ - == [b.c.id] - assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \ - == [b.c.id] + assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) == [a.c.id] + assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) == [b.c.id] + assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) == [b.c.id] + assert list( + d.join(b, d.c.id == b.c.id).join(c, b.c.id == c.c.x).primary_key + ) == [b.c.id] assert list( - d.join( - b, - d.c.id == b.c.id).join( - c, - b.c.id == c.c.x).primary_key) == [ - b.c.id] - assert list(a.join(b).join(c, c.c.id - == b.c.x).join(d).primary_key) == [a.c.id] - assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x - == b.c.id)).primary_key) == [a.c.id] + a.join(b).join(c, c.c.id == b.c.x).join(d).primary_key + ) == [a.c.id] + assert list( + a.join(b, and_(a.c.id == b.c.id, a.c.x == b.c.id)).primary_key + ) == [a.c.id] def test_init_doesnt_blowitaway(self): meta = MetaData() - a = Table('a', meta, - Column('id', Integer, primary_key=True), - Column('x', Integer)) - b = Table('b', meta, - Column('id', Integer, ForeignKey('a.id'), primary_key=True), - Column('x', Integer)) + a = Table( + "a", + meta, + Column("id", Integer, primary_key=True), + Column("x", Integer), + ) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + Column("x", Integer), + ) j = a.join(b) assert list(j.primary_key) == [a.c.id] @@ -1317,12 +1403,18 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): def test_non_column_clause(self): meta = MetaData() - a = Table('a', meta, - Column('id', Integer, primary_key=True), - Column('x', Integer)) - b = Table('b', meta, - Column('id', Integer, ForeignKey('a.id'), primary_key=True), - Column('x', Integer, primary_key=True)) + a = Table( + "a", + meta, + Column("id", Integer, primary_key=True), + Column("x", Integer), + ) + b = Table( + "b", + meta, + Column("id", Integer, ForeignKey("a.id"), primary_key=True), + Column("x", Integer, primary_key=True), + ) j = a.join(b, and_(a.c.id == b.c.id, b.c.x == 5)) assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j) @@ -1331,123 +1423,156 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): def test_onclause_direction(self): metadata = MetaData() - employee = Table('Employee', metadata, - Column('name', String(100)), - Column('id', Integer, primary_key=True), - ) + employee = Table( + "Employee", + metadata, + Column("name", String(100)), + Column("id", Integer, primary_key=True), + ) - engineer = Table('Engineer', metadata, - Column('id', Integer, - ForeignKey('Employee.id'), primary_key=True)) + engineer = Table( + "Engineer", + metadata, + Column("id", Integer, ForeignKey("Employee.id"), primary_key=True), + ) - eq_(util.column_set(employee.join(engineer, employee.c.id - == engineer.c.id).primary_key), - util.column_set([employee.c.id])) - eq_(util.column_set(employee.join(engineer, engineer.c.id - == employee.c.id).primary_key), - util.column_set([employee.c.id])) + eq_( + util.column_set( + employee.join( + engineer, employee.c.id == engineer.c.id + ).primary_key + ), + util.column_set([employee.c.id]), + ) + eq_( + util.column_set( + employee.join( + engineer, engineer.c.id == employee.c.id + ).primary_key + ), + util.column_set([employee.c.id]), + ) class ReduceTest(fixtures.TestBase, AssertsExecutionResults): - def test_reduce(self): meta = MetaData() - t1 = Table('t1', meta, - Column('t1id', Integer, primary_key=True), - Column('t1data', String(30))) + t1 = Table( + "t1", + meta, + Column("t1id", Integer, primary_key=True), + Column("t1data", String(30)), + ) t2 = Table( - 't2', + "t2", meta, - Column( - 't2id', - Integer, - ForeignKey('t1.t1id'), - primary_key=True), - Column( - 't2data', - String(30))) + Column("t2id", Integer, ForeignKey("t1.t1id"), primary_key=True), + Column("t2data", String(30)), + ) t3 = Table( - 't3', + "t3", meta, - Column( - 't3id', - Integer, - ForeignKey('t2.t2id'), - primary_key=True), - Column( - 't3data', - String(30))) - - eq_(util.column_set(sql_util.reduce_columns([ - t1.c.t1id, - t1.c.t1data, - t2.c.t2id, - t2.c.t2data, - t3.c.t3id, - t3.c.t3data, - ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, - t3.c.t3data])) + Column("t3id", Integer, ForeignKey("t2.t2id"), primary_key=True), + Column("t3data", String(30)), + ) + + eq_( + util.column_set( + sql_util.reduce_columns( + [ + t1.c.t1id, + t1.c.t1data, + t2.c.t2id, + t2.c.t2data, + t3.c.t3id, + t3.c.t3data, + ] + ) + ), + util.column_set( + [t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data] + ), + ) def test_reduce_selectable(self): metadata = MetaData() - engineers = Table('engineers', metadata, - Column('engineer_id', Integer, primary_key=True), - Column('engineer_name', String(50))) - managers = Table('managers', metadata, - Column('manager_id', Integer, primary_key=True), - Column('manager_name', String(50))) - s = select([engineers, - managers]).where(engineers.c.engineer_name - == managers.c.manager_name) - eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), - util.column_set([s.c.engineer_id, s.c.engineer_name, - s.c.manager_id])) + engineers = Table( + "engineers", + metadata, + Column("engineer_id", Integer, primary_key=True), + Column("engineer_name", String(50)), + ) + managers = Table( + "managers", + metadata, + Column("manager_id", Integer, primary_key=True), + Column("manager_name", String(50)), + ) + s = select([engineers, managers]).where( + engineers.c.engineer_name == managers.c.manager_name + ) + eq_( + util.column_set(sql_util.reduce_columns(list(s.c), s)), + util.column_set( + [s.c.engineer_id, s.c.engineer_name, s.c.manager_id] + ), + ) def test_reduce_generation(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer)) - t2 = Table('t2', m, Column('z', Integer, ForeignKey('t1.x')), - Column('q', Integer)) + t1 = Table( + "t1", + m, + Column("x", Integer, primary_key=True), + Column("y", Integer), + ) + t2 = Table( + "t2", + m, + Column("z", Integer, ForeignKey("t1.x")), + Column("q", Integer), + ) s1 = select([t1, t2]) s2 = s1.reduce_columns(only_synonyms=False) - eq_( - set(s2.inner_columns), - set([t1.c.x, t1.c.y, t2.c.q]) - ) + eq_(set(s2.inner_columns), set([t1.c.x, t1.c.y, t2.c.q])) s2 = s1.reduce_columns() - eq_( - set(s2.inner_columns), - set([t1.c.x, t1.c.y, t2.c.z, t2.c.q]) - ) + eq_(set(s2.inner_columns), set([t1.c.x, t1.c.y, t2.c.z, t2.c.q])) def test_reduce_only_synonym_fk(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer)) - t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), - Column('q', Integer, ForeignKey('t1.y'))) + t1 = Table( + "t1", + m, + Column("x", Integer, primary_key=True), + Column("y", Integer), + ) + t2 = Table( + "t2", + m, + Column("x", Integer, ForeignKey("t1.x")), + Column("q", Integer, ForeignKey("t1.y")), + ) s1 = select([t1, t2]) s1 = s1.reduce_columns(only_synonyms=True) - eq_( - set(s1.c), - set([s1.c.x, s1.c.y, s1.c.q]) - ) + eq_(set(s1.c), set([s1.c.x, s1.c.y, s1.c.q])) def test_reduce_only_synonym_lineage(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer), - Column('z', Integer) - ) + t1 = Table( + "t1", + m, + Column("x", Integer, primary_key=True), + Column("y", Integer), + Column("z", Integer), + ) # test that the first appearance in the columns clause # wins - t1 is first, t1.c.x wins s1 = select([t1]) s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( set(s2.reduce_columns().inner_columns), - set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) + set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]), ) # reverse order, s1.c.x wins @@ -1455,91 +1580,129 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( set(s2.reduce_columns().inner_columns), - set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) + set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]), ) def test_reduce_aliased_join(self): metadata = MetaData() people = Table( - 'people', metadata, Column( - 'person_id', Integer, Sequence( - 'person_id_seq', optional=True), primary_key=True), Column( - 'name', String(50)), Column( - 'type', String(30))) + "people", + metadata, + Column( + "person_id", + Integer, + Sequence("person_id_seq", optional=True), + primary_key=True, + ), + Column("name", String(50)), + Column("type", String(30)), + ) engineers = Table( - 'engineers', + "engineers", metadata, - Column('person_id', Integer, ForeignKey('people.person_id' - ), primary_key=True), - Column('status', String(30)), - Column('engineer_name', String(50)), - Column('primary_language', String(50)), + Column( + "person_id", + Integer, + ForeignKey("people.person_id"), + primary_key=True, + ), + Column("status", String(30)), + Column("engineer_name", String(50)), + Column("primary_language", String(50)), ) managers = Table( - 'managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), - primary_key=True), - Column('status', String(30)), - Column('manager_name', String(50))) - pjoin = \ - people.outerjoin(engineers).outerjoin(managers).\ - select(use_labels=True).alias('pjoin' - ) - eq_(util.column_set(sql_util.reduce_columns( - [pjoin.c.people_person_id, pjoin.c.engineers_person_id, - pjoin.c.managers_person_id])), - util.column_set([pjoin.c.people_person_id])) + "managers", + metadata, + Column( + "person_id", + Integer, + ForeignKey("people.person_id"), + primary_key=True, + ), + Column("status", String(30)), + Column("manager_name", String(50)), + ) + pjoin = ( + people.outerjoin(engineers) + .outerjoin(managers) + .select(use_labels=True) + .alias("pjoin") + ) + eq_( + util.column_set( + sql_util.reduce_columns( + [ + pjoin.c.people_person_id, + pjoin.c.engineers_person_id, + pjoin.c.managers_person_id, + ] + ) + ), + util.column_set([pjoin.c.people_person_id]), + ) def test_reduce_aliased_union(self): metadata = MetaData() item_table = Table( - 'item', + "item", metadata, Column( - 'id', - Integer, - ForeignKey('base_item.id'), - primary_key=True), - Column( - 'dummy', - Integer, - default=0)) + "id", Integer, ForeignKey("base_item.id"), primary_key=True + ), + Column("dummy", Integer, default=0), + ) base_item_table = Table( - 'base_item', metadata, Column( - 'id', Integer, primary_key=True), Column( - 'child_name', String(255), default=None)) + "base_item", + metadata, + Column("id", Integer, primary_key=True), + Column("child_name", String(255), default=None), + ) from sqlalchemy.orm.util import polymorphic_union - item_join = polymorphic_union({ - 'BaseItem': - base_item_table.select( - base_item_table.c.child_name - == 'BaseItem'), - 'Item': base_item_table.join(item_table)}, - None, 'item_join') - eq_(util.column_set(sql_util.reduce_columns([item_join.c.id, - item_join.c.dummy, - item_join.c.child_name])), - util.column_set([item_join.c.id, - item_join.c.dummy, - item_join.c.child_name])) + + item_join = polymorphic_union( + { + "BaseItem": base_item_table.select( + base_item_table.c.child_name == "BaseItem" + ), + "Item": base_item_table.join(item_table), + }, + None, + "item_join", + ) + eq_( + util.column_set( + sql_util.reduce_columns( + [item_join.c.id, item_join.c.dummy, item_join.c.child_name] + ) + ), + util.column_set( + [item_join.c.id, item_join.c.dummy, item_join.c.child_name] + ), + ) def test_reduce_aliased_union_2(self): metadata = MetaData() - page_table = Table('page', metadata, Column('id', Integer, - primary_key=True)) - magazine_page_table = Table('magazine_page', metadata, - Column('page_id', Integer, - ForeignKey('page.id'), - primary_key=True)) + page_table = Table( + "page", metadata, Column("id", Integer, primary_key=True) + ) + magazine_page_table = Table( + "magazine_page", + metadata, + Column( + "page_id", Integer, ForeignKey("page.id"), primary_key=True + ), + ) classified_page_table = Table( - 'classified_page', + "classified_page", metadata, Column( - 'magazine_page_id', + "magazine_page_id", Integer, - ForeignKey('magazine_page.page_id'), - primary_key=True)) + ForeignKey("magazine_page.page_id"), + primary_key=True, + ), + ) # this is essentially the union formed by the ORM's # polymorphic_union function. we define two versions with @@ -1549,25 +1712,33 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): # classified_page.magazine_page_id pjoin = union( - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]). - select_from( - page_table.join(magazine_page_table). - join(classified_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]). - select_from(page_table.join(magazine_page_table)) - ).alias('pjoin') - eq_(util.column_set(sql_util.reduce_columns( - [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id])) + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id, + ] + ).select_from( + page_table.join(magazine_page_table).join( + classified_page_table + ) + ), + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label("magazine_page_id"), + ] + ).select_from(page_table.join(magazine_page_table)), + ).alias("pjoin") + eq_( + util.column_set( + sql_util.reduce_columns( + [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id] + ) + ), + util.column_set([pjoin.c.id]), + ) # the first selectable has a CAST, which is a placeholder for # classified_page.magazine_page_id in the second selectable. @@ -1576,45 +1747,70 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): # currently makes the external column look like that of the # first selectable only. - pjoin = union(select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]). - select_from(page_table.join(magazine_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]). - select_from(page_table.join(magazine_page_table). - join(classified_page_table)) - ).alias('pjoin') - eq_(util.column_set(sql_util.reduce_columns( - [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id])) + pjoin = union( + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label("magazine_page_id"), + ] + ).select_from(page_table.join(magazine_page_table)), + select( + [ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id, + ] + ).select_from( + page_table.join(magazine_page_table).join( + classified_page_table + ) + ), + ).alias("pjoin") + eq_( + util.column_set( + sql_util.reduce_columns( + [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id] + ) + ), + util.column_set([pjoin.c.id]), + ) class DerivedTest(fixtures.TestBase, AssertsExecutionResults): - def test_table(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) + t1 = Table( + "t1", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) + t2 = Table( + "t2", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) assert t1.is_derived_from(t1) assert not t2.is_derived_from(t1) def test_alias(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) + t1 = Table( + "t1", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) + t2 = Table( + "t2", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) assert t1.alias().is_derived_from(t1) assert not t2.alias().is_derived_from(t1) @@ -1624,44 +1820,43 @@ class DerivedTest(fixtures.TestBase, AssertsExecutionResults): def test_select(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), - Column('c2', String(30))) + t1 = Table( + "t1", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) + t2 = Table( + "t2", + meta, + Column("c1", Integer, primary_key=True), + Column("c2", String(30)), + ) assert t1.select().is_derived_from(t1) assert not t2.select().is_derived_from(t1) assert select([t1, t2]).is_derived_from(t1) - assert t1.select().alias('foo').is_derived_from(t1) - assert select([t1, t2]).alias('foo').is_derived_from(t1) - assert not t2.select().alias('foo').is_derived_from(t1) + assert t1.select().alias("foo").is_derived_from(t1) + assert select([t1, t2]).alias("foo").is_derived_from(t1) + assert not t2.select().alias("foo").is_derived_from(t1) class AnnotationsTest(fixtures.TestBase): - def test_hashing(self): - t = table('t', column('x')) + t = table("t", column("x")) a = t.alias() s = t.select() s2 = a.select() - for obj in [ - t, - t.c.x, - a, - s, - s2, - t.c.x > 1, - (t.c.x > 1).label(None) - ]: + for obj in [t, t.c.x, a, s, s2, t.c.x > 1, (t.c.x > 1).label(None)]: annot = obj._annotate({}) eq_(set([obj]), set([annot])) def test_compare(self): - t = table('t', column('x'), column('y')) + t = table("t", column("x"), column("y")) x_a = t.c.x._annotate({}) assert t.c.x.compare(x_a) assert x_a.compare(t.c.x) @@ -1681,40 +1876,44 @@ class AnnotationsTest(fixtures.TestBase): def test_late_name_add(self): from sqlalchemy.schema import Column + c1 = Column(Integer) c1_a = c1._annotate({"foo": "bar"}) - c1.name = 'somename' - eq_(c1_a.name, 'somename') + c1.name = "somename" + eq_(c1_a.name, "somename") def test_late_table_add(self): c1 = Column("foo", Integer) c1_a = c1._annotate({"foo": "bar"}) - t = Table('t', MetaData(), c1) + t = Table("t", MetaData(), c1) is_(c1_a.table, t) def test_basic_attrs(self): - t = Table('t', MetaData(), - Column('x', Integer, info={'q': 'p'}), - Column('y', Integer, key='q')) + t = Table( + "t", + MetaData(), + Column("x", Integer, info={"q": "p"}), + Column("y", Integer, key="q"), + ) x_a = t.c.x._annotate({}) y_a = t.c.q._annotate({}) - t.c.x.info['z'] = 'h' + t.c.x.info["z"] = "h" - eq_(y_a.key, 'q') + eq_(y_a.key, "q") is_(x_a.table, t) - eq_(x_a.info, {'q': 'p', 'z': 'h'}) + eq_(x_a.info, {"q": "p", "z": "h"}) eq_(t.c.x.anon_label, x_a.anon_label) def test_custom_constructions(self): from sqlalchemy.schema import Column class MyColumn(Column): - def __init__(self): - Column.__init__(self, 'foo', Integer) + Column.__init__(self, "foo", Integer) + _constructor = Column - t1 = Table('t1', MetaData(), MyColumn()) + t1 = Table("t1", MetaData(), MyColumn()) s1 = t1.select() assert isinstance(t1.c.foo, MyColumn) assert isinstance(s1.c.foo, Column) @@ -1734,8 +1933,9 @@ class AnnotationsTest(fixtures.TestBase): pass assert isinstance( - MyColumn('x', Integer)._annotate({"foo": "bar"}), - AnnotatedColumnElement) + MyColumn("x", Integer)._annotate({"foo": "bar"}), + AnnotatedColumnElement, + ) def test_custom_construction_correct_anno_expr(self): # [ticket:2918] @@ -1744,14 +1944,14 @@ class AnnotationsTest(fixtures.TestBase): class MyColumn(Column): pass - col = MyColumn('x', Integer) + col = MyColumn("x", Integer) binary_1 = col == 5 - col_anno = MyColumn('x', Integer)._annotate({"foo": "bar"}) + col_anno = MyColumn("x", Integer)._annotate({"foo": "bar"}) binary_2 = col_anno == 5 eq_(binary_2.left._annotations, {"foo": "bar"}) def test_annotated_corresponding_column(self): - table1 = table('table1', column("col1")) + table1 = table("table1", column("col1")) s1 = select([table1.c.col1]) t1 = s1._annotate({}) @@ -1766,93 +1966,107 @@ class AnnotationsTest(fixtures.TestBase): inner = select([s1]) - assert inner.corresponding_column( - t2.c.col1, - require_embedded=False) is inner.corresponding_column( - t2.c.col1, - require_embedded=True) is inner.c.col1 - assert inner.corresponding_column( - t1.c.col1, - require_embedded=False) is inner.corresponding_column( - t1.c.col1, - require_embedded=True) is inner.c.col1 + assert ( + inner.corresponding_column(t2.c.col1, require_embedded=False) + is inner.corresponding_column(t2.c.col1, require_embedded=True) + is inner.c.col1 + ) + assert ( + inner.corresponding_column(t1.c.col1, require_embedded=False) + is inner.corresponding_column(t1.c.col1, require_embedded=True) + is inner.c.col1 + ) def test_annotated_visit(self): - table1 = table('table1', column("col1"), column("col2")) + table1 = table("table1", column("col1"), column("col2")) - bin = table1.c.col1 == bindparam('foo', value=None) + bin = table1.c.col1 == bindparam("foo", value=None) assert str(bin) == "table1.col1 = :foo" def visit_binary(b): b.right = table1.c.col2 - b2 = visitors.cloned_traverse(bin, {}, {'binary': visit_binary}) + b2 = visitors.cloned_traverse(bin, {}, {"binary": visit_binary}) assert str(b2) == "table1.col1 = table1.col2" - b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary': - visit_binary}) - assert str(b3) == 'table1.col1 = table1.col2' + b3 = visitors.cloned_traverse( + bin._annotate({}), {}, {"binary": visit_binary} + ) + assert str(b3) == "table1.col1 = table1.col2" def visit_binary(b): - b.left = bindparam('bar') + b.left = bindparam("bar") - b4 = visitors.cloned_traverse(b2, {}, {'binary': visit_binary}) + b4 = visitors.cloned_traverse(b2, {}, {"binary": visit_binary}) assert str(b4) == ":bar = table1.col2" - b5 = visitors.cloned_traverse(b3, {}, {'binary': visit_binary}) + b5 = visitors.cloned_traverse(b3, {}, {"binary": visit_binary}) assert str(b5) == ":bar = table1.col2" def test_label_accessors(self): - t1 = table('t1', column('c1')) + t1 = table("t1", column("c1")) l1 = t1.c.c1.label(None) is_(l1._order_by_label_element, l1) l1a = l1._annotate({"foo": "bar"}) is_(l1a._order_by_label_element, l1a) def test_annotate_aliased(self): - t1 = table('t1', column('c1')) - s = select([(t1.c.c1 + 3).label('bat')]) + t1 = table("t1", column("c1")) + s = select([(t1.c.c1 + 3).label("bat")]) a = s.alias() - a = sql_util._deep_annotate(a, {'foo': 'bar'}) - eq_(a._annotations['foo'], 'bar') - eq_(a.element._annotations['foo'], 'bar') + a = sql_util._deep_annotate(a, {"foo": "bar"}) + eq_(a._annotations["foo"], "bar") + eq_(a.element._annotations["foo"], "bar") def test_annotate_expressions(self): - table1 = table('table1', column('col1'), column('col2')) - for expr, expected in [(table1.c.col1, 'table1.col1'), - (table1.c.col1 == 5, - 'table1.col1 = :col1_1'), - (table1.c.col1.in_([2, 3, 4]), - 'table1.col1 IN (:col1_1, :col1_2, ' - ':col1_3)')]: + table1 = table("table1", column("col1"), column("col2")) + for expr, expected in [ + (table1.c.col1, "table1.col1"), + (table1.c.col1 == 5, "table1.col1 = :col1_1"), + ( + table1.c.col1.in_([2, 3, 4]), + "table1.col1 IN (:col1_1, :col1_2, " ":col1_3)", + ), + ]: eq_(str(expr), expected) eq_(str(expr._annotate({})), expected) eq_(str(sql_util._deep_annotate(expr, {})), expected) - eq_(str(sql_util._deep_annotate( - expr, {}, exclude=[table1.c.col1])), expected) + eq_( + str( + sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1]) + ), + expected, + ) def test_deannotate(self): - table1 = table('table1', column("col1"), column("col2")) + table1 = table("table1", column("col1"), column("col2")) - bin = table1.c.col1 == bindparam('foo', value=None) + bin = table1.c.col1 == bindparam("foo", value=None) - b2 = sql_util._deep_annotate(bin, {'_orm_adapt': True}) + b2 = sql_util._deep_annotate(bin, {"_orm_adapt": True}) b3 = sql_util._deep_deannotate(b2) b4 = sql_util._deep_deannotate(bin) for elem in (b2._annotations, b2.left._annotations): - assert '_orm_adapt' in elem + assert "_orm_adapt" in elem - for elem in b3._annotations, b3.left._annotations, \ - b4._annotations, b4.left._annotations: + for elem in ( + b3._annotations, + b3.left._annotations, + b4._annotations, + b4.left._annotations, + ): assert elem == {} assert b2.left is not bin.left assert b3.left is not b2.left and b2.left is not bin.left assert b4.left is bin.left # since column is immutable # deannotate copies the element - assert bin.right is not b2.right and b2.right is not b3.right \ + assert ( + bin.right is not b2.right + and b2.right is not b3.right and b3.right is not b4.right + ) def test_annotate_unique_traversal(self): """test that items are copied only once during @@ -1864,16 +2078,14 @@ class AnnotationsTest(fixtures.TestBase): case now, as deannotate is making clones again in some cases. """ - table1 = table('table1', column('x')) - table2 = table('table2', column('y')) + table1 = table("table1", column("x")) + table2 = table("table2", column("y")) a1 = table1.alias() - s = select([a1.c.x]).select_from( - a1.join(table2, a1.c.x == table2.c.y) - ) + s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y)) for sel in ( sql_util._deep_deannotate(s), visitors.cloned_traverse(s, {}, {}), - visitors.replacement_traverse(s, {}, lambda x: None) + visitors.replacement_traverse(s, {}, lambda x: None), ): # the columns clause isn't changed at all assert sel._raw_columns[0].table is a1 @@ -1886,7 +2098,7 @@ class AnnotationsTest(fixtures.TestBase): # when encountered. for sel in ( sql_util._deep_deannotate(s, {"foo": "bar"}), - sql_util._deep_annotate(s, {'foo': 'bar'}), + sql_util._deep_annotate(s, {"foo": "bar"}), ): assert sel._froms[0] is not sel._froms[1].left @@ -1899,7 +2111,7 @@ class AnnotationsTest(fixtures.TestBase): preserving them when deep_annotate is run on them. """ - t1 = table('table1', column("col1"), column("col2")) + t1 = table("table1", column("col1"), column("col2")) s = select([t1.c.col1._annotate({"foo": "bar"})]) s2 = select([t1.c.col1._annotate({"bat": "hoho"})]) s3 = s.union(s2) @@ -1907,42 +2119,40 @@ class AnnotationsTest(fixtures.TestBase): eq_( sel.selects[0]._raw_columns[0]._annotations, - {"foo": "bar", "new": "thing"} + {"foo": "bar", "new": "thing"}, ) eq_( sel.selects[1]._raw_columns[0]._annotations, - {"bat": "hoho", "new": "thing"} + {"bat": "hoho", "new": "thing"}, ) def test_deannotate_2(self): - table1 = table('table1', column("col1"), column("col2")) - j = table1.c.col1._annotate({"remote": True}) == \ - table1.c.col2._annotate({"local": True}) + table1 = table("table1", column("col1"), column("col2")) + j = table1.c.col1._annotate( + {"remote": True} + ) == table1.c.col2._annotate({"local": True}) j2 = sql_util._deep_deannotate(j) - eq_( - j.left._annotations, {"remote": True} - ) - eq_( - j2.left._annotations, {} - ) + eq_(j.left._annotations, {"remote": True}) + eq_(j2.left._annotations, {}) def test_deannotate_3(self): - table1 = table('table1', column("col1"), column("col2"), - column("col3"), column("col4")) + table1 = table( + "table1", + column("col1"), + column("col2"), + column("col3"), + column("col4"), + ) j = and_( - table1.c.col1._annotate({"remote": True}) == - table1.c.col2._annotate({"local": True}), - table1.c.col3._annotate({"remote": True}) == - table1.c.col4._annotate({"local": True}) + table1.c.col1._annotate({"remote": True}) + == table1.c.col2._annotate({"local": True}), + table1.c.col3._annotate({"remote": True}) + == table1.c.col4._annotate({"local": True}), ) j2 = sql_util._deep_deannotate(j) - eq_( - j.clauses[0].left._annotations, {"remote": True} - ) - eq_( - j2.clauses[0].left._annotations, {} - ) + eq_(j.clauses[0].left._annotations, {"remote": True}) + eq_(j2.clauses[0].left._annotations, {}) def test_annotate_fromlist_preservation(self): """test the FROM list in select still works @@ -1952,37 +2162,34 @@ class AnnotationsTest(fixtures.TestBase): #2453, continued """ - table1 = table('table1', column('x')) - table2 = table('table2', column('y')) + table1 = table("table1", column("x")) + table2 = table("table2", column("y")) a1 = table1.alias() - s = select([a1.c.x]).select_from( - a1.join(table2, a1.c.x == table2.c.y) - ) + s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y)) assert_s = select([select([s])]) for fn in ( sql_util._deep_deannotate, - lambda s: sql_util._deep_annotate(s, {'foo': 'bar'}), + lambda s: sql_util._deep_annotate(s, {"foo": "bar"}), lambda s: visitors.cloned_traverse(s, {}, {}), - lambda s: visitors.replacement_traverse(s, {}, lambda x: None) + lambda s: visitors.replacement_traverse(s, {}, lambda x: None), ): sel = fn(select([fn(select([fn(s)]))])) eq_(str(assert_s), str(sel)) def test_bind_unique_test(self): - table('t', column('a'), column('b')) + table("t", column("a"), column("b")) b = bindparam("bind", value="x", unique=True) # the annotation of "b" should render the # same. The "unique" test in compiler should # also pass, [ticket:2425] - eq_(str(or_(b, b._annotate({"foo": "bar"}))), - ":bind_1 OR :bind_1") + eq_(str(or_(b, b._annotate({"foo": "bar"}))), ":bind_1 OR :bind_1") def test_comparators_cleaned_out_construction(self): - c = column('a') + c = column("a") comp1 = c.comparator @@ -1991,7 +2198,7 @@ class AnnotationsTest(fixtures.TestBase): assert comp1 is not comp2 def test_comparators_cleaned_out_reannotate(self): - c = column('a') + c = column("a") c1 = c._annotate({"foo": "bar"}) comp1 = c1.comparator @@ -2002,7 +2209,7 @@ class AnnotationsTest(fixtures.TestBase): assert comp1 is not comp2 def test_comparator_cleanout_integration(self): - c = column('a') + c = column("a") c1 = c._annotate({"foo": "bar"}) comp1 = c1.comparator @@ -2018,8 +2225,8 @@ class ReprTest(fixtures.TestBase): for obj in [ elements.Cast(1, 2), elements.TypeClause(String()), - elements.ColumnClause('x'), - elements.BindParameter('q'), + elements.ColumnClause("x"), + elements.BindParameter("q"), elements.Null(), elements.True_(), elements.False_(), @@ -2027,22 +2234,21 @@ class ReprTest(fixtures.TestBase): elements.BooleanClauseList.and_(), elements.Tuple(), elements.Case([]), - elements.Extract('foo', column('x')), - elements.UnaryExpression(column('x')), - elements.Grouping(column('x')), + elements.Extract("foo", column("x")), + elements.UnaryExpression(column("x")), + elements.Grouping(column("x")), elements.Over(func.foo()), - elements.Label('q', column('x')), + elements.Label("q", column("x")), ]: repr(obj) class WithLabelsTest(fixtures.TestBase): - def _assert_labels_warning(self, s): assert_raises_message( exc.SAWarning, r"replaced by Column.*, which has the same key", - lambda: s.c + lambda: s.c, ) def _assert_result_keys(self, s, keys): @@ -2055,147 +2261,128 @@ class WithLabelsTest(fixtures.TestBase): def _names_overlap(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer)) - t2 = Table('t2', m, Column('x', Integer)) + t1 = Table("t1", m, Column("x", Integer)) + t2 = Table("t2", m, Column("x", Integer)) return select([t1, t2]) def test_names_overlap_nolabel(self): sel = self._names_overlap() self._assert_labels_warning(sel) - self._assert_result_keys(sel, ['x']) + self._assert_result_keys(sel, ["x"]) def test_names_overlap_label(self): sel = self._names_overlap().apply_labels() - eq_( - list(sel.c.keys()), - ['t1_x', 't2_x'] - ) - self._assert_result_keys(sel, ['t1_x', 't2_x']) + eq_(list(sel.c.keys()), ["t1_x", "t2_x"]) + self._assert_result_keys(sel, ["t1_x", "t2_x"]) def _names_overlap_keys_dont(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer, key='a')) - t2 = Table('t2', m, Column('x', Integer, key='b')) + t1 = Table("t1", m, Column("x", Integer, key="a")) + t2 = Table("t2", m, Column("x", Integer, key="b")) return select([t1, t2]) def test_names_overlap_keys_dont_nolabel(self): sel = self._names_overlap_keys_dont() - eq_( - list(sel.c.keys()), - ['a', 'b'] - ) - self._assert_result_keys(sel, ['x']) + eq_(list(sel.c.keys()), ["a", "b"]) + self._assert_result_keys(sel, ["x"]) def test_names_overlap_keys_dont_label(self): sel = self._names_overlap_keys_dont().apply_labels() - eq_( - list(sel.c.keys()), - ['t1_a', 't2_b'] - ) - self._assert_result_keys(sel, ['t1_x', 't2_x']) + eq_(list(sel.c.keys()), ["t1_a", "t2_b"]) + self._assert_result_keys(sel, ["t1_x", "t2_x"]) def _labels_overlap(self): m = MetaData() - t1 = Table('t', m, Column('x_id', Integer)) - t2 = Table('t_x', m, Column('id', Integer)) + t1 = Table("t", m, Column("x_id", Integer)) + t2 = Table("t_x", m, Column("id", Integer)) return select([t1, t2]) def test_labels_overlap_nolabel(self): sel = self._labels_overlap() - eq_( - list(sel.c.keys()), - ['x_id', 'id'] - ) - self._assert_result_keys(sel, ['x_id', 'id']) + eq_(list(sel.c.keys()), ["x_id", "id"]) + self._assert_result_keys(sel, ["x_id", "id"]) def test_labels_overlap_label(self): sel = self._labels_overlap().apply_labels() t2 = sel.froms[1] - eq_( - list(sel.c.keys()), - ['t_x_id', t2.c.id.anon_label] - ) - self._assert_result_keys(sel, ['t_x_id', 'id_1']) - self._assert_subq_result_keys(sel, ['t_x_id', 'id_1']) + eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label]) + self._assert_result_keys(sel, ["t_x_id", "id_1"]) + self._assert_subq_result_keys(sel, ["t_x_id", "id_1"]) def _labels_overlap_keylabels_dont(self): m = MetaData() - t1 = Table('t', m, Column('x_id', Integer, key='a')) - t2 = Table('t_x', m, Column('id', Integer, key='b')) + t1 = Table("t", m, Column("x_id", Integer, key="a")) + t2 = Table("t_x", m, Column("id", Integer, key="b")) return select([t1, t2]) def test_labels_overlap_keylabels_dont_nolabel(self): sel = self._labels_overlap_keylabels_dont() - eq_(list(sel.c.keys()), ['a', 'b']) - self._assert_result_keys(sel, ['x_id', 'id']) + eq_(list(sel.c.keys()), ["a", "b"]) + self._assert_result_keys(sel, ["x_id", "id"]) def test_labels_overlap_keylabels_dont_label(self): sel = self._labels_overlap_keylabels_dont().apply_labels() - eq_(list(sel.c.keys()), ['t_a', 't_x_b']) - self._assert_result_keys(sel, ['t_x_id', 'id_1']) + eq_(list(sel.c.keys()), ["t_a", "t_x_b"]) + self._assert_result_keys(sel, ["t_x_id", "id_1"]) def _keylabels_overlap_labels_dont(self): m = MetaData() - t1 = Table('t', m, Column('a', Integer, key='x_id')) - t2 = Table('t_x', m, Column('b', Integer, key='id')) + t1 = Table("t", m, Column("a", Integer, key="x_id")) + t2 = Table("t_x", m, Column("b", Integer, key="id")) return select([t1, t2]) def test_keylabels_overlap_labels_dont_nolabel(self): sel = self._keylabels_overlap_labels_dont() - eq_(list(sel.c.keys()), ['x_id', 'id']) - self._assert_result_keys(sel, ['a', 'b']) + eq_(list(sel.c.keys()), ["x_id", "id"]) + self._assert_result_keys(sel, ["a", "b"]) def test_keylabels_overlap_labels_dont_label(self): sel = self._keylabels_overlap_labels_dont().apply_labels() t2 = sel.froms[1] - eq_(list(sel.c.keys()), ['t_x_id', t2.c.id.anon_label]) - self._assert_result_keys(sel, ['t_a', 't_x_b']) - self._assert_subq_result_keys(sel, ['t_a', 't_x_b']) + eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label]) + self._assert_result_keys(sel, ["t_a", "t_x_b"]) + self._assert_subq_result_keys(sel, ["t_a", "t_x_b"]) def _keylabels_overlap_labels_overlap(self): m = MetaData() - t1 = Table('t', m, Column('x_id', Integer, key='x_a')) - t2 = Table('t_x', m, Column('id', Integer, key='a')) + t1 = Table("t", m, Column("x_id", Integer, key="x_a")) + t2 = Table("t_x", m, Column("id", Integer, key="a")) return select([t1, t2]) def test_keylabels_overlap_labels_overlap_nolabel(self): sel = self._keylabels_overlap_labels_overlap() - eq_(list(sel.c.keys()), ['x_a', 'a']) - self._assert_result_keys(sel, ['x_id', 'id']) - self._assert_subq_result_keys(sel, ['x_id', 'id']) + eq_(list(sel.c.keys()), ["x_a", "a"]) + self._assert_result_keys(sel, ["x_id", "id"]) + self._assert_subq_result_keys(sel, ["x_id", "id"]) def test_keylabels_overlap_labels_overlap_label(self): sel = self._keylabels_overlap_labels_overlap().apply_labels() t2 = sel.froms[1] - eq_(list(sel.c.keys()), ['t_x_a', t2.c.a.anon_label]) - self._assert_result_keys(sel, ['t_x_id', 'id_1']) - self._assert_subq_result_keys(sel, ['t_x_id', 'id_1']) + eq_(list(sel.c.keys()), ["t_x_a", t2.c.a.anon_label]) + self._assert_result_keys(sel, ["t_x_id", "id_1"]) + self._assert_subq_result_keys(sel, ["t_x_id", "id_1"]) def _keys_overlap_names_dont(self): m = MetaData() - t1 = Table('t1', m, Column('a', Integer, key='x')) - t2 = Table('t2', m, Column('b', Integer, key='x')) + t1 = Table("t1", m, Column("a", Integer, key="x")) + t2 = Table("t2", m, Column("b", Integer, key="x")) return select([t1, t2]) def test_keys_overlap_names_dont_nolabel(self): sel = self._keys_overlap_names_dont() self._assert_labels_warning(sel) - self._assert_result_keys(sel, ['a', 'b']) + self._assert_result_keys(sel, ["a", "b"]) def test_keys_overlap_names_dont_label(self): sel = self._keys_overlap_names_dont().apply_labels() - eq_( - list(sel.c.keys()), - ['t1_x', 't2_x'] - ) - self._assert_result_keys(sel, ['t1_a', 't2_b']) + eq_(list(sel.c.keys()), ["t1_x", "t2_x"]) + self._assert_result_keys(sel, ["t1_a", "t2_b"]) class ResultMapTest(fixtures.TestBase): - def _fixture(self): m = MetaData() - t = Table('t', m, Column('x', Integer), Column('y', Integer)) + t = Table("t", m, Column("x", Integer), Column("y", Integer)) return t def _mapping(self, stmt): @@ -2208,7 +2395,7 @@ class ResultMapTest(fixtures.TestBase): def test_select_label_alt_name(self): t = self._fixture() - l1, l2 = t.c.x.label('a'), t.c.y.label('b') + l1, l2 = t.c.x.label("a"), t.c.y.label("b") s = select([l1, l2]) mapping = self._mapping(s) assert l1 in mapping @@ -2217,7 +2404,7 @@ class ResultMapTest(fixtures.TestBase): def test_select_alias_label_alt_name(self): t = self._fixture() - l1, l2 = t.c.x.label('a'), t.c.y.label('b') + l1, l2 = t.c.x.label("a"), t.c.y.label("b") s = select([l1, l2]).alias() mapping = self._mapping(s) assert l1 in mapping @@ -2253,7 +2440,7 @@ class ResultMapTest(fixtures.TestBase): x, y = t.c.x, t.c.y ta = t.alias() - l1, l2 = ta.c.x.label('a'), ta.c.y.label('b') + l1, l2 = ta.c.x.label("a"), ta.c.y.label("b") s = select([l1, l2]) mapping = self._mapping(s) @@ -2268,34 +2455,40 @@ class ResultMapTest(fixtures.TestBase): assert t.c.x not in mapping eq_( [type(entry[-1]) for entry in s.compile()._result_columns], - [Boolean] + [Boolean], ) def test_plain_exists(self): expr = exists([1]) eq_(type(expr.type), Boolean) eq_( - [type(entry[-1]) for - entry in select([expr]).compile()._result_columns], - [Boolean] + [ + type(entry[-1]) + for entry in select([expr]).compile()._result_columns + ], + [Boolean], ) def test_plain_exists_negate(self): expr = ~exists([1]) eq_(type(expr.type), Boolean) eq_( - [type(entry[-1]) for - entry in select([expr]).compile()._result_columns], - [Boolean] + [ + type(entry[-1]) + for entry in select([expr]).compile()._result_columns + ], + [Boolean], ) def test_plain_exists_double_negate(self): expr = ~(~exists([1])) eq_(type(expr.type), Boolean) eq_( - [type(entry[-1]) for - entry in select([expr]).compile()._result_columns], - [Boolean] + [ + type(entry[-1]) + for entry in select([expr]).compile()._result_columns + ], + [Boolean], ) def test_column_subquery_plain(self): @@ -2307,7 +2500,7 @@ class ResultMapTest(fixtures.TestBase): assert s1 in mapping eq_( [type(entry[-1]) for entry in s2.compile()._result_columns], - [Integer] + [Integer], ) def test_unary_boolean(self): @@ -2315,7 +2508,7 @@ class ResultMapTest(fixtures.TestBase): s1 = select([not_(True)], use_labels=True) eq_( [type(entry[-1]) for entry in s1.compile()._result_columns], - [Boolean] + [Boolean], ) @@ -2323,19 +2516,15 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" def _assert_legacy(self, leg, read=False, nowait=False): - t = table('t', column('c')) + t = table("t", column("c")) s1 = select([t], for_update=leg) if leg is False: assert s1._for_update_arg is None assert s1.for_update is None else: - eq_( - s1._for_update_arg.read, read - ) - eq_( - s1._for_update_arg.nowait, nowait - ) + eq_(s1._for_update_arg.read, read) + eq_(s1._for_update_arg.nowait, nowait) eq_(s1.for_update, leg) def test_false_legacy(self): @@ -2354,28 +2543,30 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): self._assert_legacy("read_nowait", read=True, nowait=True) def test_legacy_setter(self): - t = table('t', column('c')) + t = table("t", column("c")) s = select([t]) - s.for_update = 'nowait' + s.for_update = "nowait" eq_(s._for_update_arg.nowait, True) def test_basic_clone(self): - t = table('t', column('c')) + t = table("t", column("c")) s = select([t]).with_for_update(read=True, of=t.c.c) s2 = visitors.ReplacingCloningVisitor().traverse(s) assert s2._for_update_arg is not s._for_update_arg eq_(s2._for_update_arg.read, True) eq_(s2._for_update_arg.of, [t.c.c]) - self.assert_compile(s2, - "SELECT t.c FROM t FOR SHARE OF t", - dialect="postgresql") + self.assert_compile( + s2, "SELECT t.c FROM t FOR SHARE OF t", dialect="postgresql" + ) def test_adapt(self): - t = table('t', column('c')) + t = table("t", column("c")) s = select([t]).with_for_update(read=True, of=t.c.c) a = t.alias() s2 = sql_util.ClauseAdapter(a).traverse(s) eq_(s2._for_update_arg.of, [a.c.c]) - self.assert_compile(s2, - "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1", - dialect="postgresql") + self.assert_compile( + s2, + "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1", + dialect="postgresql", + ) |
