diff options
| author | Federico Caselli <cfederico87@gmail.com> | 2022-06-18 21:08:27 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci3.zzzcomputing.com> | 2022-06-18 21:08:27 +0000 |
| commit | be576e7d88b6038781e52f7ef79799dbad09cd54 (patch) | |
| tree | 772c368e107d13537ad8fb030b2b02fb1638169b /test/dialect | |
| parent | f7daad21ef66c29aecfbdb2b967641d0adad8779 (diff) | |
| parent | db08a699489c9b0259579d7ff7fd6bf3496ca3a2 (diff) | |
| download | sqlalchemy-be576e7d88b6038781e52f7ef79799dbad09cd54.tar.gz | |
Merge "rearchitect reflection for batched performance" into main
Diffstat (limited to 'test/dialect')
| -rw-r--r-- | test/dialect/mysql/test_reflection.py | 18 | ||||
| -rw-r--r-- | test/dialect/oracle/test_reflection.py | 554 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 20 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_reflection.py | 441 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_types.py | 35 | ||||
| -rw-r--r-- | test/dialect/test_sqlite.py | 31 |
6 files changed, 933 insertions, 166 deletions
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py index f414c9c37..846001347 100644 --- a/test/dialect/mysql/test_reflection.py +++ b/test/dialect/mysql/test_reflection.py @@ -33,9 +33,9 @@ from sqlalchemy import UniqueConstraint from sqlalchemy.dialects.mysql import base as mysql from sqlalchemy.dialects.mysql import reflection as _reflection from sqlalchemy.schema import CreateIndex -from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ @@ -558,6 +558,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): ) def test_skip_not_describable(self, metadata, connection): + """This test is the only one that test the _default_multi_reflect + behaviour with UnreflectableTableError + """ + @event.listens_for(metadata, "before_drop") def cleanup(*arg, **kw): with testing.db.begin() as conn: @@ -579,14 +583,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): m.reflect(views=True, bind=conn) eq_(m.tables["test_t2"].name, "test_t2") - assert_raises_message( - exc.UnreflectableTableError, - "references invalid table", - Table, - "test_v", - MetaData(), - autoload_with=conn, - ) + with expect_raises_message( + exc.UnreflectableTableError, "references invalid table" + ): + Table("test_v", MetaData(), autoload_with=conn) @testing.exclude("mysql", "<", (5, 0, 0), "no information_schema support") def test_system_views(self): diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py index bf76dca43..53eb94df3 100644 --- a/test/dialect/oracle/test_reflection.py +++ b/test/dialect/oracle/test_reflection.py @@ -27,14 +27,18 @@ from sqlalchemy.dialects.oracle.base import BINARY_FLOAT from sqlalchemy.dialects.oracle.base import DOUBLE_PRECISION from sqlalchemy.dialects.oracle.base import NUMBER from sqlalchemy.dialects.oracle.base import REAL +from sqlalchemy.engine import ObjectKind from sqlalchemy.testing import assert_warns from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing import config from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_true from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import eq_compile_type from sqlalchemy.testing.schema import Table @@ -384,6 +388,7 @@ class SystemTableTablenamesTest(fixtures.TestBase): __backend__ = True def setup_test(self): + with testing.db.begin() as conn: conn.exec_driver_sql("create table my_table (id integer)") conn.exec_driver_sql( @@ -417,6 +422,14 @@ class SystemTableTablenamesTest(fixtures.TestBase): set(["my_table", "foo_table"]), ) + def test_reflect_system_table(self): + meta = MetaData() + t = Table("foo_table", meta, autoload_with=testing.db) + assert t.columns.keys() == ["id"] + + t = Table("my_temp_table", meta, autoload_with=testing.db) + assert t.columns.keys() == ["id"] + class DontReflectIOTTest(fixtures.TestBase): """test that index overflow tables aren't included in @@ -509,6 +522,228 @@ class TableReflectionTest(fixtures.TestBase): tbl = Table("test_compress", m2, autoload_with=connection) assert tbl.dialect_options["oracle"]["compress"] == "OLTP" + def test_reflect_hidden_column(self): + with testing.db.begin() as conn: + conn.exec_driver_sql( + "CREATE TABLE my_table(id integer, hide integer INVISIBLE)" + ) + + try: + insp = inspect(conn) + cols = insp.get_columns("my_table") + assert len(cols) == 1 + assert cols[0]["name"] == "id" + finally: + conn.exec_driver_sql("DROP TABLE my_table") + + +class ViewReflectionTest(fixtures.TestBase): + __only_on__ = "oracle" + __backend__ = True + + @classmethod + def setup_test_class(cls): + sql = """ + CREATE TABLE tbl ( + id INTEGER PRIMARY KEY, + data INTEGER + ); + + CREATE VIEW tbl_plain_v AS + SELECT id, data FROM tbl WHERE id > 100; + + -- comments on plain views are created with "comment on table" + -- because why not.. + COMMENT ON TABLE tbl_plain_v IS 'view comment'; + + CREATE MATERIALIZED VIEW tbl_v AS + SELECT id, data FROM tbl WHERE id > 42; + + COMMENT ON MATERIALIZED VIEW tbl_v IS 'my mat view comment'; + + CREATE MATERIALIZED VIEW tbl_v2 AS + SELECT id, data FROM tbl WHERE id < 42; + + COMMENT ON MATERIALIZED VIEW tbl_v2 IS 'my other mat view comment'; + + CREATE SYNONYM view_syn FOR tbl_plain_v; + CREATE SYNONYM %(test_schema)s.ts_v_s FOR tbl_plain_v; + + CREATE VIEW %(test_schema)s.schema_view AS + SELECT 1 AS value FROM dual; + + COMMENT ON TABLE %(test_schema)s.schema_view IS 'schema view comment'; + CREATE SYNONYM syn_schema_view FOR %(test_schema)s.schema_view; + """ + if testing.requires.oracle_test_dblink.enabled: + cls.dblink = config.file_config.get( + "sqla_testing", "oracle_db_link" + ) + sql += """ + CREATE SYNONYM syn_link FOR tbl_plain_v@%(link)s; + """ % { + "link": cls.dblink + } + with testing.db.begin() as conn: + for stmt in ( + sql % {"test_schema": testing.config.test_schema} + ).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + @classmethod + def teardown_test_class(cls): + sql = """ + DROP MATERIALIZED VIEW tbl_v; + DROP MATERIALIZED VIEW tbl_v2; + DROP VIEW tbl_plain_v; + DROP TABLE tbl; + DROP VIEW %(test_schema)s.schema_view; + DROP SYNONYM view_syn; + DROP SYNONYM %(test_schema)s.ts_v_s; + DROP SYNONYM syn_schema_view; + """ + if testing.requires.oracle_test_dblink.enabled: + sql += """ + DROP SYNONYM syn_link; + """ + with testing.db.begin() as conn: + for stmt in ( + sql % {"test_schema": testing.config.test_schema} + ).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + def test_get_names(self, connection): + insp = inspect(connection) + eq_(insp.get_table_names(), ["tbl"]) + eq_(insp.get_view_names(), ["tbl_plain_v"]) + eq_(insp.get_materialized_view_names(), ["tbl_v", "tbl_v2"]) + eq_( + insp.get_view_names(schema=testing.config.test_schema), + ["schema_view"], + ) + + def test_get_table_comment_on_view(self, connection): + insp = inspect(connection) + eq_(insp.get_table_comment("tbl_v"), {"text": "my mat view comment"}) + eq_(insp.get_table_comment("tbl_plain_v"), {"text": "view comment"}) + + def test_get_multi_view_comment(self, connection): + insp = inspect(connection) + plain = {(None, "tbl_plain_v"): {"text": "view comment"}} + mat = { + (None, "tbl_v"): {"text": "my mat view comment"}, + (None, "tbl_v2"): {"text": "my other mat view comment"}, + } + eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW), + mat, + ) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW), + {**plain, **mat}, + ) + ts = testing.config.test_schema + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW, schema=ts), + {(ts, "schema_view"): {"text": "schema view comment"}}, + ) + eq_(insp.get_multi_table_comment(), {(None, "tbl"): {"text": None}}) + + def test_get_table_comment_synonym(self, connection): + insp = inspect(connection) + eq_( + insp.get_table_comment("view_syn", oracle_resolve_synonyms=True), + {"text": "view comment"}, + ) + eq_( + insp.get_table_comment( + "syn_schema_view", oracle_resolve_synonyms=True + ), + {"text": "schema view comment"}, + ) + eq_( + insp.get_table_comment( + "ts_v_s", + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ), + {"text": "view comment"}, + ) + + def test_get_multi_view_comment_synonym(self, connection): + insp = inspect(connection) + exp = { + (None, "view_syn"): {"text": "view comment"}, + (None, "syn_schema_view"): {"text": "schema view comment"}, + } + if testing.requires.oracle_test_dblink.enabled: + exp[(None, "syn_link")] = {"text": "view comment"} + eq_( + insp.get_multi_table_comment( + oracle_resolve_synonyms=True, kind=ObjectKind.ANY_VIEW + ), + exp, + ) + ts = testing.config.test_schema + eq_( + insp.get_multi_table_comment( + oracle_resolve_synonyms=True, + schema=ts, + kind=ObjectKind.ANY_VIEW, + ), + {(ts, "ts_v_s"): {"text": "view comment"}}, + ) + + def test_get_view_definition(self, connection): + insp = inspect(connection) + eq_( + insp.get_view_definition("tbl_plain_v"), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_v"), + "SELECT id, data FROM tbl WHERE id > 42", + ) + with expect_raises(exc.NoSuchTableError): + eq_(insp.get_view_definition("view_syn"), None) + eq_( + insp.get_view_definition("view_syn", oracle_resolve_synonyms=True), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition( + "syn_schema_view", oracle_resolve_synonyms=True + ), + "SELECT 1 AS value FROM dual", + ) + eq_( + insp.get_view_definition( + "ts_v_s", + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ), + "SELECT id, data FROM tbl WHERE id > 100", + ) + + @testing.requires.oracle_test_dblink + def test_get_view_definition_dblink(self, connection): + insp = inspect(connection) + eq_( + insp.get_view_definition("syn_link", oracle_resolve_synonyms=True), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_plain_v", dblink=self.dblink), + "SELECT id, data FROM tbl WHERE id > 100", + ) + eq_( + insp.get_view_definition("tbl_v", dblink=self.dblink), + "SELECT id, data FROM tbl WHERE id > 42", + ) + class RoundTripIndexTest(fixtures.TestBase): __only_on__ = "oracle" @@ -722,8 +957,6 @@ class DBLinkReflectionTest(fixtures.TestBase): @classmethod def setup_test_class(cls): - from sqlalchemy.testing import config - cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link") # note that the synonym here is still not totally functional @@ -863,3 +1096,320 @@ class IdentityReflectionTest(fixtures.TablesTest): exp = common.copy() exp["order"] = True eq_(col["identity"], exp) + + +class AdditionalReflectionTests(fixtures.TestBase): + __only_on__ = "oracle" + __backend__ = True + + @classmethod + def setup_test_class(cls): + # currently assuming full DBA privs for the user. + # don't really know how else to go here unless + # we connect as the other user. + + sql = """ +CREATE TABLE %(schema)sparent( + id INTEGER, + data VARCHAR2(50), + CONSTRAINT parent_pk_%(schema_id)s PRIMARY KEY (id) +); +CREATE TABLE %(schema)smy_table( + id INTEGER, + name VARCHAR2(125), + related INTEGER, + data%(schema_id)s NUMBER NOT NULL, + CONSTRAINT my_table_pk_%(schema_id)s PRIMARY KEY (id), + CONSTRAINT my_table_fk_%(schema_id)s FOREIGN KEY(related) + REFERENCES %(schema)sparent(id), + CONSTRAINT my_table_check_%(schema_id)s CHECK (data%(schema_id)s > 42), + CONSTRAINT data_unique%(schema_id)s UNIQUE (data%(schema_id)s) +); +CREATE INDEX my_table_index_%(schema_id)s on %(schema)smy_table (id, name); +COMMENT ON TABLE %(schema)smy_table IS 'my table comment %(schema_id)s'; +COMMENT ON COLUMN %(schema)smy_table.name IS +'my table.name comment %(schema_id)s'; +""" + + with testing.db.begin() as conn: + for schema in ("", testing.config.test_schema): + dd = { + "schema": f"{schema}." if schema else "", + "schema_id": "sch" if schema else "", + } + for stmt in (sql % dd).split(";"): + if stmt.strip(): + conn.exec_driver_sql(stmt) + + @classmethod + def teardown_test_class(cls): + sql = """ +drop table %(schema)smy_table; +drop table %(schema)sparent; +""" + with testing.db.begin() as conn: + for schema in ("", testing.config.test_schema): + dd = {"schema": f"{schema}." if schema else ""} + for stmt in (sql % dd).split(";"): + if stmt.strip(): + try: + conn.exec_driver_sql(stmt) + except: + pass + + def setup_test(self): + self.dblink = config.file_config.get("sqla_testing", "oracle_db_link") + self.dblink2 = config.file_config.get( + "sqla_testing", "oracle_db_link2" + ) + self.columns = {} + self.indexes = {} + self.primary_keys = {} + self.comments = {} + self.uniques = {} + self.checks = {} + self.foreign_keys = {} + self.options = {} + self.allDicts = [ + self.columns, + self.indexes, + self.primary_keys, + self.comments, + self.uniques, + self.checks, + self.foreign_keys, + self.options, + ] + for schema in (None, testing.config.test_schema): + suffix = "sch" if schema else "" + + self.columns[schema] = { + (schema, "my_table"): [ + { + "name": "id", + "nullable": False, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": "name", + "nullable": True, + "type": eq_compile_type("VARCHAR(125)"), + "default": None, + "comment": f"my table.name comment {suffix}", + }, + { + "name": "related", + "nullable": True, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": f"data{suffix}", + "nullable": False, + "type": eq_compile_type("NUMBER"), + "default": None, + "comment": None, + }, + ], + (schema, "parent"): [ + { + "name": "id", + "nullable": False, + "type": eq_compile_type("INTEGER"), + "default": None, + "comment": None, + }, + { + "name": "data", + "nullable": True, + "type": eq_compile_type("VARCHAR(50)"), + "default": None, + "comment": None, + }, + ], + } + self.indexes[schema] = { + (schema, "my_table"): [ + { + "name": f"data_unique{suffix}", + "column_names": [f"data{suffix}"], + "dialect_options": {}, + "unique": True, + }, + { + "name": f"my_table_index_{suffix}", + "column_names": ["id", "name"], + "dialect_options": {}, + "unique": False, + }, + ], + (schema, "parent"): [], + } + self.primary_keys[schema] = { + (schema, "my_table"): { + "name": f"my_table_pk_{suffix}", + "constrained_columns": ["id"], + }, + (schema, "parent"): { + "name": f"parent_pk_{suffix}", + "constrained_columns": ["id"], + }, + } + self.comments[schema] = { + (schema, "my_table"): {"text": f"my table comment {suffix}"}, + (schema, "parent"): {"text": None}, + } + self.foreign_keys[schema] = { + (schema, "my_table"): [ + { + "name": f"my_table_fk_{suffix}", + "constrained_columns": ["related"], + "referred_schema": schema, + "referred_table": "parent", + "referred_columns": ["id"], + "options": {}, + } + ], + (schema, "parent"): [], + } + self.checks[schema] = { + (schema, "my_table"): [ + { + "name": f"my_table_check_{suffix}", + "sqltext": f"data{suffix} > 42", + } + ], + (schema, "parent"): [], + } + self.uniques[schema] = { + (schema, "my_table"): [ + { + "name": f"data_unique{suffix}", + "column_names": [f"data{suffix}"], + "duplicates_index": f"data_unique{suffix}", + } + ], + (schema, "parent"): [], + } + self.options[schema] = { + (schema, "my_table"): {}, + (schema, "parent"): {}, + } + + def test_tables(self, connection): + insp = inspect(connection) + + eq_(sorted(insp.get_table_names()), ["my_table", "parent"]) + + def _check_reflection(self, conn, schema, res_schema=False, **kw): + if res_schema is False: + res_schema = schema + insp = inspect(conn) + eq_( + insp.get_multi_columns(schema=schema, **kw), + self.columns[res_schema], + ) + eq_( + insp.get_multi_indexes(schema=schema, **kw), + self.indexes[res_schema], + ) + eq_( + insp.get_multi_pk_constraint(schema=schema, **kw), + self.primary_keys[res_schema], + ) + eq_( + insp.get_multi_table_comment(schema=schema, **kw), + self.comments[res_schema], + ) + eq_( + insp.get_multi_foreign_keys(schema=schema, **kw), + self.foreign_keys[res_schema], + ) + eq_( + insp.get_multi_check_constraints(schema=schema, **kw), + self.checks[res_schema], + ) + eq_( + insp.get_multi_unique_constraints(schema=schema, **kw), + self.uniques[res_schema], + ) + eq_( + insp.get_multi_table_options(schema=schema, **kw), + self.options[res_schema], + ) + + @testing.combinations(True, False, argnames="schema") + def test_schema_translate_map(self, connection, schema): + schema = testing.config.test_schema if schema else None + c = connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } + ) + self._check_reflection(c, schema) + + @testing.requires.oracle_test_dblink + def test_db_link(self, connection): + self._check_reflection(connection, schema=None, dblink=self.dblink) + self._check_reflection( + connection, + schema=testing.config.test_schema, + dblink=self.dblink, + ) + + def test_no_synonyms(self, connection): + # oracle_resolve_synonyms is ignored if there are no matching synonym + self._check_reflection( + connection, schema=None, oracle_resolve_synonyms=True + ) + connection.exec_driver_sql("CREATE SYNONYM tmp FOR parent") + for dict_ in self.allDicts: + dict_["tmp"] = {(None, "parent"): dict_[None][(None, "parent")]} + try: + self._check_reflection( + connection, + schema=None, + res_schema="tmp", + oracle_resolve_synonyms=True, + filter_names=["parent"], + ) + finally: + connection.exec_driver_sql("DROP SYNONYM tmp") + + @testing.requires.oracle_test_dblink + @testing.requires.oracle_test_dblink2 + def test_multi_dblink_synonyms(self, connection): + # oracle_resolve_synonyms handles multiple dblink at once + connection.exec_driver_sql( + f"CREATE SYNONYM s1 FOR my_table@{self.dblink}" + ) + connection.exec_driver_sql( + f"CREATE SYNONYM s2 FOR {testing.config.test_schema}." + f"my_table@{self.dblink2}" + ) + connection.exec_driver_sql("CREATE SYNONYM s3 FOR parent") + for dict_ in self.allDicts: + dict_["tmp"] = { + (None, "s1"): dict_[None][(None, "my_table")], + (None, "s2"): dict_[testing.config.test_schema][ + (testing.config.test_schema, "my_table") + ], + (None, "s3"): dict_[None][(None, "parent")], + } + fk = self.foreign_keys["tmp"][(None, "s1")][0] + fk["referred_table"] = "s3" + try: + self._check_reflection( + connection, + schema=None, + res_schema="tmp", + oracle_resolve_synonyms=True, + ) + finally: + connection.exec_driver_sql("DROP SYNONYM s1") + connection.exec_driver_sql("DROP SYNONYM s2") + connection.exec_driver_sql("DROP SYNONYM s3") diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 0093eb5ba..d55aa8203 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -887,19 +887,12 @@ class MiscBackendTest( ) @testing.combinations( - ((8, 1), False, False), - ((8, 1), None, False), - ((11, 5), True, False), - ((11, 5), False, True), + (True, False), + (False, True), ) - def test_backslash_escapes_detection( - self, version, explicit_setting, expected - ): + def test_backslash_escapes_detection(self, explicit_setting, expected): engine = engines.testing_engine() - def _server_version(conn): - return version - if explicit_setting is not None: @event.listens_for(engine, "connect", insert=True) @@ -912,11 +905,8 @@ class MiscBackendTest( ) dbapi_connection.commit() - with mock.patch.object( - engine.dialect, "_get_server_version_info", _server_version - ): - with engine.connect(): - eq_(engine.dialect._backslash_escapes, expected) + with engine.connect(): + eq_(engine.dialect._backslash_escapes, expected) def test_dbapi_autocommit_attribute(self): """all the supported DBAPIs have an .autocommit attribute. make diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index 3e0569d32..21b4149bc 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -27,17 +27,21 @@ from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import ExcludeConstraint from sqlalchemy.dialects.postgresql import INTEGER from sqlalchemy.dialects.postgresql import INTERVAL +from sqlalchemy.dialects.postgresql import pg_catalog from sqlalchemy.dialects.postgresql import TSRANGE +from sqlalchemy.engine import ObjectKind +from sqlalchemy.engine import ObjectScope from sqlalchemy.schema import CreateIndex from sqlalchemy.sql.schema import CheckConstraint from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import fixtures from sqlalchemy.testing import mock -from sqlalchemy.testing.assertions import assert_raises from sqlalchemy.testing.assertions import assert_warns from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import eq_ +from sqlalchemy.testing.assertions import expect_raises from sqlalchemy.testing.assertions import is_ +from sqlalchemy.testing.assertions import is_false from sqlalchemy.testing.assertions import is_true @@ -231,17 +235,36 @@ class MaterializedViewReflectionTest( connection.execute(target.insert(), {"id": 89, "data": "d1"}) materialized_view = sa.DDL( - "CREATE MATERIALIZED VIEW test_mview AS " "SELECT * FROM testtable" + "CREATE MATERIALIZED VIEW test_mview AS SELECT * FROM testtable" ) plain_view = sa.DDL( - "CREATE VIEW test_regview AS " "SELECT * FROM testtable" + "CREATE VIEW test_regview AS SELECT data FROM testtable" ) sa.event.listen(testtable, "after_create", plain_view) sa.event.listen(testtable, "after_create", materialized_view) sa.event.listen( testtable, + "after_create", + sa.DDL("COMMENT ON VIEW test_regview IS 'regular view comment'"), + ) + sa.event.listen( + testtable, + "after_create", + sa.DDL( + "COMMENT ON MATERIALIZED VIEW test_mview " + "IS 'materialized view comment'" + ), + ) + sa.event.listen( + testtable, + "after_create", + sa.DDL("CREATE INDEX mat_index ON test_mview(data DESC)"), + ) + + sa.event.listen( + testtable, "before_drop", sa.DDL("DROP MATERIALIZED VIEW test_mview"), ) @@ -249,6 +272,12 @@ class MaterializedViewReflectionTest( testtable, "before_drop", sa.DDL("DROP VIEW test_regview") ) + def test_has_type(self, connection): + insp = inspect(connection) + is_true(insp.has_type("test_mview")) + is_true(insp.has_type("test_regview")) + is_true(insp.has_type("testtable")) + def test_mview_is_reflected(self, connection): metadata = MetaData() table = Table("test_mview", metadata, autoload_with=connection) @@ -265,49 +294,99 @@ class MaterializedViewReflectionTest( def test_get_view_names(self, inspect_fixture): insp, conn = inspect_fixture - eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) + eq_(set(insp.get_view_names()), set(["test_regview"])) - def test_get_view_names_plain(self, connection): + def test_get_materialized_view_names(self, inspect_fixture): + insp, conn = inspect_fixture + eq_(set(insp.get_materialized_view_names()), set(["test_mview"])) + + def test_get_view_names_reflection_cache_ok(self, connection): insp = inspect(connection) + eq_(set(insp.get_view_names()), set(["test_regview"])) eq_( - set(insp.get_view_names(include=("plain",))), set(["test_regview"]) + set(insp.get_materialized_view_names()), + set(["test_mview"]), + ) + eq_( + set(insp.get_view_names()).union( + insp.get_materialized_view_names() + ), + set(["test_regview", "test_mview"]), ) - def test_get_view_names_plain_string(self, connection): + def test_get_view_definition(self, connection): insp = inspect(connection) - eq_(set(insp.get_view_names(include="plain")), set(["test_regview"])) - def test_get_view_names_materialized(self, connection): - insp = inspect(connection) + def normalize(definition): + return re.sub(r"[\n\t ]+", " ", definition.strip()) + eq_( - set(insp.get_view_names(include=("materialized",))), - set(["test_mview"]), + normalize(insp.get_view_definition("test_mview")), + "SELECT testtable.id, testtable.data FROM testtable;", + ) + eq_( + normalize(insp.get_view_definition("test_regview")), + "SELECT testtable.data FROM testtable;", ) - def test_get_view_names_reflection_cache_ok(self, connection): + def test_get_view_comment(self, connection): insp = inspect(connection) eq_( - set(insp.get_view_names(include=("plain",))), set(["test_regview"]) + insp.get_table_comment("test_regview"), + {"text": "regular view comment"}, ) eq_( - set(insp.get_view_names(include=("materialized",))), - set(["test_mview"]), + insp.get_table_comment("test_mview"), + {"text": "materialized view comment"}, ) - eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) - def test_get_view_names_empty(self, connection): + def test_get_multi_view_comment(self, connection): insp = inspect(connection) - assert_raises(ValueError, insp.get_view_names, include=()) + eq_( + insp.get_multi_table_comment(), + {(None, "testtable"): {"text": None}}, + ) + plain = {(None, "test_regview"): {"text": "regular view comment"}} + mat = {(None, "test_mview"): {"text": "materialized view comment"}} + eq_(insp.get_multi_table_comment(kind=ObjectKind.VIEW), plain) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.MATERIALIZED_VIEW), + mat, + ) + eq_( + insp.get_multi_table_comment(kind=ObjectKind.ANY_VIEW), + {**plain, **mat}, + ) + eq_( + insp.get_multi_table_comment( + kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY + ), + {}, + ) - def test_get_view_definition(self, connection): + def test_get_multi_view_indexes(self, connection): insp = inspect(connection) + eq_(insp.get_multi_indexes(), {(None, "testtable"): []}) + + exp = { + "name": "mat_index", + "unique": False, + "column_names": ["data"], + "column_sorting": {"data": ("desc",)}, + } + if connection.dialect.server_version_info >= (11, 0): + exp["include_columns"] = [] + exp["dialect_options"] = {"postgresql_include": []} + plain = {(None, "test_regview"): []} + mat = {(None, "test_mview"): [exp]} + eq_(insp.get_multi_indexes(kind=ObjectKind.VIEW), plain) + eq_(insp.get_multi_indexes(kind=ObjectKind.MATERIALIZED_VIEW), mat) + eq_(insp.get_multi_indexes(kind=ObjectKind.ANY_VIEW), {**plain, **mat}) eq_( - re.sub( - r"[\n\t ]+", - " ", - insp.get_view_definition("test_mview").strip(), + insp.get_multi_indexes( + kind=ObjectKind.ANY_VIEW, scope=ObjectScope.TEMPORARY ), - "SELECT testtable.id, testtable.data FROM testtable;", + {}, ) @@ -993,9 +1072,9 @@ class ReflectionTest( go, [ "Skipped unsupported reflection of " - "expression-based index idx1", + "expression-based index idx1 of table party", "Skipped unsupported reflection of " - "expression-based index idx3", + "expression-based index idx3 of table party", ], ) @@ -1016,7 +1095,7 @@ class ReflectionTest( metadata.create_all(connection) - ind = connection.dialect.get_indexes(connection, t1, None) + ind = connection.dialect.get_indexes(connection, t1.name, None) partial_definitions = [] for ix in ind: @@ -1337,6 +1416,9 @@ class ReflectionTest( } ], ) + is_true(inspector.has_type("mood", "test_schema")) + is_true(inspector.has_type("mood", "*")) + is_false(inspector.has_type("mood")) def test_inspect_enums(self, metadata, inspect_fixture): @@ -1345,30 +1427,49 @@ class ReflectionTest( enum_type = postgresql.ENUM( "cat", "dog", "rat", name="pet", metadata=metadata ) + enum_type.create(conn) + conn.commit() - with conn.begin(): - enum_type.create(conn) - - eq_( - inspector.get_enums(), - [ - { - "visible": True, - "labels": ["cat", "dog", "rat"], - "name": "pet", - "schema": "public", - } - ], - ) - - def test_get_table_oid(self, metadata, inspect_fixture): - - inspector, conn = inspect_fixture + res = [ + { + "visible": True, + "labels": ["cat", "dog", "rat"], + "name": "pet", + "schema": "public", + } + ] + eq_(inspector.get_enums(), res) + is_true(inspector.has_type("pet", "*")) + is_true(inspector.has_type("pet")) + is_false(inspector.has_type("pet", "test_schema")) + + enum_type.drop(conn) + conn.commit() + eq_(inspector.get_enums(), res) + is_true(inspector.has_type("pet")) + inspector.clear_cache() + eq_(inspector.get_enums(), []) + is_false(inspector.has_type("pet")) + + def test_get_table_oid(self, metadata, connection): + Table("t1", metadata, Column("col", Integer)) + Table("t1", metadata, Column("col", Integer), schema="test_schema") + metadata.create_all(connection) + insp = inspect(connection) + oid = insp.get_table_oid("t1") + oid_schema = insp.get_table_oid("t1", schema="test_schema") + is_true(isinstance(oid, int)) + is_true(isinstance(oid_schema, int)) + is_true(oid != oid_schema) - with conn.begin(): - Table("some_table", metadata, Column("q", Integer)).create(conn) + with expect_raises(exc.NoSuchTableError): + insp.get_table_oid("does_not_exist") - assert inspector.get_table_oid("some_table") is not None + metadata.tables["t1"].drop(connection) + eq_(insp.get_table_oid("t1"), oid) + insp.clear_cache() + with expect_raises(exc.NoSuchTableError): + insp.get_table_oid("t1") def test_inspect_enums_case_sensitive(self, metadata, connection): sa.event.listen( @@ -1707,77 +1808,146 @@ class ReflectionTest( ) def test_reflect_check_warning(self): - rows = [("some name", "NOTCHECK foobar")] + rows = [("foo", "some name", "NOTCHECK foobar")] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 + with testing.expect_warnings( + "Could not parse CHECK constraint text: 'NOTCHECK foobar'" ): - with testing.expect_warnings( - "Could not parse CHECK constraint text: 'NOTCHECK foobar'" - ): - testing.db.dialect.get_check_constraints(conn, "foo") + testing.db.dialect.get_check_constraints(conn, "foo") def test_reflect_extra_newlines(self): rows = [ - ("some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"), - ("some other name", "CHECK ((b\nIS\nNOT\nNULL))"), - ("some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"), - ("some name", "CHECK (c != 'hi\nim a name\n')"), + ("foo", "some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"), + ("foo", "some other name", "CHECK ((b\nIS\nNOT\nNULL))"), + ("foo", "some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"), + ("foo", "some name", "CHECK (c != 'hi\nim a name\n')"), ] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 - ): - check_constraints = testing.db.dialect.get_check_constraints( - conn, "foo" - ) - eq_( - check_constraints, - [ - { - "name": "some name", - "sqltext": "a \nIS\n NOT\n\n NULL\n", - }, - {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"}, - { - "name": "some CRLF name", - "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL", - }, - {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"}, - ], - ) + check_constraints = testing.db.dialect.get_check_constraints( + conn, "foo" + ) + eq_( + check_constraints, + [ + { + "name": "some name", + "sqltext": "a \nIS\n NOT\n\n NULL\n", + }, + {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"}, + { + "name": "some CRLF name", + "sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL", + }, + {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"}, + ], + ) def test_reflect_with_not_valid_check_constraint(self): - rows = [("some name", "CHECK ((a IS NOT NULL)) NOT VALID")] + rows = [("foo", "some name", "CHECK ((a IS NOT NULL)) NOT VALID")] conn = mock.Mock( execute=lambda *arg, **kw: mock.MagicMock( fetchall=lambda: rows, __iter__=lambda self: iter(rows) ) ) - with mock.patch.object( - testing.db.dialect, "get_table_oid", lambda *arg, **kw: 1 - ): - check_constraints = testing.db.dialect.get_check_constraints( - conn, "foo" + check_constraints = testing.db.dialect.get_check_constraints( + conn, "foo" + ) + eq_( + check_constraints, + [ + { + "name": "some name", + "sqltext": "a IS NOT NULL", + "dialect_options": {"not_valid": True}, + } + ], + ) + + def _apply_stm(self, connection, use_map): + if use_map: + return connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } ) - eq_( - check_constraints, - [ - { - "name": "some name", - "sqltext": "a IS NOT NULL", - "dialect_options": {"not_valid": True}, - } - ], + else: + return connection + + @testing.combinations(True, False, argnames="use_map") + @testing.combinations(True, False, argnames="schema") + def test_schema_translate_map(self, metadata, connection, use_map, schema): + schema = testing.config.test_schema if schema else None + Table( + "foo", + metadata, + Column("id", Integer, primary_key=True), + Column("a", Integer, index=True), + Column( + "b", + ForeignKey(f"{schema}.foo.id" if schema else "foo.id"), + unique=True, + ), + CheckConstraint("a>10", name="foo_check"), + comment="comm", + schema=schema, + ) + metadata.create_all(connection) + if use_map: + connection = connection.execution_options( + schema_translate_map={ + None: "foo", + testing.config.test_schema: "bar", + } ) + insp = inspect(connection) + eq_( + [c["name"] for c in insp.get_columns("foo", schema=schema)], + ["id", "a", "b"], + ) + eq_( + [ + i["column_names"] + for i in insp.get_indexes("foo", schema=schema) + ], + [["b"], ["a"]], + ) + eq_( + insp.get_pk_constraint("foo", schema=schema)[ + "constrained_columns" + ], + ["id"], + ) + eq_(insp.get_table_comment("foo", schema=schema), {"text": "comm"}) + eq_( + [ + f["constrained_columns"] + for f in insp.get_foreign_keys("foo", schema=schema) + ], + [["b"]], + ) + eq_( + [ + c["name"] + for c in insp.get_check_constraints("foo", schema=schema) + ], + ["foo_check"], + ) + eq_( + [ + u["column_names"] + for u in insp.get_unique_constraints("foo", schema=schema) + ], + [["b"]], + ) class CustomTypeReflectionTest(fixtures.TestBase): @@ -1804,9 +1974,23 @@ class CustomTypeReflectionTest(fixtures.TestBase): ("my_custom_type(ARG1)", ("ARG1", None)), ("my_custom_type(ARG1, ARG2)", ("ARG1", "ARG2")), ]: - column_info = dialect._get_column_info( - "colname", sch, None, False, {}, {}, "public", None, "", None + row_dict = { + "name": "colname", + "table_name": "tblname", + "format_type": sch, + "default": None, + "not_null": False, + "comment": None, + "generated": "", + "identity_options": None, + } + column_info = dialect._get_columns_info( + [row_dict], {}, {}, "public" ) + assert ("public", "tblname") in column_info + column_info = column_info[("public", "tblname")] + assert len(column_info) == 1 + column_info = column_info[0] assert isinstance(column_info["type"], self.CustomType) eq_(column_info["type"].arg1, args[0]) eq_(column_info["type"].arg2, args[1]) @@ -1951,3 +2135,64 @@ class IdentityReflectionTest(fixtures.TablesTest): exp = default.copy() exp.update(maxvalue=2**15 - 1) eq_(col["identity"], exp) + + +class TestReflectDifficultColTypes(fixtures.TablesTest): + __only_on__ = "postgresql" + __backend__ = True + + def define_tables(metadata): + Table( + "sample_table", + metadata, + Column("c1", Integer, primary_key=True), + Column("c2", Integer, unique=True), + Column("c3", Integer), + Index("sample_table_index", "c2", "c3"), + ) + + def check_int_list(self, row, key): + value = row[key] + is_true(isinstance(value, list)) + is_true(len(value) > 0) + is_true(all(isinstance(v, int) for v in value)) + + def test_pg_index(self, connection): + insp = inspect(connection) + + pgc_oid = insp.get_table_oid("sample_table") + cols = [ + col + for col in pg_catalog.pg_index.c + if testing.db.dialect.server_version_info + >= col.info.get("server_version", (0,)) + ] + + stmt = sa.select(*cols).filter_by(indrelid=pgc_oid) + rows = connection.execute(stmt).mappings().all() + is_true(len(rows) > 0) + cols = [ + col + for col in ["indkey", "indoption", "indclass", "indcollation"] + if testing.db.dialect.server_version_info + >= pg_catalog.pg_index.c[col].info.get("server_version", (0,)) + ] + for row in rows: + for col in cols: + self.check_int_list(row, col) + + def test_pg_constraint(self, connection): + insp = inspect(connection) + + pgc_oid = insp.get_table_oid("sample_table") + cols = [ + col + for col in pg_catalog.pg_constraint.c + if testing.db.dialect.server_version_info + >= col.info.get("server_version", (0,)) + ] + stmt = sa.select(*cols).filter_by(conrelid=pgc_oid) + rows = connection.execute(stmt).mappings().all() + is_true(len(rows) > 0) + for row in rows: + self.check_int_list(row, "conkey") diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py index fd4b91db1..79f029391 100644 --- a/test/dialect/postgresql/test_types.py +++ b/test/dialect/postgresql/test_types.py @@ -454,11 +454,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): asserter.assert_( # check for table RegexSQL( - "select relname from pg_class c join pg_namespace.*", + "SELECT pg_catalog.pg_class.relname FROM pg_catalog." + "pg_class JOIN pg_catalog.pg_namespace.*", dialect="postgresql", ), # check for enum, just once - RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"), + RegexSQL( + r"SELECT pg_catalog.pg_type.typname .* WHERE " + "pg_catalog.pg_type.typname = ", + dialect="postgresql", + ), RegexSQL("CREATE TYPE myenum AS ENUM .*", dialect="postgresql"), RegexSQL(r"CREATE TABLE t .*", dialect="postgresql"), ) @@ -468,11 +473,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): asserter.assert_( RegexSQL( - "select relname from pg_class c join pg_namespace.*", + "SELECT pg_catalog.pg_class.relname FROM pg_catalog." + "pg_class JOIN pg_catalog.pg_namespace.*", dialect="postgresql", ), RegexSQL("DROP TABLE t", dialect="postgresql"), - RegexSQL(r".*SELECT EXISTS ", dialect="postgresql"), + RegexSQL( + r"SELECT pg_catalog.pg_type.typname .* WHERE " + "pg_catalog.pg_type.typname = ", + dialect="postgresql", + ), RegexSQL("DROP TYPE myenum", dialect="postgresql"), ) @@ -694,23 +704,6 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): connection, "fourfivesixtype" ) - def test_no_support(self, testing_engine): - def server_version_info(self): - return (8, 2) - - e = testing_engine() - dialect = e.dialect - dialect._get_server_version_info = server_version_info - - assert dialect.supports_native_enum - e.connect() - assert not dialect.supports_native_enum - - # initialize is called again on new pool - e.dispose() - e.connect() - assert not dialect.supports_native_enum - def test_reflection(self, metadata, connection): etype = Enum( "four", "five", "six", name="fourfivesixtype", metadata=metadata diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index fb4331998..ed9d67612 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -50,6 +50,7 @@ from sqlalchemy.testing import combinations from sqlalchemy.testing import config from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ @@ -856,27 +857,15 @@ class AttachedDBTest(fixtures.TestBase): ["foo", "bar"], ) - eq_( - [ - d["name"] - for d in insp.get_columns("nonexistent", schema="test_schema") - ], - [], - ) - eq_( - [ - d["name"] - for d in insp.get_columns("another_created", schema=None) - ], - [], - ) - eq_( - [ - d["name"] - for d in insp.get_columns("local_only", schema="test_schema") - ], - [], - ) + with expect_raises(exc.NoSuchTableError): + insp.get_columns("nonexistent", schema="test_schema") + + with expect_raises(exc.NoSuchTableError): + insp.get_columns("another_created", schema=None) + + with expect_raises(exc.NoSuchTableError): + insp.get_columns("local_only", schema="test_schema") + eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"]) def test_table_names_present(self): |
