0

DOM: Make Subscriber weak ref, and not own Observable

This CL puts the ref-counted producer implementation of Observables in
sync with the latest spec changes, as per discussion in
https://github.com/WICG/observable/pull/197#discussion_r1964646130.

Essentially, the Observable should not own its most recent active
Subscriber, and a Subscriber kept alive by JavaScript should not keep
its associated Observable alive if it is no longer referenced by script.

This CL ensures these garbage collection semantics are tested too.

R=masonf

Bug: 40282760
Change-Id: I6579f5a5c95557f686d078c0aef7094b8e216066
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/6287812
Commit-Queue: Dominic Farolino <dom@chromium.org>
Reviewed-by: Mason Freed <masonf@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1423285}
This commit is contained in:
Dominic Farolino
2025-02-21 11:18:29 -08:00
committed by Chromium LUCI CQ
parent 92a7e5a611
commit e35d6616b7
6 changed files with 74 additions and 41 deletions

@ -2608,15 +2608,15 @@ void Observable::SubscribeInternal(
}
CHECK(observer);
if (active_subscriber_) {
active_subscriber_->RegisterNewObserver(script_state, observer, options);
if (weak_subscriber_ && weak_subscriber_->active()) {
weak_subscriber_->RegisterNewObserver(script_state, observer, options);
return;
}
// Construct `active_subscriber_` for the first subscription. This will take
// Construct `weak_subscriber_` for the first subscription. This will take
// care of registering `observer` as the first observer.
active_subscriber_ = MakeGarbageCollected<Subscriber>(
PassKey(), this, script_state, observer, options);
weak_subscriber_ = MakeGarbageCollected<Subscriber>(PassKey(), script_state,
observer, options);
// Exactly one of `subscribe_callback_` or `subscribe_delegate_` is non-null.
// Use whichever is provided.
@ -2624,7 +2624,7 @@ void Observable::SubscribeInternal(
<< "Exactly one of subscribe_callback_ or subscribe_delegate_ should be "
"non-null";
if (subscribe_delegate_) {
subscribe_delegate_->OnSubscribe(active_subscriber_, script_state);
subscribe_delegate_->OnSubscribe(weak_subscriber_, script_state);
return;
}
@ -2646,19 +2646,23 @@ void Observable::SubscribeInternal(
ScriptState::Scope scope(script_state);
v8::TryCatch try_catch(script_state->GetIsolate());
std::ignore = subscribe_callback_->Invoke(nullptr, active_subscriber_);
std::ignore = subscribe_callback_->Invoke(nullptr, weak_subscriber_);
if (try_catch.HasCaught()) {
// If the above `subscribe_callback_` closes the subscription,
// `active_subscriber_` will be cleared to null. In the case where closing
// the subscription also throws an error (i.e., an exception-throwing
// `complete()` handler), then `try_catch.HasCaught()` will be true even
// though `active_subscriber_` is null; so we must report the exception to
// the global instead.
if (active_subscriber_) {
active_subscriber_->error(
// There are two cases where we might have a JS exception on the stack here:
// 1. The `subscribe_callback_` immediately started pushing values to the
// observer, and somewhere along the way an exception was thrown. In
// this case, `weak_subscriber_` is non-null, and still active. Report
// the exception to it.
if (weak_subscriber_->active()) {
weak_subscriber_->error(
script_state,
ScriptValue(script_state->GetIsolate(), try_catch.Exception()));
} else {
// 2. The `subscriber_callback_` immediately closed the subscription, and
// during this, an error was thrown (an exception-throwing `complete()`
// handler for example). In that case, `weak_subscriber_` is non-null
// but inactive. Report the exception to the global instead of the
// subscriber.
if (!script_state->ContextIsValid()) {
CHECK(!GetExecutionContext());
return;
@ -3228,7 +3232,7 @@ ScriptPromise<IDLAny> Observable::ReduceInternal(
void Observable::Trace(Visitor* visitor) const {
visitor->Trace(subscribe_callback_);
visitor->Trace(subscribe_delegate_);
visitor->Trace(active_subscriber_);
visitor->Trace(weak_subscriber_);
ScriptWrappable::Trace(visitor);
ExecutionContextClient::Trace(visitor);

@ -111,10 +111,6 @@ class CORE_EXPORT Observable final : public ScriptWrappable,
void Trace(Visitor*) const override;
void ClearSubscriber(base::PassKey<Subscriber>) {
active_subscriber_ = nullptr;
}
// The `subscribe()` API is used when web content subscribes to an Observable
// with a `V8UnionObserverOrObserverCallback`, whereas this API is used when
// native code subscribes to an `Observable` with a native internal observer.
@ -150,11 +146,12 @@ class CORE_EXPORT Observable final : public ScriptWrappable,
const Member<V8SubscribeCallback> subscribe_callback_;
const Member<SubscribeDelegate> subscribe_delegate_;
// The active subscriber associated with `this`. It is set in
// The most recent `Subscriber` associated with `this`. It is set in
// `SubscribeInternal`, and used to register all subsequent subscriptions
// until it becomes inactive. Once inactive, `this` clears this pointer until
// the next invocation of `SubscribeInternal()`.
Member<Subscriber> active_subscriber_;
// until it becomes inactive or garbage collected. Once inactive or garbage
// collected, `this` no longer has an "active" subscription, and this member
// will be set anew in subsequent invocations of `SubscribeInternal()`.
WeakMember<Subscriber> weak_subscriber_;
};
} // namespace blink

@ -65,12 +65,10 @@ class Subscriber::ConsumerAbortSubscriptionAlgorithm final
};
Subscriber::Subscriber(base::PassKey<Observable>,
Observable* owning_observable,
ScriptState* script_state,
ObservableInternalObserver* internal_observer,
SubscribeOptions* options)
: ExecutionContextClient(ExecutionContext::From(script_state)),
owning_observable_(owning_observable),
subscription_controller_(AbortController::Create(script_state)) {
internal_observers_.push_back(internal_observer);
@ -220,8 +218,6 @@ void Subscriber::CloseSubscription(ScriptState* script_state,
// any more values to downstream `Observer`-provided callbacks.
active_ = false;
owning_observable_->ClearSubscriber(PassKey());
// 2. Abort `subscription_controller_`. This actually does two things:
// (a) Immediately aborts any "upstream" subscriptions, i.e., any
// observables that the observable associated with `this` had
@ -276,7 +272,6 @@ void Subscriber::Trace(Visitor* visitor) const {
visitor->Trace(subscription_controller_);
visitor->Trace(consumer_abort_algorithms_);
visitor->Trace(teardown_callbacks_);
visitor->Trace(owning_observable_);
visitor->Trace(internal_observers_);
ScriptWrappable::Trace(visitor);

@ -29,7 +29,6 @@ class CORE_EXPORT Subscriber final : public ScriptWrappable,
public:
Subscriber(base::PassKey<Observable>,
Observable*,
ScriptState*,
ObservableInternalObserver*,
SubscribeOptions*);
@ -78,13 +77,6 @@ class CORE_EXPORT Subscriber final : public ScriptWrappable,
void CloseSubscription(ScriptState* script_state,
std::optional<ScriptValue> abort_reason);
// The `Observable` that owns `this`. We need this back reference so that when
// the subscription to `this` ends (i.e., when `active_` becomes false),
// `this` can tell `owning_observable_` to clear its reference to `this`. That
// way future subscriptions to `owning_observable_` spin up a new `Subscriber`
// altogether.
Member<Observable> owning_observable_;
// The list of `ObservableInternalObserver`s which encapsulate algorithms to
// call when `this` produces values or actions that need to be pushed to the
// subscriber handlers.

@ -0,0 +1,31 @@
<!DOCTYPE html>
<script src="/resources/testharness.js"></script>
<script src="/resources/testharnessreport.js"></script>
<body>
<script>
promise_test(async t => {
let strong_subscriber = null;
let weak_observable = null;
const results = [];
{
// Populate the above variables.
weak_observable = new WeakRef(new Observable(subscriber => {
strong_subscriber = subscriber;
}))
weak_observable.deref().subscribe(v => results.push(v));
}
assert_true(strong_subscriber instanceof Subscriber);
assert_true(weak_observable.deref() instanceof Observable);
// Trigger garbage collection, and verify that `weak_subscriber` has been
// garbage collected.
await gc({type: 'major', execution: 'async'});
assert_equals(weak_observable.deref(), undefined);
strong_subscriber.next(1);
assert_array_equals(results, [1]);
}, "Subscriber can outlive Observable, as long as JavaScript is keeping " +
"the Subscriber alive");
</script>
</body>

@ -7,21 +7,35 @@
// lifetime defies usual garbage collection semantics for weakly-referenced
// objects.
promise_test(async t => {
let strong_observable = null;
let weak_subscriber = null;
const controller = new AbortController();
const results = [];
{
// Create a new Observable, subscribe to it, and throw it out.
new Observable(subscriber => {
strong_observable = new Observable(subscriber => {
results.push('new subscription');
weak_subscriber = new WeakRef(subscriber);
}).subscribe({signal: controller.signal});
});
// Subscribe to start the subscription and make `weak_subscriber` "active",
// but we don't need any handlers.
strong_observable.subscribe();
}
assert_true(strong_observable instanceof Observable);
assert_true(weak_subscriber.deref() instanceof Subscriber);
// Trigger garbage collection, and verify that `weak_subscriber` has been
// garbage collected.
// garbage collected despite the fact that it was "active".
await gc({type: 'major', execution: 'async'});
assert_equals(weak_subscriber.deref(), undefined);
// `strong_observable` detects that its associated "weak subscriber" [1] has
// been garbage collected and is null, so that subsequent subscriptions can
// start anew.
// [1]: https://wicg.github.io/observable/#observable-weak-subscriber.
strong_observable.subscribe();
assert_true(weak_subscriber.deref() instanceof Subscriber);
assert_array_equals(results, ['new subscription', 'new subscription']);
}, "Subscriber is not arbitrarily kept alive until the subscription ends, " +
"but can be garbage collected if it is weakly owned throughout the " +
"subscription");