0

Revert "Allow earlier association of Channel interfaces"

This reverts commit 509586f352.

Reason for revert: was not sufficient to fix the issue and breaks the ProcessHostOnUI experiment

Original change's description:
> Allow earlier association of Channel interfaces
>
> Previously IPC Channels were always created on the IO thread, and they
> carried an implicit requirement that any Channel-associated interfaces
> could not be associated until after the IO thread initialization was
> done. This is incompatible with the effort to move process hosts to the
> UI thread.
>
> This loosens some unnecessary constraints on how all the ChannelMojo
> internals are set up, and enables off-thread construction and (limited)
> early use of the ChannelMojo prior to IO thread initialization.
>
> Bug: 904556
> Change-Id: I67a72d2b191b20d3757768bb38a5a77f29b3bea2
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2910960
> Commit-Queue: Ken Rockot <rockot@google.com>
> Reviewed-by: Zhenyao Mo <zmo@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#885330}

Bug: 904556
Change-Id: I57ba20065b860d8a524ec43db15b0fe4bb130f98
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2911700
Auto-Submit: Ken Rockot <rockot@google.com>
Bot-Commit: Rubber Stamper <rubber-stamper@appspot.gserviceaccount.com>
Reviewed-by: John Abd-El-Malek <jam@chromium.org>
Commit-Queue: John Abd-El-Malek <jam@chromium.org>
Owners-Override: John Abd-El-Malek <jam@chromium.org>
Cr-Commit-Position: refs/heads/master@{#885359}
This commit is contained in:
Ken Rockot
2021-05-21 02:40:54 +00:00
committed by Chromium LUCI CQ
parent fb5333632b
commit 676e37c4e1
10 changed files with 38 additions and 113 deletions

@ -52,9 +52,16 @@ GpuChannelHost::GpuChannelHost(
static_cast<int32_t>(
GpuChannelReservedRoutes::kImageDecodeAccelerator)) {
mojo::PendingAssociatedRemote<mojom::GpuChannel> channel;
listener_->Initialize(std::move(handle),
channel.InitWithNewEndpointAndPassReceiver(),
io_thread_);
auto receiver = channel.InitWithNewEndpointAndPassReceiver();
if (io_thread_->BelongsToCurrentThread()) {
listener_->Initialize(std::move(handle), std::move(receiver), io_thread_);
} else {
io_thread_->PostTask(
FROM_HERE,
base::BindOnce(&Listener::Initialize, base::Unretained(listener_.get()),
std::move(handle), std::move(receiver), io_thread_));
}
gpu_channel_ = mojo::SharedAssociatedRemote<mojom::GpuChannel>(
std::move(channel), io_thread_);
@ -290,7 +297,8 @@ void GpuChannelHost::Listener::Initialize(
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner) {
channel_ = IPC::ChannelMojo::Create(
std::move(handle), IPC::Channel::MODE_CLIENT, this, io_task_runner,
io_task_runner, mojo::internal::MessageQuotaChecker::MaybeCreate());
base::ThreadTaskRunnerHandle::Get(),
mojo::internal::MessageQuotaChecker::MaybeCreate());
DCHECK(channel_);
bool result = channel_->Connect();
DCHECK(result);

@ -166,7 +166,7 @@ class GPU_EXPORT GpuChannelHost
Listener();
~Listener() override;
// Called on the GpuChannelHost's thread.
// Called on the IO thread.
void Initialize(mojo::ScopedMessagePipeHandle handle,
mojo::PendingAssociatedReceiver<mojom::GpuChannel> receiver,
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner);

@ -186,31 +186,21 @@ ChannelMojo::~ChannelMojo() {
}
bool ChannelMojo::Connect() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
WillConnect();
mojo::PendingAssociatedRemote<mojom::Channel> sender;
mojo::AssociatedRemote<mojom::Channel> sender;
mojo::PendingAssociatedReceiver<mojom::Channel> receiver;
bootstrap_->Connect(&sender, &receiver);
DCHECK(!message_reader_);
sender->SetPeerPid(GetSelfPID());
message_reader_ = std::make_unique<internal::MessagePipeReader>(
pipe_, std::move(sender), std::move(receiver), task_runner_, this);
if (task_runner_->RunsTasksInCurrentSequence()) {
FinishConnectOnIOThread();
} else {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ChannelMojo::FinishConnectOnIOThread, weak_ptr_));
}
pipe_, std::move(sender), std::move(receiver), this);
return true;
}
void ChannelMojo::FinishConnectOnIOThread() {
DCHECK(message_reader_);
message_reader_->FinishInitializationOnIOThread(GetSelfPID());
}
void ChannelMojo::Pause() {
bootstrap_->Pause();
}
@ -385,12 +375,6 @@ void ChannelMojo::GetGenericRemoteAssociatedInterface(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) {
if (message_reader_) {
if (!task_runner_->RunsTasksInCurrentSequence()) {
message_reader_->thread_safe_sender().GetAssociatedInterface(
name, mojo::PendingAssociatedReceiver<mojom::GenericInterface>(
std::move(handle)));
return;
}
message_reader_->GetRemoteInterface(name, std::move(handle));
} else {
// Attach the associated interface to a disconnected pipe, so that the

@ -116,8 +116,6 @@ class COMPONENT_EXPORT(IPC) ChannelMojo
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) override;
void FinishConnectOnIOThread();
base::WeakPtr<ChannelMojo> weak_ptr_;
// A TaskRunner which runs tasks on the ChannelMojo's owning thread.

@ -19,83 +19,29 @@
#include "base/trace_event/trace_event.h"
#include "ipc/ipc_channel_mojo.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/thread_safe_proxy.h"
namespace IPC {
namespace internal {
namespace {
class ThreadSafeProxy : public mojo::ThreadSafeProxy {
public:
using Forwarder = base::RepeatingCallback<void(mojo::Message)>;
ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,
Forwarder forwarder,
mojo::AssociatedGroupController& group_controller)
: task_runner_(std::move(task_runner)),
forwarder_(std::move(forwarder)),
group_controller_(group_controller) {}
// mojo::ThreadSafeProxy:
void SendMessage(mojo::Message& message) override {
message.SerializeHandles(&group_controller_);
task_runner_->PostTask(FROM_HERE,
base::BindOnce(forwarder_, std::move(message)));
}
void SendMessageWithResponder(
mojo::Message& message,
std::unique_ptr<mojo::MessageReceiver> responder) override {
// We don't bother supporting this because it's not used in practice.
NOTREACHED();
}
private:
~ThreadSafeProxy() override = default;
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
const Forwarder forwarder_;
mojo::AssociatedGroupController& group_controller_;
};
} // namespace
MessagePipeReader::MessagePipeReader(
mojo::MessagePipeHandle pipe,
mojo::PendingAssociatedRemote<mojom::Channel> sender,
mojo::AssociatedRemote<mojom::Channel> sender,
mojo::PendingAssociatedReceiver<mojom::Channel> receiver,
scoped_refptr<base::SequencedTaskRunner> task_runner,
MessagePipeReader::Delegate* delegate)
: delegate_(delegate),
sender_(std::move(sender), task_runner),
receiver_(this, std::move(receiver), task_runner) {
thread_safe_sender_ =
std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
base::MakeRefCounted<ThreadSafeProxy>(
task_runner,
base::BindRepeating(&MessagePipeReader::ForwardMessage,
weak_ptr_factory_.GetWeakPtr()),
*sender_.internal_state()->associated_group()->GetController()));
thread_checker_.DetachFromThread();
}
MessagePipeReader::~MessagePipeReader() {
DCHECK(thread_checker_.CalledOnValidThread());
// The pipe should be closed before deletion.
}
void MessagePipeReader::FinishInitializationOnIOThread(
base::ProcessId self_pid) {
sender_(std::move(sender)),
receiver_(this, std::move(receiver)) {
sender_.set_disconnect_handler(
base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
MOJO_RESULT_FAILED_PRECONDITION));
receiver_.set_disconnect_handler(
base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
MOJO_RESULT_FAILED_PRECONDITION));
}
sender_->SetPeerPid(self_pid);
MessagePipeReader::~MessagePipeReader() {
DCHECK(thread_checker_.CalledOnValidThread());
// The pipe should be closed before deletion.
}
void MessagePipeReader::Close() {
@ -182,9 +128,5 @@ void MessagePipeReader::OnPipeError(MojoResult error) {
delegate_->OnPipeError();
}
void MessagePipeReader::ForwardMessage(mojo::Message message) {
sender_.internal_state()->ForwardMessage(std::move(message));
}
} // namespace internal
} // namespace IPC

@ -14,7 +14,6 @@
#include "base/compiler_specific.h"
#include "base/component_export.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/process/process_handle.h"
#include "base/threading/thread_checker.h"
#include "ipc/ipc.mojom.h"
@ -23,7 +22,6 @@
#include "mojo/public/cpp/bindings/associated_remote.h"
#include "mojo/public/cpp/bindings/pending_associated_receiver.h"
#include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
#include "mojo/public/cpp/bindings/shared_remote.h"
#include "mojo/public/cpp/system/core.h"
#include "mojo/public/cpp/system/message_pipe.h"
@ -70,14 +68,11 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
//
// Note that MessagePipeReader doesn't delete |delegate|.
MessagePipeReader(mojo::MessagePipeHandle pipe,
mojo::PendingAssociatedRemote<mojom::Channel> sender,
mojo::AssociatedRemote<mojom::Channel> sender,
mojo::PendingAssociatedReceiver<mojom::Channel> receiver,
scoped_refptr<base::SequencedTaskRunner> task_runner,
Delegate* delegate);
~MessagePipeReader() override;
void FinishInitializationOnIOThread(base::ProcessId self_pid);
// Close and destroy the MessagePipe.
void Close();
@ -93,7 +88,6 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
mojo::ScopedInterfaceEndpointHandle handle);
mojo::AssociatedRemote<mojom::Channel>& sender() { return sender_; }
mojom::Channel& thread_safe_sender() { return thread_safe_sender_->proxy(); }
protected:
void OnPipeClosed();
@ -108,16 +102,11 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
mojo::PendingAssociatedReceiver<mojom::GenericInterface> receiver)
override;
void ForwardMessage(mojo::Message message);
// |delegate_| is null once the message pipe is closed.
Delegate* delegate_;
mojo::AssociatedRemote<mojom::Channel> sender_;
std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
thread_safe_sender_;
mojo::AssociatedReceiver<mojom::Channel> receiver_;
base::ThreadChecker thread_checker_;
base::WeakPtrFactory<MessagePipeReader> weak_ptr_factory_{this};
DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
};

@ -164,6 +164,9 @@ class ChannelAssociatedGroupController
}
void Bind(mojo::ScopedMessagePipeHandle handle) {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(task_runner_->BelongsToCurrentThread());
connector_ = std::make_unique<mojo::Connector>(
std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, task_runner_,
"IPC Channel");
@ -208,7 +211,7 @@ class ChannelAssociatedGroupController
}
void CreateChannelEndpoints(
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::AssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
mojo::InterfaceId sender_id, receiver_id;
if (set_interface_id_namespace_bit_) {
@ -234,8 +237,8 @@ class ChannelAssociatedGroupController
mojo::ScopedInterfaceEndpointHandle receiver_handle =
CreateScopedInterfaceEndpointHandle(receiver_id);
*sender = mojo::PendingAssociatedRemote<mojom::Channel>(
std::move(sender_handle), 0);
sender->Bind(mojo::PendingAssociatedRemote<mojom::Channel>(
std::move(sender_handle), 0));
*receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
std::move(receiver_handle));
}
@ -1101,7 +1104,7 @@ class MojoBootstrapImpl : public MojoBootstrap {
private:
void Connect(
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::AssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
controller_->Bind(std::move(handle_));
controller_->CreateChannelEndpoints(sender, receiver);

@ -48,7 +48,7 @@ class COMPONENT_EXPORT(IPC) MojoBootstrap {
// Start the handshake over the underlying message pipe.
virtual void Connect(
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::AssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) = 0;
// Stop transmitting messages and start queueing them instead.

@ -26,9 +26,7 @@ class Connection {
explicit Connection(std::unique_ptr<IPC::MojoBootstrap> bootstrap,
int32_t sender_id)
: bootstrap_(std::move(bootstrap)) {
mojo::PendingAssociatedRemote<IPC::mojom::Channel> sender;
bootstrap_->Connect(&sender, &receiver_);
sender_.Bind(std::move(sender));
bootstrap_->Connect(&sender_, &receiver_);
sender_->SetPeerPid(sender_id);
}

@ -98,6 +98,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// Connector will read messages from the pipe regardless of whether or not an
// incoming receiver has been set.
void set_incoming_receiver(MessageReceiver* receiver) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
incoming_receiver_ = receiver;
}
@ -105,6 +106,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// state, where no more messages will be processed. This method is used
// during testing to prevent that from happening.
void set_enforce_errors_from_incoming_receiver(bool enforce) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
enforce_errors_from_incoming_receiver_ = enforce;
}
@ -118,6 +120,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// Sets the error handler to receive notifications when an error is
// encountered while reading from the pipe or waiting to read from the pipe.
void set_connection_error_handler(base::OnceClosure error_handler) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
connection_error_handler_ = std::move(error_handler);
}