summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/databases/informix.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/databases/informix.py')
-rw-r--r--lib/sqlalchemy/databases/informix.py493
1 files changed, 0 insertions, 493 deletions
diff --git a/lib/sqlalchemy/databases/informix.py b/lib/sqlalchemy/databases/informix.py
deleted file mode 100644
index 4476af3b9..000000000
--- a/lib/sqlalchemy/databases/informix.py
+++ /dev/null
@@ -1,493 +0,0 @@
-# informix.py
-# Copyright (C) 2005,2006, 2007, 2008, 2009 Michael Bayer mike_mp@zzzcomputing.com
-#
-# coding: gbk
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-import datetime
-
-from sqlalchemy import sql, schema, exc, pool, util
-from sqlalchemy.sql import compiler
-from sqlalchemy.engine import default
-from sqlalchemy import types as sqltypes
-
-
-# for offset
-
-class informix_cursor(object):
- def __init__( self , con ):
- self.__cursor = con.cursor()
- self.rowcount = 0
-
- def offset( self , n ):
- if n > 0:
- self.fetchmany( n )
- self.rowcount = self.__cursor.rowcount - n
- if self.rowcount < 0:
- self.rowcount = 0
- else:
- self.rowcount = self.__cursor.rowcount
-
- def execute( self , sql , params ):
- if params is None or len( params ) == 0:
- params = []
-
- return self.__cursor.execute( sql , params )
-
- def __getattr__( self , name ):
- if name not in ( 'offset' , '__cursor' , 'rowcount' , '__del__' , 'execute' ):
- return getattr( self.__cursor , name )
-
-class InfoNumeric(sqltypes.Numeric):
- def get_col_spec(self):
- if not self.precision:
- return 'NUMERIC'
- else:
- return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': self.precision, 'scale' : self.scale}
-
-class InfoInteger(sqltypes.Integer):
- def get_col_spec(self):
- return "INTEGER"
-
-class InfoSmallInteger(sqltypes.Smallinteger):
- def get_col_spec(self):
- return "SMALLINT"
-
-class InfoDate(sqltypes.Date):
- def get_col_spec( self ):
- return "DATE"
-
-class InfoDateTime(sqltypes.DateTime ):
- def get_col_spec(self):
- return "DATETIME YEAR TO SECOND"
-
- def bind_processor(self, dialect):
- def process(value):
- if value is not None:
- if value.microsecond:
- value = value.replace( microsecond = 0 )
- return value
- return process
-
-class InfoTime(sqltypes.Time ):
- def get_col_spec(self):
- return "DATETIME HOUR TO SECOND"
-
- def bind_processor(self, dialect):
- def process(value):
- if value is not None:
- if value.microsecond:
- value = value.replace( microsecond = 0 )
- return value
- return process
-
- def result_processor(self, dialect):
- def process(value):
- if isinstance( value , datetime.datetime ):
- return value.time()
- else:
- return value
- return process
-
-class InfoText(sqltypes.String):
- def get_col_spec(self):
- return "VARCHAR(255)"
-
-class InfoString(sqltypes.String):
- def get_col_spec(self):
- return "VARCHAR(%(length)s)" % {'length' : self.length}
-
- def bind_processor(self, dialect):
- def process(value):
- if value == '':
- return None
- else:
- return value
- return process
-
-class InfoChar(sqltypes.CHAR):
- def get_col_spec(self):
- return "CHAR(%(length)s)" % {'length' : self.length}
-
-class InfoBinary(sqltypes.Binary):
- def get_col_spec(self):
- return "BYTE"
-
-class InfoBoolean(sqltypes.Boolean):
- default_type = 'NUM'
- def get_col_spec(self):
- return "SMALLINT"
-
- def result_processor(self, dialect):
- def process(value):
- if value is None:
- return None
- return value and True or False
- return process
-
- def bind_processor(self, dialect):
- def process(value):
- if value is True:
- return 1
- elif value is False:
- return 0
- elif value is None:
- return None
- else:
- return value and True or False
- return process
-
-colspecs = {
- sqltypes.Integer : InfoInteger,
- sqltypes.Smallinteger : InfoSmallInteger,
- sqltypes.Numeric : InfoNumeric,
- sqltypes.Float : InfoNumeric,
- sqltypes.DateTime : InfoDateTime,
- sqltypes.Date : InfoDate,
- sqltypes.Time: InfoTime,
- sqltypes.String : InfoString,
- sqltypes.Binary : InfoBinary,
- sqltypes.Boolean : InfoBoolean,
- sqltypes.Text : InfoText,
- sqltypes.CHAR: InfoChar,
-}
-
-
-ischema_names = {
- 0 : InfoString, # CHAR
- 1 : InfoSmallInteger, # SMALLINT
- 2 : InfoInteger, # INT
- 3 : InfoNumeric, # Float
- 3 : InfoNumeric, # SmallFloat
- 5 : InfoNumeric, # DECIMAL
- 6 : InfoInteger, # Serial
- 7 : InfoDate, # DATE
- 8 : InfoNumeric, # MONEY
- 10 : InfoDateTime, # DATETIME
- 11 : InfoBinary, # BYTE
- 12 : InfoText, # TEXT
- 13 : InfoString, # VARCHAR
- 15 : InfoString, # NCHAR
- 16 : InfoString, # NVARCHAR
- 17 : InfoInteger, # INT8
- 18 : InfoInteger, # Serial8
- 43 : InfoString, # LVARCHAR
- -1 : InfoBinary, # BLOB
- -1 : InfoText, # CLOB
-}
-
-
-class InfoExecutionContext(default.DefaultExecutionContext):
- # cursor.sqlerrd
- # 0 - estimated number of rows returned
- # 1 - serial value after insert or ISAM error code
- # 2 - number of rows processed
- # 3 - estimated cost
- # 4 - offset of the error into the SQL statement
- # 5 - rowid after insert
- def post_exec(self):
- if getattr(self.compiled, "isinsert", False) and self.last_inserted_ids() is None:
- self._last_inserted_ids = [self.cursor.sqlerrd[1]]
- elif hasattr( self.compiled , 'offset' ):
- self.cursor.offset( self.compiled.offset )
- super(InfoExecutionContext, self).post_exec()
-
- def create_cursor( self ):
- return informix_cursor( self.connection.connection )
-
-class InfoDialect(default.DefaultDialect):
- name = 'informix'
- default_paramstyle = 'qmark'
- # for informix 7.31
- max_identifier_length = 18
-
- def __init__(self, use_ansi=True, **kwargs):
- self.use_ansi = use_ansi
- default.DefaultDialect.__init__(self, **kwargs)
-
- def dbapi(cls):
- import informixdb
- return informixdb
- dbapi = classmethod(dbapi)
-
- def is_disconnect(self, e):
- if isinstance(e, self.dbapi.OperationalError):
- return 'closed the connection' in str(e) or 'connection not open' in str(e)
- else:
- return False
-
- def do_begin(self , connect ):
- cu = connect.cursor()
- cu.execute( 'SET LOCK MODE TO WAIT' )
- #cu.execute( 'SET ISOLATION TO REPEATABLE READ' )
-
- def type_descriptor(self, typeobj):
- return sqltypes.adapt_type(typeobj, colspecs)
-
- def create_connect_args(self, url):
- if url.host:
- dsn = '%s@%s' % ( url.database , url.host )
- else:
- dsn = url.database
-
- if url.username:
- opt = { 'user':url.username , 'password': url.password }
- else:
- opt = {}
-
- return ([dsn], opt)
-
- def table_names(self, connection, schema):
- s = "select tabname from systables"
- return [row[0] for row in connection.execute(s)]
-
- def has_table(self, connection, table_name, schema=None):
- cursor = connection.execute("""select tabname from systables where tabname=?""", table_name.lower() )
- return bool( cursor.fetchone() is not None )
-
- def reflecttable(self, connection, table, include_columns):
- c = connection.execute ("select distinct OWNER from systables where tabname=?", table.name.lower() )
- rows = c.fetchall()
- if not rows :
- raise exc.NoSuchTableError(table.name)
- else:
- if table.owner is not None:
- if table.owner.lower() in [r[0] for r in rows]:
- owner = table.owner.lower()
- else:
- raise AssertionError("Specified owner %s does not own table %s"%(table.owner, table.name))
- else:
- if len(rows)==1:
- owner = rows[0][0]
- else:
- raise AssertionError("There are multiple tables with name %s in the schema, you must specifie owner"%table.name)
-
- c = connection.execute ("""select colname , coltype , collength , t3.default , t1.colno from syscolumns as t1 , systables as t2 , OUTER sysdefaults as t3
- where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=?
- and t3.tabid = t2.tabid and t3.colno = t1.colno
- order by t1.colno""", table.name.lower(), owner )
- rows = c.fetchall()
-
- if not rows:
- raise exc.NoSuchTableError(table.name)
-
- for name , colattr , collength , default , colno in rows:
- name = name.lower()
- if include_columns and name not in include_columns:
- continue
-
- # in 7.31, coltype = 0x000
- # ^^-- column type
- # ^-- 1 not null , 0 null
- nullable , coltype = divmod( colattr , 256 )
- if coltype not in ( 0 , 13 ) and default:
- default = default.split()[-1]
-
- if coltype == 0 or coltype == 13: # char , varchar
- coltype = ischema_names.get(coltype, InfoString)(collength)
- if default:
- default = "'%s'" % default
- elif coltype == 5: # decimal
- precision , scale = ( collength & 0xFF00 ) >> 8 , collength & 0xFF
- if scale == 255:
- scale = 0
- coltype = InfoNumeric(precision, scale)
- else:
- try:
- coltype = ischema_names[coltype]
- except KeyError:
- util.warn("Did not recognize type '%s' of column '%s'" %
- (coltype, name))
- coltype = sqltypes.NULLTYPE
-
- colargs = []
- if default is not None:
- colargs.append(schema.DefaultClause(sql.text(default)))
-
- table.append_column(schema.Column(name, coltype, nullable = (nullable == 0), *colargs))
-
- # FK
- c = connection.execute("""select t1.constrname as cons_name , t1.constrtype as cons_type ,
- t4.colname as local_column , t7.tabname as remote_table ,
- t6.colname as remote_column
- from sysconstraints as t1 , systables as t2 ,
- sysindexes as t3 , syscolumns as t4 ,
- sysreferences as t5 , syscolumns as t6 , systables as t7 ,
- sysconstraints as t8 , sysindexes as t9
- where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t1.constrtype = 'R'
- and t3.tabid = t2.tabid and t3.idxname = t1.idxname
- and t4.tabid = t2.tabid and t4.colno = t3.part1
- and t5.constrid = t1.constrid and t8.constrid = t5.primary
- and t6.tabid = t5.ptabid and t6.colno = t9.part1 and t9.idxname = t8.idxname
- and t7.tabid = t5.ptabid""", table.name.lower(), owner )
- rows = c.fetchall()
- fks = {}
- for cons_name, cons_type, local_column, remote_table, remote_column in rows:
- try:
- fk = fks[cons_name]
- except KeyError:
- fk = ([], [])
- fks[cons_name] = fk
- refspec = ".".join([remote_table, remote_column])
- schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection)
- if local_column not in fk[0]:
- fk[0].append(local_column)
- if refspec not in fk[1]:
- fk[1].append(refspec)
-
- for name, value in fks.iteritems():
- table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1] , None, link_to_name=True ))
-
- # PK
- c = connection.execute("""select t1.constrname as cons_name , t1.constrtype as cons_type ,
- t4.colname as local_column
- from sysconstraints as t1 , systables as t2 ,
- sysindexes as t3 , syscolumns as t4
- where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t1.constrtype = 'P'
- and t3.tabid = t2.tabid and t3.idxname = t1.idxname
- and t4.tabid = t2.tabid and t4.colno = t3.part1""", table.name.lower(), owner )
- rows = c.fetchall()
- for cons_name, cons_type, local_column in rows:
- table.primary_key.add( table.c[local_column] )
-
-class InfoCompiler(compiler.DefaultCompiler):
- """Info compiler modifies the lexical structure of Select statements to work under
- non-ANSI configured Oracle databases, if the use_ansi flag is False."""
-
- def __init__(self, *args, **kwargs):
- self.limit = 0
- self.offset = 0
-
- compiler.DefaultCompiler.__init__( self , *args, **kwargs )
-
- def default_from(self):
- return " from systables where tabname = 'systables' "
-
- def get_select_precolumns( self , select ):
- s = select._distinct and "DISTINCT " or ""
- # only has limit
- if select._limit:
- off = select._offset or 0
- s += " FIRST %s " % ( select._limit + off )
- else:
- s += ""
- return s
-
- def visit_select(self, select):
- if select._offset:
- self.offset = select._offset
- self.limit = select._limit or 0
- # the column in order by clause must in select too
-
- def __label( c ):
- try:
- return c._label.lower()
- except:
- return ''
-
- # TODO: dont modify the original select, generate a new one
- a = [ __label(c) for c in select._raw_columns ]
- for c in select._order_by_clause.clauses:
- if ( __label(c) not in a ):
- select.append_column( c )
-
- return compiler.DefaultCompiler.visit_select(self, select)
-
- def limit_clause(self, select):
- return ""
-
- def visit_function( self , func ):
- if func.name.lower() == 'current_date':
- return "today"
- elif func.name.lower() == 'current_time':
- return "CURRENT HOUR TO SECOND"
- elif func.name.lower() in ( 'current_timestamp' , 'now' ):
- return "CURRENT YEAR TO SECOND"
- else:
- return compiler.DefaultCompiler.visit_function( self , func )
-
- def visit_clauselist(self, list, **kwargs):
- return ', '.join([s for s in [self.process(c) for c in list.clauses] if s is not None])
-
-class InfoSchemaGenerator(compiler.SchemaGenerator):
- def get_column_specification(self, column, first_pk=False):
- colspec = self.preparer.format_column(column)
- if column.primary_key and len(column.foreign_keys)==0 and column.autoincrement and \
- isinstance(column.type, sqltypes.Integer) and not getattr( self , 'has_serial' , False ) and first_pk:
- colspec += " SERIAL"
- self.has_serial = True
- else:
- colspec += " " + column.type.dialect_impl(self.dialect).get_col_spec()
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
-
- if not column.nullable:
- colspec += " NOT NULL"
-
- return colspec
-
- def post_create_table(self, table):
- if hasattr( self , 'has_serial' ):
- del self.has_serial
- return ''
-
- def visit_primary_key_constraint(self, constraint):
- # for informix 7.31 not support constraint name
- name = constraint.name
- constraint.name = None
- super(InfoSchemaGenerator, self).visit_primary_key_constraint(constraint)
- constraint.name = name
-
- def visit_unique_constraint(self, constraint):
- # for informix 7.31 not support constraint name
- name = constraint.name
- constraint.name = None
- super(InfoSchemaGenerator, self).visit_unique_constraint(constraint)
- constraint.name = name
-
- def visit_foreign_key_constraint( self , constraint ):
- if constraint.name is not None:
- constraint.use_alter = True
- else:
- super( InfoSchemaGenerator , self ).visit_foreign_key_constraint( constraint )
-
- def define_foreign_key(self, constraint):
- # for informix 7.31 not support constraint name
- if constraint.use_alter:
- name = constraint.name
- constraint.name = None
- self.append( "CONSTRAINT " )
- super(InfoSchemaGenerator, self).define_foreign_key(constraint)
- constraint.name = name
- if name is not None:
- self.append( " CONSTRAINT " + name )
- else:
- super(InfoSchemaGenerator, self).define_foreign_key(constraint)
-
- def visit_index(self, index):
- if len( index.columns ) == 1 and index.columns[0].foreign_key:
- return
- super(InfoSchemaGenerator, self).visit_index(index)
-
-class InfoIdentifierPreparer(compiler.IdentifierPreparer):
- def __init__(self, dialect):
- super(InfoIdentifierPreparer, self).__init__(dialect, initial_quote="'")
-
- def _requires_quotes(self, value):
- return False
-
-class InfoSchemaDropper(compiler.SchemaDropper):
- def drop_foreignkey(self, constraint):
- if constraint.name is not None:
- super( InfoSchemaDropper , self ).drop_foreignkey( constraint )
-
-dialect = InfoDialect
-poolclass = pool.SingletonThreadPool
-dialect.statement_compiler = InfoCompiler
-dialect.schemagenerator = InfoSchemaGenerator
-dialect.schemadropper = InfoSchemaDropper
-dialect.preparer = InfoIdentifierPreparer
-dialect.execution_ctx_cls = InfoExecutionContext \ No newline at end of file