summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-09-09 17:08:23 +0000
committerAlan Conway <aconway@apache.org>2013-09-09 17:08:23 +0000
commitfcafb84f19127b245394f311e4f3ed47180a5188 (patch)
treefe6a32abfc2dc0ea8258869d42b3ce928b7dee8c /qpid/cpp/src
parent037f641f3e098c9b9df47f6ca42979bfa583dd4d (diff)
downloadqpid-python-fcafb84f19127b245394f311e4f3ed47180a5188.tar.gz
QPID-4327: HA support for TX transactions - fix auth bugs.
- Set auth info on status check connections - Clean up status check loging - Use realm@username for authentication name (was using just username) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1521190 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h6
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.cpp38
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.h10
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp3
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py18
10 files changed, 51 insertions, 40 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index e28ca1fa6a..503de3e351 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -52,9 +52,7 @@ using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
haBroker(hb), broker(hb.getBroker()), settings(s),
- statusCheck(
- new StatusCheck(
- logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
+ statusCheck(new StatusCheck(hb))
{}
void Backup::setBrokerUrl(const Url& brokers) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 49fdee49d0..3cfdc40b03 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -799,7 +799,7 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
// messages. Any reroutes will be done at the primary and
// replicated as normal.
if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
- haBroker.deleteQueue(name, remoteHost);
+ haBroker.getBroker().deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
}
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 635aa1f65c..4d6a5ee51e 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -64,6 +64,7 @@ using boost::dynamic_pointer_cast;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
settings(s),
+ userId(s.username+"@"+b.getOptions().realm),
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
@@ -221,8 +222,4 @@ boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::stri
broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
}
-void HaBroker::deleteQueue(const string& name, const string& connectionId) {
- broker.deleteQueue(name, settings.username, connectionId);
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index c18e44d949..1a42111cea 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -102,9 +102,8 @@ class HaBroker : public management::Manageable
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
- /**@param connectionId optional, only available on backup */
- void deleteQueue(const std::string& name,
- const std::string& connectionId=std::string());
+ /** Authenticated user ID for queue create/delete */
+ std::string getUserId() const { return userId; }
private:
void setPublicUrl(const Url&);
@@ -116,6 +115,7 @@ class HaBroker : public management::Manageable
// Immutable members
const types::Uuid systemId;
const Settings settings;
+ const std::string userId;
// Member variables protected by lock
mutable sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 6a916d3f84..4c7dc2ef0d 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -169,7 +169,8 @@ void PrimaryTxObserver::rollback() {
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
// Don't destroy the tx-queue if there are connected subscriptions.
if (ended && unfinished.empty()) {
- haBroker.deleteQueue(txQueue->getName());
+ haBroker.getBroker().deleteQueue(
+ txQueue->getName(), haBroker.getUserId(), string());
broker.getExchanges().destroy(getExchangeName());
}
}
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
index 11f65aa574..4ab3c347a0 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
@@ -20,6 +20,8 @@
*/
#include "StatusCheck.h"
#include "ConnectionObserver.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
@@ -41,27 +43,32 @@ const string HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker";
class StatusCheckThread : public sys::Runnable {
public:
- StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self)
- : url(addr), statusCheck(sc), brokerInfo(self) {}
+ StatusCheckThread(StatusCheck& sc, const qpid::Address& addr)
+ : url(addr), statusCheck(sc) {}
void run();
private:
Url url;
StatusCheck& statusCheck;
- BrokerInfo brokerInfo;
};
void StatusCheckThread::run() {
- QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
+ string logPrefix("Status check " + url.str() + ": ");
Connection c;
try {
+ // Check for self connections
Variant::Map options, clientProperties;
- clientProperties = brokerInfo.asMap(); // Detect self connections.
clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
- clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
+ // Set connection options
+ Settings settings(statusCheck.haBroker.getSettings());
+ if (settings.username.size()) options["username"] = settings.username;
+ if (settings.password.size()) options["password"] = settings.password;
+ if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
options["client-properties"] = clientProperties;
- options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
+ sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
+ options["heartbeat"] = heartbeat/sys::TIME_SEC;
c = Connection(url.str(), options);
c.open();
@@ -81,7 +88,7 @@ void StatusCheckThread::run() {
content["_object_id"] = oid;
encode(content, request);
s.send(request);
- messaging::Duration timeout(statusCheck.linkHeartbeatInterval/sys::TIME_MSEC);
+ messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
Message response = r.fetch(timeout);
session.acknowledge();
Variant::List contentIn;
@@ -89,23 +96,22 @@ void StatusCheckThread::run() {
if (contentIn.size() == 1) {
Variant::Map details = contentIn.front().asMap()["_values"].asMap();
string status = details["status"].getString();
+ QPID_LOG(debug, logPrefix << status);
if (status != "joining") {
statusCheck.setPromote(false);
- QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is "
- << status << ", this broker will refuse promotion.");
+ QPID_LOG(info, logPrefix << "Joining established cluster");
}
- QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
} catch(const exception& error) {
- QPID_LOG(info, statusCheck.logPrefix << "Error checking status of " << url
- << ": " << error.what());
+ // Its not an error to fail to connect to self.
+ if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
+ QPID_LOG(warning, logPrefix << error.what());
}
try { c.close(); } catch(...) {}
delete this;
}
-StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self)
- : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
+StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
{}
StatusCheck::~StatusCheck() {
@@ -116,7 +122,7 @@ StatusCheck::~StatusCheck() {
void StatusCheck::setUrl(const Url& url) {
Mutex::ScopedLock l(lock);
for (size_t i = 0; i < url.size(); ++i)
- threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo)));
+ threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
}
bool StatusCheck::canPromote() {
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h
index 65ad3cefcf..8896969f55 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.h
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.h
@@ -23,6 +23,7 @@
*/
#include "BrokerInfo.h"
+#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
@@ -33,6 +34,8 @@
namespace qpid {
namespace ha {
+class HaBroker;
+
// TODO aconway 2012-12-21: This solution is incomplete. It will only protect
// against bad promotion if there are READY brokers when this broker starts.
// It will not help the situation where brokers became READY after this one starts.
@@ -51,7 +54,7 @@ namespace ha {
class StatusCheck
{
public:
- StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self);
+ StatusCheck(HaBroker&);
~StatusCheck();
void setUrl(const Url&);
bool canPromote();
@@ -59,12 +62,11 @@ class StatusCheck
private:
void setPromote(bool p);
- std::string logPrefix;
sys::Mutex lock;
std::vector<sys::Thread> threads;
bool promote;
- sys::Duration linkHeartbeatInterval;
- BrokerInfo brokerInfo;
+ HaBroker& haBroker;
+
friend class StatusCheckThread;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 75ad554cb9..10181c15df 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -222,7 +222,8 @@ void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
if (!e.members.count(haBroker.getMembership().getSelf())) {
QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
// Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- haBroker.deleteQueue(getQueue()->getName());
+ haBroker.getBroker().deleteQueue(
+ getQueue()->getName(), haBroker.getUserId(), string());
}
}
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 5f21b8f8a4..1c51673763 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -125,7 +125,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::",
+ "--log-enable=debug+:ha::", "--log-enable=debug+:acl::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 47a1be815a..79f068641d 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -565,6 +565,7 @@ class ReplicationTests(HaBrokerTest):
# Verify that replication works with auth=yes and HA user has at least the following
# privileges:
aclf.write("""
+# HA user
acl allow zag@QPID access queue
acl allow zag@QPID create queue
acl allow zag@QPID consume queue
@@ -576,6 +577,9 @@ acl allow zag@QPID publish exchange
acl allow zag@QPID delete exchange
acl allow zag@QPID access method
acl allow zag@QPID create link
+# Normal user
+acl allow zig@QPID all all
+
acl deny all all
""")
aclf.close()
@@ -586,14 +590,16 @@ acl deny all all
"--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
],
client_credentials=Credentials("zag", "zag", "PLAIN"))
- s0 = cluster[0].connect(username="zag", password="zag").session();
+ c = cluster[0].connect(username="zig", password="zig")
+ s0 = c.session();
s0.receiver("q;{create:always}")
s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
- cluster[1].wait_backup("q")
- cluster[1].wait_backup("ex")
- s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
- s1.sender("ex").send("foo");
- self.assertEqual(s1.receiver("q").fetch().content, "foo")
+ s0.sender("ex").send("foo");
+ s1 = c.session(transactional=True)
+ s1.sender("ex").send("tx");
+ cluster[1].assert_browse_backup("q", ["foo"])
+ s1.commit()
+ cluster[1].assert_browse_backup("q", ["foo", "tx"])
def test_alternate_exchange(self):
"""Verify that alternate-exchange on exchanges and queues is propagated