diff options
Diffstat (limited to 'test/actor/actor.test.cpp')
-rw-r--r-- | test/actor/actor.test.cpp | 223 |
1 files changed, 222 insertions, 1 deletions
diff --git a/test/actor/actor.test.cpp b/test/actor/actor.test.cpp index 03f41a6e64..967dc152d9 100644 --- a/test/actor/actor.test.cpp +++ b/test/actor/actor.test.cpp @@ -6,6 +6,7 @@ #include <chrono> #include <functional> #include <future> +#include <memory> using namespace mbgl; using namespace std::chrono_literals; @@ -26,7 +27,7 @@ TEST(Actor, Construction) { EXPECT_TRUE(constructed); } -TEST(Actor, DestructionClosesMailbox) { +TEST(Actor, DestructionBlocksOnReceive) { // Destruction blocks until the actor is not receiving. struct Test { @@ -67,6 +68,149 @@ TEST(Actor, DestructionClosesMailbox) { exitingPromise.set_value(); } +TEST(Actor, DestructionBlocksOnSend) { + // Destruction blocks until the actor is not being sent a message. + + struct TestScheduler : public Scheduler { + std::promise<void> promise; + std::future<void> future; + std::atomic<bool> waited; + + TestScheduler(std::promise<void> promise_, std::future<void> future_) + : promise(std::move(promise_)), + future(std::move(future_)), + waited(false) { + } + + ~TestScheduler() { + EXPECT_TRUE(waited.load()); + } + + void schedule(std::weak_ptr<Mailbox>) final { + promise.set_value(); + future.wait(); + std::this_thread::sleep_for(1ms); + waited = true; + } + }; + + struct Test { + Test(ActorRef<Test>) {} + void message() {} + }; + + std::promise<void> enteredPromise; + std::future<void> enteredFuture = enteredPromise.get_future(); + + std::promise<void> exitingPromise; + std::future<void> exitingFuture = exitingPromise.get_future(); + + auto scheduler = std::make_unique<TestScheduler>(std::move(enteredPromise), std::move(exitingFuture)); + auto actor = std::make_unique<Actor<Test>>(*scheduler); + + std::thread thread { + [] (ActorRef<Test> ref) { + ref.invoke(&Test::message); + }, + actor->self() + }; + + enteredFuture.wait(); + exitingPromise.set_value(); + + actor.reset(); + scheduler.reset(); + + thread.join(); +} + +TEST(Actor, DestructionAllowedInReceiveOnSameThread) { + // Destruction doesn't block if occurring on the same + // thread as receive(). This prevents deadlocks and + // allows for self-closing actors + + struct Test { + + Test(ActorRef<Test>){}; + + void callMeBack(std::function<void ()> callback) { + callback(); + } + }; + + ThreadPool pool { 1 }; + + std::promise<void> callbackFiredPromise; + + auto test = std::make_unique<Actor<Test>>(pool); + + // Callback (triggered while mutex is locked in Mailbox::receive()) + test->invoke(&Test::callMeBack, [&]() { + // Destroy the Actor/Mailbox in the same thread + test.reset(); + callbackFiredPromise.set_value(); + }); + + auto status = callbackFiredPromise.get_future().wait_for(std::chrono::seconds(1)); + ASSERT_EQ(std::future_status::ready, status); +} + +TEST(Actor, SelfDestructionDoesntCrashWaitingReceivingThreads) { + // Ensures destruction doesn't cause waiting threads to + // crash when a actor closes it's own mailbox from a + // callback + + struct Test { + + Test(ActorRef<Test>){}; + + void callMeBack(std::function<void ()> callback) { + callback(); + } + }; + + + ThreadPool pool { 2 }; + + std::promise<void> actorClosedPromise; + + auto closingActor = std::make_unique<Actor<Test>>(pool); + auto waitingActor = std::make_unique<Actor<Test>>(pool); + + std::atomic<bool> waitingMessageProcessed {false}; + + // Callback (triggered while mutex is locked in Mailbox::receive()) + closingActor->invoke(&Test::callMeBack, [&]() { + + // Queue up another message from another thread + std::promise<void> messageQueuedPromise; + waitingActor->invoke(&Test::callMeBack, [&]() { + // This will be waiting on the mutex in + // Mailbox::receive(), holding a lock + // on the weak_ptr so the mailbox is not + // destroyed + closingActor->invoke(&Test::callMeBack, [&]() { + waitingMessageProcessed.store(true); + }); + messageQueuedPromise.set_value(); + }); + + // Wait for the message to be queued + ASSERT_EQ( + messageQueuedPromise.get_future().wait_for(std::chrono::seconds(1)), + std::future_status::ready + ); + + // Destroy the Actor/Mailbox in the same thread + closingActor.reset(); + actorClosedPromise.set_value(); + }); + + auto status = actorClosedPromise.get_future().wait_for(std::chrono::seconds(1)); + ASSERT_EQ(std::future_status::ready, status); + ASSERT_FALSE(waitingMessageProcessed.load()); +} + TEST(Actor, OrderedMailbox) { // Messages are processed in order. @@ -137,3 +281,80 @@ TEST(Actor, NonConcurrentMailbox) { test.invoke(&Test::end); endedFuture.wait(); } + +TEST(Actor, Ask) { + // Asking for a result + + struct Test { + + Test(ActorRef<Test>) {} + + int doubleIt(int i) { + return i * 2; + } + }; + + ThreadPool pool { 2 }; + Actor<Test> test(pool); + + auto result = test.ask(&Test::doubleIt, 1); + + ASSERT_TRUE(result.valid()); + + auto status = result.wait_for(std::chrono::seconds(1)); + ASSERT_EQ(std::future_status::ready, status); + ASSERT_EQ(2, result.get()); +} + +TEST(Actor, AskVoid) { + // Ask waits for void methods + + struct Test { + bool& executed; + + Test(bool& executed_) : executed(executed_) { + } + + void doIt() { + executed = true; + } + }; + + ThreadPool pool { 1 }; + bool executed = false; + Actor<Test> actor(pool, executed); + + actor.ask(&Test::doIt).get(); + EXPECT_TRUE(executed); +} + +TEST(Actor, NoSelfActorRef) { + // Not all actors need a reference to self + + // Trivially constructable + struct Trivial {}; + + ThreadPool pool { 2 }; + Actor<Trivial> trivial(pool); + + + // With arguments + struct WithArguments { + std::promise<void> promise; + + WithArguments(std::promise<void> promise_) + : promise(std::move(promise_)) { + } + + void receive() { + promise.set_value(); + } + }; + + std::promise<void> promise; + auto future = promise.get_future(); + Actor<WithArguments> withArguments(pool, std::move(promise)); + + withArguments.invoke(&WithArguments::receive); + future.wait(); +} |