diff options
author | Ask Solem <ask@celeryproject.org> | 2012-11-15 18:25:01 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-11-21 14:12:54 +0000 |
commit | 5308e26835ec0d66b30203030ff34b53af2dcd87 (patch) | |
tree | bef64427d3d24cda0be80fff65b31fb830242267 | |
parent | fff733c2616ac58ac9a205ae4340c8428eff05af (diff) | |
download | kombu-5308e26835ec0d66b30203030ff34b53af2dcd87.tar.gz |
Fixes deadlock in ProducerPool. Closes celery/celery#1009
-rw-r--r-- | kombu/connection.py | 5 | ||||
-rw-r--r-- | kombu/pools.py | 15 |
2 files changed, 16 insertions, 4 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index d17a6f6a..e1842d16 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -709,7 +709,10 @@ class Resource(object): except Empty: self._add_when_empty() else: - R = self.prepare(R) + try: + R = self.prepare(R) + except BaseException: + self.release_resource(R) self._dirty.add(R) break else: diff --git a/kombu/pools.py b/kombu/pools.py index f77ed444..c6dcaee7 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -39,7 +39,12 @@ class ProducerPool(Resource): return self.connections.acquire(block=True) def create_producer(self): - return self.Producer(self._acquire_connection()) + conn = self._acquire_connection() + try: + return self.Producer(conn) + except BaseException: + conn.release() + raise def new(self): return lambda: self.create_producer() @@ -56,8 +61,12 @@ class ProducerPool(Resource): if callable(p): p = p() if not p.channel: - connection = self._acquire_connection() - p.revive(connection.default_channel) + conn = self._acquire_connection() + try: + p.revive(conn.default_channel) + except BaseException: + conn.release() + raise return p def release(self, resource): |