diff options
Diffstat (limited to 'chromium/third_party/nearby/src/cpp/core/internal/base_pcp_handler.cc')
-rw-r--r-- | chromium/third_party/nearby/src/cpp/core/internal/base_pcp_handler.cc | 1533 |
1 files changed, 0 insertions, 1533 deletions
diff --git a/chromium/third_party/nearby/src/cpp/core/internal/base_pcp_handler.cc b/chromium/third_party/nearby/src/cpp/core/internal/base_pcp_handler.cc deleted file mode 100644 index 8e3bf3e9455..00000000000 --- a/chromium/third_party/nearby/src/cpp/core/internal/base_pcp_handler.cc +++ /dev/null @@ -1,1533 +0,0 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "core/internal/base_pcp_handler.h" - -#include <cassert> -#include <cinttypes> -#include <cstdlib> -#include <limits> -#include <memory> - -#include "securegcm/d2d_connection_context_v1.h" -#include "securegcm/ukey2_handshake.h" -#include "absl/container/flat_hash_set.h" -#include "absl/strings/escaping.h" -#include "absl/types/span.h" -#include "core/internal/mediums/utils.h" -#include "core/internal/offline_frames.h" -#include "core/internal/pcp_handler.h" -#include "core/options.h" -#include "platform/base/base64_utils.h" -#include "platform/base/bluetooth_utils.h" -#include "platform/public/logging.h" -#include "platform/public/system_clock.h" - -namespace location { -namespace nearby { -namespace connections { - -using ::location::nearby::proto::connections::Medium; -using ::securegcm::UKey2Handshake; - -constexpr absl::Duration BasePcpHandler::kConnectionRequestReadTimeout; -constexpr absl::Duration BasePcpHandler::kRejectedConnectionCloseDelay; - -BasePcpHandler::BasePcpHandler(Mediums* mediums, - EndpointManager* endpoint_manager, - EndpointChannelManager* channel_manager, - BwuManager* bwu_manager, Pcp pcp) - : mediums_(mediums), - endpoint_manager_(endpoint_manager), - channel_manager_(channel_manager), - pcp_(pcp), - bwu_manager_(bwu_manager) {} - -BasePcpHandler::~BasePcpHandler() { - NEARBY_LOGS(INFO) << "Initiating shutdown of BasePcpHandler(" - << strategy_.GetName() << ")"; - DisconnectFromEndpointManager(); - // Stop all the ongoing Runnables (as gracefully as possible). - NEARBY_LOGS(INFO) << "BasePcpHandler(" << strategy_.GetName() - << ") is bringing down executors."; - serial_executor_.Shutdown(); - alarm_executor_.Shutdown(); - NEARBY_LOGS(INFO) << "BasePcpHandler(" << strategy_.GetName() - << ") has shut down."; -} - -void BasePcpHandler::DisconnectFromEndpointManager() { - if (stop_.Set(true)) return; - NEARBY_LOGS(INFO) << "BasePcpHandler(" << strategy_.GetName() - << ") unregister from EPM."; - // Unregister ourselves from EPM message dispatcher. - endpoint_manager_->UnregisterFrameProcessor(V1Frame::CONNECTION_RESPONSE, - this); -} - -Status BasePcpHandler::StartAdvertising(ClientProxy* client, - const std::string& service_id, - const ConnectionOptions& options, - const ConnectionRequestInfo& info) { - Future<Status> response; - NEARBY_LOGS(INFO) << "StartAdvertising with supported mediums: " - << GetStringValueOfSupportedMediums(options); - ConnectionOptions advertising_options = options.CompatibleOptions(); - RunOnPcpHandlerThread( - "start-advertising", - [this, client, &service_id, &info, &advertising_options, &response]() - RUN_ON_PCP_HANDLER_THREAD() { - // The endpoint id inside of the advertisement is different to high - // visibility and low visibility mode. In order to decide if client - // should grab the high visibility or low visibility id, it needs to - // tell client which one right now, before - // client#StartedAdvertising. - if (ShouldEnterHighVisibilityMode(advertising_options)) { - client->EnterHighVisibilityMode(); - } - - auto result = StartAdvertisingImpl( - client, service_id, client->GetLocalEndpointId(), - info.endpoint_info, advertising_options); - if (!result.status.Ok()) { - client->ExitHighVisibilityMode(); - response.Set(result.status); - return; - } - - // Now that we've succeeded, mark the client as advertising. - // Save the advertising options for local reference in later process - // like upgrading bandwidth. - advertising_listener_ = info.listener; - client->StartedAdvertising(service_id, GetStrategy(), info.listener, - absl::MakeSpan(result.mediums), - advertising_options); - response.Set({Status::kSuccess}); - }); - return WaitForResult(absl::StrCat("StartAdvertising(", service_id, ")"), - client->GetClientId(), &response); -} - -void BasePcpHandler::StopAdvertising(ClientProxy* client) { - NEARBY_LOGS(INFO) << "StopAdvertising local_endpoint_id=" - << client->GetLocalEndpointId(); - CountDownLatch latch(1); - RunOnPcpHandlerThread("stop-advertising", - [this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() { - StopAdvertisingImpl(client); - client->StoppedAdvertising(); - latch.CountDown(); - }); - WaitForLatch("StopAdvertising", &latch); -} - -std::string BasePcpHandler::GetStringValueOfSupportedMediums( - const ConnectionOptions& options) const { - std::ostringstream result; - result << "{ "; - if (options.allowed.bluetooth) { - result << proto::connections::Medium_Name(Medium::BLUETOOTH) << " "; - } - if (options.allowed.ble) { - result << proto::connections::Medium_Name(Medium::BLE) << " "; - } - if (options.allowed.web_rtc) { - result << proto::connections::Medium_Name(Medium::WEB_RTC) << " "; - } - if (options.allowed.wifi_lan) { - result << proto::connections::Medium_Name(Medium::WIFI_LAN) << " "; - } - result << "}"; - return result.str(); -} - -bool BasePcpHandler::ShouldEnterHighVisibilityMode( - const ConnectionOptions& options) { - return !options.low_power && options.allowed.bluetooth; -} - -BooleanMediumSelector BasePcpHandler::ComputeIntersectionOfSupportedMediums( - const PendingConnectionInfo& connection_info) { - absl::flat_hash_set<Medium> intersection; - auto their_mediums = connection_info.supported_mediums; - - // If no supported mediums were set, use the default upgrade medium. - if (their_mediums.empty()) { - their_mediums.push_back(GetDefaultUpgradeMedium()); - } - - for (Medium my_medium : GetConnectionMediumsByPriority()) { - if (std::find(their_mediums.begin(), their_mediums.end(), my_medium) != - their_mediums.end()) { - // We use advertising options as a proxy to whether or not the local - // client does want to enable a WebRTC upgrade. - if (my_medium == location::nearby::proto::connections::Medium::WEB_RTC) { - ConnectionOptions advertising_options = - connection_info.client->GetAdvertisingOptions(); - - if (!advertising_options.enable_webrtc_listening && - !advertising_options.allowed.web_rtc) { - // The local client does not allow WebRTC for listening or upgrades, - // ignore. - continue; - } - } - - intersection.emplace(my_medium); - } - } - - // Not using designated initializers here since the VS C++ compiler errors - // out indicating that MediumSelector<bool> is not an aggregate - MediumSelector<bool> mediumSelector{}; - mediumSelector.bluetooth = intersection.contains(Medium::BLUETOOTH); - mediumSelector.ble = intersection.contains(Medium::BLE); - mediumSelector.web_rtc = intersection.contains(Medium::WEB_RTC); - mediumSelector.wifi_lan = intersection.contains(Medium::WIFI_LAN); - return mediumSelector; -} - -Status BasePcpHandler::StartDiscovery(ClientProxy* client, - const std::string& service_id, - const ConnectionOptions& options, - const DiscoveryListener& listener) { - Future<Status> response; - ConnectionOptions discovery_options = options.CompatibleOptions(); - - NEARBY_LOGS(INFO) << "StartDiscovery with supported mediums:" - << GetStringValueOfSupportedMediums(options); - RunOnPcpHandlerThread( - "start-discovery", [this, client, service_id, discovery_options, - &listener, &response]() RUN_ON_PCP_HANDLER_THREAD() { - // Ask the implementation to attempt to start discovery. - auto result = StartDiscoveryImpl(client, service_id, discovery_options); - if (!result.status.Ok()) { - response.Set(result.status); - return; - } - - // Now that we've succeeded, mark the client as discovering and clear - // out any old endpoints we had discovered. - discovered_endpoints_.clear(); - client->StartedDiscovery(service_id, GetStrategy(), listener, - absl::MakeSpan(result.mediums), - discovery_options); - response.Set({Status::kSuccess}); - }); - return WaitForResult(absl::StrCat("StartDiscovery(", service_id, ")"), - client->GetClientId(), &response); -} - -void BasePcpHandler::StopDiscovery(ClientProxy* client) { - CountDownLatch latch(1); - RunOnPcpHandlerThread("stop-discovery", - [this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() { - StopDiscoveryImpl(client); - client->StoppedDiscovery(); - latch.CountDown(); - }); - - WaitForLatch("StopDiscovery", &latch); -} - -void BasePcpHandler::InjectEndpoint( - ClientProxy* client, const std::string& service_id, - const OutOfBandConnectionMetadata& metadata) { - CountDownLatch latch(1); - RunOnPcpHandlerThread("inject-endpoint", - [this, client, service_id, metadata, &latch]() - RUN_ON_PCP_HANDLER_THREAD() { - InjectEndpointImpl(client, service_id, metadata); - latch.CountDown(); - }); - - WaitForLatch(absl::StrCat("InjectEndpoint(", service_id, ")"), &latch); -} - -void BasePcpHandler::WaitForLatch(const std::string& method_name, - CountDownLatch* latch) { - Exception await_exception = latch->Await(); - if (!await_exception.Ok()) { - if (await_exception.Raised(Exception::kTimeout)) { - NEARBY_LOGS(INFO) << "Blocked in " << method_name; - } - } -} - -Status BasePcpHandler::WaitForResult(const std::string& method_name, - std::int64_t client_id, - Future<Status>* future) { - if (!future) { - NEARBY_LOGS(INFO) << "No future to wait for; return with error"; - return {Status::kError}; - } - NEARBY_LOGS(INFO) << "Waiting for future to complete: " << method_name; - ExceptionOr<Status> result = future->Get(); - if (!result.ok()) { - NEARBY_LOGS(INFO) << "Future:[" << method_name - << "] completed with exception:" << result.exception(); - return {Status::kError}; - } - NEARBY_LOGS(INFO) << "Future:[" << method_name - << "] completed with status:" << result.result().value; - return result.result(); -} - -void BasePcpHandler::RunOnPcpHandlerThread(const std::string& name, - Runnable runnable) { - serial_executor_.Execute(name, std::move(runnable)); -} - -EncryptionRunner::ResultListener BasePcpHandler::GetResultListener() { - return { - .on_success_cb = - [this](const std::string& endpoint_id, - std::unique_ptr<UKey2Handshake> ukey2, - const std::string& auth_token, - const ByteArray& raw_auth_token) { - RunOnPcpHandlerThread( - "encryption-success", - [this, endpoint_id, raw_ukey2 = ukey2.release(), auth_token, - raw_auth_token]() RUN_ON_PCP_HANDLER_THREAD() mutable { - OnEncryptionSuccessRunnable( - endpoint_id, std::unique_ptr<UKey2Handshake>(raw_ukey2), - auth_token, raw_auth_token); - }); - }, - .on_failure_cb = - [this](const std::string& endpoint_id, EndpointChannel* channel) { - RunOnPcpHandlerThread( - "encryption-failure", - [this, endpoint_id, channel]() RUN_ON_PCP_HANDLER_THREAD() { - NEARBY_LOGS(ERROR) - << "Encryption failed for endpoint_id=" << endpoint_id - << " on medium=" - << proto::connections::Medium_Name(channel->GetMedium()); - OnEncryptionFailureRunnable(endpoint_id, channel); - }); - }, - }; -} - -void BasePcpHandler::OnEncryptionSuccessRunnable( - const std::string& endpoint_id, std::unique_ptr<UKey2Handshake> ukey2, - const std::string& auth_token, const ByteArray& raw_auth_token) { - // Quick fail if we've been removed from pending connections while we were - // busy running UKEY2. - auto it = pending_connections_.find(endpoint_id); - if (it == pending_connections_.end()) { - NEARBY_LOGS(INFO) - << "Connection not found on UKEY negotination complete; endpoint_id=" - << endpoint_id; - return; - } - - BasePcpHandler::PendingConnectionInfo& connection_info = it->second; - Medium medium = connection_info.channel->GetMedium(); - - if (!ukey2) { - // Fail early, if there is no crypto context. - ProcessPreConnectionInitiationFailure( - connection_info.client, medium, endpoint_id, - connection_info.channel.get(), connection_info.is_incoming, - connection_info.start_time, {Status::kEndpointIoError}, - connection_info.result.lock().get()); - return; - } - - connection_info.SetCryptoContext(std::move(ukey2)); - connection_info.connection_token = GetHashedConnectionToken(raw_auth_token); - NEARBY_LOGS(INFO) - << "Register encrypted connection; wait for response; endpoint_id=" - << endpoint_id; - - // Set ourselves up so that we receive all acceptance/rejection messages - endpoint_manager_->RegisterFrameProcessor(V1Frame::CONNECTION_RESPONSE, this); - - // Now we register our endpoint so that we can listen for both sides to - // accept. - endpoint_manager_->RegisterEndpoint( - connection_info.client, endpoint_id, - { - .remote_endpoint_info = connection_info.remote_endpoint_info, - .authentication_token = auth_token, - .raw_authentication_token = raw_auth_token, - .is_incoming_connection = connection_info.is_incoming, - }, - { - .strategy = connection_info.options.strategy, - .allowed = ComputeIntersectionOfSupportedMediums(connection_info), - .auto_upgrade_bandwidth = - connection_info.options.auto_upgrade_bandwidth, - .enforce_topology_constraints = - connection_info.options.enforce_topology_constraints, - .low_power = connection_info.options.low_power, - .enable_bluetooth_listening = - connection_info.options.enable_bluetooth_listening, - .enable_webrtc_listening = - connection_info.options.enable_webrtc_listening, - .is_out_of_band_connection = - connection_info.options.is_out_of_band_connection, - .remote_bluetooth_mac_address = - connection_info.options.remote_bluetooth_mac_address, - .fast_advertisement_service_uuid = - connection_info.options.fast_advertisement_service_uuid, - .keep_alive_interval_millis = - connection_info.options.keep_alive_interval_millis, - .keep_alive_timeout_millis = - connection_info.options.keep_alive_timeout_millis, - }, - std::move(connection_info.channel), connection_info.listener, - connection_info.connection_token); - - if (connection_info.is_incoming) { - connection_info.client->GetAnalyticsRecorder().OnIncomingConnectionAttempt( - proto::connections::INITIAL, medium, proto::connections::RESULT_SUCCESS, - SystemClock::ElapsedRealtime() - connection_info.start_time, - connection_info.connection_token); - } else { - connection_info.client->GetAnalyticsRecorder().OnOutgoingConnectionAttempt( - endpoint_id, proto::connections::INITIAL, medium, - proto::connections::RESULT_SUCCESS, - SystemClock::ElapsedRealtime() - connection_info.start_time, - connection_info.connection_token); - } - - if (auto future_status = connection_info.result.lock()) { - NEARBY_LOGS(INFO) << "Connection established; Finalising future OK."; - future_status->Set({Status::kSuccess}); - connection_info.result.reset(); - } -} - -void BasePcpHandler::OnEncryptionFailureRunnable( - const std::string& endpoint_id, EndpointChannel* endpoint_channel) { - auto it = pending_connections_.find(endpoint_id); - if (it == pending_connections_.end()) { - NEARBY_LOGS(INFO) - << "Connection not found on UKEY negotination complete; endpoint_id=" - << endpoint_id; - return; - } - - BasePcpHandler::PendingConnectionInfo& info = it->second; - // We had a bug here, caused by a race with EncryptionRunner. We now verify - // the EndpointChannel to avoid it. In a simultaneous connection, we clean - // up one of the two EndpointChannels and then update our pendingConnections - // with the winning channel's state. Closing a channel that was in the - // middle of EncryptionRunner would trigger onEncryptionFailed, and, since - // the map had already updated with the winning EndpointChannel, we closed - // it too by accident. - if (*endpoint_channel != *info.channel) { - NEARBY_LOGS(INFO) << "Not destroying channel [mismatch]: passed=" - << endpoint_channel->GetName() - << "; expected=" << info.channel->GetName(); - return; - } - - ProcessPreConnectionInitiationFailure( - info.client, info.channel->GetMedium(), endpoint_id, info.channel.get(), - info.is_incoming, info.start_time, {Status::kEndpointIoError}, - info.result.lock().get()); -} - -Status BasePcpHandler::RequestConnection(ClientProxy* client, - const std::string& endpoint_id, - const ConnectionRequestInfo& info, - const ConnectionOptions& options) { - auto result = std::make_shared<Future<Status>>(); - RunOnPcpHandlerThread( - "request-connection", [this, client, &info, options, endpoint_id, - result]() RUN_ON_PCP_HANDLER_THREAD() { - absl::Time start_time = SystemClock::ElapsedRealtime(); - - // If we already have a pending connection, then we shouldn't allow any - // more outgoing connections to this endpoint. - if (pending_connections_.count(endpoint_id)) { - NEARBY_LOGS(INFO) - << "In requestConnection(), connection requested with " - "endpoint(id=" - << endpoint_id - << "), but we already have a pending connection with them."; - result->Set({Status::kAlreadyConnectedToEndpoint}); - return; - } - - // If our child class says we can't send any more outgoing connections, - // listen to them. - if (ShouldEnforceTopologyConstraints(client->GetAdvertisingOptions()) && - !CanSendOutgoingConnection(client)) { - NEARBY_LOGS(INFO) - << "In requestConnection(), client=" << client->GetClientId() - << " attempted a connection with endpoint(id=" << endpoint_id - << "), but outgoing connections are disallowed"; - result->Set({Status::kOutOfOrderApiCall}); - return; - } - - DiscoveredEndpoint* endpoint = GetDiscoveredEndpoint(endpoint_id); - if (endpoint == nullptr) { - NEARBY_LOGS(INFO) - << "Discovered endpoint not found: endpoint_id=" << endpoint_id; - result->Set({Status::kEndpointUnknown}); - return; - } - - auto remote_bluetooth_mac_address = - BluetoothUtils::ToString(options.remote_bluetooth_mac_address); - if (!remote_bluetooth_mac_address.empty()) { - if (AppendRemoteBluetoothMacAddressEndpoint( - endpoint_id, remote_bluetooth_mac_address, - client->GetDiscoveryOptions())) - NEARBY_LOGS(INFO) - << "Appended remote Bluetooth MAC Address endpoint [" - << remote_bluetooth_mac_address << "]"; - } - - if (AppendWebRTCEndpoint(endpoint_id, client->GetDiscoveryOptions())) - NEARBY_LOGS(INFO) << "Appended Web RTC endpoint."; - - auto discovered_endpoints = GetDiscoveredEndpoints(endpoint_id); - std::unique_ptr<EndpointChannel> channel; - ConnectImplResult connect_impl_result; - - for (auto connect_endpoint : discovered_endpoints) { - absl::Time connect_start_time = SystemClock::ElapsedRealtime(); - if (!MediumSupportedByClientOptions(connect_endpoint->medium, - options)) - continue; - connect_impl_result = ConnectImpl(client, connect_endpoint); - if (connect_impl_result.status.Ok()) { - channel = std::move(connect_impl_result.endpoint_channel); - break; - } else { - LogConnectionAttempt(client, connect_endpoint->medium, - connect_endpoint->endpoint_id, - /* is_incoming = */ false, connect_start_time); - } - } - - Medium channel_medium = - channel ? channel->GetMedium() : Medium::UNKNOWN_MEDIUM; - if (channel == nullptr) { - NEARBY_LOGS(INFO) - << "Endpoint channel not available: endpoint_id=" << endpoint_id; - ProcessPreConnectionInitiationFailure( - client, channel_medium, endpoint_id, channel.get(), - /* is_incoming = */ false, start_time, connect_impl_result.status, - result.get()); - return; - } - - NEARBY_LOGS(INFO) - << "In requestConnection(), wrote ConnectionRequestFrame " - "to endpoint_id=" - << endpoint_id; - // Generate the nonce to use for this connection. - std::int32_t nonce = prng_.NextInt32(); - - // The first message we have to send, after connecting, is to tell the - // endpoint about ourselves. - Exception write_exception = WriteConnectionRequestFrame( - channel.get(), client->GetLocalEndpointId(), info.endpoint_info, - nonce, GetSupportedConnectionMediumsByPriority(options), - options.keep_alive_interval_millis, - options.keep_alive_timeout_millis); - if (!write_exception.Ok()) { - NEARBY_LOGS(INFO) << "Failed to send connection request: endpoint_id=" - << endpoint_id; - ProcessPreConnectionInitiationFailure( - client, channel_medium, endpoint_id, channel.get(), - /* is_incoming = */ false, start_time, {Status::kEndpointIoError}, - result.get()); - return; - } - - NEARBY_LOGS(INFO) << "Adding connection to pending set: endpoint_id=" - << endpoint_id; - - // We've successfully connected to the device, and are now about to jump - // on to the EncryptionRunner thread to start running our encryption - // protocol. We'll mark ourselves as pending in case we get another call - // to RequestConnection or OnIncomingConnection, so that we can cancel - // the connection if needed. - // Not using designated initializers here since the VS C++ compiler - // errors out indicating that MediumSelector<bool> is not an aggregate - PendingConnectionInfo pendingConnectionInfo{}; - pendingConnectionInfo.client = client; - pendingConnectionInfo.remote_endpoint_info = endpoint->endpoint_info; - pendingConnectionInfo.nonce = nonce; - pendingConnectionInfo.is_incoming = false; - pendingConnectionInfo.start_time = start_time; - pendingConnectionInfo.listener = info.listener; - pendingConnectionInfo.options = options; - pendingConnectionInfo.result = result; - pendingConnectionInfo.channel = std::move(channel); - - EndpointChannel* endpoint_channel = - pending_connections_ - .emplace(endpoint_id, std::move(pendingConnectionInfo)) - .first->second.channel.get(); - - NEARBY_LOGS(INFO) << "Initiating secure connection: endpoint_id=" - << endpoint_id; - // Next, we'll set up encryption. When it's done, our future will return - // and RequestConnection() will finish. - encryption_runner_.StartClient(client, endpoint_id, endpoint_channel, - GetResultListener()); - }); - NEARBY_LOGS(INFO) << "Waiting for connection to complete: endpoint_id=" - << endpoint_id; - auto status = - WaitForResult(absl::StrCat("RequestConnection(", endpoint_id, ")"), - client->GetClientId(), result.get()); - NEARBY_LOGS(INFO) << "Wait is complete: endpoint_id=" << endpoint_id - << "; status=" << status.value; - return status; -} - -bool BasePcpHandler::MediumSupportedByClientOptions( - const proto::connections::Medium& medium, - const ConnectionOptions& client_options) const { - for (auto supported_medium : client_options.GetMediums()) { - if (medium == supported_medium) { - return true; - } - } - return false; -} - -// Get ordered supported connection medium based on local advertising/discovery -// option. -std::vector<proto::connections::Medium> -BasePcpHandler::GetSupportedConnectionMediumsByPriority( - const ConnectionOptions& local_option) { - std::vector<proto::connections::Medium> supported_mediums_by_priority; - for (auto medium_by_priority : GetConnectionMediumsByPriority()) { - if (MediumSupportedByClientOptions(medium_by_priority, local_option)) { - supported_mediums_by_priority.push_back(medium_by_priority); - } - } - return supported_mediums_by_priority; -} - -// Get any single discovered endpoint for a given endpoint_id. -BasePcpHandler::DiscoveredEndpoint* BasePcpHandler::GetDiscoveredEndpoint( - const std::string& endpoint_id) { - auto it = discovered_endpoints_.find(endpoint_id); - if (it == discovered_endpoints_.end()) { - return nullptr; - } - return it->second.get(); -} - -std::vector<BasePcpHandler::DiscoveredEndpoint*> -BasePcpHandler::GetDiscoveredEndpoints(const std::string& endpoint_id) { - std::vector<BasePcpHandler::DiscoveredEndpoint*> result; - auto it = discovered_endpoints_.equal_range(endpoint_id); - for (auto item = it.first; item != it.second; item++) { - result.push_back(item->second.get()); - } - std::sort(result.begin(), result.end(), - [this](DiscoveredEndpoint* a, DiscoveredEndpoint* b) -> bool { - return IsPreferred(*a, *b); - }); - - return result; -} - -std::vector<BasePcpHandler::DiscoveredEndpoint*> -BasePcpHandler::GetDiscoveredEndpoints( - const proto::connections::Medium medium) { - std::vector<BasePcpHandler::DiscoveredEndpoint*> result; - for (const auto& item : discovered_endpoints_) { - if (item.second->medium == medium) { - result.push_back(item.second.get()); - } - } - return result; -} - -mediums::WebrtcPeerId BasePcpHandler::CreatePeerIdFromAdvertisement( - const std::string& service_id, const std::string& endpoint_id, - const ByteArray& endpoint_info) { - std::string seed = - absl::StrCat(service_id, endpoint_id, std::string(endpoint_info)); - return mediums::WebrtcPeerId::FromSeed(ByteArray(std::move(seed))); -} - -bool BasePcpHandler::HasOutgoingConnections(ClientProxy* client) const { - for (const auto& item : pending_connections_) { - auto& connection = item.second; - if (!connection.is_incoming) { - return true; - } - } - return client->GetNumOutgoingConnections() > 0; -} - -bool BasePcpHandler::HasIncomingConnections(ClientProxy* client) const { - for (const auto& item : pending_connections_) { - auto& connection = item.second; - if (connection.is_incoming) { - return true; - } - } - return client->GetNumIncomingConnections() > 0; -} - -bool BasePcpHandler::CanSendOutgoingConnection(ClientProxy* client) const { - return true; -} - -bool BasePcpHandler::CanReceiveIncomingConnection(ClientProxy* client) const { - return true; -} - -Exception BasePcpHandler::WriteConnectionRequestFrame( - EndpointChannel* endpoint_channel, const std::string& local_endpoint_id, - const ByteArray& local_endpoint_info, std::int32_t nonce, - const std::vector<proto::connections::Medium>& supported_mediums, - std::int32_t keep_alive_interval_millis, - std::int32_t keep_alive_timeout_millis) { - return endpoint_channel->Write(parser::ForConnectionRequest( - local_endpoint_id, local_endpoint_info, nonce, /*supports_5_ghz =*/false, - /*bssid=*/std::string{}, supported_mediums, keep_alive_interval_millis, - keep_alive_timeout_millis)); -} - -void BasePcpHandler::ProcessPreConnectionInitiationFailure( - ClientProxy* client, Medium medium, const std::string& endpoint_id, - EndpointChannel* channel, bool is_incoming, absl::Time start_time, - Status status, Future<Status>* result) { - if (channel != nullptr) { - channel->Close(); - } - - if (result != nullptr) { - NEARBY_LOGS(INFO) << "Connection failed; aborting future"; - result->Set(status); - } - - LogConnectionAttempt(client, medium, endpoint_id, is_incoming, start_time); - // result is hold inside a swapper, and saved in PendingConnectionInfo. - // PendingConnectionInfo destructor will clear the memory of SettableFuture - // shared_ptr for result. - pending_connections_.erase(endpoint_id); -} - -void BasePcpHandler::ProcessPreConnectionResultFailure( - ClientProxy* client, const std::string& endpoint_id) { - auto item = pending_connections_.extract(endpoint_id); - endpoint_manager_->DiscardEndpoint(client, endpoint_id); - client->OnConnectionRejected(endpoint_id, {Status::kError}); -} - -bool BasePcpHandler::ShouldEnforceTopologyConstraints( - const ConnectionOptions& local_advertising_options) const { - // Topology constraints only matter for the advertiser. - // For discoverers, we'll always enforce them. - if (local_advertising_options.strategy.IsNone()) { - return true; - } - - return local_advertising_options.enforce_topology_constraints; -} - -bool BasePcpHandler::AutoUpgradeBandwidth( - const ConnectionOptions& local_advertising_options) const { - if (local_advertising_options.strategy.IsNone()) { - return true; - } - - return local_advertising_options.auto_upgrade_bandwidth; -} - -Status BasePcpHandler::AcceptConnection( - ClientProxy* client, const std::string& endpoint_id, - const PayloadListener& payload_listener) { - Future<Status> response; - RunOnPcpHandlerThread( - "accept-connection", [this, client, endpoint_id, payload_listener, - &response]() RUN_ON_PCP_HANDLER_THREAD() { - NEARBY_LOGS(INFO) << "AcceptConnection: endpoint_id=" << endpoint_id; - if (!pending_connections_.count(endpoint_id)) { - NEARBY_LOGS(INFO) - << "AcceptConnection: no pending connection for endpoint_id=" - << endpoint_id; - - response.Set({Status::kEndpointUnknown}); - return; - } - auto& connection_info = pending_connections_[endpoint_id]; - - // By this point in the flow, connection_info.channel has been - // nulled out because ownership of that EndpointChannel was passed on to - // EndpointChannelManager via a call to - // EndpointManager::registerEndpoint(), so we now need to get access to - // the EndpointChannel from the authoritative owner. - std::shared_ptr<EndpointChannel> channel = - channel_manager_->GetChannelForEndpoint(endpoint_id); - if (channel == nullptr) { - NEARBY_LOGS(ERROR) << "Channel destroyed before Accept; bring down " - "connection: endpoint_id=" - << endpoint_id; - ProcessPreConnectionResultFailure(client, endpoint_id); - response.Set({Status::kEndpointUnknown}); - return; - } - - Exception write_exception = - channel->Write(parser::ForConnectionResponse(Status::kSuccess)); - if (!write_exception.Ok()) { - NEARBY_LOGS(INFO) - << "AcceptConnection: failed to send response: endpoint_id=" - << endpoint_id; - ProcessPreConnectionResultFailure(client, endpoint_id); - response.Set({Status::kEndpointIoError}); - return; - } - - NEARBY_LOGS(INFO) << "AcceptConnection: accepting locally: endpoint_id=" - << endpoint_id; - connection_info.LocalEndpointAcceptedConnection(endpoint_id, - payload_listener); - EvaluateConnectionResult(client, endpoint_id, - false /* can_close_immediately */); - response.Set({Status::kSuccess}); - }); - - return WaitForResult(absl::StrCat("AcceptConnection(", endpoint_id, ")"), - client->GetClientId(), &response); -} - -Status BasePcpHandler::RejectConnection(ClientProxy* client, - const std::string& endpoint_id) { - Future<Status> response; - RunOnPcpHandlerThread( - "reject-connection", - [this, client, endpoint_id, &response]() RUN_ON_PCP_HANDLER_THREAD() { - NEARBY_LOG(INFO, "RejectConnection: id=%s", endpoint_id.c_str()); - if (!pending_connections_.count(endpoint_id)) { - NEARBY_LOGS(INFO) - << "RejectConnection: no pending connection for endpoint_id=" - << endpoint_id; - response.Set({Status::kEndpointUnknown}); - return; - } - auto& connection_info = pending_connections_[endpoint_id]; - - // By this point in the flow, connection_info->endpoint_channel_ has - // been nulled out because ownership of that EndpointChannel was passed - // on to EndpointChannelManager via a call to - // EndpointManager::registerEndpoint(), so we now need to get access to - // the EndpointChannel from the authoritative owner. - std::shared_ptr<EndpointChannel> channel = - channel_manager_->GetChannelForEndpoint(endpoint_id); - if (channel == nullptr) { - NEARBY_LOGS(ERROR) - << "Channel destroyed before Reject; bring down connection: " - "endpoint_id=" - << endpoint_id; - ProcessPreConnectionResultFailure(client, endpoint_id); - response.Set({Status::kEndpointUnknown}); - return; - } - - Exception write_exception = channel->Write( - parser::ForConnectionResponse(Status::kConnectionRejected)); - if (!write_exception.Ok()) { - NEARBY_LOGS(INFO) - << "RejectConnection: failed to send response: endpoint_id=" - << endpoint_id; - ProcessPreConnectionResultFailure(client, endpoint_id); - response.Set({Status::kEndpointIoError}); - return; - } - - NEARBY_LOGS(INFO) << "RejectConnection: rejecting locally: endpoint_id=" - << endpoint_id; - connection_info.LocalEndpointRejectedConnection(endpoint_id); - EvaluateConnectionResult(client, endpoint_id, - false /* can_close_immediately */); - response.Set({Status::kSuccess}); - }); - - return WaitForResult(absl::StrCat("RejectConnection(", endpoint_id, ")"), - client->GetClientId(), &response); -} - -void BasePcpHandler::OnIncomingFrame(OfflineFrame& frame, - const std::string& endpoint_id, - ClientProxy* client, - proto::connections::Medium medium) { - CountDownLatch latch(1); - RunOnPcpHandlerThread( - "incoming-frame", - [this, client, endpoint_id, frame, &latch]() RUN_ON_PCP_HANDLER_THREAD() { - NEARBY_LOGS(INFO) << "OnConnectionResponse: endpoint_id=" - << endpoint_id; - - if (client->HasRemoteEndpointResponded(endpoint_id)) { - NEARBY_LOGS(INFO) - << "OnConnectionResponse: already handled; endpoint_id=" - << endpoint_id; - return; - } - - const ConnectionResponseFrame& connection_response = - frame.v1().connection_response(); - - // For backward compatible, here still check both status and - // response parameters until the response feature is roll out in all - // supported devices. - bool accepted = false; - if (connection_response.has_response()) { - accepted = - connection_response.response() == ConnectionResponseFrame::ACCEPT; - } else { - accepted = connection_response.status() == Status::kSuccess; - } - if (accepted) { - NEARBY_LOGS(INFO) - << "OnConnectionResponse: remote accepted; endpoint_id=" - << endpoint_id; - client->RemoteEndpointAcceptedConnection(endpoint_id); - } else { - NEARBY_LOGS(INFO) - << "OnConnectionResponse: remote rejected; endpoint_id=" - << endpoint_id << "; status=" << connection_response.status(); - client->RemoteEndpointRejectedConnection(endpoint_id); - } - - EvaluateConnectionResult(client, endpoint_id, - /* can_close_immediately= */ true); - - latch.CountDown(); - }); - WaitForLatch("OnIncomingFrame()", &latch); -} - -void BasePcpHandler::OnEndpointDisconnect(ClientProxy* client, - const std::string& endpoint_id, - CountDownLatch barrier) { - if (stop_.Get()) { - barrier.CountDown(); - return; - } - RunOnPcpHandlerThread("on-endpoint-disconnect", - [this, client, endpoint_id, barrier]() - RUN_ON_PCP_HANDLER_THREAD() mutable { - auto item = pending_alarms_.find(endpoint_id); - if (item != pending_alarms_.end()) { - auto& alarm = item->second; - alarm.Cancel(); - pending_alarms_.erase(item); - } - ProcessPreConnectionResultFailure(client, - endpoint_id); - barrier.CountDown(); - }); -} - -BluetoothDevice BasePcpHandler::GetRemoteBluetoothDevice( - const std::string& remote_bluetooth_mac_address) { - return mediums_->GetBluetoothClassic().GetRemoteDevice( - remote_bluetooth_mac_address); -} - -void BasePcpHandler::OnEndpointFound( - ClientProxy* client, std::shared_ptr<DiscoveredEndpoint> endpoint) { - // Check if we've seen this endpoint ID before. - std::string& endpoint_id = endpoint->endpoint_id; - NEARBY_LOGS(INFO) << "OnEndpointFound: id=" << endpoint_id << " [enter]"; - - auto range = discovered_endpoints_.equal_range(endpoint->endpoint_id); - - DiscoveredEndpoint* owned_endpoint = nullptr; - for (auto& item = range.first; item != range.second; ++item) { - auto& discovered_endpoint = item->second; - if (discovered_endpoint->medium != endpoint->medium) continue; - // Check if there was a info change. If there was, report the previous - // endpoint as lost. - if (discovered_endpoint->endpoint_info != endpoint->endpoint_info) { - OnEndpointLost(client, *discovered_endpoint); - discovered_endpoint = endpoint; // Replace endpoint. - OnEndpointFound(client, std::move(endpoint)); - return; - } else { - owned_endpoint = endpoint.get(); - break; - } - } - - if (!owned_endpoint) { - owned_endpoint = - discovered_endpoints_.emplace(endpoint_id, std::move(endpoint)) - ->second.get(); - } - - // Range is empty: this is the first endpoint we discovered so far. - // Report this endpoint_id to client. - if (range.first == range.second) { - NEARBY_LOGS(INFO) << "Adding new endpoint: endpoint_id=" << endpoint_id; - // And, as it's the first time, report it to the client. - client->OnEndpointFound( - owned_endpoint->service_id, owned_endpoint->endpoint_id, - owned_endpoint->endpoint_info, owned_endpoint->medium); - } else { - NEARBY_LOGS(INFO) << "Adding new medium for endpoint: endpoint_id=" - << endpoint_id << "; medium=" << owned_endpoint->medium; - } -} - -void BasePcpHandler::OnEndpointLost( - ClientProxy* client, const BasePcpHandler::DiscoveredEndpoint& endpoint) { - // Look up the DiscoveredEndpoint we have in our cache. - const auto* discovered_endpoint = GetDiscoveredEndpoint(endpoint.endpoint_id); - if (discovered_endpoint == nullptr) { - NEARBY_LOGS(INFO) << "No previous endpoint (nothing to lose): endpoint_id=" - << endpoint.endpoint_id; - return; - } - - // Validate that the cached endpoint has the same info as the one reported as - // onLost. If the info differs, then no-op. This likely means that the remote - // device changed their info. We reported onFound for the new info and are - // just now figuring out that we lost the old info. - if (discovered_endpoint->endpoint_info != endpoint.endpoint_info) { - NEARBY_LOGS(INFO) << "Previous endpoint name mismatch; passed=" - << absl::BytesToHexString(endpoint.endpoint_info.data()) - << "; expected=" - << absl::BytesToHexString( - discovered_endpoint->endpoint_info.data()); - return; - } - - auto item = discovered_endpoints_.extract(endpoint.endpoint_id); - if (!discovered_endpoints_.count(endpoint.endpoint_id)) { - client->OnEndpointLost(endpoint.service_id, endpoint.endpoint_id); - } -} - -bool BasePcpHandler::IsPreferred( - const BasePcpHandler::DiscoveredEndpoint& new_endpoint, - const BasePcpHandler::DiscoveredEndpoint& old_endpoint) { - std::vector<proto::connections::Medium> mediums = - GetConnectionMediumsByPriority(); - // As we iterate through the list of mediums, we see if we run into the new - // endpoint's medium or the old endpoint's medium first. - for (const auto& medium : mediums) { - if (medium == new_endpoint.medium) { - // The new endpoint's medium came first. It's preferred! - return true; - } - - if (medium == old_endpoint.medium) { - // The old endpoint's medium came first. Stick with the old endpoint! - return false; - } - } - std::string medium_string; - for (const auto& medium : mediums) { - absl::StrAppend(&medium_string, medium, "; "); - } - NEARBY_LOGS(ERROR) << "Failed to find either " << new_endpoint.medium - << " or " << old_endpoint.medium - << " in the list of locally supported mediums despite " - "expecting to find both, when deciding which medium " - << medium_string << " is preferred."; - return false; -} - -Exception BasePcpHandler::OnIncomingConnection( - ClientProxy* client, const ByteArray& remote_endpoint_info, - std::unique_ptr<EndpointChannel> channel, - proto::connections::Medium medium) { - absl::Time start_time = SystemClock::ElapsedRealtime(); - - // Fixes an NPE in ClientProxy.OnConnectionAccepted. The crash happened when - // the client stopped advertising and we nulled out state, followed by an - // incoming connection where we attempted to check that state. - if (!client->IsAdvertising()) { - NEARBY_LOGS(WARNING) << "Ignoring incoming connection on medium " - << proto::connections::Medium_Name( - channel->GetMedium()) - << " because client=" << client->GetClientId() - << " is no longer advertising."; - return {Exception::kIo}; - } - - // Endpoints connecting to us will always tell us about themselves first. - ExceptionOr<OfflineFrame> wrapped_frame = - ReadConnectionRequestFrame(channel.get()); - - if (!wrapped_frame.ok()) { - if (wrapped_frame.exception()) { - NEARBY_LOGS(ERROR) - << "Failed to parse incoming connection request; client=" - << client->GetClientId() - << "; device=" << absl::BytesToHexString(remote_endpoint_info.data()); - ProcessPreConnectionInitiationFailure(client, medium, "", channel.get(), - /* is_incoming= */ false, - start_time, {Status::kError}, - nullptr); - return {Exception::kSuccess}; - } - return wrapped_frame.GetException(); - } - - OfflineFrame& frame = wrapped_frame.result(); - const ConnectionRequestFrame& connection_request = - frame.v1().connection_request(); - NEARBY_LOGS(INFO) << "In onIncomingConnection(" - << proto::connections::Medium_Name(channel->GetMedium()) - << ") for client=" << client->GetClientId() - << ", read ConnectionRequestFrame from endpoint(id=" - << connection_request.endpoint_id() << ")"; - if (client->IsConnectedToEndpoint(connection_request.endpoint_id())) { - NEARBY_LOGS(ERROR) << "Incoming connection on medium " - << proto::connections::Medium_Name(channel->GetMedium()) - << " was denied because we're " - "already connected to endpoint(id=" - << connection_request.endpoint_id() << ")."; - return {Exception::kIo}; - } - - // If we've already sent out a connection request to this endpoint, then this - // is where we need to decide which connection to break. - if (BreakTie(client, connection_request.endpoint_id(), - connection_request.nonce(), channel.get())) { - return {Exception::kSuccess}; - } - - // If our child class says we can't accept any more incoming connections, - // listen to them. - if (ShouldEnforceTopologyConstraints(client->GetAdvertisingOptions()) && - !CanReceiveIncomingConnection(client)) { - NEARBY_LOGS(ERROR) << "Incoming connections are currently disallowed."; - return {Exception::kIo}; - } - - // The ConnectionRequest frame has two fields that both contain the - // EndpointInfo. The legacy field stores it as a string while the newer field - // stores it as a byte array. We'll attempt to grab from the newer field, but - // will accept the older string if it's all that exists. - const ByteArray endpoint_info{connection_request.has_endpoint_info() - ? connection_request.endpoint_info() - : connection_request.endpoint_name()}; - - // Retrieve the keep-alive frame interval and timeout fields. If the frame - // doesn't have those fields, we need to get them as default from feature - // flags to prevent 0-values causing thread ill. - ConnectionOptions options = {.keep_alive_interval_millis = 0, - .keep_alive_timeout_millis = 0}; - if (connection_request.has_keep_alive_interval_millis() && - connection_request.has_keep_alive_timeout_millis()) { - options.keep_alive_interval_millis = - connection_request.keep_alive_interval_millis(); - options.keep_alive_timeout_millis = - connection_request.keep_alive_timeout_millis(); - } - if (options.keep_alive_interval_millis == 0 || - options.keep_alive_timeout_millis == 0 || - options.keep_alive_interval_millis >= options.keep_alive_timeout_millis) { - NEARBY_LOGS(WARNING) - << "Incoming connection has wrong keep-alive frame interval=" - << options.keep_alive_interval_millis - << ", timeout=" << options.keep_alive_timeout_millis - << " values; correct them as default.", - options.keep_alive_interval_millis = - FeatureFlags::GetInstance().GetFlags().keep_alive_interval_millis; - options.keep_alive_timeout_millis = - FeatureFlags::GetInstance().GetFlags().keep_alive_timeout_millis; - } - - // We've successfully connected to the device, and are now about to jump on to - // the EncryptionRunner thread to start running our encryption protocol. We'll - // mark ourselves as pending in case we get another call to RequestConnection - // or OnIncomingConnection, so that we can cancel the connection if needed. - // Not using designated initializers here since the VS C++ compiler errors - // out indicating that MediumSelector<bool> is not an aggregate - PendingConnectionInfo pendingConnectionInfo{}; - pendingConnectionInfo.client = client; - pendingConnectionInfo.remote_endpoint_info = endpoint_info; - pendingConnectionInfo.nonce = connection_request.nonce(); - pendingConnectionInfo.is_incoming = true; - pendingConnectionInfo.start_time = start_time; - pendingConnectionInfo.listener = advertising_listener_; - pendingConnectionInfo.options = options; - pendingConnectionInfo.supported_mediums = - parser::ConnectionRequestMediumsToMediums(connection_request); - pendingConnectionInfo.channel = std::move(channel); - - auto* owned_channel = pending_connections_ - .emplace(connection_request.endpoint_id(), - std::move(pendingConnectionInfo)) - .first->second.channel.get(); - - // Next, we'll set up encryption. - encryption_runner_.StartServer(client, connection_request.endpoint_id(), - owned_channel, GetResultListener()); - return {Exception::kSuccess}; -} - -bool BasePcpHandler::BreakTie(ClientProxy* client, - const std::string& endpoint_id, - std::int32_t incoming_nonce, - EndpointChannel* endpoint_channel) { - auto it = pending_connections_.find(endpoint_id); - if (it != pending_connections_.end()) { - BasePcpHandler::PendingConnectionInfo& info = it->second; - - NEARBY_LOGS(INFO) - << "In onIncomingConnection(" - << proto::connections::Medium_Name(endpoint_channel->GetMedium()) - << ") for client=" << client->GetClientId() - << ", found a collision with endpoint " << endpoint_id - << ". We've already sent a connection request to them with nonce " - << info.nonce - << ", but they're also trying to connect to us with nonce " - << incoming_nonce; - // Break the lowest connection. In the (extremely) rare case of a tie, break - // both. - if (info.nonce > incoming_nonce) { - // Our connection won! Clean up their connection. - endpoint_channel->Close(); - - NEARBY_LOGS(INFO) << "In onIncomingConnection(" - << proto::connections::Medium_Name( - endpoint_channel->GetMedium()) - << ") for client=" << client->GetClientId() - << ", cleaned up the collision with endpoint " - << endpoint_id << " by closing their channel."; - return true; - } else if (info.nonce < incoming_nonce) { - // Aw, we lost. Clean up our connection, and then we'll let their - // connection continue on. - ProcessTieBreakLoss(client, endpoint_id, &info); - NEARBY_LOGS(INFO) - << "In onIncomingConnection(" - << proto::connections::Medium_Name(endpoint_channel->GetMedium()) - << ") for client=" << client->GetClientId() - << ", cleaned up the collision with endpoint " << endpoint_id - << " by closing our channel and notifying our client of the failure."; - } else { - // Oh. Huh. We both lost. Well, that's awkward. We'll clean up both and - // just force the devices to retry. - endpoint_channel->Close(); - - ProcessTieBreakLoss(client, endpoint_id, &info); - - NEARBY_LOGS(INFO) - << "In onIncomingConnection(" - << proto::connections::Medium_Name(endpoint_channel->GetMedium()) - << ") for client=" << client->GetClientId() - << ", cleaned up the collision with endpoint " << endpoint_id - << " by closing both channels. Our nonces were identical, so we " - "couldn't decide which channel to use."; - return true; - } - } - - return false; -} - -void BasePcpHandler::ProcessTieBreakLoss( - ClientProxy* client, const std::string& endpoint_id, - BasePcpHandler::PendingConnectionInfo* info) { - ProcessPreConnectionInitiationFailure( - client, info->channel->GetMedium(), endpoint_id, info->channel.get(), - info->is_incoming, info->start_time, {Status::kEndpointIoError}, - info->result.lock().get()); - ProcessPreConnectionResultFailure(client, endpoint_id); -} - -bool BasePcpHandler::AppendRemoteBluetoothMacAddressEndpoint( - const std::string& endpoint_id, - const std::string& remote_bluetooth_mac_address, - const ConnectionOptions& local_discovery_options) { - if (!local_discovery_options.allowed.bluetooth) { - return false; - } - - auto it = discovered_endpoints_.equal_range(endpoint_id); - if (it.first == it.second) { - return false; - } - auto endpoint = it.first->second.get(); - for (auto item = it.first; item != it.second; item++) { - if (item->second->medium == proto::connections::Medium::BLUETOOTH) { - NEARBY_LOGS(INFO) - << "Cannot append remote Bluetooth MAC Address endpoint, because " - "the endpoint has already been found over Bluetooth [" - << remote_bluetooth_mac_address << "]"; - return false; - } - } - - auto remote_bluetooth_device = - GetRemoteBluetoothDevice(remote_bluetooth_mac_address); - if (!remote_bluetooth_device.IsValid()) { - NEARBY_LOGS(INFO) - << "Cannot append remote Bluetooth MAC Address endpoint, because a " - "valid Bluetooth device could not be derived [" - << remote_bluetooth_mac_address << "]"; - return false; - } - - auto bluetooth_endpoint = - std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{ - {endpoint_id, endpoint->endpoint_info, endpoint->service_id, - proto::connections::Medium::BLUETOOTH, WebRtcState::kUnconnectable}, - remote_bluetooth_device, - }); - - discovered_endpoints_.emplace(endpoint_id, std::move(bluetooth_endpoint)); - return true; -} - -bool BasePcpHandler::AppendWebRTCEndpoint( - const std::string& endpoint_id, - const ConnectionOptions& local_discovery_options) { - if (!local_discovery_options.allowed.web_rtc) { - return false; - } - - bool should_connect_web_rtc = false; - auto it = discovered_endpoints_.equal_range(endpoint_id); - if (it.first == it.second) return false; - auto endpoint = it.first->second.get(); - for (auto item = it.first; item != it.second; item++) { - if (item->second->web_rtc_state != WebRtcState::kUnconnectable) { - should_connect_web_rtc = true; - break; - } - } - if (!should_connect_web_rtc) return false; - - auto webrtc_endpoint = std::make_shared<WebRtcEndpoint>(WebRtcEndpoint{ - {endpoint_id, endpoint->endpoint_info, endpoint->service_id, - proto::connections::Medium::WEB_RTC, WebRtcState::kConnectable}, - CreatePeerIdFromAdvertisement(endpoint->service_id, endpoint->endpoint_id, - endpoint->endpoint_info), - }); - - discovered_endpoints_.emplace(endpoint_id, std::move(webrtc_endpoint)); - return true; -} - -void BasePcpHandler::EvaluateConnectionResult(ClientProxy* client, - const std::string& endpoint_id, - bool can_close_immediately) { - // Short-circuit immediately if we're not in an actionable state yet. We will - // be called again once the other side has made their decision. - if (!client->IsConnectionAccepted(endpoint_id) && - !client->IsConnectionRejected(endpoint_id)) { - if (!client->HasLocalEndpointResponded(endpoint_id)) { - NEARBY_LOGS(INFO) - << "ConnectionResult: local client did not respond; endpoint_id=" - << endpoint_id; - } else if (!client->HasRemoteEndpointResponded(endpoint_id)) { - NEARBY_LOGS(INFO) - << "ConnectionResult: remote client did not respond; endpoint_id=" - << endpoint_id; - } - return; - } - - // Clean up the endpoint channel from our list of 'pending' connections. It's - // no longer pending. - auto it = pending_connections_.find(endpoint_id); - if (it == pending_connections_.end()) { - NEARBY_LOGS(INFO) << "No pending connection to evaluate; endpoint_id=" - << endpoint_id; - return; - } - - auto pair = pending_connections_.extract(it); - BasePcpHandler::PendingConnectionInfo& connection_info = pair.mapped(); - bool is_connection_accepted = client->IsConnectionAccepted(endpoint_id); - - Status response_code; - if (is_connection_accepted) { - NEARBY_LOGS(INFO) << "Pending connection accepted; endpoint_id=" - << endpoint_id; - response_code = {Status::kSuccess}; - - // Both sides have accepted, so we can now start talking over encrypted - // channels - // Now, after both parties accepted connection (presumably after verifying & - // matching security tokens), we are allowed to extract the shared key. - auto ukey2 = std::move(connection_info.ukey2); - bool succeeded = ukey2->VerifyHandshake(); - CHECK(succeeded); // If this fails, it's a UKEY2 protocol bug. - auto context = ukey2->ToConnectionContext(); - CHECK(context); // there is no way how this can fail, if Verify succeeded. - // If it did, it's a UKEY2 protocol bug. - - channel_manager_->EncryptChannelForEndpoint(endpoint_id, - std::move(context)); - - client->GetAnalyticsRecorder().OnConnectionEstablished( - endpoint_id, - channel_manager_->GetChannelForEndpoint(endpoint_id)->GetMedium(), - connection_info.connection_token); - } else { - NEARBY_LOGS(INFO) << "Pending connection rejected; endpoint_id=" - << endpoint_id; - response_code = {Status::kConnectionRejected}; - } - - // Invoke the client callback to let it know of the connection result. - if (response_code.Ok()) { - client->OnConnectionAccepted(endpoint_id); - } else { - client->OnConnectionRejected(endpoint_id, response_code); - } - - // If the connection failed, clean everything up and short circuit. - if (!is_connection_accepted) { - // Clean up the channel in EndpointManager if it's no longer required. - if (can_close_immediately) { - endpoint_manager_->DiscardEndpoint(client, endpoint_id); - } else { - pending_alarms_.emplace( - endpoint_id, - CancelableAlarm( - "BasePcpHandler.evaluateConnectionResult() delayed close", - [this, client, endpoint_id]() { - endpoint_manager_->DiscardEndpoint(client, endpoint_id); - }, - kRejectedConnectionCloseDelay, &alarm_executor_)); - } - - return; - } - - // Kick off the bandwidth upgrade for incoming connections. - if (connection_info.is_incoming && - AutoUpgradeBandwidth(client->GetAdvertisingOptions())) { - bwu_manager_->InitiateBwuForEndpoint(client, endpoint_id); - } -} - -ExceptionOr<OfflineFrame> BasePcpHandler::ReadConnectionRequestFrame( - EndpointChannel* endpoint_channel) { - if (endpoint_channel == nullptr) { - return ExceptionOr<OfflineFrame>(Exception::kIo); - } - - // To avoid a device connecting but never sending their introductory frame, we - // time out the connection after a certain amount of time. - CancelableAlarm timeout_alarm( - absl::StrCat("PcpHandler(", this->GetStrategy().GetName(), - ")::ReadConnectionRequestFrame"), - [endpoint_channel]() { endpoint_channel->Close(); }, - kConnectionRequestReadTimeout, &alarm_executor_); - // Do a blocking read to try and find the ConnectionRequestFrame - ExceptionOr<ByteArray> wrapped_bytes = endpoint_channel->Read(); - timeout_alarm.Cancel(); - - if (!wrapped_bytes.ok()) { - return ExceptionOr<OfflineFrame>(wrapped_bytes.exception()); - } - - ByteArray bytes = std::move(wrapped_bytes.result()); - ExceptionOr<OfflineFrame> wrapped_frame = parser::FromBytes(bytes); - if (wrapped_frame.GetException().Raised(Exception::kInvalidProtocolBuffer)) { - return ExceptionOr<OfflineFrame>(Exception::kIo); - } - - OfflineFrame& frame = wrapped_frame.result(); - if (V1Frame::CONNECTION_REQUEST != parser::GetFrameType(frame)) { - return ExceptionOr<OfflineFrame>(Exception::kIo); - } - - return wrapped_frame; -} - -std::string BasePcpHandler::GetHashedConnectionToken( - const ByteArray& token_bytes) { - auto token = std::string(token_bytes); - return location::nearby::Base64Utils::Encode( - Utils::Sha256Hash(token, token.size())) - .substr(0, kConnectionTokenLength); -} - -void BasePcpHandler::LogConnectionAttempt(ClientProxy* client, Medium medium, - const std::string& endpoint_id, - bool is_incoming, - absl::Time start_time) { - proto::connections::ConnectionAttemptResult result = - Cancelled(client, endpoint_id) ? proto::connections::RESULT_CANCELLED - : proto::connections::RESULT_ERROR; - if (is_incoming) { - client->GetAnalyticsRecorder().OnIncomingConnectionAttempt( - proto::connections::INITIAL, medium, result, - SystemClock::ElapsedRealtime() - start_time, - /* connection_token= */ ""); - } else { - client->GetAnalyticsRecorder().OnOutgoingConnectionAttempt( - endpoint_id, proto::connections::INITIAL, medium, result, - SystemClock::ElapsedRealtime() - start_time, - /* connection_token= */ ""); - } -} - -bool BasePcpHandler::Cancelled(ClientProxy* client, - const std::string& endpoint_id) { - if (endpoint_id.empty()) { - return false; - } - - return client->GetCancellationFlag(endpoint_id)->Cancelled(); -} - -///////////////////// BasePcpHandler::PendingConnectionInfo /////////////////// - -void BasePcpHandler::PendingConnectionInfo::SetCryptoContext( - std::unique_ptr<UKey2Handshake> ukey2) { - this->ukey2 = std::move(ukey2); -} - -BasePcpHandler::PendingConnectionInfo::~PendingConnectionInfo() { - auto future_status = result.lock(); - if (future_status && !future_status->IsSet()) { - NEARBY_LOG(INFO, "Future was not set; destroying info"); - future_status->Set({Status::kError}); - } - - if (channel != nullptr) { - channel->Close(proto::connections::DisconnectionReason::SHUTDOWN); - } - - // Destroy crypto context now; for some reason, crypto context destructor - // segfaults if it is not destroyed here. - this->ukey2.reset(); -} - -void BasePcpHandler::PendingConnectionInfo::LocalEndpointAcceptedConnection( - const std::string& endpoint_id, const PayloadListener& payload_listener) { - client->LocalEndpointAcceptedConnection(endpoint_id, payload_listener); -} - -void BasePcpHandler::PendingConnectionInfo::LocalEndpointRejectedConnection( - const std::string& endpoint_id) { - client->LocalEndpointRejectedConnection(endpoint_id); -} - -} // namespace connections -} // namespace nearby -} // namespace location |