diff options
Diffstat (limited to 'include/mbgl/util/run_loop.hpp')
-rw-r--r-- | include/mbgl/util/run_loop.hpp | 125 |
1 files changed, 113 insertions, 12 deletions
diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp index acbea80273..a4c34ec540 100644 --- a/include/mbgl/util/run_loop.hpp +++ b/include/mbgl/util/run_loop.hpp @@ -12,6 +12,8 @@ #include <utility> #include <queue> #include <mutex> +#include <condition_variable> +#include <cassert> namespace mbgl { namespace util { @@ -48,6 +50,7 @@ public: void removeWatch(int fd); // Invoke fn(args...) on this RunLoop. + // This call is threadsafe. template <class Fn, class... Args> void invoke(Fn&& fn, Args&&... args) { std::shared_ptr<WorkTask> task = WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...); @@ -55,6 +58,7 @@ public: } // Post the cancellable work fn(args...) to this RunLoop. + // This call is threadsafe. template <class Fn, class... Args> std::unique_ptr<AsyncRequest> invokeCancellable(Fn&& fn, Args&&... args) { @@ -62,39 +66,136 @@ public: push(task); return std::make_unique<WorkRequest>(task); } - + + // This call is threadsafe. void schedule(std::weak_ptr<Mailbox> mailbox) override { invoke([mailbox] () { Mailbox::maybeReceive(mailbox); }); } + // Instructs the RunLoop to stop processing tasks until a call to resume() follows. This call + // blocks until the RunLoop finished processing the current task. + // This call is threadsafe. + void pause() { + // A RunLoop can't pause itself. + assert(static_cast<RunLoop*>(Scheduler::GetCurrent()) != this); + + std::unique_lock<std::mutex> lock(mutex); + if (pausing) { + // Do not attempt to pause again when we've already started the pause. + return; + } + + // Indicates our intent to pause this RunLoop. Just having pausing set to true doesn't mean + // the RunLoop is paused; e.g. it might still be processing a task. We'll have to wait until + // the process() function has confirmed that it stopped running. We do this via the running + // variable and wait until it's set to false. + pausing = true; + + // If we're at this point, we assume that there is no current request to pause the loop that + // is already in progress, so running must be true. + assert(running); + lock.unlock(); + + // Wake up the RunLoop so that it'll run process(), which eventually sets running to false. + // We have to invoke this because otherwise, running could never be set to false, e.g. when + // this RunLoop doesn't have any items to process. + wake(); + + // Wait until the RunLoop stops running. Once this condition is triggered, we know that the + // RunLoop has stopped processing the current item + lock.lock(); + cv.wait(lock, [this] { return !running; }); + } + + // Resumes processing items in this RunLoop. This call blocks until the RunLoop finished + // processing the current task if it has not yet paused. + // This call is threadsafe. + void resume() { + std::unique_lock<std::mutex> lock(mutex); + if (!pausing || resuming) { + // Do not attempt to resume again when we've already started to resume, or when the + // RunLoop isn't paused + return; + } + + // Indicates the intent to resume this RunLoop. This is required to prevent two concurrent + // resume() calls from two different threads from interleaving. E.g. one resume() call could + // wait until running is set to false, then we reset to true immediately after, which leaves + // the second resume() call waiting until it gets set to false indefinitely. + resuming = true; + + // Make sure that a previous call to pause() (e.g. on another thread) has completed. + cv.wait(lock, [this] { return !running; }); + + // We can now reverse the pause actions. + pausing = false; + running = true; + resuming = false; + + lock.unlock(); + + // Finally, make sure that we resume processing the items we've collected during the pause. + wake(); + } + class Impl; private: MBGL_STORE_THREAD(tid) - using Queue = std::queue<std::shared_ptr<WorkTask>>; + // Wakes up the RunLoop so that it starts processing items in the queue. + void wake(); - void push(std::shared_ptr<WorkTask>); + // Adds a WorkTask to the queue, and wakes it up. + void push(std::shared_ptr<WorkTask> task) { + { + std::lock_guard<std::mutex> lock(mutex); + queue.push(std::move(task)); - void withMutex(std::function<void()>&& fn) { - std::lock_guard<std::mutex> lock(mutex); - fn(); + if (pausing) { + // Do not attempt to wake up the RunLoop if we already know it's paused + return; + } + } + + wake(); } + // Process items in the queue. This should be called from the thread that the RunLoop runs on. void process() { - Queue queue_; - withMutex([&] { queue_.swap(queue); }); + std::shared_ptr<WorkTask> task; + std::unique_lock<std::mutex> lock(mutex); + while (!pausing && !queue.empty()) { + task = std::move(queue.front()); + queue.pop(); + lock.unlock(); + + if (task) { + (*task)(); + } + + lock.lock(); + } + + if (pausing && running) { + // Notify the pause()/resume() calls that we've finished processing the current task and + // are now truly pausing. + running = false; - while (!queue_.empty()) { - (*(queue_.front()))(); - queue_.pop(); + // Use notify_all() instead of notify_one() in case we have both a pause() and a resume() + // call waiting for this condition variable. + cv.notify_all(); } } - Queue queue; std::mutex mutex; + std::condition_variable cv; + bool pausing = false; + bool running = true; + bool resuming = false; + std::queue<std::shared_ptr<WorkTask>> queue; std::unique_ptr<Impl> impl; }; |