diff options
author | Ivo van Dongen <info@ivovandongen.nl> | 2017-05-30 13:41:56 +0300 |
---|---|---|
committer | Ivo van Dongen <info@ivovandongen.nl> | 2017-05-30 13:42:47 +0300 |
commit | 345d7c4d1f8442bd139ca71d4470a7466032e4e7 (patch) | |
tree | f9326ed589c78154f874a98611b85797232f9034 | |
parent | 1a6c6ea19746245b9492efaaba464d8a653062e8 (diff) | |
download | qtlocation-mapboxgl-upstream/default-filesource-actor.tar.gz |
[core] WIP use threaded run loop in default file sourceupstream/default-filesource-actor
-rw-r--r-- | cmake/node.cmake | 2 | ||||
-rw-r--r-- | include/mbgl/storage/default_file_source.hpp | 12 | ||||
-rw-r--r-- | platform/default/default_file_source.cpp | 125 | ||||
-rw-r--r-- | platform/macos/config.cmake | 2 |
4 files changed, 99 insertions, 42 deletions
diff --git a/cmake/node.cmake b/cmake/node.cmake index 6833cb983f..0d40052b63 100644 --- a/cmake/node.cmake +++ b/cmake/node.cmake @@ -22,6 +22,8 @@ target_sources(mbgl-node PRIVATE platform/node/src/node_thread_pool.hpp PRIVATE platform/node/src/node_thread_pool.cpp PRIVATE platform/node/src/util/async_queue.hpp + PRIVATE platform/default/mbgl/util/threaded_run_loop.hpp + PRIVATE platform/default/mbgl/util/threaded_run_loop.cpp ) target_compile_options(mbgl-node diff --git a/include/mbgl/storage/default_file_source.hpp b/include/mbgl/storage/default_file_source.hpp index 01b8a3698d..8990c1c56f 100644 --- a/include/mbgl/storage/default_file_source.hpp +++ b/include/mbgl/storage/default_file_source.hpp @@ -9,12 +9,10 @@ namespace mbgl { +template <class> +class Actor; class Scheduler; -namespace util { -template <typename T> class Thread; -} // namespace util - class DefaultFileSource : public FileSource { public: /* @@ -145,9 +143,9 @@ public: class Impl; private: - // Shared so destruction is done on this thread - const std::shared_ptr<FileSource> assetFileSource; - const std::unique_ptr<util::Thread<Impl>> thread; + Scheduler& scheduler; + const std::unique_ptr<Scheduler> workerScheduler; + const std::unique_ptr<Actor<Impl>> worker; std::mutex cachedBaseURLMutex; std::string cachedBaseURL = mbgl::util::API_BASE_URL; diff --git a/platform/default/default_file_source.cpp b/platform/default/default_file_source.cpp index 3b69aeb857..81fabcb2c4 100644 --- a/platform/default/default_file_source.cpp +++ b/platform/default/default_file_source.cpp @@ -5,10 +5,13 @@ #include <mbgl/storage/offline_database.hpp> #include <mbgl/storage/offline_download.hpp> +#include <mbgl/actor/actor.hpp> #include <mbgl/util/platform.hpp> #include <mbgl/util/url.hpp> #include <mbgl/util/thread.hpp> #include <mbgl/util/work_request.hpp> +#include <mbgl/util/logging.hpp> +#include <mbgl/util/threaded_run_loop.hpp> #include <cassert> @@ -24,10 +27,56 @@ bool isAssetURL(const std::string& url) { namespace mbgl { +class RequestCallback { +public: + RequestCallback(ActorRef<RequestCallback>, FileSource::Callback callback_) + : callback(std::move(callback_)) { + } + + void operator() (Response response) { + Log::Info(Event::General, "RequestCallback::operator()"); + callback(response); + } + +// void operator() (Response response) const { +// Log::Info(Event::General, "RequestCallback::operator() const"); +// callback(response); +// } + +private: + FileSource::Callback callback; +}; + +//class ResourceTransformCallback { +//public: +// ResourceTransformCallback(ActorRef<ResourceTransformCallback>, OnlineFileSource::ResourceTransform callback_) +// : callback(std::move(callback_)) { +// } +// +// //std::function<std::unique_ptr<AsyncRequest>(Resource::Kind, std::string&&, std::function<void(std::string&&)>)> +// std::unique_ptr<AsyncRequest> operator() (Resource::Kind kind, std::string&& url, std::function<void(std::string&&)> resultCallback) { +// return callback(kind, std::move(url), resultCallback); +// } +// +//private: +// OnlineFileSource::ResourceTransform callback; +//}; + +//class AssetRequest { +//public: +// AssetRequest(ActorRef<AssetRequest>); +// ~AssetRequest(); // cancels +// +// void send(const Resource& resource, ActorRef<RequestCallback> callback) { +// // do whatever, then eventually: +// callback.invoke(&RequestCallback::operator(), response); +// } +//}; + class DefaultFileSource::Impl { public: - Impl(std::shared_ptr<FileSource> assetFileSource_, const std::string& cachePath, uint64_t maximumCacheSize) - : assetFileSource(assetFileSource_) + Impl(ActorRef<Impl> /*self*/, std::unique_ptr<FileSource> assetFileSource_, const std::string& cachePath, uint64_t maximumCacheSize) + : assetFileSource(std::move(assetFileSource_)) , localFileSource(std::make_unique<LocalFileSource>()) , offlineDatabase(cachePath, maximumCacheSize) { } @@ -106,13 +155,13 @@ public: getDownload(regionID).setState(state); } - void request(AsyncRequest* req, Resource resource, Callback callback) { + void request(AsyncRequest* req, Resource resource, ActorRef<RequestCallback> callback) { if (isAssetURL(resource.url)) { //Asset request - tasks[req] = assetFileSource->request(resource, callback); + tasks[req] = assetFileSource->request(resource, [&](Response response) { callback.invoke(&RequestCallback::operator(), std::move(response)); }); } else if (LocalFileSource::acceptsURL(resource.url)) { //Local file request - tasks[req] = localFileSource->request(resource, callback); + tasks[req] = localFileSource->request(resource, [&callback](Response response) { callback.invoke(&RequestCallback::operator(), response); }); } else { // Try the offline database Resource revalidation = resource; @@ -134,7 +183,7 @@ public: revalidation.priorModified = offlineResponse->modified; revalidation.priorExpires = offlineResponse->expires; revalidation.priorEtag = offlineResponse->etag; - callback(*offlineResponse); + callback.invoke(&RequestCallback::operator(), *offlineResponse); } } @@ -142,13 +191,14 @@ public: if (resource.necessity == Resource::Required) { tasks[req] = onlineFileSource.request(revalidation, [=] (Response onlineResponse) { this->offlineDatabase.put(revalidation, onlineResponse); - callback(onlineResponse); + //TODO callback.invoke(&RequestCallback::operator(), onlineResponse); }); } } } void cancel(AsyncRequest* req) { + Log::Info(Event::General, "DefaultFileSource::Impl::cancel"); tasks.erase(req); } @@ -170,8 +220,7 @@ private: std::make_unique<OfflineDownload>(regionID, offlineDatabase.getRegionDefinition(regionID), offlineDatabase, onlineFileSource)).first->second; } - // shared so that destruction is done on the creating thread - const std::shared_ptr<FileSource> assetFileSource; + const std::unique_ptr<FileSource> assetFileSource; const std::unique_ptr<FileSource> localFileSource; OfflineDatabase offlineDatabase; OnlineFileSource onlineFileSource; @@ -186,19 +235,20 @@ DefaultFileSource::DefaultFileSource(Scheduler& scheduler, : DefaultFileSource(scheduler, cachePath, std::make_unique<AssetFileSource>(scheduler, assetRoot), maximumCacheSize) { } -DefaultFileSource::DefaultFileSource(Scheduler&, +DefaultFileSource::DefaultFileSource(Scheduler& scheduler_, const std::string& cachePath, std::unique_ptr<FileSource>&& assetFileSource_, uint64_t maximumCacheSize) - : assetFileSource(std::move(assetFileSource_)) - , thread(std::make_unique<util::Thread<Impl>>(util::ThreadContext{"DefaultFileSource", util::ThreadPriority::Low}, - assetFileSource, cachePath, maximumCacheSize)) { + : scheduler(scheduler_) + , workerScheduler(std::make_unique<ThreadedRunLoop>("DefaultFileSource")) + , worker(std::make_unique<Actor<Impl>>(*workerScheduler, std::move(assetFileSource_), cachePath, maximumCacheSize)) { + } DefaultFileSource::~DefaultFileSource() = default; void DefaultFileSource::setAPIBaseURL(const std::string& baseURL) { - thread->invoke(&Impl::setAPIBaseURL, baseURL); + worker->invoke(&Impl::setAPIBaseURL, baseURL); { std::lock_guard<std::mutex> lock(cachedBaseURLMutex); @@ -212,7 +262,7 @@ std::string DefaultFileSource::getAPIBaseURL() { } void DefaultFileSource::setAccessToken(const std::string& accessToken) { - thread->invoke(&Impl::setAccessToken, accessToken); + worker->invoke(&Impl::setAccessToken, accessToken); { std::lock_guard<std::mutex> lock(cachedAccessTokenMutex); @@ -228,83 +278,88 @@ std::string DefaultFileSource::getAccessToken() { void DefaultFileSource::setResourceTransform(std::function<std::string(Resource::Kind, std::string&&)> transform) { if (transform) { auto loop = util::RunLoop::Get(); - thread->invoke(&Impl::setResourceTransform, [loop, transform](Resource::Kind kind_, std::string&& url_, auto callback_) { + worker->invoke(&Impl::setResourceTransform, [loop, transform](Resource::Kind kind_, std::string&& url_, auto callback_) { return loop->invokeWithCallback([transform](Resource::Kind kind, std::string&& url, auto callback) { callback(transform(kind, std::move(url))); }, kind_, std::move(url_), callback_); }); } else { - thread->invoke(&Impl::setResourceTransform, nullptr); + worker->invoke(&Impl::setResourceTransform, nullptr); } } std::unique_ptr<AsyncRequest> DefaultFileSource::request(const Resource& resource, Callback callback) { + Log::Info(Event::General, "DefaultFileSource::request: %s", resource.url.c_str()); class DefaultFileRequest : public AsyncRequest { public: - DefaultFileRequest(Resource resource_, FileSource::Callback callback_, util::Thread<DefaultFileSource::Impl>& thread_) - : thread(thread_), - workRequest(thread.invokeWithCallback(&DefaultFileSource::Impl::request, this, resource_, callback_)) { + DefaultFileRequest(Scheduler& scheduler, Resource resource_, FileSource::Callback callback_, ActorRef<DefaultFileSource::Impl> worker_) + : worker(worker_) + , callback(scheduler, callback_) { + worker.invoke(&DefaultFileSource::Impl::request, this, resource_, callback.self()); } ~DefaultFileRequest() override { - thread.invoke(&DefaultFileSource::Impl::cancel, this); + worker.invoke(&DefaultFileSource::Impl::cancel, this); } - util::Thread<DefaultFileSource::Impl>& thread; - std::unique_ptr<AsyncRequest> workRequest; + private: + ActorRef<DefaultFileSource::Impl> worker; + Actor<RequestCallback> callback; }; - return std::make_unique<DefaultFileRequest>(resource, callback, *thread); + return std::make_unique<DefaultFileRequest>(scheduler, resource, callback, worker->self()); } void DefaultFileSource::listOfflineRegions(std::function<void (std::exception_ptr, optional<std::vector<OfflineRegion>>)> callback) { - thread->invoke(&Impl::listRegions, callback); + worker->invoke(&Impl::listRegions, callback); } void DefaultFileSource::createOfflineRegion(const OfflineRegionDefinition& definition, const OfflineRegionMetadata& metadata, std::function<void (std::exception_ptr, optional<OfflineRegion>)> callback) { - thread->invoke(&Impl::createRegion, definition, metadata, callback); + worker->invoke(&Impl::createRegion, definition, metadata, callback); } void DefaultFileSource::updateOfflineMetadata(const int64_t regionID, const OfflineRegionMetadata& metadata, std::function<void (std::exception_ptr, optional<OfflineRegionMetadata>)> callback) { - thread->invoke(&Impl::updateMetadata, regionID, metadata, callback); + worker->invoke(&Impl::updateMetadata, regionID, metadata, callback); } void DefaultFileSource::deleteOfflineRegion(OfflineRegion&& region, std::function<void (std::exception_ptr)> callback) { - thread->invoke(&Impl::deleteRegion, std::move(region), callback); + worker->invoke(&Impl::deleteRegion, std::move(region), callback); } void DefaultFileSource::setOfflineRegionObserver(OfflineRegion& region, std::unique_ptr<OfflineRegionObserver> observer) { - thread->invoke(&Impl::setRegionObserver, region.getID(), std::move(observer)); + worker->invoke(&Impl::setRegionObserver, region.getID(), std::move(observer)); } void DefaultFileSource::setOfflineRegionDownloadState(OfflineRegion& region, OfflineRegionDownloadState state) { - thread->invoke(&Impl::setRegionDownloadState, region.getID(), state); + worker->invoke(&Impl::setRegionDownloadState, region.getID(), state); } void DefaultFileSource::getOfflineRegionStatus(OfflineRegion& region, std::function<void (std::exception_ptr, optional<OfflineRegionStatus>)> callback) const { - thread->invoke(&Impl::getRegionStatus, region.getID(), callback); + worker->invoke(&Impl::getRegionStatus, region.getID(), callback); } void DefaultFileSource::setOfflineMapboxTileCountLimit(uint64_t limit) const { - thread->invokeSync(&Impl::setOfflineMapboxTileCountLimit, limit); + // TODO: was sync, why? + worker->invoke(&Impl::setOfflineMapboxTileCountLimit, limit); } void DefaultFileSource::pause() { - thread->pause(); + //TODO: thread->pause(); } void DefaultFileSource::resume() { - thread->resume(); + //TODO: thread->resume(); } // For testing only: void DefaultFileSource::put(const Resource& resource, const Response& response) { - thread->invokeSync(&Impl::put, resource, response); + // TODO: was sync, callback? + worker->invoke(&Impl::put, resource, response); } } // namespace mbgl diff --git a/platform/macos/config.cmake b/platform/macos/config.cmake index 8dc3c38245..a099017a44 100644 --- a/platform/macos/config.cmake +++ b/platform/macos/config.cmake @@ -16,6 +16,8 @@ macro(mbgl_platform_core) PRIVATE platform/default/default_file_source.cpp PRIVATE platform/default/local_file_source.cpp PRIVATE platform/default/online_file_source.cpp + PRIVATE platform/default/mbgl/util/threaded_run_loop.cpp + PRIVATE platform/default/mbgl/util/threaded_run_loop.hpp # Default styles PRIVATE platform/default/mbgl/util/default_styles.hpp |