summaryrefslogtreecommitdiff
path: root/chromium/third_party/libgav1/src/src/utils/threadpool.cc
blob: 4072cfb808e3ebf65160c9df3e72b23864c38135 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
// Copyright 2019 The libgav1 Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/utils/threadpool.h"

#if defined(_MSC_VER)
#include <process.h>
#include <windows.h>
#else  // defined(_MSC_VER)
#include <pthread.h>
#endif  // defined(_MSC_VER)
#if defined(__ANDROID__) || defined(__GLIBC__)
#include <sys/types.h>
#include <unistd.h>
#include <cstdio>
#endif
#include <algorithm>
#include <cassert>
#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <new>
#include <utility>

#if defined(__ANDROID__)
#include <chrono>  // NOLINT (unapproved c++11 header)
#endif

// Define the GetTid() function, a wrapper for the gettid() system call in
// Linux. It is only defined for Android and glibc builds; exactly one of the
// three definitions below is selected by the preprocessor.
#if defined(__ANDROID__)
// Android: call gettid() directly.
static pid_t GetTid() { return gettid(); }
#elif defined(__GLIBC__)
// The glibc wrapper for the gettid() system call was added in glibc 2.30.
// Emulate it for older versions of glibc.
#if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 30)
// glibc 2.30 or later: the library wrapper is available.
static pid_t GetTid() { return gettid(); }
#else  // Older than glibc 2.30
#include <sys/syscall.h>

// No library wrapper: issue the raw system call and narrow its result to
// pid_t.
static pid_t GetTid() { return static_cast<pid_t>(syscall(SYS_gettid)); }
#endif  // glibc 2.30 or later.
#endif  // defined(__GLIBC__)

namespace libgav1 {

#if defined(__ANDROID__)
namespace {

// Monotonic clock used to time the Android-only busy wait in
// ThreadPool::WorkerFunction().
using Clock = std::chrono::steady_clock;
// Native tick type of |Clock|.
using Duration = Clock::duration;
// Upper bound on how long an idle worker polls the queue before falling back
// to a blocking wait: 2e-3 seconds (2 milliseconds), converted to |Duration|.
constexpr Duration kBusyWaitDuration =
    std::chrono::duration_cast<Duration>(std::chrono::duration<double>(2e-3));

}  // namespace
#endif  // defined(__ANDROID__)

// static
// Convenience overload: forwards to the two-argument Create() with an empty
// name prefix.
std::unique_ptr<ThreadPool> ThreadPool::Create(int num_threads) {
  const char* const kNoPrefix = "";
  return Create(kNoPrefix, num_threads);
}

// static
std::unique_ptr<ThreadPool> ThreadPool::Create(const char name_prefix[],
                                               int num_threads) {
  if (name_prefix == nullptr || num_threads <= 0) return nullptr;
  std::unique_ptr<WorkerThread*[]> threads(new (std::nothrow)
                                               WorkerThread*[num_threads]);
  if (threads == nullptr) return nullptr;
  std::unique_ptr<ThreadPool> pool(new (std::nothrow) ThreadPool(
      name_prefix, std::move(threads), num_threads));
  if (pool != nullptr && !pool->StartWorkers()) {
    pool = nullptr;
  }
  return pool;
}

// Takes ownership of the |threads| array (which must have room for
// |num_threads| entries) and stores a possibly truncated copy of
// |name_prefix|. No worker is created here; see StartWorkers().
//
// |name_prefix| must not be null. It is truncated if it does not fit in
// |name_prefix_| and is always NUL-terminated.
ThreadPool::ThreadPool(const char name_prefix[],
                       std::unique_ptr<WorkerThread*[]> threads,
                       int num_threads)
    : threads_(std::move(threads)), num_threads_(num_threads) {
  // Null every slot, not just the first: the array comes from new[] and is
  // otherwise uninitialized. Shutdown() scans it for a nullptr sentinel, so
  // no slot may ever hold an indeterminate pointer.
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i] = nullptr;
  }
  assert(name_prefix != nullptr);
  const size_t name_prefix_len =
      std::min(strlen(name_prefix), sizeof(name_prefix_) - 1);
  memcpy(name_prefix_, name_prefix, name_prefix_len);
  name_prefix_[name_prefix_len] = '\0';
}

// Stops and joins the workers by calling Shutdown().
ThreadPool::~ThreadPool() {
  Shutdown();
}

// Queues |closure| for execution by a worker thread. If the queue is full and
// cannot grow, |closure| runs synchronously on the calling thread instead.
void ThreadPool::Schedule(std::function<void()> closure) {
  LockMutex();
  const bool has_room = queue_.GrowIfNeeded();
  if (has_room) queue_.Push(std::move(closure));
  UnlockMutex();

  if (has_room) {
    // The job is queued; wake one worker to pick it up.
    SignalOne();
  } else {
    // Could not enqueue; |closure| was not moved from, so run it directly
    // (outside the lock).
    closure();
  }
}

// Returns the number of worker threads this pool was created with.
int ThreadPool::num_threads() const {
  return num_threads_;
}

// A simple implementation that mirrors the non-portable Thread.  We may
// choose to expand this in the future as a portable implementation of
// Thread, or replace it at such a time as one is implemented.
//
// Each WorkerThread wraps one OS thread (a Windows thread handle under MSVC,
// a pthread otherwise) that runs the owning pool's WorkerFunction().
class ThreadPool::WorkerThread : public Allocable {
 public:
  // Records |pool| for later use by Run(). This does not create the thread;
  // call Start() to do that.
  explicit WorkerThread(ThreadPool* pool);

  // Not copyable or movable.
  WorkerThread(const WorkerThread&) = delete;
  WorkerThread& operator=(const WorkerThread&) = delete;

  // REQUIRES: Join() must have been called if Start() was called and
  // succeeded.
  ~WorkerThread() = default;

  // Creates the underlying OS thread, which executes Run(). Returns true on
  // success and false if the thread could not be created.
  LIBGAV1_MUST_USE_RESULT bool Start();

  // Joins with the running thread. Blocks until the thread has finished.
  void Join();

 private:
  // Thread entry point passed to the OS thread-creation call. |arg| is the
  // WorkerThread whose Run() should be invoked.
#if defined(_MSC_VER)
  static unsigned int __stdcall ThreadBody(void* arg);
#else
  static void* ThreadBody(void* arg);
#endif

  // Names the current thread using the pool's name prefix, on platforms where
  // that is implemented.
  void SetupName();
  // Body of the thread: calls SetupName() and then pool_->WorkerFunction().
  void Run();

  // The pool whose WorkerFunction() this thread executes. Not deleted by this
  // class.
  ThreadPool* pool_;
  // Native thread handle; assigned by Start() and consumed by Join().
#if defined(_MSC_VER)
  HANDLE handle_;
#else
  pthread_t thread_;
#endif
};

// Only remembers the owning pool; the OS thread is created later by Start().
ThreadPool::WorkerThread::WorkerThread(ThreadPool* pool)
    : pool_(pool) {
}

#if defined(_MSC_VER)

bool ThreadPool::WorkerThread::Start() {
  // Since our code calls the C run-time library (CRT), use _beginthreadex
  // rather than CreateThread. Microsoft documentation says "If a thread
  // created using CreateThread calls the CRT, the CRT may terminate the
  // process in low-memory conditions."
  uintptr_t handle = _beginthreadex(
      /*security=*/nullptr, /*stack_size=*/0, ThreadBody, this,
      /*initflag=*/CREATE_SUSPENDED, /*thrdaddr=*/nullptr);
  if (handle == 0) return false;
  handle_ = reinterpret_cast<HANDLE>(handle);
  ResumeThread(handle_);
  return true;
}

void ThreadPool::WorkerThread::Join() {
  WaitForSingleObject(handle_, INFINITE);
  CloseHandle(handle_);
}

unsigned int ThreadPool::WorkerThread::ThreadBody(void* arg) {
  auto* thread = static_cast<WorkerThread*>(arg);
  thread->Run();
  return 0;
}

// Intentionally a no-op: naming threads is not currently supported on
// Windows.
void ThreadPool::WorkerThread::SetupName() {}

#else  // defined(_MSC_VER)

// Creates the pthread that runs ThreadBody(this) with default attributes.
// Returns true if pthread_create() reported success.
bool ThreadPool::WorkerThread::Start() {
  const int error =
      pthread_create(&thread_, /*attr=*/nullptr, ThreadBody, this);
  return error == 0;
}

// Blocks until the thread has exited. The thread's return value is discarded.
void ThreadPool::WorkerThread::Join() {
  pthread_join(thread_, /*retval=*/nullptr);
}

void* ThreadPool::WorkerThread::ThreadBody(void* arg) {
  auto* thread = static_cast<WorkerThread*>(arg);
  thread->Run();
  return nullptr;
}

// Names the calling thread "<name_prefix>/<thread id>". Does nothing when the
// pool's name prefix is empty, or on platforms other than Apple, Android and
// glibc.
void ThreadPool::WorkerThread::SetupName() {
  // An empty prefix means there is nothing to name the thread with.
  if (pool_->name_prefix_[0] == '\0') return;
#if defined(__APPLE__)
  // On Apple platforms pthread_setname_np takes only the name and always
  // applies to the current thread; pthread_mach_thread_np is Apple-specific
  // too. The maximum size of the |thread_name| buffer was noted in the
  // Chromium source code and was confirmed by experiments.
  char thread_name[64];
  const mach_port_t thread_id = pthread_mach_thread_np(pthread_self());
  int result = snprintf(thread_name, sizeof(thread_name), "%s/%" PRId64,
                        pool_->name_prefix_, static_cast<int64_t>(thread_id));
  assert(result >= 0);
  result = pthread_setname_np(thread_name);
  assert(result == 0);
  static_cast<void>(result);
#elif defined(__ANDROID__) || defined(__GLIBC__)
  // pthread_setname_np fails with error 34 (ERANGE) on Android if the
  // |thread_name| buffer is longer than 16 bytes, so snprintf is left to
  // truncate the name to fit.
  char thread_name[16];
  const pid_t thread_id = GetTid();
  int result = snprintf(thread_name, sizeof(thread_name), "%s/%" PRId64,
                        pool_->name_prefix_, static_cast<int64_t>(thread_id));
  assert(result >= 0);
  result = pthread_setname_np(pthread_self(), thread_name);
  assert(result == 0);
  static_cast<void>(result);
#endif
}

#endif  // defined(_MSC_VER)

void ThreadPool::WorkerThread::Run() {
  SetupName();
  pool_->WorkerFunction();
}

bool ThreadPool::StartWorkers() {
  if (!queue_.Init()) return false;
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i] = new (std::nothrow) WorkerThread(this);
    if (threads_[i] == nullptr) return false;
    if (!threads_[i]->Start()) {
      delete threads_[i];
      threads_[i] = nullptr;
      return false;
    }
  }
  return true;
}

// Main loop of every worker thread: repeatedly pops a job from |queue_| and
// runs it, waiting when the queue is empty. Returns once the queue is empty
// and |exit_threads_| has been set. The mutex is held whenever |queue_| or
// |exit_threads_| is examined and is released while a job runs.
void ThreadPool::WorkerFunction() {
  LockMutex();
  while (true) {
    if (queue_.Empty()) {
      if (exit_threads_) {
        break;  // Queue is empty and exit was requested.
      }
#if defined(__ANDROID__)
      // On android, if we go to a conditional wait right away, the CPU governor
      // kicks in and starts shutting the cores down. So we do a very small busy
      // wait to see if we get our next job within that period. This
      // significantly improves the performance of common cases of tile parallel
      // decoding. If we don't receive a job in the busy wait time, we then go
      // to an actual conditional wait as usual.
      UnlockMutex();
      bool found_job = false;
      const auto wait_start = Clock::now();
      while (Clock::now() - wait_start < kBusyWaitDuration) {
        LockMutex();
        if (!queue_.Empty()) {
          // Leave the polling loop with the mutex still held.
          found_job = true;
          break;
        }
        UnlockMutex();
      }
      // If |found_job| is true, we simply continue since we already hold the
      // mutex and we know for sure that the |queue_| is not empty.
      if (found_job) continue;
      // Since |found_job| was false, the mutex is not being held at this
      // point.
      LockMutex();
      // Ensure that the queue is still empty.
      if (!queue_.Empty()) continue;
      if (exit_threads_) {
        break;  // Queue is empty and exit was requested.
      }
#endif  // defined(__ANDROID__)
      // Queue is still empty, wait for signal or broadcast.
      Wait();
    } else {
      // Take a job from the queue.
      std::function<void()> job = std::move(queue_.Front());
      queue_.Pop();

      // Run the job without holding the mutex.
      UnlockMutex();
      // Note that it is good practice to surround this with a try/catch so
      // the thread pool doesn't go to hell if the job throws an exception.
      // This is omitted here because Google3 doesn't like exceptions.
      std::move(job)();
      // Destroy the job's target now, before the mutex is re-acquired.
      job = nullptr;

      LockMutex();
    }
  }
  // Every path out of the loop holds the mutex.
  UnlockMutex();
}

// Stops the pool: asks every worker to exit once the queue is empty, then
// joins and destroys each worker that was successfully started.
//
// Each slot is reset to nullptr after its worker is deleted, so |threads_|
// never holds dangling pointers and a repeated call stops at the first slot
// instead of joining and deleting the same worker twice.
void ThreadPool::Shutdown() {
  // Tell worker threads how to exit.
  LockMutex();
  exit_threads_ = true;
  UnlockMutex();
  SignalAll();

  // Join all workers. This will block.
  for (int i = 0; i < num_threads_; ++i) {
    // StartWorkers() leaves nullptr in the slot of the first worker it failed
    // to create or start, so a null entry marks the end of the live workers.
    if (threads_[i] == nullptr) break;
    threads_[i]->Join();
    delete threads_[i];
    threads_[i] = nullptr;
  }
}

}  // namespace libgav1