diff options
| author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-04-05 01:42:30 +0100 |
|---|---|---|
| committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-04-21 15:21:33 +0100 |
| commit | 0ec73a18b47040b8fef41a8e546057b767b26e26 (patch) | |
| tree | c8584a14a03e6f8dc4f10a25227e0ed4f88f9af0 | |
| parent | 7a06c0455b914ba3fc4a8a819aba2e95d78bda74 (diff) | |
| download | psycopg2-0ec73a18b47040b8fef41a8e546057b767b26e26.tar.gz | |
'pq_execute_command_locked()' calls the wait callback if set.
The function is called without holding the GIL. Because it is necessary
to execute the Python callback if set, we need to re-acquire the GIL and
tnen release it again. In order to correctly bookkeep the thread state,
the pointer of the _save variable is passed to the function.
| -rw-r--r-- | psycopg/connection_int.c | 15 | ||||
| -rw-r--r-- | psycopg/lobject_int.c | 6 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 43 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 7 |
4 files changed, 44 insertions, 27 deletions
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index d87f9ca..24a86c1 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -760,21 +760,22 @@ conn_close(connectionObject *self) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); - if (self->closed == 0) - self->closed = 1; - /* execute a forced rollback on the connection (but don't check the result, we're going to close the pq connection anyway */ if (self->pgconn && self->closed == 1) { PGresult *pgres = NULL; char *error = NULL; - if (pq_abort_locked(self, &pgres, &error) < 0) { + if (pq_abort_locked(self, &pgres, &error, &_save) < 0) { IFCLEARPGRES(pgres); if (error) free (error); } } + + if (self->closed == 0) + self->closed = 1; + if (self->pgconn) { PQfinish(self->pgconn); Dprintf("conn_close: PQfinish called"); @@ -825,7 +826,7 @@ conn_switch_isolation_level(connectionObject *self, int level) /* if the current isolation level is > 0 we need to abort the current transaction before changing; that all folks! */ if (self->isolation_level != level && self->isolation_level > 0) { - res = pq_abort_locked(self, &pgres, &error); + res = pq_abort_locked(self, &pgres, &error, &_save); } self->isolation_level = level; @@ -862,10 +863,10 @@ conn_set_client_encoding(connectionObject *self, const char *enc) /* abort the current transaction, to set the encoding ouside of transactions */ - res = pq_abort_locked(self, &pgres, &error); + res = pq_abort_locked(self, &pgres, &error, &_save); if (res == 0) { - res = pq_execute_command_locked(self, query, &pgres, &error); + res = pq_execute_command_locked(self, query, &pgres, &error, &_save); if (res == 0) { /* no error, we can proceeed and store the new encoding */ if (self->encoding) free(self->encoding); diff --git a/psycopg/lobject_int.c b/psycopg/lobject_int.c index 91de1cf..772232b 100644 --- a/psycopg/lobject_int.c +++ b/psycopg/lobject_int.c @@ -58,7 +58,7 @@ lobject_open(lobjectObject *self, connectionObject *conn, Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(self->conn->lock)); - retvalue = pq_begin_locked(self->conn, &pgres, &error); + retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); if (retvalue < 0) goto end; @@ -174,7 +174,7 @@ lobject_unlink(lobjectObject *self) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(self->conn->lock)); - retvalue = pq_begin_locked(self->conn, &pgres, &error); + retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); if (retvalue < 0) goto end; @@ -316,7 +316,7 @@ lobject_export(lobjectObject *self, const char *filename) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(self->conn->lock)); - retvalue = pq_begin_locked(self->conn, &pgres, &error); + retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); if (retvalue < 0) goto end; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 7923989..e4533d9 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -334,17 +334,29 @@ pq_set_non_blocking(connectionObject *conn, int arg, int pyerr) On error, -1 is returned, and the pgres argument will hold the relevant result structure. + + The tstate parameter should be the pointer of the _save variable created by + Py_BEGIN_ALLOW_THREADS: this enables the function to acquire and release + again the GIL if needed, i.e. if a Python wait callback must be invoked. */ int pq_execute_command_locked(connectionObject *conn, const char *query, - PGresult **pgres, char **error) + PGresult **pgres, char **error, + PyThreadState **tstate) { int pgstatus, retvalue = -1; Dprintf("pq_execute_command_locked: pgconn = %p, query = %s", conn->pgconn, query); *error = NULL; - *pgres = PQexec(conn->pgconn, query); + + if (!psyco_green()) { + *pgres = PQexec(conn->pgconn, query); + } else { + PyEval_RestoreThread(*tstate); + *pgres = psyco_exec_green(conn, query); + *tstate = PyEval_SaveThread(); + } if (*pgres == NULL) { const char *msg; @@ -406,7 +418,8 @@ pq_complete_error(connectionObject *conn, PGresult **pgres, char **error) relevant result structure. */ int -pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error) +pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error, + PyThreadState **tstate) { const char *query[] = { NULL, @@ -423,7 +436,7 @@ pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error) } result = pq_execute_command_locked(conn, query[conn->isolation_level], - pgres, error); + pgres, error, tstate); if (result == 0) conn->status = CONN_STATUS_BEGIN; @@ -455,7 +468,7 @@ pq_commit(connectionObject *conn) pthread_mutex_lock(&conn->lock); conn->mark += 1; - retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error); + retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error, &_save); pthread_mutex_unlock(&conn->lock); Py_END_ALLOW_THREADS; @@ -473,7 +486,8 @@ pq_commit(connectionObject *conn) } int -pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error) +pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error, + PyThreadState **tstate) { int retvalue = -1; @@ -486,7 +500,7 @@ pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error) } conn->mark += 1; - retvalue = pq_execute_command_locked(conn, "ROLLBACK", pgres, error); + retvalue = pq_execute_command_locked(conn, "ROLLBACK", pgres, error, tstate); if (retvalue == 0) conn->status = CONN_STATUS_READY; @@ -516,7 +530,7 @@ pq_abort(connectionObject *conn) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&conn->lock); - retvalue = pq_abort_locked(conn, &pgres, &error); + retvalue = pq_abort_locked(conn, &pgres, &error, &_save); pthread_mutex_unlock(&conn->lock); Py_END_ALLOW_THREADS; @@ -539,7 +553,8 @@ pq_abort(connectionObject *conn) */ int -pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error) +pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error, + PyThreadState **tstate) { int retvalue = -1; @@ -549,15 +564,15 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error) conn->mark += 1; if (conn->isolation_level > 0 && conn->status == CONN_STATUS_BEGIN) { - retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error); + retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error, tstate); if (retvalue != 0) return retvalue; } - retvalue = pq_execute_command_locked(conn, "RESET ALL", pgres, error); + retvalue = pq_execute_command_locked(conn, "RESET ALL", pgres, error, tstate); if (retvalue != 0) return retvalue; retvalue = pq_execute_command_locked(conn, - "SET SESSION AUTHORIZATION DEFAULT", pgres, error); + "SET SESSION AUTHORIZATION DEFAULT", pgres, error, tstate); if (retvalue != 0) return retvalue; conn->status = CONN_STATUS_READY; @@ -578,7 +593,7 @@ pq_reset(connectionObject *conn) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&conn->lock); - retvalue = pq_reset_locked(conn, &pgres, &error); + retvalue = pq_reset_locked(conn, &pgres, &error, &_save); pthread_mutex_unlock(&conn->lock); Py_END_ALLOW_THREADS; @@ -684,7 +699,7 @@ pq_execute(cursorObject *curs, const char *query, int async) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(curs->conn->lock)); - if (pq_begin_locked(curs->conn, &pgres, &error) < 0) { + if (pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; pq_complete_error(curs->conn, &pgres, &error); diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 7ae38dd..e7e957e 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -38,10 +38,10 @@ HIDDEN int pq_fetch(cursorObject *curs); HIDDEN int pq_execute(cursorObject *curs, const char *query, int async); HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres, - char **error); + char **error, PyThreadState **tstate); HIDDEN int pq_commit(connectionObject *conn); HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres, - char **error); + char **error, PyThreadState **tstate); HIDDEN int pq_abort(connectionObject *conn); HIDDEN int pq_reset(connectionObject *conn); HIDDEN int pq_is_busy(connectionObject *conn); @@ -53,7 +53,8 @@ HIDDEN void pq_set_critical(connectionObject *conn, const char *msg); HIDDEN int pq_execute_command_locked(connectionObject *conn, const char *query, - PGresult **pgres, char **error); + PGresult **pgres, char **error, + PyThreadState **tstate); HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, char **error); |
