summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-06-17 14:19:10 +0000
committerAlan Conway <aconway@apache.org>2013-06-17 14:19:10 +0000
commit1d55ce68b88256a4de0329d3104436b0c581000c (patch)
treee94c576b90ee33caddd96d6efccdbf2bab51080c /qpid/cpp
parent0fb93ca81a3517d66339b3e890282ea4c82546a9 (diff)
downloadqpid-python-1d55ce68b88256a4de0329d3104436b0c581000c.tar.gz
QPID-4348: HA Use independent sequence numbers for identifying messages
Previously HA code used queue sequence numbers to identify messages. This assumes that the message sequence is identical on primary and backup. Implementing new features (for example transactions) requires that we tolerate ordering differences between primary and backups. This patch introduces a new, queue-scoped HA sequence number managed by the HA plugin. The HA ID is set *before* the message is enqueued and assigned a queue sequence number. This means it is possible to identify messages before they are enqueued, e.g. messages in an open transaction. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1493771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/include/qpid/types/Uuid.h11
-rw-r--r--qpid/cpp/src/CMakeLists.txt36
-rw-r--r--qpid/cpp/src/ha.mk18
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Lvq.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h4
-rw-r--r--qpid/cpp/src/qpid/broker/MessageInterceptor.h7
-rw-r--r--qpid/cpp/src/qpid/broker/Observers.h12
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/QueueObserver.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h7
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp32
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h10
-rw-r--r--qpid/cpp/src/qpid/ha/IdSetter.h62
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp31
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h1
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp203
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h19
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp63
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h54
-rw-r--r--qpid/cpp/src/qpid/ha/QueueRange.h78
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp81
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h15
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshot.h68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueSnapshots.h81
-rw-r--r--qpid/cpp/src/qpid/ha/README.md80
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h24
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp289
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h47
-rw-r--r--qpid/cpp/src/qpid/ha/hash.h42
-rw-r--r--qpid/cpp/src/qpid/ha/makeMessage.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/makeMessage.h23
-rw-r--r--qpid/cpp/src/qpid/ha/types.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/types.h26
-rw-r--r--qpid/cpp/src/qpid/types/Uuid.cpp8
-rw-r--r--qpid/cpp/src/tests/DeliveryRecordTest.cpp2
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py63
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark6
49 files changed, 1036 insertions, 594 deletions
diff --git a/qpid/cpp/include/qpid/types/Uuid.h b/qpid/cpp/include/qpid/types/Uuid.h
index 02af4c7e7f..5e3ee94db0 100644
--- a/qpid/cpp/include/qpid/types/Uuid.h
+++ b/qpid/cpp/include/qpid/types/Uuid.h
@@ -57,7 +57,7 @@ class QPID_TYPES_CLASS_EXTERN Uuid
/** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb. */
QPID_TYPES_EXTERN std::string str() const;
- QPID_TYPES_EXTERN size_t size() const;
+ QPID_TYPES_EXTERN size_t size() const;
QPID_TYPES_EXTERN const unsigned char* data() const;
friend QPID_TYPES_EXTERN bool operator==(const Uuid&, const Uuid&);
@@ -69,6 +69,14 @@ class QPID_TYPES_CLASS_EXTERN Uuid
friend QPID_TYPES_EXTERN std::ostream& operator<<(std::ostream&, Uuid);
friend QPID_TYPES_EXTERN std::istream& operator>>(std::istream&, Uuid&);
+ /** Hash value suitable for use with unordered_map */
+ size_t hash() const;
+
+ /** Hasher for use with unordered_map */
+ struct Hasher {
+ size_t operator()(const Uuid& u) const { return u.hash(); }
+ };
+
private:
unsigned char bytes[16];
};
@@ -91,4 +99,5 @@ QPID_TYPES_EXTERN std::istream& operator>>(std::istream&, Uuid&);
}} // namespace qpid::types
+
#endif /*!QPID_TYPES_UUID_H*/
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 538b48d31f..03f9fae74e 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -628,16 +628,14 @@ set (ha_default ON)
option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
if (BUILD_HA)
set (ha_SOURCES
+ qpid/ha/QueueSnapshot.h
+ qpid/ha/QueueSnapshots.h
qpid/ha/AlternateExchangeSetter.h
- qpid/ha/BackupConnectionExcluder.h
- qpid/ha/BrokerInfo.cpp
- qpid/ha/BrokerInfo.h
- qpid/ha/QueueGuard.cpp
- qpid/ha/QueueGuard.h
- qpid/ha/ReplicationTest.cpp
- qpid/ha/ReplicationTest.h
qpid/ha/Backup.cpp
qpid/ha/Backup.h
+ qpid/ha/BackupConnectionExcluder.h
+ qpid/ha/BrokerInfo.cpp
+ qpid/ha/BrokerInfo.h
qpid/ha/BrokerReplicator.cpp
qpid/ha/BrokerReplicator.h
qpid/ha/ConnectionObserver.cpp
@@ -647,26 +645,32 @@ if (BUILD_HA)
qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h
qpid/ha/HaPlugin.cpp
- qpid/ha/makeMessage.cpp
- qpid/ha/makeMessage.h
+ qpid/ha/hash.h
+ qpid/ha/IdSetter.h
+ qpid/ha/QueueSnapshot.h
+ qpid/ha/makeMessage.cpp
+ qpid/ha/makeMessage.h
qpid/ha/Membership.cpp
qpid/ha/Membership.h
qpid/ha/Primary.cpp
qpid/ha/Primary.h
- qpid/ha/QueueRange.h
+ qpid/ha/QueueGuard.cpp
+ qpid/ha/QueueGuard.h
qpid/ha/QueueReplicator.cpp
qpid/ha/QueueReplicator.h
+ qpid/ha/RemoteBackup.cpp
+ qpid/ha/RemoteBackup.h
qpid/ha/ReplicatingSubscription.cpp
qpid/ha/ReplicatingSubscription.h
+ qpid/ha/ReplicationTest.cpp
+ qpid/ha/ReplicationTest.h
+ qpid/ha/Role.h
qpid/ha/Settings.h
- qpid/ha/StatusCheck.cpp
- qpid/ha/StatusCheck.h
+ qpid/ha/StandAlone.h
+ qpid/ha/StatusCheck.cpp
+ qpid/ha/StatusCheck.h
qpid/ha/types.cpp
qpid/ha/types.h
- qpid/ha/RemoteBackup.cpp
- qpid/ha/RemoteBackup.h
- qpid/ha/Role.h
- qpid/ha/StandAlone.h
)
add_library (ha MODULE ${ha_SOURCES})
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 763932b2a2..aa3a572286 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -33,11 +33,14 @@ ha_la_SOURCES = \
qpid/ha/BrokerReplicator.h \
qpid/ha/ConnectionObserver.cpp \
qpid/ha/ConnectionObserver.h \
- qpid/ha/FailoverExchange.cpp \
- qpid/ha/FailoverExchange.h \
+ qpid/ha/FailoverExchange.cpp \
+ qpid/ha/FailoverExchange.h \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
+ qpid/ha/hash.h \
+ qpid/ha/IdSetter.h \
+ qpid/ha/QueueSnapshot.h \
qpid/ha/makeMessage.cpp \
qpid/ha/makeMessage.h \
qpid/ha/Membership.cpp \
@@ -46,20 +49,21 @@ ha_la_SOURCES = \
qpid/ha/Primary.h \
qpid/ha/QueueGuard.cpp \
qpid/ha/QueueGuard.h \
- qpid/ha/QueueRange.h \
qpid/ha/QueueReplicator.cpp \
qpid/ha/QueueReplicator.h \
+ qpid/ha/QueueSnapshot.h \
+ qpid/ha/QueueSnapshots.h \
+ qpid/ha/RemoteBackup.cpp \
+ qpid/ha/RemoteBackup.h \
qpid/ha/ReplicatingSubscription.cpp \
qpid/ha/ReplicatingSubscription.h \
qpid/ha/ReplicationTest.cpp \
qpid/ha/ReplicationTest.h \
+ qpid/ha/Role.h \
qpid/ha/Settings.h \
+ qpid/ha/StandAlone.h \
qpid/ha/StatusCheck.cpp \
qpid/ha/StatusCheck.h \
- qpid/ha/RemoteBackup.cpp \
- qpid/ha/RemoteBackup.h \
- qpid/ha/Role.h \
- qpid/ha/StandAlone.h \
qpid/ha/types.cpp \
qpid/ha/types.h
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index f547ee54c9..14a2e94571 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -35,6 +35,7 @@ using std::string;
DeliveryRecord::DeliveryRecord(const QueueCursor& _msg,
framing::SequenceNumber _msgId,
+ framing::SequenceNumber _replicationId,
const Queue::shared_ptr& _queue,
const std::string& _tag,
const boost::shared_ptr<Consumer>& _consumer,
@@ -52,7 +53,8 @@ DeliveryRecord::DeliveryRecord(const QueueCursor& _msg,
ended(accepted && acquired),
windowing(_windowing),
credit(_credit),
- msgId(_msgId)
+ msgId(_msgId),
+ replicationId(_replicationId)
{}
bool DeliveryRecord::setEnded()
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 10436f3fa0..37ce8dc709 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -68,9 +68,12 @@ class DeliveryRecord
*/
uint32_t credit;
framing::SequenceNumber msgId;
+ framing::SequenceNumber replicationId;
public:
- QPID_BROKER_EXTERN DeliveryRecord(const QueueCursor& msgCursor, framing::SequenceNumber msgId,
+ QPID_BROKER_EXTERN DeliveryRecord(const QueueCursor& msgCursor,
+ framing::SequenceNumber msgId,
+ framing::SequenceNumber replicationId,
const boost::shared_ptr<Queue>& queue,
const std::string& tag,
const boost::shared_ptr<Consumer>& consumer,
@@ -111,6 +114,7 @@ class DeliveryRecord
const QueueCursor& getMessage() const { return msg; }
framing::SequenceNumber getId() const { return id; }
framing::SequenceNumber getMessageId() const { return msgId; }
+ framing::SequenceNumber getReplicationId() const { return replicationId; }
boost::shared_ptr<Queue> getQueue() const { return queue; }
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
diff --git a/qpid/cpp/src/qpid/broker/Lvq.cpp b/qpid/cpp/src/qpid/broker/Lvq.cpp
index d71c64d2fb..89a47bb14e 100644
--- a/qpid/cpp/src/qpid/broker/Lvq.cpp
+++ b/qpid/cpp/src/qpid/broker/Lvq.cpp
@@ -38,6 +38,7 @@ void Lvq::push(Message& message, bool isRecovery)
{
qpid::sys::Mutex::ScopedLock locker(messageLock);
message.setSequence(++sequence);
+ interceptors.publish(message);
removed = messageMap.update(message, old);
listeners.populate(copy);
observeEnqueue(message, locker);
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 1b3f3bd827..c14f534826 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -42,12 +42,18 @@ using std::string;
namespace qpid {
namespace broker {
-Message::Message() : deliveryCount(-1), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false) {}
+Message::Message() : deliveryCount(-1), publisher(0), expiration(FAR_FUTURE), timestamp(0),
+ isManagementMessage(false), replicationId(0)
+{}
+
Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
- : encoding(e), persistentContext(p), deliveryCount(-1), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false)
+ : encoding(e), persistentContext(p), deliveryCount(-1), publisher(0),
+ expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false),
+ replicationId(0)
{
if (persistentContext) persistentContext->setIngressCompletion(e);
}
+
Message::~Message() {}
@@ -308,4 +314,9 @@ void Message::processProperties(MapHandler& handler) const
encoding->processProperties(handler);
}
+uint64_t Message::getReplicationId() const { return replicationId; }
+
+void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; }
+
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 64103ecac9..85926f65fb 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -131,6 +131,9 @@ public:
QPID_BROKER_EXTERN boost::intrusive_ptr<AsyncCompletion> getIngressCompletion() const;
QPID_BROKER_EXTERN boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
+ QPID_BROKER_EXTERN uint64_t getReplicationId() const;
+ QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id);
+
private:
boost::intrusive_ptr<Encoding> encoding;
boost::intrusive_ptr<PersistableMessage> persistentContext;
@@ -143,6 +146,7 @@ public:
bool isManagementMessage;
MessageState state;
qpid::framing::SequenceNumber sequence;
+ framing::SequenceNumber replicationId;
void annotationsChanged();
};
diff --git a/qpid/cpp/src/qpid/broker/MessageInterceptor.h b/qpid/cpp/src/qpid/broker/MessageInterceptor.h
index 0d0bc93f06..3ada86b6f7 100644
--- a/qpid/cpp/src/qpid/broker/MessageInterceptor.h
+++ b/qpid/cpp/src/qpid/broker/MessageInterceptor.h
@@ -37,12 +37,17 @@ class MessageInterceptor
public:
virtual ~MessageInterceptor() {}
+ /** Modify a message before it is recorded in durable store */
+ virtual void record(Message&) {}
/** Modify a message as it is being published onto the queue. */
- virtual void publish(Message&) = 0;
+ virtual void publish(Message&) {}
};
class MessageInterceptors : public Observers<MessageInterceptor> {
public:
+ void record(Message& m) {
+ each(boost::bind(&MessageInterceptor::record, _1, boost::ref(m)));
+ }
void publish(Message& m) {
each(boost::bind(&MessageInterceptor::publish, _1, boost::ref(m)));
}
diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h
index c62f75d6d0..d50c21e559 100644
--- a/qpid/cpp/src/qpid/broker/Observers.h
+++ b/qpid/cpp/src/qpid/broker/Observers.h
@@ -48,12 +48,6 @@ class Observers
observers.erase(i);
}
- protected:
- typedef std::vector<boost::shared_ptr<Observer> > List;
-
- sys::Mutex lock;
- List observers;
-
template <class F> void each(F f) {
List copy;
{
@@ -62,6 +56,12 @@ class Observers
}
std::for_each(copy.begin(), copy.end(), f);
}
+
+ protected:
+ typedef std::vector<boost::shared_ptr<Observer> > List;
+
+ sys::Mutex lock;
+ List observers;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index e068ce6fe4..3b68d4117b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -285,9 +285,9 @@ void Queue::deliverTo(Message msg, TxBuffer* txn)
} else {
if (enqueue(0, msg)) {
push(msg);
- QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
+ QPID_LOG(debug, "Message " << msg.getSequence() << " enqueued on " << name);
} else {
- QPID_LOG(debug, "Message " << msg << " dropped from " << name);
+ QPID_LOG(debug, "Message " << msg.getSequence() << " dropped from " << name);
}
}
}
@@ -415,7 +415,8 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
if (c->filter(*msg)) {
if (c->accept(*msg)) {
if (c->preAcquires()) {
- QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState());
+ QPID_LOG(debug, "Attempting to acquire message " << msg->getSequence()
+ << " from '" << name << "' with state " << msg->getState());
if (allocator->acquire(c->getName(), *msg)) {
if (mgmtObject) {
mgmtObject->inc_acquires();
@@ -825,6 +826,7 @@ void Queue::setLastNodeFailure()
*/
bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
+ interceptors.record(msg);
ScopedUse u(barrier);
if (!u.acquired) return false;
diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h
index 2ba98f6945..6d29f83721 100644
--- a/qpid/cpp/src/qpid/broker/QueueObserver.h
+++ b/qpid/cpp/src/qpid/broker/QueueObserver.h
@@ -70,7 +70,6 @@ class QueueObserver
virtual void consumerAdded( const Consumer& ) {};
virtual void consumerRemoved( const Consumer& ) {};
virtual void destroy() {};
- private:
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 1c74ff7325..a42ed883a9 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -360,7 +360,7 @@ bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message
{
allocateCredit(msg);
boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
- DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
+ DeliveryRecord record(cursor, msg.getSequence(), msg.getReplicationId(), queue, getTag(),
consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 6d64bf2c82..e28ca1fa6a 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -55,13 +55,7 @@ Backup::Backup(HaBroker& hb, const Settings& s) :
statusCheck(
new StatusCheck(
logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
-{
- // Set link properties to tag outgoing links.
- framing::FieldTable linkProperties = broker.getLinkClientProperties();
- linkProperties.setTable(
- ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable());
- broker.setLinkClientProperties(linkProperties);
-}
+{}
void Backup::setBrokerUrl(const Url& brokers) {
if (brokers.empty()) return;
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 82e8faf2c6..80e023c11c 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -92,7 +92,7 @@ void BrokerInfo::assign(const Variant::Map& m) {
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
- o << b.getSystemId().str().substr(0,7);
+ o << b.getSystemId().str().substr(0,8);
if (b.getAddress() != empty) o << "@" << b.getAddress();
o << "(" << printable(b.getStatus()) << ")";
return o;
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index c690ea86f7..c9324ce0ca 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -27,6 +27,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/types/Uuid.h"
#include "qpid/types/Variant.h"
+#include "qpid/sys/unordered_map.h"
#include <string>
#include <iosfwd>
#include <vector>
@@ -41,7 +42,7 @@ class BrokerInfo
{
public:
typedef std::set<BrokerInfo> Set;
- typedef std::map<types::Uuid, BrokerInfo> Map;
+ typedef qpid::sys::unordered_map<types::Uuid, BrokerInfo, types::Uuid::Hasher> Map;
BrokerInfo();
BrokerInfo(const types::Uuid& id, BrokerStatus, const Address& = Address());
@@ -50,9 +51,9 @@ class BrokerInfo
types::Uuid getSystemId() const { return systemId; }
BrokerStatus getStatus() const { return status; }
- Address getAddress() const { return address; }
-
void setStatus(BrokerStatus s) { status = s; }
+ Address getAddress() const { return address; }
+ void setAddress(const Address& a) { address = a; }
framing::FieldTable asFieldTable() const;
types::Variant::Map asMap() const;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index fa8f330f27..81b0cd6413 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -124,6 +124,8 @@ const string BROKER("broker");
const string MEMBERS("members");
const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+const string COLON(":");
+
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
{
@@ -282,6 +284,14 @@ class BrokerReplicator::UpdateTracker {
ReplicationTest repTest;
};
+namespace {
+template <class EventType> std::string key() {
+ pair<string,string> name = EventType::getFullName();
+ return name.first + COLON + name.second;
+}
+}
+
+
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix("Backup: "), replicationTest(NONE),
@@ -298,14 +308,14 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
- dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
- dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
- dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare;
- dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete;
- dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind;
- dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind;
- dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate;
- dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe;
+ dispatch[key<EventQueueDeclare>()] = &BrokerReplicator::doEventQueueDeclare;
+ dispatch[key<EventQueueDelete>()] = &BrokerReplicator::doEventQueueDelete;
+ dispatch[key<EventExchangeDeclare>()] = &BrokerReplicator::doEventExchangeDeclare;
+ dispatch[key<EventExchangeDelete>()] = &BrokerReplicator::doEventExchangeDelete;
+ dispatch[key<EventBind>()] = &BrokerReplicator::doEventBind;
+ dispatch[key<EventUnbind>()] = &BrokerReplicator::doEventUnbind;
+ dispatch[key<EventMembersUpdate>()] = &BrokerReplicator::doEventMembersUpdate;
+ dispatch[key<EventSubscribe>()] = &BrokerReplicator::doEventSubscribe;
}
void BrokerReplicator::initialize() {
@@ -402,7 +412,7 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
//subscribe to the queue
FieldTable arguments;
- arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
peer.getMessage().subscribe(
queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
false/*exclusive*/, "", 0, arguments);
@@ -439,7 +449,9 @@ void BrokerReplicator::route(Deliverable& msg) {
QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+ std::string key = (schema[PACKAGE_NAME].asString() +
+ COLON +
+ schema[CLASS_NAME].asString());
EventDispatchMap::iterator j = dispatch.find(key);
if (j != dispatch.end()) (this->*(j->second))(values);
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 03df5fec6b..f93e25cb81 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -29,6 +29,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
#include "qpid/management/ManagementObject.h"
+#include "qpid/sys/unordered_map.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <set>
@@ -88,11 +89,10 @@ class BrokerReplicator : public broker::Exchange,
typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
- typedef std::pair<std::string,std::string> EventKey;
typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
- typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+ typedef qpid::sys::unordered_map<std::string, DispatchFunction> EventDispatchMap;
- typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+ typedef qpid::sys::unordered_map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
class UpdateTracker;
class ErrorListener;
@@ -152,7 +152,6 @@ class BrokerReplicator : public broker::Exchange,
bool initialized;
AlternateExchangeSetter alternates;
qpid::Address primary;
- typedef std::set<std::string> StringSet;
broker::Connection* connection;
EventDispatchMap dispatch;
std::auto_ptr<UpdateTracker> queueTracker;
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 76be46a92b..66e841e988 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -65,6 +65,11 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
return observer;
}
+void ConnectionObserver::reset() {
+ sys::Mutex::ScopedLock l(lock);
+ observer.reset();
+}
+
bool ConnectionObserver::isSelf(const broker::Connection& connection) {
BrokerInfo info;
return getBrokerInfo(connection, info) && info.getSystemId() == self;
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
index 57ff5b1086..079dc43be6 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker::ConnectionObserver
void setObserver(const ObserverPtr&, const std::string& logPrefix);
ObserverPtr getObserver();
+ void reset();
+
void opened(broker::Connection& connection);
void closed(broker::Connection& connection);
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 3f29949aca..7efeaad5b2 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -27,7 +27,10 @@
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "StandAlone.h"
+#include "QueueSnapshot.h"
+#include "QueueSnapshots.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/assert.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -65,7 +68,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
membership(BrokerInfo(systemId, STANDALONE), *this),
- failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
+ failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)),
+ queueSnapshots(shared_ptr<QueueSnapshots>(new QueueSnapshots))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
@@ -77,6 +81,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
+ // QueueSnapshots are needed for standalone replication as well as cluster.
+ broker.getConfigurationObservers().add(queueSnapshots);
}
namespace {
@@ -86,8 +92,10 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; }
// Called in Plugin::initialize
void HaBroker::initialize() {
- if (settings.cluster) membership.setStatus(JOINING);
- QPID_LOG(notice, "Initializing: " << membership.getInfo());
+ if (settings.cluster) {
+ membership.setStatus(JOINING);
+ QPID_LOG(notice, "Initializing HA broker: " << membership.getInfo());
+ }
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -103,7 +111,7 @@ void HaBroker::initialize() {
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
shared_ptr<ReplicatingSubscription::Factory>(
- new ReplicatingSubscription::Factory()));
+ new ReplicatingSubscription::Factory(*this)));
// If we are in a cluster, start as backup in joining state.
if (settings.cluster) {
@@ -205,9 +213,12 @@ BrokerStatus HaBroker::getStatus() const {
void HaBroker::setAddress(const Address& a) {
QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
- BrokerInfo b(membership.getSelf(), membership.getStatus(), a);
- membership.add(b);
+ membership.setAddress(a);
}
+boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::string& queueName) {
+ return boost::dynamic_pointer_cast<QueueReplicator>(
+ broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
+}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index a214d2acd3..8e5d30acfb 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -54,6 +54,10 @@ class Backup;
class ConnectionObserver;
class Primary;
class Role;
+class QueueSnapshot;
+class QueueSnapshots;
+class QueueReplicator;
+
/**
* HA state and actions associated with a HA broker. Holds all the management info.
*
@@ -93,8 +97,11 @@ class HaBroker : public management::Manageable
void setAddress(const Address&); // set self address from a self-connection
- private:
+ boost::shared_ptr<QueueSnapshots> getQueueSnapshots() { return queueSnapshots; }
+ boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
+
+ private:
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -117,6 +124,7 @@ class HaBroker : public management::Manageable
boost::shared_ptr<Role> role;
Membership membership;
boost::shared_ptr<FailoverExchange> failoverExchange;
+ boost::shared_ptr<QueueSnapshots> queueSnapshots;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h
new file mode 100644
index 0000000000..a56dbc810a
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/IdSetter.h
@@ -0,0 +1,62 @@
+#ifndef QPID_HA_IDSETTER_H
+#define QPID_HA_IDSETTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageInterceptor.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicValue.h"
+
+
+namespace qpid {
+namespace ha {
+
+/**
+ * A MessageInterceptor that sets the ReplicationId on each message as it is
+ * enqueued on a primary queue.
+ *
+ * THREAD UNSAFE: Called sequentially under the queue lock.
+ */
+class IdSetter : public broker::MessageInterceptor
+{
+ public:
+ IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) {
+ QPID_LOG(trace, "Initial replication ID for " << name << " is " << nextId.get());
+ }
+
+ void record(broker::Message& m) {
+ m.setReplicationId(nextId++);
+ QPID_LOG(trace, "Recorded replication ID " << m.getReplicationId() << " on " << name);
+ }
+
+ private:
+ sys::AtomicValue<uint32_t> nextId;
+ std::string name;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_IDSETTER_H*/
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 6c64d86fd7..6af9b6e6d8 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -18,9 +18,11 @@
* under the License.
*
*/
-#include "Membership.h"
+#include "ConnectionObserver.h"
#include "HaBroker.h"
+#include "Membership.h"
#include "qpid/broker/Broker.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
@@ -109,11 +111,25 @@ bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
- Variant::List brokers = asList();
+ // Update management and send update event.
+ Variant::List brokerList = asList();
if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
- if (mgmtObject) mgmtObject->set_members(brokers);
+ if (mgmtObject) mgmtObject->set_members(brokerList);
haBroker.getBroker().getManagementAgent()->raiseEvent(
- _qmf::EventMembersUpdate(brokers));
+ _qmf::EventMembersUpdate(brokerList));
+
+ // Update link client properties
+ framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties();
+ if (isBackup(getStatus(l))) {
+ // Set backup tag on outgoing link properties.
+ linkProperties.setTable(
+ ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable());
+ haBroker.getBroker().setLinkClientProperties(linkProperties);
+ } else {
+ // Remove backup tag property from outgoing link properties.
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ haBroker.getBroker().setLinkClientProperties(linkProperties);
+ }
}
void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
@@ -178,5 +194,10 @@ BrokerInfo Membership::getInfo() const {
return i->second;
}
-// FIXME aconway 2013-01-23: move to .h?
+void Membership::setAddress(const Address& a) { // Set the address on this broker's own entry in brokers.
+ Mutex::ScopedLock l(lock);
+ brokers[self].setAddress(a);
+ update(l); // Propagate the change (management object, event, link properties) while holding the lock.
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 956569fbd8..7069e79b7f 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -78,6 +78,7 @@ class Membership
BrokerInfo getInfo() const;
BrokerStatus getStatus() const;
void setStatus(BrokerStatus s);
+ void setAddress(const Address&);
private:
void update(sys::Mutex::ScopedLock&);
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 61cdced1ba..30a0ac5da9 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -22,9 +22,11 @@
#include "HaBroker.h"
#include "Primary.h"
#include "ReplicationTest.h"
+#include "IdSetter.h"
#include "ReplicatingSubscription.h"
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
+#include "QueueReplicator.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/ConfigurationObserver.h"
@@ -41,6 +43,7 @@ namespace qpid {
namespace ha {
using sys::Mutex;
+using boost::shared_ptr;
using namespace std;
using namespace framing;
@@ -87,6 +90,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
replicationTest(hb.getSettings().replicateDefault.get())
{
hb.getMembership().setStatus(RECOVERING);
+ broker::QueueRegistry& queues = hb.getBroker().getQueues();
+ queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
@@ -101,25 +106,16 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
- backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
+ setCatchupQueues(backup, true); // Create guards
}
// Set timeout for expected brokers to connect and become ready.
sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout);
timerTask = new ExpectedBackupTimerTask(*this, deadline);
hb.getBroker().getTimer().add(timerTask);
}
-
-
- // Remove backup tag property from outgoing link properties.
- framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
- linkProperties.erase(ConnectionObserver::BACKUP_TAG);
- hb.getBroker().setLinkClientProperties(linkProperties);
-
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
-
- Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
- checkReady(l);
+ checkReady(); // Outside lock
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
@@ -129,29 +125,48 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
Primary::~Primary() {
if (timerTask) timerTask->cancel();
haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
+ haBroker.getObserver()->reset();
+}
+
+void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) { // Install an IdSetter on q if it is replicated at level ALL.
+ if (replicationTest.useLevel(*q) == ALL) {
+ boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName());
+ ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1); // Continue after the replicator's max ID if one was found, else start at 1.
+ q->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId)));
+ }
}
-void Primary::checkReady(Mutex::ScopedLock&) {
- if (!active && expectedBackups.empty()) {
- active = true;
- Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
+void Primary::checkReady() {
+ bool activate = false;
+ {
+ Mutex::ScopedLock l(lock);
+ if (!active && expectedBackups.empty())
+ activate = active = true;
+ }
+ if (activate) {
QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
- membership.setStatus(ACTIVE);
+ membership.setStatus(ACTIVE); // Outside of lock.
}
}
-void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
- if (i != backups.end() && i->second->reportReady()) {
- BrokerInfo info = i->second->getBrokerInfo();
- info.setStatus(READY);
- membership.add(info);
- if (expectedBackups.erase(i->second)) {
- QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
- checkReady(l);
- }
+void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
+ bool ready = false;
+ {
+ Mutex::ScopedLock l(lock);
+ if (backup->reportReady()) {
+ BrokerInfo info = backup->getBrokerInfo();
+ info.setStatus(READY);
+ membership.add(info);
+ if (expectedBackups.erase(backup)) {
+ QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+ ready = true;
+ }
else
QPID_LOG(info, logPrefix << "New backup is ready: " << info);
+ }
}
+ if (ready) checkReady(); // Outside lock
}
void Primary::timeoutExpectedBackups() {
@@ -162,35 +177,39 @@ void Primary::timeoutExpectedBackups() {
// Allow backups that are connected to continue becoming ready.
for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
{
- boost::shared_ptr<RemoteBackup> rb = *i;
- if (!rb->isConnected()) {
- BrokerInfo info = rb->getBrokerInfo();
+ // This loop erases elements of backups in backupDisconnect, so
+ // save and increment the iterator.
+ BackupSet::iterator j = i++;
+ boost::shared_ptr<RemoteBackup> backup = *j;
+ if (!backup->getConnection()) {
+ BrokerInfo info = backup->getBrokerInfo();
QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
- expectedBackups.erase(i++);
- backups.erase(info.getSystemId());
- rb->cancel();
- // Downgrade the broker's status to CATCHUP
+ backupDisconnect(backup, l); // Calls erase(j)
+ // Keep broker in membership but downgrade status to CATCHUP.
// The broker will get this status change when it eventually connects.
info.setStatus(CATCHUP);
membership.add(info);
}
- else ++i;
}
- checkReady(l);
}
catch(const std::exception& e) {
QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what());
// No-where for this exception to go.
}
+ checkReady();
}
void Primary::readyReplica(const ReplicatingSubscription& rs) {
- sys::Mutex::ScopedLock l(lock);
- BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
- if (i != backups.end()) {
- i->second->ready(rs.getQueue());
- checkReady(i, l);
+ shared_ptr<RemoteBackup> backup;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
+ if (i != backups.end()) {
+ backup = i->second;
+ backup->ready(rs.getQueue());
+ }
}
+ if (backup) checkReady(backup);
}
// NOTE: Called with queue registry lock held.
@@ -201,23 +220,28 @@ void Primary::queueCreate(const QueuePtr& q) {
<< " replication: " << printable(level));
q->addArgument(QPID_REPLICATE, printable(level).str());
if (level) {
- // Give each queue a unique id to avoid confusion of same-named queues.
+ initializeQueue(q);
+ // Give each queue a unique id. Used by backups to avoid confusion of
+ // same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
- Mutex::ScopedLock l(lock);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
- i->second->queueCreate(q);
- checkReady(i, l);
+ {
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
+ i->second->queueCreate(q);
}
+ checkReady(); // Outside lock
}
}
// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
- QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
- Mutex::ScopedLock l(lock);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
- i->second->queueDestroy(q);
- checkReady(l);
+ QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
+ {
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
+ i->second->queueDestroy(q);
+ }
+ checkReady(); // Outside lock
}
// NOTE: Called with exchange registry lock held.
@@ -240,56 +264,88 @@ void Primary::exchangeDestroy(const ExchangePtr& ex) {
// Do nothing
}
+// New backup connected: create its RemoteBackup, store it in backups under the broker's system ID and return it.
+shared_ptr<RemoteBackup> Primary::backupConnect(
+ const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
+{
+ shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+ backups[info.getSystemId()] = backup; // Overwrites any existing entry for the same system ID.
+ return backup;
+}
+
+// Remove a backup. Caller should not release its shared pointer to the backup
+// until outside the lock.
+void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) {
+ types::Uuid id = backup->getBrokerInfo().getSystemId();
+ backup->cancel();
+ expectedBackups.erase(backup); // No-op if the backup was not an expected one.
+ backups.erase(id);
+}
+
+
void Primary::opened(broker::Connection& connection) {
BrokerInfo info;
+ shared_ptr<RemoteBackup> backup;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
- QPID_LOG(info, logPrefix << "New backup connected: " << info);
- boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
- {
- // Avoid deadlock with queue registry lock.
- Mutex::ScopedUnlock u(lock);
- backup->setCatchupQueues(haBroker.getBroker().getQueues(), false);
- }
- backups[info.getSystemId()] = backup;
- i = backups.find(info.getSystemId());
+ QPID_LOG(info, logPrefix << "New backup connection: " << info);
+ backup = backupConnect(info, connection, l);
}
- else {
- QPID_LOG(info, logPrefix << "Known backup connected: " << info);
+ else if (i->second->getConnection()) {
+ // The backup is failing over before we received the closed() call
+ // for its previous connection. Remove the old entry and create a new one.
+ QPID_LOG(error, logPrefix << "Known backup reconnect before disconnection: " << info);
+ backupDisconnect(i->second, l);
+ backup = backupConnect(info, connection, l);
+ } else {
+ QPID_LOG(info, logPrefix << "Known backup reconnection: " << info);
i->second->setConnection(&connection);
}
if (info.getStatus() == JOINING) {
info.setStatus(CATCHUP);
membership.add(info);
}
- if (i != backups.end()) checkReady(i, l);
}
else
- QPID_LOG(debug, logPrefix << "Accepted client connection "
- << connection.getMgmtId());
+ QPID_LOG(debug, logPrefix << "Accepted client connection " << connection.getMgmtId());
+
+ // Outside lock
+ if (backup) {
+ setCatchupQueues(backup, false);
+ checkReady(backup);
+ }
+ checkReady();
}
void Primary::closed(broker::Connection& connection) {
BrokerInfo info;
+ shared_ptr<RemoteBackup> backup;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
// NOTE: It is possible for a backup connection to be rejected while we
// are a backup, but closed() is called after we have become primary.
// Comparing getConnection() with the closing connection lets us ignore such spurious closes.
- if (i != backups.end() && i->second->isConnected()) {
- QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
- membership.remove(info.getSystemId());
- expectedBackups.erase(i->second);
- backups.erase(i);
- checkReady(l);
+ if (i == backups.end()) {
+ QPID_LOG(info, "Disconnect from unknown backup " << info);
+ }
+ else if (i->second->getConnection() != &connection) {
+ QPID_LOG(info, logPrefix << "Late disconnect from backup " << info);
+ }
+ else {
+ QPID_LOG(info, logPrefix << "Disconnect from "
+ << (i->second->getConnection() ? "" : "disconnected ")
+ << "backup " << info);
+ // Assign to shared_ptr so it will be deleted after we release the lock.
+ backup = i->second;
+ backupDisconnect(backup, l);
}
}
+ checkReady();
}
-
boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
{
Mutex::ScopedLock l(lock);
@@ -302,4 +358,11 @@ Role* Primary::promote() {
return 0;
}
+void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) { // Register every queue with the backup, then start its catch-up.
+ // Do queue iteration outside the lock to avoid deadlocks with QueueRegistry.
+ haBroker.getBroker().getQueues().eachQueue(
+ boost::bind(&RemoteBackup::catchupQueue, backup, _1, createGuards)); // Invokes backup->catchupQueue(queue, createGuards) per queue.
+ backup->startCatchup();
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index ff85837882..3e1ec48ce6 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -27,9 +27,9 @@
#include "ReplicationTest.h"
#include "Role.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/unordered_map.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
-#include <map>
#include <string>
namespace qpid {
@@ -64,6 +64,7 @@ class Primary : public Role
public:
typedef boost::shared_ptr<broker::Queue> QueuePtr;
typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
+ typedef boost::shared_ptr<RemoteBackup> RemoteBackupPtr;
static Primary* get() { return instance; }
@@ -94,11 +95,18 @@ class Primary : public Role
void timeoutExpectedBackups();
private:
- typedef std::map<types::Uuid, boost::shared_ptr<RemoteBackup> > BackupMap;
- typedef std::set<boost::shared_ptr<RemoteBackup> > BackupSet;
+ typedef qpid::sys::unordered_map<
+ types::Uuid, RemoteBackupPtr, types::Uuid::Hasher > BackupMap;
- void checkReady(sys::Mutex::ScopedLock&);
- void checkReady(BackupMap::iterator, sys::Mutex::ScopedLock&);
+ typedef std::set<RemoteBackupPtr > BackupSet;
+
+ RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
+ void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
+
+ void initializeQueue(boost::shared_ptr<broker::Queue>);
+ void checkReady();
+ void checkReady(RemoteBackupPtr);
+ void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
sys::Mutex lock;
HaBroker& haBroker;
@@ -120,7 +128,6 @@ class Primary : public Role
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
-
static Primary* instance;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index d06d88ca29..ac9df05937 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -19,7 +19,7 @@
*
*/
#include "QueueGuard.h"
-#include "ReplicatingSubscription.h"
+#include "BrokerInfo.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/QueueObserver.h"
@@ -32,8 +32,6 @@ namespace ha {
using namespace broker;
using sys::Mutex;
-using framing::SequenceNumber;
-using framing::SequenceSet;
class QueueGuard::QueueObserver : public broker::QueueObserver
{
@@ -50,15 +48,19 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : cancelled(false), queue(q), subscription(0)
+ : cancelled(false), queue(q)
{
std::ostringstream os;
- os << "Primary guard " << queue.getName() << "@" << info << ": ";
+ os << "Guard of " << queue.getName() << " at " << info << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
- // Set range after addObserver so we know that range.back+1 is a guarded position.
- range = QueueRange(q);
+ // Set first after calling addObserver so we know that the back of the
+ // queue+1 is (or will be) a guarded position.
+ QueuePosition front, back;
+ q.getRange(front, back, broker::REPLICATOR);
+ first = back + 1;
+ QPID_LOG(debug, logPrefix << "First guarded position " << first);
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -66,20 +68,20 @@ QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
void QueueGuard::enqueued(const Message& m) {
// Delay completion
- QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+ ReplicationId id = m.getReplicationId();
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << LogMessageId(queue, m));
Mutex::ScopedLock l(lock);
if (cancelled) return; // Don't record enqueues after we are cancelled.
- assert(delayed.find(m.getSequence()) == delayed.end());
- delayed[m.getSequence()] = m.getIngressCompletion();
+ delayed[id] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
}
// NOTE: Called with message lock held.
void QueueGuard::dequeued(const Message& m) {
- QPID_LOG(trace, logPrefix << "Dequeued " << m);
+ ReplicationId id = m.getReplicationId();
+ QPID_LOG(trace, logPrefix << "Dequeued " << LogMessageId(queue, m));
Mutex::ScopedLock l(lock);
- if (subscription) subscription->dequeued(m);
- complete(m.getSequence(), l);
+ complete(id, l);
}
void QueueGuard::cancel() {
@@ -87,48 +89,31 @@ void QueueGuard::cancel() {
Mutex::ScopedLock l(lock);
if (cancelled) return;
cancelled = true;
- for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
- complete(i, l);
- delayed.erase(i++);
- }
+ while (!delayed.empty()) complete(delayed.begin(), l);
}
-void QueueGuard::attach(ReplicatingSubscription& rs) {
+bool QueueGuard::complete(ReplicationId id) {
Mutex::ScopedLock l(lock);
- subscription = &rs;
+ return complete(id, l);
}
-bool QueueGuard::subscriptionStart(SequenceNumber position) {
- // Complete any messages before or at the ReplicatingSubscription start position.
- // Those messages are already on the backup.
- Mutex::ScopedLock l(lock);
- Delayed::iterator i = delayed.begin();
- while(i != delayed.end() && i->first <= position) {
- complete(i, l);
- delayed.erase(i++);
- }
- return position >= range.back;
-}
-
-void QueueGuard::complete(SequenceNumber sequence) {
- Mutex::ScopedLock l(lock);
- complete(sequence, l);
-}
-
-void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+bool QueueGuard::complete(ReplicationId id, Mutex::ScopedLock& l) {
// The same message can be completed twice, by
// ReplicatingSubscription::acknowledged and dequeued. Remove it
// from the map so we only call finishCompleter() once
- Delayed::iterator i = delayed.find(sequence);
+ Delayed::iterator i = delayed.find(id);
if (i != delayed.end()) {
complete(i, l);
- delayed.erase(i);
+ return true;
}
+ return false;
}
void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
QPID_LOG(trace, logPrefix << "Completed " << i->first);
i->second->finishCompleter();
+ delayed.erase(i);
}
+
}} // namespaces qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index e7ceb351e8..e41a92c74f 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -23,12 +23,12 @@
*/
#include "types.h"
-#include "QueueRange.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/SequenceSet.h"
+#include "hash.h"
#include "qpid/types/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/unordered_map.h"
#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <deque>
#include <set>
@@ -37,6 +37,7 @@ namespace broker {
class Queue;
struct QueuedMessage;
class Message;
+class AsyncCompletion;
}
namespace ha {
@@ -53,10 +54,8 @@ class ReplicatingSubscription;
*
* THREAD SAFE: Concurrent calls:
* - enqueued() via QueueObserver in arbitrary connection threads.
- * - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
+ * - cancel(), complete() from ReplicatingSubscription in subscription thread.
*
- * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
- * QueueGuard MAY call ReplicatingSubscription with it's lock held.
*/
class QueueGuard {
public:
@@ -73,54 +72,35 @@ class QueueGuard {
*/
void dequeued(const broker::Message&);
- /** Complete a delayed message. */
- void complete(framing::SequenceNumber);
+ /** Complete a delayed message.
+ *@return true if the ID was delayed
+ */
+ bool complete(ReplicationId);
/** Complete all delayed messages. */
void cancel();
- void attach(ReplicatingSubscription&);
-
- /**
- * Return the un-guarded queue range at the time the QueueGuard was created.
- *
- * The first position guaranteed to be protected by the guard is
- * getRange().getBack()+1. It is possible that the guard has protected some
- * messages before that point. Any such messages are dealt with in subscriptionStart
- *
- * The QueueGuard is created in 3 situations
- * - when a backup is promoted, guards are created for expected backups.
- * - when a new queue is created on the primary
- * - when a new backup joins.
- *
- * In the last situation the queue is active while the guard is being
- * created.
- *
- */
- const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
-
- /** Inform the guard of the stating position for the attached subscription.
- * Complete messages that will not be seen by the subscription.
- *@return true if the subscription has already advanced to a guarded position.
+ /** Return the first known guarded position on the queue. It is possible
+ * that the guard has seen a few messages before this point.
*/
- bool subscriptionStart(framing::SequenceNumber position);
+ QueuePosition getFirst() const { return first; } // Thread safe: Immutable.
private:
class QueueObserver;
- typedef std::map<framing::SequenceNumber,
- boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+ typedef qpid::sys::unordered_map<ReplicationId,
+ boost::intrusive_ptr<broker::AsyncCompletion>,
+ TrivialHasher<ReplicationId> > Delayed;
- void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &);
+ bool complete(ReplicationId, sys::Mutex::ScopedLock &);
void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
sys::Mutex lock;
+ QueuePosition first;
bool cancelled;
std::string logPrefix;
broker::Queue& queue;
Delayed delayed;
- ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
- QueueRange range;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h
deleted file mode 100644
index f67ac146e6..0000000000
--- a/qpid/cpp/src/qpid/ha/QueueRange.h
+++ /dev/null
@@ -1,78 +0,0 @@
-#ifndef QPID_HA_QUEUERANGE_H
-#define QPID_HA_QUEUERANGE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ReplicatingSubscription.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueCursor.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/SequenceNumber.h"
-#include <iostream>
-
-namespace qpid {
-namespace ha {
-
-/**
- * Get the front/back range of a queue or from a ReplicatingSubscription arguments table.
- *
- * The *back* of the queue is the position of the latest (most recently pushed)
- * message on the queue or, if the queue is empty, the back is n-1 where n is
- * the position that will be assigned to the next message pushed onto the queue.
- *
- * The *front* of the queue is the position of the oldest (next to be consumed) message
- * on the queue or, if the queue is empty, it is the position that will be occupied
- * by the next message pushed onto the queue.
- *
- * This leads to the slightly surprising conclusion that for an empty queue
- * front = back+1
- */
-struct QueueRange {
- public:
- framing::SequenceNumber front, back;
-
- QueueRange() : front(1), back(0) { } // Empty range.
-
- QueueRange(broker::Queue& q) { q.getRange(front, back, broker::REPLICATOR); }
-
- QueueRange(const framing::FieldTable& args) {
- back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
- front = back+1;
- if (args.isSet(ReplicatingSubscription::QPID_FRONT))
- front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
- if (back+1 < front)
- throw Exception(QPID_MSG("Invalid range [" << front << "," << back <<"]"));
- }
-
- bool empty() const { return front == back+1; }
-};
-
-
-inline std::ostream& operator<<(std::ostream& o, const QueueRange& qr) {
- if (qr.front > qr.back) return o << "[-," << qr.back << "]";
- else return o << "[" << qr.front << "," << qr.back << "]";
-}
-
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_QUEUERANGE_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 946831319c..4c3c209eab 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,8 +19,10 @@
*
*/
+#include "makeMessage.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
+#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "qpid/broker/Bridge.h"
@@ -31,10 +33,10 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
-#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/Msg.h"
+#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
namespace {
@@ -51,7 +53,7 @@ using namespace std;
using sys::Mutex;
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA+"position");
+const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
@@ -107,10 +109,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
- logPrefix("Backup queue "+q->getName()+": "),
+ logPrefix("Backup of "+q->getName()+": "),
queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
- settings(hb.getSettings()), destroyed(false)
+ settings(hb.getSettings()), destroyed(false),
+ nextId(0), maxId(0)
{
+ QPID_LOG(debug, logPrefix << "Created");
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -162,13 +166,12 @@ QueueReplicator::~QueueReplicator() {}
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
- QPID_LOG(debug, logPrefix << " destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (destroyed) return;
destroyed = true;
- QPID_LOG(debug, logPrefix << "Destroyed.");
+ QPID_LOG(debug, logPrefix << "Destroyed");
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
link.reset();
@@ -188,12 +191,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition());
- arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable());
- SequenceNumber front, back;
- queue->getRange(front, back, broker::REPLICATOR);
- if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
+ arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
+ arguments.setString(ReplicatingSubscription::QPID_ID_SET,
+ encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -222,51 +223,36 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
- if (destroyed) return;
- queue->dequeueMessageAt(n);
-}
-
-namespace {
-bool getSequence(const Message& message, SequenceNumber& result) {
- result = message.getSequence();
- return true;
-}
-bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) {
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1);
+void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) { // Dequeue the local messages whose replication IDs are in dequeues.
+ QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+ //TODO: should be able to optimise the following
+ for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+ PositionMap::iterator j = positions.find(*i); // Map the replication ID to the queue position recorded in route().
+ if (j != positions.end()) queue->dequeueMessageAt(j->second); // Unknown IDs are ignored. NOTE(review): the positions entry is not erased here; confirm it is removed elsewhere.
+ }
}
-} // namespace
// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& msg)
{
try {
- const std::string& key = msg.getMessage().getRoutingKey();
Mutex::ScopedLock l(lock);
if (destroyed) return;
- if (!isEventKey(key)) {
+ const std::string& key = msg.getMessage().getRoutingKey();
+ if (!isEventKey(key)) { // Replicated message
+ ReplicationId id = nextId++;
+ maxId = std::max(maxId, id);
+ msg.getMessage().setReplicationId(id);
msg.deliverTo(queue);
- // We are on a backup so the queue is not modified except via this.
- QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ QueuePosition position = queue->getPosition();
+ positions[id] = position;
+ QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
}
else if (key == DEQUEUE_EVENT_KEY) {
- SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
- //TODO: should be able to optimise the following
- for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
- dequeue(*i, l);
+ dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
}
- else if (key == POSITION_EVENT_KEY) {
- SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
- << " to " << position);
- // Verify that there are no messages after the new position in the queue.
- SequenceNumber next;
- if (getNext(*queue, position, next))
- throw Exception(QPID_MSG(logPrefix << "Invalid position " << position
- << " preceeds message at " << next));
- queue->setPosition(position);
+ else if (key == ID_EVENT_KEY) {
+ nextId = decodeContent<ReplicationId>(msg.getMessage());
}
// Ignore unknown event keys, may be introduced in later versions.
}
@@ -275,6 +261,11 @@ void QueueReplicator::route(Deliverable& msg)
}
}
+ReplicationId QueueReplicator::getMaxId() { // Highest replication ID that route() has assigned so far.
+ Mutex::ScopedLock l(lock); // maxId is updated by route() under this same lock.
+ return maxId;
+}
+
// Unused Exchange methods.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 7f0fa52480..811ddba256 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -23,8 +23,8 @@
*/
#include "BrokerInfo.h"
+#include "hash.h"
#include "qpid/broker/Exchange.h"
-#include "qpid/framing/SequenceSet.h"
#include <boost/enable_shared_from_this.hpp>
#include <iosfwd>
@@ -57,7 +57,7 @@ class QueueReplicator : public broker::Exchange,
{
public:
static const std::string DEQUEUE_EVENT_KEY;
- static const std::string POSITION_EVENT_KEY;
+ static const std::string ID_EVENT_KEY;
static const std::string QPID_SYNC_FREQUENCY;
static std::string replicatorName(const std::string& queueName);
@@ -87,13 +87,17 @@ class QueueReplicator : public broker::Exchange,
boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
+ ReplicationId getMaxId();
+
private:
+ typedef qpid::sys::unordered_map<ReplicationId, QueuePosition, TrivialHasher<int32_t> > PositionMap;
+
class ErrorListener;
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void destroy(); // Called when the queue is destroyed.
- void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+ void dequeue(const ReplicationIdSet&, sys::Mutex::ScopedLock&);
HaBroker& haBroker;
std::string logPrefix;
@@ -106,8 +110,13 @@ class QueueReplicator : public broker::Exchange,
bool subscribed;
const Settings& settings;
bool destroyed;
+ PositionMap positions;
+ ReplicationIdSet idSet; // Set of replicationIds on the queue.
+ ReplicationId nextId; // ID for next message to arrive.
+ ReplicationId maxId; // Max ID used so far.
};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshot.h b/qpid/cpp/src/qpid/ha/QueueSnapshot.h
new file mode 100644
index 0000000000..5b1054d934
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueSnapshot.h
@@ -0,0 +1,68 @@
+#ifndef QPID_HA_IDSETOBSERVER_H
+#define QPID_HA_IDSETOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace ha {
+
+/**
+ * A QueueObserver that maintains a ReplicationIdSet of the ReplicationIds of
+ * the messages on the queue.
+ *
+ * THREAD SAFE: the set is guarded by an internal mutex. Note that QueueObserver
+ * methods are called under the Queue's messageLock.
+ */
+class QueueSnapshot : public broker::QueueObserver
+{
+ public:
+ void enqueued(const broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
+ set += m.getReplicationId(); // Record the ID of the enqueued message.
+ }
+
+ void dequeued(const broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
+ set -= m.getReplicationId(); // Drop the ID of the dequeued message.
+ }
+
+ void acquired(const broker::Message&) {} // No-op: the ID set is not changed.
+
+ void requeued(const broker::Message&) {} // No-op: the ID set is not changed.
+
+ ReplicationIdSet snapshot() { // Returns a copy of the current ID set.
+ sys::Mutex::ScopedLock l(lock);
+ return set; // Returned by value, copied while the lock is held.
+ }
+
+ private:
+ sys::Mutex lock; // Guards set.
+ ReplicationIdSet set; // IDs added by enqueued() and removed by dequeued().
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_IDSETOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
new file mode 100644
index 0000000000..258c406954
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueSnapshots.h
@@ -0,0 +1,81 @@
+#ifndef QPID_HA_QUEUESNAPSHOTS_H
+#define QPID_HA_QUEUESNAPSHOTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "QueueSnapshot.h"
+#include "hash.h"
+
+#include "qpid/assert.h"
+#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * ConfigurationObserver that maintains a map of the QueueSnapshot for each queue.
+ * THREAD SAFE: every access to the map is made under an internal mutex.
+ */
+class QueueSnapshots : public broker::ConfigurationObserver
+{
+ public:
+ boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const { // Look up the snapshot for q.
+ sys::Mutex::ScopedLock l(lock);
+ SnapshotMap::const_iterator i = snapshots.find(q);
+ return i != snapshots.end() ? i->second : boost::shared_ptr<QueueSnapshot>(); // Empty pointer if q is unknown: callers must check before dereferencing.
+ }
+
+ // ConfigurationObserver overrides.
+ void queueCreate(const boost::shared_ptr<broker::Queue>& q) { // Create a snapshot for q and attach it as an observer.
+ sys::Mutex::ScopedLock l(lock);
+ boost::shared_ptr<QueueSnapshot> observer(new QueueSnapshot);
+ snapshots[q] = observer; // Replaces any snapshot already stored for q.
+ q->addObserver(observer);
+ }
+
+ void queueDestroy(const boost::shared_ptr<broker::Queue>& q) { // Detach and forget the snapshot for q, if any.
+ sys::Mutex::ScopedLock l(lock);
+ SnapshotMap::iterator i = snapshots.find(q);
+ if (i != snapshots.end()) {
+ q->removeObserver(i->second);
+ snapshots.erase(i);
+ }
+ }
+
+ private:
+ typedef qpid::sys::unordered_map<boost::shared_ptr<broker::Queue>,
+ boost::shared_ptr<QueueSnapshot>,
+ SharedPtrHasher<broker::Queue>
+ > SnapshotMap;
+ SnapshotMap snapshots; // Queue -> its QueueSnapshot observer.
+ mutable sys::Mutex lock; // mutable so that the const get() can lock it.
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUESNAPSHOTS_H*/
diff --git a/qpid/cpp/src/qpid/ha/README.md b/qpid/cpp/src/qpid/ha/README.md
new file mode 100644
index 0000000000..0ec4d30750
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/README.md
@@ -0,0 +1,80 @@
+
+Overview of HA replication
+==========================
+
+Message Identifiers
+-------------------
+
+Replication IDs are sequence numbers assigned to messages *before* a message is
+enqueued. Originally the queue position number was used, but that was
+insufficient for two reasons:
+- We sometimes need to identify messages that are not yet enqueued, for example messages in an open transaction.
+- We don't want to require maintaining identical message sequences on every broker e.g. so transactions can be committed independently by each broker.
+
+We use the IDs to:
+- identify messages to dequeue on a backup.
+- remove extra messages from backup on failover.
+- avoid downloading messages already on backup on failover.
+
+
+On the primary
+--------------
+
+The main classes on the primary are as follows:
+
+`RemoteBackup`: Represents a remote backup broker. Container for per-queue
+information about the broker.
+
+Each (queue,backup) pair has an instance of either or both of the following
+classes:
+
+`QueueGuard`: A queue observer that delays completion of messages as they are
+enqueued and completes messages when they are acknowledged or dequeued.
+
+`ReplicatingSubscription`: A queue browser that sends messages to the backup and
+receives acknowledgments. Forwards acknowledgments to the `QueueGuard`.
+
+`ReplicatingSubscription` and `QueueGuard` are separate because the guard
+can be created before the subscription.
+
+Events intercepted by HA code:
+
+- enqueue: Message published to queue, completion delayed (QueueGuard)
+- deliver: Message delivered to ReplicatingSubscription and sent to backup.
+- acknowledge: Message acknowledged by backup (ReplicatingSubscription)
+- dequeue: Message removed from queue by a consumer (QueueGuard)
+
+Message states:
+- new: initial state.
+- sent: ReplicatingSubscription has sent message to backup.
+- delayed: QueueGuard has delayed completion.
+- delayed-sent: Both sent and delayed.
+- safe: Replication code is done with the message: it is acknowledged or dequeued.
+
+Events:
+- enqueue: message enqueued on queue
+- deliver: message delivered to ReplicatingSubscription
+- acknowledged: message is acknowledged by backup
+- dequeued: message is dequeued by consumer.
+
+State transition diagram:
+
+ (new)--deliver-->(sent)--acknowledged/dequeued---------------->(safe)
+ | L---dequeued- -------------------------------^
+ L-enqueue->(delayed)--dequeued---------------------------------|
+ | |
+ L--deliver->(delayed-sent)--acknowledged/dequeued--|
+
+
+A QueueGuard is set on the queue when a backup subscribes or when a backup is
+promoted. Messages before the _first guarded position_ cannot be delayed
+because they may have already been acknowledged to clients.
+
+A backup sends a set of pre-acknowledged messages when subscribing, messages
+that are already on the backup and therefore safe.
+
+A `ReplicatingSubscription` is _ready_ when all messages are safe or delayed. We
+know this is the case when all the following conditions hold:
+
+- The `ReplicatingSubscription` has reached the position preceding the first guarded position AND
+- All messages prior to the first guarded position are safe.
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 798ade3f73..c37d44fa08 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -35,22 +35,19 @@ using boost::bind;
RemoteBackup::RemoteBackup(
const BrokerInfo& info, broker::Connection* c
-) : brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false)
+) : brokerInfo(info), replicationTest(NONE), started(false), connection(c), reportedReady(false)
{
std::ostringstream oss;
- oss << "Primary: Remote backup " << info << ": ";
+ oss << "Remote backup at " << info << ": ";
logPrefix = oss.str();
+ QPID_LOG(debug, logPrefix << "Connected");
}
-void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
-{
- queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards));
- QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues"
- << (createGuards ? " and guards" : ""));
+RemoteBackup::~RemoteBackup() {
+ // Don't cancel here, cancel must be called explicitly in a locked context
+ // where we know the connection pointer is still good.
}
-RemoteBackup::~RemoteBackup() { cancel(); }
-
void RemoteBackup::cancel() {
QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected")
<< " backup: " << brokerInfo);
@@ -64,7 +61,7 @@ void RemoteBackup::cancel() {
}
bool RemoteBackup::isReady() {
- return connection && catchupQueues.empty();
+ return started && connection && catchupQueues.empty();
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index 769c50457e..e2c5032820 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -25,8 +25,9 @@
#include "ReplicationTest.h"
#include "BrokerInfo.h"
#include "types.h"
+#include "hash.h"
+#include "qpid/sys/unordered_map.h"
#include <set>
-#include <map>
namespace qpid {
@@ -58,17 +59,12 @@ class RemoteBackup
RemoteBackup(const BrokerInfo&, broker::Connection*);
~RemoteBackup();
- /** Set all queues in the registry as catch-up queues.
- *@createGuards if true create guards also, if false guards are created on demand.
- */
- void setCatchupQueues(broker::QueueRegistry&, bool createGuards);
-
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
/** Is the remote backup connected? */
void setConnection(broker::Connection* c) { connection = c; }
- bool isConnected() const { return connection; }
+ broker::Connection* getConnection() const { return connection; }
/** ReplicatingSubscription associated with queue is ready.
* Note: may set isReady()
@@ -90,18 +86,26 @@ class RemoteBackup
/**Cancel all queue guards, called if we are timed out. */
void cancel();
+ /** Set a catch-up queue for this backup.
+ *@param createGuard if true create a guard immediately.
+ */
+ void catchupQueue(const QueuePtr&, bool createGuard);
+
BrokerInfo getBrokerInfo() const { return brokerInfo; }
+
+ void startCatchup() { started = true; }
+
private:
- typedef std::map<QueuePtr, GuardPtr> GuardMap;
+ typedef qpid::sys::unordered_map<QueuePtr, GuardPtr,
+ SharedPtrHasher<broker::Queue> > GuardMap;
typedef std::set<QueuePtr> QueueSet;
- void catchupQueue(const QueuePtr&, bool createGuard);
-
std::string logPrefix;
BrokerInfo brokerInfo;
ReplicationTest replicationTest;
GuardMap guards;
QueueSet catchupQueues;
+ bool started;
broker::Connection* connection;
bool reportedReady;
};
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index a9bd7b49f8..7b153f90ca 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,12 +20,16 @@
*/
#include "makeMessage.h"
+#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueRange.h"
#include "QueueReplicator.h"
+#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
+#include "HaBroker.h"
+#include "qpid/assert.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -35,6 +39,7 @@
#include "qpid/types/Uuid.h"
#include <sstream>
+
namespace qpid {
namespace ha {
@@ -45,53 +50,20 @@ using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
-const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
-const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
+const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info");
-namespace {
-const string DOLLAR("$");
-const string INTERNAL("-internal");
-} // namespace
-
-// Scan the queue for gaps and add them to the subscriptions dequed set.
-class DequeueScanner
-{
+class ReplicatingSubscription::QueueObserver : public broker::QueueObserver {
public:
- DequeueScanner(
- ReplicatingSubscription& rs,
- SequenceNumber front_,
- SequenceNumber back_ // Inclusive
- ) : subscription(rs), front(front_), back(back_)
- {
- assert(front <= back);
- // INVARIANT deques have been added for positions <= at.
- at = front - 1;
- }
-
- void operator()(const Message& m) {
- if (m.getSequence() >= front && m.getSequence() <= back) {
- if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
- at = m.getSequence();
- }
- }
-
- // Must call after scanning the queue.
- void finish() {
- if (at < back) subscription.dequeued(at+1, back);
- }
-
+ QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {}
+ void enqueued(const broker::Message&) {}
+ void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); }
+ void acquired(const broker::Message&) {}
+ void requeued(const broker::Message&) {}
private:
- ReplicatingSubscription& subscription;
- SequenceNumber front;
- SequenceNumber back;
- SequenceNumber at;
+ ReplicatingSubscription& rs;
};
-string mask(const string& in)
-{
- return DOLLAR + in + INTERNAL;
-}
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -110,6 +82,7 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
+ haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
rs->initialize();
@@ -117,7 +90,15 @@ ReplicatingSubscription::Factory::create(
return rs;
}
+namespace {
+void copyIf(boost::shared_ptr<MessageInterceptor> from, boost::shared_ptr<IdSetter>& to) { // Store `from` in `to` if it is an IdSetter.
+ boost::shared_ptr<IdSetter> result = boost::dynamic_pointer_cast<IdSetter>(from); // Empty if `from` is not an IdSetter.
+ if (result) to = result; // Otherwise `to` is left unchanged.
+}
+} // namespace
+
ReplicatingSubscription::ReplicatingSubscription(
+ HaBroker& hb,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -130,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- ready(false)
+ position(0), ready(false), cancelled(false),
+ haBroker(hb)
{
try {
FieldTable ft;
@@ -140,64 +122,57 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "Primary " << queue->getName() << "@" << info << ": ";
+ os << "Subscription to " << queue->getName() << " at " << info << ": ";
logPrefix = os.str();
- // NOTE: Once the guard is attached we can have concurrent
- // calls to dequeued so we need to lock use of this->dequeues.
- //
- // However we must attach the guard _before_ we scan for
- // initial dequeues to be sure we don't miss any dequeues
- // between the scan and attaching the guard.
+ // If this is a non-cluster standalone replication then we need to
+ // set up an IdSetter if there is not already one.
+ boost::shared_ptr<IdSetter> idSetter;
+ queue->getMessageInterceptors().each(
+ boost::bind(&copyIf, _1, boost::ref(idSetter)));
+ if (!idSetter) {
+ QPID_LOG(debug, logPrefix << "Standalone replication");
+ queue->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
+ }
+
+ // If there's already a guard (we are in failover) use it, else create one.
if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
- guard->attach(*this);
- QueueRange backup(arguments); // Remote backup range.
- QueueRange backupOriginal(backup);
- QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
- backupPosition = backup.back;
-
- // Sync backup and primary queues, don't send messages already on the backup
-
- if (backup.front > primary.front || // Missing messages at front
- backup.back < primary.front || // No overlap
- primary.empty() || backup.empty()) // Empty
+ // NOTE: Once the observer is attached we can have concurrent
+ // calls to dequeued so we need to lock use of this->dequeues.
+ //
+ // However we must attach the observer _before_ we snapshot for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the snapshot and attaching the observer.
+ observer.reset(new QueueObserver(*this));
+ queue->addObserver(observer);
+ ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+ std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
+ ReplicationIdSet backup;
+ if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+
+ // Initial dequeues are messages on backup but not on primary.
+ ReplicationIdSet initDequeues = backup - primary;
+ QueuePosition front,back;
+ queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
{
- // No useful overlap - erase backup and start from the beginning
- if (!backup.empty()) dequeued(backup.front, backup.back);
- position = primary.front-1;
+ sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
+ dequeues += initDequeues; // Messages on backup that are not on primary.
+ skip = backup - initDequeues; // Messages already on the backup.
+
+ // Queue front is moving but we know this subscription will start at a
+ // position >= front so if front is safe then position must be.
+ position = front;
+
+ QPID_LOG(debug, logPrefix << "Subscribed: front " << front
+ << ", back " << back
+ << ", start " << position
+ << ", guarded " << guard->getFirst()
+ << ", on backup " << skip);
+ checkReady(l);
}
- else { // backup and primary do overlap.
- // Remove messages from backup that are not in primary.
- if (primary.back < backup.back) {
- dequeued(primary.back+1, backup.back); // Trim excess messages at back
- backup.back = primary.back;
- }
- if (backup.front < primary.front) {
- dequeued(backup.front, primary.front-1); // Trim excess messages at front
- backup.front = primary.front;
- }
- DequeueScanner scan(*this, backup.front, backup.back);
- // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
- queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
- scan.finish();
- position = backup.back;
- //move cursor to position
- queue->seek(*this, position);
- }
- // NOTE: we are assuming that the messages that are on the backup are
- // consistent with those on the primary. If the backup is a replica
- // queue and hasn't been tampered with then that will be the case.
-
- QPID_LOG(debug, logPrefix << "Subscribed:"
- << " backup:" << backupOriginal << " adjusted backup:" << backup
- << " primary:" << primary
- << " catch-up: " << position << "-" << primary.back
- << "(" << primary.back-position << ")");
-
- // Check if we are ready yet.
- if (guard->subscriptionStart(position)) setReady();
}
catch (const std::exception& e) {
QPID_LOG(error, logPrefix << "Creation error: " << e.what()
@@ -208,6 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription(
ReplicatingSubscription::~ReplicatingSubscription() {}
+
// Called in subscription's connection thread when the subscription is created.
// Called separate from ctor because sending events requires
// shared_from_this
@@ -215,12 +191,9 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
void ReplicatingSubscription::initialize() {
try {
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
-
- // Send initial dequeues and position to the backup.
+ // Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
- sendPositionEvent(position, l);
- backupPosition = position;
}
catch (const std::exception& e) {
QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
@@ -229,53 +202,64 @@ void ReplicatingSubscription::initialize() {
}
}
+// True if the next position for the ReplicatingSubscription is a guarded position.
+bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
+ return position+1 >= guard->getFirst();
+}
+
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(
const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
{
+ Mutex::ScopedLock l(lock);
+ ReplicationId id = m.getReplicationId();
+ position = m.getSequence();
try {
- QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
- {
- Mutex::ScopedLock l(lock);
- position = m.getSequence();
-
- // m.getSequence() is the position of the new message on local queue.
- // backupPosition is latest position on backup queue before enqueueing
- if (m.getSequence() <= backupPosition)
- throw Exception(
- QPID_MSG(logPrefix << "Expected position > " << backupPosition
- << " but got " << m.getSequence()));
- if (m.getSequence() - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- // Send the position before message was enqueued.
- sendPositionEvent(m.getSequence()-1, l);
- }
- // Backup will automatically advance by 1 on delivery of message.
- backupPosition = m.getSequence();
+ bool result = false;
+ if (skip.contains(id)) {
+ skip -= id;
+ guard->complete(id); // This will never be acknowledged.
+ result = false;
+ }
+ else {
+ QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
+ // Only consider unguarded messages for ready status.
+ if (!ready && !isGuarded(l)) unacked += id;
+ sendIdEvent(id, l);
+ result = ConsumerImpl::deliver(c, m);
}
- return ConsumerImpl::deliver(c, m);
+ checkReady(l);
+ return result;
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
+ QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m)
<< ": " << e.what());
throw;
}
}
-void ReplicatingSubscription::setReady() {
- {
- Mutex::ScopedLock l(lock);
- if (ready) return;
+/**
+ * Called with lock held. Once the next position is guarded and unacked is empty, sets ready and notifies the Primary with the lock released.
+ */
+void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
+ if (!ready && isGuarded(l) && unacked.empty()) {
ready = true;
+ sys::Mutex::ScopedUnlock u(lock);
+ // Notify Primary that a subscription is ready.
+ QPID_LOG(debug, logPrefix << "Caught up");
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
- // Notify Primary that a subscription is ready.
- QPID_LOG(debug, logPrefix << "Caught up");
- if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
+ {
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ }
QPID_LOG(debug, logPrefix << "Cancelled");
+ getQueue()->removeObserver(observer);
guard->cancel();
ConsumerImpl::cancel();
}
@@ -283,10 +267,15 @@ void ReplicatingSubscription::cancel()
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
- guard->complete(r.getMessageId());
- // If next message is protected by the guard then we are ready
- if (r.getMessageId() >= guard->getRange().back) setReady();
+ ReplicationId id = r.getReplicationId();
+ QPID_LOG(trace, logPrefix << "Acknowledged " <<
+ LogMessageId(*getQueue(), r.getMessageId(), r.getReplicationId()));
+ guard->complete(id);
+ {
+ Mutex::ScopedLock l(lock);
+ unacked -= id;
+ checkReady(l);
+ }
ConsumerImpl::acknowledged(r);
}
@@ -295,59 +284,36 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
- string buf(dequeues.encodedSize(),'\0');
- framing::Buffer buffer(&buf[0], buf.size());
- dequeues.encode(buffer);
+ string buffer = encodeStr(dequeues);
dequeues.clear();
- buffer.reset();
- {
- Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
- }
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
}
-// Called via QueueObserver::dequeued override on guard.
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const Message& m)
+void ReplicatingSubscription::dequeued(ReplicationId id)
{
- QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
+ QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
{
Mutex::ScopedLock l(lock);
- dequeues.add(m.getSequence());
+ dequeues.add(id);
}
notify(); // Ensure a call to doDispatch
}
-// Called during construction while scanning for initial dequeues.
-void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
- QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
- {
- Mutex::ScopedLock l(lock);
- dequeues.add(first,last);
- }
-}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
{
- if (pos == backupPosition) return; // No need to send.
- QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
- string buf(pos.encodedSize(),'\0');
- framing::Buffer buffer(&buf[0], buf.size());
- pos.encode(buffer);
- buffer.reset();
- {
- Mutex::ScopedUnlock u(lock);
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
- }
+ sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
}
void ReplicatingSubscription::sendEvent(const std::string& key,
- const framing::Buffer& buffer,
+ const std::string& buffer,
Mutex::ScopedLock&)
{
+ Mutex::ScopedUnlock u(lock);
broker::Message message = makeMessage(buffer);
MessageTransfer& transfer = MessageTransfer::get(message);
DeliveryProperties* props =
@@ -370,7 +336,6 @@ bool ReplicatingSubscription::doDispatch()
return ConsumerImpl::doDispatch();
}
catch (const std::exception& e) {
- // FIXME aconway 2012-10-05: detect queue deletion, no warning.
QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
return false;
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 05584a2e37..d209faef9f 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -43,6 +43,7 @@ class Buffer;
namespace ha {
class QueueGuard;
+class HaBroker;
/**
* A susbcription that replicates to a remote backup.
@@ -61,30 +62,36 @@ class QueueGuard;
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
- * Lock Hierarchy: ReplicatingSubscription MUST NOT call QueueGuard with its
- * lock held QueueGuard MAY call ReplicatingSubscription with its lock held.
+ * ReplicatingSubscription makes calls on QueueGuard, but not vice-versa.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
public:
typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
- struct Factory : public broker::ConsumerFactory {
+ class Factory : public broker::ConsumerFactory {
+ public:
+ Factory(HaBroker& hb) : haBroker(hb) {}
+
+ HaBroker& getHaBroker() const { return haBroker; }
+
boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
const framing::FieldTable& arguments);
+ private:
+ HaBroker& haBroker;
};
// Argument names for consume command.
static const std::string QPID_REPLICATING_SUBSCRIPTION;
- static const std::string QPID_BACK;
- static const std::string QPID_FRONT;
static const std::string QPID_BROKER_INFO;
+ static const std::string QPID_ID_SET;
- ReplicatingSubscription(broker::SemanticState* parent,
+ ReplicatingSubscription(HaBroker& haBroker,
+ broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
@@ -92,12 +99,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
~ReplicatingSubscription();
- // Called via QueueGuard::dequeued.
- //@return true if the message requires completion.
- void dequeued(const broker::Message&);
-
- // Called during initial scan for dequeues.
- void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
// Consumer overrides.
bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
@@ -121,19 +122,27 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
bool doDispatch();
private:
+ class QueueObserver;
+ friend class QueueObserver;
+
std::string logPrefix;
- framing::SequenceSet dequeues;
- framing::SequenceNumber position;
- framing::SequenceNumber backupPosition;
+ QueuePosition position;
+ ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
+ ReplicationIdSet skip; // Messages already on backup will be skipped.
+ ReplicationIdSet unacked; // Replicated but un-acknowledged.
bool ready;
+ bool cancelled;
BrokerInfo info;
boost::shared_ptr<QueueGuard> guard;
+ HaBroker& haBroker;
+ boost::shared_ptr<QueueObserver> observer;
+ bool isGuarded(sys::Mutex::ScopedLock&);
+ void dequeued(ReplicationId);
void sendDequeueEvent(sys::Mutex::ScopedLock&);
- void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
- void setReady();
- void sendEvent(const std::string& key, const framing::Buffer&,
- sys::Mutex::ScopedLock&);
+ void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&);
+ void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&);
+ void checkReady(sys::Mutex::ScopedLock&);
friend struct Factory;
};
diff --git a/qpid/cpp/src/qpid/ha/hash.h b/qpid/cpp/src/qpid/ha/hash.h
new file mode 100644
index 0000000000..a513673cce
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/hash.h
@@ -0,0 +1,42 @@
+#ifndef QPID_HA_HASH_H
+#define QPID_HA_HASH_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace ha {
+/** Hash functor whose result is simply the value converted with static_cast<size_t>. */
+template<class T> struct TrivialHasher {
+ size_t operator()(T value) const { return static_cast<size_t>(value); }
+};
+/** Hash functor for boost::shared_ptr<T>: hashes the raw pointer value, not the pointee's contents. */
+template<class T> struct SharedPtrHasher {
+ size_t operator()(const boost::shared_ptr<T>& ptr) const {
+ return reinterpret_cast<size_t>(ptr.get()); // Pointers to the same object hash alike.
+ }
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_HASH_H*/
diff --git a/qpid/cpp/src/qpid/ha/makeMessage.cpp b/qpid/cpp/src/qpid/ha/makeMessage.cpp
index ca0e48f13d..5b063a23e7 100644
--- a/qpid/cpp/src/qpid/ha/makeMessage.cpp
+++ b/qpid/cpp/src/qpid/ha/makeMessage.cpp
@@ -54,4 +54,9 @@ broker::Message makeMessage(const framing::Buffer& buffer,
return broker::Message(transfer, 0);
}
+broker::Message makeMessage(const std::string& content, const std::string& destination) { // Overload taking the content as a std::string.
+ framing::Buffer buffer(const_cast<char*>(&content[0]), content.size()); // const_cast yields the char* handed to framing::Buffer. NOTE(review): presumably the Buffer overload only reads these bytes -- confirm.
+ return makeMessage(buffer, destination); // Delegate to the framing::Buffer overload.
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/makeMessage.h b/qpid/cpp/src/qpid/ha/makeMessage.h
index 283b415791..4427cdd948 100644
--- a/qpid/cpp/src/qpid/ha/makeMessage.h
+++ b/qpid/cpp/src/qpid/ha/makeMessage.h
@@ -23,6 +23,10 @@
*/
#include "qpid/broker/Message.h"
+#include "qpid/framing/Buffer.h"
+#include <string>
+
+/** Utilities for creating messages used by HA internally. */
namespace qpid {
namespace framing {
@@ -38,6 +42,25 @@ broker::Message makeMessage(
const std::string& destination=std::string()
);
+broker::Message makeMessage(const std::string& content,
+ const std::string& destination=std::string());
+
+/** Encode value as a string. T must provide encodedSize() and an encode() accepting a framing::Buffer. */
+template <class T> std::string encodeStr(const T& value) {
+ std::string encoded(value.encodedSize(), '\0'); // Pre-size the string to the encoded length.
+ framing::Buffer buffer(&encoded[0], encoded.size()); // Buffer over the string's own storage.
+ value.encode(buffer); // NOTE(review): presumably writes the encoding through buffer into encoded -- confirm.
+ return encoded;
+}
+
+/** Decode value from a string. T must be default-constructible and provide a decode() accepting a framing::Buffer. */
+template <class T> T decodeStr(const std::string& encoded) {
+ framing::Buffer buffer(const_cast<char*>(&encoded[0]), encoded.size()); // const_cast yields the char* handed to framing::Buffer.
+ T value;
+ value.decode(buffer);
+ return value;
+}
+
}} // namespace qpid::ha
#endif /*!QPID_HA_MAKEMESSAGE_H*/
diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp
index bb4bf83574..2246355339 100644
--- a/qpid/cpp/src/qpid/ha/types.cpp
+++ b/qpid/cpp/src/qpid/ha/types.cpp
@@ -21,6 +21,8 @@
#include "types.h"
#include "qpid/Msg.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
#include "qpid/Exception.h"
#include <algorithm>
#include <iostream>
@@ -83,4 +85,19 @@ ostream& operator<<(ostream& o, const IdSet& ids) {
return o;
}
+LogMessageId::LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) : // Explicit position and replication id.
+ queue(q.getName()), position(pos), replicationId(id) {}
+// Position and replication id are read from the message (getSequence/getReplicationId).
+LogMessageId::LogMessageId(const broker::Queue& q, const broker::Message& m) :
+ queue(q.getName()), position(m.getSequence()), replicationId(m.getReplicationId()) {}
+// Same, with the queue given directly by name.
+LogMessageId::LogMessageId(const std::string& q, const broker::Message& m) :
+ queue(q), position(m.getSequence()), replicationId(m.getReplicationId()) {}
+// Prints a LogMessageId as <queue>[<position>]=<replicationId>.
+std::ostream& operator<<(std::ostream& o, const LogMessageId& m) {
+ return o << m.queue << "[" << m.position << "]=" << m.replicationId;
+}
+
+
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h
index f8c48afc5a..9a7e97c66d 100644
--- a/qpid/cpp/src/qpid/ha/types.h
+++ b/qpid/cpp/src/qpid/ha/types.h
@@ -24,12 +24,20 @@
#include "qpid/types/Variant.h"
#include "qpid/types/Uuid.h"
+#include "qpid/framing/SequenceSet.h"
+
+#include <boost/shared_ptr.hpp>
+
#include <string>
#include <set>
#include <iosfwd>
namespace qpid {
+namespace broker {
+class Message;
+class Queue;
+}
namespace framing {
class FieldTable;
}
@@ -106,5 +114,23 @@ class IdSet : public std::set<types::Uuid> {};
std::ostream& operator<<(std::ostream& o, const IdSet& ids);
+// Use type names to distinguish Positions from Replication Ids. These are aliases of the same framing types, so the compiler does not enforce the distinction.
+typedef framing::SequenceNumber QueuePosition;
+typedef framing::SequenceNumber ReplicationId;
+typedef framing::SequenceSet QueuePositionSet;
+typedef framing::SequenceSet ReplicationIdSet;
+
+/** Helper for logging a message ID: bundles queue name, queue position and replication id for operator<<. */
+struct LogMessageId {
+ typedef boost::shared_ptr<broker::Queue> QueuePtr; // NOTE(review): not used inside this struct.
+ LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id);
+ LogMessageId(const broker::Queue& q, const broker::Message& m);
+ LogMessageId(const std::string& q, const broker::Message& m);
+ const std::string& queue; // Reference, not a copy: the named string must outlive this object.
+ QueuePosition position;
+ ReplicationId replicationId;
+};
+std::ostream& operator<<(std::ostream&, const LogMessageId&);
+
}} // qpid::ha
#endif /*!QPID_HA_ENUM_H*/
diff --git a/qpid/cpp/src/qpid/types/Uuid.cpp b/qpid/cpp/src/qpid/types/Uuid.cpp
index 9862fa8946..1d6fbf430a 100644
--- a/qpid/cpp/src/qpid/types/Uuid.cpp
+++ b/qpid/cpp/src/qpid/types/Uuid.cpp
@@ -144,4 +144,12 @@ std::string Uuid::str() const
return os.str();
}
+size_t Uuid::hash() const { // Folds the SIZE bytes of the UUID into a single size_t.
+ std::size_t seed = 0;
+ for(size_t i = 0; i < SIZE; ++i)
+ seed ^= static_cast<std::size_t>(bytes[i]) + 0x9e3779b9 + (seed << 6) + (seed >> 2); // Same mixing formula as the classic boost::hash_combine, applied per byte.
+ return seed;
+}
+
+
}} // namespace qpid::types
diff --git a/qpid/cpp/src/tests/DeliveryRecordTest.cpp b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
index c83bd9a6a4..37b3095f81 100644
--- a/qpid/cpp/src/tests/DeliveryRecordTest.cpp
+++ b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
list<DeliveryRecord> records;
for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
- DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
+ DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
r.setId(*i);
records.push_back(r);
}
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 7b0d88a27c..9cf721fd01 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -107,7 +107,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::",
+ "--log-enable=trace+:ha::", # FIXME aconway 2013-06-14: debug+
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
@@ -177,7 +177,7 @@ acl allow all all
self._status = self.ha_status()
return self._status == status;
except ConnectionError: return False
- assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
+ assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%(
self, status, self._status)
def wait_queue(self, queue, timeout=1):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 3836381ed2..212e92b0c6 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -18,7 +18,7 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4, UUID
@@ -100,7 +100,7 @@ class ReplicationTests(HaBrokerTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
# Verify exchange with replicate=configuration
- b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
+ b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
@@ -284,9 +284,9 @@ class ReplicationTests(HaBrokerTest):
# Set up replication with qpid-ha
backup.replicate(primary.host_port(), "q")
- ps.send("a")
+ ps.send("a", timeout=1)
backup.assert_browse_backup("q", ["a"])
- ps.send("b")
+ ps.send("b", timeout=1)
backup.assert_browse_backup("q", ["a", "b"])
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
@@ -295,11 +295,11 @@ class ReplicationTests(HaBrokerTest):
# Set up replication with qpid-config
ps2 = pc.session().sender("q2;{create:always}")
backup.config_replicate(primary.host_port(), "q2");
- ps2.send("x")
+ ps2.send("x", timeout=1)
backup.assert_browse_backup("q2", ["x"])
finally: l.restore()
- def test_queue_replica_failover(self):
+ def test_standalone_queue_replica_failover(self):
"""Test individual queue replication from a cluster to a standalone
backup broker, verify it fails over."""
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
@@ -319,6 +319,7 @@ class ReplicationTests(HaBrokerTest):
backup.assert_browse_backup("q", ["a"])
ps.send("b")
backup.assert_browse_backup("q", ["a", "b"])
+ cluster[0].wait_status("ready")
cluster.bounce(1)
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
@@ -331,16 +332,20 @@ class ReplicationTests(HaBrokerTest):
"""Verify that we replicate to an LVQ correctly"""
cluster = HaCluster(self, 2)
s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
- def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
- for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
- send(*kv)
- cluster[1].assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
- send("b","b-2")
- cluster[1].assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
- send("c","c-3")
- cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
- send("d","d-1")
- cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
+ # Send one message with the given lvq-key, then check cluster[1]'s "lvq" against expect via assert_browse_backup.
+ def send(key,value,expect):
+ s.send(Message(content=value,properties={"lvq-key":key}), timeout=1)
+ cluster[1].assert_browse_backup("lvq", expect)
+
+ send("a", "a-1", ["a-1"])
+ send("b", "b-1", ["a-1", "b-1"])
+ send("a", "a-2", ["b-1", "a-2"])
+ send("a", "a-3", ["b-1", "a-3"])
+ send("c", "c-1", ["b-1", "a-3", "c-1"])
+ send("c", "c-2", ["b-1", "a-3", "c-2"])
+ send("b", "b-2", ["a-3", "c-2", "b-2"])
+ send("c", "c-3", ["a-3", "b-2", "c-3"])
+ send("d", "d-1", ["a-3", "b-2", "c-3", "d-1"])
def test_ring(self):
"""Test replication with the ring queue policy"""
@@ -416,8 +421,8 @@ class ReplicationTests(HaBrokerTest):
def send(self, connection):
"""Send messages, then acquire one but don't acknowledge"""
s = connection.session()
- for m in range(10): s.sender(self.address).send(str(m))
- s.receiver(self.address).fetch()
+ for m in range(10): s.sender(self.address).send(str(m), timeout=1)
+ s.receiver(self.address, timeout=1).fetch()
def verify(self, brokertest, backup):
backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
@@ -959,8 +964,7 @@ class LongTests(HaBrokerTest):
for s in senders: s.sender.assert_running()
for r in receivers: r.receiver.assert_running()
checkpoint = [ r.received+100 for r in receivers ]
- dead = None
- victim = random.randint(0,2)
+ victim = random.choice([0,1,2,primary]) # Give the primary a better chance.
if victim == primary:
# Don't kill primary till it is active and the next
# backup is ready, otherwise we can lose messages.
@@ -984,13 +988,8 @@ class LongTests(HaBrokerTest):
finally:
for s in senders: s.stop()
for r in receivers: r.stop()
- unexpected_dead = []
- for i in xrange(3):
- if not brokers[i].is_running() and i != dead:
- unexpected_dead.append(i)
- if brokers[i].is_running(): brokers.kill(i, False)
- if unexpected_dead:
- raise Exception("Brokers not running: %s"%unexpected_dead)
+ dead = filter(lambda i: not brokers[i].is_running(), xrange(3))
+ if dead: raise Exception("Brokers not running: %s"%dead)
def test_qmf_order(self):
"""QPID 4402: HA QMF events can be out of order.
@@ -1066,7 +1065,7 @@ class RecoveryTests(HaBrokerTest):
# Create a queue before the failure.
s1 = cluster.connect(0).session().sender("q1;{create:always}")
for b in cluster: b.wait_backup("q1")
- for i in xrange(10): s1.send(str(i))
+ for i in xrange(10): s1.send(str(i), timeout=0.1)
# Kill primary and 2 backups
cluster[3].wait_status("ready")
@@ -1125,17 +1124,17 @@ class RecoveryTests(HaBrokerTest):
cluster[0].wait_status("active") # Primary ready
for b in cluster[1:3]: b.wait_status("ready") # Backups ready
for i in [0,1]: cluster.kill(i, False)
- cluster[2].promote() # New primary, backups will be 1 and 2
+ cluster[2].promote() # New primary, expected backup will be 1
cluster[2].wait_status("recovering")
# Should not go active till the expected backup connects or times out.
self.assertEqual(cluster[2].ha_status(), "recovering")
- # Messages should be held expected backup times out
+ # Messages should be held till expected backup times out
s = cluster[2].connect().session().sender("q;{create:always}")
- for i in xrange(100): s.send(str(i), sync=False)
+ s.send("foo", sync=False)
# Verify message held initially.
try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
except Timeout: pass
- s.sync(timeout=1) # And released after the timeout.
+ s.sync(timeout=1) # And released after the timeout.
self.assertEqual(cluster[2].ha_status(), "active")
def test_join_ready_cluster(self):
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 3e6b805692..b72964c1a7 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -1,5 +1,5 @@
#!/bin/sh
-#
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -26,8 +26,8 @@ REPEAT="--repeat 10"
QUEUES="-q 6"
SENDERS="-s 3"
RECEIVERS="-r 3"
-BROKERS= # Local broker
-CLIENT_HOSTS= # No ssh, all clients are local
+BROKERS= # Local broker
+CLIENT_HOSTS= # No ssh, all clients are local
# Connection options
TCP_NODELAY=false
RECONNECT=true