diff options
Diffstat (limited to 'chromium/net/quic/chromium/quic_chromium_client_stream.cc')
-rw-r--r-- | chromium/net/quic/chromium/quic_chromium_client_stream.cc | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/chromium/net/quic/chromium/quic_chromium_client_stream.cc b/chromium/net/quic/chromium/quic_chromium_client_stream.cc new file mode 100644 index 00000000000..5a3d996fb09 --- /dev/null +++ b/chromium/net/quic/chromium/quic_chromium_client_stream.cc @@ -0,0 +1,338 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/chromium/quic_chromium_client_stream.h" + +#include <utility> + +#include "base/bind_helpers.h" +#include "base/callback_helpers.h" +#include "base/location.h" +#include "base/threading/thread_task_runner_handle.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/log/net_log_event_type.h" +#include "net/quic/chromium/quic_chromium_client_session.h" +#include "net/quic/core/quic_http_utils.h" +#include "net/quic/core/quic_spdy_session.h" +#include "net/quic/core/quic_write_blocked_list.h" +#include "net/quic/core/spdy_utils.h" + +namespace net { + +QuicChromiumClientStream::QuicChromiumClientStream( + QuicStreamId id, + QuicClientSessionBase* session, + const NetLogWithSource& net_log) + : QuicSpdyStream(id, session), + net_log_(net_log), + delegate_(nullptr), + headers_delivered_(false), + session_(session), + can_migrate_(true), + weak_factory_(this) {} + +QuicChromiumClientStream::~QuicChromiumClientStream() { + if (delegate_) + delegate_->OnClose(); +} + +void QuicChromiumClientStream::OnStreamHeadersComplete(bool fin, + size_t frame_len) { + QuicSpdyStream::OnStreamHeadersComplete(fin, frame_len); + if (decompressed_headers().empty() && !decompressed_trailers().empty()) { + DCHECK(trailers_decompressed()); + // The delegate will read the trailers via a posted task. + NotifyDelegateOfHeadersCompleteLater(received_trailers().Clone(), + frame_len); + } else { + DCHECK(!headers_delivered_); + SpdyHeaderBlock headers; + SpdyFramer framer(HTTP2); + size_t headers_len = decompressed_headers().length(); + const char* header_data = decompressed_headers().data(); + if (!framer.ParseHeaderBlockInBuffer(header_data, headers_len, &headers)) { + DLOG(WARNING) << "Invalid headers"; + Reset(QUIC_BAD_APPLICATION_PAYLOAD); + return; + } + MarkHeadersConsumed(headers_len); + session_->OnInitialHeadersComplete(id(), headers); + + // The delegate will read the headers via a posted task. + NotifyDelegateOfHeadersCompleteLater(std::move(headers), frame_len); + } +} + +void QuicChromiumClientStream::OnInitialHeadersComplete( + bool fin, + size_t frame_len, + const QuicHeaderList& header_list) { + QuicSpdyStream::OnInitialHeadersComplete(fin, frame_len, header_list); + + SpdyHeaderBlock header_block; + int64_t length = -1; + if (!SpdyUtils::CopyAndValidateHeaders(header_list, &length, &header_block)) { + DLOG(ERROR) << "Failed to parse header list: " << header_list.DebugString(); + ConsumeHeaderList(); + Reset(QUIC_BAD_APPLICATION_PAYLOAD); + return; + } + + ConsumeHeaderList(); + session_->OnInitialHeadersComplete(id(), header_block); + + // The delegate will read the headers via a posted task. + NotifyDelegateOfHeadersCompleteLater(std::move(header_block), frame_len); +} + +void QuicChromiumClientStream::OnTrailingHeadersComplete( + bool fin, + size_t frame_len, + const QuicHeaderList& header_list) { + QuicSpdyStream::OnTrailingHeadersComplete(fin, frame_len, header_list); + NotifyDelegateOfHeadersCompleteLater(received_trailers().Clone(), frame_len); +} + +void QuicChromiumClientStream::OnPromiseHeadersComplete( + QuicStreamId promised_id, + size_t frame_len) { + size_t headers_len = decompressed_headers().length(); + SpdyHeaderBlock headers; + SpdyFramer framer(HTTP2); + if (!framer.ParseHeaderBlockInBuffer(decompressed_headers().data(), + headers_len, &headers)) { + DLOG(WARNING) << "Invalid headers"; + Reset(QUIC_BAD_APPLICATION_PAYLOAD); + return; + } + MarkHeadersConsumed(headers_len); + + session_->HandlePromised(id(), promised_id, headers); +} + +void QuicChromiumClientStream::OnPromiseHeaderList( + QuicStreamId promised_id, + size_t frame_len, + const QuicHeaderList& header_list) { + SpdyHeaderBlock promise_headers; + int64_t content_length = -1; + if (!SpdyUtils::CopyAndValidateHeaders(header_list, &content_length, + &promise_headers)) { + DLOG(ERROR) << "Failed to parse header list: " << header_list.DebugString(); + ConsumeHeaderList(); + Reset(QUIC_BAD_APPLICATION_PAYLOAD); + return; + } + ConsumeHeaderList(); + + session_->HandlePromised(id(), promised_id, promise_headers); +} + +void QuicChromiumClientStream::OnDataAvailable() { + if (!FinishedReadingHeaders() || !headers_delivered_) { + // Buffer the data in the sequencer until the headers have been read. + return; + } + + if (!sequencer()->HasBytesToRead() && !FinishedReadingTrailers()) { + // If there is no data to read, wait until either FIN is received or + // trailers are delivered. + return; + } + + // The delegate will read the data via a posted task, and + // will be able to, potentially, read all data which has queued up. + NotifyDelegateOfDataAvailableLater(); +} + +void QuicChromiumClientStream::OnClose() { + if (delegate_) { + delegate_->OnClose(); + delegate_ = nullptr; + delegate_tasks_.clear(); + } + ReliableQuicStream::OnClose(); +} + +void QuicChromiumClientStream::OnCanWrite() { + ReliableQuicStream::OnCanWrite(); + + if (!HasBufferedData() && !callback_.is_null()) { + base::ResetAndReturn(&callback_).Run(OK); + } +} + +size_t QuicChromiumClientStream::WriteHeaders( + SpdyHeaderBlock header_block, + bool fin, + QuicAckListenerInterface* ack_notifier_delegate) { + if (!session()->IsCryptoHandshakeConfirmed()) { + auto entry = header_block.find(":method"); + DCHECK(entry != header_block.end()); + DCHECK_NE("POST", entry->second); + } + net_log_.AddEvent( + NetLogEventType::QUIC_CHROMIUM_CLIENT_STREAM_SEND_REQUEST_HEADERS, + base::Bind(&QuicRequestNetLogCallback, id(), &header_block, + QuicSpdyStream::priority())); + return QuicSpdyStream::WriteHeaders(std::move(header_block), fin, + ack_notifier_delegate); +} + +SpdyPriority QuicChromiumClientStream::priority() const { + if (delegate_ && delegate_->HasSendHeadersComplete()) { + return QuicSpdyStream::priority(); + } + return net::kV3HighestPriority; +} + +int QuicChromiumClientStream::WriteStreamData( + base::StringPiece data, + bool fin, + const CompletionCallback& callback) { + // We should not have data buffered. + DCHECK(!HasBufferedData()); + // Writes the data, or buffers it. + WriteOrBufferData(data, fin, nullptr); + if (!HasBufferedData()) { + return OK; + } + + callback_ = callback; + return ERR_IO_PENDING; +} + +int QuicChromiumClientStream::WritevStreamData( + const std::vector<scoped_refptr<IOBuffer>>& buffers, + const std::vector<int>& lengths, + bool fin, + const CompletionCallback& callback) { + // Must not be called when data is buffered. + DCHECK(!HasBufferedData()); + // Writes the data, or buffers it. + for (size_t i = 0; i < buffers.size(); ++i) { + bool is_fin = fin && (i == buffers.size() - 1); + base::StringPiece string_data(buffers[i]->data(), lengths[i]); + WriteOrBufferData(string_data, is_fin, nullptr); + } + if (!HasBufferedData()) { + return OK; + } + + callback_ = callback; + return ERR_IO_PENDING; +} + +void QuicChromiumClientStream::SetDelegate( + QuicChromiumClientStream::Delegate* delegate) { + DCHECK(!(delegate_ && delegate)); + delegate_ = delegate; + while (!delegate_tasks_.empty()) { + base::Closure closure = delegate_tasks_.front(); + delegate_tasks_.pop_front(); + closure.Run(); + } + if (delegate == nullptr && sequencer()->IsClosed()) { + OnFinRead(); + } +} + +void QuicChromiumClientStream::OnError(int error) { + if (delegate_) { + QuicChromiumClientStream::Delegate* delegate = delegate_; + delegate_ = nullptr; + delegate_tasks_.clear(); + delegate->OnError(error); + } +} + +int QuicChromiumClientStream::Read(IOBuffer* buf, int buf_len) { + if (IsDoneReading()) + return 0; // EOF + + if (!HasBytesToRead()) + return ERR_IO_PENDING; + + iovec iov; + iov.iov_base = buf->data(); + iov.iov_len = buf_len; + size_t bytes_read = Readv(&iov, 1); + // If no more body bytes and trailers are to be delivered, return + // ERR_IO_PENDING now because onDataAvailable() will be called after trailers. + if (bytes_read == 0 && !FinishedReadingTrailers()) + return ERR_IO_PENDING; + return bytes_read; +} + +bool QuicChromiumClientStream::CanWrite(const CompletionCallback& callback) { + bool can_write = session()->connection()->CanWrite(HAS_RETRANSMITTABLE_DATA); + if (!can_write) { + session()->MarkConnectionLevelWriteBlocked(id()); + DCHECK(callback_.is_null()); + callback_ = callback; + } + return can_write; +} + +void QuicChromiumClientStream::NotifyDelegateOfHeadersCompleteLater( + SpdyHeaderBlock headers, + size_t frame_len) { + RunOrBuffer(base::Bind( + &QuicChromiumClientStream::NotifyDelegateOfHeadersComplete, + weak_factory_.GetWeakPtr(), base::Passed(std::move(headers)), frame_len)); +} + +void QuicChromiumClientStream::NotifyDelegateOfHeadersComplete( + SpdyHeaderBlock headers, + size_t frame_len) { + if (!delegate_) + return; + // Only mark trailers consumed when we are about to notify delegate. + if (headers_delivered_) { + MarkTrailersConsumed(decompressed_trailers().length()); + MarkTrailersConsumed(); + // Post an async task to notify delegate of the FIN flag. + NotifyDelegateOfDataAvailableLater(); + net_log_.AddEvent( + NetLogEventType::QUIC_CHROMIUM_CLIENT_STREAM_READ_RESPONSE_TRAILERS, + base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); + } else { + headers_delivered_ = true; + net_log_.AddEvent( + NetLogEventType::QUIC_CHROMIUM_CLIENT_STREAM_READ_RESPONSE_HEADERS, + base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); + } + + delegate_->OnHeadersAvailable(headers, frame_len); +} + +void QuicChromiumClientStream::NotifyDelegateOfDataAvailableLater() { + RunOrBuffer( + base::Bind(&QuicChromiumClientStream::NotifyDelegateOfDataAvailable, + weak_factory_.GetWeakPtr())); +} + +void QuicChromiumClientStream::NotifyDelegateOfDataAvailable() { + if (delegate_) + delegate_->OnDataAvailable(); +} + +void QuicChromiumClientStream::RunOrBuffer(base::Closure closure) { + if (delegate_) { + base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, closure); + } else { + delegate_tasks_.push_back(closure); + } +} + +void QuicChromiumClientStream::DisableConnectionMigration() { + can_migrate_ = false; +} + +bool QuicChromiumClientStream::IsFirstStream() { + return id() == kHeadersStreamId + 2; +} + +} // namespace net |