diff options
author | Peter Liu <peterqliu@gmail.com> | 2015-04-09 16:00:08 -0700 |
---|---|---|
committer | Peter Liu <peterqliu@gmail.com> | 2015-04-09 16:00:08 -0700 |
commit | 8f987ae4cd32440943f39691065ea170fe1a6338 (patch) | |
tree | 2c0d8fb3101a8f2e092cee1cb4acc088815f3507 | |
parent | 051c9c0b1285c5df59251d66b7865e0d6706ca4f (diff) | |
parent | 566c6718d7ebfa2a8b25f4722b8c97d06a96418b (diff) | |
download | qtlocation-mapboxgl-8f987ae4cd32440943f39691065ea170fe1a6338.tar.gz |
Merge branch 'master' of https://github.com/mapbox/mapbox-gl-native
-rw-r--r-- | include/mbgl/map/map.hpp | 8 | ||||
-rw-r--r-- | include/mbgl/platform/platform.hpp | 2 | ||||
-rw-r--r-- | include/mbgl/util/uv-messenger.h | 33 | ||||
-rw-r--r-- | include/mbgl/util/uv.hpp | 1 | ||||
-rw-r--r-- | src/mbgl/map/map.cpp | 5 | ||||
-rw-r--r-- | src/mbgl/map/source.cpp | 4 | ||||
-rw-r--r-- | src/mbgl/map/source.hpp | 5 | ||||
-rw-r--r-- | src/mbgl/map/tile_data.cpp | 22 | ||||
-rw-r--r-- | src/mbgl/map/tile_data.hpp | 11 | ||||
-rw-r--r-- | src/mbgl/util/channel.hpp | 39 | ||||
-rw-r--r-- | src/mbgl/util/queue.h | 92 | ||||
-rw-r--r-- | src/mbgl/util/uv-channel.c | 69 | ||||
-rw-r--r-- | src/mbgl/util/uv-channel.h | 29 | ||||
-rw-r--r-- | src/mbgl/util/uv-messenger.c | 86 | ||||
-rw-r--r-- | src/mbgl/util/uv-worker.c | 170 | ||||
-rw-r--r-- | src/mbgl/util/uv-worker.h | 41 | ||||
-rw-r--r-- | src/mbgl/util/uv_detail.hpp | 51 | ||||
-rw-r--r-- | src/mbgl/util/worker.cpp | 73 | ||||
-rw-r--r-- | src/mbgl/util/worker.hpp | 44 |
19 files changed, 180 insertions, 605 deletions
diff --git a/include/mbgl/map/map.hpp b/include/mbgl/map/map.hpp index 129d463b25..38a37a8057 100644 --- a/include/mbgl/map/map.hpp +++ b/include/mbgl/map/map.hpp @@ -7,7 +7,6 @@ #include <mbgl/util/geo.hpp> #include <mbgl/util/projection.hpp> #include <mbgl/util/noncopyable.hpp> -#include <mbgl/util/uv.hpp> #include <mbgl/util/ptr.hpp> #include <mbgl/util/vec.hpp> @@ -22,6 +21,8 @@ #include <condition_variable> #include <functional> +namespace uv { class async; } + namespace mbgl { class Painter; @@ -40,6 +41,7 @@ class Environment; class EnvironmentScope; class AnnotationManager; class MapData; +class Worker; class Map : private util::noncopyable { friend class View; @@ -165,7 +167,7 @@ private: void resize(uint16_t width, uint16_t height, float ratio, uint16_t fbWidth, uint16_t fbHeight); util::ptr<Sprite> getSprite(); - uv::worker& getWorker(); + Worker& getWorker(); // Checks if render thread needs to pause void checkForPause(); @@ -207,7 +209,7 @@ private: View &view; private: - std::unique_ptr<uv::worker> workers; + std::unique_ptr<Worker> workers; std::thread thread; std::unique_ptr<uv::async> asyncTerminate; std::unique_ptr<uv::async> asyncUpdate; diff --git a/include/mbgl/platform/platform.hpp b/include/mbgl/platform/platform.hpp index ac90d0d3d0..cd87e2256d 100644 --- a/include/mbgl/platform/platform.hpp +++ b/include/mbgl/platform/platform.hpp @@ -1,8 +1,6 @@ #ifndef MBGL_PLATFORM_PLATFORM #define MBGL_PLATFORM_PLATFORM -#include <mbgl/util/uv.hpp> - #include <memory> #include <string> diff --git a/include/mbgl/util/uv-messenger.h b/include/mbgl/util/uv-messenger.h deleted file mode 100644 index 35c5ce2188..0000000000 --- a/include/mbgl/util/uv-messenger.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef MBGL_UTIL_UV_MESSENGER -#define MBGL_UTIL_UV_MESSENGER - -#include <uv.h> - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct uv_messenger_s uv_messenger_t; -typedef void (*uv_messenger_cb)(void *arg); -typedef void (*uv_messenger_stop_cb)(uv_messenger_t *msgr); - -struct uv_messenger_s { - uv_mutex_t mutex; - uv_async_t async; - uv_messenger_cb callback; - uv_messenger_stop_cb stop_callback; - void *data; - void *queue[2]; -}; - -int uv_messenger_init(uv_loop_t *loop, uv_messenger_t *msgr, uv_messenger_cb callback); -void uv_messenger_send(uv_messenger_t *msgr, void *arg); -void uv_messenger_stop(uv_messenger_t *msgr, uv_messenger_stop_cb stop_callback); -void uv_messenger_ref(uv_messenger_t *msgr); -void uv_messenger_unref(uv_messenger_t *msgr); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/include/mbgl/util/uv.hpp b/include/mbgl/util/uv.hpp index 6f0a916040..19c6c5b4df 100644 --- a/include/mbgl/util/uv.hpp +++ b/include/mbgl/util/uv.hpp @@ -17,7 +17,6 @@ std::string cwd(); class rwlock; class loop; class async; -class worker; class mutex; class cond; diff --git a/src/mbgl/map/map.cpp b/src/mbgl/map/map.cpp index 5e046766e6..f76845e3a7 100644 --- a/src/mbgl/map/map.cpp +++ b/src/mbgl/map/map.cpp @@ -27,6 +27,7 @@ #include <mbgl/util/uv.hpp> #include <mbgl/util/mapbox.hpp> #include <mbgl/util/exception.hpp> +#include <mbgl/util/worker.hpp> #include <algorithm> #include <iostream> @@ -105,7 +106,7 @@ Map::~Map() { env->performCleanup(); } -uv::worker &Map::getWorker() { +Worker& Map::getWorker() { assert(workers); return *workers; } @@ -266,7 +267,7 @@ void Map::run() { view.activate(); - workers = util::make_unique<uv::worker>(env->loop, 4, "Tile Worker"); + workers = util::make_unique<Worker>(env->loop, 4); setup(); prepare(); diff --git a/src/mbgl/map/source.cpp b/src/mbgl/map/source.cpp index fa7ab6b969..29f84171d1 100644 --- a/src/mbgl/map/source.cpp +++ b/src/mbgl/map/source.cpp @@ -215,7 +215,7 @@ TileData::State Source::hasTile(const TileID& id) { return TileData::State::invalid; } -TileData::State Source::addTile(Map &map, uv::worker &worker, +TileData::State Source::addTile(Map &map, Worker &worker, util::ptr<Style> style, GlyphAtlas &glyphAtlas, GlyphStore &glyphStore, SpriteAtlas &spriteAtlas, util::ptr<Sprite> sprite, TexturePool &texturePool, @@ -349,7 +349,7 @@ bool Source::findLoadedParent(const TileID& id, int32_t minCoveringZoom, std::fo } void Source::update(Map &map, - uv::worker &worker, + Worker &worker, util::ptr<Style> style, GlyphAtlas &glyphAtlas, GlyphStore &glyphStore, diff --git a/src/mbgl/map/source.hpp b/src/mbgl/map/source.hpp index 4a551cda5b..625647ac5e 100644 --- a/src/mbgl/map/source.hpp +++ b/src/mbgl/map/source.hpp @@ -21,6 +21,7 @@ namespace mbgl { class Map; class Environment; +class Worker; class GlyphAtlas; class GlyphStore; class SpriteAtlas; @@ -59,7 +60,7 @@ public: Environment&, std::function<void()> callback); - void update(Map &, uv::worker &, util::ptr<Style>, GlyphAtlas &, GlyphStore &, + void update(Map &, Worker &, util::ptr<Style>, GlyphAtlas &, GlyphStore &, SpriteAtlas &, util::ptr<Sprite>, TexturePool &, std::function<void()> callback); void invalidateTiles(const std::vector<TileID>&); @@ -80,7 +81,7 @@ private: int32_t coveringZoomLevel(const TransformState&) const; std::forward_list<TileID> coveringTiles(const TransformState&) const; - TileData::State addTile(Map &, uv::worker &, util::ptr<Style>, GlyphAtlas &, + TileData::State addTile(Map &, Worker &, util::ptr<Style>, GlyphAtlas &, GlyphStore &, SpriteAtlas &, util::ptr<Sprite>, TexturePool &, const TileID &, std::function<void()> callback); diff --git a/src/mbgl/map/tile_data.cpp b/src/mbgl/map/tile_data.cpp index 74993ff52a..72e356bbf2 100644 --- a/src/mbgl/map/tile_data.cpp +++ b/src/mbgl/map/tile_data.cpp @@ -3,7 +3,7 @@ #include <mbgl/map/source.hpp> #include <mbgl/storage/file_source.hpp> -#include <mbgl/util/uv_detail.hpp> +#include <mbgl/util/worker.hpp> #include <mbgl/platform/log.hpp> using namespace mbgl; @@ -27,7 +27,7 @@ const std::string TileData::toString() const { return std::string { "[tile " } + name + "]"; } -void TileData::request(uv::worker &worker, float pixelRatio, std::function<void()> callback) { +void TileData::request(Worker& worker, float pixelRatio, std::function<void()> callback) { std::string url = source.tileURL(id, pixelRatio); state = State::loading; @@ -57,18 +57,12 @@ void TileData::cancel() { } } -void TileData::reparse(uv::worker& worker, std::function<void()> callback) -{ - // We're creating a new work request. The work request deletes itself after it executed - // the after work handler - new uv::work<util::ptr<TileData>>( - worker, - [this](util::ptr<TileData>& tile) { - EnvironmentScope scope(env, ThreadType::TileWorker, "TileWorker_" + tile->name); +void TileData::reparse(Worker& worker, std::function<void()> callback) { + util::ptr<TileData> tile = shared_from_this(); + worker.send( + [tile]() { + EnvironmentScope scope(tile->env, ThreadType::TileWorker, "TileWorker_" + tile->name); tile->parse(); }, - [callback](util::ptr<TileData>&) { - callback(); - }, - shared_from_this()); + callback); } diff --git a/src/mbgl/map/tile_data.hpp b/src/mbgl/map/tile_data.hpp index 6fbc9ec3fb..92d9778fd5 100644 --- a/src/mbgl/map/tile_data.hpp +++ b/src/mbgl/map/tile_data.hpp @@ -12,12 +12,6 @@ #include <string> #include <functional> -namespace uv { -class worker; -} - -typedef struct uv_loop_s uv_loop_t; - namespace mbgl { class Environment; @@ -25,6 +19,7 @@ class Painter; class SourceInfo; class StyleLayer; class Request; +class Worker; class TileData : public std::enable_shared_from_this<TileData>, private util::noncopyable { @@ -41,8 +36,8 @@ public: TileData(const TileID&, const SourceInfo&); ~TileData(); - void request(uv::worker&, float pixelRatio, std::function<void ()> callback); - void reparse(uv::worker&, std::function<void ()> callback); + void request(Worker&, float pixelRatio, std::function<void ()> callback); + void reparse(Worker&, std::function<void ()> callback); void cancel(); const std::string toString() const; diff --git a/src/mbgl/util/channel.hpp b/src/mbgl/util/channel.hpp new file mode 100644 index 0000000000..a48156f120 --- /dev/null +++ b/src/mbgl/util/channel.hpp @@ -0,0 +1,39 @@ +#ifndef MBGL_UTIL_CHANNEL +#define MBGL_UTIL_CHANNEL + +#include <mbgl/util/noncopyable.hpp> + +#include <mutex> +#include <condition_variable> +#include <queue> + +namespace mbgl { + +template <class T> +class Channel : public mbgl::util::noncopyable { +public: + void send(const T& t) { + std::unique_lock<std::mutex> lock(mutex); + queue.push(t); + condition.notify_one(); + } + + T receive() { + std::unique_lock<std::mutex> lock(mutex); + condition.wait(lock, [&](){ return !queue.empty(); }); + + T t = queue.front(); + queue.pop(); + + return t; + } + +private: + std::mutex mutex; + std::condition_variable condition; + std::queue<T> queue; +}; + +} + +#endif diff --git a/src/mbgl/util/queue.h b/src/mbgl/util/queue.h deleted file mode 100644 index fe02b454ea..0000000000 --- a/src/mbgl/util/queue.h +++ /dev/null @@ -1,92 +0,0 @@ -/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl> - * - * Permission to use, copy, modify, and/or distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -#ifndef QUEUE_H_ -#define QUEUE_H_ - -typedef void *QUEUE[2]; - -/* Private macros. */ -#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0])) -#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1])) -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) - -/* Public macros. */ -#define QUEUE_DATA(ptr, type, field) \ - ((type *) ((char *) (ptr) - ((char *) &((type *) 0)->field))) - -#define QUEUE_FOREACH(q, h) \ - for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) - -#define QUEUE_EMPTY(q) \ - ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q)) - -#define QUEUE_HEAD(q) \ - (QUEUE_NEXT(q)) - -#define QUEUE_INIT(q) \ - do { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } \ - while (0) - -#define QUEUE_ADD(h, n) \ - do { \ - QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ - QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV(h) = QUEUE_PREV(n); \ - QUEUE_PREV_NEXT(h) = (h); \ - } \ - while (0) - -#define QUEUE_SPLIT(h, q, n) \ - do { \ - QUEUE_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(n) = (n); \ - QUEUE_NEXT(n) = (q); \ - QUEUE_PREV(h) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(h) = (h); \ - QUEUE_PREV(q) = (n); \ - } \ - while (0) - -#define QUEUE_INSERT_HEAD(h, q) \ - do { \ - QUEUE_NEXT(q) = QUEUE_NEXT(h); \ - QUEUE_PREV(q) = (h); \ - QUEUE_NEXT_PREV(q) = (q); \ - QUEUE_NEXT(h) = (q); \ - } \ - while (0) - -#define QUEUE_INSERT_TAIL(h, q) \ - do { \ - QUEUE_NEXT(q) = (h); \ - QUEUE_PREV(q) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(q) = (q); \ - QUEUE_PREV(h) = (q); \ - } \ - while (0) - -#define QUEUE_REMOVE(q) \ - do { \ - QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ - QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ - } \ - while (0) - -#endif /* QUEUE_H_ */ diff --git a/src/mbgl/util/uv-channel.c b/src/mbgl/util/uv-channel.c deleted file mode 100644 index 4e3b9fa5ff..0000000000 --- a/src/mbgl/util/uv-channel.c +++ /dev/null @@ -1,69 +0,0 @@ -#include <mbgl/util/uv-channel.h> -#include <mbgl/util/queue.h> - -#include <stdlib.h> - -// Taken from http://navaneeth.github.io/blog/2013/08/02/channels-in-libuv/ - -typedef struct { - void *data; - void *active_queue[2]; -} uv__chan_item_t; - -int uv_chan_init(uv_chan_t *chan) { - int r = uv_mutex_init(&chan->mutex); - if (r == -1) - return r; - - QUEUE_INIT(&chan->q); - - return uv_cond_init(&chan->cond); -} - -void uv_chan_send(uv_chan_t *chan, void *data) { - uv__chan_item_t *item = (uv__chan_item_t *)malloc(sizeof(uv__chan_item_t)); - item->data = data; - - uv_mutex_lock(&chan->mutex); - QUEUE_INSERT_TAIL(&chan->q, &item->active_queue); - uv_cond_signal(&chan->cond); - uv_mutex_unlock(&chan->mutex); -} - -void *uv_chan_receive(uv_chan_t *chan) { - uv__chan_item_t *item; - QUEUE *head; - void *data = NULL; - - uv_mutex_lock(&chan->mutex); - while (QUEUE_EMPTY(&chan->q)) { - uv_cond_wait(&chan->cond, &chan->mutex); - } - - head = QUEUE_HEAD(&chan->q); - item = QUEUE_DATA(head, uv__chan_item_t, active_queue); - data = item->data; - QUEUE_REMOVE(head); - free(item); - uv_mutex_unlock(&chan->mutex); - return data; -} - -void uv_chan_clear(uv_chan_t *chan) { - uv_mutex_lock(&chan->mutex); - uv__chan_item_t *item = NULL; - QUEUE *head = NULL; - while (!QUEUE_EMPTY(&chan->q)) { - head = QUEUE_HEAD(&chan->q); - item = QUEUE_DATA(head, uv__chan_item_t, active_queue); - QUEUE_REMOVE(head); - free(item); - } - uv_mutex_unlock(&chan->mutex); -} - -void uv_chan_destroy(uv_chan_t *chan) { - uv_chan_clear(chan); - uv_cond_destroy(&chan->cond); - uv_mutex_destroy(&chan->mutex); -} diff --git a/src/mbgl/util/uv-channel.h b/src/mbgl/util/uv-channel.h deleted file mode 100644 index 0a0d706477..0000000000 --- a/src/mbgl/util/uv-channel.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef MBGL_UTIL_UV_CHANNEL -#define MBGL_UTIL_UV_CHANNEL - -#include <uv.h> - -#ifdef __cplusplus -extern "C" { -#endif - -// Taken from http://navaneeth.github.io/blog/2013/08/02/channels-in-libuv/ - -typedef struct uv_chan_s uv_chan_t; - -struct uv_chan_s { - uv_mutex_t mutex; - uv_cond_t cond; - void *q[2]; -}; - -int uv_chan_init(uv_chan_t *chan); -void uv_chan_send(uv_chan_t *chan, void *data); -void *uv_chan_receive(uv_chan_t *chan); -void uv_chan_destroy(uv_chan_t *chan); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mbgl/util/uv-messenger.c b/src/mbgl/util/uv-messenger.c deleted file mode 100644 index 935b6f1c41..0000000000 --- a/src/mbgl/util/uv-messenger.c +++ /dev/null @@ -1,86 +0,0 @@ -#include <mbgl/util/uv-messenger.h> -#include <mbgl/util/queue.h> - -#include <stdlib.h> -#include <assert.h> - -typedef struct { - void *data; - void *queue[2]; -} uv__messenger_item_t; - -#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10 -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wunused-parameter" -void uv__messenger_callback(uv_async_t *async, int status) { -#pragma clang diagnostic pop -#else -void uv__messenger_callback(uv_async_t *async) { -#endif - uv_messenger_t *msgr = (uv_messenger_t *)async->data; - - uv__messenger_item_t *item; - QUEUE *head; - - while (1) { - uv_mutex_lock(&msgr->mutex); - if (QUEUE_EMPTY(&msgr->queue)) { - uv_mutex_unlock(&msgr->mutex); - break; - } - - head = QUEUE_HEAD(&msgr->queue); - item = QUEUE_DATA(head, uv__messenger_item_t, queue); - QUEUE_REMOVE(head); - uv_mutex_unlock(&msgr->mutex); - - msgr->callback(item->data); - - free(item); - } -} - -int uv_messenger_init(uv_loop_t *loop, uv_messenger_t *msgr, uv_messenger_cb callback) { - int ret = uv_mutex_init(&msgr->mutex); - if (ret < 0) { - return ret; - } - - msgr->callback = callback; - msgr->stop_callback = NULL; - - QUEUE_INIT(&msgr->queue); - - msgr->async.data = msgr; - return uv_async_init(loop, &msgr->async, uv__messenger_callback); -} - -void uv_messenger_send(uv_messenger_t *msgr, void *data) { - uv__messenger_item_t *item = (uv__messenger_item_t *)malloc(sizeof(uv__messenger_item_t)); - item->data = data; - - uv_mutex_lock(&msgr->mutex); - QUEUE_INSERT_TAIL(&msgr->queue, &item->queue); - uv_mutex_unlock(&msgr->mutex); - - uv_async_send(&msgr->async); -} - -void uv_messenger_ref(uv_messenger_t *msgr) { - uv_ref((uv_handle_t *)&msgr->async); -} - -void uv_messenger_unref(uv_messenger_t *msgr) { - uv_unref((uv_handle_t *)&msgr->async); -} - -void uv__messenger_stop_callback(uv_handle_t *handle) { - uv_messenger_t *msgr = (uv_messenger_t *)handle->data; - msgr->stop_callback(msgr); -} - -void uv_messenger_stop(uv_messenger_t *msgr, uv_messenger_stop_cb stop_callback) { - assert(!msgr->stop_callback); - msgr->stop_callback = stop_callback; - uv_close((uv_handle_t *)&msgr->async, uv__messenger_stop_callback); -} diff --git a/src/mbgl/util/uv-worker.c b/src/mbgl/util/uv-worker.c deleted file mode 100644 index ec8eae461c..0000000000 --- a/src/mbgl/util/uv-worker.c +++ /dev/null @@ -1,170 +0,0 @@ -#include <mbgl/util/uv-worker.h> -#include <mbgl/util/uv-messenger.h> - -#include <stdio.h> -#include <assert.h> - -typedef struct uv__worker_item_s uv__worker_item_t; -struct uv__worker_item_s { - uv_worker_t *worker; - void *data; - uv_worker_cb work_cb; - uv_worker_after_cb after_work_cb; -}; - -typedef struct uv__worker_thread_s uv__worker_thread_t; -struct uv__worker_thread_s { - uv_worker_t *worker; - uv_thread_t thread; -}; - -void uv__worker_free_messenger(uv_messenger_t *msgr) { - free(msgr); -} - -void uv__worker_thread_finished(uv__worker_thread_t *worker_thread) { - uv_worker_t *worker = worker_thread->worker; - -#ifdef DEBUG - assert(uv_thread_self() == worker->thread_id); -#endif - - // This should at most block very briefly. We are sending the termination - // notification as the last thing in the worker thread, so by now the thread - // has probably terminated already. If not, the waiting time should be - // extremely short. - uv_thread_join(&worker_thread->thread); - - assert(worker->count > 0); - worker->count--; - if (worker->count == 0) { - uv_chan_destroy(&worker->chan); - uv_messenger_stop(worker->msgr, uv__worker_free_messenger); - if (worker->close_cb) { - worker->close_cb(worker); - } - } -} - -void uv__worker_after(void *ptr) { - uv__worker_item_t *item = (uv__worker_item_t *)ptr; - - if (item->work_cb) { - // We are finishing a regular work request. - if (item->after_work_cb) { - assert(item->after_work_cb); - item->after_work_cb(item->data); - } - uv_worker_t *worker = item->worker; - assert(worker->active_items > 0); - if (--worker->active_items == 0) { - uv_messenger_unref(worker->msgr); - } - } else { - // This is a worker thread termination. - uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)item->data; - uv__worker_thread_finished(worker_thread); - free(worker_thread); - } - - free(item); -} - -void uv__worker_thread_loop(void *ptr) { - uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)ptr; - uv_worker_t *worker = worker_thread->worker; - -#ifdef __APPLE__ - if (worker->name) { - pthread_setname_np(worker->name); - } -#endif - - uv__worker_item_t *item = NULL; - while ((item = (uv__worker_item_t *)uv_chan_receive(&worker->chan)) != NULL) { - assert(item->work_cb); - item->work_cb(item->data); - - // Trigger the after callback in the main thread. - uv_messenger_send(worker->msgr, item); - } - - // Make sure to close all other workers too. - uv_chan_send(&worker->chan, NULL); - - // Create a new worker item that acts as a terminate flag for this thread. - item = (uv__worker_item_t *)malloc(sizeof(uv__worker_item_t)); - item->data = worker_thread; - item->work_cb = NULL; - item->after_work_cb = NULL; - uv_messenger_send(worker->msgr, item); -} - -int uv_worker_init(uv_worker_t *worker, uv_loop_t *loop, int count, const char *name) { -#ifdef DEBUG - worker->thread_id = uv_thread_self(); -#endif - worker->loop = loop; - worker->name = name; - worker->count = 0; - worker->close_cb = NULL; - worker->active_items = 0; - worker->msgr = (uv_messenger_t *)malloc(sizeof(uv_messenger_t)); - int ret = uv_messenger_init(loop, worker->msgr, uv__worker_after); - if (ret < 0) { - free(worker->msgr); - return ret; - } - uv_messenger_unref(worker->msgr); - ret = uv_chan_init(&worker->chan); - if (ret < 0) return ret; - - // Initialize all worker threads. - int i; - for (i = 0; i < count; i++) { - uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)malloc(sizeof(uv__worker_thread_t)); - worker_thread->worker = worker; - ret = uv_thread_create(&worker_thread->thread, uv__worker_thread_loop, worker_thread); - if (ret < 0) return ret; - worker->count++; - } - - return 0; -} - -void uv_worker_send(uv_worker_t *worker, void *data, uv_worker_cb work_cb, - uv_worker_after_cb after_work_cb) { -#ifdef DEBUG - assert(uv_thread_self() == worker->thread_id); -#endif - - // It doesn't make sense to not provide a work callback. On the other hand, the after_work_cb - // may be NULL. In that case, there will be no callback called in the current thread and the - // worker item will instead be freed in the worker thread. - assert(work_cb); - - uv__worker_item_t *item = (uv__worker_item_t *)malloc(sizeof(uv__worker_item_t)); - item->worker = worker; - item->work_cb = work_cb; - item->after_work_cb = after_work_cb; - item->data = data; - uv_chan_send(&worker->chan, item); - if (worker->active_items++ == 0) { - uv_messenger_ref(worker->msgr); - } -} - -void uv_worker_close(uv_worker_t *worker, uv_worker_close_cb close_cb) { -#ifdef DEBUG - assert(uv_thread_self() == worker->thread_id); -#endif - - // Prevent double calling. - assert(worker->close_cb == NULL); - - worker->close_cb = close_cb; - uv_chan_send(&worker->chan, NULL); - if (worker->active_items++ == 0) { - uv_messenger_ref(worker->msgr); - } -} diff --git a/src/mbgl/util/uv-worker.h b/src/mbgl/util/uv-worker.h deleted file mode 100644 index 65ad42edb8..0000000000 --- a/src/mbgl/util/uv-worker.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef MBGL_UTIL_UV_WORKER -#define MBGL_UTIL_UV_WORKER - -#include <stdlib.h> - -#include <mbgl/util/uv-messenger.h> -#include <mbgl/util/uv-channel.h> - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct uv_worker_s uv_worker_t; - -typedef void (*uv_worker_cb)(void *data); -typedef void (*uv_worker_after_cb)(void *data); -typedef void (*uv_worker_close_cb)(uv_worker_t *worker); - -struct uv_worker_s { -#ifdef DEBUG - unsigned long thread_id; -#endif - uv_loop_t *loop; - uv_messenger_t *msgr; - uv_chan_t chan; - const char *name; - int count; - uv_worker_close_cb close_cb; - unsigned int active_items; -}; - -int uv_worker_init(uv_worker_t *worker, uv_loop_t *loop, int count, const char *name); -void uv_worker_send(uv_worker_t *worker, void *data, uv_worker_cb work_cb, - uv_worker_after_cb after_work_cb); -void uv_worker_close(uv_worker_t *worker, uv_worker_close_cb close_cb); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mbgl/util/uv_detail.hpp b/src/mbgl/util/uv_detail.hpp index 6ae3713e09..9d479da425 100644 --- a/src/mbgl/util/uv_detail.hpp +++ b/src/mbgl/util/uv_detail.hpp @@ -2,7 +2,6 @@ #define MBGL_UTIL_UV_DETAIL #include <mbgl/util/uv.hpp> -#include <mbgl/util/uv-worker.h> #include <mbgl/util/noncopyable.hpp> #include <uv.h> @@ -117,56 +116,6 @@ private: uv_rwlock_t mtx; }; -class worker : public mbgl::util::noncopyable { -public: - inline worker(uv_loop_t *loop, unsigned int count, const char *name = nullptr) : w(new uv_worker_t) { - uv_worker_init(w, loop, count, name); - } - inline ~worker() { - uv_worker_close(w, [](uv_worker_t *worker_) { - delete worker_; - }); - } - inline void add(void *data, uv_worker_cb work_cb, uv_worker_after_cb after_work_cb) { - uv_worker_send(w, data, work_cb, after_work_cb); - } - -private: - uv_worker_t *w; -}; - -template <typename T> -class work : public mbgl::util::noncopyable { -public: - typedef std::function<void (T&)> work_callback; - typedef std::function<void (T&)> after_work_callback; - - template<typename... Args> - work(worker &worker, work_callback work_cb_, after_work_callback after_work_cb_, Args&&... args) - : data(std::forward<Args>(args)...), - work_cb(work_cb_), - after_work_cb(after_work_cb_) { - worker.add(this, do_work, after_work); - } - -private: - static void do_work(void *data) { - work<T> *w = reinterpret_cast<work<T> *>(data); - w->work_cb(w->data); - } - - static void after_work(void *data) { - work<T> *w = reinterpret_cast<work<T> *>(data); - w->after_work_cb(w->data); - delete w; - } - -private: - T data; - work_callback work_cb; - after_work_callback after_work_cb; -}; - } #endif diff --git a/src/mbgl/util/worker.cpp b/src/mbgl/util/worker.cpp new file mode 100644 index 0000000000..3559cdd71f --- /dev/null +++ b/src/mbgl/util/worker.cpp @@ -0,0 +1,73 @@ +#include <mbgl/util/worker.hpp> + +#include <cassert> + +namespace mbgl { + +Worker::Worker(uv_loop_t* loop, std::size_t count) + : queue(new Queue(loop, [this](Fn after) { afterWork(after); })) +{ + queue->unref(); + + for (std::size_t i = 0; i < count; i++) { + threads.emplace_back(&Worker::workLoop, this); + } +} + +Worker::~Worker() { + MBGL_VERIFY_THREAD(tid); + + if (active++ == 0) { + queue->ref(); + } + + channel.send(Work()); + + for (auto& thread : threads) { + thread.join(); + } + + queue->stop(); +} + +void Worker::send(Fn work, Fn after) { + MBGL_VERIFY_THREAD(tid); + assert(work); + + if (active++ == 0) { + queue->ref(); + } + + channel.send({work, after}); +} + +void Worker::workLoop() { +#ifdef __APPLE__ + pthread_setname_np("Worker"); +#endif + + while (true) { + Work item = channel.receive(); + + if (!item.work) + break; + + item.work(); + queue->send(std::move(item.after)); + } + + // Make sure to close all other workers too. + channel.send(Work()); +} + +void Worker::afterWork(Fn after) { + if (after) { + after(); + } + + if (--active == 0) { + queue->unref(); + } +} + +} diff --git a/src/mbgl/util/worker.hpp b/src/mbgl/util/worker.hpp new file mode 100644 index 0000000000..86c2e6acf4 --- /dev/null +++ b/src/mbgl/util/worker.hpp @@ -0,0 +1,44 @@ +#ifndef MBGL_UTIL_WORKER +#define MBGL_UTIL_WORKER + +#include <mbgl/util/noncopyable.hpp> +#include <mbgl/util/async_queue.hpp> +#include <mbgl/util/channel.hpp> +#include <mbgl/util/util.hpp> + +#include <thread> +#include <functional> + +namespace mbgl { + +class Worker : public mbgl::util::noncopyable { +public: + using Fn = std::function<void ()>; + + Worker(uv_loop_t* loop, std::size_t count); + ~Worker(); + + void send(Fn work, Fn after); + +private: + void workLoop(); + void afterWork(Fn after); + + struct Work { + Fn work; + Fn after; + }; + + using Queue = util::AsyncQueue<std::function<void ()>>; + + std::size_t active = 0; + Queue* queue = nullptr; + Channel<Work> channel; + std::vector<std::thread> threads; + + MBGL_STORE_THREAD(tid) +}; + +} + +#endif |