summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
committerAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
commit558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (patch)
tree9b306597ee07b264fa18580546ed5645f0c3766d
parent7c70d21ca2d788d4432cfa89851c9b928c9f30aa (diff)
downloadqpid-python-558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05.tar.gz
DumpClient send connections & session IDs to new members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697446 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/Exception.cpp2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp6
-rw-r--r--cpp/src/qpid/broker/Connection.h33
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp58
-rw-r--r--cpp/src/qpid/cluster/Cluster.h5
-rw-r--r--cpp/src/qpid/cluster/ClusterHandler.h1
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp42
-rw-r--r--cpp/src/qpid/cluster/Connection.h10
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp15
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp82
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h21
-rw-r--r--cpp/src/qpid/cluster/Event.h4
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp25
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.h1
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.cpp8
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.h1
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp13
-rw-r--r--cpp/src/qpid/cluster/types.h2
-rw-r--r--cpp/src/tests/cluster_test.cpp46
21 files changed, 262 insertions, 117 deletions
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp
index 9e884efec0..05d1a26f57 100644
--- a/cpp/src/qpid/Exception.cpp
+++ b/cpp/src/qpid/Exception.cpp
@@ -28,7 +28,7 @@
namespace qpid {
Exception::Exception(const std::string& msg) throw() : message(msg) {
- QPID_LOG(debug, "Exception constructed: " << message);
+ QPID_LOG_IF(debug, !msg.empty(), "Exception constructed: " << message);
}
Exception::~Exception() throw() {}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ac4ec81cb9..17f58adc78 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -165,6 +165,12 @@ void Connection::close(
getOutput().close();
}
+// Send a close to the client but keep the channels. Used by cluster.
+void Connection::sendClose() {
+ adapter.close(200, "OK", 0, 0);
+ getOutput().close();
+}
+
void Connection::idleOut(){}
void Connection::idleIn(){}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index b81f1eda21..cf0b4bc5c0 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -28,25 +28,29 @@
#include <boost/ptr_container/ptr_map.hpp>
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
-#include "qpid/framing/ProtocolVersion.h"
#include "Broker.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/Exception.h"
#include "ConnectionHandler.h"
#include "ConnectionState.h"
#include "SessionHandler.h"
-#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Connection.h"
+#include "qpid/Exception.h"
#include "qpid/RefCounted.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/ptr_map.h"
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/TimeoutHandler.h"
#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/bind.hpp>
+
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -93,6 +97,13 @@ class Connection : public sys::ConnectionInputHandler,
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
+ template <class F> void eachSessionHandler(const F& f) {
+ for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
+ f(*ptr_map_ptr(i));
+ }
+
+ void sendClose();
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 7b1cacb640..4575c45a35 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -74,7 +74,7 @@ SessionImpl::~SessionImpl() {
{
Lock l(state);
if (state != DETACHED) {
- QPID_LOG(error, "Session was not closed cleanly");
+ QPID_LOG(warning, "Session was not closed cleanly");
setState(DETACHED);
handleClosed();
state.waitWaiters();
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9549527416..79b76f68be 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -92,6 +92,11 @@ void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
handler->insert(c);
}
+void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ handler->catchUpClosed(c);
+}
+
void Cluster::erase(ConnectionId id) {
Mutex::ScopedLock l(lock);
connections.erase(id);
@@ -119,6 +124,7 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con
}
void Cluster::mcastEvent(const Event& e) {
+ QPID_LOG(trace, "MCAST " << e);
e.mcast(name, cpg);
}
@@ -170,8 +176,10 @@ void Cluster::deliver(
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
- else
+ else {
+ QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e);
handler->deliver(e);
+ }
}
catch (const std::exception& e) {
QPID_LOG(critical, "Error in cluster deliver: " << e.what());
@@ -181,14 +189,17 @@ void Cluster::deliver(
void Cluster::connectionEvent(const Event& e) {
Buffer buf(e);
- assert(e.getConnection());
- if (e.getType() == DATA)
- e.getConnection()->deliverBuffer(buf);
+ QPID_LOG(trace, "EXEC: " << e);
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
+ assert(connection);
+ if (e.getType() == DATA) {
+ connection->deliverBuffer(buf);
+ }
else { // control
AMQFrame frame;
while (frame.decode(buf)) {
- QPID_LOG(trace, "DLVR [" << self << "]: " << frame);
- e.getConnection()->received(frame);
+ QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame);
+ connection->received(frame);
}
}
}
@@ -196,26 +207,28 @@ void Cluster::connectionEvent(const Event& e) {
struct AddrList {
const cpg_address* addrs;
int count;
- const char* prefix;
- AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {}
+ const char *prefix, *suffix;
+ AddrList(const cpg_address* a, int n, const char* p="", const char* s="")
+ : addrs(a), count(n), prefix(p), suffix(s) {}
};
ostream& operator<<(ostream& o, const AddrList& a) {
- if (a.count && a.prefix) o << a.prefix;
+ if (!a.count) return o;
+ o << a.prefix;
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " joined"; break;
- case CPG_REASON_LEAVE: reasonString = " left";break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
- case CPG_REASON_NODEUP: reasonString = " node-up";break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
+ case CPG_REASON_JOIN: reasonString = " joined "; break;
+ case CPG_REASON_LEAVE: reasonString = " left "; break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down "; break;
+ case CPG_REASON_NODEUP: reasonString = " node-up "; break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down "; break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
- o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : "");
+ o << member << reasonString;
}
- return o;
+ return o << a.suffix;
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -238,8 +251,8 @@ void Cluster::configChange(
cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". "
- << AddrList(left, nLeft, "Left: "));
+ QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent)
+ << AddrList(left, nLeft, "( ", ")"));
if (find(left, left+nLeft, self) != left+nLeft) {
// I have left the group, this is the final config change.
@@ -289,9 +302,14 @@ void Cluster::stall() {
}
void Cluster::ready() {
- // Called with lock held
- QPID_LOG(info, self << " ready at URL " << url);
+ QPID_LOG(debug, self << " ready at " << url);
+ unstall();
mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+}
+
+void Cluster::unstall() {
+ // Called with lock held
+ QPID_LOG(debug, self << " un-stalling");
handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
// if (mgmtObject!=0)
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index c63e2e3e58..b37a1e343b 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -64,6 +64,8 @@ class Cluster : private Cpg::Handler, public management::Manageable
void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection
void erase(ConnectionId); // Erase a connection.
+
+ void catchUpClosed(const boost::intrusive_ptr<Connection>&); // Insert a local connection
/** Get the URLs of current cluster members. */
std::vector<Url> getUrls() const;
@@ -88,8 +90,9 @@ class Cluster : private Cpg::Handler, public management::Manageable
MemberId getSelf() const { return self; }
- void stall();
void ready();
+ void stall();
+ void unstall();
void shutdown();
diff --git a/cpp/src/qpid/cluster/ClusterHandler.h b/cpp/src/qpid/cluster/ClusterHandler.h
index 95106de016..ee25b8522b 100644
--- a/cpp/src/qpid/cluster/ClusterHandler.h
+++ b/cpp/src/qpid/cluster/ClusterHandler.h
@@ -59,6 +59,7 @@ class ClusterHandler
cpg_address *joined, int nJoined) = 0;
virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0;
+ virtual void catchUpClosed(const boost::intrusive_ptr<Connection>& c) = 0;
protected:
Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index e14e35998f..20d6fbb21e 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -21,6 +21,7 @@
#include "ClusterMap.h"
#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
#include <functional>
@@ -86,6 +87,7 @@ void ClusterMap::ready(const MemberId& id, const Url& url) {
members[id] = url;
if (id == dumper)
dumper = MemberId();
+ QPID_LOG(info, id << " joined cluster: " << *this);
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index b225ba3568..a1ed5f34f5 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -31,18 +31,23 @@ namespace cluster {
using namespace framing;
+// Shadow connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
- : cluster(c), self(myId), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp()
-{}
+ : cluster(c), self(myId), catchUp(false), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{
+ QPID_LOG(debug, "New connection: " << *this);
+}
+// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp)
- : cluster(c), self(myId, this), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId),
- catchUp(isCatchUp), exCatchUp()
-{}
+ : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{
+ QPID_LOG(debug, "New connection: " << *this);
+}
Connection::~Connection() {}
@@ -59,6 +64,7 @@ void Connection::deliverDoOutput(uint32_t requested) {
}
void Connection::received(framing::AMQFrame& f) {
+ QPID_LOG(trace, "EXEC [" << *this << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
@@ -70,16 +76,10 @@ void Connection::closed() {
// connection around but replace the output handler with a
// no-op handler as the network output handler will be
// deleted.
-
- // FIXME aconway 2008-09-18: output handler reset in right place?
- // connection.setOutputHandler(&discardHandler);
output.setOutputHandler(discardHandler);
if (catchUp) {
- // This was a catch-up connection, may be promoted to a
- // shadow connection.
catchUp = false;
- exCatchUp = true;
- cluster.insert(boost::intrusive_ptr<Connection>(this));
+ cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
}
else {
// This was a local replicated connection. Multicast a deliver closed
@@ -125,8 +125,11 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/,
// FIXME aconway 2008-09-10: TODO
}
-void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) {
- // FIXME aconway 2008-09-10: TODO
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
+ ConnectionId shadow = ConnectionId(memberId, connectionId);
+ QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow);
+ self = shadow;
+ assert(isShadow());
}
void Connection::dumpComplete() {
@@ -134,6 +137,11 @@ void Connection::dumpComplete() {
}
bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
-
+
+std::ostream& operator<<(std::ostream& o, const Connection& c) {
+ return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")
+ << (c.isCatchUp() ? ",catchup" : "") << ")";
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index c664427ea1..150c53807e 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -34,6 +34,8 @@
#include "qpid/framing/FrameDecoder.h"
#include "qpid/framing/SequenceNumber.h"
+#include <iosfwd>
+
namespace qpid {
namespace framing { class AMQFrame; }
@@ -59,11 +61,10 @@ class Connection :
ConnectionId getId() const { return self; }
broker::Connection& getBrokerConnection() { return connection; }
bool isLocal() const;
+ bool isShadow() const { return !isLocal(); }
/** True if the connection is in "catch-up" mode: building initial state */
bool isCatchUp() const { return catchUp; }
- bool isExCatchUp() const { return exCatchUp; }
-
Cluster& getCluster() { return cluster; }
@@ -109,6 +110,7 @@ class Connection :
Cluster& cluster;
ConnectionId self;
+ bool catchUp;
NoOpConnectionOutputHandler discardHandler;
WriteEstimate writeEstimate;
OutputInterceptor output;
@@ -116,8 +118,8 @@ class Connection :
broker::Connection connection;
framing::SequenceNumber mcastSeq;
framing::SequenceNumber deliverSeq;
- bool catchUp;
- bool exCatchUp;
+
+ friend std::ostream& operator<<(std::ostream&, const Connection&);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index d95a321adf..accf83ebc7 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -25,6 +25,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
+#include <stdexcept>
namespace qpid {
namespace cluster {
@@ -57,8 +58,18 @@ ConnectionCodec::~ConnectionCodec() {}
// ConnectionCodec functions delegate to the codecOutput
size_t ConnectionCodec::decode(const char* buffer, size_t size) {
- if (interceptor->isCatchUp())
- return codec.decode(buffer, size);
+ if (interceptor->isShadow())
+ throw Exception(QPID_MSG("Unexpected decode for shadow connection " << *interceptor));
+ else if (interceptor->isCatchUp()) {
+ size_t ret = codec.decode(buffer, size);
+ if (interceptor->isShadow()) {
+ // Promoted to shadow, close the codec.
+ // FIXME aconway 2008-09-19: can we close cleanly?
+ // codec.close();
+ throw Exception("Close codec");
+ }
+ return ret;
+ }
else
return interceptor->decode(buffer, size);
}
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 43c30d3b07..c78859cc39 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -19,6 +19,8 @@
*
*/
#include "DumpClient.h"
+#include "Cluster.h"
+#include "Connection.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -26,8 +28,11 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionDumpCompleteBody.h"
+#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
@@ -39,6 +44,7 @@ namespace qpid {
namespace client {
struct ConnectionAccess {
static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; }
+ static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; }
};
} // namespace client
@@ -50,17 +56,30 @@ using broker::Queue;
using broker::QueueBinding;
using broker::Message;
using namespace framing;
-using namespace framing::message;
-using namespace client;
+namespace arg=client::arg;
+using client::SessionBase_0_10Access;
+
+// Create a connection with special version that marks it as a catch-up connection.
+client::Connection catchUpConnection() {
+ client::Connection c;
+ client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
+ return c;
+}
+// Send a control body directly to the session.
+void send(client::Session& s, const AMQBody& body) {
+ client::SessionBase_0_10Access sb(s);
+ sb.get()->send(body);
+}
-DumpClient::DumpClient(const Url& url, Broker& b,
+DumpClient::DumpClient(const Url& url, Cluster& c,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
- : donor(b), done(ok), failed(fail)
+ : receiver(url), donor(c),
+ connection(catchUpConnection()), shadowConnection(catchUpConnection()),
+ done(ok), failed(fail)
{
- // Special version identifies this as a catch-up connectionn.
- client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 0x80 + 10));
+ QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url);
connection.open(url);
session = connection.newSession();
}
@@ -72,15 +91,18 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
- donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
+ Broker& b = donor.getBroker();
+ b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
- donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
- SessionBase_0_10Access sb(session);
- // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
+ b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
session.sync();
session.close();
+ donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
+ QPID_LOG(debug, "Dump sent, closing catch_up connection.");
+ // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
connection.close();
+ QPID_LOG(debug, "Dump sent.");
}
void DumpClient::run() {
@@ -121,7 +143,8 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
SessionBase_0_10Access sb(session);
- framing::MessageTransferBody transfer(framing::ProtocolVersion(), CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED);
+ framing::MessageTransferBody transfer(
+ framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames());
}
@@ -129,5 +152,42 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi
session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
+void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
+ QPID_LOG(debug, "Dump connection " << *dumpConnection);
+
+ shadowConnection = catchUpConnection();
+ // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc.
+ shadowConnection.open(receiver);
+ dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
+ boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection);
+ // FIXME aconway 2008-09-19: use proxy for cluster commands?
+ AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(),
+ dumpConnection->getId().getMember(),
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())));
+ impl->handle(ready);
+ // Will be closed from the other end.
+ QPID_LOG(debug, "Dump done, connection " << *dumpConnection);
+}
+
+void DumpClient::dumpSession(broker::SessionHandler& sh) {
+ QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] "
+ << sh.getSession()->getId());
+
+ broker::SessionState* s = sh.getSession();
+ if (!s) return; // no session.
+ // Re-create the session.
+ boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
+ size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
+ // FIXME aconway 2008-09-19: verify matching ID.
+ boost::shared_ptr<client::SessionImpl> simpl(
+ new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size));
+ cimpl->addSession(simpl);
+ simpl->open(0);
+ client::Session cs;
+ client::SessionBase_0_10Access(cs).set(simpl);
+ cs.sync();
+ // FIXME aconway 2008-09-19: remaining session state.
+ QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index 83c9ac4076..6cd382667a 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -24,11 +24,6 @@
#include "qpid/client/Connection.h"
#include "qpid/client/AsyncSession.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
@@ -45,16 +40,21 @@ class Exchange;
class QueueBindings;
class QueueBinding;
class QueuedMessage;
+class SessionHandler;
+
} // namespace broker
namespace cluster {
+class Cluster;
+class Connection;
+
/**
* A client that dumps the contents of a local broker to a remote one using AMQP.
*/
class DumpClient : public sys::Runnable {
public:
- DumpClient(const Url& url, broker::Broker& donor,
+ DumpClient(const Url& receiver, Cluster& donor,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail);
@@ -67,11 +67,14 @@ class DumpClient : public sys::Runnable {
void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
void dumpMessage(const broker::QueuedMessage&);
void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
+ void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
+ void dumpSession(broker::SessionHandler& s);
private:
- client::Connection connection;
- client::AsyncSession session;
- broker::Broker& donor;
+ Url receiver;
+ Cluster& donor;
+ client::Connection connection, shadowConnection;
+ client::AsyncSession session, shadowSession;
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
};
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index a0f9bc0e49..b8c2bc3901 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -56,16 +56,12 @@ class Event {
char* getData() { return data; }
const char* getData() const { return data; }
- boost::intrusive_ptr<Connection> getConnection() const { return connection; }
- void setConnection(const boost::intrusive_ptr<Connection>& c) { connection=c; }
-
operator framing::Buffer() const;
private:
static const size_t OVERHEAD;
EventType type;
ConnectionId connectionId;
- boost::intrusive_ptr<Connection> connection;
size_t size;
RefCountedBuffer::pointer data;
};
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
index c188fe438e..8f08cb615f 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -40,14 +40,13 @@ void JoiningHandler::configChange(
if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
QPID_LOG(notice, cluster.self << " first in cluster.");
cluster.map.ready(cluster.self, cluster.url);
- cluster.ready();
+ cluster.unstall();
}
}
void JoiningHandler::deliver(Event& e) {
- // Discard connection events unless we are stalled and getting a dump.
+ // Discard connection events unless we are stalled to receive a dump.
if (state == STALLED) {
- e.setConnection(cluster.getConnection(e.getConnectionId()));
cluster.connectionEventQueue.push(e);
}
}
@@ -73,6 +72,7 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
}
else { // Start a new dump
cluster.map.dumper = cluster.map.first();
+ QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee);
if (dumpee == cluster.self) { // My turn
switch (state) {
case START:
@@ -101,24 +101,23 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) {
void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
if (c->isCatchUp()) {
++catchUpConnections;
- QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections.");
- }
- else if (c->isExCatchUp()) {
- if (c->getId().getConnectionPtr() != c.get()) // become shadow connection
- cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
- QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining");
- if (--catchUpConnections == 0)
- dumpComplete();
+ QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
}
- else // Local connection, will be stalled till dump complete.
+ cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
+
+void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << catchUpConnections-1);
+ if (c->isShadow())
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+ if (--catchUpConnections == 0)
+ dumpComplete();
}
void JoiningHandler::dumpComplete() {
// FIXME aconway 2008-09-18: need to detect incomplete dump.
//
if (state == STALLED) {
- QPID_LOG(debug, "Dump complete, unstalling.");
cluster.ready();
}
else {
diff --git a/cpp/src/qpid/cluster/JoiningHandler.h b/cpp/src/qpid/cluster/JoiningHandler.h
index c2cdb2c504..097d765e5e 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.h
+++ b/cpp/src/qpid/cluster/JoiningHandler.h
@@ -47,6 +47,7 @@ class JoiningHandler : public ClusterHandler
void ready(const MemberId&, const std::string& url);
void insert(const boost::intrusive_ptr<Connection>& c);
+ void catchUpClosed(const boost::intrusive_ptr<Connection>& c);
private:
void checkDumpRequest();
diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp
index 1997ced9b0..ec9e7790c5 100644
--- a/cpp/src/qpid/cluster/MemberHandler.cpp
+++ b/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -47,7 +47,6 @@ void MemberHandler::configChange(
}
void MemberHandler::deliver(Event& e) {
- e.setConnection(cluster.getConnection(e.getConnectionId()));
cluster.connectionEventQueue.push(e);
}
@@ -64,7 +63,7 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt
cluster.stall();
if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
- dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+ dumpThread = Thread(new DumpClient(Url(urlStr), cluster,
boost::bind(&MemberHandler::dumpSent, this),
boost::bind(&MemberHandler::dumpError, this, _1)));
}
@@ -92,4 +91,9 @@ void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
cluster.connections[c->getId()] = c;
}
+void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
+ assert(0);
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MemberHandler.h b/cpp/src/qpid/cluster/MemberHandler.h
index 6657ea4f53..9a07507a17 100644
--- a/cpp/src/qpid/cluster/MemberHandler.h
+++ b/cpp/src/qpid/cluster/MemberHandler.h
@@ -53,6 +53,7 @@ class MemberHandler : public ClusterHandler
void dumpError(const std::exception&);
void insert(const boost::intrusive_ptr<Connection>& c);
+ void catchUpClosed(const boost::intrusive_ptr<Connection>& );
public:
sys::Thread dumpThread;
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 8718154d3e..4424864787 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -39,13 +39,18 @@ OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutp
void OutputInterceptor::send(framing::AMQFrame& f) {
Locker l(lock);
next->send(f);
- sent += f.size();
+ if (!parent.isCatchUp())
+ sent += f.size();
}
void OutputInterceptor::activateOutput() {
- Locker l(lock);
- moreOutput = true;
- sendDoOutput();
+ Locker l(lock);
+ if (parent.isCatchUp())
+ next->activateOutput();
+ else {
+ moreOutput = true;
+ sendDoOutput();
+ }
}
// Called in write thread when the IO layer has no more data to write.
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index 4fbe79e118..f16cc18f41 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -59,6 +59,8 @@ std::ostream& operator<<(std::ostream&, const MemberId&);
struct ConnectionId : public std::pair<MemberId, Connection*> {
ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {}
+ ConnectionId(uint64_t m, uint64_t c)
+ : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {}
MemberId getMember() const { return first; }
Connection* getConnectionPtr() const { return second; }
};
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 1b44902054..60f85df02d 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -81,21 +81,26 @@ struct ClusterFixture : public vector<uint16_t> {
void add();
void add0(bool force);
void setup();
+
void kill(size_t n) {
if (n) forkedBrokers[n-1].kill();
else broker0->broker->shutdown();
}
+
+ void waitFor(size_t n) {
+ size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
+ while (retry && getGlobalCluster().size() != n) {
+ ::usleep(1000);
+ --retry;
+ }
+ }
};
ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
add(n);
if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case.
// Wait for all n members to join the cluster
- int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
- while (retry && getGlobalCluster().size() != n) {
- ::sleep(1);
- --retry;
- }
+ waitFor(n);
BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
}
@@ -139,7 +144,7 @@ void ClusterFixture::add0(bool init) {
qpid::log::Logger::instance().setPrefix("main");
broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
- push_back(broker0->getPort());
+ if (size()) front() = broker0->getPort(); else push_back(broker0->getPort());
}
// For debugging: op << for CPG types.
@@ -190,14 +195,12 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) {
BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), (unsigned)0);
}
-
#endif
-
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
ClusterFixture cluster(1);
-
Client c0(cluster[0], "c0");
+
// Create some shared state.
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("foo","q"));
@@ -205,24 +208,33 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
while (c0.session.queueQuery("q").getMessageCount() != 2)
::usleep(1000); // Wait for message to show up on broker 0.
- // FIXME aconway 2008-09-18: close session until we catchup session state also.
- c0.session.close();
- c0.connection.close();
-
- // Now join new broker, should catch up.
+ // Add a new broker, it should catch up.
cluster.add();
- // FIXME aconway 2008-09-18: when we do session state try adding
- // further stuff from broker 0, and leaving a subscription active.
+ // Do some work post-add
+ c0.session.queueDeclare("p");
+ c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+ // Do some work post-join
+ cluster.waitFor(2);
+ c0.session.messageTransfer(arg::content=Message("pbar","p"));
+
// Verify new broker has all state.
Message m;
+
Client c1(cluster[1], "c1");
+
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "foo");
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "bar");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "pbar");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u);
}
QPID_AUTO_TEST_CASE(testWiringReplication) {