summaryrefslogtreecommitdiff
path: root/chromium/content/browser/streams
diff options
context:
space:
mode:
authorZeno Albisser <zeno.albisser@digia.com>2013-08-15 21:46:11 +0200
committerZeno Albisser <zeno.albisser@digia.com>2013-08-15 21:46:11 +0200
commit679147eead574d186ebf3069647b4c23e8ccace6 (patch)
treefc247a0ac8ff119f7c8550879ebb6d3dd8d1ff69 /chromium/content/browser/streams
downloadqtwebengine-chromium-679147eead574d186ebf3069647b4c23e8ccace6.tar.gz
Initial import.
Diffstat (limited to 'chromium/content/browser/streams')
-rw-r--r--chromium/content/browser/streams/OWNERS1
-rw-r--r--chromium/content/browser/streams/stream.cc156
-rw-r--r--chromium/content/browser/streams/stream.h115
-rw-r--r--chromium/content/browser/streams/stream_context.cc57
-rw-r--r--chromium/content/browser/streams/stream_context.h60
-rw-r--r--chromium/content/browser/streams/stream_handle_impl.cc38
-rw-r--r--chromium/content/browser/streams/stream_handle_impl.h40
-rw-r--r--chromium/content/browser/streams/stream_read_observer.h26
-rw-r--r--chromium/content/browser/streams/stream_registry.cc48
-rw-r--r--chromium/content/browser/streams/stream_registry.h51
-rw-r--r--chromium/content/browser/streams/stream_unittest.cc253
-rw-r--r--chromium/content/browser/streams/stream_url_request_job.cc238
-rw-r--r--chromium/content/browser/streams/stream_url_request_job.h66
-rw-r--r--chromium/content/browser/streams/stream_url_request_job_unittest.cc173
-rw-r--r--chromium/content/browser/streams/stream_write_observer.h27
15 files changed, 1349 insertions, 0 deletions
diff --git a/chromium/content/browser/streams/OWNERS b/chromium/content/browser/streams/OWNERS
new file mode 100644
index 00000000000..8192dfb0eff
--- /dev/null
+++ b/chromium/content/browser/streams/OWNERS
@@ -0,0 +1 @@
+zork@chromium.org
diff --git a/chromium/content/browser/streams/stream.cc b/chromium/content/browser/streams/stream.cc
new file mode 100644
index 00000000000..6026df9e7c7
--- /dev/null
+++ b/chromium/content/browser/streams/stream.cc
@@ -0,0 +1,156 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream.h"
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "content/browser/streams/stream_handle_impl.h"
+#include "content/browser/streams/stream_read_observer.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/browser/streams/stream_write_observer.h"
+#include "net/base/io_buffer.h"
+
+namespace {
+// Start throttling the connection at about 1MB.
+const size_t kDeferSizeThreshold = 40 * 32768;
+}
+
+namespace content {
+
+Stream::Stream(StreamRegistry* registry,
+ StreamWriteObserver* write_observer,
+ const GURL& url)
+ : data_bytes_read_(0),
+ can_add_data_(true),
+ url_(url),
+ data_length_(0),
+ registry_(registry),
+ read_observer_(NULL),
+ write_observer_(write_observer),
+ stream_handle_(NULL),
+ weak_ptr_factory_(this) {
+ CreateByteStream(base::MessageLoopProxy::current(),
+ base::MessageLoopProxy::current(),
+ kDeferSizeThreshold,
+ &writer_,
+ &reader_);
+
+ // Setup callback for writing.
+ writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
+ weak_ptr_factory_.GetWeakPtr()));
+ reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ registry_->RegisterStream(this);
+}
+
+Stream::~Stream() {
+}
+
+bool Stream::SetReadObserver(StreamReadObserver* observer) {
+ if (read_observer_)
+ return false;
+ read_observer_ = observer;
+ return true;
+}
+
+void Stream::RemoveReadObserver(StreamReadObserver* observer) {
+ DCHECK(observer == read_observer_);
+ read_observer_ = NULL;
+}
+
+void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
+ DCHECK(observer == write_observer_);
+ write_observer_ = NULL;
+}
+
+void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
+ can_add_data_ = writer_->Write(buffer, size);
+}
+
+void Stream::AddData(const char* data, size_t size) {
+ scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
+ memcpy(io_buffer->data(), data, size);
+ can_add_data_ = writer_->Write(io_buffer, size);
+}
+
+void Stream::Finalize() {
+ writer_->Close(0);
+ writer_.reset(NULL);
+
+ // Continue asynchronously.
+ base::MessageLoopProxy::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
+}
+
+Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
+ int buf_size,
+ int* bytes_read) {
+ DCHECK(buf);
+ DCHECK(bytes_read);
+
+ *bytes_read = 0;
+ if (!data_.get()) {
+ data_length_ = 0;
+ data_bytes_read_ = 0;
+ ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
+ switch (state) {
+ case ByteStreamReader::STREAM_HAS_DATA:
+ break;
+ case ByteStreamReader::STREAM_COMPLETE:
+ registry_->UnregisterStream(url());
+ return STREAM_COMPLETE;
+ case ByteStreamReader::STREAM_EMPTY:
+ return STREAM_EMPTY;
+ }
+ }
+
+ const size_t remaining_bytes = data_length_ - data_bytes_read_;
+ size_t to_read =
+ static_cast<size_t>(buf_size) < remaining_bytes ?
+ buf_size : remaining_bytes;
+ memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
+ data_bytes_read_ += to_read;
+ if (data_bytes_read_ >= data_length_)
+ data_ = NULL;
+
+ *bytes_read = to_read;
+ return STREAM_HAS_DATA;
+}
+
+scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url,
+ const std::string& mime_type) {
+ CHECK(!stream_handle_);
+ stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
+ original_url,
+ mime_type);
+ return scoped_ptr<StreamHandle>(stream_handle_).Pass();
+}
+
+void Stream::CloseHandle() {
+ // Prevent deletion until this function ends.
+ scoped_refptr<Stream> ref(this);
+
+ CHECK(stream_handle_);
+ stream_handle_ = NULL;
+ registry_->UnregisterStream(url());
+ if (write_observer_)
+ write_observer_->OnClose(this);
+}
+
+void Stream::OnSpaceAvailable() {
+ can_add_data_ = true;
+ if (write_observer_)
+ write_observer_->OnSpaceAvailable(this);
+}
+
+void Stream::OnDataAvailable() {
+ if (read_observer_)
+ read_observer_->OnDataAvailable(this);
+}
+
+} // namespace content
diff --git a/chromium/content/browser/streams/stream.h b/chromium/content/browser/streams/stream.h
new file mode 100644
index 00000000000..85edc884081
--- /dev/null
+++ b/chromium/content/browser/streams/stream.h
@@ -0,0 +1,115 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_H_
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/weak_ptr.h"
+#include "content/browser/byte_stream.h"
+#include "content/common/content_export.h"
+#include "url/gurl.h"
+
+namespace net {
+class IOBuffer;
+}
+
+namespace content {
+
+class StreamHandle;
+class StreamHandleImpl;
+class StreamReadObserver;
+class StreamRegistry;
+class StreamWriteObserver;
+
+// A stream that sends data from an arbitrary source to an internal URL
+// that can be read by an internal consumer. It will continue to pull from the
+// original URL as long as there is data available. It can be read from
+// multiple clients, but only one can be reading at a time. This allows a
+// reader to consume part of the stream, then pass it along to another client
+// to continue processing the stream.
+class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
+ public:
+ enum StreamState {
+ STREAM_HAS_DATA,
+ STREAM_COMPLETE,
+ STREAM_EMPTY,
+ };
+
+ // Creates a stream.
+ //
+ // Security origin of Streams is checked in Blink (See BlobRegistry,
+ // BlobURL and SecurityOrigin to understand how it works). There's no security
+ // origin check in Chromium side for now.
+ Stream(StreamRegistry* registry,
+ StreamWriteObserver* write_observer,
+ const GURL& url);
+
+ // Sets the reader of this stream. Returns true on success, or false if there
+ // is already a reader.
+ bool SetReadObserver(StreamReadObserver* observer);
+
+ // Removes the read observer. |observer| must be the current observer.
+ void RemoveReadObserver(StreamReadObserver* observer);
+
+ // Removes the write observer. |observer| must be the current observer.
+ void RemoveWriteObserver(StreamWriteObserver* observer);
+
+ // Adds the data in |buffer| to the stream. Takes ownership of |buffer|.
+ void AddData(scoped_refptr<net::IOBuffer> buffer, size_t size);
+ // Adds data of |size| at |data| to the stream. This method creates a copy
+ // of the data, and then passes it to |writer_|.
+ void AddData(const char* data, size_t size);
+
+ // Notifies this stream that it will not be receiving any more data.
+ void Finalize();
+
+ // Reads a maximum of |buf_size| from the stream into |buf|. Sets
+ // |*bytes_read| to the number of bytes actually read.
+ // Returns STREAM_HAS_DATA if data was read, STREAM_EMPTY if no data was read,
+ // and STREAM_COMPLETE if the stream is finalized and all data has been read.
+ StreamState ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read);
+
+ scoped_ptr<StreamHandle> CreateHandle(const GURL& original_url,
+ const std::string& mime_type);
+ void CloseHandle();
+
+ // Indicates whether there is space in the buffer to add more data.
+ bool can_add_data() const { return can_add_data_; }
+
+ const GURL& url() const { return url_; }
+
+ private:
+ friend class base::RefCountedThreadSafe<Stream>;
+
+ virtual ~Stream();
+
+ void OnSpaceAvailable();
+ void OnDataAvailable();
+
+ size_t data_bytes_read_;
+ bool can_add_data_;
+
+ GURL url_;
+
+ scoped_refptr<net::IOBuffer> data_;
+ size_t data_length_;
+
+ scoped_ptr<ByteStreamWriter> writer_;
+ scoped_ptr<ByteStreamReader> reader_;
+
+ StreamRegistry* registry_;
+ StreamReadObserver* read_observer_;
+ StreamWriteObserver* write_observer_;
+
+ StreamHandleImpl* stream_handle_;
+
+ base::WeakPtrFactory<Stream> weak_ptr_factory_;
+ DISALLOW_COPY_AND_ASSIGN(Stream);
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_H_
diff --git a/chromium/content/browser/streams/stream_context.cc b/chromium/content/browser/streams/stream_context.cc
new file mode 100644
index 00000000000..44ca0f4a1cd
--- /dev/null
+++ b/chromium/content/browser/streams/stream_context.cc
@@ -0,0 +1,57 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream_context.h"
+
+#include "base/bind.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/public/browser/browser_context.h"
+
+using base::UserDataAdapter;
+
+namespace {
+const char* kStreamContextKeyName = "content_stream_context";
+}
+
+namespace content {
+
+StreamContext::StreamContext() {}
+
+StreamContext* StreamContext::GetFor(BrowserContext* context) {
+ if (!context->GetUserData(kStreamContextKeyName)) {
+ scoped_refptr<StreamContext> stream = new StreamContext();
+ context->SetUserData(kStreamContextKeyName,
+ new UserDataAdapter<StreamContext>(stream.get()));
+ // Check first to avoid memory leak in unittests.
+ if (BrowserThread::IsMessageLoopValid(BrowserThread::IO)) {
+ BrowserThread::PostTask(
+ BrowserThread::IO, FROM_HERE,
+ base::Bind(&StreamContext::InitializeOnIOThread, stream));
+ }
+ }
+
+ return UserDataAdapter<StreamContext>::Get(context, kStreamContextKeyName);
+}
+
+void StreamContext::InitializeOnIOThread() {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
+ registry_.reset(new StreamRegistry());
+}
+
+StreamContext::~StreamContext() {}
+
+void StreamContext::DeleteOnCorrectThread() const {
+ // In many tests, there isn't a valid IO thread. In that case, just delete on
+ // the current thread.
+ // TODO(zork): Remove this custom deleter, and fix the leaks in all the
+ // tests.
+ if (BrowserThread::IsMessageLoopValid(BrowserThread::IO) &&
+ !BrowserThread::CurrentlyOn(BrowserThread::IO)) {
+ BrowserThread::DeleteSoon(BrowserThread::IO, FROM_HERE, this);
+ return;
+ }
+ delete this;
+}
+
+} // namespace content
diff --git a/chromium/content/browser/streams/stream_context.h b/chromium/content/browser/streams/stream_context.h
new file mode 100644
index 00000000000..fb6cdc720ff
--- /dev/null
+++ b/chromium/content/browser/streams/stream_context.h
@@ -0,0 +1,60 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/sequenced_task_runner_helpers.h"
+#include "content/common/content_export.h"
+#include "content/public/browser/browser_thread.h"
+
+namespace content {
+class BrowserContext;
+class StreamRegistry;
+struct StreamContextDeleter;
+
+// A context class that keeps track of StreamRegistry used by the chrome.
+// There is an instance associated with each BrowserContext. There could be
+// multiple URLRequestContexts in the same browser context that refers to the
+// same instance.
+//
+// All methods, except the ctor, are expected to be called on
+// the IO thread (unless specifically called out in doc comments).
+class StreamContext
+ : public base::RefCountedThreadSafe<StreamContext,
+ StreamContextDeleter> {
+ public:
+ StreamContext();
+
+ CONTENT_EXPORT static StreamContext* GetFor(BrowserContext* browser_context);
+
+ void InitializeOnIOThread();
+
+ StreamRegistry* registry() const { return registry_.get(); }
+
+ protected:
+ virtual ~StreamContext();
+
+ private:
+ friend class base::DeleteHelper<StreamContext>;
+ friend class base::RefCountedThreadSafe<StreamContext,
+ StreamContextDeleter>;
+ friend struct StreamContextDeleter;
+
+ void DeleteOnCorrectThread() const;
+
+ scoped_ptr<StreamRegistry> registry_;
+};
+
+struct StreamContextDeleter {
+ static void Destruct(const StreamContext* context) {
+ context->DeleteOnCorrectThread();
+ }
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_
diff --git a/chromium/content/browser/streams/stream_handle_impl.cc b/chromium/content/browser/streams/stream_handle_impl.cc
new file mode 100644
index 00000000000..d9f877cbc76
--- /dev/null
+++ b/chromium/content/browser/streams/stream_handle_impl.cc
@@ -0,0 +1,38 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream_handle_impl.h"
+
+#include "content/browser/streams/stream.h"
+#include "content/public/browser/browser_thread.h"
+
+namespace content {
+
+StreamHandleImpl::StreamHandleImpl(const base::WeakPtr<Stream>& stream,
+ const GURL& original_url,
+ const std::string& mime_type)
+ : stream_(stream),
+ url_(stream->url()),
+ original_url_(original_url),
+ mime_type_(mime_type),
+ stream_message_loop_(base::MessageLoopProxy::current().get()) {}
+
+StreamHandleImpl::~StreamHandleImpl() {
+ stream_message_loop_->PostTask(FROM_HERE,
+ base::Bind(&Stream::CloseHandle, stream_));
+}
+
+const GURL& StreamHandleImpl::GetURL() {
+ return url_;
+}
+
+const GURL& StreamHandleImpl::GetOriginalURL() {
+ return original_url_;
+}
+
+const std::string& StreamHandleImpl::GetMimeType() {
+ return mime_type_;
+}
+
+} // namespace content
diff --git a/chromium/content/browser/streams/stream_handle_impl.h b/chromium/content/browser/streams/stream_handle_impl.h
new file mode 100644
index 00000000000..aabd0cb7dfe
--- /dev/null
+++ b/chromium/content/browser/streams/stream_handle_impl.h
@@ -0,0 +1,40 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_HANDLE_IMPL_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_HANDLE_IMPL_H_
+
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "base/synchronization/lock.h"
+#include "content/public/browser/stream_handle.h"
+
+namespace content {
+
+class Stream;
+
+class StreamHandleImpl : public StreamHandle {
+ public:
+ StreamHandleImpl(const base::WeakPtr<Stream>& stream,
+ const GURL& original_url,
+ const std::string& mime_type);
+ virtual ~StreamHandleImpl();
+
+ private:
+ // StreamHandle overrides
+ virtual const GURL& GetURL() OVERRIDE;
+ virtual const GURL& GetOriginalURL() OVERRIDE;
+ virtual const std::string& GetMimeType() OVERRIDE;
+
+ base::WeakPtr<Stream> stream_;
+ GURL url_;
+ GURL original_url_;
+ std::string mime_type_;
+ base::MessageLoopProxy* stream_message_loop_;
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_HANDLE_IMPL_H_
+
diff --git a/chromium/content/browser/streams/stream_read_observer.h b/chromium/content/browser/streams/stream_read_observer.h
new file mode 100644
index 00000000000..08fd657ac01
--- /dev/null
+++ b/chromium/content/browser/streams/stream_read_observer.h
@@ -0,0 +1,26 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
+
+#include "content/common/content_export.h"
+
+namespace content {
+
+class Stream;
+
+class CONTENT_EXPORT StreamReadObserver {
+ public:
+ // Sent when there is data available to be read from the stream.
+ virtual void OnDataAvailable(Stream* stream) = 0;
+
+ protected:
+ virtual ~StreamReadObserver() {}
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
+
diff --git a/chromium/content/browser/streams/stream_registry.cc b/chromium/content/browser/streams/stream_registry.cc
new file mode 100644
index 00000000000..39d24b39f5f
--- /dev/null
+++ b/chromium/content/browser/streams/stream_registry.cc
@@ -0,0 +1,48 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream_registry.h"
+
+#include "content/browser/streams/stream.h"
+
+namespace content {
+
+StreamRegistry::StreamRegistry() {
+}
+
+StreamRegistry::~StreamRegistry() {
+}
+
+void StreamRegistry::RegisterStream(scoped_refptr<Stream> stream) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(stream.get());
+ DCHECK(!stream->url().is_empty());
+ streams_[stream->url()] = stream;
+}
+
+scoped_refptr<Stream> StreamRegistry::GetStream(const GURL& url) {
+ DCHECK(CalledOnValidThread());
+ StreamMap::const_iterator stream = streams_.find(url);
+ if (stream != streams_.end())
+ return stream->second;
+
+ return NULL;
+}
+
+bool StreamRegistry::CloneStream(const GURL& url, const GURL& src_url) {
+ DCHECK(CalledOnValidThread());
+ scoped_refptr<Stream> stream(GetStream(src_url));
+ if (stream.get()) {
+ streams_[url] = stream;
+ return true;
+ }
+ return false;
+}
+
+void StreamRegistry::UnregisterStream(const GURL& url) {
+ DCHECK(CalledOnValidThread());
+ streams_.erase(url);
+}
+
+} // namespace content
diff --git a/chromium/content/browser/streams/stream_registry.h b/chromium/content/browser/streams/stream_registry.h
new file mode 100644
index 00000000000..e75c97c1699
--- /dev/null
+++ b/chromium/content/browser/streams/stream_registry.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
+
+#include <map>
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/threading/non_thread_safe.h"
+#include "content/common/content_export.h"
+#include "url/gurl.h"
+
+namespace content {
+
+class Stream;
+
+// Maintains a mapping of blob: URLs to active streams.
+class CONTENT_EXPORT StreamRegistry : public base::NonThreadSafe {
+ public:
+ StreamRegistry();
+ virtual ~StreamRegistry();
+
+ // Registers a stream, and sets its URL.
+ void RegisterStream(scoped_refptr<Stream> stream);
+
+ // Clones a stream. Returns true on success, or false if |src_url| doesn't
+ // exist.
+ bool CloneStream(const GURL& url, const GURL& src_url);
+
+ void UnregisterStream(const GURL& url);
+
+ // Gets the stream associated with |url|. Returns NULL if there is no such
+ // stream.
+ scoped_refptr<Stream> GetStream(const GURL& url);
+
+ private:
+ typedef std::map<GURL, scoped_refptr<Stream> > StreamMap;
+
+ StreamMap streams_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamRegistry);
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
+
+
diff --git a/chromium/content/browser/streams/stream_unittest.cc b/chromium/content/browser/streams/stream_unittest.cc
new file mode 100644
index 00000000000..c0077b7375b
--- /dev/null
+++ b/chromium/content/browser/streams/stream_unittest.cc
@@ -0,0 +1,253 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_loop/message_loop.h"
+#include "base/test/test_simple_task_runner.h"
+#include "content/browser/streams/stream.h"
+#include "content/browser/streams/stream_read_observer.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/browser/streams/stream_write_observer.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace content {
+
+class StreamTest : public testing::Test {
+ public:
+ StreamTest() : producing_seed_key_(0) {}
+
+ virtual void SetUp() OVERRIDE {
+ registry_.reset(new StreamRegistry());
+ }
+
+ // Create a new IO buffer of the given |buffer_size| and fill it with random
+ // data.
+ scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) {
+ scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size));
+ char *bufferp = buffer->data();
+ for (size_t i = 0; i < buffer_size; i++)
+ bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char));
+ ++producing_seed_key_;
+ return buffer;
+ }
+
+ protected:
+ base::MessageLoop message_loop_;
+ scoped_ptr<StreamRegistry> registry_;
+
+ private:
+ int producing_seed_key_;
+};
+
+class TestStreamReader : public StreamReadObserver {
+ public:
+ TestStreamReader() : buffer_(new net::GrowableIOBuffer()), completed_(false) {
+ }
+ virtual ~TestStreamReader() {}
+
+ void Read(Stream* stream) {
+ const size_t kBufferSize = 32768;
+ scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize));
+
+ int bytes_read = 0;
+ while (true) {
+ Stream::StreamState state =
+ stream->ReadRawData(buffer.get(), kBufferSize, &bytes_read);
+ switch (state) {
+ case Stream::STREAM_HAS_DATA:
+ // TODO(tyoshino): Move these expectations to the beginning of Read()
+ // method once Stream::Finalize() is fixed.
+ EXPECT_FALSE(completed_);
+ break;
+ case Stream::STREAM_COMPLETE:
+ completed_ = true;
+ return;
+ case Stream::STREAM_EMPTY:
+ EXPECT_FALSE(completed_);
+ return;
+ }
+ size_t old_capacity = buffer_->capacity();
+ buffer_->SetCapacity(old_capacity + bytes_read);
+ memcpy(buffer_->StartOfBuffer() + old_capacity,
+ buffer->data(), bytes_read);
+ }
+ }
+
+ virtual void OnDataAvailable(Stream* stream) OVERRIDE {
+ Read(stream);
+ }
+
+ scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; }
+
+ bool completed() const {
+ return completed_;
+ }
+
+ private:
+ scoped_refptr<net::GrowableIOBuffer> buffer_;
+ bool completed_;
+};
+
+class TestStreamWriter : public StreamWriteObserver {
+ public:
+ TestStreamWriter() {}
+ virtual ~TestStreamWriter() {}
+
+ void Write(Stream* stream,
+ scoped_refptr<net::IOBuffer> buffer,
+ size_t buffer_size) {
+ stream->AddData(buffer, buffer_size);
+ }
+
+ virtual void OnSpaceAvailable(Stream* stream) OVERRIDE {
+ }
+
+ virtual void OnClose(Stream* stream) OVERRIDE {
+ }
+};
+
+TEST_F(StreamTest, SetReadObserver) {
+ TestStreamReader reader;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+}
+
+TEST_F(StreamTest, SetReadObserver_SecondFails) {
+ TestStreamReader reader1;
+ TestStreamReader reader2;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader1));
+ EXPECT_FALSE(stream->SetReadObserver(&reader2));
+}
+
+TEST_F(StreamTest, SetReadObserver_TwoReaders) {
+ TestStreamReader reader1;
+ TestStreamReader reader2;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader1));
+
+ // Once the first read observer is removed, a new one can be added.
+ stream->RemoveReadObserver(&reader1);
+ EXPECT_TRUE(stream->SetReadObserver(&reader2));
+}
+
+TEST_F(StreamTest, Stream) {
+ TestStreamReader reader;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+
+ const int kBufferSize = 1000000;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ writer.Write(stream.get(), buffer, kBufferSize);
+ stream->Finalize();
+ base::MessageLoop::current()->RunUntilIdle();
+ EXPECT_TRUE(reader.completed());
+
+ ASSERT_EQ(reader.buffer()->capacity(), kBufferSize);
+ for (int i = 0; i < kBufferSize; i++)
+ EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]);
+}
+
+// Test that even if a reader receives an empty buffer, once TransferData()
+// method is called on it with |source_complete| = true, following Read() calls
+// on it never returns STREAM_EMPTY. Together with StreamTest.Stream above, this
+// guarantees that Reader::Read() call returns only STREAM_HAS_DATA
+// or STREAM_COMPLETE in |data_available_callback_| call corresponding to
+// Writer::Close().
+TEST_F(StreamTest, ClosedReaderDoesNotReturnStreamEmpty) {
+ TestStreamReader reader;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+
+ const int kBufferSize = 0;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ stream->AddData(buffer, kBufferSize);
+ stream->Finalize();
+ base::MessageLoop::current()->RunUntilIdle();
+ EXPECT_TRUE(reader.completed());
+ EXPECT_EQ(0, reader.buffer()->capacity());
+}
+
+TEST_F(StreamTest, GetStream) {
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, url));
+
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url);
+ ASSERT_EQ(stream1, stream2);
+}
+
+TEST_F(StreamTest, GetStream_Missing) {
+ TestStreamWriter writer;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, url1));
+
+ GURL url2("blob://stream2");
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
+ ASSERT_FALSE(stream2.get());
+}
+
+TEST_F(StreamTest, CloneStream) {
+ TestStreamWriter writer;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, url1));
+
+ GURL url2("blob://stream2");
+ ASSERT_TRUE(registry_->CloneStream(url2, url1));
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
+ ASSERT_EQ(stream1, stream2);
+}
+
+TEST_F(StreamTest, CloneStream_Missing) {
+ TestStreamWriter writer;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, url1));
+
+ GURL url2("blob://stream2");
+ GURL url3("blob://stream3");
+ ASSERT_FALSE(registry_->CloneStream(url2, url3));
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
+ ASSERT_FALSE(stream2.get());
+}
+
+TEST_F(StreamTest, UnregisterStream) {
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, url));
+
+ registry_->UnregisterStream(url);
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url);
+ ASSERT_FALSE(stream2.get());
+}
+
+} // namespace content
diff --git a/chromium/content/browser/streams/stream_url_request_job.cc b/chromium/content/browser/streams/stream_url_request_job.cc
new file mode 100644
index 00000000000..0965178de2c
--- /dev/null
+++ b/chromium/content/browser/streams/stream_url_request_job.cc
@@ -0,0 +1,238 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream_url_request_job.h"
+
+#include "base/strings/string_number_conversions.h"
+#include "content/browser/streams/stream.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/http/http_byte_range.h"
+#include "net/http/http_response_headers.h"
+#include "net/http/http_response_info.h"
+#include "net/http/http_util.h"
+#include "net/url_request/url_request.h"
+
+namespace content {
+
+StreamURLRequestJob::StreamURLRequestJob(
+ net::URLRequest* request,
+ net::NetworkDelegate* network_delegate,
+ scoped_refptr<Stream> stream)
+ : net::URLRequestJob(request, network_delegate),
+ weak_factory_(this),
+ stream_(stream),
+ headers_set_(false),
+ pending_buffer_size_(0),
+ total_bytes_read_(0),
+ max_range_(0),
+ request_failed_(false) {
+ DCHECK(stream_.get());
+ stream_->SetReadObserver(this);
+}
+
+StreamURLRequestJob::~StreamURLRequestJob() {
+ ClearStream();
+}
+
+void StreamURLRequestJob::OnDataAvailable(Stream* stream) {
+ // Clear the IO_PENDING status.
+ SetStatus(net::URLRequestStatus());
+ // Do nothing if pending_buffer_ is empty, i.e. there's no ReadRawData()
+ // operation waiting for IO completion.
+ if (!pending_buffer_.get())
+ return;
+
+ // pending_buffer_ is set to the IOBuffer instance provided to ReadRawData()
+ // by URLRequestJob.
+
+ int bytes_read;
+ switch (stream_->ReadRawData(
+ pending_buffer_.get(), pending_buffer_size_, &bytes_read)) {
+ case Stream::STREAM_HAS_DATA:
+ DCHECK_GT(bytes_read, 0);
+ break;
+ case Stream::STREAM_COMPLETE:
+ // Ensure this. Calling NotifyReadComplete call with 0 signals
+ // completion.
+ bytes_read = 0;
+ break;
+ case Stream::STREAM_EMPTY:
+ NOTREACHED();
+ break;
+ }
+
+ // Clear the buffers before notifying the read is complete, so that it is
+ // safe for the observer to read.
+ pending_buffer_ = NULL;
+ pending_buffer_size_ = 0;
+
+ total_bytes_read_ += bytes_read;
+ NotifyReadComplete(bytes_read);
+}
+
+// net::URLRequestJob methods.
+void StreamURLRequestJob::Start() {
+ // Continue asynchronously.
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&StreamURLRequestJob::DidStart, weak_factory_.GetWeakPtr()));
+}
+
+void StreamURLRequestJob::Kill() {
+ net::URLRequestJob::Kill();
+ weak_factory_.InvalidateWeakPtrs();
+ ClearStream();
+}
+
+bool StreamURLRequestJob::ReadRawData(net::IOBuffer* buf,
+ int buf_size,
+ int* bytes_read) {
+ if (request_failed_)
+ return true;
+
+ DCHECK(buf);
+ DCHECK(bytes_read);
+ int to_read = buf_size;
+ if (max_range_ && to_read) {
+ if (to_read + total_bytes_read_ > max_range_)
+ to_read = max_range_ - total_bytes_read_;
+
+ if (to_read <= 0) {
+ *bytes_read = 0;
+ return true;
+ }
+ }
+
+ switch (stream_->ReadRawData(buf, to_read, bytes_read)) {
+ case Stream::STREAM_HAS_DATA:
+ case Stream::STREAM_COMPLETE:
+ total_bytes_read_ += *bytes_read;
+ return true;
+ case Stream::STREAM_EMPTY:
+ pending_buffer_ = buf;
+ pending_buffer_size_ = to_read;
+ SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0));
+ return false;
+ }
+ NOTREACHED();
+ return false;
+}
+
+bool StreamURLRequestJob::GetMimeType(std::string* mime_type) const {
+ if (!response_info_)
+ return false;
+
+ // TODO(zork): Support registered MIME types if needed.
+ return response_info_->headers->GetMimeType(mime_type);
+}
+
+void StreamURLRequestJob::GetResponseInfo(net::HttpResponseInfo* info) {
+ if (response_info_)
+ *info = *response_info_;
+}
+
+int StreamURLRequestJob::GetResponseCode() const {
+ if (!response_info_)
+ return -1;
+
+ return response_info_->headers->response_code();
+}
+
+void StreamURLRequestJob::SetExtraRequestHeaders(
+ const net::HttpRequestHeaders& headers) {
+ std::string range_header;
+ if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) {
+ std::vector<net::HttpByteRange> ranges;
+ if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) {
+ if (ranges.size() == 1) {
+ // Streams don't support seeking, so a non-zero starting position
+ // doesn't make sense.
+ if (ranges[0].first_byte_position() == 0) {
+ max_range_ = ranges[0].last_byte_position() + 1;
+ } else {
+ NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED);
+ return;
+ }
+ } else {
+ NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED);
+ return;
+ }
+ }
+ }
+}
+
+void StreamURLRequestJob::DidStart() {
+ // We only support GET request.
+ if (request()->method() != "GET") {
+ NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED);
+ return;
+ }
+
+ HeadersCompleted(net::HTTP_OK);
+}
+
+void StreamURLRequestJob::NotifyFailure(int error_code) {
+ request_failed_ = true;
+
+ // If we already return the headers on success, we can't change the headers
+ // now. Instead, we just error out.
+ if (headers_set_) {
+ NotifyDone(net::URLRequestStatus(net::URLRequestStatus::FAILED,
+ error_code));
+ return;
+ }
+
+ // TODO(zork): Share these with BlobURLRequestJob.
+ net::HttpStatusCode status_code = net::HTTP_INTERNAL_SERVER_ERROR;
+ switch (error_code) {
+ case net::ERR_ACCESS_DENIED:
+ status_code = net::HTTP_FORBIDDEN;
+ break;
+ case net::ERR_FILE_NOT_FOUND:
+ status_code = net::HTTP_NOT_FOUND;
+ break;
+ case net::ERR_METHOD_NOT_SUPPORTED:
+ status_code = net::HTTP_METHOD_NOT_ALLOWED;
+ break;
+ case net::ERR_FAILED:
+ break;
+ default:
+ DCHECK(false);
+ break;
+ }
+ HeadersCompleted(status_code);
+}
+
+void StreamURLRequestJob::HeadersCompleted(net::HttpStatusCode status_code) {
+ std::string status("HTTP/1.1 ");
+ status.append(base::IntToString(status_code));
+ status.append(" ");
+ status.append(net::GetHttpReasonPhrase(status_code));
+ status.append("\0\0", 2);
+ net::HttpResponseHeaders* headers = new net::HttpResponseHeaders(status);
+
+ if (status_code == net::HTTP_OK) {
+ std::string content_type_header(net::HttpRequestHeaders::kContentType);
+ content_type_header.append(": ");
+ content_type_header.append("plain/text");
+ headers->AddHeader(content_type_header);
+ }
+
+ response_info_.reset(new net::HttpResponseInfo());
+ response_info_->headers = headers;
+
+ headers_set_ = true;
+
+ NotifyHeadersComplete();
+}
+
+void StreamURLRequestJob::ClearStream() {
+ if (stream_.get()) {
+ stream_->RemoveReadObserver(this);
+ stream_ = NULL;
+ }
+}
+
+} // namespace content
diff --git a/chromium/content/browser/streams/stream_url_request_job.h b/chromium/content/browser/streams/stream_url_request_job.h
new file mode 100644
index 00000000000..03187bf2399
--- /dev/null
+++ b/chromium/content/browser/streams/stream_url_request_job.h
@@ -0,0 +1,66 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_
+
+#include "net/http/http_status_code.h"
+#include "net/url_request/url_request_job.h"
+#include "content/browser/streams/stream_read_observer.h"
+#include "content/common/content_export.h"
+
+namespace content {
+
+class Stream;
+
+// A request job that handles reading stream URLs.
+class CONTENT_EXPORT StreamURLRequestJob
+ : public net::URLRequestJob,
+ public StreamReadObserver {
+ public:
+ StreamURLRequestJob(net::URLRequest* request,
+ net::NetworkDelegate* network_delegate,
+ scoped_refptr<Stream> stream);
+
+ // StreamObserver methods.
+ virtual void OnDataAvailable(Stream* stream) OVERRIDE;
+
+ // net::URLRequestJob methods.
+ virtual void Start() OVERRIDE;
+ virtual void Kill() OVERRIDE;
+ virtual bool ReadRawData(net::IOBuffer* buf,
+ int buf_size,
+ int* bytes_read) OVERRIDE;
+ virtual bool GetMimeType(std::string* mime_type) const OVERRIDE;
+ virtual void GetResponseInfo(net::HttpResponseInfo* info) OVERRIDE;
+ virtual int GetResponseCode() const OVERRIDE;
+ virtual void SetExtraRequestHeaders(
+ const net::HttpRequestHeaders& headers) OVERRIDE;
+
+ protected:
+ virtual ~StreamURLRequestJob();
+
+ private:
+ void DidStart();
+ void NotifyFailure(int);
+ void HeadersCompleted(net::HttpStatusCode status_code);
+ void ClearStream();
+
+ base::WeakPtrFactory<StreamURLRequestJob> weak_factory_;
+ scoped_refptr<content::Stream> stream_;
+ bool headers_set_;
+ scoped_refptr<net::IOBuffer> pending_buffer_;
+ int pending_buffer_size_;
+ scoped_ptr<net::HttpResponseInfo> response_info_;
+
+ int total_bytes_read_;
+ int max_range_;
+ bool request_failed_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamURLRequestJob);
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_
diff --git a/chromium/content/browser/streams/stream_url_request_job_unittest.cc b/chromium/content/browser/streams/stream_url_request_job_unittest.cc
new file mode 100644
index 00000000000..4bb1798b514
--- /dev/null
+++ b/chromium/content/browser/streams/stream_url_request_job_unittest.cc
@@ -0,0 +1,173 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_loop/message_loop.h"
+#include "base/test/test_simple_task_runner.h"
+#include "content/browser/streams/stream.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/browser/streams/stream_url_request_job.h"
+#include "content/browser/streams/stream_write_observer.h"
+#include "net/http/http_response_headers.h"
+#include "net/url_request/url_request.h"
+#include "net/url_request/url_request_context.h"
+#include "net/url_request/url_request_job_factory_impl.h"
+#include "net/url_request/url_request_test_util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace content {
+
+namespace {
+
+const int kBufferSize = 1024;
+const char kTestData1[] = "Hello";
+const char kTestData2[] = "Here it is data.";
+
+const GURL kStreamURL("blob://stream");
+
+} // namespace
+
+class StreamURLRequestJobTest : public testing::Test {
+ public:
+ // A simple ProtocolHandler implementation to create StreamURLRequestJob.
+ class MockProtocolHandler :
+ public net::URLRequestJobFactory::ProtocolHandler {
+ public:
+ MockProtocolHandler(StreamRegistry* registry) : registry_(registry) {}
+
+ // net::URLRequestJobFactory::ProtocolHandler override.
+ virtual net::URLRequestJob* MaybeCreateJob(
+ net::URLRequest* request,
+ net::NetworkDelegate* network_delegate) const OVERRIDE {
+ scoped_refptr<Stream> stream = registry_->GetStream(request->url());
+ if (stream.get())
+ return new StreamURLRequestJob(request, network_delegate, stream);
+ return NULL;
+ }
+
+ private:
+ StreamRegistry* registry_;
+ };
+
+ StreamURLRequestJobTest() : message_loop_(base::MessageLoop::TYPE_IO) {}
+
+ virtual void SetUp() {
+ registry_.reset(new StreamRegistry());
+
+ url_request_job_factory_.SetProtocolHandler(
+ "blob", new MockProtocolHandler(registry_.get()));
+ url_request_context_.set_job_factory(&url_request_job_factory_);
+ }
+
+ virtual void TearDown() {
+ }
+
+ void TestSuccessRequest(const GURL& url,
+ const std::string& expected_response) {
+ TestRequest("GET", url, net::HttpRequestHeaders(), 200, expected_response);
+ }
+
+ void TestRequest(const std::string& method,
+ const GURL& url,
+ const net::HttpRequestHeaders& extra_headers,
+ int expected_status_code,
+ const std::string& expected_response) {
+ net::TestDelegate delegate;
+ request_.reset(url_request_context_.CreateRequest(url, &delegate));
+ request_->set_method(method);
+ if (!extra_headers.IsEmpty())
+ request_->SetExtraRequestHeaders(extra_headers);
+ request_->Start();
+
+ base::MessageLoop::current()->RunUntilIdle();
+
+ // Verify response.
+ EXPECT_TRUE(request_->status().is_success());
+ EXPECT_EQ(expected_status_code,
+ request_->response_headers()->response_code());
+ EXPECT_EQ(expected_response, delegate.data_received());
+ }
+
+ protected:
+ base::MessageLoop message_loop_;
+ scoped_ptr<StreamRegistry> registry_;
+
+ net::URLRequestContext url_request_context_;
+ net::URLRequestJobFactoryImpl url_request_job_factory_;
+ scoped_ptr<net::URLRequest> request_;
+};
+
+TEST_F(StreamURLRequestJobTest, TestGetSimpleDataRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, kStreamURL));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(kTestData1));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+
+ TestSuccessRequest(kStreamURL, kTestData1);
+}
+
+TEST_F(StreamURLRequestJobTest, TestGetLargeStreamRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, kStreamURL));
+
+ std::string large_data;
+ large_data.reserve(kBufferSize * 5);
+ for (int i = 0; i < kBufferSize * 5; ++i)
+ large_data.append(1, static_cast<char>(i % 256));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(large_data));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+ TestSuccessRequest(kStreamURL, large_data);
+}
+
+TEST_F(StreamURLRequestJobTest, TestGetNonExistentStreamRequest) {
+ net::TestDelegate delegate;
+ request_.reset(url_request_context_.CreateRequest(kStreamURL, &delegate));
+ request_->set_method("GET");
+ request_->Start();
+
+ base::MessageLoop::current()->RunUntilIdle();
+
+ // Verify response.
+ EXPECT_FALSE(request_->status().is_success());
+}
+
+TEST_F(StreamURLRequestJobTest, TestRangeDataRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, kStreamURL));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(kTestData2));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+
+ net::HttpRequestHeaders extra_headers;
+ extra_headers.SetHeader(net::HttpRequestHeaders::kRange, "bytes=0-3");
+ TestRequest("GET", kStreamURL, extra_headers,
+ 200, std::string(kTestData2, 4));
+}
+
+TEST_F(StreamURLRequestJobTest, TestInvalidRangeDataRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, kStreamURL));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(kTestData2));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+
+ net::HttpRequestHeaders extra_headers;
+ extra_headers.SetHeader(net::HttpRequestHeaders::kRange, "bytes=1-3");
+ TestRequest("GET", kStreamURL, extra_headers, 405, std::string());
+}
+
+} // namespace content
diff --git a/chromium/content/browser/streams/stream_write_observer.h b/chromium/content/browser/streams/stream_write_observer.h
new file mode 100644
index 00000000000..deab7ad009e
--- /dev/null
+++ b/chromium/content/browser/streams/stream_write_observer.h
@@ -0,0 +1,27 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_
+
+namespace content {
+
+class Stream;
+
+class StreamWriteObserver {
+ public:
+ // Sent when space becomes available in the stream, and the source should
+ // resume writing.
+ virtual void OnSpaceAvailable(Stream* stream) = 0;
+
+ // Sent when the stream is closed, and the writer should stop sending data.
+ virtual void OnClose(Stream* stream) = 0;
+
+ protected:
+ virtual ~StreamWriteObserver() {}
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_