summaryrefslogtreecommitdiff
path: root/chromium/third_party/blink/renderer/modules/webtransport/incoming_stream.h
blob: dd529b70e8fa45cfb2ae432786111c3fafbe330e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_INCOMING_STREAM_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_INCOMING_STREAM_H_

#include <stdint.h>

#include "base/callback.h"
#include "base/logging.h"
#include "base/optional.h"
#include "base/util/type_safety/strong_alias.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/bindings/core/v8/script_value.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/heap/thread_state.h"

namespace blink {

class ScriptState;
class StreamAbortInfo;
class ReadableStream;
class ReadableStreamDefaultControllerWithScriptScope;
class Visitor;

// Implementation of the IncomingStream mixin from the standard:
// https://wicg.github.io/web-transport/#incoming-stream. ReceiveStream and
// BidirectionalStream delegate to this to implement the functionality.
class MODULES_EXPORT IncomingStream final
    : public GarbageCollected<IncomingStream> {
  USING_PRE_FINALIZER(IncomingStream, Dispose);

 public:
  IncomingStream(ScriptState*,
                 base::OnceClosure on_abort,
                 mojo::ScopedDataPipeConsumerHandle);
  ~IncomingStream();

  // Init() must be called before the stream is used.
  void Init();

  // Methods from the IncomingStream IDL:
  // https://wicg.github.io/web-transport/#incoming-stream
  ReadableStream* Readable() const {
    DVLOG(1) << "IncomingStream::readable() called";

    return readable_;
  }

  ScriptPromise ReadingAborted() const { return reading_aborted_; }

  void AbortReading(StreamAbortInfo*);

  // Called from QuicTransport via a WebTransportStream class. May execute
  // JavaScript.
  void OnIncomingStreamClosed(bool fin_received);

  // Called via QuicTransport via a WebTransportStream class. Expects a
  // JavaScript scope to have been entered.
  void Reset();

  // Called from QuicTransport rather than using
  // ExecutionContextLifecycleObserver to ensure correct destruction order.
  // Does not execute JavaScript.
  void ContextDestroyed();

  void Trace(Visitor*) const;

 private:
  class UnderlyingSource;

  using IsLocalAbort = util::StrongAlias<class IsLocalAbortTag, bool>;

  // Called when |data_pipe_| becomes readable or errored.
  void OnHandleReady(MojoResult, const mojo::HandleSignalsState&);

  // Called when |data_pipe_| is closed.
  void OnPeerClosed(MojoResult, const mojo::HandleSignalsState&);

  // Rejects any unfinished read() calls and resets |data_pipe_|.
  void HandlePipeClosed();

  // Handles a remote close appropriately for the value of |fin_received_|.
  void ProcessClose();

  // Reads all the data currently in the pipe and enqueues it. If no data is
  // currently available, triggers the |read_watcher_| and enqueues when data
  // becomes available.
  void ReadFromPipeAndEnqueue();

  // Copies a sequence of bytes into an ArrayBuffer and enqueues it.
  void EnqueueBytes(const void* source, uint32_t byte_length);

  // Creates a DOMException indicating that the stream has been aborted.
  // If IsLocalAbort it true it will indicate a locally-initiated abort,
  // otherwise it will indicate a server--initiated abort.
  ScriptValue CreateAbortException(IsLocalAbort);

  // Closes |readable_|, resolves |reading_aborted_| and resets |data_pipe_|.
  void CloseAbortAndReset();

  // Errors |readable_|, resolves |reading_aborted_| and resets |data_pipe_|.
  // |exception| will be set as the error on |readable_|.
  void ErrorStreamAbortAndReset(ScriptValue exception);

  // Resolves the |reading_aborted_| promise and resets the |data_pipe_|.
  void AbortAndReset();

  // Resets |data_pipe_| and clears the watchers.
  // If the pipe is open it will be closed as a side-effect.
  void ResetPipe();

  // Prepares the object for destruction.
  void Dispose();

  const Member<ScriptState> script_state_;

  base::OnceClosure on_abort_;

  mojo::ScopedDataPipeConsumerHandle data_pipe_;

  // Only armed when we need to read something.
  mojo::SimpleWatcher read_watcher_;

  // Always armed to detect close.
  mojo::SimpleWatcher close_watcher_;

  Member<ReadableStream> readable_;
  Member<ReadableStreamDefaultControllerWithScriptScope> controller_;

  // Promise returned by the |readingAborted| attribute.
  ScriptPromise reading_aborted_;
  Member<ScriptPromiseResolver> reading_aborted_resolver_;

  // This is set when OnIncomingStreamClosed() is called.
  base::Optional<bool> fin_received_;

  // True when |data_pipe_| has been detected to be closed. The close is not
  // processed until |fin_received_| is also set.
  bool is_pipe_closed_ = false;

  // Indicates if we are currently performing a two-phase read from the pipe and
  // so can't start another read.
  bool in_two_phase_read_ = false;

  // Indicates if we need to perform another read after the current one
  // completes.
  bool read_pending_ = false;
};

}  // namespace blink

#endif  // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_INCOMING_STREAM_H_