summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rwxr-xr-xtests/__init__.py12
-rwxr-xr-xtests/test_async.py21
-rwxr-xr-xtests/test_async_keyword.py217
-rwxr-xr-xtests/test_cancel.py7
-rwxr-xr-xtests/test_connection.py21
-rwxr-xr-xtests/test_copy.py9
-rwxr-xr-xtests/test_cursor.py5
-rwxr-xr-xtests/test_errcodes.py8
-rwxr-xr-xtests/test_fast_executemany.py237
-rwxr-xr-xtests/test_green.py3
-rwxr-xr-xtests/test_lobject.py5
-rwxr-xr-xtests/test_module.py36
-rwxr-xr-xtests/test_notify.py10
-rwxr-xr-xtests/test_replication.py2
-rwxr-xr-xtests/test_transaction.py4
-rwxr-xr-xtests/test_types_extras.py17
-rw-r--r--tests/testutils.py30
17 files changed, 583 insertions, 61 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
index 95f5b9d..85a4ec9 100755
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -22,6 +22,11 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
+# Convert warnings into errors here. We can't do it with -W because on
+# Travis importing site raises a warning.
+import warnings
+warnings.simplefilter('error') # noqa
+
import sys
from testconfig import dsn
from testutils import unittest
@@ -36,6 +41,7 @@ import test_cursor
import test_dates
import test_errcodes
import test_extras_dictcursor
+import test_fast_executemany
import test_green
import test_ipaddress
import test_lobject
@@ -50,6 +56,9 @@ import test_types_basic
import test_types_extras
import test_with
+if sys.version_info[:2] < (3, 6):
+ import test_async_keyword
+
def test_suite():
# If connection to test db fails, bail out early.
@@ -65,6 +74,8 @@ def test_suite():
suite = unittest.TestSuite()
suite.addTest(test_async.test_suite())
+ if sys.version_info[:2] < (3, 6):
+ suite.addTest(test_async_keyword.test_suite())
suite.addTest(test_bugX000.test_suite())
suite.addTest(test_bug_gc.test_suite())
suite.addTest(test_cancel.test_suite())
@@ -74,6 +85,7 @@ def test_suite():
suite.addTest(test_dates.test_suite())
suite.addTest(test_errcodes.test_suite())
suite.addTest(test_extras_dictcursor.test_suite())
+ suite.addTest(test_fast_executemany.test_suite())
suite.addTest(test_green.test_suite())
suite.addTest(test_ipaddress.test_suite())
suite.addTest(test_lobject.test_suite())
diff --git a/tests/test_async.py b/tests/test_async.py
index 6f8fed5..63a5513 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -23,7 +23,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
-from testutils import unittest, skip_before_postgres
+from testutils import unittest, skip_before_postgres, slow
import psycopg2
from psycopg2 import extensions
@@ -55,7 +55,7 @@ class AsyncTests(ConnectingTestCase):
ConnectingTestCase.setUp(self)
self.sync_conn = self.conn
- self.conn = self.connect(async=True)
+ self.conn = self.connect(async_=True)
self.wait(self.conn)
@@ -71,8 +71,8 @@ class AsyncTests(ConnectingTestCase):
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
- self.assert_(self.conn.async)
- self.assert_(not self.sync_conn.async)
+ self.assert_(self.conn.async_)
+ self.assert_(not self.sync_conn.async_)
# the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0)
@@ -97,6 +97,7 @@ class AsyncTests(ConnectingTestCase):
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchone()[0], "a")
+ @slow
@skip_before_postgres(8, 2)
def test_async_callproc(self):
cur = self.conn.cursor()
@@ -107,6 +108,7 @@ class AsyncTests(ConnectingTestCase):
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchall()[0][0], '')
+ @slow
def test_async_after_async(self):
cur = self.conn.cursor()
cur2 = self.conn.cursor()
@@ -310,14 +312,15 @@ class AsyncTests(ConnectingTestCase):
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
- def __init__(self, dsn, async=0):
- psycopg2.extensions.connection.__init__(self, dsn, async=async)
+ def __init__(self, dsn, async_=0):
+ psycopg2.extensions.connection.__init__(self, dsn, async_=async_)
- conn = self.connect(connection_factory=MyConn, async=True)
+ conn = self.connect(connection_factory=MyConn, async_=True)
self.assert_(isinstance(conn, MyConn))
- self.assert_(conn.async)
+ self.assert_(conn.async_)
conn.close()
+ @slow
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
curs = self.conn.cursor()
@@ -438,7 +441,7 @@ class AsyncTests(ConnectingTestCase):
def test_async_connection_error_message(self):
try:
- cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
+ cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async_=True)
self.wait(cnn)
except psycopg2.Error, e:
self.assertNotEqual(str(e), "asynchronous connection failed",
diff --git a/tests/test_async_keyword.py b/tests/test_async_keyword.py
new file mode 100755
index 0000000..3b20e8d
--- /dev/null
+++ b/tests/test_async_keyword.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# test_async_keyword.py - test for objects using 'async' as attribute/param
+#
+# Copyright (C) 2017 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+#
+# psycopg2 is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# In addition, as a special exception, the copyright holders give
+# permission to link this program with the OpenSSL library (or with
+# modified versions of OpenSSL that use the same license as OpenSSL),
+# and distribute linked combinations including the two.
+#
+# You must obey the GNU Lesser General Public License in all respects for
+# all of the code used other than OpenSSL.
+#
+# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+import psycopg2
+from psycopg2 import extras
+
+from testconfig import dsn
+from testutils import (ConnectingTestCase, unittest, skip_before_postgres,
+ assertDsnEqual)
+from test_replication import ReplicationTestCase, skip_repl_if_green
+from psycopg2.extras import LogicalReplicationConnection, StopReplication
+
+
+class AsyncTests(ConnectingTestCase):
+ def setUp(self):
+ ConnectingTestCase.setUp(self)
+
+ self.sync_conn = self.conn
+ self.conn = self.connect(async=True)
+
+ self.wait(self.conn)
+
+ curs = self.conn.cursor()
+ curs.execute('''
+ CREATE TEMPORARY TABLE table1 (
+ id int PRIMARY KEY
+ )''')
+ self.wait(curs)
+
+ def test_connection_setup(self):
+ cur = self.conn.cursor()
+ sync_cur = self.sync_conn.cursor()
+ del cur, sync_cur
+
+ self.assert_(self.conn.async)
+ self.assert_(not self.sync_conn.async)
+
+ # the async connection should be in isolevel 0
+ self.assertEquals(self.conn.isolation_level, 0)
+
+ # check other properties to be found on the connection
+ self.assert_(self.conn.server_version)
+ self.assert_(self.conn.protocol_version in (2, 3))
+ self.assert_(self.conn.encoding in psycopg2.extensions.encodings)
+
+ def test_async_subclass(self):
+ class MyConn(psycopg2.extensions.connection):
+ def __init__(self, dsn, async=0):
+ psycopg2.extensions.connection.__init__(self, dsn, async=async)
+
+ conn = self.connect(connection_factory=MyConn, async=True)
+ self.assert_(isinstance(conn, MyConn))
+ self.assert_(conn.async)
+ conn.close()
+
+ def test_async_connection_error_message(self):
+ try:
+ cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
+ self.wait(cnn)
+ except psycopg2.Error, e:
+ self.assertNotEqual(str(e), "asynchronous connection failed",
+ "connection error reason lost")
+ else:
+ self.fail("no exception raised")
+
+
+class CancelTests(ConnectingTestCase):
+ def setUp(self):
+ ConnectingTestCase.setUp(self)
+
+ cur = self.conn.cursor()
+ cur.execute('''
+ CREATE TEMPORARY TABLE table1 (
+ id int PRIMARY KEY
+ )''')
+ self.conn.commit()
+
+ @skip_before_postgres(8, 2)
+ def test_async_cancel(self):
+ async_conn = psycopg2.connect(dsn, async=True)
+ self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
+ extras.wait_select(async_conn)
+ cur = async_conn.cursor()
+ cur.execute("select pg_sleep(10000)")
+ self.assertTrue(async_conn.isexecuting())
+ async_conn.cancel()
+ self.assertRaises(psycopg2.extensions.QueryCanceledError,
+ extras.wait_select, async_conn)
+ cur.execute("select 1")
+ extras.wait_select(async_conn)
+ self.assertEqual(cur.fetchall(), [(1, )])
+
+ def test_async_connection_cancel(self):
+ async_conn = psycopg2.connect(dsn, async=True)
+ async_conn.close()
+ self.assertTrue(async_conn.closed)
+
+
+class ConnectTestCase(unittest.TestCase):
+ def setUp(self):
+ self.args = None
+
+ def connect_stub(dsn, connection_factory=None, async=False):
+ self.args = (dsn, connection_factory, async)
+
+ self._connect_orig = psycopg2._connect
+ psycopg2._connect = connect_stub
+
+ def tearDown(self):
+ psycopg2._connect = self._connect_orig
+
+ def test_there_has_to_be_something(self):
+ self.assertRaises(TypeError, psycopg2.connect)
+ self.assertRaises(TypeError, psycopg2.connect,
+ connection_factory=lambda dsn, async=False: None)
+ self.assertRaises(TypeError, psycopg2.connect,
+ async=True)
+
+ def test_factory(self):
+ def f(dsn, async=False):
+ pass
+
+ psycopg2.connect(database='foo', host='baz', connection_factory=f)
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
+ self.assertEqual(self.args[1], f)
+ self.assertEqual(self.args[2], False)
+
+ psycopg2.connect("dbname=foo host=baz", connection_factory=f)
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
+ self.assertEqual(self.args[1], f)
+ self.assertEqual(self.args[2], False)
+
+ def test_async(self):
+ psycopg2.connect(database='foo', host='baz', async=1)
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
+ self.assertEqual(self.args[1], None)
+ self.assert_(self.args[2])
+
+ psycopg2.connect("dbname=foo host=baz", async=True)
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
+ self.assertEqual(self.args[1], None)
+ self.assert_(self.args[2])
+
+
+class AsyncReplicationTest(ReplicationTestCase):
+ @skip_before_postgres(9, 4) # slots require 9.4
+ @skip_repl_if_green
+ def test_async_replication(self):
+ conn = self.repl_connect(
+ connection_factory=LogicalReplicationConnection, async=1)
+ if conn is None:
+ return
+
+ cur = conn.cursor()
+
+ self.create_replication_slot(cur, output_plugin='test_decoding')
+ self.wait(cur)
+
+ cur.start_replication(self.slot)
+ self.wait(cur)
+
+ self.make_replication_events()
+
+ self.msg_count = 0
+
+ def consume(msg):
+ # just check the methods
+ "%s: %s" % (cur.io_timestamp, repr(msg))
+
+ self.msg_count += 1
+ if self.msg_count > 3:
+ cur.send_feedback(reply=True)
+ raise StopReplication()
+
+ cur.send_feedback(flush_lsn=msg.data_start)
+
+ # cannot be used in asynchronous mode
+ self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
+
+ def process_stream():
+ from select import select
+ while True:
+ msg = cur.read_message()
+ if msg:
+ consume(msg)
+ else:
+ select([cur], [], [])
+ self.assertRaises(StopReplication, process_stream)
+
+
+def test_suite():
+ return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_cancel.py b/tests/test_cancel.py
index a8eb750..3a94e5f 100755
--- a/tests/test_cancel.py
+++ b/tests/test_cancel.py
@@ -30,7 +30,7 @@ import psycopg2.extensions
from psycopg2 import extras
from testconfig import dsn
-from testutils import unittest, ConnectingTestCase, skip_before_postgres
+from testutils import unittest, ConnectingTestCase, skip_before_postgres, slow
class CancelTests(ConnectingTestCase):
@@ -48,6 +48,7 @@ class CancelTests(ConnectingTestCase):
def test_empty_cancel(self):
self.conn.cancel()
+ @slow
@skip_before_postgres(8, 2)
def test_cancel(self):
errors = []
@@ -87,7 +88,7 @@ class CancelTests(ConnectingTestCase):
@skip_before_postgres(8, 2)
def test_async_cancel(self):
- async_conn = psycopg2.connect(dsn, async=True)
+ async_conn = psycopg2.connect(dsn, async_=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
@@ -101,7 +102,7 @@ class CancelTests(ConnectingTestCase):
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
- async_conn = psycopg2.connect(dsn, async=True)
+ async_conn = psycopg2.connect(dsn, async_=True)
async_conn.close()
self.assertTrue(async_conn.closed)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 833751b..ea8b8f5 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -33,9 +33,9 @@ import psycopg2.errorcodes
from psycopg2 import extensions as ext
from testutils import (
- unittest, decorate_all_tests, skip_if_no_superuser,
+ unittest, assertDsnEqual, decorate_all_tests, skip_if_no_superuser,
skip_before_postgres, skip_after_postgres, skip_before_libpq,
- ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows)
+ ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
from testconfig import dsn, dbname
@@ -125,6 +125,7 @@ class ConnectionTests(ConnectingTestCase):
self.assert_('table3' in conn.notices[2])
self.assert_('table4' in conn.notices[3])
+ @slow
def test_notices_limited(self):
conn = self.conn
cur = conn.cursor()
@@ -138,6 +139,7 @@ class ConnectionTests(ConnectingTestCase):
self.assertEqual(50, len(conn.notices))
self.assert_('table99' in conn.notices[-1], conn.notices[-1])
+ @slow
def test_notices_deque(self):
from collections import deque
@@ -196,6 +198,7 @@ class ConnectionTests(ConnectingTestCase):
self.assertRaises(psycopg2.NotSupportedError,
cnn.xid, 42, "foo", "bar")
+ @slow
@skip_before_postgres(8, 2)
def test_concurrent_execution(self):
def slave():
@@ -246,6 +249,7 @@ class ConnectionTests(ConnectingTestCase):
gc.collect()
self.assert_(w() is None)
+ @slow
def test_commit_concurrency(self):
# The problem is the one reported in ticket #103. Because of bad
# status check, we commit even when a commit is already on its way.
@@ -392,9 +396,6 @@ class ParseDsnTestCase(ConnectingTestCase):
class MakeDsnTestCase(ConnectingTestCase):
- def assertDsnEqual(self, dsn1, dsn2):
- self.assertEqual(set(dsn1.split()), set(dsn2.split()))
-
def test_empty_arguments(self):
self.assertEqual(ext.make_dsn(), '')
@@ -412,7 +413,7 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_empty_param(self):
dsn = ext.make_dsn(dbname='sony', password='')
- self.assertDsnEqual(dsn, "dbname=sony password=''")
+ assertDsnEqual(self, dsn, "dbname=sony password=''")
def test_escape(self):
dsn = ext.make_dsn(dbname='hello world')
@@ -435,10 +436,10 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_params_merging(self):
dsn = ext.make_dsn('dbname=foo host=bar', host='baz')
- self.assertDsnEqual(dsn, 'dbname=foo host=baz')
+ assertDsnEqual(self, dsn, 'dbname=foo host=baz')
dsn = ext.make_dsn('dbname=foo', user='postgres')
- self.assertDsnEqual(dsn, 'dbname=foo user=postgres')
+ assertDsnEqual(self, dsn, 'dbname=foo user=postgres')
def test_no_dsn_munging(self):
dsnin = 'dbname=a host=b user=c password=d'
@@ -452,7 +453,7 @@ class MakeDsnTestCase(ConnectingTestCase):
self.assertEqual(dsn, url)
dsn = ext.make_dsn(url, application_name='woot')
- self.assertDsnEqual(dsn,
+ assertDsnEqual(self, dsn,
'dbname=test user=tester password=secret application_name=woot')
self.assertRaises(psycopg2.ProgrammingError,
@@ -899,6 +900,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
(dbname,))
self.assertEqual('42_Z3RyaWQ=_YnF1YWw=', cur.fetchone()[0])
+ @slow
def test_xid_roundtrip(self):
for fid, gtrid, bqual in [
(0, "", ""),
@@ -921,6 +923,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.tpc_rollback(xid)
+ @slow
def test_unparsed_roundtrip(self):
for tid in [
'',
diff --git a/tests/test_copy.py b/tests/test_copy.py
index ac42c98..3aa509b 100755
--- a/tests/test_copy.py
+++ b/tests/test_copy.py
@@ -24,8 +24,8 @@
import sys
import string
-from testutils import unittest, ConnectingTestCase, decorate_all_tests
-from testutils import skip_if_no_iobase, skip_before_postgres
+from testutils import (unittest, ConnectingTestCase, decorate_all_tests,
+ skip_if_no_iobase, skip_before_postgres, slow)
from cStringIO import StringIO
from itertools import cycle, izip
from subprocess import Popen, PIPE
@@ -77,6 +77,7 @@ class CopyTests(ConnectingTestCase):
data text
)''')
+ @slow
def test_copy_from(self):
curs = self.conn.cursor()
try:
@@ -84,6 +85,7 @@ class CopyTests(ConnectingTestCase):
finally:
curs.close()
+ @slow
def test_copy_from_insane_size(self):
# Trying to trigger a "would block" error
curs = self.conn.cursor()
@@ -120,6 +122,7 @@ class CopyTests(ConnectingTestCase):
self.assertRaises(ZeroDivisionError,
curs.copy_from, MinimalRead(f), "tcopy", columns=cols())
+ @slow
def test_copy_to(self):
curs = self.conn.cursor()
try:
@@ -309,6 +312,7 @@ class CopyTests(ConnectingTestCase):
curs.copy_from, StringIO('aaa\nbbb\nccc\n'), 'tcopy')
self.assertEqual(curs.rowcount, -1)
+ @slow
def test_copy_from_segfault(self):
# issue #219
script = ("""\
@@ -327,6 +331,7 @@ conn.close()
proc.communicate()
self.assertEqual(0, proc.returncode)
+ @slow
def test_copy_to_segfault(self):
# issue #219
script = ("""\
diff --git a/tests/test_cursor.py b/tests/test_cursor.py
index fc924c4..a8fedcc 100755
--- a/tests/test_cursor.py
+++ b/tests/test_cursor.py
@@ -26,8 +26,8 @@ import time
import pickle
import psycopg2
import psycopg2.extensions
-from testutils import unittest, ConnectingTestCase, skip_before_postgres
-from testutils import skip_if_no_namedtuple, skip_if_no_getrefcount
+from testutils import (unittest, ConnectingTestCase, skip_before_postgres,
+ skip_if_no_namedtuple, skip_if_no_getrefcount, slow)
class CursorTests(ConnectingTestCase):
@@ -331,6 +331,7 @@ class CursorTests(ConnectingTestCase):
curs.scroll(2)
self.assertRaises(psycopg2.OperationalError, curs.scroll, -1)
+ @slow
@skip_before_postgres(8, 2)
def test_iter_named_cursor_efficient(self):
curs = self.conn.cursor('tmp')
diff --git a/tests/test_errcodes.py b/tests/test_errcodes.py
index 6865194..4848a76 100755
--- a/tests/test_errcodes.py
+++ b/tests/test_errcodes.py
@@ -22,18 +22,22 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
-from testutils import unittest, ConnectingTestCase
+from testutils import unittest, ConnectingTestCase, slow
try:
reload
except NameError:
- from imp import reload
+ try:
+ from importlib import reload
+ except ImportError:
+ from imp import reload
from threading import Thread
from psycopg2 import errorcodes
class ErrocodeTests(ConnectingTestCase):
+ @slow
def test_lookup_threadsafe(self):
# Increase if it does not fail with KeyError
diff --git a/tests/test_fast_executemany.py b/tests/test_fast_executemany.py
new file mode 100755
index 0000000..9222274
--- /dev/null
+++ b/tests/test_fast_executemany.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python
+#
+# test_fast_executemany.py - tests for fast executemany implementations
+#
+# Copyright (C) 2017 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+#
+# psycopg2 is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+import unittest
+from datetime import date
+
+from testutils import ConnectingTestCase
+
+import psycopg2
+import psycopg2.extras
+import psycopg2.extensions as ext
+
+
+class TestPaginate(unittest.TestCase):
+ def test_paginate(self):
+ def pag(seq):
+ return psycopg2.extras._paginate(seq, 100)
+
+ self.assertEqual(list(pag([])), [])
+ self.assertEqual(list(pag([1])), [[1]])
+ self.assertEqual(list(pag(range(99))), [list(range(99))])
+ self.assertEqual(list(pag(range(100))), [list(range(100))])
+ self.assertEqual(list(pag(range(101))), [list(range(100)), [100]])
+ self.assertEqual(
+ list(pag(range(200))), [list(range(100)), list(range(100, 200))])
+ self.assertEqual(
+ list(pag(range(1000))),
+ [list(range(i * 100, (i + 1) * 100)) for i in range(10)])
+
+
+class FastExecuteTestMixin(object):
+ def setUp(self):
+ super(FastExecuteTestMixin, self).setUp()
+ cur = self.conn.cursor()
+ cur.execute("""create table testfast (
+ id serial primary key, date date, val int, data text)""")
+
+
+class TestExecuteBatch(FastExecuteTestMixin, ConnectingTestCase):
+ def test_empty(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ [])
+ cur.execute("select * from testfast order by id")
+ self.assertEqual(cur.fetchall(), [])
+
+ def test_one(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ iter([(1, 10)]))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(1, 10)])
+
+ def test_tuples(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, date, val) values (%s, %s, %s)",
+ ((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
+ cur.execute("select id, date, val from testfast order by id")
+ self.assertEqual(cur.fetchall(),
+ [(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
+
+ def test_many(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ ((i, i * 10) for i in range(1000)))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
+
+ def test_pages(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, val) values (%s, %s)",
+ ((i, i * 10) for i in range(25)),
+ page_size=10)
+
+ # last command was 5 statements
+ self.assertEqual(sum(c == u';' for c in cur.query.decode('ascii')), 4)
+
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
+
+ def test_unicode(self):
+ cur = self.conn.cursor()
+ ext.register_type(ext.UNICODE, cur)
+ snowman = u"\u2603"
+
+ # unicode in statement
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
+ [(1, 'x')])
+ cur.execute("select id, data from testfast where id = 1")
+ self.assertEqual(cur.fetchone(), (1, 'x'))
+
+ # unicode in data
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, data) values (%s, %s)",
+ [(2, snowman)])
+ cur.execute("select id, data from testfast where id = 2")
+ self.assertEqual(cur.fetchone(), (2, snowman))
+
+ # unicode in both
+ psycopg2.extras.execute_batch(cur,
+ "insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
+ [(3, snowman)])
+ cur.execute("select id, data from testfast where id = 3")
+ self.assertEqual(cur.fetchone(), (3, snowman))
+
+
+class TestExecuteValuse(FastExecuteTestMixin, ConnectingTestCase):
+ def test_empty(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ [])
+ cur.execute("select * from testfast order by id")
+ self.assertEqual(cur.fetchall(), [])
+
+ def test_one(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ iter([(1, 10)]))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(1, 10)])
+
+ def test_tuples(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, date, val) values %s",
+ ((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
+ cur.execute("select id, date, val from testfast order by id")
+ self.assertEqual(cur.fetchall(),
+ [(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
+
+ def test_dicts(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, date, val) values %s",
+ (dict(id=i, date=date(2017, 1, i + 1), val=i * 10, foo="bar")
+ for i in range(10)),
+ template='(%(id)s, %(date)s, %(val)s)')
+ cur.execute("select id, date, val from testfast order by id")
+ self.assertEqual(cur.fetchall(),
+ [(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
+
+ def test_many(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ ((i, i * 10) for i in range(1000)))
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
+
+ def test_pages(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, val) values %s",
+ ((i, i * 10) for i in range(25)),
+ page_size=10)
+
+ # last statement was 5 tuples (one parens is for the fields list)
+ self.assertEqual(sum(c == '(' for c in cur.query.decode('ascii')), 6)
+
+ cur.execute("select id, val from testfast order by id")
+ self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
+
+ def test_unicode(self):
+ cur = self.conn.cursor()
+ ext.register_type(ext.UNICODE, cur)
+ snowman = u"\u2603"
+
+ # unicode in statement
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, data) values %%s -- %s" % snowman,
+ [(1, 'x')])
+ cur.execute("select id, data from testfast where id = 1")
+ self.assertEqual(cur.fetchone(), (1, 'x'))
+
+ # unicode in data
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, data) values %s",
+ [(2, snowman)])
+ cur.execute("select id, data from testfast where id = 2")
+ self.assertEqual(cur.fetchone(), (2, snowman))
+
+ # unicode in both
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, data) values %%s -- %s" % snowman,
+ [(3, snowman)])
+ cur.execute("select id, data from testfast where id = 3")
+ self.assertEqual(cur.fetchone(), (3, snowman))
+
+ def test_invalid_sql(self):
+ cur = self.conn.cursor()
+ self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
+ "insert", [])
+ self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
+ "insert %s and %s", [])
+ self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
+ "insert %f", [])
+ self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
+ "insert %f %s", [])
+
+ def test_percent_escape(self):
+ cur = self.conn.cursor()
+ psycopg2.extras.execute_values(cur,
+ "insert into testfast (id, data) values %s -- a%%b",
+ [(1, 'hi')])
+ self.assert_(b'a%%b' not in cur.query)
+ self.assert_(b'a%b' in cur.query)
+
+ cur.execute("select id, data from testfast")
+ self.assertEqual(cur.fetchall(), [(1, 'hi')])
+
+
+def test_suite():
+ return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_green.py b/tests/test_green.py
index 0424a2c..6d1571d 100755
--- a/tests/test_green.py
+++ b/tests/test_green.py
@@ -27,7 +27,7 @@ import psycopg2
import psycopg2.extensions
import psycopg2.extras
-from testutils import ConnectingTestCase
+from testutils import ConnectingTestCase, slow
class ConnectionStub(object):
@@ -61,6 +61,7 @@ class GreenTestCase(ConnectingTestCase):
lambda conn: psycopg2.extras.wait_select(stub))
return stub
+ @slow
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
conn = self.conn
diff --git a/tests/test_lobject.py b/tests/test_lobject.py
index 4da20e9..3379ec0 100755
--- a/tests/test_lobject.py
+++ b/tests/test_lobject.py
@@ -29,8 +29,8 @@ from functools import wraps
import psycopg2
import psycopg2.extensions
-from testutils import unittest, decorate_all_tests, skip_if_tpc_disabled
-from testutils import ConnectingTestCase, skip_if_green
+from testutils import (unittest, decorate_all_tests, skip_if_tpc_disabled,
+ ConnectingTestCase, skip_if_green, slow)
def skip_if_no_lo(f):
@@ -191,6 +191,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.assertEqual(x, u"some")
self.assertEqual(lo.read(), u" data " + snowman)
+ @slow
def test_read_large(self):
lo = self.conn.lobject()
data = "data" * 1000000
diff --git a/tests/test_module.py b/tests/test_module.py
index 6a1606d..8b31a09 100755
--- a/tests/test_module.py
+++ b/tests/test_module.py
@@ -26,8 +26,8 @@ import os
import sys
from subprocess import Popen
-from testutils import unittest, skip_before_python, skip_before_postgres
-from testutils import ConnectingTestCase, skip_copy_if_green, script_to_py3
+from testutils import (unittest, skip_before_python, skip_before_postgres,
+ ConnectingTestCase, skip_copy_if_green, script_to_py3, assertDsnEqual, slow)
import psycopg2
@@ -36,24 +36,21 @@ class ConnectTestCase(unittest.TestCase):
def setUp(self):
self.args = None
- def conect_stub(dsn, connection_factory=None, async=False):
- self.args = (dsn, connection_factory, async)
+ def connect_stub(dsn, connection_factory=None, async_=False):
+ self.args = (dsn, connection_factory, async_)
self._connect_orig = psycopg2._connect
- psycopg2._connect = conect_stub
+ psycopg2._connect = connect_stub
def tearDown(self):
psycopg2._connect = self._connect_orig
- def assertDsnEqual(self, dsn1, dsn2):
- self.assertEqual(set(dsn1.split()), set(dsn2.split()))
-
def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
- connection_factory=lambda dsn, async=False: None)
+ connection_factory=lambda dsn, async_=False: None)
self.assertRaises(TypeError, psycopg2.connect,
- async=True)
+ async_=True)
def test_no_keywords(self):
psycopg2.connect('')
@@ -92,27 +89,27 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'options=stuff')
def test_factory(self):
- def f(dsn, async=False):
+ def f(dsn, async_=False):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
- self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
- self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
- psycopg2.connect(database='foo', host='baz', async=1)
- self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
+ psycopg2.connect(database='foo', host='baz', async_=1)
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
- psycopg2.connect("dbname=foo host=baz", async=True)
- self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
+ psycopg2.connect("dbname=foo host=baz", async_=True)
+ assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
@@ -124,7 +121,7 @@ class ConnectTestCase(unittest.TestCase):
def test_empty_param(self):
psycopg2.connect(database='sony', password='')
- self.assertDsnEqual(self.args[0], "dbname=sony password=''")
+ assertDsnEqual(self, self.args[0], "dbname=sony password=''")
def test_escape(self):
psycopg2.connect(database='hello world')
@@ -147,7 +144,7 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'dbname=bar')
psycopg2.connect('dbname=foo', user='postgres')
- self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres')
+ assertDsnEqual(self, self.args[0], 'dbname=foo user=postgres')
class ExceptionsTestCase(ConnectingTestCase):
@@ -311,6 +308,7 @@ class ExceptionsTestCase(ConnectingTestCase):
class TestExtensionModule(unittest.TestCase):
+ @slow
def test_import_internal(self):
# check that the internal package can be imported "naked"
# we may break this property if there is a compelling reason to do so,
diff --git a/tests/test_notify.py b/tests/test_notify.py
index 1a0ac45..0e74e1d 100755
--- a/tests/test_notify.py
+++ b/tests/test_notify.py
@@ -26,7 +26,7 @@ from testutils import unittest
import psycopg2
from psycopg2 import extensions
-from testutils import ConnectingTestCase, script_to_py3
+from testutils import ConnectingTestCase, script_to_py3, slow
from testconfig import dsn
import sys
@@ -72,6 +72,7 @@ conn.close()
return Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
+ @slow
def test_notifies_received_on_poll(self):
self.autocommit(self.conn)
self.listen('foo')
@@ -90,6 +91,7 @@ conn.close()
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])
+ @slow
def test_many_notifies(self):
self.autocommit(self.conn)
for name in ['foo', 'bar', 'baz']:
@@ -109,6 +111,7 @@ conn.close()
self.assertEqual(pids[name], pid)
names.pop(name) # raise if name found twice
+ @slow
def test_notifies_received_on_execute(self):
self.autocommit(self.conn)
self.listen('foo')
@@ -119,6 +122,7 @@ conn.close()
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])
+ @slow
def test_notify_object(self):
self.autocommit(self.conn)
self.listen('foo')
@@ -128,6 +132,7 @@ conn.close()
notify = self.conn.notifies[0]
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
+ @slow
def test_notify_attributes(self):
self.autocommit(self.conn)
self.listen('foo')
@@ -140,6 +145,7 @@ conn.close()
self.assertEqual('foo', notify.channel)
self.assertEqual('', notify.payload)
+ @slow
def test_notify_payload(self):
if self.conn.server_version < 90000:
return self.skipTest("server version %s doesn't support notify payload"
@@ -155,6 +161,7 @@ conn.close()
self.assertEqual('foo', notify.channel)
self.assertEqual('Hello, world!', notify.payload)
+ @slow
def test_notify_deque(self):
from collections import deque
self.autocommit(self.conn)
@@ -167,6 +174,7 @@ conn.close()
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
self.assertEqual(len(self.conn.notifies), 0)
+ @slow
def test_notify_noappend(self):
self.autocommit(self.conn)
self.conn.notifies = None
diff --git a/tests/test_replication.py b/tests/test_replication.py
index 79d1295..0aed578 100755
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -183,7 +183,7 @@ class AsyncReplicationTest(ReplicationTestCase):
@skip_repl_if_green
def test_async_replication(self):
conn = self.repl_connect(
- connection_factory=LogicalReplicationConnection, async=1)
+ connection_factory=LogicalReplicationConnection, async_=1)
if conn is None:
return
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index 2dc44ec..36947de 100755
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -23,7 +23,7 @@
# License for more details.
import threading
-from testutils import unittest, ConnectingTestCase, skip_before_postgres
+from testutils import unittest, ConnectingTestCase, skip_before_postgres, slow
import psycopg2
from psycopg2.extensions import (
@@ -131,6 +131,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
ConnectingTestCase.tearDown(self)
+ @slow
def test_deadlock(self):
self.thread1_error = self.thread2_error = None
step1 = threading.Event()
@@ -178,6 +179,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
self.assertTrue(isinstance(
error, psycopg2.extensions.TransactionRollbackError))
+ @slow
def test_serialisation_failure(self):
self.thread1_error = self.thread2_error = None
step1 = threading.Event()
diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py
index 8e61561..264fca8 100755
--- a/tests/test_types_extras.py
+++ b/tests/test_types_extras.py
@@ -17,14 +17,14 @@ from __future__ import with_statement
import re
import sys
+import warnings
from decimal import Decimal
from datetime import date, datetime
from functools import wraps
from pickle import dumps, loads
-from testutils import unittest, skip_if_no_uuid, skip_before_postgres
-from testutils import ConnectingTestCase, decorate_all_tests
-from testutils import py3_raises_typeerror
+from testutils import (unittest, skip_if_no_uuid, skip_before_postgres,
+ ConnectingTestCase, decorate_all_tests, py3_raises_typeerror, slow)
import psycopg2
import psycopg2.extras
@@ -77,7 +77,10 @@ class TypesExtrasTests(ConnectingTestCase):
self.failUnless(type(s) == list and len(s) == 0)
def testINET(self):
- psycopg2.extras.register_inet()
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ psycopg2.extras.register_inet()
+
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", (i,))
self.failUnless(i.addr == s.addr)
@@ -86,7 +89,10 @@ class TypesExtrasTests(ConnectingTestCase):
self.failUnless(s is None)
def testINETARRAY(self):
- psycopg2.extras.register_inet()
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ psycopg2.extras.register_inet()
+
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", ([i],))
self.failUnless(i.addr == s[0].addr)
@@ -708,6 +714,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
curs.execute("select (1,2)::type_ii")
self.assertRaises(psycopg2.DataError, curs.fetchone)
+ @slow
@skip_if_no_composite
@skip_before_postgres(8, 4)
def test_from_tables(self):
diff --git a/tests/testutils.py b/tests/testutils.py
index 9347735..f1c93de 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -50,7 +50,9 @@ else:
@wraps(f)
def skipIf__(self):
if cond:
- warnings.warn(msg)
+ with warnings.catch_warnings():
+ warnings.simplefilter('always', UserWarning)
+ warnings.warn(msg)
return
else:
return f(self)
@@ -61,7 +63,9 @@ else:
return skipIf(True, msg)
def skipTest(self, msg):
- warnings.warn(msg)
+ with warnings.catch_warnings():
+ warnings.simplefilter('always', UserWarning)
+ warnings.warn(msg)
return
unittest.TestCase.skipTest = skipTest
@@ -130,7 +134,7 @@ class ConnectingTestCase(unittest.TestCase):
import psycopg2
try:
conn = self.connect(**kwargs)
- if conn.async == 1:
+ if conn.async_ == 1:
self.wait(conn)
except psycopg2.OperationalError, e:
# If pgcode is not set it is a genuine connection error
@@ -447,7 +451,6 @@ def script_to_py3(script):
class py3_raises_typeerror(object):
-
def __enter__(self):
pass
@@ -455,3 +458,22 @@ class py3_raises_typeerror(object):
if sys.version_info[0] >= 3:
assert type is TypeError
return True
+
+
+def slow(f):
+ """Decorator to mark slow tests we may want to skip
+
+ Note: in order to find slow tests you can run:
+
+ make check 2>&1 | ts -i "%.s" | sort -n
+ """
+ @wraps(f)
+ def slow_(self):
+ if os.environ.get('PSYCOPG2_TEST_FAST'):
+ return self.skipTest("slow test")
+ return f(self)
+ return slow_
+
+
+def assertDsnEqual(testsuite, dsn1, dsn2):
+ testsuite.assertEqual(set(dsn1.split()), set(dsn2.split()))