summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com>2019-10-04 18:03:49 +0300
committerMikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com>2019-10-07 12:37:19 +0300
commit7dd1d73db761cfe55ac13505dc85e9b279f98f03 (patch)
tree98d635976da76804c15d69281c52be04d0bca48b
parentbd283fc1be2f90ce02b37617411a0ce4246d898e (diff)
downloadqtlocation-mapboxgl-upstream/mikhail_lightweight_thread.tar.gz
[core] Introduce SingleThreadSchedulerupstream/mikhail_lightweight_thread
`SingleThreadScheduler` implements `Scheduler` interface using a lightweight event loop on a single thread. Therefore, all scheduled tasks are guaranteed to execute consequently. `ThreadPoolScheduler` (renamed from `ThreadPool`) schedules tasks on an arbitrary thread from the contained thread pool. Some of the scheduled tasks might be executed in parallel.
-rw-r--r--src/mbgl/actor/scheduler.cpp2
-rw-r--r--src/mbgl/util/thread_pool.cpp112
-rw-r--r--src/mbgl/util/thread_pool.hpp43
3 files changed, 109 insertions, 48 deletions
diff --git a/src/mbgl/actor/scheduler.cpp b/src/mbgl/actor/scheduler.cpp
index cb0c7728ec..d8911747d6 100644
--- a/src/mbgl/actor/scheduler.cpp
+++ b/src/mbgl/actor/scheduler.cpp
@@ -2,6 +2,8 @@
#include <mbgl/util/thread_local.hpp>
#include <mbgl/util/thread_pool.hpp>
+#include <mutex>
+
namespace mbgl {
util::ThreadLocal<Scheduler> g_currentScheduler;
diff --git a/src/mbgl/util/thread_pool.cpp b/src/mbgl/util/thread_pool.cpp
index d8df0cd575..cb80591759 100644
--- a/src/mbgl/util/thread_pool.cpp
+++ b/src/mbgl/util/thread_pool.cpp
@@ -4,57 +4,97 @@
#include <mbgl/util/string.hpp>
#include <mbgl/platform/thread.hpp>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+
namespace mbgl {
-ThreadPool::ThreadPool(std::size_t count) {
- threads.reserve(count);
+class SingleThreadScheduler::Impl {
+public:
+ void schedule(std::function<void()> fn);
+ void terminate();
+ static std::shared_ptr<Impl> create() { return std::make_shared<Impl>(); }
- for (std::size_t i = 0; i < count; ++i) {
- threads.emplace_back([this, i]() {
- platform::setCurrentThreadName(std::string{ "Worker " } + util::toString(i + 1));
- platform::attachThread();
-
- while (true) {
- std::unique_lock<std::mutex> lock(mutex);
-
- cv.wait(lock, [this] {
- return !queue.empty() || terminate;
- });
-
- if (terminate) {
- platform::detachThread();
- return;
- }
-
- auto function = std::move(queue.front());
- queue.pop();
- lock.unlock();
- if (function) function();
- }
- });
+ std::queue<std::function<void()>> queue;
+ std::mutex mutex;
+ std::condition_variable cv;
+ std::atomic_bool terminated{false};
+};
+
+void SingleThreadScheduler::Impl::schedule(std::function<void()> fn) {
+ assert(fn);
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+ queue.push(std::move(fn));
}
+
+ cv.notify_one();
}
-ThreadPool::~ThreadPool() {
- {
+void SingleThreadScheduler::Impl::terminate() {
+ if (!terminated) {
std::lock_guard<std::mutex> lock(mutex);
- terminate = true;
+ terminated = true;
+ cv.notify_all();
}
+}
+
+SingleThreadScheduler::SingleThreadScheduler(std::shared_ptr<SingleThreadScheduler::Impl> impl_,
+ optional<std::string> name_)
+ : impl(std::move(impl_)), thread([this, name = std::move(name_)]() {
+ if (name) {
+ platform::setCurrentThreadName(*name);
+ }
+ platform::attachThread();
+
+ while (true) {
+ std::unique_lock<std::mutex> lock(impl->mutex);
+
+ impl->cv.wait(lock, [this] { return !impl->queue.empty() || impl->terminated; });
- cv.notify_all();
+ if (impl->terminated) {
+ platform::detachThread();
+ return;
+ }
- for (auto& thread : threads) {
- thread.join();
+ auto function = std::move(impl->queue.front());
+ impl->queue.pop();
+ lock.unlock();
+ if (function) function();
+ }
+ }) {
+ assert(impl);
+}
+
+SingleThreadScheduler::SingleThreadScheduler(optional<std::string> name_)
+ : SingleThreadScheduler(Impl::create(), std::move(name_)) {}
+
+SingleThreadScheduler::~SingleThreadScheduler() {
+ if (impl) {
+ impl->terminate();
+ if (thread.joinable()) thread.join();
}
}
-void ThreadPool::schedule(std::function<void()> fn) {
- {
- std::lock_guard<std::mutex> lock(mutex);
- queue.push(std::move(fn));
+void SingleThreadScheduler::schedule(std::function<void()> fn) {
+ impl->schedule(std::move(fn));
+}
+
+ThreadPoolScheduler::ThreadPoolScheduler(std::size_t count) : sharedImpl(SingleThreadScheduler::Impl::create()) {
+ threads.reserve(count);
+ for (std::size_t i = 0; i < count; ++i) {
+ threads.emplace_back(sharedImpl, std::string{"Worker "} + util::toString(i + 1));
}
+}
- cv.notify_one();
+ThreadPoolScheduler::~ThreadPoolScheduler() {
+ sharedImpl->terminate();
+}
+
+void ThreadPoolScheduler::schedule(std::function<void()> fn) {
+ sharedImpl->schedule(std::move(fn));
}
} // namespace mbgl
diff --git a/src/mbgl/util/thread_pool.hpp b/src/mbgl/util/thread_pool.hpp
index 96fc13bda5..8ec51e3118 100644
--- a/src/mbgl/util/thread_pool.hpp
+++ b/src/mbgl/util/thread_pool.hpp
@@ -1,28 +1,47 @@
#pragma once
-#include <mbgl/actor/mailbox.hpp>
#include <mbgl/actor/scheduler.hpp>
+#include <mbgl/util/optional.hpp>
-#include <condition_variable>
-#include <mutex>
-#include <queue>
+#include <memory>
#include <thread>
+#include <vector>
namespace mbgl {
-class ThreadPool final : public Scheduler {
+// `SingleThreadScheduler` implements `Scheduler` interface using
+// a lightweight event loop on a single thread. Therefore, all scheduled
+// tasks are guaranteed to execute consequently.
+class SingleThreadScheduler final : public Scheduler {
public:
- explicit ThreadPool(std::size_t count);
- ~ThreadPool() override;
+ explicit SingleThreadScheduler(optional<std::string> name = nullopt);
+ ~SingleThreadScheduler() override;
+ SingleThreadScheduler(SingleThreadScheduler&&) = default;
void schedule(std::function<void()>) override;
+ class Impl;
+ SingleThreadScheduler(std::shared_ptr<Impl>, optional<std::string> name);
+
private:
- std::vector<std::thread> threads;
- std::queue<std::function<void()>> queue;
- std::mutex mutex;
- std::condition_variable cv;
- bool terminate{ false };
+ std::shared_ptr<Impl> impl;
+ std::thread thread;
};
+// `ThreadPoolScheduler` schedules tasks on an arbitrary thread from the contained
+// thread pool. Some of the scheduled tasks might be executed in parallel.
+class ThreadPoolScheduler final : public Scheduler {
+public:
+ explicit ThreadPoolScheduler(std::size_t count);
+ ~ThreadPoolScheduler() override;
+
+ void schedule(std::function<void()>) override;
+
+private:
+ std::shared_ptr<SingleThreadScheduler::Impl> sharedImpl;
+ std::vector<SingleThreadScheduler> threads;
+};
+
+using ThreadPool = ThreadPoolScheduler;
+
} // namespace mbgl