diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2022-06-08 17:13:28 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci3.zzzcomputing.com> | 2022-06-08 17:13:28 +0000 |
| commit | 73f867d996450fb4fcb0918d3d147ce170ca5fe7 (patch) | |
| tree | 8104e9967cf7cc56c0bd39de61ebdd57afa12b4f /test | |
| parent | 258d07e478eab9ee385f36a5c5fd56c56bf7dfea (diff) | |
| parent | 93bc7ed534f12934528c0cbf5489417ddc025e40 (diff) | |
| download | sqlalchemy-73f867d996450fb4fcb0918d3d147ce170ca5fe7.tar.gz | |
Merge "graceful degrade for FKs not reflectable" into main
Diffstat (limited to 'test')
| -rw-r--r-- | test/engine/test_reflection.py | 196 | ||||
| -rw-r--r-- | test/sql/test_selectable.py | 62 |
2 files changed, 212 insertions, 46 deletions
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index d2e3d5f40..76099e863 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -12,6 +12,7 @@ from sqlalchemy import inspect from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy import schema +from sqlalchemy import select from sqlalchemy import sql from sqlalchemy import String from sqlalchemy import testing @@ -23,6 +24,7 @@ from sqlalchemy.testing import ComparesTables from sqlalchemy.testing import config from sqlalchemy.testing import eq_ from sqlalchemy.testing import eq_regex +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import in_ @@ -252,44 +254,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): ) assert "nonexistent" not in meta.tables - def test_include_columns(self, connection, metadata): - meta = metadata - foo = Table( - "foo", - meta, - *[ - Column(n, sa.String(30)) - for n in ["a", "b", "c", "d", "e", "f"] - ], - ) - meta.create_all(connection) - meta2 = MetaData() - foo = Table( - "foo", - meta2, - autoload_with=connection, - include_columns=["b", "f", "e"], - ) - # test that cols come back in original order - eq_([c.name for c in foo.c], ["b", "e", "f"]) - for c in ("b", "f", "e"): - assert c in foo.c - for c in ("a", "c", "d"): - assert c not in foo.c - - # test against a table which is already reflected - meta3 = MetaData() - foo = Table("foo", meta3, autoload_with=connection) - - foo = Table( - "foo", meta3, include_columns=["b", "f", "e"], extend_existing=True - ) - eq_([c.name for c in foo.c], ["b", "e", "f"]) - for c in ("b", "f", "e"): - assert c in foo.c - for c in ("a", "c", "d"): - assert c not in foo.c - def test_extend_existing(self, connection, metadata): meta = metadata @@ -2237,3 +2201,159 @@ class IdentityColumnTest(fixtures.TablesTest): is_true(table.c.id1.identity is not None) eq_(table.c.id1.identity.start, 2) eq_(table.c.id1.identity.increment, 3) + + +class IncludeColsFksTest(AssertsCompiledSQL, fixtures.TestBase): + __dialect__ = "default" + + @testing.fixture + def tab_wo_fks(self, connection, metadata): + meta = metadata + foo = Table( + "foo", + meta, + *[ + Column(n, sa.String(30)) + for n in ["a", "b", "c", "d", "e", "f"] + ], + ) + meta.create_all(connection) + + return foo + + @testing.fixture + def tab_w_fks(self, connection, metadata): + Table( + "a", + metadata, + Column("x", Integer, primary_key=True), + test_needs_fk=True, + ) + + b = Table( + "b", + metadata, + Column("x", Integer, primary_key=True), + Column("q", Integer), + Column("p", Integer), + Column("r", Integer, ForeignKey("a.x")), + Column("s", Integer), + Column("t", Integer), + test_needs_fk=True, + ) + + metadata.create_all(connection) + + return b + + def test_include_columns(self, connection, tab_wo_fks): + foo = tab_wo_fks + meta2 = MetaData() + foo = Table( + "foo", + meta2, + autoload_with=connection, + include_columns=["b", "f", "e"], + ) + # test that cols come back in original order + eq_([c.name for c in foo.c], ["b", "e", "f"]) + for c in ("b", "f", "e"): + assert c in foo.c + for c in ("a", "c", "d"): + assert c not in foo.c + + # test against a table which is already reflected + meta3 = MetaData() + foo = Table("foo", meta3, autoload_with=connection) + + foo = Table( + "foo", meta3, include_columns=["b", "f", "e"], extend_existing=True + ) + eq_([c.name for c in foo.c], ["b", "e", "f"]) + for c in ("b", "f", "e"): + assert c in foo.c + for c in ("a", "c", "d"): + assert c not in foo.c + + @testing.emits_warning + @testing.combinations(True, False, argnames="resolve_fks") + def test_include_cols_skip_fk_col( + self, connection, tab_w_fks, resolve_fks + ): + """test #8100""" + + m2 = MetaData() + + b2 = Table( + "b", + m2, + autoload_with=connection, + resolve_fks=resolve_fks, + include_columns=["x", "q", "p"], + ) + + eq_([c.name for c in b2.c], ["x", "q", "p"]) + + # no FK, whether or not resolve_fks was called + eq_(b2.constraints, set((b2.primary_key,))) + + b2a = b2.alias() + eq_([c.name for c in b2a.c], ["x", "q", "p"]) + + self.assert_compile(select(b2), "SELECT b.x, b.q, b.p FROM b") + self.assert_compile( + select(b2.alias()), + "SELECT b_1.x, b_1.q, b_1.p FROM b AS b_1", + ) + + def test_table_works_minus_fks(self, connection, tab_w_fks): + """test #8101""" + + m2 = MetaData() + + b2 = Table( + "b", + m2, + autoload_with=connection, + resolve_fks=False, + ) + + eq_([c.name for c in b2.c], ["x", "q", "p", "r", "s", "t"]) + + b2a = b2.alias() + eq_([c.name for c in b2a.c], ["x", "q", "p", "r", "s", "t"]) + + self.assert_compile( + select(b2), "SELECT b.x, b.q, b.p, b.r, b.s, b.t FROM b" + ) + b2a_1 = b2.alias() + self.assert_compile( + select(b2a_1), + "SELECT b_1.x, b_1.q, b_1.p, b_1.r, b_1.s, b_1.t FROM b AS b_1", + ) + + # reflecting the related table + a2 = Table("a", m2, autoload_with=connection) + + # the existing alias doesn't know about it + with expect_raises_message( + sa.exc.InvalidRequestError, + "Foreign key associated with column 'anon_1.r' could not find " + "table 'a' with which to generate a foreign key to target " + "column 'x'", + ): + select(b2a_1).join(a2).compile() + + # can still join manually (needed to fix inside of util for this...) + self.assert_compile( + select(b2a_1).join(a2, b2a_1.c.r == a2.c.x), + "SELECT b_1.x, b_1.q, b_1.p, b_1.r, b_1.s, b_1.t " + "FROM b AS b_1 JOIN a ON b_1.r = a.x", + ) + + # a new alias does know about it however + self.assert_compile( + select(b2.alias()).join(a2), + "SELECT b_1.x, b_1.q, b_1.p, b_1.r, b_1.s, b_1.t " + "FROM b AS b_1 JOIN a ON a.x = b_1.r", + ) diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index d05d7ad8b..3ecbfca27 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1478,21 +1478,67 @@ class SelectableTest( assert j4.corresponding_column(j2.c.aid) is j4.c.aid assert j4.corresponding_column(a.c.id) is j4.c.id - def test_two_metadata_join_raises(self): + @testing.combinations(True, False) + def test_two_metadata_join_raises(self, include_a_joining_table): + """test case from 2008 enhanced as of #8101, more specific failure + modes for non-resolvable FKs + + """ m = MetaData() m2 = MetaData() t1 = Table("t1", m, Column("id", Integer), Column("id2", Integer)) - t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id"))) + + if include_a_joining_table: + t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id"))) + t3 = Table("t3", m2, Column("id", Integer, ForeignKey("t1.id2"))) - s = ( - select(t2, t3) - .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) - .subquery() - ) + with expect_raises_message( + exc.NoReferencedTableError, + "Foreign key associated with column 't3.id'", + ): + t3.join(t1) - assert_raises(exc.NoReferencedTableError, s.join, t1) + if include_a_joining_table: + s = ( + select(t2, t3) + .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) + .subquery() + ) + else: + s = ( + select(t3) + .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) + .subquery() + ) + + with expect_raises_message( + exc.NoReferencedTableError, + "Foreign key associated with column 'anon_1.t3_id' could not " + "find table 't1' with which to generate a foreign key to target " + "column 'id2'", + ): + select(s.join(t1)), + + # manual join is OK. using select().join() here is also exercising + # that join() does not need to resolve FKs if we provided the + # ON clause + if include_a_joining_table: + self.assert_compile( + select(s).join( + t1, and_(s.c.t2_id == t1.c.id, s.c.t3_id == t1.c.id) + ), + "SELECT anon_1.t2_id, anon_1.t3_id FROM (SELECT " + "t2.id AS t2_id, t3.id AS t3_id FROM t2, t3) AS anon_1 " + "JOIN t1 ON anon_1.t2_id = t1.id AND anon_1.t3_id = t1.id", + ) + else: + self.assert_compile( + select(s).join(t1, s.c.t3_id == t1.c.id), + "SELECT anon_1.t3_id FROM (SELECT t3.id AS t3_id FROM t3) " + "AS anon_1 JOIN t1 ON anon_1.t3_id = t1.id", + ) def test_multi_label_chain_naming_col(self): # See [ticket:2167] for this one. |
