summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-reduce.js
blob: 56271c5e23262742532267d62faefb468d160c85 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
'use strict';

const common = require('../common');
const {
  Readable,
} = require('stream');
const assert = require('assert');

/**
 * Reducer shared by the tests in this file.
 *
 * @param {*} p Accumulated value so far.
 * @param {*} c Current element.
 * @returns {*} The result of `p + c` (numeric addition or string
 *   concatenation, depending on the operands).
 */
function sum(p, c) {
  const combined = p + c;
  return combined;
}

{
  // `stream.reduce(...)` must produce the same result as
  // `(await stream.toArray()).reduce(...)`
  (async () => {
    // Each entry is `[values, reducer, initialValue?]`. The last two entries
    // deliberately omit the initial value.
    const tests = [
      [[], sum, 0],
      [[1], sum, 0],
      [[1, 2, 3, 4, 5], sum, 0],
      [[...Array(100).keys()], sum, 0],
      [['a', 'b', 'c'], sum, ''],
      [[1, 2], sum],
      [[1, 2, 3], (x, y) => y],
    ];
    // Spread the remaining tuple members rather than destructuring a third
    // `initial` element: for the short entries that element would be
    // `undefined` and would be passed as an explicit second argument, which
    // `Array.prototype.reduce` treats as a supplied initial value. Spreading
    // keeps the "no initial value" cases genuinely seedless on both sides.
    for (const [values, ...reduceArgs] of tests) {
      const streamReduce = await Readable.from(values)
                                         .reduce(...reduceArgs);
      const arrayReduce = values.reduce(...reduceArgs);
      assert.deepStrictEqual(streamReduce, arrayReduce);
    }
    // Same comparison, with the values first passing through an asynchronous
    // `map` stage before they reach the reducer
    for (const [values, ...reduceArgs] of tests) {
      const streamReduce = await Readable.from(values)
                                         .map(async (x) => x)
                                         .reduce(...reduceArgs);
      const arrayReduce = values.reduce(...reduceArgs);
      assert.deepStrictEqual(streamReduce, arrayReduce);
    }
  })().then(common.mustCall());
}
{
  // An async reducer is accepted both with and without an initial value
  const asyncSum = async (p, c) => p + c;

  const withInitialValue = async () => {
    const total = await Readable.from([1, 2, 3]).reduce(asyncSum, 0);
    assert.strictEqual(total, 6);
  };
  const withoutInitialValue = async () => {
    const total = await Readable.from([1, 2, 3]).reduce(asyncSum);
    assert.strictEqual(total, 6);
  };

  withInitialValue().then(common.mustCall());
  withoutInitialValue().then(common.mustCall());
}
{
  // Works lazily: the reducer throws early, so `map` must not be invoked for
  // every source value.
  // Two consumed and one buffered by `map` due to default concurrency
  const expectedMapCalls = 3;
  const mapped = Readable.from([1, 2, 3, 4, 5, 6])
    .map(common.mustCall((x) => x, expectedMapCalls));

  // Fails as soon as the accumulator equals 1; otherwise the current value
  // becomes the next accumulator.
  const failAfterOne = async (p, c) => {
    if (p === 1) {
      throw new Error('boom');
    }
    return c;
  };

  const reduced = mapped.reduce(failAfterOne, 0);
  assert.rejects(reduced, /boom/).then(common.mustCall());
}

{
  // Support for AbortSignal: aborting the controller right after the
  // reduction has been started must reject it with an AbortError.
  const ac = new AbortController();

  // Never settles once it is handed the value `3`; every other call resolves
  // with `undefined`.
  const stallOnThree = async (p, c) => {
    if (c === 3) {
      await new Promise(() => {}); // Explicitly do not pass signal here
    }
    return Promise.resolve();
  };
  const options = { signal: ac.signal };

  assert.rejects(
    () => Readable.from([1, 2, 3]).reduce(stallOnThree, 0, options),
    { name: 'AbortError' },
  ).then(common.mustCall());
  ac.abort();
}


{
  // Support for AbortSignal - pre aborted: a signal that is already aborted
  // when `reduce` is called must reject with an AbortError and leave the
  // source stream destroyed.
  const stream = Readable.from([1, 2, 3]);

  // Never settles once it is handed the value `3`; every other call resolves
  // with `undefined`.
  const stallOnThree = async (p, c) => {
    if (c === 3) {
      await new Promise(() => {}); // Explicitly do not pass signal here
    }
    return Promise.resolve();
  };
  const options = { signal: AbortSignal.abort() };
  const verifyDestroyed = common.mustCall(() => {
    assert.strictEqual(stream.destroyed, true);
  });

  assert.rejects(
    () => stream.reduce(stallOnThree, 0, options),
    { name: 'AbortError' },
  ).then(verifyDestroyed);
}

{
  // Support for AbortSignal - deep: the reducer receives a `signal` in its
  // third argument and registers an `abort` listener on it.
  // NOTE(review): the outer signal is already aborted before `reduce` starts;
  // confirm the reducer is actually invoked in that case, otherwise the inner
  // `common.mustCall()` is never registered and this check is vacuous.
  const stream = Readable.from([1, 2, 3]);

  const listenAndStallOnThree = async (p, c, { signal }) => {
    signal.addEventListener('abort', common.mustCall(), { once: true });
    if (c === 3) {
      await new Promise(() => {}); // Explicitly do not pass signal here
    }
    return Promise.resolve();
  };
  const options = { signal: AbortSignal.abort() };
  const verifyDestroyed = common.mustCall(() => {
    assert.strictEqual(stream.destroyed, true);
  });

  assert.rejects(
    () => stream.reduce(listenAndStallOnThree, 0, options),
    { name: 'AbortError' },
  ).then(verifyDestroyed);
}

{
  // Error cases: a non-function reducer, non-object options and a
  // non-AbortSignal `signal` option must each reject.
  // Every assertion is chained with `common.mustCall()`, matching the other
  // blocks in this file, so that no `assert.rejects()` promise is left
  // floating and each one is required to settle.
  assert.rejects(() => Readable.from([]).reduce(1), /TypeError/)
    .then(common.mustCall());
  assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/)
    .then(common.mustCall());
  assert.rejects(
    () => Readable.from([]).reduce((x, y) => x + y, 0, 1),
    /ERR_INVALID_ARG_TYPE/,
  ).then(common.mustCall());
  assert.rejects(
    () => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }),
    /ERR_INVALID_ARG_TYPE/,
  ).then(common.mustCall());
}

{
  // The value returned by `reduce` is a Promise
  const source = Readable.from([1, 2, 3, 4, 5]);
  const pending = source.reduce(sum, 0);
  assert.ok(pending instanceof Promise);
}