# informix.py
# Copyright (C) 2005,2006, 2007, 2008, 2009 Michael Bayer mike_mp@zzzcomputing.com
#
# coding: gbk
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""Informix dialect: column types, reflection, SQL compilation and DDL."""

import datetime

from sqlalchemy import sql, schema, exc, pool, util
from sqlalchemy.sql import compiler
from sqlalchemy.engine import default
from sqlalchemy import types as sqltypes


# for offset
class informix_cursor(object):
    """Wrap a DBAPI cursor and emulate OFFSET by discarding leading rows.

    Attributes that are not defined here are delegated to the wrapped cursor.
    """

    # Names that must never be delegated to the wrapped cursor.  The mangled
    # form of ``__cursor`` is included so that a lookup on a half-constructed
    # instance raises AttributeError instead of recursing forever.
    _own_names = ('offset', '__cursor', '_informix_cursor__cursor',
                  'rowcount', '__del__', 'execute')

    def __init__(self, con):
        """Open a cursor on the DBAPI connection ``con``."""
        self.__cursor = con.cursor()
        self.rowcount = 0

    def offset(self, n):
        """Skip the first ``n`` rows of the result and adjust ``rowcount``."""
        if n > 0:
            # fetchmany is delegated to the wrapped cursor via __getattr__.
            self.fetchmany(n)
            self.rowcount = self.__cursor.rowcount - n
            if self.rowcount < 0:
                self.rowcount = 0
        else:
            self.rowcount = self.__cursor.rowcount

    def execute(self, sql, params):
        """Execute ``sql``; a missing or empty ``params`` is sent as ``[]``."""
        if params is None or len(params) == 0:
            params = []
        return self.__cursor.execute(sql, params)

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped cursor.

        Raises AttributeError for the wrapper's own names rather than
        implicitly returning None, so ``hasattr``/``getattr`` behave normally.
        """
        if name in self._own_names:
            raise AttributeError(name)
        return getattr(self.__cursor, name)


def _strip_microseconds(value):
    """Return ``value`` with microseconds zeroed; ``None`` passes through."""
    if value is not None and value.microsecond:
        value = value.replace(microsecond=0)
    return value


class InfoNumeric(sqltypes.Numeric):
    def get_col_spec(self):
        """Return NUMERIC, with precision and scale when precision is set."""
        if not self.precision:
            return 'NUMERIC'
        return "NUMERIC(%(precision)s, %(scale)s)" % {
            'precision': self.precision, 'scale': self.scale}


class InfoInteger(sqltypes.Integer):
    def get_col_spec(self):
        return "INTEGER"


class InfoSmallInteger(sqltypes.Smallinteger):
    def get_col_spec(self):
        return "SMALLINT"


class InfoDate(sqltypes.Date):
    def get_col_spec(self):
        return "DATE"


class InfoDateTime(sqltypes.DateTime):
    def get_col_spec(self):
        return "DATETIME YEAR TO SECOND"

    def bind_processor(self, dialect):
        """Bind values with microseconds dropped (column is YEAR TO SECOND)."""
        return _strip_microseconds


class InfoTime(sqltypes.Time):
    def get_col_spec(self):
        return "DATETIME HOUR TO SECOND"

    def bind_processor(self, dialect):
        """Bind values with microseconds dropped (column is HOUR TO SECOND)."""
        return _strip_microseconds

    def result_processor(self, dialect):
        """Convert ``datetime.datetime`` results to ``datetime.time``."""
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.time()
            return value
        return process


class InfoText(sqltypes.String):
    def get_col_spec(self):
        return "VARCHAR(255)"


class InfoString(sqltypes.String):
    def get_col_spec(self):
        return "VARCHAR(%(length)s)" % {'length': self.length}

    def bind_processor(self, dialect):
        """Bind the empty string as ``None``; other values pass through."""
        def process(value):
            if value == '':
                return None
            return value
        return process


class InfoChar(sqltypes.CHAR):
    def get_col_spec(self):
        return "CHAR(%(length)s)" % {'length': self.length}


class InfoBinary(sqltypes.Binary):
    def get_col_spec(self):
        return "BYTE"


class InfoBoolean(sqltypes.Boolean):
    """Boolean stored in a SMALLINT column."""

    default_type = 'NUM'

    def get_col_spec(self):
        return "SMALLINT"

    def result_processor(self, dialect):
        """Convert non-None results to ``bool``."""
        def process(value):
            if value is None:
                return None
            return bool(value)
        return process

    def bind_processor(self, dialect):
        """Bind True/False as 1/0, None as None, anything else by truthiness."""
        def process(value):
            if value is True:
                return 1
            elif value is False:
                return 0
            elif value is None:
                return None
            return bool(value)
        return process


# Generic SQLAlchemy type -> Informix implementation type.
colspecs = {
    sqltypes.Integer: InfoInteger,
    sqltypes.Smallinteger: InfoSmallInteger,
    sqltypes.Numeric: InfoNumeric,
    sqltypes.Float: InfoNumeric,
    sqltypes.DateTime: InfoDateTime,
    sqltypes.Date: InfoDate,
    sqltypes.Time: InfoTime,
    sqltypes.String: InfoString,
    sqltypes.Binary: InfoBinary,
    sqltypes.Boolean: InfoBoolean,
    sqltypes.Text: InfoText,
    sqltypes.CHAR: InfoChar,
}


# syscolumns.coltype code (low byte) -> Informix implementation type.
ischema_names = {
    0: InfoString,         # CHAR
    1: InfoSmallInteger,   # SMALLINT
    2: InfoInteger,        # INT
    3: InfoNumeric,        # Float
    4: InfoNumeric,        # SmallFloat (was mistakenly listed under key 3)
    5: InfoNumeric,        # DECIMAL
    6: InfoInteger,        # Serial
    7: InfoDate,           # DATE
    8: InfoNumeric,        # MONEY
    10: InfoDateTime,      # DATETIME
    11: InfoBinary,        # BYTE
    12: InfoText,          # TEXT
    13: InfoString,        # VARCHAR
    15: InfoString,        # NCHAR
    16: InfoString,        # NVARCHAR
    17: InfoInteger,       # INT8
    18: InfoInteger,       # Serial8
    43: InfoString,        # LVARCHAR
    # BLOB and CLOB were both listed under the placeholder key -1; only the
    # last entry (CLOB -> InfoText) ever took effect, so that one is kept.
    -1: InfoText,          # BLOB / CLOB
}


class InfoExecutionContext(default.DefaultExecutionContext):
    """Execution context that captures serial ids and applies row offsets."""

    # cursor.sqlerrd
    # 0 - estimated number of rows returned
    # 1 - serial value after insert or ISAM error code
    # 2 - number of rows processed
    # 3 - estimated cost
    # 4 - offset of the error into the SQL statement
    # 5 - rowid after insert
    def post_exec(self):
        """Record the inserted serial value, or skip rows for an offset."""
        is_insert = getattr(self.compiled, "isinsert", False)
        if is_insert and self.last_inserted_ids() is None:
            self._last_inserted_ids = [self.cursor.sqlerrd[1]]
        elif hasattr(self.compiled, 'offset'):
            self.cursor.offset(self.compiled.offset)
        super(InfoExecutionContext, self).post_exec()

    def create_cursor(self):
        """Return an ``informix_cursor`` wrapping the raw DBAPI connection."""
        return informix_cursor(self.connection.connection)


class InfoDialect(default.DefaultDialect):
    """Dialect for Informix using the ``informixdb`` DBAPI module."""

    name = 'informix'
    default_paramstyle = 'qmark'
    # for informix 7.31
    max_identifier_length = 18

    def __init__(self, use_ansi=True, **kwargs):
        self.use_ansi = use_ansi
        default.DefaultDialect.__init__(self, **kwargs)

    @classmethod
    def dbapi(cls):
        """Import and return the ``informixdb`` module."""
        import informixdb
        return informixdb

    def is_disconnect(self, e):
        """Return True if ``e`` is an OperationalError signalling a lost link."""
        if not isinstance(e, self.dbapi.OperationalError):
            return False
        message = str(e)
        return ('closed the connection' in message or
                'connection not open' in message)

    def do_begin(self, connect):
        """Issue ``SET LOCK MODE TO WAIT`` at the start of a transaction."""
        cu = connect.cursor()
        cu.execute('SET LOCK MODE TO WAIT')
        #cu.execute( 'SET ISOLATION TO REPEATABLE READ' )

    def type_descriptor(self, typeobj):
        return sqltypes.adapt_type(typeobj, colspecs)

    def create_connect_args(self, url):
        """Build ``([dsn], options)`` from ``url``.

        The DSN is ``database@host`` when a host is given, otherwise just the
        database name; user/password are passed only when a username is set.
        """
        if url.host:
            dsn = '%s@%s' % (url.database, url.host)
        else:
            dsn = url.database

        if url.username:
            opt = {'user': url.username, 'password': url.password}
        else:
            opt = {}

        return ([dsn], opt)

    def table_names(self, connection, schema):
        """Return every ``tabname`` in systables (``schema`` is not used)."""
        s = "select tabname from systables"
        return [row[0] for row in connection.execute(s)]

    def has_table(self, connection, table_name, schema=None):
        """Return True if systables has a row for the lower-cased name."""
        cursor = connection.execute(
            """select tabname from systables where tabname=?""",
            table_name.lower())
        return cursor.fetchone() is not None

    def reflecttable(self, connection, table, include_columns):
        """Load columns, foreign keys and the primary key into ``table``.

        Raises ``exc.NoSuchTableError`` when the table (or its columns) cannot
        be found, and ``AssertionError`` when the owner is wrong or ambiguous.
        """
        owner = self._resolve_owner(connection, table)
        self._reflect_columns(connection, table, owner, include_columns)
        self._reflect_foreign_keys(connection, table, owner)
        self._reflect_primary_key(connection, table, owner)

    def _resolve_owner(self, connection, table):
        """Return the owner of ``table``, validating ``table.owner`` if set."""
        c = connection.execute(
            "select distinct OWNER from systables where tabname=?",
            table.name.lower())
        rows = c.fetchall()
        if not rows:
            raise exc.NoSuchTableError(table.name)

        if table.owner is not None:
            if table.owner.lower() in [r[0] for r in rows]:
                return table.owner.lower()
            raise AssertionError("Specified owner %s does not own table %s"%(table.owner, table.name))

        if len(rows) == 1:
            return rows[0][0]
        raise AssertionError("There are multiple tables with name %s in the schema, you must specifie owner"%table.name)

    def _reflect_columns(self, connection, table, owner, include_columns):
        """Append one Column per syscolumns row to ``table``."""
        c = connection.execute(
            """select colname , coltype , collength , t3.default , t1.colno
                 from syscolumns as t1 , systables as t2 , OUTER sysdefaults as t3
                where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=?
                  and t3.tabid = t2.tabid and t3.colno = t1.colno
                order by t1.colno""",
            table.name.lower(), owner)
        rows = c.fetchall()

        if not rows:
            raise exc.NoSuchTableError(table.name)

        for name, colattr, collength, col_default, colno in rows:
            name = name.lower()
            if include_columns and name not in include_columns:
                continue

            # in 7.31, coltype = 0x000
            #                       ^^-- column type
            #                      ^-- 1 not null , 0 null
            nullable, coltype = divmod(colattr, 256)
            if coltype not in (0, 13) and col_default:
                # Non-character defaults: keep only the last token.
                col_default = col_default.split()[-1]

            if coltype == 0 or coltype == 13:  # char , varchar
                coltype = ischema_names.get(coltype, InfoString)(collength)
                if col_default:
                    col_default = "'%s'" % col_default
            elif coltype == 5:  # decimal
                # collength packs precision in the high byte, scale in the low.
                precision = (collength & 0xFF00) >> 8
                scale = collength & 0xFF
                if scale == 255:
                    scale = 0
                coltype = InfoNumeric(precision, scale)
            else:
                try:
                    coltype = ischema_names[coltype]
                except KeyError:
                    util.warn("Did not recognize type '%s' of column '%s'" %
                              (coltype, name))
                    coltype = sqltypes.NULLTYPE

            colargs = []
            if col_default is not None:
                colargs.append(schema.DefaultClause(sql.text(col_default)))

            table.append_column(schema.Column(
                name, coltype, nullable=(nullable == 0), *colargs))

    def _reflect_foreign_keys(self, connection, table, owner):
        """Append a ForeignKeyConstraint per referential constraint."""
        c = connection.execute(
            """select t1.constrname as cons_name , t1.constrtype as cons_type ,
                      t4.colname as local_column , t7.tabname as remote_table ,
                      t6.colname as remote_column
                 from sysconstraints as t1 , systables as t2 ,
                      sysindexes as t3 , syscolumns as t4 ,
                      sysreferences as t5 , syscolumns as t6 , systables as t7 ,
                      sysconstraints as t8 , sysindexes as t9
                where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t1.constrtype = 'R'
                  and t3.tabid = t2.tabid and t3.idxname = t1.idxname
                  and t4.tabid = t2.tabid and t4.colno = t3.part1
                  and t5.constrid = t1.constrid and t8.constrid = t5.primary
                  and t6.tabid = t5.ptabid and t6.colno = t9.part1 and t9.idxname = t8.idxname
                  and t7.tabid = t5.ptabid""",
            table.name.lower(), owner)
        rows = c.fetchall()

        # constraint name -> (local column names, "table.column" references)
        fks = {}
        for cons_name, cons_type, local_column, remote_table, remote_column in rows:
            local_cols, remote_cols = fks.setdefault(cons_name, ([], []))
            refspec = ".".join([remote_table, remote_column])
            # Make sure the referenced table is loaded into the same metadata.
            schema.Table(remote_table, table.metadata,
                         autoload=True, autoload_with=connection)
            if local_column not in local_cols:
                local_cols.append(local_column)
            if refspec not in remote_cols:
                remote_cols.append(refspec)

        for local_cols, remote_cols in fks.values():
            table.append_constraint(schema.ForeignKeyConstraint(
                local_cols, remote_cols, None, link_to_name=True))

    def _reflect_primary_key(self, connection, table, owner):
        """Add the columns of the 'P' constraint to ``table.primary_key``."""
        c = connection.execute(
            """select t1.constrname as cons_name , t1.constrtype as cons_type ,
                      t4.colname as local_column
                 from sysconstraints as t1 , systables as t2 ,
                      sysindexes as t3 , syscolumns as t4
                where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t1.constrtype = 'P'
                  and t3.tabid = t2.tabid and t3.idxname = t1.idxname
                  and t4.tabid = t2.tabid and t4.colno = t3.part1""",
            table.name.lower(), owner)
        rows = c.fetchall()
        for cons_name, cons_type, local_column in rows:
            table.primary_key.add(table.c[local_column])


class InfoCompiler(compiler.DefaultCompiler):
    """Statement compiler for Informix.

    LIMIT is rendered as ``FIRST n`` ahead of the column list and no trailing
    limit clause is emitted.  The requested offset is recorded on the
    compiler as ``offset`` for use after execution.
    """

    def __init__(self, *args, **kwargs):
        self.limit = 0
        self.offset = 0

        compiler.DefaultCompiler.__init__(self, *args, **kwargs)

    def default_from(self):
        """Return the FROM clause used when a SELECT names no table."""
        return " from systables where tabname = 'systables' "

    def get_select_precolumns(self, select):
        """Return the DISTINCT / ``FIRST n`` prefix for ``select``."""
        if select._distinct:
            s = "DISTINCT "
        else:
            s = ""
        # only has limit
        if select._limit:
            # Fetch limit + offset rows; the leading offset rows are skipped
            # later by the cursor wrapper.
            off = select._offset or 0
            s += " FIRST %s " % (select._limit + off)
        return s

    def visit_select(self, select):
        """Record limit/offset and add ORDER BY columns to the select list."""
        if select._offset:
            self.offset = select._offset
            self.limit = select._limit or 0

        # the column in order by clause must in select too
        def label_of(c):
            # Elements without a usable _label are treated as unlabeled.
            try:
                return c._label.lower()
            except AttributeError:
                return ''

        # TODO: dont modify the original select, generate a new one
        selected = [label_of(c) for c in select._raw_columns]
        for c in select._order_by_clause.clauses:
            if label_of(c) not in selected:
                select.append_column(c)

        return compiler.DefaultCompiler.visit_select(self, select)

    def limit_clause(self, select):
        """Emit nothing; the limit is rendered by get_select_precolumns."""
        return ""

    def visit_function(self, func):
        """Translate current date/time functions to Informix keywords."""
        fname = func.name.lower()
        if fname == 'current_date':
            return "today"
        elif fname == 'current_time':
            return "CURRENT HOUR TO SECOND"
        elif fname in ('current_timestamp', 'now'):
            return "CURRENT YEAR TO SECOND"
        return compiler.DefaultCompiler.visit_function(self, func)

    def visit_clauselist(self, list, **kwargs):
        """Join the compiled clauses with ', ', skipping any that are None."""
        compiled = [self.process(c) for c in list.clauses]
        return ', '.join([s for s in compiled if s is not None])


class InfoSchemaGenerator(compiler.SchemaGenerator):
    """DDL generator for Informix (SERIAL keys, trailing constraint names)."""

    def get_column_specification(self, column, first_pk=False):
        """Return the column definition used in CREATE TABLE.

        The first autoincrementing integer primary key without foreign keys
        becomes SERIAL, at most once per table (tracked by ``has_serial``).
        """
        colspec = self.preparer.format_column(column)
        use_serial = (
            column.primary_key and len(column.foreign_keys) == 0 and
            column.autoincrement and
            isinstance(column.type, sqltypes.Integer) and
            not getattr(self, 'has_serial', False) and first_pk)
        if use_serial:
            colspec += " SERIAL"
            self.has_serial = True
        else:
            colspec += " " + column.type.dialect_impl(self.dialect).get_col_spec()
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"

        return colspec

    def post_create_table(self, table):
        """Reset the per-table SERIAL marker; no extra DDL text is added."""
        if hasattr(self, 'has_serial'):
            del self.has_serial
        return ''

    def visit_primary_key_constraint(self, constraint):
        """Emit the primary key constraint without its name."""
        # for informix 7.31 not support constraint name
        name = constraint.name
        constraint.name = None
        try:
            super(InfoSchemaGenerator, self).visit_primary_key_constraint(constraint)
        finally:
            # Always restore the name, even if DDL generation fails.
            constraint.name = name

    def visit_unique_constraint(self, constraint):
        """Emit the unique constraint without its name."""
        # for informix 7.31 not support constraint name
        name = constraint.name
        constraint.name = None
        try:
            super(InfoSchemaGenerator, self).visit_unique_constraint(constraint)
        finally:
            # Always restore the name, even if DDL generation fails.
            constraint.name = name

    def visit_foreign_key_constraint(self, constraint):
        """Mark named foreign keys ``use_alter``; emit unnamed ones inline."""
        if constraint.name is not None:
            constraint.use_alter = True
        else:
            super(InfoSchemaGenerator, self).visit_foreign_key_constraint(constraint)

    def define_foreign_key(self, constraint):
        """Emit the foreign key, placing the constraint name after it."""
        # for informix 7.31 not support constraint name
        if constraint.use_alter:
            name = constraint.name
            constraint.name = None
            self.append("CONSTRAINT ")
            try:
                super(InfoSchemaGenerator, self).define_foreign_key(constraint)
            finally:
                # Always restore the name, even if DDL generation fails.
                constraint.name = name
            if name is not None:
                self.append(" CONSTRAINT " + name)
        else:
            super(InfoSchemaGenerator, self).define_foreign_key(constraint)

    def visit_index(self, index):
        """Create the index unless it covers a single foreign key column."""
        if len(index.columns) == 1 and index.columns[0].foreign_key:
            return
        super(InfoSchemaGenerator, self).visit_index(index)


class InfoIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer that never quotes identifiers."""

    def __init__(self, dialect):
        super(InfoIdentifierPreparer, self).__init__(dialect, initial_quote="'")

    def _requires_quotes(self, value):
        return False


class InfoSchemaDropper(compiler.SchemaDropper):
    def drop_foreignkey(self, constraint):
        """Drop the foreign key only when it has a name."""
        if constraint.name is not None:
            super(InfoSchemaDropper, self).drop_foreignkey(constraint)


dialect = InfoDialect
poolclass = pool.SingletonThreadPool
dialect.statement_compiler = InfoCompiler
dialect.schemagenerator = InfoSchemaGenerator
dialect.schemadropper = InfoSchemaDropper
dialect.preparer = InfoIdentifierPreparer
dialect.execution_ctx_cls = InfoExecutionContext