summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnand Thakker <anandthakker@users.noreply.github.com>2018-07-03 17:17:39 -0400
committerGitHub <noreply@github.com>2018-07-03 17:17:39 -0400
commitcfd436c287f4209d0d994042452ccbb552a6bd28 (patch)
tree6811590928d7ea19db8e8b3f9db8d1df54ba9965
parent840a5cf1207ed78df3302211a23d369dd3c12b89 (diff)
downloadqtlocation-mapboxgl-cfd436c287f4209d0d994042452ccbb552a6bd28.tar.gz
[core] Avoid blocking in Thread<Object> constructor (#12151)
* Introduce AspiringActor, EstablishedActor This pair of objects represents the two-phase (parent-thread / child-thread) construction that's needed to support constructing Thread<Object> without blocking until the child thread is up and running. An `AspiringActor<O>` is responsible for: - ownership of the actor's `Mailbox` - allocating the memory for (but *not* constructing) the target object `O` Using these two pieces--the mailbox and a stable address for `O`--an `AspiringActor<O>` can accept messages for the target object, or provide `ActorRef<O>`s that do so, before the object has actually been constructed by the corresponding `EstablishedActor<O>`. (Such messages are queued in the mailbox until after the object is constructed.) This allows for an `AspiringActor<O>` to be created and safely used by a thread other than the one on which the target object will (eventually) live. An `EstablishedActor<O>` is responsible for managing the lifetime of the target object `O` and the open/closed state of the parent's `mailbox`. The `O` object's lifetime is contained by that of its owning `EstablishedActor<O>`: the `EstablishedActor` constructor executes the `O` constructor via "placement new", constructing it at the address provided by the parent `AspiringActor`, and the `~EstablishedActor` destructor similarly executes the `~O` destructor (after closing the mailbox). `EstablishedActor` should therefore live entirely on the thread intended to own `O`. * Remove Actor#{invoke,ask}
-rw-r--r--cmake/core-files.cmake2
-rw-r--r--include/mbgl/actor/actor.hpp61
-rw-r--r--include/mbgl/actor/aspiring_actor.hpp60
-rw-r--r--include/mbgl/actor/established_actor.hpp80
-rw-r--r--include/mbgl/actor/mailbox.hpp21
-rw-r--r--include/mbgl/actor/scheduler.hpp5
-rw-r--r--include/mbgl/util/thread.hpp55
-rw-r--r--platform/android/src/snapshotter/map_snapshotter.cpp6
-rw-r--r--platform/android/src/snapshotter/map_snapshotter.hpp2
-rw-r--r--platform/darwin/src/MGLMapSnapshotter.mm2
-rw-r--r--platform/default/default_file_source.cpp11
-rw-r--r--platform/default/mbgl/map/map_snapshotter.cpp20
-rw-r--r--platform/default/mbgl/map/map_snapshotter.hpp4
-rw-r--r--src/mbgl/actor/mailbox.cpp35
-rw-r--r--src/mbgl/sprite/sprite_loader.cpp2
-rw-r--r--src/mbgl/style/sources/custom_geometry_source.cpp6
-rw-r--r--src/mbgl/tile/geometry_tile.cpp10
-rw-r--r--src/mbgl/tile/raster_dem_tile.cpp2
-rw-r--r--src/mbgl/tile/raster_tile.cpp2
-rw-r--r--test/actor/actor.test.cpp99
-rw-r--r--test/util/thread.test.cpp61
21 files changed, 421 insertions, 125 deletions
diff --git a/cmake/core-files.cmake b/cmake/core-files.cmake
index 66f569abc9..f4ee786df7 100644
--- a/cmake/core-files.cmake
+++ b/cmake/core-files.cmake
@@ -4,6 +4,8 @@ set(MBGL_CORE_FILES
# actor
include/mbgl/actor/actor.hpp
include/mbgl/actor/actor_ref.hpp
+ include/mbgl/actor/aspiring_actor.hpp
+ include/mbgl/actor/established_actor.hpp
include/mbgl/actor/mailbox.hpp
include/mbgl/actor/message.hpp
include/mbgl/actor/scheduler.hpp
diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp
index a0df19208e..0052fad242 100644
--- a/include/mbgl/actor/actor.hpp
+++ b/include/mbgl/actor/actor.hpp
@@ -1,5 +1,7 @@
#pragma once
+#include <mbgl/actor/aspiring_actor.hpp>
+#include <mbgl/actor/established_actor.hpp>
#include <mbgl/actor/mailbox.hpp>
#include <mbgl/actor/message.hpp>
#include <mbgl/actor/actor_ref.hpp>
@@ -8,6 +10,7 @@
#include <memory>
#include <future>
#include <type_traits>
+#include <cassert>
namespace mbgl {
@@ -34,65 +37,33 @@ namespace mbgl {
the lifetime of the owning Actor, and sending a message to a `Ref` whose `Actor` has died is
a no-op. (In the future, a dead-letters queue or log may be implemented.)
- Construction and destruction of an actor is currently synchronous: the corresponding `O`
+ Construction and destruction of an Actor is synchronous: the corresponding `O`
object is constructed synchronously by the `Actor` constructor, and destructed synchronously
by the `~Actor` destructor, after ensuring that the `O` is not currently receiving an
- asynchronous message. (Construction and destruction may change to be asynchronous in the
- future.) The constructor of `O` is passed an `ActorRef<O>` referring to itself (which it
- can use to self-send messages), followed by the forwarded arguments passed to `Actor<O>`.
+ asynchronous message. The constructor of `O` is passed an `ActorRef<O>` referring to itself
+ (which it can use to self-send messages), followed by the forwarded arguments passed to
+ `Actor<O>`. Asynchronous object construction can be accomplished by directly using the
+ lower-level types, `AspiringActor<O>` and `EstablishedActor<O>`.
Please don't send messages that contain shared pointers or references. That subverts the
purpose of the actor model: prohibiting direct concurrent access to shared state.
*/
-
template <class Object>
-class Actor : public util::noncopyable {
+class Actor {
public:
+ template <class... Args>
+ Actor(Scheduler& scheduler, Args&&... args)
+ : target(scheduler, parent, std::forward<Args>(args)...) {}
- // Enabled for Objects with a constructor taking ActorRef<Object> as the first parameter
- template <typename U = Object, class... Args, typename std::enable_if<std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr>
- Actor(Scheduler& scheduler, Args&&... args_)
- : mailbox(std::make_shared<Mailbox>(scheduler)),
- object(self(), std::forward<Args>(args_)...) {
- }
-
- // Enabled for plain Objects
- template<typename U = Object, class... Args, typename std::enable_if<!std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr>
- Actor(Scheduler& scheduler, Args&& ... args_)
- : mailbox(std::make_shared<Mailbox>(scheduler)), object(std::forward<Args>(args_)...) {
- }
-
- ~Actor() {
- mailbox->close();
- }
-
- template <typename Fn, class... Args>
- void invoke(Fn fn, Args&&... args) {
- mailbox->push(actor::makeMessage(object, fn, std::forward<Args>(args)...));
- }
-
- template <typename Fn, class... Args>
- auto ask(Fn fn, Args&&... args) {
- // Result type is deduced from the function's return type
- using ResultType = typename std::result_of<decltype(fn)(Object, Args...)>::type;
-
- std::promise<ResultType> promise;
- auto future = promise.get_future();
- mailbox->push(actor::makeMessage(std::move(promise), object, fn, std::forward<Args>(args)...));
- return future;
- }
+ Actor(const Actor&) = delete;
ActorRef<std::decay_t<Object>> self() {
- return ActorRef<std::decay_t<Object>>(object, mailbox);
- }
-
- operator ActorRef<std::decay_t<Object>>() {
- return self();
+ return parent.self();
}
private:
- std::shared_ptr<Mailbox> mailbox;
- Object object;
+ AspiringActor<Object> parent;
+ EstablishedActor<Object> target;
};
} // namespace mbgl
diff --git a/include/mbgl/actor/aspiring_actor.hpp b/include/mbgl/actor/aspiring_actor.hpp
new file mode 100644
index 0000000000..6c410cdfca
--- /dev/null
+++ b/include/mbgl/actor/aspiring_actor.hpp
@@ -0,0 +1,60 @@
+#pragma once
+
+#include <mbgl/actor/mailbox.hpp>
+#include <mbgl/actor/message.hpp>
+#include <mbgl/actor/actor_ref.hpp>
+
+#include <memory>
+#include <future>
+#include <type_traits>
+#include <cassert>
+
+namespace mbgl {
+
+template <class Object>
+class EstablishedActor;
+
+template <class Object>
+class Actor;
+
+/*
+ An `AspiringActor<O>` is one half of the pair of types that comprise an actor (see `Actor<O>`),
+ the other half being `EstablishedActor<O>`. It is responsible for:
+ - ownership of the actor's `Mailbox`
+ - allocating the memory for (but *not* constructing) the target object `O`
+
+ Using these two pieces--the mailbox and a stable address for `O`--an `AspiringActor<O>` can
+ accept messages for the target object, or provide `ActorRef<O>`s that do so, before the object
+ has actually been constructed by the corresponding `EstablishedActor<O>`. (Such messages are
+ queued in the mailbox until after the object is constructed.)
+
+ This allows for an `AspiringActor<O>` to be created and safely used by a thread other than the
+ one on which the target object will (eventually) live.
+*/
+template <class Object>
+class AspiringActor {
+public:
+ AspiringActor() : mailbox(std::make_shared<Mailbox>()) {
+ // mailbox starts closed because the `Object` hasn't yet been constructed
+ assert(!mailbox->isOpen());
+ }
+
+ AspiringActor(const AspiringActor&) = delete;
+
+ ActorRef<std::decay_t<Object>> self() {
+ return ActorRef<std::decay_t<Object>>(object(), mailbox);
+ }
+
+private:
+ std::shared_ptr<Mailbox> mailbox;
+ std::aligned_storage_t<sizeof(Object)> objectStorage;
+
+ Object& object() {
+ return *reinterpret_cast<Object *>(&objectStorage);
+ }
+
+ friend class EstablishedActor<Object>;
+ friend class Actor<Object>;
+};
+
+} // namespace mbgl
diff --git a/include/mbgl/actor/established_actor.hpp b/include/mbgl/actor/established_actor.hpp
new file mode 100644
index 0000000000..da0d8ac705
--- /dev/null
+++ b/include/mbgl/actor/established_actor.hpp
@@ -0,0 +1,80 @@
+#pragma once
+
+#include <mbgl/actor/aspiring_actor.hpp>
+#include <mbgl/actor/mailbox.hpp>
+#include <mbgl/actor/message.hpp>
+#include <mbgl/actor/actor_ref.hpp>
+
+#include <memory>
+#include <future>
+#include <type_traits>
+#include <cassert>
+
+namespace mbgl {
+
+/*
+ An `EstablishedActor<O>` is one half of the pair of types that comprise an actor (see `Actor<O>`),
+ the other half being `AspiringActor<O>`. It is responsible for managing the lifetime of the
+ target object `O` and the open/closed state of the parent's `mailbox`.
+
+ The `O` object's lifetime is contained by that of its owning `EstablishedActor<O>`: the
+ `EstablishedActor` constructor executes the `O` constructor via "placement new", constructing
+ it at the address provided by the parent `AspiringActor`, and the `~EstablishedActor` destructor
+ similarly executes the `~O` destructor (after closing the mailbox). `EstablishedActor` should
+ therefore live entirely on the thread intended to own `O`.
+*/
+
+template <class Object>
+class EstablishedActor {
+public:
+ // Construct the Object from a parameter pack `args` (i.e. `Object(args...)`)
+ template <typename U = Object, class... Args, typename std::enable_if<
+ std::is_constructible<U, Args...>::value ||
+ std::is_constructible<U, ActorRef<U>, Args...>::value
+ >::type * = nullptr>
+ EstablishedActor(Scheduler& scheduler, AspiringActor<Object>& parent_, Args&& ... args)
+ : parent(parent_) {
+ emplaceObject(std::forward<Args>(args)...);
+ parent.mailbox->open(scheduler);
+ }
+
+ // Construct the `Object` from a tuple containing the constructor arguments (i.e.
+ // `Object(std::get<0>(args), std::get<1>(args), ...)`)
+ template <class ArgsTuple, std::size_t ArgCount = std::tuple_size<std::decay_t<ArgsTuple>>::value>
+ EstablishedActor(Scheduler& scheduler, AspiringActor<Object>& parent_, ArgsTuple&& args)
+ : parent(parent_) {
+ emplaceObject(std::forward<ArgsTuple>(args), std::make_index_sequence<ArgCount>{});
+ parent.mailbox->open(scheduler);
+ }
+
+ EstablishedActor(const EstablishedActor&) = delete;
+
+ ~EstablishedActor() {
+ parent.mailbox->close();
+ parent.object().~Object();
+ }
+
+private:
+ // Enabled for Objects with a constructor taking ActorRef<Object> as the first parameter
+ template <typename U = Object, class... Args, typename std::enable_if<std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr>
+ void emplaceObject(Args&&... args_) {
+ new (&parent.objectStorage) Object(parent.self(), std::forward<Args>(args_)...);
+ }
+
+ // Enabled for plain Objects
+ template <typename U = Object, class... Args, typename std::enable_if<std::is_constructible<U, Args...>::value>::type * = nullptr>
+ void emplaceObject(Args&&... args_) {
+ new (&parent.objectStorage) Object(std::forward<Args>(args_)...);
+ }
+
+ // Used to expand a tuple holding the constructor arguments
+ template <class ArgsTuple, std::size_t... I>
+ void emplaceObject(ArgsTuple&& args, std::index_sequence<I...>) {
+ emplaceObject(std::move(std::get<I>(std::forward<ArgsTuple>(args)))...);
+ (void) args; // mark args as used: if it's empty tuple, it's not actually used above.
+ }
+
+ AspiringActor<Object>& parent;
+};
+
+} // namespace mbgl
diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp
index 8ecf91701a..23c579917a 100644
--- a/include/mbgl/actor/mailbox.hpp
+++ b/include/mbgl/actor/mailbox.hpp
@@ -1,5 +1,7 @@
#pragma once
+#include <mbgl/util/optional.hpp>
+
#include <memory>
#include <mutex>
#include <queue>
@@ -11,17 +13,30 @@ class Message;
class Mailbox : public std::enable_shared_from_this<Mailbox> {
public:
+
+ // Create a "holding" mailbox, messages to which will remain queued,
+ // unconsumed, until the mailbox is associated with a Scheduler using
+ // start(). This allows a Mailbox object to be created on one thread and
+ // later transferred to a different target thread that may not yet exist.
+ Mailbox();
+
Mailbox(Scheduler&);
- void push(std::unique_ptr<Message>);
-
+ // Attach the given scheduler to this mailbox and begin processing messages
+ // sent to it. The mailbox must be a "holding" mailbox, as created by the
+ // default constructor Mailbox().
+ void open(Scheduler& scheduler_);
void close();
+
+ bool isOpen() const;
+
+ void push(std::unique_ptr<Message>);
void receive();
static void maybeReceive(std::weak_ptr<Mailbox>);
private:
- Scheduler& scheduler;
+ optional<Scheduler*> scheduler;
std::recursive_mutex receivingMutex;
std::mutex pushingMutex;
diff --git a/include/mbgl/actor/scheduler.hpp b/include/mbgl/actor/scheduler.hpp
index d8a26ebeab..75ead29f0a 100644
--- a/include/mbgl/actor/scheduler.hpp
+++ b/include/mbgl/actor/scheduler.hpp
@@ -31,6 +31,11 @@ class Mailbox;
class Scheduler {
public:
virtual ~Scheduler() = default;
+
+ // Used by a Mailbox when it has a message in its queue to request that it
+ // be scheduled. Specifically, the scheduler is expected to asynchronously
+ // call `receive() on the given mailbox, provided it still exists at that
+ // time.
virtual void schedule(std::weak_ptr<Mailbox>) = 0;
// Set/Get the current Scheduler for this thread
diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp
index 74e722b02d..bc58427349 100644
--- a/include/mbgl/util/thread.hpp
+++ b/include/mbgl/util/thread.hpp
@@ -37,45 +37,55 @@ namespace util {
// - `Object` can use `Timer` and do asynchronous I/O, like wait for sockets events.
//
template<class Object>
-class Thread : public Scheduler {
+class Thread {
public:
template <class... Args>
Thread(const std::string& name, Args&&... args) {
- std::promise<void> running;
- thread = std::thread([&] {
+ std::promise<void> running_;
+ running = running_.get_future();
+
+ auto capturedArgs = std::make_tuple(std::forward<Args>(args)...);
+
+ thread = std::thread([
+ this,
+ name,
+ capturedArgs = std::move(capturedArgs),
+ runningPromise = std::move(running_)
+ ] () mutable {
platform::setCurrentThreadName(name);
platform::makeThreadLowPriority();
util::RunLoop loop_(util::RunLoop::Type::New);
loop = &loop_;
+ EstablishedActor<Object> establishedActor(loop_, object, std::move(capturedArgs));
- object = std::make_unique<Actor<Object>>(*this, std::forward<Args>(args)...);
- running.set_value();
-
+ runningPromise.set_value();
+
loop->run();
+
+ (void) establishedActor;
+
loop = nullptr;
});
-
- running.get_future().get();
}
- ~Thread() override {
+ ~Thread() {
if (paused) {
resume();
}
- std::promise<void> joinable;
+ std::promise<void> stoppable;
+
+ running.wait();
- // Kill the actor, so we don't get more
- // messages posted on this scheduler after
- // we delete the RunLoop.
+ // Invoke a noop task on the run loop to ensure that we're executing
+ // run() before we call stop()
loop->invoke([&] {
- object.reset();
- joinable.set_value();
+ stoppable.set_value();
});
- joinable.get_future().get();
+ stoppable.get_future().get();
loop->stop();
thread.join();
@@ -85,8 +95,8 @@ public:
// can be used to send messages to `Object`. It is safe
// to the non-owning reference to outlive this object
// and be used after the `Thread<>` gets destroyed.
- ActorRef<std::decay_t<Object>> actor() const {
- return object->self();
+ ActorRef<std::decay_t<Object>> actor() {
+ return object.self();
}
// Pauses the `Object` thread. It will prevent the object to wake
@@ -103,6 +113,8 @@ public:
auto pausing = paused->get_future();
+ running.wait();
+
loop->invoke(RunLoop::Priority::High, [this] {
auto resuming = resumed->get_future();
paused->set_value();
@@ -127,13 +139,12 @@ public:
private:
MBGL_STORE_THREAD(tid);
- void schedule(std::weak_ptr<Mailbox> mailbox) override {
- loop->schedule(mailbox);
- }
+ AspiringActor<Object> object;
std::thread thread;
- std::unique_ptr<Actor<Object>> object;
+ std::future<void> running;
+
std::unique_ptr<std::promise<void>> paused;
std::unique_ptr<std::promise<void>> resumed;
diff --git a/platform/android/src/snapshotter/map_snapshotter.cpp b/platform/android/src/snapshotter/map_snapshotter.cpp
index 155fdf81fb..e8fcc61770 100644
--- a/platform/android/src/snapshotter/map_snapshotter.cpp
+++ b/platform/android/src/snapshotter/map_snapshotter.cpp
@@ -58,8 +58,8 @@ MapSnapshotter::MapSnapshotter(jni::JNIEnv& _env,
showLogo = _showLogo;
// Create the core snapshotter
- snapshotter = std::make_unique<mbgl::MapSnapshotter>(fileSource,
- *threadPool,
+ snapshotter = std::make_unique<mbgl::MapSnapshotter>(&fileSource,
+ threadPool,
style,
size,
pixelRatio,
@@ -173,4 +173,4 @@ void MapSnapshotter::registerNative(jni::JNIEnv& env) {
}
} // namespace android
-} // namespace mbgl \ No newline at end of file
+} // namespace mbgl
diff --git a/platform/android/src/snapshotter/map_snapshotter.hpp b/platform/android/src/snapshotter/map_snapshotter.hpp
index 3be2cb4f6c..7b72452c45 100644
--- a/platform/android/src/snapshotter/map_snapshotter.hpp
+++ b/platform/android/src/snapshotter/map_snapshotter.hpp
@@ -76,4 +76,4 @@ private:
};
} // namespace android
-} // namespace mbgl \ No newline at end of file
+} // namespace mbgl
diff --git a/platform/darwin/src/MGLMapSnapshotter.mm b/platform/darwin/src/MGLMapSnapshotter.mm
index 2a2bef8fb8..d9fa044217 100644
--- a/platform/darwin/src/MGLMapSnapshotter.mm
+++ b/platform/darwin/src/MGLMapSnapshotter.mm
@@ -469,7 +469,7 @@ const CGFloat MGLSnapshotterMinimumPixelSize = 64;
}
// Create the snapshotter
- _mbglMapSnapshotter = std::make_unique<mbgl::MapSnapshotter>(*mbglFileSource, *_mbglThreadPool, style, size, pixelRatio, cameraOptions, coordinateBounds);
+ _mbglMapSnapshotter = std::make_unique<mbgl::MapSnapshotter>(mbglFileSource, _mbglThreadPool, style, size, pixelRatio, cameraOptions, coordinateBounds);
}
@end
diff --git a/platform/default/default_file_source.cpp b/platform/default/default_file_source.cpp
index 89aabeb8d3..f070121497 100644
--- a/platform/default/default_file_source.cpp
+++ b/platform/default/default_file_source.cpp
@@ -19,15 +19,10 @@ namespace mbgl {
class DefaultFileSource::Impl {
public:
- Impl(ActorRef<Impl> self, std::shared_ptr<FileSource> assetFileSource_, const std::string& cachePath, uint64_t maximumCacheSize)
+ Impl(std::shared_ptr<FileSource> assetFileSource_, std::string cachePath, uint64_t maximumCacheSize)
: assetFileSource(assetFileSource_)
- , localFileSource(std::make_unique<LocalFileSource>()) {
- // Initialize the Database asynchronously so as to not block Actor creation.
- self.invoke(&Impl::initializeOfflineDatabase, cachePath, maximumCacheSize);
- }
-
- void initializeOfflineDatabase(std::string cachePath, uint64_t maximumCacheSize) {
- offlineDatabase = std::make_unique<OfflineDatabase>(cachePath, maximumCacheSize);
+ , localFileSource(std::make_unique<LocalFileSource>())
+ , offlineDatabase(std::make_unique<OfflineDatabase>(cachePath, maximumCacheSize)) {
}
void setAPIBaseURL(const std::string& url) {
diff --git a/platform/default/mbgl/map/map_snapshotter.cpp b/platform/default/mbgl/map/map_snapshotter.cpp
index a909e3fe9b..149ef22e7a 100644
--- a/platform/default/mbgl/map/map_snapshotter.cpp
+++ b/platform/default/mbgl/map/map_snapshotter.cpp
@@ -13,8 +13,8 @@ namespace mbgl {
class MapSnapshotter::Impl {
public:
- Impl(FileSource&,
- Scheduler&,
+ Impl(FileSource*,
+ std::shared_ptr<Scheduler>,
const std::pair<bool, std::string> style,
const Size&,
const float pixelRatio,
@@ -40,20 +40,22 @@ public:
void snapshot(ActorRef<MapSnapshotter::Callback>);
private:
+ std::shared_ptr<Scheduler> scheduler;
HeadlessFrontend frontend;
Map map;
};
-MapSnapshotter::Impl::Impl(FileSource& fileSource,
- Scheduler& scheduler,
+MapSnapshotter::Impl::Impl(FileSource* fileSource,
+ std::shared_ptr<Scheduler> scheduler_,
const std::pair<bool, std::string> style,
const Size& size,
const float pixelRatio,
const optional<CameraOptions> cameraOptions,
const optional<LatLngBounds> region,
const optional<std::string> programCacheDir)
- : frontend(size, pixelRatio, fileSource, scheduler, programCacheDir)
- , map(frontend, MapObserver::nullObserver(), size, pixelRatio, fileSource, scheduler, MapMode::Static) {
+ : scheduler(std::move(scheduler_))
+ , frontend(size, pixelRatio, *fileSource, *scheduler, programCacheDir)
+ , map(frontend, MapObserver::nullObserver(), size, pixelRatio, *fileSource, *scheduler, MapMode::Static) {
if (style.first) {
map.getStyle().loadJSON(style.second);
@@ -149,15 +151,15 @@ LatLngBounds MapSnapshotter::Impl::getRegion() const {
return map.latLngBoundsForCamera(getCameraOptions());
}
-MapSnapshotter::MapSnapshotter(FileSource& fileSource,
- Scheduler& scheduler,
+MapSnapshotter::MapSnapshotter(FileSource* fileSource,
+ std::shared_ptr<Scheduler> scheduler,
const std::pair<bool, std::string> style,
const Size& size,
const float pixelRatio,
const optional<CameraOptions> cameraOptions,
const optional<LatLngBounds> region,
const optional<std::string> programCacheDir)
- : impl(std::make_unique<util::Thread<MapSnapshotter::Impl>>("Map Snapshotter", fileSource, scheduler, style, size, pixelRatio, cameraOptions, region, programCacheDir)) {
+ : impl(std::make_unique<util::Thread<MapSnapshotter::Impl>>("Map Snapshotter", fileSource, std::move(scheduler), style, size, pixelRatio, cameraOptions, region, programCacheDir)) {
}
MapSnapshotter::~MapSnapshotter() = default;
diff --git a/platform/default/mbgl/map/map_snapshotter.hpp b/platform/default/mbgl/map/map_snapshotter.hpp
index b9e6307664..f40d1e4b77 100644
--- a/platform/default/mbgl/map/map_snapshotter.hpp
+++ b/platform/default/mbgl/map/map_snapshotter.hpp
@@ -25,8 +25,8 @@ class Style;
class MapSnapshotter {
public:
- MapSnapshotter(FileSource& fileSource,
- Scheduler& scheduler,
+ MapSnapshotter(FileSource* fileSource,
+ std::shared_ptr<Scheduler> scheduler,
const std::pair<bool, std::string> style,
const Size&,
const float pixelRatio,
diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp
index 373c24275f..8ee8dca114 100644
--- a/src/mbgl/actor/mailbox.cpp
+++ b/src/mbgl/actor/mailbox.cpp
@@ -6,8 +6,30 @@
namespace mbgl {
+Mailbox::Mailbox() {
+}
+
Mailbox::Mailbox(Scheduler& scheduler_)
- : scheduler(scheduler_) {
+ : scheduler(&scheduler_) {
+}
+
+void Mailbox::open(Scheduler& scheduler_) {
+ assert(!scheduler);
+
+ // As with close(), block until neither receive() nor push() are in progress, and acquire the two
+ // mutexes in the same order.
+ std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex);
+ std::lock_guard<std::mutex> pushingLock(pushingMutex);
+
+ scheduler = &scheduler_;
+
+ if (closed) {
+ return;
+ }
+
+ if (!queue.empty()) {
+ (*scheduler)->schedule(shared_from_this());
+ }
}
void Mailbox::close() {
@@ -22,6 +44,9 @@ void Mailbox::close() {
closed = true;
}
+bool Mailbox::isOpen() const { return bool(scheduler); }
+
+
void Mailbox::push(std::unique_ptr<Message> message) {
std::lock_guard<std::mutex> pushingLock(pushingMutex);
@@ -32,13 +57,15 @@ void Mailbox::push(std::unique_ptr<Message> message) {
std::lock_guard<std::mutex> queueLock(queueMutex);
bool wasEmpty = queue.empty();
queue.push(std::move(message));
- if (wasEmpty) {
- scheduler.schedule(shared_from_this());
+ if (wasEmpty && scheduler) {
+ (*scheduler)->schedule(shared_from_this());
}
}
void Mailbox::receive() {
std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex);
+
+ assert(scheduler);
if (closed) {
return;
@@ -58,7 +85,7 @@ void Mailbox::receive() {
(*message)();
if (!wasEmpty) {
- scheduler.schedule(shared_from_this());
+ (*scheduler)->schedule(shared_from_this());
}
}
diff --git a/src/mbgl/sprite/sprite_loader.cpp b/src/mbgl/sprite/sprite_loader.cpp
index 93d6dfd9ae..df4fe6e8df 100644
--- a/src/mbgl/sprite/sprite_loader.cpp
+++ b/src/mbgl/sprite/sprite_loader.cpp
@@ -86,7 +86,7 @@ void SpriteLoader::emitSpriteLoadedIfComplete() {
return;
}
- loader->worker.invoke(&SpriteLoaderWorker::parse, loader->image, loader->json);
+ loader->worker.self().invoke(&SpriteLoaderWorker::parse, loader->image, loader->json);
}
void SpriteLoader::onParsed(std::vector<std::unique_ptr<style::Image>>&& result) {
diff --git a/src/mbgl/style/sources/custom_geometry_source.cpp b/src/mbgl/style/sources/custom_geometry_source.cpp
index b37490a5ce..6ce7c1be11 100644
--- a/src/mbgl/style/sources/custom_geometry_source.cpp
+++ b/src/mbgl/style/sources/custom_geometry_source.cpp
@@ -30,15 +30,15 @@ void CustomGeometrySource::loadDescription(FileSource&) {
void CustomGeometrySource::setTileData(const CanonicalTileID& tileID,
const GeoJSON& data) {
- loader->invoke(&CustomTileLoader::setTileData, tileID, data);
+ loader->self().invoke(&CustomTileLoader::setTileData, tileID, data);
}
void CustomGeometrySource::invalidateTile(const CanonicalTileID& tileID) {
- loader->invoke(&CustomTileLoader::invalidateTile, tileID);
+ loader->self().invoke(&CustomTileLoader::invalidateTile, tileID);
}
void CustomGeometrySource::invalidateRegion(const LatLngBounds& bounds) {
- loader->invoke(&CustomTileLoader::invalidateRegion, bounds, impl().getZoomRange());
+ loader->self().invoke(&CustomTileLoader::invalidateRegion, bounds, impl().getZoomRange());
}
} // namespace style
diff --git a/src/mbgl/tile/geometry_tile.cpp b/src/mbgl/tile/geometry_tile.cpp
index af28fe3963..d686d8440b 100644
--- a/src/mbgl/tile/geometry_tile.cpp
+++ b/src/mbgl/tile/geometry_tile.cpp
@@ -86,7 +86,7 @@ void GeometryTile::setData(std::unique_ptr<const GeometryTileData> data_) {
pending = true;
++correlationID;
- worker.invoke(&GeometryTileWorker::setData, std::move(data_), correlationID);
+ worker.self().invoke(&GeometryTileWorker::setData, std::move(data_), correlationID);
}
@@ -112,14 +112,14 @@ void GeometryTile::setLayers(const std::vector<Immutable<Layer::Impl>>& layers)
}
++correlationID;
- worker.invoke(&GeometryTileWorker::setLayers, std::move(impls), correlationID);
+ worker.self().invoke(&GeometryTileWorker::setLayers, std::move(impls), correlationID);
}
void GeometryTile::setShowCollisionBoxes(const bool showCollisionBoxes_) {
if (showCollisionBoxes != showCollisionBoxes_) {
showCollisionBoxes = showCollisionBoxes_;
++correlationID;
- worker.invoke(&GeometryTileWorker::setShowCollisionBoxes, showCollisionBoxes, correlationID);
+ worker.self().invoke(&GeometryTileWorker::setShowCollisionBoxes, showCollisionBoxes, correlationID);
}
}
@@ -153,7 +153,7 @@ void GeometryTile::onError(std::exception_ptr err, const uint64_t resultCorrelat
}
void GeometryTile::onGlyphsAvailable(GlyphMap glyphs) {
- worker.invoke(&GeometryTileWorker::onGlyphsAvailable, std::move(glyphs));
+ worker.self().invoke(&GeometryTileWorker::onGlyphsAvailable, std::move(glyphs));
}
void GeometryTile::getGlyphs(GlyphDependencies glyphDependencies) {
@@ -161,7 +161,7 @@ void GeometryTile::getGlyphs(GlyphDependencies glyphDependencies) {
}
void GeometryTile::onImagesAvailable(ImageMap images, uint64_t imageCorrelationID) {
- worker.invoke(&GeometryTileWorker::onImagesAvailable, std::move(images), imageCorrelationID);
+ worker.self().invoke(&GeometryTileWorker::onImagesAvailable, std::move(images), imageCorrelationID);
}
void GeometryTile::getImages(ImageRequestPair pair) {
diff --git a/src/mbgl/tile/raster_dem_tile.cpp b/src/mbgl/tile/raster_dem_tile.cpp
index 5db298cf4c..f29861ee71 100644
--- a/src/mbgl/tile/raster_dem_tile.cpp
+++ b/src/mbgl/tile/raster_dem_tile.cpp
@@ -48,7 +48,7 @@ void RasterDEMTile::setMetadata(optional<Timestamp> modified_, optional<Timestam
void RasterDEMTile::setData(std::shared_ptr<const std::string> data) {
pending = true;
++correlationID;
- worker.invoke(&RasterDEMTileWorker::parse, data, correlationID, encoding);
+ worker.self().invoke(&RasterDEMTileWorker::parse, data, correlationID, encoding);
}
void RasterDEMTile::onParsed(std::unique_ptr<HillshadeBucket> result, const uint64_t resultCorrelationID) {
diff --git a/src/mbgl/tile/raster_tile.cpp b/src/mbgl/tile/raster_tile.cpp
index ff23d4493e..cc71c04ba1 100644
--- a/src/mbgl/tile/raster_tile.cpp
+++ b/src/mbgl/tile/raster_tile.cpp
@@ -37,7 +37,7 @@ void RasterTile::setMetadata(optional<Timestamp> modified_, optional<Timestamp>
void RasterTile::setData(std::shared_ptr<const std::string> data) {
pending = true;
++correlationID;
- worker.invoke(&RasterTileWorker::parse, data, correlationID);
+ worker.self().invoke(&RasterTileWorker::parse, data, correlationID);
}
void RasterTile::onParsed(std::unique_ptr<RasterBucket> result, const uint64_t resultCorrelationID) {
diff --git a/test/actor/actor.test.cpp b/test/actor/actor.test.cpp
index 967dc152d9..7493abe263 100644
--- a/test/actor/actor.test.cpp
+++ b/test/actor/actor.test.cpp
@@ -1,5 +1,6 @@
#include <mbgl/actor/actor.hpp>
#include <mbgl/util/default_thread_pool.hpp>
+#include <mbgl/util/run_loop.hpp>
#include <mbgl/test/util.hpp>
@@ -7,13 +8,12 @@
#include <functional>
#include <future>
#include <memory>
+#include <thread>
using namespace mbgl;
using namespace std::chrono_literals;
TEST(Actor, Construction) {
- // Construction is currently synchronous. It may become asynchronous in the future.
-
struct Test {
Test(ActorRef<Test>, bool& constructed) {
constructed = true;
@@ -27,6 +27,25 @@ TEST(Actor, Construction) {
EXPECT_TRUE(constructed);
}
+TEST(Actor, Destruction) {
+ struct Test {
+ Test(ActorRef<Test>, bool& destructed_) : destructed(destructed_) {};
+ ~Test() {
+ destructed = true;
+ }
+
+ bool& destructed;
+ };
+
+ ThreadPool pool { 1 };
+ bool destructed = false;
+ {
+ Actor<Test> test(pool, std::ref(destructed));
+ }
+
+ EXPECT_TRUE(destructed);
+}
+
TEST(Actor, DestructionBlocksOnReceive) {
// Destruction blocks until the actor is not receiving.
@@ -63,7 +82,7 @@ TEST(Actor, DestructionBlocksOnReceive) {
Actor<Test> test(pool, std::move(enteredPromise), std::move(exitingFuture));
- test.invoke(&Test::wait);
+ test.self().invoke(&Test::wait);
enteredFuture.wait();
exitingPromise.set_value();
}
@@ -145,7 +164,7 @@ TEST(Actor, DestructionAllowedInReceiveOnSameThread) {
auto test = std::make_unique<Actor<Test>>(pool);
// Callback (triggered while mutex is locked in Mailbox::receive())
- test->invoke(&Test::callMeBack, [&]() {
+ test->self().invoke(&Test::callMeBack, [&]() {
// Destroy the Actor/Mailbox in the same thread
test.reset();
callbackFiredPromise.set_value();
@@ -180,16 +199,16 @@ TEST(Actor, SelfDestructionDoesntCrashWaitingReceivingThreads) {
std::atomic<bool> waitingMessageProcessed {false};
// Callback (triggered while mutex is locked in Mailbox::receive())
- closingActor->invoke(&Test::callMeBack, [&]() {
+ closingActor->self().invoke(&Test::callMeBack, [&]() {
// Queue up another message from another thread
std::promise<void> messageQueuedPromise;
- waitingActor->invoke(&Test::callMeBack, [&]() {
+ waitingActor->self().invoke(&Test::callMeBack, [&]() {
// This will be waiting on the mutex in
// Mailbox::receive(), holding a lock
// on the weak_ptr so the mailbox is not
// destroyed
- closingActor->invoke(&Test::callMeBack, [&]() {
+ closingActor->self().invoke(&Test::callMeBack, [&]() {
waitingMessageProcessed.store(true);
});
messageQueuedPromise.set_value();
@@ -239,10 +258,10 @@ TEST(Actor, OrderedMailbox) {
Actor<Test> test(pool, std::move(endedPromise));
for (auto i = 1; i <= 10; ++i) {
- test.invoke(&Test::receive, i);
+ test.self().invoke(&Test::receive, i);
}
- test.invoke(&Test::end);
+ test.self().invoke(&Test::end);
endedFuture.wait();
}
@@ -275,10 +294,10 @@ TEST(Actor, NonConcurrentMailbox) {
Actor<Test> test(pool, std::move(endedPromise));
for (auto i = 1; i <= 10; ++i) {
- test.invoke(&Test::receive, i);
+ test.self().invoke(&Test::receive, i);
}
- test.invoke(&Test::end);
+ test.self().invoke(&Test::end);
endedFuture.wait();
}
@@ -297,7 +316,7 @@ TEST(Actor, Ask) {
ThreadPool pool { 2 };
Actor<Test> test(pool);
- auto result = test.ask(&Test::doubleIt, 1);
+ auto result = test.self().ask(&Test::doubleIt, 1);
ASSERT_TRUE(result.valid());
@@ -324,7 +343,7 @@ TEST(Actor, AskVoid) {
bool executed = false;
Actor<Test> actor(pool, executed);
- actor.ask(&Test::doIt).get();
+ actor.self().ask(&Test::doIt).get();
EXPECT_TRUE(executed);
}
@@ -355,6 +374,58 @@ TEST(Actor, NoSelfActorRef) {
auto future = promise.get_future();
Actor<WithArguments> withArguments(pool, std::move(promise));
- withArguments.invoke(&WithArguments::receive);
+ withArguments.self().invoke(&WithArguments::receive);
future.wait();
}
+
+TEST(Actor, TwoPhaseConstruction) {
+ // This test mimics, in simplified form, the approach used by the Thread<Object> to construct
+ // its actor in two parts so that the Thread<Object> instance can be created without waiting
+ // for the target thread to be up and running.
+
+ struct Test {
+ Test(ActorRef<Test>, std::shared_ptr<bool> destroyed_)
+ : destroyed(std::move(destroyed_)) {};
+
+ ~Test() {
+ *destroyed = true;
+ }
+
+ void callMe(std::promise<void> p) {
+ p.set_value();
+ }
+
+ void stop() {
+ util::RunLoop::Get()->stop();
+ }
+
+ std::shared_ptr<bool> destroyed;
+ };
+
+ AspiringActor<Test> parent;
+
+ auto destroyed = std::make_shared<bool>(false);
+
+ std::promise<void> queueExecuted;
+ auto queueExecutedFuture = queueExecuted.get_future();
+
+ parent.self().invoke(&Test::callMe, std::move(queueExecuted));
+ parent.self().invoke(&Test::stop);
+
+ auto thread = std::thread([
+ capturedArgs = std::make_tuple(destroyed),
+ &parent
+ ] () mutable {
+ util::RunLoop loop(util::RunLoop::Type::New);
+ EstablishedActor<Test> test(loop, parent, capturedArgs);
+ loop.run();
+ });
+
+ // should not hang
+ queueExecutedFuture.get();
+ thread.join();
+
+ EXPECT_TRUE(*destroyed);
+}
+
+
diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp
index 76fb5ce3f0..2bcb9d8959 100644
--- a/test/util/thread.test.cpp
+++ b/test/util/thread.test.cpp
@@ -15,11 +15,11 @@ class TestObject {
public:
TestObject(ActorRef<TestObject>, std::thread::id otherTid)
: tid(std::this_thread::get_id()) {
- EXPECT_NE(tid, otherTid);
+ EXPECT_NE(tid, otherTid); // Object is created on child thread
}
~TestObject() {
- EXPECT_EQ(tid, std::this_thread::get_id());
+ EXPECT_EQ(tid, std::this_thread::get_id()); // Object is destroyed on child thread
}
void fn1(int val) const {
@@ -275,3 +275,60 @@ TEST(Thread, PauseResume) {
thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); });
loop.run();
}
+
+
+class TestWorkerDelayedConstruction {
+public:
+ TestWorkerDelayedConstruction(ActorRef<TestWorkerDelayedConstruction>, std::future<void> start) {
+ start.get();
+ }
+
+ void send(std::function<void ()> cb) {
+ cb();
+ }
+
+private:
+ Timer timer;
+};
+
+TEST(Thread, InvokeBeforeChildStarts) {
+ RunLoop loop;
+
+ std::promise<void> start;
+ Thread<TestWorkerDelayedConstruction> thread("Test", start.get_future());
+
+ std::atomic<int> count { 0 };
+
+ for (unsigned i = 0; i < 100; ++i) {
+ thread.actor().invoke(&TestWorkerDelayedConstruction::send, [&] { ++count; });
+ }
+
+ thread.actor().invoke(&TestWorkerDelayedConstruction::send, [&] { loop.stop(); });
+
+ // This test will be flaky if messages are consumed before the target object is constructed.
+ ASSERT_EQ(count, 0);
+
+ start.set_value();
+
+ loop.run();
+
+ ASSERT_EQ(count, 100);
+}
+
+TEST(Thread, DeleteBeforeChildStarts) {
+ std::atomic_bool flag(false);
+ std::promise<void> start;
+
+ Thread<TestWorker> control("Control");
+ auto thread = std::make_unique<Thread<TestWorkerDelayedConstruction>>("Test", start.get_future());
+
+ thread->actor().invoke(&TestWorkerDelayedConstruction::send, [&] { flag = true; });
+
+ control.actor().invoke(&TestWorker::sendDelayed, [&] { start.set_value(); });
+
+ // Should not hang.
+ thread.reset();
+
+ // Should process the queue before destruction.
+ ASSERT_TRUE(flag);
+}