diff options
author | John Firebaugh <john.firebaugh@gmail.com> | 2015-04-13 16:41:37 -0700 |
---|---|---|
committer | John Firebaugh <john.firebaugh@gmail.com> | 2015-04-15 20:38:25 -0700 |
commit | 1eaf6b8c18d52fb060bb70a76a516c8a7c56a3fa (patch) | |
tree | c03669cce16494bf720be33fb26d68a6242e7c23 | |
parent | 1066504008ab9d779fcc007d3956d5966b7892e8 (diff) | |
download | qtlocation-mapboxgl-1eaf6b8c18d52fb060bb70a76a516c8a7c56a3fa.tar.gz |
Stricter Thread<> interface
Implements a Thread<>::invoke interface that only permits calling
member functions with bound arguments.
Also introduce invokeWithResult, which accepts a callback and invokes
it, passing the result, on the invoking thread's RunLoop. This closes
the loop of two evented threads doing message passing.
-rw-r--r-- | platform/default/sqlite_cache.cpp | 21 | ||||
-rw-r--r-- | platform/default/sqlite_cache_impl.hpp | 9 | ||||
-rw-r--r-- | src/mbgl/storage/default_file_source.cpp | 27 | ||||
-rw-r--r-- | src/mbgl/storage/default_file_source_impl.hpp | 12 | ||||
-rw-r--r-- | src/mbgl/util/run_loop.cpp | 58 | ||||
-rw-r--r-- | src/mbgl/util/run_loop.hpp | 106 | ||||
-rw-r--r-- | src/mbgl/util/thread.hpp | 88 | ||||
-rw-r--r-- | src/mbgl/util/uv.cpp | 22 | ||||
-rw-r--r-- | src/mbgl/util/uv_detail.hpp | 31 |
9 files changed, 237 insertions, 137 deletions
diff --git a/platform/default/sqlite_cache.cpp b/platform/default/sqlite_cache.cpp index 958cdc1ac1..e73618f2b4 100644 --- a/platform/default/sqlite_cache.cpp +++ b/platform/default/sqlite_cache.cpp @@ -5,7 +5,6 @@ #include <mbgl/util/compression.hpp> #include <mbgl/util/io.hpp> #include <mbgl/util/thread.hpp> -#include <mbgl/util/run_loop.hpp> #include <mbgl/platform/log.hpp> #include "sqlite3.hpp" @@ -62,16 +61,14 @@ std::string unifyMapboxURLs(const std::string &url) { using namespace mapbox::sqlite; -SQLiteCache::SQLiteCache(const std::string& path_) : thread(util::make_unique<util::Thread<Impl>>(path_)) { +SQLiteCache::SQLiteCache(const std::string& path_) + : thread(util::make_unique<util::Thread<Impl>>("SQLite Cache", path_)) { } SQLiteCache::~SQLiteCache() = default; SQLiteCache::Impl::Impl(const std::string& path_) : path(path_) { -#ifdef __APPLE__ - pthread_setname_np("SQLite Cache"); -#endif } SQLiteCache::Impl::~Impl() { @@ -134,10 +131,10 @@ void SQLiteCache::get(const Resource &resource, Callback callback) { // Will try to load the URL from the SQLite database and call the callback when done. // Note that the callback is probably going to invoked from another thread, so the caller // must make sure that it can run in that thread. - (*thread)->invoke([=] { (*thread)->processGet(resource, callback); }); + thread->invokeWithResult(&Impl::processGet, callback, resource); } -void SQLiteCache::Impl::processGet(const Resource &resource, Callback callback) { +std::unique_ptr<Response> SQLiteCache::Impl::processGet(const Resource &resource) { try { // This is called in the SQLite event loop. if (!db) { @@ -170,14 +167,14 @@ void SQLiteCache::Impl::processGet(const Resource &resource, Callback callback) if (getStmt->get<int>(5)) { // == compressed response->data = util::decompress(response->data); } - callback(std::move(response)); + return std::move(response); } else { // There is no data. - callback(nullptr); + return nullptr; } } catch (mapbox::sqlite::Exception& ex) { Log::Error(Event::Database, ex.code, ex.what()); - callback(nullptr); + return nullptr; } } @@ -186,9 +183,9 @@ void SQLiteCache::put(const Resource &resource, std::shared_ptr<const Response> // storing a new response or updating the currently stored response, potentially setting a new // expiry date. if (hint == Hint::Full) { - (*thread)->invoke([=] { (*thread)->processPut(resource, response); }); + thread->invoke(&Impl::processPut, resource, std::move(response)); } else if (hint == Hint::Refresh) { - (*thread)->invoke([=] { (*thread)->processRefresh(resource, response->expires); }); + thread->invoke(&Impl::processRefresh, resource, int64_t(response->expires)); } } diff --git a/platform/default/sqlite_cache_impl.hpp b/platform/default/sqlite_cache_impl.hpp index a194f9e782..b13c7c2cd3 100644 --- a/platform/default/sqlite_cache_impl.hpp +++ b/platform/default/sqlite_cache_impl.hpp @@ -2,7 +2,6 @@ #define MBGL_STORAGE_DEFAULT_SQLITE_CACHE_IMPL #include <mbgl/storage/sqlite_cache.hpp> -#include <mbgl/util/run_loop.hpp> namespace mapbox { namespace sqlite { @@ -13,15 +12,12 @@ class Statement; namespace mbgl { -class SQLiteCache::Impl : public util::RunLoop { - friend class util::Thread<SQLiteCache::Impl>; - +class SQLiteCache::Impl { public: Impl(const std::string &path = ":memory:"); ~Impl(); -public: - void processGet(const Resource& resource, Callback callback); + std::unique_ptr<Response> processGet(const Resource&); void processPut(const Resource& resource, std::shared_ptr<const Response> response); void processRefresh(const Resource& resource, int64_t expires); @@ -29,7 +25,6 @@ private: void createDatabase(); void createSchema(); -private: const std::string path; std::unique_ptr<::mapbox::sqlite::Database> db; std::unique_ptr<::mapbox::sqlite::Statement> getStmt; diff --git a/src/mbgl/storage/default_file_source.cpp b/src/mbgl/storage/default_file_source.cpp index b28e096414..d866e2353f 100644 --- a/src/mbgl/storage/default_file_source.cpp +++ b/src/mbgl/storage/default_file_source.cpp @@ -10,6 +10,7 @@ #include <mbgl/util/chrono.hpp> #include <mbgl/util/thread.hpp> #include <mbgl/platform/log.hpp> +#include <mbgl/map/environment.hpp> #pragma GCC diagnostic push #ifndef __clang__ @@ -29,13 +30,10 @@ namespace mbgl { DefaultFileSource::Impl::Impl(FileCache* cache_, const std::string& root) : assetRoot(root.empty() ? platform::assetRoot() : root), cache(cache_) { -#ifdef __APPLE__ - pthread_setname_np("FileSource"); -#endif } DefaultFileSource::DefaultFileSource(FileCache* cache, const std::string& root) - : thread(util::make_unique<util::Thread<Impl>>(cache, root)) { + : thread(util::make_unique<util::Thread<Impl>>("FileSource", cache, root)) { } DefaultFileSource::~DefaultFileSource() { @@ -62,7 +60,7 @@ Request* DefaultFileSource::request(const Resource& resource, // This function can be called from any thread. Make sure we're executing the actual call in the // file source loop by sending it over the queue. - (*thread)->invoke([=] { (*thread)->processAdd(req); }); + thread->invoke(&Impl::processAdd, std::move(req), thread->get()); return req; } @@ -76,14 +74,14 @@ void DefaultFileSource::cancel(Request *req) { // This function can be called from any thread. Make sure we're executing the actual call in the // file source loop by sending it over the queue. - (*thread)->invoke([=] { (*thread)->processCancel(req); }); + thread->invoke(&Impl::processCancel, std::move(req)); } void DefaultFileSource::abort(const Environment &env) { - (*thread)->invoke([=, &env] { (*thread)->processAbort(env); }); + thread->invoke(&Impl::processAbort, std::ref(env)); } -void DefaultFileSource::Impl::processAdd(Request* req) { +void DefaultFileSource::Impl::processAdd(Request* req, uv_loop_t* loop) { const Resource &resource = req->resource; // We're adding a new Request. @@ -102,13 +100,12 @@ void DefaultFileSource::Impl::processAdd(Request* req) { // But first, we're going to start querying the database if it exists. if (!cache) { - sharedRequest->start(loop().get()); + sharedRequest->start(loop); } else { // Otherwise, first check the cache for existing data so that we can potentially // revalidate the information without having to redownload everything. - cache->get(resource, [this, resource](std::unique_ptr<Response> response) { - std::shared_ptr<const Response> sharedResponse = std::move(response); - invoke([this, resource, sharedResponse] { processResult(resource, sharedResponse); }); + cache->get(resource, [this, resource, loop](std::unique_ptr<Response> response) { + processResult(resource, std::move(response), loop); }); } } @@ -132,7 +129,7 @@ void DefaultFileSource::Impl::processCancel(Request* req) { req->destruct(); } -void DefaultFileSource::Impl::processResult(const Resource& resource, std::shared_ptr<const Response> response) { +void DefaultFileSource::Impl::processResult(const Resource& resource, std::shared_ptr<const Response> response, uv_loop_t* loop) { SharedRequestBase *sharedRequest = find(resource); if (sharedRequest) { if (response) { @@ -146,11 +143,11 @@ void DefaultFileSource::Impl::processResult(const Resource& resource, std::share return; } else { // The cached response is stale. Now run the real request. - sharedRequest->start(loop().get(), response); + sharedRequest->start(loop, response); } } else { // There is no response. Now run the real request. - sharedRequest->start(loop().get()); + sharedRequest->start(loop); } } else { // There is no request for this URL anymore. Likely, the request was canceled diff --git a/src/mbgl/storage/default_file_source_impl.hpp b/src/mbgl/storage/default_file_source_impl.hpp index 82a3b9a1a0..6383d13dce 100644 --- a/src/mbgl/storage/default_file_source_impl.hpp +++ b/src/mbgl/storage/default_file_source_impl.hpp @@ -2,7 +2,6 @@ #define MBGL_STORAGE_DEFAULT_DEFAULT_FILE_SOURCE_IMPL #include <mbgl/storage/default_file_source.hpp> -#include <mbgl/util/run_loop.hpp> #include <set> #include <unordered_map> @@ -11,26 +10,23 @@ namespace mbgl { class SharedRequestBase; -class DefaultFileSource::Impl : public util::RunLoop { - friend class util::Thread<DefaultFileSource::Impl>; - +class DefaultFileSource::Impl { public: Impl(FileCache *cache, const std::string &root = ""); -public: void notify(SharedRequestBase *sharedRequest, const std::set<Request *> &observers, std::shared_ptr<const Response> response, FileCache::Hint hint); SharedRequestBase *find(const Resource &resource); - void processAdd(Request* request); + void processAdd(Request* request, uv_loop_t* loop); void processCancel(Request* request); - void processResult(const Resource& resource, std::shared_ptr<const Response> response); void processAbort(const Environment& env); -public: const std::string assetRoot; private: + void processResult(const Resource& resource, std::shared_ptr<const Response> response, uv_loop_t* loop); + std::unordered_map<Resource, SharedRequestBase *, Resource::Hash> pending; FileCache *cache = nullptr; }; diff --git a/src/mbgl/util/run_loop.cpp b/src/mbgl/util/run_loop.cpp index dba75fcfcf..e945a02326 100644 --- a/src/mbgl/util/run_loop.cpp +++ b/src/mbgl/util/run_loop.cpp @@ -1,54 +1,38 @@ #include <mbgl/util/run_loop.hpp> -#include <mbgl/util/uv_detail.hpp> -#include <mbgl/util/std.hpp> - -#include <uv.h> - -namespace /* anonymous */ { -inline void critical_section(std::mutex& mutex, std::function<void()> fn) { - std::lock_guard<std::mutex> lock(mutex); - fn(); -} -} namespace mbgl { namespace util { -RunLoop::RunLoop() - : runloop(util::make_unique<uv::loop>()), - runloopAsync(util::make_unique<uv::async>(runloop->get(), std::bind(&RunLoop::process, this))) { -} - -// Define here since we can't destroy the uv::* objects from just the header file. -RunLoop::~RunLoop() = default; +uv::tls<RunLoop> RunLoop::current; -void RunLoop::start() { - runloop->run(); +RunLoop::RunLoop() + : async(*loop, std::bind(&RunLoop::process, this)) { } -void RunLoop::stop() { - critical_section(runloopMutex, [this] { runloopQueue.push(nullptr); }); - runloopAsync->send(); +void RunLoop::withMutex(std::function<void()>&& fn) { + std::lock_guard<std::mutex> lock(mutex); + fn(); } void RunLoop::process() { - std::queue<std::function<void()>> queue; - critical_section(runloopMutex, [this, &queue] { queue.swap(runloopQueue); }); - while (!queue.empty()) { - if (queue.front()) { - queue.front()(); - } else { - runloopAsync->unref(); - } - queue.pop(); + Queue queue_; + withMutex([&] { queue_.swap(queue); }); + + while (!queue_.empty()) { + (queue_.front())(); + queue_.pop(); } } -void RunLoop::invoke(std::function<void()>&& fn) { - if (fn) { - critical_section(runloopMutex, [this, &fn] { runloopQueue.emplace(std::move(fn)); }); - runloopAsync->send(); - } +void RunLoop::run() { + assert(!current.get()); + current.set(this); + loop.run(); + current.set(nullptr); +} + +void RunLoop::stop() { + invoke([&] { async.unref(); }); } } diff --git a/src/mbgl/util/run_loop.hpp b/src/mbgl/util/run_loop.hpp index 9bed417c38..c39fb60186 100644 --- a/src/mbgl/util/run_loop.hpp +++ b/src/mbgl/util/run_loop.hpp @@ -1,53 +1,97 @@ #ifndef MBGL_UTIL_RUN_LOOP #define MBGL_UTIL_RUN_LOOP -#include <memory> -#include <mutex> +#include <mbgl/util/uv_detail.hpp> + #include <functional> #include <queue> - -namespace uv { -class async; -class loop; -} +#include <mutex> namespace mbgl { namespace util { -template <typename T> class Thread; - class RunLoop { - friend Thread<RunLoop>; - -protected: - // These are called by the Thread<> wrapper. +public: RunLoop(); - ~RunLoop(); - - // Called by the Thread<> wrapper to start the loop. When you implement this - // method in a child class, you *must* call this function as the last action. - void start(); -protected: - // Called by the Thread<> wrapper to terminate this loop. + void run(); void stop(); - // Obtain the underlying loop object in case you want to attach additional listeners. - uv::loop& loop() { return *runloop; }; + // Invoke fn() in the runloop thread. + template <class Fn> + void invoke(Fn&& fn) { + withMutex([&] { queue.push(Message(std::move(fn))); }); + async.send(); + } + + // Invoke fn() in the runloop thread, then invoke callback(result) in the current thread. + template <class Fn, class R> + void invokeWithResult(Fn&& fn, std::function<void (R)> callback) { + RunLoop* outer = current.get(); + assert(outer); + + invoke([fn, callback, outer] { + /* + With C++14, we could write: + + outer->invoke([callback, result = std::move(fn())] () mutable { + callback(std::move(result)); + }); + + Instead we're using a workaround with std::bind + to obtain move-capturing semantics with C++11: + http://stackoverflow.com/a/12744730/52207 + */ + outer->invoke(std::bind([callback] (R& result) { + callback(std::move(result)); + }, std::move(fn()))); + }); + } + + uv_loop_t* get() { return *loop; } private: - // Invokes function in the run loop. + struct Message { + struct Base { + virtual void operator()() = 0; + virtual ~Base() = default; + }; + + template <class F> + struct Invoker : Base { + Invoker(F&& f) : func(std::move(f)) {} + void operator()() override { func(); } + F func; + }; + + Message() = default; + Message(Message&&) = default; + ~Message() = default; + Message& operator=(Message&&) = default; + + // copy members implicitly deleted + + template <class Fn> + Message(Fn fn) + : p_fn(new Invoker<Fn>(std::move(fn))) { + } + + void operator()() const { (*p_fn)(); } + std::unique_ptr<Base> p_fn; + }; + + using Queue = std::queue<Message>; + + static uv::tls<RunLoop> current; + + void withMutex(std::function<void()>&&); void process(); -public: - // Schedules a function to be executed as part of this run loop. - void invoke(std::function<void()>&& fn); + Queue queue; + std::mutex mutex; -private: - const std::unique_ptr<uv::loop> runloop; - const std::unique_ptr<uv::async> runloopAsync; - std::mutex runloopMutex; - std::queue<std::function<void()>> runloopQueue; + uv::loop loop; + uv::async async; }; } diff --git a/src/mbgl/util/thread.hpp b/src/mbgl/util/thread.hpp index 7c9ba70e11..4831b9efc2 100644 --- a/src/mbgl/util/thread.hpp +++ b/src/mbgl/util/thread.hpp @@ -3,6 +3,9 @@ #include <future> #include <thread> +#include <functional> + +#include <mbgl/util/run_loop.hpp> namespace { @@ -34,49 +37,80 @@ template <class Object> class Thread { public: template <class... Args> - Thread(Args&&... args); + Thread(const std::string& name, Args&&... args); + ~Thread(); + + // Invoke object->fn(args...) in the runloop thread. + template <typename Fn, class... Args> + void invoke(Fn fn, Args&&... args) { + loop->invoke(std::bind(fn, object, args...)); + } + + // Invoke object->fn(args...) in the runloop thread, then invoke callback(result) in the current thread. + template <typename Fn, class R, class... Args> + void invokeWithResult(Fn fn, std::function<void (R)> callback, Args&&... args) { + loop->invokeWithResult(std::bind(fn, object, args...), callback); + } + + uv_loop_t* get() { return loop->get(); } + +private: Thread(const Thread&) = delete; Thread(Thread&&) = delete; Thread& operator=(const Thread&) = delete; Thread& operator=(Thread&&) = delete; - ~Thread(); - - inline Object* operator->() const { return &object; } - inline operator Object*() const { return &object; } -private: template <typename P, std::size_t... I> - void run(std::promise<Object&>& promise, P&& params, index_sequence<I...>) { - Object context(std::get<I>(std::forward<P>(params))...); - promise.set_value(context); - context.start(); - joinable.get_future().get(); - } + void run(P&& params, index_sequence<I...>); -private: - std::thread thread; + std::promise<void> running; std::promise<void> joinable; - Object& object; + + std::thread thread; + + Object* object; + RunLoop* loop; }; template <class Object> template <class... Args> -Thread<Object>::Thread(Args&&... args) - : object([&](std::tuple<Args...>&& params) -> Object& { - // Note: We're using std::tuple<> to store the arguments because GCC 4.9 has a bug - // when expanding parameters packs captured in lambdas. - std::promise<Object&> promise; - constexpr auto seq = typename integer_sequence<sizeof...(Args)>::type(); - thread = std::thread([&] { - run(promise, std::move(params), seq); - }); - return promise.get_future().get(); - }(std::move(std::forward_as_tuple(::std::forward<Args>(args)...)))) { +Thread<Object>::Thread(const std::string& name, Args&&... args) { + // Note: We're using std::tuple<> to store the arguments because GCC 4.9 has a bug + // when expanding parameters packs captured in lambdas. + std::tuple<Args...> params = std::forward_as_tuple(::std::forward<Args>(args)...); + + thread = std::thread([&] { + #ifdef __APPLE__ + pthread_setname_np(name.c_str()); + #else + (void(name)); + #endif + + constexpr auto seq = typename integer_sequence<sizeof...(Args)>::type(); + run(std::move(params), seq); + }); + + running.get_future().get(); +} + +template <class Object> +template <typename P, std::size_t... I> +void Thread<Object>::run(P&& params, index_sequence<I...>) { + Object object_(std::get<I>(std::forward<P>(params))...); + object = &object_; + + RunLoop loop_; + loop = &loop_; + + running.set_value(); + loop_.run(); + + joinable.get_future().get(); } template <class Object> Thread<Object>::~Thread() { - object.stop(); + loop->stop(); joinable.set_value(); thread.join(); } diff --git a/src/mbgl/util/uv.cpp b/src/mbgl/util/uv.cpp index d465dfd963..5dae34ebd0 100644 --- a/src/mbgl/util/uv.cpp +++ b/src/mbgl/util/uv.cpp @@ -3,6 +3,28 @@ #include <uv.h> +#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 + +int uv_key_create(uv_key_t* key) { + return -pthread_key_create(key, NULL); +} + +void uv_key_delete(uv_key_t* key) { + if (pthread_key_delete(*key)) + abort(); +} + +void* uv_key_get(uv_key_t* key) { + return pthread_getspecific(*key); +} + +void uv_key_set(uv_key_t* key, void* value) { + if (pthread_setspecific(*key, value)) + abort(); +} + +#endif + namespace uv { std::string cwd() { diff --git a/src/mbgl/util/uv_detail.hpp b/src/mbgl/util/uv_detail.hpp index 6acef1b386..96d5442462 100644 --- a/src/mbgl/util/uv_detail.hpp +++ b/src/mbgl/util/uv_detail.hpp @@ -11,6 +11,21 @@ #include <memory> #include <string> +#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 + +// Add thread local storage to libuv API: +// https://github.com/joyent/libuv/commit/5d2434bf71e47802841bad218d521fa254d1ca2d + +typedef pthread_key_t uv_key_t; + +UV_EXTERN int uv_key_create(uv_key_t* key); +UV_EXTERN void uv_key_delete(uv_key_t* key); +UV_EXTERN void* uv_key_get(uv_key_t* key); +UV_EXTERN void uv_key_set(uv_key_t* key, void* value); + +#endif + + namespace uv { template <class T> @@ -133,6 +148,22 @@ private: uv_rwlock_t mtx; }; +template <class T> +class tls : public mbgl::util::noncopyable { +public: + inline tls() { + if (uv_key_create(&key) != 0) { + throw std::runtime_error("failed to initialize thread local storage key"); + } + } + inline ~tls() { uv_key_delete(&key); } + inline T* get() { return reinterpret_cast<T*>(uv_key_get(&key)); } + inline void set(T* val) { uv_key_set(&key, val); } + +private: + uv_key_t key; +}; + } #endif |