diff options
| author | Federico Caselli <cfederico87@gmail.com> | 2021-10-14 21:45:57 +0200 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-06-18 14:57:26 -0400 |
| commit | db08a699489c9b0259579d7ff7fd6bf3496ca3a2 (patch) | |
| tree | 741feb8714d9f94f0ddfd03af437f94d2d5a505b /test/dialect | |
| parent | 964c26feecc7607d6d3a66240c3f33f4ae9215d4 (diff) | |
| download | sqlalchemy-db08a699489c9b0259579d7ff7fd6bf3496ca3a2.tar.gz | |
rearchitect reflection for batched performance
Rearchitected the schema reflection API to allow some dialects to make use
of high-performing batch queries to reflect the schemas of many tables at
once, using far fewer queries. The new performance features are targeted
first at the PostgreSQL and Oracle backends, and may be applied to any
dialect that makes use of SELECT queries against system catalog tables to
reflect tables (currently this omits the MySQL and SQLite dialects, which
instead make use of parsing the "CREATE TABLE" statement; however, these
dialects do not have a pre-existing performance issue with reflection. MS
SQL Server is still a TODO).
The new API is backwards compatible with the previous system, and should
require no changes to third party dialects to retain compatibility;
third party dialects can also opt into the new system by implementing
batched queries for schema reflection.
Along with this change comes an updated reflection API that is fully
:pep:`484` typed, features many new methods, and includes some changes.
Fixes: #4379
Change-Id: I897ec09843543aa7012bcdce758792ed3d415d08
Diffstat (limited to 'test/dialect')
| -rw-r--r-- | test/dialect/mysql/test_reflection.py | 18 | ||||
| -rw-r--r-- | test/dialect/oracle/test_reflection.py | 554 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 20 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_reflection.py | 441 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_types.py | 35 | ||||
| -rw-r--r-- | test/dialect/test_sqlite.py | 31 |
6 files changed, 933 insertions, 166 deletions
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py index f414c9c37..846001347 100644 --- a/test/dialect/mysql/test_reflection.py +++ b/test/dialect/mysql/test_reflection.py @@ -33,9 +33,9 @@ from sqlalchemy import UniqueConstraint from sqlalchemy.dialects.mysql import base as mysql from sqlalchemy.dialects.mysql import reflection as _reflection from sqlalchemy.schema import CreateIndex -from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ @@ -558,6 +558,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): ) def test_skip_not_describable(self, metadata, connection): + """This test is the only one that test the _default_multi_reflect + behaviour with UnreflectableTableError + """ + @event.listens_for(metadata, "before_drop") def cleanup(*arg, **kw): with testing.db.begin() as conn: @@ -579,14 +583,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): m.reflect(views=True, bind=conn) eq_(m.tables["test_t2"].name, "test_t2") - assert_raises_message( - exc.UnreflectableTableError, - "references invalid table", - Table, - "test_v", - MetaData(), - autoload_with=conn, - ) + with expect_raises_message( + exc.UnreflectableTableError, "references invalid table" + ): + Table("test_v", MetaData(), autoload_with=conn) @testing.exclude("mysql", "<", (5, 0, 0), "no information_schema support") def test_system_views(self): diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py index bf76dca43..53eb94df3 100644 --- a/test/dialect/oracle/test_reflection.py +++ b/test/dialect/oracle/test_reflection.py @@ -27,14 +27,18 @@ from sqlalchemy.dialects.oracle.base import BINARY_FLOAT from sqlalchemy.dialects.oracle.base import 
DOUBLE_PRECISION from sqlalchemy.dialects.oracle.base import NUMBER from sqlalchemy.dialects.oracle.base import REAL +from sqlalchemy.engine import ObjectKind from sqlalchemy.testing import assert_warns from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing import config from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_true from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import eq_compile_type from sqlalchemy.testing.schema import Table @@ -384,6 +388,7 @@ class SystemTableTablenamesTest(fixtures.TestBase): __backend__ = True def setup_test(self): + with testing.db.begin() as conn: conn.exec_driver_sql("create table my_table (id integer)") conn.exec_driver_sql( @@ -417,6 +422,14 @@ class SystemTableTablenamesTest(fixtures.TestBase): set(["my_table", "foo_table"]), ) + def test_reflect_system_table(self): + meta = MetaData() + t = Table("foo_table", meta, autoload_with=testing.db) + assert t.columns.keys() == ["id"] + + t = Table("my_temp_table", meta, autoload_with=testing.db) + assert t.columns.keys() == ["id"] + class DontReflectIOTTest(fixtures.TestBase): """test that index overflow tables aren't included in @@ -509,6 +522,228 @@ class TableReflectionTest(fixtures.TestBase): tbl = Table("test_compress", m2, autoload_with=connection) assert tbl.dialect_options["oracle"]["compress"] == "OLTP" + def test_reflect_hidden_column(self): + with testing.db.begin() as conn: + conn.exec_driver_sql( + "CREATE TABLE my_table(id integer, hide integer INVISIBLE)" + ) + + try: + insp = inspect(conn) + cols = insp.get_columns("my_table") + assert len(cols) == 1 + assert cols[0]["name"] == "id" + finally: + conn.exec_driver_sql("DROP TABLE my_table") + + +class ViewReflectionTest(fixtures.TestBase): + __only_on__ = "oracle" + __backend__ = 
True + + @classmethod + def setup_test_class(cls): + sql = """ + CREATE TABLE tbl ( + id INTEGER PRIMARY KEY, + data INTEGER + ); + + CREATE VIEW tbl_plain_v AS + SELECT id, data FROM tbl WHERE id > 100; + + -- comments on plain views are created with "comment on table" + -- because why not.. + COMMENT ON TABLE tbl_plain_v IS 'view comment'; + + CREATE MATERIALIZED VIEW tbl_v AS + SELECT id, data FROM tbl WHERE id > 42; + + COMMENT ON MATERIALIZED VIEW tbl_v IS 'my mat view comment'; + + CREATE MATERIALIZED VIEW tbl_v2 AS + SELECT id, data FROM tbl WHERE id < 42; + + COMMENT ON MATERIALIZED VIEW tbl_v2 IS 'my other mat view comment'; + + CREATE SYNONYM view_syn FOR tbl_plain_v; + CREATE SYNONYM %(test_schema)s.ts_v_s FOR tbl_plain_v; + + CREATE VIEW %(test_schema)s.schema_view AS + SELECT 1 AS value FROM dual; + + COMMENT ON TABLE %(test_schema)s.schema_view IS 'schema view comment'; + CREATE SYNONYM syn_schema_view FOR %(test_schema)s.schema_view; + """ + if testing.requires.oracle_test_dblink.enabled: + cls.dblink = config.file_config.get( + "sqla_testing", "oracle_db_link" + ) + sql += """ + CREATE SYNONYM syn_link FOR tbl_plain_v@%(link)s; + """ % { + "link": cls.dblink + } + with testing.db.begin() as conn: + for stmt in ( + sql % {"test_schema": testing.config.test_schema} + ).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + @classmethod + def teardown_test_class(cls): + sql = """ + DROP MATERIALIZED VIEW tbl_v; + DROP MATERIALIZED VIEW tbl_v2; + DROP VIEW tbl_plain_v; + DROP TABLE tbl; + DROP VIEW %(test_schema)s.schema_view; + DROP SYNONYM view_syn; + DROP SYNONYM %(test_schema)s.ts_v_s; + DROP SYNONYM syn_schema_view; + """ + if testing.requires.oracle_test_dblink.enabled: + sql += """ + DROP SYNONYM syn_link; + """ + with testing.db.begin() as conn: + for stmt in ( + sql % {"test_schema": testing.config.test_schema} + ).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + def test_get_names(self, connection): + insp = 
inspect(connection) + eq_(insp.get_table_names(), ["tbl"]) + eq_(insp.get_view_names(), ["tbl_plain_v"]) + eq_(insp.get_materialized_view_names(), ["tbl_v", "tbl_v2"]) + eq_( + insp.get_view_names(schema=testing.config.test_schema), + ["schema_view"], + ) + + def test_get_table_comment_on_view(self, connection): + insp = inspect(connection) + eq_(insp.get_table_comment("tbl_v"), {"text": "my mat view comment"}) + eq_(insp.get_table_comment("tbl_plain_v"), {"text": "view comment"}) + + def test_get_multi_view_comment(self, connection): + insp = inspect(connection) + plain = {(None, "tbl_plain_v"): {"text": "view comment"}} + mat = { + (None, "tbl_v"): {"text": "my mat view comment"}, + (None, "tbl_v2"): {"text": "my other mat view comment"}, + } + eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW), + mat, + ) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW), + {**plain, **mat}, + ) + ts = testing.config.test_schema + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW, schema=ts), + {(ts, "schema_view"): {"text": "schema view comment"}}, + ) + eq_(insp.get_multi_table_comment(), {(None, "tbl"): {"text": None}}) + + def test_get_table_comment_synonym(self, connection): + insp = inspect(connection) + eq_( + insp.get_table_comment("view_syn", oracle_resolve_synonyms=True), + {"text": "view comment"}, + ) + eq_( + insp.get_table_comment( + "syn_schema_view", oracle_resolve_synonyms=True + ), + {"text": "schema view comment"}, + ) + eq_( + insp.get_table_comment( + "ts_v_s", + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ), + {"text": "view comment"}, + ) + + def test_get_multi_view_comment_synonym(self, connection): + insp = inspect(connection) + exp = { + (None, "view_syn"): {"text": "view comment"}, + (None, "syn_schema_view"): {"text": "schema view comment"}, + } + if testing.requires.oracle_test_dblink.enabled: + exp[(None, 
"syn_link")] = {"text": "view comment"} + eq_( + insp.get_multi_table_comment( + oracle_resolve_synonyms=True, kind=ObjectKind.ANY_VIEW + ), + exp, + ) + ts = testing.config.test_schema + eq_( + insp.get_multi_table_comment( + oracle_resolve_synonyms=True, + schema=ts, + kind=ObjectKind.ANY_VIEW, + ), + {(ts, "ts_v_s"): {"text": "view comment"}}, + ) + + def test_get_view_definition(self, connection): + insp = inspect(connection) + eq_( + insp.get_view_definition("tbl_plain_v"), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_v"), + "SELECT id, data FROM tbl WHERE id > 42", + ) + with expect_raises(exc.NoSuchTableError): + eq_(insp.get_view_definition("view_syn"), None) + eq_( + insp.get_view_definition("view_syn", oracle_resolve_synonyms=True), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition( + "syn_schema_view", oracle_resolve_synonyms=True + ), + "SELECT 1 AS value FROM dual", + ) + eq_( + insp.get_view_definition( + "ts_v_s", + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ), + "SELECT id, data FROM tbl WHERE id > 100", + ) + + @testing.requires.oracle_test_dblink + def test_get_view_definition_dblink(self, connection): + insp = inspect(connection) + eq_( + insp.get_view_definition("syn_link", oracle_resolve_synonyms=True), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_plain_v", dblink=self.dblink), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_v", dblink=self.dblink), + "SELECT id, data FROM tbl WHERE id > 42", + ) + class RoundTripIndexTest(fixtures.TestBase): __only_on__ = "oracle" @@ -722,8 +957,6 @@ class DBLinkReflectionTest(fixtures.TestBase): @classmethod def setup_test_class(cls): - from sqlalchemy.testing import config - cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link") # note that the synonym here is still not totally functional @@ -863,3 +1096,320 @@ 
class IdentityReflectionTest(fixtures.TablesTest): exp = common.copy() exp["order"] = True eq_(col["identity"], exp) + + +class AdditionalReflectionTests(fixtures.TestBase): + __only_on__ = "oracle" + __backend__ = True + + @classmethod + def setup_test_class(cls): + # currently assuming full DBA privs for the user. + # don't really know how else to go here unless + # we connect as the other user. + + sql = """ +CREATE TABLE %(schema)sparent( + id INTEGER, + data VARCHAR2(50), + CONSTRAINT parent_pk_%(schema_id)s PRIMARY KEY (id) +); +CREATE TABLE %(schema)smy_table( + id INTEGER, + name VARCHAR2(125), + related INTEGER, + data%(schema_id)s NUMBER NOT NULL, + CONSTRAINT my_table_pk_%(schema_id)s PRIMARY KEY (id), + CONSTRAINT my_table_fk_%(schema_id)s FOREIGN KEY(related) + REFERENCES %(schema)sparent(id), + CONSTRAINT my_table_check_%(schema_id)s CHECK (data%(schema_id)s > 42), + CONSTRAINT data_unique%(schema_id)s UNIQUE (data%(schema_id)s) +); +CREATE INDEX my_table_index_%(schema_id)s on %(schema)smy_table (id, name); +COMMENT ON TABLE %(schema)smy_table IS 'my table comment %(schema_id)s'; +COMMENT ON COLUMN %(schema)smy_table.name IS +'my table.name comment %(schema_id)s'; +""" + + with testing.db.begin() as conn: + for schema in ("", testing.config.test_schema): + dd = { + "schema": f"{schema}." if schema else "", + "schema_id": "sch" if schema else "", + } + for stmt in (sql % dd).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + @classmethod + def teardown_test_class(cls): + sql = """ +drop table %(schema)smy_table; +drop table %(schema)sparent; +""" + with testing.db.begin() as conn: + for schema in ("", testing.config.test_schema): + dd = {"schema": f"{schema}." 
if schema else ""} + for stmt in (sql % dd).split(";"): + if stmt.strip(): + try: + conn.exec_driver_sql(stmt) + except: + pass + + def setup_test(self): + self.dblink = config.file_config.get("sqla_testing", "oracle_db_link") + self.dblink2 = config.file_config.get( + "sqla_testing", "oracle_db_link2" + ) + self.columns = {} + self.indexes = {} + self.primary_keys = {} + self.comments = {} + self.uniques = {} + self.checks = {} + self.foreign_keys = {} + self.options = {} + self.allDicts = [ + self.columns, + self.indexes, + self.primary_keys, + self.comments, + self.uniques, + self.checks, + self.foreign_keys, + self.options, + ] + for schema in (None, testing.config.test_schema): + suffix = "sch" if schema else "" + + self.columns[schema] = { + (schema, "my_table"): [ + { + "name": "id", + "nullable": False, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": "name", + "nullable": True, + "type": eq_compile_type("VARCHAR(125)"), + "default": None, + "comment": f"my table.name comment {suffix}", + }, + { + "name": "related", + "nullable": True, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": f"data{suffix}", + "nullable": False, + "type": eq_compile_type("NUMBER"), + "default": None, + "comment": None, + }, + ], + (schema, "parent"): [ + { + "name": "id", + "nullable": False, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": "data", + "nullable": True, + "type": eq_compile_type("VARCHAR(50)"), + "default": None, + "comment": None, + }, + ], + } + self.indexes[schema] = { + (schema, "my_table"): [ + { + "name": f"data_unique{suffix}", + "column_names": [f"data{suffix}"], + "dialect_options": {}, + "unique": True, + }, + { + "name": f"my_table_index_{suffix}", + "column_names": ["id", "name"], + "dialect_options": {}, + "unique": False, + }, + ], + (schema, "parent"): [], + } + self.primary_keys[schema] = { + (schema, "my_table"): { 
+ "name": f"my_table_pk_{suffix}", + "constrained_columns": ["id"], + }, + (schema, "parent"): { + "name": f"parent_pk_{suffix}", + "constrained_columns": ["id"], + }, + } + self.comments[schema] = { + (schema, "my_table"): {"text": f"my table comment {suffix}"}, + (schema, "parent"): {"text": None}, + } + self.foreign_keys[schema] = { + (schema, "my_table"): [ + { + "name": f"my_table_fk_{suffix}", + "constrained_columns": ["related"], + "referred_schema": schema, + "referred_table": "parent", + "referred_columns": ["id"], + "options": {}, + } + ], + (schema, "parent"): [], + } + self.checks[schema] = { + (schema, "my_table"): [ + { + "name": f"my_table_check_{suffix}", + "sqltext": f"data{suffix} > 42", + } + ], + (schema, "parent"): [], + } + self.uniques[schema] = { + (schema, "my_table"): [ + { + "name": f"data_unique{suffix}", + "column_names": [f"data{suffix}"], + "duplicates_index": f"data_unique{suffix}", + } + ], + (schema, "parent"): [], + } + self.options[schema] = { + (schema, "my_table"): {}, + (schema, "parent"): {}, + } + + def test_tables(self, connection): + insp = inspect(connection) + + eq_(sorted(insp.get_table_names()), ["my_table", "parent"]) + + def _check_reflection(self, conn, schema, res_schema=False, **kw): + if res_schema is False: + res_schema = schema + insp = inspect(conn) + eq_( + insp.get_multi_columns(schema=schema, **kw), + self.columns[res_schema], + ) + eq_( + insp.get_multi_indexes(schema=schema, **kw), + self.indexes[res_schema], + ) + eq_( + insp.get_multi_pk_constraint(schema=schema, **kw), + self.primary_keys[res_schema], + ) + eq_( + insp.get_multi_table_comment(schema=schema, **kw), + self.comments[res_schema], + ) + eq_( + insp.get_multi_foreign_keys(schema=schema, **kw), + self.foreign_keys[res_schema], + ) + eq_( + insp.get_multi_check_constraints(schema=schema, **kw), + self.checks[res_schema], + ) + eq_( + insp.get_multi_unique_constraints(schema=schema, **kw), + self.uniques[res_schema], + ) + eq_( + 
insp.get_multi_table_options(schema=schema, **kw), + self.options[res_schema], + ) + + @testing.combinations(True, False, argnames="schema") + def test_schema_translate_map(self, connection, schema): + schema = testing.config.test_schema if schema else None + c = connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } + ) + self._check_reflection(c, schema) + + @testing.requires.oracle_test_dblink + def test_db_link(self, connection): + self._check_reflection(connection, schema=None, dblink=self.dblink) + self._check_reflection( + connection, + schema=testing.config.test_schema, + dblink=self.dblink, + ) + + def test_no_synonyms(self, connection): + # oracle_resolve_synonyms is ignored if there are no matching synonym + self._check_reflection( + connection, schema=None, oracle_resolve_synonyms=True + ) + connection.exec_driver_sql("CREATE SYNONYM tmp FOR parent") + for dict_ in self.allDicts: + dict_["tmp"] = {(None, "parent"): dict_[None][(None, "parent")]} + try: + self._check_reflection( + connection, + schema=None, + res_schema="tmp", + oracle_resolve_synonyms=True, + filter_names=["parent"], + ) + finally: + connection.exec_driver_sql("DROP SYNONYM tmp") + + @testing.requires.oracle_test_dblink + @testing.requires.oracle_test_dblink2 + def test_multi_dblink_synonyms(self, connection): + # oracle_resolve_synonyms handles multiple dblink at once + connection.exec_driver_sql( + f"CREATE SYNONYM s1 FOR my_table@{self.dblink}" + ) + connection.exec_driver_sql( + f"CREATE SYNONYM s2 FOR {testing.config.test_schema}." 
+ f"my_table@{self.dblink2}" + ) + connection.exec_driver_sql("CREATE SYNONYM s3 FOR parent") + for dict_ in self.allDicts: + dict_["tmp"] = { + (None, "s1"): dict_[None][(None, "my_table")], + (None, "s2"): dict_[testing.config.test_schema][ + (testing.config.test_schema, "my_table") + ], + (None, "s3"): dict_[None][(None, "parent")], + } + fk = self.foreign_keys["tmp"][(None, "s1")][0] + fk["referred_table"] = "s3" + try: + self._check_reflection( + connection, + schema=None, + res_schema="tmp", + oracle_resolve_synonyms=True, + ) + finally: + connection.exec_driver_sql("DROP SYNONYM s1") + connection.exec_driver_sql("DROP SYNONYM s2") + connection.exec_driver_sql("DROP SYNONYM s3") diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 0093eb5ba..d55aa8203 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -887,19 +887,12 @@ class MiscBackendTest( ) @testing.combinations( - ((8, 1), False, False), - ((8, 1), None, False), - ((11, 5), True, False), - ((11, 5), False, True), + (True, False), + (False, True), ) - def test_backslash_escapes_detection( - self, version, explicit_setting, expected - ): + def test_backslash_escapes_detection(self, explicit_setting, expected): engine = engines.testing_engine() - def _server_version(conn): - return version - if explicit_setting is not None: @event.listens_for(engine, "connect", insert=True) @@ -912,11 +905,8 @@ class MiscBackendTest( ) dbapi_connection.commit() - with mock.patch.object( - engine.dialect, "_get_server_version_info", _server_version - ): - with engine.connect(): - eq_(engine.dialect._backslash_escapes, expected) + with engine.connect(): + eq_(engine.dialect._backslash_escapes, expected) def test_dbapi_autocommit_attribute(self): """all the supported DBAPIs have an .autocommit attribute. 
make diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index cbb1809e4..00e5dc5b9 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -27,17 +27,21 @@ from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import ExcludeConstraint from sqlalchemy.dialects.postgresql import INTEGER from sqlalchemy.dialects.postgresql import INTERVAL +from sqlalchemy.dialects.postgresql import pg_catalog from sqlalchemy.dialects.postgresql import TSRANGE +from sqlalchemy.engine import ObjectKind +from sqlalchemy.engine import ObjectScope from sqlalchemy.schema import CreateIndex from sqlalchemy.sql.schema import CheckConstraint from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import fixtures from sqlalchemy.testing import mock -from sqlalchemy.testing.assertions import assert_raises from sqlalchemy.testing.assertions import assert_warns from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import eq_ +from sqlalchemy.testing.assertions import expect_raises from sqlalchemy.testing.assertions import is_ +from sqlalchemy.testing.assertions import is_false from sqlalchemy.testing.assertions import is_true @@ -231,17 +235,36 @@ class MaterializedViewReflectionTest( connection.execute(target.insert(), {"id": 89, "data": "d1"}) materialized_view = sa.DDL( - "CREATE MATERIALIZED VIEW test_mview AS " "SELECT * FROM testtable" + "CREATE MATERIALIZED VIEW test_mview AS SELECT * FROM testtable" ) plain_view = sa.DDL( - "CREATE VIEW test_regview AS " "SELECT * FROM testtable" + "CREATE VIEW test_regview AS SELECT data FROM testtable" ) sa.event.listen(testtable, "after_create", plain_view) sa.event.listen(testtable, "after_create", materialized_view) sa.event.listen( testtable, + "after_create", + sa.DDL("COMMENT ON VIEW test_regview IS 'regular view comment'"), + ) + 
sa.event.listen( + testtable, + "after_create", + sa.DDL( + "COMMENT ON MATERIALIZED VIEW test_mview " + "IS 'materialized view comment'" + ), + ) + sa.event.listen( + testtable, + "after_create", + sa.DDL("CREATE INDEX mat_index ON test_mview(data DESC)"), + ) + + sa.event.listen( + testtable, "before_drop", sa.DDL("DROP MATERIALIZED VIEW test_mview"), ) @@ -249,6 +272,12 @@ class MaterializedViewReflectionTest( testtable, "before_drop", sa.DDL("DROP VIEW test_regview") ) + def test_has_type(self, connection): + insp = inspect(connection) + is_true(insp.has_type("test_mview")) + is_true(insp.has_type("test_regview")) + is_true(insp.has_type("testtable")) + def test_mview_is_reflected(self, connection): metadata = MetaData() table = Table("test_mview", metadata, autoload_with=connection) @@ -265,49 +294,99 @@ class MaterializedViewReflectionTest( def test_get_view_names(self, inspect_fixture): insp, conn = inspect_fixture - eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) + eq_(set(insp.get_view_names()), set(["test_regview"])) - def test_get_view_names_plain(self, connection): + def test_get_materialized_view_names(self, inspect_fixture): + insp, conn = inspect_fixture + eq_(set(insp.get_materialized_view_names()), set(["test_mview"])) + + def test_get_view_names_reflection_cache_ok(self, connection): insp = inspect(connection) + eq_(set(insp.get_view_names()), set(["test_regview"])) eq_( - set(insp.get_view_names(include=("plain",))), set(["test_regview"]) + set(insp.get_materialized_view_names()), + set(["test_mview"]), + ) + eq_( + set(insp.get_view_names()).union( + insp.get_materialized_view_names() + ), + set(["test_regview", "test_mview"]), ) - def test_get_view_names_plain_string(self, connection): + def test_get_view_definition(self, connection): insp = inspect(connection) - eq_(set(insp.get_view_names(include="plain")), set(["test_regview"])) - def test_get_view_names_materialized(self, connection): - insp = inspect(connection) + def 
normalize(definition): + return re.sub(r"[\n\t ]+", " ", definition.strip()) + eq_( - set(insp.get_view_names(include=("materialized",))), - set(["test_mview"]), + normalize(insp.get_view_definition("test_mview")), + "SELECT testtable.id, testtable.data FROM testtable;", + ) + eq_( + normalize(insp.get_view_definition("test_regview")), + "SELECT testtable.data FROM testtable;", ) - def test_get_view_names_reflection_cache_ok(self, connection): + def test_get_view_comment(self, connection): insp = inspect(connection) eq_( - set(insp.get_view_names(include=("plain",))), set(["test_regview"]) + insp.get_table_comment("test_regview"), + {"text": "regular view comment"}, ) eq_( - set(insp.get_view_names(include=("materialized",))), - set(["test_mview"]), + insp.get_table_comment("test_mview"), + {"text": "materialized view comment"}, ) - eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) - def test_get_view_names_empty(self, connection): + def test_get_multi_view_comment(self, connection): insp = inspect(connection) - assert_raises(ValueError, insp.get_view_names, include=()) + eq_( + insp.get_multi_table_comment(), + {(None, "testtable"): {"text": None}}, + ) + plain = {(None, "test_regview"): {"text": "regular view comment"}} + mat = {(None, "test_mview"): {"text": "materialized view comment"}} + eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW), + mat, + ) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW), + {**plain, **mat}, + ) + eq_( + insp.get_multi_table_comment( + kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY + ), + {}, + ) - def test_get_view_definition(self, connection): + def test_get_multi_view_indexes(self, connection): insp = inspect(connection) + eq_(insp.get_multi_indexes(), {(None, "testtable"): []}) + + exp = { + "name": "mat_index", + "unique": False, + "column_names": ["data"], + "column_sorting": {"data": ("desc",)}, + } + 
if connection.dialect.server_version_info >= (11, 0): + exp["include_columns"] = [] + exp["dialect_options"] = {"postgresql_include": []} + plain = {(None, "test_regview"): []} + mat = {(None, "test_mview"): [exp]} + eq_(insp.get_multi_indexes(kind=ObjectKind.VIEW), plain) + eq_(insp.get_multi_indexes(kind=ObjectKind.MATERIALIZED_VIEW), mat) + eq_(insp.get_multi_indexes(kind=ObjectKind.ANY_VIEW), {**plain, **mat}) eq_( - re.sub( - r"[\n\t ]+", - " ", - insp.get_view_definition("test_mview").strip(), + insp.get_multi_indexes( + kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY ), - "SELECT testtable.id, testtable.data FROM testtable;", + {}, ) @@ -993,9 +1072,9 @@ class ReflectionTest( go, [ "Skipped unsupported reflection of " - "expression-based index idx1", + "expression-based index idx1 of table party", "Skipped unsupported reflection of " - "expression-based index idx3", + "expression-based index idx3 of table party", ], ) @@ -1016,7 +1095,7 @@ class ReflectionTest( metadata.create_all(connection) - ind = connection.dialect.get_indexes(connection, t1, None) + ind = connection.dialect.get_indexes(connection, t1.name, None) partial_definitions = [] for ix in ind: @@ -1337,6 +1416,9 @@ class ReflectionTest( } ], ) + is_true(inspector.has_type("mood", "test_schema")) + is_true(inspector.has_type("mood", "*")) + is_false(inspector.has_type("mood")) def test_inspect_enums(self, metadata, inspect_fixture): @@ -1345,30 +1427,49 @@ class ReflectionTest( enum_type = postgresql.ENUM( "cat", "dog", "rat", name="pet", metadata=metadata ) + enum_type.create(conn) + conn.commit() - with conn.begin(): - enum_type.create(conn) - - eq_( - inspector.get_enums(), - [ - { - "visible": True, - "labels": ["cat", "dog", "rat"], - "name": "pet", - "schema": "public", - } - ], - ) - - def test_get_table_oid(self, metadata, inspect_fixture): - - inspector, conn = inspect_fixture + res = [ + { + "visible": True, + "labels": ["cat", "dog", "rat"], + "name": "pet", + "schema": "public", 
+ } + ] + eq_(inspector.get_enums(), res) + is_true(inspector.has_type("pet", "*")) + is_true(inspector.has_type("pet")) + is_false(inspector.has_type("pet", "test_schema")) + + enum_type.drop(conn) + conn.commit() + eq_(inspector.get_enums(), res) + is_true(inspector.has_type("pet")) + inspector.clear_cache() + eq_(inspector.get_enums(), []) + is_false(inspector.has_type("pet")) + + def test_get_table_oid(self, metadata, connection): + Table("t1", metadata, Column("col", Integer)) + Table("t1", metadata, Column("col", Integer), schema="test_schema") + metadata.create_all(connection) + insp = inspect(connection) + oid = insp.get_table_oid("t1") + oid_schema = insp.get_table_oid("t1", schema="test_schema") + is_true(isinstance(oid, int)) + is_true(isinstance(oid_schema, int)) + is_true(oid != oid_schema) - with conn.begin(): - Table("some_table", metadata, Column("q", Integer)).create(conn) + with expect_raises(exc.NoSuchTableError): + insp.get_table_oid("does_not_exist") - assert inspector.get_table_oid("some_table") is not None + metadata.tables["t1"].drop(connection) + eq_(insp.get_table_oid("t1"), oid) + insp.clear_cache() + with expect_raises(exc.NoSuchTableError): + insp.get_table_oid("t1") def test_inspect_enums_case_sensitive(self, metadata, connection): sa.event.listen( @@ -1707,77 +1808,146 @@ class ReflectionTest( ) def test_reflect_check_warning(self): - rows = [("some name", "NOTCHECK foobar")] + rows = [("foo", "some name", "NOTCHECK foobar")] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 + with testing.expect_warnings( + "Could not parse CHECK constraint text: 'NOTCHECK foobar'" ): - with testing.expect_warnings( - "Could not parse CHECK constraint text: 'NOTCHECK foobar'" - ): - testing.db.dialect.get_check_constraints(conn, "foo") + testing.db.dialect.get_check_constraints(conn, "foo") def 
test_reflect_extra_newlines(self): rows = [ - ("some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"), - ("some other name", "CHECK ((b\nIS\nNOT\nNULL))"), - ("some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"), - ("some name", "CHECK (c != 'hi\nim a name\n')"), + ("foo", "some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"), + ("foo", "some other name", "CHECK ((b\nIS\nNOT\nNULL))"), + ("foo", "some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"), + ("foo", "some name", "CHECK (c != 'hi\nim a name\n')"), ] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 - ): - check_constraints = testing.db.dialect.get_check_constraints( - conn, "foo" - ) - eq_( - check_constraints, - [ - { - "name": "some name", - "sqltext": "a \nIS\n NOT\n\n NULL\n", - }, - {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"}, - { - "name": "some CRLF name", - "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL", - }, - {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"}, - ], - ) + check_constraints = testing.db.dialect.get_check_constraints( + conn, "foo" + ) + eq_( + check_constraints, + [ + { + "name": "some name", + "sqltext": "a \nIS\n NOT\n\n NULL\n", + }, + {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"}, + { + "name": "some CRLF name", + "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL", + }, + {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"}, + ], + ) def test_reflect_with_not_valid_check_constraint(self): - rows = [("some name", "CHECK ((a IS NOT NULL)) NOT VALID")] + rows = [("foo", "some name", "CHECK ((a IS NOT NULL)) NOT VALID")] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 - ): - check_constraints = testing.db.dialect.get_check_constraints( - 
conn, "foo" + check_constraints = testing.db.dialect.get_check_constraints( + conn, "foo" + ) + eq_( + check_constraints, + [ + { + "name": "some name", + "sqltext": "a IS NOT NULL", + "dialect_options": {"not_valid": True}, + } + ], + ) + + def _apply_stm(self, connection, use_map): + if use_map: + return connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } ) - eq_( - check_constraints, - [ - { - "name": "some name", - "sqltext": "a IS NOT NULL", - "dialect_options": {"not_valid": True}, - } - ], + else: + return connection + + @testing.combinations(True, False, argnames="use_map") + @testing.combinations(True, False, argnames="schema") + def test_schema_translate_map(self, metadata, connection, use_map, schema): + schema = testing.config.test_schema if schema else None + Table( + "foo", + metadata, + Column("id", Integer, primary_key=True), + Column("a", Integer, index=True), + Column( + "b", + ForeignKey(f"{schema}.foo.id" if schema else "foo.id"), + unique=True, + ), + CheckConstraint("a>10", name="foo_check"), + comment="comm", + schema=schema, + ) + metadata.create_all(connection) + if use_map: + connection = connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } ) + insp = inspect(connection) + eq_( + [c["name"] for c in insp.get_columns("foo", schema=schema)], + ["id", "a", "b"], + ) + eq_( + [ + i["column_names"] + for i in insp.get_indexes("foo", schema=schema) + ], + [["b"], ["a"]], + ) + eq_( + insp.get_pk_constraint("foo", schema=schema)[ + "constrained_columns" + ], + ["id"], + ) + eq_(insp.get_table_comment("foo", schema=schema), {"text": "comm"}) + eq_( + [ + f["constrained_columns"] + for f in insp.get_foreign_keys("foo", schema=schema) + ], + [["b"]], + ) + eq_( + [ + c["name"] + for c in insp.get_check_constraints("foo", schema=schema) + ], + ["foo_check"], + ) + eq_( + [ + u["column_names"] + for u in 
insp.get_unique_constraints("foo", schema=schema) + ], + [["b"]], + ) class CustomTypeReflectionTest(fixtures.TestBase): @@ -1804,9 +1974,23 @@ class CustomTypeReflectionTest(fixtures.TestBase): ("my_custom_type(ARG1)", ("ARG1", None)), ("my_custom_type(ARG1, ARG2)", ("ARG1", "ARG2")), ]: - column_info = dialect._get_column_info( - "colname", sch, None, False, {}, {}, "public", None, "", None + row_dict = { + "name": "colname", + "table_name": "tblname", + "format_type": sch, + "default": None, + "not_null": False, + "comment": None, + "generated": "", + "identity_options": None, + } + column_info = dialect._get_columns_info( + [row_dict], {}, {}, "public" ) + assert ("public", "tblname") in column_info + column_info = column_info[("public", "tblname")] + assert len(column_info) == 1 + column_info = column_info[0] assert isinstance(column_info["type"], self.CustomType) eq_(column_info["type"].arg1, args[0]) eq_(column_info["type"].arg2, args[1]) @@ -1951,3 +2135,64 @@ class IdentityReflectionTest(fixtures.TablesTest): exp = default.copy() exp.update(maxvalue=2**15 - 1) eq_(col["identity"], exp) + + +class TestReflectDifficultColTypes(fixtures.TablesTest): + __only_on__ = "postgresql" + __backend__ = True + + def define_tables(metadata): + Table( + "sample_table", + metadata, + Column("c1", Integer, primary_key=True), + Column("c2", Integer, unique=True), + Column("c3", Integer), + Index("sample_table_index", "c2", "c3"), + ) + + def check_int_list(self, row, key): + value = row[key] + is_true(isinstance(value, list)) + is_true(len(value) > 0) + is_true(all(isinstance(v, int) for v in value)) + + def test_pg_index(self, connection): + insp = inspect(connection) + + pgc_oid = insp.get_table_oid("sample_table") + cols = [ + col + for col in pg_catalog.pg_index.c + if testing.db.dialect.server_version_info + >= col.info.get("server_version", (0,)) + ] + + stmt = sa.select(*cols).filter_by(indrelid=pgc_oid) + rows = connection.execute(stmt).mappings().all() + 
is_true(len(rows) > 0) + cols = [ + col + for col in ["indkey", "indoption", "indclass", "indcollation"] + if testing.db.dialect.server_version_info + >= pg_catalog.pg_index.c[col].info.get("server_version", (0,)) + ] + for row in rows: + for col in cols: + self.check_int_list(row, col) + + def test_pg_constraint(self, connection): + insp = inspect(connection) + + pgc_oid = insp.get_table_oid("sample_table") + cols = [ + col + for col in pg_catalog.pg_constraint.c + if testing.db.dialect.server_version_info + >= col.info.get("server_version", (0,)) + ] + stmt = sa.select(*cols).filter_by(conrelid=pgc_oid) + rows = connection.execute(stmt).mappings().all() + is_true(len(rows) > 0) + for row in rows: + self.check_int_list(row, "conkey") diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py index 266263d5f..8b6532ce5 100644 --- a/test/dialect/postgresql/test_types.py +++ b/test/dialect/postgresql/test_types.py @@ -451,11 +451,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): asserter.assert_( # check for table RegexSQL( - "select relname from pg_class c join pg_namespace.*", + "SELECT pg_catalog.pg_class.relname FROM pg_catalog." + "pg_class JOIN pg_catalog.pg_namespace.*", dialect="postgresql", ), # check for enum, just once - RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"), + RegexSQL( + r"SELECT pg_catalog.pg_type.typname .* WHERE " + "pg_catalog.pg_type.typname = ", + dialect="postgresql", + ), RegexSQL("CREATE TYPE myenum AS ENUM .*", dialect="postgresql"), RegexSQL(r"CREATE TABLE t .*", dialect="postgresql"), ) @@ -465,11 +470,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): asserter.assert_( RegexSQL( - "select relname from pg_class c join pg_namespace.*", + "SELECT pg_catalog.pg_class.relname FROM pg_catalog." 
+ "pg_class JOIN pg_catalog.pg_namespace.*", dialect="postgresql", ), RegexSQL("DROP TABLE t", dialect="postgresql"), - RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"), + RegexSQL( + r"SELECT pg_catalog.pg_type.typname .* WHERE " + "pg_catalog.pg_type.typname = ", + dialect="postgresql", + ), RegexSQL("DROP TYPE myenum", dialect="postgresql"), ) @@ -691,23 +701,6 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): connection, "fourfivesixtype" ) - def test_no_support(self, testing_engine): - def server_version_info(self): - return (8, 2) - - e = testing_engine() - dialect = e.dialect - dialect._get_server_version_info = server_version_info - - assert dialect.supports_native_enum - e.connect() - assert not dialect.supports_native_enum - - # initialize is called again on new pool - e.dispose() - e.connect() - assert not dialect.supports_native_enum - def test_reflection(self, metadata, connection): etype = Enum( "four", "five", "six", name="fourfivesixtype", metadata=metadata diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index fb4331998..ed9d67612 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -50,6 +50,7 @@ from sqlalchemy.testing import combinations from sqlalchemy.testing import config from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ @@ -856,27 +857,15 @@ class AttachedDBTest(fixtures.TestBase): ["foo", "bar"], ) - eq_( - [ - d["name"] - for d in insp.get_columns("nonexistent", schema="test_schema") - ], - [], - ) - eq_( - [ - d["name"] - for d in insp.get_columns("another_created", schema=None) - ], - [], - ) - eq_( - [ - d["name"] - for d in insp.get_columns("local_only", schema="test_schema") - ], - [], - ) + with expect_raises(exc.NoSuchTableError): + insp.get_columns("nonexistent", 
schema="test_schema") + + with expect_raises(exc.NoSuchTableError): + insp.get_columns("another_created", schema=None) + + with expect_raises(exc.NoSuchTableError): + insp.get_columns("local_only", schema="test_schema") + eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"]) def test_table_names_present(self): |
