summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog26
-rw-r--r--NEWS11
-rw-r--r--psycopg/connection.h2
-rw-r--r--psycopg/connection_int.c32
-rw-r--r--psycopg/connection_type.c9
-rw-r--r--psycopg/cursor.h7
-rw-r--r--psycopg/cursor_type.c55
-rw-r--r--psycopg/pqpath.c43
8 files changed, 111 insertions, 74 deletions
diff --git a/ChangeLog b/ChangeLog
index 5f4357a..db34be4 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,27 @@
+2004-12-10 Federico Di Gregorio <fog@debian.org>
+
+ * psycopg/cursor_type.c: now *all* write or async accesses to the
+ connection object are arbitrated using the connection lock.
+
+ * psycopg/cursor_type.c (psyco_curs_isready): now we reset the
+ current async cursor if it is ready, to allow other cursors to
+ .execute() without raising the "transaction in progress" error.
+
+ * psycopg/pqpath.c (pq_is_busy): gained status of high-level
+ function with its own blocking and locking.
+
+ * psycopg/cursor.h (EXC_IF_CURS_CLOSED): also checks the
+ connection (a closed connection implies a closed cursor.)
+
+ * psycopg/cursor_type.c: cursor's connection is correctly
+ INCREFfed and DECREFfed.
+
+ * psycopg/connection_type.c: removed the cursors list from the
+ connection object. It is not necessary anymore for the connection
+ to know about the cursors and the reference counting will keep the
+ connection alive (but possibly closed) until all cursors are
+ garbage collected.
+
2004-11-20 Federico Di Gregorio <fog@initd.org>
* psycopg/cursor_type.c (_mogrify): ported %% fix from 1.1.15.
@@ -19,7 +43,7 @@
tuples are filled using PyTuple_SET_ITEM while extended types
(created via row_factory) are filled using PySequence_SetItem.
- * psycopg/cursor_type.c: change cursor attribute name from
+ * psycopg/cursor_type.c: changed cursor attribute name from
tuple_factory to row_factory.
2004-10-14 Federico Di Gregorio <fog@debian.org>
diff --git a/NEWS b/NEWS
index e2415fc..ad56051 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,14 @@
+What's new in psycopg 1.99.11
+-----------------------------
+
+* 'cursor' argument in .cursor() connection method renamed to
+ 'cursor_factory'.
+
+* changed 'tuple_factory' cursor attribute name to 'row_factory'.
+
+* the .cursor attribute is gone and connections and cursors are propely
+ gc-managed.
+
What's new in psycopg 1.99.10
-----------------------------
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 56193bd..c973b92 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -40,8 +40,6 @@ extern PyTypeObject connectionType;
typedef struct {
PyObject HEAD;
- PyObject *cursors; /* all cursors derived from this connection */
-
pthread_mutex_t lock; /* the global connection lock */
char *dsn; /* data source name */
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 8e72ff2..270ed6c 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -131,9 +131,6 @@ conn_connect(connectionObject *self)
void
conn_close(connectionObject *self)
{
- int len, i;
- PyObject *t = NULL;
-
/* sets this connection as closed even for other threads; also note that
we need to check the value of pgconn, because we get called even when
the connection fails! */
@@ -143,34 +140,17 @@ conn_close(connectionObject *self)
self->closed = 1;
/* execute a forced rollback on the connection (but don't check the
- result, we're going to close the pq connection anyway */
- if (self->pgconn) pq_abort(self);
-
- /* orphans all the children cursors but do NOT destroy them (note that we
- need to lock the connection before orphaning a cursor: we don't want to
- remove a connection from a cursor executing a DB operation */
- pthread_mutex_unlock(&self->lock);
- Py_END_ALLOW_THREADS;
-
- pthread_mutex_lock(&self->lock);
- len = PyList_Size(self->cursors);
- Dprintf("conn_close: ophaning %d cursors", len);
- for (i = len-1; i >= 0; i--) {
- t = PySequence_GetItem(self->cursors, i);
- Dprintf("conn close: cursor at %p: refcnt = %d", t, t->ob_refcnt);
- PySequence_DelItem(self->cursors, i);
- ((cursorObject *)t)->conn = NULL; /* orphaned */
- Dprintf("conn_close: -> new refcnt = %d", t->ob_refcnt);
- }
- pthread_mutex_unlock(&self->lock);
-
- /* now that all cursors have been orphaned (they can't operate on the
- database anymore) we can shut down the connection */
+ result, we're going to close the pq connection anyway */
if (self->pgconn) {
+ pq_abort(self);
PQfinish(self->pgconn);
Dprintf("conn_close: PQfinish called");
self->pgconn = NULL;
}
+
+ pthread_mutex_unlock(&self->lock);
+ Py_END_ALLOW_THREADS;
+
}
/* conn_commit - commit on a connection */
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 1d544f6..d1bb978 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -65,11 +65,6 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
/* TODO: added error checking on obj (cursor) here */
- /* add the cursor to this connection's list (and decref it, so that it has
- the right number of references to go away even if still in the list) */
- PyList_Append(self->cursors, obj);
- Py_DECREF(obj);
-
Dprintf("psyco_conn_cursor: new cursor at %p: refcnt = %d",
obj, obj->ob_refcnt);
return obj;
@@ -244,7 +239,6 @@ static struct PyMemberDef connectionObject_members[] = {
{"isolation_level", T_LONG,
offsetof(connectionObject, isolation_level), RO},
{"encoding", T_STRING, offsetof(connectionObject, encoding), RO},
- {"cursors", T_OBJECT, offsetof(connectionObject, cursors), RO},
{"notices", T_OBJECT, offsetof(connectionObject, notice_list), RO},
{"notifies", T_OBJECT, offsetof(connectionObject, notifies), RO},
{"dsn", T_STRING, offsetof(connectionObject, dsn), RO},
@@ -261,7 +255,6 @@ connection_setup(connectionObject *self, char *dsn)
self, ((PyObject *)self)->ob_refcnt);
self->dsn = strdup(dsn);
- self->cursors = PyList_New(0);
self->notice_list = PyList_New(0);
self->closed = 0;
self->isolation_level = 1;
@@ -273,7 +266,6 @@ connection_setup(connectionObject *self, char *dsn)
pthread_mutex_init(&(self->lock), NULL);
if (conn_connect(self) != 0) {
- Py_XDECREF(self->cursors);
pthread_mutex_destroy(&(self->lock));
Dprintf("connection_init: FAILED");
return -1;
@@ -291,7 +283,6 @@ connection_dealloc(PyObject* obj)
if (self->closed == 0) conn_close(self);
- Py_XDECREF(self->cursors);
if (self->dsn) free(self->dsn);
if (self->encoding) free(self->encoding);
if (self->critical) free(self->critical);
diff --git a/psycopg/cursor.h b/psycopg/cursor.h
index 2c2983e..a1d6ac3 100644
--- a/psycopg/cursor.h
+++ b/psycopg/cursor.h
@@ -72,11 +72,12 @@ typedef struct {
extern void curs_reset(cursorObject *self);
/* exception-raising macros */
-#define EXC_IF_CURS_CLOSED(self) if ((self)->closed) { \
- PyErr_SetString(InterfaceError, "cursor already closed"); \
+#define EXC_IF_CURS_CLOSED(self) \
+if ((self)->closed || ((self)->conn && (self)->conn->closed)) { \
+ PyErr_SetString(InterfaceError, "cursor already closed"); \
return NULL; }
-#define EXC_IF_NO_TUPLES(self) if ((self)->notuples) { \
+#define EXC_IF_NO_TUPLES(self) if ((self)->notuples) { \
PyErr_SetString(ProgrammingError, "no results to fetch"); \
return NULL; }
diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c
index 5482f66..1bb1894 100644
--- a/psycopg/cursor_type.c
+++ b/psycopg/cursor_type.c
@@ -249,12 +249,15 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
return NULL;
}
+ pthread_mutex_lock(&(self->conn->lock));
if (self->conn->async_cursor != NULL
&& self->conn->async_cursor != (PyObject*)self) {
+ pthread_mutex_unlock(&(self->conn->lock));
PyErr_SetString(ProgrammingError,
"asynchronous query already in execution");
return NULL;
}
+ pthread_mutex_unlock(&(self->conn->lock));
if (PyUnicode_Check(operation)) {
PyObject *enc = PyDict_GetItemString(psycoEncodings,
@@ -498,12 +501,15 @@ _psyco_curs_prefetch(cursorObject *self)
/* check if the fetching cursor is the one that did the asynchronous query
and raise an exception if not */
+ pthread_mutex_lock(&(self->conn->lock));
if (self->conn->async_cursor != NULL
&& self->conn->async_cursor != (PyObject*)self) {
+ pthread_mutex_unlock(&(self->conn->lock));
PyErr_SetString(ProgrammingError,
"asynchronous fetch by wrong cursor");
return -2;
}
+ pthread_mutex_unlock(&(self->conn->lock));
if (self->pgres == NULL) {
Dprintf("_psyco_curs_prefetch: trying to fetch data");
@@ -622,10 +628,9 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args)
/* if the query was async aggresively free pgres, to allow
successive requests to reallocate it */
if (self->row >= self->rowcount
-
&& self->conn->async_cursor == (PyObject*)self)
IFCLEARPGRES(self->pgres);
-
+
return res;
}
@@ -687,7 +692,8 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords)
/* if the query was async aggresively free pgres, to allow
successive requests to reallocate it */
- if (self->row >= self->rowcount && self->conn->async_cursor)
+ if (self->row >= self->rowcount
+ && self->conn->async_cursor == (PyObject*)self)
IFCLEARPGRES(self->pgres);
return list;
@@ -743,7 +749,8 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args)
/* if the query was async aggresively free pgres, to allow
successive requests to reallocate it */
- if (self->row >= self->rowcount && self->conn->async_cursor)
+ if (self->row >= self->rowcount
+ && self->conn->async_cursor == (PyObject*)self)
IFCLEARPGRES(self->pgres);
return list;
@@ -929,13 +936,21 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args)
static PyObject *
psyco_curs_fileno(cursorObject *self, PyObject *args)
{
+ long int socket;
+
if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self);
/* note how we call PQflush() to make sure the user will use
select() in the safe way! */
+ pthread_mutex_lock(&(self->conn->lock));
+ Py_BEGIN_ALLOW_THREADS;
PQflush(self->conn->pgconn);
- return PyInt_FromLong((long int)PQsocket(self->conn->pgconn));
+ socket = (long int)PQsocket(self->conn->pgconn);
+ Py_END_ALLOW_THREADS;
+ pthread_mutex_unlock(&(self->conn->lock));
+
+ return PyInt_FromLong(socket);
}
/* extension: isready - return true if data from async execute is ready */
@@ -949,11 +964,20 @@ psyco_curs_isready(cursorObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self);
+ /* pq_is_busy does its own locking, we don't need anything special but if
+ the cursor is ready we need to fetch the result and free the connection
+ for the next query. */
+
if (pq_is_busy(self->conn)) {
Py_INCREF(Py_False);
return Py_False;
}
else {
+ IFCLEARPGRES(self->pgres);
+ pthread_mutex_lock(&(self->conn->lock));
+ self->pgres = PQgetResult(self->conn->pgconn);
+ self->conn->async_cursor = NULL;
+ pthread_mutex_unlock(&(self->conn->lock));
Py_INCREF(Py_True);
return Py_True;
}
@@ -1061,6 +1085,8 @@ cursor_setup(cursorObject *self, connectionObject *conn)
self, ((PyObject *)self)->ob_refcnt);
self->conn = conn;
+ Py_INCREF((PyObject*)self->conn);
+
self->closed = 0;
self->pgres = NULL;
@@ -1092,25 +1118,10 @@ cursor_dealloc(PyObject* obj)
{
cursorObject *self = (cursorObject *)obj;
- /* if necessary remove cursor from connection */
- if (self->conn != NULL) {
- PyObject *t;
- int len, i;
-
- if ((len = PyList_Size(self->conn->cursors)) > 0) {
- for (i = 0; i < len; i++) {
- t = PyList_GET_ITEM(self->conn->cursors, i);
- if (self == (cursorObject *)t) {
- Dprintf("cursor_dealloc: found myself in cursor list");
- PySequence_DelItem(self->conn->cursors, i);
- break;
- }
- }
- }
- }
if (self->query) free(self->query);
-
+
+ Py_DECREF((PyObject*)self->conn);
Py_XDECREF(self->casts);
Py_XDECREF(self->description);
Py_XDECREF(self->pgstatus);
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 0f5beb1..e7bed66 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -19,6 +19,12 @@
* Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
+/* IMPORTANT NOTE: no function in this file do its own connection locking
+ except for pg_execute and pq_fetch (that are somehow high-level. This means
+ that all the othe functions should be called while holding a lock to the
+ connection.
+*/
+
#include <Python.h>
#include <string.h>
@@ -33,7 +39,9 @@
#include "psycopg/pgtypes.h"
#include "psycopg/pgversion.h"
-/* pq_raise - raise a python exception of the right kind */
+/* pq_raise - raise a python exception of the right kind
+
+ This function should be called while holding the GIL. */
void
pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg)
@@ -106,7 +114,8 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg)
critical condition like out of memory or lost connection. it save the error
message and mark the connection as 'wanting cleanup'.
- both functions do not call any Py_*_ALLOW_THREADS macros. */
+ both functions do not call any Py_*_ALLOW_THREADS macros.
+ pq_resolve_critical should be called while holding the GIL. */
void
pq_set_critical(connectionObject *conn, const char *msg)
@@ -140,6 +149,7 @@ pq_resolve_critical(connectionObject *conn, int close)
note that this function does block because it needs to wait for the full
result sets of the previous query to clear them.
+
this function does not call any Py_*_ALLOW_THREADS macros */
void
@@ -149,6 +159,7 @@ pq_clear_async(connectionObject *conn)
do {
pgres = PQgetResult(conn->pgconn);
+ Dprintf("pq_clear_async: clearing PGresult at %p", pgres);
IFCLEARPGRES(pgres);
} while (pgres != NULL);
}
@@ -291,7 +302,10 @@ pq_abort(connectionObject *conn)
a status of 1 means that a call to pq_fetch will block, while a status of 0
means that there is data available to be collected. -1 means an error, the
- exception will be set accordingly. */
+ exception will be set accordingly.
+
+ this fucntion locks the connection object
+ this function call Py_*_ALLOW_THREADS macros */
int
pq_is_busy(connectionObject *conn)
@@ -299,9 +313,15 @@ pq_is_busy(connectionObject *conn)
PGnotify *pgn;
Dprintf("pq_is_busy: consuming input");
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&(conn->lock));
+
if (PQconsumeInput(conn->pgconn) == 0) {
Dprintf("pq_is_busy: PQconsumeInput() failed");
PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
+ pthread_mutex_unlock(&(conn->lock));
+ Py_BLOCK_THREADS;
return -1;
}
@@ -315,11 +335,12 @@ pq_is_busy(connectionObject *conn)
notify = PyTuple_New(2);
PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
- pthread_mutex_lock(&(conn->lock));
PyList_Append(conn->notifies, notify);
- pthread_mutex_unlock(&(conn->lock));
free(pgn);
}
+
+ pthread_mutex_unlock(&(conn->lock));
+ Py_END_ALLOW_THREADS;
return PQisBusy(conn->pgconn);
}
@@ -656,14 +677,14 @@ pq_fetch(cursorObject *curs)
Dprintf("pq_fetch: no data: entering polling loop");
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&(curs->conn->lock));
-
while (pq_is_busy(curs->conn) > 0) {
fd_set rfds;
struct timeval tv;
int sval, sock;
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&(curs->conn->lock));
+
sock = PQsocket(curs->conn->pgconn);
FD_ZERO(&rfds);
FD_SET(sock, &rfds);
@@ -677,14 +698,14 @@ pq_fetch(cursorObject *curs)
Dprintf("pq_fetch: entering PDflush() loop");
while (PQflush(curs->conn->pgconn) != 0);
sval = select(sock+1, &rfds, NULL, NULL, &tv);
+
+ pthread_mutex_unlock(&(curs->conn->lock));
+ Py_END_ALLOW_THREADS;
}
Dprintf("pq_fetch: data is probably ready");
IFCLEARPGRES(curs->pgres);
curs->pgres = PQgetResult(curs->conn->pgconn);
-
- pthread_mutex_unlock(&(curs->conn->lock));
- Py_END_ALLOW_THREADS;
}
/* check for PGRES_FATAL_ERROR result */