diff options
Diffstat (limited to 'platform/default/online_file_source.cpp')
-rw-r--r-- | platform/default/online_file_source.cpp | 320 |
1 files changed, 163 insertions, 157 deletions
diff --git a/platform/default/online_file_source.cpp b/platform/default/online_file_source.cpp index 988147a373..367777d623 100644 --- a/platform/default/online_file_source.cpp +++ b/platform/default/online_file_source.cpp @@ -1,7 +1,6 @@ #include <mbgl/storage/online_file_source.hpp> #include <mbgl/storage/http_context_base.hpp> #include <mbgl/storage/network_status.hpp> -#include <mbgl/storage/sqlite_cache.hpp> #include <mbgl/storage/response.hpp> #include <mbgl/platform/log.hpp> @@ -16,42 +15,27 @@ #include <algorithm> #include <cassert> -#include <set> +#include <list> +#include <unordered_set> #include <unordered_map> namespace mbgl { -class OnlineFileRequest : public FileRequest { -public: - OnlineFileRequest(OnlineFileSource& fileSource_) - : fileSource(fileSource_) { - } - - ~OnlineFileRequest() { - fileSource.cancel(this); - } - - OnlineFileSource& fileSource; - std::unique_ptr<WorkRequest> workRequest; -}; - class OnlineFileRequestImpl : public util::noncopyable { public: using Callback = std::function<void (Response)>; - OnlineFileRequestImpl(const Resource&, Callback, OnlineFileSource::Impl&); + OnlineFileRequestImpl(FileRequest*, const Resource&, Callback, OnlineFileSource::Impl&); ~OnlineFileRequestImpl(); void networkIsReachableAgain(OnlineFileSource::Impl&); + void schedule(OnlineFileSource::Impl&, bool forceImmediate = false); + void completed(OnlineFileSource::Impl&, Response); -private: - void scheduleCacheRequest(OnlineFileSource::Impl&); - void scheduleRealRequest(OnlineFileSource::Impl&, bool forceImmediate = false); - + FileRequest* key; Resource resource; - std::unique_ptr<WorkRequest> cacheRequest; - HTTPRequestBase* realRequest = nullptr; - util::Timer realRequestTimer; + HTTPRequestBase* request = nullptr; + util::Timer timer; Callback callback; // Counts the number of subsequent failed requests. We're using this value for exponential @@ -62,140 +46,167 @@ private: class OnlineFileSource::Impl { public: - using Callback = std::function<void (Response)>; + // Dummy parameter is a workaround for a gcc 4.9 bug. + Impl(int) { + NetworkStatus::Subscribe(&reachability); + } + + ~Impl() { + NetworkStatus::Unsubscribe(&reachability); + } + + void request(FileRequest* key, Resource resource, Callback callback) { + allRequests[key] = std::make_unique<OnlineFileRequestImpl>(key, resource, callback, *this); + } - Impl(SQLiteCache*); - ~Impl(); + void cancel(FileRequest* key) { + allRequests.erase(key); + if (activeRequests.erase(key)) { + activatePendingRequest(); + } else { + auto it = pendingRequestsMap.find(key); + if (it != pendingRequestsMap.end()) { + pendingRequestsList.erase(it->second); + pendingRequestsMap.erase(it); + } + } + } - void networkIsReachableAgain(); + void activateOrQueueRequest(OnlineFileRequestImpl* impl) { + assert(allRequests.find(impl->key) != allRequests.end()); + assert(activeRequests.find(impl->key) == activeRequests.end()); + assert(!impl->request); - void add(Resource, FileRequest*, Callback); - void cancel(FileRequest*); + if (activeRequests.size() >= HTTPContextBase::maximumConcurrentRequests()) { + queueRequest(impl); + } else { + activateRequest(impl); + } + } + + void queueRequest(OnlineFileRequestImpl* impl) { + auto it = pendingRequestsList.insert(pendingRequestsList.end(), impl->key); + pendingRequestsMap.emplace(impl->key, std::move(it)); + } + + void activateRequest(OnlineFileRequestImpl* impl) { + activeRequests.insert(impl->key); + impl->request = httpContext->createRequest(impl->resource, [=] (Response response) { + impl->request = nullptr; + activeRequests.erase(impl->key); + activatePendingRequest(); + impl->completed(*this, response); + }); + } + + void activatePendingRequest() { + if (pendingRequestsList.empty()) { + return; + } + + FileRequest* key = pendingRequestsList.front(); + pendingRequestsList.pop_front(); + + pendingRequestsMap.erase(key); + + auto it = allRequests.find(key); + assert(it != allRequests.end()); + activateRequest(it->second.get()); + } private: - friend OnlineFileRequestImpl; + void networkIsReachableAgain() { + for (auto& req : allRequests) { + req.second->networkIsReachableAgain(*this); + } + } - std::unordered_map<FileRequest*, std::unique_ptr<OnlineFileRequestImpl>> pending; - SQLiteCache* const cache; - const std::unique_ptr<HTTPContextBase> httpContext; - util::AsyncTask reachability; + /** + * The lifetime of a request is: + * + * 1. Waiting for timeout (revalidation or retry) + * 2. Pending (waiting for room in the active set) + * 3. Active (open network connection) + * 4. Back to #1 + * + * Requests in any state are in `allRequests`. Requests in the pending state are in + * `pendingRequests`. Requests in the active state are in `activeRequests`. + */ + std::unordered_map<FileRequest*, std::unique_ptr<OnlineFileRequestImpl>> allRequests; + std::list<FileRequest*> pendingRequestsList; + std::unordered_map<FileRequest*, std::list<FileRequest*>::iterator> pendingRequestsMap; + std::unordered_set<FileRequest*> activeRequests; + + const std::unique_ptr<HTTPContextBase> httpContext { HTTPContextBase::createContext() }; + util::AsyncTask reachability { std::bind(&Impl::networkIsReachableAgain, this) }; }; -OnlineFileSource::OnlineFileSource(SQLiteCache* cache) +OnlineFileSource::OnlineFileSource() : thread(std::make_unique<util::Thread<Impl>>( - util::ThreadContext{ "OnlineFileSource", util::ThreadType::Unknown, util::ThreadPriority::Low }, - cache)) { + util::ThreadContext{ "OnlineFileSource", util::ThreadType::Unknown, util::ThreadPriority::Low }, 0)) { } OnlineFileSource::~OnlineFileSource() = default; std::unique_ptr<FileRequest> OnlineFileSource::request(const Resource& resource, Callback callback) { - if (!callback) { - throw util::MisuseException("FileSource callback can't be empty"); - } - - std::string url; + Resource res = resource; switch (resource.kind) { + case Resource::Kind::Unknown: + break; + case Resource::Kind::Style: - url = mbgl::util::mapbox::normalizeStyleURL(resource.url, accessToken); + res.url = mbgl::util::mapbox::normalizeStyleURL(resource.url, accessToken); break; case Resource::Kind::Source: - url = util::mapbox::normalizeSourceURL(resource.url, accessToken); + res.url = util::mapbox::normalizeSourceURL(resource.url, accessToken); break; case Resource::Kind::Glyphs: - url = util::mapbox::normalizeGlyphsURL(resource.url, accessToken); + res.url = util::mapbox::normalizeGlyphsURL(resource.url, accessToken); break; case Resource::Kind::SpriteImage: case Resource::Kind::SpriteJSON: - url = util::mapbox::normalizeSpriteURL(resource.url, accessToken); + res.url = util::mapbox::normalizeSpriteURL(resource.url, accessToken); break; - default: - url = resource.url; + case Resource::Kind::Tile: + res.url = util::mapbox::normalizeTileURL(resource.url, accessToken); + break; } - Resource res { resource.kind, url }; - auto req = std::make_unique<OnlineFileRequest>(*this); - req->workRequest = thread->invokeWithCallback(&Impl::add, callback, res, req.get()); - return std::move(req); -} - -void OnlineFileSource::cancel(FileRequest* req) { - thread->invoke(&Impl::cancel, req); -} - -// ----- Impl ----- - -OnlineFileSource::Impl::Impl(SQLiteCache* cache_) - : cache(cache_), - httpContext(HTTPContextBase::createContext()), - reachability(std::bind(&Impl::networkIsReachableAgain, this)) { - // Subscribe to network status changes, but make sure that this async handle doesn't keep the - // loop alive; otherwise our app wouldn't terminate. After all, we only need status change - // notifications when our app is still running. - NetworkStatus::Subscribe(&reachability); -} - -OnlineFileSource::Impl::~Impl() { - NetworkStatus::Unsubscribe(&reachability); -} + class OnlineFileRequest : public FileRequest { + public: + OnlineFileRequest(Resource resource_, FileSource::Callback callback_, util::Thread<OnlineFileSource::Impl>& thread_) + : thread(thread_), + workRequest(thread.invokeWithCallback(&OnlineFileSource::Impl::request, callback_, this, resource_)) { + } -void OnlineFileSource::Impl::networkIsReachableAgain() { - for (auto& req : pending) { - req.second->networkIsReachableAgain(*this); - } -} + ~OnlineFileRequest() { + thread.invoke(&OnlineFileSource::Impl::cancel, this); + } -void OnlineFileSource::Impl::add(Resource resource, FileRequest* req, Callback callback) { - pending[req] = std::make_unique<OnlineFileRequestImpl>(resource, callback, *this); -} + util::Thread<OnlineFileSource::Impl>& thread; + std::unique_ptr<WorkRequest> workRequest; + }; -void OnlineFileSource::Impl::cancel(FileRequest* req) { - pending.erase(req); + return std::make_unique<OnlineFileRequest>(res, callback, *thread); } -// ----- OnlineFileRequest ----- - -OnlineFileRequestImpl::OnlineFileRequestImpl(const Resource& resource_, Callback callback_, OnlineFileSource::Impl& impl) - : resource(resource_), - callback(callback_) { - if (impl.cache) { - scheduleCacheRequest(impl); - } else { - scheduleRealRequest(impl, true); - } +OnlineFileRequestImpl::OnlineFileRequestImpl(FileRequest* key_, const Resource& resource_, Callback callback_, OnlineFileSource::Impl& impl) + : key(key_), + resource(resource_), + callback(std::move(callback_)) { + // Force an immediate first request if we don't have an expiration time. + schedule(impl, !resource.priorExpires); } OnlineFileRequestImpl::~OnlineFileRequestImpl() { - if (realRequest) { - realRequest->cancel(); - realRequest = nullptr; + if (request) { + request->cancel(); } - // realRequestTimer and cacheRequest are automatically canceled upon destruction. -} - -void OnlineFileRequestImpl::scheduleCacheRequest(OnlineFileSource::Impl& impl) { - // Check the cache for existing data so that we can potentially - // revalidate the information without having to redownload everything. - cacheRequest = impl.cache->get(resource, [this, &impl](std::shared_ptr<Response> response) { - cacheRequest = nullptr; - - if (response) { - resource.priorModified = response->modified; - resource.priorExpires = response->expires; - resource.priorEtag = response->etag; - callback(*response); - } - - // Force immediate revalidation if we don't have a cached response, or the cached - // response does not have an expiration time. Otherwise revalidation will happen in - // the normal scheduling flow. - scheduleRealRequest(impl, !response || !response->expires); - }); } static Duration errorRetryTimeout(Response::Error::Reason failedRequestReason, uint32_t failedRequests) { @@ -220,8 +231,8 @@ static Duration expirationTimeout(optional<SystemTimePoint> expires) { } } -void OnlineFileRequestImpl::scheduleRealRequest(OnlineFileSource::Impl& impl, bool forceImmediate) { - if (realRequest) { +void OnlineFileRequestImpl::schedule(OnlineFileSource::Impl& impl, bool forceImmediate) { + if (request) { // There's already a request in progress; don't start another one. return; } @@ -237,55 +248,50 @@ void OnlineFileRequestImpl::scheduleRealRequest(OnlineFileSource::Impl& impl, bo return; } - realRequestTimer.start(timeout, Duration::zero(), [this, &impl] { - assert(!realRequest); - realRequest = impl.httpContext->createRequest(resource, [this, &impl](Response response) { - realRequest = nullptr; - - // If we didn't get various caching headers in the response, continue using the - // previous values. Otherwise, update the previous values to the new values. + timer.start(timeout, Duration::zero(), [&] { + impl.activateOrQueueRequest(this); + }); +} - if (!response.modified) { - response.modified = resource.priorModified; - } else { - resource.priorModified = response.modified; - } +void OnlineFileRequestImpl::completed(OnlineFileSource::Impl& impl, Response response) { + // If we didn't get various caching headers in the response, continue using the + // previous values. Otherwise, update the previous values to the new values. - if (!response.expires) { - response.expires = resource.priorExpires; - } else { - resource.priorExpires = response.expires; - } + if (!response.modified) { + response.modified = resource.priorModified; + } else { + resource.priorModified = response.modified; + } - if (!response.etag) { - response.etag = resource.priorEtag; - } else { - resource.priorEtag = response.etag; - } + if (!response.expires) { + response.expires = resource.priorExpires; + } else { + resource.priorExpires = response.expires; + } - if (impl.cache) { - impl.cache->put(resource, response); - } + if (!response.etag) { + response.etag = resource.priorEtag; + } else { + resource.priorEtag = response.etag; + } - if (response.error) { - failedRequests++; - failedRequestReason = response.error->reason; - } else { - failedRequests = 0; - failedRequestReason = Response::Error::Reason::Success; - } + if (response.error) { + failedRequests++; + failedRequestReason = response.error->reason; + } else { + failedRequests = 0; + failedRequestReason = Response::Error::Reason::Success; + } - callback(response); - scheduleRealRequest(impl); - }); - }); + callback(response); + schedule(impl); } void OnlineFileRequestImpl::networkIsReachableAgain(OnlineFileSource::Impl& impl) { // We need all requests to fail at least once before we are going to start retrying // them, and we only immediately restart request that failed due to connection issues. if (failedRequestReason == Response::Error::Reason::Connection) { - scheduleRealRequest(impl, true); + schedule(impl, true); } } |