diff options
Diffstat (limited to 'src/osd/ReplicatedPG.h')
-rw-r--r-- | src/osd/ReplicatedPG.h | 113 |
1 files changed, 101 insertions, 12 deletions
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 05edcef6adf..c277c0d3f86 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -18,6 +18,7 @@ #define CEPH_REPLICATEDPG_H #include <boost/optional.hpp> +#include <boost/tuple/tuple.hpp> #include "include/assert.h" #include "common/cmdparse.h" @@ -93,9 +94,11 @@ public: * state associated with a copy operation */ struct OpContext; + class CopyCallback; struct CopyOp { - OpContext *ctx; + CopyCallback *cb; + ObjectContextRef obc; hobject_t src; object_locator_t oloc; version_t version; @@ -114,16 +117,86 @@ public: hobject_t temp_oid; object_copy_cursor_t temp_cursor; - CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v) - : ctx(c), src(s), oloc(l), version(v), + CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s, object_locator_t l, + version_t v, const hobject_t& dest) + : cb(cb_), obc(_obc), src(s), oloc(l), version(v), objecter_tid(0), size(0), - rval(-1) + rval(-1), + temp_oid(dest) {} }; typedef boost::shared_ptr<CopyOp> CopyOpRef; + /** + * The CopyCallback class defines an interface for completions to the + * copy_start code. Users of the copy infrastructure must implement + * one and give an instance of the class to start_copy. + * + * The implementer is responsible for making sure that the CopyCallback + * can associate itself with the correct copy operation. The presence + * of the closing Transaction ensures that write operations can be performed + * atomically with the copy being completed (which doing them in separate + * transactions would not allow); if you are doing the copy for a read + * op you will have to generate a separate op to finish the copy with. + */ + /// return code, total object size, data in temp object?, final Transaction + typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction> CopyResults; + class CopyCallback : public GenContext<CopyResults&> { + protected: + CopyCallback() {} + /** + * results.get<0>() is the return code: 0 for success; -ECANCELLED if + * the operation was cancelled by the local OSD; -errno for other issues. + * results.get<1>() is the total size of the object (for updating pg stats) + * results.get<2>() indicates whether we have already written data to + * the temp object (so it needs to get cleaned up, if the return code + * indicates a failure) + * results.get<3>() is a Transaction; if non-empty you need to perform + * its results before any other accesses to the object in order to + * complete the copy. + */ + virtual void finish(CopyResults& results_) = 0; + + public: + /// Provide the final size of the copied object to the CopyCallback + virtual ~CopyCallback() {}; + }; + + class CopyFromCallback: public CopyCallback { + public: + CopyResults results; + OpContext *ctx; + hobject_t temp_obj; + CopyFromCallback(OpContext *ctx_, const hobject_t& temp_obj_) : + ctx(ctx_), temp_obj(temp_obj_) {} + ~CopyFromCallback() {} + + virtual void finish(CopyResults& results_) { + results = results_; + int r = results.get<0>(); + if (r >= 0) { + ctx->pg->execute_ctx(ctx); + } + ctx->copy_cb = NULL; + if (r < 0) { + if (r != -ECANCELED) { // on cancel just toss it out; client resends + ctx->pg->osd->reply_op_error(ctx->op, r); + } + delete ctx; + } + } + + bool is_temp_obj_used() { return results.get<2>(); } + uint64_t get_data_size() { return results.get<1>(); } + int get_result() { return results.get<0>(); } + }; + friend class CopyFromCallback; + boost::scoped_ptr<PGBackend> pgbackend; + PGBackend *get_pgbackend() { + return pgbackend.get(); + } /// Listener methods void on_local_recover_start( @@ -297,7 +370,7 @@ public: int num_read; ///< count read ops int num_write; ///< count update ops - CopyOpRef copy_op; + CopyFromCallback *copy_cb; hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking @@ -314,7 +387,8 @@ public: current_osd_subop_num(0), data_off(0), reply(NULL), pg(_pg), num_read(0), - num_write(0) { + num_write(0), + copy_cb(NULL) { if (_ssc) { new_snapset = _ssc->snapset; snapset = &_ssc->snapset; @@ -619,10 +693,16 @@ protected: * @bi [out] resulting map of objects to eversion_t's */ void scan_range( - hobject_t begin, int min, int max, BackfillInterval *bi, + int min, int max, BackfillInterval *bi, ThreadPool::TPHandle &handle ); + /// Update a hash range to reflect changes since the last scan + void update_range( + BackfillInterval *bi, ///< [in,out] interval to update + ThreadPool::TPHandle &handle ///< [in] tp handle + ); + void prep_backfill_object_push( hobject_t oid, eversion_t v, eversion_t have, int peer, PGBackend::RecoveryHandle *h); @@ -662,12 +742,17 @@ protected: } }; struct C_OSD_OndiskWriteUnlock : public Context { - ObjectContextRef obc, obc2; - C_OSD_OndiskWriteUnlock(ObjectContextRef o, ObjectContextRef o2 = ObjectContextRef()) : obc(o), obc2(o2) {} + ObjectContextRef obc, obc2, obc3; + C_OSD_OndiskWriteUnlock( + ObjectContextRef o, + ObjectContextRef o2 = ObjectContextRef(), + ObjectContextRef o3 = ObjectContextRef()) : obc(o), obc2(o2), obc3(o3) {} void finish(int r) { obc->ondisk_write_unlock(); if (obc2) obc2->ondisk_write_unlock(); + if (obc3) + obc3->ondisk_write_unlock(); } }; struct C_OSD_OndiskWriteUnlockList : public Context { @@ -723,11 +808,15 @@ protected: // -- copyfrom -- map<hobject_t, CopyOpRef> copy_ops; - int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version); + int start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src, + object_locator_t oloc, version_t version, + const hobject_t& temp_dest_oid); void process_copy_chunk(hobject_t oid, tid_t tid, int r); void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t); - void _copy_some(OpContext *ctx, CopyOpRef cop); - int finish_copy(OpContext *ctx); + void _copy_some(ObjectContextRef obc, CopyOpRef cop); + void _build_finish_copy_transaction(CopyOpRef cop, + ObjectStore::Transaction& t); + int finish_copyfrom(OpContext *ctx); void cancel_copy(CopyOpRef cop); void cancel_copy_ops(); |