diff options
author | Alan Conway <aconway@apache.org> | 2008-09-21 05:04:04 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-21 05:04:04 +0000 |
commit | 558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (patch) | |
tree | 9b306597ee07b264fa18580546ed5645f0c3766d | |
parent | 7c70d21ca2d788d4432cfa89851c9b928c9f30aa (diff) | |
download | qpid-python-558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05.tar.gz |
DumpClient send connections & session IDs to new members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697446 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/Exception.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 33 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 82 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/MemberHandler.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/MemberHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/types.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 46 |
21 files changed, 262 insertions, 117 deletions
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index 9e884efec0..05d1a26f57 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -28,7 +28,7 @@ namespace qpid { Exception::Exception(const std::string& msg) throw() : message(msg) { - QPID_LOG(debug, "Exception constructed: " << message); + QPID_LOG_IF(debug, !msg.empty(), "Exception constructed: " << message); } Exception::~Exception() throw() {} diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ac4ec81cb9..17f58adc78 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -165,6 +165,12 @@ void Connection::close( getOutput().close(); } +// Send a close to the client but keep the channels. Used by cluster. +void Connection::sendClose() { + adapter.close(200, "OK", 0, 0); + getOutput().close(); +} + void Connection::idleOut(){} void Connection::idleIn(){} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index b81f1eda21..cf0b4bc5c0 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -28,25 +28,29 @@ #include <boost/ptr_container/ptr_map.hpp> -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/sys/AggregateOutput.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/framing/ProtocolVersion.h" #include "Broker.h" -#include "qpid/sys/Socket.h" -#include "qpid/Exception.h" #include "ConnectionHandler.h" #include "ConnectionState.h" #include "SessionHandler.h" -#include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Connection.h" +#include "qpid/Exception.h" #include "qpid/RefCounted.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/management/Manageable.h" +#include "qpid/ptr_map.h" +#include "qpid/sys/AggregateOutput.h" +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/TimeoutHandler.h" #include <boost/ptr_container/ptr_map.hpp> +#include <boost/bind.hpp> + +#include <algorithm> namespace qpid { namespace broker { @@ -93,6 +97,13 @@ class Connection : public sys::ConnectionInputHandler, void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); + template <class F> void eachSessionHandler(const F& f) { + for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) + f(*ptr_map_ptr(i)); + } + + void sendClose(); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 7b1cacb640..4575c45a35 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -74,7 +74,7 @@ SessionImpl::~SessionImpl() { { Lock l(state); if (state != DETACHED) { - QPID_LOG(error, "Session was not closed cleanly"); + QPID_LOG(warning, "Session was not closed cleanly"); setState(DETACHED); handleClosed(); state.waitWaiters(); diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9549527416..79b76f68be 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -92,6 +92,11 @@ void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); } +void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(lock); + handler->catchUpClosed(c); +} + void Cluster::erase(ConnectionId id) { Mutex::ScopedLock l(lock); connections.erase(id); @@ -119,6 +124,7 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con } void Cluster::mcastEvent(const Event& e) { + QPID_LOG(trace, "MCAST " << e); e.mcast(name, cpg); } @@ -170,8 +176,10 @@ void Cluster::deliver( throw Exception(QPID_MSG("Invalid cluster control")); } } - else + else { + QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e); handler->deliver(e); + } } catch (const std::exception& e) { QPID_LOG(critical, "Error in cluster deliver: " << e.what()); @@ -181,14 +189,17 @@ void Cluster::deliver( void Cluster::connectionEvent(const Event& e) { Buffer buf(e); - assert(e.getConnection()); - if (e.getType() == DATA) - e.getConnection()->deliverBuffer(buf); + QPID_LOG(trace, "EXEC: " << e); + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); + assert(connection); + if (e.getType() == DATA) { + connection->deliverBuffer(buf); + } else { // control AMQFrame frame; while (frame.decode(buf)) { - QPID_LOG(trace, "DLVR [" << self << "]: " << frame); - e.getConnection()->received(frame); + QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame); + connection->received(frame); } } } @@ -196,26 +207,28 @@ void Cluster::connectionEvent(const Event& e) { struct AddrList { const cpg_address* addrs; int count; - const char* prefix; - AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {} + const char *prefix, *suffix; + AddrList(const cpg_address* a, int n, const char* p="", const char* s="") + : addrs(a), count(n), prefix(p), suffix(s) {} }; ostream& operator<<(ostream& o, const AddrList& a) { - if (a.count && a.prefix) o << a.prefix; + if (!a.count) return o; + o << a.prefix; for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " joined"; break; - case CPG_REASON_LEAVE: reasonString = " left";break; - case CPG_REASON_NODEDOWN: reasonString = " node-down";break; - case CPG_REASON_NODEUP: reasonString = " node-up";break; - case CPG_REASON_PROCDOWN: reasonString = " process-down";break; + case CPG_REASON_JOIN: reasonString = " joined "; break; + case CPG_REASON_LEAVE: reasonString = " left "; break; + case CPG_REASON_NODEDOWN: reasonString = " node-down "; break; + case CPG_REASON_NODEUP: reasonString = " node-up "; break; + case CPG_REASON_PROCDOWN: reasonString = " process-down "; break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); - o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : ""); + o << member << reasonString; } - return o; + return o << a.suffix; } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -238,8 +251,8 @@ void Cluster::configChange( cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". " - << AddrList(left, nLeft, "Left: ")); + QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent) + << AddrList(left, nLeft, "( ", ")")); if (find(left, left+nLeft, self) != left+nLeft) { // I have left the group, this is the final config change. @@ -289,9 +302,14 @@ void Cluster::stall() { } void Cluster::ready() { - // Called with lock held - QPID_LOG(info, self << " ready at URL " << url); + QPID_LOG(debug, self << " ready at " << url); + unstall(); mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); +} + +void Cluster::unstall() { + // Called with lock held + QPID_LOG(debug, self << " un-stalling"); handler = &memberHandler; // Member mode. connectionEventQueue.start(poller); // if (mgmtObject!=0) diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index c63e2e3e58..b37a1e343b 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -64,6 +64,8 @@ class Cluster : private Cpg::Handler, public management::Manageable void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection void erase(ConnectionId); // Erase a connection. + + void catchUpClosed(const boost::intrusive_ptr<Connection>&); // Insert a local connection /** Get the URLs of current cluster members. */ std::vector<Url> getUrls() const; @@ -88,8 +90,9 @@ class Cluster : private Cpg::Handler, public management::Manageable MemberId getSelf() const { return self; } - void stall(); void ready(); + void stall(); + void unstall(); void shutdown(); diff --git a/cpp/src/qpid/cluster/ClusterHandler.h b/cpp/src/qpid/cluster/ClusterHandler.h index 95106de016..ee25b8522b 100644 --- a/cpp/src/qpid/cluster/ClusterHandler.h +++ b/cpp/src/qpid/cluster/ClusterHandler.h @@ -59,6 +59,7 @@ class ClusterHandler cpg_address *joined, int nJoined) = 0; virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0; + virtual void catchUpClosed(const boost::intrusive_ptr<Connection>& c) = 0; protected: Cluster& cluster; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index e14e35998f..20d6fbb21e 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -21,6 +21,7 @@ #include "ClusterMap.h" #include "qpid/Url.h" #include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> #include <functional> @@ -86,6 +87,7 @@ void ClusterMap::ready(const MemberId& id, const Url& url) { members[id] = url; if (id == dumper) dumper = MemberId(); + QPID_LOG(info, id << " joined cluster: " << *this); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index b225ba3568..a1ed5f34f5 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -31,18 +31,23 @@ namespace cluster { using namespace framing; +// Shadow connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp() -{} + : cluster(c), self(myId), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{ + QPID_LOG(debug, "New connection: " << *this); +} +// Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp) - : cluster(c), self(myId, this), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), - catchUp(isCatchUp), exCatchUp() -{} + : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{ + QPID_LOG(debug, "New connection: " << *this); +} Connection::~Connection() {} @@ -59,6 +64,7 @@ void Connection::deliverDoOutput(uint32_t requested) { } void Connection::received(framing::AMQFrame& f) { + QPID_LOG(trace, "EXEC [" << *this << "]: " << f); // Handle connection controls, deliver other frames to connection. if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); @@ -70,16 +76,10 @@ void Connection::closed() { // connection around but replace the output handler with a // no-op handler as the network output handler will be // deleted. - - // FIXME aconway 2008-09-18: output handler reset in right place? - // connection.setOutputHandler(&discardHandler); output.setOutputHandler(discardHandler); if (catchUp) { - // This was a catch-up connection, may be promoted to a - // shadow connection. catchUp = false; - exCatchUp = true; - cluster.insert(boost::intrusive_ptr<Connection>(this)); + cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); } else { // This was a local replicated connection. Multicast a deliver closed @@ -125,8 +125,11 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/, // FIXME aconway 2008-09-10: TODO } -void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) { - // FIXME aconway 2008-09-10: TODO +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { + ConnectionId shadow = ConnectionId(memberId, connectionId); + QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow); + self = shadow; + assert(isShadow()); } void Connection::dumpComplete() { @@ -134,6 +137,11 @@ void Connection::dumpComplete() { } bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } - + +std::ostream& operator<<(std::ostream& o, const Connection& c) { + return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow") + << (c.isCatchUp() ? ",catchup" : "") << ")"; +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index c664427ea1..150c53807e 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -34,6 +34,8 @@ #include "qpid/framing/FrameDecoder.h" #include "qpid/framing/SequenceNumber.h" +#include <iosfwd> + namespace qpid { namespace framing { class AMQFrame; } @@ -59,11 +61,10 @@ class Connection : ConnectionId getId() const { return self; } broker::Connection& getBrokerConnection() { return connection; } bool isLocal() const; + bool isShadow() const { return !isLocal(); } /** True if the connection is in "catch-up" mode: building initial state */ bool isCatchUp() const { return catchUp; } - bool isExCatchUp() const { return exCatchUp; } - Cluster& getCluster() { return cluster; } @@ -109,6 +110,7 @@ class Connection : Cluster& cluster; ConnectionId self; + bool catchUp; NoOpConnectionOutputHandler discardHandler; WriteEstimate writeEstimate; OutputInterceptor output; @@ -116,8 +118,8 @@ class Connection : broker::Connection connection; framing::SequenceNumber mcastSeq; framing::SequenceNumber deliverSeq; - bool catchUp; - bool exCatchUp; + + friend std::ostream& operator<<(std::ostream&, const Connection&); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index d95a321adf..accf83ebc7 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -25,6 +25,7 @@ #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" +#include <stdexcept> namespace qpid { namespace cluster { @@ -57,8 +58,18 @@ ConnectionCodec::~ConnectionCodec() {} // ConnectionCodec functions delegate to the codecOutput size_t ConnectionCodec::decode(const char* buffer, size_t size) { - if (interceptor->isCatchUp()) - return codec.decode(buffer, size); + if (interceptor->isShadow()) + throw Exception(QPID_MSG("Unexpected decode for shadow connection " << *interceptor)); + else if (interceptor->isCatchUp()) { + size_t ret = codec.decode(buffer, size); + if (interceptor->isShadow()) { + // Promoted to shadow, close the codec. + // FIXME aconway 2008-09-19: can we close cleanly? + // codec.close(); + throw Exception("Close codec"); + } + return ret; + } else return interceptor->decode(buffer, size); } diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 43c30d3b07..c78859cc39 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -19,6 +19,8 @@ * */ #include "DumpClient.h" +#include "Cluster.h" +#include "Connection.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -26,8 +28,11 @@ #include "qpid/broker/Message.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionDumpCompleteBody.h" +#include "qpid/framing/ClusterConnectionShadowReadyBody.h" #include "qpid/framing/enum.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" @@ -39,6 +44,7 @@ namespace qpid { namespace client { struct ConnectionAccess { static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; } + static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; } }; } // namespace client @@ -50,17 +56,30 @@ using broker::Queue; using broker::QueueBinding; using broker::Message; using namespace framing; -using namespace framing::message; -using namespace client; +namespace arg=client::arg; +using client::SessionBase_0_10Access; + +// Create a connection with special version that marks it as a catch-up connection. +client::Connection catchUpConnection() { + client::Connection c; + client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10)); + return c; +} +// Send a control body directly to the session. +void send(client::Session& s, const AMQBody& body) { + client::SessionBase_0_10Access sb(s); + sb.get()->send(body); +} -DumpClient::DumpClient(const Url& url, Broker& b, +DumpClient::DumpClient(const Url& url, Cluster& c, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) - : donor(b), done(ok), failed(fail) + : receiver(url), donor(c), + connection(catchUpConnection()), shadowConnection(catchUpConnection()), + done(ok), failed(fail) { - // Special version identifies this as a catch-up connectionn. - client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 0x80 + 10)); + QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url); connection.open(url); session = connection.newSession(); } @@ -72,15 +91,18 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); void DumpClient::dump() { - donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); + Broker& b = donor.getBroker(); + b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); - donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); - SessionBase_0_10Access sb(session); - // FIXME aconway 2008-09-18: inidicate successful end-of-dump. + b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); session.sync(); session.close(); + donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1)); + QPID_LOG(debug, "Dump sent, closing catch_up connection."); + // FIXME aconway 2008-09-18: inidicate successful end-of-dump. connection.close(); + QPID_LOG(debug, "Dump sent."); } void DumpClient::run() { @@ -121,7 +143,8 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { void DumpClient::dumpMessage(const broker::QueuedMessage& message) { SessionBase_0_10Access sb(session); - framing::MessageTransferBody transfer(framing::ProtocolVersion(), CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED); + framing::MessageTransferBody transfer( + framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames()); } @@ -129,5 +152,42 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi session.exchangeBind(queue, binding.exchange, binding.key, binding.args); } +void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { + QPID_LOG(debug, "Dump connection " << *dumpConnection); + + shadowConnection = catchUpConnection(); + // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc. + shadowConnection.open(receiver); + dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); + boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection); + // FIXME aconway 2008-09-19: use proxy for cluster commands? + AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(), + dumpConnection->getId().getMember(), + reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()))); + impl->handle(ready); + // Will be closed from the other end. + QPID_LOG(debug, "Dump done, connection " << *dumpConnection); +} + +void DumpClient::dumpSession(broker::SessionHandler& sh) { + QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] " + << sh.getSession()->getId()); + + broker::SessionState* s = sh.getSession(); + if (!s) return; // no session. + // Re-create the session. + boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); + size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize; + // FIXME aconway 2008-09-19: verify matching ID. + boost::shared_ptr<client::SessionImpl> simpl( + new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size)); + cimpl->addSession(simpl); + simpl->open(0); + client::Session cs; + client::SessionBase_0_10Access(cs).set(simpl); + cs.sync(); + // FIXME aconway 2008-09-19: remaining session state. + QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId()); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index 83c9ac4076..6cd382667a 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -24,11 +24,6 @@ #include "qpid/client/Connection.h" #include "qpid/client/AsyncSession.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/ExchangeRegistry.h" #include "qpid/sys/Runnable.h" #include <boost/shared_ptr.hpp> @@ -45,16 +40,21 @@ class Exchange; class QueueBindings; class QueueBinding; class QueuedMessage; +class SessionHandler; + } // namespace broker namespace cluster { +class Cluster; +class Connection; + /** * A client that dumps the contents of a local broker to a remote one using AMQP. */ class DumpClient : public sys::Runnable { public: - DumpClient(const Url& url, broker::Broker& donor, + DumpClient(const Url& receiver, Cluster& donor, const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail); @@ -67,11 +67,14 @@ class DumpClient : public sys::Runnable { void dumpExchange(const boost::shared_ptr<broker::Exchange>&); void dumpMessage(const broker::QueuedMessage&); void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); + void dumpConnection(const boost::intrusive_ptr<Connection>& connection); + void dumpSession(broker::SessionHandler& s); private: - client::Connection connection; - client::AsyncSession session; - broker::Broker& donor; + Url receiver; + Cluster& donor; + client::Connection connection, shadowConnection; + client::AsyncSession session, shadowSession; boost::function<void()> done; boost::function<void(const std::exception& e)> failed; }; diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index a0f9bc0e49..b8c2bc3901 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -56,16 +56,12 @@ class Event { char* getData() { return data; } const char* getData() const { return data; } - boost::intrusive_ptr<Connection> getConnection() const { return connection; } - void setConnection(const boost::intrusive_ptr<Connection>& c) { connection=c; } - operator framing::Buffer() const; private: static const size_t OVERHEAD; EventType type; ConnectionId connectionId; - boost::intrusive_ptr<Connection> connection; size_t size; RefCountedBuffer::pointer data; }; diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index c188fe438e..8f08cb615f 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -40,14 +40,13 @@ void JoiningHandler::configChange( if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster. QPID_LOG(notice, cluster.self << " first in cluster."); cluster.map.ready(cluster.self, cluster.url); - cluster.ready(); + cluster.unstall(); } } void JoiningHandler::deliver(Event& e) { - // Discard connection events unless we are stalled and getting a dump. + // Discard connection events unless we are stalled to receive a dump. if (state == STALLED) { - e.setConnection(cluster.getConnection(e.getConnectionId())); cluster.connectionEventQueue.push(e); } } @@ -73,6 +72,7 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { } else { // Start a new dump cluster.map.dumper = cluster.map.first(); + QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee); if (dumpee == cluster.self) { // My turn switch (state) { case START: @@ -101,24 +101,23 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) { void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) { if (c->isCatchUp()) { ++catchUpConnections; - QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections."); - } - else if (c->isExCatchUp()) { - if (c->getId().getConnectionPtr() != c.get()) // become shadow connection - cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); - QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining"); - if (--catchUpConnections == 0) - dumpComplete(); + QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections); } - else // Local connection, will be stalled till dump complete. + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); +} + +void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { + QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << catchUpConnections-1); + if (c->isShadow()) cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); + if (--catchUpConnections == 0) + dumpComplete(); } void JoiningHandler::dumpComplete() { // FIXME aconway 2008-09-18: need to detect incomplete dump. // if (state == STALLED) { - QPID_LOG(debug, "Dump complete, unstalling."); cluster.ready(); } else { diff --git a/cpp/src/qpid/cluster/JoiningHandler.h b/cpp/src/qpid/cluster/JoiningHandler.h index c2cdb2c504..097d765e5e 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.h +++ b/cpp/src/qpid/cluster/JoiningHandler.h @@ -47,6 +47,7 @@ class JoiningHandler : public ClusterHandler void ready(const MemberId&, const std::string& url); void insert(const boost::intrusive_ptr<Connection>& c); + void catchUpClosed(const boost::intrusive_ptr<Connection>& c); private: void checkDumpRequest(); diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp index 1997ced9b0..ec9e7790c5 100644 --- a/cpp/src/qpid/cluster/MemberHandler.cpp +++ b/cpp/src/qpid/cluster/MemberHandler.cpp @@ -47,7 +47,6 @@ void MemberHandler::configChange( } void MemberHandler::deliver(Event& e) { - e.setConnection(cluster.getConnection(e.getConnectionId())); cluster.connectionEventQueue.push(e); } @@ -64,7 +63,7 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt cluster.stall(); if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread. - dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker, + dumpThread = Thread(new DumpClient(Url(urlStr), cluster, boost::bind(&MemberHandler::dumpSent, this), boost::bind(&MemberHandler::dumpError, this, _1))); } @@ -92,4 +91,9 @@ void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) { cluster.connections[c->getId()] = c; } +void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { + QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode"); + assert(0); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MemberHandler.h b/cpp/src/qpid/cluster/MemberHandler.h index 6657ea4f53..9a07507a17 100644 --- a/cpp/src/qpid/cluster/MemberHandler.h +++ b/cpp/src/qpid/cluster/MemberHandler.h @@ -53,6 +53,7 @@ class MemberHandler : public ClusterHandler void dumpError(const std::exception&); void insert(const boost::intrusive_ptr<Connection>& c); + void catchUpClosed(const boost::intrusive_ptr<Connection>& ); public: sys::Thread dumpThread; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 8718154d3e..4424864787 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -39,13 +39,18 @@ OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutp void OutputInterceptor::send(framing::AMQFrame& f) { Locker l(lock); next->send(f); - sent += f.size(); + if (!parent.isCatchUp()) + sent += f.size(); } void OutputInterceptor::activateOutput() { - Locker l(lock); - moreOutput = true; - sendDoOutput(); + Locker l(lock); + if (parent.isCatchUp()) + next->activateOutput(); + else { + moreOutput = true; + sendDoOutput(); + } } // Called in write thread when the IO layer has no more data to write. diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index 4fbe79e118..f16cc18f41 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -59,6 +59,8 @@ std::ostream& operator<<(std::ostream&, const MemberId&); struct ConnectionId : public std::pair<MemberId, Connection*> { ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {} + ConnectionId(uint64_t m, uint64_t c) + : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {} MemberId getMember() const { return first; } Connection* getConnectionPtr() const { return second; } }; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 1b44902054..60f85df02d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -81,21 +81,26 @@ struct ClusterFixture : public vector<uint16_t> { void add(); void add0(bool force); void setup(); + void kill(size_t n) { if (n) forkedBrokers[n-1].kill(); else broker0->broker->shutdown(); } + + void waitFor(size_t n) { + size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up. + while (retry && getGlobalCluster().size() != n) { + ::usleep(1000); + --retry; + } + } }; ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { add(n); if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case. // Wait for all n members to join the cluster - int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster().size() != n) { - ::sleep(1); - --retry; - } + waitFor(n); BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); } @@ -139,7 +144,7 @@ void ClusterFixture::add0(bool init) { qpid::log::Logger::instance().setPrefix("main"); broker0.reset(new BrokerFixture(parseOpts(argc, argv))); - push_back(broker0->getPort()); + if (size()) front() = broker0->getPort(); else push_back(broker0->getPort()); } // For debugging: op << for CPG types. @@ -190,14 +195,12 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) { BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), (unsigned)0); } - #endif - QPID_AUTO_TEST_CASE(testCatchupSharedState) { ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); + // Create some shared state. c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("foo","q")); @@ -205,24 +208,33 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { while (c0.session.queueQuery("q").getMessageCount() != 2) ::usleep(1000); // Wait for message to show up on broker 0. - // FIXME aconway 2008-09-18: close session until we catchup session state also. - c0.session.close(); - c0.connection.close(); - - // Now join new broker, should catch up. + // Add a new broker, it should catch up. cluster.add(); - // FIXME aconway 2008-09-18: when we do session state try adding - // further stuff from broker 0, and leaving a subscription active. + // Do some work post-add + c0.session.queueDeclare("p"); + c0.session.messageTransfer(arg::content=Message("pfoo","p")); + // Do some work post-join + cluster.waitFor(2); + c0.session.messageTransfer(arg::content=Message("pbar","p")); + // Verify new broker has all state. Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "foo"); BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "bar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + + BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "pfoo"); + BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "pbar"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u); } QPID_AUTO_TEST_CASE(testWiringReplication) { |