diff options
Diffstat (limited to 'lib/sqlalchemy/dialects/sqlite/base.py')
-rw-r--r-- | lib/sqlalchemy/dialects/sqlite/base.py | 384 |
1 files changed, 266 insertions, 118 deletions
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 33003297c..1ed89bacb 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -9,6 +9,7 @@ .. dialect:: sqlite :name: SQLite +.. _sqlite_datetime: Date and Time Types ------------------- @@ -23,6 +24,20 @@ These types represent dates and times as ISO formatted strings, which also nicely support ordering. There's no reliance on typical "libc" internals for these functions so historical dates are fully supported. +Ensuring Text affinity +^^^^^^^^^^^^^^^^^^^^^^ + +The DDL rendered for these types is the standard ``DATE``, ``TIME`` +and ``DATETIME`` indicators. However, custom storage formats can also be +applied to these types. When the +storage format is detected as containing no alpha characters, the DDL for +these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``, +so that the column continues to have textual affinity. + +.. seealso:: + + `Type Affinity <http://www.sqlite.org/datatype3.html#affinity>`_ - in the SQLite documentation + .. _sqlite_autoincrement: SQLite Auto Incrementing Behavior @@ -92,8 +107,10 @@ The following subsections introduce areas that are impacted by SQLite's file-based architecture and additionally will usually require workarounds to work when using the pysqlite driver. +.. _sqlite_isolation_level: + Transaction Isolation Level -=========================== +---------------------------- SQLite supports "transaction isolation" in a non-standard way, along two axes. One is that of the `PRAGMA read_uncommitted <http://www.sqlite.org/pragma.html#pragma_read_uncommitted>`_ @@ -126,7 +143,7 @@ by *not even emitting BEGIN* until the first write operation. for techniques to work around this behavior. SAVEPOINT Support -================= +---------------------------- SQLite supports SAVEPOINTs, which only function once a transaction is begun. SQLAlchemy's SAVEPOINT support is available using the @@ -142,7 +159,7 @@ won't work at all with pysqlite unless workarounds are taken. for techniques to work around this behavior. Transactional DDL -================= +---------------------------- The SQLite database supports transactional :term:`DDL` as well. In this case, the pysqlite driver is not only failing to start transactions, @@ -186,6 +203,15 @@ new connections through the usage of events:: cursor.execute("PRAGMA foreign_keys=ON") cursor.close() +.. warning:: + + When SQLite foreign keys are enabled, it is **not possible** + to emit CREATE or DROP statements for tables that contain + mutually-dependent foreign key constraints; + to emit the DDL for these tables requires that ALTER TABLE be used to + create or drop these constraints separately, for which SQLite has + no support. + .. seealso:: `SQLite Foreign Key Support <http://www.sqlite.org/foreignkeys.html>`_ @@ -193,6 +219,9 @@ new connections through the usage of events:: :ref:`event_toplevel` - SQLAlchemy event API. + :ref:`use_alter` - more information on SQLAlchemy's facilities for handling + mutually-dependent foreign key constraints. + .. _sqlite_type_reflection: Type Reflection @@ -255,7 +284,7 @@ from ... import util from ...engine import default, reflection from ...sql import compiler -from ...types import (BLOB, BOOLEAN, CHAR, DATE, DECIMAL, FLOAT, +from ...types import (BLOB, BOOLEAN, CHAR, DECIMAL, FLOAT, INTEGER, REAL, NUMERIC, SMALLINT, TEXT, TIMESTAMP, VARCHAR) @@ -271,6 +300,25 @@ class _DateTimeMixin(object): if storage_format is not None: self._storage_format = storage_format + @property + def format_is_text_affinity(self): + """return True if the storage format will automatically imply + a TEXT affinity. + + If the storage format contains no non-numeric characters, + it will imply a NUMERIC storage format on SQLite; in this case, + the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, + TIME_CHAR. + + .. versionadded:: 1.0.0 + + """ + spec = self._storage_format % { + "year": 0, "month": 0, "day": 0, "hour": 0, + "minute": 0, "second": 0, "microsecond": 0 + } + return bool(re.search(r'[^0-9]', spec)) + def adapt(self, cls, **kw): if issubclass(cls, _DateTimeMixin): if self._storage_format: @@ -526,7 +574,9 @@ ischema_names = { 'BOOLEAN': sqltypes.BOOLEAN, 'CHAR': sqltypes.CHAR, 'DATE': sqltypes.DATE, + 'DATE_CHAR': sqltypes.DATE, 'DATETIME': sqltypes.DATETIME, + 'DATETIME_CHAR': sqltypes.DATETIME, 'DOUBLE': sqltypes.FLOAT, 'DECIMAL': sqltypes.DECIMAL, 'FLOAT': sqltypes.FLOAT, @@ -537,6 +587,7 @@ ischema_names = { 'SMALLINT': sqltypes.SMALLINT, 'TEXT': sqltypes.TEXT, 'TIME': sqltypes.TIME, + 'TIME_CHAR': sqltypes.TIME, 'TIMESTAMP': sqltypes.TIMESTAMP, 'VARCHAR': sqltypes.VARCHAR, 'NVARCHAR': sqltypes.NVARCHAR, @@ -611,7 +662,8 @@ class SQLiteCompiler(compiler.SQLCompiler): class SQLiteDDLCompiler(compiler.DDLCompiler): def get_column_specification(self, column, **kwargs): - coltype = self.dialect.type_compiler.process(column.type) + coltype = self.dialect.type_compiler.process( + column.type, type_expression=column) colspec = self.preparer.format_column(column) + " " + coltype default = self.get_column_default_string(column) if default is not None: @@ -667,9 +719,30 @@ class SQLiteDDLCompiler(compiler.DDLCompiler): class SQLiteTypeCompiler(compiler.GenericTypeCompiler): - def visit_large_binary(self, type_): + def visit_large_binary(self, type_, **kw): return self.visit_BLOB(type_) + def visit_DATETIME(self, type_, **kw): + if not isinstance(type_, _DateTimeMixin) or \ + type_.format_is_text_affinity: + return super(SQLiteTypeCompiler, self).visit_DATETIME(type_) + else: + return "DATETIME_CHAR" + + def visit_DATE(self, type_, **kw): + if not isinstance(type_, _DateTimeMixin) or \ + type_.format_is_text_affinity: + return super(SQLiteTypeCompiler, self).visit_DATE(type_) + else: + return "DATE_CHAR" + + def visit_TIME(self, type_, **kw): + if not isinstance(type_, _DateTimeMixin) or \ + type_.format_is_text_affinity: + return super(SQLiteTypeCompiler, self).visit_TIME(type_) + else: + return "TIME_CHAR" + class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): reserved_words = set([ @@ -855,22 +928,9 @@ class SQLiteDialect(default.DefaultDialect): return [row[0] for row in rs] def has_table(self, connection, table_name, schema=None): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - statement = "%stable_info(%s)" % (pragma, qtable) - cursor = _pragma_cursor(connection.execute(statement)) - row = cursor.fetchone() - - # consume remaining rows, to work around - # http://www.sqlite.org/cvstrac/tktview?tn=1884 - while not cursor.closed and cursor.fetchone() is not None: - pass - - return row is not None + info = self._get_table_pragma( + connection, "table_info", table_name, schema=schema) + return bool(info) @reflection.cache def get_view_names(self, connection, schema=None, **kw): @@ -912,18 +972,11 @@ class SQLiteDialect(default.DefaultDialect): @reflection.cache def get_columns(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - statement = "%stable_info(%s)" % (pragma, qtable) - c = _pragma_cursor(connection.execute(statement)) + info = self._get_table_pragma( + connection, "table_info", table_name, schema=schema) - rows = c.fetchall() columns = [] - for row in rows: + for row in info: (name, type_, nullable, default, primary_key) = ( row[1], row[2].upper(), not row[3], row[4], row[5]) @@ -1010,92 +1063,192 @@ class SQLiteDialect(default.DefaultDialect): @reflection.cache def get_foreign_keys(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - statement = "%sforeign_key_list(%s)" % (pragma, qtable) - c = _pragma_cursor(connection.execute(statement)) - fkeys = [] + # sqlite makes this *extremely difficult*. + # First, use the pragma to get the actual FKs. + pragma_fks = self._get_table_pragma( + connection, "foreign_key_list", + table_name, schema=schema + ) + fks = {} - while True: - row = c.fetchone() - if row is None: - break + + for row in pragma_fks: (numerical_id, rtbl, lcol, rcol) = ( row[0], row[2], row[3], row[4]) - self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol) - return fkeys + if rcol is None: + rcol = lcol - def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol): - # sqlite won't return rcol if the table was created with REFERENCES - # <tablename>, no col - if rcol is None: - rcol = lcol + if self._broken_fk_pragma_quotes: + rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl) - if self._broken_fk_pragma_quotes: - rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl) + if numerical_id in fks: + fk = fks[numerical_id] + else: + fk = fks[numerical_id] = { + 'name': None, + 'constrained_columns': [], + 'referred_schema': None, + 'referred_table': rtbl, + 'referred_columns': [], + } + fks[numerical_id] = fk - try: - fk = fks[numerical_id] - except KeyError: - fk = { - 'name': None, - 'constrained_columns': [], - 'referred_schema': None, - 'referred_table': rtbl, - 'referred_columns': [], - } - fkeys.append(fk) - fks[numerical_id] = fk - - if lcol not in fk['constrained_columns']: fk['constrained_columns'].append(lcol) - if rcol not in fk['referred_columns']: fk['referred_columns'].append(rcol) - return fk + + def fk_sig(constrained_columns, referred_table, referred_columns): + return tuple(constrained_columns) + (referred_table,) + \ + tuple(referred_columns) + + # then, parse the actual SQL and attempt to find DDL that matches + # the names as well. SQLite saves the DDL in whatever format + # it was typed in as, so need to be liberal here. + + keys_by_signature = dict( + ( + fk_sig( + fk['constrained_columns'], + fk['referred_table'], fk['referred_columns']), + fk + ) for fk in fks.values() + ) + + table_data = self._get_table_sql(connection, table_name, schema=schema) + if table_data is None: + # system tables, etc. + return [] + + def parse_fks(): + FK_PATTERN = ( + '(?:CONSTRAINT (\w+) +)?' + 'FOREIGN KEY *\( *(.+?) *\) +' + 'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\)' + ) + + for match in re.finditer(FK_PATTERN, table_data, re.I): + ( + constraint_name, constrained_columns, + referred_quoted_name, referred_name, + referred_columns) = match.group(1, 2, 3, 4, 5) + constrained_columns = list( + self._find_cols_in_sig(constrained_columns)) + if not referred_columns: + referred_columns = constrained_columns + else: + referred_columns = list( + self._find_cols_in_sig(referred_columns)) + referred_name = referred_quoted_name or referred_name + yield ( + constraint_name, constrained_columns, + referred_name, referred_columns) + fkeys = [] + + for ( + constraint_name, constrained_columns, + referred_name, referred_columns) in parse_fks(): + sig = fk_sig( + constrained_columns, referred_name, referred_columns) + if sig not in keys_by_signature: + util.warn( + "WARNING: SQL-parsed foreign key constraint " + "'%s' could not be located in PRAGMA " + "foreign_keys for table %s" % ( + sig, + table_name + )) + continue + key = keys_by_signature.pop(sig) + key['name'] = constraint_name + fkeys.append(key) + # assume the remainders are the unnamed, inline constraints, just + # use them as is as it's extremely difficult to parse inline + # constraints + fkeys.extend(keys_by_signature.values()) + return fkeys + + def _find_cols_in_sig(self, sig): + for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): + yield match.group(1) or match.group(2) + + @reflection.cache + def get_unique_constraints(self, connection, table_name, + schema=None, **kw): + + auto_index_by_sig = {} + for idx in self.get_indexes( + connection, table_name, schema=schema, + include_auto_indexes=True, **kw): + if not idx['name'].startswith("sqlite_autoindex"): + continue + sig = tuple(idx['column_names']) + auto_index_by_sig[sig] = idx + + table_data = self._get_table_sql( + connection, table_name, schema=schema, **kw) + if not table_data: + return [] + + unique_constraints = [] + + def parse_uqs(): + UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) +)?UNIQUE *\((.+?)\)' + INLINE_UNIQUE_PATTERN = ( + '(?:(".+?")|([a-z0-9]+)) ' + '+[a-z0-9_ ]+? +UNIQUE') + + for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): + name, cols = match.group(1, 2) + yield name, list(self._find_cols_in_sig(cols)) + + # we need to match inlines as well, as we seek to differentiate + # a UNIQUE constraint from a UNIQUE INDEX, even though these + # are kind of the same thing :) + for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): + cols = list( + self._find_cols_in_sig(match.group(1) or match.group(2))) + yield None, cols + + for name, cols in parse_uqs(): + sig = tuple(cols) + if sig in auto_index_by_sig: + auto_index_by_sig.pop(sig) + parsed_constraint = { + 'name': name, + 'column_names': cols + } + unique_constraints.append(parsed_constraint) + # NOTE: auto_index_by_sig might not be empty here, + # the PRIMARY KEY may have an entry. + return unique_constraints @reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - include_auto_indexes = kw.pop('include_auto_indexes', False) - qtable = quote(table_name) - statement = "%sindex_list(%s)" % (pragma, qtable) - c = _pragma_cursor(connection.execute(statement)) + pragma_indexes = self._get_table_pragma( + connection, "index_list", table_name, schema=schema) indexes = [] - while True: - row = c.fetchone() - if row is None: - break + + include_auto_indexes = kw.pop('include_auto_indexes', False) + for row in pragma_indexes: # ignore implicit primary key index. # http://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html - elif (not include_auto_indexes and - row[1].startswith('sqlite_autoindex')): + if (not include_auto_indexes and + row[1].startswith('sqlite_autoindex')): continue indexes.append(dict(name=row[1], column_names=[], unique=row[2])) + # loop thru unique indexes to get the column names. for idx in indexes: - statement = "%sindex_info(%s)" % (pragma, quote(idx['name'])) - c = connection.execute(statement) - cols = idx['column_names'] - while True: - row = c.fetchone() - if row is None: - break - cols.append(row[2]) + pragma_index = self._get_table_pragma( + connection, "index_info", idx['name']) + + for row in pragma_index: + idx['column_names'].append(row[2]) return indexes @reflection.cache - def get_unique_constraints(self, connection, table_name, - schema=None, **kw): + def _get_table_sql(self, connection, table_name, schema=None, **kw): try: s = ("SELECT sql FROM " " (SELECT * FROM sqlite_master UNION ALL " @@ -1107,27 +1260,22 @@ class SQLiteDialect(default.DefaultDialect): s = ("SELECT sql FROM sqlite_master WHERE name = '%s' " "AND type = 'table'") % table_name rs = connection.execute(s) - row = rs.fetchone() - if row is None: - # sqlite won't return the schema for the sqlite_master or - # sqlite_temp_master tables from this query. These tables - # don't have any unique constraints anyway. - return [] - table_data = row[0] - - UNIQUE_PATTERN = 'CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)' - return [ - {'name': name, - 'column_names': [col.strip(' "') for col in cols.split(',')]} - for name, cols in re.findall(UNIQUE_PATTERN, table_data) - ] + return rs.scalar() - -def _pragma_cursor(cursor): - """work around SQLite issue whereby cursor.description - is blank when PRAGMA returns no rows.""" - - if cursor.closed: - cursor.fetchone = lambda: None - cursor.fetchall = lambda: [] - return cursor + def _get_table_pragma(self, connection, pragma, table_name, schema=None): + quote = self.identifier_preparer.quote_identifier + if schema is not None: + statement = "PRAGMA %s." % quote(schema) + else: + statement = "PRAGMA " + qtable = quote(table_name) + statement = "%s%s(%s)" % (statement, pragma, qtable) + cursor = connection.execute(statement) + if not cursor.closed: + # work around SQLite issue whereby cursor.description + # is blank when PRAGMA returns no rows: + # http://www.sqlite.org/cvstrac/tktview?tn=1884 + result = cursor.fetchall() + else: + result = [] + return result |