/* * Copyright (C) 2015 Canon Inc. All rights reserved. * Copyright (C) 2015 Igalia. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ // @conditional=ENABLE(READABLE_STREAM_API) // @internal function privateInitializeReadableStreamDefaultReader(stream) { "use strict"; if (!@isReadableStream(stream)) @throwTypeError("ReadableStreamDefaultReader needs a ReadableStream"); if (@isReadableStreamLocked(stream)) @throwTypeError("ReadableStream is locked"); @readableStreamReaderGenericInitialize(this, stream); this.@readRequests = []; return this; } function readableStreamReaderGenericInitialize(reader, stream) { "use strict"; reader.@ownerReadableStream = stream; stream.@reader = reader; if (stream.@state === @streamReadable) reader.@closedPromiseCapability = @newPromiseCapability(@Promise); else if (stream.@state === @streamClosed) reader.@closedPromiseCapability = { @promise: @Promise.@resolve() }; else { @assert(stream.@state === @streamErrored); reader.@closedPromiseCapability = { @promise: @Promise.@reject(stream.@storedError) }; } } function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark) { "use strict"; if (!@isReadableStream(stream)) @throwTypeError("ReadableStreamDefaultController needs a ReadableStream"); // readableStreamController is initialized with null value. if (stream.@readableStreamController !== null) @throwTypeError("ReadableStream already has a controller"); this.@controlledReadableStream = stream; this.@underlyingSource = underlyingSource; this.@queue = @newQueue(); this.@started = false; this.@closeRequested = false; this.@pullAgain = false; this.@pulling = false; this.@strategy = @validateAndNormalizeQueuingStrategy(size, highWaterMark); const controller = this; @promiseInvokeOrNoopNoCatch(underlyingSource, "start", [this]).@then(() => { controller.@started = true; @assert(!controller.@pulling); @assert(!controller.@pullAgain); @readableStreamDefaultControllerCallPullIfNeeded(controller); }, (error) => { if (stream.@state === @streamReadable) @readableStreamDefaultControllerError(controller, error); }); this.@cancel = @readableStreamDefaultControllerCancel; this.@pull = @readableStreamDefaultControllerPull; return this; } function readableStreamDefaultControllerError(controller, error) { "use strict"; const stream = controller.@controlledReadableStream; @assert(stream.@state === @streamReadable); controller.@queue = @newQueue(); @readableStreamError(stream, error); } function readableStreamTee(stream, shouldClone) { "use strict"; @assert(@isReadableStream(stream)); @assert(typeof(shouldClone) === "boolean"); const reader = new @ReadableStreamDefaultReader(stream); const teeState = { closedOrErrored: false, canceled1: false, canceled2: false, reason1: @undefined, reason2: @undefined, }; teeState.cancelPromiseCapability = @newPromiseCapability(@InternalPromise); const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone); const branch1 = new @ReadableStream({ "pull": pullFunction, "cancel": @readableStreamTeeBranch1CancelFunction(teeState, stream) }); const branch2 = new @ReadableStream({ "pull": pullFunction, "cancel": @readableStreamTeeBranch2CancelFunction(teeState, stream) }); reader.@closedPromiseCapability.@promise.@then(@undefined, function(e) { if (teeState.closedOrErrored) return; @readableStreamDefaultControllerError(branch1.@readableStreamController, e); @readableStreamDefaultControllerError(branch2.@readableStreamController, e); teeState.closedOrErrored = true; }); // Additional fields compared to the spec, as they are needed within pull/cancel functions. teeState.branch1 = branch1; teeState.branch2 = branch2; return [branch1, branch2]; } function doStructuredClone(object) { "use strict"; // FIXME: We should implement http://w3c.github.io/html/infrastructure.html#ref-for-structured-clone-4 // Implementation is currently limited to ArrayBuffer/ArrayBufferView to meet Fetch API needs. if (object instanceof @ArrayBuffer) return @structuredCloneArrayBuffer(object); if (@ArrayBuffer.@isView(object)) return @structuredCloneArrayBufferView(object); @throwTypeError("structuredClone not implemented for: " + object); } function readableStreamTeePullFunction(teeState, reader, shouldClone) { "use strict"; return function() { @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) { @assert(@isObject(result)); @assert(typeof result.done === "boolean"); if (result.done && !teeState.closedOrErrored) { if (!teeState.canceled1) @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController); if (!teeState.canceled2) @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController); teeState.closedOrErrored = true; } if (teeState.closedOrErrored) return; if (!teeState.canceled1) @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value); if (!teeState.canceled2) @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @doStructuredClone(result.value) : result.value); }); } } function readableStreamTeeBranch1CancelFunction(teeState, stream) { "use strict"; return function(r) { teeState.canceled1 = true; teeState.reason1 = r; if (teeState.canceled2) { @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( teeState.cancelPromiseCapability.@resolve, teeState.cancelPromiseCapability.@reject); } return teeState.cancelPromiseCapability.@promise; } } function readableStreamTeeBranch2CancelFunction(teeState, stream) { "use strict"; return function(r) { teeState.canceled2 = true; teeState.reason2 = r; if (teeState.canceled1) { @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( teeState.cancelPromiseCapability.@resolve, teeState.cancelPromiseCapability.@reject); } return teeState.cancelPromiseCapability.@promise; } } function isReadableStream(stream) { "use strict"; // Spec tells to return true only if stream has a readableStreamController internal slot. // However, since it is a private slot, it cannot be checked using hasOwnProperty(). // Therefore, readableStreamController is initialized with null value. return @isObject(stream) && stream.@readableStreamController !== @undefined; } function isReadableStreamDefaultReader(reader) { "use strict"; // Spec tells to return true only if reader has a readRequests internal slot. // However, since it is a private slot, it cannot be checked using hasOwnProperty(). // Since readRequests is initialized with an empty array, the following test is ok. return @isObject(reader) && !!reader.@readRequests; } function isReadableStreamDefaultController(controller) { "use strict"; // Spec tells to return true only if controller has an underlyingSource internal slot. // However, since it is a private slot, it cannot be checked using hasOwnProperty(). // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set // to an empty object. Therefore, following test is ok. return @isObject(controller) && !!controller.@underlyingSource; } function readableStreamError(stream, error) { "use strict"; @assert(@isReadableStream(stream)); @assert(stream.@state === @streamReadable); stream.@state = @streamErrored; stream.@storedError = error; if (!stream.@reader) return; const reader = stream.@reader; if (@isReadableStreamDefaultReader(reader)) { const requests = reader.@readRequests; for (let index = 0, length = requests.length; index < length; ++index) requests[index].@reject.@call(@undefined, error); reader.@readRequests = []; } else // FIXME: Implement ReadableStreamBYOBReader. @throwTypeError("Only ReadableStreamDefaultReader is currently supported"); reader.@closedPromiseCapability.@reject.@call(@undefined, error); } function readableStreamDefaultControllerCallPullIfNeeded(controller) { "use strict"; const stream = controller.@controlledReadableStream; if (stream.@state === @streamClosed || stream.@state === @streamErrored) return; if (controller.@closeRequested) return; if (!controller.@started) return; if ((!@isReadableStreamLocked(stream) || !stream.@reader.@readRequests.length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return; if (controller.@pulling) { controller.@pullAgain = true; return; } @assert(!controller.@pullAgain); controller.@pulling = true; @promiseInvokeOrNoop(controller.@underlyingSource, "pull", [controller]).@then(function() { controller.@pulling = false; if (controller.@pullAgain) { controller.@pullAgain = false; @readableStreamDefaultControllerCallPullIfNeeded(controller); } }, function(error) { if (stream.@state === @streamReadable) @readableStreamDefaultControllerError(controller, error); }); } function isReadableStreamLocked(stream) { "use strict"; @assert(@isReadableStream(stream)); return !!stream.@reader; } function readableStreamDefaultControllerGetDesiredSize(controller) { "use strict"; return controller.@strategy.highWaterMark - controller.@queue.size; } function readableStreamReaderGenericCancel(reader, reason) { "use strict"; const stream = reader.@ownerReadableStream; @assert(!!stream); return @readableStreamCancel(stream, reason); } function readableStreamCancel(stream, reason) { "use strict"; stream.@disturbed = true; if (stream.@state === @streamClosed) return @Promise.@resolve(); if (stream.@state === @streamErrored) return @Promise.@reject(stream.@storedError); @readableStreamClose(stream); return stream.@readableStreamController.@cancel(stream.@readableStreamController, reason).@then(function() { }); } function readableStreamDefaultControllerCancel(controller, reason) { "use strict"; controller.@queue = @newQueue(); return @promiseInvokeOrNoop(controller.@underlyingSource, "cancel", [reason]); } function readableStreamDefaultControllerPull(controller) { "use strict"; const stream = controller.@controlledReadableStream; if (controller.@queue.content.length) { const chunk = @dequeueValue(controller.@queue); if (controller.@closeRequested && controller.@queue.content.length === 0) @readableStreamClose(stream); else @readableStreamDefaultControllerCallPullIfNeeded(controller); return @Promise.@resolve({value: chunk, done: false}); } const pendingPromise = @readableStreamAddReadRequest(stream); @readableStreamDefaultControllerCallPullIfNeeded(controller); return pendingPromise; } function readableStreamDefaultControllerClose(controller) { "use strict"; const stream = controller.@controlledReadableStream; @assert(!controller.@closeRequested); @assert(stream.@state === @streamReadable); controller.@closeRequested = true; if (controller.@queue.content.length === 0) @readableStreamClose(stream); } function readableStreamClose(stream) { "use strict"; @assert(stream.@state === @streamReadable); stream.@state = @streamClosed; const reader = stream.@reader; if (!reader) return; if (@isReadableStreamDefaultReader(reader)) { const requests = reader.@readRequests; for (let index = 0, length = requests.length; index < length; ++index) requests[index].@resolve.@call(@undefined, {value:@undefined, done: true}); reader.@readRequests = []; } reader.@closedPromiseCapability.@resolve.@call(); } function readableStreamFulfillReadRequest(stream, chunk, done) { "use strict"; stream.@reader.@readRequests.@shift().@resolve.@call(@undefined, {value: chunk, done: done}); } function readableStreamDefaultControllerEnqueue(controller, chunk) { "use strict"; const stream = controller.@controlledReadableStream; @assert(!controller.@closeRequested); @assert(stream.@state === @streamReadable); if (@isReadableStreamLocked(stream) && stream.@reader.@readRequests.length) { @readableStreamFulfillReadRequest(stream, chunk, false); @readableStreamDefaultControllerCallPullIfNeeded(controller); return; } try { let chunkSize = 1; if (controller.@strategy.size !== @undefined) chunkSize = controller.@strategy.size(chunk); @enqueueValueWithSize(controller.@queue, chunk, chunkSize); } catch(error) { if (stream.@state === @streamReadable) @readableStreamDefaultControllerError(controller, error); throw error; } @readableStreamDefaultControllerCallPullIfNeeded(controller); } function readableStreamDefaultReaderRead(reader) { "use strict"; const stream = reader.@ownerReadableStream; @assert(!!stream); stream.@disturbed = true; if (stream.@state === @streamClosed) return @Promise.@resolve({value: @undefined, done: true}); if (stream.@state === @streamErrored) return @Promise.@reject(stream.@storedError); @assert(stream.@state === @streamReadable); return stream.@readableStreamController.@pull(stream.@readableStreamController); } function readableStreamAddReadRequest(stream) { "use strict"; @assert(@isReadableStreamDefaultReader(stream.@reader)); @assert(stream.@state == @streamReadable); const readRequest = @newPromiseCapability(@Promise); stream.@reader.@readRequests.@push(readRequest); return readRequest.@promise; } function isReadableStreamDisturbed(stream) { "use strict"; @assert(@isReadableStream(stream)); return stream.@disturbed; } function readableStreamReaderGenericRelease(reader) { "use strict"; @assert(!!reader.@ownerReadableStream); @assert(reader.@ownerReadableStream.@reader === reader); if (reader.@ownerReadableStream.@state === @streamReadable) reader.@closedPromiseCapability.@reject.@call(@undefined, new @TypeError("releasing lock of reader whose stream is still in readable state")); else reader.@closedPromiseCapability = { @promise: @Promise.@reject(new @TypeError("reader released lock")) }; reader.@ownerReadableStream.@reader = @undefined; reader.@ownerReadableStream = null; }