diff options
| -rw-r--r-- | doc/build/changelog/unreleased_14/4914.rst | 12 | ||||
| -rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 3 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/result.py | 33 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 62 |
4 files changed, 62 insertions, 48 deletions
diff --git a/doc/build/changelog/unreleased_14/4914.rst b/doc/build/changelog/unreleased_14/4914.rst new file mode 100644 index 000000000..49ad91968 --- /dev/null +++ b/doc/build/changelog/unreleased_14/4914.rst @@ -0,0 +1,12 @@ +.. change:: + :tags: usecase, postgresql + :tickets: 4914 + + The maximum buffer size for the :class:`.BufferedRowResultProxy`, which + is used by dialects such as PostgreSQL when ``stream_results=True``, can + now be set to a number greater than 1000 and the buffer will grow to + that size. Previously, the buffer would not go beyond 1000 even if the + value were set larger. The growth of the buffer is also now based + on a simple multiplying factor currently set to 5. Pull request courtesy + Soumaya Mauthoor. + diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 1a4db1108..14d49ee15 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -137,7 +137,8 @@ The following DBAPI-specific options are respected when used with interpreted by the :class:`.BufferedRowResultProxy`, and if omitted the buffer will grow to ultimately store 1000 rows at a time. - .. versionadded:: 1.0.6 + .. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than + 1000, and the buffer will grow to that size. .. _psycopg2_executemany_mode: diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 733bd6f6a..004a84da5 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -1486,10 +1486,8 @@ class BufferedRowResultProxy(ResultProxy): The pre-fetching behavior fetches only one row initially, and then grows its buffer size by a fixed amount with each successive need - for additional rows up to a size of 1000. 
- - The size argument is configurable using the ``max_row_buffer`` - execution option:: + for additional rows up to the ``max_row_buffer`` size, which defaults + to 1000:: with psycopg2_engine.connect() as conn: @@ -1497,7 +1495,7 @@ class BufferedRowResultProxy(ResultProxy): stream_results=True, max_row_buffer=50 ).execute("select * from table") - .. versionadded:: 1.0.6 Added the ``max_row_buffer`` option. + .. versionchanged:: 1.4 ``max_row_buffer`` may now exceed 1000 rows. .. seealso:: @@ -1506,34 +1504,21 @@ class BufferedRowResultProxy(ResultProxy): def _init_metadata(self): self._max_row_buffer = self.context.execution_options.get( - "max_row_buffer", None + "max_row_buffer", 1000 ) + self._growth_factor = 5 self.__buffer_rows() super(BufferedRowResultProxy, self)._init_metadata() - # this is a "growth chart" for the buffering of rows. - # each successive __buffer_rows call will use the next - # value in the list for the buffer size until the max - # is reached - size_growth = { - 1: 5, - 5: 10, - 10: 20, - 20: 50, - 50: 100, - 100: 250, - 250: 500, - 500: 1000, - } - def __buffer_rows(self): if self.cursor is None: return size = getattr(self, "_bufsize", 1) self.__rowbuffer = collections.deque(self.cursor.fetchmany(size)) - self._bufsize = self.size_growth.get(size, size) - if self._max_row_buffer is not None: - self._bufsize = min(self._max_row_buffer, self._bufsize) + if size < self._max_row_buffer: + self._bufsize = min( + self._max_row_buffer, size * self._growth_factor + ) def _soft_close(self, **kw): self.__rowbuffer.clear() diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 2563c7d0c..794508a32 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -1942,31 +1942,47 @@ class AlternateResultProxyTest(fixtures.TablesTest): r = conn.execute(stmt) eq_(r.scalar(), "HI THERE") - def test_buffered_row_growth(self): + @testing.fixture + def row_growth_fixture(self): with 
self._proxy_fixture(_result.BufferedRowResultProxy): with self.engine.connect() as conn: conn.execute( self.table.insert(), - [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)], + [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)], ) - result = conn.execute(self.table.select()) - checks = {0: 5, 1: 10, 9: 20, 135: 250, 274: 500, 1351: 1000} - for idx, row in enumerate(result, 0): - if idx in checks: - eq_(result._bufsize, checks[idx]) - le_(len(result._BufferedRowResultProxy__rowbuffer), 1000) - - def test_max_row_buffer_option(self): - with self._proxy_fixture(_result.BufferedRowResultProxy): - with self.engine.connect() as conn: - conn.execute( - self.table.insert(), - [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)], - ) - result = conn.execution_options(max_row_buffer=27).execute( - self.table.select() - ) - for idx, row in enumerate(result, 0): - if idx in (16, 70, 150, 250): - eq_(result._bufsize, 27) - le_(len(result._BufferedRowResultProxy__rowbuffer), 27) + yield conn + + @testing.combinations( + ("no option", None, {0: 5, 1: 25, 9: 125, 135: 625, 274: 1000}), + ("lt 1000", 27, {0: 5, 16: 27, 70: 27, 150: 27, 250: 27}), + ( + "gt 1000", + 1500, + {0: 5, 1: 25, 9: 125, 135: 625, 274: 1500, 1351: 1500}, + ), + ( + "gt 1500", + 2000, + {0: 5, 1: 25, 9: 125, 135: 625, 274: 2000, 1351: 2000}, + ), + id_="iaa", + argnames="max_row_buffer,checks", + ) + def test_buffered_row_growth( + self, row_growth_fixture, max_row_buffer, checks + ): + if max_row_buffer: + result = row_growth_fixture.execution_options( + max_row_buffer=max_row_buffer + ).execute(self.table.select()) + else: + result = row_growth_fixture.execute(self.table.select()) + + assertion = {} + max_size = max(checks.values()) + for idx, row in enumerate(result, 0): + if idx in checks: + assertion[idx] = result._bufsize + le_(len(result._BufferedRowResultProxy__rowbuffer), max_size) + + eq_(checks, assertion) |
