Reland "[Mojo] Introduce async invitation client transport"
This is a reland of a127c74955
Fix for MSan failure in PS2
Original change's description:
> [Mojo] Introduce async invitation client transport
>
> Normally clients accept invitations synchronously, as they must do
> blocking IO to read out an OS handle for internal sync IPC (e.g.
> brokered shm allocation). This is necessary in some sandboxed
> environments where the sandboxed client cannot create its own sync IPC
> channel to pass to the Mojo broker.
>
> On Android such restrictions don't apply, and it's critical to avoid
> blocking the main thread where we want to synchronously initialize
> Mojo.
>
> This CL introduces a new invitation transport type which supports
> non-blocking invitation acceptance in clients. The only requirement to
> use this is that the client must be allowed to allocate a Mojo
> PlatformChannel (e.g. domain socket, named pipe, etc). Internally the
> client then creates its own internal sync IPC channel and sends that
> asynchronously over the main invitation transport.
>
> This allows Android clients to instantly accept Mojo invitations and
> initialize Mojo IPC without blocking the calling thread.
>
> Bug: 1005432
> Change-Id: Id1dde38f209d57883a28bece050a4c0638bdec9e
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1811759
> Reviewed-by: John Abd-El-Malek <jam@chromium.org>
> Commit-Queue: Ken Rockot <rockot@google.com>
> Cr-Commit-Position: refs/heads/master@{#698155}
Bug: 1005432
Change-Id: If6b356d5a2d69ad7f4900e618bc4fea172e23bc3
TBR: jam@chromium.org
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1815663
Reviewed-by: Ken Rockot <rockot@google.com>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#698385}
This commit is contained in:
@ -19,9 +19,13 @@ namespace core {
|
||||
// to fulfill shared memory allocation requests on some platforms.
|
||||
class Broker {
|
||||
public:
|
||||
// Note: This is blocking, and will wait for the first message over
|
||||
// the endpoint handle in |handle|.
|
||||
explicit Broker(PlatformHandle handle);
|
||||
// Note: If |wait_for_channel_handle| is |true|, this constructor blocks the
|
||||
// calling thread until it reads the first message from |handle|, which must
|
||||
// contain another PlatformHandle for a NodeChannel.
|
||||
//
|
||||
// Otherwise, no initialization message is expected and this will not wait for
|
||||
// one.
|
||||
Broker(PlatformHandle handle, bool wait_for_channel_handle);
|
||||
~Broker();
|
||||
|
||||
// Returns the platform handle that should be used to establish a NodeChannel
|
||||
|
@ -66,7 +66,8 @@ Channel::MessagePtr WaitForBrokerMessage(
|
||||
|
||||
} // namespace
|
||||
|
||||
Broker::Broker(PlatformHandle handle) : sync_channel_(std::move(handle)) {
|
||||
Broker::Broker(PlatformHandle handle, bool wait_for_channel_handle)
|
||||
: sync_channel_(std::move(handle)) {
|
||||
CHECK(sync_channel_.is_valid());
|
||||
|
||||
int fd = sync_channel_.GetFD().get();
|
||||
@ -76,6 +77,9 @@ Broker::Broker(PlatformHandle handle) : sync_channel_(std::move(handle)) {
|
||||
flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
|
||||
PCHECK(flags != -1);
|
||||
|
||||
if (!wait_for_channel_handle)
|
||||
return;
|
||||
|
||||
// Wait for the first message, which should contain a handle.
|
||||
std::vector<PlatformHandle> incoming_platform_handles;
|
||||
if (WaitForBrokerMessage(fd, BrokerMessageType::INIT, 1, 0,
|
||||
|
@ -83,8 +83,12 @@ Channel::MessagePtr WaitForBrokerMessage(HANDLE pipe_handle,
|
||||
|
||||
} // namespace
|
||||
|
||||
Broker::Broker(PlatformHandle handle) : sync_channel_(std::move(handle)) {
|
||||
Broker::Broker(PlatformHandle handle, bool wait_for_channel_handle)
|
||||
: sync_channel_(std::move(handle)) {
|
||||
CHECK(sync_channel_.is_valid());
|
||||
if (!wait_for_channel_handle)
|
||||
return;
|
||||
|
||||
Channel::MessagePtr message = WaitForBrokerMessage(
|
||||
sync_channel_.GetHandle().Get(), BrokerMessageType::INIT);
|
||||
|
||||
|
@ -36,7 +36,11 @@ class MOJO_SYSTEM_IMPL_EXPORT ConnectionParams {
|
||||
return std::move(server_endpoint_);
|
||||
}
|
||||
|
||||
// Sets the flag reported by |is_async()|. The flag defaults to false.
void set_is_async(bool is_async) {
  is_async_ = is_async;
}
|
||||
// Returns the flag last stored by |set_is_async()|, or false if it was never
// set.
bool is_async() const {
  return is_async_;
}
|
||||
|
||||
private:
|
||||
bool is_async_ = false;
|
||||
PlatformChannelEndpoint endpoint_;
|
||||
PlatformChannelServerEndpoint server_endpoint_;
|
||||
|
||||
|
@ -1309,7 +1309,9 @@ MojoResult Core::SendInvitation(
|
||||
return MOJO_RESULT_INVALID_ARGUMENT;
|
||||
if (transport_endpoint->type != MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL &&
|
||||
transport_endpoint->type !=
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_SERVER) {
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_SERVER &&
|
||||
transport_endpoint->type !=
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_ASYNC) {
|
||||
return MOJO_RESULT_UNIMPLEMENTED;
|
||||
}
|
||||
|
||||
@ -1373,6 +1375,10 @@ MojoResult Core::SendInvitation(
|
||||
attached_ports[0].second,
|
||||
connection_name);
|
||||
} else {
|
||||
if (transport_endpoint->type ==
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_ASYNC) {
|
||||
connection_params.set_is_async(true);
|
||||
}
|
||||
GetNodeController()->SendBrokerClientInvitation(
|
||||
target_process, std::move(connection_params), attached_ports,
|
||||
process_error_callback);
|
||||
@ -1398,7 +1404,9 @@ MojoResult Core::AcceptInvitation(
|
||||
return MOJO_RESULT_INVALID_ARGUMENT;
|
||||
if (transport_endpoint->type != MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL &&
|
||||
transport_endpoint->type !=
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_SERVER) {
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_SERVER &&
|
||||
transport_endpoint->type !=
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_ASYNC) {
|
||||
return MOJO_RESULT_UNIMPLEMENTED;
|
||||
}
|
||||
|
||||
@ -1447,6 +1455,10 @@ MojoResult Core::AcceptInvitation(
|
||||
dispatcher->AttachMessagePipe(kIsolatedInvitationPipeName, local_port);
|
||||
DCHECK_EQ(MOJO_RESULT_OK, result);
|
||||
} else {
|
||||
if (transport_endpoint->type ==
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_ASYNC) {
|
||||
connection_params.set_is_async(true);
|
||||
}
|
||||
node_controller->AcceptBrokerClientInvitation(std::move(connection_params));
|
||||
}
|
||||
|
||||
|
@ -1340,7 +1340,8 @@ INSTANTIATE_TEST_SUITE_P(
|
||||
,
|
||||
MultiprocessMessagePipeTestWithPeerSupport,
|
||||
testing::Values(test::MojoTestBase::LaunchType::CHILD,
|
||||
test::MojoTestBase::LaunchType::PEER
|
||||
test::MojoTestBase::LaunchType::PEER,
|
||||
test::MojoTestBase::LaunchType::ASYNC
|
||||
#if !defined(OS_FUCHSIA)
|
||||
,
|
||||
test::MojoTestBase::LaunchType::NAMED_CHILD,
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include "base/location.h"
|
||||
#include "base/logging.h"
|
||||
#include "base/memory/ptr_util.h"
|
||||
#include "mojo/core/broker_host.h"
|
||||
#include "mojo/core/channel.h"
|
||||
#include "mojo/core/configuration.h"
|
||||
#include "mojo/core/core.h"
|
||||
@ -41,6 +42,7 @@ enum class MessageType : uint32_t {
|
||||
EVENT_MESSAGE_FROM_RELAY,
|
||||
#endif
|
||||
ACCEPT_PEER,
|
||||
BIND_BROKER_HOST,
|
||||
};
|
||||
|
||||
struct Header {
|
||||
@ -110,6 +112,12 @@ struct IntroductionData {
|
||||
ports::NodeName name;
|
||||
};
|
||||
|
||||
// Payload for a BIND_BROKER_HOST message. The message itself only carries a
// PlatformHandle, so this struct holds nothing but a padding field, which
// ensures the payload is aligned and has a non-zero length.
struct BindBrokerHostData {
  uint64_t padding;
};
|
||||
|
||||
#if defined(OS_WIN)
|
||||
// This struct is followed by the full payload of a message to be relayed.
|
||||
struct RelayEventMessageData {
|
||||
@ -373,6 +381,21 @@ void NodeChannel::Broadcast(Channel::MessagePtr message) {
|
||||
WriteChannelMessage(std::move(broadcast_message));
|
||||
}
|
||||
|
||||
// Sends a BIND_BROKER_HOST message over this channel with
// |broker_host_handle| attached as its single handle. |broker_host_handle|
// must be valid. The body is compiled out when OS_MACOSX, OS_NACL or
// OS_FUCHSIA is defined, in which case this does nothing.
void NodeChannel::BindBrokerHost(PlatformHandle broker_host_handle) {
#if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
  DCHECK(broker_host_handle.is_valid());
  BindBrokerHostData* data;
  std::vector<PlatformHandle> handles;
  handles.push_back(std::move(broker_host_handle));
  Channel::MessagePtr message =
      CreateMessage(MessageType::BIND_BROKER_HOST, sizeof(BindBrokerHostData),
                    handles.size(), &data);
  // The payload consists solely of padding; give it a defined value before
  // the message is written.
  data->padding = 0;
  message->SetHandles(std::move(handles));
  WriteChannelMessage(std::move(message));
#endif
}
|
||||
|
||||
#if defined(OS_WIN)
|
||||
void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
|
||||
Channel::MessagePtr message) {
|
||||
@ -461,6 +484,17 @@ NodeChannel::~NodeChannel() {
|
||||
ShutDown();
|
||||
}
|
||||
|
||||
// Creates a BrokerHost bound to |broker_host_handle| for the process at the
// other end of this channel, passing along this channel's process error
// callback. The body is compiled out when OS_MACOSX, OS_NACL or OS_FUCHSIA is
// defined, in which case this does nothing.
void NodeChannel::CreateAndBindLocalBrokerHost(
    PlatformHandle broker_host_handle) {
#if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
  ConnectionParams connection_params(
      PlatformChannelEndpoint(std::move(broker_host_handle)));
  // Self-owned: the BrokerHost manages its own lifetime, so the pointer is
  // intentionally not retained here.
  new BrokerHost(remote_process_handle_.get(), std::move(connection_params),
                 process_error_callback_);
#endif
}
|
||||
|
||||
void NodeChannel::OnChannelMessage(const void* payload,
|
||||
size_t payload_size,
|
||||
std::vector<PlatformHandle> handles) {
|
||||
@ -685,6 +719,13 @@ void NodeChannel::OnChannelMessage(const void* payload,
|
||||
break;
|
||||
}
|
||||
|
||||
case MessageType::BIND_BROKER_HOST:
|
||||
if (handles.size() == 1) {
|
||||
CreateAndBindLocalBrokerHost(std::move(handles[0]));
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
// Ignore unrecognized message types, allowing for future extensibility.
|
||||
return;
|
||||
|
@ -132,6 +132,7 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
|
||||
void Introduce(const ports::NodeName& name, PlatformHandle channel_handle);
|
||||
void SendChannelMessage(Channel::MessagePtr message);
|
||||
void Broadcast(Channel::MessagePtr message);
|
||||
void BindBrokerHost(PlatformHandle broker_host_handle);
|
||||
|
||||
#if defined(OS_WIN)
|
||||
// Relay the message to the specified node via this channel. This is used to
|
||||
@ -162,6 +163,10 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
|
||||
const ProcessErrorCallback& process_error_callback);
|
||||
~NodeChannel() override;
|
||||
|
||||
// Creates a BrokerHost to satisfy a |BindBrokerHost()| request from the other
|
||||
// end of the channel.
|
||||
void CreateAndBindLocalBrokerHost(PlatformHandle broker_host_handle);
|
||||
|
||||
// Channel::Delegate:
|
||||
void OnChannelMessage(const void* payload,
|
||||
size_t payload_size,
|
||||
|
@ -192,31 +192,46 @@ void NodeController::SendBrokerClientInvitation(
|
||||
|
||||
void NodeController::AcceptBrokerClientInvitation(
|
||||
ConnectionParams connection_params) {
|
||||
base::Optional<PlatformHandle> broker_host_handle;
|
||||
DCHECK(!GetConfiguration().is_broker_process);
|
||||
#if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
|
||||
// Use the bootstrap channel for the broker and receive the node's channel
|
||||
// synchronously as the first message from the broker.
|
||||
DCHECK(connection_params.endpoint().is_valid());
|
||||
base::ElapsedTimer timer;
|
||||
broker_ = std::make_unique<Broker>(
|
||||
connection_params.TakeEndpoint().TakePlatformHandle());
|
||||
PlatformChannelEndpoint endpoint = broker_->GetInviterEndpoint();
|
||||
if (!connection_params.is_async()) {
|
||||
// Use the bootstrap channel for the broker and receive the node's channel
|
||||
// synchronously as the first message from the broker.
|
||||
DCHECK(connection_params.endpoint().is_valid());
|
||||
base::ElapsedTimer timer;
|
||||
broker_ = std::make_unique<Broker>(
|
||||
connection_params.TakeEndpoint().TakePlatformHandle(),
|
||||
/*wait_for_channel_handle=*/true);
|
||||
PlatformChannelEndpoint endpoint = broker_->GetInviterEndpoint();
|
||||
|
||||
if (!endpoint.is_valid()) {
|
||||
// Most likely the inviter's side of the channel has already been closed and
|
||||
// the broker was unable to negotiate a NodeChannel pipe. In this case we
|
||||
// can cancel our connection to our inviter.
|
||||
DVLOG(1) << "Cannot connect to invalid inviter channel.";
|
||||
CancelPendingPortMerges();
|
||||
return;
|
||||
if (!endpoint.is_valid()) {
|
||||
// Most likely the inviter's side of the channel has already been closed
|
||||
// and the broker was unable to negotiate a NodeChannel pipe. In this case
|
||||
// we can cancel our connection to our inviter.
|
||||
DVLOG(1) << "Cannot connect to invalid inviter channel.";
|
||||
CancelPendingPortMerges();
|
||||
return;
|
||||
}
|
||||
connection_params = ConnectionParams(std::move(endpoint));
|
||||
} else {
|
||||
// For async connections, we instead create a new channel for the broker and
|
||||
// send a request for the inviting process to bind to it. This avoids doing
|
||||
// blocking I/O to accept the invitation. Does not work in some sandboxed
|
||||
// environments, where the PlatformChannel constructor will CHECK fail.
|
||||
PlatformChannel channel;
|
||||
broker_ = std::make_unique<Broker>(
|
||||
channel.TakeLocalEndpoint().TakePlatformHandle(),
|
||||
/*wait_for_channel_handle=*/false);
|
||||
broker_host_handle = channel.TakeRemoteEndpoint().TakePlatformHandle();
|
||||
}
|
||||
connection_params = ConnectionParams(std::move(endpoint));
|
||||
#endif
|
||||
|
||||
io_task_runner_->PostTask(
|
||||
FROM_HERE,
|
||||
base::BindOnce(&NodeController::AcceptBrokerClientInvitationOnIOThread,
|
||||
base::Unretained(this), std::move(connection_params)));
|
||||
base::Unretained(this), std::move(connection_params),
|
||||
std::move(broker_host_handle)));
|
||||
}
|
||||
|
||||
void NodeController::ConnectIsolated(ConnectionParams connection_params,
|
||||
@ -331,28 +346,39 @@ void NodeController::SendBrokerClientInvitationOnIOThread(
|
||||
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
|
||||
|
||||
#if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
|
||||
PlatformChannel node_channel;
|
||||
ConnectionParams node_connection_params(node_channel.TakeLocalEndpoint());
|
||||
// BrokerHost owns itself.
|
||||
BrokerHost* broker_host =
|
||||
new BrokerHost(target_process.get(), std::move(connection_params),
|
||||
process_error_callback);
|
||||
bool channel_ok = broker_host->SendChannel(
|
||||
node_channel.TakeRemoteEndpoint().TakePlatformHandle());
|
||||
ConnectionParams node_connection_params;
|
||||
if (!connection_params.is_async()) {
|
||||
// Sync connections usurp the passed endpoint and use it for the sync broker
|
||||
// channel. A new channel is created here for the NodeChannel and sent over
|
||||
// a sync broker message to the client.
|
||||
PlatformChannel node_channel;
|
||||
node_connection_params = ConnectionParams(node_channel.TakeLocalEndpoint());
|
||||
// BrokerHost owns itself.
|
||||
BrokerHost* broker_host =
|
||||
new BrokerHost(target_process.get(), std::move(connection_params),
|
||||
process_error_callback);
|
||||
bool channel_ok = broker_host->SendChannel(
|
||||
node_channel.TakeRemoteEndpoint().TakePlatformHandle());
|
||||
|
||||
#if defined(OS_WIN)
|
||||
if (!channel_ok) {
|
||||
// On Windows the above operation may fail if the channel is crossing a
|
||||
// session boundary. In that case we fall back to a named pipe.
|
||||
NamedPlatformChannel::Options options;
|
||||
NamedPlatformChannel named_channel(options);
|
||||
node_connection_params =
|
||||
ConnectionParams(named_channel.TakeServerEndpoint());
|
||||
broker_host->SendNamedChannel(named_channel.GetServerName());
|
||||
}
|
||||
if (!channel_ok) {
|
||||
// On Windows the above operation may fail if the channel is crossing a
|
||||
// session boundary. In that case we fall back to a named pipe.
|
||||
NamedPlatformChannel::Options options;
|
||||
NamedPlatformChannel named_channel(options);
|
||||
node_connection_params =
|
||||
ConnectionParams(named_channel.TakeServerEndpoint());
|
||||
broker_host->SendNamedChannel(named_channel.GetServerName());
|
||||
}
|
||||
#else
|
||||
CHECK(channel_ok);
|
||||
CHECK(channel_ok);
|
||||
#endif // defined(OS_WIN)
|
||||
} else {
|
||||
// For async connections, the passed endpoint really is the NodeChannel
|
||||
// endpoint. The broker channel will be established asynchronously by a
|
||||
// |BIND_BROKER_HOST| message from the invited client.
|
||||
node_connection_params = std::move(connection_params);
|
||||
}
|
||||
|
||||
scoped_refptr<NodeChannel> channel =
|
||||
NodeChannel::Create(this, std::move(node_connection_params),
|
||||
@ -380,7 +406,8 @@ void NodeController::SendBrokerClientInvitationOnIOThread(
|
||||
}
|
||||
|
||||
void NodeController::AcceptBrokerClientInvitationOnIOThread(
|
||||
ConnectionParams connection_params) {
|
||||
ConnectionParams connection_params,
|
||||
base::Optional<PlatformHandle> broker_host_handle) {
|
||||
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
|
||||
|
||||
{
|
||||
@ -400,6 +427,8 @@ void NodeController::AcceptBrokerClientInvitationOnIOThread(
|
||||
bootstrap_inviter_channel_->LeakHandleOnShutdown();
|
||||
}
|
||||
bootstrap_inviter_channel_->Start();
|
||||
if (broker_host_handle)
|
||||
bootstrap_inviter_channel_->BindBrokerHost(std::move(*broker_host_handle));
|
||||
}
|
||||
|
||||
void NodeController::ConnectIsolatedOnIOThread(
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "base/macros.h"
|
||||
#include "base/memory/ref_counted.h"
|
||||
#include "base/memory/writable_shared_memory_region.h"
|
||||
#include "base/optional.h"
|
||||
#include "base/task_runner.h"
|
||||
#include "build/build_config.h"
|
||||
#include "mojo/core/atomic_flag.h"
|
||||
@ -156,7 +157,8 @@ class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate,
|
||||
ports::NodeName token,
|
||||
const ProcessErrorCallback& process_error_callback);
|
||||
void AcceptBrokerClientInvitationOnIOThread(
|
||||
ConnectionParams connection_params);
|
||||
ConnectionParams connection_params,
|
||||
base::Optional<PlatformHandle> broker_host_handle);
|
||||
|
||||
void ConnectIsolatedOnIOThread(ConnectionParams connection_params,
|
||||
ports::PortRef port,
|
||||
|
@ -48,6 +48,7 @@ namespace {
|
||||
const char kNamedPipeName[] = "named-pipe-name";
|
||||
#endif
|
||||
const char kRunAsBrokerClient[] = "run-as-broker-client";
|
||||
const char kAcceptInvitationAsync[] = "accept-invitation-async";
|
||||
const char kTestChildMessagePipeName[] = "test_pipe";
|
||||
|
||||
// For use (and only valid) in a test child process:
|
||||
@ -114,6 +115,7 @@ ScopedMessagePipeHandle MultiprocessTestHelper::StartChildWithExtraSwitch(
|
||||
switch (launch_type) {
|
||||
case LaunchType::CHILD:
|
||||
case LaunchType::PEER:
|
||||
case LaunchType::ASYNC:
|
||||
channel.PrepareToPassRemoteEndpoint(&options, &command_line);
|
||||
break;
|
||||
#if !defined(OS_FUCHSIA)
|
||||
@ -159,6 +161,7 @@ ScopedMessagePipeHandle MultiprocessTestHelper::StartChildWithExtraSwitch(
|
||||
switch (launch_type) {
|
||||
case LaunchType::CHILD:
|
||||
case LaunchType::PEER:
|
||||
case LaunchType::ASYNC:
|
||||
local_channel_endpoint = channel.TakeLocalEndpoint();
|
||||
break;
|
||||
#if !defined(OS_FUCHSIA)
|
||||
@ -176,6 +179,9 @@ ScopedMessagePipeHandle MultiprocessTestHelper::StartChildWithExtraSwitch(
|
||||
OutgoingInvitation child_invitation;
|
||||
ScopedMessagePipeHandle pipe;
|
||||
switch (launch_type) {
|
||||
case LaunchType::ASYNC:
|
||||
command_line.AppendSwitch(kAcceptInvitationAsync);
|
||||
FALLTHROUGH;
|
||||
case LaunchType::CHILD:
|
||||
#if !defined(OS_FUCHSIA)
|
||||
case LaunchType::NAMED_CHILD:
|
||||
@ -204,14 +210,21 @@ ScopedMessagePipeHandle MultiprocessTestHelper::StartChildWithExtraSwitch(
|
||||
test_child_ =
|
||||
base::SpawnMultiProcessTestChild(test_child_main, command_line, options);
|
||||
|
||||
if (launch_type == LaunchType::CHILD || launch_type == LaunchType::PEER)
|
||||
if (launch_type == LaunchType::CHILD || launch_type == LaunchType::PEER ||
|
||||
launch_type == LaunchType::ASYNC) {
|
||||
channel.RemoteProcessLaunchAttempted();
|
||||
}
|
||||
|
||||
if (launch_type == LaunchType::CHILD) {
|
||||
DCHECK(local_channel_endpoint.is_valid());
|
||||
OutgoingInvitation::Send(std::move(child_invitation), test_child_.Handle(),
|
||||
std::move(local_channel_endpoint),
|
||||
ProcessErrorCallback());
|
||||
} else if (launch_type == LaunchType::ASYNC) {
|
||||
DCHECK(local_channel_endpoint.is_valid());
|
||||
OutgoingInvitation::SendAsync(
|
||||
std::move(child_invitation), test_child_.Handle(),
|
||||
std::move(local_channel_endpoint), ProcessErrorCallback());
|
||||
}
|
||||
#if !defined(OS_FUCHSIA)
|
||||
else if (launch_type == LaunchType::NAMED_CHILD) {
|
||||
@ -246,7 +259,8 @@ void MultiprocessTestHelper::ChildSetup() {
|
||||
|
||||
auto& command_line = *base::CommandLine::ForCurrentProcess();
|
||||
|
||||
bool run_as_broker_client = command_line.HasSwitch(kRunAsBrokerClient);
|
||||
const bool run_as_broker_client = command_line.HasSwitch(kRunAsBrokerClient);
|
||||
const bool async = command_line.HasSwitch(kAcceptInvitationAsync);
|
||||
|
||||
PlatformChannelEndpoint endpoint;
|
||||
#if !defined(OS_FUCHSIA)
|
||||
@ -262,8 +276,11 @@ void MultiprocessTestHelper::ChildSetup() {
|
||||
}
|
||||
|
||||
if (run_as_broker_client) {
|
||||
IncomingInvitation invitation =
|
||||
IncomingInvitation::Accept(std::move(endpoint));
|
||||
IncomingInvitation invitation;
|
||||
if (async)
|
||||
invitation = IncomingInvitation::AcceptAsync(std::move(endpoint));
|
||||
else
|
||||
invitation = IncomingInvitation::Accept(std::move(endpoint));
|
||||
primordial_pipe = invitation.ExtractMessagePipe(kTestChildMessagePipeName);
|
||||
} else {
|
||||
primordial_pipe =
|
||||
|
@ -34,6 +34,9 @@ class MultiprocessTestHelper {
|
||||
// Launch the child process as an unrelated peer process in the mojo system.
|
||||
PEER,
|
||||
|
||||
// Same as CHILD but uses the newer async channel handshake.
|
||||
ASYNC,
|
||||
|
||||
#if !defined(OS_FUCHSIA)
|
||||
// Launch the child process as a child in the mojo system, using a named
|
||||
// pipe.
|
||||
|
@ -88,6 +88,20 @@ typedef uint32_t MojoInvitationTransportType;
|
||||
#define MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_SERVER \
|
||||
((MojoInvitationTransportType)1)
|
||||
|
||||
// Similar to CHANNEL transport. Normally with a CHANNEL transport, the inviting
|
||||
// client sends a secondary sync channel to the invited client, and the invited
|
||||
// client synchronously waits for this before it can accept the invitation.
|
||||
//
|
||||
// With this transport type, the invited client creates its own sync channel and
|
||||
// sends the remote endpoint to the inviting client to be passed along to the
|
||||
// broker. This allows acceptance of incoming invitations to avoid blocking
|
||||
// operations, making both sides of the channel initialization process fully
|
||||
// asynchronous.
|
||||
//
|
||||
// Not supported in all platform sandbox configurations.
|
||||
#define MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_ASYNC \
|
||||
((MojoInvitationTransportType)2)
|
||||
|
||||
// A transport endpoint over which an invitation may be sent or received via
|
||||
// |MojoSendInvitation()| or |MojoAcceptInvitation()| respectively.
|
||||
struct MOJO_ALIGNAS(8) MojoInvitationTransportEndpoint {
|
||||
|
@ -170,6 +170,17 @@ void OutgoingInvitation::Send(OutgoingInvitation invitation,
|
||||
MOJO_SEND_INVITATION_FLAG_NONE, error_callback, "");
|
||||
}
|
||||
|
||||
// static
|
||||
void OutgoingInvitation::SendAsync(OutgoingInvitation invitation,
|
||||
base::ProcessHandle target_process,
|
||||
PlatformChannelEndpoint channel_endpoint,
|
||||
const ProcessErrorCallback& error_callback) {
|
||||
SendInvitation(std::move(invitation.handle_), target_process,
|
||||
channel_endpoint.TakePlatformHandle(),
|
||||
MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_ASYNC,
|
||||
MOJO_SEND_INVITATION_FLAG_NONE, error_callback, "");
|
||||
}
|
||||
|
||||
// static
|
||||
ScopedMessagePipeHandle OutgoingInvitation::SendIsolated(
|
||||
PlatformChannelEndpoint channel_endpoint,
|
||||
@ -236,6 +247,30 @@ IncomingInvitation IncomingInvitation::Accept(
|
||||
ScopedInvitationHandle(InvitationHandle(invitation_handle)));
|
||||
}
|
||||
|
||||
// static
|
||||
IncomingInvitation IncomingInvitation::AcceptAsync(
|
||||
PlatformChannelEndpoint channel_endpoint) {
|
||||
MojoPlatformHandle endpoint_handle;
|
||||
PlatformHandle::ToMojoPlatformHandle(channel_endpoint.TakePlatformHandle(),
|
||||
&endpoint_handle);
|
||||
CHECK_NE(endpoint_handle.type, MOJO_PLATFORM_HANDLE_TYPE_INVALID);
|
||||
|
||||
MojoInvitationTransportEndpoint transport_endpoint;
|
||||
transport_endpoint.struct_size = sizeof(transport_endpoint);
|
||||
transport_endpoint.type = MOJO_INVITATION_TRANSPORT_TYPE_CHANNEL_ASYNC;
|
||||
transport_endpoint.num_platform_handles = 1;
|
||||
transport_endpoint.platform_handles = &endpoint_handle;
|
||||
|
||||
MojoHandle invitation_handle;
|
||||
MojoResult result =
|
||||
MojoAcceptInvitation(&transport_endpoint, nullptr, &invitation_handle);
|
||||
if (result != MOJO_RESULT_OK)
|
||||
return IncomingInvitation();
|
||||
|
||||
return IncomingInvitation(
|
||||
ScopedInvitationHandle(InvitationHandle(invitation_handle)));
|
||||
}
|
||||
|
||||
// static
|
||||
ScopedMessagePipeHandle IncomingInvitation::AcceptIsolated(
|
||||
PlatformChannelEndpoint channel_endpoint) {
|
||||
|
@ -99,6 +99,13 @@ class MOJO_CPP_SYSTEM_EXPORT OutgoingInvitation {
|
||||
PlatformChannelServerEndpoint server_endpoint,
|
||||
const ProcessErrorCallback& error_callback = {});
|
||||
|
||||
// Similar to |Send()|, but targets a process which will accept the invitation
|
||||
// with |IncomingInvitation::AcceptAsync()| instead of |Accept()|.
|
||||
static void SendAsync(OutgoingInvitation invitation,
|
||||
base::ProcessHandle target_process,
|
||||
PlatformChannelEndpoint channel_endpoint,
|
||||
const ProcessErrorCallback& error_callback = {});
|
||||
|
||||
// Sends an isolated invitation over |endpoint|. The process at the other
|
||||
// endpoint must use |IncomingInvitation::AcceptIsolated()| to accept the
|
||||
// invitation.
|
||||
@ -160,8 +167,16 @@ class MOJO_CPP_SYSTEM_EXPORT IncomingInvitation {
|
||||
// the other end of that channel. If the invitation was sent using a
|
||||
// |PlatformChannelServerEndpoint|, then |channel_endpoint| should be created
|
||||
// by |NamedPlatformChannel::ConnectToServer|.
|
||||
//
|
||||
// Note that this performs blocking I/O on the calling thread.
|
||||
static IncomingInvitation Accept(PlatformChannelEndpoint channel_endpoint);
|
||||
|
||||
// Like above, but does not perform any blocking I/O. Not all platforms and
|
||||
// sandbox configurations are compatible with this API. In such cases, the
|
||||
// synchronous |Accept()| above should be used.
|
||||
static IncomingInvitation AcceptAsync(
|
||||
PlatformChannelEndpoint channel_endpoint);
|
||||
|
||||
// Accepts an incoming isolated invitation from |channel_endpoint|. See
|
||||
// notes on |OutgoingInvitation::SendIsolated()|.
|
||||
static ScopedMessagePipeHandle AcceptIsolated(
|
||||
|
Reference in New Issue
Block a user