0
Files
android_webview
apps
ash
base
build
build_overrides
buildtools
cc
chrome
chromecast
chromeos
codelabs
components
content
app
app_shim_remote_cocoa
browser
accessibility
aggregation_service
android
attribution_reporting
audio
background_fetch
background_sync
blob_storage
bluetooth
broadcast_channel
browser_plugin
browsing_data
browsing_topics
buckets
cache_storage
client_hints
cocoa
code_cache
compositor
compute_pressure
contacts
content_index
cookie_store
device
device_posture
device_sensors
devtools
direct_sockets
display_cutout
dom_storage
download
fenced_frame
file_system
file_system_access
first_party_sets
font_access
font_unique_name_lookup
generic_sensor
geolocation
gpu
handwriting
hid
hyphenation
idle
image_capture
indexed_db
installedapp
interest_group
keyboard_lock
loader
lock_screen
locks
manifest
media
media_session
memory
metrics
ml
native_io
net
notifications
origin_trials
payments
performance_manager
permissions
picture_in_picture
portal
preloading
presentation
private_aggregation
process_internals
push_messaging
quota
reduce_accept_language
renderer_host
resources
scheduler
screen_enumeration
screen_orientation
screenlock_monitor
serial
service_worker
shape_detection
shared_storage
sms
speech
ssl
tracing
usb
wake_lock
web_contents
web_database
web_package
webauth
webid
webrtc
websockets
webtransport
webui
worker_host
xr
zygote_host
BACK_FORWARD_CACHE_OWNERS
BUILD.gn
DEPS
OWNERS
PRESUBMIT.py
SITE_ISOLATION_OWNERS
about_url_loader_factory.cc
about_url_loader_factory.h
after_startup_task_utils.cc
after_startup_task_utils.h
back_forward_cache_basics_browsertest.cc
back_forward_cache_browsertest.cc
back_forward_cache_browsertest.h
back_forward_cache_features_browsertest.cc
back_forward_cache_internal_browsertest.cc
back_forward_cache_network_request_browsertest.cc
back_forward_cache_no_store_browsertest.cc
back_forward_cache_not_restored_reasons_browsertest.cc
back_forward_cache_test_util.cc
back_forward_cache_test_util.h
bad_message.cc
bad_message.h
battery_monitor_browsertest.cc
bookmarklet_browsertest.cc
browser_associated_interface_unittest.cc
browser_child_process_host_impl.cc
browser_child_process_host_impl.h
browser_child_process_host_impl_receiver_bindings.cc
browser_context.cc
browser_context_impl.cc
browser_context_impl.h
browser_interface_binders.cc
browser_interface_binders.h
browser_interface_broker_impl.h
browser_main.cc
browser_main.h
browser_main_loop.cc
browser_main_loop.h
browser_main_loop_unittest.cc
browser_main_runner_impl.cc
browser_main_runner_impl.h
browser_process_io_thread.cc
browser_process_io_thread.h
browser_task_traits_unittest.nc
browser_thread_browsertest.cc
browser_thread_impl.cc
browser_thread_impl.h
browser_thread_unittest.cc
browser_url_handler_impl.cc
browser_url_handler_impl.h
browser_url_handler_impl_unittest.cc
browsing_instance.cc
browsing_instance.h
byte_stream.cc
byte_stream.h
byte_stream_unittest.cc
can_commit_status.h
child_process_launcher.cc
child_process_launcher.h
child_process_launcher_browsertest.cc
child_process_launcher_helper.cc
child_process_launcher_helper.h
child_process_launcher_helper_android.cc
child_process_launcher_helper_fuchsia.cc
child_process_launcher_helper_linux.cc
child_process_launcher_helper_mac.cc
child_process_launcher_helper_posix.cc
child_process_launcher_helper_posix.h
child_process_launcher_helper_win.cc
child_process_security_policy_browsertest.cc
child_process_security_policy_impl.cc
child_process_security_policy_impl.h
child_process_security_policy_unittest.cc
child_process_task_port_provider_mac.cc
child_process_task_port_provider_mac.h
child_process_task_port_provider_mac_unittest.cc
content_security_policy_browsertest.cc
context_factory.cc
cross_origin_opener_policy_browsertest.cc
cross_site_transfer_browsertest.cc
data_decoder_browsertest.cc
data_url_loader_factory.cc
data_url_loader_factory.h
database_browsertest.cc
do_not_track_browsertest.cc
eye_dropper_chooser_impl.cc
eye_dropper_chooser_impl.h
feature_observer.cc
feature_observer.h
field_trial_recorder.cc
field_trial_recorder.h
field_trial_synchronizer.cc
field_trial_synchronizer.h
find_in_page_client.cc
find_in_page_client.h
find_request_manager.cc
find_request_manager.h
find_request_manager_browsertest.cc
font_list_async.cc
font_preferences_browsertest.cc
font_service.cc
font_service.h
form_controls_browsertest.cc
form_controls_browsertest_mac.h
form_controls_browsertest_mac.mm
host_zoom_level_context.cc
host_zoom_level_context.h
host_zoom_map_impl.cc
host_zoom_map_impl.h
host_zoom_map_impl_browsertest.cc
host_zoom_map_impl_unittest.cc
isolated_origin_browsertest.cc
isolated_origin_util.cc
isolated_origin_util.h
isolation_context.cc
isolation_context.h
keyboard_lock_browsertest.cc
keyboard_lock_browsertest.h
keyboard_lock_browsertest_mac.mm
launch_as_mojo_client_browsertest.cc
log_console_message.cc
log_console_message.h
message_port_provider.cc
message_port_provider_browsertest.cc
mime_registry_impl.cc
mime_registry_impl.h
mojo_binder_policy_applier.cc
mojo_binder_policy_applier.h
mojo_binder_policy_applier_unittest.cc
mojo_binder_policy_map_impl.cc
mojo_binder_policy_map_impl.h
mojo_binder_policy_map_impl_unittest.cc
mojo_sandbox_browsertest.cc
native_profiling.pdl
navigation_browsertest.cc
navigation_mhtml_browsertest.cc
navigation_or_document_handle.cc
navigation_or_document_handle.h
navigation_subresource_loader_params.cc
navigation_subresource_loader_params.h
net_info_browsertest.cc
network_context_client_base_impl.cc
network_context_client_base_impl.h
network_context_client_base_impl_unittest.cc
network_sandbox.cc
network_sandbox.h
network_sandbox_grant_result.h
network_service_browsertest.cc
network_service_client.cc
network_service_client.h
network_service_instance_impl.cc
network_service_instance_impl.h
network_service_restart_browsertest.cc
notification_service_impl.cc
notification_service_impl.h
notification_service_impl_unittest.cc
origin_agent_cluster_isolation_state.cc
origin_agent_cluster_isolation_state.h
per_web_ui_browser_interface_broker.cc
performance_memory_browsertest.cc
performance_timeline_back_forward_cache_restoration_browsertest.cc
performance_timeline_navigation_id_browsertest.cc
plugin_list.cc
plugin_list.h
plugin_list_unittest.cc
plugin_service_impl.cc
plugin_service_impl.h
plugin_service_impl_browsertest.cc
pointer_lock_browsertest.cc
pointer_lock_browsertest.h
pointer_lock_browsertest_mac.mm
posix_file_descriptor_info_impl.cc
posix_file_descriptor_info_impl.h
posix_file_descriptor_info_impl_unittest.cc
power_monitor_browsertest.cc
ppapi_plugin_process_host.cc
ppapi_plugin_process_host.h
ppapi_plugin_process_host_receiver_bindings.cc
ppapi_plugin_sandboxed_process_launcher_delegate.cc
ppapi_plugin_sandboxed_process_launcher_delegate.h
ppapi_plugin_sandboxed_process_launcher_delegate_unittest.cc
process_lock.cc
process_lock.h
process_visibility_util.cc
profiling_utils.cc
resource_context_impl.cc
resource_context_impl.h
resource_coordinator_service.cc
resource_loading_browsertest.cc
sandbox_host_linux.cc
sandbox_host_linux.h
sandbox_ipc_linux.cc
sandbox_ipc_linux.h
sandbox_mac_unittest.mm
sandbox_parameters_mac.h
sandbox_parameters_mac.mm
sandbox_support_mac_impl.h
sandbox_support_mac_impl.mm
scoped_active_url.cc
scoped_active_url.h
security_exploit_browsertest.cc
service_process_host_browsertest.cc
service_process_host_impl.cc
session_history_browsertest.cc
shareable_file_reference_unittest.cc
site_info.cc
site_info.h
site_instance_group.cc
site_instance_group.h
site_instance_group_manager.cc
site_instance_group_manager.h
site_instance_impl.cc
site_instance_impl.h
site_instance_impl_unittest.cc
site_isolation_policy_unittest.cc
site_per_process_browsertest.cc
site_per_process_browsertest.h
site_per_process_hit_test_browsertest.cc
site_per_process_layout_browsertest.cc
site_per_process_mac_browsertest.mm
site_per_process_mixed_content_browsertest.cc
site_per_process_oopsif_browsertest.cc
site_per_process_sad_frame_browsertest.cc
site_per_process_scroll_browsertest.cc
site_per_process_unload_browsertest.cc
site_per_process_web_bundle_browsertest.cc
smoke_rust_browsertest.rs
snapshot_browsertest.cc
ssl_private_key_impl.cc
ssl_private_key_impl.h
starscan_load_observer.cc
starscan_load_observer.h
startup_data_impl.cc
startup_data_impl.h
startup_helper.cc
startup_helper.h
startup_task_runner.cc
startup_task_runner.h
startup_task_runner_unittest.cc
storage_partition_config_unittest.cc
storage_partition_impl.cc
storage_partition_impl.h
storage_partition_impl_browsertest.cc
storage_partition_impl_map.cc
storage_partition_impl_map.h
storage_partition_impl_map_unittest.cc
storage_partition_impl_unittest.cc
storage_service_restart_browsertest.cc
storage_service_sandbox_browsertest.cc
text_fragment_browsertest.cc
theme_helper.cc
theme_helper.h
theme_helper_mac.h
theme_helper_mac.mm
ukm_internals_ui.cc
ukm_internals_ui.h
url_info.cc
url_info.h
url_loader_factory_getter.cc
url_loader_factory_getter.h
url_loader_factory_params_helper.cc
url_loader_factory_params_helper.h
utility_process_host.cc
utility_process_host.h
utility_process_host_browsertest.cc
utility_process_host_receiver_bindings.cc
utility_process_sandbox_browsertest.cc
utility_sandbox_delegate.cc
utility_sandbox_delegate.h
utility_sandbox_delegate_win.cc
v8_snapshot_files.cc
v8_snapshot_files.h
vibration_browsertest.cc
video_capture_service.cc
web_exposed_isolation_info.cc
web_exposed_isolation_info.h
web_exposed_isolation_info_unittest.cc
web_ui_browser_interface_broker_registry.cc
webkit_browsertest.cc
worker_network_isolation_key_browsertest.cc
zoom_browsertest.cc
child
common
gpu
ppapi_plugin
public
renderer
services
shell
test
utility
web_test
zygote
BUILD.gn
DEPS
DIR_METADATA
OWNERS
README.md
content_resources.grd
dev_ui_content_resources.grd
courgette
crypto
dbus
device
docs
extensions
fuchsia_web
gin
google_apis
google_update
gpu
headless
infra
ios
ipc
media
mojo
native_client_sdk
net
pdf
ppapi
printing
remoting
rlz
sandbox
services
skia
sql
storage
styleguide
testing
third_party
tools
ui
url
weblayer
.clang-format
.clang-tidy
.eslintrc.js
.git-blame-ignore-revs
.gitattributes
.gitignore
.gn
.mailmap
.rustfmt.toml
.vpython3
.yapfignore
AUTHORS
BUILD.gn
CODE_OF_CONDUCT.md
DEPS
DIR_METADATA
ENG_REVIEW_OWNERS
LICENSE
LICENSE.chromium_os
OWNERS
PRESUBMIT.py
PRESUBMIT_test.py
PRESUBMIT_test_mocks.py
README.md
WATCHLISTS
codereview.settings
src/content/browser/byte_stream.cc
Avi Drissman 4e1b7bc33d Update copyright headers in content/
The methodology used to generate this CL is documented in
https://crbug.com/1098010#c34.

No-Try: true
No-Presubmit: true
Bug: 1098010
Change-Id: I8c0f009d16350271f07d8e5e561085822cc9dd27
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3895935
Owners-Override: Avi Drissman <avi@chromium.org>
Reviewed-by: Mark Mentovai <mark@chromium.org>
Commit-Queue: Mark Mentovai <mark@chromium.org>
Auto-Submit: Avi Drissman <avi@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1047456}
2022-09-15 14:03:50 +00:00

463 lines
15 KiB
C++

// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/byte_stream.h"
#include <memory>
#include <set>
#include <utility>
#include "base/bind.h"
#include "base/containers/circular_deque.h"
#include "base/location.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
namespace content {
namespace {
using ContentVector =
base::circular_deque<std::pair<scoped_refptr<net::IOBuffer>, size_t>>;
class ByteStreamReaderImpl;
// A makeshift weak pointer; a RefCountedThreadSafe boolean that can be cleared
// in an object destructor and accessed to check for object existence. We can't
// use weak pointers because they're tightly tied to threads rather than task
// runners.
// TODO(rdsmith): A better solution would be extending weak pointers
// to support SequencedTaskRunners.
struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
public:
LifetimeFlag() : is_alive(true) { }
LifetimeFlag(const LifetimeFlag&) = delete;
LifetimeFlag& operator=(const LifetimeFlag&) = delete;
bool is_alive;
protected:
friend class base::RefCountedThreadSafe<LifetimeFlag>;
virtual ~LifetimeFlag() {}
};
// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
// SetPeer may happen anywhere; all other operations on each class must
// happen in the context of their SequencedTaskRunner.
class ByteStreamWriterImpl : public ByteStreamWriter {
public:
ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<LifetimeFlag> lifetime_flag,
size_t buffer_size);
~ByteStreamWriterImpl() override;
// Must be called before any operations are performed.
void SetPeer(ByteStreamReaderImpl* peer,
scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
scoped_refptr<LifetimeFlag> peer_lifetime_flag);
// Overridden from ByteStreamWriter.
bool Write(scoped_refptr<net::IOBuffer> buffer, size_t byte_count) override;
void Flush() override;
void Close(int status) override;
void RegisterCallback(base::RepeatingClosure source_callback) override;
size_t GetTotalBufferedBytes() const override;
// PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
ByteStreamWriterImpl* target,
size_t bytes_consumed);
private:
// Called from UpdateWindow when object existence has been validated.
void UpdateWindowInternal(size_t bytes_consumed);
void PostToPeer(bool complete, int status);
const size_t total_buffer_size_;
// All data objects in this class are only valid to access on
// this task runner except as otherwise noted.
scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
// True while this object is alive.
scoped_refptr<LifetimeFlag> my_lifetime_flag_;
base::RepeatingClosure space_available_callback_;
ContentVector input_contents_;
size_t input_contents_size_;
// ** Peer information.
scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
// How much we've sent to the output that for flow control purposes we
// must assume hasn't been read yet.
size_t output_size_used_;
// Only valid to access on peer_task_runner_.
scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
// Only valid to access on peer_task_runner_ if
// |*peer_lifetime_flag_ == true|
raw_ptr<ByteStreamReaderImpl> peer_;
};
class ByteStreamReaderImpl : public ByteStreamReader {
public:
ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<LifetimeFlag> lifetime_flag,
size_t buffer_size);
~ByteStreamReaderImpl() override;
// Must be called before any operations are performed.
void SetPeer(ByteStreamWriterImpl* peer,
scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
scoped_refptr<LifetimeFlag> peer_lifetime_flag);
// Overridden from ByteStreamReader.
StreamState Read(scoped_refptr<net::IOBuffer>* data, size_t* length) override;
int GetStatus() const override;
void RegisterCallback(base::RepeatingClosure sink_callback) override;
// PostTask target from |ByteStreamWriterImpl::Write| and
// |ByteStreamWriterImpl::Close|.
// Receive data from our peer.
// static because it may be called after the object it is targeting
// has been destroyed. It may not access |*target|
// if |*object_lifetime_flag| is false.
static void TransferData(scoped_refptr<LifetimeFlag> object_lifetime_flag,
ByteStreamReaderImpl* target,
std::unique_ptr<ContentVector> transfer_buffer,
size_t transfer_buffer_bytes,
bool source_complete,
int status);
private:
// Called from TransferData once object existence has been validated.
void TransferDataInternal(std::unique_ptr<ContentVector> transfer_buffer,
size_t transfer_buffer_bytes,
bool source_complete,
int status);
void MaybeUpdateInput();
const size_t total_buffer_size_;
scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
// True while this object is alive.
scoped_refptr<LifetimeFlag> my_lifetime_flag_;
ContentVector available_contents_;
bool received_status_;
int status_;
base::RepeatingClosure data_available_callback_;
// Time of last point at which data in stream transitioned from full
// to non-full. Nulled when a callback is sent.
base::Time last_non_full_time_;
// ** Peer information
scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
// How much has been removed from this class that we haven't told
// the input about yet.
size_t unreported_consumed_bytes_;
// Only valid to access on peer_task_runner_.
scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
// Only valid to access on peer_task_runner_ if
// |*peer_lifetime_flag_ == true|
raw_ptr<ByteStreamWriterImpl> peer_;
};
ByteStreamWriterImpl::ByteStreamWriterImpl(
scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<LifetimeFlag> lifetime_flag,
size_t buffer_size)
: total_buffer_size_(buffer_size),
my_task_runner_(task_runner),
my_lifetime_flag_(lifetime_flag),
input_contents_size_(0),
output_size_used_(0),
peer_(nullptr) {
DCHECK(my_lifetime_flag_.get());
my_lifetime_flag_->is_alive = true;
}
ByteStreamWriterImpl::~ByteStreamWriterImpl() {
// No RunsTasksInCurrentSequence() check to allow deleting a created writer
// before we start using it. Once started, should be deleted on the specified
// task runner.
my_lifetime_flag_->is_alive = false;
}
void ByteStreamWriterImpl::SetPeer(
ByteStreamReaderImpl* peer,
scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
peer_ = peer;
peer_task_runner_ = peer_task_runner;
peer_lifetime_flag_ = peer_lifetime_flag;
}
bool ByteStreamWriterImpl::Write(
scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
// Check overflow.
//
// TODO(tyoshino): Discuss with content/browser/download developer and if
// they're fine with, set smaller limit and make it configurable.
size_t space_limit = std::numeric_limits<size_t>::max() -
GetTotalBufferedBytes();
if (byte_count > space_limit) {
// TODO(tyoshino): Tell the user that Write() failed.
// Ignore input.
return false;
}
input_contents_.push_back(std::make_pair(buffer, byte_count));
input_contents_size_ += byte_count;
// Arbitrarily, we buffer to a third of the total size before sending.
if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
PostToPeer(false, 0);
return GetTotalBufferedBytes() <= total_buffer_size_;
}
void ByteStreamWriterImpl::Flush() {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
if (input_contents_size_ > 0)
PostToPeer(false, 0);
}
void ByteStreamWriterImpl::Close(int status) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
PostToPeer(true, status);
}
void ByteStreamWriterImpl::RegisterCallback(
base::RepeatingClosure source_callback) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
space_available_callback_ = std::move(source_callback);
}
size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
// This sum doesn't overflow since Write() fails if this sum is going to
// overflow.
return input_contents_size_ + output_size_used_;
}
// static
void ByteStreamWriterImpl::UpdateWindow(
scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
size_t bytes_consumed) {
// If the target object isn't alive anymore, we do nothing.
if (!lifetime_flag->is_alive) return;
target->UpdateWindowInternal(bytes_consumed);
}
void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
DCHECK_GE(output_size_used_, bytes_consumed);
output_size_used_ -= bytes_consumed;
// Callback if we were above the limit and we're now <= to it.
bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
if (no_longer_above_limit && was_above_limit &&
!space_available_callback_.is_null())
space_available_callback_.Run();
}
void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
// Valid contexts in which to call.
DCHECK(complete || 0 != input_contents_size_);
std::unique_ptr<ContentVector> transfer_buffer;
size_t buffer_size = 0;
if (0 != input_contents_size_) {
transfer_buffer = std::make_unique<ContentVector>();
transfer_buffer->swap(input_contents_);
buffer_size = input_contents_size_;
output_size_used_ += input_contents_size_;
input_contents_size_ = 0;
}
peer_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ByteStreamReaderImpl::TransferData, peer_lifetime_flag_,
peer_, std::move(transfer_buffer), buffer_size, complete,
status));
}
ByteStreamReaderImpl::ByteStreamReaderImpl(
scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<LifetimeFlag> lifetime_flag,
size_t buffer_size)
: total_buffer_size_(buffer_size),
my_task_runner_(task_runner),
my_lifetime_flag_(lifetime_flag),
received_status_(false),
status_(0),
unreported_consumed_bytes_(0),
peer_(nullptr) {
DCHECK(my_lifetime_flag_.get());
my_lifetime_flag_->is_alive = true;
}
ByteStreamReaderImpl::~ByteStreamReaderImpl() {
// No RunsTasksInCurrentSequence() check to allow deleting a created writer
// before we start using it. Once started, should be deleted on the specified
// task runner.
my_lifetime_flag_->is_alive = false;
}
void ByteStreamReaderImpl::SetPeer(
ByteStreamWriterImpl* peer,
scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
peer_ = peer;
peer_task_runner_ = peer_task_runner;
peer_lifetime_flag_ = peer_lifetime_flag;
}
ByteStreamReaderImpl::StreamState
ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
size_t* length) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
if (available_contents_.size()) {
*data = available_contents_.front().first;
*length = available_contents_.front().second;
available_contents_.pop_front();
unreported_consumed_bytes_ += *length;
MaybeUpdateInput();
return STREAM_HAS_DATA;
}
if (received_status_) {
return STREAM_COMPLETE;
}
return STREAM_EMPTY;
}
int ByteStreamReaderImpl::GetStatus() const {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
DCHECK(received_status_);
return status_;
}
void ByteStreamReaderImpl::RegisterCallback(
base::RepeatingClosure sink_callback) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
data_available_callback_ = std::move(sink_callback);
}
// static
void ByteStreamReaderImpl::TransferData(
scoped_refptr<LifetimeFlag> object_lifetime_flag,
ByteStreamReaderImpl* target,
std::unique_ptr<ContentVector> transfer_buffer,
size_t buffer_size,
bool source_complete,
int status) {
// If our target is no longer alive, do nothing.
if (!object_lifetime_flag->is_alive) return;
target->TransferDataInternal(std::move(transfer_buffer), buffer_size,
source_complete, status);
}
void ByteStreamReaderImpl::TransferDataInternal(
std::unique_ptr<ContentVector> transfer_buffer,
size_t buffer_size,
bool source_complete,
int status) {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
bool was_empty = available_contents_.empty();
if (transfer_buffer) {
available_contents_.insert(available_contents_.end(),
transfer_buffer->begin(),
transfer_buffer->end());
}
if (source_complete) {
received_status_ = true;
status_ = status;
}
// Callback on transition from empty to non-empty, or
// source complete.
if (((was_empty && !available_contents_.empty()) ||
source_complete) &&
!data_available_callback_.is_null())
data_available_callback_.Run();
}
// Decide whether or not to send the input a window update.
// Currently we do that whenever we've got unreported consumption
// greater than 1/3 of total size.
void ByteStreamReaderImpl::MaybeUpdateInput() {
DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
if (unreported_consumed_bytes_ <=
total_buffer_size_ / kFractionReadBeforeWindowUpdate)
return;
peer_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ByteStreamWriterImpl::UpdateWindow, peer_lifetime_flag_,
peer_, unreported_consumed_bytes_));
unreported_consumed_bytes_ = 0;
}
} // namespace
const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
ByteStreamReader::~ByteStreamReader() { }
ByteStreamWriter::~ByteStreamWriter() { }
void CreateByteStream(
scoped_refptr<base::SequencedTaskRunner> input_task_runner,
scoped_refptr<base::SequencedTaskRunner> output_task_runner,
size_t buffer_size,
std::unique_ptr<ByteStreamWriter>* input,
std::unique_ptr<ByteStreamReader>* output) {
scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
input_task_runner, input_flag, buffer_size);
ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
output_task_runner, output_flag, buffer_size);
in->SetPeer(out, output_task_runner, output_flag);
out->SetPeer(in, input_task_runner, input_flag);
input->reset(in);
output->reset(out);
}
} // namespace content