0

Mojo: Prefer using writev(2) when possible on POSIX.

According to the UMA metrics Mojo.Channel.WriteQueuePendingMessages at
least 22% of the time there are more than 2 messages queued while
flushing messages on POSIX systems.

This CL introduces a change where when there is more than one message
pending it will try to use writev(2). It's important to note that
we cannot use writev(2) when FileHandles are involved and it handles
that situation. However, according to Mojo.Channel.WriteMessageHandles
sending FileHandles is rare and 98.4% of the time messages do not
contain a FileHandle. So we should be able to benefit from writev(2)
frequently.

Bug: b:171251106
Change-Id: Ie632479da835c457d573ba4aa91bb22b8df084b2
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2486501
Commit-Queue: Brian Geffon <bgeffon@chromium.org>
Reviewed-by: Ken Rockot <rockot@google.com>
Reviewed-by: Ilya Sherman <isherman@chromium.org>
Cr-Commit-Position: refs/heads/master@{#819939}
This commit is contained in:
Brian Geffon
2020-10-22 19:27:12 +00:00
committed by Commit Bot
parent 6048d77c86
commit 36d6df46eb
6 changed files with 172 additions and 0 deletions
content/app
mojo/core
tools/metrics/histograms/histograms_xml/others

@ -87,6 +87,7 @@
#include "gin/v8_initializer.h"
#include "media/base/media.h"
#include "media/media_buildflags.h"
#include "mojo/core/embedder/embedder.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
#include "mojo/public/cpp/platform/platform_channel.h"
#include "mojo/public/cpp/system/dynamic_library_support.h"
@ -486,6 +487,7 @@ int RunZygote(ContentMainDelegate* delegate) {
InitializeFieldTrialAndFeatureList();
delegate->PostFieldTrialInitialization();
mojo::core::InitFeatures();
// After feature list has been initialized, enable pcscan on malloc
// partitions.
@ -847,6 +849,7 @@ int ContentMainRunnerImpl::Run(bool start_service_manager_only) {
// After feature list has been initialized, enable pcscan on malloc
// partitions.
EnablePCScanForMallocPartitionsIfNeeded();
mojo::core::InitFeatures();
}
#if defined(OS_LINUX) || defined(OS_CHROMEOS)
@ -897,6 +900,7 @@ int ContentMainRunnerImpl::RunServiceManager(MainFunctionParams& main_params,
ANNOTATE_LEAKING_OBJECT_PTR(leaked_field_trial_list);
ignore_result(leaked_field_trial_list);
delegate_->PostFieldTrialInitialization();
mojo::core::InitFeatures();
}
if (GetContentClient()->browser()->ShouldCreateThreadPool()) {

@ -278,6 +278,11 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
HandlePolicy handle_policy,
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner);
#if defined(OS_POSIX) && !defined(OS_NACL) && !defined(OS_MAC)
// At this point only ChannelPosix needs InitFeatures.
static void set_posix_use_writev(bool use_writev);
#endif
// Allows the caller to change the Channel's HandlePolicy after construction.
void set_handle_policy(HandlePolicy policy) { handle_policy_ = policy; }

@ -8,6 +8,7 @@
#include <sys/socket.h>
#include <algorithm>
#include <atomic>
#include <limits>
#include <memory>
@ -28,6 +29,7 @@
#include "mojo/public/cpp/platform/socket_utils_posix.h"
#if !defined(OS_NACL)
#include <limits.h>
#include <sys/uio.h>
#endif
@ -36,6 +38,10 @@ namespace core {
namespace {
#if !defined(OS_NACL)
std::atomic<bool> g_use_writev{false};
#endif
const size_t kMaxBatchReadCapacity = 256 * 1024;
// A view over a Channel::Message object. The write queue uses these since
@ -89,6 +95,10 @@ class MessageView {
num_handles_sent_ = num_handles_sent;
}
size_t num_handles_remaining() const {
return handles_.size() - num_handles_sent_;
}
private:
Channel::MessagePtr message_;
size_t offset_;
@ -447,6 +457,11 @@ class ChannelPosix : public Channel,
}
bool FlushOutgoingMessagesNoLock() {
#if !defined(OS_NACL)
if (g_use_writev)
return FlushOutgoingMessagesWritevNoLock();
#endif
base::circular_deque<MessageView> messages;
std::swap(outgoing_messages_, messages);
@ -478,6 +493,112 @@ class ChannelPosix : public Channel,
return true;
}
#if !defined(OS_NACL)
bool WriteOutgoingMessagesWithWritev() {
if (outgoing_messages_.empty())
return true;
// If all goes well we can submit a writev(2) with a iovec of size
// outgoing_messages_.size() but never more than the kernel allows.
size_t num_messages_to_send =
std::min<size_t>(IOV_MAX, outgoing_messages_.size());
iovec iov[num_messages_to_send];
memset(&iov[0], 0, sizeof(iov));
// Populate the iov.
size_t num_iovs_set = 0;
for (auto it = outgoing_messages_.begin();
num_iovs_set < num_messages_to_send; ++it) {
if (it->num_handles_remaining() > 0) {
// We can't send handles with writev(2) so stop at this message.
break;
}
iov[num_iovs_set].iov_base = const_cast<void*>(it->data());
iov[num_iovs_set].iov_len = it->data_num_bytes();
num_iovs_set++;
}
UMA_HISTOGRAM_COUNTS_1000("Mojo.Channel.WritevBatchedMessages",
num_iovs_set);
size_t iov_offset = 0;
while (iov_offset < num_iovs_set) {
ssize_t bytes_written = SocketWritev(socket_.get(), &iov[iov_offset],
num_iovs_set - iov_offset);
if (bytes_written < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
WaitForWriteOnIOThreadNoLock();
return true;
}
return false;
}
// Let's walk our outgoing_messages_ popping off outgoing_messages_
// that were fully written.
size_t bytes_remaining = bytes_written;
while (bytes_remaining > 0) {
if (bytes_remaining >= outgoing_messages_.front().data_num_bytes()) {
// This message was fully written.
bytes_remaining -= outgoing_messages_.front().data_num_bytes();
outgoing_messages_.pop_front();
iov_offset++;
} else {
// This message was partially written, account for what was
// already written.
outgoing_messages_.front().advance_data_offset(bytes_remaining);
bytes_remaining = 0;
// Update the iov too as we will call writev again.
iov[iov_offset].iov_base =
const_cast<void*>(outgoing_messages_.front().data());
iov[iov_offset].iov_len = outgoing_messages_.front().data_num_bytes();
}
}
}
return true;
}
// FlushOutgoingMessagesWritevNoLock is equivalent to
// FlushOutgoingMessagesNoLock except it looks for opportunities to make only
// a single write syscall by using writev(2) instead of write(2). In most
// situations this is very straight forward; however, when a handle needs to
// be transferred we cannot use writev(2) and instead will fall back to the
// standard write.
bool FlushOutgoingMessagesWritevNoLock() {
do {
// If the first message contains a handle we will flush it first using a
// standard write, we will also use the standard write if we only have a
// single message.
while (!outgoing_messages_.empty() &&
(outgoing_messages_.front().num_handles_remaining() > 0 ||
outgoing_messages_.size() == 1)) {
MessageView message = std::move(outgoing_messages_.front());
outgoing_messages_.pop_front();
size_t messages_before_write = outgoing_messages_.size();
if (!WriteNoLock(std::move(message)))
return false;
if (outgoing_messages_.size() > messages_before_write) {
// It was re-queued by WriteNoLock.
return true;
}
}
if (!WriteOutgoingMessagesWithWritev())
return false;
// At this point if we have more messages then it's either because we
// exceeded IOV_MAX OR it's because we ran into a FileHandle. Either way
// we just start the process all over again and it will flush any
// FileHandles before attempting writev(2) again.
} while (!outgoing_messages_.empty());
return true;
}
#endif // !defined(OS_NACL)
#if defined(OS_IOS)
bool OnControlMessage(Message::MessageType message_type,
const void* payload,
@ -600,6 +721,13 @@ class ChannelPosix : public Channel,
} // namespace
// static
#if !defined(OS_NACL)
void Channel::set_posix_use_writev(bool use_writev) {
g_use_writev = use_writev;
}
#endif
// static
scoped_refptr<Channel> Channel::Create(
Delegate* delegate,

@ -5,10 +5,14 @@
#include "mojo/core/embedder/embedder.h"
#include <stdint.h>
#include <atomic>
#include <utility>
#include "base/feature_list.h"
#include "base/memory/ref_counted.h"
#include "base/task_runner.h"
#include "build/build_config.h"
#include "mojo/core/channel.h"
#include "mojo/core/configuration.h"
#include "mojo/core/core.h"
#include "mojo/core/entrypoints.h"
@ -18,6 +22,21 @@
namespace mojo {
namespace core {
namespace {
#if defined(OS_POSIX) && !defined(OS_NACL) && !defined(OS_MAC)
const base::Feature kMojoPosixUseWritev{"MojoPosixUseWritev",
base::FEATURE_DISABLED_BY_DEFAULT};
#endif
} // namespace
// InitFeatures will be called as soon as the base::FeatureList is initialized.
void InitFeatures() {
#if defined(OS_POSIX) && !defined(OS_NACL) && !defined(OS_MAC)
Channel::set_posix_use_writev(
base::FeatureList::IsEnabled(kMojoPosixUseWritev));
#endif
}
void Init(const Configuration& configuration) {
internal::g_configuration = configuration;
InitializeCore();

@ -38,6 +38,14 @@ COMPONENT_EXPORT(MOJO_CORE_EMBEDDER) void Init();
COMPONENT_EXPORT(MOJO_CORE_EMBEDDER)
scoped_refptr<base::SingleThreadTaskRunner> GetIOTaskRunner();
// InitFeatures will be called as soon as the base::FeatureList is initialized.
// NOTE: This is temporarily necessary because of how Mojo is started with
// respect to base::FeatureList.
//
// TODO(rockot): Remove once a long term solution is in place for using
// base::Features inside of Mojo.
COMPONENT_EXPORT(MOJO_CORE_EMBEDDER) void InitFeatures();
} // namespace core
} // namespace mojo

@ -8609,6 +8609,14 @@ reviews. Googlers can read more about this at go/gwsq-gerrit.
</summary>
</histogram>
<histogram name="Mojo.Channel.WritevBatchedMessages" units="messages"
expires_after="2021-07-01">
<owner>amistry@chromium.org</owner>
<owner>bgeffon@chromium.org</owner>
<owner>rockot@google.com</owner>
<summary>The number of messages batched into a single writev call.</summary>
</histogram>
<histogram name="Mojo.Connector.MaxUnreadMessageQuotaUsed" units="messages"
expires_after="2021-04-18">
<owner>siggi@chromium.org</owner>