diff options
author | Konstantin Käfer <mail@kkaefer.com> | 2014-03-02 11:26:54 +0100 |
---|---|---|
committer | Konstantin Käfer <mail@kkaefer.com> | 2014-03-02 11:26:54 +0100 |
commit | e134e6a8f3c4fdfef81d1a4f4e74b04853ffa4db (patch) | |
tree | 06661ec39b06ca90835ac571669a91c8070aafa7 /linux | |
parent | 7fdff4176b842004872db6d94cde11562abff2e0 (diff) | |
download | qtlocation-mapboxgl-e134e6a8f3c4fdfef81d1a4f4e74b04853ffa4db.tar.gz |
add threadpooled requests
Diffstat (limited to 'linux')
-rw-r--r-- | linux/llmr-app.gyp | 4 | ||||
-rw-r--r-- | linux/main.cpp | 102 | ||||
-rw-r--r-- | linux/request.cpp | 166 | ||||
-rw-r--r-- | linux/request.hpp | 33 |
4 files changed, 209 insertions, 96 deletions
diff --git a/linux/llmr-app.gyp b/linux/llmr-app.gyp index 79c1d4560d..6c5002d0eb 100644 --- a/linux/llmr-app.gyp +++ b/linux/llmr-app.gyp @@ -11,7 +11,9 @@ "sources": [ "./main.cpp", "./settings.cpp", - "./settings.hpp" + "./settings.hpp", + "./request.cpp", + "./request.hpp" ], 'conditions': [ ['OS == "mac"', { diff --git a/linux/main.cpp b/linux/main.cpp index a7830c6f2d..f8da4fb955 100644 --- a/linux/main.cpp +++ b/linux/main.cpp @@ -1,44 +1,14 @@ #include <llmr/llmr.hpp> #include <GLFW/glfw3.h> -#include <curl/curl.h> #include <llmr/platform/platform.hpp> -#include "settings.hpp" #include <future> #include <list> -#include <forward_list> -#include <chrono> -#include <atomic> - -#include <pthread.h> - -namespace llmr { -namespace platform { - -struct Request { - pthread_t thread; - std::string url; - std::atomic<bool> done; - std::atomic<bool> cancel; - std::function<void(Response&)> background_function; - std::function<void()> foreground_callback; -}; -} -} +#include "settings.hpp" +#include "request.hpp" -pthread_mutex_t curl_share_mutex = PTHREAD_MUTEX_INITIALIZER; -CURLSH *curl_share; std::forward_list<llmr::platform::Request *> requests; - -void curl_share_lock(CURL *, curl_lock_data, curl_lock_access, void *) { - pthread_mutex_lock(&curl_share_mutex); -} - -void curl_share_unlock(CURL *, curl_lock_data, void *) { - pthread_mutex_unlock(&curl_share_mutex); -} - class MapView { public: MapView() : @@ -187,7 +157,6 @@ public: requests.remove_if([&dirty](llmr::platform::Request * req) { if (req->done) { req->foreground_callback(); - pthread_join(req->thread, nullptr); delete req; dirty = true; return true; @@ -264,70 +233,17 @@ void restart() { } } -static size_t curl_write_callback(void *contents, size_t size, size_t nmemb, void *userp) { - ((std::string *)userp)->append((char *)contents, size * nmemb); - return size * nmemb; -} - - -static int curl_progress_callback(void *ptr, double dltotal, double dlnow, double ultotal, double ulnow) { - Request *req = static_cast<Request *>(ptr); - return req->cancel; -} - -void *request_http(void *ptr) { - Request *req = static_cast<Request *>(ptr); - Response res; - - // TODO: use curl multi to be able to cancel - - CURL *curl; - CURLcode code; - curl = curl_easy_init(); - - if (curl) { - curl_easy_setopt(curl, CURLOPT_URL, req->url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_callback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res.body); - curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, curl_progress_callback); - curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, req); - curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); - curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "deflate"); - curl_easy_setopt(curl, CURLOPT_SHARE, curl_share); - code = curl_easy_perform(curl); - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &res.code); - curl_easy_cleanup(curl); - - if (code != CURLE_ABORTED_BY_CALLBACK) { - req->background_function(res); - } - } - - req->done = true; - - pthread_exit(nullptr); -} - Request *request_http(std::string url, std::function<void(Response&)> background_function, std::function<void()> foreground_callback) { - Request *req = new Request(); - req->url = url; - req->background_function = background_function; - req->foreground_callback = foreground_callback; + Request *req = new Request(url, background_function, foreground_callback); requests.push_front(req); - int rc = pthread_create(&req->thread, nullptr, request_http, (void *)req); - if (rc) { - fprintf(stderr, "http request failed\n"); - return nullptr; - } - return req; } void cancel_request_http(Request *request) { - auto it = std::find(requests.begin(), requests.end(), request); - if (it != requests.end()) { - Request *req = *it; - req->cancel = true; + for (Request *req : requests) { + if (req == request) { + req->cancel(); + } } } @@ -341,9 +257,6 @@ double time() { int main() { curl_global_init(CURL_GLOBAL_ALL); - curl_share = curl_share_init(); - curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, curl_share_lock); - curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, curl_share_unlock); mapView = new MapView(); mapView->init(); @@ -351,7 +264,6 @@ int main() { mapView->settings.sync(); delete mapView; - curl_share_cleanup(curl_share); pthread_exit(NULL); return ret; } diff --git a/linux/request.cpp b/linux/request.cpp new file mode 100644 index 0000000000..5985e22b64 --- /dev/null +++ b/linux/request.cpp @@ -0,0 +1,166 @@ +#include "request.hpp" + +#include <queue> +#include <forward_list> +#include <pthread.h> + +#include <llmr/platform/platform.hpp> + +using namespace llmr::platform; + +class Threadpool; + +class Worker { +public: + Worker(Threadpool& pool); + + static void *loop(void *ptr) { + pthread_setname_np("worker"); + static_cast<Worker *>(ptr)->loop(); + pthread_exit(nullptr); + } + + void loop(); + +private: + Threadpool& pool; + pthread_t thread; +}; + + +class Threadpool { + friend class Worker; + +public: + typedef void (*Callback)(void *); + Threadpool(Callback callback); + + void add(void *data); + +private: + pthread_mutex_t mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER; + pthread_cond_t condition = PTHREAD_COND_INITIALIZER; + std::forward_list<Worker> workers; + Callback callback; + std::queue<void *> tasks; +}; + + +Threadpool::Threadpool(Callback callback) + : callback(callback) { + const int worker_count = 4; + fprintf(stderr, "workers: %d\n", worker_count); + for (int i = 0; i < worker_count; i++) { + workers.emplace_front(*this); + } +} + +void Threadpool::add(void *data) { + pthread_mutex_lock(&mutex); + tasks.push(data); + pthread_mutex_unlock(&mutex); + pthread_cond_signal(&condition); +} + +Worker::Worker(Threadpool& pool) + : pool(pool) { + pthread_create(&thread, nullptr, loop, (void *)this); +} + +void Worker::loop() { + pthread_mutex_lock(&pool.mutex); + while (true) { + if (pool.tasks.size()) { + void *task = pool.tasks.front(); + pool.tasks.pop(); + pthread_mutex_unlock(&pool.mutex); + pool.callback(task); + pthread_mutex_lock(&pool.mutex); + } else { + pthread_cond_wait(&pool.condition, &pool.mutex); + } + } +} + +Threadpool *request_pool = nullptr; +CURLSH *curl_share = nullptr; +pthread_mutex_t curl_share_mutex = PTHREAD_MUTEX_INITIALIZER; + + +void curl_share_lock(CURL *, curl_lock_data, curl_lock_access, void *) { + pthread_mutex_lock(&curl_share_mutex); +} + +void curl_share_unlock(CURL *, curl_lock_data, void *) { + pthread_mutex_unlock(&curl_share_mutex); +} + + +Request::Request(std::string url, std::function<void(platform::Response&)> bg, std::function<void()> fg) + : done(false), + cancelled(false), + url(url), + background_function(bg), + foreground_callback(fg) { + if (!request_pool) { + request_pool = new Threadpool(request); + } + if (!curl_share) { + // curl init + curl_global_init(CURL_GLOBAL_ALL); + curl_share = curl_share_init(); + curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, curl_share_lock); + curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, curl_share_unlock); + } + request_pool->add(this); +} + + +size_t Request::write_callback(void *contents, size_t size, size_t nmemb, void *userp) { + ((std::string *)userp)->append((char *)contents, size * nmemb); + return size * nmemb; +} + + +int Request::progress_callback(void *ptr, double dltotal, double dlnow, double ultotal, double ulnow) { + Request *req = static_cast<Request *>(ptr); + bool cancel = req->cancelled; + if (cancel) { + fprintf(stderr, "cancel download %s\n", req->url.c_str()); + } + return cancel; +} + +void Request::request(void *ptr) { + Request *req = static_cast<Request *>(ptr); + Response res; + + fprintf(stderr, "[%p] request %s\n", pthread_self(), req->url.c_str()); + + // TODO: use curl multi to be able to cancel, or to + + CURL *curl = curl_easy_init(); + CURLcode code; + + curl_easy_setopt(curl, CURLOPT_URL, req->url.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res.body); + curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progress_callback); + curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, req); + curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); + curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "deflate"); + curl_easy_setopt(curl, CURLOPT_SHARE, curl_share); + code = curl_easy_perform(curl); + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &res.code); + curl_easy_cleanup(curl); + + if (code != CURLE_ABORTED_BY_CALLBACK) { + req->background_function(res); + } + + req->done = true; +} + +void Request::cancel() { + cancelled = true; +} diff --git a/linux/request.hpp b/linux/request.hpp new file mode 100644 index 0000000000..337ab75224 --- /dev/null +++ b/linux/request.hpp @@ -0,0 +1,33 @@ +#ifndef LLMR_LINUX_REQUEST +#define LLMR_LINUX_REQUEST + +#include <string> +#include <functional> +#include <curl/curl.h> + +namespace llmr { +namespace platform { + +struct Response; + +class Request { +public: + Request(std::string url, std::function<void(platform::Response&)> bg, std::function<void()> fg); + + static void request(void *); + static size_t write_callback(void *, size_t, size_t, void *); + static int progress_callback(void *, double, double, double, double); + void cancel(); + +public: + std::atomic<bool> done; + std::atomic<bool> cancelled; + const std::string url; + const std::function<void(platform::Response&)> background_function; + const std::function<void()> foreground_callback; +}; + +} +} + +#endif |