summaryrefslogtreecommitdiff
path: root/src/mbgl/actor/mailbox.cpp
blob: 070e14bdb084f1e62e0316d582f8d3e57a8c0df4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <mbgl/actor/mailbox.hpp>
#include <mbgl/actor/message.hpp>
#include <mbgl/actor/scheduler.hpp>

#include <cassert>

namespace mbgl {

// Default constructor: every member takes the initial value given by its declaration.
// NOTE(review): presumably this leaves `scheduler` unset, so messages pushed before open() only
// accumulate in the queue -- the member declarations live in mailbox.hpp; confirm there.
Mailbox::Mailbox() = default;

// Creates a mailbox that is attached to `scheduler_` from the start. Only the address of the
// scheduler is stored, so the referenced scheduler has to stay alive while it is used here.
Mailbox::Mailbox(Scheduler& scheduler_) : scheduler(&scheduler_) {}

// Attaches `scheduler_` to a mailbox that does not have a scheduler yet (asserted). If messages
// were pushed while no scheduler was attached, a first delivery is scheduled for them here.
void Mailbox::open(Scheduler& scheduler_) {
    assert(!scheduler);

    // Same protocol as close(): take the receiving mutex first and the pushing mutex second, so
    // that neither receive() nor push() is in progress while the scheduler is being attached.
    std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex);
    std::lock_guard<std::mutex> pushingLock(pushingMutex);

    scheduler = &scheduler_;

    // A closed mailbox never schedules anything; an open one only needs a delivery scheduled
    // when there already are queued messages waiting.
    const bool hasBacklog = !closed && !queue.empty();
    if (hasBacklog) {
        scheduler_.schedule(makeClosure(shared_from_this()));
    }
}

// Marks the mailbox as closed. Once `closed` is set, push() discards incoming messages and
// receive() returns without processing anything.
void Mailbox::close() {
    // Block until neither receive() nor push() are in progress. Two mutexes are used because receive()
    // must not block push(). Of the two, the receiving mutex must be acquired first, because that is
    // the order in which an actor will obtain them when it self-sends a message, and a consistent
    // lock acquisition order prevents deadlocks.
    // The receiving mutex is recursive to allow a mailbox (and thus the actor) to close itself.
    std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex);
    std::lock_guard<std::mutex> pushingLock(pushingMutex);

    closed = true;
}

// Reports whether a scheduler is currently attached to this mailbox. This only looks at
// `scheduler`; it does not take the `closed` flag into account.
bool Mailbox::isOpen() const {
    return static_cast<bool>(scheduler);
}


// Enqueues `message` for later delivery. A closed mailbox silently discards the message.
// When the queue goes from empty to non-empty and a scheduler is attached, a delivery is
// scheduled; without a scheduler the message just waits in the queue.
void Mailbox::push(std::unique_ptr<Message> message) {
    // Serializes pushers against each other and against open()/close().
    std::lock_guard<std::mutex> pushingLock(pushingMutex);

    if (closed) {
        return;
    }

    // The queue has its own mutex because receive() pops from it without holding pushingMutex.
    std::lock_guard<std::mutex> queueLock(queueMutex);

    const bool queueWasIdle = queue.empty();
    queue.push(std::move(message));

    // Only the push that makes the queue non-empty schedules a delivery; while messages are
    // pending, receive() reschedules itself.
    const bool needsWakeup = queueWasIdle && scheduler;
    if (needsWakeup) {
        (*scheduler)->schedule(makeClosure(shared_from_this()));
    }
}

// Delivers exactly one queued message: pops the front of the queue, invokes it, and schedules
// another delivery if more messages were still queued at the time of the pop. Does nothing if
// the mailbox has been closed. A scheduler must be attached (asserted), and the queue is
// expected to be non-empty (asserted).
void Mailbox::receive() {
    // Held for the whole delivery, including the message invocation. The mutex is recursive, so
    // the invoked message may re-enter code that locks it again on the same thread (e.g. close()).
    std::lock_guard<std::recursive_mutex> receivingLock(receivingMutex);

    assert(scheduler);

    if (closed) {
        return;
    }

    std::unique_ptr<Message> next;
    bool moreQueued;

    {
        // The queue mutex is held only while popping, never while the message runs.
        std::lock_guard<std::mutex> queueLock(queueMutex);
        assert(!queue.empty());
        next = std::move(queue.front());
        queue.pop();
        moreQueued = !queue.empty();
    }

    (*next)();

    // If the pop emptied the queue, the next push() is the one that schedules a delivery.
    if (moreQueued) {
        (*scheduler)->schedule(makeClosure(shared_from_this()));
    }
}

// static
void Mailbox::maybeReceive(std::weak_ptr<Mailbox> mailbox) {
    if (auto locked = mailbox.lock()) {
        locked->receive();
    }
}

// static
// Wraps `mailbox` in a callable that runs maybeReceive() on it. The callable captures only the
// weak pointer, so holding on to it does not keep the mailbox alive.
std::function<void()> Mailbox::makeClosure(std::weak_ptr<Mailbox> mailbox) {
    std::function<void()> deliver = [mailbox] { Mailbox::maybeReceive(mailbox); };
    return deliver;
}

} // namespace mbgl