1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/speech/upstream_loader.h"
#include "base/callback_forward.h"
#include "components/speech/upstream_loader_client.h"
namespace speech {
UpstreamLoader::UpstreamLoader(
std::unique_ptr<network::ResourceRequest> resource_request,
net::NetworkTrafficAnnotationTag upstream_traffic_annotation,
network::mojom::URLLoaderFactory* url_loader_factory,
UpstreamLoaderClient* upstream_loader_client)
: upstream_loader_client_(upstream_loader_client) {
DCHECK(upstream_loader_client_);
// Attach a chunked upload body.
mojo::PendingRemote<network::mojom::ChunkedDataPipeGetter> data_remote;
receiver_set_.Add(this, data_remote.InitWithNewPipeAndPassReceiver());
resource_request->request_body = new network::ResourceRequestBody();
resource_request->request_body->SetToChunkedDataPipe(std::move(data_remote));
simple_url_loader_ = network::SimpleURLLoader::Create(
std::move(resource_request), upstream_traffic_annotation);
simple_url_loader_->DownloadToStringOfUnboundedSizeUntilCrashAndDie(
url_loader_factory,
base::BindOnce(&UpstreamLoader::OnComplete, base::Unretained(this)));
}
UpstreamLoader::~UpstreamLoader() = default;
// Attempts to send more of the upload body, if more data is available, and
// |upload_pipe_| is valid.
void UpstreamLoader::SendData() {
DCHECK_LE(upload_position_, upload_body_.size());
if (!upload_pipe_.is_valid())
return;
// Nothing more to write yet, or done writing everything.
if (upload_position_ == upload_body_.size())
return;
// Since kMaxUploadWrite is a uint32_t, no overflow occurs in this downcast.
uint32_t write_bytes = std::min(upload_body_.length() - upload_position_,
static_cast<size_t>(kMaxUploadWrite));
MojoResult result =
upload_pipe_->WriteData(upload_body_.data() + upload_position_,
&write_bytes, MOJO_WRITE_DATA_FLAG_NONE);
// Wait for the pipe to have more capacity available, if needed.
if (result == MOJO_RESULT_SHOULD_WAIT) {
upload_pipe_watcher_->ArmOrNotify();
return;
}
// Do nothing on pipe closure - depend on the SimpleURLLoader to notice the
// other pipes being closed on error. Can reach this point if there's a
// retry, for instance, so cannot draw any conclusions here.
if (result != MOJO_RESULT_OK)
return;
upload_position_ += write_bytes;
// If more data is available, arm the watcher again. Don't write again in a
// loop, even if WriteData would allow it, to avoid blocking the current
// thread.
if (upload_position_ < upload_body_.size())
upload_pipe_watcher_->ArmOrNotify();
}
void UpstreamLoader::AppendChunkToUpload(const std::string& data,
bool is_last_chunk) {
DCHECK(!has_last_chunk_);
upload_body_ += data;
if (is_last_chunk) {
// Send size before the rest of the body. While it doesn't matter much, if
// the other side receives the size before the last chunk, which Mojo does
// not guarantee, some protocols can merge the data and the last chunk
// itself into a single frame.
has_last_chunk_ = is_last_chunk;
if (get_size_callback_)
std::move(get_size_callback_).Run(net::OK, upload_body_.size());
}
SendData();
}
void UpstreamLoader::OnUploadPipeWriteable(MojoResult unused) {
SendData();
}
void UpstreamLoader::OnComplete(std::unique_ptr<std::string> response_body) {
int response_code = -1;
if (simple_url_loader_->ResponseInfo() &&
simple_url_loader_->ResponseInfo()->headers) {
response_code =
simple_url_loader_->ResponseInfo()->headers->response_code();
}
upstream_loader_client_->OnUpstreamDataComplete(response_body != nullptr,
response_code);
}
void UpstreamLoader::GetSize(GetSizeCallback get_size_callback) {
if (has_last_chunk_) {
std::move(get_size_callback).Run(net::OK, upload_body_.size());
} else {
get_size_callback_ = std::move(get_size_callback);
}
}
void UpstreamLoader::StartReading(mojo::ScopedDataPipeProducerHandle pipe) {
// Delete any existing pipe, if any.
upload_pipe_watcher_.reset();
upload_pipe_ = std::move(pipe);
upload_pipe_watcher_ = std::make_unique<mojo::SimpleWatcher>(
FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL);
upload_pipe_watcher_->Watch(
upload_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
base::BindRepeating(&UpstreamLoader::OnUploadPipeWriteable,
base::Unretained(this)));
upload_position_ = 0;
// Will attempt to start sending the request body, if any data is available.
SendData();
}
} // namespace speech
|