// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.h" #include #include #include "third_party/blink/renderer/bindings/core/v8/script_function.h" #include "third_party/blink/renderer/bindings/core/v8/script_value.h" #include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h" #include "third_party/blink/renderer/bindings/core/v8/v8_uint8_array.h" #include "third_party/blink/renderer/core/streams/readable_stream_operations.h" #include "third_party/blink/renderer/platform/bindings/scoped_persistent.h" #include "third_party/blink/renderer/platform/bindings/script_state.h" #include "third_party/blink/renderer/platform/bindings/v8_binding_macros.h" #include "third_party/blink/renderer/platform/wtf/assertions.h" #include "third_party/blink/renderer/platform/wtf/text/wtf_string.h" #include "v8/include/v8.h" namespace blink { class ReadableStreamBytesConsumer::OnFulfilled final : public ScriptFunction { public: static v8::Local CreateFunction( ScriptState* script_state, ReadableStreamBytesConsumer* consumer) { return (new OnFulfilled(script_state, consumer))->BindToV8Function(); } ScriptValue Call(ScriptValue v) override { bool done; v8::Local item = v.V8Value(); DCHECK(item->IsObject()); v8::Local value = V8UnpackIteratorResult(v.GetScriptState(), item.As(), &done) .ToLocalChecked(); if (done) { consumer_->OnReadDone(); return v; } if (!value->IsUint8Array()) { consumer_->OnRejected(); return ScriptValue(); } consumer_->OnRead(V8Uint8Array::ToImpl(value.As())); return v; } void Trace(blink::Visitor* visitor) override { visitor->Trace(consumer_); ScriptFunction::Trace(visitor); } private: OnFulfilled(ScriptState* script_state, ReadableStreamBytesConsumer* consumer) : ScriptFunction(script_state), consumer_(consumer) {} Member consumer_; }; class ReadableStreamBytesConsumer::OnRejected final : public ScriptFunction { public: static v8::Local CreateFunction( ScriptState* script_state, ReadableStreamBytesConsumer* consumer) { return (new OnRejected(script_state, consumer))->BindToV8Function(); } ScriptValue Call(ScriptValue v) override { consumer_->OnRejected(); return v; } void Trace(blink::Visitor* visitor) override { visitor->Trace(consumer_); ScriptFunction::Trace(visitor); } private: OnRejected(ScriptState* script_state, ReadableStreamBytesConsumer* consumer) : ScriptFunction(script_state), consumer_(consumer) {} Member consumer_; }; ReadableStreamBytesConsumer::ReadableStreamBytesConsumer( ScriptState* script_state, ScriptValue stream_reader) : reader_(script_state->GetIsolate(), stream_reader.V8Value()), script_state_(script_state) { reader_.SetPhantom(); } ReadableStreamBytesConsumer::~ReadableStreamBytesConsumer() {} BytesConsumer::Result ReadableStreamBytesConsumer::BeginRead( const char** buffer, size_t* available) { *buffer = nullptr; *available = 0; if (state_ == PublicState::kErrored) return Result::kError; if (state_ == PublicState::kClosed) return Result::kDone; if (pending_buffer_) { DCHECK_LE(pending_offset_, pending_buffer_->length()); *buffer = reinterpret_cast(pending_buffer_->Data()) + pending_offset_; *available = pending_buffer_->length() - pending_offset_; return Result::kOk; } if (!is_reading_) { is_reading_ = true; ScriptState::Scope scope(script_state_.get()); ScriptValue reader(script_state_.get(), reader_.NewLocal(script_state_->GetIsolate())); // The owner must retain the reader. DCHECK(!reader.IsEmpty()); ReadableStreamOperations::DefaultReaderRead(script_state_.get(), reader) .Then(OnFulfilled::CreateFunction(script_state_.get(), this), OnRejected::CreateFunction(script_state_.get(), this)); } return Result::kShouldWait; } BytesConsumer::Result ReadableStreamBytesConsumer::EndRead(size_t read_size) { DCHECK(pending_buffer_); DCHECK_LE(pending_offset_ + read_size, pending_buffer_->length()); pending_offset_ += read_size; if (pending_offset_ >= pending_buffer_->length()) { pending_buffer_ = nullptr; pending_offset_ = 0; } return Result::kOk; } void ReadableStreamBytesConsumer::SetClient(Client* client) { DCHECK(!client_); DCHECK(client); client_ = client; } void ReadableStreamBytesConsumer::ClearClient() { client_ = nullptr; } void ReadableStreamBytesConsumer::Cancel() { if (state_ == PublicState::kClosed || state_ == PublicState::kErrored) return; state_ = PublicState::kClosed; ClearClient(); reader_.Clear(); } BytesConsumer::PublicState ReadableStreamBytesConsumer::GetPublicState() const { return state_; } BytesConsumer::Error ReadableStreamBytesConsumer::GetError() const { return Error("Failed to read from a ReadableStream."); } void ReadableStreamBytesConsumer::Trace(blink::Visitor* visitor) { visitor->Trace(client_); visitor->Trace(pending_buffer_); BytesConsumer::Trace(visitor); } void ReadableStreamBytesConsumer::Dispose() { reader_.Clear(); } void ReadableStreamBytesConsumer::OnRead(DOMUint8Array* buffer) { DCHECK(is_reading_); DCHECK(buffer); DCHECK(!pending_buffer_); DCHECK(!pending_offset_); is_reading_ = false; if (state_ == PublicState::kClosed) return; DCHECK_EQ(state_, PublicState::kReadableOrWaiting); pending_buffer_ = buffer; if (client_) client_->OnStateChange(); } void ReadableStreamBytesConsumer::OnReadDone() { DCHECK(is_reading_); DCHECK(!pending_buffer_); is_reading_ = false; if (state_ == PublicState::kClosed) return; DCHECK_EQ(state_, PublicState::kReadableOrWaiting); state_ = PublicState::kClosed; reader_.Clear(); Client* client = client_; ClearClient(); if (client) client->OnStateChange(); } void ReadableStreamBytesConsumer::OnRejected() { DCHECK(is_reading_); DCHECK(!pending_buffer_); is_reading_ = false; if (state_ == PublicState::kClosed) return; DCHECK_EQ(state_, PublicState::kReadableOrWaiting); state_ = PublicState::kErrored; reader_.Clear(); Client* client = client_; ClearClient(); if (client) client->OnStateChange(); } } // namespace blink