summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rwxr-xr-xtests/test_connection.py2
-rwxr-xr-xtests/test_cursor.py42
-rwxr-xr-xtests/test_module.py4
-rwxr-xr-xtests/test_psycopg2_dbapi20.py26
-rwxr-xr-xtests/test_quote.py10
-rwxr-xr-x[-rw-r--r--]tests/test_replication.py32
-rw-r--r--tests/testutils.py14
7 files changed, 112 insertions, 18 deletions
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 8744488..833751b 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -465,7 +465,7 @@ class MakeDsnTestCase(ConnectingTestCase):
conn = self.connect()
d = conn.get_dsn_parameters()
self.assertEqual(d['dbname'], dbname) # the only param we can check reliably
- self.assertNotIn('password', d)
+ self.assert_('password' not in d, d)
class IsolationLevelsTestCase(ConnectingTestCase):
diff --git a/tests/test_cursor.py b/tests/test_cursor.py
index 4aae6b2..fc924c4 100755
--- a/tests/test_cursor.py
+++ b/tests/test_cursor.py
@@ -498,6 +498,48 @@ class CursorTests(ConnectingTestCase):
cur = self.conn.cursor()
self.assertRaises(TypeError, cur.callproc, 'lower', 42)
+ # It would be inappropriate to test callproc's named parameters in the
+ # DBAPI2.0 test section because they are a psycopg2 extension.
+ @skip_before_postgres(9, 0)
+ def test_callproc_dict(self):
+ # This parameter name tests for injection and quote escaping
+ paramname = '''
+ Robert'); drop table "students" --
+ '''.strip()
+ escaped_paramname = '"%s"' % paramname.replace('"', '""')
+ procname = 'pg_temp.randall'
+
+ cur = self.conn.cursor()
+
+ # Set up the temporary function
+ cur.execute('''
+ CREATE FUNCTION %s(%s INT)
+ RETURNS INT AS
+ 'SELECT $1 * $1'
+ LANGUAGE SQL
+ ''' % (procname, escaped_paramname));
+
+ # Make sure callproc works right
+ cur.callproc(procname, { paramname: 2 })
+ self.assertEquals(cur.fetchone()[0], 4)
+
+ # Make sure callproc fails right
+ failing_cases = [
+ ({ paramname: 2, 'foo': 'bar' }, psycopg2.ProgrammingError),
+ ({ paramname: '2' }, psycopg2.ProgrammingError),
+ ({ paramname: 'two' }, psycopg2.ProgrammingError),
+ ({ u'bj\xc3rn': 2 }, psycopg2.ProgrammingError),
+ ({ 3: 2 }, TypeError),
+ ({ self: 2 }, TypeError),
+ ]
+ for parameter_sequence, exception in failing_cases:
+ self.assertRaises(exception, cur.callproc, procname, parameter_sequence)
+ self.conn.rollback()
+
+ def test_callproc_badparam(self):
+ cur = self.conn.cursor()
+ self.assertRaises(TypeError, cur.callproc, 'lower', 42)
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
diff --git a/tests/test_module.py b/tests/test_module.py
index 1a9a19d..6a1606d 100755
--- a/tests/test_module.py
+++ b/tests/test_module.py
@@ -119,8 +119,8 @@ class ConnectTestCase(unittest.TestCase):
def test_int_port_param(self):
psycopg2.connect(database='sony', port=6543)
dsn = " %s " % self.args[0]
- self.assertIn(" dbname=sony ", dsn)
- self.assertIn(" port=6543 ", dsn)
+ self.assert_(" dbname=sony " in dsn, dsn)
+ self.assert_(" port=6543 " in dsn, dsn)
def test_empty_param(self):
psycopg2.connect(database='sony', password='')
diff --git a/tests/test_psycopg2_dbapi20.py b/tests/test_psycopg2_dbapi20.py
index 80473b7..c780d50 100755
--- a/tests/test_psycopg2_dbapi20.py
+++ b/tests/test_psycopg2_dbapi20.py
@@ -36,7 +36,31 @@ class Psycopg2Tests(dbapi20.DatabaseAPI20Test):
connect_args = ()
connect_kw_args = {'dsn': dsn}
- lower_func = 'lower' # For stored procedure test
+ lower_func = 'lower' # For stored procedure test
+
+ def test_callproc(self):
+ # Until DBAPI 2.0 compliance, callproc should return None or it's just
+ # misleading. Therefore, we will skip the return value test for
+ # callproc and only perform the fetch test.
+ #
+ # For what it's worth, the DBAPI2.0 test_callproc doesn't actually
+ # test for DBAPI2.0 compliance! It doesn't check for modified OUT and
+ # IN/OUT parameters in the return values!
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if self.lower_func and hasattr(cur,'callproc'):
+ cur.callproc(self.lower_func,('FOO',))
+ r = cur.fetchall()
+ self.assertEqual(len(r),1,'callproc produced no result set')
+ self.assertEqual(len(r[0]),1,
+ 'callproc produced invalid result set'
+ )
+ self.assertEqual(r[0][0],'foo',
+ 'callproc produced invalid results'
+ )
+ finally:
+ con.close()
def test_setoutputsize(self):
# psycopg2's setoutputsize() is a no-op
diff --git a/tests/test_quote.py b/tests/test_quote.py
index f74fd85..72c9c1e 100755
--- a/tests/test_quote.py
+++ b/tests/test_quote.py
@@ -65,11 +65,13 @@ class QuotingTestCase(ConnectingTestCase):
curs = self.conn.cursor()
data = 'abcd\x01\x00cdefg'
- with self.assertRaises(ValueError) as e:
+ try:
curs.execute("SELECT %s", (data,))
-
- self.assertEquals(str(e.exception),
- 'A string literal cannot contain NUL (0x00) characters.')
+ except ValueError as e:
+ self.assertEquals(str(e),
+ 'A string literal cannot contain NUL (0x00) characters.')
+ else:
+ self.fail("ValueError not raised")
def test_binary(self):
data = b"""some data with \000\013 binary
diff --git a/tests/test_replication.py b/tests/test_replication.py
index ca99038..79d1295 100644..100755
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -23,23 +23,19 @@
# License for more details.
import psycopg2
-import psycopg2.extensions
from psycopg2.extras import (
PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication)
import testconfig
-from testutils import unittest
-from testutils import skip_before_postgres
-from testutils import ConnectingTestCase
+from testutils import unittest, ConnectingTestCase
+from testutils import skip_before_postgres, skip_if_green
+
+skip_repl_if_green = skip_if_green("replication not supported in green mode")
class ReplicationTestCase(ConnectingTestCase):
def setUp(self):
- if not testconfig.repl_dsn:
- self.skipTest("replication tests disabled by default")
-
super(ReplicationTestCase, self).setUp()
-
self.slot = testconfig.repl_slot
self._slots = []
@@ -93,6 +89,20 @@ class ReplicationTest(ReplicationTestCase):
cur.execute("IDENTIFY_SYSTEM")
cur.fetchall()
+ @skip_before_postgres(9, 0)
+ def test_datestyle(self):
+ if testconfig.repl_dsn is None:
+ return self.skipTest("replication tests disabled by default")
+
+ conn = self.repl_connect(
+ dsn=testconfig.repl_dsn, options='-cdatestyle=german',
+ connection_factory=PhysicalReplicationConnection)
+ if conn is None:
+ return
+ cur = conn.cursor()
+ cur.execute("IDENTIFY_SYSTEM")
+ cur.fetchall()
+
@skip_before_postgres(9, 4)
def test_logical_replication_connection(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
@@ -114,6 +124,7 @@ class ReplicationTest(ReplicationTestCase):
psycopg2.ProgrammingError, self.create_replication_slot, cur)
@skip_before_postgres(9, 4) # slots require 9.4
+ @skip_repl_if_green
def test_start_on_missing_replication_slot(self):
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
if conn is None:
@@ -127,6 +138,7 @@ class ReplicationTest(ReplicationTestCase):
cur.start_replication(self.slot)
@skip_before_postgres(9, 4) # slots require 9.4
+ @skip_repl_if_green
def test_start_and_recover_from_error(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
if conn is None:
@@ -148,6 +160,7 @@ class ReplicationTest(ReplicationTestCase):
cur.start_replication(slot_name=self.slot)
@skip_before_postgres(9, 4) # slots require 9.4
+ @skip_repl_if_green
def test_stop_replication(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
if conn is None:
@@ -167,12 +180,13 @@ class ReplicationTest(ReplicationTestCase):
class AsyncReplicationTest(ReplicationTestCase):
@skip_before_postgres(9, 4) # slots require 9.4
+ @skip_repl_if_green
def test_async_replication(self):
conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1)
if conn is None:
return
- self.wait(conn)
+
cur = conn.cursor()
self.create_replication_slot(cur, output_plugin='test_decoding')
diff --git a/tests/testutils.py b/tests/testutils.py
index d0a34bc..9347735 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -122,13 +122,25 @@ class ConnectingTestCase(unittest.TestCase):
Should raise a skip test if not available, but guard for None on
old Python versions.
"""
+ if repl_dsn is None:
+ return self.skipTest("replication tests disabled by default")
+
if 'dsn' not in kwargs:
kwargs['dsn'] = repl_dsn
import psycopg2
try:
conn = self.connect(**kwargs)
+ if conn.async == 1:
+ self.wait(conn)
except psycopg2.OperationalError, e:
- return self.skipTest("replication db not configured: %s" % e)
+ # If pgcode is not set it is a genuine connection error
+ # Otherwise we tried to run some bad operation in the connection
+ # (e.g. bug #482) and we'd rather know that.
+ if e.pgcode is None:
+ return self.skipTest("replication db not configured: %s" % e)
+ else:
+ raise
+
return conn
def _get_conn(self):