Diffstat (limited to 'src/osd/ReplicatedBackend.cc')
-rw-r--r--	src/osd/ReplicatedBackend.cc	| 1623
1 file changed, 1623 insertions, 0 deletions
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index 680c27a5e8b..b86d4d1e744 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -30,6 +30,35 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
return *_dout << pgb->get_parent()->gen_dbg_prefix();
}
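+// Account a completed subop in the perf counters: every subop bumps the
+// overall count and latency; writes and pushes also record inbound bytes,
+// pulls only their latency.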
+static void log_subop_stats(
+ PerfCounters *logger,
+ OpRequestRef op, int subop)
+{
+ utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t latency = now;
+ latency -= op->get_req()->get_recv_stamp();
+
+
+ logger->inc(l_osd_sop);
+ logger->tinc(l_osd_sop_lat, latency);
+ logger->inc(subop);
+
+ if (subop != l_osd_sop_pull) {
+ uint64_t inb = op->get_req()->get_data().length();
+ logger->inc(l_osd_sop_inb, inb);
+ if (subop == l_osd_sop_w) {
+ logger->inc(l_osd_sop_w_inb, inb);
+ logger->tinc(l_osd_sop_w_lat, latency);
+ } else if (subop == l_osd_sop_push) {
+ logger->inc(l_osd_sop_push_inb, inb);
+ logger->tinc(l_osd_sop_push_lat, latency);
+    } else
+      assert("unsupported subop" == 0);
+ } else {
+ logger->tinc(l_osd_sop_pull_lat, latency);
+ }
+}
+
ReplicatedBackend::ReplicatedBackend(
PGBackend::Listener *pg,
coll_t coll,
@@ -797,3 +826,1597 @@ void ReplicatedBackend::be_deep_scrub(
o.omap_digest = oh.digest();
o.omap_digest_present = true;
}
+
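+// Replica-side handler for MSG_OSD_PG_PUSH: apply each PushOp in a single
+// transaction and answer with a MOSDPGPushReply once that transaction
+// completes.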
+void ReplicatedBackend::_do_push(OpRequestRef op)
+{
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_PUSH);
+ pg_shard_t from = m->from;
+
+ vector<PushReplyOp> replies;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ for (vector<PushOp>::iterator i = m->pushes.begin();
+ i != m->pushes.end();
+ ++i) {
+ replies.push_back(PushReplyOp());
+ handle_push(from, *i, &(replies.back()), t);
+ }
+
+ MOSDPGPushReply *reply = new MOSDPGPushReply;
+ reply->from = get_parent()->whoami_shard();
+ reply->set_priority(m->get_priority());
+ reply->pgid = get_info().pgid;
+ reply->map_epoch = m->map_epoch;
+ reply->replies.swap(replies);
+ reply->compute_cost(cct);
+
+ t->register_on_complete(
+ new PG_SendMessageOnConn(
+ get_parent(), reply, m->get_connection()));
+
+ t->register_on_applied(
+ new ObjectStore::C_DeleteTransaction(t));
+ get_parent()->queue_transaction(t);
+}
+
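+// Completion run once pulled objects have been persisted locally: for each
+// finished object, start pushes to any other shards that still need it, or
+// mark it globally recovered if none do.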
+struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
+ ReplicatedBackend *bc;
+ list<hobject_t> to_continue;
+ int priority;
+ C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+ : bc(bc), priority(priority) {}
+
+ void finish(ThreadPool::TPHandle &handle) {
+ ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
+ for (list<hobject_t>::iterator i =
+ to_continue.begin();
+ i != to_continue.end();
+ ++i) {
+ map<hobject_t, ReplicatedBackend::PullInfo>::iterator j =
+ bc->pulling.find(*i);
+ assert(j != bc->pulling.end());
+ if (!bc->start_pushes(*i, j->second.obc, h)) {
+ bc->get_parent()->on_global_recover(
+ *i);
+ }
+ bc->pulling.erase(*i);
+ handle.reset_tp_timeout();
+ }
+ bc->run_recovery_op(h, priority);
+ }
+};
+
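+// Primary-side handler for MSG_OSD_PG_PUSH, i.e. responses to pulls we
+// issued: stage the received data, send follow-up PullOps for objects that
+// still need more, and queue the completion above for fully pulled objects.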
+void ReplicatedBackend::_do_pull_response(OpRequestRef op)
+{
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_PUSH);
+ pg_shard_t from = m->from;
+
+ vector<PullOp> replies(1);
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ list<hobject_t> to_continue;
+ for (vector<PushOp>::iterator i = m->pushes.begin();
+ i != m->pushes.end();
+ ++i) {
+ bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
+ if (more)
+ replies.push_back(PullOp());
+ }
+ if (!to_continue.empty()) {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ m->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ new PG_RecoveryQueueAsync(
+ get_parent(),
+ get_parent()->bless_gencontext(c)));
+ }
+ replies.erase(replies.end() - 1);
+
+ if (replies.size()) {
+ MOSDPGPull *reply = new MOSDPGPull;
+ reply->from = parent->whoami_shard();
+ reply->set_priority(m->get_priority());
+ reply->pgid = get_info().pgid;
+ reply->map_epoch = m->map_epoch;
+ reply->pulls.swap(replies);
+ reply->compute_cost(cct);
+
+ t->register_on_complete(
+ new PG_SendMessageOnConn(
+ get_parent(), reply, m->get_connection()));
+ }
+
+ t->register_on_applied(
+ new ObjectStore::C_DeleteTransaction(t));
+ get_parent()->queue_transaction(t);
+}
+
+void ReplicatedBackend::do_pull(OpRequestRef op)
+{
+ MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_PULL);
+ pg_shard_t from = m->from;
+
+ map<pg_shard_t, vector<PushOp> > replies;
+ for (vector<PullOp>::iterator i = m->pulls.begin();
+ i != m->pulls.end();
+ ++i) {
+ replies[from].push_back(PushOp());
+ handle_pull(from, *i, &(replies[from].back()));
+ }
+ send_pushes(m->get_priority(), replies);
+}
+
+void ReplicatedBackend::do_push_reply(OpRequestRef op)
+{
+ MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
+ pg_shard_t from = m->from;
+
+ vector<PushOp> replies(1);
+ for (vector<PushReplyOp>::iterator i = m->replies.begin();
+ i != m->replies.end();
+ ++i) {
+ bool more = handle_push_reply(from, *i, &(replies.back()));
+ if (more)
+ replies.push_back(PushOp());
+ }
+ replies.erase(replies.end() - 1);
+
+ map<pg_shard_t, vector<PushOp> > _replies;
+ _replies[from].swap(replies);
+ send_pushes(m->get_priority(), _replies);
+}
+
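+// Build the replicated-write message for a single peer. T/MSGTYPE select
+// between the legacy MOSDSubOp and the newer MOSDRepOp encoding of the same
+// payload: the object-store transaction, log entries, and pg stats/trim
+// bounds.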
+template<typename T, int MSGTYPE>
+Message * ReplicatedBackend::generate_subop(
+ const hobject_t &soid,
+ const eversion_t &at_version,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ eversion_t pg_trim_to,
+ eversion_t pg_trim_rollback_to,
+ hobject_t new_temp_oid,
+ hobject_t discard_temp_oid,
+ const vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_hist,
+ InProgressOp *op,
+ ObjectStore::Transaction *op_t,
+ pg_shard_t peer,
+ const pg_info_t &pinfo)
+{
+ int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+ assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP);
+ // forward the write/update/whatever
+ T *wr = new T(
+ reqid, parent->whoami_shard(),
+ spg_t(get_info().pgid.pgid, peer.shard),
+ soid, acks_wanted,
+ get_osdmap()->get_epoch(),
+ tid, at_version);
+
+ // ship resulting transaction, log entries, and pg_stats
+ if (!parent->should_send_op(peer, soid)) {
+ dout(10) << "issue_repop shipping empty opt to osd." << peer
+	     << ", object " << soid
+ << " beyond MAX(last_backfill_started "
+ << ", pinfo.last_backfill "
+ << pinfo.last_backfill << ")" << dendl;
+ ObjectStore::Transaction t;
+ t.set_use_tbl(op_t->get_use_tbl());
+ ::encode(t, wr->get_data());
+ } else {
+ ::encode(*op_t, wr->get_data());
+ }
+
+ ::encode(log_entries, wr->logbl);
+
+ if (pinfo.is_incomplete())
+ wr->pg_stats = pinfo.stats; // reflects backfill progress
+ else
+ wr->pg_stats = get_info().stats;
+
+ wr->pg_trim_to = pg_trim_to;
+ wr->pg_trim_rollback_to = pg_trim_rollback_to;
+
+ wr->new_temp_oid = new_temp_oid;
+ wr->discard_temp_oid = discard_temp_oid;
+ wr->updated_hit_set_history = hset_hist;
+ return wr;
+}
+
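+// Send the update to every actingbackfill shard other than ourselves,
+// using MOSDRepOp, or the legacy MOSDSubOp when the minimum peer feature
+// set lacks CEPH_FEATURE_OSD_REPOP.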
+void ReplicatedBackend::issue_op(
+ const hobject_t &soid,
+ const eversion_t &at_version,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ eversion_t pg_trim_to,
+ eversion_t pg_trim_rollback_to,
+ hobject_t new_temp_oid,
+ hobject_t discard_temp_oid,
+ const vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_hist,
+ InProgressOp *op,
+ ObjectStore::Transaction *op_t)
+{
+
+ if (parent->get_actingbackfill_shards().size() > 1) {
+ ostringstream ss;
+ set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
+ replicas.erase(parent->whoami_shard());
+ ss << "waiting for subops from " << replicas;
+ if (op->op)
+ op->op->mark_sub_op_sent(ss.str());
+ }
+ for (set<pg_shard_t>::const_iterator i =
+ parent->get_actingbackfill_shards().begin();
+ i != parent->get_actingbackfill_shards().end();
+ ++i) {
+ if (*i == parent->whoami_shard()) continue;
+ pg_shard_t peer = *i;
+ const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
+
+ Message *wr;
+ uint64_t min_features = parent->min_peer_features();
+ if (!(min_features & CEPH_FEATURE_OSD_REPOP)) {
+      dout(20) << "Peer is an older OSD without RepOp support, falling back to SubOp" << dendl;
+ wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>(
+ soid,
+ at_version,
+ tid,
+ reqid,
+ pg_trim_to,
+ pg_trim_rollback_to,
+ new_temp_oid,
+ discard_temp_oid,
+ log_entries,
+ hset_hist,
+ op,
+ op_t,
+ peer,
+ pinfo);
+ } else {
+ wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>(
+ soid,
+ at_version,
+ tid,
+ reqid,
+ pg_trim_to,
+ pg_trim_rollback_to,
+ new_temp_oid,
+ discard_temp_oid,
+ log_entries,
+ hset_hist,
+ op,
+ op_t,
+ peer,
+ pinfo);
+ }
+
+ get_parent()->send_message_osd_cluster(
+ peer.osd, wr, get_osdmap()->get_epoch());
+ }
+}
+
+// sub op modify
+void ReplicatedBackend::sub_op_modify(OpRequestRef op) {
+ Message *m = op->get_req();
+ int msg_type = m->get_type();
+ if (msg_type == MSG_OSD_SUBOP) {
+ sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op);
+ } else if (msg_type == MSG_OSD_REPOP) {
+ sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op);
+ } else {
+ assert(0);
+ }
+}
+
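+// Replica-side apply of a primary's update: decode the shipped transaction
+// and log entries, queue them locally, and register callbacks that ACK on
+// apply and send the ondisk reply on commit.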
+template<typename T, int MSGTYPE>
+void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
+{
+ T *m = static_cast<T *>(op->get_req());
+ int msg_type = m->get_type();
+ assert(MSGTYPE == msg_type);
+ assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);
+
+ const hobject_t& soid = m->poid;
+
+ dout(10) << "sub_op_modify trans"
+ << " " << soid
+ << " v " << m->version
+	   << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
+ << " " << m->logbl.length()
+ << dendl;
+
+ // sanity checks
+ assert(m->map_epoch >= get_info().history.same_interval_since);
+
+ // we better not be missing this.
+ assert(!parent->get_log().get_missing().is_missing(soid));
+
+ int ackerosd = m->get_source().num();
+
+ op->mark_started();
+
+ RepModifyRef rm(new RepModify);
+ rm->op = op;
+ rm->ackerosd = ackerosd;
+ rm->last_complete = get_info().last_complete;
+ rm->epoch_started = get_osdmap()->get_epoch();
+
+ assert(m->logbl.length());
+ // shipped transaction and log entries
+ vector<pg_log_entry_t> log;
+
+ bufferlist::iterator p = m->get_data().begin();
+ ::decode(rm->opt, p);
+ rm->localt.set_use_tbl(rm->opt.get_use_tbl());
+
+ if (m->new_temp_oid != hobject_t()) {
+ dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
+ add_temp_obj(m->new_temp_oid);
+ get_temp_coll(&rm->localt);
+ }
+ if (m->discard_temp_oid != hobject_t()) {
+ dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
+ if (rm->opt.empty()) {
+ dout(10) << __func__ << ": removing object " << m->discard_temp_oid
+ << " since we won't get the transaction" << dendl;
+ rm->localt.remove(temp_coll, m->discard_temp_oid);
+ }
+ clear_temp_obj(m->discard_temp_oid);
+ }
+
+ p = m->logbl.begin();
+ ::decode(log, p);
+ rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+ bool update_snaps = false;
+ if (!rm->opt.empty()) {
+ // If the opt is non-empty, we infer we are before
+ // last_backfill (according to the primary, not our
+ // not-quite-accurate value), and should update the
+ // collections now. Otherwise, we do it later on push.
+ update_snaps = true;
+ }
+ parent->update_stats(m->pg_stats);
+ parent->log_operation(
+ log,
+ m->updated_hit_set_history,
+ m->pg_trim_to,
+ m->pg_trim_rollback_to,
+ update_snaps,
+ &(rm->localt));
+
+ rm->bytes_written = rm->opt.get_encoded_bytes();
+
+ op->mark_started();
+
+ rm->localt.append(rm->opt);
+ rm->localt.register_on_commit(
+ parent->bless_context(
+ new C_OSD_RepModifyCommit(this, rm)));
+ rm->localt.register_on_applied(
+ parent->bless_context(
+ new C_OSD_RepModifyApply(this, rm)));
+ parent->queue_transaction(&(rm->localt), op);
+ // op is cleaned up by oncommit/onapply when both are executed
+}
+
+void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
+{
+ rm->op->mark_event("sub_op_applied");
+ rm->applied = true;
+
+ dout(10) << "sub_op_modify_applied on " << rm << " op "
+ << *rm->op->get_req() << dendl;
+ Message *m = rm->op->get_req();
+
+ Message *ack = NULL;
+ eversion_t version;
+
+ if (m->get_type() == MSG_OSD_SUBOP) {
+    // doesn't have CLIENT SUBOP feature, use SubOp
+ MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
+ version = req->version;
+ if (!rm->committed)
+ ack = new MOSDSubOpReply(
+ req, parent->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ } else if (m->get_type() == MSG_OSD_REPOP) {
+ MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
+ version = req->version;
+ if (!rm->committed)
+ ack = new MOSDRepOpReply(
+ static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ } else {
+ assert(0);
+ }
+
+ // send ack to acker only if we haven't sent a commit already
+ if (ack) {
+ ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
+ get_parent()->send_message_osd_cluster(
+ rm->ackerosd, ack, get_osdmap()->get_epoch());
+ }
+
+ parent->op_applied(version);
+}
+
+void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
+{
+ rm->op->mark_commit_sent();
+ rm->committed = true;
+
+ // send commit.
+ dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
+ << ", sending commit to osd." << rm->ackerosd
+ << dendl;
+
+ assert(get_osdmap()->is_up(rm->ackerosd));
+ get_parent()->update_last_complete_ondisk(rm->last_complete);
+
+ Message *m = rm->op->get_req();
+ Message *commit;
+ if (m->get_type() == MSG_OSD_SUBOP) {
+    // doesn't have CLIENT SUBOP feature, use SubOp
+ MOSDSubOpReply *reply = new MOSDSubOpReply(
+ static_cast<MOSDSubOp*>(m),
+ get_parent()->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ reply->set_last_complete_ondisk(rm->last_complete);
+ commit = reply;
+ } else if (m->get_type() == MSG_OSD_REPOP) {
+ MOSDRepOpReply *reply = new MOSDRepOpReply(
+ static_cast<MOSDRepOp*>(m),
+ get_parent()->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ reply->set_last_complete_ondisk(rm->last_complete);
+ commit = reply;
+ }
+ else {
+ assert(0);
+ }
+
+ commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
+ get_parent()->send_message_osd_cluster(
+ rm->ackerosd, commit, get_osdmap()->get_epoch());
+
+ log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
+}
+
+
+// ===========================================================
+
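+// Recovery tries not to resend bytes the target can reconstruct locally:
+// calc_head_subsets/calc_clone_subsets split an object into ranges that can
+// be cloned from an adjacent clone the target already has (clone_subsets)
+// and ranges that still have to be pushed as data (data_subset).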
+void ReplicatedBackend::calc_head_subsets(
+ ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+{
+ dout(10) << "calc_head_subsets " << head
+ << " clone_overlap " << snapset.clone_overlap << dendl;
+
+ uint64_t size = obc->obs.oi.size;
+ if (size)
+ data_subset.insert(0, size);
+
+ if (get_parent()->get_pool().allow_incomplete_clones()) {
+ dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
+ return;
+ }
+
+ if (!cct->_conf->osd_recover_clone_overlap) {
+ dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
+ return;
+ }
+
+
+ interval_set<uint64_t> cloning;
+ interval_set<uint64_t> prev;
+ if (size)
+ prev.insert(0, size);
+
+ for (int j=snapset.clones.size()-1; j>=0; j--) {
+ hobject_t c = head;
+ c.snap = snapset.clones[j];
+ prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
+ if (!missing.is_missing(c) && c < last_backfill) {
+ dout(10) << "calc_head_subsets " << head << " has prev " << c
+ << " overlap " << prev << dendl;
+ clone_subsets[c] = prev;
+ cloning.union_of(prev);
+ break;
+ }
+ dout(10) << "calc_head_subsets " << head << " does not have prev " << c
+ << " overlap " << prev << dendl;
+ }
+
+
+ if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+ dout(10) << "skipping clone, too many holes" << dendl;
+ clone_subsets.clear();
+ cloning.clear();
+ }
+
+ // what's left for us to push?
+ data_subset.subtract(cloning);
+
+ dout(10) << "calc_head_subsets " << head
+ << " data_subset " << data_subset
+ << " clone_subsets " << clone_subsets << dendl;
+}
+
+void ReplicatedBackend::calc_clone_subsets(
+ SnapSet& snapset, const hobject_t& soid,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+{
+ dout(10) << "calc_clone_subsets " << soid
+ << " clone_overlap " << snapset.clone_overlap << dendl;
+
+ uint64_t size = snapset.clone_size[soid.snap];
+ if (size)
+ data_subset.insert(0, size);
+
+ if (get_parent()->get_pool().allow_incomplete_clones()) {
+ dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
+ return;
+ }
+
+ if (!cct->_conf->osd_recover_clone_overlap) {
+ dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
+ return;
+ }
+
+ unsigned i;
+ for (i=0; i < snapset.clones.size(); i++)
+ if (snapset.clones[i] == soid.snap)
+ break;
+
+ // any overlap with next older clone?
+ interval_set<uint64_t> cloning;
+ interval_set<uint64_t> prev;
+ if (size)
+ prev.insert(0, size);
+ for (int j=i-1; j>=0; j--) {
+ hobject_t c = soid;
+ c.snap = snapset.clones[j];
+ prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
+ if (!missing.is_missing(c) && c < last_backfill) {
+ dout(10) << "calc_clone_subsets " << soid << " has prev " << c
+ << " overlap " << prev << dendl;
+ clone_subsets[c] = prev;
+ cloning.union_of(prev);
+ break;
+ }
+ dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
+ << " overlap " << prev << dendl;
+ }
+
+ // overlap with next newest?
+ interval_set<uint64_t> next;
+ if (size)
+ next.insert(0, size);
+ for (unsigned j=i+1; j<snapset.clones.size(); j++) {
+ hobject_t c = soid;
+ c.snap = snapset.clones[j];
+ next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
+ if (!missing.is_missing(c) && c < last_backfill) {
+ dout(10) << "calc_clone_subsets " << soid << " has next " << c
+ << " overlap " << next << dendl;
+ clone_subsets[c] = next;
+ cloning.union_of(next);
+ break;
+ }
+ dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
+ << " overlap " << next << dendl;
+ }
+
+ if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+ dout(10) << "skipping clone, too many holes" << dendl;
+ clone_subsets.clear();
+ cloning.clear();
+ }
+
+
+ // what's left for us to push?
+ data_subset.subtract(cloning);
+
+ dout(10) << "calc_clone_subsets " << soid
+ << " data_subset " << data_subset
+ << " clone_subsets " << clone_subsets << dendl;
+}
+
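+// Primary: build a PullOp for a missing object. Pick a random shard known
+// to hold it, work out which ranges we need (clone overlap for snap
+// objects, the whole object otherwise), and record the in-flight pull in
+// pulling/pull_from_peer.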
+void ReplicatedBackend::prepare_pull(
+ eversion_t v,
+ const hobject_t& soid,
+ ObjectContextRef headctx,
+ RPGHandle *h)
+{
+ assert(get_parent()->get_local_missing().missing.count(soid));
+ eversion_t _v = get_parent()->get_local_missing().missing.find(
+ soid)->second.need;
+ assert(_v == v);
+ const map<hobject_t, set<pg_shard_t> > &missing_loc(
+ get_parent()->get_missing_loc_shards());
+ const map<pg_shard_t, pg_missing_t > &peer_missing(
+ get_parent()->get_shard_missing());
+ map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid);
+ assert(q != missing_loc.end());
+ assert(!q->second.empty());
+
+ // pick a pullee
+ vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
+ random_shuffle(shuffle.begin(), shuffle.end());
+ vector<pg_shard_t>::iterator p = shuffle.begin();
+ assert(get_osdmap()->is_up(p->osd));
+ pg_shard_t fromshard = *p;
+
+ dout(7) << "pull " << soid
+ << " v " << v
+ << " on osds " << *p
+ << " from osd." << fromshard
+ << dendl;
+
+ assert(peer_missing.count(fromshard));
+ const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
+ if (pmissing.is_missing(soid, v)) {
+ assert(pmissing.missing.find(soid)->second.have != v);
+ dout(10) << "pulling soid " << soid << " from osd " << fromshard
+ << " at version " << pmissing.missing.find(soid)->second.have
+ << " rather than at version " << v << dendl;
+ v = pmissing.missing.find(soid)->second.have;
+ assert(get_parent()->get_log().get_log().objects.count(soid) &&
+ (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
+ pg_log_entry_t::LOST_REVERT) &&
+ (get_parent()->get_log().get_log().objects.find(
+ soid)->second->reverting_to ==
+ v));
+ }
+
+ ObjectRecoveryInfo recovery_info;
+
+ if (soid.is_snap()) {
+ assert(!get_parent()->get_local_missing().is_missing(
+ soid.get_head()) ||
+ !get_parent()->get_local_missing().is_missing(
+ soid.get_snapdir()));
+ assert(headctx);
+ // check snapset
+ SnapSetContext *ssc = headctx->ssc;
+ assert(ssc);
+ dout(10) << " snapset " << ssc->snapset << dendl;
+ calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
+ get_info().last_backfill,
+ recovery_info.copy_subset,
+ recovery_info.clone_subset);
+ // FIXME: this may overestimate if we are pulling multiple clones in parallel...
+ dout(10) << " pulling " << recovery_info << dendl;
+ } else {
+ // pulling head or unversioned object.
+ // always pull the whole thing.
+ recovery_info.copy_subset.insert(0, (uint64_t)-1);
+ recovery_info.size = ((uint64_t)-1);
+ }
+
+ h->pulls[fromshard].push_back(PullOp());
+ PullOp &op = h->pulls[fromshard].back();
+ op.soid = soid;
+
+ op.recovery_info = recovery_info;
+ op.recovery_info.soid = soid;
+ op.recovery_info.version = v;
+ op.recovery_progress.data_complete = false;
+ op.recovery_progress.omap_complete = false;
+ op.recovery_progress.data_recovered_to = 0;
+ op.recovery_progress.first = true;
+
+ assert(!pulling.count(soid));
+ pull_from_peer[fromshard].insert(soid);
+ PullInfo &pi = pulling[soid];
+ pi.head_ctx = headctx;
+ pi.recovery_info = op.recovery_info;
+ pi.recovery_progress = op.recovery_progress;
+}
+
+/*
+ * intelligently push an object to a replica. make use of existing
+ * clones/heads and dup data ranges where possible.
+ */
+void ReplicatedBackend::prep_push_to_replica(
+ ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
+ PushOp *pop)
+{
+ const object_info_t& oi = obc->obs.oi;
+ uint64_t size = obc->obs.oi.size;
+
+ dout(10) << __func__ << ": " << soid << " v" << oi.version
+ << " size " << size << " to osd." << peer << dendl;
+
+ map<hobject_t, interval_set<uint64_t> > clone_subsets;
+ interval_set<uint64_t> data_subset;
+
+ // are we doing a clone on the replica?
+ if (soid.snap && soid.snap < CEPH_NOSNAP) {
+ hobject_t head = soid;
+ head.snap = CEPH_NOSNAP;
+
+    // try to base push off of clones that succeed/precede poid
+ // we need the head (and current SnapSet) locally to do that.
+ if (get_parent()->get_local_missing().is_missing(head)) {
+ dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
+ return prep_push(obc, soid, peer, pop);
+ }
+ hobject_t snapdir = head;
+ snapdir.snap = CEPH_SNAPDIR;
+ if (get_parent()->get_local_missing().is_missing(snapdir)) {
+ dout(15) << "push_to_replica missing snapdir " << snapdir
+ << ", pushing raw clone" << dendl;
+ return prep_push(obc, soid, peer, pop);
+ }
+
+ SnapSetContext *ssc = obc->ssc;
+ assert(ssc);
+ dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
+ map<pg_shard_t, pg_missing_t>::const_iterator pm =
+ get_parent()->get_shard_missing().find(peer);
+ assert(pm != get_parent()->get_shard_missing().end());
+ map<pg_shard_t, pg_info_t>::const_iterator pi =
+ get_parent()->get_shard_info().find(peer);
+ assert(pi != get_parent()->get_shard_info().end());
+ calc_clone_subsets(ssc->snapset, soid,
+ pm->second,
+ pi->second.last_backfill,
+ data_subset, clone_subsets);
+ } else if (soid.snap == CEPH_NOSNAP) {
+ // pushing head or unversioned object.
+    // base this partially on replica's clones?
+ SnapSetContext *ssc = obc->ssc;
+ assert(ssc);
+ dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
+ calc_head_subsets(
+ obc,
+ ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
+ get_parent()->get_shard_info().find(peer)->second.last_backfill,
+ data_subset, clone_subsets);
+ }
+
+ prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
+}
+
+void ReplicatedBackend::prep_push(ObjectContextRef obc,
+ const hobject_t& soid, pg_shard_t peer,
+ PushOp *pop)
+{
+ interval_set<uint64_t> data_subset;
+ if (obc->obs.oi.size)
+ data_subset.insert(0, obc->obs.oi.size);
+ map<hobject_t, interval_set<uint64_t> > clone_subsets;
+
+ prep_push(obc, soid, peer,
+ obc->obs.oi.version, data_subset, clone_subsets,
+ pop);
+}
+
+void ReplicatedBackend::prep_push(
+ ObjectContextRef obc,
+ const hobject_t& soid, pg_shard_t peer,
+ eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+ PushOp *pop)
+{
+ get_parent()->begin_peer_recover(peer, soid);
+ // take note.
+ PushInfo &pi = pushing[soid][peer];
+ pi.obc = obc;
+ pi.recovery_info.size = obc->obs.oi.size;
+ pi.recovery_info.copy_subset = data_subset;
+ pi.recovery_info.clone_subset = clone_subsets;
+ pi.recovery_info.soid = soid;
+ pi.recovery_info.oi = obc->obs.oi;
+ pi.recovery_info.version = version;
+ pi.recovery_progress.first = true;
+ pi.recovery_progress.data_recovered_to = 0;
+  pi.recovery_progress.data_complete = false;
+  pi.recovery_progress.omap_complete = false;
+
+ ObjectRecoveryProgress new_progress;
+ int r = build_push_op(pi.recovery_info,
+ pi.recovery_progress,
+ &new_progress,
+ pop,
+ &(pi.stat));
+ assert(r == 0);
+ pi.recovery_progress = new_progress;
+}
+
+int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer,
+ const ObjectRecoveryInfo &recovery_info,
+ ObjectRecoveryProgress progress)
+{
+ // send op
+ ceph_tid_t tid = get_parent()->get_tid();
+ osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
+
+ dout(10) << "send_pull_op " << recovery_info.soid << " "
+ << recovery_info.version
+ << " first=" << progress.first
+ << " data " << recovery_info.copy_subset
+ << " from osd." << peer
+ << " tid " << tid << dendl;
+
+ MOSDSubOp *subop = new MOSDSubOp(
+ rid, parent->whoami_shard(),
+ get_info().pgid, recovery_info.soid,
+ CEPH_OSD_FLAG_ACK,
+ get_osdmap()->get_epoch(), tid,
+ recovery_info.version);
+ subop->set_priority(prio);
+ subop->ops = vector<OSDOp>(1);
+ subop->ops[0].op.op = CEPH_OSD_OP_PULL;
+ subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk;
+ subop->recovery_info = recovery_info;
+ subop->recovery_progress = progress;
+
+ get_parent()->send_message_osd_cluster(
+ peer.osd, subop, get_osdmap()->get_epoch());
+
+ get_parent()->get_logger()->inc(l_osd_pull);
+ return 0;
+}
+
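+// Write one chunk of a pushed object into the store. Multi-chunk objects
+// are staged in the temp collection and moved into place with the final
+// chunk; the first chunk creates and truncates the object and sets the
+// omap header, the last also clones in the clone_subset ranges.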
+void ReplicatedBackend::submit_push_data(
+ ObjectRecoveryInfo &recovery_info,
+ bool first,
+ bool complete,
+ const interval_set<uint64_t> &intervals_included,
+ bufferlist data_included,
+ bufferlist omap_header,
+ map<string, bufferlist> &attrs,
+ map<string, bufferlist> &omap_entries,
+ ObjectStore::Transaction *t)
+{
+ coll_t target_coll;
+ if (first && complete) {
+ target_coll = coll;
+ } else {
+ dout(10) << __func__ << ": Creating oid "
+ << recovery_info.soid << " in the temp collection" << dendl;
+ add_temp_obj(recovery_info.soid);
+ target_coll = get_temp_coll(t);
+ }
+
+ if (first) {
+ get_parent()->on_local_recover_start(recovery_info.soid, t);
+ t->remove(get_temp_coll(t), recovery_info.soid);
+ t->touch(target_coll, recovery_info.soid);
+ t->truncate(target_coll, recovery_info.soid, recovery_info.size);
+ t->omap_setheader(target_coll, recovery_info.soid, omap_header);
+ }
+ uint64_t off = 0;
+ for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
+ p != intervals_included.end();
+ ++p) {
+ bufferlist bit;
+ bit.substr_of(data_included, off, p.get_len());
+ t->write(target_coll, recovery_info.soid,
+ p.get_start(), p.get_len(), bit);
+ off += p.get_len();
+ }
+
+ t->omap_setkeys(target_coll, recovery_info.soid,
+ omap_entries);
+ t->setattrs(target_coll, recovery_info.soid,
+ attrs);
+
+ if (complete) {
+ if (!first) {
+ dout(10) << __func__ << ": Removing oid "
+ << recovery_info.soid << " from the temp collection" << dendl;
+ clear_temp_obj(recovery_info.soid);
+ t->collection_move(coll, target_coll, recovery_info.soid);
+ }
+
+ submit_push_complete(recovery_info, t);
+ }
+}
+
+void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t)
+{
+ for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
+ recovery_info.clone_subset.begin();
+ p != recovery_info.clone_subset.end();
+ ++p) {
+ for (interval_set<uint64_t>::const_iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ dout(15) << " clone_range " << p->first << " "
+ << q.get_start() << "~" << q.get_len() << dendl;
+ t->clone_range(coll, p->first, recovery_info.soid,
+ q.get_start(), q.get_len(), q.get_start());
+ }
+ }
+}
+
+ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
+ const ObjectRecoveryInfo& recovery_info,
+ SnapSetContext *ssc)
+{
+ if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
+ return recovery_info;
+ ObjectRecoveryInfo new_info = recovery_info;
+ new_info.copy_subset.clear();
+ new_info.clone_subset.clear();
+ assert(ssc);
+ calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
+ get_info().last_backfill,
+ new_info.copy_subset, new_info.clone_subset);
+ return new_info;
+}
+
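+// Primary: consume one PushOp received in reply to our pull. Trim the data
+// to the ranges we still want, stage it via submit_push_data, and either
+// finish local recovery (queueing the object on to_continue) or fill
+// *response with the follow-up PullOp.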
+bool ReplicatedBackend::handle_pull_response(
+ pg_shard_t from, PushOp &pop, PullOp *response,
+ list<hobject_t> *to_continue,
+ ObjectStore::Transaction *t
+ )
+{
+ interval_set<uint64_t> data_included = pop.data_included;
+ bufferlist data;
+ data.claim(pop.data);
+ dout(10) << "handle_pull_response "
+ << pop.recovery_info
+ << pop.after_progress
+ << " data.size() is " << data.length()
+ << " data_included: " << data_included
+ << dendl;
+ if (pop.version == eversion_t()) {
+ // replica doesn't have it!
+ _failed_push(from, pop.soid);
+ return false;
+ }
+
+ hobject_t &hoid = pop.soid;
+ assert((data_included.empty() && data.length() == 0) ||
+ (!data_included.empty() && data.length() > 0));
+
+ if (!pulling.count(hoid)) {
+ return false;
+ }
+
+ PullInfo &pi = pulling[hoid];
+ if (pi.recovery_info.size == (uint64_t(-1))) {
+ pi.recovery_info.size = pop.recovery_info.size;
+ pi.recovery_info.copy_subset.intersection_of(
+ pop.recovery_info.copy_subset);
+ }
+
+ bool first = pi.recovery_progress.first;
+ if (first) {
+ pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
+ pi.recovery_info.oi = pi.obc->obs.oi;
+ pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
+ }
+
+
+ interval_set<uint64_t> usable_intervals;
+ bufferlist usable_data;
+ trim_pushed_data(pi.recovery_info.copy_subset,
+ data_included,
+ data,
+ &usable_intervals,
+ &usable_data);
+ data_included = usable_intervals;
+ data.claim(usable_data);
+
+
+ pi.recovery_progress = pop.after_progress;
+
+ pi.stat.num_bytes_recovered += data.length();
+
+ dout(10) << "new recovery_info " << pi.recovery_info
+ << ", new progress " << pi.recovery_progress
+ << dendl;
+
+ bool complete = pi.is_complete();
+
+ submit_push_data(pi.recovery_info, first,
+ complete,
+ data_included, data,
+ pop.omap_header,
+ pop.attrset,
+ pop.omap_entries,
+ t);
+
+ pi.stat.num_keys_recovered += pop.omap_entries.size();
+
+ if (complete) {
+ to_continue->push_back(hoid);
+ pi.stat.num_objects_recovered++;
+ get_parent()->on_local_recover(
+ hoid, pi.stat, pi.recovery_info, pi.obc, t);
+ pull_from_peer[from].erase(hoid);
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
+ return false;
+ } else {
+ response->soid = pop.soid;
+ response->recovery_info = pi.recovery_info;
+ response->recovery_progress = pi.recovery_progress;
+ return true;
+ }
+}
+
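+// Replica: apply one PushOp from the primary via submit_push_data; the
+// object is recovered locally once both data and omap are complete.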
+void ReplicatedBackend::handle_push(
+ pg_shard_t from, PushOp &pop, PushReplyOp *response,
+ ObjectStore::Transaction *t)
+{
+ dout(10) << "handle_push "
+ << pop.recovery_info
+ << pop.after_progress
+ << dendl;
+ bufferlist data;
+ data.claim(pop.data);
+ bool first = pop.before_progress.first;
+ bool complete = pop.after_progress.data_complete &&
+ pop.after_progress.omap_complete;
+
+ response->soid = pop.recovery_info.soid;
+ submit_push_data(pop.recovery_info,
+ first,
+ complete,
+ pop.data_included,
+ data,
+ pop.omap_header,
+ pop.attrset,
+ pop.omap_entries,
+ t);
+
+ if (complete)
+ get_parent()->on_local_recover(
+ pop.recovery_info.soid,
+ object_stat_sum_t(),
+ pop.recovery_info,
+ ObjectContextRef(), // ok, is replica
+ t);
+}
+
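+// Batch queued PushOps into MOSDPGPush messages per target OSD, capped by
+// osd_max_push_cost and osd_max_push_objects; peers without
+// CEPH_FEATURE_OSD_PACKED_RECOVERY get one legacy MOSDSubOp per push
+// instead.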
+void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
+{
+ for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
+ i != pushes.end();
+ ++i) {
+ ConnectionRef con = get_parent()->get_con_osd_cluster(
+ i->first.osd,
+ get_osdmap()->get_epoch());
+ if (!con)
+ continue;
+ if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+ for (vector<PushOp>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ dout(20) << __func__ << ": sending push (legacy) " << *j
+ << " to osd." << i->first << dendl;
+ send_push_op_legacy(prio, i->first, *j);
+ }
+ } else {
+ vector<PushOp>::iterator j = i->second.begin();
+ while (j != i->second.end()) {
+ uint64_t cost = 0;
+ uint64_t pushes = 0;
+ MOSDPGPush *msg = new MOSDPGPush();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = get_parent()->primary_spg_t();
+ msg->map_epoch = get_osdmap()->get_epoch();
+ msg->set_priority(prio);
+ for (;
+ (j != i->second.end() &&
+ cost < cct->_conf->osd_max_push_cost &&
+ pushes < cct->_conf->osd_max_push_objects) ;
+ ++j) {
+ dout(20) << __func__ << ": sending push " << *j
+ << " to osd." << i->first << dendl;
+ cost += j->cost(cct);
+ pushes += 1;
+ msg->pushes.push_back(*j);
+ }
+ msg->compute_cost(cct);
+ get_parent()->send_message_osd_cluster(msg, con);
+ }
+ }
+ }
+}
+
+void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
+{
+ for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
+ i != pulls.end();
+ ++i) {
+ ConnectionRef con = get_parent()->get_con_osd_cluster(
+ i->first.osd,
+ get_osdmap()->get_epoch());
+ if (!con)
+ continue;
+ if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+ for (vector<PullOp>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ dout(20) << __func__ << ": sending pull (legacy) " << *j
+ << " to osd." << i->first << dendl;
+ send_pull_legacy(
+ prio,
+ i->first,
+ j->recovery_info,
+ j->recovery_progress);
+ }
+ } else {
+ dout(20) << __func__ << ": sending pulls " << i->second
+ << " to osd." << i->first << dendl;
+ MOSDPGPull *msg = new MOSDPGPull();
+ msg->from = parent->whoami_shard();
+ msg->set_priority(prio);
+ msg->pgid = get_parent()->primary_spg_t();
+ msg->map_epoch = get_osdmap()->get_epoch();
+ msg->pulls.swap(i->second);
+ msg->compute_cost(cct);
+ get_parent()->send_message_osd_cluster(msg, con);
+ }
+ }
+}
+
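+// Assemble the next chunk of a push: on the first pass read attrs and the
+// omap header and sanity-check the object version, then pack omap entries
+// and up to osd_recovery_max_chunk bytes of object data (restricted to
+// allocated extents via fiemap), advancing the progress cursor.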
+int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op,
+ object_stat_sum_t *stat)
+{
+ ObjectRecoveryProgress _new_progress;
+ if (!out_progress)
+ out_progress = &_new_progress;
+ ObjectRecoveryProgress &new_progress = *out_progress;
+ new_progress = progress;
+
+ dout(7) << "send_push_op " << recovery_info.soid
+ << " v " << recovery_info.version
+ << " size " << recovery_info.size
+ << " recovery_info: " << recovery_info
+ << dendl;
+
+ if (progress.first) {
+ store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+ store->getattrs(coll, recovery_info.soid, out_op->attrset);
+
+ // Debug
+ bufferlist bv = out_op->attrset[OI_ATTR];
+ object_info_t oi(bv);
+
+ if (oi.version != recovery_info.version) {
+ get_parent()->clog_error() << get_info().pgid << " push "
+ << recovery_info.soid << " v "
+ << recovery_info.version
+ << " failed because local copy is "
+ << oi.version << "\n";
+ return -EINVAL;
+ }
+
+ new_progress.first = false;
+ }
+
+ uint64_t available = cct->_conf->osd_recovery_max_chunk;
+ if (!progress.omap_complete) {
+ ObjectMap::ObjectMapIterator iter =
+ store->get_omap_iterator(coll,
+ recovery_info.soid);
+ for (iter->lower_bound(progress.omap_recovered_to);
+ iter->valid();
+ iter->next()) {
+ if (!out_op->omap_entries.empty() &&
+ available <= (iter->key().size() + iter->value().length()))
+ break;
+ out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
+
+ if ((iter->key().size() + iter->value().length()) <= available)
+ available -= (iter->key().size() + iter->value().length());
+ else
+ available = 0;
+ }
+ if (!iter->valid())
+ new_progress.omap_complete = true;
+ else
+ new_progress.omap_recovered_to = iter->key();
+ }
+
+ if (available > 0) {
+ if (!recovery_info.copy_subset.empty()) {
+ interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
+ bufferlist bl;
+ int r = store->fiemap(coll, recovery_info.soid, 0,
+ copy_subset.range_end(), bl);
+ if (r >= 0) {
+ interval_set<uint64_t> fiemap_included;
+ map<uint64_t, uint64_t> m;
+ bufferlist::iterator iter = bl.begin();
+ ::decode(m, iter);
+ map<uint64_t, uint64_t>::iterator miter;
+ for (miter = m.begin(); miter != m.end(); ++miter) {
+ fiemap_included.insert(miter->first, miter->second);
+ }
+
+ copy_subset.intersection_of(fiemap_included);
+ }
+ out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
+ available);
+ if (out_op->data_included.empty()) // zero filled section, skip to end!
+ new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
+ else
+ new_progress.data_recovered_to = out_op->data_included.range_end();
+ }
+ } else {
+ out_op->data_included.clear();
+ }
+
+ for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
+ p != out_op->data_included.end();
+ ++p) {
+ bufferlist bit;
+ store->read(coll, recovery_info.soid,
+ p.get_start(), p.get_len(), bit);
+ if (p.get_len() != bit.length()) {
+ dout(10) << " extent " << p.get_start() << "~" << p.get_len()
+ << " is actually " << p.get_start() << "~" << bit.length()
+ << dendl;
+ interval_set<uint64_t>::iterator save = p++;
+ if (bit.length() == 0)
+ out_op->data_included.erase(save); //Remove this empty interval
+ else
+ save.set_len(bit.length());
+ // Remove any other intervals present
+ while (p != out_op->data_included.end()) {
+ interval_set<uint64_t>::iterator save = p++;
+ out_op->data_included.erase(save);
+ }
+ new_progress.data_complete = true;
+ out_op->data.claim_append(bit);
+ break;
+ }
+ out_op->data.claim_append(bit);
+ }
+
+ if (new_progress.is_complete(recovery_info)) {
+ new_progress.data_complete = true;
+ if (stat)
+ stat->num_objects_recovered++;
+ }
+
+ if (stat) {
+ stat->num_keys_recovered += out_op->omap_entries.size();
+ stat->num_bytes_recovered += out_op->data.length();
+ }
+
+ get_parent()->get_logger()->inc(l_osd_push);
+ get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
+
+ // send
+ out_op->version = recovery_info.version;
+ out_op->soid = recovery_info.soid;
+ out_op->recovery_info = recovery_info;
+ out_op->after_progress = new_progress;
+ out_op->before_progress = progress;
+ return 0;
+}
+
+int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop)
+{
+ ceph_tid_t tid = get_parent()->get_tid();
+ osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
+ MOSDSubOp *subop = new MOSDSubOp(
+ rid, parent->whoami_shard(),
+ spg_t(get_info().pgid.pgid, peer.shard), pop.soid,
+ 0, get_osdmap()->get_epoch(),
+ tid, pop.recovery_info.version);
+ subop->ops = vector<OSDOp>(1);
+ subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+
+ subop->set_priority(prio);
+ subop->version = pop.version;
+ subop->ops[0].indata.claim(pop.data);
+ subop->data_included.swap(pop.data_included);
+ subop->omap_header.claim(pop.omap_header);
+ subop->omap_entries.swap(pop.omap_entries);
+ subop->attrset.swap(pop.attrset);
+ subop->recovery_info = pop.recovery_info;
+ subop->current_progress = pop.before_progress;
+ subop->recovery_progress = pop.after_progress;
+
+ get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
+ return 0;
+}
+
+void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
+{
+ op->recovery_info.version = eversion_t();
+ op->version = eversion_t();
+ op->soid = soid;
+}
+
+void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
+{
+ MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
+ const hobject_t& soid = reply->get_poid();
+ assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
+ dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+ pg_shard_t peer = reply->from;
+
+ op->mark_started();
+
+ PushReplyOp rop;
+ rop.soid = soid;
+ PushOp pop;
+ bool more = handle_push_reply(peer, rop, &pop);
+ if (more)
+ send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
+}
+
+bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply)
+{
+ const hobject_t &soid = op.soid;
+ if (pushing.count(soid) == 0) {
+ dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
+ << ", or anybody else"
+ << dendl;
+ return false;
+ } else if (pushing[soid].count(peer) == 0) {
+ dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
+ << dendl;
+ return false;
+ } else {
+ PushInfo *pi = &pushing[soid][peer];
+
+ if (!pi->recovery_progress.data_complete) {
+ dout(10) << " pushing more from, "
+ << pi->recovery_progress.data_recovered_to
+ << " of " << pi->recovery_info.copy_subset << dendl;
+ ObjectRecoveryProgress new_progress;
+ int r = build_push_op(
+ pi->recovery_info,
+ pi->recovery_progress, &new_progress, reply,
+ &(pi->stat));
+ assert(r == 0);
+ pi->recovery_progress = new_progress;
+ return true;
+ } else {
+ // done!
+ get_parent()->on_peer_recover(
+ peer, soid, pi->recovery_info,
+ pi->stat);
+
+ pushing[soid].erase(peer);
+ pi = NULL;
+
+
+ if (pushing[soid].empty()) {
+ get_parent()->on_global_recover(soid);
+ pushing.erase(soid);
+ } else {
+ dout(10) << "pushed " << soid << ", still waiting for push ack from "
+ << pushing[soid].size() << " others" << dendl;
+ }
+ return false;
+ }
+ }
+}
+
+/** op_pull
+ * process request to pull an entire object.
+ * NOTE: called from opqueue.
+ */
+void ReplicatedBackend::sub_op_pull(OpRequestRef op)
+{
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
+ assert(m->get_type() == MSG_OSD_SUBOP);
+
+ op->mark_started();
+
+ const hobject_t soid = m->poid;
+
+  dout(7) << "pull " << soid << " v " << m->version
+ << " from " << m->get_source()
+ << dendl;
+
+ assert(!is_primary()); // we should be a replica or stray.
+
+ PullOp pop;
+ pop.soid = soid;
+ pop.recovery_info = m->recovery_info;
+ pop.recovery_progress = m->recovery_progress;
+
+ PushOp reply;
+ handle_pull(m->from, pop, &reply);
+ send_push_op_legacy(
+ m->get_priority(),
+ m->from,
+ reply);
+
+ log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull);
+}
+
+void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
+{
+ const hobject_t &soid = op.soid;
+ struct stat st;
+ int r = store->stat(coll, soid, &st);
+ if (r != 0) {
+ get_parent()->clog_error() << get_info().pgid << " "
+ << peer << " tried to pull " << soid
+ << " but got " << cpp_strerror(-r) << "\n";
+ prep_push_op_blank(soid, reply);
+ } else {
+ ObjectRecoveryInfo &recovery_info = op.recovery_info;
+ ObjectRecoveryProgress &progress = op.recovery_progress;
+ if (progress.first && recovery_info.size == ((uint64_t)-1)) {
+ // Adjust size and copy_subset
+ recovery_info.size = st.st_size;
+ recovery_info.copy_subset.clear();
+ if (st.st_size)
+ recovery_info.copy_subset.insert(0, st.st_size);
+ assert(recovery_info.clone_subset.empty());
+ }
+
+ r = build_push_op(recovery_info, progress, 0, reply);
+ if (r < 0)
+ prep_push_op_blank(soid, reply);
+ }
+}
+
+/**
+ * trim received data to remove what we don't want
+ *
+ * @param copy_subset intervals we want
+ * @param intervals_received intervals we got
+ * @param data_received data we got
+ * @param intervals_usable intervals we want to keep
+ * @param data_usable matching data we want to keep
+ */
+void ReplicatedBackend::trim_pushed_data(
+ const interval_set<uint64_t> &copy_subset,
+ const interval_set<uint64_t> &intervals_received,
+ bufferlist data_received,
+ interval_set<uint64_t> *intervals_usable,
+ bufferlist *data_usable)
+{
+ if (intervals_received.subset_of(copy_subset)) {
+ *intervals_usable = intervals_received;
+ *data_usable = data_received;
+ return;
+ }
+
+ intervals_usable->intersection_of(copy_subset,
+ intervals_received);
+
+ uint64_t off = 0;
+ for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
+ p != intervals_received.end();
+ ++p) {
+ interval_set<uint64_t> x;
+ x.insert(p.get_start(), p.get_len());
+ x.intersection_of(copy_subset);
+ for (interval_set<uint64_t>::const_iterator q = x.begin();
+ q != x.end();
+ ++q) {
+ bufferlist sub;
+ uint64_t data_off = off + (q.get_start() - p.get_start());
+ sub.substr_of(data_received, data_off, q.get_len());
+ data_usable->claim_append(sub);
+ }
+ off += p.get_len();
+ }
+}
+
+/** op_push
+ * NOTE: called from opqueue.
+ */
+void ReplicatedBackend::sub_op_push(OpRequestRef op)
+{
+ op->mark_started();
+ MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
+
+ PushOp pop;
+ pop.soid = m->recovery_info.soid;
+ pop.version = m->version;
+ m->claim_data(pop.data);
+ pop.data_included.swap(m->data_included);
+ pop.omap_header.swap(m->omap_header);
+ pop.omap_entries.swap(m->omap_entries);
+ pop.attrset.swap(m->attrset);
+ pop.recovery_info = m->recovery_info;
+ pop.before_progress = m->current_progress;
+ pop.after_progress = m->recovery_progress;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+
+ if (is_primary()) {
+ PullOp resp;
+ RPGHandle *h = _open_recovery_op();
+ list<hobject_t> to_continue;
+ bool more = handle_pull_response(
+ m->from, pop, &resp,
+ &to_continue, t);
+ if (more) {
+ send_pull_legacy(
+ m->get_priority(),
+ m->from,
+ resp.recovery_info,
+ resp.recovery_progress);
+ } else {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ op->get_req()->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ new PG_RecoveryQueueAsync(
+ get_parent(),
+ get_parent()->bless_gencontext(c)));
+ }
+ run_recovery_op(h, op->get_req()->get_priority());
+ } else {
+ PushReplyOp resp;
+ MOSDSubOpReply *reply = new MOSDSubOpReply(
+ m, parent->whoami_shard(), 0,
+ get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ reply->set_priority(m->get_priority());
+ assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+ handle_push(m->from, pop, &resp, t);
+ t->register_on_complete(new PG_SendMessageOnConn(
+ get_parent(), reply, m->get_connection()));
+ }
+ t->register_on_applied(
+ new ObjectStore::C_DeleteTransaction(t));
+ get_parent()->queue_transaction(t);
+ return;
+}
+
+void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
+{
+ get_parent()->failed_push(from, soid);
+ pull_from_peer[from].erase(soid);
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
+ pulling.erase(soid);
+}
+
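+// Primary: queue a PushOp for every actingbackfill shard still missing the
+// object; returns the number of pushes started.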
+int ReplicatedBackend::start_pushes(
+ const hobject_t &soid,
+ ObjectContextRef obc,
+ RPGHandle *h)
+{
+ int pushes = 0;
+ // who needs it?
+ assert(get_parent()->get_actingbackfill_shards().size() > 0);
+ for (set<pg_shard_t>::iterator i =
+ get_parent()->get_actingbackfill_shards().begin();
+ i != get_parent()->get_actingbackfill_shards().end();
+ ++i) {
+ if (*i == get_parent()->whoami_shard()) continue;
+ pg_shard_t peer = *i;
+ map<pg_shard_t, pg_missing_t>::const_iterator j =
+ get_parent()->get_shard_missing().find(peer);
+ assert(j != get_parent()->get_shard_missing().end());
+ if (j->second.is_missing(soid)) {
+ ++pushes;
+ h->pushes[peer].push_back(PushOp());
+ prep_push_to_replica(obc, soid, peer,
+ &(h->pushes[peer].back())
+ );
+ }
+ }
+ return pushes;
+}