summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorAntoine Busque <antoinebusque@gmail.com>2020-04-29 17:41:36 -0400
committerAsif Saif Uddin <auvipy@gmail.com>2020-04-30 10:34:02 +0600
commit8f1de37a08318d388a048341ddca12af30019bf3 (patch)
tree7b67a6a8607487c936158b7d55aa960619c755b7 /kombu
parent93242848b8eda6532f2928e7058fff5fe3023115 (diff)
downloadkombu-8f1de37a08318d388a048341ddca12af30019bf3.tar.gz
Fix: make SQLAlchemy Channel init thread-safe
The init method for the SQLAlchemy transport's Channel implementation is not currently thread-safe. This means that if two threads in a process attempt to instantiate a Channel concurrently, there can be a race condition when registering the SQLAlchemy model classes, which leaves the SQLAlchemy ORM mapper in a broken state. This results in an exception like the following: ``` sqlalchemy.exc.InvalidRequestError: Table 'kombu_message' is already defined for this MetaData instance. Specify 'extend_existing=True' to redefine options and columns on an existing Table object. ``` Any subsequent calls to the SQLAlchemy ORM will then fail with an exception like this: ``` sqlalchemy.exc.InvalidRequestError: Multiple classes found for path "Message" in the registry of this declarative base. Please use a fully module-qualified path. ``` This also applies to any SQLAlchemy calls in the same process, not just those made by Kombu or Celery, and can only really be recovered from by killing the process and starting over. To avoid all of this, introduce a mutex which ensures that only one thread at a time can register a SQLAlchemy model when instantiating the Channel class. Signed-off-by: Antoine Busque <antoinebusque@gmail.com>
Diffstat (limited to 'kombu')
-rw-r--r--kombu/transport/sqlalchemy/__init__.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index bcf9ed5b..fe469fae 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -4,6 +4,7 @@
from __future__ import absolute_import, unicode_literals
+import threading
from json import loads, dumps
from sqlalchemy import create_engine
@@ -21,6 +22,8 @@ from .models import (ModelBase, Queue as QueueBase, Message as MessageBase,
VERSION = (1, 1, 0)
__version__ = '.'.join(map(str, VERSION))
+_MUTEX = threading.Lock()
+
class Channel(virtual.Channel):
"""The channel class."""
@@ -127,9 +130,10 @@ class Channel(virtual.Channel):
return self._query_all(queue).count()
def _declarative_cls(self, name, base, ns):
- if name in class_registry:
- return class_registry[name]
- return type(str(name), (base, ModelBase), ns)
+ with _MUTEX:
+ if name in class_registry:
+ return class_registry[name]
+ return type(str(name), (base, ModelBase), ns)
@cached_property
def queue_cls(self):