From 734845b79adc733bd9b7414602111824c55fdb2c Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Fri, 8 Mar 2019 03:24:19 +0000 Subject: Added pq_get_result_async() replaced pg_get_last_result() The new function keeps together PQconsumeInput() with PQisBusy(), in order to handle the condition in which not all the results of a sequence of statements arrive in the same roundtrip. Added pointer to a PGresult to the connection to keep the state across async communication: it can probably be used to simplify other code paths where a result is brought forward manually. Close #802 Close #855 Close #856 --- tests/test_async.py | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) (limited to 'tests/test_async.py') diff --git a/tests/test_async.py b/tests/test_async.py index 057deac..21ef7fe 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -29,6 +29,7 @@ import unittest import warnings import psycopg2 +import psycopg2.errors from psycopg2 import extensions as ext from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow @@ -405,6 +406,15 @@ class AsyncTests(ConnectingTestCase): cur.execute("delete from table1") self.wait(cur) + def test_stop_on_first_error(self): + cur = self.conn.cursor() + cur.execute("select 1; select x; select 1/0; select 2") + self.assertRaises(psycopg2.errors.UndefinedColumn, self.wait, cur) + + cur.execute("select 1") + self.wait(cur) + self.assertEqual(cur.fetchone(), (1,)) + def test_error_two_cursors(self): cur = self.conn.cursor() cur2 = self.conn.cursor() @@ -454,6 +464,37 @@ class AsyncTests(ConnectingTestCase): cur.execute("copy (select 1) to stdout") self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn) + @slow + @skip_before_postgres(9, 0) + def test_non_block_after_notification(self): + from select import select + + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = 0 + while True: + state = self.conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_READ: + select([self.conn], [], [], 0.1) + elif state == psycopg2.extensions.POLL_WRITE: + select([], [self.conn], [], 0.1) + else: + raise Exception("Unexpected result from poll: %r", state) + polls += 1 + + self.assert_(polls >= 8, polls) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) -- cgit v1.2.1