summaryrefslogtreecommitdiff
path: root/chromium/components/speech/upstream_loader.cc
blob: 9a88c14b6ff2acb6e787073b44cec37c91d9cb8f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "components/speech/upstream_loader.h"

#include "base/callback_forward.h"
#include "components/speech/upstream_loader_client.h"

namespace speech {

// Builds the loader and immediately starts the request.
//
// |resource_request| is given a chunked upload body that is served by this
// object, then handed to a SimpleURLLoader created with
// |upstream_traffic_annotation| and started on |url_loader_factory|.
// |upstream_loader_client| must be non-null; it is notified from OnComplete().
UpstreamLoader::UpstreamLoader(
    std::unique_ptr<network::ResourceRequest> resource_request,
    net::NetworkTrafficAnnotationTag upstream_traffic_annotation,
    network::mojom::URLLoaderFactory* url_loader_factory,
    UpstreamLoaderClient* upstream_loader_client)
    : upstream_loader_client_(upstream_loader_client) {
  DCHECK(upstream_loader_client_);

  // Bind |this| as the ChunkedDataPipeGetter implementation behind a new
  // message pipe, and install the remote end as the request's upload body.
  mojo::PendingRemote<network::mojom::ChunkedDataPipeGetter> body_getter;
  auto getter_receiver = body_getter.InitWithNewPipeAndPassReceiver();
  receiver_set_.Add(this, std::move(getter_receiver));

  resource_request->request_body = new network::ResourceRequestBody();
  resource_request->request_body->SetToChunkedDataPipe(
      std::move(body_getter));

  simple_url_loader_ = network::SimpleURLLoader::Create(
      std::move(resource_request), upstream_traffic_annotation);

  // NOTE(review): base::Unretained(this) presumably relies on
  // |simple_url_loader_| being owned by this object, so the callback cannot
  // outlive it - confirm against the header.
  auto on_complete =
      base::BindOnce(&UpstreamLoader::OnComplete, base::Unretained(this));
  simple_url_loader_->DownloadToStringOfUnboundedSizeUntilCrashAndDie(
      url_loader_factory, std::move(on_complete));
}

// Defaulted out of line: no custom teardown logic runs here; members are
// destroyed by the compiler-generated destructor.
UpstreamLoader::~UpstreamLoader() = default;

// Attempts to send more of the upload body, if more data is available, and
// |upload_pipe_| is valid.
void UpstreamLoader::SendData() {
  DCHECK_LE(upload_position_, upload_body_.size());

  if (!upload_pipe_.is_valid())
    return;

  // Nothing more to write yet, or done writing everything.
  if (upload_position_ == upload_body_.size())
    return;

  // Since kMaxUploadWrite is a uint32_t, no overflow occurs in this downcast.
  uint32_t write_bytes = std::min(upload_body_.length() - upload_position_,
                                  static_cast<size_t>(kMaxUploadWrite));
  MojoResult result =
      upload_pipe_->WriteData(upload_body_.data() + upload_position_,
                              &write_bytes, MOJO_WRITE_DATA_FLAG_NONE);

  // Wait for the pipe to have more capacity available, if needed.
  if (result == MOJO_RESULT_SHOULD_WAIT) {
    upload_pipe_watcher_->ArmOrNotify();
    return;
  }

  // Do nothing on pipe closure - depend on the SimpleURLLoader to notice the
  // other pipes being closed on error. Can reach this point if there's a
  // retry, for instance, so cannot draw any conclusions here.
  if (result != MOJO_RESULT_OK)
    return;

  upload_position_ += write_bytes;
  // If more data is available, arm the watcher again. Don't write again in a
  // loop, even if WriteData would allow it, to avoid blocking the current
  // thread.
  if (upload_position_ < upload_body_.size())
    upload_pipe_watcher_->ArmOrNotify();
}

// Buffers |data| at the end of the upload body and tries to send it. When
// |is_last_chunk| is true the body is marked complete and any pending size
// request is answered. Must not be called again after the last chunk.
void UpstreamLoader::AppendChunkToUpload(const std::string& data,
                                         bool is_last_chunk) {
  DCHECK(!has_last_chunk_);

  upload_body_ += data;

  if (is_last_chunk)
    has_last_chunk_ = true;

  // Report the total size before sending the remaining bytes. It does not
  // matter much, but if the other side learns the size before it receives the
  // last chunk (which Mojo does not guarantee), some protocols can merge the
  // data and the last chunk itself into a single frame.
  if (is_last_chunk && get_size_callback_)
    std::move(get_size_callback_).Run(net::OK, upload_body_.size());

  SendData();
}

// Callback registered with |upload_pipe_watcher_| in StartReading(). The
// MojoResult argument is ignored; every notification simply retries the write
// via SendData().
void UpstreamLoader::OnUploadPipeWriteable(MojoResult unused) {
  // SendData() checks the pipe and buffer state itself before writing.
  SendData();
}

// Completion callback for |simple_url_loader_|. Reports to the client whether
// a response body was delivered, together with the HTTP response code, or -1
// when no response headers are available.
void UpstreamLoader::OnComplete(std::unique_ptr<std::string> response_body) {
  int response_code = -1;

  const auto* response_info = simple_url_loader_->ResponseInfo();
  if (response_info && response_info->headers)
    response_code = response_info->headers->response_code();

  // A null |response_body| is reported to the client as a failure.
  const bool success = response_body != nullptr;
  upstream_loader_client_->OnUpstreamDataComplete(success, response_code);
}

// Answers a request for the total upload size. The size is only known once
// the last chunk has been appended; until then the callback is stored and run
// later from AppendChunkToUpload().
void UpstreamLoader::GetSize(GetSizeCallback get_size_callback) {
  if (!has_last_chunk_) {
    // Body still growing: park the callback until the final chunk arrives.
    get_size_callback_ = std::move(get_size_callback);
    return;
  }

  std::move(get_size_callback).Run(net::OK, upload_body_.size());
}

// Begins (or restarts) writing the upload body into |pipe|. Any previous
// watcher and pipe are replaced, and the write position is rewound to the
// beginning of the buffered body.
void UpstreamLoader::StartReading(mojo::ScopedDataPipeProducerHandle pipe) {
  // Tear down the watcher for the previous pipe, if there was one, before the
  // pipe handle itself is replaced.
  upload_pipe_watcher_.reset();
  upload_pipe_ = std::move(pipe);

  // The watcher is armed manually, from SendData(), whenever more capacity is
  // needed.
  auto on_writable = base::BindRepeating(
      &UpstreamLoader::OnUploadPipeWriteable, base::Unretained(this));
  upload_pipe_watcher_ = std::make_unique<mojo::SimpleWatcher>(
      FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL);
  upload_pipe_watcher_->Watch(upload_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
                              std::move(on_writable));

  // Start again from the first byte of the body for the new pipe.
  upload_position_ = 0;

  // Kick off the upload right away if any data is already buffered.
  SendData();
}

}  // namespace speech