summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sjust@redhat.com>2015-03-26 10:50:19 -0700
committerSamuel Just <sjust@redhat.com>2015-03-26 10:50:19 -0700
commitc176ebf7918aff8a671afc6c5c4e8a6bbdd919df (patch)
treee104e5619761ab7d92a9a3918092e255cc8b48e3
parente9d6096f254479b1c07e2eb0d1a5279e304d1728 (diff)
downloadceph-c176ebf7918aff8a671afc6c5c4e8a6bbdd919df.tar.gz
osd/: Move ReplicatedBackend methods into ReplicatedBackend.cc
Signed-off-by: Samuel Just <sjust@redhat.com>
-rw-r--r--src/osd/ReplicatedBackend.cc1623
-rw-r--r--src/osd/ReplicatedPG.cc1626
2 files changed, 1623 insertions, 1626 deletions
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index 680c27a5e8b..b86d4d1e744 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -30,6 +30,35 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
return *_dout << pgb->get_parent()->gen_dbg_prefix();
}
+static void log_subop_stats(
+ PerfCounters *logger,
+ OpRequestRef op, int subop)
+{
+ utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t latency = now;
+ latency -= op->get_req()->get_recv_stamp();
+
+
+ logger->inc(l_osd_sop);
+ logger->tinc(l_osd_sop_lat, latency);
+ logger->inc(subop);
+
+ if (subop != l_osd_sop_pull) {
+ uint64_t inb = op->get_req()->get_data().length();
+ logger->inc(l_osd_sop_inb, inb);
+ if (subop == l_osd_sop_w) {
+ logger->inc(l_osd_sop_w_inb, inb);
+ logger->tinc(l_osd_sop_w_lat, latency);
+ } else if (subop == l_osd_sop_push) {
+ logger->inc(l_osd_sop_push_inb, inb);
+ logger->tinc(l_osd_sop_push_lat, latency);
+ } else
+ assert("no support subop" == 0);
+ } else {
+ logger->tinc(l_osd_sop_pull_lat, latency);
+ }
+}
+
ReplicatedBackend::ReplicatedBackend(
PGBackend::Listener *pg,
coll_t coll,
@@ -797,3 +826,1597 @@ void ReplicatedBackend::be_deep_scrub(
o.omap_digest = oh.digest();
o.omap_digest_present = true;
}
+
// Replica-side handler for MSG_OSD_PG_PUSH during recovery/backfill:
// applies every PushOp in the message to a single transaction, then
// queues a MOSDPGPushReply (one PushReplyOp per push) to be sent back
// to the pushing shard once the transaction completes.
void ReplicatedBackend::_do_push(OpRequestRef op)
{
  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  vector<PushReplyOp> replies;
  ObjectStore::Transaction *t = new ObjectStore::Transaction;
  for (vector<PushOp>::iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    // handle_push fills in the reply for this push and appends the
    // corresponding object updates to t.
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), t);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  // Only acknowledge once the pushed data is durable: the reply is sent
  // from the transaction's on_complete callback.
  t->register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  // The on_applied callback reclaims the heap-allocated transaction.
  t->register_on_applied(
    new ObjectStore::C_DeleteTransaction(t));
  get_parent()->queue_transaction(t);
}
+
// Completion callback run on a recovery thread after a batch of pull
// responses has been applied locally: for each newly recovered object,
// start pushes to any peers that still need it, or report global
// recovery when none do, then submit the resulting recovery op.
struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<hobject_t> to_continue;  // objects whose local pull just finished
  int priority;                 // recovery priority for the follow-up pushes
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (list<hobject_t>::iterator i =
	   to_continue.begin();
	 i != to_continue.end();
	 ++i) {
      // Each object must still be tracked in pulling; we consume its
      // PullInfo (obc) here and then drop the entry.
      map<hobject_t, ReplicatedBackend::PullInfo>::iterator j =
	bc->pulling.find(*i);
      assert(j != bc->pulling.end());
      if (!bc->start_pushes(*i, j->second.obc, h)) {
	// No replica needs this object: it is globally recovered.
	bc->get_parent()->on_global_recover(
	  *i);
      }
      bc->pulling.erase(*i);
      // Keep the thread-pool heartbeat alive while iterating.
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};
+
// Primary-side handler for MSG_OSD_PG_PUSH messages that answer our
// pulls: applies each PushOp locally, queues continuation work for any
// objects that completed, and sends a follow-up MOSDPGPull for objects
// that still need more data.
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  // `replies` always keeps one spare slot at the tail for
  // handle_pull_response() to fill; the unused spare is erased below.
  vector<PullOp> replies(1);
  ObjectStore::Transaction *t = new ObjectStore::Transaction;
  list<hobject_t> to_continue;
  for (vector<PushOp>::iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    // `more` is true when this object needs another pull round.
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    // Some objects finished pulling; once the transaction completes,
    // continue recovery (pushes to other replicas) asynchronously.
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
	this,
	m->get_priority());
    c->to_continue.swap(to_continue);
    t->register_on_complete(
      new PG_RecoveryQueueAsync(
	get_parent(),
	get_parent()->bless_gencontext(c)));
  }
  // Drop the trailing unused placeholder slot.
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->pulls.swap(replies);
    reply->compute_cost(cct);

    // Ask for the next chunk only after this chunk is durable locally.
    t->register_on_complete(
      new PG_SendMessageOnConn(
	get_parent(), reply, m->get_connection()));
  }

  // on_applied reclaims the heap-allocated transaction.
  t->register_on_applied(
    new ObjectStore::C_DeleteTransaction(t));
  get_parent()->queue_transaction(t);
}
+
+void ReplicatedBackend::do_pull(OpRequestRef op)
+{
+ MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_PULL);
+ pg_shard_t from = m->from;
+
+ map<pg_shard_t, vector<PushOp> > replies;
+ for (vector<PullOp>::iterator i = m->pulls.begin();
+ i != m->pulls.end();
+ ++i) {
+ replies[from].push_back(PushOp());
+ handle_pull(from, *i, &(replies[from].back()));
+ }
+ send_pushes(m->get_priority(), replies);
+}
+
+void ReplicatedBackend::do_push_reply(OpRequestRef op)
+{
+ MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
+ pg_shard_t from = m->from;
+
+ vector<PushOp> replies(1);
+ for (vector<PushReplyOp>::iterator i = m->replies.begin();
+ i != m->replies.end();
+ ++i) {
+ bool more = handle_push_reply(from, *i, &(replies.back()));
+ if (more)
+ replies.push_back(PushOp());
+ }
+ replies.erase(replies.end() - 1);
+
+ map<pg_shard_t, vector<PushOp> > _replies;
+ _replies[from].swap(replies);
+ send_pushes(m->get_priority(), _replies);
+}
+
// Build the replication message (MOSDSubOp or MOSDRepOp, chosen by the
// template parameters) that ships a write transaction, its log entries,
// and pg stats to one replica shard.
// NOTE(review): the InProgressOp *op parameter is not referenced in this
// body — presumably kept for signature symmetry with issue_op; confirm
// before removing.
template<typename T, int MSGTYPE>
Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_trim_rollback_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction *op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  // Request both ack (applied) and ondisk (committed) responses.
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP);
  // forward the write/update/whatever
  T *wr = new T(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap()->get_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    // Object is beyond the peer's backfill horizon: send an empty
    // transaction so the peer still logs the op without applying data.
    dout(10) << "issue_repop shipping empty opt to osd." << peer
	     <<", object " << soid
	     << " beyond MAX(last_backfill_started "
	     << ", pinfo.last_backfill "
	     << pinfo.last_backfill << ")" << dendl;
    ObjectStore::Transaction t;
    t.set_use_tbl(op_t->get_use_tbl());
    ::encode(t, wr->get_data());
  } else {
    ::encode(*op_t, wr->get_data());
  }

  ::encode(log_entries, wr->logbl);

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_trim_rollback_to = pg_trim_rollback_to;

  // Temp-object bookkeeping so the replica tracks/cleans temp oids.
  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}
+
// Send the replicated write to every other shard in the acting/backfill
// set, choosing the legacy MOSDSubOp encoding when any peer lacks the
// CEPH_FEATURE_OSD_REPOP feature, and MOSDRepOp otherwise.
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_trim_rollback_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction *op_t)
{

  if (parent->get_actingbackfill_shards().size() > 1) {
    // Record on the op's event trail which replicas we are waiting on.
    ostringstream ss;
    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
    replicas.erase(parent->whoami_shard());
    ss << "waiting for subops from " << replicas;
    if (op->op)
      op->op->mark_sub_op_sent(ss.str());
  }
  for (set<pg_shard_t>::const_iterator i =
	 parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard()) continue;  // local write handled elsewhere
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;

    Message *wr;
    // Feature negotiation: min_peer_features() is the intersection of
    // all peers' features, so one old peer downgrades everyone to SubOp.
    uint64_t min_features = parent->min_peer_features();
    if (!(min_features & CEPH_FEATURE_OSD_REPOP)) {
      dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl;
      wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>(
	soid,
	at_version,
	tid,
	reqid,
	pg_trim_to,
	pg_trim_rollback_to,
	new_temp_oid,
	discard_temp_oid,
	log_entries,
	hset_hist,
	op,
	op_t,
	peer,
	pinfo);
    } else {
      wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>(
	soid,
	at_version,
	tid,
	reqid,
	pg_trim_to,
	pg_trim_rollback_to,
	new_temp_oid,
	discard_temp_oid,
	log_entries,
	hset_hist,
	op,
	op_t,
	peer,
	pinfo);
    }

    get_parent()->send_message_osd_cluster(
      peer.osd, wr, get_osdmap()->get_epoch());
  }
}
+
+// sub op modify
+void ReplicatedBackend::sub_op_modify(OpRequestRef op) {
+ Message *m = op->get_req();
+ int msg_type = m->get_type();
+ if (msg_type == MSG_OSD_SUBOP) {
+ sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op);
+ } else if (msg_type == MSG_OSD_REPOP) {
+ sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op);
+ } else {
+ assert(0);
+ }
+}
+
// Replica-side application of a replicated write: decode the shipped
// transaction and log entries, record the op in the pg log, and queue
// the combined transaction with ack/commit callbacks.
template<typename T, int MSGTYPE>
void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
{
  T *m = static_cast<T *>(op->get_req());
  int msg_type = m->get_type();
  assert(MSGTYPE == msg_type);
  assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);

  const hobject_t& soid = m->poid;

  dout(10) << "sub_op_modify trans"
           << " " << soid
           << " v " << m->version
	   << (m->logbl.length() ? " (transaction)" : " (parallel exec")
	   << " " << m->logbl.length()
	   << dendl;

  // sanity checks
  assert(m->map_epoch >= get_info().history.same_interval_since);

  // we better not be missing this.
  assert(!parent->get_log().get_missing().is_missing(soid));

  int ackerosd = m->get_source().num();

  op->mark_started();

  // RepModify carries everything the ack/commit callbacks need.
  RepModifyRef rm(new RepModify);
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap()->get_epoch();

  assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  bufferlist::iterator p = m->get_data().begin();
  ::decode(rm->opt, p);
  rm->localt.set_use_tbl(rm->opt.get_use_tbl());

  if (m->new_temp_oid != hobject_t()) {
    // Primary started writing to a temp object; mirror its tracking.
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
    get_temp_coll(&rm->localt);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      // An empty opt means the shipped transaction won't clean up the
      // temp object for us, so remove it here.
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
	       << " since we won't get the transaction" << dendl;
      rm->localt.remove(temp_coll, m->discard_temp_oid);
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = m->logbl.begin();
  ::decode(log, p);
  // Replica copies of the data are unlikely to be re-read soon.
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }
  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_trim_rollback_to,
    update_snaps,
    &(rm->localt));

  rm->bytes_written = rm->opt.get_encoded_bytes();

  // NOTE(review): mark_started() was already called above; this second
  // call appears in upstream as well — presumably to timestamp the op
  // again after decode/log work. Confirm before deduplicating.
  op->mark_started();

  // localt (log/temp bookkeeping) runs first, then the shipped opt.
  rm->localt.append(rm->opt);
  rm->localt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  parent->queue_transaction(&(rm->localt), op);
  // op is cleaned up by oncommit/onapply when both are executed
}
+
// on_applied callback for a replicated write: send an ACK back to the
// acker (unless a commit reply already went out) and notify the parent
// that this version has been applied.
void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
{
  rm->op->mark_event("sub_op_applied");
  rm->applied = true;

  dout(10) << "sub_op_modify_applied on " << rm << " op "
	   << *rm->op->get_req() << dendl;
  Message *m = rm->op->get_req();

  Message *ack = NULL;
  eversion_t version;

  if (m->get_type() == MSG_OSD_SUBOP) {
    // doesn't have CLIENT SUBOP feature ,use Subop
    MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
    version = req->version;
    // An ONDISK reply implies ACK; skip the ACK if already committed.
    if (!rm->committed)
      ack = new MOSDSubOpReply(
	req, parent->whoami_shard(),
	0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
  } else if (m->get_type() == MSG_OSD_REPOP) {
    MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
    version = req->version;
    if (!rm->committed)
      ack = new MOSDRepOpReply(
	static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
	0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
  } else {
    assert(0);
  }

  // send ack to acker only if we haven't sent a commit already
  if (ack) {
    ack->set_priority(CEPH_MSG_PRIO_HIGH);  // this better match commit priority!
    get_parent()->send_message_osd_cluster(
      rm->ackerosd, ack, get_osdmap()->get_epoch());
  }

  parent->op_applied(version);
}
+
// on_commit callback for a replicated write: send the ONDISK reply
// (with our last_complete) back to the acker and log subop stats.
void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->committed = true;

  // send commit.
  dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
	   << ", sending commit to osd." << rm->ackerosd
	   << dendl;

  assert(get_osdmap()->is_up(rm->ackerosd));
  get_parent()->update_last_complete_ondisk(rm->last_complete);

  Message *m = rm->op->get_req();
  Message *commit;
  if (m->get_type() == MSG_OSD_SUBOP) {
    // doesn't have CLIENT SUBOP feature ,use Subop
    MOSDSubOpReply *reply = new MOSDSubOpReply(
      static_cast<MOSDSubOp*>(m),
      get_parent()->whoami_shard(),
      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
    reply->set_last_complete_ondisk(rm->last_complete);
    commit = reply;
  } else if (m->get_type() == MSG_OSD_REPOP) {
    MOSDRepOpReply *reply = new MOSDRepOpReply(
      static_cast<MOSDRepOp*>(m),
      get_parent()->whoami_shard(),
      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
    reply->set_last_complete_ondisk(rm->last_complete);
    commit = reply;
  }
  else {
    assert(0);
  }

  commit->set_priority(CEPH_MSG_PRIO_HIGH);  // this better match ack priority!
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, commit, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}
+
+
+// ===========================================================
+
// ===========================================================

// Compute which byte ranges of a head object must actually be pushed
// (data_subset) and which can instead be cloned from the most recent
// local clone (clone_subsets), based on snapset clone overlaps.
void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
{
  dout(10) << "calc_head_subsets " << head
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  // Start by assuming the full object must be pushed.
  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    // Cache pools may have incomplete clones; don't base pushes on them.
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }


  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  // Walk clones newest-first; use the first usable (present, within
  // backfill horizon) clone and stop.
  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    // prev narrows to the ranges shared with every clone walked so far.
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) && c < last_backfill) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }


  // Too fragmented a clone plan costs more than a plain push.
  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
+
// Compute which byte ranges of a clone object must be pushed
// (data_subset) and which can be cloned from the nearest older or newer
// local clone (clone_subsets), based on snapset clone overlaps.
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
{
  dout(10) << "calc_clone_subsets " << soid
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  // Start by assuming the full clone must be pushed.
  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    // Cache pools may have incomplete clones; don't base pushes on them.
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  // Locate this clone's position in the (oldest-first) clone list.
  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  // NOTE(review): i is unsigned, so i-1 wraps when i == 0 before the
  // int conversion; in practice j becomes -1 and the loop is skipped,
  // but this relies on implementation-defined conversion.
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) && c < last_backfill) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    // clone_overlap[clones[j-1]] records overlap between clone j-1 and j.
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) && c < last_backfill) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
	       << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
	     << " overlap " << next << dendl;
  }

  // Too fragmented a clone plan costs more than a plain push.
  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    clone_subsets.clear();
    cloning.clear();
  }


  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
+
// Prepare a PullOp for a locally-missing object: pick a random peer
// that has it, work out the recoverable version and the data/clone
// subsets to request, and record the pull in `pulling`.
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  // Caller's version must match our missing-set entry for this object.
  assert(get_parent()->get_local_missing().missing.count(soid));
  eversion_t _v = get_parent()->get_local_missing().missing.find(
    soid)->second.need;
  assert(_v == v);
  const map<hobject_t, set<pg_shard_t> > &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid);
  assert(q != missing_loc.end());
  assert(!q->second.empty());

  // pick a pullee: random choice spreads pull load across sources
  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
  random_shuffle(shuffle.begin(), shuffle.end());
  vector<pg_shard_t>::iterator p = shuffle.begin();
  assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
	  << " v " << v
	  << " on osds " << *p
	  << " from osd." << fromshard
	  << dendl;

  assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    // Peer doesn't have v either; this is only legal for a LOST_REVERT
    // whose reverting_to matches what the peer does have — pull that.
    assert(pmissing.missing.find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
	     << " at version " << pmissing.missing.find(soid)->second.have
	     << " rather than at version " << v << dendl;
    v = pmissing.missing.find(soid)->second.have;
    assert(get_parent()->get_log().get_log().objects.count(soid) &&
	   (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
	    pg_log_entry_t::LOST_REVERT) &&
	   (get_parent()->get_log().get_log().objects.find(
	     soid)->second->reverting_to ==
	    v));
  }

  ObjectRecoveryInfo recovery_info;

  if (soid.is_snap()) {
    // For a clone we need the head (or snapdir) locally so the snapset
    // can tell us which ranges to clone instead of transfer.
    assert(!get_parent()->get_local_missing().is_missing(
	     soid.get_head()) ||
	   !get_parent()->get_local_missing().is_missing(
	     soid.get_snapdir()));
    assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
		       get_info().last_backfill,
		       recovery_info.copy_subset,
		       recovery_info.clone_subset);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  // Track the in-flight pull so responses can be matched back up.
  assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
}
+
+/*
+ * intelligently push an object to a replica. make use of existing
+ * clones/heads and dup data ranges where possible.
+ */
/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
void ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
	   << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t> > clone_subsets;
  interval_set<uint64_t> data_subset;

  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/preceed poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      // Fall back to a full-object push.
      return prep_push(obc, soid, peer, pop);
    }
    hobject_t snapdir = head;
    snapdir.snap = CEPH_SNAPDIR;
    if (get_parent()->get_local_missing().is_missing(snapdir)) {
      dout(15) << "push_to_replica missing snapdir " << snapdir
	       << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop);
    }

    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    // Subsets are computed against the PEER's missing set and backfill
    // horizon, since the clone sources must exist on the peer.
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(ssc->snapset, soid,
		       pm->second,
		       pi->second.last_backfill,
		       data_subset, clone_subsets);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this on partially on replica's clones?
    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets);
  }

  prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
}
+
+void ReplicatedBackend::prep_push(ObjectContextRef obc,
+ const hobject_t& soid, pg_shard_t peer,
+ PushOp *pop)
+{
+ interval_set<uint64_t> data_subset;
+ if (obc->obs.oi.size)
+ data_subset.insert(0, obc->obs.oi.size);
+ map<hobject_t, interval_set<uint64_t> > clone_subsets;
+
+ prep_push(obc, soid, peer,
+ obc->obs.oi.version, data_subset, clone_subsets,
+ pop);
+}
+
// Record the start of a push to `peer` in `pushing` and build the first
// PushOp chunk for the given data/clone subsets.
void ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t> >& clone_subsets,
  PushOp *pop)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.version = version;
  pi.recovery_progress.first = true;
  pi.recovery_progress.data_recovered_to = 0;
  pi.recovery_progress.data_complete = 0;
  pi.recovery_progress.omap_complete = 0;

  // Build the first chunk; progress advances to new_progress on success.
  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
			pi.recovery_progress,
			&new_progress,
			pop,
			&(pi.stat));
  assert(r == 0);
  pi.recovery_progress = new_progress;
}
+
// Send a pull request encoded as a legacy MOSDSubOp with a single
// CEPH_OSD_OP_PULL op, for peers without packed-recovery support.
int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer,
					const ObjectRecoveryInfo &recovery_info,
					ObjectRecoveryProgress progress)
{
  // send op
  ceph_tid_t tid = get_parent()->get_tid();
  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);

  dout(10) << "send_pull_op " << recovery_info.soid << " "
	   << recovery_info.version
	   << " first=" << progress.first
	   << " data " << recovery_info.copy_subset
	   << " from osd." << peer
	   << " tid " << tid << dendl;

  MOSDSubOp *subop = new MOSDSubOp(
    rid, parent->whoami_shard(),
    get_info().pgid, recovery_info.soid,
    CEPH_OSD_FLAG_ACK,
    get_osdmap()->get_epoch(), tid,
    recovery_info.version);
  subop->set_priority(prio);
  subop->ops = vector<OSDOp>(1);
  subop->ops[0].op.op = CEPH_OSD_OP_PULL;
  // Cap each pull round to the configured chunk size.
  subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk;
  subop->recovery_info = recovery_info;
  subop->recovery_progress = progress;

  get_parent()->send_message_osd_cluster(
    peer.osd, subop, get_osdmap()->get_epoch());

  get_parent()->get_logger()->inc(l_osd_pull);
  return 0;
}
+
// Stage one chunk of pushed object data into a transaction.  Partial
// objects accumulate in the temp collection; once `complete`, the
// object is moved into the real collection and clone ranges are filled
// in by submit_push_complete().
void ReplicatedBackend::submit_push_data(
  ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  map<string, bufferlist> &attrs,
  map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  // A single-chunk push (first && complete) can write straight into the
  // final collection; anything multi-chunk goes via the temp collection.
  coll_t target_coll;
  if (first && complete) {
    target_coll = coll;
  } else {
    dout(10) << __func__ << ": Creating oid "
	     << recovery_info.soid << " in the temp collection" << dendl;
    add_temp_obj(recovery_info.soid);
    target_coll = get_temp_coll(t);
  }

  if (first) {
    // Fresh start: clear any stale temp copy and size/seed the object.
    get_parent()->on_local_recover_start(recovery_info.soid, t);
    t->remove(get_temp_coll(t), recovery_info.soid);
    t->touch(target_coll, recovery_info.soid);
    t->truncate(target_coll, recovery_info.soid, recovery_info.size);
    t->omap_setheader(target_coll, recovery_info.soid, omap_header);
  }
  // data_included is a flat concatenation of the ranges listed in
  // intervals_included; slice it back out range by range.
  uint64_t off = 0;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(target_coll, recovery_info.soid,
	     p.get_start(), p.get_len(), bit);
    off += p.get_len();
  }

  t->omap_setkeys(target_coll, recovery_info.soid,
		  omap_entries);
  t->setattrs(target_coll, recovery_info.soid,
	      attrs);

  if (complete) {
    if (!first) {
      // Promote the finished object out of the temp collection.
      dout(10) << __func__ << ": Removing oid "
	       << recovery_info.soid << " from the temp collection" << dendl;
      clear_temp_obj(recovery_info.soid);
      t->collection_move(coll, target_coll, recovery_info.soid);
    }

    submit_push_complete(recovery_info, t);
  }
}
+
+void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t)
+{
+ for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
+ recovery_info.clone_subset.begin();
+ p != recovery_info.clone_subset.end();
+ ++p) {
+ for (interval_set<uint64_t>::const_iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ dout(15) << " clone_range " << p->first << " "
+ << q.get_start() << "~" << q.get_len() << dendl;
+ t->clone_range(coll, p->first, recovery_info.soid,
+ q.get_start(), q.get_len(), q.get_start());
+ }
+ }
+}
+
+ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
+ const ObjectRecoveryInfo& recovery_info,
+ SnapSetContext *ssc)
+{
+ if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
+ return recovery_info;
+ ObjectRecoveryInfo new_info = recovery_info;
+ new_info.copy_subset.clear();
+ new_info.clone_subset.clear();
+ assert(ssc);
+ calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
+ get_info().last_backfill,
+ new_info.copy_subset, new_info.clone_subset);
+ return new_info;
+}
+
// Apply one PushOp received in answer to our pull.  Returns true when
// another pull round is needed (and fills *response); returns false
// when the object is complete (queued on *to_continue) or the pull was
// aborted/stale.
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, PushOp &pop, PullOp *response,
  list<hobject_t> *to_continue,
  ObjectStore::Transaction *t
  )
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data.claim(pop.data);
  dout(10) << "handle_pull_response "
	   << pop.recovery_info
	   << pop.after_progress
	   << " data.size() is " << data.length()
	   << " data_included: " << data_included
	   << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_push(from, pop.soid);
    return false;
  }

  hobject_t &hoid = pop.soid;
  // Data and its interval map must agree on emptiness.
  assert((data_included.empty() && data.length() == 0) ||
	 (!data_included.empty() && data.length() > 0));

  if (!pulling.count(hoid)) {
    // Pull was cancelled (e.g. interval change); drop the response.
    return false;
  }

  PullInfo &pi = pulling[hoid];
  if (pi.recovery_info.size == (uint64_t(-1))) {
    // We didn't know the object size when pulling; adopt the source's.
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // First chunk carries the attrs: build the obc and recompute clone
    // subsets now that the object_info is known.
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
  }


  // Discard any ranges outside our (possibly recalculated) copy_subset.
  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
		   data_included,
		   data,
		   &usable_intervals,
		   &usable_data);
  data_included = usable_intervals;
  data.claim(usable_data);


  pi.recovery_progress = pop.after_progress;

  pi.stat.num_bytes_recovered += data.length();

  dout(10) << "new recovery_info " << pi.recovery_info
	   << ", new progress " << pi.recovery_progress
	   << dendl;

  bool complete = pi.is_complete();

  submit_push_data(pi.recovery_info, first,
		   complete,
		   data_included, data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();

  if (complete) {
    // Done: hand the object to the caller's continuation list and stop
    // tracking the peer as a pull source for it.
    to_continue->push_back(hoid);
    pi.stat.num_objects_recovered++;
    get_parent()->on_local_recover(
      hoid, pi.stat, pi.recovery_info, pi.obc, t);
    pull_from_peer[from].erase(hoid);
    if (pull_from_peer[from].empty())
      pull_from_peer.erase(from);
    return false;
  } else {
    // More to pull: describe the next round for the caller to send.
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}
+
+void ReplicatedBackend::handle_push(
+ pg_shard_t from, PushOp &pop, PushReplyOp *response,
+ ObjectStore::Transaction *t)
+{
+ dout(10) << "handle_push "
+ << pop.recovery_info
+ << pop.after_progress
+ << dendl;
+ bufferlist data;
+ data.claim(pop.data);
+ bool first = pop.before_progress.first;
+ bool complete = pop.after_progress.data_complete &&
+ pop.after_progress.omap_complete;
+
+ response->soid = pop.recovery_info.soid;
+ submit_push_data(pop.recovery_info,
+ first,
+ complete,
+ pop.data_included,
+ data,
+ pop.omap_header,
+ pop.attrset,
+ pop.omap_entries,
+ t);
+
+ if (complete)
+ get_parent()->on_local_recover(
+ pop.recovery_info.soid,
+ object_stat_sum_t(),
+ pop.recovery_info,
+ ObjectContextRef(), // ok, is replica
+ t);
+}
+
+void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
+{
+ for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
+ i != pushes.end();
+ ++i) {
+ ConnectionRef con = get_parent()->get_con_osd_cluster(
+ i->first.osd,
+ get_osdmap()->get_epoch());
+ if (!con)
+ continue;
+ if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+ for (vector<PushOp>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ dout(20) << __func__ << ": sending push (legacy) " << *j
+ << " to osd." << i->first << dendl;
+ send_push_op_legacy(prio, i->first, *j);
+ }
+ } else {
+ vector<PushOp>::iterator j = i->second.begin();
+ while (j != i->second.end()) {
+ uint64_t cost = 0;
+ uint64_t pushes = 0;
+ MOSDPGPush *msg = new MOSDPGPush();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = get_parent()->primary_spg_t();
+ msg->map_epoch = get_osdmap()->get_epoch();
+ msg->set_priority(prio);
+ for (;
+ (j != i->second.end() &&
+ cost < cct->_conf->osd_max_push_cost &&
+ pushes < cct->_conf->osd_max_push_objects) ;
+ ++j) {
+ dout(20) << __func__ << ": sending push " << *j
+ << " to osd." << i->first << dendl;
+ cost += j->cost(cct);
+ pushes += 1;
+ msg->pushes.push_back(*j);
+ }
+ msg->compute_cost(cct);
+ get_parent()->send_message_osd_cluster(msg, con);
+ }
+ }
+ }
+}
+
+void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
+{
+ for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
+ i != pulls.end();
+ ++i) {
+ ConnectionRef con = get_parent()->get_con_osd_cluster(
+ i->first.osd,
+ get_osdmap()->get_epoch());
+ if (!con)
+ continue;
+ if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+ for (vector<PullOp>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ dout(20) << __func__ << ": sending pull (legacy) " << *j
+ << " to osd." << i->first << dendl;
+ send_pull_legacy(
+ prio,
+ i->first,
+ j->recovery_info,
+ j->recovery_progress);
+ }
+ } else {
+ dout(20) << __func__ << ": sending pulls " << i->second
+ << " to osd." << i->first << dendl;
+ MOSDPGPull *msg = new MOSDPGPull();
+ msg->from = parent->whoami_shard();
+ msg->set_priority(prio);
+ msg->pgid = get_parent()->primary_spg_t();
+ msg->map_epoch = get_osdmap()->get_epoch();
+ msg->pulls.swap(i->second);
+ msg->compute_cost(cct);
+ get_parent()->send_message_osd_cluster(msg, con);
+ }
+ }
+}
+
+int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op,
+ object_stat_sum_t *stat)
+{
+ ObjectRecoveryProgress _new_progress;
+ if (!out_progress)
+ out_progress = &_new_progress;
+ ObjectRecoveryProgress &new_progress = *out_progress;
+ new_progress = progress;
+
+ dout(7) << "send_push_op " << recovery_info.soid
+ << " v " << recovery_info.version
+ << " size " << recovery_info.size
+ << " recovery_info: " << recovery_info
+ << dendl;
+
+ if (progress.first) {
+ store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+ store->getattrs(coll, recovery_info.soid, out_op->attrset);
+
+ // Debug
+ bufferlist bv = out_op->attrset[OI_ATTR];
+ object_info_t oi(bv);
+
+ if (oi.version != recovery_info.version) {
+ get_parent()->clog_error() << get_info().pgid << " push "
+ << recovery_info.soid << " v "
+ << recovery_info.version
+ << " failed because local copy is "
+ << oi.version << "\n";
+ return -EINVAL;
+ }
+
+ new_progress.first = false;
+ }
+
+ uint64_t available = cct->_conf->osd_recovery_max_chunk;
+ if (!progress.omap_complete) {
+ ObjectMap::ObjectMapIterator iter =
+ store->get_omap_iterator(coll,
+ recovery_info.soid);
+ for (iter->lower_bound(progress.omap_recovered_to);
+ iter->valid();
+ iter->next()) {
+ if (!out_op->omap_entries.empty() &&
+ available <= (iter->key().size() + iter->value().length()))
+ break;
+ out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
+
+ if ((iter->key().size() + iter->value().length()) <= available)
+ available -= (iter->key().size() + iter->value().length());
+ else
+ available = 0;
+ }
+ if (!iter->valid())
+ new_progress.omap_complete = true;
+ else
+ new_progress.omap_recovered_to = iter->key();
+ }
+
+ if (available > 0) {
+ if (!recovery_info.copy_subset.empty()) {
+ interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
+ bufferlist bl;
+ int r = store->fiemap(coll, recovery_info.soid, 0,
+ copy_subset.range_end(), bl);
+ if (r >= 0) {
+ interval_set<uint64_t> fiemap_included;
+ map<uint64_t, uint64_t> m;
+ bufferlist::iterator iter = bl.begin();
+ ::decode(m, iter);
+ map<uint64_t, uint64_t>::iterator miter;
+ for (miter = m.begin(); miter != m.end(); ++miter) {
+ fiemap_included.insert(miter->first, miter->second);
+ }
+
+ copy_subset.intersection_of(fiemap_included);
+ }
+ out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
+ available);
+ if (out_op->data_included.empty()) // zero filled section, skip to end!
+ new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
+ else
+ new_progress.data_recovered_to = out_op->data_included.range_end();
+ }
+ } else {
+ out_op->data_included.clear();
+ }
+
+ for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
+ p != out_op->data_included.end();
+ ++p) {
+ bufferlist bit;
+ store->read(coll, recovery_info.soid,
+ p.get_start(), p.get_len(), bit);
+ if (p.get_len() != bit.length()) {
+ dout(10) << " extent " << p.get_start() << "~" << p.get_len()
+ << " is actually " << p.get_start() << "~" << bit.length()
+ << dendl;
+ interval_set<uint64_t>::iterator save = p++;
+ if (bit.length() == 0)
+ out_op->data_included.erase(save); //Remove this empty interval
+ else
+ save.set_len(bit.length());
+ // Remove any other intervals present
+ while (p != out_op->data_included.end()) {
+ interval_set<uint64_t>::iterator save = p++;
+ out_op->data_included.erase(save);
+ }
+ new_progress.data_complete = true;
+ out_op->data.claim_append(bit);
+ break;
+ }
+ out_op->data.claim_append(bit);
+ }
+
+ if (new_progress.is_complete(recovery_info)) {
+ new_progress.data_complete = true;
+ if (stat)
+ stat->num_objects_recovered++;
+ }
+
+ if (stat) {
+ stat->num_keys_recovered += out_op->omap_entries.size();
+ stat->num_bytes_recovered += out_op->data.length();
+ }
+
+ get_parent()->get_logger()->inc(l_osd_push);
+ get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
+
+ // send
+ out_op->version = recovery_info.version;
+ out_op->soid = recovery_info.soid;
+ out_op->recovery_info = recovery_info;
+ out_op->after_progress = new_progress;
+ out_op->before_progress = progress;
+ return 0;
+}
+
+int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop)
+{
+ ceph_tid_t tid = get_parent()->get_tid();
+ osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
+ MOSDSubOp *subop = new MOSDSubOp(
+ rid, parent->whoami_shard(),
+ spg_t(get_info().pgid.pgid, peer.shard), pop.soid,
+ 0, get_osdmap()->get_epoch(),
+ tid, pop.recovery_info.version);
+ subop->ops = vector<OSDOp>(1);
+ subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+
+ subop->set_priority(prio);
+ subop->version = pop.version;
+ subop->ops[0].indata.claim(pop.data);
+ subop->data_included.swap(pop.data_included);
+ subop->omap_header.claim(pop.omap_header);
+ subop->omap_entries.swap(pop.omap_entries);
+ subop->attrset.swap(pop.attrset);
+ subop->recovery_info = pop.recovery_info;
+ subop->current_progress = pop.before_progress;
+ subop->recovery_progress = pop.after_progress;
+
+ get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
+ return 0;
+}
+
+void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
+{
+ op->recovery_info.version = eversion_t();
+ op->version = eversion_t();
+ op->soid = soid;
+}
+
+void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
+{
+ MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
+ const hobject_t& soid = reply->get_poid();
+ assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
+ dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+ pg_shard_t peer = reply->from;
+
+ op->mark_started();
+
+ PushReplyOp rop;
+ rop.soid = soid;
+ PushOp pop;
+ bool more = handle_push_reply(peer, rop, &pop);
+ if (more)
+ send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
+}
+
+bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply)
+{
+ const hobject_t &soid = op.soid;
+ if (pushing.count(soid) == 0) {
+ dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
+ << ", or anybody else"
+ << dendl;
+ return false;
+ } else if (pushing[soid].count(peer) == 0) {
+ dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
+ << dendl;
+ return false;
+ } else {
+ PushInfo *pi = &pushing[soid][peer];
+
+ if (!pi->recovery_progress.data_complete) {
+ dout(10) << " pushing more from, "
+ << pi->recovery_progress.data_recovered_to
+ << " of " << pi->recovery_info.copy_subset << dendl;
+ ObjectRecoveryProgress new_progress;
+ int r = build_push_op(
+ pi->recovery_info,
+ pi->recovery_progress, &new_progress, reply,
+ &(pi->stat));
+ assert(r == 0);
+ pi->recovery_progress = new_progress;
+ return true;
+ } else {
+ // done!
+ get_parent()->on_peer_recover(
+ peer, soid, pi->recovery_info,
+ pi->stat);
+
+ pushing[soid].erase(peer);
+ pi = NULL;
+
+
+ if (pushing[soid].empty()) {
+ get_parent()->on_global_recover(soid);
+ pushing.erase(soid);
+ } else {
+ dout(10) << "pushed " << soid << ", still waiting for push ack from "
+ << pushing[soid].size() << " others" << dendl;
+ }
+ return false;
+ }
+ }
+}
+
+/** op_pull
+ * process request to pull an entire object.
+ * NOTE: called from opqueue.
+ */
+void ReplicatedBackend::sub_op_pull(OpRequestRef op)
+{
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
+ assert(m->get_type() == MSG_OSD_SUBOP);
+
+ op->mark_started();
+
+ const hobject_t soid = m->poid;
+
+ dout(7) << "pull" << soid << " v " << m->version
+ << " from " << m->get_source()
+ << dendl;
+
+ assert(!is_primary()); // we should be a replica or stray.
+
+ PullOp pop;
+ pop.soid = soid;
+ pop.recovery_info = m->recovery_info;
+ pop.recovery_progress = m->recovery_progress;
+
+ PushOp reply;
+ handle_pull(m->from, pop, &reply);
+ send_push_op_legacy(
+ m->get_priority(),
+ m->from,
+ reply);
+
+ log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull);
+}
+
+void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
+{
+ const hobject_t &soid = op.soid;
+ struct stat st;
+ int r = store->stat(coll, soid, &st);
+ if (r != 0) {
+ get_parent()->clog_error() << get_info().pgid << " "
+ << peer << " tried to pull " << soid
+ << " but got " << cpp_strerror(-r) << "\n";
+ prep_push_op_blank(soid, reply);
+ } else {
+ ObjectRecoveryInfo &recovery_info = op.recovery_info;
+ ObjectRecoveryProgress &progress = op.recovery_progress;
+ if (progress.first && recovery_info.size == ((uint64_t)-1)) {
+ // Adjust size and copy_subset
+ recovery_info.size = st.st_size;
+ recovery_info.copy_subset.clear();
+ if (st.st_size)
+ recovery_info.copy_subset.insert(0, st.st_size);
+ assert(recovery_info.clone_subset.empty());
+ }
+
+ r = build_push_op(recovery_info, progress, 0, reply);
+ if (r < 0)
+ prep_push_op_blank(soid, reply);
+ }
+}
+
+/**
+ * trim received data to remove what we don't want
+ *
+ * @param copy_subset intervals we want
+ * @param data_included intervals we got
+ * @param data_received data we got
+ * @param intervals_usable intervals we want to keep
+ * @param data_usable matching data we want to keep
+ */
+void ReplicatedBackend::trim_pushed_data(
+ const interval_set<uint64_t> &copy_subset,
+ const interval_set<uint64_t> &intervals_received,
+ bufferlist data_received,
+ interval_set<uint64_t> *intervals_usable,
+ bufferlist *data_usable)
+{
+ if (intervals_received.subset_of(copy_subset)) {
+ *intervals_usable = intervals_received;
+ *data_usable = data_received;
+ return;
+ }
+
+ intervals_usable->intersection_of(copy_subset,
+ intervals_received);
+
+ uint64_t off = 0;
+ for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
+ p != intervals_received.end();
+ ++p) {
+ interval_set<uint64_t> x;
+ x.insert(p.get_start(), p.get_len());
+ x.intersection_of(copy_subset);
+ for (interval_set<uint64_t>::const_iterator q = x.begin();
+ q != x.end();
+ ++q) {
+ bufferlist sub;
+ uint64_t data_off = off + (q.get_start() - p.get_start());
+ sub.substr_of(data_received, data_off, q.get_len());
+ data_usable->claim_append(sub);
+ }
+ off += p.get_len();
+ }
+}
+
+/** op_push
+ * NOTE: called from opqueue.
+ */
+void ReplicatedBackend::sub_op_push(OpRequestRef op)
+{
+ op->mark_started();
+ MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
+
+ PushOp pop;
+ pop.soid = m->recovery_info.soid;
+ pop.version = m->version;
+ m->claim_data(pop.data);
+ pop.data_included.swap(m->data_included);
+ pop.omap_header.swap(m->omap_header);
+ pop.omap_entries.swap(m->omap_entries);
+ pop.attrset.swap(m->attrset);
+ pop.recovery_info = m->recovery_info;
+ pop.before_progress = m->current_progress;
+ pop.after_progress = m->recovery_progress;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+
+ if (is_primary()) {
+ PullOp resp;
+ RPGHandle *h = _open_recovery_op();
+ list<hobject_t> to_continue;
+ bool more = handle_pull_response(
+ m->from, pop, &resp,
+ &to_continue, t);
+ if (more) {
+ send_pull_legacy(
+ m->get_priority(),
+ m->from,
+ resp.recovery_info,
+ resp.recovery_progress);
+ } else {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ op->get_req()->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ new PG_RecoveryQueueAsync(
+ get_parent(),
+ get_parent()->bless_gencontext(c)));
+ }
+ run_recovery_op(h, op->get_req()->get_priority());
+ } else {
+ PushReplyOp resp;
+ MOSDSubOpReply *reply = new MOSDSubOpReply(
+ m, parent->whoami_shard(), 0,
+ get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ reply->set_priority(m->get_priority());
+ assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+ handle_push(m->from, pop, &resp, t);
+ t->register_on_complete(new PG_SendMessageOnConn(
+ get_parent(), reply, m->get_connection()));
+ }
+ t->register_on_applied(
+ new ObjectStore::C_DeleteTransaction(t));
+ get_parent()->queue_transaction(t);
+ return;
+}
+
+void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
+{
+ get_parent()->failed_push(from, soid);
+ pull_from_peer[from].erase(soid);
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
+ pulling.erase(soid);
+}
+
+int ReplicatedBackend::start_pushes(
+ const hobject_t &soid,
+ ObjectContextRef obc,
+ RPGHandle *h)
+{
+ int pushes = 0;
+ // who needs it?
+ assert(get_parent()->get_actingbackfill_shards().size() > 0);
+ for (set<pg_shard_t>::iterator i =
+ get_parent()->get_actingbackfill_shards().begin();
+ i != get_parent()->get_actingbackfill_shards().end();
+ ++i) {
+ if (*i == get_parent()->whoami_shard()) continue;
+ pg_shard_t peer = *i;
+ map<pg_shard_t, pg_missing_t>::const_iterator j =
+ get_parent()->get_shard_missing().find(peer);
+ assert(j != get_parent()->get_shard_missing().end());
+ if (j->second.is_missing(soid)) {
+ ++pushes;
+ h->pushes[peer].push_back(PushOp());
+ prep_push_to_replica(obc, soid, peer,
+ &(h->pushes[peer].back())
+ );
+ }
+ }
+ return pushes;
+}
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 64b88eb0266..c748aa9c74e 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -90,35 +90,6 @@ PGLSFilter::~PGLSFilter()
{
}
-static void log_subop_stats(
- PerfCounters *logger,
- OpRequestRef op, int subop)
-{
- utime_t now = ceph_clock_now(g_ceph_context);
- utime_t latency = now;
- latency -= op->get_req()->get_recv_stamp();
-
-
- logger->inc(l_osd_sop);
- logger->tinc(l_osd_sop_lat, latency);
- logger->inc(subop);
-
- if (subop != l_osd_sop_pull) {
- uint64_t inb = op->get_req()->get_data().length();
- logger->inc(l_osd_sop_inb, inb);
- if (subop == l_osd_sop_w) {
- logger->inc(l_osd_sop_w_inb, inb);
- logger->tinc(l_osd_sop_w_lat, latency);
- } else if (subop == l_osd_sop_push) {
- logger->inc(l_osd_sop_push_inb, inb);
- logger->tinc(l_osd_sop_push_lat, latency);
- } else
- assert("no support subop" == 0);
- } else {
- logger->tinc(l_osd_sop_pull_lat, latency);
- }
-}
-
struct OnReadComplete : public Context {
ReplicatedPG *pg;
ReplicatedPG::OpContext *opcontext;
@@ -2605,150 +2576,6 @@ void ReplicatedPG::do_scan(
}
}
-void ReplicatedBackend::_do_push(OpRequestRef op)
-{
- MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_PUSH);
- pg_shard_t from = m->from;
-
- vector<PushReplyOp> replies;
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- for (vector<PushOp>::iterator i = m->pushes.begin();
- i != m->pushes.end();
- ++i) {
- replies.push_back(PushReplyOp());
- handle_push(from, *i, &(replies.back()), t);
- }
-
- MOSDPGPushReply *reply = new MOSDPGPushReply;
- reply->from = get_parent()->whoami_shard();
- reply->set_priority(m->get_priority());
- reply->pgid = get_info().pgid;
- reply->map_epoch = m->map_epoch;
- reply->replies.swap(replies);
- reply->compute_cost(cct);
-
- t->register_on_complete(
- new PG_SendMessageOnConn(
- get_parent(), reply, m->get_connection()));
-
- t->register_on_applied(
- new ObjectStore::C_DeleteTransaction(t));
- get_parent()->queue_transaction(t);
-}
-
-struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
- ReplicatedBackend *bc;
- list<hobject_t> to_continue;
- int priority;
- C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
- : bc(bc), priority(priority) {}
-
- void finish(ThreadPool::TPHandle &handle) {
- ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
- for (list<hobject_t>::iterator i =
- to_continue.begin();
- i != to_continue.end();
- ++i) {
- map<hobject_t, ReplicatedBackend::PullInfo>::iterator j =
- bc->pulling.find(*i);
- assert(j != bc->pulling.end());
- if (!bc->start_pushes(*i, j->second.obc, h)) {
- bc->get_parent()->on_global_recover(
- *i);
- }
- bc->pulling.erase(*i);
- handle.reset_tp_timeout();
- }
- bc->run_recovery_op(h, priority);
- }
-};
-
-void ReplicatedBackend::_do_pull_response(OpRequestRef op)
-{
- MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_PUSH);
- pg_shard_t from = m->from;
-
- vector<PullOp> replies(1);
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- list<hobject_t> to_continue;
- for (vector<PushOp>::iterator i = m->pushes.begin();
- i != m->pushes.end();
- ++i) {
- bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
- if (more)
- replies.push_back(PullOp());
- }
- if (!to_continue.empty()) {
- C_ReplicatedBackend_OnPullComplete *c =
- new C_ReplicatedBackend_OnPullComplete(
- this,
- m->get_priority());
- c->to_continue.swap(to_continue);
- t->register_on_complete(
- new PG_RecoveryQueueAsync(
- get_parent(),
- get_parent()->bless_gencontext(c)));
- }
- replies.erase(replies.end() - 1);
-
- if (replies.size()) {
- MOSDPGPull *reply = new MOSDPGPull;
- reply->from = parent->whoami_shard();
- reply->set_priority(m->get_priority());
- reply->pgid = get_info().pgid;
- reply->map_epoch = m->map_epoch;
- reply->pulls.swap(replies);
- reply->compute_cost(cct);
-
- t->register_on_complete(
- new PG_SendMessageOnConn(
- get_parent(), reply, m->get_connection()));
- }
-
- t->register_on_applied(
- new ObjectStore::C_DeleteTransaction(t));
- get_parent()->queue_transaction(t);
-}
-
-void ReplicatedBackend::do_pull(OpRequestRef op)
-{
- MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_PULL);
- pg_shard_t from = m->from;
-
- map<pg_shard_t, vector<PushOp> > replies;
- for (vector<PullOp>::iterator i = m->pulls.begin();
- i != m->pulls.end();
- ++i) {
- replies[from].push_back(PushOp());
- handle_pull(from, *i, &(replies[from].back()));
- }
- send_pushes(m->get_priority(), replies);
-}
-
-void ReplicatedBackend::do_push_reply(OpRequestRef op)
-{
- MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
- pg_shard_t from = m->from;
-
- vector<PushOp> replies(1);
- for (vector<PushReplyOp>::iterator i = m->replies.begin();
- i != m->replies.end();
- ++i) {
- bool more = handle_push_reply(from, *i, &(replies.back()));
- if (more)
- replies.push_back(PushOp());
- }
- replies.erase(replies.end() - 1);
-
- map<pg_shard_t, vector<PushOp> > _replies;
- _replies[from].swap(replies);
- send_pushes(m->get_priority(), _replies);
-}
-
void ReplicatedPG::do_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
@@ -7608,136 +7435,6 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
repop->ctx->op_t = NULL;
}
-template<typename T, int MSGTYPE>
-Message * ReplicatedBackend::generate_subop(
- const hobject_t &soid,
- const eversion_t &at_version,
- ceph_tid_t tid,
- osd_reqid_t reqid,
- eversion_t pg_trim_to,
- eversion_t pg_trim_rollback_to,
- hobject_t new_temp_oid,
- hobject_t discard_temp_oid,
- const vector<pg_log_entry_t> &log_entries,
- boost::optional<pg_hit_set_history_t> &hset_hist,
- InProgressOp *op,
- ObjectStore::Transaction *op_t,
- pg_shard_t peer,
- const pg_info_t &pinfo)
-{
- int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
- assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP);
- // forward the write/update/whatever
- T *wr = new T(
- reqid, parent->whoami_shard(),
- spg_t(get_info().pgid.pgid, peer.shard),
- soid, acks_wanted,
- get_osdmap()->get_epoch(),
- tid, at_version);
-
- // ship resulting transaction, log entries, and pg_stats
- if (!parent->should_send_op(peer, soid)) {
- dout(10) << "issue_repop shipping empty opt to osd." << peer
- <<", object " << soid
- << " beyond MAX(last_backfill_started "
- << ", pinfo.last_backfill "
- << pinfo.last_backfill << ")" << dendl;
- ObjectStore::Transaction t;
- t.set_use_tbl(op_t->get_use_tbl());
- ::encode(t, wr->get_data());
- } else {
- ::encode(*op_t, wr->get_data());
- }
-
- ::encode(log_entries, wr->logbl);
-
- if (pinfo.is_incomplete())
- wr->pg_stats = pinfo.stats; // reflects backfill progress
- else
- wr->pg_stats = get_info().stats;
-
- wr->pg_trim_to = pg_trim_to;
- wr->pg_trim_rollback_to = pg_trim_rollback_to;
-
- wr->new_temp_oid = new_temp_oid;
- wr->discard_temp_oid = discard_temp_oid;
- wr->updated_hit_set_history = hset_hist;
- return wr;
-}
-
-void ReplicatedBackend::issue_op(
- const hobject_t &soid,
- const eversion_t &at_version,
- ceph_tid_t tid,
- osd_reqid_t reqid,
- eversion_t pg_trim_to,
- eversion_t pg_trim_rollback_to,
- hobject_t new_temp_oid,
- hobject_t discard_temp_oid,
- const vector<pg_log_entry_t> &log_entries,
- boost::optional<pg_hit_set_history_t> &hset_hist,
- InProgressOp *op,
- ObjectStore::Transaction *op_t)
-{
-
- if (parent->get_actingbackfill_shards().size() > 1) {
- ostringstream ss;
- set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
- replicas.erase(parent->whoami_shard());
- ss << "waiting for subops from " << replicas;
- if (op->op)
- op->op->mark_sub_op_sent(ss.str());
- }
- for (set<pg_shard_t>::const_iterator i =
- parent->get_actingbackfill_shards().begin();
- i != parent->get_actingbackfill_shards().end();
- ++i) {
- if (*i == parent->whoami_shard()) continue;
- pg_shard_t peer = *i;
- const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
-
- Message *wr;
- uint64_t min_features = parent->min_peer_features();
- if (!(min_features & CEPH_FEATURE_OSD_REPOP)) {
- dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl;
- wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>(
- soid,
- at_version,
- tid,
- reqid,
- pg_trim_to,
- pg_trim_rollback_to,
- new_temp_oid,
- discard_temp_oid,
- log_entries,
- hset_hist,
- op,
- op_t,
- peer,
- pinfo);
- } else {
- wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>(
- soid,
- at_version,
- tid,
- reqid,
- pg_trim_to,
- pg_trim_rollback_to,
- new_temp_oid,
- discard_temp_oid,
- log_entries,
- hset_hist,
- op,
- op_t,
- peer,
- pinfo);
- }
-
- get_parent()->send_message_osd_cluster(
- peer.osd, wr, get_osdmap()->get_epoch());
- }
-}
-
ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc,
ceph_tid_t rep_tid)
{
@@ -8439,341 +8136,6 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
}
}
-// sub op modify
-void ReplicatedBackend::sub_op_modify(OpRequestRef op) {
- Message *m = op->get_req();
- int msg_type = m->get_type();
- if (msg_type == MSG_OSD_SUBOP) {
- sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op);
- } else if (msg_type == MSG_OSD_REPOP) {
- sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op);
- } else {
- assert(0);
- }
-}
-
-template<typename T, int MSGTYPE>
-void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
-{
- T *m = static_cast<T *>(op->get_req());
- int msg_type = m->get_type();
- assert(MSGTYPE == msg_type);
- assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);
-
- const hobject_t& soid = m->poid;
-
- dout(10) << "sub_op_modify trans"
- << " " << soid
- << " v " << m->version
- << (m->logbl.length() ? " (transaction)" : " (parallel exec")
- << " " << m->logbl.length()
- << dendl;
-
- // sanity checks
- assert(m->map_epoch >= get_info().history.same_interval_since);
-
- // we better not be missing this.
- assert(!parent->get_log().get_missing().is_missing(soid));
-
- int ackerosd = m->get_source().num();
-
- op->mark_started();
-
- RepModifyRef rm(new RepModify);
- rm->op = op;
- rm->ackerosd = ackerosd;
- rm->last_complete = get_info().last_complete;
- rm->epoch_started = get_osdmap()->get_epoch();
-
- assert(m->logbl.length());
- // shipped transaction and log entries
- vector<pg_log_entry_t> log;
-
- bufferlist::iterator p = m->get_data().begin();
- ::decode(rm->opt, p);
- rm->localt.set_use_tbl(rm->opt.get_use_tbl());
-
- if (m->new_temp_oid != hobject_t()) {
- dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
- add_temp_obj(m->new_temp_oid);
- get_temp_coll(&rm->localt);
- }
- if (m->discard_temp_oid != hobject_t()) {
- dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
- if (rm->opt.empty()) {
- dout(10) << __func__ << ": removing object " << m->discard_temp_oid
- << " since we won't get the transaction" << dendl;
- rm->localt.remove(temp_coll, m->discard_temp_oid);
- }
- clear_temp_obj(m->discard_temp_oid);
- }
-
- p = m->logbl.begin();
- ::decode(log, p);
- rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
-
- bool update_snaps = false;
- if (!rm->opt.empty()) {
- // If the opt is non-empty, we infer we are before
- // last_backfill (according to the primary, not our
- // not-quite-accurate value), and should update the
- // collections now. Otherwise, we do it later on push.
- update_snaps = true;
- }
- parent->update_stats(m->pg_stats);
- parent->log_operation(
- log,
- m->updated_hit_set_history,
- m->pg_trim_to,
- m->pg_trim_rollback_to,
- update_snaps,
- &(rm->localt));
-
- rm->bytes_written = rm->opt.get_encoded_bytes();
-
- op->mark_started();
-
- rm->localt.append(rm->opt);
- rm->localt.register_on_commit(
- parent->bless_context(
- new C_OSD_RepModifyCommit(this, rm)));
- rm->localt.register_on_applied(
- parent->bless_context(
- new C_OSD_RepModifyApply(this, rm)));
- parent->queue_transaction(&(rm->localt), op);
- // op is cleaned up by oncommit/onapply when both are executed
-}
-
-void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
-{
- rm->op->mark_event("sub_op_applied");
- rm->applied = true;
-
- dout(10) << "sub_op_modify_applied on " << rm << " op "
- << *rm->op->get_req() << dendl;
- Message *m = rm->op->get_req();
-
- Message *ack = NULL;
- eversion_t version;
-
- if (m->get_type() == MSG_OSD_SUBOP) {
- // doesn't have CLIENT SUBOP feature ,use Subop
- MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
- version = req->version;
- if (!rm->committed)
- ack = new MOSDSubOpReply(
- req, parent->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- } else if (m->get_type() == MSG_OSD_REPOP) {
- MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
- version = req->version;
- if (!rm->committed)
- ack = new MOSDRepOpReply(
- static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- } else {
- assert(0);
- }
-
- // send ack to acker only if we haven't sent a commit already
- if (ack) {
- ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- get_parent()->send_message_osd_cluster(
- rm->ackerosd, ack, get_osdmap()->get_epoch());
- }
-
- parent->op_applied(version);
-}
-
-void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
-{
- rm->op->mark_commit_sent();
- rm->committed = true;
-
- // send commit.
- dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
- << ", sending commit to osd." << rm->ackerosd
- << dendl;
-
- assert(get_osdmap()->is_up(rm->ackerosd));
- get_parent()->update_last_complete_ondisk(rm->last_complete);
-
- Message *m = rm->op->get_req();
- Message *commit;
- if (m->get_type() == MSG_OSD_SUBOP) {
- // doesn't have CLIENT SUBOP feature ,use Subop
- MOSDSubOpReply *reply = new MOSDSubOpReply(
- static_cast<MOSDSubOp*>(m),
- get_parent()->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
- reply->set_last_complete_ondisk(rm->last_complete);
- commit = reply;
- } else if (m->get_type() == MSG_OSD_REPOP) {
- MOSDRepOpReply *reply = new MOSDRepOpReply(
- static_cast<MOSDRepOp*>(m),
- get_parent()->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
- reply->set_last_complete_ondisk(rm->last_complete);
- commit = reply;
- }
- else {
- assert(0);
- }
-
- commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
- get_parent()->send_message_osd_cluster(
- rm->ackerosd, commit, get_osdmap()->get_epoch());
-
- log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
-}
-
-
-// ===========================================================
-
-void ReplicatedBackend::calc_head_subsets(
- ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
- const pg_missing_t& missing,
- const hobject_t &last_backfill,
- interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets)
-{
- dout(10) << "calc_head_subsets " << head
- << " clone_overlap " << snapset.clone_overlap << dendl;
-
- uint64_t size = obc->obs.oi.size;
- if (size)
- data_subset.insert(0, size);
-
- if (get_parent()->get_pool().allow_incomplete_clones()) {
- dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
- return;
- }
-
- if (!cct->_conf->osd_recover_clone_overlap) {
- dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
- return;
- }
-
-
- interval_set<uint64_t> cloning;
- interval_set<uint64_t> prev;
- if (size)
- prev.insert(0, size);
-
- for (int j=snapset.clones.size()-1; j>=0; j--) {
- hobject_t c = head;
- c.snap = snapset.clones[j];
- prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
- if (!missing.is_missing(c) && c < last_backfill) {
- dout(10) << "calc_head_subsets " << head << " has prev " << c
- << " overlap " << prev << dendl;
- clone_subsets[c] = prev;
- cloning.union_of(prev);
- break;
- }
- dout(10) << "calc_head_subsets " << head << " does not have prev " << c
- << " overlap " << prev << dendl;
- }
-
-
- if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
- dout(10) << "skipping clone, too many holes" << dendl;
- clone_subsets.clear();
- cloning.clear();
- }
-
- // what's left for us to push?
- data_subset.subtract(cloning);
-
- dout(10) << "calc_head_subsets " << head
- << " data_subset " << data_subset
- << " clone_subsets " << clone_subsets << dendl;
-}
-
-void ReplicatedBackend::calc_clone_subsets(
- SnapSet& snapset, const hobject_t& soid,
- const pg_missing_t& missing,
- const hobject_t &last_backfill,
- interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets)
-{
- dout(10) << "calc_clone_subsets " << soid
- << " clone_overlap " << snapset.clone_overlap << dendl;
-
- uint64_t size = snapset.clone_size[soid.snap];
- if (size)
- data_subset.insert(0, size);
-
- if (get_parent()->get_pool().allow_incomplete_clones()) {
- dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
- return;
- }
-
- if (!cct->_conf->osd_recover_clone_overlap) {
- dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
- return;
- }
-
- unsigned i;
- for (i=0; i < snapset.clones.size(); i++)
- if (snapset.clones[i] == soid.snap)
- break;
-
- // any overlap with next older clone?
- interval_set<uint64_t> cloning;
- interval_set<uint64_t> prev;
- if (size)
- prev.insert(0, size);
- for (int j=i-1; j>=0; j--) {
- hobject_t c = soid;
- c.snap = snapset.clones[j];
- prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
- if (!missing.is_missing(c) && c < last_backfill) {
- dout(10) << "calc_clone_subsets " << soid << " has prev " << c
- << " overlap " << prev << dendl;
- clone_subsets[c] = prev;
- cloning.union_of(prev);
- break;
- }
- dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
- << " overlap " << prev << dendl;
- }
-
- // overlap with next newest?
- interval_set<uint64_t> next;
- if (size)
- next.insert(0, size);
- for (unsigned j=i+1; j<snapset.clones.size(); j++) {
- hobject_t c = soid;
- c.snap = snapset.clones[j];
- next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
- if (!missing.is_missing(c) && c < last_backfill) {
- dout(10) << "calc_clone_subsets " << soid << " has next " << c
- << " overlap " << next << dendl;
- clone_subsets[c] = next;
- cloning.union_of(next);
- break;
- }
- dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
- << " overlap " << next << dendl;
- }
-
- if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
- dout(10) << "skipping clone, too many holes" << dendl;
- clone_subsets.clear();
- cloning.clear();
- }
-
-
- // what's left for us to push?
- data_subset.subtract(cloning);
-
- dout(10) << "calc_clone_subsets " << soid
- << " data_subset " << data_subset
- << " clone_subsets " << clone_subsets << dendl;
-}
-
-
/** pull - request object from a peer
*/
@@ -8785,98 +8147,6 @@ void ReplicatedBackend::calc_clone_subsets(
*/
enum { PULL_NONE, PULL_OTHER, PULL_YES };
-void ReplicatedBackend::prepare_pull(
- eversion_t v,
- const hobject_t& soid,
- ObjectContextRef headctx,
- RPGHandle *h)
-{
- assert(get_parent()->get_local_missing().missing.count(soid));
- eversion_t _v = get_parent()->get_local_missing().missing.find(
- soid)->second.need;
- assert(_v == v);
- const map<hobject_t, set<pg_shard_t> > &missing_loc(
- get_parent()->get_missing_loc_shards());
- const map<pg_shard_t, pg_missing_t > &peer_missing(
- get_parent()->get_shard_missing());
- map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid);
- assert(q != missing_loc.end());
- assert(!q->second.empty());
-
- // pick a pullee
- vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
- random_shuffle(shuffle.begin(), shuffle.end());
- vector<pg_shard_t>::iterator p = shuffle.begin();
- assert(get_osdmap()->is_up(p->osd));
- pg_shard_t fromshard = *p;
-
- dout(7) << "pull " << soid
- << " v " << v
- << " on osds " << *p
- << " from osd." << fromshard
- << dendl;
-
- assert(peer_missing.count(fromshard));
- const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
- if (pmissing.is_missing(soid, v)) {
- assert(pmissing.missing.find(soid)->second.have != v);
- dout(10) << "pulling soid " << soid << " from osd " << fromshard
- << " at version " << pmissing.missing.find(soid)->second.have
- << " rather than at version " << v << dendl;
- v = pmissing.missing.find(soid)->second.have;
- assert(get_parent()->get_log().get_log().objects.count(soid) &&
- (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
- pg_log_entry_t::LOST_REVERT) &&
- (get_parent()->get_log().get_log().objects.find(
- soid)->second->reverting_to ==
- v));
- }
-
- ObjectRecoveryInfo recovery_info;
-
- if (soid.is_snap()) {
- assert(!get_parent()->get_local_missing().is_missing(
- soid.get_head()) ||
- !get_parent()->get_local_missing().is_missing(
- soid.get_snapdir()));
- assert(headctx);
- // check snapset
- SnapSetContext *ssc = headctx->ssc;
- assert(ssc);
- dout(10) << " snapset " << ssc->snapset << dendl;
- calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
- get_info().last_backfill,
- recovery_info.copy_subset,
- recovery_info.clone_subset);
- // FIXME: this may overestimate if we are pulling multiple clones in parallel...
- dout(10) << " pulling " << recovery_info << dendl;
- } else {
- // pulling head or unversioned object.
- // always pull the whole thing.
- recovery_info.copy_subset.insert(0, (uint64_t)-1);
- recovery_info.size = ((uint64_t)-1);
- }
-
- h->pulls[fromshard].push_back(PullOp());
- PullOp &op = h->pulls[fromshard].back();
- op.soid = soid;
-
- op.recovery_info = recovery_info;
- op.recovery_info.soid = soid;
- op.recovery_info.version = v;
- op.recovery_progress.data_complete = false;
- op.recovery_progress.omap_complete = false;
- op.recovery_progress.data_recovered_to = 0;
- op.recovery_progress.first = true;
-
- assert(!pulling.count(soid));
- pull_from_peer[fromshard].insert(soid);
- PullInfo &pi = pulling[soid];
- pi.head_ctx = headctx;
- pi.recovery_info = op.recovery_info;
- pi.recovery_progress = op.recovery_progress;
-}
-
int ReplicatedPG::recover_missing(
const hobject_t &soid, eversion_t v,
int priority,
@@ -8966,694 +8236,6 @@ void ReplicatedPG::send_remove_op(
osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
}
-/*
- * intelligently push an object to a replica. make use of existing
- * clones/heads and dup data ranges where possible.
- */
-void ReplicatedBackend::prep_push_to_replica(
- ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
- PushOp *pop)
-{
- const object_info_t& oi = obc->obs.oi;
- uint64_t size = obc->obs.oi.size;
-
- dout(10) << __func__ << ": " << soid << " v" << oi.version
- << " size " << size << " to osd." << peer << dendl;
-
- map<hobject_t, interval_set<uint64_t> > clone_subsets;
- interval_set<uint64_t> data_subset;
-
- // are we doing a clone on the replica?
- if (soid.snap && soid.snap < CEPH_NOSNAP) {
- hobject_t head = soid;
- head.snap = CEPH_NOSNAP;
-
- // try to base push off of clones that succeed/preceed poid
- // we need the head (and current SnapSet) locally to do that.
- if (get_parent()->get_local_missing().is_missing(head)) {
- dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
- return prep_push(obc, soid, peer, pop);
- }
- hobject_t snapdir = head;
- snapdir.snap = CEPH_SNAPDIR;
- if (get_parent()->get_local_missing().is_missing(snapdir)) {
- dout(15) << "push_to_replica missing snapdir " << snapdir
- << ", pushing raw clone" << dendl;
- return prep_push(obc, soid, peer, pop);
- }
-
- SnapSetContext *ssc = obc->ssc;
- assert(ssc);
- dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
- map<pg_shard_t, pg_missing_t>::const_iterator pm =
- get_parent()->get_shard_missing().find(peer);
- assert(pm != get_parent()->get_shard_missing().end());
- map<pg_shard_t, pg_info_t>::const_iterator pi =
- get_parent()->get_shard_info().find(peer);
- assert(pi != get_parent()->get_shard_info().end());
- calc_clone_subsets(ssc->snapset, soid,
- pm->second,
- pi->second.last_backfill,
- data_subset, clone_subsets);
- } else if (soid.snap == CEPH_NOSNAP) {
- // pushing head or unversioned object.
- // base this on partially on replica's clones?
- SnapSetContext *ssc = obc->ssc;
- assert(ssc);
- dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
- calc_head_subsets(
- obc,
- ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
- get_parent()->get_shard_info().find(peer)->second.last_backfill,
- data_subset, clone_subsets);
- }
-
- prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
-}
-
-void ReplicatedBackend::prep_push(ObjectContextRef obc,
- const hobject_t& soid, pg_shard_t peer,
- PushOp *pop)
-{
- interval_set<uint64_t> data_subset;
- if (obc->obs.oi.size)
- data_subset.insert(0, obc->obs.oi.size);
- map<hobject_t, interval_set<uint64_t> > clone_subsets;
-
- prep_push(obc, soid, peer,
- obc->obs.oi.version, data_subset, clone_subsets,
- pop);
-}
-
-void ReplicatedBackend::prep_push(
- ObjectContextRef obc,
- const hobject_t& soid, pg_shard_t peer,
- eversion_t version,
- interval_set<uint64_t> &data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets,
- PushOp *pop)
-{
- get_parent()->begin_peer_recover(peer, soid);
- // take note.
- PushInfo &pi = pushing[soid][peer];
- pi.obc = obc;
- pi.recovery_info.size = obc->obs.oi.size;
- pi.recovery_info.copy_subset = data_subset;
- pi.recovery_info.clone_subset = clone_subsets;
- pi.recovery_info.soid = soid;
- pi.recovery_info.oi = obc->obs.oi;
- pi.recovery_info.version = version;
- pi.recovery_progress.first = true;
- pi.recovery_progress.data_recovered_to = 0;
- pi.recovery_progress.data_complete = 0;
- pi.recovery_progress.omap_complete = 0;
-
- ObjectRecoveryProgress new_progress;
- int r = build_push_op(pi.recovery_info,
- pi.recovery_progress,
- &new_progress,
- pop,
- &(pi.stat));
- assert(r == 0);
- pi.recovery_progress = new_progress;
-}
-
-int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer,
- const ObjectRecoveryInfo &recovery_info,
- ObjectRecoveryProgress progress)
-{
- // send op
- ceph_tid_t tid = get_parent()->get_tid();
- osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
-
- dout(10) << "send_pull_op " << recovery_info.soid << " "
- << recovery_info.version
- << " first=" << progress.first
- << " data " << recovery_info.copy_subset
- << " from osd." << peer
- << " tid " << tid << dendl;
-
- MOSDSubOp *subop = new MOSDSubOp(
- rid, parent->whoami_shard(),
- get_info().pgid, recovery_info.soid,
- CEPH_OSD_FLAG_ACK,
- get_osdmap()->get_epoch(), tid,
- recovery_info.version);
- subop->set_priority(prio);
- subop->ops = vector<OSDOp>(1);
- subop->ops[0].op.op = CEPH_OSD_OP_PULL;
- subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk;
- subop->recovery_info = recovery_info;
- subop->recovery_progress = progress;
-
- get_parent()->send_message_osd_cluster(
- peer.osd, subop, get_osdmap()->get_epoch());
-
- get_parent()->get_logger()->inc(l_osd_pull);
- return 0;
-}
-
-void ReplicatedBackend::submit_push_data(
- ObjectRecoveryInfo &recovery_info,
- bool first,
- bool complete,
- const interval_set<uint64_t> &intervals_included,
- bufferlist data_included,
- bufferlist omap_header,
- map<string, bufferlist> &attrs,
- map<string, bufferlist> &omap_entries,
- ObjectStore::Transaction *t)
-{
- coll_t target_coll;
- if (first && complete) {
- target_coll = coll;
- } else {
- dout(10) << __func__ << ": Creating oid "
- << recovery_info.soid << " in the temp collection" << dendl;
- add_temp_obj(recovery_info.soid);
- target_coll = get_temp_coll(t);
- }
-
- if (first) {
- get_parent()->on_local_recover_start(recovery_info.soid, t);
- t->remove(get_temp_coll(t), recovery_info.soid);
- t->touch(target_coll, recovery_info.soid);
- t->truncate(target_coll, recovery_info.soid, recovery_info.size);
- t->omap_setheader(target_coll, recovery_info.soid, omap_header);
- }
- uint64_t off = 0;
- for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
- p != intervals_included.end();
- ++p) {
- bufferlist bit;
- bit.substr_of(data_included, off, p.get_len());
- t->write(target_coll, recovery_info.soid,
- p.get_start(), p.get_len(), bit);
- off += p.get_len();
- }
-
- t->omap_setkeys(target_coll, recovery_info.soid,
- omap_entries);
- t->setattrs(target_coll, recovery_info.soid,
- attrs);
-
- if (complete) {
- if (!first) {
- dout(10) << __func__ << ": Removing oid "
- << recovery_info.soid << " from the temp collection" << dendl;
- clear_temp_obj(recovery_info.soid);
- t->collection_move(coll, target_coll, recovery_info.soid);
- }
-
- submit_push_complete(recovery_info, t);
- }
-}
-
-void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
- ObjectStore::Transaction *t)
-{
- for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
- recovery_info.clone_subset.begin();
- p != recovery_info.clone_subset.end();
- ++p) {
- for (interval_set<uint64_t>::const_iterator q = p->second.begin();
- q != p->second.end();
- ++q) {
- dout(15) << " clone_range " << p->first << " "
- << q.get_start() << "~" << q.get_len() << dendl;
- t->clone_range(coll, p->first, recovery_info.soid,
- q.get_start(), q.get_len(), q.get_start());
- }
- }
-}
-
-ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
- const ObjectRecoveryInfo& recovery_info,
- SnapSetContext *ssc)
-{
- if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
- return recovery_info;
- ObjectRecoveryInfo new_info = recovery_info;
- new_info.copy_subset.clear();
- new_info.clone_subset.clear();
- assert(ssc);
- calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
- get_info().last_backfill,
- new_info.copy_subset, new_info.clone_subset);
- return new_info;
-}
-
-bool ReplicatedBackend::handle_pull_response(
- pg_shard_t from, PushOp &pop, PullOp *response,
- list<hobject_t> *to_continue,
- ObjectStore::Transaction *t
- )
-{
- interval_set<uint64_t> data_included = pop.data_included;
- bufferlist data;
- data.claim(pop.data);
- dout(10) << "handle_pull_response "
- << pop.recovery_info
- << pop.after_progress
- << " data.size() is " << data.length()
- << " data_included: " << data_included
- << dendl;
- if (pop.version == eversion_t()) {
- // replica doesn't have it!
- _failed_push(from, pop.soid);
- return false;
- }
-
- hobject_t &hoid = pop.soid;
- assert((data_included.empty() && data.length() == 0) ||
- (!data_included.empty() && data.length() > 0));
-
- if (!pulling.count(hoid)) {
- return false;
- }
-
- PullInfo &pi = pulling[hoid];
- if (pi.recovery_info.size == (uint64_t(-1))) {
- pi.recovery_info.size = pop.recovery_info.size;
- pi.recovery_info.copy_subset.intersection_of(
- pop.recovery_info.copy_subset);
- }
-
- bool first = pi.recovery_progress.first;
- if (first) {
- pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
- pi.recovery_info.oi = pi.obc->obs.oi;
- pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
- }
-
-
- interval_set<uint64_t> usable_intervals;
- bufferlist usable_data;
- trim_pushed_data(pi.recovery_info.copy_subset,
- data_included,
- data,
- &usable_intervals,
- &usable_data);
- data_included = usable_intervals;
- data.claim(usable_data);
-
-
- pi.recovery_progress = pop.after_progress;
-
- pi.stat.num_bytes_recovered += data.length();
-
- dout(10) << "new recovery_info " << pi.recovery_info
- << ", new progress " << pi.recovery_progress
- << dendl;
-
- bool complete = pi.is_complete();
-
- submit_push_data(pi.recovery_info, first,
- complete,
- data_included, data,
- pop.omap_header,
- pop.attrset,
- pop.omap_entries,
- t);
-
- pi.stat.num_keys_recovered += pop.omap_entries.size();
-
- if (complete) {
- to_continue->push_back(hoid);
- pi.stat.num_objects_recovered++;
- get_parent()->on_local_recover(
- hoid, pi.stat, pi.recovery_info, pi.obc, t);
- pull_from_peer[from].erase(hoid);
- if (pull_from_peer[from].empty())
- pull_from_peer.erase(from);
- return false;
- } else {
- response->soid = pop.soid;
- response->recovery_info = pi.recovery_info;
- response->recovery_progress = pi.recovery_progress;
- return true;
- }
-}
-
-void ReplicatedBackend::handle_push(
- pg_shard_t from, PushOp &pop, PushReplyOp *response,
- ObjectStore::Transaction *t)
-{
- dout(10) << "handle_push "
- << pop.recovery_info
- << pop.after_progress
- << dendl;
- bufferlist data;
- data.claim(pop.data);
- bool first = pop.before_progress.first;
- bool complete = pop.after_progress.data_complete &&
- pop.after_progress.omap_complete;
-
- response->soid = pop.recovery_info.soid;
- submit_push_data(pop.recovery_info,
- first,
- complete,
- pop.data_included,
- data,
- pop.omap_header,
- pop.attrset,
- pop.omap_entries,
- t);
-
- if (complete)
- get_parent()->on_local_recover(
- pop.recovery_info.soid,
- object_stat_sum_t(),
- pop.recovery_info,
- ObjectContextRef(), // ok, is replica
- t);
-}
-
-void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
-{
- for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
- i != pushes.end();
- ++i) {
- ConnectionRef con = get_parent()->get_con_osd_cluster(
- i->first.osd,
- get_osdmap()->get_epoch());
- if (!con)
- continue;
- if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
- for (vector<PushOp>::iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- dout(20) << __func__ << ": sending push (legacy) " << *j
- << " to osd." << i->first << dendl;
- send_push_op_legacy(prio, i->first, *j);
- }
- } else {
- vector<PushOp>::iterator j = i->second.begin();
- while (j != i->second.end()) {
- uint64_t cost = 0;
- uint64_t pushes = 0;
- MOSDPGPush *msg = new MOSDPGPush();
- msg->from = get_parent()->whoami_shard();
- msg->pgid = get_parent()->primary_spg_t();
- msg->map_epoch = get_osdmap()->get_epoch();
- msg->set_priority(prio);
- for (;
- (j != i->second.end() &&
- cost < cct->_conf->osd_max_push_cost &&
- pushes < cct->_conf->osd_max_push_objects) ;
- ++j) {
- dout(20) << __func__ << ": sending push " << *j
- << " to osd." << i->first << dendl;
- cost += j->cost(cct);
- pushes += 1;
- msg->pushes.push_back(*j);
- }
- msg->compute_cost(cct);
- get_parent()->send_message_osd_cluster(msg, con);
- }
- }
- }
-}
-
-void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
-{
- for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
- i != pulls.end();
- ++i) {
- ConnectionRef con = get_parent()->get_con_osd_cluster(
- i->first.osd,
- get_osdmap()->get_epoch());
- if (!con)
- continue;
- if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
- for (vector<PullOp>::iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- dout(20) << __func__ << ": sending pull (legacy) " << *j
- << " to osd." << i->first << dendl;
- send_pull_legacy(
- prio,
- i->first,
- j->recovery_info,
- j->recovery_progress);
- }
- } else {
- dout(20) << __func__ << ": sending pulls " << i->second
- << " to osd." << i->first << dendl;
- MOSDPGPull *msg = new MOSDPGPull();
- msg->from = parent->whoami_shard();
- msg->set_priority(prio);
- msg->pgid = get_parent()->primary_spg_t();
- msg->map_epoch = get_osdmap()->get_epoch();
- msg->pulls.swap(i->second);
- msg->compute_cost(cct);
- get_parent()->send_message_osd_cluster(msg, con);
- }
- }
-}
-
-int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
- const ObjectRecoveryProgress &progress,
- ObjectRecoveryProgress *out_progress,
- PushOp *out_op,
- object_stat_sum_t *stat)
-{
- ObjectRecoveryProgress _new_progress;
- if (!out_progress)
- out_progress = &_new_progress;
- ObjectRecoveryProgress &new_progress = *out_progress;
- new_progress = progress;
-
- dout(7) << "send_push_op " << recovery_info.soid
- << " v " << recovery_info.version
- << " size " << recovery_info.size
- << " recovery_info: " << recovery_info
- << dendl;
-
- if (progress.first) {
- store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
- store->getattrs(coll, recovery_info.soid, out_op->attrset);
-
- // Debug
- bufferlist bv = out_op->attrset[OI_ATTR];
- object_info_t oi(bv);
-
- if (oi.version != recovery_info.version) {
- get_parent()->clog_error() << get_info().pgid << " push "
- << recovery_info.soid << " v "
- << recovery_info.version
- << " failed because local copy is "
- << oi.version << "\n";
- return -EINVAL;
- }
-
- new_progress.first = false;
- }
-
- uint64_t available = cct->_conf->osd_recovery_max_chunk;
- if (!progress.omap_complete) {
- ObjectMap::ObjectMapIterator iter =
- store->get_omap_iterator(coll,
- recovery_info.soid);
- for (iter->lower_bound(progress.omap_recovered_to);
- iter->valid();
- iter->next()) {
- if (!out_op->omap_entries.empty() &&
- available <= (iter->key().size() + iter->value().length()))
- break;
- out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
-
- if ((iter->key().size() + iter->value().length()) <= available)
- available -= (iter->key().size() + iter->value().length());
- else
- available = 0;
- }
- if (!iter->valid())
- new_progress.omap_complete = true;
- else
- new_progress.omap_recovered_to = iter->key();
- }
-
- if (available > 0) {
- if (!recovery_info.copy_subset.empty()) {
- interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
- bufferlist bl;
- int r = store->fiemap(coll, recovery_info.soid, 0,
- copy_subset.range_end(), bl);
- if (r >= 0) {
- interval_set<uint64_t> fiemap_included;
- map<uint64_t, uint64_t> m;
- bufferlist::iterator iter = bl.begin();
- ::decode(m, iter);
- map<uint64_t, uint64_t>::iterator miter;
- for (miter = m.begin(); miter != m.end(); ++miter) {
- fiemap_included.insert(miter->first, miter->second);
- }
-
- copy_subset.intersection_of(fiemap_included);
- }
- out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
- available);
- if (out_op->data_included.empty()) // zero filled section, skip to end!
- new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
- else
- new_progress.data_recovered_to = out_op->data_included.range_end();
- }
- } else {
- out_op->data_included.clear();
- }
-
- for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
- p != out_op->data_included.end();
- ++p) {
- bufferlist bit;
- store->read(coll, recovery_info.soid,
- p.get_start(), p.get_len(), bit);
- if (p.get_len() != bit.length()) {
- dout(10) << " extent " << p.get_start() << "~" << p.get_len()
- << " is actually " << p.get_start() << "~" << bit.length()
- << dendl;
- interval_set<uint64_t>::iterator save = p++;
- if (bit.length() == 0)
- out_op->data_included.erase(save); //Remove this empty interval
- else
- save.set_len(bit.length());
- // Remove any other intervals present
- while (p != out_op->data_included.end()) {
- interval_set<uint64_t>::iterator save = p++;
- out_op->data_included.erase(save);
- }
- new_progress.data_complete = true;
- out_op->data.claim_append(bit);
- break;
- }
- out_op->data.claim_append(bit);
- }
-
- if (new_progress.is_complete(recovery_info)) {
- new_progress.data_complete = true;
- if (stat)
- stat->num_objects_recovered++;
- }
-
- if (stat) {
- stat->num_keys_recovered += out_op->omap_entries.size();
- stat->num_bytes_recovered += out_op->data.length();
- }
-
- get_parent()->get_logger()->inc(l_osd_push);
- get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
-
- // send
- out_op->version = recovery_info.version;
- out_op->soid = recovery_info.soid;
- out_op->recovery_info = recovery_info;
- out_op->after_progress = new_progress;
- out_op->before_progress = progress;
- return 0;
-}
-
-int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop)
-{
- ceph_tid_t tid = get_parent()->get_tid();
- osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
- MOSDSubOp *subop = new MOSDSubOp(
- rid, parent->whoami_shard(),
- spg_t(get_info().pgid.pgid, peer.shard), pop.soid,
- 0, get_osdmap()->get_epoch(),
- tid, pop.recovery_info.version);
- subop->ops = vector<OSDOp>(1);
- subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-
- subop->set_priority(prio);
- subop->version = pop.version;
- subop->ops[0].indata.claim(pop.data);
- subop->data_included.swap(pop.data_included);
- subop->omap_header.claim(pop.omap_header);
- subop->omap_entries.swap(pop.omap_entries);
- subop->attrset.swap(pop.attrset);
- subop->recovery_info = pop.recovery_info;
- subop->current_progress = pop.before_progress;
- subop->recovery_progress = pop.after_progress;
-
- get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
- return 0;
-}
-
-void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
-{
- op->recovery_info.version = eversion_t();
- op->version = eversion_t();
- op->soid = soid;
-}
-
-void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
-{
- MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
- const hobject_t& soid = reply->get_poid();
- assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
- dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
- pg_shard_t peer = reply->from;
-
- op->mark_started();
-
- PushReplyOp rop;
- rop.soid = soid;
- PushOp pop;
- bool more = handle_push_reply(peer, rop, &pop);
- if (more)
- send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
-}
-
-bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply)
-{
- const hobject_t &soid = op.soid;
- if (pushing.count(soid) == 0) {
- dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
- << ", or anybody else"
- << dendl;
- return false;
- } else if (pushing[soid].count(peer) == 0) {
- dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
- << dendl;
- return false;
- } else {
- PushInfo *pi = &pushing[soid][peer];
-
- if (!pi->recovery_progress.data_complete) {
- dout(10) << " pushing more from, "
- << pi->recovery_progress.data_recovered_to
- << " of " << pi->recovery_info.copy_subset << dendl;
- ObjectRecoveryProgress new_progress;
- int r = build_push_op(
- pi->recovery_info,
- pi->recovery_progress, &new_progress, reply,
- &(pi->stat));
- assert(r == 0);
- pi->recovery_progress = new_progress;
- return true;
- } else {
- // done!
- get_parent()->on_peer_recover(
- peer, soid, pi->recovery_info,
- pi->stat);
-
- pushing[soid].erase(peer);
- pi = NULL;
-
-
- if (pushing[soid].empty()) {
- get_parent()->on_global_recover(soid);
- pushing.erase(soid);
- } else {
- dout(10) << "pushed " << soid << ", still waiting for push ack from "
- << pushing[soid].size() << " others" << dendl;
- }
- return false;
- }
- }
-}
-
void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
{
dout(10) << "finish_degraded_object " << oid << dendl;
@@ -9678,69 +8260,6 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
}
}
-/** op_pull
- * process request to pull an entire object.
- * NOTE: called from opqueue.
- */
-void ReplicatedBackend::sub_op_pull(OpRequestRef op)
-{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
- assert(m->get_type() == MSG_OSD_SUBOP);
-
- op->mark_started();
-
- const hobject_t soid = m->poid;
-
- dout(7) << "pull" << soid << " v " << m->version
- << " from " << m->get_source()
- << dendl;
-
- assert(!is_primary()); // we should be a replica or stray.
-
- PullOp pop;
- pop.soid = soid;
- pop.recovery_info = m->recovery_info;
- pop.recovery_progress = m->recovery_progress;
-
- PushOp reply;
- handle_pull(m->from, pop, &reply);
- send_push_op_legacy(
- m->get_priority(),
- m->from,
- reply);
-
- log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull);
-}
-
-void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
-{
- const hobject_t &soid = op.soid;
- struct stat st;
- int r = store->stat(coll, soid, &st);
- if (r != 0) {
- get_parent()->clog_error() << get_info().pgid << " "
- << peer << " tried to pull " << soid
- << " but got " << cpp_strerror(-r) << "\n";
- prep_push_op_blank(soid, reply);
- } else {
- ObjectRecoveryInfo &recovery_info = op.recovery_info;
- ObjectRecoveryProgress &progress = op.recovery_progress;
- if (progress.first && recovery_info.size == ((uint64_t)-1)) {
- // Adjust size and copy_subset
- recovery_info.size = st.st_size;
- recovery_info.copy_subset.clear();
- if (st.st_size)
- recovery_info.copy_subset.insert(0, st.st_size);
- assert(recovery_info.clone_subset.empty());
- }
-
- r = build_push_op(recovery_info, progress, 0, reply);
- if (r < 0)
- prep_push_op_blank(soid, reply);
- }
-}
-
-
void ReplicatedPG::_committed_pushed_object(
epoch_t epoch, eversion_t last_complete)
{
@@ -9827,113 +8346,6 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
}
-/**
- * trim received data to remove what we don't want
- *
- * @param copy_subset intervals we want
- * @param data_included intervals we got
- * @param data_recieved data we got
- * @param intervals_usable intervals we want to keep
- * @param data_usable matching data we want to keep
- */
-void ReplicatedBackend::trim_pushed_data(
- const interval_set<uint64_t> &copy_subset,
- const interval_set<uint64_t> &intervals_received,
- bufferlist data_received,
- interval_set<uint64_t> *intervals_usable,
- bufferlist *data_usable)
-{
- if (intervals_received.subset_of(copy_subset)) {
- *intervals_usable = intervals_received;
- *data_usable = data_received;
- return;
- }
-
- intervals_usable->intersection_of(copy_subset,
- intervals_received);
-
- uint64_t off = 0;
- for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
- p != intervals_received.end();
- ++p) {
- interval_set<uint64_t> x;
- x.insert(p.get_start(), p.get_len());
- x.intersection_of(copy_subset);
- for (interval_set<uint64_t>::const_iterator q = x.begin();
- q != x.end();
- ++q) {
- bufferlist sub;
- uint64_t data_off = off + (q.get_start() - p.get_start());
- sub.substr_of(data_received, data_off, q.get_len());
- data_usable->claim_append(sub);
- }
- off += p.get_len();
- }
-}
-
-/** op_push
- * NOTE: called from opqueue.
- */
-void ReplicatedBackend::sub_op_push(OpRequestRef op)
-{
- op->mark_started();
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
-
- PushOp pop;
- pop.soid = m->recovery_info.soid;
- pop.version = m->version;
- m->claim_data(pop.data);
- pop.data_included.swap(m->data_included);
- pop.omap_header.swap(m->omap_header);
- pop.omap_entries.swap(m->omap_entries);
- pop.attrset.swap(m->attrset);
- pop.recovery_info = m->recovery_info;
- pop.before_progress = m->current_progress;
- pop.after_progress = m->recovery_progress;
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
-
- if (is_primary()) {
- PullOp resp;
- RPGHandle *h = _open_recovery_op();
- list<hobject_t> to_continue;
- bool more = handle_pull_response(
- m->from, pop, &resp,
- &to_continue, t);
- if (more) {
- send_pull_legacy(
- m->get_priority(),
- m->from,
- resp.recovery_info,
- resp.recovery_progress);
- } else {
- C_ReplicatedBackend_OnPullComplete *c =
- new C_ReplicatedBackend_OnPullComplete(
- this,
- op->get_req()->get_priority());
- c->to_continue.swap(to_continue);
- t->register_on_complete(
- new PG_RecoveryQueueAsync(
- get_parent(),
- get_parent()->bless_gencontext(c)));
- }
- run_recovery_op(h, op->get_req()->get_priority());
- } else {
- PushReplyOp resp;
- MOSDSubOpReply *reply = new MOSDSubOpReply(
- m, parent->whoami_shard(), 0,
- get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- reply->set_priority(m->get_priority());
- assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
- handle_push(m->from, pop, &resp, t);
- t->register_on_complete(new PG_SendMessageOnConn(
- get_parent(), reply, m->get_connection()));
- }
- t->register_on_applied(
- new ObjectStore::C_DeleteTransaction(t));
- get_parent()->queue_transaction(t);
- return;
-}
-
void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid)
{
assert(recovering.count(soid));
@@ -9945,15 +8357,6 @@ void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid)
finish_recovery_op(soid); // close out this attempt,
}
-void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
-{
- get_parent()->failed_push(from, soid);
- pull_from_peer[from].erase(soid);
- if (pull_from_peer[from].empty())
- pull_from_peer.erase(from);
- pulling.erase(soid);
-}
-
void ReplicatedPG::sub_op_remove(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
@@ -9968,7 +8371,6 @@ void ReplicatedPG::sub_op_remove(OpRequestRef op)
assert(r == 0);
}
-
eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
{
eversion_t v;
@@ -10966,34 +9368,6 @@ int ReplicatedPG::prep_object_replica_pushes(
return 1;
}
-int ReplicatedBackend::start_pushes(
- const hobject_t &soid,
- ObjectContextRef obc,
- RPGHandle *h)
-{
- int pushes = 0;
- // who needs it?
- assert(get_parent()->get_actingbackfill_shards().size() > 0);
- for (set<pg_shard_t>::iterator i =
- get_parent()->get_actingbackfill_shards().begin();
- i != get_parent()->get_actingbackfill_shards().end();
- ++i) {
- if (*i == get_parent()->whoami_shard()) continue;
- pg_shard_t peer = *i;
- map<pg_shard_t, pg_missing_t>::const_iterator j =
- get_parent()->get_shard_missing().find(peer);
- assert(j != get_parent()->get_shard_missing().end());
- if (j->second.is_missing(soid)) {
- ++pushes;
- h->pushes[peer].push_back(PushOp());
- prep_push_to_replica(obc, soid, peer,
- &(h->pushes[peer].back())
- );
- }
- }
- return pushes;
-}
-
int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
{
dout(10) << __func__ << "(" << max << ")" << dendl;