summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com>2019-10-07 15:45:45 +0300
committerMikhail Pozdnyakov <mikhail.pozdnyakov@mapbox.com>2019-10-07 16:42:40 +0300
commit23cebe20fed0e3ebc3328c6345af4e32d86cfcf5 (patch)
treeaf3c1837435967b0e2ec425b3c2a00822166d38b
parentbd283fc1be2f90ce02b37617411a0ce4246d898e (diff)
downloadqtlocation-mapboxgl-23cebe20fed0e3ebc3328c6345af4e32d86cfcf5.tar.gz
[core] Introduce SequencedScheduler and ParallelScheduler
This commit refactors `utils::ThreadPool` into a template `ThreadedScheduler` class and provides aux type aliases. So that it is possible to obtain a sequenced schedule,where all the scheduled tasks are guarantied to be executed consequently. The sequenced lightweight scheduler is required by both the orchestration thread and the refactored `FileSource` implementation.
-rw-r--r--src/mbgl/actor/scheduler.cpp2
-rw-r--r--src/mbgl/util/thread_pool.cpp62
-rw-r--r--src/mbgl/util/thread_pool.hpp53
3 files changed, 74 insertions, 43 deletions
diff --git a/src/mbgl/actor/scheduler.cpp b/src/mbgl/actor/scheduler.cpp
index cb0c7728ec..5fd9a133bd 100644
--- a/src/mbgl/actor/scheduler.cpp
+++ b/src/mbgl/actor/scheduler.cpp
@@ -28,7 +28,7 @@ std::shared_ptr<Scheduler> Scheduler::GetBackground() {
std::shared_ptr<Scheduler> scheduler = weak.lock();
if (!scheduler) {
- weak = scheduler = std::make_shared<ThreadPool>(4);
+ weak = scheduler = std::make_shared<ThreadPool>();
}
return scheduler;
diff --git a/src/mbgl/util/thread_pool.cpp b/src/mbgl/util/thread_pool.cpp
index d8df0cd575..040e996dd4 100644
--- a/src/mbgl/util/thread_pool.cpp
+++ b/src/mbgl/util/thread_pool.cpp
@@ -6,49 +6,41 @@
namespace mbgl {
-ThreadPool::ThreadPool(std::size_t count) {
- threads.reserve(count);
-
- for (std::size_t i = 0; i < count; ++i) {
- threads.emplace_back([this, i]() {
- platform::setCurrentThreadName(std::string{ "Worker " } + util::toString(i + 1));
- platform::attachThread();
-
- while (true) {
- std::unique_lock<std::mutex> lock(mutex);
-
- cv.wait(lock, [this] {
- return !queue.empty() || terminate;
- });
-
- if (terminate) {
- platform::detachThread();
- return;
- }
-
- auto function = std::move(queue.front());
- queue.pop();
- lock.unlock();
- if (function) function();
- }
- });
- }
-}
+ThreadedSchedulerBase::~ThreadedSchedulerBase() = default;
-ThreadPool::~ThreadPool() {
+void ThreadedSchedulerBase::terminate() {
{
std::lock_guard<std::mutex> lock(mutex);
- terminate = true;
+ terminated = true;
}
-
cv.notify_all();
+}
- for (auto& thread : threads) {
- thread.join();
- }
+std::thread ThreadedSchedulerBase::makeSchedulerThread(size_t index) {
+ return std::thread([this, index]() {
+ platform::setCurrentThreadName(std::string{"Worker "} + util::toString(index + 1));
+ platform::attachThread();
+
+ while (true) {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ cv.wait(lock, [this] { return !queue.empty() || terminated; });
+
+ if (terminated) {
+ platform::detachThread();
+ return;
+ }
+
+ auto function = std::move(queue.front());
+ queue.pop();
+ lock.unlock();
+ if (function) function();
+ }
+ });
}
-void ThreadPool::schedule(std::function<void()> fn) {
+void ThreadedSchedulerBase::schedule(std::function<void()> fn) {
+ assert(fn);
{
std::lock_guard<std::mutex> lock(mutex);
queue.push(std::move(fn));
diff --git a/src/mbgl/util/thread_pool.hpp b/src/mbgl/util/thread_pool.hpp
index 96fc13bda5..f302e50914 100644
--- a/src/mbgl/util/thread_pool.hpp
+++ b/src/mbgl/util/thread_pool.hpp
@@ -3,6 +3,7 @@
#include <mbgl/actor/mailbox.hpp>
#include <mbgl/actor/scheduler.hpp>
+#include <array>
#include <condition_variable>
#include <mutex>
#include <queue>
@@ -10,19 +11,57 @@
namespace mbgl {
-class ThreadPool final : public Scheduler {
+class ThreadedSchedulerBase : public Scheduler {
public:
- explicit ThreadPool(std::size_t count);
- ~ThreadPool() override;
-
void schedule(std::function<void()>) override;
-private:
- std::vector<std::thread> threads;
+protected:
+ ThreadedSchedulerBase() = default;
+ ~ThreadedSchedulerBase() override;
+
+ void terminate();
+ std::thread makeSchedulerThread(size_t index);
+
std::queue<std::function<void()>> queue;
std::mutex mutex;
std::condition_variable cv;
- bool terminate{ false };
+ bool terminated{false};
};
+/**
+ * @brief ThreadScheduler implements Scheduler interface using a lightweight event loop
+ *
+ * @tparam N number of threads
+ *
+ * Note: If N == 1 all scheduled tasks are guaranteed to execute consequently;
+ * otherwise, some of the scheduled tasks might be executed in parallel.
+ */
+template <std::size_t N>
+class ThreadedScheduler : public ThreadedSchedulerBase {
+public:
+ ThreadedScheduler() {
+ for (std::size_t i = 0u; i < N; ++i) {
+ threads[i] = makeSchedulerThread(i);
+ }
+ }
+
+ ~ThreadedScheduler() override {
+ terminate();
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ }
+
+private:
+ std::array<std::thread, N> threads;
+ static_assert(N > 0, "Thread count must be more than zero.");
+};
+
+using SequencedScheduler = ThreadedScheduler<1>;
+
+template <std::size_t extra>
+using ParallelScheduler = ThreadedScheduler<1 + extra>;
+
+using ThreadPool = ParallelScheduler<3>;
+
} // namespace mbgl