summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnand Thakker <github@anandthakker.net>2018-06-13 09:27:57 -0400
committerAnand Thakker <github@anandthakker.net>2018-06-13 09:27:57 -0400
commit3f3a048a99ded73f80e3fe91835316b57780114f (patch)
tree12b234d12b5d220dc7e38d242340c2961cf4754e
parentf41bdc4407e8780e55f8ee6d8719e1dad85b1f7f (diff)
downloadqtlocation-mapboxgl-3f3a048a99ded73f80e3fe91835316b57780114f.tar.gz
Avoid blocking in Thread<Object> constructor
-rw-r--r--include/mbgl/actor/actor.hpp19
-rw-r--r--include/mbgl/actor/mailbox.hpp6
-rw-r--r--include/mbgl/util/thread.hpp62
-rw-r--r--src/mbgl/actor/mailbox.cpp22
4 files changed, 92 insertions, 17 deletions
diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp
index a0df19208e..2d2757459c 100644
--- a/include/mbgl/actor/actor.hpp
+++ b/include/mbgl/actor/actor.hpp
@@ -11,6 +11,11 @@
namespace mbgl {
+namespace util {
+ template <typename O>
+ class Thread;
+}
+
/*
An `Actor<O>` is an owning reference to an asynchronous object of type `O`: an "actor".
Communication with an actor happens via message passing: you send a message to the object
@@ -56,12 +61,23 @@ public:
object(self(), std::forward<Args>(args_)...) {
}
+ template <typename U = Object, class... Args, typename std::enable_if<std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr>
+ Actor(std::shared_ptr<Mailbox> mailbox_, Args&&... args_)
+ : mailbox(std::move(mailbox_)),
+ object(self(), std::forward<Args>(args_)...) {
+ }
+
// Enabled for plain Objects
template<typename U = Object, class... Args, typename std::enable_if<!std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr>
Actor(Scheduler& scheduler, Args&& ... args_)
: mailbox(std::make_shared<Mailbox>(scheduler)), object(std::forward<Args>(args_)...) {
}
+ template<typename U = Object, class... Args, typename std::enable_if<!std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr>
+ Actor(std::shared_ptr<Mailbox> mailbox_, Args&& ... args_)
+ : mailbox(mailbox_), object(std::forward<Args>(args_)...) {
+ }
+
~Actor() {
mailbox->close();
}
@@ -91,6 +107,9 @@ public:
}
private:
+ template <typename O>
+ friend class util::Thread;
+
std::shared_ptr<Mailbox> mailbox;
Object object;
};
diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp
index 8ecf91701a..1603ea4924 100644
--- a/include/mbgl/actor/mailbox.hpp
+++ b/include/mbgl/actor/mailbox.hpp
@@ -17,11 +17,15 @@ public:
void close();
void receive();
+
+ // Replace this mailbox's scheduler. Effectively allows a mailbox to be
+ // created on one thread and moved to another one.
+ void setScheduler(Scheduler* scheduler_);
static void maybeReceive(std::weak_ptr<Mailbox>);
private:
- Scheduler& scheduler;
+ Scheduler* scheduler;
std::recursive_mutex receivingMutex;
std::mutex pushingMutex;
diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp
index 74e722b02d..8230d8778f 100644
--- a/include/mbgl/util/thread.hpp
+++ b/include/mbgl/util/thread.hpp
@@ -19,6 +19,11 @@
namespace mbgl {
namespace util {
+class NoopScheduler : public Scheduler {
+public:
+ void schedule(std::weak_ptr<Mailbox>) override {}
+};
+
// Manages a thread with `Object`.
// Upon creation of this object, it launches a thread and creates an object of type `Object`
@@ -41,23 +46,37 @@ class Thread : public Scheduler {
public:
template <class... Args>
Thread(const std::string& name, Args&&... args) {
- std::promise<void> running;
-
- thread = std::thread([&] {
+ std::unique_ptr<std::promise<void>> running_ = std::make_unique<std::promise<void>>();
+ running = running_->get_future();
+
+ // Pre-create a mailbox for this actor, using a NoopScheduler that
+ // leaves the mailbox's queue unconsumed.
+ // Once the RunLoop on the target thread has been created, we'll replace
+ // the NoopScheduler with the RunLoop. Meanwhile, this allows us to
+ // immediately provide ActorRef using this mailbox, with any messages
+ // sent to them being queued in the holding mailbox until the thread is
+ // up and running.
+ std::shared_ptr<Mailbox> mailbox_ = std::make_shared<Mailbox>(noopScheduler);
+ mailbox = mailbox_;
+
+ thread = std::thread([&, sharedMailbox = std::move(mailbox_), runningPromise = std::move(running_)] {
platform::setCurrentThreadName(name);
platform::makeThreadLowPriority();
util::RunLoop loop_(util::RunLoop::Type::New);
loop = &loop_;
- object = std::make_unique<Actor<Object>>(*this, std::forward<Args>(args)...);
- running.set_value();
+ Actor<Object>* actor = new (&actorStorage) Actor<Object>(std::move(sharedMailbox), std::forward<Args>(args)...);
+ // Replace the NoopScheduler on the mailbox with the RunLoop to
+ // begin actually processing messages.
+ actor->mailbox->setScheduler(this);
+
+ runningPromise->set_value();
+
loop->run();
loop = nullptr;
});
-
- running.get_future().get();
}
~Thread() override {
@@ -66,12 +85,14 @@ public:
}
std::promise<void> joinable;
+
+ running.wait();
// Kill the actor, so we don't get more
// messages posted on this scheduler after
// we delete the RunLoop.
loop->invoke([&] {
- object.reset();
+ reinterpret_cast<const Actor<Object>*>(&actorStorage)->~Actor<Object>();
joinable.set_value();
});
@@ -85,8 +106,15 @@ public:
// can be used to send messages to `Object`. It is safe
// to the non-owning reference to outlive this object
// and be used after the `Thread<>` gets destroyed.
- ActorRef<std::decay_t<Object>> actor() const {
- return object->self();
+ ActorRef<std::decay_t<Object>> actor() {
+ // The actor->object reference we provide here will not actually be
+ // valid until the child thread constructs Actor<Object> into
+ // actorStorage using "placement new".
+ // We guarantee that the object reference isn't actually used by
+ // using the NoopScheduler to prevent messages to this mailbox from
+ // being processed until after the actor has been constructed.
+ auto actor = reinterpret_cast<Actor<Object>*>(&actorStorage);
+ return ActorRef<std::decay_t<Object>>(actor->object, mailbox);
}
// Pauses the `Object` thread. It will prevent the object to wake
@@ -103,6 +131,8 @@ public:
auto pausing = paused->get_future();
+ running.wait();
+
loop->invoke(RunLoop::Priority::High, [this] {
auto resuming = resumed->get_future();
paused->set_value();
@@ -127,13 +157,19 @@ public:
private:
MBGL_STORE_THREAD(tid);
- void schedule(std::weak_ptr<Mailbox> mailbox) override {
- loop->schedule(mailbox);
+ void schedule(std::weak_ptr<Mailbox> mailbox_) override {
+ assert(loop);
+ loop->schedule(mailbox_);
}
+ NoopScheduler noopScheduler;
+ std::weak_ptr<Mailbox> mailbox;
+ std::aligned_storage<sizeof(Actor<Object>)> actorStorage;
+
std::thread thread;
- std::unique_ptr<Actor<Object>> object;
+ std::future<void> running;
+
std::unique_ptr<std::promise<void>> paused;
std::unique_ptr<std::promise<void>> resumed;
diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp
index 373c24275f..c117aa2605 100644
--- a/src/mbgl/actor/mailbox.cpp
+++ b/src/mbgl/actor/mailbox.cpp
@@ -7,7 +7,7 @@
namespace mbgl {
Mailbox::Mailbox(Scheduler& scheduler_)
- : scheduler(scheduler_) {
+ : scheduler(&scheduler_) {
}
void Mailbox::close() {
@@ -22,6 +22,22 @@ void Mailbox::close() {
closed = true;
}
+void Mailbox::setScheduler(Scheduler* scheduler_) {
+ std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex);
+ std::lock_guard<std::mutex> pushingLock(pushingMutex);
+
+ scheduler = scheduler_;
+
+ if (closed) {
+ return;
+ }
+
+ if (!queue.empty()) {
+ scheduler->schedule(shared_from_this());
+ }
+}
+
+
void Mailbox::push(std::unique_ptr<Message> message) {
std::lock_guard<std::mutex> pushingLock(pushingMutex);
@@ -33,7 +49,7 @@ void Mailbox::push(std::unique_ptr<Message> message) {
bool wasEmpty = queue.empty();
queue.push(std::move(message));
if (wasEmpty) {
- scheduler.schedule(shared_from_this());
+ scheduler->schedule(shared_from_this());
}
}
@@ -58,7 +74,7 @@ void Mailbox::receive() {
(*message)();
if (!wasEmpty) {
- scheduler.schedule(shared_from_this());
+ scheduler->schedule(shared_from_this());
}
}