From 36581f3d3015d525db92248004e9dc7477705694 Mon Sep 17 00:00:00 2001 From: "Thiago Marcos P. Santos" Date: Mon, 31 Aug 2015 14:40:09 +0300 Subject: [core] Do not pass uv_loop_t around This should be abstracted by util::RunLoop --- include/mbgl/util/run_loop.hpp | 159 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 include/mbgl/util/run_loop.hpp (limited to 'include/mbgl/util/run_loop.hpp') diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp new file mode 100644 index 0000000000..6113ac2215 --- /dev/null +++ b/include/mbgl/util/run_loop.hpp @@ -0,0 +1,159 @@ +#ifndef MBGL_UTIL_RUN_LOOP +#define MBGL_UTIL_RUN_LOOP + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace mbgl { +namespace util { + +class RunLoop : private util::noncopyable { +public: + RunLoop(uv_loop_t*); + ~RunLoop(); + + static RunLoop* Get() { + return current.get(); + } + + static uv_loop_t* getLoop() { + return current.get()->get(); + } + + void stop(); + + // Invoke fn(args...) on this RunLoop. + template + void invoke(Fn&& fn, Args&&... args) { + auto tuple = std::make_tuple(std::move(args)...); + auto task = std::make_shared>( + std::move(fn), + std::move(tuple)); + + withMutex([&] { queue.push(task); }); + async.send(); + } + + // Post the cancellable work fn(args...) to this RunLoop. + template + std::unique_ptr + invokeCancellable(Fn&& fn, Args&&... args) { + auto flag = std::make_shared>(); + *flag = false; + + auto tuple = std::make_tuple(std::move(args)...); + auto task = std::make_shared>( + std::move(fn), + std::move(tuple), + flag); + + withMutex([&] { queue.push(task); }); + async.send(); + + return std::make_unique(task); + } + + // Invoke fn(args...) on this RunLoop, then invoke callback(results...) on the current RunLoop. + template + std::unique_ptr + invokeWithCallback(Fn&& fn, Cb&& callback, Args&&... args) { + auto flag = std::make_shared>(); + *flag = false; + + // Create a lambda L1 that invokes another lambda L2 on the current RunLoop R, that calls + // the callback C. Both lambdas check the flag before proceeding. L1 needs to check the flag + // because if the request was cancelled, then R might have been destroyed. L2 needs to check + // the flag because the request may have been cancelled after L2 was invoked but before it + // began executing. + auto after = [flag, current = RunLoop::current.get(), callback1 = std::move(callback)] (auto&&... results1) { + if (!*flag) { + current->invoke([flag, callback2 = std::move(callback1)] (auto&&... results2) { + if (!*flag) { + callback2(std::move(results2)...); + } + }, std::move(results1)...); + } + }; + + auto tuple = std::make_tuple(std::move(args)..., after); + auto task = std::make_shared>( + std::move(fn), + std::move(tuple), + flag); + + withMutex([&] { queue.push(task); }); + async.send(); + + return std::make_unique(task); + } + + uv_loop_t* get() { return async.get()->loop; } + +private: + template + class Invoker : public WorkTask { + public: + Invoker(F&& f, P&& p, std::shared_ptr> canceled_ = nullptr) + : canceled(canceled_), + func(std::move(f)), + params(std::move(p)) { + } + + void operator()() override { + // Lock the mutex while processing so that cancel() will block. + std::lock_guard lock(mutex); + if (!canceled || !*canceled) { + invoke(std::make_index_sequence::value>{}); + } + } + + // If the task has not yet begun, this will cancel it. + // If the task is in progress, this will block until it completed. (Currently + // necessary because of shared state, but should be removed.) It will also + // cancel the after callback. + // If the task has completed, but the after callback has not executed, this + // will cancel the after callback. + // If the task has completed and the after callback has executed, this will + // do nothing. + void cancel() override { + std::lock_guard lock(mutex); + *canceled = true; + } + + private: + template + void invoke(std::index_sequence) { + func(std::get(std::forward

(params))...); + } + + std::recursive_mutex mutex; + std::shared_ptr> canceled; + + F func; + P params; + }; + + using Queue = std::queue>; + + void withMutex(std::function&&); + void process(); + + Queue queue; + std::mutex mutex; + uv::async async; + + static uv::tls current; +}; + +} +} + +#endif -- cgit v1.2.1