path: root/platform/node/src/node_file_source.cpp
#include "node_file_source.hpp"
#include "node_request.hpp"
#include "util/async_queue.hpp"

#include <mbgl/storage/request.hpp>

#include <cassert>

namespace node_mbgl {

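// An Action describes a queued operation (adding or canceling a resource request)
// that must be executed on the file source's event loop.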
struct NodeFileSource::Action {
    const enum : bool { Add, Cancel } type;
    mbgl::Resource const resource;
};

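// The constructor keeps a persistent handle to the JS options object and sets up an
// async queue on the default libuv loop that dispatches Add/Cancel actions to
// processAdd() and processCancel().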
NodeFileSource::NodeFileSource(v8::Handle<v8::Object> options_) :
    queue(new Queue(uv_default_loop(), [this](Action &action) {
        if (action.type == Action::Add) {
            processAdd(action.resource);
        } else if (action.type == Action::Cancel) {
            processCancel(action.resource);
        }
    }))
{
    NanAssignPersistent(options, options_->ToObject());

    // Make sure that the queue doesn't block the loop from exiting.
    queue->unref();
}

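// Stop the queue and release the persistent options handle when the file source is destroyed.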
NodeFileSource::~NodeFileSource() {
    queue->stop();
    queue = nullptr;

    NanDisposePersistent(options);
}

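// Creates an mbgl::Request for the resource, registers it as an observer, and queues an
// Add action so the JS request callback is invoked on the file source loop.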
mbgl::Request* NodeFileSource::request(const mbgl::Resource& resource, uv_loop_t* loop, Callback callback) {
    auto req = new mbgl::Request(resource, loop, std::move(callback));

    std::lock_guard<std::mutex> lock(observersMutex);

    assert(observers.find(resource) == observers.end());
    observers[resource] = req;

    // This function can be called from any thread. Make sure the actual call is executed on the
    // file source loop by sending it over the queue, where it is dispatched to processAdd().
    queue->send(Action{ Action::Add, resource });

    return req;
}

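// Cancels an in-flight request: it is removed from the observers map immediately, and a
// Cancel action is queued so the JS side can be notified on the file source loop.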
void NodeFileSource::cancel(mbgl::Request* req) {
    req->cancel();

    std::lock_guard<std::mutex> lock(observersMutex);

    auto it = observers.find(req->resource);
    if (it == observers.end()) {
        return;
    }

    observers.erase(it);

    // This function can be called from any thread. Make sure the actual call is executed on the
    // file source loop by sending it over the queue, where it is dispatched to processCancel().
    queue->send(Action{ Action::Cancel, req->resource });

    req->destruct();
}

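// Runs on the file source loop: wraps the resource in a NodeRequest object, keeps a persistent
// handle to it in `pending`, and invokes the user-supplied `request` callback from the options object.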
void NodeFileSource::processAdd(const mbgl::Resource& resource) {
    NanScope();

    // Make sure the loop stays alive as long as a request is pending.
    if (pending.empty()) {
        queue->ref();
    }

    auto requestHandle = NanNew<v8::Object>(NodeRequest::Create(this, resource));

    v8::Persistent<v8::Object> requestPersistent;
    NanAssignPersistent(requestPersistent, requestHandle);
    pending.emplace(resource, std::move(requestPersistent));

#if (NODE_MODULE_VERSION > NODE_0_10_MODULE_VERSION)
    auto requestFunction = v8::Local<v8::Object>::New(v8::Isolate::GetCurrent(), options)->Get(NanNew("request")).As<v8::Function>();
#else
    auto requestFunction = options->Get(NanNew("request")).As<v8::Function>();
#endif

    v8::Local<v8::Value> argv[] = { requestHandle };
    NanMakeCallback(NanGetCurrentContext()->Global(), requestFunction, 1, argv);
}

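// Runs on the file source loop: drops the pending request handle and, if the options object
// provides a `cancel` callback, invokes it with the request wrapper.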
void NodeFileSource::processCancel(const mbgl::Resource& resource) {
    NanScope();

    auto it = pending.find(resource);
    if (it == pending.end()) {
        // The response callback has already fired, so there is no point in calling the
        // cancellation callback: the request has already completed.
    } else {
#if (NODE_MODULE_VERSION > NODE_0_10_MODULE_VERSION)
        auto requestHandle = v8::Local<v8::Object>::New(v8::Isolate::GetCurrent(), it->second);
        it->second.Reset();
#else
        auto requestHandle = NanNew<v8::Object>(it->second);
        NanDisposePersistent(it->second);
#endif
        pending.erase(it);

        // Make sure the loop can exit when there are no pending requests.
        if (pending.empty()) {
            queue->unref();
        }

#if (NODE_MODULE_VERSION > NODE_0_10_MODULE_VERSION)
        auto optionsObject = v8::Local<v8::Object>::New(v8::Isolate::GetCurrent(), options);
        if (optionsObject->Has(NanNew("cancel"))) {
            auto cancelFunction = optionsObject->Get(NanNew("cancel")).As<v8::Function>();
#else
        if (options->Has(NanNew("cancel"))) {
            auto cancelFunction = options->Get(NanNew("cancel")).As<v8::Function>();
#endif
            v8::Local<v8::Value> argv[] = { requestHandle };
            NanMakeCallback(NanGetCurrentContext()->Global(), cancelFunction, 1, argv);
        }

        // Null out the request handle held by the JS request wrapper.
        node::ObjectWrap::Unwrap<NodeRequest>(requestHandle)->cancel();
    }
}

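// Called when a response arrives: removes the pending JS handle first, then forwards the
// response to the registered observer, if it still exists.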
void NodeFileSource::notify(const mbgl::Resource& resource, const std::shared_ptr<const mbgl::Response>& response) {
    // First, remove the request, since it might be destructed at any point now.
    auto it = pending.find(resource);
    if (it != pending.end()) {
#if (NODE_MODULE_VERSION > NODE_0_10_MODULE_VERSION)
        it->second.Reset();
#else
        NanDisposePersistent(it->second);
#endif
        pending.erase(it);

        // Make sure the loop can exit when there are no pending requests.
        if (pending.empty()) {
            queue->unref();
        }
    }

    std::lock_guard<std::mutex> lock(observersMutex);

    auto observersIt = observers.find(resource);
    if (observersIt == observers.end()) {
        return;
    }

    observersIt->second->notify(response);
    observers.erase(observersIt);
}

} // namespace node_mbgl