summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
committerAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
commita2a56cf9a7483e165fb579d0b519b284d02009e3 (patch)
tree11264fc87ea6e54c54b476e245ad4ee9c83faaeb
parent30be110b6914959a1eaee4803ff8c1c9938db7bb (diff)
downloadqpid-python-a2a56cf9a7483e165fb579d0b519b284d02009e3.tar.gz
Cluster replicates session command sequence state and consumers to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/SessionState.cpp27
-rw-r--r--cpp/src/qpid/SessionState.h17
-rw-r--r--cpp/src/qpid/broker/SemanticState.h6
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp5
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp24
-rw-r--r--cpp/src/qpid/cluster/Cluster.h5
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp103
-rw-r--r--cpp/src/qpid/cluster/Connection.h35
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp38
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h1
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp79
-rw-r--r--cpp/src/qpid/cluster/Event.cpp19
-rw-r--r--cpp/src/qpid/cluster/Event.h4
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp15
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.cpp6
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp20
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.h4
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h4
-rw-r--r--cpp/src/tests/cluster_test.cpp49
-rw-r--r--cpp/xml/cluster.xml5
22 files changed, 316 insertions, 154 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 52c0ca229c..85f86c85c9 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -237,7 +237,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) :
replayFlushLimit(flush), replayHardLimit(hard) {}
SessionState::SessionState(const SessionId& i, const Configuration& c)
- : id(i), timeout(), config(c), stateful()
+ : id(i), timeout(), config(c), stateful()
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
@@ -250,4 +250,29 @@ std::ostream& operator<<(std::ostream& o, const SessionPoint& p) {
return o << "(" << p.command.getValue() << "+" << p.offset << ")";
}
+void SessionState::setState(
+ const SequenceNumber& replayStart,
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete
+)
+{
+ sender.replayPoint = replayStart;
+ sender.flushPoint = sendCommandPoint;
+ sender.sendPoint = sendCommandPoint;
+ sender.unflushedSize = 0;
+ sender.replaySize = 0; // Replay list will be updated separately.
+ sender.incomplete = sentIncomplete;
+ sender.bytesSinceKnownCompleted = 0;
+
+ receiver.expected = expected;
+ receiver.received = received;
+ receiver.unknownCompleted = unknownCompleted;
+ receiver.incomplete = receivedIncomplete;
+ receiver.bytesSinceKnownCompleted = 0;
+}
+
} // namespace qpid
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index 10937b7a1e..bf4ff6d326 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -130,8 +130,8 @@ class SessionState {
virtual SessionPoint senderGetReplayPoint() const;
/** Peer expecting commands from this point.
- virtual *@return Range of frames to be replayed.
- */
+ *@return Range of frames to be replayed.
+ */
virtual ReplayRange senderExpected(const SessionPoint& expected);
// ==== Functions for receiver state
@@ -168,6 +168,19 @@ class SessionState {
/** ID of the command currently being handled. */
virtual SequenceNumber receiverGetCurrent() const;
+ /** Set the state variables, used to create a session that will resume
+ * from some previously established point.
+ */
+ virtual void setState(
+ const SequenceNumber& replayStart,
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete
+ );
+
private:
struct SendState {
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 0c56885f8f..2170fe4e2e 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -110,6 +110,12 @@ class SemanticState : public sys::OutputTask,
bool doOutput();
std::string getName() const { return name; }
+
+ bool isAckExpected() const { return ackExpected; }
+ bool isAcquire() const { return acquire; }
+ bool isWindowing() const { return windowing; }
+ uint32_t getMsgCredit() const { return msgCredit; }
+ uint32_t getByteCredit() const { return byteCredit; }
};
private:
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 003e206bff..64b0b80446 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -436,7 +436,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
uint8_t acceptMode,
uint8_t acquireMode,
bool exclusive,
- const string& /*resumeId*/,//TODO implement resume behaviour
+ const string& /*resumeId*/,//TODO implement resume behaviour. Need to update cluster.
uint64_t /*resumeTtl*/,
const FieldTable& arguments)
{
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 22ebec76bf..0321c2e6aa 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -50,6 +50,9 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, framing::Proto
ESTABLISHED.insert(FAILED);
ESTABLISHED.insert(CLOSED);
ESTABLISHED.insert(OPEN);
+
+ FINISHED.insert(FAILED);
+ FINISHED.insert(CLOSED);
}
void ConnectionHandler::incoming(AMQFrame& frame)
@@ -107,7 +110,7 @@ void ConnectionHandler::close()
case OPEN:
setState(CLOSING);
proxy.close(200, OK);
- waitFor(CLOSED);
+ waitFor(FINISHED);
break;
// Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED
}
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index f8bd5e5d49..ffb612fae8 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -44,7 +44,7 @@ class ConnectionHandler : private StateManager,
{
typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
- std::set<int> ESTABLISHED;
+ std::set<int> ESTABLISHED, FINISHED;
class Adapter : public framing::FrameHandler
{
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 79b76f68be..93625af948 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -68,7 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
handler(&joiningHandler),
joiningHandler(*this),
- memberHandler(*this)
+ memberHandler(*this),
+ mcastId()
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -109,22 +110,22 @@ void Cluster::leave() {
}
void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
- QPID_LOG(trace, "MCAST [" << self << "]: " << body);
AMQFrame f(body);
- Event e(CONTROL, ConnectionId(self, cptr), f.size());
+ Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId);
Buffer buf(e);
f.encode(buf);
+ QPID_LOG(trace, "MCAST " << e << " " << body);
mcastEvent(e);
}
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
- Event e(DATA, connection, size);
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, size_t id) {
+ Event e(DATA, connection, size, id);
memcpy(e.getData(), data, size);
+ QPID_LOG(trace, "MCAST " << e);
mcastEvent(e);
}
void Cluster::mcastEvent(const Event& e) {
- QPID_LOG(trace, "MCAST " << e);
e.mcast(name, cpg);
}
@@ -166,12 +167,13 @@ void Cluster::deliver(
try {
MemberId from(nodeid, pid);
Event e = Event::delivered(from, msg, msg_len);
+
// Process cluster controls immediately
if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
Buffer buf(e);
AMQFrame frame;
while (frame.decode(buf)) {
- QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
+ QPID_LOG(trace, "DLVR " << e << " " << frame);
if (!handler->invoke(e.getConnectionId().getMember(), frame))
throw Exception(QPID_MSG("Invalid cluster control"));
}
@@ -189,17 +191,17 @@ void Cluster::deliver(
void Cluster::connectionEvent(const Event& e) {
Buffer buf(e);
- QPID_LOG(trace, "EXEC: " << e);
boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
assert(connection);
if (e.getType() == DATA) {
+ QPID_LOG(trace, "EXEC: " << e);
connection->deliverBuffer(buf);
}
else { // control
AMQFrame frame;
while (frame.decode(buf)) {
- QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame);
- connection->received(frame);
+ QPID_LOG(trace, "EXEC " << e << " " << frame);
+ connection->delivered(frame);
}
}
}
@@ -351,7 +353,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*
void Cluster::stopClusterNode(void)
{
- // FIXME aconway 2008-09-18:
+ // FIXME aconway 2008-09-18: mgmt
QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index b37a1e343b..55358e25db 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -77,7 +77,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
/** Send to the cluster */
void mcastControl(const framing::AMQBody& controlBody, Connection* cptr);
- void mcastBuffer(const char*, size_t, const ConnectionId&);
+ void mcastBuffer(const char*, size_t, const ConnectionId&, size_t id);
void mcastEvent(const Event& e);
/** Leave the cluster */
@@ -89,6 +89,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
void ready(const MemberId&, const std::string& url);
MemberId getSelf() const { return self; }
+ MemberId getId() const { return self; }
void ready();
void stall();
@@ -169,6 +170,8 @@ class Cluster : private Cpg::Handler, public management::Manageable
JoiningHandler joiningHandler;
MemberHandler memberHandler;
+ size_t mcastId;
+
friend class JoiningHandler;
friend class MemberHandler;
};
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index a1ed5f34f5..4aa66cce1f 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -20,10 +20,15 @@
*/
#include "Connection.h"
#include "Cluster.h"
+
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ConnectionCloseBody.h"
+#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
-#include "qpid/framing/AllInvoker.h"
+
#include <boost/current_function.hpp>
namespace qpid {
@@ -31,6 +36,8 @@ namespace cluster {
using namespace framing;
+NoOpConnectionOutputHandler Connection::discardHandler;
+
// Shadow connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
@@ -49,7 +56,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
QPID_LOG(debug, "New connection: " << *this);
}
-Connection::~Connection() {}
+Connection::~Connection() {
+ QPID_LOG(debug, "Deleted connection: " << *this);
+}
bool Connection::doOutput() {
return output.doOutput();
@@ -63,28 +72,55 @@ void Connection::deliverDoOutput(uint32_t requested) {
output.deliverDoOutput(requested);
}
+// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
- QPID_LOG(trace, "EXEC [" << *this << "]: " << f);
+ QPID_LOG(trace, "RECV " << *this << ": " << f);
+ if (isShadow()) {
+ // Final close that completes catch-up for shadow connection.
+ if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ AMQFrame ok(in_place<ConnectionCloseOkBody>());
+ connection.getOutput().send(ok);
+ }
+ else
+ QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
+ }
+ else {
+ currentChannel = f.getChannel();
+ if (!framing::invoke(*this, *f.getBody()).wasHandled())
+ connection.received(f);
+ }
+}
+
+// Delivered from cluster.
+void Connection::delivered(framing::AMQFrame& f) {
+ QPID_LOG(trace, "DLVR " << *this << ": " << f);
+ assert(!isCatchUp());
// Handle connection controls, deliver other frames to connection.
+ currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
}
void Connection::closed() {
try {
+ QPID_LOG(debug, "Connection closed " << *this);
+
+ if (catchUp) {
+ catchUp = false;
+ cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
+ if (!isShadow()) connection.closed();
+ }
+
// Local network connection has closed. We need to keep the
// connection around but replace the output handler with a
// no-op handler as the network output handler will be
// deleted.
output.setOutputHandler(discardHandler);
- if (catchUp) {
- catchUp = false;
- cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
- }
- else {
- // This was a local replicated connection. Multicast a deliver closed
- // and process any outstanding frames from the cluster until
- // self-delivery of deliver-closed.
+
+ if (isLocal()) {
+ // This was a local replicated connection. Multicast a deliver
+ // closed and process any outstanding frames from the cluster
+ // until self-delivery of deliver-close.
cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
++mcastSeq;
}
@@ -100,29 +136,48 @@ void Connection::deliverClose () {
cluster.erase(self);
}
-size_t Connection::decode(const char* buffer, size_t size) {
- assert(!catchUp);
- ++mcastSeq;
- cluster.mcastBuffer(buffer, size, self);
+// Decode data from local clients.
+size_t Connection::decode(const char* buffer, size_t size) {
+ if (catchUp) { // Handle catch-up locally.
+ Buffer buf(const_cast<char*>(buffer), size);
+ while (localDecoder.decode(buf))
+ received(localDecoder.frame);
+ }
+ else { // Multicast local connections.
+ assert(isLocal());
+ cluster.mcastBuffer(buffer, size, self, ++mcastSeq);
+ }
return size;
}
void Connection::deliverBuffer(Buffer& buf) {
assert(!catchUp);
++deliverSeq;
- while (decoder.decode(buf))
- received(decoder.frame);
+ while (mcastDecoder.decode(buf))
+ delivered(mcastDecoder.frame);
}
-void Connection::sessionState(const SequenceNumber& /*replayStart*/,
- const SequenceSet& /*sentIncomplete*/,
- const SequenceNumber& /*expected*/,
- const SequenceNumber& /*received*/,
- const SequenceSet& /*unknownCompleted*/,
- const SequenceSet& /*receivedIncomplete*/)
+void Connection::sessionState(
+ const SequenceNumber& replayStart,
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete)
{
- // FIXME aconway 2008-09-10: TODO
+ broker::SessionHandler& h = connection.getChannel(currentChannel);
+ broker::SessionState* s = h.getSession();
+ s->setState(
+ replayStart,
+ sendCommandPoint,
+ sentIncomplete,
+ expected,
+ received,
+ unknownCompleted,
+ receivedIncomplete);
+ QPID_LOG(debug, "Received session state dump for " << s->getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 150c53807e..df3c035c8a 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -47,7 +47,6 @@ class Cluster;
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
- public sys::ConnectionOutputHandler,
public framing::AMQP_AllOperations::ClusterConnectionHandler
{
@@ -60,20 +59,18 @@ class Connection :
ConnectionId getId() const { return self; }
broker::Connection& getBrokerConnection() { return connection; }
+
+ /** True for connections from direct clients of local broker */
bool isLocal() const;
+
+ /** True for connections that are shadowing remote broker connections */
bool isShadow() const { return !isLocal(); }
- /** True if the connection is in "catch-up" mode: building initial state */
+ /** True if the connection is in "catch-up" mode: building initial broker state. */
bool isCatchUp() const { return catchUp; }
Cluster& getCluster() { return cluster; }
- // ConnectionOutputHandler methods
- void close() {}
- void send(framing::AMQFrame&) {}
- void activateOutput() {}
- virtual size_t getBuffered() const { assert(0); return 0; }
-
// ConnectionInputHandler methods
void received(framing::AMQFrame&);
void closed();
@@ -85,18 +82,19 @@ class Connection :
// ConnectionCodec methods
size_t decode(const char* buffer, size_t size);
- // Called by cluster to deliver a buffer from CPG.
+ // Called for data delivered from the cluster.
void deliverBuffer(framing::Buffer&);
-
+ void delivered(framing::AMQFrame&);
// ==== Used in catch-up mode to build initial state.
//
// State dump methods.
void sessionState(const SequenceNumber& replayStart,
- const SequenceSet& sentIncomplete,
- const SequenceNumber& expected,
- const SequenceNumber& received,
- const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
void shadowReady(uint64_t memberId, uint64_t connectionId);
@@ -108,17 +106,20 @@ class Connection :
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
+ static NoOpConnectionOutputHandler discardHandler;
+
Cluster& cluster;
ConnectionId self;
bool catchUp;
- NoOpConnectionOutputHandler discardHandler;
WriteEstimate writeEstimate;
OutputInterceptor output;
- framing::FrameDecoder decoder;
+ framing::FrameDecoder localDecoder;
+ framing::FrameDecoder mcastDecoder;
broker::Connection connection;
framing::SequenceNumber mcastSeq;
framing::SequenceNumber deliverSeq;
-
+ framing::ChannelId currentChannel;
+
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index accf83ebc7..1458a87923 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -23,18 +23,23 @@
#include "Cluster.h"
#include "ProxyInputHandler.h"
#include "qpid/broker/Connection.h"
+#include "qpid/framing/ConnectionCloseBody.h"
+#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include <stdexcept>
+#include <boost/utility/in_place_factory.hpp>
namespace qpid {
namespace cluster {
+using namespace framing;
+
sys::ConnectionCodec*
-ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == framing::ProtocolVersion(0, 10))
+ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ if (v == ProtocolVersion(0, 10))
return new ConnectionCodec(out, id, cluster, false);
- else if (v == framing::ProtocolVersion(0x80 + 0, 0x80 + 10))
+ else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
return new ConnectionCodec(out, id, cluster, true); // Catch-up connection
return 0;
}
@@ -47,7 +52,8 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id)
ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp)
: codec(out, id, false),
- interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp))
+ interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp)),
+ id(interceptor->getId())
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
@@ -56,28 +62,18 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id,
ConnectionCodec::~ConnectionCodec() {}
-// ConnectionCodec functions delegate to the codecOutput
size_t ConnectionCodec::decode(const char* buffer, size_t size) {
- if (interceptor->isShadow())
- throw Exception(QPID_MSG("Unexpected decode for shadow connection " << *interceptor));
- else if (interceptor->isCatchUp()) {
- size_t ret = codec.decode(buffer, size);
- if (interceptor->isShadow()) {
- // Promoted to shadow, close the codec.
- // FIXME aconway 2008-09-19: can we close cleanly?
- // codec.close();
- throw Exception("Close codec");
- }
- return ret;
- }
- else
- return interceptor->decode(buffer, size);
+ return interceptor->decode(buffer, size);
}
+bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
+
size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
+
bool ConnectionCodec::canEncode() { return codec.canEncode(); }
+
void ConnectionCodec::closed() { codec.closed(); }
-bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
-framing::ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); }
+
+ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); }
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index a82569decd..e6ab7f5ba1 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -71,6 +71,7 @@ class ConnectionCodec : public sys::ConnectionCodec {
private:
amqp_0_10::Connection codec;
boost::intrusive_ptr<cluster::Connection> interceptor;
+ cluster::ConnectionId id;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 45ccec7166..ee87afb468 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -72,6 +72,8 @@ void send(client::Session& s, const AMQBody& body) {
sb.get()->send(body);
}
+// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
+
DumpClient::DumpClient(const Url& url, Cluster& c,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
@@ -79,9 +81,8 @@ DumpClient::DumpClient(const Url& url, Cluster& c,
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
- QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url);
connection.open(url);
- session = connection.newSession();
+ session = connection.newSession("dump_shared");
}
DumpClient::~DumpClient() {}
@@ -91,6 +92,7 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
+ QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver);
Broker& b = donor.getBroker();
b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
@@ -99,10 +101,9 @@ void DumpClient::dump() {
session.sync();
session.close();
donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
- QPID_LOG(debug, "Dump sent, closing catch_up connection.");
// FIXME aconway 2008-09-18: inidicate successful end-of-dump.
connection.close();
- QPID_LOG(debug, "Dump sent.");
+ QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver);
}
void DumpClient::run() {
@@ -153,49 +154,79 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi
}
void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
- QPID_LOG(debug, "Dump connection " << *dumpConnection);
-
shadowConnection = catchUpConnection();
- // FIXME aconway 2008-09-19: Open with settings from dumpConnection - userid etc.
- shadowConnection.open(receiver);
+ broker::Connection& bc = dumpConnection->getBrokerConnection();
+ // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size,
+ // authentication etc. See ConnectionSettings.
+ shadowConnection.open(receiver, bc.getUserId());
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection);
- // FIXME aconway 2008-09-19: use proxy for cluster commands?
- AMQFrame ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(),
- dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())));
- impl->handle(ready);
- // Will be closed from the other end.
- QPID_LOG(debug, "Dump done, connection " << *dumpConnection);
+ AMQP_AllProxy::ClusterConnection proxy(*impl);
+ proxy.shadowReady(dumpConnection->getId().getMember(),
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+ shadowConnection.close();
+ QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
}
void DumpClient::dumpSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, "Dump session " << &sh.getConnection() << "[" << sh.getChannel() << "] "
+ QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
-
broker::SessionState* s = sh.getSession();
if (!s) return; // no session.
+
// Re-create the session.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
- // FIXME aconway 2008-09-19: verify matching ID.
boost::shared_ptr<client::SessionImpl> simpl(
new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size));
cimpl->addSession(simpl);
- simpl->open(0);
- client::Session cs;
- client::SessionBase_0_10Access(cs).set(simpl);
- cs.sync();
+ simpl->open(sh.getSession()->getTimeout());
+ client::SessionBase_0_10Access(shadowSession).set(simpl);
+ AMQP_AllProxy::ClusterConnection proxy(simpl->out);
+ // Re-create session state on remote connection.
broker::SessionState* ss = sh.getSession();
+
ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1));
// FIXME aconway 2008-09-19: remaining session state.
- QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
+
+ // Reset command-sequence state.
+ proxy.sessionState(
+ ss->senderGetReplayPoint().command,
+ ss->senderGetCommandPoint().command,
+ ss->senderGetIncomplete(),
+ ss->receiverGetExpected().command,
+ ss->receiverGetReceived().command,
+ ss->receiverGetUnknownComplete(),
+ ss->receiverGetIncomplete()
+ );
+
+ // FIXME aconway 2008-09-23: session replay list.
+
+ QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId());
}
void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName());
+ using namespace message;
+ shadowSession.messageSubscribe(
+ arg::queue = ci->getQueue()->getName(),
+ arg::destination = ci->getName(),
+ arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
+ arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
+ arg::exclusive = false , // FIXME aconway 2008-09-23: how to read.
+
+ // TODO aconway 2008-09-23: remaining args not used by current broker.
+ // Update this code when they are.
+ arg::resumeId=std::string(),
+ arg::resumeTtl=0,
+ arg::arguments=FieldTable()
+ );
+ shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled?
+ QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 89c6268d7f..43335e3607 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -31,17 +31,18 @@ namespace cluster {
using framing::Buffer;
-const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t);
+const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(size_t);
-Event::Event(EventType t, const ConnectionId c, const size_t s)
- : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)) {}
+Event::Event(EventType t, const ConnectionId& c, size_t s, size_t i)
+ : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)), id(i) {}
Event Event::delivered(const MemberId& m, void* d, size_t s) {
Buffer buf(static_cast<char*>(d), s);
EventType type((EventType)buf.getOctet());
ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+ size_t id = buf.getLong();
assert(buf.getPosition() == OVERHEAD);
- Event e(type, connection, s-OVERHEAD);
+ Event e(type, connection, s-OVERHEAD, id);
memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD);
return e;
}
@@ -51,6 +52,7 @@ void Event::mcast (const Cpg::Name& name, Cpg& cpg) const {
Buffer b(header, OVERHEAD);
b.putOctet(type);
b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr()));
+ b.putLong(id);
iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
}
@@ -60,13 +62,12 @@ Event::operator Buffer() const {
}
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+
std::ostream& operator << (std::ostream& o, const Event& e) {
- o << "[event: " << e.getConnectionId()
+ o << "[event " << e.getConnectionId() << "/" << e.getId()
<< " " << EVENT_TYPE_NAMES[e.getType()]
- << " " << e.getSize() << " bytes: ";
- std::ostream_iterator<char> oi(o,"");
- std::copy(e.getData(), e.getData()+std::min(e.getSize(), size_t(16)), oi);
- return o << "...]";
+ << " " << e.getSize() << " bytes]";
+ return o;
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index b8c2bc3901..12a7a9388a 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -43,7 +43,7 @@ namespace cluster {
class Event {
public:
/** Create an event to mcast with a buffer of size bytes. */
- Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0);
+ Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0, size_t id=0);
/** Create an event copied from delivered data. */
static Event delivered(const MemberId& m, void* data, size_t size);
@@ -55,6 +55,7 @@ class Event {
size_t getSize() const { return size; }
char* getData() { return data; }
const char* getData() const { return data; }
+ size_t getId() const { return id; }
operator framing::Buffer() const;
@@ -64,6 +65,7 @@ class Event {
ConnectionId connectionId;
size_t size;
RefCountedBuffer::pointer data;
+ size_t id;
};
std::ostream& operator << (std::ostream&, const Event&);
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
index 8f08cb615f..75f6651b0a 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -46,9 +46,10 @@ void JoiningHandler::configChange(
void JoiningHandler::deliver(Event& e) {
// Discard connection events unless we are stalled to receive a dump.
- if (state == STALLED) {
+ if (state == STALLED)
cluster.connectionEventQueue.push(e);
- }
+ else
+ QPID_LOG(trace, "Discarded pre-join event " << e);
}
void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
@@ -80,12 +81,13 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
assert(0); break;
case DUMP_REQUESTED:
- QPID_LOG(info, cluster.self << " stalling for dump from " << cluster.map.dumper);
+ QPID_LOG(debug, cluster.self << " stalling for dump from " << cluster.map.dumper);
state = STALLED;
cluster.stall();
break;
case DUMP_COMPLETE:
+ QPID_LOG(debug, cluster.self << " at start point and dump complete, ready.");
cluster.ready();
break;
}
@@ -107,8 +109,8 @@ void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
}
void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << catchUpConnections-1);
- if (c->isShadow())
+ QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
+ if (c->isShadow())
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
if (--catchUpConnections == 0)
dumpComplete();
@@ -118,10 +120,11 @@ void JoiningHandler::dumpComplete() {
// FIXME aconway 2008-09-18: need to detect incomplete dump.
//
if (state == STALLED) {
+ QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
cluster.ready();
}
else {
- QPID_LOG(debug, "Dump complete, waiting for stall point.");
+ QPID_LOG(debug, cluster.self << " received dump, waiting for start point.");
assert(state == DUMP_REQUESTED);
state = DUMP_COMPLETE;
}
diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp
index ec9e7790c5..0f600a4995 100644
--- a/cpp/src/qpid/cluster/MemberHandler.cpp
+++ b/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -34,7 +34,8 @@ using namespace framing;
MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
MemberHandler::~MemberHandler() {
- if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+ if (dumpThread.id())
+ dumpThread.join(); // Join the last dumpthread.
}
void MemberHandler::configChange(
@@ -62,7 +63,8 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt
assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled.
cluster.stall();
- if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+ if (dumpThread.id())
+ dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(new DumpClient(Url(urlStr), cluster,
boost::bind(&MemberHandler::dumpSent, this),
boost::bind(&MemberHandler::dumpError, this, _1)));
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4424864787..e69992517c 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -45,10 +45,12 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
void OutputInterceptor::activateOutput() {
Locker l(lock);
+
if (parent.isCatchUp())
next->activateOutput();
else {
moreOutput = true;
+ QPID_LOG(trace, &parent << " activateOutput - sending doOutput");
sendDoOutput();
}
}
@@ -79,15 +81,19 @@ void OutputInterceptor::deliverDoOutput(size_t requested) {
QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
- if (parent.isLocal() && moreOutput)
+ if (parent.isLocal() && moreOutput) {
+ QPID_LOG(trace, &parent << " deliverDoOutput - sending doOutput, more output available.");
sendDoOutput();
+ }
else
doingOutput = false;
}
void OutputInterceptor::startDoOutput() {
- if (!doingOutput)
+ if (!doingOutput) {
+ QPID_LOG(trace, &parent << " startDoOutput - sending doOutput, more output available.");
sendDoOutput();
+ }
}
// Send a doOutput request if one is not already in flight.
@@ -111,4 +117,14 @@ void OutputInterceptor::setOutputHandler(sys::ConnectionOutputHandler& h) {
next = &h;
}
+void OutputInterceptor::close() {
+ Locker l(lock);
+ next->close();
+}
+
+size_t OutputInterceptor::getBuffered() const {
+ Locker l(lock);
+ return next->getBuffered();
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
index ad9d9952bf..17ab5bbc55 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -43,8 +43,8 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
// sys::ConnectionOutputHandler functions
void send(framing::AMQFrame& f);
void activateOutput();
- void close() { Locker l(lock); next->close(); }
- size_t getBuffered() const { Locker l(lock); return next->getBuffered(); }
+ void close();
+ size_t getBuffered() const;
// Delivery point for doOutput requests.
void deliverDoOutput(size_t requested);
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
index c2da8edc2c..48a5a5ce85 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -36,8 +36,8 @@ namespace sys {
class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
{
public:
- ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {}
- void set(ConnectionOutputHandler* p) { next = p; }
+ ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) { assert(next); }
+ void set(ConnectionOutputHandler* p) { next = p; assert(next); }
ConnectionOutputHandler* get() { return next; }
const ConnectionOutputHandler* get() const { return next; }
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 6bb5e4a8ca..9573caf61d 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -98,7 +98,7 @@ struct ClusterFixture : public vector<uint16_t> {
ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
add(n);
- if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case.
+ if (!init0) return; // Defer initialization of broker0
// Wait for all n members to join the cluster
waitFor(n);
BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
@@ -164,36 +164,35 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-#if 0 // FIXME aconway 2008-09-22: enable.
QPID_AUTO_TEST_CASE(DumpConsumers) {
- ClusterFixture cluster(1);
- Client c0(cluster[0]);
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
- c0.subs.subscribe(c0.lq, "q");
- c0.session.messageTransfer(arg::content=Message("before", "q"));
- Message m;
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "before");
+ c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+ c0.session.sync();
- // Start new member
+ // Start new members
cluster.add();
- Client c1(cluster[1]);
+ Client c1(cluster[1], "c1");
+ cluster.add();
+ Client c2(cluster[2], "c2");
- // Transfer some messages to the subscription by client c0.
+ // Transfer a message, verify all members see it.
c0.session.messageTransfer(arg::content=Message("aaa", "q"));
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "aaa");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
- c1.session.messageTransfer(arg::content=Message("bbb", "q"));
+ // Activate the subscription, ensure message removed on all queues.
+ c0.subs.setFlowControl("q", FlowControl::messageCredit(1));
+ Message m;
BOOST_CHECK(c0.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "bbb");
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
- // Verify that the queue has been drained on both brokers.
- // This proves that the consumer was replicated when the second broker joined.
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
}
-#endif
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
ClusterFixture cluster(1);
@@ -217,7 +216,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
cluster.waitFor(2);
c0.session.messageTransfer(arg::content=Message("pbar","p"));
- // Verify new brokers have all state.
+ // Verify new brokers have state.
Message m;
Client c1(cluster[1], "c1");
@@ -228,11 +227,14 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
BOOST_CHECK_EQUAL(m.getData(), "bar");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
- BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ // Add & verify another broker.
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "pfoo");
- BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "pbar");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -333,7 +335,6 @@ QPID_AUTO_TEST_CASE(testStall) {
c1.session.messageTransfer(arg::content=Message("foo","q"));
while (c1.session.queueQuery("q").getMessageCount() != 1)
::usleep(1000); // Wait for message to show up on broker 1.
- sleep(2); // FIXME aconway 2008-09-11: remove.
// But it should not be on broker 0.
boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
BOOST_REQUIRE(q0);
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 2acf0aea82..040a3eeae0 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -67,10 +67,11 @@ o<?xml version="1.0"?>
<!-- Target session deduced from channel number. -->
<field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.-->
+ <field name="command-point" type="sequence-no"/> <!-- Id of next command sent -->
<field name="sent-incomplete" type="sequence-set"/> <!-- Commands sent and incomplete. -->
- <field name="expected" type="sequence-no"/> <!-- Idempotence barrier -->
- <field name="received" type="sequence-no"/> <!-- Received up to here > expected-->
+ <field name="expected" type="sequence-no"/> <!-- Next command expected. -->
+ <field name="received" type="sequence-no"/> <!-- Received up to here (>= expected) -->
<field name="unknown-completed" type="sequence-set"/> <!-- Completed but not known to peer. -->
<field name="received-incomplete" type="sequence-set"/> <!-- Received and incomplete -->
</control>