HttpStreamPool: Use Group instead of AttemptManager in Job
This CL replaces a raw_ptr of AttemptManager in Job with a raw_ptr of Group so that Job can be created without AttemptManager. When a Job is created during the current AttemptManager is failing, Group owns a raw_ptr to the Job so that it can associate the raw_ptr to the new AttemptManager after it recovers from the failing state. Note that this CL doesn't implement recovering logic yet. No visible changes. Subsequent CLs will implement recovering logic. Bug: 381742472 Change-Id: I568d83304bda16222e5da5c5a1d919806ab25d1b Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/6063740 Commit-Queue: Kenichi Ishibashi <bashi@chromium.org> Reviewed-by: Nidhi Jaju <nidhijaju@chromium.org> Cr-Commit-Position: refs/heads/main@{#1391513}
This commit is contained in:

committed by
Chromium LUCI CQ

parent
00c6ab3e38
commit
cd7f174a7e
@ -213,6 +213,8 @@ void HttpStreamPool::AttemptManager::StartJob(
|
||||
const std::vector<SSLConfig::CertAndStatus>& allowed_bad_certs,
|
||||
quic::ParsedQuicVersion quic_version,
|
||||
const NetLogWithSource& net_log) {
|
||||
CHECK(!is_failing_);
|
||||
|
||||
MaybeUpdateQuicVersionWhenForced(quic_version);
|
||||
net_log_.AddEvent(
|
||||
NetLogEventType::HTTP_STREAM_POOL_ATTEMPT_MANAGER_START_JOB, [&] {
|
||||
@ -257,14 +259,6 @@ void HttpStreamPool::AttemptManager::StartJob(
|
||||
|
||||
jobs_.Insert(job, priority);
|
||||
|
||||
if (is_failing_) {
|
||||
// `this` is failing, notify the failure.
|
||||
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
|
||||
FROM_HERE, base::BindOnce(&AttemptManager::NotifyJobOfFailure,
|
||||
weak_ptr_factory_.GetWeakPtr()));
|
||||
return;
|
||||
}
|
||||
|
||||
RestrictAllowedProtocols(job->allowed_alpns());
|
||||
|
||||
MaybeChangeServiceEndpointRequestPriority();
|
||||
|
@ -69,6 +69,10 @@ class HttpStreamPool::AttemptManager
|
||||
return service_endpoint_request_.get();
|
||||
}
|
||||
|
||||
bool is_failing() const { return is_failing_; }
|
||||
|
||||
int error_to_notify() const { return error_to_notify_; }
|
||||
|
||||
bool is_service_endpoint_request_finished() const {
|
||||
return service_endpoint_request_finished_;
|
||||
}
|
||||
|
@ -3537,34 +3537,42 @@ TEST_F(HttpStreamPoolAttemptManagerTest,
|
||||
success_data->set_connect_data(MockConnect(ASYNC, OK));
|
||||
socket_factory()->AddSocketDataProvider(success_data.get());
|
||||
|
||||
StreamRequester requester1;
|
||||
requester1.set_destination(kDestination).RequestStream(pool());
|
||||
HttpStreamKey stream_key =
|
||||
StreamKeyBuilder().set_destination(kDestination).Build();
|
||||
|
||||
RunUntilIdle();
|
||||
StreamRequester requester1(stream_key);
|
||||
requester1.RequestStream(pool());
|
||||
|
||||
requester1.WaitForResult();
|
||||
EXPECT_THAT(requester1.result(), Optional(IsError(ERR_CONNECTION_RESET)));
|
||||
|
||||
// The first request isn't destroyed yet so the failing attempt manager is
|
||||
// still alive. A request that comes during a failure also fails.
|
||||
StreamRequester requester2;
|
||||
requester2.set_destination(kDestination).RequestStream(pool());
|
||||
RunUntilIdle();
|
||||
StreamRequester requester2(stream_key);
|
||||
requester2.RequestStream(pool());
|
||||
requester2.WaitForResult();
|
||||
EXPECT_THAT(requester2.result(), Optional(IsError(ERR_CONNECTION_RESET)));
|
||||
EXPECT_EQ(pool().GetGroupForTesting(stream_key)->PausedJobCount(), 1u);
|
||||
|
||||
// Preconnect fails too.
|
||||
Preconnector preconnector1(kDestination);
|
||||
EXPECT_THAT(preconnector1.Preconnect(pool()), IsError(ERR_CONNECTION_RESET));
|
||||
|
||||
// Destroy failed requests. This should destroy the failing attempt manager.
|
||||
// Destroy failed requests. This should destroy the failing attempt manager
|
||||
// and the group.
|
||||
requester1.ResetRequest();
|
||||
requester2.ResetRequest();
|
||||
FastForwardUntilNoTasksRemain();
|
||||
ASSERT_FALSE(pool().GetGroupForTesting(stream_key));
|
||||
|
||||
// Request a stream again. This time server is happy to accept the connection.
|
||||
StreamRequester requester3;
|
||||
requester3.set_destination(kDestination).RequestStream(pool());
|
||||
StreamRequester requester3(stream_key);
|
||||
requester3.RequestStream(pool());
|
||||
|
||||
RunUntilIdle();
|
||||
requester3.WaitForResult();
|
||||
EXPECT_THAT(requester3.result(), Optional(IsOk()));
|
||||
|
||||
// Preconnect should also succeed.
|
||||
Preconnector preconnector2(kDestination);
|
||||
EXPECT_THAT(preconnector2.Preconnect(pool()), IsOk());
|
||||
}
|
||||
|
@ -108,9 +108,34 @@ std::unique_ptr<HttpStreamPool::Job> HttpStreamPool::Group::CreateJob(
|
||||
quic::ParsedQuicVersion quic_version,
|
||||
NextProto expected_protocol,
|
||||
const NetLogWithSource& net_log) {
|
||||
return std::make_unique<Job>(delegate, this, quic_version, expected_protocol,
|
||||
net_log);
|
||||
}
|
||||
|
||||
bool HttpStreamPool::Group::CanStartJob(Job* job) {
|
||||
if (IsFailing()) {
|
||||
auto [_, inserted] = paused_jobs_.emplace(job);
|
||||
CHECK(inserted);
|
||||
// TODO(crbug.com/381742472): Resume `job` after this recovers from the
|
||||
// failing state. Currently just fail with an error.
|
||||
job->OnStreamFailed(attempt_manager_->error_to_notify(), NetErrorDetails(),
|
||||
ResolveErrorInfo());
|
||||
return false;
|
||||
}
|
||||
|
||||
EnsureAttemptManager();
|
||||
return std::make_unique<Job>(delegate, attempt_manager_.get(), quic_version,
|
||||
expected_protocol, net_log);
|
||||
return true;
|
||||
}
|
||||
|
||||
void HttpStreamPool::Group::OnJobComplete(Job* job) {
|
||||
paused_jobs_.erase(job);
|
||||
|
||||
if (attempt_manager_) {
|
||||
attempt_manager_->OnJobComplete(job);
|
||||
// `this` may be deleted.
|
||||
} else {
|
||||
MaybeComplete();
|
||||
}
|
||||
}
|
||||
|
||||
int HttpStreamPool::Group::Preconnect(size_t num_streams,
|
||||
@ -120,6 +145,8 @@ int HttpStreamPool::Group::Preconnect(size_t num_streams,
|
||||
return OK;
|
||||
}
|
||||
|
||||
// TODO(crbug.com/381742472): Have this preconnect paused if the
|
||||
// existing attempts are failing.
|
||||
EnsureAttemptManager();
|
||||
return attempt_manager_->Preconnect(num_streams, quic_version,
|
||||
std::move(callback));
|
||||
@ -299,6 +326,8 @@ void HttpStreamPool::Group::FlushWithError(
|
||||
|
||||
void HttpStreamPool::Group::Refresh(std::string_view net_log_close_reason_utf8,
|
||||
StreamCloseReason cancel_reason) {
|
||||
// TODO(crbug.com/381742472): Should we do anything for paused
|
||||
// jobs/preconnects?
|
||||
++generation_;
|
||||
CleanupIdleStreamSockets(CleanupMode::kForce, net_log_close_reason_utf8);
|
||||
if (attempt_manager_) {
|
||||
@ -319,12 +348,18 @@ void HttpStreamPool::Group::CloseIdleStreams(
|
||||
}
|
||||
|
||||
void HttpStreamPool::Group::CancelJobs(int error) {
|
||||
// TODO(crbug.com/381742472): Cancel jobs in `paused_jobs_`. Also cancel
|
||||
// paused preconnects when we support paused preconnects.
|
||||
if (attempt_manager_) {
|
||||
attempt_manager_->CancelJobs(error);
|
||||
}
|
||||
}
|
||||
|
||||
void HttpStreamPool::Group::OnRequiredHttp11() {
|
||||
// This method is called from the upper layer to fall back HTTP/1.1 for
|
||||
// on-going jobs/preconnects (not for paused ones). No need to handle
|
||||
// paused jobs/preconnects.
|
||||
// TODO(crbug.com/381742472): Confirm the above is correct.
|
||||
if (attempt_manager_) {
|
||||
attempt_manager_->OnRequiredHttp11();
|
||||
}
|
||||
@ -333,6 +368,9 @@ void HttpStreamPool::Group::OnRequiredHttp11() {
|
||||
void HttpStreamPool::Group::OnAttemptManagerComplete() {
|
||||
CHECK(attempt_manager_);
|
||||
attempt_manager_.reset();
|
||||
|
||||
// TODO(crbug.com/381742472): Handle paused jobs/preconnects if exist.
|
||||
|
||||
MaybeComplete();
|
||||
}
|
||||
|
||||
@ -342,6 +380,8 @@ base::Value::Dict HttpStreamPool::Group::GetInfoAsValue() const {
|
||||
dict.Set("idle_socket_count", static_cast<int>(IdleStreamSocketCount()));
|
||||
dict.Set("handed_out_socket_count",
|
||||
static_cast<int>(HandedOutStreamSocketCount()));
|
||||
dict.Set("paused_job_count", static_cast<int>(PausedJobCount()));
|
||||
dict.Set("attempt_manager_alive", !!attempt_manager_);
|
||||
if (attempt_manager_) {
|
||||
dict.Set("attempt_state", attempt_manager_->GetInfoAsValue());
|
||||
}
|
||||
@ -352,6 +392,13 @@ void HttpStreamPool::Group::CleanupTimedoutIdleStreamSocketsForTesting() {
|
||||
CleanupIdleStreamSockets(CleanupMode::kTimeoutOnly, "For testing");
|
||||
}
|
||||
|
||||
bool HttpStreamPool::Group::IsFailing() const {
|
||||
// If we don't have an AttemptManager the group is not considered as failing
|
||||
// because we destroy an AttemptManager after all in-flight attempts are
|
||||
// completed (There are only handed out streams and/or idle streams).
|
||||
return attempt_manager_ && attempt_manager_->is_failing();
|
||||
}
|
||||
|
||||
void HttpStreamPool::Group::CleanupIdleStreamSockets(
|
||||
CleanupMode mode,
|
||||
std::string_view net_log_close_reason_utf8) {
|
||||
@ -384,7 +431,10 @@ void HttpStreamPool::Group::EnsureAttemptManager() {
|
||||
}
|
||||
|
||||
bool HttpStreamPool::Group::CanComplete() const {
|
||||
return ActiveStreamSocketCount() == 0 && !attempt_manager_;
|
||||
// TODO(crbug.com/381742472): Check paused preconnects once we support
|
||||
// paused preconnects.
|
||||
return ActiveStreamSocketCount() == 0 && PausedJobCount() == 0 &&
|
||||
!attempt_manager_;
|
||||
}
|
||||
|
||||
void HttpStreamPool::Group::MaybeComplete() {
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
|
||||
#include "base/containers/unique_ptr_adapters.h"
|
||||
#include "base/memory/raw_ptr.h"
|
||||
#include "base/memory/weak_ptr.h"
|
||||
#include "base/time/time.h"
|
||||
@ -69,6 +70,8 @@ class HttpStreamPool::Group {
|
||||
return pool_->http_network_session();
|
||||
}
|
||||
|
||||
AttemptManager* attempt_manager() const { return attempt_manager_.get(); }
|
||||
|
||||
const NetLogWithSource& net_log() { return net_log_; }
|
||||
|
||||
bool force_quic() const { return force_quic_; }
|
||||
@ -82,6 +85,12 @@ class HttpStreamPool::Group {
|
||||
NextProto expected_protocol,
|
||||
const NetLogWithSource& net_log);
|
||||
|
||||
// Called by `job` to see whether `job` can start.
|
||||
bool CanStartJob(Job* job);
|
||||
|
||||
// Called when `job` is going to be destroyed.
|
||||
void OnJobComplete(Job* job);
|
||||
|
||||
// Creates idle streams or sessions for `num_streams` be opened.
|
||||
// Note that this method finishes synchronously, or `callback` is called, once
|
||||
// `this` has enough streams/sessions for `num_streams` be opened. This means
|
||||
@ -139,8 +148,12 @@ class HttpStreamPool::Group {
|
||||
// Returns the number of active streams.
|
||||
size_t ActiveStreamSocketCount() const;
|
||||
|
||||
// True when the number of active streams reached the group limit.
|
||||
bool ReachedMaxStreamLimit() const;
|
||||
|
||||
// Returns the number of paused jobs. See the comment of `paused_jobs_`.
|
||||
size_t PausedJobCount() const { return paused_jobs_.size(); }
|
||||
|
||||
// Returns the highest pending request priority if the group is stalled due to
|
||||
// the per-pool limit, not the per-group limit.
|
||||
std::optional<RequestPriority> GetPriorityIfStalledByPoolLimit() const;
|
||||
@ -200,6 +213,8 @@ class HttpStreamPool::Group {
|
||||
static base::expected<void, std::string_view> IsIdleStreamSocketUsable(
|
||||
const IdleStreamSocket& idle);
|
||||
|
||||
bool IsFailing() const;
|
||||
|
||||
void CleanupIdleStreamSockets(CleanupMode mode,
|
||||
std::string_view net_log_close_reason_utf8);
|
||||
|
||||
@ -223,6 +238,14 @@ class HttpStreamPool::Group {
|
||||
|
||||
std::unique_ptr<AttemptManager> attempt_manager_;
|
||||
|
||||
// Keeps jobs that are created while the current AttemptManager is failing.
|
||||
// Once the AttemptManager completes notifying the failure to its jobs, we
|
||||
// create a new AttemptManager and pass these jobs to the new AttemptManager.
|
||||
// We call these jobs "paused". Note that there are another type of jobs that
|
||||
// are called "pending". Pending jobs are associated with an AttemptManager
|
||||
// but haven't attempted connections yet.
|
||||
std::set<raw_ptr<Job>> paused_jobs_;
|
||||
|
||||
base::WeakPtrFactory<Group> weak_ptr_factory_{this};
|
||||
};
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include "base/memory/raw_ptr.h"
|
||||
#include "base/task/sequenced_task_runner.h"
|
||||
#include "net/base/load_states.h"
|
||||
#include "net/base/net_error_details.h"
|
||||
#include "net/base/net_errors.h"
|
||||
#include "net/base/net_export.h"
|
||||
@ -46,12 +47,12 @@ NextProtoSet CalculateAllowedAlpns(NextProto expected_protocol,
|
||||
} // namespace
|
||||
|
||||
HttpStreamPool::Job::Job(Delegate* delegate,
|
||||
AttemptManager* attempt_manager,
|
||||
Group* group,
|
||||
quic::ParsedQuicVersion quic_version,
|
||||
NextProto expected_protocol,
|
||||
const NetLogWithSource& net_log)
|
||||
: delegate_(delegate),
|
||||
attempt_manager_(attempt_manager),
|
||||
group_(group),
|
||||
quic_version_(quic_version),
|
||||
allowed_alpns_(CalculateAllowedAlpns(expected_protocol,
|
||||
delegate_->is_http1_allowed())),
|
||||
@ -61,14 +62,20 @@ HttpStreamPool::Job::Job(Delegate* delegate,
|
||||
}
|
||||
|
||||
HttpStreamPool::Job::~Job() {
|
||||
CHECK(attempt_manager_);
|
||||
// `attempt_manager_` may be deleted after this call.
|
||||
attempt_manager_.ExtractAsDangling()->OnJobComplete(this);
|
||||
CHECK(group_);
|
||||
// `group_` may be deleted after this call.
|
||||
group_.ExtractAsDangling()->OnJobComplete(this);
|
||||
}
|
||||
|
||||
void HttpStreamPool::Job::Start() {
|
||||
const url::SchemeHostPort& destination =
|
||||
attempt_manager_->group()->stream_key().destination();
|
||||
CHECK(group_);
|
||||
|
||||
if (!group_->CanStartJob(this)) {
|
||||
return;
|
||||
}
|
||||
|
||||
CHECK(attempt_manager());
|
||||
const url::SchemeHostPort& destination = group_->stream_key().destination();
|
||||
if (!IsPortAllowedForScheme(destination.port(), destination.scheme())) {
|
||||
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
|
||||
FROM_HERE,
|
||||
@ -77,18 +84,21 @@ void HttpStreamPool::Job::Start() {
|
||||
return;
|
||||
}
|
||||
|
||||
attempt_manager_->StartJob(this, priority(), delegate_->allowed_bad_certs(),
|
||||
quic_version_, net_log_);
|
||||
attempt_manager()->StartJob(this, priority(), delegate_->allowed_bad_certs(),
|
||||
quic_version_, net_log_);
|
||||
}
|
||||
|
||||
LoadState HttpStreamPool::Job::GetLoadState() const {
|
||||
CHECK(attempt_manager_);
|
||||
return attempt_manager_->GetLoadState();
|
||||
if (!attempt_manager()) {
|
||||
return LOAD_STATE_IDLE;
|
||||
}
|
||||
return attempt_manager()->GetLoadState();
|
||||
}
|
||||
|
||||
void HttpStreamPool::Job::SetPriority(RequestPriority priority) {
|
||||
CHECK(attempt_manager_);
|
||||
attempt_manager_->SetJobPriority(this, priority);
|
||||
if (attempt_manager()) {
|
||||
attempt_manager()->SetJobPriority(this, priority);
|
||||
}
|
||||
}
|
||||
|
||||
void HttpStreamPool::Job::AddConnectionAttempts(
|
||||
@ -119,10 +129,8 @@ void HttpStreamPool::Job::OnStreamReady(std::unique_ptr<HttpStream> stream,
|
||||
return;
|
||||
}
|
||||
|
||||
attempt_manager_->group()
|
||||
->http_network_session()
|
||||
->proxy_resolution_service()
|
||||
->ReportSuccess(delegate_->proxy_info());
|
||||
group_->http_network_session()->proxy_resolution_service()->ReportSuccess(
|
||||
delegate_->proxy_info());
|
||||
delegate_->OnStreamReady(this, std::move(stream), negotiated_protocol);
|
||||
}
|
||||
|
||||
@ -146,4 +154,9 @@ void HttpStreamPool::Job::OnNeedsClientAuth(SSLCertRequestInfo* cert_info) {
|
||||
delegate_->OnNeedsClientAuth(this, cert_info);
|
||||
}
|
||||
|
||||
HttpStreamPool::AttemptManager* HttpStreamPool::Job::attempt_manager() const {
|
||||
CHECK(group_);
|
||||
return group_->attempt_manager();
|
||||
}
|
||||
|
||||
} // namespace net
|
||||
|
@ -80,7 +80,7 @@ class HttpStreamPool::Job {
|
||||
|
||||
// `delegate` must outlive `this`.
|
||||
Job(Delegate* delegate,
|
||||
AttemptManager* attempt_manager,
|
||||
Group* group,
|
||||
quic::ParsedQuicVersion quic_version,
|
||||
NextProto expected_protocol,
|
||||
const NetLogWithSource& net_log);
|
||||
@ -139,8 +139,10 @@ class HttpStreamPool::Job {
|
||||
}
|
||||
|
||||
private:
|
||||
AttemptManager* attempt_manager() const;
|
||||
|
||||
const raw_ptr<Delegate> delegate_;
|
||||
raw_ptr<AttemptManager> attempt_manager_;
|
||||
raw_ptr<Group> group_;
|
||||
const quic::ParsedQuicVersion quic_version_;
|
||||
const NextProtoSet allowed_alpns_;
|
||||
const NetLogWithSource net_log_;
|
||||
|
@ -360,6 +360,7 @@ int HttpStreamPool::JobController::RestartTunnelWithProxyAuth() {
|
||||
}
|
||||
|
||||
void HttpStreamPool::JobController::SetPriority(RequestPriority priority) {
|
||||
priority_ = priority;
|
||||
if (origin_job_) {
|
||||
origin_job_->SetPriority(priority);
|
||||
}
|
||||
|
@ -148,7 +148,7 @@ class HttpStreamPool::JobController : public HttpStreamPool::Job::Delegate,
|
||||
void MaybeMarkAlternativeServiceBroken();
|
||||
|
||||
const raw_ptr<HttpStreamPool> pool_;
|
||||
const RequestPriority priority_;
|
||||
RequestPriority priority_;
|
||||
const std::vector<SSLConfig::CertAndStatus> allowed_bad_certs_;
|
||||
const bool enable_ip_based_pooling_;
|
||||
const bool enable_alternative_services_;
|
||||
|
Reference in New Issue
Block a user