summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago Marcos P. Santos <tmpsantos@gmail.com>2017-06-13 18:44:13 +0300
committerThiago Marcos P. Santos <tmpsantos@gmail.com>2017-06-21 14:30:09 +0300
commit0de17ea5f47e9e6c4b67170d4d04b0425e67ce5c (patch)
tree27f587ff587cefcea994d037866d9bcbec3b7c95
parent2676ad05b0b934a39485631f2700423891ce1ef1 (diff)
downloadqtlocation-mapboxgl-0de17ea5f47e9e6c4b67170d4d04b0425e67ce5c.tar.gz
[core] Added the ThreadedObject
Actor model version of util::Thread.
-rw-r--r--cmake/core-files.cmake3
-rw-r--r--src/mbgl/storage/file_source_request.cpp37
-rw-r--r--src/mbgl/storage/file_source_request.hpp31
-rw-r--r--src/mbgl/util/threaded_object.hpp163
4 files changed, 234 insertions, 0 deletions
diff --git a/cmake/core-files.cmake b/cmake/core-files.cmake
index 78451afc14..729d3f7da3 100644
--- a/cmake/core-files.cmake
+++ b/cmake/core-files.cmake
@@ -312,6 +312,8 @@ set(MBGL_CORE_FILES
include/mbgl/storage/resource.hpp
include/mbgl/storage/response.hpp
src/mbgl/storage/asset_file_source.hpp
+ src/mbgl/storage/file_source_request.cpp
+ src/mbgl/storage/file_source_request.hpp
src/mbgl/storage/http_file_source.hpp
src/mbgl/storage/local_file_source.hpp
src/mbgl/storage/network_status.cpp
@@ -605,6 +607,7 @@ set(MBGL_CORE_FILES
src/mbgl/util/thread_context.cpp
src/mbgl/util/thread_context.hpp
src/mbgl/util/thread_local.hpp
+ src/mbgl/util/threaded_object.hpp
src/mbgl/util/throttler.cpp
src/mbgl/util/throttler.hpp
src/mbgl/util/tile_coordinate.hpp
diff --git a/src/mbgl/storage/file_source_request.cpp b/src/mbgl/storage/file_source_request.cpp
new file mode 100644
index 0000000000..8a6fb21181
--- /dev/null
+++ b/src/mbgl/storage/file_source_request.cpp
@@ -0,0 +1,37 @@
+#include <mbgl/storage/file_source_request.hpp>
+
+#include <mbgl/actor/mailbox.hpp>
+#include <mbgl/util/run_loop.hpp>
+
+namespace mbgl {
+
+FileSourceRequest::FileSourceRequest(FileSource::Callback&& callback)
+ : responseCallback(callback)
+ , mailbox(std::make_shared<Mailbox>(*util::RunLoop::Get())) {
+}
+
+FileSourceRequest::~FileSourceRequest() {
+ if (cancelCallback) {
+ cancelCallback();
+ }
+
+ mailbox->close();
+}
+
+void FileSourceRequest::onCancel(std::function<void()>&& callback) {
+ cancelCallback = std::move(callback);
+}
+
+void FileSourceRequest::setResponse(const Response& response) {
+ // Copy, because calling the callback will sometimes self
+ // destroy this object. We cannot move because this method
+ // can be called more than one.
+ auto callback = responseCallback;
+ callback(response);
+}
+
+ActorRef<FileSourceRequest> FileSourceRequest::actor() {
+ return ActorRef<FileSourceRequest>(*this, mailbox);
+}
+
+} // namespace mbgl
diff --git a/src/mbgl/storage/file_source_request.hpp b/src/mbgl/storage/file_source_request.hpp
new file mode 100644
index 0000000000..6bd0d44df6
--- /dev/null
+++ b/src/mbgl/storage/file_source_request.hpp
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <mbgl/actor/actor_ref.hpp>
+#include <mbgl/storage/file_source.hpp>
+#include <mbgl/util/async_request.hpp>
+
+#include <memory>
+#include <functional>
+
+namespace mbgl {
+
+class Mailbox;
+
+class FileSourceRequest : public AsyncRequest {
+public:
+ FileSourceRequest(FileSource::Callback&& callback);
+ ~FileSourceRequest() final;
+
+ void onCancel(std::function<void()>&& callback);
+ void setResponse(const Response& res);
+
+ ActorRef<FileSourceRequest> actor();
+
+private:
+ FileSource::Callback responseCallback = nullptr;
+ std::function<void()> cancelCallback = nullptr;
+
+ std::shared_ptr<Mailbox> mailbox;
+};
+
+} // namespace mbgl
diff --git a/src/mbgl/util/threaded_object.hpp b/src/mbgl/util/threaded_object.hpp
new file mode 100644
index 0000000000..7310eed2b1
--- /dev/null
+++ b/src/mbgl/util/threaded_object.hpp
@@ -0,0 +1,163 @@
+#pragma once
+
+#include <mbgl/actor/actor.hpp>
+#include <mbgl/actor/mailbox.hpp>
+#include <mbgl/actor/scheduler.hpp>
+#include <mbgl/util/platform.hpp>
+#include <mbgl/util/run_loop.hpp>
+#include <mbgl/util/util.hpp>
+
+#include <cassert>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <thread>
+#include <utility>
+
+namespace mbgl {
+namespace util {
+
+// Manages a thread with `Object`.
+
+// Upon creation of this object, it launches a thread and creates an object of type `Object`
+// in that thread. When the `ThreadedObject<>` object is destructed, the destructor waits
+// for thread termination. The `ThreadedObject<>` constructor blocks until the thread and
+// the `Object` are fully created, so after the object creation, it's safe to obtain the
+// `Object` stored in this thread. The thread created will always have low priority on
+// the platforms that support setting thread priority.
+//
+// The following properties make this class different from `ThreadPool`:
+//
+// - Only one thread is created.
+// - `Object` will live in a single thread, providing thread affinity.
+// - It is safe to use `ThreadLocal` in an `Object` managed by `ThreadedObject<>`
+// - A `RunLoop` is created for the `Object` thread.
+// - `Object` can use `Timer` and do asynchronous I/O, like wait for sockets events.
+//
+template<class Object>
+class ThreadedObject : public Scheduler {
+public:
+ template <class... Args>
+ ThreadedObject(const std::string& name, Args&&... args) {
+ std::promise<void> running;
+
+ thread = std::thread([&] {
+ platform::setCurrentThreadName(name);
+ platform::makeThreadLowPriority();
+
+ util::RunLoop loop_(util::RunLoop::Type::New);
+ loop = &loop_;
+
+ object = std::make_unique<Actor<Object>>(*this, std::forward<Args>(args)...);
+ running.set_value();
+
+ loop->run();
+ loop = nullptr;
+ });
+
+ running.get_future().get();
+ }
+
+ ~ThreadedObject() override {
+ MBGL_VERIFY_THREAD(tid);
+
+ if (paused) {
+ resume();
+ }
+
+ std::promise<void> joinable;
+
+ // Kill the actor, so we don't get more
+ // messages posted on this scheduler after
+ // we delete the RunLoop.
+ loop->invoke([&] {
+ object.reset();
+ joinable.set_value();
+ });
+
+ joinable.get_future().get();
+
+ loop->stop();
+ thread.join();
+ }
+
+ // Returns a non-owning reference to `Object` that
+ // can be used to send messages to `Object`. It is safe
+ // to the non-owning reference to outlive this object
+ // and be used after the `ThreadedObject<>` gets destroyed.
+ ActorRef<std::decay_t<Object>> actor() const {
+ return object->self();
+ }
+
+ // Pauses the `Object` thread. It will prevent the object to wake
+ // up from events such as timers and file descriptor I/O. Messages
+ // sent to a paused `Object` will be queued and only processed after
+ // `resume()` is called.
+ void pause() {
+ MBGL_VERIFY_THREAD(tid);
+
+ assert(!paused);
+
+ paused = std::make_unique<std::promise<void>>();
+ resumed = std::make_unique<std::promise<void>>();
+
+ auto pausing = paused->get_future();
+
+ loop->invoke([this] {
+ auto resuming = resumed->get_future();
+ paused->set_value();
+ resuming.get();
+ });
+
+ pausing.get();
+ }
+
+ // Resumes the `Object` thread previously paused by `pause()`.
+ void resume() {
+ MBGL_VERIFY_THREAD(tid);
+
+ assert(paused);
+
+ resumed->set_value();
+
+ resumed.reset();
+ paused.reset();
+ }
+
+private:
+ MBGL_STORE_THREAD(tid);
+
+ void schedule(std::weak_ptr<Mailbox> mailbox) override {
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+ queue.push(mailbox);
+ }
+
+ loop->invoke([this] { receive(); });
+ }
+
+ void receive() {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ auto mailbox = queue.front();
+ queue.pop();
+ lock.unlock();
+
+ Mailbox::maybeReceive(mailbox);
+ }
+
+ std::mutex mutex;
+ std::queue<std::weak_ptr<Mailbox>> queue;
+ std::thread thread;
+ std::unique_ptr<Actor<Object>> object;
+
+ std::unique_ptr<std::promise<void>> paused;
+ std::unique_ptr<std::promise<void>> resumed;
+
+ util::RunLoop* loop = nullptr;
+};
+
+} // namespace util
+} // namespace mbgl