summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-03-05 13:28:14 +0000
committerAlan Conway <aconway@apache.org>2009-03-05 13:28:14 +0000
commitaff798ccbc13b41696c661fe07bd3934deb18625 (patch)
treeba59dbefe36754a60386f8632cb07f05e89a61ea
parent23053617b74f1bbb6c8ae3c60fe24953701a4583 (diff)
downloadqpid-python-aff798ccbc13b41696c661fe07bd3934deb18625.tar.gz
cluster: fix delivery-property.exchange-name set on updated messages.
Logging improvements, useful for debugging: - qpid/SessionState.cpp: show frame bodies with command IDs. - assign cluster-wide id number to each Event. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@750456 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/cluster.mk1
-rw-r--r--cpp/src/qpid/SessionState.cpp7
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp4
-rw-r--r--cpp/src/qpid/broker/Exchange.h3
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp16
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp61
-rw-r--r--cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp5
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/Event.cpp5
-rw-r--r--cpp/src/qpid/cluster/Event.h6
-rw-r--r--cpp/src/qpid/cluster/EventFrame.cpp6
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h6
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp11
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp12
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h3
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.h45
-rw-r--r--cpp/src/tests/cluster_test.cpp4
-rw-r--r--cpp/src/tests/failover_soak.cpp28
-rw-r--r--cpp/src/tests/replaying_sender.cpp2
-rw-r--r--cpp/xml/cluster.xml1
22 files changed, 144 insertions, 89 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 5acabce694..925b1d1e68 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -64,6 +64,7 @@ cluster_la_SOURCES = \
qpid/cluster/ExpiryPolicy.cpp \
qpid/cluster/FailoverExchange.cpp \
qpid/cluster/FailoverExchange.h \
+ qpid/cluster/UpdateExchange.h \
qpid/cluster/Multicaster.cpp \
qpid/cluster/Multicaster.h \
qpid/cluster/McastFrameHandler.h \
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 2ea6b39f72..3e844fb24b 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -113,7 +113,8 @@ SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expec
void SessionState::senderRecord(const AMQFrame& f) {
if (isControl(f)) return; // Ignore control frames.
- QPID_LOG_IF(debug, f.getMethod(), getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getMethod());
+ QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getBody());
+
stateful = true;
if (timeout) sender.replayList.push_back(f);
sender.unflushedSize += f.encodedSize();
@@ -193,8 +194,8 @@ bool SessionState::receiverRecord(const AMQFrame& f) {
receiver.received = receiver.expected;
receiver.incomplete += receiverGetCurrent();
}
- QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod());
- QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f);
+ QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getBody());
+ if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame.");
return firstTime;
}
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index f8b9e4b183..174ed165c9 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -275,3 +275,7 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
{
return b->queue == queue;
}
+
+void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
+ msg->getProperties<DeliveryProperties>()->setExchange(getName());
+}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 488549bbf6..dce9007643 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -139,8 +139,9 @@ public:
virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
+ virtual void setProperties(const boost::intrusive_ptr<Message>&);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-
+
//PersistableExchange:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index ed6c2dc386..3c7c6d9afa 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -355,20 +355,12 @@ const std::string nullstring;
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
+ msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+
std::string exchangeName = msg->getExchangeName();
- //TODO: the following should be hidden behind message (using MessageAdapter or similar)
-
- if (msg->isA<MessageTransferBody>()) {
- // Do not replace the delivery-properties.exchange if it is is already set.
- // This is used internally (by the cluster) to force the exchange name on a message.
- // The client library ensures this is always empty for messages from normal clients.
- if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty())
- msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
- }
- if (!cacheExchange || cacheExchange->getName() != exchangeName){
+ if (!cacheExchange || cacheExchange->getName() != exchangeName)
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
- }
+ cacheExchange->setProperties(msg);
/* verify the userid if specified: */
std::string id =
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index bea336644f..69a63ad83c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -21,6 +21,7 @@
#include "Connection.h"
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "UpdateExchange.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -106,13 +107,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
+ eventId(0),
frameId(0),
initialized(false),
state(INIT),
connections(*this),
lastSize(0),
- lastBroker(false),
- sequence(0)
+ lastBroker(false)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -122,7 +123,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
mgmtObject->set_status("JOINING");
}
+ // Failover exchange provides membership updates to clients.
failoverExchange.reset(new FailoverExchange(this));
+ broker.getExchanges().registerExchange(failoverExchange);
+
+ // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
@@ -212,7 +219,6 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- e.setSequence(sequence++);
if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e);
@@ -225,34 +231,40 @@ void Cluster::deliver(const Event& e) {
}
// Handler for deliverEventQueue
-void Cluster::deliveredEvent(const Event& e) {
- QPID_LATENCY_RECORD("delivered event queue", e);
+void Cluster::deliveredEvent(const Event& event) {
+ Event e(event);
Mutex::ScopedLock l(lock);
+ if (state >= CATCHUP) {
+ e.setId(++eventId);
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ }
if (e.isCluster()) { // Cluster control, process in this thread.
- AMQFrame frame(e.getFrame());
+ EventFrame ef(e, e.getFrame());
+ QPID_LOG(trace, *this << " DLVR: " << ef);
ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
- if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
- else if (state >= CATCHUP) { // Connection frame, push onto deliver queue.
- if (e.getType() == CONTROL)
+ else if (state >= CATCHUP) { // Handle connection frames
+ if (e.getType() == CONTROL) {
connectionFrame(EventFrame(e, e.getFrame()));
+ }
else
connections.decode(e, e.getData());
}
- else // connection frame && state < CATCHUP. Drop.
- QPID_LOG(trace, *this << " DROP: " << e);
+ // Drop connection frames while state < CATCHUP
}
// Handler for deliverFrameQueue
-void Cluster::deliveredFrame(const EventFrame& e) {
+void Cluster::deliveredFrame(const EventFrame& event) {
Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock?
+ EventFrame e(event);
assert(!e.isCluster()); // Only connection frames on this queue.
- QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.type == DATA) // Sequence number to identify data frames.
- const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ if (e.type == DATA) // Add cluster-id to to data frames.
+ e.frame.setClusterId(frameId++);
boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
+ if (connection) // Ignore frames to closed local connections.
connection->deliveredFrame(e);
}
@@ -389,6 +401,10 @@ void Cluster::stall(Lock&) {
// Stop processing the deliveredEventQueue in order to send or
// recieve an update.
deliverEventQueue.stop();
+
+ // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must
+ // also wait for it to be empty before we are stalled, so that
+ // our local model is up-to-date to give an update.
}
void Cluster::unstall(Lock&) {
@@ -434,17 +450,18 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(),
+ new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) {
Lock l(lock);
updatedMap = m;
- frameId = fid;
+ eventId = eventId_;
+ frameId = frameId_;
checkUpdateIn(l);
}
@@ -601,9 +618,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
}
void Cluster::connectionFrame(const EventFrame& frame) {
- // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition.
- // Measure performance impact, restore with better locking.
+ // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition.
+ // Measure performance impact & review.
+ //
// deliverFrameQueue.push(frame);
+ //
deliveredFrame(frame);
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 4d358cf495..898ec2879f 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -88,7 +88,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&, uint64_t frameId);
+ void updateInDone(const ClusterMap&, uint64_t eventId, uint64_t frameId);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -214,6 +214,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Used only in deliveredFrame thread
ClusterMap::Set elders;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ uint64_t eventId; // FIXME aconway 2009-03-04: review use for thread safety frame-q thread re-enabled.
uint64_t frameId;
// Used only during initialization
@@ -238,7 +239,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ClusterMap map;
size_t lastSize;
bool lastBroker;
- uint64_t sequence;
// Update related
sys::Thread updateThread;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 132043f91a..adb6621caf 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -138,7 +138,6 @@ struct ClusterPlugin : public Plugin {
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
- broker->getExchanges().registerExchange(cluster->getFailoverExchange());
ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
if (mgmt) {
std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 0f71a91293..4391b3eccb 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -280,6 +280,7 @@ void Connection::sessionState(
const SequenceSet& unknownCompleted,
const SequenceSet& receivedIncomplete)
{
+
sessionState().setState(
replayStart,
sendCommandPoint,
@@ -299,9 +300,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
clusterDecoder.setFragment(fragment.data(), fragment.size());
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t eventId, uint64_t frameId) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members), frameId);
+ cluster.updateInDone(ClusterMap(joiners, members), eventId, frameId);
self.second = 0; // Mark this as completed update connection.
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 048008f2a5..9f126d68c4 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -123,7 +123,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t eventId, uint64_t frameId);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 749fbf240f..ccb4d3ede8 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -44,7 +44,7 @@ const size_t EventHeader::HEADER_SIZE =
;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
- : type(t), connectionId(c), size(s), sequence(0) {}
+ : type(t), connectionId(c), size(s), id(0) {}
Event::Event() {}
@@ -128,8 +128,7 @@ std::ostream& operator << (std::ostream& o, EventType t) {
}
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
- o << "[event " << e.getConnectionId() << "/" << e.getSequence()
- << " " << e.getType() << " " << e.getSize() << " bytes]";
+ o << "Event[id=" << e.getId() << " connection=" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
return o;
}
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index c9f44725df..382a550015 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -57,8 +57,8 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
/** Size of header + payload. */
size_t getStoreSize() { return size + HEADER_SIZE; }
- uint64_t getSequence() const { return sequence; }
- void setSequence(uint64_t n) { sequence = n; }
+ uint64_t getId() const { return id; }
+ void setId(uint64_t n) { id = n; }
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
@@ -69,7 +69,7 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
EventType type;
ConnectionId connectionId;
size_t size;
- uint64_t sequence;
+ uint64_t id;
};
/**
diff --git a/cpp/src/qpid/cluster/EventFrame.cpp b/cpp/src/qpid/cluster/EventFrame.cpp
index 48c9eab958..4de76eafbe 100644
--- a/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/cpp/src/qpid/cluster/EventFrame.cpp
@@ -24,16 +24,16 @@
namespace qpid {
namespace cluster {
-EventFrame::EventFrame() : sequence(0) {}
+EventFrame::EventFrame() : eventId(0) {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
- : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc), type(e.getType())
+ : connectionId(e.getConnectionId()), frame(f), eventId(e.getId()), readCredit(rc), type(e.getType())
{
QPID_LATENCY_INIT(frame);
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit << " type=" << e.type;
+ return o << e.frame << "(from event " << e.eventId << " read-credit=" << e.readCredit << ")";
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index abeea3ef16..bb2d9d5493 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -49,14 +49,14 @@ struct EventFrame
// True if this frame follows immediately after frame e.
bool follows(const EventFrame& e) const {
- return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit);
+ return eventId == e.eventId || (eventId == e.eventId+1 && e.readCredit);
}
- bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
+ bool operator<(const EventFrame& e) const { return eventId < e.eventId; }
ConnectionId connectionId;
framing::AMQFrame frame;
- uint64_t sequence;
+ uint64_t eventId;
int readCredit; ///< last frame in an event, give credit when processed.
EventType type;
};
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 45a369eea9..cd42446016 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -70,17 +70,12 @@ void OutputInterceptor::giveReadCredit(int32_t credit) {
// Called in write thread when the IO layer has no more data to write.
// We do nothing in the write thread, we run doOutput only on delivery
// of doOutput requests.
-bool OutputInterceptor::doOutput() {
- QPID_LOG(trace, parent << " write idle.");
- return false;
-}
+bool OutputInterceptor::doOutput() { return false; }
// Delivery of doOutput allows us to run the real connection doOutput()
// which tranfers frames to the codec for writing.
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
- QPID_LATENCY_RECORD("deliver do-output", *this);
- QPID_LATENCY_CLEAR(*this);
size_t buf = getBuffered();
if (parent.isLocal())
writeEstimate.delivered(requested, sent, buf); // Update the estimate.
@@ -91,9 +86,7 @@ void OutputInterceptor::deliverDoOutput(size_t requested) {
moreOutput = parent.getBrokerConnection().doOutput();
} while (sent < requested && moreOutput);
sent += buf; // Include buffered data in the sent total.
-
- QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
-
+ QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput);
if (parent.isLocal() && moreOutput) {
QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available.");
sendDoOutput();
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 9cba377122..7e349905ab 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -86,14 +86,14 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_,
const Cluster::Connections& cons,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
- frameId(frameId_), connections(cons),
+ eventId(eventId_), frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{
@@ -104,7 +104,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
UpdateClient::~UpdateClient() {}
// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.qpid-update");
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
void UpdateClient::run() {
try {
@@ -120,9 +120,6 @@ void UpdateClient::update() {
QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
-
- // Update exchange is used to route messages to the proper queue without modifying routing key.
- session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
@@ -133,6 +130,7 @@ void UpdateClient::update() {
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
+ membership.setEventId(eventId);
membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -274,7 +272,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 08267392f4..d6b821904f 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -63,7 +63,7 @@ class UpdateClient : public sys::Runnable {
static const std::string UPDATE; // Name for special update queue and exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
+ broker::Broker& donor, const ClusterMap& map, uint64_t eventId, uint64_t frameId,
const std::vector<boost::intrusive_ptr<Connection> >& ,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
@@ -92,6 +92,7 @@ class UpdateClient : public sys::Runnable {
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
+ uint64_t eventId;
uint64_t frameId;
std::vector<boost::intrusive_ptr<Connection> > connections;
client::Connection connection, shadowConnection;
diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h
new file mode 100644
index 0000000000..7a4a484c8a
--- /dev/null
+++ b/cpp/src/qpid/cluster/UpdateExchange.h
@@ -0,0 +1,45 @@
+#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H
+#define QPID_CLUSTER_UPDATEEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "UpdateClient.h"
+#include "qpid/broker/FanOutExchange.h"
+
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange
+ * on messages.
+ */
+class UpdateExchange : public broker::FanOutExchange
+{
+ public:
+ UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
+ void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 6702686c2a..1acb2d4bf4 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -509,10 +509,11 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("foo","q"));
c0.session.messageTransfer(arg::content=Message("bar","q"));
+
while (c0.session.queueQuery("q").getMessageCount() != 2)
sys::usleep(1000); // Wait for message to show up on broker 0.
- // Add a new broker, it should catch up.
+ // Add a new broker, it will catch up.
cluster.add();
// Do some work post-add
@@ -530,6 +531,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "bar");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp
index 65c93c8e27..d84b6d58f7 100644
--- a/cpp/src/tests/failover_soak.cpp
+++ b/cpp/src/tests/failover_soak.cpp
@@ -339,7 +339,6 @@ wait_for_newbie ( )
{
Connection connection;
connection.open ( "127.0.0.1", newbie_port );
- sleep ( 2 );
connection.close();
newbie = 0; // He's no newbie anymore!
return true;
@@ -368,20 +367,19 @@ startNewBroker ( brokerVector & brokers,
module << moduleDir << "/cluster.so";
path << srcRoot << "/qpidd";
prefix << "soak-" << brokerId;
-
- std::vector<std::string> argv =
- list_of<string> ("qpidd")
- ("--no-module-dir")
- ("--load-module=cluster.so")
- ("--cluster-name")
- (clusterName)
- ("--auth=no")
- ("--no-data-dir")
- ("--mgmt-enable=no")
- ("--log-prefix")
- (prefix.str())
- ("--log-to-file")
- ("/tmp/qpidd.log");
+ std::vector<std::string> argv = list_of<string>
+ ("qpidd")
+ ("--no-module-dir")
+ ("--load-module=cluster.so")
+ ("--cluster-name")
+ (clusterName)
+ ("--auth=no")
+ ("--no-data-dir")
+ ("--mgmt-enable=no")
+ ("--log-prefix")
+ (prefix.str())
+ ("--log-to-file")
+ (prefix.str()+".log");
newbie = new ForkedBroker ( argv );
newbie_port = newbie->getPort();
diff --git a/cpp/src/tests/replaying_sender.cpp b/cpp/src/tests/replaying_sender.cpp
index f66056e357..ea2a13bd54 100644
--- a/cpp/src/tests/replaying_sender.cpp
+++ b/cpp/src/tests/replaying_sender.cpp
@@ -72,7 +72,7 @@ void Sender::execute(AsyncSession& session, bool isRetry)
sender.send(message);
if (count > reportFrequency && !(sent % reportFrequency)) {
if ( verbosity > 0 )
- std::cout << "sent " << sent << " of " << count << std::endl;
+ std::cout << "Sender sent " << sent << " of " << count << std::endl;
}
}
message.setData("That's all, folks!");
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index d3e4b488fb..5883f59efe 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -132,6 +132,7 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
+ <field name="event-id" type="uint64"/>> <!-- Event id counter value -->
<field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
</control>