diff options
Diffstat (limited to 'chromium/third_party/nearby/src/cpp/core/internal/mediums/webrtc.cc')
-rw-r--r-- | chromium/third_party/nearby/src/cpp/core/internal/mediums/webrtc.cc | 744 |
1 files changed, 0 insertions, 744 deletions
diff --git a/chromium/third_party/nearby/src/cpp/core/internal/mediums/webrtc.cc b/chromium/third_party/nearby/src/cpp/core/internal/mediums/webrtc.cc deleted file mode 100644 index 1edcf63965a..00000000000 --- a/chromium/third_party/nearby/src/cpp/core/internal/mediums/webrtc.cc +++ /dev/null @@ -1,744 +0,0 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "core/internal/mediums/webrtc.h" - -#include <functional> -#include <memory> - -#include "absl/functional/bind_front.h" -#include "absl/strings/str_cat.h" -#include "absl/time/time.h" -#include "core/internal/mediums/webrtc/session_description_wrapper.h" -#include "core/internal/mediums/webrtc/signaling_frames.h" -#include "core/internal/mediums/webrtc_socket_wrapper.h" -#include "platform/base/byte_array.h" -#include "platform/base/listeners.h" -#include "platform/public/cancelable_alarm.h" -#include "platform/public/future.h" -#include "platform/public/logging.h" -#include "platform/public/mutex_lock.h" -#include "proto/mediums/web_rtc_signaling_frames.pb.h" -#include "webrtc/api/jsep.h" - -namespace location { -namespace nearby { -namespace connections { -namespace mediums { - -namespace { - -// The maximum amount of time to wait to connect to a data channel via WebRTC. -constexpr absl::Duration kDataChannelTimeout = absl::Seconds(10); - -// Delay between restarting signaling messenger to receive messages. -constexpr absl::Duration kRestartReceiveMessagesDuration = absl::Seconds(60); - -} // namespace - -WebRtc::WebRtc() = default; - -WebRtc::~WebRtc() { - // This ensures that all pending callbacks are run before we reset the medium - // and we are not accepting new runnables. - single_thread_executor_.Shutdown(); - - // Stop accepting all connections - absl::flat_hash_set<std::string> service_ids; - for (auto& item : accepting_connections_info_) { - service_ids.emplace(item.first); - } - for (const auto& service_id : service_ids) { - StopAcceptingConnections(service_id); - } -} - -const std::string WebRtc::GetDefaultCountryCode() { - return medium_.GetDefaultCountryCode(); -} - -bool WebRtc::IsAvailable() { return medium_.IsValid(); } - -bool WebRtc::IsAcceptingConnections(const std::string& service_id) { - MutexLock lock(&mutex_); - return IsAcceptingConnectionsLocked(service_id); -} - -bool WebRtc::IsAcceptingConnectionsLocked(const std::string& service_id) { - return accepting_connections_info_.contains(service_id); -} - -bool WebRtc::StartAcceptingConnections(const std::string& service_id, - const WebrtcPeerId& self_peer_id, - const LocationHint& location_hint, - AcceptedConnectionCallback callback) { - MutexLock lock(&mutex_); - if (!IsAvailable()) { - NEARBY_LOG(WARNING, - "Cannot start accepting WebRTC connections because WebRTC is " - "not available."); - return false; - } - - if (IsAcceptingConnectionsLocked(service_id)) { - NEARBY_LOG(WARNING, - "Cannot start accepting WebRTC connections because service %s " - "is already accepting WebRTC connections.", - service_id.c_str()); - return false; - } - - // We'll track our state here, so that we're separated from the other services - // who may be also using WebRTC. - AcceptingConnectionsInfo info = AcceptingConnectionsInfo(); - info.self_peer_id = self_peer_id; - info.accepted_connection_callback = callback; - - // Create a new SignalingMessenger so that we can communicate w/ Tachyon. - info.signaling_messenger = - medium_.GetSignalingMessenger(self_peer_id.GetId(), location_hint); - if (!info.signaling_messenger->IsValid()) { - return false; - } - - // This registers ourselves w/ Tachyon, creating a room from the PeerId. - // This allows a remote device to message us over Tachyon. - if (!info.signaling_messenger->StartReceivingMessages( - absl::bind_front(&WebRtc::OnSignalingMessage, this, service_id), - absl::bind_front(&WebRtc::OnSignalingComplete, this, service_id))) { - info.signaling_messenger.reset(); - return false; - } - - // We'll automatically disconnect from Tachyon after 60sec. When this alarm - // fires, we'll recreate our room so we continue to receive messages. - info.restart_tachyon_receive_messages_alarm = CancelableAlarm( - "restart_receiving_messages_webrtc", - std::bind(&WebRtc::ProcessRestartTachyonReceiveMessages, this, - service_id), - kRestartReceiveMessagesDuration, &single_thread_executor_); - - // Now that we're set up to receive messages, we'll save our state and return - // a successful result. - accepting_connections_info_.emplace(service_id, std::move(info)); - NEARBY_LOG(INFO, - "Started listening for WebRTC connections as %s on service %s", - self_peer_id.GetId().c_str(), service_id.c_str()); - return true; -} - -void WebRtc::StopAcceptingConnections(const std::string& service_id) { - MutexLock lock(&mutex_); - if (!IsAcceptingConnectionsLocked(service_id)) { - NEARBY_LOG(WARNING, - "Cannot stop accepting WebRTC connections because service %s " - "is not accepting WebRTC connections.", - service_id.c_str()); - return; - } - - // Grab our info from the map. - auto& info = accepting_connections_info_.find(service_id)->second; - - // Stop receiving messages from Tachyon. - info.signaling_messenger->StopReceivingMessages(); - info.signaling_messenger.reset(); - - // Cancel the scheduled alarm. - if (info.restart_tachyon_receive_messages_alarm.IsValid()) { - info.restart_tachyon_receive_messages_alarm.Cancel(); - info.restart_tachyon_receive_messages_alarm = CancelableAlarm(); - } - - // If we had any in-progress connections that haven't materialized into full - // DataChannels yet, it's time to shut them down since they can't reach us - // anymore. - absl::flat_hash_set<std::string> peer_ids; - for (auto& item : connection_flows_) { - peer_ids.emplace(item.first); - } - for (const auto& peer_id : peer_ids) { - const auto& entry = connection_flows_.find(peer_id); - // Skip outgoing connections in this step. Start/StopAcceptingConnections - // only deals with incoming connections. - if (requesting_connections_info_.contains(peer_id)) { - continue; - } - - // Skip fully connected connections in this step. If the connection was - // formed while we were accepting connections, then it will stay alive until - // it's explicitly closed. - if (!entry->second->CloseIfNotConnected()) { - continue; - } - - connection_flows_.erase(peer_id); - } - - // Clean up our state. We're now no longer listening for connections. - accepting_connections_info_.erase(service_id); - NEARBY_LOG(INFO, "Stopped listening for WebRTC connections for service %s", - service_id.c_str()); -} - -WebRtcSocketWrapper WebRtc::Connect(const std::string& service_id, - const WebrtcPeerId& remote_peer_id, - const LocationHint& location_hint, - CancellationFlag* cancellation_flag) { - for (int attempts_count = 0; attempts_count < kConnectAttemptsLimit; - attempts_count++) { - auto wrapper_result = AttemptToConnect(service_id, remote_peer_id, - location_hint, cancellation_flag); - if (wrapper_result.IsValid()) { - return wrapper_result; - } - } - return WebRtcSocketWrapper(); -} - -WebRtcSocketWrapper WebRtc::AttemptToConnect( - const std::string& service_id, const WebrtcPeerId& remote_peer_id, - const LocationHint& location_hint, CancellationFlag* cancellation_flag) { - ConnectionRequestInfo info = ConnectionRequestInfo(); - info.self_peer_id = WebrtcPeerId::FromRandom(); - Future<WebRtcSocketWrapper> socket_future = info.socket_future; - - { - MutexLock lock(&mutex_); - if (!IsAvailable()) { - NEARBY_LOG( - WARNING, - "Cannot connect to WebRTC peer %s because WebRTC is not available.", - remote_peer_id.GetId().c_str()); - return WebRtcSocketWrapper(); - } - - if (cancellation_flag->Cancelled()) { - NEARBY_LOGS(INFO) << "Cannot connect with WebRtc due to cancel."; - return WebRtcSocketWrapper(); - } - - // Create a new ConnectionFlow for this connection attempt. - std::unique_ptr<ConnectionFlow> connection_flow = - CreateConnectionFlow(service_id, remote_peer_id); - if (!connection_flow) { - NEARBY_LOG( - INFO, - "Cannot connect to WebRTC peer %s because we failed to create a " - "ConnectionFlow.", - remote_peer_id.GetId().c_str()); - return WebRtcSocketWrapper(); - } - - // Create a new SignalingMessenger so that we can communicate over Tachyon. - info.signaling_messenger = - medium_.GetSignalingMessenger(info.self_peer_id.GetId(), location_hint); - if (!info.signaling_messenger->IsValid()) { - NEARBY_LOG( - INFO, - "Cannot connect to WebRTC peer %s because we failed to create a " - "SignalingMessenger.", - remote_peer_id.GetId().c_str()); - return WebRtcSocketWrapper(); - } - - // This registers ourselves w/ Tachyon, creating a room from the PeerId. - // This allows a remote device to message us over Tachyon. - auto signaling_complete_callback = [socket_future](bool success) mutable { - if (!success) { - socket_future.SetException({Exception::kFailed}); - } - }; - if (!info.signaling_messenger->StartReceivingMessages( - absl::bind_front(&WebRtc::OnSignalingMessage, this, service_id), - signaling_complete_callback)) { - NEARBY_LOG(INFO, - "Cannot connect to WebRTC peer %s because we failed to start " - "receiving messages over Tachyon.", - remote_peer_id.GetId().c_str()); - info.signaling_messenger.reset(); - return WebRtcSocketWrapper(); - } - - // Poke the remote device. This will cause them to send us an Offer. - if (!info.signaling_messenger->SendMessage( - remote_peer_id.GetId(), - webrtc_frames::EncodeReadyForSignalingPoke(info.self_peer_id))) { - NEARBY_LOG(INFO, - "Cannot connect to WebRTC peer %s because we failed to poke " - "the peer over Tachyon.", - remote_peer_id.GetId().c_str()); - info.signaling_messenger.reset(); - return WebRtcSocketWrapper(); - } - - // Create a new ConnectionRequest entry. This map will be used later to look - // up state as we negotiate the connection over Tachyon. - requesting_connections_info_.emplace(remote_peer_id.GetId(), - std::move(info)); - connection_flows_.emplace(remote_peer_id.GetId(), - std::move(connection_flow)); - } - - // Wait for the connection to go through. Don't hold the mutex here so that - // we're not blocking necessary operations. - ExceptionOr<WebRtcSocketWrapper> socket_result = - socket_future.Get(kDataChannelTimeout); - - { - MutexLock lock(&mutex_); - - // Reclaim our info, since we had released ownership while talking to - // Tachyon. - auto& info = - requesting_connections_info_.find(remote_peer_id.GetId())->second; - - // Verify that the connection went through. - if (!socket_result.ok()) { - NEARBY_LOG(INFO, "Failed to connect to WebRTC peer %s.", - remote_peer_id.GetId().c_str()); - RemoveConnectionFlow(remote_peer_id); - info.signaling_messenger.reset(); - requesting_connections_info_.erase(remote_peer_id.GetId()); - return WebRtcSocketWrapper(); - } - - // Clean up our ConnectionRequest. - info.signaling_messenger.reset(); - requesting_connections_info_.erase(remote_peer_id.GetId()); - - // Return the result. - return socket_result.GetResult(); - } -} - -void WebRtc::ProcessLocalIceCandidate( - const std::string& service_id, const WebrtcPeerId& remote_peer_id, - const ::location::nearby::mediums::IceCandidate ice_candidate) { - MutexLock lock(&mutex_); - - // Check first if we have an outgoing request w/ this peer. As this request is - // tied to a specific peer, it takes precedence. - const auto& connection_request_entry = - requesting_connections_info_.find(remote_peer_id.GetId()); - if (connection_request_entry != requesting_connections_info_.end()) { - // Pass the ice candidate to the remote side. - if (!connection_request_entry->second.signaling_messenger->SendMessage( - remote_peer_id.GetId(), - webrtc_frames::EncodeIceCandidates( - connection_request_entry->second.self_peer_id, - {ice_candidate}))) { - NEARBY_LOG(INFO, "Failed to send ice candidate to %s.", - remote_peer_id.GetId().c_str()); - } - - NEARBY_LOG(INFO, "Sent ice candidate to %s.", - remote_peer_id.GetId().c_str()); - return; - } - - // Check next if we're expecting incoming connection requests. - const auto& accepting_connection_entry = - accepting_connections_info_.find(service_id); - if (accepting_connection_entry != accepting_connections_info_.end()) { - // Pass the ice candidate to the remote side. - // TODO(xlythe) Consider not blocking here, since this can eat into the - // connection time - if (!accepting_connection_entry->second.signaling_messenger->SendMessage( - remote_peer_id.GetId(), - webrtc_frames::EncodeIceCandidates( - accepting_connection_entry->second.self_peer_id, - {ice_candidate}))) { - NEARBY_LOG(INFO, "Failed to send ice candidate to %s.", - remote_peer_id.GetId().c_str()); - } - - NEARBY_LOG(INFO, "Sent ice candidate to %s.", - remote_peer_id.GetId().c_str()); - return; - } - - NEARBY_LOG(INFO, - "Skipping restart listening for tachyon inbox messages since we " - "are not accepting connections for service %s.", - service_id.c_str()); -} - -void WebRtc::OnSignalingMessage(const std::string& service_id, - const ByteArray& message) { - OffloadFromThread("rtc-on-signaling-message", [this, service_id, message]() { - ProcessTachyonInboxMessage(service_id, message); - }); -} - -void WebRtc::OnSignalingComplete(const std::string& service_id, bool success) { - NEARBY_LOG(INFO, "Signaling completed with status: %d.", success); - if (success) { - return; - } - - OffloadFromThread("rtc-on-signaling-complete", [this, service_id]() { - MutexLock lock(&mutex_); - const auto& info_entry = accepting_connections_info_.find(service_id); - if (info_entry == accepting_connections_info_.end()) { - return; - } - - if (info_entry->second.restart_accept_connections_count < - kRestartAcceptConnectionsLimit) { - ++info_entry->second.restart_accept_connections_count; - } else { - return; - } - - RestartTachyonReceiveMessages(service_id); - }); -} - -void WebRtc::ProcessTachyonInboxMessage(const std::string& service_id, - const ByteArray& message) { - MutexLock lock(&mutex_); - - // Attempt to parse the incoming message as a WebRtcSignalingFrame. - location::nearby::mediums::WebRtcSignalingFrame frame; - if (!frame.ParseFromString(std::string(message))) { - NEARBY_LOG(WARNING, "Failed to parse signaling message."); - return; - } - - // Ensure that the frame is valid (no missing fields). - if (!frame.has_sender_id()) { - NEARBY_LOG(WARNING, "Invalid WebRTC frame: Sender ID is missing."); - return; - } - WebrtcPeerId remote_peer_id = WebrtcPeerId(frame.sender_id().id()); - - // Depending on the message type, we'll respond as appropriate. - if (requesting_connections_info_.contains(remote_peer_id.GetId())) { - // This is from a peer we have an outgoing connection request with, so we'll - // only process the Answer path. - if (frame.has_offer()) { - ReceiveOffer(remote_peer_id, - SessionDescriptionWrapper( - webrtc_frames::DecodeOffer(frame).release())); - SendAnswer(remote_peer_id); - } else if (frame.has_ice_candidates()) { - ReceiveIceCandidates(remote_peer_id, - webrtc_frames::DecodeIceCandidates(frame)); - } else { - NEARBY_LOG(INFO, "Received unknown WebRTC frame: ignoring."); - } - } else if (IsAcceptingConnectionsLocked(service_id)) { - // We don't have an outgoing connection request with this peer, but we are - // accepting incoming requests so we'll only process the Offer path. - if (frame.has_ready_for_signaling_poke()) { - SendOffer(service_id, remote_peer_id); - } else if (frame.has_answer()) { - ReceiveAnswer(remote_peer_id, - SessionDescriptionWrapper( - webrtc_frames::DecodeAnswer(frame).release())); - } else if (frame.has_ice_candidates()) { - ReceiveIceCandidates(remote_peer_id, - webrtc_frames::DecodeIceCandidates(frame)); - } else { - NEARBY_LOG(INFO, "Received unknown WebRTC frame: ignoring."); - } - } else { - NEARBY_LOG( - INFO, - "Ignoring Tachyon message since we are not accepting connections."); - } -} - -void WebRtc::SendOffer(const std::string& service_id, - const WebrtcPeerId& remote_peer_id) { - std::unique_ptr<ConnectionFlow> connection_flow = - CreateConnectionFlow(service_id, remote_peer_id); - if (!connection_flow) { - NEARBY_LOG(INFO, - "Unable to send offer. Failed to create a ConnectionFlow."); - return; - } - - SessionDescriptionWrapper offer = connection_flow->CreateOffer(); - if (!offer.IsValid()) { - NEARBY_LOG(INFO, - "Unable to send offer. Failed to create our offer locally."); - RemoveConnectionFlow(remote_peer_id); - return; - } - - const webrtc::SessionDescriptionInterface& sdp = offer.GetSdp(); - if (!connection_flow->SetLocalSessionDescription(offer)) { - NEARBY_LOG(INFO, - "Unable to send offer. Failed to register our offer locally."); - RemoveConnectionFlow(remote_peer_id); - return; - } - - // Grab our info from the map. - auto& info = accepting_connections_info_.find(service_id)->second; - - // Pass the offer to the remote side. - if (!info.signaling_messenger->SendMessage( - remote_peer_id.GetId(), - webrtc_frames::EncodeOffer(info.self_peer_id, sdp))) { - NEARBY_LOG(INFO, - "Unable to send offer. Failed to write the offer to the remote " - "peer %s.", - remote_peer_id.GetId().c_str()); - RemoveConnectionFlow(remote_peer_id); - return; - } - - // Store the ConnectionFlow so that other methods can use it later. - connection_flows_.emplace(remote_peer_id.GetId(), std::move(connection_flow)); - NEARBY_LOG(INFO, "Sent offer to %s.", remote_peer_id.GetId().c_str()); -} - -void WebRtc::ReceiveOffer(const WebrtcPeerId& remote_peer_id, - SessionDescriptionWrapper offer) { - const auto& entry = connection_flows_.find(remote_peer_id.GetId()); - if (entry == connection_flows_.end()) { - NEARBY_LOG(INFO, - "Unable to receive offer. Failed to create a ConnectionFlow."); - return; - } - - if (!entry->second->OnOfferReceived(offer)) { - NEARBY_LOG(INFO, "Unable to receive offer. Failed to process the offer."); - RemoveConnectionFlow(remote_peer_id); - } -} - -void WebRtc::SendAnswer(const WebrtcPeerId& remote_peer_id) { - const auto& entry = connection_flows_.find(remote_peer_id.GetId()); - if (entry == connection_flows_.end()) { - NEARBY_LOG(INFO, - "Unable to send answer. Failed to create a ConnectionFlow."); - return; - } - - SessionDescriptionWrapper answer = entry->second->CreateAnswer(); - if (!answer.IsValid()) { - NEARBY_LOG(INFO, - "Unable to send answer. Failed to create our answer locally."); - RemoveConnectionFlow(remote_peer_id); - return; - } - - const webrtc::SessionDescriptionInterface& sdp = answer.GetSdp(); - if (!entry->second->SetLocalSessionDescription(answer)) { - NEARBY_LOG(INFO, - "Unable to send answer. Failed to register our answer locally."); - RemoveConnectionFlow(remote_peer_id); - return; - } - - // Grab our info from the map. - const auto& connection_request_entry = - requesting_connections_info_.find(remote_peer_id.GetId()); - if (connection_request_entry == requesting_connections_info_.end()) { - NEARBY_LOG(INFO, - "Unable to send answer. Failed to find an outgoing connection " - "request."); - RemoveConnectionFlow(remote_peer_id); - return; - } - - // Pass the answer to the remote side. - if (!connection_request_entry->second.signaling_messenger->SendMessage( - remote_peer_id.GetId(), - webrtc_frames::EncodeAnswer( - connection_request_entry->second.self_peer_id, sdp))) { - NEARBY_LOG( - INFO, - "Unable to send answer. Failed to write the answer to the remote " - "peer %s.", - remote_peer_id.GetId().c_str()); - RemoveConnectionFlow(remote_peer_id); - return; - } - - NEARBY_LOG(INFO, "Sent answer to %s.", remote_peer_id.GetId().c_str()); -} - -void WebRtc::ReceiveAnswer(const WebrtcPeerId& remote_peer_id, - SessionDescriptionWrapper answer) { - const auto& entry = connection_flows_.find(remote_peer_id.GetId()); - if (entry == connection_flows_.end()) { - NEARBY_LOG(INFO, - "Unable to receive answer. Failed to create a ConnectionFlow."); - return; - } - - if (!entry->second->OnAnswerReceived(answer)) { - NEARBY_LOG(INFO, "Unable to receive answer. Failed to process the answer."); - RemoveConnectionFlow(remote_peer_id); - } -} - -void WebRtc::ReceiveIceCandidates( - const WebrtcPeerId& remote_peer_id, - std::vector<std::unique_ptr<webrtc::IceCandidateInterface>> - ice_candidates) { - const auto& entry = connection_flows_.find(remote_peer_id.GetId()); - if (entry == connection_flows_.end()) { - NEARBY_LOG( - INFO, - "Unable to receive ice candidates. Failed to create a ConnectionFlow."); - return; - } - - entry->second->OnRemoteIceCandidatesReceived(std::move(ice_candidates)); -} - -void WebRtc::ProcessRestartTachyonReceiveMessages( - const std::string& service_id) { - MutexLock lock(&mutex_); - RestartTachyonReceiveMessages(service_id); -} - -void WebRtc::RestartTachyonReceiveMessages(const std::string& service_id) { - if (!IsAcceptingConnectionsLocked(service_id)) { - NEARBY_LOG(INFO, - "Skipping restart listening for tachyon inbox messages since we " - "are not accepting connections for service %s.", - service_id.c_str()); - return; - } - - // Grab our info from the map. - auto& info = accepting_connections_info_.find(service_id)->second; - - // Ensure we've disconnected from Tachyon. - info.signaling_messenger->StopReceivingMessages(); - - // Attempt to re-register. - if (!info.signaling_messenger->StartReceivingMessages( - absl::bind_front(&WebRtc::OnSignalingMessage, this, service_id), - absl::bind_front(&WebRtc::OnSignalingComplete, this, service_id))) { - NEARBY_LOG(WARNING, - "Failed to restart listening for tachyon inbox messages for " - "service %s since we failed to reach Tachyon.", - service_id.c_str()); - return; - } - - NEARBY_LOG(INFO, - "Successfully restarted listening for tachyon inbox messages on " - "service %s.", - service_id.c_str()); -} - -void WebRtc::ProcessDataChannelOpen(const std::string& service_id, - const WebrtcPeerId& remote_peer_id, - WebRtcSocketWrapper socket_wrapper) { - MutexLock lock(&mutex_); - - // Notify the client of the newly formed socket. - const auto& connection_request_entry = - requesting_connections_info_.find(remote_peer_id.GetId()); - if (connection_request_entry != requesting_connections_info_.end()) { - connection_request_entry->second.socket_future.Set(socket_wrapper); - return; - } - - const auto& accepting_connection_entry = - accepting_connections_info_.find(service_id); - if (accepting_connection_entry != accepting_connections_info_.end()) { - accepting_connection_entry->second.accepted_connection_callback.accepted_cb( - socket_wrapper); - return; - } - - // No one to handle the newly created DataChannel, so we'll just close it. - socket_wrapper.Close(); - NEARBY_LOG(INFO, - "Ignoring new DataChannel because we " - "are not accepting connections for service %s.", - service_id.c_str()); -} - -void WebRtc::ProcessDataChannelClosed(const WebrtcPeerId& remote_peer_id) { - MutexLock lock(&mutex_); - NEARBY_LOG(INFO, - "Data channel has closed, removing connection flow for peer %s.", - remote_peer_id.GetId().c_str()); - - RemoveConnectionFlow(remote_peer_id); -} - -std::unique_ptr<ConnectionFlow> WebRtc::CreateConnectionFlow( - const std::string& service_id, const WebrtcPeerId& remote_peer_id) { - RemoveConnectionFlow(remote_peer_id); - - return ConnectionFlow::Create( - {.local_ice_candidate_found_cb = - {[this, service_id, remote_peer_id]( - const webrtc::IceCandidateInterface* ice_candidate) { - // Note: We need to encode the ice candidate here, before we jump - // off the thread. Otherwise, it gets destroyed and we can't read - // it later. - ::location::nearby::mediums::IceCandidate encoded_ice_candidate = - webrtc_frames::EncodeIceCandidate(*ice_candidate); - OffloadFromThread( - "rtc-ice-candidates", - [this, service_id, remote_peer_id, encoded_ice_candidate]() { - ProcessLocalIceCandidate(service_id, remote_peer_id, - encoded_ice_candidate); - }); - }}}, - { - .data_channel_open_cb = {[this, service_id, remote_peer_id]( - WebRtcSocketWrapper socket_wrapper) { - OffloadFromThread( - "rtc-channel-created", - [this, service_id, remote_peer_id, socket_wrapper]() { - ProcessDataChannelOpen(service_id, remote_peer_id, - socket_wrapper); - }); - }}, - .data_channel_closed_cb = {[this, remote_peer_id]() { - OffloadFromThread("rtc-channel-closed", [this, remote_peer_id]() { - ProcessDataChannelClosed(remote_peer_id); - }); - }}, - }, - medium_); -} - -void WebRtc::RemoveConnectionFlow(const WebrtcPeerId& remote_peer_id) { - if (!connection_flows_.erase(remote_peer_id.GetId())) { - return; - } - - // If we had an outgoing connection request w/ this peer, report the failure - // to the future that's being waited on. - const auto& connection_request_entry = - requesting_connections_info_.find(remote_peer_id.GetId()); - if (connection_request_entry != requesting_connections_info_.end()) { - connection_request_entry->second.socket_future.SetException( - {Exception::kFailed}); - } -} - -void WebRtc::OffloadFromThread(const std::string& name, Runnable runnable) { - single_thread_executor_.Execute(name, std::move(runnable)); -} - -} // namespace mediums -} // namespace connections -} // namespace nearby -} // namespace location |