summaryrefslogtreecommitdiff
path: root/test/dialect
diff options
context:
space:
mode:
authorFederico Caselli <cfederico87@gmail.com>2022-06-18 21:08:27 +0000
committerGerrit Code Review <gerrit@ci3.zzzcomputing.com>2022-06-18 21:08:27 +0000
commitbe576e7d88b6038781e52f7ef79799dbad09cd54 (patch)
tree772c368e107d13537ad8fb030b2b02fb1638169b /test/dialect
parentf7daad21ef66c29aecfbdb2b967641d0adad8779 (diff)
parentdb08a699489c9b0259579d7ff7fd6bf3496ca3a2 (diff)
downloadsqlalchemy-be576e7d88b6038781e52f7ef79799dbad09cd54.tar.gz
Merge "rearchitect reflection for batched performance" into main
Diffstat (limited to 'test/dialect')
-rw-r--r--test/dialect/mysql/test_reflection.py18
-rw-r--r--test/dialect/oracle/test_reflection.py554
-rw-r--r--test/dialect/postgresql/test_dialect.py20
-rw-r--r--test/dialect/postgresql/test_reflection.py441
-rw-r--r--test/dialect/postgresql/test_types.py35
-rw-r--r--test/dialect/test_sqlite.py31
6 files changed, 933 insertions, 166 deletions
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py
index f414c9c37..846001347 100644
--- a/test/dialect/mysql/test_reflection.py
+++ b/test/dialect/mysql/test_reflection.py
@@ -33,9 +33,9 @@ from sqlalchemy import UniqueConstraint
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.dialects.mysql import reflection as _reflection
from sqlalchemy.schema import CreateIndex
-from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
@@ -558,6 +558,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
)
def test_skip_not_describable(self, metadata, connection):
+ """This test is the only one that test the _default_multi_reflect
+ behaviour with UnreflectableTableError
+ """
+
@event.listens_for(metadata, "before_drop")
def cleanup(*arg, **kw):
with testing.db.begin() as conn:
@@ -579,14 +583,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
m.reflect(views=True, bind=conn)
eq_(m.tables["test_t2"].name, "test_t2")
- assert_raises_message(
- exc.UnreflectableTableError,
- "references invalid table",
- Table,
- "test_v",
- MetaData(),
- autoload_with=conn,
- )
+ with expect_raises_message(
+ exc.UnreflectableTableError, "references invalid table"
+ ):
+ Table("test_v", MetaData(), autoload_with=conn)
@testing.exclude("mysql", "<", (5, 0, 0), "no information_schema support")
def test_system_views(self):
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py
index bf76dca43..53eb94df3 100644
--- a/test/dialect/oracle/test_reflection.py
+++ b/test/dialect/oracle/test_reflection.py
@@ -27,14 +27,18 @@ from sqlalchemy.dialects.oracle.base import BINARY_FLOAT
from sqlalchemy.dialects.oracle.base import DOUBLE_PRECISION
from sqlalchemy.dialects.oracle.base import NUMBER
from sqlalchemy.dialects.oracle.base import REAL
+from sqlalchemy.engine import ObjectKind
from sqlalchemy.testing import assert_warns
from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import config
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
+from sqlalchemy.testing.schema import eq_compile_type
from sqlalchemy.testing.schema import Table
@@ -384,6 +388,7 @@ class SystemTableTablenamesTest(fixtures.TestBase):
__backend__ = True
def setup_test(self):
+
with testing.db.begin() as conn:
conn.exec_driver_sql("create table my_table (id integer)")
conn.exec_driver_sql(
@@ -417,6 +422,14 @@ class SystemTableTablenamesTest(fixtures.TestBase):
set(["my_table", "foo_table"]),
)
+ def test_reflect_system_table(self):
+ meta = MetaData()
+ t = Table("foo_table", meta, autoload_with=testing.db)
+ assert t.columns.keys() == ["id"]
+
+ t = Table("my_temp_table", meta, autoload_with=testing.db)
+ assert t.columns.keys() == ["id"]
+
class DontReflectIOTTest(fixtures.TestBase):
"""test that index overflow tables aren't included in
@@ -509,6 +522,228 @@ class TableReflectionTest(fixtures.TestBase):
tbl = Table("test_compress", m2, autoload_with=connection)
assert tbl.dialect_options["oracle"]["compress"] == "OLTP"
+ def test_reflect_hidden_column(self):
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql(
+ "CREATE TABLE my_table(id integer, hide integer INVISIBLE)"
+ )
+
+ try:
+ insp = inspect(conn)
+ cols = insp.get_columns("my_table")
+ assert len(cols) == 1
+ assert cols[0]["name"] == "id"
+ finally:
+ conn.exec_driver_sql("DROP TABLE my_table")
+
+
+class ViewReflectionTest(fixtures.TestBase):
+ __only_on__ = "oracle"
+ __backend__ = True
+
+ @classmethod
+ def setup_test_class(cls):
+ sql = """
+ CREATE TABLE tbl (
+ id INTEGER PRIMARY KEY,
+ data INTEGER
+ );
+
+ CREATE VIEW tbl_plain_v AS
+ SELECT id, data FROM tbl WHERE id > 100;
+
+ -- comments on plain views are created with "comment on table"
+ -- because why not..
+ COMMENT ON TABLE tbl_plain_v IS 'view comment';
+
+ CREATE MATERIALIZED VIEW tbl_v AS
+ SELECT id, data FROM tbl WHERE id > 42;
+
+ COMMENT ON MATERIALIZED VIEW tbl_v IS 'my mat view comment';
+
+ CREATE MATERIALIZED VIEW tbl_v2 AS
+ SELECT id, data FROM tbl WHERE id < 42;
+
+ COMMENT ON MATERIALIZED VIEW tbl_v2 IS 'my other mat view comment';
+
+ CREATE SYNONYM view_syn FOR tbl_plain_v;
+ CREATE SYNONYM %(test_schema)s.ts_v_s FOR tbl_plain_v;
+
+ CREATE VIEW %(test_schema)s.schema_view AS
+ SELECT 1 AS value FROM dual;
+
+ COMMENT ON TABLE %(test_schema)s.schema_view IS 'schema view comment';
+ CREATE SYNONYM syn_schema_view FOR %(test_schema)s.schema_view;
+ """
+ if testing.requires.oracle_test_dblink.enabled:
+ cls.dblink = config.file_config.get(
+ "sqla_testing", "oracle_db_link"
+ )
+ sql += """
+ CREATE SYNONYM syn_link FOR tbl_plain_v@%(link)s;
+ """ % {
+ "link": cls.dblink
+ }
+ with testing.db.begin() as conn:
+ for stmt in (
+ sql % {"test_schema": testing.config.test_schema}
+ ).split(";"):
+ if stmt.strip():
+ conn.exec_driver_sql(stmt)
+
+ @classmethod
+ def teardown_test_class(cls):
+ sql = """
+ DROP MATERIALIZED VIEW tbl_v;
+ DROP MATERIALIZED VIEW tbl_v2;
+ DROP VIEW tbl_plain_v;
+ DROP TABLE tbl;
+ DROP VIEW %(test_schema)s.schema_view;
+ DROP SYNONYM view_syn;
+ DROP SYNONYM %(test_schema)s.ts_v_s;
+ DROP SYNONYM syn_schema_view;
+ """
+ if testing.requires.oracle_test_dblink.enabled:
+ sql += """
+ DROP SYNONYM syn_link;
+ """
+ with testing.db.begin() as conn:
+ for stmt in (
+ sql % {"test_schema": testing.config.test_schema}
+ ).split(";"):
+ if stmt.strip():
+ conn.exec_driver_sql(stmt)
+
+ def test_get_names(self, connection):
+ insp = inspect(connection)
+ eq_(insp.get_table_names(), ["tbl"])
+ eq_(insp.get_view_names(), ["tbl_plain_v"])
+ eq_(insp.get_materialized_view_names(), ["tbl_v", "tbl_v2"])
+ eq_(
+ insp.get_view_names(schema=testing.config.test_schema),
+ ["schema_view"],
+ )
+
+ def test_get_table_comment_on_view(self, connection):
+ insp = inspect(connection)
+ eq_(insp.get_table_comment("tbl_v"), {"text": "my mat view comment"})
+ eq_(insp.get_table_comment("tbl_plain_v"), {"text": "view comment"})
+
+ def test_get_multi_view_comment(self, connection):
+ insp = inspect(connection)
+ plain = {(None, "tbl_plain_v"): {"text": "view comment"}}
+ mat = {
+ (None, "tbl_v"): {"text": "my mat view comment"},
+ (None, "tbl_v2"): {"text": "my other mat view comment"},
+ }
+ eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain)
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW),
+ mat,
+ )
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW),
+ {**plain, **mat},
+ )
+ ts = testing.config.test_schema
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW, schema=ts),
+ {(ts, "schema_view"): {"text": "schema view comment"}},
+ )
+ eq_(insp.get_multi_table_comment(), {(None, "tbl"): {"text": None}})
+
+ def test_get_table_comment_synonym(self, connection):
+ insp = inspect(connection)
+ eq_(
+ insp.get_table_comment("view_syn", oracle_resolve_synonyms=True),
+ {"text": "view comment"},
+ )
+ eq_(
+ insp.get_table_comment(
+ "syn_schema_view", oracle_resolve_synonyms=True
+ ),
+ {"text": "schema view comment"},
+ )
+ eq_(
+ insp.get_table_comment(
+ "ts_v_s",
+ oracle_resolve_synonyms=True,
+ schema=testing.config.test_schema,
+ ),
+ {"text": "view comment"},
+ )
+
+ def test_get_multi_view_comment_synonym(self, connection):
+ insp = inspect(connection)
+ exp = {
+ (None, "view_syn"): {"text": "view comment"},
+ (None, "syn_schema_view"): {"text": "schema view comment"},
+ }
+ if testing.requires.oracle_test_dblink.enabled:
+ exp[(None, "syn_link")] = {"text": "view comment"}
+ eq_(
+ insp.get_multi_table_comment(
+ oracle_resolve_synonyms=True, kind=ObjectKind.ANY_VIEW
+ ),
+ exp,
+ )
+ ts = testing.config.test_schema
+ eq_(
+ insp.get_multi_table_comment(
+ oracle_resolve_synonyms=True,
+ schema=ts,
+ kind=ObjectKind.ANY_VIEW,
+ ),
+ {(ts, "ts_v_s"): {"text": "view comment"}},
+ )
+
+ def test_get_view_definition(self, connection):
+ insp = inspect(connection)
+ eq_(
+ insp.get_view_definition("tbl_plain_v"),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition("tbl_v"),
+ "SELECT id, data FROM tbl WHERE id > 42",
+ )
+ with expect_raises(exc.NoSuchTableError):
+ eq_(insp.get_view_definition("view_syn"), None)
+ eq_(
+ insp.get_view_definition("view_syn", oracle_resolve_synonyms=True),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition(
+ "syn_schema_view", oracle_resolve_synonyms=True
+ ),
+ "SELECT 1 AS value FROM dual",
+ )
+ eq_(
+ insp.get_view_definition(
+ "ts_v_s",
+ oracle_resolve_synonyms=True,
+ schema=testing.config.test_schema,
+ ),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+
+ @testing.requires.oracle_test_dblink
+ def test_get_view_definition_dblink(self, connection):
+ insp = inspect(connection)
+ eq_(
+ insp.get_view_definition("syn_link", oracle_resolve_synonyms=True),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition("tbl_plain_v", dblink=self.dblink),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition("tbl_v", dblink=self.dblink),
+ "SELECT id, data FROM tbl WHERE id > 42",
+ )
+
class RoundTripIndexTest(fixtures.TestBase):
__only_on__ = "oracle"
@@ -722,8 +957,6 @@ class DBLinkReflectionTest(fixtures.TestBase):
@classmethod
def setup_test_class(cls):
- from sqlalchemy.testing import config
-
cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link")
# note that the synonym here is still not totally functional
@@ -863,3 +1096,320 @@ class IdentityReflectionTest(fixtures.TablesTest):
exp = common.copy()
exp["order"] = True
eq_(col["identity"], exp)
+
+
+class AdditionalReflectionTests(fixtures.TestBase):
+ __only_on__ = "oracle"
+ __backend__ = True
+
+ @classmethod
+ def setup_test_class(cls):
+ # currently assuming full DBA privs for the user.
+ # don't really know how else to go here unless
+ # we connect as the other user.
+
+ sql = """
+CREATE TABLE %(schema)sparent(
+ id INTEGER,
+ data VARCHAR2(50),
+ CONSTRAINT parent_pk_%(schema_id)s PRIMARY KEY (id)
+);
+CREATE TABLE %(schema)smy_table(
+ id INTEGER,
+ name VARCHAR2(125),
+ related INTEGER,
+ data%(schema_id)s NUMBER NOT NULL,
+ CONSTRAINT my_table_pk_%(schema_id)s PRIMARY KEY (id),
+ CONSTRAINT my_table_fk_%(schema_id)s FOREIGN KEY(related)
+ REFERENCES %(schema)sparent(id),
+ CONSTRAINT my_table_check_%(schema_id)s CHECK (data%(schema_id)s > 42),
+ CONSTRAINT data_unique%(schema_id)s UNIQUE (data%(schema_id)s)
+);
+CREATE INDEX my_table_index_%(schema_id)s on %(schema)smy_table (id, name);
+COMMENT ON TABLE %(schema)smy_table IS 'my table comment %(schema_id)s';
+COMMENT ON COLUMN %(schema)smy_table.name IS
+'my table.name comment %(schema_id)s';
+"""
+
+ with testing.db.begin() as conn:
+ for schema in ("", testing.config.test_schema):
+ dd = {
+ "schema": f"{schema}." if schema else "",
+ "schema_id": "sch" if schema else "",
+ }
+ for stmt in (sql % dd).split(";"):
+ if stmt.strip():
+ conn.exec_driver_sql(stmt)
+
+ @classmethod
+ def teardown_test_class(cls):
+ sql = """
+drop table %(schema)smy_table;
+drop table %(schema)sparent;
+"""
+ with testing.db.begin() as conn:
+ for schema in ("", testing.config.test_schema):
+ dd = {"schema": f"{schema}." if schema else ""}
+ for stmt in (sql % dd).split(";"):
+ if stmt.strip():
+ try:
+ conn.exec_driver_sql(stmt)
+ except:
+ pass
+
+ def setup_test(self):
+ self.dblink = config.file_config.get("sqla_testing", "oracle_db_link")
+ self.dblink2 = config.file_config.get(
+ "sqla_testing", "oracle_db_link2"
+ )
+ self.columns = {}
+ self.indexes = {}
+ self.primary_keys = {}
+ self.comments = {}
+ self.uniques = {}
+ self.checks = {}
+ self.foreign_keys = {}
+ self.options = {}
+ self.allDicts = [
+ self.columns,
+ self.indexes,
+ self.primary_keys,
+ self.comments,
+ self.uniques,
+ self.checks,
+ self.foreign_keys,
+ self.options,
+ ]
+ for schema in (None, testing.config.test_schema):
+ suffix = "sch" if schema else ""
+
+ self.columns[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": "id",
+ "nullable": False,
+ "type": eq_compile_type("INTEGER"),
+ "default": None,
+ "comment": None,
+ },
+ {
+ "name": "name",
+ "nullable": True,
+ "type": eq_compile_type("VARCHAR(125)"),
+ "default": None,
+ "comment": f"my table.name comment {suffix}",
+ },
+ {
+ "name": "related",
+ "nullable": True,
+ "type": eq_compile_type("INTEGER"),
+ "default": None,
+ "comment": None,
+ },
+ {
+ "name": f"data{suffix}",
+ "nullable": False,
+ "type": eq_compile_type("NUMBER"),
+ "default": None,
+ "comment": None,
+ },
+ ],
+ (schema, "parent"): [
+ {
+ "name": "id",
+ "nullable": False,
+ "type": eq_compile_type("INTEGER"),
+ "default": None,
+ "comment": None,
+ },
+ {
+ "name": "data",
+ "nullable": True,
+ "type": eq_compile_type("VARCHAR(50)"),
+ "default": None,
+ "comment": None,
+ },
+ ],
+ }
+ self.indexes[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"data_unique{suffix}",
+ "column_names": [f"data{suffix}"],
+ "dialect_options": {},
+ "unique": True,
+ },
+ {
+ "name": f"my_table_index_{suffix}",
+ "column_names": ["id", "name"],
+ "dialect_options": {},
+ "unique": False,
+ },
+ ],
+ (schema, "parent"): [],
+ }
+ self.primary_keys[schema] = {
+ (schema, "my_table"): {
+ "name": f"my_table_pk_{suffix}",
+ "constrained_columns": ["id"],
+ },
+ (schema, "parent"): {
+ "name": f"parent_pk_{suffix}",
+ "constrained_columns": ["id"],
+ },
+ }
+ self.comments[schema] = {
+ (schema, "my_table"): {"text": f"my table comment {suffix}"},
+ (schema, "parent"): {"text": None},
+ }
+ self.foreign_keys[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"my_table_fk_{suffix}",
+ "constrained_columns": ["related"],
+ "referred_schema": schema,
+ "referred_table": "parent",
+ "referred_columns": ["id"],
+ "options": {},
+ }
+ ],
+ (schema, "parent"): [],
+ }
+ self.checks[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"my_table_check_{suffix}",
+ "sqltext": f"data{suffix} > 42",
+ }
+ ],
+ (schema, "parent"): [],
+ }
+ self.uniques[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"data_unique{suffix}",
+ "column_names": [f"data{suffix}"],
+ "duplicates_index": f"data_unique{suffix}",
+ }
+ ],
+ (schema, "parent"): [],
+ }
+ self.options[schema] = {
+ (schema, "my_table"): {},
+ (schema, "parent"): {},
+ }
+
+ def test_tables(self, connection):
+ insp = inspect(connection)
+
+ eq_(sorted(insp.get_table_names()), ["my_table", "parent"])
+
+ def _check_reflection(self, conn, schema, res_schema=False, **kw):
+ if res_schema is False:
+ res_schema = schema
+ insp = inspect(conn)
+ eq_(
+ insp.get_multi_columns(schema=schema, **kw),
+ self.columns[res_schema],
+ )
+ eq_(
+ insp.get_multi_indexes(schema=schema, **kw),
+ self.indexes[res_schema],
+ )
+ eq_(
+ insp.get_multi_pk_constraint(schema=schema, **kw),
+ self.primary_keys[res_schema],
+ )
+ eq_(
+ insp.get_multi_table_comment(schema=schema, **kw),
+ self.comments[res_schema],
+ )
+ eq_(
+ insp.get_multi_foreign_keys(schema=schema, **kw),
+ self.foreign_keys[res_schema],
+ )
+ eq_(
+ insp.get_multi_check_constraints(schema=schema, **kw),
+ self.checks[res_schema],
+ )
+ eq_(
+ insp.get_multi_unique_constraints(schema=schema, **kw),
+ self.uniques[res_schema],
+ )
+ eq_(
+ insp.get_multi_table_options(schema=schema, **kw),
+ self.options[res_schema],
+ )
+
+ @testing.combinations(True, False, argnames="schema")
+ def test_schema_translate_map(self, connection, schema):
+ schema = testing.config.test_schema if schema else None
+ c = connection.execution_options(
+ schema_translate_map={
+ None: "foo",
+ testing.config.test_schema: "bar",
+ }
+ )
+ self._check_reflection(c, schema)
+
+ @testing.requires.oracle_test_dblink
+ def test_db_link(self, connection):
+ self._check_reflection(connection, schema=None, dblink=self.dblink)
+ self._check_reflection(
+ connection,
+ schema=testing.config.test_schema,
+ dblink=self.dblink,
+ )
+
+ def test_no_synonyms(self, connection):
+ # oracle_resolve_synonyms is ignored if there are no matching synonym
+ self._check_reflection(
+ connection, schema=None, oracle_resolve_synonyms=True
+ )
+ connection.exec_driver_sql("CREATE SYNONYM tmp FOR parent")
+ for dict_ in self.allDicts:
+ dict_["tmp"] = {(None, "parent"): dict_[None][(None, "parent")]}
+ try:
+ self._check_reflection(
+ connection,
+ schema=None,
+ res_schema="tmp",
+ oracle_resolve_synonyms=True,
+ filter_names=["parent"],
+ )
+ finally:
+ connection.exec_driver_sql("DROP SYNONYM tmp")
+
+ @testing.requires.oracle_test_dblink
+ @testing.requires.oracle_test_dblink2
+ def test_multi_dblink_synonyms(self, connection):
+ # oracle_resolve_synonyms handles multiple dblink at once
+ connection.exec_driver_sql(
+ f"CREATE SYNONYM s1 FOR my_table@{self.dblink}"
+ )
+ connection.exec_driver_sql(
+ f"CREATE SYNONYM s2 FOR {testing.config.test_schema}."
+ f"my_table@{self.dblink2}"
+ )
+ connection.exec_driver_sql("CREATE SYNONYM s3 FOR parent")
+ for dict_ in self.allDicts:
+ dict_["tmp"] = {
+ (None, "s1"): dict_[None][(None, "my_table")],
+ (None, "s2"): dict_[testing.config.test_schema][
+ (testing.config.test_schema, "my_table")
+ ],
+ (None, "s3"): dict_[None][(None, "parent")],
+ }
+ fk = self.foreign_keys["tmp"][(None, "s1")][0]
+ fk["referred_table"] = "s3"
+ try:
+ self._check_reflection(
+ connection,
+ schema=None,
+ res_schema="tmp",
+ oracle_resolve_synonyms=True,
+ )
+ finally:
+ connection.exec_driver_sql("DROP SYNONYM s1")
+ connection.exec_driver_sql("DROP SYNONYM s2")
+ connection.exec_driver_sql("DROP SYNONYM s3")
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index 0093eb5ba..d55aa8203 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -887,19 +887,12 @@ class MiscBackendTest(
)
@testing.combinations(
- ((8, 1), False, False),
- ((8, 1), None, False),
- ((11, 5), True, False),
- ((11, 5), False, True),
+ (True, False),
+ (False, True),
)
- def test_backslash_escapes_detection(
- self, version, explicit_setting, expected
- ):
+ def test_backslash_escapes_detection(self, explicit_setting, expected):
engine = engines.testing_engine()
- def _server_version(conn):
- return version
-
if explicit_setting is not None:
@event.listens_for(engine, "connect", insert=True)
@@ -912,11 +905,8 @@ class MiscBackendTest(
)
dbapi_connection.commit()
- with mock.patch.object(
- engine.dialect, "_get_server_version_info", _server_version
- ):
- with engine.connect():
- eq_(engine.dialect._backslash_escapes, expected)
+ with engine.connect():
+ eq_(engine.dialect._backslash_escapes, expected)
def test_dbapi_autocommit_attribute(self):
"""all the supported DBAPIs have an .autocommit attribute. make
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index 3e0569d32..21b4149bc 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -27,17 +27,21 @@ from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import ExcludeConstraint
from sqlalchemy.dialects.postgresql import INTEGER
from sqlalchemy.dialects.postgresql import INTERVAL
+from sqlalchemy.dialects.postgresql import pg_catalog
from sqlalchemy.dialects.postgresql import TSRANGE
+from sqlalchemy.engine import ObjectKind
+from sqlalchemy.engine import ObjectScope
from sqlalchemy.schema import CreateIndex
from sqlalchemy.sql.schema import CheckConstraint
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
-from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_warns
from sqlalchemy.testing.assertions import AssertsExecutionResults
from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.assertions import is_
+from sqlalchemy.testing.assertions import is_false
from sqlalchemy.testing.assertions import is_true
@@ -231,17 +235,36 @@ class MaterializedViewReflectionTest(
connection.execute(target.insert(), {"id": 89, "data": "d1"})
materialized_view = sa.DDL(
- "CREATE MATERIALIZED VIEW test_mview AS " "SELECT * FROM testtable"
+ "CREATE MATERIALIZED VIEW test_mview AS SELECT * FROM testtable"
)
plain_view = sa.DDL(
- "CREATE VIEW test_regview AS " "SELECT * FROM testtable"
+ "CREATE VIEW test_regview AS SELECT data FROM testtable"
)
sa.event.listen(testtable, "after_create", plain_view)
sa.event.listen(testtable, "after_create", materialized_view)
sa.event.listen(
testtable,
+ "after_create",
+ sa.DDL("COMMENT ON VIEW test_regview IS 'regular view comment'"),
+ )
+ sa.event.listen(
+ testtable,
+ "after_create",
+ sa.DDL(
+ "COMMENT ON MATERIALIZED VIEW test_mview "
+ "IS 'materialized view comment'"
+ ),
+ )
+ sa.event.listen(
+ testtable,
+ "after_create",
+ sa.DDL("CREATE INDEX mat_index ON test_mview(data DESC)"),
+ )
+
+ sa.event.listen(
+ testtable,
"before_drop",
sa.DDL("DROP MATERIALIZED VIEW test_mview"),
)
@@ -249,6 +272,12 @@ class MaterializedViewReflectionTest(
testtable, "before_drop", sa.DDL("DROP VIEW test_regview")
)
+ def test_has_type(self, connection):
+ insp = inspect(connection)
+ is_true(insp.has_type("test_mview"))
+ is_true(insp.has_type("test_regview"))
+ is_true(insp.has_type("testtable"))
+
def test_mview_is_reflected(self, connection):
metadata = MetaData()
table = Table("test_mview", metadata, autoload_with=connection)
@@ -265,49 +294,99 @@ class MaterializedViewReflectionTest(
def test_get_view_names(self, inspect_fixture):
insp, conn = inspect_fixture
- eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"]))
+ eq_(set(insp.get_view_names()), set(["test_regview"]))
- def test_get_view_names_plain(self, connection):
+ def test_get_materialized_view_names(self, inspect_fixture):
+ insp, conn = inspect_fixture
+ eq_(set(insp.get_materialized_view_names()), set(["test_mview"]))
+
+ def test_get_view_names_reflection_cache_ok(self, connection):
insp = inspect(connection)
+ eq_(set(insp.get_view_names()), set(["test_regview"]))
eq_(
- set(insp.get_view_names(include=("plain",))), set(["test_regview"])
+ set(insp.get_materialized_view_names()),
+ set(["test_mview"]),
+ )
+ eq_(
+ set(insp.get_view_names()).union(
+ insp.get_materialized_view_names()
+ ),
+ set(["test_regview", "test_mview"]),
)
- def test_get_view_names_plain_string(self, connection):
+ def test_get_view_definition(self, connection):
insp = inspect(connection)
- eq_(set(insp.get_view_names(include="plain")), set(["test_regview"]))
- def test_get_view_names_materialized(self, connection):
- insp = inspect(connection)
+ def normalize(definition):
+ return re.sub(r"[\n\t ]+", " ", definition.strip())
+
eq_(
- set(insp.get_view_names(include=("materialized",))),
- set(["test_mview"]),
+ normalize(insp.get_view_definition("test_mview")),
+ "SELECT testtable.id, testtable.data FROM testtable;",
+ )
+ eq_(
+ normalize(insp.get_view_definition("test_regview")),
+ "SELECT testtable.data FROM testtable;",
)
- def test_get_view_names_reflection_cache_ok(self, connection):
+ def test_get_view_comment(self, connection):
insp = inspect(connection)
eq_(
- set(insp.get_view_names(include=("plain",))), set(["test_regview"])
+ insp.get_table_comment("test_regview"),
+ {"text": "regular view comment"},
)
eq_(
- set(insp.get_view_names(include=("materialized",))),
- set(["test_mview"]),
+ insp.get_table_comment("test_mview"),
+ {"text": "materialized view comment"},
)
- eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"]))
- def test_get_view_names_empty(self, connection):
+ def test_get_multi_view_comment(self, connection):
insp = inspect(connection)
- assert_raises(ValueError, insp.get_view_names, include=())
+ eq_(
+ insp.get_multi_table_comment(),
+ {(None, "testtable"): {"text": None}},
+ )
+ plain = {(None, "test_regview"): {"text": "regular view comment"}}
+ mat = {(None, "test_mview"): {"text": "materialized view comment"}}
+ eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain)
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW),
+ mat,
+ )
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW),
+ {**plain, **mat},
+ )
+ eq_(
+ insp.get_multi_table_comment(
+ kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY
+ ),
+ {},
+ )
- def test_get_view_definition(self, connection):
+ def test_get_multi_view_indexes(self, connection):
insp = inspect(connection)
+ eq_(insp.get_multi_indexes(), {(None, "testtable"): []})
+
+ exp = {
+ "name": "mat_index",
+ "unique": False,
+ "column_names": ["data"],
+ "column_sorting": {"data": ("desc",)},
+ }
+ if connection.dialect.server_version_info >= (11, 0):
+ exp["include_columns"] = []
+ exp["dialect_options"] = {"postgresql_include": []}
+ plain = {(None, "test_regview"): []}
+ mat = {(None, "test_mview"): [exp]}
+ eq_(insp.get_multi_indexes(kind=ObjectKind.VIEW), plain)
+ eq_(insp.get_multi_indexes(kind=ObjectKind.MATERIALIZED_VIEW), mat)
+ eq_(insp.get_multi_indexes(kind=ObjectKind.ANY_VIEW), {**plain, **mat})
eq_(
- re.sub(
- r"[\n\t ]+",
- " ",
- insp.get_view_definition("test_mview").strip(),
+ insp.get_multi_indexes(
+ kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY
),
- "SELECT testtable.id, testtable.data FROM testtable;",
+ {},
)
@@ -993,9 +1072,9 @@ class ReflectionTest(
go,
[
"Skipped unsupported reflection of "
- "expression-based index idx1",
+ "expression-based index idx1 of table party",
"Skipped unsupported reflection of "
- "expression-based index idx3",
+ "expression-based index idx3 of table party",
],
)
@@ -1016,7 +1095,7 @@ class ReflectionTest(
metadata.create_all(connection)
- ind = connection.dialect.get_indexes(connection, t1, None)
+ ind = connection.dialect.get_indexes(connection, t1.name, None)
partial_definitions = []
for ix in ind:
@@ -1337,6 +1416,9 @@ class ReflectionTest(
}
],
)
+ is_true(inspector.has_type("mood", "test_schema"))
+ is_true(inspector.has_type("mood", "*"))
+ is_false(inspector.has_type("mood"))
def test_inspect_enums(self, metadata, inspect_fixture):
@@ -1345,30 +1427,49 @@ class ReflectionTest(
enum_type = postgresql.ENUM(
"cat", "dog", "rat", name="pet", metadata=metadata
)
+ enum_type.create(conn)
+ conn.commit()
- with conn.begin():
- enum_type.create(conn)
-
- eq_(
- inspector.get_enums(),
- [
- {
- "visible": True,
- "labels": ["cat", "dog", "rat"],
- "name": "pet",
- "schema": "public",
- }
- ],
- )
-
- def test_get_table_oid(self, metadata, inspect_fixture):
-
- inspector, conn = inspect_fixture
+ res = [
+ {
+ "visible": True,
+ "labels": ["cat", "dog", "rat"],
+ "name": "pet",
+ "schema": "public",
+ }
+ ]
+ eq_(inspector.get_enums(), res)
+ is_true(inspector.has_type("pet", "*"))
+ is_true(inspector.has_type("pet"))
+ is_false(inspector.has_type("pet", "test_schema"))
+
+ enum_type.drop(conn)
+ conn.commit()
+ eq_(inspector.get_enums(), res)
+ is_true(inspector.has_type("pet"))
+ inspector.clear_cache()
+ eq_(inspector.get_enums(), [])
+ is_false(inspector.has_type("pet"))
+
+ def test_get_table_oid(self, metadata, connection):
+ Table("t1", metadata, Column("col", Integer))
+ Table("t1", metadata, Column("col", Integer), schema="test_schema")
+ metadata.create_all(connection)
+ insp = inspect(connection)
+ oid = insp.get_table_oid("t1")
+ oid_schema = insp.get_table_oid("t1", schema="test_schema")
+ is_true(isinstance(oid, int))
+ is_true(isinstance(oid_schema, int))
+ is_true(oid != oid_schema)
- with conn.begin():
- Table("some_table", metadata, Column("q", Integer)).create(conn)
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_table_oid("does_not_exist")
- assert inspector.get_table_oid("some_table") is not None
+ metadata.tables["t1"].drop(connection)
+ eq_(insp.get_table_oid("t1"), oid)
+ insp.clear_cache()
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_table_oid("t1")
def test_inspect_enums_case_sensitive(self, metadata, connection):
sa.event.listen(
@@ -1707,77 +1808,146 @@ class ReflectionTest(
)
def test_reflect_check_warning(self):
- rows = [("some name", "NOTCHECK foobar")]
+ rows = [("foo", "some name", "NOTCHECK foobar")]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
)
)
- with mock.patch.object(
- testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1
+ with testing.expect_warnings(
+ "Could not parse CHECK constraint text: 'NOTCHECK foobar'"
):
- with testing.expect_warnings(
- "Could not parse CHECK constraint text: 'NOTCHECK foobar'"
- ):
- testing.db.dialect.get_check_constraints(conn, "foo")
+ testing.db.dialect.get_check_constraints(conn, "foo")
def test_reflect_extra_newlines(self):
rows = [
- ("some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"),
- ("some other name", "CHECK ((b\nIS\nNOT\nNULL))"),
- ("some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"),
- ("some name", "CHECK (c != 'hi\nim a name\n')"),
+ ("foo", "some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"),
+ ("foo", "some other name", "CHECK ((b\nIS\nNOT\nNULL))"),
+ ("foo", "some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"),
+ ("foo", "some name", "CHECK (c != 'hi\nim a name\n')"),
]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
)
)
- with mock.patch.object(
- testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1
- ):
- check_constraints = testing.db.dialect.get_check_constraints(
- conn, "foo"
- )
- eq_(
- check_constraints,
- [
- {
- "name": "some name",
- "sqltext": "a \nIS\n NOT\n\n NULL\n",
- },
- {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"},
- {
- "name": "some CRLF name",
- "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL",
- },
- {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"},
- ],
- )
+ check_constraints = testing.db.dialect.get_check_constraints(
+ conn, "foo"
+ )
+ eq_(
+ check_constraints,
+ [
+ {
+ "name": "some name",
+ "sqltext": "a \nIS\n NOT\n\n NULL\n",
+ },
+ {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"},
+ {
+ "name": "some CRLF name",
+ "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL",
+ },
+ {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"},
+ ],
+ )
def test_reflect_with_not_valid_check_constraint(self):
- rows = [("some name", "CHECK ((a IS NOT NULL)) NOT VALID")]
+ rows = [("foo", "some name", "CHECK ((a IS NOT NULL)) NOT VALID")]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
)
)
- with mock.patch.object(
- testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1
- ):
- check_constraints = testing.db.dialect.get_check_constraints(
- conn, "foo"
+ check_constraints = testing.db.dialect.get_check_constraints(
+ conn, "foo"
+ )
+ eq_(
+ check_constraints,
+ [
+ {
+ "name": "some name",
+ "sqltext": "a IS NOT NULL",
+ "dialect_options": {"not_valid": True},
+ }
+ ],
+ )
+
+ def _apply_stm(self, connection, use_map):
+ if use_map:
+ return connection.execution_options(
+ schema_translate_map={
+ None: "foo",
+ testing.config.test_schema: "bar",
+ }
)
- eq_(
- check_constraints,
- [
- {
- "name": "some name",
- "sqltext": "a IS NOT NULL",
- "dialect_options": {"not_valid": True},
- }
- ],
+ else:
+ return connection
+
+ @testing.combinations(True, False, argnames="use_map")
+ @testing.combinations(True, False, argnames="schema")
+ def test_schema_translate_map(self, metadata, connection, use_map, schema):
+ schema = testing.config.test_schema if schema else None
+ Table(
+ "foo",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("a", Integer, index=True),
+ Column(
+ "b",
+ ForeignKey(f"{schema}.foo.id" if schema else "foo.id"),
+ unique=True,
+ ),
+ CheckConstraint("a>10", name="foo_check"),
+ comment="comm",
+ schema=schema,
+ )
+ metadata.create_all(connection)
+ if use_map:
+ connection = connection.execution_options(
+ schema_translate_map={
+ None: "foo",
+ testing.config.test_schema: "bar",
+ }
)
+ insp = inspect(connection)
+ eq_(
+ [c["name"] for c in insp.get_columns("foo", schema=schema)],
+ ["id", "a", "b"],
+ )
+ eq_(
+ [
+ i["column_names"]
+ for i in insp.get_indexes("foo", schema=schema)
+ ],
+ [["b"], ["a"]],
+ )
+ eq_(
+ insp.get_pk_constraint("foo", schema=schema)[
+ "constrained_columns"
+ ],
+ ["id"],
+ )
+ eq_(insp.get_table_comment("foo", schema=schema), {"text": "comm"})
+ eq_(
+ [
+ f["constrained_columns"]
+ for f in insp.get_foreign_keys("foo", schema=schema)
+ ],
+ [["b"]],
+ )
+ eq_(
+ [
+ c["name"]
+ for c in insp.get_check_constraints("foo", schema=schema)
+ ],
+ ["foo_check"],
+ )
+ eq_(
+ [
+ u["column_names"]
+ for u in insp.get_unique_constraints("foo", schema=schema)
+ ],
+ [["b"]],
+ )
class CustomTypeReflectionTest(fixtures.TestBase):
@@ -1804,9 +1974,23 @@ class CustomTypeReflectionTest(fixtures.TestBase):
("my_custom_type(ARG1)", ("ARG1", None)),
("my_custom_type(ARG1, ARG2)", ("ARG1", "ARG2")),
]:
- column_info = dialect._get_column_info(
- "colname", sch, None, False, {}, {}, "public", None, "", None
+ row_dict = {
+ "name": "colname",
+ "table_name": "tblname",
+ "format_type": sch,
+ "default": None,
+ "not_null": False,
+ "comment": None,
+ "generated": "",
+ "identity_options": None,
+ }
+ column_info = dialect._get_columns_info(
+ [row_dict], {}, {}, "public"
)
+ assert ("public", "tblname") in column_info
+ column_info = column_info[("public", "tblname")]
+ assert len(column_info) == 1
+ column_info = column_info[0]
assert isinstance(column_info["type"], self.CustomType)
eq_(column_info["type"].arg1, args[0])
eq_(column_info["type"].arg2, args[1])
@@ -1951,3 +2135,64 @@ class IdentityReflectionTest(fixtures.TablesTest):
exp = default.copy()
exp.update(maxvalue=2**15 - 1)
eq_(col["identity"], exp)
+
+
+class TestReflectDifficultColTypes(fixtures.TablesTest):
+ __only_on__ = "postgresql"
+ __backend__ = True
+
+ def define_tables(metadata):
+ Table(
+ "sample_table",
+ metadata,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", Integer, unique=True),
+ Column("c3", Integer),
+ Index("sample_table_index", "c2", "c3"),
+ )
+
+ def check_int_list(self, row, key):
+ value = row[key]
+ is_true(isinstance(value, list))
+ is_true(len(value) > 0)
+ is_true(all(isinstance(v, int) for v in value))
+
+ def test_pg_index(self, connection):
+ insp = inspect(connection)
+
+ pgc_oid = insp.get_table_oid("sample_table")
+ cols = [
+ col
+ for col in pg_catalog.pg_index.c
+ if testing.db.dialect.server_version_info
+ >= col.info.get("server_version", (0,))
+ ]
+
+ stmt = sa.select(*cols).filter_by(indrelid=pgc_oid)
+ rows = connection.execute(stmt).mappings().all()
+ is_true(len(rows) > 0)
+ cols = [
+ col
+ for col in ["indkey", "indoption", "indclass", "indcollation"]
+ if testing.db.dialect.server_version_info
+ >= pg_catalog.pg_index.c[col].info.get("server_version", (0,))
+ ]
+ for row in rows:
+ for col in cols:
+ self.check_int_list(row, col)
+
+ def test_pg_constraint(self, connection):
+ insp = inspect(connection)
+
+ pgc_oid = insp.get_table_oid("sample_table")
+ cols = [
+ col
+ for col in pg_catalog.pg_constraint.c
+ if testing.db.dialect.server_version_info
+ >= col.info.get("server_version", (0,))
+ ]
+ stmt = sa.select(*cols).filter_by(conrelid=pgc_oid)
+ rows = connection.execute(stmt).mappings().all()
+ is_true(len(rows) > 0)
+ for row in rows:
+ self.check_int_list(row, "conkey")
diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py
index fd4b91db1..79f029391 100644
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@ -454,11 +454,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
asserter.assert_(
# check for table
RegexSQL(
- "select relname from pg_class c join pg_namespace.*",
+ "SELECT pg_catalog.pg_class.relname FROM pg_catalog."
+ "pg_class JOIN pg_catalog.pg_namespace.*",
dialect="postgresql",
),
# check for enum, just once
- RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"),
+ RegexSQL(
+ r"SELECT pg_catalog.pg_type.typname .* WHERE "
+ "pg_catalog.pg_type.typname = ",
+ dialect="postgresql",
+ ),
RegexSQL("CREATE TYPE myenum AS ENUM .*", dialect="postgresql"),
RegexSQL(r"CREATE TABLE t .*", dialect="postgresql"),
)
@@ -468,11 +473,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
asserter.assert_(
RegexSQL(
- "select relname from pg_class c join pg_namespace.*",
+ "SELECT pg_catalog.pg_class.relname FROM pg_catalog."
+ "pg_class JOIN pg_catalog.pg_namespace.*",
dialect="postgresql",
),
RegexSQL("DROP TABLE t", dialect="postgresql"),
- RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"),
+ RegexSQL(
+ r"SELECT pg_catalog.pg_type.typname .* WHERE "
+ "pg_catalog.pg_type.typname = ",
+ dialect="postgresql",
+ ),
RegexSQL("DROP TYPE myenum", dialect="postgresql"),
)
@@ -694,23 +704,6 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
connection, "fourfivesixtype"
)
- def test_no_support(self, testing_engine):
- def server_version_info(self):
- return (8, 2)
-
- e = testing_engine()
- dialect = e.dialect
- dialect._get_server_version_info = server_version_info
-
- assert dialect.supports_native_enum
- e.connect()
- assert not dialect.supports_native_enum
-
- # initialize is called again on new pool
- e.dispose()
- e.connect()
- assert not dialect.supports_native_enum
-
def test_reflection(self, metadata, connection):
etype = Enum(
"four", "five", "six", name="fourfivesixtype", metadata=metadata
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index fb4331998..ed9d67612 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -50,6 +50,7 @@ from sqlalchemy.testing import combinations
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
@@ -856,27 +857,15 @@ class AttachedDBTest(fixtures.TestBase):
["foo", "bar"],
)
- eq_(
- [
- d["name"]
- for d in insp.get_columns("nonexistent", schema="test_schema")
- ],
- [],
- )
- eq_(
- [
- d["name"]
- for d in insp.get_columns("another_created", schema=None)
- ],
- [],
- )
- eq_(
- [
- d["name"]
- for d in insp.get_columns("local_only", schema="test_schema")
- ],
- [],
- )
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_columns("nonexistent", schema="test_schema")
+
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_columns("another_created", schema=None)
+
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_columns("local_only", schema="test_schema")
+
eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"])
def test_table_names_present(self):