summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-06-22 13:29:52 +0000
committerAlan Conway <aconway@apache.org>2010-06-22 13:29:52 +0000
commita49decc7d56bdb704a5d1580058c0da57e9a9353 (patch)
treeaf0acf1f9e7e5f48336407ae438e11528db75b38 /cpp/src/qpid/cluster/Connection.cpp
parent265841a55cca55a7d3f8eea1d9e9c24a5fc2e350 (diff)
downloadqpid-python-a49decc7d56bdb704a5d1580058c0da57e9a9353.tar.gz
Fix cluster broker crashes when management is active.
Cluser brokers were exiting with errors "modified cluster state outside cluster context" and "confirmed < (50+0) but only sent < (49+0)" Fix was to: - delay completion of incoming update till update connection closes. - delay addding new connections to managment until connection is announced. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@956882 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp45
1 files changed, 25 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 22e1db2036..42f800bd18 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -22,7 +22,6 @@
#include "UpdateClient.h"
#include "Cluster.h"
#include "UpdateReceiver.h"
-
#include "qpid/assert.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
@@ -43,7 +42,6 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-
#include <boost/current_function.hpp>
@@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
- // Local clients are announced to the cluster
- // and initialized when the announce is received.
giveReadCredit(cluster.getSettings().readMax); // Flow control
- init();
+ // Delay adding the connection to the management map until announce()
+ connectionCtor.delayManagement = true;
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- init();
- }
- QPID_LOG(info, "incoming connection " << *this);
+ }
+ init();
+ QPID_LOG(debug, cluster << " local connection " << *this);
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -152,8 +149,11 @@ void Connection::announce(
QPID_ASSERT(ssf == connectionCtor.external.ssf);
QPID_ASSERT(authid == connectionCtor.external.authid);
QPID_ASSERT(nodict == connectionCtor.external.nodict);
- // Local connections are already initialized.
- if (isShadow()) {
+ // Local connections are already initialized but with management delayed.
+ if (isLocalClient()) {
+ connection->addManagementObject();
+ }
+ else if (isShadow()) {
init();
// Play initial frames into the connection.
Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
@@ -162,8 +162,9 @@ void Connection::announce(
connection->received(frame);
connection->setUserId(username);
}
- // Raise the connection management event now that the connection is replicated.
+ // Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
+ QPID_LOG(debug, cluster << " replicated connection " << *this);
}
Connection::~Connection() {
@@ -249,6 +250,7 @@ void Connection::closed() {
if (isUpdated()) {
QPID_LOG(debug, cluster << " update connection closed " << *this);
close();
+ cluster.updateInClosed();
}
else if (catchUp) {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
@@ -259,7 +261,8 @@ void Connection::closed() {
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.closeOutput();
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self);
}
}
catch (const std::exception& e) {
@@ -268,17 +271,21 @@ void Connection::closed() {
}
// Self-delivery of close message, close the connection.
-void Connection::deliverClose () {
- assert(!catchUp);
- close();
+void Connection::deliverClose (bool aborted) {
+ QPID_LOG(debug, cluster << " replicated close of " << *this);
+ if (connection.get()) {
+ if (aborted) connection->abort();
+ else connection->closed();
+ connection.reset();
+ }
cluster.erase(self);
}
// Close the connection
void Connection::close() {
+ QPID_LOG(debug, cluster << " local close of " << *this);
if (connection.get()) {
connection->closed();
- // Ensure we delete the broker::Connection in the deliver thread.
connection.reset();
}
}
@@ -286,11 +293,9 @@ void Connection::close() {
// The connection has been killed for misbehaving, called in connection thread.
void Connection::abort() {
if (connection.get()) {
- connection->abort();
- // Ensure we delete the broker::Connection in the deliver thread.
- connection.reset();
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self);
}
- cluster.erase(self);
}
// ConnectionCodec::decode receives read buffers from directly-connected clients.