diff options
-rw-r--r-- | include/mbgl/util/run_loop.hpp | 14 | ||||
-rw-r--r-- | platform/default/http_request_curl.cpp | 87 | ||||
-rw-r--r-- | platform/default/run_loop.cpp | 106 |
3 files changed, 139 insertions, 68 deletions
diff --git a/include/mbgl/util/run_loop.hpp b/include/mbgl/util/run_loop.hpp index 122f90c3d7..85b8b8d665 100644 --- a/include/mbgl/util/run_loop.hpp +++ b/include/mbgl/util/run_loop.hpp @@ -2,6 +2,7 @@ #define MBGL_UTIL_RUN_LOOP #include <mbgl/util/noncopyable.hpp> +#include <mbgl/util/util.hpp> #include <mbgl/util/work_task.hpp> #include <mbgl/util/work_request.hpp> @@ -23,6 +24,13 @@ public: New, }; + enum class Event : uint8_t { + None = 0, + Read = 1, + Write = 2, + ReadWrite = Read | Write, + }; + RunLoop(Type type = Type::Default); ~RunLoop(); @@ -33,6 +41,10 @@ public: void runOnce(); void stop(); + // So far only needed by the libcurl backend. + void addWatch(int fd, Event, std::function<void(int, Event)>&& callback); + void removeWatch(int fd); + // Invoke fn(args...) on this RunLoop. template <class Fn, class... Args> void invoke(Fn&& fn, Args&&... args) { @@ -96,6 +108,8 @@ public: } private: + MBGL_STORE_THREAD(tid) + template <class F, class P> class Invoker : public WorkTask { public: diff --git a/platform/default/http_request_curl.cpp b/platform/default/http_request_curl.cpp index 18d6d9d62a..065b90501b 100644 --- a/platform/default/http_request_curl.cpp +++ b/platform/default/http_request_curl.cpp @@ -9,7 +9,6 @@ #include <mbgl/util/run_loop.hpp> #include <mbgl/util/string.hpp> #include <mbgl/util/timer.hpp> -#include <mbgl/util/uv.hpp> #include <curl/curl.h> @@ -25,12 +24,6 @@ #include <cstring> #include <cstdio> -#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 -#define UV_TIMER_PARAMS(timer) uv_timer_t *timer, int -#else -#define UV_TIMER_PARAMS(timer) uv_timer_t *timer -#endif - void handleError(CURLMcode code) { if (code != CURLM_OK) { throw std::runtime_error(std::string("CURL multi error: ") + curl_multi_strerror(code)); @@ -59,10 +52,10 @@ public: std::shared_ptr<const Response>) final; static int handleSocket(CURL *handle, curl_socket_t s, int action, void *userp, void *socketp); - static void perform(uv_poll_t *req, int status, int events); static int startTimeout(CURLM *multi, long timeout_ms, void *userp); static void onTimeout(HTTPCURLContext *context); + void perform(curl_socket_t s, util::RunLoop::Event event); CURL *getHandle(); void returnHandle(CURL *handle); void checkMultiInfo(); @@ -115,42 +108,6 @@ private: char error[CURL_ERROR_SIZE]; }; - - -struct Socket { -private: - uv_poll_t poll; - -public: - HTTPCURLContext *context = nullptr; - const curl_socket_t sockfd = 0; - -public: - Socket(HTTPCURLContext *context_, curl_socket_t sockfd_) : context(context_), sockfd(sockfd_) { - assert(context); - uv_poll_init_socket(reinterpret_cast<uv_loop_t*>(util::RunLoop::getLoopHandle()), &poll, sockfd); - poll.data = this; - } - - void start(int events, uv_poll_cb cb) { - uv_poll_start(&poll, events, cb); - } - - void stop() { - assert(poll.data); - uv_poll_stop(&poll); - uv_close((uv_handle_t *)&poll, [](uv_handle_t *handle) { - assert(handle->data); - delete reinterpret_cast<Socket *>(handle->data); - }); - } - -private: - // Make the destructor private to ensure that we can only close the Socket - // with stop(), and disallow manual deletion. - ~Socket() = default; -}; - // ------------------------------------------------------------------------------------------------- HTTPCURLContext::HTTPCURLContext() { @@ -224,51 +181,45 @@ void HTTPCURLContext::checkMultiInfo() { } } -void HTTPCURLContext::perform(uv_poll_t *req, int /* status */, int events) { - assert(req->data); - auto socket = reinterpret_cast<Socket *>(req->data); - auto context = socket->context; - MBGL_VERIFY_THREAD(context->tid); +void HTTPCURLContext::perform(curl_socket_t s, util::RunLoop::Event events) { + MBGL_VERIFY_THREAD(tid); int flags = 0; - if (events & UV_READABLE) { + if (events == util::RunLoop::Event::Read) { flags |= CURL_CSELECT_IN; } - if (events & UV_WRITABLE) { + if (events == util::RunLoop::Event::Write) { flags |= CURL_CSELECT_OUT; } int running_handles = 0; - curl_multi_socket_action(context->multi, socket->sockfd, flags, &running_handles); - context->checkMultiInfo(); + curl_multi_socket_action(multi, s, flags, &running_handles); + checkMultiInfo(); } int HTTPCURLContext::handleSocket(CURL * /* handle */, curl_socket_t s, int action, void *userp, - void *socketp) { - auto socket = reinterpret_cast<Socket *>(socketp); + void * /* socketp */) { assert(userp); auto context = reinterpret_cast<HTTPCURLContext *>(userp); MBGL_VERIFY_THREAD(context->tid); - if (!socket && action != CURL_POLL_REMOVE) { - socket = new Socket(context, s); - curl_multi_assign(context->multi, s, (void *)socket); - } - switch (action) { - case CURL_POLL_IN: - socket->start(UV_READABLE, perform); + case CURL_POLL_IN: { + using namespace std::placeholders; + util::RunLoop::Get()->addWatch(s, util::RunLoop::Event::Read, + std::bind(&HTTPCURLContext::perform, context, _1, _2)); break; - case CURL_POLL_OUT: - socket->start(UV_WRITABLE, perform); + } + case CURL_POLL_OUT: { + using namespace std::placeholders; + util::RunLoop::Get()->addWatch(s, util::RunLoop::Event::Write, + std::bind(&HTTPCURLContext::perform, context, _1, _2)); break; + } case CURL_POLL_REMOVE: - if (socket) { - socket->stop(); - curl_multi_assign(context->multi, s, nullptr); - } + util::RunLoop::Get()->removeWatch(s); break; default: throw std::runtime_error("Unhandled CURL socket action"); diff --git a/platform/default/run_loop.cpp b/platform/default/run_loop.cpp index 845ee1b2ab..9ca7cc9207 100644 --- a/platform/default/run_loop.cpp +++ b/platform/default/run_loop.cpp @@ -3,6 +3,9 @@ #include <mbgl/util/uv.hpp> #include <mbgl/util/thread_local.hpp> +#include <functional> +#include <unordered_map> + namespace { using namespace mbgl::util; @@ -13,6 +16,38 @@ static ThreadLocal<RunLoop>& current = *new ThreadLocal<RunLoop>; namespace mbgl { namespace util { +struct Watch { + static void onEvent(uv_poll_t* poll, int, int event) { + auto watch = reinterpret_cast<Watch*>(poll->data); + + RunLoop::Event watchEvent = RunLoop::Event::None; + switch (event) { + case UV_READABLE: + watchEvent = RunLoop::Event::Read; + break; + case UV_WRITABLE: + watchEvent = RunLoop::Event::Write; + break; + case UV_READABLE | UV_WRITABLE: + watchEvent = RunLoop::Event::ReadWrite; + break; + } + + watch->eventCallback(watch->fd, watchEvent); + }; + + static void onClose(uv_handle_t *poll) { + auto watch = reinterpret_cast<Watch*>(poll->data); + watch->closeCallback(); + }; + + uv_poll_t poll; + int fd; + + std::function<void(int, RunLoop::Event)> eventCallback; + std::function<void()> closeCallback; +}; + RunLoop* RunLoop::Get() { return current.get(); } @@ -24,6 +59,8 @@ public: uv_loop_t *loop; RunLoop::Type type; std::unique_ptr<AsyncTask> async; + + std::unordered_map<int, std::unique_ptr<Watch>> watchPoll; }; RunLoop::RunLoop(Type type) : impl(std::make_unique<Impl>()) { @@ -84,10 +121,14 @@ void RunLoop::push(std::shared_ptr<WorkTask> task) { } void RunLoop::run() { + MBGL_VERIFY_THREAD(tid); + uv_run(impl->loop, UV_RUN_DEFAULT); } void RunLoop::runOnce() { + MBGL_VERIFY_THREAD(tid); + uv_run(impl->loop, UV_RUN_ONCE); } @@ -95,5 +136,70 @@ void RunLoop::stop() { invoke([&] { impl->async->unref(); }); } +void RunLoop::addWatch(int fd, Event event, std::function<void(int, Event)>&& callback) { + MBGL_VERIFY_THREAD(tid); + + Watch *watch = nullptr; + auto watchPollIter = impl->watchPoll.find(fd); + + if (watchPollIter == impl->watchPoll.end()) { + std::unique_ptr<Watch> watchPtr = std::make_unique<Watch>(); + + watch = watchPtr.get(); + impl->watchPoll[fd] = std::move(watchPtr); + + if (uv_poll_init(impl->loop, &watch->poll, fd)) { + throw std::runtime_error("Failed to init poll on file descriptor."); + } + } else { + watch = watchPollIter->second.get(); + } + + watch->poll.data = watch; + watch->fd = fd; + watch->eventCallback = std::move(callback); + + int pollEvent = 0; + switch (event) { + case Event::Read: + pollEvent = UV_READABLE; + break; + case Event::Write: + pollEvent = UV_WRITABLE; + break; + case Event::ReadWrite: + pollEvent = UV_READABLE | UV_WRITABLE; + break; + default: + throw std::runtime_error("Unhandled event."); + } + + if (uv_poll_start(&watch->poll, pollEvent, &Watch::onEvent)) { + throw std::runtime_error("Failed to start poll on file descriptor."); + } +} + +void RunLoop::removeWatch(int fd) { + MBGL_VERIFY_THREAD(tid); + + auto watchPollIter = impl->watchPoll.find(fd); + if (watchPollIter == impl->watchPoll.end()) { + return; + } + + Watch* watch = watchPollIter->second.release(); + impl->watchPoll.erase(watchPollIter); + + watch->closeCallback = [watch] { + delete watch; + }; + + if (uv_poll_stop(&watch->poll)) { + throw std::runtime_error("Failed to stop poll on file descriptor."); + } + + uv_close(reinterpret_cast<uv_handle_t*>(&watch->poll), &Watch::onClose); +} + } } |