summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-10-08 14:05:59 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-10-10 13:51:23 -0700
commitfc358077686b1cc4044a3adfc61757a63d816e76 (patch)
tree31993e62c8b3820e87b6c9720ef2ddd4267c4951
parent2e4ecc29de9c65224e2c6870a4c0a26cc90ec784 (diff)
downloadceph-fc358077686b1cc4044a3adfc61757a63d816e76.tar.gz
rgw: protect against concurrent async quota updates
Leverage the cache lru_map locking for making sure that we don't end up with more than a single concurrent async update on the same bucket within the same update window. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/common/lru_map.h8
-rw-r--r--src/rgw/rgw_quota.cc31
2 files changed, 29 insertions, 10 deletions
diff --git a/src/common/lru_map.h b/src/common/lru_map.h
index 62182dd26e8..98fb44a932e 100644
--- a/src/common/lru_map.h
+++ b/src/common/lru_map.h
@@ -24,7 +24,7 @@ public:
class UpdateContext {
public:
virtual ~UpdateContext() {}
- virtual void update(V& v) = 0;
+ virtual bool update(V& v) = 0;
};
bool _find(const K& key, V *value, UpdateContext *ctx);
@@ -51,8 +51,10 @@ bool lru_map<K, V>::_find(const K& key, V *value, UpdateContext *ctx)
entry& e = iter->second;
entries_lru.erase(e.lru_iter);
+ bool r = true;
+
if (ctx)
- ctx->update(e.value);
+ r = ctx->update(e.value);
if (value)
*value = e.value;
@@ -60,7 +62,7 @@ bool lru_map<K, V>::_find(const K& key, V *value, UpdateContext *ctx)
entries_lru.push_front(key);
e.lru_iter = entries_lru.begin();
- return true;
+ return r;
}
template <class K, class V>
diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc
index 984b3b57dea..11da4bf0177 100644
--- a/src/rgw/rgw_quota.cc
+++ b/src/rgw/rgw_quota.cc
@@ -114,15 +114,30 @@ void AsyncRefreshHandler::handle_response(int r)
cache->async_refresh_response(bucket, bs);
}
+class RGWBucketStatsAsyncTestSet : public lru_map<rgw_bucket, RGWQuotaBucketStats>::UpdateContext {
+ int objs_delta;
+ uint64_t added_bytes;
+ uint64_t removed_bytes;
+public:
+ RGWBucketStatsAsyncTestSet() {}
+ bool update(RGWQuotaBucketStats& entry) {
+ if (entry.async_refresh_time.sec() == 0)
+ return false;
+
+ entry.async_refresh_time = utime_t(0, 0);
+
+ return true;
+ }
+};
+
int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs)
{
-#if 0
- if (qs.async_update_flag.inc() != 1) { /* are we the first one here? */
- qs.async_update_flag.dec();
+ /* protect against multiple updates */
+ RGWBucketStatsAsyncTestSet test_update;
+ if (!stats_map.find_and_update(bucket, NULL, &test_update)) {
+ /* most likely we just raced with another update */
return 0;
}
-#endif
-#warning protect against multiple updates
async_refcount->get();
@@ -166,7 +181,7 @@ int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& st
RGWQuotaBucketStats qs;
utime_t now = ceph_clock_now(store->ctx());
if (stats_map.find(bucket, qs)) {
- if (now >= qs.async_refresh_time) {
+ if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
int r = async_refresh(bucket, qs);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
@@ -197,13 +212,15 @@ class RGWBucketStatsUpdate : public lru_map<rgw_bucket, RGWQuotaBucketStats>::Up
public:
RGWBucketStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) :
objs_delta(_objs_delta), added_bytes(_added_bytes), removed_bytes(_removed_bytes) {}
- void update(RGWQuotaBucketStats& entry) {
+ bool update(RGWQuotaBucketStats& entry) {
uint64_t rounded_kb_added = rgw_rounded_kb(added_bytes);
uint64_t rounded_kb_removed = rgw_rounded_kb(removed_bytes);
entry.stats.num_kb_rounded += (rounded_kb_added - rounded_kb_removed);
entry.stats.num_kb += (added_bytes - removed_bytes) / 1024;
entry.stats.num_objects += objs_delta;
+
+ return true;
}
};