// Copyright 2019 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "services/network/quic_transport.h" #include "base/auto_reset.h" #include "base/bind.h" #include "base/threading/sequenced_task_runner_handle.h" #include "net/base/io_buffer.h" #include "net/quic/platform/impl/quic_mem_slice_impl.h" #include "net/third_party/quiche/src/quic/core/quic_session.h" #include "net/third_party/quiche/src/quic/core/quic_types.h" #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h" #include "services/network/network_context.h" #include "services/network/public/mojom/quic_transport.mojom.h" namespace network { namespace { net::QuicTransportClient::Parameters CreateParameters( const std::vector& fingerprints) { net::QuicTransportClient::Parameters params; for (const auto& fingerprint : fingerprints) { params.server_certificate_fingerprints.push_back( quic::CertificateFingerprint{.algorithm = fingerprint->algorithm, .fingerprint = fingerprint->fingerprint}); } return params; } } // namespace class QuicTransport::Stream final { public: class StreamVisitor final : public quic::QuicTransportStream::Visitor { public: explicit StreamVisitor(Stream* stream) : stream_(stream->weak_factory_.GetWeakPtr()) {} ~StreamVisitor() override { if (stream_) { if (stream_->incoming_) { stream_->writable_watcher_.Cancel(); stream_->writable_.reset(); stream_->transport_->client_->OnIncomingStreamClosed( stream_->id_, /*fin_received=*/false); stream_->incoming_ = nullptr; } if (stream_->outgoing_) { stream_->readable_watcher_.Cancel(); stream_->readable_.reset(); stream_->outgoing_ = nullptr; } stream_->MayDisposeLater(); } } // Visitor implementation: void OnCanRead() override { base::SequencedTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::BindOnce(&Stream::Receive, stream_)); } void OnFinRead() override { if (stream_) { stream_->OnFinRead(); } } void OnCanWrite() override { base::SequencedTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::BindOnce(&Stream::Send, stream_)); } private: const base::WeakPtr stream_; }; // Bidirectional Stream(QuicTransport* transport, quic::QuicTransportStream* stream, mojo::ScopedDataPipeConsumerHandle readable, mojo::ScopedDataPipeProducerHandle writable) : transport_(transport), id_(stream->id()), outgoing_(stream), incoming_(stream), readable_(std::move(readable)), writable_(std::move(writable)), readable_watcher_(FROM_HERE, ArmingPolicy::MANUAL), writable_watcher_(FROM_HERE, ArmingPolicy::MANUAL) { DCHECK(outgoing_); DCHECK(incoming_); DCHECK(readable_); DCHECK(writable_); Init(); } // Unidirectional: outgoing Stream(QuicTransport* transport, quic::QuicTransportStream* outgoing, mojo::ScopedDataPipeConsumerHandle readable) : transport_(transport), id_(outgoing->id()), outgoing_(outgoing), readable_(std::move(readable)), readable_watcher_(FROM_HERE, ArmingPolicy::MANUAL), writable_watcher_(FROM_HERE, ArmingPolicy::MANUAL) { DCHECK(outgoing_); DCHECK(readable_); Init(); } // Unidirectional: incoming Stream(QuicTransport* transport, quic::QuicTransportStream* incoming, mojo::ScopedDataPipeProducerHandle writable) : transport_(transport), id_(incoming->id()), incoming_(incoming), writable_(std::move(writable)), readable_watcher_(FROM_HERE, ArmingPolicy::MANUAL), writable_watcher_(FROM_HERE, ArmingPolicy::MANUAL) { DCHECK(incoming_); DCHECK(writable_); Init(); } void NotifyFinFromClient() { has_received_fin_from_client_ = true; MaySendFin(); } void Abort(quic::QuicRstStreamErrorCode code) { auto* stream = incoming_ ? incoming_ : outgoing_; if (!stream) { return; } stream->Reset(code); incoming_ = nullptr; outgoing_ = nullptr; readable_watcher_.Cancel(); readable_.reset(); MayDisposeLater(); } ~Stream() { auto* stream = incoming_ ? incoming_ : outgoing_; if (!stream) { return; } stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED); } private: using ArmingPolicy = mojo::SimpleWatcher::ArmingPolicy; void Init() { if (outgoing_) { DCHECK(readable_); outgoing_->set_visitor(std::make_unique(this)); readable_watcher_.Watch( readable_.get(), MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED, base::BindRepeating(&Stream::OnReadable, base::Unretained(this))); readable_watcher_.ArmOrNotify(); } if (incoming_) { DCHECK(writable_); if (incoming_ != outgoing_) { incoming_->set_visitor(std::make_unique(this)); } writable_watcher_.Watch( writable_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED, base::BindRepeating(&Stream::OnWritable, base::Unretained(this))); writable_watcher_.ArmOrNotify(); } } void OnReadable(MojoResult result, const mojo::HandleSignalsState& state) { DCHECK_EQ(result, MOJO_RESULT_OK); Send(); } void Send() { MaySendFin(); while (outgoing_ && outgoing_->CanWrite()) { const void* data = nullptr; uint32_t available = 0; MojoResult result = readable_->BeginReadData( &data, &available, MOJO_BEGIN_READ_DATA_FLAG_NONE); if (result == MOJO_RESULT_SHOULD_WAIT) { readable_watcher_.Arm(); return; } if (result == MOJO_RESULT_FAILED_PRECONDITION) { has_seen_end_of_pipe_for_readable_ = true; MaySendFin(); return; } DCHECK_EQ(result, MOJO_RESULT_OK); bool send_result = outgoing_->Write( absl::string_view(reinterpret_cast(data), available)); if (!send_result) { // TODO(yhirano): Handle this failure. readable_->EndReadData(0); return; } readable_->EndReadData(available); } } void OnWritable(MojoResult result, const mojo::HandleSignalsState& state) { DCHECK_EQ(result, MOJO_RESULT_OK); Receive(); } void MaySendFin() { if (!outgoing_) { return; } if (!has_seen_end_of_pipe_for_readable_ || !has_received_fin_from_client_) { return; } if (outgoing_->SendFin()) { outgoing_ = nullptr; readable_watcher_.Cancel(); readable_.reset(); MayDisposeLater(); } // Otherwise, retry in Send(). } void Receive() { while (incoming_ && incoming_->ReadableBytes() > 0) { void* buffer = nullptr; uint32_t available = 0; base::AutoReset auto_reset(&in_two_phase_write_, true); MojoResult result = writable_->BeginWriteData( &buffer, &available, MOJO_BEGIN_WRITE_DATA_FLAG_NONE); if (result == MOJO_RESULT_SHOULD_WAIT) { writable_watcher_.Arm(); return; } if (result == MOJO_RESULT_FAILED_PRECONDITION) { // The client doesn't want further data. writable_watcher_.Cancel(); writable_.reset(); incoming_ = nullptr; MayDisposeLater(); return; } DCHECK_EQ(result, MOJO_RESULT_OK); const size_t num_read_bytes = incoming_->Read(reinterpret_cast(buffer), available); writable_->EndWriteData(num_read_bytes); if (!incoming_) { // |incoming_| can be null here, because OnFinRead can be called in // QuicTransportStream::Read. writable_watcher_.Cancel(); writable_.reset(); MayDisposeLater(); return; } } } void OnFinRead() { incoming_ = nullptr; transport_->client_->OnIncomingStreamClosed(id_, /*fin_received=*/true); if (in_two_phase_write_) { return; } writable_watcher_.Cancel(); writable_.reset(); } void Dispose() { transport_->streams_.erase(id_); // Deletes |this|. } void MayDisposeLater() { if (outgoing_ || incoming_) { return; } base::SequencedTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::BindOnce(&Stream::Dispose, weak_factory_.GetWeakPtr())); } QuicTransport* const transport_; // outlives |this|. const uint32_t id_; // |outgoing_| and |incoming_| point to the same stream when this is a // bidirectional stream. They are owned by |transport_| (via // quic::QuicSession), and the properties will be null-set when the streams // are gone (via StreamVisitor). quic::QuicTransportStream* outgoing_ = nullptr; quic::QuicTransportStream* incoming_ = nullptr; mojo::ScopedDataPipeConsumerHandle readable_; // for |outgoing| mojo::ScopedDataPipeProducerHandle writable_; // for |incoming| mojo::SimpleWatcher readable_watcher_; mojo::SimpleWatcher writable_watcher_; bool in_two_phase_write_ = false; bool has_seen_end_of_pipe_for_readable_ = false; bool has_received_fin_from_client_ = false; // This must be the last member. base::WeakPtrFactory weak_factory_{this}; }; // namespace network QuicTransport::QuicTransport( const GURL& url, const url::Origin& origin, const net::NetworkIsolationKey& key, const std::vector& fingerprints, NetworkContext* context, mojo::PendingRemote handshake_client) : transport_(std::make_unique( url, origin, this, key, context->url_request_context(), CreateParameters(fingerprints))), context_(context), receiver_(this), handshake_client_(std::move(handshake_client)) { handshake_client_.set_disconnect_handler( base::BindOnce(&QuicTransport::Dispose, base::Unretained(this))); transport_->Connect(); } QuicTransport::~QuicTransport() = default; void QuicTransport::SendDatagram(base::span data, base::OnceCallback callback) { DCHECK(!torn_down_); auto buffer = base::MakeRefCounted(data.size()); memcpy(buffer->data(), data.data(), data.size()); quic::QuicMemSlice slice( quic::QuicMemSliceImpl(std::move(buffer), data.size())); const quic::MessageStatus status = transport_->session()->datagram_queue()->SendOrQueueDatagram( std::move(slice)); std::move(callback).Run(status == quic::MESSAGE_STATUS_SUCCESS); } void QuicTransport::CreateStream( mojo::ScopedDataPipeConsumerHandle readable, mojo::ScopedDataPipeProducerHandle writable, base::OnceCallback callback) { // |readable| is non-nullable, |writable| is nullable. DCHECK(readable); if (handshake_client_) { // Invalid request. std::move(callback).Run(false, 0); return; } quic::QuicTransportClientSession* const session = transport_->session(); if (writable) { // Bidirectional if (!session->CanOpenNextOutgoingBidirectionalStream()) { // TODO(crbug.com/104236): Instead of rejecting the creation request, we // should wait in this case. std::move(callback).Run(false, 0); return; } quic::QuicTransportStream* const stream = session->OpenOutgoingBidirectionalStream(); DCHECK(stream); streams_.insert(std::make_pair( stream->id(), std::make_unique(this, stream, std::move(readable), std::move(writable)))); std::move(callback).Run(true, stream->id()); return; } // Unidirectional if (!session->CanOpenNextOutgoingUnidirectionalStream()) { // TODO(crbug.com/104236): Instead of rejecting the creation request, we // should wait in this case. std::move(callback).Run(false, 0); return; } quic::QuicTransportStream* const stream = session->OpenOutgoingUnidirectionalStream(); DCHECK(stream); streams_.insert(std::make_pair( stream->id(), std::make_unique(this, stream, std::move(readable)))); std::move(callback).Run(true, stream->id()); } void QuicTransport::AcceptBidirectionalStream( BidirectionalStreamAcceptanceCallback acceptance) { bidirectional_stream_acceptances_.push(std::move(acceptance)); OnIncomingBidirectionalStreamAvailable(); } void QuicTransport::AcceptUnidirectionalStream( UnidirectionalStreamAcceptanceCallback acceptance) { unidirectional_stream_acceptances_.push(std::move(acceptance)); OnIncomingUnidirectionalStreamAvailable(); } void QuicTransport::SendFin(uint32_t stream) { auto it = streams_.find(stream); if (it == streams_.end()) { return; } it->second->NotifyFinFromClient(); } void QuicTransport::AbortStream(uint32_t stream, uint64_t code) { auto it = streams_.find(stream); if (it == streams_.end()) { return; } auto code_to_pass = quic::QuicRstStreamErrorCode::QUIC_STREAM_NO_ERROR; if (code < quic::QuicRstStreamErrorCode::QUIC_STREAM_LAST_ERROR) { code_to_pass = static_cast(code); } it->second->Abort(code_to_pass); } void QuicTransport::OnConnected() { if (torn_down_) { return; } DCHECK(handshake_client_); handshake_client_->OnConnectionEstablished( receiver_.BindNewPipeAndPassRemote(), client_.BindNewPipeAndPassReceiver()); handshake_client_.reset(); client_.set_disconnect_handler( base::BindOnce(&QuicTransport::Dispose, base::Unretained(this))); } void QuicTransport::OnConnectionFailed() { if (torn_down_) { return; } DCHECK(handshake_client_); // Here we assume that the error is not going to handed to the // initiator renderer. handshake_client_->OnHandshakeFailed(transport_->error()); TearDown(); } void QuicTransport::OnClosed() { if (torn_down_) { return; } DCHECK(!handshake_client_); TearDown(); } void QuicTransport::OnError() { if (torn_down_) { return; } DCHECK(!handshake_client_); TearDown(); } void QuicTransport::OnIncomingBidirectionalStreamAvailable() { DCHECK(!handshake_client_); DCHECK(client_); while (!bidirectional_stream_acceptances_.empty()) { quic::QuicTransportStream* const stream = transport_->session()->AcceptIncomingBidirectionalStream(); if (!stream) { return; } auto acceptance = std::move(bidirectional_stream_acceptances_.front()); bidirectional_stream_acceptances_.pop(); mojo::ScopedDataPipeConsumerHandle readable_for_outgoing; mojo::ScopedDataPipeProducerHandle writable_for_outgoing; mojo::ScopedDataPipeConsumerHandle readable_for_incoming; mojo::ScopedDataPipeProducerHandle writable_for_incoming; const MojoCreateDataPipeOptions options = { sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024}; if (mojo::CreateDataPipe(&options, &writable_for_outgoing, &readable_for_outgoing) != MOJO_RESULT_OK) { stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED); // TODO(yhirano): Error the entire connection. return; } if (mojo::CreateDataPipe(&options, &writable_for_incoming, &readable_for_incoming) != MOJO_RESULT_OK) { stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED); // TODO(yhirano): Error the entire connection. return; } streams_.insert(std::make_pair( stream->id(), std::make_unique(this, stream, std::move(readable_for_outgoing), std::move(writable_for_incoming)))); std::move(acceptance) .Run(stream->id(), std::move(readable_for_incoming), std::move(writable_for_outgoing)); } } void QuicTransport::OnIncomingUnidirectionalStreamAvailable() { DCHECK(!handshake_client_); DCHECK(client_); while (!unidirectional_stream_acceptances_.empty()) { quic::QuicTransportStream* const stream = transport_->session()->AcceptIncomingUnidirectionalStream(); if (!stream) { return; } auto acceptance = std::move(unidirectional_stream_acceptances_.front()); unidirectional_stream_acceptances_.pop(); mojo::ScopedDataPipeConsumerHandle readable_for_incoming; mojo::ScopedDataPipeProducerHandle writable_for_incoming; const MojoCreateDataPipeOptions options = { sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024}; if (mojo::CreateDataPipe(&options, &writable_for_incoming, &readable_for_incoming) != MOJO_RESULT_OK) { stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED); // TODO(yhirano): Error the entire connection. return; } streams_.insert(std::make_pair( stream->id(), std::make_unique( this, stream, std::move(writable_for_incoming)))); std::move(acceptance).Run(stream->id(), std::move(readable_for_incoming)); } } void QuicTransport::OnDatagramReceived(base::StringPiece datagram) { if (torn_down_) { return; } client_->OnDatagramReceived(base::make_span( reinterpret_cast(datagram.data()), datagram.size())); } void QuicTransport::OnCanCreateNewOutgoingBidirectionalStream() { // TODO(yhirano): Implement this. } void QuicTransport::OnCanCreateNewOutgoingUnidirectionalStream() { // TODO(yhirano): Implement this. } void QuicTransport::TearDown() { torn_down_ = true; receiver_.reset(); handshake_client_.reset(); client_.reset(); base::SequencedTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::BindOnce(&QuicTransport::Dispose, weak_factory_.GetWeakPtr())); } void QuicTransport::Dispose() { receiver_.reset(); context_->Remove(this); // |this| is deleted. } } // namespace network