summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-07-12 18:29:22 +0000
committerTed Ross <tross@apache.org>2011-07-12 18:29:22 +0000
commit2885957b73306b183b5889dd8bbfd8b58c61c521 (patch)
treebd7a55221e8ef11f2d95676f1fe2d860cd8cff9c /cpp/src/qpid/broker
parentfbbc3211abb59a99c5e9eb07dfd0f252a6416590 (diff)
downloadqpid-python-2885957b73306b183b5889dd8bbfd8b58c61c521.tar.gz
QPID-3352 - Federation bridge doesn't recover from session errors
Applied patch from Jason Dillaman git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1145706 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp6
-rw-r--r--cpp/src/qpid/broker/Bridge.h2
-rw-r--r--cpp/src/qpid/broker/Link.cpp15
3 files changed, 22 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 7fbbf4e2c4..c709606c17 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -164,6 +164,12 @@ void Bridge::destroy()
listener(this);
}
+bool Bridge::isSessionReady() const
+{
+ SessionHandler& sessionHandler = conn->getChannel(id);
+ return sessionHandler.ready();
+}
+
void Bridge::setPersistenceId(uint64_t pId) const
{
persistenceId = pId;
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index a846254c57..8b4559a871 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -59,6 +59,8 @@ public:
void destroy();
bool isDurable() { return args.i_durable; }
+ bool isSessionReady() const;
+
management::ManagementObject* GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
management::Args& args,
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 9ab4379a69..8010bf43e7 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -248,6 +248,19 @@ void Link::ioThreadProcessing()
return;
QPID_LOG(debug, "Link::ioThreadProcessing()");
+ // check for bridge session errors and recover
+ if (!active.empty()) {
+ Bridges::iterator removed = std::remove_if(
+ active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+ for (Bridges::iterator i = removed; i != active.end(); ++i) {
+ Bridge::shared_ptr bridge = *i;
+ bridge->closed();
+ bridge->cancel(*connection);
+ created.push_back(bridge);
+ }
+ active.erase(removed, active.end());
+ }
+
//process any pending creates and/or cancellations
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -296,7 +309,7 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
+ else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}