From f024a3ed79e438b975301e5781b12739e150caf1 Mon Sep 17 00:00:00 2001 From: Mikhail Pozdnyakov Date: Fri, 4 Oct 2019 12:30:41 +0300 Subject: [core] Decouple Scheduler interface from actor model So that it is possible to schedule normal `std::function` and use `mapbox::base::WeakPtr`. --- include/mbgl/actor/mailbox.hpp | 2 ++ include/mbgl/actor/scheduler.hpp | 10 ++++------ include/mbgl/util/run_loop.hpp | 8 ++------ platform/android/src/map_renderer.cpp | 2 +- platform/android/src/map_renderer.hpp | 2 +- platform/android/src/map_renderer_runnable.cpp | 7 +++---- platform/android/src/map_renderer_runnable.hpp | 4 ++-- platform/qt/src/qmapboxgl_scheduler.cpp | 10 +++++----- platform/qt/src/qmapboxgl_scheduler.hpp | 5 ++--- src/mbgl/actor/mailbox.cpp | 12 +++++++++--- src/mbgl/util/thread_pool.cpp | 9 ++++----- src/mbgl/util/thread_pool.hpp | 4 ++-- test/actor/actor.test.cpp | 2 +- 13 files changed, 38 insertions(+), 39 deletions(-) diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp index 23c579917a..2b9838ef29 100644 --- a/include/mbgl/actor/mailbox.hpp +++ b/include/mbgl/actor/mailbox.hpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -34,6 +35,7 @@ public: void receive(); static void maybeReceive(std::weak_ptr); + static std::function makeClosure(std::weak_ptr); private: optional scheduler; diff --git a/include/mbgl/actor/scheduler.hpp b/include/mbgl/actor/scheduler.hpp index 6470ab1245..ca34901cfd 100644 --- a/include/mbgl/actor/scheduler.hpp +++ b/include/mbgl/actor/scheduler.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include namespace mbgl { @@ -31,12 +32,9 @@ class Mailbox; class Scheduler { public: virtual ~Scheduler() = default; - - // Used by a Mailbox when it has a message in its queue to request that it - // be scheduled. Specifically, the scheduler is expected to asynchronously - // call `receive() on the given mailbox, provided it still exists at that - // time. - virtual void schedule(std::weak_ptr) = 0; + + // Enqueues a function for execution. + virtual void schedule(std::function) = 0; // Set/Get the current Scheduler for this thread static Scheduler* GetCurrent(); diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp index 381e3ae213..961573fd87 100644 --- a/include/mbgl/util/run_loop.hpp +++ b/include/mbgl/util/run_loop.hpp @@ -72,12 +72,8 @@ public: push(Priority::Default, task); return std::make_unique(task); } - - void schedule(std::weak_ptr mailbox) override { - invoke([mailbox] () { - Mailbox::maybeReceive(mailbox); - }); - } + + void schedule(std::function fn) override { invoke(std::move(fn)); } class Impl; diff --git a/platform/android/src/map_renderer.cpp b/platform/android/src/map_renderer.cpp index 6be708b994..0c0e907f14 100644 --- a/platform/android/src/map_renderer.cpp +++ b/platform/android/src/map_renderer.cpp @@ -43,7 +43,7 @@ ActorRef MapRenderer::actor() const { return *rendererRef; } -void MapRenderer::schedule(std::weak_ptr scheduled) { +void MapRenderer::schedule(std::function scheduled) { // Create a runnable android::UniqueEnv _env = android::AttachEnv(); auto runnable = std::make_unique(*_env, std::move(scheduled)); diff --git a/platform/android/src/map_renderer.hpp b/platform/android/src/map_renderer.hpp index 5a8ddeeb91..664da20a94 100644 --- a/platform/android/src/map_renderer.hpp +++ b/platform/android/src/map_renderer.hpp @@ -67,7 +67,7 @@ public: // From Scheduler. Schedules by using callbacks to the // JVM to process the mailbox on the right thread. - void schedule(std::weak_ptr scheduled) override; + void schedule(std::function scheduled) override; void requestRender(); diff --git a/platform/android/src/map_renderer_runnable.cpp b/platform/android/src/map_renderer_runnable.cpp index 77c3aa301d..227f49ee3f 100644 --- a/platform/android/src/map_renderer_runnable.cpp +++ b/platform/android/src/map_renderer_runnable.cpp @@ -5,9 +5,8 @@ namespace mbgl { namespace android { -MapRendererRunnable::MapRendererRunnable(jni::JNIEnv& env, std::weak_ptr mailbox_) - : mailbox(std::move(mailbox_)) { - +MapRendererRunnable::MapRendererRunnable(jni::JNIEnv& env, std::function function_) + : function(std::move(function_)) { // Create the Java peer and hold on to a global reference // Not using a weak reference here as this might oerflow // the weak reference table on some devices @@ -21,7 +20,7 @@ MapRendererRunnable::MapRendererRunnable(jni::JNIEnv& env, std::weak_ptr> MapRendererRunnable::peer() { diff --git a/platform/android/src/map_renderer_runnable.hpp b/platform/android/src/map_renderer_runnable.hpp index 21c4369b69..24d0f2af49 100644 --- a/platform/android/src/map_renderer_runnable.hpp +++ b/platform/android/src/map_renderer_runnable.hpp @@ -24,7 +24,7 @@ public: static void registerNative(jni::JNIEnv&); - MapRendererRunnable(jni::JNIEnv&, std::weak_ptr); + MapRendererRunnable(jni::JNIEnv&, std::function); // Only for jni registration, unused MapRendererRunnable(jni::JNIEnv&) { @@ -40,7 +40,7 @@ public: private: jni::Global> javaPeer; - std::weak_ptr mailbox; + std::function function; }; } // namespace android diff --git a/platform/qt/src/qmapboxgl_scheduler.cpp b/platform/qt/src/qmapboxgl_scheduler.cpp index e2d39703ee..5fc3ab13de 100644 --- a/platform/qt/src/qmapboxgl_scheduler.cpp +++ b/platform/qt/src/qmapboxgl_scheduler.cpp @@ -13,10 +13,9 @@ QMapboxGLScheduler::~QMapboxGLScheduler() MBGL_VERIFY_THREAD(tid); } -void QMapboxGLScheduler::schedule(std::weak_ptr mailbox) -{ +void QMapboxGLScheduler::schedule(std::function function) { std::lock_guard lock(m_taskQueueMutex); - m_taskQueue.push(mailbox); + m_taskQueue.push(std::move(function)); // Need to force the main thread to wake // up this thread and process the events. @@ -25,14 +24,15 @@ void QMapboxGLScheduler::schedule(std::weak_ptr mailbox) void QMapboxGLScheduler::processEvents() { - std::queue> taskQueue; + std::queue> taskQueue; { std::unique_lock lock(m_taskQueueMutex); std::swap(taskQueue, m_taskQueue); } while (!taskQueue.empty()) { - mbgl::Mailbox::maybeReceive(taskQueue.front()); + auto& function = taskQueue.front(); + if (function) function(); taskQueue.pop(); } } diff --git a/platform/qt/src/qmapboxgl_scheduler.hpp b/platform/qt/src/qmapboxgl_scheduler.hpp index 68636d0d11..b34dd3d5b8 100644 --- a/platform/qt/src/qmapboxgl_scheduler.hpp +++ b/platform/qt/src/qmapboxgl_scheduler.hpp @@ -1,6 +1,5 @@ #pragma once -#include #include #include @@ -19,7 +18,7 @@ public: virtual ~QMapboxGLScheduler(); // mbgl::Scheduler implementation. - void schedule(std::weak_ptr scheduled) final; + void schedule(std::function scheduled) final; void processEvents(); @@ -30,5 +29,5 @@ private: MBGL_STORE_THREAD(tid); std::mutex m_taskQueueMutex; - std::queue> m_taskQueue; + std::queue> m_taskQueue; }; diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp index dfe0520790..070e14bdb0 100644 --- a/src/mbgl/actor/mailbox.cpp +++ b/src/mbgl/actor/mailbox.cpp @@ -27,7 +27,7 @@ void Mailbox::open(Scheduler& scheduler_) { } if (!queue.empty()) { - (*scheduler)->schedule(shared_from_this()); + (*scheduler)->schedule(makeClosure(shared_from_this())); } } @@ -57,7 +57,7 @@ void Mailbox::push(std::unique_ptr message) { bool wasEmpty = queue.empty(); queue.push(std::move(message)); if (wasEmpty && scheduler) { - (*scheduler)->schedule(shared_from_this()); + (*scheduler)->schedule(makeClosure(shared_from_this())); } } @@ -84,14 +84,20 @@ void Mailbox::receive() { (*message)(); if (!wasEmpty) { - (*scheduler)->schedule(shared_from_this()); + (*scheduler)->schedule(makeClosure(shared_from_this())); } } +// static void Mailbox::maybeReceive(std::weak_ptr mailbox) { if (auto locked = mailbox.lock()) { locked->receive(); } } +// static +std::function Mailbox::makeClosure(std::weak_ptr mailbox) { + return [mailbox]() { maybeReceive(mailbox); }; +} + } // namespace mbgl diff --git a/src/mbgl/util/thread_pool.cpp b/src/mbgl/util/thread_pool.cpp index e839d1b4be..d8df0cd575 100644 --- a/src/mbgl/util/thread_pool.cpp +++ b/src/mbgl/util/thread_pool.cpp @@ -26,11 +26,10 @@ ThreadPool::ThreadPool(std::size_t count) { return; } - auto mailbox = queue.front(); + auto function = std::move(queue.front()); queue.pop(); lock.unlock(); - - Mailbox::maybeReceive(mailbox); + if (function) function(); } }); } @@ -49,10 +48,10 @@ ThreadPool::~ThreadPool() { } } -void ThreadPool::schedule(std::weak_ptr mailbox) { +void ThreadPool::schedule(std::function fn) { { std::lock_guard lock(mutex); - queue.push(mailbox); + queue.push(std::move(fn)); } cv.notify_one(); diff --git a/src/mbgl/util/thread_pool.hpp b/src/mbgl/util/thread_pool.hpp index 509fd06061..96fc13bda5 100644 --- a/src/mbgl/util/thread_pool.hpp +++ b/src/mbgl/util/thread_pool.hpp @@ -15,11 +15,11 @@ public: explicit ThreadPool(std::size_t count); ~ThreadPool() override; - void schedule(std::weak_ptr) override; + void schedule(std::function) override; private: std::vector threads; - std::queue> queue; + std::queue> queue; std::mutex mutex; std::condition_variable cv; bool terminate{ false }; diff --git a/test/actor/actor.test.cpp b/test/actor/actor.test.cpp index 6db95a83f1..4b152f471a 100644 --- a/test/actor/actor.test.cpp +++ b/test/actor/actor.test.cpp @@ -101,7 +101,7 @@ TEST(Actor, DestructionBlocksOnSend) { EXPECT_TRUE(waited.load()); } - void schedule(std::weak_ptr) final { + void schedule(std::function) final { promise.set_value(); future.wait(); std::this_thread::sleep_for(1ms); -- cgit v1.2.1