diff options
author | Samuel Just <sjust@redhat.com> | 2015-03-26 10:50:19 -0700 |
---|---|---|
committer | Samuel Just <sjust@redhat.com> | 2015-03-26 10:50:19 -0700 |
commit | c176ebf7918aff8a671afc6c5c4e8a6bbdd919df (patch) | |
tree | e104e5619761ab7d92a9a3918092e255cc8b48e3 | |
parent | e9d6096f254479b1c07e2eb0d1a5279e304d1728 (diff) | |
download | ceph-c176ebf7918aff8a671afc6c5c4e8a6bbdd919df.tar.gz |
osd/: Move ReplicatedBackend methods into ReplicatedBackend.cc
Signed-off-by: Samuel Just <sjust@redhat.com>
-rw-r--r-- | src/osd/ReplicatedBackend.cc | 1623 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 1626 |
2 files changed, 1623 insertions, 1626 deletions
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 680c27a5e8b..b86d4d1e744 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -30,6 +30,35 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { return *_dout << pgb->get_parent()->gen_dbg_prefix(); } +static void log_subop_stats( + PerfCounters *logger, + OpRequestRef op, int subop) +{ + utime_t now = ceph_clock_now(g_ceph_context); + utime_t latency = now; + latency -= op->get_req()->get_recv_stamp(); + + + logger->inc(l_osd_sop); + logger->tinc(l_osd_sop_lat, latency); + logger->inc(subop); + + if (subop != l_osd_sop_pull) { + uint64_t inb = op->get_req()->get_data().length(); + logger->inc(l_osd_sop_inb, inb); + if (subop == l_osd_sop_w) { + logger->inc(l_osd_sop_w_inb, inb); + logger->tinc(l_osd_sop_w_lat, latency); + } else if (subop == l_osd_sop_push) { + logger->inc(l_osd_sop_push_inb, inb); + logger->tinc(l_osd_sop_push_lat, latency); + } else + assert("no support subop" == 0); + } else { + logger->tinc(l_osd_sop_pull_lat, latency); + } +} + ReplicatedBackend::ReplicatedBackend( PGBackend::Listener *pg, coll_t coll, @@ -797,3 +826,1597 @@ void ReplicatedBackend::be_deep_scrub( o.omap_digest = oh.digest(); o.omap_digest_present = true; } + +void ReplicatedBackend::_do_push(OpRequestRef op) +{ + MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH); + pg_shard_t from = m->from; + + vector<PushReplyOp> replies; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + for (vector<PushOp>::iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + replies.push_back(PushReplyOp()); + handle_push(from, *i, &(replies.back()), t); + } + + MOSDPGPushReply *reply = new MOSDPGPushReply; + reply->from = get_parent()->whoami_shard(); + reply->set_priority(m->get_priority()); + reply->pgid = get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->replies.swap(replies); + reply->compute_cost(cct); + + t->register_on_complete( + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + + t->register_on_applied( + new ObjectStore::C_DeleteTransaction(t)); + get_parent()->queue_transaction(t); +} + +struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> { + ReplicatedBackend *bc; + list<hobject_t> to_continue; + int priority; + C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) + : bc(bc), priority(priority) {} + + void finish(ThreadPool::TPHandle &handle) { + ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); + for (list<hobject_t>::iterator i = + to_continue.begin(); + i != to_continue.end(); + ++i) { + map<hobject_t, ReplicatedBackend::PullInfo>::iterator j = + bc->pulling.find(*i); + assert(j != bc->pulling.end()); + if (!bc->start_pushes(*i, j->second.obc, h)) { + bc->get_parent()->on_global_recover( + *i); + } + bc->pulling.erase(*i); + handle.reset_tp_timeout(); + } + bc->run_recovery_op(h, priority); + } +}; + +void ReplicatedBackend::_do_pull_response(OpRequestRef op) +{ + MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH); + pg_shard_t from = m->from; + + vector<PullOp> replies(1); + ObjectStore::Transaction *t = new ObjectStore::Transaction; + list<hobject_t> to_continue; + for (vector<PushOp>::iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t); + if (more) + replies.push_back(PullOp()); + } + if (!to_continue.empty()) { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + m->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + new PG_RecoveryQueueAsync( + get_parent(), + get_parent()->bless_gencontext(c))); + } + replies.erase(replies.end() - 1); + + if (replies.size()) { + MOSDPGPull *reply = new MOSDPGPull; + reply->from = parent->whoami_shard(); + reply->set_priority(m->get_priority()); + reply->pgid = get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->pulls.swap(replies); + reply->compute_cost(cct); + + t->register_on_complete( + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + } + + t->register_on_applied( + new ObjectStore::C_DeleteTransaction(t)); + get_parent()->queue_transaction(t); +} + +void ReplicatedBackend::do_pull(OpRequestRef op) +{ + MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PULL); + pg_shard_t from = m->from; + + map<pg_shard_t, vector<PushOp> > replies; + for (vector<PullOp>::iterator i = m->pulls.begin(); + i != m->pulls.end(); + ++i) { + replies[from].push_back(PushOp()); + handle_pull(from, *i, &(replies[from].back())); + } + send_pushes(m->get_priority(), replies); +} + +void ReplicatedBackend::do_push_reply(OpRequestRef op) +{ + MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY); + pg_shard_t from = m->from; + + vector<PushOp> replies(1); + for (vector<PushReplyOp>::iterator i = m->replies.begin(); + i != m->replies.end(); + ++i) { + bool more = handle_push_reply(from, *i, &(replies.back())); + if (more) + replies.push_back(PushOp()); + } + replies.erase(replies.end() - 1); + + map<pg_shard_t, vector<PushOp> > _replies; + _replies[from].swap(replies); + send_pushes(m->get_priority(), _replies); +} + +template<typename T, int MSGTYPE> +Message * ReplicatedBackend::generate_subop( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_trim_rollback_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + const vector<pg_log_entry_t> &log_entries, + boost::optional<pg_hit_set_history_t> &hset_hist, + InProgressOp *op, + ObjectStore::Transaction *op_t, + pg_shard_t peer, + const pg_info_t &pinfo) +{ + int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP); + // forward the write/update/whatever + T *wr = new T( + reqid, parent->whoami_shard(), + spg_t(get_info().pgid.pgid, peer.shard), + soid, acks_wanted, + get_osdmap()->get_epoch(), + tid, at_version); + + // ship resulting transaction, log entries, and pg_stats + if (!parent->should_send_op(peer, soid)) { + dout(10) << "issue_repop shipping empty opt to osd." << peer + <<", object " << soid + << " beyond MAX(last_backfill_started " + << ", pinfo.last_backfill " + << pinfo.last_backfill << ")" << dendl; + ObjectStore::Transaction t; + t.set_use_tbl(op_t->get_use_tbl()); + ::encode(t, wr->get_data()); + } else { + ::encode(*op_t, wr->get_data()); + } + + ::encode(log_entries, wr->logbl); + + if (pinfo.is_incomplete()) + wr->pg_stats = pinfo.stats; // reflects backfill progress + else + wr->pg_stats = get_info().stats; + + wr->pg_trim_to = pg_trim_to; + wr->pg_trim_rollback_to = pg_trim_rollback_to; + + wr->new_temp_oid = new_temp_oid; + wr->discard_temp_oid = discard_temp_oid; + wr->updated_hit_set_history = hset_hist; + return wr; +} + +void ReplicatedBackend::issue_op( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_trim_rollback_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + const vector<pg_log_entry_t> &log_entries, + boost::optional<pg_hit_set_history_t> &hset_hist, + InProgressOp *op, + ObjectStore::Transaction *op_t) +{ + + if (parent->get_actingbackfill_shards().size() > 1) { + ostringstream ss; + set<pg_shard_t> replicas = parent->get_actingbackfill_shards(); + replicas.erase(parent->whoami_shard()); + ss << "waiting for subops from " << replicas; + if (op->op) + op->op->mark_sub_op_sent(ss.str()); + } + for (set<pg_shard_t>::const_iterator i = + parent->get_actingbackfill_shards().begin(); + i != parent->get_actingbackfill_shards().end(); + ++i) { + if (*i == parent->whoami_shard()) continue; + pg_shard_t peer = *i; + const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second; + + Message *wr; + uint64_t min_features = parent->min_peer_features(); + if (!(min_features & CEPH_FEATURE_OSD_REPOP)) { + dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl; + wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>( + soid, + at_version, + tid, + reqid, + pg_trim_to, + pg_trim_rollback_to, + new_temp_oid, + discard_temp_oid, + log_entries, + hset_hist, + op, + op_t, + peer, + pinfo); + } else { + wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>( + soid, + at_version, + tid, + reqid, + pg_trim_to, + pg_trim_rollback_to, + new_temp_oid, + discard_temp_oid, + log_entries, + hset_hist, + op, + op_t, + peer, + pinfo); + } + + get_parent()->send_message_osd_cluster( + peer.osd, wr, get_osdmap()->get_epoch()); + } +} + +// sub op modify +void ReplicatedBackend::sub_op_modify(OpRequestRef op) { + Message *m = op->get_req(); + int msg_type = m->get_type(); + if (msg_type == MSG_OSD_SUBOP) { + sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op); + } else if (msg_type == MSG_OSD_REPOP) { + sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op); + } else { + assert(0); + } +} + +template<typename T, int MSGTYPE> +void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op) +{ + T *m = static_cast<T *>(op->get_req()); + int msg_type = m->get_type(); + assert(MSGTYPE == msg_type); + assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP); + + const hobject_t& soid = m->poid; + + dout(10) << "sub_op_modify trans" + << " " << soid + << " v " << m->version + << (m->logbl.length() ? " (transaction)" : " (parallel exec") + << " " << m->logbl.length() + << dendl; + + // sanity checks + assert(m->map_epoch >= get_info().history.same_interval_since); + + // we better not be missing this. + assert(!parent->get_log().get_missing().is_missing(soid)); + + int ackerosd = m->get_source().num(); + + op->mark_started(); + + RepModifyRef rm(new RepModify); + rm->op = op; + rm->ackerosd = ackerosd; + rm->last_complete = get_info().last_complete; + rm->epoch_started = get_osdmap()->get_epoch(); + + assert(m->logbl.length()); + // shipped transaction and log entries + vector<pg_log_entry_t> log; + + bufferlist::iterator p = m->get_data().begin(); + ::decode(rm->opt, p); + rm->localt.set_use_tbl(rm->opt.get_use_tbl()); + + if (m->new_temp_oid != hobject_t()) { + dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; + add_temp_obj(m->new_temp_oid); + get_temp_coll(&rm->localt); + } + if (m->discard_temp_oid != hobject_t()) { + dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; + if (rm->opt.empty()) { + dout(10) << __func__ << ": removing object " << m->discard_temp_oid + << " since we won't get the transaction" << dendl; + rm->localt.remove(temp_coll, m->discard_temp_oid); + } + clear_temp_obj(m->discard_temp_oid); + } + + p = m->logbl.begin(); + ::decode(log, p); + rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + + bool update_snaps = false; + if (!rm->opt.empty()) { + // If the opt is non-empty, we infer we are before + // last_backfill (according to the primary, not our + // not-quite-accurate value), and should update the + // collections now. Otherwise, we do it later on push. + update_snaps = true; + } + parent->update_stats(m->pg_stats); + parent->log_operation( + log, + m->updated_hit_set_history, + m->pg_trim_to, + m->pg_trim_rollback_to, + update_snaps, + &(rm->localt)); + + rm->bytes_written = rm->opt.get_encoded_bytes(); + + op->mark_started(); + + rm->localt.append(rm->opt); + rm->localt.register_on_commit( + parent->bless_context( + new C_OSD_RepModifyCommit(this, rm))); + rm->localt.register_on_applied( + parent->bless_context( + new C_OSD_RepModifyApply(this, rm))); + parent->queue_transaction(&(rm->localt), op); + // op is cleaned up by oncommit/onapply when both are executed +} + +void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm) +{ + rm->op->mark_event("sub_op_applied"); + rm->applied = true; + + dout(10) << "sub_op_modify_applied on " << rm << " op " + << *rm->op->get_req() << dendl; + Message *m = rm->op->get_req(); + + Message *ack = NULL; + eversion_t version; + + if (m->get_type() == MSG_OSD_SUBOP) { + // doesn't have CLIENT SUBOP feature ,use Subop + MOSDSubOp *req = static_cast<MOSDSubOp*>(m); + version = req->version; + if (!rm->committed) + ack = new MOSDSubOpReply( + req, parent->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + } else if (m->get_type() == MSG_OSD_REPOP) { + MOSDRepOp *req = static_cast<MOSDRepOp*>(m); + version = req->version; + if (!rm->committed) + ack = new MOSDRepOpReply( + static_cast<MOSDRepOp*>(m), parent->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + } else { + assert(0); + } + + // send ack to acker only if we haven't sent a commit already + if (ack) { + ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! + get_parent()->send_message_osd_cluster( + rm->ackerosd, ack, get_osdmap()->get_epoch()); + } + + parent->op_applied(version); +} + +void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm) +{ + rm->op->mark_commit_sent(); + rm->committed = true; + + // send commit. + dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req() + << ", sending commit to osd." << rm->ackerosd + << dendl; + + assert(get_osdmap()->is_up(rm->ackerosd)); + get_parent()->update_last_complete_ondisk(rm->last_complete); + + Message *m = rm->op->get_req(); + Message *commit; + if (m->get_type() == MSG_OSD_SUBOP) { + // doesn't have CLIENT SUBOP feature ,use Subop + MOSDSubOpReply *reply = new MOSDSubOpReply( + static_cast<MOSDSubOp*>(m), + get_parent()->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(rm->last_complete); + commit = reply; + } else if (m->get_type() == MSG_OSD_REPOP) { + MOSDRepOpReply *reply = new MOSDRepOpReply( + static_cast<MOSDRepOp*>(m), + get_parent()->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(rm->last_complete); + commit = reply; + } + else { + assert(0); + } + + commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! + get_parent()->send_message_osd_cluster( + rm->ackerosd, commit, get_osdmap()->get_epoch()); + + log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w); +} + + +// =========================================================== + +void ReplicatedBackend::calc_head_subsets( + ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets) +{ + dout(10) << "calc_head_subsets " << head + << " clone_overlap " << snapset.clone_overlap << dendl; + + uint64_t size = obc->obs.oi.size; + if (size) + data_subset.insert(0, size); + + if (get_parent()->get_pool().allow_incomplete_clones()) { + dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; + return; + } + + if (!cct->_conf->osd_recover_clone_overlap) { + dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl; + return; + } + + + interval_set<uint64_t> cloning; + interval_set<uint64_t> prev; + if (size) + prev.insert(0, size); + + for (int j=snapset.clones.size()-1; j>=0; j--) { + hobject_t c = head; + c.snap = snapset.clones[j]; + prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); + if (!missing.is_missing(c) && c < last_backfill) { + dout(10) << "calc_head_subsets " << head << " has prev " << c + << " overlap " << prev << dendl; + clone_subsets[c] = prev; + cloning.union_of(prev); + break; + } + dout(10) << "calc_head_subsets " << head << " does not have prev " << c + << " overlap " << prev << dendl; + } + + + if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { + dout(10) << "skipping clone, too many holes" << dendl; + clone_subsets.clear(); + cloning.clear(); + } + + // what's left for us to push? + data_subset.subtract(cloning); + + dout(10) << "calc_head_subsets " << head + << " data_subset " << data_subset + << " clone_subsets " << clone_subsets << dendl; +} + +void ReplicatedBackend::calc_clone_subsets( + SnapSet& snapset, const hobject_t& soid, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets) +{ + dout(10) << "calc_clone_subsets " << soid + << " clone_overlap " << snapset.clone_overlap << dendl; + + uint64_t size = snapset.clone_size[soid.snap]; + if (size) + data_subset.insert(0, size); + + if (get_parent()->get_pool().allow_incomplete_clones()) { + dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; + return; + } + + if (!cct->_conf->osd_recover_clone_overlap) { + dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl; + return; + } + + unsigned i; + for (i=0; i < snapset.clones.size(); i++) + if (snapset.clones[i] == soid.snap) + break; + + // any overlap with next older clone? + interval_set<uint64_t> cloning; + interval_set<uint64_t> prev; + if (size) + prev.insert(0, size); + for (int j=i-1; j>=0; j--) { + hobject_t c = soid; + c.snap = snapset.clones[j]; + prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); + if (!missing.is_missing(c) && c < last_backfill) { + dout(10) << "calc_clone_subsets " << soid << " has prev " << c + << " overlap " << prev << dendl; + clone_subsets[c] = prev; + cloning.union_of(prev); + break; + } + dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c + << " overlap " << prev << dendl; + } + + // overlap with next newest? + interval_set<uint64_t> next; + if (size) + next.insert(0, size); + for (unsigned j=i+1; j<snapset.clones.size(); j++) { + hobject_t c = soid; + c.snap = snapset.clones[j]; + next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]); + if (!missing.is_missing(c) && c < last_backfill) { + dout(10) << "calc_clone_subsets " << soid << " has next " << c + << " overlap " << next << dendl; + clone_subsets[c] = next; + cloning.union_of(next); + break; + } + dout(10) << "calc_clone_subsets " << soid << " does not have next " << c + << " overlap " << next << dendl; + } + + if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { + dout(10) << "skipping clone, too many holes" << dendl; + clone_subsets.clear(); + cloning.clear(); + } + + + // what's left for us to push? + data_subset.subtract(cloning); + + dout(10) << "calc_clone_subsets " << soid + << " data_subset " << data_subset + << " clone_subsets " << clone_subsets << dendl; +} + +void ReplicatedBackend::prepare_pull( + eversion_t v, + const hobject_t& soid, + ObjectContextRef headctx, + RPGHandle *h) +{ + assert(get_parent()->get_local_missing().missing.count(soid)); + eversion_t _v = get_parent()->get_local_missing().missing.find( + soid)->second.need; + assert(_v == v); + const map<hobject_t, set<pg_shard_t> > &missing_loc( + get_parent()->get_missing_loc_shards()); + const map<pg_shard_t, pg_missing_t > &peer_missing( + get_parent()->get_shard_missing()); + map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid); + assert(q != missing_loc.end()); + assert(!q->second.empty()); + + // pick a pullee + vector<pg_shard_t> shuffle(q->second.begin(), q->second.end()); + random_shuffle(shuffle.begin(), shuffle.end()); + vector<pg_shard_t>::iterator p = shuffle.begin(); + assert(get_osdmap()->is_up(p->osd)); + pg_shard_t fromshard = *p; + + dout(7) << "pull " << soid + << " v " << v + << " on osds " << *p + << " from osd." << fromshard + << dendl; + + assert(peer_missing.count(fromshard)); + const pg_missing_t &pmissing = peer_missing.find(fromshard)->second; + if (pmissing.is_missing(soid, v)) { + assert(pmissing.missing.find(soid)->second.have != v); + dout(10) << "pulling soid " << soid << " from osd " << fromshard + << " at version " << pmissing.missing.find(soid)->second.have + << " rather than at version " << v << dendl; + v = pmissing.missing.find(soid)->second.have; + assert(get_parent()->get_log().get_log().objects.count(soid) && + (get_parent()->get_log().get_log().objects.find(soid)->second->op == + pg_log_entry_t::LOST_REVERT) && + (get_parent()->get_log().get_log().objects.find( + soid)->second->reverting_to == + v)); + } + + ObjectRecoveryInfo recovery_info; + + if (soid.is_snap()) { + assert(!get_parent()->get_local_missing().is_missing( + soid.get_head()) || + !get_parent()->get_local_missing().is_missing( + soid.get_snapdir())); + assert(headctx); + // check snapset + SnapSetContext *ssc = headctx->ssc; + assert(ssc); + dout(10) << " snapset " << ssc->snapset << dendl; + calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(), + get_info().last_backfill, + recovery_info.copy_subset, + recovery_info.clone_subset); + // FIXME: this may overestimate if we are pulling multiple clones in parallel... + dout(10) << " pulling " << recovery_info << dendl; + } else { + // pulling head or unversioned object. + // always pull the whole thing. + recovery_info.copy_subset.insert(0, (uint64_t)-1); + recovery_info.size = ((uint64_t)-1); + } + + h->pulls[fromshard].push_back(PullOp()); + PullOp &op = h->pulls[fromshard].back(); + op.soid = soid; + + op.recovery_info = recovery_info; + op.recovery_info.soid = soid; + op.recovery_info.version = v; + op.recovery_progress.data_complete = false; + op.recovery_progress.omap_complete = false; + op.recovery_progress.data_recovered_to = 0; + op.recovery_progress.first = true; + + assert(!pulling.count(soid)); + pull_from_peer[fromshard].insert(soid); + PullInfo &pi = pulling[soid]; + pi.head_ctx = headctx; + pi.recovery_info = op.recovery_info; + pi.recovery_progress = op.recovery_progress; +} + +/* + * intelligently push an object to a replica. make use of existing + * clones/heads and dup data ranges where possible. + */ +void ReplicatedBackend::prep_push_to_replica( + ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, + PushOp *pop) +{ + const object_info_t& oi = obc->obs.oi; + uint64_t size = obc->obs.oi.size; + + dout(10) << __func__ << ": " << soid << " v" << oi.version + << " size " << size << " to osd." << peer << dendl; + + map<hobject_t, interval_set<uint64_t> > clone_subsets; + interval_set<uint64_t> data_subset; + + // are we doing a clone on the replica? + if (soid.snap && soid.snap < CEPH_NOSNAP) { + hobject_t head = soid; + head.snap = CEPH_NOSNAP; + + // try to base push off of clones that succeed/preceed poid + // we need the head (and current SnapSet) locally to do that. + if (get_parent()->get_local_missing().is_missing(head)) { + dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; + return prep_push(obc, soid, peer, pop); + } + hobject_t snapdir = head; + snapdir.snap = CEPH_SNAPDIR; + if (get_parent()->get_local_missing().is_missing(snapdir)) { + dout(15) << "push_to_replica missing snapdir " << snapdir + << ", pushing raw clone" << dendl; + return prep_push(obc, soid, peer, pop); + } + + SnapSetContext *ssc = obc->ssc; + assert(ssc); + dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; + map<pg_shard_t, pg_missing_t>::const_iterator pm = + get_parent()->get_shard_missing().find(peer); + assert(pm != get_parent()->get_shard_missing().end()); + map<pg_shard_t, pg_info_t>::const_iterator pi = + get_parent()->get_shard_info().find(peer); + assert(pi != get_parent()->get_shard_info().end()); + calc_clone_subsets(ssc->snapset, soid, + pm->second, + pi->second.last_backfill, + data_subset, clone_subsets); + } else if (soid.snap == CEPH_NOSNAP) { + // pushing head or unversioned object. + // base this on partially on replica's clones? + SnapSetContext *ssc = obc->ssc; + assert(ssc); + dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; + calc_head_subsets( + obc, + ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second, + get_parent()->get_shard_info().find(peer)->second.last_backfill, + data_subset, clone_subsets); + } + + prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop); +} + +void ReplicatedBackend::prep_push(ObjectContextRef obc, + const hobject_t& soid, pg_shard_t peer, + PushOp *pop) +{ + interval_set<uint64_t> data_subset; + if (obc->obs.oi.size) + data_subset.insert(0, obc->obs.oi.size); + map<hobject_t, interval_set<uint64_t> > clone_subsets; + + prep_push(obc, soid, peer, + obc->obs.oi.version, data_subset, clone_subsets, + pop); +} + +void ReplicatedBackend::prep_push( + ObjectContextRef obc, + const hobject_t& soid, pg_shard_t peer, + eversion_t version, + interval_set<uint64_t> &data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets, + PushOp *pop) +{ + get_parent()->begin_peer_recover(peer, soid); + // take note. + PushInfo &pi = pushing[soid][peer]; + pi.obc = obc; + pi.recovery_info.size = obc->obs.oi.size; + pi.recovery_info.copy_subset = data_subset; + pi.recovery_info.clone_subset = clone_subsets; + pi.recovery_info.soid = soid; + pi.recovery_info.oi = obc->obs.oi; + pi.recovery_info.version = version; + pi.recovery_progress.first = true; + pi.recovery_progress.data_recovered_to = 0; + pi.recovery_progress.data_complete = 0; + pi.recovery_progress.omap_complete = 0; + + ObjectRecoveryProgress new_progress; + int r = build_push_op(pi.recovery_info, + pi.recovery_progress, + &new_progress, + pop, + &(pi.stat)); + assert(r == 0); + pi.recovery_progress = new_progress; +} + +int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer, + const ObjectRecoveryInfo &recovery_info, + ObjectRecoveryProgress progress) +{ + // send op + ceph_tid_t tid = get_parent()->get_tid(); + osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); + + dout(10) << "send_pull_op " << recovery_info.soid << " " + << recovery_info.version + << " first=" << progress.first + << " data " << recovery_info.copy_subset + << " from osd." << peer + << " tid " << tid << dendl; + + MOSDSubOp *subop = new MOSDSubOp( + rid, parent->whoami_shard(), + get_info().pgid, recovery_info.soid, + CEPH_OSD_FLAG_ACK, + get_osdmap()->get_epoch(), tid, + recovery_info.version); + subop->set_priority(prio); + subop->ops = vector<OSDOp>(1); + subop->ops[0].op.op = CEPH_OSD_OP_PULL; + subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk; + subop->recovery_info = recovery_info; + subop->recovery_progress = progress; + + get_parent()->send_message_osd_cluster( + peer.osd, subop, get_osdmap()->get_epoch()); + + get_parent()->get_logger()->inc(l_osd_pull); + return 0; +} + +void ReplicatedBackend::submit_push_data( + ObjectRecoveryInfo &recovery_info, + bool first, + bool complete, + const interval_set<uint64_t> &intervals_included, + bufferlist data_included, + bufferlist omap_header, + map<string, bufferlist> &attrs, + map<string, bufferlist> &omap_entries, + ObjectStore::Transaction *t) +{ + coll_t target_coll; + if (first && complete) { + target_coll = coll; + } else { + dout(10) << __func__ << ": Creating oid " + << recovery_info.soid << " in the temp collection" << dendl; + add_temp_obj(recovery_info.soid); + target_coll = get_temp_coll(t); + } + + if (first) { + get_parent()->on_local_recover_start(recovery_info.soid, t); + t->remove(get_temp_coll(t), recovery_info.soid); + t->touch(target_coll, recovery_info.soid); + t->truncate(target_coll, recovery_info.soid, recovery_info.size); + t->omap_setheader(target_coll, recovery_info.soid, omap_header); + } + uint64_t off = 0; + for (interval_set<uint64_t>::const_iterator p = intervals_included.begin(); + p != intervals_included.end(); + ++p) { + bufferlist bit; + bit.substr_of(data_included, off, p.get_len()); + t->write(target_coll, recovery_info.soid, + p.get_start(), p.get_len(), bit); + off += p.get_len(); + } + + t->omap_setkeys(target_coll, recovery_info.soid, + omap_entries); + t->setattrs(target_coll, recovery_info.soid, + attrs); + + if (complete) { + if (!first) { + dout(10) << __func__ << ": Removing oid " + << recovery_info.soid << " from the temp collection" << dendl; + clear_temp_obj(recovery_info.soid); + t->collection_move(coll, target_coll, recovery_info.soid); + } + + submit_push_complete(recovery_info, t); + } +} + +void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t) +{ + for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = + recovery_info.clone_subset.begin(); + p != recovery_info.clone_subset.end(); + ++p) { + for (interval_set<uint64_t>::const_iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + dout(15) << " clone_range " << p->first << " " + << q.get_start() << "~" << q.get_len() << dendl; + t->clone_range(coll, p->first, recovery_info.soid, + q.get_start(), q.get_len(), q.get_start()); + } + } +} + +ObjectRecoveryInfo ReplicatedBackend::recalc_subsets( + const ObjectRecoveryInfo& recovery_info, + SnapSetContext *ssc) +{ + if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) + return recovery_info; + ObjectRecoveryInfo new_info = recovery_info; + new_info.copy_subset.clear(); + new_info.clone_subset.clear(); + assert(ssc); + calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(), + get_info().last_backfill, + new_info.copy_subset, new_info.clone_subset); + return new_info; +} + +bool ReplicatedBackend::handle_pull_response( + pg_shard_t from, PushOp &pop, PullOp *response, + list<hobject_t> *to_continue, + ObjectStore::Transaction *t + ) +{ + interval_set<uint64_t> data_included = pop.data_included; + bufferlist data; + data.claim(pop.data); + dout(10) << "handle_pull_response " + << pop.recovery_info + << pop.after_progress + << " data.size() is " << data.length() + << " data_included: " << data_included + << dendl; + if (pop.version == eversion_t()) { + // replica doesn't have it! + _failed_push(from, pop.soid); + return false; + } + + hobject_t &hoid = pop.soid; + assert((data_included.empty() && data.length() == 0) || + (!data_included.empty() && data.length() > 0)); + + if (!pulling.count(hoid)) { + return false; + } + + PullInfo &pi = pulling[hoid]; + if (pi.recovery_info.size == (uint64_t(-1))) { + pi.recovery_info.size = pop.recovery_info.size; + pi.recovery_info.copy_subset.intersection_of( + pop.recovery_info.copy_subset); + } + + bool first = pi.recovery_progress.first; + if (first) { + pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset); + pi.recovery_info.oi = pi.obc->obs.oi; + pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc); + } + + + interval_set<uint64_t> usable_intervals; + bufferlist usable_data; + trim_pushed_data(pi.recovery_info.copy_subset, + data_included, + data, + &usable_intervals, + &usable_data); + data_included = usable_intervals; + data.claim(usable_data); + + + pi.recovery_progress = pop.after_progress; + + pi.stat.num_bytes_recovered += data.length(); + + dout(10) << "new recovery_info " << pi.recovery_info + << ", new progress " << pi.recovery_progress + << dendl; + + bool complete = pi.is_complete(); + + submit_push_data(pi.recovery_info, first, + complete, + data_included, data, + pop.omap_header, + pop.attrset, + pop.omap_entries, + t); + + pi.stat.num_keys_recovered += pop.omap_entries.size(); + + if (complete) { + to_continue->push_back(hoid); + pi.stat.num_objects_recovered++; + get_parent()->on_local_recover( + hoid, pi.stat, pi.recovery_info, pi.obc, t); + pull_from_peer[from].erase(hoid); + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); + return false; + } else { + response->soid = pop.soid; + response->recovery_info = pi.recovery_info; + response->recovery_progress = pi.recovery_progress; + return true; + } +} + +void ReplicatedBackend::handle_push( + pg_shard_t from, PushOp &pop, PushReplyOp *response, + ObjectStore::Transaction *t) +{ + dout(10) << "handle_push " + << pop.recovery_info + << pop.after_progress + << dendl; + bufferlist data; + data.claim(pop.data); + bool first = pop.before_progress.first; + bool complete = pop.after_progress.data_complete && + pop.after_progress.omap_complete; + + response->soid = pop.recovery_info.soid; + submit_push_data(pop.recovery_info, + first, + complete, + pop.data_included, + data, + pop.omap_header, + pop.attrset, + pop.omap_entries, + t); + + if (complete) + get_parent()->on_local_recover( + pop.recovery_info.soid, + object_stat_sum_t(), + pop.recovery_info, + ObjectContextRef(), // ok, is replica + t); +} + +void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes) +{ + for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin(); + i != pushes.end(); + ++i) { + ConnectionRef con = get_parent()->get_con_osd_cluster( + i->first.osd, + get_osdmap()->get_epoch()); + if (!con) + continue; + if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { + for (vector<PushOp>::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + dout(20) << __func__ << ": sending push (legacy) " << *j + << " to osd." << i->first << dendl; + send_push_op_legacy(prio, i->first, *j); + } + } else { + vector<PushOp>::iterator j = i->second.begin(); + while (j != i->second.end()) { + uint64_t cost = 0; + uint64_t pushes = 0; + MOSDPGPush *msg = new MOSDPGPush(); + msg->from = get_parent()->whoami_shard(); + msg->pgid = get_parent()->primary_spg_t(); + msg->map_epoch = get_osdmap()->get_epoch(); + msg->set_priority(prio); + for (; + (j != i->second.end() && + cost < cct->_conf->osd_max_push_cost && + pushes < cct->_conf->osd_max_push_objects) ; + ++j) { + dout(20) << __func__ << ": sending push " << *j + << " to osd." << i->first << dendl; + cost += j->cost(cct); + pushes += 1; + msg->pushes.push_back(*j); + } + msg->compute_cost(cct); + get_parent()->send_message_osd_cluster(msg, con); + } + } + } +} + +void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls) +{ + for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin(); + i != pulls.end(); + ++i) { + ConnectionRef con = get_parent()->get_con_osd_cluster( + i->first.osd, + get_osdmap()->get_epoch()); + if (!con) + continue; + if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { + for (vector<PullOp>::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + dout(20) << __func__ << ": sending pull (legacy) " << *j + << " to osd." << i->first << dendl; + send_pull_legacy( + prio, + i->first, + j->recovery_info, + j->recovery_progress); + } + } else { + dout(20) << __func__ << ": sending pulls " << i->second + << " to osd." << i->first << dendl; + MOSDPGPull *msg = new MOSDPGPull(); + msg->from = parent->whoami_shard(); + msg->set_priority(prio); + msg->pgid = get_parent()->primary_spg_t(); + msg->map_epoch = get_osdmap()->get_epoch(); + msg->pulls.swap(i->second); + msg->compute_cost(cct); + get_parent()->send_message_osd_cluster(msg, con); + } + } +} + +int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, + const ObjectRecoveryProgress &progress, + ObjectRecoveryProgress *out_progress, + PushOp *out_op, + object_stat_sum_t *stat) +{ + ObjectRecoveryProgress _new_progress; + if (!out_progress) + out_progress = &_new_progress; + ObjectRecoveryProgress &new_progress = *out_progress; + new_progress = progress; + + dout(7) << "send_push_op " << recovery_info.soid + << " v " << recovery_info.version + << " size " << recovery_info.size + << " recovery_info: " << recovery_info + << dendl; + + if (progress.first) { + store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header); + store->getattrs(coll, recovery_info.soid, out_op->attrset); + + // Debug + bufferlist bv = out_op->attrset[OI_ATTR]; + object_info_t oi(bv); + + if (oi.version != recovery_info.version) { + get_parent()->clog_error() << get_info().pgid << " push " + << recovery_info.soid << " v " + << recovery_info.version + << " failed because local copy is " + << oi.version << "\n"; + return -EINVAL; + } + + new_progress.first = false; + } + + uint64_t available = cct->_conf->osd_recovery_max_chunk; + if (!progress.omap_complete) { + ObjectMap::ObjectMapIterator iter = + store->get_omap_iterator(coll, + recovery_info.soid); + for (iter->lower_bound(progress.omap_recovered_to); + iter->valid(); + iter->next()) { + if (!out_op->omap_entries.empty() && + available <= (iter->key().size() + iter->value().length())) + break; + out_op->omap_entries.insert(make_pair(iter->key(), iter->value())); + + if ((iter->key().size() + iter->value().length()) <= available) + available -= (iter->key().size() + iter->value().length()); + else + available = 0; + } + if (!iter->valid()) + new_progress.omap_complete = true; + else + new_progress.omap_recovered_to = iter->key(); + } + + if (available > 0) { + if (!recovery_info.copy_subset.empty()) { + interval_set<uint64_t> copy_subset = recovery_info.copy_subset; + bufferlist bl; + int r = store->fiemap(coll, recovery_info.soid, 0, + copy_subset.range_end(), bl); + if (r >= 0) { + interval_set<uint64_t> fiemap_included; + map<uint64_t, uint64_t> m; + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + map<uint64_t, uint64_t>::iterator miter; + for (miter = m.begin(); miter != m.end(); ++miter) { + fiemap_included.insert(miter->first, miter->second); + } + + copy_subset.intersection_of(fiemap_included); + } + out_op->data_included.span_of(copy_subset, progress.data_recovered_to, + available); + if (out_op->data_included.empty()) // zero filled section, skip to end! + new_progress.data_recovered_to = recovery_info.copy_subset.range_end(); + else + new_progress.data_recovered_to = out_op->data_included.range_end(); + } + } else { + out_op->data_included.clear(); + } + + for (interval_set<uint64_t>::iterator p = out_op->data_included.begin(); + p != out_op->data_included.end(); + ++p) { + bufferlist bit; + store->read(coll, recovery_info.soid, + p.get_start(), p.get_len(), bit); + if (p.get_len() != bit.length()) { + dout(10) << " extent " << p.get_start() << "~" << p.get_len() + << " is actually " << p.get_start() << "~" << bit.length() + << dendl; + interval_set<uint64_t>::iterator save = p++; + if (bit.length() == 0) + out_op->data_included.erase(save); //Remove this empty interval + else + save.set_len(bit.length()); + // Remove any other intervals present + while (p != out_op->data_included.end()) { + interval_set<uint64_t>::iterator save = p++; + out_op->data_included.erase(save); + } + new_progress.data_complete = true; + out_op->data.claim_append(bit); + break; + } + out_op->data.claim_append(bit); + } + + if (new_progress.is_complete(recovery_info)) { + new_progress.data_complete = true; + if (stat) + stat->num_objects_recovered++; + } + + if (stat) { + stat->num_keys_recovered += out_op->omap_entries.size(); + stat->num_bytes_recovered += out_op->data.length(); + } + + get_parent()->get_logger()->inc(l_osd_push); + get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length()); + + // send + out_op->version = recovery_info.version; + out_op->soid = recovery_info.soid; + out_op->recovery_info = recovery_info; + out_op->after_progress = new_progress; + out_op->before_progress = progress; + return 0; +} + +int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop) +{ + ceph_tid_t tid = get_parent()->get_tid(); + osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); + MOSDSubOp *subop = new MOSDSubOp( + rid, parent->whoami_shard(), + spg_t(get_info().pgid.pgid, peer.shard), pop.soid, + 0, get_osdmap()->get_epoch(), + tid, pop.recovery_info.version); + subop->ops = vector<OSDOp>(1); + subop->ops[0].op.op = CEPH_OSD_OP_PUSH; + + subop->set_priority(prio); + subop->version = pop.version; + subop->ops[0].indata.claim(pop.data); + subop->data_included.swap(pop.data_included); + subop->omap_header.claim(pop.omap_header); + subop->omap_entries.swap(pop.omap_entries); + subop->attrset.swap(pop.attrset); + subop->recovery_info = pop.recovery_info; + subop->current_progress = pop.before_progress; + subop->recovery_progress = pop.after_progress; + + get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch()); + return 0; +} + +void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) +{ + op->recovery_info.version = eversion_t(); + op->version = eversion_t(); + op->soid = soid; +} + +void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) +{ + MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req()); + const hobject_t& soid = reply->get_poid(); + assert(reply->get_type() == MSG_OSD_SUBOPREPLY); + dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; + pg_shard_t peer = reply->from; + + op->mark_started(); + + PushReplyOp rop; + rop.soid = soid; + PushOp pop; + bool more = handle_push_reply(peer, rop, &pop); + if (more) + send_push_op_legacy(op->get_req()->get_priority(), peer, pop); +} + +bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply) +{ + const hobject_t &soid = op.soid; + if (pushing.count(soid) == 0) { + dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer + << ", or anybody else" + << dendl; + return false; + } else if (pushing[soid].count(peer) == 0) { + dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer + << dendl; + return false; + } else { + PushInfo *pi = &pushing[soid][peer]; + + if (!pi->recovery_progress.data_complete) { + dout(10) << " pushing more from, " + << pi->recovery_progress.data_recovered_to + << " of " << pi->recovery_info.copy_subset << dendl; + ObjectRecoveryProgress new_progress; + int r = build_push_op( + pi->recovery_info, + pi->recovery_progress, &new_progress, reply, + &(pi->stat)); + assert(r == 0); + pi->recovery_progress = new_progress; + return true; + } else { + // done! + get_parent()->on_peer_recover( + peer, soid, pi->recovery_info, + pi->stat); + + pushing[soid].erase(peer); + pi = NULL; + + + if (pushing[soid].empty()) { + get_parent()->on_global_recover(soid); + pushing.erase(soid); + } else { + dout(10) << "pushed " << soid << ", still waiting for push ack from " + << pushing[soid].size() << " others" << dendl; + } + return false; + } + } +} + +/** op_pull + * process request to pull an entire object. + * NOTE: called from opqueue. + */ +void ReplicatedBackend::sub_op_pull(OpRequestRef op) +{ + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); + assert(m->get_type() == MSG_OSD_SUBOP); + + op->mark_started(); + + const hobject_t soid = m->poid; + + dout(7) << "pull" << soid << " v " << m->version + << " from " << m->get_source() + << dendl; + + assert(!is_primary()); // we should be a replica or stray. + + PullOp pop; + pop.soid = soid; + pop.recovery_info = m->recovery_info; + pop.recovery_progress = m->recovery_progress; + + PushOp reply; + handle_pull(m->from, pop, &reply); + send_push_op_legacy( + m->get_priority(), + m->from, + reply); + + log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull); +} + +void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply) +{ + const hobject_t &soid = op.soid; + struct stat st; + int r = store->stat(coll, soid, &st); + if (r != 0) { + get_parent()->clog_error() << get_info().pgid << " " + << peer << " tried to pull " << soid + << " but got " << cpp_strerror(-r) << "\n"; + prep_push_op_blank(soid, reply); + } else { + ObjectRecoveryInfo &recovery_info = op.recovery_info; + ObjectRecoveryProgress &progress = op.recovery_progress; + if (progress.first && recovery_info.size == ((uint64_t)-1)) { + // Adjust size and copy_subset + recovery_info.size = st.st_size; + recovery_info.copy_subset.clear(); + if (st.st_size) + recovery_info.copy_subset.insert(0, st.st_size); + assert(recovery_info.clone_subset.empty()); + } + + r = build_push_op(recovery_info, progress, 0, reply); + if (r < 0) + prep_push_op_blank(soid, reply); + } +} + +/** + * trim received data to remove what we don't want + * + * @param copy_subset intervals we want + * @param data_included intervals we got + * @param data_recieved data we got + * @param intervals_usable intervals we want to keep + * @param data_usable matching data we want to keep + */ +void ReplicatedBackend::trim_pushed_data( + const interval_set<uint64_t> ©_subset, + const interval_set<uint64_t> &intervals_received, + bufferlist data_received, + interval_set<uint64_t> *intervals_usable, + bufferlist *data_usable) +{ + if (intervals_received.subset_of(copy_subset)) { + *intervals_usable = intervals_received; + *data_usable = data_received; + return; + } + + intervals_usable->intersection_of(copy_subset, + intervals_received); + + uint64_t off = 0; + for (interval_set<uint64_t>::const_iterator p = intervals_received.begin(); + p != intervals_received.end(); + ++p) { + interval_set<uint64_t> x; + x.insert(p.get_start(), p.get_len()); + x.intersection_of(copy_subset); + for (interval_set<uint64_t>::const_iterator q = x.begin(); + q != x.end(); + ++q) { + bufferlist sub; + uint64_t data_off = off + (q.get_start() - p.get_start()); + sub.substr_of(data_received, data_off, q.get_len()); + data_usable->claim_append(sub); + } + off += p.get_len(); + } +} + +/** op_push + * NOTE: called from opqueue. + */ +void ReplicatedBackend::sub_op_push(OpRequestRef op) +{ + op->mark_started(); + MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req()); + + PushOp pop; + pop.soid = m->recovery_info.soid; + pop.version = m->version; + m->claim_data(pop.data); + pop.data_included.swap(m->data_included); + pop.omap_header.swap(m->omap_header); + pop.omap_entries.swap(m->omap_entries); + pop.attrset.swap(m->attrset); + pop.recovery_info = m->recovery_info; + pop.before_progress = m->current_progress; + pop.after_progress = m->recovery_progress; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + + if (is_primary()) { + PullOp resp; + RPGHandle *h = _open_recovery_op(); + list<hobject_t> to_continue; + bool more = handle_pull_response( + m->from, pop, &resp, + &to_continue, t); + if (more) { + send_pull_legacy( + m->get_priority(), + m->from, + resp.recovery_info, + resp.recovery_progress); + } else { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + op->get_req()->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + new PG_RecoveryQueueAsync( + get_parent(), + get_parent()->bless_gencontext(c))); + } + run_recovery_op(h, op->get_req()->get_priority()); + } else { + PushReplyOp resp; + MOSDSubOpReply *reply = new MOSDSubOpReply( + m, parent->whoami_shard(), 0, + get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + reply->set_priority(m->get_priority()); + assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); + handle_push(m->from, pop, &resp, t); + t->register_on_complete(new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + } + t->register_on_applied( + new ObjectStore::C_DeleteTransaction(t)); + get_parent()->queue_transaction(t); + return; +} + +void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid) +{ + get_parent()->failed_push(from, soid); + pull_from_peer[from].erase(soid); + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); + pulling.erase(soid); +} + +int ReplicatedBackend::start_pushes( + const hobject_t &soid, + ObjectContextRef obc, + RPGHandle *h) +{ + int pushes = 0; + // who needs it? + assert(get_parent()->get_actingbackfill_shards().size() > 0); + for (set<pg_shard_t>::iterator i = + get_parent()->get_actingbackfill_shards().begin(); + i != get_parent()->get_actingbackfill_shards().end(); + ++i) { + if (*i == get_parent()->whoami_shard()) continue; + pg_shard_t peer = *i; + map<pg_shard_t, pg_missing_t>::const_iterator j = + get_parent()->get_shard_missing().find(peer); + assert(j != get_parent()->get_shard_missing().end()); + if (j->second.is_missing(soid)) { + ++pushes; + h->pushes[peer].push_back(PushOp()); + prep_push_to_replica(obc, soid, peer, + &(h->pushes[peer].back()) + ); + } + } + return pushes; +} diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 64b88eb0266..c748aa9c74e 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -90,35 +90,6 @@ PGLSFilter::~PGLSFilter() { } -static void log_subop_stats( - PerfCounters *logger, - OpRequestRef op, int subop) -{ - utime_t now = ceph_clock_now(g_ceph_context); - utime_t latency = now; - latency -= op->get_req()->get_recv_stamp(); - - - logger->inc(l_osd_sop); - logger->tinc(l_osd_sop_lat, latency); - logger->inc(subop); - - if (subop != l_osd_sop_pull) { - uint64_t inb = op->get_req()->get_data().length(); - logger->inc(l_osd_sop_inb, inb); - if (subop == l_osd_sop_w) { - logger->inc(l_osd_sop_w_inb, inb); - logger->tinc(l_osd_sop_w_lat, latency); - } else if (subop == l_osd_sop_push) { - logger->inc(l_osd_sop_push_inb, inb); - logger->tinc(l_osd_sop_push_lat, latency); - } else - assert("no support subop" == 0); - } else { - logger->tinc(l_osd_sop_pull_lat, latency); - } -} - struct OnReadComplete : public Context { ReplicatedPG *pg; ReplicatedPG::OpContext *opcontext; @@ -2605,150 +2576,6 @@ void ReplicatedPG::do_scan( } } -void ReplicatedBackend::_do_push(OpRequestRef op) -{ - MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); - assert(m->get_type() == MSG_OSD_PG_PUSH); - pg_shard_t from = m->from; - - vector<PushReplyOp> replies; - ObjectStore::Transaction *t = new ObjectStore::Transaction; - for (vector<PushOp>::iterator i = m->pushes.begin(); - i != m->pushes.end(); - ++i) { - replies.push_back(PushReplyOp()); - handle_push(from, *i, &(replies.back()), t); - } - - MOSDPGPushReply *reply = new MOSDPGPushReply; - reply->from = get_parent()->whoami_shard(); - reply->set_priority(m->get_priority()); - reply->pgid = get_info().pgid; - reply->map_epoch = m->map_epoch; - reply->replies.swap(replies); - reply->compute_cost(cct); - - t->register_on_complete( - new PG_SendMessageOnConn( - get_parent(), reply, m->get_connection())); - - t->register_on_applied( - new ObjectStore::C_DeleteTransaction(t)); - get_parent()->queue_transaction(t); -} - -struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> { - ReplicatedBackend *bc; - list<hobject_t> to_continue; - int priority; - C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) - : bc(bc), priority(priority) {} - - void finish(ThreadPool::TPHandle &handle) { - ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); - for (list<hobject_t>::iterator i = - to_continue.begin(); - i != to_continue.end(); - ++i) { - map<hobject_t, ReplicatedBackend::PullInfo>::iterator j = - bc->pulling.find(*i); - assert(j != bc->pulling.end()); - if (!bc->start_pushes(*i, j->second.obc, h)) { - bc->get_parent()->on_global_recover( - *i); - } - bc->pulling.erase(*i); - handle.reset_tp_timeout(); - } - bc->run_recovery_op(h, priority); - } -}; - -void ReplicatedBackend::_do_pull_response(OpRequestRef op) -{ - MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); - assert(m->get_type() == MSG_OSD_PG_PUSH); - pg_shard_t from = m->from; - - vector<PullOp> replies(1); - ObjectStore::Transaction *t = new ObjectStore::Transaction; - list<hobject_t> to_continue; - for (vector<PushOp>::iterator i = m->pushes.begin(); - i != m->pushes.end(); - ++i) { - bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t); - if (more) - replies.push_back(PullOp()); - } - if (!to_continue.empty()) { - C_ReplicatedBackend_OnPullComplete *c = - new C_ReplicatedBackend_OnPullComplete( - this, - m->get_priority()); - c->to_continue.swap(to_continue); - t->register_on_complete( - new PG_RecoveryQueueAsync( - get_parent(), - get_parent()->bless_gencontext(c))); - } - replies.erase(replies.end() - 1); - - if (replies.size()) { - MOSDPGPull *reply = new MOSDPGPull; - reply->from = parent->whoami_shard(); - reply->set_priority(m->get_priority()); - reply->pgid = get_info().pgid; - reply->map_epoch = m->map_epoch; - reply->pulls.swap(replies); - reply->compute_cost(cct); - - t->register_on_complete( - new PG_SendMessageOnConn( - get_parent(), reply, m->get_connection())); - } - - t->register_on_applied( - new ObjectStore::C_DeleteTransaction(t)); - get_parent()->queue_transaction(t); -} - -void ReplicatedBackend::do_pull(OpRequestRef op) -{ - MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req()); - assert(m->get_type() == MSG_OSD_PG_PULL); - pg_shard_t from = m->from; - - map<pg_shard_t, vector<PushOp> > replies; - for (vector<PullOp>::iterator i = m->pulls.begin(); - i != m->pulls.end(); - ++i) { - replies[from].push_back(PushOp()); - handle_pull(from, *i, &(replies[from].back())); - } - send_pushes(m->get_priority(), replies); -} - -void ReplicatedBackend::do_push_reply(OpRequestRef op) -{ - MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req()); - assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY); - pg_shard_t from = m->from; - - vector<PushOp> replies(1); - for (vector<PushReplyOp>::iterator i = m->replies.begin(); - i != m->replies.end(); - ++i) { - bool more = handle_push_reply(from, *i, &(replies.back())); - if (more) - replies.push_back(PushOp()); - } - replies.erase(replies.end() - 1); - - map<pg_shard_t, vector<PushOp> > _replies; - _replies[from].swap(replies); - send_pushes(m->get_priority(), _replies); -} - void ReplicatedPG::do_backfill(OpRequestRef op) { MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req()); @@ -7608,136 +7435,6 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) repop->ctx->op_t = NULL; } -template<typename T, int MSGTYPE> -Message * ReplicatedBackend::generate_subop( - const hobject_t &soid, - const eversion_t &at_version, - ceph_tid_t tid, - osd_reqid_t reqid, - eversion_t pg_trim_to, - eversion_t pg_trim_rollback_to, - hobject_t new_temp_oid, - hobject_t discard_temp_oid, - const vector<pg_log_entry_t> &log_entries, - boost::optional<pg_hit_set_history_t> &hset_hist, - InProgressOp *op, - ObjectStore::Transaction *op_t, - pg_shard_t peer, - const pg_info_t &pinfo) -{ - int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; - assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP); - // forward the write/update/whatever - T *wr = new T( - reqid, parent->whoami_shard(), - spg_t(get_info().pgid.pgid, peer.shard), - soid, acks_wanted, - get_osdmap()->get_epoch(), - tid, at_version); - - // ship resulting transaction, log entries, and pg_stats - if (!parent->should_send_op(peer, soid)) { - dout(10) << "issue_repop shipping empty opt to osd." << peer - <<", object " << soid - << " beyond MAX(last_backfill_started " - << ", pinfo.last_backfill " - << pinfo.last_backfill << ")" << dendl; - ObjectStore::Transaction t; - t.set_use_tbl(op_t->get_use_tbl()); - ::encode(t, wr->get_data()); - } else { - ::encode(*op_t, wr->get_data()); - } - - ::encode(log_entries, wr->logbl); - - if (pinfo.is_incomplete()) - wr->pg_stats = pinfo.stats; // reflects backfill progress - else - wr->pg_stats = get_info().stats; - - wr->pg_trim_to = pg_trim_to; - wr->pg_trim_rollback_to = pg_trim_rollback_to; - - wr->new_temp_oid = new_temp_oid; - wr->discard_temp_oid = discard_temp_oid; - wr->updated_hit_set_history = hset_hist; - return wr; -} - -void ReplicatedBackend::issue_op( - const hobject_t &soid, - const eversion_t &at_version, - ceph_tid_t tid, - osd_reqid_t reqid, - eversion_t pg_trim_to, - eversion_t pg_trim_rollback_to, - hobject_t new_temp_oid, - hobject_t discard_temp_oid, - const vector<pg_log_entry_t> &log_entries, - boost::optional<pg_hit_set_history_t> &hset_hist, - InProgressOp *op, - ObjectStore::Transaction *op_t) -{ - - if (parent->get_actingbackfill_shards().size() > 1) { - ostringstream ss; - set<pg_shard_t> replicas = parent->get_actingbackfill_shards(); - replicas.erase(parent->whoami_shard()); - ss << "waiting for subops from " << replicas; - if (op->op) - op->op->mark_sub_op_sent(ss.str()); - } - for (set<pg_shard_t>::const_iterator i = - parent->get_actingbackfill_shards().begin(); - i != parent->get_actingbackfill_shards().end(); - ++i) { - if (*i == parent->whoami_shard()) continue; - pg_shard_t peer = *i; - const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second; - - Message *wr; - uint64_t min_features = parent->min_peer_features(); - if (!(min_features & CEPH_FEATURE_OSD_REPOP)) { - dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl; - wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>( - soid, - at_version, - tid, - reqid, - pg_trim_to, - pg_trim_rollback_to, - new_temp_oid, - discard_temp_oid, - log_entries, - hset_hist, - op, - op_t, - peer, - pinfo); - } else { - wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>( - soid, - at_version, - tid, - reqid, - pg_trim_to, - pg_trim_rollback_to, - new_temp_oid, - discard_temp_oid, - log_entries, - hset_hist, - op, - op_t, - peer, - pinfo); - } - - get_parent()->send_message_osd_cluster( - peer.osd, wr, get_osdmap()->get_epoch()); - } -} - ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc, ceph_tid_t rep_tid) { @@ -8439,341 +8136,6 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc) } } -// sub op modify -void ReplicatedBackend::sub_op_modify(OpRequestRef op) { - Message *m = op->get_req(); - int msg_type = m->get_type(); - if (msg_type == MSG_OSD_SUBOP) { - sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op); - } else if (msg_type == MSG_OSD_REPOP) { - sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op); - } else { - assert(0); - } -} - -template<typename T, int MSGTYPE> -void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op) -{ - T *m = static_cast<T *>(op->get_req()); - int msg_type = m->get_type(); - assert(MSGTYPE == msg_type); - assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP); - - const hobject_t& soid = m->poid; - - dout(10) << "sub_op_modify trans" - << " " << soid - << " v " << m->version - << (m->logbl.length() ? " (transaction)" : " (parallel exec") - << " " << m->logbl.length() - << dendl; - - // sanity checks - assert(m->map_epoch >= get_info().history.same_interval_since); - - // we better not be missing this. - assert(!parent->get_log().get_missing().is_missing(soid)); - - int ackerosd = m->get_source().num(); - - op->mark_started(); - - RepModifyRef rm(new RepModify); - rm->op = op; - rm->ackerosd = ackerosd; - rm->last_complete = get_info().last_complete; - rm->epoch_started = get_osdmap()->get_epoch(); - - assert(m->logbl.length()); - // shipped transaction and log entries - vector<pg_log_entry_t> log; - - bufferlist::iterator p = m->get_data().begin(); - ::decode(rm->opt, p); - rm->localt.set_use_tbl(rm->opt.get_use_tbl()); - - if (m->new_temp_oid != hobject_t()) { - dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; - add_temp_obj(m->new_temp_oid); - get_temp_coll(&rm->localt); - } - if (m->discard_temp_oid != hobject_t()) { - dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; - if (rm->opt.empty()) { - dout(10) << __func__ << ": removing object " << m->discard_temp_oid - << " since we won't get the transaction" << dendl; - rm->localt.remove(temp_coll, m->discard_temp_oid); - } - clear_temp_obj(m->discard_temp_oid); - } - - p = m->logbl.begin(); - ::decode(log, p); - rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); - - bool update_snaps = false; - if (!rm->opt.empty()) { - // If the opt is non-empty, we infer we are before - // last_backfill (according to the primary, not our - // not-quite-accurate value), and should update the - // collections now. Otherwise, we do it later on push. - update_snaps = true; - } - parent->update_stats(m->pg_stats); - parent->log_operation( - log, - m->updated_hit_set_history, - m->pg_trim_to, - m->pg_trim_rollback_to, - update_snaps, - &(rm->localt)); - - rm->bytes_written = rm->opt.get_encoded_bytes(); - - op->mark_started(); - - rm->localt.append(rm->opt); - rm->localt.register_on_commit( - parent->bless_context( - new C_OSD_RepModifyCommit(this, rm))); - rm->localt.register_on_applied( - parent->bless_context( - new C_OSD_RepModifyApply(this, rm))); - parent->queue_transaction(&(rm->localt), op); - // op is cleaned up by oncommit/onapply when both are executed -} - -void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm) -{ - rm->op->mark_event("sub_op_applied"); - rm->applied = true; - - dout(10) << "sub_op_modify_applied on " << rm << " op " - << *rm->op->get_req() << dendl; - Message *m = rm->op->get_req(); - - Message *ack = NULL; - eversion_t version; - - if (m->get_type() == MSG_OSD_SUBOP) { - // doesn't have CLIENT SUBOP feature ,use Subop - MOSDSubOp *req = static_cast<MOSDSubOp*>(m); - version = req->version; - if (!rm->committed) - ack = new MOSDSubOpReply( - req, parent->whoami_shard(), - 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - } else if (m->get_type() == MSG_OSD_REPOP) { - MOSDRepOp *req = static_cast<MOSDRepOp*>(m); - version = req->version; - if (!rm->committed) - ack = new MOSDRepOpReply( - static_cast<MOSDRepOp*>(m), parent->whoami_shard(), - 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - } else { - assert(0); - } - - // send ack to acker only if we haven't sent a commit already - if (ack) { - ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! - get_parent()->send_message_osd_cluster( - rm->ackerosd, ack, get_osdmap()->get_epoch()); - } - - parent->op_applied(version); -} - -void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm) -{ - rm->op->mark_commit_sent(); - rm->committed = true; - - // send commit. - dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req() - << ", sending commit to osd." << rm->ackerosd - << dendl; - - assert(get_osdmap()->is_up(rm->ackerosd)); - get_parent()->update_last_complete_ondisk(rm->last_complete); - - Message *m = rm->op->get_req(); - Message *commit; - if (m->get_type() == MSG_OSD_SUBOP) { - // doesn't have CLIENT SUBOP feature ,use Subop - MOSDSubOpReply *reply = new MOSDSubOpReply( - static_cast<MOSDSubOp*>(m), - get_parent()->whoami_shard(), - 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); - reply->set_last_complete_ondisk(rm->last_complete); - commit = reply; - } else if (m->get_type() == MSG_OSD_REPOP) { - MOSDRepOpReply *reply = new MOSDRepOpReply( - static_cast<MOSDRepOp*>(m), - get_parent()->whoami_shard(), - 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); - reply->set_last_complete_ondisk(rm->last_complete); - commit = reply; - } - else { - assert(0); - } - - commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! - get_parent()->send_message_osd_cluster( - rm->ackerosd, commit, get_osdmap()->get_epoch()); - - log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w); -} - - -// =========================================================== - -void ReplicatedBackend::calc_head_subsets( - ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, - const pg_missing_t& missing, - const hobject_t &last_backfill, - interval_set<uint64_t>& data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets) -{ - dout(10) << "calc_head_subsets " << head - << " clone_overlap " << snapset.clone_overlap << dendl; - - uint64_t size = obc->obs.oi.size; - if (size) - data_subset.insert(0, size); - - if (get_parent()->get_pool().allow_incomplete_clones()) { - dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; - return; - } - - if (!cct->_conf->osd_recover_clone_overlap) { - dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl; - return; - } - - - interval_set<uint64_t> cloning; - interval_set<uint64_t> prev; - if (size) - prev.insert(0, size); - - for (int j=snapset.clones.size()-1; j>=0; j--) { - hobject_t c = head; - c.snap = snapset.clones[j]; - prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); - if (!missing.is_missing(c) && c < last_backfill) { - dout(10) << "calc_head_subsets " << head << " has prev " << c - << " overlap " << prev << dendl; - clone_subsets[c] = prev; - cloning.union_of(prev); - break; - } - dout(10) << "calc_head_subsets " << head << " does not have prev " << c - << " overlap " << prev << dendl; - } - - - if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { - dout(10) << "skipping clone, too many holes" << dendl; - clone_subsets.clear(); - cloning.clear(); - } - - // what's left for us to push? - data_subset.subtract(cloning); - - dout(10) << "calc_head_subsets " << head - << " data_subset " << data_subset - << " clone_subsets " << clone_subsets << dendl; -} - -void ReplicatedBackend::calc_clone_subsets( - SnapSet& snapset, const hobject_t& soid, - const pg_missing_t& missing, - const hobject_t &last_backfill, - interval_set<uint64_t>& data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets) -{ - dout(10) << "calc_clone_subsets " << soid - << " clone_overlap " << snapset.clone_overlap << dendl; - - uint64_t size = snapset.clone_size[soid.snap]; - if (size) - data_subset.insert(0, size); - - if (get_parent()->get_pool().allow_incomplete_clones()) { - dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; - return; - } - - if (!cct->_conf->osd_recover_clone_overlap) { - dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl; - return; - } - - unsigned i; - for (i=0; i < snapset.clones.size(); i++) - if (snapset.clones[i] == soid.snap) - break; - - // any overlap with next older clone? - interval_set<uint64_t> cloning; - interval_set<uint64_t> prev; - if (size) - prev.insert(0, size); - for (int j=i-1; j>=0; j--) { - hobject_t c = soid; - c.snap = snapset.clones[j]; - prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); - if (!missing.is_missing(c) && c < last_backfill) { - dout(10) << "calc_clone_subsets " << soid << " has prev " << c - << " overlap " << prev << dendl; - clone_subsets[c] = prev; - cloning.union_of(prev); - break; - } - dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c - << " overlap " << prev << dendl; - } - - // overlap with next newest? - interval_set<uint64_t> next; - if (size) - next.insert(0, size); - for (unsigned j=i+1; j<snapset.clones.size(); j++) { - hobject_t c = soid; - c.snap = snapset.clones[j]; - next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]); - if (!missing.is_missing(c) && c < last_backfill) { - dout(10) << "calc_clone_subsets " << soid << " has next " << c - << " overlap " << next << dendl; - clone_subsets[c] = next; - cloning.union_of(next); - break; - } - dout(10) << "calc_clone_subsets " << soid << " does not have next " << c - << " overlap " << next << dendl; - } - - if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { - dout(10) << "skipping clone, too many holes" << dendl; - clone_subsets.clear(); - cloning.clear(); - } - - - // what's left for us to push? - data_subset.subtract(cloning); - - dout(10) << "calc_clone_subsets " << soid - << " data_subset " << data_subset - << " clone_subsets " << clone_subsets << dendl; -} - - /** pull - request object from a peer */ @@ -8785,98 +8147,6 @@ void ReplicatedBackend::calc_clone_subsets( */ enum { PULL_NONE, PULL_OTHER, PULL_YES }; -void ReplicatedBackend::prepare_pull( - eversion_t v, - const hobject_t& soid, - ObjectContextRef headctx, - RPGHandle *h) -{ - assert(get_parent()->get_local_missing().missing.count(soid)); - eversion_t _v = get_parent()->get_local_missing().missing.find( - soid)->second.need; - assert(_v == v); - const map<hobject_t, set<pg_shard_t> > &missing_loc( - get_parent()->get_missing_loc_shards()); - const map<pg_shard_t, pg_missing_t > &peer_missing( - get_parent()->get_shard_missing()); - map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid); - assert(q != missing_loc.end()); - assert(!q->second.empty()); - - // pick a pullee - vector<pg_shard_t> shuffle(q->second.begin(), q->second.end()); - random_shuffle(shuffle.begin(), shuffle.end()); - vector<pg_shard_t>::iterator p = shuffle.begin(); - assert(get_osdmap()->is_up(p->osd)); - pg_shard_t fromshard = *p; - - dout(7) << "pull " << soid - << " v " << v - << " on osds " << *p - << " from osd." << fromshard - << dendl; - - assert(peer_missing.count(fromshard)); - const pg_missing_t &pmissing = peer_missing.find(fromshard)->second; - if (pmissing.is_missing(soid, v)) { - assert(pmissing.missing.find(soid)->second.have != v); - dout(10) << "pulling soid " << soid << " from osd " << fromshard - << " at version " << pmissing.missing.find(soid)->second.have - << " rather than at version " << v << dendl; - v = pmissing.missing.find(soid)->second.have; - assert(get_parent()->get_log().get_log().objects.count(soid) && - (get_parent()->get_log().get_log().objects.find(soid)->second->op == - pg_log_entry_t::LOST_REVERT) && - (get_parent()->get_log().get_log().objects.find( - soid)->second->reverting_to == - v)); - } - - ObjectRecoveryInfo recovery_info; - - if (soid.is_snap()) { - assert(!get_parent()->get_local_missing().is_missing( - soid.get_head()) || - !get_parent()->get_local_missing().is_missing( - soid.get_snapdir())); - assert(headctx); - // check snapset - SnapSetContext *ssc = headctx->ssc; - assert(ssc); - dout(10) << " snapset " << ssc->snapset << dendl; - calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(), - get_info().last_backfill, - recovery_info.copy_subset, - recovery_info.clone_subset); - // FIXME: this may overestimate if we are pulling multiple clones in parallel... - dout(10) << " pulling " << recovery_info << dendl; - } else { - // pulling head or unversioned object. - // always pull the whole thing. - recovery_info.copy_subset.insert(0, (uint64_t)-1); - recovery_info.size = ((uint64_t)-1); - } - - h->pulls[fromshard].push_back(PullOp()); - PullOp &op = h->pulls[fromshard].back(); - op.soid = soid; - - op.recovery_info = recovery_info; - op.recovery_info.soid = soid; - op.recovery_info.version = v; - op.recovery_progress.data_complete = false; - op.recovery_progress.omap_complete = false; - op.recovery_progress.data_recovered_to = 0; - op.recovery_progress.first = true; - - assert(!pulling.count(soid)); - pull_from_peer[fromshard].insert(soid); - PullInfo &pi = pulling[soid]; - pi.head_ctx = headctx; - pi.recovery_info = op.recovery_info; - pi.recovery_progress = op.recovery_progress; -} - int ReplicatedPG::recover_missing( const hobject_t &soid, eversion_t v, int priority, @@ -8966,694 +8236,6 @@ void ReplicatedPG::send_remove_op( osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch()); } -/* - * intelligently push an object to a replica. make use of existing - * clones/heads and dup data ranges where possible. - */ -void ReplicatedBackend::prep_push_to_replica( - ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, - PushOp *pop) -{ - const object_info_t& oi = obc->obs.oi; - uint64_t size = obc->obs.oi.size; - - dout(10) << __func__ << ": " << soid << " v" << oi.version - << " size " << size << " to osd." << peer << dendl; - - map<hobject_t, interval_set<uint64_t> > clone_subsets; - interval_set<uint64_t> data_subset; - - // are we doing a clone on the replica? - if (soid.snap && soid.snap < CEPH_NOSNAP) { - hobject_t head = soid; - head.snap = CEPH_NOSNAP; - - // try to base push off of clones that succeed/preceed poid - // we need the head (and current SnapSet) locally to do that. - if (get_parent()->get_local_missing().is_missing(head)) { - dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; - return prep_push(obc, soid, peer, pop); - } - hobject_t snapdir = head; - snapdir.snap = CEPH_SNAPDIR; - if (get_parent()->get_local_missing().is_missing(snapdir)) { - dout(15) << "push_to_replica missing snapdir " << snapdir - << ", pushing raw clone" << dendl; - return prep_push(obc, soid, peer, pop); - } - - SnapSetContext *ssc = obc->ssc; - assert(ssc); - dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; - map<pg_shard_t, pg_missing_t>::const_iterator pm = - get_parent()->get_shard_missing().find(peer); - assert(pm != get_parent()->get_shard_missing().end()); - map<pg_shard_t, pg_info_t>::const_iterator pi = - get_parent()->get_shard_info().find(peer); - assert(pi != get_parent()->get_shard_info().end()); - calc_clone_subsets(ssc->snapset, soid, - pm->second, - pi->second.last_backfill, - data_subset, clone_subsets); - } else if (soid.snap == CEPH_NOSNAP) { - // pushing head or unversioned object. - // base this on partially on replica's clones? - SnapSetContext *ssc = obc->ssc; - assert(ssc); - dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; - calc_head_subsets( - obc, - ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second, - get_parent()->get_shard_info().find(peer)->second.last_backfill, - data_subset, clone_subsets); - } - - prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop); -} - -void ReplicatedBackend::prep_push(ObjectContextRef obc, - const hobject_t& soid, pg_shard_t peer, - PushOp *pop) -{ - interval_set<uint64_t> data_subset; - if (obc->obs.oi.size) - data_subset.insert(0, obc->obs.oi.size); - map<hobject_t, interval_set<uint64_t> > clone_subsets; - - prep_push(obc, soid, peer, - obc->obs.oi.version, data_subset, clone_subsets, - pop); -} - -void ReplicatedBackend::prep_push( - ObjectContextRef obc, - const hobject_t& soid, pg_shard_t peer, - eversion_t version, - interval_set<uint64_t> &data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets, - PushOp *pop) -{ - get_parent()->begin_peer_recover(peer, soid); - // take note. - PushInfo &pi = pushing[soid][peer]; - pi.obc = obc; - pi.recovery_info.size = obc->obs.oi.size; - pi.recovery_info.copy_subset = data_subset; - pi.recovery_info.clone_subset = clone_subsets; - pi.recovery_info.soid = soid; - pi.recovery_info.oi = obc->obs.oi; - pi.recovery_info.version = version; - pi.recovery_progress.first = true; - pi.recovery_progress.data_recovered_to = 0; - pi.recovery_progress.data_complete = 0; - pi.recovery_progress.omap_complete = 0; - - ObjectRecoveryProgress new_progress; - int r = build_push_op(pi.recovery_info, - pi.recovery_progress, - &new_progress, - pop, - &(pi.stat)); - assert(r == 0); - pi.recovery_progress = new_progress; -} - -int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer, - const ObjectRecoveryInfo &recovery_info, - ObjectRecoveryProgress progress) -{ - // send op - ceph_tid_t tid = get_parent()->get_tid(); - osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); - - dout(10) << "send_pull_op " << recovery_info.soid << " " - << recovery_info.version - << " first=" << progress.first - << " data " << recovery_info.copy_subset - << " from osd." << peer - << " tid " << tid << dendl; - - MOSDSubOp *subop = new MOSDSubOp( - rid, parent->whoami_shard(), - get_info().pgid, recovery_info.soid, - CEPH_OSD_FLAG_ACK, - get_osdmap()->get_epoch(), tid, - recovery_info.version); - subop->set_priority(prio); - subop->ops = vector<OSDOp>(1); - subop->ops[0].op.op = CEPH_OSD_OP_PULL; - subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk; - subop->recovery_info = recovery_info; - subop->recovery_progress = progress; - - get_parent()->send_message_osd_cluster( - peer.osd, subop, get_osdmap()->get_epoch()); - - get_parent()->get_logger()->inc(l_osd_pull); - return 0; -} - -void ReplicatedBackend::submit_push_data( - ObjectRecoveryInfo &recovery_info, - bool first, - bool complete, - const interval_set<uint64_t> &intervals_included, - bufferlist data_included, - bufferlist omap_header, - map<string, bufferlist> &attrs, - map<string, bufferlist> &omap_entries, - ObjectStore::Transaction *t) -{ - coll_t target_coll; - if (first && complete) { - target_coll = coll; - } else { - dout(10) << __func__ << ": Creating oid " - << recovery_info.soid << " in the temp collection" << dendl; - add_temp_obj(recovery_info.soid); - target_coll = get_temp_coll(t); - } - - if (first) { - get_parent()->on_local_recover_start(recovery_info.soid, t); - t->remove(get_temp_coll(t), recovery_info.soid); - t->touch(target_coll, recovery_info.soid); - t->truncate(target_coll, recovery_info.soid, recovery_info.size); - t->omap_setheader(target_coll, recovery_info.soid, omap_header); - } - uint64_t off = 0; - for (interval_set<uint64_t>::const_iterator p = intervals_included.begin(); - p != intervals_included.end(); - ++p) { - bufferlist bit; - bit.substr_of(data_included, off, p.get_len()); - t->write(target_coll, recovery_info.soid, - p.get_start(), p.get_len(), bit); - off += p.get_len(); - } - - t->omap_setkeys(target_coll, recovery_info.soid, - omap_entries); - t->setattrs(target_coll, recovery_info.soid, - attrs); - - if (complete) { - if (!first) { - dout(10) << __func__ << ": Removing oid " - << recovery_info.soid << " from the temp collection" << dendl; - clear_temp_obj(recovery_info.soid); - t->collection_move(coll, target_coll, recovery_info.soid); - } - - submit_push_complete(recovery_info, t); - } -} - -void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info, - ObjectStore::Transaction *t) -{ - for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = - recovery_info.clone_subset.begin(); - p != recovery_info.clone_subset.end(); - ++p) { - for (interval_set<uint64_t>::const_iterator q = p->second.begin(); - q != p->second.end(); - ++q) { - dout(15) << " clone_range " << p->first << " " - << q.get_start() << "~" << q.get_len() << dendl; - t->clone_range(coll, p->first, recovery_info.soid, - q.get_start(), q.get_len(), q.get_start()); - } - } -} - -ObjectRecoveryInfo ReplicatedBackend::recalc_subsets( - const ObjectRecoveryInfo& recovery_info, - SnapSetContext *ssc) -{ - if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) - return recovery_info; - ObjectRecoveryInfo new_info = recovery_info; - new_info.copy_subset.clear(); - new_info.clone_subset.clear(); - assert(ssc); - calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(), - get_info().last_backfill, - new_info.copy_subset, new_info.clone_subset); - return new_info; -} - -bool ReplicatedBackend::handle_pull_response( - pg_shard_t from, PushOp &pop, PullOp *response, - list<hobject_t> *to_continue, - ObjectStore::Transaction *t - ) -{ - interval_set<uint64_t> data_included = pop.data_included; - bufferlist data; - data.claim(pop.data); - dout(10) << "handle_pull_response " - << pop.recovery_info - << pop.after_progress - << " data.size() is " << data.length() - << " data_included: " << data_included - << dendl; - if (pop.version == eversion_t()) { - // replica doesn't have it! - _failed_push(from, pop.soid); - return false; - } - - hobject_t &hoid = pop.soid; - assert((data_included.empty() && data.length() == 0) || - (!data_included.empty() && data.length() > 0)); - - if (!pulling.count(hoid)) { - return false; - } - - PullInfo &pi = pulling[hoid]; - if (pi.recovery_info.size == (uint64_t(-1))) { - pi.recovery_info.size = pop.recovery_info.size; - pi.recovery_info.copy_subset.intersection_of( - pop.recovery_info.copy_subset); - } - - bool first = pi.recovery_progress.first; - if (first) { - pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset); - pi.recovery_info.oi = pi.obc->obs.oi; - pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc); - } - - - interval_set<uint64_t> usable_intervals; - bufferlist usable_data; - trim_pushed_data(pi.recovery_info.copy_subset, - data_included, - data, - &usable_intervals, - &usable_data); - data_included = usable_intervals; - data.claim(usable_data); - - - pi.recovery_progress = pop.after_progress; - - pi.stat.num_bytes_recovered += data.length(); - - dout(10) << "new recovery_info " << pi.recovery_info - << ", new progress " << pi.recovery_progress - << dendl; - - bool complete = pi.is_complete(); - - submit_push_data(pi.recovery_info, first, - complete, - data_included, data, - pop.omap_header, - pop.attrset, - pop.omap_entries, - t); - - pi.stat.num_keys_recovered += pop.omap_entries.size(); - - if (complete) { - to_continue->push_back(hoid); - pi.stat.num_objects_recovered++; - get_parent()->on_local_recover( - hoid, pi.stat, pi.recovery_info, pi.obc, t); - pull_from_peer[from].erase(hoid); - if (pull_from_peer[from].empty()) - pull_from_peer.erase(from); - return false; - } else { - response->soid = pop.soid; - response->recovery_info = pi.recovery_info; - response->recovery_progress = pi.recovery_progress; - return true; - } -} - -void ReplicatedBackend::handle_push( - pg_shard_t from, PushOp &pop, PushReplyOp *response, - ObjectStore::Transaction *t) -{ - dout(10) << "handle_push " - << pop.recovery_info - << pop.after_progress - << dendl; - bufferlist data; - data.claim(pop.data); - bool first = pop.before_progress.first; - bool complete = pop.after_progress.data_complete && - pop.after_progress.omap_complete; - - response->soid = pop.recovery_info.soid; - submit_push_data(pop.recovery_info, - first, - complete, - pop.data_included, - data, - pop.omap_header, - pop.attrset, - pop.omap_entries, - t); - - if (complete) - get_parent()->on_local_recover( - pop.recovery_info.soid, - object_stat_sum_t(), - pop.recovery_info, - ObjectContextRef(), // ok, is replica - t); -} - -void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes) -{ - for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin(); - i != pushes.end(); - ++i) { - ConnectionRef con = get_parent()->get_con_osd_cluster( - i->first.osd, - get_osdmap()->get_epoch()); - if (!con) - continue; - if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { - for (vector<PushOp>::iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - dout(20) << __func__ << ": sending push (legacy) " << *j - << " to osd." << i->first << dendl; - send_push_op_legacy(prio, i->first, *j); - } - } else { - vector<PushOp>::iterator j = i->second.begin(); - while (j != i->second.end()) { - uint64_t cost = 0; - uint64_t pushes = 0; - MOSDPGPush *msg = new MOSDPGPush(); - msg->from = get_parent()->whoami_shard(); - msg->pgid = get_parent()->primary_spg_t(); - msg->map_epoch = get_osdmap()->get_epoch(); - msg->set_priority(prio); - for (; - (j != i->second.end() && - cost < cct->_conf->osd_max_push_cost && - pushes < cct->_conf->osd_max_push_objects) ; - ++j) { - dout(20) << __func__ << ": sending push " << *j - << " to osd." << i->first << dendl; - cost += j->cost(cct); - pushes += 1; - msg->pushes.push_back(*j); - } - msg->compute_cost(cct); - get_parent()->send_message_osd_cluster(msg, con); - } - } - } -} - -void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls) -{ - for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin(); - i != pulls.end(); - ++i) { - ConnectionRef con = get_parent()->get_con_osd_cluster( - i->first.osd, - get_osdmap()->get_epoch()); - if (!con) - continue; - if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { - for (vector<PullOp>::iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - dout(20) << __func__ << ": sending pull (legacy) " << *j - << " to osd." << i->first << dendl; - send_pull_legacy( - prio, - i->first, - j->recovery_info, - j->recovery_progress); - } - } else { - dout(20) << __func__ << ": sending pulls " << i->second - << " to osd." << i->first << dendl; - MOSDPGPull *msg = new MOSDPGPull(); - msg->from = parent->whoami_shard(); - msg->set_priority(prio); - msg->pgid = get_parent()->primary_spg_t(); - msg->map_epoch = get_osdmap()->get_epoch(); - msg->pulls.swap(i->second); - msg->compute_cost(cct); - get_parent()->send_message_osd_cluster(msg, con); - } - } -} - -int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, - const ObjectRecoveryProgress &progress, - ObjectRecoveryProgress *out_progress, - PushOp *out_op, - object_stat_sum_t *stat) -{ - ObjectRecoveryProgress _new_progress; - if (!out_progress) - out_progress = &_new_progress; - ObjectRecoveryProgress &new_progress = *out_progress; - new_progress = progress; - - dout(7) << "send_push_op " << recovery_info.soid - << " v " << recovery_info.version - << " size " << recovery_info.size - << " recovery_info: " << recovery_info - << dendl; - - if (progress.first) { - store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header); - store->getattrs(coll, recovery_info.soid, out_op->attrset); - - // Debug - bufferlist bv = out_op->attrset[OI_ATTR]; - object_info_t oi(bv); - - if (oi.version != recovery_info.version) { - get_parent()->clog_error() << get_info().pgid << " push " - << recovery_info.soid << " v " - << recovery_info.version - << " failed because local copy is " - << oi.version << "\n"; - return -EINVAL; - } - - new_progress.first = false; - } - - uint64_t available = cct->_conf->osd_recovery_max_chunk; - if (!progress.omap_complete) { - ObjectMap::ObjectMapIterator iter = - store->get_omap_iterator(coll, - recovery_info.soid); - for (iter->lower_bound(progress.omap_recovered_to); - iter->valid(); - iter->next()) { - if (!out_op->omap_entries.empty() && - available <= (iter->key().size() + iter->value().length())) - break; - out_op->omap_entries.insert(make_pair(iter->key(), iter->value())); - - if ((iter->key().size() + iter->value().length()) <= available) - available -= (iter->key().size() + iter->value().length()); - else - available = 0; - } - if (!iter->valid()) - new_progress.omap_complete = true; - else - new_progress.omap_recovered_to = iter->key(); - } - - if (available > 0) { - if (!recovery_info.copy_subset.empty()) { - interval_set<uint64_t> copy_subset = recovery_info.copy_subset; - bufferlist bl; - int r = store->fiemap(coll, recovery_info.soid, 0, - copy_subset.range_end(), bl); - if (r >= 0) { - interval_set<uint64_t> fiemap_included; - map<uint64_t, uint64_t> m; - bufferlist::iterator iter = bl.begin(); - ::decode(m, iter); - map<uint64_t, uint64_t>::iterator miter; - for (miter = m.begin(); miter != m.end(); ++miter) { - fiemap_included.insert(miter->first, miter->second); - } - - copy_subset.intersection_of(fiemap_included); - } - out_op->data_included.span_of(copy_subset, progress.data_recovered_to, - available); - if (out_op->data_included.empty()) // zero filled section, skip to end! - new_progress.data_recovered_to = recovery_info.copy_subset.range_end(); - else - new_progress.data_recovered_to = out_op->data_included.range_end(); - } - } else { - out_op->data_included.clear(); - } - - for (interval_set<uint64_t>::iterator p = out_op->data_included.begin(); - p != out_op->data_included.end(); - ++p) { - bufferlist bit; - store->read(coll, recovery_info.soid, - p.get_start(), p.get_len(), bit); - if (p.get_len() != bit.length()) { - dout(10) << " extent " << p.get_start() << "~" << p.get_len() - << " is actually " << p.get_start() << "~" << bit.length() - << dendl; - interval_set<uint64_t>::iterator save = p++; - if (bit.length() == 0) - out_op->data_included.erase(save); //Remove this empty interval - else - save.set_len(bit.length()); - // Remove any other intervals present - while (p != out_op->data_included.end()) { - interval_set<uint64_t>::iterator save = p++; - out_op->data_included.erase(save); - } - new_progress.data_complete = true; - out_op->data.claim_append(bit); - break; - } - out_op->data.claim_append(bit); - } - - if (new_progress.is_complete(recovery_info)) { - new_progress.data_complete = true; - if (stat) - stat->num_objects_recovered++; - } - - if (stat) { - stat->num_keys_recovered += out_op->omap_entries.size(); - stat->num_bytes_recovered += out_op->data.length(); - } - - get_parent()->get_logger()->inc(l_osd_push); - get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length()); - - // send - out_op->version = recovery_info.version; - out_op->soid = recovery_info.soid; - out_op->recovery_info = recovery_info; - out_op->after_progress = new_progress; - out_op->before_progress = progress; - return 0; -} - -int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop) -{ - ceph_tid_t tid = get_parent()->get_tid(); - osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); - MOSDSubOp *subop = new MOSDSubOp( - rid, parent->whoami_shard(), - spg_t(get_info().pgid.pgid, peer.shard), pop.soid, - 0, get_osdmap()->get_epoch(), - tid, pop.recovery_info.version); - subop->ops = vector<OSDOp>(1); - subop->ops[0].op.op = CEPH_OSD_OP_PUSH; - - subop->set_priority(prio); - subop->version = pop.version; - subop->ops[0].indata.claim(pop.data); - subop->data_included.swap(pop.data_included); - subop->omap_header.claim(pop.omap_header); - subop->omap_entries.swap(pop.omap_entries); - subop->attrset.swap(pop.attrset); - subop->recovery_info = pop.recovery_info; - subop->current_progress = pop.before_progress; - subop->recovery_progress = pop.after_progress; - - get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch()); - return 0; -} - -void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) -{ - op->recovery_info.version = eversion_t(); - op->version = eversion_t(); - op->soid = soid; -} - -void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) -{ - MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req()); - const hobject_t& soid = reply->get_poid(); - assert(reply->get_type() == MSG_OSD_SUBOPREPLY); - dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; - pg_shard_t peer = reply->from; - - op->mark_started(); - - PushReplyOp rop; - rop.soid = soid; - PushOp pop; - bool more = handle_push_reply(peer, rop, &pop); - if (more) - send_push_op_legacy(op->get_req()->get_priority(), peer, pop); -} - -bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply) -{ - const hobject_t &soid = op.soid; - if (pushing.count(soid) == 0) { - dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer - << ", or anybody else" - << dendl; - return false; - } else if (pushing[soid].count(peer) == 0) { - dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer - << dendl; - return false; - } else { - PushInfo *pi = &pushing[soid][peer]; - - if (!pi->recovery_progress.data_complete) { - dout(10) << " pushing more from, " - << pi->recovery_progress.data_recovered_to - << " of " << pi->recovery_info.copy_subset << dendl; - ObjectRecoveryProgress new_progress; - int r = build_push_op( - pi->recovery_info, - pi->recovery_progress, &new_progress, reply, - &(pi->stat)); - assert(r == 0); - pi->recovery_progress = new_progress; - return true; - } else { - // done! - get_parent()->on_peer_recover( - peer, soid, pi->recovery_info, - pi->stat); - - pushing[soid].erase(peer); - pi = NULL; - - - if (pushing[soid].empty()) { - get_parent()->on_global_recover(soid); - pushing.erase(soid); - } else { - dout(10) << "pushed " << soid << ", still waiting for push ack from " - << pushing[soid].size() << " others" << dendl; - } - return false; - } - } -} - void ReplicatedPG::finish_degraded_object(const hobject_t& oid) { dout(10) << "finish_degraded_object " << oid << dendl; @@ -9678,69 +8260,6 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid) } } -/** op_pull - * process request to pull an entire object. - * NOTE: called from opqueue. - */ -void ReplicatedBackend::sub_op_pull(OpRequestRef op) -{ - MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); - assert(m->get_type() == MSG_OSD_SUBOP); - - op->mark_started(); - - const hobject_t soid = m->poid; - - dout(7) << "pull" << soid << " v " << m->version - << " from " << m->get_source() - << dendl; - - assert(!is_primary()); // we should be a replica or stray. - - PullOp pop; - pop.soid = soid; - pop.recovery_info = m->recovery_info; - pop.recovery_progress = m->recovery_progress; - - PushOp reply; - handle_pull(m->from, pop, &reply); - send_push_op_legacy( - m->get_priority(), - m->from, - reply); - - log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull); -} - -void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply) -{ - const hobject_t &soid = op.soid; - struct stat st; - int r = store->stat(coll, soid, &st); - if (r != 0) { - get_parent()->clog_error() << get_info().pgid << " " - << peer << " tried to pull " << soid - << " but got " << cpp_strerror(-r) << "\n"; - prep_push_op_blank(soid, reply); - } else { - ObjectRecoveryInfo &recovery_info = op.recovery_info; - ObjectRecoveryProgress &progress = op.recovery_progress; - if (progress.first && recovery_info.size == ((uint64_t)-1)) { - // Adjust size and copy_subset - recovery_info.size = st.st_size; - recovery_info.copy_subset.clear(); - if (st.st_size) - recovery_info.copy_subset.insert(0, st.st_size); - assert(recovery_info.clone_subset.empty()); - } - - r = build_push_op(recovery_info, progress, 0, reply); - if (r < 0) - prep_push_op_blank(soid, reply); - } -} - - void ReplicatedPG::_committed_pushed_object( epoch_t epoch, eversion_t last_complete) { @@ -9827,113 +8346,6 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v) } -/** - * trim received data to remove what we don't want - * - * @param copy_subset intervals we want - * @param data_included intervals we got - * @param data_recieved data we got - * @param intervals_usable intervals we want to keep - * @param data_usable matching data we want to keep - */ -void ReplicatedBackend::trim_pushed_data( - const interval_set<uint64_t> ©_subset, - const interval_set<uint64_t> &intervals_received, - bufferlist data_received, - interval_set<uint64_t> *intervals_usable, - bufferlist *data_usable) -{ - if (intervals_received.subset_of(copy_subset)) { - *intervals_usable = intervals_received; - *data_usable = data_received; - return; - } - - intervals_usable->intersection_of(copy_subset, - intervals_received); - - uint64_t off = 0; - for (interval_set<uint64_t>::const_iterator p = intervals_received.begin(); - p != intervals_received.end(); - ++p) { - interval_set<uint64_t> x; - x.insert(p.get_start(), p.get_len()); - x.intersection_of(copy_subset); - for (interval_set<uint64_t>::const_iterator q = x.begin(); - q != x.end(); - ++q) { - bufferlist sub; - uint64_t data_off = off + (q.get_start() - p.get_start()); - sub.substr_of(data_received, data_off, q.get_len()); - data_usable->claim_append(sub); - } - off += p.get_len(); - } -} - -/** op_push - * NOTE: called from opqueue. - */ -void ReplicatedBackend::sub_op_push(OpRequestRef op) -{ - op->mark_started(); - MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req()); - - PushOp pop; - pop.soid = m->recovery_info.soid; - pop.version = m->version; - m->claim_data(pop.data); - pop.data_included.swap(m->data_included); - pop.omap_header.swap(m->omap_header); - pop.omap_entries.swap(m->omap_entries); - pop.attrset.swap(m->attrset); - pop.recovery_info = m->recovery_info; - pop.before_progress = m->current_progress; - pop.after_progress = m->recovery_progress; - ObjectStore::Transaction *t = new ObjectStore::Transaction; - - if (is_primary()) { - PullOp resp; - RPGHandle *h = _open_recovery_op(); - list<hobject_t> to_continue; - bool more = handle_pull_response( - m->from, pop, &resp, - &to_continue, t); - if (more) { - send_pull_legacy( - m->get_priority(), - m->from, - resp.recovery_info, - resp.recovery_progress); - } else { - C_ReplicatedBackend_OnPullComplete *c = - new C_ReplicatedBackend_OnPullComplete( - this, - op->get_req()->get_priority()); - c->to_continue.swap(to_continue); - t->register_on_complete( - new PG_RecoveryQueueAsync( - get_parent(), - get_parent()->bless_gencontext(c))); - } - run_recovery_op(h, op->get_req()->get_priority()); - } else { - PushReplyOp resp; - MOSDSubOpReply *reply = new MOSDSubOpReply( - m, parent->whoami_shard(), 0, - get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - reply->set_priority(m->get_priority()); - assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); - handle_push(m->from, pop, &resp, t); - t->register_on_complete(new PG_SendMessageOnConn( - get_parent(), reply, m->get_connection())); - } - t->register_on_applied( - new ObjectStore::C_DeleteTransaction(t)); - get_parent()->queue_transaction(t); - return; -} - void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid) { assert(recovering.count(soid)); @@ -9945,15 +8357,6 @@ void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid) finish_recovery_op(soid); // close out this attempt, } -void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid) -{ - get_parent()->failed_push(from, soid); - pull_from_peer[from].erase(soid); - if (pull_from_peer[from].empty()) - pull_from_peer.erase(from); - pulling.erase(soid); -} - void ReplicatedPG::sub_op_remove(OpRequestRef op) { MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); @@ -9968,7 +8371,6 @@ void ReplicatedPG::sub_op_remove(OpRequestRef op) assert(r == 0); } - eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid) { eversion_t v; @@ -10966,34 +9368,6 @@ int ReplicatedPG::prep_object_replica_pushes( return 1; } -int ReplicatedBackend::start_pushes( - const hobject_t &soid, - ObjectContextRef obc, - RPGHandle *h) -{ - int pushes = 0; - // who needs it? - assert(get_parent()->get_actingbackfill_shards().size() > 0); - for (set<pg_shard_t>::iterator i = - get_parent()->get_actingbackfill_shards().begin(); - i != get_parent()->get_actingbackfill_shards().end(); - ++i) { - if (*i == get_parent()->whoami_shard()) continue; - pg_shard_t peer = *i; - map<pg_shard_t, pg_missing_t>::const_iterator j = - get_parent()->get_shard_missing().find(peer); - assert(j != get_parent()->get_shard_missing().end()); - if (j->second.is_missing(soid)) { - ++pushes; - h->pushes[peer].push_back(PushOp()); - prep_push_to_replica(obc, soid, peer, - &(h->pushes[peer].back()) - ); - } - } - return pushes; -} - int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) { dout(10) << __func__ << "(" << max << ")" << dendl; |