From 5cdacf168b43a6c4c4a8e3a502186e5c7d6523b3 Mon Sep 17 00:00:00 2001 From: Anand Thakker Date: Thu, 14 Jun 2018 15:41:53 -0400 Subject: Simplify design for "holding" mailbox --- include/mbgl/actor/mailbox.hpp | 18 ++++++++++++++---- include/mbgl/util/thread.hpp | 23 ++++++++--------------- src/mbgl/actor/mailbox.cpp | 21 +++++++++++++++------ 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp index 1603ea4924..862ee750a4 100644 --- a/include/mbgl/actor/mailbox.hpp +++ b/include/mbgl/actor/mailbox.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -11,6 +13,13 @@ class Message; class Mailbox : public std::enable_shared_from_this { public: + + // Create a "holding" mailbox, messages to which will remain queued, + // unconsumed, until the mailbox is associated with a Scheduler using + // start(). This allows a Mailbox object to be created on one thread and + // later transferred to a different target thread that may not yet exist. + Mailbox(); + Mailbox(Scheduler&); void push(std::unique_ptr); @@ -18,14 +27,15 @@ public: void close(); void receive(); - // Replace this mailbox's scheduler. Effectively allows a mailbox to be - // created on one thread and moved to another one. - void setScheduler(Scheduler* scheduler_); + // Attach the given scheduler to this mailbox and begin processing messages + // sent to it. The mailbox must be a "holding" mailbox, as created by the + // default constructor Mailbox(). + void start(Scheduler* scheduler_); static void maybeReceive(std::weak_ptr); private: - Scheduler* scheduler; + optional scheduler; std::recursive_mutex receivingMutex; std::mutex pushingMutex; diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp index b5f79aa28d..0cd12f6e95 100644 --- a/include/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -19,11 +19,6 @@ namespace mbgl { namespace util { -class NoopScheduler : public Scheduler { -public: - void schedule(std::weak_ptr) override {} -}; - // Manages a thread with `Object`. // Upon creation of this object, it launches a thread and creates an object of type `Object` @@ -49,14 +44,13 @@ public: std::unique_ptr> running_ = std::make_unique>(); running = running_->get_future(); - // Pre-create a mailbox for this actor, using a NoopScheduler that - // leaves the mailbox's queue unconsumed. - // Once the RunLoop on the target thread has been created, we'll replace - // the NoopScheduler with the RunLoop. Meanwhile, this allows us to - // immediately provide ActorRef using this mailbox, with any messages - // sent to them being queued in the holding mailbox until the thread is - // up and running. - std::shared_ptr mailbox_ = std::make_shared(noopScheduler); + // Pre-create a "holding" mailbox for this actor, whose messages are + // guaranteed not to be consumed until we explicitly call start(), which + // we'll do on the target thread, once its RunLoop and Object instance + // are ready. + // Meanwhile, this allows us to immediately provide ActorRef using this + // mailbox to queue any messages that come in before the thread is ready. + std::shared_ptr mailbox_ = std::make_shared(); mailbox = mailbox_; auto tuple = std::make_tuple(std::forward(args)...); @@ -81,7 +75,7 @@ public: // Replace the NoopScheduler on the mailbox with the RunLoop to // begin actually processing messages. - actor->mailbox->setScheduler(this); + actor->mailbox->start(this); runningPromise->set_value(); @@ -178,7 +172,6 @@ private: return new (&actorStorage) Actor(std::move(sharedMailbox), std::move(std::get(std::forward(args)))...); } - NoopScheduler noopScheduler; std::weak_ptr mailbox; std::aligned_storage)> actorStorage; diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp index c117aa2605..20d9b25cad 100644 --- a/src/mbgl/actor/mailbox.cpp +++ b/src/mbgl/actor/mailbox.cpp @@ -6,6 +6,9 @@ namespace mbgl { +Mailbox::Mailbox() { +} + Mailbox::Mailbox(Scheduler& scheduler_) : scheduler(&scheduler_) { } @@ -22,18 +25,22 @@ void Mailbox::close() { closed = true; } -void Mailbox::setScheduler(Scheduler* scheduler_) { +void Mailbox::start(Scheduler* scheduler_) { + assert(!scheduler); + + // As with close(), block until neither receive() nor push() are in progress, and acquire the two + // mutexes in the same order. std::lock_guard receivingLock(receivingMutex); std::lock_guard pushingLock(pushingMutex); scheduler = scheduler_; - + if (closed) { return; } if (!queue.empty()) { - scheduler->schedule(shared_from_this()); + (*scheduler)->schedule(shared_from_this()); } } @@ -48,13 +55,15 @@ void Mailbox::push(std::unique_ptr message) { std::lock_guard queueLock(queueMutex); bool wasEmpty = queue.empty(); queue.push(std::move(message)); - if (wasEmpty) { - scheduler->schedule(shared_from_this()); + if (wasEmpty && scheduler) { + (*scheduler)->schedule(shared_from_this()); } } void Mailbox::receive() { std::lock_guard receivingLock(receivingMutex); + + assert(scheduler); if (closed) { return; @@ -74,7 +83,7 @@ void Mailbox::receive() { (*message)(); if (!wasEmpty) { - scheduler->schedule(shared_from_this()); + (*scheduler)->schedule(shared_from_this()); } } -- cgit v1.2.1