diff options
author | Zeno Albisser <zeno.albisser@digia.com> | 2013-08-15 21:46:11 +0200 |
---|---|---|
committer | Zeno Albisser <zeno.albisser@digia.com> | 2013-08-15 21:46:11 +0200 |
commit | 679147eead574d186ebf3069647b4c23e8ccace6 (patch) | |
tree | fc247a0ac8ff119f7c8550879ebb6d3dd8d1ff69 /chromium/content/browser/streams | |
download | qtwebengine-chromium-679147eead574d186ebf3069647b4c23e8ccace6.tar.gz |
Initial import.
Diffstat (limited to 'chromium/content/browser/streams')
-rw-r--r-- | chromium/content/browser/streams/OWNERS | 1 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream.cc | 156 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream.h | 115 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_context.cc | 57 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_context.h | 60 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_handle_impl.cc | 38 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_handle_impl.h | 40 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_read_observer.h | 26 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_registry.cc | 48 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_registry.h | 51 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_unittest.cc | 253 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_url_request_job.cc | 238 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_url_request_job.h | 66 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_url_request_job_unittest.cc | 173 | ||||
-rw-r--r-- | chromium/content/browser/streams/stream_write_observer.h | 27 |
15 files changed, 1349 insertions, 0 deletions
diff --git a/chromium/content/browser/streams/OWNERS b/chromium/content/browser/streams/OWNERS new file mode 100644 index 00000000000..8192dfb0eff --- /dev/null +++ b/chromium/content/browser/streams/OWNERS @@ -0,0 +1 @@ +zork@chromium.org diff --git a/chromium/content/browser/streams/stream.cc b/chromium/content/browser/streams/stream.cc new file mode 100644 index 00000000000..6026df9e7c7 --- /dev/null +++ b/chromium/content/browser/streams/stream.cc @@ -0,0 +1,156 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream.h" + +#include "base/bind.h" +#include "base/location.h" +#include "base/message_loop/message_loop_proxy.h" +#include "content/browser/streams/stream_handle_impl.h" +#include "content/browser/streams/stream_read_observer.h" +#include "content/browser/streams/stream_registry.h" +#include "content/browser/streams/stream_write_observer.h" +#include "net/base/io_buffer.h" + +namespace { +// Start throttling the connection at about 1MB. +const size_t kDeferSizeThreshold = 40 * 32768; +} + +namespace content { + +Stream::Stream(StreamRegistry* registry, + StreamWriteObserver* write_observer, + const GURL& url) + : data_bytes_read_(0), + can_add_data_(true), + url_(url), + data_length_(0), + registry_(registry), + read_observer_(NULL), + write_observer_(write_observer), + stream_handle_(NULL), + weak_ptr_factory_(this) { + CreateByteStream(base::MessageLoopProxy::current(), + base::MessageLoopProxy::current(), + kDeferSizeThreshold, + &writer_, + &reader_); + + // Setup callback for writing. + writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, + weak_ptr_factory_.GetWeakPtr())); + reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, + weak_ptr_factory_.GetWeakPtr())); + + registry_->RegisterStream(this); +} + +Stream::~Stream() { +} + +bool Stream::SetReadObserver(StreamReadObserver* observer) { + if (read_observer_) + return false; + read_observer_ = observer; + return true; +} + +void Stream::RemoveReadObserver(StreamReadObserver* observer) { + DCHECK(observer == read_observer_); + read_observer_ = NULL; +} + +void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { + DCHECK(observer == write_observer_); + write_observer_ = NULL; +} + +void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { + can_add_data_ = writer_->Write(buffer, size); +} + +void Stream::AddData(const char* data, size_t size) { + scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); + memcpy(io_buffer->data(), data, size); + can_add_data_ = writer_->Write(io_buffer, size); +} + +void Stream::Finalize() { + writer_->Close(0); + writer_.reset(NULL); + + // Continue asynchronously. + base::MessageLoopProxy::current()->PostTask( + FROM_HERE, + base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); +} + +Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, + int buf_size, + int* bytes_read) { + DCHECK(buf); + DCHECK(bytes_read); + + *bytes_read = 0; + if (!data_.get()) { + data_length_ = 0; + data_bytes_read_ = 0; + ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); + switch (state) { + case ByteStreamReader::STREAM_HAS_DATA: + break; + case ByteStreamReader::STREAM_COMPLETE: + registry_->UnregisterStream(url()); + return STREAM_COMPLETE; + case ByteStreamReader::STREAM_EMPTY: + return STREAM_EMPTY; + } + } + + const size_t remaining_bytes = data_length_ - data_bytes_read_; + size_t to_read = + static_cast<size_t>(buf_size) < remaining_bytes ? + buf_size : remaining_bytes; + memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); + data_bytes_read_ += to_read; + if (data_bytes_read_ >= data_length_) + data_ = NULL; + + *bytes_read = to_read; + return STREAM_HAS_DATA; +} + +scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url, + const std::string& mime_type) { + CHECK(!stream_handle_); + stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), + original_url, + mime_type); + return scoped_ptr<StreamHandle>(stream_handle_).Pass(); +} + +void Stream::CloseHandle() { + // Prevent deletion until this function ends. + scoped_refptr<Stream> ref(this); + + CHECK(stream_handle_); + stream_handle_ = NULL; + registry_->UnregisterStream(url()); + if (write_observer_) + write_observer_->OnClose(this); +} + +void Stream::OnSpaceAvailable() { + can_add_data_ = true; + if (write_observer_) + write_observer_->OnSpaceAvailable(this); +} + +void Stream::OnDataAvailable() { + if (read_observer_) + read_observer_->OnDataAvailable(this); +} + +} // namespace content diff --git a/chromium/content/browser/streams/stream.h b/chromium/content/browser/streams/stream.h new file mode 100644 index 00000000000..85edc884081 --- /dev/null +++ b/chromium/content/browser/streams/stream.h @@ -0,0 +1,115 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_H_ + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/memory/weak_ptr.h" +#include "content/browser/byte_stream.h" +#include "content/common/content_export.h" +#include "url/gurl.h" + +namespace net { +class IOBuffer; +} + +namespace content { + +class StreamHandle; +class StreamHandleImpl; +class StreamReadObserver; +class StreamRegistry; +class StreamWriteObserver; + +// A stream that sends data from an arbitrary source to an internal URL +// that can be read by an internal consumer. It will continue to pull from the +// original URL as long as there is data available. It can be read from +// multiple clients, but only one can be reading at a time. This allows a +// reader to consume part of the stream, then pass it along to another client +// to continue processing the stream. +class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> { + public: + enum StreamState { + STREAM_HAS_DATA, + STREAM_COMPLETE, + STREAM_EMPTY, + }; + + // Creates a stream. + // + // Security origin of Streams is checked in Blink (See BlobRegistry, + // BlobURL and SecurityOrigin to understand how it works). There's no security + // origin check in Chromium side for now. + Stream(StreamRegistry* registry, + StreamWriteObserver* write_observer, + const GURL& url); + + // Sets the reader of this stream. Returns true on success, or false if there + // is already a reader. + bool SetReadObserver(StreamReadObserver* observer); + + // Removes the read observer. |observer| must be the current observer. + void RemoveReadObserver(StreamReadObserver* observer); + + // Removes the write observer. |observer| must be the current observer. + void RemoveWriteObserver(StreamWriteObserver* observer); + + // Adds the data in |buffer| to the stream. Takes ownership of |buffer|. + void AddData(scoped_refptr<net::IOBuffer> buffer, size_t size); + // Adds data of |size| at |data| to the stream. This method creates a copy + // of the data, and then passes it to |writer_|. + void AddData(const char* data, size_t size); + + // Notifies this stream that it will not be receiving any more data. + void Finalize(); + + // Reads a maximum of |buf_size| from the stream into |buf|. Sets + // |*bytes_read| to the number of bytes actually read. + // Returns STREAM_HAS_DATA if data was read, STREAM_EMPTY if no data was read, + // and STREAM_COMPLETE if the stream is finalized and all data has been read. + StreamState ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read); + + scoped_ptr<StreamHandle> CreateHandle(const GURL& original_url, + const std::string& mime_type); + void CloseHandle(); + + // Indicates whether there is space in the buffer to add more data. + bool can_add_data() const { return can_add_data_; } + + const GURL& url() const { return url_; } + + private: + friend class base::RefCountedThreadSafe<Stream>; + + virtual ~Stream(); + + void OnSpaceAvailable(); + void OnDataAvailable(); + + size_t data_bytes_read_; + bool can_add_data_; + + GURL url_; + + scoped_refptr<net::IOBuffer> data_; + size_t data_length_; + + scoped_ptr<ByteStreamWriter> writer_; + scoped_ptr<ByteStreamReader> reader_; + + StreamRegistry* registry_; + StreamReadObserver* read_observer_; + StreamWriteObserver* write_observer_; + + StreamHandleImpl* stream_handle_; + + base::WeakPtrFactory<Stream> weak_ptr_factory_; + DISALLOW_COPY_AND_ASSIGN(Stream); +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_H_ diff --git a/chromium/content/browser/streams/stream_context.cc b/chromium/content/browser/streams/stream_context.cc new file mode 100644 index 00000000000..44ca0f4a1cd --- /dev/null +++ b/chromium/content/browser/streams/stream_context.cc @@ -0,0 +1,57 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream_context.h" + +#include "base/bind.h" +#include "content/browser/streams/stream_registry.h" +#include "content/public/browser/browser_context.h" + +using base::UserDataAdapter; + +namespace { +const char* kStreamContextKeyName = "content_stream_context"; +} + +namespace content { + +StreamContext::StreamContext() {} + +StreamContext* StreamContext::GetFor(BrowserContext* context) { + if (!context->GetUserData(kStreamContextKeyName)) { + scoped_refptr<StreamContext> stream = new StreamContext(); + context->SetUserData(kStreamContextKeyName, + new UserDataAdapter<StreamContext>(stream.get())); + // Check first to avoid memory leak in unittests. + if (BrowserThread::IsMessageLoopValid(BrowserThread::IO)) { + BrowserThread::PostTask( + BrowserThread::IO, FROM_HERE, + base::Bind(&StreamContext::InitializeOnIOThread, stream)); + } + } + + return UserDataAdapter<StreamContext>::Get(context, kStreamContextKeyName); +} + +void StreamContext::InitializeOnIOThread() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); + registry_.reset(new StreamRegistry()); +} + +StreamContext::~StreamContext() {} + +void StreamContext::DeleteOnCorrectThread() const { + // In many tests, there isn't a valid IO thread. In that case, just delete on + // the current thread. + // TODO(zork): Remove this custom deleter, and fix the leaks in all the + // tests. + if (BrowserThread::IsMessageLoopValid(BrowserThread::IO) && + !BrowserThread::CurrentlyOn(BrowserThread::IO)) { + BrowserThread::DeleteSoon(BrowserThread::IO, FROM_HERE, this); + return; + } + delete this; +} + +} // namespace content diff --git a/chromium/content/browser/streams/stream_context.h b/chromium/content/browser/streams/stream_context.h new file mode 100644 index 00000000000..fb6cdc720ff --- /dev/null +++ b/chromium/content/browser/streams/stream_context.h @@ -0,0 +1,60 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_ + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/sequenced_task_runner_helpers.h" +#include "content/common/content_export.h" +#include "content/public/browser/browser_thread.h" + +namespace content { +class BrowserContext; +class StreamRegistry; +struct StreamContextDeleter; + +// A context class that keeps track of StreamRegistry used by the chrome. +// There is an instance associated with each BrowserContext. There could be +// multiple URLRequestContexts in the same browser context that refers to the +// same instance. +// +// All methods, except the ctor, are expected to be called on +// the IO thread (unless specifically called out in doc comments). +class StreamContext + : public base::RefCountedThreadSafe<StreamContext, + StreamContextDeleter> { + public: + StreamContext(); + + CONTENT_EXPORT static StreamContext* GetFor(BrowserContext* browser_context); + + void InitializeOnIOThread(); + + StreamRegistry* registry() const { return registry_.get(); } + + protected: + virtual ~StreamContext(); + + private: + friend class base::DeleteHelper<StreamContext>; + friend class base::RefCountedThreadSafe<StreamContext, + StreamContextDeleter>; + friend struct StreamContextDeleter; + + void DeleteOnCorrectThread() const; + + scoped_ptr<StreamRegistry> registry_; +}; + +struct StreamContextDeleter { + static void Destruct(const StreamContext* context) { + context->DeleteOnCorrectThread(); + } +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_ diff --git a/chromium/content/browser/streams/stream_handle_impl.cc b/chromium/content/browser/streams/stream_handle_impl.cc new file mode 100644 index 00000000000..d9f877cbc76 --- /dev/null +++ b/chromium/content/browser/streams/stream_handle_impl.cc @@ -0,0 +1,38 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream_handle_impl.h" + +#include "content/browser/streams/stream.h" +#include "content/public/browser/browser_thread.h" + +namespace content { + +StreamHandleImpl::StreamHandleImpl(const base::WeakPtr<Stream>& stream, + const GURL& original_url, + const std::string& mime_type) + : stream_(stream), + url_(stream->url()), + original_url_(original_url), + mime_type_(mime_type), + stream_message_loop_(base::MessageLoopProxy::current().get()) {} + +StreamHandleImpl::~StreamHandleImpl() { + stream_message_loop_->PostTask(FROM_HERE, + base::Bind(&Stream::CloseHandle, stream_)); +} + +const GURL& StreamHandleImpl::GetURL() { + return url_; +} + +const GURL& StreamHandleImpl::GetOriginalURL() { + return original_url_; +} + +const std::string& StreamHandleImpl::GetMimeType() { + return mime_type_; +} + +} // namespace content diff --git a/chromium/content/browser/streams/stream_handle_impl.h b/chromium/content/browser/streams/stream_handle_impl.h new file mode 100644 index 00000000000..aabd0cb7dfe --- /dev/null +++ b/chromium/content/browser/streams/stream_handle_impl.h @@ -0,0 +1,40 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_HANDLE_IMPL_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_HANDLE_IMPL_H_ + +#include "base/memory/weak_ptr.h" +#include "base/message_loop/message_loop_proxy.h" +#include "base/synchronization/lock.h" +#include "content/public/browser/stream_handle.h" + +namespace content { + +class Stream; + +class StreamHandleImpl : public StreamHandle { + public: + StreamHandleImpl(const base::WeakPtr<Stream>& stream, + const GURL& original_url, + const std::string& mime_type); + virtual ~StreamHandleImpl(); + + private: + // StreamHandle overrides + virtual const GURL& GetURL() OVERRIDE; + virtual const GURL& GetOriginalURL() OVERRIDE; + virtual const std::string& GetMimeType() OVERRIDE; + + base::WeakPtr<Stream> stream_; + GURL url_; + GURL original_url_; + std::string mime_type_; + base::MessageLoopProxy* stream_message_loop_; +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_HANDLE_IMPL_H_ + diff --git a/chromium/content/browser/streams/stream_read_observer.h b/chromium/content/browser/streams/stream_read_observer.h new file mode 100644 index 00000000000..08fd657ac01 --- /dev/null +++ b/chromium/content/browser/streams/stream_read_observer.h @@ -0,0 +1,26 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_ + +#include "content/common/content_export.h" + +namespace content { + +class Stream; + +class CONTENT_EXPORT StreamReadObserver { + public: + // Sent when there is data available to be read from the stream. + virtual void OnDataAvailable(Stream* stream) = 0; + + protected: + virtual ~StreamReadObserver() {} +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_ + diff --git a/chromium/content/browser/streams/stream_registry.cc b/chromium/content/browser/streams/stream_registry.cc new file mode 100644 index 00000000000..39d24b39f5f --- /dev/null +++ b/chromium/content/browser/streams/stream_registry.cc @@ -0,0 +1,48 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream_registry.h" + +#include "content/browser/streams/stream.h" + +namespace content { + +StreamRegistry::StreamRegistry() { +} + +StreamRegistry::~StreamRegistry() { +} + +void StreamRegistry::RegisterStream(scoped_refptr<Stream> stream) { + DCHECK(CalledOnValidThread()); + DCHECK(stream.get()); + DCHECK(!stream->url().is_empty()); + streams_[stream->url()] = stream; +} + +scoped_refptr<Stream> StreamRegistry::GetStream(const GURL& url) { + DCHECK(CalledOnValidThread()); + StreamMap::const_iterator stream = streams_.find(url); + if (stream != streams_.end()) + return stream->second; + + return NULL; +} + +bool StreamRegistry::CloneStream(const GURL& url, const GURL& src_url) { + DCHECK(CalledOnValidThread()); + scoped_refptr<Stream> stream(GetStream(src_url)); + if (stream.get()) { + streams_[url] = stream; + return true; + } + return false; +} + +void StreamRegistry::UnregisterStream(const GURL& url) { + DCHECK(CalledOnValidThread()); + streams_.erase(url); +} + +} // namespace content diff --git a/chromium/content/browser/streams/stream_registry.h b/chromium/content/browser/streams/stream_registry.h new file mode 100644 index 00000000000..e75c97c1699 --- /dev/null +++ b/chromium/content/browser/streams/stream_registry.h @@ -0,0 +1,51 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_ + +#include <map> + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/threading/non_thread_safe.h" +#include "content/common/content_export.h" +#include "url/gurl.h" + +namespace content { + +class Stream; + +// Maintains a mapping of blob: URLs to active streams. +class CONTENT_EXPORT StreamRegistry : public base::NonThreadSafe { + public: + StreamRegistry(); + virtual ~StreamRegistry(); + + // Registers a stream, and sets its URL. + void RegisterStream(scoped_refptr<Stream> stream); + + // Clones a stream. Returns true on success, or false if |src_url| doesn't + // exist. + bool CloneStream(const GURL& url, const GURL& src_url); + + void UnregisterStream(const GURL& url); + + // Gets the stream associated with |url|. Returns NULL if there is no such + // stream. + scoped_refptr<Stream> GetStream(const GURL& url); + + private: + typedef std::map<GURL, scoped_refptr<Stream> > StreamMap; + + StreamMap streams_; + + DISALLOW_COPY_AND_ASSIGN(StreamRegistry); +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_ + + diff --git a/chromium/content/browser/streams/stream_unittest.cc b/chromium/content/browser/streams/stream_unittest.cc new file mode 100644 index 00000000000..c0077b7375b --- /dev/null +++ b/chromium/content/browser/streams/stream_unittest.cc @@ -0,0 +1,253 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop/message_loop.h" +#include "base/test/test_simple_task_runner.h" +#include "content/browser/streams/stream.h" +#include "content/browser/streams/stream_read_observer.h" +#include "content/browser/streams/stream_registry.h" +#include "content/browser/streams/stream_write_observer.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace content { + +class StreamTest : public testing::Test { + public: + StreamTest() : producing_seed_key_(0) {} + + virtual void SetUp() OVERRIDE { + registry_.reset(new StreamRegistry()); + } + + // Create a new IO buffer of the given |buffer_size| and fill it with random + // data. + scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) { + scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size)); + char *bufferp = buffer->data(); + for (size_t i = 0; i < buffer_size; i++) + bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char)); + ++producing_seed_key_; + return buffer; + } + + protected: + base::MessageLoop message_loop_; + scoped_ptr<StreamRegistry> registry_; + + private: + int producing_seed_key_; +}; + +class TestStreamReader : public StreamReadObserver { + public: + TestStreamReader() : buffer_(new net::GrowableIOBuffer()), completed_(false) { + } + virtual ~TestStreamReader() {} + + void Read(Stream* stream) { + const size_t kBufferSize = 32768; + scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize)); + + int bytes_read = 0; + while (true) { + Stream::StreamState state = + stream->ReadRawData(buffer.get(), kBufferSize, &bytes_read); + switch (state) { + case Stream::STREAM_HAS_DATA: + // TODO(tyoshino): Move these expectations to the beginning of Read() + // method once Stream::Finalize() is fixed. + EXPECT_FALSE(completed_); + break; + case Stream::STREAM_COMPLETE: + completed_ = true; + return; + case Stream::STREAM_EMPTY: + EXPECT_FALSE(completed_); + return; + } + size_t old_capacity = buffer_->capacity(); + buffer_->SetCapacity(old_capacity + bytes_read); + memcpy(buffer_->StartOfBuffer() + old_capacity, + buffer->data(), bytes_read); + } + } + + virtual void OnDataAvailable(Stream* stream) OVERRIDE { + Read(stream); + } + + scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; } + + bool completed() const { + return completed_; + } + + private: + scoped_refptr<net::GrowableIOBuffer> buffer_; + bool completed_; +}; + +class TestStreamWriter : public StreamWriteObserver { + public: + TestStreamWriter() {} + virtual ~TestStreamWriter() {} + + void Write(Stream* stream, + scoped_refptr<net::IOBuffer> buffer, + size_t buffer_size) { + stream->AddData(buffer, buffer_size); + } + + virtual void OnSpaceAvailable(Stream* stream) OVERRIDE { + } + + virtual void OnClose(Stream* stream) OVERRIDE { + } +}; + +TEST_F(StreamTest, SetReadObserver) { + TestStreamReader reader; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, url)); + EXPECT_TRUE(stream->SetReadObserver(&reader)); +} + +TEST_F(StreamTest, SetReadObserver_SecondFails) { + TestStreamReader reader1; + TestStreamReader reader2; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, url)); + EXPECT_TRUE(stream->SetReadObserver(&reader1)); + EXPECT_FALSE(stream->SetReadObserver(&reader2)); +} + +TEST_F(StreamTest, SetReadObserver_TwoReaders) { + TestStreamReader reader1; + TestStreamReader reader2; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, url)); + EXPECT_TRUE(stream->SetReadObserver(&reader1)); + + // Once the first read observer is removed, a new one can be added. + stream->RemoveReadObserver(&reader1); + EXPECT_TRUE(stream->SetReadObserver(&reader2)); +} + +TEST_F(StreamTest, Stream) { + TestStreamReader reader; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, url)); + EXPECT_TRUE(stream->SetReadObserver(&reader)); + + const int kBufferSize = 1000000; + scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize)); + writer.Write(stream.get(), buffer, kBufferSize); + stream->Finalize(); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(reader.completed()); + + ASSERT_EQ(reader.buffer()->capacity(), kBufferSize); + for (int i = 0; i < kBufferSize; i++) + EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]); +} + +// Test that even if a reader receives an empty buffer, once TransferData() +// method is called on it with |source_complete| = true, following Read() calls +// on it never returns STREAM_EMPTY. Together with StreamTest.Stream above, this +// guarantees that Reader::Read() call returns only STREAM_HAS_DATA +// or STREAM_COMPLETE in |data_available_callback_| call corresponding to +// Writer::Close(). +TEST_F(StreamTest, ClosedReaderDoesNotReturnStreamEmpty) { + TestStreamReader reader; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, url)); + EXPECT_TRUE(stream->SetReadObserver(&reader)); + + const int kBufferSize = 0; + scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize)); + stream->AddData(buffer, kBufferSize); + stream->Finalize(); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(reader.completed()); + EXPECT_EQ(0, reader.buffer()->capacity()); +} + +TEST_F(StreamTest, GetStream) { + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, url)); + + scoped_refptr<Stream> stream2 = registry_->GetStream(url); + ASSERT_EQ(stream1, stream2); +} + +TEST_F(StreamTest, GetStream_Missing) { + TestStreamWriter writer; + + GURL url1("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, url1)); + + GURL url2("blob://stream2"); + scoped_refptr<Stream> stream2 = registry_->GetStream(url2); + ASSERT_FALSE(stream2.get()); +} + +TEST_F(StreamTest, CloneStream) { + TestStreamWriter writer; + + GURL url1("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, url1)); + + GURL url2("blob://stream2"); + ASSERT_TRUE(registry_->CloneStream(url2, url1)); + scoped_refptr<Stream> stream2 = registry_->GetStream(url2); + ASSERT_EQ(stream1, stream2); +} + +TEST_F(StreamTest, CloneStream_Missing) { + TestStreamWriter writer; + + GURL url1("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, url1)); + + GURL url2("blob://stream2"); + GURL url3("blob://stream3"); + ASSERT_FALSE(registry_->CloneStream(url2, url3)); + scoped_refptr<Stream> stream2 = registry_->GetStream(url2); + ASSERT_FALSE(stream2.get()); +} + +TEST_F(StreamTest, UnregisterStream) { + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, url)); + + registry_->UnregisterStream(url); + scoped_refptr<Stream> stream2 = registry_->GetStream(url); + ASSERT_FALSE(stream2.get()); +} + +} // namespace content diff --git a/chromium/content/browser/streams/stream_url_request_job.cc b/chromium/content/browser/streams/stream_url_request_job.cc new file mode 100644 index 00000000000..0965178de2c --- /dev/null +++ b/chromium/content/browser/streams/stream_url_request_job.cc @@ -0,0 +1,238 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream_url_request_job.h" + +#include "base/strings/string_number_conversions.h" +#include "content/browser/streams/stream.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/http/http_byte_range.h" +#include "net/http/http_response_headers.h" +#include "net/http/http_response_info.h" +#include "net/http/http_util.h" +#include "net/url_request/url_request.h" + +namespace content { + +StreamURLRequestJob::StreamURLRequestJob( + net::URLRequest* request, + net::NetworkDelegate* network_delegate, + scoped_refptr<Stream> stream) + : net::URLRequestJob(request, network_delegate), + weak_factory_(this), + stream_(stream), + headers_set_(false), + pending_buffer_size_(0), + total_bytes_read_(0), + max_range_(0), + request_failed_(false) { + DCHECK(stream_.get()); + stream_->SetReadObserver(this); +} + +StreamURLRequestJob::~StreamURLRequestJob() { + ClearStream(); +} + +void StreamURLRequestJob::OnDataAvailable(Stream* stream) { + // Clear the IO_PENDING status. + SetStatus(net::URLRequestStatus()); + // Do nothing if pending_buffer_ is empty, i.e. there's no ReadRawData() + // operation waiting for IO completion. + if (!pending_buffer_.get()) + return; + + // pending_buffer_ is set to the IOBuffer instance provided to ReadRawData() + // by URLRequestJob. + + int bytes_read; + switch (stream_->ReadRawData( + pending_buffer_.get(), pending_buffer_size_, &bytes_read)) { + case Stream::STREAM_HAS_DATA: + DCHECK_GT(bytes_read, 0); + break; + case Stream::STREAM_COMPLETE: + // Ensure this. Calling NotifyReadComplete call with 0 signals + // completion. + bytes_read = 0; + break; + case Stream::STREAM_EMPTY: + NOTREACHED(); + break; + } + + // Clear the buffers before notifying the read is complete, so that it is + // safe for the observer to read. + pending_buffer_ = NULL; + pending_buffer_size_ = 0; + + total_bytes_read_ += bytes_read; + NotifyReadComplete(bytes_read); +} + +// net::URLRequestJob methods. +void StreamURLRequestJob::Start() { + // Continue asynchronously. + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&StreamURLRequestJob::DidStart, weak_factory_.GetWeakPtr())); +} + +void StreamURLRequestJob::Kill() { + net::URLRequestJob::Kill(); + weak_factory_.InvalidateWeakPtrs(); + ClearStream(); +} + +bool StreamURLRequestJob::ReadRawData(net::IOBuffer* buf, + int buf_size, + int* bytes_read) { + if (request_failed_) + return true; + + DCHECK(buf); + DCHECK(bytes_read); + int to_read = buf_size; + if (max_range_ && to_read) { + if (to_read + total_bytes_read_ > max_range_) + to_read = max_range_ - total_bytes_read_; + + if (to_read <= 0) { + *bytes_read = 0; + return true; + } + } + + switch (stream_->ReadRawData(buf, to_read, bytes_read)) { + case Stream::STREAM_HAS_DATA: + case Stream::STREAM_COMPLETE: + total_bytes_read_ += *bytes_read; + return true; + case Stream::STREAM_EMPTY: + pending_buffer_ = buf; + pending_buffer_size_ = to_read; + SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0)); + return false; + } + NOTREACHED(); + return false; +} + +bool StreamURLRequestJob::GetMimeType(std::string* mime_type) const { + if (!response_info_) + return false; + + // TODO(zork): Support registered MIME types if needed. + return response_info_->headers->GetMimeType(mime_type); +} + +void StreamURLRequestJob::GetResponseInfo(net::HttpResponseInfo* info) { + if (response_info_) + *info = *response_info_; +} + +int StreamURLRequestJob::GetResponseCode() const { + if (!response_info_) + return -1; + + return response_info_->headers->response_code(); +} + +void StreamURLRequestJob::SetExtraRequestHeaders( + const net::HttpRequestHeaders& headers) { + std::string range_header; + if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) { + std::vector<net::HttpByteRange> ranges; + if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) { + if (ranges.size() == 1) { + // Streams don't support seeking, so a non-zero starting position + // doesn't make sense. + if (ranges[0].first_byte_position() == 0) { + max_range_ = ranges[0].last_byte_position() + 1; + } else { + NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED); + return; + } + } else { + NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED); + return; + } + } + } +} + +void StreamURLRequestJob::DidStart() { + // We only support GET request. + if (request()->method() != "GET") { + NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED); + return; + } + + HeadersCompleted(net::HTTP_OK); +} + +void StreamURLRequestJob::NotifyFailure(int error_code) { + request_failed_ = true; + + // If we already return the headers on success, we can't change the headers + // now. Instead, we just error out. + if (headers_set_) { + NotifyDone(net::URLRequestStatus(net::URLRequestStatus::FAILED, + error_code)); + return; + } + + // TODO(zork): Share these with BlobURLRequestJob. + net::HttpStatusCode status_code = net::HTTP_INTERNAL_SERVER_ERROR; + switch (error_code) { + case net::ERR_ACCESS_DENIED: + status_code = net::HTTP_FORBIDDEN; + break; + case net::ERR_FILE_NOT_FOUND: + status_code = net::HTTP_NOT_FOUND; + break; + case net::ERR_METHOD_NOT_SUPPORTED: + status_code = net::HTTP_METHOD_NOT_ALLOWED; + break; + case net::ERR_FAILED: + break; + default: + DCHECK(false); + break; + } + HeadersCompleted(status_code); +} + +void StreamURLRequestJob::HeadersCompleted(net::HttpStatusCode status_code) { + std::string status("HTTP/1.1 "); + status.append(base::IntToString(status_code)); + status.append(" "); + status.append(net::GetHttpReasonPhrase(status_code)); + status.append("\0\0", 2); + net::HttpResponseHeaders* headers = new net::HttpResponseHeaders(status); + + if (status_code == net::HTTP_OK) { + std::string content_type_header(net::HttpRequestHeaders::kContentType); + content_type_header.append(": "); + content_type_header.append("plain/text"); + headers->AddHeader(content_type_header); + } + + response_info_.reset(new net::HttpResponseInfo()); + response_info_->headers = headers; + + headers_set_ = true; + + NotifyHeadersComplete(); +} + +void StreamURLRequestJob::ClearStream() { + if (stream_.get()) { + stream_->RemoveReadObserver(this); + stream_ = NULL; + } +} + +} // namespace content diff --git a/chromium/content/browser/streams/stream_url_request_job.h b/chromium/content/browser/streams/stream_url_request_job.h new file mode 100644 index 00000000000..03187bf2399 --- /dev/null +++ b/chromium/content/browser/streams/stream_url_request_job.h @@ -0,0 +1,66 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_ + +#include "net/http/http_status_code.h" +#include "net/url_request/url_request_job.h" +#include "content/browser/streams/stream_read_observer.h" +#include "content/common/content_export.h" + +namespace content { + +class Stream; + +// A request job that handles reading stream URLs. +class CONTENT_EXPORT StreamURLRequestJob + : public net::URLRequestJob, + public StreamReadObserver { + public: + StreamURLRequestJob(net::URLRequest* request, + net::NetworkDelegate* network_delegate, + scoped_refptr<Stream> stream); + + // StreamObserver methods. + virtual void OnDataAvailable(Stream* stream) OVERRIDE; + + // net::URLRequestJob methods. + virtual void Start() OVERRIDE; + virtual void Kill() OVERRIDE; + virtual bool ReadRawData(net::IOBuffer* buf, + int buf_size, + int* bytes_read) OVERRIDE; + virtual bool GetMimeType(std::string* mime_type) const OVERRIDE; + virtual void GetResponseInfo(net::HttpResponseInfo* info) OVERRIDE; + virtual int GetResponseCode() const OVERRIDE; + virtual void SetExtraRequestHeaders( + const net::HttpRequestHeaders& headers) OVERRIDE; + + protected: + virtual ~StreamURLRequestJob(); + + private: + void DidStart(); + void NotifyFailure(int); + void HeadersCompleted(net::HttpStatusCode status_code); + void ClearStream(); + + base::WeakPtrFactory<StreamURLRequestJob> weak_factory_; + scoped_refptr<content::Stream> stream_; + bool headers_set_; + scoped_refptr<net::IOBuffer> pending_buffer_; + int pending_buffer_size_; + scoped_ptr<net::HttpResponseInfo> response_info_; + + int total_bytes_read_; + int max_range_; + bool request_failed_; + + DISALLOW_COPY_AND_ASSIGN(StreamURLRequestJob); +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_ diff --git a/chromium/content/browser/streams/stream_url_request_job_unittest.cc b/chromium/content/browser/streams/stream_url_request_job_unittest.cc new file mode 100644 index 00000000000..4bb1798b514 --- /dev/null +++ b/chromium/content/browser/streams/stream_url_request_job_unittest.cc @@ -0,0 +1,173 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop/message_loop.h" +#include "base/test/test_simple_task_runner.h" +#include "content/browser/streams/stream.h" +#include "content/browser/streams/stream_registry.h" +#include "content/browser/streams/stream_url_request_job.h" +#include "content/browser/streams/stream_write_observer.h" +#include "net/http/http_response_headers.h" +#include "net/url_request/url_request.h" +#include "net/url_request/url_request_context.h" +#include "net/url_request/url_request_job_factory_impl.h" +#include "net/url_request/url_request_test_util.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace content { + +namespace { + +const int kBufferSize = 1024; +const char kTestData1[] = "Hello"; +const char kTestData2[] = "Here it is data."; + +const GURL kStreamURL("blob://stream"); + +} // namespace + +class StreamURLRequestJobTest : public testing::Test { + public: + // A simple ProtocolHandler implementation to create StreamURLRequestJob. + class MockProtocolHandler : + public net::URLRequestJobFactory::ProtocolHandler { + public: + MockProtocolHandler(StreamRegistry* registry) : registry_(registry) {} + + // net::URLRequestJobFactory::ProtocolHandler override. + virtual net::URLRequestJob* MaybeCreateJob( + net::URLRequest* request, + net::NetworkDelegate* network_delegate) const OVERRIDE { + scoped_refptr<Stream> stream = registry_->GetStream(request->url()); + if (stream.get()) + return new StreamURLRequestJob(request, network_delegate, stream); + return NULL; + } + + private: + StreamRegistry* registry_; + }; + + StreamURLRequestJobTest() : message_loop_(base::MessageLoop::TYPE_IO) {} + + virtual void SetUp() { + registry_.reset(new StreamRegistry()); + + url_request_job_factory_.SetProtocolHandler( + "blob", new MockProtocolHandler(registry_.get())); + url_request_context_.set_job_factory(&url_request_job_factory_); + } + + virtual void TearDown() { + } + + void TestSuccessRequest(const GURL& url, + const std::string& expected_response) { + TestRequest("GET", url, net::HttpRequestHeaders(), 200, expected_response); + } + + void TestRequest(const std::string& method, + const GURL& url, + const net::HttpRequestHeaders& extra_headers, + int expected_status_code, + const std::string& expected_response) { + net::TestDelegate delegate; + request_.reset(url_request_context_.CreateRequest(url, &delegate)); + request_->set_method(method); + if (!extra_headers.IsEmpty()) + request_->SetExtraRequestHeaders(extra_headers); + request_->Start(); + + base::MessageLoop::current()->RunUntilIdle(); + + // Verify response. + EXPECT_TRUE(request_->status().is_success()); + EXPECT_EQ(expected_status_code, + request_->response_headers()->response_code()); + EXPECT_EQ(expected_response, delegate.data_received()); + } + + protected: + base::MessageLoop message_loop_; + scoped_ptr<StreamRegistry> registry_; + + net::URLRequestContext url_request_context_; + net::URLRequestJobFactoryImpl url_request_job_factory_; + scoped_ptr<net::URLRequest> request_; +}; + +TEST_F(StreamURLRequestJobTest, TestGetSimpleDataRequest) { + scoped_refptr<Stream> stream( + new Stream(registry_.get(), NULL, kStreamURL)); + + scoped_refptr<net::StringIOBuffer> buffer( + new net::StringIOBuffer(kTestData1)); + + stream->AddData(buffer, buffer->size()); + stream->Finalize(); + + TestSuccessRequest(kStreamURL, kTestData1); +} + +TEST_F(StreamURLRequestJobTest, TestGetLargeStreamRequest) { + scoped_refptr<Stream> stream( + new Stream(registry_.get(), NULL, kStreamURL)); + + std::string large_data; + large_data.reserve(kBufferSize * 5); + for (int i = 0; i < kBufferSize * 5; ++i) + large_data.append(1, static_cast<char>(i % 256)); + + scoped_refptr<net::StringIOBuffer> buffer( + new net::StringIOBuffer(large_data)); + + stream->AddData(buffer, buffer->size()); + stream->Finalize(); + TestSuccessRequest(kStreamURL, large_data); +} + +TEST_F(StreamURLRequestJobTest, TestGetNonExistentStreamRequest) { + net::TestDelegate delegate; + request_.reset(url_request_context_.CreateRequest(kStreamURL, &delegate)); + request_->set_method("GET"); + request_->Start(); + + base::MessageLoop::current()->RunUntilIdle(); + + // Verify response. + EXPECT_FALSE(request_->status().is_success()); +} + +TEST_F(StreamURLRequestJobTest, TestRangeDataRequest) { + scoped_refptr<Stream> stream( + new Stream(registry_.get(), NULL, kStreamURL)); + + scoped_refptr<net::StringIOBuffer> buffer( + new net::StringIOBuffer(kTestData2)); + + stream->AddData(buffer, buffer->size()); + stream->Finalize(); + + net::HttpRequestHeaders extra_headers; + extra_headers.SetHeader(net::HttpRequestHeaders::kRange, "bytes=0-3"); + TestRequest("GET", kStreamURL, extra_headers, + 200, std::string(kTestData2, 4)); +} + +TEST_F(StreamURLRequestJobTest, TestInvalidRangeDataRequest) { + scoped_refptr<Stream> stream( + new Stream(registry_.get(), NULL, kStreamURL)); + + scoped_refptr<net::StringIOBuffer> buffer( + new net::StringIOBuffer(kTestData2)); + + stream->AddData(buffer, buffer->size()); + stream->Finalize(); + + net::HttpRequestHeaders extra_headers; + extra_headers.SetHeader(net::HttpRequestHeaders::kRange, "bytes=1-3"); + TestRequest("GET", kStreamURL, extra_headers, 405, std::string()); +} + +} // namespace content diff --git a/chromium/content/browser/streams/stream_write_observer.h b/chromium/content/browser/streams/stream_write_observer.h new file mode 100644 index 00000000000..deab7ad009e --- /dev/null +++ b/chromium/content/browser/streams/stream_write_observer.h @@ -0,0 +1,27 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_ + +namespace content { + +class Stream; + +class StreamWriteObserver { + public: + // Sent when space becomes available in the stream, and the source should + // resume writing. + virtual void OnSpaceAvailable(Stream* stream) = 0; + + // Sent when the stream is closed, and the writer should stop sending data. + virtual void OnClose(Stream* stream) = 0; + + protected: + virtual ~StreamWriteObserver() {} +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_ |