summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorŁukasz Paczos <lukas.paczos@gmail.com>2018-07-30 15:09:35 +0200
committerŁukasz Paczos <lukas.paczos@gmail.com>2018-08-23 14:05:33 +0200
commitaa81738e6b6ee6f729e84eef1affbf867ceb6847 (patch)
treea0af6e9ab7dfdd9e44727ada4ae2bfb18b5bf1fc
parentcb1f781a1897826ea69eb9787ea4a059686450d9 (diff)
downloadqtlocation-mapboxgl-upstream/12508-custom-geomerty-source-race.tar.gz
[android] synchronize and coalesce requests targeting the same TileID in the CustomGeometrySourceupstream/12508-custom-geomerty-source-race
-rw-r--r--platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java194
-rw-r--r--platform/android/src/style/sources/custom_geometry_source.cpp19
-rw-r--r--platform/android/src/style/sources/custom_geometry_source.hpp1
3 files changed, 170 insertions, 44 deletions
diff --git a/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java b/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java
index 6c0b76f00e..a44d7cab1a 100644
--- a/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java
+++ b/platform/android/MapboxGLAndroidSDK/src/main/java/com/mapbox/mapboxsdk/style/sources/CustomGeometrySource.java
@@ -14,13 +14,14 @@ import com.mapbox.mapboxsdk.style.expressions.Expression;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -28,16 +29,26 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* Custom Vector Source, allows using FeatureCollections.
+ * <p>
+ * CustomGeometrySource uses a coalescing model for frequent data updates targeting the same tile id,
+ * which means, that the in-progress request as well as the last scheduled request are guaranteed to finish.
+ * Any requests scheduled meanwhile can be canceled.
*/
-@UiThread
public class CustomGeometrySource extends Source {
public static final String THREAD_PREFIX = "CustomGeom";
public static final int THREAD_POOL_LIMIT = 4;
private static final AtomicInteger poolCount = new AtomicInteger();
private final Lock executorLock = new ReentrantLock();
- private ExecutorService executor;
+ private ThreadPoolExecutor executor;
private GeometryTileProvider provider;
- private final Map<TileID, AtomicBoolean> cancelledTileRequests = new ConcurrentHashMap<>();
+ private final Map<TileID, GeometryTileRequest> awaitingTasksMap = new HashMap<>();
+
+ /**
+ * A map containing in-progress requests targeting distinct tiles.
+ * A request is considered in-progress when it's started by the ThreadPoolExecutor.
+ * A request is marked as done when the data is passed from the JNI layer to the core, after the features conversion.
+ */
+ private final Map<TileID, AtomicBoolean> inProgressTasksMap = new HashMap<>();
/**
* Create a CustomGeometrySource
@@ -45,6 +56,7 @@ public class CustomGeometrySource extends Source {
* @param id The source id.
* @param provider The tile provider that returns geometry data for this source.
*/
+ @UiThread
public CustomGeometrySource(String id, GeometryTileProvider provider) {
this(id, provider, new CustomGeometrySourceOptions());
}
@@ -57,6 +69,7 @@ public class CustomGeometrySource extends Source {
* @param provider The tile provider that returns geometry data for this source.
* @param options CustomGeometrySourceOptions.
*/
+ @UiThread
public CustomGeometrySource(String id, GeometryTileProvider provider, CustomGeometrySourceOptions options) {
super();
this.provider = provider;
@@ -71,7 +84,6 @@ public class CustomGeometrySource extends Source {
* @param bounds The region in which features should be invalidated at all zoom levels
*/
public void invalidateRegion(LatLngBounds bounds) {
- checkThread();
nativeInvalidateBounds(bounds);
}
@@ -84,7 +96,6 @@ public class CustomGeometrySource extends Source {
* @param y Tile Y coordinate.
*/
public void invalidateTile(int zoomLevel, int x, int y) {
- checkThread();
nativeInvalidateTile(zoomLevel, x, y);
}
@@ -99,7 +110,6 @@ public class CustomGeometrySource extends Source {
* @param data Feature collection for the tile.
*/
public void setTileData(int zoomLevel, int x, int y, FeatureCollection data) {
- checkThread();
nativeSetTileData(zoomLevel, x, y, data);
}
@@ -136,18 +146,40 @@ public class CustomGeometrySource extends Source {
protected native void finalize() throws Throwable;
private void setTileData(TileID tileId, FeatureCollection data) {
- cancelledTileRequests.remove(tileId);
nativeSetTileData(tileId.z, tileId.x, tileId.y, data);
}
+ /**
+ * Tile data request can come from a number of different threads.
+ * To remove race condition for requests targeting the same tile id we are first checking if there is a request
+ * already enqueued, if yes, we are replacing it.
+ * Otherwise, we are checking if there is an in-progress request, if yes,
+ * we are creating or replacing an awaiting request.
+ * If none of the above, we are enqueueing the request.
+ */
@WorkerThread
@Keep
private void fetchTile(int z, int x, int y) {
AtomicBoolean cancelFlag = new AtomicBoolean(false);
TileID tileID = new TileID(z, x, y);
- cancelledTileRequests.put(tileID, cancelFlag);
- GeometryTileRequest request = new GeometryTileRequest(tileID, provider, this, cancelFlag);
+ GeometryTileRequest request =
+ new GeometryTileRequest(tileID, provider, awaitingTasksMap, inProgressTasksMap, this, cancelFlag);
+
+ synchronized (awaitingTasksMap) {
+ synchronized (inProgressTasksMap) {
+ if (executor.getQueue().contains(request)) {
+ executor.remove(request);
+ executeRequest(request);
+ } else if (inProgressTasksMap.containsKey(tileID)) {
+ awaitingTasksMap.put(tileID, request);
+ } else {
+ executeRequest(request);
+ }
+ }
+ }
+ }
+ private void executeRequest(GeometryTileRequest request) {
executorLock.lock();
try {
if (executor != null && !executor.isShutdown()) {
@@ -158,12 +190,34 @@ public class CustomGeometrySource extends Source {
}
}
+ /**
+ * We want to cancel only the oldest request, therefore, we are first checking if it's in progress,
+ * if not or if the currently in progress request has already been canceled,
+ * we are searching for any request in the executor's queue.
+ * Otherwise, we are removing an awaiting request targeting this tile id.
+ * <p>
+ * {@link GeometryTileRequest#equals(Object)} is overridden to cover only the tile id,
+ * therefore, we can use an empty request to search the executor's queue.
+ */
@WorkerThread
@Keep
private void cancelTile(int z, int x, int y) {
- AtomicBoolean cancelFlag = cancelledTileRequests.get(new TileID(z, x, y));
- if (cancelFlag != null) {
- cancelFlag.compareAndSet(false, true);
+ TileID tileID = new TileID(z, x, y);
+
+ synchronized (awaitingTasksMap) {
+ synchronized (inProgressTasksMap) {
+ AtomicBoolean cancelFlag = inProgressTasksMap.get(tileID);
+ // check if there is an in progress task
+ if (!(cancelFlag != null && cancelFlag.compareAndSet(false, true))) {
+ // if there is no tasks in progress or the in progress task was already cancelled, check the executor's queue
+ GeometryTileRequest emptyRequest =
+ new GeometryTileRequest(tileID, null, null, null, null, null);
+ if (!executor.getQueue().remove(emptyRequest)) {
+ // if there was no tasks in queue, remove from the awaiting map
+ awaitingTasksMap.remove(tileID);
+ }
+ }
+ }
}
}
@@ -175,17 +229,19 @@ public class CustomGeometrySource extends Source {
executor.shutdownNow();
}
- executor = Executors.newFixedThreadPool(THREAD_POOL_LIMIT, new ThreadFactory() {
- final AtomicInteger threadCount = new AtomicInteger();
- final int poolId = poolCount.getAndIncrement();
-
- @Override
- public Thread newThread(@NonNull Runnable runnable) {
- return new Thread(
- runnable,
- String.format(Locale.US, "%s-%d-%d", THREAD_PREFIX, poolId, threadCount.getAndIncrement()));
- }
- });
+ executor = new ThreadPoolExecutor(THREAD_POOL_LIMIT, THREAD_POOL_LIMIT,
+ 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
+ new ThreadFactory() {
+ final AtomicInteger threadCount = new AtomicInteger();
+ final int poolId = poolCount.getAndIncrement();
+
+ @Override
+ public Thread newThread(@NonNull Runnable runnable) {
+ return new Thread(
+ runnable,
+ String.format(Locale.US, "%s-%d-%d", THREAD_PREFIX, poolId, threadCount.getAndIncrement()));
+ }
+ });
} finally {
executorLock.unlock();
}
@@ -201,12 +257,17 @@ public class CustomGeometrySource extends Source {
}
}
- private static class TileID {
+ @Keep
+ private boolean isCancelled(int z, int x, int y) {
+ return inProgressTasksMap.get(new TileID(z, x, y)).get();
+ }
+
+ static class TileID {
public int z;
public int x;
public int y;
- public TileID(int _z, int _x, int _y) {
+ TileID(int _z, int _x, int _y) {
z = _z;
x = _x;
y = _y;
@@ -233,34 +294,83 @@ public class CustomGeometrySource extends Source {
}
}
- private static class GeometryTileRequest implements Runnable {
- private TileID id;
- private GeometryTileProvider provider;
- private WeakReference<CustomGeometrySource> sourceRef;
- private AtomicBoolean cancelled;
-
- public GeometryTileRequest(TileID _id, GeometryTileProvider p,
- CustomGeometrySource _source, AtomicBoolean _cancelled) {
+ static class GeometryTileRequest implements Runnable {
+ private final TileID id;
+ private final GeometryTileProvider provider;
+ private final Map<TileID, GeometryTileRequest> awaiting;
+ private final Map<TileID, AtomicBoolean> inProgress;
+ private final WeakReference<CustomGeometrySource> sourceRef;
+ private final AtomicBoolean cancelled;
+
+ GeometryTileRequest(TileID _id, GeometryTileProvider p,
+ Map<TileID, GeometryTileRequest> awaiting,
+ Map<TileID, AtomicBoolean> m,
+ CustomGeometrySource _source, AtomicBoolean _cancelled) {
id = _id;
provider = p;
+ this.awaiting = awaiting;
+ inProgress = m;
sourceRef = new WeakReference<>(_source);
cancelled = _cancelled;
}
public void run() {
- if (isCancelled()) {
- return;
+ synchronized (awaiting) {
+ synchronized (inProgress) {
+ if (inProgress.containsKey(id)) {
+ // request targeting this tile id is already being processed,
+ // scenario that should occur only if the tile is being requested when
+ // another request is switching threads to execute
+ if (!awaiting.containsKey(id)) {
+ awaiting.put(id, this);
+ }
+ return;
+ } else {
+ inProgress.put(id, cancelled);
+ }
+ }
+ }
+
+ if (!isCancelled()) {
+ FeatureCollection data = provider.getFeaturesForBounds(LatLngBounds.from(id.z, id.x, id.y), id.z);
+ CustomGeometrySource source = sourceRef.get();
+ if (!isCancelled() && source != null && data != null) {
+ source.setTileData(id, data);
+ }
}
- FeatureCollection data = provider.getFeaturesForBounds(LatLngBounds.from(id.z, id.x, id.y), id.z);
- CustomGeometrySource source = sourceRef.get();
- if (!isCancelled() && source != null && data != null) {
- source.setTileData(id, data);
+ synchronized (awaiting) {
+ synchronized (inProgress) {
+ inProgress.remove(id);
+
+ // executing the next request targeting the same tile
+ if (awaiting.containsKey(id)) {
+ GeometryTileRequest queuedRequest = awaiting.get(id);
+ CustomGeometrySource source = sourceRef.get();
+ if (source != null && queuedRequest != null) {
+ source.executor.execute(queuedRequest);
+ }
+
+ awaiting.remove(id);
+ }
+ }
}
}
private Boolean isCancelled() {
return cancelled.get();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GeometryTileRequest request = (GeometryTileRequest) o;
+ return id.equals(request.id);
+ }
}
}
diff --git a/platform/android/src/style/sources/custom_geometry_source.cpp b/platform/android/src/style/sources/custom_geometry_source.cpp
index 9c51f70ab5..e9a530f75b 100644
--- a/platform/android/src/style/sources/custom_geometry_source.cpp
+++ b/platform/android/src/style/sources/custom_geometry_source.cpp
@@ -102,6 +102,19 @@ namespace android {
peer.Call(*_env, releaseThreads);
};
+ bool CustomGeometrySource::isCancelled(jni::jint z,
+ jni::jint x,
+ jni::jint y) {
+ android::UniqueEnv _env = android::AttachEnv();
+
+ static auto isCancelled = javaClass.GetMethod<jboolean (jni::jint, jni::jint, jni::jint)>(*_env, "isCancelled");
+
+ assert(javaPeer);
+
+ auto peer = jni::Cast(*_env, *javaPeer, javaClass);
+ return peer.Call(*_env, isCancelled, z, x, y);
+ };
+
void CustomGeometrySource::setTileData(jni::JNIEnv& env,
jni::jint z,
jni::jint x,
@@ -112,8 +125,10 @@ namespace android {
// Convert the jni object
auto geometry = geojson::FeatureCollection::convert(env, jFeatures);
- // Update the core source
- source.as<mbgl::style::CustomGeometrySource>()->CustomGeometrySource::setTileData(CanonicalTileID(z, x, y), GeoJSON(geometry));
+ // Update the core source if not cancelled
+ if (!isCancelled(z, x ,y)) {
+ source.as<mbgl::style::CustomGeometrySource>()->CustomGeometrySource::setTileData(CanonicalTileID(z, x, y), GeoJSON(geometry));
+ }
}
void CustomGeometrySource::invalidateTile(jni::JNIEnv&, jni::jint z, jni::jint x, jni::jint y) {
diff --git a/platform/android/src/style/sources/custom_geometry_source.hpp b/platform/android/src/style/sources/custom_geometry_source.hpp
index c38926a5b9..801f81b089 100644
--- a/platform/android/src/style/sources/custom_geometry_source.hpp
+++ b/platform/android/src/style/sources/custom_geometry_source.hpp
@@ -33,6 +33,7 @@ public:
void fetchTile(const mbgl::CanonicalTileID& tileID);
void cancelTile(const mbgl::CanonicalTileID& tileID);
+ bool isCancelled(jni::jint z, jni::jint x, jni::jint y);
void startThreads();
void releaseThreads();
void setTileData(jni::JNIEnv& env, jni::jint z, jni::jint x, jni::jint y, jni::Object<geojson::FeatureCollection> jf);