diff options
author | Sage Weil <sage@newdream.net> | 2010-11-07 09:45:09 -0800 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2010-11-07 09:45:09 -0800 |
commit | b7f578cf76dcda18209c986a484c9cedc81edfcf (patch) | |
tree | f6601e7c6d20179b3d11b495d480adda8fdfe842 /src/common | |
parent | deb9ef760ee67f7cc4b78ea3541bceff550284a4 (diff) | |
parent | c4e56e9a5ea9cf0964169e2ec41102325fb63350 (diff) | |
download | ceph-b7f578cf76dcda18209c986a484c9cedc81edfcf.tar.gz |
Merge remote branch 'origin/timer-fixes' into unstable
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/Logger.cc | 12 | ||||
-rw-r--r-- | src/common/Timer.cc | 523 | ||||
-rw-r--r-- | src/common/Timer.h | 260 |
3 files changed, 390 insertions, 405 deletions
diff --git a/src/common/Logger.cc b/src/common/Logger.cc index 8aab1e00a6e..1a9b20b9428 100644 --- a/src/common/Logger.cc +++ b/src/common/Logger.cc @@ -20,6 +20,7 @@ #include "Logger.h" #include <iostream> +#include <memory> #include "Clock.h" #include "config.h" @@ -31,7 +32,7 @@ // per-process lock. lame, but this way I protect LogType too! Mutex logger_lock("logger_lock"); -SafeTimer logger_timer(logger_lock); +std::auto_ptr < SafeTimer >logger_timer; Context *logger_event = 0; list<Logger*> logger_list; utime_t start; @@ -172,15 +173,14 @@ static void flush_all_loggers() << " next=" << next << dendl; logger_event = new C_FlushLoggers; - logger_timer.add_event_at(next, logger_event); + if (!logger_timer.get()) + logger_timer.reset(new SafeTimer(logger_lock)); + logger_timer->add_event_at(next, logger_event); } static void stop() { - logger_lock.Lock(); - logger_timer.cancel_all(); - logger_timer.join(); - logger_lock.Unlock(); + logger_timer.reset(NULL); } diff --git a/src/common/Timer.cc b/src/common/Timer.cc index fe8223a9347..7b7530c3d83 100644 --- a/src/common/Timer.cc +++ b/src/common/Timer.cc @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,16 +7,15 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ - - - -#include "Timer.h" #include "Cond.h" +#include "Mutex.h" +#include "Thread.h" +#include "Timer.h" #include "config.h" #include "include/Context.h" @@ -27,340 +26,350 @@ #define DBL 10 +#include <sstream> #include <signal.h> #include <sys/time.h> #include <math.h> +typedef std::multimap < utime_t, Context *> scheduled_map_t; +typedef std::map < Context*, scheduled_map_t::iterator > event_lookup_map_t; -/**** thread solution *****/ - -bool Timer::get_next_due(utime_t& when) -{ - if (scheduled.empty()) { - return false; - } else { - map< utime_t, set<Context*> >::iterator it = scheduled.begin(); - when = it->first; - return true; +class TimerThread : public Thread { +public: + TimerThread(Timer &parent_) + : p(parent_) + { } -} + void *entry() + { + p.lock.Lock(); + while (true) { + /* Wait for a cond_signal */ + scheduled_map_t::iterator s = p.scheduled.begin(); + if (s == p.scheduled.end()) + p.cond.Wait(p.lock); + else + p.cond.WaitUntil(p.lock, s->first); -void Timer::timer_entry() -{ - lock.Lock(); - - utime_t now = g_clock.now(); - - while (!thread_stop) { - dout(10) << "at top" << dendl; - // any events due? - utime_t next; - bool next_due = get_next_due(next); - if (next_due) { - dout(10) << "get_next_due - " << next << dendl; - } else { - dout(10) << "get_next_due - nothing scheduled" << dendl; - } - - if (next_due && now >= next) { - // move to pending list - list<Context*> pending; - - map< utime_t, set<Context*> >::iterator it = scheduled.begin(); - while (it != scheduled.end()) { - if (it->first > now) break; - - utime_t t = it->first; - dout(DBL) << "queueing event(s) scheduled at " << t << dendl; - - for (set<Context*>::iterator cit = it->second.begin(); - cit != it->second.end(); - cit++) { - pending.push_back(*cit); - event_times.erase(*cit); - num_event--; - } - - map< utime_t, set<Context*> >::iterator previt = it; - it++; - scheduled.erase(previt); + if (p.exiting) { + dout(DBL) << "exiting TimerThread" << dendl; + p.lock.Unlock(); + return NULL; } - if (!pending.empty()) { - sleeping = false; - lock.Unlock(); - { - // make sure we're not holding any locks while we do callbacks - // make the callbacks myself. - for (list<Context*>::iterator cit = pending.begin(); - cit != pending.end(); - cit++) { - dout(DBL) << "start callback " << *cit << dendl; - (*cit)->finish(0); - dout(DBL) << "finish callback " << *cit << dendl; - delete *cit; - } - pending.clear(); - assert(pending.empty()); - } - lock.Lock(); + // Find out what callbacks we have to do + list <Context*> running; + utime_t now = g_clock.now(); + p.pop_running(running, now); + + if (running.empty()) { + dout(DBL) << "TimerThread: nothing to do." << dendl; + continue; } - now = g_clock.now(); - dout(DBL) << "looping at " << now << dendl; - } - else { - // sleep - if (next_due) { - dout(DBL) << "sleeping until " << next << dendl; - timed_sleep = true; - sleeping = true; - timeout_cond.WaitUntil(lock, next); // wait for waker or time - now = g_clock.now(); - dout(DBL) << "kicked or timed out at " << now << dendl; - } else { - dout(DBL) << "sleeping" << dendl; - timed_sleep = false; - sleeping = true; - sleep_cond.Wait(lock); // wait for waker - now = g_clock.now(); - dout(DBL) << "kicked at " << now << dendl; + p.lock.Unlock(); + + lock_event_lock(); + // p.running is protected by the event lock. + p.running.swap(running); + while (true) { + list <Context*>::const_iterator cit = p.running.begin(); + if (cit == p.running.end()) + break; + p.running.pop_front(); + Context *ctx = *cit; + dout(DBL) << "start callback " << ctx << dendl; + ctx->finish(0); + dout(DBL) << "deleting callback " << ctx << dendl; + delete ctx; + unlock_event_lock(); + // Release the event_lock here to give other waiters a chance. + dout(DBL) << "finished callback " << ctx << dendl; + lock_event_lock(); } - dout(10) << "in brace" << dendl; + unlock_event_lock(); + p.lock.Lock(); } - dout(10) << "at bottom" << dendl; } - lock.Unlock(); -} - +private: + inline void lock_event_lock() + { + if (p.event_lock) + p.event_lock->Lock(); + } + inline void unlock_event_lock() + { + if (p.event_lock) + p.event_lock->Unlock(); + } -/** - * Timer bits - */ + Timer &p; +}; -void Timer::register_timer() +Timer::Timer() + : lock("Timer::lock"), + event_lock(NULL), + cond(), + thread(NULL), + exiting(false) { - if (timer_thread.is_started()) { - if (sleeping) { - dout(DBL) << "register_timer kicking thread" << dendl; - if (timed_sleep) - timeout_cond.SignalAll(); - else - sleep_cond.SignalAll(); - } else { - dout(DBL) << "register_timer doing nothing; thread is awake" << dendl; - // it's probably doing callbacks. - } - } else { - dout(DBL) << "register_timer starting thread" << dendl; - timer_thread.create(); + if (init()) { + assert(0); } } -void Timer::cancel_timer() +Timer::Timer(Mutex *event_lock_) + : lock("Timer::lock"), + event_lock(event_lock_), + cond(), + thread(NULL), + exiting(false) { - // clear my callback pointers - if (timer_thread.is_started()) { - dout(10) << "setting thread_stop flag" << dendl; - lock.Lock(); - thread_stop = true; - if (timed_sleep) - timeout_cond.SignalAll(); - else - sleep_cond.SignalAll(); - lock.Unlock(); - - dout(10) << "waiting for thread to finish" << dendl; - void *ptr; - timer_thread.join(&ptr); - thread_stop = false; - - dout(10) << "thread finished, exit code " << ptr << dendl; + if (init()) { + assert(0); } } -void Timer::cancel_all_events() +Timer::~Timer() { - lock.Lock(); + shutdown(); +} - // clean up unfired events. - for (map<utime_t, set<Context*> >::iterator p = scheduled.begin(); - p != scheduled.end(); - p++) { - for (set<Context*>::iterator q = p->second.begin(); - q != p->second.end(); - q++) { - dout(DBL) << "cancel_all_events deleting " << *q << dendl; - delete *q; - } +void Timer::shutdown() +{ + lock.Lock(); + if (!thread) { + lock.Unlock(); + return; } - scheduled.clear(); - event_times.clear(); - + exiting = true; + cancel_all_events_impl(false); + cond.Signal(); lock.Unlock(); -} + /* Block until the thread has exited. + * Only then do we know that no events are in progress. */ + thread->join(); -/* - * schedule - */ - + delete thread; + thread = NULL; +} void Timer::add_event_after(double seconds, - Context *callback) + Context *callback) { utime_t when = g_clock.now(); when += seconds; Timer::add_event_at(when, callback); } -void Timer::add_event_at(utime_t when, - Context *callback) +void Timer::add_event_at(utime_t when, Context *callback) { lock.Lock(); + /* Don't start using the timer until it's initialized */ + assert(thread); + dout(DBL) << "add_event " << callback << " at " << when << dendl; - // insert - scheduled[when].insert(callback); - assert(event_times.count(callback) == 0); - event_times[callback] = when; - - num_event++; - - // make sure i wake up on time - register_timer(); - + scheduled_map_t::value_type s_val(when, callback); + scheduled_map_t::iterator i = scheduled.insert(s_val); + + event_lookup_map_t::value_type e_val(callback, i); + pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val)); + dout(DBL) << "inserted events entry for " << callback << dendl; + + /* If you hit this, you tried to insert the same Context* twice. */ + assert(rval.second); + + /* If the event we have just inserted comes before everything else, we need to + * adjust our timeout. */ + if (i == scheduled.begin()) + cond.Signal(); + + dout(19) << show_all_events(__func__) << dendl; lock.Unlock(); } -bool Timer::cancel_event(Context *callback) +bool Timer::cancel_event(Context *callback) { + dout(DBL) << __PRETTY_FUNCTION__ << ": " << callback << dendl; + lock.Lock(); - - dout(DBL) << "cancel_event " << callback << dendl; + bool ret = cancel_event_impl(callback, false); + lock.Unlock(); + return ret; +} - if (!event_times.count(callback)) { - dout(DBL) << "cancel_event " << callback << " isn't scheduled (probably executing)" << dendl; - lock.Unlock(); - return false; // wasn't scheduled. - } +void Timer::cancel_all_events(void) +{ + dout(DBL) << __PRETTY_FUNCTION__ << dendl; - utime_t tp = event_times[callback]; - event_times.erase(callback); + dout(19) << show_all_events(__func__) << dendl; - assert(scheduled.count(tp)); - assert(scheduled[tp].count(callback)); - scheduled[tp].erase(callback); - if (scheduled[tp].empty()) - scheduled.erase(tp); - + lock.Lock(); + cancel_all_events_impl(false); lock.Unlock(); +} - // delete the canceled event. - delete callback; +int Timer::init() +{ + int ret = 0; + lock.Lock(); + assert(exiting == false); + assert(!thread); - return true; + dout(DBL) << "Timer::init: starting thread" << dendl; + thread = new TimerThread(*this); + ret = thread->create(); + lock.Unlock(); + return ret; } +bool Timer::cancel_event_impl(Context *callback, bool cancel_running) +{ + dout(19) << show_all_events(__func__) << dendl; -// ------------------------------- + event_lookup_map_t::iterator e = events.find(callback); + if (e != events.end()) { + // Erase the item out of the scheduled map. + scheduled.erase(e->second); + events.erase(e); -void SafeTimer::add_event_after(double seconds, Context *c) -{ - assert(lock.is_locked()); - Context *w = new EventWrapper(this, c); - dout(DBL) << "SafeTimer.add_event_after wrapping " << c << " with " << w << dendl; - scheduled[c] = w; - Timer::add_event_after(seconds, w); + delete callback; + return true; + } + + // If we can't peek at the running list, we have to give up. + if (!cancel_running) + return false; + + // Ok, we will check the running list. It's safe, because we're holding the + // event_lock. + list <Context*>::iterator cit = + std::find(running.begin(), running.end(), callback); + if (cit == running.end()) + return false; + running.erase(cit); + delete callback; + return true; } -void SafeTimer::add_event_at(utime_t when, Context *c) +void Timer::cancel_all_events_impl(bool clear_running) { - assert(lock.is_locked()); - Context *w = new EventWrapper(this, c); - dout(DBL) << "SafeTimer.add_event_at wrapping " << c << " with " << w << dendl; - scheduled[c] = w; - Timer::add_event_at(when, w); + while (1) { + scheduled_map_t::iterator s = scheduled.begin(); + if (s == scheduled.end()) + break; + delete s->second; + scheduled.erase(s); + } + events.clear(); + + if (clear_running) { + running.clear(); + } } -void SafeTimer::EventWrapper::finish(int r) +void Timer::pop_running(list <Context*> &running_, const utime_t &now) { - timer->lock.Lock(); - if (timer->scheduled.count(actual)) { - // still scheduled. execute. - actual->finish(r); - timer->scheduled.erase(actual); - } else { - // i was canceled. - assert(timer->canceled.count(actual)); + while (true) { + std::multimap < utime_t, Context* >::iterator s = scheduled.begin(); + if (s == scheduled.end()) + return; + const utime_t &utime(s->first); + Context *cit(s->second); + if (utime > now) + return; + running_.push_back(cit); + event_lookup_map_t::iterator e = events.find(cit); + assert(e != events.end()); + events.erase(e); + scheduled.erase(s); } +} - // did i get canceled? - // (this can happen even if i just executed above. e.g., i may have canceled myself.) - if (timer->canceled.count(actual)) { - timer->canceled.erase(actual); - timer->cond.Signal(); +std::string Timer::show_all_events(const char *caller) const +{ + ostringstream oss; + string sep; + oss << "show_all_events: from " << caller << ": scheduled ["; + for (scheduled_map_t::const_iterator s = scheduled.begin(); + s != scheduled.end(); + ++s) + { + oss << sep << s->first << "->" << s->second; + sep = ","; } + oss << "] "; + + oss << "events ["; + string sep2; + for (event_lookup_map_t::const_iterator e = events.begin(); + e != events.end(); + ++e) + { + oss << sep2 << e->first << "->" << e->second->first; + sep2 = ","; + } + oss << "]"; + return oss.str(); +} - // delete the original event - delete actual; - - timer->lock.Unlock(); +/******************************************************************/ +SafeTimer::SafeTimer(Mutex &event_lock) + : t(&event_lock) +{ } -bool SafeTimer::cancel_event(Context *c) +SafeTimer::~SafeTimer() { - bool ret; + dout(DBL) << __PRETTY_FUNCTION__ << dendl; - assert(lock.is_locked()); - assert(scheduled.count(c)); + t.shutdown(); +} - ret = Timer::cancel_event(scheduled[c]); +void SafeTimer::shutdown() +{ + t.shutdown(); +} - if (ret) { - // hosed wrapper. hose original event too. - delete c; - } else { - // clean up later. - canceled[c] = scheduled[c]; - } - scheduled.erase(c); - return ret; +void SafeTimer::add_event_after(double seconds, Context *callback) +{ + assert(t.event_lock->is_locked()); + + t.add_event_after(seconds, callback); } -void SafeTimer::cancel_all() +void SafeTimer::add_event_at(utime_t when, Context *callback) { - assert(lock.is_locked()); - - while (!scheduled.empty()) - cancel_event(scheduled.begin()->first); + assert(t.event_lock->is_locked()); + + t.add_event_at(when, callback); } -void SafeTimer::join() +bool SafeTimer::cancel_event(Context *callback) { - assert(lock.is_locked()); - assert(scheduled.empty()); - - if (!canceled.empty()) { - while (!canceled.empty()) { - // wait - dout(2) << "SafeTimer.join waiting for " << canceled.size() << " to join: " << canceled << dendl; - cond.Wait(lock); - } - dout(2) << "SafeTimer.join done" << dendl; - } + dout(DBL) << __PRETTY_FUNCTION__ << ": " << callback << dendl; + + assert(t.event_lock->is_locked()); + + t.lock.Lock(); + bool ret = t.cancel_event_impl(callback, true); + t.lock.Unlock(); + return ret; } -SafeTimer::~SafeTimer() +void SafeTimer::cancel_all_events() { - if (!scheduled.empty() && !canceled.empty()) { - derr(0) << "SafeTimer.~SafeTimer " << scheduled.size() << " events scheduled, " - << canceled.size() << " canceled but unflushed" - << dendl; - assert(0); - } + dout(DBL) << __PRETTY_FUNCTION__ << dendl; + + assert(t.event_lock->is_locked()); + + t.lock.Lock(); + t.cancel_all_events_impl(true); + t.lock.Unlock(); } diff --git a/src/common/Timer.h b/src/common/Timer.h index 63d19021d9b..f19f3c74c52 100644 --- a/src/common/Timer.h +++ b/src/common/Timer.h @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system @@ -7,169 +7,145 @@ * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software + * License version 2.1, as published by the Free Software * Foundation. See file COPYING. - * + * */ #ifndef CEPH_TIMER_H #define CEPH_TIMER_H -#include "include/types.h" -#include "include/Context.h" #include "Clock.h" - -#include "Mutex.h" #include "Cond.h" -#include "Thread.h" +#include "Mutex.h" +#include "include/types.h" +#include <list> #include <map> -#include <set> -using std::map; -using std::set; - -#include <ext/hash_map> -using namespace __gnu_cxx; +class Context; +class TimerThread; -/*** Timer - * schedule callbacks +/* Timer + * + * An instance of the timer class holds a thread which executes callbacks at + * predetermined times. */ - -//class Messenger; - - -namespace __gnu_cxx { - template<> struct hash<Context*> { - size_t operator()(const Context *p) const { - static hash<unsigned long> H; - return H((unsigned long)p); - } - }; -} - - -class Timer { - private: - map< utime_t, set<Context*> > scheduled; // time -> (context ...) - hash_map< Context*, utime_t > event_times; // event -> time - - bool get_next_due(utime_t &when); - - void register_timer(); // make sure i get a callback - void cancel_timer(); // make sure i get a callback - - bool thread_stop; - Mutex lock; - bool timed_sleep; - bool sleeping; - Cond sleep_cond; - Cond timeout_cond; - - public: - void timer_entry(); // waiter thread (that wakes us up) - - class TimerThread : public Thread { - Timer *t; - public: - void *entry() { - t->timer_entry(); - return 0; - } - TimerThread(Timer *_t) : t(_t) {} - } timer_thread; - - - int num_event; - - - public: - Timer() : - thread_stop(false), - lock("Timer::lock"), - timed_sleep(false), - sleeping(false), - timer_thread(this), - num_event(0) - { - } - virtual ~Timer() { - // stop. - cancel_timer(); - - // scheduled - for (map< utime_t, set<Context*> >::iterator it = scheduled.begin(); - it != scheduled.end(); - it++) { - for (set<Context*>::iterator sit = it->second.begin(); - sit != it->second.end(); - sit++) - delete *sit; - } - scheduled.clear(); - } - - void init() { - register_timer(); - } - void shutdown() { - cancel_timer(); - cancel_all_events(); - } - - // schedule events - virtual void add_event_after(double seconds, - Context *callback); - virtual void add_event_at(utime_t when, - Context *callback); - virtual bool cancel_event(Context *callback); - virtual void cancel_all_events(); - - // execute pending events - void execute_pending(); - +class Timer +{ +public: + Timer(); + + /* Calls shutdown() */ + ~Timer(); + + /* Cancel all events and stop the timer thread. + * + * This function might block for a while because it does a thread.join(). + * */ + void shutdown(); + + /* Schedule an event in the future */ + void add_event_after(double seconds, Context *callback); + void add_event_at(utime_t when, Context *callback); + + /* Cancel an event. + * + * If this function returns true, you know that the callback has been + * destroyed and is not currently running. + * If it returns false, either the callback is in progress, or you never addded + * the callback in the first place. + */ + bool cancel_event(Context *callback); + + /* Cancel all events. + * + * Even after this function returns, there may be events in progress. + * Use SafeTimer if you have to be sure that nothing is running after + * cancelling an event or events. + */ + void cancel_all_events(); + +private: + Timer(Mutex *event_lock_); + + /* Starts the timer thread. + * Returns 0 on success; error code otherwise. */ + int init(); + + bool cancel_event_impl(Context *callback, bool cancel_running); + + void cancel_all_events_impl(bool clear_running); + + void pop_running(std::list <Context*> &running_, const utime_t &now); + + std::string show_all_events(const char *caller) const; + + // This class isn't supposed to be copied + Timer(const Timer &rhs); + Timer& operator=(const Timer &rhs); + + Mutex lock; + Mutex *event_lock; + Cond cond; + TimerThread *thread; + bool exiting; + std::multimap < utime_t, Context* > scheduled; + std::map < Context*, std::multimap < utime_t, Context* >::iterator > events; + std::list<Context*> running; + + friend class TimerThread; + friend class SafeTimer; }; - /* * SafeTimer is a wrapper around the a Timer that protects event * execution with an existing mutex. It provides for, among other - * things, reliable event cancellation on class destruction. The - * caller just needs to cancel each event (or cancel_all()), and then - * call join() to ensure any concurrently exectuting events (in other - * threads) get flushed. + * things, reliable event cancellation in cancel_event. Unlike in + * Timer::cancel_event, the caller can be sure that once SafeTimer::cancel_event + * returns, the callback will not be in progress. */ -class SafeTimer : public Timer { - Mutex& lock; - Cond cond; - map<Context*,Context*> scheduled; // actual -> wrapper - map<Context*,Context*> canceled; - - class EventWrapper : public Context { - SafeTimer *timer; - Context *actual; - public: - EventWrapper(SafeTimer *st, Context *c) : timer(st), - actual(c) {} - void finish(int r); - }; - +class SafeTimer +{ public: - SafeTimer(Mutex& l) : lock(l) { } + SafeTimer(Mutex &event_lock_); ~SafeTimer(); - void add_event_after(double seconds, Context *c); - void add_event_at(utime_t when, Context *c); - bool cancel_event(Context *c); - void cancel_all(); - void join(); - - int get_num_scheduled() { return scheduled.size(); } - int get_num_canceled() { return canceled.size(); } + /* Call with the event_lock UNLOCKED. + * + * Cancel all events and stop the timer thread. + * + * If there are any events that still have to run, they will need to take + * the event_lock first. */ + void shutdown(); + + /* Schedule an event in the future + * Call with the event_lock LOCKED */ + void add_event_after(double seconds, Context *callback); + void add_event_at(utime_t when, Context *callback); + + /* Cancel an event. + * Call with the event_lock LOCKED + * + * Returns true if the callback was cancelled. + * Returns false if you never addded the callback in the first place. + */ + bool cancel_event(Context *callback); + + /* Cancel all events. + * Call with the event_lock LOCKED + * + * When this function returns, all events have been cancelled, and there are no + * more in progress. + */ + void cancel_all_events(); + +private: + // This class isn't supposed to be copied + SafeTimer(const SafeTimer &rhs); + SafeTimer& operator=(const SafeTimer &rhs); + + Timer t; }; - - - - #endif |