diff options
| -rw-r--r-- | psycopg/connection.h | 5 | ||||
| -rw-r--r-- | psycopg/connection_type.c | 6 | ||||
| -rw-r--r-- | psycopg/cursor.h | 11 | ||||
| -rw-r--r-- | psycopg/cursor_type.c | 35 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 8 |
5 files changed, 34 insertions, 31 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h index 32aaddf..c0fec18 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -137,6 +137,11 @@ HIDDEN PyObject *conn_poll_fetch(connectionObject *self); PyErr_SetString(InterfaceError, "connection already closed"); \ return NULL; } +#define EXC_IF_CONN_ASYNC(self, cmd) if ((self)->async == 1) { \ + PyErr_SetString(ProgrammingError, #cmd " cannot be used " \ + "in asynchronous mode"); \ + return NULL; } + #ifdef __cplusplus } #endif diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index a4f5430..f80eba1 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -134,6 +134,7 @@ static PyObject * psyco_conn_commit(connectionObject *self, PyObject *args) { EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, commit); if (!PyArg_ParseTuple(args, "")) return NULL; @@ -154,6 +155,7 @@ static PyObject * psyco_conn_rollback(connectionObject *self, PyObject *args) { EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, rollback); if (!PyArg_ParseTuple(args, "")) return NULL; @@ -178,6 +180,7 @@ psyco_conn_set_isolation_level(connectionObject *self, PyObject *args) int level = 1; EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, set_isolation_level); if (!PyArg_ParseTuple(args, "i", &level)) return NULL; @@ -211,6 +214,7 @@ psyco_conn_set_client_encoding(connectionObject *self, PyObject *args) size_t i, j; EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, set_client_encoding); if (!PyArg_ParseTuple(args, "s", &enc)) return NULL; @@ -309,6 +313,7 @@ psyco_conn_lobject(connectionObject *self, PyObject *args, PyObject *keywds) } EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, lobject); Dprintf("psyco_conn_lobject: new lobject for connection at %p", self); Dprintf("psyco_conn_lobject: parameters: oid = %d, mode = %s", @@ -381,6 +386,7 @@ psyco_conn_reset(connectionObject *self) int res; EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, reset); if (pq_reset(self) < 0) return NULL; diff --git a/psycopg/cursor.h b/psycopg/cursor.h index bcfc81f..a2293a0 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -106,6 +106,17 @@ if ((self)->mark != (self)->conn->mark) { \ PyErr_SetString(ProgrammingError, "named cursor isn't valid anymore"); \ return NULL; } +#define EXC_IF_CURS_ASYNC(self, cmd) if ((self)->conn->async == 1) { \ + PyErr_SetString(ProgrammingError, #cmd " cannot be used " \ + "in asynchronous mode"); \ + return NULL; } + +#define EXC_IF_ASYNC_IN_PROGRESS(self, cmd) \ +if ((self)->conn->async_cursor != NULL) { \ + PyErr_SetString(ProgrammingError, #cmd " cannot be used " \ + "while an asynchronous query is underway"); \ + return NULL; } + #ifdef __cplusplus } #endif diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 358eb23..5cabf81 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -57,6 +57,7 @@ psyco_curs_close(cursorObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "")) return NULL; EXC_IF_CURS_CLOSED(self); + EXC_IF_ASYNC_IN_PROGRESS(self, close); if (self->name != NULL) { char buffer[128]; @@ -67,7 +68,6 @@ psyco_curs_close(cursorObject *self, PyObject *args) } self->closed = 1; - pq_clear_async(self->conn); Dprintf("psyco_curs_close: cursor at %p closed", self); Py_INCREF(Py_None); @@ -471,6 +471,7 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs) } EXC_IF_CURS_CLOSED(self); + EXC_IF_ASYNC_IN_PROGRESS(self, execute); if (_psyco_curs_execute(self, operation, vars, self->conn->async)) { Py_INCREF(Py_None); @@ -502,6 +503,7 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs) } EXC_IF_CURS_CLOSED(self); + EXC_IF_CURS_ASYNC(self, executemany); if (self->name != NULL) { psyco_set_error(ProgrammingError, (PyObject*)self, @@ -509,12 +511,6 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs) return NULL; } - if (self->conn->async == 1) { - psyco_set_error(ProgrammingError, (PyObject*)self, - "can't call .executemany() on async cursors", NULL, NULL); - return NULL; - } - if (!PyIter_Check(vars)) { vars = iter = PyObject_GetIter(vars); if (iter == NULL) return NULL; @@ -528,7 +524,7 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs) } else { if (self->rowcount == -1) - rowcount = -1; + rowcount = -1; else if (rowcount >= 0) rowcount += self->rowcount; Py_DECREF(v); @@ -649,20 +645,6 @@ _psyco_curs_prefetch(cursorObject *self) { int i = 0; - /* check if there is an asynchronous query in progess and block until data - is read from it */ - if (self->conn->async_cursor != NULL) { - /* first check if it's the right cursor */ - if (self->conn->async_cursor != (PyObject*)self) { - psyco_set_error(ProgrammingError, (PyObject*)self, - "asynchronous fetch by wrong cursor", NULL, NULL); - return -2; - } - /* now get the result */ - Dprintf("_psyco_curs_prefetch: blocking until all data is read"); - curs_get_last_result(self); - } - if (self->pgres == NULL || self->needsfetch) { self->needsfetch = 0; Dprintf("_psyco_curs_prefetch: trying to fetch data"); @@ -759,7 +741,8 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args) if (args && !PyArg_ParseTuple(args, "")) return NULL; - EXC_IF_CURS_CLOSED(self) + EXC_IF_CURS_CLOSED(self); + EXC_IF_ASYNC_IN_PROGRESS(self, fetchone); if (_psyco_curs_prefetch(self) < 0) return NULL; EXC_IF_NO_TUPLES(self); @@ -820,6 +803,7 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords) } EXC_IF_CURS_CLOSED(self); + EXC_IF_ASYNC_IN_PROGRESS(self, fetchmany); if (_psyco_curs_prefetch(self) < 0) return NULL; EXC_IF_NO_TUPLES(self); @@ -892,6 +876,7 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args) } EXC_IF_CURS_CLOSED(self); + EXC_IF_ASYNC_IN_PROGRESS(self, fetchall); if (_psyco_curs_prefetch(self) < 0) return NULL; EXC_IF_NO_TUPLES(self); @@ -959,6 +944,7 @@ psyco_curs_callproc(cursorObject *self, PyObject *args, PyObject *kwargs) { return NULL; } EXC_IF_CURS_CLOSED(self); + EXC_IF_ASYNC_IN_PROGRESS(self, callproc); if (self->name != NULL) { psyco_set_error(ProgrammingError, (PyObject*)self, @@ -1223,6 +1209,7 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs) return NULL; EXC_IF_CURS_CLOSED(self); + EXC_IF_CURS_ASYNC(self, copy_from); quoted_delimiter = psycopg_escape_string((PyObject*)self->conn, sep, 0, NULL, NULL); if (quoted_delimiter == NULL) { @@ -1327,6 +1314,7 @@ psyco_curs_copy_to(cursorObject *self, PyObject *args, PyObject *kwargs) return NULL; EXC_IF_CURS_CLOSED(self); + EXC_IF_CURS_ASYNC(self, copy_to); quoted_delimiter = psycopg_escape_string((PyObject*)self->conn, sep, 0, NULL, NULL); if (quoted_delimiter == NULL) { PyErr_NoMemory(); @@ -1410,6 +1398,7 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs) { return NULL; } EXC_IF_CURS_CLOSED(self); + EXC_IF_CURS_ASYNC(self, copy_expert); sql = _psyco_curs_validate_sql_basic(self, sql); diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index b987164..f3f3fc8 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -398,7 +398,6 @@ pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error) return 0; } - pq_clear_async(conn); result = pq_execute_command_locked(conn, query[conn->isolation_level], pgres, error); if (result == 0) @@ -432,7 +431,6 @@ pq_commit(connectionObject *conn) pthread_mutex_lock(&conn->lock); conn->mark += 1; - pq_clear_async(conn); retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error); pthread_mutex_unlock(&conn->lock); @@ -464,7 +462,6 @@ pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error) } conn->mark += 1; - pq_clear_async(conn); retvalue = pq_execute_command_locked(conn, "ROLLBACK", pgres, error); if (retvalue == 0) conn->status = CONN_STATUS_READY; @@ -526,7 +523,6 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error) conn->pgconn, conn->isolation_level, conn->status); conn->mark += 1; - pq_clear_async(conn); if (conn->isolation_level > 0 && conn->status == CONN_STATUS_BEGIN) { retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error); @@ -682,10 +678,6 @@ pq_execute(cursorObject *curs, const char *query, int async) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(curs->conn->lock)); - /* FIXME: we should first try to cancel the query, otherwise we will block - until it completes AND until we get the result back */ - pq_clear_async(curs->conn); - if (pq_begin_locked(curs->conn, &pgres, &error) < 0) { pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; |
