summaryrefslogtreecommitdiff
path: root/src/mbgl/actor
diff options
context:
space:
mode:
authorJohn Firebaugh <john.firebaugh@gmail.com>2016-09-06 15:01:34 -0700
committerJohn Firebaugh <john.firebaugh@gmail.com>2016-09-16 12:01:06 -0700
commit41bbd4e4f7d66465433e370ca024ab0239fcace3 (patch)
tree8fe15fa31d97aafeb175a808e431b437297af88b /src/mbgl/actor
parent0bd66d40ddf9e75f860fe18e7c80de9c840f48ac (diff)
downloadqtlocation-mapboxgl-41bbd4e4f7d66465433e370ca024ab0239fcace3.tar.gz
[core] Use an actor model for tile worker concurrency
Diffstat (limited to 'src/mbgl/actor')
-rw-r--r--src/mbgl/actor/actor.hpp76
-rw-r--r--src/mbgl/actor/actor_ref.hpp43
-rw-r--r--src/mbgl/actor/mailbox.cpp55
-rw-r--r--src/mbgl/actor/mailbox.hpp31
-rw-r--r--src/mbgl/actor/message.hpp48
-rw-r--r--src/mbgl/actor/scheduler.hpp38
-rw-r--r--src/mbgl/actor/thread_pool.cpp48
-rw-r--r--src/mbgl/actor/thread_pool.hpp28
8 files changed, 367 insertions, 0 deletions
diff --git a/src/mbgl/actor/actor.hpp b/src/mbgl/actor/actor.hpp
new file mode 100644
index 0000000000..281bbdaed1
--- /dev/null
+++ b/src/mbgl/actor/actor.hpp
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <mbgl/actor/mailbox.hpp>
+#include <mbgl/actor/message.hpp>
+#include <mbgl/actor/actor_ref.hpp>
+#include <mbgl/util/noncopyable.hpp>
+
+#include <memory>
+
+namespace mbgl {
+
+/*
+ An `Actor<O>` is an owning reference to an asynchronous object of type `O`: an "actor".
+ Communication with an actor happens via message passing: you send a message to the object
+ (using `invoke`), passing a pointer to the member function to call and arguments which
+ are then forwarded to the actor.
+
+ The actor receives messages sent to it asynchronously, in a manner defined its `Scheduler`.
+ To store incoming messages before their receipt, each actor has a `Mailbox`, which acts as
+ a FIFO queue. Messages sent from actor S to actor R are guaranteed to be processed in the
+ order sent. However, relative order of messages sent by two *different* actors S1 and S2
+ to R is *not* guaranteed (and can't be: S1 and S2 may be acting asynchronously with respect
+ to each other).
+
+ Construction and destruction of an actor is currently synchronous: the corresponding `O`
+ object is constructed synchronously by the `Actor` constructor, and destructed synchronously
+ by the `~Actor` destructor, after ensuring that the `O` is not currently receiving an
+ asynchronous message. (Construction and destruction may change to be asynchronous in the
+ future.)
+
+ An `Actor<O>` can be converted to an `ActorRef<O>`, a non-owning value object representing
+ a (weak) reference to the actor. Messages can be sent via the `Ref` as well.
+
+ It's safe -- and encouraged -- to pass `Ref`s between actors via messages. This is how two-way
+ communication and other forms of collaboration between multiple actors is accomplished.
+
+ It's safe for a `Ref` to outlive its `Actor` -- the reference is "weak", and does not extend
+ the lifetime of the owning Actor, and sending a message to a `Ref` whose `Actor` has died is
+ a no-op. (In the future, a dead-letters queue or log may be implemented.)
+
+ Please don't send messages that contain shared pointers or references. That subverts the
+ purpose of the actor model: prohibiting direct concurrent access to shared state.
+*/
+
+template <class Object>
+class Actor : public util::noncopyable {
+public:
+ template <class... Args>
+ Actor(Scheduler& scheduler, Args&&... args_)
+ : mailbox(std::make_shared<Mailbox>(scheduler)),
+ object(self(), std::forward<Args>(args_)...) {
+ }
+
+ ~Actor() {
+ mailbox->close();
+ }
+
+ template <typename Fn, class... Args>
+ void invoke(Fn fn, Args&&... args) {
+ mailbox->push(actor::makeMessage(object, fn, std::forward<Args>(args)...));
+ }
+
+ ActorRef<std::decay_t<Object>> self() {
+ return ActorRef<std::decay_t<Object>>(object, mailbox);
+ }
+
+ operator ActorRef<std::decay_t<Object>>() {
+ return self();
+ }
+
+private:
+ std::shared_ptr<Mailbox> mailbox;
+ Object object;
+};
+
+} // namespace mbgl
diff --git a/src/mbgl/actor/actor_ref.hpp b/src/mbgl/actor/actor_ref.hpp
new file mode 100644
index 0000000000..9d858d823f
--- /dev/null
+++ b/src/mbgl/actor/actor_ref.hpp
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <mbgl/actor/mailbox.hpp>
+#include <mbgl/actor/message.hpp>
+
+#include <memory>
+
+namespace mbgl {
+
+/*
+ An `ActorRef<O>` is a *non*-owning, weak reference to an actor of type `O`. You can send it
+ messages just like an `Actor<O>`. It's a value object: safe to copy and pass between actors
+ via messages.
+
+ An `ActorRef<O>` does not extend the lifetime of the corresponding `Actor<O>`. That's determined
+ entirely by whichever object owns the `Actor<O>` -- the actor's "supervisor".
+
+ It's safe for a `Ref` to outlive its `Actor` -- the reference is "weak", and does not extend
+ the lifetime of the owning Actor, and sending a message to a `Ref` whose `Actor` has died is
+ a no-op. (In the future, a dead-letters queue or log may be implemented.)
+*/
+
+template <class Object>
+class ActorRef {
+public:
+ ActorRef(Object& object_, std::weak_ptr<Mailbox> weakMailbox_)
+ : object(object_),
+ weakMailbox(std::move(weakMailbox_)) {
+ }
+
+ template <typename Fn, class... Args>
+ void invoke(Fn fn, Args&&... args) {
+ if (auto mailbox = weakMailbox.lock()) {
+ mailbox->push(actor::makeMessage(object, fn, std::forward<Args>(args)...));
+ }
+ }
+
+private:
+ Object& object;
+ std::weak_ptr<Mailbox> weakMailbox;
+};
+
+} // namespace mbgl
diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp
new file mode 100644
index 0000000000..ae3c0967af
--- /dev/null
+++ b/src/mbgl/actor/mailbox.cpp
@@ -0,0 +1,55 @@
+#include <mbgl/actor/mailbox.hpp>
+#include <mbgl/actor/message.hpp>
+#include <mbgl/actor/scheduler.hpp>
+
+#include <cassert>
+
+namespace mbgl {
+
+Mailbox::Mailbox(Scheduler& scheduler_)
+ : scheduler(scheduler_) {
+}
+
+void Mailbox::push(std::unique_ptr<Message> message) {
+ assert(!closing);
+
+ std::lock_guard<std::mutex> queueLock(queueMutex);
+ bool wasEmpty = queue.empty();
+ queue.push(std::move(message));
+ if (wasEmpty) {
+ scheduler.schedule(shared_from_this());
+ }
+}
+
+void Mailbox::close() {
+ // Block until the scheduler is guaranteed not to be executing receive().
+ std::lock_guard<std::mutex> closingLock(closingMutex);
+ closing = true;
+}
+
+void Mailbox::receive() {
+ std::lock_guard<std::mutex> closingLock(closingMutex);
+
+ if (closing) {
+ return;
+ }
+
+ std::unique_ptr<Message> message;
+ bool wasEmpty;
+
+ {
+ std::lock_guard<std::mutex> queueLock(queueMutex);
+ assert(!queue.empty());
+ message = std::move(queue.front());
+ queue.pop();
+ wasEmpty = queue.empty();
+ }
+
+ (*message)();
+
+ if (!wasEmpty) {
+ scheduler.schedule(shared_from_this());
+ }
+}
+
+} // namespace mbgl
diff --git a/src/mbgl/actor/mailbox.hpp b/src/mbgl/actor/mailbox.hpp
new file mode 100644
index 0000000000..5d5e8cb924
--- /dev/null
+++ b/src/mbgl/actor/mailbox.hpp
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <queue>
+
+namespace mbgl {
+
+class Scheduler;
+class Message;
+
+class Mailbox : public std::enable_shared_from_this<Mailbox> {
+public:
+ Mailbox(Scheduler&);
+
+ void push(std::unique_ptr<Message>);
+
+ void close();
+ void receive();
+
+private:
+ Scheduler& scheduler;
+
+ std::mutex closingMutex;
+ bool closing { false };
+
+ std::mutex queueMutex;
+ std::queue<std::unique_ptr<Message>> queue;
+};
+
+} // namespace mbgl
diff --git a/src/mbgl/actor/message.hpp b/src/mbgl/actor/message.hpp
new file mode 100644
index 0000000000..cf071d4933
--- /dev/null
+++ b/src/mbgl/actor/message.hpp
@@ -0,0 +1,48 @@
+#pragma once
+
+#include <utility>
+
+namespace mbgl {
+
+// A movable type-erasing function wrapper. This allows to store arbitrary invokable
+// things (like std::function<>, or the result of a movable-only std::bind()) in the queue.
+// Source: http://stackoverflow.com/a/29642072/331379
+class Message {
+public:
+ virtual ~Message() = default;
+ virtual void operator()() = 0;
+};
+
+template <class Object, class MemberFn, class ArgsTuple>
+class MessageImpl : public Message {
+public:
+ MessageImpl(Object& object_, MemberFn memberFn_, ArgsTuple argsTuple_)
+ : object(object_),
+ memberFn(memberFn_),
+ argsTuple(std::move(argsTuple_)) {
+ }
+
+ void operator()() override {
+ invoke(std::make_index_sequence<std::tuple_size<ArgsTuple>::value>());
+ }
+
+ template <std::size_t... I>
+ void invoke(std::index_sequence<I...>) {
+ (object.*memberFn)(std::move(std::get<I>(argsTuple))...);
+ }
+
+ Object& object;
+ MemberFn memberFn;
+ ArgsTuple argsTuple;
+};
+
+namespace actor {
+
+template <class Object, class MemberFn, class... Args>
+std::unique_ptr<Message> makeMessage(Object& object, MemberFn memberFn, Args&&... args) {
+ auto tuple = std::make_tuple(std::forward<Args>(args)...);
+ return std::make_unique<MessageImpl<Object, MemberFn, decltype(tuple)>>(object, memberFn, std::move(tuple));
+}
+
+} // namespace actor
+} // namespace mbgl
diff --git a/src/mbgl/actor/scheduler.hpp b/src/mbgl/actor/scheduler.hpp
new file mode 100644
index 0000000000..83689c3348
--- /dev/null
+++ b/src/mbgl/actor/scheduler.hpp
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <memory>
+
+namespace mbgl {
+
+class Mailbox;
+
+/*
+ A `Scheduler` is responsible for coordinating the processing of messages by
+ one or more actors via their mailboxes. It's an abstract interface. Currently,
+ the following concrete implementations exist:
+
+ * `ThreadPool` can coordinate an unlimited number of actors over any number of
+ threads via a pool, preserving the following behaviors:
+
+ - Messages from each individual mailbox are processed in order
+ - Only a single message from a mailbox is processed at a time; there is no
+ concurrency within a mailbox
+
+ Subject to these constraints, processing can happen on whatever thread in the
+ pool is available.
+
+ * `RunLoop` is a `Scheduler` that is typically used to create a mailbox and
+ `ActorRef` for an object that lives on the main thread and is not itself wrapped
+ as an `Actor`:
+
+ auto mailbox = std::make_shared<Mailbox>(*util::RunLoop::Get());
+ Actor<Worker> worker(threadPool, ActorRef<Foo>(*this, mailbox));
+*/
+
+class Scheduler {
+public:
+ virtual ~Scheduler() = default;
+ virtual void schedule(std::weak_ptr<Mailbox>) = 0;
+};
+
+} // namespace mbgl
diff --git a/src/mbgl/actor/thread_pool.cpp b/src/mbgl/actor/thread_pool.cpp
new file mode 100644
index 0000000000..89ca1b72f0
--- /dev/null
+++ b/src/mbgl/actor/thread_pool.cpp
@@ -0,0 +1,48 @@
+#include <mbgl/actor/thread_pool.hpp>
+#include <mbgl/actor/mailbox.hpp>
+
+namespace mbgl {
+
+ThreadPool::ThreadPool(std::size_t count) {
+ threads.reserve(count);
+ for (std::size_t i = 0; i < count; ++i) {
+ threads.emplace_back([this] () {
+ while (true) {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ cv.wait(lock, [this] {
+ return !queue.empty() || terminate.load();
+ });
+
+ if (terminate.load()) {
+ return;
+ }
+
+ auto mailbox = queue.front();
+ queue.pop();
+ lock.unlock();
+
+ if (auto locked = mailbox.lock()) {
+ locked->receive();
+ }
+ }
+ });
+ }
+}
+
+ThreadPool::~ThreadPool() {
+ terminate.store(true);
+ cv.notify_all();
+
+ for (auto& thread : threads) {
+ thread.join();
+ }
+}
+
+void ThreadPool::schedule(std::weak_ptr<Mailbox> mailbox) {
+ std::lock_guard<std::mutex> lock(mutex);
+ queue.push(mailbox);
+ cv.notify_one();
+}
+
+} // namespace mbgl
diff --git a/src/mbgl/actor/thread_pool.hpp b/src/mbgl/actor/thread_pool.hpp
new file mode 100644
index 0000000000..7e0b40f4e3
--- /dev/null
+++ b/src/mbgl/actor/thread_pool.hpp
@@ -0,0 +1,28 @@
+#pragma once
+
+#include <mbgl/actor/scheduler.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+namespace mbgl {
+
+class ThreadPool : public Scheduler {
+public:
+ ThreadPool(std::size_t count);
+ ~ThreadPool() override;
+
+ void schedule(std::weak_ptr<Mailbox>) override;
+
+private:
+ std::vector<std::thread> threads;
+ std::queue<std::weak_ptr<Mailbox>> queue;
+ std::mutex mutex;
+ std::condition_variable cv;
+ std::atomic_bool terminate { false };
+};
+
+} // namespace mbgl