// Copyright 2012 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "components/webrtc/thread_wrapper.h" #include #include #include #include "base/bind.h" #include "base/callback_helpers.h" #include "base/lazy_instance.h" #include "base/memory/raw_ptr.h" #include "base/sequence_checker.h" #include "base/synchronization/waitable_event.h" #include "base/thread_annotations.h" #include "base/threading/thread_local.h" #include "base/threading/thread_task_runner_handle.h" #include "base/time/time.h" #include "base/trace_event/trace_event.h" #include "third_party/abseil-cpp/absl/types/optional.h" #include "third_party/webrtc/rtc_base/physical_socket_server.h" #include "third_party/webrtc_overrides/metronome_source.h" namespace webrtc { namespace { constexpr base::TimeDelta kTaskLatencySampleDuration = base::Seconds(3); } // Class intended to conditionally live for the duration of ThreadWrapper // that periodically captures task latencies (definition in docs for // SetLatencyAndTaskDurationCallbacks). class ThreadWrapper::PostTaskLatencySampler { public: PostTaskLatencySampler( scoped_refptr task_runner, SampledDurationCallback task_latency_callback) : task_runner_(task_runner), task_latency_callback_(std::move(task_latency_callback)) { ScheduleDelayedSample(); } bool ShouldSampleNextTaskDuration() { DCHECK_CALLED_ON_VALID_SEQUENCE(current_); bool time_to_sample = should_sample_next_task_duration_; should_sample_next_task_duration_ = false; return time_to_sample; } private: void ScheduleDelayedSample() { DCHECK_CALLED_ON_VALID_SEQUENCE(current_); task_runner_->PostDelayedTask( FROM_HERE, base::BindOnce(&PostTaskLatencySampler::TakeSample, base::Unretained(this)), kTaskLatencySampleDuration); } void TakeSample() { DCHECK_CALLED_ON_VALID_SEQUENCE(current_); task_runner_->PostTask( FROM_HERE, base::BindOnce(&PostTaskLatencySampler::FinishSample, base::Unretained(this), base::TimeTicks::Now())); } void FinishSample(base::TimeTicks post_timestamp) { DCHECK_CALLED_ON_VALID_SEQUENCE(current_); task_latency_callback_.Run(base::TimeTicks::Now() - post_timestamp); ScheduleDelayedSample(); should_sample_next_task_duration_ = true; } SEQUENCE_CHECKER(current_); scoped_refptr task_runner_; base::RepeatingCallback task_latency_callback_ GUARDED_BY_CONTEXT(current_); bool should_sample_next_task_duration_ GUARDED_BY_CONTEXT(current_) = false; }; struct ThreadWrapper::PendingSend { explicit PendingSend(rtc::FunctionView functor) : functor(functor), done_event(base::WaitableEvent::ResetPolicy::MANUAL, base::WaitableEvent::InitialState::NOT_SIGNALED) {} rtc::FunctionView functor; base::WaitableEvent done_event; }; base::LazyInstance>::DestructorAtExit g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER; // static void ThreadWrapper::EnsureForCurrentMessageLoop() { if (ThreadWrapper::current() == nullptr) { std::unique_ptr wrapper = ThreadWrapper::WrapTaskRunner(base::ThreadTaskRunnerHandle::Get()); base::CurrentThread::Get()->AddDestructionObserver(wrapper.release()); } DCHECK_EQ(rtc::Thread::Current(), current()); } std::unique_ptr ThreadWrapper::WrapTaskRunner( scoped_refptr task_runner) { DCHECK(!ThreadWrapper::current()); DCHECK(task_runner->BelongsToCurrentThread()); std::unique_ptr result(new ThreadWrapper(task_runner)); g_jingle_thread_wrapper.Get().Set(result.get()); return result; } // static ThreadWrapper* ThreadWrapper::current() { return g_jingle_thread_wrapper.Get().Get(); } void ThreadWrapper::SetLatencyAndTaskDurationCallbacks( SampledDurationCallback task_latency_callback, SampledDurationCallback task_duration_callback) { task_latency_callback_ = std::move(task_latency_callback); task_duration_callback_ = std::move(task_duration_callback); } ThreadWrapper::ThreadWrapper( scoped_refptr task_runner) : Thread(std::make_unique()), task_runner_(task_runner), send_allowed_(false), pending_send_event_(base::WaitableEvent::ResetPolicy::MANUAL, base::WaitableEvent::InitialState::NOT_SIGNALED) { DCHECK(task_runner->BelongsToCurrentThread()); DCHECK(!rtc::Thread::Current()); weak_ptr_ = weak_ptr_factory_.GetWeakPtr(); rtc::ThreadManager::Add(this); SafeWrapCurrent(); } ThreadWrapper::~ThreadWrapper() { DCHECK_EQ(this, ThreadWrapper::current()); DCHECK_EQ(this, rtc::Thread::Current()); UnwrapCurrent(); rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); rtc::ThreadManager::Remove(this); g_jingle_thread_wrapper.Get().Set(nullptr); CHECK(pending_send_messages_.empty()); coalesced_tasks_.Clear(); } rtc::SocketServer* ThreadWrapper::SocketServer() { return rtc::Thread::socketserver(); } void ThreadWrapper::WillDestroyCurrentMessageLoop() { delete this; } void ThreadWrapper::BlockingCall(rtc::FunctionView functor) { ThreadWrapper* current_thread = ThreadWrapper::current(); DCHECK(current_thread != nullptr) << "BlockingCall() can be called only from " "a thread that has ThreadWrapper."; if (current_thread == this) { functor(); return; } // Send message from a thread different than |this|. // Allow inter-thread send only from threads that have // |send_allowed_| flag set. DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous " "messages is not allowed from the current thread."; PendingSend pending_send(functor); { base::AutoLock auto_lock(lock_); pending_send_messages_.push_back(&pending_send); } // Need to signal |pending_send_event_| here in case the thread is // sending message to another thread. pending_send_event_.Signal(); task_runner_->PostTask( FROM_HERE, base::BindOnce(&ThreadWrapper::ProcessPendingSends, weak_ptr_)); while (!pending_send.done_event.IsSignaled()) { base::WaitableEvent* events[] = {&pending_send.done_event, ¤t_thread->pending_send_event_}; size_t event = base::WaitableEvent::WaitMany(events, std::size(events)); DCHECK(event == 0 || event == 1); if (event == 1) current_thread->ProcessPendingSends(); } } void ThreadWrapper::ProcessPendingSends() { while (true) { PendingSend* pending_send = nullptr; { base::AutoLock auto_lock(lock_); if (!pending_send_messages_.empty()) { pending_send = pending_send_messages_.front(); pending_send_messages_.pop_front(); } else { // Reset the event while |lock_| is still locked. pending_send_event_.Reset(); break; } } if (pending_send) { pending_send->functor(); pending_send->done_event.Signal(); } } } void ThreadWrapper::PostTask(absl::AnyInvocable task) { task_runner_->PostTask( FROM_HERE, base::BindOnce(&ThreadWrapper::RunTaskQueueTask, weak_ptr_, std::move(task))); } void ThreadWrapper::PostDelayedTask(absl::AnyInvocable task, TimeDelta delay) { base::TimeTicks target_time = base::TimeTicks::Now() + base::Microseconds(delay.us()); // Coalesce low precision tasks onto the metronome. base::TimeTicks snapped_target_time = blink::MetronomeSource::TimeSnappedToNextTick(target_time); if (coalesced_tasks_.QueueDelayedTask(target_time, std::move(task), snapped_target_time)) { task_runner_->PostDelayedTaskAt( base::subtle::PostDelayedTaskPassKey(), FROM_HERE, base::BindOnce(&ThreadWrapper::RunCoalescedTaskQueueTasks, weak_ptr_, snapped_target_time), snapped_target_time, base::subtle::DelayPolicy::kPrecise); } } void ThreadWrapper::PostDelayedHighPrecisionTask( absl::AnyInvocable task, webrtc::TimeDelta delay) { base::TimeTicks target_time = base::TimeTicks::Now() + base::Microseconds(delay.us()); task_runner_->PostDelayedTaskAt( base::subtle::PostDelayedTaskPassKey(), FROM_HERE, base::BindOnce(&ThreadWrapper::RunTaskQueueTask, weak_ptr_, std::move(task)), target_time, base::subtle::DelayPolicy::kPrecise); } absl::optional ThreadWrapper::PrepareRunTask() { if (!latency_sampler_ && task_latency_callback_) { latency_sampler_ = std::make_unique( task_runner_, std::move(task_latency_callback_)); } absl::optional task_start_timestamp; if (!task_duration_callback_.is_null() && latency_sampler_ && latency_sampler_->ShouldSampleNextTaskDuration()) { task_start_timestamp = base::TimeTicks::Now(); } return task_start_timestamp; } void ThreadWrapper::RunTaskQueueTask(absl::AnyInvocable task) { absl::optional task_start_timestamp = PrepareRunTask(); std::move(task)(); task = nullptr; FinalizeRunTask(std::move(task_start_timestamp)); } void ThreadWrapper::RunCoalescedTaskQueueTasks(base::TimeTicks scheduled_time) { // base::Unretained(this) is safe here because these callbacks are only used // for the duration of the RunScheduledTasks() call. coalesced_tasks_.RunScheduledTasks( scheduled_time, base::BindRepeating(&ThreadWrapper::PrepareRunTask, base::Unretained(this)), base::BindRepeating(&ThreadWrapper::FinalizeRunTask, base::Unretained(this))); } void ThreadWrapper::FinalizeRunTask( absl::optional task_start_timestamp) { if (task_start_timestamp.has_value()) task_duration_callback_.Run(base::TimeTicks::Now() - *task_start_timestamp); } bool ThreadWrapper::IsQuitting() { NOTREACHED(); return false; } // All methods below are marked as not reached. See comments in the // header for more details. void ThreadWrapper::Quit() { NOTREACHED(); } void ThreadWrapper::Restart() { NOTREACHED(); } int ThreadWrapper::GetDelay() { NOTREACHED(); return 0; } void ThreadWrapper::Stop() { NOTREACHED(); } void ThreadWrapper::Run() { NOTREACHED(); } } // namespace webrtc