summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-02-07 20:07:16 +0100
committerAsk Solem <ask@celeryproject.org>2011-02-07 20:07:16 +0100
commit227fdf29056bd36597864e773f4baeafb23972ac (patch)
tree948c4895a1d1e5d85962681a5d865ea11c6bd4d2
parent84a8c282307e94b5ebf04b5bd8337c3a3ddb0a25 (diff)
downloadkombu-227fdf29056bd36597864e773f4baeafb23972ac.tar.gz
ConnectionPool: Re-connect if amqplib connection closed
-rw-r--r--kombu/connection.py6
-rw-r--r--kombu/transport/base.py3
-rw-r--r--kombu/transport/pyamqplib.py3
3 files changed, 9 insertions, 3 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index 652149f3..b34822eb 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -391,7 +391,8 @@ class BrokerConnection(object):
"""
if self._closed:
return
- if not self._connection:
+ if not self._connection or not \
+ self.transport.verify_connection(connection):
self._connection = self._establish_connection()
self._closed = False
return self._connection
@@ -548,8 +549,7 @@ class ConnectionPool(Resource):
self._resource.put_nowait(conn)
def prepare(self, resource):
- if not resource._connection:
- resource.connect()
+ resource.connect()
return resource
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 849aef53..9d221949 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -163,3 +163,6 @@ class Transport(object):
def drain_events(self, connection, **kwargs):
raise NotImplementedError("Subclass responsibility")
+
+ def verify_connection(self, connection):
+ return True
diff --git a/kombu/transport/pyamqplib.py b/kombu/transport/pyamqplib.py
index 54a5264a..e8660375 100644
--- a/kombu/transport/pyamqplib.py
+++ b/kombu/transport/pyamqplib.py
@@ -223,3 +223,6 @@ class Transport(base.Transport):
def close_connection(self, connection):
"""Close the AMQP broker connection."""
connection.close()
+
+ def verify_connection(self, connection):
+ return connection.channels is not None