summaryrefslogtreecommitdiff
path: root/src/osd/ReplicatedPG.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/osd/ReplicatedPG.h')
-rw-r--r--src/osd/ReplicatedPG.h113
1 files changed, 101 insertions, 12 deletions
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 05edcef6adf..c277c0d3f86 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -18,6 +18,7 @@
#define CEPH_REPLICATEDPG_H
#include <boost/optional.hpp>
+#include <boost/tuple/tuple.hpp>
#include "include/assert.h"
#include "common/cmdparse.h"
@@ -93,9 +94,11 @@ public:
* state associated with a copy operation
*/
struct OpContext;
+ class CopyCallback;
struct CopyOp {
- OpContext *ctx;
+ CopyCallback *cb;
+ ObjectContextRef obc;
hobject_t src;
object_locator_t oloc;
version_t version;
@@ -114,16 +117,86 @@ public:
hobject_t temp_oid;
object_copy_cursor_t temp_cursor;
- CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
- : ctx(c), src(s), oloc(l), version(v),
+ CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s, object_locator_t l,
+ version_t v, const hobject_t& dest)
+ : cb(cb_), obc(_obc), src(s), oloc(l), version(v),
objecter_tid(0),
size(0),
- rval(-1)
+ rval(-1),
+ temp_oid(dest)
{}
};
typedef boost::shared_ptr<CopyOp> CopyOpRef;
+ /**
+ * The CopyCallback class defines an interface for completions to the
+ * copy_start code. Users of the copy infrastructure must implement
+ * one and give an instance of the class to start_copy.
+ *
+ * The implementer is responsible for making sure that the CopyCallback
+ * can associate itself with the correct copy operation. The presence
+ * of the closing Transaction ensures that write operations can be performed
+ * atomically with the copy being completed (which doing them in separate
+ * transactions would not allow); if you are doing the copy for a read
+ * op you will have to generate a separate op to finish the copy with.
+ */
+ /// return code, total object size, data in temp object?, final Transaction
+ typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction> CopyResults;
+ class CopyCallback : public GenContext<CopyResults&> {
+ protected:
+ CopyCallback() {}
+ /**
+ * results.get<0>() is the return code: 0 for success; -ECANCELLED if
+ * the operation was cancelled by the local OSD; -errno for other issues.
+ * results.get<1>() is the total size of the object (for updating pg stats)
+ * results.get<2>() indicates whether we have already written data to
+ * the temp object (so it needs to get cleaned up, if the return code
+ * indicates a failure)
+ * results.get<3>() is a Transaction; if non-empty you need to perform
+ * its results before any other accesses to the object in order to
+ * complete the copy.
+ */
+ virtual void finish(CopyResults& results_) = 0;
+
+ public:
+ /// Provide the final size of the copied object to the CopyCallback
+ virtual ~CopyCallback() {};
+ };
+
+ class CopyFromCallback: public CopyCallback {
+ public:
+ CopyResults results;
+ OpContext *ctx;
+ hobject_t temp_obj;
+ CopyFromCallback(OpContext *ctx_, const hobject_t& temp_obj_) :
+ ctx(ctx_), temp_obj(temp_obj_) {}
+ ~CopyFromCallback() {}
+
+ virtual void finish(CopyResults& results_) {
+ results = results_;
+ int r = results.get<0>();
+ if (r >= 0) {
+ ctx->pg->execute_ctx(ctx);
+ }
+ ctx->copy_cb = NULL;
+ if (r < 0) {
+ if (r != -ECANCELED) { // on cancel just toss it out; client resends
+ ctx->pg->osd->reply_op_error(ctx->op, r);
+ }
+ delete ctx;
+ }
+ }
+
+ bool is_temp_obj_used() { return results.get<2>(); }
+ uint64_t get_data_size() { return results.get<1>(); }
+ int get_result() { return results.get<0>(); }
+ };
+ friend class CopyFromCallback;
+
boost::scoped_ptr<PGBackend> pgbackend;
+ PGBackend *get_pgbackend() {
+ return pgbackend.get();
+ }
/// Listener methods
void on_local_recover_start(
@@ -297,7 +370,7 @@ public:
int num_read; ///< count read ops
int num_write; ///< count update ops
- CopyOpRef copy_op;
+ CopyFromCallback *copy_cb;
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
@@ -314,7 +387,8 @@ public:
current_osd_subop_num(0),
data_off(0), reply(NULL), pg(_pg),
num_read(0),
- num_write(0) {
+ num_write(0),
+ copy_cb(NULL) {
if (_ssc) {
new_snapset = _ssc->snapset;
snapset = &_ssc->snapset;
@@ -619,10 +693,16 @@ protected:
* @bi [out] resulting map of objects to eversion_t's
*/
void scan_range(
- hobject_t begin, int min, int max, BackfillInterval *bi,
+ int min, int max, BackfillInterval *bi,
ThreadPool::TPHandle &handle
);
+ /// Update a hash range to reflect changes since the last scan
+ void update_range(
+ BackfillInterval *bi, ///< [in,out] interval to update
+ ThreadPool::TPHandle &handle ///< [in] tp handle
+ );
+
void prep_backfill_object_push(
hobject_t oid, eversion_t v, eversion_t have, int peer,
PGBackend::RecoveryHandle *h);
@@ -662,12 +742,17 @@ protected:
}
};
struct C_OSD_OndiskWriteUnlock : public Context {
- ObjectContextRef obc, obc2;
- C_OSD_OndiskWriteUnlock(ObjectContextRef o, ObjectContextRef o2 = ObjectContextRef()) : obc(o), obc2(o2) {}
+ ObjectContextRef obc, obc2, obc3;
+ C_OSD_OndiskWriteUnlock(
+ ObjectContextRef o,
+ ObjectContextRef o2 = ObjectContextRef(),
+ ObjectContextRef o3 = ObjectContextRef()) : obc(o), obc2(o2), obc3(o3) {}
void finish(int r) {
obc->ondisk_write_unlock();
if (obc2)
obc2->ondisk_write_unlock();
+ if (obc3)
+ obc3->ondisk_write_unlock();
}
};
struct C_OSD_OndiskWriteUnlockList : public Context {
@@ -723,11 +808,15 @@ protected:
// -- copyfrom --
map<hobject_t, CopyOpRef> copy_ops;
- int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version);
+ int start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
+ object_locator_t oloc, version_t version,
+ const hobject_t& temp_dest_oid);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);
void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);
- void _copy_some(OpContext *ctx, CopyOpRef cop);
- int finish_copy(OpContext *ctx);
+ void _copy_some(ObjectContextRef obc, CopyOpRef cop);
+ void _build_finish_copy_transaction(CopyOpRef cop,
+ ObjectStore::Transaction& t);
+ int finish_copyfrom(OpContext *ctx);
void cancel_copy(CopyOpRef cop);
void cancel_copy_ops();