diff options
author | Sage Weil <sage@inktank.com> | 2013-08-28 15:04:16 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-09-03 15:48:30 -0700 |
commit | ed6807919897e92144ac798b17922784f53d4608 (patch) | |
tree | 31c8f7d3d62389f9bf78d517cf80313c3f81cdb1 | |
parent | 3a8adf53143a0841b4971d68d26f26ca274e902b (diff) | |
download | ceph-ed6807919897e92144ac798b17922784f53d4608.tar.gz |
osd: initial COPY_FROM (not viable for large objects)
Initial pass at COPY_FROM implementation. This uses COPY_GET to read an
object from another OSD and write it locally. It chunks the read but
accumulates it all in-memory and commits it at once, so it is only suitable
for smaller objects.
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 219 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 48 | ||||
-rw-r--r-- | src/osd/osd_types.h | 6 | ||||
-rw-r--r-- | src/test/librados/misc.cc | 44 |
5 files changed, 305 insertions, 13 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 328f7f4b94d..2fa72d4ce0f 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -444,6 +444,7 @@ OPTION(osd_recovery_delay_start, OPT_FLOAT, 0) OPTION(osd_recovery_max_active, OPT_INT, 15) OPTION(osd_recovery_max_single_start, OPT_INT, 5) OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk +OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20) // max size of a COPYFROM chunk OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message OPTION(osd_max_push_objects, OPT_U64, 10) // max objects in single push op diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 0027edda077..2c96180b13a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1000,6 +1000,11 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) p->second->ondisk_read_unlock(); } + if (result == -EINPROGRESS) { + // come back later. + return; + } + if (result == -EAGAIN) { // clean up after the ctx delete ctx; @@ -3386,6 +3391,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) if (result < 0) break; cursor.attr_complete = true; + dout(20) << " got attrs" << dendl; } ::encode(out_attrs, osd_op.outdata); @@ -3395,15 +3401,17 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) bufferlist bl; if (left > 0 && !cursor.data_complete) { if (cursor.data_offset < oi.size) { - result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl); + result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl); if (result < 0) return result; assert(result <= left); left -= result; cursor.data_offset += result; } - if (cursor.data_offset == oi.size) + if (cursor.data_offset == oi.size) { cursor.data_complete = true; + dout(20) << " got data" << dendl; + } } ::encode(bl, osd_op.outdata); @@ -3423,15 +3431,73 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) cursor.omap_offset = iter->key(); } else { cursor.omap_complete = true; + dout(20) << " got omap" << dendl; } } ::encode(out_omap, osd_op.outdata); + dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl; ::encode(cursor, osd_op.outdata); result = 0; } break; + case CEPH_OSD_OP_COPY_FROM: + ++ctx->num_write; + { + object_t src_name; + object_locator_t src_oloc; + snapid_t src_snapid = (uint64_t)op.copy_from.snapid; + version_t src_version = op.copy_from.src_version; + try { + ::decode(src_name, bp); + ::decode(src_oloc, bp); + } + catch (buffer::error& e) { + result = -EINVAL; + goto fail; + } + pg_t raw_pg; + get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg); + hobject_t src(src_name, src_oloc.key, src_snapid, + raw_pg.ps(), raw_pg.pool(), + src_oloc.nspace); + if (!ctx->copy_op) { + // start + result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op); + if (result < 0) + goto fail; + result = -EINPROGRESS; + } else { + // finish + CopyOpRef cop = ctx->copy_op; + + if (!obs.exists) { + ctx->delta_stats.num_objects++; + obs.exists = true; + } else { + t.remove(coll, soid); + } + t.write(coll, soid, 0, cop->data.length(), cop->data); + for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p) + t.setattr(coll, soid, string("_") + p->first, p->second); + t.omap_setkeys(coll, soid, cop->omap); + + interval_set<uint64_t> ch; + if (oi.size > 0) + ch.insert(0, oi.size); + ctx->modified_ranges.union_of(ch); + + if (cop->data.length() != oi.size) { + ctx->delta_stats.num_bytes -= oi.size; + oi.size = cop->data.length(); + ctx->delta_stats.num_bytes += oi.size; + } + ctx->delta_stats.num_wr++; + ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10); + } + } + break; default: dout(1) << "unrecognized osd op " << op.op @@ -4013,6 +4079,152 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) return result; } +// ======================================================================== +// copyfrom + +struct C_Copyfrom : public Context { + ReplicatedPGRef pg; + hobject_t oid; + epoch_t last_peering_reset; + tid_t tid; + C_Copyfrom(ReplicatedPG *p, hobject_t o, epoch_t lpr) + : pg(p), oid(o), last_peering_reset(lpr), + tid(0) + {} + void finish(int r) { + pg->lock(); + if (last_peering_reset == pg->get_last_peering_reset()) { + pg->process_copy_chunk(oid, tid, r); + } + pg->unlock(); + } +}; + +int ReplicatedPG::start_copy(OpContext *ctx, + hobject_t src, object_locator_t oloc, version_t version, + CopyOpRef *pcop) +{ + const hobject_t& dest = ctx->obs->oi.soid; + dout(10) << __func__ << " " << dest << " ctx " << ctx + << " from " << src << " " << oloc << " v" << version + << dendl; + + // cancel a previous in-progress copy? + if (copy_ops.count(dest)) { + // FIXME: if the src etc match, we could avoid restarting from the + // beginning. + CopyOpRef cop = copy_ops[dest]; + cancel_copy(cop); + } + + CopyOpRef cop(new CopyOp(ctx, src, oloc, version)); + copy_ops[dest] = cop; + ctx->copy_op = cop; + ++ctx->obc->copyfrom_readside; + + _copy_some(ctx, cop); + + return 0; +} + +void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop) +{ + dout(10) << __func__ << " " << ctx << " " << cop << dendl; + ObjectOperation op; + op.assert_version(cop->version); + op.copy_get(&cop->cursor, g_conf->osd_copyfrom_max_chunk, + &cop->size, &cop->mtime, &cop->attrs, + &cop->data, &cop->omap, + &cop->rval); + + C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid, + get_last_peering_reset()); + osd->objecter_lock.Lock(); + tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op, + cop->src.snap, NULL, 0, + new C_OnFinisher(fin, + &osd->objecter_finisher), + NULL); + fin->tid = tid; + cop->objecter_tid = tid; + osd->objecter_lock.Unlock(); +} + +void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) +{ + dout(10) << __func__ << " tid " << tid << " " << cpp_strerror(r) << dendl; + map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid); + if (p == copy_ops.end()) { + dout(10) << __func__ << " no copy_op found" << dendl; + return; + } + CopyOpRef cop = p->second; + if (tid != cop->objecter_tid) { + dout(10) << __func__ << " tid " << tid << " != cop " << cop + << " tid " << cop->objecter_tid << dendl; + return; + } + OpContext *ctx = cop->ctx; + cop->objecter_tid = 0; + if (r < 0) { + copy_ops.erase(ctx->obc->obs.oi.soid); + --ctx->obc->copyfrom_readside; + reply_ctx(ctx, r); + return; + } + assert(cop->rval >= 0); + + // FIXME: this is accumulating the entire object in memory. + + if (!cop->cursor.is_complete()) { + dout(10) << __func__ << " fetching more" << dendl; + _copy_some(ctx, cop); + return; + } + + dout(20) << __func__ << " complete; committing" << dendl; + execute_ctx(ctx); + + copy_ops.erase(ctx->obc->obs.oi.soid); + --ctx->obc->copyfrom_readside; + ctx->copy_op.reset(); +} + +void ReplicatedPG::cancel_copy(CopyOpRef cop) +{ + OpContext *ctx = cop->ctx; + dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx + << " from " << cop->src << " " << cop->oloc << " v" << cop->version + << dendl; + + // cancel objecter op, if we can + if (cop->objecter_tid) { + Mutex::Locker l(osd->objecter_lock); + osd->objecter->op_cancel(cop->objecter_tid); + } + + copy_ops.erase(ctx->obc->obs.oi.soid); + --ctx->obc->copyfrom_readside; + ctx->copy_op.reset(); + + delete ctx; +} + +void ReplicatedPG::requeue_cancel_copy_ops(bool requeue) +{ + dout(10) << __func__ << dendl; + for (map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin(); + p != copy_ops.end(); + copy_ops.erase(p++)) { + // requeue initiating copy *and* any subsequent waiters + CopyOpRef cop = p->second; + if (requeue) { + cop->waiting.push_front(cop->ctx->op); + requeue_ops(cop->waiting); + } + cancel_copy(cop); + } +} // ======================================================================== @@ -6736,6 +6948,7 @@ void ReplicatedPG::on_shutdown() deleting = true; unreg_next_scrub(); + requeue_cancel_copy_ops(false); apply_and_flush_repops(false); context_registry_on_change(); @@ -6786,6 +6999,8 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) context_registry_on_change(); + requeue_cancel_copy_ops(is_primary()); + // requeue object waiters if (is_primary()) { requeue_ops(waiting_for_backfill_pos); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 5dc7d882a8b..254b5842ffc 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -83,7 +83,40 @@ public: class ReplicatedPG : public PG { friend class OSD; friend class Watch; -public: + +public: + + /* + * state associated with a copy operation + */ + struct OpContext; + + struct CopyOp { + OpContext *ctx; + hobject_t src; + object_locator_t oloc; + version_t version; + + tid_t objecter_tid; + + list<OpRequestRef> waiting; + + object_copy_cursor_t cursor; + uint64_t size; + utime_t mtime; + map<string,bufferlist> attrs; + bufferlist data; + map<string,bufferlist> omap; + int rval; + + CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v) + : ctx(c), src(s), oloc(l), version(v), + objecter_tid(0), + size(0), + rval(-1) + {} + }; + typedef boost::shared_ptr<CopyOp> CopyOpRef; /* * Capture all object state associated with an in-progress read or write. @@ -145,6 +178,8 @@ public: int num_read; ///< count read ops int num_write; ///< count update ops + CopyOpRef copy_op; + OpContext(const OpContext& other); const OpContext& operator=(const OpContext& other); @@ -749,6 +784,17 @@ protected: void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat); + // -- copyfrom -- + map<hobject_t, CopyOpRef> copy_ops; + + int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version, + CopyOpRef *pcop); + void process_copy_chunk(hobject_t oid, tid_t tid, int r); + void _copy_some(OpContext *ctx, CopyOpRef cop); + void cancel_copy(CopyOpRef cop); + void requeue_cancel_copy_ops(bool requeue=true); + + friend class C_Copyfrom; // -- scrub -- virtual void _scrub(ScrubMap& map); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 00e9409c98a..312eb81e3fd 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2130,6 +2130,9 @@ public: Cond cond; int unstable_writes, readers, writers_waiting, readers_waiting; + /// in-progress copyfrom ops for this object + int copyfrom_readside; + // set if writes for this object are blocked on another objects recovery ObjectContextRef blocked_by; // object blocking our writes set<ObjectContextRef> blocking; // objects whose writes we block @@ -2141,7 +2144,8 @@ public: : ssc(NULL), destructor_callback(0), lock("ReplicatedPG::ObjectContext::lock"), - unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {} + unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0), + copyfrom_readside(0) {} ~ObjectContext() { if (destructor_callback) diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc index 9fe6427d3f8..af17847aeab 100644 --- a/src/test/librados/misc.cc +++ b/src/test/librados/misc.cc @@ -571,18 +571,44 @@ TEST(LibRadosMisc, CopyPP) { IoCtx ioctx; ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx)); - char buf[64]; - memset(buf, 0xcc, sizeof(buf)); - bufferlist bl; - bl.append(buf, sizeof(buf)); - - ASSERT_EQ(0, ioctx.write_full("foo", bl)); + bufferlist bl, x; + bl.append("hi there"); + x.append("bar"); + // small object + bufferlist blc = bl; + bufferlist xc = x; + ASSERT_EQ(0, ioctx.write_full("foo", blc)); + ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc)); ObjectWriteOperation op; - op.copyfrom("foo", ioctx, ioctx.get_last_version()); - - ASSERT_EQ(0, ioctx.operate("bar", &op)); + op.copy_from("foo", ioctx, ioctx.get_last_version()); + ASSERT_EQ(0, ioctx.operate("foo.copy", &op)); + + bufferlist bl2, x2; + ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); + + // do a big object + bl.append(buffer::create(8000000)); + bl.zero(); + bl.append("tail"); + blc = bl; + xc = x; + ASSERT_EQ(0, ioctx.write_full("big", blc)); + ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc)); + + ObjectWriteOperation op2; + op.copy_from("big", ioctx, ioctx.get_last_version()); + ASSERT_EQ(0, ioctx.operate("big.copy", &op)); + + bl2.clear(); + ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); ioctx.close(); ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); |