diff options
Diffstat (limited to 'platform/default/src/mbgl/storage/online_file_source.cpp')
-rw-r--r-- | platform/default/src/mbgl/storage/online_file_source.cpp | 369 |
1 files changed, 249 insertions, 120 deletions
diff --git a/platform/default/src/mbgl/storage/online_file_source.cpp b/platform/default/src/mbgl/storage/online_file_source.cpp index f4225bdf3f..0f5b438e1d 100644 --- a/platform/default/src/mbgl/storage/online_file_source.cpp +++ b/platform/default/src/mbgl/storage/online_file_source.cpp @@ -2,52 +2,59 @@ #include <mbgl/storage/http_file_source.hpp> #include <mbgl/storage/network_status.hpp> +#include <mbgl/storage/file_source_request.hpp> #include <mbgl/storage/resource_transform.hpp> #include <mbgl/storage/response.hpp> #include <mbgl/util/logging.hpp> #include <mbgl/actor/mailbox.hpp> +#include <mbgl/util/async_task.hpp> +#include <mbgl/util/chrono.hpp> #include <mbgl/util/constants.hpp> -#include <mbgl/util/mapbox.hpp> #include <mbgl/util/exception.hpp> -#include <mbgl/util/chrono.hpp> -#include <mbgl/util/async_task.hpp> +#include <mbgl/util/http_timeout.hpp> +#include <mbgl/util/mapbox.hpp> #include <mbgl/util/noncopyable.hpp> #include <mbgl/util/run_loop.hpp> +#include <mbgl/util/thread.hpp> #include <mbgl/util/timer.hpp> -#include <mbgl/util/http_timeout.hpp> #include <algorithm> #include <cassert> #include <list> -#include <unordered_set> +#include <map> #include <unordered_map> +#include <unordered_set> namespace mbgl { -static uint32_t DEFAULT_MAXIMUM_CONCURRENT_REQUESTS = 20; +// For testing only +constexpr const char* ONLINE_STATUS_KEY = "online-status"; -class OnlineFileRequest : public AsyncRequest { -public: - using Callback = std::function<void (Response)>; +class OnlineFileSourceThread; - OnlineFileRequest(Resource, Callback, OnlineFileSource::Impl&); - ~OnlineFileRequest() override; +struct OnlineFileRequest { + using Callback = std::function<void(Response)>; + + OnlineFileRequest(Resource resource_, Callback callback_, OnlineFileSourceThread& impl_); + ~OnlineFileRequest(); void networkIsReachableAgain(); void schedule(); void schedule(optional<Timestamp> expires); void completed(Response); - void setTransformedURL(const std::string&& url); + void setTransformedURL(const std::string& url); ActorRef<OnlineFileRequest> actor(); + void onCancel(std::function<void()>); - OnlineFileSource::Impl& impl; + OnlineFileSourceThread& impl; Resource resource; std::unique_ptr<AsyncRequest> request; util::Timer timer; Callback callback; + std::function<void()> cancelCallback = nullptr; std::shared_ptr<Mailbox> mailbox; // Counts the number of times a response was already expired when received. We're using @@ -62,101 +69,99 @@ public: optional<Timestamp> retryAfter; }; -class OnlineFileSource::Impl { +class OnlineFileSourceThread { public: - Impl() { + OnlineFileSourceThread() { NetworkStatus::Subscribe(&reachability); - setMaximumConcurrentRequests(DEFAULT_MAXIMUM_CONCURRENT_REQUESTS); + setMaximumConcurrentRequests(util::DEFAULT_MAXIMUM_CONCURRENT_REQUESTS); + } + + ~OnlineFileSourceThread() { NetworkStatus::Unsubscribe(&reachability); } + + void request(AsyncRequest* req, Resource resource, ActorRef<FileSourceRequest> ref) { + auto callback = [ref](const Response& res) { ref.invoke(&FileSourceRequest::setResponse, res); }; + tasks[req] = std::make_unique<OnlineFileRequest>(std::move(resource), std::move(callback), *this); } - ~Impl() { - NetworkStatus::Unsubscribe(&reachability); + void cancel(AsyncRequest* req) { + auto it = tasks.find(req); + assert(it != tasks.end()); + remove(it->second.get()); + tasks.erase(it); } - void add(OnlineFileRequest* request) { - allRequests.insert(request); + void add(OnlineFileRequest* req) { + allRequests.insert(req); if (resourceTransform) { // Request the ResourceTransform actor a new url and replace the resource url with the // transformed one before proceeding to schedule the request. - resourceTransform->invoke(&ResourceTransform::transform, - request->resource.kind, - std::move(request->resource.url), - [ref = request->actor()](const std::string&& url) { - ref.invoke(&OnlineFileRequest::setTransformedURL, url); - }); + resourceTransform.transform( + req->resource.kind, req->resource.url, [ref = req->actor()](const std::string& url) { + ref.invoke(&OnlineFileRequest::setTransformedURL, url); + }); } else { - request->schedule(); + req->schedule(); } } - void remove(OnlineFileRequest* request) { - allRequests.erase(request); - if (activeRequests.erase(request)) { + void remove(OnlineFileRequest* req) { + allRequests.erase(req); + if (activeRequests.erase(req)) { activatePendingRequest(); } else { - pendingRequests.remove(request); + pendingRequests.remove(req); } } - void activateOrQueueRequest(OnlineFileRequest* request) { - assert(allRequests.find(request) != allRequests.end()); - assert(activeRequests.find(request) == activeRequests.end()); - assert(!request->request); + void activateOrQueueRequest(OnlineFileRequest* req) { + assert(allRequests.find(req) != allRequests.end()); + assert(activeRequests.find(req) == activeRequests.end()); + assert(!req->request); if (activeRequests.size() >= getMaximumConcurrentRequests()) { - queueRequest(request); + queueRequest(req); } else { - activateRequest(request); + activateRequest(req); } } - void queueRequest(OnlineFileRequest* request) { - pendingRequests.insert(request); - } + void queueRequest(OnlineFileRequest* req) { pendingRequests.insert(req); } - void activateRequest(OnlineFileRequest* request) { + void activateRequest(OnlineFileRequest* req) { auto callback = [=](Response response) { - activeRequests.erase(request); - request->request.reset(); - request->completed(response); + activeRequests.erase(req); + req->request.reset(); + req->completed(response); activatePendingRequest(); }; - activeRequests.insert(request); + activeRequests.insert(req); if (online) { - request->request = httpFileSource.request(request->resource, callback); + req->request = httpFileSource.request(req->resource, callback); } else { Response response; response.error = std::make_unique<Response::Error>(Response::Error::Reason::Connection, "Online connectivity is disabled."); callback(response); } - } void activatePendingRequest() { + auto req = pendingRequests.pop(); - auto request = pendingRequests.pop(); - - if (request) { - activateRequest(*request); + if (req) { + activateRequest(*req); } } - bool isPending(OnlineFileRequest* request) { - return pendingRequests.contains(request); - } + bool isPending(OnlineFileRequest* req) { return pendingRequests.contains(req); } - bool isActive(OnlineFileRequest* request) { - return activeRequests.find(request) != activeRequests.end(); - } + bool isActive(OnlineFileRequest* req) { return activeRequests.find(req) != activeRequests.end(); } - void setResourceTransform(optional<ActorRef<ResourceTransform>>&& transform) { - resourceTransform = std::move(transform); - } + void setResourceTransform(ResourceTransform transform) { resourceTransform = std::move(transform); } - void setOnlineStatus(const bool status) { + void setOnlineStatus(bool status) { online = status; if (online) { networkIsReachableAgain(); @@ -171,20 +176,27 @@ public: maximumConcurrentRequests = maximumConcurrentRequests_; } + void setAPIBaseURL(const std::string& t) { apiBaseURL = t; } + std::string getAPIBaseURL() const { return apiBaseURL; } + + void setAccessToken(const std::string& t) { accessToken = t; } + std::string getAccessToken() const { return accessToken; } + private: + friend struct OnlineFileRequest; void networkIsReachableAgain() { // Notify regular priority requests. - for (auto& request : allRequests) { - if (request->resource.priority == Resource::Priority::Regular) { - request->networkIsReachableAgain(); + for (auto& req : allRequests) { + if (req->resource.priority == Resource::Priority::Regular) { + req->networkIsReachableAgain(); } } // Notify low priority requests. - for (auto& request : allRequests) { - if (request->resource.priority == Resource::Priority::Low) { - request->networkIsReachableAgain(); + for (auto& req : allRequests) { + if (req->resource.priority == Resource::Priority::Low) { + req->networkIsReachableAgain(); } } } @@ -231,7 +243,6 @@ private: } } - optional<OnlineFileRequest*> pop() { if (queue.empty()) { return optional<OnlineFileRequest*>(); @@ -252,7 +263,7 @@ private: }; - optional<ActorRef<ResourceTransform>> resourceTransform; + ResourceTransform resourceTransform; /** * The lifetime of a request is: @@ -274,56 +285,99 @@ private: bool online = true; uint32_t maximumConcurrentRequests; HTTPFileSource httpFileSource; - util::AsyncTask reachability { std::bind(&Impl::networkIsReachableAgain, this) }; + util::AsyncTask reachability{std::bind(&OnlineFileSourceThread::networkIsReachableAgain, this)}; + std::string accessToken; + std::string apiBaseURL = mbgl::util::API_BASE_URL; + std::map<AsyncRequest*, std::unique_ptr<OnlineFileRequest>> tasks; }; -OnlineFileSource::OnlineFileSource() - : impl(std::make_unique<Impl>()) { -} +class OnlineFileSource::Impl { +public: + Impl() : thread(std::make_unique<util::Thread<OnlineFileSourceThread>>("OnlineFileSource")) {} -OnlineFileSource::~OnlineFileSource() = default; + std::unique_ptr<AsyncRequest> request(Callback callback, Resource res) { + auto req = std::make_unique<FileSourceRequest>(std::move(callback)); + req->onCancel( + [actorRef = thread->actor(), req = req.get()]() { actorRef.invoke(&OnlineFileSourceThread::cancel, req); }); + thread->actor().invoke(&OnlineFileSourceThread::request, req.get(), std::move(res), req->actor()); + return std::move(req); + } -std::unique_ptr<AsyncRequest> OnlineFileSource::request(const Resource& resource, Callback callback) { - Resource res = resource; + void pause() { thread->pause(); } - switch (resource.kind) { - case Resource::Kind::Unknown: - case Resource::Kind::Image: - break; + void resume() { thread->resume(); } + + void setResourceTransform(ResourceTransform transform) { + thread->actor().invoke(&OnlineFileSourceThread::setResourceTransform, std::move(transform)); + } + + void setOnlineStatus(bool status) { thread->actor().invoke(&OnlineFileSourceThread::setOnlineStatus, status); } - case Resource::Kind::Style: - res.url = mbgl::util::mapbox::normalizeStyleURL(apiBaseURL, resource.url, accessToken); - break; + void setAPIBaseURL(const mapbox::base::Value& value) { + if (auto* baseURL = value.getString()) { + thread->actor().invoke(&OnlineFileSourceThread::setAPIBaseURL, *baseURL); + { + std::lock_guard<std::mutex> lock(cachedBaseURLMutex); + cachedBaseURL = *baseURL; + } + } else { + Log::Error(Event::General, "Invalid api-base-url property value type."); + } + } - case Resource::Kind::Source: - res.url = util::mapbox::normalizeSourceURL(apiBaseURL, resource.url, accessToken); - break; + std::string getAPIBaseURL() const { + std::lock_guard<std::mutex> lock(cachedBaseURLMutex); + return cachedBaseURL; + } - case Resource::Kind::Glyphs: - res.url = util::mapbox::normalizeGlyphsURL(apiBaseURL, resource.url, accessToken); - break; + void setMaximumConcurrentRequests(const mapbox::base::Value& value) { + if (auto* maximumConcurrentRequests = value.getUint()) { + assert(*maximumConcurrentRequests < std::numeric_limits<uint32_t>::max()); + const uint32_t maxConcurretnRequests = static_cast<uint32_t>(*maximumConcurrentRequests); + thread->actor().invoke(&OnlineFileSourceThread::setMaximumConcurrentRequests, maxConcurretnRequests); + { + std::lock_guard<std::mutex> lock(maximumConcurrentRequestsMutex); + cachedMaximumConcurrentRequests = maxConcurretnRequests; + } + } else { + Log::Error(Event::General, "Invalid max-concurrent-requests property value type."); + } + } - case Resource::Kind::SpriteImage: - case Resource::Kind::SpriteJSON: - res.url = util::mapbox::normalizeSpriteURL(apiBaseURL, resource.url, accessToken); - break; + uint32_t getMaximumConcurrentRequests() const { + std::lock_guard<std::mutex> lock(maximumConcurrentRequestsMutex); + return cachedMaximumConcurrentRequests; + } - case Resource::Kind::Tile: - res.url = util::mapbox::normalizeTileURL(apiBaseURL, resource.url, accessToken); - break; + void setAccessToken(const mapbox::base::Value& value) { + if (auto* accessToken = value.getString()) { + thread->actor().invoke(&OnlineFileSourceThread::setAccessToken, *accessToken); + { + std::lock_guard<std::mutex> lock(cachedAccessTokenMutex); + cachedAccessToken = *accessToken; + } + } else { + Log::Error(Event::General, "Invalid access-token property value type."); + } } - return std::make_unique<OnlineFileRequest>(std::move(res), std::move(callback), *impl); -} + std::string getAccessToken() const { + std::lock_guard<std::mutex> lock(cachedAccessTokenMutex); + return cachedAccessToken; + } -void OnlineFileSource::setResourceTransform(optional<ActorRef<ResourceTransform>>&& transform) { - impl->setResourceTransform(std::move(transform)); -} +private: + mutable std::mutex cachedAccessTokenMutex; + std::string cachedAccessToken; + mutable std::mutex cachedBaseURLMutex; + std::string cachedBaseURL = util::API_BASE_URL; + mutable std::mutex maximumConcurrentRequestsMutex; + uint32_t cachedMaximumConcurrentRequests = util::DEFAULT_MAXIMUM_CONCURRENT_REQUESTS; + const std::unique_ptr<util::Thread<OnlineFileSourceThread>> thread; +}; -OnlineFileRequest::OnlineFileRequest(Resource resource_, Callback callback_, OnlineFileSource::Impl& impl_) - : impl(impl_), - resource(std::move(resource_)), - callback(std::move(callback_)) { +OnlineFileRequest::OnlineFileRequest(Resource resource_, Callback callback_, OnlineFileSourceThread& impl_) + : impl(impl_), resource(std::move(resource_)), callback(std::move(callback_)) { impl.add(this); } @@ -337,12 +391,12 @@ void OnlineFileRequest::schedule() { } OnlineFileRequest::~OnlineFileRequest() { - impl.remove(this); + if (mailbox) { + mailbox->close(); + } } -Timestamp interpolateExpiration(const Timestamp& current, - optional<Timestamp> prior, - bool& expired) { +Timestamp interpolateExpiration(const Timestamp& current, optional<Timestamp> prior, bool& expired) { auto now = util::now(); if (current > now) { return current; @@ -383,9 +437,8 @@ void OnlineFileRequest::schedule(optional<Timestamp> expires) { // If we're not being asked for a forced refresh, calculate a timeout that depends on how many // consecutive errors we've encountered, and on the expiration time, if present. - Duration timeout = std::min( - http::errorRetryTimeout(failedRequestReason, failedRequests, retryAfter), - http::expirationTimeout(expires, expiredRequests)); + Duration timeout = std::min(http::errorRetryTimeout(failedRequestReason, failedRequests, retryAfter), + http::expirationTimeout(expires, expiredRequests)); if (timeout == Duration::max()) { return; @@ -469,7 +522,7 @@ void OnlineFileRequest::networkIsReachableAgain() { } } -void OnlineFileRequest::setTransformedURL(const std::string&& url) { +void OnlineFileRequest::setTransformedURL(const std::string& url) { resource.url = url; schedule(); } @@ -484,19 +537,95 @@ ActorRef<OnlineFileRequest> OnlineFileRequest::actor() { return ActorRef<OnlineFileRequest>(*this, mailbox); } -void OnlineFileSource::setMaximumConcurrentRequests(uint32_t maximumConcurrentRequests_) { - impl->setMaximumConcurrentRequests(maximumConcurrentRequests_); +void OnlineFileRequest::onCancel(std::function<void()> callback_) { + cancelCallback = std::move(callback_); +} + +OnlineFileSource::OnlineFileSource() : impl(std::make_unique<Impl>()) {} + +OnlineFileSource::~OnlineFileSource() = default; + +std::unique_ptr<AsyncRequest> OnlineFileSource::request(const Resource& resource, Callback callback) { + Resource res = resource; + + switch (resource.kind) { + case Resource::Kind::Unknown: + case Resource::Kind::Image: + break; + + case Resource::Kind::Style: + res.url = + mbgl::util::mapbox::normalizeStyleURL(impl->getAPIBaseURL(), resource.url, impl->getAccessToken()); + break; + + case Resource::Kind::Source: + res.url = util::mapbox::normalizeSourceURL(impl->getAPIBaseURL(), resource.url, impl->getAccessToken()); + break; + + case Resource::Kind::Glyphs: + res.url = util::mapbox::normalizeGlyphsURL(impl->getAPIBaseURL(), resource.url, impl->getAccessToken()); + break; + + case Resource::Kind::SpriteImage: + case Resource::Kind::SpriteJSON: + res.url = util::mapbox::normalizeSpriteURL(impl->getAPIBaseURL(), resource.url, impl->getAccessToken()); + break; + + case Resource::Kind::Tile: + res.url = util::mapbox::normalizeTileURL(impl->getAPIBaseURL(), resource.url, impl->getAccessToken()); + break; + } + + return impl->request(std::move(callback), std::move(res)); +} + +bool OnlineFileSource::canRequest(const Resource& resource) const { + return resource.hasLoadingMethod(Resource::LoadingMethod::Network) && + resource.url.rfind(mbgl::util::ASSET_PROTOCOL, 0) == std::string::npos && + resource.url.rfind(mbgl::util::FILE_PROTOCOL, 0) == std::string::npos; } -uint32_t OnlineFileSource::getMaximumConcurrentRequests() const { - return impl->getMaximumConcurrentRequests(); +void OnlineFileSource::pause() { + impl->pause(); } +void OnlineFileSource::resume() { + impl->resume(); +} + +void OnlineFileSource::setProperty(const std::string& key, const mapbox::base::Value& value) { + if (key == ACCESS_TOKEN_KEY) { + impl->setAccessToken(value); + } else if (key == API_BASE_URL_KEY) { + impl->setAPIBaseURL(value); + } else if (key == MAX_CONCURRENT_REQUESTS_KEY) { + impl->setMaximumConcurrentRequests(value); + } else if (key == ONLINE_STATUS_KEY) { + // For testing only + if (auto* boolValue = value.getBool()) { + impl->setOnlineStatus(*boolValue); + } + } else { + std::string message = "Resource provider does not support property " + key; + Log::Error(Event::General, message.c_str()); + } +} -// For testing only: +mapbox::base::Value OnlineFileSource::getProperty(const std::string& key) const { + if (key == ACCESS_TOKEN_KEY) { + return impl->getAccessToken(); + } else if (key == API_BASE_URL_KEY) { + return impl->getAPIBaseURL(); + } else if (key == MAX_CONCURRENT_REQUESTS_KEY) { + return impl->getMaximumConcurrentRequests(); + } + std::string message = "Resource provider does not support property " + key; + Log::Error(Event::General, message.c_str()); + return {}; +} -void OnlineFileSource::setOnlineStatus(const bool status) { - impl->setOnlineStatus(status); +void OnlineFileSource::setResourceTransform(ResourceTransform transform) { + impl->setResourceTransform(std::move(transform)); } } // namespace mbgl |