diff options
Diffstat (limited to 'include/mbgl')
-rw-r--r-- | include/mbgl/storage/default_file_source.hpp | 2 | ||||
-rw-r--r-- | include/mbgl/storage/file_source.hpp | 4 | ||||
-rw-r--r-- | include/mbgl/util/run_loop.hpp | 159 | ||||
-rw-r--r-- | include/mbgl/util/uv_detail.hpp | 220 | ||||
-rw-r--r-- | include/mbgl/util/work_request.hpp | 24 | ||||
-rw-r--r-- | include/mbgl/util/work_task.hpp | 21 |
6 files changed, 426 insertions, 4 deletions
diff --git a/include/mbgl/storage/default_file_source.hpp b/include/mbgl/storage/default_file_source.hpp index 8cfae03a96..5d018b720e 100644 --- a/include/mbgl/storage/default_file_source.hpp +++ b/include/mbgl/storage/default_file_source.hpp @@ -19,7 +19,7 @@ public: std::string getAccessToken() const { return accessToken; } // FileSource API - Request* request(const Resource&, uv_loop_t*, Callback) override; + Request* request(const Resource&, Callback) override; void cancel(Request*) override; public: diff --git a/include/mbgl/storage/file_source.hpp b/include/mbgl/storage/file_source.hpp index 3b19e00788..a53bf31c2c 100644 --- a/include/mbgl/storage/file_source.hpp +++ b/include/mbgl/storage/file_source.hpp @@ -9,8 +9,6 @@ #include <functional> -typedef struct uv_loop_s uv_loop_t; - namespace mbgl { class Request; @@ -26,7 +24,7 @@ public: // These can be called from any thread. The callback will be invoked in the loop. // You can only cancel a request from the same thread it was created in. - virtual Request* request(const Resource&, uv_loop_t*, Callback) = 0; + virtual Request* request(const Resource&, Callback) = 0; virtual void cancel(Request*) = 0; }; diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp new file mode 100644 index 0000000000..6113ac2215 --- /dev/null +++ b/include/mbgl/util/run_loop.hpp @@ -0,0 +1,159 @@ +#ifndef MBGL_UTIL_RUN_LOOP +#define MBGL_UTIL_RUN_LOOP + +#include <mbgl/util/noncopyable.hpp> +#include <mbgl/util/work_task.hpp> +#include <mbgl/util/work_request.hpp> +#include <mbgl/util/uv_detail.hpp> + +#include <functional> +#include <utility> +#include <queue> +#include <mutex> +#include <atomic> + +namespace mbgl { +namespace util { + +class RunLoop : private util::noncopyable { +public: + RunLoop(uv_loop_t*); + ~RunLoop(); + + static RunLoop* Get() { + return current.get(); + } + + static uv_loop_t* getLoop() { + return current.get()->get(); + } + + void stop(); + + // Invoke fn(args...) on this RunLoop. + template <class Fn, class... Args> + void invoke(Fn&& fn, Args&&... args) { + auto tuple = std::make_tuple(std::move(args)...); + auto task = std::make_shared<Invoker<Fn, decltype(tuple)>>( + std::move(fn), + std::move(tuple)); + + withMutex([&] { queue.push(task); }); + async.send(); + } + + // Post the cancellable work fn(args...) to this RunLoop. + template <class Fn, class... Args> + std::unique_ptr<WorkRequest> + invokeCancellable(Fn&& fn, Args&&... args) { + auto flag = std::make_shared<std::atomic<bool>>(); + *flag = false; + + auto tuple = std::make_tuple(std::move(args)...); + auto task = std::make_shared<Invoker<Fn, decltype(tuple)>>( + std::move(fn), + std::move(tuple), + flag); + + withMutex([&] { queue.push(task); }); + async.send(); + + return std::make_unique<WorkRequest>(task); + } + + // Invoke fn(args...) on this RunLoop, then invoke callback(results...) on the current RunLoop. + template <class Fn, class Cb, class... Args> + std::unique_ptr<WorkRequest> + invokeWithCallback(Fn&& fn, Cb&& callback, Args&&... args) { + auto flag = std::make_shared<std::atomic<bool>>(); + *flag = false; + + // Create a lambda L1 that invokes another lambda L2 on the current RunLoop R, that calls + // the callback C. Both lambdas check the flag before proceeding. L1 needs to check the flag + // because if the request was cancelled, then R might have been destroyed. L2 needs to check + // the flag because the request may have been cancelled after L2 was invoked but before it + // began executing. + auto after = [flag, current = RunLoop::current.get(), callback1 = std::move(callback)] (auto&&... results1) { + if (!*flag) { + current->invoke([flag, callback2 = std::move(callback1)] (auto&&... results2) { + if (!*flag) { + callback2(std::move(results2)...); + } + }, std::move(results1)...); + } + }; + + auto tuple = std::make_tuple(std::move(args)..., after); + auto task = std::make_shared<Invoker<Fn, decltype(tuple)>>( + std::move(fn), + std::move(tuple), + flag); + + withMutex([&] { queue.push(task); }); + async.send(); + + return std::make_unique<WorkRequest>(task); + } + + uv_loop_t* get() { return async.get()->loop; } + +private: + template <class F, class P> + class Invoker : public WorkTask { + public: + Invoker(F&& f, P&& p, std::shared_ptr<std::atomic<bool>> canceled_ = nullptr) + : canceled(canceled_), + func(std::move(f)), + params(std::move(p)) { + } + + void operator()() override { + // Lock the mutex while processing so that cancel() will block. + std::lock_guard<std::recursive_mutex> lock(mutex); + if (!canceled || !*canceled) { + invoke(std::make_index_sequence<std::tuple_size<P>::value>{}); + } + } + + // If the task has not yet begun, this will cancel it. + // If the task is in progress, this will block until it completed. (Currently + // necessary because of shared state, but should be removed.) It will also + // cancel the after callback. + // If the task has completed, but the after callback has not executed, this + // will cancel the after callback. + // If the task has completed and the after callback has executed, this will + // do nothing. + void cancel() override { + std::lock_guard<std::recursive_mutex> lock(mutex); + *canceled = true; + } + + private: + template <std::size_t... I> + void invoke(std::index_sequence<I...>) { + func(std::get<I>(std::forward<P>(params))...); + } + + std::recursive_mutex mutex; + std::shared_ptr<std::atomic<bool>> canceled; + + F func; + P params; + }; + + using Queue = std::queue<std::shared_ptr<WorkTask>>; + + void withMutex(std::function<void()>&&); + void process(); + + Queue queue; + std::mutex mutex; + uv::async async; + + static uv::tls<RunLoop> current; +}; + +} +} + +#endif diff --git a/include/mbgl/util/uv_detail.hpp b/include/mbgl/util/uv_detail.hpp new file mode 100644 index 0000000000..86a64d33f2 --- /dev/null +++ b/include/mbgl/util/uv_detail.hpp @@ -0,0 +1,220 @@ +#ifndef MBGL_UTIL_UV_DETAIL +#define MBGL_UTIL_UV_DETAIL + +#include <mbgl/util/uv.hpp> +#include <mbgl/util/noncopyable.hpp> + +#include <uv.h> + +// XXX: uv.h will include <bits/termios.h> that will +// polute the namespace by defining "B0" which +// will conflict with boost macros. +#ifdef B0 +#undef B0 +#endif + +#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 +#define UV_ASYNC_PARAMS(handle) uv_async_t *handle, int +#define UV_TIMER_PARAMS(timer) uv_timer_t *timer, int +#else +#define UV_ASYNC_PARAMS(handle) uv_async_t *handle +#define UV_TIMER_PARAMS(timer) uv_timer_t *timer +#endif + +#include <functional> +#include <cassert> +#include <memory> +#include <string> + +#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 + +// Add thread local storage to libuv API: +// https://github.com/joyent/libuv/commit/5d2434bf71e47802841bad218d521fa254d1ca2d + +typedef pthread_key_t uv_key_t; + +UV_EXTERN int uv_key_create(uv_key_t* key); +UV_EXTERN void uv_key_delete(uv_key_t* key); +UV_EXTERN void* uv_key_get(uv_key_t* key); +UV_EXTERN void uv_key_set(uv_key_t* key, void* value); + +#endif + + +namespace uv { + +class loop : public mbgl::util::noncopyable { +public: + inline loop() { +#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 + l = uv_loop_new(); + if (l == nullptr) { +#else + l = new uv_loop_t; + if (uv_loop_init(l) != 0) { +#endif + throw std::runtime_error("failed to initialize loop"); + } + } + + inline ~loop() { +#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 + uv_loop_delete(l); +#else + uv_loop_close(l); + delete l; +#endif + } + + inline void run() { + uv_run(l, UV_RUN_DEFAULT); + } + + inline uv_loop_t* operator*() { + return l; + } + + inline uv_loop_t* get() { + return l; + } + +private: + uv_loop_t *l = nullptr; +}; + +template <class T> +class handle : public mbgl::util::noncopyable { +public: + inline handle() : t(reinterpret_cast<uv_handle_t*>(new T)) { + t->data = this; + } + + inline ~handle() { + uv_close(t.release(), [](uv_handle_t* h) { + delete reinterpret_cast<T*>(h); + }); + } + + inline void ref() { + uv_ref(t.get()); + } + + inline void unref() { + uv_unref(t.get()); + } + + inline T* get() { + return reinterpret_cast<T*>(t.get()); + } + +private: + std::unique_ptr<uv_handle_t> t; +}; + +class async : public handle<uv_async_t> { +public: + inline async(uv_loop_t* loop, std::function<void ()> fn_) + : fn(fn_) { + if (uv_async_init(loop, get(), async_cb) != 0) { + throw std::runtime_error("failed to initialize async"); + } + } + + inline void send() { + if (uv_async_send(get()) != 0) { + throw std::runtime_error("failed to async send"); + } + } + +private: + static void async_cb(UV_ASYNC_PARAMS(handle)) { + reinterpret_cast<async*>(handle->data)->fn(); + } + + std::function<void ()> fn; +}; + +class timer : public handle<uv_timer_t> { +public: + inline timer(uv_loop_t* loop) { + if (uv_timer_init(loop, get()) != 0) { + throw std::runtime_error("failed to initialize timer"); + } + } + + inline void start(uint64_t timeout, uint64_t repeat, std::function<void ()> fn_) { + fn = fn_; + if (uv_timer_start(get(), timer_cb, timeout, repeat) != 0) { + throw std::runtime_error("failed to start timer"); + } + } + + inline void stop() { + fn = nullptr; + if (uv_timer_stop(get()) != 0) { + throw std::runtime_error("failed to stop timer"); + } + } + +private: + static void timer_cb(UV_TIMER_PARAMS(t)) { + reinterpret_cast<timer*>(t->data)->fn(); + } + + std::function<void ()> fn; +}; + +class mutex : public mbgl::util::noncopyable { +public: + inline mutex() { + if (uv_mutex_init(&mtx) != 0) { + throw std::runtime_error("failed to initialize mutex lock"); + } + } + inline ~mutex() { uv_mutex_destroy(&mtx); } + inline void lock() { uv_mutex_lock(&mtx); } + inline void unlock() { uv_mutex_unlock(&mtx); } +private: + uv_mutex_t mtx; +}; + +class rwlock : public mbgl::util::noncopyable { +public: + inline rwlock() { + if (uv_rwlock_init(&mtx) != 0) { + throw std::runtime_error("failed to initialize read-write lock"); + } + } + inline ~rwlock() { uv_rwlock_destroy(&mtx); } + inline void rdlock() { uv_rwlock_rdlock(&mtx); } + inline void wrlock() { uv_rwlock_wrlock(&mtx); } + inline void rdunlock() { uv_rwlock_rdunlock(&mtx); } + inline void wrunlock() { uv_rwlock_wrunlock(&mtx); } + +private: + uv_rwlock_t mtx; +}; + +template <class T> +class tls : public mbgl::util::noncopyable { +public: + inline tls(T* val) { + tls(); + set(val); + } + inline tls() { + if (uv_key_create(&key) != 0) { + throw std::runtime_error("failed to initialize thread local storage key"); + } + } + inline ~tls() { uv_key_delete(&key); } + inline T* get() { return reinterpret_cast<T*>(uv_key_get(&key)); } + inline void set(T* val) { uv_key_set(&key, val); } + +private: + uv_key_t key; +}; + +} + +#endif diff --git a/include/mbgl/util/work_request.hpp b/include/mbgl/util/work_request.hpp new file mode 100644 index 0000000000..f2aa2bbacc --- /dev/null +++ b/include/mbgl/util/work_request.hpp @@ -0,0 +1,24 @@ +#ifndef MBGL_UTIL_WORK_REQUEST +#define MBGL_UTIL_WORK_REQUEST + +#include <mbgl/util/noncopyable.hpp> + +#include <memory> + +namespace mbgl { + +class WorkTask; + +class WorkRequest : public util::noncopyable { +public: + using Task = std::shared_ptr<WorkTask>; + WorkRequest(Task); + ~WorkRequest(); + +private: + std::shared_ptr<WorkTask> task; +}; + +} + +#endif diff --git a/include/mbgl/util/work_task.hpp b/include/mbgl/util/work_task.hpp new file mode 100644 index 0000000000..2224b211c4 --- /dev/null +++ b/include/mbgl/util/work_task.hpp @@ -0,0 +1,21 @@ +#ifndef MBGL_UTIL_WORK_TASK +#define MBGL_UTIL_WORK_TASK + +#include <mbgl/util/noncopyable.hpp> + +namespace mbgl { + +// A movable type-erasing function wrapper. This allows to store arbitrary invokable +// things (like std::function<>, or the result of a movable-only std::bind()) in the queue. +// Source: http://stackoverflow.com/a/29642072/331379 +class WorkTask : private util::noncopyable { +public: + virtual ~WorkTask() = default; + + virtual void operator()() = 0; + virtual void cancel() = 0; +}; + +} + +#endif |