From d43d1912b376322e27fdcda551a73f9ff5487972 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 3 Aug 2012 12:13:32 +0000 Subject: QPID-3858: Updated branch - merged from trunk r.1368650 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Connection.cpp | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) (limited to 'cpp/src/qpid/cluster/Connection.cpp') diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 512e0f03cb..ff855eef18 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -58,6 +58,8 @@ namespace qpid { namespace cluster { +using std::string; + using namespace framing; using namespace framing::cluster; using amqp_0_10::ListCodec; @@ -83,7 +85,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external) : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), + connectionCtor(&output, cluster.getBroker(), mgmtId, external, + false/*isLink*/, 0/*objectId*/, true/*shadow*/, false/*delayManagement*/, + false/*authenticated*/), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -100,9 +104,10 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, external, isLink, isCatchUp ? ++catchUpId : 0, - // The first catch-up connection is not considered a shadow - // as it needs to be authenticated. - isCatchUp && self.second > 1), + // The first catch-up connection is not a shadow + isCatchUp && self.second > 1, + false, // delayManagement + true), // catch up connecytions are authenticated expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -272,6 +277,8 @@ void Connection::closed() { if (announced) cluster.getMulticast().mcastControl( ClusterConnectionDeliverCloseBody(), self); + else + close(); } } catch (const std::exception& e) { @@ -404,11 +411,12 @@ void Connection::shadowSetUser(const std::string& userId) { } void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position, - uint32_t usedMsgCredit, uint32_t usedByteCredit) + uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); c->setPosition(position); c->setBlocked(blocked); + c->setDeliveryCount(deliveryCount); if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); updateIn.consumerNumbering.add(c); @@ -522,6 +530,7 @@ broker::QueuedMessage Connection::getUpdateMessage() { boost::shared_ptr updateq = findQueue(UpdateClient::UPDATE); assert(!updateq->isDurable()); broker::QueuedMessage m = updateq->get(); + updateq->dequeue(0, m); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -782,16 +791,18 @@ void Connection::managementSetupState( void Connection::config(const std::string& encoded) { Buffer buf(const_cast(encoded.data()), encoded.size()); string kind; + uint32_t p = buf.getPosition(); buf.getShortString (kind); - if (kind == "link") { + buf.setPosition(p); + if (broker::Link::isEncodedLink(kind)) { broker::Link::shared_ptr link = - broker::Link::decode(cluster.getBroker().getLinks(), buf); + broker::Link::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated link " << link->getHost() << ":" << link->getPort()); } - else if (kind == "bridge") { + else if (broker::Bridge::isEncodedBridge(kind)) { broker::Bridge::shared_ptr bridge = - broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); } else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); -- cgit v1.2.1