From aa81738e6b6ee6f729e84eef1affbf867ceb6847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paczos?= Date: Mon, 30 Jul 2018 15:09:35 +0200 Subject: [android] synchronize and coalesce requests targeting the same TileID in the CustomGeometrySource --- .../style/sources/CustomGeometrySource.java | 194 ++++++++++++++++----- .../src/style/sources/custom_geometry_source.cpp | 19 +- .../src/style/sources/custom_geometry_source.hpp | 1 + 3 files changed, 170 insertions(+), 44 deletions(-) diff --git a/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java b/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java index 6c0b76f00e..a44d7cab1a 100644 --- a/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java +++ b/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java @@ -14,13 +14,14 @@ import com.mapbox.mapboxsdk.style.expressions.Expression; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -28,16 +29,26 @@ import java.util.concurrent.locks.ReentrantLock; /** * Custom Vector Source, allows using FeatureCollections. + *

+ * CustomGeometrySource uses a coalescing model for frequent data updates targeting the same tile id, + * which means, that the in-progress request as well as the last scheduled request are guaranteed to finish. + * Any requests scheduled meanwhile can be canceled. */ -@UiThread public class CustomGeometrySource extends Source { public static final String THREAD_PREFIX = "CustomGeom"; public static final int THREAD_POOL_LIMIT = 4; private static final AtomicInteger poolCount = new AtomicInteger(); private final Lock executorLock = new ReentrantLock(); - private ExecutorService executor; + private ThreadPoolExecutor executor; private GeometryTileProvider provider; - private final Map cancelledTileRequests = new ConcurrentHashMap<>(); + private final Map awaitingTasksMap = new HashMap<>(); + + /** + * A map containing in-progress requests targeting distinct tiles. + * A request is considered in-progress when it's started by the ThreadPoolExecutor. + * A request is marked as done when the data is passed from the JNI layer to the core, after the features conversion. + */ + private final Map inProgressTasksMap = new HashMap<>(); /** * Create a CustomGeometrySource @@ -45,6 +56,7 @@ public class CustomGeometrySource extends Source { * @param id The source id. * @param provider The tile provider that returns geometry data for this source. */ + @UiThread public CustomGeometrySource(String id, GeometryTileProvider provider) { this(id, provider, new CustomGeometrySourceOptions()); } @@ -57,6 +69,7 @@ public class CustomGeometrySource extends Source { * @param provider The tile provider that returns geometry data for this source. * @param options CustomGeometrySourceOptions. */ + @UiThread public CustomGeometrySource(String id, GeometryTileProvider provider, CustomGeometrySourceOptions options) { super(); this.provider = provider; @@ -71,7 +84,6 @@ public class CustomGeometrySource extends Source { * @param bounds The region in which features should be invalidated at all zoom levels */ public void invalidateRegion(LatLngBounds bounds) { - checkThread(); nativeInvalidateBounds(bounds); } @@ -84,7 +96,6 @@ public class CustomGeometrySource extends Source { * @param y Tile Y coordinate. */ public void invalidateTile(int zoomLevel, int x, int y) { - checkThread(); nativeInvalidateTile(zoomLevel, x, y); } @@ -99,7 +110,6 @@ public class CustomGeometrySource extends Source { * @param data Feature collection for the tile. */ public void setTileData(int zoomLevel, int x, int y, FeatureCollection data) { - checkThread(); nativeSetTileData(zoomLevel, x, y, data); } @@ -136,18 +146,40 @@ public class CustomGeometrySource extends Source { protected native void finalize() throws Throwable; private void setTileData(TileID tileId, FeatureCollection data) { - cancelledTileRequests.remove(tileId); nativeSetTileData(tileId.z, tileId.x, tileId.y, data); } + /** + * Tile data request can come from a number of different threads. + * To remove race condition for requests targeting the same tile id we are first checking if there is a request + * already enqueued, if yes, we are replacing it. + * Otherwise, we are checking if there is an in-progress request, if yes, + * we are creating or replacing an awaiting request. + * If none of the above, we are enqueueing the request. + */ @WorkerThread @Keep private void fetchTile(int z, int x, int y) { AtomicBoolean cancelFlag = new AtomicBoolean(false); TileID tileID = new TileID(z, x, y); - cancelledTileRequests.put(tileID, cancelFlag); - GeometryTileRequest request = new GeometryTileRequest(tileID, provider, this, cancelFlag); + GeometryTileRequest request = + new GeometryTileRequest(tileID, provider, awaitingTasksMap, inProgressTasksMap, this, cancelFlag); + + synchronized (awaitingTasksMap) { + synchronized (inProgressTasksMap) { + if (executor.getQueue().contains(request)) { + executor.remove(request); + executeRequest(request); + } else if (inProgressTasksMap.containsKey(tileID)) { + awaitingTasksMap.put(tileID, request); + } else { + executeRequest(request); + } + } + } + } + private void executeRequest(GeometryTileRequest request) { executorLock.lock(); try { if (executor != null && !executor.isShutdown()) { @@ -158,12 +190,34 @@ public class CustomGeometrySource extends Source { } } + /** + * We want to cancel only the oldest request, therefore, we are first checking if it's in progress, + * if not or if the currently in progress request has already been canceled, + * we are searching for any request in the executor's queue. + * Otherwise, we are removing an awaiting request targeting this tile id. + *

+ * {@link GeometryTileRequest#equals(Object)} is overridden to cover only the tile id, + * therefore, we can use an empty request to search the executor's queue. + */ @WorkerThread @Keep private void cancelTile(int z, int x, int y) { - AtomicBoolean cancelFlag = cancelledTileRequests.get(new TileID(z, x, y)); - if (cancelFlag != null) { - cancelFlag.compareAndSet(false, true); + TileID tileID = new TileID(z, x, y); + + synchronized (awaitingTasksMap) { + synchronized (inProgressTasksMap) { + AtomicBoolean cancelFlag = inProgressTasksMap.get(tileID); + // check if there is an in progress task + if (!(cancelFlag != null && cancelFlag.compareAndSet(false, true))) { + // if there is no tasks in progress or the in progress task was already cancelled, check the executor's queue + GeometryTileRequest emptyRequest = + new GeometryTileRequest(tileID, null, null, null, null, null); + if (!executor.getQueue().remove(emptyRequest)) { + // if there was no tasks in queue, remove from the awaiting map + awaitingTasksMap.remove(tileID); + } + } + } } } @@ -175,17 +229,19 @@ public class CustomGeometrySource extends Source { executor.shutdownNow(); } - executor = Executors.newFixedThreadPool(THREAD_POOL_LIMIT, new ThreadFactory() { - final AtomicInteger threadCount = new AtomicInteger(); - final int poolId = poolCount.getAndIncrement(); - - @Override - public Thread newThread(@NonNull Runnable runnable) { - return new Thread( - runnable, - String.format(Locale.US, "%s-%d-%d", THREAD_PREFIX, poolId, threadCount.getAndIncrement())); - } - }); + executor = new ThreadPoolExecutor(THREAD_POOL_LIMIT, THREAD_POOL_LIMIT, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + new ThreadFactory() { + final AtomicInteger threadCount = new AtomicInteger(); + final int poolId = poolCount.getAndIncrement(); + + @Override + public Thread newThread(@NonNull Runnable runnable) { + return new Thread( + runnable, + String.format(Locale.US, "%s-%d-%d", THREAD_PREFIX, poolId, threadCount.getAndIncrement())); + } + }); } finally { executorLock.unlock(); } @@ -201,12 +257,17 @@ public class CustomGeometrySource extends Source { } } - private static class TileID { + @Keep + private boolean isCancelled(int z, int x, int y) { + return inProgressTasksMap.get(new TileID(z, x, y)).get(); + } + + static class TileID { public int z; public int x; public int y; - public TileID(int _z, int _x, int _y) { + TileID(int _z, int _x, int _y) { z = _z; x = _x; y = _y; @@ -233,34 +294,83 @@ public class CustomGeometrySource extends Source { } } - private static class GeometryTileRequest implements Runnable { - private TileID id; - private GeometryTileProvider provider; - private WeakReference sourceRef; - private AtomicBoolean cancelled; - - public GeometryTileRequest(TileID _id, GeometryTileProvider p, - CustomGeometrySource _source, AtomicBoolean _cancelled) { + static class GeometryTileRequest implements Runnable { + private final TileID id; + private final GeometryTileProvider provider; + private final Map awaiting; + private final Map inProgress; + private final WeakReference sourceRef; + private final AtomicBoolean cancelled; + + GeometryTileRequest(TileID _id, GeometryTileProvider p, + Map awaiting, + Map m, + CustomGeometrySource _source, AtomicBoolean _cancelled) { id = _id; provider = p; + this.awaiting = awaiting; + inProgress = m; sourceRef = new WeakReference<>(_source); cancelled = _cancelled; } public void run() { - if (isCancelled()) { - return; + synchronized (awaiting) { + synchronized (inProgress) { + if (inProgress.containsKey(id)) { + // request targeting this tile id is already being processed, + // scenario that should occur only if the tile is being requested when + // another request is switching threads to execute + if (!awaiting.containsKey(id)) { + awaiting.put(id, this); + } + return; + } else { + inProgress.put(id, cancelled); + } + } + } + + if (!isCancelled()) { + FeatureCollection data = provider.getFeaturesForBounds(LatLngBounds.from(id.z, id.x, id.y), id.z); + CustomGeometrySource source = sourceRef.get(); + if (!isCancelled() && source != null && data != null) { + source.setTileData(id, data); + } } - FeatureCollection data = provider.getFeaturesForBounds(LatLngBounds.from(id.z, id.x, id.y), id.z); - CustomGeometrySource source = sourceRef.get(); - if (!isCancelled() && source != null && data != null) { - source.setTileData(id, data); + synchronized (awaiting) { + synchronized (inProgress) { + inProgress.remove(id); + + // executing the next request targeting the same tile + if (awaiting.containsKey(id)) { + GeometryTileRequest queuedRequest = awaiting.get(id); + CustomGeometrySource source = sourceRef.get(); + if (source != null && queuedRequest != null) { + source.executor.execute(queuedRequest); + } + + awaiting.remove(id); + } + } } } private Boolean isCancelled() { return cancelled.get(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GeometryTileRequest request = (GeometryTileRequest) o; + return id.equals(request.id); + } } } diff --git a/platform/android/src/style/sources/custom_geometry_source.cpp b/platform/android/src/style/sources/custom_geometry_source.cpp index 9c51f70ab5..e9a530f75b 100644 --- a/platform/android/src/style/sources/custom_geometry_source.cpp +++ b/platform/android/src/style/sources/custom_geometry_source.cpp @@ -102,6 +102,19 @@ namespace android { peer.Call(*_env, releaseThreads); }; + bool CustomGeometrySource::isCancelled(jni::jint z, + jni::jint x, + jni::jint y) { + android::UniqueEnv _env = android::AttachEnv(); + + static auto isCancelled = javaClass.GetMethod(*_env, "isCancelled"); + + assert(javaPeer); + + auto peer = jni::Cast(*_env, *javaPeer, javaClass); + return peer.Call(*_env, isCancelled, z, x, y); + }; + void CustomGeometrySource::setTileData(jni::JNIEnv& env, jni::jint z, jni::jint x, @@ -112,8 +125,10 @@ namespace android { // Convert the jni object auto geometry = geojson::FeatureCollection::convert(env, jFeatures); - // Update the core source - source.as()->CustomGeometrySource::setTileData(CanonicalTileID(z, x, y), GeoJSON(geometry)); + // Update the core source if not cancelled + if (!isCancelled(z, x ,y)) { + source.as()->CustomGeometrySource::setTileData(CanonicalTileID(z, x, y), GeoJSON(geometry)); + } } void CustomGeometrySource::invalidateTile(jni::JNIEnv&, jni::jint z, jni::jint x, jni::jint y) { diff --git a/platform/android/src/style/sources/custom_geometry_source.hpp b/platform/android/src/style/sources/custom_geometry_source.hpp index c38926a5b9..801f81b089 100644 --- a/platform/android/src/style/sources/custom_geometry_source.hpp +++ b/platform/android/src/style/sources/custom_geometry_source.hpp @@ -33,6 +33,7 @@ public: void fetchTile(const mbgl::CanonicalTileID& tileID); void cancelTile(const mbgl::CanonicalTileID& tileID); + bool isCancelled(jni::jint z, jni::jint x, jni::jint y); void startThreads(); void releaseThreads(); void setTileData(jni::JNIEnv& env, jni::jint z, jni::jint x, jni::jint y, jni::Object jf); -- cgit v1.2.1