0

[ios blink] Add an NSRunLoop IO message loop with libdispatch events

Create an NSRunLoop-based IO thread that uses DispatchSource
objects to monitor Mach ports and file descriptors.

Since kqueue and CFFileDescriptorRunLoopSource are blocked in the
BrowserEngineKit extension sandboxes, the only option to watch for
IO events is to use libdispatch. Chromium is designed
around specific threads and thread-local storage, so using libdispatch
generally for task scheduling is not possible. To mitigate this, we
implement an IO thread that uses DispatchSource objects. Each source is
notified in a dispatch block (run on an arbitrary thread chosen by
libdispatch), which then posts a message to the IO thread saying that the
event occurred. This incurs the performance overhead of an additional thread
hop for every IO event. We have asked Apple to support kqueue natively,
so we hope this workaround will not be long-lived.

Bug: 40254930
Change-Id: I381ee66b9c1a831c5ed6c0143e9ee9b35c76fbcc
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/6249458
Commit-Queue: Dave Tapuska <dtapuska@chromium.org>
Reviewed-by: Mark Mentovai <mark@chromium.org>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1419575}
This commit is contained in:
Dave Tapuska
2025-02-12 15:02:32 -08:00
committed by Chromium LUCI CQ
parent 477822e025
commit 7ef83f9fc7
6 changed files with 750 additions and 1 deletions

@ -2215,6 +2215,11 @@ component("base") {
"message_loop/message_pump_io_ios.cc",
"message_loop/message_pump_io_ios.h",
]
} else if (use_blink) {
sources += [
"message_loop/message_pump_io_ios_libdispatch.cc",
"message_loop/message_pump_io_ios_libdispatch.h",
]
} else {
sources += [
"message_loop/message_pump_kqueue.cc",
@ -3988,6 +3993,8 @@ test("base_unittests") {
}
if (is_cronet_build) {
sources += [ "message_loop/message_pump_io_ios_unittest.cc" ]
} else if (use_blink) {
sources += [ "message_loop/message_pump_io_ios_libdispatch_unittest.cc" ]
} else {
sources += [ "message_loop/message_pump_kqueue_unittest.cc" ]
}

@ -9,12 +9,15 @@
// types representing MessagePumpForIO.
#include "base/message_loop/ios_cronet_buildflags.h"
#include "build/blink_buildflags.h"
#include "build/build_config.h"
#if BUILDFLAG(IS_WIN)
#include "base/message_loop/message_pump_win.h"
#elif BUILDFLAG(IS_IOS) && BUILDFLAG(CRONET_BUILD)
#include "base/message_loop/message_pump_io_ios.h"
#elif BUILDFLAG(IS_IOS) && BUILDFLAG(USE_BLINK)
#include "base/message_loop/message_pump_io_ios_libdispatch.h"
#elif BUILDFLAG(IS_APPLE)
#include "base/message_loop/message_pump_kqueue.h"
#elif BUILDFLAG(IS_NACL)
@ -32,6 +35,8 @@ namespace base {
using MessagePumpForIO = MessagePumpForIO;
#elif BUILDFLAG(IS_IOS) && BUILDFLAG(CRONET_BUILD)
using MessagePumpForIO = MessagePumpIOSForIO;
#elif BUILDFLAG(IS_IOS) && BUILDFLAG(USE_BLINK)
using MessagePumpForIO = MessagePumpIOSForIOLibdispatch;
#elif BUILDFLAG(IS_APPLE)
using MessagePumpForIO = MessagePumpKqueue;
#elif BUILDFLAG(IS_NACL)

@ -0,0 +1,207 @@
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/message_loop/message_pump_io_ios_libdispatch.h"
namespace base {
// Creates an idle controller. The task runner of the constructing sequence is
// remembered up front; Init() overwrites it with the runner supplied by the
// pump once a watch is actually started.
MessagePumpIOSForIOLibdispatch::FdWatchController::FdWatchController(
    const Location& location)
    : FdWatchControllerInterface(location),
      io_thread_task_runner_(SequencedTaskRunner::GetCurrentDefault()) {}
// Stops any watch that is still active before the controller's members are
// destroyed.
MessagePumpIOSForIOLibdispatch::FdWatchController::~FdWatchController() {
StopWatchingFileDescriptor();
}
// Cancels the current watch, if any, and returns the controller to its idle
// state so that Init() may be called again. Always returns true.
bool MessagePumpIOSForIOLibdispatch::FdWatchController::
    StopWatchingFileDescriptor() {
  watcher_ = nullptr;
  // The dispatch handlers installed by Init() bail out when they observe
  // fd_ == -1, so publish the sentinel before releasing the sources.
  fd_.store(-1);
  dispatch_source_read_ = nullptr;
  dispatch_source_write_ = nullptr;
  return true;
}
void MessagePumpIOSForIOLibdispatch::FdWatchController::Init(
const scoped_refptr<base::SequencedTaskRunner>& io_thread_task_runner,
dispatch_queue_t queue,
int fd,
bool persistent,
int mode,
FdWatcher* watcher) {
DCHECK(io_thread_task_runner->RunsTasksInCurrentSequence());
DCHECK(watcher);
DCHECK(!watcher_);
is_persistent_ = persistent;
io_thread_task_runner_ = io_thread_task_runner;
fd_ = fd;
watcher_ = watcher;
base::WeakPtr<MessagePumpIOSForIOLibdispatch::FdWatchController> weak_this =
weak_factory_.GetWeakPtr();
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
if (mode == WATCH_READ || mode == WATCH_READ_WRITE) {
dispatch_source_read_ = std::make_unique<
apple::DispatchSource>(queue, fd, DISPATCH_SOURCE_TYPE_READ, ^{
if (fd_ == -1) {
return;
}
dispatch_source_read_->Suspend();
io_thread_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
&MessagePumpIOSForIOLibdispatch::FdWatchController::HandleRead,
weak_this));
});
dispatch_source_read_->Resume();
}
if (mode == WATCH_WRITE || mode == WATCH_READ_WRITE) {
dispatch_source_write_ = std::make_unique<
apple::DispatchSource>(queue, fd, DISPATCH_SOURCE_TYPE_WRITE, ^{
if (fd_ == -1) {
return;
}
dispatch_source_write_->Suspend();
io_thread_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
&MessagePumpIOSForIOLibdispatch::FdWatchController::HandleWrite,
weak_this));
});
dispatch_source_write_->Resume();
}
}
void MessagePumpIOSForIOLibdispatch::FdWatchController::HandleRead() {
DCHECK(io_thread_task_runner_->RunsTasksInCurrentSequence());
if (watcher_) {
base::WeakPtr<MessagePumpIOSForIOLibdispatch::FdWatchController> weak_this =
weak_factory_.GetWeakPtr();
watcher_->OnFileCanReadWithoutBlocking(fd_);
if (!weak_this) {
return;
}
}
if (is_persistent_ && dispatch_source_read_) {
dispatch_source_read_->Resume();
}
}
void MessagePumpIOSForIOLibdispatch::FdWatchController::HandleWrite() {
DCHECK(io_thread_task_runner_->RunsTasksInCurrentSequence());
if (watcher_) {
base::WeakPtr<MessagePumpIOSForIOLibdispatch::FdWatchController> weak_this =
weak_factory_.GetWeakPtr();
watcher_->OnFileCanWriteWithoutBlocking(fd_);
if (!weak_this) {
return;
}
}
if (is_persistent_ && dispatch_source_write_) {
dispatch_source_write_->Resume();
}
}
// Creates an idle controller. `location` is accepted for interface parity but
// is not used. The constructing sequence's task runner is remembered up
// front; Init() overwrites it with the runner supplied by the pump.
MessagePumpIOSForIOLibdispatch::MachPortWatchController::
    MachPortWatchController(const Location& location)
    : io_thread_task_runner_(SequencedTaskRunner::GetCurrentDefault()) {}
// Stops any watch that is still active before the controller's members are
// destroyed.
MessagePumpIOSForIOLibdispatch::MachPortWatchController::
~MachPortWatchController() {
StopWatchingMachPort();
}
// Cancels the current watch, if any, and returns the controller to its idle
// state so that Init() may be called again. Always returns true.
bool MessagePumpIOSForIOLibdispatch::MachPortWatchController::
    StopWatchingMachPort() {
  // The dispatch handler installed by Init() bails out when it observes
  // MACH_PORT_NULL, so publish the sentinel before releasing the source.
  port_.store(MACH_PORT_NULL);
  watcher_ = nullptr;
  dispatch_source_ = nullptr;
  return true;
}
void MessagePumpIOSForIOLibdispatch::MachPortWatchController::Init(
const scoped_refptr<base::SequencedTaskRunner>& io_thread_task_runner,
dispatch_queue_t queue,
mach_port_t port,
MachPortWatcher* watcher) {
DCHECK(io_thread_task_runner->RunsTasksInCurrentSequence());
DCHECK(watcher);
DCHECK(!watcher_);
watcher_ = watcher;
port_ = port;
io_thread_task_runner_ = io_thread_task_runner;
base::WeakPtr<MessagePumpIOSForIOLibdispatch::MachPortWatchController>
weak_this = weak_factory_.GetWeakPtr();
dispatch_source_ = std::make_unique<apple::DispatchSource>(queue, port, ^{
if (port_ == MACH_PORT_NULL) {
return;
}
dispatch_source_->Suspend();
io_thread_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&MessagePumpIOSForIOLibdispatch::
MachPortWatchController::HandleReceive,
weak_this));
});
dispatch_source_->Resume();
}
// IO-sequence half of a Mach-port notification: tells the watcher that a
// message is pending on `port_`, then re-arms the dispatch source.
//
// The task that runs this method is bound to a WeakPtr, which is only
// invalidated when the controller is destroyed. StopWatchingMachPort() clears
// `watcher_` without invalidating weak pointers, so a task that was already
// posted can still arrive here after the watch was stopped. `watcher_` is
// therefore checked before use, mirroring FdWatchController::HandleRead() and
// HandleWrite().
void MessagePumpIOSForIOLibdispatch::MachPortWatchController::HandleReceive() {
  DCHECK(io_thread_task_runner_->RunsTasksInCurrentSequence());
  if (watcher_) {
    base::WeakPtr<MessagePumpIOSForIOLibdispatch::MachPortWatchController>
        weak_this = weak_factory_.GetWeakPtr();
    watcher_->OnMachMessageReceived(port_);
    // The watcher is allowed to delete this controller from inside the
    // callback; no member may be touched once that has happened.
    if (!weak_this) {
      return;
    }
  }
  // After StopWatchingMachPort() there is no source left to resume.
  if (dispatch_source_) {
    dispatch_source_->Resume();
  }
}
// Creates the serial dispatch queue that WatchFileDescriptor() and
// WatchMachReceivePort() hand to every DispatchSource they set up.
MessagePumpIOSForIOLibdispatch::MessagePumpIOSForIOLibdispatch()
: queue_(dispatch_queue_create("org.chromium.io_thread.libdispatch_bridge",
DISPATCH_QUEUE_SERIAL)) {}
// Defaulted: teardown is left entirely to the members' own destructors.
MessagePumpIOSForIOLibdispatch::~MessagePumpIOSForIOLibdispatch() = default;
// Records the default task runner of the sequence that runs the pump, then
// delegates to the MessagePumpNSRunLoop implementation. WatchFileDescriptor()
// and WatchMachReceivePort() CHECK that this runner has been set, so they are
// only usable once the pump is running.
void MessagePumpIOSForIOLibdispatch::DoRun(Delegate* delegate) {
io_thread_task_runner_ = SequencedTaskRunner::GetCurrentDefault();
MessagePumpNSRunLoop::DoRun(delegate);
}
// Begins watching `fd` for the events selected by `mode` (WATCH_READ,
// WATCH_WRITE or WATCH_READ_WRITE), delivering notifications to `watcher`.
// `controller` owns the resulting watch and can be used to stop it.
// `persistent` selects between repeated and one-shot notification.
//
// Must be called on the IO sequence after DoRun() has recorded the task
// runner. This implementation always returns true.
bool MessagePumpIOSForIOLibdispatch::WatchFileDescriptor(
int fd,
bool persistent,
int mode,
FdWatchController* controller,
FdWatcher* watcher) {
// A hard CHECK: without the task runner there is nowhere to post events.
CHECK(io_thread_task_runner_);
DCHECK(io_thread_task_runner_->RunsTasksInCurrentSequence());
DCHECK_GE(fd, 0);
DCHECK(controller);
DCHECK(watcher);
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
// The controller creates and resumes the dispatch source(s) on `queue_`.
controller->Init(io_thread_task_runner_, queue_.get(), fd, persistent, mode,
watcher);
return true;
}
// Begins watching the Mach receive right `port`, delivering incoming-message
// notifications to `watcher`. `controller` owns the resulting watch and can be
// used to stop it.
//
// Must be called on the IO sequence after DoRun() has recorded the task
// runner. This implementation always returns true.
bool MessagePumpIOSForIOLibdispatch::WatchMachReceivePort(
mach_port_t port,
MachPortWatchController* controller,
MachPortWatcher* watcher) {
// A hard CHECK: without the task runner there is nowhere to post events.
CHECK(io_thread_task_runner_);
DCHECK(io_thread_task_runner_->RunsTasksInCurrentSequence());
DCHECK_NE(port, static_cast<mach_port_t>(MACH_PORT_NULL));
DCHECK(controller);
DCHECK(watcher);
// The controller creates and resumes the dispatch source on `queue_`.
controller->Init(io_thread_task_runner_, queue_.get(), port, watcher);
return true;
}
} // namespace base

@ -0,0 +1,147 @@
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_MESSAGE_LOOP_MESSAGE_PUMP_IO_IOS_LIBDISPATCH_H_
#define BASE_MESSAGE_LOOP_MESSAGE_PUMP_IO_IOS_LIBDISPATCH_H_
#include <dispatch/dispatch.h>
#include <mach/mach.h>
#include <atomic>
#include <memory>
#include "base/apple/dispatch_source.h"
#include "base/apple/scoped_dispatch_object.h"
#include "base/base_export.h"
#include "base/location.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump_apple.h"
#include "base/message_loop/watchable_io_message_pump_posix.h"
#include "base/task/sequenced_task_runner.h"
namespace base {
// This file introduces a class to monitor sockets and issue callbacks when
// sockets are ready for I/O on iOS using libdispatch as the backing
// monitoring service.
// NSRunLoop-based message pump that can additionally watch file descriptors
// and Mach receive rights. Each watch is backed by an apple::DispatchSource
// created on `queue_`; the source's handler posts a task to
// `io_thread_task_runner_`, and the watcher callback runs from that task.
class BASE_EXPORT MessagePumpIOSForIOLibdispatch
: public MessagePumpNSRunLoop,
public WatchableIOMessagePumpPosix {
public:
// Owns one file-descriptor watch started through WatchFileDescriptor().
class FdWatchController : public FdWatchControllerInterface {
public:
explicit FdWatchController(const Location& location);
FdWatchController(const FdWatchController&) = delete;
FdWatchController& operator=(const FdWatchController&) = delete;
// Implicitly calls StopWatchingFileDescriptor.
~FdWatchController() override;
// FdWatchControllerInterface:
// Drops the watcher and both dispatch sources; always returns true.
bool StopWatchingFileDescriptor() override;
private:
// The pump starts watches via Init(); the test fixture calls
// HandleRead()/HandleWrite() directly to spoof events.
friend class MessagePumpIOSForIOLibdispatch;
friend class MessagePumpIOSForIOLibdispatchFdTest;
// Creates and resumes the read and/or write dispatch source for `fd`
// according to `mode`, reporting events to `watcher`.
void Init(
const scoped_refptr<base::SequencedTaskRunner>& io_thread_task_runner,
dispatch_queue_t queue,
int fd,
bool persistent,
int mode,
FdWatcher* watcher);
// Run on the IO sequence: notify the watcher, then resume the matching
// source if the watch is persistent and still active.
void HandleRead();
void HandleWrite();
bool is_persistent_ = false; // false if this event is one-shot.
// Receiver of the callbacks; null while no watch is active.
raw_ptr<FdWatcher> watcher_ = nullptr;
// Watched descriptor, or -1 while idle. Atomic because the dispatch source
// handlers read it as their "has the watch been stopped?" check.
std::atomic<int> fd_ = -1;
// Only the sources requested by `mode` are created.
std::unique_ptr<apple::DispatchSource> dispatch_source_read_;
std::unique_ptr<apple::DispatchSource> dispatch_source_write_;
// Sequence on which HandleRead()/HandleWrite() are run.
scoped_refptr<SequencedTaskRunner> io_thread_task_runner_;
// Lets posted tasks detect that the controller has been destroyed.
base::WeakPtrFactory<FdWatchController> weak_factory_{this};
};
// Delegate interface that provides notifications of Mach message receive
// events.
class MachPortWatcher {
public:
virtual ~MachPortWatcher() = default;
// Called on the IO sequence with the port that was passed to
// WatchMachReceivePort().
virtual void OnMachMessageReceived(mach_port_t port) = 0;
};
// Controller interface that is used to stop receiving events for an
// installed MachPortWatcher.
class MachPortWatchController {
public:
explicit MachPortWatchController(const Location& location);
MachPortWatchController(const MachPortWatchController&) = delete;
MachPortWatchController& operator=(const MachPortWatchController&) = delete;
// Implicitly calls StopWatchingMachPort.
~MachPortWatchController();
// Drops the watcher and the dispatch source; always returns true.
bool StopWatchingMachPort();
protected:
friend class MessagePumpIOSForIOLibdispatch;
// Creates and resumes the dispatch source for `port`, reporting events to
// `watcher`.
void Init(
const scoped_refptr<base::SequencedTaskRunner>& io_thread_task_runner,
dispatch_queue_t queue,
mach_port_t port,
MachPortWatcher* watcher);
// Run on the IO sequence: notify the watcher, then resume the source if the
// watch is still active.
void HandleReceive();
private:
// Watched port, or MACH_PORT_NULL while idle. Atomic because the dispatch
// source handler reads it as its "has the watch been stopped?" check.
std::atomic<mach_port_t> port_ = MACH_PORT_NULL;
// Receiver of the callbacks; null while no watch is active.
raw_ptr<MachPortWatcher> watcher_ = nullptr;
std::unique_ptr<apple::DispatchSource> dispatch_source_;
// Sequence on which HandleReceive() is run.
scoped_refptr<SequencedTaskRunner> io_thread_task_runner_;
// Lets posted tasks detect that the controller has been destroyed.
base::WeakPtrFactory<MachPortWatchController> weak_factory_{this};
};
MessagePumpIOSForIOLibdispatch();
MessagePumpIOSForIOLibdispatch(const MessagePumpIOSForIOLibdispatch&) =
delete;
MessagePumpIOSForIOLibdispatch& operator=(
const MessagePumpIOSForIOLibdispatch&) = delete;
~MessagePumpIOSForIOLibdispatch() override;
// Captures the running sequence's task runner before entering the run loop.
void DoRun(Delegate* delegate) override;
// Begins watching `fd` for the events selected by `mode`. The `controller`
// can be used to stop watching, and notifications are delivered to the
// `watcher`. If `persistent` is false the watch is one-shot. This
// implementation always returns true.
bool WatchFileDescriptor(int fd,
bool persistent,
int mode,
FdWatchController* controller,
FdWatcher* watcher);
// Begins watching the Mach receive right named by `port`. The `controller`
// can be used to stop watching for incoming messages, and new message
// notifications are delivered to the `watcher`. This implementation always
// returns true.
bool WatchMachReceivePort(mach_port_t port,
MachPortWatchController* controller,
MachPortWatcher* watcher);
private:
friend class MessagePumpIOSForIOLibdispatchFdTest;
// Set by DoRun(); the Watch* methods CHECK that it is non-null.
scoped_refptr<base::SequencedTaskRunner> io_thread_task_runner_;
// Serial queue shared by every dispatch source this pump creates.
apple::ScopedDispatchObject<dispatch_queue_t> queue_;
};
} // namespace base
#endif // BASE_MESSAGE_LOOP_MESSAGE_PUMP_IO_IOS_LIBDISPATCH_H_

@ -0,0 +1,378 @@
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/message_loop/message_pump_io_ios_libdispatch.h"
#include <unistd.h>
#include "base/logging.h"
#include "base/message_loop/message_pump_for_io.h"
#include "base/posix/eintr_wrapper.h"
#include "base/task/single_thread_task_executor.h"
#include "base/test/bind.h"
#include "base/test/gtest_util.h"
#include "base/test/task_environment.h"
#include "base/threading/thread.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
class MessagePumpIOSForIOLibdispatchFdTest : public testing::Test {
public:
MessagePumpIOSForIOLibdispatchFdTest(
const MessagePumpIOSForIOLibdispatchFdTest&) = delete;
MessagePumpIOSForIOLibdispatchFdTest& operator=(
const MessagePumpIOSForIOLibdispatchFdTest&) = delete;
protected:
MessagePumpIOSForIOLibdispatchFdTest()
: pump_(new MessagePumpIOSForIOLibdispatch()),
executor_(WrapUnique(pump_.get())) {}
~MessagePumpIOSForIOLibdispatchFdTest() override = default;
MessagePumpIOSForIOLibdispatch* pump() { return pump_; }
void SetUp() override {
int ret = pipe(pipefds_);
ASSERT_EQ(0, ret);
ret = pipe(alternate_pipefds_);
ASSERT_EQ(0, ret);
}
void TearDown() override {
if (IGNORE_EINTR(close(pipefds_[0])) < 0) {
PLOG(ERROR) << "close";
}
if (IGNORE_EINTR(close(pipefds_[1])) < 0) {
PLOG(ERROR) << "close";
}
}
void HandleFdIOWriteEvent(
MessagePumpIOSForIOLibdispatch::FdWatchController* watcher) {
watcher->HandleWrite();
}
void HandleFdIOReadEvent(
MessagePumpIOSForIOLibdispatch::FdWatchController* watcher) {
watcher->HandleRead();
}
int pipefds_[2];
int alternate_pipefds_[2];
private:
raw_ptr<MessagePumpIOSForIOLibdispatch> pump_;
SingleThreadTaskExecutor executor_;
};
namespace {
// Common base for the FdWatcher test doubles. It remembers the controller it
// is attached to and treats every callback as unexpected; subclasses override
// only the callbacks a given test is meant to receive.
class BaseWatcher : public MessagePumpIOSForIOLibdispatch::FdWatcher {
 public:
  // `controller` must be non-null. Marked explicit so a controller pointer is
  // never silently converted into a watcher.
  explicit BaseWatcher(
      MessagePumpIOSForIOLibdispatch::FdWatchController* controller)
      : controller_(controller) {
    CHECK(controller_);
  }
  ~BaseWatcher() override = default;

  // MessagePumpIOSForIOLibdispatch::FdWatcher interface
  void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
  void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }

 protected:
  raw_ptr<MessagePumpIOSForIOLibdispatch::FdWatchController> controller_;
};
// FdWatcher that deletes its controller from inside the read/write callback
// and then runs `callback`. Used to check that the controller survives being
// destroyed while one of its own handlers is on the stack.
class DeleteWatcher : public BaseWatcher {
 public:
  explicit DeleteWatcher(
      MessagePumpIOSForIOLibdispatch::FdWatchController* controller,
      RepeatingClosure callback)
      : BaseWatcher(controller), callback_(callback) {}

  // The controller must have been deleted by one of the callbacks by now.
  ~DeleteWatcher() override { CHECK(!controller_); }

  void OnFileCanWriteWithoutBlocking(int /* fd */) override {
    DestroyControllerAndNotify();
  }

  void OnFileCanReadWithoutBlocking(int /* fd */) override {
    DestroyControllerAndNotify();
  }

 private:
  // Shared body of both callbacks: free the controller exactly once, forget
  // the pointer, then signal the test.
  void DestroyControllerAndNotify() {
    CHECK(controller_);
    delete controller_;
    controller_ = nullptr;
    callback_.Run();
  }

  RepeatingClosure callback_;
};
// A persistent write watch must tolerate its watcher deleting the controller
// from inside OnFileCanWriteWithoutBlocking().
TEST_F(MessagePumpIOSForIOLibdispatchFdTest, DeleteWritePersistentWatcher) {
  // Freed by `deleter` during the spoofed callback below.
  auto* controller =
      new MessagePumpIOSForIOLibdispatch::FdWatchController(FROM_HERE);
  RunLoop loop;
  DeleteWatcher deleter(controller, loop.QuitClosure());

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        pump()->WatchFileDescriptor(pipefds_[1], /*persistent=*/true,
                                    MessagePumpForIO::WATCH_WRITE, controller,
                                    &deleter);
        // Spoof a callback.
        HandleFdIOWriteEvent(controller);
      }));
  loop.Run();
}
// A one-shot write watch must tolerate its watcher deleting the controller
// from inside OnFileCanWriteWithoutBlocking().
TEST_F(MessagePumpIOSForIOLibdispatchFdTest, DeleteWriteWatcher) {
  // Freed by `deleter` during the spoofed callback below.
  auto* controller =
      new MessagePumpIOSForIOLibdispatch::FdWatchController(FROM_HERE);
  RunLoop loop;
  DeleteWatcher deleter(controller, loop.QuitClosure());

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        pump()->WatchFileDescriptor(pipefds_[1], /*persistent=*/false,
                                    MessagePumpForIO::WATCH_WRITE, controller,
                                    &deleter);
        // Spoof a callback.
        HandleFdIOWriteEvent(controller);
      }));
  loop.Run();
}
// A persistent read watch must tolerate its watcher deleting the controller
// from inside OnFileCanReadWithoutBlocking().
TEST_F(MessagePumpIOSForIOLibdispatchFdTest, DeleteReadPersistentWatcher) {
  // Freed by `deleter` during the spoofed callback below.
  auto* controller =
      new MessagePumpIOSForIOLibdispatch::FdWatchController(FROM_HERE);
  RunLoop loop;
  DeleteWatcher deleter(controller, loop.QuitClosure());

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        pump()->WatchFileDescriptor(pipefds_[1], /*persistent=*/true,
                                    MessagePumpForIO::WATCH_READ, controller,
                                    &deleter);
        // Spoof a callback.
        HandleFdIOReadEvent(controller);
      }));
  loop.Run();
}
// A one-shot read watch must tolerate its watcher deleting the controller
// from inside OnFileCanReadWithoutBlocking().
TEST_F(MessagePumpIOSForIOLibdispatchFdTest, DeleteReadWatcher) {
  // Freed by `deleter` during the spoofed callback below.
  auto* controller =
      new MessagePumpIOSForIOLibdispatch::FdWatchController(FROM_HERE);
  RunLoop loop;
  DeleteWatcher deleter(controller, loop.QuitClosure());

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        pump()->WatchFileDescriptor(pipefds_[1], /*persistent=*/false,
                                    MessagePumpForIO::WATCH_READ, controller,
                                    &deleter);
        // Spoof a callback.
        HandleFdIOReadEvent(controller);
      }));
  loop.Run();
}
// FdWatcher that stops its controller from inside the write callback and,
// when given a non-negative `fd_to_start_watching`, immediately reuses the
// same controller to watch that descriptor instead. Runs `callback` when done.
class StopWatcher : public BaseWatcher {
 public:
  StopWatcher(MessagePumpIOSForIOLibdispatch::FdWatchController* controller,
              MessagePumpIOSForIOLibdispatch* pump,
              RepeatingClosure callback,
              int fd_to_start_watching = -1)
      : BaseWatcher(controller),
        pump_(pump),
        callback_(callback),
        fd_to_start_watching_(fd_to_start_watching) {}
  ~StopWatcher() override = default;

  void OnFileCanWriteWithoutBlocking(int /* fd */) override {
    controller_->StopWatchingFileDescriptor();

    // A negative descriptor means "just stop"; otherwise re-arm the same
    // controller on the replacement descriptor.
    const bool rewatch = fd_to_start_watching_ >= 0;
    if (rewatch) {
      pump_->WatchFileDescriptor(fd_to_start_watching_, /*persistent=*/false,
                                 MessagePumpForIO::WATCH_READ_WRITE,
                                 controller_, this);
    }

    callback_.Run();
  }

 private:
  raw_ptr<MessagePumpIOSForIOLibdispatch> pump_;
  RepeatingClosure callback_;
  int fd_to_start_watching_;
};
// Stopping the watch from inside the write callback must be safe.
TEST_F(MessagePumpIOSForIOLibdispatchFdTest, StopWatcher) {
  MessagePumpIOSForIOLibdispatch::FdWatchController controller(FROM_HERE);
  RunLoop loop;
  StopWatcher stopper(&controller, pump(), loop.QuitClosure());

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        pump()->WatchFileDescriptor(pipefds_[1], /*persistent=*/false,
                                    MessagePumpForIO::WATCH_READ_WRITE,
                                    &controller, &stopper);
        // Spoof a callback.
        HandleFdIOWriteEvent(&controller);
      }));
  loop.Run();
}
// Stopping the watch from inside the write callback and immediately reusing
// the controller for a different descriptor must be safe.
TEST_F(MessagePumpIOSForIOLibdispatchFdTest, StopWatcherAndWatchSomethingElse) {
  MessagePumpIOSForIOLibdispatch::FdWatchController controller(FROM_HERE);
  RunLoop loop;
  StopWatcher stopper(&controller, pump(), loop.QuitClosure(),
                      alternate_pipefds_[1]);

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        pump()->WatchFileDescriptor(pipefds_[1], /*persistent=*/false,
                                    MessagePumpForIO::WATCH_READ_WRITE,
                                    &controller, &stopper);
        // Spoof a callback.
        HandleFdIOWriteEvent(&controller);
      }));
  loop.Run();
}
// Fixture for the Mach-port tests. It owns a pump (through the task executor)
// and provides helpers for creating a port and sending it an empty message.
class MessagePumpIOSForIOLibdispatchMachPortTest : public testing::Test {
 public:
  // `executor_` takes ownership of the pump; `pump_` is a non-owning alias.
  MessagePumpIOSForIOLibdispatchMachPortTest()
      : pump_(new MessagePumpIOSForIOLibdispatch()),
        executor_(WrapUnique(pump_.get())) {}

  MessagePumpIOSForIOLibdispatch* pump() { return pump_; }

  // Constructs a new port into `*receive` with MPO_INSERT_SEND_RIGHT set, and
  // wraps the same port name in `*send`. Records a fatal test failure if the
  // port cannot be constructed.
  static void CreatePortPair(apple::ScopedMachReceiveRight* receive,
                             apple::ScopedMachSendRight* send) {
    mach_port_options_t options{};
    options.flags = MPO_INSERT_SEND_RIGHT;
    // An unused local ScopedMachReceiveRight used to be declared here; it has
    // been removed because the port is constructed straight into `*receive`.
    kern_return_t kr = mach_port_construct(
        mach_task_self(), &options, 0,
        apple::ScopedMachReceiveRight::Receiver(*receive).get());
    ASSERT_EQ(kr, KERN_SUCCESS);
    *send = apple::ScopedMachSendRight(receive->get());
  }

  // Sends a header-only message with id `msgid` to `remote_port`, using a
  // copy of the caller's send right. Returns the mach_msg_send() result.
  static mach_msg_return_t SendEmptyMessage(mach_port_t remote_port,
                                            mach_msg_id_t msgid) {
    mach_msg_empty_send_t message{};
    message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
    message.header.msgh_size = sizeof(message);
    message.header.msgh_remote_port = remote_port;
    message.header.msgh_id = msgid;
    return mach_msg_send(&message.header);
  }

 private:
  raw_ptr<MessagePumpIOSForIOLibdispatch> pump_;
  SingleThreadTaskExecutor executor_;
};
// MachPortWatcher test double: receives the pending message from the port,
// records its header in `messages_`, and then runs `callback`.
class MachPortWatcher : public MessagePumpIOSForIOLibdispatch::MachPortWatcher {
 public:
  // Marked explicit so a closure is never silently converted into a watcher.
  explicit MachPortWatcher(RepeatingClosure callback)
      : callback_(std::move(callback)) {}
  ~MachPortWatcher() override = default;

  // MessagePumpIOSForIOLibdispatch::MachPortWatcher interface
  void OnMachMessageReceived(mach_port_t port) override {
    mach_msg_empty_rcv_t message{};
    // MACH_RCV_TIMEOUT with a zero timeout: never block waiting for a message.
    kern_return_t kr =
        mach_msg(&message.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
                 sizeof(message), port, 0, MACH_PORT_NULL);
    // Nothing was received, so there is nothing to record; `callback_` is
    // deliberately not run in this case.
    if (kr != KERN_SUCCESS) {
      return;
    }
    messages_.push_back(message.header);
    callback_.Run();
  }

  // Headers of every message received so far, in arrival order.
  std::vector<mach_msg_header_t> messages_;

 private:
  RepeatingClosure callback_;
};
// A message queued on a port before the watch starts must be delivered to the
// watcher exactly once, carrying the expected port and message id.
TEST_F(MessagePumpIOSForIOLibdispatchMachPortTest, PortWatcher) {
  apple::ScopedMachReceiveRight receive_right;
  apple::ScopedMachSendRight send_right;
  CreatePortPair(&receive_right, &send_right);

  const mach_msg_id_t message_id = 'helo';

  RunLoop loop;
  MachPortWatcher recorder(loop.QuitClosure());
  MessagePumpIOSForIOLibdispatch::MachPortWatchController controller(FROM_HERE);

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        const mach_msg_return_t send_result =
            SendEmptyMessage(receive_right.get(), message_id);
        EXPECT_EQ(send_result, KERN_SUCCESS);
        // Without a queued message the watcher would never fire; make sure
        // the loop still terminates.
        if (send_result != KERN_SUCCESS) {
          loop.Quit();
        }
        pump()->WatchMachReceivePort(receive_right.get(), &controller,
                                     &recorder);
      }));
  loop.Run();

  ASSERT_EQ(1u, recorder.messages_.size());
  const mach_msg_header_t& received = recorder.messages_[0];
  EXPECT_EQ(receive_right.get(), received.msgh_local_port);
  EXPECT_EQ(message_id, received.msgh_id);
}
// MachPortWatcher test double that deletes its controller from inside the
// receive callback and then runs `callback`. The pending message itself is
// not read.
class DeleteMachPortWatcher
    : public MessagePumpIOSForIOLibdispatch::MachPortWatcher {
 public:
  DeleteMachPortWatcher(
      MessagePumpIOSForIOLibdispatch::MachPortWatchController* controller,
      RepeatingClosure callback)
      : controller_(controller), callback_(std::move(callback)) {}
  ~DeleteMachPortWatcher() override = default;

  // MessagePumpIOSForIOLibdispatch::MachPortWatcher interface
  void OnMachMessageReceived(mach_port_t /* port */) override {
    // Free the controller exactly once, forget the pointer, then signal the
    // test.
    CHECK(controller_);
    delete controller_;
    controller_ = nullptr;
    callback_.Run();
  }

 private:
  raw_ptr<MessagePumpIOSForIOLibdispatch::MachPortWatchController> controller_;
  RepeatingClosure callback_;
};
// A Mach-port watch must tolerate its watcher deleting the controller from
// inside OnMachMessageReceived().
TEST_F(MessagePumpIOSForIOLibdispatchMachPortTest, DeletePortWatcher) {
  apple::ScopedMachReceiveRight receive_right;
  apple::ScopedMachSendRight send_right;
  CreatePortPair(&receive_right, &send_right);

  const mach_msg_id_t message_id = 'helo';

  RunLoop loop;
  // Freed by `deleter` when the message notification arrives.
  auto* controller =
      new MessagePumpIOSForIOLibdispatch::MachPortWatchController(FROM_HERE);
  DeleteMachPortWatcher deleter(controller, loop.QuitClosure());

  const auto runner = SingleThreadTaskRunner::GetCurrentDefault();
  runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        const mach_msg_return_t send_result =
            SendEmptyMessage(receive_right.get(), message_id);
        EXPECT_EQ(send_result, KERN_SUCCESS);
        // Without a queued message the watcher would never fire; make sure
        // the loop still terminates.
        if (send_result != KERN_SUCCESS) {
          loop.Quit();
        }
        pump()->WatchMachReceivePort(receive_right.get(), controller,
                                     &deleter);
      }));
  loop.Run();
}
} // namespace
} // namespace base

@ -63,7 +63,12 @@ constexpr mach_msg_id_t kChannelMacOOLMsgId = 'MOJ+';
class ChannelMac : public Channel,
public base::CurrentThread::DestructionObserver,
public base::MessagePumpKqueue::MachPortWatcher {
#if BUILDFLAG(IS_IOS) && BUILDFLAG(USE_BLINK)
public base::MessagePumpIOSForIOLibdispatch::MachPortWatcher
#else
public base::MessagePumpKqueue::MachPortWatcher
#endif
{
public:
ChannelMac(Delegate* delegate,
ConnectionParams connection_params,