diff options
Diffstat (limited to 'chromium/net/quic/quic_data_stream.cc')
-rw-r--r-- | chromium/net/quic/quic_data_stream.cc | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/chromium/net/quic/quic_data_stream.cc b/chromium/net/quic/quic_data_stream.cc new file mode 100644 index 00000000000..3c992a777b0 --- /dev/null +++ b/chromium/net/quic/quic_data_stream.cc @@ -0,0 +1,333 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_data_stream.h" + +#include "base/logging.h" +#include "net/quic/quic_session.h" +#include "net/quic/quic_spdy_decompressor.h" +#include "net/spdy/write_blocked_list.h" + +using base::StringPiece; +using std::min; + +namespace net { + +#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") + +namespace { + +// This is somewhat arbitrary. It's possible, but unlikely, we will either fail +// to set a priority client-side, or cancel a stream before stripping the +// priority from the wire server-side. In either case, start out with a +// priority in the middle. +QuicPriority kDefaultPriority = 3; + +// Appends bytes from data into partial_data_buffer. Once partial_data_buffer +// reaches 4 bytes, copies the data into 'result' and clears +// partial_data_buffer. +// Returns the number of bytes consumed. +uint32 StripUint32(const char* data, uint32 data_len, + string* partial_data_buffer, + uint32* result) { + DCHECK_GT(4u, partial_data_buffer->length()); + size_t missing_size = 4 - partial_data_buffer->length(); + if (data_len < missing_size) { + StringPiece(data, data_len).AppendToString(partial_data_buffer); + return data_len; + } + StringPiece(data, missing_size).AppendToString(partial_data_buffer); + DCHECK_EQ(4u, partial_data_buffer->length()); + memcpy(result, partial_data_buffer->data(), 4); + partial_data_buffer->clear(); + return missing_size; +} + +} // namespace + +QuicDataStream::QuicDataStream(QuicStreamId id, + QuicSession* session) + : ReliableQuicStream(id, session), + visitor_(NULL), + headers_decompressed_(false), + priority_(kDefaultPriority), + headers_id_(0), + decompression_failed_(false), + priority_parsed_(false) { + DCHECK_NE(kCryptoStreamId, id); +} + +QuicDataStream::~QuicDataStream() { +} + +size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) { + if (FinishedReadingHeaders()) { + // If the headers have been read, simply delegate to the sequencer's + // Readv method. + return sequencer()->Readv(iov, iov_len); + } + // Otherwise, copy decompressed header data into |iov|. + size_t bytes_consumed = 0; + size_t iov_index = 0; + while (iov_index < iov_len && + decompressed_headers_.length() > bytes_consumed) { + size_t bytes_to_read = min(iov[iov_index].iov_len, + decompressed_headers_.length() - bytes_consumed); + char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); + memcpy(iov_ptr, + decompressed_headers_.data() + bytes_consumed, bytes_to_read); + bytes_consumed += bytes_to_read; + ++iov_index; + } + decompressed_headers_.erase(0, bytes_consumed); + return bytes_consumed; +} + +int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) { + if (FinishedReadingHeaders()) { + return sequencer()->GetReadableRegions(iov, iov_len); + } + if (iov_len == 0) { + return 0; + } + iov[0].iov_base = static_cast<void*>( + const_cast<char*>(decompressed_headers_.data())); + iov[0].iov_len = decompressed_headers_.length(); + return 1; +} + +bool QuicDataStream::IsDoneReading() const { + if (!headers_decompressed_ || !decompressed_headers_.empty()) { + return false; + } + return sequencer()->IsClosed(); +} + +bool QuicDataStream::HasBytesToRead() const { + return !decompressed_headers_.empty() || sequencer()->HasBytesToRead(); +} + +void QuicDataStream::set_priority(QuicPriority priority) { + DCHECK_EQ(0u, stream_bytes_written()); + priority_ = priority; +} + +QuicPriority QuicDataStream::EffectivePriority() const { + return priority(); +} + +uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { + DCHECK_NE(0u, data_len); + + uint32 total_bytes_consumed = 0; + if (headers_id_ == 0u) { + total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); + data += total_bytes_consumed; + data_len -= total_bytes_consumed; + if (data_len == 0 || total_bytes_consumed == 0) { + return total_bytes_consumed; + } + } + DCHECK_NE(0u, headers_id_); + + // Once the headers are finished, we simply pass the data through. + if (headers_decompressed_) { + // Some buffered header data remains. + if (!decompressed_headers_.empty()) { + ProcessHeaderData(); + } + if (decompressed_headers_.empty()) { + DVLOG(1) << "Delegating procesing to ProcessData"; + total_bytes_consumed += ProcessData(data, data_len); + } + return total_bytes_consumed; + } + + QuicHeaderId current_header_id = + session()->decompressor()->current_header_id(); + // Ensure that this header id looks sane. + if (headers_id_ < current_header_id || + headers_id_ > kMaxHeaderIdDelta + current_header_id) { + DVLOG(1) << ENDPOINT + << "Invalid headers for stream: " << id() + << " header_id: " << headers_id_ + << " current_header_id: " << current_header_id; + session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); + return total_bytes_consumed; + } + + // If we are head-of-line blocked on decompression, then back up. + if (current_header_id != headers_id_) { + session()->MarkDecompressionBlocked(headers_id_, id()); + DVLOG(1) << ENDPOINT + << "Unable to decompress header data for stream: " << id() + << " header_id: " << headers_id_; + return total_bytes_consumed; + } + + // Decompressed data will be delivered to decompressed_headers_. + size_t bytes_consumed = session()->decompressor()->DecompressData( + StringPiece(data, data_len), this); + DCHECK_NE(0u, bytes_consumed); + if (bytes_consumed > data_len) { + DCHECK(false) << "DecompressData returned illegal value"; + OnDecompressionError(); + return total_bytes_consumed; + } + total_bytes_consumed += bytes_consumed; + data += bytes_consumed; + data_len -= bytes_consumed; + + if (decompression_failed_) { + // The session will have been closed in OnDecompressionError. + return total_bytes_consumed; + } + + // Headers are complete if the decompressor has moved on to the + // next stream. + headers_decompressed_ = + session()->decompressor()->current_header_id() != headers_id_; + if (!headers_decompressed_) { + DCHECK_EQ(0u, data_len); + } + + ProcessHeaderData(); + + if (!headers_decompressed_ || !decompressed_headers_.empty()) { + return total_bytes_consumed; + } + + // We have processed all of the decompressed data but we might + // have some more raw data to process. + if (data_len > 0) { + total_bytes_consumed += ProcessData(data, data_len); + } + + // The sequencer will push any additional buffered frames if this data + // has been completely consumed. + return total_bytes_consumed; +} + +const IPEndPoint& QuicDataStream::GetPeerAddress() { + return session()->peer_address(); +} + +QuicSpdyCompressor* QuicDataStream::compressor() { + return session()->compressor(); +} + +bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { + return session()->GetSSLInfo(ssl_info); +} + +uint32 QuicDataStream::ProcessHeaderData() { + if (decompressed_headers_.empty()) { + return 0; + } + + size_t bytes_processed = ProcessData(decompressed_headers_.data(), + decompressed_headers_.length()); + if (bytes_processed == decompressed_headers_.length()) { + decompressed_headers_.clear(); + } else { + decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); + } + return bytes_processed; +} + +void QuicDataStream::OnDecompressorAvailable() { + DCHECK_EQ(headers_id_, + session()->decompressor()->current_header_id()); + DCHECK(!headers_decompressed_); + DCHECK(!decompression_failed_); + DCHECK_EQ(0u, decompressed_headers_.length()); + + while (!headers_decompressed_) { + struct iovec iovec; + if (sequencer()->GetReadableRegions(&iovec, 1) == 0) { + return; + } + + size_t bytes_consumed = session()->decompressor()->DecompressData( + StringPiece(static_cast<char*>(iovec.iov_base), + iovec.iov_len), + this); + DCHECK_LE(bytes_consumed, iovec.iov_len); + if (decompression_failed_) { + return; + } + sequencer()->MarkConsumed(bytes_consumed); + + headers_decompressed_ = + session()->decompressor()->current_header_id() != headers_id_; + } + + // Either the headers are complete, or the all data as been consumed. + ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. + if (IsDoneReading()) { + OnFinRead(); + } else if (FinishedReadingHeaders()) { + sequencer()->FlushBufferedFrames(); + } +} + +bool QuicDataStream::OnDecompressedData(StringPiece data) { + data.AppendToString(&decompressed_headers_); + return true; +} + +void QuicDataStream::OnDecompressionError() { + DCHECK(!decompression_failed_); + decompression_failed_ = true; + session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); +} + +void QuicDataStream::OnClose() { + ReliableQuicStream::OnClose(); + + if (visitor_) { + Visitor* visitor = visitor_; + // Calling Visitor::OnClose() may result the destruction of the visitor, + // so we need to ensure we don't call it again. + visitor_ = NULL; + visitor->OnClose(this); + } +} + +uint32 QuicDataStream::StripPriorityAndHeaderId( + const char* data, uint32 data_len) { + uint32 total_bytes_parsed = 0; + + if (!priority_parsed_ && session()->connection()->is_server()) { + QuicPriority temporary_priority = priority_; + total_bytes_parsed = StripUint32( + data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); + if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { + priority_parsed_ = true; + + // Spdy priorities are inverted, so the highest numerical value is the + // lowest legal priority. + if (temporary_priority > QuicUtils::LowestPriority()) { + session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); + return 0; + } + priority_ = temporary_priority; + } + data += total_bytes_parsed; + data_len -= total_bytes_parsed; + } + if (data_len > 0 && headers_id_ == 0u) { + // The headers ID has not yet been read. Strip it from the beginning of + // the data stream. + total_bytes_parsed += StripUint32( + data, data_len, &headers_id_and_priority_buffer_, &headers_id_); + } + return total_bytes_parsed; +} + +bool QuicDataStream::FinishedReadingHeaders() { + return headers_decompressed_ && decompressed_headers_.empty(); +} + +} // namespace net |