diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2022-03-31 14:05:09 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci3.zzzcomputing.com> | 2022-03-31 14:05:09 +0000 |
| commit | 575cb635206135b5f68ba59639521171508a1b71 (patch) | |
| tree | 9200af50d0737e9dffc6053844640498dd0a1970 | |
| parent | 3ec009eefd126c8623ddd04e94f0297724b2b476 (diff) | |
| parent | 169ab546a99f91d696ff85c81d9e5403236dc748 (diff) | |
| download | sqlalchemy-575cb635206135b5f68ba59639521171508a1b71.tar.gz | |
Merge "add close=False parameter to engine.dispose()" into main
| -rw-r--r-- | doc/build/changelog/unreleased_14/7877.rst | 13 | ||||
| -rw-r--r-- | doc/build/core/connections.rst | 11 | ||||
| -rw-r--r-- | doc/build/core/pooling.rst | 77 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 47 | ||||
| -rw-r--r-- | test/engine/test_execute.py | 25 |
5 files changed, 119 insertions, 54 deletions
diff --git a/doc/build/changelog/unreleased_14/7877.rst b/doc/build/changelog/unreleased_14/7877.rst new file mode 100644 index 000000000..d6ad6facd --- /dev/null +++ b/doc/build/changelog/unreleased_14/7877.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: usecase, engine + :tickets: 7877, 7815 + + Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True. + When False, the engine disposal does not touch the connections in the old + pool at all, simply dropping the pool and replacing it. This use case is so + that when the original pool is transferred from a parent process, the + parent process may continue to use those connections. + + .. seealso:: + + :ref:`pooling_multiprocessing` - revised documentation diff --git a/doc/build/core/connections.rst b/doc/build/core/connections.rst index 2f19367d4..11d35237a 100644 --- a/doc/build/core/connections.rst +++ b/doc/build/core/connections.rst @@ -1720,7 +1720,10 @@ Valid use cases for calling :meth:`_engine.Engine.dispose` include: :class:`_engine.Engine` object is copied to the child process, :meth:`_engine.Engine.dispose` should be called so that the engine creates brand new database connections local to that fork. Database connections - generally do **not** travel across process boundaries. + generally do **not** travel across process boundaries. Use the + :paramref:`.Engine.dispose.close` parameter set to False in this case. + See the section :ref:`pooling_multiprocessing` for more background on this + use case. * Within test suites or multitenancy scenarios where many ad-hoc, short-lived :class:`_engine.Engine` objects may be created and disposed. @@ -1745,6 +1748,12 @@ use of new connections, and means that when a connection is checked in, it is entirely closed out and is not held in memory. See :ref:`pool_switching` for guidelines on how to disable pooling. +.. seealso:: + + :ref:`pooling_toplevel` + + :ref:`pooling_multiprocessing` + .. _dbapi_connections: Working with Driver SQL and Raw DBAPI Connections diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst index f685b240d..58a3a4350 100644 --- a/doc/build/core/pooling.rst +++ b/doc/build/core/pooling.rst @@ -461,45 +461,50 @@ are three general approaches to this: engine = create_engine("mysql+mysqldb://user:pass@host/dbname", poolclass=NullPool) -2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine` - **directly before** the new process is started, so that the new process - will create new connections, as well as not attempt to close connections that - were shared from the parent which can impact the parent's subsequent - use of those connections. **This is the recommended approach**:: - - engine = create_engine("mysql+mysqldb://user:pass@host/dbname") - - def run_in_process(): - with engine.connect() as conn: - conn.execute(text("...")) - - # before process starts, ensure engine.dispose() is called - engine.dispose() - p = Process(target=run_in_process) - p.start() - -3. Alternatively, if the :class:`_engine.Engine` is only to be used in - child processes, and will not be used from the parent process subsequent - to the creation of child forks, the dispose may be within the child process - right as it begins:: - - engine = create_engine("mysql+mysqldb://user:pass@host/dbname") - - def run_in_process(): - # process starts. ensure engine.dispose() is called just once - # at the beginning. note this cause parent process connections - # to be closed for most drivers - engine.dispose() +2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`, + passing the :paramref:`.Engine.dispose.close` parameter with a value of + ``False``, within the initialize phase of the child process. This is + so that the new process will not touch any of the parent process' connections + and will instead start with new connections. + **This is the recommended approach**:: + + from multiprocessing import Pool + + engine = create_engine("mysql+mysqldb://user:pass@host/dbname") + + def run_in_process(some_data_record): + with engine.connect() as conn: + conn.execute(text("...")) + + def initializer(): + """ensure the parent proc's database connections are not touched + in the new connection pool""" + engine.dispose(close=False) - with engine.connect() as conn: - conn.execute(text("...")) + with Pool(10, initializer=initializer) as p: + p.map(run_in_process, data) - p = Process(target=run_in_process) - p.start() - # after child process starts, "engine" above should not be used within - # the parent process for connectivity, without calling - # engine.dispose() first + .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` + parameter to allow the replacement of a connection pool in a child + process without interfering with the connections used by the parent + process. + +3. Call :meth:`.Engine.dispose` **directly before** the child process is + created. This will also cause the child process to start with a new + connection pool, while ensuring the parent connections are not transferred + to the child process:: + + engine = create_engine("mysql://user:pass@host/dbname") + + def run_in_process(): + with engine.connect() as conn: + conn.execute(text("...")) + + # before process starts, ensure engine.dispose() is called + engine.dispose() + p = Process(target=run_in_process) + p.start() 4. An event handler can be applied to the connection pool that tests for connections being shared across process boundaries, and invalidates them:: diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index db6d1ef3f..ad31585fb 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -2688,32 +2688,45 @@ class Engine( def __repr__(self) -> str: return "Engine(%r)" % (self.url,) - def dispose(self) -> None: + def dispose(self, close: bool = True) -> None: """Dispose of the connection pool used by this :class:`_engine.Engine`. - This has the effect of fully closing all **currently checked in** - database connections. Connections that are still checked out - will **not** be closed, however they will no longer be associated - with this :class:`_engine.Engine`, - so when they are closed individually, - eventually the :class:`_pool.Pool` which they are associated with will - be garbage collected and they will be closed out fully, if - not already closed on checkin. - - A new connection pool is created immediately after the old one has - been disposed. This new pool, like all SQLAlchemy connection pools, - does not make any actual connections to the database until one is - first requested, so as long as the :class:`_engine.Engine` - isn't used again, - no new connections will be made. + A new connection pool is created immediately after the old one has been + disposed. The previous connection pool is disposed either actively, by + closing out all currently checked-in connections in that pool, or + passively, by losing references to it but otherwise not closing any + connections. The latter strategy is more appropriate for an initializer + in a forked Python process. + + :param close: if left at its default of ``True``, has the + effect of fully closing all **currently checked in** + database connections. Connections that are still checked out + will **not** be closed, however they will no longer be associated + with this :class:`_engine.Engine`, + so when they are closed individually, eventually the + :class:`_pool.Pool` which they are associated with will + be garbage collected and they will be closed out fully, if + not already closed on checkin. + + If set to ``False``, the previous connection pool is de-referenced, + and otherwise not touched in any way. + + .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` + parameter to allow the replacement of a connection pool in a child + process without interfering with the connections used by the parent + process. + .. seealso:: :ref:`engine_disposal` + :ref:`pooling_multiprocessing` + """ - self.pool.dispose() + if close: + self.pool.dispose() self.pool = self.pool.recreate() self.dispatch.engine_disposed(self) diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 2c8a07220..b561db99e 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -3,6 +3,7 @@ import collections.abc as collections_abc from contextlib import contextmanager from contextlib import nullcontext +import copy from io import StringIO import re import threading @@ -2277,6 +2278,30 @@ class EngineEventsTest(fixtures.TestBase): eq_(canary.mock_calls, [call(eng), call(eng)]) + @testing.requires.ad_hoc_engines + @testing.combinations(True, False, argnames="close") + def test_close_parameter(self, testing_engine, close): + eng = testing_engine( + options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool) + ) + + conn = eng.connect() + dbapi_conn_one = conn.connection.dbapi_connection + conn.close() + + eng_copy = copy.copy(eng) + eng_copy.dispose(close=close) + copy_conn = eng_copy.connect() + dbapi_conn_two = copy_conn.connection.dbapi_connection + + is_not(dbapi_conn_one, dbapi_conn_two) + + conn = eng.connect() + if close: + is_not(dbapi_conn_one, conn.connection.dbapi_connection) + else: + is_(dbapi_conn_one, conn.connection.dbapi_connection) + def test_retval_flag(self): canary = [] |
