diff options
author | Mikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com> | 2019-10-07 15:45:45 +0300 |
---|---|---|
committer | Mikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com> | 2019-10-07 16:42:40 +0300 |
commit | 23cebe20fed0e3ebc3328c6345af4e32d86cfcf5 (patch) | |
tree | af3c1837435967b0e2ec425b3c2a00822166d38b /src/mbgl | |
parent | bd283fc1be2f90ce02b37617411a0ce4246d898e (diff) | |
download | qtlocation-mapboxgl-23cebe20fed0e3ebc3328c6345af4e32d86cfcf5.tar.gz |
[core] Introduce SequencedScheduler and ParallelScheduler
This commit refactors `utils::ThreadPool` into a template
`ThreadedScheduler` class and provides aux type aliases.
So that it is possible to obtain a sequenced schedule,where
all the scheduled tasks are guarantied to be executed
consequently.
The sequenced lightweight scheduler is required by both the
orchestration thread and the refactored `FileSource` implementation.
Diffstat (limited to 'src/mbgl')
-rw-r--r-- | src/mbgl/actor/scheduler.cpp | 2 | ||||
-rw-r--r-- | src/mbgl/util/thread_pool.cpp | 62 | ||||
-rw-r--r-- | src/mbgl/util/thread_pool.hpp | 53 |
3 files changed, 74 insertions, 43 deletions
diff --git a/src/mbgl/actor/scheduler.cpp b/src/mbgl/actor/scheduler.cpp index cb0c7728ec..5fd9a133bd 100644 --- a/src/mbgl/actor/scheduler.cpp +++ b/src/mbgl/actor/scheduler.cpp @@ -28,7 +28,7 @@ std::shared_ptr<Scheduler> Scheduler::GetBackground() { std::shared_ptr<Scheduler> scheduler = weak.lock(); if (!scheduler) { - weak = scheduler = std::make_shared<ThreadPool>(4); + weak = scheduler = std::make_shared<ThreadPool>(); } return scheduler; diff --git a/src/mbgl/util/thread_pool.cpp b/src/mbgl/util/thread_pool.cpp index d8df0cd575..040e996dd4 100644 --- a/src/mbgl/util/thread_pool.cpp +++ b/src/mbgl/util/thread_pool.cpp @@ -6,49 +6,41 @@ namespace mbgl { -ThreadPool::ThreadPool(std::size_t count) { - threads.reserve(count); - - for (std::size_t i = 0; i < count; ++i) { - threads.emplace_back([this, i]() { - platform::setCurrentThreadName(std::string{ "Worker " } + util::toString(i + 1)); - platform::attachThread(); - - while (true) { - std::unique_lock<std::mutex> lock(mutex); - - cv.wait(lock, [this] { - return !queue.empty() || terminate; - }); - - if (terminate) { - platform::detachThread(); - return; - } - - auto function = std::move(queue.front()); - queue.pop(); - lock.unlock(); - if (function) function(); - } - }); - } -} +ThreadedSchedulerBase::~ThreadedSchedulerBase() = default; -ThreadPool::~ThreadPool() { +void ThreadedSchedulerBase::terminate() { { std::lock_guard<std::mutex> lock(mutex); - terminate = true; + terminated = true; } - cv.notify_all(); +} - for (auto& thread : threads) { - thread.join(); - } +std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) { + return std::thread([this, index]() { + platform::setCurrentThreadName(std::string{"Worker "} + util::toString(index + 1)); + platform::attachThread(); + + while (true) { + std::unique_lock<std::mutex> lock(mutex); + + cv.wait(lock, [this] { return !queue.empty() || terminated; }); + + if (terminated) { + platform::detachThread(); + return; + } + + auto function = std::move(queue.front()); + queue.pop(); + lock.unlock(); + if (function) function(); + } + }); } -void ThreadPool::schedule(std::function<void()> fn) { +void ThreadedSchedulerBase::schedule(std::function<void()> fn) { + assert(fn); { std::lock_guard<std::mutex> lock(mutex); queue.push(std::move(fn)); diff --git a/src/mbgl/util/thread_pool.hpp b/src/mbgl/util/thread_pool.hpp index 96fc13bda5..f302e50914 100644 --- a/src/mbgl/util/thread_pool.hpp +++ b/src/mbgl/util/thread_pool.hpp @@ -3,6 +3,7 @@ #include <mbgl/actor/mailbox.hpp> #include <mbgl/actor/scheduler.hpp> +#include <array> #include <condition_variable> #include <mutex> #include <queue> @@ -10,19 +11,57 @@ namespace mbgl { -class ThreadPool final : public Scheduler { +class ThreadedSchedulerBase : public Scheduler { public: - explicit ThreadPool(std::size_t count); - ~ThreadPool() override; - void schedule(std::function<void()>) override; -private: - std::vector<std::thread> threads; +protected: + ThreadedSchedulerBase() = default; + ~ThreadedSchedulerBase() override; + + void terminate(); + std::thread makeSchedulerThread(size_t index); + std::queue<std::function<void()>> queue; std::mutex mutex; std::condition_variable cv; - bool terminate{ false }; + bool terminated{false}; }; +/** + * @brief ThreadScheduler implements Scheduler interface using a lightweight event loop + * + * @tparam N number of threads + * + * Note: If N == 1 all scheduled tasks are guaranteed to execute consequently; + * otherwise, some of the scheduled tasks might be executed in parallel. + */ +template <std::size_t N> +class ThreadedScheduler : public ThreadedSchedulerBase { +public: + ThreadedScheduler() { + for (std::size_t i = 0u; i < N; ++i) { + threads[i] = makeSchedulerThread(i); + } + } + + ~ThreadedScheduler() override { + terminate(); + for (auto& thread : threads) { + thread.join(); + } + } + +private: + std::array<std::thread, N> threads; + static_assert(N > 0, "Thread count must be more than zero."); +}; + +using SequencedScheduler = ThreadedScheduler<1>; + +template <std::size_t extra> +using ParallelScheduler = ThreadedScheduler<1 + extra>; + +using ThreadPool = ParallelScheduler<3>; + } // namespace mbgl |