summaryrefslogtreecommitdiff
path: root/src/mbgl/util/uv-worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mbgl/util/uv-worker.c')
-rw-r--r--src/mbgl/util/uv-worker.c170
1 files changed, 170 insertions, 0 deletions
diff --git a/src/mbgl/util/uv-worker.c b/src/mbgl/util/uv-worker.c
new file mode 100644
index 0000000000..d2aa908019
--- /dev/null
+++ b/src/mbgl/util/uv-worker.c
@@ -0,0 +1,170 @@
+#include <mbgl/util/uv-worker.h>
+#include <mbgl/util/uv-messenger.h>
+
+#include <stdio.h>
+#include <assert.h>
+
+typedef struct uv__worker_item_s uv__worker_item_t;
+struct uv__worker_item_s {
+ uv_worker_t *worker;
+ void *data;
+ uv_worker_cb work_cb;
+ uv_worker_after_cb after_work_cb;
+};
+
+typedef struct uv__worker_thread_s uv__worker_thread_t;
+struct uv__worker_thread_s {
+ uv_worker_t *worker;
+ uv_thread_t thread;
+};
+
+void uv__worker_free_messenger(uv_messenger_t *msgr) {
+ free(msgr);
+}
+
+void uv__worker_thread_finished(uv__worker_thread_t *worker_thread) {
+ uv_worker_t *worker = worker_thread->worker;
+
+#ifndef NDEBUG
+ assert(uv_thread_self() == worker->thread_id);
+#endif
+
+ // This should at most block very briefly. We are sending the termination
+ // notification as the last thing in the worker thread, so by now the thread
+ // has probably terminated already. If not, the waiting time should be
+ // extremely short.
+ uv_thread_join(&worker_thread->thread);
+
+ assert(worker->count > 0);
+ worker->count--;
+ if (worker->count == 0) {
+ uv_chan_destroy(&worker->chan);
+ uv_messenger_stop(worker->msgr, uv__worker_free_messenger);
+ if (worker->close_cb) {
+ worker->close_cb(worker);
+ }
+ }
+}
+
+void uv__worker_after(void *ptr) {
+ uv__worker_item_t *item = (uv__worker_item_t *)ptr;
+
+ if (item->work_cb) {
+ // We are finishing a regular work request.
+ if (item->after_work_cb) {
+ assert(item->after_work_cb);
+ item->after_work_cb(item->data);
+ }
+ uv_worker_t *worker = item->worker;
+ assert(worker->active_items > 0);
+ if (--worker->active_items == 0) {
+ uv_messenger_unref(worker->msgr);
+ }
+ } else {
+ // This is a worker thread termination.
+ uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)item->data;
+ uv__worker_thread_finished(worker_thread);
+ free(worker_thread);
+ }
+
+ free(item);
+}
+
+void uv__worker_thread_loop(void *ptr) {
+ uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)ptr;
+ uv_worker_t *worker = worker_thread->worker;
+
+#ifdef __APPLE__
+ if (worker->name) {
+ pthread_setname_np(worker->name);
+ }
+#endif
+
+ uv__worker_item_t *item = NULL;
+ while ((item = (uv__worker_item_t *)uv_chan_receive(&worker->chan)) != NULL) {
+ assert(item->work_cb);
+ item->work_cb(item->data);
+
+ // Trigger the after callback in the main thread.
+ uv_messenger_send(worker->msgr, item);
+ }
+
+ // Make sure to close all other workers too.
+ uv_chan_send(&worker->chan, NULL);
+
+ // Create a new worker item that acts as a terminate flag for this thread.
+ item = (uv__worker_item_t *)malloc(sizeof(uv__worker_item_t));
+ item->data = worker_thread;
+ item->work_cb = NULL;
+ item->after_work_cb = NULL;
+ uv_messenger_send(worker->msgr, item);
+}
+
+int uv_worker_init(uv_worker_t *worker, uv_loop_t *loop, int count, const char *name) {
+#ifndef NDEBUG
+ worker->thread_id = uv_thread_self();
+#endif
+ worker->loop = loop;
+ worker->name = name;
+ worker->count = 0;
+ worker->close_cb = NULL;
+ worker->active_items = 0;
+ worker->msgr = (uv_messenger_t *)malloc(sizeof(uv_messenger_t));
+ int ret = uv_messenger_init(loop, worker->msgr, uv__worker_after);
+ if (ret < 0) {
+ free(worker->msgr);
+ return ret;
+ }
+ uv_messenger_unref(worker->msgr);
+ ret = uv_chan_init(&worker->chan);
+ if (ret < 0) return ret;
+
+ // Initialize all worker threads.
+ int i;
+ for (i = 0; i < count; i++) {
+ uv__worker_thread_t *worker_thread = (uv__worker_thread_t *)malloc(sizeof(uv__worker_thread_t));
+ worker_thread->worker = worker;
+ ret = uv_thread_create(&worker_thread->thread, uv__worker_thread_loop, worker_thread);
+ if (ret < 0) return ret;
+ worker->count++;
+ }
+
+ return 0;
+}
+
+void uv_worker_send(uv_worker_t *worker, void *data, uv_worker_cb work_cb,
+ uv_worker_after_cb after_work_cb) {
+#ifndef NDEBUG
+ assert(uv_thread_self() == worker->thread_id);
+#endif
+
+ // It doesn't make sense to not provide a work callback. On the other hand, the after_work_cb
+ // may be NULL. In that case, there will be no callback called in the current thread and the
+ // worker item will instead be freed in the worker thread.
+ assert(work_cb);
+
+ uv__worker_item_t *item = (uv__worker_item_t *)malloc(sizeof(uv__worker_item_t));
+ item->worker = worker;
+ item->work_cb = work_cb;
+ item->after_work_cb = after_work_cb;
+ item->data = data;
+ uv_chan_send(&worker->chan, item);
+ if (worker->active_items++ == 0) {
+ uv_messenger_ref(worker->msgr);
+ }
+}
+
+void uv_worker_close(uv_worker_t *worker, uv_worker_close_cb close_cb) {
+#ifndef NDEBUG
+ assert(uv_thread_self() == worker->thread_id);
+#endif
+
+ // Prevent double calling.
+ assert(worker->close_cb == NULL);
+
+ worker->close_cb = close_cb;
+ uv_chan_send(&worker->chan, NULL);
+ if (worker->active_items++ == 0) {
+ uv_messenger_ref(worker->msgr);
+ }
+}