diff options
author | Konstantin Käfer <mail@kkaefer.com> | 2014-03-03 12:17:32 +0100 |
---|---|---|
committer | Konstantin Käfer <mail@kkaefer.com> | 2014-03-03 12:17:32 +0100 |
commit | b26f3d319bbb8400b5608ced69534260134915ea (patch) | |
tree | c6fe1e7c2f4063b31b69482ab8d43196c8098945 | |
parent | e134e6a8f3c4fdfef81d1a4f4e74b04853ffa4db (diff) | |
download | qtlocation-mapboxgl-b26f3d319bbb8400b5608ced69534260134915ea.tar.gz |
add threadpool implementation
-rw-r--r-- | include/llmr/util/threadpool.hpp | 43 | ||||
-rw-r--r-- | linux/llmr-app.gyp | 5 | ||||
-rw-r--r-- | linux/main.cpp | 7 | ||||
-rw-r--r-- | linux/request.cpp | 143 | ||||
-rw-r--r-- | linux/request.hpp | 24 | ||||
-rw-r--r-- | src/util/threadpool.cpp | 51 |
6 files changed, 160 insertions, 113 deletions
diff --git a/include/llmr/util/threadpool.hpp b/include/llmr/util/threadpool.hpp new file mode 100644 index 0000000000..736e65c527 --- /dev/null +++ b/include/llmr/util/threadpool.hpp @@ -0,0 +1,43 @@ +#ifndef LLMR_UTIL_THREADPOOL +#define LLMR_UTIL_THREADPOOL + +#include <pthread.h> +#include <forward_list> +#include <queue> + +namespace llmr { +namespace util { + +class Threadpool { + class Worker { + public: + Worker(Threadpool& pool); + ~Worker(); + static void *loop(void *ptr); + + private: + Threadpool& pool; + pthread_t thread; + }; + +public: + typedef void (*Callback)(void *); + Threadpool(Callback callback, int max_workers = 4); + + void add(void *data); + +private: + const int max_workers; + pthread_mutex_t mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER; + pthread_cond_t condition = PTHREAD_COND_INITIALIZER; + std::forward_list<Worker> workers; + int worker_count = 0; + Callback callback; + std::queue<void *> tasks; +}; + +} +} + +#endif + diff --git a/linux/llmr-app.gyp b/linux/llmr-app.gyp index 6c5002d0eb..0fa77085e9 100644 --- a/linux/llmr-app.gyp +++ b/linux/llmr-app.gyp @@ -18,8 +18,9 @@ 'conditions': [ ['OS == "mac"', { 'product_extension': 'app', + 'mac_bundle': 1, 'mac_bundle_resources': [ - 'Icon.icns', + '../macosx/Icon.icns', ], 'link_settings': { 'libraries': [ @@ -42,7 +43,7 @@ '-lcurl', ], 'SDKROOT': 'macosx', - 'INFOPLIST_FILE': 'Info.plist', + 'INFOPLIST_FILE': '../macosx/Info.plist', 'CLANG_CXX_LIBRARY': 'libc++', 'CLANG_CXX_LANGUAGE_STANDARD':'c++11', 'MACOSX_DEPLOYMENT_TARGET':'10.9', diff --git a/linux/main.cpp b/linux/main.cpp index f8da4fb955..e9b6e4c36f 100644 --- a/linux/main.cpp +++ b/linux/main.cpp @@ -258,12 +258,17 @@ double time() { int main() { curl_global_init(CURL_GLOBAL_ALL); + llmr::platform::Request::initialize(); + mapView = new MapView(); mapView->init(); int ret = mapView->run(); mapView->settings.sync(); delete mapView; - pthread_exit(NULL); + + llmr::platform::Request::finish(); + + curl_global_cleanup(); return ret; } diff --git a/linux/request.cpp b/linux/request.cpp index 5985e22b64..b2881f789b 100644 --- a/linux/request.cpp +++ b/linux/request.cpp @@ -1,151 +1,80 @@ #include "request.hpp" -#include <queue> -#include <forward_list> -#include <pthread.h> +#include <atomic> +#include <cassert> +#include <llmr/util/threadpool.hpp> #include <llmr/platform/platform.hpp> using namespace llmr::platform; -class Threadpool; -class Worker { -public: - Worker(Threadpool& pool); +llmr::util::Threadpool *Request::pool = nullptr; +CURLSH *Request::curl_share = nullptr; +pthread_mutex_t Request::curl_share_mutex = PTHREAD_MUTEX_INITIALIZER; - static void *loop(void *ptr) { - pthread_setname_np("worker"); - static_cast<Worker *>(ptr)->loop(); - pthread_exit(nullptr); - } - - void loop(); - -private: - Threadpool& pool; - pthread_t thread; -}; - - -class Threadpool { - friend class Worker; - -public: - typedef void (*Callback)(void *); - Threadpool(Callback callback); - - void add(void *data); - -private: - pthread_mutex_t mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER; - pthread_cond_t condition = PTHREAD_COND_INITIALIZER; - std::forward_list<Worker> workers; - Callback callback; - std::queue<void *> tasks; -}; - - -Threadpool::Threadpool(Callback callback) - : callback(callback) { - const int worker_count = 4; - fprintf(stderr, "workers: %d\n", worker_count); - for (int i = 0; i < worker_count; i++) { - workers.emplace_front(*this); - } +void Request::curl_share_lock(CURL *, curl_lock_data, curl_lock_access, void *) { + pthread_mutex_lock(&curl_share_mutex); } -void Threadpool::add(void *data) { - pthread_mutex_lock(&mutex); - tasks.push(data); - pthread_mutex_unlock(&mutex); - pthread_cond_signal(&condition); +void Request::curl_share_unlock(CURL *, curl_lock_data, void *) { + pthread_mutex_unlock(&curl_share_mutex); } -Worker::Worker(Threadpool& pool) - : pool(pool) { - pthread_create(&thread, nullptr, loop, (void *)this); +size_t Request::curl_write_callback(void *contents, size_t size, size_t nmemb, void *userp) { + ((std::string *)userp)->append((char *)contents, size * nmemb); + return size * nmemb; } -void Worker::loop() { - pthread_mutex_lock(&pool.mutex); - while (true) { - if (pool.tasks.size()) { - void *task = pool.tasks.front(); - pool.tasks.pop(); - pthread_mutex_unlock(&pool.mutex); - pool.callback(task); - pthread_mutex_lock(&pool.mutex); - } else { - pthread_cond_wait(&pool.condition, &pool.mutex); - } +int Request::curl_progress_callback(void *ptr, double dltotal, double dlnow, double ultotal, double ulnow) { + Request *req = static_cast<Request *>(ptr); + bool cancel = req->cancelled; + if (cancel) { + fprintf(stderr, "cancel download %s\n", req->url.c_str()); } + return cancel; } -Threadpool *request_pool = nullptr; -CURLSH *curl_share = nullptr; -pthread_mutex_t curl_share_mutex = PTHREAD_MUTEX_INITIALIZER; - +void Request::initialize() { + pool = new llmr::util::Threadpool(Request::request); -void curl_share_lock(CURL *, curl_lock_data, curl_lock_access, void *) { - pthread_mutex_lock(&curl_share_mutex); + // curl init + curl_global_init(CURL_GLOBAL_ALL); + curl_share = curl_share_init(); + curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, curl_share_lock); + curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, curl_share_unlock); } -void curl_share_unlock(CURL *, curl_lock_data, void *) { - pthread_mutex_unlock(&curl_share_mutex); +void Request::finish() { + delete pool; + curl_share_cleanup(curl_share); } - Request::Request(std::string url, std::function<void(platform::Response&)> bg, std::function<void()> fg) : done(false), cancelled(false), url(url), background_function(bg), foreground_callback(fg) { - if (!request_pool) { - request_pool = new Threadpool(request); - } - if (!curl_share) { - // curl init - curl_global_init(CURL_GLOBAL_ALL); - curl_share = curl_share_init(); - curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, curl_share_lock); - curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, curl_share_unlock); - } - request_pool->add(this); -} - - -size_t Request::write_callback(void *contents, size_t size, size_t nmemb, void *userp) { - ((std::string *)userp)->append((char *)contents, size * nmemb); - return size * nmemb; -} - - -int Request::progress_callback(void *ptr, double dltotal, double dlnow, double ultotal, double ulnow) { - Request *req = static_cast<Request *>(ptr); - bool cancel = req->cancelled; - if (cancel) { - fprintf(stderr, "cancel download %s\n", req->url.c_str()); - } - return cancel; + assert(pool); + pool->add(this); } void Request::request(void *ptr) { + assert(curl_share); + Request *req = static_cast<Request *>(ptr); Response res; - fprintf(stderr, "[%p] request %s\n", pthread_self(), req->url.c_str()); - - // TODO: use curl multi to be able to cancel, or to + // TODO: use curl multi to be able to cancel, or to CURL *curl = curl_easy_init(); CURLcode code; curl_easy_setopt(curl, CURLOPT_URL, req->url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_callback); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res.body); - curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progress_callback); + curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, curl_progress_callback); curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, req); curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "deflate"); diff --git a/linux/request.hpp b/linux/request.hpp index 337ab75224..ff72afc966 100644 --- a/linux/request.hpp +++ b/linux/request.hpp @@ -5,7 +5,13 @@ #include <functional> #include <curl/curl.h> + namespace llmr { + +namespace util { +class Threadpool; +} + namespace platform { struct Response; @@ -14,17 +20,29 @@ class Request { public: Request(std::string url, std::function<void(platform::Response&)> bg, std::function<void()> fg); - static void request(void *); - static size_t write_callback(void *, size_t, size_t, void *); - static int progress_callback(void *, double, double, double, double); + static void initialize(); + static void finish(); + void cancel(); +private: + static void request(void *); + static size_t curl_write_callback(void *, size_t, size_t, void *); + static int curl_progress_callback(void *, double, double, double, double); + static void curl_share_lock(CURL *, curl_lock_data, curl_lock_access, void *); + static void curl_share_unlock(CURL *, curl_lock_data, void *); + public: std::atomic<bool> done; std::atomic<bool> cancelled; const std::string url; const std::function<void(platform::Response&)> background_function; const std::function<void()> foreground_callback; + +private: + static util::Threadpool* pool; + static CURLSH *curl_share; + static pthread_mutex_t curl_share_mutex; }; } diff --git a/src/util/threadpool.cpp b/src/util/threadpool.cpp new file mode 100644 index 0000000000..4f7a84cf88 --- /dev/null +++ b/src/util/threadpool.cpp @@ -0,0 +1,51 @@ +#include <llmr/util/threadpool.hpp> + +using namespace llmr::util; + +Threadpool::Threadpool(Callback callback, int max_workers) + : max_workers(max_workers), + callback(callback) { +} + +void Threadpool::add(void *data) { + if (worker_count < max_workers) { + workers.emplace_front(*this); + worker_count++; + } + + pthread_mutex_lock(&mutex); + tasks.push(data); + pthread_mutex_unlock(&mutex); + pthread_cond_signal(&condition); +} + +Threadpool::Worker::Worker(Threadpool& pool) + : pool(pool) { + pthread_create(&thread, nullptr, loop, (void *)this); +} + +Threadpool::Worker::~Worker() { + pthread_cancel(thread); +} + + +void *Threadpool::Worker::loop(void *ptr) { + Worker *worker = static_cast<Worker *>(ptr); + Threadpool& pool = worker->pool; + + pthread_setname_np("worker"); + pthread_mutex_lock(&pool.mutex); + while (true) { + if (pool.tasks.size()) { + void *task = pool.tasks.front(); + pool.tasks.pop(); + pthread_mutex_unlock(&pool.mutex); + pool.callback(task); + pthread_mutex_lock(&pool.mutex); + } else { + pthread_cond_wait(&pool.condition, &pool.mutex); + } + } + + return nullptr; +} |