summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Firebaugh <john.firebaugh@gmail.com>2015-04-13 16:41:37 -0700
committerJohn Firebaugh <john.firebaugh@gmail.com>2015-04-15 20:38:25 -0700
commit1eaf6b8c18d52fb060bb70a76a516c8a7c56a3fa (patch)
treec03669cce16494bf720be33fb26d68a6242e7c23
parent1066504008ab9d779fcc007d3956d5966b7892e8 (diff)
downloadqtlocation-mapboxgl-1eaf6b8c18d52fb060bb70a76a516c8a7c56a3fa.tar.gz
Stricter Thread<> interface
Implements a Thread<>::invoke interface that only permits calling member functions with bound arguments. Also introduce invokeWithResult, which accepts a callback and invokes it, passing the result, on the invoking thread's RunLoop. This closes the loop of two evented threads doing message passing.
-rw-r--r--platform/default/sqlite_cache.cpp21
-rw-r--r--platform/default/sqlite_cache_impl.hpp9
-rw-r--r--src/mbgl/storage/default_file_source.cpp27
-rw-r--r--src/mbgl/storage/default_file_source_impl.hpp12
-rw-r--r--src/mbgl/util/run_loop.cpp58
-rw-r--r--src/mbgl/util/run_loop.hpp106
-rw-r--r--src/mbgl/util/thread.hpp88
-rw-r--r--src/mbgl/util/uv.cpp22
-rw-r--r--src/mbgl/util/uv_detail.hpp31
9 files changed, 237 insertions, 137 deletions
diff --git a/platform/default/sqlite_cache.cpp b/platform/default/sqlite_cache.cpp
index 958cdc1ac1..e73618f2b4 100644
--- a/platform/default/sqlite_cache.cpp
+++ b/platform/default/sqlite_cache.cpp
@@ -5,7 +5,6 @@
#include <mbgl/util/compression.hpp>
#include <mbgl/util/io.hpp>
#include <mbgl/util/thread.hpp>
-#include <mbgl/util/run_loop.hpp>
#include <mbgl/platform/log.hpp>
#include "sqlite3.hpp"
@@ -62,16 +61,14 @@ std::string unifyMapboxURLs(const std::string &url) {
using namespace mapbox::sqlite;
-SQLiteCache::SQLiteCache(const std::string& path_) : thread(util::make_unique<util::Thread<Impl>>(path_)) {
+SQLiteCache::SQLiteCache(const std::string& path_)
+ : thread(util::make_unique<util::Thread<Impl>>("SQLite Cache", path_)) {
}
SQLiteCache::~SQLiteCache() = default;
SQLiteCache::Impl::Impl(const std::string& path_)
: path(path_) {
-#ifdef __APPLE__
- pthread_setname_np("SQLite Cache");
-#endif
}
SQLiteCache::Impl::~Impl() {
@@ -134,10 +131,10 @@ void SQLiteCache::get(const Resource &resource, Callback callback) {
// Will try to load the URL from the SQLite database and call the callback when done.
// Note that the callback is probably going to invoked from another thread, so the caller
// must make sure that it can run in that thread.
- (*thread)->invoke([=] { (*thread)->processGet(resource, callback); });
+ thread->invokeWithResult(&Impl::processGet, callback, resource);
}
-void SQLiteCache::Impl::processGet(const Resource &resource, Callback callback) {
+std::unique_ptr<Response> SQLiteCache::Impl::processGet(const Resource &resource) {
try {
// This is called in the SQLite event loop.
if (!db) {
@@ -170,14 +167,14 @@ void SQLiteCache::Impl::processGet(const Resource &resource, Callback callback)
if (getStmt->get<int>(5)) { // == compressed
response->data = util::decompress(response->data);
}
- callback(std::move(response));
+ return std::move(response);
} else {
// There is no data.
- callback(nullptr);
+ return nullptr;
}
} catch (mapbox::sqlite::Exception& ex) {
Log::Error(Event::Database, ex.code, ex.what());
- callback(nullptr);
+ return nullptr;
}
}
@@ -186,9 +183,9 @@ void SQLiteCache::put(const Resource &resource, std::shared_ptr<const Response>
// storing a new response or updating the currently stored response, potentially setting a new
// expiry date.
if (hint == Hint::Full) {
- (*thread)->invoke([=] { (*thread)->processPut(resource, response); });
+ thread->invoke(&Impl::processPut, resource, std::move(response));
} else if (hint == Hint::Refresh) {
- (*thread)->invoke([=] { (*thread)->processRefresh(resource, response->expires); });
+ thread->invoke(&Impl::processRefresh, resource, int64_t(response->expires));
}
}
diff --git a/platform/default/sqlite_cache_impl.hpp b/platform/default/sqlite_cache_impl.hpp
index a194f9e782..b13c7c2cd3 100644
--- a/platform/default/sqlite_cache_impl.hpp
+++ b/platform/default/sqlite_cache_impl.hpp
@@ -2,7 +2,6 @@
#define MBGL_STORAGE_DEFAULT_SQLITE_CACHE_IMPL
#include <mbgl/storage/sqlite_cache.hpp>
-#include <mbgl/util/run_loop.hpp>
namespace mapbox {
namespace sqlite {
@@ -13,15 +12,12 @@ class Statement;
namespace mbgl {
-class SQLiteCache::Impl : public util::RunLoop {
- friend class util::Thread<SQLiteCache::Impl>;
-
+class SQLiteCache::Impl {
public:
Impl(const std::string &path = ":memory:");
~Impl();
-public:
- void processGet(const Resource& resource, Callback callback);
+ std::unique_ptr<Response> processGet(const Resource&);
void processPut(const Resource& resource, std::shared_ptr<const Response> response);
void processRefresh(const Resource& resource, int64_t expires);
@@ -29,7 +25,6 @@ private:
void createDatabase();
void createSchema();
-private:
const std::string path;
std::unique_ptr<::mapbox::sqlite::Database> db;
std::unique_ptr<::mapbox::sqlite::Statement> getStmt;
diff --git a/src/mbgl/storage/default_file_source.cpp b/src/mbgl/storage/default_file_source.cpp
index b28e096414..d866e2353f 100644
--- a/src/mbgl/storage/default_file_source.cpp
+++ b/src/mbgl/storage/default_file_source.cpp
@@ -10,6 +10,7 @@
#include <mbgl/util/chrono.hpp>
#include <mbgl/util/thread.hpp>
#include <mbgl/platform/log.hpp>
+#include <mbgl/map/environment.hpp>
#pragma GCC diagnostic push
#ifndef __clang__
@@ -29,13 +30,10 @@ namespace mbgl {
DefaultFileSource::Impl::Impl(FileCache* cache_, const std::string& root)
: assetRoot(root.empty() ? platform::assetRoot() : root), cache(cache_) {
-#ifdef __APPLE__
- pthread_setname_np("FileSource");
-#endif
}
DefaultFileSource::DefaultFileSource(FileCache* cache, const std::string& root)
- : thread(util::make_unique<util::Thread<Impl>>(cache, root)) {
+ : thread(util::make_unique<util::Thread<Impl>>("FileSource", cache, root)) {
}
DefaultFileSource::~DefaultFileSource() {
@@ -62,7 +60,7 @@ Request* DefaultFileSource::request(const Resource& resource,
// This function can be called from any thread. Make sure we're executing the actual call in the
// file source loop by sending it over the queue.
- (*thread)->invoke([=] { (*thread)->processAdd(req); });
+ thread->invoke(&Impl::processAdd, std::move(req), thread->get());
return req;
}
@@ -76,14 +74,14 @@ void DefaultFileSource::cancel(Request *req) {
// This function can be called from any thread. Make sure we're executing the actual call in the
// file source loop by sending it over the queue.
- (*thread)->invoke([=] { (*thread)->processCancel(req); });
+ thread->invoke(&Impl::processCancel, std::move(req));
}
void DefaultFileSource::abort(const Environment &env) {
- (*thread)->invoke([=, &env] { (*thread)->processAbort(env); });
+ thread->invoke(&Impl::processAbort, std::ref(env));
}
-void DefaultFileSource::Impl::processAdd(Request* req) {
+void DefaultFileSource::Impl::processAdd(Request* req, uv_loop_t* loop) {
const Resource &resource = req->resource;
// We're adding a new Request.
@@ -102,13 +100,12 @@ void DefaultFileSource::Impl::processAdd(Request* req) {
// But first, we're going to start querying the database if it exists.
if (!cache) {
- sharedRequest->start(loop().get());
+ sharedRequest->start(loop);
} else {
// Otherwise, first check the cache for existing data so that we can potentially
// revalidate the information without having to redownload everything.
- cache->get(resource, [this, resource](std::unique_ptr<Response> response) {
- std::shared_ptr<const Response> sharedResponse = std::move(response);
- invoke([this, resource, sharedResponse] { processResult(resource, sharedResponse); });
+ cache->get(resource, [this, resource, loop](std::unique_ptr<Response> response) {
+ processResult(resource, std::move(response), loop);
});
}
}
@@ -132,7 +129,7 @@ void DefaultFileSource::Impl::processCancel(Request* req) {
req->destruct();
}
-void DefaultFileSource::Impl::processResult(const Resource& resource, std::shared_ptr<const Response> response) {
+void DefaultFileSource::Impl::processResult(const Resource& resource, std::shared_ptr<const Response> response, uv_loop_t* loop) {
SharedRequestBase *sharedRequest = find(resource);
if (sharedRequest) {
if (response) {
@@ -146,11 +143,11 @@ void DefaultFileSource::Impl::processResult(const Resource& resource, std::share
return;
} else {
// The cached response is stale. Now run the real request.
- sharedRequest->start(loop().get(), response);
+ sharedRequest->start(loop, response);
}
} else {
// There is no response. Now run the real request.
- sharedRequest->start(loop().get());
+ sharedRequest->start(loop);
}
} else {
// There is no request for this URL anymore. Likely, the request was canceled
diff --git a/src/mbgl/storage/default_file_source_impl.hpp b/src/mbgl/storage/default_file_source_impl.hpp
index 82a3b9a1a0..6383d13dce 100644
--- a/src/mbgl/storage/default_file_source_impl.hpp
+++ b/src/mbgl/storage/default_file_source_impl.hpp
@@ -2,7 +2,6 @@
#define MBGL_STORAGE_DEFAULT_DEFAULT_FILE_SOURCE_IMPL
#include <mbgl/storage/default_file_source.hpp>
-#include <mbgl/util/run_loop.hpp>
#include <set>
#include <unordered_map>
@@ -11,26 +10,23 @@ namespace mbgl {
class SharedRequestBase;
-class DefaultFileSource::Impl : public util::RunLoop {
- friend class util::Thread<DefaultFileSource::Impl>;
-
+class DefaultFileSource::Impl {
public:
Impl(FileCache *cache, const std::string &root = "");
-public:
void notify(SharedRequestBase *sharedRequest, const std::set<Request *> &observers,
std::shared_ptr<const Response> response, FileCache::Hint hint);
SharedRequestBase *find(const Resource &resource);
- void processAdd(Request* request);
+ void processAdd(Request* request, uv_loop_t* loop);
void processCancel(Request* request);
- void processResult(const Resource& resource, std::shared_ptr<const Response> response);
void processAbort(const Environment& env);
-public:
const std::string assetRoot;
private:
+ void processResult(const Resource& resource, std::shared_ptr<const Response> response, uv_loop_t* loop);
+
std::unordered_map<Resource, SharedRequestBase *, Resource::Hash> pending;
FileCache *cache = nullptr;
};
diff --git a/src/mbgl/util/run_loop.cpp b/src/mbgl/util/run_loop.cpp
index dba75fcfcf..e945a02326 100644
--- a/src/mbgl/util/run_loop.cpp
+++ b/src/mbgl/util/run_loop.cpp
@@ -1,54 +1,38 @@
#include <mbgl/util/run_loop.hpp>
-#include <mbgl/util/uv_detail.hpp>
-#include <mbgl/util/std.hpp>
-
-#include <uv.h>
-
-namespace /* anonymous */ {
-inline void critical_section(std::mutex& mutex, std::function<void()> fn) {
- std::lock_guard<std::mutex> lock(mutex);
- fn();
-}
-}
namespace mbgl {
namespace util {
-RunLoop::RunLoop()
- : runloop(util::make_unique<uv::loop>()),
- runloopAsync(util::make_unique<uv::async>(runloop->get(), std::bind(&RunLoop::process, this))) {
-}
-
-// Define here since we can't destroy the uv::* objects from just the header file.
-RunLoop::~RunLoop() = default;
+uv::tls<RunLoop> RunLoop::current;
-void RunLoop::start() {
- runloop->run();
+RunLoop::RunLoop()
+ : async(*loop, std::bind(&RunLoop::process, this)) {
}
-void RunLoop::stop() {
- critical_section(runloopMutex, [this] { runloopQueue.push(nullptr); });
- runloopAsync->send();
+void RunLoop::withMutex(std::function<void()>&& fn) {
+ std::lock_guard<std::mutex> lock(mutex);
+ fn();
}
void RunLoop::process() {
- std::queue<std::function<void()>> queue;
- critical_section(runloopMutex, [this, &queue] { queue.swap(runloopQueue); });
- while (!queue.empty()) {
- if (queue.front()) {
- queue.front()();
- } else {
- runloopAsync->unref();
- }
- queue.pop();
+ Queue queue_;
+ withMutex([&] { queue_.swap(queue); });
+
+ while (!queue_.empty()) {
+ (queue_.front())();
+ queue_.pop();
}
}
-void RunLoop::invoke(std::function<void()>&& fn) {
- if (fn) {
- critical_section(runloopMutex, [this, &fn] { runloopQueue.emplace(std::move(fn)); });
- runloopAsync->send();
- }
+void RunLoop::run() {
+ assert(!current.get());
+ current.set(this);
+ loop.run();
+ current.set(nullptr);
+}
+
+void RunLoop::stop() {
+ invoke([&] { async.unref(); });
}
}
diff --git a/src/mbgl/util/run_loop.hpp b/src/mbgl/util/run_loop.hpp
index 9bed417c38..c39fb60186 100644
--- a/src/mbgl/util/run_loop.hpp
+++ b/src/mbgl/util/run_loop.hpp
@@ -1,53 +1,97 @@
#ifndef MBGL_UTIL_RUN_LOOP
#define MBGL_UTIL_RUN_LOOP
-#include <memory>
-#include <mutex>
+#include <mbgl/util/uv_detail.hpp>
+
#include <functional>
#include <queue>
-
-namespace uv {
-class async;
-class loop;
-}
+#include <mutex>
namespace mbgl {
namespace util {
-template <typename T> class Thread;
-
class RunLoop {
- friend Thread<RunLoop>;
-
-protected:
- // These are called by the Thread<> wrapper.
+public:
RunLoop();
- ~RunLoop();
-
- // Called by the Thread<> wrapper to start the loop. When you implement this
- // method in a child class, you *must* call this function as the last action.
- void start();
-protected:
- // Called by the Thread<> wrapper to terminate this loop.
+ void run();
void stop();
- // Obtain the underlying loop object in case you want to attach additional listeners.
- uv::loop& loop() { return *runloop; };
+ // Invoke fn() in the runloop thread.
+ template <class Fn>
+ void invoke(Fn&& fn) {
+ withMutex([&] { queue.push(Message(std::move(fn))); });
+ async.send();
+ }
+
+ // Invoke fn() in the runloop thread, then invoke callback(result) in the current thread.
+ template <class Fn, class R>
+ void invokeWithResult(Fn&& fn, std::function<void (R)> callback) {
+ RunLoop* outer = current.get();
+ assert(outer);
+
+ invoke([fn, callback, outer] {
+ /*
+ With C++14, we could write:
+
+ outer->invoke([callback, result = std::move(fn())] () mutable {
+ callback(std::move(result));
+ });
+
+ Instead we're using a workaround with std::bind
+ to obtain move-capturing semantics with C++11:
+ http://stackoverflow.com/a/12744730/52207
+ */
+ outer->invoke(std::bind([callback] (R& result) {
+ callback(std::move(result));
+ }, std::move(fn())));
+ });
+ }
+
+ uv_loop_t* get() { return *loop; }
private:
- // Invokes function in the run loop.
+ struct Message {
+ struct Base {
+ virtual void operator()() = 0;
+ virtual ~Base() = default;
+ };
+
+ template <class F>
+ struct Invoker : Base {
+ Invoker(F&& f) : func(std::move(f)) {}
+ void operator()() override { func(); }
+ F func;
+ };
+
+ Message() = default;
+ Message(Message&&) = default;
+ ~Message() = default;
+ Message& operator=(Message&&) = default;
+
+ // copy members implicitly deleted
+
+ template <class Fn>
+ Message(Fn fn)
+ : p_fn(new Invoker<Fn>(std::move(fn))) {
+ }
+
+ void operator()() const { (*p_fn)(); }
+ std::unique_ptr<Base> p_fn;
+ };
+
+ using Queue = std::queue<Message>;
+
+ static uv::tls<RunLoop> current;
+
+ void withMutex(std::function<void()>&&);
void process();
-public:
- // Schedules a function to be executed as part of this run loop.
- void invoke(std::function<void()>&& fn);
+ Queue queue;
+ std::mutex mutex;
-private:
- const std::unique_ptr<uv::loop> runloop;
- const std::unique_ptr<uv::async> runloopAsync;
- std::mutex runloopMutex;
- std::queue<std::function<void()>> runloopQueue;
+ uv::loop loop;
+ uv::async async;
};
}
diff --git a/src/mbgl/util/thread.hpp b/src/mbgl/util/thread.hpp
index 7c9ba70e11..4831b9efc2 100644
--- a/src/mbgl/util/thread.hpp
+++ b/src/mbgl/util/thread.hpp
@@ -3,6 +3,9 @@
#include <future>
#include <thread>
+#include <functional>
+
+#include <mbgl/util/run_loop.hpp>
namespace {
@@ -34,49 +37,80 @@ template <class Object>
class Thread {
public:
template <class... Args>
- Thread(Args&&... args);
+ Thread(const std::string& name, Args&&... args);
+ ~Thread();
+
+ // Invoke object->fn(args...) in the runloop thread.
+ template <typename Fn, class... Args>
+ void invoke(Fn fn, Args&&... args) {
+ loop->invoke(std::bind(fn, object, args...));
+ }
+
+ // Invoke object->fn(args...) in the runloop thread, then invoke callback(result) in the current thread.
+ template <typename Fn, class R, class... Args>
+ void invokeWithResult(Fn fn, std::function<void (R)> callback, Args&&... args) {
+ loop->invokeWithResult(std::bind(fn, object, args...), callback);
+ }
+
+ uv_loop_t* get() { return loop->get(); }
+
+private:
Thread(const Thread&) = delete;
Thread(Thread&&) = delete;
Thread& operator=(const Thread&) = delete;
Thread& operator=(Thread&&) = delete;
- ~Thread();
-
- inline Object* operator->() const { return &object; }
- inline operator Object*() const { return &object; }
-private:
template <typename P, std::size_t... I>
- void run(std::promise<Object&>& promise, P&& params, index_sequence<I...>) {
- Object context(std::get<I>(std::forward<P>(params))...);
- promise.set_value(context);
- context.start();
- joinable.get_future().get();
- }
+ void run(P&& params, index_sequence<I...>);
-private:
- std::thread thread;
+ std::promise<void> running;
std::promise<void> joinable;
- Object& object;
+
+ std::thread thread;
+
+ Object* object;
+ RunLoop* loop;
};
template <class Object>
template <class... Args>
-Thread<Object>::Thread(Args&&... args)
- : object([&](std::tuple<Args...>&& params) -> Object& {
- // Note: We're using std::tuple<> to store the arguments because GCC 4.9 has a bug
- // when expanding parameters packs captured in lambdas.
- std::promise<Object&> promise;
- constexpr auto seq = typename integer_sequence<sizeof...(Args)>::type();
- thread = std::thread([&] {
- run(promise, std::move(params), seq);
- });
- return promise.get_future().get();
- }(std::move(std::forward_as_tuple(::std::forward<Args>(args)...)))) {
+Thread<Object>::Thread(const std::string& name, Args&&... args) {
+ // Note: We're using std::tuple<> to store the arguments because GCC 4.9 has a bug
+ // when expanding parameters packs captured in lambdas.
+ std::tuple<Args...> params = std::forward_as_tuple(::std::forward<Args>(args)...);
+
+ thread = std::thread([&] {
+ #ifdef __APPLE__
+ pthread_setname_np(name.c_str());
+ #else
+ (void(name));
+ #endif
+
+ constexpr auto seq = typename integer_sequence<sizeof...(Args)>::type();
+ run(std::move(params), seq);
+ });
+
+ running.get_future().get();
+}
+
+template <class Object>
+template <typename P, std::size_t... I>
+void Thread<Object>::run(P&& params, index_sequence<I...>) {
+ Object object_(std::get<I>(std::forward<P>(params))...);
+ object = &object_;
+
+ RunLoop loop_;
+ loop = &loop_;
+
+ running.set_value();
+ loop_.run();
+
+ joinable.get_future().get();
}
template <class Object>
Thread<Object>::~Thread() {
- object.stop();
+ loop->stop();
joinable.set_value();
thread.join();
}
diff --git a/src/mbgl/util/uv.cpp b/src/mbgl/util/uv.cpp
index d465dfd963..5dae34ebd0 100644
--- a/src/mbgl/util/uv.cpp
+++ b/src/mbgl/util/uv.cpp
@@ -3,6 +3,28 @@
#include <uv.h>
+#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10
+
+int uv_key_create(uv_key_t* key) {
+ return -pthread_key_create(key, NULL);
+}
+
+void uv_key_delete(uv_key_t* key) {
+ if (pthread_key_delete(*key))
+ abort();
+}
+
+void* uv_key_get(uv_key_t* key) {
+ return pthread_getspecific(*key);
+}
+
+void uv_key_set(uv_key_t* key, void* value) {
+ if (pthread_setspecific(*key, value))
+ abort();
+}
+
+#endif
+
namespace uv {
std::string cwd() {
diff --git a/src/mbgl/util/uv_detail.hpp b/src/mbgl/util/uv_detail.hpp
index 6acef1b386..96d5442462 100644
--- a/src/mbgl/util/uv_detail.hpp
+++ b/src/mbgl/util/uv_detail.hpp
@@ -11,6 +11,21 @@
#include <memory>
#include <string>
+#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10
+
+// Add thread local storage to libuv API:
+// https://github.com/joyent/libuv/commit/5d2434bf71e47802841bad218d521fa254d1ca2d
+
+typedef pthread_key_t uv_key_t;
+
+UV_EXTERN int uv_key_create(uv_key_t* key);
+UV_EXTERN void uv_key_delete(uv_key_t* key);
+UV_EXTERN void* uv_key_get(uv_key_t* key);
+UV_EXTERN void uv_key_set(uv_key_t* key, void* value);
+
+#endif
+
+
namespace uv {
template <class T>
@@ -133,6 +148,22 @@ private:
uv_rwlock_t mtx;
};
+template <class T>
+class tls : public mbgl::util::noncopyable {
+public:
+ inline tls() {
+ if (uv_key_create(&key) != 0) {
+ throw std::runtime_error("failed to initialize thread local storage key");
+ }
+ }
+ inline ~tls() { uv_key_delete(&key); }
+ inline T* get() { return reinterpret_cast<T*>(uv_key_get(&key)); }
+ inline void set(T* val) { uv_key_set(&key, val); }
+
+private:
+ uv_key_t key;
+};
+
}
#endif