diff options
author | Mikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com> | 2019-10-04 18:03:49 +0300 |
---|---|---|
committer | Mikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com> | 2019-10-07 12:37:19 +0300 |
commit | 7dd1d73db761cfe55ac13505dc85e9b279f98f03 (patch) | |
tree | 98d635976da76804c15d69281c52be04d0bca48b | |
parent | bd283fc1be2f90ce02b37617411a0ce4246d898e (diff) | |
download | qtlocation-mapboxgl-upstream/mikhail_lightweight_thread.tar.gz |
[core] Introduce SingleThreadSchedulerupstream/mikhail_lightweight_thread
`SingleThreadScheduler` implements `Scheduler` interface using
a lightweight event loop on a single thread. Therefore, all scheduled
tasks are guaranteed to execute consequently.
`ThreadPoolScheduler` (renamed from `ThreadPool`) schedules tasks
on an arbitrary thread from the contained thread pool. Some of the
scheduled tasks might be executed in parallel.
-rw-r--r-- | src/mbgl/actor/scheduler.cpp | 2 | ||||
-rw-r--r-- | src/mbgl/util/thread_pool.cpp | 112 | ||||
-rw-r--r-- | src/mbgl/util/thread_pool.hpp | 43 |
3 files changed, 109 insertions, 48 deletions
diff --git a/src/mbgl/actor/scheduler.cpp b/src/mbgl/actor/scheduler.cpp index cb0c7728ec..d8911747d6 100644 --- a/src/mbgl/actor/scheduler.cpp +++ b/src/mbgl/actor/scheduler.cpp @@ -2,6 +2,8 @@ #include <mbgl/util/thread_local.hpp> #include <mbgl/util/thread_pool.hpp> +#include <mutex> + namespace mbgl { util::ThreadLocal<Scheduler> g_currentScheduler; diff --git a/src/mbgl/util/thread_pool.cpp b/src/mbgl/util/thread_pool.cpp index d8df0cd575..cb80591759 100644 --- a/src/mbgl/util/thread_pool.cpp +++ b/src/mbgl/util/thread_pool.cpp @@ -4,57 +4,97 @@ #include <mbgl/util/string.hpp> #include <mbgl/platform/thread.hpp> +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <queue> + namespace mbgl { -ThreadPool::ThreadPool(std::size_t count) { - threads.reserve(count); +class SingleThreadScheduler::Impl { +public: + void schedule(std::function<void()> fn); + void terminate(); + static std::shared_ptr<Impl> create() { return std::make_shared<Impl>(); } - for (std::size_t i = 0; i < count; ++i) { - threads.emplace_back([this, i]() { - platform::setCurrentThreadName(std::string{ "Worker " } + util::toString(i + 1)); - platform::attachThread(); - - while (true) { - std::unique_lock<std::mutex> lock(mutex); - - cv.wait(lock, [this] { - return !queue.empty() || terminate; - }); - - if (terminate) { - platform::detachThread(); - return; - } - - auto function = std::move(queue.front()); - queue.pop(); - lock.unlock(); - if (function) function(); - } - }); + std::queue<std::function<void()>> queue; + std::mutex mutex; + std::condition_variable cv; + std::atomic_bool terminated{false}; +}; + +void SingleThreadScheduler::Impl::schedule(std::function<void()> fn) { + assert(fn); + { + std::lock_guard<std::mutex> lock(mutex); + queue.push(std::move(fn)); } + + cv.notify_one(); } -ThreadPool::~ThreadPool() { - { +void SingleThreadScheduler::Impl::terminate() { + if (!terminated) { std::lock_guard<std::mutex> lock(mutex); - terminate = true; + terminated = true; + cv.notify_all(); } +} + +SingleThreadScheduler::SingleThreadScheduler(std::shared_ptr<SingleThreadScheduler::Impl> impl_, + optional<std::string> name_) + : impl(std::move(impl_)), thread([this, name = std::move(name_)]() { + if (name) { + platform::setCurrentThreadName(*name); + } + platform::attachThread(); + + while (true) { + std::unique_lock<std::mutex> lock(impl->mutex); + + impl->cv.wait(lock, [this] { return !impl->queue.empty() || impl->terminated; }); - cv.notify_all(); + if (impl->terminated) { + platform::detachThread(); + return; + } - for (auto& thread : threads) { - thread.join(); + auto function = std::move(impl->queue.front()); + impl->queue.pop(); + lock.unlock(); + if (function) function(); + } + }) { + assert(impl); +} + +SingleThreadScheduler::SingleThreadScheduler(optional<std::string> name_) + : SingleThreadScheduler(Impl::create(), std::move(name_)) {} + +SingleThreadScheduler::~SingleThreadScheduler() { + if (impl) { + impl->terminate(); + if (thread.joinable()) thread.join(); } } -void ThreadPool::schedule(std::function<void()> fn) { - { - std::lock_guard<std::mutex> lock(mutex); - queue.push(std::move(fn)); +void SingleThreadScheduler::schedule(std::function<void()> fn) { + impl->schedule(std::move(fn)); +} + +ThreadPoolScheduler::ThreadPoolScheduler(std::size_t count) : sharedImpl(SingleThreadScheduler::Impl::create()) { + threads.reserve(count); + for (std::size_t i = 0; i < count; ++i) { + threads.emplace_back(sharedImpl, std::string{"Worker "} + util::toString(i + 1)); } +} - cv.notify_one(); +ThreadPoolScheduler::~ThreadPoolScheduler() { + sharedImpl->terminate(); +} + +void ThreadPoolScheduler::schedule(std::function<void()> fn) { + sharedImpl->schedule(std::move(fn)); } } // namespace mbgl diff --git a/src/mbgl/util/thread_pool.hpp b/src/mbgl/util/thread_pool.hpp index 96fc13bda5..8ec51e3118 100644 --- a/src/mbgl/util/thread_pool.hpp +++ b/src/mbgl/util/thread_pool.hpp @@ -1,28 +1,47 @@ #pragma once -#include <mbgl/actor/mailbox.hpp> #include <mbgl/actor/scheduler.hpp> +#include <mbgl/util/optional.hpp> -#include <condition_variable> -#include <mutex> -#include <queue> +#include <memory> #include <thread> +#include <vector> namespace mbgl { -class ThreadPool final : public Scheduler { +// `SingleThreadScheduler` implements `Scheduler` interface using +// a lightweight event loop on a single thread. Therefore, all scheduled +// tasks are guaranteed to execute consequently. +class SingleThreadScheduler final : public Scheduler { public: - explicit ThreadPool(std::size_t count); - ~ThreadPool() override; + explicit SingleThreadScheduler(optional<std::string> name = nullopt); + ~SingleThreadScheduler() override; + SingleThreadScheduler(SingleThreadScheduler&&) = default; void schedule(std::function<void()>) override; + class Impl; + SingleThreadScheduler(std::shared_ptr<Impl>, optional<std::string> name); + private: - std::vector<std::thread> threads; - std::queue<std::function<void()>> queue; - std::mutex mutex; - std::condition_variable cv; - bool terminate{ false }; + std::shared_ptr<Impl> impl; + std::thread thread; }; +// `ThreadPoolScheduler` schedules tasks on an arbitrary thread from the contained +// thread pool. Some of the scheduled tasks might be executed in parallel. +class ThreadPoolScheduler final : public Scheduler { +public: + explicit ThreadPoolScheduler(std::size_t count); + ~ThreadPoolScheduler() override; + + void schedule(std::function<void()>) override; + +private: + std::shared_ptr<SingleThreadScheduler::Impl> sharedImpl; + std::vector<SingleThreadScheduler> threads; +}; + +using ThreadPool = ThreadPoolScheduler; + } // namespace mbgl |