diff options
author | Thiago Marcos P. Santos <tmpsantos@gmail.com> | 2017-06-15 17:59:25 +0300 |
---|---|---|
committer | Thiago Marcos P. Santos <tmpsantos@gmail.com> | 2017-06-21 14:30:09 +0300 |
commit | c34d1cfaa7a9e61c6bb8f07d4fff547ad68e3115 (patch) | |
tree | e1e3be2b4ce1b57d74f45c4bed92c73952922fd9 /test/util | |
parent | 0de17ea5f47e9e6c4b67170d4d04b0425e67ce5c (diff) | |
download | qtlocation-mapboxgl-c34d1cfaa7a9e61c6bb8f07d4fff547ad68e3115.tar.gz |
[tests] Added tests for ThreadedObject
Diffstat (limited to 'test/util')
-rw-r--r-- | test/util/threaded_object.test.cpp | 277 |
1 files changed, 277 insertions, 0 deletions
diff --git a/test/util/threaded_object.test.cpp b/test/util/threaded_object.test.cpp new file mode 100644 index 0000000000..9a78290305 --- /dev/null +++ b/test/util/threaded_object.test.cpp @@ -0,0 +1,277 @@ +#include <mbgl/actor/actor_ref.hpp> +#include <mbgl/test/util.hpp> +#include <mbgl/util/default_thread_pool.hpp> +#include <mbgl/util/run_loop.hpp> +#include <mbgl/util/threaded_object.hpp> +#include <mbgl/util/timer.hpp> + +#include <atomic> +#include <memory> + +using namespace mbgl; +using namespace mbgl::util; + +class TestObject { +public: + TestObject(ActorRef<TestObject>, std::thread::id otherTid) + : tid(std::this_thread::get_id()) { + EXPECT_NE(tid, otherTid); + } + + ~TestObject() { + EXPECT_EQ(tid, std::this_thread::get_id()); + } + + void fn1(int val) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(val, 1); + } + + void fn2(std::function<void (int)> cb) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + cb(1); + } + + void transferIn(std::unique_ptr<int> val) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(*val, 1); + } + + void transferInShared(std::shared_ptr<int> val) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(*val, 1); + } + + void transferString(const std::string& string) const { + EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(string, "test"); + } + + void checkContext(std::promise<bool> result) const { + result.set_value(tid == std::this_thread::get_id()); + } + + void sync(std::promise<void> result) const { + result.set_value(); + } + + const std::thread::id tid; +}; + +TEST(ThreadedObject, invoke) { + const std::thread::id tid = std::this_thread::get_id(); + ThreadedObject<TestObject> thread("Test", tid); + + thread.actor().invoke(&TestObject::fn1, 1); + thread.actor().invoke(&TestObject::fn2, [] (int result) { EXPECT_EQ(result, 1); } ); + thread.actor().invoke(&TestObject::transferIn, std::make_unique<int>(1)); + thread.actor().invoke(&TestObject::transferInShared, std::make_shared<int>(1)); + + std::string test("test"); + thread.actor().invoke(&TestObject::transferString, test); + + // Make sure the message queue was consumed before ending the test. + std::promise<void> result; + auto resultFuture = result.get_future(); + thread.actor().invoke(&TestObject::sync, std::move(result)); + resultFuture.get(); +} + +TEST(ThreadedObject, Context) { + const std::thread::id tid = std::this_thread::get_id(); + ThreadedObject<TestObject> thread("Test", tid); + + std::promise<bool> result; + auto resultFuture = result.get_future(); + + thread.actor().invoke(&TestObject::checkContext, std::move(result)); + EXPECT_EQ(resultFuture.get(), true); +} + +class TestWorker { +public: + TestWorker(ActorRef<TestWorker>) {} + + void send(std::function<void ()> cb) { + cb(); + } + + void sendDelayed(std::function<void ()> cb) { + timer.start(Milliseconds(300), mbgl::Duration::zero(), [cb] { + cb(); + }); + } + +private: + Timer timer; +}; + +TEST(ThreadedObject, ExecutesAfter) { + RunLoop loop; + ThreadedObject<TestWorker> thread("Test"); + + bool didWork = false; + bool didAfter = false; + + thread.actor().invoke(&TestWorker::send, [&] { didWork = true; }); + thread.actor().invoke(&TestWorker::send, [&] { didAfter = true; loop.stop(); }); + + loop.run(); + + EXPECT_TRUE(didWork); + EXPECT_TRUE(didAfter); +} + +TEST(ThreadedObject, CanSelfWakeUp) { + RunLoop loop; + ThreadedObject<TestWorker> thread("Test"); + + thread.actor().invoke(&TestWorker::sendDelayed, [&] { + loop.stop(); + }); + + loop.run(); +} + +TEST(ThreadedObject, Concurrency) { + auto loop = std::make_shared<RunLoop>(); + + unsigned numMessages = 100000; + std::atomic_uint completed(numMessages); + + ThreadPool threadPool(10); + Actor<TestWorker> poolWorker(threadPool); + auto poolWorkerRef = poolWorker.self(); + + ThreadedObject<TestWorker> threadedObject("Test"); + auto threadedObjectRef = threadedObject.actor(); + + // 10 threads sending 100k messages to the ThreadedObject. The + // idea here is to test if the scheduler is handling concurrency + // correctly, otherwise this test should crash. + for (unsigned i = 0; i < numMessages; ++i) { + poolWorkerRef.invoke(&TestWorker::send, [threadedObjectRef, loop, &completed] () mutable { + threadedObjectRef.invoke(&TestWorker::send, [loop, &completed] () { + if (!--completed) { + loop->stop(); + } + }); + }); + }; + + loop->run(); +} + +TEST(ThreadedObject, ThreadPoolMessaging) { + auto loop = std::make_shared<RunLoop>(); + + ThreadPool threadPool(1); + Actor<TestWorker> poolWorker(threadPool); + auto poolWorkerRef = poolWorker.self(); + + ThreadedObject<TestWorker> threadedObject("Test"); + auto threadedObjectRef = threadedObject.actor(); + + // This is sending a message to the ThreadedObject from the main + // thread. Then the ThreadedObject will send another message to + // a worker on the ThreadPool. + threadedObjectRef.invoke(&TestWorker::send, [poolWorkerRef, loop] () mutable { + poolWorkerRef.invoke(&TestWorker::send, [loop] () { loop->stop(); }); + }); + + loop->run(); + + // Same as before, but in the opposite direction. + poolWorkerRef.invoke(&TestWorker::send, [threadedObjectRef, loop] () mutable { + threadedObjectRef.invoke(&TestWorker::send, [loop] () { loop->stop(); }); + }); + + loop->run(); +} + +TEST(ThreadedObject, ReferenceCanOutliveThreadedObject) { + auto thread = std::make_unique<ThreadedObject<TestWorker>>("Test"); + auto worker = thread->actor(); + + thread.reset(); + + for (unsigned i = 0; i < 1000; ++i) { + worker.invoke(&TestWorker::send, [&] { ADD_FAILURE() << "Should never happen"; }); + } + + usleep(10000); +} + +TEST(ThreadedObject, DeletePausedThreadedObject) { + std::atomic_bool flag(false); + + auto thread = std::make_unique<ThreadedObject<TestWorker>>("Test"); + thread->pause(); + thread->actor().invoke(&TestWorker::send, [&] { flag = true; }); + + // Should not hang. + thread.reset(); + + // Should process the queue before destruction. + ASSERT_TRUE(flag); +} + +TEST(ThreadedObject, Pause) { + RunLoop loop; + + std::atomic_bool flag(false); + + ThreadedObject<TestWorker> thread1("Test1"); + thread1.pause(); + + ThreadedObject<TestWorker> thread2("Test2"); + + for (unsigned i = 0; i < 100; ++i) { + thread1.actor().invoke(&TestWorker::send, [&] { flag = true; }); + thread2.actor().invoke(&TestWorker::send, [&] { ASSERT_FALSE(flag); }); + } + + // Queue a message at the end of thread2 queue. + thread2.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + loop.run(); +} + +TEST(ThreadedObject, Resume) { + RunLoop loop; + + std::atomic_bool flag(false); + + ThreadedObject<TestWorker> thread("Test"); + thread.pause(); + + for (unsigned i = 0; i < 100; ++i) { + thread.actor().invoke(&TestWorker::send, [&] { flag = true; }); + } + + // ThreadedObject messages are ondered, when we resume, this is going + // to me the last thing to run on the message queue. + thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + + // This test will be flaky if the thread doesn't get paused. + ASSERT_FALSE(flag); + + thread.resume(); + loop.run(); + + ASSERT_TRUE(flag); +} + +TEST(ThreadedObject, PauseResume) { + RunLoop loop; + + ThreadedObject<TestWorker> thread("Test"); + + // Test if multiple pause/resume work. + for (unsigned i = 0; i < 100; ++i) { + thread.pause(); + thread.resume(); + } + + thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); + loop.run(); +} |