diff options
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 31 |
1 files changed, 12 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index ed339b2f85..2b079a22bc 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -23,6 +23,7 @@ #include "ClusterMap.h" #include "Connection.h" #include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" @@ -41,14 +42,6 @@ #include <boost/bind.hpp> namespace qpid { - -namespace client { -struct ConnectionAccess { - static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; } - static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; } -}; -} // namespace client - namespace cluster { using broker::Broker; @@ -169,10 +162,15 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection); shadowConnection = catchUpConnection(); + broker::Connection& bc = dumpConnection->getBrokerConnection(); // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size, // authentication etc. See ConnectionSettings. shadowConnection.open(dumpeeUrl, bc.getUserId()); + + // Stop the failover listener as its session will conflict with re-creating-sessions + client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener(); + dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( dumpConnection->getId().getMember(), @@ -184,26 +182,21 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn void DumpClient::dumpSession(broker::SessionHandler& sh) { QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); - broker::SessionState* s = sh.getSession(); - if (!s) return; // no session. + broker::SessionState* ss = sh.getSession(); + if (!ss) return; // no session. - // Re-create the session. + // Create a client session to dump session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); - size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize; - boost::shared_ptr<client::SessionImpl> simpl( - new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size)); - cimpl->addSession(simpl); - simpl->open(sh.getSession()->getTimeout()); + boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); client::SessionBase_0_10Access(shadowSession).set(simpl); AMQP_AllProxy::ClusterConnection proxy(simpl->out); // Re-create session state on remote connection. - broker::SessionState* ss = sh.getSession(); // For reasons unknown, boost::bind does not work here with boost 1.33. ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); - // FIXME aconway 2008-09-19: remaining session state. + // FIXME aconway 2008-09-19: update remaining session state. // Reset command-sequence state. proxy.sessionState( @@ -216,7 +209,7 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { ss->receiverGetIncomplete() ); - // FIXME aconway 2008-09-23: session replay list. + // FIXME aconway 2008-09-23: update session replay list. QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); } |