summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2022-03-31 14:05:09 +0000
committerGerrit Code Review <gerrit@ci3.zzzcomputing.com>2022-03-31 14:05:09 +0000
commit575cb635206135b5f68ba59639521171508a1b71 (patch)
tree9200af50d0737e9dffc6053844640498dd0a1970
parent3ec009eefd126c8623ddd04e94f0297724b2b476 (diff)
parent169ab546a99f91d696ff85c81d9e5403236dc748 (diff)
downloadsqlalchemy-575cb635206135b5f68ba59639521171508a1b71.tar.gz
Merge "add close=False parameter to engine.dispose()" into main
-rw-r--r--doc/build/changelog/unreleased_14/7877.rst13
-rw-r--r--doc/build/core/connections.rst11
-rw-r--r--doc/build/core/pooling.rst77
-rw-r--r--lib/sqlalchemy/engine/base.py47
-rw-r--r--test/engine/test_execute.py25
5 files changed, 119 insertions, 54 deletions
diff --git a/doc/build/changelog/unreleased_14/7877.rst b/doc/build/changelog/unreleased_14/7877.rst
new file mode 100644
index 000000000..d6ad6facd
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/7877.rst
@@ -0,0 +1,13 @@
+.. change::
+ :tags: usecase, engine
+ :tickets: 7877, 7815
+
+ Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True.
+ When False, the engine disposal does not touch the connections in the old
+ pool at all, simply dropping the pool and replacing it. This use case is so
+ that when the original pool is transferred from a parent process, the
+ parent process may continue to use those connections.
+
+ .. seealso::
+
+ :ref:`pooling_multiprocessing` - revised documentation
diff --git a/doc/build/core/connections.rst b/doc/build/core/connections.rst
index 2f19367d4..11d35237a 100644
--- a/doc/build/core/connections.rst
+++ b/doc/build/core/connections.rst
@@ -1720,7 +1720,10 @@ Valid use cases for calling :meth:`_engine.Engine.dispose` include:
:class:`_engine.Engine` object is copied to the child process,
:meth:`_engine.Engine.dispose` should be called so that the engine creates
brand new database connections local to that fork. Database connections
- generally do **not** travel across process boundaries.
+ generally do **not** travel across process boundaries. Use the
+ :paramref:`.Engine.dispose.close` parameter set to False in this case.
+ See the section :ref:`pooling_multiprocessing` for more background on this
+ use case.
* Within test suites or multitenancy scenarios where many
ad-hoc, short-lived :class:`_engine.Engine` objects may be created and disposed.
@@ -1745,6 +1748,12 @@ use of new connections, and means that when a connection is checked in,
it is entirely closed out and is not held in memory. See :ref:`pool_switching`
for guidelines on how to disable pooling.
+.. seealso::
+
+ :ref:`pooling_toplevel`
+
+ :ref:`pooling_multiprocessing`
+
.. _dbapi_connections:
Working with Driver SQL and Raw DBAPI Connections
diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst
index f685b240d..58a3a4350 100644
--- a/doc/build/core/pooling.rst
+++ b/doc/build/core/pooling.rst
@@ -461,45 +461,50 @@ are three general approaches to this:
engine = create_engine("mysql+mysqldb://user:pass@host/dbname", poolclass=NullPool)
-2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`
- **directly before** the new process is started, so that the new process
- will create new connections, as well as not attempt to close connections that
- were shared from the parent which can impact the parent's subsequent
- use of those connections. **This is the recommended approach**::
-
- engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
-
- def run_in_process():
- with engine.connect() as conn:
- conn.execute(text("..."))
-
- # before process starts, ensure engine.dispose() is called
- engine.dispose()
- p = Process(target=run_in_process)
- p.start()
-
-3. Alternatively, if the :class:`_engine.Engine` is only to be used in
- child processes, and will not be used from the parent process subsequent
- to the creation of child forks, the dispose may be within the child process
- right as it begins::
-
- engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
-
- def run_in_process():
- # process starts. ensure engine.dispose() is called just once
- # at the beginning. note this cause parent process connections
- # to be closed for most drivers
- engine.dispose()
+2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`,
+ passing the :paramref:`.Engine.dispose.close` parameter with a value of
+ ``False``, within the initialize phase of the child process. This is
+ so that the new process will not touch any of the parent process' connections
+ and will instead start with new connections.
+ **This is the recommended approach**::
+
+ from multiprocessing import Pool
+
+ engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
+
+ def run_in_process(some_data_record):
+ with engine.connect() as conn:
+ conn.execute(text("..."))
+
+ def initializer():
+ """ensure the parent proc's database connections are not touched
+ in the new connection pool"""
+ engine.dispose(close=False)
- with engine.connect() as conn:
- conn.execute(text("..."))
+ with Pool(10, initializer=initializer) as p:
+ p.map(run_in_process, data)
- p = Process(target=run_in_process)
- p.start()
- # after child process starts, "engine" above should not be used within
- # the parent process for connectivity, without calling
- # engine.dispose() first
+ .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
+ parameter to allow the replacement of a connection pool in a child
+ process without interfering with the connections used by the parent
+ process.
+
+3. Call :meth:`.Engine.dispose` **directly before** the child process is
+ created. This will also cause the child process to start with a new
+ connection pool, while ensuring the parent connections are not transferred
+ to the child process::
+
+ engine = create_engine("mysql://user:pass@host/dbname")
+
+ def run_in_process():
+ with engine.connect() as conn:
+ conn.execute(text("..."))
+
+ # before process starts, ensure engine.dispose() is called
+ engine.dispose()
+ p = Process(target=run_in_process)
+ p.start()
4. An event handler can be applied to the connection pool that tests for
connections being shared across process boundaries, and invalidates them::
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index db6d1ef3f..ad31585fb 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -2688,32 +2688,45 @@ class Engine(
def __repr__(self) -> str:
return "Engine(%r)" % (self.url,)
- def dispose(self) -> None:
+ def dispose(self, close: bool = True) -> None:
"""Dispose of the connection pool used by this
:class:`_engine.Engine`.
- This has the effect of fully closing all **currently checked in**
- database connections. Connections that are still checked out
- will **not** be closed, however they will no longer be associated
- with this :class:`_engine.Engine`,
- so when they are closed individually,
- eventually the :class:`_pool.Pool` which they are associated with will
- be garbage collected and they will be closed out fully, if
- not already closed on checkin.
-
- A new connection pool is created immediately after the old one has
- been disposed. This new pool, like all SQLAlchemy connection pools,
- does not make any actual connections to the database until one is
- first requested, so as long as the :class:`_engine.Engine`
- isn't used again,
- no new connections will be made.
+ A new connection pool is created immediately after the old one has been
+ disposed. The previous connection pool is disposed either actively, by
+ closing out all currently checked-in connections in that pool, or
+ passively, by losing references to it but otherwise not closing any
+ connections. The latter strategy is more appropriate for an initializer
+ in a forked Python process.
+
+ :param close: if left at its default of ``True``, has the
+ effect of fully closing all **currently checked in**
+ database connections. Connections that are still checked out
+ will **not** be closed, however they will no longer be associated
+ with this :class:`_engine.Engine`,
+ so when they are closed individually, eventually the
+ :class:`_pool.Pool` which they are associated with will
+ be garbage collected and they will be closed out fully, if
+ not already closed on checkin.
+
+ If set to ``False``, the previous connection pool is de-referenced,
+ and otherwise not touched in any way.
+
+ .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
+ parameter to allow the replacement of a connection pool in a child
+ process without interfering with the connections used by the parent
+ process.
+
.. seealso::
:ref:`engine_disposal`
+ :ref:`pooling_multiprocessing`
+
"""
- self.pool.dispose()
+ if close:
+ self.pool.dispose()
self.pool = self.pool.recreate()
self.dispatch.engine_disposed(self)
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 2c8a07220..b561db99e 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -3,6 +3,7 @@
import collections.abc as collections_abc
from contextlib import contextmanager
from contextlib import nullcontext
+import copy
from io import StringIO
import re
import threading
@@ -2277,6 +2278,30 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.mock_calls, [call(eng), call(eng)])
+ @testing.requires.ad_hoc_engines
+ @testing.combinations(True, False, argnames="close")
+ def test_close_parameter(self, testing_engine, close):
+ eng = testing_engine(
+ options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool)
+ )
+
+ conn = eng.connect()
+ dbapi_conn_one = conn.connection.dbapi_connection
+ conn.close()
+
+ eng_copy = copy.copy(eng)
+ eng_copy.dispose(close=close)
+ copy_conn = eng_copy.connect()
+ dbapi_conn_two = copy_conn.connection.dbapi_connection
+
+ is_not(dbapi_conn_one, dbapi_conn_two)
+
+ conn = eng.connect()
+ if close:
+ is_not(dbapi_conn_one, conn.connection.dbapi_connection)
+ else:
+ is_(dbapi_conn_one, conn.connection.dbapi_connection)
+
def test_retval_flag(self):
canary = []