From a3715c46ad32de759fd4cc02d009d94aa39315b2 Mon Sep 17 00:00:00 2001 From: Anand Thakker Date: Fri, 15 Jun 2018 14:27:44 -0400 Subject: Enable Actor to construct its Object asynchronously --- include/mbgl/actor/actor.hpp | 95 +++++++++++++++++++++++--------- include/mbgl/actor/mailbox.hpp | 3 +- include/mbgl/util/thread.hpp | 49 +++------------- platform/default/default_file_source.cpp | 11 +--- src/mbgl/actor/mailbox.cpp | 6 +- 5 files changed, 86 insertions(+), 78 deletions(-) diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp index 338b7a559d..f87f191b73 100644 --- a/include/mbgl/actor/actor.hpp +++ b/include/mbgl/actor/actor.hpp @@ -53,38 +53,55 @@ namespace util { template class Actor : public util::noncopyable { public: + // TODO: handle std::reference_wrapper case like make_tuple does. + template + using Arguments = std::tuple...>; - // Enabled for Objects with a constructor taking ActorRef as the first parameter - template , Args...>::value>::type * = nullptr> - Actor(Scheduler& scheduler, Args&&... args_) - : mailbox(std::make_shared(scheduler)), - object(self(), std::forward(args_)...) { + template + static Arguments captureArguments(Args&&... args) { + return std::make_tuple(std::forward(args)...); } - template , Args...>::value>::type * = nullptr> - Actor(std::shared_ptr mailbox_, Args&&... args_) - : mailbox(std::move(mailbox_)), - object(self(), std::forward(args_)...) { - } - - // Enabled for plain Objects - template, Args...>::value>::type * = nullptr> - Actor(Scheduler& scheduler, Args&& ... args_) - : mailbox(std::make_shared(scheduler)), object(std::forward(args_)...) { + // Target thread constructor: constructs the Object synchronously and + // immediately begins processing messages to it. + template + Actor(Scheduler& scheduler, Args&& ... args) : mailbox(std::make_shared()) { + emplaceObject(std::forward(args)...); + mailbox->activate(scheduler); } - template, Args...>::value>::type * = nullptr> - Actor(std::shared_ptr mailbox_, Args&& ... args_) - : mailbox(std::move(mailbox_)), object(std::forward(args_)...) { + // Parent thread constructor, allowing an Actor to be pre-created on a + // parent thread before its target thread is up and running. + // This constructor: + // * does not construct an Object + // * pre-creates a "holding" mailbox, whose messages are guaranteed not to + // be consumed until we explicitly call start(). + // + // An Actor created in this manner must be manually activated by a call to + // activate() from the target thread, at which point the Object is + // constructed and the Mailbox starts being consumed. + // + // Meanwhile, this allows us to immediately provide ActorRefs to which + // messages can safely be sent, since we won't process those messages + // until their target object is in place. + Actor() : mailbox(std::make_shared()) {} + + template + void activate(Scheduler& scheduler, ArgsTuple&& args) { + emplaceObject(args, std::make_index_sequence::value>{}); + mailbox->activate(scheduler); } - + ~Actor() { mailbox->close(); + if (initialized) { + (&object())->~Object(); + } } template void invoke(Fn fn, Args&&... args) { - mailbox->push(actor::makeMessage(object, fn, std::forward(args)...)); + mailbox->push(actor::makeMessage(object(), fn, std::forward(args)...)); } template @@ -94,24 +111,48 @@ public: std::promise promise; auto future = promise.get_future(); - mailbox->push(actor::makeMessage(std::move(promise), object, fn, std::forward(args)...)); + mailbox->push(actor::makeMessage(std::move(promise), object(), fn, std::forward(args)...)); return future; } ActorRef> self() { - return ActorRef>(object, mailbox); + return ActorRef>(object(), mailbox); } operator ActorRef>() { return self(); } - -private: - template - friend class util::Thread; +private: std::shared_ptr mailbox; - Object object; + std::aligned_storage_t objectStorage; + std::atomic initialized { false }; + + Object& object() { + assert(initialized || !mailbox->isActive()); + return *reinterpret_cast(&objectStorage); + } + + // Enabled for Objects with a constructor taking ActorRef as the first parameter + template , Args...>::value>::type * = nullptr> + void emplaceObject(Args&&... args_) { + new (&objectStorage) Object(self(), std::forward(args_)...); + initialized = true; + } + + // Enabled for plain Objects + template, Args...>::value>::type * = nullptr> + void emplaceObject(Args&&... args_) { + new (&objectStorage) Object(std::forward(args_)...); + initialized = true; + } + + // Used to expand a tuple of arguments created by Actor::captureArguments() + template + void emplaceObject(ArgsTuple args, std::index_sequence) { + emplaceObject(std::move(std::get(std::forward(args)))...); + } + }; } // namespace mbgl diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp index 862ee750a4..34d502495a 100644 --- a/include/mbgl/actor/mailbox.hpp +++ b/include/mbgl/actor/mailbox.hpp @@ -30,7 +30,8 @@ public: // Attach the given scheduler to this mailbox and begin processing messages // sent to it. The mailbox must be a "holding" mailbox, as created by the // default constructor Mailbox(). - void start(Scheduler* scheduler_); + void activate(Scheduler& scheduler_); + bool isActive() const; static void maybeReceive(std::weak_ptr); diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp index 9eecb64920..1fcc431b28 100644 --- a/include/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -40,27 +40,18 @@ template class Thread : public Scheduler { public: template - Thread(const std::string& name, Args&&... args) { + Thread(const std::string& name, Args&&... args) + : object(std::make_unique>()) { + std::unique_ptr> running_ = std::make_unique>(); running = running_->get_future(); - // Pre-create a "holding" mailbox for this actor, whose messages are - // guaranteed not to be consumed until we explicitly call start(), which - // we'll do on the target thread, once its RunLoop and Object instance - // are ready. - // Meanwhile, this allows us to immediately provide ActorRef using this - // mailbox to queue any messages that come in before the thread is - // ready. (See actor().) - std::shared_ptr mailbox_ = std::make_shared(); - mailbox = mailbox_; - - auto tuple = std::make_tuple(std::forward(args)...); + auto capturedArgs = Actor::captureArguments(std::forward(args)...); thread = std::thread([ this, name, - tuple, - sharedMailbox = std::move(mailbox_), + capturedArgs, runningPromise = std::move(running_) ] { platform::setCurrentThreadName(name); @@ -69,16 +60,7 @@ public: util::RunLoop loop_(util::RunLoop::Type::New); loop = &loop_; - // Construct the Actor into the pre-allocated memory - // at `actorStorage`. - Actor* actor = emplaceActor( - std::move(sharedMailbox), - std::move(tuple), - std::make_index_sequence::value>{}); - - // Replace the NoopScheduler on the mailbox with the RunLoop to - // begin actually processing messages. - actor->mailbox->start(this); + object->activate(*this, std::move(capturedArgs)); runningPromise->set_value(); @@ -100,7 +82,7 @@ public: // messages posted on this scheduler after // we delete the RunLoop. loop->invoke([&] { - reinterpret_cast*>(&actorStorage)->~Actor(); + object.reset(); joinable.set_value(); }); @@ -115,14 +97,7 @@ public: // to the non-owning reference to outlive this object // and be used after the `Thread<>` gets destroyed. ActorRef> actor() { - // The actor->object reference we provide here will not actually be - // valid until the child thread constructs Actor into - // actorStorage using "placement new". - // We guarantee that the object reference isn't actually used by - // creating this mailbox without a scheduler, and only starting it - // after the actor has been constructed. - auto actor = reinterpret_cast*>(&actorStorage); - return ActorRef>(actor->object, mailbox); + return object->self(); } // Pauses the `Object` thread. It will prevent the object to wake @@ -170,13 +145,7 @@ private: loop->schedule(mailbox_); } - template - Actor* emplaceActor(std::shared_ptr sharedMailbox, ArgsTuple args, std::index_sequence) { - return new (&actorStorage) Actor(std::move(sharedMailbox), std::move(std::get(std::forward(args)))...); - } - - std::weak_ptr mailbox; - std::aligned_storage_t)> actorStorage; + std::unique_ptr> object; std::thread thread; diff --git a/platform/default/default_file_source.cpp b/platform/default/default_file_source.cpp index 26b67134cb..66ce0e3d81 100644 --- a/platform/default/default_file_source.cpp +++ b/platform/default/default_file_source.cpp @@ -18,15 +18,10 @@ namespace mbgl { class DefaultFileSource::Impl { public: - Impl(ActorRef self, std::shared_ptr assetFileSource_, std::string cachePath, uint64_t maximumCacheSize) + Impl(std::shared_ptr assetFileSource_, std::string cachePath, uint64_t maximumCacheSize) : assetFileSource(assetFileSource_) - , localFileSource(std::make_unique()) { - // Initialize the Database asynchronously so as to not block Actor creation. - self.invoke(&Impl::initializeOfflineDatabase, cachePath, maximumCacheSize); - } - - void initializeOfflineDatabase(std::string cachePath, uint64_t maximumCacheSize) { - offlineDatabase = std::make_unique(cachePath, maximumCacheSize); + , localFileSource(std::make_unique()) + , offlineDatabase(std::make_unique(cachePath, maximumCacheSize)) { } void setAPIBaseURL(const std::string& url) { diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp index 20d9b25cad..b95a6dd7f6 100644 --- a/src/mbgl/actor/mailbox.cpp +++ b/src/mbgl/actor/mailbox.cpp @@ -25,7 +25,7 @@ void Mailbox::close() { closed = true; } -void Mailbox::start(Scheduler* scheduler_) { +void Mailbox::activate(Scheduler& scheduler_) { assert(!scheduler); // As with close(), block until neither receive() nor push() are in progress, and acquire the two @@ -33,7 +33,7 @@ void Mailbox::start(Scheduler* scheduler_) { std::lock_guard receivingLock(receivingMutex); std::lock_guard pushingLock(pushingMutex); - scheduler = scheduler_; + scheduler = &scheduler_; if (closed) { return; @@ -44,6 +44,8 @@ void Mailbox::start(Scheduler* scheduler_) { } } +bool Mailbox::isActive() const { return bool(scheduler); } + void Mailbox::push(std::unique_ptr message) { std::lock_guard pushingLock(pushingMutex); -- cgit v1.2.1