summaryrefslogtreecommitdiff
path: root/chromium/net/quic/quic_data_stream.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/net/quic/quic_data_stream.cc')
-rw-r--r--chromium/net/quic/quic_data_stream.cc333
1 files changed, 333 insertions, 0 deletions
diff --git a/chromium/net/quic/quic_data_stream.cc b/chromium/net/quic/quic_data_stream.cc
new file mode 100644
index 00000000000..3c992a777b0
--- /dev/null
+++ b/chromium/net/quic/quic_data_stream.cc
@@ -0,0 +1,333 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/quic/quic_data_stream.h"
+
+#include "base/logging.h"
+#include "net/quic/quic_session.h"
+#include "net/quic/quic_spdy_decompressor.h"
+#include "net/spdy/write_blocked_list.h"
+
+using base::StringPiece;
+using std::min;
+
+namespace net {
+
+#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ")
+
+namespace {
+
+// This is somewhat arbitrary. It's possible, but unlikely, we will either fail
+// to set a priority client-side, or cancel a stream before stripping the
+// priority from the wire server-side. In either case, start out with a
+// priority in the middle.
+QuicPriority kDefaultPriority = 3;
+
+// Appends bytes from data into partial_data_buffer. Once partial_data_buffer
+// reaches 4 bytes, copies the data into 'result' and clears
+// partial_data_buffer.
+// Returns the number of bytes consumed.
+uint32 StripUint32(const char* data, uint32 data_len,
+ string* partial_data_buffer,
+ uint32* result) {
+ DCHECK_GT(4u, partial_data_buffer->length());
+ size_t missing_size = 4 - partial_data_buffer->length();
+ if (data_len < missing_size) {
+ StringPiece(data, data_len).AppendToString(partial_data_buffer);
+ return data_len;
+ }
+ StringPiece(data, missing_size).AppendToString(partial_data_buffer);
+ DCHECK_EQ(4u, partial_data_buffer->length());
+ memcpy(result, partial_data_buffer->data(), 4);
+ partial_data_buffer->clear();
+ return missing_size;
+}
+
+} // namespace
+
+QuicDataStream::QuicDataStream(QuicStreamId id,
+ QuicSession* session)
+ : ReliableQuicStream(id, session),
+ visitor_(NULL),
+ headers_decompressed_(false),
+ priority_(kDefaultPriority),
+ headers_id_(0),
+ decompression_failed_(false),
+ priority_parsed_(false) {
+ DCHECK_NE(kCryptoStreamId, id);
+}
+
+QuicDataStream::~QuicDataStream() {
+}
+
+size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) {
+ if (FinishedReadingHeaders()) {
+ // If the headers have been read, simply delegate to the sequencer's
+ // Readv method.
+ return sequencer()->Readv(iov, iov_len);
+ }
+ // Otherwise, copy decompressed header data into |iov|.
+ size_t bytes_consumed = 0;
+ size_t iov_index = 0;
+ while (iov_index < iov_len &&
+ decompressed_headers_.length() > bytes_consumed) {
+ size_t bytes_to_read = min(iov[iov_index].iov_len,
+ decompressed_headers_.length() - bytes_consumed);
+ char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
+ memcpy(iov_ptr,
+ decompressed_headers_.data() + bytes_consumed, bytes_to_read);
+ bytes_consumed += bytes_to_read;
+ ++iov_index;
+ }
+ decompressed_headers_.erase(0, bytes_consumed);
+ return bytes_consumed;
+}
+
+int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) {
+ if (FinishedReadingHeaders()) {
+ return sequencer()->GetReadableRegions(iov, iov_len);
+ }
+ if (iov_len == 0) {
+ return 0;
+ }
+ iov[0].iov_base = static_cast<void*>(
+ const_cast<char*>(decompressed_headers_.data()));
+ iov[0].iov_len = decompressed_headers_.length();
+ return 1;
+}
+
+bool QuicDataStream::IsDoneReading() const {
+ if (!headers_decompressed_ || !decompressed_headers_.empty()) {
+ return false;
+ }
+ return sequencer()->IsClosed();
+}
+
+bool QuicDataStream::HasBytesToRead() const {
+ return !decompressed_headers_.empty() || sequencer()->HasBytesToRead();
+}
+
+void QuicDataStream::set_priority(QuicPriority priority) {
+ DCHECK_EQ(0u, stream_bytes_written());
+ priority_ = priority;
+}
+
+QuicPriority QuicDataStream::EffectivePriority() const {
+ return priority();
+}
+
+uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) {
+ DCHECK_NE(0u, data_len);
+
+ uint32 total_bytes_consumed = 0;
+ if (headers_id_ == 0u) {
+ total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
+ data += total_bytes_consumed;
+ data_len -= total_bytes_consumed;
+ if (data_len == 0 || total_bytes_consumed == 0) {
+ return total_bytes_consumed;
+ }
+ }
+ DCHECK_NE(0u, headers_id_);
+
+ // Once the headers are finished, we simply pass the data through.
+ if (headers_decompressed_) {
+ // Some buffered header data remains.
+ if (!decompressed_headers_.empty()) {
+ ProcessHeaderData();
+ }
+ if (decompressed_headers_.empty()) {
+ DVLOG(1) << "Delegating procesing to ProcessData";
+ total_bytes_consumed += ProcessData(data, data_len);
+ }
+ return total_bytes_consumed;
+ }
+
+ QuicHeaderId current_header_id =
+ session()->decompressor()->current_header_id();
+ // Ensure that this header id looks sane.
+ if (headers_id_ < current_header_id ||
+ headers_id_ > kMaxHeaderIdDelta + current_header_id) {
+ DVLOG(1) << ENDPOINT
+ << "Invalid headers for stream: " << id()
+ << " header_id: " << headers_id_
+ << " current_header_id: " << current_header_id;
+ session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
+ return total_bytes_consumed;
+ }
+
+ // If we are head-of-line blocked on decompression, then back up.
+ if (current_header_id != headers_id_) {
+ session()->MarkDecompressionBlocked(headers_id_, id());
+ DVLOG(1) << ENDPOINT
+ << "Unable to decompress header data for stream: " << id()
+ << " header_id: " << headers_id_;
+ return total_bytes_consumed;
+ }
+
+ // Decompressed data will be delivered to decompressed_headers_.
+ size_t bytes_consumed = session()->decompressor()->DecompressData(
+ StringPiece(data, data_len), this);
+ DCHECK_NE(0u, bytes_consumed);
+ if (bytes_consumed > data_len) {
+ DCHECK(false) << "DecompressData returned illegal value";
+ OnDecompressionError();
+ return total_bytes_consumed;
+ }
+ total_bytes_consumed += bytes_consumed;
+ data += bytes_consumed;
+ data_len -= bytes_consumed;
+
+ if (decompression_failed_) {
+ // The session will have been closed in OnDecompressionError.
+ return total_bytes_consumed;
+ }
+
+ // Headers are complete if the decompressor has moved on to the
+ // next stream.
+ headers_decompressed_ =
+ session()->decompressor()->current_header_id() != headers_id_;
+ if (!headers_decompressed_) {
+ DCHECK_EQ(0u, data_len);
+ }
+
+ ProcessHeaderData();
+
+ if (!headers_decompressed_ || !decompressed_headers_.empty()) {
+ return total_bytes_consumed;
+ }
+
+ // We have processed all of the decompressed data but we might
+ // have some more raw data to process.
+ if (data_len > 0) {
+ total_bytes_consumed += ProcessData(data, data_len);
+ }
+
+ // The sequencer will push any additional buffered frames if this data
+ // has been completely consumed.
+ return total_bytes_consumed;
+}
+
+const IPEndPoint& QuicDataStream::GetPeerAddress() {
+ return session()->peer_address();
+}
+
+QuicSpdyCompressor* QuicDataStream::compressor() {
+ return session()->compressor();
+}
+
+bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) {
+ return session()->GetSSLInfo(ssl_info);
+}
+
+uint32 QuicDataStream::ProcessHeaderData() {
+ if (decompressed_headers_.empty()) {
+ return 0;
+ }
+
+ size_t bytes_processed = ProcessData(decompressed_headers_.data(),
+ decompressed_headers_.length());
+ if (bytes_processed == decompressed_headers_.length()) {
+ decompressed_headers_.clear();
+ } else {
+ decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
+ }
+ return bytes_processed;
+}
+
+void QuicDataStream::OnDecompressorAvailable() {
+ DCHECK_EQ(headers_id_,
+ session()->decompressor()->current_header_id());
+ DCHECK(!headers_decompressed_);
+ DCHECK(!decompression_failed_);
+ DCHECK_EQ(0u, decompressed_headers_.length());
+
+ while (!headers_decompressed_) {
+ struct iovec iovec;
+ if (sequencer()->GetReadableRegions(&iovec, 1) == 0) {
+ return;
+ }
+
+ size_t bytes_consumed = session()->decompressor()->DecompressData(
+ StringPiece(static_cast<char*>(iovec.iov_base),
+ iovec.iov_len),
+ this);
+ DCHECK_LE(bytes_consumed, iovec.iov_len);
+ if (decompression_failed_) {
+ return;
+ }
+ sequencer()->MarkConsumed(bytes_consumed);
+
+ headers_decompressed_ =
+ session()->decompressor()->current_header_id() != headers_id_;
+ }
+
+ // Either the headers are complete, or the all data as been consumed.
+ ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
+ if (IsDoneReading()) {
+ OnFinRead();
+ } else if (FinishedReadingHeaders()) {
+ sequencer()->FlushBufferedFrames();
+ }
+}
+
+bool QuicDataStream::OnDecompressedData(StringPiece data) {
+ data.AppendToString(&decompressed_headers_);
+ return true;
+}
+
+void QuicDataStream::OnDecompressionError() {
+ DCHECK(!decompression_failed_);
+ decompression_failed_ = true;
+ session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
+}
+
+void QuicDataStream::OnClose() {
+ ReliableQuicStream::OnClose();
+
+ if (visitor_) {
+ Visitor* visitor = visitor_;
+ // Calling Visitor::OnClose() may result the destruction of the visitor,
+ // so we need to ensure we don't call it again.
+ visitor_ = NULL;
+ visitor->OnClose(this);
+ }
+}
+
+uint32 QuicDataStream::StripPriorityAndHeaderId(
+ const char* data, uint32 data_len) {
+ uint32 total_bytes_parsed = 0;
+
+ if (!priority_parsed_ && session()->connection()->is_server()) {
+ QuicPriority temporary_priority = priority_;
+ total_bytes_parsed = StripUint32(
+ data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
+ if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
+ priority_parsed_ = true;
+
+ // Spdy priorities are inverted, so the highest numerical value is the
+ // lowest legal priority.
+ if (temporary_priority > QuicUtils::LowestPriority()) {
+ session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
+ return 0;
+ }
+ priority_ = temporary_priority;
+ }
+ data += total_bytes_parsed;
+ data_len -= total_bytes_parsed;
+ }
+ if (data_len > 0 && headers_id_ == 0u) {
+ // The headers ID has not yet been read. Strip it from the beginning of
+ // the data stream.
+ total_bytes_parsed += StripUint32(
+ data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
+ }
+ return total_bytes_parsed;
+}
+
+bool QuicDataStream::FinishedReadingHeaders() {
+ return headers_decompressed_ && decompressed_headers_.empty();
+}
+
+} // namespace net