diff options
| author | Federico Caselli <cfederico87@gmail.com> | 2022-06-18 21:08:27 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci3.zzzcomputing.com> | 2022-06-18 21:08:27 +0000 |
| commit | be576e7d88b6038781e52f7ef79799dbad09cd54 (patch) | |
| tree | 772c368e107d13537ad8fb030b2b02fb1638169b /test | |
| parent | f7daad21ef66c29aecfbdb2b967641d0adad8779 (diff) | |
| parent | db08a699489c9b0259579d7ff7fd6bf3496ca3a2 (diff) | |
| download | sqlalchemy-be576e7d88b6038781e52f7ef79799dbad09cd54.tar.gz | |
Merge "rearchitect reflection for batched performance" into main
Diffstat (limited to 'test')
| -rw-r--r-- | test/dialect/mysql/test_reflection.py | 18 | ||||
| -rw-r--r-- | test/dialect/oracle/test_reflection.py | 554 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 20 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_reflection.py | 441 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_types.py | 35 | ||||
| -rw-r--r-- | test/dialect/test_sqlite.py | 31 | ||||
| -rw-r--r-- | test/engine/test_reflection.py | 57 | ||||
| -rw-r--r-- | test/perf/many_table_reflection.py | 617 | ||||
| -rw-r--r-- | test/requirements.py | 80 |
9 files changed, 1658 insertions, 195 deletions
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py index f414c9c37..846001347 100644 --- a/test/dialect/mysql/test_reflection.py +++ b/test/dialect/mysql/test_reflection.py @@ -33,9 +33,9 @@ from sqlalchemy import UniqueConstraint from sqlalchemy.dialects.mysql import base as mysql from sqlalchemy.dialects.mysql import reflection as _reflection from sqlalchemy.schema import CreateIndex -from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ @@ -558,6 +558,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): ) def test_skip_not_describable(self, metadata, connection): + """This test is the only one that test the _default_multi_reflect + behaviour with UnreflectableTableError + """ + @event.listens_for(metadata, "before_drop") def cleanup(*arg, **kw): with testing.db.begin() as conn: @@ -579,14 +583,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): m.reflect(views=True, bind=conn) eq_(m.tables["test_t2"].name, "test_t2") - assert_raises_message( - exc.UnreflectableTableError, - "references invalid table", - Table, - "test_v", - MetaData(), - autoload_with=conn, - ) + with expect_raises_message( + exc.UnreflectableTableError, "references invalid table" + ): + Table("test_v", MetaData(), autoload_with=conn) @testing.exclude("mysql", "<", (5, 0, 0), "no information_schema support") def test_system_views(self): diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py index bf76dca43..53eb94df3 100644 --- a/test/dialect/oracle/test_reflection.py +++ b/test/dialect/oracle/test_reflection.py @@ -27,14 +27,18 @@ from sqlalchemy.dialects.oracle.base import BINARY_FLOAT from sqlalchemy.dialects.oracle.base import DOUBLE_PRECISION from sqlalchemy.dialects.oracle.base import NUMBER from sqlalchemy.dialects.oracle.base import REAL +from sqlalchemy.engine import ObjectKind from sqlalchemy.testing import assert_warns from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing import config from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_true from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import eq_compile_type from sqlalchemy.testing.schema import Table @@ -384,6 +388,7 @@ class SystemTableTablenamesTest(fixtures.TestBase): __backend__ = True def setup_test(self): + with testing.db.begin() as conn: conn.exec_driver_sql("create table my_table (id integer)") conn.exec_driver_sql( @@ -417,6 +422,14 @@ class SystemTableTablenamesTest(fixtures.TestBase): set(["my_table", "foo_table"]), ) + def test_reflect_system_table(self): + meta = MetaData() + t = Table("foo_table", meta, autoload_with=testing.db) + assert t.columns.keys() == ["id"] + + t = Table("my_temp_table", meta, autoload_with=testing.db) + assert t.columns.keys() == ["id"] + class DontReflectIOTTest(fixtures.TestBase): """test that index overflow tables aren't included in @@ -509,6 +522,228 @@ class TableReflectionTest(fixtures.TestBase): tbl = Table("test_compress", m2, autoload_with=connection) assert tbl.dialect_options["oracle"]["compress"] == "OLTP" + def test_reflect_hidden_column(self): + with testing.db.begin() as conn: + conn.exec_driver_sql( + "CREATE TABLE my_table(id integer, hide integer INVISIBLE)" + ) + + try: + insp = inspect(conn) + cols = insp.get_columns("my_table") + assert len(cols) == 1 + assert cols[0]["name"] == "id" + finally: + conn.exec_driver_sql("DROP TABLE my_table") + + +class ViewReflectionTest(fixtures.TestBase): + __only_on__ = "oracle" + __backend__ = True + + @classmethod + def setup_test_class(cls): + sql = """ + CREATE TABLE tbl ( + id INTEGER PRIMARY KEY, + data INTEGER + ); + + CREATE VIEW tbl_plain_v AS + SELECT id, data FROM tbl WHERE id > 100; + + -- comments on plain views are created with "comment on table" + -- because why not.. + COMMENT ON TABLE tbl_plain_v IS 'view comment'; + + CREATE MATERIALIZED VIEW tbl_v AS + SELECT id, data FROM tbl WHERE id > 42; + + COMMENT ON MATERIALIZED VIEW tbl_v IS 'my mat view comment'; + + CREATE MATERIALIZED VIEW tbl_v2 AS + SELECT id, data FROM tbl WHERE id < 42; + + COMMENT ON MATERIALIZED VIEW tbl_v2 IS 'my other mat view comment'; + + CREATE SYNONYM view_syn FOR tbl_plain_v; + CREATE SYNONYM %(test_schema)s.ts_v_s FOR tbl_plain_v; + + CREATE VIEW %(test_schema)s.schema_view AS + SELECT 1 AS value FROM dual; + + COMMENT ON TABLE %(test_schema)s.schema_view IS 'schema view comment'; + CREATE SYNONYM syn_schema_view FOR %(test_schema)s.schema_view; + """ + if testing.requires.oracle_test_dblink.enabled: + cls.dblink = config.file_config.get( + "sqla_testing", "oracle_db_link" + ) + sql += """ + CREATE SYNONYM syn_link FOR tbl_plain_v@%(link)s; + """ % { + "link": cls.dblink + } + with testing.db.begin() as conn: + for stmt in ( + sql % {"test_schema": testing.config.test_schema} + ).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + @classmethod + def teardown_test_class(cls): + sql = """ + DROP MATERIALIZED VIEW tbl_v; + DROP MATERIALIZED VIEW tbl_v2; + DROP VIEW tbl_plain_v; + DROP TABLE tbl; + DROP VIEW %(test_schema)s.schema_view; + DROP SYNONYM view_syn; + DROP SYNONYM %(test_schema)s.ts_v_s; + DROP SYNONYM syn_schema_view; + """ + if testing.requires.oracle_test_dblink.enabled: + sql += """ + DROP SYNONYM syn_link; + """ + with testing.db.begin() as conn: + for stmt in ( + sql % {"test_schema": testing.config.test_schema} + ).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + def test_get_names(self, connection): + insp = inspect(connection) + eq_(insp.get_table_names(), ["tbl"]) + eq_(insp.get_view_names(), ["tbl_plain_v"]) + eq_(insp.get_materialized_view_names(), ["tbl_v", "tbl_v2"]) + eq_( + insp.get_view_names(schema=testing.config.test_schema), + ["schema_view"], + ) + + def test_get_table_comment_on_view(self, connection): + insp = inspect(connection) + eq_(insp.get_table_comment("tbl_v"), {"text": "my mat view comment"}) + eq_(insp.get_table_comment("tbl_plain_v"), {"text": "view comment"}) + + def test_get_multi_view_comment(self, connection): + insp = inspect(connection) + plain = {(None, "tbl_plain_v"): {"text": "view comment"}} + mat = { + (None, "tbl_v"): {"text": "my mat view comment"}, + (None, "tbl_v2"): {"text": "my other mat view comment"}, + } + eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW), + mat, + ) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW), + {**plain, **mat}, + ) + ts = testing.config.test_schema + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW, schema=ts), + {(ts, "schema_view"): {"text": "schema view comment"}}, + ) + eq_(insp.get_multi_table_comment(), {(None, "tbl"): {"text": None}}) + + def test_get_table_comment_synonym(self, connection): + insp = inspect(connection) + eq_( + insp.get_table_comment("view_syn", oracle_resolve_synonyms=True), + {"text": "view comment"}, + ) + eq_( + insp.get_table_comment( + "syn_schema_view", oracle_resolve_synonyms=True + ), + {"text": "schema view comment"}, + ) + eq_( + insp.get_table_comment( + "ts_v_s", + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ), + {"text": "view comment"}, + ) + + def test_get_multi_view_comment_synonym(self, connection): + insp = inspect(connection) + exp = { + (None, "view_syn"): {"text": "view comment"}, + (None, "syn_schema_view"): {"text": "schema view comment"}, + } + if testing.requires.oracle_test_dblink.enabled: + exp[(None, "syn_link")] = {"text": "view comment"} + eq_( + insp.get_multi_table_comment( + oracle_resolve_synonyms=True, kind=ObjectKind.ANY_VIEW + ), + exp, + ) + ts = testing.config.test_schema + eq_( + insp.get_multi_table_comment( + oracle_resolve_synonyms=True, + schema=ts, + kind=ObjectKind.ANY_VIEW, + ), + {(ts, "ts_v_s"): {"text": "view comment"}}, + ) + + def test_get_view_definition(self, connection): + insp = inspect(connection) + eq_( + insp.get_view_definition("tbl_plain_v"), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_v"), + "SELECT id, data FROM tbl WHERE id > 42", + ) + with expect_raises(exc.NoSuchTableError): + eq_(insp.get_view_definition("view_syn"), None) + eq_( + insp.get_view_definition("view_syn", oracle_resolve_synonyms=True), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition( + "syn_schema_view", oracle_resolve_synonyms=True + ), + "SELECT 1 AS value FROM dual", + ) + eq_( + insp.get_view_definition( + "ts_v_s", + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ), + "SELECT id, data FROM tbl WHERE id > 100", + ) + + @testing.requires.oracle_test_dblink + def test_get_view_definition_dblink(self, connection): + insp = inspect(connection) + eq_( + insp.get_view_definition("syn_link", oracle_resolve_synonyms=True), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_plain_v", dblink=self.dblink), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_v", dblink=self.dblink), + "SELECT id, data FROM tbl WHERE id > 42", + ) + class RoundTripIndexTest(fixtures.TestBase): __only_on__ = "oracle" @@ -722,8 +957,6 @@ class DBLinkReflectionTest(fixtures.TestBase): @classmethod def setup_test_class(cls): - from sqlalchemy.testing import config - cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link") # note that the synonym here is still not totally functional @@ -863,3 +1096,320 @@ class IdentityReflectionTest(fixtures.TablesTest): exp = common.copy() exp["order"] = True eq_(col["identity"], exp) + + +class AdditionalReflectionTests(fixtures.TestBase): + __only_on__ = "oracle" + __backend__ = True + + @classmethod + def setup_test_class(cls): + # currently assuming full DBA privs for the user. + # don't really know how else to go here unless + # we connect as the other user. + + sql = """ +CREATE TABLE %(schema)sparent( + id INTEGER, + data VARCHAR2(50), + CONSTRAINT parent_pk_%(schema_id)s PRIMARY KEY (id) +); +CREATE TABLE %(schema)smy_table( + id INTEGER, + name VARCHAR2(125), + related INTEGER, + data%(schema_id)s NUMBER NOT NULL, + CONSTRAINT my_table_pk_%(schema_id)s PRIMARY KEY (id), + CONSTRAINT my_table_fk_%(schema_id)s FOREIGN KEY(related) + REFERENCES %(schema)sparent(id), + CONSTRAINT my_table_check_%(schema_id)s CHECK (data%(schema_id)s > 42), + CONSTRAINT data_unique%(schema_id)s UNIQUE (data%(schema_id)s) +); +CREATE INDEX my_table_index_%(schema_id)s on %(schema)smy_table (id, name); +COMMENT ON TABLE %(schema)smy_table IS 'my table comment %(schema_id)s'; +COMMENT ON COLUMN %(schema)smy_table.name IS +'my table.name comment %(schema_id)s'; +""" + + with testing.db.begin() as conn: + for schema in ("", testing.config.test_schema): + dd = { + "schema": f"{schema}." if schema else "", + "schema_id": "sch" if schema else "", + } + for stmt in (sql % dd).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + @classmethod + def teardown_test_class(cls): + sql = """ +drop table %(schema)smy_table; +drop table %(schema)sparent; +""" + with testing.db.begin() as conn: + for schema in ("", testing.config.test_schema): + dd = {"schema": f"{schema}." if schema else ""} + for stmt in (sql % dd).split(";"): + if stmt.strip(): + try: + conn.exec_driver_sql(stmt) + except: + pass + + def setup_test(self): + self.dblink = config.file_config.get("sqla_testing", "oracle_db_link") + self.dblink2 = config.file_config.get( + "sqla_testing", "oracle_db_link2" + ) + self.columns = {} + self.indexes = {} + self.primary_keys = {} + self.comments = {} + self.uniques = {} + self.checks = {} + self.foreign_keys = {} + self.options = {} + self.allDicts = [ + self.columns, + self.indexes, + self.primary_keys, + self.comments, + self.uniques, + self.checks, + self.foreign_keys, + self.options, + ] + for schema in (None, testing.config.test_schema): + suffix = "sch" if schema else "" + + self.columns[schema] = { + (schema, "my_table"): [ + { + "name": "id", + "nullable": False, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": "name", + "nullable": True, + "type": eq_compile_type("VARCHAR(125)"), + "default": None, + "comment": f"my table.name comment {suffix}", + }, + { + "name": "related", + "nullable": True, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": f"data{suffix}", + "nullable": False, + "type": eq_compile_type("NUMBER"), + "default": None, + "comment": None, + }, + ], + (schema, "parent"): [ + { + "name": "id", + "nullable": False, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": "data", + "nullable": True, + "type": eq_compile_type("VARCHAR(50)"), + "default": None, + "comment": None, + }, + ], + } + self.indexes[schema] = { + (schema, "my_table"): [ + { + "name": f"data_unique{suffix}", + "column_names": [f"data{suffix}"], + "dialect_options": {}, + "unique": True, + }, + { + "name": f"my_table_index_{suffix}", + "column_names": ["id", "name"], + "dialect_options": {}, + "unique": False, + }, + ], + (schema, "parent"): [], + } + self.primary_keys[schema] = { + (schema, "my_table"): { + "name": f"my_table_pk_{suffix}", + "constrained_columns": ["id"], + }, + (schema, "parent"): { + "name": f"parent_pk_{suffix}", + "constrained_columns": ["id"], + }, + } + self.comments[schema] = { + (schema, "my_table"): {"text": f"my table comment {suffix}"}, + (schema, "parent"): {"text": None}, + } + self.foreign_keys[schema] = { + (schema, "my_table"): [ + { + "name": f"my_table_fk_{suffix}", + "constrained_columns": ["related"], + "referred_schema": schema, + "referred_table": "parent", + "referred_columns": ["id"], + "options": {}, + } + ], + (schema, "parent"): [], + } + self.checks[schema] = { + (schema, "my_table"): [ + { + "name": f"my_table_check_{suffix}", + "sqltext": f"data{suffix} > 42", + } + ], + (schema, "parent"): [], + } + self.uniques[schema] = { + (schema, "my_table"): [ + { + "name": f"data_unique{suffix}", + "column_names": [f"data{suffix}"], + "duplicates_index": f"data_unique{suffix}", + } + ], + (schema, "parent"): [], + } + self.options[schema] = { + (schema, "my_table"): {}, + (schema, "parent"): {}, + } + + def test_tables(self, connection): + insp = inspect(connection) + + eq_(sorted(insp.get_table_names()), ["my_table", "parent"]) + + def _check_reflection(self, conn, schema, res_schema=False, **kw): + if res_schema is False: + res_schema = schema + insp = inspect(conn) + eq_( + insp.get_multi_columns(schema=schema, **kw), + self.columns[res_schema], + ) + eq_( + insp.get_multi_indexes(schema=schema, **kw), + self.indexes[res_schema], + ) + eq_( + insp.get_multi_pk_constraint(schema=schema, **kw), + self.primary_keys[res_schema], + ) + eq_( + insp.get_multi_table_comment(schema=schema, **kw), + self.comments[res_schema], + ) + eq_( + insp.get_multi_foreign_keys(schema=schema, **kw), + self.foreign_keys[res_schema], + ) + eq_( + insp.get_multi_check_constraints(schema=schema, **kw), + self.checks[res_schema], + ) + eq_( + insp.get_multi_unique_constraints(schema=schema, **kw), + self.uniques[res_schema], + ) + eq_( + insp.get_multi_table_options(schema=schema, **kw), + self.options[res_schema], + ) + + @testing.combinations(True, False, argnames="schema") + def test_schema_translate_map(self, connection, schema): + schema = testing.config.test_schema if schema else None + c = connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } + ) + self._check_reflection(c, schema) + + @testing.requires.oracle_test_dblink + def test_db_link(self, connection): + self._check_reflection(connection, schema=None, dblink=self.dblink) + self._check_reflection( + connection, + schema=testing.config.test_schema, + dblink=self.dblink, + ) + + def test_no_synonyms(self, connection): + # oracle_resolve_synonyms is ignored if there are no matching synonym + self._check_reflection( + connection, schema=None, oracle_resolve_synonyms=True + ) + connection.exec_driver_sql("CREATE SYNONYM tmp FOR parent") + for dict_ in self.allDicts: + dict_["tmp"] = {(None, "parent"): dict_[None][(None, "parent")]} + try: + self._check_reflection( + connection, + schema=None, + res_schema="tmp", + oracle_resolve_synonyms=True, + filter_names=["parent"], + ) + finally: + connection.exec_driver_sql("DROP SYNONYM tmp") + + @testing.requires.oracle_test_dblink + @testing.requires.oracle_test_dblink2 + def test_multi_dblink_synonyms(self, connection): + # oracle_resolve_synonyms handles multiple dblink at once + connection.exec_driver_sql( + f"CREATE SYNONYM s1 FOR my_table@{self.dblink}" + ) + connection.exec_driver_sql( + f"CREATE SYNONYM s2 FOR {testing.config.test_schema}." + f"my_table@{self.dblink2}" + ) + connection.exec_driver_sql("CREATE SYNONYM s3 FOR parent") + for dict_ in self.allDicts: + dict_["tmp"] = { + (None, "s1"): dict_[None][(None, "my_table")], + (None, "s2"): dict_[testing.config.test_schema][ + (testing.config.test_schema, "my_table") + ], + (None, "s3"): dict_[None][(None, "parent")], + } + fk = self.foreign_keys["tmp"][(None, "s1")][0] + fk["referred_table"] = "s3" + try: + self._check_reflection( + connection, + schema=None, + res_schema="tmp", + oracle_resolve_synonyms=True, + ) + finally: + connection.exec_driver_sql("DROP SYNONYM s1") + connection.exec_driver_sql("DROP SYNONYM s2") + connection.exec_driver_sql("DROP SYNONYM s3") diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 0093eb5ba..d55aa8203 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -887,19 +887,12 @@ class MiscBackendTest( ) @testing.combinations( - ((8, 1), False, False), - ((8, 1), None, False), - ((11, 5), True, False), - ((11, 5), False, True), + (True, False), + (False, True), ) - def test_backslash_escapes_detection( - self, version, explicit_setting, expected - ): + def test_backslash_escapes_detection(self, explicit_setting, expected): engine = engines.testing_engine() - def _server_version(conn): - return version - if explicit_setting is not None: @event.listens_for(engine, "connect", insert=True) @@ -912,11 +905,8 @@ class MiscBackendTest( ) dbapi_connection.commit() - with mock.patch.object( - engine.dialect, "_get_server_version_info", _server_version - ): - with engine.connect(): - eq_(engine.dialect._backslash_escapes, expected) + with engine.connect(): + eq_(engine.dialect._backslash_escapes, expected) def test_dbapi_autocommit_attribute(self): """all the supported DBAPIs have an .autocommit attribute. make diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index 3e0569d32..21b4149bc 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -27,17 +27,21 @@ from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import ExcludeConstraint from sqlalchemy.dialects.postgresql import INTEGER from sqlalchemy.dialects.postgresql import INTERVAL +from sqlalchemy.dialects.postgresql import pg_catalog from sqlalchemy.dialects.postgresql import TSRANGE +from sqlalchemy.engine import ObjectKind +from sqlalchemy.engine import ObjectScope from sqlalchemy.schema import CreateIndex from sqlalchemy.sql.schema import CheckConstraint from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import fixtures from sqlalchemy.testing import mock -from sqlalchemy.testing.assertions import assert_raises from sqlalchemy.testing.assertions import assert_warns from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import eq_ +from sqlalchemy.testing.assertions import expect_raises from sqlalchemy.testing.assertions import is_ +from sqlalchemy.testing.assertions import is_false from sqlalchemy.testing.assertions import is_true @@ -231,17 +235,36 @@ class MaterializedViewReflectionTest( connection.execute(target.insert(), {"id": 89, "data": "d1"}) materialized_view = sa.DDL( - "CREATE MATERIALIZED VIEW test_mview AS " "SELECT * FROM testtable" + "CREATE MATERIALIZED VIEW test_mview AS SELECT * FROM testtable" ) plain_view = sa.DDL( - "CREATE VIEW test_regview AS " "SELECT * FROM testtable" + "CREATE VIEW test_regview AS SELECT data FROM testtable" ) sa.event.listen(testtable, "after_create", plain_view) sa.event.listen(testtable, "after_create", materialized_view) sa.event.listen( testtable, + "after_create", + sa.DDL("COMMENT ON VIEW test_regview IS 'regular view comment'"), + ) + sa.event.listen( + testtable, + "after_create", + sa.DDL( + "COMMENT ON MATERIALIZED VIEW test_mview " + "IS 'materialized view comment'" + ), + ) + sa.event.listen( + testtable, + "after_create", + sa.DDL("CREATE INDEX mat_index ON test_mview(data DESC)"), + ) + + sa.event.listen( + testtable, "before_drop", sa.DDL("DROP MATERIALIZED VIEW test_mview"), ) @@ -249,6 +272,12 @@ class MaterializedViewReflectionTest( testtable, "before_drop", sa.DDL("DROP VIEW test_regview") ) + def test_has_type(self, connection): + insp = inspect(connection) + is_true(insp.has_type("test_mview")) + is_true(insp.has_type("test_regview")) + is_true(insp.has_type("testtable")) + def test_mview_is_reflected(self, connection): metadata = MetaData() table = Table("test_mview", metadata, autoload_with=connection) @@ -265,49 +294,99 @@ class MaterializedViewReflectionTest( def test_get_view_names(self, inspect_fixture): insp, conn = inspect_fixture - eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) + eq_(set(insp.get_view_names()), set(["test_regview"])) - def test_get_view_names_plain(self, connection): + def test_get_materialized_view_names(self, inspect_fixture): + insp, conn = inspect_fixture + eq_(set(insp.get_materialized_view_names()), set(["test_mview"])) + + def test_get_view_names_reflection_cache_ok(self, connection): insp = inspect(connection) + eq_(set(insp.get_view_names()), set(["test_regview"])) eq_( - set(insp.get_view_names(include=("plain",))), set(["test_regview"]) + set(insp.get_materialized_view_names()), + set(["test_mview"]), + ) + eq_( + set(insp.get_view_names()).union( + insp.get_materialized_view_names() + ), + set(["test_regview", "test_mview"]), ) - def test_get_view_names_plain_string(self, connection): + def test_get_view_definition(self, connection): insp = inspect(connection) - eq_(set(insp.get_view_names(include="plain")), set(["test_regview"])) - def test_get_view_names_materialized(self, connection): - insp = inspect(connection) + def normalize(definition): + return re.sub(r"[\n\t ]+", " ", definition.strip()) + eq_( - set(insp.get_view_names(include=("materialized",))), - set(["test_mview"]), + normalize(insp.get_view_definition("test_mview")), + "SELECT testtable.id, testtable.data FROM testtable;", + ) + eq_( + normalize(insp.get_view_definition("test_regview")), + "SELECT testtable.data FROM testtable;", ) - def test_get_view_names_reflection_cache_ok(self, connection): + def test_get_view_comment(self, connection): insp = inspect(connection) eq_( - set(insp.get_view_names(include=("plain",))), set(["test_regview"]) + insp.get_table_comment("test_regview"), + {"text": "regular view comment"}, ) eq_( - set(insp.get_view_names(include=("materialized",))), - set(["test_mview"]), + insp.get_table_comment("test_mview"), + {"text": "materialized view comment"}, ) - eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) - def test_get_view_names_empty(self, connection): + def test_get_multi_view_comment(self, connection): insp = inspect(connection) - assert_raises(ValueError, insp.get_view_names, include=()) + eq_( + insp.get_multi_table_comment(), + {(None, "testtable"): {"text": None}}, + ) + plain = {(None, "test_regview"): {"text": "regular view comment"}} + mat = {(None, "test_mview"): {"text": "materialized view comment"}} + eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW), + mat, + ) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW), + {**plain, **mat}, + ) + eq_( + insp.get_multi_table_comment( + kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY + ), + {}, + ) - def test_get_view_definition(self, connection): + def test_get_multi_view_indexes(self, connection): insp = inspect(connection) + eq_(insp.get_multi_indexes(), {(None, "testtable"): []}) + + exp = { + "name": "mat_index", + "unique": False, + "column_names": ["data"], + "column_sorting": {"data": ("desc",)}, + } + if connection.dialect.server_version_info >= (11, 0): + exp["include_columns"] = [] + exp["dialect_options"] = {"postgresql_include": []} + plain = {(None, "test_regview"): []} + mat = {(None, "test_mview"): [exp]} + eq_(insp.get_multi_indexes(kind=ObjectKind.VIEW), plain) + eq_(insp.get_multi_indexes(kind=ObjectKind.MATERIALIZED_VIEW), mat) + eq_(insp.get_multi_indexes(kind=ObjectKind.ANY_VIEW), {**plain, **mat}) eq_( - re.sub( - r"[\n\t ]+", - " ", - insp.get_view_definition("test_mview").strip(), + insp.get_multi_indexes( + kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY ), - "SELECT testtable.id, testtable.data FROM testtable;", + {}, ) @@ -993,9 +1072,9 @@ class ReflectionTest( go, [ "Skipped unsupported reflection of " - "expression-based index idx1", + "expression-based index idx1 of table party", "Skipped unsupported reflection of " - "expression-based index idx3", + "expression-based index idx3 of table party", ], ) @@ -1016,7 +1095,7 @@ class ReflectionTest( metadata.create_all(connection) - ind = connection.dialect.get_indexes(connection, t1, None) + ind = connection.dialect.get_indexes(connection, t1.name, None) partial_definitions = [] for ix in ind: @@ -1337,6 +1416,9 @@ class ReflectionTest( } ], ) + is_true(inspector.has_type("mood", "test_schema")) + is_true(inspector.has_type("mood", "*")) + is_false(inspector.has_type("mood")) def test_inspect_enums(self, metadata, inspect_fixture): @@ -1345,30 +1427,49 @@ class ReflectionTest( enum_type = postgresql.ENUM( "cat", "dog", "rat", name="pet", metadata=metadata ) + enum_type.create(conn) + conn.commit() - with conn.begin(): - enum_type.create(conn) - - eq_( - inspector.get_enums(), - [ - { - "visible": True, - "labels": ["cat", "dog", "rat"], - "name": "pet", - "schema": "public", - } - ], - ) - - def test_get_table_oid(self, metadata, inspect_fixture): - - inspector, conn = inspect_fixture + res = [ + { + "visible": True, + "labels": ["cat", "dog", "rat"], + "name": "pet", + "schema": "public", + } + ] + eq_(inspector.get_enums(), res) + is_true(inspector.has_type("pet", "*")) + is_true(inspector.has_type("pet")) + is_false(inspector.has_type("pet", "test_schema")) + + enum_type.drop(conn) + conn.commit() + eq_(inspector.get_enums(), res) + is_true(inspector.has_type("pet")) + inspector.clear_cache() + eq_(inspector.get_enums(), []) + is_false(inspector.has_type("pet")) + + def test_get_table_oid(self, metadata, connection): + Table("t1", metadata, Column("col", Integer)) + Table("t1", metadata, Column("col", Integer), schema="test_schema") + metadata.create_all(connection) + insp = inspect(connection) + oid = insp.get_table_oid("t1") + oid_schema = insp.get_table_oid("t1", schema="test_schema") + is_true(isinstance(oid, int)) + is_true(isinstance(oid_schema, int)) + is_true(oid != oid_schema) - with conn.begin(): - Table("some_table", metadata, Column("q", Integer)).create(conn) + with expect_raises(exc.NoSuchTableError): + insp.get_table_oid("does_not_exist") - assert inspector.get_table_oid("some_table") is not None + metadata.tables["t1"].drop(connection) + eq_(insp.get_table_oid("t1"), oid) + insp.clear_cache() + with expect_raises(exc.NoSuchTableError): + insp.get_table_oid("t1") def test_inspect_enums_case_sensitive(self, metadata, connection): sa.event.listen( @@ -1707,77 +1808,146 @@ class ReflectionTest( ) def test_reflect_check_warning(self): - rows = [("some name", "NOTCHECK foobar")] + rows = [("foo", "some name", "NOTCHECK foobar")] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 + with testing.expect_warnings( + "Could not parse CHECK constraint text: 'NOTCHECK foobar'" ): - with testing.expect_warnings( - "Could not parse CHECK constraint text: 'NOTCHECK foobar'" - ): - testing.db.dialect.get_check_constraints(conn, "foo") + testing.db.dialect.get_check_constraints(conn, "foo") def test_reflect_extra_newlines(self): rows = [ - ("some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"), - ("some other name", "CHECK ((b\nIS\nNOT\nNULL))"), - ("some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"), - ("some name", "CHECK (c != 'hi\nim a name\n')"), + ("foo", "some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"), + ("foo", "some other name", "CHECK ((b\nIS\nNOT\nNULL))"), + ("foo", "some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"), + ("foo", "some name", "CHECK (c != 'hi\nim a name\n')"), ] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 - ): - check_constraints = testing.db.dialect.get_check_constraints( - conn, "foo" - ) - eq_( - check_constraints, - [ - { - "name": "some name", - "sqltext": "a \nIS\n NOT\n\n NULL\n", - }, - {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"}, - { - "name": "some CRLF name", - "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL", - }, - {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"}, - ], - ) + check_constraints = testing.db.dialect.get_check_constraints( + conn, "foo" + ) + eq_( + check_constraints, + [ + { + "name": "some name", + "sqltext": "a \nIS\n NOT\n\n NULL\n", + }, + {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"}, + { + "name": "some CRLF name", + "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL", + }, + {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"}, + ], + ) def test_reflect_with_not_valid_check_constraint(self): - rows = [("some name", "CHECK ((a IS NOT NULL)) NOT VALID")] + rows = [("foo", "some name", "CHECK ((a IS NOT NULL)) NOT VALID")] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 - ): - check_constraints = testing.db.dialect.get_check_constraints( - conn, "foo" + check_constraints = testing.db.dialect.get_check_constraints( + conn, "foo" + ) + eq_( + check_constraints, + [ + { + "name": "some name", + "sqltext": "a IS NOT NULL", + "dialect_options": {"not_valid": True}, + } + ], + ) + + def _apply_stm(self, connection, use_map): + if use_map: + return connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } ) - eq_( - check_constraints, - [ - { - "name": "some name", - "sqltext": "a IS NOT NULL", - "dialect_options": {"not_valid": True}, - } - ], + else: + return connection + + @testing.combinations(True, False, argnames="use_map") + @testing.combinations(True, False, argnames="schema") + def test_schema_translate_map(self, metadata, connection, use_map, schema): + schema = testing.config.test_schema if schema else None + Table( + "foo", + metadata, + Column("id", Integer, primary_key=True), + Column("a", Integer, index=True), + Column( + "b", + ForeignKey(f"{schema}.foo.id" if schema else "foo.id"), + unique=True, + ), + CheckConstraint("a>10", name="foo_check"), + comment="comm", + schema=schema, + ) + metadata.create_all(connection) + if use_map: + connection = connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } ) + insp = inspect(connection) + eq_( + [c["name"] for c in insp.get_columns("foo", schema=schema)], + ["id", "a", "b"], + ) + eq_( + [ + i["column_names"] + for i in insp.get_indexes("foo", schema=schema) + ], + [["b"], ["a"]], + ) + eq_( + insp.get_pk_constraint("foo", schema=schema)[ + "constrained_columns" + ], + ["id"], + ) + eq_(insp.get_table_comment("foo", schema=schema), {"text": "comm"}) + eq_( + [ + f["constrained_columns"] + for f in insp.get_foreign_keys("foo", schema=schema) + ], + [["b"]], + ) + eq_( + [ + c["name"] + for c in insp.get_check_constraints("foo", schema=schema) + ], + ["foo_check"], + ) + eq_( + [ + u["column_names"] + for u in insp.get_unique_constraints("foo", schema=schema) + ], + [["b"]], + ) class CustomTypeReflectionTest(fixtures.TestBase): @@ -1804,9 +1974,23 @@ class CustomTypeReflectionTest(fixtures.TestBase): ("my_custom_type(ARG1)", ("ARG1", None)), ("my_custom_type(ARG1, ARG2)", ("ARG1", "ARG2")), ]: - column_info = dialect._get_column_info( - "colname", sch, None, False, {}, {}, "public", None, "", None + row_dict = { + "name": "colname", + "table_name": "tblname", + "format_type": sch, + "default": None, + "not_null": False, + "comment": None, + "generated": "", + "identity_options": None, + } + column_info = dialect._get_columns_info( + [row_dict], {}, {}, "public" ) + assert ("public", "tblname") in column_info + column_info = column_info[("public", "tblname")] + assert len(column_info) == 1 + column_info = column_info[0] assert isinstance(column_info["type"], self.CustomType) eq_(column_info["type"].arg1, args[0]) eq_(column_info["type"].arg2, args[1]) @@ -1951,3 +2135,64 @@ class IdentityReflectionTest(fixtures.TablesTest): exp = default.copy() exp.update(maxvalue=2**15 - 1) eq_(col["identity"], exp) + + +class TestReflectDifficultColTypes(fixtures.TablesTest): + __only_on__ = "postgresql" + __backend__ = True + + def define_tables(metadata): + Table( + "sample_table", + metadata, + Column("c1", Integer, primary_key=True), + Column("c2", Integer, unique=True), + Column("c3", Integer), + Index("sample_table_index", "c2", "c3"), + ) + + def check_int_list(self, row, key): + value = row[key] + is_true(isinstance(value, list)) + is_true(len(value) > 0) + is_true(all(isinstance(v, int) for v in value)) + + def test_pg_index(self, connection): + insp = inspect(connection) + + pgc_oid = insp.get_table_oid("sample_table") + cols = [ + col + for col in pg_catalog.pg_index.c + if testing.db.dialect.server_version_info + >= col.info.get("server_version", (0,)) + ] + + stmt = sa.select(*cols).filter_by(indrelid=pgc_oid) + rows = connection.execute(stmt).mappings().all() + is_true(len(rows) > 0) + cols = [ + col + for col in ["indkey", "indoption", "indclass", "indcollation"] + if testing.db.dialect.server_version_info + >= pg_catalog.pg_index.c[col].info.get("server_version", (0,)) + ] + for row in rows: + for col in cols: + self.check_int_list(row, col) + + def test_pg_constraint(self, connection): + insp = inspect(connection) + + pgc_oid = insp.get_table_oid("sample_table") + cols = [ + col + for col in pg_catalog.pg_constraint.c + if testing.db.dialect.server_version_info + >= col.info.get("server_version", (0,)) + ] + stmt = sa.select(*cols).filter_by(conrelid=pgc_oid) + rows = connection.execute(stmt).mappings().all() + is_true(len(rows) > 0) + for row in rows: + self.check_int_list(row, "conkey") diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py index fd4b91db1..79f029391 100644 --- a/test/dialect/postgresql/test_types.py +++ b/test/dialect/postgresql/test_types.py @@ -454,11 +454,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): asserter.assert_( # check for table RegexSQL( - "select relname from pg_class c join pg_namespace.*", + "SELECT pg_catalog.pg_class.relname FROM pg_catalog." + "pg_class JOIN pg_catalog.pg_namespace.*", dialect="postgresql", ), # check for enum, just once - RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"), + RegexSQL( + r"SELECT pg_catalog.pg_type.typname .* WHERE " + "pg_catalog.pg_type.typname = ", + dialect="postgresql", + ), RegexSQL("CREATE TYPE myenum AS ENUM .*", dialect="postgresql"), RegexSQL(r"CREATE TABLE t .*", dialect="postgresql"), ) @@ -468,11 +473,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): asserter.assert_( RegexSQL( - "select relname from pg_class c join pg_namespace.*", + "SELECT pg_catalog.pg_class.relname FROM pg_catalog." + "pg_class JOIN pg_catalog.pg_namespace.*", dialect="postgresql", ), RegexSQL("DROP TABLE t", dialect="postgresql"), - RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"), + RegexSQL( + r"SELECT pg_catalog.pg_type.typname .* WHERE " + "pg_catalog.pg_type.typname = ", + dialect="postgresql", + ), RegexSQL("DROP TYPE myenum", dialect="postgresql"), ) @@ -694,23 +704,6 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): connection, "fourfivesixtype" ) - def test_no_support(self, testing_engine): - def server_version_info(self): - return (8, 2) - - e = testing_engine() - dialect = e.dialect - dialect._get_server_version_info = server_version_info - - assert dialect.supports_native_enum - e.connect() - assert not dialect.supports_native_enum - - # initialize is called again on new pool - e.dispose() - e.connect() - assert not dialect.supports_native_enum - def test_reflection(self, metadata, connection): etype = Enum( "four", "five", "six", name="fourfivesixtype", metadata=metadata diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index fb4331998..ed9d67612 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -50,6 +50,7 @@ from sqlalchemy.testing import combinations from sqlalchemy.testing import config from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ @@ -856,27 +857,15 @@ class AttachedDBTest(fixtures.TestBase): ["foo", "bar"], ) - eq_( - [ - d["name"] - for d in insp.get_columns("nonexistent", schema="test_schema") - ], - [], - ) - eq_( - [ - d["name"] - for d in insp.get_columns("another_created", schema=None) - ], - [], - ) - eq_( - [ - d["name"] - for d in insp.get_columns("local_only", schema="test_schema") - ], - [], - ) + with expect_raises(exc.NoSuchTableError): + insp.get_columns("nonexistent", schema="test_schema") + + with expect_raises(exc.NoSuchTableError): + insp.get_columns("another_created", schema=None) + + with expect_raises(exc.NoSuchTableError): + insp.get_columns("local_only", schema="test_schema") + eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"]) def test_table_names_present(self): diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index 76099e863..2f6c06ace 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -2,6 +2,7 @@ import unicodedata import sqlalchemy as sa from sqlalchemy import Computed +from sqlalchemy import Connection from sqlalchemy import DefaultClause from sqlalchemy import event from sqlalchemy import FetchedValue @@ -17,6 +18,7 @@ from sqlalchemy import sql from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import UniqueConstraint +from sqlalchemy.engine import Inspector from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL @@ -1254,12 +1256,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): m2 = MetaData() t2 = Table("x", m2, autoload_with=connection) - ck = [ + cks = [ const for const in t2.constraints if isinstance(const, sa.CheckConstraint) - ][0] - + ] + eq_(len(cks), 1) + ck = cks[0] eq_regex(ck.sqltext.text, r"[\(`]*q[\)`]* > 10") eq_(ck.name, "ck1") @@ -1268,11 +1271,17 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): sa.Index("x_ix", t.c.a, t.c.b) metadata.create_all(connection) - def mock_get_columns(self, connection, table_name, **kw): - return [{"name": "b", "type": Integer, "primary_key": False}] + gri = Inspector._get_reflection_info + + def mock_gri(self, *a, **kw): + res = gri(self, *a, **kw) + res.columns[(None, "x")] = [ + col for col in res.columns[(None, "x")] if col["name"] == "b" + ] + return res with testing.mock.patch.object( - connection.dialect, "get_columns", mock_get_columns + Inspector, "_get_reflection_info", mock_gri ): m = MetaData() with testing.expect_warnings( @@ -1409,38 +1418,49 @@ class CreateDropTest(fixtures.TablesTest): eq_(ua, ["users", "email_addresses"]) eq_(oi, ["orders", "items"]) - def test_checkfirst(self, connection): + def test_checkfirst(self, connection: Connection) -> None: insp = inspect(connection) + users = self.tables.users is_false(insp.has_table("users")) users.create(connection) + insp.clear_cache() is_true(insp.has_table("users")) users.create(connection, checkfirst=True) users.drop(connection) users.drop(connection, checkfirst=True) + insp.clear_cache() is_false(insp.has_table("users")) users.create(connection, checkfirst=True) users.drop(connection) - def test_createdrop(self, connection): + def test_createdrop(self, connection: Connection) -> None: insp = inspect(connection) metadata = self.tables_test_metadata + assert metadata is not None metadata.create_all(connection) is_true(insp.has_table("items")) is_true(insp.has_table("email_addresses")) metadata.create_all(connection) + insp.clear_cache() is_true(insp.has_table("items")) metadata.drop_all(connection) + insp.clear_cache() is_false(insp.has_table("items")) is_false(insp.has_table("email_addresses")) metadata.drop_all(connection) + insp.clear_cache() is_false(insp.has_table("items")) - def test_tablenames(self, connection): + def test_has_table_and_table_names(self, connection): + """establish that has_table and get_table_names are consistent w/ + each other with regard to caching + + """ metadata = self.tables_test_metadata metadata.create_all(bind=connection) insp = inspect(connection) @@ -1448,6 +1468,19 @@ class CreateDropTest(fixtures.TablesTest): # ensure all tables we created are in the list. is_true(set(insp.get_table_names()).issuperset(metadata.tables)) + assert insp.has_table("items") + assert "items" in insp.get_table_names() + + self.tables.items.drop(connection) + + # cached + assert insp.has_table("items") + assert "items" in insp.get_table_names() + + insp = inspect(connection) + assert not insp.has_table("items") + assert "items" not in insp.get_table_names() + class SchemaManipulationTest(fixtures.TestBase): __backend__ = True @@ -1602,13 +1635,7 @@ class SchemaTest(fixtures.TestBase): __backend__ = True @testing.requires.schemas - @testing.requires.cross_schema_fk_reflection def test_has_schema(self): - if not hasattr(testing.db.dialect, "has_schema"): - testing.config.skip_test( - "dialect %s doesn't have a has_schema method" - % testing.db.dialect.name - ) with testing.db.connect() as conn: eq_( testing.db.dialect.has_schema( diff --git a/test/perf/many_table_reflection.py b/test/perf/many_table_reflection.py new file mode 100644 index 000000000..8749df5c2 --- /dev/null +++ b/test/perf/many_table_reflection.py @@ -0,0 +1,617 @@ +from argparse import ArgumentDefaultsHelpFormatter +from argparse import ArgumentParser +from collections import defaultdict +from contextlib import contextmanager +from functools import wraps +from pprint import pprint +import random +import time + +import sqlalchemy as sa +from sqlalchemy.engine import Inspector + +types = (sa.Integer, sa.BigInteger, sa.String(200), sa.DateTime) +USE_CONNECTION = False + + +def generate_table(meta: sa.MetaData, min_cols, max_cols, dialect_name): + col_number = random.randint(min_cols, max_cols) + table_num = len(meta.tables) + add_identity = random.random() > 0.90 + identity = sa.Identity( + always=random.randint(0, 1), + start=random.randint(1, 100), + increment=random.randint(1, 7), + ) + is_mssql = dialect_name == "mssql" + cols = [] + for i in range(col_number - (0 if is_mssql else add_identity)): + args = [] + if random.random() < 0.95 or table_num == 0: + if is_mssql and add_identity and i == 0: + args.append(sa.Integer) + args.append(identity) + else: + args.append(random.choice(types)) + else: + args.append( + sa.ForeignKey(f"table_{table_num-1}.table_{table_num-1}_col_1") + ) + cols.append( + sa.Column( + f"table_{table_num}_col_{i+1}", + *args, + primary_key=i == 0, + comment=f"primary key of table_{table_num}" + if i == 0 + else None, + index=random.random() > 0.9 and i > 0, + unique=random.random() > 0.95 and i > 0, + ) + ) + if add_identity and not is_mssql: + cols.append( + sa.Column( + f"table_{table_num}_col_{col_number}", + sa.Integer, + identity, + ) + ) + args = () + if table_num % 3 == 0: + # mysql can't do check constraint on PK col + args = (sa.CheckConstraint(cols[1].is_not(None)),) + return sa.Table( + f"table_{table_num}", + meta, + *cols, + *args, + comment=f"comment for table_{table_num}" if table_num % 2 else None, + ) + + +def generate_meta(schema_name, table_number, min_cols, max_cols, dialect_name): + meta = sa.MetaData(schema=schema_name) + log = defaultdict(int) + for _ in range(table_number): + t = generate_table(meta, min_cols, max_cols, dialect_name) + log["tables"] += 1 + log["columns"] += len(t.columns) + log["index"] += len(t.indexes) + log["check_con"] += len( + [c for c in t.constraints if isinstance(c, sa.CheckConstraint)] + ) + log["foreign_keys_con"] += len( + [ + c + for c in t.constraints + if isinstance(c, sa.ForeignKeyConstraint) + ] + ) + log["unique_con"] += len( + [c for c in t.constraints if isinstance(c, sa.UniqueConstraint)] + ) + log["identity"] += len([c for c in t.columns if c.identity]) + + print("Meta info", dict(log)) + return meta + + +def log(fn): + @wraps(fn) + def wrap(*a, **kw): + print("Running ", fn.__name__, "...", flush=True, end="") + try: + r = fn(*a, **kw) + except NotImplementedError: + print(" [not implemented]", flush=True) + r = None + else: + print("... done", flush=True) + return r + + return wrap + + +tests = {} + + +def define_test(fn): + name: str = fn.__name__ + if name.startswith("reflect_"): + name = name[8:] + tests[name] = wfn = log(fn) + return wfn + + +@log +def create_tables(engine, meta): + tables = list(meta.tables.values()) + for i in range(0, len(tables), 500): + meta.create_all(engine, tables[i : i + 500]) + + +@log +def drop_tables(engine, meta, schema_name, table_names: list): + tables = list(meta.tables.values())[::-1] + for i in range(0, len(tables), 500): + meta.drop_all(engine, tables[i : i + 500]) + + remaining = sa.inspect(engine).get_table_names(schema=schema_name) + suffix = "" + if engine.dialect.name.startswith("postgres"): + suffix = "CASCADE" + + remaining = sorted( + remaining, key=lambda tn: int(tn.partition("_")[2]), reverse=True + ) + with engine.connect() as conn: + for i, tn in enumerate(remaining): + if engine.dialect.requires_name_normalize: + name = engine.dialect.denormalize_name(tn) + else: + name = tn + if schema_name: + conn.execute( + sa.schema.DDL( + f'DROP TABLE {schema_name}."{name}" {suffix}' + ) + ) + else: + conn.execute(sa.schema.DDL(f'DROP TABLE "{name}" {suffix}')) + if i % 500 == 0: + conn.commit() + conn.commit() + + +@log +def reflect_tables(engine, schema_name): + ref_meta = sa.MetaData(schema=schema_name) + ref_meta.reflect(engine) + + +def verify_dict(multi, single, str_compare=False): + if single is None or multi is None: + return + if single != multi: + keys = set(single) | set(multi) + diff = [] + for key in sorted(keys): + se, me = single.get(key), multi.get(key) + if str(se) != str(me) if str_compare else se != me: + diff.append((key, single.get(key), multi.get(key))) + if diff: + print("\nfound different result:") + pprint(diff) + + +def _single_test( + singe_fn_name, + multi_fn_name, + engine, + schema_name, + table_names, + timing, + mode, +): + single = None + if "single" in mode: + singe_fn = getattr(Inspector, singe_fn_name) + + def go(bind): + insp = sa.inspect(bind) + single = {} + with timing(singe_fn.__name__): + for t in table_names: + single[(schema_name, t)] = singe_fn( + insp, t, schema=schema_name + ) + return single + + if USE_CONNECTION: + with engine.connect() as c: + single = go(c) + else: + single = go(engine) + + multi = None + if "multi" in mode: + insp = sa.inspect(engine) + multi_fn = getattr(Inspector, multi_fn_name) + with timing(multi_fn.__name__): + multi = multi_fn(insp, schema=schema_name) + return (multi, single) + + +@define_test +def reflect_columns( + engine, schema_name, table_names, timing, mode, ignore_diff +): + multi, single = _single_test( + "get_columns", + "get_multi_columns", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single, str_compare=True) + + +@define_test +def reflect_table_options( + engine, schema_name, table_names, timing, mode, ignore_diff +): + multi, single = _single_test( + "get_table_options", + "get_multi_table_options", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single) + + +@define_test +def reflect_pk(engine, schema_name, table_names, timing, mode, ignore_diff): + multi, single = _single_test( + "get_pk_constraint", + "get_multi_pk_constraint", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single) + + +@define_test +def reflect_comment( + engine, schema_name, table_names, timing, mode, ignore_diff +): + multi, single = _single_test( + "get_table_comment", + "get_multi_table_comment", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single) + + +@define_test +def reflect_whole_tables( + engine, schema_name, table_names, timing, mode, ignore_diff +): + single = None + meta = sa.MetaData(schema=schema_name) + + if "single" in mode: + + def go(bind): + single = {} + with timing("Table_autoload_with"): + for name in table_names: + single[(None, name)] = sa.Table( + name, meta, autoload_with=bind + ) + return single + + if USE_CONNECTION: + with engine.connect() as c: + single = go(c) + else: + single = go(engine) + + multi_meta = sa.MetaData(schema=schema_name) + if "multi" in mode: + with timing("MetaData_reflect"): + multi_meta.reflect(engine, only=table_names) + return (multi_meta, single) + + +@define_test +def reflect_check_constraints( + engine, schema_name, table_names, timing, mode, ignore_diff +): + multi, single = _single_test( + "get_check_constraints", + "get_multi_check_constraints", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single) + + +@define_test +def reflect_indexes( + engine, schema_name, table_names, timing, mode, ignore_diff +): + multi, single = _single_test( + "get_indexes", + "get_multi_indexes", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single) + + +@define_test +def reflect_foreign_keys( + engine, schema_name, table_names, timing, mode, ignore_diff +): + multi, single = _single_test( + "get_foreign_keys", + "get_multi_foreign_keys", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single) + + +@define_test +def reflect_unique_constraints( + engine, schema_name, table_names, timing, mode, ignore_diff +): + multi, single = _single_test( + "get_unique_constraints", + "get_multi_unique_constraints", + engine, + schema_name, + table_names, + timing, + mode, + ) + if not ignore_diff: + verify_dict(multi, single) + + +def _apply_events(engine): + queries = defaultdict(list) + + now = 0 + + @sa.event.listens_for(engine, "before_cursor_execute") + def before_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + + nonlocal now + now = time.time() + + @sa.event.listens_for(engine, "after_cursor_execute") + def after_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + total = time.time() - now + + if context and context.compiled: + statement_str = context.compiled.string + else: + statement_str = statement + queries[statement_str].append(total) + + return queries + + +def _print_query_stats(queries): + number_of_queries = sum( + len(query_times) for query_times in queries.values() + ) + print("-" * 50) + q_list = list(queries.items()) + q_list.sort(key=lambda rec: -sum(rec[1])) + total = sum([sum(t) for _, t in q_list]) + print(f"total number of queries: {number_of_queries}. Total time {total}") + print("-" * 50) + + for stmt, times in q_list: + total_t = sum(times) + max_t = max(times) + min_t = min(times) + avg_t = total_t / len(times) + times.sort() + median_t = times[len(times) // 2] + + print( + f"Query times: {total_t=}, {max_t=}, {min_t=}, {avg_t=}, " + f"{median_t=} Number of calls: {len(times)}" + ) + print(stmt.strip(), "\n") + + +def main(db, schema_name, table_number, min_cols, max_cols, args): + timing = timer() + if args.pool_class: + engine = sa.create_engine( + db, echo=args.echo, poolclass=getattr(sa.pool, args.pool_class) + ) + else: + engine = sa.create_engine(db, echo=args.echo) + + if engine.name == "oracle": + # clear out oracle caches so that we get the real-world time the + # queries would normally take for scripts that aren't run repeatedly + with engine.connect() as conn: + # https://stackoverflow.com/questions/2147456/how-to-clear-all-cached-items-in-oracle + conn.exec_driver_sql("alter system flush buffer_cache") + conn.exec_driver_sql("alter system flush shared_pool") + if not args.no_create: + print( + f"Generating {table_number} using engine {engine} in " + f"schema {schema_name or 'default'}", + ) + meta = sa.MetaData() + table_names = [] + stats = {} + try: + if not args.no_create: + with timing("populate-meta"): + meta = generate_meta( + schema_name, table_number, min_cols, max_cols, engine.name + ) + with timing("create-tables"): + create_tables(engine, meta) + + with timing("get_table_names"): + with engine.connect() as conn: + table_names = engine.dialect.get_table_names( + conn, schema=schema_name + ) + print( + f"Reflected table number {len(table_names)} in " + f"schema {schema_name or 'default'}" + ) + mode = {"single", "multi"} + if args.multi_only: + mode.discard("single") + if args.single_only: + mode.discard("multi") + + if args.sqlstats: + print("starting stats for subsequent tests") + stats = _apply_events(engine) + for test_name, test_fn in tests.items(): + if test_name in args.test or "all" in args.test: + test_fn( + engine, + schema_name, + table_names, + timing, + mode, + args.ignore_diff, + ) + + if args.reflect: + with timing("reflect-tables"): + reflect_tables(engine, schema_name) + finally: + # copy stats to new dict + if args.sqlstats: + stats = dict(stats) + try: + if not args.no_drop: + with timing("drop-tables"): + drop_tables(engine, meta, schema_name, table_names) + finally: + pprint(timing.timing, sort_dicts=False) + if args.sqlstats: + _print_query_stats(stats) + + +def timer(): + timing = {} + + @contextmanager + def track_time(name): + s = time.time() + yield + timing[name] = time.time() - s + + track_time.timing = timing + return track_time + + +if __name__ == "__main__": + parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) + parser.add_argument( + "--db", help="Database url", default="sqlite:///many-table.db" + ) + parser.add_argument( + "--schema-name", + help="optional schema name", + type=str, + default=None, + ) + parser.add_argument( + "--table-number", + help="Number of table to generate.", + type=int, + default=250, + ) + parser.add_argument( + "--min-cols", + help="Min number of column per table.", + type=int, + default=15, + ) + parser.add_argument( + "--max-cols", + help="Max number of column per table.", + type=int, + default=250, + ) + parser.add_argument( + "--no-create", help="Do not run create tables", action="store_true" + ) + parser.add_argument( + "--no-drop", help="Do not run drop tables", action="store_true" + ) + parser.add_argument("--reflect", help="Run reflect", action="store_true") + parser.add_argument( + "--test", + help="Run these tests. 'all' runs all tests", + nargs="+", + choices=tuple(tests) + ("all", "none"), + default=["all"], + ) + parser.add_argument( + "--sqlstats", + help="count and time individual queries", + action="store_true", + ) + parser.add_argument( + "--multi-only", help="Only run multi table tests", action="store_true" + ) + parser.add_argument( + "--single-only", + help="Only run single table tests", + action="store_true", + ) + parser.add_argument( + "--echo", action="store_true", help="Enable echo on the engine" + ) + parser.add_argument( + "--ignore-diff", + action="store_true", + help="Ignores differences in the single/multi reflections", + ) + parser.add_argument( + "--single-inspect-conn", + action="store_true", + help="Uses inspect on a connection instead of on the engine when " + "using single reflections. Mainly for sqlite.", + ) + parser.add_argument("--pool-class", help="The pool class to use") + + args = parser.parse_args() + min_cols = args.min_cols + max_cols = args.max_cols + USE_CONNECTION = args.single_inspect_conn + assert min_cols <= max_cols and min_cols >= 1 + assert not (args.multi_only and args.single_only) + main( + args.db, args.schema_name, args.table_number, min_cols, max_cols, args + ) diff --git a/test/requirements.py b/test/requirements.py index bea861a83..db12990a3 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -76,6 +76,18 @@ class DefaultRequirements(SuiteRequirements): return skip_if(no_support("sqlite", "not supported by database")) @property + def foreign_keys_reflect_as_index(self): + return only_on(["mysql", "mariadb"]) + + @property + def unique_index_reflect_as_unique_constraints(self): + return only_on(["mysql", "mariadb"]) + + @property + def unique_constraints_reflect_as_index(self): + return only_on(["mysql", "mariadb", "oracle", "postgresql", "mssql"]) + + @property def foreign_key_constraint_name_reflection(self): return fails_if( lambda config: against(config, ["mysql", "mariadb"]) @@ -84,6 +96,10 @@ class DefaultRequirements(SuiteRequirements): ) @property + def reflect_indexes_with_ascdesc(self): + return fails_if(["oracle"]) + + @property def table_ddl_if_exists(self): """target platform supports IF NOT EXISTS / IF EXISTS for tables.""" @@ -508,6 +524,12 @@ class DefaultRequirements(SuiteRequirements): return exclusions.open() @property + def schema_create_delete(self): + """target database supports schema create and dropped with + 'CREATE SCHEMA' and 'DROP SCHEMA'""" + return exclusions.skip_if(["sqlite", "oracle"]) + + @property def cross_schema_fk_reflection(self): """target system must support reflection of inter-schema foreign keys""" @@ -547,11 +569,13 @@ class DefaultRequirements(SuiteRequirements): @property def check_constraint_reflection(self): - return fails_on_everything_except( - "postgresql", - "sqlite", - "oracle", - self._mysql_and_check_constraints_exist, + return only_on( + [ + "postgresql", + "sqlite", + "oracle", + self._mysql_and_check_constraints_exist, + ] ) @property @@ -562,7 +586,9 @@ class DefaultRequirements(SuiteRequirements): def temp_table_names(self): """target dialect supports listing of temporary table names""" - return only_on(["sqlite", "oracle"]) + skip_if(self._sqlite_file_db) + return only_on(["sqlite", "oracle", "postgresql"]) + skip_if( + self._sqlite_file_db + ) @property def temporary_views(self): @@ -792,8 +818,7 @@ class DefaultRequirements(SuiteRequirements): @property def views(self): """Target database must support VIEWs.""" - - return skip_if("drizzle", "no VIEW support") + return exclusions.open() @property def empty_strings_varchar(self): @@ -1331,14 +1356,28 @@ class DefaultRequirements(SuiteRequirements): ) ) + def _has_oracle_test_dblink(self, key): + def check(config): + assert config.db.dialect.name == "oracle" + name = config.file_config.get("sqla_testing", key) + if not name: + return False + with config.db.connect() as conn: + links = config.db.dialect._list_dblinks(conn) + return config.db.dialect.normalize_name(name) in links + + return only_on(["oracle"]) + only_if( + check, + f"{key} option not specified in config or dblink not found in db", + ) + @property def oracle_test_dblink(self): - return skip_if( - lambda config: not config.file_config.has_option( - "sqla_testing", "oracle_db_link" - ), - "oracle_db_link option not specified in config", - ) + return self._has_oracle_test_dblink("oracle_db_link") + + @property + def oracle_test_dblink2(self): + return self._has_oracle_test_dblink("oracle_db_link2") @property def postgresql_test_dblink(self): @@ -1774,6 +1813,19 @@ class DefaultRequirements(SuiteRequirements): return only_on(["mssql"]) + only_if(check) @property + def reflect_table_options(self): + return only_on(["mysql", "mariadb", "oracle"]) + + @property + def materialized_views(self): + """Target database must support MATERIALIZED VIEWs.""" + return only_on(["postgresql", "oracle"]) + + @property + def materialized_views_reflect_pk(self): + return only_on(["oracle"]) + + @property def uuid_data_type(self): """Return databases that support the UUID datatype.""" return only_on(("postgresql >= 8.3", "mariadb >= 10.7.0")) |
