+// Copyright 2018 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "components/mirroring/service/session_monitor.h"
+#include <string>
+#include <vector>
+#include "base/json/json_reader.h"
+#include "base/json/json_writer.h"
+#include "base/stl_util.h"
+#include "components/mirroring/service/value_util.h"
+#include "components/mirroring/service/wifi_status_monitor.h"
+#include "components/version_info/version_info.h"
+#include "media/cast/cast_environment.h"
+#include "media/cast/logging/log_serializer.h"
+#include "media/cast/logging/logging_defines.h"
+#include "media/cast/logging/proto/raw_events.pb.h"
+#include "media/cast/logging/raw_event_subscriber_bundle.h"
+#include "net/traffic_annotation/network_traffic_annotation.h"
+#include "services/network/public/cpp/resource_request.h"
+#include "services/network/public/cpp/simple_url_loader.h"
+namespace mirroring {
+namespace {
+// Interval between snapshots of Cast Streaming events/stats.
+constexpr base::TimeDelta kSnapshotInterval =
+ base::TimeDelta::FromMinutes(15); // Typical: 15 min → ~3 MB
+// The maximum number of bytes for receiver's setup info. 256kb should be more
+// than sufficient.
+constexpr int kMaxSetupResponseSizeBytes = 262144;
+// Returns the number of milliseconds elapsed since epoch.
+int32_t ToEpochTime(const base::Time& time) {
+ return (time - base::Time::UnixEpoch()).InMilliseconds();
+// Helper to parse the response for receiver setup info and update the tags.
+bool AddReceiverSetupInfoTags(const std::string& response, base::Value* tags) {
+ DCHECK(tags);
+ std::unique_ptr<base::Value> value = base::JSONReader::Read(response);
+ std::string build_version;
+ bool is_connected = false;
+ bool is_on_ethernet = false;
+ bool has_update = false;
+ int32_t uptime_seconds = 0;
+ const bool result =
+ value && value->is_dict() &&
+ GetString(*value, "cast_build_revision", &build_version) &&
+ GetBool(*value, "connected", &is_connected) &&
+ GetBool(*value, "ethernet_connected", &is_on_ethernet) &&
+ GetBool(*value, "has_update", &has_update) &&
+ GetInt(*value, "uptime", &uptime_seconds);
+ if (result) {
+ tags->SetKey("receiverVersion", base::Value(build_version));
+ tags->SetKey("receiverConnected", base::Value(is_connected));
+ tags->SetKey("receiverOnEthernet", base::Value(is_on_ethernet));
+ tags->SetKey("receiverHasUpdatePending", base::Value(has_update));
+ tags->SetKey("receiverUptimeSeconds", base::Value(uptime_seconds));
+ }
+ return result;
+const char* ToErrorMessage(SessionError error) {
+ switch (error) {
+ return "ANSWER response time out";
+ return "Received an error ANSWER response";
+ return "Unexpected cast mode in ANSWER response.";
+ return "sendIndexes.length != ssrcs.length in ANSWER";
+ return "Receiver selected audio RTP stream twice in ANSWER";
+ return "Receiver selected video RTP stream twice in ANSWER";
+ return "Invalid indexes selected in ANSWER";
+ return "Incorrect ANSWER message: No audio or Video.";
+ return "Audio capture error";
+ return "Video capture error";
+ return "RTP stream error";
+ return "Encoding status error";
+ return "Transport error";
+ }
+ return "";
+} // namespace
+ int max_retention_bytes,
+ const net::IPAddress& receiver_address,
+ base::Value session_tags,
+ network::mojom::URLLoaderFactoryPtr loader_factory,
+ std::unique_ptr<WifiStatusMonitor> wifi_status_monitor)
+ : max_retention_bytes_(max_retention_bytes),
+ receiver_address_(receiver_address),
+ session_tags_(std::move(session_tags)),
+ url_loader_factory_(std::move(loader_factory)),
+ wifi_status_monitor_(std::move(wifi_status_monitor)),
+ stored_snapshots_bytes_(0),
+ weak_factory_(this) {}
+SessionMonitor::~SessionMonitor() {}
+void SessionMonitor::StartStreamingSession(
+ scoped_refptr<media::cast::CastEnvironment> cast_environment,
+ SessionType session_type,
+ bool is_remoting) {
+ DCHECK(!event_subscribers_);
+ DCHECK(!snapshot_timer_.IsRunning());
+ std::string session_activity =
+ session_type == AUDIO_AND_VIDEO
+ ? "audio+video"
+ : session_type == AUDIO_ONLY ? "audio-only" : "video-only";
+ session_activity += is_remoting ? " remoting" : " streaming";
+ session_tags_.SetKey("activity", base::Value(session_activity));
+ // Query the receiver setup info at the beginning of each streaming session.
+ QueryReceiverSetupInfo();
+ // Start collecting Cast Streaming events/stats.
+ event_subscribers_ = std::make_unique<media::cast::RawEventSubscriberBundle>(
+ std::move(cast_environment));
+ if (session_type != VIDEO_ONLY)
+ event_subscribers_->AddEventSubscribers(true /* is_audio */);
+ if (session_type != AUDIO_ONLY)
+ event_subscribers_->AddEventSubscribers(false /* is_audio */);
+ // Start periodically snapshotting Cast Streaming events/stats.
+ snapshot_timer_.Start(FROM_HERE, kSnapshotInterval,
+ base::BindRepeating(&SessionMonitor::TakeSnapshot,
+ base::Unretained(this)));
+ start_time_ = base::Time::Now();
+void SessionMonitor::StopStreamingSession() {
+ if (snapshot_timer_.IsRunning()) {
+ snapshot_timer_.Stop();
+ TakeSnapshot(); // Final snapshot of this streaming session.
+ }
+ event_subscribers_.reset();
+void SessionMonitor::OnStreamingError(SessionError error) {
+ DVLOG(2) << error;
+ if (!snapshot_timer_.IsRunning())
+ return; // Ignore errors before streaming starts.
+ // If the error has already been recorded, do not overwrite it with another
+ // since the first will usually be the most indicative of the problem.
+ if (error_.has_value())
+ return;
+ error_time_ = base::Time::Now();
+ error_.emplace(error);
+ const std::vector<int32_t>& bundle_sizes) {
+ std::vector<EventsAndStats> bundles;
+ // If a streaming session is currently active, take a snapshot now so that all
+ // data collected since the last automatic periodic snapshot is included in
+ // the bundle.
+ if (snapshot_timer_.IsRunning()) {
+ TakeSnapshot();
+ snapshot_timer_.Reset();
+ }
+ for (int32_t max_bytes : bundle_sizes)
+ bundles.emplace_back(MakeSliceOfSnapshots(max_bytes));
+ snapshots_.clear();
+ stored_snapshots_bytes_ = 0;
+ return bundles;
+SessionMonitor::EventsAndStats SessionMonitor::MakeSliceOfSnapshots(
+ int32_t max_bytes) {
+ // Immediately subtract two bytes for array brackets ("[]") since
+ // AssembleSnapshotsAndClear() will produce a JSON array of each snapshot's
+ // stats JSON.
+ max_bytes -= 2;
+ base::circular_deque<EventsAndStats> slice;
+ for (int i = snapshots_.size() - 1; i >= 0; --i) {
+ max_bytes -= snapshots_[i].second.length() + 1 /* size of the comma */;
+ // If insufficient bytes remain to retain the current stats JSON, stop
+ // adding more Snapshots to the slice.
+ if (max_bytes < 0)
+ break;
+ slice.emplace_front(std::make_pair("", snapshots_[i].second));
+ // If sufficient bytes remain to include the current events Blob, add it to
+ // the slice.
+ if (!snapshots_[i].first.empty()) {
+ const int32_t events_size = snapshots_[i].first.length();
+ if (max_bytes >= events_size) {
+ slice[0].first = snapshots_[i].first;
+ max_bytes -= events_size;
+ }
+ }
+ }
+ EventsAndStats bundle;
+ if (slice.empty())
+ return bundle;
+ bundle.second = "[";
+ for (size_t i = 0; i < slice.size(); i++) {
+ // To produce a single events gzipped-data Blob, simply concatenate the
+ // individual gzipped-data Blobs. The spec for gzip explicitly allows for
+ // this. :-)
+ bundle.first += slice[i].first;
+ // To produce the JSON stats array, concatenate the mix of string and Blob
+ // objects to produce a single UTF-8 encoded string.
+ if (i > 0)
+ bundle.second += ",";
+ bundle.second += slice[i].second;
+ }
+ bundle.second += "]";
+ return bundle;
+void SessionMonitor::TakeSnapshot() {
+ // Session-level tags.
+ base::Value tags = session_tags_.Clone();
+ // Add snapshot-level tags.
+ tags.SetKey("startTime", base::Value(ToEpochTime(start_time_)));
+ const base::Time end_time = base::Time::Now();
+ tags.SetKey("endTime", base::Value(ToEpochTime(end_time)));
+ start_time_ = end_time;
+ if (wifi_status_monitor_) {
+ const std::vector<WifiStatus> wifi_status =
+ wifi_status_monitor_->GetRecentValues();
+ base::Value::ListStorage wifi_status_list;
+ for (const auto& status : wifi_status) {
+ base::Value status_value(base::Value::Type::DICTIONARY);
+ status_value.SetKey("wifiSnr", base::Value(status.snr));
+ status_value.SetKey("wifiSpeed", base::Value(status.speed));
+ status_value.SetKey("timestamp",
+ base::Value(ToEpochTime(status.timestamp)));
+ wifi_status_list.emplace_back(std::move(status_value));
+ }
+ tags.SetKey("receiverWifiStatus", base::Value(wifi_status_list));
+ }
+ // Streaming error tags (if any).
+ if (error_.has_value()) {
+ tags.SetKey("streamingErrorTime", base::Value(ToEpochTime(error_time_)));
+ tags.SetKey("streamingErrorMessage",
+ base::Value(ToErrorMessage(error_.value())));
+ error_.reset();
+ }
+ std::string tags_string;
+ base::JSONWriter::Write(tags, &tags_string);
+ // Collect raw events.
+ std::string events = GetEventLogsAndReset(true, tags_string) +
+ GetEventLogsAndReset(false, tags_string);
+ // Collect stats.
+ std::unique_ptr<base::DictionaryValue> audio_stats =
+ base::DictionaryValue::From(GetStatsAndReset(true));
+ std::unique_ptr<base::DictionaryValue> video_stats =
+ base::DictionaryValue::From(GetStatsAndReset(false));
+ base::DictionaryValue stats;
+ if (audio_stats)
+ stats.MergeDictionary(audio_stats.get());
+ if (video_stats)
+ stats.MergeDictionary(video_stats.get());
+ stats.SetKey("tags", std::move(tags));
+ std::string stats_string;
+ base::JSONWriter::Write(stats, &stats_string);
+ int snapshots_bytes =
+ stored_snapshots_bytes_ + events.size() + stats_string.size();
+ // Prune |snapshots_| if necessary.
+ while (snapshots_bytes > max_retention_bytes_) {
+ snapshots_bytes -= snapshots_[0].first.size();
+ snapshots_[0].first = std::string();
+ if (snapshots_bytes <= max_retention_bytes_)
+ break;
+ snapshots_bytes -= snapshots_[0].second.size();
+ snapshots_.pop_front();
+ }
+ snapshots_.emplace_back(std::make_pair(events, stats_string));
+ stored_snapshots_bytes_ = snapshots_bytes;
+std::string SessionMonitor::GetEventLogsAndReset(
+ bool is_audio,
+ const std::string& extra_data) {
+ std::string result;
+ if (!event_subscribers_.get())
+ return result;
+ media::cast::EncodingEventSubscriber* subscriber =
+ event_subscribers_->GetEncodingEventSubscriber(is_audio);
+ if (!subscriber)
+ return result;
+ media::cast::proto::LogMetadata metadata;
+ media::cast::FrameEventList frame_events;
+ media::cast::PacketEventList packet_events;
+ subscriber->GetEventsAndReset(&metadata, &frame_events, &packet_events);
+ if (!extra_data.empty())
+ metadata.set_extra_data(extra_data);
+ media::cast::proto::GeneralDescription* gen_desc =
+ metadata.mutable_general_description();
+ gen_desc->set_product(version_info::GetProductName());
+ gen_desc->set_product_version(version_info::GetVersionNumber());
+ gen_desc->set_os(version_info::GetOSType());
+ result.resize(media::cast::kMaxSerializedBytes);
+ int output_bytes;
+ // TODO(xjz): media::cast::SerializeEvents() shouldn't require the caller to
+ // pre-allocate the memory. It should return a string result.
+ if (media::cast::SerializeEvents(metadata, frame_events, packet_events,
+ true /* compress */,
+ media::cast::kMaxSerializedBytes,
+ base::data(result), &output_bytes)) {
+ result.resize(output_bytes);
+ } else {
+ result.clear();
+ }
+ return result;
+std::unique_ptr<base::Value> SessionMonitor::GetStatsAndReset(bool is_audio) {
+ if (!event_subscribers_.get())
+ return nullptr;
+ media::cast::StatsEventSubscriber* subscriber =
+ event_subscribers_->GetStatsEventSubscriber(is_audio);
+ if (!subscriber)
+ return nullptr;
+ std::unique_ptr<base::Value> stats = subscriber->GetStats();
+ subscriber->Reset();
+ return stats;
+void SessionMonitor::QueryReceiverSetupInfo() {
+ auto resource_request = std::make_unique<network::ResourceRequest>();
+ resource_request->method = "GET";
+ resource_request->url = GURL("http://" + receiver_address_.ToString() +
+ ":8008/setup/eureka_info");
+ net::NetworkTrafficAnnotationTag traffic_annotation =
+ net::DefineNetworkTrafficAnnotation("mirroring_get_setup_info", R"(
+ semantics {
+ sender: "Mirroring Service"
+ description:
+ "Mirroring Service sends a request to the receiver to obtain its "
+ "setup info such as the build version, the model name, etc. The "
+ "data is included in mirroring feedback for analysis."
+ trigger:
+ "A tab/desktop mirroring session starts."
+ data: "An HTTP GET request."
+ destination: OTHER
+ destination_other:
+ "A mirroring receiver, such as a ChromeCast device."
+ }
+ policy {
+ cookies_allowed: NO
+ setting: "This feature cannot be disabled in settings."
+ chrome_policy {
+ EnableMediaRouter {
+ EnableMediaRouter: false
+ }
+ }
+ })");
+ std::unique_ptr<network::SimpleURLLoader> url_loader =
+ network::SimpleURLLoader::Create(std::move(resource_request),
+ traffic_annotation);
+ network::SimpleURLLoader* url_loader_ptr = url_loader.get();
+ url_loader_ptr->DownloadToString(
+ url_loader_factory_.get(),
+ base::BindOnce(
+ [](base::WeakPtr<SessionMonitor> monitor,
+ std::unique_ptr<network::SimpleURLLoader> url_loader,
+ std::unique_ptr<std::string> response) {
+ if (monitor) {
+ if (url_loader->NetError() != net::OK ||
+ !AddReceiverSetupInfoTags(*response, &monitor->session_tags_))
+ VLOG(2) << "Unable to fetch/parse receiver setup info.";
+ }
+ },
+ weak_factory_.GetWeakPtr(), std::move(url_loader)),
+ kMaxSetupResponseSizeBytes);
+} // namespace mirroring