diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-06-04 17:29:20 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-02-21 17:53:33 -0500 |
| commit | f559f378c47811b5528ad1769cb86925e85fd1e5 (patch) | |
| tree | fd8325501a96cf1e4280c15f267f63b2af7b5f97 /test/sql/test_resultset.py | |
| parent | 93b7767d00267ebe149cabcae7246b6796352eb8 (diff) | |
| download | sqlalchemy-f559f378c47811b5528ad1769cb86925e85fd1e5.tar.gz | |
Result initial introduction
This builds on cc718cccc0bf8a01abdf4068c7ea4f3 which moved
RowProxy to Row, allowing Row to be more like a named tuple.
- KeyedTuple in ORM is replaced with Row
- ResultSetMetaData broken out into "simple" and "cursor" versions
for ORM and Core, as well as LegacyCursor version.
- Row now has _mapping attribute that supplies full mapping behavior.
Row and SimpleRow both have named tuple behavior otherwise.
LegacyRow has some mapping features on the tuple which emit
deprecation warnings (e.g. keys(), values(), etc). the biggest
change for mapping->tuple is the behavior of __contains__ which
moves from testing of "key in row" to "value in row".
- ResultProxy breaks into ResultProxy and FutureResult (interim),
the latter has the newer APIs. Made available to dialects
using execution options.
- internal reflection methods and most tests move off of implicit
Row mapping behavior and move to row._mapping, result.mappings()
method using future result
- a new strategy system for cursor handling replaces the various
subclasses of RowProxy
- some execution context adjustments. We will leave EC in but
refined things like get_result_proxy() and out parameter handling.
Dialects for 1.4 will need to adjust from get_result_proxy()
to get_result_cursor_strategy(), if they are using this method
- out parameter handling now accommodated by get_out_parameter_values()
EC method. Oracle changes for this. external dialect for
DB2 for example will also need to adjust for this.
- deprecate case_insensitive flag for engine / result, this
feature is not used
mapping-methods on Row are deprecated, and replaced with
Row._mapping.<meth>, including:
row.keys() -> use row._mapping.keys()
row.items() -> use row._mapping.items()
row.values() -> use row._mapping.values()
key in row -> use key in row._mapping
int in row -> use int < len(row)
Fixes: #4710
Fixes: #4878
Change-Id: Ieb9085e9bcff564359095b754da9ae0af55679f0
Diffstat (limited to 'test/sql/test_resultset.py')
| -rw-r--r-- | test/sql/test_resultset.py | 591 |
1 files changed, 349 insertions, 242 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 8aa524d78..2a6851a99 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -1,3 +1,4 @@ +import collections from contextlib import contextmanager import csv import operator @@ -134,8 +135,8 @@ class ResultProxyTest(fixtures.TablesTest): .scalar_subquery() ) for row in select([sel + 1, sel + 3], bind=users.bind).execute(): - eq_(row["anon_1"], 8) - eq_(row["anon_2"], 10) + eq_(row._mapping["anon_1"], 8) + eq_(row._mapping["anon_2"], 10) def test_row_comparison(self): users = self.tables.users @@ -196,15 +197,18 @@ class ResultProxyTest(fixtures.TablesTest): testing.db.execute(content.insert().values(type="t1")) row = testing.db.execute(content.select(use_labels=True)).first() - in_(content.c.type, row) - not_in_(bar.c.content_type, row) + in_(content.c.type, row._mapping) + not_in_(bar.c.content_type, row._mapping) + + not_in_(bar.c.content_type, row._mapping) row = testing.db.execute( select([func.now().label("content_type")]) ).first() - not_in_(content.c.type, row) - not_in_(bar.c.content_type, row) + not_in_(content.c.type, row._mapping) + + not_in_(bar.c.content_type, row._mapping) def test_pickled_rows(self): users = self.tables.users @@ -229,14 +233,14 @@ class ResultProxyTest(fixtures.TablesTest): eq_(result, [(7, "jack"), (8, "ed"), (9, "fred")]) if use_labels: - eq_(result[0]["users_user_id"], 7) + eq_(result[0]._mapping["users_user_id"], 7) eq_( - list(result[0].keys()), + list(result[0]._fields), ["users_user_id", "users_user_name"], ) else: - eq_(result[0]["user_id"], 7) - eq_(list(result[0].keys()), ["user_id", "user_name"]) + eq_(result[0]._mapping["user_id"], 7) + eq_(list(result[0]._fields), ["user_id", "user_name"]) eq_(result[0][0], 7) @@ -244,6 +248,11 @@ class ResultProxyTest(fixtures.TablesTest): exc.NoSuchColumnError, lambda: result[0]["fake key"] ) + assert_raises( + exc.NoSuchColumnError, + lambda: result[0]._mapping["fake key"], + ) + def test_column_error_printing(self): result = testing.db.execute(select([1])) row = result.first() @@ -267,7 +276,9 @@ class ResultProxyTest(fixtures.TablesTest): is_(result._getter(accessor, False), None) assert_raises_message( - exc.NoSuchColumnError, msg % repl, lambda: row[accessor] + exc.NoSuchColumnError, + msg % repl, + lambda: row._mapping[accessor], ) def test_fetchmany(self): @@ -297,7 +308,24 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r[1:], (2, "foo@bar.com")) eq_(r[:-1], (1, 2)) - def test_column_accessor_basic_compiled(self): + def test_column_accessor_basic_compiled_mapping(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), + ) + + r = users.select(users.c.user_id == 2).execute().first() + eq_(r.user_id, 2) + eq_(r._mapping["user_id"], 2) + eq_(r._mapping[users.c.user_id], 2) + + eq_(r.user_name, "jack") + eq_(r._mapping["user_name"], "jack") + eq_(r._mapping[users.c.user_name], "jack") + + def test_column_accessor_basic_compiled_traditional(self): users = self.tables.users users.insert().execute( @@ -306,13 +334,28 @@ class ResultProxyTest(fixtures.TablesTest): ) r = users.select(users.c.user_id == 2).execute().first() + eq_(r.user_id, 2) - eq_(r["user_id"], 2) - eq_(r[users.c.user_id], 2) + eq_(r._mapping["user_id"], 2) + eq_(r._mapping[users.c.user_id], 2) eq_(r.user_name, "jack") - eq_(r["user_name"], "jack") - eq_(r[users.c.user_name], "jack") + eq_(r._mapping["user_name"], "jack") + eq_(r._mapping[users.c.user_name], "jack") + + def test_row_getitem_string(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), + ) + + r = testing.db.execute( + text("select * from users where user_id=2") + ).first() + + eq_(r._mapping["user_name"], "jack") def test_column_accessor_basic_text(self): users = self.tables.users @@ -326,10 +369,34 @@ class ResultProxyTest(fixtures.TablesTest): ).first() eq_(r.user_id, 2) - eq_(r["user_id"], 2) eq_(r.user_name, "jack") - eq_(r["user_name"], "jack") + + eq_(r._mapping["user_id"], 2) + + eq_(r.user_name, "jack") + eq_(r._mapping["user_name"], "jack") + + def test_column_accessor_text_colexplicit(self): + users = self.tables.users + + users.insert().execute( + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), + ) + r = testing.db.execute( + text("select * from users where user_id=2").columns( + users.c.user_id, users.c.user_name + ) + ).first() + + eq_(r.user_id, 2) + eq_(r._mapping["user_id"], 2) + eq_(r._mapping[users.c.user_id], 2) + + eq_(r.user_name, "jack") + eq_(r._mapping["user_name"], "jack") + eq_(r._mapping[users.c.user_name], "jack") def test_column_accessor_textual_select(self): users = self.tables.users @@ -346,11 +413,12 @@ class ResultProxyTest(fixtures.TablesTest): .where(text("user_id=2")) ).first() + # keyed access works in many ways eq_(r.user_id, 2) - eq_(r["user_id"], 2) - eq_(r.user_name, "jack") - eq_(r["user_name"], "jack") + eq_(r._mapping["user_id"], 2) + eq_(r.user_name, "jack") + eq_(r._mapping["user_name"], "jack") def test_column_accessor_dotted_union(self): users = self.tables.users @@ -367,9 +435,9 @@ class ResultProxyTest(fixtures.TablesTest): "users.user_name from users" ) ).first() - eq_(r["user_id"], 1) - eq_(r["user_name"], "john") - eq_(list(r.keys()), ["user_id", "user_name"]) + eq_(r._mapping["user_id"], 1) + eq_(r._mapping["user_name"], "john") + eq_(list(r._fields), ["user_id", "user_name"]) def test_column_accessor_sqlite_raw(self): users = self.tables.users @@ -395,14 +463,14 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r["users.user_id"], 1) eq_(r["users.user_name"], "john") - eq_(list(r.keys()), ["users.user_id", "users.user_name"]) + eq_(list(r._fields), ["users.user_id", "users.user_name"]) else: - not_in_("users.user_id", r) - not_in_("users.user_name", r) - eq_(r["user_id"], 1) - eq_(r["user_name"], "john") + not_in_("users.user_id", r._mapping) + not_in_("users.user_name", r._mapping) + eq_(r._mapping["user_id"], 1) + eq_(r._mapping["user_name"], "john") - eq_(list(r.keys()), ["user_id", "user_name"]) + eq_(list(r._fields), ["user_id", "user_name"]) def test_column_accessor_sqlite_translated(self): users = self.tables.users @@ -420,17 +488,17 @@ class ResultProxyTest(fixtures.TablesTest): .execute() .first() ) - eq_(r["user_id"], 1) - eq_(r["user_name"], "john") + eq_(r._mapping["user_id"], 1) + eq_(r._mapping["user_name"], "john") if testing.against("sqlite < 3.10.0"): - eq_(r["users.user_id"], 1) - eq_(r["users.user_name"], "john") + eq_(r._mapping["users.user_id"], 1) + eq_(r._mapping["users.user_name"], "john") else: - not_in_("users.user_id", r) - not_in_("users.user_name", r) + not_in_("users.user_id", r._mapping) + not_in_("users.user_name", r._mapping) - eq_(list(r.keys()), ["user_id", "user_name"]) + eq_(list(r._fields), ["user_id", "user_name"]) def test_column_accessor_labels_w_dots(self): users = self.tables.users @@ -448,10 +516,10 @@ class ResultProxyTest(fixtures.TablesTest): .execute() .first() ) - eq_(r["users.user_id"], 1) - eq_(r["users.user_name"], "john") - not_in_("user_name", r) - eq_(list(r.keys()), ["users.user_id", "users.user_name"]) + eq_(r._mapping["users.user_id"], 1) + eq_(r._mapping["users.user_name"], "john") + not_in_("user_name", r._mapping) + eq_(list(r._fields), ["users.user_id", "users.user_name"]) def test_column_accessor_unary(self): users = self.tables.users @@ -465,7 +533,7 @@ class ResultProxyTest(fixtures.TablesTest): .execute() .first() ) - eq_(r[users.c.user_name], "john") + eq_(r._mapping[users.c.user_name], "john") eq_(r.user_name, "john") def test_column_accessor_err(self): @@ -480,7 +548,7 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( KeyError, "Could not locate column in row for column 'foo'", - lambda: r["foo"], + lambda: r._mapping["foo"], ) def test_graceful_fetch_on_non_rows(self): @@ -586,20 +654,20 @@ class ResultProxyTest(fixtures.TablesTest): ) ).first() - eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) + eq_(list(row._fields), ["case_insensitive", "CaseSensitive"]) in_("case_insensitive", row._keymap) in_("CaseSensitive", row._keymap) not_in_("casesensitive", row._keymap) - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) + eq_(row._mapping["case_insensitive"], 1) + eq_(row._mapping["CaseSensitive"], 2) - assert_raises(KeyError, lambda: row["Case_insensitive"]) - assert_raises(KeyError, lambda: row["casesensitive"]) + assert_raises(KeyError, lambda: row._mapping["Case_insensitive"]) + assert_raises(KeyError, lambda: row._mapping["casesensitive"]) def test_row_case_sensitive_unoptimized(self): - ins_db = engines.testing_engine(options={"case_sensitive": True}) + ins_db = engines.testing_engine() row = ins_db.execute( select( [ @@ -611,7 +679,7 @@ class ResultProxyTest(fixtures.TablesTest): ).first() eq_( - list(row.keys()), + list(row._fields), ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], ) @@ -619,63 +687,13 @@ class ResultProxyTest(fixtures.TablesTest): in_("CaseSensitive", row._keymap) not_in_("casesensitive", row._keymap) - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["screw_up_the_cols"], 3) - - assert_raises(KeyError, lambda: row["Case_insensitive"]) - assert_raises(KeyError, lambda: row["casesensitive"]) - assert_raises(KeyError, lambda: row["screw_UP_the_cols"]) - - def test_row_case_insensitive(self): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select( - [ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - ] - ) - ).first() - - eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["Case_insensitive"], 1) - eq_(row["casesensitive"], 2) - - def test_row_case_insensitive_unoptimized(self): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select( - [ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols"), - ] - ) - ).first() - - eq_( - list(row.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], - ) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) + eq_(row._mapping["case_insensitive"], 1) + eq_(row._mapping["CaseSensitive"], 2) + eq_(row._mapping["screw_up_the_cols"], 3) - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["screw_up_the_cols"], 3) - eq_(row["Case_insensitive"], 1) - eq_(row["casesensitive"], 2) - eq_(row["screw_UP_the_cols"], 3) + assert_raises(KeyError, lambda: row._mapping["Case_insensitive"]) + assert_raises(KeyError, lambda: row._mapping["casesensitive"]) + assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"]) def test_row_as_args(self): users = self.tables.users @@ -683,7 +701,7 @@ class ResultProxyTest(fixtures.TablesTest): users.insert().execute(user_id=1, user_name="john") r = users.select(users.c.user_id == 1).execute().first() users.delete().execute() - users.insert().execute(r) + users.insert().execute(r._mapping) eq_(users.select().execute().fetchall(), [(1, "john")]) def test_result_as_args(self): @@ -697,7 +715,7 @@ class ResultProxyTest(fixtures.TablesTest): ] ) r = users.select().execute() - users2.insert().execute(list(r)) + users2.insert().execute([row._mapping for row in r]) eq_( users2.select().order_by(users2.c.user_id).execute().fetchall(), [(1, "john"), (2, "ed")], @@ -705,7 +723,7 @@ class ResultProxyTest(fixtures.TablesTest): users2.delete().execute() r = users.select().execute() - users2.insert().execute(*list(r)) + users2.insert().execute(*[row._mapping for row in r]) eq_( users2.select().order_by(users2.c.user_id).execute().fetchall(), [(1, "john"), (2, "ed")], @@ -723,7 +741,7 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", - lambda: r["user_id"], + lambda: r._mapping["user_id"], ) assert_raises_message( @@ -736,8 +754,8 @@ class ResultProxyTest(fixtures.TablesTest): # pure positional targeting; users.c.user_id # and addresses.c.user_id are known! # works as of 1.1 issue #3501 - eq_(r[users.c.user_id], 1) - eq_(r[addresses.c.user_id], None) + eq_(r._mapping[users.c.user_id], 1) + eq_(r._mapping[addresses.c.user_id], None) # try to trick it - fake_table isn't in the result! # we get the correct error @@ -745,14 +763,14 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "Could not locate column in row for column 'fake.user_id'", - lambda: r[fake_table.c.user_id], + lambda: r._mapping[fake_table.c.user_id], ) r = util.pickle.loads(util.pickle.dumps(r)) assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", - lambda: r["user_id"], + lambda: r._mapping["user_id"], ) result = users.outerjoin(addresses).select().execute() @@ -762,7 +780,7 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", - lambda: r["user_id"], + lambda: r._mapping["user_id"], ) @testing.requires.duplicate_names_in_cursor_description @@ -781,35 +799,16 @@ class ResultProxyTest(fixtures.TablesTest): # as of 1.1 issue #3501, we use pure positional # targeting for the column objects here - eq_(row[users.c.user_id], 1) + eq_(row._mapping[users.c.user_id], 1) - eq_(row[ua.c.user_id], 1) + eq_(row._mapping[ua.c.user_id], 1) # this now works as of 1.1 issue #3501; # previously this was stuck on "ambiguous column name" assert_raises_message( exc.InvalidRequestError, "Could not locate column in row", - lambda: row[u2.c.user_id], - ) - - @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column_case_sensitive(self): - eng = engines.testing_engine(options=dict(case_sensitive=False)) - - row = eng.execute( - select( - [ - literal_column("1").label("SOMECOL"), - literal_column("1").label("SOMECOL"), - ] - ) - ).first() - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row["somecol"], + lambda: row._mapping[u2.c.user_id], ) @testing.requires.duplicate_names_in_cursor_description @@ -829,7 +828,12 @@ class ResultProxyTest(fixtures.TablesTest): row = result.first() eq_( - set([users.c.user_id in row, addresses.c.user_id in row]), + set( + [ + users.c.user_id in row._mapping, + addresses.c.user_id in row._mapping, + ] + ), set([True]), ) @@ -864,8 +868,8 @@ class ResultProxyTest(fixtures.TablesTest): ) ) row = result.first() - eq_(row[users.c.user_id], 1) - eq_(row[users.c.user_name], "john") + eq_(row._mapping[users.c.user_id], 1) + eq_(row._mapping[users.c.user_name], "john") def test_loose_matching_two(self): users = self.tables.users @@ -898,14 +902,14 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", - lambda: row[users.c.user_id], + lambda: row._mapping[users.c.user_id], ) assert_raises_message( exc.InvalidRequestError, "Ambiguous column name", - lambda: row[addresses.c.user_id], + lambda: row._mapping[addresses.c.user_id], ) - eq_(row[users.c.user_name], "john") + eq_(row._mapping[users.c.user_name], "john") def test_ambiguous_column_by_col_plus_label(self): users = self.tables.users @@ -918,7 +922,7 @@ class ResultProxyTest(fixtures.TablesTest): ] ).execute() row = result.first() - eq_(row[users.c.user_id], 1) + eq_(row._mapping[users.c.user_id], 1) eq_(row[1], 1) def test_fetch_partial_result_map(self): @@ -969,8 +973,52 @@ class ResultProxyTest(fixtures.TablesTest): users.select().alias(users.name), ): row = s.select(use_labels=True).execute().first() - eq_(row[s.c.user_id], 7) - eq_(row[s.c.user_name], "ed") + eq_(row._mapping[s.c.user_id], 7) + eq_(row._mapping[s.c.user_name], "ed") + + @testing.requires.python3 + def test_ro_mapping_py3k(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name="foo") + result = users.select().execute() + + row = result.first() + dict_row = row._asdict() + + # dictionaries aren't ordered in Python 3 until 3.7 + odict_row = collections.OrderedDict( + [("user_id", 1), ("user_name", "foo")] + ) + eq_(dict_row, odict_row) + + mapping_row = row._mapping + + eq_(list(mapping_row), list(mapping_row.keys())) + eq_(odict_row.keys(), mapping_row.keys()) + eq_(odict_row.values(), mapping_row.values()) + eq_(odict_row.items(), mapping_row.items()) + + @testing.requires.python2 + def test_ro_mapping_py2k(self): + users = self.tables.users + + users.insert().execute(user_id=1, user_name="foo") + result = users.select().execute() + + row = result.first() + dict_row = row._asdict() + + odict_row = collections.OrderedDict( + [("user_id", 1), ("user_name", "foo")] + ) + eq_(dict_row, odict_row) + mapping_row = row._mapping + + eq_(list(mapping_row), list(mapping_row.keys())) + eq_(odict_row.keys(), list(mapping_row.keys())) + eq_(odict_row.values(), list(mapping_row.values())) + eq_(odict_row.items(), list(mapping_row.items())) def test_keys(self): users = self.tables.users @@ -979,7 +1027,8 @@ class ResultProxyTest(fixtures.TablesTest): result = users.select().execute() eq_(result.keys(), ["user_id", "user_name"]) row = result.first() - eq_(row.keys(), ["user_id", "user_name"]) + eq_(list(row._mapping.keys()), ["user_id", "user_name"]) + eq_(row._fields, ("user_id", "user_name")) def test_keys_anon_labels(self): """test [ticket:3483]""" @@ -999,7 +1048,8 @@ class ResultProxyTest(fixtures.TablesTest): eq_(result.keys(), ["user_id", "user_name_1", "count_1"]) row = result.first() - eq_(row.keys(), ["user_id", "user_name_1", "count_1"]) + eq_(row._fields, ("user_id", "user_name_1", "count_1")) + eq_(list(row._mapping.keys()), ["user_id", "user_name_1", "count_1"]) def test_items(self): users = self.tables.users @@ -1007,7 +1057,7 @@ class ResultProxyTest(fixtures.TablesTest): users.insert().execute(user_id=1, user_name="foo") r = users.select().execute().first() eq_( - [(x[0].lower(), x[1]) for x in list(r.items())], + [(x[0].lower(), x[1]) for x in list(r._mapping.items())], [("user_id", 1), ("user_name", "foo")], ) @@ -1046,8 +1096,8 @@ class ResultProxyTest(fixtures.TablesTest): r = users.select(users.c.user_id == 1).execute().first() eq_(r[0], 1) eq_(r[1], "foo") - eq_([x.lower() for x in list(r.keys())], ["user_id", "user_name"]) - eq_(list(r.values()), [1, "foo"]) + eq_([x.lower() for x in r._fields], ["user_id", "user_name"]) + eq_(list(r._mapping.values()), [1, "foo"]) def test_column_order_with_text_query(self): # should return values in query order @@ -1057,8 +1107,8 @@ class ResultProxyTest(fixtures.TablesTest): r = testing.db.execute("select user_name, user_id from users").first() eq_(r[0], "foo") eq_(r[1], 1) - eq_([x.lower() for x in list(r.keys())], ["user_name", "user_id"]) - eq_(list(r.values()), ["foo", 1]) + eq_([x.lower() for x in r._fields], ["user_name", "user_id"]) + eq_(list(r._mapping.values()), ["foo", 1]) @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()") @testing.crashes("firebird", "An identifier must begin with a letter") @@ -1086,23 +1136,23 @@ class ResultProxyTest(fixtures.TablesTest): r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() eq_(r.shadow_id, 1) - eq_(r["shadow_id"], 1) - eq_(r[shadowed.c.shadow_id], 1) + eq_(r._mapping["shadow_id"], 1) + eq_(r._mapping[shadowed.c.shadow_id], 1) eq_(r.shadow_name, "The Shadow") - eq_(r["shadow_name"], "The Shadow") - eq_(r[shadowed.c.shadow_name], "The Shadow") + eq_(r._mapping["shadow_name"], "The Shadow") + eq_(r._mapping[shadowed.c.shadow_name], "The Shadow") eq_(r.parent, "The Light") - eq_(r["parent"], "The Light") - eq_(r[shadowed.c.parent], "The Light") + eq_(r._mapping["parent"], "The Light") + eq_(r._mapping[shadowed.c.parent], "The Light") eq_(r.row, "Without light there is no shadow") - eq_(r["row"], "Without light there is no shadow") - eq_(r[shadowed.c.row], "Without light there is no shadow") + eq_(r._mapping["row"], "Without light there is no shadow") + eq_(r._mapping[shadowed.c.row], "Without light there is no shadow") - eq_(r["_parent"], "Hidden parent") - eq_(r["_row"], "Hidden row") + eq_(r._mapping["_parent"], "Hidden parent") + eq_(r._mapping["_row"], "Hidden row") def test_nontuple_row(self): """ensure the C version of BaseRow handles @@ -1131,7 +1181,7 @@ class ResultProxyTest(fixtures.TablesTest): ) eq_(list(proxy), ["value"]) eq_(proxy[0], "value") - eq_(proxy["key"], "value") + eq_(proxy._mapping["key"], "value") @testing.provide_metadata def test_no_rowcount_on_selects_inserts(self): @@ -1202,8 +1252,8 @@ class ResultProxyTest(fixtures.TablesTest): testing.db.execute(values.insert(), dict(key="One", value="Uno")) row = testing.db.execute(values.select()).first() - eq_(row["key"], "One") - eq_(row["value"], "Uno") + eq_(row._mapping["key"], "One") + eq_(row._mapping["value"], "Uno") eq_(row[0], "One") eq_(row[1], "Uno") eq_(row[-2], "One") @@ -1213,8 +1263,8 @@ class ResultProxyTest(fixtures.TablesTest): @testing.only_on("sqlite") def test_row_getitem_indexes_raw(self): row = testing.db.execute("select 'One' as key, 'Uno' as value").first() - eq_(row["key"], "One") - eq_(row["value"], "Uno") + eq_(row._mapping["key"], "One") + eq_(row._mapping["value"], "Uno") eq_(row[0], "One") eq_(row[1], "Uno") eq_(row[-2], "One") @@ -1366,12 +1416,12 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - eq_(row[expression], "a1") - eq_(row[lt], 2) + eq_(row._mapping[expression], "a1") + eq_(row._mapping[lt], 2) # Postgresql for example has the key as "?column?", which dupes # easily. we get around that because we know that "2" is unique - eq_(row["2"], 2) + eq_(row._mapping["2"], 2) def test_keyed_targeting_no_label_at_all_one(self): class not_named_max(expression.ColumnElement): @@ -1411,8 +1461,8 @@ class KeyTargetingTest(fixtures.TablesTest): stmt = select([t1, t2]).select_from(self.tables.keyed1) row = testing.db.execute(stmt).first() - eq_(row[t1], "a1") - eq_(row[t2], "a1") + eq_(row._mapping[t1], "a1") + eq_(row._mapping[t2], "a1") @testing.requires.duplicate_names_in_cursor_description def test_keyed_accessor_composite_conflict_2(self): @@ -1424,7 +1474,7 @@ class KeyTargetingTest(fixtures.TablesTest): ).first() # column access is unambiguous - eq_(row[self.tables.keyed2.c.b], "b2") + eq_(row._mapping[self.tables.keyed2.c.b], "b2") # row.a is ambiguous assert_raises_message( @@ -1459,10 +1509,10 @@ class KeyTargetingTest(fixtures.TablesTest): ).first() # column access is unambiguous - eq_(row[self.tables.keyed2.c.b], "b2") + eq_(row._mapping[self.tables.keyed2.c.b], "b2") - eq_(row["keyed2_b"], "b2") - eq_(row["keyed1_a"], "a1") + eq_(row._mapping["keyed2_b"], "b2") + eq_(row._mapping["keyed1_a"], "a1") def test_keyed_accessor_composite_names_precedent(self): keyed1 = self.tables.keyed1 @@ -1516,8 +1566,11 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row.keyed1_c, "c1") eq_(row.keyed2_a, "a2") eq_(row.keyed2_b, "b2") + assert_raises(KeyError, lambda: row["keyed2_c"]) assert_raises(KeyError, lambda: row["keyed2_q"]) + assert_raises(KeyError, lambda: row._mapping["keyed2_c"]) + assert_raises(KeyError, lambda: row._mapping["keyed2_q"]) def test_keyed_accessor_column_is_repeated_multiple_times(self): # test new logic added as a result of the combination of #4892 and @@ -1568,10 +1621,10 @@ class KeyTargetingTest(fixtures.TablesTest): row = result.first() # keyed access will ignore the dupe cols - eq_(row[keyed2.c.a], "a2") - eq_(row[keyed3.c.a], "a3") + eq_(row._mapping[keyed2.c.a], "a2") + eq_(row._mapping[keyed3.c.a], "a3") eq_(result._getter(keyed3.c.a)(row), "a3") - eq_(row[keyed3.c.d], "d3") + eq_(row._mapping[keyed3.c.d], "d3") # however we can get everything positionally eq_(row, ("a2", "a3", "a2", "a2", "a3", "a3", "d3", "d3")) @@ -1591,8 +1644,8 @@ class KeyTargetingTest(fixtures.TablesTest): stmt = select([a, b]).select_from(table("keyed2")) row = testing.db.execute(stmt).first() - in_(a, row) - in_(b, row) + in_(a, row._mapping) + in_(b, row._mapping) def test_columnclause_schema_column_two(self): keyed2 = self.tables.keyed2 @@ -1600,17 +1653,16 @@ class KeyTargetingTest(fixtures.TablesTest): stmt = select([keyed2.c.a, keyed2.c.b]) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) + in_(keyed2.c.a, row._mapping) + in_(keyed2.c.b, row._mapping) def test_columnclause_schema_column_three(self): # this is also addressed by [ticket:2932] - stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) row = testing.db.execute(stmt).first() - in_(stmt.selected_columns.a, row) - in_(stmt.selected_columns.b, row) + in_(stmt.selected_columns.a, row._mapping) + in_(stmt.selected_columns.b, row._mapping) def test_columnclause_schema_column_four(self): # originally addressed by [ticket:2932], however liberalized @@ -1622,10 +1674,11 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(a, row) - in_(b, row) - in_(stmt.selected_columns.keyed2_a, row) - in_(stmt.selected_columns.keyed2_b, row) + in_(a, row._mapping) + in_(b, row._mapping) + + in_(stmt.selected_columns.keyed2_a, row._mapping) + in_(stmt.selected_columns.keyed2_b, row._mapping) def test_columnclause_schema_column_five(self): # this is also addressed by [ticket:2932] @@ -1635,8 +1688,8 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(stmt.selected_columns.keyed2_a, row) - in_(stmt.selected_columns.keyed2_b, row) + in_(stmt.selected_columns.keyed2_a, row._mapping) + in_(stmt.selected_columns.keyed2_b, row._mapping) class PositionalTextTest(fixtures.TablesTest): @@ -1668,13 +1721,14 @@ class PositionalTextTest(fixtures.TablesTest): result = testing.db.execute(stmt) row = result.first() - eq_(row[c2], "b1") - eq_(row[c4], "d1") + eq_(row._mapping[c2], "b1") + eq_(row._mapping[c4], "d1") eq_(row[1], "b1") - eq_(row["b"], "b1") - eq_(row.keys(), ["a", "b", "c", "d"]) - eq_(row["r"], "c1") - eq_(row["d"], "d1") + eq_(row._mapping["b"], "b1") + eq_(list(row._mapping.keys()), ["a", "b", "c", "d"]) + eq_(row._fields, ("a", "b", "c", "d")) + eq_(row._mapping["r"], "c1") + eq_(row._mapping["d"], "d1") def test_fewer_cols_than_sql_positional(self): c1, c2 = column("q"), column("p") @@ -1684,8 +1738,8 @@ class PositionalTextTest(fixtures.TablesTest): result = testing.db.execute(stmt) row = result.first() - eq_(row[c1], "a1") - eq_(row["c"], "c1") + eq_(row._mapping[c1], "a1") + eq_(row._mapping["c"], "c1") def test_fewer_cols_than_sql_non_positional(self): c1, c2 = column("a"), column("p") @@ -1696,15 +1750,17 @@ class PositionalTextTest(fixtures.TablesTest): row = result.first() # c1 name matches, locates - eq_(row[c1], "a1") - eq_(row["c"], "c1") + eq_(row._mapping[c1], "a1") + eq_(row._mapping["c"], "c1") # c2 name does not match, doesn't locate assert_raises_message( - exc.NoSuchColumnError, "in row for column 'p'", lambda: row[c2] + exc.NoSuchColumnError, + "in row for column 'p'", + lambda: row._mapping[c2], ) - def test_more_cols_than_sql(self): + def test_more_cols_than_sql_positional(self): c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d") stmt = text("select a, b from text1").columns(c1, c2, c3, c4) @@ -1715,10 +1771,56 @@ class PositionalTextTest(fixtures.TablesTest): result = testing.db.execute(stmt) row = result.first() - eq_(row[c2], "b1") + eq_(row._mapping[c2], "b1") + + assert_raises_message( + exc.NoSuchColumnError, + "in row for column 'r'", + lambda: row._mapping[c3], + ) + + def test_more_cols_than_sql_nonpositional(self): + c1, c2, c3, c4 = column("b"), column("a"), column("r"), column("d") + stmt = TextualSelect( + text("select a, b from text1"), [c1, c2, c3, c4], positional=False + ) + + # no warning for non-positional + result = testing.db.execute(stmt) + + row = result.first() + eq_(row._mapping[c1], "b1") + eq_(row._mapping[c2], "a1") + + assert_raises_message( + exc.NoSuchColumnError, + "in row for column 'r'", + lambda: row._mapping[c3], + ) + + def test_more_cols_than_sql_nonpositional_labeled_cols(self): + text1 = self.tables.text1 + c1, c2, c3, c4 = text1.c.b, text1.c.a, column("r"), column("d") + + # the compiler will enable loose matching for this statement + # so that column._label is taken into account + stmt = TextualSelect( + text("select a, b AS text1_b from text1"), + [c1, c2, c3, c4], + positional=False, + ) + + # no warning for non-positional + result = testing.db.execute(stmt) + + row = result.first() + eq_(row._mapping[c1], "b1") + eq_(row._mapping[c2], "a1") assert_raises_message( - exc.NoSuchColumnError, "in row for column 'r'", lambda: row[c3] + exc.NoSuchColumnError, + "in row for column 'r'", + lambda: row._mapping[c3], ) def test_dupe_col_obj(self): @@ -1745,17 +1847,17 @@ class PositionalTextTest(fixtures.TablesTest): result = testing.db.execute(stmt) row = result.first() - eq_(row[c1], "a1") - eq_(row[c2], "b1") - eq_(row[c3], "c1") - eq_(row[c4], "d1") + eq_(row._mapping[c1], "a1") + eq_(row._mapping[c2], "b1") + eq_(row._mapping[c3], "c1") + eq_(row._mapping[c4], "d1") # text1.c.b goes nowhere....because we hit key fallback # but the text1.c.b doesn't derive from text1.c.c assert_raises_message( exc.NoSuchColumnError, "Could not locate column in row for column 'text1.b'", - lambda: row[text1.c.b], + lambda: row._mapping[text1.c.b], ) def test_anon_aliased_overlapping(self): @@ -1770,10 +1872,10 @@ class PositionalTextTest(fixtures.TablesTest): result = testing.db.execute(stmt) row = result.first() - eq_(row[c1], "a1") - eq_(row[c2], "b1") - eq_(row[c3], "c1") - eq_(row[c4], "d1") + eq_(row._mapping[c1], "a1") + eq_(row._mapping[c2], "b1") + eq_(row._mapping[c3], "c1") + eq_(row._mapping[c4], "d1") def test_anon_aliased_name_conflict(self): text1 = self.tables.text1 @@ -1791,17 +1893,17 @@ class PositionalTextTest(fixtures.TablesTest): result = testing.db.execute(stmt) row = result.first() - eq_(row[c1], "a1") - eq_(row[c2], "b1") - eq_(row[c3], "c1") - eq_(row[c4], "d1") + eq_(row._mapping[c1], "a1") + eq_(row._mapping[c2], "b1") + eq_(row._mapping[c3], "c1") + eq_(row._mapping[c4], "d1") # fails, because we hit key fallback and find conflicts # in columns that are presnet assert_raises_message( exc.NoSuchColumnError, "Could not locate column in row for column 'text1.a'", - lambda: row[text1.c.a], + lambda: row._mapping[text1.c.a], ) @@ -1834,8 +1936,11 @@ class AlternateResultProxyTest(fixtures.TablesTest): self.table = self.tables.test class ExcCtx(default.DefaultExecutionContext): + def get_result_cursor_strategy(self, result): + return cls.create(result) + def get_result_proxy(self): - return cls(self) + raise NotImplementedError() self.patcher = patch.object( self.engine.dialect, "execution_ctx_cls", ExcCtx @@ -1847,7 +1952,7 @@ class AlternateResultProxyTest(fixtures.TablesTest): with self._proxy_fixture(cls): rows = [] r = self.engine.execute(select([self.table])) - assert isinstance(r, cls) + assert isinstance(r.cursor_strategy, cls) for i in range(5): rows.append(r.fetchone()) eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) @@ -1908,40 +2013,42 @@ class AlternateResultProxyTest(fixtures.TablesTest): ) def test_basic_plain(self): - self._test_proxy(_result.ResultProxy) + self._test_proxy(_result.DefaultCursorFetchStrategy) def test_basic_buffered_row_result_proxy(self): - self._test_proxy(_result.BufferedRowResultProxy) + self._test_proxy(_result.BufferedRowCursorFetchStrategy) def test_basic_fully_buffered_result_proxy(self): - self._test_proxy(_result.FullyBufferedResultProxy) + self._test_proxy(_result.FullyBufferedCursorFetchStrategy) def test_basic_buffered_column_result_proxy(self): - self._test_proxy(_result.BufferedColumnResultProxy) + self._test_proxy(_result.DefaultCursorFetchStrategy) def test_resultprocessor_plain(self): - self._test_result_processor(_result.ResultProxy, False) + self._test_result_processor(_result.DefaultCursorFetchStrategy, False) def test_resultprocessor_plain_cached(self): - self._test_result_processor(_result.ResultProxy, True) - - def test_resultprocessor_buffered_column(self): - self._test_result_processor(_result.BufferedColumnResultProxy, False) - - def test_resultprocessor_buffered_column_cached(self): - self._test_result_processor(_result.BufferedColumnResultProxy, True) + self._test_result_processor(_result.DefaultCursorFetchStrategy, True) def test_resultprocessor_buffered_row(self): - self._test_result_processor(_result.BufferedRowResultProxy, False) + self._test_result_processor( + _result.BufferedRowCursorFetchStrategy, False + ) def test_resultprocessor_buffered_row_cached(self): - self._test_result_processor(_result.BufferedRowResultProxy, True) + self._test_result_processor( + _result.BufferedRowCursorFetchStrategy, True + ) def test_resultprocessor_fully_buffered(self): - self._test_result_processor(_result.FullyBufferedResultProxy, False) + self._test_result_processor( + _result.FullyBufferedCursorFetchStrategy, False + ) def test_resultprocessor_fully_buffered_cached(self): - self._test_result_processor(_result.FullyBufferedResultProxy, True) + self._test_result_processor( + _result.FullyBufferedCursorFetchStrategy, True + ) def _test_result_processor(self, cls, use_cache): class MyType(TypeDecorator): @@ -1963,7 +2070,7 @@ class AlternateResultProxyTest(fixtures.TablesTest): @testing.fixture def row_growth_fixture(self): - with self._proxy_fixture(_result.BufferedRowResultProxy): + with self._proxy_fixture(_result.BufferedRowCursorFetchStrategy): with self.engine.connect() as conn: conn.execute( self.table.insert(), @@ -2001,7 +2108,7 @@ class AlternateResultProxyTest(fixtures.TablesTest): max_size = max(checks.values()) for idx, row in enumerate(result, 0): if idx in checks: - assertion[idx] = result._bufsize - le_(len(result._BufferedRowResultProxy__rowbuffer), max_size) + assertion[idx] = result.cursor_strategy._bufsize + le_(len(result.cursor_strategy._rowbuffer), max_size) eq_(checks, assertion) |
