Diffstat (limited to 'src/osd/ReplicatedBackend.cc')
-rw-r--r-- | src/osd/ReplicatedBackend.cc | 1623
1 file changed, 1623 insertions, 0 deletions
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 680c27a5e8b..b86d4d1e744 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -30,6 +30,35 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { return *_dout << pgb->get_parent()->gen_dbg_prefix(); } +static void log_subop_stats( + PerfCounters *logger, + OpRequestRef op, int subop) +{ + utime_t now = ceph_clock_now(g_ceph_context); + utime_t latency = now; + latency -= op->get_req()->get_recv_stamp(); + + + logger->inc(l_osd_sop); + logger->tinc(l_osd_sop_lat, latency); + logger->inc(subop); + + if (subop != l_osd_sop_pull) { + uint64_t inb = op->get_req()->get_data().length(); + logger->inc(l_osd_sop_inb, inb); + if (subop == l_osd_sop_w) { + logger->inc(l_osd_sop_w_inb, inb); + logger->tinc(l_osd_sop_w_lat, latency); + } else if (subop == l_osd_sop_push) { + logger->inc(l_osd_sop_push_inb, inb); + logger->tinc(l_osd_sop_push_lat, latency); + } else + assert("no support subop" == 0); + } else { + logger->tinc(l_osd_sop_pull_lat, latency); + } +} + ReplicatedBackend::ReplicatedBackend( PGBackend::Listener *pg, coll_t coll, @@ -797,3 +826,1597 @@ void ReplicatedBackend::be_deep_scrub( o.omap_digest = oh.digest(); o.omap_digest_present = true; } + +void ReplicatedBackend::_do_push(OpRequestRef op) +{ + MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH); + pg_shard_t from = m->from; + + vector<PushReplyOp> replies; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + for (vector<PushOp>::iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + replies.push_back(PushReplyOp()); + handle_push(from, *i, &(replies.back()), t); + } + + MOSDPGPushReply *reply = new MOSDPGPushReply; + reply->from = get_parent()->whoami_shard(); + reply->set_priority(m->get_priority()); + reply->pgid = get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->replies.swap(replies); + reply->compute_cost(cct); + + t->register_on_complete( + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + + t->register_on_applied( + new ObjectStore::C_DeleteTransaction(t)); + get_parent()->queue_transaction(t); +} + +struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> { + ReplicatedBackend *bc; + list<hobject_t> to_continue; + int priority; + C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) + : bc(bc), priority(priority) {} + + void finish(ThreadPool::TPHandle &handle) { + ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); + for (list<hobject_t>::iterator i = + to_continue.begin(); + i != to_continue.end(); + ++i) { + map<hobject_t, ReplicatedBackend::PullInfo>::iterator j = + bc->pulling.find(*i); + assert(j != bc->pulling.end()); + if (!bc->start_pushes(*i, j->second.obc, h)) { + bc->get_parent()->on_global_recover( + *i); + } + bc->pulling.erase(*i); + handle.reset_tp_timeout(); + } + bc->run_recovery_op(h, priority); + } +}; + +void ReplicatedBackend::_do_pull_response(OpRequestRef op) +{ + MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH); + pg_shard_t from = m->from; + + vector<PullOp> replies(1); + ObjectStore::Transaction *t = new ObjectStore::Transaction; + list<hobject_t> to_continue; + for (vector<PushOp>::iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t); + if (more) + replies.push_back(PullOp()); + 
} + if (!to_continue.empty()) { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + m->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + new PG_RecoveryQueueAsync( + get_parent(), + get_parent()->bless_gencontext(c))); + } + replies.erase(replies.end() - 1); + + if (replies.size()) { + MOSDPGPull *reply = new MOSDPGPull; + reply->from = parent->whoami_shard(); + reply->set_priority(m->get_priority()); + reply->pgid = get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->pulls.swap(replies); + reply->compute_cost(cct); + + t->register_on_complete( + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + } + + t->register_on_applied( + new ObjectStore::C_DeleteTransaction(t)); + get_parent()->queue_transaction(t); +} + +void ReplicatedBackend::do_pull(OpRequestRef op) +{ + MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PULL); + pg_shard_t from = m->from; + + map<pg_shard_t, vector<PushOp> > replies; + for (vector<PullOp>::iterator i = m->pulls.begin(); + i != m->pulls.end(); + ++i) { + replies[from].push_back(PushOp()); + handle_pull(from, *i, &(replies[from].back())); + } + send_pushes(m->get_priority(), replies); +} + +void ReplicatedBackend::do_push_reply(OpRequestRef op) +{ + MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY); + pg_shard_t from = m->from; + + vector<PushOp> replies(1); + for (vector<PushReplyOp>::iterator i = m->replies.begin(); + i != m->replies.end(); + ++i) { + bool more = handle_push_reply(from, *i, &(replies.back())); + if (more) + replies.push_back(PushOp()); + } + replies.erase(replies.end() - 1); + + map<pg_shard_t, vector<PushOp> > _replies; + _replies[from].swap(replies); + send_pushes(m->get_priority(), _replies); +} + +template<typename T, int MSGTYPE> +Message * ReplicatedBackend::generate_subop( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_trim_rollback_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + const vector<pg_log_entry_t> &log_entries, + boost::optional<pg_hit_set_history_t> &hset_hist, + InProgressOp *op, + ObjectStore::Transaction *op_t, + pg_shard_t peer, + const pg_info_t &pinfo) +{ + int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP); + // forward the write/update/whatever + T *wr = new T( + reqid, parent->whoami_shard(), + spg_t(get_info().pgid.pgid, peer.shard), + soid, acks_wanted, + get_osdmap()->get_epoch(), + tid, at_version); + + // ship resulting transaction, log entries, and pg_stats + if (!parent->should_send_op(peer, soid)) { + dout(10) << "issue_repop shipping empty opt to osd." 
<< peer + <<", object " << soid + << " beyond MAX(last_backfill_started " + << ", pinfo.last_backfill " + << pinfo.last_backfill << ")" << dendl; + ObjectStore::Transaction t; + t.set_use_tbl(op_t->get_use_tbl()); + ::encode(t, wr->get_data()); + } else { + ::encode(*op_t, wr->get_data()); + } + + ::encode(log_entries, wr->logbl); + + if (pinfo.is_incomplete()) + wr->pg_stats = pinfo.stats; // reflects backfill progress + else + wr->pg_stats = get_info().stats; + + wr->pg_trim_to = pg_trim_to; + wr->pg_trim_rollback_to = pg_trim_rollback_to; + + wr->new_temp_oid = new_temp_oid; + wr->discard_temp_oid = discard_temp_oid; + wr->updated_hit_set_history = hset_hist; + return wr; +} + +void ReplicatedBackend::issue_op( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_trim_rollback_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + const vector<pg_log_entry_t> &log_entries, + boost::optional<pg_hit_set_history_t> &hset_hist, + InProgressOp *op, + ObjectStore::Transaction *op_t) +{ + + if (parent->get_actingbackfill_shards().size() > 1) { + ostringstream ss; + set<pg_shard_t> replicas = parent->get_actingbackfill_shards(); + replicas.erase(parent->whoami_shard()); + ss << "waiting for subops from " << replicas; + if (op->op) + op->op->mark_sub_op_sent(ss.str()); + } + for (set<pg_shard_t>::const_iterator i = + parent->get_actingbackfill_shards().begin(); + i != parent->get_actingbackfill_shards().end(); + ++i) { + if (*i == parent->whoami_shard()) continue; + pg_shard_t peer = *i; + const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second; + + Message *wr; + uint64_t min_features = parent->min_peer_features(); + if (!(min_features & CEPH_FEATURE_OSD_REPOP)) { + dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl; + wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>( + soid, + at_version, + tid, + reqid, + pg_trim_to, + pg_trim_rollback_to, + new_temp_oid, + discard_temp_oid, + log_entries, + hset_hist, + op, + op_t, + peer, + pinfo); + } else { + wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>( + soid, + at_version, + tid, + reqid, + pg_trim_to, + pg_trim_rollback_to, + new_temp_oid, + discard_temp_oid, + log_entries, + hset_hist, + op, + op_t, + peer, + pinfo); + } + + get_parent()->send_message_osd_cluster( + peer.osd, wr, get_osdmap()->get_epoch()); + } +} + +// sub op modify +void ReplicatedBackend::sub_op_modify(OpRequestRef op) { + Message *m = op->get_req(); + int msg_type = m->get_type(); + if (msg_type == MSG_OSD_SUBOP) { + sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op); + } else if (msg_type == MSG_OSD_REPOP) { + sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op); + } else { + assert(0); + } +} + +template<typename T, int MSGTYPE> +void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op) +{ + T *m = static_cast<T *>(op->get_req()); + int msg_type = m->get_type(); + assert(MSGTYPE == msg_type); + assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP); + + const hobject_t& soid = m->poid; + + dout(10) << "sub_op_modify trans" + << " " << soid + << " v " << m->version + << (m->logbl.length() ? " (transaction)" : " (parallel exec") + << " " << m->logbl.length() + << dendl; + + // sanity checks + assert(m->map_epoch >= get_info().history.same_interval_since); + + // we better not be missing this. 
+ assert(!parent->get_log().get_missing().is_missing(soid)); + + int ackerosd = m->get_source().num(); + + op->mark_started(); + + RepModifyRef rm(new RepModify); + rm->op = op; + rm->ackerosd = ackerosd; + rm->last_complete = get_info().last_complete; + rm->epoch_started = get_osdmap()->get_epoch(); + + assert(m->logbl.length()); + // shipped transaction and log entries + vector<pg_log_entry_t> log; + + bufferlist::iterator p = m->get_data().begin(); + ::decode(rm->opt, p); + rm->localt.set_use_tbl(rm->opt.get_use_tbl()); + + if (m->new_temp_oid != hobject_t()) { + dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; + add_temp_obj(m->new_temp_oid); + get_temp_coll(&rm->localt); + } + if (m->discard_temp_oid != hobject_t()) { + dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; + if (rm->opt.empty()) { + dout(10) << __func__ << ": removing object " << m->discard_temp_oid + << " since we won't get the transaction" << dendl; + rm->localt.remove(temp_coll, m->discard_temp_oid); + } + clear_temp_obj(m->discard_temp_oid); + } + + p = m->logbl.begin(); + ::decode(log, p); + rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + + bool update_snaps = false; + if (!rm->opt.empty()) { + // If the opt is non-empty, we infer we are before + // last_backfill (according to the primary, not our + // not-quite-accurate value), and should update the + // collections now. Otherwise, we do it later on push. + update_snaps = true; + } + parent->update_stats(m->pg_stats); + parent->log_operation( + log, + m->updated_hit_set_history, + m->pg_trim_to, + m->pg_trim_rollback_to, + update_snaps, + &(rm->localt)); + + rm->bytes_written = rm->opt.get_encoded_bytes(); + + op->mark_started(); + + rm->localt.append(rm->opt); + rm->localt.register_on_commit( + parent->bless_context( + new C_OSD_RepModifyCommit(this, rm))); + rm->localt.register_on_applied( + parent->bless_context( + new C_OSD_RepModifyApply(this, rm))); + parent->queue_transaction(&(rm->localt), op); + // op is cleaned up by oncommit/onapply when both are executed +} + +void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm) +{ + rm->op->mark_event("sub_op_applied"); + rm->applied = true; + + dout(10) << "sub_op_modify_applied on " << rm << " op " + << *rm->op->get_req() << dendl; + Message *m = rm->op->get_req(); + + Message *ack = NULL; + eversion_t version; + + if (m->get_type() == MSG_OSD_SUBOP) { + // doesn't have CLIENT SUBOP feature ,use Subop + MOSDSubOp *req = static_cast<MOSDSubOp*>(m); + version = req->version; + if (!rm->committed) + ack = new MOSDSubOpReply( + req, parent->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + } else if (m->get_type() == MSG_OSD_REPOP) { + MOSDRepOp *req = static_cast<MOSDRepOp*>(m); + version = req->version; + if (!rm->committed) + ack = new MOSDRepOpReply( + static_cast<MOSDRepOp*>(m), parent->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + } else { + assert(0); + } + + // send ack to acker only if we haven't sent a commit already + if (ack) { + ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! + get_parent()->send_message_osd_cluster( + rm->ackerosd, ack, get_osdmap()->get_epoch()); + } + + parent->op_applied(version); +} + +void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm) +{ + rm->op->mark_commit_sent(); + rm->committed = true; + + // send commit. + dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req() + << ", sending commit to osd." 
<< rm->ackerosd + << dendl; + + assert(get_osdmap()->is_up(rm->ackerosd)); + get_parent()->update_last_complete_ondisk(rm->last_complete); + + Message *m = rm->op->get_req(); + Message *commit; + if (m->get_type() == MSG_OSD_SUBOP) { + // doesn't have CLIENT SUBOP feature ,use Subop + MOSDSubOpReply *reply = new MOSDSubOpReply( + static_cast<MOSDSubOp*>(m), + get_parent()->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(rm->last_complete); + commit = reply; + } else if (m->get_type() == MSG_OSD_REPOP) { + MOSDRepOpReply *reply = new MOSDRepOpReply( + static_cast<MOSDRepOp*>(m), + get_parent()->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(rm->last_complete); + commit = reply; + } + else { + assert(0); + } + + commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! + get_parent()->send_message_osd_cluster( + rm->ackerosd, commit, get_osdmap()->get_epoch()); + + log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w); +} + + +// =========================================================== + +void ReplicatedBackend::calc_head_subsets( + ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets) +{ + dout(10) << "calc_head_subsets " << head + << " clone_overlap " << snapset.clone_overlap << dendl; + + uint64_t size = obc->obs.oi.size; + if (size) + data_subset.insert(0, size); + + if (get_parent()->get_pool().allow_incomplete_clones()) { + dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; + return; + } + + if (!cct->_conf->osd_recover_clone_overlap) { + dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl; + return; + } + + + interval_set<uint64_t> cloning; + interval_set<uint64_t> prev; + if (size) + prev.insert(0, size); + + for (int j=snapset.clones.size()-1; j>=0; j--) { + hobject_t c = head; + c.snap = snapset.clones[j]; + prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); + if (!missing.is_missing(c) && c < last_backfill) { + dout(10) << "calc_head_subsets " << head << " has prev " << c + << " overlap " << prev << dendl; + clone_subsets[c] = prev; + cloning.union_of(prev); + break; + } + dout(10) << "calc_head_subsets " << head << " does not have prev " << c + << " overlap " << prev << dendl; + } + + + if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { + dout(10) << "skipping clone, too many holes" << dendl; + clone_subsets.clear(); + cloning.clear(); + } + + // what's left for us to push? 
+ data_subset.subtract(cloning); + + dout(10) << "calc_head_subsets " << head + << " data_subset " << data_subset + << " clone_subsets " << clone_subsets << dendl; +} + +void ReplicatedBackend::calc_clone_subsets( + SnapSet& snapset, const hobject_t& soid, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets) +{ + dout(10) << "calc_clone_subsets " << soid + << " clone_overlap " << snapset.clone_overlap << dendl; + + uint64_t size = snapset.clone_size[soid.snap]; + if (size) + data_subset.insert(0, size); + + if (get_parent()->get_pool().allow_incomplete_clones()) { + dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; + return; + } + + if (!cct->_conf->osd_recover_clone_overlap) { + dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl; + return; + } + + unsigned i; + for (i=0; i < snapset.clones.size(); i++) + if (snapset.clones[i] == soid.snap) + break; + + // any overlap with next older clone? + interval_set<uint64_t> cloning; + interval_set<uint64_t> prev; + if (size) + prev.insert(0, size); + for (int j=i-1; j>=0; j--) { + hobject_t c = soid; + c.snap = snapset.clones[j]; + prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); + if (!missing.is_missing(c) && c < last_backfill) { + dout(10) << "calc_clone_subsets " << soid << " has prev " << c + << " overlap " << prev << dendl; + clone_subsets[c] = prev; + cloning.union_of(prev); + break; + } + dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c + << " overlap " << prev << dendl; + } + + // overlap with next newest? + interval_set<uint64_t> next; + if (size) + next.insert(0, size); + for (unsigned j=i+1; j<snapset.clones.size(); j++) { + hobject_t c = soid; + c.snap = snapset.clones[j]; + next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]); + if (!missing.is_missing(c) && c < last_backfill) { + dout(10) << "calc_clone_subsets " << soid << " has next " << c + << " overlap " << next << dendl; + clone_subsets[c] = next; + cloning.union_of(next); + break; + } + dout(10) << "calc_clone_subsets " << soid << " does not have next " << c + << " overlap " << next << dendl; + } + + if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { + dout(10) << "skipping clone, too many holes" << dendl; + clone_subsets.clear(); + cloning.clear(); + } + + + // what's left for us to push? 
+ data_subset.subtract(cloning); + + dout(10) << "calc_clone_subsets " << soid + << " data_subset " << data_subset + << " clone_subsets " << clone_subsets << dendl; +} + +void ReplicatedBackend::prepare_pull( + eversion_t v, + const hobject_t& soid, + ObjectContextRef headctx, + RPGHandle *h) +{ + assert(get_parent()->get_local_missing().missing.count(soid)); + eversion_t _v = get_parent()->get_local_missing().missing.find( + soid)->second.need; + assert(_v == v); + const map<hobject_t, set<pg_shard_t> > &missing_loc( + get_parent()->get_missing_loc_shards()); + const map<pg_shard_t, pg_missing_t > &peer_missing( + get_parent()->get_shard_missing()); + map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid); + assert(q != missing_loc.end()); + assert(!q->second.empty()); + + // pick a pullee + vector<pg_shard_t> shuffle(q->second.begin(), q->second.end()); + random_shuffle(shuffle.begin(), shuffle.end()); + vector<pg_shard_t>::iterator p = shuffle.begin(); + assert(get_osdmap()->is_up(p->osd)); + pg_shard_t fromshard = *p; + + dout(7) << "pull " << soid + << " v " << v + << " on osds " << *p + << " from osd." << fromshard + << dendl; + + assert(peer_missing.count(fromshard)); + const pg_missing_t &pmissing = peer_missing.find(fromshard)->second; + if (pmissing.is_missing(soid, v)) { + assert(pmissing.missing.find(soid)->second.have != v); + dout(10) << "pulling soid " << soid << " from osd " << fromshard + << " at version " << pmissing.missing.find(soid)->second.have + << " rather than at version " << v << dendl; + v = pmissing.missing.find(soid)->second.have; + assert(get_parent()->get_log().get_log().objects.count(soid) && + (get_parent()->get_log().get_log().objects.find(soid)->second->op == + pg_log_entry_t::LOST_REVERT) && + (get_parent()->get_log().get_log().objects.find( + soid)->second->reverting_to == + v)); + } + + ObjectRecoveryInfo recovery_info; + + if (soid.is_snap()) { + assert(!get_parent()->get_local_missing().is_missing( + soid.get_head()) || + !get_parent()->get_local_missing().is_missing( + soid.get_snapdir())); + assert(headctx); + // check snapset + SnapSetContext *ssc = headctx->ssc; + assert(ssc); + dout(10) << " snapset " << ssc->snapset << dendl; + calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(), + get_info().last_backfill, + recovery_info.copy_subset, + recovery_info.clone_subset); + // FIXME: this may overestimate if we are pulling multiple clones in parallel... + dout(10) << " pulling " << recovery_info << dendl; + } else { + // pulling head or unversioned object. + // always pull the whole thing. + recovery_info.copy_subset.insert(0, (uint64_t)-1); + recovery_info.size = ((uint64_t)-1); + } + + h->pulls[fromshard].push_back(PullOp()); + PullOp &op = h->pulls[fromshard].back(); + op.soid = soid; + + op.recovery_info = recovery_info; + op.recovery_info.soid = soid; + op.recovery_info.version = v; + op.recovery_progress.data_complete = false; + op.recovery_progress.omap_complete = false; + op.recovery_progress.data_recovered_to = 0; + op.recovery_progress.first = true; + + assert(!pulling.count(soid)); + pull_from_peer[fromshard].insert(soid); + PullInfo &pi = pulling[soid]; + pi.head_ctx = headctx; + pi.recovery_info = op.recovery_info; + pi.recovery_progress = op.recovery_progress; +} + +/* + * intelligently push an object to a replica. make use of existing + * clones/heads and dup data ranges where possible. 
+ */ +void ReplicatedBackend::prep_push_to_replica( + ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, + PushOp *pop) +{ + const object_info_t& oi = obc->obs.oi; + uint64_t size = obc->obs.oi.size; + + dout(10) << __func__ << ": " << soid << " v" << oi.version + << " size " << size << " to osd." << peer << dendl; + + map<hobject_t, interval_set<uint64_t> > clone_subsets; + interval_set<uint64_t> data_subset; + + // are we doing a clone on the replica? + if (soid.snap && soid.snap < CEPH_NOSNAP) { + hobject_t head = soid; + head.snap = CEPH_NOSNAP; + + // try to base push off of clones that succeed/preceed poid + // we need the head (and current SnapSet) locally to do that. + if (get_parent()->get_local_missing().is_missing(head)) { + dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; + return prep_push(obc, soid, peer, pop); + } + hobject_t snapdir = head; + snapdir.snap = CEPH_SNAPDIR; + if (get_parent()->get_local_missing().is_missing(snapdir)) { + dout(15) << "push_to_replica missing snapdir " << snapdir + << ", pushing raw clone" << dendl; + return prep_push(obc, soid, peer, pop); + } + + SnapSetContext *ssc = obc->ssc; + assert(ssc); + dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; + map<pg_shard_t, pg_missing_t>::const_iterator pm = + get_parent()->get_shard_missing().find(peer); + assert(pm != get_parent()->get_shard_missing().end()); + map<pg_shard_t, pg_info_t>::const_iterator pi = + get_parent()->get_shard_info().find(peer); + assert(pi != get_parent()->get_shard_info().end()); + calc_clone_subsets(ssc->snapset, soid, + pm->second, + pi->second.last_backfill, + data_subset, clone_subsets); + } else if (soid.snap == CEPH_NOSNAP) { + // pushing head or unversioned object. + // base this on partially on replica's clones? + SnapSetContext *ssc = obc->ssc; + assert(ssc); + dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; + calc_head_subsets( + obc, + ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second, + get_parent()->get_shard_info().find(peer)->second.last_backfill, + data_subset, clone_subsets); + } + + prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop); +} + +void ReplicatedBackend::prep_push(ObjectContextRef obc, + const hobject_t& soid, pg_shard_t peer, + PushOp *pop) +{ + interval_set<uint64_t> data_subset; + if (obc->obs.oi.size) + data_subset.insert(0, obc->obs.oi.size); + map<hobject_t, interval_set<uint64_t> > clone_subsets; + + prep_push(obc, soid, peer, + obc->obs.oi.version, data_subset, clone_subsets, + pop); +} + +void ReplicatedBackend::prep_push( + ObjectContextRef obc, + const hobject_t& soid, pg_shard_t peer, + eversion_t version, + interval_set<uint64_t> &data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets, + PushOp *pop) +{ + get_parent()->begin_peer_recover(peer, soid); + // take note. 
+ PushInfo &pi = pushing[soid][peer]; + pi.obc = obc; + pi.recovery_info.size = obc->obs.oi.size; + pi.recovery_info.copy_subset = data_subset; + pi.recovery_info.clone_subset = clone_subsets; + pi.recovery_info.soid = soid; + pi.recovery_info.oi = obc->obs.oi; + pi.recovery_info.version = version; + pi.recovery_progress.first = true; + pi.recovery_progress.data_recovered_to = 0; + pi.recovery_progress.data_complete = 0; + pi.recovery_progress.omap_complete = 0; + + ObjectRecoveryProgress new_progress; + int r = build_push_op(pi.recovery_info, + pi.recovery_progress, + &new_progress, + pop, + &(pi.stat)); + assert(r == 0); + pi.recovery_progress = new_progress; +} + +int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer, + const ObjectRecoveryInfo &recovery_info, + ObjectRecoveryProgress progress) +{ + // send op + ceph_tid_t tid = get_parent()->get_tid(); + osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); + + dout(10) << "send_pull_op " << recovery_info.soid << " " + << recovery_info.version + << " first=" << progress.first + << " data " << recovery_info.copy_subset + << " from osd." << peer + << " tid " << tid << dendl; + + MOSDSubOp *subop = new MOSDSubOp( + rid, parent->whoami_shard(), + get_info().pgid, recovery_info.soid, + CEPH_OSD_FLAG_ACK, + get_osdmap()->get_epoch(), tid, + recovery_info.version); + subop->set_priority(prio); + subop->ops = vector<OSDOp>(1); + subop->ops[0].op.op = CEPH_OSD_OP_PULL; + subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk; + subop->recovery_info = recovery_info; + subop->recovery_progress = progress; + + get_parent()->send_message_osd_cluster( + peer.osd, subop, get_osdmap()->get_epoch()); + + get_parent()->get_logger()->inc(l_osd_pull); + return 0; +} + +void ReplicatedBackend::submit_push_data( + ObjectRecoveryInfo &recovery_info, + bool first, + bool complete, + const interval_set<uint64_t> &intervals_included, + bufferlist data_included, + bufferlist omap_header, + map<string, bufferlist> &attrs, + map<string, bufferlist> &omap_entries, + ObjectStore::Transaction *t) +{ + coll_t target_coll; + if (first && complete) { + target_coll = coll; + } else { + dout(10) << __func__ << ": Creating oid " + << recovery_info.soid << " in the temp collection" << dendl; + add_temp_obj(recovery_info.soid); + target_coll = get_temp_coll(t); + } + + if (first) { + get_parent()->on_local_recover_start(recovery_info.soid, t); + t->remove(get_temp_coll(t), recovery_info.soid); + t->touch(target_coll, recovery_info.soid); + t->truncate(target_coll, recovery_info.soid, recovery_info.size); + t->omap_setheader(target_coll, recovery_info.soid, omap_header); + } + uint64_t off = 0; + for (interval_set<uint64_t>::const_iterator p = intervals_included.begin(); + p != intervals_included.end(); + ++p) { + bufferlist bit; + bit.substr_of(data_included, off, p.get_len()); + t->write(target_coll, recovery_info.soid, + p.get_start(), p.get_len(), bit); + off += p.get_len(); + } + + t->omap_setkeys(target_coll, recovery_info.soid, + omap_entries); + t->setattrs(target_coll, recovery_info.soid, + attrs); + + if (complete) { + if (!first) { + dout(10) << __func__ << ": Removing oid " + << recovery_info.soid << " from the temp collection" << dendl; + clear_temp_obj(recovery_info.soid); + t->collection_move(coll, target_coll, recovery_info.soid); + } + + submit_push_complete(recovery_info, t); + } +} + +void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t) +{ + for (map<hobject_t, 
interval_set<uint64_t> >::const_iterator p = + recovery_info.clone_subset.begin(); + p != recovery_info.clone_subset.end(); + ++p) { + for (interval_set<uint64_t>::const_iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + dout(15) << " clone_range " << p->first << " " + << q.get_start() << "~" << q.get_len() << dendl; + t->clone_range(coll, p->first, recovery_info.soid, + q.get_start(), q.get_len(), q.get_start()); + } + } +} + +ObjectRecoveryInfo ReplicatedBackend::recalc_subsets( + const ObjectRecoveryInfo& recovery_info, + SnapSetContext *ssc) +{ + if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) + return recovery_info; + ObjectRecoveryInfo new_info = recovery_info; + new_info.copy_subset.clear(); + new_info.clone_subset.clear(); + assert(ssc); + calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(), + get_info().last_backfill, + new_info.copy_subset, new_info.clone_subset); + return new_info; +} + +bool ReplicatedBackend::handle_pull_response( + pg_shard_t from, PushOp &pop, PullOp *response, + list<hobject_t> *to_continue, + ObjectStore::Transaction *t + ) +{ + interval_set<uint64_t> data_included = pop.data_included; + bufferlist data; + data.claim(pop.data); + dout(10) << "handle_pull_response " + << pop.recovery_info + << pop.after_progress + << " data.size() is " << data.length() + << " data_included: " << data_included + << dendl; + if (pop.version == eversion_t()) { + // replica doesn't have it! + _failed_push(from, pop.soid); + return false; + } + + hobject_t &hoid = pop.soid; + assert((data_included.empty() && data.length() == 0) || + (!data_included.empty() && data.length() > 0)); + + if (!pulling.count(hoid)) { + return false; + } + + PullInfo &pi = pulling[hoid]; + if (pi.recovery_info.size == (uint64_t(-1))) { + pi.recovery_info.size = pop.recovery_info.size; + pi.recovery_info.copy_subset.intersection_of( + pop.recovery_info.copy_subset); + } + + bool first = pi.recovery_progress.first; + if (first) { + pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset); + pi.recovery_info.oi = pi.obc->obs.oi; + pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc); + } + + + interval_set<uint64_t> usable_intervals; + bufferlist usable_data; + trim_pushed_data(pi.recovery_info.copy_subset, + data_included, + data, + &usable_intervals, + &usable_data); + data_included = usable_intervals; + data.claim(usable_data); + + + pi.recovery_progress = pop.after_progress; + + pi.stat.num_bytes_recovered += data.length(); + + dout(10) << "new recovery_info " << pi.recovery_info + << ", new progress " << pi.recovery_progress + << dendl; + + bool complete = pi.is_complete(); + + submit_push_data(pi.recovery_info, first, + complete, + data_included, data, + pop.omap_header, + pop.attrset, + pop.omap_entries, + t); + + pi.stat.num_keys_recovered += pop.omap_entries.size(); + + if (complete) { + to_continue->push_back(hoid); + pi.stat.num_objects_recovered++; + get_parent()->on_local_recover( + hoid, pi.stat, pi.recovery_info, pi.obc, t); + pull_from_peer[from].erase(hoid); + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); + return false; + } else { + response->soid = pop.soid; + response->recovery_info = pi.recovery_info; + response->recovery_progress = pi.recovery_progress; + return true; + } +} + +void ReplicatedBackend::handle_push( + pg_shard_t from, PushOp &pop, PushReplyOp *response, + ObjectStore::Transaction *t) +{ + dout(10) << "handle_push " + << pop.recovery_info + << pop.after_progress 
+ << dendl; + bufferlist data; + data.claim(pop.data); + bool first = pop.before_progress.first; + bool complete = pop.after_progress.data_complete && + pop.after_progress.omap_complete; + + response->soid = pop.recovery_info.soid; + submit_push_data(pop.recovery_info, + first, + complete, + pop.data_included, + data, + pop.omap_header, + pop.attrset, + pop.omap_entries, + t); + + if (complete) + get_parent()->on_local_recover( + pop.recovery_info.soid, + object_stat_sum_t(), + pop.recovery_info, + ObjectContextRef(), // ok, is replica + t); +} + +void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes) +{ + for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin(); + i != pushes.end(); + ++i) { + ConnectionRef con = get_parent()->get_con_osd_cluster( + i->first.osd, + get_osdmap()->get_epoch()); + if (!con) + continue; + if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { + for (vector<PushOp>::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + dout(20) << __func__ << ": sending push (legacy) " << *j + << " to osd." << i->first << dendl; + send_push_op_legacy(prio, i->first, *j); + } + } else { + vector<PushOp>::iterator j = i->second.begin(); + while (j != i->second.end()) { + uint64_t cost = 0; + uint64_t pushes = 0; + MOSDPGPush *msg = new MOSDPGPush(); + msg->from = get_parent()->whoami_shard(); + msg->pgid = get_parent()->primary_spg_t(); + msg->map_epoch = get_osdmap()->get_epoch(); + msg->set_priority(prio); + for (; + (j != i->second.end() && + cost < cct->_conf->osd_max_push_cost && + pushes < cct->_conf->osd_max_push_objects) ; + ++j) { + dout(20) << __func__ << ": sending push " << *j + << " to osd." << i->first << dendl; + cost += j->cost(cct); + pushes += 1; + msg->pushes.push_back(*j); + } + msg->compute_cost(cct); + get_parent()->send_message_osd_cluster(msg, con); + } + } + } +} + +void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls) +{ + for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin(); + i != pulls.end(); + ++i) { + ConnectionRef con = get_parent()->get_con_osd_cluster( + i->first.osd, + get_osdmap()->get_epoch()); + if (!con) + continue; + if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) { + for (vector<PullOp>::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + dout(20) << __func__ << ": sending pull (legacy) " << *j + << " to osd." << i->first << dendl; + send_pull_legacy( + prio, + i->first, + j->recovery_info, + j->recovery_progress); + } + } else { + dout(20) << __func__ << ": sending pulls " << i->second + << " to osd." 
<< i->first << dendl; + MOSDPGPull *msg = new MOSDPGPull(); + msg->from = parent->whoami_shard(); + msg->set_priority(prio); + msg->pgid = get_parent()->primary_spg_t(); + msg->map_epoch = get_osdmap()->get_epoch(); + msg->pulls.swap(i->second); + msg->compute_cost(cct); + get_parent()->send_message_osd_cluster(msg, con); + } + } +} + +int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, + const ObjectRecoveryProgress &progress, + ObjectRecoveryProgress *out_progress, + PushOp *out_op, + object_stat_sum_t *stat) +{ + ObjectRecoveryProgress _new_progress; + if (!out_progress) + out_progress = &_new_progress; + ObjectRecoveryProgress &new_progress = *out_progress; + new_progress = progress; + + dout(7) << "send_push_op " << recovery_info.soid + << " v " << recovery_info.version + << " size " << recovery_info.size + << " recovery_info: " << recovery_info + << dendl; + + if (progress.first) { + store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header); + store->getattrs(coll, recovery_info.soid, out_op->attrset); + + // Debug + bufferlist bv = out_op->attrset[OI_ATTR]; + object_info_t oi(bv); + + if (oi.version != recovery_info.version) { + get_parent()->clog_error() << get_info().pgid << " push " + << recovery_info.soid << " v " + << recovery_info.version + << " failed because local copy is " + << oi.version << "\n"; + return -EINVAL; + } + + new_progress.first = false; + } + + uint64_t available = cct->_conf->osd_recovery_max_chunk; + if (!progress.omap_complete) { + ObjectMap::ObjectMapIterator iter = + store->get_omap_iterator(coll, + recovery_info.soid); + for (iter->lower_bound(progress.omap_recovered_to); + iter->valid(); + iter->next()) { + if (!out_op->omap_entries.empty() && + available <= (iter->key().size() + iter->value().length())) + break; + out_op->omap_entries.insert(make_pair(iter->key(), iter->value())); + + if ((iter->key().size() + iter->value().length()) <= available) + available -= (iter->key().size() + iter->value().length()); + else + available = 0; + } + if (!iter->valid()) + new_progress.omap_complete = true; + else + new_progress.omap_recovered_to = iter->key(); + } + + if (available > 0) { + if (!recovery_info.copy_subset.empty()) { + interval_set<uint64_t> copy_subset = recovery_info.copy_subset; + bufferlist bl; + int r = store->fiemap(coll, recovery_info.soid, 0, + copy_subset.range_end(), bl); + if (r >= 0) { + interval_set<uint64_t> fiemap_included; + map<uint64_t, uint64_t> m; + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + map<uint64_t, uint64_t>::iterator miter; + for (miter = m.begin(); miter != m.end(); ++miter) { + fiemap_included.insert(miter->first, miter->second); + } + + copy_subset.intersection_of(fiemap_included); + } + out_op->data_included.span_of(copy_subset, progress.data_recovered_to, + available); + if (out_op->data_included.empty()) // zero filled section, skip to end! 
+ new_progress.data_recovered_to = recovery_info.copy_subset.range_end(); + else + new_progress.data_recovered_to = out_op->data_included.range_end(); + } + } else { + out_op->data_included.clear(); + } + + for (interval_set<uint64_t>::iterator p = out_op->data_included.begin(); + p != out_op->data_included.end(); + ++p) { + bufferlist bit; + store->read(coll, recovery_info.soid, + p.get_start(), p.get_len(), bit); + if (p.get_len() != bit.length()) { + dout(10) << " extent " << p.get_start() << "~" << p.get_len() + << " is actually " << p.get_start() << "~" << bit.length() + << dendl; + interval_set<uint64_t>::iterator save = p++; + if (bit.length() == 0) + out_op->data_included.erase(save); //Remove this empty interval + else + save.set_len(bit.length()); + // Remove any other intervals present + while (p != out_op->data_included.end()) { + interval_set<uint64_t>::iterator save = p++; + out_op->data_included.erase(save); + } + new_progress.data_complete = true; + out_op->data.claim_append(bit); + break; + } + out_op->data.claim_append(bit); + } + + if (new_progress.is_complete(recovery_info)) { + new_progress.data_complete = true; + if (stat) + stat->num_objects_recovered++; + } + + if (stat) { + stat->num_keys_recovered += out_op->omap_entries.size(); + stat->num_bytes_recovered += out_op->data.length(); + } + + get_parent()->get_logger()->inc(l_osd_push); + get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length()); + + // send + out_op->version = recovery_info.version; + out_op->soid = recovery_info.soid; + out_op->recovery_info = recovery_info; + out_op->after_progress = new_progress; + out_op->before_progress = progress; + return 0; +} + +int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop) +{ + ceph_tid_t tid = get_parent()->get_tid(); + osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); + MOSDSubOp *subop = new MOSDSubOp( + rid, parent->whoami_shard(), + spg_t(get_info().pgid.pgid, peer.shard), pop.soid, + 0, get_osdmap()->get_epoch(), + tid, pop.recovery_info.version); + subop->ops = vector<OSDOp>(1); + subop->ops[0].op.op = CEPH_OSD_OP_PUSH; + + subop->set_priority(prio); + subop->version = pop.version; + subop->ops[0].indata.claim(pop.data); + subop->data_included.swap(pop.data_included); + subop->omap_header.claim(pop.omap_header); + subop->omap_entries.swap(pop.omap_entries); + subop->attrset.swap(pop.attrset); + subop->recovery_info = pop.recovery_info; + subop->current_progress = pop.before_progress; + subop->recovery_progress = pop.after_progress; + + get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch()); + return 0; +} + +void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) +{ + op->recovery_info.version = eversion_t(); + op->version = eversion_t(); + op->soid = soid; +} + +void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) +{ + MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req()); + const hobject_t& soid = reply->get_poid(); + assert(reply->get_type() == MSG_OSD_SUBOPREPLY); + dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; + pg_shard_t peer = reply->from; + + op->mark_started(); + + PushReplyOp rop; + rop.soid = soid; + PushOp pop; + bool more = handle_push_reply(peer, rop, &pop); + if (more) + send_push_op_legacy(op->get_req()->get_priority(), peer, pop); +} + +bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply) +{ + const hobject_t &soid = op.soid; + if 
(pushing.count(soid) == 0) { + dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer + << ", or anybody else" + << dendl; + return false; + } else if (pushing[soid].count(peer) == 0) { + dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer + << dendl; + return false; + } else { + PushInfo *pi = &pushing[soid][peer]; + + if (!pi->recovery_progress.data_complete) { + dout(10) << " pushing more from, " + << pi->recovery_progress.data_recovered_to + << " of " << pi->recovery_info.copy_subset << dendl; + ObjectRecoveryProgress new_progress; + int r = build_push_op( + pi->recovery_info, + pi->recovery_progress, &new_progress, reply, + &(pi->stat)); + assert(r == 0); + pi->recovery_progress = new_progress; + return true; + } else { + // done! + get_parent()->on_peer_recover( + peer, soid, pi->recovery_info, + pi->stat); + + pushing[soid].erase(peer); + pi = NULL; + + + if (pushing[soid].empty()) { + get_parent()->on_global_recover(soid); + pushing.erase(soid); + } else { + dout(10) << "pushed " << soid << ", still waiting for push ack from " + << pushing[soid].size() << " others" << dendl; + } + return false; + } + } +} + +/** op_pull + * process request to pull an entire object. + * NOTE: called from opqueue. + */ +void ReplicatedBackend::sub_op_pull(OpRequestRef op) +{ + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); + assert(m->get_type() == MSG_OSD_SUBOP); + + op->mark_started(); + + const hobject_t soid = m->poid; + + dout(7) << "pull" << soid << " v " << m->version + << " from " << m->get_source() + << dendl; + + assert(!is_primary()); // we should be a replica or stray. + + PullOp pop; + pop.soid = soid; + pop.recovery_info = m->recovery_info; + pop.recovery_progress = m->recovery_progress; + + PushOp reply; + handle_pull(m->from, pop, &reply); + send_push_op_legacy( + m->get_priority(), + m->from, + reply); + + log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull); +} + +void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply) +{ + const hobject_t &soid = op.soid; + struct stat st; + int r = store->stat(coll, soid, &st); + if (r != 0) { + get_parent()->clog_error() << get_info().pgid << " " + << peer << " tried to pull " << soid + << " but got " << cpp_strerror(-r) << "\n"; + prep_push_op_blank(soid, reply); + } else { + ObjectRecoveryInfo &recovery_info = op.recovery_info; + ObjectRecoveryProgress &progress = op.recovery_progress; + if (progress.first && recovery_info.size == ((uint64_t)-1)) { + // Adjust size and copy_subset + recovery_info.size = st.st_size; + recovery_info.copy_subset.clear(); + if (st.st_size) + recovery_info.copy_subset.insert(0, st.st_size); + assert(recovery_info.clone_subset.empty()); + } + + r = build_push_op(recovery_info, progress, 0, reply); + if (r < 0) + prep_push_op_blank(soid, reply); + } +} + +/** + * trim received data to remove what we don't want + * + * @param copy_subset intervals we want + * @param data_included intervals we got + * @param data_recieved data we got + * @param intervals_usable intervals we want to keep + * @param data_usable matching data we want to keep + */ +void ReplicatedBackend::trim_pushed_data( + const interval_set<uint64_t> ©_subset, + const interval_set<uint64_t> &intervals_received, + bufferlist data_received, + interval_set<uint64_t> *intervals_usable, + bufferlist *data_usable) +{ + if (intervals_received.subset_of(copy_subset)) { + *intervals_usable = intervals_received; + *data_usable = data_received; + return; + } + + 
intervals_usable->intersection_of(copy_subset, + intervals_received); + + uint64_t off = 0; + for (interval_set<uint64_t>::const_iterator p = intervals_received.begin(); + p != intervals_received.end(); + ++p) { + interval_set<uint64_t> x; + x.insert(p.get_start(), p.get_len()); + x.intersection_of(copy_subset); + for (interval_set<uint64_t>::const_iterator q = x.begin(); + q != x.end(); + ++q) { + bufferlist sub; + uint64_t data_off = off + (q.get_start() - p.get_start()); + sub.substr_of(data_received, data_off, q.get_len()); + data_usable->claim_append(sub); + } + off += p.get_len(); + } +} + +/** op_push + * NOTE: called from opqueue. + */ +void ReplicatedBackend::sub_op_push(OpRequestRef op) +{ + op->mark_started(); + MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req()); + + PushOp pop; + pop.soid = m->recovery_info.soid; + pop.version = m->version; + m->claim_data(pop.data); + pop.data_included.swap(m->data_included); + pop.omap_header.swap(m->omap_header); + pop.omap_entries.swap(m->omap_entries); + pop.attrset.swap(m->attrset); + pop.recovery_info = m->recovery_info; + pop.before_progress = m->current_progress; + pop.after_progress = m->recovery_progress; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + + if (is_primary()) { + PullOp resp; + RPGHandle *h = _open_recovery_op(); + list<hobject_t> to_continue; + bool more = handle_pull_response( + m->from, pop, &resp, + &to_continue, t); + if (more) { + send_pull_legacy( + m->get_priority(), + m->from, + resp.recovery_info, + resp.recovery_progress); + } else { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + op->get_req()->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + new PG_RecoveryQueueAsync( + get_parent(), + get_parent()->bless_gencontext(c))); + } + run_recovery_op(h, op->get_req()->get_priority()); + } else { + PushReplyOp resp; + MOSDSubOpReply *reply = new MOSDSubOpReply( + m, parent->whoami_shard(), 0, + get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + reply->set_priority(m->get_priority()); + assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); + handle_push(m->from, pop, &resp, t); + t->register_on_complete(new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + } + t->register_on_applied( + new ObjectStore::C_DeleteTransaction(t)); + get_parent()->queue_transaction(t); + return; +} + +void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid) +{ + get_parent()->failed_push(from, soid); + pull_from_peer[from].erase(soid); + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); + pulling.erase(soid); +} + +int ReplicatedBackend::start_pushes( + const hobject_t &soid, + ObjectContextRef obc, + RPGHandle *h) +{ + int pushes = 0; + // who needs it? + assert(get_parent()->get_actingbackfill_shards().size() > 0); + for (set<pg_shard_t>::iterator i = + get_parent()->get_actingbackfill_shards().begin(); + i != get_parent()->get_actingbackfill_shards().end(); + ++i) { + if (*i == get_parent()->whoami_shard()) continue; + pg_shard_t peer = *i; + map<pg_shard_t, pg_missing_t>::const_iterator j = + get_parent()->get_shard_missing().find(peer); + assert(j != get_parent()->get_shard_missing().end()); + if (j->second.is_missing(soid)) { + ++pushes; + h->pushes[peer].push_back(PushOp()); + prep_push_to_replica(obc, soid, peer, + &(h->pushes[peer].back()) + ); + } + } + return pushes; +} |
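
The patch above adds the replicated push/pull recovery path; two of its recurring patterns are worth illustrating in isolation. The first is the trimming done in `trim_pushed_data`: a pull response may carry extents outside the `copy_subset` we actually want, so the usable intervals are the intersection of received and wanted, and the data buffer is sliced accordingly. The sketch below is not the Ceph implementation; it substitutes `std::map<offset, length>` for `interval_set<uint64_t>` and `std::string` for `bufferlist`, and the names `IntervalMap` and `trim_received` are invented for illustration.

```cpp
// Minimal sketch (not Ceph code) of the trim_pushed_data idea.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

using IntervalMap = std::map<uint64_t, uint64_t>;  // offset -> length, non-overlapping

// Keep only the parts of `received` (intervals plus concatenated data) that
// fall inside `wanted`, discarding bytes outside the wanted copy subset.
static void trim_received(const IntervalMap& wanted,
                          const IntervalMap& received,
                          const std::string& data_received,
                          IntervalMap* usable,
                          std::string* data_usable) {
  uint64_t off = 0;  // running offset of the current extent inside data_received
  for (const auto& [rstart, rlen] : received) {
    for (const auto& [wstart, wlen] : wanted) {
      uint64_t lo = std::max(rstart, wstart);
      uint64_t hi = std::min(rstart + rlen, wstart + wlen);
      if (lo < hi) {
        (*usable)[lo] = hi - lo;
        // slice out the matching bytes of this received extent
        data_usable->append(data_received, off + (lo - rstart), hi - lo);
      }
    }
    off += rlen;
  }
}

int main() {
  IntervalMap wanted{{0, 8}};            // we only want bytes [0,8)
  IntervalMap received{{0, 4}, {8, 4}};  // the peer sent [0,4) and [8,12)
  std::string data = "AAAABBBB";         // 4 bytes per extent, concatenated
  IntervalMap usable;
  std::string out;
  trim_received(wanted, received, data, &usable, &out);
  std::cout << out << "\n";              // prints "AAAA"; the [8,12) extent is dropped
}
```

The second pattern is the chunk-budget loop in `build_push_op`: omap entries are packed into a push op until the recovery chunk budget is exhausted, at least one entry is always taken so progress is guaranteed, and the next resume key is recorded when the omap is not yet complete. This is a hedged sketch, not the real code: `budget` stands in for `cct->_conf->osd_recovery_max_chunk`, an ordinary `std::map` stands in for the object store's omap iterator, and `Chunk`/`build_omap_chunk` are hypothetical names.

```cpp
// Minimal sketch (not Ceph code) of the "fill until the chunk budget is spent"
// pattern used for omap entries in build_push_op.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct Chunk {
  std::map<std::string, std::string> omap_entries;
  bool omap_complete = false;
  std::string omap_recovered_to;  // resume key for the next chunk
};

Chunk build_omap_chunk(const std::map<std::string, std::string>& omap,
                       const std::string& resume_from,
                       uint64_t budget) {
  Chunk out;
  auto it = omap.lower_bound(resume_from);
  for (; it != omap.end(); ++it) {
    uint64_t sz = it->first.size() + it->second.size();
    // always take at least one entry so progress is made, then stop when full
    if (!out.omap_entries.empty() && budget <= sz)
      break;
    out.omap_entries.insert(*it);
    budget = (sz <= budget) ? budget - sz : 0;
  }
  if (it == omap.end())
    out.omap_complete = true;
  else
    out.omap_recovered_to = it->first;
  return out;
}

int main() {
  std::map<std::string, std::string> omap{
      {"a", "xxxx"}, {"b", "yyyy"}, {"c", "zz"}};
  std::string resume;
  bool done = false;
  while (!done) {                                 // drain the omap in budgeted chunks
    Chunk c = build_omap_chunk(omap, resume, 8);
    std::cout << "chunk of " << c.omap_entries.size() << " entries\n";
    done = c.omap_complete;
    resume = c.omap_recovered_to;
  }
}
```

In the actual backend the same budgeting also covers the data payload (via `data_included.span_of` and the fiemap intersection), but the omap loop shows the essential resume-and-budget shape.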