summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2022-10-29 01:36:19 +0000
committerGerrit Code Review <gerrit@ci3.zzzcomputing.com>2022-10-29 01:36:19 +0000
commited7bef700477ac85819040478ce2f2cca78c5700 (patch)
treecbc47a732d2c4defffd90b491b4fa867245f43ef /lib/sqlalchemy
parent92cafa979370478f5fc698fd82b4caac3d7971fa (diff)
parent59f2d0c96152128d8e02e43dcda045593153c86e (diff)
downloadsqlalchemy-ed7bef700477ac85819040478ce2f2cca78c5700.tar.gz
Merge "use only object_id() function for temp tables" into main
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py25
-rw-r--r--lib/sqlalchemy/testing/requirements.py5
-rw-r--r--lib/sqlalchemy/testing/suite/test_reflection.py77
3 files changed, 74 insertions, 33 deletions
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index fdb0704f6..5d42d98e3 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -3226,28 +3226,13 @@ class MSDialect(default.DefaultDialect):
if tablename.startswith("#"): # temporary table
# mssql does not support temporary views
# SQL Error [4103] [S0001]: "#v": Temporary views are not allowed
- tables = ischema.mssql_temp_table_columns
-
- s = sql.select(tables.c.table_name).where(
- tables.c.table_name.like(
- self._temp_table_name_like_pattern(tablename)
+ return bool(
+ connection.scalar(
+ # U filters on user tables only.
+ text("SELECT object_id(:table_name, 'U')"),
+ {"table_name": "tempdb.dbo.[{}]".format(tablename)},
)
)
-
- # #7168: fetch all (not just first match) in case some other #temp
- # table with the same name happens to appear first
- table_names = connection.scalars(s).all()
- # #6910: verify it's not a temp table from another session
- for table_name in table_names:
- if bool(
- connection.scalar(
- text("SELECT object_id(:table_name)"),
- {"table_name": "tempdb.dbo.[{}]".format(table_name)},
- )
- ):
- return True
- else:
- return False
else:
tables = ischema.tables
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index 1d6f1103d..8ceb2af2f 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -710,6 +710,11 @@ class SuiteRequirements(Requirements):
return exclusions.closed()
@property
+ def has_temp_table(self):
+ """target dialect supports checking a single temp table name"""
+ return exclusions.closed()
+
+ @property
def temporary_tables(self):
"""target database supports temporary tables"""
return exclusions.open()
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py
index dd7d38c5a..68d1c13fa 100644
--- a/lib/sqlalchemy/testing/suite/test_reflection.py
+++ b/lib/sqlalchemy/testing/suite/test_reflection.py
@@ -45,7 +45,23 @@ from ...testing import mock
metadata, users = None, None
-class HasTableTest(fixtures.TablesTest):
+class OneConnectionTablesTest(fixtures.TablesTest):
+ @classmethod
+ def setup_bind(cls):
+ # TODO: when temp tables are subject to server reset,
+ # this will also have to disable that server reset from
+ # happening
+ if config.requirements.independent_connections.enabled:
+ from sqlalchemy import pool
+
+ return engines.testing_engine(
+ options=dict(poolclass=pool.StaticPool, scope="class"),
+ )
+ else:
+ return config.db
+
+
+class HasTableTest(OneConnectionTablesTest):
__backend__ = True
@classmethod
@@ -67,6 +83,8 @@ class HasTableTest(fixtures.TablesTest):
if testing.requires.view_reflection:
cls.define_views(metadata)
+ if testing.requires.has_temp_table.enabled:
+ cls.define_temp_tables(metadata)
@classmethod
def define_views(cls, metadata):
@@ -87,6 +105,37 @@ class HasTableTest(fixtures.TablesTest):
DDL("DROP VIEW %s.vv" % (config.test_schema)),
)
+ @classmethod
+ def temp_table_name(cls):
+ return get_temp_table_name(
+ config, config.db, f"user_tmp_{config.ident}"
+ )
+
+ @classmethod
+ def define_temp_tables(cls, metadata):
+ kw = temp_table_keyword_args(config, config.db)
+ table_name = cls.temp_table_name()
+ user_tmp = Table(
+ table_name,
+ metadata,
+ Column("id", sa.INT, primary_key=True),
+ Column("name", sa.VARCHAR(50)),
+ **kw,
+ )
+ if (
+ testing.requires.view_reflection.enabled
+ and testing.requires.temporary_views.enabled
+ ):
+ event.listen(
+ user_tmp,
+ "after_create",
+ DDL(
+ "create temporary view user_tmp_v as "
+ "select * from user_tmp_%s" % config.ident
+ ),
+ )
+ event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
+
def test_has_table(self):
with config.db.begin() as conn:
is_true(config.db.dialect.has_table(conn, "test_table"))
@@ -130,6 +179,19 @@ class HasTableTest(fixtures.TablesTest):
insp = inspect(connection)
is_true(insp.has_table("vv"))
+ @testing.requires.has_temp_table
+ def test_has_table_temp_table(self, connection):
+ insp = inspect(connection)
+ temp_table_name = self.temp_table_name()
+ is_true(insp.has_table(temp_table_name))
+
+ @testing.requires.has_temp_table
+ @testing.requires.view_reflection
+ @testing.requires.temporary_views
+ def test_has_table_temp_view(self, connection):
+ insp = inspect(connection)
+ is_true(insp.has_table("user_tmp_v"))
+
@testing.requires.views
@testing.requires.schemas
def test_has_table_view_schema(self, connection):
@@ -393,23 +455,12 @@ def _multi_combination(fn):
return schema(scope(kind(filter_names(fn))))
-class ComponentReflectionTest(ComparesTables, fixtures.TablesTest):
+class ComponentReflectionTest(ComparesTables, OneConnectionTablesTest):
run_inserts = run_deletes = None
__backend__ = True
@classmethod
- def setup_bind(cls):
- if config.requirements.independent_connections.enabled:
- from sqlalchemy import pool
-
- return engines.testing_engine(
- options=dict(poolclass=pool.StaticPool, scope="class"),
- )
- else:
- return config.db
-
- @classmethod
def define_tables(cls, metadata):
cls.define_reflected_tables(metadata, None)
if testing.requires.schemas.enabled: