summaryrefslogtreecommitdiff
path: root/chromium/content/browser/streams
diff options
context:
space:
mode:
authorAndras Becsi <andras.becsi@digia.com>2013-12-11 21:33:03 +0100
committerAndras Becsi <andras.becsi@digia.com>2013-12-13 12:34:07 +0100
commitf2a33ff9cbc6d19943f1c7fbddd1f23d23975577 (patch)
tree0586a32aa390ade8557dfd6b4897f43a07449578 /chromium/content/browser/streams
parent5362912cdb5eea702b68ebe23702468d17c3017a (diff)
downloadqtwebengine-chromium-f2a33ff9cbc6d19943f1c7fbddd1f23d23975577.tar.gz
Update Chromium to branch 1650 (31.0.1650.63)
Change-Id: I57d8c832eaec1eb2364e0a8e7352a6dd354db99f Reviewed-by: Jocelyn Turcotte <jocelyn.turcotte@digia.com>
Diffstat (limited to 'chromium/content/browser/streams')
-rw-r--r--chromium/content/browser/streams/stream.cc59
-rw-r--r--chromium/content/browser/streams/stream.h24
-rw-r--r--chromium/content/browser/streams/stream_registry.cc46
-rw-r--r--chromium/content/browser/streams/stream_registry.h21
-rw-r--r--chromium/content/browser/streams/stream_unittest.cc109
-rw-r--r--chromium/content/browser/streams/stream_url_request_job.cc54
-rw-r--r--chromium/content/browser/streams/stream_url_request_job_unittest.cc1
7 files changed, 286 insertions, 28 deletions
diff --git a/chromium/content/browser/streams/stream.cc b/chromium/content/browser/streams/stream.cc
index f5abe0276e8..5d20fe6ab64 100644
--- a/chromium/content/browser/streams/stream.cc
+++ b/chromium/content/browser/streams/stream.cc
@@ -23,10 +23,11 @@ namespace content {
Stream::Stream(StreamRegistry* registry,
StreamWriteObserver* write_observer,
const GURL& url)
- : data_bytes_read_(0),
- can_add_data_(true),
+ : can_add_data_(true),
url_(url),
data_length_(0),
+ data_bytes_read_(0),
+ last_total_buffered_bytes_(0),
registry_(registry),
read_observer_(NULL),
write_observer_(write_observer),
@@ -67,19 +68,50 @@ void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
write_observer_ = NULL;
}
+void Stream::Abort() {
+ // Clear all buffer. It's safe to clear reader_ here since the same thread
+ // is used for both input and output operation.
+ writer_.reset();
+ reader_.reset();
+ ClearBuffer();
+ can_add_data_ = false;
+ registry_->UnregisterStream(url());
+}
+
void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
+ if (!writer_.get())
+ return;
+
+ size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
+ if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
+ Abort();
+ return;
+ }
+
+ // Now it's guaranteed that this doesn't overflow. This must be done before
+ // Write() since GetTotalBufferedBytes() may return different value after
+ // Write() call, so if we use the new value, information in this instance and
+ // one in |registry_| become inconsistent.
+ last_total_buffered_bytes_ = current_buffered_bytes + size;
+
can_add_data_ = writer_->Write(buffer, size);
}
void Stream::AddData(const char* data, size_t size) {
+ if (!writer_.get())
+ return;
+
scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
memcpy(io_buffer->data(), data, size);
- can_add_data_ = writer_->Write(io_buffer, size);
+ AddData(io_buffer, size);
}
void Stream::Finalize() {
+ if (!writer_.get())
+ return;
+
writer_->Close(0);
- writer_.reset(NULL);
+ writer_.reset();
// Continue asynchronously.
base::MessageLoopProxy::current()->PostTask(
@@ -90,10 +122,17 @@ void Stream::Finalize() {
Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
int buf_size,
int* bytes_read) {
+ DCHECK(buf);
+ DCHECK(bytes_read);
+
*bytes_read = 0;
if (!data_.get()) {
- data_length_ = 0;
- data_bytes_read_ = 0;
+ DCHECK(!data_length_);
+ DCHECK(!data_bytes_read_);
+
+ if (!reader_.get())
+ return STREAM_ABORTED;
+
ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
switch (state) {
case ByteStreamReader::STREAM_HAS_DATA:
@@ -113,7 +152,7 @@ Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
data_bytes_read_ += to_read;
if (data_bytes_read_ >= data_length_)
- data_ = NULL;
+ ClearBuffer();
*bytes_read = to_read;
return STREAM_HAS_DATA;
@@ -150,4 +189,10 @@ void Stream::OnDataAvailable() {
read_observer_->OnDataAvailable(this);
}
+void Stream::ClearBuffer() {
+ data_ = NULL;
+ data_length_ = 0;
+ data_bytes_read_ = 0;
+}
+
} // namespace content
diff --git a/chromium/content/browser/streams/stream.h b/chromium/content/browser/streams/stream.h
index 85edc884081..3937f5b9e38 100644
--- a/chromium/content/browser/streams/stream.h
+++ b/chromium/content/browser/streams/stream.h
@@ -36,6 +36,7 @@ class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
STREAM_HAS_DATA,
STREAM_COMPLETE,
STREAM_EMPTY,
+ STREAM_ABORTED,
};
// Creates a stream.
@@ -57,6 +58,10 @@ class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
// Removes the write observer. |observer| must be the current observer.
void RemoveWriteObserver(StreamWriteObserver* observer);
+ // Stops accepting new data, clears all buffer, unregisters this stream from
+ // |registry_| and make coming ReadRawData() calls return STREAM_ABORTED.
+ void Abort();
+
// Adds the data in |buffer| to the stream. Takes ownership of |buffer|.
void AddData(scoped_refptr<net::IOBuffer> buffer, size_t size);
// Adds data of |size| at |data| to the stream. This method creates a copy
@@ -81,6 +86,11 @@ class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
const GURL& url() const { return url_; }
+ // For StreamRegistry to remember the last memory usage reported to it.
+ size_t last_total_buffered_bytes() const {
+ return last_total_buffered_bytes_;
+ }
+
private:
friend class base::RefCountedThreadSafe<Stream>;
@@ -89,13 +99,25 @@ class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
void OnSpaceAvailable();
void OnDataAvailable();
- size_t data_bytes_read_;
+ // Clears |data_| and related variables.
+ void ClearBuffer();
+
bool can_add_data_;
GURL url_;
+ // Buffer for storing data read from |reader_| but not yet read out from this
+ // Stream by ReadRawData() method.
scoped_refptr<net::IOBuffer> data_;
+ // Number of bytes read from |reader_| into |data_| including bytes already
+ // read out.
size_t data_length_;
+ // Number of bytes in |data_| that are already read out.
+ size_t data_bytes_read_;
+
+ // Last value returned by writer_->TotalBufferedBytes() in AddData(). Stored
+ // in order to check memory usage.
+ size_t last_total_buffered_bytes_;
scoped_ptr<ByteStreamWriter> writer_;
scoped_ptr<ByteStreamReader> reader_;
diff --git a/chromium/content/browser/streams/stream_registry.cc b/chromium/content/browser/streams/stream_registry.cc
index 39d24b39f5f..f722f57d03c 100644
--- a/chromium/content/browser/streams/stream_registry.cc
+++ b/chromium/content/browser/streams/stream_registry.cc
@@ -8,7 +8,15 @@
namespace content {
-StreamRegistry::StreamRegistry() {
+namespace {
+// The maximum size of memory each StreamRegistry instance is allowed to use
+// for its Stream instances.
+const size_t kDefaultMaxMemoryUsage = 1024 * 1024 * 1024U; // 1GiB
+}
+
+StreamRegistry::StreamRegistry()
+ : total_memory_usage_(0),
+ max_memory_usage_(kDefaultMaxMemoryUsage) {
}
StreamRegistry::~StreamRegistry() {
@@ -42,7 +50,43 @@ bool StreamRegistry::CloneStream(const GURL& url, const GURL& src_url) {
void StreamRegistry::UnregisterStream(const GURL& url) {
DCHECK(CalledOnValidThread());
+
+ StreamMap::iterator iter = streams_.find(url);
+ if (iter == streams_.end())
+ return;
+
+ // Only update |total_memory_usage_| if |url| is NOT a Stream clone because
+ // cloned streams do not update |total_memory_usage_|.
+ if (iter->second->url() == url) {
+ size_t buffered_bytes = iter->second->last_total_buffered_bytes();
+ DCHECK_LE(buffered_bytes, total_memory_usage_);
+ total_memory_usage_ -= buffered_bytes;
+ }
+
streams_.erase(url);
}
+bool StreamRegistry::UpdateMemoryUsage(const GURL& url,
+ size_t current_size,
+ size_t increase) {
+ DCHECK(CalledOnValidThread());
+
+ StreamMap::iterator iter = streams_.find(url);
+ // A Stream must be registered with its parent registry to get memory.
+ if (iter == streams_.end())
+ return false;
+
+ size_t last_size = iter->second->last_total_buffered_bytes();
+ DCHECK_LE(last_size, total_memory_usage_);
+ size_t usage_of_others = total_memory_usage_ - last_size;
+ DCHECK_LE(current_size, last_size);
+ size_t current_total_memory_usage = usage_of_others + current_size;
+
+ if (increase > max_memory_usage_ - current_total_memory_usage)
+ return false;
+
+ total_memory_usage_ = current_total_memory_usage + increase;
+ return true;
+}
+
} // namespace content
diff --git a/chromium/content/browser/streams/stream_registry.h b/chromium/content/browser/streams/stream_registry.h
index e75c97c1699..a359411d2ae 100644
--- a/chromium/content/browser/streams/stream_registry.h
+++ b/chromium/content/browser/streams/stream_registry.h
@@ -32,20 +32,37 @@ class CONTENT_EXPORT StreamRegistry : public base::NonThreadSafe {
void UnregisterStream(const GURL& url);
+ // Called by Stream instances to request increase of memory usage. If the
+ // total memory usage for this registry is going to exceed the limit,
+ // returns false. Otherwise, updates |total_memory_usage_| and returns true.
+ //
+ // |current_size| is the up-to-date size of ByteStream of the Stream instance
+ // and |increase| must be the amount of data going to be added to the Stream
+ // instance.
+ bool UpdateMemoryUsage(const GURL& url, size_t current_size, size_t increase);
+
// Gets the stream associated with |url|. Returns NULL if there is no such
// stream.
scoped_refptr<Stream> GetStream(const GURL& url);
+ void set_max_memory_usage_for_testing(size_t size) {
+ max_memory_usage_ = size;
+ }
+
private:
typedef std::map<GURL, scoped_refptr<Stream> > StreamMap;
StreamMap streams_;
+ size_t total_memory_usage_;
+
+ // Maximum amount of memory allowed to use for Stream instances registered
+ // with this registry.
+ size_t max_memory_usage_;
+
DISALLOW_COPY_AND_ASSIGN(StreamRegistry);
};
} // namespace content
#endif // CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
-
-
diff --git a/chromium/content/browser/streams/stream_unittest.cc b/chromium/content/browser/streams/stream_unittest.cc
index 36add1d649d..a2d959305fa 100644
--- a/chromium/content/browser/streams/stream_unittest.cc
+++ b/chromium/content/browser/streams/stream_unittest.cc
@@ -41,7 +41,7 @@ class StreamTest : public testing::Test {
class TestStreamReader : public StreamReadObserver {
public:
- TestStreamReader() : buffer_(new net::GrowableIOBuffer()) {
+ TestStreamReader() : buffer_(new net::GrowableIOBuffer()), completed_(false) {
}
virtual ~TestStreamReader() {}
@@ -50,8 +50,25 @@ class TestStreamReader : public StreamReadObserver {
scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize));
int bytes_read = 0;
- while (stream->ReadRawData(buffer.get(), kBufferSize, &bytes_read) ==
- Stream::STREAM_HAS_DATA) {
+ while (true) {
+ Stream::StreamState state =
+ stream->ReadRawData(buffer.get(), kBufferSize, &bytes_read);
+ switch (state) {
+ case Stream::STREAM_HAS_DATA:
+ // TODO(tyoshino): Move these expectations to the beginning of Read()
+ // method once Stream::Finalize() is fixed.
+ EXPECT_FALSE(completed_);
+ break;
+ case Stream::STREAM_COMPLETE:
+ completed_ = true;
+ return;
+ case Stream::STREAM_EMPTY:
+ EXPECT_FALSE(completed_);
+ return;
+ case Stream::STREAM_ABORTED:
+ EXPECT_FALSE(completed_);
+ return;
+ }
size_t old_capacity = buffer_->capacity();
buffer_->SetCapacity(old_capacity + bytes_read);
memcpy(buffer_->StartOfBuffer() + old_capacity,
@@ -65,8 +82,13 @@ class TestStreamReader : public StreamReadObserver {
scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; }
+ bool completed() const {
+ return completed_;
+ }
+
private:
scoped_refptr<net::GrowableIOBuffer> buffer_;
+ bool completed_;
};
class TestStreamWriter : public StreamWriteObserver {
@@ -137,14 +159,38 @@ TEST_F(StreamTest, Stream) {
scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
writer.Write(stream.get(), buffer, kBufferSize);
stream->Finalize();
- reader.Read(stream.get());
base::MessageLoop::current()->RunUntilIdle();
+ EXPECT_TRUE(reader.completed());
ASSERT_EQ(reader.buffer()->capacity(), kBufferSize);
for (int i = 0; i < kBufferSize; i++)
EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]);
}
+// Test that even if a reader receives an empty buffer, once TransferData()
+// method is called on it with |source_complete| = true, following Read() calls
+// on it never returns STREAM_EMPTY. Together with StreamTest.Stream above, this
+// guarantees that Reader::Read() call returns only STREAM_HAS_DATA
+// or STREAM_COMPLETE in |data_available_callback_| call corresponding to
+// Writer::Close().
+TEST_F(StreamTest, ClosedReaderDoesNotReturnStreamEmpty) {
+ TestStreamReader reader;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+
+ const int kBufferSize = 0;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ stream->AddData(buffer, kBufferSize);
+ stream->Finalize();
+ base::MessageLoop::current()->RunUntilIdle();
+ EXPECT_TRUE(reader.completed());
+ EXPECT_EQ(0, reader.buffer()->capacity());
+}
+
TEST_F(StreamTest, GetStream) {
TestStreamWriter writer;
@@ -207,4 +253,59 @@ TEST_F(StreamTest, UnregisterStream) {
ASSERT_FALSE(stream2.get());
}
+TEST_F(StreamTest, MemoryExceedMemoryUsageLimit) {
+ TestStreamWriter writer1;
+ TestStreamWriter writer2;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer1, url1));
+
+ GURL url2("blob://stream2");
+ scoped_refptr<Stream> stream2(
+ new Stream(registry_.get(), &writer2, url2));
+
+ const int kMaxMemoryUsage = 1500000;
+ registry_->set_max_memory_usage_for_testing(kMaxMemoryUsage);
+
+ const int kBufferSize = 1000000;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ writer1.Write(stream1.get(), buffer, kBufferSize);
+ // Make transfer happen.
+ base::MessageLoop::current()->RunUntilIdle();
+
+ writer2.Write(stream2.get(), buffer, kBufferSize);
+
+ // Written data (1000000 * 2) exceeded limit (1500000). |stream2| should be
+ // unregistered with |registry_|.
+ EXPECT_EQ(NULL, registry_->GetStream(url2).get());
+
+ writer1.Write(stream1.get(), buffer, kMaxMemoryUsage - kBufferSize);
+ // Should be accepted since stream2 is unregistered and the new data is not
+ // so big to exceed the limit.
+ EXPECT_FALSE(registry_->GetStream(url1).get() == NULL);
+}
+
+TEST_F(StreamTest, UnderMemoryUsageLimit) {
+ TestStreamWriter writer;
+ TestStreamReader reader;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+
+ registry_->set_max_memory_usage_for_testing(1500000);
+
+ const int kBufferSize = 1000000;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ writer.Write(stream.get(), buffer, kBufferSize);
+
+ // Run loop to make |reader| consume the data.
+ base::MessageLoop::current()->RunUntilIdle();
+
+ writer.Write(stream.get(), buffer, kBufferSize);
+
+ EXPECT_EQ(stream.get(), registry_->GetStream(url).get());
+}
+
} // namespace content
diff --git a/chromium/content/browser/streams/stream_url_request_job.cc b/chromium/content/browser/streams/stream_url_request_job.cc
index e36c5d49ae6..09b9e6d7815 100644
--- a/chromium/content/browser/streams/stream_url_request_job.cc
+++ b/chromium/content/browser/streams/stream_url_request_job.cc
@@ -39,19 +39,42 @@ StreamURLRequestJob::~StreamURLRequestJob() {
void StreamURLRequestJob::OnDataAvailable(Stream* stream) {
// Clear the IO_PENDING status.
SetStatus(net::URLRequestStatus());
- if (pending_buffer_.get()) {
- int bytes_read;
- stream_->ReadRawData(
- pending_buffer_.get(), pending_buffer_size_, &bytes_read);
-
- // Clear the buffers before notifying the read is complete, so that it is
- // safe for the observer to read.
- pending_buffer_ = NULL;
- pending_buffer_size_ = 0;
-
- total_bytes_read_ += bytes_read;
- NotifyReadComplete(bytes_read);
+ // Do nothing if pending_buffer_ is empty, i.e. there's no ReadRawData()
+ // operation waiting for IO completion.
+ if (!pending_buffer_.get())
+ return;
+
+ // pending_buffer_ is set to the IOBuffer instance provided to ReadRawData()
+ // by URLRequestJob.
+
+ int bytes_read;
+ switch (stream_->ReadRawData(
+ pending_buffer_.get(), pending_buffer_size_, &bytes_read)) {
+ case Stream::STREAM_HAS_DATA:
+ DCHECK_GT(bytes_read, 0);
+ break;
+ case Stream::STREAM_COMPLETE:
+ // Ensure this. Calling NotifyReadComplete call with 0 signals
+ // completion.
+ bytes_read = 0;
+ break;
+ case Stream::STREAM_EMPTY:
+ NOTREACHED();
+ break;
+ case Stream::STREAM_ABORTED:
+ // Handle this as connection reset.
+ NotifyDone(net::URLRequestStatus(net::URLRequestStatus::FAILED,
+ net::ERR_CONNECTION_RESET));
+ break;
}
+
+ // Clear the buffers before notifying the read is complete, so that it is
+ // safe for the observer to read.
+ pending_buffer_ = NULL;
+ pending_buffer_size_ = 0;
+
+ total_bytes_read_ += bytes_read;
+ NotifyReadComplete(bytes_read);
}
// net::URLRequestJob methods.
@@ -74,6 +97,7 @@ bool StreamURLRequestJob::ReadRawData(net::IOBuffer* buf,
if (request_failed_)
return true;
+ DCHECK(buf);
DCHECK(bytes_read);
int to_read = buf_size;
if (max_range_ && to_read) {
@@ -96,6 +120,11 @@ bool StreamURLRequestJob::ReadRawData(net::IOBuffer* buf,
pending_buffer_size_ = to_read;
SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0));
return false;
+ case Stream::STREAM_ABORTED:
+ // Handle this as connection reset.
+ NotifyDone(net::URLRequestStatus(net::URLRequestStatus::FAILED,
+ net::ERR_CONNECTION_RESET));
+ return false;
}
NOTREACHED();
return false;
@@ -167,7 +196,6 @@ void StreamURLRequestJob::NotifyFailure(int error_code) {
// TODO(zork): Share these with BlobURLRequestJob.
net::HttpStatusCode status_code = net::HTTP_INTERNAL_SERVER_ERROR;
- std::string status_txt;
switch (error_code) {
case net::ERR_ACCESS_DENIED:
status_code = net::HTTP_FORBIDDEN;
diff --git a/chromium/content/browser/streams/stream_url_request_job_unittest.cc b/chromium/content/browser/streams/stream_url_request_job_unittest.cc
index 4bb1798b514..8914fe5c50c 100644
--- a/chromium/content/browser/streams/stream_url_request_job_unittest.cc
+++ b/chromium/content/browser/streams/stream_url_request_job_unittest.cc
@@ -83,6 +83,7 @@ class StreamURLRequestJobTest : public testing::Test {
// Verify response.
EXPECT_TRUE(request_->status().is_success());
+ ASSERT_TRUE(request_->response_headers());
EXPECT_EQ(expected_status_code,
request_->response_headers()->response_code());
EXPECT_EQ(expected_response, delegate.data_received());