diff options
Diffstat (limited to 'chromium/net/quic/core/quic_stream.cc')
-rw-r--r-- | chromium/net/quic/core/quic_stream.cc | 191 |
1 files changed, 181 insertions, 10 deletions
diff --git a/chromium/net/quic/core/quic_stream.cc b/chromium/net/quic/core/quic_stream.cc index e8583b62c04..4afb5c24531 100644 --- a/chromium/net/quic/core/quic_stream.cc +++ b/chromium/net/quic/core/quic_stream.cc @@ -7,6 +7,8 @@ #include "net/quic/core/quic_flow_controller.h" #include "net/quic/core/quic_session.h" #include "net/quic/platform/api/quic_bug_tracker.h" +#include "net/quic/platform/api/quic_flag_utils.h" +#include "net/quic/platform/api/quic_flags.h" #include "net/quic/platform/api/quic_logging.h" using std::string; @@ -78,7 +80,10 @@ QuicStream::QuicStream(QuicStreamId id, QuicSession* session) busy_counter_(0), add_random_padding_after_fin_(false), ack_listener_(nullptr), - send_buffer_(session->connection()->helper()->GetBufferAllocator()) { + send_buffer_( + session->connection()->helper()->GetStreamSendBufferAllocator()), + buffered_data_threshold_( + GetQuicFlag(FLAGS_quic_buffered_data_threshold)) { SetFromConfig(); } @@ -214,6 +219,26 @@ void QuicStream::WriteOrBufferData( QuicConsumedData consumed_data(0, false); fin_buffered_ = fin; + if (session_->save_data_before_consumption()) { + bool had_buffered_data = HasBufferedData(); + // Do not respect buffered data upper limit as WriteOrBufferData guarantees + // all data to be consumed. + if (data.length() > 0) { + struct iovec iov(MakeIovec(data)); + QuicIOVector quic_iov(&iov, 1, data.length()); + QuicStreamOffset offset = send_buffer_.stream_offset(); + send_buffer_.SaveStreamData(quic_iov, 0, data.length()); + OnDataBuffered(offset, data.length(), ack_listener); + } + if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { + // Write data if there is no buffered data before. + QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_save_data_before_consumption2, + 2, 4); + WriteBufferedData(); + } + return; + } + if (queued_data_.empty()) { struct iovec iov(MakeIovec(data)); consumed_data = WritevData(&iov, 1, fin, ack_listener); @@ -230,6 +255,26 @@ void QuicStream::WriteOrBufferData( } void QuicStream::OnCanWrite() { + if (session_->save_data_before_consumption()) { + DCHECK(queued_data_.empty()); + if (write_side_closed_) { + QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id() + << "attempting to write when the write side is closed"; + return; + } + if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) { + QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_save_data_before_consumption2, + 3, 4); + WriteBufferedData(); + } + if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) { + // Notify upper layer to write new data when buffered data size is below + // low water mark. + OnCanWriteNewData(); + } + return; + } + bool fin = false; while (!queued_data_.empty()) { PendingData* pending_data = &queued_data_.front(); @@ -299,6 +344,38 @@ QuicConsumedData QuicStream::WritevData( } } + if (session_->save_data_before_consumption()) { + QuicConsumedData consumed_data(0, false); + if (fin_buffered_) { + QUIC_BUG << "Fin already buffered"; + return consumed_data; + } + + bool had_buffered_data = HasBufferedData(); + if (CanWriteNewData()) { + // Save all data if buffered data size is below low water mark. + QuicIOVector quic_iovec(iov, iov_count, write_length); + consumed_data.bytes_consumed = write_length; + if (consumed_data.bytes_consumed > 0) { + QuicStreamOffset offset = send_buffer_.stream_offset(); + send_buffer_.SaveStreamData(quic_iovec, 0, write_length); + OnDataBuffered(offset, write_length, ack_listener); + } + } + consumed_data.fin_consumed = + consumed_data.bytes_consumed == write_length && fin; + fin_buffered_ = consumed_data.fin_consumed; + + if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { + // Write data if there is no buffered data before. + QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_save_data_before_consumption2, + 1, 4); + WriteBufferedData(); + } + + return consumed_data; + } + // A FIN with zero data payload should not be flow control blocked. bool fin_with_zero_data = (fin && write_length == 0); @@ -409,6 +486,11 @@ void QuicStream::CloseWriteSide() { } bool QuicStream::HasBufferedData() const { + if (session_->save_data_before_consumption()) { + DCHECK(queued_data_.empty()); + DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written_); + return send_buffer_.stream_offset() > stream_bytes_written_; + } return !queued_data_.empty(); } @@ -511,12 +593,14 @@ void QuicStream::OnStreamFrameAcked(const QuicStreamFrame& frame, QuicTime::Delta ack_delay_time) { OnStreamFrameDiscarded(frame); if (ack_listener_ != nullptr) { + QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_use_stream_notifier2, 1, 3); ack_listener_->OnPacketAcked(frame.data_length, ack_delay_time); } } void QuicStream::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) { if (ack_listener_ != nullptr) { + QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_use_stream_notifier2, 2, 3); ack_listener_->OnPacketRetransmitted(frame.data_length); } } @@ -533,7 +617,7 @@ void QuicStream::OnStreamFrameDiscarded(const QuicStreamFrame& frame) { if (frame.fin) { fin_outstanding_ = false; } - if (session_->streams_own_data() && frame.data_length > 0) { + if (session_->save_data_before_consumption() && frame.data_length > 0) { send_buffer_.RemoveStreamFrame(frame.offset, frame.data_length); } if (!IsWaitingForAcks()) { @@ -545,14 +629,6 @@ bool QuicStream::IsWaitingForAcks() const { return stream_bytes_outstanding_ || fin_outstanding_; } -void QuicStream::SaveStreamData(QuicIOVector iov, - size_t iov_offset, - QuicStreamOffset offset, - QuicByteCount data_length) { - DCHECK_LT(0u, data_length); - send_buffer_.SaveStreamData(iov, iov_offset, offset, data_length); -} - bool QuicStream::WriteStreamData(QuicStreamOffset offset, QuicByteCount data_length, QuicDataWriter* writer) { @@ -560,4 +636,99 @@ bool QuicStream::WriteStreamData(QuicStreamOffset offset, return send_buffer_.WriteStreamData(offset, data_length, writer); } +void QuicStream::WriteBufferedData() { + DCHECK(!write_side_closed_ && queued_data_.empty() && + (HasBufferedData() || fin_buffered_)); + + if (session_->ShouldYield(id())) { + session_->MarkConnectionLevelWriteBlocked(id()); + return; + } + + // Size of buffered data. + size_t write_length = queued_data_bytes(); + + // A FIN with zero data payload should not be flow control blocked. + bool fin_with_zero_data = (fin_buffered_ && write_length == 0); + + bool fin = fin_buffered_; + + // How much data flow control permits to be written. + QuicByteCount send_window = flow_controller_.SendWindowSize(); + if (stream_contributes_to_connection_flow_control_) { + send_window = + std::min(send_window, connection_flow_controller_->SendWindowSize()); + } + + if (send_window == 0 && !fin_with_zero_data) { + // Quick return if nothing can be sent. + MaybeSendBlocked(); + return; + } + + if (write_length > send_window) { + // Don't send the FIN unless all the data will be sent. + fin = false; + + // Writing more data would be a violation of flow control. + write_length = static_cast<size_t>(send_window); + QUIC_DVLOG(1) << "stream " << id() << " shortens write length to " + << write_length << " due to flow control"; + } + + QuicConsumedData consumed_data = WritevDataInner( + QuicIOVector(/*iov=*/nullptr, /*iov_count=*/0, write_length), + stream_bytes_written_, fin, nullptr); + + stream_bytes_written_ += consumed_data.bytes_consumed; + stream_bytes_outstanding_ += consumed_data.bytes_consumed; + + AddBytesSent(consumed_data.bytes_consumed); + QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends " + << stream_bytes_written_ << " bytes " + << " and has buffered data " << queued_data_bytes() << " bytes." + << " fin is sent: " << consumed_data.fin_consumed + << " fin is buffered: " << fin_buffered_; + + // The write may have generated a write error causing this stream to be + // closed. If so, simply return without marking the stream write blocked. + if (write_side_closed_) { + return; + } + + if (consumed_data.bytes_consumed == write_length) { + if (!fin_with_zero_data) { + MaybeSendBlocked(); + } + if (fin && consumed_data.fin_consumed) { + fin_sent_ = true; + fin_outstanding_ = true; + if (fin_received_) { + session_->StreamDraining(id_); + } + CloseWriteSide(); + } else if (fin && !consumed_data.fin_consumed) { + session_->MarkConnectionLevelWriteBlocked(id()); + } + } else { + session_->MarkConnectionLevelWriteBlocked(id()); + } + if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) { + busy_counter_ = 0; + } +} + +uint64_t QuicStream::queued_data_bytes() const { + if (session_->save_data_before_consumption()) { + DCHECK_EQ(0u, queued_data_bytes_); + DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written_); + return send_buffer_.stream_offset() - stream_bytes_written_; + } + return queued_data_bytes_; +} + +bool QuicStream::CanWriteNewData() const { + return queued_data_bytes() < buffered_data_threshold_; +} + } // namespace net |