summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/suite/test_reflection.py
diff options
context:
space:
mode:
authorFederico Caselli <cfederico87@gmail.com>2020-09-19 22:29:38 +0200
committerMike Bayer <mike_mp@zzzcomputing.com>2020-09-28 18:11:12 -0400
commit7362d454f46107cae4076ce54e9fa430c3370734 (patch)
treeae7545a99a76995ef31a879f09fb1c0fe6764f4b /lib/sqlalchemy/testing/suite/test_reflection.py
parentc3f102c9fe9811fd5286628cc6aafa5fbc324621 (diff)
downloadsqlalchemy-7362d454f46107cae4076ce54e9fa430c3370734.tar.gz
Add reflection for Identity columns
Added support for reflecting "identity" columns, which are now returned as part of the structure returned by :meth:`_reflection.Inspector.get_columns`. When reflecting full :class:`_schema.Table` objects, identity columns will be represented using the :class:`_schema.Identity` construct. Fixed compilation error on oracle for sequence and identity column ``nominvalue`` and ``nomaxvalue`` options that require no space in them. Improved test compatibility with oracle 18. As part of the support for reflecting :class:`_schema.Identity` objects, the method :meth:`_reflection.Inspector.get_columns` no longer returns ``mssql_identity_start`` and ``mssql_identity_increment`` as part of the ``dialect_options``. Use the information in the ``identity`` key instead. The mssql dialect will assume that at least MSSQL 2005 is used. There is no hard exception raised if a previous version is detected, but operations may fail for older versions. Fixes: #5527 Fixes: #5324 Change-Id: If039fe637c46b424499e6bac54a2cbc0dc54cb57
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_reflection.py')
-rw-r--r--lib/sqlalchemy/testing/suite/test_reflection.py140
1 files changed, 140 insertions, 0 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py
index f728310d7..f8f93b563 100644
--- a/lib/sqlalchemy/testing/suite/test_reflection.py
+++ b/lib/sqlalchemy/testing/suite/test_reflection.py
@@ -15,6 +15,7 @@ from ..schema import Column
from ..schema import Table
from ... import event
from ... import ForeignKey
+from ... import Identity
from ... import inspect
from ... import Integer
from ... import MetaData
@@ -1442,6 +1443,144 @@ class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
)
+class IdentityReflectionTest(fixtures.TablesTest):
+ run_inserts = run_deletes = None
+
+ __backend__ = True
+ __requires__ = ("identity_columns", "table_reflection")
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "t1",
+ metadata,
+ Column("normal", Integer),
+ Column("id1", Integer, Identity()),
+ )
+ Table(
+ "t2",
+ metadata,
+ Column(
+ "id2",
+ Integer,
+ Identity(
+ always=True,
+ start=2,
+ increment=3,
+ minvalue=-2,
+ maxvalue=42,
+ cycle=True,
+ cache=4,
+ ),
+ ),
+ )
+ if testing.requires.schemas.enabled:
+ Table(
+ "t1",
+ metadata,
+ Column("normal", Integer),
+ Column("id1", Integer, Identity(always=True, start=20)),
+ schema=config.test_schema,
+ )
+
+ def check(self, value, exp, approx):
+ if testing.requires.identity_columns_standard.enabled:
+ common_keys = (
+ "always",
+ "start",
+ "increment",
+ "minvalue",
+ "maxvalue",
+ "cycle",
+ "cache",
+ )
+ for k in list(value):
+ if k not in common_keys:
+ value.pop(k)
+ if approx:
+ eq_(len(value), len(exp))
+ for k in value:
+ if k == "minvalue":
+ is_true(value[k] <= exp[k])
+ elif k in {"maxvalue", "cache"}:
+ is_true(value[k] >= exp[k])
+ else:
+ eq_(value[k], exp[k], k)
+ else:
+ eq_(value, exp)
+ else:
+ eq_(value["start"], exp["start"])
+ eq_(value["increment"], exp["increment"])
+
+ def test_reflect_identity(self):
+ insp = inspect(config.db)
+
+ cols = insp.get_columns("t1") + insp.get_columns("t2")
+ for col in cols:
+ if col["name"] == "normal":
+ is_false("identity" in col)
+ elif col["name"] == "id1":
+ is_true(col["autoincrement"] in (True, "auto"))
+ eq_(col["default"], None)
+ is_true("identity" in col)
+ self.check(
+ col["identity"],
+ dict(
+ always=False,
+ start=1,
+ increment=1,
+ minvalue=1,
+ maxvalue=2147483647,
+ cycle=False,
+ cache=1,
+ ),
+ approx=True,
+ )
+ elif col["name"] == "id2":
+ is_true(col["autoincrement"] in (True, "auto"))
+ eq_(col["default"], None)
+ is_true("identity" in col)
+ self.check(
+ col["identity"],
+ dict(
+ always=True,
+ start=2,
+ increment=3,
+ minvalue=-2,
+ maxvalue=42,
+ cycle=True,
+ cache=4,
+ ),
+ approx=False,
+ )
+
+ @testing.requires.schemas
+ def test_reflect_identity_schema(self):
+ insp = inspect(config.db)
+
+ cols = insp.get_columns("t1", schema=config.test_schema)
+ for col in cols:
+ if col["name"] == "normal":
+ is_false("identity" in col)
+ elif col["name"] == "id1":
+ is_true(col["autoincrement"] in (True, "auto"))
+ eq_(col["default"], None)
+ is_true("identity" in col)
+ self.check(
+ col["identity"],
+ dict(
+ always=True,
+ start=20,
+ increment=1,
+ minvalue=1,
+ maxvalue=2147483647,
+ cycle=False,
+ cache=1,
+ ),
+ approx=True,
+ )
+
+
__all__ = (
"ComponentReflectionTest",
"QuotedNameArgumentTest",
@@ -1449,4 +1588,5 @@ __all__ = (
"HasIndexTest",
"NormalizedNameTest",
"ComputedReflectionTest",
+ "IdentityReflectionTest",
)