From bfac9b6292fdb4fe167a69e652be5751b2e2f4b8 Mon Sep 17 00:00:00 2001 From: Ivo van Dongen Date: Fri, 21 Jul 2017 15:54:26 +0300 Subject: [core] allow safe direct access to actor on single threaded scheduler --- include/mbgl/actor/actor.hpp | 8 ++++++++ include/mbgl/util/thread.hpp | 29 +++++++++++++++++++++++++++++ test/util/thread.test.cpp | 28 ++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp index a0df19208e..93de4a948f 100644 --- a/include/mbgl/actor/actor.hpp +++ b/include/mbgl/actor/actor.hpp @@ -45,6 +45,11 @@ namespace mbgl { purpose of the actor model: prohibiting direct concurrent access to shared state. */ + +namespace util { +template class Thread; +} // namespace util + template class Actor : public util::noncopyable { public: @@ -91,6 +96,9 @@ public: } private: + template + friend class util::Thread; + std::shared_ptr mailbox; Object object; }; diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp index 06254569a9..c0cbc532ce 100644 --- a/include/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -131,6 +131,13 @@ public: } private: + template + friend class BlockingThreadGuard; + + Object& getObject() { + return object->object; + } + MBGL_STORE_THREAD(tid); void schedule(std::weak_ptr mailbox) override { @@ -157,11 +164,33 @@ private: std::thread thread; std::unique_ptr> object; + std::mutex pauseMutex; std::unique_ptr> paused; std::unique_ptr> resumed; util::RunLoop* loop = nullptr; }; + +template +class BlockingThreadGuard { +public: + BlockingThreadGuard(Thread& thread_) + : thread(thread_) { + thread.pause(); + } + + ~BlockingThreadGuard() { + thread.resume(); + } + + Object& object() { + return thread.getObject(); + } + +private: + Thread& thread; +}; + } // namespace util } // namespace mbgl diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp index 228e463d9e..1bc55eadda 100644 --- a/test/util/thread.test.cpp +++ b/test/util/thread.test.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -308,3 +309,30 @@ TEST(Thread, PauseResumeMultiThreaded) { thread1.join(); thread2.join(); } + +TEST(Thread, DirectAccess) { + + Thread test("Test"); + + // Use the thread's object directly + std::atomic flag { false }; + auto guard = std::make_unique>( test ); + guard->object().send([&] { flag = true; }); + ASSERT_TRUE(flag); + + // Ensure messages queued up are processed + std::atomic message1Consumed { false }; + test.actor().invoke(&TestWorker::send, [&]() { message1Consumed = true; }); + + // Release the guard + guard.reset(); + + // Ensure messages send after releasing the guard are processed + std::atomic message2Consumed { false }; + test.actor().invoke(&TestWorker::send, [&]() { message2Consumed = true; }); + + while (!message1Consumed && !message2Consumed) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(10ms); + }; +} -- cgit v1.2.1