summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFederico Di Gregorio <fog@initd.org>2005-03-02 14:07:03 +0000
committerFederico Di Gregorio <fog@initd.org>2005-03-02 14:07:03 +0000
commit3cf4b7ca6fc59e0dbfdb371c751da42c479a6cf4 (patch)
tree28133763ffe39fcfaef2576bf376e5b48564e1f4
parente5f558a6be623ab09f3517a0ff03f94610d18c0e (diff)
downloadpsycopg2-3cf4b7ca6fc59e0dbfdb371c751da42c479a6cf4.tar.gz
Finished COPY TO/COPY FROM implementation.
-rw-r--r--ChangeLog7
-rw-r--r--examples/copy_to.py105
-rw-r--r--psycopg/cursor_type.c51
-rw-r--r--psycopg/pqpath.c43
4 files changed, 187 insertions, 19 deletions
diff --git a/ChangeLog b/ChangeLog
index 2c49c7c..6b34265 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,12 @@
2005-03-02 Federico Di Gregorio <fog@debian.org>
+ * COPY TO implemented using both old and new (v3) protocol.
+
+ * psycopg/pqpath.c (_pq_copy_out_v3): implemented and working.
+
+ * psycopg/cursor_type.c (psyco_curs_copy_to): added cursor object
+ interface for copy_to.
+
* COPY FROM implemented using both old and new (v3) protocol.
* psycopg/config.h (Dprintf): declaration for asprintf is gone.
diff --git a/examples/copy_to.py b/examples/copy_to.py
new file mode 100644
index 0000000..f4433ed
--- /dev/null
+++ b/examples/copy_to.py
@@ -0,0 +1,105 @@
+# copy_to.py -- example about copy_to
+#
+# Copyright (C) 2002 Tom Jenkins <tjenkins@devis.com>
+# Copyright (C) 2005 Federico Di Gregorio <fog@initd.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2, or (at your option) any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+
+## put in DSN your DSN string
+
+DSN = 'dbname=test'
+
+## don't modify anything below tis line (except for experimenting)
+
+import sys
+import os
+import StringIO
+import psycopg
+
+if len(sys.argv) > 1:
+ DSN = sys.argv[1]
+
+print "Opening connection using dns:", DSN
+conn = psycopg.connect(DSN)
+print "Encoding for this connection is", conn.encoding
+
+curs = conn.cursor()
+try:
+ curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
+except:
+ conn.rollback()
+ curs.execute("DROP TABLE test_copy")
+ curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
+conn.commit()
+
+# demostrate copy_to functionality
+data = [('Tom', 'Jenkins', '37'),
+ ('Madonna', None, '45'),
+ ('Federico', 'Di Gregorio', None)]
+query = "INSERT INTO test_copy VALUES (%s, %s, %s)"
+for row in data:
+ curs.execute(query, row)
+conn.commit()
+
+# copy_to using defaults
+io = open('copy_to.txt', 'w')
+curs.copy_to(io, 'test_copy')
+print "1) Copy %d records into file object using defaults: " % len (data) + \
+ "sep = \\t and null = \\N"
+io.close()
+
+rows = open('copy_to.txt', 'r').readlines()
+print " File has %d rows:" % len(rows)
+
+for r in rows:
+ print " ", r,
+
+# copy_to using custom separator
+io = open('copy_to.txt', 'w')
+curs.copy_to(io, 'test_copy', ':')
+print "2) Copy %d records into file object using sep = :" % len(data)
+io.close()
+
+rows = open('copy_to.txt', 'r').readlines()
+print " File has %d rows:" % len(rows)
+
+for r in rows:
+ print " ", r,
+
+# copy_to using custom null identifier
+io = open('copy_to.txt', 'w')
+curs.copy_to(io, 'test_copy', null='NULL')
+print "3) Copy %d records into file object using null = NULL" % len(data)
+io.close()
+
+rows = open('copy_to.txt', 'r').readlines()
+print " File has %d rows:" % len(rows)
+
+for r in rows:
+ print " ", r,
+
+# copy_to using custom separator and null identifier
+io = open('copy_to.txt', 'w')
+curs.copy_to(io, 'test_copy', ':', 'NULL')
+print "4) Copy %d records into file object using sep = : and null ) NULL" % \
+ len(data)
+io.close()
+
+rows = open('copy_to.txt', 'r').readlines()
+print " File has %d rows:" % len(rows)
+
+for r in rows:
+ print " ", r,
+
+curs.execute("DROP TABLE test_copy")
+os.unlink('copy_to.txt')
+conn.commit()
diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c
index 95729b0..3dab43a 100644
--- a/psycopg/cursor_type.c
+++ b/psycopg/cursor_type.c
@@ -949,27 +949,31 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
if (null) {
- PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'"
+ PyOS_snprintf(query, 255, "COPY %s FROM stdin USING DELIMITERS '%s'"
" WITH NULL AS '%s'", table_name, sep, null);
}
else {
- PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'",
+ PyOS_snprintf(query, 255, "COPY %s FROM stdin USING DELIMITERS '%s'",
table_name, sep);
}
Dprintf("psyco_curs_copy_from: query = %s", query);
self->copysize = bufsize;
self->copyfile = file;
- Py_INCREF(file);
if (pq_execute(self, query, 0) == 1) {
res = Py_None;
Py_INCREF(Py_None);
}
+
+ self->copyfile =NULL;
return res;
}
+#define psyco_curs_copy_to_doc \
+"copy_to(file, table, sep='\\t', null='\\N') -> copy file to table."
+
static int
_psyco_curs_has_write_check(PyObject* o, void* var)
{
@@ -985,6 +989,45 @@ _psyco_curs_has_write_check(PyObject* o, void* var)
}
}
+static PyObject *
+psyco_curs_copy_to(cursorObject *self, PyObject *args, PyObject *kwargs)
+{
+ char query[256];
+ char *table_name;
+ char *sep = "\t", *null = NULL;
+ PyObject *file, *res = NULL;
+
+ static char *kwlist[] = {"file", "table", "sep", "null", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O&s|ss", kwlist,
+ _psyco_curs_has_write_check, &file,
+ &table_name, &sep, &null)) {
+ return NULL;
+ }
+
+ EXC_IF_CURS_CLOSED(self);
+
+ if (null) {
+ PyOS_snprintf(query, 255, "COPY %s TO stdout USING DELIMITERS '%s'"
+ " WITH NULL AS '%s'", table_name, sep, null);
+ }
+ else {
+ PyOS_snprintf(query, 255, "COPY %s TO stdout USING DELIMITERS '%s'",
+ table_name, sep);
+ }
+
+ self->copysize = 0;
+ self->copyfile = file;
+
+ if (pq_execute(self, query, 0) == 1) {
+ res = Py_None;
+ Py_INCREF(Py_None);
+ }
+
+ self->copyfile = NULL;
+
+ return res;
+}
/* extension: fileno - return the file descripor of the connection */
#define psyco_curs_fileno_doc \
@@ -1108,6 +1151,8 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS, psyco_curs_isready_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc},
+ {"copy_to", (PyCFunction)psyco_curs_copy_to,
+ METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc},
#endif
{NULL}
};
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index e9b0afb..c51ed5f 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -451,7 +451,7 @@ _pq_fetch_tuples(cursorObject *curs)
/* calculate the display size for each column (cpu intensive, can be
switched off at configuration time) */
#ifdef PSYCOPG_DISPLAY_SIZE
- dsize = (int *)calloc(pgnfields, sizeof(int));
+ dsize = (int *)PyMem_Malloc(pgnfields * sizeof(int));
if (dsize != NULL) {
if (curs->rowcount == 0) {
for (i=0; i < pgnfields; i++)
@@ -554,7 +554,7 @@ _pq_fetch_tuples(cursorObject *curs)
PyTuple_SET_ITEM(dtitem, 6, Py_None);
}
- if (dsize) free(dsize);
+ if (dsize) PyMem_Free(dsize);
}
#ifdef HAVE_PQPROTOCOL3
@@ -566,8 +566,6 @@ _pq_copy_in_v3(cursorObject *curs)
exception */
PyObject *o;
int length = 0, error = 0;
-
- Dprintf("_pq_copy_in_v3: called with object at %p", curs->copyfile);
while (1) {
o = PyObject_CallMethod(curs->copyfile, "read", "i", curs->copysize);
@@ -589,8 +587,6 @@ _pq_copy_in_v3(cursorObject *curs)
}
Py_XDECREF(o);
-
- Dprintf("_pq_copy_in_v3: error = %d", error);
if (error == 0 || error == 2)
/* 0 means that the copy went well, 2 that there was an error on the
@@ -657,7 +653,7 @@ _pq_copy_out_v3(cursorObject *curs)
Py_END_ALLOW_THREADS;
if (len > 0 && buffer) {
- PyObject_CallMethod(curs->copyfile, "write", "s", buffer);
+ PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len);
PQfreemem(buffer);
}
/* we break on len == 0 but note that that should *not* happen,
@@ -671,6 +667,13 @@ _pq_copy_out_v3(cursorObject *curs)
return -1;
}
+ /* and finally we grab the operation result from the backend */
+ IFCLEARPGRES(curs->pgres);
+ while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
+ if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
+ pq_raise(curs->conn, curs, NULL, NULL);
+ IFCLEARPGRES(curs->pgres);
+ }
return 1;
}
#endif
@@ -680,11 +683,11 @@ _pq_copy_out(cursorObject *curs)
{
char buffer[4096];
int status, len;
- PyObject *o;
while (1) {
Py_BEGIN_ALLOW_THREADS;
status = PQgetline(curs->conn->pgconn, buffer, 4096);
+ Py_END_ALLOW_THREADS;
if (status == 0) {
if (buffer[0] == '\\' && buffer[1] == '.') break;
@@ -695,21 +698,27 @@ _pq_copy_out(cursorObject *curs)
len = 4096-1;
}
else {
- Py_BLOCK_THREADS;
return -1;
}
- Py_END_ALLOW_THREADS;
- o = PyString_FromStringAndSize(buffer, len);
- PyObject_CallMethod(curs->copyfile, "write", "O", o);
- Py_DECREF(o);
+ PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len);
}
- if (PQendcopy(curs->conn->pgconn) != 0) return -1;
+ status = 1;
+ if (PQendcopy(curs->conn->pgconn) != 0)
+ status = -1;
- return 1;
-}
+ /* if for some reason we're using a protocol 3 libpq to connect to a
+ protocol 2 backend we still need to cycle on the result set */
+ IFCLEARPGRES(curs->pgres);
+ while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
+ if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
+ pq_raise(curs->conn, curs, NULL, NULL);
+ IFCLEARPGRES(curs->pgres);
+ }
+ return status;
+}
int
pq_fetch(cursorObject *curs)
@@ -840,6 +849,8 @@ pq_fetch(cursorObject *curs)
break;
}
+ Dprintf("pq_fetch: fetching done; check for critical errors");
+
/* error checking, close the connection if necessary (some critical errors
are not really critical, like a COPY FROM error: if that's the case we
raise the exception but we avoid to close the connection) */