diff options
Diffstat (limited to 'chromium/third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.cc')
-rw-r--r-- | chromium/third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.cc | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/chromium/third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.cc b/chromium/third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.cc new file mode 100644 index 00000000000..abf67b31626 --- /dev/null +++ b/chromium/third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.cc @@ -0,0 +1,221 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.h" + +#include <string.h> + +#include <algorithm> + +#include "third_party/blink/renderer/bindings/core/v8/script_function.h" +#include "third_party/blink/renderer/bindings/core/v8/script_value.h" +#include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h" +#include "third_party/blink/renderer/bindings/core/v8/v8_uint8_array.h" +#include "third_party/blink/renderer/core/streams/readable_stream_operations.h" +#include "third_party/blink/renderer/platform/bindings/scoped_persistent.h" +#include "third_party/blink/renderer/platform/bindings/script_state.h" +#include "third_party/blink/renderer/platform/bindings/v8_binding_macros.h" +#include "third_party/blink/renderer/platform/wtf/assertions.h" +#include "third_party/blink/renderer/platform/wtf/text/wtf_string.h" +#include "v8/include/v8.h" + +namespace blink { + +class ReadableStreamBytesConsumer::OnFulfilled final : public ScriptFunction { + public: + static v8::Local<v8::Function> CreateFunction( + ScriptState* script_state, + ReadableStreamBytesConsumer* consumer) { + return (new OnFulfilled(script_state, consumer))->BindToV8Function(); + } + + ScriptValue Call(ScriptValue v) override { + bool done; + v8::Local<v8::Value> item = v.V8Value(); + DCHECK(item->IsObject()); + v8::Local<v8::Value> value = + V8UnpackIteratorResult(v.GetScriptState(), item.As<v8::Object>(), &done) + .ToLocalChecked(); + if (done) { + consumer_->OnReadDone(); + return v; + } + if (!value->IsUint8Array()) { + consumer_->OnRejected(); + return ScriptValue(); + } + consumer_->OnRead(V8Uint8Array::ToImpl(value.As<v8::Object>())); + return v; + } + + void Trace(blink::Visitor* visitor) override { + visitor->Trace(consumer_); + ScriptFunction::Trace(visitor); + } + + private: + OnFulfilled(ScriptState* script_state, ReadableStreamBytesConsumer* consumer) + : ScriptFunction(script_state), consumer_(consumer) {} + + Member<ReadableStreamBytesConsumer> consumer_; +}; + +class ReadableStreamBytesConsumer::OnRejected final : public ScriptFunction { + public: + static v8::Local<v8::Function> CreateFunction( + ScriptState* script_state, + ReadableStreamBytesConsumer* consumer) { + return (new OnRejected(script_state, consumer))->BindToV8Function(); + } + + ScriptValue Call(ScriptValue v) override { + consumer_->OnRejected(); + return v; + } + + void Trace(blink::Visitor* visitor) override { + visitor->Trace(consumer_); + ScriptFunction::Trace(visitor); + } + + private: + OnRejected(ScriptState* script_state, ReadableStreamBytesConsumer* consumer) + : ScriptFunction(script_state), consumer_(consumer) {} + + Member<ReadableStreamBytesConsumer> consumer_; +}; + +ReadableStreamBytesConsumer::ReadableStreamBytesConsumer( + ScriptState* script_state, + ScriptValue stream_reader) + : reader_(script_state->GetIsolate(), stream_reader.V8Value()), + script_state_(script_state) { + reader_.SetPhantom(); +} + +ReadableStreamBytesConsumer::~ReadableStreamBytesConsumer() {} + +BytesConsumer::Result ReadableStreamBytesConsumer::BeginRead( + const char** buffer, + size_t* available) { + *buffer = nullptr; + *available = 0; + if (state_ == PublicState::kErrored) + return Result::kError; + if (state_ == PublicState::kClosed) + return Result::kDone; + + if (pending_buffer_) { + DCHECK_LE(pending_offset_, pending_buffer_->length()); + *buffer = reinterpret_cast<const char*>(pending_buffer_->Data()) + + pending_offset_; + *available = pending_buffer_->length() - pending_offset_; + return Result::kOk; + } + if (!is_reading_) { + is_reading_ = true; + ScriptState::Scope scope(script_state_.get()); + ScriptValue reader(script_state_.get(), + reader_.NewLocal(script_state_->GetIsolate())); + // The owner must retain the reader. + DCHECK(!reader.IsEmpty()); + ReadableStreamOperations::DefaultReaderRead(script_state_.get(), reader) + .Then(OnFulfilled::CreateFunction(script_state_.get(), this), + OnRejected::CreateFunction(script_state_.get(), this)); + } + return Result::kShouldWait; +} + +BytesConsumer::Result ReadableStreamBytesConsumer::EndRead(size_t read_size) { + DCHECK(pending_buffer_); + DCHECK_LE(pending_offset_ + read_size, pending_buffer_->length()); + pending_offset_ += read_size; + if (pending_offset_ >= pending_buffer_->length()) { + pending_buffer_ = nullptr; + pending_offset_ = 0; + } + return Result::kOk; +} + +void ReadableStreamBytesConsumer::SetClient(Client* client) { + DCHECK(!client_); + DCHECK(client); + client_ = client; +} + +void ReadableStreamBytesConsumer::ClearClient() { + client_ = nullptr; +} + +void ReadableStreamBytesConsumer::Cancel() { + if (state_ == PublicState::kClosed || state_ == PublicState::kErrored) + return; + state_ = PublicState::kClosed; + ClearClient(); + reader_.Clear(); +} + +BytesConsumer::PublicState ReadableStreamBytesConsumer::GetPublicState() const { + return state_; +} + +BytesConsumer::Error ReadableStreamBytesConsumer::GetError() const { + return Error("Failed to read from a ReadableStream."); +} + +void ReadableStreamBytesConsumer::Trace(blink::Visitor* visitor) { + visitor->Trace(client_); + visitor->Trace(pending_buffer_); + BytesConsumer::Trace(visitor); +} + +void ReadableStreamBytesConsumer::Dispose() { + reader_.Clear(); +} + +void ReadableStreamBytesConsumer::OnRead(DOMUint8Array* buffer) { + DCHECK(is_reading_); + DCHECK(buffer); + DCHECK(!pending_buffer_); + DCHECK(!pending_offset_); + is_reading_ = false; + if (state_ == PublicState::kClosed) + return; + DCHECK_EQ(state_, PublicState::kReadableOrWaiting); + pending_buffer_ = buffer; + if (client_) + client_->OnStateChange(); +} + +void ReadableStreamBytesConsumer::OnReadDone() { + DCHECK(is_reading_); + DCHECK(!pending_buffer_); + is_reading_ = false; + if (state_ == PublicState::kClosed) + return; + DCHECK_EQ(state_, PublicState::kReadableOrWaiting); + state_ = PublicState::kClosed; + reader_.Clear(); + Client* client = client_; + ClearClient(); + if (client) + client->OnStateChange(); +} + +void ReadableStreamBytesConsumer::OnRejected() { + DCHECK(is_reading_); + DCHECK(!pending_buffer_); + is_reading_ = false; + if (state_ == PublicState::kClosed) + return; + DCHECK_EQ(state_, PublicState::kReadableOrWaiting); + state_ = PublicState::kErrored; + reader_.Clear(); + Client* client = client_; + ClearClient(); + if (client) + client->OnStateChange(); +} + +} // namespace blink |