summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Firebaugh <john.firebaugh@gmail.com>2015-04-09 11:51:57 -0700
committerJohn Firebaugh <john.firebaugh@gmail.com>2015-04-09 15:30:58 -0700
commit0c087603e602bbb5ad25b4924f49169a316f695e (patch)
treec3faa2dabe9deb07a307532b04c5836a30c0600a
parent0b779eaa8b96a65d5b97df7ad468e0380f50703f (diff)
downloadqtlocation-mapboxgl-0c087603e602bbb5ad25b4924f49169a316f695e.tar.gz
Replace uv_messenger with AsyncQueue; rewrite remaining uv_* in C++
-rw-r--r--include/mbgl/map/map.hpp5
-rw-r--r--include/mbgl/util/uv-messenger.h33
-rw-r--r--include/mbgl/util/uv.hpp1
-rw-r--r--src/mbgl/map/map.cpp5
-rw-r--r--src/mbgl/map/source.cpp4
-rw-r--r--src/mbgl/map/source.hpp5
-rw-r--r--src/mbgl/map/tile_data.cpp22
-rw-r--r--src/mbgl/map/tile_data.hpp11
-rw-r--r--src/mbgl/util/channel.hpp39
-rw-r--r--src/mbgl/util/queue.h92
-rw-r--r--src/mbgl/util/uv-channel.c69
-rw-r--r--src/mbgl/util/uv-channel.h29
-rw-r--r--src/mbgl/util/uv-messenger.c86
-rw-r--r--src/mbgl/util/uv-worker.c170
-rw-r--r--src/mbgl/util/uv-worker.h41
-rw-r--r--src/mbgl/util/uv_detail.hpp51
-rw-r--r--src/mbgl/util/worker.cpp73
-rw-r--r--src/mbgl/util/worker.hpp44
18 files changed, 178 insertions, 602 deletions
diff --git a/include/mbgl/map/map.hpp b/include/mbgl/map/map.hpp
index 129d463b25..8b9398f578 100644
--- a/include/mbgl/map/map.hpp
+++ b/include/mbgl/map/map.hpp
@@ -40,6 +40,7 @@ class Environment;
class EnvironmentScope;
class AnnotationManager;
class MapData;
+class Worker;
class Map : private util::noncopyable {
friend class View;
@@ -165,7 +166,7 @@ private:
void resize(uint16_t width, uint16_t height, float ratio, uint16_t fbWidth, uint16_t fbHeight);
util::ptr<Sprite> getSprite();
- uv::worker& getWorker();
+ Worker& getWorker();
// Checks if render thread needs to pause
void checkForPause();
@@ -207,7 +208,7 @@ private:
View &view;
private:
- std::unique_ptr<uv::worker> workers;
+ std::unique_ptr<Worker> workers;
std::thread thread;
std::unique_ptr<uv::async> asyncTerminate;
std::unique_ptr<uv::async> asyncUpdate;
diff --git a/include/mbgl/util/uv-messenger.h b/include/mbgl/util/uv-messenger.h
deleted file mode 100644
index 35c5ce2188..0000000000
--- a/include/mbgl/util/uv-messenger.h
+++ /dev/null
@@ -1,33 +0,0 @@
-#ifndef MBGL_UTIL_UV_MESSENGER
-#define MBGL_UTIL_UV_MESSENGER
-
-#include <uv.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct uv_messenger_s uv_messenger_t;
-typedef void (*uv_messenger_cb)(void *arg);
-typedef void (*uv_messenger_stop_cb)(uv_messenger_t *msgr);
-
-struct uv_messenger_s {
- uv_mutex_t mutex;
- uv_async_t async;
- uv_messenger_cb callback;
- uv_messenger_stop_cb stop_callback;
- void *data;
- void *queue[2];
-};
-
-int uv_messenger_init(uv_loop_t *loop, uv_messenger_t *msgr, uv_messenger_cb callback);
-void uv_messenger_send(uv_messenger_t *msgr, void *arg);
-void uv_messenger_stop(uv_messenger_t *msgr, uv_messenger_stop_cb stop_callback);
-void uv_messenger_ref(uv_messenger_t *msgr);
-void uv_messenger_unref(uv_messenger_t *msgr);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif
diff --git a/include/mbgl/util/uv.hpp b/include/mbgl/util/uv.hpp
index 6f0a916040..19c6c5b4df 100644
--- a/include/mbgl/util/uv.hpp
+++ b/include/mbgl/util/uv.hpp
@@ -17,7 +17,6 @@ std::string cwd();
class rwlock;
class loop;
class async;
-class worker;
class mutex;
class cond;
diff --git a/src/mbgl/map/map.cpp b/src/mbgl/map/map.cpp
index 5e046766e6..f76845e3a7 100644
--- a/src/mbgl/map/map.cpp
+++ b/src/mbgl/map/map.cpp
@@ -27,6 +27,7 @@
#include <mbgl/util/uv.hpp>
#include <mbgl/util/mapbox.hpp>
#include <mbgl/util/exception.hpp>
+#include <mbgl/util/worker.hpp>
#include <algorithm>
#include <iostream>
@@ -105,7 +106,7 @@ Map::~Map() {
env->performCleanup();
}
-uv::worker &Map::getWorker() {
+Worker& Map::getWorker() {
assert(workers);
return *workers;
}
@@ -266,7 +267,7 @@ void Map::run() {
view.activate();
- workers = util::make_unique<uv::worker>(env->loop, 4, "Tile Worker");
+ workers = util::make_unique<Worker>(env->loop, 4);
setup();
prepare();
diff --git a/src/mbgl/map/source.cpp b/src/mbgl/map/source.cpp
index fa7ab6b969..29f84171d1 100644
--- a/src/mbgl/map/source.cpp
+++ b/src/mbgl/map/source.cpp
@@ -215,7 +215,7 @@ TileData::State Source::hasTile(const TileID& id) {
return TileData::State::invalid;
}
-TileData::State Source::addTile(Map &map, uv::worker &worker,
+TileData::State Source::addTile(Map &map, Worker &worker,
util::ptr<Style> style, GlyphAtlas &glyphAtlas,
GlyphStore &glyphStore, SpriteAtlas &spriteAtlas,
util::ptr<Sprite> sprite, TexturePool &texturePool,
@@ -349,7 +349,7 @@ bool Source::findLoadedParent(const TileID& id, int32_t minCoveringZoom, std::fo
}
void Source::update(Map &map,
- uv::worker &worker,
+ Worker &worker,
util::ptr<Style> style,
GlyphAtlas &glyphAtlas,
GlyphStore &glyphStore,
diff --git a/src/mbgl/map/source.hpp b/src/mbgl/map/source.hpp
index 4a551cda5b..625647ac5e 100644
--- a/src/mbgl/map/source.hpp
+++ b/src/mbgl/map/source.hpp
@@ -21,6 +21,7 @@ namespace mbgl {
class Map;
class Environment;
+class Worker;
class GlyphAtlas;
class GlyphStore;
class SpriteAtlas;
@@ -59,7 +60,7 @@ public:
Environment&,
std::function<void()> callback);
- void update(Map &, uv::worker &, util::ptr<Style>, GlyphAtlas &, GlyphStore &,
+ void update(Map &, Worker &, util::ptr<Style>, GlyphAtlas &, GlyphStore &,
SpriteAtlas &, util::ptr<Sprite>, TexturePool &, std::function<void()> callback);
void invalidateTiles(const std::vector<TileID>&);
@@ -80,7 +81,7 @@ private:
int32_t coveringZoomLevel(const TransformState&) const;
std::forward_list<TileID> coveringTiles(const TransformState&) const;
- TileData::State addTile(Map &, uv::worker &, util::ptr<Style>, GlyphAtlas &,
+ TileData::State addTile(Map &, Worker &, util::ptr<Style>, GlyphAtlas &,
GlyphStore &, SpriteAtlas &, util::ptr<Sprite>, TexturePool &,
const TileID &, std::function<void()> callback);
diff --git a/src/mbgl/map/tile_data.cpp b/src/mbgl/map/tile_data.cpp
index 74993ff52a..72e356bbf2 100644
--- a/src/mbgl/map/tile_data.cpp
+++ b/src/mbgl/map/tile_data.cpp
@@ -3,7 +3,7 @@
#include <mbgl/map/source.hpp>
#include <mbgl/storage/file_source.hpp>
-#include <mbgl/util/uv_detail.hpp>
+#include <mbgl/util/worker.hpp>
#include <mbgl/platform/log.hpp>
using namespace mbgl;
@@ -27,7 +27,7 @@ const std::string TileData::toString() const {
return std::string { "[tile " } + name + "]";
}
-void TileData::request(uv::worker &worker, float pixelRatio, std::function<void()> callback) {
+void TileData::request(Worker& worker, float pixelRatio, std::function<void()> callback) {
std::string url = source.tileURL(id, pixelRatio);
state = State::loading;
@@ -57,18 +57,12 @@ void TileData::cancel() {
}
}
-void TileData::reparse(uv::worker& worker, std::function<void()> callback)
-{
- // We're creating a new work request. The work request deletes itself after it executed
- // the after work handler
- new uv::work<util::ptr<TileData>>(
- worker,
- [this](util::ptr<TileData>& tile) {
- EnvironmentScope scope(env, ThreadType::TileWorker, "TileWorker_" + tile->name);
+void TileData::reparse(Worker& worker, std::function<void()> callback) {
+ util::ptr<TileData> tile = shared_from_this();
+ worker.send(
+ [tile]() {
+ EnvironmentScope scope(tile->env, ThreadType::TileWorker, "TileWorker_" + tile->name);
tile->parse();
},
- [callback](util::ptr<TileData>&) {
- callback();
- },
- shared_from_this());
+ callback);
}
diff --git a/src/mbgl/map/tile_data.hpp b/src/mbgl/map/tile_data.hpp
index 6fbc9ec3fb..92d9778fd5 100644
--- a/src/mbgl/map/tile_data.hpp
+++ b/src/mbgl/map/tile_data.hpp
@@ -12,12 +12,6 @@
#include <string>
#include <functional>
-namespace uv {
-class worker;
-}
-
-typedef struct uv_loop_s uv_loop_t;
-
namespace mbgl {
class Environment;
@@ -25,6 +19,7 @@ class Painter;
class SourceInfo;
class StyleLayer;
class Request;
+class Worker;
class TileData : public std::enable_shared_from_this<TileData>,
private util::noncopyable {
@@ -41,8 +36,8 @@ public:
TileData(const TileID&, const SourceInfo&);
~TileData();
- void request(uv::worker&, float pixelRatio, std::function<void ()> callback);
- void reparse(uv::worker&, std::function<void ()> callback);
+ void request(Worker&, float pixelRatio, std::function<void ()> callback);
+ void reparse(Worker&, std::function<void ()> callback);
void cancel();
const std::string toString() const;
diff --git a/src/mbgl/util/channel.hpp b/src/mbgl/util/channel.hpp
new file mode 100644
index 0000000000..a48156f120
--- /dev/null
+++ b/src/mbgl/util/channel.hpp
@@ -0,0 +1,39 @@
+#ifndef MBGL_UTIL_CHANNEL
+#define MBGL_UTIL_CHANNEL
+
+#include <mbgl/util/noncopyable.hpp>
+
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+
+namespace mbgl {
+
+template <class T>
+class Channel : public mbgl::util::noncopyable {
+public:
+ void send(const T& t) {
+ std::unique_lock<std::mutex> lock(mutex);
+ queue.push(t);
+ condition.notify_one();
+ }
+
+ T receive() {
+ std::unique_lock<std::mutex> lock(mutex);
+ condition.wait(lock, [&](){ return !queue.empty(); });
+
+ T t = queue.front();
+ queue.pop();
+
+ return t;
+ }
+
+private:
+ std::mutex mutex;
+ std::condition_variable condition;
+ std::queue<T> queue;
+};
+
+}
+
+#endif
diff --git a/src/mbgl/util/queue.h b/src/mbgl/util/queue.h
deleted file mode 100644
index fe02b454ea..0000000000
--- a/src/mbgl/util/queue.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
- *
- * Permission to use, copy, modify, and/or distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
- *
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- */
-
-#ifndef QUEUE_H_
-#define QUEUE_H_
-
-typedef void *QUEUE[2];
-
-/* Private macros. */
-#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))
-#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1]))
-#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
-#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
-
-/* Public macros. */
-#define QUEUE_DATA(ptr, type, field) \
- ((type *) ((char *) (ptr) - ((char *) &((type *) 0)->field)))
-
-#define QUEUE_FOREACH(q, h) \
- for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
-
-#define QUEUE_EMPTY(q) \
- ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))
-
-#define QUEUE_HEAD(q) \
- (QUEUE_NEXT(q))
-
-#define QUEUE_INIT(q) \
- do { \
- QUEUE_NEXT(q) = (q); \
- QUEUE_PREV(q) = (q); \
- } \
- while (0)
-
-#define QUEUE_ADD(h, n) \
- do { \
- QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
- QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
- QUEUE_PREV(h) = QUEUE_PREV(n); \
- QUEUE_PREV_NEXT(h) = (h); \
- } \
- while (0)
-
-#define QUEUE_SPLIT(h, q, n) \
- do { \
- QUEUE_PREV(n) = QUEUE_PREV(h); \
- QUEUE_PREV_NEXT(n) = (n); \
- QUEUE_NEXT(n) = (q); \
- QUEUE_PREV(h) = QUEUE_PREV(q); \
- QUEUE_PREV_NEXT(h) = (h); \
- QUEUE_PREV(q) = (n); \
- } \
- while (0)
-
-#define QUEUE_INSERT_HEAD(h, q) \
- do { \
- QUEUE_NEXT(q) = QUEUE_NEXT(h); \
- QUEUE_PREV(q) = (h); \
- QUEUE_NEXT_PREV(q) = (q); \
- QUEUE_NEXT(h) = (q); \
- } \
- while (0)
-
-#define QUEUE_INSERT_TAIL(h, q) \
- do { \
- QUEUE_NEXT(q) = (h); \
- QUEUE_PREV(q) = QUEUE_PREV(h); \
- QUEUE_PREV_NEXT(q) = (q); \
- QUEUE_PREV(h) = (q); \
- } \
- while (0)
-
-#define QUEUE_REMOVE(q) \
- do { \
- QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
- QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
- } \
- while (0)
-
-#endif /* QUEUE_H_ */
diff --git a/src/mbgl/util/uv-channel.c b/src/mbgl/util/uv-channel.c
deleted file mode 100644
index 4e3b9fa5ff..0000000000
--- a/src/mbgl/util/uv-channel.c
+++ /dev/null
@@ -1,69 +0,0 @@
-#include <mbgl/util/uv-channel.h>
-#include <mbgl/util/queue.h>
-
-#include <stdlib.h>
-
-// Taken from http://navaneeth.github.io/blog/2013/08/02/channels-in-libuv/
-
-typedef struct {
- void *data;
- void *active_queue[2];
-} uv__chan_item_t;
-
-int uv_chan_init(uv_chan_t *chan) {
- int r = uv_mutex_init(&chan->mutex);
- if (r == -1)
- return r;
-
- QUEUE_INIT(&chan->q);
-
- return uv_cond_init(&chan->cond);
-}
-
-void uv_chan_send(uv_chan_t *chan, void *data) {
- uv__chan_item_t *item = (uv__chan_item_t *)malloc(sizeof(uv__chan_item_t));
- item->data = data;
-
- uv_mutex_lock(&chan->mutex);
- QUEUE_INSERT_TAIL(&chan->q, &item->active_queue);
- uv_cond_signal(&chan->cond);
- uv_mutex_unlock(&chan->mutex);
-}
-
-void *uv_chan_receive(uv_chan_t *chan) {
- uv__chan_item_t *item;
- QUEUE *head;
- void *data = NULL;
-
- uv_mutex_lock(&chan->mutex);
- while (QUEUE_EMPTY(&chan->q)) {
- uv_cond_wait(&chan->cond, &chan->mutex);
- }
-
- head = QUEUE_HEAD(&chan->q);
- item = QUEUE_DATA(head, uv__chan_item_t, active_queue);
- data = item->data;
- QUEUE_REMOVE(head);
- free(item);
- uv_mutex_unlock(&chan->mutex);
- return data;
-}
-
-void uv_chan_clear(uv_chan_t *chan) {
- uv_mutex_lock(&chan->mutex);
- uv__chan_item_t *item = NULL;
- QUEUE *head = NULL;
- while (!QUEUE_EMPTY(&chan->q)) {
- head = QUEUE_HEAD(&chan->q);
- item = QUEUE_DATA(head, uv__chan_item_t, active_queue);
- QUEUE_REMOVE(head);
- free(item);
- }
- uv_mutex_unlock(&chan->mutex);
-}
-
-void uv_chan_destroy(uv_chan_t *chan) {
- uv_chan_clear(chan);
- uv_cond_destroy(&chan->cond);
- uv_mutex_destroy(&chan->mutex);
-}
diff --git a/src/mbgl/util/uv-channel.h b/src/mbgl/util/uv-channel.h
deleted file mode 100644
index 0a0d706477..0000000000
--- a/src/mbgl/util/uv-channel.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#ifndef MBGL_UTIL_UV_CHANNEL
-#define MBGL_UTIL_UV_CHANNEL
-
-#include <uv.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-// Taken from http://navaneeth.github.io/blog/2013/08/02/channels-in-libuv/
-
-typedef struct uv_chan_s uv_chan_t;
-
-struct uv_chan_s {
- uv_mutex_t mutex;
- uv_cond_t cond;
- void *q[2];
-};
-
-int uv_chan_init(uv_chan_t *chan);
-void uv_chan_send(uv_chan_t *chan, void *data);
-void *uv_chan_receive(uv_chan_t *chan);
-void uv_chan_destroy(uv_chan_t *chan);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif
diff --git a/src/mbgl/util/uv-messenger.c b/src/mbgl/util/uv-messenger.c
deleted file mode 100644
index 935b6f1c41..0000000000
--- a/src/mbgl/util/uv-messenger.c
+++ /dev/null
@@ -1,86 +0,0 @@
-#include <mbgl/util/uv-messenger.h>
-#include <mbgl/util/queue.h>
-
-#include <stdlib.h>
-#include <assert.h>
-
-typedef struct {
- void *data;
- void *queue[2];
-} uv__messenger_item_t;
-
-#if UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR <= 10
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wunused-parameter"
-void uv__messenger_callback(uv_async_t *async, int status) {
-#pragma clang diagnostic pop
-#else
-void uv__messenger_callback(uv_async_t *async) {
-#endif
- uv_messenger_t *msgr = (uv_messenger_t *)async->data;
-
- uv__messenger_item_t *item;
- QUEUE *head;
-
- while (1) {
- uv_mutex_lock(&msgr->mutex);
- if (QUEUE_EMPTY(&msgr->queue)) {
- uv_mutex_unlock(&msgr->mutex);
- break;
- }
-
- head = QUEUE_HEAD(&msgr->queue);
- item = QUEUE_DATA(head, uv__messenger_item_t, queue);
- QUEUE_REMOVE(head);
- uv_mutex_unlock(&msgr->mutex);
-
- msgr->callback(item->data);
-
- free(item);
- }
-}
-
-int uv_messenger_init(uv_loop_t *loop, uv_messenger_t *msgr, uv_messenger_cb callback) {
- int ret = uv_mutex_init(&msgr->mutex);
- if (ret < 0) {
- return ret;
- }
-
- msgr->callback = callback;
- msgr->stop_callback = NULL;
-
- QUEUE_INIT(&msgr->queue);
-
- msgr->async.data = msgr;
- return uv_async_init(loop, &msgr->async, uv__messenger_callback);
-}
-
-void uv_messenger_send(uv_messenger_t *msgr, void *data) {
- uv__messenger_item_t *item = (uv__messenger_item_t *)malloc(sizeof(uv__messenger_item_t));
- item->data = data;
-
- uv_mutex_lock(&msgr->mutex);
- QUEUE_INSERT_TAIL(&msgr->queue, &item->queue);
- uv_mutex_unlock(&msgr->mutex);
-
- uv_async_send(&msgr->async);
-}
-
-void uv_messenger_ref(uv_messenger_t *msgr) {
- uv_ref((uv_handle_t *)&msgr->async);
-}
-
-void uv_messenger_unref(uv_messenger_t *msgr) {
- uv_unref((uv_handle_t *)&msgr->async);
-}
-
-void uv__messenger_stop_callback(uv_handle_t *handle) {
- uv_messenger_t *msgr = (uv_messenger_t *)handle->data;
- msgr->stop_callback(msgr);
-}
-
-void uv_messenger_stop(uv_messenger_t *msgr, uv_messenger_stop_cb stop_callback) {
- assert(!msgr->stop_callback);
- msgr->stop_callback = stop_callback;
- uv_close((uv_handle_t *)&msgr->async, uv__messenger_stop_callback);
-}
diff --git a/src/mbgl/util/uv-worker.c b/src/mbgl/util/uv-worker.c
deleted file mode 100644
index ec8eae461c..0000000000
--- a/src/mbgl/util/uv-worker.c
+++ /dev/null
@@ -1,170 +0,0 @@
-#include <mbgl/util/uv-worker.h>
-#include <mbgl/util/uv-messenger.h>
-
-#include <stdio.h>
-#include <assert.h>
-
-typedef struct uv__worker_item_s uv__worker_item_t;
-struct uv__worker_item_s {
- uv_worker_t *worker;
- void *data;
- uv_worker_cb work_cb;
- uv_worker_after_cb after_work_cb;
-};
-
-typedef struct uv__worker_thread_s uv__worker_thread_t;
-struct uv__worker_thread_s {
- uv_worker_t *worker;
- uv_thread_t thread;
-};
-
-void uv__worker_free_messenger(uv_messenger_t *msgr) {
- free(msgr);
-}
-
-void uv__worker_thread_finished(uv__worker_thread_t *worker_thread) {
- uv_worker_t *worker = worker_thread->worker;
-
-#ifdef DEBUG
- assert(uv_thread_self() == worker->thread_id);
-#endif
-
- // This should at most block very briefly. We are sending the termination
- // notification as the last thing in the worker thread, so by now the thread
- // has probably terminated already. If not, the waiting time should be
- // extremely short.
- uv_thread_join(&worker_thread->thread);
-
- assert(worker->count > 0);
- worker->count--;
- if (worker->count == 0) {
- uv_chan_destroy(&worker->chan);
- uv_messenger_stop(worker->msgr, uv__worker_free_messenger);
- if (worker->close_cb) {
- worker->close_cb(worker);
- }
- }
-}
-
-void uv__worker_after(void *ptr) {
- uv__worker_item_t *item = (uv__worker_item_t *)ptr;
-
- if (item->work_cb) {
- // We are finishing a regular work request.
- if (item->after_work_cb) {
- assert(item->after_work_cb);
- item->after_work_cb(item->data);
- }
- uv_worker_t *worker = item->worker;
- assert(worker->active_items > 0);
- if (--worker->active_items == 0) {
- uv_messenger_unref(worker->msgr);
- }
- } else {
- // This is a worker thread termination.
- uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)item->data;
- uv__worker_thread_finished(worker_thread);
- free(worker_thread);
- }
-
- free(item);
-}
-
-void uv__worker_thread_loop(void *ptr) {
- uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)ptr;
- uv_worker_t *worker = worker_thread->worker;
-
-#ifdef __APPLE__
- if (worker->name) {
- pthread_setname_np(worker->name);
- }
-#endif
-
- uv__worker_item_t *item = NULL;
- while ((item = (uv__worker_item_t *)uv_chan_receive(&worker->chan)) != NULL) {
- assert(item->work_cb);
- item->work_cb(item->data);
-
- // Trigger the after callback in the main thread.
- uv_messenger_send(worker->msgr, item);
- }
-
- // Make sure to close all other workers too.
- uv_chan_send(&worker->chan, NULL);
-
- // Create a new worker item that acts as a terminate flag for this thread.
- item = (uv__worker_item_t *)malloc(sizeof(uv__worker_item_t));
- item->data = worker_thread;
- item->work_cb = NULL;
- item->after_work_cb = NULL;
- uv_messenger_send(worker->msgr, item);
-}
-
-int uv_worker_init(uv_worker_t *worker, uv_loop_t *loop, int count, const char *name) {
-#ifdef DEBUG
- worker->thread_id = uv_thread_self();
-#endif
- worker->loop = loop;
- worker->name = name;
- worker->count = 0;
- worker->close_cb = NULL;
- worker->active_items = 0;
- worker->msgr = (uv_messenger_t *)malloc(sizeof(uv_messenger_t));
- int ret = uv_messenger_init(loop, worker->msgr, uv__worker_after);
- if (ret < 0) {
- free(worker->msgr);
- return ret;
- }
- uv_messenger_unref(worker->msgr);
- ret = uv_chan_init(&worker->chan);
- if (ret < 0) return ret;
-
- // Initialize all worker threads.
- int i;
- for (i = 0; i < count; i++) {
- uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)malloc(sizeof(uv__worker_thread_t));
- worker_thread->worker = worker;
- ret = uv_thread_create(&worker_thread->thread, uv__worker_thread_loop, worker_thread);
- if (ret < 0) return ret;
- worker->count++;
- }
-
- return 0;
-}
-
-void uv_worker_send(uv_worker_t *worker, void *data, uv_worker_cb work_cb,
- uv_worker_after_cb after_work_cb) {
-#ifdef DEBUG
- assert(uv_thread_self() == worker->thread_id);
-#endif
-
- // It doesn't make sense to not provide a work callback. On the other hand, the after_work_cb
- // may be NULL. In that case, there will be no callback called in the current thread and the
- // worker item will instead be freed in the worker thread.
- assert(work_cb);
-
- uv__worker_item_t *item = (uv__worker_item_t *)malloc(sizeof(uv__worker_item_t));
- item->worker = worker;
- item->work_cb = work_cb;
- item->after_work_cb = after_work_cb;
- item->data = data;
- uv_chan_send(&worker->chan, item);
- if (worker->active_items++ == 0) {
- uv_messenger_ref(worker->msgr);
- }
-}
-
-void uv_worker_close(uv_worker_t *worker, uv_worker_close_cb close_cb) {
-#ifdef DEBUG
- assert(uv_thread_self() == worker->thread_id);
-#endif
-
- // Prevent double calling.
- assert(worker->close_cb == NULL);
-
- worker->close_cb = close_cb;
- uv_chan_send(&worker->chan, NULL);
- if (worker->active_items++ == 0) {
- uv_messenger_ref(worker->msgr);
- }
-}
diff --git a/src/mbgl/util/uv-worker.h b/src/mbgl/util/uv-worker.h
deleted file mode 100644
index 65ad42edb8..0000000000
--- a/src/mbgl/util/uv-worker.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#ifndef MBGL_UTIL_UV_WORKER
-#define MBGL_UTIL_UV_WORKER
-
-#include <stdlib.h>
-
-#include <mbgl/util/uv-messenger.h>
-#include <mbgl/util/uv-channel.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct uv_worker_s uv_worker_t;
-
-typedef void (*uv_worker_cb)(void *data);
-typedef void (*uv_worker_after_cb)(void *data);
-typedef void (*uv_worker_close_cb)(uv_worker_t *worker);
-
-struct uv_worker_s {
-#ifdef DEBUG
- unsigned long thread_id;
-#endif
- uv_loop_t *loop;
- uv_messenger_t *msgr;
- uv_chan_t chan;
- const char *name;
- int count;
- uv_worker_close_cb close_cb;
- unsigned int active_items;
-};
-
-int uv_worker_init(uv_worker_t *worker, uv_loop_t *loop, int count, const char *name);
-void uv_worker_send(uv_worker_t *worker, void *data, uv_worker_cb work_cb,
- uv_worker_after_cb after_work_cb);
-void uv_worker_close(uv_worker_t *worker, uv_worker_close_cb close_cb);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif
diff --git a/src/mbgl/util/uv_detail.hpp b/src/mbgl/util/uv_detail.hpp
index 6ae3713e09..9d479da425 100644
--- a/src/mbgl/util/uv_detail.hpp
+++ b/src/mbgl/util/uv_detail.hpp
@@ -2,7 +2,6 @@
#define MBGL_UTIL_UV_DETAIL
#include <mbgl/util/uv.hpp>
-#include <mbgl/util/uv-worker.h>
#include <mbgl/util/noncopyable.hpp>
#include <uv.h>
@@ -117,56 +116,6 @@ private:
uv_rwlock_t mtx;
};
-class worker : public mbgl::util::noncopyable {
-public:
- inline worker(uv_loop_t *loop, unsigned int count, const char *name = nullptr) : w(new uv_worker_t) {
- uv_worker_init(w, loop, count, name);
- }
- inline ~worker() {
- uv_worker_close(w, [](uv_worker_t *worker_) {
- delete worker_;
- });
- }
- inline void add(void *data, uv_worker_cb work_cb, uv_worker_after_cb after_work_cb) {
- uv_worker_send(w, data, work_cb, after_work_cb);
- }
-
-private:
- uv_worker_t *w;
-};
-
-template <typename T>
-class work : public mbgl::util::noncopyable {
-public:
- typedef std::function<void (T&)> work_callback;
- typedef std::function<void (T&)> after_work_callback;
-
- template<typename... Args>
- work(worker &worker, work_callback work_cb_, after_work_callback after_work_cb_, Args&&... args)
- : data(std::forward<Args>(args)...),
- work_cb(work_cb_),
- after_work_cb(after_work_cb_) {
- worker.add(this, do_work, after_work);
- }
-
-private:
- static void do_work(void *data) {
- work<T> *w = reinterpret_cast<work<T> *>(data);
- w->work_cb(w->data);
- }
-
- static void after_work(void *data) {
- work<T> *w = reinterpret_cast<work<T> *>(data);
- w->after_work_cb(w->data);
- delete w;
- }
-
-private:
- T data;
- work_callback work_cb;
- after_work_callback after_work_cb;
-};
-
}
#endif
diff --git a/src/mbgl/util/worker.cpp b/src/mbgl/util/worker.cpp
new file mode 100644
index 0000000000..3559cdd71f
--- /dev/null
+++ b/src/mbgl/util/worker.cpp
@@ -0,0 +1,73 @@
+#include <mbgl/util/worker.hpp>
+
+#include <cassert>
+
+namespace mbgl {
+
+Worker::Worker(uv_loop_t* loop, std::size_t count)
+ : queue(new Queue(loop, [this](Fn after) { afterWork(after); }))
+{
+ queue->unref();
+
+ for (std::size_t i = 0; i < count; i++) {
+ threads.emplace_back(&Worker::workLoop, this);
+ }
+}
+
+Worker::~Worker() {
+ MBGL_VERIFY_THREAD(tid);
+
+ if (active++ == 0) {
+ queue->ref();
+ }
+
+ channel.send(Work());
+
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ queue->stop();
+}
+
+void Worker::send(Fn work, Fn after) {
+ MBGL_VERIFY_THREAD(tid);
+ assert(work);
+
+ if (active++ == 0) {
+ queue->ref();
+ }
+
+ channel.send({work, after});
+}
+
+void Worker::workLoop() {
+#ifdef __APPLE__
+ pthread_setname_np("Worker");
+#endif
+
+ while (true) {
+ Work item = channel.receive();
+
+ if (!item.work)
+ break;
+
+ item.work();
+ queue->send(std::move(item.after));
+ }
+
+ // Make sure to close all other workers too.
+ channel.send(Work());
+}
+
+void Worker::afterWork(Fn after) {
+ if (after) {
+ after();
+ }
+
+ if (--active == 0) {
+ queue->unref();
+ }
+}
+
+}
diff --git a/src/mbgl/util/worker.hpp b/src/mbgl/util/worker.hpp
new file mode 100644
index 0000000000..86c2e6acf4
--- /dev/null
+++ b/src/mbgl/util/worker.hpp
@@ -0,0 +1,44 @@
+#ifndef MBGL_UTIL_WORKER
+#define MBGL_UTIL_WORKER
+
+#include <mbgl/util/noncopyable.hpp>
+#include <mbgl/util/async_queue.hpp>
+#include <mbgl/util/channel.hpp>
+#include <mbgl/util/util.hpp>
+
+#include <thread>
+#include <functional>
+
+namespace mbgl {
+
+class Worker : public mbgl::util::noncopyable {
+public:
+ using Fn = std::function<void ()>;
+
+ Worker(uv_loop_t* loop, std::size_t count);
+ ~Worker();
+
+ void send(Fn work, Fn after);
+
+private:
+ void workLoop();
+ void afterWork(Fn after);
+
+ struct Work {
+ Fn work;
+ Fn after;
+ };
+
+ using Queue = util::AsyncQueue<std::function<void ()>>;
+
+ std::size_t active = 0;
+ Queue* queue = nullptr;
+ Channel<Work> channel;
+ std::vector<std::thread> threads;
+
+ MBGL_STORE_THREAD(tid)
+};
+
+}
+
+#endif