summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-forEach.js
blob: e3678352c41591b9d5f201c3783da786ab6cc618 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
'use strict';

const common = require('../common');
const {
  Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');

{
  // forEach works on synchronous streams with a synchronous predicate
  const stream = Readable.from([1, 2, 3]);
  const result = [1, 2, 3];
  (async () => {
    await stream.forEach((value) => assert.strictEqual(value, result.shift()));
  })().then(common.mustCall());
}

{
  // forEach works an asynchronous streams
  const stream = Readable.from([1, 2, 3]).filter(async (x) => {
    await Promise.resolve();
    return true;
  });
  const result = [1, 2, 3];
  (async () => {
    await stream.forEach((value) => assert.strictEqual(value, result.shift()));
  })().then(common.mustCall());
}

{
  // forEach works on asynchronous streams with a asynchronous forEach fn
  const stream = Readable.from([1, 2, 3]).filter(async (x) => {
    await Promise.resolve();
    return true;
  });
  const result = [1, 2, 3];
  (async () => {
    await stream.forEach(async (value) => {
      await Promise.resolve();
      assert.strictEqual(value, result.shift());
    });
  })().then(common.mustCall());
}

{
  // forEach works on an infinite stream
  const ac = new AbortController();
  const { signal } = ac;
  const stream = Readable.from(async function* () {
    while (true) yield 1;
  }(), { signal });
  let i = 0;
  assert.rejects(stream.forEach(common.mustCall((x) => {
    i++;
    if (i === 10) ac.abort();
    assert.strictEqual(x, 1);
  }, 10)), { name: 'AbortError' }).then(common.mustCall());
}

{
  // Emitting an error during `forEach`
  const stream = Readable.from([1, 2, 3, 4, 5]);
  assert.rejects(stream.forEach(async (x) => {
    if (x === 3) {
      stream.emit('error', new Error('boom'));
    }
  }), /boom/).then(common.mustCall());
}

{
  // Throwing an error during `forEach` (sync)
  const stream = Readable.from([1, 2, 3, 4, 5]);
  assert.rejects(stream.forEach((x) => {
    if (x === 3) {
      throw new Error('boom');
    }
  }), /boom/).then(common.mustCall());
}

{
  // Throwing an error during `forEach` (async)
  const stream = Readable.from([1, 2, 3, 4, 5]);
  assert.rejects(stream.forEach(async (x) => {
    if (x === 3) {
      return Promise.reject(new Error('boom'));
    }
  }), /boom/).then(common.mustCall());
}

{
  // Concurrency + AbortSignal
  const ac = new AbortController();
  let calls = 0;
  const forEachPromise =
    Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
      calls++;
      await once(signal, 'abort');
    }, { signal: ac.signal, concurrency: 2 });
  // pump
  assert.rejects(async () => {
    await forEachPromise;
  }, {
    name: 'AbortError',
  }).then(common.mustCall());

  setImmediate(() => {
    ac.abort();
    assert.strictEqual(calls, 2);
  });
}

{
  // Error cases
  assert.rejects(async () => {
    await Readable.from([1]).forEach(1);
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
  assert.rejects(async () => {
    await Readable.from([1]).forEach((x) => x, {
      concurrency: 'Foo'
    });
  }, /ERR_OUT_OF_RANGE/).then(common.mustCall());
  assert.rejects(async () => {
    await Readable.from([1]).forEach((x) => x, 1);
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
  // Test result is a Promise
  const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
  assert.strictEqual(typeof stream.then, 'function');
}
{
  const stream = Readable.from([1, 2, 3, 4, 5]);
  Object.defineProperty(stream, 'map', {
    value: common.mustNotCall(() => {}),
  });
  // Check that map isn't getting called.
  stream.forEach(() => true);
}