diff options
-rw-r--r-- | include/mbgl/util/run_loop.hpp | 125 | ||||
-rw-r--r-- | include/mbgl/util/thread.hpp | 33 | ||||
-rw-r--r-- | platform/android/src/run_loop.cpp | 3 | ||||
-rw-r--r-- | platform/darwin/src/run_loop.cpp | 3 | ||||
-rw-r--r-- | platform/default/run_loop.cpp | 3 | ||||
-rw-r--r-- | platform/qt/src/run_loop.cpp | 3 | ||||
-rw-r--r-- | test/util/thread.test.cpp | 54 |
7 files changed, 174 insertions, 50 deletions
diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp index acbea80273..a4c34ec540 100644 --- a/include/mbgl/util/run_loop.hpp +++ b/include/mbgl/util/run_loop.hpp @@ -12,6 +12,8 @@ #include <utility> #include <queue> #include <mutex> +#include <condition_variable> +#include <cassert> namespace mbgl { namespace util { @@ -48,6 +50,7 @@ public: void removeWatch(int fd); // Invoke fn(args...) on this RunLoop. + // This call is threadsafe. template <class Fn, class... Args> void invoke(Fn&& fn, Args&&... args) { std::shared_ptr<WorkTask> task = WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...); @@ -55,6 +58,7 @@ public: } // Post the cancellable work fn(args...) to this RunLoop. + // This call is threadsafe. template <class Fn, class... Args> std::unique_ptr<AsyncRequest> invokeCancellable(Fn&& fn, Args&&... args) { @@ -62,39 +66,136 @@ public: push(task); return std::make_unique<WorkRequest>(task); } - + + // This call is threadsafe. void schedule(std::weak_ptr<Mailbox> mailbox) override { invoke([mailbox] () { Mailbox::maybeReceive(mailbox); }); } + // Instructs the RunLoop to stop processing tasks until a call to resume() follows. This call + // blocks until the RunLoop finished processing the current task. + // This call is threadsafe. + void pause() { + // A RunLoop can't pause itself. + assert(static_cast<RunLoop*>(Scheduler::GetCurrent()) != this); + + std::unique_lock<std::mutex> lock(mutex); + if (pausing) { + // Do not attempt to pause again when we've already started the pause. + return; + } + + // Indicates our intent to pause this RunLoop. Just having pausing set to true doesn't mean + // the RunLoop is paused; e.g. it might still be processing a task. We'll have to wait until + // the process() function has confirmed that it stopped running. We do this via the running + // variable and wait until it's set to false. + pausing = true; + + // If we're at this point, we assume that there is no current request to pause the loop that + // is already in progress, so running must be true. + assert(running); + lock.unlock(); + + // Wake up the RunLoop so that it'll run process(), which eventually sets running to false. + // We have to invoke this because otherwise, running could never be set to false, e.g. when + // this RunLoop doesn't have any items to process. + wake(); + + // Wait until the RunLoop stops running. Once this condition is triggered, we know that the + // RunLoop has stopped processing the current item + lock.lock(); + cv.wait(lock, [this] { return !running; }); + } + + // Resumes processing items in this RunLoop. This call blocks until the RunLoop finished + // processing the current task if it has not yet paused. + // This call is threadsafe. + void resume() { + std::unique_lock<std::mutex> lock(mutex); + if (!pausing || resuming) { + // Do not attempt to resume again when we've already started to resume, or when the + // RunLoop isn't paused + return; + } + + // Indicates the intent to resume this RunLoop. This is required to prevent two concurrent + // resume() calls from two different threads from interleaving. E.g. one resume() call could + // wait until running is set to false, then we reset to true immediately after, which leaves + // the second resume() call waiting until it gets set to false indefinitely. + resuming = true; + + // Make sure that a previous call to pause() (e.g. on another thread) has completed. + cv.wait(lock, [this] { return !running; }); + + // We can now reverse the pause actions. + pausing = false; + running = true; + resuming = false; + + lock.unlock(); + + // Finally, make sure that we resume processing the items we've collected during the pause. + wake(); + } + class Impl; private: MBGL_STORE_THREAD(tid) - using Queue = std::queue<std::shared_ptr<WorkTask>>; + // Wakes up the RunLoop so that it starts processing items in the queue. + void wake(); - void push(std::shared_ptr<WorkTask>); + // Adds a WorkTask to the queue, and wakes it up. + void push(std::shared_ptr<WorkTask> task) { + { + std::lock_guard<std::mutex> lock(mutex); + queue.push(std::move(task)); - void withMutex(std::function<void()>&& fn) { - std::lock_guard<std::mutex> lock(mutex); - fn(); + if (pausing) { + // Do not attempt to wake up the RunLoop if we already know it's paused + return; + } + } + + wake(); } + // Process items in the queue. This should be called from the thread that the RunLoop runs on. void process() { - Queue queue_; - withMutex([&] { queue_.swap(queue); }); + std::shared_ptr<WorkTask> task; + std::unique_lock<std::mutex> lock(mutex); + while (!pausing && !queue.empty()) { + task = std::move(queue.front()); + queue.pop(); + lock.unlock(); + + if (task) { + (*task)(); + } + + lock.lock(); + } + + if (pausing && running) { + // Notify the pause()/resume() calls that we've finished processing the current task and + // are now truly pausing. + running = false; - while (!queue_.empty()) { - (*(queue_.front()))(); - queue_.pop(); + // Use notify_all() instead of notify_one() in case we have both a pause() and a resume() + // call waiting for this condition variable. + cv.notify_all(); } } - Queue queue; std::mutex mutex; + std::condition_variable cv; + bool pausing = false; + bool running = true; + bool resuming = false; + std::queue<std::shared_ptr<WorkTask>> queue; std::unique_ptr<Impl> impl; }; diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp index e3bd18143d..198e4b063a 100644 --- a/include/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -61,9 +61,7 @@ public: } ~Thread() override { - if (paused) { - resume(); - } + resume(); std::promise<void> joinable; @@ -94,34 +92,12 @@ public: // sent to a paused `Object` will be queued and only processed after // `resume()` is called. void pause() { - MBGL_VERIFY_THREAD(tid); - - assert(!paused); - - paused = std::make_unique<std::promise<void>>(); - resumed = std::make_unique<std::promise<void>>(); - - auto pausing = paused->get_future(); - - loop->invoke([this] { - auto resuming = resumed->get_future(); - paused->set_value(); - resuming.get(); - }); - - pausing.get(); + loop->pause(); } // Resumes the `Object` thread previously paused by `pause()`. void resume() { - MBGL_VERIFY_THREAD(tid); - - assert(paused); - - resumed->set_value(); - - resumed.reset(); - paused.reset(); + loop->resume(); } private: @@ -134,9 +110,6 @@ private: std::thread thread; std::unique_ptr<Actor<Object>> object; - std::unique_ptr<std::promise<void>> paused; - std::unique_ptr<std::promise<void>> resumed; - util::RunLoop* loop = nullptr; }; diff --git a/platform/android/src/run_loop.cpp b/platform/android/src/run_loop.cpp index dff7d1d984..34366d836a 100644 --- a/platform/android/src/run_loop.cpp +++ b/platform/android/src/run_loop.cpp @@ -216,8 +216,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() { return Get()->impl.get(); } -void RunLoop::push(std::shared_ptr<WorkTask> task) { - withMutex([&] { queue.push(std::move(task)); }); +void RunLoop::wake() { impl->wake(); } diff --git a/platform/darwin/src/run_loop.cpp b/platform/darwin/src/run_loop.cpp index 2ba8f8415b..0778b004e5 100644 --- a/platform/darwin/src/run_loop.cpp +++ b/platform/darwin/src/run_loop.cpp @@ -29,8 +29,7 @@ RunLoop::~RunLoop() { Scheduler::SetCurrent(nullptr); } -void RunLoop::push(std::shared_ptr<WorkTask> task) { - withMutex([&] { queue.push(std::move(task)); }); +void RunLoop::wake() { impl->async->send(); } diff --git a/platform/default/run_loop.cpp b/platform/default/run_loop.cpp index 6375dba78e..868ee72114 100644 --- a/platform/default/run_loop.cpp +++ b/platform/default/run_loop.cpp @@ -129,8 +129,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() { return Get()->impl->loop; } -void RunLoop::push(std::shared_ptr<WorkTask> task) { - withMutex([&] { queue.push(std::move(task)); }); +void RunLoop::wake() { impl->async->send(); } diff --git a/platform/qt/src/run_loop.cpp b/platform/qt/src/run_loop.cpp index 71ea19032a..c25243c8e7 100644 --- a/platform/qt/src/run_loop.cpp +++ b/platform/qt/src/run_loop.cpp @@ -52,8 +52,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() { return nullptr; } -void RunLoop::push(std::shared_ptr<WorkTask> task) { - withMutex([&] { queue.push(task); }); +void RunLoop::wake() { impl->async->send(); } diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp index 76fb5ce3f0..c74dae2ec6 100644 --- a/test/util/thread.test.cpp +++ b/test/util/thread.test.cpp @@ -275,3 +275,57 @@ TEST(Thread, PauseResume) { thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); loop.run(); } + +TEST(Thread, MultiplePauseResumeCalls) { + RunLoop loop; + + Thread<TestWorker> thread("Test"); + + // Test if multiple pause calls work + thread.pause(); + thread.pause(); + thread.resume(); + thread.resume(); + + thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + loop.run(); +} + +TEST(Thread, TestImmediatePause) { + using namespace std::chrono_literals; + + RunLoop loop; + + Thread<TestWorker> thread("Test"); + + std::promise<void> resume; + auto resumed = resume.get_future(); + + std::atomic<bool> ending { false }; + + thread.pause(); + thread.actor().invoke(&TestWorker::send, [&] { + resume.set_value(); + + // Make sure we have some time to process the pause() call. + std::this_thread::sleep_for(300ms); + }); + + // We're scheduling a second action right after, before calling pause. Ensure that it never + // gets called. + thread.actor().invoke(&TestWorker::send, [&] { + EXPECT_TRUE(ending) << "callback called without ending"; + }); + + thread.resume(); + resumed.get(); + thread.pause(); + + Timer timer; + timer.start(600ms, Duration::zero(), [&] { + ending = true; + loop.stop(); + }); + + loop.run(); +} |