diff options
author | Alan Conway <aconway@apache.org> | 2008-09-24 17:34:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-24 17:34:08 +0000 |
commit | a2a56cf9a7483e165fb579d0b519b284d02009e3 (patch) | |
tree | 11264fc87ea6e54c54b476e245ad4ee9c83faaeb | |
parent | 30be110b6914959a1eaee4803ff8c1c9938db7bb (diff) | |
download | qpid-python-a2a56cf9a7483e165fb579d0b519b284d02009e3.tar.gz |
Cluster replicates session command sequence state and consumers to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68
22 files changed, 316 insertions, 154 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 52c0ca229c..85f86c85c9 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -237,7 +237,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) : replayFlushLimit(flush), replayHardLimit(hard) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : id(i), timeout(), config(c), stateful() + : id(i), timeout(), config(c), stateful() { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } @@ -250,4 +250,29 @@ std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { return o << "(" << p.command.getValue() << "+" << p.offset << ")"; } +void SessionState::setState( + const SequenceNumber& replayStart, + const SequenceNumber& sendCommandPoint, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, + const SequenceSet& receivedIncomplete +) +{ + sender.replayPoint = replayStart; + sender.flushPoint = sendCommandPoint; + sender.sendPoint = sendCommandPoint; + sender.unflushedSize = 0; + sender.replaySize = 0; // Replay list will be updated separately. + sender.incomplete = sentIncomplete; + sender.bytesSinceKnownCompleted = 0; + + receiver.expected = expected; + receiver.received = received; + receiver.unknownCompleted = unknownCompleted; + receiver.incomplete = receivedIncomplete; + receiver.bytesSinceKnownCompleted = 0; +} + } // namespace qpid diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index 10937b7a1e..bf4ff6d326 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -130,8 +130,8 @@ class SessionState { virtual SessionPoint senderGetReplayPoint() const; /** Peer expecting commands from this point. - virtual *@return Range of frames to be replayed. - */ + *@return Range of frames to be replayed. + */ virtual ReplayRange senderExpected(const SessionPoint& expected); // ==== Functions for receiver state @@ -168,6 +168,19 @@ class SessionState { /** ID of the command currently being handled. */ virtual SequenceNumber receiverGetCurrent() const; + /** Set the state variables, used to create a session that will resume + * from some previously established point. + */ + virtual void setState( + const SequenceNumber& replayStart, + const SequenceNumber& sendCommandPoint, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, + const SequenceSet& receivedIncomplete + ); + private: struct SendState { diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 0c56885f8f..2170fe4e2e 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -110,6 +110,12 @@ class SemanticState : public sys::OutputTask, bool doOutput(); std::string getName() const { return name; } + + bool isAckExpected() const { return ackExpected; } + bool isAcquire() const { return acquire; } + bool isWindowing() const { return windowing; } + uint32_t getMsgCredit() const { return msgCredit; } + uint32_t getByteCredit() const { return byteCredit; } }; private: diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 003e206bff..64b0b80446 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -436,7 +436,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, uint8_t acceptMode, uint8_t acquireMode, bool exclusive, - const string& /*resumeId*/,//TODO implement resume behaviour + const string& /*resumeId*/,//TODO implement resume behaviour. Need to update cluster. uint64_t /*resumeTtl*/, const FieldTable& arguments) { diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 22ebec76bf..0321c2e6aa 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -50,6 +50,9 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, framing::Proto ESTABLISHED.insert(FAILED); ESTABLISHED.insert(CLOSED); ESTABLISHED.insert(OPEN); + + FINISHED.insert(FAILED); + FINISHED.insert(CLOSED); } void ConnectionHandler::incoming(AMQFrame& frame) @@ -107,7 +110,7 @@ void ConnectionHandler::close() case OPEN: setState(CLOSING); proxy.close(200, OK); - waitFor(CLOSED); + waitFor(FINISHED); break; // Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED } diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index f8bd5e5d49..ffb612fae8 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -44,7 +44,7 @@ class ConnectionHandler : private StateManager, { typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations; enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; - std::set<int> ESTABLISHED; + std::set<int> ESTABLISHED, FINISHED; class Adapter : public framing::FrameHandler { diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 79b76f68be..93625af948 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -68,7 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), handler(&joiningHandler), joiningHandler(*this), - memberHandler(*this) + memberHandler(*this), + mcastId() { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ @@ -109,22 +110,22 @@ void Cluster::leave() { } void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { - QPID_LOG(trace, "MCAST [" << self << "]: " << body); AMQFrame f(body); - Event e(CONTROL, ConnectionId(self, cptr), f.size()); + Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId); Buffer buf(e); f.encode(buf); + QPID_LOG(trace, "MCAST " << e << " " << body); mcastEvent(e); } -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) { - Event e(DATA, connection, size); +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, size_t id) { + Event e(DATA, connection, size, id); memcpy(e.getData(), data, size); + QPID_LOG(trace, "MCAST " << e); mcastEvent(e); } void Cluster::mcastEvent(const Event& e) { - QPID_LOG(trace, "MCAST " << e); e.mcast(name, cpg); } @@ -166,12 +167,13 @@ void Cluster::deliver( try { MemberId from(nodeid, pid); Event e = Event::delivered(from, msg, msg_len); + // Process cluster controls immediately if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control Buffer buf(e); AMQFrame frame; while (frame.decode(buf)) { - QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody()); + QPID_LOG(trace, "DLVR " << e << " " << frame); if (!handler->invoke(e.getConnectionId().getMember(), frame)) throw Exception(QPID_MSG("Invalid cluster control")); } @@ -189,17 +191,17 @@ void Cluster::deliver( void Cluster::connectionEvent(const Event& e) { Buffer buf(e); - QPID_LOG(trace, "EXEC: " << e); boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); assert(connection); if (e.getType() == DATA) { + QPID_LOG(trace, "EXEC: " << e); connection->deliverBuffer(buf); } else { // control AMQFrame frame; while (frame.decode(buf)) { - QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame); - connection->received(frame); + QPID_LOG(trace, "EXEC " << e << " " << frame); + connection->delivered(frame); } } } @@ -351,7 +353,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args* void Cluster::stopClusterNode(void) { - // FIXME aconway 2008-09-18: + // FIXME aconway 2008-09-18: mgmt QPID_LOG(notice, self << " disconnected from cluster " << name.str()); broker.shutdown(); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index b37a1e343b..55358e25db 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -77,7 +77,7 @@ class Cluster : private Cpg::Handler, public management::Manageable /** Send to the cluster */ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr); - void mcastBuffer(const char*, size_t, const ConnectionId&); + void mcastBuffer(const char*, size_t, const ConnectionId&, size_t id); void mcastEvent(const Event& e); /** Leave the cluster */ @@ -89,6 +89,7 @@ class Cluster : private Cpg::Handler, public management::Manageable void ready(const MemberId&, const std::string& url); MemberId getSelf() const { return self; } + MemberId getId() const { return self; } void ready(); void stall(); @@ -169,6 +170,8 @@ class Cluster : private Cpg::Handler, public management::Manageable JoiningHandler joiningHandler; MemberHandler memberHandler; + size_t mcastId; + friend class JoiningHandler; friend class MemberHandler; }; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index a1ed5f34f5..4aa66cce1f 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -20,10 +20,15 @@ */ #include "Connection.h" #include "Cluster.h" + +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/framing/ConnectionCloseBody.h" +#include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" -#include "qpid/framing/AllInvoker.h" + #include <boost/current_function.hpp> namespace qpid { @@ -31,6 +36,8 @@ namespace cluster { using namespace framing; +NoOpConnectionOutputHandler Connection::discardHandler; + // Shadow connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) @@ -49,7 +56,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, QPID_LOG(debug, "New connection: " << *this); } -Connection::~Connection() {} +Connection::~Connection() { + QPID_LOG(debug, "Deleted connection: " << *this); +} bool Connection::doOutput() { return output.doOutput(); @@ -63,28 +72,55 @@ void Connection::deliverDoOutput(uint32_t requested) { output.deliverDoOutput(requested); } +// Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { - QPID_LOG(trace, "EXEC [" << *this << "]: " << f); + QPID_LOG(trace, "RECV " << *this << ": " << f); + if (isShadow()) { + // Final close that completes catch-up for shadow connection. + if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { + AMQFrame ok(in_place<ConnectionCloseOkBody>()); + connection.getOutput().send(ok); + } + else + QPID_LOG(warning, *this << " ignoring unexpected frame: " << f); + } + else { + currentChannel = f.getChannel(); + if (!framing::invoke(*this, *f.getBody()).wasHandled()) + connection.received(f); + } +} + +// Delivered from cluster. +void Connection::delivered(framing::AMQFrame& f) { + QPID_LOG(trace, "DLVR " << *this << ": " << f); + assert(!isCatchUp()); // Handle connection controls, deliver other frames to connection. + currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } void Connection::closed() { try { + QPID_LOG(debug, "Connection closed " << *this); + + if (catchUp) { + catchUp = false; + cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); + if (!isShadow()) connection.closed(); + } + // Local network connection has closed. We need to keep the // connection around but replace the output handler with a // no-op handler as the network output handler will be // deleted. output.setOutputHandler(discardHandler); - if (catchUp) { - catchUp = false; - cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); - } - else { - // This was a local replicated connection. Multicast a deliver closed - // and process any outstanding frames from the cluster until - // self-delivery of deliver-closed. + + if (isLocal()) { + // This was a local replicated connection. Multicast a deliver + // closed and process any outstanding frames from the cluster + // until self-delivery of deliver-close. cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); ++mcastSeq; } @@ -100,29 +136,48 @@ void Connection::deliverClose () { cluster.erase(self); } -size_t Connection::decode(const char* buffer, size_t size) { - assert(!catchUp); - ++mcastSeq; - cluster.mcastBuffer(buffer, size, self); +// Decode data from local clients. +size_t Connection::decode(const char* buffer, size_t size) { + if (catchUp) { // Handle catch-up locally. + Buffer buf(const_cast<char*>(buffer), size); + while (localDecoder.decode(buf)) + received(localDecoder.frame); + } + else { // Multicast local connections. + assert(isLocal()); + cluster.mcastBuffer(buffer, size, self, ++mcastSeq); + } return size; } void Connection::deliverBuffer(Buffer& buf) { assert(!catchUp); ++deliverSeq; - while (decoder.decode(buf)) - received(decoder.frame); + while (mcastDecoder.decode(buf)) + delivered(mcastDecoder.frame); } -void Connection::sessionState(const SequenceNumber& /*replayStart*/, - const SequenceSet& /*sentIncomplete*/, - const SequenceNumber& /*expected*/, - const SequenceNumber& /*received*/, - const SequenceSet& /*unknownCompleted*/, - const SequenceSet& /*receivedIncomplete*/) +void Connection::sessionState( + const SequenceNumber& replayStart, + const SequenceNumber& sendCommandPoint, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, + const SequenceSet& receivedIncomplete) { - // FIXME aconway 2008-09-10: TODO + broker::SessionHandler& h = connection.getChannel(currentChannel); + broker::SessionState* s = h.getSession(); + s->setState( + replayStart, + sendCommandPoint, + sentIncomplete, + expected, + received, + unknownCompleted, + receivedIncomplete); + QPID_LOG(debug, "Received session state dump for " << s->getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 150c53807e..df3c035c8a 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -47,7 +47,6 @@ class Cluster; class Connection : public RefCounted, public sys::ConnectionInputHandler, - public sys::ConnectionOutputHandler, public framing::AMQP_AllOperations::ClusterConnectionHandler { @@ -60,20 +59,18 @@ class Connection : ConnectionId getId() const { return self; } broker::Connection& getBrokerConnection() { return connection; } + + /** True for connections from direct clients of local broker */ bool isLocal() const; + + /** True for connections that are shadowing remote broker connections */ bool isShadow() const { return !isLocal(); } - /** True if the connection is in "catch-up" mode: building initial state */ + /** True if the connection is in "catch-up" mode: building initial broker state. */ bool isCatchUp() const { return catchUp; } Cluster& getCluster() { return cluster; } - // ConnectionOutputHandler methods - void close() {} - void send(framing::AMQFrame&) {} - void activateOutput() {} - virtual size_t getBuffered() const { assert(0); return 0; } - // ConnectionInputHandler methods void received(framing::AMQFrame&); void closed(); @@ -85,18 +82,19 @@ class Connection : // ConnectionCodec methods size_t decode(const char* buffer, size_t size); - // Called by cluster to deliver a buffer from CPG. + // Called for data delivered from the cluster. void deliverBuffer(framing::Buffer&); - + void delivered(framing::AMQFrame&); // ==== Used in catch-up mode to build initial state. // // State dump methods. void sessionState(const SequenceNumber& replayStart, - const SequenceSet& sentIncomplete, - const SequenceNumber& expected, - const SequenceNumber& received, - const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); + const SequenceNumber& sendCommandPoint, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); void shadowReady(uint64_t memberId, uint64_t connectionId); @@ -108,17 +106,20 @@ class Connection : void deliverDoOutput(uint32_t requested); void sendDoOutput(); + static NoOpConnectionOutputHandler discardHandler; + Cluster& cluster; ConnectionId self; bool catchUp; - NoOpConnectionOutputHandler discardHandler; WriteEstimate writeEstimate; OutputInterceptor output; - framing::FrameDecoder decoder; + framing::FrameDecoder localDecoder; + framing::FrameDecoder mcastDecoder; broker::Connection connection; framing::SequenceNumber mcastSeq; framing::SequenceNumber deliverSeq; - + framing::ChannelId currentChannel; + friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index accf83ebc7..1458a87923 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -23,18 +23,23 @@ #include "Cluster.h" #include "ProxyInputHandler.h" #include "qpid/broker/Connection.h" +#include "qpid/framing/ConnectionCloseBody.h" +#include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include <stdexcept> +#include <boost/utility/in_place_factory.hpp> namespace qpid { namespace cluster { +using namespace framing; + sys::ConnectionCodec* -ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) { - if (v == framing::ProtocolVersion(0, 10)) +ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { + if (v == ProtocolVersion(0, 10)) return new ConnectionCodec(out, id, cluster, false); - else if (v == framing::ProtocolVersion(0x80 + 0, 0x80 + 10)) + else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) return new ConnectionCodec(out, id, cluster, true); // Catch-up connection return 0; } @@ -47,7 +52,8 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp) : codec(out, id, false), - interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp)) + interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp)), + id(interceptor->getId()) { std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); @@ -56,28 +62,18 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, ConnectionCodec::~ConnectionCodec() {} -// ConnectionCodec functions delegate to the codecOutput size_t ConnectionCodec::decode(const char* buffer, size_t size) { - if (interceptor->isShadow()) - throw Exception(QPID_MSG("Unexpected decode for shadow connection " << *interceptor)); - else if (interceptor->isCatchUp()) { - size_t ret = codec.decode(buffer, size); - if (interceptor->isShadow()) { - // Promoted to shadow, close the codec. - // FIXME aconway 2008-09-19: can we close cleanly? - // codec.close(); - throw Exception("Close codec"); - } - return ret; - } - else - return interceptor->decode(buffer, size); + return interceptor->decode(buffer, size); } +bool ConnectionCodec::isClosed() const { return codec.isClosed(); } + size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); } + bool ConnectionCodec::canEncode() { return codec.canEncode(); } + void ConnectionCodec::closed() { codec.closed(); } -bool ConnectionCodec::isClosed() const { return codec.isClosed(); } -framing::ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); } + +ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index a82569decd..e6ab7f5ba1 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -71,6 +71,7 @@ class ConnectionCodec : public sys::ConnectionCodec { private: amqp_0_10::Connection codec; boost::intrusive_ptr<cluster::Connection> interceptor; + cluster::ConnectionId id; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 45ccec7166..ee87afb468 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -72,6 +72,8 @@ void send(client::Session& s, const AMQBody& body) { sb.get()->send(body); } +// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel. + DumpClient::DumpClient(const Url& url, Cluster& c, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) @@ -79,9 +81,8 @@ DumpClient::DumpClient(const Url& url, Cluster& c, connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { - QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url); connection.open(url); - session = connection.newSession(); + session = connection.newSession("dump_shared"); } DumpClient::~DumpClient() {} @@ -91,6 +92,7 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); void DumpClient::dump() { + QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver); Broker& b = donor.getBroker(); b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. @@ -99,10 +101,9 @@ void DumpClient::dump() { session.sync(); session.close(); donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1)); - QPID_LOG(debug, "Dump sent, closing catch_up connection."); // FIXME aconway 2008-09-18: inidicate successful end-of-dump. connection.close(); - QPID_LOG(debug, "Dump sent."); + QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver); } void DumpClient::run() { @@ -153,49 +154,79 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi } void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { - QPID_LOG(debug, "Dump connection " << *dumpConnection); - shadowConnection = catchUpConnection(); - // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc. - shadowConnection.open(receiver); + broker::Connection& bc = dumpConnection->getBrokerConnection(); + // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size, + // authentication etc. See ConnectionSettings. + shadowConnection.open(receiver, bc.getUserId()); dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection); - // FIXME aconway 2008-09-19: use proxy for cluster commands? - AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(), - dumpConnection->getId().getMember(), - reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()))); - impl->handle(ready); - // Will be closed from the other end. - QPID_LOG(debug, "Dump done, connection " << *dumpConnection); + AMQP_AllProxy::ClusterConnection proxy(*impl); + proxy.shadowReady(dumpConnection->getId().getMember(), + reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())); + shadowConnection.close(); + QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection); } void DumpClient::dumpSession(broker::SessionHandler& sh) { - QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] " + QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); - broker::SessionState* s = sh.getSession(); if (!s) return; // no session. + // Re-create the session. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize; - // FIXME aconway 2008-09-19: verify matching ID. boost::shared_ptr<client::SessionImpl> simpl( new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size)); cimpl->addSession(simpl); - simpl->open(0); - client::Session cs; - client::SessionBase_0_10Access(cs).set(simpl); - cs.sync(); + simpl->open(sh.getSession()->getTimeout()); + client::SessionBase_0_10Access(shadowSession).set(simpl); + AMQP_AllProxy::ClusterConnection proxy(simpl->out); + // Re-create session state on remote connection. broker::SessionState* ss = sh.getSession(); + ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1)); // FIXME aconway 2008-09-19: remaining session state. - QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId()); + + // Reset command-sequence state. + proxy.sessionState( + ss->senderGetReplayPoint().command, + ss->senderGetCommandPoint().command, + ss->senderGetIncomplete(), + ss->receiverGetExpected().command, + ss->receiverGetReceived().command, + ss->receiverGetUnknownComplete(), + ss->receiverGetIncomplete() + ); + + // FIXME aconway 2008-09-23: session replay list. + + QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId()); } void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName()); + using namespace message; + shadowSession.messageSubscribe( + arg::queue = ci->getQueue()->getName(), + arg::destination = ci->getName(), + arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, + arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, + arg::exclusive = false , // FIXME aconway 2008-09-23: how to read. + + // TODO aconway 2008-09-23: remaining args not used by current broker. + // Update this code when they are. + arg::resumeId=std::string(), + arg::resumeTtl=0, + arg::arguments=FieldTable() + ); + shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled? + QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 89c6268d7f..43335e3607 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -31,17 +31,18 @@ namespace cluster { using framing::Buffer; -const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t); +const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(size_t); -Event::Event(EventType t, const ConnectionId c, const size_t s) - : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)) {} +Event::Event(EventType t, const ConnectionId& c, size_t s, size_t i) + : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)), id(i) {} Event Event::delivered(const MemberId& m, void* d, size_t s) { Buffer buf(static_cast<char*>(d), s); EventType type((EventType)buf.getOctet()); ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong())); + size_t id = buf.getLong(); assert(buf.getPosition() == OVERHEAD); - Event e(type, connection, s-OVERHEAD); + Event e(type, connection, s-OVERHEAD, id); memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD); return e; } @@ -51,6 +52,7 @@ void Event::mcast (const Cpg::Name& name, Cpg& cpg) const { Buffer b(header, OVERHEAD); b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr())); + b.putLong(id); iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } }; cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); } @@ -60,13 +62,12 @@ Event::operator Buffer() const { } static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; + std::ostream& operator << (std::ostream& o, const Event& e) { - o << "[event: " << e.getConnectionId() + o << "[event " << e.getConnectionId() << "/" << e.getId() << " " << EVENT_TYPE_NAMES[e.getType()] - << " " << e.getSize() << " bytes: "; - std::ostream_iterator<char> oi(o,""); - std::copy(e.getData(), e.getData()+std::min(e.getSize(), size_t(16)), oi); - return o << "...]"; + << " " << e.getSize() << " bytes]"; + return o; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index b8c2bc3901..12a7a9388a 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -43,7 +43,7 @@ namespace cluster { class Event { public: /** Create an event to mcast with a buffer of size bytes. */ - Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0); + Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0, size_t id=0); /** Create an event copied from delivered data. */ static Event delivered(const MemberId& m, void* data, size_t size); @@ -55,6 +55,7 @@ class Event { size_t getSize() const { return size; } char* getData() { return data; } const char* getData() const { return data; } + size_t getId() const { return id; } operator framing::Buffer() const; @@ -64,6 +65,7 @@ class Event { ConnectionId connectionId; size_t size; RefCountedBuffer::pointer data; + size_t id; }; std::ostream& operator << (std::ostream&, const Event&); diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index 8f08cb615f..75f6651b0a 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -46,9 +46,10 @@ void JoiningHandler::configChange( void JoiningHandler::deliver(Event& e) { // Discard connection events unless we are stalled to receive a dump. - if (state == STALLED) { + if (state == STALLED) cluster.connectionEventQueue.push(e); - } + else + QPID_LOG(trace, "Discarded pre-join event " << e); } void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) { @@ -80,12 +81,13 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { assert(0); break; case DUMP_REQUESTED: - QPID_LOG(info, cluster.self << " stalling for dump from " << cluster.map.dumper); + QPID_LOG(debug, cluster.self << " stalling for dump from " << cluster.map.dumper); state = STALLED; cluster.stall(); break; case DUMP_COMPLETE: + QPID_LOG(debug, cluster.self << " at start point and dump complete, ready."); cluster.ready(); break; } @@ -107,8 +109,8 @@ void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) { } void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << catchUpConnections-1); - if (c->isShadow()) + QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1); + if (c->isShadow()) cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); if (--catchUpConnections == 0) dumpComplete(); @@ -118,10 +120,11 @@ void JoiningHandler::dumpComplete() { // FIXME aconway 2008-09-18: need to detect incomplete dump. // if (state == STALLED) { + QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling."); cluster.ready(); } else { - QPID_LOG(debug, "Dump complete, waiting for stall point."); + QPID_LOG(debug, cluster.self << " received dump, waiting for start point."); assert(state == DUMP_REQUESTED); state = DUMP_COMPLETE; } diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp index ec9e7790c5..0f600a4995 100644 --- a/cpp/src/qpid/cluster/MemberHandler.cpp +++ b/cpp/src/qpid/cluster/MemberHandler.cpp @@ -34,7 +34,8 @@ using namespace framing; MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {} MemberHandler::~MemberHandler() { - if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread. + if (dumpThread.id()) + dumpThread.join(); // Join the last dumpthread. } void MemberHandler::configChange( @@ -62,7 +63,8 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled. cluster.stall(); - if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread. + if (dumpThread.id()) + dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread(new DumpClient(Url(urlStr), cluster, boost::bind(&MemberHandler::dumpSent, this), boost::bind(&MemberHandler::dumpError, this, _1))); diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 4424864787..e69992517c 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -45,10 +45,12 @@ void OutputInterceptor::send(framing::AMQFrame& f) { void OutputInterceptor::activateOutput() { Locker l(lock); + if (parent.isCatchUp()) next->activateOutput(); else { moreOutput = true; + QPID_LOG(trace, &parent << " activateOutput - sending doOutput"); sendDoOutput(); } } @@ -79,15 +81,19 @@ void OutputInterceptor::deliverDoOutput(size_t requested) { QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput); - if (parent.isLocal() && moreOutput) + if (parent.isLocal() && moreOutput) { + QPID_LOG(trace, &parent << " deliverDoOutput - sending doOutput, more output available."); sendDoOutput(); + } else doingOutput = false; } void OutputInterceptor::startDoOutput() { - if (!doingOutput) + if (!doingOutput) { + QPID_LOG(trace, &parent << " startDoOutput - sending doOutput, more output available."); sendDoOutput(); + } } // Send a doOutput request if one is not already in flight. @@ -111,4 +117,14 @@ void OutputInterceptor::setOutputHandler(sys::ConnectionOutputHandler& h) { next = &h; } +void OutputInterceptor::close() { + Locker l(lock); + next->close(); +} + +size_t OutputInterceptor::getBuffered() const { + Locker l(lock); + return next->getBuffered(); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h index ad9d9952bf..17ab5bbc55 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -43,8 +43,8 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { // sys::ConnectionOutputHandler functions void send(framing::AMQFrame& f); void activateOutput(); - void close() { Locker l(lock); next->close(); } - size_t getBuffered() const { Locker l(lock); return next->getBuffered(); } + void close(); + size_t getBuffered() const; // Delivery point for doOutput requests. void deliverDoOutput(size_t requested); diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index c2da8edc2c..48a5a5ce85 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -36,8 +36,8 @@ namespace sys { class ConnectionOutputHandlerPtr : public ConnectionOutputHandler { public: - ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {} - void set(ConnectionOutputHandler* p) { next = p; } + ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) { assert(next); } + void set(ConnectionOutputHandler* p) { next = p; assert(next); } ConnectionOutputHandler* get() { return next; } const ConnectionOutputHandler* get() const { return next; } diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 6bb5e4a8ca..9573caf61d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -98,7 +98,7 @@ struct ClusterFixture : public vector<uint16_t> { ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { add(n); - if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case. + if (!init0) return; // Defer initialization of broker0 // Wait for all n members to join the cluster waitFor(n); BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); @@ -164,36 +164,35 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -#if 0 // FIXME aconway 2008-09-22: enable. QPID_AUTO_TEST_CASE(DumpConsumers) { - ClusterFixture cluster(1); - Client c0(cluster[0]); + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); - c0.subs.subscribe(c0.lq, "q"); - c0.session.messageTransfer(arg::content=Message("before", "q")); - Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "before"); + c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); + c0.session.sync(); - // Start new member + // Start new members cluster.add(); - Client c1(cluster[1]); + Client c1(cluster[1], "c1"); + cluster.add(); + Client c2(cluster[2], "c2"); - // Transfer some messages to the subscription by client c0. + // Transfer a message, verify all members see it. c0.session.messageTransfer(arg::content=Message("aaa", "q")); - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u); - c1.session.messageTransfer(arg::content=Message("bbb", "q")); + // Activate the subscription, ensure message removed on all queues. + c0.subs.setFlowControl("q", FlowControl::messageCredit(1)); + Message m; BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); - // Verify that the queue has been drained on both brokers. - // This proves that the consumer was replicated when the second broker joined. BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); } -#endif QPID_AUTO_TEST_CASE(testCatchupSharedState) { ClusterFixture cluster(1); @@ -217,7 +216,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { cluster.waitFor(2); c0.session.messageTransfer(arg::content=Message("pbar","p")); - // Verify new brokers have all state. + // Verify new brokers have state. Message m; Client c1(cluster[1], "c1"); @@ -228,11 +227,14 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + // Add & verify another broker. + cluster.add(); + Client c2(cluster[2], "c2"); + BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "pbar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); } QPID_AUTO_TEST_CASE(testWiringReplication) { @@ -333,7 +335,6 @@ QPID_AUTO_TEST_CASE(testStall) { c1.session.messageTransfer(arg::content=Message("foo","q")); while (c1.session.queueQuery("q").getMessageCount() != 1) ::usleep(1000); // Wait for message to show up on broker 1. - sleep(2); // FIXME aconway 2008-09-11: remove. // But it should not be on broker 0. boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); BOOST_REQUIRE(q0); diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 2acf0aea82..040a3eeae0 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -67,10 +67,11 @@ o<?xml version="1.0"?> <!-- Target session deduced from channel number. --> <field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.--> + <field name="command-point" type="sequence-no"/> <!-- Id of next command sent --> <field name="sent-incomplete" type="sequence-set"/> <!-- Commands sent and incomplete. --> - <field name="expected" type="sequence-no"/> <!-- Idempotence barrier --> - <field name="received" type="sequence-no"/> <!-- Received up to here > expected--> + <field name="expected" type="sequence-no"/> <!-- Next command expected. --> + <field name="received" type="sequence-no"/> <!-- Received up to here (>= expected) --> <field name="unknown-completed" type="sequence-set"/> <!-- Completed but not known to peer. --> <field name="received-incomplete" type="sequence-set"/> <!-- Received and incomplete --> </control> |