summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2022-06-08 17:13:28 +0000
committerGerrit Code Review <gerrit@ci3.zzzcomputing.com>2022-06-08 17:13:28 +0000
commit73f867d996450fb4fcb0918d3d147ce170ca5fe7 (patch)
tree8104e9967cf7cc56c0bd39de61ebdd57afa12b4f /test
parent258d07e478eab9ee385f36a5c5fd56c56bf7dfea (diff)
parent93bc7ed534f12934528c0cbf5489417ddc025e40 (diff)
downloadsqlalchemy-73f867d996450fb4fcb0918d3d147ce170ca5fe7.tar.gz
Merge "graceful degrade for FKs not reflectable" into main
Diffstat (limited to 'test')
-rw-r--r--test/engine/test_reflection.py196
-rw-r--r--test/sql/test_selectable.py62
2 files changed, 212 insertions, 46 deletions
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index d2e3d5f40..76099e863 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -12,6 +12,7 @@ from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import schema
+from sqlalchemy import select
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
@@ -23,6 +24,7 @@ from sqlalchemy.testing import ComparesTables
from sqlalchemy.testing import config
from sqlalchemy.testing import eq_
from sqlalchemy.testing import eq_regex
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
@@ -252,44 +254,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
)
assert "nonexistent" not in meta.tables
- def test_include_columns(self, connection, metadata):
- meta = metadata
- foo = Table(
- "foo",
- meta,
- *[
- Column(n, sa.String(30))
- for n in ["a", "b", "c", "d", "e", "f"]
- ],
- )
- meta.create_all(connection)
- meta2 = MetaData()
- foo = Table(
- "foo",
- meta2,
- autoload_with=connection,
- include_columns=["b", "f", "e"],
- )
- # test that cols come back in original order
- eq_([c.name for c in foo.c], ["b", "e", "f"])
- for c in ("b", "f", "e"):
- assert c in foo.c
- for c in ("a", "c", "d"):
- assert c not in foo.c
-
- # test against a table which is already reflected
- meta3 = MetaData()
- foo = Table("foo", meta3, autoload_with=connection)
-
- foo = Table(
- "foo", meta3, include_columns=["b", "f", "e"], extend_existing=True
- )
- eq_([c.name for c in foo.c], ["b", "e", "f"])
- for c in ("b", "f", "e"):
- assert c in foo.c
- for c in ("a", "c", "d"):
- assert c not in foo.c
-
def test_extend_existing(self, connection, metadata):
meta = metadata
@@ -2237,3 +2201,159 @@ class IdentityColumnTest(fixtures.TablesTest):
is_true(table.c.id1.identity is not None)
eq_(table.c.id1.identity.start, 2)
eq_(table.c.id1.identity.increment, 3)
+
+
+class IncludeColsFksTest(AssertsCompiledSQL, fixtures.TestBase):
+ __dialect__ = "default"
+
+ @testing.fixture
+ def tab_wo_fks(self, connection, metadata):
+ meta = metadata
+ foo = Table(
+ "foo",
+ meta,
+ *[
+ Column(n, sa.String(30))
+ for n in ["a", "b", "c", "d", "e", "f"]
+ ],
+ )
+ meta.create_all(connection)
+
+ return foo
+
+ @testing.fixture
+ def tab_w_fks(self, connection, metadata):
+ Table(
+ "a",
+ metadata,
+ Column("x", Integer, primary_key=True),
+ test_needs_fk=True,
+ )
+
+ b = Table(
+ "b",
+ metadata,
+ Column("x", Integer, primary_key=True),
+ Column("q", Integer),
+ Column("p", Integer),
+ Column("r", Integer, ForeignKey("a.x")),
+ Column("s", Integer),
+ Column("t", Integer),
+ test_needs_fk=True,
+ )
+
+ metadata.create_all(connection)
+
+ return b
+
+ def test_include_columns(self, connection, tab_wo_fks):
+ foo = tab_wo_fks
+ meta2 = MetaData()
+ foo = Table(
+ "foo",
+ meta2,
+ autoload_with=connection,
+ include_columns=["b", "f", "e"],
+ )
+ # test that cols come back in original order
+ eq_([c.name for c in foo.c], ["b", "e", "f"])
+ for c in ("b", "f", "e"):
+ assert c in foo.c
+ for c in ("a", "c", "d"):
+ assert c not in foo.c
+
+ # test against a table which is already reflected
+ meta3 = MetaData()
+ foo = Table("foo", meta3, autoload_with=connection)
+
+ foo = Table(
+ "foo", meta3, include_columns=["b", "f", "e"], extend_existing=True
+ )
+ eq_([c.name for c in foo.c], ["b", "e", "f"])
+ for c in ("b", "f", "e"):
+ assert c in foo.c
+ for c in ("a", "c", "d"):
+ assert c not in foo.c
+
+ @testing.emits_warning
+ @testing.combinations(True, False, argnames="resolve_fks")
+ def test_include_cols_skip_fk_col(
+ self, connection, tab_w_fks, resolve_fks
+ ):
+ """test #8100"""
+
+ m2 = MetaData()
+
+ b2 = Table(
+ "b",
+ m2,
+ autoload_with=connection,
+ resolve_fks=resolve_fks,
+ include_columns=["x", "q", "p"],
+ )
+
+ eq_([c.name for c in b2.c], ["x", "q", "p"])
+
+ # no FK, whether or not resolve_fks was called
+ eq_(b2.constraints, set((b2.primary_key,)))
+
+ b2a = b2.alias()
+ eq_([c.name for c in b2a.c], ["x", "q", "p"])
+
+ self.assert_compile(select(b2), "SELECT b.x, b.q, b.p FROM b")
+ self.assert_compile(
+ select(b2.alias()),
+ "SELECT b_1.x, b_1.q, b_1.p FROM b AS b_1",
+ )
+
+ def test_table_works_minus_fks(self, connection, tab_w_fks):
+ """test #8101"""
+
+ m2 = MetaData()
+
+ b2 = Table(
+ "b",
+ m2,
+ autoload_with=connection,
+ resolve_fks=False,
+ )
+
+ eq_([c.name for c in b2.c], ["x", "q", "p", "r", "s", "t"])
+
+ b2a = b2.alias()
+ eq_([c.name for c in b2a.c], ["x", "q", "p", "r", "s", "t"])
+
+ self.assert_compile(
+ select(b2), "SELECT b.x, b.q, b.p, b.r, b.s, b.t FROM b"
+ )
+ b2a_1 = b2.alias()
+ self.assert_compile(
+ select(b2a_1),
+ "SELECT b_1.x, b_1.q, b_1.p, b_1.r, b_1.s, b_1.t FROM b AS b_1",
+ )
+
+ # reflecting the related table
+ a2 = Table("a", m2, autoload_with=connection)
+
+ # the existing alias doesn't know about it
+ with expect_raises_message(
+ sa.exc.InvalidRequestError,
+ "Foreign key associated with column 'anon_1.r' could not find "
+ "table 'a' with which to generate a foreign key to target "
+ "column 'x'",
+ ):
+ select(b2a_1).join(a2).compile()
+
+ # can still join manually (needed to fix inside of util for this...)
+ self.assert_compile(
+ select(b2a_1).join(a2, b2a_1.c.r == a2.c.x),
+ "SELECT b_1.x, b_1.q, b_1.p, b_1.r, b_1.s, b_1.t "
+ "FROM b AS b_1 JOIN a ON b_1.r = a.x",
+ )
+
+ # a new alias does know about it however
+ self.assert_compile(
+ select(b2.alias()).join(a2),
+ "SELECT b_1.x, b_1.q, b_1.p, b_1.r, b_1.s, b_1.t "
+ "FROM b AS b_1 JOIN a ON a.x = b_1.r",
+ )
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index d05d7ad8b..3ecbfca27 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -1478,21 +1478,67 @@ class SelectableTest(
assert j4.corresponding_column(j2.c.aid) is j4.c.aid
assert j4.corresponding_column(a.c.id) is j4.c.id
- def test_two_metadata_join_raises(self):
+ @testing.combinations(True, False)
+ def test_two_metadata_join_raises(self, include_a_joining_table):
+ """test case from 2008 enhanced as of #8101, more specific failure
+ modes for non-resolvable FKs
+
+ """
m = MetaData()
m2 = MetaData()
t1 = Table("t1", m, Column("id", Integer), Column("id2", Integer))
- t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id")))
+
+ if include_a_joining_table:
+ t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id")))
+
t3 = Table("t3", m2, Column("id", Integer, ForeignKey("t1.id2")))
- s = (
- select(t2, t3)
- .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
- .subquery()
- )
+ with expect_raises_message(
+ exc.NoReferencedTableError,
+ "Foreign key associated with column 't3.id'",
+ ):
+ t3.join(t1)
- assert_raises(exc.NoReferencedTableError, s.join, t1)
+ if include_a_joining_table:
+ s = (
+ select(t2, t3)
+ .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+ .subquery()
+ )
+ else:
+ s = (
+ select(t3)
+ .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+ .subquery()
+ )
+
+ with expect_raises_message(
+ exc.NoReferencedTableError,
+ "Foreign key associated with column 'anon_1.t3_id' could not "
+ "find table 't1' with which to generate a foreign key to target "
+ "column 'id2'",
+ ):
+ select(s.join(t1)),
+
+ # manual join is OK. using select().join() here is also exercising
+ # that join() does not need to resolve FKs if we provided the
+ # ON clause
+ if include_a_joining_table:
+ self.assert_compile(
+ select(s).join(
+ t1, and_(s.c.t2_id == t1.c.id, s.c.t3_id == t1.c.id)
+ ),
+ "SELECT anon_1.t2_id, anon_1.t3_id FROM (SELECT "
+ "t2.id AS t2_id, t3.id AS t3_id FROM t2, t3) AS anon_1 "
+ "JOIN t1 ON anon_1.t2_id = t1.id AND anon_1.t3_id = t1.id",
+ )
+ else:
+ self.assert_compile(
+ select(s).join(t1, s.c.t3_id == t1.c.id),
+ "SELECT anon_1.t3_id FROM (SELECT t3.id AS t3_id FROM t3) "
+ "AS anon_1 JOIN t1 ON anon_1.t3_id = t1.id",
+ )
def test_multi_label_chain_naming_col(self):
# See [ticket:2167] for this one.