summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorFederico Caselli <cfederico87@gmail.com>2022-06-18 21:08:27 +0000
committerGerrit Code Review <gerrit@ci3.zzzcomputing.com>2022-06-18 21:08:27 +0000
commitbe576e7d88b6038781e52f7ef79799dbad09cd54 (patch)
tree772c368e107d13537ad8fb030b2b02fb1638169b /test
parentf7daad21ef66c29aecfbdb2b967641d0adad8779 (diff)
parentdb08a699489c9b0259579d7ff7fd6bf3496ca3a2 (diff)
downloadsqlalchemy-be576e7d88b6038781e52f7ef79799dbad09cd54.tar.gz
Merge "rearchitect reflection for batched performance" into main
Diffstat (limited to 'test')
-rw-r--r--test/dialect/mysql/test_reflection.py18
-rw-r--r--test/dialect/oracle/test_reflection.py554
-rw-r--r--test/dialect/postgresql/test_dialect.py20
-rw-r--r--test/dialect/postgresql/test_reflection.py441
-rw-r--r--test/dialect/postgresql/test_types.py35
-rw-r--r--test/dialect/test_sqlite.py31
-rw-r--r--test/engine/test_reflection.py57
-rw-r--r--test/perf/many_table_reflection.py617
-rw-r--r--test/requirements.py80
9 files changed, 1658 insertions, 195 deletions
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py
index f414c9c37..846001347 100644
--- a/test/dialect/mysql/test_reflection.py
+++ b/test/dialect/mysql/test_reflection.py
@@ -33,9 +33,9 @@ from sqlalchemy import UniqueConstraint
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.dialects.mysql import reflection as _reflection
from sqlalchemy.schema import CreateIndex
-from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
@@ -558,6 +558,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
)
def test_skip_not_describable(self, metadata, connection):
+        """This test is the only one that tests the _default_multi_reflect
+ behaviour with UnreflectableTableError
+ """
+
@event.listens_for(metadata, "before_drop")
def cleanup(*arg, **kw):
with testing.db.begin() as conn:
@@ -579,14 +583,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
m.reflect(views=True, bind=conn)
eq_(m.tables["test_t2"].name, "test_t2")
- assert_raises_message(
- exc.UnreflectableTableError,
- "references invalid table",
- Table,
- "test_v",
- MetaData(),
- autoload_with=conn,
- )
+ with expect_raises_message(
+ exc.UnreflectableTableError, "references invalid table"
+ ):
+ Table("test_v", MetaData(), autoload_with=conn)
@testing.exclude("mysql", "<", (5, 0, 0), "no information_schema support")
def test_system_views(self):
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py
index bf76dca43..53eb94df3 100644
--- a/test/dialect/oracle/test_reflection.py
+++ b/test/dialect/oracle/test_reflection.py
@@ -27,14 +27,18 @@ from sqlalchemy.dialects.oracle.base import BINARY_FLOAT
from sqlalchemy.dialects.oracle.base import DOUBLE_PRECISION
from sqlalchemy.dialects.oracle.base import NUMBER
from sqlalchemy.dialects.oracle.base import REAL
+from sqlalchemy.engine import ObjectKind
from sqlalchemy.testing import assert_warns
from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import config
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
+from sqlalchemy.testing.schema import eq_compile_type
from sqlalchemy.testing.schema import Table
@@ -384,6 +388,7 @@ class SystemTableTablenamesTest(fixtures.TestBase):
__backend__ = True
def setup_test(self):
+
with testing.db.begin() as conn:
conn.exec_driver_sql("create table my_table (id integer)")
conn.exec_driver_sql(
@@ -417,6 +422,14 @@ class SystemTableTablenamesTest(fixtures.TestBase):
set(["my_table", "foo_table"]),
)
+ def test_reflect_system_table(self):
+ meta = MetaData()
+ t = Table("foo_table", meta, autoload_with=testing.db)
+ assert t.columns.keys() == ["id"]
+
+ t = Table("my_temp_table", meta, autoload_with=testing.db)
+ assert t.columns.keys() == ["id"]
+
class DontReflectIOTTest(fixtures.TestBase):
"""test that index overflow tables aren't included in
@@ -509,6 +522,228 @@ class TableReflectionTest(fixtures.TestBase):
tbl = Table("test_compress", m2, autoload_with=connection)
assert tbl.dialect_options["oracle"]["compress"] == "OLTP"
+ def test_reflect_hidden_column(self):
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql(
+ "CREATE TABLE my_table(id integer, hide integer INVISIBLE)"
+ )
+
+ try:
+ insp = inspect(conn)
+ cols = insp.get_columns("my_table")
+ assert len(cols) == 1
+ assert cols[0]["name"] == "id"
+ finally:
+ conn.exec_driver_sql("DROP TABLE my_table")
+
+
+class ViewReflectionTest(fixtures.TestBase):
+ __only_on__ = "oracle"
+ __backend__ = True
+
+ @classmethod
+ def setup_test_class(cls):
+ sql = """
+ CREATE TABLE tbl (
+ id INTEGER PRIMARY KEY,
+ data INTEGER
+ );
+
+ CREATE VIEW tbl_plain_v AS
+ SELECT id, data FROM tbl WHERE id > 100;
+
+ -- comments on plain views are created with "comment on table"
+ -- because why not..
+ COMMENT ON TABLE tbl_plain_v IS 'view comment';
+
+ CREATE MATERIALIZED VIEW tbl_v AS
+ SELECT id, data FROM tbl WHERE id > 42;
+
+ COMMENT ON MATERIALIZED VIEW tbl_v IS 'my mat view comment';
+
+ CREATE MATERIALIZED VIEW tbl_v2 AS
+ SELECT id, data FROM tbl WHERE id < 42;
+
+ COMMENT ON MATERIALIZED VIEW tbl_v2 IS 'my other mat view comment';
+
+ CREATE SYNONYM view_syn FOR tbl_plain_v;
+ CREATE SYNONYM %(test_schema)s.ts_v_s FOR tbl_plain_v;
+
+ CREATE VIEW %(test_schema)s.schema_view AS
+ SELECT 1 AS value FROM dual;
+
+ COMMENT ON TABLE %(test_schema)s.schema_view IS 'schema view comment';
+ CREATE SYNONYM syn_schema_view FOR %(test_schema)s.schema_view;
+ """
+ if testing.requires.oracle_test_dblink.enabled:
+ cls.dblink = config.file_config.get(
+ "sqla_testing", "oracle_db_link"
+ )
+ sql += """
+ CREATE SYNONYM syn_link FOR tbl_plain_v@%(link)s;
+ """ % {
+ "link": cls.dblink
+ }
+ with testing.db.begin() as conn:
+ for stmt in (
+ sql % {"test_schema": testing.config.test_schema}
+ ).split(";"):
+ if stmt.strip():
+ conn.exec_driver_sql(stmt)
+
+ @classmethod
+ def teardown_test_class(cls):
+ sql = """
+ DROP MATERIALIZED VIEW tbl_v;
+ DROP MATERIALIZED VIEW tbl_v2;
+ DROP VIEW tbl_plain_v;
+ DROP TABLE tbl;
+ DROP VIEW %(test_schema)s.schema_view;
+ DROP SYNONYM view_syn;
+ DROP SYNONYM %(test_schema)s.ts_v_s;
+ DROP SYNONYM syn_schema_view;
+ """
+ if testing.requires.oracle_test_dblink.enabled:
+ sql += """
+ DROP SYNONYM syn_link;
+ """
+ with testing.db.begin() as conn:
+ for stmt in (
+ sql % {"test_schema": testing.config.test_schema}
+ ).split(";"):
+ if stmt.strip():
+ conn.exec_driver_sql(stmt)
+
+ def test_get_names(self, connection):
+ insp = inspect(connection)
+ eq_(insp.get_table_names(), ["tbl"])
+ eq_(insp.get_view_names(), ["tbl_plain_v"])
+ eq_(insp.get_materialized_view_names(), ["tbl_v", "tbl_v2"])
+ eq_(
+ insp.get_view_names(schema=testing.config.test_schema),
+ ["schema_view"],
+ )
+
+ def test_get_table_comment_on_view(self, connection):
+ insp = inspect(connection)
+ eq_(insp.get_table_comment("tbl_v"), {"text": "my mat view comment"})
+ eq_(insp.get_table_comment("tbl_plain_v"), {"text": "view comment"})
+
+ def test_get_multi_view_comment(self, connection):
+ insp = inspect(connection)
+ plain = {(None, "tbl_plain_v"): {"text": "view comment"}}
+ mat = {
+ (None, "tbl_v"): {"text": "my mat view comment"},
+ (None, "tbl_v2"): {"text": "my other mat view comment"},
+ }
+ eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain)
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW),
+ mat,
+ )
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW),
+ {**plain, **mat},
+ )
+ ts = testing.config.test_schema
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW, schema=ts),
+ {(ts, "schema_view"): {"text": "schema view comment"}},
+ )
+ eq_(insp.get_multi_table_comment(), {(None, "tbl"): {"text": None}})
+
+ def test_get_table_comment_synonym(self, connection):
+ insp = inspect(connection)
+ eq_(
+ insp.get_table_comment("view_syn", oracle_resolve_synonyms=True),
+ {"text": "view comment"},
+ )
+ eq_(
+ insp.get_table_comment(
+ "syn_schema_view", oracle_resolve_synonyms=True
+ ),
+ {"text": "schema view comment"},
+ )
+ eq_(
+ insp.get_table_comment(
+ "ts_v_s",
+ oracle_resolve_synonyms=True,
+ schema=testing.config.test_schema,
+ ),
+ {"text": "view comment"},
+ )
+
+ def test_get_multi_view_comment_synonym(self, connection):
+ insp = inspect(connection)
+ exp = {
+ (None, "view_syn"): {"text": "view comment"},
+ (None, "syn_schema_view"): {"text": "schema view comment"},
+ }
+ if testing.requires.oracle_test_dblink.enabled:
+ exp[(None, "syn_link")] = {"text": "view comment"}
+ eq_(
+ insp.get_multi_table_comment(
+ oracle_resolve_synonyms=True, kind=ObjectKind.ANY_VIEW
+ ),
+ exp,
+ )
+ ts = testing.config.test_schema
+ eq_(
+ insp.get_multi_table_comment(
+ oracle_resolve_synonyms=True,
+ schema=ts,
+ kind=ObjectKind.ANY_VIEW,
+ ),
+ {(ts, "ts_v_s"): {"text": "view comment"}},
+ )
+
+ def test_get_view_definition(self, connection):
+ insp = inspect(connection)
+ eq_(
+ insp.get_view_definition("tbl_plain_v"),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition("tbl_v"),
+ "SELECT id, data FROM tbl WHERE id > 42",
+ )
+ with expect_raises(exc.NoSuchTableError):
+ eq_(insp.get_view_definition("view_syn"), None)
+ eq_(
+ insp.get_view_definition("view_syn", oracle_resolve_synonyms=True),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition(
+ "syn_schema_view", oracle_resolve_synonyms=True
+ ),
+ "SELECT 1 AS value FROM dual",
+ )
+ eq_(
+ insp.get_view_definition(
+ "ts_v_s",
+ oracle_resolve_synonyms=True,
+ schema=testing.config.test_schema,
+ ),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+
+ @testing.requires.oracle_test_dblink
+ def test_get_view_definition_dblink(self, connection):
+ insp = inspect(connection)
+ eq_(
+ insp.get_view_definition("syn_link", oracle_resolve_synonyms=True),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition("tbl_plain_v", dblink=self.dblink),
+ "SELECT id, data FROM tbl WHERE id > 100",
+ )
+ eq_(
+ insp.get_view_definition("tbl_v", dblink=self.dblink),
+ "SELECT id, data FROM tbl WHERE id > 42",
+ )
+
class RoundTripIndexTest(fixtures.TestBase):
__only_on__ = "oracle"
@@ -722,8 +957,6 @@ class DBLinkReflectionTest(fixtures.TestBase):
@classmethod
def setup_test_class(cls):
- from sqlalchemy.testing import config
-
cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link")
# note that the synonym here is still not totally functional
@@ -863,3 +1096,320 @@ class IdentityReflectionTest(fixtures.TablesTest):
exp = common.copy()
exp["order"] = True
eq_(col["identity"], exp)
+
+
+class AdditionalReflectionTests(fixtures.TestBase):
+ __only_on__ = "oracle"
+ __backend__ = True
+
+ @classmethod
+ def setup_test_class(cls):
+ # currently assuming full DBA privs for the user.
+ # don't really know how else to go here unless
+ # we connect as the other user.
+
+ sql = """
+CREATE TABLE %(schema)sparent(
+ id INTEGER,
+ data VARCHAR2(50),
+ CONSTRAINT parent_pk_%(schema_id)s PRIMARY KEY (id)
+);
+CREATE TABLE %(schema)smy_table(
+ id INTEGER,
+ name VARCHAR2(125),
+ related INTEGER,
+ data%(schema_id)s NUMBER NOT NULL,
+ CONSTRAINT my_table_pk_%(schema_id)s PRIMARY KEY (id),
+ CONSTRAINT my_table_fk_%(schema_id)s FOREIGN KEY(related)
+ REFERENCES %(schema)sparent(id),
+ CONSTRAINT my_table_check_%(schema_id)s CHECK (data%(schema_id)s > 42),
+ CONSTRAINT data_unique%(schema_id)s UNIQUE (data%(schema_id)s)
+);
+CREATE INDEX my_table_index_%(schema_id)s on %(schema)smy_table (id, name);
+COMMENT ON TABLE %(schema)smy_table IS 'my table comment %(schema_id)s';
+COMMENT ON COLUMN %(schema)smy_table.name IS
+'my table.name comment %(schema_id)s';
+"""
+
+ with testing.db.begin() as conn:
+ for schema in ("", testing.config.test_schema):
+ dd = {
+ "schema": f"{schema}." if schema else "",
+ "schema_id": "sch" if schema else "",
+ }
+ for stmt in (sql % dd).split(";"):
+ if stmt.strip():
+ conn.exec_driver_sql(stmt)
+
+ @classmethod
+ def teardown_test_class(cls):
+ sql = """
+drop table %(schema)smy_table;
+drop table %(schema)sparent;
+"""
+ with testing.db.begin() as conn:
+ for schema in ("", testing.config.test_schema):
+ dd = {"schema": f"{schema}." if schema else ""}
+ for stmt in (sql % dd).split(";"):
+ if stmt.strip():
+ try:
+ conn.exec_driver_sql(stmt)
+ except:
+ pass
+
+ def setup_test(self):
+ self.dblink = config.file_config.get("sqla_testing", "oracle_db_link")
+ self.dblink2 = config.file_config.get(
+ "sqla_testing", "oracle_db_link2"
+ )
+ self.columns = {}
+ self.indexes = {}
+ self.primary_keys = {}
+ self.comments = {}
+ self.uniques = {}
+ self.checks = {}
+ self.foreign_keys = {}
+ self.options = {}
+ self.allDicts = [
+ self.columns,
+ self.indexes,
+ self.primary_keys,
+ self.comments,
+ self.uniques,
+ self.checks,
+ self.foreign_keys,
+ self.options,
+ ]
+ for schema in (None, testing.config.test_schema):
+ suffix = "sch" if schema else ""
+
+ self.columns[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": "id",
+ "nullable": False,
+ "type": eq_compile_type("INTEGER"),
+ "default": None,
+ "comment": None,
+ },
+ {
+ "name": "name",
+ "nullable": True,
+ "type": eq_compile_type("VARCHAR(125)"),
+ "default": None,
+ "comment": f"my table.name comment {suffix}",
+ },
+ {
+ "name": "related",
+ "nullable": True,
+ "type": eq_compile_type("INTEGER"),
+ "default": None,
+ "comment": None,
+ },
+ {
+ "name": f"data{suffix}",
+ "nullable": False,
+ "type": eq_compile_type("NUMBER"),
+ "default": None,
+ "comment": None,
+ },
+ ],
+ (schema, "parent"): [
+ {
+ "name": "id",
+ "nullable": False,
+ "type": eq_compile_type("INTEGER"),
+ "default": None,
+ "comment": None,
+ },
+ {
+ "name": "data",
+ "nullable": True,
+ "type": eq_compile_type("VARCHAR(50)"),
+ "default": None,
+ "comment": None,
+ },
+ ],
+ }
+ self.indexes[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"data_unique{suffix}",
+ "column_names": [f"data{suffix}"],
+ "dialect_options": {},
+ "unique": True,
+ },
+ {
+ "name": f"my_table_index_{suffix}",
+ "column_names": ["id", "name"],
+ "dialect_options": {},
+ "unique": False,
+ },
+ ],
+ (schema, "parent"): [],
+ }
+ self.primary_keys[schema] = {
+ (schema, "my_table"): {
+ "name": f"my_table_pk_{suffix}",
+ "constrained_columns": ["id"],
+ },
+ (schema, "parent"): {
+ "name": f"parent_pk_{suffix}",
+ "constrained_columns": ["id"],
+ },
+ }
+ self.comments[schema] = {
+ (schema, "my_table"): {"text": f"my table comment {suffix}"},
+ (schema, "parent"): {"text": None},
+ }
+ self.foreign_keys[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"my_table_fk_{suffix}",
+ "constrained_columns": ["related"],
+ "referred_schema": schema,
+ "referred_table": "parent",
+ "referred_columns": ["id"],
+ "options": {},
+ }
+ ],
+ (schema, "parent"): [],
+ }
+ self.checks[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"my_table_check_{suffix}",
+ "sqltext": f"data{suffix} > 42",
+ }
+ ],
+ (schema, "parent"): [],
+ }
+ self.uniques[schema] = {
+ (schema, "my_table"): [
+ {
+ "name": f"data_unique{suffix}",
+ "column_names": [f"data{suffix}"],
+ "duplicates_index": f"data_unique{suffix}",
+ }
+ ],
+ (schema, "parent"): [],
+ }
+ self.options[schema] = {
+ (schema, "my_table"): {},
+ (schema, "parent"): {},
+ }
+
+ def test_tables(self, connection):
+ insp = inspect(connection)
+
+ eq_(sorted(insp.get_table_names()), ["my_table", "parent"])
+
+ def _check_reflection(self, conn, schema, res_schema=False, **kw):
+ if res_schema is False:
+ res_schema = schema
+ insp = inspect(conn)
+ eq_(
+ insp.get_multi_columns(schema=schema, **kw),
+ self.columns[res_schema],
+ )
+ eq_(
+ insp.get_multi_indexes(schema=schema, **kw),
+ self.indexes[res_schema],
+ )
+ eq_(
+ insp.get_multi_pk_constraint(schema=schema, **kw),
+ self.primary_keys[res_schema],
+ )
+ eq_(
+ insp.get_multi_table_comment(schema=schema, **kw),
+ self.comments[res_schema],
+ )
+ eq_(
+ insp.get_multi_foreign_keys(schema=schema, **kw),
+ self.foreign_keys[res_schema],
+ )
+ eq_(
+ insp.get_multi_check_constraints(schema=schema, **kw),
+ self.checks[res_schema],
+ )
+ eq_(
+ insp.get_multi_unique_constraints(schema=schema, **kw),
+ self.uniques[res_schema],
+ )
+ eq_(
+ insp.get_multi_table_options(schema=schema, **kw),
+ self.options[res_schema],
+ )
+
+ @testing.combinations(True, False, argnames="schema")
+ def test_schema_translate_map(self, connection, schema):
+ schema = testing.config.test_schema if schema else None
+ c = connection.execution_options(
+ schema_translate_map={
+ None: "foo",
+ testing.config.test_schema: "bar",
+ }
+ )
+ self._check_reflection(c, schema)
+
+ @testing.requires.oracle_test_dblink
+ def test_db_link(self, connection):
+ self._check_reflection(connection, schema=None, dblink=self.dblink)
+ self._check_reflection(
+ connection,
+ schema=testing.config.test_schema,
+ dblink=self.dblink,
+ )
+
+ def test_no_synonyms(self, connection):
+        # oracle_resolve_synonyms is ignored if there are no matching synonyms
+ self._check_reflection(
+ connection, schema=None, oracle_resolve_synonyms=True
+ )
+ connection.exec_driver_sql("CREATE SYNONYM tmp FOR parent")
+ for dict_ in self.allDicts:
+ dict_["tmp"] = {(None, "parent"): dict_[None][(None, "parent")]}
+ try:
+ self._check_reflection(
+ connection,
+ schema=None,
+ res_schema="tmp",
+ oracle_resolve_synonyms=True,
+ filter_names=["parent"],
+ )
+ finally:
+ connection.exec_driver_sql("DROP SYNONYM tmp")
+
+ @testing.requires.oracle_test_dblink
+ @testing.requires.oracle_test_dblink2
+ def test_multi_dblink_synonyms(self, connection):
+        # oracle_resolve_synonyms handles multiple dblinks at once
+ connection.exec_driver_sql(
+ f"CREATE SYNONYM s1 FOR my_table@{self.dblink}"
+ )
+ connection.exec_driver_sql(
+ f"CREATE SYNONYM s2 FOR {testing.config.test_schema}."
+ f"my_table@{self.dblink2}"
+ )
+ connection.exec_driver_sql("CREATE SYNONYM s3 FOR parent")
+ for dict_ in self.allDicts:
+ dict_["tmp"] = {
+ (None, "s1"): dict_[None][(None, "my_table")],
+ (None, "s2"): dict_[testing.config.test_schema][
+ (testing.config.test_schema, "my_table")
+ ],
+ (None, "s3"): dict_[None][(None, "parent")],
+ }
+ fk = self.foreign_keys["tmp"][(None, "s1")][0]
+ fk["referred_table"] = "s3"
+ try:
+ self._check_reflection(
+ connection,
+ schema=None,
+ res_schema="tmp",
+ oracle_resolve_synonyms=True,
+ )
+ finally:
+ connection.exec_driver_sql("DROP SYNONYM s1")
+ connection.exec_driver_sql("DROP SYNONYM s2")
+ connection.exec_driver_sql("DROP SYNONYM s3")
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index 0093eb5ba..d55aa8203 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -887,19 +887,12 @@ class MiscBackendTest(
)
@testing.combinations(
- ((8, 1), False, False),
- ((8, 1), None, False),
- ((11, 5), True, False),
- ((11, 5), False, True),
+ (True, False),
+ (False, True),
)
- def test_backslash_escapes_detection(
- self, version, explicit_setting, expected
- ):
+ def test_backslash_escapes_detection(self, explicit_setting, expected):
engine = engines.testing_engine()
- def _server_version(conn):
- return version
-
if explicit_setting is not None:
@event.listens_for(engine, "connect", insert=True)
@@ -912,11 +905,8 @@ class MiscBackendTest(
)
dbapi_connection.commit()
- with mock.patch.object(
- engine.dialect, "_get_server_version_info", _server_version
- ):
- with engine.connect():
- eq_(engine.dialect._backslash_escapes, expected)
+ with engine.connect():
+ eq_(engine.dialect._backslash_escapes, expected)
def test_dbapi_autocommit_attribute(self):
"""all the supported DBAPIs have an .autocommit attribute. make
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index 3e0569d32..21b4149bc 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -27,17 +27,21 @@ from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import ExcludeConstraint
from sqlalchemy.dialects.postgresql import INTEGER
from sqlalchemy.dialects.postgresql import INTERVAL
+from sqlalchemy.dialects.postgresql import pg_catalog
from sqlalchemy.dialects.postgresql import TSRANGE
+from sqlalchemy.engine import ObjectKind
+from sqlalchemy.engine import ObjectScope
from sqlalchemy.schema import CreateIndex
from sqlalchemy.sql.schema import CheckConstraint
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
-from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_warns
from sqlalchemy.testing.assertions import AssertsExecutionResults
from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.assertions import is_
+from sqlalchemy.testing.assertions import is_false
from sqlalchemy.testing.assertions import is_true
@@ -231,17 +235,36 @@ class MaterializedViewReflectionTest(
connection.execute(target.insert(), {"id": 89, "data": "d1"})
materialized_view = sa.DDL(
- "CREATE MATERIALIZED VIEW test_mview AS " "SELECT * FROM testtable"
+ "CREATE MATERIALIZED VIEW test_mview AS SELECT * FROM testtable"
)
plain_view = sa.DDL(
- "CREATE VIEW test_regview AS " "SELECT * FROM testtable"
+ "CREATE VIEW test_regview AS SELECT data FROM testtable"
)
sa.event.listen(testtable, "after_create", plain_view)
sa.event.listen(testtable, "after_create", materialized_view)
sa.event.listen(
testtable,
+ "after_create",
+ sa.DDL("COMMENT ON VIEW test_regview IS 'regular view comment'"),
+ )
+ sa.event.listen(
+ testtable,
+ "after_create",
+ sa.DDL(
+ "COMMENT ON MATERIALIZED VIEW test_mview "
+ "IS 'materialized view comment'"
+ ),
+ )
+ sa.event.listen(
+ testtable,
+ "after_create",
+ sa.DDL("CREATE INDEX mat_index ON test_mview(data DESC)"),
+ )
+
+ sa.event.listen(
+ testtable,
"before_drop",
sa.DDL("DROP MATERIALIZED VIEW test_mview"),
)
@@ -249,6 +272,12 @@ class MaterializedViewReflectionTest(
testtable, "before_drop", sa.DDL("DROP VIEW test_regview")
)
+ def test_has_type(self, connection):
+ insp = inspect(connection)
+ is_true(insp.has_type("test_mview"))
+ is_true(insp.has_type("test_regview"))
+ is_true(insp.has_type("testtable"))
+
def test_mview_is_reflected(self, connection):
metadata = MetaData()
table = Table("test_mview", metadata, autoload_with=connection)
@@ -265,49 +294,99 @@ class MaterializedViewReflectionTest(
def test_get_view_names(self, inspect_fixture):
insp, conn = inspect_fixture
- eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"]))
+ eq_(set(insp.get_view_names()), set(["test_regview"]))
- def test_get_view_names_plain(self, connection):
+ def test_get_materialized_view_names(self, inspect_fixture):
+ insp, conn = inspect_fixture
+ eq_(set(insp.get_materialized_view_names()), set(["test_mview"]))
+
+ def test_get_view_names_reflection_cache_ok(self, connection):
insp = inspect(connection)
+ eq_(set(insp.get_view_names()), set(["test_regview"]))
eq_(
- set(insp.get_view_names(include=("plain",))), set(["test_regview"])
+ set(insp.get_materialized_view_names()),
+ set(["test_mview"]),
+ )
+ eq_(
+ set(insp.get_view_names()).union(
+ insp.get_materialized_view_names()
+ ),
+ set(["test_regview", "test_mview"]),
)
- def test_get_view_names_plain_string(self, connection):
+ def test_get_view_definition(self, connection):
insp = inspect(connection)
- eq_(set(insp.get_view_names(include="plain")), set(["test_regview"]))
- def test_get_view_names_materialized(self, connection):
- insp = inspect(connection)
+ def normalize(definition):
+ return re.sub(r"[\n\t ]+", " ", definition.strip())
+
eq_(
- set(insp.get_view_names(include=("materialized",))),
- set(["test_mview"]),
+ normalize(insp.get_view_definition("test_mview")),
+ "SELECT testtable.id, testtable.data FROM testtable;",
+ )
+ eq_(
+ normalize(insp.get_view_definition("test_regview")),
+ "SELECT testtable.data FROM testtable;",
)
- def test_get_view_names_reflection_cache_ok(self, connection):
+ def test_get_view_comment(self, connection):
insp = inspect(connection)
eq_(
- set(insp.get_view_names(include=("plain",))), set(["test_regview"])
+ insp.get_table_comment("test_regview"),
+ {"text": "regular view comment"},
)
eq_(
- set(insp.get_view_names(include=("materialized",))),
- set(["test_mview"]),
+ insp.get_table_comment("test_mview"),
+ {"text": "materialized view comment"},
)
- eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"]))
- def test_get_view_names_empty(self, connection):
+ def test_get_multi_view_comment(self, connection):
insp = inspect(connection)
- assert_raises(ValueError, insp.get_view_names, include=())
+ eq_(
+ insp.get_multi_table_comment(),
+ {(None, "testtable"): {"text": None}},
+ )
+ plain = {(None, "test_regview"): {"text": "regular view comment"}}
+ mat = {(None, "test_mview"): {"text": "materialized view comment"}}
+ eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain)
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW),
+ mat,
+ )
+ eq_(
+ insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW),
+ {**plain, **mat},
+ )
+ eq_(
+ insp.get_multi_table_comment(
+ kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY
+ ),
+ {},
+ )
- def test_get_view_definition(self, connection):
+ def test_get_multi_view_indexes(self, connection):
insp = inspect(connection)
+ eq_(insp.get_multi_indexes(), {(None, "testtable"): []})
+
+ exp = {
+ "name": "mat_index",
+ "unique": False,
+ "column_names": ["data"],
+ "column_sorting": {"data": ("desc",)},
+ }
+ if connection.dialect.server_version_info >= (11, 0):
+ exp["include_columns"] = []
+ exp["dialect_options"] = {"postgresql_include": []}
+ plain = {(None, "test_regview"): []}
+ mat = {(None, "test_mview"): [exp]}
+ eq_(insp.get_multi_indexes(kind=ObjectKind.VIEW), plain)
+ eq_(insp.get_multi_indexes(kind=ObjectKind.MATERIALIZED_VIEW), mat)
+ eq_(insp.get_multi_indexes(kind=ObjectKind.ANY_VIEW), {**plain, **mat})
eq_(
- re.sub(
- r"[\n\t ]+",
- " ",
- insp.get_view_definition("test_mview").strip(),
+ insp.get_multi_indexes(
+ kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY
),
- "SELECT testtable.id, testtable.data FROM testtable;",
+ {},
)
@@ -993,9 +1072,9 @@ class ReflectionTest(
go,
[
"Skipped unsupported reflection of "
- "expression-based index idx1",
+ "expression-based index idx1 of table party",
"Skipped unsupported reflection of "
- "expression-based index idx3",
+ "expression-based index idx3 of table party",
],
)
@@ -1016,7 +1095,7 @@ class ReflectionTest(
metadata.create_all(connection)
- ind = connection.dialect.get_indexes(connection, t1, None)
+ ind = connection.dialect.get_indexes(connection, t1.name, None)
partial_definitions = []
for ix in ind:
@@ -1337,6 +1416,9 @@ class ReflectionTest(
}
],
)
+ is_true(inspector.has_type("mood", "test_schema"))
+ is_true(inspector.has_type("mood", "*"))
+ is_false(inspector.has_type("mood"))
def test_inspect_enums(self, metadata, inspect_fixture):
@@ -1345,30 +1427,49 @@ class ReflectionTest(
enum_type = postgresql.ENUM(
"cat", "dog", "rat", name="pet", metadata=metadata
)
+ enum_type.create(conn)
+ conn.commit()
- with conn.begin():
- enum_type.create(conn)
-
- eq_(
- inspector.get_enums(),
- [
- {
- "visible": True,
- "labels": ["cat", "dog", "rat"],
- "name": "pet",
- "schema": "public",
- }
- ],
- )
-
- def test_get_table_oid(self, metadata, inspect_fixture):
-
- inspector, conn = inspect_fixture
+ res = [
+ {
+ "visible": True,
+ "labels": ["cat", "dog", "rat"],
+ "name": "pet",
+ "schema": "public",
+ }
+ ]
+ eq_(inspector.get_enums(), res)
+ is_true(inspector.has_type("pet", "*"))
+ is_true(inspector.has_type("pet"))
+ is_false(inspector.has_type("pet", "test_schema"))
+
+ enum_type.drop(conn)
+ conn.commit()
+ eq_(inspector.get_enums(), res)
+ is_true(inspector.has_type("pet"))
+ inspector.clear_cache()
+ eq_(inspector.get_enums(), [])
+ is_false(inspector.has_type("pet"))
+
+ def test_get_table_oid(self, metadata, connection):
+ Table("t1", metadata, Column("col", Integer))
+ Table("t1", metadata, Column("col", Integer), schema="test_schema")
+ metadata.create_all(connection)
+ insp = inspect(connection)
+ oid = insp.get_table_oid("t1")
+ oid_schema = insp.get_table_oid("t1", schema="test_schema")
+ is_true(isinstance(oid, int))
+ is_true(isinstance(oid_schema, int))
+ is_true(oid != oid_schema)
- with conn.begin():
- Table("some_table", metadata, Column("q", Integer)).create(conn)
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_table_oid("does_not_exist")
- assert inspector.get_table_oid("some_table") is not None
+ metadata.tables["t1"].drop(connection)
+ eq_(insp.get_table_oid("t1"), oid)
+ insp.clear_cache()
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_table_oid("t1")
def test_inspect_enums_case_sensitive(self, metadata, connection):
sa.event.listen(
@@ -1707,77 +1808,146 @@ class ReflectionTest(
)
def test_reflect_check_warning(self):
- rows = [("some name", "NOTCHECK foobar")]
+ rows = [("foo", "some name", "NOTCHECK foobar")]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
)
)
- with mock.patch.object(
- testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1
+ with testing.expect_warnings(
+ "Could not parse CHECK constraint text: 'NOTCHECK foobar'"
):
- with testing.expect_warnings(
- "Could not parse CHECK constraint text: 'NOTCHECK foobar'"
- ):
- testing.db.dialect.get_check_constraints(conn, "foo")
+ testing.db.dialect.get_check_constraints(conn, "foo")
def test_reflect_extra_newlines(self):
rows = [
- ("some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"),
- ("some other name", "CHECK ((b\nIS\nNOT\nNULL))"),
- ("some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"),
- ("some name", "CHECK (c != 'hi\nim a name\n')"),
+ ("foo", "some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"),
+ ("foo", "some other name", "CHECK ((b\nIS\nNOT\nNULL))"),
+ ("foo", "some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"),
+ ("foo", "some name", "CHECK (c != 'hi\nim a name\n')"),
]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
)
)
- with mock.patch.object(
- testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1
- ):
- check_constraints = testing.db.dialect.get_check_constraints(
- conn, "foo"
- )
- eq_(
- check_constraints,
- [
- {
- "name": "some name",
- "sqltext": "a \nIS\n NOT\n\n NULL\n",
- },
- {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"},
- {
- "name": "some CRLF name",
- "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL",
- },
- {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"},
- ],
- )
+ check_constraints = testing.db.dialect.get_check_constraints(
+ conn, "foo"
+ )
+ eq_(
+ check_constraints,
+ [
+ {
+ "name": "some name",
+ "sqltext": "a \nIS\n NOT\n\n NULL\n",
+ },
+ {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"},
+ {
+ "name": "some CRLF name",
+ "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL",
+ },
+ {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"},
+ ],
+ )
def test_reflect_with_not_valid_check_constraint(self):
- rows = [("some name", "CHECK ((a IS NOT NULL)) NOT VALID")]
+ rows = [("foo", "some name", "CHECK ((a IS NOT NULL)) NOT VALID")]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
)
)
- with mock.patch.object(
- testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1
- ):
- check_constraints = testing.db.dialect.get_check_constraints(
- conn, "foo"
+ check_constraints = testing.db.dialect.get_check_constraints(
+ conn, "foo"
+ )
+ eq_(
+ check_constraints,
+ [
+ {
+ "name": "some name",
+ "sqltext": "a IS NOT NULL",
+ "dialect_options": {"not_valid": True},
+ }
+ ],
+ )
+
+ def _apply_stm(self, connection, use_map):
+ if use_map:
+ return connection.execution_options(
+ schema_translate_map={
+ None: "foo",
+ testing.config.test_schema: "bar",
+ }
)
- eq_(
- check_constraints,
- [
- {
- "name": "some name",
- "sqltext": "a IS NOT NULL",
- "dialect_options": {"not_valid": True},
- }
- ],
+ else:
+ return connection
+
+ @testing.combinations(True, False, argnames="use_map")
+ @testing.combinations(True, False, argnames="schema")
+ def test_schema_translate_map(self, metadata, connection, use_map, schema):
+ schema = testing.config.test_schema if schema else None
+ Table(
+ "foo",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("a", Integer, index=True),
+ Column(
+ "b",
+ ForeignKey(f"{schema}.foo.id" if schema else "foo.id"),
+ unique=True,
+ ),
+ CheckConstraint("a>10", name="foo_check"),
+ comment="comm",
+ schema=schema,
+ )
+ metadata.create_all(connection)
+ if use_map:
+ connection = connection.execution_options(
+ schema_translate_map={
+ None: "foo",
+ testing.config.test_schema: "bar",
+ }
)
+ insp = inspect(connection)
+ eq_(
+ [c["name"] for c in insp.get_columns("foo", schema=schema)],
+ ["id", "a", "b"],
+ )
+ eq_(
+ [
+ i["column_names"]
+ for i in insp.get_indexes("foo", schema=schema)
+ ],
+ [["b"], ["a"]],
+ )
+ eq_(
+ insp.get_pk_constraint("foo", schema=schema)[
+ "constrained_columns"
+ ],
+ ["id"],
+ )
+ eq_(insp.get_table_comment("foo", schema=schema), {"text": "comm"})
+ eq_(
+ [
+ f["constrained_columns"]
+ for f in insp.get_foreign_keys("foo", schema=schema)
+ ],
+ [["b"]],
+ )
+ eq_(
+ [
+ c["name"]
+ for c in insp.get_check_constraints("foo", schema=schema)
+ ],
+ ["foo_check"],
+ )
+ eq_(
+ [
+ u["column_names"]
+ for u in insp.get_unique_constraints("foo", schema=schema)
+ ],
+ [["b"]],
+ )
class CustomTypeReflectionTest(fixtures.TestBase):
@@ -1804,9 +1974,23 @@ class CustomTypeReflectionTest(fixtures.TestBase):
("my_custom_type(ARG1)", ("ARG1", None)),
("my_custom_type(ARG1, ARG2)", ("ARG1", "ARG2")),
]:
- column_info = dialect._get_column_info(
- "colname", sch, None, False, {}, {}, "public", None, "", None
+ row_dict = {
+ "name": "colname",
+ "table_name": "tblname",
+ "format_type": sch,
+ "default": None,
+ "not_null": False,
+ "comment": None,
+ "generated": "",
+ "identity_options": None,
+ }
+ column_info = dialect._get_columns_info(
+ [row_dict], {}, {}, "public"
)
+ assert ("public", "tblname") in column_info
+ column_info = column_info[("public", "tblname")]
+ assert len(column_info) == 1
+ column_info = column_info[0]
assert isinstance(column_info["type"], self.CustomType)
eq_(column_info["type"].arg1, args[0])
eq_(column_info["type"].arg2, args[1])
@@ -1951,3 +2135,64 @@ class IdentityReflectionTest(fixtures.TablesTest):
exp = default.copy()
exp.update(maxvalue=2**15 - 1)
eq_(col["identity"], exp)
+
+
+class TestReflectDifficultColTypes(fixtures.TablesTest):
+ __only_on__ = "postgresql"
+ __backend__ = True
+
+ def define_tables(metadata):
+ Table(
+ "sample_table",
+ metadata,
+ Column("c1", Integer, primary_key=True),
+ Column("c2", Integer, unique=True),
+ Column("c3", Integer),
+ Index("sample_table_index", "c2", "c3"),
+ )
+
+ def check_int_list(self, row, key):
+ value = row[key]
+ is_true(isinstance(value, list))
+ is_true(len(value) > 0)
+ is_true(all(isinstance(v, int) for v in value))
+
+ def test_pg_index(self, connection):
+ insp = inspect(connection)
+
+ pgc_oid = insp.get_table_oid("sample_table")
+ cols = [
+ col
+ for col in pg_catalog.pg_index.c
+ if testing.db.dialect.server_version_info
+ >= col.info.get("server_version", (0,))
+ ]
+
+ stmt = sa.select(*cols).filter_by(indrelid=pgc_oid)
+ rows = connection.execute(stmt).mappings().all()
+ is_true(len(rows) > 0)
+ cols = [
+ col
+ for col in ["indkey", "indoption", "indclass", "indcollation"]
+ if testing.db.dialect.server_version_info
+ >= pg_catalog.pg_index.c[col].info.get("server_version", (0,))
+ ]
+ for row in rows:
+ for col in cols:
+ self.check_int_list(row, col)
+
+ def test_pg_constraint(self, connection):
+ insp = inspect(connection)
+
+ pgc_oid = insp.get_table_oid("sample_table")
+ cols = [
+ col
+ for col in pg_catalog.pg_constraint.c
+ if testing.db.dialect.server_version_info
+ >= col.info.get("server_version", (0,))
+ ]
+ stmt = sa.select(*cols).filter_by(conrelid=pgc_oid)
+ rows = connection.execute(stmt).mappings().all()
+ is_true(len(rows) > 0)
+ for row in rows:
+ self.check_int_list(row, "conkey")
diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py
index fd4b91db1..79f029391 100644
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@ -454,11 +454,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
asserter.assert_(
# check for table
RegexSQL(
- "select relname from pg_class c join pg_namespace.*",
+ "SELECT pg_catalog.pg_class.relname FROM pg_catalog."
+ "pg_class JOIN pg_catalog.pg_namespace.*",
dialect="postgresql",
),
# check for enum, just once
- RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"),
+ RegexSQL(
+ r"SELECT pg_catalog.pg_type.typname .* WHERE "
+ "pg_catalog.pg_type.typname = ",
+ dialect="postgresql",
+ ),
RegexSQL("CREATE TYPE myenum AS ENUM .*", dialect="postgresql"),
RegexSQL(r"CREATE TABLE t .*", dialect="postgresql"),
)
@@ -468,11 +473,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
asserter.assert_(
RegexSQL(
- "select relname from pg_class c join pg_namespace.*",
+ "SELECT pg_catalog.pg_class.relname FROM pg_catalog."
+ "pg_class JOIN pg_catalog.pg_namespace.*",
dialect="postgresql",
),
RegexSQL("DROP TABLE t", dialect="postgresql"),
- RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"),
+ RegexSQL(
+ r"SELECT pg_catalog.pg_type.typname .* WHERE "
+ "pg_catalog.pg_type.typname = ",
+ dialect="postgresql",
+ ),
RegexSQL("DROP TYPE myenum", dialect="postgresql"),
)
@@ -694,23 +704,6 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
connection, "fourfivesixtype"
)
- def test_no_support(self, testing_engine):
- def server_version_info(self):
- return (8, 2)
-
- e = testing_engine()
- dialect = e.dialect
- dialect._get_server_version_info = server_version_info
-
- assert dialect.supports_native_enum
- e.connect()
- assert not dialect.supports_native_enum
-
- # initialize is called again on new pool
- e.dispose()
- e.connect()
- assert not dialect.supports_native_enum
-
def test_reflection(self, metadata, connection):
etype = Enum(
"four", "five", "six", name="fourfivesixtype", metadata=metadata
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index fb4331998..ed9d67612 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -50,6 +50,7 @@ from sqlalchemy.testing import combinations
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
@@ -856,27 +857,15 @@ class AttachedDBTest(fixtures.TestBase):
["foo", "bar"],
)
- eq_(
- [
- d["name"]
- for d in insp.get_columns("nonexistent", schema="test_schema")
- ],
- [],
- )
- eq_(
- [
- d["name"]
- for d in insp.get_columns("another_created", schema=None)
- ],
- [],
- )
- eq_(
- [
- d["name"]
- for d in insp.get_columns("local_only", schema="test_schema")
- ],
- [],
- )
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_columns("nonexistent", schema="test_schema")
+
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_columns("another_created", schema=None)
+
+ with expect_raises(exc.NoSuchTableError):
+ insp.get_columns("local_only", schema="test_schema")
+
eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"])
def test_table_names_present(self):
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index 76099e863..2f6c06ace 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -2,6 +2,7 @@ import unicodedata
import sqlalchemy as sa
from sqlalchemy import Computed
+from sqlalchemy import Connection
from sqlalchemy import DefaultClause
from sqlalchemy import event
from sqlalchemy import FetchedValue
@@ -17,6 +18,7 @@ from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import UniqueConstraint
+from sqlalchemy.engine import Inspector
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
@@ -1254,12 +1256,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
m2 = MetaData()
t2 = Table("x", m2, autoload_with=connection)
- ck = [
+ cks = [
const
for const in t2.constraints
if isinstance(const, sa.CheckConstraint)
- ][0]
-
+ ]
+ eq_(len(cks), 1)
+ ck = cks[0]
eq_regex(ck.sqltext.text, r"[\(`]*q[\)`]* > 10")
eq_(ck.name, "ck1")
@@ -1268,11 +1271,17 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
sa.Index("x_ix", t.c.a, t.c.b)
metadata.create_all(connection)
- def mock_get_columns(self, connection, table_name, **kw):
- return [{"name": "b", "type": Integer, "primary_key": False}]
+ gri = Inspector._get_reflection_info
+
+ def mock_gri(self, *a, **kw):
+ res = gri(self, *a, **kw)
+ res.columns[(None, "x")] = [
+ col for col in res.columns[(None, "x")] if col["name"] == "b"
+ ]
+ return res
with testing.mock.patch.object(
- connection.dialect, "get_columns", mock_get_columns
+ Inspector, "_get_reflection_info", mock_gri
):
m = MetaData()
with testing.expect_warnings(
@@ -1409,38 +1418,49 @@ class CreateDropTest(fixtures.TablesTest):
eq_(ua, ["users", "email_addresses"])
eq_(oi, ["orders", "items"])
- def test_checkfirst(self, connection):
+ def test_checkfirst(self, connection: Connection) -> None:
insp = inspect(connection)
+
users = self.tables.users
is_false(insp.has_table("users"))
users.create(connection)
+ insp.clear_cache()
is_true(insp.has_table("users"))
users.create(connection, checkfirst=True)
users.drop(connection)
users.drop(connection, checkfirst=True)
+ insp.clear_cache()
is_false(insp.has_table("users"))
users.create(connection, checkfirst=True)
users.drop(connection)
- def test_createdrop(self, connection):
+ def test_createdrop(self, connection: Connection) -> None:
insp = inspect(connection)
metadata = self.tables_test_metadata
+ assert metadata is not None
metadata.create_all(connection)
is_true(insp.has_table("items"))
is_true(insp.has_table("email_addresses"))
metadata.create_all(connection)
+ insp.clear_cache()
is_true(insp.has_table("items"))
metadata.drop_all(connection)
+ insp.clear_cache()
is_false(insp.has_table("items"))
is_false(insp.has_table("email_addresses"))
metadata.drop_all(connection)
+ insp.clear_cache()
is_false(insp.has_table("items"))
- def test_tablenames(self, connection):
+ def test_has_table_and_table_names(self, connection):
+ """establish that has_table and get_table_names are consistent w/
+ each other with regard to caching
+
+ """
metadata = self.tables_test_metadata
metadata.create_all(bind=connection)
insp = inspect(connection)
@@ -1448,6 +1468,19 @@ class CreateDropTest(fixtures.TablesTest):
# ensure all tables we created are in the list.
is_true(set(insp.get_table_names()).issuperset(metadata.tables))
+ assert insp.has_table("items")
+ assert "items" in insp.get_table_names()
+
+ self.tables.items.drop(connection)
+
+ # cached
+ assert insp.has_table("items")
+ assert "items" in insp.get_table_names()
+
+ insp = inspect(connection)
+ assert not insp.has_table("items")
+ assert "items" not in insp.get_table_names()
+
class SchemaManipulationTest(fixtures.TestBase):
__backend__ = True
@@ -1602,13 +1635,7 @@ class SchemaTest(fixtures.TestBase):
__backend__ = True
@testing.requires.schemas
- @testing.requires.cross_schema_fk_reflection
def test_has_schema(self):
- if not hasattr(testing.db.dialect, "has_schema"):
- testing.config.skip_test(
- "dialect %s doesn't have a has_schema method"
- % testing.db.dialect.name
- )
with testing.db.connect() as conn:
eq_(
testing.db.dialect.has_schema(
diff --git a/test/perf/many_table_reflection.py b/test/perf/many_table_reflection.py
new file mode 100644
index 000000000..8749df5c2
--- /dev/null
+++ b/test/perf/many_table_reflection.py
@@ -0,0 +1,617 @@
+from argparse import ArgumentDefaultsHelpFormatter
+from argparse import ArgumentParser
+from collections import defaultdict
+from contextlib import contextmanager
+from functools import wraps
+from pprint import pprint
+import random
+import time
+
+import sqlalchemy as sa
+from sqlalchemy.engine import Inspector
+
+types = (sa.Integer, sa.BigInteger, sa.String(200), sa.DateTime)
+USE_CONNECTION = False
+
+
+def generate_table(meta: sa.MetaData, min_cols, max_cols, dialect_name):
+ col_number = random.randint(min_cols, max_cols)
+ table_num = len(meta.tables)
+ add_identity = random.random() > 0.90
+ identity = sa.Identity(
+ always=random.randint(0, 1),
+ start=random.randint(1, 100),
+ increment=random.randint(1, 7),
+ )
+ is_mssql = dialect_name == "mssql"
+ cols = []
+ for i in range(col_number - (0 if is_mssql else add_identity)):
+ args = []
+ if random.random() < 0.95 or table_num == 0:
+ if is_mssql and add_identity and i == 0:
+ args.append(sa.Integer)
+ args.append(identity)
+ else:
+ args.append(random.choice(types))
+ else:
+ args.append(
+ sa.ForeignKey(f"table_{table_num-1}.table_{table_num-1}_col_1")
+ )
+ cols.append(
+ sa.Column(
+ f"table_{table_num}_col_{i+1}",
+ *args,
+ primary_key=i == 0,
+ comment=f"primary key of table_{table_num}"
+ if i == 0
+ else None,
+ index=random.random() > 0.9 and i > 0,
+ unique=random.random() > 0.95 and i > 0,
+ )
+ )
+ if add_identity and not is_mssql:
+ cols.append(
+ sa.Column(
+ f"table_{table_num}_col_{col_number}",
+ sa.Integer,
+ identity,
+ )
+ )
+ args = ()
+ if table_num % 3 == 0:
+ # mysql can't do check constraint on PK col
+ args = (sa.CheckConstraint(cols[1].is_not(None)),)
+ return sa.Table(
+ f"table_{table_num}",
+ meta,
+ *cols,
+ *args,
+ comment=f"comment for table_{table_num}" if table_num % 2 else None,
+ )
+
+
+def generate_meta(schema_name, table_number, min_cols, max_cols, dialect_name):
+ meta = sa.MetaData(schema=schema_name)
+ log = defaultdict(int)
+ for _ in range(table_number):
+ t = generate_table(meta, min_cols, max_cols, dialect_name)
+ log["tables"] += 1
+ log["columns"] += len(t.columns)
+ log["index"] += len(t.indexes)
+ log["check_con"] += len(
+ [c for c in t.constraints if isinstance(c, sa.CheckConstraint)]
+ )
+ log["foreign_keys_con"] += len(
+ [
+ c
+ for c in t.constraints
+ if isinstance(c, sa.ForeignKeyConstraint)
+ ]
+ )
+ log["unique_con"] += len(
+ [c for c in t.constraints if isinstance(c, sa.UniqueConstraint)]
+ )
+ log["identity"] += len([c for c in t.columns if c.identity])
+
+ print("Meta info", dict(log))
+ return meta
+
+
+def log(fn):
+ @wraps(fn)
+ def wrap(*a, **kw):
+ print("Running ", fn.__name__, "...", flush=True, end="")
+ try:
+ r = fn(*a, **kw)
+ except NotImplementedError:
+ print(" [not implemented]", flush=True)
+ r = None
+ else:
+ print("... done", flush=True)
+ return r
+
+ return wrap
+
+
+tests = {}
+
+
+def define_test(fn):
+ name: str = fn.__name__
+ if name.startswith("reflect_"):
+ name = name[8:]
+ tests[name] = wfn = log(fn)
+ return wfn
+
+
+@log
+def create_tables(engine, meta):
+ tables = list(meta.tables.values())
+ for i in range(0, len(tables), 500):
+ meta.create_all(engine, tables[i : i + 500])
+
+
+@log
+def drop_tables(engine, meta, schema_name, table_names: list):
+ tables = list(meta.tables.values())[::-1]
+ for i in range(0, len(tables), 500):
+ meta.drop_all(engine, tables[i : i + 500])
+
+ remaining = sa.inspect(engine).get_table_names(schema=schema_name)
+ suffix = ""
+ if engine.dialect.name.startswith("postgres"):
+ suffix = "CASCADE"
+
+ remaining = sorted(
+ remaining, key=lambda tn: int(tn.partition("_")[2]), reverse=True
+ )
+ with engine.connect() as conn:
+ for i, tn in enumerate(remaining):
+ if engine.dialect.requires_name_normalize:
+ name = engine.dialect.denormalize_name(tn)
+ else:
+ name = tn
+ if schema_name:
+ conn.execute(
+ sa.schema.DDL(
+ f'DROP TABLE {schema_name}."{name}" {suffix}'
+ )
+ )
+ else:
+ conn.execute(sa.schema.DDL(f'DROP TABLE "{name}" {suffix}'))
+ if i % 500 == 0:
+ conn.commit()
+ conn.commit()
+
+
+@log
+def reflect_tables(engine, schema_name):
+ ref_meta = sa.MetaData(schema=schema_name)
+ ref_meta.reflect(engine)
+
+
+def verify_dict(multi, single, str_compare=False):
+ if single is None or multi is None:
+ return
+ if single != multi:
+ keys = set(single) | set(multi)
+ diff = []
+ for key in sorted(keys):
+ se, me = single.get(key), multi.get(key)
+ if str(se) != str(me) if str_compare else se != me:
+ diff.append((key, single.get(key), multi.get(key)))
+ if diff:
+ print("\nfound different result:")
+ pprint(diff)
+
+
+def _single_test(
+ singe_fn_name,
+ multi_fn_name,
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+):
+ single = None
+ if "single" in mode:
+ singe_fn = getattr(Inspector, singe_fn_name)
+
+ def go(bind):
+ insp = sa.inspect(bind)
+ single = {}
+ with timing(singe_fn.__name__):
+ for t in table_names:
+ single[(schema_name, t)] = singe_fn(
+ insp, t, schema=schema_name
+ )
+ return single
+
+ if USE_CONNECTION:
+ with engine.connect() as c:
+ single = go(c)
+ else:
+ single = go(engine)
+
+ multi = None
+ if "multi" in mode:
+ insp = sa.inspect(engine)
+ multi_fn = getattr(Inspector, multi_fn_name)
+ with timing(multi_fn.__name__):
+ multi = multi_fn(insp, schema=schema_name)
+ return (multi, single)
+
+
+@define_test
+def reflect_columns(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ multi, single = _single_test(
+ "get_columns",
+ "get_multi_columns",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single, str_compare=True)
+
+
+@define_test
+def reflect_table_options(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ multi, single = _single_test(
+ "get_table_options",
+ "get_multi_table_options",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single)
+
+
+@define_test
+def reflect_pk(engine, schema_name, table_names, timing, mode, ignore_diff):
+ multi, single = _single_test(
+ "get_pk_constraint",
+ "get_multi_pk_constraint",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single)
+
+
+@define_test
+def reflect_comment(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ multi, single = _single_test(
+ "get_table_comment",
+ "get_multi_table_comment",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single)
+
+
+@define_test
+def reflect_whole_tables(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ single = None
+ meta = sa.MetaData(schema=schema_name)
+
+ if "single" in mode:
+
+ def go(bind):
+ single = {}
+ with timing("Table_autoload_with"):
+ for name in table_names:
+ single[(None, name)] = sa.Table(
+ name, meta, autoload_with=bind
+ )
+ return single
+
+ if USE_CONNECTION:
+ with engine.connect() as c:
+ single = go(c)
+ else:
+ single = go(engine)
+
+ multi_meta = sa.MetaData(schema=schema_name)
+ if "multi" in mode:
+ with timing("MetaData_reflect"):
+ multi_meta.reflect(engine, only=table_names)
+ return (multi_meta, single)
+
+
+@define_test
+def reflect_check_constraints(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ multi, single = _single_test(
+ "get_check_constraints",
+ "get_multi_check_constraints",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single)
+
+
+@define_test
+def reflect_indexes(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ multi, single = _single_test(
+ "get_indexes",
+ "get_multi_indexes",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single)
+
+
+@define_test
+def reflect_foreign_keys(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ multi, single = _single_test(
+ "get_foreign_keys",
+ "get_multi_foreign_keys",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single)
+
+
+@define_test
+def reflect_unique_constraints(
+ engine, schema_name, table_names, timing, mode, ignore_diff
+):
+ multi, single = _single_test(
+ "get_unique_constraints",
+ "get_multi_unique_constraints",
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ )
+ if not ignore_diff:
+ verify_dict(multi, single)
+
+
+def _apply_events(engine):
+ queries = defaultdict(list)
+
+ now = 0
+
+ @sa.event.listens_for(engine, "before_cursor_execute")
+ def before_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+
+ nonlocal now
+ now = time.time()
+
+ @sa.event.listens_for(engine, "after_cursor_execute")
+ def after_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ total = time.time() - now
+
+ if context and context.compiled:
+ statement_str = context.compiled.string
+ else:
+ statement_str = statement
+ queries[statement_str].append(total)
+
+ return queries
+
+
+def _print_query_stats(queries):
+ number_of_queries = sum(
+ len(query_times) for query_times in queries.values()
+ )
+ print("-" * 50)
+ q_list = list(queries.items())
+ q_list.sort(key=lambda rec: -sum(rec[1]))
+ total = sum([sum(t) for _, t in q_list])
+ print(f"total number of queries: {number_of_queries}. Total time {total}")
+ print("-" * 50)
+
+ for stmt, times in q_list:
+ total_t = sum(times)
+ max_t = max(times)
+ min_t = min(times)
+ avg_t = total_t / len(times)
+ times.sort()
+ median_t = times[len(times) // 2]
+
+ print(
+ f"Query times: {total_t=}, {max_t=}, {min_t=}, {avg_t=}, "
+ f"{median_t=} Number of calls: {len(times)}"
+ )
+ print(stmt.strip(), "\n")
+
+
+def main(db, schema_name, table_number, min_cols, max_cols, args):
+ timing = timer()
+ if args.pool_class:
+ engine = sa.create_engine(
+ db, echo=args.echo, poolclass=getattr(sa.pool, args.pool_class)
+ )
+ else:
+ engine = sa.create_engine(db, echo=args.echo)
+
+ if engine.name == "oracle":
+ # clear out oracle caches so that we get the real-world time the
+ # queries would normally take for scripts that aren't run repeatedly
+ with engine.connect() as conn:
+ # https://stackoverflow.com/questions/2147456/how-to-clear-all-cached-items-in-oracle
+ conn.exec_driver_sql("alter system flush buffer_cache")
+ conn.exec_driver_sql("alter system flush shared_pool")
+ if not args.no_create:
+ print(
+ f"Generating {table_number} using engine {engine} in "
+ f"schema {schema_name or 'default'}",
+ )
+ meta = sa.MetaData()
+ table_names = []
+ stats = {}
+ try:
+ if not args.no_create:
+ with timing("populate-meta"):
+ meta = generate_meta(
+ schema_name, table_number, min_cols, max_cols, engine.name
+ )
+ with timing("create-tables"):
+ create_tables(engine, meta)
+
+ with timing("get_table_names"):
+ with engine.connect() as conn:
+ table_names = engine.dialect.get_table_names(
+ conn, schema=schema_name
+ )
+ print(
+ f"Reflected table number {len(table_names)} in "
+ f"schema {schema_name or 'default'}"
+ )
+ mode = {"single", "multi"}
+ if args.multi_only:
+ mode.discard("single")
+ if args.single_only:
+ mode.discard("multi")
+
+ if args.sqlstats:
+ print("starting stats for subsequent tests")
+ stats = _apply_events(engine)
+ for test_name, test_fn in tests.items():
+ if test_name in args.test or "all" in args.test:
+ test_fn(
+ engine,
+ schema_name,
+ table_names,
+ timing,
+ mode,
+ args.ignore_diff,
+ )
+
+ if args.reflect:
+ with timing("reflect-tables"):
+ reflect_tables(engine, schema_name)
+ finally:
+ # copy stats to new dict
+ if args.sqlstats:
+ stats = dict(stats)
+ try:
+ if not args.no_drop:
+ with timing("drop-tables"):
+ drop_tables(engine, meta, schema_name, table_names)
+ finally:
+ pprint(timing.timing, sort_dicts=False)
+ if args.sqlstats:
+ _print_query_stats(stats)
+
+
+def timer():
+ timing = {}
+
+ @contextmanager
+ def track_time(name):
+ s = time.time()
+ yield
+ timing[name] = time.time() - s
+
+ track_time.timing = timing
+ return track_time
+
+
+if __name__ == "__main__":
+ parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
+ parser.add_argument(
+ "--db", help="Database url", default="sqlite:///many-table.db"
+ )
+ parser.add_argument(
+ "--schema-name",
+ help="optional schema name",
+ type=str,
+ default=None,
+ )
+ parser.add_argument(
+ "--table-number",
+ help="Number of table to generate.",
+ type=int,
+ default=250,
+ )
+ parser.add_argument(
+ "--min-cols",
+ help="Min number of column per table.",
+ type=int,
+ default=15,
+ )
+ parser.add_argument(
+ "--max-cols",
+ help="Max number of column per table.",
+ type=int,
+ default=250,
+ )
+ parser.add_argument(
+ "--no-create", help="Do not run create tables", action="store_true"
+ )
+ parser.add_argument(
+ "--no-drop", help="Do not run drop tables", action="store_true"
+ )
+ parser.add_argument("--reflect", help="Run reflect", action="store_true")
+ parser.add_argument(
+ "--test",
+ help="Run these tests. 'all' runs all tests",
+ nargs="+",
+ choices=tuple(tests) + ("all", "none"),
+ default=["all"],
+ )
+ parser.add_argument(
+ "--sqlstats",
+ help="count and time individual queries",
+ action="store_true",
+ )
+ parser.add_argument(
+ "--multi-only", help="Only run multi table tests", action="store_true"
+ )
+ parser.add_argument(
+ "--single-only",
+ help="Only run single table tests",
+ action="store_true",
+ )
+ parser.add_argument(
+ "--echo", action="store_true", help="Enable echo on the engine"
+ )
+ parser.add_argument(
+ "--ignore-diff",
+ action="store_true",
+ help="Ignores differences in the single/multi reflections",
+ )
+ parser.add_argument(
+ "--single-inspect-conn",
+ action="store_true",
+ help="Uses inspect on a connection instead of on the engine when "
+ "using single reflections. Mainly for sqlite.",
+ )
+ parser.add_argument("--pool-class", help="The pool class to use")
+
+ args = parser.parse_args()
+ min_cols = args.min_cols
+ max_cols = args.max_cols
+ USE_CONNECTION = args.single_inspect_conn
+ assert min_cols <= max_cols and min_cols >= 1
+ assert not (args.multi_only and args.single_only)
+ main(
+ args.db, args.schema_name, args.table_number, min_cols, max_cols, args
+ )
diff --git a/test/requirements.py b/test/requirements.py
index bea861a83..db12990a3 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -76,6 +76,18 @@ class DefaultRequirements(SuiteRequirements):
return skip_if(no_support("sqlite", "not supported by database"))
@property
+ def foreign_keys_reflect_as_index(self):
+ return only_on(["mysql", "mariadb"])
+
+ @property
+ def unique_index_reflect_as_unique_constraints(self):
+ return only_on(["mysql", "mariadb"])
+
+ @property
+ def unique_constraints_reflect_as_index(self):
+ return only_on(["mysql", "mariadb", "oracle", "postgresql", "mssql"])
+
+ @property
def foreign_key_constraint_name_reflection(self):
return fails_if(
lambda config: against(config, ["mysql", "mariadb"])
@@ -84,6 +96,10 @@ class DefaultRequirements(SuiteRequirements):
)
@property
+ def reflect_indexes_with_ascdesc(self):
+ return fails_if(["oracle"])
+
+ @property
def table_ddl_if_exists(self):
"""target platform supports IF NOT EXISTS / IF EXISTS for tables."""
@@ -508,6 +524,12 @@ class DefaultRequirements(SuiteRequirements):
return exclusions.open()
@property
+ def schema_create_delete(self):
+ """target database supports schema create and dropped with
+ 'CREATE SCHEMA' and 'DROP SCHEMA'"""
+ return exclusions.skip_if(["sqlite", "oracle"])
+
+ @property
def cross_schema_fk_reflection(self):
"""target system must support reflection of inter-schema foreign
keys"""
@@ -547,11 +569,13 @@ class DefaultRequirements(SuiteRequirements):
@property
def check_constraint_reflection(self):
- return fails_on_everything_except(
- "postgresql",
- "sqlite",
- "oracle",
- self._mysql_and_check_constraints_exist,
+ return only_on(
+ [
+ "postgresql",
+ "sqlite",
+ "oracle",
+ self._mysql_and_check_constraints_exist,
+ ]
)
@property
@@ -562,7 +586,9 @@ class DefaultRequirements(SuiteRequirements):
def temp_table_names(self):
"""target dialect supports listing of temporary table names"""
- return only_on(["sqlite", "oracle"]) + skip_if(self._sqlite_file_db)
+ return only_on(["sqlite", "oracle", "postgresql"]) + skip_if(
+ self._sqlite_file_db
+ )
@property
def temporary_views(self):
@@ -792,8 +818,7 @@ class DefaultRequirements(SuiteRequirements):
@property
def views(self):
"""Target database must support VIEWs."""
-
- return skip_if("drizzle", "no VIEW support")
+ return exclusions.open()
@property
def empty_strings_varchar(self):
@@ -1331,14 +1356,28 @@ class DefaultRequirements(SuiteRequirements):
)
)
+ def _has_oracle_test_dblink(self, key):
+ def check(config):
+ assert config.db.dialect.name == "oracle"
+ name = config.file_config.get("sqla_testing", key)
+ if not name:
+ return False
+ with config.db.connect() as conn:
+ links = config.db.dialect._list_dblinks(conn)
+ return config.db.dialect.normalize_name(name) in links
+
+ return only_on(["oracle"]) + only_if(
+ check,
+ f"{key} option not specified in config or dblink not found in db",
+ )
+
@property
def oracle_test_dblink(self):
- return skip_if(
- lambda config: not config.file_config.has_option(
- "sqla_testing", "oracle_db_link"
- ),
- "oracle_db_link option not specified in config",
- )
+ return self._has_oracle_test_dblink("oracle_db_link")
+
+ @property
+ def oracle_test_dblink2(self):
+ return self._has_oracle_test_dblink("oracle_db_link2")
@property
def postgresql_test_dblink(self):
@@ -1774,6 +1813,19 @@ class DefaultRequirements(SuiteRequirements):
return only_on(["mssql"]) + only_if(check)
@property
+ def reflect_table_options(self):
+ return only_on(["mysql", "mariadb", "oracle"])
+
+ @property
+ def materialized_views(self):
+ """Target database must support MATERIALIZED VIEWs."""
+ return only_on(["postgresql", "oracle"])
+
+ @property
+ def materialized_views_reflect_pk(self):
+ return only_on(["oracle"])
+
+ @property
def uuid_data_type(self):
"""Return databases that support the UUID datatype."""
return only_on(("postgresql >= 8.3", "mariadb >= 10.7.0"))