// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_FILEJOURNAL_H #define CEPH_FILEJOURNAL_H #include using std::deque; #include "Journal.h" #include "common/Cond.h" #include "common/Mutex.h" #include "common/Thread.h" #include "common/Throttle.h" #ifdef HAVE_LIBAIO # include #endif /** * Implements journaling on top of block device or file. * * Lock ordering is write_lock > aio_lock > queue_lock > flush_lock */ class FileJournal : public Journal { public: /// Protected by queue_lock struct completion_item { uint64_t seq; Context *finish; utime_t start; TrackedOpRef tracked_op; completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref) : seq(o), finish(c), start(s), tracked_op(opref) {} completion_item() : seq(0), finish(0), start(0) {} }; struct write_item { uint64_t seq; bufferlist bl; int alignment; TrackedOpRef tracked_op; write_item(uint64_t s, bufferlist& b, int al, TrackedOpRef opref) : seq(s), alignment(al), tracked_op(opref) { bl.claim(b); } write_item() : seq(0), alignment(0) {} }; Mutex queue_lock; Cond queue_cond; uint64_t journaled_seq; bool plug_journal_completions; deque writeq; deque completions; bool writeq_empty(); write_item &peek_write(); void pop_write(); void flush_queue(); void submit_entry(uint64_t seq, bufferlist& bl, int alignment, Context *oncommit, TrackedOpRef osd_op = TrackedOpRef()); /// End protected by queue_lock /* * journal header */ struct header_t { enum { FLAG_CRC = (1<<0), // NOTE: remove kludgey weirdness in read_header() next time a flag is added. }; uint64_t flags; uuid_d fsid; __u32 block_size; __u32 alignment; int64_t max_size; // max size of journal ring buffer int64_t start; // offset of first entry header_t() : flags(0), block_size(0), alignment(0), max_size(0), start(0) {} void clear() { start = block_size; } uint64_t get_fsid64() { return *(uint64_t*)&fsid.uuid[0]; } void encode(bufferlist& bl) const { __u32 v = 2; ::encode(v, bl); bufferlist em; { ::encode(flags, em); ::encode(fsid, em); ::encode(block_size, em); ::encode(alignment, em); ::encode(max_size, em); ::encode(start, em); } ::encode(em, bl); } void decode(bufferlist::iterator& bl) { __u32 v; ::decode(v, bl); if (v < 2) { // normally 0, but concievably 1 // decode old header_t struct (pre v0.40). bl.advance(4); // skip __u32 flags (it was unused by any old code) flags = 0; uint64_t tfsid; ::decode(tfsid, bl); *(uint64_t*)&fsid.uuid[0] = tfsid; *(uint64_t*)&fsid.uuid[8] = tfsid; ::decode(block_size, bl); ::decode(alignment, bl); ::decode(max_size, bl); ::decode(start, bl); return; } bufferlist em; ::decode(em, bl); bufferlist::iterator t = em.begin(); ::decode(flags, t); ::decode(fsid, t); ::decode(block_size, t); ::decode(alignment, t); ::decode(max_size, t); ::decode(start, t); } } header; struct entry_header_t { uint64_t seq; // fs op seq # uint32_t crc32c; // payload only. not header, pre_pad, post_pad, or footer. uint32_t len; uint32_t pre_pad, post_pad; uint64_t magic1; uint64_t magic2; void make_magic(off64_t pos, uint64_t fsid) { magic1 = pos; magic2 = fsid ^ seq ^ len; } bool check_magic(off64_t pos, uint64_t fsid) { return magic1 == (uint64_t)pos && magic2 == (fsid ^ seq ^ len); } } __attribute__((__packed__, aligned(4))); private: string fn; /// Protected by flush_lock Mutex flush_lock; Cond write_empty_cond; bool writing; /// End protected by flush_lock char *zero_buf; off64_t max_size; size_t block_size; bool is_bdev; bool directio, aio; bool must_write_header; off64_t write_pos; // byte where the next entry to be written will go off64_t read_pos; // #ifdef HAVE_LIBAIO /// state associated with an in-flight aio request /// Protected by aio_lock struct aio_info { struct iocb iocb; bufferlist bl; struct iovec *iov; bool done; uint64_t off, len; ///< these are for debug only uint64_t seq; ///< seq number to complete on aio completion, if non-zero aio_info(bufferlist& b, uint64_t o, uint64_t s) : iov(NULL), done(false), off(o), len(b.length()), seq(s) { bl.claim(b); } ~aio_info() { delete[] iov; } }; Mutex aio_lock; Cond aio_cond; Cond write_finish_cond; io_context_t aio_ctx; list aio_queue; int aio_num, aio_bytes; /// End protected by aio_lock #endif uint64_t last_committed_seq; /* * full states cycle at the beginnging of each commit epoch, when commit_start() * is called. * FULL - we just filled up during this epoch. * WAIT - we filled up last epoch; now we have to wait until everything during * that epoch commits to the fs before we can start writing over it. * NOTFULL - all good, journal away. */ enum { FULL_NOTFULL = 0, FULL_FULL = 1, FULL_WAIT = 2, } full_state; int fd; // in journal deque > journalq; // track seq offsets, so we can trim later. uint64_t writing_seq; // throttle Throttle throttle_ops, throttle_bytes; void put_throttle(uint64_t ops, uint64_t bytes); // write thread Mutex write_lock; Cond write_cond; bool write_stop; Cond commit_cond; int _open(bool wr, bool create=false); int _open_block_device(); void _check_disk_write_cache() const; int _open_file(int64_t oldsize, blksize_t blksize, bool create); void print_header(); int read_header(); bufferptr prepare_header(); void start_writer(); void stop_writer(); void write_thread_entry(); void queue_completions_thru(uint64_t seq); int check_for_full(uint64_t seq, off64_t pos, off64_t size); int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee); int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes); void do_write(bufferlist& bl); void write_finish_thread_entry(); void check_aio_completion(); void do_aio_write(bufferlist& bl); int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq); void align_bl(off64_t pos, bufferlist& bl); int write_bl(off64_t& pos, bufferlist& bl); void wrap_read_bl(off64_t& pos, int64_t len, bufferlist& bl); class Writer : public Thread { FileJournal *journal; public: Writer(FileJournal *fj) : journal(fj) {} void *entry() { journal->write_thread_entry(); return 0; } } write_thread; class WriteFinisher : public Thread { FileJournal *journal; public: WriteFinisher(FileJournal *fj) : journal(fj) {} void *entry() { journal->write_finish_thread_entry(); return 0; } } write_finish_thread; off64_t get_top() { return ROUND_UP_TO(sizeof(header), block_size); } public: FileJournal(uuid_d fsid, Finisher *fin, Cond *sync_cond, const char *f, bool dio=false, bool ai=true) : Journal(fsid, fin, sync_cond), queue_lock("FileJournal::queue_lock"), journaled_seq(0), plug_journal_completions(false), fn(f), flush_lock("FileJournal::flush_lock"), writing(false), zero_buf(NULL), max_size(0), block_size(0), is_bdev(false), directio(dio), aio(ai), must_write_header(false), write_pos(0), read_pos(0), #ifdef HAVE_LIBAIO aio_lock("FileJournal::aio_lock"), aio_num(0), aio_bytes(0), #endif last_committed_seq(0), full_state(FULL_NOTFULL), fd(-1), writing_seq(0), write_lock("FileJournal::write_lock"), write_stop(false), write_thread(this), write_finish_thread(this) { } ~FileJournal() { delete[] zero_buf; } int create(); int open(uint64_t fs_op_seq); void close(); int peek_fsid(uuid_d& fsid); int dump(ostream& out); void flush(); void throttle(); bool is_writeable() { return read_pos == 0; } void make_writeable(); // writes void commit_start(); void committed_thru(uint64_t seq); bool should_commit_now() { return full_state != FULL_NOTFULL; } void set_wait_on_full(bool b) { wait_on_full = b; } // reads bool read_entry(bufferlist& bl, uint64_t& seq); }; WRITE_CLASS_ENCODER(FileJournal::header_t) #endif