summary | refs | log | tree | commit | diff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
author Alan Conway <aconway@apache.org> 2012-05-28 18:24:31 +0000
committer Alan Conway <aconway@apache.org> 2012-05-28 18:24:31 +0000
commit 7064d141a8c4c33385bf0962d83d4a2797b37dc5 (patch)
tree 52866311f22c63356f2e1fc41ce58e3a9ac2ed49 /qpid/cpp/src
parent 9b6714700c49e24825a707dcd3c748d6c0be8be2 (diff)
download qpid-python-7064d141a8c4c33385bf0962d83d4a2797b37dc5.tar.gz
QPID-3603: Failover optimization restored.
A backup broker that fails over to a new primary can avoid downloading messages that it already has from the previous primary. The backup sends its position to the primary as a client-arg, and the primary sends back any necessary dequeues and starts replicating after the messages already on the backup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- qpid/cpp/src/qpid/ha/QueueReplicator.cpp 35
-rw-r--r-- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 217
-rw-r--r-- qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 37
3 files changed, 198 insertions, 91 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 76840ea92e..47fc3afdeb 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -112,25 +112,19 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
-
- // FIXME aconway 2011-12-09: Failover optimization removed.
- // There was code here to re-use messages already on the backup
- // during fail-over. This optimization was removed to simplify
- // the logic till we get the basic replication stable, it
- // can be re-introduced later. Last revision with the optimization:
- // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
-
- // Clear out any old messages, reset the queue to start replicating fresh.
- queue->purge(); // FIXME aconway 2012-05-02: race
- queue->setPosition(0);
-
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- // TODO aconway 2011-12-19: optimize.
- settings.setInt(QPID_SYNC_FREQUENCY, 1);
- peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings);
+ settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ SequenceNumber front;
+ if (ReplicatingSubscription::getFront(*queue, front))
+ settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front);
+ peer.getMessage().subscribe(
+ args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
+ false/*exclusive*/, "", 0, settings);
+ // FIXME aconway 2012-05-22: use a finite credit window
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
+ QPID_LOG(debug, logPrefix << "Subscribed bridge: " << bridgeName << " " << settings);
}
namespace {
@@ -174,11 +168,10 @@ void QueueReplicator::route(Deliverable& msg)
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
<< " to " << position);
- if (queue->getPosition() > position) {
- throw Exception(
- QPID_MSG(logPrefix << "Invalid position update from "
- << queue->getPosition() << " to " << position));
- }
+ // Verify that there are no messages after the new position in the queue.
+ SequenceNumber next;
+ if (ReplicatingSubscription::getNext(*queue, position, next))
+ throw Exception("Invalid position move, preceeds messages");
queue->setPosition(position);
}
// Ignore unknown event keys, may be introduced in later versions.
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 9bab20048c..8c88382b94 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -38,17 +38,78 @@ using namespace broker;
using namespace std;
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
+const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
namespace {
const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
+class DequeueRemover // Functor: seeds a dequeue set with a range, then un-marks positions still queued.
+{
+ public:
+ DequeueRemover(
+ SequenceSet& r, // set edited in place; stored as a reference, so it must outlive this object
+ const SequenceNumber& s, // lower bound of the range (compared inclusively in operator())
+ const SequenceNumber& e // upper bound of the range (compared inclusively in operator())
+ ) : dequeues(r), start(s), end(e)
+ {
+ dequeues.add(start, end); // begin by treating every position in the range as dequeued
+ }
+ // Called once per message; the ReplicatingSubscription constructor passes this functor to Queue::eachMessage.
+ void operator()(const QueuedMessage& message) {
+ if (message.position >= start && message.position <= end) {
+ //i.e. message is within the initial range and has not been dequeued,
+ //so remove it from the dequeues
+ dequeues.remove(message.position);
+ }
+ }
+
+ private:
+ SequenceSet& dequeues; // caller-owned set being built
+ const SequenceNumber start; // copied from s at construction
+ const SequenceNumber end; // copied from e at construction
+};
+
string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
}
+
+/** Dummy consumer that accepts every message and does nothing with it; getNext() dispatches it on a queue and then reads back its position. */
+class GetPositionConsumer : public Consumer
+{
+ public:
+ GetPositionConsumer() :
+ Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {} // name gets a UUID suffix, presumably to keep each instance's name unique; meaning of `false` is defined by Consumer (not shown here)
+ bool deliver(broker::QueuedMessage& ) { return true; } // report success without touching the message
+ void notify() {} // no-op
+ bool filter(boost::intrusive_ptr<broker::Message>) { return true; } // every message passes the filter
+ bool accept(boost::intrusive_ptr<broker::Message>) { return true; } // every message is accepted
+ void cancel() {} // no-op
+ void acknowledged(const broker::QueuedMessage&) {} // no-op
+ bool browseAcquired() const { return true; }
+ broker::OwnershipToken* getSession() { return 0; } // returns a null session pointer
+};
+
+
+bool ReplicatingSubscription::getNext( // Get the position of the next message after `from` on q; false if none found.
+ broker::Queue& q, SequenceNumber from, SequenceNumber& result)
+{
+ boost::shared_ptr<Consumer> c(new GetPositionConsumer); // throwaway consumer used only to probe the queue
+ c->setPosition(from); // start the probe at `from`
+ if (!q.dispatch(c)) return false; // nothing was dispatched: `result` is left unmodified
+ result = c->getPosition(); // position of the probe consumer after the dispatch
+ return true;
+}
+
+bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { // Thin wrapper: getNext() starting from position 0.
+ // FIXME aconway 2012-05-23: does not handle sequence-number wrap-around; assumes 0 is < all messages in queue.
+ return getNext(q, 0, front); // `front` is only written when getNext returns true
+}
+
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -70,14 +131,47 @@ ReplicatingSubscription::Factory::create(
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->addObserver(rs);
- // NOTE: readyPosition must be set _after_ addObserver, so
+ // NOTE: initialize must be called _after_ addObserver, so
// messages can't be enqueued after setting readyPosition
// but before registering the observer.
- rs->setReadyPosition();
+ rs->initialize();
}
return rs;
}
+struct QueueRange { // Front/back positions of a queue, plus a flag for "no front message".
+ bool empty; // true when no front position is available
+ SequenceNumber front; // position of the front message; back+1 when empty
+ SequenceNumber back; // queue position (live queue) or QPID_HIGH_SEQUENCE_NUMBER (arguments)
+ // Build the range from a live queue: back is q.getPosition(), front comes from getFront().
+ // FIXME aconway 2012-05-26: fix front calculation
+ QueueRange(broker::Queue& q) {
+ back = q.getPosition();
+ front = back+1; // value kept if getFront() finds nothing and leaves front unmodified
+ empty = !ReplicatingSubscription::getFront(q, front);
+ }
+ // Build the range from subscription arguments. NOTE(review): args is taken by value, so the FieldTable is copied.
+ QueueRange(const framing::FieldTable args) {
+ back = args.getAsInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER);
+ front = back+1; // value kept when no low sequence number is supplied
+ empty = !args.isSet(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER); // absence of the low bound means "empty"
+ if (!empty) {
+ front = args.getAsInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER);
+ if (back < front) // a non-empty range must have front <= back
+ throw InvalidArgumentException("Invalid bounds for backup queue");
+ }
+ }
+
+ /** Consumer position to start consuming from the front (one before front). NOTE(review): could be declared const. */
+ SequenceNumber browserStart() { return front-1; }
+};
+
+ostream& operator<<(ostream& o, const QueueRange& qr) { // Print a QueueRange as "empty(back)" or "[front,back]".
+ // NOTE(review): emptiness is decided by comparing front and back, not by reading qr.empty.
+ if (qr.front > qr.back) return o << "empty(" << qr.back << ")";
+ else return o << "[" << qr.front << "," << qr.back << "]";
+}
+
ReplicatingSubscription::ReplicatingSubscription(
LogPrefix lp,
SemanticState* parent,
@@ -96,25 +190,50 @@ ReplicatingSubscription::ReplicatingSubscription(
dummy(new Queue(mask(name))),
ready(false)
{
- // Separate the remote part from a "local-remote" address for logging.
- string address = parent->getSession().getConnection().getUrl();
- size_t i = address.find('-');
- if (i != string::npos) address = address.substr(i+1);
- logSuffix = " (" + address + ")";
-
- // FIXME aconway 2011-12-09: Failover optimization removed.
- // There was code here to re-use messages already on the backup
- // during fail-over. This optimization was removed to simplify
- // the logic till we get the basic replication stable, it
- // can be re-introduced later. Last revision with the optimization:
- // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
-
- // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
- // so we will start consuming from the lowest numbered message.
- // This is incorrect if the sequence number wraps around, but
- // this is what all consumers currently do.
-
- QPID_LOG(debug, logPrefix << "Created replicating subscription" << logSuffix);
+ try {
+ // FIXME aconway 2012-05-22: use hostname from brokerinfo
+ // Separate the remote part from a "local-remote" address for logging.
+ string address = parent->getSession().getConnection().getUrl();
+ size_t i = address.find('-');
+ if (i != string::npos) address = address.substr(i+1);
+ logSuffix = " (" + address + ")";
+
+ QueueRange primary(*queue);
+ QueueRange backup(arguments);
+ backupPosition = backup.back;
+
+ // We can re-use some backup messages if backup and primary queues
+ // overlap and the backup is not missing messages at the front of the queue.
+ if (!primary.empty && // Primary not empty
+ !backup.empty && // Backup not empty
+ primary.front >= backup.front && // Not missing messages at the front
+ primary.front <= backup.back // Overlap
+ )
+ {
+ // Remove messages that are still on the primary queue from dequeues
+ // FIXME aconway 2012-05-22: optimize to iterate only the relevant
+ // section of the queue
+ DequeueRemover remover(dequeues, backup.front, backup.back);
+ queue->eachMessage(remover);
+ position = std::min(primary.back, backup.back);
+ }
+ else {
+ // Clear the backup queue and reset to start browsing at the
+ // front of the primary queue.
+ if (!backup.empty) dequeues.add(backup.front, backup.back);
+ position = primary.browserStart();
+
+ }
+ QPID_LOG(debug, logPrefix << "New backup subscription " << getName()
+ << " backup range " << backup
+ << " primary range " << primary
+ << " position " << position
+ << " dequeues " << dequeues << logSuffix);
+ }
+ catch (const std::exception& e) {
+ throw Exception(QPID_MSG(logPrefix << "Error setting up replication: "
+ << e.what() << logSuffix));
+ }
}
ReplicatingSubscription::~ReplicatingSubscription() {
@@ -122,39 +241,37 @@ ReplicatingSubscription::~ReplicatingSubscription() {
}
// Called in subscription's connection thread when the subscription is created.
-void ReplicatingSubscription::setReadyPosition() {
- // Don't need to lock, this is called only on creation.
+void ReplicatingSubscription::initialize() {
+ sys::Mutex::ScopedLock l(lock); // QueueObserver methods can be called concurrently
+ // Send initial dequeues and position to the backup.
+ // There must be a shared_ptr(this) when sending.
+ sendDequeueEvent(l);
+ sendPositionEvent(position, l);
+ backupPosition = position;
+ // Set the ready position now that this subscription has been added as an observer.
// All messages after this position have been seen by us as QueueObserver.
- readyPosition = getQueue()->getPosition();
- // Create a separate subscription to browse the front message on
- // the queue so that we can test for queue empty.
- boost::shared_ptr<Consumer> c(new GetPositionConsumer);
- bool found = getQueue()->dispatch(c);
- SequenceNumber front = c->getPosition();
- if (!found || front >= readyPosition) {
- // The queue is empty, or has already advanced past the ready position.
- QPID_LOG(debug, logPrefix << "backup subscribed, no catch up, at "
- << readyPosition << logSuffix);
- // Fake lock, only called during creation:
- setReady(*(sys::Mutex::ScopedLock*)0);
+ QueueRange range(*getQueue());
+ readyPosition = range.back;
+ if (range.empty || position >= readyPosition) {
+ setReady(l);
}
else {
- QPID_LOG(debug, logPrefix << "backup subscribed, catching up "
- << front << "-" << readyPosition << logSuffix);
+ QPID_LOG(debug, logPrefix << "Backup subscription catching up from "
+ << position << " to " << readyPosition << logSuffix);
}
}
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
try {
- // Add position events for the subscribed queue, not for the internal event queue.
+ // Add position events for the subscribed queue, not the internal event queue.
if (qm.queue == getQueue().get()) {
QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix);
{
sys::Mutex::ScopedLock l(lock);
assert(position == qm.position);
- // qm.position is the position of the newly enqueued qm on the local queue.
+ // qm.position is the position of the newly enqueued qm on local queue.
// backupPosition is latest position on backup queue before enqueueing
if (qm.position <= backupPosition)
throw Exception(
@@ -162,9 +279,8 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
<< " but got " << qm.position));
if (qm.position - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(qm.position);
- --send; // Send the position before qm was enqueued.
- sendPositionEvent(send);
+ // Send the position before qm was enqueued.
+ sendPositionEvent(qm.position-1, l);
}
backupPosition = qm.position;
}
@@ -186,7 +302,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-// Send a ready event to the backup.
void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) {
if (ready) return;
ready = true;
@@ -269,11 +384,13 @@ bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
{
+ if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
<< " from " << getQueue()->getName() << logSuffix);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
+ dequeues.clear();
buffer.reset();
sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
}
@@ -295,13 +412,14 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber position)
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "sending position " << position
+ if (pos == backupPosition) return; // No need to send.
+ QPID_LOG(trace, logPrefix << "sending position " << pos
<< ", was " << backupPosition << logSuffix);
- string buf(backupPosition.encodedSize(),'\0');
+ string buf(pos.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
- position.encode(buffer);
+ pos.encode(buffer);
buffer.reset();
sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
}
@@ -342,10 +460,7 @@ bool ReplicatingSubscription::doDispatch()
{
{
sys::Mutex::ScopedLock l(lock);
- if (!dequeues.empty()) {
- sendDequeueEvent(l);
- dequeues.clear();
- }
+ if (!dequeues.empty()) sendDequeueEvent(l);
}
return ConsumerImpl::doDispatch();
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index b0aff18879..0956d6d503 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -74,6 +74,19 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
// Argument names for consume command.
static const std::string QPID_REPLICATING_SUBSCRIPTION;
+ static const std::string QPID_HIGH_SEQUENCE_NUMBER;
+ static const std::string QPID_LOW_SEQUENCE_NUMBER;
+
+ // FIXME aconway 2012-05-23: these don't belong on ReplicatingSubscription
+ /** Get position of front message on queue.
+ *@return false if queue is empty.
+ */
+ static bool getFront(broker::Queue&, framing::SequenceNumber& result);
+ /** Get next message after from in queue.
+ *@return false if none found.
+ */
+ static bool getNext(broker::Queue&, framing::SequenceNumber from,
+ framing::SequenceNumber& result);
ReplicatingSubscription(LogPrefix,
broker::SemanticState* parent,
@@ -97,7 +110,10 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool browseAcquired() const { return true; }
bool hideDeletedError();
- void setReadyPosition();
+ /** Initialization that must be done after construction because it
+ * requires a shared_ptr to this to exist.
+ */
+ void initialize();
protected:
bool doDispatch();
@@ -116,26 +132,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
void sendDequeueEvent(const sys::Mutex::ScopedLock&);
- void sendPositionEvent(framing::SequenceNumber);
+ void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
void setReady(const sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&);
-
- /** Dummy consumer used to get the front position on the queue */
- class GetPositionConsumer : public Consumer
- {
- public:
- GetPositionConsumer() :
- Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
- bool deliver(broker::QueuedMessage& ) { return true; }
- void notify() {}
- bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
- bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
- void cancel() {}
- void acknowledged(const broker::QueuedMessage&) {}
- bool browseAcquired() const { return true; }
- broker::OwnershipToken* getSession() { return 0; }
- };
-
friend struct Factory;
};