summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/mbgl/util/run_loop.hpp125
-rw-r--r--include/mbgl/util/thread.hpp33
-rw-r--r--platform/android/src/run_loop.cpp3
-rw-r--r--platform/darwin/src/run_loop.cpp3
-rw-r--r--platform/default/run_loop.cpp3
-rw-r--r--platform/qt/src/run_loop.cpp3
-rw-r--r--test/util/thread.test.cpp54
7 files changed, 174 insertions, 50 deletions
diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp
index acbea80273..a4c34ec540 100644
--- a/include/mbgl/util/run_loop.hpp
+++ b/include/mbgl/util/run_loop.hpp
@@ -12,6 +12,8 @@
#include <utility>
#include <queue>
#include <mutex>
+#include <condition_variable>
+#include <cassert>
namespace mbgl {
namespace util {
@@ -48,6 +50,7 @@ public:
void removeWatch(int fd);
// Invoke fn(args...) on this RunLoop.
+ // This call is threadsafe.
template <class Fn, class... Args>
void invoke(Fn&& fn, Args&&... args) {
std::shared_ptr<WorkTask> task = WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...);
@@ -55,6 +58,7 @@ public:
}
// Post the cancellable work fn(args...) to this RunLoop.
+ // This call is threadsafe.
template <class Fn, class... Args>
std::unique_ptr<AsyncRequest>
invokeCancellable(Fn&& fn, Args&&... args) {
@@ -62,39 +66,136 @@ public:
push(task);
return std::make_unique<WorkRequest>(task);
}
-
+
+ // This call is threadsafe.
void schedule(std::weak_ptr<Mailbox> mailbox) override {
invoke([mailbox] () {
Mailbox::maybeReceive(mailbox);
});
}
+ // Instructs the RunLoop to stop processing tasks until a call to resume() follows. This call
+ // blocks until the RunLoop finished processing the current task.
+ // This call is threadsafe.
+ void pause() {
+ // A RunLoop can't pause itself.
+ assert(static_cast<RunLoop*>(Scheduler::GetCurrent()) != this);
+
+ std::unique_lock<std::mutex> lock(mutex);
+ if (pausing) {
+ // Do not attempt to pause again when we've already started the pause.
+ return;
+ }
+
+ // Indicates our intent to pause this RunLoop. Just having pausing set to true doesn't mean
+ // the RunLoop is paused; e.g. it might still be processing a task. We'll have to wait until
+ // the process() function has confirmed that it stopped running. We do this via the running
+ // variable and wait until it's set to false.
+ pausing = true;
+
+ // If we're at this point, we assume that there is no current request to pause the loop that
+ // is already in progress, so running must be true.
+ assert(running);
+ lock.unlock();
+
+ // Wake up the RunLoop so that it'll run process(), which eventually sets running to false.
+ // We have to invoke this because otherwise, running could never be set to false, e.g. when
+ // this RunLoop doesn't have any items to process.
+ wake();
+
+ // Wait until the RunLoop stops running. Once this condition is triggered, we know that the
+ // RunLoop has stopped processing the current item
+ lock.lock();
+ cv.wait(lock, [this] { return !running; });
+ }
+
+ // Resumes processing items in this RunLoop. This call blocks until the RunLoop finished
+ // processing the current task if it has not yet paused.
+ // This call is threadsafe.
+ void resume() {
+ std::unique_lock<std::mutex> lock(mutex);
+ if (!pausing || resuming) {
+ // Do not attempt to resume again when we've already started to resume, or when the
+ // RunLoop isn't paused
+ return;
+ }
+
+ // Indicates the intent to resume this RunLoop. This is required to prevent two concurrent
+ // resume() calls from two different threads from interleaving. E.g. one resume() call could
+ // wait until running is set to false, then we reset to true immediately after, which leaves
+ // the second resume() call waiting until it gets set to false indefinitely.
+ resuming = true;
+
+ // Make sure that a previous call to pause() (e.g. on another thread) has completed.
+ cv.wait(lock, [this] { return !running; });
+
+ // We can now reverse the pause actions.
+ pausing = false;
+ running = true;
+ resuming = false;
+
+ lock.unlock();
+
+ // Finally, make sure that we resume processing the items we've collected during the pause.
+ wake();
+ }
+
class Impl;
private:
MBGL_STORE_THREAD(tid)
- using Queue = std::queue<std::shared_ptr<WorkTask>>;
+ // Wakes up the RunLoop so that it starts processing items in the queue.
+ void wake();
- void push(std::shared_ptr<WorkTask>);
+ // Adds a WorkTask to the queue, and wakes it up.
+ void push(std::shared_ptr<WorkTask> task) {
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+ queue.push(std::move(task));
- void withMutex(std::function<void()>&& fn) {
- std::lock_guard<std::mutex> lock(mutex);
- fn();
+ if (pausing) {
+ // Do not attempt to wake up the RunLoop if we already know it's paused
+ return;
+ }
+ }
+
+ wake();
}
+ // Process items in the queue. This should be called from the thread that the RunLoop runs on.
void process() {
- Queue queue_;
- withMutex([&] { queue_.swap(queue); });
+ std::shared_ptr<WorkTask> task;
+ std::unique_lock<std::mutex> lock(mutex);
+ while (!pausing && !queue.empty()) {
+ task = std::move(queue.front());
+ queue.pop();
+ lock.unlock();
+
+ if (task) {
+ (*task)();
+ }
+
+ lock.lock();
+ }
+
+ if (pausing && running) {
+ // Notify the pause()/resume() calls that we've finished processing the current task and
+ // are now truly pausing.
+ running = false;
- while (!queue_.empty()) {
- (*(queue_.front()))();
- queue_.pop();
+ // Use notify_all() instead of notify_one() in case we have both a pause() and a resume()
+ // call waiting for this condition variable.
+ cv.notify_all();
}
}
- Queue queue;
std::mutex mutex;
+ std::condition_variable cv;
+ bool pausing = false;
+ bool running = true;
+ bool resuming = false;
+ std::queue<std::shared_ptr<WorkTask>> queue;
std::unique_ptr<Impl> impl;
};
diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp
index e3bd18143d..198e4b063a 100644
--- a/include/mbgl/util/thread.hpp
+++ b/include/mbgl/util/thread.hpp
@@ -61,9 +61,7 @@ public:
}
~Thread() override {
- if (paused) {
- resume();
- }
+ resume();
std::promise<void> joinable;
@@ -94,34 +92,12 @@ public:
// sent to a paused `Object` will be queued and only processed after
// `resume()` is called.
void pause() {
- MBGL_VERIFY_THREAD(tid);
-
- assert(!paused);
-
- paused = std::make_unique<std::promise<void>>();
- resumed = std::make_unique<std::promise<void>>();
-
- auto pausing = paused->get_future();
-
- loop->invoke([this] {
- auto resuming = resumed->get_future();
- paused->set_value();
- resuming.get();
- });
-
- pausing.get();
+ loop->pause();
}
// Resumes the `Object` thread previously paused by `pause()`.
void resume() {
- MBGL_VERIFY_THREAD(tid);
-
- assert(paused);
-
- resumed->set_value();
-
- resumed.reset();
- paused.reset();
+ loop->resume();
}
private:
@@ -134,9 +110,6 @@ private:
std::thread thread;
std::unique_ptr<Actor<Object>> object;
- std::unique_ptr<std::promise<void>> paused;
- std::unique_ptr<std::promise<void>> resumed;
-
util::RunLoop* loop = nullptr;
};
diff --git a/platform/android/src/run_loop.cpp b/platform/android/src/run_loop.cpp
index dff7d1d984..34366d836a 100644
--- a/platform/android/src/run_loop.cpp
+++ b/platform/android/src/run_loop.cpp
@@ -216,8 +216,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() {
return Get()->impl.get();
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(std::move(task)); });
+void RunLoop::wake() {
impl->wake();
}
diff --git a/platform/darwin/src/run_loop.cpp b/platform/darwin/src/run_loop.cpp
index 2ba8f8415b..0778b004e5 100644
--- a/platform/darwin/src/run_loop.cpp
+++ b/platform/darwin/src/run_loop.cpp
@@ -29,8 +29,7 @@ RunLoop::~RunLoop() {
Scheduler::SetCurrent(nullptr);
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(std::move(task)); });
+void RunLoop::wake() {
impl->async->send();
}
diff --git a/platform/default/run_loop.cpp b/platform/default/run_loop.cpp
index 6375dba78e..868ee72114 100644
--- a/platform/default/run_loop.cpp
+++ b/platform/default/run_loop.cpp
@@ -129,8 +129,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() {
return Get()->impl->loop;
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(std::move(task)); });
+void RunLoop::wake() {
impl->async->send();
}
diff --git a/platform/qt/src/run_loop.cpp b/platform/qt/src/run_loop.cpp
index 71ea19032a..c25243c8e7 100644
--- a/platform/qt/src/run_loop.cpp
+++ b/platform/qt/src/run_loop.cpp
@@ -52,8 +52,7 @@ LOOP_HANDLE RunLoop::getLoopHandle() {
return nullptr;
}
-void RunLoop::push(std::shared_ptr<WorkTask> task) {
- withMutex([&] { queue.push(task); });
+void RunLoop::wake() {
impl->async->send();
}
diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp
index 76fb5ce3f0..c74dae2ec6 100644
--- a/test/util/thread.test.cpp
+++ b/test/util/thread.test.cpp
@@ -275,3 +275,57 @@ TEST(Thread, PauseResume) {
thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); });
loop.run();
}
+
+TEST(Thread, MultiplePauseResumeCalls) {
+ RunLoop loop;
+
+ Thread<TestWorker> thread("Test");
+
+ // Test if multiple pause calls work
+ thread.pause();
+ thread.pause();
+ thread.resume();
+ thread.resume();
+
+ thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); });
+ loop.run();
+}
+
+TEST(Thread, TestImmediatePause) {
+ using namespace std::chrono_literals;
+
+ RunLoop loop;
+
+ Thread<TestWorker> thread("Test");
+
+ std::promise<void> resume;
+ auto resumed = resume.get_future();
+
+ std::atomic<bool> ending { false };
+
+ thread.pause();
+ thread.actor().invoke(&TestWorker::send, [&] {
+ resume.set_value();
+
+ // Make sure we have some time to process the pause() call.
+ std::this_thread::sleep_for(300ms);
+ });
+
+ // We're scheduling a second action right after, before calling pause. Ensure that it never
+ // gets called.
+ thread.actor().invoke(&TestWorker::send, [&] {
+ EXPECT_TRUE(ending) << "callback called without ending";
+ });
+
+ thread.resume();
+ resumed.get();
+ thread.pause();
+
+ Timer timer;
+ timer.start(600ms, Duration::zero(), [&] {
+ ending = true;
+ loop.stop();
+ });
+
+ loop.run();
+}