diff options
| author | Alan Conway <aconway@apache.org> | 2013-09-09 17:08:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-09-09 17:08:23 +0000 |
| commit | fcafb84f19127b245394f311e4f3ed47180a5188 (patch) | |
| tree | fe6a32abfc2dc0ea8258869d42b3ce928b7dee8c /qpid/cpp/src | |
| parent | 037f641f3e098c9b9df47f6ca42979bfa583dd4d (diff) | |
| download | qpid-python-fcafb84f19127b245394f311e4f3ed47180a5188.tar.gz | |
QPID-4327: HA support for TX transactions - fix auth bugs.
- Set auth info on status check connections
- Clean up status check loging
- Use realm@username for authentication name (was using just username)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1521190 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.cpp | 38 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.h | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 3 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 2 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 18 |
10 files changed, 51 insertions, 40 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index e28ca1fa6a..503de3e351 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -52,9 +52,7 @@ using sys::Mutex; Backup::Backup(HaBroker& hb, const Settings& s) : logPrefix("Backup: "), membership(hb.getMembership()), stopped(false), haBroker(hb), broker(hb.getBroker()), settings(s), - statusCheck( - new StatusCheck( - logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo())) + statusCheck(new StatusCheck(hb)) {} void Backup::setBrokerUrl(const Url& brokers) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 49fdee49d0..3cfdc40b03 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -799,7 +799,7 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { // messages. Any reroutes will be done at the primary and // replicated as normal. if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); - haBroker.deleteQueue(name, remoteHost); + haBroker.getBroker().deleteQueue(name, userId, remoteHost); QPID_LOG(debug, logPrefix << "Queue deleted: " << name); } } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 635aa1f65c..4d6a5ee51e 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -64,6 +64,7 @@ using boost::dynamic_pointer_cast; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : systemId(b.getSystem()->getSystemId().data()), settings(s), + userId(s.username+"@"+b.getOptions().realm), broker(b), observer(new ConnectionObserver(*this, systemId)), role(new StandAlone), @@ -221,8 +222,4 @@ boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::stri broker.getExchanges().find(QueueReplicator::replicatorName(queueName))); } -void HaBroker::deleteQueue(const string& name, const string& connectionId) { - broker.deleteQueue(name, settings.username, connectionId); -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index c18e44d949..1a42111cea 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -102,9 +102,8 @@ class HaBroker : public management::Manageable boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName); - /**@param connectionId optional, only available on backup */ - void deleteQueue(const std::string& name, - const std::string& connectionId=std::string()); + /** Authenticated user ID for queue create/delete */ + std::string getUserId() const { return userId; } private: void setPublicUrl(const Url&); @@ -116,6 +115,7 @@ class HaBroker : public management::Manageable // Immutable members const types::Uuid systemId; const Settings settings; + const std::string userId; // Member variables protected by lock mutable sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 6a916d3f84..4c7dc2ef0d 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -169,7 +169,8 @@ void PrimaryTxObserver::rollback() { void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { // Don't destroy the tx-queue if there are connected subscriptions. if (ended && unfinished.empty()) { - haBroker.deleteQueue(txQueue->getName()); + haBroker.getBroker().deleteQueue( + txQueue->getName(), haBroker.getUserId(), string()); broker.getExchanges().destroy(getExchangeName()); } } diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp index 11f65aa574..4ab3c347a0 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp +++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp @@ -20,6 +20,8 @@ */ #include "StatusCheck.h" #include "ConnectionObserver.h" +#include "HaBroker.h" +#include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" @@ -41,27 +43,32 @@ const string HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"; class StatusCheckThread : public sys::Runnable { public: - StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self) - : url(addr), statusCheck(sc), brokerInfo(self) {} + StatusCheckThread(StatusCheck& sc, const qpid::Address& addr) + : url(addr), statusCheck(sc) {} void run(); private: Url url; StatusCheck& statusCheck; - BrokerInfo brokerInfo; }; void StatusCheckThread::run() { - QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url); + string logPrefix("Status check " + url.str() + ": "); Connection c; try { + // Check for self connections Variant::Map options, clientProperties; - clientProperties = brokerInfo.asMap(); // Detect self connections. clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups. clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str(); - clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap(); + clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap(); + // Set connection options + Settings settings(statusCheck.haBroker.getSettings()); + if (settings.username.size()) options["username"] = settings.username; + if (settings.password.size()) options["password"] = settings.password; + if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism; options["client-properties"] = clientProperties; - options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC; + sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval); + options["heartbeat"] = heartbeat/sys::TIME_SEC; c = Connection(url.str(), options); c.open(); @@ -81,7 +88,7 @@ void StatusCheckThread::run() { content["_object_id"] = oid; encode(content, request); s.send(request); - messaging::Duration timeout(statusCheck.linkHeartbeatInterval/sys::TIME_MSEC); + messaging::Duration timeout(heartbeat/sys::TIME_MSEC); Message response = r.fetch(timeout); session.acknowledge(); Variant::List contentIn; @@ -89,23 +96,22 @@ void StatusCheckThread::run() { if (contentIn.size() == 1) { Variant::Map details = contentIn.front().asMap()["_values"].asMap(); string status = details["status"].getString(); + QPID_LOG(debug, logPrefix << status); if (status != "joining") { statusCheck.setPromote(false); - QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is " - << status << ", this broker will refuse promotion."); + QPID_LOG(info, logPrefix << "Joining established cluster"); } - QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); } } catch(const exception& error) { - QPID_LOG(info, statusCheck.logPrefix << "Error checking status of " << url - << ": " << error.what()); + // Its not an error to fail to connect to self. + if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0]) + QPID_LOG(warning, logPrefix << error.what()); } try { c.close(); } catch(...) {} delete this; } -StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self) - : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self) +StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb) {} StatusCheck::~StatusCheck() { @@ -116,7 +122,7 @@ StatusCheck::~StatusCheck() { void StatusCheck::setUrl(const Url& url) { Mutex::ScopedLock l(lock); for (size_t i = 0; i < url.size(); ++i) - threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo))); + threads.push_back(Thread(new StatusCheckThread(*this, url[i]))); } bool StatusCheck::canPromote() { diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h index 65ad3cefcf..8896969f55 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.h +++ b/qpid/cpp/src/qpid/ha/StatusCheck.h @@ -23,6 +23,7 @@ */ #include "BrokerInfo.h" +#include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" @@ -33,6 +34,8 @@ namespace qpid { namespace ha { +class HaBroker; + // TODO aconway 2012-12-21: This solution is incomplete. It will only protect // against bad promotion if there are READY brokers when this broker starts. // It will not help the situation where brokers became READY after this one starts. @@ -51,7 +54,7 @@ namespace ha { class StatusCheck { public: - StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self); + StatusCheck(HaBroker&); ~StatusCheck(); void setUrl(const Url&); bool canPromote(); @@ -59,12 +62,11 @@ class StatusCheck private: void setPromote(bool p); - std::string logPrefix; sys::Mutex lock; std::vector<sys::Thread> threads; bool promote; - sys::Duration linkHeartbeatInterval; - BrokerInfo brokerInfo; + HaBroker& haBroker; + friend class StatusCheckThread; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 75ad554cb9..10181c15df 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -222,7 +222,8 @@ void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) { if (!e.members.count(haBroker.getMembership().getSelf())) { QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating"); // Destroy the tx-queue, which will destroy this via QueueReplicator destroy. - haBroker.deleteQueue(getQueue()->getName()); + haBroker.getBroker().deleteQueue( + getQueue()->getName(), haBroker.getUserId(), string()); } } diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 5f21b8f8a4..1c51673763 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -125,7 +125,7 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=debug+:ha::", + "--log-enable=debug+:ha::", "--log-enable=debug+:acl::", # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 47a1be815a..79f068641d 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -565,6 +565,7 @@ class ReplicationTests(HaBrokerTest): # Verify that replication works with auth=yes and HA user has at least the following # privileges: aclf.write(""" +# HA user acl allow zag@QPID access queue acl allow zag@QPID create queue acl allow zag@QPID consume queue @@ -576,6 +577,9 @@ acl allow zag@QPID publish exchange acl allow zag@QPID delete exchange acl allow zag@QPID access method acl allow zag@QPID create link +# Normal user +acl allow zig@QPID all all + acl deny all all """) aclf.close() @@ -586,14 +590,16 @@ acl deny all all "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN" ], client_credentials=Credentials("zag", "zag", "PLAIN")) - s0 = cluster[0].connect(username="zag", password="zag").session(); + c = cluster[0].connect(username="zig", password="zig") + s0 = c.session(); s0.receiver("q;{create:always}") s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") - cluster[1].wait_backup("q") - cluster[1].wait_backup("ex") - s1 = cluster[1].connect_admin().session(); # Uses Credentials above. - s1.sender("ex").send("foo"); - self.assertEqual(s1.receiver("q").fetch().content, "foo") + s0.sender("ex").send("foo"); + s1 = c.session(transactional=True) + s1.sender("ex").send("tx"); + cluster[1].assert_browse_backup("q", ["foo"]) + s1.commit() + cluster[1].assert_browse_backup("q", ["foo", "tx"]) def test_alternate_exchange(self): """Verify that alternate-exchange on exchanges and queues is propagated |
