summaryrefslogtreecommitdiff
path: root/chromium/net/third_party/quiche/src/http2/adapter/oghttp2_session.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/net/third_party/quiche/src/http2/adapter/oghttp2_session.cc')
-rw-r--r--chromium/net/third_party/quiche/src/http2/adapter/oghttp2_session.cc303
1 files changed, 225 insertions, 78 deletions
diff --git a/chromium/net/third_party/quiche/src/http2/adapter/oghttp2_session.cc b/chromium/net/third_party/quiche/src/http2/adapter/oghttp2_session.cc
index 3a32b2e70ea..e3a696ba820 100644
--- a/chromium/net/third_party/quiche/src/http2/adapter/oghttp2_session.cc
+++ b/chromium/net/third_party/quiche/src/http2/adapter/oghttp2_session.cc
@@ -1,16 +1,28 @@
#include "http2/adapter/oghttp2_session.h"
#include <tuple>
+#include <utility>
+#include "absl/memory/memory.h"
#include "absl/strings/escaping.h"
+#include "http2/adapter/http2_protocol.h"
#include "http2/adapter/oghttp2_util.h"
+#include "spdy/core/spdy_protocol.h"
namespace http2 {
namespace adapter {
namespace {
-const size_t kMaxMetadataFrameSize = 16384;
+// #define OGHTTP2_DEBUG_TRACE 1
+
+#ifdef OGHTTP2_DEBUG_TRACE
+const bool kTraceLoggingEnabled = true;
+#else
+const bool kTraceLoggingEnabled = false;
+#endif
+
+const uint32_t kMaxAllowedMetadataFrameSize = 65536u;
// TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
@@ -107,28 +119,55 @@ class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
uint8_t flags_ = 0;
};
+absl::string_view TracePerspectiveAsString(Perspective p) {
+ switch (p) {
+ case Perspective::kClient:
+ return "OGHTTP2_CLIENT";
+ case Perspective::kServer:
+ return "OGHTTP2_SERVER";
+ }
+}
+
} // namespace
void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockStart() {
+ result_ = Http2VisitorInterface::HEADER_OK;
const bool status = visitor_.OnBeginHeadersForStream(stream_id_);
if (!status) {
result_ = Http2VisitorInterface::HEADER_CONNECTION_ERROR;
}
+ validator_.StartHeaderBlock();
}
void OgHttp2Session::PassthroughHeadersHandler::OnHeader(
absl::string_view key,
absl::string_view value) {
- if (result_ == Http2VisitorInterface::HEADER_OK) {
- result_ = visitor_.OnHeaderForStream(stream_id_, key, value);
+ if (result_ != Http2VisitorInterface::HEADER_OK) {
+ QUICHE_VLOG(2) << "Early return; status not HEADER_OK";
+ return;
+ }
+ const auto validation_result = validator_.ValidateSingleHeader(key, value);
+ if (validation_result != HeaderValidator::HEADER_OK) {
+ QUICHE_VLOG(2) << "RST_STREAM: invalid header found";
+ result_ = Http2VisitorInterface::HEADER_RST_STREAM;
+ return;
}
+ result_ = visitor_.OnHeaderForStream(stream_id_, key, value);
}
void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockEnd(
size_t /* uncompressed_header_bytes */,
size_t /* compressed_header_bytes */) {
if (result_ == Http2VisitorInterface::HEADER_OK) {
- visitor_.OnEndHeadersForStream(stream_id_);
+ if (!validator_.FinishHeaderBlock(type_)) {
+ result_ = Http2VisitorInterface::HEADER_RST_STREAM;
+ }
+ }
+ if (result_ == Http2VisitorInterface::HEADER_OK) {
+ const bool result = visitor_.OnEndHeadersForStream(stream_id_);
+ if (!result) {
+ session_.decoder_.StopProcessing();
+ }
} else {
session_.OnHeaderStatus(stream_id_, result_);
}
@@ -136,6 +175,12 @@ void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockEnd(
OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
: visitor_(visitor),
+ receive_logger_(
+ this, TracePerspectiveAsString(options.perspective),
+ []() { return kTraceLoggingEnabled; }, this),
+ send_logger_(
+ TracePerspectiveAsString(options.perspective),
+ []() { return kTraceLoggingEnabled; }, this),
headers_handler_(*this, visitor),
connection_window_manager_(kInitialFlowControlWindowSize,
[this](size_t window_update_delta) {
@@ -143,7 +188,7 @@ OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
window_update_delta);
}),
options_(options) {
- decoder_.set_visitor(this);
+ decoder_.set_visitor(&receive_logger_);
decoder_.set_extension_visitor(this);
if (options_.perspective == Perspective::kServer) {
remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
@@ -217,8 +262,8 @@ int OgHttp2Session::GetHpackDecoderDynamicTableSize() const {
return decoder == nullptr ? 0 : decoder->GetDynamicTableSize();
}
-ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
- ssize_t preface_consumed = 0;
+int64_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
+ int64_t preface_consumed = 0;
if (!remaining_preface_.empty()) {
QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
// decoder_ does not understand the client connection preface.
@@ -228,7 +273,7 @@ ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
QUICHE_DLOG(INFO) << "Preface doesn't match! Expected: ["
<< absl::CEscape(remaining_preface_) << "], actual: ["
<< absl::CEscape(bytes) << "]";
- visitor_.OnConnectionError();
+ LatchErrorAndNotify();
return -1;
}
remaining_preface_.remove_prefix(min_size);
@@ -240,8 +285,14 @@ ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
}
preface_consumed = min_size;
}
- ssize_t result = decoder_.ProcessInput(bytes.data(), bytes.size());
- return result < 0 ? result : result + preface_consumed;
+ int64_t result = decoder_.ProcessInput(bytes.data(), bytes.size());
+ if (latched_error_) {
+ QUICHE_VLOG(2) << "ProcessBytes encountered an error.";
+ return -1;
+ }
+ const int64_t ret = result < 0 ? result : result + preface_consumed;
+ QUICHE_VLOG(2) << "ProcessBytes returning: " << ret;
+ return ret;
}
int OgHttp2Session::Consume(Http2StreamId stream_id, size_t num_bytes) {
@@ -284,7 +335,7 @@ void OgHttp2Session::EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame) {
int OgHttp2Session::Send() {
MaybeSetupPreface();
- ssize_t result = std::numeric_limits<ssize_t>::max();
+ int64_t result = std::numeric_limits<int64_t>::max();
// Flush any serialized prefix.
while (result > 0 && !serialized_prefix_.empty()) {
result = visitor_.OnReadyToSend(serialized_prefix_);
@@ -292,9 +343,13 @@ int OgHttp2Session::Send() {
serialized_prefix_.erase(0, result);
}
}
- if (!serialized_prefix_.empty()) {
- return result < 0 ? result : 0;
+ if (result < 0) {
+ LatchErrorAndNotify();
+ return result;
+ } else if (!serialized_prefix_.empty()) {
+ return 0;
}
+
bool continue_writing = SendQueuedFrames();
while (continue_writing && !connection_metadata_.empty()) {
continue_writing = SendMetadata(0, connection_metadata_);
@@ -321,10 +376,11 @@ bool OgHttp2Session::SendQueuedFrames() {
frame_ptr->Visit(&c);
visitor_.OnBeforeFrameSent(c.frame_type(), c.stream_id(), c.length(),
c.flags());
+ frame_ptr->Visit(&send_logger_);
spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frame_ptr);
- const ssize_t result = visitor_.OnReadyToSend(absl::string_view(frame));
+ const int64_t result = visitor_.OnReadyToSend(absl::string_view(frame));
if (result < 0) {
- visitor_.OnConnectionError();
+ LatchErrorAndNotify();
return false;
} else if (result == 0) {
// Write blocked.
@@ -336,7 +392,7 @@ bool OgHttp2Session::SendQueuedFrames() {
// If this endpoint is resetting the stream, the stream should be
// closed. This endpoint is already aware of the outbound RST_STREAM and
// its error code, so close with NO_ERROR.
- visitor_.OnCloseStream(c.stream_id(), Http2ErrorCode::NO_ERROR);
+ CloseStream(c.stream_id(), Http2ErrorCode::NO_ERROR);
}
frames_.pop_front();
@@ -377,11 +433,12 @@ bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
return true;
}
bool source_can_produce = true;
- int32_t available_window = std::min(
- std::min(connection_send_window_, state.send_window), max_frame_payload_);
+ int32_t available_window =
+ std::min({connection_send_window_, state.send_window,
+ static_cast<int32_t>(max_frame_payload_)});
while (connection_can_write && available_window > 0 &&
state.outbound_body != nullptr) {
- ssize_t length;
+ int64_t length;
bool end_data;
std::tie(length, end_data) =
state.outbound_body->SelectPayloadLength(available_window);
@@ -390,7 +447,8 @@ bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
break;
} else if (length == DataFrameSource::kError) {
source_can_produce = false;
- visitor_.OnCloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
+ CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
+ // No more work on the stream; it has been closed.
break;
}
const bool fin = end_data ? state.outbound_body->send_fin() : false;
@@ -411,9 +469,8 @@ bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
visitor_.OnFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0);
connection_send_window_ -= length;
state.send_window -= length;
- available_window =
- std::min(std::min(connection_send_window_, state.send_window),
- max_frame_payload_);
+ available_window = std::min({connection_send_window_, state.send_window,
+ static_cast<int32_t>(max_frame_payload_)});
}
if (end_data) {
bool sent_trailers = false;
@@ -428,14 +485,17 @@ bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
}
state.outbound_body = nullptr;
if (fin || sent_trailers) {
- MaybeCloseWithRstStream(stream_id, state);
+ if (MaybeCloseWithRstStream(stream_id, state)) {
+ // No more work on the stream; it has been closed.
+ break;
+ }
}
}
}
- // If the stream still has data to send, it should be marked as ready in the
- // write scheduler.
- if (source_can_produce && state.send_window > 0 &&
- state.outbound_body != nullptr) {
+ // If the stream still exists and has data to send, it should be marked as
+ // ready in the write scheduler.
+ if (stream_map_.contains(stream_id) && source_can_produce &&
+ state.send_window > 0 && state.outbound_body != nullptr) {
write_scheduler_.MarkStreamReady(stream_id, false);
}
// Streams can continue writing as long as the connection is not write-blocked
@@ -445,19 +505,21 @@ bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
bool OgHttp2Session::SendMetadata(Http2StreamId stream_id,
OgHttp2Session::MetadataSequence& sequence) {
- auto payload_buffer = absl::make_unique<uint8_t[]>(kMaxMetadataFrameSize);
+ const uint32_t max_payload_size =
+ std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
+ auto payload_buffer = absl::make_unique<uint8_t[]>(max_payload_size);
while (!sequence.empty()) {
MetadataSource& source = *sequence.front();
- ssize_t written;
+ int64_t written;
bool end_metadata;
std::tie(written, end_metadata) =
- source.Pack(payload_buffer.get(), kMaxMetadataFrameSize);
+ source.Pack(payload_buffer.get(), max_payload_size);
if (written < 0) {
// Did not touch the connection, so perhaps writes are still possible.
return true;
}
- QUICHE_DCHECK_LE(static_cast<size_t>(written), kMaxMetadataFrameSize);
+ QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
auto payload = absl::string_view(
reinterpret_cast<const char*>(payload_buffer.get()), written);
EnqueueFrame(absl::make_unique<spdy::SpdyUnknownIR>(
@@ -476,21 +538,15 @@ int32_t OgHttp2Session::SubmitRequest(
// TODO(birenroy): return an error for the incorrect perspective
const Http2StreamId stream_id = next_stream_id_;
next_stream_id_ += 2;
- // Convert headers to header block, create headers frame.
- auto frame =
- absl::make_unique<spdy::SpdyHeadersIR>(stream_id, ToHeaderBlock(headers));
- // Add data source and user data to stream state
- auto iter = CreateStream(stream_id);
- write_scheduler_.MarkStreamReady(stream_id, false);
- if (data_source == nullptr) {
- frame->set_fin(true);
- iter->second.half_closed_local = true;
+ if (CanCreateStream()) {
+ StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
+ user_data);
} else {
- iter->second.outbound_body = std::move(data_source);
+ // TODO(diannahu): There should probably be a limit to the number of allowed
+ // pending streams.
+ pending_streams_.push_back(
+ {stream_id, ToHeaderBlock(headers), std::move(data_source), user_data});
}
- iter->second.user_data = user_data;
- // Enqueue headers frame
- EnqueueFrame(std::move(frame));
return stream_id;
}
@@ -503,22 +559,17 @@ int OgHttp2Session::SubmitResponse(
QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
}
- // Convert headers to header block, create headers frame
- auto frame =
- absl::make_unique<spdy::SpdyHeadersIR>(stream_id, ToHeaderBlock(headers));
- if (data_source == nullptr) {
- frame->set_fin(true);
+ const bool end_stream = data_source == nullptr;
+ if (end_stream) {
if (iter->second.half_closed_remote) {
- visitor_.OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR);
+ CloseStream(stream_id, Http2ErrorCode::NO_ERROR);
}
- // TODO(birenroy): the server adapter should probably delete stream state
- // when calling visitor_.OnCloseStream.
} else {
// Add data source to stream state
iter->second.outbound_body = std::move(data_source);
write_scheduler_.MarkStreamReady(stream_id, false);
}
- EnqueueFrame(std::move(frame));
+ SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
return 0;
}
@@ -571,7 +622,7 @@ void OgHttp2Session::OnError(http2::Http2DecoderAdapter::SpdyFramerError error,
QUICHE_VLOG(1) << "Error: "
<< http2::Http2DecoderAdapter::SpdyFramerErrorToString(error)
<< " details: " << detailed_error;
- visitor_.OnConnectionError();
+ LatchErrorAndNotify();
}
void OgHttp2Session::OnCommonHeader(spdy::SpdyStreamId stream_id,
@@ -580,19 +631,29 @@ void OgHttp2Session::OnCommonHeader(spdy::SpdyStreamId stream_id,
uint8_t flags) {
highest_received_stream_id_ = std::max(static_cast<Http2StreamId>(stream_id),
highest_received_stream_id_);
- visitor_.OnFrameHeader(stream_id, length, type, flags);
+ const bool result = visitor_.OnFrameHeader(stream_id, length, type, flags);
+ if (!result) {
+ decoder_.StopProcessing();
+ }
}
void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
size_t length, bool /*fin*/) {
- visitor_.OnBeginDataForStream(stream_id, length);
+ const bool result = visitor_.OnBeginDataForStream(stream_id, length);
+ if (!result) {
+ decoder_.StopProcessing();
+ }
}
void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
const char* data,
size_t len) {
MarkDataBuffered(stream_id, len);
- visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
+ const bool result =
+ visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
+ if (!result) {
+ decoder_.StopProcessing();
+ }
}
void OgHttp2Session::OnStreamEnd(spdy::SpdyStreamId stream_id) {
@@ -605,7 +666,7 @@ void OgHttp2Session::OnStreamEnd(spdy::SpdyStreamId stream_id) {
options_.perspective == Perspective::kClient) {
// From the client's perspective, the stream can be closed if it's already
// half_closed_local.
- visitor_.OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR);
+ CloseStream(stream_id, Http2ErrorCode::NO_ERROR);
}
}
@@ -624,10 +685,27 @@ void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId /*stream_id*/, size_t
spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
spdy::SpdyStreamId stream_id) {
headers_handler_.set_stream_id(stream_id);
+ auto it = stream_map_.find(stream_id);
+ if (it != stream_map_.end()) {
+ headers_handler_.set_header_type(
+ NextHeaderType(it->second.received_header_type));
+ }
return &headers_handler_;
}
-void OgHttp2Session::OnHeaderFrameEnd(spdy::SpdyStreamId /*stream_id*/) {
+void OgHttp2Session::OnHeaderFrameEnd(spdy::SpdyStreamId stream_id) {
+ auto it = stream_map_.find(stream_id);
+ if (it != stream_map_.end()) {
+ if (headers_handler_.header_type() == HeaderType::RESPONSE &&
+ !headers_handler_.status_header().empty() &&
+ headers_handler_.status_header()[0] == '1') {
+ // If response headers carried a 1xx response code, final response headers
+ // should still be forthcoming.
+ it->second.received_header_type = HeaderType::RESPONSE_100;
+ } else {
+ it->second.received_header_type = headers_handler_.header_type();
+ }
+ }
headers_handler_.set_stream_id(0);
}
@@ -640,9 +718,7 @@ void OgHttp2Session::OnRstStream(spdy::SpdyStreamId stream_id,
write_scheduler_.UnregisterStream(stream_id);
}
visitor_.OnRstStream(stream_id, TranslateErrorCode(error_code));
- // TODO(birenroy): Consider bundling "close stream" behavior into a dedicated
- // method that also cleans up the stream map.
- visitor_.OnCloseStream(stream_id, TranslateErrorCode(error_code));
+ CloseStream(stream_id, TranslateErrorCode(error_code));
}
void OgHttp2Session::OnSettings() {
@@ -653,6 +729,10 @@ void OgHttp2Session::OnSetting(spdy::SpdySettingsId id, uint32_t value) {
visitor_.OnSetting({id, value});
if (id == kMetadataExtensionId) {
peer_supports_metadata_ = (value != 0);
+ } else if (id == MAX_FRAME_SIZE) {
+ max_frame_payload_ = value;
+ } else if (id == MAX_CONCURRENT_STREAMS) {
+ max_outbound_concurrent_streams_ = value;
}
}
@@ -674,8 +754,11 @@ void OgHttp2Session::OnPing(spdy::SpdyPingId unique_id, bool is_ack) {
void OgHttp2Session::OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,
spdy::SpdyErrorCode error_code) {
received_goaway_ = true;
- visitor_.OnGoAway(last_accepted_stream_id, TranslateErrorCode(error_code),
- "");
+ const bool result = visitor_.OnGoAway(last_accepted_stream_id,
+ TranslateErrorCode(error_code), "");
+ if (!result) {
+ decoder_.StopProcessing();
+ }
}
bool OgHttp2Session::OnGoAwayFrameData(const char* /*goaway_data*/, size_t
@@ -748,7 +831,7 @@ void OgHttp2Session::OnHeaderStatus(
stream_id, spdy::ERROR_CODE_INTERNAL_ERROR));
}
} else if (result == Http2VisitorInterface::HEADER_CONNECTION_ERROR) {
- visitor_.OnConnectionError();
+ LatchErrorAndNotify();
}
}
@@ -771,13 +854,17 @@ bool OgHttp2Session::OnFrameHeader(spdy::SpdyStreamId stream_id, size_t length,
void OgHttp2Session::OnFramePayload(const char* data, size_t len) {
if (metadata_length_ > 0) {
QUICHE_DCHECK_LE(len, metadata_length_);
- visitor_.OnMetadataForStream(metadata_stream_id_,
- absl::string_view(data, len));
- metadata_length_ -= len;
- if (metadata_length_ == 0 && end_metadata_) {
- visitor_.OnMetadataEndForStream(metadata_stream_id_);
- metadata_stream_id_ = 0;
- end_metadata_ = false;
+ const bool success = visitor_.OnMetadataForStream(
+ metadata_stream_id_, absl::string_view(data, len));
+ if (success) {
+ metadata_length_ -= len;
+ if (metadata_length_ == 0 && end_metadata_) {
+ visitor_.OnMetadataEndForStream(metadata_stream_id_);
+ metadata_stream_id_ = 0;
+ end_metadata_ = false;
+ }
+ } else {
+ decoder_.StopProcessing();
}
} else {
QUICHE_DLOG(INFO) << "Unexpected metadata payload for stream "
@@ -808,6 +895,15 @@ void OgHttp2Session::SendWindowUpdate(Http2StreamId stream_id,
absl::make_unique<spdy::SpdyWindowUpdateIR>(stream_id, update_delta));
}
+void OgHttp2Session::SendHeaders(Http2StreamId stream_id,
+ spdy::SpdyHeaderBlock headers,
+ bool end_stream) {
+ auto frame =
+ absl::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(headers));
+ frame->set_fin(end_stream);
+ EnqueueFrame(std::move(frame));
+}
+
void OgHttp2Session::SendTrailers(Http2StreamId stream_id,
spdy::SpdyHeaderBlock trailers) {
auto frame =
@@ -816,22 +912,22 @@ void OgHttp2Session::SendTrailers(Http2StreamId stream_id,
EnqueueFrame(std::move(frame));
}
-void OgHttp2Session::MaybeCloseWithRstStream(Http2StreamId stream_id,
+bool OgHttp2Session::MaybeCloseWithRstStream(Http2StreamId stream_id,
StreamState& state) {
state.half_closed_local = true;
if (options_.perspective == Perspective::kServer) {
if (state.half_closed_remote) {
- visitor_.OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR);
+ CloseStream(stream_id, Http2ErrorCode::NO_ERROR);
+ return true;
} else {
// Since the peer has not yet ended the stream, this endpoint should
// send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
EnqueueFrame(absl::make_unique<spdy::SpdyRstStreamIR>(
stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
- // Enqueuing the RST_STREAM also invokes OnCloseStream.
+ // Sending the RST_STREAM also invokes OnCloseStream.
}
- // TODO(birenroy): the server adapter should probably delete stream state
- // when calling visitor_.OnCloseStream.
}
+ return false;
}
void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
@@ -861,5 +957,56 @@ OgHttp2Session::StreamStateMap::iterator OgHttp2Session::CreateStream(
return iter;
}
+void OgHttp2Session::StartRequest(Http2StreamId stream_id,
+ spdy::SpdyHeaderBlock headers,
+ std::unique_ptr<DataFrameSource> data_source,
+ void* user_data) {
+ auto iter = CreateStream(stream_id);
+ write_scheduler_.MarkStreamReady(stream_id, false);
+ const bool end_stream = data_source == nullptr;
+ if (end_stream) {
+ iter->second.half_closed_local = true;
+ } else {
+ iter->second.outbound_body = std::move(data_source);
+ }
+ iter->second.user_data = user_data;
+ SendHeaders(stream_id, std::move(headers), end_stream);
+}
+
+void OgHttp2Session::CloseStream(Http2StreamId stream_id,
+ Http2ErrorCode error_code) {
+ visitor_.OnCloseStream(stream_id, error_code);
+ stream_map_.erase(stream_id);
+
+ if (!pending_streams_.empty() && CanCreateStream()) {
+ PendingStreamState& pending_stream = pending_streams_.front();
+ StartRequest(pending_stream.stream_id, std::move(pending_stream.headers),
+ std::move(pending_stream.data_source),
+ pending_stream.user_data);
+ pending_streams_.pop_front();
+ }
+}
+
+bool OgHttp2Session::CanCreateStream() const {
+ return stream_map_.size() < max_outbound_concurrent_streams_;
+}
+
+HeaderType OgHttp2Session::NextHeaderType(
+ absl::optional<HeaderType> current_type) {
+ if (IsServerSession()) {
+ return HeaderType::REQUEST;
+ } else if (!current_type ||
+ current_type.value() == HeaderType::RESPONSE_100) {
+ return HeaderType::RESPONSE;
+ } else {
+ return HeaderType::RESPONSE_TRAILER;
+ }
+}
+
+void OgHttp2Session::LatchErrorAndNotify() {
+ latched_error_ = true;
+ visitor_.OnConnectionError();
+}
+
} // namespace adapter
} // namespace http2