diff options
author | Ask Solem <ask@celeryproject.org> | 2011-02-07 20:07:16 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2011-02-07 20:07:16 +0100 |
commit | 227fdf29056bd36597864e773f4baeafb23972ac (patch) | |
tree | 948c4895a1d1e5d85962681a5d865ea11c6bd4d2 | |
parent | 84a8c282307e94b5ebf04b5bd8337c3a3ddb0a25 (diff) | |
download | kombu-227fdf29056bd36597864e773f4baeafb23972ac.tar.gz |
ConnectionPool: Re-connect if amqplib connection closed
-rw-r--r-- | kombu/connection.py | 6 | ||||
-rw-r--r-- | kombu/transport/base.py | 3 | ||||
-rw-r--r-- | kombu/transport/pyamqplib.py | 3 |
3 files changed, 9 insertions, 3 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index 652149f3..b34822eb 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -391,7 +391,8 @@ class BrokerConnection(object): """ if self._closed: return - if not self._connection: + if not self._connection or not \ + self.transport.verify_connection(connection): self._connection = self._establish_connection() self._closed = False return self._connection @@ -548,8 +549,7 @@ class ConnectionPool(Resource): self._resource.put_nowait(conn) def prepare(self, resource): - if not resource._connection: - resource.connect() + resource.connect() return resource diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 849aef53..9d221949 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -163,3 +163,6 @@ class Transport(object): def drain_events(self, connection, **kwargs): raise NotImplementedError("Subclass responsibility") + + def verify_connection(self, connection): + return True diff --git a/kombu/transport/pyamqplib.py b/kombu/transport/pyamqplib.py index 54a5264a..e8660375 100644 --- a/kombu/transport/pyamqplib.py +++ b/kombu/transport/pyamqplib.py @@ -223,3 +223,6 @@ class Transport(base.Transport): def close_connection(self, connection): """Close the AMQP broker connection.""" connection.close() + + def verify_connection(self, connection): + return connection.channels is not None |