summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvo van Dongen <info@ivovandongen.nl>2017-05-30 13:41:56 +0300
committerIvo van Dongen <info@ivovandongen.nl>2017-05-30 13:42:47 +0300
commit345d7c4d1f8442bd139ca71d4470a7466032e4e7 (patch)
treef9326ed589c78154f874a98611b85797232f9034
parent1a6c6ea19746245b9492efaaba464d8a653062e8 (diff)
downloadqtlocation-mapboxgl-upstream/default-filesource-actor.tar.gz
[core] WIP use threaded run loop in default file sourceupstream/default-filesource-actor
-rw-r--r--cmake/node.cmake2
-rw-r--r--include/mbgl/storage/default_file_source.hpp12
-rw-r--r--platform/default/default_file_source.cpp125
-rw-r--r--platform/macos/config.cmake2
4 files changed, 99 insertions, 42 deletions
diff --git a/cmake/node.cmake b/cmake/node.cmake
index 6833cb983f..0d40052b63 100644
--- a/cmake/node.cmake
+++ b/cmake/node.cmake
@@ -22,6 +22,8 @@ target_sources(mbgl-node
PRIVATE platform/node/src/node_thread_pool.hpp
PRIVATE platform/node/src/node_thread_pool.cpp
PRIVATE platform/node/src/util/async_queue.hpp
+ PRIVATE platform/default/mbgl/util/threaded_run_loop.hpp
+ PRIVATE platform/default/mbgl/util/threaded_run_loop.cpp
)
target_compile_options(mbgl-node
diff --git a/include/mbgl/storage/default_file_source.hpp b/include/mbgl/storage/default_file_source.hpp
index 01b8a3698d..8990c1c56f 100644
--- a/include/mbgl/storage/default_file_source.hpp
+++ b/include/mbgl/storage/default_file_source.hpp
@@ -9,12 +9,10 @@
namespace mbgl {
+template <class>
+class Actor;
class Scheduler;
-namespace util {
-template <typename T> class Thread;
-} // namespace util
-
class DefaultFileSource : public FileSource {
public:
/*
@@ -145,9 +143,9 @@ public:
class Impl;
private:
- // Shared so destruction is done on this thread
- const std::shared_ptr<FileSource> assetFileSource;
- const std::unique_ptr<util::Thread<Impl>> thread;
+ Scheduler& scheduler;
+ const std::unique_ptr<Scheduler> workerScheduler;
+ const std::unique_ptr<Actor<Impl>> worker;
std::mutex cachedBaseURLMutex;
std::string cachedBaseURL = mbgl::util::API_BASE_URL;
diff --git a/platform/default/default_file_source.cpp b/platform/default/default_file_source.cpp
index 3b69aeb857..81fabcb2c4 100644
--- a/platform/default/default_file_source.cpp
+++ b/platform/default/default_file_source.cpp
@@ -5,10 +5,13 @@
#include <mbgl/storage/offline_database.hpp>
#include <mbgl/storage/offline_download.hpp>
+#include <mbgl/actor/actor.hpp>
#include <mbgl/util/platform.hpp>
#include <mbgl/util/url.hpp>
#include <mbgl/util/thread.hpp>
#include <mbgl/util/work_request.hpp>
+#include <mbgl/util/logging.hpp>
+#include <mbgl/util/threaded_run_loop.hpp>
#include <cassert>
@@ -24,10 +27,56 @@ bool isAssetURL(const std::string& url) {
namespace mbgl {
+class RequestCallback {
+public:
+ RequestCallback(ActorRef<RequestCallback>, FileSource::Callback callback_)
+ : callback(std::move(callback_)) {
+ }
+
+ void operator() (Response response) {
+ Log::Info(Event::General, "RequestCallback::operator()");
+ callback(response);
+ }
+
+// void operator() (Response response) const {
+// Log::Info(Event::General, "RequestCallback::operator() const");
+// callback(response);
+// }
+
+private:
+ FileSource::Callback callback;
+};
+
+//class ResourceTransformCallback {
+//public:
+// ResourceTransformCallback(ActorRef<ResourceTransformCallback>, OnlineFileSource::ResourceTransform callback_)
+// : callback(std::move(callback_)) {
+// }
+//
+// //std::function<std::unique_ptr<AsyncRequest>(Resource::Kind, std::string&&, std::function<void(std::string&&)>)>
+// std::unique_ptr<AsyncRequest> operator() (Resource::Kind kind, std::string&& url, std::function<void(std::string&&)> resultCallback) {
+// return callback(kind, std::move(url), resultCallback);
+// }
+//
+//private:
+// OnlineFileSource::ResourceTransform callback;
+//};
+
+//class AssetRequest {
+//public:
+// AssetRequest(ActorRef<AssetRequest>);
+// ~AssetRequest(); // cancels
+//
+// void send(const Resource& resource, ActorRef<RequestCallback> callback) {
+// // do whatever, then eventually:
+// callback.invoke(&RequestCallback::operator(), response);
+// }
+//};
+
class DefaultFileSource::Impl {
public:
- Impl(std::shared_ptr<FileSource> assetFileSource_, const std::string& cachePath, uint64_t maximumCacheSize)
- : assetFileSource(assetFileSource_)
+ Impl(ActorRef<Impl> /*self*/, std::unique_ptr<FileSource> assetFileSource_, const std::string& cachePath, uint64_t maximumCacheSize)
+ : assetFileSource(std::move(assetFileSource_))
, localFileSource(std::make_unique<LocalFileSource>())
, offlineDatabase(cachePath, maximumCacheSize) {
}
@@ -106,13 +155,13 @@ public:
getDownload(regionID).setState(state);
}
- void request(AsyncRequest* req, Resource resource, Callback callback) {
+ void request(AsyncRequest* req, Resource resource, ActorRef<RequestCallback> callback) {
if (isAssetURL(resource.url)) {
//Asset request
- tasks[req] = assetFileSource->request(resource, callback);
+ tasks[req] = assetFileSource->request(resource, [&](Response response) { callback.invoke(&RequestCallback::operator(), std::move(response)); });
} else if (LocalFileSource::acceptsURL(resource.url)) {
//Local file request
- tasks[req] = localFileSource->request(resource, callback);
+ tasks[req] = localFileSource->request(resource, [&callback](Response response) { callback.invoke(&RequestCallback::operator(), response); });
} else {
// Try the offline database
Resource revalidation = resource;
@@ -134,7 +183,7 @@ public:
revalidation.priorModified = offlineResponse->modified;
revalidation.priorExpires = offlineResponse->expires;
revalidation.priorEtag = offlineResponse->etag;
- callback(*offlineResponse);
+ callback.invoke(&RequestCallback::operator(), *offlineResponse);
}
}
@@ -142,13 +191,14 @@ public:
if (resource.necessity == Resource::Required) {
tasks[req] = onlineFileSource.request(revalidation, [=] (Response onlineResponse) {
this->offlineDatabase.put(revalidation, onlineResponse);
- callback(onlineResponse);
+ //TODO callback.invoke(&RequestCallback::operator(), onlineResponse);
});
}
}
}
void cancel(AsyncRequest* req) {
+ Log::Info(Event::General, "DefaultFileSource::Impl::cancel");
tasks.erase(req);
}
@@ -170,8 +220,7 @@ private:
std::make_unique<OfflineDownload>(regionID, offlineDatabase.getRegionDefinition(regionID), offlineDatabase, onlineFileSource)).first->second;
}
- // shared so that destruction is done on the creating thread
- const std::shared_ptr<FileSource> assetFileSource;
+ const std::unique_ptr<FileSource> assetFileSource;
const std::unique_ptr<FileSource> localFileSource;
OfflineDatabase offlineDatabase;
OnlineFileSource onlineFileSource;
@@ -186,19 +235,20 @@ DefaultFileSource::DefaultFileSource(Scheduler& scheduler,
: DefaultFileSource(scheduler, cachePath, std::make_unique<AssetFileSource>(scheduler, assetRoot), maximumCacheSize) {
}
-DefaultFileSource::DefaultFileSource(Scheduler&,
+DefaultFileSource::DefaultFileSource(Scheduler& scheduler_,
const std::string& cachePath,
std::unique_ptr<FileSource>&& assetFileSource_,
uint64_t maximumCacheSize)
- : assetFileSource(std::move(assetFileSource_))
- , thread(std::make_unique<util::Thread<Impl>>(util::ThreadContext{"DefaultFileSource", util::ThreadPriority::Low},
- assetFileSource, cachePath, maximumCacheSize)) {
+ : scheduler(scheduler_)
+ , workerScheduler(std::make_unique<ThreadedRunLoop>("DefaultFileSource"))
+ , worker(std::make_unique<Actor<Impl>>(*workerScheduler, std::move(assetFileSource_), cachePath, maximumCacheSize)) {
+
}
DefaultFileSource::~DefaultFileSource() = default;
void DefaultFileSource::setAPIBaseURL(const std::string& baseURL) {
- thread->invoke(&Impl::setAPIBaseURL, baseURL);
+ worker->invoke(&Impl::setAPIBaseURL, baseURL);
{
std::lock_guard<std::mutex> lock(cachedBaseURLMutex);
@@ -212,7 +262,7 @@ std::string DefaultFileSource::getAPIBaseURL() {
}
void DefaultFileSource::setAccessToken(const std::string& accessToken) {
- thread->invoke(&Impl::setAccessToken, accessToken);
+ worker->invoke(&Impl::setAccessToken, accessToken);
{
std::lock_guard<std::mutex> lock(cachedAccessTokenMutex);
@@ -228,83 +278,88 @@ std::string DefaultFileSource::getAccessToken() {
void DefaultFileSource::setResourceTransform(std::function<std::string(Resource::Kind, std::string&&)> transform) {
if (transform) {
auto loop = util::RunLoop::Get();
- thread->invoke(&Impl::setResourceTransform, [loop, transform](Resource::Kind kind_, std::string&& url_, auto callback_) {
+ worker->invoke(&Impl::setResourceTransform, [loop, transform](Resource::Kind kind_, std::string&& url_, auto callback_) {
return loop->invokeWithCallback([transform](Resource::Kind kind, std::string&& url, auto callback) {
callback(transform(kind, std::move(url)));
}, kind_, std::move(url_), callback_);
});
} else {
- thread->invoke(&Impl::setResourceTransform, nullptr);
+ worker->invoke(&Impl::setResourceTransform, nullptr);
}
}
std::unique_ptr<AsyncRequest> DefaultFileSource::request(const Resource& resource, Callback callback) {
+ Log::Info(Event::General, "DefaultFileSource::request: %s", resource.url.c_str());
class DefaultFileRequest : public AsyncRequest {
public:
- DefaultFileRequest(Resource resource_, FileSource::Callback callback_, util::Thread<DefaultFileSource::Impl>& thread_)
- : thread(thread_),
- workRequest(thread.invokeWithCallback(&DefaultFileSource::Impl::request, this, resource_, callback_)) {
+ DefaultFileRequest(Scheduler& scheduler, Resource resource_, FileSource::Callback callback_, ActorRef<DefaultFileSource::Impl> worker_)
+ : worker(worker_)
+ , callback(scheduler, callback_) {
+ worker.invoke(&DefaultFileSource::Impl::request, this, resource_, callback.self());
}
~DefaultFileRequest() override {
- thread.invoke(&DefaultFileSource::Impl::cancel, this);
+ worker.invoke(&DefaultFileSource::Impl::cancel, this);
}
- util::Thread<DefaultFileSource::Impl>& thread;
- std::unique_ptr<AsyncRequest> workRequest;
+ private:
+ ActorRef<DefaultFileSource::Impl> worker;
+ Actor<RequestCallback> callback;
};
- return std::make_unique<DefaultFileRequest>(resource, callback, *thread);
+ return std::make_unique<DefaultFileRequest>(scheduler, resource, callback, worker->self());
}
void DefaultFileSource::listOfflineRegions(std::function<void (std::exception_ptr, optional<std::vector<OfflineRegion>>)> callback) {
- thread->invoke(&Impl::listRegions, callback);
+ worker->invoke(&Impl::listRegions, callback);
}
void DefaultFileSource::createOfflineRegion(const OfflineRegionDefinition& definition,
const OfflineRegionMetadata& metadata,
std::function<void (std::exception_ptr, optional<OfflineRegion>)> callback) {
- thread->invoke(&Impl::createRegion, definition, metadata, callback);
+ worker->invoke(&Impl::createRegion, definition, metadata, callback);
}
void DefaultFileSource::updateOfflineMetadata(const int64_t regionID,
const OfflineRegionMetadata& metadata,
std::function<void (std::exception_ptr, optional<OfflineRegionMetadata>)> callback) {
- thread->invoke(&Impl::updateMetadata, regionID, metadata, callback);
+ worker->invoke(&Impl::updateMetadata, regionID, metadata, callback);
}
void DefaultFileSource::deleteOfflineRegion(OfflineRegion&& region, std::function<void (std::exception_ptr)> callback) {
- thread->invoke(&Impl::deleteRegion, std::move(region), callback);
+ worker->invoke(&Impl::deleteRegion, std::move(region), callback);
}
void DefaultFileSource::setOfflineRegionObserver(OfflineRegion& region, std::unique_ptr<OfflineRegionObserver> observer) {
- thread->invoke(&Impl::setRegionObserver, region.getID(), std::move(observer));
+ worker->invoke(&Impl::setRegionObserver, region.getID(), std::move(observer));
}
void DefaultFileSource::setOfflineRegionDownloadState(OfflineRegion& region, OfflineRegionDownloadState state) {
- thread->invoke(&Impl::setRegionDownloadState, region.getID(), state);
+ worker->invoke(&Impl::setRegionDownloadState, region.getID(), state);
}
void DefaultFileSource::getOfflineRegionStatus(OfflineRegion& region, std::function<void (std::exception_ptr, optional<OfflineRegionStatus>)> callback) const {
- thread->invoke(&Impl::getRegionStatus, region.getID(), callback);
+ worker->invoke(&Impl::getRegionStatus, region.getID(), callback);
}
void DefaultFileSource::setOfflineMapboxTileCountLimit(uint64_t limit) const {
- thread->invokeSync(&Impl::setOfflineMapboxTileCountLimit, limit);
+ // TODO: was sync, why?
+ worker->invoke(&Impl::setOfflineMapboxTileCountLimit, limit);
}
void DefaultFileSource::pause() {
- thread->pause();
+ //TODO: thread->pause();
}
void DefaultFileSource::resume() {
- thread->resume();
+ //TODO: thread->resume();
}
// For testing only:
void DefaultFileSource::put(const Resource& resource, const Response& response) {
- thread->invokeSync(&Impl::put, resource, response);
+ // TODO: was sync, callback?
+ worker->invoke(&Impl::put, resource, response);
}
} // namespace mbgl
diff --git a/platform/macos/config.cmake b/platform/macos/config.cmake
index 8dc3c38245..a099017a44 100644
--- a/platform/macos/config.cmake
+++ b/platform/macos/config.cmake
@@ -16,6 +16,8 @@ macro(mbgl_platform_core)
PRIVATE platform/default/default_file_source.cpp
PRIVATE platform/default/local_file_source.cpp
PRIVATE platform/default/online_file_source.cpp
+ PRIVATE platform/default/mbgl/util/threaded_run_loop.cpp
+ PRIVATE platform/default/mbgl/util/threaded_run_loop.hpp
# Default styles
PRIVATE platform/default/mbgl/util/default_styles.hpp