diff options
author | Anand Thakker <anandthakker@users.noreply.github.com> | 2018-07-03 17:17:39 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-03 17:17:39 -0400 |
commit | cfd436c287f4209d0d994042452ccbb552a6bd28 (patch) | |
tree | 6811590928d7ea19db8e8b3f9db8d1df54ba9965 /test | |
parent | 840a5cf1207ed78df3302211a23d369dd3c12b89 (diff) | |
download | qtlocation-mapboxgl-cfd436c287f4209d0d994042452ccbb552a6bd28.tar.gz |
[core] Avoid blocking in Thread<Object> constructor (#12151)
* Introduce AspiringActor, EstablishedActor
This pair of objects represents the two-phase (parent-thread /
child-thread) construction that's needed to support constructing
Thread<Object> without blocking until the child thread is up and
running.
An `AspiringActor<O>` is responsible for:
- ownership of the actor's `Mailbox`
- allocating the memory for (but *not* constructing) the target object `O`
Using these two pieces--the mailbox and a stable address for `O`--an
`AspiringActor<O>` can accept messages for the target object, or provide
`ActorRef<O>`s that do so, before the object has actually been
constructed by the corresponding `EstablishedActor<O>`. (Such messages
are queued in the mailbox until after the object is constructed.)
This allows for an `AspiringActor<O>` to be created and safely used by a
thread other than the one on which the target object will (eventually)
live.
An `EstablishedActor<O>` is responsible for managing the lifetime of the
target object `O` and the open/closed state of the parent's `mailbox`.
The `O` object's lifetime is contained by that of its owning
`EstablishedActor<O>`: the `EstablishedActor` constructor executes the
`O` constructor via "placement new", constructing it at the address
provided by the parent `AspiringActor`, and the `~EstablishedActor`
destructor similarly executes the `~O` destructor (after closing the
mailbox). `EstablishedActor` should therefore live entirely on the
thread intended to own `O`.
* Remove Actor#{invoke,ask}
Diffstat (limited to 'test')
-rw-r--r-- | test/actor/actor.test.cpp | 99 | ||||
-rw-r--r-- | test/util/thread.test.cpp | 61 |
2 files changed, 144 insertions, 16 deletions
diff --git a/test/actor/actor.test.cpp b/test/actor/actor.test.cpp index 967dc152d9..7493abe263 100644 --- a/test/actor/actor.test.cpp +++ b/test/actor/actor.test.cpp @@ -1,5 +1,6 @@ #include <mbgl/actor/actor.hpp> #include <mbgl/util/default_thread_pool.hpp> +#include <mbgl/util/run_loop.hpp> #include <mbgl/test/util.hpp> @@ -7,13 +8,12 @@ #include <functional> #include <future> #include <memory> +#include <thread> using namespace mbgl; using namespace std::chrono_literals; TEST(Actor, Construction) { - // Construction is currently synchronous. It may become asynchronous in the future. - struct Test { Test(ActorRef<Test>, bool& constructed) { constructed = true; @@ -27,6 +27,25 @@ TEST(Actor, Construction) { EXPECT_TRUE(constructed); } +TEST(Actor, Destruction) { + struct Test { + Test(ActorRef<Test>, bool& destructed_) : destructed(destructed_) {}; + ~Test() { + destructed = true; + } + + bool& destructed; + }; + + ThreadPool pool { 1 }; + bool destructed = false; + { + Actor<Test> test(pool, std::ref(destructed)); + } + + EXPECT_TRUE(destructed); +} + TEST(Actor, DestructionBlocksOnReceive) { // Destruction blocks until the actor is not receiving. @@ -63,7 +82,7 @@ TEST(Actor, DestructionBlocksOnReceive) { Actor<Test> test(pool, std::move(enteredPromise), std::move(exitingFuture)); - test.invoke(&Test::wait); + test.self().invoke(&Test::wait); enteredFuture.wait(); exitingPromise.set_value(); } @@ -145,7 +164,7 @@ TEST(Actor, DestructionAllowedInReceiveOnSameThread) { auto test = std::make_unique<Actor<Test>>(pool); // Callback (triggered while mutex is locked in Mailbox::receive()) - test->invoke(&Test::callMeBack, [&]() { + test->self().invoke(&Test::callMeBack, [&]() { // Destroy the Actor/Mailbox in the same thread test.reset(); callbackFiredPromise.set_value(); @@ -180,16 +199,16 @@ TEST(Actor, SelfDestructionDoesntCrashWaitingReceivingThreads) { std::atomic<bool> waitingMessageProcessed {false}; // Callback (triggered while mutex is locked in Mailbox::receive()) - closingActor->invoke(&Test::callMeBack, [&]() { + closingActor->self().invoke(&Test::callMeBack, [&]() { // Queue up another message from another thread std::promise<void> messageQueuedPromise; - waitingActor->invoke(&Test::callMeBack, [&]() { + waitingActor->self().invoke(&Test::callMeBack, [&]() { // This will be waiting on the mutex in // Mailbox::receive(), holding a lock // on the weak_ptr so the mailbox is not // destroyed - closingActor->invoke(&Test::callMeBack, [&]() { + closingActor->self().invoke(&Test::callMeBack, [&]() { waitingMessageProcessed.store(true); }); messageQueuedPromise.set_value(); @@ -239,10 +258,10 @@ TEST(Actor, OrderedMailbox) { Actor<Test> test(pool, std::move(endedPromise)); for (auto i = 1; i <= 10; ++i) { - test.invoke(&Test::receive, i); + test.self().invoke(&Test::receive, i); } - test.invoke(&Test::end); + test.self().invoke(&Test::end); endedFuture.wait(); } @@ -275,10 +294,10 @@ TEST(Actor, NonConcurrentMailbox) { Actor<Test> test(pool, std::move(endedPromise)); for (auto i = 1; i <= 10; ++i) { - test.invoke(&Test::receive, i); + test.self().invoke(&Test::receive, i); } - test.invoke(&Test::end); + test.self().invoke(&Test::end); endedFuture.wait(); } @@ -297,7 +316,7 @@ TEST(Actor, Ask) { ThreadPool pool { 2 }; Actor<Test> test(pool); - auto result = test.ask(&Test::doubleIt, 1); + auto result = test.self().ask(&Test::doubleIt, 1); ASSERT_TRUE(result.valid()); @@ -324,7 +343,7 @@ TEST(Actor, AskVoid) { bool executed = false; Actor<Test> actor(pool, executed); - actor.ask(&Test::doIt).get(); + actor.self().ask(&Test::doIt).get(); EXPECT_TRUE(executed); } @@ -355,6 +374,58 @@ TEST(Actor, NoSelfActorRef) { auto future = promise.get_future(); Actor<WithArguments> withArguments(pool, std::move(promise)); - withArguments.invoke(&WithArguments::receive); + withArguments.self().invoke(&WithArguments::receive); future.wait(); } + +TEST(Actor, TwoPhaseConstruction) { + // This test mimics, in simplified form, the approach used by the Thread<Object> to construct + // its actor in two parts so that the Thread<Object> instance can be created without waiting + // for the target thread to be up and running. + + struct Test { + Test(ActorRef<Test>, std::shared_ptr<bool> destroyed_) + : destroyed(std::move(destroyed_)) {}; + + ~Test() { + *destroyed = true; + } + + void callMe(std::promise<void> p) { + p.set_value(); + } + + void stop() { + util::RunLoop::Get()->stop(); + } + + std::shared_ptr<bool> destroyed; + }; + + AspiringActor<Test> parent; + + auto destroyed = std::make_shared<bool>(false); + + std::promise<void> queueExecuted; + auto queueExecutedFuture = queueExecuted.get_future(); + + parent.self().invoke(&Test::callMe, std::move(queueExecuted)); + parent.self().invoke(&Test::stop); + + auto thread = std::thread([ + capturedArgs = std::make_tuple(destroyed), + &parent + ] () mutable { + util::RunLoop loop(util::RunLoop::Type::New); + EstablishedActor<Test> test(loop, parent, capturedArgs); + loop.run(); + }); + + // should not hang + queueExecutedFuture.get(); + thread.join(); + + EXPECT_TRUE(*destroyed); +} + + diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp index 76fb5ce3f0..2bcb9d8959 100644 --- a/test/util/thread.test.cpp +++ b/test/util/thread.test.cpp @@ -15,11 +15,11 @@ class TestObject { public: TestObject(ActorRef<TestObject>, std::thread::id otherTid) : tid(std::this_thread::get_id()) { - EXPECT_NE(tid, otherTid); + EXPECT_NE(tid, otherTid); // Object is created on child thread } ~TestObject() { - EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(tid, std::this_thread::get_id()); // Object is destroyed on child thread } void fn1(int val) const { @@ -275,3 +275,60 @@ TEST(Thread, PauseResume) { thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); loop.run(); } + + +class TestWorkerDelayedConstruction { +public: + TestWorkerDelayedConstruction(ActorRef<TestWorkerDelayedConstruction>, std::future<void> start) { + start.get(); + } + + void send(std::function<void ()> cb) { + cb(); + } + +private: + Timer timer; +}; + +TEST(Thread, InvokeBeforeChildStarts) { + RunLoop loop; + + std::promise<void> start; + Thread<TestWorkerDelayedConstruction> thread("Test", start.get_future()); + + std::atomic<int> count { 0 }; + + for (unsigned i = 0; i < 100; ++i) { + thread.actor().invoke(&TestWorkerDelayedConstruction::send, [&] { ++count; }); + } + + thread.actor().invoke(&TestWorkerDelayedConstruction::send, [&] { loop.stop(); }); + + // This test will be flaky if messages are consumed before the target object is constructed. + ASSERT_EQ(count, 0); + + start.set_value(); + + loop.run(); + + ASSERT_EQ(count, 100); +} + +TEST(Thread, DeleteBeforeChildStarts) { + std::atomic_bool flag(false); + std::promise<void> start; + + Thread<TestWorker> control("Control"); + auto thread = std::make_unique<Thread<TestWorkerDelayedConstruction>>("Test", start.get_future()); + + thread->actor().invoke(&TestWorkerDelayedConstruction::send, [&] { flag = true; }); + + control.actor().invoke(&TestWorker::sendDelayed, [&] { start.set_value(); }); + + // Should not hang. + thread.reset(); + + // Should process the queue before destruction. + ASSERT_TRUE(flag); +} |