summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGord Thompson <gord@gordthompson.com>2020-04-12 11:58:58 -0600
committerGord Thompson <gord@gordthompson.com>2020-04-12 13:37:48 -0600
commit6eb11d877799cc42983c011ff16730af174b71e5 (patch)
tree06073c38f376431299e7bb790e2e96e6fdee18a7
parent43bcb0de1ecc1dc2abd7e7c3eb0c39f3eab9adc0 (diff)
downloadsqlalchemy-6eb11d877799cc42983c011ff16730af174b71e5.tar.gz
Clean up .execute in test/sql/test_resultset.py
Change-Id: Ie4bb2b8e94361d15a3f1f1c21ce1c59cff1cf735
-rw-r--r--test/sql/test_resultset.py770
1 files changed, 396 insertions, 374 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 253ad7b38..f8d245228 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -90,29 +90,31 @@ class ResultProxyTest(fixtures.TablesTest):
test_needs_acid=True,
)
- def test_row_iteration(self):
+ def test_row_iteration(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
)
- r = users.select().execute()
+ r = connection.execute(users.select())
rows = []
for row in r:
rows.append(row)
eq_(len(rows), 3)
- def test_row_next(self):
+ def test_row_next(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
)
- r = users.select().execute()
+ r = connection.execute(users.select())
rows = []
while True:
row = next(r, "foo")
@@ -122,10 +124,11 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(len(rows), 3)
@testing.requires.subqueries
- def test_anonymous_rows(self):
+ def test_anonymous_rows(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
@@ -136,15 +139,17 @@ class ResultProxyTest(fixtures.TablesTest):
.where(users.c.user_name == "jack")
.scalar_subquery()
)
- for row in select([sel + 1, sel + 3], bind=users.bind).execute():
+ for row in connection.execute(
+ select([sel + 1, sel + 3], bind=users.bind)
+ ):
eq_(row._mapping["anon_1"], 8)
eq_(row._mapping["anon_2"], 10)
- def test_row_comparison(self):
+ def test_row_comparison(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="jack")
- rp = users.select().execute().first()
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ rp = connection.execute(users.select()).first()
eq_(rp, rp)
is_(not (rp != rp), True)
@@ -192,19 +197,19 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(control, op(compare, rp))
@testing.provide_metadata
- def test_column_label_overlap_fallback(self):
+ def test_column_label_overlap_fallback(self, connection):
content = Table("content", self.metadata, Column("type", String(30)))
bar = Table("bar", self.metadata, Column("content_type", String(30)))
- self.metadata.create_all(testing.db)
- testing.db.execute(content.insert().values(type="t1"))
+ self.metadata.create_all(connection)
+ connection.execute(content.insert().values(type="t1"))
- row = testing.db.execute(content.select(use_labels=True)).first()
+ row = connection.execute(content.select(use_labels=True)).first()
in_(content.c.type, row._mapping)
not_in_(bar.c.content_type, row._mapping)
not_in_(bar.c.content_type, row._mapping)
- row = testing.db.execute(
+ row = connection.execute(
select([func.now().label("content_type")])
).first()
@@ -212,10 +217,11 @@ class ResultProxyTest(fixtures.TablesTest):
not_in_(bar.c.content_type, row._mapping)
- def test_pickled_rows(self):
+ def test_pickled_rows(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
@@ -223,12 +229,11 @@ class ResultProxyTest(fixtures.TablesTest):
for pickle in False, True:
for use_labels in False, True:
- result = (
- users.select(use_labels=use_labels)
- .order_by(users.c.user_id)
- .execute()
- .fetchall()
- )
+ result = connection.execute(
+ users.select(use_labels=use_labels).order_by(
+ users.c.user_id
+ )
+ ).fetchall()
if pickle:
result = util.pickle.loads(util.pickle.dumps(result))
@@ -255,8 +260,8 @@ class ResultProxyTest(fixtures.TablesTest):
lambda: result[0]._mapping["fake key"],
)
- def test_column_error_printing(self):
- result = testing.db.execute(select([1]))
+ def test_column_error_printing(self, connection):
+ result = connection.execute(select([1]))
row = result.first()
class unprintable(object):
@@ -283,42 +288,45 @@ class ResultProxyTest(fixtures.TablesTest):
lambda: row._mapping[accessor],
)
- def test_fetchmany(self):
+ def test_fetchmany(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="jack")
- users.insert().execute(user_id=8, user_name="ed")
- users.insert().execute(user_id=9, user_name="fred")
- r = users.select().execute()
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), user_id=8, user_name="ed")
+ connection.execute(users.insert(), user_id=9, user_name="fred")
+ r = connection.execute(users.select())
rows = []
for row in r.fetchmany(size=2):
rows.append(row)
eq_(len(rows), 2)
- def test_column_slices(self):
+ def test_column_slices(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- users.insert().execute(user_id=1, user_name="john")
- users.insert().execute(user_id=2, user_name="jack")
- addresses.insert().execute(
- address_id=1, user_id=2, address="foo@bar.com"
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ connection.execute(users.insert(), user_id=2, user_name="jack")
+ connection.execute(
+ addresses.insert(), address_id=1, user_id=2, address="foo@bar.com"
)
- r = text("select * from addresses", bind=testing.db).execute().first()
+ r = connection.execute(
+ text("select * from addresses", bind=testing.db)
+ ).first()
eq_(r[0:1], (1,))
eq_(r[1:], (2, "foo@bar.com"))
eq_(r[:-1], (1, 2))
- def test_column_accessor_basic_compiled_mapping(self):
+ def test_column_accessor_basic_compiled_mapping(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = users.select(users.c.user_id == 2).execute().first()
+ r = connection.execute(users.select(users.c.user_id == 2)).first()
eq_(r.user_id, 2)
eq_(r._mapping["user_id"], 2)
eq_(r._mapping[users.c.user_id], 2)
@@ -327,15 +335,16 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(r._mapping["user_name"], "jack")
eq_(r._mapping[users.c.user_name], "jack")
- def test_column_accessor_basic_compiled_traditional(self):
+ def test_column_accessor_basic_compiled_traditional(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = users.select(users.c.user_id == 2).execute().first()
+ r = connection.execute(users.select(users.c.user_id == 2)).first()
eq_(r.user_id, 2)
eq_(r._mapping["user_id"], 2)
@@ -345,28 +354,30 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(r._mapping["user_name"], "jack")
eq_(r._mapping[users.c.user_name], "jack")
- def test_row_getitem_string(self):
+ def test_row_getitem_string(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = testing.db.execute(
+ r = connection.execute(
text("select * from users where user_id=2")
).first()
eq_(r._mapping["user_name"], "jack")
- def test_column_accessor_basic_text(self):
+ def test_column_accessor_basic_text(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = testing.db.execute(
+ r = connection.execute(
text("select * from users where user_id=2")
).first()
@@ -379,14 +390,15 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(r.user_name, "jack")
eq_(r._mapping["user_name"], "jack")
- def test_column_accessor_text_colexplicit(self):
+ def test_column_accessor_text_colexplicit(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = testing.db.execute(
+ r = connection.execute(
text("select * from users where user_id=2").columns(
users.c.user_id, users.c.user_name
)
@@ -400,16 +412,17 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(r._mapping["user_name"], "jack")
eq_(r._mapping[users.c.user_name], "jack")
- def test_column_accessor_textual_select(self):
+ def test_column_accessor_textual_select(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
# this will create column() objects inside
# the select(), these need to match on name anyway
- r = testing.db.execute(
+ r = connection.execute(
select([column("user_id"), column("user_name")])
.select_from(table("users"))
.where(text("user_id=2"))
@@ -422,14 +435,14 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(r.user_name, "jack")
eq_(r._mapping["user_name"], "jack")
- def test_column_accessor_dotted_union(self):
+ def test_column_accessor_dotted_union(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
# test a little sqlite < 3.10.0 weirdness - with the UNION,
# cols come back as "users.user_id" in cursor.description
- r = testing.db.execute(
+ r = connection.execute(
text(
"select users.user_id, users.user_name "
"from users "
@@ -441,23 +454,20 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(r._mapping["user_name"], "john")
eq_(list(r._fields), ["user_id", "user_name"])
- def test_column_accessor_sqlite_raw(self):
+ def test_column_accessor_sqlite_raw(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
- r = (
+ r = connection.execute(
text(
"select users.user_id, users.user_name "
"from users "
"UNION select users.user_id, "
"users.user_name from users",
bind=testing.db,
- )
- .execution_options(sqlite_raw_colnames=True)
- .execute()
- .first()
- )
+ ).execution_options(sqlite_raw_colnames=True)
+ ).first()
if testing.against("sqlite < 3.10.0"):
not_in_("user_id", r)
@@ -474,12 +484,12 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(list(r._fields), ["user_id", "user_name"])
- def test_column_accessor_sqlite_translated(self):
+ def test_column_accessor_sqlite_translated(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
- r = (
+ r = connection.execute(
text(
"select users.user_id, users.user_name "
"from users "
@@ -487,9 +497,7 @@ class ResultProxyTest(fixtures.TablesTest):
"users.user_name from users",
bind=testing.db,
)
- .execute()
- .first()
- )
+ ).first()
eq_(r._mapping["user_id"], 1)
eq_(r._mapping["user_name"], "john")
@@ -502,44 +510,38 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(list(r._fields), ["user_id", "user_name"])
- def test_column_accessor_labels_w_dots(self):
+ def test_column_accessor_labels_w_dots(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
# test using literal tablename.colname
- r = (
+ r = connection.execute(
text(
'select users.user_id AS "users.user_id", '
'users.user_name AS "users.user_name" '
"from users",
bind=testing.db,
- )
- .execution_options(sqlite_raw_colnames=True)
- .execute()
- .first()
- )
+ ).execution_options(sqlite_raw_colnames=True)
+ ).first()
eq_(r._mapping["users.user_id"], 1)
eq_(r._mapping["users.user_name"], "john")
not_in_("user_name", r._mapping)
eq_(list(r._fields), ["users.user_id", "users.user_name"])
- def test_column_accessor_unary(self):
+ def test_column_accessor_unary(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
# unary expressions
- r = (
- select([users.c.user_name.distinct()])
- .order_by(users.c.user_name)
- .execute()
- .first()
- )
+ r = connection.execute(
+ select([users.c.user_name.distinct()]).order_by(users.c.user_name)
+ ).first()
eq_(r._mapping[users.c.user_name], "john")
eq_(r.user_name, "john")
- def test_column_accessor_err(self):
- r = testing.db.execute(select([1])).first()
+ def test_column_accessor_err(self, connection):
+ r = connection.execute(select([1])).first()
assert_raises_message(
AttributeError,
"Could not locate column in row for column 'foo'",
@@ -601,6 +603,7 @@ class ResultProxyTest(fixtures.TablesTest):
)
def test_connectionless_autoclose_rows_exhausted(self):
+ # TODO: deprecate for 2.0
users = self.tables.users
with testing.db.connect() as conn:
conn.execute(users.insert(), dict(user_id=1, user_name="john"))
@@ -615,6 +618,7 @@ class ResultProxyTest(fixtures.TablesTest):
@testing.requires.returning
def test_connectionless_autoclose_crud_rows_exhausted(self):
+ # TODO: deprecate for 2.0
users = self.tables.users
stmt = (
users.insert()
@@ -630,6 +634,7 @@ class ResultProxyTest(fixtures.TablesTest):
assert connection.closed
def test_connectionless_autoclose_no_rows(self):
+ # TODO: deprecate for 2.0
result = testing.db.execute(text("select * from users"))
connection = result.connection
assert not connection.closed
@@ -638,6 +643,7 @@ class ResultProxyTest(fixtures.TablesTest):
@testing.requires.updateable_autoincrement_pks
def test_connectionless_autoclose_no_metadata(self):
+ # TODO: deprecate for 2.0
result = testing.db.execute(text("update users set user_id=5"))
connection = result.connection
assert connection.closed
@@ -647,8 +653,8 @@ class ResultProxyTest(fixtures.TablesTest):
result.fetchone,
)
- def test_row_case_sensitive(self):
- row = testing.db.execute(
+ def test_row_case_sensitive(self, connection):
+ row = connection.execute(
select(
[
literal_column("1").label("case_insensitive"),
@@ -670,75 +676,80 @@ class ResultProxyTest(fixtures.TablesTest):
assert_raises(KeyError, lambda: row._mapping["casesensitive"])
def test_row_case_sensitive_unoptimized(self):
- ins_db = engines.testing_engine()
- row = ins_db.execute(
- select(
- [
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- text("3 AS screw_up_the_cols"),
- ]
- )
- ).first()
+ with engines.testing_engine().connect() as ins_conn:
+ row = ins_conn.execute(
+ select(
+ [
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive"),
+ text("3 AS screw_up_the_cols"),
+ ]
+ )
+ ).first()
- eq_(
- list(row._fields),
- ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
- )
+ eq_(
+ list(row._fields),
+ ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
+ )
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- not_in_("casesensitive", row._keymap)
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ not_in_("casesensitive", row._keymap)
- eq_(row._mapping["case_insensitive"], 1)
- eq_(row._mapping["CaseSensitive"], 2)
- eq_(row._mapping["screw_up_the_cols"], 3)
+ eq_(row._mapping["case_insensitive"], 1)
+ eq_(row._mapping["CaseSensitive"], 2)
+ eq_(row._mapping["screw_up_the_cols"], 3)
- assert_raises(KeyError, lambda: row._mapping["Case_insensitive"])
- assert_raises(KeyError, lambda: row._mapping["casesensitive"])
- assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"])
+ assert_raises(KeyError, lambda: row._mapping["Case_insensitive"])
+ assert_raises(KeyError, lambda: row._mapping["casesensitive"])
+ assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"])
- def test_row_as_args(self):
+ def test_row_as_args(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="john")
- r = users.select(users.c.user_id == 1).execute().first()
- users.delete().execute()
- users.insert().execute(r._mapping)
- eq_(users.select().execute().fetchall(), [(1, "john")])
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ r = connection.execute(users.select(users.c.user_id == 1)).first()
+ connection.execute(users.delete())
+ connection.execute(users.insert(), r._mapping)
+ eq_(connection.execute(users.select()).fetchall(), [(1, "john")])
- def test_result_as_args(self):
+ def test_result_as_args(self, connection):
users = self.tables.users
users2 = self.tables.users2
- users.insert().execute(
+ connection.execute(
+ users.insert(),
[
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="ed"),
- ]
+ ],
)
- r = users.select().execute()
- users2.insert().execute([row._mapping for row in r])
+ r = connection.execute(users.select())
+ connection.execute(users2.insert(), [row._mapping for row in r])
eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
+ connection.execute(
+ users2.select().order_by(users2.c.user_id)
+ ).fetchall(),
[(1, "john"), (2, "ed")],
)
- users2.delete().execute()
- r = users.select().execute()
- users2.insert().execute(*[row._mapping for row in r])
+ connection.execute(users2.delete())
+ r = connection.execute(users.select())
+ connection.execute(users2.insert(), *[row._mapping for row in r])
eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
+ connection.execute(
+ users2.select().order_by(users2.c.user_id)
+ ).fetchall(),
[(1, "john"), (2, "ed")],
)
@testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column(self):
+ def test_ambiguous_column(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- users.insert().execute(user_id=1, user_name="john")
- result = users.outerjoin(addresses).select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ result = connection.execute(users.outerjoin(addresses).select())
r = result.first()
assert_raises_message(
@@ -776,7 +787,7 @@ class ResultProxyTest(fixtures.TablesTest):
lambda: r._mapping["user_id"],
)
- result = users.outerjoin(addresses).select().execute()
+ result = connection.execute(users.outerjoin(addresses).select())
result = _result.BufferedColumnResultProxy(result.context)
r = result.first()
assert isinstance(r, _result.BufferedColumnRow)
@@ -787,16 +798,16 @@ class ResultProxyTest(fixtures.TablesTest):
)
@testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_by_col(self):
+ def test_ambiguous_column_by_col(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="john")
+ connection.execute(users.insert(), user_id=1, user_name="john")
ua = users.alias()
u2 = users.alias()
- result = (
- select([users.c.user_id, ua.c.user_id])
- .select_from(users.join(ua, true()))
- .execute()
+ result = connection.execute(
+ select([users.c.user_id, ua.c.user_id]).select_from(
+ users.join(ua, true())
+ )
)
row = result.first()
@@ -815,18 +826,18 @@ class ResultProxyTest(fixtures.TablesTest):
)
@testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_contains(self):
+ def test_ambiguous_column_contains(self, connection):
users = self.tables.users
addresses = self.tables.addresses
# ticket 2702. in 0.7 we'd get True, False.
# in 0.8, both columns are present so it's True;
# but when they're fetched you'll get the ambiguous error.
- users.insert().execute(user_id=1, user_name="john")
- result = (
- select([users.c.user_id, addresses.c.user_id])
- .select_from(users.outerjoin(addresses))
- .execute()
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ result = connection.execute(
+ select([users.c.user_id, addresses.c.user_id]).select_from(
+ users.outerjoin(addresses)
+ )
)
row = result.first()
@@ -840,106 +851,102 @@ class ResultProxyTest(fixtures.TablesTest):
set([True]),
)
- def test_loose_matching_one(self):
+ def test_loose_matching_one(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
- conn.execute(
- addresses.insert(),
- {"address_id": 1, "user_id": 1, "address": "email"},
+ connection.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+ connection.execute(
+ addresses.insert(),
+ {"address_id": 1, "user_id": 1, "address": "email"},
+ )
+
+ # use some column labels in the SELECT
+ result = connection.execute(
+ TextualSelect(
+ text(
+ "select users.user_name AS users_user_name, "
+ "users.user_id AS user_id, "
+ "addresses.address_id AS address_id "
+ "FROM users JOIN addresses "
+ "ON users.user_id = addresses.user_id "
+ "WHERE users.user_id=1 "
+ ),
+ [users.c.user_id, users.c.user_name, addresses.c.address_id],
+ positional=False,
)
+ )
+ row = result.first()
+ eq_(row._mapping[users.c.user_id], 1)
+ eq_(row._mapping[users.c.user_name], "john")
- # use some column labels in the SELECT
- result = conn.execute(
- TextualSelect(
- text(
- "select users.user_name AS users_user_name, "
- "users.user_id AS user_id, "
- "addresses.address_id AS address_id "
- "FROM users JOIN addresses "
- "ON users.user_id = addresses.user_id "
- "WHERE users.user_id=1 "
- ),
- [
- users.c.user_id,
- users.c.user_name,
- addresses.c.address_id,
- ],
- positional=False,
- )
- )
- row = result.first()
- eq_(row._mapping[users.c.user_id], 1)
- eq_(row._mapping[users.c.user_name], "john")
-
- def test_loose_matching_two(self):
+ def test_loose_matching_two(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
- conn.execute(
- addresses.insert(),
- {"address_id": 1, "user_id": 1, "address": "email"},
- )
-
- # use some column labels in the SELECT
- result = conn.execute(
- TextualSelect(
- text(
- "select users.user_name AS users_user_name, "
- "users.user_id AS user_id, "
- "addresses.user_id "
- "FROM users JOIN addresses "
- "ON users.user_id = addresses.user_id "
- "WHERE users.user_id=1 "
- ),
- [users.c.user_id, users.c.user_name, addresses.c.user_id],
- positional=False,
- )
+ connection.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+ connection.execute(
+ addresses.insert(),
+ {"address_id": 1, "user_id": 1, "address": "email"},
+ )
+
+ # use some column labels in the SELECT
+ result = connection.execute(
+ TextualSelect(
+ text(
+ "select users.user_name AS users_user_name, "
+ "users.user_id AS user_id, "
+ "addresses.user_id "
+ "FROM users JOIN addresses "
+ "ON users.user_id = addresses.user_id "
+ "WHERE users.user_id=1 "
+ ),
+ [users.c.user_id, users.c.user_name, addresses.c.user_id],
+ positional=False,
)
- row = result.first()
+ )
+ row = result.first()
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row._mapping[users.c.user_id],
- )
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row._mapping[addresses.c.user_id],
- )
- eq_(row._mapping[users.c.user_name], "john")
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row._mapping[users.c.user_id],
+ )
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row._mapping[addresses.c.user_id],
+ )
+ eq_(row._mapping[users.c.user_name], "john")
- def test_ambiguous_column_by_col_plus_label(self):
+ def test_ambiguous_column_by_col_plus_label(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="john")
- result = select(
- [
- users.c.user_id,
- type_coerce(users.c.user_id, Integer).label("foo"),
- ]
- ).execute()
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ result = connection.execute(
+ select(
+ [
+ users.c.user_id,
+ type_coerce(users.c.user_id, Integer).label("foo"),
+ ]
+ )
+ )
row = result.first()
eq_(row._mapping[users.c.user_id], 1)
eq_(row[1], 1)
- def test_fetch_partial_result_map(self):
+ def test_fetch_partial_result_map(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="ed")
+ connection.execute(users.insert(), user_id=7, user_name="ed")
t = text("select * from users").columns(user_name=String())
- eq_(testing.db.execute(t).fetchall(), [(7, "ed")])
+ eq_(connection.execute(t).fetchall(), [(7, "ed")])
- def test_fetch_unordered_result_map(self):
+ def test_fetch_unordered_result_map(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="ed")
+ connection.execute(users.insert(), user_id=7, user_name="ed")
class Goofy1(TypeDecorator):
impl = String
@@ -963,28 +970,28 @@ class ResultProxyTest(fixtures.TablesTest):
"select user_name as a, user_name as b, "
"user_name as c from users"
).columns(a=Goofy1(), b=Goofy2(), c=Goofy3())
- eq_(testing.db.execute(t).fetchall(), [("eda", "edb", "edc")])
+ eq_(connection.execute(t).fetchall(), [("eda", "edb", "edc")])
@testing.requires.subqueries
- def test_column_label_targeting(self):
+ def test_column_label_targeting(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="ed")
+ connection.execute(users.insert(), user_id=7, user_name="ed")
for s in (
users.select().alias("foo"),
users.select().alias(users.name),
):
- row = s.select(use_labels=True).execute().first()
+ row = connection.execute(s.select(use_labels=True)).first()
eq_(row._mapping[s.c.user_id], 7)
eq_(row._mapping[s.c.user_name], "ed")
@testing.requires.python3
- def test_ro_mapping_py3k(self):
+ def test_ro_mapping_py3k(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
row = result.first()
dict_row = row._asdict()
@@ -1003,11 +1010,11 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(odict_row.items(), mapping_row.items())
@testing.requires.python2
- def test_ro_mapping_py2k(self):
+ def test_ro_mapping_py2k(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
row = result.first()
dict_row = row._asdict()
@@ -1023,33 +1030,33 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(odict_row.values(), list(mapping_row.values()))
eq_(odict_row.items(), list(mapping_row.items()))
- def test_keys(self):
+ def test_keys(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
eq_(result.keys(), ["user_id", "user_name"])
row = result.first()
eq_(list(row._mapping.keys()), ["user_id", "user_name"])
eq_(row._fields, ("user_id", "user_name"))
- def test_row_keys_legacy_dont_warn(self):
+ def test_row_keys_legacy_dont_warn(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
row = result.first()
# DO NOT WARN DEPRECATED IN 1.x, ONLY 2.0 WARNING
eq_(dict(row), {"user_id": 1, "user_name": "foo"})
eq_(row.keys(), ["user_id", "user_name"])
- def test_keys_anon_labels(self):
+ def test_keys_anon_labels(self, connection):
"""test [ticket:3483]"""
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = testing.db.execute(
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(
select(
[
users.c.user_id,
@@ -1064,11 +1071,11 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(row._fields, ("user_id", "user_name_1", "count_1"))
eq_(list(row._mapping.keys()), ["user_id", "user_name_1", "count_1"])
- def test_items(self):
+ def test_items(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- r = users.select().execute().first()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ r = connection.execute(users.select()).first()
eq_(
[(x[0].lower(), x[1]) for x in list(r._mapping.items())],
[("user_id", 1), ("user_name", "foo")],
@@ -1088,27 +1095,30 @@ class ResultProxyTest(fixtures.TablesTest):
r = connection.exec_driver_sql("select user_name from users").first()
eq_(len(r), 1)
- def test_sorting_in_python(self):
+ def test_sorting_in_python(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="foo"),
dict(user_id=2, user_name="bar"),
dict(user_id=3, user_name="def"),
)
- rows = users.select().order_by(users.c.user_name).execute().fetchall()
+ rows = connection.execute(
+ users.select().order_by(users.c.user_name)
+ ).fetchall()
eq_(rows, [(2, "bar"), (3, "def"), (1, "foo")])
eq_(sorted(rows), [(1, "foo"), (2, "bar"), (3, "def")])
- def test_column_order_with_simple_query(self):
+ def test_column_order_with_simple_query(self, connection):
# should return values in column definition order
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- r = users.select(users.c.user_id == 1).execute().first()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ r = connection.execute(users.select(users.c.user_id == 1)).first()
eq_(r[0], 1)
eq_(r[1], "foo")
eq_([x.lower() for x in r._fields], ["user_id", "user_name"])
@@ -1128,10 +1138,10 @@ class ResultProxyTest(fixtures.TablesTest):
eq_([x.lower() for x in r._fields], ["user_name", "user_id"])
eq_(list(r._mapping.values()), ["foo", 1])
- @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()")
+ @testing.crashes("oracle", "FIXME: unknown, verify not fails_on()")
@testing.crashes("firebird", "An identifier must begin with a letter")
@testing.provide_metadata
- def test_column_accessor_shadow(self):
+ def test_column_accessor_shadow(self, connection):
shadowed = Table(
"test_shadowed",
self.metadata,
@@ -1142,8 +1152,9 @@ class ResultProxyTest(fixtures.TablesTest):
Column("_parent", VARCHAR(20)),
Column("_row", VARCHAR(20)),
)
- self.metadata.create_all()
- shadowed.insert().execute(
+ self.metadata.create_all(connection)
+ connection.execute(
+ shadowed.insert(),
shadow_id=1,
shadow_name="The Shadow",
parent="The Light",
@@ -1151,7 +1162,9 @@ class ResultProxyTest(fixtures.TablesTest):
_parent="Hidden parent",
_row="Hidden row",
)
- r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
+ r = connection.execute(
+ shadowed.select(shadowed.c.shadow_id == 1)
+ ).first()
eq_(r.shadow_id, 1)
eq_(r._mapping["shadow_id"], 1)
@@ -1221,25 +1234,26 @@ class ResultProxyTest(fixtures.TablesTest):
with patch.object(
engine.dialect.execution_ctx_cls, "rowcount"
) as mock_rowcount:
- mock_rowcount.__get__ = Mock()
- engine.execute(
- t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
- )
+ with engine.connect() as conn:
+ mock_rowcount.__get__ = Mock()
+ conn.execute(
+ t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
+ )
- eq_(len(mock_rowcount.__get__.mock_calls), 0)
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
- eq_(
- engine.execute(t.select()).fetchall(),
- [("d1",), ("d2",), ("d3",)],
- )
- eq_(len(mock_rowcount.__get__.mock_calls), 0)
+ eq_(
+ conn.execute(t.select()).fetchall(),
+ [("d1",), ("d2",), ("d3",)],
+ )
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
- engine.execute(t.update(), {"data": "d4"})
+ conn.execute(t.update(), {"data": "d4"})
- eq_(len(mock_rowcount.__get__.mock_calls), 1)
+ eq_(len(mock_rowcount.__get__.mock_calls), 1)
- engine.execute(t.delete())
- eq_(len(mock_rowcount.__get__.mock_calls), 2)
+ conn.execute(t.delete())
+ eq_(len(mock_rowcount.__get__.mock_calls), 2)
def test_row_is_sequence(self):
@@ -1259,17 +1273,17 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(hash(row), hash((1, "value", "foo")))
@testing.provide_metadata
- def test_row_getitem_indexes_compiled(self):
+ def test_row_getitem_indexes_compiled(self, connection):
values = Table(
"rp",
self.metadata,
Column("key", String(10), primary_key=True),
Column("value", String(10)),
)
- values.create()
+ values.create(connection)
- testing.db.execute(values.insert(), dict(key="One", value="Uno"))
- row = testing.db.execute(values.select()).first()
+ connection.execute(values.insert(), dict(key="One", value="Uno"))
+ row = connection.execute(values.select()).first()
eq_(row._mapping["key"], "One")
eq_(row._mapping["value"], "Uno")
eq_(row[0], "One")
@@ -1293,7 +1307,7 @@ class ResultProxyTest(fixtures.TablesTest):
@testing.requires.cextensions
def test_row_c_sequence_check(self):
-
+ # TODO: modernize for 2.0
metadata = MetaData()
metadata.bind = "sqlite://"
users = Table(
@@ -1409,39 +1423,39 @@ class KeyTargetingTest(fixtures.TablesTest):
)
@testing.requires.schemas
- def test_keyed_accessor_wschema(self):
+ def test_keyed_accessor_wschema(self, connection):
keyed1 = self.tables["%s.wschema" % testing.config.test_schema]
- row = testing.db.execute(keyed1.select()).first()
+ row = connection.execute(keyed1.select()).first()
eq_(row.b, "a1")
eq_(row.q, "c1")
eq_(row.a, "a1")
eq_(row.c, "c1")
- def test_keyed_accessor_single(self):
+ def test_keyed_accessor_single(self, connection):
keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select()).first()
+ row = connection.execute(keyed1.select()).first()
eq_(row.b, "a1")
eq_(row.q, "c1")
eq_(row.a, "a1")
eq_(row.c, "c1")
- def test_keyed_accessor_single_labeled(self):
+ def test_keyed_accessor_single_labeled(self, connection):
keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select().apply_labels()).first()
+ row = connection.execute(keyed1.select().apply_labels()).first()
eq_(row.keyed1_b, "a1")
eq_(row.keyed1_q, "c1")
eq_(row.keyed1_a, "a1")
eq_(row.keyed1_c, "c1")
- def _test_keyed_targeting_no_label_at_all(self, expression):
+ def _test_keyed_targeting_no_label_at_all(self, expression, conn):
lt = literal_column("2")
stmt = select([literal_column("1"), expression, lt]).select_from(
self.tables.keyed1
)
- row = testing.db.execute(stmt).first()
+ row = conn.execute(stmt).first()
eq_(row._mapping[expression], "a1")
eq_(row._mapping[lt], 2)
@@ -1450,7 +1464,7 @@ class KeyTargetingTest(fixtures.TablesTest):
# easily. we get around that because we know that "2" is unique
eq_(row._mapping["2"], 2)
- def test_keyed_targeting_no_label_at_all_one(self):
+ def test_keyed_targeting_no_label_at_all_one(self, connection):
class not_named_max(expression.ColumnElement):
name = "not_named_max"
@@ -1464,9 +1478,9 @@ class KeyTargetingTest(fixtures.TablesTest):
eq_(str(select([not_named_max()])), "SELECT max(a)")
nnm = not_named_max()
- self._test_keyed_targeting_no_label_at_all(nnm)
+ self._test_keyed_targeting_no_label_at_all(nnm, connection)
- def test_keyed_targeting_no_label_at_all_two(self):
+ def test_keyed_targeting_no_label_at_all_two(self, connection):
class not_named_max(expression.ColumnElement):
name = "not_named_max"
@@ -1479,24 +1493,24 @@ class KeyTargetingTest(fixtures.TablesTest):
eq_(str(select([not_named_max()])), "SELECT max(a)")
nnm = not_named_max()
- self._test_keyed_targeting_no_label_at_all(nnm)
+ self._test_keyed_targeting_no_label_at_all(nnm, connection)
- def test_keyed_targeting_no_label_at_all_text(self):
+ def test_keyed_targeting_no_label_at_all_text(self, connection):
t1 = text("max(a)")
t2 = text("min(a)")
stmt = select([t1, t2]).select_from(self.tables.keyed1)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
eq_(row._mapping[t1], "a1")
eq_(row._mapping[t2], "a1")
@testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_conflict_2(self):
+ def test_keyed_accessor_composite_conflict_2(self, connection):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed2]).select_from(keyed1.join(keyed2, true()))
).first()
@@ -1522,14 +1536,16 @@ class KeyTargetingTest(fixtures.TablesTest):
# illustrate why row.b above is ambiguous, and not "b2"; because
# if we didn't have keyed2, now it matches row.a. a new column
# shouldn't be able to grab the value from a previous column.
- row = testing.db.execute(select([keyed1])).first()
+ row = connection.execute(select([keyed1])).first()
eq_(row.b, "a1")
- def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(self):
+ def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(
+ self, connection
+ ):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed2])
.select_from(keyed1.join(keyed2, true()))
.apply_labels()
@@ -1541,11 +1557,11 @@ class KeyTargetingTest(fixtures.TablesTest):
eq_(row._mapping["keyed2_b"], "b2")
eq_(row._mapping["keyed1_a"], "a1")
- def test_keyed_accessor_composite_names_precedent(self):
+ def test_keyed_accessor_composite_names_precedent(self, connection):
keyed1 = self.tables.keyed1
keyed4 = self.tables.keyed4
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed4]).select_from(keyed1.join(keyed4, true()))
).first()
eq_(row.b, "b4")
@@ -1554,11 +1570,11 @@ class KeyTargetingTest(fixtures.TablesTest):
eq_(row.c, "c1")
@testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_keys_precedent(self):
+ def test_keyed_accessor_composite_keys_precedent(self, connection):
keyed1 = self.tables.keyed1
keyed3 = self.tables.keyed3
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed3]).select_from(keyed1.join(keyed3, true()))
).first()
eq_(row.q, "c1")
@@ -1578,11 +1594,11 @@ class KeyTargetingTest(fixtures.TablesTest):
)
eq_(row.d, "d3")
- def test_keyed_accessor_composite_labeled(self):
+ def test_keyed_accessor_composite_labeled(self, connection):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed2])
.select_from(keyed1.join(keyed2, true()))
.apply_labels()
@@ -1599,7 +1615,9 @@ class KeyTargetingTest(fixtures.TablesTest):
assert_raises(KeyError, lambda: row._mapping["keyed2_c"])
assert_raises(KeyError, lambda: row._mapping["keyed2_q"])
- def test_keyed_accessor_column_is_repeated_multiple_times(self):
+ def test_keyed_accessor_column_is_repeated_multiple_times(
+ self, connection
+ ):
# test new logic added as a result of the combination of #4892 and
# #4887. We allow duplicate columns, but we also have special logic
# to disambiguate for the same column repeated, and as #4887 adds
@@ -1627,7 +1645,7 @@ class KeyTargetingTest(fixtures.TablesTest):
.apply_labels()
)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
is_false(result._metadata.matched_on_name)
# ensure the result map is the same number of cols so we can
@@ -1664,34 +1682,34 @@ class KeyTargetingTest(fixtures.TablesTest):
eq_(row[6], "d3")
eq_(row[7], "d3")
- def test_columnclause_schema_column_one(self):
+ def test_columnclause_schema_column_one(self, connection):
# originally addressed by [ticket:2932], however liberalized
# Column-targeting rules are deprecated
a, b = sql.column("a"), sql.column("b")
stmt = select([a, b]).select_from(table("keyed2"))
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(a, row._mapping)
in_(b, row._mapping)
- def test_columnclause_schema_column_two(self):
+ def test_columnclause_schema_column_two(self, connection):
keyed2 = self.tables.keyed2
stmt = select([keyed2.c.a, keyed2.c.b])
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(keyed2.c.a, row._mapping)
in_(keyed2.c.b, row._mapping)
- def test_columnclause_schema_column_three(self):
+ def test_columnclause_schema_column_three(self, connection):
# this is also addressed by [ticket:2932]
stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(stmt.selected_columns.a, row._mapping)
in_(stmt.selected_columns.b, row._mapping)
- def test_columnclause_schema_column_four(self):
+ def test_columnclause_schema_column_four(self, connection):
# originally addressed by [ticket:2932], however liberalized
# Column-targeting rules are deprecated
@@ -1699,7 +1717,7 @@ class KeyTargetingTest(fixtures.TablesTest):
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
a, b
)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(a, row._mapping)
in_(b, row._mapping)
@@ -1707,13 +1725,13 @@ class KeyTargetingTest(fixtures.TablesTest):
in_(stmt.selected_columns.keyed2_a, row._mapping)
in_(stmt.selected_columns.keyed2_b, row._mapping)
- def test_columnclause_schema_column_five(self):
+ def test_columnclause_schema_column_five(self, connection):
# this is also addressed by [ticket:2932]
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
keyed2_a=CHAR, keyed2_b=CHAR
)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(stmt.selected_columns.keyed2_a, row._mapping)
in_(stmt.selected_columns.keyed2_b, row._mapping)
@@ -1818,15 +1836,17 @@ class PositionalTextTest(fixtures.TablesTest):
@classmethod
def insert_data(cls):
- cls.tables.text1.insert().execute(
- [dict(a="a1", b="b1", c="c1", d="d1")]
- )
+ with testing.db.connect() as conn:
+ conn.execute(
+ cls.tables.text1.insert(),
+ [dict(a="a1", b="b1", c="c1", d="d1")],
+ )
- def test_via_column(self):
+ def test_via_column(self, connection):
c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c2], "b1")
@@ -1838,23 +1858,23 @@ class PositionalTextTest(fixtures.TablesTest):
eq_(row._mapping["r"], "c1")
eq_(row._mapping["d"], "d1")
- def test_fewer_cols_than_sql_positional(self):
+ def test_fewer_cols_than_sql_positional(self, connection):
c1, c2 = column("q"), column("p")
stmt = text("select a, b, c, d from text1").columns(c1, c2)
# no warning as this can be similar for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
eq_(row._mapping["c"], "c1")
- def test_fewer_cols_than_sql_non_positional(self):
+ def test_fewer_cols_than_sql_non_positional(self, connection):
c1, c2 = column("a"), column("p")
stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR)
# no warning as this can be similar for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
# c1 name matches, locates
@@ -1868,7 +1888,7 @@ class PositionalTextTest(fixtures.TablesTest):
lambda: row._mapping[c2],
)
- def test_more_cols_than_sql_positional(self):
+ def test_more_cols_than_sql_positional(self, connection):
c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
stmt = text("select a, b from text1").columns(c1, c2, c3, c4)
@@ -1876,7 +1896,7 @@ class PositionalTextTest(fixtures.TablesTest):
r"Number of columns in textual SQL \(4\) is "
r"smaller than number of columns requested \(2\)"
):
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c2], "b1")
@@ -1887,14 +1907,14 @@ class PositionalTextTest(fixtures.TablesTest):
lambda: row._mapping[c3],
)
- def test_more_cols_than_sql_nonpositional(self):
+ def test_more_cols_than_sql_nonpositional(self, connection):
c1, c2, c3, c4 = column("b"), column("a"), column("r"), column("d")
stmt = TextualSelect(
text("select a, b from text1"), [c1, c2, c3, c4], positional=False
)
# no warning for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "b1")
@@ -1906,7 +1926,7 @@ class PositionalTextTest(fixtures.TablesTest):
lambda: row._mapping[c3],
)
- def test_more_cols_than_sql_nonpositional_labeled_cols(self):
+ def test_more_cols_than_sql_nonpositional_labeled_cols(self, connection):
text1 = self.tables.text1
c1, c2, c3, c4 = text1.c.b, text1.c.a, column("r"), column("d")
@@ -1919,7 +1939,7 @@ class PositionalTextTest(fixtures.TablesTest):
)
# no warning for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "b1")
@@ -1931,7 +1951,7 @@ class PositionalTextTest(fixtures.TablesTest):
lambda: row._mapping[c3],
)
- def test_dupe_col_obj(self):
+ def test_dupe_col_obj(self, connection):
c1, c2, c3 = column("q"), column("p"), column("r")
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2)
@@ -1939,11 +1959,11 @@ class PositionalTextTest(fixtures.TablesTest):
exc.InvalidRequestError,
"Duplicate column expression requested in "
"textual SQL: <.*.ColumnClause.*; p>",
- testing.db.execute,
+ connection.execute,
stmt,
)
- def test_anon_aliased_unique(self):
+ def test_anon_aliased_unique(self, connection):
text1 = self.tables.text1
c1 = text1.c.a.label(None)
@@ -1952,7 +1972,7 @@ class PositionalTextTest(fixtures.TablesTest):
c4 = text1.alias().c.d.label(None)
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
@@ -1968,7 +1988,7 @@ class PositionalTextTest(fixtures.TablesTest):
lambda: row._mapping[text1.c.b],
)
- def test_anon_aliased_overlapping(self):
+ def test_anon_aliased_overlapping(self, connection):
text1 = self.tables.text1
c1 = text1.c.a.label(None)
@@ -1977,7 +1997,7 @@ class PositionalTextTest(fixtures.TablesTest):
c4 = text1.c.a.label(None)
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
@@ -1985,7 +2005,7 @@ class PositionalTextTest(fixtures.TablesTest):
eq_(row._mapping[c3], "c1")
eq_(row._mapping[c4], "d1")
- def test_anon_aliased_name_conflict(self):
+ def test_anon_aliased_name_conflict(self, connection):
text1 = self.tables.text1
c1 = text1.c.a.label("a")
@@ -1998,7 +2018,7 @@ class PositionalTextTest(fixtures.TablesTest):
stmt = text("select a, b as a, c as a, d as a from text1").columns(
c1, c2, c3, c4
)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
@@ -2034,10 +2054,11 @@ class AlternateResultProxyTest(fixtures.TablesTest):
@classmethod
def insert_data(cls):
- cls.engine.execute(
- cls.tables.test.insert(),
- [{"x": i, "y": "t_%d" % i} for i in range(1, 12)],
- )
+ with cls.engine.connect() as conn:
+ conn.execute(
+ cls.tables.test.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(1, 12)],
+ )
@contextmanager
def _proxy_fixture(self, cls):
@@ -2059,53 +2080,54 @@ class AlternateResultProxyTest(fixtures.TablesTest):
def _test_proxy(self, cls):
with self._proxy_fixture(cls):
rows = []
- r = self.engine.execute(select([self.table]))
- assert isinstance(r.cursor_strategy, cls)
- for i in range(5):
- rows.append(r.fetchone())
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ with self.engine.connect() as conn:
+ r = conn.execute(select([self.table]))
+ assert isinstance(r.cursor_strategy, cls)
+ for i in range(5):
+ rows.append(r.fetchone())
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
- rows = r.fetchmany(3)
- eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+ rows = r.fetchmany(3)
+ eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
- rows = r.fetchall()
- eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+ rows = r.fetchall()
+ eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
- r = self.engine.execute(select([self.table]))
- rows = r.fetchmany(None)
- eq_(rows[0], (1, "t_1"))
- # number of rows here could be one, or the whole thing
- assert len(rows) == 1 or len(rows) == 11
+ r = conn.execute(select([self.table]))
+ rows = r.fetchmany(None)
+ eq_(rows[0], (1, "t_1"))
+ # number of rows here could be one, or the whole thing
+ assert len(rows) == 1 or len(rows) == 11
- r = self.engine.execute(select([self.table]).limit(1))
- r.fetchone()
- eq_(r.fetchone(), None)
+ r = conn.execute(select([self.table]).limit(1))
+ r.fetchone()
+ eq_(r.fetchone(), None)
- r = self.engine.execute(select([self.table]).limit(5))
- rows = r.fetchmany(6)
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ r = conn.execute(select([self.table]).limit(5))
+ rows = r.fetchmany(6)
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
- # result keeps going just fine with blank results...
- eq_(r.fetchmany(2), [])
+ # result keeps going just fine with blank results...
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchmany(2), [])
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchall(), [])
+ eq_(r.fetchall(), [])
- eq_(r.fetchone(), None)
+ eq_(r.fetchone(), None)
- # until we close
- r.close()
+ # until we close
+ r.close()
- self._assert_result_closed(r)
+ self._assert_result_closed(r)
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.first(), (1, "t_1"))
- self._assert_result_closed(r)
+ r = conn.execute(select([self.table]).limit(5))
+ eq_(r.first(), (1, "t_1"))
+ self._assert_result_closed(r)
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.scalar(), 1)
- self._assert_result_closed(r)
+ r = conn.execute(select([self.table]).limit(5))
+ eq_(r.scalar(), 1)
+ self._assert_result_closed(r)
def _assert_result_closed(self, r):
assert_raises_message(