summaryrefslogtreecommitdiff
path: root/ZPsycopgDA/db.py
diff options
context:
space:
mode:
authorFederico Di Gregorio <fog@initd.org>2004-10-19 03:17:12 +0000
committerFederico Di Gregorio <fog@initd.org>2004-10-19 03:17:12 +0000
commitc904d97f696a665958c2cc43333d09c0e6357577 (patch)
treede88cb1cb6a48230f79bc0b532835d26a33660e9 /ZPsycopgDA/db.py
downloadpsycopg2-c904d97f696a665958c2cc43333d09c0e6357577.tar.gz
Initial psycopg 2 import after SVN crash.
Diffstat (limited to 'ZPsycopgDA/db.py')
-rw-r--r--ZPsycopgDA/db.py209
1 files changed, 209 insertions, 0 deletions
diff --git a/ZPsycopgDA/db.py b/ZPsycopgDA/db.py
new file mode 100644
index 0000000..c859535
--- /dev/null
+++ b/ZPsycopgDA/db.py
@@ -0,0 +1,209 @@
+# ZPsycopgDA/db.py - query execution
+#
+# Copyright (C) 2004 Federico Di Gregorio <fog@initd.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2, or (at your option) any later
+# version.
+#
+# Or, at your option this program (ZPsycopgDA) can be distributed under the
+# Zope Public License (ZPL) Version 1.0, as published on the Zope web site,
+# http://www.zope.org/Resources/ZPL.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
+# or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the LICENSE file for details.
+
+from Shared.DC.ZRDB.TM import TM
+from Shared.DC.ZRDB import dbi_db
+
+from ZODB.POSException import ConflictError
+
+import time
+import site
+import pool
+
+import psycopg
+from psycopg.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN
+from psycopg import NUMBER, STRING, ROWID, DATETIME
+
+
+
+# the DB object, managing all the real query work
+
+class DB(TM, dbi_db.DB):
+
+ _p_oid = _p_changed = _registered = None
+
+ def __init__(self, dsn, tilevel, enc='utf-8'):
+ self.dsn = dsn
+ self.tilevel = tilevel
+ self.encoding = enc
+ self.failures = 0
+ self.calls = 0
+
+ def getconn(self, create=True):
+ conn = pool.getconn(self.dsn)
+ conn.set_isolation_level(int(self.tilevel))
+ return conn
+
+ def putconn(self, close=False):
+ try:
+ conn = pool.getconn(self.dsn, False)
+ except AttributeError:
+ pass
+ pool.putconn(self.dsn, conn, close)
+
+ def getcursor(self):
+ conn = self.getconn()
+ return conn.cursor()
+
+ def _finish(self, *ignored):
+ try:
+ conn = self.getconn(False)
+ conn.commit()
+ self.putconn()
+ except AttributeError:
+ pass
+
+ def _abort(self, *ignored):
+ try:
+ conn = self.getconn(False)
+ conn.rollback()
+ self.putconn()
+ except AttributeError:
+ pass
+
+ def open(self):
+ # this will create a new pool for our DSN if not already existing,
+ # then get and immediately release a connection
+ self.getconn()
+ self.putconn()
+
+ def close(self):
+ # FIXME: if this connection is closed we flush all the pool associated
+ # with the current DSN; does this makes sense?
+ pool.flushpool(self.dsn)
+
+ def sortKey(self):
+ return 1
+
+ ## tables and rows ##
+
+ def tables(self, rdb=0, _care=('TABLE', 'VIEW')):
+ self._register()
+ c = self.getcursor()
+ c.execute(
+ "SELECT t.tablename AS NAME, 'TABLE' AS TYPE "
+ " FROM pg_tables t WHERE tableowner <> 'postgres' "
+ "UNION SELECT v.viewname AS NAME, 'VIEW' AS TYPE "
+ " FROM pg_views v WHERE viewowner <> 'postgres' "
+ "UNION SELECT t.tablename AS NAME, 'SYSTEM_TABLE\' AS TYPE "
+ " FROM pg_tables t WHERE tableowner = 'postgres' "
+ "UNION SELECT v.viewname AS NAME, 'SYSTEM_TABLE' AS TYPE "
+ "FROM pg_views v WHERE viewowner = 'postgres'")
+ res = []
+ for name, typ in c.fetchall():
+ if typ in _care:
+ res.append({'TABLE_NAME': name, 'TABLE_TYPE': typ})
+ self.putconn()
+ return res
+
+ def columns(self, table_name):
+ self._register()
+ c = self.getcursor()
+ try:
+ r = c.execute('SELECT * FROM "%s" WHERE 1=0' % table_name)
+ except:
+ return ()
+ res = []
+ for name, type, width, ds, p, scale, null_ok in c.description:
+ if type == NUMBER:
+ if type == INTEGER:
+ type = INTEGER
+ elif type == FLOAT:
+ type = FLOAT
+ else: type = NUMBER
+ elif type == BOOLEAN:
+ type = BOOLEAN
+ elif type == ROWID:
+ type = ROWID
+ elif type == DATETIME:
+ type = DATETIME
+ else:
+ type = STRING
+
+ res.append({'Name': name,
+ 'Type': type.name,
+ 'Precision': 0,
+ 'Scale': 0,
+ 'Nullable': 0})
+ self.putconn()
+ return res
+
+ ## query execution ##
+
+ def query(self, query_string, max_rows=None, query_data=None):
+ self._register()
+ self.calls = self.calls+1
+
+ desc = ()
+ res = []
+ nselects = 0
+
+ c = self.getcursor()
+
+ try:
+ for qs in [x for x in query_string.split('\0') if x]:
+ if type(qs) == unicode:
+ if self.encoding:
+ qs = qs.encode(self.encoding)
+ try:
+ if (query_data):
+ c.execute(qs, query_data)
+ else:
+ c.execute(qs)
+ except (psycopg.ProgrammingError, psycopg.IntegrityError), e:
+ if e.args[0].find("concurrent update") > -1:
+ raise ConflictError
+ raise e
+ if c.description is not None:
+ nselects += 1
+ if c.description != desc and nselects > 1:
+ raise psycopg.ProgrammingError(
+ 'multiple selects in single query not allowed')
+ if max_rows:
+ res = c.fetchmany(max_rows)
+ else:
+ res = c.fetchall()
+ desc = c.description
+ self.failures = 0
+
+ except StandardError, err:
+ self._abort()
+ raise err
+
+ items = []
+ for name, typ, width, ds, p, scale, null_ok in desc:
+ if typ == NUMBER:
+ if typ == INTEGER or typ == LONGINTEGER: typs = 'i'
+ else: typs = 'n'
+ elif typ == BOOLEAN:
+ typs = 'n'
+ elif typ == ROWID:
+ typs = 'i'
+ elif typ == DATETIME:
+ typs = 'd'
+ else:
+ typs = 's'
+ items.append({
+ 'name': name,
+ 'type': typs,
+ 'width': width,
+ 'null': null_ok,
+ })
+
+ return items, res