summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-11-15 16:58:50 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2020-12-11 13:26:05 -0500
commitba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41 (patch)
tree038f2263d581d5e49d74731af68febc4bf64eb19 /test/dialect/postgresql
parent87d58b6d8188ccff808b3207d5f9398bb9adf9b9 (diff)
downloadsqlalchemy-ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41.tar.gz
correct for "autocommit" deprecation warning
Ensure no autocommit warnings occur internally or within tests. Also includes fixes for SQL Server full text tests which apparently have not been working at all for a long time, as it used long removed APIs. CI has not had fulltext running for some years and is now installed. Change-Id: Id806e1856c9da9f0a9eac88cebc7a94ecc95eb96
Diffstat (limited to 'test/dialect/postgresql')
-rw-r--r--test/dialect/postgresql/test_dialect.py291
-rw-r--r--test/dialect/postgresql/test_on_conflict.py894
-rw-r--r--test/dialect/postgresql/test_query.py220
-rw-r--r--test/dialect/postgresql/test_reflection.py180
-rw-r--r--test/dialect/postgresql/test_types.py2
5 files changed, 796 insertions, 791 deletions
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index 5cea604d6..3bd8e9da0 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -36,6 +36,7 @@ from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.engine import engine_from_config
from sqlalchemy.engine import url
+from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
@@ -51,7 +52,7 @@ from sqlalchemy.testing.assertions import eq_regex
from sqlalchemy.testing.assertions import ne_
from sqlalchemy.util import u
from sqlalchemy.util import ue
-from ...engine import test_execute
+from ...engine import test_deprecations
if True:
from sqlalchemy.dialects.postgresql.psycopg2 import (
@@ -195,6 +196,20 @@ class ExecuteManyMode(object):
options = None
+ @config.fixture()
+ def connection(self):
+ eng = engines.testing_engine(options=self.options)
+
+ conn = eng.connect()
+ trans = conn.begin()
+ try:
+ yield conn
+ finally:
+ if trans.is_active:
+ trans.rollback()
+ conn.close()
+ eng.dispose()
+
@classmethod
def define_tables(cls, metadata):
Table(
@@ -213,20 +228,12 @@ class ExecuteManyMode(object):
Column(ue("\u6e2c\u8a66"), Integer),
)
- def setup(self):
- super(ExecuteManyMode, self).setup()
- self.engine = engines.testing_engine(options=self.options)
-
- def teardown(self):
- self.engine.dispose()
- super(ExecuteManyMode, self).teardown()
-
- def test_insert(self):
+ def test_insert(self, connection):
from psycopg2 import extras
- values_page_size = self.engine.dialect.executemany_values_page_size
- batch_page_size = self.engine.dialect.executemany_batch_page_size
- if self.engine.dialect.executemany_mode & EXECUTEMANY_VALUES:
+ values_page_size = connection.dialect.executemany_values_page_size
+ batch_page_size = connection.dialect.executemany_batch_page_size
+ if connection.dialect.executemany_mode & EXECUTEMANY_VALUES:
meth = extras.execute_values
stmt = "INSERT INTO data (x, y) VALUES %s"
expected_kwargs = {
@@ -234,7 +241,7 @@ class ExecuteManyMode(object):
"page_size": values_page_size,
"fetch": False,
}
- elif self.engine.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ elif connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
meth = extras.execute_batch
stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
expected_kwargs = {"page_size": batch_page_size}
@@ -244,24 +251,23 @@ class ExecuteManyMode(object):
with mock.patch.object(
extras, meth.__name__, side_effect=meth
) as mock_exec:
- with self.engine.connect() as conn:
- conn.execute(
- self.tables.data.insert(),
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- )
+ connection.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ],
+ )
- eq_(
- conn.execute(select(self.tables.data)).fetchall(),
- [
- (1, "x1", "y1", 5),
- (2, "x2", "y2", 5),
- (3, "x3", "y3", 5),
- ],
- )
+ eq_(
+ connection.execute(select(self.tables.data)).fetchall(),
+ [
+ (1, "x1", "y1", 5),
+ (2, "x2", "y2", 5),
+ (3, "x3", "y3", 5),
+ ],
+ )
eq_(
mock_exec.mock_calls,
[
@@ -278,14 +284,13 @@ class ExecuteManyMode(object):
],
)
- def test_insert_no_page_size(self):
+ def test_insert_no_page_size(self, connection):
from psycopg2 import extras
- values_page_size = self.engine.dialect.executemany_values_page_size
- batch_page_size = self.engine.dialect.executemany_batch_page_size
+ values_page_size = connection.dialect.executemany_values_page_size
+ batch_page_size = connection.dialect.executemany_batch_page_size
- eng = self.engine
- if eng.dialect.executemany_mode & EXECUTEMANY_VALUES:
+ if connection.dialect.executemany_mode & EXECUTEMANY_VALUES:
meth = extras.execute_values
stmt = "INSERT INTO data (x, y) VALUES %s"
expected_kwargs = {
@@ -293,7 +298,7 @@ class ExecuteManyMode(object):
"page_size": values_page_size,
"fetch": False,
}
- elif eng.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ elif connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
meth = extras.execute_batch
stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
expected_kwargs = {"page_size": batch_page_size}
@@ -303,15 +308,14 @@ class ExecuteManyMode(object):
with mock.patch.object(
extras, meth.__name__, side_effect=meth
) as mock_exec:
- with eng.connect() as conn:
- conn.execute(
- self.tables.data.insert(),
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- )
+ connection.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ],
+ )
eq_(
mock_exec.mock_calls,
@@ -356,7 +360,7 @@ class ExecuteManyMode(object):
with mock.patch.object(
extras, meth.__name__, side_effect=meth
) as mock_exec:
- with eng.connect() as conn:
+ with eng.begin() as conn:
conn.execute(
self.tables.data.insert(),
[
@@ -398,11 +402,10 @@ class ExecuteManyMode(object):
eq_(connection.execute(table.select()).all(), [(1, 1), (2, 2), (3, 3)])
- def test_update_fallback(self):
+ def test_update_fallback(self, connection):
from psycopg2 import extras
- batch_page_size = self.engine.dialect.executemany_batch_page_size
- eng = self.engine
+ batch_page_size = connection.dialect.executemany_batch_page_size
meth = extras.execute_batch
stmt = "UPDATE data SET y=%(yval)s WHERE data.x = %(xval)s"
expected_kwargs = {"page_size": batch_page_size}
@@ -410,18 +413,17 @@ class ExecuteManyMode(object):
with mock.patch.object(
extras, meth.__name__, side_effect=meth
) as mock_exec:
- with eng.connect() as conn:
- conn.execute(
- self.tables.data.update()
- .where(self.tables.data.c.x == bindparam("xval"))
- .values(y=bindparam("yval")),
- [
- {"xval": "x1", "yval": "y5"},
- {"xval": "x3", "yval": "y6"},
- ],
- )
+ connection.execute(
+ self.tables.data.update()
+ .where(self.tables.data.c.x == bindparam("xval"))
+ .values(y=bindparam("yval")),
+ [
+ {"xval": "x1", "yval": "y5"},
+ {"xval": "x3", "yval": "y6"},
+ ],
+ )
- if eng.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ if connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
eq_(
mock_exec.mock_calls,
[
@@ -439,36 +441,34 @@ class ExecuteManyMode(object):
else:
eq_(mock_exec.mock_calls, [])
- def test_not_sane_rowcount(self):
- self.engine.connect().close()
- if self.engine.dialect.executemany_mode & EXECUTEMANY_BATCH:
- assert not self.engine.dialect.supports_sane_multi_rowcount
+ def test_not_sane_rowcount(self, connection):
+ if connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ assert not connection.dialect.supports_sane_multi_rowcount
else:
- assert self.engine.dialect.supports_sane_multi_rowcount
+ assert connection.dialect.supports_sane_multi_rowcount
- def test_update(self):
- with self.engine.connect() as conn:
- conn.execute(
- self.tables.data.insert(),
- [
- {"x": "x1", "y": "y1"},
- {"x": "x2", "y": "y2"},
- {"x": "x3", "y": "y3"},
- ],
- )
+ def test_update(self, connection):
+ connection.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ],
+ )
- conn.execute(
- self.tables.data.update()
- .where(self.tables.data.c.x == bindparam("xval"))
- .values(y=bindparam("yval")),
- [{"xval": "x1", "yval": "y5"}, {"xval": "x3", "yval": "y6"}],
- )
- eq_(
- conn.execute(
- select(self.tables.data).order_by(self.tables.data.c.id)
- ).fetchall(),
- [(1, "x1", "y5", 5), (2, "x2", "y2", 5), (3, "x3", "y6", 5)],
- )
+ connection.execute(
+ self.tables.data.update()
+ .where(self.tables.data.c.x == bindparam("xval"))
+ .values(y=bindparam("yval")),
+ [{"xval": "x1", "yval": "y5"}, {"xval": "x3", "yval": "y6"}],
+ )
+ eq_(
+ connection.execute(
+ select(self.tables.data).order_by(self.tables.data.c.id)
+ ).fetchall(),
+ [(1, "x1", "y5", 5), (2, "x2", "y2", 5), (3, "x3", "y6", 5)],
+ )
class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
@@ -578,7 +578,7 @@ class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
[(pk,) for pk in range(1 + first_pk, total_rows + first_pk)],
)
- def test_insert_w_newlines(self):
+ def test_insert_w_newlines(self, connection):
from psycopg2 import extras
t = self.tables.data
@@ -606,15 +606,14 @@ class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
extras, "execute_values", side_effect=meth
) as mock_exec:
- with self.engine.connect() as conn:
- conn.execute(
- ins,
- [
- {"id": 1, "y": "y1", "z": 1},
- {"id": 2, "y": "y2", "z": 2},
- {"id": 3, "y": "y3", "z": 3},
- ],
- )
+ connection.execute(
+ ins,
+ [
+ {"id": 1, "y": "y1", "z": 1},
+ {"id": 2, "y": "y2", "z": 2},
+ {"id": 3, "y": "y3", "z": 3},
+ ],
+ )
eq_(
mock_exec.mock_calls,
@@ -629,12 +628,12 @@ class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
),
template="(%(id)s, (SELECT 5 \nFROM data), %(y)s, %(z)s)",
fetch=False,
- page_size=conn.dialect.executemany_values_page_size,
+ page_size=connection.dialect.executemany_values_page_size,
)
],
)
- def test_insert_modified_by_event(self):
+ def test_insert_modified_by_event(self, connection):
from psycopg2 import extras
t = self.tables.data
@@ -664,33 +663,33 @@ class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
extras, "execute_batch", side_effect=meth
) as mock_batch:
- with self.engine.connect() as conn:
-
- # create an event hook that will change the statement to
- # something else, meaning the dialect has to detect that
- # insert_single_values_expr is no longer useful
- @event.listens_for(conn, "before_cursor_execute", retval=True)
- def before_cursor_execute(
- conn, cursor, statement, parameters, context, executemany
- ):
- statement = (
- "INSERT INTO data (id, y, z) VALUES "
- "(%(id)s, %(y)s, %(z)s)"
- )
- return statement, parameters
-
- conn.execute(
- ins,
- [
- {"id": 1, "y": "y1", "z": 1},
- {"id": 2, "y": "y2", "z": 2},
- {"id": 3, "y": "y3", "z": 3},
- ],
+ # create an event hook that will change the statement to
+ # something else, meaning the dialect has to detect that
+ # insert_single_values_expr is no longer useful
+ @event.listens_for(
+ connection, "before_cursor_execute", retval=True
+ )
+ def before_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ statement = (
+ "INSERT INTO data (id, y, z) VALUES "
+ "(%(id)s, %(y)s, %(z)s)"
)
+ return statement, parameters
+
+ connection.execute(
+ ins,
+ [
+ {"id": 1, "y": "y1", "z": 1},
+ {"id": 2, "y": "y2", "z": 2},
+ {"id": 3, "y": "y3", "z": 3},
+ ],
+ )
eq_(mock_values.mock_calls, [])
- if self.engine.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ if connection.dialect.executemany_mode & EXECUTEMANY_BATCH:
eq_(
mock_batch.mock_calls,
[
@@ -727,10 +726,10 @@ class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
("values_only", EXECUTEMANY_VALUES),
("values_plus_batch", EXECUTEMANY_VALUES_PLUS_BATCH),
]:
- self.engine = engines.testing_engine(
+ connection = engines.testing_engine(
options={"executemany_mode": opt}
)
- is_(self.engine.dialect.executemany_mode, expected)
+ is_(connection.dialect.executemany_mode, expected)
def test_executemany_wrong_flag_options(self):
for opt in [1, True, "batch_insert"]:
@@ -1082,7 +1081,7 @@ $$ LANGUAGE plpgsql;
t.create(connection, checkfirst=True)
@testing.provide_metadata
- def test_schema_roundtrips(self):
+ def test_schema_roundtrips(self, connection):
meta = self.metadata
users = Table(
"users",
@@ -1091,33 +1090,37 @@ $$ LANGUAGE plpgsql;
Column("name", String(50)),
schema="test_schema",
)
- users.create()
- users.insert().execute(id=1, name="name1")
- users.insert().execute(id=2, name="name2")
- users.insert().execute(id=3, name="name3")
- users.insert().execute(id=4, name="name4")
+ users.create(connection)
+ connection.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=2, name="name2"))
+ connection.execute(users.insert(), dict(id=3, name="name3"))
+ connection.execute(users.insert(), dict(id=4, name="name4"))
eq_(
- users.select().where(users.c.name == "name2").execute().fetchall(),
+ connection.execute(
+ users.select().where(users.c.name == "name2")
+ ).fetchall(),
[(2, "name2")],
)
eq_(
- users.select(use_labels=True)
- .where(users.c.name == "name2")
- .execute()
- .fetchall(),
+ connection.execute(
+ users.select().apply_labels().where(users.c.name == "name2")
+ ).fetchall(),
[(2, "name2")],
)
- users.delete().where(users.c.id == 3).execute()
+ connection.execute(users.delete().where(users.c.id == 3))
eq_(
- users.select().where(users.c.name == "name3").execute().fetchall(),
+ connection.execute(
+ users.select().where(users.c.name == "name3")
+ ).fetchall(),
[],
)
- users.update().where(users.c.name == "name4").execute(name="newname")
+ connection.execute(
+ users.update().where(users.c.name == "name4"), dict(name="newname")
+ )
eq_(
- users.select(use_labels=True)
- .where(users.c.id == 4)
- .execute()
- .fetchall(),
+ connection.execute(
+ users.select().apply_labels().where(users.c.id == 4)
+ ).fetchall(),
[(4, "newname")],
)
@@ -1233,7 +1236,7 @@ $$ LANGUAGE plpgsql;
ne_(conn.connection.status, STATUS_IN_TRANSACTION)
-class AutocommitTextTest(test_execute.AutocommitTextTest):
+class AutocommitTextTest(test_deprecations.AutocommitTextTest):
__only_on__ = "postgresql"
def test_grant(self):
diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py
index 760487842..4e96cc6a2 100644
--- a/test/dialect/postgresql/test_on_conflict.py
+++ b/test/dialect/postgresql/test_on_conflict.py
@@ -99,28 +99,29 @@ class OnConflictTest(fixtures.TablesTest):
ValueError, insert(self.tables.users).on_conflict_do_update
)
- def test_on_conflict_do_nothing(self):
+ def test_on_conflict_do_nothing(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- result = conn.execute(
- insert(users).on_conflict_do_nothing(),
- dict(id=1, name="name1"),
- )
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
-
- result = conn.execute(
- insert(users).on_conflict_do_nothing(),
- dict(id=1, name="name2"),
- )
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
-
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name1")],
- )
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(),
+ dict(id=1, name="name1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
+
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(),
+ dict(id=1, name="name2"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
+
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1")],
+ )
def test_on_conflict_do_nothing_connectionless(self, connection):
users = self.tables.users_xtra
@@ -147,95 +148,99 @@ class OnConflictTest(fixtures.TablesTest):
)
@testing.provide_metadata
- def test_on_conflict_do_nothing_target(self):
+ def test_on_conflict_do_nothing_target(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- result = conn.execute(
- insert(users).on_conflict_do_nothing(
- index_elements=users.primary_key.columns
- ),
- dict(id=1, name="name1"),
- )
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
-
- result = conn.execute(
- insert(users).on_conflict_do_nothing(
- index_elements=users.primary_key.columns
- ),
- dict(id=1, name="name2"),
- )
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
-
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name1")],
- )
-
- def test_on_conflict_do_update_one(self):
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=users.primary_key.columns
+ ),
+ dict(id=1, name="name1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
+
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=users.primary_key.columns
+ ),
+ dict(id=1, name="name2"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
+
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1")],
+ )
+
+ def test_on_conflict_do_update_one(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.id], set_=dict(name=i.excluded.name)
- )
- result = conn.execute(i, dict(id=1, name="name1"))
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id], set_=dict(name=i.excluded.name)
+ )
+ result = connection.execute(i, dict(id=1, name="name1"))
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name1")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1")],
+ )
- def test_on_conflict_do_update_schema(self):
+ def test_on_conflict_do_update_schema(self, connection):
users = self.tables.get("%s.users_schema" % config.test_schema)
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.id], set_=dict(name=i.excluded.name)
- )
- result = conn.execute(i, dict(id=1, name="name1"))
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id], set_=dict(name=i.excluded.name)
+ )
+ result = connection.execute(i, dict(id=1, name="name1"))
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name1")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1")],
+ )
- def test_on_conflict_do_update_column_as_key_set(self):
+ def test_on_conflict_do_update_column_as_key_set(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.id],
- set_={users.c.name: i.excluded.name},
- )
- result = conn.execute(i, dict(id=1, name="name1"))
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id],
+ set_={users.c.name: i.excluded.name},
+ )
+ result = connection.execute(i, dict(id=1, name="name1"))
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name1")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1")],
+ )
- def test_on_conflict_do_update_clauseelem_as_key_set(self):
+ def test_on_conflict_do_update_clauseelem_as_key_set(self, connection):
users = self.tables.users
class MyElem(object):
@@ -245,162 +250,165 @@ class OnConflictTest(fixtures.TablesTest):
def __clause_element__(self):
return self.expr
- with testing.db.connect() as conn:
- conn.execute(
- users.insert(),
- {"id": 1, "name": "name1"},
- )
+ connection.execute(
+ users.insert(),
+ {"id": 1, "name": "name1"},
+ )
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.id],
- set_={MyElem(users.c.name): i.excluded.name},
- ).values({MyElem(users.c.id): 1, MyElem(users.c.name): "name1"})
- result = conn.execute(i)
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id],
+ set_={MyElem(users.c.name): i.excluded.name},
+ ).values({MyElem(users.c.id): 1, MyElem(users.c.name): "name1"})
+ result = connection.execute(i)
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name1")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1")],
+ )
- def test_on_conflict_do_update_column_as_key_set_schema(self):
+ def test_on_conflict_do_update_column_as_key_set_schema(self, connection):
users = self.tables.get("%s.users_schema" % config.test_schema)
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.id],
- set_={users.c.name: i.excluded.name},
- )
- result = conn.execute(i, dict(id=1, name="name1"))
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id],
+ set_={users.c.name: i.excluded.name},
+ )
+ result = connection.execute(i, dict(id=1, name="name1"))
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name1")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1")],
+ )
- def test_on_conflict_do_update_two(self):
+ def test_on_conflict_do_update_two(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.id],
- set_=dict(id=i.excluded.id, name=i.excluded.name),
- )
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id],
+ set_=dict(id=i.excluded.id, name=i.excluded.name),
+ )
- result = conn.execute(i, dict(id=1, name="name2"))
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ result = connection.execute(i, dict(id=1, name="name2"))
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name2")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name2")],
+ )
- def test_on_conflict_do_update_three(self):
+ def test_on_conflict_do_update_three(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=users.primary_key.columns,
- set_=dict(name=i.excluded.name),
- )
- result = conn.execute(i, dict(id=1, name="name3"))
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(name=i.excluded.name),
+ )
+ result = connection.execute(i, dict(id=1, name="name3"))
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name3")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name3")],
+ )
- def test_on_conflict_do_update_four(self):
+ def test_on_conflict_do_update_four(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=users.primary_key.columns,
- set_=dict(id=i.excluded.id, name=i.excluded.name),
- ).values(id=1, name="name4")
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(id=i.excluded.id, name=i.excluded.name),
+ ).values(id=1, name="name4")
- result = conn.execute(i)
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ result = connection.execute(i)
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name4")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name4")],
+ )
- def test_on_conflict_do_update_five(self):
+ def test_on_conflict_do_update_five(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=1, name="name1"))
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=users.primary_key.columns,
- set_=dict(id=10, name="I'm a name"),
- ).values(id=1, name="name4")
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(id=10, name="I'm a name"),
+ ).values(id=1, name="name4")
- result = conn.execute(i)
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
+ result = connection.execute(i)
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
- eq_(
- conn.execute(
- users.select().where(users.c.id == 10)
- ).fetchall(),
- [(10, "I'm a name")],
- )
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 10)
+ ).fetchall(),
+ [(10, "I'm a name")],
+ )
- def test_on_conflict_do_update_multivalues(self):
+ def test_on_conflict_do_update_multivalues(self, connection):
users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(id=1, name="name1"))
- conn.execute(users.insert(), dict(id=2, name="name2"))
-
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=users.primary_key.columns,
- set_=dict(name="updated"),
- where=(i.excluded.name != "name12"),
- ).values(
- [
- dict(id=1, name="name11"),
- dict(id=2, name="name12"),
- dict(id=3, name="name13"),
- dict(id=4, name="name14"),
- ]
- )
-
- result = conn.execute(i)
- eq_(result.inserted_primary_key, (None,))
- eq_(result.returned_defaults, None)
-
- eq_(
- conn.execute(users.select().order_by(users.c.id)).fetchall(),
- [(1, "updated"), (2, "name2"), (3, "name13"), (4, "name14")],
- )
+ connection.execute(users.insert(), dict(id=1, name="name1"))
+ connection.execute(users.insert(), dict(id=2, name="name2"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(name="updated"),
+ where=(i.excluded.name != "name12"),
+ ).values(
+ [
+ dict(id=1, name="name11"),
+ dict(id=2, name="name12"),
+ dict(id=3, name="name13"),
+ dict(id=4, name="name14"),
+ ]
+ )
+
+ result = connection.execute(i)
+ eq_(result.inserted_primary_key, (None,))
+ eq_(result.returned_defaults, None)
+
+ eq_(
+ connection.execute(users.select().order_by(users.c.id)).fetchall(),
+ [(1, "updated"), (2, "name2"), (3, "name13"), (4, "name14")],
+ )
def _exotic_targets_fixture(self, conn):
users = self.tables.users_xtra
@@ -429,260 +437,250 @@ class OnConflictTest(fixtures.TablesTest):
[(1, "name1", "name1@gmail.com", "not")],
)
- def test_on_conflict_do_update_exotic_targets_two(self):
+ def test_on_conflict_do_update_exotic_targets_two(self, connection):
users = self.tables.users_xtra
- with testing.db.connect() as conn:
- self._exotic_targets_fixture(conn)
- # try primary key constraint: cause an upsert on unique id column
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=users.primary_key.columns,
- set_=dict(
- name=i.excluded.name, login_email=i.excluded.login_email
- ),
- )
- result = conn.execute(
- i,
- dict(
- id=1,
- name="name2",
- login_email="name1@gmail.com",
- lets_index_this="not",
- ),
- )
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, None)
-
- eq_(
- conn.execute(users.select().where(users.c.id == 1)).fetchall(),
- [(1, "name2", "name1@gmail.com", "not")],
- )
-
- def test_on_conflict_do_update_exotic_targets_three(self):
+ self._exotic_targets_fixture(connection)
+ # try primary key constraint: cause an upsert on unique id column
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+ result = connection.execute(
+ i,
+ dict(
+ id=1,
+ name="name2",
+ login_email="name1@gmail.com",
+ lets_index_this="not",
+ ),
+ )
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, None)
+
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name2", "name1@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_three(self, connection):
users = self.tables.users_xtra
- with testing.db.connect() as conn:
- self._exotic_targets_fixture(conn)
- # try unique constraint: cause an upsert on target
- # login_email, not id
- i = insert(users)
- i = i.on_conflict_do_update(
- constraint=self.unique_constraint,
- set_=dict(
- id=i.excluded.id,
- name=i.excluded.name,
- login_email=i.excluded.login_email,
- ),
- )
- # note: lets_index_this value totally ignored in SET clause.
- result = conn.execute(
- i,
- dict(
- id=42,
- name="nameunique",
- login_email="name2@gmail.com",
- lets_index_this="unique",
- ),
- )
- eq_(result.inserted_primary_key, (42,))
- eq_(result.returned_defaults, None)
-
- eq_(
- conn.execute(
- users.select().where(
- users.c.login_email == "name2@gmail.com"
- )
- ).fetchall(),
- [(42, "nameunique", "name2@gmail.com", "not")],
- )
-
- def test_on_conflict_do_update_exotic_targets_four(self):
+ self._exotic_targets_fixture(connection)
+ # try unique constraint: cause an upsert on target
+ # login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ constraint=self.unique_constraint,
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+ # note: lets_index_this value totally ignored in SET clause.
+ result = connection.execute(
+ i,
+ dict(
+ id=42,
+ name="nameunique",
+ login_email="name2@gmail.com",
+ lets_index_this="unique",
+ ),
+ )
+ eq_(result.inserted_primary_key, (42,))
+ eq_(result.returned_defaults, None)
+
+ eq_(
+ connection.execute(
+ users.select().where(users.c.login_email == "name2@gmail.com")
+ ).fetchall(),
+ [(42, "nameunique", "name2@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_four(self, connection):
users = self.tables.users_xtra
- with testing.db.connect() as conn:
- self._exotic_targets_fixture(conn)
- # try unique constraint by name: cause an
- # upsert on target login_email, not id
- i = insert(users)
- i = i.on_conflict_do_update(
- constraint=self.unique_constraint.name,
- set_=dict(
- id=i.excluded.id,
- name=i.excluded.name,
- login_email=i.excluded.login_email,
- ),
- )
- # note: lets_index_this value totally ignored in SET clause.
-
- result = conn.execute(
- i,
- dict(
- id=43,
- name="nameunique2",
- login_email="name2@gmail.com",
- lets_index_this="unique",
- ),
- )
- eq_(result.inserted_primary_key, (43,))
- eq_(result.returned_defaults, None)
-
- eq_(
- conn.execute(
- users.select().where(
- users.c.login_email == "name2@gmail.com"
- )
- ).fetchall(),
- [(43, "nameunique2", "name2@gmail.com", "not")],
- )
-
- def test_on_conflict_do_update_exotic_targets_four_no_pk(self):
+ self._exotic_targets_fixture(connection)
+ # try unique constraint by name: cause an
+ # upsert on target login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ constraint=self.unique_constraint.name,
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+ # note: lets_index_this value totally ignored in SET clause.
+
+ result = connection.execute(
+ i,
+ dict(
+ id=43,
+ name="nameunique2",
+ login_email="name2@gmail.com",
+ lets_index_this="unique",
+ ),
+ )
+ eq_(result.inserted_primary_key, (43,))
+ eq_(result.returned_defaults, None)
+
+ eq_(
+ connection.execute(
+ users.select().where(users.c.login_email == "name2@gmail.com")
+ ).fetchall(),
+ [(43, "nameunique2", "name2@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_four_no_pk(self, connection):
users = self.tables.users_xtra
- with testing.db.connect() as conn:
- self._exotic_targets_fixture(conn)
- # try unique constraint by name: cause an
- # upsert on target login_email, not id
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.login_email],
- set_=dict(
- id=i.excluded.id,
- name=i.excluded.name,
- login_email=i.excluded.login_email,
- ),
- )
-
- result = conn.execute(
- i, dict(name="name3", login_email="name1@gmail.com")
- )
- eq_(result.inserted_primary_key, (1,))
- eq_(result.returned_defaults, (1,))
-
- eq_(
- conn.execute(users.select().order_by(users.c.id)).fetchall(),
- [
- (1, "name3", "name1@gmail.com", "not"),
- (2, "name2", "name2@gmail.com", "not"),
- ],
- )
-
- def test_on_conflict_do_update_exotic_targets_five(self):
+ self._exotic_targets_fixture(connection)
+ # try unique constraint by name: cause an
+ # upsert on target login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.login_email],
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+
+ result = connection.execute(
+ i, dict(name="name3", login_email="name1@gmail.com")
+ )
+ eq_(result.inserted_primary_key, (1,))
+ eq_(result.returned_defaults, (1,))
+
+ eq_(
+ connection.execute(users.select().order_by(users.c.id)).fetchall(),
+ [
+ (1, "name3", "name1@gmail.com", "not"),
+ (2, "name2", "name2@gmail.com", "not"),
+ ],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_five(self, connection):
users = self.tables.users_xtra
- with testing.db.connect() as conn:
- self._exotic_targets_fixture(conn)
- # try bogus index
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=self.bogus_index.columns,
- index_where=self.bogus_index.dialect_options["postgresql"][
- "where"
- ],
- set_=dict(
- name=i.excluded.name, login_email=i.excluded.login_email
- ),
- )
-
- assert_raises(
- exc.ProgrammingError,
- conn.execute,
- i,
- dict(
- id=1,
- name="namebogus",
- login_email="bogus@gmail.com",
- lets_index_this="bogus",
- ),
- )
-
- def test_on_conflict_do_update_exotic_targets_six(self):
+ self._exotic_targets_fixture(connection)
+ # try bogus index
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=self.bogus_index.columns,
+ index_where=self.bogus_index.dialect_options["postgresql"][
+ "where"
+ ],
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+
+ assert_raises(
+ exc.ProgrammingError,
+ connection.execute,
+ i,
+ dict(
+ id=1,
+ name="namebogus",
+ login_email="bogus@gmail.com",
+ lets_index_this="bogus",
+ ),
+ )
+
+ def test_on_conflict_do_update_exotic_targets_six(self, connection):
users = self.tables.users_xtra
- with testing.db.connect() as conn:
- conn.execute(
- insert(users),
+ connection.execute(
+ insert(users),
+ dict(
+ id=1,
+ name="name1",
+ login_email="mail1@gmail.com",
+ lets_index_this="unique_name",
+ ),
+ )
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=self.unique_partial_index.columns,
+ index_where=self.unique_partial_index.dialect_options[
+ "postgresql"
+ ]["where"],
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+
+ connection.execute(
+ i,
+ [
dict(
- id=1,
name="name1",
- login_email="mail1@gmail.com",
+ login_email="mail2@gmail.com",
lets_index_this="unique_name",
- ),
- )
-
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=self.unique_partial_index.columns,
- index_where=self.unique_partial_index.dialect_options[
- "postgresql"
- ]["where"],
- set_=dict(
- name=i.excluded.name, login_email=i.excluded.login_email
- ),
- )
-
- conn.execute(
- i,
- [
- dict(
- name="name1",
- login_email="mail2@gmail.com",
- lets_index_this="unique_name",
- )
- ],
- )
-
- eq_(
- conn.execute(users.select()).fetchall(),
- [(1, "name1", "mail2@gmail.com", "unique_name")],
- )
-
- def test_on_conflict_do_update_no_row_actually_affected(self):
+ )
+ ],
+ )
+
+ eq_(
+ connection.execute(users.select()).fetchall(),
+ [(1, "name1", "mail2@gmail.com", "unique_name")],
+ )
+
+ def test_on_conflict_do_update_no_row_actually_affected(self, connection):
users = self.tables.users_xtra
- with testing.db.connect() as conn:
- self._exotic_targets_fixture(conn)
- i = insert(users)
- i = i.on_conflict_do_update(
- index_elements=[users.c.login_email],
- set_=dict(name="new_name"),
- where=(i.excluded.name == "other_name"),
- )
- result = conn.execute(
- i, dict(name="name2", login_email="name1@gmail.com")
- )
-
- eq_(result.returned_defaults, None)
- eq_(result.inserted_primary_key, None)
-
- eq_(
- conn.execute(users.select()).fetchall(),
- [
- (1, "name1", "name1@gmail.com", "not"),
- (2, "name2", "name2@gmail.com", "not"),
- ],
- )
-
- def test_on_conflict_do_update_special_types_in_set(self):
+ self._exotic_targets_fixture(connection)
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.login_email],
+ set_=dict(name="new_name"),
+ where=(i.excluded.name == "other_name"),
+ )
+ result = connection.execute(
+ i, dict(name="name2", login_email="name1@gmail.com")
+ )
+
+ eq_(result.returned_defaults, None)
+ eq_(result.inserted_primary_key, None)
+
+ eq_(
+ connection.execute(users.select()).fetchall(),
+ [
+ (1, "name1", "name1@gmail.com", "not"),
+ (2, "name2", "name2@gmail.com", "not"),
+ ],
+ )
+
+ def test_on_conflict_do_update_special_types_in_set(self, connection):
bind_targets = self.tables.bind_targets
- with testing.db.connect() as conn:
- i = insert(bind_targets)
- conn.execute(i, {"id": 1, "data": "initial data"})
-
- eq_(
- conn.scalar(sql.select(bind_targets.c.data)),
- "initial data processed",
- )
-
- i = insert(bind_targets)
- i = i.on_conflict_do_update(
- index_elements=[bind_targets.c.id],
- set_=dict(data="new updated data"),
- )
- conn.execute(i, {"id": 1, "data": "new inserted data"})
-
- eq_(
- conn.scalar(sql.select(bind_targets.c.data)),
- "new updated data processed",
- )
+ i = insert(bind_targets)
+ connection.execute(i, {"id": 1, "data": "initial data"})
+
+ eq_(
+ connection.scalar(sql.select(bind_targets.c.data)),
+ "initial data processed",
+ )
+
+ i = insert(bind_targets)
+ i = i.on_conflict_do_update(
+ index_elements=[bind_targets.c.id],
+ set_=dict(data="new updated data"),
+ )
+ connection.execute(i, {"id": 1, "data": "new inserted data"})
+
+ eq_(
+ connection.scalar(sql.select(bind_targets.c.data)),
+ "new updated data processed",
+ )
diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py
index c959acf35..94af168ee 100644
--- a/test/dialect/postgresql/test_query.py
+++ b/test/dialect/postgresql/test_query.py
@@ -35,30 +35,32 @@ from sqlalchemy.testing.assertsql import CursorSQL
from sqlalchemy.testing.assertsql import DialectSQL
-matchtable = cattable = None
-
-
class InsertTest(fixtures.TestBase, AssertsExecutionResults):
__only_on__ = "postgresql"
__backend__ = True
- @classmethod
- def setup_class(cls):
- cls.metadata = MetaData(testing.db)
+ def setup(self):
+ self.metadata = MetaData()
def teardown(self):
- self.metadata.drop_all()
- self.metadata.clear()
+ with testing.db.begin() as conn:
+ self.metadata.drop_all(conn)
+
+ @testing.combinations((False,), (True,))
+ def test_foreignkey_missing_insert(self, implicit_returning):
+ engine = engines.testing_engine(
+ options={"implicit_returning": implicit_returning}
+ )
- def test_foreignkey_missing_insert(self):
Table("t1", self.metadata, Column("id", Integer, primary_key=True))
t2 = Table(
"t2",
self.metadata,
Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
)
- self.metadata.create_all()
+
+ self.metadata.create_all(engine)
# want to ensure that "null value in column "id" violates not-
# null constraint" is raised (IntegrityError on psycoopg2, but
@@ -67,19 +69,13 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
# the latter corresponds to autoincrement behavior, which is not
# the case here due to the foreign key.
- for eng in [
- engines.testing_engine(options={"implicit_returning": False}),
- engines.testing_engine(options={"implicit_returning": True}),
- ]:
- with expect_warnings(
- ".*has no Python-side or server-side default.*"
- ):
- with eng.connect() as conn:
- assert_raises(
- (exc.IntegrityError, exc.ProgrammingError),
- conn.execute,
- t2.insert(),
- )
+ with expect_warnings(".*has no Python-side or server-side default.*"):
+ with engine.begin() as conn:
+ assert_raises(
+ (exc.IntegrityError, exc.ProgrammingError),
+ conn.execute,
+ t2.insert(),
+ )
def test_sequence_insert(self):
table = Table(
@@ -88,7 +84,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
Column("id", Integer, Sequence("my_seq"), primary_key=True),
Column("data", String(30)),
)
- self.metadata.create_all()
+ self.metadata.create_all(testing.db)
self._assert_data_with_sequence(table, "my_seq")
@testing.requires.returning
@@ -99,7 +95,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
Column("id", Integer, Sequence("my_seq"), primary_key=True),
Column("data", String(30)),
)
- self.metadata.create_all()
+ self.metadata.create_all(testing.db)
self._assert_data_with_sequence_returning(table, "my_seq")
def test_opt_sequence_insert(self):
@@ -114,7 +110,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
),
Column("data", String(30)),
)
- self.metadata.create_all()
+ self.metadata.create_all(testing.db)
self._assert_data_autoincrement(table)
@testing.requires.returning
@@ -130,7 +126,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
),
Column("data", String(30)),
)
- self.metadata.create_all()
+ self.metadata.create_all(testing.db)
self._assert_data_autoincrement_returning(table)
def test_autoincrement_insert(self):
@@ -140,7 +136,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
Column("id", Integer, primary_key=True),
Column("data", String(30)),
)
- self.metadata.create_all()
+ self.metadata.create_all(testing.db)
self._assert_data_autoincrement(table)
@testing.requires.returning
@@ -151,7 +147,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
Column("id", Integer, primary_key=True),
Column("data", String(30)),
)
- self.metadata.create_all()
+ self.metadata.create_all(testing.db)
self._assert_data_autoincrement_returning(table)
def test_noautoincrement_insert(self):
@@ -161,7 +157,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(30)),
)
- self.metadata.create_all()
+ self.metadata.create_all(testing.db)
self._assert_data_noautoincrement(table)
def _assert_data_autoincrement(self, table):
@@ -169,7 +165,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
with self.sql_execution_asserter(engine) as asserter:
- with engine.connect() as conn:
+ with engine.begin() as conn:
# execute with explicit id
r = conn.execute(table.insert(), {"id": 30, "data": "d1"})
@@ -226,7 +222,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
),
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
eq_(
conn.execute(table.select()).fetchall(),
[
@@ -250,7 +246,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table = Table(table.name, m2, autoload_with=engine)
with self.sql_execution_asserter(engine) as asserter:
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
r = conn.execute(table.insert(), {"data": "d2"})
eq_(r.inserted_primary_key, (5,))
@@ -288,7 +284,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
"INSERT INTO testtable (data) VALUES (:data)", [{"data": "d8"}]
),
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
eq_(
conn.execute(table.select()).fetchall(),
[
@@ -308,7 +304,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engine = engines.testing_engine(options={"implicit_returning": True})
with self.sql_execution_asserter(engine) as asserter:
- with engine.connect() as conn:
+ with engine.begin() as conn:
# execute with explicit id
@@ -367,7 +363,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
),
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
eq_(
conn.execute(table.select()).fetchall(),
[
@@ -390,7 +386,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table = Table(table.name, m2, autoload_with=engine)
with self.sql_execution_asserter(engine) as asserter:
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
r = conn.execute(table.insert(), {"data": "d2"})
eq_(r.inserted_primary_key, (5,))
@@ -430,7 +426,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
),
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
eq_(
conn.execute(table.select()).fetchall(),
[
@@ -450,7 +446,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engine = engines.testing_engine(options={"implicit_returning": False})
with self.sql_execution_asserter(engine) as asserter:
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
conn.execute(table.insert(), {"data": "d2"})
conn.execute(
@@ -491,7 +487,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
[{"data": "d8"}],
),
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
eq_(
conn.execute(table.select()).fetchall(),
[
@@ -513,7 +509,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engine = engines.testing_engine(options={"implicit_returning": True})
with self.sql_execution_asserter(engine) as asserter:
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
conn.execute(table.insert(), {"data": "d2"})
conn.execute(
@@ -555,7 +551,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
),
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
eq_(
conn.execute(table.select()).fetchall(),
[
@@ -578,9 +574,12 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
# turning off the cache because we are checking for compile-time
# warnings
- with engine.connect().execution_options(compiled_cache=None) as conn:
+ engine = engine.execution_options(compiled_cache=None)
+
+ with engine.begin() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
+ with engine.begin() as conn:
with expect_warnings(
".*has no Python-side or server-side default.*"
):
@@ -590,6 +589,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(),
{"data": "d2"},
)
+
+ with engine.begin() as conn:
with expect_warnings(
".*has no Python-side or server-side default.*"
):
@@ -599,6 +600,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(),
[{"data": "d2"}, {"data": "d3"}],
)
+
+ with engine.begin() as conn:
with expect_warnings(
".*has no Python-side or server-side default.*"
):
@@ -608,6 +611,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(),
{"data": "d2"},
)
+
+ with engine.begin() as conn:
with expect_warnings(
".*has no Python-side or server-side default.*"
):
@@ -618,6 +623,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
[{"data": "d2"}, {"data": "d3"}],
)
+ with engine.begin() as conn:
conn.execute(
table.insert(),
[{"id": 31, "data": "d2"}, {"id": 32, "data": "d3"}],
@@ -634,9 +640,10 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
m2 = MetaData()
table = Table(table.name, m2, autoload_with=engine)
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
+ with engine.begin() as conn:
with expect_warnings(
".*has no Python-side or server-side default.*"
):
@@ -646,6 +653,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(),
{"data": "d2"},
)
+
+ with engine.begin() as conn:
with expect_warnings(
".*has no Python-side or server-side default.*"
):
@@ -655,6 +664,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(),
[{"data": "d2"}, {"data": "d3"}],
)
+
+ with engine.begin() as conn:
conn.execute(
table.insert(),
[{"id": 31, "data": "d2"}, {"id": 32, "data": "d3"}],
@@ -666,36 +677,40 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
)
-class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
+class MatchTest(fixtures.TablesTest, AssertsCompiledSQL):
__only_on__ = "postgresql >= 8.3"
__backend__ = True
@classmethod
- def setup_class(cls):
- global metadata, cattable, matchtable
- metadata = MetaData(testing.db)
- cattable = Table(
+ def define_tables(cls, metadata):
+ Table(
"cattable",
metadata,
Column("id", Integer, primary_key=True),
Column("description", String(50)),
)
- matchtable = Table(
+ Table(
"matchtable",
metadata,
Column("id", Integer, primary_key=True),
Column("title", String(200)),
Column("category_id", Integer, ForeignKey("cattable.id")),
)
- metadata.create_all()
- cattable.insert().execute(
+
+ @classmethod
+ def insert_data(cls, connection):
+ cattable, matchtable = cls.tables("cattable", "matchtable")
+
+ connection.execute(
+ cattable.insert(),
[
{"id": 1, "description": "Python"},
{"id": 2, "description": "Ruby"},
- ]
+ ],
)
- matchtable.insert().execute(
+ connection.execute(
+ matchtable.insert(),
[
{
"id": 1,
@@ -714,15 +729,12 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
"category_id": 1,
},
{"id": 5, "title": "Python in a Nutshell", "category_id": 1},
- ]
+ ],
)
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
-
@testing.requires.pyformat_paramstyle
def test_expression_pyformat(self):
+ matchtable = self.tables.matchtable
self.assert_compile(
matchtable.c.title.match("somstr"),
"matchtable.title @@ to_tsquery(%(title_1)s" ")",
@@ -730,51 +742,47 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.requires.format_paramstyle
def test_expression_positional(self):
+ matchtable = self.tables.matchtable
self.assert_compile(
matchtable.c.title.match("somstr"),
"matchtable.title @@ to_tsquery(%s)",
)
- def test_simple_match(self):
- results = (
+ def test_simple_match(self, connection):
+ matchtable = self.tables.matchtable
+ results = connection.execute(
matchtable.select()
.where(matchtable.c.title.match("python"))
.order_by(matchtable.c.id)
- .execute()
- .fetchall()
- )
+ ).fetchall()
eq_([2, 5], [r.id for r in results])
- def test_not_match(self):
- results = (
+ def test_not_match(self, connection):
+ matchtable = self.tables.matchtable
+ results = connection.execute(
matchtable.select()
.where(~matchtable.c.title.match("python"))
.order_by(matchtable.c.id)
- .execute()
- .fetchall()
- )
+ ).fetchall()
eq_([1, 3, 4], [r.id for r in results])
- def test_simple_match_with_apostrophe(self):
- results = (
- matchtable.select()
- .where(matchtable.c.title.match("Matz's"))
- .execute()
- .fetchall()
- )
+ def test_simple_match_with_apostrophe(self, connection):
+ matchtable = self.tables.matchtable
+ results = connection.execute(
+ matchtable.select().where(matchtable.c.title.match("Matz's"))
+ ).fetchall()
eq_([3], [r.id for r in results])
- def test_simple_derivative_match(self):
- results = (
- matchtable.select()
- .where(matchtable.c.title.match("nutshells"))
- .execute()
- .fetchall()
- )
+ def test_simple_derivative_match(self, connection):
+ matchtable = self.tables.matchtable
+ results = connection.execute(
+ matchtable.select().where(matchtable.c.title.match("nutshells"))
+ ).fetchall()
eq_([5], [r.id for r in results])
- def test_or_match(self):
- results1 = (
+ def test_or_match(self, connection):
+ matchtable = self.tables.matchtable
+ results1 = connection.execute(
matchtable.select()
.where(
or_(
@@ -783,42 +791,36 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
)
)
.order_by(matchtable.c.id)
- .execute()
- .fetchall()
- )
+ ).fetchall()
eq_([3, 5], [r.id for r in results1])
- results2 = (
+ results2 = connection.execute(
matchtable.select()
.where(matchtable.c.title.match("nutshells | rubies"))
.order_by(matchtable.c.id)
- .execute()
- .fetchall()
- )
+ ).fetchall()
eq_([3, 5], [r.id for r in results2])
- def test_and_match(self):
- results1 = (
- matchtable.select()
- .where(
+ def test_and_match(self, connection):
+ matchtable = self.tables.matchtable
+ results1 = connection.execute(
+ matchtable.select().where(
and_(
matchtable.c.title.match("python"),
matchtable.c.title.match("nutshells"),
)
)
- .execute()
- .fetchall()
- )
+ ).fetchall()
eq_([5], [r.id for r in results1])
- results2 = (
- matchtable.select()
- .where(matchtable.c.title.match("python & nutshells"))
- .execute()
- .fetchall()
- )
+ results2 = connection.execute(
+ matchtable.select().where(
+ matchtable.c.title.match("python & nutshells")
+ )
+ ).fetchall()
eq_([5], [r.id for r in results2])
- def test_match_across_joins(self):
- results = (
+ def test_match_across_joins(self, connection):
+ cattable, matchtable = self.tables("cattable", "matchtable")
+ results = connection.execute(
matchtable.select()
.where(
and_(
@@ -830,9 +832,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
)
)
.order_by(matchtable.c.id)
- .execute()
- .fetchall()
- )
+ ).fetchall()
eq_([1, 3, 5], [r.id for r in results])
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index 4de4d88e3..824f6cd36 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -291,63 +291,64 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
- con = testing.db.connect()
- for ddl in [
- 'CREATE SCHEMA "SomeSchema"',
- "CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42",
- "CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0",
- "CREATE TYPE testtype AS ENUM ('test')",
- "CREATE DOMAIN enumdomain AS testtype",
- "CREATE DOMAIN arraydomain AS INTEGER[]",
- 'CREATE DOMAIN "SomeSchema"."Quoted.Domain" INTEGER DEFAULT 0',
- ]:
- try:
- con.exec_driver_sql(ddl)
- except exc.DBAPIError as e:
- if "already exists" not in str(e):
- raise e
- con.exec_driver_sql(
- "CREATE TABLE testtable (question integer, answer " "testdomain)"
- )
- con.exec_driver_sql(
- "CREATE TABLE test_schema.testtable(question "
- "integer, answer test_schema.testdomain, anything "
- "integer)"
- )
- con.exec_driver_sql(
- "CREATE TABLE crosschema (question integer, answer "
- "test_schema.testdomain)"
- )
+ with testing.db.begin() as con:
+ for ddl in [
+ 'CREATE SCHEMA "SomeSchema"',
+ "CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42",
+ "CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0",
+ "CREATE TYPE testtype AS ENUM ('test')",
+ "CREATE DOMAIN enumdomain AS testtype",
+ "CREATE DOMAIN arraydomain AS INTEGER[]",
+ 'CREATE DOMAIN "SomeSchema"."Quoted.Domain" INTEGER DEFAULT 0',
+ ]:
+ try:
+ con.exec_driver_sql(ddl)
+ except exc.DBAPIError as e:
+ if "already exists" not in str(e):
+ raise e
+ con.exec_driver_sql(
+ "CREATE TABLE testtable (question integer, answer "
+ "testdomain)"
+ )
+ con.exec_driver_sql(
+ "CREATE TABLE test_schema.testtable(question "
+ "integer, answer test_schema.testdomain, anything "
+ "integer)"
+ )
+ con.exec_driver_sql(
+ "CREATE TABLE crosschema (question integer, answer "
+ "test_schema.testdomain)"
+ )
- con.exec_driver_sql(
- "CREATE TABLE enum_test (id integer, data enumdomain)"
- )
+ con.exec_driver_sql(
+ "CREATE TABLE enum_test (id integer, data enumdomain)"
+ )
- con.exec_driver_sql(
- "CREATE TABLE array_test (id integer, data arraydomain)"
- )
+ con.exec_driver_sql(
+ "CREATE TABLE array_test (id integer, data arraydomain)"
+ )
- con.exec_driver_sql(
- "CREATE TABLE quote_test "
- '(id integer, data "SomeSchema"."Quoted.Domain")'
- )
+ con.exec_driver_sql(
+ "CREATE TABLE quote_test "
+ '(id integer, data "SomeSchema"."Quoted.Domain")'
+ )
@classmethod
def teardown_class(cls):
- con = testing.db.connect()
- con.exec_driver_sql("DROP TABLE testtable")
- con.exec_driver_sql("DROP TABLE test_schema.testtable")
- con.exec_driver_sql("DROP TABLE crosschema")
- con.exec_driver_sql("DROP TABLE quote_test")
- con.exec_driver_sql("DROP DOMAIN testdomain")
- con.exec_driver_sql("DROP DOMAIN test_schema.testdomain")
- con.exec_driver_sql("DROP TABLE enum_test")
- con.exec_driver_sql("DROP DOMAIN enumdomain")
- con.exec_driver_sql("DROP TYPE testtype")
- con.exec_driver_sql("DROP TABLE array_test")
- con.exec_driver_sql("DROP DOMAIN arraydomain")
- con.exec_driver_sql('DROP DOMAIN "SomeSchema"."Quoted.Domain"')
- con.exec_driver_sql('DROP SCHEMA "SomeSchema"')
+ with testing.db.begin() as con:
+ con.exec_driver_sql("DROP TABLE testtable")
+ con.exec_driver_sql("DROP TABLE test_schema.testtable")
+ con.exec_driver_sql("DROP TABLE crosschema")
+ con.exec_driver_sql("DROP TABLE quote_test")
+ con.exec_driver_sql("DROP DOMAIN testdomain")
+ con.exec_driver_sql("DROP DOMAIN test_schema.testdomain")
+ con.exec_driver_sql("DROP TABLE enum_test")
+ con.exec_driver_sql("DROP DOMAIN enumdomain")
+ con.exec_driver_sql("DROP TYPE testtype")
+ con.exec_driver_sql("DROP TABLE array_test")
+ con.exec_driver_sql("DROP DOMAIN arraydomain")
+ con.exec_driver_sql('DROP DOMAIN "SomeSchema"."Quoted.Domain"')
+ con.exec_driver_sql('DROP SCHEMA "SomeSchema"')
def test_table_is_reflected(self):
metadata = MetaData()
@@ -486,7 +487,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("ref", Integer, ForeignKey("subject.id$")),
)
- meta1.create_all()
+ meta1.create_all(testing.db)
meta2 = MetaData()
subject = Table("subject", meta2, autoload_with=testing.db)
referer = Table("referer", meta2, autoload_with=testing.db)
@@ -523,9 +524,11 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
with testing.db.begin() as conn:
r = conn.execute(t2.insert())
eq_(r.inserted_primary_key, (1,))
- testing.db.connect().execution_options(
- autocommit=True
- ).exec_driver_sql("alter table t_id_seq rename to foobar_id_seq")
+
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql(
+ "alter table t_id_seq rename to foobar_id_seq"
+ )
m3 = MetaData()
t3 = Table("t", m3, autoload_with=testing.db, implicit_returning=False)
eq_(
@@ -545,10 +548,12 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("x", Integer),
)
- metadata.create_all()
- testing.db.connect().execution_options(
- autocommit=True
- ).exec_driver_sql("alter table t alter column id type varchar(50)")
+ metadata.create_all(testing.db)
+
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql(
+ "alter table t alter column id type varchar(50)"
+ )
m2 = MetaData()
t2 = Table("t", m2, autoload_with=testing.db)
eq_(t2.c.id.autoincrement, False)
@@ -558,10 +563,9 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
def test_renamed_pk_reflection(self):
metadata = self.metadata
Table("t", metadata, Column("id", Integer, primary_key=True))
- metadata.create_all()
- testing.db.connect().execution_options(
- autocommit=True
- ).exec_driver_sql("alter table t rename id to t_id")
+ metadata.create_all(testing.db)
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql("alter table t rename id to t_id")
m2 = MetaData()
t2 = Table("t", m2, autoload_with=testing.db)
eq_([c.name for c in t2.primary_key], ["t_id"])
@@ -936,13 +940,13 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("name", String(20), index=True),
Column("aname", String(20)),
)
- metadata.create_all()
- with testing.db.connect() as c:
- c.exec_driver_sql("create index idx1 on party ((id || name))")
- c.exec_driver_sql(
+ metadata.create_all(testing.db)
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql("create index idx1 on party ((id || name))")
+ conn.exec_driver_sql(
"create unique index idx2 on party (id) where name = 'test'"
)
- c.exec_driver_sql(
+ conn.exec_driver_sql(
"""
create index idx3 on party using btree
(lower(name::text), lower(aname::text))
@@ -1029,7 +1033,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("aname", String(20)),
)
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
t1.create(conn)
@@ -1109,18 +1113,19 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("x", Integer),
)
- metadata.create_all()
- conn = testing.db.connect().execution_options(autocommit=True)
- conn.exec_driver_sql("CREATE INDEX idx1 ON t (x)")
- conn.exec_driver_sql("ALTER TABLE t RENAME COLUMN x to y")
+ metadata.create_all(testing.db)
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql("CREATE INDEX idx1 ON t (x)")
+ conn.exec_driver_sql("ALTER TABLE t RENAME COLUMN x to y")
- ind = testing.db.dialect.get_indexes(conn, "t", None)
- expected = [{"name": "idx1", "unique": False, "column_names": ["y"]}]
- if testing.requires.index_reflects_included_columns.enabled:
- expected[0]["include_columns"] = []
+ ind = testing.db.dialect.get_indexes(conn, "t", None)
+ expected = [
+ {"name": "idx1", "unique": False, "column_names": ["y"]}
+ ]
+ if testing.requires.index_reflects_included_columns.enabled:
+ expected[0]["include_columns"] = []
- eq_(ind, expected)
- conn.close()
+ eq_(ind, expected)
@testing.fails_if("postgresql < 8.2", "reloptions not supported")
@testing.provide_metadata
@@ -1135,9 +1140,9 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("x", Integer),
)
- metadata.create_all()
+ metadata.create_all(testing.db)
- with testing.db.connect().execution_options(autocommit=True) as conn:
+ with testing.db.begin() as conn:
conn.exec_driver_sql(
"CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)"
)
@@ -1177,8 +1182,8 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("x", ARRAY(Integer)),
)
- metadata.create_all()
- with testing.db.connect().execution_options(autocommit=True) as conn:
+ metadata.create_all(testing.db)
+ with testing.db.begin() as conn:
conn.exec_driver_sql("CREATE INDEX idx1 ON t USING gin (x)")
ind = testing.db.dialect.get_indexes(conn, "t", None)
@@ -1215,7 +1220,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("name", String(20)),
)
metadata.create_all()
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
conn.exec_driver_sql("CREATE INDEX idx1 ON t (x) INCLUDE (name)")
# prior to #5205, this would return:
@@ -1312,8 +1317,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
eq_(fk, fk_ref[fk["name"]])
@testing.provide_metadata
- def test_inspect_enums_schema(self):
- conn = testing.db.connect()
+ def test_inspect_enums_schema(self, connection):
enum_type = postgresql.ENUM(
"sad",
"ok",
@@ -1322,8 +1326,8 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
schema="test_schema",
metadata=self.metadata,
)
- enum_type.create(conn)
- inspector = inspect(conn)
+ enum_type.create(connection)
+ inspector = inspect(connection)
eq_(
inspector.get_enums("test_schema"),
[
diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py
index e7174f234..ae7a65a3a 100644
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@ -206,7 +206,7 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
),
schema=symbol_name,
)
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
conn = conn.execution_options(
schema_translate_map={symbol_name: testing.config.test_schema}
)