summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog35
-rw-r--r--lib/extensions.py2
-rw-r--r--psycopg/connection.h12
-rw-r--r--psycopg/connection_type.c55
-rw-r--r--psycopg/pqpath.c16
-rw-r--r--psycopg/psycopg.h11
-rw-r--r--psycopg/psycopgmodule.c47
-rw-r--r--psycopg2da/adapter.py9
-rwxr-xr-xtests/test_transaction.py153
9 files changed, 263 insertions, 77 deletions
diff --git a/ChangeLog b/ChangeLog
index bdbed8a..c6e63f9 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,40 @@
2008-01-16 James Henstridge <james@jamesh.id.au>
+ * tests/test_transaction.py (DeadlockSerializationTestCase): port
+ over some tests for serialisation and deadlock errors,
+ demonstrating that TransactionRollbackError is generated.
+ (QueryCancelationTests): add a test to show that
+ QueryCanceledError is raised on statement timeouts.
+
+ * psycopg2da/adapter.py (_handle_psycopg_exception): rather than
+ checking exception messages, check for TransactionRollbackError.
+
+ * psycopg/pqpath.c (exception_from_sqlstate): return
+ TransactionRollbackError for 40xxx errors, and QueryCanceledError
+ for 57014 errors.
+ (pq_raise): If we are using an old server, use
+ TransactionRollbackError if the error message contains "could not
+ serialize" or "deadlock detected".
+
+ * psycopg/psycopgmodule.c (_psyco_connect_fill_exc): remove
+ function, since we no longer need to store pointers to the
+ exceptions in the connection. This also fixes a reference leak.
+ (psyco_connect): remove _psyco_connect_fill_exc() function call.
+
+ * psycopg/connection.h (connectionObject): remove exception
+ members from struct.
+
+ * psycopg/connection_type.c (connectionObject_getsets): modify the
+ exception attributes on the connection object from members to
+ getsets. This reduces the size of the struct.
+
+ * lib/extensions.py: import the two new extensions.
+
+ * psycopg/psycopgmodule.c (exctable): add new QueryCanceledError
+ and TransactionRollbackError exceptions.
+
+2008-01-16 James Henstridge <james@jamesh.id.au>
+
* tests/__init__.py (test_suite): add date tests to test suite.
* tests/test_dates.py: add tests for date/time typecasting and
diff --git a/lib/extensions.py b/lib/extensions.py
index fe61303..a2bc869 100644
--- a/lib/extensions.py
+++ b/lib/extensions.py
@@ -42,6 +42,8 @@ from _psycopg import adapt, adapters, encodings, connection, cursor
from _psycopg import string_types, binary_types, new_type, register_type
from _psycopg import ISQLQuote
+from _psycopg import QueryCanceledError, TransactionRollbackError
+
"""Isolation level values."""
ISOLATION_LEVEL_AUTOCOMMIT = 0
ISOLATION_LEVEL_READ_COMMITTED = 1
diff --git a/psycopg/connection.h b/psycopg/connection.h
index bc62381..068c794 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -67,18 +67,6 @@ typedef struct {
/* notifies */
PyObject *notifies;
- /* errors (DBAPI-2.0 extension) */
- PyObject *exc_Error;
- PyObject *exc_Warning;
- PyObject *exc_InterfaceError;
- PyObject *exc_DatabaseError;
- PyObject *exc_InternalError;
- PyObject *exc_OperationalError;
- PyObject *exc_ProgrammingError;
- PyObject *exc_IntegrityError;
- PyObject *exc_DataError;
- PyObject *exc_NotSupportedError;
-
/* per-connection typecasters */
PyObject *string_types; /* a set of typecasters for string types */
PyObject *binary_types; /* a set of typecasters for binary types */
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 2d18661..92a0686 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -231,6 +231,14 @@ psyco_conn_get_transaction_status(connectionObject *self, PyObject *args)
#endif
+static PyObject *
+psyco_conn_get_exception(PyObject *self, void *closure)
+{
+ PyObject *exception = *(PyObject **)closure;
+
+ Py_INCREF(exception);
+ return exception;
+}
/** the connection object **/
@@ -260,32 +268,6 @@ static struct PyMethodDef connectionObject_methods[] = {
/* object member list */
static struct PyMemberDef connectionObject_members[] = {
- /* DBAPI-2.0 extensions (exception objects) */
- {"Error", T_OBJECT,
- offsetof(connectionObject, exc_Error), RO, Error_doc},
- {"Warning",
- T_OBJECT, offsetof(connectionObject, exc_Warning), RO, Warning_doc},
- {"InterfaceError", T_OBJECT,
- offsetof(connectionObject, exc_InterfaceError), RO,
- InterfaceError_doc},
- {"DatabaseError", T_OBJECT,
- offsetof(connectionObject, exc_DatabaseError), RO, DatabaseError_doc},
- {"InternalError", T_OBJECT,
- offsetof(connectionObject, exc_InternalError), RO, InternalError_doc},
- {"OperationalError", T_OBJECT,
- offsetof(connectionObject, exc_OperationalError), RO,
- OperationalError_doc},
- {"ProgrammingError", T_OBJECT,
- offsetof(connectionObject, exc_ProgrammingError), RO,
- ProgrammingError_doc},
- {"IntegrityError", T_OBJECT,
- offsetof(connectionObject, exc_IntegrityError), RO,
- IntegrityError_doc},
- {"DataError", T_OBJECT,
- offsetof(connectionObject, exc_DataError), RO, DataError_doc},
- {"NotSupportedError", T_OBJECT,
- offsetof(connectionObject, exc_NotSupportedError), RO,
- NotSupportedError_doc},
#ifdef PSYCOPG_EXTENSIONS
{"closed", T_LONG, offsetof(connectionObject, closed), RO,
"True if the connection is closed."},
@@ -309,6 +291,25 @@ static struct PyMemberDef connectionObject_members[] = {
{NULL}
};
+#define EXCEPTION_GETTER(exc) \
+ { #exc, psyco_conn_get_exception, NULL, exc ## _doc, &exc }
+
+static struct PyGetSetDef connectionObject_getsets[] = {
+ /* DBAPI-2.0 extensions (exception objects) */
+ EXCEPTION_GETTER(Error),
+ EXCEPTION_GETTER(Warning),
+ EXCEPTION_GETTER(InterfaceError),
+ EXCEPTION_GETTER(DatabaseError),
+ EXCEPTION_GETTER(InternalError),
+ EXCEPTION_GETTER(OperationalError),
+ EXCEPTION_GETTER(ProgrammingError),
+ EXCEPTION_GETTER(IntegrityError),
+ EXCEPTION_GETTER(DataError),
+ EXCEPTION_GETTER(NotSupportedError),
+ {NULL}
+};
+#undef EXCEPTION_GETTER
+
/* initialization and finalization methods */
static int
@@ -465,7 +466,7 @@ PyTypeObject connectionType = {
connectionObject_methods, /*tp_methods*/
connectionObject_members, /*tp_members*/
- 0, /*tp_getset*/
+ connectionObject_getsets, /*tp_getset*/
0, /*tp_base*/
0, /*tp_dict*/
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index a7f7a7c..02f7be5 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -107,9 +107,13 @@ exception_from_sqlstate(const char *sqlstate)
case '4':
switch (sqlstate[1]) {
case '0': /* Class 40 - Transaction Rollback */
+#ifdef PSYCOPG_EXTENSIONS
+ return TransactionRollbackError;
+#else
return OperationalError;
+#endif
case '2': /* Class 42 - Syntax Error or Access Rule Violation */
- case '4': /* Class 44 — WITH CHECK OPTION Violation */
+ case '4': /* Class 44 - WITH CHECK OPTION Violation */
return ProgrammingError;
}
break;
@@ -119,7 +123,12 @@ exception_from_sqlstate(const char *sqlstate)
Class 55 - Object Not In Prerequisite State
Class 57 - Operator Intervention
Class 58 - System Error (errors external to PostgreSQL itself) */
- return OperationalError;
+#ifdef PSYCOPG_EXTENSIONS
+ if (!strcmp(sqlstate, "57014"))
+ return QueryCanceledError;
+ else
+#endif
+ return OperationalError;
case 'F': /* Class F0 - Configuration File Error */
return InternalError;
case 'P': /* Class P0 - PL/pgSQL Error */
@@ -188,6 +197,9 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres,
|| !strncmp(err, "ERROR: ExecAppend: Fail to add null", 36)
|| strstr(err, "referential integrity violation"))
exc = IntegrityError;
+ else if (strstr(err, "could not serialize") ||
+ strstr(err, "deadlock detected"))
+ exc = TransactionRollbackError;
else
exc = ProgrammingError;
}
diff --git a/psycopg/psycopg.h b/psycopg/psycopg.h
index f97fee1..e5eb70b 100644
--- a/psycopg/psycopg.h
+++ b/psycopg/psycopg.h
@@ -85,6 +85,9 @@ extern psyco_errors_set_RETURN psyco_errors_set psyco_errors_set_PROTO;
extern PyObject *Error, *Warning, *InterfaceError, *DatabaseError,
*InternalError, *OperationalError, *ProgrammingError,
*IntegrityError, *DataError, *NotSupportedError;
+#ifdef PSYCOPG_EXTENSIONS
+extern PyObject *QueryCanceledError, *TransactionRollbackError;
+#endif
/* python versions and compatibility stuff */
#ifndef PyMODINIT_FUNC
@@ -167,6 +170,14 @@ extern void psyco_set_error(PyObject *exc, PyObject *curs, const char *msg,
#define NotSupportedError_doc \
"A not supported datbase API was called."
+#ifdef PSYCOPG_EXTENSIONS
+#define QueryCanceledError_doc \
+"Error related to SQL query cancelation."
+
+#define TransactionRollbackError_doc \
+"Error causing transaction rollback (deadlocks, serialisation failures, etc)."
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c
index 26cc500..a212ba4 100644
--- a/psycopg/psycopgmodule.c
+++ b/psycopg/psycopgmodule.c
@@ -100,32 +100,6 @@ _psyco_connect_fill_dsn(char *dsn, char *kw, char *v, size_t i)
return i;
}
-static void
-_psyco_connect_fill_exc(connectionObject *conn)
-{
- /* fill the connection object with the exceptions */
- conn->exc_Error = Error;
- Py_INCREF(Error);
- conn->exc_Warning = Warning;
- Py_INCREF(Warning);
- conn->exc_InterfaceError = InterfaceError;
- Py_INCREF(InterfaceError);
- conn->exc_DatabaseError = DatabaseError;
- Py_INCREF(DatabaseError);
- conn->exc_InternalError = InternalError;
- Py_INCREF(InternalError);
- conn->exc_ProgrammingError = ProgrammingError;
- Py_INCREF(ProgrammingError);
- conn->exc_IntegrityError = IntegrityError;
- Py_INCREF(IntegrityError);
- conn->exc_DataError = DataError;
- Py_INCREF(DataError);
- conn->exc_NotSupportedError = NotSupportedError;
- Py_INCREF(NotSupportedError);
- conn->exc_OperationalError = OperationalError;
- Py_INCREF(OperationalError);
-}
-
static PyObject *
psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
{
@@ -215,7 +189,6 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
/* allocate connection, fill with errors and return it */
if (factory == NULL) factory = (PyObject *)&connectionType;
conn = PyObject_CallFunction(factory, "s", dsn);
- if (conn) _psyco_connect_fill_exc((connectionObject*)conn);
}
goto cleanup;
@@ -433,6 +406,9 @@ static void psyco_encodings_fill(PyObject *dict)
PyObject *Error, *Warning, *InterfaceError, *DatabaseError,
*InternalError, *OperationalError, *ProgrammingError,
*IntegrityError, *DataError, *NotSupportedError;
+#ifdef PSYCOPG_EXTENSIONS
+PyObject *QueryCanceledError, *TransactionRollbackError;
+#endif
/* mapping between exception names and their PyObject */
static struct {
@@ -455,6 +431,13 @@ static struct {
{ "psycopg2.DataError", &DataError, &DatabaseError, DataError_doc },
{ "psycopg2.NotSupportedError", &NotSupportedError, &DatabaseError,
NotSupportedError_doc },
+#ifdef PSYCOPG_EXTENSIONS
+ { "psycopg2.extensions.QueryCanceledError", &QueryCanceledError,
+ &OperationalError, OperationalError_doc },
+ { "psycopg2.extensions.TransactionRollbackError",
+ &TransactionRollbackError, &OperationalError,
+ TransactionRollbackError_doc },
+#endif
{NULL} /* Sentinel */
};
@@ -507,6 +490,11 @@ psyco_errors_fill(PyObject *dict)
PyDict_SetItemString(dict, "IntegrityError", IntegrityError);
PyDict_SetItemString(dict, "DataError", DataError);
PyDict_SetItemString(dict, "NotSupportedError", NotSupportedError);
+#ifdef PSYCOPG_EXTENSIONS
+ PyDict_SetItemString(dict, "QueryCanceledError", QueryCanceledError);
+ PyDict_SetItemString(dict, "TransactionRollbackError",
+ TransactionRollbackError);
+#endif
}
void
@@ -522,6 +510,11 @@ psyco_errors_set(PyObject *type)
PyObject_SetAttrString(type, "IntegrityError", IntegrityError);
PyObject_SetAttrString(type, "DataError", DataError);
PyObject_SetAttrString(type, "NotSupportedError", NotSupportedError);
+#ifdef PSYCOPG_EXTENSIONS
+ PyObject_SetAttrString(type, "QueryCanceledError", QueryCanceledError);
+ PyObject_SetAttrString(type, "TransactionRollbackError",
+ TransactionRollbackError);
+#endif
}
/* psyco_error_new
diff --git a/psycopg2da/adapter.py b/psycopg2da/adapter.py
index dc79600..6c91206 100644
--- a/psycopg2da/adapter.py
+++ b/psycopg2da/adapter.py
@@ -372,14 +372,7 @@ def _handle_psycopg_exception(error):
If we have a serialization exception or a deadlock, we should retry the
transaction by raising a Retry exception. Otherwise, we reraise.
"""
- if not error.args:
- raise
- msg = error.args[0]
- # These messages are from PostgreSQL 8.0. They may change between
- # PostgreSQL releases - if so, the different messages should be added
- # rather than the existing ones changed so this logic works with
- # different versions.
- if 'could not serialize' in msg or 'deadlock detected' in msg:
+ if isinstance(error, psycopg2.extensions.TransactionRollbackError):
raise Retry(sys.exc_info())
raise
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index 81fe54b..bd96ce4 100755
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -1,6 +1,9 @@
#!/usr/bin/env python
-import psycopg2
+import threading
import unittest
+
+import psycopg2
+import psycopg2
import tests
from psycopg2.extensions import (
@@ -69,6 +72,154 @@ class TransactionTestCase(unittest.TestCase):
self.assertEqual(curs.fetchone()[0], 1)
+class DeadlockSerializationTestCase(unittest.TestCase):
+ """Test deadlock and serialization failure errors."""
+
+ def connect(self):
+ conn = psycopg2.connect("dbname=%s" % tests.dbname)
+ conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
+ return conn
+
+ def setUp(self):
+ self.conn = self.connect()
+ curs = self.conn.cursor()
+ # Drop table if it already exists
+ try:
+ curs.execute("DROP TABLE table1")
+ self.conn.commit()
+ except psycopg2.DatabaseError:
+ self.conn.rollback()
+ try:
+ curs.execute("DROP TABLE table2")
+ self.conn.commit()
+ except psycopg2.DatabaseError:
+ self.conn.rollback()
+ # Create sample data
+ curs.execute("""
+ CREATE TABLE table1 (
+ id int PRIMARY KEY,
+ name text)
+ """)
+ curs.execute("INSERT INTO table1 VALUES (1, 'hello')")
+ curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)")
+ self.conn.commit()
+
+ def tearDown(self):
+ curs = self.conn.cursor()
+ curs.execute("DROP TABLE table1")
+ curs.execute("DROP TABLE table2")
+ self.conn.commit()
+ self.conn.close()
+
+ def test_deadlock(self):
+ self.thread1_error = self.thread2_error = None
+ step1 = threading.Event()
+ step2 = threading.Event()
+
+ def task1():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
+ step1.set()
+ step2.wait()
+ curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
+ except psycopg2.DatabaseError, exc:
+ self.thread1_error = exc
+ step1.set()
+ conn.close()
+ def task2():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ step1.wait()
+ curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
+ step2.set()
+ curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
+ except psycopg2.DatabaseError, exc:
+ self.thread2_error = exc
+ step2.set()
+ conn.close()
+
+ # Run the threads in parallel. The "step1" and "step2" events
+ # ensure that the two transactions overlap.
+ thread1 = threading.Thread(target=task1)
+ thread2 = threading.Thread(target=task2)
+ thread1.start()
+ thread2.start()
+ thread1.join()
+ thread2.join()
+
+ # Exactly one of the threads should have failed with
+ # TransactionRollbackError:
+ self.assertFalse(self.thread1_error and self.thread2_error)
+ error = self.thread1_error or self.thread2_error
+ self.assertTrue(isinstance(
+ error, psycopg2.extensions.TransactionRollbackError))
+
+ def test_serialisation_failure(self):
+ self.thread1_error = self.thread2_error = None
+ step1 = threading.Event()
+ step2 = threading.Event()
+
+ def task1():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ curs.execute("SELECT name FROM table1 WHERE id = 1")
+ curs.fetchall()
+ step1.set()
+ step2.wait()
+ curs.execute("UPDATE table1 SET name='task1' WHERE id = 1")
+ conn.commit()
+ except psycopg2.DatabaseError, exc:
+ self.thread1_error = exc
+ step1.set()
+ conn.close()
+ def task2():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ step1.wait()
+ curs.execute("UPDATE table1 SET name='task2' WHERE id = 1")
+ conn.commit()
+ except psycopg2.DatabaseError, exc:
+ self.thread2_error = exc
+ step2.set()
+ conn.close()
+
+ # Run the threads in parallel. The "step1" and "step2" events
+ # ensure that the two transactions overlap.
+ thread1 = threading.Thread(target=task1)
+ thread2 = threading.Thread(target=task2)
+ thread1.start()
+ thread2.start()
+ thread1.join()
+ thread2.join()
+
+ # Exactly one of the threads should have failed with
+ # TransactionRollbackError:
+ self.assertFalse(self.thread1_error and self.thread2_error)
+ error = self.thread1_error or self.thread2_error
+ self.assertTrue(isinstance(
+ error, psycopg2.extensions.TransactionRollbackError))
+
+
+class QueryCancelationTests(unittest.TestCase):
+ """Tests for query cancelation."""
+
+ def setUp(self):
+ self.conn = psycopg2.connect("dbname=%s" % tests.dbname)
+ self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
+
+ def test_statement_timeout(self):
+ curs = self.conn.cursor()
+ # Set a low statement timeout, then sleep for a longer period.
+ curs.execute('SET statement_timeout TO 10')
+ self.assertRaises(psycopg2.extensions.QueryCanceledError,
+ curs.execute, 'SELECT pg_sleep(50)')
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)