diff options
Diffstat (limited to 'chromium/components/mirroring/service/session_monitor.cc')
-rw-r--r-- | chromium/components/mirroring/service/session_monitor.cc | 412 |
1 files changed, 412 insertions, 0 deletions
diff --git a/chromium/components/mirroring/service/session_monitor.cc b/chromium/components/mirroring/service/session_monitor.cc new file mode 100644 index 00000000000..94e6d77438a --- /dev/null +++ b/chromium/components/mirroring/service/session_monitor.cc @@ -0,0 +1,412 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/mirroring/service/session_monitor.h" + +#include <string> +#include <vector> + +#include "base/json/json_reader.h" +#include "base/json/json_writer.h" +#include "base/stl_util.h" +#include "components/mirroring/service/value_util.h" +#include "components/mirroring/service/wifi_status_monitor.h" +#include "components/version_info/version_info.h" +#include "media/cast/cast_environment.h" +#include "media/cast/logging/log_serializer.h" +#include "media/cast/logging/logging_defines.h" +#include "media/cast/logging/proto/raw_events.pb.h" +#include "media/cast/logging/raw_event_subscriber_bundle.h" +#include "net/traffic_annotation/network_traffic_annotation.h" +#include "services/network/public/cpp/resource_request.h" +#include "services/network/public/cpp/simple_url_loader.h" + +namespace mirroring { + +namespace { + +// Interval between snapshots of Cast Streaming events/stats. +constexpr base::TimeDelta kSnapshotInterval = + base::TimeDelta::FromMinutes(15); // Typical: 15 min → ~3 MB + +// The maximum number of bytes for receiver's setup info. 256kb should be more +// than sufficient. +constexpr int kMaxSetupResponseSizeBytes = 262144; + +// Returns the number of milliseconds elapsed since epoch. +int32_t ToEpochTime(const base::Time& time) { + return (time - base::Time::UnixEpoch()).InMilliseconds(); +} + +// Helper to parse the response for receiver setup info and update the tags. +bool AddReceiverSetupInfoTags(const std::string& response, base::Value* tags) { + DCHECK(tags); + std::unique_ptr<base::Value> value = base::JSONReader::Read(response); + + std::string build_version; + bool is_connected = false; + bool is_on_ethernet = false; + bool has_update = false; + int32_t uptime_seconds = 0; + + const bool result = + value && value->is_dict() && + GetString(*value, "cast_build_revision", &build_version) && + GetBool(*value, "connected", &is_connected) && + GetBool(*value, "ethernet_connected", &is_on_ethernet) && + GetBool(*value, "has_update", &has_update) && + GetInt(*value, "uptime", &uptime_seconds); + if (result) { + tags->SetKey("receiverVersion", base::Value(build_version)); + tags->SetKey("receiverConnected", base::Value(is_connected)); + tags->SetKey("receiverOnEthernet", base::Value(is_on_ethernet)); + tags->SetKey("receiverHasUpdatePending", base::Value(has_update)); + tags->SetKey("receiverUptimeSeconds", base::Value(uptime_seconds)); + } + return result; +} + +const char* ToErrorMessage(SessionError error) { + switch (error) { + case ANSWER_TIME_OUT: + return "ANSWER response time out"; + case ANSWER_NOT_OK: + return "Received an error ANSWER response"; + case ANSWER_MISMATCHED_CAST_MODE: + return "Unexpected cast mode in ANSWER response."; + case ANSWER_MISMATCHED_SSRC_LENGTH: + return "sendIndexes.length != ssrcs.length in ANSWER"; + case ANSWER_SELECT_MULTIPLE_AUDIO: + return "Receiver selected audio RTP stream twice in ANSWER"; + case ANSWER_SELECT_MULTIPLE_VIDEO: + return "Receiver selected video RTP stream twice in ANSWER"; + case ANSWER_SELECT_INVALID_INDEX: + return "Invalid indexes selected in ANSWER"; + case ANSWER_NO_AUDIO_OR_VIDEO: + return "Incorrect ANSWER message: No audio or Video."; + case AUDIO_CAPTURE_ERROR: + return "Audio capture error"; + case VIDEO_CAPTURE_ERROR: + return "Video capture error"; + case RTP_STREAM_ERROR: + return "RTP stream error"; + case ENCODING_ERROR: + return "Encoding status error"; + case CAST_TRANSPORT_ERROR: + return "Transport error"; + } + return ""; +} + +} // namespace + +SessionMonitor::SessionMonitor( + int max_retention_bytes, + const net::IPAddress& receiver_address, + base::Value session_tags, + network::mojom::URLLoaderFactoryPtr loader_factory, + std::unique_ptr<WifiStatusMonitor> wifi_status_monitor) + : max_retention_bytes_(max_retention_bytes), + receiver_address_(receiver_address), + session_tags_(std::move(session_tags)), + url_loader_factory_(std::move(loader_factory)), + wifi_status_monitor_(std::move(wifi_status_monitor)), + stored_snapshots_bytes_(0), + weak_factory_(this) {} + +SessionMonitor::~SessionMonitor() {} + +void SessionMonitor::StartStreamingSession( + scoped_refptr<media::cast::CastEnvironment> cast_environment, + SessionType session_type, + bool is_remoting) { + DCHECK(!event_subscribers_); + DCHECK(!snapshot_timer_.IsRunning()); + + std::string session_activity = + session_type == AUDIO_AND_VIDEO + ? "audio+video" + : session_type == AUDIO_ONLY ? "audio-only" : "video-only"; + session_activity += is_remoting ? " remoting" : " streaming"; + session_tags_.SetKey("activity", base::Value(session_activity)); + + // Query the receiver setup info at the beginning of each streaming session. + QueryReceiverSetupInfo(); + + // Start collecting Cast Streaming events/stats. + event_subscribers_ = std::make_unique<media::cast::RawEventSubscriberBundle>( + std::move(cast_environment)); + if (session_type != VIDEO_ONLY) + event_subscribers_->AddEventSubscribers(true /* is_audio */); + if (session_type != AUDIO_ONLY) + event_subscribers_->AddEventSubscribers(false /* is_audio */); + + // Start periodically snapshotting Cast Streaming events/stats. + snapshot_timer_.Start(FROM_HERE, kSnapshotInterval, + base::BindRepeating(&SessionMonitor::TakeSnapshot, + base::Unretained(this))); + + start_time_ = base::Time::Now(); +} + +void SessionMonitor::StopStreamingSession() { + if (snapshot_timer_.IsRunning()) { + snapshot_timer_.Stop(); + TakeSnapshot(); // Final snapshot of this streaming session. + } + event_subscribers_.reset(); +} + +void SessionMonitor::OnStreamingError(SessionError error) { + DVLOG(2) << error; + + if (!snapshot_timer_.IsRunning()) + return; // Ignore errors before streaming starts. + // If the error has already been recorded, do not overwrite it with another + // since the first will usually be the most indicative of the problem. + if (error_.has_value()) + return; + error_time_ = base::Time::Now(); + error_.emplace(error); +} + +std::vector<SessionMonitor::EventsAndStats> +SessionMonitor::AssembleBundlesAndClear( + const std::vector<int32_t>& bundle_sizes) { + std::vector<EventsAndStats> bundles; + // If a streaming session is currently active, take a snapshot now so that all + // data collected since the last automatic periodic snapshot is included in + // the bundle. + if (snapshot_timer_.IsRunning()) { + TakeSnapshot(); + snapshot_timer_.Reset(); + } + + for (int32_t max_bytes : bundle_sizes) + bundles.emplace_back(MakeSliceOfSnapshots(max_bytes)); + snapshots_.clear(); + stored_snapshots_bytes_ = 0; + return bundles; +} + +SessionMonitor::EventsAndStats SessionMonitor::MakeSliceOfSnapshots( + int32_t max_bytes) { + // Immediately subtract two bytes for array brackets ("[]") since + // AssembleSnapshotsAndClear() will produce a JSON array of each snapshot's + // stats JSON. + max_bytes -= 2; + base::circular_deque<EventsAndStats> slice; + for (int i = snapshots_.size() - 1; i >= 0; --i) { + max_bytes -= snapshots_[i].second.length() + 1 /* size of the comma */; + // If insufficient bytes remain to retain the current stats JSON, stop + // adding more Snapshots to the slice. + if (max_bytes < 0) + break; + slice.emplace_front(std::make_pair("", snapshots_[i].second)); + // If sufficient bytes remain to include the current events Blob, add it to + // the slice. + if (!snapshots_[i].first.empty()) { + const int32_t events_size = snapshots_[i].first.length(); + if (max_bytes >= events_size) { + slice[0].first = snapshots_[i].first; + max_bytes -= events_size; + } + } + } + + EventsAndStats bundle; + if (slice.empty()) + return bundle; + + bundle.second = "["; + for (size_t i = 0; i < slice.size(); i++) { + // To produce a single events gzipped-data Blob, simply concatenate the + // individual gzipped-data Blobs. The spec for gzip explicitly allows for + // this. :-) + bundle.first += slice[i].first; + // To produce the JSON stats array, concatenate the mix of string and Blob + // objects to produce a single UTF-8 encoded string. + if (i > 0) + bundle.second += ","; + bundle.second += slice[i].second; + } + bundle.second += "]"; + + return bundle; +} + +void SessionMonitor::TakeSnapshot() { + // Session-level tags. + base::Value tags = session_tags_.Clone(); + + // Add snapshot-level tags. + tags.SetKey("startTime", base::Value(ToEpochTime(start_time_))); + const base::Time end_time = base::Time::Now(); + tags.SetKey("endTime", base::Value(ToEpochTime(end_time))); + start_time_ = end_time; + + if (wifi_status_monitor_) { + const std::vector<WifiStatus> wifi_status = + wifi_status_monitor_->GetRecentValues(); + base::Value::ListStorage wifi_status_list; + for (const auto& status : wifi_status) { + base::Value status_value(base::Value::Type::DICTIONARY); + status_value.SetKey("wifiSnr", base::Value(status.snr)); + status_value.SetKey("wifiSpeed", base::Value(status.speed)); + status_value.SetKey("timestamp", + base::Value(ToEpochTime(status.timestamp))); + wifi_status_list.emplace_back(std::move(status_value)); + } + tags.SetKey("receiverWifiStatus", base::Value(wifi_status_list)); + } + + // Streaming error tags (if any). + if (error_.has_value()) { + tags.SetKey("streamingErrorTime", base::Value(ToEpochTime(error_time_))); + tags.SetKey("streamingErrorMessage", + base::Value(ToErrorMessage(error_.value()))); + error_.reset(); + } + + std::string tags_string; + base::JSONWriter::Write(tags, &tags_string); + + // Collect raw events. + std::string events = GetEventLogsAndReset(true, tags_string) + + GetEventLogsAndReset(false, tags_string); + + // Collect stats. + std::unique_ptr<base::DictionaryValue> audio_stats = + base::DictionaryValue::From(GetStatsAndReset(true)); + std::unique_ptr<base::DictionaryValue> video_stats = + base::DictionaryValue::From(GetStatsAndReset(false)); + base::DictionaryValue stats; + if (audio_stats) + stats.MergeDictionary(audio_stats.get()); + if (video_stats) + stats.MergeDictionary(video_stats.get()); + stats.SetKey("tags", std::move(tags)); + std::string stats_string; + base::JSONWriter::Write(stats, &stats_string); + + int snapshots_bytes = + stored_snapshots_bytes_ + events.size() + stats_string.size(); + // Prune |snapshots_| if necessary. + while (snapshots_bytes > max_retention_bytes_) { + snapshots_bytes -= snapshots_[0].first.size(); + snapshots_[0].first = std::string(); + if (snapshots_bytes <= max_retention_bytes_) + break; + snapshots_bytes -= snapshots_[0].second.size(); + snapshots_.pop_front(); + } + snapshots_.emplace_back(std::make_pair(events, stats_string)); + stored_snapshots_bytes_ = snapshots_bytes; +} + +std::string SessionMonitor::GetEventLogsAndReset( + bool is_audio, + const std::string& extra_data) { + std::string result; + if (!event_subscribers_.get()) + return result; + + media::cast::EncodingEventSubscriber* subscriber = + event_subscribers_->GetEncodingEventSubscriber(is_audio); + if (!subscriber) + return result; + + media::cast::proto::LogMetadata metadata; + media::cast::FrameEventList frame_events; + media::cast::PacketEventList packet_events; + + subscriber->GetEventsAndReset(&metadata, &frame_events, &packet_events); + + if (!extra_data.empty()) + metadata.set_extra_data(extra_data); + media::cast::proto::GeneralDescription* gen_desc = + metadata.mutable_general_description(); + gen_desc->set_product(version_info::GetProductName()); + gen_desc->set_product_version(version_info::GetVersionNumber()); + gen_desc->set_os(version_info::GetOSType()); + + result.resize(media::cast::kMaxSerializedBytes); + int output_bytes; + // TODO(xjz): media::cast::SerializeEvents() shouldn't require the caller to + // pre-allocate the memory. It should return a string result. + if (media::cast::SerializeEvents(metadata, frame_events, packet_events, + true /* compress */, + media::cast::kMaxSerializedBytes, + base::data(result), &output_bytes)) { + result.resize(output_bytes); + } else { + result.clear(); + } + return result; +} + +std::unique_ptr<base::Value> SessionMonitor::GetStatsAndReset(bool is_audio) { + if (!event_subscribers_.get()) + return nullptr; + + media::cast::StatsEventSubscriber* subscriber = + event_subscribers_->GetStatsEventSubscriber(is_audio); + if (!subscriber) + return nullptr; + + std::unique_ptr<base::Value> stats = subscriber->GetStats(); + subscriber->Reset(); + return stats; +} + +void SessionMonitor::QueryReceiverSetupInfo() { + auto resource_request = std::make_unique<network::ResourceRequest>(); + resource_request->method = "GET"; + resource_request->url = GURL("http://" + receiver_address_.ToString() + + ":8008/setup/eureka_info"); + net::NetworkTrafficAnnotationTag traffic_annotation = + net::DefineNetworkTrafficAnnotation("mirroring_get_setup_info", R"( + semantics { + sender: "Mirroring Service" + description: + "Mirroring Service sends a request to the receiver to obtain its " + "setup info such as the build version, the model name, etc. The " + "data is included in mirroring feedback for analysis." + trigger: + "A tab/desktop mirroring session starts." + data: "An HTTP GET request." + destination: OTHER + destination_other: + "A mirroring receiver, such as a ChromeCast device." + } + policy { + cookies_allowed: NO + setting: "This feature cannot be disabled in settings." + chrome_policy { + EnableMediaRouter { + EnableMediaRouter: false + } + } + })"); + std::unique_ptr<network::SimpleURLLoader> url_loader = + network::SimpleURLLoader::Create(std::move(resource_request), + traffic_annotation); + network::SimpleURLLoader* url_loader_ptr = url_loader.get(); + url_loader_ptr->DownloadToString( + url_loader_factory_.get(), + base::BindOnce( + [](base::WeakPtr<SessionMonitor> monitor, + std::unique_ptr<network::SimpleURLLoader> url_loader, + std::unique_ptr<std::string> response) { + if (monitor) { + if (url_loader->NetError() != net::OK || + !AddReceiverSetupInfoTags(*response, &monitor->session_tags_)) + VLOG(2) << "Unable to fetch/parse receiver setup info."; + } + }, + weak_factory_.GetWeakPtr(), std::move(url_loader)), + kMaxSetupResponseSizeBytes); +} + +} // namespace mirroring |