diff options
author | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2018-08-28 15:28:34 +0200 |
---|---|---|
committer | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2018-08-28 13:54:51 +0000 |
commit | 2a19c63448c84c1805fb1a585c3651318bb86ca7 (patch) | |
tree | eb17888e8531aa6ee5e85721bd553b832a7e5156 /chromium/components/mirroring | |
parent | b014812705fc80bff0a5c120dfcef88f349816dc (diff) | |
download | qtwebengine-chromium-2a19c63448c84c1805fb1a585c3651318bb86ca7.tar.gz |
BASELINE: Update Chromium to 69.0.3497.70
Change-Id: I2b7b56e4e7a8b26656930def0d4575dc32b900a0
Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/components/mirroring')
24 files changed, 2690 insertions, 233 deletions
diff --git a/chromium/components/mirroring/service/BUILD.gn b/chromium/components/mirroring/service/BUILD.gn index a5b3ef6d2f4..1f9d28bf573 100644 --- a/chromium/components/mirroring/service/BUILD.gn +++ b/chromium/components/mirroring/service/BUILD.gn @@ -18,6 +18,7 @@ source_set("interface") { "//media/capture/mojom:video_capture", "//media/cast:common", "//media/mojo/interfaces", + "//media/mojo/interfaces:remoting", "//net", "//services/network/public/mojom", ] @@ -25,12 +26,18 @@ source_set("interface") { source_set("service") { sources = [ + "captured_audio_input.cc", + "captured_audio_input.h", + "media_remoter.cc", + "media_remoter.h", "message_dispatcher.cc", "message_dispatcher.h", "mirror_settings.cc", "mirror_settings.h", "receiver_response.cc", "receiver_response.h", + "remoting_sender.cc", + "remoting_sender.h", "rtp_stream.cc", "rtp_stream.h", "session.cc", @@ -62,6 +69,8 @@ source_set("service") { "//media/cast:net", "//media/cast:sender", "//media/mojo/common:common", + "//media/mojo/interfaces", + "//media/mojo/interfaces:remoting", "//mojo/public/cpp/bindings", "//mojo/public/cpp/system", "//net", @@ -74,12 +83,15 @@ source_set("service") { source_set("unittests") { testonly = true sources = [ + "captured_audio_input_unittest.cc", "fake_network_service.cc", "fake_network_service.h", "fake_video_capture_host.cc", "fake_video_capture_host.h", + "media_remoter_unittest.cc", "message_dispatcher_unittest.cc", "receiver_response_unittest.cc", + "remoting_sender_unittest.cc", "rtp_stream_unittest.cc", "session_monitor_unittest.cc", "session_unittest.cc", @@ -100,6 +112,8 @@ source_set("unittests") { "//media/cast:sender", "//media/cast:test_support", "//media/cast:test_support", + "//media/mojo/interfaces", + "//media/mojo/interfaces:remoting", "//mojo/public/cpp/bindings", "//net", "//services/network:test_support", diff --git a/chromium/components/mirroring/service/captured_audio_input.cc b/chromium/components/mirroring/service/captured_audio_input.cc new file mode 100644 index 00000000000..1d9719c120f --- /dev/null +++ b/chromium/components/mirroring/service/captured_audio_input.cc @@ -0,0 +1,96 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/mirroring/service/captured_audio_input.h" + +#include "base/logging.h" +#include "mojo/public/cpp/system/platform_handle.h" + +namespace mirroring { + +CapturedAudioInput::CapturedAudioInput(StreamCreatorCallback callback) + : stream_creator_callback_(std::move(callback)), + stream_client_binding_(this) { + DETACH_FROM_SEQUENCE(sequence_checker_); + DCHECK(!stream_creator_callback_.is_null()); +} + +CapturedAudioInput::~CapturedAudioInput() {} + +void CapturedAudioInput::CreateStream(media::AudioInputIPCDelegate* delegate, + const media::AudioParameters& params, + bool automatic_gain_control, + uint32_t total_segments) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(!automatic_gain_control); // Invalid to be true for screen capture. + DCHECK(delegate); + DCHECK(!delegate_); + delegate_ = delegate; + stream_creator_callback_.Run(this, params, total_segments); +} + +void CapturedAudioInput::RecordStream() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(stream_.is_bound()); + stream_->Record(); +} + +void CapturedAudioInput::SetVolume(double volume) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(stream_.is_bound()); + stream_->SetVolume(volume); +} + +void CapturedAudioInput::CloseStream() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + delegate_ = nullptr; + if (stream_client_binding_.is_bound()) + stream_client_binding_.Unbind(); + stream_.reset(); +} + +void CapturedAudioInput::SetOutputDeviceForAec( + const std::string& output_device_id) { + NOTREACHED(); +} + +void CapturedAudioInput::StreamCreated( + media::mojom::AudioInputStreamPtr stream, + media::mojom::AudioInputStreamClientRequest client_request, + media::mojom::ReadOnlyAudioDataPipePtr data_pipe, + bool initially_muted) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(delegate_); + DCHECK(!stream_); + DCHECK(!stream_client_binding_.is_bound()); + + stream_ = std::move(stream); + stream_client_binding_.Bind(std::move(client_request)); + + base::PlatformFile socket_handle; + auto result = + mojo::UnwrapPlatformFile(std::move(data_pipe->socket), &socket_handle); + DCHECK_EQ(result, MOJO_RESULT_OK); + + base::ReadOnlySharedMemoryRegion& shared_memory_region = + data_pipe->shared_memory; + DCHECK(shared_memory_region.IsValid()); + + delegate_->OnStreamCreated(std::move(shared_memory_region), socket_handle, + initially_muted); +} + +void CapturedAudioInput::OnError() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(delegate_); + delegate_->OnError(); +} + +void CapturedAudioInput::OnMutedStateChanged(bool is_muted) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(delegate_); + delegate_->OnMuted(is_muted); +} + +} // namespace mirroring diff --git a/chromium/components/mirroring/service/captured_audio_input.h b/chromium/components/mirroring/service/captured_audio_input.h new file mode 100644 index 00000000000..ca92e0acabe --- /dev/null +++ b/chromium/components/mirroring/service/captured_audio_input.h @@ -0,0 +1,64 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef COMPONENTS_MIRRORING_SERVICE_CAPTURED_AUDIO_INPUT_H_ +#define COMPONENTS_MIRRORING_SERVICE_CAPTURED_AUDIO_INPUT_H_ + +#include "base/callback.h" +#include "base/macros.h" +#include "base/sequence_checker.h" +#include "components/mirroring/service/interface.h" +#include "media/audio/audio_input_ipc.h" +#include "media/mojo/interfaces/audio_input_stream.mojom.h" +#include "mojo/public/cpp/bindings/binding.h" + +namespace mirroring { + +// CapturedAudioInput handles the creation, initialization and control of an +// audio input stream created by Audio Service. +class CapturedAudioInput final : public media::AudioInputIPC, + public AudioStreamCreatorClient, + public media::mojom::AudioInputStreamClient { + public: + using StreamCreatorCallback = + base::RepeatingCallback<void(AudioStreamCreatorClient* client, + const media::AudioParameters& params, + uint32_t total_segments)>; + explicit CapturedAudioInput(StreamCreatorCallback callback); + ~CapturedAudioInput() override; + + private: + // media::AudioInputIPC implementation. + void CreateStream(media::AudioInputIPCDelegate* delegate, + const media::AudioParameters& params, + bool automatic_gain_control, + uint32_t total_segments) override; + void RecordStream() override; + void SetVolume(double volume) override; + void CloseStream() override; + void SetOutputDeviceForAec(const std::string& output_device_id) override; + + // AudioStreamCreatorClient implementation + void StreamCreated(media::mojom::AudioInputStreamPtr stream, + media::mojom::AudioInputStreamClientRequest client_request, + media::mojom::ReadOnlyAudioDataPipePtr data_pipe, + bool initially_muted) override; + + // media::mojom::AudioInputStreamClient implementation. + void OnError() override; + void OnMutedStateChanged(bool is_muted) override; + + SEQUENCE_CHECKER(sequence_checker_); + + const StreamCreatorCallback stream_creator_callback_; + mojo::Binding<media::mojom::AudioInputStreamClient> stream_client_binding_; + media::AudioInputIPCDelegate* delegate_ = nullptr; + media::mojom::AudioInputStreamPtr stream_; + + DISALLOW_COPY_AND_ASSIGN(CapturedAudioInput); +}; + +} // namespace mirroring + +#endif // COMPONENTS_MIRRORING_SERVICE_CAPTURED_AUDIO_INPUT_H_ diff --git a/chromium/components/mirroring/service/captured_audio_input_unittest.cc b/chromium/components/mirroring/service/captured_audio_input_unittest.cc new file mode 100644 index 00000000000..da8948b9513 --- /dev/null +++ b/chromium/components/mirroring/service/captured_audio_input_unittest.cc @@ -0,0 +1,177 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/mirroring/service/captured_audio_input.h" + +#include "base/macros.h" +#include "base/run_loop.h" +#include "base/test/scoped_task_environment.h" +#include "media/base/audio_parameters.h" +#include "mojo/public/cpp/bindings/strong_binding.h" +#include "mojo/public/cpp/system/buffer.h" +#include "mojo/public/cpp/system/platform_handle.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using ::testing::InvokeWithoutArgs; + +namespace mirroring { + +namespace { + +class MockStream final : public media::mojom::AudioInputStream { + public: + MOCK_METHOD0(Record, void()); + MOCK_METHOD1(SetVolume, void(double)); +}; + +class MockDelegate final : public media::AudioInputIPCDelegate { + public: + MockDelegate() {} + ~MockDelegate() override {} + + MOCK_METHOD1(StreamCreated, void(bool initially_muted)); + MOCK_METHOD0(OnError, void()); + MOCK_METHOD1(OnMuted, void(bool muted)); + MOCK_METHOD0(OnIPCClosed, void()); + + void OnStreamCreated(base::ReadOnlySharedMemoryRegion shared_memory_region, + base::SyncSocket::Handle socket_handle, + bool initially_muted) override { + StreamCreated(initially_muted); + } +}; + +} // namespace + +class CapturedAudioInputTest : public ::testing::Test { + public: + CapturedAudioInputTest() {} + + ~CapturedAudioInputTest() override { + scoped_task_environment_.RunUntilIdle(); + } + + void CreateMockStream(bool initially_muted, + AudioStreamCreatorClient* client, + const media::AudioParameters& params, + uint32_t total_segments) { + EXPECT_EQ(base::SyncSocket::kInvalidHandle, socket_.handle()); + EXPECT_FALSE(stream_); + media::mojom::AudioInputStreamPtr stream_ptr; + auto input_stream = std::make_unique<MockStream>(); + stream_ = input_stream.get(); + mojo::MakeStrongBinding(std::move(input_stream), + mojo::MakeRequest(&stream_ptr)); + base::CancelableSyncSocket foreign_socket; + EXPECT_TRUE( + base::CancelableSyncSocket::CreatePair(&socket_, &foreign_socket)); + client->StreamCreated( + std::move(stream_ptr), mojo::MakeRequest(&stream_client_), + {base::in_place, base::ReadOnlySharedMemoryRegion::Create(1024).region, + mojo::WrapPlatformFile(foreign_socket.Release())}, + initially_muted); + } + + protected: + void CreateStream(bool initially_muted) { + audio_input_ = std::make_unique<CapturedAudioInput>( + base::BindRepeating(&CapturedAudioInputTest::CreateMockStream, + base::Unretained(this), initially_muted)); + base::RunLoop run_loop; + EXPECT_CALL(delegate_, StreamCreated(initially_muted)) + .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + audio_input_->CreateStream(&delegate_, media::AudioParameters(), false, 10); + run_loop.Run(); + } + + void CloseStream() { + EXPECT_TRUE(audio_input_); + audio_input_->CloseStream(); + scoped_task_environment_.RunUntilIdle(); + socket_.Close(); + audio_input_.reset(); + stream_ = nullptr; + } + + void SignalStreamError() { + EXPECT_TRUE(stream_client_.is_bound()); + base::RunLoop run_loop; + EXPECT_CALL(delegate_, OnError()) + .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + stream_client_->OnError(); + run_loop.Run(); + } + + void SignalMutedStateChanged(bool is_muted) { + EXPECT_TRUE(stream_client_.is_bound()); + base::RunLoop run_loop; + EXPECT_CALL(delegate_, OnMuted(true)) + .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + stream_client_->OnMutedStateChanged(is_muted); + run_loop.Run(); + } + + void SetVolume(double volume) { + EXPECT_TRUE(audio_input_); + base::RunLoop run_loop; + EXPECT_CALL(*stream_, SetVolume(volume)) + .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + audio_input_->SetVolume(volume); + run_loop.Run(); + } + + void Record() { + EXPECT_TRUE(audio_input_); + base::RunLoop run_loop; + EXPECT_CALL(*stream_, Record()) + .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + audio_input_->RecordStream(); + run_loop.Run(); + } + + private: + base::test::ScopedTaskEnvironment scoped_task_environment_; + std::unique_ptr<media::AudioInputIPC> audio_input_; + MockDelegate delegate_; + MockStream* stream_ = nullptr; + media::mojom::AudioInputStreamClientPtr stream_client_; + base::CancelableSyncSocket socket_; + + DISALLOW_COPY_AND_ASSIGN(CapturedAudioInputTest); +}; + +TEST_F(CapturedAudioInputTest, CreateStream) { + // Test that the initial muted state can be propagated to |delegate_|. + CreateStream(false); + CloseStream(); + CreateStream(true); + CloseStream(); +} + +TEST_F(CapturedAudioInputTest, PropagatesStreamError) { + CreateStream(false); + SignalStreamError(); + CloseStream(); +} + +TEST_F(CapturedAudioInputTest, PropagatesMutedStateChange) { + CreateStream(false); + SignalMutedStateChanged(true); + CloseStream(); +} + +TEST_F(CapturedAudioInputTest, SetVolume) { + CreateStream(false); + SetVolume(0.8); + CloseStream(); +} + +TEST_F(CapturedAudioInputTest, Record) { + CreateStream(false); + Record(); + CloseStream(); +} + +} // namespace mirroring diff --git a/chromium/components/mirroring/service/interface.h b/chromium/components/mirroring/service/interface.h index 3feac5c3edb..222b342e241 100644 --- a/chromium/components/mirroring/service/interface.h +++ b/chromium/components/mirroring/service/interface.h @@ -7,7 +7,11 @@ #include <string> +#include "media/base/audio_parameters.h" #include "media/capture/mojom/video_capture.mojom.h" +#include "media/mojo/interfaces/audio_data_pipe.mojom.h" +#include "media/mojo/interfaces/audio_input_stream.mojom.h" +#include "media/mojo/interfaces/remoting.mojom.h" #include "net/base/ip_address.h" #include "services/network/public/mojom/network_service.mojom.h" @@ -78,15 +82,37 @@ class SessionObserver { virtual void DidStop() = 0; }; +class AudioStreamCreatorClient { + public: + virtual ~AudioStreamCreatorClient() {} + + // Called by ResourceProvider when an audio input stream is created as + // requested. + virtual void StreamCreated( + media::mojom::AudioInputStreamPtr stream, + media::mojom::AudioInputStreamClientRequest client_request, + media::mojom::ReadOnlyAudioDataPipePtr data_pipe, + bool initially_muted) = 0; +}; + class ResourceProvider { public: virtual ~ResourceProvider() {} virtual void GetVideoCaptureHost( media::mojom::VideoCaptureHostRequest request) = 0; + virtual void GetNetworkContext( network::mojom::NetworkContextRequest request) = 0; - // TODO(xjz): Add interface to get AudioCaptureHost. + + virtual void CreateAudioStream(AudioStreamCreatorClient* client, + const media::AudioParameters& params, + uint32_t total_segments) = 0; + + virtual void ConnectToRemotingSource( + media::mojom::RemoterPtr remoter, + media::mojom::RemotingSourceRequest request) = 0; + // TODO(xjz): Add interface for HW encoder profiles query and VEA create // support. }; diff --git a/chromium/components/mirroring/service/media_remoter.cc b/chromium/components/mirroring/service/media_remoter.cc new file mode 100644 index 00000000000..c1c39fd02cf --- /dev/null +++ b/chromium/components/mirroring/service/media_remoter.cc @@ -0,0 +1,195 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/mirroring/service/media_remoter.h" + +#include "base/base64.h" +#include "base/callback.h" +#include "base/json/json_writer.h" +#include "base/logging.h" +#include "base/strings/string_piece.h" +#include "base/values.h" +#include "components/mirroring/service/interface.h" +#include "components/mirroring/service/message_dispatcher.h" +#include "components/mirroring/service/remoting_sender.h" +#include "media/cast/net/cast_transport.h" + +using media::cast::Codec; +using media::cast::FrameSenderConfig; + +namespace mirroring { + +MediaRemoter::MediaRemoter( + Client* client, + const media::mojom::RemotingSinkMetadata& sink_metadata, + MessageDispatcher* message_dispatcher) + : client_(client), + sink_metadata_(sink_metadata), + message_dispatcher_(message_dispatcher), + binding_(this), + cast_environment_(nullptr), + transport_(nullptr), + state_(MIRRORING), + weak_factory_(this) { + DCHECK(client_); + DCHECK(message_dispatcher_); + + media::mojom::RemoterPtr remoter; + binding_.Bind(mojo::MakeRequest(&remoter)); + client_->ConnectToRemotingSource(std::move(remoter), + mojo::MakeRequest(&remoting_source_)); + remoting_source_->OnSinkAvailable(sink_metadata_.Clone()); +} + +MediaRemoter::~MediaRemoter() { + // Stop this remoting session if mirroring is stopped during a remoting + // session. For example, user stops mirroring through the cast dialog or + // closes the tab. + Stop(media::mojom::RemotingStopReason::ROUTE_TERMINATED); +} + +void MediaRemoter::OnMessageFromSink(const ReceiverResponse& response) { + DCHECK_EQ(ResponseType::RPC, response.type); + remoting_source_->OnMessageFromSink( + std::vector<uint8_t>(response.rpc.begin(), response.rpc.end())); +} + +void MediaRemoter::StartRpcMessaging( + scoped_refptr<media::cast::CastEnvironment> cast_environment, + media::cast::CastTransport* transport, + const FrameSenderConfig& audio_config, + const FrameSenderConfig& video_config) { + DCHECK(!cast_environment_); + DCHECK(!transport_); + DCHECK_EQ(Codec::CODEC_UNKNOWN, audio_config_.codec); + DCHECK_EQ(Codec::CODEC_UNKNOWN, video_config_.codec); + DCHECK(audio_config.codec == Codec::CODEC_AUDIO_REMOTE || + video_config.codec == Codec::CODEC_VIDEO_REMOTE); + + if (state_ != STARTING_REMOTING) + return; // Start operation was canceled. + // A remoting streaming session started. Start RPC message transport and + // notify the remoting source to start data streaming. + cast_environment_ = std::move(cast_environment); + transport_ = transport; + audio_config_ = audio_config; + video_config_ = video_config; + message_dispatcher_->Subscribe( + ResponseType::RPC, base::BindRepeating(&MediaRemoter::OnMessageFromSink, + weak_factory_.GetWeakPtr())); + state_ = REMOTING_STARTED; + remoting_source_->OnStarted(); +} + +void MediaRemoter::OnMirroringResumed() { + if (state_ == REMOTING_DISABLED) + return; + DCHECK_EQ(STOPPING_REMOTING, state_); + state_ = MIRRORING; + // Notify the remoting source to enable starting media remoting again. + remoting_source_->OnSinkAvailable(sink_metadata_.Clone()); +} + +void MediaRemoter::OnRemotingFailed() { + DCHECK(state_ == STARTING_REMOTING || state_ == REMOTING_STARTED); + if (state_ == STARTING_REMOTING) { + // TODO(xjz): Rename SERVICE_NOT_CONNECTED to INVALID_ANSWER_MESSAGE. + remoting_source_->OnStartFailed( + media::mojom::RemotingStartFailReason::SERVICE_NOT_CONNECTED); + } + state_ = REMOTING_DISABLED; + remoting_source_->OnSinkGone(); + // Fallback to mirroring. + client_->RestartMirroringStreaming(); +} + +void MediaRemoter::Stop(media::mojom::RemotingStopReason reason) { + if (state_ != STARTING_REMOTING && state_ != REMOTING_STARTED) + return; + if (state_ == REMOTING_STARTED) { + message_dispatcher_->Unsubscribe(ResponseType::RPC); + audio_sender_.reset(); + video_sender_.reset(); + cast_environment_ = nullptr; + transport_ = nullptr; + audio_config_ = FrameSenderConfig(); + video_config_ = FrameSenderConfig(); + } + state_ = STOPPING_REMOTING; + remoting_source_->OnStopped(reason); + // Prevent the start of remoting until switching completes. + remoting_source_->OnSinkGone(); + // Switch to mirroring. + client_->RestartMirroringStreaming(); +} + +void MediaRemoter::Start() { + if (state_ != MIRRORING) { + VLOG(2) << "Warning: Ignore start request. state=" << state_; + return; + } + state_ = STARTING_REMOTING; + client_->RequestRemotingStreaming(); +} + +void MediaRemoter::StartDataStreams( + mojo::ScopedDataPipeConsumerHandle audio_pipe, + mojo::ScopedDataPipeConsumerHandle video_pipe, + media::mojom::RemotingDataStreamSenderRequest audio_sender_request, + media::mojom::RemotingDataStreamSenderRequest video_sender_request) { + if (state_ != REMOTING_STARTED) + return; // Stop() was called before. + DCHECK(cast_environment_); + DCHECK(transport_); + if (audio_pipe.is_valid() && + audio_config_.codec == Codec::CODEC_AUDIO_REMOTE) { + audio_sender_ = std::make_unique<RemotingSender>( + cast_environment_, transport_, audio_config_, std::move(audio_pipe), + std::move(audio_sender_request), + base::BindOnce(&MediaRemoter::OnRemotingDataStreamError, + base::Unretained(this))); + } + if (video_pipe.is_valid() && + video_config_.codec == Codec::CODEC_VIDEO_REMOTE) { + video_sender_ = std::make_unique<RemotingSender>( + cast_environment_, transport_, video_config_, std::move(video_pipe), + std::move(video_sender_request), + base::BindOnce(&MediaRemoter::OnRemotingDataStreamError, + base::Unretained(this))); + } +} + +void MediaRemoter::SendMessageToSink(const std::vector<uint8_t>& message) { + if (state_ != REMOTING_STARTED) + return; + std::string encoded_rpc; + base::Base64Encode( + base::StringPiece(reinterpret_cast<const char*>(message.data()), + message.size()), + &encoded_rpc); + base::Value rpc(base::Value::Type::DICTIONARY); + rpc.SetKey("type", base::Value("RPC")); + rpc.SetKey("rpc", base::Value(std::move(encoded_rpc))); + CastMessage rpc_message; + rpc_message.message_namespace = kRemotingNamespace; + const bool did_serialize_rpc = + base::JSONWriter::Write(rpc, &rpc_message.json_format_data); + DCHECK(did_serialize_rpc); + message_dispatcher_->SendOutboundMessage(rpc_message); +} + +void MediaRemoter::EstimateTransmissionCapacity( + media::mojom::Remoter::EstimateTransmissionCapacityCallback callback) { + NOTIMPLEMENTED(); + std::move(callback).Run(0); +} + +void MediaRemoter::OnRemotingDataStreamError() { + if (state_ != REMOTING_STARTED) + return; + state_ = REMOTING_DISABLED; + Stop(media::mojom::RemotingStopReason::DATA_SEND_FAILED); +} + +} // namespace mirroring diff --git a/chromium/components/mirroring/service/media_remoter.h b/chromium/components/mirroring/service/media_remoter.h new file mode 100644 index 00000000000..edaf8e3cff7 --- /dev/null +++ b/chromium/components/mirroring/service/media_remoter.h @@ -0,0 +1,147 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef COMPONENTS_MIRRORING_SERVICE_MEDIA_REMOTER_H_ +#define COMPONENTS_MIRRORING_SERVICE_MEDIA_REMOTER_H_ + +#include "base/macros.h" +#include "media/cast/cast_config.h" +#include "media/mojo/interfaces/remoting.mojom.h" +#include "mojo/public/cpp/bindings/binding.h" + +namespace media { +namespace cast { +class CastEnvironment; +class CastTransport; +} // namespace cast +} // namespace media + +namespace mirroring { + +class MessageDispatcher; +struct ReceiverResponse; +class RemotingSender; + +// MediaRemoter remotes media content directly to a Cast Receiver. When +// MediaRemoter is started, it connects itself with a source tab in browser +// through the Mirroring Service mojo interface and allows the browser to access +// this MediaRemoter to start/stop individual remoting sessions, which are +// caused by user actions (i.e., when they somehow indicate a desire to +// enter/leave an immersive video-watching mode). +// +// When a remoting session is started, MediaRemoter will first request that tab +// mirroring be switched into content remoting mode. If granted, it will notify +// the browser that this has succeeded. At this point, two-way RPC binary +// messaging begins, and the MediaRemoter simply forwards messages between the +// browser and the Cast Receiver. The audio/video data streams are delivered +// from the media renderer to the Mirroring Service through mojo datapipes, and +// are then sent out to Cast Receiver through Cast Streaming. +class MediaRemoter final : public media::mojom::Remoter { + public: + class Client { + public: + virtual ~Client() {} + + // Connects the |remoter| with a source tab. + virtual void ConnectToRemotingSource( + media::mojom::RemoterPtr remoter, + media::mojom::RemotingSourceRequest source_request) = 0; + + // Requests to start remoting. StartRpcMessaging() / OnRemotingStartFailed() + // will be called when starting succeeds / fails. + virtual void RequestRemotingStreaming() = 0; + + // Requests to resume mirroring. + virtual void RestartMirroringStreaming() = 0; + }; + + MediaRemoter(Client* client, + const media::mojom::RemotingSinkMetadata& sink_metadata, + MessageDispatcher* message_dispatcher); + + ~MediaRemoter() override; + + // Callback from |message_dispatcher_| for received RPC messages. + void OnMessageFromSink(const ReceiverResponse& response); + + // Called when OFFER/ANSWER exchange for a remoting session succeeds. + void StartRpcMessaging( + scoped_refptr<media::cast::CastEnvironment> cast_environment, + media::cast::CastTransport* transport, + const media::cast::FrameSenderConfig& audio_config, + const media::cast::FrameSenderConfig& video_config); + + // Called when a mirroring session is successfully resumed. + void OnMirroringResumed(); + + // Error occurred either during the start of remoting or in the middle of + // remoting. In either case, this call fallbacks to mirroring, and prevents + // further starting of media remoting during this mirroring session. + void OnRemotingFailed(); + + // media::mojom::Remoter implememtation. Stops the current remoting session. + // This could be called either by the RemotingSource or the Session. + void Stop(media::mojom::RemotingStopReason reason) override; + + private: + // media::mojom::Remoter implememtation. + void Start() override; + void StartDataStreams( + mojo::ScopedDataPipeConsumerHandle audio_pipe, + mojo::ScopedDataPipeConsumerHandle video_pipe, + media::mojom::RemotingDataStreamSenderRequest audio_sender_request, + media::mojom::RemotingDataStreamSenderRequest video_sender_request) + override; + void SendMessageToSink(const std::vector<uint8_t>& message) override; + void EstimateTransmissionCapacity( + media::mojom::Remoter::EstimateTransmissionCapacityCallback callback) + override; + + // Called by RemotingSender when error occurred. Will stop this remoting + // session and fallback to mirroring. + void OnRemotingDataStreamError(); + + Client* const client_; // Outlives this class. + const media::mojom::RemotingSinkMetadata sink_metadata_; + MessageDispatcher* const message_dispatcher_; // Outlives this class. + mojo::Binding<media::mojom::Remoter> binding_; + media::mojom::RemotingSourcePtr remoting_source_; + scoped_refptr<media::cast::CastEnvironment> cast_environment_; + std::unique_ptr<RemotingSender> audio_sender_; + std::unique_ptr<RemotingSender> video_sender_; + media::cast::CastTransport* transport_; // Outlives this class; + media::cast::FrameSenderConfig audio_config_; + media::cast::FrameSenderConfig video_config_; + + // State transition diagram: + // + // .-----------> MIRRORING + // | | + // | V + // | STARTING_REMOTING + // | | + // | V + // | .-----------------------------. + // | | | | + // | | V V + // | | REMOTING_STARTED ----> REMOTING_DISABLED + // | | | + // | V V + // .--STOPPING_REMOTING + enum { + MIRRORING, // In mirroring. + STARTING_REMOTING, // Starting a remoting session. + REMOTING_STARTED, // Remoting started successfully. + REMOTING_DISABLED, // Remoting was disabled (because of error). + STOPPING_REMOTING, // Stopping the remoting session. + } state_; + + base::WeakPtrFactory<MediaRemoter> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(MediaRemoter); +}; + +} // namespace mirroring + +#endif // COMPONENTS_MIRRORING_SERVICE_MEDIA_REMOTER_H_ diff --git a/chromium/components/mirroring/service/media_remoter_unittest.cc b/chromium/components/mirroring/service/media_remoter_unittest.cc new file mode 100644 index 00000000000..18776e5ec77 --- /dev/null +++ b/chromium/components/mirroring/service/media_remoter_unittest.cc @@ -0,0 +1,220 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/mirroring/service/media_remoter.h" + +#include "base/macros.h" +#include "base/run_loop.h" +#include "base/test/mock_callback.h" +#include "base/test/scoped_task_environment.h" +#include "base/time/default_tick_clock.h" +#include "components/mirroring/service/message_dispatcher.h" +#include "components/mirroring/service/mirror_settings.h" +#include "media/cast/cast_environment.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using ::testing::InvokeWithoutArgs; +using ::testing::_; +using ::testing::Mock; +using media::mojom::RemotingSinkMetadata; +using media::mojom::RemotingStopReason; +using media::cast::RtpPayloadType; +using media::cast::Codec; + +namespace mirroring { + +namespace { + +class MockRemotingSource final : public media::mojom::RemotingSource { + public: + MockRemotingSource() : binding_(this) {} + ~MockRemotingSource() override {} + + void Bind(media::mojom::RemotingSourceRequest request) { + binding_.Bind(std::move(request)); + } + + MOCK_METHOD0(OnSinkGone, void()); + MOCK_METHOD0(OnStarted, void()); + MOCK_METHOD1(OnStartFailed, void(media::mojom::RemotingStartFailReason)); + MOCK_METHOD1(OnMessageFromSink, void(const std::vector<uint8_t>&)); + MOCK_METHOD1(OnStopped, void(RemotingStopReason)); + MOCK_METHOD1(OnSinkAvailable, void(const RemotingSinkMetadata&)); + void OnSinkAvailable( + media::mojom::RemotingSinkMetadataPtr metadata) override { + OnSinkAvailable(*metadata); + } + + private: + mojo::Binding<media::mojom::RemotingSource> binding_; +}; + +RemotingSinkMetadata DefaultSinkMetadata() { + RemotingSinkMetadata metadata; + metadata.features.push_back(media::mojom::RemotingSinkFeature::RENDERING); + metadata.video_capabilities.push_back( + media::mojom::RemotingSinkVideoCapability::CODEC_VP8); + metadata.audio_capabilities.push_back( + media::mojom::RemotingSinkAudioCapability::CODEC_BASELINE_SET); + metadata.friendly_name = "Test"; + return metadata; +} + +} // namespace + +class MediaRemoterTest : public CastMessageChannel, + public MediaRemoter::Client, + public ::testing::Test { + public: + MediaRemoterTest() + : message_dispatcher_(this, error_callback_.Get()), + sink_metadata_(DefaultSinkMetadata()) {} + ~MediaRemoterTest() override { scoped_task_environment_.RunUntilIdle(); } + + protected: + MOCK_METHOD1(Send, void(const CastMessage&)); + MOCK_METHOD0(OnConnectToRemotingSource, void()); + MOCK_METHOD0(RequestRemotingStreaming, void()); + MOCK_METHOD0(RestartMirroringStreaming, void()); + + // MediaRemoter::Client implementation. + void ConnectToRemotingSource( + media::mojom::RemoterPtr remoter, + media::mojom::RemotingSourceRequest source_request) override { + remoter_ = std::move(remoter); + remoting_source_.Bind(std::move(source_request)); + OnConnectToRemotingSource(); + } + + void CreateRemoter() { + EXPECT_FALSE(media_remoter_); + EXPECT_CALL(*this, OnConnectToRemotingSource()).Times(1); + EXPECT_CALL(remoting_source_, OnSinkAvailable(_)).Times(1); + media_remoter_ = std::make_unique<MediaRemoter>(this, sink_metadata_, + &message_dispatcher_); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); + Mock::VerifyAndClear(&remoting_source_); + } + + // Requests to start a remoting session. + void StartRemoting() { + ASSERT_TRUE(remoter_); + EXPECT_CALL(*this, RequestRemotingStreaming()).Times(1); + remoter_->Start(); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); + } + + // Stops the current remoting session. + void StopRemoting() { + ASSERT_TRUE(remoter_); + EXPECT_CALL(remoting_source_, OnStopped(RemotingStopReason::USER_DISABLED)) + .Times(1); + EXPECT_CALL(remoting_source_, OnSinkGone()).Times(1); + EXPECT_CALL(*this, RestartMirroringStreaming()).Times(1); + remoter_->Stop(media::mojom::RemotingStopReason::USER_DISABLED); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); + Mock::VerifyAndClear(&remoting_source_); + } + + // Signals that a remoting streaming session starts successfully. + void RemotingStreamingStarted() { + ASSERT_TRUE(media_remoter_); + scoped_refptr<media::cast::CastEnvironment> cast_environment = + new media::cast::CastEnvironment( + base::DefaultTickClock::GetInstance(), + scoped_task_environment_.GetMainThreadTaskRunner(), + scoped_task_environment_.GetMainThreadTaskRunner(), + scoped_task_environment_.GetMainThreadTaskRunner()); + EXPECT_CALL(remoting_source_, OnStarted()).Times(1); + media_remoter_->StartRpcMessaging( + cast_environment, nullptr, media::cast::FrameSenderConfig(), + MirrorSettings::GetDefaultVideoConfig(RtpPayloadType::REMOTE_VIDEO, + Codec::CODEC_VIDEO_REMOTE)); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(&remoting_source_); + } + + // Signals that mirroring is resumed successfully. + void MirroringResumed() { + EXPECT_CALL(remoting_source_, OnSinkAvailable(_)).Times(1); + media_remoter_->OnMirroringResumed(); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(&remoting_source_); + } + + // Signals that remoting session failed to start. + void RemotingStartFailed() { + ASSERT_TRUE(media_remoter_); + EXPECT_CALL(remoting_source_, OnStartFailed(_)).Times(1); + EXPECT_CALL(remoting_source_, OnSinkGone()).Times(1); + EXPECT_CALL(*this, RestartMirroringStreaming()).Times(1); + media_remoter_->OnRemotingFailed(); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); + Mock::VerifyAndClear(&remoting_source_); + } + + private: + base::test::ScopedTaskEnvironment scoped_task_environment_; + base::MockCallback<MessageDispatcher::ErrorCallback> error_callback_; + MessageDispatcher message_dispatcher_; + const media::mojom::RemotingSinkMetadata sink_metadata_; + MockRemotingSource remoting_source_; + media::mojom::RemoterPtr remoter_; + std::unique_ptr<MediaRemoter> media_remoter_; + + DISALLOW_COPY_AND_ASSIGN(MediaRemoterTest); +}; + +TEST_F(MediaRemoterTest, StartAndStopRemoting) { + CreateRemoter(); + StartRemoting(); + RemotingStreamingStarted(); + StopRemoting(); +} + +TEST_F(MediaRemoterTest, StopRemotingWhileStarting) { + CreateRemoter(); + // Starts a remoting session. + StartRemoting(); + // Immediately stops the remoting session while not started yet. + StopRemoting(); + + // Signals that successfully switch to mirroring. + MirroringResumed(); + // Now remoting can be started again. + StartRemoting(); +} + +TEST_F(MediaRemoterTest, RemotingStartFailed) { + CreateRemoter(); + StartRemoting(); + RemotingStartFailed(); +} + +TEST_F(MediaRemoterTest, SwitchBetweenMultipleSessions) { + CreateRemoter(); + + // Start a remoting session. + StartRemoting(); + RemotingStreamingStarted(); + + // Stop the remoting session and switch to mirroring. + StopRemoting(); + MirroringResumed(); + + // Switch to remoting again. + StartRemoting(); + RemotingStreamingStarted(); + + // Switch to mirroring again. + StopRemoting(); + MirroringResumed(); +} + +} // namespace mirroring diff --git a/chromium/components/mirroring/service/message_dispatcher.cc b/chromium/components/mirroring/service/message_dispatcher.cc index aa5ad4a4069..95cf380aa2e 100644 --- a/chromium/components/mirroring/service/message_dispatcher.cc +++ b/chromium/components/mirroring/service/message_dispatcher.cc @@ -17,10 +17,7 @@ class MessageDispatcher::RequestHolder { public: RequestHolder() {} - ~RequestHolder() { - if (!response_callback_.is_null()) - std::move(response_callback_).Run(ReceiverResponse()); - } + ~RequestHolder() {} void Start(const base::TimeDelta& timeout, int32_t sequence_number, @@ -134,6 +131,8 @@ void MessageDispatcher::RequestReply(const CastMessage& message, OnceResponseCallback callback) { DCHECK(!callback.is_null()); DCHECK(timeout > base::TimeDelta()); + + Unsubscribe(response_type); // Cancel the old request if there is any. RequestHolder* const request_holder = new RequestHolder(); request_holder->Start( timeout, sequence_number, diff --git a/chromium/components/mirroring/service/message_dispatcher.h b/chromium/components/mirroring/service/message_dispatcher.h index 7b2ee9bfb62..7fbd3c5ceb9 100644 --- a/chromium/components/mirroring/service/message_dispatcher.h +++ b/chromium/components/mirroring/service/message_dispatcher.h @@ -38,6 +38,8 @@ class MessageDispatcher final : public CastMessageChannel { // delivered to the supplied callback if the sequence number of the response // matches |sequence_number|. If the timeout period elapses, the callback will // be run once with an unknown type of |response|. + // Note: Calling RequestReply() before a previous reply was made will cancel + // the previous request and not run its response callback. void RequestReply(const CastMessage& message, ResponseType response_type, int32_t sequence_number, diff --git a/chromium/components/mirroring/service/message_dispatcher_unittest.cc b/chromium/components/mirroring/service/message_dispatcher_unittest.cc index 32cb7328ecc..184227def27 100644 --- a/chromium/components/mirroring/service/message_dispatcher_unittest.cc +++ b/chromium/components/mirroring/service/message_dispatcher_unittest.cc @@ -286,14 +286,11 @@ TEST_F(MessageDispatcherTest, RequestReply) { EXPECT_FALSE(last_answer_response_); EXPECT_FALSE(last_status_response_); - // Destroy the dispatcher. Expect to receive an unknown type response. + // Destroy the dispatcher. message_dispatcher_.reset(); scoped_task_environment_.RunUntilIdle(); - ASSERT_TRUE(last_answer_response_); + ASSERT_FALSE(last_answer_response_); EXPECT_FALSE(last_status_response_); - EXPECT_TRUE(last_error_message_.empty()); - EXPECT_EQ(ResponseType::UNKNOWN, last_answer_response_->type); - EXPECT_EQ(-1, last_answer_response_->sequence_number); } } // namespace mirroring diff --git a/chromium/components/mirroring/service/mirror_settings.cc b/chromium/components/mirroring/service/mirror_settings.cc index a14799757a4..3c51c1e6532 100644 --- a/chromium/components/mirroring/service/mirror_settings.cc +++ b/chromium/components/mirroring/service/mirror_settings.cc @@ -6,6 +6,8 @@ #include <algorithm> +#include "media/base/audio_parameters.h" + using media::cast::FrameSenderConfig; using media::cast::Codec; using media::cast::RtpPayloadType; @@ -118,6 +120,14 @@ media::VideoCaptureParams MirrorSettings::GetVideoCaptureParams() { return params; } +media::AudioParameters MirrorSettings::GetAudioCaptureParams() { + media::AudioParameters params(media::AudioParameters::AUDIO_PCM_LOW_LATENCY, + media::CHANNEL_LAYOUT_STEREO, kAudioTimebase, + kAudioTimebase / 100); + DCHECK(params.IsValid()); + return params; +} + base::Value MirrorSettings::ToDictionaryValue() { base::Value settings(base::Value::Type::DICTIONARY); settings.SetKey("maxWidth", base::Value(max_width_)); diff --git a/chromium/components/mirroring/service/mirror_settings.h b/chromium/components/mirroring/service/mirror_settings.h index 937602d3e36..ac2ee3c528d 100644 --- a/chromium/components/mirroring/service/mirror_settings.h +++ b/chromium/components/mirroring/service/mirror_settings.h @@ -10,6 +10,10 @@ #include "media/capture/video_capture_types.h" #include "media/cast/cast_config.h" +namespace media { +class AudioParameters; +} // namespace media + namespace mirroring { // Holds the default settings for a mirroring session. This class provides the @@ -37,6 +41,9 @@ class MirrorSettings { // Get video capture constraints with the current settings. media::VideoCaptureParams GetVideoCaptureParams(); + // Get Audio capture constraints with the current settings. + media::AudioParameters GetAudioCaptureParams(); + int max_width() const { return max_width_; } int max_height() const { return max_height_; } diff --git a/chromium/components/mirroring/service/remoting_sender.cc b/chromium/components/mirroring/service/remoting_sender.cc new file mode 100644 index 00000000000..11b21e4ec26 --- /dev/null +++ b/chromium/components/mirroring/service/remoting_sender.cc @@ -0,0 +1,216 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/mirroring/service/remoting_sender.h" + +#include <algorithm> + +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/logging.h" +#include "base/time/default_tick_clock.h" +#include "media/cast/constants.h" +#include "media/cast/sender/sender_encoded_frame.h" +#include "media/mojo/common/mojo_data_pipe_read_write.h" + +namespace mirroring { + +RemotingSender::RemotingSender( + scoped_refptr<media::cast::CastEnvironment> cast_environment, + media::cast::CastTransport* transport, + const media::cast::FrameSenderConfig& config, + mojo::ScopedDataPipeConsumerHandle pipe, + media::mojom::RemotingDataStreamSenderRequest request, + base::OnceClosure error_callback) + : FrameSender(cast_environment, + transport, + config, + media::cast::NewFixedCongestionControl(config.max_bitrate)), + clock_(cast_environment->Clock()), + error_callback_(std::move(error_callback)), + data_pipe_reader_(new media::MojoDataPipeReader(std::move(pipe))), + binding_(this, std::move(request)), + input_queue_discards_remaining_(0), + is_reading_(false), + flow_restart_pending_(true), + weak_factory_(this) { + binding_.set_connection_error_handler(base::BindOnce( + &RemotingSender::OnRemotingDataStreamError, base::Unretained(this))); +} + +RemotingSender::~RemotingSender() {} + +void RemotingSender::SendFrame(uint32_t frame_size) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + const bool need_to_start_processing = input_queue_.empty(); + input_queue_.push(base::BindRepeating(&RemotingSender::ReadFrame, + base::Unretained(this), frame_size)); + input_queue_.push(base::BindRepeating(&RemotingSender::TrySendFrame, + base::Unretained(this))); + if (need_to_start_processing) + ProcessNextInputTask(); +} + +void RemotingSender::CancelInFlightData() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + +// TODO(miu): The following code is something we want to do as an +// optimization. However, as-is, it's not quite correct. We can only cancel +// frames where no packets have actually hit the network yet. Said another +// way, we can only cancel frames the receiver has definitely not seen any +// part of (including kickstarting!). http://crbug.com/647423 +#if 0 + if (latest_acked_frame_id_ < last_sent_frame_id_) { + std::vector<media::cast::FrameId> frames_to_cancel; + do { + ++latest_acked_frame_id_; + frames_to_cancel.push_back(latest_acked_frame_id_); + } while (latest_acked_frame_id_ < last_sent_frame_id_); + transport_->CancelSendingFrames(ssrc_, frames_to_cancel); + } +#endif + + // Flag that all pending input operations should discard data. + input_queue_discards_remaining_ = input_queue_.size(); + + flow_restart_pending_ = true; + VLOG(1) << "Now restarting because in-flight data was just canceled."; +} + +int RemotingSender::GetNumberOfFramesInEncoder() const { + NOTREACHED(); + return 0; +} + +base::TimeDelta RemotingSender::GetInFlightMediaDuration() const { + NOTREACHED(); + return base::TimeDelta(); +} + +void RemotingSender::OnCancelSendingFrames() { + // One or more frames were canceled. This may allow pending input operations + // to complete. + ProcessNextInputTask(); +} + +void RemotingSender::ProcessNextInputTask() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + if (input_queue_.empty() || is_reading_) + return; + + input_queue_.front().Run(); +} + +void RemotingSender::ReadFrame(uint32_t size) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(!is_reading_); + if (!data_pipe_reader_->IsPipeValid()) { + VLOG(1) << "Data pipe handle no longer valid."; + OnRemotingDataStreamError(); + return; + } + + is_reading_ = true; + if (input_queue_discards_remaining_ > 0) { + data_pipe_reader_->Read( + nullptr, size, + base::BindOnce(&RemotingSender::OnFrameRead, base::Unretained(this))); + } else { + next_frame_data_.resize(size); + data_pipe_reader_->Read( + reinterpret_cast<uint8_t*>(base::data(next_frame_data_)), size, + base::BindOnce(&RemotingSender::OnFrameRead, base::Unretained(this))); + } +} + +void RemotingSender::TrySendFrame() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(!is_reading_); + if (input_queue_discards_remaining_ > 0) { + OnInputTaskComplete(); + return; + } + + // If there would be too many frames in-flight, do not proceed. + if (GetUnacknowledgedFrameCount() >= media::cast::kMaxUnackedFrames) { + VLOG(1) << "Cannot send frame now because too many frames are in flight."; + return; + } + + const bool is_first_frame_to_be_sent = last_send_time_.is_null(); + const media::cast::FrameId frame_id = is_first_frame_to_be_sent + ? media::cast::FrameId::first() + : (last_sent_frame_id_ + 1); + + base::TimeTicks last_frame_reference_time = last_send_time_; + auto remoting_frame = std::make_unique<media::cast::SenderEncodedFrame>(); + remoting_frame->frame_id = frame_id; + if (flow_restart_pending_) { + remoting_frame->dependency = media::cast::EncodedFrame::KEY; + flow_restart_pending_ = false; + } else { + DCHECK(!is_first_frame_to_be_sent); + remoting_frame->dependency = media::cast::EncodedFrame::DEPENDENT; + } + remoting_frame->referenced_frame_id = + remoting_frame->dependency == media::cast::EncodedFrame::KEY + ? frame_id + : frame_id - 1; + remoting_frame->reference_time = clock_->NowTicks(); + remoting_frame->encode_completion_time = remoting_frame->reference_time; + media::cast::RtpTimeTicks last_frame_rtp_timestamp; + if (is_first_frame_to_be_sent) { + last_frame_reference_time = remoting_frame->reference_time; + last_frame_rtp_timestamp = + media::cast::RtpTimeTicks() - media::cast::RtpTimeDelta::FromTicks(1); + } else { + last_frame_rtp_timestamp = GetRecordedRtpTimestamp(frame_id - 1); + } + // Ensure each successive frame's RTP timestamp is unique, but otherwise just + // base it on the reference time. + remoting_frame->rtp_timestamp = + last_frame_rtp_timestamp + + std::max(media::cast::RtpTimeDelta::FromTicks(1), + media::cast::RtpTimeDelta::FromTimeDelta( + remoting_frame->reference_time - last_frame_reference_time, + media::cast::kRemotingRtpTimebase)); + remoting_frame->data.swap(next_frame_data_); + + SendEncodedFrame(0, std::move(remoting_frame)); + + OnInputTaskComplete(); +} + +void RemotingSender::OnFrameRead(bool success) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(is_reading_); + is_reading_ = false; + if (!success) { + OnRemotingDataStreamError(); + return; + } + OnInputTaskComplete(); +} + +void RemotingSender::OnInputTaskComplete() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(!input_queue_.empty()); + input_queue_.pop(); + if (input_queue_discards_remaining_ > 0) + --input_queue_discards_remaining_; + + // Always force a post task to prevent the stack from growing too deep. + base::ThreadTaskRunnerHandle::Get()->PostTask( + FROM_HERE, base::BindOnce(&RemotingSender::ProcessNextInputTask, + weak_factory_.GetWeakPtr())); +} + +void RemotingSender::OnRemotingDataStreamError() { + data_pipe_reader_.reset(); + binding_.Close(); + if (!error_callback_.is_null()) + std::move(error_callback_).Run(); +} + +} // namespace mirroring diff --git a/chromium/components/mirroring/service/remoting_sender.h b/chromium/components/mirroring/service/remoting_sender.h new file mode 100644 index 00000000000..d36d005ab56 --- /dev/null +++ b/chromium/components/mirroring/service/remoting_sender.h @@ -0,0 +1,120 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef COMPONENTS_MIRRORING_SERVICE_REMOTING_SENDER_H_ +#define COMPONENTS_MIRRORING_SERVICE_REMOTING_SENDER_H_ + +#include <memory> + +#include "base/callback.h" +#include "base/containers/queue.h" +#include "base/macros.h" +#include "base/memory/weak_ptr.h" +#include "base/sequence_checker.h" +#include "media/cast/sender/frame_sender.h" +#include "media/mojo/interfaces/remoting.mojom.h" +#include "mojo/public/cpp/bindings/binding.h" + +namespace base { +class TickClock; +} // namespace base + +namespace media { +class MojoDataPipeReader; +} // namespace media + +namespace mirroring { + +// RTP sender for a single Cast Remoting RTP stream. The client calls Send() to +// instruct the sender to read from a Mojo data pipe and transmit the data using +// a CastTransport. +class RemotingSender final : public media::mojom::RemotingDataStreamSender, + public media::cast::FrameSender { + public: + // |transport| is expected to outlive this class. + RemotingSender(scoped_refptr<media::cast::CastEnvironment> cast_environment, + media::cast::CastTransport* transport, + const media::cast::FrameSenderConfig& config, + mojo::ScopedDataPipeConsumerHandle pipe, + media::mojom::RemotingDataStreamSenderRequest request, + base::OnceClosure error_callback); + ~RemotingSender() override; + + private: + // Friend class for unit tests. + friend class RemotingSenderTest; + + // media::mojom::RemotingDataStreamSender implementation. SendFrame() will + // push callbacks onto the back of the input queue, and these may or may not + // be processed at a later time. It depends on whether the data pipe has data + // available or the CastTransport can accept more frames. CancelInFlightData() + // is processed immediately, and will cause all pending operations to discard + // data when they are processed later. + void SendFrame(uint32_t frame_size) override; + void CancelInFlightData() override; + + // FrameSender override. + int GetNumberOfFramesInEncoder() const override; + base::TimeDelta GetInFlightMediaDuration() const override; + void OnCancelSendingFrames() override; + + // Attempt to run next pending input task, popping the head of the input queue + // as each task succeeds. + void ProcessNextInputTask(); + + // These are called via callbacks run from the input queue. + // Consumes a frame of |size| from the associated Mojo data pipe. + void ReadFrame(uint32_t size); + // Sends out the frame to the receiver over network. + void TrySendFrame(); + + // Called when a frame is completely read/discarded from the data pipe. + void OnFrameRead(bool success); + + // Called when an input task completes. + void OnInputTaskComplete(); + + void OnRemotingDataStreamError(); + + SEQUENCE_CHECKER(sequence_checker_); + + const base::TickClock* clock_; + + // Callback that is run to notify when a fatal error occurs. + base::OnceClosure error_callback_; + + std::unique_ptr<media::MojoDataPipeReader> data_pipe_reader_; + + // Mojo binding for this instance. Implementation at the other end of the + // message pipe uses the RemotingDataStreamSender interface to control when + // this RemotingSender consumes from |pipe_|. + mojo::Binding<media::mojom::RemotingDataStreamSender> binding_; + + // The next frame's payload data. Populated by call to OnFrameRead() when + // reading succeeded. + std::string next_frame_data_; + + // Queue of pending input operations. |input_queue_discards_remaining_| + // indicates the number of operations where data should be discarded (due to + // CancelInFlightData()). + base::queue<base::RepeatingClosure> input_queue_; + size_t input_queue_discards_remaining_; + + // Indicates whether the |data_pipe_reader_| is processing a reading request. + bool is_reading_; + + // Set to true if the first frame has not yet been sent, or if a + // CancelInFlightData() operation just completed. This causes TrySendFrame() + // to mark the next frame as the start of a new sequence. + bool flow_restart_pending_; + + // NOTE: Weak pointers must be invalidated before all other member variables. + base::WeakPtrFactory<RemotingSender> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(RemotingSender); +}; + +} // namespace mirroring + +#endif // COMPONENTS_MIRRORING_SERVICE_REMOTING_SENDER_H_ diff --git a/chromium/components/mirroring/service/remoting_sender_unittest.cc b/chromium/components/mirroring/service/remoting_sender_unittest.cc new file mode 100644 index 00000000000..0c61f9d9a10 --- /dev/null +++ b/chromium/components/mirroring/service/remoting_sender_unittest.cc @@ -0,0 +1,615 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/mirroring/service/remoting_sender.h" + +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/callback_helpers.h" +#include "base/compiler_specific.h" +#include "base/macros.h" +#include "base/run_loop.h" +#include "base/test/scoped_task_environment.h" +#include "base/time/default_tick_clock.h" +#include "media/cast/constants.h" +#include "media/cast/net/cast_transport.h" +#include "media/cast/test/utility/default_config.h" +#include "media/mojo/interfaces/remoting.mojom.h" +#include "mojo/public/cpp/system/data_pipe.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mirroring { + +namespace { + +// Data pipe capacity is 1KB. +constexpr int kDataPipeCapacity = 1024; + +// Implements the CastTransport interface to capture output from the +// RemotingSender. +class FakeTransport : public media::cast::CastTransport { + public: + FakeTransport() {} + ~FakeTransport() final {} + + void TakeSentFrames(std::vector<media::cast::EncodedFrame>* frames) { + frames->swap(sent_frames_); + sent_frames_.clear(); + } + + void TakeCanceledFrameIds(std::vector<media::cast::FrameId>* frame_ids) { + frame_ids->swap(canceled_frame_ids_); + canceled_frame_ids_.clear(); + } + + media::cast::FrameId WaitForKickstart() { + base::RunLoop run_loop; + kickstarted_callback_ = run_loop.QuitClosure(); + run_loop.Run(); + return kickstarted_frame_id_; + } + + protected: + void InsertFrame(uint32_t ssrc, + const media::cast::EncodedFrame& frame) final { + sent_frames_.push_back(frame); + } + + void CancelSendingFrames( + uint32_t ssrc, + const std::vector<media::cast::FrameId>& frame_ids) final { + for (media::cast::FrameId frame_id : frame_ids) + canceled_frame_ids_.push_back(frame_id); + } + + void ResendFrameForKickstart(uint32_t ssrc, + media::cast::FrameId frame_id) final { + kickstarted_frame_id_ = frame_id; + if (!kickstarted_callback_.is_null()) + base::ResetAndReturn(&kickstarted_callback_).Run(); + } + + // The rest of the interface is not used for these tests. + void SendSenderReport( + uint32_t ssrc, + base::TimeTicks current_time, + media::cast::RtpTimeTicks current_time_as_rtp_timestamp) final {} + void AddValidRtpReceiver(uint32_t rtp_sender_ssrc, + uint32_t rtp_receiver_ssrc) final {} + void InitializeRtpReceiverRtcpBuilder( + uint32_t rtp_receiver_ssrc, + const media::cast::RtcpTimeData& time_data) final {} + void AddCastFeedback(const media::cast::RtcpCastMessage& cast_message, + base::TimeDelta target_delay) final {} + void AddPli(const media::cast::RtcpPliMessage& pli_message) final {} + void AddRtcpEvents( + const media::cast::ReceiverRtcpEventSubscriber::RtcpEvents& e) final {} + void AddRtpReceiverReport(const media::cast::RtcpReportBlock& b) final {} + void SendRtcpFromRtpReceiver() final {} + void SetOptions(const base::DictionaryValue& options) final {} + + private: + std::vector<media::cast::EncodedFrame> sent_frames_; + std::vector<media::cast::FrameId> canceled_frame_ids_; + + base::RepeatingClosure kickstarted_callback_; + media::cast::FrameId kickstarted_frame_id_; + + DISALLOW_COPY_AND_ASSIGN(FakeTransport); +}; + +} // namespace + +class RemotingSenderTest : public ::testing::Test { + protected: + RemotingSenderTest() + : cast_environment_(new media::cast::CastEnvironment( + base::DefaultTickClock::GetInstance(), + scoped_task_environment_.GetMainThreadTaskRunner(), + scoped_task_environment_.GetMainThreadTaskRunner(), + scoped_task_environment_.GetMainThreadTaskRunner())), + expecting_error_callback_run_(false), + receiver_ssrc_(-1) { + const MojoCreateDataPipeOptions data_pipe_options{ + sizeof(MojoCreateDataPipeOptions), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, + kDataPipeCapacity}; + mojo::ScopedDataPipeConsumerHandle consumer_end; + CHECK_EQ(MOJO_RESULT_OK, + mojo::CreateDataPipe(&data_pipe_options, &producer_end_, + &consumer_end)); + + media::cast::FrameSenderConfig video_config = + media::cast::GetDefaultVideoSenderConfig(); + video_config.rtp_payload_type = media::cast::RtpPayloadType::REMOTE_VIDEO; + video_config.codec = media::cast::CODEC_VIDEO_REMOTE; + receiver_ssrc_ = video_config.receiver_ssrc; + remoting_sender_ = std::make_unique<RemotingSender>( + cast_environment_, &transport_, video_config, std::move(consumer_end), + mojo::MakeRequest(&sender_), + base::BindOnce( + [](bool expecting_error_callback_run) { + CHECK(expecting_error_callback_run); + }, + expecting_error_callback_run_)); + + // Give CastRemotingSender a small RTT measurement to prevent kickstart + // testing from taking too long. + remoting_sender_->OnMeasuredRoundTripTime( + base::TimeDelta::FromMilliseconds(1)); + RunPendingTasks(); + } + + ~RemotingSenderTest() override {} + + void TearDown() final { + remoting_sender_.reset(); + // Allow any pending tasks to run before destruction. + RunPendingTasks(); + } + + // Allow pending tasks, such as Mojo method calls, to execute. + void RunPendingTasks() { scoped_task_environment_.RunUntilIdle(); } + + protected: + media::cast::FrameId latest_acked_frame_id() const { + return remoting_sender_->latest_acked_frame_id_; + } + + int NumberOfFramesInFlight() { + return remoting_sender_->GetUnacknowledgedFrameCount(); + } + + size_t GetSizeOfNextFrameData() { + return remoting_sender_->next_frame_data_.size(); + } + + bool IsFlowRestartPending() const { + return remoting_sender_->flow_restart_pending_; + } + + bool ProduceDataChunk(size_t offset, size_t size) WARN_UNUSED_RESULT { + std::vector<uint8_t> fake_chunk(size); + for (size_t i = 0; i < size; ++i) + fake_chunk[i] = static_cast<uint8_t>(offset + i); + uint32_t num_bytes = fake_chunk.size(); + return producer_end_->WriteData(fake_chunk.data(), &num_bytes, + MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) == + MOJO_RESULT_OK; + } + + void PostMojoCallTask_SendFrame(uint32_t size) { sender_->SendFrame(size); } + + void PostMojoCallTask_CancelInFlightData() { sender_->CancelInFlightData(); } + + void TakeSentFrames(std::vector<media::cast::EncodedFrame>* frames) { + transport_.TakeSentFrames(frames); + } + + bool ExpectOneFrameWasSent(size_t expected_payload_size) { + std::vector<media::cast::EncodedFrame> frames; + transport_.TakeSentFrames(&frames); + EXPECT_EQ(1u, frames.size()); + if (frames.empty()) + return false; + return ExpectCorrectFrameData(expected_payload_size, frames.front()); + } + + void AckUpToAndIncluding(media::cast::FrameId frame_id) { + media::cast::RtcpCastMessage cast_feedback(receiver_ssrc_); + cast_feedback.ack_frame_id = frame_id; + remoting_sender_->OnReceivedCastFeedback(cast_feedback); + } + + void AckOldestInFlightFrames(int count) { + AckUpToAndIncluding(latest_acked_frame_id() + count); + } + + // Blocks the caller indefinitely, until a kickstart frame is sent, and then + // returns the FrameId of the kickstarted-frame. + media::cast::FrameId WaitForKickstart() { + return transport_.WaitForKickstart(); + } + + bool ExpectNoFramesCanceled() { + std::vector<media::cast::FrameId> frame_ids; + transport_.TakeCanceledFrameIds(&frame_ids); + return frame_ids.empty(); + } + + bool ExpectFramesCanceled(media::cast::FrameId first_frame_id, + media::cast::FrameId last_frame_id) { + std::vector<media::cast::FrameId> frame_ids; + transport_.TakeCanceledFrameIds(&frame_ids); + auto begin = frame_ids.begin(); + auto end = frame_ids.end(); + for (auto fid = first_frame_id; fid <= last_frame_id; ++fid) { + auto new_end = std::remove(begin, end, fid); + if (new_end == end) + return false; + end = new_end; + } + return begin == end; + } + + static bool ExpectCorrectFrameData(size_t expected_payload_size, + const media::cast::EncodedFrame& frame) { + if (expected_payload_size != frame.data.size()) { + ADD_FAILURE() << "Expected frame data size != frame.data.size(): " + << expected_payload_size << " vs " << frame.data.size(); + return false; + } + for (size_t i = 0; i < expected_payload_size; ++i) { + if (static_cast<uint8_t>(frame.data[i]) != static_cast<uint8_t>(i)) { + ADD_FAILURE() << "Frame data byte mismatch at offset " << i; + return false; + } + } + return true; + } + + private: + base::test::ScopedTaskEnvironment scoped_task_environment_; + const scoped_refptr<media::cast::CastEnvironment> cast_environment_; + FakeTransport transport_; + std::unique_ptr<RemotingSender> remoting_sender_; + media::mojom::RemotingDataStreamSenderPtr sender_; + mojo::ScopedDataPipeProducerHandle producer_end_; + bool expecting_error_callback_run_; + uint32_t receiver_ssrc_; + + DISALLOW_COPY_AND_ASSIGN(RemotingSenderTest); +}; + +TEST_F(RemotingSenderTest, SendsFramesViaMojoInterface) { + // One 256-byte chunk pushed through the data pipe to make one frame. + ASSERT_TRUE(ProduceDataChunk(0, 256)); + PostMojoCallTask_SendFrame(256); + RunPendingTasks(); + EXPECT_TRUE(ExpectOneFrameWasSent(256)); + AckOldestInFlightFrames(1); + EXPECT_EQ(media::cast::FrameId::first(), latest_acked_frame_id()); + + // Four 256-byte chunks pushed through the data pipe to make one frame. + PostMojoCallTask_SendFrame(1024); + for (int i = 0; i < 4; ++i) { + ASSERT_TRUE(ProduceDataChunk(i * 256, 256)); + } + RunPendingTasks(); + EXPECT_TRUE(ExpectOneFrameWasSent(1024)); + AckOldestInFlightFrames(1); + EXPECT_EQ(media::cast::FrameId::first() + 1, latest_acked_frame_id()); + + // 10 differently-sized chunks pushed through the data pipe to make one frame + // that is larger than the data pipe's total capacity. + PostMojoCallTask_SendFrame(6665); + size_t offset = 0; + for (int i = 0; i < 10; ++i) { + const size_t chunk_size = 500 + i * 37; + ASSERT_TRUE(ProduceDataChunk(offset, chunk_size)); + RunPendingTasks(); + offset += chunk_size; + } + RunPendingTasks(); + EXPECT_TRUE(ExpectOneFrameWasSent(6665)); + AckOldestInFlightFrames(1); + EXPECT_EQ(media::cast::FrameId::first() + 2, latest_acked_frame_id()); +} + +TEST_F(RemotingSenderTest, SendsMultipleFramesWithDelayedAcks) { + // Send 4 frames. + for (int i = 0; i < 4; ++i) { + ASSERT_TRUE(ProduceDataChunk(0, 16)); + PostMojoCallTask_SendFrame(16); + } + RunPendingTasks(); + EXPECT_EQ(4, NumberOfFramesInFlight()); + EXPECT_TRUE(ExpectNoFramesCanceled()); + + // Ack one frame. + AckOldestInFlightFrames(1); + EXPECT_EQ(3, NumberOfFramesInFlight()); + EXPECT_TRUE(ExpectFramesCanceled(media::cast::FrameId::first(), + media::cast::FrameId::first())); + + // Ack all. + AckOldestInFlightFrames(3); + EXPECT_EQ(0, NumberOfFramesInFlight()); + EXPECT_TRUE(ExpectFramesCanceled(media::cast::FrameId::first() + 1, + media::cast::FrameId::first() + 3)); +} + +TEST_F(RemotingSenderTest, KickstartsIfAckNotTimely) { + // Send first frame and don't Ack it. Expect the first frame to be + // kickstarted. + ASSERT_TRUE(ProduceDataChunk(0, 16)); + PostMojoCallTask_SendFrame(16); + EXPECT_EQ(media::cast::FrameId::first(), WaitForKickstart()); + EXPECT_EQ(1, NumberOfFramesInFlight()); + + // Send 3 more frames and don't Ack them either. Expect the 4th frame to be + // kickstarted. + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(ProduceDataChunk(0, 16)); + PostMojoCallTask_SendFrame(16); + } + EXPECT_EQ(media::cast::FrameId::first() + 3, WaitForKickstart()); + EXPECT_EQ(4, NumberOfFramesInFlight()); + + // Ack the first two frames and wait for another kickstart (for the 4th frame + // again). + AckOldestInFlightFrames(2); + EXPECT_EQ(2, NumberOfFramesInFlight()); + EXPECT_EQ(media::cast::FrameId::first() + 3, WaitForKickstart()); +} + +TEST_F(RemotingSenderTest, CancelsUnsentFrame) { + PostMojoCallTask_SendFrame(16); + PostMojoCallTask_SendFrame(32); + PostMojoCallTask_CancelInFlightData(); + EXPECT_EQ(0u, GetSizeOfNextFrameData()); + ASSERT_TRUE(ProduceDataChunk(0, 16)); + RunPendingTasks(); + // The first frame's data should have been read from the data pipe. + EXPECT_EQ(16u, GetSizeOfNextFrameData()); + EXPECT_EQ(0, NumberOfFramesInFlight()); + ASSERT_TRUE(ProduceDataChunk(0, 32)); + RunPendingTasks(); + // The |next_frame_data_| was not updated because the second frame data was + // discarded from the data pipe. + EXPECT_EQ(16u, GetSizeOfNextFrameData()); + EXPECT_EQ(0, NumberOfFramesInFlight()); + + // Since no frames were sent, none should have been passed to the + // CastTransport, and none should have been canceled. + std::vector<media::cast::EncodedFrame> frames; + TakeSentFrames(&frames); + EXPECT_TRUE(frames.empty()); + EXPECT_TRUE(ExpectNoFramesCanceled()); +} + +// http://crbug.com/647423 +#define MAYBE_CancelsFramesInFlight DISABLED_CancelsFramesInFlight +TEST_F(RemotingSenderTest, MAYBE_CancelsFramesInFlight) { + EXPECT_TRUE(IsFlowRestartPending()); + + // Send 10 frames. + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(ProduceDataChunk(0, 16)); + PostMojoCallTask_SendFrame(16); + } + RunPendingTasks(); + EXPECT_FALSE(IsFlowRestartPending()); + EXPECT_EQ(10, NumberOfFramesInFlight()); + + // Ack the first frame. + AckOldestInFlightFrames(1); + EXPECT_FALSE(IsFlowRestartPending()); + EXPECT_EQ(9, NumberOfFramesInFlight()); + EXPECT_TRUE(ExpectFramesCanceled(media::cast::FrameId::first(), + media::cast::FrameId::first())); + + // Cancel all in-flight data. This should cause the remaining 9 frames to be + // canceled. + PostMojoCallTask_CancelInFlightData(); + RunPendingTasks(); + EXPECT_TRUE(IsFlowRestartPending()); + EXPECT_EQ(0, NumberOfFramesInFlight()); + EXPECT_TRUE(ExpectFramesCanceled(media::cast::FrameId::first() + 1, + media::cast::FrameId::first() + 9)); + + // Send one more frame and ack it. + ASSERT_TRUE(ProduceDataChunk(0, 16)); + PostMojoCallTask_SendFrame(16); + RunPendingTasks(); + EXPECT_FALSE(IsFlowRestartPending()); + EXPECT_EQ(1, NumberOfFramesInFlight()); + AckOldestInFlightFrames(1); + EXPECT_EQ(0, NumberOfFramesInFlight()); + + // Check that the dependency metadata was set correctly to indicate a frame + // that immediately follows a CancelInFlightData() operation. + std::vector<media::cast::EncodedFrame> frames; + TakeSentFrames(&frames); + ASSERT_EQ(11u, frames.size()); + for (size_t i = 0; i < 11; ++i) { + const media::cast::EncodedFrame& frame = frames[i]; + EXPECT_EQ(media::cast::FrameId::first() + i, frame.frame_id); + if (i == 0 || i == 10) + EXPECT_EQ(media::cast::EncodedFrame::KEY, frame.dependency); + else + EXPECT_EQ(media::cast::EncodedFrame::DEPENDENT, frame.dependency); + } +} + +TEST_F(RemotingSenderTest, WaitsForDataBeforeConsumingFromDataPipe) { + // Queue up and issue Mojo calls to consume three frames. Since no data has + // been pushed into the pipe yet no frames should be sent. + for (int i = 0; i < 3; ++i) { + PostMojoCallTask_SendFrame(4); + } + RunPendingTasks(); + EXPECT_TRUE(IsFlowRestartPending()); + EXPECT_EQ(0, NumberOfFramesInFlight()); + + // Push the data for one frame into the data pipe. This should trigger input + // processing and allow one frame to be sent. + ASSERT_TRUE(ProduceDataChunk(0, 4)); + RunPendingTasks(); // Allow Mojo Watcher to signal CastRemotingSender. + EXPECT_FALSE(IsFlowRestartPending()); + EXPECT_EQ(1, NumberOfFramesInFlight()); + + // Now push the data for the other two frames into the data pipe and expect + // two more frames to be sent. + ASSERT_TRUE(ProduceDataChunk(0, 4)); + ASSERT_TRUE(ProduceDataChunk(0, 4)); + RunPendingTasks(); // Allow Mojo Watcher to signal CastRemotingSender. + EXPECT_FALSE(IsFlowRestartPending()); + EXPECT_EQ(3, NumberOfFramesInFlight()); +} + +TEST_F(RemotingSenderTest, WaitsForDataThenDiscardsCanceledData) { + // Queue up and issue Mojo calls to consume data chunks and send three + // frames. Since no data has been pushed into the pipe yet no frames should be + // sent. + for (int i = 0; i < 3; ++i) { + PostMojoCallTask_SendFrame(4); + } + RunPendingTasks(); + EXPECT_EQ(0, NumberOfFramesInFlight()); + + // Cancel all in-flight data. + PostMojoCallTask_CancelInFlightData(); + RunPendingTasks(); + + // Now, push the data for one frame into the data pipe. Because of the + // cancellation, no frames should be sent. + ASSERT_TRUE(ProduceDataChunk(0, 4)); + RunPendingTasks(); // Allow Mojo Watcher to signal CastRemotingSender. + EXPECT_EQ(0, NumberOfFramesInFlight()); + + // Now push the data for the other two frames into the data pipe and still no + // frames should be sent. + ASSERT_TRUE(ProduceDataChunk(0, 4)); + ASSERT_TRUE(ProduceDataChunk(0, 4)); + RunPendingTasks(); // Allow Mojo Watcher to signal CastRemotingSender. + EXPECT_EQ(0, NumberOfFramesInFlight()); + + // Now issue calls to send another frame and then push the data for it into + // the data pipe. Expect to see the frame gets sent since it was provided + // after the CancelInFlightData(). + PostMojoCallTask_SendFrame(4); + RunPendingTasks(); + EXPECT_EQ(0, NumberOfFramesInFlight()); + ASSERT_TRUE(ProduceDataChunk(0, 4)); + RunPendingTasks(); // Allow Mojo Watcher to signal CastRemotingSender. + EXPECT_EQ(1, NumberOfFramesInFlight()); +} + +TEST_F(RemotingSenderTest, StopsConsumingWhileTooManyFramesAreInFlight) { + EXPECT_TRUE(IsFlowRestartPending()); + + // Send out the maximum possible number of unacked frames, but don't ack any + // yet. + for (int i = 0; i < media::cast::kMaxUnackedFrames; ++i) { + ASSERT_TRUE(ProduceDataChunk(0, 4)); + PostMojoCallTask_SendFrame(4); + } + RunPendingTasks(); + EXPECT_FALSE(IsFlowRestartPending()); + EXPECT_EQ(media::cast::kMaxUnackedFrames, NumberOfFramesInFlight()); + // Note: All frames should have been sent to the Transport, and so + // CastRemotingSender's single-frame data buffer should be empty. + EXPECT_EQ(0u, GetSizeOfNextFrameData()); + + // When the client provides one more frame, CastRemotingSender will begin + // queuing input operations instead of sending the the frame to the + // CastTransport. + ASSERT_TRUE(ProduceDataChunk(0, 4)); + PostMojoCallTask_SendFrame(4); + RunPendingTasks(); + EXPECT_EQ(media::cast::kMaxUnackedFrames, NumberOfFramesInFlight()); + // Note: The unsent frame resides in CastRemotingSender's single-frame data + // buffer. + EXPECT_EQ(4u, GetSizeOfNextFrameData()); + + // Ack the the first frame and expect sending to resume, with one more frame + // being sent to the CastTransport. + AckOldestInFlightFrames(1); + EXPECT_EQ(media::cast::kMaxUnackedFrames, NumberOfFramesInFlight()); + // Note: Only one frame was backlogged, and so CastRemotingSender's + // single-frame data buffer should be empty. + EXPECT_EQ(0u, GetSizeOfNextFrameData()); + + // Attempting to send another frame will once again cause CastRemotingSender + // to queue input operations. + ASSERT_TRUE(ProduceDataChunk(0, 4)); + PostMojoCallTask_SendFrame(4); + RunPendingTasks(); + EXPECT_EQ(media::cast::kMaxUnackedFrames, NumberOfFramesInFlight()); + // Note: Once again, CastRemotingSender's single-frame data buffer contains an + // unsent frame. + EXPECT_EQ(4u, GetSizeOfNextFrameData()); + + // Send more frames: Some number of frames will queue-up inside the Mojo data + // pipe (the exact number depends on the data pipe's capacity, and how Mojo + // manages memory internally). At some point, attempting to produce and push + // another frame will fail because the data pipe is full. + int num_frames_in_data_pipe = 0; + while (ProduceDataChunk(0, 768)) { + ++num_frames_in_data_pipe; + PostMojoCallTask_SendFrame(768); + RunPendingTasks(); + EXPECT_EQ(media::cast::kMaxUnackedFrames, NumberOfFramesInFlight()); + // Note: CastRemotingSender's single-frame data buffer should still contain + // the unsent 4-byte frame. + EXPECT_EQ(4u, GetSizeOfNextFrameData()); + } + EXPECT_LT(0, num_frames_in_data_pipe); + + // Ack one frame at a time until the backlog in the Mojo data pipe has + // cleared. + int remaining_frames_in_data_pipe = num_frames_in_data_pipe; + while (remaining_frames_in_data_pipe > 0) { + AckOldestInFlightFrames(1); + RunPendingTasks(); + --remaining_frames_in_data_pipe; + EXPECT_EQ(media::cast::kMaxUnackedFrames, NumberOfFramesInFlight()); + EXPECT_EQ(768u, GetSizeOfNextFrameData()); + } + + // Ack one more frame. There should no longer be a backlog on the input side + // of things. + AckOldestInFlightFrames(1); + RunPendingTasks(); // No additional Mojo method calls should be made here. + EXPECT_EQ(media::cast::kMaxUnackedFrames, NumberOfFramesInFlight()); + // The single-frame data buffer should be empty to indicate no input backlog. + EXPECT_EQ(0u, GetSizeOfNextFrameData()); + + // Ack all but one frame. + AckOldestInFlightFrames(NumberOfFramesInFlight() - 1); + EXPECT_EQ(1, NumberOfFramesInFlight()); + // ..and one more frame can be sent immediately. + ASSERT_TRUE(ProduceDataChunk(0, 4)); + PostMojoCallTask_SendFrame(4); + RunPendingTasks(); + EXPECT_EQ(2, NumberOfFramesInFlight()); + // ...and ack these last two frames. + AckOldestInFlightFrames(2); + EXPECT_EQ(0, NumberOfFramesInFlight()); + + // Finally, examine all frames that were sent to the CastTransport, and + // confirm their metadata and data is valid. + std::vector<media::cast::EncodedFrame> frames; + TakeSentFrames(&frames); + const size_t total_frames_sent = + media::cast::kMaxUnackedFrames + 2 + num_frames_in_data_pipe + 1; + ASSERT_EQ(total_frames_sent, frames.size()); + media::cast::RtpTimeTicks last_rtp_timestamp = + media::cast::RtpTimeTicks() - media::cast::RtpTimeDelta::FromTicks(1); + for (size_t i = 0; i < total_frames_sent; ++i) { + const media::cast::EncodedFrame& frame = frames[i]; + EXPECT_EQ(media::cast::FrameId::first() + i, frame.frame_id); + if (i == 0) { + EXPECT_EQ(media::cast::EncodedFrame::KEY, frame.dependency); + EXPECT_EQ(media::cast::FrameId::first() + i, frame.referenced_frame_id); + } else { + EXPECT_EQ(media::cast::EncodedFrame::DEPENDENT, frame.dependency); + EXPECT_EQ(media::cast::FrameId::first() + i - 1, + frame.referenced_frame_id); + } + + // RTP timestamp must be monotonically increasing. + EXPECT_GT(frame.rtp_timestamp, last_rtp_timestamp); + last_rtp_timestamp = frame.rtp_timestamp; + + size_t expected_frame_size = 4; + if ((i >= media::cast::kMaxUnackedFrames + 2u) && + (i < media::cast::kMaxUnackedFrames + 2u + num_frames_in_data_pipe)) { + expected_frame_size = 768; + } + EXPECT_TRUE(ExpectCorrectFrameData(expected_frame_size, frame)); + } +} + +} // namespace mirroring diff --git a/chromium/components/mirroring/service/rtp_stream.cc b/chromium/components/mirroring/service/rtp_stream.cc index d9a95d39b28..b3ca6b5d754 100644 --- a/chromium/components/mirroring/service/rtp_stream.cc +++ b/chromium/components/mirroring/service/rtp_stream.cc @@ -37,14 +37,13 @@ VideoRtpStream::VideoRtpStream( : video_sender_(std::move(video_sender)), client_(client), consecutive_refresh_count_(0), - expecting_a_refresh_frame_(false), - weak_factory_(this) { + expecting_a_refresh_frame_(false) { DCHECK(video_sender_); DCHECK(client); refresh_timer_.Start(FROM_HERE, kRefreshInterval, base::BindRepeating(&VideoRtpStream::OnRefreshTimerFired, - weak_factory_.GetWeakPtr())); + this->AsWeakPtr())); } VideoRtpStream::~VideoRtpStream() {} @@ -112,7 +111,7 @@ AudioRtpStream::AudioRtpStream( AudioRtpStream::~AudioRtpStream() {} void AudioRtpStream::InsertAudio(std::unique_ptr<media::AudioBus> audio_bus, - base::TimeTicks capture_time) { + const base::TimeTicks& capture_time) { audio_sender_->InsertAudio(std::move(audio_bus), capture_time); } diff --git a/chromium/components/mirroring/service/rtp_stream.h b/chromium/components/mirroring/service/rtp_stream.h index 9bffae1f60d..dbbf6673864 100644 --- a/chromium/components/mirroring/service/rtp_stream.h +++ b/chromium/components/mirroring/service/rtp_stream.h @@ -61,7 +61,7 @@ class RtpStreamClient { // regular intervals for a short period of time. This provides the video // encoder, downstream, several copies of the last frame so that it may clear up // lossy encoding artifacts. -class VideoRtpStream { +class VideoRtpStream : public base::SupportsWeakPtr<VideoRtpStream> { public: VideoRtpStream(std::unique_ptr<media::cast::VideoSender> video_sender, base::WeakPtr<RtpStreamClient> client); @@ -71,10 +71,6 @@ class VideoRtpStream { // |video_frame| is required to provide REFERENCE_TIME in the metadata. void InsertVideoFrame(scoped_refptr<media::VideoFrame> video_frame); - base::WeakPtr<VideoRtpStream> AsWeakPtr() { - return weak_factory_.GetWeakPtr(); - } - void SetTargetPlayoutDelay(base::TimeDelta playout_delay); private: @@ -94,15 +90,11 @@ class VideoRtpStream { // cleared once the next frame is received. bool expecting_a_refresh_frame_; - base::WeakPtrFactory<VideoRtpStream> weak_factory_; - DISALLOW_COPY_AND_ASSIGN(VideoRtpStream); }; // Receives audio data and submits the data to media::cast::AudioSender. -// TODO(xjz): Complete implementation after Audio Service mirroring refactoring -// is landed. -class AudioRtpStream { +class AudioRtpStream : public base::SupportsWeakPtr<AudioRtpStream> { public: AudioRtpStream(std::unique_ptr<media::cast::AudioSender> audio_sender, base::WeakPtr<RtpStreamClient> client); @@ -110,7 +102,7 @@ class AudioRtpStream { // Called by AudioCaptureClient when new audio data is available. void InsertAudio(std::unique_ptr<media::AudioBus> audio_bus, - base::TimeTicks estimated_capture_time); + const base::TimeTicks& estimated_capture_time); void SetTargetPlayoutDelay(base::TimeDelta playout_delay); diff --git a/chromium/components/mirroring/service/session.cc b/chromium/components/mirroring/service/session.cc index 58047f8db81..f99169ae07b 100644 --- a/chromium/components/mirroring/service/session.cc +++ b/chromium/components/mirroring/service/session.cc @@ -4,6 +4,12 @@ #include "components/mirroring/service/session.h" +#include <algorithm> +#include <memory> +#include <string> +#include <utility> +#include <vector> + #include "base/json/json_writer.h" #include "base/logging.h" #include "base/rand_util.h" @@ -17,9 +23,13 @@ #include "base/time/time.h" #include "base/values.h" #include "build/build_config.h" +#include "components/mirroring/service/captured_audio_input.h" #include "components/mirroring/service/udp_socket_client.h" #include "components/mirroring/service/video_capture_client.h" #include "crypto/random.h" +#include "media/audio/audio_input_device.h" +#include "media/base/audio_capturer_source.h" +#include "media/base/bind_to_current_loop.h" #include "media/cast/net/cast_transport.h" #include "media/cast/sender/audio_sender.h" #include "media/cast/sender/video_sender.h" @@ -36,6 +46,8 @@ using media::cast::FrameEvent; using media::cast::PacketEvent; using media::cast::OperationalStatus; using media::cast::Packet; +using media::mojom::RemotingSinkAudioCapability; +using media::mojom::RemotingSinkVideoCapability; namespace mirroring { @@ -50,6 +62,11 @@ constexpr base::TimeDelta kSendEventsInterval = base::TimeDelta::FromSeconds(1); constexpr base::TimeDelta kOfferAnswerExchangeTimeout = base::TimeDelta::FromSeconds(15); +// Amount of time to wait before assuming the Cast Receiver does not support +// querying for capabilities via GET_CAPABILITIES. +constexpr base::TimeDelta kGetCapabilitiesTimeout = + base::TimeDelta::FromSeconds(30); + // Used for OFFER/ANSWER message exchange. Some receivers will error out on // payloadType values other than the ones hard-coded here. constexpr int kAudioPayloadType = 127; @@ -167,10 +184,10 @@ void AddStreamObject(int stream_index, (config.rtp_payload_type <= media::cast::RtpPayloadType::AUDIO_LAST); stream.SetKey("rtpPayloadType", base::Value(is_audio ? kAudioPayloadType : kVideoPayloadType)); - stream.SetKey("ssrc", base::Value(int(config.sender_ssrc))); - stream.SetKey( - "targetDelay", - base::Value(int(config.animated_playout_delay.InMilliseconds()))); + stream.SetKey("ssrc", base::Value(static_cast<int>(config.sender_ssrc))); + stream.SetKey("targetDelay", + base::Value(static_cast<int>( + config.animated_playout_delay.InMilliseconds()))); stream.SetKey("aesKey", base::Value(base::HexEncode(config.aes_key.data(), config.aes_key.size()))); stream.SetKey("aesIvMask", @@ -215,8 +232,137 @@ void AddStreamObject(int stream_index, stream_list->emplace_back(std::move(stream)); } +// Checks whether receiver's build version is less than "1.|base_version|.xxxx". +// Returns false if given version doesn't have the format of "1.xx.xxxx". +bool NeedsWorkaroundForOlder1DotXVersions( + const std::string& receiver_build_version, + int base_version) { + if (!base::StartsWith(receiver_build_version, "1.", + base::CompareCase::SENSITIVE)) + return false; + const size_t end_pos = receiver_build_version.find_first_of('.', 2); + if (end_pos == std::string::npos) + return false; + int version = 0; + return (base::StringToInt(receiver_build_version.substr(2, end_pos - 2), + &version) && + version < base_version); +} + +// Convert the sink capabilities to media::mojom::RemotingSinkMetadata. +media::mojom::RemotingSinkMetadata ToRemotingSinkMetadata( + const std::vector<std::string>& capabilities, + const CastSinkInfo& sink_info, + const std::string& receiver_build_version) { + media::mojom::RemotingSinkMetadata sink_metadata; + sink_metadata.friendly_name = sink_info.friendly_name; + + for (const auto& capability : capabilities) { + if (capability == "audio") { + sink_metadata.audio_capabilities.push_back( + RemotingSinkAudioCapability::CODEC_BASELINE_SET); + } else if (capability == "aac") { + sink_metadata.audio_capabilities.push_back( + RemotingSinkAudioCapability::CODEC_AAC); + } else if (capability == "opus") { + sink_metadata.audio_capabilities.push_back( + RemotingSinkAudioCapability::CODEC_OPUS); + } else if (capability == "video") { + sink_metadata.video_capabilities.push_back( + RemotingSinkVideoCapability::CODEC_BASELINE_SET); + } else if (capability == "4k") { + sink_metadata.video_capabilities.push_back( + RemotingSinkVideoCapability::SUPPORT_4K); + } else if (capability == "h264") { + sink_metadata.video_capabilities.push_back( + RemotingSinkVideoCapability::CODEC_H264); + } else if (capability == "vp8") { + sink_metadata.video_capabilities.push_back( + RemotingSinkVideoCapability::CODEC_VP8); + } else if (capability == "vp9") { + // Before 1.27 Earth receivers report "vp9" even though they don't support + // remoting the VP9 encoded video. + if (!NeedsWorkaroundForOlder1DotXVersions(receiver_build_version, 27) || + base::StartsWith(sink_info.model_name, "Chromecast Ultra", + base::CompareCase::SENSITIVE)) { + sink_metadata.video_capabilities.push_back( + RemotingSinkVideoCapability::CODEC_VP9); + } + } else if (capability == "hevc") { + // Before 1.27 Earth receivers report "hevc" even though they don't + // support remoting the HEVC encoded video. + if (!NeedsWorkaroundForOlder1DotXVersions(receiver_build_version, 27) || + base::StartsWith(sink_info.model_name, "Chromecast Ultra", + base::CompareCase::SENSITIVE)) { + sink_metadata.video_capabilities.push_back( + RemotingSinkVideoCapability::CODEC_HEVC); + } + } else { + DVLOG(1) << "Unknown mediaCap name: " << capability; + } + } + + // Enable remoting 1080p 30fps or higher resolution/fps content for Chromecast + // Ultra receivers only. + // TODO(xjz): Receiver should report this capability. + if (sink_info.model_name == "Chromecast Ultra") { + sink_metadata.video_capabilities.push_back( + RemotingSinkVideoCapability::SUPPORT_4K); + } + + return sink_metadata; +} + } // namespace +class Session::AudioCapturingCallback final + : public media::AudioCapturerSource::CaptureCallback { + public: + using AudioDataCallback = + base::RepeatingCallback<void(std::unique_ptr<media::AudioBus> audio_bus, + const base::TimeTicks& recorded_time)>; + AudioCapturingCallback(AudioDataCallback audio_data_callback, + base::OnceClosure error_callback) + : audio_data_callback_(std::move(audio_data_callback)), + error_callback_(std::move(error_callback)) { + DCHECK(!audio_data_callback_.is_null()); + } + + ~AudioCapturingCallback() override {} + + private: + // media::AudioCapturerSource::CaptureCallback implementation. + void OnCaptureStarted() override {} + + // Called on audio thread. + void Capture(const media::AudioBus* audio_bus, + int audio_delay_milliseconds, + double volume, + bool key_pressed) override { + // TODO(xjz): Don't copy the audio data. Instead, send |audio_bus| directly + // to the encoder. + std::unique_ptr<media::AudioBus> captured_audio = + media::AudioBus::Create(audio_bus->channels(), audio_bus->frames()); + audio_bus->CopyTo(captured_audio.get()); + const base::TimeTicks recorded_time = + base::TimeTicks::Now() - + base::TimeDelta::FromMilliseconds(audio_delay_milliseconds); + audio_data_callback_.Run(std::move(captured_audio), recorded_time); + } + + void OnCaptureError(const std::string& message) override { + if (!error_callback_.is_null()) + std::move(error_callback_).Run(); + } + + void OnCaptureMuted(bool is_muted) override {} + + const AudioDataCallback audio_data_callback_; + base::OnceClosure error_callback_; + + DISALLOW_COPY_AND_ASSIGN(AudioCapturingCallback); +}; + Session::Session(int32_t session_id, const CastSinkInfo& sink_info, const gfx::Size& max_resolution, @@ -225,6 +371,7 @@ Session::Session(int32_t session_id, CastMessageChannel* outbound_channel) : session_id_(session_id), sink_info_(sink_info), + state_(MIRRORING), observer_(observer), resource_provider_(resource_provider), message_dispatcher_(outbound_channel, @@ -236,13 +383,13 @@ Session::Session(int32_t session_id, max_resolution.height()); resource_provider_->GetNetworkContext(mojo::MakeRequest(&network_context_)); - auto wifi_status_monitor = - std::make_unique<WifiStatusMonitor>(session_id_, &message_dispatcher_); + network::mojom::URLLoaderFactoryParamsPtr params = + network::mojom::URLLoaderFactoryParams::New(); + params->process_id = network::mojom::kBrowserProcessId; + params->is_corb_enabled = false; network::mojom::URLLoaderFactoryPtr url_loader_factory; network_context_->CreateURLLoaderFactory( - mojo::MakeRequest(&url_loader_factory), - network::mojom::URLLoaderFactoryParams::New( - network::mojom::kBrowserProcessId, false, std::string())); + mojo::MakeRequest(&url_loader_factory), std::move(params)); // Generate session level tags. base::Value session_tags(base::Value::Type::DICTIONARY); @@ -254,9 +401,9 @@ Session::Session(int32_t session_id, session_tags.SetKey("receiverProductName", base::Value(sink_info_.model_name)); - session_monitor_.emplace( - kMaxCrashReportBytes, sink_info_.ip_address, std::move(session_tags), - std::move(url_loader_factory), std::move(wifi_status_monitor)); + session_monitor_.emplace(kMaxCrashReportBytes, sink_info_.ip_address, + std::move(session_tags), + std::move(url_loader_factory)); CreateAndSendOffer(); } @@ -268,26 +415,48 @@ Session::~Session() { void Session::ReportError(SessionError error) { if (session_monitor_.has_value()) session_monitor_->OnStreamingError(error); + if (state_ == REMOTING) { + media_remoter_->OnRemotingFailed(); // Try to fallback on mirroring. + return; + } + + // Report the error and stop this session. if (observer_) observer_->OnError(error); StopSession(); } +void Session::StopStreaming() { + DVLOG(2) << __func__ << " state=" << state_; + if (!cast_environment_) + return; + + session_monitor_->StopStreamingSession(); + if (audio_input_device_) { + audio_input_device_->Stop(); + audio_input_device_ = nullptr; + } + audio_capturing_callback_.reset(); + audio_stream_.reset(); + video_stream_.reset(); + cast_transport_.reset(); + cast_environment_ = nullptr; +} + void Session::StopSession() { DVLOG(1) << __func__; - if (!resource_provider_) + if (state_ == STOPPED) return; - session_monitor_->StopStreamingSession(); + state_ = STOPPED; + StopStreaming(); + session_monitor_.reset(); weak_factory_.InvalidateWeakPtrs(); audio_encode_thread_ = nullptr; video_encode_thread_ = nullptr; video_capture_client_.reset(); - audio_stream_.reset(); - video_stream_.reset(); - cast_transport_.reset(); - cast_environment_ = nullptr; + media_remoter_.reset(); resource_provider_ = nullptr; if (observer_) { observer_->DidStop(); @@ -383,10 +552,12 @@ void Session::OnLoggingEventsReceived( std::move(packet_events)); } -void Session::OnAnswer(const std::string& cast_mode, - const std::vector<FrameSenderConfig>& audio_configs, +void Session::OnAnswer(const std::vector<FrameSenderConfig>& audio_configs, const std::vector<FrameSenderConfig>& video_configs, const ReceiverResponse& response) { + if (state_ == STOPPED) + return; + if (!response.answer || response.type == ResponseType::UNKNOWN) { ReportError(ANSWER_TIME_OUT); return; @@ -400,6 +571,8 @@ void Session::OnAnswer(const std::string& cast_mode, } const Answer& answer = *response.answer; + const std::string cast_mode = + (state_ == MIRRORING ? "mirroring" : "remoting"); if (answer.cast_mode != cast_mode) { ReportError(ANSWER_MISMATCHED_CAST_MODE); return; @@ -450,23 +623,19 @@ void Session::OnAnswer(const std::string& cast_mode, return; } - if ((has_audio && - audio_config.rtp_payload_type == RtpPayloadType::REMOTE_AUDIO) || - (has_video && - video_config.rtp_payload_type == RtpPayloadType::REMOTE_VIDEO)) { - NOTIMPLEMENTED(); // TODO(xjz): Add support for media remoting. - return; - } - // Start streaming. - audio_encode_thread_ = base::CreateSingleThreadTaskRunnerWithTraits( - {base::TaskPriority::USER_BLOCKING, - base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN}, - base::SingleThreadTaskRunnerThreadMode::DEDICATED); - video_encode_thread_ = base::CreateSingleThreadTaskRunnerWithTraits( - {base::TaskPriority::USER_BLOCKING, - base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN}, - base::SingleThreadTaskRunnerThreadMode::DEDICATED); + const bool initially_starting_session = + !audio_encode_thread_ && !video_encode_thread_; + if (initially_starting_session) { + audio_encode_thread_ = base::CreateSingleThreadTaskRunnerWithTraits( + {base::TaskPriority::USER_BLOCKING, + base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN}, + base::SingleThreadTaskRunnerThreadMode::DEDICATED); + video_encode_thread_ = base::CreateSingleThreadTaskRunnerWithTraits( + {base::TaskPriority::USER_BLOCKING, + base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN}, + base::SingleThreadTaskRunnerThreadMode::DEDICATED); + } cast_environment_ = new media::cast::CastEnvironment( base::DefaultTickClock::GetInstance(), base::ThreadTaskRunnerHandle::Get(), audio_encode_thread_, @@ -481,51 +650,98 @@ void Session::OnAnswer(const std::string& cast_mode, std::make_unique<TransportClient>(this), std::move(udp_client), base::ThreadTaskRunnerHandle::Get()); - if (has_audio) { - auto audio_sender = std::make_unique<media::cast::AudioSender>( - cast_environment_, audio_config, - base::BindRepeating(&Session::OnEncoderStatusChange, - weak_factory_.GetWeakPtr()), - cast_transport_.get()); - audio_stream_ = std::make_unique<AudioRtpStream>( - std::move(audio_sender), weak_factory_.GetWeakPtr()); - // TODO(xjz): Start audio capturing. - NOTIMPLEMENTED(); - } + if (state_ == REMOTING) { + DCHECK(media_remoter_); + DCHECK(audio_config.rtp_payload_type == RtpPayloadType::REMOTE_AUDIO || + video_config.rtp_payload_type == RtpPayloadType::REMOTE_VIDEO); + media_remoter_->StartRpcMessaging(cast_environment_, cast_transport_.get(), + audio_config, video_config); + } else /* MIRRORING */ { + if (has_audio) { + auto audio_sender = std::make_unique<media::cast::AudioSender>( + cast_environment_, audio_config, + base::BindRepeating(&Session::OnEncoderStatusChange, + weak_factory_.GetWeakPtr()), + cast_transport_.get()); + audio_stream_ = std::make_unique<AudioRtpStream>( + std::move(audio_sender), weak_factory_.GetWeakPtr()); + DCHECK(!audio_capturing_callback_); + // TODO(xjz): Elliminate the thread hops. The audio data is thread-hopped + // from the audio thread, and later thread-hopped again to the encoding + // thread. + audio_capturing_callback_ = std::make_unique<AudioCapturingCallback>( + media::BindToCurrentLoop(base::BindRepeating( + &AudioRtpStream::InsertAudio, audio_stream_->AsWeakPtr())), + base::BindOnce(&Session::ReportError, weak_factory_.GetWeakPtr(), + SessionError::AUDIO_CAPTURE_ERROR)); + audio_input_device_ = new media::AudioInputDevice( + std::make_unique<CapturedAudioInput>(base::BindRepeating( + &Session::CreateAudioStream, base::Unretained(this))), + base::ThreadPriority::NORMAL); + audio_input_device_->Initialize(mirror_settings_.GetAudioCaptureParams(), + audio_capturing_callback_.get()); + audio_input_device_->Start(); + } - if (has_video) { - auto video_sender = std::make_unique<media::cast::VideoSender>( - cast_environment_, video_config, - base::BindRepeating(&Session::OnEncoderStatusChange, - weak_factory_.GetWeakPtr()), - base::BindRepeating(&Session::CreateVideoEncodeAccelerator, - weak_factory_.GetWeakPtr()), - base::BindRepeating(&Session::CreateVideoEncodeMemory, - weak_factory_.GetWeakPtr()), - cast_transport_.get(), - base::BindRepeating(&Session::SetTargetPlayoutDelay, - weak_factory_.GetWeakPtr())); - video_stream_ = std::make_unique<VideoRtpStream>( - std::move(video_sender), weak_factory_.GetWeakPtr()); - media::mojom::VideoCaptureHostPtr video_host; - resource_provider_->GetVideoCaptureHost(mojo::MakeRequest(&video_host)); - video_capture_client_ = std::make_unique<VideoCaptureClient>( - mirror_settings_.GetVideoCaptureParams(), std::move(video_host)); - video_capture_client_->Start( - base::BindRepeating(&VideoRtpStream::InsertVideoFrame, - video_stream_->AsWeakPtr()), - base::BindOnce(&Session::ReportError, weak_factory_.GetWeakPtr(), - SessionError::VIDEO_CAPTURE_ERROR)); + if (has_video) { + auto video_sender = std::make_unique<media::cast::VideoSender>( + cast_environment_, video_config, + base::BindRepeating(&Session::OnEncoderStatusChange, + weak_factory_.GetWeakPtr()), + base::BindRepeating(&Session::CreateVideoEncodeAccelerator, + weak_factory_.GetWeakPtr()), + base::BindRepeating(&Session::CreateVideoEncodeMemory, + weak_factory_.GetWeakPtr()), + cast_transport_.get(), + base::BindRepeating(&Session::SetTargetPlayoutDelay, + weak_factory_.GetWeakPtr())); + video_stream_ = std::make_unique<VideoRtpStream>( + std::move(video_sender), weak_factory_.GetWeakPtr()); + if (!video_capture_client_) { + media::mojom::VideoCaptureHostPtr video_host; + resource_provider_->GetVideoCaptureHost(mojo::MakeRequest(&video_host)); + video_capture_client_ = std::make_unique<VideoCaptureClient>( + mirror_settings_.GetVideoCaptureParams(), std::move(video_host)); + video_capture_client_->Start( + base::BindRepeating(&VideoRtpStream::InsertVideoFrame, + video_stream_->AsWeakPtr()), + base::BindOnce(&Session::ReportError, weak_factory_.GetWeakPtr(), + SessionError::VIDEO_CAPTURE_ERROR)); + } else { + video_capture_client_->Resume(base::BindRepeating( + &VideoRtpStream::InsertVideoFrame, video_stream_->AsWeakPtr())); + } + } + if (media_remoter_) + media_remoter_->OnMirroringResumed(); } + DCHECK(session_monitor_.has_value()); const SessionMonitor::SessionType session_type = (has_audio && has_video) ? SessionMonitor::AUDIO_AND_VIDEO : has_audio ? SessionMonitor::AUDIO_ONLY : SessionMonitor::VIDEO_ONLY; - session_monitor_->StartStreamingSession(cast_environment_, session_type, - false /* is_remoting */); + std::unique_ptr<WifiStatusMonitor> wifi_status_monitor; + if (answer.supports_get_status) { + wifi_status_monitor = + std::make_unique<WifiStatusMonitor>(session_id_, &message_dispatcher_); + // Before 1.28 Android TV Chromecast receivers respond to GET_CAPABILITIES + // even though they don't support remoting. + if (initially_starting_session && + (!NeedsWorkaroundForOlder1DotXVersions( + session_monitor_->GetReceiverBuildVersion(), 28) || + base::StartsWith(sink_info_.model_name, "Chromecast", + base::CompareCase::SENSITIVE) || + base::StartsWith(sink_info_.model_name, "Eureka Dongle", + base::CompareCase::SENSITIVE))) { + QueryCapabilitiesForRemoting(); + } + } + session_monitor_->StartStreamingSession(cast_environment_, + std::move(wifi_status_monitor), + session_type, state_ == REMOTING); - if (observer_) + if (initially_starting_session && observer_) observer_->DidStart(); } @@ -533,6 +749,12 @@ void Session::OnResponseParsingError(const std::string& error_message) { // TODO(xjz): Log the |error_message| in the mirroring logs. } +void Session::CreateAudioStream(AudioStreamCreatorClient* client, + const media::AudioParameters& params, + uint32_t shared_memory_count) { + resource_provider_->CreateAudioStream(client, params, shared_memory_count); +} + void Session::SetTargetPlayoutDelay(base::TimeDelta playout_delay) { if (audio_stream_) audio_stream_->SetTargetPlayoutDelay(playout_delay); @@ -541,6 +763,8 @@ void Session::SetTargetPlayoutDelay(base::TimeDelta playout_delay) { } void Session::CreateAndSendOffer() { + DCHECK(state_ != STOPPED); + // The random AES key and initialization vector pair used by all streams in // this session. const std::string aes_key = MakeRandomString(16); // AES-128. @@ -552,46 +776,62 @@ void Session::CreateAndSendOffer() { base::Value::ListStorage stream_list; int stream_index = 0; if (sink_info_.capability != DeviceCapability::VIDEO_ONLY) { - FrameSenderConfig config = MirrorSettings::GetDefaultAudioConfig( - RtpPayloadType::AUDIO_OPUS, Codec::CODEC_AUDIO_OPUS); - AddSenderConfig(base::RandInt(kAudioSsrcMin, kAudioSsrcMax), config, - aes_key, aes_iv, &audio_configs); - AddStreamObject(stream_index++, "OPUS", audio_configs.back(), - mirror_settings_, &stream_list); - } - if (sink_info_.capability != DeviceCapability::AUDIO_ONLY) { - const int32_t video_ssrc = base::RandInt(kVideoSsrcMin, kVideoSsrcMax); - if (IsHardwareVP8EncodingSupported(GetSupportedVeaProfiles())) { - FrameSenderConfig config = MirrorSettings::GetDefaultVideoConfig( - RtpPayloadType::VIDEO_VP8, Codec::CODEC_VIDEO_VP8); - config.use_external_encoder = true; - AddSenderConfig(video_ssrc, config, aes_key, aes_iv, &video_configs); - AddStreamObject(stream_index++, "VP8", video_configs.back(), + const int32_t audio_ssrc = base::RandInt(kAudioSsrcMin, kAudioSsrcMax); + if (state_ == MIRRORING) { + FrameSenderConfig config = MirrorSettings::GetDefaultAudioConfig( + RtpPayloadType::AUDIO_OPUS, Codec::CODEC_AUDIO_OPUS); + AddSenderConfig(audio_ssrc, config, aes_key, aes_iv, &audio_configs); + AddStreamObject(stream_index++, "OPUS", audio_configs.back(), mirror_settings_, &stream_list); - } - if (IsHardwareH264EncodingSupported(GetSupportedVeaProfiles())) { - FrameSenderConfig config = MirrorSettings::GetDefaultVideoConfig( - RtpPayloadType::VIDEO_H264, Codec::CODEC_VIDEO_H264); - config.use_external_encoder = true; - AddSenderConfig(video_ssrc, config, aes_key, aes_iv, &video_configs); - AddStreamObject(stream_index++, "H264", video_configs.back(), + } else /* REMOTING */ { + FrameSenderConfig config = MirrorSettings::GetDefaultAudioConfig( + RtpPayloadType::REMOTE_AUDIO, Codec::CODEC_AUDIO_REMOTE); + AddSenderConfig(audio_ssrc, config, aes_key, aes_iv, &audio_configs); + AddStreamObject(stream_index++, "REMOTE_AUDIO", audio_configs.back(), mirror_settings_, &stream_list); } - if (video_configs.empty()) { + } + if (sink_info_.capability != DeviceCapability::AUDIO_ONLY) { + const int32_t video_ssrc = base::RandInt(kVideoSsrcMin, kVideoSsrcMax); + if (state_ == MIRRORING) { + if (IsHardwareVP8EncodingSupported(GetSupportedVeaProfiles())) { + FrameSenderConfig config = MirrorSettings::GetDefaultVideoConfig( + RtpPayloadType::VIDEO_VP8, Codec::CODEC_VIDEO_VP8); + config.use_external_encoder = true; + AddSenderConfig(video_ssrc, config, aes_key, aes_iv, &video_configs); + AddStreamObject(stream_index++, "VP8", video_configs.back(), + mirror_settings_, &stream_list); + } + if (IsHardwareH264EncodingSupported(GetSupportedVeaProfiles())) { + FrameSenderConfig config = MirrorSettings::GetDefaultVideoConfig( + RtpPayloadType::VIDEO_H264, Codec::CODEC_VIDEO_H264); + config.use_external_encoder = true; + AddSenderConfig(video_ssrc, config, aes_key, aes_iv, &video_configs); + AddStreamObject(stream_index++, "H264", video_configs.back(), + mirror_settings_, &stream_list); + } + if (video_configs.empty()) { + FrameSenderConfig config = MirrorSettings::GetDefaultVideoConfig( + RtpPayloadType::VIDEO_VP8, Codec::CODEC_VIDEO_VP8); + AddSenderConfig(video_ssrc, config, aes_key, aes_iv, &video_configs); + AddStreamObject(stream_index++, "VP8", video_configs.back(), + mirror_settings_, &stream_list); + } + } else /* REMOTING */ { FrameSenderConfig config = MirrorSettings::GetDefaultVideoConfig( - RtpPayloadType::VIDEO_VP8, Codec::CODEC_VIDEO_VP8); + RtpPayloadType::REMOTE_VIDEO, Codec::CODEC_VIDEO_REMOTE); AddSenderConfig(video_ssrc, config, aes_key, aes_iv, &video_configs); - AddStreamObject(stream_index++, "VP8", video_configs.back(), + AddStreamObject(stream_index++, "REMOTE_VIDEO", video_configs.back(), mirror_settings_, &stream_list); } } DCHECK(!audio_configs.empty() || !video_configs.empty()); // Assemble the OFFER message. - const std::string cast_mode = "mirroring"; base::Value offer(base::Value::Type::DICTIONARY); - offer.SetKey("castMode", base::Value(cast_mode)); - offer.SetKey("receiverGetStatus", base::Value("true")); + offer.SetKey("castMode", + base::Value(state_ == MIRRORING ? "mirroring" : "remoting")); + offer.SetKey("receiverGetStatus", base::Value(true)); offer.SetKey("supportedStreams", base::Value(stream_list)); const int32_t sequence_number = message_dispatcher_.GetNextSeqNumber(); @@ -610,8 +850,75 @@ void Session::CreateAndSendOffer() { message_dispatcher_.RequestReply( message_to_receiver, ResponseType::ANSWER, sequence_number, kOfferAnswerExchangeTimeout, - base::BindOnce(&Session::OnAnswer, base::Unretained(this), cast_mode, - audio_configs, video_configs)); + base::BindOnce(&Session::OnAnswer, base::Unretained(this), audio_configs, + video_configs)); +} + +void Session::ConnectToRemotingSource( + media::mojom::RemoterPtr remoter, + media::mojom::RemotingSourceRequest request) { + resource_provider_->ConnectToRemotingSource(std::move(remoter), + std::move(request)); +} + +void Session::RequestRemotingStreaming() { + DCHECK(media_remoter_); + DCHECK_EQ(MIRRORING, state_); + if (video_capture_client_) + video_capture_client_->Pause(); + StopStreaming(); + state_ = REMOTING; + CreateAndSendOffer(); +} + +void Session::RestartMirroringStreaming() { + if (state_ != REMOTING) + return; + StopStreaming(); + state_ = MIRRORING; + CreateAndSendOffer(); +} + +void Session::QueryCapabilitiesForRemoting() { + DCHECK(!media_remoter_); + const int32_t sequence_number = message_dispatcher_.GetNextSeqNumber(); + base::Value query(base::Value::Type::DICTIONARY); + query.SetKey("type", base::Value("GET_CAPABILITIES")); + query.SetKey("sessionId", base::Value(session_id_)); + query.SetKey("seqNum", base::Value(sequence_number)); + + CastMessage query_message; + query_message.message_namespace = kWebRtcNamespace; + const bool did_serialize_query = + base::JSONWriter::Write(query, &query_message.json_format_data); + DCHECK(did_serialize_query); + message_dispatcher_.RequestReply( + query_message, ResponseType::CAPABILITIES_RESPONSE, sequence_number, + kGetCapabilitiesTimeout, + base::BindOnce(&Session::OnCapabilitiesResponse, base::Unretained(this))); +} + +void Session::OnCapabilitiesResponse(const ReceiverResponse& response) { + if (!response.capabilities || response.type == ResponseType::UNKNOWN) { + VLOG(1) << "Receiver doens't support GET_CAPABILITIES. Remoting disabled."; + return; + } + if (response.result != "ok") { + VLOG(1) << "Bad CAPABILITIES_RESPONSE. Remoting disabled."; + if (response.error) { + VLOG(1) << "error code=" << response.error->code + << " description=" << response.error->description + << " details=" << response.error->details; + } + return; + } + const std::vector<std::string>& caps = response.capabilities->media_caps; + const std::string receiver_build_version = + session_monitor_.has_value() ? session_monitor_->GetReceiverBuildVersion() + : ""; + media_remoter_ = std::make_unique<MediaRemoter>( + this, ToRemotingSinkMetadata(caps, sink_info_, receiver_build_version), + &message_dispatcher_); } } // namespace mirroring diff --git a/chromium/components/mirroring/service/session.h b/chromium/components/mirroring/service/session.h index efc7d8b125b..41c21c37104 100644 --- a/chromium/components/mirroring/service/session.h +++ b/chromium/components/mirroring/service/session.h @@ -9,6 +9,7 @@ #include "base/optional.h" #include "base/single_thread_task_runner.h" #include "components/mirroring/service/interface.h" +#include "components/mirroring/service/media_remoter.h" #include "components/mirroring/service/message_dispatcher.h" #include "components/mirroring/service/mirror_settings.h" #include "components/mirroring/service/rtp_stream.h" @@ -19,6 +20,8 @@ namespace media { +class AudioInputDevice; + namespace cast { class CastTransport; } // namespace cast @@ -31,13 +34,14 @@ struct ReceiverResponse; class VideoCaptureClient; class SessionMonitor; -// Controls a mirroring session, including audio/video capturing and Cast -// Streaming. When constructed, it does OFFER/ANSWER exchange with the mirroring -// receiver. Mirroring starts when the exchange succeeds and stops when this -// class is destructed or error occurs. |observer| will get notified when status -// changes. |outbound_channel| is responsible for sending messages to the -// mirroring receiver through Cast Channel. -class Session final : public RtpStreamClient { +// Controls a mirroring session, including audio/video capturing, Cast +// Streaming, and the switching to/from media remoting. When constructed, it +// does OFFER/ANSWER exchange with the mirroring receiver. Mirroring starts when +// the exchange succeeds and stops when this class is destructed or error +// occurs. |observer| will get notified when status changes. |outbound_channel| +// is responsible for sending messages to the mirroring receiver through Cast +// Channel. +class Session final : public RtpStreamClient, public MediaRemoter::Client { public: Session(int32_t session_id, const CastSinkInfo& sink_info, @@ -70,7 +74,6 @@ class Session final : public RtpStreamClient { // get notified with error, and session is stopped. Otherwise, capturing and // streaming are started with the selected configs. void OnAnswer( - const std::string& cast_mode, const std::vector<media::cast::FrameSenderConfig>& audio_configs, const std::vector<media::cast::FrameSenderConfig>& video_configs, const ReceiverResponse& response); @@ -79,8 +82,36 @@ class Session final : public RtpStreamClient { // responses. void OnResponseParsingError(const std::string& error_message); + // Creates an audio input stream through Audio Service. |client| will be + // called after the stream is created. + void CreateAudioStream(AudioStreamCreatorClient* client, + const media::AudioParameters& params, + uint32_t shared_memory_count); + + // Callback for CAPABILITIES_RESPONSE. + void OnCapabilitiesResponse(const ReceiverResponse& response); + private: - void StopSession(); + // To allow the test code access the |message_dispatcher_|. + // TODO(xjz): Remove this after adding the inbound message channel argument. + friend class SessionTest; + + class AudioCapturingCallback; + + // MediaRemoter::Client implementation. + void ConnectToRemotingSource( + media::mojom::RemoterPtr remoter, + media::mojom::RemotingSourceRequest source_request) override; + void RequestRemotingStreaming() override; + void RestartMirroringStreaming() override; + + // Stops the current streaming session. If not called from StopSession(), a + // new streaming session will start later after exchanging OFFER/ANSWER + // messages with the receiver. This could happen any number of times before + // StopSession() shuts down everything permanently. + void StopStreaming(); + + void StopSession(); // Shuts down the entire mirroring session. // Notify |observer_| that error occurred and close the session. void ReportError(SessionError error); @@ -96,10 +127,23 @@ class Session final : public RtpStreamClient { // Create and send OFFER message. void CreateAndSendOffer(); + // Send GET_CAPABILITIES message. + void QueryCapabilitiesForRemoting(); + // Provided by Cast Media Route Provider (MRP). const int32_t session_id_; const CastSinkInfo sink_info_; + // State transition: + // MIRRORING <-------> REMOTING + // | | + // .---> STOPPED <----. + enum { + MIRRORING, // A mirroring streaming session is starting or started. + REMOTING, // A remoting streaming session is starting or started. + STOPPED, // The session is stopped due to user's request or errors. + } state_; + SessionObserver* observer_ = nullptr; ResourceProvider* resource_provider_ = nullptr; MirrorSettings mirror_settings_; @@ -118,6 +162,9 @@ class Session final : public RtpStreamClient { std::unique_ptr<media::cast::CastTransport> cast_transport_; scoped_refptr<base::SingleThreadTaskRunner> audio_encode_thread_ = nullptr; scoped_refptr<base::SingleThreadTaskRunner> video_encode_thread_ = nullptr; + std::unique_ptr<AudioCapturingCallback> audio_capturing_callback_; + scoped_refptr<media::AudioInputDevice> audio_input_device_; + std::unique_ptr<MediaRemoter> media_remoter_; base::WeakPtrFactory<Session> weak_factory_; }; diff --git a/chromium/components/mirroring/service/session_monitor.cc b/chromium/components/mirroring/service/session_monitor.cc index 94e6d77438a..82d05b524fb 100644 --- a/chromium/components/mirroring/service/session_monitor.cc +++ b/chromium/components/mirroring/service/session_monitor.cc @@ -105,25 +105,27 @@ SessionMonitor::SessionMonitor( int max_retention_bytes, const net::IPAddress& receiver_address, base::Value session_tags, - network::mojom::URLLoaderFactoryPtr loader_factory, - std::unique_ptr<WifiStatusMonitor> wifi_status_monitor) + network::mojom::URLLoaderFactoryPtr loader_factory) : max_retention_bytes_(max_retention_bytes), receiver_address_(receiver_address), session_tags_(std::move(session_tags)), url_loader_factory_(std::move(loader_factory)), - wifi_status_monitor_(std::move(wifi_status_monitor)), stored_snapshots_bytes_(0), - weak_factory_(this) {} + weak_factory_(this) { + QueryReceiverSetupInfo(); +} SessionMonitor::~SessionMonitor() {} void SessionMonitor::StartStreamingSession( scoped_refptr<media::cast::CastEnvironment> cast_environment, + std::unique_ptr<WifiStatusMonitor> wifi_status_monitor, SessionType session_type, bool is_remoting) { DCHECK(!event_subscribers_); DCHECK(!snapshot_timer_.IsRunning()); + wifi_status_monitor_ = std::move(wifi_status_monitor); std::string session_activity = session_type == AUDIO_AND_VIDEO ? "audio+video" @@ -156,6 +158,7 @@ void SessionMonitor::StopStreamingSession() { TakeSnapshot(); // Final snapshot of this streaming session. } event_subscribers_.reset(); + wifi_status_monitor_.reset(); } void SessionMonitor::OnStreamingError(SessionError error) { @@ -305,6 +308,12 @@ void SessionMonitor::TakeSnapshot() { stored_snapshots_bytes_ = snapshots_bytes; } +std::string SessionMonitor::GetReceiverBuildVersion() const { + std::string build_version; + GetString(session_tags_, "receiverVersion", &build_version); + return build_version; +} + std::string SessionMonitor::GetEventLogsAndReset( bool is_audio, const std::string& extra_data) { diff --git a/chromium/components/mirroring/service/session_monitor.h b/chromium/components/mirroring/service/session_monitor.h index ba4892345c2..a6e27fb7249 100644 --- a/chromium/components/mirroring/service/session_monitor.h +++ b/chromium/components/mirroring/service/session_monitor.h @@ -56,8 +56,7 @@ class SessionMonitor { SessionMonitor(int max_retention_bytes, const net::IPAddress& receiver_address, base::Value session_tags, - network::mojom::URLLoaderFactoryPtr loader_factory, - std::unique_ptr<WifiStatusMonitor> wifi_status_monitor); + network::mojom::URLLoaderFactoryPtr loader_factory); ~SessionMonitor(); @@ -71,6 +70,7 @@ class SessionMonitor { // events/stats. void StartStreamingSession( scoped_refptr<media::cast::CastEnvironment> cast_environment, + std::unique_ptr<WifiStatusMonitor> wifi_status_monitor, SessionType session_type, bool is_remoting); void StopStreamingSession(); @@ -91,6 +91,8 @@ class SessionMonitor { // Takes a snapshot of recent Cast Streaming events and statistics. void TakeSnapshot(); + std::string GetReceiverBuildVersion() const; + private: // Query the receiver for its current setup and uptime. void QueryReceiverSetupInfo(); @@ -113,7 +115,7 @@ class SessionMonitor { network::mojom::URLLoaderFactoryPtr url_loader_factory_; // Monitors the WiFi status if not null. - const std::unique_ptr<WifiStatusMonitor> wifi_status_monitor_; + std::unique_ptr<WifiStatusMonitor> wifi_status_monitor_; std::unique_ptr<media::cast::RawEventSubscriberBundle> event_subscribers_; diff --git a/chromium/components/mirroring/service/session_monitor_unittest.cc b/chromium/components/mirroring/service/session_monitor_unittest.cc index c887eaa0a9f..93792e5184b 100644 --- a/chromium/components/mirroring/service/session_monitor_unittest.cc +++ b/chromium/components/mirroring/service/session_monitor_unittest.cc @@ -92,8 +92,6 @@ class SessionMonitorTest : public CastMessageChannel, public ::testing::Test { void CreateSessionMonitor(int max_bytes, std::string* expected_settings) { EXPECT_CALL(*this, Send(::testing::_)).Times(::testing::AtLeast(1)); - auto wifi_status_monitor = - std::make_unique<WifiStatusMonitor>(123, &message_dispatcher_); network::mojom::URLLoaderFactoryPtr url_loader_factory; auto test_url_loader_factory = std::make_unique<network::TestURLLoaderFactory>(); @@ -111,7 +109,7 @@ class SessionMonitorTest : public CastMessageChannel, public ::testing::Test { session_tags.SetKey("shouldCaptureVideo", base::Value(true)); session_monitor_ = std::make_unique<SessionMonitor>( max_bytes, receiver_address_, std::move(session_tags), - std::move(url_loader_factory), std::move(wifi_status_monitor)); + std::move(url_loader_factory)); } // Generates and sends |num_of_responses| WiFi status. @@ -148,9 +146,11 @@ class SessionMonitorTest : public CastMessageChannel, public ::testing::Test { scoped_task_environment_.GetMainThreadTaskRunner(), scoped_task_environment_.GetMainThreadTaskRunner()); EXPECT_TRUE(session_monitor_); - session_monitor_->StartStreamingSession(cast_environment_, - SessionMonitor::AUDIO_AND_VIDEO, - false /* is_remoting */); + auto wifi_status_monitor = + std::make_unique<WifiStatusMonitor>(123, &message_dispatcher_); + session_monitor_->StartStreamingSession( + cast_environment_, std::move(wifi_status_monitor), + SessionMonitor::AUDIO_AND_VIDEO, false /* is_remoting */); scoped_task_environment_.RunUntilIdle(); } @@ -220,8 +220,8 @@ class SessionMonitorTest : public CastMessageChannel, public ::testing::Test { TEST_F(SessionMonitorTest, ProvidesExpectedTags) { std::string expected_settings; CreateSessionMonitor(kRetentionBytes, &expected_settings); - SendWifiStatus(34, 2000, 5); StartStreamingSession(); + SendWifiStatus(34, 2000, 5); std::vector<int32_t> bundle_sizes({kRetentionBytes}); std::vector<SessionMonitor::EventsAndStats> bundles = AssembleBundleAndVerify(bundle_sizes); @@ -271,11 +271,11 @@ TEST_F(SessionMonitorTest, ConfigureMaxRetentionBytes) { // 2500 is an estimate number of bytes for a snapshot that includes tags and // five WiFi status records. CreateSessionMonitor(2500, nullptr); - SendWifiStatus(34, 2000, 5); StartStreamingSession(); + SendWifiStatus(34, 2000, 5); StopStreamingSession(); - SendWifiStatus(54, 3000, 5); StartStreamingSession(); + SendWifiStatus(54, 3000, 5); StopStreamingSession(); std::vector<int32_t> bundle_sizes({kRetentionBytes}); std::vector<SessionMonitor::EventsAndStats> bundles = @@ -289,11 +289,11 @@ TEST_F(SessionMonitorTest, ConfigureMaxRetentionBytes) { TEST_F(SessionMonitorTest, AssembleBundlesWithVaryingSizes) { CreateSessionMonitor(kRetentionBytes, nullptr); - SendWifiStatus(34, 2000, 5); StartStreamingSession(); + SendWifiStatus(34, 2000, 5); StopStreamingSession(); - SendWifiStatus(54, 3000, 5); StartStreamingSession(); + SendWifiStatus(54, 3000, 5); StopStreamingSession(); std::vector<int32_t> bundle_sizes({2500, kRetentionBytes}); std::vector<SessionMonitor::EventsAndStats> bundles = diff --git a/chromium/components/mirroring/service/session_unittest.cc b/chromium/components/mirroring/service/session_unittest.cc index 973c0daed8a..7443fb67949 100644 --- a/chromium/components/mirroring/service/session_unittest.cc +++ b/chromium/components/mirroring/service/session_unittest.cc @@ -10,7 +10,7 @@ #include "base/macros.h" #include "base/run_loop.h" #include "base/test/scoped_task_environment.h" -#include "base/test/simple_test_tick_clock.h" +#include "base/time/time.h" #include "base/values.h" #include "components/mirroring/service/fake_network_service.h" #include "components/mirroring/service/fake_video_capture_host.h" @@ -27,35 +27,68 @@ using ::testing::InvokeWithoutArgs; using ::testing::_; +using ::testing::AtLeast; +using ::testing::Mock; using media::cast::FrameSenderConfig; using media::cast::Packet; +using media::mojom::RemotingStopReason; +using media::mojom::RemotingStartFailReason; +using media::mojom::RemotingSinkMetadata; +using media::mojom::RemotingSinkMetadataPtr; namespace mirroring { +namespace { + const int kSessionId = 5; +class MockRemotingSource final : public media::mojom::RemotingSource { + public: + MockRemotingSource() : binding_(this) {} + ~MockRemotingSource() override {} + + void Bind(media::mojom::RemotingSourceRequest request) { + binding_.Bind(std::move(request)); + } + + MOCK_METHOD0(OnSinkGone, void()); + MOCK_METHOD0(OnStarted, void()); + MOCK_METHOD1(OnStartFailed, void(RemotingStartFailReason)); + MOCK_METHOD1(OnMessageFromSink, void(const std::vector<uint8_t>&)); + MOCK_METHOD1(OnStopped, void(RemotingStopReason)); + MOCK_METHOD1(OnSinkAvailable, void(const RemotingSinkMetadata&)); + void OnSinkAvailable(RemotingSinkMetadataPtr metadata) override { + OnSinkAvailable(*metadata); + } + + private: + mojo::Binding<media::mojom::RemotingSource> binding_; +}; + +} // namespace + class SessionTest : public ResourceProvider, public SessionObserver, public CastMessageChannel, public ::testing::Test { public: - SessionTest() : receiver_endpoint_(media::cast::test::GetFreeLocalPort()) { - testing_clock_.Advance(base::TimeTicks::Now() - base::TimeTicks()); - } + SessionTest() : receiver_endpoint_(media::cast::test::GetFreeLocalPort()) {} ~SessionTest() override { scoped_task_environment_.RunUntilIdle(); } + protected: // SessionObserver implemenation. MOCK_METHOD1(OnError, void(SessionError)); MOCK_METHOD0(DidStart, void()); MOCK_METHOD0(DidStop, void()); - // ResourceProvider implemenation. MOCK_METHOD0(OnGetVideoCaptureHost, void()); MOCK_METHOD0(OnGetNetworkContext, void()); + MOCK_METHOD0(OnCreateAudioStream, void()); + MOCK_METHOD0(OnConnectToRemotingSource, void()); - // Called when sends OFFER message. - MOCK_METHOD0(OnOffer, void()); + // Called when sends an outbound message. + MOCK_METHOD1(OnOutboundMessage, void(const std::string& message_type)); // CastMessageHandler implementation. For outbound messages. void Send(const CastMessage& message) { @@ -68,10 +101,13 @@ class SessionTest : public ResourceProvider, EXPECT_TRUE(GetString(*value, "type", &message_type)); if (message_type == "OFFER") { EXPECT_TRUE(GetInt(*value, "seqNum", &offer_sequence_number_)); - OnOffer(); + } else if (message_type == "GET_CAPABILITIES") { + EXPECT_TRUE(GetInt(*value, "seqNum", &capability_sequence_number_)); } + OnOutboundMessage(message_type); } + // ResourceProvider implemenation. void GetVideoCaptureHost( media::mojom::VideoCaptureHostRequest request) override { video_host_ = std::make_unique<FakeVideoCaptureHost>(std::move(request)); @@ -84,18 +120,50 @@ class SessionTest : public ResourceProvider, OnGetNetworkContext(); } + void CreateAudioStream(AudioStreamCreatorClient* client, + const media::AudioParameters& params, + uint32_t total_segments) { + OnCreateAudioStream(); + } + void SendAnswer() { - FrameSenderConfig config = MirrorSettings::GetDefaultVideoConfig( - media::cast::RtpPayloadType::VIDEO_VP8, - media::cast::Codec::CODEC_VIDEO_VP8); + ASSERT_TRUE(session_); + std::vector<FrameSenderConfig> audio_configs; std::vector<FrameSenderConfig> video_configs; - video_configs.emplace_back(config); + if (sink_capability_ != DeviceCapability::VIDEO_ONLY) { + if (cast_mode_ == "remoting") { + audio_configs.emplace_back(MirrorSettings::GetDefaultAudioConfig( + media::cast::RtpPayloadType::REMOTE_AUDIO, + media::cast::Codec::CODEC_AUDIO_REMOTE)); + } else { + EXPECT_EQ("mirroring", cast_mode_); + audio_configs.emplace_back(MirrorSettings::GetDefaultAudioConfig( + media::cast::RtpPayloadType::AUDIO_OPUS, + media::cast::Codec::CODEC_AUDIO_OPUS)); + } + } + if (sink_capability_ != DeviceCapability::AUDIO_ONLY) { + if (cast_mode_ == "remoting") { + video_configs.emplace_back(MirrorSettings::GetDefaultVideoConfig( + media::cast::RtpPayloadType::REMOTE_VIDEO, + media::cast::Codec::CODEC_VIDEO_REMOTE)); + } else { + EXPECT_EQ("mirroring", cast_mode_); + video_configs.emplace_back(MirrorSettings::GetDefaultVideoConfig( + media::cast::RtpPayloadType::VIDEO_VP8, + media::cast::Codec::CODEC_VIDEO_VP8)); + } + } auto answer = std::make_unique<Answer>(); answer->udp_port = receiver_endpoint_.port(); - answer->send_indexes.push_back(0); - answer->ssrcs.push_back(32); - answer->cast_mode = "mirroring"; + answer->cast_mode = cast_mode_; + answer->supports_get_status = true; + const int number_of_configs = audio_configs.size() + video_configs.size(); + for (int i = 0; i < number_of_configs; ++i) { + answer->send_indexes.push_back(i); + answer->ssrcs.push_back(31 + i); // Arbitrary receiver SSRCs. + } ReceiverResponse response; response.result = "ok"; @@ -103,94 +171,222 @@ class SessionTest : public ResourceProvider, response.sequence_number = offer_sequence_number_; response.answer = std::move(answer); - session_->OnAnswer("mirroring", std::vector<FrameSenderConfig>(), - video_configs, response); + session_->OnAnswer(audio_configs, video_configs, response); + scoped_task_environment_.RunUntilIdle(); } - protected: - void CreateSession() { + void ConnectToRemotingSource( + media::mojom::RemoterPtr remoter, + media::mojom::RemotingSourceRequest request) override { + remoter_ = std::move(remoter); + remoting_source_.Bind(std::move(request)); + OnConnectToRemotingSource(); + } + + // Create a mirroring session. Expect to send OFFER message. + void CreateSession(DeviceCapability sink_capability) { + sink_capability_ = sink_capability; CastSinkInfo sink_info; sink_info.ip_address = receiver_endpoint_.address(); - sink_info.capability = DeviceCapability::AUDIO_AND_VIDEO; - // Expect to receive OFFER message when session is created. - base::RunLoop run_loop; + sink_info.capability = sink_capability_; + sink_info.model_name = "Chromecast"; + cast_mode_ = "mirroring"; + // Expect to send OFFER message when session is created. EXPECT_CALL(*this, OnGetNetworkContext()).Times(1); EXPECT_CALL(*this, OnError(_)).Times(0); - EXPECT_CALL(*this, OnOffer()) - .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + EXPECT_CALL(*this, OnOutboundMessage("OFFER")).Times(1); session_ = std::make_unique<Session>( kSessionId, sink_info, gfx::Size(1920, 1080), this, this, this); - run_loop.Run(); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); } - base::test::ScopedTaskEnvironment scoped_task_environment_; - const net::IPEndPoint receiver_endpoint_; - base::SimpleTestTickClock testing_clock_; - - std::unique_ptr<Session> session_; - std::unique_ptr<FakeVideoCaptureHost> video_host_; - std::unique_ptr<MockNetworkContext> network_context_; - - int32_t offer_sequence_number_ = -1; - - private: - DISALLOW_COPY_AND_ASSIGN(SessionTest); -}; - -TEST_F(SessionTest, Mirroring) { - CreateSession(); - scoped_task_environment_.RunUntilIdle(); - { + // Starts the mirroring session. + void StartSession() { + ASSERT_TRUE(cast_mode_ == "mirroring"); // Except mirroing session starts after receiving ANSWER message. - base::RunLoop run_loop; - EXPECT_CALL(*this, OnGetVideoCaptureHost()).Times(1); + const int num_to_get_video_host = + sink_capability_ == DeviceCapability::AUDIO_ONLY ? 0 : 1; + const int num_to_create_audio_stream = + sink_capability_ == DeviceCapability::VIDEO_ONLY ? 0 : 1; + EXPECT_CALL(*this, OnGetVideoCaptureHost()).Times(num_to_get_video_host); + EXPECT_CALL(*this, OnCreateAudioStream()).Times(num_to_create_audio_stream); EXPECT_CALL(*this, OnError(_)).Times(0); - EXPECT_CALL(*this, DidStart()) - .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + EXPECT_CALL(*this, OnOutboundMessage("GET_STATUS")).Times(AtLeast(1)); + EXPECT_CALL(*this, OnOutboundMessage("GET_CAPABILITIES")).Times(1); + EXPECT_CALL(*this, DidStart()).Times(1); SendAnswer(); - run_loop.Run(); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); } - scoped_task_environment_.RunUntilIdle(); - { - base::RunLoop run_loop; + + void StopSession() { + if (video_host_) + EXPECT_CALL(*video_host_, OnStopped()).Times(1); + EXPECT_CALL(*this, DidStop()).Times(1); + session_.reset(); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); + } + + void CaptureOneVideoFrame() { + ASSERT_TRUE(cast_mode_ == "mirroring"); + ASSERT_TRUE(video_host_); // Expect to send out some UDP packets. - EXPECT_CALL(*network_context_->udp_socket(), OnSend()) - .Times(testing::AtLeast(1)); - EXPECT_CALL(*video_host_, ReleaseBuffer(_, _, _)) - .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); + EXPECT_CALL(*network_context_->udp_socket(), OnSend()).Times(AtLeast(1)); + EXPECT_CALL(*video_host_, ReleaseBuffer(_, _, _)).Times(1); // Send one video frame to the consumer. - video_host_->SendOneFrame(gfx::Size(64, 32), testing_clock_.NowTicks()); - run_loop.Run(); + video_host_->SendOneFrame(gfx::Size(64, 32), base::TimeTicks::Now()); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(network_context_.get()); + Mock::VerifyAndClear(video_host_.get()); + } + + void SignalAnswerTimeout() { + if (cast_mode_ == "mirroring") { + EXPECT_CALL(*this, DidStop()).Times(1); + EXPECT_CALL(*this, OnError(ANSWER_TIME_OUT)).Times(1); + } else { + EXPECT_CALL(*this, DidStop()).Times(0); + EXPECT_CALL(*this, OnError(ANSWER_TIME_OUT)).Times(0); + // Expect to send OFFER message to fallback on mirroring. + EXPECT_CALL(*this, OnOutboundMessage("OFFER")).Times(1); + // The start of remoting is expected to fail. + EXPECT_CALL(remoting_source_, + OnStartFailed(RemotingStartFailReason::SERVICE_NOT_CONNECTED)) + .Times(1); + EXPECT_CALL(remoting_source_, OnSinkGone()).Times(AtLeast(1)); + } + session_->OnAnswer(std::vector<FrameSenderConfig>(), + std::vector<FrameSenderConfig>(), ReceiverResponse()); + scoped_task_environment_.RunUntilIdle(); + cast_mode_ = "mirroring"; + Mock::VerifyAndClear(this); + Mock::VerifyAndClear(&remoting_source_); + } + + void SendRemotingCapabilities() { + EXPECT_CALL(*this, OnConnectToRemotingSource()).Times(1); + EXPECT_CALL(remoting_source_, OnSinkAvailable(_)).Times(1); + ReceiverResponse response; + response.result = "ok"; + response.type = ResponseType::CAPABILITIES_RESPONSE; + response.sequence_number = capability_sequence_number_; + response.capabilities = std::make_unique<ReceiverCapability>(); + response.capabilities->media_caps = + std::vector<std::string>({"video", "audio", "vp8", "opus"}); + session_->OnCapabilitiesResponse(response); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); + Mock::VerifyAndClear(&remoting_source_); } - scoped_task_environment_.RunUntilIdle(); - // Stop the session. - { + void StartRemoting() { base::RunLoop run_loop; - EXPECT_CALL(*video_host_, OnStopped()).Times(1); - EXPECT_CALL(*this, DidStop()) + ASSERT_TRUE(remoter_.is_bound()); + // GET_CAPABILITIES is only sent once at the start of mirroring. + EXPECT_CALL(*this, OnOutboundMessage("GET_CAPABILITIES")).Times(0); + EXPECT_CALL(*this, OnOutboundMessage("OFFER")) .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); - session_.reset(); + remoter_->Start(); run_loop.Run(); + scoped_task_environment_.RunUntilIdle(); + cast_mode_ = "remoting"; + Mock::VerifyAndClear(this); } - scoped_task_environment_.RunUntilIdle(); + + void RemotingStarted() { + ASSERT_TRUE(cast_mode_ == "remoting"); + EXPECT_CALL(remoting_source_, OnStarted()).Times(1); + SendAnswer(); + scoped_task_environment_.RunUntilIdle(); + Mock::VerifyAndClear(this); + Mock::VerifyAndClear(&remoting_source_); + } + + void StopRemoting() { + ASSERT_TRUE(cast_mode_ == "remoting"); + const RemotingStopReason reason = RemotingStopReason::LOCAL_PLAYBACK; + // Expect to send OFFER message to fallback on mirroring. + EXPECT_CALL(*this, OnOutboundMessage("OFFER")).Times(1); + EXPECT_CALL(remoting_source_, OnStopped(reason)).Times(1); + remoter_->Stop(reason); + scoped_task_environment_.RunUntilIdle(); + cast_mode_ = "mirroring"; + Mock::VerifyAndClear(this); + Mock::VerifyAndClear(&remoting_source_); + } + + private: + base::test::ScopedTaskEnvironment scoped_task_environment_; + const net::IPEndPoint receiver_endpoint_; + DeviceCapability sink_capability_ = DeviceCapability::AUDIO_AND_VIDEO; + media::mojom::RemoterPtr remoter_; + MockRemotingSource remoting_source_; + std::string cast_mode_; + int32_t offer_sequence_number_ = -1; + int32_t capability_sequence_number_ = -1; + + std::unique_ptr<Session> session_; + std::unique_ptr<FakeVideoCaptureHost> video_host_; + std::unique_ptr<MockNetworkContext> network_context_; + + DISALLOW_COPY_AND_ASSIGN(SessionTest); +}; + +TEST_F(SessionTest, AudioOnlyMirroring) { + CreateSession(DeviceCapability::AUDIO_ONLY); + StartSession(); + StopSession(); +} + +TEST_F(SessionTest, VideoOnlyMirroring) { + CreateSession(DeviceCapability::VIDEO_ONLY); + StartSession(); + CaptureOneVideoFrame(); + StopSession(); +} + +TEST_F(SessionTest, AudioAndVideoMirroring) { + CreateSession(DeviceCapability::AUDIO_AND_VIDEO); + StartSession(); + StopSession(); } TEST_F(SessionTest, AnswerTimeout) { - CreateSession(); - scoped_task_environment_.RunUntilIdle(); - { - // Expect error. - base::RunLoop run_loop; - EXPECT_CALL(*this, OnGetVideoCaptureHost()).Times(0); - EXPECT_CALL(*this, DidStop()).Times(1); - EXPECT_CALL(*this, OnError(ANSWER_TIME_OUT)) - .WillOnce(InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit)); - session_->OnAnswer("mirroring", std::vector<FrameSenderConfig>(), - std::vector<FrameSenderConfig>(), ReceiverResponse()); - run_loop.Run(); - } - scoped_task_environment_.RunUntilIdle(); + CreateSession(DeviceCapability::AUDIO_AND_VIDEO); + SignalAnswerTimeout(); +} + +TEST_F(SessionTest, SwitchToAndFromRemoting) { + CreateSession(DeviceCapability::AUDIO_AND_VIDEO); + StartSession(); + SendRemotingCapabilities(); + StartRemoting(); + RemotingStarted(); + StopRemoting(); + StopSession(); +} + +TEST_F(SessionTest, StopSessionWhileRemoting) { + CreateSession(DeviceCapability::AUDIO_AND_VIDEO); + StartSession(); + SendRemotingCapabilities(); + StartRemoting(); + RemotingStarted(); + StopSession(); +} + +TEST_F(SessionTest, StartRemotingFailed) { + CreateSession(DeviceCapability::AUDIO_AND_VIDEO); + StartSession(); + SendRemotingCapabilities(); + StartRemoting(); + SignalAnswerTimeout(); + // Resume mirroring. + SendAnswer(); + CaptureOneVideoFrame(); + StopSession(); } } // namespace mirroring |