diff options
| author | James Henstridge <james@jamesh.id.au> | 2008-01-10 06:14:20 +0000 |
|---|---|---|
| committer | James Henstridge <james@jamesh.id.au> | 2008-01-10 06:14:20 +0000 |
| commit | d190d5918aed4757a79b0799679fecdaa92a234d (patch) | |
| tree | aed99b834328b553b912486da5a1553cf28dd490 | |
| parent | 5fe08ae83e07aef2c7f9a56962d1ef28acf58b4d (diff) | |
| download | psycopg2-d190d5918aed4757a79b0799679fecdaa92a234d.tar.gz | |
2007-12-23 James Henstridge <james@jamesh.id.au>
* psycopg/pqpath.c (pq_execute_command_locked): add an error
argument to hold an error when no PGresult is returned by PQexec,
rather than using pq_set_critical().
(pq_complete_error): new function that converts the error returned
by pq_execute_command_locked() to a Python exception.
(pq_begin_locked): add error argument.
(pq_commit): use pq_complete_error().
(pq_abort): use pq_complete_error().
(pq_abort_locked): always call pq_set_critical() on error, and
clear the error message from pq_execute_command_locked().
(pq_execute): use pq_complete_error() to handle the error from
pq_begin_locked().
* psycopg/pqpath.c (pq_begin): remove unused function.
* psycopg/connection_type.c (psyco_conn_commit): if conn_commit()
raises an error, just return NULL, since it is now setting an
exception itself.
(psyco_conn_rollback): same here.
* psycopg/connection_int.c (conn_commit): don't drop GIL and lock
connection before calling pq_commit().
(conn_rollback): same here.
(conn_close): use pq_abort_locked().
(conn_switch_isolation_level): same here.
(conn_set_client_encoding): same here.
* psycopg/pqpath.h: add prototype for pq_abort_locked().
* psycopg/pqpath.c (pq_commit): convert function to run with GIL
held, and handle errors appropriately.
(pq_abort): same here.
(pq_abort_locked): new function to abort a locked connection.
2007-12-22 James Henstridge <james@jamesh.id.au>
* psycopg/pqpath.c (pq_raise): add a "pgres" argument so we can
generate nice errors not related to a particular cursor.
(pq_execute): use pq_begin_locked() rather than pq_begin(). Use
pq_raise() to handle any errors from it.
* psycopg/pqpath.c (pq_execute_command_locked): helper function
used to execute a command-style query on a locked connection.
(pq_begin_locked): a variant of pq_begin() that uses
pq_execute_command_locked().
(pq_begin): rewrite to use pq_begin_locked().
| -rw-r--r-- | ChangeLog | 49 | ||||
| -rw-r--r-- | psycopg/connection_int.c | 24 | ||||
| -rw-r--r-- | psycopg/connection_type.c | 10 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 246 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 2 |
5 files changed, 217 insertions, 114 deletions
@@ -1,3 +1,52 @@ +2007-12-23 James Henstridge <james@jamesh.id.au> + + * psycopg/pqpath.c (pq_execute_command_locked): add an error + argument to hold an error when no PGresult is returned by PQexec, + rather than using pq_set_critical(). + (pq_complete_error): new function that converts the error returned + by pq_execute_command_locked() to a Python exception. + (pq_begin_locked): add error argument. + (pq_commit): use pq_complete_error(). + (pq_abort): use pq_complete_error(). + (pq_abort_locked): always call pq_set_critical() on error, and + clear the error message from pq_execute_command_locked(). + (pq_execute): use pq_complete_error() to handle the error from + pq_begin_locked(). + + * psycopg/pqpath.c (pq_begin): remove unused function. + + * psycopg/connection_type.c (psyco_conn_commit): if conn_commit() + raises an error, just return NULL, since it is now setting an + exception itself. + (psyco_conn_rollback): same here. + + * psycopg/connection_int.c (conn_commit): don't drop GIL and lock + connection before calling pq_commit(). + (conn_rollback): same here. + (conn_close): use pq_abort_locked(). + (conn_switch_isolation_level): same here. + (conn_set_client_encoding): same here. + + * psycopg/pqpath.h: add prototype for pq_abort_locked(). + + * psycopg/pqpath.c (pq_commit): convert function to run with GIL + held, and handle errors appropriately. + (pq_abort): same here. + (pq_abort_locked): new function to abort a locked connection. + +2007-12-22 James Henstridge <james@jamesh.id.au> + + * psycopg/pqpath.c (pq_raise): add a "pgres" argument so we can + generate nice errors not related to a particular cursor. + (pq_execute): use pq_begin_locked() rather than pq_begin(). Use + pq_raise() to handle any errors from it. + + * psycopg/pqpath.c (pq_execute_command_locked): helper function + used to execute a command-style query on a locked connection. + (pq_begin_locked): a variant of pq_begin() that uses + pq_execute_command_locked(). + (pq_begin): rewrite to use pq_begin_locked(). + 2007-12-22 James Henstridge <james@jamesh.id.au> * psycopg/config.h: only print debug messages if diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 1906e82..94b762b 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -217,7 +217,7 @@ conn_close(connectionObject *self) /* execute a forced rollback on the connection (but don't check the result, we're going to close the pq connection anyway */ if (self->pgconn) { - pq_abort(self); + pq_abort_locked(self); PQfinish(self->pgconn); Dprintf("conn_close: PQfinish called"); self->pgconn = NULL; @@ -235,17 +235,8 @@ conn_commit(connectionObject *self) { int res; - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&self->lock); - res = pq_commit(self); self->mark++; - - pthread_mutex_unlock(&self->lock); - Py_END_ALLOW_THREADS; - - if (res == -1) - pq_resolve_critical(self, 0); return res; } @@ -256,17 +247,8 @@ conn_rollback(connectionObject *self) { int res; - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&self->lock); - res = pq_abort(self); self->mark++; - - pthread_mutex_unlock(&self->lock); - Py_END_ALLOW_THREADS; - - if (res == -1) - pq_resolve_critical(self, 0); return res; } @@ -286,7 +268,7 @@ conn_switch_isolation_level(connectionObject *self, int level) /* if the current isolation level is > 0 we need to abort the current transaction before changing; that all folks! */ if (self->isolation_level != level && self->isolation_level > 0) { - res = pq_abort(self); + res = pq_abort_locked(self); } self->isolation_level = level; self->mark++; @@ -322,7 +304,7 @@ conn_set_client_encoding(connectionObject *self, char *enc) /* abort the current transaction, to set the encoding ouside of transactions */ - res = pq_abort(self); + res = pq_abort_locked(self); if (res == 0) { pgres = PQexec(self->pgconn, query); diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 4eb1680..2d18661 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -117,11 +117,8 @@ psyco_conn_commit(connectionObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "")) return NULL; - if (conn_commit(self) < 0) { - PyErr_SetString(OperationalError, - PQerrorMessage(self->pgconn)); + if (conn_commit(self) < 0) return NULL; - } Py_INCREF(Py_None); return Py_None; @@ -140,11 +137,8 @@ psyco_conn_rollback(connectionObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "")) return NULL; - if (conn_rollback(self) < 0) { - PyErr_SetString(OperationalError, - PQerrorMessage(self->pgconn)); + if (conn_rollback(self) < 0) return NULL; - } Py_INCREF(Py_None); return Py_None; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index ae7d65c..b7daba7 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -61,8 +61,8 @@ strip_severity(const char *msg) This function should be called while holding the GIL. */ static void -pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, - const char *msg) +pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres, + PyObject *exc, const char *msg) { PyObject *pgc = (PyObject*)curs; @@ -76,11 +76,14 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, return; } - if (curs && curs->pgres) { - err = PQresultErrorMessage(curs->pgres); + if (pgres == NULL && curs != NULL) + pgres = curs->pgres; + + if (pgres) { + err = PQresultErrorMessage(pgres); #ifdef HAVE_PQPROTOCOL3 if (err != NULL && conn->protocol == 3) { - code = PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE); + code = PQresultErrorField(pgres, PG_DIAG_SQLSTATE); } #endif } @@ -98,11 +101,11 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, /* if exc is NULL, analyze the message and try to deduce the right exception kind (only if we have a pgres, obviously) */ if (exc == NULL) { - if (curs && curs->pgres) { + if (pgres) { if (conn->protocol == 3) { #ifdef HAVE_PQPROTOCOL3 char *pgstate = - PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE); + PQresultErrorField(pgres, PG_DIAG_SQLSTATE); if (pgstate != NULL && !strncmp(pgstate, "23", 2)) exc = IntegrityError; else @@ -227,63 +230,122 @@ pq_clear_async(connectionObject *conn) } while (pgres != NULL); } -/* pq_begin - send a BEGIN WORK, if necessary +/* pg_execute_command_locked - execute a no-result query on a locked connection. - this function does not call any Py_*_ALLOW_THREADS macros */ + This function should only be called on a locked connection without + holding the global interpreter lock. -int -pq_begin(connectionObject *conn) + On error, -1 is returned, and the pgres argument will hold the + relevant result structure. + */ +static int +pq_execute_command_locked(connectionObject *conn, const char *query, + PGresult **pgres, char **error) { - const char *query[] = { - NULL, - "BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED", - "BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"}; - int pgstatus, retvalue = -1; - PGresult *pgres = NULL; - - Dprintf("pq_begin: pgconn = %p, isolevel = %ld, status = %d", - conn->pgconn, conn->isolation_level, conn->status); - if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) { - Dprintf("pq_begin: transaction in progress"); - return 0; - } + Dprintf("pq_execute_command_locked: pgconn = %p, query = %s", + conn->pgconn, query); + *error = NULL; + *pgres = PQexec(conn->pgconn, query); + if (*pgres == NULL) { + const char *msg; - pq_clear_async(conn); - pgres = PQexec(conn->pgconn, query[conn->isolation_level]); - if (pgres == NULL) { - Dprintf("pq_begin: PQexec() failed"); - pq_set_critical(conn, NULL); + Dprintf("pq_execute_command_locked: PQexec returned NULL"); + msg = PQerrorMessage(conn->pgconn); + if (msg) + *error = strdup(msg); goto cleanup; } - pgstatus = PQresultStatus(pgres); + pgstatus = PQresultStatus(*pgres); if (pgstatus != PGRES_COMMAND_OK ) { - Dprintf("pq_begin: result is NOT OK"); - pq_set_critical(conn, NULL); + Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)", + pgstatus); goto cleanup; } - Dprintf("pq_begin: issued '%s' command", query[conn->isolation_level]); retvalue = 0; - conn->status = CONN_STATUS_BEGIN; + IFCLEARPGRES(*pgres); cleanup: - IFCLEARPGRES(pgres); return retvalue; } +/* pq_complete_error: handle an error from pq_execute_command_locked() + + If pq_execute_command_locked() returns -1, this function should be + called to convert the result to a Python exception. + + This function should be called while holding the global interpreter + lock. + */ +static void +pq_complete_error(connectionObject *conn, PGresult **pgres, char **error) +{ + Dprintf("pq_complete_error: pgconn = %p, pgres = %p, error = %s", + conn->pgconn, *pgres, *error ? *error : "(null)"); + if (*pgres != NULL) + pq_raise(conn, NULL, *pgres, OperationalError, NULL); + else if (*error != NULL) { + PyErr_SetString(OperationalError, *error); + free(*error); + } else { + PyErr_SetString(OperationalError, "unknown error"); + } + IFCLEARPGRES(*pgres); + if (*error) { + free(*error); + *error = NULL; + } +} + + +/* pq_begin_locked - begin a transaction, if necessary + + This function should only be called on a locked connection without + holding the global interpreter lock. + + On error, -1 is returned, and the pgres argument will hold the + relevant result structure. + */ +static int +pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error) +{ + const char *query[] = { + NULL, + "BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED", + "BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"}; + int result; + + Dprintf("pq_begin_locked: pgconn = %p, isolevel = %ld, status = %d", + conn->pgconn, conn->isolation_level, conn->status); + + if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) { + Dprintf("pq_begin_locked: transaction in progress"); + return 0; + } + + pq_clear_async(conn); + result = pq_execute_command_locked(conn, query[conn->isolation_level], + pgres, error); + if (result == 0) + conn->status = CONN_STATUS_BEGIN; + + return result; +} + /* pq_commit - send an END, if necessary - this function does not call any Py_*_ALLOW_THREADS macros */ + This function should be called while holding the global interpreter + lock. */ int pq_commit(connectionObject *conn) { - const char *query = "END"; - int pgstatus, retvalue = -1; + int retvalue = -1; PGresult *pgres = NULL; + char *error = NULL; Dprintf("pq_commit: pgconn = %p, isolevel = %ld, status = %d", conn->pgconn, conn->isolation_level, conn->status); @@ -293,43 +355,63 @@ pq_commit(connectionObject *conn) return 0; } + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&conn->lock); + pq_clear_async(conn); - pgres = PQexec(conn->pgconn, query); - if (pgres == NULL) { - Dprintf("pq_commit: PQexec() failed"); - pq_set_critical(conn, NULL); - goto cleanup; - } + retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error); - pgstatus = PQresultStatus(pgres); - if (pgstatus != PGRES_COMMAND_OK ) { - Dprintf("pq_commit: result is NOT OK"); - /* if the result is not OK the transaction has been rolled back - so we set the status to CONN_STATUS_READY anyway */ - conn->status = CONN_STATUS_READY; - pq_set_critical(conn, NULL); - goto cleanup; - } - Dprintf("pq_commit: issued '%s' command", query); + pthread_mutex_unlock(&conn->lock); + Py_END_ALLOW_THREADS; - retvalue = 0; + if (retvalue < 0) + pq_complete_error(conn, &pgres, &error); + + /* Even if an error occurred, the connection will be rolled back, + so we unconditionally set the connection status here. */ conn->status = CONN_STATUS_READY; - cleanup: + return retvalue; +} + +int +pq_abort_locked(connectionObject *conn) +{ + int retvalue = -1; + PGresult *pgres = NULL; + char *error = NULL; + + Dprintf("pq_abort_locked: pgconn = %p, isolevel = %ld, status = %d", + conn->pgconn, conn->isolation_level, conn->status); + + if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) { + Dprintf("pq_abort_locked: no transaction to abort"); + return 0; + } + + pq_clear_async(conn); + retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error); + + if (retvalue < 0) + pq_set_critical(conn, NULL); + IFCLEARPGRES(pgres); + if (error) + free(error); return retvalue; } /* pq_abort - send an ABORT, if necessary - this function does not call any Py_*_ALLOW_THREADS macros */ + This function should be called while holding the global interpreter + lock. */ int pq_abort(connectionObject *conn) { - const char *query = "ABORT"; - int pgstatus, retvalue = -1; + int retvalue = -1; PGresult *pgres = NULL; + char *error = NULL; Dprintf("pq_abort: pgconn = %p, isolevel = %ld, status = %d", conn->pgconn, conn->isolation_level, conn->status); @@ -339,27 +421,20 @@ pq_abort(connectionObject *conn) return 0; } + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&conn->lock); + pq_clear_async(conn); - pgres = PQexec(conn->pgconn, query); - if (pgres == NULL) { - Dprintf("pq_abort: PQexec() failed"); - pq_set_critical(conn, NULL); - goto cleanup; - } + retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error); - pgstatus = PQresultStatus(pgres); - if (pgstatus != PGRES_COMMAND_OK ) { - Dprintf("pq_abort: result is NOT OK"); - pq_set_critical(conn, NULL); - goto cleanup; - } - Dprintf("pq_abort: issued '%s' command", query); + pthread_mutex_unlock(&conn->lock); + Py_END_ALLOW_THREADS; - retvalue = 0; - conn->status = CONN_STATUS_READY; + if (retvalue < 0) + pq_complete_error(conn, &pgres, &error); + else + conn->status = CONN_STATUS_READY; - cleanup: - IFCLEARPGRES(pgres); return retvalue; } @@ -424,6 +499,9 @@ pq_is_busy(connectionObject *conn) int pq_execute(cursorObject *curs, const char *query, int async) { + PGresult *pgres = NULL; + char *error = NULL; + /* if the status of the connection is critical raise an exception and definitely close the connection */ if (curs->conn->critical) { @@ -442,10 +520,10 @@ pq_execute(cursorObject *curs, const char *query, int async) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(curs->conn->lock)); - if (pq_begin(curs->conn) < 0) { + if (pq_begin_locked(curs->conn, &pgres, &error) < 0) { pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; - pq_resolve_critical(curs->conn, 0); + pq_complete_error(curs->conn, &pgres, &error); return -1; } @@ -719,7 +797,7 @@ _pq_copy_in_v3(cursorObject *curs) IFCLEARPGRES(curs->pgres); while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) - pq_raise(curs->conn, curs, NULL, NULL); + pq_raise(curs->conn, curs, NULL, NULL, NULL); IFCLEARPGRES(curs->pgres); } @@ -753,7 +831,7 @@ _pq_copy_in(cursorObject *curs) IFCLEARPGRES(curs->pgres); while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) - pq_raise(curs->conn, curs, NULL, NULL); + pq_raise(curs->conn, curs, NULL, NULL, NULL); IFCLEARPGRES(curs->pgres); } @@ -790,7 +868,7 @@ _pq_copy_out_v3(cursorObject *curs) } if (len == -2) { - pq_raise(curs->conn, NULL, NULL, NULL); + pq_raise(curs->conn, NULL, NULL, NULL, NULL); return -1; } @@ -798,7 +876,7 @@ _pq_copy_out_v3(cursorObject *curs) IFCLEARPGRES(curs->pgres); while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) - pq_raise(curs->conn, curs, NULL, NULL); + pq_raise(curs->conn, curs, NULL, NULL, NULL); IFCLEARPGRES(curs->pgres); } return 1; @@ -849,7 +927,7 @@ _pq_copy_out(cursorObject *curs) IFCLEARPGRES(curs->pgres); while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) - pq_raise(curs->conn, curs, NULL, NULL); + pq_raise(curs->conn, curs, NULL, NULL, NULL); IFCLEARPGRES(curs->pgres); } @@ -984,7 +1062,7 @@ pq_fetch(cursorObject *curs) default: Dprintf("pq_fetch: uh-oh, something FAILED"); - pq_raise(curs->conn, curs, NULL, NULL); + pq_raise(curs->conn, curs, NULL, NULL, NULL); IFCLEARPGRES(curs->pgres); ex = -1; break; diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 57be047..d55b0b6 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -32,8 +32,8 @@ /* exported functions */ extern int pq_fetch(cursorObject *curs); extern int pq_execute(cursorObject *curs, const char *query, int async); -extern int pq_begin(connectionObject *conn); extern int pq_commit(connectionObject *conn); +extern int pq_abort_locked(connectionObject *conn); extern int pq_abort(connectionObject *conn); extern int pq_is_busy(connectionObject *conn); extern void pq_set_critical(connectionObject *conn, const char *msg); |
