From 3f3a048a99ded73f80e3fe91835316b57780114f Mon Sep 17 00:00:00 2001 From: Anand Thakker Date: Wed, 13 Jun 2018 09:27:57 -0400 Subject: Avoid blocking in Thread constructor --- include/mbgl/actor/actor.hpp | 19 +++++++++++++ include/mbgl/actor/mailbox.hpp | 6 +++- include/mbgl/util/thread.hpp | 62 +++++++++++++++++++++++++++++++++--------- src/mbgl/actor/mailbox.cpp | 22 +++++++++++++-- 4 files changed, 92 insertions(+), 17 deletions(-) diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp index a0df19208e..2d2757459c 100644 --- a/include/mbgl/actor/actor.hpp +++ b/include/mbgl/actor/actor.hpp @@ -11,6 +11,11 @@ namespace mbgl { +namespace util { + template + class Thread; +} + /* An `Actor` is an owning reference to an asynchronous object of type `O`: an "actor". Communication with an actor happens via message passing: you send a message to the object @@ -56,12 +61,23 @@ public: object(self(), std::forward(args_)...) { } + template , Args...>::value>::type * = nullptr> + Actor(std::shared_ptr mailbox_, Args&&... args_) + : mailbox(std::move(mailbox_)), + object(self(), std::forward(args_)...) { + } + // Enabled for plain Objects template, Args...>::value>::type * = nullptr> Actor(Scheduler& scheduler, Args&& ... args_) : mailbox(std::make_shared(scheduler)), object(std::forward(args_)...) { } + template, Args...>::value>::type * = nullptr> + Actor(std::shared_ptr mailbox_, Args&& ... args_) + : mailbox(mailbox_), object(std::forward(args_)...) { + } + ~Actor() { mailbox->close(); } @@ -91,6 +107,9 @@ public: } private: + template + friend class util::Thread; + std::shared_ptr mailbox; Object object; }; diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp index 8ecf91701a..1603ea4924 100644 --- a/include/mbgl/actor/mailbox.hpp +++ b/include/mbgl/actor/mailbox.hpp @@ -17,11 +17,15 @@ public: void close(); void receive(); + + // Replace this mailbox's scheduler. Effectively allows a mailbox to be + // created on one thread and moved to another one. + void setScheduler(Scheduler* scheduler_); static void maybeReceive(std::weak_ptr); private: - Scheduler& scheduler; + Scheduler* scheduler; std::recursive_mutex receivingMutex; std::mutex pushingMutex; diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp index 74e722b02d..8230d8778f 100644 --- a/include/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -19,6 +19,11 @@ namespace mbgl { namespace util { +class NoopScheduler : public Scheduler { +public: + void schedule(std::weak_ptr) override {} +}; + // Manages a thread with `Object`. // Upon creation of this object, it launches a thread and creates an object of type `Object` @@ -41,23 +46,37 @@ class Thread : public Scheduler { public: template Thread(const std::string& name, Args&&... args) { - std::promise running; - - thread = std::thread([&] { + std::unique_ptr> running_ = std::make_unique>(); + running = running_->get_future(); + + // Pre-create a mailbox for this actor, using a NoopScheduler that + // leaves the mailbox's queue unconsumed. + // Once the RunLoop on the target thread has been created, we'll replace + // the NoopScheduler with the RunLoop. Meanwhile, this allows us to + // immediately provide ActorRef using this mailbox, with any messages + // sent to them being queued in the holding mailbox until the thread is + // up and running. + std::shared_ptr mailbox_ = std::make_shared(noopScheduler); + mailbox = mailbox_; + + thread = std::thread([&, sharedMailbox = std::move(mailbox_), runningPromise = std::move(running_)] { platform::setCurrentThreadName(name); platform::makeThreadLowPriority(); util::RunLoop loop_(util::RunLoop::Type::New); loop = &loop_; - object = std::make_unique>(*this, std::forward(args)...); - running.set_value(); + Actor* actor = new (&actorStorage) Actor(std::move(sharedMailbox), std::forward(args)...); + // Replace the NoopScheduler on the mailbox with the RunLoop to + // begin actually processing messages. + actor->mailbox->setScheduler(this); + + runningPromise->set_value(); + loop->run(); loop = nullptr; }); - - running.get_future().get(); } ~Thread() override { @@ -66,12 +85,14 @@ public: } std::promise joinable; + + running.wait(); // Kill the actor, so we don't get more // messages posted on this scheduler after // we delete the RunLoop. loop->invoke([&] { - object.reset(); + reinterpret_cast*>(&actorStorage)->~Actor(); joinable.set_value(); }); @@ -85,8 +106,15 @@ public: // can be used to send messages to `Object`. It is safe // to the non-owning reference to outlive this object // and be used after the `Thread<>` gets destroyed. - ActorRef> actor() const { - return object->self(); + ActorRef> actor() { + // The actor->object reference we provide here will not actually be + // valid until the child thread constructs Actor into + // actorStorage using "placement new". + // We guarantee that the object reference isn't actually used by + // using the NoopScheduler to prevent messages to this mailbox from + // being processed until after the actor has been constructed. + auto actor = reinterpret_cast*>(&actorStorage); + return ActorRef>(actor->object, mailbox); } // Pauses the `Object` thread. It will prevent the object to wake @@ -103,6 +131,8 @@ public: auto pausing = paused->get_future(); + running.wait(); + loop->invoke(RunLoop::Priority::High, [this] { auto resuming = resumed->get_future(); paused->set_value(); @@ -127,13 +157,19 @@ public: private: MBGL_STORE_THREAD(tid); - void schedule(std::weak_ptr mailbox) override { - loop->schedule(mailbox); + void schedule(std::weak_ptr mailbox_) override { + assert(loop); + loop->schedule(mailbox_); } + NoopScheduler noopScheduler; + std::weak_ptr mailbox; + std::aligned_storage)> actorStorage; + std::thread thread; - std::unique_ptr> object; + std::future running; + std::unique_ptr> paused; std::unique_ptr> resumed; diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp index 373c24275f..c117aa2605 100644 --- a/src/mbgl/actor/mailbox.cpp +++ b/src/mbgl/actor/mailbox.cpp @@ -7,7 +7,7 @@ namespace mbgl { Mailbox::Mailbox(Scheduler& scheduler_) - : scheduler(scheduler_) { + : scheduler(&scheduler_) { } void Mailbox::close() { @@ -22,6 +22,22 @@ void Mailbox::close() { closed = true; } +void Mailbox::setScheduler(Scheduler* scheduler_) { + std::lock_guard receivingLock(receivingMutex); + std::lock_guard pushingLock(pushingMutex); + + scheduler = scheduler_; + + if (closed) { + return; + } + + if (!queue.empty()) { + scheduler->schedule(shared_from_this()); + } +} + + void Mailbox::push(std::unique_ptr message) { std::lock_guard pushingLock(pushingMutex); @@ -33,7 +49,7 @@ void Mailbox::push(std::unique_ptr message) { bool wasEmpty = queue.empty(); queue.push(std::move(message)); if (wasEmpty) { - scheduler.schedule(shared_from_this()); + scheduler->schedule(shared_from_this()); } } @@ -58,7 +74,7 @@ void Mailbox::receive() { (*message)(); if (!wasEmpty) { - scheduler.schedule(shared_from_this()); + scheduler->schedule(shared_from_this()); } } -- cgit v1.2.1