From c34d1cfaa7a9e61c6bb8f07d4fff547ad68e3115 Mon Sep 17 00:00:00 2001 From: "Thiago Marcos P. Santos" Date: Thu, 15 Jun 2017 17:59:25 +0300 Subject: [tests] Added tests for ThreadedObject --- test/util/threaded_object.test.cpp | 277 +++++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100644 test/util/threaded_object.test.cpp (limited to 'test/util') diff --git a/test/util/threaded_object.test.cpp b/test/util/threaded_object.test.cpp new file mode 100644 index 0000000000..9a78290305 --- /dev/null +++ b/test/util/threaded_object.test.cpp @@ -0,0 +1,277 @@ +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace mbgl; +using namespace mbgl::util; + +class TestObject { +public: + TestObject(ActorRef, std::thread::id otherTid) + : tid(std::this_thread::get_id()) { + EXPECT_NE(tid, otherTid); + } + + ~TestObject() { + EXPECT_EQ(tid, std::this_thread::get_id()); + } + + void fn1(int val) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(val, 1); + } + + void fn2(std::function cb) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + cb(1); + } + + void transferIn(std::unique_ptr val) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(*val, 1); + } + + void transferInShared(std::shared_ptr val) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(*val, 1); + } + + void transferString(const std::string& string) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(string, "test"); + } + + void checkContext(std::promise result) const { + result.set_value(tid == std::this_thread::get_id()); + } + + void sync(std::promise result) const { + result.set_value(); + } + + const std::thread::id tid; +}; + +TEST(ThreadedObject, invoke) { + const std::thread::id tid = std::this_thread::get_id(); + ThreadedObject thread("Test", tid); + + thread.actor().invoke(&TestObject::fn1, 1); + thread.actor().invoke(&TestObject::fn2, [] (int result) { EXPECT_EQ(result, 1); } ); + thread.actor().invoke(&TestObject::transferIn, std::make_unique(1)); + thread.actor().invoke(&TestObject::transferInShared, std::make_shared(1)); + + std::string test("test"); + thread.actor().invoke(&TestObject::transferString, test); + + // Make sure the message queue was consumed before ending the test. + std::promise result; + auto resultFuture = result.get_future(); + thread.actor().invoke(&TestObject::sync, std::move(result)); + resultFuture.get(); +} + +TEST(ThreadedObject, Context) { + const std::thread::id tid = std::this_thread::get_id(); + ThreadedObject thread("Test", tid); + + std::promise result; + auto resultFuture = result.get_future(); + + thread.actor().invoke(&TestObject::checkContext, std::move(result)); + EXPECT_EQ(resultFuture.get(), true); +} + +class TestWorker { +public: + TestWorker(ActorRef) {} + + void send(std::function cb) { + cb(); + } + + void sendDelayed(std::function cb) { + timer.start(Milliseconds(300), mbgl::Duration::zero(), [cb] { + cb(); + }); + } + +private: + Timer timer; +}; + +TEST(ThreadedObject, ExecutesAfter) { + RunLoop loop; + ThreadedObject thread("Test"); + + bool didWork = false; + bool didAfter = false; + + thread.actor().invoke(&TestWorker::send, [&] { didWork = true; }); + thread.actor().invoke(&TestWorker::send, [&] { didAfter = true; loop.stop(); }); + + loop.run(); + + EXPECT_TRUE(didWork); + EXPECT_TRUE(didAfter); +} + +TEST(ThreadedObject, CanSelfWakeUp) { + RunLoop loop; + ThreadedObject thread("Test"); + + thread.actor().invoke(&TestWorker::sendDelayed, [&] { + loop.stop(); + }); + + loop.run(); +} + +TEST(ThreadedObject, Concurrency) { + auto loop = std::make_shared(); + + unsigned numMessages = 100000; + std::atomic_uint completed(numMessages); + + ThreadPool threadPool(10); + Actor poolWorker(threadPool); + auto poolWorkerRef = poolWorker.self(); + + ThreadedObject threadedObject("Test"); + auto threadedObjectRef = threadedObject.actor(); + + // 10 threads sending 100k messages to the ThreadedObject. The + // idea here is to test if the scheduler is handling concurrency + // correctly, otherwise this test should crash. + for (unsigned i = 0; i < numMessages; ++i) { + poolWorkerRef.invoke(&TestWorker::send, [threadedObjectRef, loop, &completed] () mutable { + threadedObjectRef.invoke(&TestWorker::send, [loop, &completed] () { + if (!--completed) { + loop->stop(); + } + }); + }); + }; + + loop->run(); +} + +TEST(ThreadedObject, ThreadPoolMessaging) { + auto loop = std::make_shared(); + + ThreadPool threadPool(1); + Actor poolWorker(threadPool); + auto poolWorkerRef = poolWorker.self(); + + ThreadedObject threadedObject("Test"); + auto threadedObjectRef = threadedObject.actor(); + + // This is sending a message to the ThreadedObject from the main + // thread. Then the ThreadedObject will send another message to + // a worker on the ThreadPool. + threadedObjectRef.invoke(&TestWorker::send, [poolWorkerRef, loop] () mutable { + poolWorkerRef.invoke(&TestWorker::send, [loop] () { loop->stop(); }); + }); + + loop->run(); + + // Same as before, but in the opposite direction. + poolWorkerRef.invoke(&TestWorker::send, [threadedObjectRef, loop] () mutable { + threadedObjectRef.invoke(&TestWorker::send, [loop] () { loop->stop(); }); + }); + + loop->run(); +} + +TEST(ThreadedObject, ReferenceCanOutliveThreadedObject) { + auto thread = std::make_unique>("Test"); + auto worker = thread->actor(); + + thread.reset(); + + for (unsigned i = 0; i < 1000; ++i) { + worker.invoke(&TestWorker::send, [&] { ADD_FAILURE() << "Should never happen"; }); + } + + usleep(10000); +} + +TEST(ThreadedObject, DeletePausedThreadedObject) { + std::atomic_bool flag(false); + + auto thread = std::make_unique>("Test"); + thread->pause(); + thread->actor().invoke(&TestWorker::send, [&] { flag = true; }); + + // Should not hang. + thread.reset(); + + // Should process the queue before destruction. + ASSERT_TRUE(flag); +} + +TEST(ThreadedObject, Pause) { + RunLoop loop; + + std::atomic_bool flag(false); + + ThreadedObject thread1("Test1"); + thread1.pause(); + + ThreadedObject thread2("Test2"); + + for (unsigned i = 0; i < 100; ++i) { + thread1.actor().invoke(&TestWorker::send, [&] { flag = true; }); + thread2.actor().invoke(&TestWorker::send, [&] { ASSERT_FALSE(flag); }); + } + + // Queue a message at the end of thread2 queue. + thread2.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + loop.run(); +} + +TEST(ThreadedObject, Resume) { + RunLoop loop; + + std::atomic_bool flag(false); + + ThreadedObject thread("Test"); + thread.pause(); + + for (unsigned i = 0; i < 100; ++i) { + thread.actor().invoke(&TestWorker::send, [&] { flag = true; }); + } + + // ThreadedObject messages are ondered, when we resume, this is going + // to me the last thing to run on the message queue. + thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + + // This test will be flaky if the thread doesn't get paused. + ASSERT_FALSE(flag); + + thread.resume(); + loop.run(); + + ASSERT_TRUE(flag); +} + +TEST(ThreadedObject, PauseResume) { + RunLoop loop; + + ThreadedObject thread("Test"); + + // Test if multiple pause/resume work. + for (unsigned i = 0; i < 100; ++i) { + thread.pause(); + thread.resume(); + } + + thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + loop.run(); +} -- cgit v1.2.1