diff options
Diffstat (limited to 'test/sql/test_resultset.py')
| -rw-r--r-- | test/sql/test_resultset.py | 107 |
1 files changed, 97 insertions, 10 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index f08248440..253ad7b38 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -29,9 +29,11 @@ from sqlalchemy.engine import default from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql import ColumnElement from sqlalchemy.sql import expression from sqlalchemy.sql.selectable import TextualSelect from sqlalchemy.sql.sqltypes import NULLTYPE +from sqlalchemy.sql.util import ClauseAdapter from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import assertions @@ -1391,16 +1393,20 @@ class KeyTargetingTest(fixtures.TablesTest): @classmethod def insert_data(cls): - cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) - cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) - cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) - cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) - cls.tables.content.insert().execute(type="t1") - - if testing.requires.schemas.enabled: - cls.tables[ - "%s.wschema" % testing.config.test_schema - ].insert().execute(dict(b="a1", q="c1")) + with testing.db.begin() as conn: + conn.execute(cls.tables.keyed1.insert(), dict(b="a1", q="c1")) + conn.execute(cls.tables.keyed2.insert(), dict(a="a2", b="b2")) + conn.execute(cls.tables.keyed3.insert(), dict(a="a3", d="d3")) + conn.execute(cls.tables.keyed4.insert(), dict(b="b4", q="q4")) + conn.execute(cls.tables.content.insert(), dict(type="t1")) + + if testing.requires.schemas.enabled: + conn.execute( + cls.tables[ + "%s.wschema" % testing.config.test_schema + ].insert(), + dict(b="a1", q="c1"), + ) @testing.requires.schemas def test_keyed_accessor_wschema(self): @@ -1712,6 +1718,87 @@ class KeyTargetingTest(fixtures.TablesTest): in_(stmt.selected_columns.keyed2_a, row._mapping) in_(stmt.selected_columns.keyed2_b, row._mapping) + def _adapt_result_columns_fixture_one(self): + keyed1 = self.tables.keyed1 + stmt = ( + select([keyed1.c.b, keyed1.c.q.label("foo")]) + .apply_labels() + .subquery() + ) + + return select([stmt.c.keyed1_b, stmt.c.foo]) + + def _adapt_result_columns_fixture_two(self): + return text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + keyed2_a=CHAR, keyed2_b=CHAR + ) + + def _adapt_result_columns_fixture_three(self): + keyed1 = self.tables.keyed1 + stmt = select([keyed1.c.b, keyed1.c.q.label("foo")]).subquery() + + return select([stmt.c.b, stmt.c.foo]) + + def _adapt_result_columns_fixture_four(self): + keyed1 = self.tables.keyed1 + + stmt1 = select([keyed1]).apply_labels() + + a1 = keyed1.alias() + stmt2 = ClauseAdapter(a1).traverse(stmt1) + + return stmt2 + + @testing.combinations( + _adapt_result_columns_fixture_one, + _adapt_result_columns_fixture_two, + _adapt_result_columns_fixture_three, + _adapt_result_columns_fixture_four, + argnames="stmt_fn", + ) + def test_adapt_result_columns(self, connection, stmt_fn): + """test adaptation of a CursorResultMetadata to another one. + + + This copies the _keymap from one to the other in terms of the + selected columns of a target selectable. + + This is used by the statement caching process to re-use the + CursorResultMetadata from the cached statement against the same + statement sent separately. + + """ + + stmt1 = stmt_fn(self) + stmt2 = stmt_fn(self) + + eq_(stmt1._generate_cache_key(), stmt2._generate_cache_key()) + + column_linkage = dict( + zip(stmt1.selected_columns, stmt2.selected_columns) + ) + + result = connection.execute(stmt1) + + mock_context = Mock( + compiled=result.context.compiled, invoked_statement=stmt2 + ) + existing_metadata = result._metadata + adapted_metadata = existing_metadata._adapt_to_context(mock_context) + + eq_(existing_metadata.keys, adapted_metadata.keys) + + for k in existing_metadata._keymap: + if isinstance(k, ColumnElement) and k in column_linkage: + other_k = column_linkage[k] + else: + other_k = k + + is_( + existing_metadata._keymap[k], adapted_metadata._keymap[other_k] + ) + return stmt1, existing_metadata, stmt2, adapted_metadata + class PositionalTextTest(fixtures.TablesTest): run_inserts = "once" |
