summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2012-05-31 22:29:21 -0700
committerSamuel Just <sam.just@inktank.com>2012-06-05 16:09:50 -0700
commit50331ab1bbacc17bb093167172c962a4cc869fac (patch)
treefa1db07065faa1abb6fe0f87fcbadd1054b957cc
parentffc5f7ef8a530c7bc50cf7bd34b9143fbb8c340b (diff)
downloadceph-50331ab1bbacc17bb093167172c962a4cc869fac.tar.gz
FileStore,DBObjectMap: add SequencerPosition argument to ObjectMap
Previously, sequences like: 1. touch (c1, a) 2. link (c1, c2, a) 3. rm (c1, a) 4. setattr (c2, a) 5. clone (c2, a, b) could result in the omap entries for a being removed once ops 2-3 are replayed. Calls to ObjectMap::sync will include a sequencer posotion and an hobject_t to mark. Updates to the object map will now also check the SequencerPosition entry on the map header preventing replay of earlier ops. Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/os/DBObjectMap.cc85
-rw-r--r--src/os/DBObjectMap.h40
-rw-r--r--src/os/FileStore.cc95
-rw-r--r--src/os/FileStore.h32
-rw-r--r--src/os/ObjectMap.h27
5 files changed, 189 insertions, 90 deletions
diff --git a/src/os/DBObjectMap.cc b/src/os/DBObjectMap.cc
index 4852b2aac76..9c92505c827 100644
--- a/src/os/DBObjectMap.cc
+++ b/src/os/DBObjectMap.cc
@@ -498,12 +498,15 @@ int DBObjectMap::DBObjectMapIteratorImpl::status()
}
int DBObjectMap::set_keys(const hobject_t &hoid,
- const map<string, bufferlist> &set)
+ const map<string, bufferlist> &set,
+ const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
Header header = lookup_create_map_header(hoid, t);
if (!header)
return -EINVAL;
+ if (check_spos(hoid, header, spos))
+ return 0;
t->set(user_prefix(header), set);
@@ -511,12 +514,15 @@ int DBObjectMap::set_keys(const hobject_t &hoid,
}
int DBObjectMap::set_header(const hobject_t &hoid,
- const bufferlist &bl)
+ const bufferlist &bl,
+ const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
Header header = lookup_create_map_header(hoid, t);
if (!header)
return -EINVAL;
+ if (check_spos(hoid, header, spos))
+ return 0;
_set_header(header, bl, t);
return db->submit_transaction(t);
}
@@ -564,12 +570,15 @@ int DBObjectMap::_get_header(Header header,
return 0;
}
-int DBObjectMap::clear(const hobject_t &hoid)
+int DBObjectMap::clear(const hobject_t &hoid,
+ const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
Header header = lookup_map_header(hoid);
if (!header)
return -ENOENT;
+ if (check_spos(hoid, header, spos))
+ return 0;
remove_map_header(hoid, header, t);
assert(header->num_children > 0);
header->num_children--;
@@ -683,12 +692,15 @@ int DBObjectMap::need_parent(DBObjectMapIterator iter)
}
int DBObjectMap::rm_keys(const hobject_t &hoid,
- const set<string> &to_clear)
+ const set<string> &to_clear,
+ const SequencerPosition *spos)
{
Header header = lookup_map_header(hoid);
if (!header)
return -ENOENT;
KeyValueDB::Transaction t = db->get_transaction();
+ if (check_spos(hoid, header, spos))
+ return 0;
t->rmkeys(user_prefix(header), to_clear);
if (!header->parent) {
return db->submit_transaction(t);
@@ -853,29 +865,36 @@ int DBObjectMap::get_all_xattrs(const hobject_t &hoid,
}
int DBObjectMap::set_xattrs(const hobject_t &hoid,
- const map<string, bufferlist> &to_set)
+ const map<string, bufferlist> &to_set,
+ const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
Header header = lookup_create_map_header(hoid, t);
if (!header)
return -EINVAL;
+ if (check_spos(hoid, header, spos))
+ return 0;
t->set(xattr_prefix(header), to_set);
return db->submit_transaction(t);
}
int DBObjectMap::remove_xattrs(const hobject_t &hoid,
- const set<string> &to_remove)
+ const set<string> &to_remove,
+ const SequencerPosition *spos)
{
KeyValueDB::Transaction t = db->get_transaction();
Header header = lookup_map_header(hoid);
if (!header)
return -ENOENT;
+ if (check_spos(hoid, header, spos))
+ return 0;
t->rmkeys(xattr_prefix(header), to_remove);
return db->submit_transaction(t);
}
int DBObjectMap::clone(const hobject_t &hoid,
- const hobject_t &target)
+ const hobject_t &target,
+ const SequencerPosition *spos)
{
if (hoid == target)
return 0;
@@ -885,6 +904,8 @@ int DBObjectMap::clone(const hobject_t &hoid,
Header destination = lookup_map_header(target);
if (destination) {
remove_map_header(target, destination, t);
+ if (check_spos(target, destination, spos))
+ return 0;
destination->num_children--;
_clear(destination, t);
}
@@ -896,6 +917,8 @@ int DBObjectMap::clone(const hobject_t &hoid,
Header source = generate_new_header(hoid, parent);
Header destination = generate_new_header(target, parent);
+ if (spos)
+ destination->spos = *spos;
parent->num_children = 2;
set_header(parent, t);
@@ -981,7 +1004,9 @@ int DBObjectMap::upgrade()
db->submit_transaction(t);
}
state.v = 1;
- write_state(true);
+ KeyValueDB::Transaction t = db->get_transaction();
+ write_state(t);
+ db->submit_transaction_sync(t);
return 0;
}
@@ -1017,19 +1042,32 @@ int DBObjectMap::init(bool do_upgrade)
return 0;
}
-int DBObjectMap::sync() {
- return write_state(true);
+int DBObjectMap::sync(const hobject_t *hoid,
+ const SequencerPosition *spos) {
+ KeyValueDB::Transaction t = db->get_transaction();
+ write_state(t);
+ if (hoid) {
+ assert(spos);
+ Header header = lookup_map_header(*hoid);
+ if (header) {
+ dout(10) << "hoid: " << *hoid << " setting spos to "
+ << *spos << dendl;
+ header->spos = *spos;
+ set_map_header(*hoid, *header, t);
+ }
+ }
+ return db->submit_transaction_sync(t);
}
-int DBObjectMap::write_state(bool sync) {
+int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
dout(20) << "dbobjectmap: seq is " << state.seq << dendl;
- KeyValueDB::Transaction t = db->get_transaction();
+ KeyValueDB::Transaction t = _t ? _t : db->get_transaction();
bufferlist bl;
state.encode(bl);
map<string, bufferlist> to_write;
to_write[GLOBAL_STATE_KEY] = bl;
t->set(SYS_PREFIX, to_write);
- return sync ? db->submit_transaction_sync(t) : db->submit_transaction(t);
+ return _t ? 0 : db->submit_transaction(t);
}
@@ -1154,3 +1192,24 @@ void DBObjectMap::set_map_header(const hobject_t &hoid, _Header header,
header.encode(to_set[map_header_key(hoid)]);
t->set(HOBJECT_TO_SEQ, to_set);
}
+
+bool DBObjectMap::check_spos(const hobject_t &hoid,
+ Header header,
+ const SequencerPosition *spos)
+{
+ if (!spos || *spos > header->spos) {
+ stringstream out;
+ if (spos)
+ dout(10) << "hoid: " << hoid << " not skipping op, *spos "
+ << *spos << dendl;
+ else
+ dout(10) << "hoid: " << hoid << " not skipping op, *spos "
+ << "empty" << dendl;
+ dout(10) << " > header.spos " << header->spos << dendl;
+ return false;
+ } else {
+ dout(10) << "hoid: " << hoid << " skipping op, *spos " << *spos
+ << " <= header.spos " << header->spos << dendl;
+ return true;
+ }
+}
diff --git a/src/os/DBObjectMap.h b/src/os/DBObjectMap.h
index 1ff87f1289d..cccc3316f7b 100644
--- a/src/os/DBObjectMap.h
+++ b/src/os/DBObjectMap.h
@@ -75,12 +75,14 @@ public:
int set_keys(
const hobject_t &hoid,
- const map<string, bufferlist> &set
+ const map<string, bufferlist> &set,
+ const SequencerPosition *spos=0
);
int set_header(
const hobject_t &hoid,
- const bufferlist &bl
+ const bufferlist &bl,
+ const SequencerPosition *spos=0
);
int get_header(
@@ -89,12 +91,14 @@ public:
);
int clear(
- const hobject_t &hoid
+ const hobject_t &hoid,
+ const SequencerPosition *spos=0
);
int rm_keys(
const hobject_t &hoid,
- const set<string> &to_clear
+ const set<string> &to_clear,
+ const SequencerPosition *spos=0
);
int get(
@@ -133,17 +137,20 @@ public:
int set_xattrs(
const hobject_t &hoid,
- const map<string, bufferlist> &to_set
+ const map<string, bufferlist> &to_set,
+ const SequencerPosition *spos=0
);
int remove_xattrs(
const hobject_t &hoid,
- const set<string> &to_remove
+ const set<string> &to_remove,
+ const SequencerPosition *spos=0
);
int clone(
const hobject_t &hoid,
- const hobject_t &target
+ const hobject_t &target,
+ const SequencerPosition *spos=0
);
/// Read initial state from backing store
@@ -156,7 +163,7 @@ public:
bool check(std::ostream &out);
/// Ensure that all previous operations are durable
- int sync();
+ int sync(const hobject_t *hoid=0, const SequencerPosition *spos=0);
ObjectMapIterator get_iterator(const hobject_t &hoid);
@@ -215,23 +222,28 @@ public:
coll_t c;
hobject_t hoid;
+ SequencerPosition spos;
+
void encode(bufferlist &bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(seq, bl);
::encode(parent, bl);
::encode(num_children, bl);
::encode(c, bl);
::encode(hoid, bl);
+ ::encode(spos, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
::decode(seq, bl);
::decode(parent, bl);
::decode(num_children, bl);
::decode(c, bl);
::decode(hoid, bl);
+ if (struct_v >= 2)
+ ::decode(spos, bl);
DECODE_FINISH(bl);
}
@@ -362,6 +374,11 @@ private:
void set_map_header(const hobject_t &hoid, _Header header,
KeyValueDB::Transaction t);
+ /// Set leaf node for c and hoid to the value of header
+ bool check_spos(const hobject_t &hoid,
+ Header header,
+ const SequencerPosition *spos);
+
/// Lookup or create header for c hoid
Header lookup_create_map_header(const hobject_t &hoid,
KeyValueDB::Transaction t);
@@ -407,7 +424,8 @@ private:
KeyValueDB::Transaction t);
/// Writes out State (mainly next_seq)
- int write_state(bool sync = false);
+ int write_state(KeyValueDB::Transaction _t =
+ KeyValueDB::Transaction());
/// 0 if the complete set now contains all of key space, < 0 on error, 1 else
int need_parent(DBObjectMapIterator iter);
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 3faaa76f6ab..2e5f86fe439 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -366,7 +366,8 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
return 0;
}
-int FileStore::lfn_unlink(coll_t cid, const hobject_t& o)
+int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
+ const SequencerPosition &spos)
{
Index index;
int r = get_index(cid, &index);
@@ -385,7 +386,7 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o)
return -errno;
}
if (st.st_nlink == 1) {
- r = object_map->clear(o);
+ r = object_map->clear(o, &spos);
if (r < 0 && r != -ENOENT)
return r;
}
@@ -2459,7 +2460,10 @@ void FileStore::_transaction_finish(int fd)
TEMP_FAILURE_RETRY(::close(fd));
}
-void FileStore::_set_replay_guard(int fd, const SequencerPosition& spos, bool in_progress)
+void FileStore::_set_replay_guard(int fd,
+ const SequencerPosition& spos,
+ const hobject_t *hoid,
+ bool in_progress)
{
if (btrfs_stable_commits)
return;
@@ -2474,7 +2478,7 @@ void FileStore::_set_replay_guard(int fd, const SequencerPosition& spos, bool in
// sync object_map too. even if this object has a header or keys,
// it have had them in the past and then removed them, so always
// sync.
- object_map->sync();
+ object_map->sync(hoid, &spos);
_inject_failure();
@@ -2674,7 +2678,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
coll_t cid = i.get_cid();
hobject_t oid = i.get_oid();
if (_check_replay_guard(cid, oid, spos) > 0)
- r = _remove(cid, oid);
+ r = _remove(cid, oid, spos);
}
break;
@@ -2688,7 +2692,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
if (_check_replay_guard(cid, oid, spos) > 0) {
map<string, bufferptr> to_set;
to_set[name] = bufferptr(bl.c_str(), bl.length());
- r = _setattrs(cid, oid, to_set);
+ r = _setattrs(cid, oid, to_set, spos);
if (r == -ENOSPC)
dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
<< " name " << name << " size " << bl.length() << dendl;
@@ -2703,7 +2707,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
map<string, bufferptr> aset;
i.get_attrset(aset);
if (_check_replay_guard(cid, oid, spos) > 0)
- r = _setattrs(cid, oid, aset);
+ r = _setattrs(cid, oid, aset, spos);
if (r == -ENOSPC)
dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
}
@@ -2715,7 +2719,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
hobject_t oid = i.get_oid();
string name = i.get_attrname();
if (_check_replay_guard(cid, oid, spos) > 0)
- r = _rmattr(cid, oid, name.c_str());
+ r = _rmattr(cid, oid, name.c_str(), spos);
}
break;
@@ -2724,7 +2728,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
coll_t cid = i.get_cid();
hobject_t oid = i.get_oid();
if (_check_replay_guard(cid, oid, spos) > 0)
- r = _rmattrs(cid, oid);
+ r = _rmattrs(cid, oid, spos);
}
break;
@@ -2790,7 +2794,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
coll_t cid = i.get_cid();
hobject_t oid = i.get_oid();
if (_check_replay_guard(cid, oid, spos) > 0)
- r = _collection_remove(cid, oid);
+ r = _remove(cid, oid, spos);
}
break;
@@ -2803,7 +2807,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
r = _collection_add(ocid, ncid, oid, spos);
if (r == 0 &&
(_check_replay_guard(ocid, oid, spos) > 0))
- r = _collection_remove(ocid, oid);
+ r = _remove(ocid, oid, spos);
}
break;
@@ -2843,7 +2847,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
{
coll_t cid(i.get_cid());
hobject_t oid = i.get_oid();
- r = _omap_clear(cid, oid);
+ r = _omap_clear(cid, oid, spos);
}
break;
case Transaction::OP_OMAP_SETKEYS:
@@ -2852,7 +2856,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
hobject_t oid = i.get_oid();
map<string, bufferlist> aset;
i.get_attrset(aset);
- r = _omap_setkeys(cid, oid, aset);
+ r = _omap_setkeys(cid, oid, aset, spos);
}
break;
case Transaction::OP_OMAP_RMKEYS:
@@ -2861,7 +2865,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
hobject_t oid = i.get_oid();
set<string> keys;
i.get_keyset(keys);
- r = _omap_rmkeys(cid, oid, keys);
+ r = _omap_rmkeys(cid, oid, keys, spos);
}
break;
case Transaction::OP_OMAP_SETHEADER:
@@ -2870,7 +2874,7 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
hobject_t oid = i.get_oid();
bufferlist bl;
i.get_bl(bl);
- r = _omap_setheader(cid, oid, bl);
+ r = _omap_setheader(cid, oid, bl, spos);
}
break;
@@ -3090,10 +3094,11 @@ done:
}
-int FileStore::_remove(coll_t cid, const hobject_t& oid)
+int FileStore::_remove(coll_t cid, const hobject_t& oid,
+ const SequencerPosition &spos)
{
dout(15) << "remove " << cid << "/" << oid << dendl;
- int r = lfn_unlink(cid, oid);
+ int r = lfn_unlink(cid, oid, spos);
dout(10) << "remove " << cid << "/" << oid << " = " << r << dendl;
return r;
}
@@ -3250,7 +3255,7 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo
goto out3;
}
dout(20) << "objectmap clone" << dendl;
- r = object_map->clone(oldoid, newoid);
+ r = object_map->clone(oldoid, newoid, &spos);
if (r < 0 && r != -ENOENT)
goto out3;
}
@@ -3261,13 +3266,13 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo
if (r < 0)
goto out3;
- r = _setattrs(cid, newoid, aset);
+ r = _setattrs(cid, newoid, aset, spos);
if (r < 0)
goto out3;
}
// clone is non-idempotent; record our work.
- _set_replay_guard(n, spos);
+ _set_replay_guard(n, spos, &newoid);
out3:
TEMP_FAILURE_RETRY(::close(n));
@@ -3434,7 +3439,7 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t
r = _do_clone_range(o, n, srcoff, len, dstoff);
// clone is non-idempotent; record our work.
- _set_replay_guard(n, spos);
+ _set_replay_guard(n, spos, &newoid);
TEMP_FAILURE_RETRY(::close(n));
out:
@@ -4091,7 +4096,8 @@ int FileStore::getattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>&
return r;
}
-int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset)
+int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset,
+ const SequencerPosition &spos)
{
map<string, bufferlist> omap_set;
set<string> omap_remove;
@@ -4152,12 +4158,12 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
dout(10) << __func__ << " could not get index r = " << r << dendl;
return r;
}
- r = object_map->remove_xattrs(oid, omap_remove);
+ r = object_map->remove_xattrs(oid, omap_remove, &spos);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not remove_xattrs r = " << r << dendl;
return r;
}
- r = object_map->set_xattrs(oid, omap_set);
+ r = object_map->set_xattrs(oid, omap_set, &spos);
if (r < 0) {
dout(10) << __func__ << " could not set_xattrs r = " << r << dendl;
return r;
@@ -4168,7 +4174,8 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
}
-int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name)
+int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name,
+ const SequencerPosition &spos)
{
dout(15) << "rmattr " << cid << "/" << oid << " '" << name << "'" << dendl;
char n[ATTR_MAX_NAME_LEN];
@@ -4183,7 +4190,7 @@ int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name)
}
set<string> to_remove;
to_remove.insert(string(name));
- r = object_map->remove_xattrs(oid, to_remove);
+ r = object_map->remove_xattrs(oid, to_remove, &spos);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not remove_xattrs index r = " << r << dendl;
return r;
@@ -4193,7 +4200,8 @@ int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name)
return r;
}
-int FileStore::_rmattrs(coll_t cid, const hobject_t& oid)
+int FileStore::_rmattrs(coll_t cid, const hobject_t& oid,
+ const SequencerPosition &spos)
{
dout(15) << "rmattrs " << cid << "/" << oid << dendl;
@@ -4221,7 +4229,7 @@ int FileStore::_rmattrs(coll_t cid, const hobject_t& oid)
dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
return r;
}
- r = object_map->remove_xattrs(oid, omap_attrs);
+ r = object_map->remove_xattrs(oid, omap_attrs, &spos);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
return r;
@@ -4614,7 +4622,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
}
assert(fd >= 0);
if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
- _set_replay_guard(fd, spos, true);
+ _set_replay_guard(fd, spos, &o, true);
}
int r = lfn_link(oldcid, c, o);
@@ -4634,15 +4642,6 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
return r;
}
-int FileStore::_collection_remove(coll_t c, const hobject_t& o)
-{
- dout(15) << "collection_remove " << c << "/" << o << dendl;
- int r = lfn_unlink(c, o);
- dout(10) << "collection_remove " << c << "/" << o << " = " << r << dendl;
- return r;
-}
-
-
void FileStore::_inject_failure()
{
if (m_filestore_kill_at.read()) {
@@ -4656,47 +4655,51 @@ void FileStore::_inject_failure()
}
}
-int FileStore::_omap_clear(coll_t cid, const hobject_t &hoid) {
+int FileStore::_omap_clear(coll_t cid, const hobject_t &hoid,
+ const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
if (r < 0)
return r;
- r = object_map->clear(hoid);
+ r = object_map->clear(hoid, &spos);
if (r < 0 && r != -ENOENT)
return r;
return 0;
}
int FileStore::_omap_setkeys(coll_t cid, const hobject_t &hoid,
- const map<string, bufferlist> &aset) {
+ const map<string, bufferlist> &aset,
+ const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
if (r < 0)
return r;
- return object_map->set_keys(hoid, aset);
+ return object_map->set_keys(hoid, aset, &spos);
}
int FileStore::_omap_rmkeys(coll_t cid, const hobject_t &hoid,
- const set<string> &keys) {
+ const set<string> &keys,
+ const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
if (r < 0)
return r;
- r = object_map->rm_keys(hoid, keys);
+ r = object_map->rm_keys(hoid, keys, &spos);
if (r < 0 && r != -ENOENT)
return r;
return 0;
}
int FileStore::_omap_setheader(coll_t cid, const hobject_t &hoid,
- const bufferlist &bl)
+ const bufferlist &bl,
+ const SequencerPosition &spos)
{
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
if (r < 0)
return r;
- return object_map->set_header(hoid, bl);
+ return object_map->set_header(hoid, bl, &spos);
}
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index ba333adbb56..6be29be9d43 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -282,7 +282,7 @@ public:
int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode);
int lfn_open(coll_t cid, const hobject_t& oid, int flags);
int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ;
- int lfn_unlink(coll_t cid, const hobject_t& o);
+ int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos);
public:
FileStore(const std::string &base, const std::string &jdev, const char *internal_name = "filestore", bool update_to=false);
@@ -328,7 +328,10 @@ public:
* @param fd open file descriptor for the file/object
* @param spos sequencer position of the last operation we should not replay
*/
- void _set_replay_guard(int fd, const SequencerPosition& spos, bool in_progress=false);
+ void _set_replay_guard(int fd,
+ const SequencerPosition& spos,
+ const hobject_t *hoid=0,
+ bool in_progress=false);
/// close a replay guard opened with in_progress=true
void _close_replay_guard(int fd, const SequencerPosition& spos);
@@ -374,7 +377,7 @@ public:
const SequencerPosition& spos);
int _do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff);
int _do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff);
- int _remove(coll_t cid, const hobject_t& oid);
+ int _remove(coll_t cid, const hobject_t& oid, const SequencerPosition &spos);
void _start_sync();
@@ -403,9 +406,12 @@ public:
int _getattr(const char *fn, const char *name, bufferptr& bp);
int _getattrs(const char *fn, map<string,bufferptr>& aset, bool user_only = false);
- int _setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset);
- int _rmattr(coll_t cid, const hobject_t& oid, const char *name);
- int _rmattrs(coll_t cid, const hobject_t& oid);
+ int _setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset,
+ const SequencerPosition &spos);
+ int _rmattr(coll_t cid, const hobject_t& oid, const char *name,
+ const SequencerPosition &spos);
+ int _rmattrs(coll_t cid, const hobject_t& oid,
+ const SequencerPosition &spos);
int collection_getattr(coll_t c, const char *name, void *value, size_t size);
int collection_getattr(coll_t c, const char *name, bufferlist& bl);
@@ -443,8 +449,6 @@ public:
int _destroy_collection(coll_t c);
int _collection_add(coll_t c, coll_t ocid, const hobject_t& o,
const SequencerPosition& spos);
- int _collection_remove(coll_t c, const hobject_t& o);
-
void dump_start(const std::string& file);
void dump_stop();
void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq, OpSequencer *osr);
@@ -453,11 +457,15 @@ private:
void _inject_failure();
// omap
- int _omap_clear(coll_t cid, const hobject_t &hoid);
+ int _omap_clear(coll_t cid, const hobject_t &hoid,
+ const SequencerPosition &spos);
int _omap_setkeys(coll_t cid, const hobject_t &hoid,
- const map<string, bufferlist> &aset);
- int _omap_rmkeys(coll_t cid, const hobject_t &hoid, const set<string> &keys);
- int _omap_setheader(coll_t cid, const hobject_t &hoid, const bufferlist &bl);
+ const map<string, bufferlist> &aset,
+ const SequencerPosition &spos);
+ int _omap_rmkeys(coll_t cid, const hobject_t &hoid, const set<string> &keys,
+ const SequencerPosition &spos);
+ int _omap_setheader(coll_t cid, const hobject_t &hoid, const bufferlist &bl,
+ const SequencerPosition &spos);
virtual const char** get_tracked_conf_keys() const;
virtual void handle_conf_change(const struct md_config_t *conf,
diff --git a/src/os/ObjectMap.h b/src/os/ObjectMap.h
index 8e52fbb6ea0..5cc1e495de1 100644
--- a/src/os/ObjectMap.h
+++ b/src/os/ObjectMap.h
@@ -16,6 +16,7 @@
#define OS_KEYVALUESTORE_H
#include "IndexManager.h"
+#include "SequencerPosition.h"
#include <string>
#include <vector>
#include <tr1/memory>
@@ -30,13 +31,15 @@ public:
/// Set keys and values from specified map
virtual int set_keys(
const hobject_t &hoid, ///< [in] object containing map
- const map<string, bufferlist> &set ///< [in] key to value map to set
+ const map<string, bufferlist> &set, ///< [in] key to value map to set
+ const SequencerPosition *spos=0 ///< [in] sequencer position
) = 0;
/// Set header
virtual int set_header(
const hobject_t &hoid, ///< [in] object containing map
- const bufferlist &bl ///< [in] header to set
+ const bufferlist &bl, ///< [in] header to set
+ const SequencerPosition *spos=0 ///< [in] sequencer position
) = 0;
/// Retrieve header
@@ -47,13 +50,15 @@ public:
/// Clear all map keys and values from hoid
virtual int clear(
- const hobject_t &hoid ///< [in] object containing map
+ const hobject_t &hoid, ///< [in] object containing map
+ const SequencerPosition *spos=0 ///< [in] sequencer position
) = 0;
/// Clear all map keys and values from hoid
virtual int rm_keys(
const hobject_t &hoid, ///< [in] object containing map
- const set<string> &to_clear ///< [in] Keys to clear
+ const set<string> &to_clear, ///< [in] Keys to clear
+ const SequencerPosition *spos=0 ///< [in] sequencer position
) = 0;
/// Get all keys and values
@@ -99,24 +104,30 @@ public:
/// set xattrs in to_set
virtual int set_xattrs(
const hobject_t &hoid, ///< [in] object
- const map<string, bufferlist> &to_set ///< [in] attrs/values to set
+ const map<string, bufferlist> &to_set,///< [in] attrs/values to set
+ const SequencerPosition *spos=0 ///< [in] sequencer position
) = 0;
/// remove xattrs in to_remove
virtual int remove_xattrs(
const hobject_t &hoid, ///< [in] object
- const set<string> &to_remove ///< [in] attrs to remove
+ const set<string> &to_remove, ///< [in] attrs to remove
+ const SequencerPosition *spos=0 ///< [in] sequencer position
) = 0;
/// Clone keys efficiently from hoid map to target map
virtual int clone(
const hobject_t &hoid, ///< [in] object containing map
- const hobject_t &target ///< [in] target of clone
+ const hobject_t &target, ///< [in] target of clone
+ const SequencerPosition *spos=0 ///< [in] sequencer position
) { return 0; }
/// Ensure all previous writes are durable
- virtual int sync() { return 0; }
+ virtual int sync(
+ const hobject_t *hoid=0, ///< [in] object
+ const SequencerPosition *spos=0 ///< [in] Sequencer
+ ) { return 0; }
virtual bool check(std::ostream &out) { return true; }