0

[mojo] Use RunOrPostTask to send message on channel-assoc. interface.

RunOrPostTask provides the same ordering and mutual exclusion
guarantees as PostTask, but it may run its callback synchronously.
to reduce scheduling overhead. It's appropriate for short tasks (since
the task runs synchronously, the RunOrPostTask call may take as long
as running the callback).

The change is behind a base::Feature
("MojoChannelAssociatedSendUsesRunOrPostTask") to allow measuring its
impact or disabling it completely in case problems are found.

Bug: 1503967
Change-Id: I226032e3771741b674bb9b97cdfa635d76e3addf
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5063755
Reviewed-by: Ken Rockot <rockot@google.com>
Commit-Queue: Francois Pierre Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1243073}
This commit is contained in:
François Doray
2024-01-04 20:22:53 +00:00
committed by Chromium LUCI CQ
parent 5883be52c7
commit fad1b8bc2e
2 changed files with 48 additions and 25 deletions

@ -22,14 +22,17 @@ class TimerBase;
class TimerBasedTickProvider;
class WebRtcTaskQueue;
}
namespace webrtc {
class ThreadWrapper;
} // namespace webrtc
namespace IPC {
class ChannelAssociatedGroupController;
} // namespace IPC
namespace media {
class AlsaPcmOutputStream;
class AlsaPcmInputStream;
class FakeAudioWorker;
} // namespace media
namespace webrtc {
class ThreadWrapper;
} // namespace webrtc
namespace base {
@ -71,6 +74,7 @@ class RunOrPostTaskPassKey {
// Avoid =default to disallow creation by uniform initialization.
RunOrPostTaskPassKey() {}
friend class IPC::ChannelAssociatedGroupController;
friend class RunOrPostTaskPassKeyForTesting;
};

@ -17,6 +17,7 @@
#include "base/check_op.h"
#include "base/containers/circular_deque.h"
#include "base/containers/contains.h"
#include "base/feature_list.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/memory/ptr_util.h"
@ -54,12 +55,16 @@
namespace IPC {
namespace {
class ChannelAssociatedGroupController;
namespace {
ABSL_CONST_INIT thread_local bool off_sequence_binding_allowed = false;
BASE_FEATURE(kMojoChannelAssociatedSendUsesRunOrPostTask,
"MojoChannelAssociatedSendUsesRunOrPostTask",
base::FEATURE_ENABLED_BY_DEFAULT);
// Used to track some internal Channel state in pursuit of message leaks.
//
// TODO(https://crbug.com/813045): Remove this.
@ -154,6 +159,8 @@ class ScopedUrgentMessageNotification {
raw_ptr<UrgentMessageObserver> observer_;
};
} // namespace
class ChannelAssociatedGroupController
: public mojo::AssociatedGroupController,
public mojo::MessageReceiver,
@ -841,30 +848,40 @@ class ChannelAssociatedGroupController
DCHECK(message->heap_profiler_tag());
if (task_runner_->BelongsToCurrentThread()) {
DCHECK(thread_checker_.CalledOnValidThread());
if (!connector_ || paused_) {
if (!shut_down_) {
base::AutoLock lock(outgoing_messages_lock_);
outgoing_messages_.emplace_back(std::move(*message));
}
return true;
}
return connector_->Accept(message);
} else {
// We always post tasks to the primary endpoint thread when called from
// other threads in order to simulate IPC::ChannelProxy::Send behavior.
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
&ChannelAssociatedGroupController::SendMessageOnPrimaryThread,
this, std::move(*message)));
return true;
return SendMessageOnSequence(message);
}
// PostTask (or RunOrPostTask) so that `message` is sent after messages from
// tasks that are already queued (e.g. by `IPC::ChannelProxy::Send`).
auto callback = base::BindOnce(
&ChannelAssociatedGroupController::SendMessageOnSequenceViaTask, this,
std::move(*message));
if (base::FeatureList::IsEnabled(
kMojoChannelAssociatedSendUsesRunOrPostTask)) {
task_runner_->RunOrPostTask(base::subtle::RunOrPostTaskPassKey(),
FROM_HERE, std::move(callback));
} else {
task_runner_->PostTask(FROM_HERE, std::move(callback));
}
return true;
}
void SendMessageOnPrimaryThread(mojo::Message message) {
DCHECK(thread_checker_.CalledOnValidThread());
if (!SendMessage(&message))
bool SendMessageOnSequence(mojo::Message* message) {
if (!connector_ || paused_) {
if (!shut_down_) {
base::AutoLock lock(outgoing_messages_lock_);
outgoing_messages_.emplace_back(std::move(*message));
}
return true;
}
return connector_->Accept(message);
}
void SendMessageOnSequenceViaTask(mojo::Message message) {
if (!SendMessageOnSequence(&message)) {
RaiseError();
}
}
void OnPipeError() {
@ -1250,6 +1267,8 @@ class ChannelAssociatedGroupController
raw_ptr<UrgentMessageObserver> urgent_message_observer_ = nullptr;
};
namespace {
bool ControllerMemoryDumpProvider::OnMemoryDump(
const base::trace_event::MemoryDumpArgs& args,
base::trace_event::ProcessMemoryDump* pmd) {