diff options
| author | Federico Di Gregorio <fog@initd.org> | 2004-10-19 03:17:12 +0000 |
|---|---|---|
| committer | Federico Di Gregorio <fog@initd.org> | 2004-10-19 03:17:12 +0000 |
| commit | c904d97f696a665958c2cc43333d09c0e6357577 (patch) | |
| tree | de88cb1cb6a48230f79bc0b532835d26a33660e9 /ZPsycopgDA/db.py | |
| download | psycopg2-c904d97f696a665958c2cc43333d09c0e6357577.tar.gz | |
Initial psycopg 2 import after SVN crash.
Diffstat (limited to 'ZPsycopgDA/db.py')
| -rw-r--r-- | ZPsycopgDA/db.py | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/ZPsycopgDA/db.py b/ZPsycopgDA/db.py new file mode 100644 index 0000000..c859535 --- /dev/null +++ b/ZPsycopgDA/db.py @@ -0,0 +1,209 @@ +# ZPsycopgDA/db.py - query execution +# +# Copyright (C) 2004 Federico Di Gregorio <fog@initd.org> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2, or (at your option) any later +# version. +# +# Or, at your option this program (ZPsycopgDA) can be distributed under the +# Zope Public License (ZPL) Version 1.0, as published on the Zope web site, +# http://www.zope.org/Resources/ZPL. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY +# or FITNESS FOR A PARTICULAR PURPOSE. +# +# See the LICENSE file for details. + +from Shared.DC.ZRDB.TM import TM +from Shared.DC.ZRDB import dbi_db + +from ZODB.POSException import ConflictError + +import time +import site +import pool + +import psycopg +from psycopg.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN +from psycopg import NUMBER, STRING, ROWID, DATETIME + + + +# the DB object, managing all the real query work + +class DB(TM, dbi_db.DB): + + _p_oid = _p_changed = _registered = None + + def __init__(self, dsn, tilevel, enc='utf-8'): + self.dsn = dsn + self.tilevel = tilevel + self.encoding = enc + self.failures = 0 + self.calls = 0 + + def getconn(self, create=True): + conn = pool.getconn(self.dsn) + conn.set_isolation_level(int(self.tilevel)) + return conn + + def putconn(self, close=False): + try: + conn = pool.getconn(self.dsn, False) + except AttributeError: + pass + pool.putconn(self.dsn, conn, close) + + def getcursor(self): + conn = self.getconn() + return conn.cursor() + + def _finish(self, *ignored): + try: + conn = self.getconn(False) + conn.commit() + self.putconn() + except AttributeError: + pass + + def _abort(self, *ignored): + try: + conn = self.getconn(False) + conn.rollback() + self.putconn() + except AttributeError: + pass + + def open(self): + # this will create a new pool for our DSN if not already existing, + # then get and immediately release a connection + self.getconn() + self.putconn() + + def close(self): + # FIXME: if this connection is closed we flush all the pool associated + # with the current DSN; does this makes sense? + pool.flushpool(self.dsn) + + def sortKey(self): + return 1 + + ## tables and rows ## + + def tables(self, rdb=0, _care=('TABLE', 'VIEW')): + self._register() + c = self.getcursor() + c.execute( + "SELECT t.tablename AS NAME, 'TABLE' AS TYPE " + " FROM pg_tables t WHERE tableowner <> 'postgres' " + "UNION SELECT v.viewname AS NAME, 'VIEW' AS TYPE " + " FROM pg_views v WHERE viewowner <> 'postgres' " + "UNION SELECT t.tablename AS NAME, 'SYSTEM_TABLE\' AS TYPE " + " FROM pg_tables t WHERE tableowner = 'postgres' " + "UNION SELECT v.viewname AS NAME, 'SYSTEM_TABLE' AS TYPE " + "FROM pg_views v WHERE viewowner = 'postgres'") + res = [] + for name, typ in c.fetchall(): + if typ in _care: + res.append({'TABLE_NAME': name, 'TABLE_TYPE': typ}) + self.putconn() + return res + + def columns(self, table_name): + self._register() + c = self.getcursor() + try: + r = c.execute('SELECT * FROM "%s" WHERE 1=0' % table_name) + except: + return () + res = [] + for name, type, width, ds, p, scale, null_ok in c.description: + if type == NUMBER: + if type == INTEGER: + type = INTEGER + elif type == FLOAT: + type = FLOAT + else: type = NUMBER + elif type == BOOLEAN: + type = BOOLEAN + elif type == ROWID: + type = ROWID + elif type == DATETIME: + type = DATETIME + else: + type = STRING + + res.append({'Name': name, + 'Type': type.name, + 'Precision': 0, + 'Scale': 0, + 'Nullable': 0}) + self.putconn() + return res + + ## query execution ## + + def query(self, query_string, max_rows=None, query_data=None): + self._register() + self.calls = self.calls+1 + + desc = () + res = [] + nselects = 0 + + c = self.getcursor() + + try: + for qs in [x for x in query_string.split('\0') if x]: + if type(qs) == unicode: + if self.encoding: + qs = qs.encode(self.encoding) + try: + if (query_data): + c.execute(qs, query_data) + else: + c.execute(qs) + except (psycopg.ProgrammingError, psycopg.IntegrityError), e: + if e.args[0].find("concurrent update") > -1: + raise ConflictError + raise e + if c.description is not None: + nselects += 1 + if c.description != desc and nselects > 1: + raise psycopg.ProgrammingError( + 'multiple selects in single query not allowed') + if max_rows: + res = c.fetchmany(max_rows) + else: + res = c.fetchall() + desc = c.description + self.failures = 0 + + except StandardError, err: + self._abort() + raise err + + items = [] + for name, typ, width, ds, p, scale, null_ok in desc: + if typ == NUMBER: + if typ == INTEGER or typ == LONGINTEGER: typs = 'i' + else: typs = 'n' + elif typ == BOOLEAN: + typs = 'n' + elif typ == ROWID: + typs = 'i' + elif typ == DATETIME: + typs = 'd' + else: + typs = 's' + items.append({ + 'name': name, + 'type': typs, + 'width': width, + 'null': null_ok, + }) + + return items, res |
