summaryrefslogtreecommitdiff
path: root/chromium/components/speech/upstream_loader.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/components/speech/upstream_loader.cc')
-rw-r--r--chromium/components/speech/upstream_loader.cc129
1 files changed, 129 insertions, 0 deletions
diff --git a/chromium/components/speech/upstream_loader.cc b/chromium/components/speech/upstream_loader.cc
new file mode 100644
index 00000000000..9a88c14b6ff
--- /dev/null
+++ b/chromium/components/speech/upstream_loader.cc
@@ -0,0 +1,129 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "components/speech/upstream_loader.h"
+
+#include "base/callback_forward.h"
+#include "components/speech/upstream_loader_client.h"
+
+namespace speech {
+
+UpstreamLoader::UpstreamLoader(
+ std::unique_ptr<network::ResourceRequest> resource_request,
+ net::NetworkTrafficAnnotationTag upstream_traffic_annotation,
+ network::mojom::URLLoaderFactory* url_loader_factory,
+ UpstreamLoaderClient* upstream_loader_client)
+ : upstream_loader_client_(upstream_loader_client) {
+ DCHECK(upstream_loader_client_);
+ // Attach a chunked upload body.
+ mojo::PendingRemote<network::mojom::ChunkedDataPipeGetter> data_remote;
+ receiver_set_.Add(this, data_remote.InitWithNewPipeAndPassReceiver());
+ resource_request->request_body = new network::ResourceRequestBody();
+ resource_request->request_body->SetToChunkedDataPipe(std::move(data_remote));
+ simple_url_loader_ = network::SimpleURLLoader::Create(
+ std::move(resource_request), upstream_traffic_annotation);
+ simple_url_loader_->DownloadToStringOfUnboundedSizeUntilCrashAndDie(
+ url_loader_factory,
+ base::BindOnce(&UpstreamLoader::OnComplete, base::Unretained(this)));
+}
+
+UpstreamLoader::~UpstreamLoader() = default;
+
+// Attempts to send more of the upload body, if more data is available, and
+// |upload_pipe_| is valid.
+void UpstreamLoader::SendData() {
+ DCHECK_LE(upload_position_, upload_body_.size());
+
+ if (!upload_pipe_.is_valid())
+ return;
+
+ // Nothing more to write yet, or done writing everything.
+ if (upload_position_ == upload_body_.size())
+ return;
+
+ // Since kMaxUploadWrite is a uint32_t, no overflow occurs in this downcast.
+ uint32_t write_bytes = std::min(upload_body_.length() - upload_position_,
+ static_cast<size_t>(kMaxUploadWrite));
+ MojoResult result =
+ upload_pipe_->WriteData(upload_body_.data() + upload_position_,
+ &write_bytes, MOJO_WRITE_DATA_FLAG_NONE);
+
+ // Wait for the pipe to have more capacity available, if needed.
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ upload_pipe_watcher_->ArmOrNotify();
+ return;
+ }
+
+ // Do nothing on pipe closure - depend on the SimpleURLLoader to notice the
+ // other pipes being closed on error. Can reach this point if there's a
+ // retry, for instance, so cannot draw any conclusions here.
+ if (result != MOJO_RESULT_OK)
+ return;
+
+ upload_position_ += write_bytes;
+ // If more data is available, arm the watcher again. Don't write again in a
+ // loop, even if WriteData would allow it, to avoid blocking the current
+ // thread.
+ if (upload_position_ < upload_body_.size())
+ upload_pipe_watcher_->ArmOrNotify();
+}
+
+void UpstreamLoader::AppendChunkToUpload(const std::string& data,
+ bool is_last_chunk) {
+ DCHECK(!has_last_chunk_);
+
+ upload_body_ += data;
+ if (is_last_chunk) {
+ // Send size before the rest of the body. While it doesn't matter much, if
+ // the other side receives the size before the last chunk, which Mojo does
+ // not guarantee, some protocols can merge the data and the last chunk
+ // itself into a single frame.
+ has_last_chunk_ = is_last_chunk;
+ if (get_size_callback_)
+ std::move(get_size_callback_).Run(net::OK, upload_body_.size());
+ }
+
+ SendData();
+}
+
+void UpstreamLoader::OnUploadPipeWriteable(MojoResult unused) {
+ SendData();
+}
+
+void UpstreamLoader::OnComplete(std::unique_ptr<std::string> response_body) {
+ int response_code = -1;
+ if (simple_url_loader_->ResponseInfo() &&
+ simple_url_loader_->ResponseInfo()->headers) {
+ response_code =
+ simple_url_loader_->ResponseInfo()->headers->response_code();
+ }
+ upstream_loader_client_->OnUpstreamDataComplete(response_body != nullptr,
+ response_code);
+}
+
+void UpstreamLoader::GetSize(GetSizeCallback get_size_callback) {
+ if (has_last_chunk_) {
+ std::move(get_size_callback).Run(net::OK, upload_body_.size());
+ } else {
+ get_size_callback_ = std::move(get_size_callback);
+ }
+}
+
+void UpstreamLoader::StartReading(mojo::ScopedDataPipeProducerHandle pipe) {
+ // Delete any existing pipe, if any.
+ upload_pipe_watcher_.reset();
+ upload_pipe_ = std::move(pipe);
+ upload_pipe_watcher_ = std::make_unique<mojo::SimpleWatcher>(
+ FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL);
+ upload_pipe_watcher_->Watch(
+ upload_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
+ base::BindRepeating(&UpstreamLoader::OnUploadPipeWriteable,
+ base::Unretained(this)));
+ upload_position_ = 0;
+
+ // Will attempt to start sending the request body, if any data is available.
+ SendData();
+}
+
+} // namespace speech