diff options
Diffstat (limited to 'chromium/media/remoting/receiver.cc')
-rw-r--r-- | chromium/media/remoting/receiver.cc | 295 |
1 files changed, 161 insertions, 134 deletions
diff --git a/chromium/media/remoting/receiver.cc b/chromium/media/remoting/receiver.cc index 6db0c7d3b9d..40bffe10dd3 100644 --- a/chromium/media/remoting/receiver.cc +++ b/chromium/media/remoting/receiver.cc @@ -6,10 +6,14 @@ #include "base/bind.h" #include "base/callback.h" +#include "base/single_thread_task_runner.h" +#include "base/threading/thread_task_runner_handle.h" +#include "media/base/bind_to_current_loop.h" #include "media/base/decoder_buffer.h" #include "media/base/renderer.h" #include "media/remoting/proto_enum_utils.h" #include "media/remoting/proto_utils.h" +#include "media/remoting/receiver_controller.h" #include "media/remoting/stream_provider.h" namespace media { @@ -23,116 +27,168 @@ constexpr base::TimeDelta kTimeUpdateInterval = } // namespace -Receiver::Receiver(std::unique_ptr<Renderer> renderer, RpcBroker* rpc_broker) - : renderer_(std::move(renderer)), - rpc_broker_(rpc_broker), - rpc_handle_(rpc_broker_->GetUniqueHandle()) { - DCHECK(renderer_); +Receiver::Receiver( + int rpc_handle, + int remote_handle, + ReceiverController* receiver_controller, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner, + std::unique_ptr<Renderer> renderer, + base::OnceCallback<void(int)> acquire_renderer_done_cb) + : rpc_handle_(rpc_handle), + remote_handle_(remote_handle), + receiver_controller_(receiver_controller), + rpc_broker_(receiver_controller_->rpc_broker()), + main_task_runner_(base::ThreadTaskRunnerHandle::Get()), + media_task_runner_(media_task_runner), + renderer_(std::move(renderer)), + acquire_renderer_done_cb_(std::move(acquire_renderer_done_cb)) { + DCHECK(rpc_handle_ != RpcBroker::kInvalidHandle); + DCHECK(receiver_controller_); DCHECK(rpc_broker_); - rpc_broker_->RegisterMessageReceiverCallback( - rpc_handle_, base::BindRepeating(&Receiver::OnReceivedRpc, - weak_factory_.GetWeakPtr())); - rpc_broker_->RegisterMessageReceiverCallback( - RpcBroker::kAcquireHandle, - base::BindRepeating(&Receiver::OnReceivedRpc, - weak_factory_.GetWeakPtr())); + DCHECK(renderer_); + + // Note: The constructor is running on the main thread, but will be destroyed + // on the media thread. Therefore, all weak pointers must be dereferenced on + // the media thread. + const RpcBroker::ReceiveMessageCallback receive_callback = BindToLoop( + media_task_runner_, + BindRepeating(&Receiver::OnReceivedRpc, weak_factory_.GetWeakPtr())); + + // Listening all renderer rpc messages. + rpc_broker_->RegisterMessageReceiverCallback(rpc_handle_, receive_callback); + VerifyAcquireRendererDone(); } Receiver::~Receiver() { rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_); - rpc_broker_->UnregisterMessageReceiverCallback(RpcBroker::kAcquireHandle); + rpc_broker_->UnregisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle); +} + +// Receiver::Initialize() will be called by the local pipeline, it would only +// keep the |init_cb| in order to continue the initialization once it receives +// RPC_R_INITIALIZE, which means Receiver::RpcInitialize() is called. +void Receiver::Initialize(MediaResource* media_resource, + RendererClient* client, + PipelineStatusCallback init_cb) { + demuxer_ = media_resource; + init_cb_ = std::move(init_cb); + ShouldInitializeRenderer(); +} + +/* CDM is not supported for remoting media */ +void Receiver::SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) { + NOTREACHED(); +} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::Flush(base::OnceClosure flush_cb) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::StartPlayingFrom(base::TimeDelta time) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::SetPlaybackRate(double playback_rate) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::SetVolume(float volume) {} + +// No-op. Controlled by sender via RPC calls instead. +base::TimeDelta Receiver::GetMediaTime() { + return base::TimeDelta(); +} + +void Receiver::SendRpcMessageOnMainThread( + std::unique_ptr<pb::RpcMessage> message) { + // |rpc_broker_| is owned by |receiver_controller_| which is a singleton per + // process, so it's safe to use Unretained() here. + main_task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&RpcBroker::SendMessageToRemote, + base::Unretained(rpc_broker_), std::move(message))); } void Receiver::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(message); switch (message->proc()) { - case pb::RpcMessage::RPC_ACQUIRE_RENDERER: - AcquireRenderer(std::move(message)); + case pb::RpcMessage::RPC_R_INITIALIZE: + RpcInitialize(std::move(message)); break; case pb::RpcMessage::RPC_R_FLUSHUNTIL: - FlushUntil(std::move(message)); + RpcFlushUntil(std::move(message)); break; case pb::RpcMessage::RPC_R_STARTPLAYINGFROM: - StartPlayingFrom(std::move(message)); + RpcStartPlayingFrom(std::move(message)); break; case pb::RpcMessage::RPC_R_SETPLAYBACKRATE: - SetPlaybackRate(std::move(message)); + RpcSetPlaybackRate(std::move(message)); break; case pb::RpcMessage::RPC_R_SETVOLUME: - SetVolume(std::move(message)); - break; - case pb::RpcMessage::RPC_R_INITIALIZE: - Initialize(std::move(message)); + RpcSetVolume(std::move(message)); break; default: - VLOG(1) << __func__ << ": Unknow RPC message. proc=" << message->proc(); + VLOG(1) << __func__ << ": Unknown RPC message. proc=" << message->proc(); } } -void Receiver::AcquireRenderer(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_ACQUIRE_RENDERER with remote_handle= " - << message->integer_value(); +void Receiver::SetRemoteHandle(int remote_handle) { + DCHECK_NE(remote_handle, RpcBroker::kInvalidHandle); + DCHECK_EQ(remote_handle_, RpcBroker::kInvalidHandle); + remote_handle_ = remote_handle; + VerifyAcquireRendererDone(); +} - remote_handle_ = message->integer_value(); - if (stream_provider_) { - VLOG(1) << "Acquire renderer error: Already acquired."; - OnError(PipelineStatus::PIPELINE_ERROR_DECODE); +void Receiver::VerifyAcquireRendererDone() { + if (remote_handle_ == RpcBroker::kInvalidHandle) return; - } - - stream_provider_.reset(new StreamProvider( - rpc_broker_, - base::BindOnce(&Receiver::OnError, weak_factory_.GetWeakPtr(), - PipelineStatus::PIPELINE_ERROR_DECODE))); - DVLOG(3) << __func__ - << ": Issues RPC_ACQUIRE_RENDERER_DONE RPC message. remote_handle=" - << remote_handle_ << " rpc_handle=" << rpc_handle_; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); - rpc->set_handle(remote_handle_); - rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE); - rpc->set_integer_value(rpc_handle_); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + DCHECK(acquire_renderer_done_cb_); + std::move(acquire_renderer_done_cb_).Run(rpc_handle_); } -void Receiver::Initialize(std::unique_ptr<pb::RpcMessage> message) { - DCHECK(stream_provider_); - DVLOG(3) << __func__ << ": Receives RPC_R_INITIALIZE with callback handle= " - << message->renderer_initialize_rpc().callback_handle(); - DCHECK(message->renderer_initialize_rpc().callback_handle() == - remote_handle_); - if (!stream_provider_) - OnRendererInitialized(PipelineStatus::PIPELINE_ERROR_INITIALIZATION_FAILED); - - stream_provider_->Initialize( - message->renderer_initialize_rpc().audio_demuxer_handle(), - message->renderer_initialize_rpc().video_demuxer_handle(), - base::BindOnce(&Receiver::OnStreamInitialized, - weak_factory_.GetWeakPtr())); +void Receiver::RpcInitialize(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(renderer_); + rpc_initialize_received_ = true; + ShouldInitializeRenderer(); } -void Receiver::OnStreamInitialized() { - DCHECK(stream_provider_); - renderer_->Initialize(stream_provider_.get(), this, +void Receiver::ShouldInitializeRenderer() { + // ShouldInitializeRenderer() will be called from Initialize() and + // RpcInitialize() in different orders. + // + // |renderer_| must be initialized when both Initialize() and + // RpcInitialize() are called. + if (!rpc_initialize_received_ || !init_cb_) + return; + + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(renderer_); + DCHECK(demuxer_); + renderer_->Initialize(demuxer_, this, base::BindOnce(&Receiver::OnRendererInitialized, weak_factory_.GetWeakPtr())); } void Receiver::OnRendererInitialized(PipelineStatus status) { - DVLOG(3) << __func__ << ": Issues RPC_R_INITIALIZE_CALLBACK RPC message." - << "remote_handle=" << remote_handle_; + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(init_cb_); + std::move(init_cb_).Run(status); - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK); rpc->set_boolean_value(status == PIPELINE_OK); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } -void Receiver::SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) { +void Receiver::RpcSetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + const double playback_rate = message->double_value(); - DVLOG(3) << __func__ - << ": Receives RPC_R_SETPLAYBACKRATE with rate=" << playback_rate; renderer_->SetPlaybackRate(playback_rate); if (playback_rate == 0.0) { @@ -147,38 +203,32 @@ void Receiver::SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) { } } -void Receiver::FlushUntil(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_R_FLUSHUNTIL RPC message."; +void Receiver::RpcFlushUntil(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(message->has_renderer_flushuntil_rpc()); const pb::RendererFlushUntil flush_message = message->renderer_flushuntil_rpc(); DCHECK_EQ(flush_message.callback_handle(), remote_handle_); - if (stream_provider_) { - if (flush_message.has_audio_count()) { - stream_provider_->FlushUntil(DemuxerStream::AUDIO, - flush_message.audio_count()); - } - if (flush_message.has_video_count()) { - stream_provider_->FlushUntil(DemuxerStream::VIDEO, - flush_message.video_count()); - } - } + + receiver_controller_->OnRendererFlush(flush_message.audio_count(), + flush_message.video_count()); + time_update_timer_.Stop(); renderer_->Flush( base::BindOnce(&Receiver::OnFlushDone, weak_factory_.GetWeakPtr())); } void Receiver::OnFlushDone() { - DVLOG(3) << __func__ << ": Issues RPC_R_FLUSHUNTIL_CALLBACK RPC message."; - - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL_CALLBACK); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } -void Receiver::StartPlayingFrom(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_R_STARTPLAYINGFROM message."; +void Receiver::RpcStartPlayingFrom(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + base::TimeDelta time = base::TimeDelta::FromMicroseconds(message->integer64_value()); renderer_->StartPlayingFrom(time); @@ -194,14 +244,14 @@ void Receiver::ScheduleMediaTimeUpdates() { weak_factory_.GetWeakPtr())); } -void Receiver::SetVolume(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_R_SETVOLUME message."; +void Receiver::RpcSetVolume(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); renderer_->SetVolume(message->double_value()); } void Receiver::SendMediaTimeUpdate() { // Issues RPC_RC_ONTIMEUPDATE RPC message. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONTIMEUPDATE); auto* message = rpc->mutable_rendererclient_ontimeupdate_rpc(); @@ -209,40 +259,26 @@ void Receiver::SendMediaTimeUpdate() { message->set_time_usec(media_time.InMicroseconds()); base::TimeDelta max_time = media_time; message->set_max_time_usec(max_time.InMicroseconds()); - DVLOG(3) << __func__ << ": Issues RPC_RC_ONTIMEUPDATE message." - << " media_time = " << media_time.InMicroseconds() - << " max_time= " << max_time.InMicroseconds(); - rpc_broker_->SendMessageToRemote(std::move(rpc)); -} - -void Receiver::OnReceivedBuffer(DemuxerStream::Type type, - scoped_refptr<DecoderBuffer> buffer) { - DVLOG(3) << __func__ - << ": type=" << (type == DemuxerStream::AUDIO ? "Audio" : "Video"); - DCHECK(stream_provider_); - stream_provider_->AppendBuffer(type, buffer); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnError(PipelineStatus status) { - VLOG(1) << __func__ << ": Issues RPC_RC_ONERROR message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONERROR); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnEnded() { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONENDED message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONENDED); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); time_update_timer_.Stop(); } void Receiver::OnStatisticsUpdate(const PipelineStatistics& stats) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONSTATISTICSUPDATE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONSTATISTICSUPDATE); auto* message = rpc->mutable_rendererclient_onstatisticsupdate_rpc(); @@ -252,77 +288,68 @@ void Receiver::OnStatisticsUpdate(const PipelineStatistics& stats) { message->set_video_frames_dropped(stats.video_frames_dropped); message->set_audio_memory_usage(stats.audio_memory_usage); message->set_video_memory_usage(stats.video_memory_usage); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnBufferingStateChange(BufferingState state, BufferingStateChangeReason reason) { - DVLOG(3) << __func__ - << ": Issues RPC_RC_ONBUFFERINGSTATECHANGE message: state=" << state; - - // The |reason| is determined on the other side of the RPC in CourierRenderer. - // For now, there is no reason to provide this in the |message| below. - DCHECK_EQ(reason, BUFFERING_CHANGE_REASON_UNKNOWN); - - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONBUFFERINGSTATECHANGE); auto* message = rpc->mutable_rendererclient_onbufferingstatechange_rpc(); message->set_state(ToProtoMediaBufferingState(state).value()); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } // TODO: Passes |reason| over. void Receiver::OnWaiting(WaitingReason reason) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONWAITINGFORDECRYPTIONKEY message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnAudioConfigChange(const AudioDecoderConfig& config) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONAUDIOCONFIGCHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE); auto* message = rpc->mutable_rendererclient_onaudioconfigchange_rpc(); pb::AudioDecoderConfig* proto_audio_config = message->mutable_audio_decoder_config(); ConvertAudioDecoderConfigToProto(config, proto_audio_config); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnVideoConfigChange(const VideoDecoderConfig& config) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEOCONFIGCHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEOCONFIGCHANGE); auto* message = rpc->mutable_rendererclient_onvideoconfigchange_rpc(); pb::VideoDecoderConfig* proto_video_config = message->mutable_video_decoder_config(); ConvertVideoDecoderConfigToProto(config, proto_video_config); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnVideoNaturalSizeChange(const gfx::Size& size) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEONATURALSIZECHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEONATURALSIZECHANGE); auto* message = rpc->mutable_rendererclient_onvideonatualsizechange_rpc(); message->set_width(size.width()); message->set_height(size.height()); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); + + // Notify the host. + receiver_controller_->OnVideoNaturalSizeChange(size); } void Receiver::OnVideoOpacityChange(bool opaque) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEOOPACITYCHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEOOPACITYCHANGE); rpc->set_boolean_value(opaque); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnVideoFrameRateChange(base::Optional<int>) {} |