From 23cebe20fed0e3ebc3328c6345af4e32d86cfcf5 Mon Sep 17 00:00:00 2001 From: Mikhail Pozdnyakov Date: Mon, 7 Oct 2019 15:45:45 +0300 Subject: [core] Introduce SequencedScheduler and ParallelScheduler This commit refactors `utils::ThreadPool` into a template `ThreadedScheduler` class and provides aux type aliases. So that it is possible to obtain a sequenced schedule,where all the scheduled tasks are guarantied to be executed consequently. The sequenced lightweight scheduler is required by both the orchestration thread and the refactored `FileSource` implementation. --- src/mbgl/actor/scheduler.cpp | 2 +- src/mbgl/util/thread_pool.cpp | 62 +++++++++++++++++++------------------------ src/mbgl/util/thread_pool.hpp | 53 +++++++++++++++++++++++++++++++----- 3 files changed, 74 insertions(+), 43 deletions(-) diff --git a/src/mbgl/actor/scheduler.cpp b/src/mbgl/actor/scheduler.cpp index cb0c7728ec..5fd9a133bd 100644 --- a/src/mbgl/actor/scheduler.cpp +++ b/src/mbgl/actor/scheduler.cpp @@ -28,7 +28,7 @@ std::shared_ptr Scheduler::GetBackground() { std::shared_ptr scheduler = weak.lock(); if (!scheduler) { - weak = scheduler = std::make_shared(4); + weak = scheduler = std::make_shared(); } return scheduler; diff --git a/src/mbgl/util/thread_pool.cpp b/src/mbgl/util/thread_pool.cpp index d8df0cd575..040e996dd4 100644 --- a/src/mbgl/util/thread_pool.cpp +++ b/src/mbgl/util/thread_pool.cpp @@ -6,49 +6,41 @@ namespace mbgl { -ThreadPool::ThreadPool(std::size_t count) { - threads.reserve(count); - - for (std::size_t i = 0; i < count; ++i) { - threads.emplace_back([this, i]() { - platform::setCurrentThreadName(std::string{ "Worker " } + util::toString(i + 1)); - platform::attachThread(); - - while (true) { - std::unique_lock lock(mutex); - - cv.wait(lock, [this] { - return !queue.empty() || terminate; - }); - - if (terminate) { - platform::detachThread(); - return; - } - - auto function = std::move(queue.front()); - queue.pop(); - lock.unlock(); - if (function) function(); - } - }); - } -} +ThreadedSchedulerBase::~ThreadedSchedulerBase() = default; -ThreadPool::~ThreadPool() { +void ThreadedSchedulerBase::terminate() { { std::lock_guard lock(mutex); - terminate = true; + terminated = true; } - cv.notify_all(); +} - for (auto& thread : threads) { - thread.join(); - } +std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) { + return std::thread([this, index]() { + platform::setCurrentThreadName(std::string{"Worker "} + util::toString(index + 1)); + platform::attachThread(); + + while (true) { + std::unique_lock lock(mutex); + + cv.wait(lock, [this] { return !queue.empty() || terminated; }); + + if (terminated) { + platform::detachThread(); + return; + } + + auto function = std::move(queue.front()); + queue.pop(); + lock.unlock(); + if (function) function(); + } + }); } -void ThreadPool::schedule(std::function fn) { +void ThreadedSchedulerBase::schedule(std::function fn) { + assert(fn); { std::lock_guard lock(mutex); queue.push(std::move(fn)); diff --git a/src/mbgl/util/thread_pool.hpp b/src/mbgl/util/thread_pool.hpp index 96fc13bda5..f302e50914 100644 --- a/src/mbgl/util/thread_pool.hpp +++ b/src/mbgl/util/thread_pool.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -10,19 +11,57 @@ namespace mbgl { -class ThreadPool final : public Scheduler { +class ThreadedSchedulerBase : public Scheduler { public: - explicit ThreadPool(std::size_t count); - ~ThreadPool() override; - void schedule(std::function) override; -private: - std::vector threads; +protected: + ThreadedSchedulerBase() = default; + ~ThreadedSchedulerBase() override; + + void terminate(); + std::thread makeSchedulerThread(size_t index); + std::queue> queue; std::mutex mutex; std::condition_variable cv; - bool terminate{ false }; + bool terminated{false}; }; +/** + * @brief ThreadScheduler implements Scheduler interface using a lightweight event loop + * + * @tparam N number of threads + * + * Note: If N == 1 all scheduled tasks are guaranteed to execute consequently; + * otherwise, some of the scheduled tasks might be executed in parallel. + */ +template +class ThreadedScheduler : public ThreadedSchedulerBase { +public: + ThreadedScheduler() { + for (std::size_t i = 0u; i < N; ++i) { + threads[i] = makeSchedulerThread(i); + } + } + + ~ThreadedScheduler() override { + terminate(); + for (auto& thread : threads) { + thread.join(); + } + } + +private: + std::array threads; + static_assert(N > 0, "Thread count must be more than zero."); +}; + +using SequencedScheduler = ThreadedScheduler<1>; + +template +using ParallelScheduler = ThreadedScheduler<1 + extra>; + +using ThreadPool = ParallelScheduler<3>; + } // namespace mbgl -- cgit v1.2.1