summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-11-15 18:25:01 +0000
committerAsk Solem <ask@celeryproject.org>2012-11-21 14:12:54 +0000
commit5308e26835ec0d66b30203030ff34b53af2dcd87 (patch)
treebef64427d3d24cda0be80fff65b31fb830242267
parentfff733c2616ac58ac9a205ae4340c8428eff05af (diff)
downloadkombu-5308e26835ec0d66b30203030ff34b53af2dcd87.tar.gz
Fixes deadlock in ProducerPool. Closes celery/celery#1009
-rw-r--r--kombu/connection.py5
-rw-r--r--kombu/pools.py15
2 files changed, 16 insertions, 4 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index d17a6f6a..e1842d16 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -709,7 +709,10 @@ class Resource(object):
except Empty:
self._add_when_empty()
else:
- R = self.prepare(R)
+ try:
+ R = self.prepare(R)
+ except BaseException:
+ self.release_resource(R)
self._dirty.add(R)
break
else:
diff --git a/kombu/pools.py b/kombu/pools.py
index f77ed444..c6dcaee7 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -39,7 +39,12 @@ class ProducerPool(Resource):
return self.connections.acquire(block=True)
def create_producer(self):
- return self.Producer(self._acquire_connection())
+ conn = self._acquire_connection()
+ try:
+ return self.Producer(conn)
+ except BaseException:
+ conn.release()
+ raise
def new(self):
return lambda: self.create_producer()
@@ -56,8 +61,12 @@ class ProducerPool(Resource):
if callable(p):
p = p()
if not p.channel:
- connection = self._acquire_connection()
- p.revive(connection.default_channel)
+ conn = self._acquire_connection()
+ try:
+ p.revive(conn.default_channel)
+ except BaseException:
+ conn.release()
+ raise
return p
def release(self, resource):