diff options
Diffstat (limited to 'chromium/net/quic/core/quic_spdy_stream.cc')
-rw-r--r-- | chromium/net/quic/core/quic_spdy_stream.cc | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/chromium/net/quic/core/quic_spdy_stream.cc b/chromium/net/quic/core/quic_spdy_stream.cc new file mode 100644 index 00000000000..3777177c1bf --- /dev/null +++ b/chromium/net/quic/core/quic_spdy_stream.cc @@ -0,0 +1,426 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/core/quic_spdy_stream.h" + +#include <utility> + +#include "base/logging.h" +#include "base/strings/string_number_conversions.h" +#include "net/quic/core/quic_bug_tracker.h" +#include "net/quic/core/quic_spdy_session.h" +#include "net/quic/core/quic_utils.h" +#include "net/quic/core/quic_write_blocked_list.h" +#include "net/quic/core/spdy_utils.h" + +using base::IntToString; +using base::StringPiece; +using std::min; +using std::string; + +namespace net { + +#define ENDPOINT \ + (session()->perspective() == Perspective::IS_SERVER ? "Server: " : "Client:" \ + " ") + +QuicSpdyStream::QuicSpdyStream(QuicStreamId id, QuicSpdySession* spdy_session) + : ReliableQuicStream(id, spdy_session), + spdy_session_(spdy_session), + visitor_(nullptr), + headers_decompressed_(false), + priority_(kDefaultPriority), + trailers_decompressed_(false), + trailers_consumed_(false) { + DCHECK_NE(kCryptoStreamId, id); + // Don't receive any callbacks from the sequencer until headers + // are complete. + sequencer()->SetBlockedUntilFlush(); + spdy_session_->RegisterStreamPriority(id, priority_); +} + +QuicSpdyStream::~QuicSpdyStream() { + if (spdy_session_ != nullptr) { + spdy_session_->UnregisterStreamPriority(id()); + } +} + +void QuicSpdyStream::CloseWriteSide() { + if (!fin_received() && !rst_received() && sequencer()->ignore_read_data() && + !rst_sent()) { + DCHECK(fin_sent()); + // Tell the peer to stop sending further data. + DVLOG(1) << ENDPOINT << "Send QUIC_STREAM_NO_ERROR on stream " << id(); + Reset(QUIC_STREAM_NO_ERROR); + } + + ReliableQuicStream::CloseWriteSide(); +} + +void QuicSpdyStream::StopReading() { + if (!fin_received() && !rst_received() && write_side_closed() && + !rst_sent()) { + DCHECK(fin_sent()); + // Tell the peer to stop sending further data. + DVLOG(1) << ENDPOINT << "Send QUIC_STREAM_NO_ERROR on stream " << id(); + Reset(QUIC_STREAM_NO_ERROR); + } + ReliableQuicStream::StopReading(); +} + +size_t QuicSpdyStream::WriteHeaders( + SpdyHeaderBlock header_block, + bool fin, + QuicAckListenerInterface* ack_notifier_delegate) { + size_t bytes_written = spdy_session_->WriteHeaders( + id(), std::move(header_block), fin, priority_, ack_notifier_delegate); + if (fin) { + // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent. + set_fin_sent(true); + CloseWriteSide(); + } + return bytes_written; +} + +void QuicSpdyStream::WriteOrBufferBody( + const string& data, + bool fin, + QuicAckListenerInterface* ack_notifier_delegate) { + WriteOrBufferData(data, fin, ack_notifier_delegate); +} + +size_t QuicSpdyStream::WriteTrailers( + SpdyHeaderBlock trailer_block, + QuicAckListenerInterface* ack_notifier_delegate) { + if (fin_sent()) { + QUIC_BUG << "Trailers cannot be sent after a FIN."; + return 0; + } + + // The header block must contain the final offset for this stream, as the + // trailers may be processed out of order at the peer. + DVLOG(1) << "Inserting trailer: (" << kFinalOffsetHeaderKey << ", " + << stream_bytes_written() + queued_data_bytes() << ")"; + trailer_block.insert(std::make_pair( + kFinalOffsetHeaderKey, + IntToString(stream_bytes_written() + queued_data_bytes()))); + + // Write the trailing headers with a FIN, and close stream for writing: + // trailers are the last thing to be sent on a stream. + const bool kFin = true; + size_t bytes_written = spdy_session_->WriteHeaders( + id(), std::move(trailer_block), kFin, priority_, ack_notifier_delegate); + set_fin_sent(kFin); + + // Trailers are the last thing to be sent on a stream, but if there is still + // queued data then CloseWriteSide() will cause it never to be sent. + if (queued_data_bytes() == 0) { + CloseWriteSide(); + } + + return bytes_written; +} + +size_t QuicSpdyStream::Readv(const struct iovec* iov, size_t iov_len) { + DCHECK(FinishedReadingHeaders()); + return sequencer()->Readv(iov, iov_len); +} + +int QuicSpdyStream::GetReadableRegions(iovec* iov, size_t iov_len) const { + DCHECK(FinishedReadingHeaders()); + return sequencer()->GetReadableRegions(iov, iov_len); +} + +void QuicSpdyStream::MarkConsumed(size_t num_bytes) { + DCHECK(FinishedReadingHeaders()); + return sequencer()->MarkConsumed(num_bytes); +} + +bool QuicSpdyStream::IsDoneReading() const { + bool done_reading_headers = FinishedReadingHeaders(); + bool done_reading_body = sequencer()->IsClosed(); + bool done_reading_trailers = FinishedReadingTrailers(); + return done_reading_headers && done_reading_body && done_reading_trailers; +} + +bool QuicSpdyStream::HasBytesToRead() const { + bool headers_to_read = !decompressed_headers_.empty(); + bool body_to_read = sequencer()->HasBytesToRead(); + bool trailers_to_read = !decompressed_trailers_.empty(); + return headers_to_read || body_to_read || trailers_to_read; +} + +void QuicSpdyStream::MarkHeadersConsumed(size_t bytes_consumed) { + decompressed_headers_.erase(0, bytes_consumed); + if (FinishedReadingHeaders()) { + sequencer()->SetUnblocked(); + } +} + +void QuicSpdyStream::MarkTrailersConsumed(size_t bytes_consumed) { + decompressed_trailers_.erase(0, bytes_consumed); +} + +void QuicSpdyStream::MarkTrailersConsumed() { + trailers_consumed_ = true; +} + +void QuicSpdyStream::ConsumeHeaderList() { + header_list_.Clear(); + if (FinishedReadingHeaders()) { + sequencer()->SetUnblocked(); + } +} + +void QuicSpdyStream::SetPriority(SpdyPriority priority) { + DCHECK_EQ(0u, stream_bytes_written()); + spdy_session_->UpdateStreamPriority(id(), priority); + priority_ = priority; +} + +void QuicSpdyStream::OnStreamHeaders(StringPiece headers_data) { + if (!headers_decompressed_) { + headers_data.AppendToString(&decompressed_headers_); + } else { + DCHECK(!trailers_decompressed_); + headers_data.AppendToString(&decompressed_trailers_); + } +} + +void QuicSpdyStream::OnStreamHeadersPriority(SpdyPriority priority) { + DCHECK_EQ(Perspective::IS_SERVER, session()->connection()->perspective()); + SetPriority(priority); +} + +void QuicSpdyStream::OnStreamHeadersComplete(bool fin, size_t frame_len) { + if (!headers_decompressed_) { + OnInitialHeadersComplete(fin, frame_len); + } else { + OnTrailingHeadersComplete(fin, frame_len); + } +} + +void QuicSpdyStream::OnStreamHeaderList(bool fin, + size_t frame_len, + const QuicHeaderList& header_list) { + if (!headers_decompressed_) { + OnInitialHeadersComplete(fin, frame_len, header_list); + } else { + OnTrailingHeadersComplete(fin, frame_len, header_list); + } +} + +void QuicSpdyStream::OnInitialHeadersComplete(bool fin, size_t /*frame_len*/) { + headers_decompressed_ = true; + if (fin) { + OnStreamFrame(QuicStreamFrame(id(), fin, 0, StringPiece())); + } + if (FinishedReadingHeaders()) { + sequencer()->SetUnblocked(); + } +} + +void QuicSpdyStream::OnInitialHeadersComplete( + bool fin, + size_t /*frame_len*/, + const QuicHeaderList& header_list) { + headers_decompressed_ = true; + header_list_ = header_list; + if (fin) { + OnStreamFrame(QuicStreamFrame(id(), fin, 0, StringPiece())); + } + if (FinishedReadingHeaders()) { + sequencer()->SetUnblocked(); + } +} + +void QuicSpdyStream::OnPromiseHeaders(StringPiece headers_data) { + headers_data.AppendToString(&decompressed_headers_); +} + +void QuicSpdyStream::OnPromiseHeadersComplete( + QuicStreamId /* promised_stream_id */, + size_t /* frame_len */) { + // To be overridden in QuicSpdyClientStream. Not supported on + // server side. + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Promise headers received by server", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; +} + +void QuicSpdyStream::OnPromiseHeaderList( + QuicStreamId /* promised_id */, + size_t /* frame_len */, + const QuicHeaderList& /*header_list */) { + // To be overridden in QuicSpdyClientStream. Not supported on + // server side. + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Promise headers received by server", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; +} + +void QuicSpdyStream::OnTrailingHeadersComplete(bool fin, size_t /*frame_len*/) { + DCHECK(!trailers_decompressed_); + if (fin_received()) { + DLOG(ERROR) << "Received Trailers after FIN, on stream: " << id(); + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers after fin", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } + if (!fin) { + DLOG(ERROR) << "Trailers must have FIN set, on stream: " << id(); + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Fin missing from trailers", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } + + size_t final_byte_offset = 0; + if (!SpdyUtils::ParseTrailers(decompressed_trailers().data(), + decompressed_trailers().length(), + &final_byte_offset, &received_trailers_)) { + DLOG(ERROR) << "Trailers are malformed: " << id(); + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers are malformed", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } + + // The data on this stream ends at |final_byte_offset|. + DVLOG(1) << "Stream ends at byte offset: " << final_byte_offset + << " currently read: " << stream_bytes_read(); + trailers_decompressed_ = true; + OnStreamFrame(QuicStreamFrame(id(), fin, final_byte_offset, StringPiece())); +} + +void QuicSpdyStream::OnTrailingHeadersComplete( + bool fin, + size_t /*frame_len*/, + const QuicHeaderList& header_list) { + DCHECK(!trailers_decompressed_); + if (fin_received()) { + DLOG(ERROR) << "Received Trailers after FIN, on stream: " << id(); + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers after fin", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } + if (!fin) { + DLOG(ERROR) << "Trailers must have FIN set, on stream: " << id(); + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Fin missing from trailers", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } + + size_t final_byte_offset = 0; + if (!SpdyUtils::CopyAndValidateTrailers(header_list, &final_byte_offset, + &received_trailers_)) { + DLOG(ERROR) << "Trailers are malformed: " << id(); + session()->connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers are malformed", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } + trailers_decompressed_ = true; + OnStreamFrame(QuicStreamFrame(id(), fin, final_byte_offset, StringPiece())); +} + +void QuicSpdyStream::OnStreamReset(const QuicRstStreamFrame& frame) { + if (frame.error_code != QUIC_STREAM_NO_ERROR) { + ReliableQuicStream::OnStreamReset(frame); + return; + } + DVLOG(1) << "Received QUIC_STREAM_NO_ERROR, not discarding response"; + set_rst_received(true); + MaybeIncreaseHighestReceivedOffset(frame.byte_offset); + set_stream_error(frame.error_code); + CloseWriteSide(); +} + +void QuicSpdyStream::OnClose() { + ReliableQuicStream::OnClose(); + + if (visitor_) { + Visitor* visitor = visitor_; + // Calling Visitor::OnClose() may result the destruction of the visitor, + // so we need to ensure we don't call it again. + visitor_ = nullptr; + visitor->OnClose(this); + } +} + +void QuicSpdyStream::OnCanWrite() { + ReliableQuicStream::OnCanWrite(); + + // Trailers (and hence a FIN) may have been sent ahead of queued body bytes. + if (!HasBufferedData() && fin_sent()) { + CloseWriteSide(); + } +} + +bool QuicSpdyStream::FinishedReadingHeaders() const { + return headers_decompressed_ && decompressed_headers_.empty() && + header_list_.empty(); +} + +bool QuicSpdyStream::ParseHeaderStatusCode(const SpdyHeaderBlock& header, + int* status_code) const { + SpdyHeaderBlock::const_iterator it = header.find(":status"); + if (it == header.end()) { + return false; + } + const StringPiece status(it->second); + if (status.size() != 3) { + return false; + } + // First character must be an integer in range [1,5]. + if (status[0] < '1' || status[0] > '5') { + return false; + } + // The remaining two characters must be integers. + if (!isdigit(status[1]) || !isdigit(status[2])) { + return false; + } + return StringToInt(status, status_code); +} + +bool QuicSpdyStream::FinishedReadingTrailers() const { + // If no further trailing headers are expected, and the decompressed trailers + // (if any) have been consumed, then reading of trailers is finished. + if (!fin_received()) { + return false; + } else if (!trailers_decompressed_) { + return true; + } else { + return trailers_consumed_ && decompressed_trailers_.empty(); + } +} + +SpdyPriority QuicSpdyStream::priority() const { + return priority_; +} + +void QuicSpdyStream::ClearSession() { + spdy_session_ = nullptr; +} + +QuicConsumedData QuicSpdyStream::WritevDataInner( + QuicIOVector iov, + QuicStreamOffset offset, + bool fin, + QuicAckListenerInterface* ack_notifier_delegate) { + if (spdy_session_->headers_stream() != nullptr && + spdy_session_->force_hol_blocking()) { + return spdy_session_->headers_stream()->WritevStreamData( + id(), iov, offset, fin, ack_notifier_delegate); + } + return ReliableQuicStream::WritevDataInner(iov, offset, fin, + ack_notifier_delegate); +} + +} // namespace net |