summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan UrbaƄski <wulczer@wulczer.org>2010-07-24 23:01:27 +0200
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-11-28 11:50:02 +0000
commit751bfa1ea605298c2204de14111bd182d137136e (patch)
tree1370c6efc7bf4ff6c74eca73c97b0a4dee14c69b
parent9f781415326db3e09696b0601250b999bd4f6a0e (diff)
downloadpsycopg2-751bfa1ea605298c2204de14111bd182d137136e.tar.gz
Support query cancellation.
Add a cancel() method do the connection object that will interrupt the current query using the libpq PQcancel() function.
-rw-r--r--doc/src/connection.rst20
-rw-r--r--psycopg/connection.h1
-rw-r--r--psycopg/connection_int.c19
-rw-r--r--psycopg/connection_type.c34
-rwxr-xr-xtests/__init__.py2
-rw-r--r--tests/test_cancel.py89
6 files changed, 165 insertions, 0 deletions
diff --git a/doc/src/connection.rst b/doc/src/connection.rst
index 22241fe..153fea5 100644
--- a/doc/src/connection.rst
+++ b/doc/src/connection.rst
@@ -264,6 +264,26 @@ The ``connection`` class
(0) or closed (1).
+ .. method:: cancel
+
+ Cancel the current database operation.
+
+ The method interrupts the processing of the current operation. If no
+ query is being executed, it does nothing. You can call this function
+ from a different thread than the one currently executing a database
+ operation, for instance if you want to cancel a long running query if a
+ button is pushed in the UI. Interrupting query execution will cause the
+ cancelled method to raise a
+ `~psycopg2.extensions.QueryCanceledError`. Note that the termination
+ of the query is not guaranteed to succeed: see the documentation for
+ |PQcancel|_.
+
+ .. |PQcancel| replace:: `!PQcancel()`
+ .. _PQcancel: http://www.postgresql.org/docs/8.4/static/libpq-cancel.html#AEN34765
+
+ .. versionadded:: 2.2.3
+
+
.. method:: reset
Reset the connection to the default.
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 9653687..76a6a09 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -101,6 +101,7 @@ typedef struct {
int server_version; /* server version */
PGconn *pgconn; /* the postgresql connection */
+ PGcancel *cancel; /* the cancellation structure */
PyObject *async_cursor; /* a cursor executing an asynchronous query */
int async_status; /* asynchronous execution status */
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index f98e4fd..73292b8 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -282,6 +282,12 @@ conn_get_server_version(PGconn *pgconn)
return (int)PQserverVersion(pgconn);
}
+PGcancel *
+conn_get_cancel(PGconn *pgconn)
+{
+ return PQgetCancel(pgconn);
+}
+
/* Return 1 if the server datestyle allows us to work without problems,
0 if it needs to be set to something better, e.g. ISO. */
@@ -320,6 +326,12 @@ conn_setup(connectionObject *self, PGconn *pgconn)
return -1;
}
+ self->cancel = conn_get_cancel(self->pgconn);
+ if (self->cancel == NULL) {
+ PyErr_SetString(OperationalError, "can't get cancellation key");
+ return -1;
+ }
+
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
@@ -645,6 +657,11 @@ _conn_poll_setup_async(connectionObject *self)
if (self->encoding == NULL) {
break;
}
+ self->cancel = conn_get_cancel(self->pgconn);
+ if (self->cancel == NULL) {
+ PyErr_SetString(OperationalError, "can't get cancellation key");
+ break;
+ }
/* asynchronous connections always use isolation level 0, the user is
* expected to manage the transactions himself, by sending
@@ -782,8 +799,10 @@ conn_close(connectionObject *self)
if (self->pgconn) {
PQfinish(self->pgconn);
+ PQfreeCancel(self->cancel);
Dprintf("conn_close: PQfinish called");
self->pgconn = NULL;
+ self->cancel = NULL;
}
pthread_mutex_unlock(&self->lock);
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 19945ce..e27b312 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -697,6 +697,37 @@ psyco_conn_isexecuting(connectionObject *self)
return Py_False;
}
+
+/* extension: cancel - cancel the current operation */
+
+#define psyco_conn_cancel_doc \
+"cancel() -- cancel the current operation"
+
+static PyObject *
+psyco_conn_cancel(connectionObject *self)
+{
+ char errbuf[256];
+
+ EXC_IF_CONN_CLOSED(self);
+
+ /* do not allow cancellation while the connection is being built */
+ Dprintf("psyco_conn_cancel: cancelling with key %p", self->cancel);
+ if (self->status != CONN_STATUS_READY &&
+ self->status != CONN_STATUS_BEGIN) {
+ PyErr_SetString(OperationalError,
+ "asynchronous connection attempt underway");
+ return NULL;
+ }
+
+ if (PQcancel(self->cancel, errbuf, sizeof(errbuf)) == 0) {
+ Dprintf("psyco_conn_cancel: cancelling failed: %s", errbuf);
+ PyErr_SetString(OperationalError, errbuf);
+ return NULL;
+ }
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
#endif /* PSYCOPG_EXTENSIONS */
@@ -747,6 +778,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_NOARGS, psyco_conn_fileno_doc},
{"isexecuting", (PyCFunction)psyco_conn_isexecuting,
METH_NOARGS, psyco_conn_isexecuting_doc},
+ {"cancel", (PyCFunction)psyco_conn_cancel,
+ METH_NOARGS, psyco_conn_cancel_doc},
#endif
{NULL}
};
@@ -827,6 +860,7 @@ connection_setup(connectionObject *self, const char *dsn, long int async)
self->async_cursor = NULL;
self->async_status = ASYNC_DONE;
self->pgconn = NULL;
+ self->cancel = NULL;
self->mark = 0;
self->string_types = PyDict_New();
self->binary_types = PyDict_New();
diff --git a/tests/__init__.py b/tests/__init__.py
index 2319341..4ee01ff 100755
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -46,6 +46,7 @@ import test_copy
import test_notify
import test_async
import test_green
+import test_cancel
def test_suite():
suite = unittest.TestSuite()
@@ -64,6 +65,7 @@ def test_suite():
suite.addTest(test_notify.test_suite())
suite.addTest(test_async.test_suite())
suite.addTest(test_green.test_suite())
+ suite.addTest(test_cancel.test_suite())
return suite
if __name__ == '__main__':
diff --git a/tests/test_cancel.py b/tests/test_cancel.py
new file mode 100644
index 0000000..746b211
--- /dev/null
+++ b/tests/test_cancel.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+
+import time
+import threading
+import unittest
+
+import tests
+import psycopg2
+import psycopg2.extensions
+from psycopg2 import extras
+
+
+class CancelTests(unittest.TestCase):
+
+ def setUp(self):
+ self.conn = psycopg2.connect(tests.dsn)
+ cur = self.conn.cursor()
+ cur.execute('''
+ CREATE TEMPORARY TABLE table1 (
+ id int PRIMARY KEY
+ )''')
+ self.conn.commit()
+
+ def tearDown(self):
+ self.conn.close()
+
+ def test_empty_cancel(self):
+ self.conn.cancel()
+
+ def test_cancel(self):
+ errors = []
+
+ def neverending(conn):
+ cur = conn.cursor()
+ try:
+ self.assertRaises(psycopg2.extensions.QueryCanceledError,
+ cur.execute, "select pg_sleep(10000)")
+ # make sure the connection still works
+ conn.rollback()
+ cur.execute("select 1")
+ self.assertEqual(cur.fetchall(), [(1, )])
+ except Exception, e:
+ errors.append(e)
+ raise
+
+ def canceller(conn):
+ cur = conn.cursor()
+ try:
+ conn.cancel()
+ except Exception, e:
+ errors.append(e)
+ raise
+
+ thread1 = threading.Thread(target=neverending, args=(self.conn, ))
+ # wait a bit to make sure that the other thread is already in
+ # pg_sleep -- ugly and racy, but the chances are ridiculously low
+ thread2 = threading.Timer(0.3, canceller, args=(self.conn, ))
+ thread1.start()
+ thread2.start()
+ thread1.join()
+ thread2.join()
+
+ self.assertEqual(errors, [])
+
+ def test_async_cancel(self):
+ async_conn = psycopg2.connect(tests.dsn, async=True)
+ self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
+ extras.wait_select(async_conn)
+ cur = async_conn.cursor()
+ cur.execute("select pg_sleep(10000)")
+ self.assertTrue(async_conn.isexecuting())
+ async_conn.cancel()
+ self.assertRaises(psycopg2.extensions.QueryCanceledError,
+ extras.wait_select, async_conn)
+ cur.execute("select 1")
+ extras.wait_select(async_conn)
+ self.assertEqual(cur.fetchall(), [(1, )])
+
+ def test_async_connection_cancel(self):
+ async_conn = psycopg2.connect(tests.dsn, async=True)
+ async_conn.close()
+ self.assertTrue(async_conn.closed)
+
+
+def test_suite():
+ return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()