summaryrefslogtreecommitdiff
path: root/chromium/net/quic/core/quic_stream.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/net/quic/core/quic_stream.cc')
-rw-r--r--chromium/net/quic/core/quic_stream.cc191
1 files changed, 181 insertions, 10 deletions
diff --git a/chromium/net/quic/core/quic_stream.cc b/chromium/net/quic/core/quic_stream.cc
index e8583b62c04..4afb5c24531 100644
--- a/chromium/net/quic/core/quic_stream.cc
+++ b/chromium/net/quic/core/quic_stream.cc
@@ -7,6 +7,8 @@
#include "net/quic/core/quic_flow_controller.h"
#include "net/quic/core/quic_session.h"
#include "net/quic/platform/api/quic_bug_tracker.h"
+#include "net/quic/platform/api/quic_flag_utils.h"
+#include "net/quic/platform/api/quic_flags.h"
#include "net/quic/platform/api/quic_logging.h"
using std::string;
@@ -78,7 +80,10 @@ QuicStream::QuicStream(QuicStreamId id, QuicSession* session)
busy_counter_(0),
add_random_padding_after_fin_(false),
ack_listener_(nullptr),
- send_buffer_(session->connection()->helper()->GetBufferAllocator()) {
+ send_buffer_(
+ session->connection()->helper()->GetStreamSendBufferAllocator()),
+ buffered_data_threshold_(
+ GetQuicFlag(FLAGS_quic_buffered_data_threshold)) {
SetFromConfig();
}
@@ -214,6 +219,26 @@ void QuicStream::WriteOrBufferData(
QuicConsumedData consumed_data(0, false);
fin_buffered_ = fin;
+ if (session_->save_data_before_consumption()) {
+ bool had_buffered_data = HasBufferedData();
+ // Do not respect buffered data upper limit as WriteOrBufferData guarantees
+ // all data to be consumed.
+ if (data.length() > 0) {
+ struct iovec iov(MakeIovec(data));
+ QuicIOVector quic_iov(&iov, 1, data.length());
+ QuicStreamOffset offset = send_buffer_.stream_offset();
+ send_buffer_.SaveStreamData(quic_iov, 0, data.length());
+ OnDataBuffered(offset, data.length(), ack_listener);
+ }
+ if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
+ // Write data if there is no buffered data before.
+ QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_save_data_before_consumption2,
+ 2, 4);
+ WriteBufferedData();
+ }
+ return;
+ }
+
if (queued_data_.empty()) {
struct iovec iov(MakeIovec(data));
consumed_data = WritevData(&iov, 1, fin, ack_listener);
@@ -230,6 +255,26 @@ void QuicStream::WriteOrBufferData(
}
void QuicStream::OnCanWrite() {
+ if (session_->save_data_before_consumption()) {
+ DCHECK(queued_data_.empty());
+ if (write_side_closed_) {
+ QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
+ << "attempting to write when the write side is closed";
+ return;
+ }
+ if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
+ QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_save_data_before_consumption2,
+ 3, 4);
+ WriteBufferedData();
+ }
+ if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
+ // Notify upper layer to write new data when buffered data size is below
+ // low water mark.
+ OnCanWriteNewData();
+ }
+ return;
+ }
+
bool fin = false;
while (!queued_data_.empty()) {
PendingData* pending_data = &queued_data_.front();
@@ -299,6 +344,38 @@ QuicConsumedData QuicStream::WritevData(
}
}
+ if (session_->save_data_before_consumption()) {
+ QuicConsumedData consumed_data(0, false);
+ if (fin_buffered_) {
+ QUIC_BUG << "Fin already buffered";
+ return consumed_data;
+ }
+
+ bool had_buffered_data = HasBufferedData();
+ if (CanWriteNewData()) {
+ // Save all data if buffered data size is below low water mark.
+ QuicIOVector quic_iovec(iov, iov_count, write_length);
+ consumed_data.bytes_consumed = write_length;
+ if (consumed_data.bytes_consumed > 0) {
+ QuicStreamOffset offset = send_buffer_.stream_offset();
+ send_buffer_.SaveStreamData(quic_iovec, 0, write_length);
+ OnDataBuffered(offset, write_length, ack_listener);
+ }
+ }
+ consumed_data.fin_consumed =
+ consumed_data.bytes_consumed == write_length && fin;
+ fin_buffered_ = consumed_data.fin_consumed;
+
+ if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
+ // Write data if there is no buffered data before.
+ QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_save_data_before_consumption2,
+ 1, 4);
+ WriteBufferedData();
+ }
+
+ return consumed_data;
+ }
+
// A FIN with zero data payload should not be flow control blocked.
bool fin_with_zero_data = (fin && write_length == 0);
@@ -409,6 +486,11 @@ void QuicStream::CloseWriteSide() {
}
bool QuicStream::HasBufferedData() const {
+ if (session_->save_data_before_consumption()) {
+ DCHECK(queued_data_.empty());
+ DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written_);
+ return send_buffer_.stream_offset() > stream_bytes_written_;
+ }
return !queued_data_.empty();
}
@@ -511,12 +593,14 @@ void QuicStream::OnStreamFrameAcked(const QuicStreamFrame& frame,
QuicTime::Delta ack_delay_time) {
OnStreamFrameDiscarded(frame);
if (ack_listener_ != nullptr) {
+ QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_use_stream_notifier2, 1, 3);
ack_listener_->OnPacketAcked(frame.data_length, ack_delay_time);
}
}
void QuicStream::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
if (ack_listener_ != nullptr) {
+ QUIC_FLAG_COUNT_N(quic_reloadable_flag_quic_use_stream_notifier2, 2, 3);
ack_listener_->OnPacketRetransmitted(frame.data_length);
}
}
@@ -533,7 +617,7 @@ void QuicStream::OnStreamFrameDiscarded(const QuicStreamFrame& frame) {
if (frame.fin) {
fin_outstanding_ = false;
}
- if (session_->streams_own_data() && frame.data_length > 0) {
+ if (session_->save_data_before_consumption() && frame.data_length > 0) {
send_buffer_.RemoveStreamFrame(frame.offset, frame.data_length);
}
if (!IsWaitingForAcks()) {
@@ -545,14 +629,6 @@ bool QuicStream::IsWaitingForAcks() const {
return stream_bytes_outstanding_ || fin_outstanding_;
}
-void QuicStream::SaveStreamData(QuicIOVector iov,
- size_t iov_offset,
- QuicStreamOffset offset,
- QuicByteCount data_length) {
- DCHECK_LT(0u, data_length);
- send_buffer_.SaveStreamData(iov, iov_offset, offset, data_length);
-}
-
bool QuicStream::WriteStreamData(QuicStreamOffset offset,
QuicByteCount data_length,
QuicDataWriter* writer) {
@@ -560,4 +636,99 @@ bool QuicStream::WriteStreamData(QuicStreamOffset offset,
return send_buffer_.WriteStreamData(offset, data_length, writer);
}
+void QuicStream::WriteBufferedData() {
+ DCHECK(!write_side_closed_ && queued_data_.empty() &&
+ (HasBufferedData() || fin_buffered_));
+
+ if (session_->ShouldYield(id())) {
+ session_->MarkConnectionLevelWriteBlocked(id());
+ return;
+ }
+
+ // Size of buffered data.
+ size_t write_length = queued_data_bytes();
+
+ // A FIN with zero data payload should not be flow control blocked.
+ bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
+
+ bool fin = fin_buffered_;
+
+ // How much data flow control permits to be written.
+ QuicByteCount send_window = flow_controller_.SendWindowSize();
+ if (stream_contributes_to_connection_flow_control_) {
+ send_window =
+ std::min(send_window, connection_flow_controller_->SendWindowSize());
+ }
+
+ if (send_window == 0 && !fin_with_zero_data) {
+ // Quick return if nothing can be sent.
+ MaybeSendBlocked();
+ return;
+ }
+
+ if (write_length > send_window) {
+ // Don't send the FIN unless all the data will be sent.
+ fin = false;
+
+ // Writing more data would be a violation of flow control.
+ write_length = static_cast<size_t>(send_window);
+ QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
+ << write_length << " due to flow control";
+ }
+
+ QuicConsumedData consumed_data = WritevDataInner(
+ QuicIOVector(/*iov=*/nullptr, /*iov_count=*/0, write_length),
+ stream_bytes_written_, fin, nullptr);
+
+ stream_bytes_written_ += consumed_data.bytes_consumed;
+ stream_bytes_outstanding_ += consumed_data.bytes_consumed;
+
+ AddBytesSent(consumed_data.bytes_consumed);
+ QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
+ << stream_bytes_written_ << " bytes "
+ << " and has buffered data " << queued_data_bytes() << " bytes."
+ << " fin is sent: " << consumed_data.fin_consumed
+ << " fin is buffered: " << fin_buffered_;
+
+ // The write may have generated a write error causing this stream to be
+ // closed. If so, simply return without marking the stream write blocked.
+ if (write_side_closed_) {
+ return;
+ }
+
+ if (consumed_data.bytes_consumed == write_length) {
+ if (!fin_with_zero_data) {
+ MaybeSendBlocked();
+ }
+ if (fin && consumed_data.fin_consumed) {
+ fin_sent_ = true;
+ fin_outstanding_ = true;
+ if (fin_received_) {
+ session_->StreamDraining(id_);
+ }
+ CloseWriteSide();
+ } else if (fin && !consumed_data.fin_consumed) {
+ session_->MarkConnectionLevelWriteBlocked(id());
+ }
+ } else {
+ session_->MarkConnectionLevelWriteBlocked(id());
+ }
+ if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
+ busy_counter_ = 0;
+ }
+}
+
+uint64_t QuicStream::queued_data_bytes() const {
+ if (session_->save_data_before_consumption()) {
+ DCHECK_EQ(0u, queued_data_bytes_);
+ DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written_);
+ return send_buffer_.stream_offset() - stream_bytes_written_;
+ }
+ return queued_data_bytes_;
+}
+
+bool QuicStream::CanWriteNewData() const {
+ return queued_data_bytes() < buffered_data_threshold_;
+}
+
} // namespace net