summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Käfer <mail@kkaefer.com>2014-03-02 11:26:54 +0100
committerKonstantin Käfer <mail@kkaefer.com>2014-03-02 11:26:54 +0100
commite134e6a8f3c4fdfef81d1a4f4e74b04853ffa4db (patch)
tree06661ec39b06ca90835ac571669a91c8070aafa7
parent7fdff4176b842004872db6d94cde11562abff2e0 (diff)
downloadqtlocation-mapboxgl-e134e6a8f3c4fdfef81d1a4f4e74b04853ffa4db.tar.gz
add threadpooled requests
-rw-r--r--include/llmr/platform/platform.hpp2
-rw-r--r--linux/llmr-app.gyp4
-rw-r--r--linux/main.cpp102
-rw-r--r--linux/request.cpp166
-rw-r--r--linux/request.hpp33
5 files changed, 210 insertions, 97 deletions
diff --git a/include/llmr/platform/platform.hpp b/include/llmr/platform/platform.hpp
index fba922d1e8..d280ebb3a7 100644
--- a/include/llmr/platform/platform.hpp
+++ b/include/llmr/platform/platform.hpp
@@ -15,7 +15,7 @@ namespace platform {
// Restarts painting. This could for example trigger the event loop of the controlling application.
void restart();
-struct Request;
+class Request;
struct Response {
int16_t code = -1;
diff --git a/linux/llmr-app.gyp b/linux/llmr-app.gyp
index 79c1d4560d..6c5002d0eb 100644
--- a/linux/llmr-app.gyp
+++ b/linux/llmr-app.gyp
@@ -11,7 +11,9 @@
"sources": [
"./main.cpp",
"./settings.cpp",
- "./settings.hpp"
+ "./settings.hpp",
+ "./request.cpp",
+ "./request.hpp"
],
'conditions': [
['OS == "mac"', {
diff --git a/linux/main.cpp b/linux/main.cpp
index a7830c6f2d..f8da4fb955 100644
--- a/linux/main.cpp
+++ b/linux/main.cpp
@@ -1,44 +1,14 @@
#include <llmr/llmr.hpp>
#include <GLFW/glfw3.h>
-#include <curl/curl.h>
#include <llmr/platform/platform.hpp>
-#include "settings.hpp"
#include <future>
#include <list>
-#include <forward_list>
-#include <chrono>
-#include <atomic>
-
-#include <pthread.h>
-
-namespace llmr {
-namespace platform {
-
-struct Request {
- pthread_t thread;
- std::string url;
- std::atomic<bool> done;
- std::atomic<bool> cancel;
- std::function<void(Response&)> background_function;
- std::function<void()> foreground_callback;
-};
-}
-}
+#include "settings.hpp"
+#include "request.hpp"
-pthread_mutex_t curl_share_mutex = PTHREAD_MUTEX_INITIALIZER;
-CURLSH *curl_share;
std::forward_list<llmr::platform::Request *> requests;
-
-void curl_share_lock(CURL *, curl_lock_data, curl_lock_access, void *) {
- pthread_mutex_lock(&curl_share_mutex);
-}
-
-void curl_share_unlock(CURL *, curl_lock_data, void *) {
- pthread_mutex_unlock(&curl_share_mutex);
-}
-
class MapView {
public:
MapView() :
@@ -187,7 +157,6 @@ public:
requests.remove_if([&dirty](llmr::platform::Request * req) {
if (req->done) {
req->foreground_callback();
- pthread_join(req->thread, nullptr);
delete req;
dirty = true;
return true;
@@ -264,70 +233,17 @@ void restart() {
}
}
-static size_t curl_write_callback(void *contents, size_t size, size_t nmemb, void *userp) {
- ((std::string *)userp)->append((char *)contents, size * nmemb);
- return size * nmemb;
-}
-
-
-static int curl_progress_callback(void *ptr, double dltotal, double dlnow, double ultotal, double ulnow) {
- Request *req = static_cast<Request *>(ptr);
- return req->cancel;
-}
-
-void *request_http(void *ptr) {
- Request *req = static_cast<Request *>(ptr);
- Response res;
-
- // TODO: use curl multi to be able to cancel
-
- CURL *curl;
- CURLcode code;
- curl = curl_easy_init();
-
- if (curl) {
- curl_easy_setopt(curl, CURLOPT_URL, req->url.c_str());
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_callback);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res.body);
- curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, curl_progress_callback);
- curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, req);
- curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0);
- curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "deflate");
- curl_easy_setopt(curl, CURLOPT_SHARE, curl_share);
- code = curl_easy_perform(curl);
- curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &res.code);
- curl_easy_cleanup(curl);
-
- if (code != CURLE_ABORTED_BY_CALLBACK) {
- req->background_function(res);
- }
- }
-
- req->done = true;
-
- pthread_exit(nullptr);
-}
-
Request *request_http(std::string url, std::function<void(Response&)> background_function, std::function<void()> foreground_callback) {
- Request *req = new Request();
- req->url = url;
- req->background_function = background_function;
- req->foreground_callback = foreground_callback;
+ Request *req = new Request(url, background_function, foreground_callback);
requests.push_front(req);
- int rc = pthread_create(&req->thread, nullptr, request_http, (void *)req);
- if (rc) {
- fprintf(stderr, "http request failed\n");
- return nullptr;
- }
-
return req;
}
void cancel_request_http(Request *request) {
- auto it = std::find(requests.begin(), requests.end(), request);
- if (it != requests.end()) {
- Request *req = *it;
- req->cancel = true;
+ for (Request *req : requests) {
+ if (req == request) {
+ req->cancel();
+ }
}
}
@@ -341,9 +257,6 @@ double time() {
int main() {
curl_global_init(CURL_GLOBAL_ALL);
- curl_share = curl_share_init();
- curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, curl_share_lock);
- curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, curl_share_unlock);
mapView = new MapView();
mapView->init();
@@ -351,7 +264,6 @@ int main() {
mapView->settings.sync();
delete mapView;
- curl_share_cleanup(curl_share);
pthread_exit(NULL);
return ret;
}
diff --git a/linux/request.cpp b/linux/request.cpp
new file mode 100644
index 0000000000..5985e22b64
--- /dev/null
+++ b/linux/request.cpp
@@ -0,0 +1,166 @@
+#include "request.hpp"
+
+#include <queue>
+#include <forward_list>
+#include <pthread.h>
+
+#include <llmr/platform/platform.hpp>
+
+using namespace llmr::platform;
+
+class Threadpool;
+
+class Worker {
+public:
+ Worker(Threadpool& pool);
+
+ static void *loop(void *ptr) {
+ pthread_setname_np("worker");
+ static_cast<Worker *>(ptr)->loop();
+ pthread_exit(nullptr);
+ }
+
+ void loop();
+
+private:
+ Threadpool& pool;
+ pthread_t thread;
+};
+
+
+class Threadpool {
+ friend class Worker;
+
+public:
+ typedef void (*Callback)(void *);
+ Threadpool(Callback callback);
+
+ void add(void *data);
+
+private:
+ pthread_mutex_t mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER;
+ pthread_cond_t condition = PTHREAD_COND_INITIALIZER;
+ std::forward_list<Worker> workers;
+ Callback callback;
+ std::queue<void *> tasks;
+};
+
+
+Threadpool::Threadpool(Callback callback)
+ : callback(callback) {
+ const int worker_count = 4;
+ fprintf(stderr, "workers: %d\n", worker_count);
+ for (int i = 0; i < worker_count; i++) {
+ workers.emplace_front(*this);
+ }
+}
+
+void Threadpool::add(void *data) {
+ pthread_mutex_lock(&mutex);
+ tasks.push(data);
+ pthread_mutex_unlock(&mutex);
+ pthread_cond_signal(&condition);
+}
+
+Worker::Worker(Threadpool& pool)
+ : pool(pool) {
+ pthread_create(&thread, nullptr, loop, (void *)this);
+}
+
+void Worker::loop() {
+ pthread_mutex_lock(&pool.mutex);
+ while (true) {
+ if (pool.tasks.size()) {
+ void *task = pool.tasks.front();
+ pool.tasks.pop();
+ pthread_mutex_unlock(&pool.mutex);
+ pool.callback(task);
+ pthread_mutex_lock(&pool.mutex);
+ } else {
+ pthread_cond_wait(&pool.condition, &pool.mutex);
+ }
+ }
+}
+
+Threadpool *request_pool = nullptr;
+CURLSH *curl_share = nullptr;
+pthread_mutex_t curl_share_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+
+void curl_share_lock(CURL *, curl_lock_data, curl_lock_access, void *) {
+ pthread_mutex_lock(&curl_share_mutex);
+}
+
+void curl_share_unlock(CURL *, curl_lock_data, void *) {
+ pthread_mutex_unlock(&curl_share_mutex);
+}
+
+
+Request::Request(std::string url, std::function<void(platform::Response&)> bg, std::function<void()> fg)
+ : done(false),
+ cancelled(false),
+ url(url),
+ background_function(bg),
+ foreground_callback(fg) {
+ if (!request_pool) {
+ request_pool = new Threadpool(request);
+ }
+ if (!curl_share) {
+ // curl init
+ curl_global_init(CURL_GLOBAL_ALL);
+ curl_share = curl_share_init();
+ curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, curl_share_lock);
+ curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, curl_share_unlock);
+ }
+ request_pool->add(this);
+}
+
+
+size_t Request::write_callback(void *contents, size_t size, size_t nmemb, void *userp) {
+ ((std::string *)userp)->append((char *)contents, size * nmemb);
+ return size * nmemb;
+}
+
+
+int Request::progress_callback(void *ptr, double dltotal, double dlnow, double ultotal, double ulnow) {
+ Request *req = static_cast<Request *>(ptr);
+ bool cancel = req->cancelled;
+ if (cancel) {
+ fprintf(stderr, "cancel download %s\n", req->url.c_str());
+ }
+ return cancel;
+}
+
+void Request::request(void *ptr) {
+ Request *req = static_cast<Request *>(ptr);
+ Response res;
+
+ fprintf(stderr, "[%p] request %s\n", pthread_self(), req->url.c_str());
+
+ // TODO: use curl multi to be able to cancel, or to
+
+ CURL *curl = curl_easy_init();
+ CURLcode code;
+
+ curl_easy_setopt(curl, CURLOPT_URL, req->url.c_str());
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res.body);
+ curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progress_callback);
+ curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, req);
+ curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0);
+ curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "deflate");
+ curl_easy_setopt(curl, CURLOPT_SHARE, curl_share);
+ code = curl_easy_perform(curl);
+ curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &res.code);
+ curl_easy_cleanup(curl);
+
+ if (code != CURLE_ABORTED_BY_CALLBACK) {
+ req->background_function(res);
+ }
+
+ req->done = true;
+}
+
+void Request::cancel() {
+ cancelled = true;
+}
diff --git a/linux/request.hpp b/linux/request.hpp
new file mode 100644
index 0000000000..337ab75224
--- /dev/null
+++ b/linux/request.hpp
@@ -0,0 +1,33 @@
+#ifndef LLMR_LINUX_REQUEST
+#define LLMR_LINUX_REQUEST
+
+#include <string>
+#include <functional>
+#include <curl/curl.h>
+
+namespace llmr {
+namespace platform {
+
+struct Response;
+
+class Request {
+public:
+ Request(std::string url, std::function<void(platform::Response&)> bg, std::function<void()> fg);
+
+ static void request(void *);
+ static size_t write_callback(void *, size_t, size_t, void *);
+ static int progress_callback(void *, double, double, double, double);
+ void cancel();
+
+public:
+ std::atomic<bool> done;
+ std::atomic<bool> cancelled;
+ const std::string url;
+ const std::function<void(platform::Response&)> background_function;
+ const std::function<void()> foreground_callback;
+};
+
+}
+}
+
+#endif