diff options
21 files changed, 421 insertions, 125 deletions
diff --git a/cmake/core-files.cmake b/cmake/core-files.cmake index 66f569abc9..f4ee786df7 100644 --- a/cmake/core-files.cmake +++ b/cmake/core-files.cmake @@ -4,6 +4,8 @@ set(MBGL_CORE_FILES # actor include/mbgl/actor/actor.hpp include/mbgl/actor/actor_ref.hpp + include/mbgl/actor/aspiring_actor.hpp + include/mbgl/actor/established_actor.hpp include/mbgl/actor/mailbox.hpp include/mbgl/actor/message.hpp include/mbgl/actor/scheduler.hpp diff --git a/include/mbgl/actor/actor.hpp b/include/mbgl/actor/actor.hpp index a0df19208e..0052fad242 100644 --- a/include/mbgl/actor/actor.hpp +++ b/include/mbgl/actor/actor.hpp @@ -1,5 +1,7 @@ #pragma once +#include <mbgl/actor/aspiring_actor.hpp> +#include <mbgl/actor/established_actor.hpp> #include <mbgl/actor/mailbox.hpp> #include <mbgl/actor/message.hpp> #include <mbgl/actor/actor_ref.hpp> @@ -8,6 +10,7 @@ #include <memory> #include <future> #include <type_traits> +#include <cassert> namespace mbgl { @@ -34,65 +37,33 @@ namespace mbgl { the lifetime of the owning Actor, and sending a message to a `Ref` whose `Actor` has died is a no-op. (In the future, a dead-letters queue or log may be implemented.) - Construction and destruction of an actor is currently synchronous: the corresponding `O` + Construction and destruction of an Actor is synchronous: the corresponding `O` object is constructed synchronously by the `Actor` constructor, and destructed synchronously by the `~Actor` destructor, after ensuring that the `O` is not currently receiving an - asynchronous message. (Construction and destruction may change to be asynchronous in the - future.) The constructor of `O` is passed an `ActorRef<O>` referring to itself (which it - can use to self-send messages), followed by the forwarded arguments passed to `Actor<O>`. + asynchronous message. The constructor of `O` is passed an `ActorRef<O>` referring to itself + (which it can use to self-send messages), followed by the forwarded arguments passed to + `Actor<O>`. Asynchronous object construction can be accomplished by directly using the + lower-level types, `AspiringActor<O>` and `EstablishedActor<O>`. Please don't send messages that contain shared pointers or references. That subverts the purpose of the actor model: prohibiting direct concurrent access to shared state. */ - template <class Object> -class Actor : public util::noncopyable { +class Actor { public: + template <class... Args> + Actor(Scheduler& scheduler, Args&&... args) + : target(scheduler, parent, std::forward<Args>(args)...) {} - // Enabled for Objects with a constructor taking ActorRef<Object> as the first parameter - template <typename U = Object, class... Args, typename std::enable_if<std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr> - Actor(Scheduler& scheduler, Args&&... args_) - : mailbox(std::make_shared<Mailbox>(scheduler)), - object(self(), std::forward<Args>(args_)...) { - } - - // Enabled for plain Objects - template<typename U = Object, class... Args, typename std::enable_if<!std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr> - Actor(Scheduler& scheduler, Args&& ... args_) - : mailbox(std::make_shared<Mailbox>(scheduler)), object(std::forward<Args>(args_)...) { - } - - ~Actor() { - mailbox->close(); - } - - template <typename Fn, class... Args> - void invoke(Fn fn, Args&&... args) { - mailbox->push(actor::makeMessage(object, fn, std::forward<Args>(args)...)); - } - - template <typename Fn, class... Args> - auto ask(Fn fn, Args&&... args) { - // Result type is deduced from the function's return type - using ResultType = typename std::result_of<decltype(fn)(Object, Args...)>::type; - - std::promise<ResultType> promise; - auto future = promise.get_future(); - mailbox->push(actor::makeMessage(std::move(promise), object, fn, std::forward<Args>(args)...)); - return future; - } + Actor(const Actor&) = delete; ActorRef<std::decay_t<Object>> self() { - return ActorRef<std::decay_t<Object>>(object, mailbox); - } - - operator ActorRef<std::decay_t<Object>>() { - return self(); + return parent.self(); } private: - std::shared_ptr<Mailbox> mailbox; - Object object; + AspiringActor<Object> parent; + EstablishedActor<Object> target; }; } // namespace mbgl diff --git a/include/mbgl/actor/aspiring_actor.hpp b/include/mbgl/actor/aspiring_actor.hpp new file mode 100644 index 0000000000..6c410cdfca --- /dev/null +++ b/include/mbgl/actor/aspiring_actor.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include <mbgl/actor/mailbox.hpp> +#include <mbgl/actor/message.hpp> +#include <mbgl/actor/actor_ref.hpp> + +#include <memory> +#include <future> +#include <type_traits> +#include <cassert> + +namespace mbgl { + +template <class Object> +class EstablishedActor; + +template <class Object> +class Actor; + +/* + An `AspiringActor<O>` is one half of the pair of types that comprise an actor (see `Actor<O>`), + the other half being `EstablishedActor<O>`. It is responsible for: + - ownership of the actor's `Mailbox` + - allocating the memory for (but *not* constructing) the target object `O` + + Using these two pieces--the mailbox and a stable address for `O`--an `AspiringActor<O>` can + accept messages for the target object, or provide `ActorRef<O>`s that do so, before the object + has actually been constructed by the corresponding `EstablishedActor<O>`. (Such messages are + queued in the mailbox until after the object is constructed.) + + This allows for an `AspiringActor<O>` to be created and safely used by a thread other than the + one on which the target object will (eventually) live. +*/ +template <class Object> +class AspiringActor { +public: + AspiringActor() : mailbox(std::make_shared<Mailbox>()) { + // mailbox starts closed because the `Object` hasn't yet been constructed + assert(!mailbox->isOpen()); + } + + AspiringActor(const AspiringActor&) = delete; + + ActorRef<std::decay_t<Object>> self() { + return ActorRef<std::decay_t<Object>>(object(), mailbox); + } + +private: + std::shared_ptr<Mailbox> mailbox; + std::aligned_storage_t<sizeof(Object)> objectStorage; + + Object& object() { + return *reinterpret_cast<Object *>(&objectStorage); + } + + friend class EstablishedActor<Object>; + friend class Actor<Object>; +}; + +} // namespace mbgl diff --git a/include/mbgl/actor/established_actor.hpp b/include/mbgl/actor/established_actor.hpp new file mode 100644 index 0000000000..da0d8ac705 --- /dev/null +++ b/include/mbgl/actor/established_actor.hpp @@ -0,0 +1,80 @@ +#pragma once + +#include <mbgl/actor/aspiring_actor.hpp> +#include <mbgl/actor/mailbox.hpp> +#include <mbgl/actor/message.hpp> +#include <mbgl/actor/actor_ref.hpp> + +#include <memory> +#include <future> +#include <type_traits> +#include <cassert> + +namespace mbgl { + +/* + An `EstablishedActor<O>` is one half of the pair of types that comprise an actor (see `Actor<O>`), + the other half being `AspiringActor<O>`. It is responsible for managing the lifetime of the + target object `O` and the open/closed state of the parent's `mailbox`. + + The `O` object's lifetime is contained by that of its owning `EstablishedActor<O>`: the + `EstablishedActor` constructor executes the `O` constructor via "placement new", constructing + it at the address provided by the parent `AspiringActor`, and the `~EstablishedActor` destructor + similarly executes the `~O` destructor (after closing the mailbox). `EstablishedActor` should + therefore live entirely on the thread intended to own `O`. +*/ + +template <class Object> +class EstablishedActor { +public: + // Construct the Object from a parameter pack `args` (i.e. `Object(args...)`) + template <typename U = Object, class... Args, typename std::enable_if< + std::is_constructible<U, Args...>::value || + std::is_constructible<U, ActorRef<U>, Args...>::value + >::type * = nullptr> + EstablishedActor(Scheduler& scheduler, AspiringActor<Object>& parent_, Args&& ... args) + : parent(parent_) { + emplaceObject(std::forward<Args>(args)...); + parent.mailbox->open(scheduler); + } + + // Construct the `Object` from a tuple containing the constructor arguments (i.e. + // `Object(std::get<0>(args), std::get<1>(args), ...)`) + template <class ArgsTuple, std::size_t ArgCount = std::tuple_size<std::decay_t<ArgsTuple>>::value> + EstablishedActor(Scheduler& scheduler, AspiringActor<Object>& parent_, ArgsTuple&& args) + : parent(parent_) { + emplaceObject(std::forward<ArgsTuple>(args), std::make_index_sequence<ArgCount>{}); + parent.mailbox->open(scheduler); + } + + EstablishedActor(const EstablishedActor&) = delete; + + ~EstablishedActor() { + parent.mailbox->close(); + parent.object().~Object(); + } + +private: + // Enabled for Objects with a constructor taking ActorRef<Object> as the first parameter + template <typename U = Object, class... Args, typename std::enable_if<std::is_constructible<U, ActorRef<U>, Args...>::value>::type * = nullptr> + void emplaceObject(Args&&... args_) { + new (&parent.objectStorage) Object(parent.self(), std::forward<Args>(args_)...); + } + + // Enabled for plain Objects + template <typename U = Object, class... Args, typename std::enable_if<std::is_constructible<U, Args...>::value>::type * = nullptr> + void emplaceObject(Args&&... args_) { + new (&parent.objectStorage) Object(std::forward<Args>(args_)...); + } + + // Used to expand a tuple holding the constructor arguments + template <class ArgsTuple, std::size_t... I> + void emplaceObject(ArgsTuple&& args, std::index_sequence<I...>) { + emplaceObject(std::move(std::get<I>(std::forward<ArgsTuple>(args)))...); + (void) args; // mark args as used: if it's empty tuple, it's not actually used above. + } + + AspiringActor<Object>& parent; +}; + +} // namespace mbgl diff --git a/include/mbgl/actor/mailbox.hpp b/include/mbgl/actor/mailbox.hpp index 8ecf91701a..23c579917a 100644 --- a/include/mbgl/actor/mailbox.hpp +++ b/include/mbgl/actor/mailbox.hpp @@ -1,5 +1,7 @@ #pragma once +#include <mbgl/util/optional.hpp> + #include <memory> #include <mutex> #include <queue> @@ -11,17 +13,30 @@ class Message; class Mailbox : public std::enable_shared_from_this<Mailbox> { public: + + // Create a "holding" mailbox, messages to which will remain queued, + // unconsumed, until the mailbox is associated with a Scheduler using + // start(). This allows a Mailbox object to be created on one thread and + // later transferred to a different target thread that may not yet exist. + Mailbox(); + Mailbox(Scheduler&); - void push(std::unique_ptr<Message>); - + // Attach the given scheduler to this mailbox and begin processing messages + // sent to it. The mailbox must be a "holding" mailbox, as created by the + // default constructor Mailbox(). + void open(Scheduler& scheduler_); void close(); + + bool isOpen() const; + + void push(std::unique_ptr<Message>); void receive(); static void maybeReceive(std::weak_ptr<Mailbox>); private: - Scheduler& scheduler; + optional<Scheduler*> scheduler; std::recursive_mutex receivingMutex; std::mutex pushingMutex; diff --git a/include/mbgl/actor/scheduler.hpp b/include/mbgl/actor/scheduler.hpp index d8a26ebeab..75ead29f0a 100644 --- a/include/mbgl/actor/scheduler.hpp +++ b/include/mbgl/actor/scheduler.hpp @@ -31,6 +31,11 @@ class Mailbox; class Scheduler { public: virtual ~Scheduler() = default; + + // Used by a Mailbox when it has a message in its queue to request that it + // be scheduled. Specifically, the scheduler is expected to asynchronously + // call `receive() on the given mailbox, provided it still exists at that + // time. virtual void schedule(std::weak_ptr<Mailbox>) = 0; // Set/Get the current Scheduler for this thread diff --git a/include/mbgl/util/thread.hpp b/include/mbgl/util/thread.hpp index 74e722b02d..bc58427349 100644 --- a/include/mbgl/util/thread.hpp +++ b/include/mbgl/util/thread.hpp @@ -37,45 +37,55 @@ namespace util { // - `Object` can use `Timer` and do asynchronous I/O, like wait for sockets events. // template<class Object> -class Thread : public Scheduler { +class Thread { public: template <class... Args> Thread(const std::string& name, Args&&... args) { - std::promise<void> running; - thread = std::thread([&] { + std::promise<void> running_; + running = running_.get_future(); + + auto capturedArgs = std::make_tuple(std::forward<Args>(args)...); + + thread = std::thread([ + this, + name, + capturedArgs = std::move(capturedArgs), + runningPromise = std::move(running_) + ] () mutable { platform::setCurrentThreadName(name); platform::makeThreadLowPriority(); util::RunLoop loop_(util::RunLoop::Type::New); loop = &loop_; + EstablishedActor<Object> establishedActor(loop_, object, std::move(capturedArgs)); - object = std::make_unique<Actor<Object>>(*this, std::forward<Args>(args)...); - running.set_value(); - + runningPromise.set_value(); + loop->run(); + + (void) establishedActor; + loop = nullptr; }); - - running.get_future().get(); } - ~Thread() override { + ~Thread() { if (paused) { resume(); } - std::promise<void> joinable; + std::promise<void> stoppable; + + running.wait(); - // Kill the actor, so we don't get more - // messages posted on this scheduler after - // we delete the RunLoop. + // Invoke a noop task on the run loop to ensure that we're executing + // run() before we call stop() loop->invoke([&] { - object.reset(); - joinable.set_value(); + stoppable.set_value(); }); - joinable.get_future().get(); + stoppable.get_future().get(); loop->stop(); thread.join(); @@ -85,8 +95,8 @@ public: // can be used to send messages to `Object`. It is safe // to the non-owning reference to outlive this object // and be used after the `Thread<>` gets destroyed. - ActorRef<std::decay_t<Object>> actor() const { - return object->self(); + ActorRef<std::decay_t<Object>> actor() { + return object.self(); } // Pauses the `Object` thread. It will prevent the object to wake @@ -103,6 +113,8 @@ public: auto pausing = paused->get_future(); + running.wait(); + loop->invoke(RunLoop::Priority::High, [this] { auto resuming = resumed->get_future(); paused->set_value(); @@ -127,13 +139,12 @@ public: private: MBGL_STORE_THREAD(tid); - void schedule(std::weak_ptr<Mailbox> mailbox) override { - loop->schedule(mailbox); - } + AspiringActor<Object> object; std::thread thread; - std::unique_ptr<Actor<Object>> object; + std::future<void> running; + std::unique_ptr<std::promise<void>> paused; std::unique_ptr<std::promise<void>> resumed; diff --git a/platform/android/src/snapshotter/map_snapshotter.cpp b/platform/android/src/snapshotter/map_snapshotter.cpp index 155fdf81fb..e8fcc61770 100644 --- a/platform/android/src/snapshotter/map_snapshotter.cpp +++ b/platform/android/src/snapshotter/map_snapshotter.cpp @@ -58,8 +58,8 @@ MapSnapshotter::MapSnapshotter(jni::JNIEnv& _env, showLogo = _showLogo; // Create the core snapshotter - snapshotter = std::make_unique<mbgl::MapSnapshotter>(fileSource, - *threadPool, + snapshotter = std::make_unique<mbgl::MapSnapshotter>(&fileSource, + threadPool, style, size, pixelRatio, @@ -173,4 +173,4 @@ void MapSnapshotter::registerNative(jni::JNIEnv& env) { } } // namespace android -} // namespace mbgl
\ No newline at end of file +} // namespace mbgl diff --git a/platform/android/src/snapshotter/map_snapshotter.hpp b/platform/android/src/snapshotter/map_snapshotter.hpp index 3be2cb4f6c..7b72452c45 100644 --- a/platform/android/src/snapshotter/map_snapshotter.hpp +++ b/platform/android/src/snapshotter/map_snapshotter.hpp @@ -76,4 +76,4 @@ private: }; } // namespace android -} // namespace mbgl
\ No newline at end of file +} // namespace mbgl diff --git a/platform/darwin/src/MGLMapSnapshotter.mm b/platform/darwin/src/MGLMapSnapshotter.mm index 2a2bef8fb8..d9fa044217 100644 --- a/platform/darwin/src/MGLMapSnapshotter.mm +++ b/platform/darwin/src/MGLMapSnapshotter.mm @@ -469,7 +469,7 @@ const CGFloat MGLSnapshotterMinimumPixelSize = 64; } // Create the snapshotter - _mbglMapSnapshotter = std::make_unique<mbgl::MapSnapshotter>(*mbglFileSource, *_mbglThreadPool, style, size, pixelRatio, cameraOptions, coordinateBounds); + _mbglMapSnapshotter = std::make_unique<mbgl::MapSnapshotter>(mbglFileSource, _mbglThreadPool, style, size, pixelRatio, cameraOptions, coordinateBounds); } @end diff --git a/platform/default/default_file_source.cpp b/platform/default/default_file_source.cpp index 89aabeb8d3..f070121497 100644 --- a/platform/default/default_file_source.cpp +++ b/platform/default/default_file_source.cpp @@ -19,15 +19,10 @@ namespace mbgl { class DefaultFileSource::Impl { public: - Impl(ActorRef<Impl> self, std::shared_ptr<FileSource> assetFileSource_, const std::string& cachePath, uint64_t maximumCacheSize) + Impl(std::shared_ptr<FileSource> assetFileSource_, std::string cachePath, uint64_t maximumCacheSize) : assetFileSource(assetFileSource_) - , localFileSource(std::make_unique<LocalFileSource>()) { - // Initialize the Database asynchronously so as to not block Actor creation. - self.invoke(&Impl::initializeOfflineDatabase, cachePath, maximumCacheSize); - } - - void initializeOfflineDatabase(std::string cachePath, uint64_t maximumCacheSize) { - offlineDatabase = std::make_unique<OfflineDatabase>(cachePath, maximumCacheSize); + , localFileSource(std::make_unique<LocalFileSource>()) + , offlineDatabase(std::make_unique<OfflineDatabase>(cachePath, maximumCacheSize)) { } void setAPIBaseURL(const std::string& url) { diff --git a/platform/default/mbgl/map/map_snapshotter.cpp b/platform/default/mbgl/map/map_snapshotter.cpp index a909e3fe9b..149ef22e7a 100644 --- a/platform/default/mbgl/map/map_snapshotter.cpp +++ b/platform/default/mbgl/map/map_snapshotter.cpp @@ -13,8 +13,8 @@ namespace mbgl { class MapSnapshotter::Impl { public: - Impl(FileSource&, - Scheduler&, + Impl(FileSource*, + std::shared_ptr<Scheduler>, const std::pair<bool, std::string> style, const Size&, const float pixelRatio, @@ -40,20 +40,22 @@ public: void snapshot(ActorRef<MapSnapshotter::Callback>); private: + std::shared_ptr<Scheduler> scheduler; HeadlessFrontend frontend; Map map; }; -MapSnapshotter::Impl::Impl(FileSource& fileSource, - Scheduler& scheduler, +MapSnapshotter::Impl::Impl(FileSource* fileSource, + std::shared_ptr<Scheduler> scheduler_, const std::pair<bool, std::string> style, const Size& size, const float pixelRatio, const optional<CameraOptions> cameraOptions, const optional<LatLngBounds> region, const optional<std::string> programCacheDir) - : frontend(size, pixelRatio, fileSource, scheduler, programCacheDir) - , map(frontend, MapObserver::nullObserver(), size, pixelRatio, fileSource, scheduler, MapMode::Static) { + : scheduler(std::move(scheduler_)) + , frontend(size, pixelRatio, *fileSource, *scheduler, programCacheDir) + , map(frontend, MapObserver::nullObserver(), size, pixelRatio, *fileSource, *scheduler, MapMode::Static) { if (style.first) { map.getStyle().loadJSON(style.second); @@ -149,15 +151,15 @@ LatLngBounds MapSnapshotter::Impl::getRegion() const { return map.latLngBoundsForCamera(getCameraOptions()); } -MapSnapshotter::MapSnapshotter(FileSource& fileSource, - Scheduler& scheduler, +MapSnapshotter::MapSnapshotter(FileSource* fileSource, + std::shared_ptr<Scheduler> scheduler, const std::pair<bool, std::string> style, const Size& size, const float pixelRatio, const optional<CameraOptions> cameraOptions, const optional<LatLngBounds> region, const optional<std::string> programCacheDir) - : impl(std::make_unique<util::Thread<MapSnapshotter::Impl>>("Map Snapshotter", fileSource, scheduler, style, size, pixelRatio, cameraOptions, region, programCacheDir)) { + : impl(std::make_unique<util::Thread<MapSnapshotter::Impl>>("Map Snapshotter", fileSource, std::move(scheduler), style, size, pixelRatio, cameraOptions, region, programCacheDir)) { } MapSnapshotter::~MapSnapshotter() = default; diff --git a/platform/default/mbgl/map/map_snapshotter.hpp b/platform/default/mbgl/map/map_snapshotter.hpp index b9e6307664..f40d1e4b77 100644 --- a/platform/default/mbgl/map/map_snapshotter.hpp +++ b/platform/default/mbgl/map/map_snapshotter.hpp @@ -25,8 +25,8 @@ class Style; class MapSnapshotter { public: - MapSnapshotter(FileSource& fileSource, - Scheduler& scheduler, + MapSnapshotter(FileSource* fileSource, + std::shared_ptr<Scheduler> scheduler, const std::pair<bool, std::string> style, const Size&, const float pixelRatio, diff --git a/src/mbgl/actor/mailbox.cpp b/src/mbgl/actor/mailbox.cpp index 373c24275f..8ee8dca114 100644 --- a/src/mbgl/actor/mailbox.cpp +++ b/src/mbgl/actor/mailbox.cpp @@ -6,8 +6,30 @@ namespace mbgl { +Mailbox::Mailbox() { +} + Mailbox::Mailbox(Scheduler& scheduler_) - : scheduler(scheduler_) { + : scheduler(&scheduler_) { +} + +void Mailbox::open(Scheduler& scheduler_) { + assert(!scheduler); + + // As with close(), block until neither receive() nor push() are in progress, and acquire the two + // mutexes in the same order. + std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex); + std::lock_guard<std::mutex> pushingLock(pushingMutex); + + scheduler = &scheduler_; + + if (closed) { + return; + } + + if (!queue.empty()) { + (*scheduler)->schedule(shared_from_this()); + } } void Mailbox::close() { @@ -22,6 +44,9 @@ void Mailbox::close() { closed = true; } +bool Mailbox::isOpen() const { return bool(scheduler); } + + void Mailbox::push(std::unique_ptr<Message> message) { std::lock_guard<std::mutex> pushingLock(pushingMutex); @@ -32,13 +57,15 @@ void Mailbox::push(std::unique_ptr<Message> message) { std::lock_guard<std::mutex> queueLock(queueMutex); bool wasEmpty = queue.empty(); queue.push(std::move(message)); - if (wasEmpty) { - scheduler.schedule(shared_from_this()); + if (wasEmpty && scheduler) { + (*scheduler)->schedule(shared_from_this()); } } void Mailbox::receive() { std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex); + + assert(scheduler); if (closed) { return; @@ -58,7 +85,7 @@ void Mailbox::receive() { (*message)(); if (!wasEmpty) { - scheduler.schedule(shared_from_this()); + (*scheduler)->schedule(shared_from_this()); } } diff --git a/src/mbgl/sprite/sprite_loader.cpp b/src/mbgl/sprite/sprite_loader.cpp index 93d6dfd9ae..df4fe6e8df 100644 --- a/src/mbgl/sprite/sprite_loader.cpp +++ b/src/mbgl/sprite/sprite_loader.cpp @@ -86,7 +86,7 @@ void SpriteLoader::emitSpriteLoadedIfComplete() { return; } - loader->worker.invoke(&SpriteLoaderWorker::parse, loader->image, loader->json); + loader->worker.self().invoke(&SpriteLoaderWorker::parse, loader->image, loader->json); } void SpriteLoader::onParsed(std::vector<std::unique_ptr<style::Image>>&& result) { diff --git a/src/mbgl/style/sources/custom_geometry_source.cpp b/src/mbgl/style/sources/custom_geometry_source.cpp index b37490a5ce..6ce7c1be11 100644 --- a/src/mbgl/style/sources/custom_geometry_source.cpp +++ b/src/mbgl/style/sources/custom_geometry_source.cpp @@ -30,15 +30,15 @@ void CustomGeometrySource::loadDescription(FileSource&) { void CustomGeometrySource::setTileData(const CanonicalTileID& tileID, const GeoJSON& data) { - loader->invoke(&CustomTileLoader::setTileData, tileID, data); + loader->self().invoke(&CustomTileLoader::setTileData, tileID, data); } void CustomGeometrySource::invalidateTile(const CanonicalTileID& tileID) { - loader->invoke(&CustomTileLoader::invalidateTile, tileID); + loader->self().invoke(&CustomTileLoader::invalidateTile, tileID); } void CustomGeometrySource::invalidateRegion(const LatLngBounds& bounds) { - loader->invoke(&CustomTileLoader::invalidateRegion, bounds, impl().getZoomRange()); + loader->self().invoke(&CustomTileLoader::invalidateRegion, bounds, impl().getZoomRange()); } } // namespace style diff --git a/src/mbgl/tile/geometry_tile.cpp b/src/mbgl/tile/geometry_tile.cpp index af28fe3963..d686d8440b 100644 --- a/src/mbgl/tile/geometry_tile.cpp +++ b/src/mbgl/tile/geometry_tile.cpp @@ -86,7 +86,7 @@ void GeometryTile::setData(std::unique_ptr<const GeometryTileData> data_) { pending = true; ++correlationID; - worker.invoke(&GeometryTileWorker::setData, std::move(data_), correlationID); + worker.self().invoke(&GeometryTileWorker::setData, std::move(data_), correlationID); } @@ -112,14 +112,14 @@ void GeometryTile::setLayers(const std::vector<Immutable<Layer::Impl>>& layers) } ++correlationID; - worker.invoke(&GeometryTileWorker::setLayers, std::move(impls), correlationID); + worker.self().invoke(&GeometryTileWorker::setLayers, std::move(impls), correlationID); } void GeometryTile::setShowCollisionBoxes(const bool showCollisionBoxes_) { if (showCollisionBoxes != showCollisionBoxes_) { showCollisionBoxes = showCollisionBoxes_; ++correlationID; - worker.invoke(&GeometryTileWorker::setShowCollisionBoxes, showCollisionBoxes, correlationID); + worker.self().invoke(&GeometryTileWorker::setShowCollisionBoxes, showCollisionBoxes, correlationID); } } @@ -153,7 +153,7 @@ void GeometryTile::onError(std::exception_ptr err, const uint64_t resultCorrelat } void GeometryTile::onGlyphsAvailable(GlyphMap glyphs) { - worker.invoke(&GeometryTileWorker::onGlyphsAvailable, std::move(glyphs)); + worker.self().invoke(&GeometryTileWorker::onGlyphsAvailable, std::move(glyphs)); } void GeometryTile::getGlyphs(GlyphDependencies glyphDependencies) { @@ -161,7 +161,7 @@ void GeometryTile::getGlyphs(GlyphDependencies glyphDependencies) { } void GeometryTile::onImagesAvailable(ImageMap images, uint64_t imageCorrelationID) { - worker.invoke(&GeometryTileWorker::onImagesAvailable, std::move(images), imageCorrelationID); + worker.self().invoke(&GeometryTileWorker::onImagesAvailable, std::move(images), imageCorrelationID); } void GeometryTile::getImages(ImageRequestPair pair) { diff --git a/src/mbgl/tile/raster_dem_tile.cpp b/src/mbgl/tile/raster_dem_tile.cpp index 5db298cf4c..f29861ee71 100644 --- a/src/mbgl/tile/raster_dem_tile.cpp +++ b/src/mbgl/tile/raster_dem_tile.cpp @@ -48,7 +48,7 @@ void RasterDEMTile::setMetadata(optional<Timestamp> modified_, optional<Timestam void RasterDEMTile::setData(std::shared_ptr<const std::string> data) { pending = true; ++correlationID; - worker.invoke(&RasterDEMTileWorker::parse, data, correlationID, encoding); + worker.self().invoke(&RasterDEMTileWorker::parse, data, correlationID, encoding); } void RasterDEMTile::onParsed(std::unique_ptr<HillshadeBucket> result, const uint64_t resultCorrelationID) { diff --git a/src/mbgl/tile/raster_tile.cpp b/src/mbgl/tile/raster_tile.cpp index ff23d4493e..cc71c04ba1 100644 --- a/src/mbgl/tile/raster_tile.cpp +++ b/src/mbgl/tile/raster_tile.cpp @@ -37,7 +37,7 @@ void RasterTile::setMetadata(optional<Timestamp> modified_, optional<Timestamp> void RasterTile::setData(std::shared_ptr<const std::string> data) { pending = true; ++correlationID; - worker.invoke(&RasterTileWorker::parse, data, correlationID); + worker.self().invoke(&RasterTileWorker::parse, data, correlationID); } void RasterTile::onParsed(std::unique_ptr<RasterBucket> result, const uint64_t resultCorrelationID) { diff --git a/test/actor/actor.test.cpp b/test/actor/actor.test.cpp index 967dc152d9..7493abe263 100644 --- a/test/actor/actor.test.cpp +++ b/test/actor/actor.test.cpp @@ -1,5 +1,6 @@ #include <mbgl/actor/actor.hpp> #include <mbgl/util/default_thread_pool.hpp> +#include <mbgl/util/run_loop.hpp> #include <mbgl/test/util.hpp> @@ -7,13 +8,12 @@ #include <functional> #include <future> #include <memory> +#include <thread> using namespace mbgl; using namespace std::chrono_literals; TEST(Actor, Construction) { - // Construction is currently synchronous. It may become asynchronous in the future. - struct Test { Test(ActorRef<Test>, bool& constructed) { constructed = true; @@ -27,6 +27,25 @@ TEST(Actor, Construction) { EXPECT_TRUE(constructed); } +TEST(Actor, Destruction) { + struct Test { + Test(ActorRef<Test>, bool& destructed_) : destructed(destructed_) {}; + ~Test() { + destructed = true; + } + + bool& destructed; + }; + + ThreadPool pool { 1 }; + bool destructed = false; + { + Actor<Test> test(pool, std::ref(destructed)); + } + + EXPECT_TRUE(destructed); +} + TEST(Actor, DestructionBlocksOnReceive) { // Destruction blocks until the actor is not receiving. @@ -63,7 +82,7 @@ TEST(Actor, DestructionBlocksOnReceive) { Actor<Test> test(pool, std::move(enteredPromise), std::move(exitingFuture)); - test.invoke(&Test::wait); + test.self().invoke(&Test::wait); enteredFuture.wait(); exitingPromise.set_value(); } @@ -145,7 +164,7 @@ TEST(Actor, DestructionAllowedInReceiveOnSameThread) { auto test = std::make_unique<Actor<Test>>(pool); // Callback (triggered while mutex is locked in Mailbox::receive()) - test->invoke(&Test::callMeBack, [&]() { + test->self().invoke(&Test::callMeBack, [&]() { // Destroy the Actor/Mailbox in the same thread test.reset(); callbackFiredPromise.set_value(); @@ -180,16 +199,16 @@ TEST(Actor, SelfDestructionDoesntCrashWaitingReceivingThreads) { std::atomic<bool> waitingMessageProcessed {false}; // Callback (triggered while mutex is locked in Mailbox::receive()) - closingActor->invoke(&Test::callMeBack, [&]() { + closingActor->self().invoke(&Test::callMeBack, [&]() { // Queue up another message from another thread std::promise<void> messageQueuedPromise; - waitingActor->invoke(&Test::callMeBack, [&]() { + waitingActor->self().invoke(&Test::callMeBack, [&]() { // This will be waiting on the mutex in // Mailbox::receive(), holding a lock // on the weak_ptr so the mailbox is not // destroyed - closingActor->invoke(&Test::callMeBack, [&]() { + closingActor->self().invoke(&Test::callMeBack, [&]() { waitingMessageProcessed.store(true); }); messageQueuedPromise.set_value(); @@ -239,10 +258,10 @@ TEST(Actor, OrderedMailbox) { Actor<Test> test(pool, std::move(endedPromise)); for (auto i = 1; i <= 10; ++i) { - test.invoke(&Test::receive, i); + test.self().invoke(&Test::receive, i); } - test.invoke(&Test::end); + test.self().invoke(&Test::end); endedFuture.wait(); } @@ -275,10 +294,10 @@ TEST(Actor, NonConcurrentMailbox) { Actor<Test> test(pool, std::move(endedPromise)); for (auto i = 1; i <= 10; ++i) { - test.invoke(&Test::receive, i); + test.self().invoke(&Test::receive, i); } - test.invoke(&Test::end); + test.self().invoke(&Test::end); endedFuture.wait(); } @@ -297,7 +316,7 @@ TEST(Actor, Ask) { ThreadPool pool { 2 }; Actor<Test> test(pool); - auto result = test.ask(&Test::doubleIt, 1); + auto result = test.self().ask(&Test::doubleIt, 1); ASSERT_TRUE(result.valid()); @@ -324,7 +343,7 @@ TEST(Actor, AskVoid) { bool executed = false; Actor<Test> actor(pool, executed); - actor.ask(&Test::doIt).get(); + actor.self().ask(&Test::doIt).get(); EXPECT_TRUE(executed); } @@ -355,6 +374,58 @@ TEST(Actor, NoSelfActorRef) { auto future = promise.get_future(); Actor<WithArguments> withArguments(pool, std::move(promise)); - withArguments.invoke(&WithArguments::receive); + withArguments.self().invoke(&WithArguments::receive); future.wait(); } + +TEST(Actor, TwoPhaseConstruction) { + // This test mimics, in simplified form, the approach used by the Thread<Object> to construct + // its actor in two parts so that the Thread<Object> instance can be created without waiting + // for the target thread to be up and running. + + struct Test { + Test(ActorRef<Test>, std::shared_ptr<bool> destroyed_) + : destroyed(std::move(destroyed_)) {}; + + ~Test() { + *destroyed = true; + } + + void callMe(std::promise<void> p) { + p.set_value(); + } + + void stop() { + util::RunLoop::Get()->stop(); + } + + std::shared_ptr<bool> destroyed; + }; + + AspiringActor<Test> parent; + + auto destroyed = std::make_shared<bool>(false); + + std::promise<void> queueExecuted; + auto queueExecutedFuture = queueExecuted.get_future(); + + parent.self().invoke(&Test::callMe, std::move(queueExecuted)); + parent.self().invoke(&Test::stop); + + auto thread = std::thread([ + capturedArgs = std::make_tuple(destroyed), + &parent + ] () mutable { + util::RunLoop loop(util::RunLoop::Type::New); + EstablishedActor<Test> test(loop, parent, capturedArgs); + loop.run(); + }); + + // should not hang + queueExecutedFuture.get(); + thread.join(); + + EXPECT_TRUE(*destroyed); +} + + diff --git a/test/util/thread.test.cpp b/test/util/thread.test.cpp index 76fb5ce3f0..2bcb9d8959 100644 --- a/test/util/thread.test.cpp +++ b/test/util/thread.test.cpp @@ -15,11 +15,11 @@ class TestObject { public: TestObject(ActorRef<TestObject>, std::thread::id otherTid) : tid(std::this_thread::get_id()) { - EXPECT_NE(tid, otherTid); + EXPECT_NE(tid, otherTid); // Object is created on child thread } ~TestObject() { - EXPECT_EQ(tid, std::this_thread::get_id()); + EXPECT_EQ(tid, std::this_thread::get_id()); // Object is destroyed on child thread } void fn1(int val) const { @@ -275,3 +275,60 @@ TEST(Thread, PauseResume) { thread.actor().invoke(&TestWorker::send, [&] { loop.stop(); }); loop.run(); } + + +class TestWorkerDelayedConstruction { +public: + TestWorkerDelayedConstruction(ActorRef<TestWorkerDelayedConstruction>, std::future<void> start) { + start.get(); + } + + void send(std::function<void ()> cb) { + cb(); + } + +private: + Timer timer; +}; + +TEST(Thread, InvokeBeforeChildStarts) { + RunLoop loop; + + std::promise<void> start; + Thread<TestWorkerDelayedConstruction> thread("Test", start.get_future()); + + std::atomic<int> count { 0 }; + + for (unsigned i = 0; i < 100; ++i) { + thread.actor().invoke(&TestWorkerDelayedConstruction::send, [&] { ++count; }); + } + + thread.actor().invoke(&TestWorkerDelayedConstruction::send, [&] { loop.stop(); }); + + // This test will be flaky if messages are consumed before the target object is constructed. + ASSERT_EQ(count, 0); + + start.set_value(); + + loop.run(); + + ASSERT_EQ(count, 100); +} + +TEST(Thread, DeleteBeforeChildStarts) { + std::atomic_bool flag(false); + std::promise<void> start; + + Thread<TestWorker> control("Control"); + auto thread = std::make_unique<Thread<TestWorkerDelayedConstruction>>("Test", start.get_future()); + + thread->actor().invoke(&TestWorkerDelayedConstruction::send, [&] { flag = true; }); + + control.actor().invoke(&TestWorker::sendDelayed, [&] { start.set_value(); }); + + // Should not hang. + thread.reset(); + + // Should process the queue before destruction. + ASSERT_TRUE(flag); +} |