summaryrefslogtreecommitdiff
path: root/test/actor/actor.test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/actor/actor.test.cpp')
-rw-r--r--test/actor/actor.test.cpp223
1 files changed, 222 insertions, 1 deletions
diff --git a/test/actor/actor.test.cpp b/test/actor/actor.test.cpp
index 03f41a6e64..967dc152d9 100644
--- a/test/actor/actor.test.cpp
+++ b/test/actor/actor.test.cpp
@@ -6,6 +6,7 @@
#include <chrono>
#include <functional>
#include <future>
+#include <memory>
using namespace mbgl;
using namespace std::chrono_literals;
@@ -26,7 +27,7 @@ TEST(Actor, Construction) {
EXPECT_TRUE(constructed);
}
-TEST(Actor, DestructionClosesMailbox) {
+TEST(Actor, DestructionBlocksOnReceive) {
// Destruction blocks until the actor is not receiving.
struct Test {
@@ -67,6 +68,149 @@ TEST(Actor, DestructionClosesMailbox) {
exitingPromise.set_value();
}
+TEST(Actor, DestructionBlocksOnSend) {
+ // Destruction blocks until the actor is not being sent a message.
+
+ struct TestScheduler : public Scheduler {
+ std::promise<void> promise;
+ std::future<void> future;
+ std::atomic<bool> waited;
+
+ TestScheduler(std::promise<void> promise_, std::future<void> future_)
+ : promise(std::move(promise_)),
+ future(std::move(future_)),
+ waited(false) {
+ }
+
+ ~TestScheduler() {
+ EXPECT_TRUE(waited.load());
+ }
+
+ void schedule(std::weak_ptr<Mailbox>) final {
+ promise.set_value();
+ future.wait();
+ std::this_thread::sleep_for(1ms);
+ waited = true;
+ }
+ };
+
+ struct Test {
+ Test(ActorRef<Test>) {}
+ void message() {}
+ };
+
+ std::promise<void> enteredPromise;
+ std::future<void> enteredFuture = enteredPromise.get_future();
+
+ std::promise<void> exitingPromise;
+ std::future<void> exitingFuture = exitingPromise.get_future();
+
+ auto scheduler = std::make_unique<TestScheduler>(std::move(enteredPromise), std::move(exitingFuture));
+ auto actor = std::make_unique<Actor<Test>>(*scheduler);
+
+ std::thread thread {
+ [] (ActorRef<Test> ref) {
+ ref.invoke(&Test::message);
+ },
+ actor->self()
+ };
+
+ enteredFuture.wait();
+ exitingPromise.set_value();
+
+ actor.reset();
+ scheduler.reset();
+
+ thread.join();
+}
+
+TEST(Actor, DestructionAllowedInReceiveOnSameThread) {
+ // Destruction doesn't block if occurring on the same
+ // thread as receive(). This prevents deadlocks and
+ // allows for self-closing actors
+
+ struct Test {
+
+ Test(ActorRef<Test>){};
+
+ void callMeBack(std::function<void ()> callback) {
+ callback();
+ }
+ };
+
+ ThreadPool pool { 1 };
+
+ std::promise<void> callbackFiredPromise;
+
+ auto test = std::make_unique<Actor<Test>>(pool);
+
+ // Callback (triggered while mutex is locked in Mailbox::receive())
+ test->invoke(&Test::callMeBack, [&]() {
+ // Destroy the Actor/Mailbox in the same thread
+ test.reset();
+ callbackFiredPromise.set_value();
+ });
+
+ auto status = callbackFiredPromise.get_future().wait_for(std::chrono::seconds(1));
+ ASSERT_EQ(std::future_status::ready, status);
+}
+
+TEST(Actor, SelfDestructionDoesntCrashWaitingReceivingThreads) {
+ // Ensures destruction doesn't cause waiting threads to
+ // crash when a actor closes it's own mailbox from a
+ // callback
+
+ struct Test {
+
+ Test(ActorRef<Test>){};
+
+ void callMeBack(std::function<void ()> callback) {
+ callback();
+ }
+ };
+
+
+ ThreadPool pool { 2 };
+
+ std::promise<void> actorClosedPromise;
+
+ auto closingActor = std::make_unique<Actor<Test>>(pool);
+ auto waitingActor = std::make_unique<Actor<Test>>(pool);
+
+ std::atomic<bool> waitingMessageProcessed {false};
+
+ // Callback (triggered while mutex is locked in Mailbox::receive())
+ closingActor->invoke(&Test::callMeBack, [&]() {
+
+ // Queue up another message from another thread
+ std::promise<void> messageQueuedPromise;
+ waitingActor->invoke(&Test::callMeBack, [&]() {
+ // This will be waiting on the mutex in
+ // Mailbox::receive(), holding a lock
+ // on the weak_ptr so the mailbox is not
+ // destroyed
+ closingActor->invoke(&Test::callMeBack, [&]() {
+ waitingMessageProcessed.store(true);
+ });
+ messageQueuedPromise.set_value();
+ });
+
+ // Wait for the message to be queued
+ ASSERT_EQ(
+ messageQueuedPromise.get_future().wait_for(std::chrono::seconds(1)),
+ std::future_status::ready
+ );
+
+ // Destroy the Actor/Mailbox in the same thread
+ closingActor.reset();
+ actorClosedPromise.set_value();
+ });
+
+ auto status = actorClosedPromise.get_future().wait_for(std::chrono::seconds(1));
+ ASSERT_EQ(std::future_status::ready, status);
+ ASSERT_FALSE(waitingMessageProcessed.load());
+}
+
TEST(Actor, OrderedMailbox) {
// Messages are processed in order.
@@ -137,3 +281,80 @@ TEST(Actor, NonConcurrentMailbox) {
test.invoke(&Test::end);
endedFuture.wait();
}
+
+TEST(Actor, Ask) {
+ // Asking for a result
+
+ struct Test {
+
+ Test(ActorRef<Test>) {}
+
+ int doubleIt(int i) {
+ return i * 2;
+ }
+ };
+
+ ThreadPool pool { 2 };
+ Actor<Test> test(pool);
+
+ auto result = test.ask(&Test::doubleIt, 1);
+
+ ASSERT_TRUE(result.valid());
+
+ auto status = result.wait_for(std::chrono::seconds(1));
+ ASSERT_EQ(std::future_status::ready, status);
+ ASSERT_EQ(2, result.get());
+}
+
+TEST(Actor, AskVoid) {
+ // Ask waits for void methods
+
+ struct Test {
+ bool& executed;
+
+ Test(bool& executed_) : executed(executed_) {
+ }
+
+ void doIt() {
+ executed = true;
+ }
+ };
+
+ ThreadPool pool { 1 };
+ bool executed = false;
+ Actor<Test> actor(pool, executed);
+
+ actor.ask(&Test::doIt).get();
+ EXPECT_TRUE(executed);
+}
+
+TEST(Actor, NoSelfActorRef) {
+ // Not all actors need a reference to self
+
+ // Trivially constructable
+ struct Trivial {};
+
+ ThreadPool pool { 2 };
+ Actor<Trivial> trivial(pool);
+
+
+ // With arguments
+ struct WithArguments {
+ std::promise<void> promise;
+
+ WithArguments(std::promise<void> promise_)
+ : promise(std::move(promise_)) {
+ }
+
+ void receive() {
+ promise.set_value();
+ }
+ };
+
+ std::promise<void> promise;
+ auto future = promise.get_future();
+ Actor<WithArguments> withArguments(pool, std::move(promise));
+
+ withArguments.invoke(&WithArguments::receive);
+ future.wait();
+}