diff options
-rw-r--r-- | include/mbgl/actor/actor.hpp | 8 | ||||
-rw-r--r-- | include/mbgl/util/thread.hpp | 29 | ||||
-rw-r--r-- | test/util/thread.test.cpp | 28 |
3 files changed, 65 insertions, 0 deletions
diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp index a0df19208e..93de4a948f 100644 --- a/include/mbgl/actor/actor.hpp +++ b/include/mbgl/actor/actor.hpp @@ -45,6 +45,11 @@ namespace mbgl { purpose of the actor model: prohibiting direct concurrent access to shared state. */ + +namespace util { +template <class> class Thread; +} // namespace util + template <class Object> class Actor : public util::noncopyable { public: @@ -91,6 +96,9 @@ public: } private: + template<typename U> + friend class util::Thread; + std::shared_ptr<Mailbox> mailbox; Object object; }; diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp index 06254569a9..c0cbc532ce 100644 --- a/include/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -131,6 +131,13 @@ public: } private: + template <class U> + friend class BlockingThreadGuard; + + Object& getObject() { + return object->object; + } + MBGL_STORE_THREAD(tid); void schedule(std::weak_ptr<Mailbox> mailbox) override { @@ -157,11 +164,33 @@ private: std::thread thread; std::unique_ptr<Actor<Object>> object; + std::mutex pauseMutex; std::unique_ptr<std::promise<void>> paused; std::unique_ptr<std::promise<void>> resumed; util::RunLoop* loop = nullptr; }; + +template <class Object> +class BlockingThreadGuard { +public: + BlockingThreadGuard(Thread<Object>& thread_) + : thread(thread_) { + thread.pause(); + } + + ~BlockingThreadGuard() { + thread.resume(); + } + + Object& object() { + return thread.getObject(); + } + +private: + Thread<Object>& thread; +}; + } // namespace util } // namespace mbgl diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp index 228e463d9e..1bc55eadda 100644 --- a/test/util/thread.test.cpp +++ b/test/util/thread.test.cpp @@ -6,6 +6,7 @@ #include <mbgl/util/timer.hpp> #include <atomic> +#include <chrono> #include <memory> #include <thread> @@ -308,3 +309,30 @@ TEST(Thread, PauseResumeMultiThreaded) { thread1.join(); thread2.join(); } + +TEST(Thread, DirectAccess) { + + Thread<TestWorker> test("Test"); + + // Use the thread's object directly + std::atomic<bool> flag { false }; + auto guard = std::make_unique<BlockingThreadGuard<TestWorker>>( test ); + guard->object().send([&] { flag = true; }); + ASSERT_TRUE(flag); + + // Ensure messages queued up are processed + std::atomic<bool> message1Consumed { false }; + test.actor().invoke(&TestWorker::send, [&]() { message1Consumed = true; }); + + // Release the guard + guard.reset(); + + // Ensure messages send after releasing the guard are processed + std::atomic<bool> message2Consumed { false }; + test.actor().invoke(&TestWorker::send, [&]() { message2Consumed = true; }); + + while (!message1Consumed && !message2Consumed) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(10ms); + }; +} |