diff options
Diffstat (limited to 'tests')
-rwxr-xr-x | tests/test_connection.py | 2 | ||||
-rwxr-xr-x | tests/test_cursor.py | 42 | ||||
-rwxr-xr-x | tests/test_module.py | 4 | ||||
-rwxr-xr-x | tests/test_psycopg2_dbapi20.py | 26 | ||||
-rwxr-xr-x | tests/test_quote.py | 10 | ||||
-rwxr-xr-x[-rw-r--r--] | tests/test_replication.py | 32 | ||||
-rw-r--r-- | tests/testutils.py | 14 |
7 files changed, 112 insertions, 18 deletions
diff --git a/tests/test_connection.py b/tests/test_connection.py index 8744488..833751b 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -465,7 +465,7 @@ class MakeDsnTestCase(ConnectingTestCase): conn = self.connect() d = conn.get_dsn_parameters() self.assertEqual(d['dbname'], dbname) # the only param we can check reliably - self.assertNotIn('password', d) + self.assert_('password' not in d, d) class IsolationLevelsTestCase(ConnectingTestCase): diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 4aae6b2..fc924c4 100755 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -498,6 +498,48 @@ class CursorTests(ConnectingTestCase): cur = self.conn.cursor() self.assertRaises(TypeError, cur.callproc, 'lower', 42) + # It would be inappropriate to test callproc's named parameters in the + # DBAPI2.0 test section because they are a psycopg2 extension. + @skip_before_postgres(9, 0) + def test_callproc_dict(self): + # This parameter name tests for injection and quote escaping + paramname = ''' + Robert'); drop table "students" -- + '''.strip() + escaped_paramname = '"%s"' % paramname.replace('"', '""') + procname = 'pg_temp.randall' + + cur = self.conn.cursor() + + # Set up the temporary function + cur.execute(''' + CREATE FUNCTION %s(%s INT) + RETURNS INT AS + 'SELECT $1 * $1' + LANGUAGE SQL + ''' % (procname, escaped_paramname)); + + # Make sure callproc works right + cur.callproc(procname, { paramname: 2 }) + self.assertEquals(cur.fetchone()[0], 4) + + # Make sure callproc fails right + failing_cases = [ + ({ paramname: 2, 'foo': 'bar' }, psycopg2.ProgrammingError), + ({ paramname: '2' }, psycopg2.ProgrammingError), + ({ paramname: 'two' }, psycopg2.ProgrammingError), + ({ u'bj\xc3rn': 2 }, psycopg2.ProgrammingError), + ({ 3: 2 }, TypeError), + ({ self: 2 }, TypeError), + ] + for parameter_sequence, exception in failing_cases: + self.assertRaises(exception, cur.callproc, procname, parameter_sequence) + self.conn.rollback() + + def test_callproc_badparam(self): + cur = self.conn.cursor() + self.assertRaises(TypeError, cur.callproc, 'lower', 42) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_module.py b/tests/test_module.py index 1a9a19d..6a1606d 100755 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -119,8 +119,8 @@ class ConnectTestCase(unittest.TestCase): def test_int_port_param(self): psycopg2.connect(database='sony', port=6543) dsn = " %s " % self.args[0] - self.assertIn(" dbname=sony ", dsn) - self.assertIn(" port=6543 ", dsn) + self.assert_(" dbname=sony " in dsn, dsn) + self.assert_(" port=6543 " in dsn, dsn) def test_empty_param(self): psycopg2.connect(database='sony', password='') diff --git a/tests/test_psycopg2_dbapi20.py b/tests/test_psycopg2_dbapi20.py index 80473b7..c780d50 100755 --- a/tests/test_psycopg2_dbapi20.py +++ b/tests/test_psycopg2_dbapi20.py @@ -36,7 +36,31 @@ class Psycopg2Tests(dbapi20.DatabaseAPI20Test): connect_args = () connect_kw_args = {'dsn': dsn} - lower_func = 'lower' # For stored procedure test + lower_func = 'lower' # For stored procedure test + + def test_callproc(self): + # Until DBAPI 2.0 compliance, callproc should return None or it's just + # misleading. Therefore, we will skip the return value test for + # callproc and only perform the fetch test. + # + # For what it's worth, the DBAPI2.0 test_callproc doesn't actually + # test for DBAPI2.0 compliance! It doesn't check for modified OUT and + # IN/OUT parameters in the return values! + con = self._connect() + try: + cur = con.cursor() + if self.lower_func and hasattr(cur,'callproc'): + cur.callproc(self.lower_func,('FOO',)) + r = cur.fetchall() + self.assertEqual(len(r),1,'callproc produced no result set') + self.assertEqual(len(r[0]),1, + 'callproc produced invalid result set' + ) + self.assertEqual(r[0][0],'foo', + 'callproc produced invalid results' + ) + finally: + con.close() def test_setoutputsize(self): # psycopg2's setoutputsize() is a no-op diff --git a/tests/test_quote.py b/tests/test_quote.py index f74fd85..72c9c1e 100755 --- a/tests/test_quote.py +++ b/tests/test_quote.py @@ -65,11 +65,13 @@ class QuotingTestCase(ConnectingTestCase): curs = self.conn.cursor() data = 'abcd\x01\x00cdefg' - with self.assertRaises(ValueError) as e: + try: curs.execute("SELECT %s", (data,)) - - self.assertEquals(str(e.exception), - 'A string literal cannot contain NUL (0x00) characters.') + except ValueError as e: + self.assertEquals(str(e), + 'A string literal cannot contain NUL (0x00) characters.') + else: + self.fail("ValueError not raised") def test_binary(self): data = b"""some data with \000\013 binary diff --git a/tests/test_replication.py b/tests/test_replication.py index ca99038..79d1295 100644..100755 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -23,23 +23,19 @@ # License for more details. import psycopg2 -import psycopg2.extensions from psycopg2.extras import ( PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication) import testconfig -from testutils import unittest -from testutils import skip_before_postgres -from testutils import ConnectingTestCase +from testutils import unittest, ConnectingTestCase +from testutils import skip_before_postgres, skip_if_green + +skip_repl_if_green = skip_if_green("replication not supported in green mode") class ReplicationTestCase(ConnectingTestCase): def setUp(self): - if not testconfig.repl_dsn: - self.skipTest("replication tests disabled by default") - super(ReplicationTestCase, self).setUp() - self.slot = testconfig.repl_slot self._slots = [] @@ -93,6 +89,20 @@ class ReplicationTest(ReplicationTestCase): cur.execute("IDENTIFY_SYSTEM") cur.fetchall() + @skip_before_postgres(9, 0) + def test_datestyle(self): + if testconfig.repl_dsn is None: + return self.skipTest("replication tests disabled by default") + + conn = self.repl_connect( + dsn=testconfig.repl_dsn, options='-cdatestyle=german', + connection_factory=PhysicalReplicationConnection) + if conn is None: + return + cur = conn.cursor() + cur.execute("IDENTIFY_SYSTEM") + cur.fetchall() + @skip_before_postgres(9, 4) def test_logical_replication_connection(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) @@ -114,6 +124,7 @@ class ReplicationTest(ReplicationTestCase): psycopg2.ProgrammingError, self.create_replication_slot, cur) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_start_on_missing_replication_slot(self): conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) if conn is None: @@ -127,6 +138,7 @@ class ReplicationTest(ReplicationTestCase): cur.start_replication(self.slot) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_start_and_recover_from_error(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) if conn is None: @@ -148,6 +160,7 @@ class ReplicationTest(ReplicationTestCase): cur.start_replication(slot_name=self.slot) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_stop_replication(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) if conn is None: @@ -167,12 +180,13 @@ class ReplicationTest(ReplicationTestCase): class AsyncReplicationTest(ReplicationTestCase): @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_async_replication(self): conn = self.repl_connect( connection_factory=LogicalReplicationConnection, async=1) if conn is None: return - self.wait(conn) + cur = conn.cursor() self.create_replication_slot(cur, output_plugin='test_decoding') diff --git a/tests/testutils.py b/tests/testutils.py index d0a34bc..9347735 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -122,13 +122,25 @@ class ConnectingTestCase(unittest.TestCase): Should raise a skip test if not available, but guard for None on old Python versions. """ + if repl_dsn is None: + return self.skipTest("replication tests disabled by default") + if 'dsn' not in kwargs: kwargs['dsn'] = repl_dsn import psycopg2 try: conn = self.connect(**kwargs) + if conn.async == 1: + self.wait(conn) except psycopg2.OperationalError, e: - return self.skipTest("replication db not configured: %s" % e) + # If pgcode is not set it is a genuine connection error + # Otherwise we tried to run some bad operation in the connection + # (e.g. bug #482) and we'd rather know that. + if e.pgcode is None: + return self.skipTest("replication db not configured: %s" % e) + else: + raise + return conn def _get_conn(self): |