summaryrefslogtreecommitdiff
path: root/src/rgw
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2011-08-23 18:52:08 -0700
committerYehuda Sadeh <yehuda@hq.newdream.net>2011-08-23 18:52:08 -0700
commit190b89364c81eda645a129976ca80e0db146ddd9 (patch)
treeacc943b6c319b9f1722d80df0a79b2535451dd9e /src/rgw
parentadf1efe9a94efa9aed2ebda9bbe6e956769d39c6 (diff)
parent1a7eac4d82ec34e04af30f3858fec5231408576e (diff)
downloadceph-190b89364c81eda645a129976ca80e0db146ddd9.tar.gz
Merge remote-tracking branch 'origin/master' into wip-decouple-bucket
Conflicts: src/rgw/rgw_rados.cc
Diffstat (limited to 'src/rgw')
-rw-r--r--src/rgw/rgw_access.h5
-rw-r--r--src/rgw/rgw_cache.cc7
-rw-r--r--src/rgw/rgw_cache.h3
-rw-r--r--src/rgw/rgw_common.h7
-rw-r--r--src/rgw/rgw_op.cc3
-rw-r--r--src/rgw/rgw_rados.cc101
-rw-r--r--src/rgw/rgw_rados.h37
7 files changed, 133 insertions, 30 deletions
diff --git a/src/rgw/rgw_access.h b/src/rgw/rgw_access.h
index 22a1ef913f2..2038bfd73fa 100644
--- a/src/rgw/rgw_access.h
+++ b/src/rgw/rgw_access.h
@@ -188,7 +188,7 @@ public:
info.dst_ofs = dst_ofs;
info.len = size;
v.push_back(info);
- return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, true);
+ return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, true, false);
}
virtual int clone_objs(void *ctx, rgw_obj& dst_obj,
@@ -196,7 +196,8 @@ public:
map<string, bufferlist> attrs,
string& category,
time_t *pmtime,
- bool truncate_dest) { return -ENOTSUP; }
+ bool truncate_dest,
+ bool exclusive) { return -ENOTSUP; }
/**
* a simple object read without keeping state
*/
diff --git a/src/rgw/rgw_cache.cc b/src/rgw/rgw_cache.cc
index 4dde65ffe09..249d8baa4a2 100644
--- a/src/rgw/rgw_cache.cc
+++ b/src/rgw/rgw_cache.cc
@@ -7,6 +7,8 @@ using namespace std;
int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask)
{
+ Mutex::Locker l(lock);
+
map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name);
if (iter == cache_map.end()) {
RGW_LOG(10) << "cache get: name=" << name << " : miss" << dendl;
@@ -29,6 +31,8 @@ int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask)
void ObjectCache::put(string& name, ObjectCacheInfo& info)
{
+ Mutex::Locker l(lock);
+
RGW_LOG(10) << "cache put: name=" << name << dendl;
map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name);
if (iter == cache_map.end()) {
@@ -67,6 +71,8 @@ void ObjectCache::put(string& name, ObjectCacheInfo& info)
void ObjectCache::remove(string& name)
{
+ Mutex::Locker l(lock);
+
map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name);
if (iter == cache_map.end())
return;
@@ -93,7 +99,6 @@ void ObjectCache::touch_lru(string& name, std::list<string>::iterator& lru_iter)
lru_iter--;
RGW_LOG(10) << "adding " << name << " to cache LRU end" << dendl;
} else {
- string name = *lru_iter;
RGW_LOG(10) << "moving " << name << " to cache LRU end" << dendl;
lru.erase(lru_iter);
lru.push_back(name);
diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h
index 1fe8d09410a..d5c8c11ed13 100644
--- a/src/rgw/rgw_cache.h
+++ b/src/rgw/rgw_cache.h
@@ -104,11 +104,12 @@ struct ObjectCacheEntry {
class ObjectCache {
std::map<string, ObjectCacheEntry> cache_map;
std::list<string> lru;
+ Mutex lock;
void touch_lru(string& name, std::list<string>::iterator& lru_iter);
void remove_lru(string& name, std::list<string>::iterator& lru_iter);
public:
- ObjectCache() { }
+ ObjectCache() : lock("ObjectCache") { }
int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask);
void put(std::string& name, ObjectCacheInfo& bl);
void remove(std::string& name);
diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h
index 9cbbb4f12d8..aa276dbde2b 100644
--- a/src/rgw/rgw_common.h
+++ b/src/rgw/rgw_common.h
@@ -661,6 +661,13 @@ public:
set_key(orig_key);
}
+ string loc() {
+ if (orig_key.empty())
+ return orig_obj;
+ else
+ return orig_key;
+ }
+
static bool translate_raw_obj(string& obj, string& ns) {
if (ns.empty()) {
if (obj[0] != '_')
diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc
index d8548882110..9ad7607cd54 100644
--- a/src/rgw/rgw_op.cc
+++ b/src/rgw/rgw_op.cc
@@ -274,6 +274,7 @@ void RGWGetObj::execute()
goto done;
while (ofs <= end) {
+ data = NULL;
ret = rgwstore->get_obj(s->obj_ctx, &handle, obj, &data, ofs, end);
if (ret < 0) {
goto done;
@@ -1335,7 +1336,7 @@ void RGWCompleteMultipart::execute()
ofs += obj_iter->second.size;
}
- ret = rgwstore->clone_objs(s->obj_ctx, target_obj, ranges, attrs, rgw_obj_category_main, NULL, true);
+ ret = rgwstore->clone_objs(s->obj_ctx, target_obj, ranges, attrs, rgw_obj_category_main, NULL, true, false);
if (ret < 0)
goto done;
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 6e5e9813d36..9b7855183cc 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -315,7 +315,7 @@ int RGWRados::create_bucket(std::string& id, rgw_bucket& bucket, map<std::string
op.setxattr(iter->first.c_str(), iter->second);
bufferlist outbl;
- int ret = root_pool_ctx.operate(bucket.name, &op, &outbl);
+ int ret = root_pool_ctx.operate(bucket.name, &op);
if (ret < 0)
return ret;
@@ -407,7 +407,7 @@ int RGWRados::put_obj_meta(void *ctx, std::string& id, rgw_obj& obj,
if (!op.size())
return 0;
- r = io_ctx.operate(oid, &op, NULL);
+ r = io_ctx.operate(oid, &op);
if (r < 0)
return r;
@@ -716,6 +716,7 @@ int RGWRados::bucket_suspended(rgw_bucket& bucket, bool *suspended)
*suspended = (auid == RGW_SUSPENDED_USER_AUID);
return 0;
}
+
/**
* Delete an object.
* id: unused
@@ -723,7 +724,7 @@ int RGWRados::bucket_suspended(rgw_bucket& bucket, bool *suspended)
* obj: name of the object to delete
* Returns: 0 on success, -ERR# otherwise.
*/
-int RGWRados::delete_obj(void *ctx, std::string& id, rgw_obj& obj, bool sync)
+int RGWRados::delete_obj_impl(void *ctx, std::string& id, rgw_obj& obj, bool sync)
{
rgw_bucket& bucket = obj.bucket;
std::string& oid = obj.object;
@@ -744,18 +745,30 @@ int RGWRados::delete_obj(void *ctx, std::string& id, rgw_obj& obj, bool sync)
op.remove();
if (sync) {
- r = io_ctx.operate(oid, &op, NULL);
+ r = io_ctx.operate(oid, &op);
} else {
librados::AioCompletion *completion = rados->aio_create_completion(NULL, NULL, NULL);
- r = io_ctx.aio_operate(obj.object, completion, &op, NULL);
+ r = io_ctx.aio_operate(obj.object, completion, &op);
completion->release();
}
+
+ atomic_write_finish(state, r);
+
if (r < 0)
return r;
return 0;
}
+int RGWRados::delete_obj(void *ctx, std::string& id, rgw_obj& obj, bool sync)
+{
+ int r;
+ do {
+ r = delete_obj_impl(ctx, id, obj, sync);
+ } while (r == -ECANCELED);
+ return r;
+}
+
int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state)
{
RGWObjState *s = rctx->get_state(obj);
@@ -870,12 +883,9 @@ int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCt
return 0;
}
-int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate)
{
- if (!rctx)
- return 0;
-
int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
if (r < 0)
return r;
@@ -888,6 +898,7 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados
if (state->obj_tag.length() == 0 ||
state->shadow_obj.size() == 0) {
RGW_LOG(0) << "can't clone object " << obj << " to shadow object, tag/shadow_obj haven't been set" << dendl;
+ // FIXME: need to test object does not exist
} else {
RGW_LOG(0) << "cloning object " << obj << " to name=" << state->shadow_obj << dendl;
rgw_obj dest_obj(obj.bucket, state->shadow_obj);
@@ -897,14 +908,17 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados
else
dest_obj.set_key(obj.object);
- /* FIXME: clone obj should be conditional, should check src object id-tag */
pair<string, bufferlist> cond(RGW_ATTR_ID_TAG, state->obj_tag);
- r = clone_obj_cond(NULL, dest_obj, 0, obj, 0, state->size, state->attrset, shadow_category, &state->mtime, &cond);
+ RGW_LOG(0) << "cloning: dest_obj=" << dest_obj << " size=" << state->size << " tag=" << state->obj_tag.c_str() << dendl;
+ r = clone_obj_cond(NULL, dest_obj, 0, obj, 0, state->size, state->attrset, shadow_category, &state->mtime, false, true, &cond);
+ if (r == -EEXIST)
+ r = 0;
if (r == -ECANCELED) {
/* we lost in a race here, original object was replaced, we assume it was cloned
as required */
RGW_LOG(0) << "clone_obj_cond was cancelled, lost in a race" << dendl;
- r = 0;
+ state->clear();
+ return r;
} else {
int ret = rctx->notify_intent(dest_obj, DEL_OBJ);
if (ret < 0) {
@@ -915,6 +929,10 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados
RGW_LOG(0) << "ERROR: failed to clone object r=" << r << dendl;
return r;
}
+
+ /* first verify that the object wasn't replaced under */
+ op.cmpxattr(RGW_ATTR_ID_TAG, LIBRADOS_CMPXATTR_OP_EQ, state->obj_tag);
+ // FIXME: need to add FAIL_NOTEXIST_OK for racing deletion
}
string tag;
@@ -922,6 +940,7 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados
bufferlist bl;
bl.append(tag);
+
op.setxattr(RGW_ATTR_ID_TAG, bl);
string shadow = obj.object;
@@ -935,6 +954,22 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados
return 0;
}
+int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+ string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate)
+{
+ if (!rctx) {
+ *pstate = NULL;
+ return 0;
+ }
+
+ int r;
+ do {
+ r = prepare_atomic_for_write_impl(rctx, obj, io_ctx, actual_obj, op, pstate);
+ } while (r == -ECANCELED);
+
+ return r;
+}
+
/**
* Set an attr on an object.
* bucket: name of the bucket holding the object
@@ -970,9 +1005,8 @@ int RGWRados::set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl
if (r < 0)
return r;
- bufferlist outbl;
op.setxattr(name, bl);
- r = io_ctx.operate(actual_obj, &op, &outbl);
+ r = io_ctx.operate(actual_obj, &op);
if (state && r >= 0)
state->attrset[name] = bl;
@@ -980,7 +1014,8 @@ int RGWRados::set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl
if (r == -ECANCELED) {
/* a race! object was replaced, we need to set attr on the original obj */
dout(0) << "RGWRados::set_attr: raced with another process, going to the shadow obj instead" << dendl;
- rgw_obj shadow(obj.bucket, state->shadow_obj, obj.key, shadow_ns);
+ string loc = obj.loc();
+ rgw_obj shadow(obj.bucket, state->shadow_obj, loc, shadow_ns);
r = set_attr(NULL, shadow, name, bl);
}
@@ -1138,12 +1173,13 @@ done_err:
return r;
}
-int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
+int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
vector<RGWCloneRangeInfo>& ranges,
map<string, bufferlist> attrs,
string& category,
time_t *pmtime,
bool truncate_dest,
+ bool exclusive,
pair<string, bufferlist> *xattr_cond)
{
rgw_bucket& bucket = dst_obj.bucket;
@@ -1157,16 +1193,15 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
io_ctx.locator_set_key(dst_obj.key);
ObjectWriteOperation op;
-
if (truncate_dest) {
op.remove();
op.set_op_flags(OP_FAILOK); // don't fail if object didn't exist
}
if (category.size())
- op.create(false, category);
+ op.create(exclusive, category);
else
- op.create(false);
+ op.create(exclusive);
map<string, bufferlist>::iterator iter;
@@ -1208,10 +1243,30 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
op.mtime(pmtime);
bufferlist outbl;
- int ret = io_ctx.operate(dst_oid, &op, &outbl);
+ int ret = io_ctx.operate(dst_oid, &op);
+
+ atomic_write_finish(state, ret);
+
return ret;
}
+int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
+ vector<RGWCloneRangeInfo>& ranges,
+ map<string, bufferlist> attrs,
+ string& category,
+ time_t *pmtime,
+ bool truncate_dest,
+ bool exclusive,
+ pair<string, bufferlist> *xattr_cond)
+{
+ int r;
+ do {
+ r = clone_objs_impl(ctx, dst_obj, ranges, attrs, category, pmtime, truncate_dest, exclusive, xattr_cond);
+ } while (ctx && r == -ECANCELED);
+ return r;
+}
+
+
int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj,
char **data, off_t ofs, off_t end)
{
@@ -1248,7 +1303,8 @@ int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj,
if (r == -ECANCELED) {
/* a race! object was replaced, we need to set attr on the original obj */
dout(0) << "RGWRados::get_obj: raced with another process, going to the shadow obj instead" << dendl;
- rgw_obj shadow(obj.bucket, astate->shadow_obj, obj.key, shadow_ns);
+ string loc = obj.loc();
+ rgw_obj shadow(obj.bucket, astate->shadow_obj, loc, shadow_ns);
r = get_obj(NULL, handle, shadow, data, ofs, end);
return r;
}
@@ -1302,7 +1358,8 @@ int RGWRados::read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist&
if (r == -ECANCELED) {
/* a race! object was replaced, we need to set attr on the original obj */
dout(0) << "RGWRados::get_obj: raced with another process, going to the shadow obj instead" << dendl;
- rgw_obj shadow(obj.bucket, astate->shadow_obj, obj.key, shadow_ns);
+ string loc = obj.loc();
+ rgw_obj shadow(obj.bucket, astate->shadow_obj, loc, shadow_ns);
r = read(NULL, shadow, ofs, size, bl);
}
return r;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 15b5ec921c6..f3f20d45044 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -27,6 +27,16 @@ struct RGWObjState {
}
return false;
}
+
+ void clear() {
+ has_attrs = false;
+ exists = false;
+ size = 0;
+ mtime = 0;
+ obj_tag.clear();
+ shadow_obj.clear();
+ attrset.clear();
+ }
};
struct RGWRadosCtx {
@@ -75,8 +85,26 @@ class RGWRados : public RGWAccess
int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state);
int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
string& actual_obj, librados::ObjectOperation& op, RGWObjState **state);
+ int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+ string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate);
int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate);
+
+ void atomic_write_finish(RGWObjState *state, int r) {
+ if (state && r == -ECANCELED) {
+ state->clear();
+ }
+ }
+
+ int clone_objs_impl(void *ctx, rgw_obj& dst_obj,
+ vector<RGWCloneRangeInfo>& ranges,
+ map<string, bufferlist> attrs,
+ string& category,
+ time_t *pmtime,
+ bool truncate_dest,
+ bool exclusive,
+ pair<string, bufferlist> *cmp_xattr);
+ int delete_obj_impl(void *ctx, std::string& id, rgw_obj& src_obj, bool sync);
public:
RGWRados() : watcher(NULL), watch_handle(0) {}
@@ -115,8 +143,8 @@ public:
vector<RGWCloneRangeInfo>& ranges,
map<string, bufferlist> attrs,
string& category,
- time_t *pmtime, bool truncate_dest) {
- return clone_objs(ctx, dst_obj, ranges, attrs, category, pmtime, truncate_dest, NULL);
+ time_t *pmtime, bool truncate_dest, bool exclusive) {
+ return clone_objs(ctx, dst_obj, ranges, attrs, category, pmtime, truncate_dest, exclusive, NULL);
}
int clone_objs(void *ctx, rgw_obj& dst_obj,
@@ -125,6 +153,7 @@ public:
string& category,
time_t *pmtime,
bool truncate_dest,
+ bool exclusive,
pair<string, bufferlist> *cmp_xattr);
int clone_obj_cond(void *ctx, rgw_obj& dst_obj, off_t dst_ofs,
@@ -132,6 +161,8 @@ public:
uint64_t size, map<string, bufferlist> attrs,
string& category,
time_t *pmtime,
+ bool truncate_dest,
+ bool exclusive,
pair<string, bufferlist> *xattr_cond) {
RGWCloneRangeInfo info;
vector<RGWCloneRangeInfo> v;
@@ -140,7 +171,7 @@ public:
info.dst_ofs = dst_ofs;
info.len = size;
v.push_back(info);
- return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, true, xattr_cond);
+ return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, truncate_dest, exclusive, xattr_cond);
}
/** Copy an object, with many extra options */