diff options
author | Thiago Marcos P. Santos <tmpsantos@gmail.com> | 2017-06-13 18:44:13 +0300 |
---|---|---|
committer | Thiago Marcos P. Santos <tmpsantos@gmail.com> | 2017-06-21 14:30:09 +0300 |
commit | 0de17ea5f47e9e6c4b67170d4d04b0425e67ce5c (patch) | |
tree | 27f587ff587cefcea994d037866d9bcbec3b7c95 | |
parent | 2676ad05b0b934a39485631f2700423891ce1ef1 (diff) | |
download | qtlocation-mapboxgl-0de17ea5f47e9e6c4b67170d4d04b0425e67ce5c.tar.gz |
[core] Added the ThreadedObject
Actor model version of util::Thread.
-rw-r--r-- | cmake/core-files.cmake | 3 | ||||
-rw-r--r-- | src/mbgl/storage/file_source_request.cpp | 37 | ||||
-rw-r--r-- | src/mbgl/storage/file_source_request.hpp | 31 | ||||
-rw-r--r-- | src/mbgl/util/threaded_object.hpp | 163 |
4 files changed, 234 insertions, 0 deletions
diff --git a/cmake/core-files.cmake b/cmake/core-files.cmake index 78451afc14..729d3f7da3 100644 --- a/cmake/core-files.cmake +++ b/cmake/core-files.cmake @@ -312,6 +312,8 @@ set(MBGL_CORE_FILES include/mbgl/storage/resource.hpp include/mbgl/storage/response.hpp src/mbgl/storage/asset_file_source.hpp + src/mbgl/storage/file_source_request.cpp + src/mbgl/storage/file_source_request.hpp src/mbgl/storage/http_file_source.hpp src/mbgl/storage/local_file_source.hpp src/mbgl/storage/network_status.cpp @@ -605,6 +607,7 @@ set(MBGL_CORE_FILES src/mbgl/util/thread_context.cpp src/mbgl/util/thread_context.hpp src/mbgl/util/thread_local.hpp + src/mbgl/util/threaded_object.hpp src/mbgl/util/throttler.cpp src/mbgl/util/throttler.hpp src/mbgl/util/tile_coordinate.hpp diff --git a/src/mbgl/storage/file_source_request.cpp b/src/mbgl/storage/file_source_request.cpp new file mode 100644 index 0000000000..8a6fb21181 --- /dev/null +++ b/src/mbgl/storage/file_source_request.cpp @@ -0,0 +1,37 @@ +#include <mbgl/storage/file_source_request.hpp> + +#include <mbgl/actor/mailbox.hpp> +#include <mbgl/util/run_loop.hpp> + +namespace mbgl { + +FileSourceRequest::FileSourceRequest(FileSource::Callback&& callback) + : responseCallback(callback) + , mailbox(std::make_shared<Mailbox>(*util::RunLoop::Get())) { +} + +FileSourceRequest::~FileSourceRequest() { + if (cancelCallback) { + cancelCallback(); + } + + mailbox->close(); +} + +void FileSourceRequest::onCancel(std::function<void()>&& callback) { + cancelCallback = std::move(callback); +} + +void FileSourceRequest::setResponse(const Response& response) { + // Copy, because calling the callback will sometimes self + // destroy this object. We cannot move because this method + // can be called more than one. + auto callback = responseCallback; + callback(response); +} + +ActorRef<FileSourceRequest> FileSourceRequest::actor() { + return ActorRef<FileSourceRequest>(*this, mailbox); +} + +} // namespace mbgl diff --git a/src/mbgl/storage/file_source_request.hpp b/src/mbgl/storage/file_source_request.hpp new file mode 100644 index 0000000000..6bd0d44df6 --- /dev/null +++ b/src/mbgl/storage/file_source_request.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include <mbgl/actor/actor_ref.hpp> +#include <mbgl/storage/file_source.hpp> +#include <mbgl/util/async_request.hpp> + +#include <memory> +#include <functional> + +namespace mbgl { + +class Mailbox; + +class FileSourceRequest : public AsyncRequest { +public: + FileSourceRequest(FileSource::Callback&& callback); + ~FileSourceRequest() final; + + void onCancel(std::function<void()>&& callback); + void setResponse(const Response& res); + + ActorRef<FileSourceRequest> actor(); + +private: + FileSource::Callback responseCallback = nullptr; + std::function<void()> cancelCallback = nullptr; + + std::shared_ptr<Mailbox> mailbox; +}; + +} // namespace mbgl diff --git a/src/mbgl/util/threaded_object.hpp b/src/mbgl/util/threaded_object.hpp new file mode 100644 index 0000000000..7310eed2b1 --- /dev/null +++ b/src/mbgl/util/threaded_object.hpp @@ -0,0 +1,163 @@ +#pragma once + +#include <mbgl/actor/actor.hpp> +#include <mbgl/actor/mailbox.hpp> +#include <mbgl/actor/scheduler.hpp> +#include <mbgl/util/platform.hpp> +#include <mbgl/util/run_loop.hpp> +#include <mbgl/util/util.hpp> + +#include <cassert> +#include <future> +#include <memory> +#include <mutex> +#include <queue> +#include <string> +#include <thread> +#include <utility> + +namespace mbgl { +namespace util { + +// Manages a thread with `Object`. + +// Upon creation of this object, it launches a thread and creates an object of type `Object` +// in that thread. When the `ThreadedObject<>` object is destructed, the destructor waits +// for thread termination. The `ThreadedObject<>` constructor blocks until the thread and +// the `Object` are fully created, so after the object creation, it's safe to obtain the +// `Object` stored in this thread. The thread created will always have low priority on +// the platforms that support setting thread priority. +// +// The following properties make this class different from `ThreadPool`: +// +// - Only one thread is created. +// - `Object` will live in a single thread, providing thread affinity. +// - It is safe to use `ThreadLocal` in an `Object` managed by `ThreadedObject<>` +// - A `RunLoop` is created for the `Object` thread. +// - `Object` can use `Timer` and do asynchronous I/O, like wait for sockets events. +// +template<class Object> +class ThreadedObject : public Scheduler { +public: + template <class... Args> + ThreadedObject(const std::string& name, Args&&... args) { + std::promise<void> running; + + thread = std::thread([&] { + platform::setCurrentThreadName(name); + platform::makeThreadLowPriority(); + + util::RunLoop loop_(util::RunLoop::Type::New); + loop = &loop_; + + object = std::make_unique<Actor<Object>>(*this, std::forward<Args>(args)...); + running.set_value(); + + loop->run(); + loop = nullptr; + }); + + running.get_future().get(); + } + + ~ThreadedObject() override { + MBGL_VERIFY_THREAD(tid); + + if (paused) { + resume(); + } + + std::promise<void> joinable; + + // Kill the actor, so we don't get more + // messages posted on this scheduler after + // we delete the RunLoop. + loop->invoke([&] { + object.reset(); + joinable.set_value(); + }); + + joinable.get_future().get(); + + loop->stop(); + thread.join(); + } + + // Returns a non-owning reference to `Object` that + // can be used to send messages to `Object`. It is safe + // to the non-owning reference to outlive this object + // and be used after the `ThreadedObject<>` gets destroyed. + ActorRef<std::decay_t<Object>> actor() const { + return object->self(); + } + + // Pauses the `Object` thread. It will prevent the object to wake + // up from events such as timers and file descriptor I/O. Messages + // sent to a paused `Object` will be queued and only processed after + // `resume()` is called. + void pause() { + MBGL_VERIFY_THREAD(tid); + + assert(!paused); + + paused = std::make_unique<std::promise<void>>(); + resumed = std::make_unique<std::promise<void>>(); + + auto pausing = paused->get_future(); + + loop->invoke([this] { + auto resuming = resumed->get_future(); + paused->set_value(); + resuming.get(); + }); + + pausing.get(); + } + + // Resumes the `Object` thread previously paused by `pause()`. + void resume() { + MBGL_VERIFY_THREAD(tid); + + assert(paused); + + resumed->set_value(); + + resumed.reset(); + paused.reset(); + } + +private: + MBGL_STORE_THREAD(tid); + + void schedule(std::weak_ptr<Mailbox> mailbox) override { + { + std::lock_guard<std::mutex> lock(mutex); + queue.push(mailbox); + } + + loop->invoke([this] { receive(); }); + } + + void receive() { + std::unique_lock<std::mutex> lock(mutex); + + auto mailbox = queue.front(); + queue.pop(); + lock.unlock(); + + Mailbox::maybeReceive(mailbox); + } + + std::mutex mutex; + std::queue<std::weak_ptr<Mailbox>> queue; + std::thread thread; + std::unique_ptr<Actor<Object>> object; + + std::unique_ptr<std::promise<void>> paused; + std::unique_ptr<std::promise<void>> resumed; + + util::RunLoop* loop = nullptr; +}; + +} // namespace util +} // namespace mbgl |