diff options
-rw-r--r-- | NEWS | 3 | ||||
-rw-r--r-- | doc/src/advanced.rst | 8 | ||||
-rw-r--r-- | doc/src/usage.rst | 4 | ||||
-rw-r--r-- | psycopg/connection.h | 5 | ||||
-rw-r--r-- | psycopg/connection_int.c | 2 | ||||
-rw-r--r-- | psycopg/cursor_type.c | 8 | ||||
-rw-r--r-- | psycopg/green.c | 15 | ||||
-rw-r--r-- | psycopg/pqpath.c | 14 |
8 files changed, 39 insertions, 20 deletions
@@ -13,6 +13,9 @@ What's new in psycopg 2.4.2 support was built (ticket #53). - Fixed escape for negative numbers prefixed by minus operator (ticket #57). + - Trying to execute concurrent operations on the same connection + through concurrent green thread results in an error instead of a + deadlock. What's new in psycopg 2.4.1 diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index ac16ca9..c7625b1 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -432,11 +432,9 @@ SQLAlchemy_) to be used in coroutine-based programs. .. warning:: Psycopg connections are not *green thread safe* and can't be used - concurrently by different green threads. Each connection has a lock - used to serialize requests from different cursors to the backend process. - The lock is held for the duration of the command: if the control switched - to a different thread and the latter tried to access the same connection, - the result would be a deadlock. + concurrently by different green threads. Trying to execute more than one + command at time using one cursor per thread will result in an error (or a + deadlock on versions before 2.4.2). Therefore, programmers are advised to either avoid sharing connections between coroutines or to use a library-friendly lock to synchronize shared diff --git a/doc/src/usage.rst b/doc/src/usage.rst index de82c62..efbd158 100644 --- a/doc/src/usage.rst +++ b/doc/src/usage.rst @@ -598,8 +598,8 @@ forking web deploy method such as FastCGI ensure to create the connections .. __: http://www.postgresql.org/docs/9.0/static/libpq-connect.html#LIBPQ-CONNECT -Connections shouldn't be shared either by different green threads: doing so -may result in a deadlock. See :ref:`green-support` for further details. +Connections shouldn't be shared either by different green threads: see +:ref:`green-support` for further details. diff --git a/psycopg/connection.h b/psycopg/connection.h index 30ac5f4..14d58a0 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -90,7 +90,10 @@ typedef struct { PGconn *pgconn; /* the postgresql connection */ PGcancel *cancel; /* the cancellation structure */ - PyObject *async_cursor; /* weakref to a cursor executing an asynchronous query */ + /* Weakref to the object executing an asynchronous query. The object + * is a cursor for async connections, but it may be something else + * for a green connection. If NULL, the connection is idle. */ + PyObject *async_cursor; int async_status; /* asynchronous execution status */ /* notice processing */ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 4170b32..732b22e 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -863,7 +863,7 @@ conn_poll(connectionObject *self) case CONN_STATUS_PREPARED: res = _conn_poll_query(self); - if (res == PSYCO_POLL_OK && self->async_cursor) { + if (res == PSYCO_POLL_OK && self->async && self->async_cursor) { /* An async query has just finished: parse the tuple in the * target cursor. */ cursorObject *curs; diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 8ede845..c18fb71 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -739,7 +739,6 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args) PyObject *res; EXC_IF_CURS_CLOSED(self); - EXC_IF_ASYNC_IN_PROGRESS(self, fetchone); if (_psyco_curs_prefetch(self) < 0) return NULL; EXC_IF_NO_TUPLES(self); @@ -747,6 +746,7 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_ASYNC_IN_PROGRESS(self, fetchone); EXC_IF_TPC_PREPARED(self->conn, fetchone); PyOS_snprintf(buffer, 127, "FETCH FORWARD 1 FROM \"%s\"", self->name); if (pq_execute(self, buffer, 0) == -1) return NULL; @@ -853,7 +853,6 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords) } EXC_IF_CURS_CLOSED(self); - EXC_IF_ASYNC_IN_PROGRESS(self, fetchmany); if (_psyco_curs_prefetch(self) < 0) return NULL; EXC_IF_NO_TUPLES(self); @@ -861,6 +860,7 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_ASYNC_IN_PROGRESS(self, fetchmany); EXC_IF_TPC_PREPARED(self->conn, fetchone); PyOS_snprintf(buffer, 127, "FETCH FORWARD %d FROM \"%s\"", (int)size, self->name); @@ -924,7 +924,6 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args) PyObject *list, *res; EXC_IF_CURS_CLOSED(self); - EXC_IF_ASYNC_IN_PROGRESS(self, fetchall); if (_psyco_curs_prefetch(self) < 0) return NULL; EXC_IF_NO_TUPLES(self); @@ -932,6 +931,7 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_ASYNC_IN_PROGRESS(self, fetchall); EXC_IF_TPC_PREPARED(self->conn, fetchall); PyOS_snprintf(buffer, 127, "FETCH FORWARD ALL FROM \"%s\"", self->name); if (pq_execute(self, buffer, 0) == -1) return NULL; @@ -1112,7 +1112,6 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) return NULL; EXC_IF_CURS_CLOSED(self); - EXC_IF_ASYNC_IN_PROGRESS(self, scroll) /* if the cursor is not named we have the full result set and we can do our own calculations to scroll; else we just delegate the scrolling @@ -1141,6 +1140,7 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_ASYNC_IN_PROGRESS(self, scroll) EXC_IF_TPC_PREPARED(self->conn, scroll); if (strcmp(mode, "absolute") == 0) { diff --git a/psycopg/green.c b/psycopg/green.c index c9b6e07..65578f5 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -152,6 +152,20 @@ psyco_exec_green(connectionObject *conn, const char *command) { PGresult *result = NULL; + /* Check that there is a single concurrently executing query */ + if (conn->async_cursor) { + PyErr_SetString(ProgrammingError, + "a single async query can be executed on the same connection"); + goto end; + } + /* we don't care about which cursor is executing the query, and + * it may also be that no cursor is involved at all and this is + * an internal query. So just store anything in the async_cursor, + * respecting the code expecting it to be a weakref */ + if (!(conn->async_cursor = PyWeakref_NewRef((PyObject*)conn, NULL))) { + goto end; + } + /* Send the query asynchronously */ if (0 == pq_send_query(conn, command)) { goto end; @@ -173,6 +187,7 @@ psyco_exec_green(connectionObject *conn, const char *command) end: conn->async_status = ASYNC_DONE; + Py_CLEAR(conn->async_cursor); return result; } diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index fb0736a..84110f6 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -343,12 +343,12 @@ pq_execute_command_locked(connectionObject *conn, const char *query, *tstate = PyEval_SaveThread(); } if (*pgres == NULL) { - const char *msg; - Dprintf("pq_execute_command_locked: PQexec returned NULL"); - msg = PQerrorMessage(conn->pgconn); - if (msg) - *error = strdup(msg); + if (!PyErr_Occurred()) { + const char *msg; + msg = PQerrorMessage(conn->pgconn); + if (msg && *msg) { *error = strdup(msg); } + } goto cleanup; } @@ -361,8 +361,8 @@ pq_execute_command_locked(connectionObject *conn, const char *query, retvalue = 0; IFCLEARPGRES(*pgres); - - cleanup: + +cleanup: return retvalue; } |