summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Käfer <mail@kkaefer.com>2017-10-10 15:39:07 +0200
committerKonstantin Käfer <mail@kkaefer.com>2017-10-10 19:34:36 +0200
commit329282fa574670c54f580ddebbc3b9f97161171b (patch)
tree144eb1312e8e605e8225f0e5e2355c9c7e0b6a9c
parentb820a3031885e6b2df987dc71d1b4562b7ca25f2 (diff)
downloadqtlocation-mapboxgl-upstream/prioritize-thread-pause-resume.tar.gz
[core] Allow pausing RunLoopsupstream/prioritize-thread-pause-resume
Previously we had the capability to pause on the Thread object, which used regular tasks to pause the RunLoop by blocking it. Instead, we can now pause the entire RunLoop and prevent it from processing items. This means that a pause() call is no longer treated as a regular task. Instead, it will take precedence over scheduled tasks, which means that a pause() call takes effect much more instantly since the RunLoop doesn't process the queue before the pause() task. Having pause() take effect much quicker is useful for situations where stopping the loop quickly is important, e.g. when the application goes to the background on iOS, and we have to stop processing tasks that access the file system. It also reduces the length of the blocking pause() call since the time until the RunLoop is paused is shortened.
-rw-r--r--include/mbgl/util/run_loop.hpp125
-rw-r--r--include/mbgl/util/thread.hpp33
-rw-r--r--platform/android/src/run_loop.cpp3
-rw-r--r--platform/darwin/src/run_loop.cpp3
-rw-r--r--platform/default/run_loop.cpp3
-rw-r--r--platform/qt/src/run_loop.cpp3
-rw-r--r--test/util/thread.test.cpp54
7 files changed, 174 insertions, 50 deletions
diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp
index acbea80273..a4c34ec540 100644
--- a/include/mbgl/util/run_loop.hpp
+++ b/include/mbgl/util/run_loop.hpp
@@ -12,6 +12,8 @@
#include <utility>
#include <queue>
#include <mutex>
+#include <condition_variable>
+#include <cassert>
namespace mbgl {
namespace util {
@@ -48,6 +50,7 @@ public:
void removeWatch(int fd);
// Invoke fn(args...) on this RunLoop.
+ // This call is threadsafe.
template <class Fn, class... Args>
void invoke(Fn&& fn, Args&&... args) {
std::shared_ptr<WorkTask> task = WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...);
@@ -55,6 +58,7 @@ public:
}
// Post the cancellable work fn(args...) to this RunLoop.
+ // This call is threadsafe.
template <class Fn, class... Args>
std::unique_ptr<AsyncRequest>
invokeCancellable(Fn&& fn, Args&&... args) {
@@ -62,39 +66,136 @@ public:
push(task);
return std::make_unique<WorkRequest>(task);
}
-
+
+ // This call is threadsafe.
void schedule(std::weak_ptr<Mailbox> mailbox) override {
invoke([mailbox] () {
Mailbox::maybeReceive(mailbox);
});
}
+ // Instructs the RunLoop to stop processing tasks until a call to resume() follows. This call
+ // blocks until the RunLoop finished processing the current task.
+ // This call is threadsafe.
+ void pause() {
+ // A RunLoop can't pause itself.
+ assert(static_cast<RunLoop*>(Scheduler::GetCurrent()) != this);
+
+ std::unique_lock<std::mutex> lock(mutex);
+ if (pausing) {
+ // Do not attempt to pause again when we've already started the pause.
+ return;
+ }
+
+ // Indicates our intent to pause this RunLoop. Just having pausing set to true doesn't mean
+ // the RunLoop is paused; e.g. it might still be processing a task. We'll have to wait until
+ // the process() function has confirmed that it stopped running. We do this via the running
+ // variable and wait until it's set to false.
+ pausing = true;
+
+ // If we're at this point, we assume that there is no current request to pause the loop that
+ // is already in progress, so running must be true.
+ assert(running);
+ lock.unlock();
+
+ // Wake up the RunLoop so that it'll run process(), which eventually sets running to false.
+ // We have to invoke this because otherwise, running could never be set to false, e.g. when
+ // this RunLoop doesn't have any items to process.
+ wake();
+
+ // Wait until the RunLoop stops running. Once this condition is triggered, we know that the
+ // RunLoop has stopped processing the current item
+ lock.lock();
+ cv.wait(lock, [this] { return !running; });
+ }
+
+ // Resumes processing items in this RunLoop. This call blocks until the RunLoop finished
+ // processing the current task if it has not yet paused.
+ // This call is threadsafe.
+ void resume() {
+ std::unique_lock<std::mutex> lock(mutex);
+ if (!pausing || resuming) {
+ // Do not attempt to resume again when we've already started to resume, or when the
+ // RunLoop isn't paused
+ return;
+ }
+
+ // Indicates the intent to resume this RunLoop. This is required to prevent two concurrent
+ // resume() calls from two different threads from interleaving. E.g. one resume() call could
+ // wait until running is set to false, then we reset to true immediately after, which leaves
+ // the second resume() call waiting until it gets set to false indefinitely.
+ resuming = true;
+
+ // Make sure that a previous call to pause() (e.g. on another thread) has completed.
+ cv.wait(lock, [this] { return !running; });
+
+ // We can now reverse the pause actions.
+ pausing = false;
+ running = true;
+ resuming = false;
+
+ lock.unlock();
+
+ // Finally, make sure that we resume processing the items we've collected during the pause.
+ wake();
+ }
+
class Impl;
private:
MBGL_STORE_THREAD(tid)
- using Queue = std::queue<std::shared_ptr<WorkTask>>;
+ // Wakes up the RunLoop so that it starts processing items in the queue.
+ void wake();
- void push(std::shared_ptr<WorkTask>);
+ // Adds a WorkTask to the queue, and wakes it up.
+ void push(std::shared_ptr<WorkTask> task) {
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+ queue.push(std::move(task));
- void withMutex(std::function<void()>&& fn) {
- std::lock_guard<std::mutex> lock(mutex);
- fn();
+ if (pausing) {
+ // Do not attempt to wake up the RunLoop if we already know it's paused
+ return;
+ }
+ }
+
+ wake();
}
+ // Process items in the queue. This should be called from the thread that the RunLoop runs on.
void process() {
- Queue queue_;
- withMutex([&] { queue_.swap(queue); });
+ std::shared_ptr<WorkTask> task;
+ std::unique_lock<std::mutex> lock(mutex);
+ while (!pausing && !queue.empty()) {
+ task = std::move(queue.front());
+ queue.pop();
+ lock.unlock();
+
+ if (task) {
+ (*task)();
+ }
+
+ lock.lock();
+ }
+
+ if (pausing && running) {
+ // Notify the pause()/resume() calls that we've finished processing the current task and
+ // are now truly pausing.
+ running = false;
- while (!queue_.empty()) {
- (*(queue_.front()))();
- queue_.pop();
+ // Use notify_all() instead of notify_one() in case we have both a pause() and a resume()
+ // call waiting for this condition variable.
+ cv.notify_all();
}
}
- Queue queue;
std::mutex mutex;
+ std::condition_variable cv;
+ bool pausing = false;
+ bool running = true;
+ bool resuming = false;
+ std::queue<std::shared_ptr<WorkTask>> queue;
std::unique_ptr<Impl> impl;
};
diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp
index e3bd18143d..198e4b063a 100644
--- a/include/mbgl/util/thread.hpp
+++ b/include/mbgl/util/thread.hpp
@@ -61,9 +61,7 @@ public:
}
~Thread() override {
- if (paused) {
- resume();
- }
+ resume();
std::promise<void> joinable;
@@ -94,34 +92,12 @@ public:
// sent to a paused `Object` will be queued and only processed after
// `resume()` is called.
void pause() {
- MBGL_VERIFY_THREAD(tid);
-
- assert(!paused);
-
- paused = std::make_unique<std::promise<void>>();
- resumed = std::make_unique<std::promise<void>>();
-
- auto pausing = paused->get_future();
-
- loop->invoke([this] {
- auto resuming = resumed->get_future();
- paused->set_value();
- resuming.get();
- });
-
- pausing.get();
+ loop->pause();
}
// Resumes the `Object` thread previously paused by `pause()`.
void resume() {
- MBGL_VERIFY_THREAD(tid);
-
- assert(paused);
-
- resumed->set_value();
-
- resumed.reset();
- paused.reset();
+ loop->resume();
}
private:
@@ -134,9 +110,6 @@ private:
std::thread thread;
std::unique_ptr<Actor<Object>> object;
- std::unique_ptr<std::promise<void>> paused;
- std::unique_ptr<std::promise<void>> resumed;
-
util::RunLoop* loop = nullptr;
};
diff --git a/platform/android/src/run_loop.cpp b/platform/android/src/run_loop.cpp
index dff7d1d984..34366d836a 100644
--- a/platform/android/src/run_loop.cpp
+++ b/platform/android/src/run_loop.cpp
@@ -216,8 +216,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() {
return Get()->impl.get();
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(std::move(task)); });
+void RunLoop::wake() {
impl->wake();
}
diff --git a/platform/darwin/src/run_loop.cpp b/platform/darwin/src/run_loop.cpp
index 2ba8f8415b..0778b004e5 100644
--- a/platform/darwin/src/run_loop.cpp
+++ b/platform/darwin/src/run_loop.cpp
@@ -29,8 +29,7 @@ RunLoop::~RunLoop() {
Scheduler::SetCurrent(nullptr);
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(std::move(task)); });
+void RunLoop::wake() {
impl->async->send();
}
diff --git a/platform/default/run_loop.cpp b/platform/default/run_loop.cpp
index 6375dba78e..868ee72114 100644
--- a/platform/default/run_loop.cpp
+++ b/platform/default/run_loop.cpp
@@ -129,8 +129,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() {
return Get()->impl->loop;
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(std::move(task)); });
+void RunLoop::wake() {
impl->async->send();
}
diff --git a/platform/qt/src/run_loop.cpp b/platform/qt/src/run_loop.cpp
index 71ea19032a..c25243c8e7 100644
--- a/platform/qt/src/run_loop.cpp
+++ b/platform/qt/src/run_loop.cpp
@@ -52,8 +52,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() {
return nullptr;
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(task); });
+void RunLoop::wake() {
impl->async->send();
}
diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp
index 76fb5ce3f0..c74dae2ec6 100644
--- a/test/util/thread.test.cpp
+++ b/test/util/thread.test.cpp
@@ -275,3 +275,57 @@ TEST(Thread, PauseResume) {
thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); });
loop.run();
}
+
+TEST(Thread, MultiplePauseResumeCalls) {
+ RunLoop loop;
+
+ Thread<TestWorker> thread("Test");
+
+ // Test if multiple pause calls work
+ thread.pause();
+ thread.pause();
+ thread.resume();
+ thread.resume();
+
+ thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); });
+ loop.run();
+}
+
+TEST(Thread, TestImmediatePause) {
+ using namespace std::chrono_literals;
+
+ RunLoop loop;
+
+ Thread<TestWorker> thread("Test");
+
+ std::promise<void> resume;
+ auto resumed = resume.get_future();
+
+ std::atomic<bool> ending { false };
+
+ thread.pause();
+ thread.actor().invoke(&TestWorker::send, [&] {
+ resume.set_value();
+
+ // Make sure we have some time to process the pause() call.
+ std::this_thread::sleep_for(300ms);
+ });
+
+ // We're scheduling a second action right after, before calling pause. Ensure that it never
+ // gets called.
+ thread.actor().invoke(&TestWorker::send, [&] {
+ EXPECT_TRUE(ending) << "callback called without ending";
+ });
+
+ thread.resume();
+ resumed.get();
+ thread.pause();
+
+ Timer timer;
+ timer.start(600ms, Duration::zero(), [&] {
+ ending = true;
+ loop.stop();
+ });
+
+ loop.run();
+}