diff options
author | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2022-05-12 15:59:20 +0200 |
---|---|---|
committer | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2022-05-25 06:57:22 +0000 |
commit | f7eaed5286974984ba5f9e3189d8f49d03e99f81 (patch) | |
tree | caed19b2af2024f35449fb0b781d0a25e09d4f8f /chromium/components/webrtc/thread_wrapper.cc | |
parent | 9729c4479fe23554eae6e6dd1f30ff488f470c84 (diff) | |
download | qtwebengine-chromium-f7eaed5286974984ba5f9e3189d8f49d03e99f81.tar.gz |
BASELINE: Update Chromium to 100.0.4896.167
Change-Id: I98cbeb5d7543d966ffe04d8cefded0c493a11333
Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/components/webrtc/thread_wrapper.cc')
-rw-r--r-- | chromium/components/webrtc/thread_wrapper.cc | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/chromium/components/webrtc/thread_wrapper.cc b/chromium/components/webrtc/thread_wrapper.cc new file mode 100644 index 00000000000..eae08b9c4d4 --- /dev/null +++ b/chromium/components/webrtc/thread_wrapper.cc @@ -0,0 +1,493 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/webrtc/thread_wrapper.h" + +#include <stddef.h> +#include <stdint.h> +#include <memory> + +#include "base/bind.h" +#include "base/callback_helpers.h" +#include "base/cxx17_backports.h" +#include "base/lazy_instance.h" +#include "base/memory/raw_ptr.h" +#include "base/sequence_checker.h" +#include "base/thread_annotations.h" +#include "base/threading/thread_local.h" +#include "base/threading/thread_task_runner_handle.h" +#include "base/time/time.h" +#include "base/trace_event/trace_event.h" +#include "third_party/abseil-cpp/absl/types/optional.h" +#include "third_party/webrtc/rtc_base/physical_socket_server.h" + +namespace webrtc { +namespace { +constexpr base::TimeDelta kTaskLatencySampleDuration = base::Seconds(3); +} + +const base::Feature kThreadWrapperUsesMetronome{ + "ThreadWrapperUsesMetronome", base::FEATURE_DISABLED_BY_DEFAULT}; + +// Class intended to conditionally live for the duration of ThreadWrapper +// that periodically captures task latencies (definition in docs for +// SetLatencyAndTaskDurationCallbacks). +class ThreadWrapper::PostTaskLatencySampler { + public: + PostTaskLatencySampler( + scoped_refptr<base::SingleThreadTaskRunner> task_runner, + SampledDurationCallback task_latency_callback) + : task_runner_(task_runner), + task_latency_callback_(std::move(task_latency_callback)) { + ScheduleDelayedSample(); + } + + bool ShouldSampleNextTaskDuration() { + DCHECK_CALLED_ON_VALID_SEQUENCE(current_); + bool time_to_sample = should_sample_next_task_duration_; + should_sample_next_task_duration_ = false; + return time_to_sample; + } + + private: + void ScheduleDelayedSample() { + DCHECK_CALLED_ON_VALID_SEQUENCE(current_); + task_runner_->PostDelayedTask( + FROM_HERE, + base::BindOnce(&PostTaskLatencySampler::TakeSample, + base::Unretained(this)), + kTaskLatencySampleDuration); + } + + void TakeSample() { + DCHECK_CALLED_ON_VALID_SEQUENCE(current_); + task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&PostTaskLatencySampler::FinishSample, + base::Unretained(this), base::TimeTicks::Now())); + } + + void FinishSample(base::TimeTicks post_timestamp) { + DCHECK_CALLED_ON_VALID_SEQUENCE(current_); + task_latency_callback_.Run(base::TimeTicks::Now() - post_timestamp); + ScheduleDelayedSample(); + should_sample_next_task_duration_ = true; + } + + SEQUENCE_CHECKER(current_); + scoped_refptr<base::SingleThreadTaskRunner> task_runner_; + base::RepeatingCallback<void(base::TimeDelta)> task_latency_callback_ + GUARDED_BY_CONTEXT(current_); + bool should_sample_next_task_duration_ GUARDED_BY_CONTEXT(current_) = false; +}; + +struct ThreadWrapper::PendingSend { + explicit PendingSend(const rtc::Message& message_value) + : sending_thread(ThreadWrapper::current()), + message(message_value), + done_event(base::WaitableEvent::ResetPolicy::MANUAL, + base::WaitableEvent::InitialState::NOT_SIGNALED) { + DCHECK(sending_thread); + } + + raw_ptr<ThreadWrapper> sending_thread; + rtc::Message message; + base::WaitableEvent done_event; +}; + +base::LazyInstance<base::ThreadLocalPointer<ThreadWrapper>>::DestructorAtExit + g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER; + +// static +void ThreadWrapper::EnsureForCurrentMessageLoop( + scoped_refptr<blink::MetronomeSource> metronome_source) { + if (ThreadWrapper::current() == nullptr) { + std::unique_ptr<ThreadWrapper> wrapper = ThreadWrapper::WrapTaskRunner( + metronome_source, base::ThreadTaskRunnerHandle::Get()); + base::CurrentThread::Get()->AddDestructionObserver(wrapper.release()); + } + + DCHECK_EQ(rtc::Thread::Current(), current()); + DCHECK_EQ(current()->metronome_source_, metronome_source); +} + +std::unique_ptr<ThreadWrapper> ThreadWrapper::WrapTaskRunner( + scoped_refptr<blink::MetronomeSource> metronome_source, + scoped_refptr<base::SingleThreadTaskRunner> task_runner) { + DCHECK(!ThreadWrapper::current()); + DCHECK(task_runner->BelongsToCurrentThread()); + + std::unique_ptr<ThreadWrapper> result( + new ThreadWrapper(std::move(metronome_source), task_runner)); + g_jingle_thread_wrapper.Get().Set(result.get()); + return result; +} + +// static +ThreadWrapper* ThreadWrapper::current() { + return g_jingle_thread_wrapper.Get().Get(); +} + +void ThreadWrapper::SetLatencyAndTaskDurationCallbacks( + SampledDurationCallback task_latency_callback, + SampledDurationCallback task_duration_callback) { + task_latency_callback_ = std::move(task_latency_callback); + task_duration_callback_ = std::move(task_duration_callback); +} + +ThreadWrapper::ThreadWrapper( + scoped_refptr<blink::MetronomeSource> metronome_source, + scoped_refptr<base::SingleThreadTaskRunner> task_runner) + : Thread(std::make_unique<rtc::PhysicalSocketServer>()), + metronome_source_(std::move(metronome_source)), + task_runner_(task_runner), + send_allowed_(false), + last_task_id_(0), + pending_send_event_(base::WaitableEvent::ResetPolicy::MANUAL, + base::WaitableEvent::InitialState::NOT_SIGNALED) { + DCHECK(task_runner->BelongsToCurrentThread()); + DCHECK(!rtc::Thread::Current()); + weak_ptr_ = weak_ptr_factory_.GetWeakPtr(); + rtc::ThreadManager::Add(this); + SafeWrapCurrent(); +} + +ThreadWrapper::~ThreadWrapper() { + DCHECK_EQ(this, ThreadWrapper::current()); + DCHECK_EQ(this, rtc::Thread::Current()); + + UnwrapCurrent(); + rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); + rtc::ThreadManager::Remove(this); + g_jingle_thread_wrapper.Get().Set(nullptr); + + Clear(nullptr, rtc::MQID_ANY, nullptr); + coalesced_tasks_.Clear(); +} + +void ThreadWrapper::WillDestroyCurrentMessageLoop() { + delete this; +} + +void ThreadWrapper::Post(const rtc::Location& posted_from, + rtc::MessageHandler* handler, + uint32_t message_id, + rtc::MessageData* data, + bool time_sensitive) { + PostTaskInternal(posted_from, 0, handler, message_id, data); +} + +void ThreadWrapper::PostDelayed(const rtc::Location& posted_from, + int delay_ms, + rtc::MessageHandler* handler, + uint32_t message_id, + rtc::MessageData* data) { + PostTaskInternal(posted_from, delay_ms, handler, message_id, data); +} + +void ThreadWrapper::Clear(rtc::MessageHandler* handler, + uint32_t id, + rtc::MessageList* removed) { + base::AutoLock auto_lock(lock_); + + for (MessagesQueue::iterator it = messages_.begin(); it != messages_.end();) { + MessagesQueue::iterator next = it; + ++next; + + if (it->second.Match(handler, id)) { + if (removed) { + removed->push_back(it->second); + } else { + delete it->second.pdata; + } + messages_.erase(it); + } + + it = next; + } + + for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin(); + it != pending_send_messages_.end();) { + std::list<PendingSend*>::iterator next = it; + ++next; + + if ((*it)->message.Match(handler, id)) { + if (removed) { + removed->push_back((*it)->message); + } else { + delete (*it)->message.pdata; + } + (*it)->done_event.Signal(); + pending_send_messages_.erase(it); + } + + it = next; + } +} + +void ThreadWrapper::Dispatch(rtc::Message* message) { + TRACE_EVENT2("webrtc", "ThreadWrapper::Dispatch", "src_file_and_line", + message->posted_from.file_and_line(), "src_func", + message->posted_from.function_name()); + message->phandler->OnMessage(message); +} + +void ThreadWrapper::Send(const rtc::Location& posted_from, + rtc::MessageHandler* handler, + uint32_t id, + rtc::MessageData* data) { + ThreadWrapper* current_thread = ThreadWrapper::current(); + DCHECK(current_thread != nullptr) << "Send() can be called only from a " + "thread that has ThreadWrapper."; + + rtc::Message message; + message.posted_from = posted_from; + message.phandler = handler; + message.message_id = id; + message.pdata = data; + + if (current_thread == this) { + Dispatch(&message); + return; + } + + // Send message from a thread different than |this|. + + // Allow inter-thread send only from threads that have + // |send_allowed_| flag set. + DCHECK(current_thread->send_allowed_) + << "Send()'ing synchronous " + "messages is not allowed from the current thread."; + + PendingSend pending_send(message); + { + base::AutoLock auto_lock(lock_); + pending_send_messages_.push_back(&pending_send); + } + + // Need to signal |pending_send_event_| here in case the thread is + // sending message to another thread. + pending_send_event_.Signal(); + task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&ThreadWrapper::ProcessPendingSends, weak_ptr_)); + + while (!pending_send.done_event.IsSignaled()) { + base::WaitableEvent* events[] = {&pending_send.done_event, + ¤t_thread->pending_send_event_}; + size_t event = base::WaitableEvent::WaitMany(events, base::size(events)); + DCHECK(event == 0 || event == 1); + + if (event == 1) + current_thread->ProcessPendingSends(); + } +} + +void ThreadWrapper::ProcessPendingSends() { + while (true) { + PendingSend* pending_send = nullptr; + { + base::AutoLock auto_lock(lock_); + if (!pending_send_messages_.empty()) { + pending_send = pending_send_messages_.front(); + pending_send_messages_.pop_front(); + } else { + // Reset the event while |lock_| is still locked. + pending_send_event_.Reset(); + break; + } + } + if (pending_send) { + Dispatch(&pending_send->message); + pending_send->done_event.Signal(); + } + } +} + +void ThreadWrapper::PostTaskInternal(const rtc::Location& posted_from, + int delay_ms, + rtc::MessageHandler* handler, + uint32_t message_id, + rtc::MessageData* data) { + int task_id; + rtc::Message message; + message.posted_from = posted_from; + message.phandler = handler; + message.message_id = message_id; + message.pdata = data; + { + base::AutoLock auto_lock(lock_); + task_id = ++last_task_id_; + messages_.insert(std::pair<int, rtc::Message>(task_id, message)); + } + + if (delay_ms <= 0) { + task_runner_->PostTask( + FROM_HERE, base::BindOnce(&ThreadWrapper::RunTask, weak_ptr_, task_id)); + } else { + task_runner_->PostDelayedTaskAt( + base::subtle::PostDelayedTaskPassKey(), FROM_HERE, + base::BindOnce(&ThreadWrapper::RunTask, weak_ptr_, task_id), + base::TimeTicks::Now() + base::Milliseconds(delay_ms), + base::subtle::DelayPolicy::kPrecise); + } +} + +void ThreadWrapper::PostTask(std::unique_ptr<webrtc::QueuedTask> task) { + task_runner_->PostTask( + FROM_HERE, base::BindOnce(&ThreadWrapper::RunTaskQueueTask, weak_ptr_, + std::move(task))); +} + +void ThreadWrapper::PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task, + uint32_t milliseconds) { + base::TimeTicks target_time = + base::TimeTicks::Now() + base::Milliseconds(milliseconds); + if (metronome_source_) { + // Coalesce tasks onto the metronome. + base::TimeTicks snapped_target_time = + metronome_source_->GetTimeSnappedToNextMetronomeTick(target_time); + if (coalesced_tasks_.QueueDelayedTask(target_time, std::move(task), + snapped_target_time)) { + task_runner_->PostDelayedTaskAt( + base::subtle::PostDelayedTaskPassKey(), FROM_HERE, + base::BindOnce(&ThreadWrapper::RunCoalescedTaskQueueTasks, weak_ptr_, + snapped_target_time), + snapped_target_time, base::subtle::DelayPolicy::kPrecise); + } + return; + } + // Do not coalesce tasks onto the metronome. + task_runner_->PostDelayedTaskAt( + base::subtle::PostDelayedTaskPassKey(), FROM_HERE, + base::BindOnce(&ThreadWrapper::RunTaskQueueTask, weak_ptr_, + std::move(task)), + target_time, base::subtle::DelayPolicy::kPrecise); +} + +void ThreadWrapper::PostDelayedHighPrecisionTask( + std::unique_ptr<webrtc::QueuedTask> task, + uint32_t milliseconds) { + base::TimeTicks target_time = + base::TimeTicks::Now() + base::Milliseconds(milliseconds); + task_runner_->PostDelayedTaskAt( + base::subtle::PostDelayedTaskPassKey(), FROM_HERE, + base::BindOnce(&ThreadWrapper::RunTaskQueueTask, weak_ptr_, + std::move(task)), + target_time, base::subtle::DelayPolicy::kPrecise); +} + +absl::optional<base::TimeTicks> ThreadWrapper::PrepareRunTask() { + if (!latency_sampler_ && task_latency_callback_) { + latency_sampler_ = std::make_unique<PostTaskLatencySampler>( + task_runner_, std::move(task_latency_callback_)); + } + absl::optional<base::TimeTicks> task_start_timestamp; + if (!task_duration_callback_.is_null() && latency_sampler_ && + latency_sampler_->ShouldSampleNextTaskDuration()) { + task_start_timestamp = base::TimeTicks::Now(); + } + return task_start_timestamp; +} + +void ThreadWrapper::RunTaskQueueTask(std::unique_ptr<webrtc::QueuedTask> task) { + absl::optional<base::TimeTicks> task_start_timestamp = PrepareRunTask(); + + // Follow QueuedTask::Run() semantics: delete if it returns true, release + // otherwise. + if (task->Run()) + task.reset(); + else + task.release(); + + FinalizeRunTask(std::move(task_start_timestamp)); +} + +void ThreadWrapper::RunCoalescedTaskQueueTasks(base::TimeTicks scheduled_time) { + // base::Unretained(this) is safe here because these callbacks are only used + // for the duration of the RunScheduledTasks() call. + coalesced_tasks_.RunScheduledTasks( + scheduled_time, + base::BindRepeating(&ThreadWrapper::PrepareRunTask, + base::Unretained(this)), + base::BindRepeating(&ThreadWrapper::FinalizeRunTask, + base::Unretained(this))); +} + +void ThreadWrapper::RunTask(int task_id) { + absl::optional<base::TimeTicks> task_start_timestamp = PrepareRunTask(); + + RunTaskInternal(task_id); + + FinalizeRunTask(std::move(task_start_timestamp)); +} + +void ThreadWrapper::FinalizeRunTask( + absl::optional<base::TimeTicks> task_start_timestamp) { + if (task_start_timestamp.has_value()) + task_duration_callback_.Run(base::TimeTicks::Now() - *task_start_timestamp); +} + +void ThreadWrapper::RunTaskInternal(int task_id) { + bool have_message = false; + rtc::Message message; + { + base::AutoLock auto_lock(lock_); + MessagesQueue::iterator it = messages_.find(task_id); + if (it != messages_.end()) { + have_message = true; + message = it->second; + messages_.erase(it); + } + } + + if (have_message) { + if (message.message_id == rtc::MQID_DISPOSE) { + DCHECK(message.phandler == nullptr); + delete message.pdata; + } else { + Dispatch(&message); + } + } +} + +bool ThreadWrapper::IsQuitting() { + NOTIMPLEMENTED_LOG_ONCE(); + return false; +} + +// All methods below are marked as not reached. See comments in the +// header for more details. +void ThreadWrapper::Quit() { + NOTREACHED(); +} + +void ThreadWrapper::Restart() { + NOTREACHED(); +} + +bool ThreadWrapper::Get(rtc::Message*, int, bool) { + NOTREACHED(); + return false; +} + +bool ThreadWrapper::Peek(rtc::Message*, int) { + NOTREACHED(); + return false; +} + +int ThreadWrapper::GetDelay() { + NOTREACHED(); + return 0; +} + +void ThreadWrapper::Stop() { + NOTREACHED(); +} + +void ThreadWrapper::Run() { + NOTREACHED(); +} + +} // namespace webrtc |