summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Henstridge <james@jamesh.id.au>2008-01-10 06:14:20 +0000
committerJames Henstridge <james@jamesh.id.au>2008-01-10 06:14:20 +0000
commitd190d5918aed4757a79b0799679fecdaa92a234d (patch)
treeaed99b834328b553b912486da5a1553cf28dd490
parent5fe08ae83e07aef2c7f9a56962d1ef28acf58b4d (diff)
downloadpsycopg2-d190d5918aed4757a79b0799679fecdaa92a234d.tar.gz
2007-12-23 James Henstridge <james@jamesh.id.au>
* psycopg/pqpath.c (pq_execute_command_locked): add an error argument to hold an error when no PGresult is returned by PQexec, rather than using pq_set_critical(). (pq_complete_error): new function that converts the error returned by pq_execute_command_locked() to a Python exception. (pq_begin_locked): add error argument. (pq_commit): use pq_complete_error(). (pq_abort): use pq_complete_error(). (pq_abort_locked): always call pq_set_critical() on error, and clear the error message from pq_execute_command_locked(). (pq_execute): use pq_complete_error() to handle the error from pq_begin_locked(). * psycopg/pqpath.c (pq_begin): remove unused function. * psycopg/connection_type.c (psyco_conn_commit): if conn_commit() raises an error, just return NULL, since it is now setting an exception itself. (psyco_conn_rollback): same here. * psycopg/connection_int.c (conn_commit): don't drop GIL and lock connection before calling pq_commit(). (conn_rollback): same here. (conn_close): use pq_abort_locked(). (conn_switch_isolation_level): same here. (conn_set_client_encoding): same here. * psycopg/pqpath.h: add prototype for pq_abort_locked(). * psycopg/pqpath.c (pq_commit): convert function to run with GIL held, and handle errors appropriately. (pq_abort): same here. (pq_abort_locked): new function to abort a locked connection. 2007-12-22 James Henstridge <james@jamesh.id.au> * psycopg/pqpath.c (pq_raise): add a "pgres" argument so we can generate nice errors not related to a particular cursor. (pq_execute): use pq_begin_locked() rather than pq_begin(). Use pq_raise() to handle any errors from it. * psycopg/pqpath.c (pq_execute_command_locked): helper function used to execute a command-style query on a locked connection. (pq_begin_locked): a variant of pq_begin() that uses pq_execute_command_locked(). (pq_begin): rewrite to use pq_begin_locked().
-rw-r--r--ChangeLog49
-rw-r--r--psycopg/connection_int.c24
-rw-r--r--psycopg/connection_type.c10
-rw-r--r--psycopg/pqpath.c246
-rw-r--r--psycopg/pqpath.h2
5 files changed, 217 insertions, 114 deletions
diff --git a/ChangeLog b/ChangeLog
index abdf993..91f5ff2 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,52 @@
+2007-12-23 James Henstridge <james@jamesh.id.au>
+
+ * psycopg/pqpath.c (pq_execute_command_locked): add an error
+ argument to hold an error when no PGresult is returned by PQexec,
+ rather than using pq_set_critical().
+ (pq_complete_error): new function that converts the error returned
+ by pq_execute_command_locked() to a Python exception.
+ (pq_begin_locked): add error argument.
+ (pq_commit): use pq_complete_error().
+ (pq_abort): use pq_complete_error().
+ (pq_abort_locked): always call pq_set_critical() on error, and
+ clear the error message from pq_execute_command_locked().
+ (pq_execute): use pq_complete_error() to handle the error from
+ pq_begin_locked().
+
+ * psycopg/pqpath.c (pq_begin): remove unused function.
+
+ * psycopg/connection_type.c (psyco_conn_commit): if conn_commit()
+ raises an error, just return NULL, since it is now setting an
+ exception itself.
+ (psyco_conn_rollback): same here.
+
+ * psycopg/connection_int.c (conn_commit): don't drop GIL and lock
+ connection before calling pq_commit().
+ (conn_rollback): same here.
+ (conn_close): use pq_abort_locked().
+ (conn_switch_isolation_level): same here.
+ (conn_set_client_encoding): same here.
+
+ * psycopg/pqpath.h: add prototype for pq_abort_locked().
+
+ * psycopg/pqpath.c (pq_commit): convert function to run with GIL
+ held, and handle errors appropriately.
+ (pq_abort): same here.
+ (pq_abort_locked): new function to abort a locked connection.
+
+2007-12-22 James Henstridge <james@jamesh.id.au>
+
+ * psycopg/pqpath.c (pq_raise): add a "pgres" argument so we can
+ generate nice errors not related to a particular cursor.
+ (pq_execute): use pq_begin_locked() rather than pq_begin(). Use
+ pq_raise() to handle any errors from it.
+
+ * psycopg/pqpath.c (pq_execute_command_locked): helper function
+ used to execute a command-style query on a locked connection.
+ (pq_begin_locked): a variant of pq_begin() that uses
+ pq_execute_command_locked().
+ (pq_begin): rewrite to use pq_begin_locked().
+
2007-12-22 James Henstridge <james@jamesh.id.au>
* psycopg/config.h: only print debug messages if
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 1906e82..94b762b 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -217,7 +217,7 @@ conn_close(connectionObject *self)
/* execute a forced rollback on the connection (but don't check the
result, we're going to close the pq connection anyway */
if (self->pgconn) {
- pq_abort(self);
+ pq_abort_locked(self);
PQfinish(self->pgconn);
Dprintf("conn_close: PQfinish called");
self->pgconn = NULL;
@@ -235,17 +235,8 @@ conn_commit(connectionObject *self)
{
int res;
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&self->lock);
-
res = pq_commit(self);
self->mark++;
-
- pthread_mutex_unlock(&self->lock);
- Py_END_ALLOW_THREADS;
-
- if (res == -1)
- pq_resolve_critical(self, 0);
return res;
}
@@ -256,17 +247,8 @@ conn_rollback(connectionObject *self)
{
int res;
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&self->lock);
-
res = pq_abort(self);
self->mark++;
-
- pthread_mutex_unlock(&self->lock);
- Py_END_ALLOW_THREADS;
-
- if (res == -1)
- pq_resolve_critical(self, 0);
return res;
}
@@ -286,7 +268,7 @@ conn_switch_isolation_level(connectionObject *self, int level)
/* if the current isolation level is > 0 we need to abort the current
transaction before changing; that all folks! */
if (self->isolation_level != level && self->isolation_level > 0) {
- res = pq_abort(self);
+ res = pq_abort_locked(self);
}
self->isolation_level = level;
self->mark++;
@@ -322,7 +304,7 @@ conn_set_client_encoding(connectionObject *self, char *enc)
/* abort the current transaction, to set the encoding ouside of
transactions */
- res = pq_abort(self);
+ res = pq_abort_locked(self);
if (res == 0) {
pgres = PQexec(self->pgconn, query);
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 4eb1680..2d18661 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -117,11 +117,8 @@ psyco_conn_commit(connectionObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "")) return NULL;
- if (conn_commit(self) < 0) {
- PyErr_SetString(OperationalError,
- PQerrorMessage(self->pgconn));
+ if (conn_commit(self) < 0)
return NULL;
- }
Py_INCREF(Py_None);
return Py_None;
@@ -140,11 +137,8 @@ psyco_conn_rollback(connectionObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "")) return NULL;
- if (conn_rollback(self) < 0) {
- PyErr_SetString(OperationalError,
- PQerrorMessage(self->pgconn));
+ if (conn_rollback(self) < 0)
return NULL;
- }
Py_INCREF(Py_None);
return Py_None;
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index ae7d65c..b7daba7 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -61,8 +61,8 @@ strip_severity(const char *msg)
This function should be called while holding the GIL. */
static void
-pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
- const char *msg)
+pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres,
+ PyObject *exc, const char *msg)
{
PyObject *pgc = (PyObject*)curs;
@@ -76,11 +76,14 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
return;
}
- if (curs && curs->pgres) {
- err = PQresultErrorMessage(curs->pgres);
+ if (pgres == NULL && curs != NULL)
+ pgres = curs->pgres;
+
+ if (pgres) {
+ err = PQresultErrorMessage(pgres);
#ifdef HAVE_PQPROTOCOL3
if (err != NULL && conn->protocol == 3) {
- code = PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
+ code = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
}
#endif
}
@@ -98,11 +101,11 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
/* if exc is NULL, analyze the message and try to deduce the right
exception kind (only if we have a pgres, obviously) */
if (exc == NULL) {
- if (curs && curs->pgres) {
+ if (pgres) {
if (conn->protocol == 3) {
#ifdef HAVE_PQPROTOCOL3
char *pgstate =
- PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
+ PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
if (pgstate != NULL && !strncmp(pgstate, "23", 2))
exc = IntegrityError;
else
@@ -227,63 +230,122 @@ pq_clear_async(connectionObject *conn)
} while (pgres != NULL);
}
-/* pq_begin - send a BEGIN WORK, if necessary
+/* pg_execute_command_locked - execute a no-result query on a locked connection.
- this function does not call any Py_*_ALLOW_THREADS macros */
+ This function should only be called on a locked connection without
+ holding the global interpreter lock.
-int
-pq_begin(connectionObject *conn)
+ On error, -1 is returned, and the pgres argument will hold the
+ relevant result structure.
+ */
+static int
+pq_execute_command_locked(connectionObject *conn, const char *query,
+ PGresult **pgres, char **error)
{
- const char *query[] = {
- NULL,
- "BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
- "BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"};
-
int pgstatus, retvalue = -1;
- PGresult *pgres = NULL;
-
- Dprintf("pq_begin: pgconn = %p, isolevel = %ld, status = %d",
- conn->pgconn, conn->isolation_level, conn->status);
- if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) {
- Dprintf("pq_begin: transaction in progress");
- return 0;
- }
+ Dprintf("pq_execute_command_locked: pgconn = %p, query = %s",
+ conn->pgconn, query);
+ *error = NULL;
+ *pgres = PQexec(conn->pgconn, query);
+ if (*pgres == NULL) {
+ const char *msg;
- pq_clear_async(conn);
- pgres = PQexec(conn->pgconn, query[conn->isolation_level]);
- if (pgres == NULL) {
- Dprintf("pq_begin: PQexec() failed");
- pq_set_critical(conn, NULL);
+ Dprintf("pq_execute_command_locked: PQexec returned NULL");
+ msg = PQerrorMessage(conn->pgconn);
+ if (msg)
+ *error = strdup(msg);
goto cleanup;
}
- pgstatus = PQresultStatus(pgres);
+ pgstatus = PQresultStatus(*pgres);
if (pgstatus != PGRES_COMMAND_OK ) {
- Dprintf("pq_begin: result is NOT OK");
- pq_set_critical(conn, NULL);
+ Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)",
+ pgstatus);
goto cleanup;
}
- Dprintf("pq_begin: issued '%s' command", query[conn->isolation_level]);
retvalue = 0;
- conn->status = CONN_STATUS_BEGIN;
+ IFCLEARPGRES(*pgres);
cleanup:
- IFCLEARPGRES(pgres);
return retvalue;
}
+/* pq_complete_error: handle an error from pq_execute_command_locked()
+
+ If pq_execute_command_locked() returns -1, this function should be
+ called to convert the result to a Python exception.
+
+ This function should be called while holding the global interpreter
+ lock.
+ */
+static void
+pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)
+{
+ Dprintf("pq_complete_error: pgconn = %p, pgres = %p, error = %s",
+ conn->pgconn, *pgres, *error ? *error : "(null)");
+ if (*pgres != NULL)
+ pq_raise(conn, NULL, *pgres, OperationalError, NULL);
+ else if (*error != NULL) {
+ PyErr_SetString(OperationalError, *error);
+ free(*error);
+ } else {
+ PyErr_SetString(OperationalError, "unknown error");
+ }
+ IFCLEARPGRES(*pgres);
+ if (*error) {
+ free(*error);
+ *error = NULL;
+ }
+}
+
+
+/* pq_begin_locked - begin a transaction, if necessary
+
+ This function should only be called on a locked connection without
+ holding the global interpreter lock.
+
+ On error, -1 is returned, and the pgres argument will hold the
+ relevant result structure.
+ */
+static int
+pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error)
+{
+ const char *query[] = {
+ NULL,
+ "BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
+ "BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"};
+ int result;
+
+ Dprintf("pq_begin_locked: pgconn = %p, isolevel = %ld, status = %d",
+ conn->pgconn, conn->isolation_level, conn->status);
+
+ if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) {
+ Dprintf("pq_begin_locked: transaction in progress");
+ return 0;
+ }
+
+ pq_clear_async(conn);
+ result = pq_execute_command_locked(conn, query[conn->isolation_level],
+ pgres, error);
+ if (result == 0)
+ conn->status = CONN_STATUS_BEGIN;
+
+ return result;
+}
+
/* pq_commit - send an END, if necessary
- this function does not call any Py_*_ALLOW_THREADS macros */
+ This function should be called while holding the global interpreter
+ lock. */
int
pq_commit(connectionObject *conn)
{
- const char *query = "END";
- int pgstatus, retvalue = -1;
+ int retvalue = -1;
PGresult *pgres = NULL;
+ char *error = NULL;
Dprintf("pq_commit: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
@@ -293,43 +355,63 @@ pq_commit(connectionObject *conn)
return 0;
}
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&conn->lock);
+
pq_clear_async(conn);
- pgres = PQexec(conn->pgconn, query);
- if (pgres == NULL) {
- Dprintf("pq_commit: PQexec() failed");
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
+ retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error);
- pgstatus = PQresultStatus(pgres);
- if (pgstatus != PGRES_COMMAND_OK ) {
- Dprintf("pq_commit: result is NOT OK");
- /* if the result is not OK the transaction has been rolled back
- so we set the status to CONN_STATUS_READY anyway */
- conn->status = CONN_STATUS_READY;
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
- Dprintf("pq_commit: issued '%s' command", query);
+ pthread_mutex_unlock(&conn->lock);
+ Py_END_ALLOW_THREADS;
- retvalue = 0;
+ if (retvalue < 0)
+ pq_complete_error(conn, &pgres, &error);
+
+ /* Even if an error occurred, the connection will be rolled back,
+ so we unconditionally set the connection status here. */
conn->status = CONN_STATUS_READY;
- cleanup:
+ return retvalue;
+}
+
+int
+pq_abort_locked(connectionObject *conn)
+{
+ int retvalue = -1;
+ PGresult *pgres = NULL;
+ char *error = NULL;
+
+ Dprintf("pq_abort_locked: pgconn = %p, isolevel = %ld, status = %d",
+ conn->pgconn, conn->isolation_level, conn->status);
+
+ if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
+ Dprintf("pq_abort_locked: no transaction to abort");
+ return 0;
+ }
+
+ pq_clear_async(conn);
+ retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error);
+
+ if (retvalue < 0)
+ pq_set_critical(conn, NULL);
+
IFCLEARPGRES(pgres);
+ if (error)
+ free(error);
return retvalue;
}
/* pq_abort - send an ABORT, if necessary
- this function does not call any Py_*_ALLOW_THREADS macros */
+ This function should be called while holding the global interpreter
+ lock. */
int
pq_abort(connectionObject *conn)
{
- const char *query = "ABORT";
- int pgstatus, retvalue = -1;
+ int retvalue = -1;
PGresult *pgres = NULL;
+ char *error = NULL;
Dprintf("pq_abort: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
@@ -339,27 +421,20 @@ pq_abort(connectionObject *conn)
return 0;
}
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&conn->lock);
+
pq_clear_async(conn);
- pgres = PQexec(conn->pgconn, query);
- if (pgres == NULL) {
- Dprintf("pq_abort: PQexec() failed");
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
+ retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error);
- pgstatus = PQresultStatus(pgres);
- if (pgstatus != PGRES_COMMAND_OK ) {
- Dprintf("pq_abort: result is NOT OK");
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
- Dprintf("pq_abort: issued '%s' command", query);
+ pthread_mutex_unlock(&conn->lock);
+ Py_END_ALLOW_THREADS;
- retvalue = 0;
- conn->status = CONN_STATUS_READY;
+ if (retvalue < 0)
+ pq_complete_error(conn, &pgres, &error);
+ else
+ conn->status = CONN_STATUS_READY;
- cleanup:
- IFCLEARPGRES(pgres);
return retvalue;
}
@@ -424,6 +499,9 @@ pq_is_busy(connectionObject *conn)
int
pq_execute(cursorObject *curs, const char *query, int async)
{
+ PGresult *pgres = NULL;
+ char *error = NULL;
+
/* if the status of the connection is critical raise an exception and
definitely close the connection */
if (curs->conn->critical) {
@@ -442,10 +520,10 @@ pq_execute(cursorObject *curs, const char *query, int async)
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock));
- if (pq_begin(curs->conn) < 0) {
+ if (pq_begin_locked(curs->conn, &pgres, &error) < 0) {
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
- pq_resolve_critical(curs->conn, 0);
+ pq_complete_error(curs->conn, &pgres, &error);
return -1;
}
@@ -719,7 +797,7 @@ _pq_copy_in_v3(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@@ -753,7 +831,7 @@ _pq_copy_in(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@@ -790,7 +868,7 @@ _pq_copy_out_v3(cursorObject *curs)
}
if (len == -2) {
- pq_raise(curs->conn, NULL, NULL, NULL);
+ pq_raise(curs->conn, NULL, NULL, NULL, NULL);
return -1;
}
@@ -798,7 +876,7 @@ _pq_copy_out_v3(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
return 1;
@@ -849,7 +927,7 @@ _pq_copy_out(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@@ -984,7 +1062,7 @@ pq_fetch(cursorObject *curs)
default:
Dprintf("pq_fetch: uh-oh, something FAILED");
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
ex = -1;
break;
diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h
index 57be047..d55b0b6 100644
--- a/psycopg/pqpath.h
+++ b/psycopg/pqpath.h
@@ -32,8 +32,8 @@
/* exported functions */
extern int pq_fetch(cursorObject *curs);
extern int pq_execute(cursorObject *curs, const char *query, int async);
-extern int pq_begin(connectionObject *conn);
extern int pq_commit(connectionObject *conn);
+extern int pq_abort_locked(connectionObject *conn);
extern int pq_abort(connectionObject *conn);
extern int pq_is_busy(connectionObject *conn);
extern void pq_set_critical(connectionObject *conn, const char *msg);