diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-21 12:51:13 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-01 16:09:24 -0400 |
| commit | aded39f68c29e44a50c85be1ddb370d3d1affe9d (patch) | |
| tree | 0855ecfe2ecf5092f1e350c33f460571f495f1b8 /test/sql | |
| parent | 18ce4f9937c2d6753acbb054b4990c7da298a5d7 (diff) | |
| download | sqlalchemy-aded39f68c29e44a50c85be1ddb370d3d1affe9d.tar.gz | |
Propose Result as immediate replacement for ResultProxy
As progress is made on the _future.Result, including breaking
it out such that DBAPI behaviors are local to specific
implementations, it becomes apparent that the Result object
is a functional superset of ResultProxy and that basic
operations like fetchone(), fetchall(), and fetchmany()
behave pretty much exactly the same way on the new object.
Reorganize things so that ResultProxy is now referred to
as LegacyCursorResult, which subclasses CursorResult
that represents the DBAPI-cursor version of Result,
making use of a multiple inheritance pattern so that
the functionality of Result is also available in non-DBAPI
contexts, as will be necessary for some ORM
patterns.
Additionally propose the composition system for Result
that will form the basis for ORM-alternative result
systems such as horizontal sharding and dogpile cache.
As ORM results will soon be coming directly from
instances of Result, these extensions will instead
build their own ResultFetchStrategies that perform
the special steps to create composed or cached
result sets.
We are also considering, for the moment, not emitting deprecation
warnings for the fetchXYZ() methods; the immediate issue
is that Keystone tests are calling upon them, but as the
implementations here are proving not to be in any
kind of conflict with how Result works, there is
little harm in leaving them in place and deprecating them
at some later point.
References: #5087
References: #4395
Fixes: #4959
Change-Id: I8091919d45421e3f53029b8660427f844fee0228
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_compiler.py | 2 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 366 |
2 files changed, 333 insertions, 35 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index 440eaa05a..4b0b58b7e 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -5020,7 +5020,7 @@ class ResultMapTest(fixtures.TestBase): ) def test_nested_api(self): - from sqlalchemy.engine.result import CursorResultMetaData + from sqlalchemy.engine.cursor import CursorResultMetaData stmt2 = select([table2]).subquery() diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 1611dc1ba..a0c456056 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -25,8 +25,8 @@ from sqlalchemy import type_coerce from sqlalchemy import TypeDecorator from sqlalchemy import util from sqlalchemy import VARCHAR +from sqlalchemy.engine import cursor as _cursor from sqlalchemy.engine import default -from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row from sqlalchemy.ext.compiler import compiles from sqlalchemy.future import select as future_select @@ -43,7 +43,6 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import in_ from sqlalchemy.testing import is_ -from sqlalchemy.testing import is_false from sqlalchemy.testing import is_true from sqlalchemy.testing import le_ from sqlalchemy.testing import ne_ @@ -55,7 +54,7 @@ from sqlalchemy.testing.schema import Table from sqlalchemy.util import collections_abc -class ResultProxyTest(fixtures.TablesTest): +class CursorResultTest(fixtures.TablesTest): __backend__ = True @classmethod @@ -564,9 +563,9 @@ class ResultProxyTest(fixtures.TablesTest): # these proxies don't work with no cursor.description present. # so they don't apply to this test at the moment. 
- # result.FullyBufferedResultProxy, - # result.BufferedRowResultProxy, - # result.BufferedColumnResultProxy + # result.FullyBufferedCursorResult, + # result.BufferedRowCursorResult, + # result.BufferedColumnCursorResult users = self.tables.users @@ -578,7 +577,9 @@ class ResultProxyTest(fixtures.TablesTest): lambda r: r.scalar(), lambda r: r.fetchmany(), lambda r: r._getter("user"), - lambda r: r._has_key("user"), + lambda r: r.keys(), + lambda r: r.columns("user"), + lambda r: r.cursor_strategy.fetchone(r), ]: trans = conn.begin() result = conn.execute(users.insert(), user_id=1) @@ -648,11 +649,17 @@ class ResultProxyTest(fixtures.TablesTest): result = testing.db.execute(text("update users set user_id=5")) connection = result.connection assert connection.closed + assert_raises_message( exc.ResourceClosedError, "This result object does not return rows.", result.fetchone, ) + assert_raises_message( + exc.ResourceClosedError, + "This result object does not return rows.", + result.keys, + ) def test_row_case_sensitive(self, connection): row = connection.execute( @@ -788,16 +795,6 @@ class ResultProxyTest(fixtures.TablesTest): lambda: r._mapping["user_id"], ) - result = connection.execute(users.outerjoin(addresses).select()) - result = _result.BufferedColumnResultProxy(result.context) - r = result.first() - assert isinstance(r, _result.BufferedColumnRow) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r._mapping["user_id"], - ) - @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column_by_col(self, connection): users = self.tables.users @@ -1031,7 +1028,43 @@ class ResultProxyTest(fixtures.TablesTest): eq_(odict_row.values(), list(mapping_row.values())) eq_(odict_row.items(), list(mapping_row.items())) - def test_keys(self, connection): + @testing.combinations( + (lambda result: result), + (lambda result: result.first(),), + (lambda result: result.first()._mapping), + argnames="get_object", + ) + def 
test_keys(self, connection, get_object): + users = self.tables.users + addresses = self.tables.addresses + + connection.execute(users.insert(), user_id=1, user_name="foo") + result = connection.execute(users.select()) + + obj = get_object(result) + + # Row still has a .keys() method as well as LegacyRow + # as in 1.3.x, the KeyedTuple object also had a keys() method. + # it emits a 2.0 deprecation warning. + keys = obj.keys() + + # in 1.4, keys() is now a view that includes support for testing + # of columns and other objects + eq_(len(keys), 2) + eq_(list(keys), ["user_id", "user_name"]) + eq_(keys, ["user_id", "user_name"]) + ne_(keys, ["user_name", "user_id"]) + in_("user_id", keys) + not_in_("foo", keys) + in_(users.c.user_id, keys) + not_in_(0, keys) + not_in_(addresses.c.user_id, keys) + not_in_(addresses.c.address, keys) + + if isinstance(obj, Row): + eq_(obj._fields, ("user_id", "user_name")) + + def test_row_mapping_keys(self, connection): users = self.tables.users connection.execute(users.insert(), user_id=1, user_name="foo") @@ -1041,6 +1074,10 @@ class ResultProxyTest(fixtures.TablesTest): eq_(list(row._mapping.keys()), ["user_id", "user_name"]) eq_(row._fields, ("user_id", "user_name")) + in_("user_id", row.keys()) + not_in_("foo", row.keys()) + in_(users.c.user_id, row.keys()) + def test_row_keys_legacy_dont_warn(self, connection): users = self.tables.users @@ -1645,7 +1682,6 @@ class KeyTargetingTest(fixtures.TablesTest): ) result = connection.execute(stmt) - is_false(result._metadata.matched_on_name) # ensure the result map is the same number of cols so we can # use positional targeting @@ -2032,7 +2068,7 @@ class PositionalTextTest(fixtures.TablesTest): ) -class AlternateResultProxyTest(fixtures.TablesTest): +class AlternateCursorResultTest(fixtures.TablesTest): __requires__ = ("sqlite",) @classmethod @@ -2139,41 +2175,41 @@ class AlternateResultProxyTest(fixtures.TablesTest): ) def test_basic_plain(self): - 
self._test_proxy(_result.DefaultCursorFetchStrategy) + self._test_proxy(_cursor.CursorFetchStrategy) def test_basic_buffered_row_result_proxy(self): - self._test_proxy(_result.BufferedRowCursorFetchStrategy) + self._test_proxy(_cursor.BufferedRowCursorFetchStrategy) def test_basic_fully_buffered_result_proxy(self): - self._test_proxy(_result.FullyBufferedCursorFetchStrategy) + self._test_proxy(_cursor.FullyBufferedCursorFetchStrategy) def test_basic_buffered_column_result_proxy(self): - self._test_proxy(_result.DefaultCursorFetchStrategy) + self._test_proxy(_cursor.CursorFetchStrategy) def test_resultprocessor_plain(self): - self._test_result_processor(_result.DefaultCursorFetchStrategy, False) + self._test_result_processor(_cursor.CursorFetchStrategy, False) def test_resultprocessor_plain_cached(self): - self._test_result_processor(_result.DefaultCursorFetchStrategy, True) + self._test_result_processor(_cursor.CursorFetchStrategy, True) def test_resultprocessor_buffered_row(self): self._test_result_processor( - _result.BufferedRowCursorFetchStrategy, False + _cursor.BufferedRowCursorFetchStrategy, False ) def test_resultprocessor_buffered_row_cached(self): self._test_result_processor( - _result.BufferedRowCursorFetchStrategy, True + _cursor.BufferedRowCursorFetchStrategy, True ) def test_resultprocessor_fully_buffered(self): self._test_result_processor( - _result.FullyBufferedCursorFetchStrategy, False + _cursor.FullyBufferedCursorFetchStrategy, False ) def test_resultprocessor_fully_buffered_cached(self): self._test_result_processor( - _result.FullyBufferedCursorFetchStrategy, True + _cursor.FullyBufferedCursorFetchStrategy, True ) def _test_result_processor(self, cls, use_cache): @@ -2196,7 +2232,7 @@ class AlternateResultProxyTest(fixtures.TablesTest): @testing.fixture def row_growth_fixture(self): - with self._proxy_fixture(_result.BufferedRowCursorFetchStrategy): + with self._proxy_fixture(_cursor.BufferedRowCursorFetchStrategy): with self.engine.connect() as 
conn: conn.execute( self.table.insert(), @@ -2237,10 +2273,230 @@ class AlternateResultProxyTest(fixtures.TablesTest): assertion[idx] = result.cursor_strategy._bufsize le_(len(result.cursor_strategy._rowbuffer), max_size) - eq_(checks, assertion) + def test_buffered_fetchmany_fixed(self, row_growth_fixture): + """The BufferedRow cursor strategy will defer to the fetchmany + size passed when given rather than using the buffer growth + heuristic. + + """ + result = row_growth_fixture.execute(self.table.select()) + eq_(len(result.cursor_strategy._rowbuffer), 1) + + rows = result.fetchmany(300) + eq_(len(rows), 300) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + rows = result.fetchmany(300) + eq_(len(rows), 300) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + bufsize = result.cursor_strategy._bufsize + result.fetchone() + + # the fetchone() caused it to buffer a full set of rows + eq_(len(result.cursor_strategy._rowbuffer), bufsize - 1) + + # assert partitions uses fetchmany(), therefore controlling + # how the buffer is used + lens = [] + for partition in result.partitions(180): + lens.append(len(partition)) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + for lp in lens[0:-1]: + eq_(lp, 180) + + def test_buffered_fetchmany_yield_per(self, connection): + table = self.tables.test + + connection.execute( + table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)], + ) + + result = connection.execute(table.select()) + assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy) + result.fetchmany(5) -class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest): + result = result.yield_per(100) + assert isinstance( + result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy + ) + eq_(result.cursor_strategy._bufsize, 100) + eq_(result.cursor_strategy._growth_factor, 0) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + result.fetchone() + eq_(len(result.cursor_strategy._rowbuffer), 99) + + for i, row in 
enumerate(result): + if i == 188: + break + + # buffer of 98, plus buffer of 99 - 89, 10 rows + eq_(len(result.cursor_strategy._rowbuffer), 10) + + def test_buffered_fetchmany_yield_per_all(self, connection): + table = self.tables.test + + connection.execute( + table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 500)], + ) + + result = connection.execute(table.select()) + assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy) + + result.fetchmany(5) + + result = result.yield_per(0) + assert isinstance( + result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy + ) + eq_(result.cursor_strategy._bufsize, 0) + eq_(result.cursor_strategy._growth_factor, 0) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + result.fetchone() + eq_(len(result.cursor_strategy._rowbuffer), 490) + + for i, row in enumerate(result): + if i == 188: + break + + eq_(len(result.cursor_strategy._rowbuffer), 301) + + # already buffered, so this doesn't change things + result.yield_per(10) + + result.fetchmany(5) + eq_(len(result.cursor_strategy._rowbuffer), 296) + + +class MergeCursorResultTest(fixtures.TablesTest): + __backend__ = True + + __requires__ = ("independent_cursors",) + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("user_id", INT, primary_key=True, autoincrement=False), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + @classmethod + def insert_data(cls, connection): + users = cls.tables.users + + connection.execute( + users.insert(), + [ + {"user_id": 7, "user_name": "u1"}, + {"user_id": 8, "user_name": "u2"}, + {"user_id": 9, "user_name": "u3"}, + {"user_id": 10, "user_name": "u4"}, + {"user_id": 11, "user_name": "u5"}, + {"user_id": 12, "user_name": "u6"}, + ], + ) + + @testing.fixture + def merge_fixture(self): + users = self.tables.users + + def results(connection): + + r1 = connection.execute( + users.select() + .where(users.c.user_id.in_([7, 8])) + .order_by(users.c.user_id) + ) + 
r2 = connection.execute( + users.select() + .where(users.c.user_id.in_([9])) + .order_by(users.c.user_id) + ) + r3 = connection.execute( + users.select() + .where(users.c.user_id.in_([10, 11])) + .order_by(users.c.user_id) + ) + r4 = connection.execute( + users.select() + .where(users.c.user_id.in_([12])) + .order_by(users.c.user_id) + ) + return r1, r2, r3, r4 + + return results + + def test_merge_results(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + + eq_(result.keys(), ["user_id", "user_name"]) + row = result.fetchone() + eq_(row, (7, "u1")) + result.close() + + def test_close(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + + for r in [result, r1, r2, r3, r4]: + assert not r.closed + + result.close() + for r in [result, r1, r2, r3, r4]: + assert r.closed + + def test_fetchall(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + eq_( + result.fetchall(), + [ + (7, "u1"), + (8, "u2"), + (9, "u3"), + (10, "u4"), + (11, "u5"), + (12, "u6"), + ], + ) + for r in [r1, r2, r3, r4]: + assert r._soft_closed + + def test_first(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + eq_( + result.first(), (7, "u1"), + ) + for r in [r1, r2, r3, r4]: + assert r.closed + + def test_columns(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + eq_( + result.columns("user_name").fetchmany(4), + [("u1",), ("u2",), ("u3",), ("u4",)], + ) + result.close() + + +class GenerativeResultTest(fixtures.TablesTest): __backend__ = True @classmethod @@ -2303,7 +2559,49 @@ class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest): result = connection.execute( future_select(users).order_by(users.c.user_id) ) - eq_(result.columns(*columns).all(), expected) + + all_ = 
result.columns(*columns).all() + eq_(all_, expected) + + # ensure Row / LegacyRow comes out with .columns + assert type(all_[0]) is result._process_row + + def test_columns_twice(self, connection): + users = self.tables.users + connection.execute( + users.insert(), + [{"user_id": 7, "user_name": "jack", "x": 1, "y": 2}], + ) + + result = connection.execute( + future_select(users).order_by(users.c.user_id) + ) + + all_ = ( + result.columns("x", "y", "user_name", "user_id") + .columns("user_name", "x") + .all() + ) + eq_(all_, [("jack", 1)]) + + # ensure Row / LegacyRow comes out with .columns + assert type(all_[0]) is result._process_row + + def test_columns_plus_getter(self, connection): + users = self.tables.users + connection.execute( + users.insert(), + [{"user_id": 7, "user_name": "jack", "x": 1, "y": 2}], + ) + + result = connection.execute( + future_select(users).order_by(users.c.user_id) + ) + + result = result.columns("x", "y", "user_name") + getter = result._metadata._getter("y") + + eq_(getter(result.first()), 2) def test_partitions(self, connection): users = self.tables.users |
