diff options
| author | Alan Conway <aconway@apache.org> | 2010-06-22 13:29:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-06-22 13:29:52 +0000 |
| commit | a49decc7d56bdb704a5d1580058c0da57e9a9353 (patch) | |
| tree | af0acf1f9e7e5f48336407ae438e11528db75b38 /cpp/src/qpid/cluster/Connection.cpp | |
| parent | 265841a55cca55a7d3f8eea1d9e9c24a5fc2e350 (diff) | |
| download | qpid-python-a49decc7d56bdb704a5d1580058c0da57e9a9353.tar.gz | |
Fix cluster broker crashes when management is active.
Cluster brokers were exiting with errors "modified cluster state
outside cluster context" and "confirmed < (50+0) but only sent < (49+0)"
Fix was to:
- delay completion of incoming update till update connection closes.
- delay adding new connections to management until connection is announced.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@956882 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 45 |
1 files changed, 25 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 22e1db2036..42f800bd18 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -22,7 +22,6 @@ #include "UpdateClient.h" #include "Cluster.h" #include "UpdateReceiver.h" - #include "qpid/assert.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" @@ -43,7 +42,6 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" - #include <boost/current_function.hpp> @@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, { cluster.addLocalConnection(this); if (isLocalClient()) { - // Local clients are announced to the cluster - // and initialized when the announce is received. giveReadCredit(cluster.getSettings().readMax); // Flow control - init(); + // Delay adding the connection to the management map until announce() + connectionCtor.delayManagement = true; } else { // Catch-up shadow connections initialized using nextShadow id. @@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - init(); - } - QPID_LOG(info, "incoming connection " << *this); + } + init(); + QPID_LOG(debug, cluster << " local connection " << *this); } void Connection::setSecureConnection(broker::SecureConnection* sc) { @@ -152,8 +149,11 @@ void Connection::announce( QPID_ASSERT(ssf == connectionCtor.external.ssf); QPID_ASSERT(authid == connectionCtor.external.authid); QPID_ASSERT(nodict == connectionCtor.external.nodict); - // Local connections are already initialized. - if (isShadow()) { + // Local connections are already initialized but with management delayed. 
+ if (isLocalClient()) { + connection->addManagementObject(); + } + else if (isShadow()) { init(); // Play initial frames into the connection. Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); @@ -162,8 +162,9 @@ void Connection::announce( connection->received(frame); connection->setUserId(username); } - // Raise the connection management event now that the connection is replicated. + // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); + QPID_LOG(debug, cluster << " replicated connection " << *this); } Connection::~Connection() { @@ -249,6 +250,7 @@ void Connection::closed() { if (isUpdated()) { QPID_LOG(debug, cluster << " update connection closed " << *this); close(); + cluster.updateInClosed(); } else if (catchUp) { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); @@ -259,7 +261,8 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.closeOutput(); - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self); } } catch (const std::exception& e) { @@ -268,17 +271,21 @@ void Connection::closed() { } // Self-delivery of close message, close the connection. -void Connection::deliverClose () { - assert(!catchUp); - close(); +void Connection::deliverClose (bool aborted) { + QPID_LOG(debug, cluster << " replicated close of " << *this); + if (connection.get()) { + if (aborted) connection->abort(); + else connection->closed(); + connection.reset(); + } cluster.erase(self); } // Close the connection void Connection::close() { + QPID_LOG(debug, cluster << " local close of " << *this); if (connection.get()) { connection->closed(); - // Ensure we delete the broker::Connection in the deliver thread. 
connection.reset(); } } @@ -286,11 +293,9 @@ void Connection::close() { // The connection has been killed for misbehaving, called in connection thread. void Connection::abort() { if (connection.get()) { - connection->abort(); - // Ensure we delete the broker::Connection in the deliver thread. - connection.reset(); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self); } - cluster.erase(self); } // ConnectionCodec::decode receives read buffers from directly-connected clients. |
