summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorCharles E. Rolke <chug@apache.org>2013-03-01 22:20:33 +0000
committerCharles E. Rolke <chug@apache.org>2013-03-01 22:20:33 +0000
commitb9f04e24129422f03cb1c4658be339e66b397179 (patch)
treea3e28c86015f99fcfbd813ee3781d0d2905fe000 /qpid/cpp/src
parenta2208ec6b1c7e9c1c9cca7ea22820b791d5127b8 (diff)
downloadqpid-python-b9f04e24129422f03cb1c4658be339e66b397179.tar.gz
QPID-4604: C++ Broker queue limits controlled by ACL file. Patch from Ernie Allen.
See https://reviews.apache.org/r/9703/ git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1451737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.cpp21
-rw-r--r--qpid/cpp/src/qpid/acl/AclData.cpp63
-rw-r--r--qpid/cpp/src/qpid/acl/AclData.h17
-rw-r--r--qpid/cpp/src/qpid/acl/AclReader.cpp76
-rw-r--r--qpid/cpp/src/qpid/acl/AclReader.h31
-rw-r--r--qpid/cpp/src/qpid/acl/AclResourceCounter.cpp87
-rw-r--r--qpid/cpp/src/qpid/acl/AclResourceCounter.h17
-rwxr-xr-xqpid/cpp/src/tests/acl.py162
8 files changed, 343 insertions, 131 deletions
diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp
index 998510a17e..aeee42563c 100644
--- a/qpid/cpp/src/qpid/acl/Acl.cpp
+++ b/qpid/cpp/src/qpid/acl/Acl.cpp
@@ -64,7 +64,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals
if (aclValues.aclMaxConnectTotal > AclData::getConnectMaxSpec())
throw Exception("--max-connections switch cannot be larger than " + AclData::getMaxConnectSpecStr());
if (aclValues.aclMaxQueuesPerUser > AclData::getConnectMaxSpec())
- throw Exception("--max-queues-per-user switch cannot be larger than " + AclData::getMaxConnectSpecStr());
+ throw Exception("--max-queues-per-user switch cannot be larger than " + AclData::getMaxQueueSpecStr());
agent = broker->getManagementAgent();
@@ -164,7 +164,18 @@ bool Acl::approveConnection(const qpid::broker::Connection& conn)
bool Acl::approveCreateQueue(const std::string& userId, const std::string& queueName)
{
- return resourceCounter->approveCreateQueue(userId, queueName);
+// return resourceCounter->approveCreateQueue(userId, queueName);
+ uint16_t queueLimit(0);
+
+ boost::shared_ptr<AclData> dataLocal;
+ {
+ Mutex::ScopedLock locker(dataLock);
+ dataLocal = data; //rcu copy
+ }
+
+ (void) dataLocal->getQueueQuotaForUser(userId, &queueLimit);
+
+ return resourceCounter->approveCreateQueue(userId, queueName, dataLocal->enforcingQueueQuotas(), queueLimit);
}
@@ -228,7 +239,7 @@ bool Acl::readAclFile(std::string& errorText)
bool Acl::readAclFile(std::string& aclFile, std::string& errorText) {
boost::shared_ptr<AclData> d(new AclData);
- AclReader ar(aclValues.aclMaxConnectPerUser);
+ AclReader ar(aclValues.aclMaxConnectPerUser, aclValues.aclMaxQueuesPerUser);
if (ar.read(aclFile, d)){
agent->raiseEvent(_qmf::EventFileLoadFailed("", ar.getError()));
errorText = ar.getError();
@@ -253,6 +264,10 @@ bool Acl::readAclFile(std::string& aclFile, std::string& errorText) {
QPID_LOG(debug, "ACL: Connection quotas are Enabled.");
}
+ if (data->enforcingQueueQuotas()){
+ QPID_LOG(debug, "ACL: Queue quotas are Enabled.");
+ }
+
data->aclSource = aclFile;
if (mgmtObject!=0){
mgmtObject->set_transferAcl(transferAcl?1:0);
diff --git a/qpid/cpp/src/qpid/acl/AclData.cpp b/qpid/cpp/src/qpid/acl/AclData.cpp
index ca866ab7d3..922f65ba69 100644
--- a/qpid/cpp/src/qpid/acl/AclData.cpp
+++ b/qpid/cpp/src/qpid/acl/AclData.cpp
@@ -35,6 +35,7 @@ namespace acl {
const std::string AclData::ACL_KEYWORD_GROUP = "group";
const std::string AclData::ACL_KEYWORD_QUOTA = "quota";
const std::string AclData::ACL_KEYWORD_QUOTA_CONNECTIONS = "connections";
+ const std::string AclData::ACL_KEYWORD_QUOTA_QUEUES = "queues";
const char AclData::ACL_SYMBOL_WILDCARD = '*';
const std::string AclData::ACL_KEYWORD_WILDCARD = "*";
const char AclData::ACL_SYMBOL_LINE_CONTINUATION = '\\';
@@ -47,7 +48,9 @@ namespace acl {
transferAcl(false),
aclSource("UNKNOWN"),
connQuotaRulesExist(false),
- connQuotaRuleSettings(new quotaRuleSet)
+ connQuotaRuleSettings(new quotaRuleSet),
+ queueQuotaRulesExist(false),
+ queueQuotaRuleSettings(new quotaRuleSet)
{
for (unsigned int cnt=0; cnt< qpid::acl::ACTIONSIZE; cnt++)
{
@@ -73,6 +76,8 @@ namespace acl {
transferAcl = false;
connQuotaRulesExist = false;
connQuotaRuleSettings->clear();
+ queueQuotaRulesExist = false;
+ queueQuotaRuleSettings->clear();
}
@@ -561,6 +566,62 @@ namespace acl {
//
//
//
+ void AclData::setQueueQuotaRuleSettings (
+ bool rulesExist, boost::shared_ptr<quotaRuleSet> quotaPtr)
+ {
+ queueQuotaRulesExist = rulesExist;
+ queueQuotaRuleSettings = quotaPtr;
+ }
+
+
+ //
+ // getQueueQuotaForUser
+ //
+ // Return the true or false value of queueQuotaRulesExist,
+ // indicating whether any kind of lookup was done or not.
+ //
+ // When lookups are performed return the result value of
+ // 1. The user's setting else
+ // 2. The 'all' user setting else
+ // 3. Zero
+ // When lookups are not performed then return a result value of Zero.
+ //
+ bool AclData::getQueueQuotaForUser(const std::string& theUserName,
+ uint16_t* theResult) const {
+ if (queueQuotaRulesExist) {
+ // look for this user explicitly
+ quotaRuleSetItr nameItr = (*queueQuotaRuleSettings).find(theUserName);
+ if (nameItr != (*queueQuotaRuleSettings).end()) {
+ QPID_LOG(trace, "ACL: Queue quota for user " << theUserName
+ << " explicitly set to : " << (*nameItr).second);
+ *theResult = (*nameItr).second;
+ } else {
+ // Look for the 'all' user
+ nameItr = (*queueQuotaRuleSettings).find(ACL_KEYWORD_ALL);
+ if (nameItr != (*queueQuotaRuleSettings).end()) {
+ QPID_LOG(trace, "ACL: Queue quota for user " << theUserName
+ << " chosen through value for 'all' : " << (*nameItr).second);
+ *theResult = (*nameItr).second;
+ } else {
+ // Neither userName nor "all" found.
+ QPID_LOG(trace, "ACL: Queue quota for user " << theUserName
+ << " absent in quota settings. Return value : 0");
+ *theResult = 0;
+ }
+ }
+ } else {
+ // Rules do not exist
+ QPID_LOG(trace, "ACL: Queue quota for user " << theUserName
+ << " unavailable; quota settings are not specified. Return value : 0");
+ *theResult = 0;
+ }
+ return queueQuotaRulesExist;
+ }
+
+
+ //
+ //
+ //
AclData::~AclData()
{
clear();
diff --git a/qpid/cpp/src/qpid/acl/AclData.h b/qpid/cpp/src/qpid/acl/AclData.h
index 43cb5193f5..c561b95e09 100644
--- a/qpid/cpp/src/qpid/acl/AclData.h
+++ b/qpid/cpp/src/qpid/acl/AclData.h
@@ -144,6 +144,7 @@ public:
static const std::string ACL_KEYWORD_GROUP;
static const std::string ACL_KEYWORD_QUOTA;
static const std::string ACL_KEYWORD_QUOTA_CONNECTIONS;
+ static const std::string ACL_KEYWORD_QUOTA_QUEUES;
static const char ACL_SYMBOL_WILDCARD;
static const std::string ACL_KEYWORD_WILDCARD;
static const char ACL_SYMBOL_LINE_CONTINUATION;
@@ -164,6 +165,13 @@ public:
bool enforcingConnectionQuotas() { return connQuotaRulesExist; }
bool getConnQuotaForUser(const std::string&, uint16_t*) const;
+ // Per user queue quotas extracted from acl rule file
+ // Set by reader
+ void setQueueQuotaRuleSettings (bool, boost::shared_ptr<quotaRuleSet>);
+ // Get by queue approvers
+ bool enforcingQueueQuotas() { return queueQuotaRulesExist; }
+ bool getQueueQuotaForUser(const std::string&, uint16_t*) const;
+
/** getConnectMaxSpec
* Connection quotas are held in uint16_t variables.
* This function specifies the largest value that a user is allowed
@@ -181,6 +189,12 @@ public:
return "65530";
}
+ static uint16_t getQueueMaxSpec() {
+ return 65530;
+ }
+ static std::string getMaxQueueSpecStr() {
+ return "65530";
+ }
AclData();
virtual ~AclData();
@@ -197,6 +211,9 @@ private:
// Per-user connection quota
bool connQuotaRulesExist;
boost::shared_ptr<quotaRuleSet> connQuotaRuleSettings; // Map of user-to-N values from rule file
+ // Per-user queue quota
+ bool queueQuotaRulesExist;
+ boost::shared_ptr<quotaRuleSet> queueQuotaRuleSettings; // Map of user-to-N values from rule file
};
}} // namespace qpid::acl
diff --git a/qpid/cpp/src/qpid/acl/AclReader.cpp b/qpid/cpp/src/qpid/acl/AclReader.cpp
index 0fc6437a6c..1fd5445b52 100644
--- a/qpid/cpp/src/qpid/acl/AclReader.cpp
+++ b/qpid/cpp/src/qpid/acl/AclReader.cpp
@@ -222,6 +222,8 @@ namespace acl {
// connection quota
d->setConnQuotaRuleSettings(connQuotaRulesExist, connQuota);
+ // queue quota
+ d->setQueueQuotaRuleSettings(queueQuotaRulesExist, queueQuota);
}
@@ -238,11 +240,15 @@ namespace acl {
}
}
- AclReader::AclReader(uint16_t theCliMaxConnPerUser) : lineNumber(0), contFlag(false),
+ AclReader::AclReader(uint16_t theCliMaxConnPerUser, uint16_t theCliMaxQueuesPerUser) :
+ lineNumber(0), contFlag(false),
validationMap(new AclHelper::objectMap),
cliMaxConnPerUser (theCliMaxConnPerUser),
connQuotaRulesExist(false),
- connQuota(new AclData::quotaRuleSet) {
+ connQuota(new AclData::quotaRuleSet),
+ cliMaxQueuesPerUser (theCliMaxQueuesPerUser),
+ queueQuotaRulesExist(false),
+ queueQuota(new AclData::quotaRuleSet) {
AclHelper::loadValidationMap(validationMap);
names.insert(AclData::ACL_KEYWORD_WILDCARD);
}
@@ -267,6 +273,11 @@ namespace acl {
connQuotaRulesExist = true;
(*connQuota)[AclData::ACL_KEYWORD_ALL] = cliMaxConnPerUser;
}
+ // Propagate nonzero per-user max queue setting from CLI
+ if (cliMaxQueuesPerUser > 0) {
+ queueQuotaRulesExist = true;
+ (*queueQuota)[AclData::ACL_KEYWORD_ALL] = cliMaxQueuesPerUser;
+ }
// Loop to process the Acl file
try {
bool err = false;
@@ -296,7 +307,8 @@ namespace acl {
}
printNames();
printRules();
- printConnectionQuotas();
+ printQuotas(AclData::ACL_KEYWORD_QUOTA_CONNECTIONS, connQuota);
+ printQuotas(AclData::ACL_KEYWORD_QUOTA_QUEUES, queueQuota);
loadDecisionData(d);
return 0;
@@ -370,61 +382,69 @@ namespace acl {
}
if (toks[1].compare(AclData::ACL_KEYWORD_QUOTA_CONNECTIONS) == 0) {
- return processQuotaConnLine(toks);
+ if (processQuotaLine(toks, AclData::ACL_KEYWORD_QUOTA_CONNECTIONS, AclData::getConnectMaxSpec(), connQuota)) {
+ // We have processed a connection quota rule
+ connQuotaRulesExist = true;
+ return true;
+ }
+ } else if (toks[1].compare(AclData::ACL_KEYWORD_QUOTA_QUEUES) == 0) {
+ if (processQuotaLine(toks, AclData::ACL_KEYWORD_QUOTA_QUEUES, AclData::getConnectMaxSpec(), queueQuota)) {
+ // We have processed a queue quota rule
+ queueQuotaRulesExist = true;
+ return true;
+ }
} else {
errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
<< ", Quota type \"" << toks[1] << "\" unrecognized.";
return false;
}
+ return false;
}
- // Process 'quota connections' rule lines
+ // Process quota rule lines
// Return true if the line is successfully processed without errors
- bool AclReader::processQuotaConnLine(tokList& toks) {
+ bool AclReader::processQuotaLine(tokList& toks, const std::string theNoun, uint16_t maxSpec, aclQuotaRuleSet theRules) {
const unsigned toksSize = toks.size();
- uint16_t nConns(0);
+ uint16_t nEntities(0);
try {
- nConns = boost::lexical_cast<uint16_t>(toks[2]);
+ nEntities = boost::lexical_cast<uint16_t>(toks[2]);
} catch(const boost::bad_lexical_cast&) {
errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
- << ", Connection quota value \"" << toks[2]
+ << ", " << theNoun << " quota value \"" << toks[2]
<< "\" cannot be converted to a 16-bit unsigned integer.";
return false;
}
- // limit check the connection setting
- if (nConns > AclData::getConnectMaxSpec())
+ // limit check the setting
+ if (nEntities > maxSpec)
{
errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
- << ", Connection quota value \"" << toks[2]
+ << ", " << theNoun << " quota value \"" << toks[2]
<< "\" exceeds maximum configuration setting of "
- << AclData::getConnectMaxSpec();
+ << maxSpec;
return false;
}
- // Apply the connection count to all names in rule
+ // Apply the ount to all names in rule
for (unsigned idx = 3; idx < toksSize; idx++) {
if (groups.find(toks[idx]) == groups.end()) {
// This is the name of an individual, not a group
- (*connQuota)[toks[idx]] = nConns;
+ (*theRules)[toks[idx]] = nEntities;
} else {
- if (!processQuotaConnGroup(toks[idx], nConns))
+ if (!processQuotaGroup(toks[idx], nEntities, theRules))
return false;
}
}
- // We have processed a connection quota rule
- connQuotaRulesExist = true;
-
return true;
}
- // Process 'quota connections' group expansion
+ // Process quota group expansion
// Return true if the quota is applied to all members of the group
- bool AclReader::processQuotaConnGroup(const std::string& theGroup, uint16_t theQuota) {
+ bool AclReader::processQuotaGroup(const std::string& theGroup, uint16_t theQuota, aclQuotaRuleSet theRules) {
gmCitr citr = groups.find(theGroup);
if (citr == groups.end()) {
@@ -435,9 +455,9 @@ namespace acl {
for (nsCitr gni=citr->second->begin(); gni!=citr->second->end(); gni++) {
if (groups.find(*gni) == groups.end()) {
- (*connQuota)[*gni] = theQuota;
+ (*theRules)[*gni] = theQuota;
} else {
- if (!processQuotaConnGroup(*gni, theQuota))
+ if (!processQuotaGroup(*gni, theQuota, theRules))
return false;
}
}
@@ -445,14 +465,14 @@ namespace acl {
}
- void AclReader::printConnectionQuotas() const {
- QPID_LOG(debug, "ACL: connection quota: " << (*connQuota).size() << " rules found:");
+ void AclReader::printQuotas(const std::string theNoun, aclQuotaRuleSet theRules) const {
+ QPID_LOG(debug, "ACL: " << theNoun << " quota: " << (*theRules).size() << " rules found:");
int cnt = 1;
- for (AclData::quotaRuleSetItr itr=(*connQuota).begin();
- itr != (*connQuota).end();
+ for (AclData::quotaRuleSetItr itr=(*theRules).begin();
+ itr != (*theRules).end();
++itr,++cnt) {
QPID_LOG(debug, "ACL: quota " << cnt << " : " << (*itr).second
- << " connections for " << (*itr).first)
+ << " " << theNoun << " for " << (*itr).first)
}
}
diff --git a/qpid/cpp/src/qpid/acl/AclReader.h b/qpid/cpp/src/qpid/acl/AclReader.h
index 1fa374c59c..e3a345998d 100644
--- a/qpid/cpp/src/qpid/acl/AclReader.h
+++ b/qpid/cpp/src/qpid/acl/AclReader.h
@@ -75,16 +75,17 @@ class AclReader {
private:
void processName(const std::string& name, const groupMap& groups);
};
- typedef boost::shared_ptr<aclRule> aclRulePtr;
- typedef std::vector<aclRulePtr> ruleList;
- typedef ruleList::const_iterator rlCitr;
+ typedef boost::shared_ptr<AclData::quotaRuleSet> aclQuotaRuleSet;
+ typedef boost::shared_ptr<aclRule> aclRulePtr;
+ typedef std::vector<aclRulePtr> ruleList;
+ typedef ruleList::const_iterator rlCitr;
- typedef std::vector<std::string> tokList;
- typedef tokList::const_iterator tlCitr;
+ typedef std::vector<std::string> tokList;
+ typedef tokList::const_iterator tlCitr;
- typedef std::set<std::string> keywordSet;
- typedef keywordSet::const_iterator ksCitr;
- typedef std::pair<std::string, std::string> nvPair; // Name-Value pair
+ typedef std::set<std::string> keywordSet;
+ typedef keywordSet::const_iterator ksCitr;
+ typedef std::pair<std::string, std::string> nvPair; // Name-Value pair
std::string fileName;
int lineNumber;
@@ -97,7 +98,7 @@ class AclReader {
std::ostringstream errorStream;
public:
- AclReader(uint16_t cliMaxConnPerUser);
+ AclReader(uint16_t cliMaxConnPerUser, uint16_t cliMaxQueuesPerUser);
virtual ~AclReader();
int read(const std::string& fn, boost::shared_ptr<AclData> d); // return=0 for success
std::string getError();
@@ -118,16 +119,20 @@ class AclReader {
bool isValidUserName(const std::string& name);
bool processQuotaLine(tokList& toks);
- bool processQuotaConnLine(tokList& toks);
- bool processQuotaConnGroup(const std::string&, uint16_t);
- void printConnectionQuotas() const;
+ bool processQuotaLine(tokList& toks, const std::string theNoun, uint16_t maxSpec, aclQuotaRuleSet theRules);
+ bool processQuotaGroup(const std::string&, uint16_t, aclQuotaRuleSet theRules);
+ void printQuotas(const std::string theNoun, aclQuotaRuleSet theRules) const;
static bool isValidGroupName(const std::string& name);
static nvPair splitNameValuePair(const std::string& nvpString);
const uint16_t cliMaxConnPerUser;
bool connQuotaRulesExist;
- boost::shared_ptr<AclData::quotaRuleSet> connQuota;
+ aclQuotaRuleSet connQuota;
+
+ const uint16_t cliMaxQueuesPerUser;
+ bool queueQuotaRulesExist;
+ aclQuotaRuleSet queueQuota;
};
}} // namespace qpid::acl
diff --git a/qpid/cpp/src/qpid/acl/AclResourceCounter.cpp b/qpid/cpp/src/qpid/acl/AclResourceCounter.cpp
index 66dfd0777e..2527af6375 100644
--- a/qpid/cpp/src/qpid/acl/AclResourceCounter.cpp
+++ b/qpid/cpp/src/qpid/acl/AclResourceCounter.cpp
@@ -54,34 +54,43 @@ ResourceCounter::~ResourceCounter() {}
// Called with lock held.
//
bool ResourceCounter::limitApproveLH(
- const std::string& theTitle,
countsMap_t& theMap,
const std::string& theName,
uint16_t theLimit,
- bool emitLog) {
+ bool emitLog,
+ bool enforceLimit) {
bool result(true);
- if (theLimit > 0) {
- uint16_t count;
- countsMap_t::iterator eRef = theMap.find(theName);
- if (eRef != theMap.end()) {
- count = (uint16_t)(*eRef).second;
- result = count < theLimit;
- if (result) {
- count += 1;
- (*eRef).second = count;
+ uint16_t count;
+ countsMap_t::iterator eRef = theMap.find(theName);
+ if (eRef != theMap.end()) {
+ count = (uint16_t)(*eRef).second;
+ result = (enforceLimit ? count < theLimit : true);
+ if (result) {
+ count += 1;
+ (*eRef).second = count;
+ }
+ } else {
+ // user not found in map
+ if (enforceLimit) {
+ if (theLimit > 0) {
+ theMap[theName] = count = 1;
+ } else {
+ count = 0;
+ result = false;
}
- } else {
- // Not found
- theMap[theName] = count = 1;
}
- if (emitLog) {
- QPID_LOG(trace, theTitle << theName
- << " limit=" << theLimit
- << " curValue=" << count
- << " result=" << (result ? "allow" : "deny"));
+ else {
+ // not enforcing the limit
+ theMap[theName] = count = 1;
}
}
+ if (emitLog) {
+ QPID_LOG(trace, "ACL QueueApprover user=" << theName
+ << " limit=" << theLimit
+ << " curValue=" << count
+ << " result=" << (result ? "allow" : "deny"));
+ }
return result;
}
@@ -92,24 +101,21 @@ bool ResourceCounter::limitApproveLH(
// Decrement the name's count in map.
// called with dataLock already taken
//
-void ResourceCounter::releaseLH(
- const std::string& theTitle, countsMap_t& theMap, const std::string& theName, uint16_t theLimit) {
-
- if (theLimit > 0) {
- countsMap_t::iterator eRef = theMap.find(theName);
- if (eRef != theMap.end()) {
- uint16_t count = (uint16_t) (*eRef).second;
- assert (count > 0);
- if (1 == count) {
- theMap.erase (eRef);
- } else {
- (*eRef).second = count - 1;
- }
+void ResourceCounter::releaseLH(countsMap_t& theMap, const std::string& theName) {
+
+ countsMap_t::iterator eRef = theMap.find(theName);
+ if (eRef != theMap.end()) {
+ uint16_t count = (uint16_t) (*eRef).second;
+ assert (count > 0);
+ if (1 == count) {
+ theMap.erase (eRef);
} else {
- // User had no connections.
- QPID_LOG(notice, theTitle << theName
- << "' not found in resource count pool");
+ (*eRef).second = count - 1;
}
+ } else {
+ // User had no connections.
+ QPID_LOG(notice, "ACL resource counter: Queue owner for queue '" << theName
+ << "' not found in resource count pool");
}
}
@@ -119,11 +125,14 @@ void ResourceCounter::releaseLH(
// Count an attempted queue creation by this user.
// Disapprove if over limit.
//
-bool ResourceCounter::approveCreateQueue(const std::string& userId, const std::string& queueName)
+bool ResourceCounter::approveCreateQueue(const std::string& userId,
+ const std::string& queueName,
+ bool enforcingQueueQuotas,
+ uint16_t queueUserQuota )
{
Mutex::ScopedLock locker(dataLock);
- bool okByQ = limitApproveLH("ACL Queue creation approver. userId:", queuePerUserMap, userId, queueLimit, true);
+ bool okByQ = limitApproveLH(queuePerUserMap, userId, queueUserQuota, true, enforcingQueueQuotas);
if (okByQ) {
// Queue is owned by this userId
@@ -133,7 +142,7 @@ bool ResourceCounter::approveCreateQueue(const std::string& userId, const std::s
<< "' queue '" << queueName << "'");
} else {
- QPID_LOG(error, "Client max queue count limit of " << queueLimit
+ QPID_LOG(error, "Client max queue count limit of " << queueUserQuota
<< " exceeded by '" << userId << "' creating queue '"
<< queueName << "'. Queue creation denied.");
@@ -153,7 +162,7 @@ void ResourceCounter::recordDestroyQueue(const std::string& queueName)
queueOwnerMap_t::iterator eRef = queueOwnerMap.find(queueName);
if (eRef != queueOwnerMap.end()) {
- releaseLH("ACL resource counter: Queue owner for queue '", queuePerUserMap, (*eRef).second, queueLimit);
+ releaseLH(queuePerUserMap, (*eRef).second);
queueOwnerMap.erase(eRef);
} else {
diff --git a/qpid/cpp/src/qpid/acl/AclResourceCounter.h b/qpid/cpp/src/qpid/acl/AclResourceCounter.h
index f5995eb961..79c4d43e38 100644
--- a/qpid/cpp/src/qpid/acl/AclResourceCounter.h
+++ b/qpid/cpp/src/qpid/acl/AclResourceCounter.h
@@ -52,24 +52,25 @@ private:
countsMap_t queuePerUserMap;
/** Return approval for proposed resource creation */
- bool limitApproveLH(const std::string& theTitle,
- countsMap_t& theMap,
+ bool limitApproveLH(countsMap_t& theMap,
const std::string& theName,
uint16_t theLimit,
- bool emitLog);
+ bool emitLog,
+ bool enforceLimit);
/** Release a connection */
- void releaseLH(const std::string& theTitle,
- countsMap_t& theMap,
- const std::string& theName,
- uint16_t theLimit);
+ void releaseLH(countsMap_t& theMap,
+ const std::string& theName);
public:
ResourceCounter(Acl& acl, uint16_t ql);
~ResourceCounter();
// Queue counting
- bool approveCreateQueue(const std::string& userId, const std::string& queueName);
+ bool approveCreateQueue(const std::string& userId,
+ const std::string& queueName,
+ bool enforcingQueueQuotas,
+ uint16_t queueUserQuota );
void recordDestroyQueue(const std::string& queueName);
};
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 595063d6c5..94ede22783 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -2871,58 +2871,142 @@ class ACLTests(TestBase010):
# Queue per-user quota
#=====================================
- def test_queue_per_user_quota(self):
+ def queue_quota(self, user, passwd, count, byPort=None):
+ """ Helper method to:
+ - create a number of queues (should succeed)
+ - create too many queues (should fail)
+ - create another queue after deleting a queue (should succeed)
"""
- Test ACL queue counting limits.
- port_q has a limit of 2
- """
- # bob should be able to create two queues
- session = self.get_session_by_port('bob','bob', self.port_q())
try:
- session.queue_declare(queue="queue1")
- session.queue_declare(queue="queue2")
- except qpid.session.SessionException, e:
- self.fail("Error during queue create request");
+ if byPort:
+ session = self.get_session_by_port(user, passwd, byPort)
+ else:
+ session = self.get_session(user, passwd)
+ except Exception, e:
+ self.fail("Unexpected error creating session for %s: %s" % (user, str(e)))
- # third queue should fail
+ # Should be able to create count queues per user
try:
- session.queue_declare(queue="queue3")
- self.fail("Should not be able to create third queue")
+ for i in range(count):
+ session.queue_declare(queue="%s%d" % (user, i))
except Exception, e:
- result = None
- session = self.get_session_by_port('bob','bob', self.port_q())
-
- # alice should be able to create two queues
- session2 = self.get_session_by_port('alice','alice', self.port_q())
+ self.fail("Could not create %s for %s: %s" % ("%s%d" % (user, i), user, str(e)))
+ # next queue should fail
try:
- session2.queue_declare(queue="queuea1")
- session2.queue_declare(queue="queuea2")
- except qpid.session.SessionException, e:
- self.fail("Error during queue create request");
+ session.queue_declare(queue="%s%d" % (user, count))
+ self.fail("Should not be able to create another queue for user %s" % user)
+ except Exception, e:
+ if byPort:
+ session = self.get_session_by_port(user, passwd, byPort)
+ else:
+ session = self.get_session(user, passwd)
- # third queue should fail
+ if count > 0:
+ # Deleting a queue should allow another queue.
+ session.queue_delete(queue="%s0" % user)
+ try:
+ session.queue_declare(queue="%s%d" % (user, count))
+ except Exception, e:
+ self.fail("Could not recreate additional queue for user %s: %s " % (user, str(e)))
+
+ # Clean up
+ for i in range(1, count+1):
+ session.queue_delete(queue="%s%d" % (user, i))
try:
- session2.queue_declare(queue="queuea3")
- self.fail("Should not be able to create third queue")
+ session.close()
except Exception, e:
- result = None
- session2 = self.get_session_by_port('alice','alice', self.port_q())
+ pass
- # bob should be able to delete a queue and create another
- try:
- session.queue_delete(queue="queue1")
- session.queue_declare(queue="queue3")
- except qpid.session.SessionException, e:
- self.fail("Error during queue create request");
+ def test_queue_per_named_user_quota(self):
+ """
+ Test ACL queue counting limits per named user.
+ """
+ aclf = self.get_acl_file()
+ aclf.write('quota queues 2 ted@QPID carrol@QPID\n')
+ aclf.write('quota queues 1 edward@QPID\n')
+ aclf.write('quota queues 0 mick@QPID\n')
+ aclf.write('acl allow all all')
+ aclf.close()
- # alice should be able to delete a queue and create another
- try:
- session2.queue_delete(queue="queuea1")
- session2.queue_declare(queue="queuea3")
- except qpid.session.SessionException, e:
- self.fail("Error during queue create request");
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ # named users should be able to create specified number of queues
+ self.queue_quota("ted", 'ted', 2)
+ self.queue_quota("carrol", 'carrol', 2)
+ self.queue_quota("edward", 'edward', 1)
+
+ # User with quota of 0 is denied
+ self.queue_quota("mick", 'mick', 0)
+
+ # User not named in quotas is denied
+ self.queue_quota("dan", 'dan', 0)
+
+ def test_queue_per_user_quota(self):
+ """
+ Test ACL queue counting limits.
+ port_q has a limit of 2
+ """
+ # bob should be able to create two queues
+ self.queue_quota("bob", 'bob', 2, self.port_q())
+
+ # alice should be able to create two queues
+ self.queue_quota("alice", 'alice', 2, self.port_q())
+
+
+ def test_queue_limits_by_unnamed_all(self):
+ """
+ Test ACL control queue limits
+ """
+ aclf = self.get_acl_file()
+ aclf.write('quota queues 2 aliceQUA@QPID bobQUA@QPID\n')
+ aclf.write('quota queues 1 all\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ # By username should be able to connect twice per user
+ self.queue_quota('aliceQUA', 'alice', 2)
+ self.queue_quota('bobQUA', 'bob', 2)
+
+ # User not named in quotas gets 'all' quota
+ self.queue_quota('charlieQUA', 'charlie', 1)
+
+
+ def test_queue_limits_by_group(self):
+ """
+ Test ACL control queue limits
+ """
+ aclf = self.get_acl_file()
+ aclf.write('group hobbits frodoGR@QPID samGR@QPID merryGR@QPID\n')
+ aclf.write('quota queues 2 gandalfGR@QPID aragornGR@QPID\n')
+ aclf.write('quota queues 2 hobbits rosieGR@QPID\n')
+ aclf.write('# user and groups may be overwritten. Should use last value\n')
+ aclf.write('quota queues 3 aragornGR@QPID hobbits\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ # gandalf gets 2
+ self.queue_quota('gandalfGR', 'gandalf', 2)
+
+ # aragorn gets 3
+ self.queue_quota('aragornGR', 'aragorn', 3)
+
+ # frodo gets 3
+ self.queue_quota('frodoGR', 'frodo', 3)
+
+ # User not named in quotas is denied
+ self.queue_quota('bilboGR', 'bilbo', 0)
class BrokerAdmin:
def __init__(self, broker, username=None, password=None):