summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/extensions.py5
-rw-r--r--psycopg/connection.h37
-rw-r--r--psycopg/connection_int.c382
-rw-r--r--psycopg/connection_type.c207
-rw-r--r--psycopg/psycopgmodule.c13
5 files changed, 572 insertions, 72 deletions
diff --git a/lib/extensions.py b/lib/extensions.py
index adc729e..64ea7cb 100644
--- a/lib/extensions.py
+++ b/lib/extensions.py
@@ -76,6 +76,11 @@ STATUS_ASYNC = 4
# This is a usefull mnemonic to check if the connection is in a transaction
STATUS_IN_TRANSACTION = STATUS_BEGIN
+"""psycopg async connection polling values"""
+POLL_READ = 1
+POLL_WRITE = 2
+POLL_OK = 3
+
"""Backend transaction status values."""
TRANSACTION_STATUS_IDLE = 0
TRANSACTION_STATUS_ACTIVE = 1
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 82bfab8..d3f4048 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -41,10 +41,33 @@ extern "C" {
#define CONN_STATUS_BEGIN 2
#define CONN_STATUS_SYNC 3
#define CONN_STATUS_ASYNC 4
+/* async connection building statuses */
+#define CONN_STATUS_SEND_DATESTYLE 5
+#define CONN_STATUS_SENT_DATESTYLE 6
+#define CONN_STATUS_GET_DATESTYLE 7
+#define CONN_STATUS_SEND_CLIENT_ENCODING 8
+#define CONN_STATUS_SENT_CLIENT_ENCODING 9
+#define CONN_STATUS_GET_CLIENT_ENCODING 10
+#define CONN_STATUS_SEND_TRANSACTION_ISOLATION 11
+#define CONN_STATUS_SENT_TRANSACTION_ISOLATION 12
+#define CONN_STATUS_GET_TRANSACTION_ISOLATION 13
+
+/* polling result, try to keep in sync with PostgresPollingStatusType from
+ libpq-fe.h */
+#define PSYCO_POLL_READ 1
+#define PSYCO_POLL_WRITE 2
+#define PSYCO_POLL_OK 3
/* Hard limit on the notices stored by the Python connection */
#define CONN_NOTICES_LIMIT 50
+/* we need the initial date style to be ISO, for typecasters; if the user
+ later change it, she must know what she's doing... these are the queries we
+ need to issue */
+#define psyco_datestyle "SET DATESTYLE TO 'ISO'"
+#define psyco_client_encoding "SHOW client_encoding"
+#define psyco_transaction_isolation "SHOW default_transaction_isolation"
+
extern HIDDEN PyTypeObject connectionType;
struct connectionObject_notice {
@@ -66,12 +89,14 @@ typedef struct {
long int isolation_level; /* isolation level for this connection */
long int mark; /* number of commits/rollbacks done so far */
int status; /* status of the connection */
+
+ long int async; /* 1 means the connection is async */
int protocol; /* protocol version */
int server_version; /* server version */
- PGconn *pgconn; /* the postgresql connection */
+ PGconn *pgconn; /* the postgresql connection */
- PyObject *async_cursor;
+ PyObject *async_cursor; /* a cursor executing an asynchronous query */
/* notice processing */
PyObject *notice_list;
@@ -89,15 +114,21 @@ typedef struct {
} connectionObject;
/* C-callable functions in connection_int.c and connection_ext.c */
+HIDDEN int conn_get_standard_conforming_strings(PGconn *pgconn);
+HIDDEN char *conn_get_encoding(PGresult *pgres);
+HIDDEN int conn_get_isolation_level(PGresult *pgres);
+HIDDEN int conn_get_protocol_version(PGconn *pgconn);
HIDDEN void conn_notice_process(connectionObject *self);
HIDDEN void conn_notice_clean(connectionObject *self);
HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn);
-HIDDEN int conn_connect(connectionObject *self);
+HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);
HIDDEN int conn_commit(connectionObject *self);
HIDDEN int conn_rollback(connectionObject *self);
HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
+HIDDEN PyObject *conn_poll_send(connectionObject *self);
+HIDDEN PyObject *conn_poll_fetch(connectionObject *self);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 8c619ec..4646eea 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -123,35 +123,16 @@ conn_notice_clean(connectionObject *self)
Py_END_ALLOW_THREADS;
}
-/* conn_setup - setup and read basic information about the connection */
+/*
+ * the conn_get_* family of functions makes it easier to obtain the connection
+ * parameters from query results or by interrogating the connection itself
+*/
int
-conn_setup(connectionObject *self, PGconn *pgconn)
+conn_get_standard_conforming_strings(PGconn *pgconn)
{
- PGresult *pgres;
- const char *data, *tmp;
- const char *scs; /* standard-conforming strings */
- size_t i;
-
- /* we need the initial date style to be ISO, for typecasters; if the user
- later change it, she must know what she's doing... */
- static const char datestyle[] = "SET DATESTYLE TO 'ISO'";
- static const char encoding[] = "SHOW client_encoding";
- static const char isolevel[] = "SHOW default_transaction_isolation";
-
- static const char lvl1a[] = "read uncommitted";
- static const char lvl1b[] = "read committed";
- static const char lvl2a[] = "repeatable read";
- static const char lvl2b[] = "serializable";
-
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&self->lock);
- Py_BLOCK_THREADS;
-
- if (self->encoding) free(self->encoding);
- self->equote = 0;
- self->isolation_level = 0;
-
+ int equote;
+ const char *scs;
/*
* The presence of the 'standard_conforming_strings' parameter
* means that the server _accepts_ the E'' quote.
@@ -173,15 +154,82 @@ conn_setup(connectionObject *self, PGconn *pgconn)
scs ? scs : "unavailable");
#ifndef PSYCOPG_OWN_QUOTING
- self->equote = (scs && (0 == strcmp("off", scs)));
+ equote = (scs && (0 == strcmp("off", scs)));
#else
- self->equote = (scs != NULL);
+ equote = (scs != NULL);
#endif
Dprintf("conn_connect: server requires E'' quotes: %s",
- self->equote ? "YES" : "NO");
+ equote ? "YES" : "NO");
+
+ return equote;
+}
+
+char *
+conn_get_encoding(PGresult *pgres)
+{
+ char *tmp, *encoding;
+ size_t i;
+
+ tmp = PQgetvalue(pgres, 0, 0);
+ encoding = malloc(strlen(tmp)+1);
+ if (encoding == NULL) {
+ PyErr_NoMemory();
+ IFCLEARPGRES(pgres);
+ return NULL;
+ }
+ for (i=0 ; i < strlen(tmp) ; i++)
+ encoding[i] = toupper(tmp[i]);
+ encoding[i] = '\0';
+ CLEARPGRES(pgres);
+
+ return encoding;
+}
+
+int
+conn_get_isolation_level(PGresult *pgres)
+{
+ static const char lvl1a[] = "read uncommitted";
+ static const char lvl1b[] = "read committed";
+ char *isolation_level = PQgetvalue(pgres, 0, 0);
+
+ CLEARPGRES(pgres);
+
+ if ((strncmp(lvl1a, isolation_level, strlen(isolation_level)) == 0)
+ || (strncmp(lvl1b, isolation_level, strlen(isolation_level)) == 0))
+ return 1;
+ else /* if it's not one of the lower ones, it's SERIALIZABLE */
+ return 2;
+}
+
+int
+conn_get_protocol_version(PGconn *pgconn)
+{
+#ifdef HAVE_PQPROTOCOL3
+ return PQprotocolVersion(pgconn);
+#else
+ return 2;
+#endif
+}
+
+/* conn_setup - setup and read basic information about the connection */
+
+int
+conn_setup(connectionObject *self, PGconn *pgconn)
+{
+ PGresult *pgres;
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&self->lock);
+ Py_BLOCK_THREADS;
+
+ if (self->encoding) free(self->encoding);
+ self->equote = 0;
+ self->isolation_level = 0;
+
+ self->equote = conn_get_standard_conforming_strings(pgconn);
Py_UNBLOCK_THREADS;
- pgres = PQexec(pgconn, datestyle);
+ pgres = PQexec(pgconn, psyco_datestyle);
Py_BLOCK_THREADS;
if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
@@ -196,7 +244,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
CLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
- pgres = PQexec(pgconn, encoding);
+ pgres = PQexec(pgconn, psyco_client_encoding);
Py_BLOCK_THREADS;
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
@@ -208,24 +256,19 @@ conn_setup(connectionObject *self, PGconn *pgconn)
Py_BLOCK_THREADS;
return -1;
}
- tmp = PQgetvalue(pgres, 0, 0);
- self->encoding = malloc(strlen(tmp)+1);
+
+ /* conn_get_encoding returns a malloc'd string */
+ self->encoding = conn_get_encoding(pgres);
if (self->encoding == NULL) {
- PyErr_NoMemory();
PQfinish(pgconn);
- IFCLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS;
return -1;
}
- for (i=0 ; i < strlen(tmp) ; i++)
- self->encoding[i] = toupper(tmp[i]);
- self->encoding[i] = '\0';
- CLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
- pgres = PQexec(pgconn, isolevel);
+ pgres = PQexec(pgconn, psyco_transaction_isolation);
Py_BLOCK_THREADS;
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
@@ -238,16 +281,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
Py_BLOCK_THREADS;
return -1;
}
- data = PQgetvalue(pgres, 0, 0);
- if ((strncmp(lvl1a, data, strlen(lvl1a)) == 0)
- || (strncmp(lvl1b, data, strlen(lvl1b)) == 0))
- self->isolation_level = 1;
- else if ((strncmp(lvl2a, data, strlen(lvl2a)) == 0)
- || (strncmp(lvl2b, data, strlen(lvl2b)) == 0))
- self->isolation_level = 2;
- else
- self->isolation_level = 2;
- CLEARPGRES(pgres);
+ self->isolation_level = conn_get_isolation_level(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
@@ -259,7 +293,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
/* conn_connect - execute a connection to the database */
int
-conn_connect(connectionObject *self)
+conn_sync_connect(connectionObject *self)
{
PGconn *pgconn;
@@ -295,11 +329,7 @@ conn_connect(connectionObject *self)
return -1;
}
-#ifdef HAVE_PQPROTOCOL3
- self->protocol = PQprotocolVersion(pgconn);
-#else
- self->protocol = 2;
-#endif
+ self->protocol = conn_get_protocol_version(pgconn);
Dprintf("conn_connect: using protocol %d", self->protocol);
self->server_version = (int)PQserverVersion(pgconn);
@@ -308,6 +338,246 @@ conn_connect(connectionObject *self)
return 0;
}
+static int
+conn_async_connect(connectionObject *self)
+{
+ PGconn *pgconn;
+
+ pgconn = PQconnectStart(self->dsn);
+
+ Dprintf("conn_connect: new postgresql connection at %p", pgconn);
+
+ if (pgconn == NULL)
+ {
+ Dprintf("conn_connect: PQconnectStart(%s) FAILED", self->dsn);
+ PyErr_SetString(OperationalError, "PQconnectStart() failed");
+ return -1;
+ }
+ else if (PQstatus(pgconn) == CONNECTION_BAD)
+ {
+ Dprintf("conn_connect: PQconnectdb(%s) returned BAD", self->dsn);
+ PyErr_SetString(OperationalError, PQerrorMessage(pgconn));
+ PQfinish(pgconn);
+ return -1;
+ }
+
+ PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self);
+
+ self->status = CONN_STATUS_ASYNC;
+ self->pgconn = pgconn;
+
+ return 0;
+}
+
+int
+conn_connect(connectionObject *self, long int async)
+{
+ if (async == 1) {
+ Dprintf("con_connect: connecting in ASYNC mode");
+ return conn_async_connect(self);
+ }
+ else {
+ Dprintf("con_connect: connecting in SYNC mode");
+ return conn_sync_connect(self);
+ }
+}
+
+/* conn_poll_send - handle connection polling when flushing output */
+
+PyObject *
+conn_poll_send(connectionObject *self)
+{
+ const char *query;
+ int next_status;
+ int ret;
+
+ Dprintf("conn_poll_send: status %d", self->status);
+
+ switch (self->status) {
+ case CONN_STATUS_SEND_DATESTYLE:
+ /* set the datestyle */
+ query = psyco_datestyle;
+ next_status = CONN_STATUS_SENT_DATESTYLE;
+ break;
+ case CONN_STATUS_SEND_CLIENT_ENCODING:
+ /* get the client_encoding */
+ query = psyco_client_encoding;
+ next_status = CONN_STATUS_SENT_CLIENT_ENCODING;
+ break;
+ case CONN_STATUS_SEND_TRANSACTION_ISOLATION:
+ /* get the default isolevel */
+ query = psyco_transaction_isolation;
+ next_status = CONN_STATUS_SENT_TRANSACTION_ISOLATION;
+ break;
+ default:
+ /* unexpected state, error out */
+ PyErr_Format(OperationalError,
+ "unexpected state: %d", self->status);
+ return NULL;
+ }
+
+ Dprintf("conn_poll_send: sending query %-.200s", query);
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&(self->lock));
+
+ if (PQsendQuery(self->pgconn, query) != 1) {
+ pthread_mutex_unlock(&(self->lock));
+ Py_BLOCK_THREADS;
+ PyErr_SetString(OperationalError,
+ PQerrorMessage(self->pgconn));
+ return NULL;
+ }
+
+ if (PQflush(self->pgconn) == 0) {
+ /* the query got fully sent to the server */
+ Dprintf("conn_poll_send: query got flushed immediately");
+ /* the return value will be POLL_READ */
+ ret = PSYCO_POLL_READ;
+
+ /* advance the next status, since we skip over the "waiting for the
+ query to be sent" status */
+ switch (next_status) {
+ case CONN_STATUS_SENT_DATESTYLE:
+ next_status = CONN_STATUS_GET_DATESTYLE;
+ break;
+ case CONN_STATUS_SENT_CLIENT_ENCODING:
+ next_status = CONN_STATUS_GET_CLIENT_ENCODING;
+ break;
+ case CONN_STATUS_SENT_TRANSACTION_ISOLATION:
+ next_status = CONN_STATUS_GET_TRANSACTION_ISOLATION;
+ break;
+ }
+ }
+ else {
+ /* query did not get sent completely, tell the client to wait for the
+ socket to become writable */
+ ret = PSYCO_POLL_WRITE;
+ }
+
+ self->status = next_status;
+ Dprintf("conn_poll_send: next status is %d, returning %d",
+ self->status, ret);
+
+ pthread_mutex_unlock(&(self->lock));
+ Py_END_ALLOW_THREADS;
+
+ return PyInt_FromLong(ret);
+}
+
+/* curs_poll_fetch - handle connection polling when reading result */
+
+PyObject *
+conn_poll_fetch(connectionObject *self)
+{
+ PGresult *pgres;
+ int is_busy;
+ int next_status;
+ int ret;
+
+ Dprintf("conn_poll_fetch: status %d", self->status);
+
+ /* consume the input */
+ is_busy = pq_is_busy(self);
+ if (is_busy == -1) {
+ /* there was an error, raise the exception */
+ return NULL;
+ }
+ else if (is_busy == 1) {
+ /* the connection is busy, tell the user to wait more */
+ Dprintf("conn_poll_fetch: connection busy, returning %d",
+ PSYCO_POLL_READ);
+ return PyInt_FromLong(PSYCO_POLL_READ);
+ }
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&(self->lock));
+
+ /* connection no longer busy, process input */
+ pgres = PQgetResult(self->pgconn);
+
+ /* do the rest while holding the GIL, we won't be calling into any
+ blocking API */
+ pthread_mutex_unlock(&(self->lock));
+ Py_END_ALLOW_THREADS;
+
+ Dprintf("conn_poll_fetch: got result %p", pgres);
+
+ /* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */
+ if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK &&
+ PQresultStatus(pgres) != PGRES_TUPLES_OK)) {
+ PyErr_SetString(OperationalError, "can't issue "
+ "initial connection queries");
+ PQfinish(self->pgconn);
+ IFCLEARPGRES(pgres);
+ return NULL;
+ }
+
+ if (self->status == CONN_STATUS_GET_DATESTYLE) {
+ /* got the result from SET DATESTYLE*/
+ Dprintf("conn_poll_fetch: datestyle set");
+ next_status = CONN_STATUS_SEND_CLIENT_ENCODING;
+ }
+ else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) {
+ /* got the client_encoding */
+ self->encoding = conn_get_encoding(pgres);
+ if (self->encoding == NULL) {
+ PQfinish(self->pgconn);
+ return NULL;
+ }
+ Dprintf("conn_poll_fetch: got client_encoding %s", self->encoding);
+ next_status = CONN_STATUS_SEND_TRANSACTION_ISOLATION;
+ }
+ else if (self->status == CONN_STATUS_GET_TRANSACTION_ISOLATION) {
+ /* got the default isolevel */
+ self->isolation_level = conn_get_isolation_level(pgres);
+ Dprintf("conn_poll_fetch: got isolevel %ld", self->isolation_level);
+
+ /* since this is the last step, set the other instance variables now */
+ self->equote = conn_get_standard_conforming_strings(self->pgconn);
+ self->protocol = conn_get_protocol_version(self->pgconn);
+ self->server_version = (int) PQserverVersion(self->pgconn);
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&(self->lock));
+
+ /* set the connection to nonblocking */
+ if (PQsetnonblocking(self->pgconn, 1) != 0) {
+ Dprintf("conn_async_connect: PQsetnonblocking() FAILED");
+ Py_BLOCK_THREADS;
+ PyErr_SetString(OperationalError, "PQsetnonblocking() failed");
+ PQfinish(self->pgconn);
+ return NULL;
+ }
+
+ pthread_mutex_unlock(&(self->lock));
+ Py_END_ALLOW_THREADS;
+
+ /* next status is going to READY */
+ next_status = CONN_STATUS_READY;
+ }
+ else {
+ /* unexpected state, error out */
+ PyErr_Format(OperationalError,
+ "unexpected state: %d", self->status);
+ return NULL;
+ }
+
+ /* clear any leftover result, there should be none, but the protocol
+ requires calling PQgetResult until you get a NULL */
+ pq_clear_async(self);
+
+ self->status = next_status;
+
+ /* if the curent status is READY it means we got the result of the
+ last initialization query, so we return POLL_OK, otherwise we need to
+ send another query, so return POLL_WRITE */
+ ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE;
+ Dprintf("conn_poll_fetch: next status is %d, returning %d",
+ self->status, ret);
+ return PyInt_FromLong(ret);
+}
+
/* conn_close - do anything needed to shut down the connection */
void
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 48858d3..3d5629d 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -37,6 +37,7 @@
#include "psycopg/psycopg.h"
#include "psycopg/connection.h"
#include "psycopg/cursor.h"
+#include "psycopg/pqpath.h"
#include "psycopg/lobject.h"
/** DBAPI methods **/
@@ -66,6 +67,20 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
EXC_IF_CONN_CLOSED(self);
+ if (self->status != CONN_STATUS_READY &&
+ self->status != CONN_STATUS_BEGIN) {
+ PyErr_SetString(OperationalError,
+ "asynchronous connection attempt underway");
+ return NULL;
+ }
+
+ if (name != NULL && self->async) {
+ PyErr_SetString(OperationalError,
+ "asynchronous connections "
+ "cannot produce named cursors");
+ return NULL;
+ }
+
Dprintf("psyco_conn_cursor: new cursor for connection at %p", self);
Dprintf("psyco_conn_cursor: parameters: name = %s", name);
@@ -389,6 +404,172 @@ psyco_conn_get_exception(PyObject *self, void *closure)
return exception;
}
+#define psyco_conn_poll_doc \
+"poll() -- return POLL_OK if the connection has been estabilished, " \
+ "POLL_READ if the application should be waiting " \
+ "for the socket to be readable or POLL_WRITE " \
+ "if the socket should be writable."
+
+static PyObject *
+psyco_conn_poll(connectionObject *self)
+{
+ PostgresPollingStatusType poll_status;
+
+ Dprintf("conn_poll: polling with status %d", self->status);
+
+ switch (self->status) {
+
+ case CONN_STATUS_SEND_DATESTYLE:
+ case CONN_STATUS_SEND_CLIENT_ENCODING:
+ case CONN_STATUS_SEND_TRANSACTION_ISOLATION:
+ /* these mean that we need to wait for the socket to become writable
+ to send the rest of our query */
+ return conn_poll_send(self);
+
+ case CONN_STATUS_GET_DATESTYLE:
+ case CONN_STATUS_GET_CLIENT_ENCODING:
+ case CONN_STATUS_GET_TRANSACTION_ISOLATION:
+ /* these mean that we are waiting for the results of the queries */
+ return conn_poll_fetch(self);
+
+ case CONN_STATUS_ASYNC:
+ /* this means we are in the middle of a PQconnectPoll loop */
+ break;
+
+ case CONN_STATUS_READY:
+ /* we have completed the connection setup */
+ return PyInt_FromLong(PSYCO_POLL_OK);
+
+ default:
+ /* everything else is an error */
+ PyErr_SetString(OperationalError,
+ "not in asynchronous connection attempt");
+ return NULL;
+
+ }
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&self->lock);
+
+ poll_status = PQconnectPoll(self->pgconn);
+
+ if (poll_status == PGRES_POLLING_READING) {
+ pthread_mutex_unlock(&(self->lock));
+ Py_BLOCK_THREADS;
+ Dprintf("conn_poll: returing POLL_READ");
+ return PyInt_FromLong(PSYCO_POLL_READ);
+ }
+
+ if (poll_status == PGRES_POLLING_WRITING) {
+ pthread_mutex_unlock(&(self->lock));
+ Py_BLOCK_THREADS;
+ Dprintf("conn_poll: returing POLL_WRITE");
+ return PyInt_FromLong(PSYCO_POLL_WRITE);
+ }
+
+ if (poll_status == PGRES_POLLING_FAILED) {
+ pthread_mutex_unlock(&(self->lock));
+ Py_BLOCK_THREADS;
+ PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
+ return NULL;
+ }
+
+ /* the only other thing that PQconnectPoll can return is PGRES_POLLING_OK,
+ but make sure */
+ if (poll_status != PGRES_POLLING_OK) {
+ pthread_mutex_unlock(&(self->lock));
+ Py_BLOCK_THREADS;
+ PyErr_Format(OperationalError,
+ "unexpected result from PQconnectPoll: %d", poll_status);
+ return NULL;
+ }
+
+ Dprintf("conn_poll: got POLL_OK");
+
+ /* the connection is built, but we want to do a few other things before we
+ let the user use it */
+
+ self->equote = conn_get_standard_conforming_strings(self->pgconn);
+
+ Dprintf("conn_poll: got standard_conforming_strings");
+
+ /*
+ * Here is the tricky part, we need to figure the datestyle,
+ * client_encoding and isolevel, all using nonblocking calls. To do that
+ * we will keep telling the user to poll, while we are waiting for our
+ * asynchronous queries to complete.
+ */
+ pthread_mutex_unlock(&(self->lock));
+ Py_END_ALLOW_THREADS;
+ /* the next operation the client will do is send a query, so ask him to
+ wait for a writable condition */
+ self->status = CONN_STATUS_SEND_DATESTYLE;
+ Dprintf("conn_poll: connection is built, retrning %d",
+ PSYCO_POLL_WRITE);
+ return PyInt_FromLong(PSYCO_POLL_WRITE);
+}
+
+
+/* extension: fileno - return the file descriptor of the connection */
+
+#define psyco_conn_fileno_doc \
+"fileno() -> int -- Return file descriptor associated to database connection."
+
+static PyObject *
+psyco_conn_fileno(connectionObject *self)
+{
+ long int socket;
+
+ EXC_IF_CONN_CLOSED(self);
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&(self->lock));
+ socket = (long int)PQsocket(self->pgconn);
+ pthread_mutex_unlock(&(self->lock));
+ Py_END_ALLOW_THREADS;
+
+ return PyInt_FromLong(socket);
+}
+
+
+/* extension: issync - tell if the connection is synchronous */
+
+#define psyco_conn_issync_doc \
+"issync() -> bool -- Return True if the connection is synchronous."
+
+static PyObject *
+psyco_conn_issync(connectionObject *self)
+{
+ if (self->async) {
+ Py_INCREF(Py_False);
+ return Py_False;
+ }
+ else {
+ Py_INCREF(Py_True);
+ return Py_True;
+ }
+}
+
+
+/* extension: executing - check for asynchronous operations */
+
+#define psyco_conn_executing_doc \
+"executing() -> bool -- Return True if the connection is " \
+ "executing an asynchronous operation."
+
+static PyObject *
+psyco_conn_executing(connectionObject *self)
+{
+ if (self->async_cursor == NULL) {
+ Py_INCREF(Py_False);
+ return Py_False;
+ }
+ else {
+ Py_INCREF(Py_True);
+ return Py_True;
+ }
+}
+
/** the connection object **/
@@ -418,6 +599,14 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_conn_lobject_doc},
{"reset", (PyCFunction)psyco_conn_reset,
METH_NOARGS, psyco_conn_reset_doc},
+ {"poll", (PyCFunction)psyco_conn_poll,
+ METH_NOARGS, psyco_conn_lobject_doc},
+ {"fileno", (PyCFunction)psyco_conn_fileno,
+ METH_NOARGS, psyco_conn_fileno_doc},
+ {"issync", (PyCFunction)psyco_conn_issync,
+ METH_NOARGS, psyco_conn_issync_doc},
+ {"executing", (PyCFunction)psyco_conn_executing,
+ METH_NOARGS, psyco_conn_executing_doc},
#endif
{NULL}
};
@@ -476,21 +665,22 @@ static struct PyGetSetDef connectionObject_getsets[] = {
/* initialization and finalization methods */
static int
-connection_setup(connectionObject *self, const char *dsn)
+connection_setup(connectionObject *self, const char *dsn, long int async)
{
char *pos;
int res;
- Dprintf("connection_setup: init connection object at %p, refcnt = "
- FORMAT_CODE_PY_SSIZE_T,
- self, ((PyObject *)self)->ob_refcnt
+ Dprintf("connection_setup: init connection object at %p, "
+ "async %ld, refcnt = " FORMAT_CODE_PY_SSIZE_T,
+ self, async, ((PyObject *)self)->ob_refcnt
);
self->dsn = strdup(dsn);
self->notice_list = PyList_New(0);
self->notifies = PyList_New(0);
self->closed = 0;
- self->status = CONN_STATUS_READY;
+ self->async = async;
+ self->status = async ? CONN_STATUS_ASYNC : CONN_STATUS_READY;
self->critical = NULL;
self->async_cursor = NULL;
self->pgconn = NULL;
@@ -502,7 +692,7 @@ connection_setup(connectionObject *self, const char *dsn)
pthread_mutex_init(&(self->lock), NULL);
- if (conn_connect(self) != 0) {
+ if (conn_connect(self, async) != 0) {
Dprintf("connection_init: FAILED");
res = -1;
}
@@ -560,11 +750,12 @@ static int
connection_init(PyObject *obj, PyObject *args, PyObject *kwds)
{
const char *dsn;
+ long int async = 0;
- if (!PyArg_ParseTuple(args, "s", &dsn))
+ if (!PyArg_ParseTuple(args, "s|l", &dsn, &async))
return -1;
- return connection_setup((connectionObject *)obj, dsn);
+ return connection_setup((connectionObject *)obj, dsn, async);
}
static PyObject *
diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c
index 7726319..8ececea 100644
--- a/psycopg/psycopgmodule.c
+++ b/psycopg/psycopgmodule.c
@@ -90,6 +90,7 @@ HIDDEN int psycopg_debug_enabled = 0;
"- ``user`` -- user name used to authenticate\n" \
"- ``password`` -- password used to authenticate\n" \
"- ``sslmode`` -- SSL mode (see PostgreSQL documentation)\n\n" \
+"- ``async`` -- if the connection should provide asynchronous API\n\n" \
"If the ``connection_factory`` keyword argument is not provided this\n" \
"function always return an instance of the `connection` class.\n" \
"Else the given sub-class of `extensions.connection` will be used to\n" \
@@ -118,14 +119,16 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
const char *database=NULL, *user=NULL, *password=NULL;
const char *host=NULL, *sslmode=NULL;
char port[16];
+ int async = 0;
static char *kwlist[] = {"dsn", "database", "host", "port",
"user", "password", "sslmode",
- "connection_factory", NULL};
+ "connection_factory", "async", NULL};
- if (!PyArg_ParseTupleAndKeywords(args, keywds, "|sssOsssO", kwlist,
+ if (!PyArg_ParseTupleAndKeywords(args, keywds, "|sssOsssOi", kwlist,
&dsn_static, &database, &host, &pyport,
- &user, &password, &sslmode, &factory)) {
+ &user, &password, &sslmode,
+ &factory, &async)) {
return NULL;
}
@@ -190,11 +193,11 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
{
const char *dsn = (dsn_static != NULL ? dsn_static : dsn_dynamic);
- Dprintf("psyco_connect: dsn = '%s'", dsn);
+ Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async);
/* allocate connection, fill with errors and return it */
if (factory == NULL) factory = (PyObject *)&connectionType;
- conn = PyObject_CallFunction(factory, "s", dsn);
+ conn = PyObject_CallFunction(factory, "si", dsn, async);
}
goto cleanup;