0

Push message pipe back pressure back to IPC::ChannelProxy.

This CL implements mojo::internal::MessageQuotaChecker, that allows
unread message quota checking to be performed at send-time in
IPC::ChannelProxy. This in turn allows generating crash dumps with
the abusive producer on the call stack, red handed.

Bug: 1017827
Change-Id: Ib0a7f5cde2c9df00c89bb79834c1cffa211fa6f3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1899909
Reviewed-by: Scott Violet <sky@chromium.org>
Reviewed-by: Ken Rockot <rockot@google.com>
Commit-Queue: Sigurður Ásgeirsson <siggi@chromium.org>
Cr-Commit-Position: refs/heads/master@{#714591}
This commit is contained in:
Sigurdur Asgeirsson
2019-11-12 19:32:20 +00:00
committed by Commit Bot
parent ed5faa8bd2
commit d655dd65fc
21 changed files with 627 additions and 126 deletions

@ -31,6 +31,7 @@
#include "ipc/ipc_channel_mojo.h"
#include "ipc/ipc_logging.h"
#include "ipc/message_filter.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "services/resource_coordinator/public/mojom/memory_instrumentation/constants.mojom.h"
#include "services/service_manager/public/cpp/interface_provider.h"
@ -157,7 +158,8 @@ void ChildProcessHostImpl::CreateChannelMojo() {
BindInterface(IPC::mojom::ChannelBootstrap::Name_, std::move(pipe.handle1));
channel_ = IPC::ChannelMojo::Create(
std::move(pipe.handle0), IPC::Channel::MODE_SERVER, this,
base::ThreadTaskRunnerHandle::Get(), base::ThreadTaskRunnerHandle::Get());
base::ThreadTaskRunnerHandle::Get(), base::ThreadTaskRunnerHandle::Get(),
mojo::internal::MessageQuotaChecker::MaybeCreate());
DCHECK(channel_);
bool initialized = InitChannel();

@ -21,6 +21,7 @@
#include "gpu/ipc/common/gpu_watchdog_timeout.h"
#include "ipc/ipc_channel_mojo.h"
#include "ipc/ipc_sync_message.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "url/gurl.h"
using base::AutoLock;
@ -274,11 +275,13 @@ operator=(OrderingBarrierInfo&&) = default;
GpuChannelHost::Listener::Listener(
mojo::ScopedMessagePipeHandle handle,
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner)
: channel_(IPC::ChannelMojo::Create(std::move(handle),
IPC::Channel::MODE_CLIENT,
this,
io_task_runner,
base::ThreadTaskRunnerHandle::Get())) {
: channel_(IPC::ChannelMojo::Create(
std::move(handle),
IPC::Channel::MODE_CLIENT,
this,
io_task_runner,
base::ThreadTaskRunnerHandle::Get(),
mojo::internal::MessageQuotaChecker::MaybeCreate())) {
DCHECK(channel_);
DCHECK(io_task_runner->BelongsToCurrentThread());
bool result = channel_->Connect();

@ -5,6 +5,7 @@
#include "build/build_config.h"
#include "ipc/ipc_channel.h"
#include "ipc/ipc_channel_mojo.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "mojo/public/cpp/system/message_pipe.h"
namespace IPC {
@ -39,7 +40,8 @@ std::unique_ptr<Channel> Channel::CreateClient(
return ChannelMojo::Create(
mojo::ScopedMessagePipeHandle(channel_handle.mojo_handle),
Channel::MODE_CLIENT, listener, ipc_task_runner,
base::ThreadTaskRunnerHandle::Get());
base::ThreadTaskRunnerHandle::Get(),
mojo::internal::MessageQuotaChecker::MaybeCreate());
#endif
}
@ -55,7 +57,8 @@ std::unique_ptr<Channel> Channel::CreateServer(
return ChannelMojo::Create(
mojo::ScopedMessagePipeHandle(channel_handle.mojo_handle),
Channel::MODE_SERVER, listener, ipc_task_runner,
base::ThreadTaskRunnerHandle::Get());
base::ThreadTaskRunnerHandle::Get(),
mojo::internal::MessageQuotaChecker::MaybeCreate());
#endif
}

@ -2,10 +2,11 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipc/ipc_channel_factory.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "ipc/ipc_channel_factory.h"
#include "ipc/ipc_channel_mojo.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
namespace IPC {
@ -17,7 +18,10 @@ class PlatformChannelFactory : public ChannelFactory {
ChannelHandle handle,
Channel::Mode mode,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
: handle_(handle), mode_(mode), ipc_task_runner_(ipc_task_runner) {}
: handle_(handle),
mode_(mode),
ipc_task_runner_(ipc_task_runner),
quota_checker_(mojo::internal::MessageQuotaChecker::MaybeCreate()) {}
std::unique_ptr<Channel> BuildChannel(Listener* listener) override {
#if defined(OS_NACL_SFI)
@ -26,7 +30,7 @@ class PlatformChannelFactory : public ChannelFactory {
DCHECK(handle_.is_mojo_channel_handle());
return ChannelMojo::Create(
mojo::ScopedMessagePipeHandle(handle_.mojo_handle), mode_, listener,
ipc_task_runner_, base::ThreadTaskRunnerHandle::Get());
ipc_task_runner_, base::ThreadTaskRunnerHandle::Get(), quota_checker_);
#endif
}
@ -34,10 +38,16 @@ class PlatformChannelFactory : public ChannelFactory {
return ipc_task_runner_;
}
scoped_refptr<mojo::internal::MessageQuotaChecker> GetQuotaChecker()
override {
return quota_checker_;
}
private:
ChannelHandle handle_;
Channel::Mode mode_;
scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
scoped_refptr<mojo::internal::MessageQuotaChecker> quota_checker_;
DISALLOW_COPY_AND_ASSIGN(PlatformChannelFactory);
};

@ -14,6 +14,12 @@
#include "base/single_thread_task_runner.h"
#include "ipc/ipc_channel.h"
namespace mojo {
namespace internal {
class MessageQuotaChecker;
} // namespace internal
} // namespace mojo
namespace IPC {
// Encapsulates how a Channel is created. A ChannelFactory can be
@ -31,6 +37,8 @@ class COMPONENT_EXPORT(IPC) ChannelFactory {
virtual ~ChannelFactory() { }
virtual std::unique_ptr<Channel> BuildChannel(Listener* listener) = 0;
virtual scoped_refptr<base::SingleThreadTaskRunner> GetIPCTaskRunner() = 0;
virtual scoped_refptr<mojo::internal::MessageQuotaChecker>
GetQuotaChecker() = 0;
};
} // namespace IPC

@ -27,6 +27,7 @@
#include "ipc/ipc_mojo_handle_attachment.h"
#include "ipc/native_handle_type_converters.h"
#include "mojo/public/cpp/bindings/associated_remote.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "mojo/public/cpp/bindings/pending_associated_receiver.h"
#include "mojo/public/cpp/system/platform_handle.h"
@ -44,22 +45,30 @@ class MojoChannelFactory : public ChannelFactory {
: handle_(std::move(handle)),
mode_(mode),
ipc_task_runner_(ipc_task_runner),
proxy_task_runner_(proxy_task_runner) {}
proxy_task_runner_(proxy_task_runner),
quota_checker_(mojo::internal::MessageQuotaChecker::MaybeCreate()) {}
std::unique_ptr<Channel> BuildChannel(Listener* listener) override {
return ChannelMojo::Create(std::move(handle_), mode_, listener,
ipc_task_runner_, proxy_task_runner_);
ipc_task_runner_, proxy_task_runner_,
quota_checker_);
}
scoped_refptr<base::SingleThreadTaskRunner> GetIPCTaskRunner() override {
return ipc_task_runner_;
}
scoped_refptr<mojo::internal::MessageQuotaChecker> GetQuotaChecker()
override {
return quota_checker_;
}
private:
mojo::ScopedMessagePipeHandle handle_;
const Channel::Mode mode_;
scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
scoped_refptr<mojo::internal::MessageQuotaChecker> quota_checker_;
DISALLOW_COPY_AND_ASSIGN(MojoChannelFactory);
};
@ -86,9 +95,11 @@ std::unique_ptr<ChannelMojo> ChannelMojo::Create(
Mode mode,
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker) {
return base::WrapUnique(new ChannelMojo(std::move(handle), mode, listener,
ipc_task_runner, proxy_task_runner));
ipc_task_runner, proxy_task_runner,
quota_checker));
}
// static
@ -116,11 +127,12 @@ ChannelMojo::ChannelMojo(
Mode mode,
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker)
: task_runner_(ipc_task_runner), pipe_(handle.get()), listener_(listener) {
weak_ptr_ = weak_factory_.GetWeakPtr();
bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, ipc_task_runner,
proxy_task_runner);
proxy_task_runner, quota_checker);
}
void ChannelMojo::ForwardMessageFromThreadSafePtr(mojo::Message message) {

@ -51,7 +51,8 @@ class COMPONENT_EXPORT(IPC) ChannelMojo
Mode mode,
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner);
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker);
// Create a factory object for ChannelMojo.
// The factory is used to create Mojo-based ChannelProxy family.
@ -101,7 +102,8 @@ class COMPONENT_EXPORT(IPC) ChannelMojo
Mode mode,
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner);
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker);
void ForwardMessageFromThreadSafePtr(mojo::Message message);
void ForwardMessageWithResponderFromThreadSafePtr(

@ -224,6 +224,9 @@ void ChannelProxy::Context::Clear() {
// Called on the IPC::Channel thread
void ChannelProxy::Context::OnSendMessage(std::unique_ptr<Message> message) {
if (quota_checker_)
quota_checker_->AfterMessagesDequeued(1);
if (!channel_) {
OnChannelClosed();
return;
@ -419,6 +422,9 @@ void ChannelProxy::Context::AddGenericAssociatedInterfaceForIOThread(
}
void ChannelProxy::Context::Send(Message* message) {
if (quota_checker_)
quota_checker_->BeforeMessagesEnqueued(1);
ipc_task_runner()->PostTask(
FROM_HERE, base::BindOnce(&ChannelProxy::Context::OnSendMessage, this,
base::WrapUnique(message)));
@ -497,6 +503,9 @@ void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory,
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!did_init_);
DCHECK(!context_->quota_checker_);
context_->quota_checker_ = factory->GetQuotaChecker();
if (create_pipe_now) {
// Create the channel immediately. This effectively sets up the
// low-level pipe so that the client can connect. Without creating

@ -27,6 +27,7 @@
#include "mojo/public/cpp/bindings/associated_interface_ptr.h"
#include "mojo/public/cpp/bindings/associated_interface_request.h"
#include "mojo/public/cpp/bindings/associated_remote.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
#include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h"
@ -376,6 +377,9 @@ class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender {
std::unique_ptr<Channel> channel_;
bool channel_connected_called_;
// The quota checker associated with this channel, if any.
scoped_refptr<mojo::internal::MessageQuotaChecker> quota_checker_;
// Lock for |channel_| value. This is only relevant in the context of
// thread-safe send.
base::Lock channel_lifetime_lock_;

@ -120,9 +120,11 @@ class ChannelAssociatedGroupController
ChannelAssociatedGroupController(
bool set_interface_id_namespace_bit,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker)
: task_runner_(ipc_task_runner),
proxy_task_runner_(proxy_task_runner),
quota_checker_(quota_checker),
set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
dispatcher_(this),
control_message_handler_(this),
@ -172,6 +174,8 @@ class ChannelAssociatedGroupController
base::Unretained(this)));
connector_->set_enforce_errors_from_incoming_receiver(false);
connector_->SetWatcherHeapProfilerTag("IPC Channel");
if (quota_checker_)
connector_->SetMessageQuotaChecker(quota_checker_);
// Don't let the Connector do any sort of queuing on our behalf. Individual
// messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
@ -198,6 +202,9 @@ class ChannelAssociatedGroupController
base::AutoLock lock(outgoing_messages_lock_);
std::swap(outgoing_messages, outgoing_messages_);
}
if (quota_checker_ && outgoing_messages.size())
quota_checker_->AfterMessagesDequeued(outgoing_messages.size());
for (auto& message : outgoing_messages)
SendMessage(&message);
}
@ -243,6 +250,9 @@ class ChannelAssociatedGroupController
connector_.reset();
base::AutoLock lock(outgoing_messages_lock_);
if (quota_checker_ && outgoing_messages_.size())
quota_checker_->AfterMessagesDequeued(outgoing_messages_.size());
outgoing_messages_.clear();
}
@ -706,6 +716,8 @@ class ChannelAssociatedGroupController
if (!connector_ || paused_) {
if (!shut_down_) {
base::AutoLock lock(outgoing_messages_lock_);
if (quota_checker_)
quota_checker_->BeforeMessagesEnqueued(1);
outgoing_messages_.emplace_back(std::move(*message));
}
return true;
@ -983,6 +995,7 @@ class ChannelAssociatedGroupController
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
const scoped_refptr<mojo::internal::MessageQuotaChecker> quota_checker_;
const bool set_interface_id_namespace_bit_;
bool paused_ = false;
std::unique_ptr<mojo::Connector> connector_;
@ -1100,11 +1113,12 @@ std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
mojo::ScopedMessagePipeHandle handle,
Channel::Mode mode,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker) {
return std::make_unique<MojoBootstrapImpl>(
std::move(handle),
new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
ipc_task_runner, proxy_task_runner));
std::move(handle), new ChannelAssociatedGroupController(
mode == Channel::MODE_SERVER, ipc_task_runner,
proxy_task_runner, quota_checker));
}
} // namespace IPC

@ -19,6 +19,7 @@
#include "ipc/ipc_listener.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_remote.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "mojo/public/cpp/bindings/pending_associated_receiver.h"
#include "mojo/public/cpp/system/message_pipe.h"
@ -42,7 +43,8 @@ class COMPONENT_EXPORT(IPC) MojoBootstrap {
mojo::ScopedMessagePipeHandle handle,
Channel::Mode mode,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner);
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker);
// Start the handshake over the underlying message pipe.
virtual void Connect(

@ -113,7 +113,7 @@ TEST_F(IPCMojoBootstrapTest, Connect) {
IPC::MojoBootstrap::Create(
helper_.StartChild("IPCMojoBootstrapTestClient"),
IPC::Channel::MODE_SERVER, base::ThreadTaskRunnerHandle::Get(),
base::ThreadTaskRunnerHandle::Get()),
base::ThreadTaskRunnerHandle::Get(), nullptr),
kTestServerPid);
mojo::PendingAssociatedReceiver<IPC::mojom::Channel> receiver;
@ -138,7 +138,7 @@ MULTIPROCESS_TEST_MAIN_WITH_SETUP(
IPC::MojoBootstrap::Create(
std::move(mojo::core::test::MultiprocessTestHelper::primordial_pipe),
IPC::Channel::MODE_CLIENT, base::ThreadTaskRunnerHandle::Get(),
base::ThreadTaskRunnerHandle::Get()),
base::ThreadTaskRunnerHandle::Get(), nullptr),
kTestClientPid);
mojo::PendingAssociatedReceiver<IPC::mojom::Channel> receiver;
@ -159,7 +159,7 @@ TEST_F(IPCMojoBootstrapTest, ReceiveEmptyMessage) {
IPC::MojoBootstrap::Create(
helper_.StartChild("IPCMojoBootstrapTestEmptyMessage"),
IPC::Channel::MODE_SERVER, base::ThreadTaskRunnerHandle::Get(),
base::ThreadTaskRunnerHandle::Get()),
base::ThreadTaskRunnerHandle::Get(), nullptr),
kTestServerPid);
mojo::PendingAssociatedReceiver<IPC::mojom::Channel> receiver;
@ -186,7 +186,7 @@ MULTIPROCESS_TEST_MAIN_WITH_SETUP(
IPC::MojoBootstrap::Create(
std::move(mojo::core::test::MultiprocessTestHelper::primordial_pipe),
IPC::Channel::MODE_CLIENT, base::ThreadTaskRunnerHandle::Get(),
base::ThreadTaskRunnerHandle::Get()),
base::ThreadTaskRunnerHandle::Get(), nullptr),
kTestClientPid);
mojo::PendingAssociatedReceiver<IPC::mojom::Channel> receiver;

@ -38,9 +38,10 @@ void IPCChannelMojoTestBase::TearDown() {
}
void IPCChannelMojoTestBase::CreateChannel(IPC::Listener* listener) {
channel_ = IPC::ChannelMojo::Create(
TakeHandle(), IPC::Channel::MODE_SERVER, listener,
base::ThreadTaskRunnerHandle::Get(), base::ThreadTaskRunnerHandle::Get());
channel_ =
IPC::ChannelMojo::Create(TakeHandle(), IPC::Channel::MODE_SERVER,
listener, base::ThreadTaskRunnerHandle::Get(),
base::ThreadTaskRunnerHandle::Get(), nullptr);
}
bool IPCChannelMojoTestBase::ConnectChannel() {
@ -64,9 +65,10 @@ void IpcChannelMojoTestClient::Init(mojo::ScopedMessagePipeHandle handle) {
}
void IpcChannelMojoTestClient::Connect(IPC::Listener* listener) {
channel_ = IPC::ChannelMojo::Create(
std::move(handle_), IPC::Channel::MODE_CLIENT, listener,
base::ThreadTaskRunnerHandle::Get(), base::ThreadTaskRunnerHandle::Get());
channel_ =
IPC::ChannelMojo::Create(std::move(handle_), IPC::Channel::MODE_CLIENT,
listener, base::ThreadTaskRunnerHandle::Get(),
base::ThreadTaskRunnerHandle::Get(), nullptr);
CHECK(channel_->Connect());
}

@ -160,6 +160,8 @@ component("bindings") {
"lib/interface_ptr_state.h",
"lib/interface_serialization.h",
"lib/message_dispatcher.cc",
"lib/message_quota_checker.cc",
"lib/message_quota_checker.h",
"lib/multiplex_router.cc",
"lib/multiplex_router.h",
"lib/native_enum_data.h",

@ -18,7 +18,6 @@
#include "base/optional.h"
#include "base/sequence_checker.h"
#include "base/sequenced_task_runner.h"
#include "mojo/public/c/system/quota.h"
#include "mojo/public/cpp/bindings/connection_group.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
@ -32,6 +31,9 @@ class Lock;
}
namespace mojo {
namespace internal {
class MessageQuotaChecker;
}
// The Connector class is responsible for performing read/write operations on a
// MessagePipe. It writes messages it receives through the MessageReceiver
@ -197,6 +199,10 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// |tag| must be a const string literal.
void SetWatcherHeapProfilerTag(const char* tag);
// Sets the quota checker.
void SetMessageQuotaChecker(
scoped_refptr<internal::MessageQuotaChecker> checker);
// Allows testing environments to override the default serialization behavior
// of newly constructed Connector instances. Must be called before any
// Connector instances are constructed.
@ -315,10 +321,8 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
SEQUENCE_CHECKER(sequence_checker_);
// If this instance was selected for unread message measurement, contains
// the max send quota usage seen so far. If this instance was not selected
// contains MOJO_QUOTA_LIMIT_NONE as a sentinel value.
uint64_t max_unread_message_quota_used_ = MOJO_QUOTA_LIMIT_NONE;
// The quota checker associate with this connector, if any.
scoped_refptr<internal::MessageQuotaChecker> quota_checker_;
base::Lock connected_lock_;
bool connected_ = true;

@ -8,8 +8,6 @@
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/debug/alias.h"
#include "base/debug/dump_without_crashing.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
@ -23,8 +21,10 @@
#include "base/synchronization/lock.h"
#include "base/threading/sequence_local_storage_slot.h"
#include "base/trace_event/trace_event.h"
#include "mojo/public/c/system/quota.h"
#include "mojo/public/cpp/bindings/features.h"
#include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "mojo/public/cpp/bindings/lib/tracing_helper.h"
#include "mojo/public/cpp/bindings/mojo_buildflags.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
@ -56,62 +56,6 @@ bool EnableTaskPerMessage() {
return enable;
}
const base::FeatureParam<int> kMojoRecordUnreadMessageCountSampleRate = {
&features::kMojoRecordUnreadMessageCount, "SampleRate",
100 // Sample 1% of Connectors by default.
};
const base::FeatureParam<int> kMojoRecordUnreadMessageCountQuotaValue = {
&features::kMojoRecordUnreadMessageCount, "QuotaValue",
100 // Use a 100 message quote by default.
};
const base::FeatureParam<int> kMojoRecordUnreadMessageCountCrashThreshold = {
&features::kMojoRecordUnreadMessageCount, "CrashThreshold",
0 // Set to zero to disable crash dumps by default.
};
int UnreadMessageCountQuota() {
static const bool enabled =
base::FeatureList::IsEnabled(features::kMojoRecordUnreadMessageCount);
if (!enabled)
return 0;
static const int sample_rate = kMojoRecordUnreadMessageCountSampleRate.Get();
if (base::RandInt(0, sample_rate - 1) != 0)
return 0;
static const int quota = kMojoRecordUnreadMessageCountQuotaValue.Get();
return quota;
}
// Disable inlining for this function to make sure it appears in the stack
// trace on crash.
NOINLINE void MaybeDumpWithoutCrashing(int quota_used) {
static const int crash_threshold =
kMojoRecordUnreadMessageCountCrashThreshold.Get();
if (crash_threshold == 0 || quota_used < crash_threshold)
return;
static bool have_crashed = false;
if (have_crashed)
return;
// Only crash once per process/per run. Note that this is slightly racy
// against concurrent quota overruns on multiple threads, but that's fine.
have_crashed = true;
// This is happening because the user of the interface implicated on the crash
// stack has queued up an unreasonable number of messages, namely
// |quota_used|.
base::debug::DumpWithoutCrashing();
// Defeat tail-call optimization and ensure these two variables are available
// on the stack.
base::debug::Alias(&crash_threshold);
base::debug::Alias(&quota_used);
}
} // namespace
// Used to efficiently maintain a doubly-linked list of all Connectors
@ -220,25 +164,14 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe,
// Even though we don't have an incoming receiver, we still want to monitor
// the message pipe to know if is closed or encounters an error.
WaitToReadMore();
int unread_message_count_quota = UnreadMessageCountQuota();
if (unread_message_count_quota != 0) {
// This connector has been sampled to record the max unread message count.
// Note that setting the quota to N results in over-counting usage by up to
// N/2, in addition to overcounting due to message transit delays. As result
// it's best to treat the resulting metric as N-granular.
MojoResult rv = MojoSetQuota(message_pipe_.get().value(),
MOJO_QUOTA_TYPE_UNREAD_MESSAGE_COUNT,
unread_message_count_quota, nullptr);
if (rv == MOJO_RESULT_OK)
max_unread_message_quota_used_ = 0U;
}
}
Connector::~Connector() {
if (max_unread_message_quota_used_ != MOJO_QUOTA_LIMIT_NONE) {
if (quota_checker_) {
// Clear the message pipe handle in the checker.
quota_checker_->SetMessagePipe(MessagePipeHandle());
UMA_HISTOGRAM_COUNTS_1M("Mojo.Connector.MaxUnreadMessageQuotaUsed",
max_unread_message_quota_used_);
quota_checker_->GetMaxQuotaUsage());
}
{
@ -393,17 +326,9 @@ bool Connector::Accept(Message* message) {
DCHECK(dump_result);
}
#endif
if (max_unread_message_quota_used_ != MOJO_QUOTA_LIMIT_NONE) {
uint64_t limit = 0;
uint64_t usage = 0;
MojoResult rv = MojoQueryQuota(message_pipe_.get().value(),
MOJO_QUOTA_TYPE_UNREAD_MESSAGE_COUNT,
nullptr, &limit, &usage);
if (rv == MOJO_RESULT_OK && usage > max_unread_message_quota_used_) {
MaybeDumpWithoutCrashing(usage);
max_unread_message_quota_used_ = usage;
}
}
if (quota_checker_)
quota_checker_->BeforeWrite();
MojoResult rv =
WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
@ -458,6 +383,14 @@ void Connector::SetWatcherHeapProfilerTag(const char* tag) {
}
}
void Connector::SetMessageQuotaChecker(
scoped_refptr<internal::MessageQuotaChecker> checker) {
DCHECK(checker && !quota_checker_);
quota_checker_ = std::move(checker);
quota_checker_->SetMessagePipe(message_pipe_.get());
}
// static
void Connector::OverrideDefaultSerializationBehaviorForTesting(
OutgoingSerializationMode outgoing_mode,

@ -0,0 +1,192 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include <algorithm>
#include "base/debug/alias.h"
#include "base/debug/dump_without_crashing.h"
#include "base/logging.h"
#include "base/metrics/field_trial_params.h"
#include "base/no_destructor.h"
#include "base/rand_util.h"
#include "base/synchronization/lock.h"
#include "mojo/public/c/system/quota.h"
#include "mojo/public/cpp/bindings/features.h"
#include "mojo/public/cpp/bindings/mojo_buildflags.h"
namespace mojo {
namespace internal {
namespace {
const base::FeatureParam<int> kMojoRecordUnreadMessageCountSampleRate = {
&features::kMojoRecordUnreadMessageCount, "SampleRate",
100 // Sample 1% of Connectors by default. */
};
const base::FeatureParam<int> kMojoRecordUnreadMessageCountQuotaValue = {
&features::kMojoRecordUnreadMessageCount, "QuotaValue",
100 // Use a 100 message quote by default.
};
const base::FeatureParam<int> kMojoRecordUnreadMessageCountCrashThreshold = {
&features::kMojoRecordUnreadMessageCount, "CrashThreshold",
0 // Set to zero to disable crash dumps by default.
};
NOINLINE void MaybeDumpWithoutCrashing(size_t quota_used) {
static bool have_crashed = false;
if (have_crashed)
return;
// Only crash once per process/per run. Note that this is slightly racy
// against concurrent quota overruns on multiple threads, but that's fine.
have_crashed = true;
// This is happening because the user of the interface implicated on the crash
// stack has queued up an unreasonable number of messages, namely
// |quota_used|.
base::debug::DumpWithoutCrashing();
base::debug::Alias(&quota_used);
}
} // namespace
// static
scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreate() {
static const Configuration config = GetConfiguration();
return MaybeCreateImpl(config);
}
void MessageQuotaChecker::BeforeWrite() {
QuotaCheckImpl(0u);
}
void MessageQuotaChecker::BeforeMessagesEnqueued(size_t num) {
DCHECK_NE(num, 0u);
QuotaCheckImpl(num);
}
void MessageQuotaChecker::AfterMessagesDequeued(size_t num) {
base::AutoLock hold(lock_);
DCHECK_LE(num, consumed_quota_);
DCHECK_NE(num, 0u);
consumed_quota_ -= num;
}
size_t MessageQuotaChecker::GetMaxQuotaUsage() {
base::AutoLock hold(lock_);
return max_consumed_quota_;
}
void MessageQuotaChecker::SetMessagePipe(MessagePipeHandle message_pipe) {
base::AutoLock hold(lock_);
message_pipe_ = message_pipe;
if (!message_pipe_)
return;
MojoResult rv =
MojoSetQuota(message_pipe.value(), MOJO_QUOTA_TYPE_UNREAD_MESSAGE_COUNT,
config_->unread_message_count_quota, nullptr);
DCHECK_EQ(MOJO_RESULT_OK, rv);
}
size_t MessageQuotaChecker::GetCurrentQuotaStatusForTesting() {
base::AutoLock hold(lock_);
return GetCurrentQuotaStatus();
}
// static
MessageQuotaChecker::Configuration
MessageQuotaChecker::GetConfigurationForTesting() {
return GetConfiguration();
}
// static
scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreateForTesting(
const Configuration& config) {
return MaybeCreateImpl(config);
}
MessageQuotaChecker::MessageQuotaChecker(const Configuration* config)
: config_(config) {}
MessageQuotaChecker::~MessageQuotaChecker() = default;
// static
MessageQuotaChecker::Configuration MessageQuotaChecker::GetConfiguration() {
Configuration ret;
ret.is_enabled =
base::FeatureList::IsEnabled(features::kMojoRecordUnreadMessageCount);
ret.sample_rate = kMojoRecordUnreadMessageCountSampleRate.Get();
// Lower-bound the quota value to 100, which implies roughly 2% message
// overhead for sampled pipes.
constexpr int kMinQuotaValue = 100;
ret.unread_message_count_quota =
std::max(kMinQuotaValue, kMojoRecordUnreadMessageCountQuotaValue.Get());
ret.crash_threshold = kMojoRecordUnreadMessageCountCrashThreshold.Get();
ret.maybe_crash_function = &MaybeDumpWithoutCrashing;
return ret;
}
// static
scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreateImpl(
const Configuration& config) {
if (!config.is_enabled)
return nullptr;
if (base::RandInt(0, config.sample_rate - 1) != 0)
return nullptr;
return new MessageQuotaChecker(&config);
}
size_t MessageQuotaChecker::GetCurrentQuotaStatus() {
lock_.AssertAcquired();
size_t quota_status = consumed_quota_;
if (message_pipe_) {
uint64_t limit = 0;
uint64_t usage = 0;
MojoResult rv = MojoQueryQuota(message_pipe_.value(),
MOJO_QUOTA_TYPE_UNREAD_MESSAGE_COUNT,
nullptr, &limit, &usage);
if (rv == MOJO_RESULT_OK)
quota_status += usage;
}
return quota_status;
}
void MessageQuotaChecker::QuotaCheckImpl(size_t num_enqueued) {
bool new_max = false;
size_t quota_used = 0u;
{
base::AutoLock hold(lock_);
consumed_quota_ += num_enqueued;
quota_used = GetCurrentQuotaStatus();
// Account for the message that will be written.
if (!num_enqueued)
++quota_used;
if (quota_used > max_consumed_quota_) {
max_consumed_quota_ = quota_used;
new_max = true;
}
}
if (new_max && config_->crash_threshold != 0 &&
quota_used >= config_->crash_threshold) {
config_->maybe_crash_function(quota_used);
}
}
} // namespace internal
} // namespace mojo

@ -0,0 +1,103 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_
#define MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_
#include <stdint.h>
#include "base/component_export.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/public/cpp/system/message_pipe.h"
namespace mojo {
namespace internal {
// This class keeps track of how many messages are in-flight for a message pipe,
// including messages that are posted or locally queued.
//
// Message pipe owners may have reason to implement their own mechanism for
// queuing outgoing messages before writing them to a pipe. This class helps
// with unread message quota monitoring in such cases, since Mojo's own
// quota monitoring on the pipe cannot account for such external queues.
// Callers are responsible for invoking |BeforeMessagesEnqueued()| and
// |AfterMessagesDequeued()| when making respective changes to their local
// outgoing queue. Additionally, |BeforeWrite()| should be called immediately
// before writing each message to the corresponding message pipe.
//
// Also note that messages posted to a different sequence with
// |base::PostTask()| and the like, need to be treated as locally queued. Task
// queues can grow arbitrarily long, and it's ideal to perform unread quota
// checks before posting.
//
// Either |BeforeMessagesEnqueued()| or |BeforeWrite()| may cause the quota
// to be exceeded, thus invoking the |maybe_crash_function| set in this
// object's Configuration.
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MessageQuotaChecker
: public base::RefCountedThreadSafe<MessageQuotaChecker> {
public:
// Returns a new instance if this invocation has been sampled for quota
// checking.
static scoped_refptr<MessageQuotaChecker> MaybeCreate();
// Call before writing a message to |message_pipe_|.
void BeforeWrite();
// Call before queueing |num| messages.
void BeforeMessagesEnqueued(size_t num);
// Call after de-queueing |num| messages.
void AfterMessagesDequeued(size_t num);
// Returns the high watermark of quota usage observed by this instance.
size_t GetMaxQuotaUsage();
// Set or unset the message pipe associated with this quota checker.
void SetMessagePipe(MessagePipeHandle message_pipe);
// Test support.
size_t GetCurrentQuotaStatusForTesting();
struct Configuration;
static Configuration GetConfigurationForTesting();
static scoped_refptr<MessageQuotaChecker> MaybeCreateForTesting(
const Configuration& config);
private:
friend class base::RefCountedThreadSafe<MessageQuotaChecker>;
explicit MessageQuotaChecker(const Configuration* config);
~MessageQuotaChecker();
static Configuration GetConfiguration();
static scoped_refptr<MessageQuotaChecker> MaybeCreateImpl(
const Configuration& config);
size_t GetCurrentQuotaStatus();
void QuotaCheckImpl(size_t num_enqueued);
const Configuration* config_;
// Locks all local state.
base::Lock lock_;
// The locally consumed quota, e.g. the difference between the counts passed
// to |BeforeMessagesEnqueued()| and |BeforeMessagesDequeued()|.
size_t consumed_quota_ = 0u;
// The high watermark consumed quota observed.
size_t max_consumed_quota_ = 0u;
// The quota level that triggers a crash dump, or zero to disable crashing.
size_t crash_threshold_ = 0u;
// The message pipe this instance observes, if any.
MessagePipeHandle message_pipe_;
};
struct MessageQuotaChecker::Configuration {
bool is_enabled = false;
size_t sample_rate = 0u;
size_t unread_message_count_quota = 0u;
size_t crash_threshold = 0u;
void (*maybe_crash_function)(size_t quota_used);
};
} // namespace internal
} // namespace mojo
#endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_

@ -18,6 +18,7 @@
#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
#include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
#include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
namespace mojo {
@ -313,9 +314,9 @@ struct MultiplexRouter::Task {
MultiplexRouter::MultiplexRouter(
ScopedMessagePipeHandle message_pipe,
Config config,
bool set_interface_id_namesapce_bit,
bool set_interface_id_namespace_bit,
scoped_refptr<base::SequencedTaskRunner> runner)
: set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
: set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
task_runner_(runner),
dispatcher_(this),
connector_(std::move(message_pipe),
@ -342,6 +343,11 @@ MultiplexRouter::MultiplexRouter(
base::BindOnce(&MultiplexRouter::OnPipeConnectionError,
base::Unretained(this), false /* force_async_dispatch */));
scoped_refptr<internal::MessageQuotaChecker> quota_checker =
internal::MessageQuotaChecker::MaybeCreate();
if (quota_checker)
connector_.SetMessageQuotaChecker(std::move(quota_checker));
std::unique_ptr<MessageHeaderValidator> header_validator =
std::make_unique<MessageHeaderValidator>();
header_validator_ = header_validator.get();

@ -29,6 +29,7 @@ source_set("tests") {
"map_unittest.cc",
"message_queue.cc",
"message_queue.h",
"message_quota_checker_unittest.cc",
"message_unittest.cc",
"multiplex_router_unittest.cc",
"native_struct_unittest.cc",

@ -0,0 +1,189 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
#include "base/test/scoped_feature_list.h"
#include "mojo/public/c/system/quota.h"
#include "mojo/public/cpp/bindings/features.h"
#include "mojo/public/cpp/system/message_pipe.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace test {
namespace {
class MessageQuotaCheckerTest : public testing::Test {
public:
MessageQuotaCheckerTest() {
EXPECT_EQ(nullptr, instance_);
instance_ = this;
}
~MessageQuotaCheckerTest() override {
EXPECT_EQ(this, instance_);
instance_ = nullptr;
}
protected:
using MessageQuotaChecker = internal::MessageQuotaChecker;
using Configuration = MessageQuotaChecker::Configuration;
static void RecordDumpAttempt(size_t quota_used) {
++instance_->num_dumps_;
instance_->last_dump_quota_used_ = quota_used;
}
size_t num_dumps_ = false;
size_t last_dump_quota_used_ = 0u;
static const Configuration enabled_config_;
static MessageQuotaCheckerTest* instance_;
};
const MessageQuotaCheckerTest::Configuration
MessageQuotaCheckerTest::enabled_config_ = {true, 1, 100, 200,
&RecordDumpAttempt};
MessageQuotaCheckerTest* MessageQuotaCheckerTest::instance_ = nullptr;
TEST_F(MessageQuotaCheckerTest, ReadsConfigurationFromFeatures) {
base::FieldTrialParams params;
params["SampleRate"] = "19";
// Quota value parameter below the minimum the checker will allow.
params["QuotaValue"] = "57";
params["CrashThreshold"] = "225";
base::test::ScopedFeatureList feature_list;
feature_list.InitAndEnableFeatureWithParameters(
features::kMojoRecordUnreadMessageCount, params);
// Validate that the configuration reads from the feature configuration.
const MessageQuotaChecker::Configuration config =
MessageQuotaChecker::GetConfigurationForTesting();
EXPECT_TRUE(config.is_enabled);
EXPECT_EQ(19u, config.sample_rate);
EXPECT_EQ(100u, config.unread_message_count_quota);
EXPECT_EQ(225u, config.crash_threshold);
EXPECT_NE(nullptr, config.maybe_crash_function);
}
TEST_F(MessageQuotaCheckerTest, DisabledByDefault) {
const MessageQuotaChecker::Configuration config =
MessageQuotaChecker::GetConfigurationForTesting();
EXPECT_FALSE(config.is_enabled);
// Validate that no MessageQuoteCheckers are created in the default feature
// configuration. Run a bunch of iterations, as this function returns an
// instance randomly.
for (size_t i = 0; i < 1000; ++i)
ASSERT_EQ(nullptr, MessageQuotaChecker::MaybeCreate());
}
TEST_F(MessageQuotaCheckerTest, CreatesWhenEnabled) {
// Run a bunch of iterations, as this function returns an instance randomly.
for (size_t i = 0; i < 1000; ++i)
EXPECT_NE(nullptr,
MessageQuotaChecker::MaybeCreateForTesting(enabled_config_));
}
TEST_F(MessageQuotaCheckerTest, CountsRight) {
scoped_refptr<MessageQuotaChecker> checker =
MessageQuotaChecker::MaybeCreateForTesting(enabled_config_);
ASSERT_EQ(0u, checker->GetCurrentQuotaStatusForTesting());
ASSERT_EQ(0u, checker->GetMaxQuotaUsage());
checker->BeforeMessagesEnqueued(10);
ASSERT_EQ(10u, checker->GetCurrentQuotaStatusForTesting());
ASSERT_EQ(10u, checker->GetMaxQuotaUsage());
checker->AfterMessagesDequeued(5);
ASSERT_EQ(5u, checker->GetCurrentQuotaStatusForTesting());
ASSERT_EQ(10u, checker->GetMaxQuotaUsage());
ASSERT_EQ(0u, num_dumps_);
}
TEST_F(MessageQuotaCheckerTest, CountsMessagePipeAlso) {
MessagePipe pipe;
scoped_refptr<MessageQuotaChecker> checker =
MessageQuotaChecker::MaybeCreateForTesting(enabled_config_);
uint64_t limit = 0;
uint64_t usage = 0;
MojoResult rv = MojoQueryQuota(pipe.handle0.get().value(),
MOJO_QUOTA_TYPE_UNREAD_MESSAGE_COUNT, nullptr,
&limit, &usage);
ASSERT_EQ(MOJO_RESULT_OK, rv);
ASSERT_EQ(MOJO_QUOTA_LIMIT_NONE, limit);
checker->SetMessagePipe(pipe.handle0.get());
// Validate that the checker sets an unread message quota on the pipe, and
// that it clamps to the minimum of 100.
rv = MojoQueryQuota(pipe.handle0.get().value(),
MOJO_QUOTA_TYPE_UNREAD_MESSAGE_COUNT, nullptr, &limit,
&usage);
ASSERT_EQ(MOJO_RESULT_OK, rv);
ASSERT_EQ(100u, limit);
ASSERT_EQ(0u, checker->GetCurrentQuotaStatusForTesting());
const char kMessage[] = "hello";
for (size_t i = 0; i < 10; ++i) {
checker->BeforeWrite();
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(pipe.handle0.get(), kMessage, sizeof(kMessage),
nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
}
ASSERT_EQ(10u, checker->GetMaxQuotaUsage());
ASSERT_EQ(10u, checker->GetCurrentQuotaStatusForTesting());
checker->BeforeMessagesEnqueued(10);
ASSERT_EQ(20u, checker->GetMaxQuotaUsage());
ASSERT_EQ(20u, checker->GetCurrentQuotaStatusForTesting());
ASSERT_EQ(0u, num_dumps_);
}
TEST_F(MessageQuotaCheckerTest, DumpsCoreOnOverrun) {
MessagePipe pipe;
scoped_refptr<MessageQuotaChecker> checker =
MessageQuotaChecker::MaybeCreateForTesting(enabled_config_);
// Queue up 100 messages.
checker->SetMessagePipe(pipe.handle0.get());
const char kMessage[] = "hello";
for (size_t i = 0; i < 100; ++i) {
checker->BeforeWrite();
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(pipe.handle0.get(), kMessage, sizeof(kMessage),
nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
}
// The crash threshold is at 200 per the config, so shouldn't have attempted
// a core dump yet.
ASSERT_EQ(0u, num_dumps_);
checker->BeforeMessagesEnqueued(50);
ASSERT_EQ(0u, num_dumps_);
checker->BeforeMessagesEnqueued(50);
ASSERT_EQ(1u, num_dumps_);
ASSERT_EQ(200u, last_dump_quota_used_);
checker->BeforeWrite();
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(pipe.handle0.get(), kMessage, sizeof(kMessage),
nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
ASSERT_EQ(2u, num_dumps_);
ASSERT_EQ(201u, last_dump_quota_used_);
}
} // namespace
} // namespace test
} // namespace mojo