diff options
| -rw-r--r-- | lib/extensions.py | 5 | ||||
| -rw-r--r-- | psycopg/connection.h | 37 | ||||
| -rw-r--r-- | psycopg/connection_int.c | 382 | ||||
| -rw-r--r-- | psycopg/connection_type.c | 207 | ||||
| -rw-r--r-- | psycopg/psycopgmodule.c | 13 |
5 files changed, 572 insertions, 72 deletions
diff --git a/lib/extensions.py b/lib/extensions.py index adc729e..64ea7cb 100644 --- a/lib/extensions.py +++ b/lib/extensions.py @@ -76,6 +76,11 @@ STATUS_ASYNC = 4 # This is a usefull mnemonic to check if the connection is in a transaction STATUS_IN_TRANSACTION = STATUS_BEGIN +"""psycopg async connection polling values""" +POLL_READ = 1 +POLL_WRITE = 2 +POLL_OK = 3 + """Backend transaction status values.""" TRANSACTION_STATUS_IDLE = 0 TRANSACTION_STATUS_ACTIVE = 1 diff --git a/psycopg/connection.h b/psycopg/connection.h index 82bfab8..d3f4048 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -41,10 +41,33 @@ extern "C" { #define CONN_STATUS_BEGIN 2 #define CONN_STATUS_SYNC 3 #define CONN_STATUS_ASYNC 4 +/* async connection building statuses */ +#define CONN_STATUS_SEND_DATESTYLE 5 +#define CONN_STATUS_SENT_DATESTYLE 6 +#define CONN_STATUS_GET_DATESTYLE 7 +#define CONN_STATUS_SEND_CLIENT_ENCODING 8 +#define CONN_STATUS_SENT_CLIENT_ENCODING 9 +#define CONN_STATUS_GET_CLIENT_ENCODING 10 +#define CONN_STATUS_SEND_TRANSACTION_ISOLATION 11 +#define CONN_STATUS_SENT_TRANSACTION_ISOLATION 12 +#define CONN_STATUS_GET_TRANSACTION_ISOLATION 13 + +/* polling result, try to keep in sync with PostgresPollingStatusType from + libpq-fe.h */ +#define PSYCO_POLL_READ 1 +#define PSYCO_POLL_WRITE 2 +#define PSYCO_POLL_OK 3 /* Hard limit on the notices stored by the Python connection */ #define CONN_NOTICES_LIMIT 50 +/* we need the initial date style to be ISO, for typecasters; if the user + later change it, she must know what she's doing... these are the queries we + need to issue */ +#define psyco_datestyle "SET DATESTYLE TO 'ISO'" +#define psyco_client_encoding "SHOW client_encoding" +#define psyco_transaction_isolation "SHOW default_transaction_isolation" + extern HIDDEN PyTypeObject connectionType; struct connectionObject_notice { @@ -66,12 +89,14 @@ typedef struct { long int isolation_level; /* isolation level for this connection */ long int mark; /* number of commits/rollbacks done so far */ int status; /* status of the connection */ + + long int async; /* 1 means the connection is async */ int protocol; /* protocol version */ int server_version; /* server version */ - PGconn *pgconn; /* the postgresql connection */ + PGconn *pgconn; /* the postgresql connection */ - PyObject *async_cursor; + PyObject *async_cursor; /* a cursor executing an asynchronous query */ /* notice processing */ PyObject *notice_list; @@ -89,15 +114,21 @@ typedef struct { } connectionObject; /* C-callable functions in connection_int.c and connection_ext.c */ +HIDDEN int conn_get_standard_conforming_strings(PGconn *pgconn); +HIDDEN char *conn_get_encoding(PGresult *pgres); +HIDDEN int conn_get_isolation_level(PGresult *pgres); +HIDDEN int conn_get_protocol_version(PGconn *pgconn); HIDDEN void conn_notice_process(connectionObject *self); HIDDEN void conn_notice_clean(connectionObject *self); HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn); -HIDDEN int conn_connect(connectionObject *self); +HIDDEN int conn_connect(connectionObject *self, long int async); HIDDEN void conn_close(connectionObject *self); HIDDEN int conn_commit(connectionObject *self); HIDDEN int conn_rollback(connectionObject *self); HIDDEN int conn_switch_isolation_level(connectionObject *self, int level); HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); +HIDDEN PyObject *conn_poll_send(connectionObject *self); +HIDDEN PyObject *conn_poll_fetch(connectionObject *self); /* exception-raising macros */ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 8c619ec..4646eea 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -123,35 +123,16 @@ conn_notice_clean(connectionObject *self) Py_END_ALLOW_THREADS; } -/* conn_setup - setup and read basic information about the connection */ +/* + * the conn_get_* family of functions makes it easier to obtain the connection + * parameters from query results or by interrogating the connection itself +*/ int -conn_setup(connectionObject *self, PGconn *pgconn) +conn_get_standard_conforming_strings(PGconn *pgconn) { - PGresult *pgres; - const char *data, *tmp; - const char *scs; /* standard-conforming strings */ - size_t i; - - /* we need the initial date style to be ISO, for typecasters; if the user - later change it, she must know what she's doing... */ - static const char datestyle[] = "SET DATESTYLE TO 'ISO'"; - static const char encoding[] = "SHOW client_encoding"; - static const char isolevel[] = "SHOW default_transaction_isolation"; - - static const char lvl1a[] = "read uncommitted"; - static const char lvl1b[] = "read committed"; - static const char lvl2a[] = "repeatable read"; - static const char lvl2b[] = "serializable"; - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&self->lock); - Py_BLOCK_THREADS; - - if (self->encoding) free(self->encoding); - self->equote = 0; - self->isolation_level = 0; - + int equote; + const char *scs; /* * The presence of the 'standard_conforming_strings' parameter * means that the server _accepts_ the E'' quote. @@ -173,15 +154,82 @@ conn_setup(connectionObject *self, PGconn *pgconn) scs ? scs : "unavailable"); #ifndef PSYCOPG_OWN_QUOTING - self->equote = (scs && (0 == strcmp("off", scs))); + equote = (scs && (0 == strcmp("off", scs))); #else - self->equote = (scs != NULL); + equote = (scs != NULL); #endif Dprintf("conn_connect: server requires E'' quotes: %s", - self->equote ? "YES" : "NO"); + equote ? "YES" : "NO"); + + return equote; +} + +char * +conn_get_encoding(PGresult *pgres) +{ + char *tmp, *encoding; + size_t i; + + tmp = PQgetvalue(pgres, 0, 0); + encoding = malloc(strlen(tmp)+1); + if (encoding == NULL) { + PyErr_NoMemory(); + IFCLEARPGRES(pgres); + return NULL; + } + for (i=0 ; i < strlen(tmp) ; i++) + encoding[i] = toupper(tmp[i]); + encoding[i] = '\0'; + CLEARPGRES(pgres); + + return encoding; +} + +int +conn_get_isolation_level(PGresult *pgres) +{ + static const char lvl1a[] = "read uncommitted"; + static const char lvl1b[] = "read committed"; + char *isolation_level = PQgetvalue(pgres, 0, 0); + + CLEARPGRES(pgres); + + if ((strncmp(lvl1a, isolation_level, strlen(isolation_level)) == 0) + || (strncmp(lvl1b, isolation_level, strlen(isolation_level)) == 0)) + return 1; + else /* if it's not one of the lower ones, it's SERIALIZABLE */ + return 2; +} + +int +conn_get_protocol_version(PGconn *pgconn) +{ +#ifdef HAVE_PQPROTOCOL3 + return PQprotocolVersion(pgconn); +#else + return 2; +#endif +} + +/* conn_setup - setup and read basic information about the connection */ + +int +conn_setup(connectionObject *self, PGconn *pgconn) +{ + PGresult *pgres; + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&self->lock); + Py_BLOCK_THREADS; + + if (self->encoding) free(self->encoding); + self->equote = 0; + self->isolation_level = 0; + + self->equote = conn_get_standard_conforming_strings(pgconn); Py_UNBLOCK_THREADS; - pgres = PQexec(pgconn, datestyle); + pgres = PQexec(pgconn, psyco_datestyle); Py_BLOCK_THREADS; if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { @@ -196,7 +244,7 @@ conn_setup(connectionObject *self, PGconn *pgconn) CLEARPGRES(pgres); Py_UNBLOCK_THREADS; - pgres = PQexec(pgconn, encoding); + pgres = PQexec(pgconn, psyco_client_encoding); Py_BLOCK_THREADS; if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) { @@ -208,24 +256,19 @@ conn_setup(connectionObject *self, PGconn *pgconn) Py_BLOCK_THREADS; return -1; } - tmp = PQgetvalue(pgres, 0, 0); - self->encoding = malloc(strlen(tmp)+1); + + /* conn_get_encoding returns a malloc'd string */ + self->encoding = conn_get_encoding(pgres); if (self->encoding == NULL) { - PyErr_NoMemory(); PQfinish(pgconn); - IFCLEARPGRES(pgres); Py_UNBLOCK_THREADS; pthread_mutex_unlock(&self->lock); Py_BLOCK_THREADS; return -1; } - for (i=0 ; i < strlen(tmp) ; i++) - self->encoding[i] = toupper(tmp[i]); - self->encoding[i] = '\0'; - CLEARPGRES(pgres); Py_UNBLOCK_THREADS; - pgres = PQexec(pgconn, isolevel); + pgres = PQexec(pgconn, psyco_transaction_isolation); Py_BLOCK_THREADS; if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) { @@ -238,16 +281,7 @@ conn_setup(connectionObject *self, PGconn *pgconn) Py_BLOCK_THREADS; return -1; } - data = PQgetvalue(pgres, 0, 0); - if ((strncmp(lvl1a, data, strlen(lvl1a)) == 0) - || (strncmp(lvl1b, data, strlen(lvl1b)) == 0)) - self->isolation_level = 1; - else if ((strncmp(lvl2a, data, strlen(lvl2a)) == 0) - || (strncmp(lvl2b, data, strlen(lvl2b)) == 0)) - self->isolation_level = 2; - else - self->isolation_level = 2; - CLEARPGRES(pgres); + self->isolation_level = conn_get_isolation_level(pgres); Py_UNBLOCK_THREADS; pthread_mutex_unlock(&self->lock); @@ -259,7 +293,7 @@ conn_setup(connectionObject *self, PGconn *pgconn) /* conn_connect - execute a connection to the database */ int -conn_connect(connectionObject *self) +conn_sync_connect(connectionObject *self) { PGconn *pgconn; @@ -295,11 +329,7 @@ conn_connect(connectionObject *self) return -1; } -#ifdef HAVE_PQPROTOCOL3 - self->protocol = PQprotocolVersion(pgconn); -#else - self->protocol = 2; -#endif + self->protocol = conn_get_protocol_version(pgconn); Dprintf("conn_connect: using protocol %d", self->protocol); self->server_version = (int)PQserverVersion(pgconn); @@ -308,6 +338,246 @@ conn_connect(connectionObject *self) return 0; } +static int +conn_async_connect(connectionObject *self) +{ + PGconn *pgconn; + + pgconn = PQconnectStart(self->dsn); + + Dprintf("conn_connect: new postgresql connection at %p", pgconn); + + if (pgconn == NULL) + { + Dprintf("conn_connect: PQconnectStart(%s) FAILED", self->dsn); + PyErr_SetString(OperationalError, "PQconnectStart() failed"); + return -1; + } + else if (PQstatus(pgconn) == CONNECTION_BAD) + { + Dprintf("conn_connect: PQconnectdb(%s) returned BAD", self->dsn); + PyErr_SetString(OperationalError, PQerrorMessage(pgconn)); + PQfinish(pgconn); + return -1; + } + + PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self); + + self->status = CONN_STATUS_ASYNC; + self->pgconn = pgconn; + + return 0; +} + +int +conn_connect(connectionObject *self, long int async) +{ + if (async == 1) { + Dprintf("con_connect: connecting in ASYNC mode"); + return conn_async_connect(self); + } + else { + Dprintf("con_connect: connecting in SYNC mode"); + return conn_sync_connect(self); + } +} + +/* conn_poll_send - handle connection polling when flushing output */ + +PyObject * +conn_poll_send(connectionObject *self) +{ + const char *query; + int next_status; + int ret; + + Dprintf("conn_poll_send: status %d", self->status); + + switch (self->status) { + case CONN_STATUS_SEND_DATESTYLE: + /* set the datestyle */ + query = psyco_datestyle; + next_status = CONN_STATUS_SENT_DATESTYLE; + break; + case CONN_STATUS_SEND_CLIENT_ENCODING: + /* get the client_encoding */ + query = psyco_client_encoding; + next_status = CONN_STATUS_SENT_CLIENT_ENCODING; + break; + case CONN_STATUS_SEND_TRANSACTION_ISOLATION: + /* get the default isolevel */ + query = psyco_transaction_isolation; + next_status = CONN_STATUS_SENT_TRANSACTION_ISOLATION; + break; + default: + /* unexpected state, error out */ + PyErr_Format(OperationalError, + "unexpected state: %d", self->status); + return NULL; + } + + Dprintf("conn_poll_send: sending query %-.200s", query); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(self->lock)); + + if (PQsendQuery(self->pgconn, query) != 1) { + pthread_mutex_unlock(&(self->lock)); + Py_BLOCK_THREADS; + PyErr_SetString(OperationalError, + PQerrorMessage(self->pgconn)); + return NULL; + } + + if (PQflush(self->pgconn) == 0) { + /* the query got fully sent to the server */ + Dprintf("conn_poll_send: query got flushed immediately"); + /* the return value will be POLL_READ */ + ret = PSYCO_POLL_READ; + + /* advance the next status, since we skip over the "waiting for the + query to be sent" status */ + switch (next_status) { + case CONN_STATUS_SENT_DATESTYLE: + next_status = CONN_STATUS_GET_DATESTYLE; + break; + case CONN_STATUS_SENT_CLIENT_ENCODING: + next_status = CONN_STATUS_GET_CLIENT_ENCODING; + break; + case CONN_STATUS_SENT_TRANSACTION_ISOLATION: + next_status = CONN_STATUS_GET_TRANSACTION_ISOLATION; + break; + } + } + else { + /* query did not get sent completely, tell the client to wait for the + socket to become writable */ + ret = PSYCO_POLL_WRITE; + } + + self->status = next_status; + Dprintf("conn_poll_send: next status is %d, returning %d", + self->status, ret); + + pthread_mutex_unlock(&(self->lock)); + Py_END_ALLOW_THREADS; + + return PyInt_FromLong(ret); +} + +/* curs_poll_fetch - handle connection polling when reading result */ + +PyObject * +conn_poll_fetch(connectionObject *self) +{ + PGresult *pgres; + int is_busy; + int next_status; + int ret; + + Dprintf("conn_poll_fetch: status %d", self->status); + + /* consume the input */ + is_busy = pq_is_busy(self); + if (is_busy == -1) { + /* there was an error, raise the exception */ + return NULL; + } + else if (is_busy == 1) { + /* the connection is busy, tell the user to wait more */ + Dprintf("conn_poll_fetch: connection busy, returning %d", + PSYCO_POLL_READ); + return PyInt_FromLong(PSYCO_POLL_READ); + } + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(self->lock)); + + /* connection no longer busy, process input */ + pgres = PQgetResult(self->pgconn); + + /* do the rest while holding the GIL, we won't be calling into any + blocking API */ + pthread_mutex_unlock(&(self->lock)); + Py_END_ALLOW_THREADS; + + Dprintf("conn_poll_fetch: got result %p", pgres); + + /* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */ + if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK && + PQresultStatus(pgres) != PGRES_TUPLES_OK)) { + PyErr_SetString(OperationalError, "can't issue " + "initial connection queries"); + PQfinish(self->pgconn); + IFCLEARPGRES(pgres); + return NULL; + } + + if (self->status == CONN_STATUS_GET_DATESTYLE) { + /* got the result from SET DATESTYLE*/ + Dprintf("conn_poll_fetch: datestyle set"); + next_status = CONN_STATUS_SEND_CLIENT_ENCODING; + } + else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) { + /* got the client_encoding */ + self->encoding = conn_get_encoding(pgres); + if (self->encoding == NULL) { + PQfinish(self->pgconn); + return NULL; + } + Dprintf("conn_poll_fetch: got client_encoding %s", self->encoding); + next_status = CONN_STATUS_SEND_TRANSACTION_ISOLATION; + } + else if (self->status == CONN_STATUS_GET_TRANSACTION_ISOLATION) { + /* got the default isolevel */ + self->isolation_level = conn_get_isolation_level(pgres); + Dprintf("conn_poll_fetch: got isolevel %ld", self->isolation_level); + + /* since this is the last step, set the other instance variables now */ + self->equote = conn_get_standard_conforming_strings(self->pgconn); + self->protocol = conn_get_protocol_version(self->pgconn); + self->server_version = (int) PQserverVersion(self->pgconn); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(self->lock)); + + /* set the connection to nonblocking */ + if (PQsetnonblocking(self->pgconn, 1) != 0) { + Dprintf("conn_async_connect: PQsetnonblocking() FAILED"); + Py_BLOCK_THREADS; + PyErr_SetString(OperationalError, "PQsetnonblocking() failed"); + PQfinish(self->pgconn); + return NULL; + } + + pthread_mutex_unlock(&(self->lock)); + Py_END_ALLOW_THREADS; + + /* next status is going to READY */ + next_status = CONN_STATUS_READY; + } + else { + /* unexpected state, error out */ + PyErr_Format(OperationalError, + "unexpected state: %d", self->status); + return NULL; + } + + /* clear any leftover result, there should be none, but the protocol + requires calling PQgetResult until you get a NULL */ + pq_clear_async(self); + + self->status = next_status; + + /* if the curent status is READY it means we got the result of the + last initialization query, so we return POLL_OK, otherwise we need to + send another query, so return POLL_WRITE */ + ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE; + Dprintf("conn_poll_fetch: next status is %d, returning %d", + self->status, ret); + return PyInt_FromLong(ret); +} + /* conn_close - do anything needed to shut down the connection */ void diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 48858d3..3d5629d 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -37,6 +37,7 @@ #include "psycopg/psycopg.h" #include "psycopg/connection.h" #include "psycopg/cursor.h" +#include "psycopg/pqpath.h" #include "psycopg/lobject.h" /** DBAPI methods **/ @@ -66,6 +67,20 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds) EXC_IF_CONN_CLOSED(self); + if (self->status != CONN_STATUS_READY && + self->status != CONN_STATUS_BEGIN) { + PyErr_SetString(OperationalError, + "asynchronous connection attempt underway"); + return NULL; + } + + if (name != NULL && self->async) { + PyErr_SetString(OperationalError, + "asynchronous connections " + "cannot produce named cursors"); + return NULL; + } + Dprintf("psyco_conn_cursor: new cursor for connection at %p", self); Dprintf("psyco_conn_cursor: parameters: name = %s", name); @@ -389,6 +404,172 @@ psyco_conn_get_exception(PyObject *self, void *closure) return exception; } +#define psyco_conn_poll_doc \ +"poll() -- return POLL_OK if the connection has been estabilished, " \ + "POLL_READ if the application should be waiting " \ + "for the socket to be readable or POLL_WRITE " \ + "if the socket should be writable." + +static PyObject * +psyco_conn_poll(connectionObject *self) +{ + PostgresPollingStatusType poll_status; + + Dprintf("conn_poll: polling with status %d", self->status); + + switch (self->status) { + + case CONN_STATUS_SEND_DATESTYLE: + case CONN_STATUS_SEND_CLIENT_ENCODING: + case CONN_STATUS_SEND_TRANSACTION_ISOLATION: + /* these mean that we need to wait for the socket to become writable + to send the rest of our query */ + return conn_poll_send(self); + + case CONN_STATUS_GET_DATESTYLE: + case CONN_STATUS_GET_CLIENT_ENCODING: + case CONN_STATUS_GET_TRANSACTION_ISOLATION: + /* these mean that we are waiting for the results of the queries */ + return conn_poll_fetch(self); + + case CONN_STATUS_ASYNC: + /* this means we are in the middle of a PQconnectPoll loop */ + break; + + case CONN_STATUS_READY: + /* we have completed the connection setup */ + return PyInt_FromLong(PSYCO_POLL_OK); + + default: + /* everything else is an error */ + PyErr_SetString(OperationalError, + "not in asynchronous connection attempt"); + return NULL; + + } + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&self->lock); + + poll_status = PQconnectPoll(self->pgconn); + + if (poll_status == PGRES_POLLING_READING) { + pthread_mutex_unlock(&(self->lock)); + Py_BLOCK_THREADS; + Dprintf("conn_poll: returing POLL_READ"); + return PyInt_FromLong(PSYCO_POLL_READ); + } + + if (poll_status == PGRES_POLLING_WRITING) { + pthread_mutex_unlock(&(self->lock)); + Py_BLOCK_THREADS; + Dprintf("conn_poll: returing POLL_WRITE"); + return PyInt_FromLong(PSYCO_POLL_WRITE); + } + + if (poll_status == PGRES_POLLING_FAILED) { + pthread_mutex_unlock(&(self->lock)); + Py_BLOCK_THREADS; + PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); + return NULL; + } + + /* the only other thing that PQconnectPoll can return is PGRES_POLLING_OK, + but make sure */ + if (poll_status != PGRES_POLLING_OK) { + pthread_mutex_unlock(&(self->lock)); + Py_BLOCK_THREADS; + PyErr_Format(OperationalError, + "unexpected result from PQconnectPoll: %d", poll_status); + return NULL; + } + + Dprintf("conn_poll: got POLL_OK"); + + /* the connection is built, but we want to do a few other things before we + let the user use it */ + + self->equote = conn_get_standard_conforming_strings(self->pgconn); + + Dprintf("conn_poll: got standard_conforming_strings"); + + /* + * Here is the tricky part, we need to figure the datestyle, + * client_encoding and isolevel, all using nonblocking calls. To do that + * we will keep telling the user to poll, while we are waiting for our + * asynchronous queries to complete. + */ + pthread_mutex_unlock(&(self->lock)); + Py_END_ALLOW_THREADS; + /* the next operation the client will do is send a query, so ask him to + wait for a writable condition */ + self->status = CONN_STATUS_SEND_DATESTYLE; + Dprintf("conn_poll: connection is built, retrning %d", + PSYCO_POLL_WRITE); + return PyInt_FromLong(PSYCO_POLL_WRITE); +} + + +/* extension: fileno - return the file descriptor of the connection */ + +#define psyco_conn_fileno_doc \ +"fileno() -> int -- Return file descriptor associated to database connection." + +static PyObject * +psyco_conn_fileno(connectionObject *self) +{ + long int socket; + + EXC_IF_CONN_CLOSED(self); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(self->lock)); + socket = (long int)PQsocket(self->pgconn); + pthread_mutex_unlock(&(self->lock)); + Py_END_ALLOW_THREADS; + + return PyInt_FromLong(socket); +} + + +/* extension: issync - tell if the connection is synchronous */ + +#define psyco_conn_issync_doc \ +"issync() -> bool -- Return True if the connection is synchronous." + +static PyObject * +psyco_conn_issync(connectionObject *self) +{ + if (self->async) { + Py_INCREF(Py_False); + return Py_False; + } + else { + Py_INCREF(Py_True); + return Py_True; + } +} + + +/* extension: executing - check for asynchronous operations */ + +#define psyco_conn_executing_doc \ +"executing() -> bool -- Return True if the connection is " \ + "executing an asynchronous operation." + +static PyObject * +psyco_conn_executing(connectionObject *self) +{ + if (self->async_cursor == NULL) { + Py_INCREF(Py_False); + return Py_False; + } + else { + Py_INCREF(Py_True); + return Py_True; + } +} + /** the connection object **/ @@ -418,6 +599,14 @@ static struct PyMethodDef connectionObject_methods[] = { METH_VARARGS|METH_KEYWORDS, psyco_conn_lobject_doc}, {"reset", (PyCFunction)psyco_conn_reset, METH_NOARGS, psyco_conn_reset_doc}, + {"poll", (PyCFunction)psyco_conn_poll, + METH_NOARGS, psyco_conn_lobject_doc}, + {"fileno", (PyCFunction)psyco_conn_fileno, + METH_NOARGS, psyco_conn_fileno_doc}, + {"issync", (PyCFunction)psyco_conn_issync, + METH_NOARGS, psyco_conn_issync_doc}, + {"executing", (PyCFunction)psyco_conn_executing, + METH_NOARGS, psyco_conn_executing_doc}, #endif {NULL} }; @@ -476,21 +665,22 @@ static struct PyGetSetDef connectionObject_getsets[] = { /* initialization and finalization methods */ static int -connection_setup(connectionObject *self, const char *dsn) +connection_setup(connectionObject *self, const char *dsn, long int async) { char *pos; int res; - Dprintf("connection_setup: init connection object at %p, refcnt = " - FORMAT_CODE_PY_SSIZE_T, - self, ((PyObject *)self)->ob_refcnt + Dprintf("connection_setup: init connection object at %p, " + "async %ld, refcnt = " FORMAT_CODE_PY_SSIZE_T, + self, async, ((PyObject *)self)->ob_refcnt ); self->dsn = strdup(dsn); self->notice_list = PyList_New(0); self->notifies = PyList_New(0); self->closed = 0; - self->status = CONN_STATUS_READY; + self->async = async; + self->status = async ? CONN_STATUS_ASYNC : CONN_STATUS_READY; self->critical = NULL; self->async_cursor = NULL; self->pgconn = NULL; @@ -502,7 +692,7 @@ connection_setup(connectionObject *self, const char *dsn) pthread_mutex_init(&(self->lock), NULL); - if (conn_connect(self) != 0) { + if (conn_connect(self, async) != 0) { Dprintf("connection_init: FAILED"); res = -1; } @@ -560,11 +750,12 @@ static int connection_init(PyObject *obj, PyObject *args, PyObject *kwds) { const char *dsn; + long int async = 0; - if (!PyArg_ParseTuple(args, "s", &dsn)) + if (!PyArg_ParseTuple(args, "s|l", &dsn, &async)) return -1; - return connection_setup((connectionObject *)obj, dsn); + return connection_setup((connectionObject *)obj, dsn, async); } static PyObject * diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index 7726319..8ececea 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -90,6 +90,7 @@ HIDDEN int psycopg_debug_enabled = 0; "- ``user`` -- user name used to authenticate\n" \ "- ``password`` -- password used to authenticate\n" \ "- ``sslmode`` -- SSL mode (see PostgreSQL documentation)\n\n" \ +"- ``async`` -- if the connection should provide asynchronous API\n\n" \ "If the ``connection_factory`` keyword argument is not provided this\n" \ "function always return an instance of the `connection` class.\n" \ "Else the given sub-class of `extensions.connection` will be used to\n" \ @@ -118,14 +119,16 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds) const char *database=NULL, *user=NULL, *password=NULL; const char *host=NULL, *sslmode=NULL; char port[16]; + int async = 0; static char *kwlist[] = {"dsn", "database", "host", "port", "user", "password", "sslmode", - "connection_factory", NULL}; + "connection_factory", "async", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, keywds, "|sssOsssO", kwlist, + if (!PyArg_ParseTupleAndKeywords(args, keywds, "|sssOsssOi", kwlist, &dsn_static, &database, &host, &pyport, - &user, &password, &sslmode, &factory)) { + &user, &password, &sslmode, + &factory, &async)) { return NULL; } @@ -190,11 +193,11 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds) { const char *dsn = (dsn_static != NULL ? dsn_static : dsn_dynamic); - Dprintf("psyco_connect: dsn = '%s'", dsn); + Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async); /* allocate connection, fill with errors and return it */ if (factory == NULL) factory = (PyObject *)&connectionType; - conn = PyObject_CallFunction(factory, "s", dsn); + conn = PyObject_CallFunction(factory, "si", dsn, async); } goto cleanup; |
