summaryrefslogtreecommitdiff
path: root/include/mbgl/util/run_loop.hpp
blob: 4bf614c55f46c0ed50d273eedc3b2f753f33883c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#pragma once

#include <mbgl/actor/scheduler.hpp>
#include <mbgl/actor/mailbox.hpp>
#include <mbgl/util/noncopyable.hpp>
#include <mbgl/util/util.hpp>
#include <mbgl/util/work_task.hpp>
#include <mbgl/util/work_request.hpp>

#include <atomic>
#include <functional>
#include <utility>
#include <queue>
#include <mutex>

namespace mbgl {
namespace util {

// Untyped pointer alias; this is the return type of RunLoop::getLoopHandle().
using LOOP_HANDLE = void *;

class RunLoop : public Scheduler,
                private util::noncopyable {
public:
    enum class Type : uint8_t {
        Default,
        New,
    };

    enum class Priority : bool {
        Default = false,
        High = true,
    };

    enum class Event : uint8_t {
        None      = 0,
        Read      = 1,
        Write     = 2,
        ReadWrite = Read | Write,
    };

    RunLoop(Type type = Type::Default);
    ~RunLoop() override;

    static RunLoop* Get();
    static LOOP_HANDLE getLoopHandle();

    void run();
    void runOnce();
    void stop();

    // Platform integration callback for platforms that do not have full
    // run loop integration or don't want to block at the Mapbox GL Native
    // loop. It will be called from any thread and is up to the platform
    // to, after receiving the callback, call RunLoop::runOnce() from the
    // same thread as the Map object lives.
    void setPlatformCallback(std::function<void()> callback) { platformCallback = std::move(callback); }

    // So far only needed by the libcurl backend.
    void addWatch(int fd, Event, std::function<void(int, Event)>&& callback);
    void removeWatch(int fd);

    // Invoke fn(args...) on this RunLoop.
    template <class Fn, class... Args>
    void invoke(Priority priority, Fn&& fn, Args&&... args) {
        push(priority, WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...));
    }

    // Invoke fn(args...) on this RunLoop.
    template <class Fn, class... Args>
    void invoke(Fn&& fn, Args&&... args) {
        invoke(Priority::Default, std::forward<Fn>(fn), std::forward<Args>(args)...);
    }

    // Post the cancellable work fn(args...) to this RunLoop.
    template <class Fn, class... Args>
    std::unique_ptr<AsyncRequest>
    invokeCancellable(Fn&& fn, Args&&... args) {
        std::shared_ptr<WorkTask> task = WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...);
        push(Priority::Default, task);
        return std::make_unique<WorkRequest>(task);
    }

    void schedule(std::function<void()> fn) override { invoke(std::move(fn)); }
    ::mapbox::base::WeakPtr<Scheduler> makeWeakPtr() override { return weakFactory.makeWeakPtr(); }

    class Impl;

private:
    MBGL_STORE_THREAD(tid)

    using Queue = std::queue<std::shared_ptr<WorkTask>>;

    // Wakes up the RunLoop so that it starts processing items in the queue.
    void wake();

    // Adds a WorkTask to the queue, and wakes it up.
    void push(Priority priority, std::shared_ptr<WorkTask> task) {
        std::lock_guard<std::mutex> lock(mutex);
        if (priority == Priority::High) {
            highPriorityQueue.emplace(std::move(task));
        } else {
            defaultQueue.emplace(std::move(task));
        }
        wake();

        if (platformCallback) {
            platformCallback();
        }
    }

    void process() {
        std::shared_ptr<WorkTask> task;
        std::unique_lock<std::mutex> lock(mutex);
        while (true) {
            if (!highPriorityQueue.empty()) {
                task = std::move(highPriorityQueue.front());
                highPriorityQueue.pop();
            } else if (!defaultQueue.empty()) {
                task = std::move(defaultQueue.front());
                defaultQueue.pop();
            } else {
                break;
            }
            lock.unlock();
            (*task)();
            task.reset();
            lock.lock();
        }
    }

    std::function<void()> platformCallback;

    Queue defaultQueue;
    Queue highPriorityQueue;
    std::mutex mutex;

    std::unique_ptr<Impl> impl;
    ::mapbox::base::WeakPtrFactory<Scheduler> weakFactory{this};
};

} // namespace util
} // namespace mbgl

#include <mbgl/util/work_task_impl.hpp>