From 2c8343aa8233333dae0caf91f25bf1fe4689c88e Mon Sep 17 00:00:00 2001 From: John Firebaugh Date: Mon, 22 May 2017 16:05:58 -0700 Subject: [core] Block in Mailbox::close() until neither receive nor push are in progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Otherwise, an ActorRef that's in the process of sending a message could attempt to access an invalid Scheduler reference: Thread 1 Thread 2 -------------------------------------------------- Scheduler::Scheduler Actor::Actor weakMailbox.lock() Actor::~Actor Scheduler::~Scheduler mailbox->push() scheduler.schedule() 💣 --- include/mbgl/actor/mailbox.hpp | 6 +++-- src/mbgl/actor/mailbox.cpp | 27 +++++++++++++------- test/actor/actor.test.cpp | 58 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp index cff0de243a..1327e8e6fe 100644 --- a/include/mbgl/actor/mailbox.hpp +++ b/include/mbgl/actor/mailbox.hpp @@ -23,8 +23,10 @@ public: private: Scheduler& scheduler; - std::mutex closingMutex; - bool closing { false }; + std::mutex receivingMutex; + std::mutex pushingMutex; + + bool closed { false }; std::mutex queueMutex; std::queue> queue; diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp index 5f60629833..947f6f9028 100644 --- a/src/mbgl/actor/mailbox.cpp +++ b/src/mbgl/actor/mailbox.cpp @@ -10,8 +10,23 @@ Mailbox::Mailbox(Scheduler& scheduler_) : scheduler(scheduler_) { } +void Mailbox::close() { + // Block until neither receive() nor push() are in progress. Two mutexes are used because receive() + // must not block send(). Of the two, the receiving mutex must be acquired first, because that is + // the order that an actor will obtain them when it self-sends a message, and consistent lock + // acquisition order prevents deadlocks. + std::lock_guard receivingLock(receivingMutex); + std::lock_guard pushingLock(pushingMutex); + + closed = true; +} + void Mailbox::push(std::unique_ptr message) { - assert(!closing); + std::lock_guard pushingLock(pushingMutex); + + if (closed) { + return; + } std::lock_guard queueLock(queueMutex); bool wasEmpty = queue.empty(); @@ -21,16 +36,10 @@ void Mailbox::push(std::unique_ptr message) { } } -void Mailbox::close() { - // Block until the scheduler is guaranteed not to be executing receive(). - std::lock_guard closingLock(closingMutex); - closing = true; -} - void Mailbox::receive() { - std::lock_guard closingLock(closingMutex); + std::lock_guard receivingLock(receivingMutex); - if (closing) { + if (closed) { return; } diff --git a/test/actor/actor.test.cpp b/test/actor/actor.test.cpp index 03f41a6e64..9db6882889 100644 --- a/test/actor/actor.test.cpp +++ b/test/actor/actor.test.cpp @@ -26,7 +26,7 @@ TEST(Actor, Construction) { EXPECT_TRUE(constructed); } -TEST(Actor, DestructionClosesMailbox) { +TEST(Actor, DestructionBlocksOnReceive) { // Destruction blocks until the actor is not receiving. struct Test { @@ -67,6 +67,62 @@ TEST(Actor, DestructionClosesMailbox) { exitingPromise.set_value(); } +TEST(Actor, DestructionBlocksOnSend) { + // Destruction blocks until the actor is not being sent a message. + + struct TestScheduler : public Scheduler { + std::promise promise; + std::future future; + std::atomic waited; + + TestScheduler(std::promise promise_, std::future future_) + : promise(std::move(promise_)), + future(std::move(future_)), + waited(false) { + } + + ~TestScheduler() { + EXPECT_TRUE(waited.load()); + } + + void schedule(std::weak_ptr) final { + promise.set_value(); + future.wait(); + std::this_thread::sleep_for(1ms); + waited = true; + } + }; + + struct Test { + Test(ActorRef) {} + void message() {} + }; + + std::promise enteredPromise; + std::future enteredFuture = enteredPromise.get_future(); + + std::promise exitingPromise; + std::future exitingFuture = exitingPromise.get_future(); + + auto scheduler = std::make_unique(std::move(enteredPromise), std::move(exitingFuture)); + auto actor = std::make_unique>(*scheduler); + + std::thread thread { + [] (ActorRef ref) { + ref.invoke(&Test::message); + }, + actor->self() + }; + + enteredFuture.wait(); + exitingPromise.set_value(); + + actor.reset(); + scheduler.reset(); + + thread.join(); +} + TEST(Actor, OrderedMailbox) { // Messages are processed in order. -- cgit v1.2.1