diff options
| -rw-r--r-- | ChangeLog | 7 | ||||
| -rw-r--r-- | psycopg/connection.h | 9 | ||||
| -rw-r--r-- | psycopg/connection_int.c | 56 | ||||
| -rw-r--r-- | psycopg/connection_type.c | 6 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 14 |
5 files changed, 85 insertions, 7 deletions
@@ -1,3 +1,10 @@ +2009-04-04 Federico Di Gregorio <fog@initd.org> + + * connection_int.c(conn_notice_callback): removed all Python + calls because conn_notice_callback() can be called without a lock + on the GIL. Moved processing and cleanup of notices into their + own functions that are called while holding the GIL. + 2009-04-01 Federico Di Gregorio <fog@initd.org> * Applied patch from Menno Smits to fix failures in test_dates diff --git a/psycopg/connection.h b/psycopg/connection.h index 0b969bc..28fef57 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -43,6 +43,11 @@ extern "C" { extern HIDDEN PyTypeObject connectionType; +struct connectionObject_notice { + struct connectionObject_notice *next; + const char *message; +}; + typedef struct { PyObject_HEAD @@ -66,6 +71,7 @@ typedef struct { /* notice processing */ PyObject *notice_list; PyObject *notice_filter; + struct connectionObject_notice *notice_pending; /* notifies */ PyObject *notifies; @@ -75,10 +81,11 @@ typedef struct { PyObject *binary_types; /* a set of typecasters for binary types */ int equote; /* use E''-style quotes for escaped strings */ - } connectionObject; /* C-callable functions in connection_int.c and connection_ext.c */ +HIDDEN void conn_notice_process(connectionObject *self); +HIDDEN void conn_notice_clean(connectionObject *self); HIDDEN int conn_connect(connectionObject *self); HIDDEN void conn_close(connectionObject *self); HIDDEN int conn_commit(connectionObject *self); diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index a2eda87..0ea2c23 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -39,14 +39,39 @@ conn_notice_callback(void *args, const char *message) Dprintf("conn_notice_callback: %s", message); - /* unfortunately the old protocl return COPY FROM errors only as notices, + /* unfortunately the old protocol return COPY FROM errors only as notices, so we need to filter them looking for such errors (but we do it - only if the protocol if <3, else we don't need that */ + only if the protocol if <3, else we don't need that) + + NOTE: if we get here and the connection is unlocked then there is a + problem but this should happen because the notice callback is only + called from libpq and when we're inside libpq the connection is usually + locked. + */ if (self->protocol < 3 && strncmp(message, "ERROR", 5) == 0) pq_set_critical(self, message); else { - PyObject *msg = PyString_FromString(message); + struct connectionObject_notice *notice = + (struct connectionObject_notice *) + PyMem_Malloc(sizeof(struct connectionObject_notice)); + notice->message = strdup(message); + notice->next = self->notice_pending; + self->notice_pending = notice; + } +} + +void +conn_notice_process(connectionObject *self) +{ + pthread_mutex_lock(&self->lock); + + struct connectionObject_notice *notice = self->notice_pending; + + while (notice != NULL) { + PyObject *msg = PyString_FromString(notice->message); + + Dprintf("conn_notice_process: %s", notice->message); PyList_Append(self->notice_list, msg); Py_DECREF(msg); @@ -54,7 +79,32 @@ conn_notice_callback(void *args, const char *message) /* Remove the oldest item if the queue is getting too long. */ if (PyList_GET_SIZE(self->notice_list) > CONN_NOTICES_LIMIT) PySequence_DelItem(self->notice_list, 0); + + notice = notice->next; + } + + pthread_mutex_unlock(&self->lock); + + conn_notice_clean(self); +} + +void +conn_notice_clean(connectionObject *self) +{ + pthread_mutex_lock(&self->lock); + + struct connectionObject_notice *tmp, *notice = self->notice_pending; + + while (notice != NULL) { + tmp = notice; + notice = notice->next; + free(tmp->message); + PyMem_Free(tmp); } + + self->notice_pending = NULL; + + pthread_mutex_unlock(&self->lock); } /* conn_connect - execute a connection to the database */ diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 8b4213c..364a13a 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -430,6 +430,7 @@ connection_setup(connectionObject *self, const char *dsn) self->mark = 0; self->string_types = PyDict_New(); self->binary_types = PyDict_New(); + self->notice_pending = NULL; pthread_mutex_init(&(self->lock), NULL); @@ -461,14 +462,17 @@ connection_dealloc(PyObject* obj) connectionObject *self = (connectionObject *)obj; if (self->closed == 0) conn_close(self); + + conn_notice_clean(self); if (self->dsn) free(self->dsn); if (self->encoding) free(self->encoding); if (self->critical) free(self->critical); + Py_CLEAR(self->async_cursor); Py_CLEAR(self->notice_list); + Py_CLEAR(self->notice_filter); Py_CLEAR(self->notifies); - Py_CLEAR(self->async_cursor); Py_CLEAR(self->string_types); Py_CLEAR(self->binary_types); diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 8b6633b..ea48e7e 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -20,7 +20,7 @@ */ /* IMPORTANT NOTE: no function in this file do its own connection locking - except for pg_execute and pq_fetch (that are somehow high-level. This means + except for pg_execute and pq_fetch (that are somehow high-level). This means that all the othe functions should be called while holding a lock to the connection. */ @@ -331,7 +331,7 @@ pq_execute_command_locked(connectionObject *conn, const char *query, retvalue = 0; IFCLEARPGRES(*pgres); - + cleanup: return retvalue; } @@ -427,6 +427,8 @@ pq_commit(connectionObject *conn) pthread_mutex_unlock(&conn->lock); Py_END_ALLOW_THREADS; + + conn_notice_process(conn); if (retvalue < 0) pq_complete_error(conn, &pgres, &error); @@ -487,6 +489,8 @@ pq_abort(connectionObject *conn) pthread_mutex_unlock(&conn->lock); Py_END_ALLOW_THREADS; + + conn_notice_process(conn); if (retvalue < 0) pq_complete_error(conn, &pgres, &error); @@ -543,6 +547,8 @@ pq_is_busy(connectionObject *conn) pthread_mutex_unlock(&(conn->lock)); Py_END_ALLOW_THREADS; + + conn_notice_process(conn); return res; } @@ -622,6 +628,8 @@ pq_execute(cursorObject *curs, const char *query, int async) pthread_mutex_unlock(&(curs->conn->lock)); Py_END_ALLOW_THREADS; + + conn_notice_process(curs->conn); /* if the execute was sync, we call pq_fetch() immediately, to respect the old DBAPI-2.0 compatible behaviour */ @@ -1150,6 +1158,8 @@ pq_fetch(cursorObject *curs) Dprintf("pq_fetch: fetching done; check for critical errors"); + conn_notice_process(curs->conn); + /* error checking, close the connection if necessary (some critical errors are not really critical, like a COPY FROM error: if that's the case we raise the exception but we avoid to close the connection) */ |
