diff options
author | Yehuda Sadeh <yehuda@hq.newdream.net> | 2011-08-23 18:52:08 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@hq.newdream.net> | 2011-08-23 18:52:08 -0700 |
commit | 190b89364c81eda645a129976ca80e0db146ddd9 (patch) | |
tree | acc943b6c319b9f1722d80df0a79b2535451dd9e /src/rgw | |
parent | adf1efe9a94efa9aed2ebda9bbe6e956769d39c6 (diff) | |
parent | 1a7eac4d82ec34e04af30f3858fec5231408576e (diff) | |
download | ceph-190b89364c81eda645a129976ca80e0db146ddd9.tar.gz |
Merge remote-tracking branch 'origin/master' into wip-decouple-bucket
Conflicts:
src/rgw/rgw_rados.cc
Diffstat (limited to 'src/rgw')
-rw-r--r-- | src/rgw/rgw_access.h | 5 | ||||
-rw-r--r-- | src/rgw/rgw_cache.cc | 7 | ||||
-rw-r--r-- | src/rgw/rgw_cache.h | 3 | ||||
-rw-r--r-- | src/rgw/rgw_common.h | 7 | ||||
-rw-r--r-- | src/rgw/rgw_op.cc | 3 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 101 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 37 |
7 files changed, 133 insertions, 30 deletions
diff --git a/src/rgw/rgw_access.h b/src/rgw/rgw_access.h index 22a1ef913f2..2038bfd73fa 100644 --- a/src/rgw/rgw_access.h +++ b/src/rgw/rgw_access.h @@ -188,7 +188,7 @@ public: info.dst_ofs = dst_ofs; info.len = size; v.push_back(info); - return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, true); + return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, true, false); } virtual int clone_objs(void *ctx, rgw_obj& dst_obj, @@ -196,7 +196,8 @@ public: map<string, bufferlist> attrs, string& category, time_t *pmtime, - bool truncate_dest) { return -ENOTSUP; } + bool truncate_dest, + bool exclusive) { return -ENOTSUP; } /** * a simple object read without keeping state */ diff --git a/src/rgw/rgw_cache.cc b/src/rgw/rgw_cache.cc index 4dde65ffe09..249d8baa4a2 100644 --- a/src/rgw/rgw_cache.cc +++ b/src/rgw/rgw_cache.cc @@ -7,6 +7,8 @@ using namespace std; int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask) { + Mutex::Locker l(lock); + map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name); if (iter == cache_map.end()) { RGW_LOG(10) << "cache get: name=" << name << " : miss" << dendl; @@ -29,6 +31,8 @@ int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask) void ObjectCache::put(string& name, ObjectCacheInfo& info) { + Mutex::Locker l(lock); + RGW_LOG(10) << "cache put: name=" << name << dendl; map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name); if (iter == cache_map.end()) { @@ -67,6 +71,8 @@ void ObjectCache::put(string& name, ObjectCacheInfo& info) void ObjectCache::remove(string& name) { + Mutex::Locker l(lock); + map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name); if (iter == cache_map.end()) return; @@ -93,7 +99,6 @@ void ObjectCache::touch_lru(string& name, std::list<string>::iterator& lru_iter) lru_iter--; RGW_LOG(10) << "adding " << name << " to cache LRU end" << dendl; } else { - string name = *lru_iter; RGW_LOG(10) << "moving " << name << " to cache LRU end" << dendl; lru.erase(lru_iter); lru.push_back(name); diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 1fe8d09410a..d5c8c11ed13 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -104,11 +104,12 @@ struct ObjectCacheEntry { class ObjectCache { std::map<string, ObjectCacheEntry> cache_map; std::list<string> lru; + Mutex lock; void touch_lru(string& name, std::list<string>::iterator& lru_iter); void remove_lru(string& name, std::list<string>::iterator& lru_iter); public: - ObjectCache() { } + ObjectCache() : lock("ObjectCache") { } int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask); void put(std::string& name, ObjectCacheInfo& bl); void remove(std::string& name); diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 9cbbb4f12d8..aa276dbde2b 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -661,6 +661,13 @@ public: set_key(orig_key); } + string loc() { + if (orig_key.empty()) + return orig_obj; + else + return orig_key; + } + static bool translate_raw_obj(string& obj, string& ns) { if (ns.empty()) { if (obj[0] != '_') diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index d8548882110..9ad7607cd54 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -274,6 +274,7 @@ void RGWGetObj::execute() goto done; while (ofs <= end) { + data = NULL; ret = rgwstore->get_obj(s->obj_ctx, &handle, obj, &data, ofs, end); if (ret < 0) { goto done; @@ -1335,7 +1336,7 @@ void RGWCompleteMultipart::execute() ofs += obj_iter->second.size; } - ret = rgwstore->clone_objs(s->obj_ctx, target_obj, ranges, attrs, rgw_obj_category_main, NULL, true); + ret = rgwstore->clone_objs(s->obj_ctx, target_obj, ranges, attrs, rgw_obj_category_main, NULL, true, false); if (ret < 0) goto done; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 6e5e9813d36..9b7855183cc 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -315,7 +315,7 @@ int RGWRados::create_bucket(std::string& id, rgw_bucket& bucket, map<std::string op.setxattr(iter->first.c_str(), iter->second); bufferlist outbl; - int ret = root_pool_ctx.operate(bucket.name, &op, &outbl); + int ret = root_pool_ctx.operate(bucket.name, &op); if (ret < 0) return ret; @@ -407,7 +407,7 @@ int RGWRados::put_obj_meta(void *ctx, std::string& id, rgw_obj& obj, if (!op.size()) return 0; - r = io_ctx.operate(oid, &op, NULL); + r = io_ctx.operate(oid, &op); if (r < 0) return r; @@ -716,6 +716,7 @@ int RGWRados::bucket_suspended(rgw_bucket& bucket, bool *suspended) *suspended = (auid == RGW_SUSPENDED_USER_AUID); return 0; } + /** * Delete an object. * id: unused @@ -723,7 +724,7 @@ int RGWRados::bucket_suspended(rgw_bucket& bucket, bool *suspended) * obj: name of the object to delete * Returns: 0 on success, -ERR# otherwise. */ -int RGWRados::delete_obj(void *ctx, std::string& id, rgw_obj& obj, bool sync) +int RGWRados::delete_obj_impl(void *ctx, std::string& id, rgw_obj& obj, bool sync) { rgw_bucket& bucket = obj.bucket; std::string& oid = obj.object; @@ -744,18 +745,30 @@ int RGWRados::delete_obj(void *ctx, std::string& id, rgw_obj& obj, bool sync) op.remove(); if (sync) { - r = io_ctx.operate(oid, &op, NULL); + r = io_ctx.operate(oid, &op); } else { librados::AioCompletion *completion = rados->aio_create_completion(NULL, NULL, NULL); - r = io_ctx.aio_operate(obj.object, completion, &op, NULL); + r = io_ctx.aio_operate(obj.object, completion, &op); completion->release(); } + + atomic_write_finish(state, r); + if (r < 0) return r; return 0; } +int RGWRados::delete_obj(void *ctx, std::string& id, rgw_obj& obj, bool sync) +{ + int r; + do { + r = delete_obj_impl(ctx, id, obj, sync); + } while (r == -ECANCELED); + return r; +} + int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state) { RGWObjState *s = rctx->get_state(obj); @@ -870,12 +883,9 @@ int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCt return 0; } -int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, +int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate) { - if (!rctx) - return 0; - int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate); if (r < 0) return r; @@ -888,6 +898,7 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados if (state->obj_tag.length() == 0 || state->shadow_obj.size() == 0) { RGW_LOG(0) << "can't clone object " << obj << " to shadow object, tag/shadow_obj haven't been set" << dendl; + // FIXME: need to test object does not exist } else { RGW_LOG(0) << "cloning object " << obj << " to name=" << state->shadow_obj << dendl; rgw_obj dest_obj(obj.bucket, state->shadow_obj); @@ -897,14 +908,17 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados else dest_obj.set_key(obj.object); - /* FIXME: clone obj should be conditional, should check src object id-tag */ pair<string, bufferlist> cond(RGW_ATTR_ID_TAG, state->obj_tag); - r = clone_obj_cond(NULL, dest_obj, 0, obj, 0, state->size, state->attrset, shadow_category, &state->mtime, &cond); + RGW_LOG(0) << "cloning: dest_obj=" << dest_obj << " size=" << state->size << " tag=" << state->obj_tag.c_str() << dendl; + r = clone_obj_cond(NULL, dest_obj, 0, obj, 0, state->size, state->attrset, shadow_category, &state->mtime, false, true, &cond); + if (r == -EEXIST) + r = 0; if (r == -ECANCELED) { /* we lost in a race here, original object was replaced, we assume it was cloned as required */ RGW_LOG(0) << "clone_obj_cond was cancelled, lost in a race" << dendl; - r = 0; + state->clear(); + return r; } else { int ret = rctx->notify_intent(dest_obj, DEL_OBJ); if (ret < 0) { @@ -915,6 +929,10 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados RGW_LOG(0) << "ERROR: failed to clone object r=" << r << dendl; return r; } + + /* first verify that the object wasn't replaced under */ + op.cmpxattr(RGW_ATTR_ID_TAG, LIBRADOS_CMPXATTR_OP_EQ, state->obj_tag); + // FIXME: need to add FAIL_NOTEXIST_OK for racing deletion } string tag; @@ -922,6 +940,7 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados bufferlist bl; bl.append(tag); + op.setxattr(RGW_ATTR_ID_TAG, bl); string shadow = obj.object; @@ -935,6 +954,22 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados return 0; } +int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, + string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate) +{ + if (!rctx) { + *pstate = NULL; + return 0; + } + + int r; + do { + r = prepare_atomic_for_write_impl(rctx, obj, io_ctx, actual_obj, op, pstate); + } while (r == -ECANCELED); + + return r; +} + /** * Set an attr on an object. * bucket: name of the bucket holding the object @@ -970,9 +1005,8 @@ int RGWRados::set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl if (r < 0) return r; - bufferlist outbl; op.setxattr(name, bl); - r = io_ctx.operate(actual_obj, &op, &outbl); + r = io_ctx.operate(actual_obj, &op); if (state && r >= 0) state->attrset[name] = bl; @@ -980,7 +1014,8 @@ int RGWRados::set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl if (r == -ECANCELED) { /* a race! object was replaced, we need to set attr on the original obj */ dout(0) << "RGWRados::set_attr: raced with another process, going to the shadow obj instead" << dendl; - rgw_obj shadow(obj.bucket, state->shadow_obj, obj.key, shadow_ns); + string loc = obj.loc(); + rgw_obj shadow(obj.bucket, state->shadow_obj, loc, shadow_ns); r = set_attr(NULL, shadow, name, bl); } @@ -1138,12 +1173,13 @@ done_err: return r; } -int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj, +int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, vector<RGWCloneRangeInfo>& ranges, map<string, bufferlist> attrs, string& category, time_t *pmtime, bool truncate_dest, + bool exclusive, pair<string, bufferlist> *xattr_cond) { rgw_bucket& bucket = dst_obj.bucket; @@ -1157,16 +1193,15 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj, io_ctx.locator_set_key(dst_obj.key); ObjectWriteOperation op; - if (truncate_dest) { op.remove(); op.set_op_flags(OP_FAILOK); // don't fail if object didn't exist } if (category.size()) - op.create(false, category); + op.create(exclusive, category); else - op.create(false); + op.create(exclusive); map<string, bufferlist>::iterator iter; @@ -1208,10 +1243,30 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj, op.mtime(pmtime); bufferlist outbl; - int ret = io_ctx.operate(dst_oid, &op, &outbl); + int ret = io_ctx.operate(dst_oid, &op); + + atomic_write_finish(state, ret); + return ret; } +int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj, + vector<RGWCloneRangeInfo>& ranges, + map<string, bufferlist> attrs, + string& category, + time_t *pmtime, + bool truncate_dest, + bool exclusive, + pair<string, bufferlist> *xattr_cond) +{ + int r; + do { + r = clone_objs_impl(ctx, dst_obj, ranges, attrs, category, pmtime, truncate_dest, exclusive, xattr_cond); + } while (ctx && r == -ECANCELED); + return r; +} + + int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj, char **data, off_t ofs, off_t end) { @@ -1248,7 +1303,8 @@ int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj, if (r == -ECANCELED) { /* a race! object was replaced, we need to set attr on the original obj */ dout(0) << "RGWRados::get_obj: raced with another process, going to the shadow obj instead" << dendl; - rgw_obj shadow(obj.bucket, astate->shadow_obj, obj.key, shadow_ns); + string loc = obj.loc(); + rgw_obj shadow(obj.bucket, astate->shadow_obj, loc, shadow_ns); r = get_obj(NULL, handle, shadow, data, ofs, end); return r; } @@ -1302,7 +1358,8 @@ int RGWRados::read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist& if (r == -ECANCELED) { /* a race! object was replaced, we need to set attr on the original obj */ dout(0) << "RGWRados::get_obj: raced with another process, going to the shadow obj instead" << dendl; - rgw_obj shadow(obj.bucket, astate->shadow_obj, obj.key, shadow_ns); + string loc = obj.loc(); + rgw_obj shadow(obj.bucket, astate->shadow_obj, loc, shadow_ns); r = read(NULL, shadow, ofs, size, bl); } return r; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 15b5ec921c6..f3f20d45044 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -27,6 +27,16 @@ struct RGWObjState { } return false; } + + void clear() { + has_attrs = false; + exists = false; + size = 0; + mtime = 0; + obj_tag.clear(); + shadow_obj.clear(); + attrset.clear(); + } }; struct RGWRadosCtx { @@ -75,8 +85,26 @@ class RGWRados : public RGWAccess int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state); int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, librados::ObjectOperation& op, RGWObjState **state); + int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, + string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate); int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate); + + void atomic_write_finish(RGWObjState *state, int r) { + if (state && r == -ECANCELED) { + state->clear(); + } + } + + int clone_objs_impl(void *ctx, rgw_obj& dst_obj, + vector<RGWCloneRangeInfo>& ranges, + map<string, bufferlist> attrs, + string& category, + time_t *pmtime, + bool truncate_dest, + bool exclusive, + pair<string, bufferlist> *cmp_xattr); + int delete_obj_impl(void *ctx, std::string& id, rgw_obj& src_obj, bool sync); public: RGWRados() : watcher(NULL), watch_handle(0) {} @@ -115,8 +143,8 @@ public: vector<RGWCloneRangeInfo>& ranges, map<string, bufferlist> attrs, string& category, - time_t *pmtime, bool truncate_dest) { - return clone_objs(ctx, dst_obj, ranges, attrs, category, pmtime, truncate_dest, NULL); + time_t *pmtime, bool truncate_dest, bool exclusive) { + return clone_objs(ctx, dst_obj, ranges, attrs, category, pmtime, truncate_dest, exclusive, NULL); } int clone_objs(void *ctx, rgw_obj& dst_obj, @@ -125,6 +153,7 @@ public: string& category, time_t *pmtime, bool truncate_dest, + bool exclusive, pair<string, bufferlist> *cmp_xattr); int clone_obj_cond(void *ctx, rgw_obj& dst_obj, off_t dst_ofs, @@ -132,6 +161,8 @@ public: uint64_t size, map<string, bufferlist> attrs, string& category, time_t *pmtime, + bool truncate_dest, + bool exclusive, pair<string, bufferlist> *xattr_cond) { RGWCloneRangeInfo info; vector<RGWCloneRangeInfo> v; @@ -140,7 +171,7 @@ public: info.dst_ofs = dst_ofs; info.len = size; v.push_back(info); - return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, true, xattr_cond); + return clone_objs(ctx, dst_obj, v, attrs, category, pmtime, truncate_dest, exclusive, xattr_cond); } /** Copy an object, with many extra options */ |