diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:31 +0000 |
| commit | 7064d141a8c4c33385bf0962d83d4a2797b37dc5 (patch) | |
| tree | 52866311f22c63356f2e1fc41ce58e3a9ac2ed49 /qpid/cpp/src | |
| parent | 9b6714700c49e24825a707dcd3c748d6c0be8be2 (diff) | |
| download | qpid-python-7064d141a8c4c33385bf0962d83d4a2797b37dc5.tar.gz | |
QPID-3603: Failover optimization restored.
A backup broker that fails over to a new primary can avoid downloading messages
that it already has from the previous primary. The backup sends its position to
the primary as a client-arg and the primary sends back any necessary dequeues
and starts replicating after the messages on the backup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 35 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 217 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 37 |
3 files changed, 198 insertions, 91 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 76840ea92e..47fc3afdeb 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -112,25 +112,19 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; - - // FIXME aconway 2011-12-09: Failover optimization removed. - // There was code here to re-use messages already on the backup - // during fail-over. This optimization was removed to simplify - // the logic till we get the basic replication stable, it - // can be re-introduced later. Last revision with the optimization: - // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - - // Clear out any old messages, reset the queue to start replicating fresh. - queue->purge(); // FIXME aconway 2012-05-02: race - queue->setPosition(0); - settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - // TODO aconway 2011-12-19: optimize. - settings.setInt(QPID_SYNC_FREQUENCY, 1); - peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); + settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + SequenceNumber front; + if (ReplicatingSubscription::getFront(*queue, front)) + settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front); + peer.getMessage().subscribe( + args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, + false/*exclusive*/, "", 0, settings); + // FIXME aconway 2012-05-22: use a finite credit window peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName); + QPID_LOG(debug, logPrefix << "Subscribed bridge: " << bridgeName << " " << settings); } namespace { @@ -174,11 +168,10 @@ void QueueReplicator::route(Deliverable& msg) SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() << " to " << position); - if (queue->getPosition() > position) { - throw Exception( - QPID_MSG(logPrefix << "Invalid position update from " - << queue->getPosition() << " to " << position)); - } + // Verify that there are no messages after the new position in the queue. + SequenceNumber next; + if (ReplicatingSubscription::getNext(*queue, position, next)) + throw Exception("Invalid position move, preceeds messages"); queue->setPosition(position); } // Ignore unknown event keys, may be introduced in later versions. diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 9bab20048c..8c88382b94 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -38,17 +38,78 @@ using namespace broker; using namespace std; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); +const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); +const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); namespace { const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace +class DequeueRemover +{ + public: + DequeueRemover( + SequenceSet& r, + const SequenceNumber& s, + const SequenceNumber& e + ) : dequeues(r), start(s), end(e) + { + dequeues.add(start, end); + } + + void operator()(const QueuedMessage& message) { + if (message.position >= start && message.position <= end) { + //i.e. message is within the intial range and has not been dequeued, + //so remove it from the dequeues + dequeues.remove(message.position); + } + } + + private: + SequenceSet& dequeues; + const SequenceNumber start; + const SequenceNumber end; +}; + string mask(const string& in) { return DOLLAR + in + INTERNAL; } + +/** Dummy consumer used to get the front position on the queue */ +class GetPositionConsumer : public Consumer +{ + public: + GetPositionConsumer() : + Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {} + bool deliver(broker::QueuedMessage& ) { return true; } + void notify() {} + bool filter(boost::intrusive_ptr<broker::Message>) { return true; } + bool accept(boost::intrusive_ptr<broker::Message>) { return true; } + void cancel() {} + void acknowledged(const broker::QueuedMessage&) {} + bool browseAcquired() const { return true; } + broker::OwnershipToken* getSession() { return 0; } +}; + + +bool ReplicatingSubscription::getNext( + broker::Queue& q, SequenceNumber from, SequenceNumber& result) +{ + boost::shared_ptr<Consumer> c(new GetPositionConsumer); + c->setPosition(from); + if (!q.dispatch(c)) return false; + result = c->getPosition(); + return true; +} + +bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { + // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue. + return getNext(q, 0, front); +} + /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -70,14 +131,47 @@ ReplicatingSubscription::Factory::create( parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->addObserver(rs); - // NOTE: readyPosition must be set _after_ addObserver, so + // NOTE: initialize must be called _after_ addObserver, so // messages can't be enqueued after setting readyPosition // but before registering the observer. - rs->setReadyPosition(); + rs->initialize(); } return rs; } +struct QueueRange { + bool empty; + SequenceNumber front; + SequenceNumber back; + + // FIXME aconway 2012-05-26: fix front calculation + QueueRange(broker::Queue& q) { + back = q.getPosition(); + front = back+1; + empty = !ReplicatingSubscription::getFront(q, front); + } + + QueueRange(const framing::FieldTable args) { + back = args.getAsInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER); + front = back+1; + empty = !args.isSet(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER); + if (!empty) { + front = args.getAsInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER); + if (back < front) + throw InvalidArgumentException("Invalid bounds for backup queue"); + } + } + + /** Consumer position to start consuming from the front */ + SequenceNumber browserStart() { return front-1; } +}; + +ostream& operator<<(ostream& o, const QueueRange& qr) { + + if (qr.front > qr.back) return o << "empty(" << qr.back << ")"; + else return o << "[" << qr.front << "," << qr.back << "]"; +} + ReplicatingSubscription::ReplicatingSubscription( LogPrefix lp, SemanticState* parent, @@ -96,25 +190,50 @@ ReplicatingSubscription::ReplicatingSubscription( dummy(new Queue(mask(name))), ready(false) { - // Separate the remote part from a "local-remote" address for logging. - string address = parent->getSession().getConnection().getUrl(); - size_t i = address.find('-'); - if (i != string::npos) address = address.substr(i+1); - logSuffix = " (" + address + ")"; - - // FIXME aconway 2011-12-09: Failover optimization removed. - // There was code here to re-use messages already on the backup - // during fail-over. This optimization was removed to simplify - // the logic till we get the basic replication stable, it - // can be re-introduced later. Last revision with the optimization: - // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - - // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 - // so we will start consuming from the lowest numbered message. - // This is incorrect if the sequence number wraps around, but - // this is what all consumers currently do. - - QPID_LOG(debug, logPrefix << "Created replicating subscription" << logSuffix); + try { + // FIXME aconway 2012-05-22: use hostname from brokerinfo + // Separate the remote part from a "local-remote" address for logging. + string address = parent->getSession().getConnection().getUrl(); + size_t i = address.find('-'); + if (i != string::npos) address = address.substr(i+1); + logSuffix = " (" + address + ")"; + + QueueRange primary(*queue); + QueueRange backup(arguments); + backupPosition = backup.back; + + // We can re-use some backup messages if backup and primary queues + // overlap and the backup is not missing messages at the front of the queue. + if (!primary.empty && // Primary not empty + !backup.empty && // Backup not empty + primary.front >= backup.front && // Not missing messages at the front + primary.front <= backup.back // Overlap + ) + { + // Remove messages that are still on the primary queue from dequeues + // FIXME aconway 2012-05-22: optimize to iterate only the relevant + // section of the queue + DequeueRemover remover(dequeues, backup.front, backup.back); + queue->eachMessage(remover); + position = std::min(primary.back, backup.back); + } + else { + // Clear the backup queue and reset to start browsing at the + // front of the primary queue. + if (!backup.empty) dequeues.add(backup.front, backup.back); + position = primary.browserStart(); + + } + QPID_LOG(debug, logPrefix << "New backup subscription " << getName() + << " backup range " << backup + << " primary range " << primary + << " position " << position + << " dequeues " << dequeues << logSuffix); + } + catch (const std::exception& e) { + throw Exception(QPID_MSG(logPrefix << "Error setting up replication: " + << e.what() << logSuffix)); + } } ReplicatingSubscription::~ReplicatingSubscription() { @@ -122,39 +241,37 @@ ReplicatingSubscription::~ReplicatingSubscription() { } // Called in subscription's connection thread when the subscription is created. -void ReplicatingSubscription::setReadyPosition() { - // Don't need to lock, this is called only on creation. +void ReplicatingSubscription::initialize() { + sys::Mutex::ScopedLock l(lock); // QueueObserver methods can be called concurrently + // Send initial dequeues and position to the backup. + // There most be a shared_ptr(this) when sending. + sendDequeueEvent(l); + sendPositionEvent(position, l); + backupPosition = position; + // Set the ready position, all messages after this has been set as an observer. // All messages after this position have been seen by us as QueueObserver. - readyPosition = getQueue()->getPosition(); - // Create a separate subscription to browse the front message on - // the queue so that we can test for queue empty. - boost::shared_ptr<Consumer> c(new GetPositionConsumer); - bool found = getQueue()->dispatch(c); - SequenceNumber front = c->getPosition(); - if (!found || front >= readyPosition) { - // The queue is empty, or has already advanced past the ready position. - QPID_LOG(debug, logPrefix << "backup subscribed, no catch up, at " - << readyPosition << logSuffix); - // Fake lock, only called during creation: - setReady(*(sys::Mutex::ScopedLock*)0); + QueueRange range(*getQueue()); + readyPosition = range.back; + if (range.empty || position >= readyPosition) { + setReady(l); } else { - QPID_LOG(debug, logPrefix << "backup subscribed, catching up " - << front << "-" << readyPosition << logSuffix); + QPID_LOG(debug, logPrefix << "Backup subscription catching up from " + << position << " to " << readyPosition << logSuffix); } } // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver(QueuedMessage& qm) { try { - // Add position events for the subscribed queue, not for the internal event queue. + // Add position events for the subscribed queue, not the internal event queue. if (qm.queue == getQueue().get()) { QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix); { sys::Mutex::ScopedLock l(lock); assert(position == qm.position); - // qm.position is the position of the newly enqueued qm on the local queue. + // qm.position is the position of the newly enqueued qm on local queue. // backupPosition is latest position on backup queue before enqueueing if (qm.position <= backupPosition) throw Exception( @@ -162,9 +279,8 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { << " but got " << qm.position)); if (qm.position - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(qm.position); - --send; // Send the position before qm was enqueued. - sendPositionEvent(send); + // Send the position before qm was enqueued. + sendPositionEvent(qm.position-1, l); } backupPosition = qm.position; } @@ -186,7 +302,6 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { } } -// Send a ready event to the backup. void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) { if (ready) return; ready = true; @@ -269,11 +384,13 @@ bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) { + if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues << " from " << getQueue()->getName() << logSuffix); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); + dequeues.clear(); buffer.reset(); sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); } @@ -295,13 +412,14 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent(SequenceNumber position) +void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "sending position " << position + if (pos == backupPosition) return; // No need to send. + QPID_LOG(trace, logPrefix << "sending position " << pos << ", was " << backupPosition << logSuffix); - string buf(backupPosition.encodedSize(),'\0'); + string buf(pos.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); - position.encode(buffer); + pos.encode(buffer); buffer.reset(); sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); } @@ -342,10 +460,7 @@ bool ReplicatingSubscription::doDispatch() { { sys::Mutex::ScopedLock l(lock); - if (!dequeues.empty()) { - sendDequeueEvent(l); - dequeues.clear(); - } + if (!dequeues.empty()) sendDequeueEvent(l); } return ConsumerImpl::doDispatch(); } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index b0aff18879..0956d6d503 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -74,6 +74,19 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, // Argument names for consume command. static const std::string QPID_REPLICATING_SUBSCRIPTION; + static const std::string QPID_HIGH_SEQUENCE_NUMBER; + static const std::string QPID_LOW_SEQUENCE_NUMBER; + + // FIXME aconway 2012-05-23: these don't belong on ReplicatingSubscription + /** Get position of front message on queue. + *@return false if queue is empty. + */ + static bool getFront(broker::Queue&, framing::SequenceNumber& result); + /** Get next message after from in queue. + *@return false if none found. + */ + static bool getNext(broker::Queue&, framing::SequenceNumber from, + framing::SequenceNumber& result); ReplicatingSubscription(LogPrefix, broker::SemanticState* parent, @@ -97,7 +110,10 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, bool browseAcquired() const { return true; } bool hideDeletedError(); - void setReadyPosition(); + /** Initialization that must be done after construction because it + * requires a shared_ptr to this to exist. + */ + void initialize(); protected: bool doDispatch(); @@ -116,26 +132,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); void sendDequeueEvent(const sys::Mutex::ScopedLock&); - void sendPositionEvent(framing::SequenceNumber); + void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); void setReady(const sys::Mutex::ScopedLock&); void sendEvent(const std::string& key, framing::Buffer&); - - /** Dummy consumer used to get the front position on the queue */ - class GetPositionConsumer : public Consumer - { - public: - GetPositionConsumer() : - Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {} - bool deliver(broker::QueuedMessage& ) { return true; } - void notify() {} - bool filter(boost::intrusive_ptr<broker::Message>) { return true; } - bool accept(boost::intrusive_ptr<broker::Message>) { return true; } - void cancel() {} - void acknowledged(const broker::QueuedMessage&) {} - bool browseAcquired() const { return true; } - broker::OwnershipToken* getSession() { return 0; } - }; - friend struct Factory; }; |
