diff options
author | Federico Di Gregorio <fog@initd.org> | 2005-03-01 16:41:02 +0000 |
---|---|---|
committer | Federico Di Gregorio <fog@initd.org> | 2005-03-01 16:41:02 +0000 |
commit | e5f558a6be623ab09f3517a0ff03f94610d18c0e (patch) | |
tree | fe015778906681a05cf26e3bb0509ed86459acbe | |
parent | 3141770f5332008165ea5f74057a9932454bf655 (diff) | |
download | psycopg2-e5f558a6be623ab09f3517a0ff03f94610d18c0e.tar.gz |
COPY FROM works.
-rw-r--r-- | ChangeLog | 15 | ||||
-rw-r--r-- | examples/binary.py | 3 | ||||
-rw-r--r-- | examples/copy_from.py | 178 | ||||
-rw-r--r-- | lib/extras.py | 1 | ||||
-rw-r--r-- | psycopg/config.h | 5 | ||||
-rw-r--r-- | psycopg/connection_type.c | 4 | ||||
-rw-r--r-- | psycopg/cursor_type.c | 53 | ||||
-rw-r--r-- | psycopg/pqpath.c | 62 | ||||
-rw-r--r-- | setup.cfg | 3 | ||||
-rw-r--r-- | setup.py | 32 |
10 files changed, 321 insertions, 35 deletions
@@ -1,5 +1,17 @@ +2005-03-02 Federico Di Gregorio <fog@debian.org> + + * COPY FROM implemented using both old and new (v3) protocol. + + * psycopg/config.h (Dprintf): declaration for asprintf is gone. + + * psycopg/pqpath.c (_pq_copy_in_v3): implemented. + 2005-03-01 Federico Di Gregorio <fog@debian.org> + * setup.py: now we generate a slighly more verbose version string + that embeds some of the compile options, to facilitate users' bug + reports. + * psycopg/cursor_type.c (psyco_curs_copy_from): we now use PyOS_snprintf instead of asprintf. On some platforms this can be bad (win32).. if that's your case, get a better platform. :/ @@ -57,7 +69,8 @@ 2005-01-13 Federico Di Gregorio <fog@debian.org> - * ZPsycopgDA/db.py (DB.query): ported ZPsycopgDA connection fix + * ZPsycopgDA/db.py (DB.query + ): ported ZPsycopgDA connection fix from psycopg 1.1. * lib/*.py: added pydoc-friendly messages. diff --git a/examples/binary.py b/examples/binary.py index 3b543e5..f15a5fa 100644 --- a/examples/binary.py +++ b/examples/binary.py @@ -18,7 +18,8 @@ DSN = 'dbname=test' ## don't modify anything below tis line (except for experimenting) -import sys, psycopg +import sys +import psycopg if len(sys.argv) > 1: DSN = sys.argv[1] diff --git a/examples/copy_from.py b/examples/copy_from.py new file mode 100644 index 0000000..125be7c --- /dev/null +++ b/examples/copy_from.py @@ -0,0 +1,178 @@ +# copy_from.py -- example about copy_from +# +# Copyright (C) 2002 Tom Jenkins <tjenkins@devis.com> +# Copyright (C) 2005 Federico Di Gregorio <fog@initd.org> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# + +## put in DSN your DSN string + +DSN = 'dbname=test' + +## don't modify anything below tis line (except for experimenting) + +import sys +import os +import StringIO +import psycopg + +if len(sys.argv) > 1: + DSN = sys.argv[1] + +print "Opening connection using dns:", DSN +conn = psycopg.connect(DSN) +print "Encoding for this connection is", conn.encoding + +curs = conn.cursor() +try: + curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)") +except: + conn.rollback() + curs.execute("DROP TABLE test_copy") + curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)") +conn.commit() + +# copy_from with default arguments, from open file + +io = open('copy_from.txt', 'wr') +data = ['Tom\tJenkins\t37\n', + 'Madonna\t\N\t45\n', + 'Federico\tDi Gregorio\t\N\n'] +io.writelines(data) +io.close() + +io = open('copy_from.txt', 'r') +curs.copy_from(io, 'test_copy') +print "1) Copy %d records from file object " % len(data) + \ + "using defaults (sep: \\t and null = \\N)" +io.close() + +curs.execute("SELECT * FROM test_copy") +rows = curs.fetchall() +print " Select returned %d rows" % len(rows) + +for r in rows: + print " %s %s\t%s" % (r[0], r[1], r[2]) +curs.execute("delete from test_copy") +conn.commit() + +# copy_from using custom separator, from open file + +io = open('copy_from.txt', 'wr') +data = ['Tom:Jenkins:37\n', + 'Madonna:\N:45\n', + 'Federico:Di Gregorio:\N\n'] +io.writelines(data) +io.close() + +io = open('copy_from.txt', 'r') +curs.copy_from(io, 'test_copy', ':') +print "2) Copy %d records from file object using sep = :" % len(data) +io.close() + +curs.execute("SELECT * FROM test_copy") +rows = curs.fetchall() +print " Select returned %d rows" % len(rows) + +for r in rows: + print " %s %s\t%s" % (r[0], r[1], r[2]) +curs.execute("delete from test_copy") +conn.commit() + +# copy_from using custom null identifier, from open file + +io = open('copy_from.txt', 'wr') +data = ['Tom\tJenkins\t37\n', + 'Madonna\tNULL\t45\n', + 'Federico\tDi Gregorio\tNULL\n'] +io.writelines(data) +io.close() + +io = open('copy_from.txt', 'r') +curs.copy_from(io, 'test_copy', null='NULL') +print "3) Copy %d records from file object using null = NULL" % len(data) +io.close() + +curs.execute("SELECT * FROM test_copy") +rows = curs.fetchall() +print " Select using cursor returned %d rows" % len(rows) + +for r in rows: + print " %s %s\t%s" % (r[0], r[1], r[2]) +curs.execute("delete from test_copy") +conn.commit() + +# copy_from using custom separator and null identifier + +io = open('copy_from.txt', 'wr') +data = ['Tom:Jenkins:37\n', 'Madonna:NULL:45\n', 'Federico:Di Gregorio:NULL\n'] +io.writelines(data) +io.close() + +io = open('copy_from.txt', 'r') +curs.copy_from(io, 'test_copy', ':', 'NULL') +print "4) Copy %d records from file object " % len(data) + \ + "using sep = : and null = NULL" +io.close() + +curs.execute("SELECT * FROM test_copy") +rows = curs.fetchall() +print " Select using cursor returned %d rows" % len(rows) + +for r in rows: + print " %s %s\t%s" % (r[0], r[1], r[2]) +curs.execute("delete from test_copy") +conn.commit() + +# anything can be used as a file if it has .read() and .readline() methods + +data = StringIO.StringIO() +data.write('\n'.join(['Tom\tJenkins\t37', + 'Madonna\t\N\t45', + 'Federico\tDi Gregorio\t\N'])) +data.seek(0) + +curs.copy_from(data, 'test_copy') +print "5) Copy 3 records from StringIO object using defaults" + +curs.execute("SELECT * FROM test_copy") +rows = curs.fetchall() +print " Select using cursor returned %d rows" % len(rows) + +for r in rows: + print " %s %s\t%s" % (r[0], r[1], r[2]) +curs.execute("delete from test_copy") +conn.commit() + +# simple error test + +print "6) About to raise an error" +data = StringIO.StringIO() +data.write('\n'.join(['Tom\tJenkins\t37', + 'Madonna\t\N\t45', + 'Federico\tDi Gregorio\taaa'])) +data.seek(0) + +try: + curs.copy_from(data, 'test_copy') +except StandardError, err: + conn.rollback() + print " Catched error (as expected):\n", err + +conn.rollback() + +curs.execute("DROP TABLE test_copy") +os.unlink('copy_from.txt') +conn.commit() + + + diff --git a/lib/extras.py b/lib/extras.py index 1d523d9..2357447 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -57,6 +57,7 @@ class DictCursor(_cursor): self._build_index() return res + class DictRow(list): """A row object that allow by-colun-name access to data.""" diff --git a/psycopg/config.h b/psycopg/config.h index cc15ad8..dff79a7 100644 --- a/psycopg/config.h +++ b/psycopg/config.h @@ -22,11 +22,6 @@ #ifndef PSYCOPG_CONFIG_H #define PSYCOPG_CONFIG_H 1 -/* replacement for asprintf() */ -#ifndef HAVE_ASPRINTF -extern int asprintf(char **buffer, char *fmt, ...); -#endif - /* debug printf-like function */ #if defined( __GNUC__) && !defined(__APPLE__) #ifdef PSYCOPG_DEBUG diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index aab39a7..131fc9c 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -134,7 +134,7 @@ psyco_conn_rollback(connectionObject *self, PyObject *args) - +#ifdef PSYCOPG_EXTENSIONS /* set_isolation_level method - switch connection isolation level */ #define psyco_conn_set_isolation_level_doc \ @@ -186,7 +186,7 @@ psyco_conn_set_client_encoding(connectionObject *self, PyObject *args) return NULL; } } - +#endif /** the connection object **/ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 8789b4b..95729b0 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -901,6 +901,7 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) Py_INCREF(Py_None); return Py_None; + } @@ -910,42 +911,51 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) /* extension: copy_from - implements COPY FROM */ #define psyco_curs_copy_from_doc \ -"copy_from(file, table, sep='\\t', null='NULL') -> copy file to table." +"copy_from(file, table, sep='\\t', null='\\N') -> copy file to table." static int -_psyco_curs_has_write_check(PyObject* o, void* var) +_psyco_curs_has_read_check(PyObject* o, void* var) { - if (PyObject_HasAttrString(o, "write")) { + if (PyObject_HasAttrString(o, "readline") + && PyObject_HasAttrString(o, "read")) { Py_INCREF(o); *((PyObject**)var) = o; return 1; } else { PyErr_SetString(PyExc_TypeError, - "argument 1 must have a .write() method"); + "argument 1 must have both .read() and .readline() methods"); return 0; } } static PyObject * -psyco_curs_copy_from(cursorObject *self, PyObject *args) +psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs) { char query[256]; char *table_name; - char *sep = "\t", *null ="NULL"; + char *sep = "\t", *null = NULL; long int bufsize = DEFAULT_COPYSIZE; PyObject *file, *res = NULL; - if (!PyArg_ParseTuple(args, "O&s|ssi", - _psyco_curs_has_write_check, &file, - &table_name, &sep, &null, &bufsize)) { + static char *kwlist[] = {"file", "table", "sep", "null", "size", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O&s|ssi", kwlist, + _psyco_curs_has_read_check, &file, + &table_name, &sep, &null, &bufsize)) { return NULL; } EXC_IF_CURS_CLOSED(self); - - PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'" - " WITH NULL AS '%s'", table_name, sep, null); + + if (null) { + PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'" + " WITH NULL AS '%s'", table_name, sep, null); + } + else { + PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'", + table_name, sep); + } Dprintf("psyco_curs_copy_from: query = %s", query); self->copysize = bufsize; @@ -957,11 +967,24 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args) Py_INCREF(Py_None); } - free(query); - return res; } +static int +_psyco_curs_has_write_check(PyObject* o, void* var) +{ + if (PyObject_HasAttrString(o, "write")) { + Py_INCREF(o); + *((PyObject**)var) = o; + return 1; + } + else { + PyErr_SetString(PyExc_TypeError, + "argument 1 must have a .write() method"); + return 0; + } +} + /* extension: fileno - return the file descripor of the connection */ #define psyco_curs_fileno_doc \ @@ -1084,7 +1107,7 @@ static struct PyMethodDef cursorObject_methods[] = { {"isready", (PyCFunction)psyco_curs_isready, METH_VARARGS, psyco_curs_isready_doc}, {"copy_from", (PyCFunction)psyco_curs_copy_from, - METH_VARARGS, psyco_curs_copy_from_doc}, + METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc}, #endif {NULL} }; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 89e2e2b..e9b0afb 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -73,8 +73,8 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg) if (curs && curs->pgres) { if (conn->protocol == 3) { #ifdef HAVE_PQPROTOCOL3 - char *pgstate = PQresultErrorField(curs->pgres, - PG_DIAG_SQLSTATE); + char *pgstate = + PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE); if (!strncmp(pgstate, "23", 2)) exc = IntegrityError; else @@ -98,7 +98,7 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg) /* try to remove the initial "ERROR: " part from the postgresql error */ if (err && strlen(err) > 8) err = &(err[8]); - + /* if msg is not NULL, add it to the error message, after a '\n' */ if (msg) { PyErr_Format(exc, "%s\n%s", err, msg); @@ -130,6 +130,8 @@ pq_set_critical(connectionObject *conn, const char *msg) PyObject * pq_resolve_critical(connectionObject *conn, int close) { + Dprintf("pq_resolve_critical: resolving %s", conn->critical); + if (conn->critical) { char *msg = &(conn->critical[6]); Dprintf("pq_resolve_critical: error = %s", msg); @@ -562,8 +564,51 @@ _pq_copy_in_v3(cursorObject *curs) /* COPY FROM implementation when protocol 3 is available: this function uses the new PQputCopyData() and can detect errors and set the correct exception */ + PyObject *o; + int length = 0, error = 0; + + Dprintf("_pq_copy_in_v3: called with object at %p", curs->copyfile); + + while (1) { + o = PyObject_CallMethod(curs->copyfile, "read", "i", curs->copysize); + if (!o || !PyString_Check(o) || (length = PyString_Size(o)) == -1) { + error = 1; + } + if (length == 0 || error == 1) break; + + Py_BEGIN_ALLOW_THREADS; + if (PQputCopyData(curs->conn->pgconn, + PyString_AS_STRING(o), length) == -1) { + error = 2; + } + Py_END_ALLOW_THREADS; + + if (error == 2) break; + + Py_DECREF(o); + } - return -1; + Py_XDECREF(o); + + Dprintf("_pq_copy_in_v3: error = %d", error); + + if (error == 0 || error == 2) + /* 0 means that the copy went well, 2 that there was an error on the + backend: in both cases we'll get the error message from the + PQresult */ + PQputCopyEnd(curs->conn->pgconn, NULL); + else + PQputCopyEnd(curs->conn->pgconn, "error during .read() call"); + + /* and finally we grab the operation result from the backend */ + IFCLEARPGRES(curs->pgres); + while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { + if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) + pq_raise(curs->conn, curs, NULL, NULL); + IFCLEARPGRES(curs->pgres); + } + + return 1; } #endif static int @@ -587,6 +632,15 @@ _pq_copy_in(cursorObject *curs) PQputline(curs->conn->pgconn, "\\.\n"); PQendcopy(curs->conn->pgconn); + /* if for some reason we're using a protocol 3 libpq to connect to a + protocol 2 backend we still need to cycle on the result set */ + IFCLEARPGRES(curs->pgres); + while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { + if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) + pq_raise(curs->conn, curs, NULL, NULL); + IFCLEARPGRES(curs->pgres); + } + return 1; } @@ -1,5 +1,6 @@ [build_ext] -define=PSYCOPG_DEBUG,PSYCOPG_EXTENSIONS,PSYCOPG_DISPLAY_SIZE,HAVE_ASPRINTF,HAVE_PQFREEMEM,HAVE_PQPROTOCOL3 +define=PSYCOPG_EXTENSIONS,PSYCOPG_DISPLAY_SIZE,HAVE_PQFREEMEM,HAVE_PQPROTOCOL3 +# PSYCOPG_DEBUG can be added to enable verbose debug information # PSYCOPG_OWN_QUOTING can be added above but it is deprecated # include_dirs is the preferred method for locating postgresql headers, @@ -48,6 +48,7 @@ from distutils.sysconfig import get_python_inc import distutils.ccompiler PSYCOPG_VERSION = '1.99.11/devel' +version_flags = [] have_pydatetime = False have_mxdatetime = False @@ -80,13 +81,10 @@ if sys.version_info[0] >= 2 and sys.version_info[1] >= 4: ext = [] ; data_files = [] library_dirs = [] ; libraries = [] ; include_dirs = [] -if sys.platform != 'win32': - define_macros.append(('PSYCOPG_VERSION', '"'+PSYCOPG_VERSION+'"')) -else: - define_macros.append(('PSYCOPG_VERSION', '\\"'+PSYCOPG_VERSION+'\\"')) +if sys.platform == 'win32': include_dirs = ['.', - POSTGRESQLDIR + "\\src\\interfaces\\libpq", - POSTGRESQLDIR + "\\src\\include" ] + POSTGRESQLDIR + "\\src\\interfaces\\libpq", + POSTGRESQLDIR + "\\src\\include" ] library_dirs = [ POSTGRESQLDIR + "\\src\\interfaces\\libpq\\Release" ] libraries = ["ws2_32"] if USE_PG_DLL: @@ -124,12 +122,14 @@ if os.path.exists(mxincludedir): define_macros.append(('HAVE_MXDATETIME','1')) sources.append('adapter_mxdatetime.c') have_mxdatetime = True + version_flags.append('mx') # check for python datetime package if os.path.exists(os.path.join(get_python_inc(plat_specific=1),"datetime.h")): define_macros.append(('HAVE_PYDATETIME','1')) sources.append('adapter_datetime.c') have_pydatetime = True + version_flags.append('dt') # now decide which package will be the default for date/time typecasts if have_pydatetime and use_pydatetime \ @@ -143,6 +143,26 @@ else: sys.stderr.write("error: python datetime module not found\n") sys.exit(1) +# generate a nice version string to avoid confusion when users report bugs +from ConfigParser import ConfigParser +parser = ConfigParser() +parser.read('setup.cfg') +for have in parser.get('build_ext', 'define').split(','): + if have == 'PSYCOPG_EXTENSIONS': + version_flags.append('ext') + elif have == 'HAVE_PQPROTOCOL3': + version_flags.append('pq3') +if version_flags: + PSYCOPG_VERSION_EX = PSYCOPG_VERSION + " (%s)" % ' '.join(version_flags) +else: + PSYCOPG_VERSION_EX = PSYCOPG_VERSION + +if sys.platform != 'win32': + define_macros.append(('PSYCOPG_VERSION', '"'+PSYCOPG_VERSION_EX+'"')) +else: + define_macros.append(('PSYCOPG_VERSION', '\\"'+PSYCOPG_VERSION_EX+'\\"')) + + # build the extension sources = map(lambda x: os.path.join('psycopg', x), sources) |