diff options
author | John Firebaugh <john.firebaugh@gmail.com> | 2016-09-06 15:01:34 -0700 |
---|---|---|
committer | John Firebaugh <john.firebaugh@gmail.com> | 2016-09-16 12:01:06 -0700 |
commit | 41bbd4e4f7d66465433e370ca024ab0239fcace3 (patch) | |
tree | 8fe15fa31d97aafeb175a808e431b437297af88b /src/mbgl/actor | |
parent | 0bd66d40ddf9e75f860fe18e7c80de9c840f48ac (diff) | |
download | qtlocation-mapboxgl-41bbd4e4f7d66465433e370ca024ab0239fcace3.tar.gz |
[core] Use an actor model for tile worker concurrency
Diffstat (limited to 'src/mbgl/actor')
-rw-r--r-- | src/mbgl/actor/actor.hpp | 76 | ||||
-rw-r--r-- | src/mbgl/actor/actor_ref.hpp | 43 | ||||
-rw-r--r-- | src/mbgl/actor/mailbox.cpp | 55 | ||||
-rw-r--r-- | src/mbgl/actor/mailbox.hpp | 31 | ||||
-rw-r--r-- | src/mbgl/actor/message.hpp | 48 | ||||
-rw-r--r-- | src/mbgl/actor/scheduler.hpp | 38 | ||||
-rw-r--r-- | src/mbgl/actor/thread_pool.cpp | 48 | ||||
-rw-r--r-- | src/mbgl/actor/thread_pool.hpp | 28 |
8 files changed, 367 insertions, 0 deletions
diff --git a/src/mbgl/actor/actor.hpp b/src/mbgl/actor/actor.hpp new file mode 100644 index 0000000000..281bbdaed1 --- /dev/null +++ b/src/mbgl/actor/actor.hpp @@ -0,0 +1,76 @@ +#pragma once + +#include <mbgl/actor/mailbox.hpp> +#include <mbgl/actor/message.hpp> +#include <mbgl/actor/actor_ref.hpp> +#include <mbgl/util/noncopyable.hpp> + +#include <memory> + +namespace mbgl { + +/* + An `Actor<O>` is an owning reference to an asynchronous object of type `O`: an "actor". + Communication with an actor happens via message passing: you send a message to the object + (using `invoke`), passing a pointer to the member function to call and arguments which + are then forwarded to the actor. + + The actor receives messages sent to it asynchronously, in a manner defined its `Scheduler`. + To store incoming messages before their receipt, each actor has a `Mailbox`, which acts as + a FIFO queue. Messages sent from actor S to actor R are guaranteed to be processed in the + order sent. However, relative order of messages sent by two *different* actors S1 and S2 + to R is *not* guaranteed (and can't be: S1 and S2 may be acting asynchronously with respect + to each other). + + Construction and destruction of an actor is currently synchronous: the corresponding `O` + object is constructed synchronously by the `Actor` constructor, and destructed synchronously + by the `~Actor` destructor, after ensuring that the `O` is not currently receiving an + asynchronous message. (Construction and destruction may change to be asynchronous in the + future.) + + An `Actor<O>` can be converted to an `ActorRef<O>`, a non-owning value object representing + a (weak) reference to the actor. Messages can be sent via the `Ref` as well. + + It's safe -- and encouraged -- to pass `Ref`s between actors via messages. This is how two-way + communication and other forms of collaboration between multiple actors is accomplished. + + It's safe for a `Ref` to outlive its `Actor` -- the reference is "weak", and does not extend + the lifetime of the owning Actor, and sending a message to a `Ref` whose `Actor` has died is + a no-op. (In the future, a dead-letters queue or log may be implemented.) + + Please don't send messages that contain shared pointers or references. That subverts the + purpose of the actor model: prohibiting direct concurrent access to shared state. +*/ + +template <class Object> +class Actor : public util::noncopyable { +public: + template <class... Args> + Actor(Scheduler& scheduler, Args&&... args_) + : mailbox(std::make_shared<Mailbox>(scheduler)), + object(self(), std::forward<Args>(args_)...) { + } + + ~Actor() { + mailbox->close(); + } + + template <typename Fn, class... Args> + void invoke(Fn fn, Args&&... args) { + mailbox->push(actor::makeMessage(object, fn, std::forward<Args>(args)...)); + } + + ActorRef<std::decay_t<Object>> self() { + return ActorRef<std::decay_t<Object>>(object, mailbox); + } + + operator ActorRef<std::decay_t<Object>>() { + return self(); + } + +private: + std::shared_ptr<Mailbox> mailbox; + Object object; +}; + +} // namespace mbgl diff --git a/src/mbgl/actor/actor_ref.hpp b/src/mbgl/actor/actor_ref.hpp new file mode 100644 index 0000000000..9d858d823f --- /dev/null +++ b/src/mbgl/actor/actor_ref.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include <mbgl/actor/mailbox.hpp> +#include <mbgl/actor/message.hpp> + +#include <memory> + +namespace mbgl { + +/* + An `ActorRef<O>` is a *non*-owning, weak reference to an actor of type `O`. You can send it + messages just like an `Actor<O>`. It's a value object: safe to copy and pass between actors + via messages. + + An `ActorRef<O>` does not extend the lifetime of the corresponding `Actor<O>`. That's determined + entirely by whichever object owns the `Actor<O>` -- the actor's "supervisor". + + It's safe for a `Ref` to outlive its `Actor` -- the reference is "weak", and does not extend + the lifetime of the owning Actor, and sending a message to a `Ref` whose `Actor` has died is + a no-op. (In the future, a dead-letters queue or log may be implemented.) +*/ + +template <class Object> +class ActorRef { +public: + ActorRef(Object& object_, std::weak_ptr<Mailbox> weakMailbox_) + : object(object_), + weakMailbox(std::move(weakMailbox_)) { + } + + template <typename Fn, class... Args> + void invoke(Fn fn, Args&&... args) { + if (auto mailbox = weakMailbox.lock()) { + mailbox->push(actor::makeMessage(object, fn, std::forward<Args>(args)...)); + } + } + +private: + Object& object; + std::weak_ptr<Mailbox> weakMailbox; +}; + +} // namespace mbgl diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp new file mode 100644 index 0000000000..ae3c0967af --- /dev/null +++ b/src/mbgl/actor/mailbox.cpp @@ -0,0 +1,55 @@ +#include <mbgl/actor/mailbox.hpp> +#include <mbgl/actor/message.hpp> +#include <mbgl/actor/scheduler.hpp> + +#include <cassert> + +namespace mbgl { + +Mailbox::Mailbox(Scheduler& scheduler_) + : scheduler(scheduler_) { +} + +void Mailbox::push(std::unique_ptr<Message> message) { + assert(!closing); + + std::lock_guard<std::mutex> queueLock(queueMutex); + bool wasEmpty = queue.empty(); + queue.push(std::move(message)); + if (wasEmpty) { + scheduler.schedule(shared_from_this()); + } +} + +void Mailbox::close() { + // Block until the scheduler is guaranteed not to be executing receive(). + std::lock_guard<std::mutex> closingLock(closingMutex); + closing = true; +} + +void Mailbox::receive() { + std::lock_guard<std::mutex> closingLock(closingMutex); + + if (closing) { + return; + } + + std::unique_ptr<Message> message; + bool wasEmpty; + + { + std::lock_guard<std::mutex> queueLock(queueMutex); + assert(!queue.empty()); + message = std::move(queue.front()); + queue.pop(); + wasEmpty = queue.empty(); + } + + (*message)(); + + if (!wasEmpty) { + scheduler.schedule(shared_from_this()); + } +} + +} // namespace mbgl diff --git a/src/mbgl/actor/mailbox.hpp b/src/mbgl/actor/mailbox.hpp new file mode 100644 index 0000000000..5d5e8cb924 --- /dev/null +++ b/src/mbgl/actor/mailbox.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include <memory> +#include <mutex> +#include <queue> + +namespace mbgl { + +class Scheduler; +class Message; + +class Mailbox : public std::enable_shared_from_this<Mailbox> { +public: + Mailbox(Scheduler&); + + void push(std::unique_ptr<Message>); + + void close(); + void receive(); + +private: + Scheduler& scheduler; + + std::mutex closingMutex; + bool closing { false }; + + std::mutex queueMutex; + std::queue<std::unique_ptr<Message>> queue; +}; + +} // namespace mbgl diff --git a/src/mbgl/actor/message.hpp b/src/mbgl/actor/message.hpp new file mode 100644 index 0000000000..cf071d4933 --- /dev/null +++ b/src/mbgl/actor/message.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include <utility> + +namespace mbgl { + +// A movable type-erasing function wrapper. This allows to store arbitrary invokable +// things (like std::function<>, or the result of a movable-only std::bind()) in the queue. +// Source: http://stackoverflow.com/a/29642072/331379 +class Message { +public: + virtual ~Message() = default; + virtual void operator()() = 0; +}; + +template <class Object, class MemberFn, class ArgsTuple> +class MessageImpl : public Message { +public: + MessageImpl(Object& object_, MemberFn memberFn_, ArgsTuple argsTuple_) + : object(object_), + memberFn(memberFn_), + argsTuple(std::move(argsTuple_)) { + } + + void operator()() override { + invoke(std::make_index_sequence<std::tuple_size<ArgsTuple>::value>()); + } + + template <std::size_t... I> + void invoke(std::index_sequence<I...>) { + (object.*memberFn)(std::move(std::get<I>(argsTuple))...); + } + + Object& object; + MemberFn memberFn; + ArgsTuple argsTuple; +}; + +namespace actor { + +template <class Object, class MemberFn, class... Args> +std::unique_ptr<Message> makeMessage(Object& object, MemberFn memberFn, Args&&... args) { + auto tuple = std::make_tuple(std::forward<Args>(args)...); + return std::make_unique<MessageImpl<Object, MemberFn, decltype(tuple)>>(object, memberFn, std::move(tuple)); +} + +} // namespace actor +} // namespace mbgl diff --git a/src/mbgl/actor/scheduler.hpp b/src/mbgl/actor/scheduler.hpp new file mode 100644 index 0000000000..83689c3348 --- /dev/null +++ b/src/mbgl/actor/scheduler.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include <memory> + +namespace mbgl { + +class Mailbox; + +/* + A `Scheduler` is responsible for coordinating the processing of messages by + one or more actors via their mailboxes. It's an abstract interface. Currently, + the following concrete implementations exist: + + * `ThreadPool` can coordinate an unlimited number of actors over any number of + threads via a pool, preserving the following behaviors: + + - Messages from each individual mailbox are processed in order + - Only a single message from a mailbox is processed at a time; there is no + concurrency within a mailbox + + Subject to these constraints, processing can happen on whatever thread in the + pool is available. + + * `RunLoop` is a `Scheduler` that is typically used to create a mailbox and + `ActorRef` for an object that lives on the main thread and is not itself wrapped + as an `Actor`: + + auto mailbox = std::make_shared<Mailbox>(*util::RunLoop::Get()); + Actor<Worker> worker(threadPool, ActorRef<Foo>(*this, mailbox)); +*/ + +class Scheduler { +public: + virtual ~Scheduler() = default; + virtual void schedule(std::weak_ptr<Mailbox>) = 0; +}; + +} // namespace mbgl diff --git a/src/mbgl/actor/thread_pool.cpp b/src/mbgl/actor/thread_pool.cpp new file mode 100644 index 0000000000..89ca1b72f0 --- /dev/null +++ b/src/mbgl/actor/thread_pool.cpp @@ -0,0 +1,48 @@ +#include <mbgl/actor/thread_pool.hpp> +#include <mbgl/actor/mailbox.hpp> + +namespace mbgl { + +ThreadPool::ThreadPool(std::size_t count) { + threads.reserve(count); + for (std::size_t i = 0; i < count; ++i) { + threads.emplace_back([this] () { + while (true) { + std::unique_lock<std::mutex> lock(mutex); + + cv.wait(lock, [this] { + return !queue.empty() || terminate.load(); + }); + + if (terminate.load()) { + return; + } + + auto mailbox = queue.front(); + queue.pop(); + lock.unlock(); + + if (auto locked = mailbox.lock()) { + locked->receive(); + } + } + }); + } +} + +ThreadPool::~ThreadPool() { + terminate.store(true); + cv.notify_all(); + + for (auto& thread : threads) { + thread.join(); + } +} + +void ThreadPool::schedule(std::weak_ptr<Mailbox> mailbox) { + std::lock_guard<std::mutex> lock(mutex); + queue.push(mailbox); + cv.notify_one(); +} + +} // namespace mbgl diff --git a/src/mbgl/actor/thread_pool.hpp b/src/mbgl/actor/thread_pool.hpp new file mode 100644 index 0000000000..7e0b40f4e3 --- /dev/null +++ b/src/mbgl/actor/thread_pool.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include <mbgl/actor/scheduler.hpp> + +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <queue> +#include <thread> + +namespace mbgl { + +class ThreadPool : public Scheduler { +public: + ThreadPool(std::size_t count); + ~ThreadPool() override; + + void schedule(std::weak_ptr<Mailbox>) override; + +private: + std::vector<std::thread> threads; + std::queue<std::weak_ptr<Mailbox>> queue; + std::mutex mutex; + std::condition_variable cv; + std::atomic_bool terminate { false }; +}; + +} // namespace mbgl |