summaryrefslogtreecommitdiff
path: root/psycopg/pqpath.c
diff options
context:
space:
mode:
Diffstat (limited to 'psycopg/pqpath.c')
-rw-r--r--psycopg/pqpath.c246
1 files changed, 162 insertions, 84 deletions
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index ae7d65c..b7daba7 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -61,8 +61,8 @@ strip_severity(const char *msg)
This function should be called while holding the GIL. */
static void
-pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
- const char *msg)
+pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres,
+ PyObject *exc, const char *msg)
{
PyObject *pgc = (PyObject*)curs;
@@ -76,11 +76,14 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
return;
}
- if (curs && curs->pgres) {
- err = PQresultErrorMessage(curs->pgres);
+ if (pgres == NULL && curs != NULL)
+ pgres = curs->pgres;
+
+ if (pgres) {
+ err = PQresultErrorMessage(pgres);
#ifdef HAVE_PQPROTOCOL3
if (err != NULL && conn->protocol == 3) {
- code = PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
+ code = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
}
#endif
}
@@ -98,11 +101,11 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
/* if exc is NULL, analyze the message and try to deduce the right
exception kind (only if we have a pgres, obviously) */
if (exc == NULL) {
- if (curs && curs->pgres) {
+ if (pgres) {
if (conn->protocol == 3) {
#ifdef HAVE_PQPROTOCOL3
char *pgstate =
- PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
+ PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
if (pgstate != NULL && !strncmp(pgstate, "23", 2))
exc = IntegrityError;
else
@@ -227,63 +230,122 @@ pq_clear_async(connectionObject *conn)
} while (pgres != NULL);
}
-/* pq_begin - send a BEGIN WORK, if necessary
+/* pg_execute_command_locked - execute a no-result query on a locked connection.
- this function does not call any Py_*_ALLOW_THREADS macros */
+ This function should only be called on a locked connection without
+ holding the global interpreter lock.
-int
-pq_begin(connectionObject *conn)
+ On error, -1 is returned, and the pgres argument will hold the
+ relevant result structure.
+ */
+static int
+pq_execute_command_locked(connectionObject *conn, const char *query,
+ PGresult **pgres, char **error)
{
- const char *query[] = {
- NULL,
- "BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
- "BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"};
-
int pgstatus, retvalue = -1;
- PGresult *pgres = NULL;
-
- Dprintf("pq_begin: pgconn = %p, isolevel = %ld, status = %d",
- conn->pgconn, conn->isolation_level, conn->status);
- if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) {
- Dprintf("pq_begin: transaction in progress");
- return 0;
- }
+ Dprintf("pq_execute_command_locked: pgconn = %p, query = %s",
+ conn->pgconn, query);
+ *error = NULL;
+ *pgres = PQexec(conn->pgconn, query);
+ if (*pgres == NULL) {
+ const char *msg;
- pq_clear_async(conn);
- pgres = PQexec(conn->pgconn, query[conn->isolation_level]);
- if (pgres == NULL) {
- Dprintf("pq_begin: PQexec() failed");
- pq_set_critical(conn, NULL);
+ Dprintf("pq_execute_command_locked: PQexec returned NULL");
+ msg = PQerrorMessage(conn->pgconn);
+ if (msg)
+ *error = strdup(msg);
goto cleanup;
}
- pgstatus = PQresultStatus(pgres);
+ pgstatus = PQresultStatus(*pgres);
if (pgstatus != PGRES_COMMAND_OK ) {
- Dprintf("pq_begin: result is NOT OK");
- pq_set_critical(conn, NULL);
+ Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)",
+ pgstatus);
goto cleanup;
}
- Dprintf("pq_begin: issued '%s' command", query[conn->isolation_level]);
retvalue = 0;
- conn->status = CONN_STATUS_BEGIN;
+ IFCLEARPGRES(*pgres);
cleanup:
- IFCLEARPGRES(pgres);
return retvalue;
}
+/* pq_complete_error: handle an error from pq_execute_command_locked()
+
+ If pq_execute_command_locked() returns -1, this function should be
+ called to convert the result to a Python exception.
+
+ This function should be called while holding the global interpreter
+ lock.
+ */
+static void
+pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)
+{
+ Dprintf("pq_complete_error: pgconn = %p, pgres = %p, error = %s",
+ conn->pgconn, *pgres, *error ? *error : "(null)");
+ if (*pgres != NULL)
+ pq_raise(conn, NULL, *pgres, OperationalError, NULL);
+ else if (*error != NULL) {
+ PyErr_SetString(OperationalError, *error);
+ free(*error);
+ } else {
+ PyErr_SetString(OperationalError, "unknown error");
+ }
+ IFCLEARPGRES(*pgres);
+ if (*error) {
+ free(*error);
+ *error = NULL;
+ }
+}
+
+
+/* pq_begin_locked - begin a transaction, if necessary
+
+ This function should only be called on a locked connection without
+ holding the global interpreter lock.
+
+ On error, -1 is returned, and the pgres argument will hold the
+ relevant result structure.
+ */
+static int
+pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error)
+{
+ const char *query[] = {
+ NULL,
+ "BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
+ "BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"};
+ int result;
+
+ Dprintf("pq_begin_locked: pgconn = %p, isolevel = %ld, status = %d",
+ conn->pgconn, conn->isolation_level, conn->status);
+
+ if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) {
+ Dprintf("pq_begin_locked: transaction in progress");
+ return 0;
+ }
+
+ pq_clear_async(conn);
+ result = pq_execute_command_locked(conn, query[conn->isolation_level],
+ pgres, error);
+ if (result == 0)
+ conn->status = CONN_STATUS_BEGIN;
+
+ return result;
+}
+
/* pq_commit - send an END, if necessary
- this function does not call any Py_*_ALLOW_THREADS macros */
+ This function should be called while holding the global interpreter
+ lock. */
int
pq_commit(connectionObject *conn)
{
- const char *query = "END";
- int pgstatus, retvalue = -1;
+ int retvalue = -1;
PGresult *pgres = NULL;
+ char *error = NULL;
Dprintf("pq_commit: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
@@ -293,43 +355,63 @@ pq_commit(connectionObject *conn)
return 0;
}
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&conn->lock);
+
pq_clear_async(conn);
- pgres = PQexec(conn->pgconn, query);
- if (pgres == NULL) {
- Dprintf("pq_commit: PQexec() failed");
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
+ retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error);
- pgstatus = PQresultStatus(pgres);
- if (pgstatus != PGRES_COMMAND_OK ) {
- Dprintf("pq_commit: result is NOT OK");
- /* if the result is not OK the transaction has been rolled back
- so we set the status to CONN_STATUS_READY anyway */
- conn->status = CONN_STATUS_READY;
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
- Dprintf("pq_commit: issued '%s' command", query);
+ pthread_mutex_unlock(&conn->lock);
+ Py_END_ALLOW_THREADS;
- retvalue = 0;
+ if (retvalue < 0)
+ pq_complete_error(conn, &pgres, &error);
+
+ /* Even if an error occurred, the connection will be rolled back,
+ so we unconditionally set the connection status here. */
conn->status = CONN_STATUS_READY;
- cleanup:
+ return retvalue;
+}
+
+int
+pq_abort_locked(connectionObject *conn)
+{
+ int retvalue = -1;
+ PGresult *pgres = NULL;
+ char *error = NULL;
+
+ Dprintf("pq_abort_locked: pgconn = %p, isolevel = %ld, status = %d",
+ conn->pgconn, conn->isolation_level, conn->status);
+
+ if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
+ Dprintf("pq_abort_locked: no transaction to abort");
+ return 0;
+ }
+
+ pq_clear_async(conn);
+ retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error);
+
+ if (retvalue < 0)
+ pq_set_critical(conn, NULL);
+
IFCLEARPGRES(pgres);
+ if (error)
+ free(error);
return retvalue;
}
/* pq_abort - send an ABORT, if necessary
- this function does not call any Py_*_ALLOW_THREADS macros */
+ This function should be called while holding the global interpreter
+ lock. */
int
pq_abort(connectionObject *conn)
{
- const char *query = "ABORT";
- int pgstatus, retvalue = -1;
+ int retvalue = -1;
PGresult *pgres = NULL;
+ char *error = NULL;
Dprintf("pq_abort: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
@@ -339,27 +421,20 @@ pq_abort(connectionObject *conn)
return 0;
}
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&conn->lock);
+
pq_clear_async(conn);
- pgres = PQexec(conn->pgconn, query);
- if (pgres == NULL) {
- Dprintf("pq_abort: PQexec() failed");
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
+ retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error);
- pgstatus = PQresultStatus(pgres);
- if (pgstatus != PGRES_COMMAND_OK ) {
- Dprintf("pq_abort: result is NOT OK");
- pq_set_critical(conn, NULL);
- goto cleanup;
- }
- Dprintf("pq_abort: issued '%s' command", query);
+ pthread_mutex_unlock(&conn->lock);
+ Py_END_ALLOW_THREADS;
- retvalue = 0;
- conn->status = CONN_STATUS_READY;
+ if (retvalue < 0)
+ pq_complete_error(conn, &pgres, &error);
+ else
+ conn->status = CONN_STATUS_READY;
- cleanup:
- IFCLEARPGRES(pgres);
return retvalue;
}
@@ -424,6 +499,9 @@ pq_is_busy(connectionObject *conn)
int
pq_execute(cursorObject *curs, const char *query, int async)
{
+ PGresult *pgres = NULL;
+ char *error = NULL;
+
/* if the status of the connection is critical raise an exception and
definitely close the connection */
if (curs->conn->critical) {
@@ -442,10 +520,10 @@ pq_execute(cursorObject *curs, const char *query, int async)
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock));
- if (pq_begin(curs->conn) < 0) {
+ if (pq_begin_locked(curs->conn, &pgres, &error) < 0) {
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
- pq_resolve_critical(curs->conn, 0);
+ pq_complete_error(curs->conn, &pgres, &error);
return -1;
}
@@ -719,7 +797,7 @@ _pq_copy_in_v3(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@@ -753,7 +831,7 @@ _pq_copy_in(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@@ -790,7 +868,7 @@ _pq_copy_out_v3(cursorObject *curs)
}
if (len == -2) {
- pq_raise(curs->conn, NULL, NULL, NULL);
+ pq_raise(curs->conn, NULL, NULL, NULL, NULL);
return -1;
}
@@ -798,7 +876,7 @@ _pq_copy_out_v3(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
return 1;
@@ -849,7 +927,7 @@ _pq_copy_out(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@@ -984,7 +1062,7 @@ pq_fetch(cursorObject *curs)
default:
Dprintf("pq_fetch: uh-oh, something FAILED");
- pq_raise(curs->conn, curs, NULL, NULL);
+ pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
ex = -1;
break;