From 5fb0138a3220161703e6ab1087319a669d14e7f4 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 4 Jul 2020 12:21:36 -0400 Subject: Implement rudimentary asyncio support w/ asyncpg Using the approach introduced at https://gist.github.com/zzzeek/6287e28054d3baddc07fa21a7227904e We can now create asyncio endpoints that are then handled in "implicit IO" form within the majority of the Core internals. Then coroutines are re-exposed at the point at which we call into asyncpg methods. Patch includes: * asyncpg dialect * asyncio package * engine, result, ORM session classes * new test fixtures, tests * some work with pep-484 and a short plugin for the pyannotate package, which seems to have so-so results Change-Id: Idbcc0eff72c4cad572914acdd6f40ddb1aef1a7d Fixes: #3414 --- lib/sqlalchemy/engine/base.py | 7 ++- lib/sqlalchemy/engine/create.py | 2 +- lib/sqlalchemy/engine/default.py | 3 ++ lib/sqlalchemy/engine/result.py | 96 ++++++++++++++++++++++++++-------------- 4 files changed, 73 insertions(+), 35 deletions(-) (limited to 'lib/sqlalchemy/engine') diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index d60f14f31..34bf720b7 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -89,6 +89,7 @@ class Connection(Connectable): if connection is not None else engine.raw_connection() ) + self._transaction = self._nested_transaction = None self.__savepoint_seq = 0 self.__in_begin = False @@ -623,6 +624,9 @@ class Connection(Connectable): self._dbapi_connection.detach() + def _autobegin(self): + self.begin() + def begin(self): """Begin a transaction and return a transaction handle. @@ -1433,7 +1437,7 @@ class Connection(Connectable): self._invalid_transaction() if self._is_future and self._transaction is None: - self.begin() + self._autobegin() context.pre_exec() @@ -2592,6 +2596,7 @@ class Engine(Connectable, log.Identified): return self.conn def __exit__(self, type_, value, traceback): + if type_ is not None: self.transaction.rollback() else: diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py index dc895ee15..66173d9b0 100644 --- a/lib/sqlalchemy/engine/create.py +++ b/lib/sqlalchemy/engine/create.py @@ -553,7 +553,7 @@ def create_engine(url, **kwargs): poolclass = pop_kwarg("poolclass", None) if poolclass is None: - poolclass = dialect_cls.get_pool_class(u) + poolclass = dialect.get_dialect_pool_class(u) pool_args = {"dialect": dialect} # consume pool arguments from kwargs, translating a few of diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index c76f820f9..4fb20a3d5 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -317,6 +317,9 @@ class DefaultDialect(interfaces.Dialect): def get_pool_class(cls, url): return getattr(cls, "poolclass", pool.QueuePool) + def get_dialect_pool_class(self, url): + return self.get_pool_class(url) + @classmethod def load_provisioning(cls): package = ".".join(cls.__module__.split(".")[0:-1]) diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 9badbffc3..10a88c7d8 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -20,6 +20,7 @@ from ..sql.base import _generative from ..sql.base import HasMemoized from ..sql.base import InPlaceGenerative from ..util import collections_abc +from ..util import py2k if util.TYPE_CHECKING: from typing import Any @@ -616,6 +617,16 @@ class ResultInternal(InPlaceGenerative): else: return row + def _iter_impl(self): + return self._iterator_getter(self) + + def _next_impl(self): + row = self._onerow_getter(self) + if row is _NO_ROW: + raise StopIteration() + else: + return row + @_generative def _column_slices(self, indexes): real_result = self._real_result if self._real_result else self @@ -892,16 +903,15 @@ class Result(ResultInternal): raise NotImplementedError() def __iter__(self): - return self._iterator_getter(self) + return self._iter_impl() def __next__(self): - row = self._onerow_getter(self) - if row is _NO_ROW: - raise StopIteration() - else: - return row + return self._next_impl() + + if py2k: - next = __next__ + def next(self): # noqa + return self._next_impl() def partitions(self, size=None): # type: (Optional[Int]) -> Iterator[List[Row]] @@ -1015,12 +1025,10 @@ class Result(ResultInternal): column of the first row, use the :meth:`.Result.scalar` method, or combine :meth:`.Result.scalars` and :meth:`.Result.first`. - .. comment: A warning is emitted if additional rows remain. - :return: a :class:`.Row` object, or None if no rows remain. - .. seealso:: + .. seealso:: :meth:`_result.Result.scalar` @@ -1186,18 +1194,6 @@ class FilterResult(ResultInternal): def _attributes(self): return self._real_result._attributes - def __iter__(self): - return self._iterator_getter(self) - - def __next__(self): - row = self._onerow_getter(self) - if row is _NO_ROW: - raise StopIteration() - else: - return row - - next = __next__ - def _fetchiter_impl(self): return self._real_result._fetchiter_impl() @@ -1299,6 +1295,17 @@ class ScalarResult(FilterResult): """ return self._allrows() + def __iter__(self): + return self._iter_impl() + + def __next__(self): + return self._next_impl() + + if py2k: + + def next(self): # noqa + return self._next_impl() + def first(self): # type: () -> Optional[Any] """Fetch the first object or None if no object is present. @@ -1409,7 +1416,7 @@ class MappingResult(FilterResult): def fetchall(self): # type: () -> List[Mapping] - """A synonym for the :meth:`_engine.ScalarResult.all` method.""" + """A synonym for the :meth:`_engine.MappingResult.all` method.""" return self._allrows() @@ -1453,6 +1460,17 @@ class MappingResult(FilterResult): return self._allrows() + def __iter__(self): + return self._iter_impl() + + def __next__(self): + return self._next_impl() + + if py2k: + + def next(self): # noqa + return self._next_impl() + def first(self): # type: () -> Optional[Mapping] """Fetch the first object or None if no object is present. @@ -1519,13 +1537,11 @@ class FrozenResult(object): .. seealso:: - .. seealso:: - - :ref:`do_orm_execute_re_executing` - example usage within the - ORM to implement a result-set cache. + :ref:`do_orm_execute_re_executing` - example usage within the + ORM to implement a result-set cache. - :func:`_orm.loading.merge_frozen_result` - ORM function to merge - a frozen result back into a :class:`_orm.Session`. + :func:`_orm.loading.merge_frozen_result` - ORM function to merge + a frozen result back into a :class:`_orm.Session`. """ @@ -1624,21 +1640,36 @@ class ChunkedIteratorResult(IteratorResult): """ def __init__( - self, cursor_metadata, chunks, source_supports_scalars=False, raw=None + self, + cursor_metadata, + chunks, + source_supports_scalars=False, + raw=None, + dynamic_yield_per=False, ): self._metadata = cursor_metadata self.chunks = chunks self._source_supports_scalars = source_supports_scalars self.raw = raw self.iterator = itertools.chain.from_iterable(self.chunks(None)) + self.dynamic_yield_per = dynamic_yield_per @_generative def yield_per(self, num): + # TODO: this throws away the iterator which may be holding + # onto a chunk. the yield_per cannot be changed once any + # rows have been fetched. either find a way to enforce this, + # or we can't use itertools.chain and will instead have to + # keep track. + self._yield_per = num - # TODO: this should raise if the iterator has already been started. - # we can't change the yield mid-stream like this self.iterator = itertools.chain.from_iterable(self.chunks(num)) + def _fetchmany_impl(self, size=None): + if self.dynamic_yield_per: + self.iterator = itertools.chain.from_iterable(self.chunks(size)) + return super(ChunkedIteratorResult, self)._fetchmany_impl(size=size) + class MergedResult(IteratorResult): """A :class:`_engine.Result` that is merged from any number of @@ -1677,6 +1708,5 @@ class MergedResult(IteratorResult): def _soft_close(self, hard=False): for r in self._results: r._soft_close(hard=hard) - if hard: self.closed = True -- cgit v1.2.1