0

Remove use of std::function from Mojo internals

Introduces a ports::MessageFilter type for filtering messages
on a port's queue. This should be slightly more efficient since it
avoids a heap allocation, and it should lead to more useful stack
traces in certain crash reports.

BUG=609030
R=yzshen@chromium.org

Review-Url: https://codereview.chromium.org/2466993004
Cr-Commit-Position: refs/heads/master@{#429524}
This commit is contained in:
rockot
2016-11-02 21:23:45 -07:00
committed by Commit bot
parent e77160e615
commit 03e91a8a23
10 changed files with 173 additions and 89 deletions

@ -527,8 +527,8 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
} else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
ports::ScopedMessage message;
do {
int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
&message);
int rv = node_controller_->node()->GetMessage(
control_port_, &message, nullptr);
if (rv != ports::OK)
peer_closed_ = true;
if (message) {

@ -504,8 +504,8 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
} else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
ports::ScopedMessage message;
do {
int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
&message);
int rv = node_controller_->node()->GetMessage(
control_port_, &message, nullptr);
if (rv != ports::OK)
peer_closed_ = true;
if (message) {

@ -14,6 +14,7 @@
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/message_for_transit.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports/message_filter.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/request_context.h"
@ -59,6 +60,103 @@ class MessagePipeDispatcher::PortObserverThunk
DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
};
// A MessageFilter used by ReadMessage to determine whether a message should
// actually be consumed yet.
class ReadMessageFilter : public ports::MessageFilter {
public:
// Creates a new ReadMessageFilter which captures and potentially modifies
// various (unowned) local state within MessagePipeDispatcher::ReadMessage.
ReadMessageFilter(bool read_any_size,
bool may_discard,
uint32_t* num_bytes,
uint32_t* num_handles,
bool* no_space,
bool* invalid_message)
: read_any_size_(read_any_size),
may_discard_(may_discard),
num_bytes_(num_bytes),
num_handles_(num_handles),
no_space_(no_space),
invalid_message_(invalid_message) {}
~ReadMessageFilter() override {}
// ports::MessageFilter:
bool Match(const ports::Message& m) override {
const PortsMessage& message = static_cast<const PortsMessage&>(m);
if (message.num_payload_bytes() < sizeof(MessageHeader)) {
*invalid_message_ = true;
return true;
}
const MessageHeader* header =
static_cast<const MessageHeader*>(message.payload_bytes());
if (header->header_size > message.num_payload_bytes()) {
*invalid_message_ = true;
return true;
}
uint32_t bytes_to_read = 0;
uint32_t bytes_available =
static_cast<uint32_t>(message.num_payload_bytes()) -
header->header_size;
if (num_bytes_) {
bytes_to_read = std::min(*num_bytes_, bytes_available);
*num_bytes_ = bytes_available;
}
uint32_t handles_to_read = 0;
uint32_t handles_available = header->num_dispatchers;
if (num_handles_) {
handles_to_read = std::min(*num_handles_, handles_available);
*num_handles_ = handles_available;
}
if (handles_to_read < handles_available ||
(!read_any_size_ && bytes_to_read < bytes_available)) {
*no_space_ = true;
return may_discard_;
}
return true;
}
private:
const bool read_any_size_;
const bool may_discard_;
uint32_t* const num_bytes_;
uint32_t* const num_handles_;
bool* const no_space_;
bool* const invalid_message_;
DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter);
};
#if DCHECK_IS_ON()
// A MessageFilter which never matches a message. Used to peek at the size of
// the next available message on a port, for debug logging only.
class PeekSizeMessageFilter : public ports::MessageFilter {
public:
PeekSizeMessageFilter() {}
~PeekSizeMessageFilter() override {}
// ports::MessageFilter:
bool Match(const ports::Message& message) override {
message_size_ = message.num_payload_bytes();
return false;
}
size_t message_size() const { return message_size_; }
private:
size_t message_size_ = 0;
DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter);
};
#endif // DCHECK_IS_ON()
MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
const ports::PortRef& port,
uint64_t pipe_id,
@ -186,50 +284,9 @@ MojoResult MessagePipeDispatcher::ReadMessage(
// This flag exists to support both new and old API behavior.
ports::ScopedMessage ports_message;
int rv = node_controller_->node()->GetMessageIf(
port_,
[read_any_size, num_bytes, num_handles, &no_space, &may_discard,
&invalid_message](
const ports::Message& next_message) {
const PortsMessage& message =
static_cast<const PortsMessage&>(next_message);
if (message.num_payload_bytes() < sizeof(MessageHeader)) {
invalid_message = true;
return true;
}
const MessageHeader* header =
static_cast<const MessageHeader*>(message.payload_bytes());
if (header->header_size > message.num_payload_bytes()) {
invalid_message = true;
return true;
}
uint32_t bytes_to_read = 0;
uint32_t bytes_available =
static_cast<uint32_t>(message.num_payload_bytes()) -
header->header_size;
if (num_bytes) {
bytes_to_read = std::min(*num_bytes, bytes_available);
*num_bytes = bytes_available;
}
uint32_t handles_to_read = 0;
uint32_t handles_available = header->num_dispatchers;
if (num_handles) {
handles_to_read = std::min(*num_handles, handles_available);
*num_handles = handles_available;
}
if (handles_to_read < handles_available ||
(!read_any_size && bytes_to_read < bytes_available)) {
no_space = true;
return may_discard;
}
return true;
},
&ports_message);
ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles,
&no_space, &invalid_message);
int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter);
if (invalid_message)
return MOJO_RESULT_UNKNOWN;
@ -530,15 +587,11 @@ void MessagePipeDispatcher::OnPortStatusChanged() {
if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
if (port_status.has_messages) {
ports::ScopedMessage unused;
size_t message_size = 0;
node_controller_->node()->GetMessageIf(
port_, [&message_size](const ports::Message& message) {
message_size = message.num_payload_bytes();
return false;
}, &unused);
PeekSizeMessageFilter filter;
node_controller_->node()->GetMessage(port_, &unused, &filter);
DVLOG(4) << "New message detected on message pipe " << pipe_id_
<< " endpoint " << endpoint_ << " [port=" << port_.name()
<< "; size=" << message_size << "]";
<< "; size=" << filter.message_size() << "]";
}
if (port_status.peer_closed) {
DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_

@ -10,6 +10,7 @@ source_set("ports") {
"event.h",
"message.cc",
"message.h",
"message_filter.h",
"message_queue.cc",
"message_queue.h",
"name.cc",

@ -0,0 +1,29 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_PORTS_MESSAGE_FILTER_H_
#define MOJO_EDK_SYSTEM_PORTS_MESSAGE_FILTER_H_
namespace mojo {
namespace edk {
namespace ports {
class Message;
// An interface which can be implemented to filter port messages according to
// arbitrary policy.
class MessageFilter {
public:
virtual ~MessageFilter() {}
// Returns true of |message| should be accepted by whomever is applying this
// filter. See MessageQueue::GetNextMessage(), for example.
virtual bool Match(const Message& message) = 0;
};
} // namespace ports
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_PORTS_MESSAGE_FILTER_H_

@ -8,6 +8,7 @@
#include "base/logging.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/message_filter.h"
namespace mojo {
namespace edk {
@ -44,10 +45,9 @@ bool MessageQueue::HasNextMessage() const {
return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_;
}
void MessageQueue::GetNextMessageIf(
std::function<bool(const Message&)> selector,
ScopedMessage* message) {
if (!HasNextMessage() || (selector && !selector(*heap_[0].get()))) {
void MessageQueue::GetNextMessage(ScopedMessage* message,
MessageFilter* filter) {
if (!HasNextMessage() || (filter && !filter->Match(*heap_[0].get()))) {
message->reset();
return;
}

@ -22,6 +22,8 @@ namespace ports {
const uint64_t kInitialSequenceNum = 1;
const uint64_t kInvalidSequenceNum = std::numeric_limits<uint64_t>::max();
class MessageFilter;
// An incoming message queue for a port. MessageQueue keeps track of the highest
// known sequence number and can indicate whether the next sequential message is
// available. Thus the queue enforces message ordering for the consumer without
@ -38,9 +40,9 @@ class MessageQueue {
bool HasNextMessage() const;
// Gives ownership of the message. The selector may be null.
void GetNextMessageIf(std::function<bool(const Message&)> selector,
ScopedMessage* message);
// Gives ownership of the message. If |filter| is non-null, the next message
// will only be retrieved if the filter successfully matches it.
void GetNextMessage(ScopedMessage* message, MessageFilter* filter);
// Takes ownership of the message. Note: Messages are ordered, so while we
// have added a message to the queue, we may still be waiting on a message

@ -263,16 +263,12 @@ int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
return OK;
}
int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
return GetMessageIf(port_ref, nullptr, message);
}
int Node::GetMessageIf(const PortRef& port_ref,
std::function<bool(const Message&)> selector,
ScopedMessage* message) {
int Node::GetMessage(const PortRef& port_ref,
ScopedMessage* message,
MessageFilter* filter) {
*message = nullptr;
DVLOG(4) << "GetMessageIf for " << port_ref.name() << "@" << name_;
DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
Port* port = port_ref.port();
{
@ -288,7 +284,7 @@ int Node::GetMessageIf(const PortRef& port_ref,
if (!CanAcceptMoreMessages(port))
return ERROR_PORT_PEER_CLOSED;
port->message_queue.GetNextMessageIf(std::move(selector), message);
port->message_queue.GetNextMessage(message, filter);
}
// Allow referenced ports to trigger PortStatusChanged calls.
@ -994,7 +990,7 @@ int Node::AcceptPort(const PortName& port_name,
<< port->last_sequence_num_to_receive << "]";
// A newly accepted port is not signalable until the message referencing the
// new port finds its way to the consumer (see GetMessageIf).
// new port finds its way to the consumer (see GetMessage).
port->message_queue.set_signalable(false);
int rv = AddPortWithName(port_name, port);
@ -1176,7 +1172,7 @@ int Node::ForwardMessages_Locked(const LockedPort& port,
for (;;) {
ScopedMessage message;
port->message_queue.GetNextMessageIf(nullptr, &message);
port->message_queue.GetNextMessage(&message, nullptr);
if (!message)
break;

@ -44,6 +44,7 @@ struct PortStatus {
bool peer_closed;
};
class MessageFilter;
class NodeDelegate;
class Node {
@ -107,15 +108,15 @@ class Node {
// indicate that this port's peer has closed. In such cases GetMessage may
// be called until it yields a null message, indicating that no more messages
// may be read from the port.
int GetMessage(const PortRef& port_ref, ScopedMessage* message);
// Like GetMessage, but the caller may optionally supply a selector function
// that decides whether or not to return the message. If |selector| is a
// nullptr, then GetMessageIf acts just like GetMessage. The |selector| may
// not call any Node methods.
int GetMessageIf(const PortRef& port_ref,
std::function<bool(const Message&)> selector,
ScopedMessage* message);
//
// If |filter| is non-null, the next available message is returned only if it
// is matched by the filter. If the provided filter does not match the next
// available message, GetMessage() behaves as if there is no message
// available. Ownership of |filter| is not taken, and it must outlive the
// extent of this call.
int GetMessage(const PortRef& port_ref,
ScopedMessage* message,
MessageFilter* filter);
// Sends a message from the specified port to its peer. Note that the message
// notification may arrive synchronously (via PortStatusChanged() on the

@ -172,7 +172,7 @@ class TestNode : public NodeDelegate {
}
bool ReadMessage(const PortRef& port, ScopedMessage* message) {
return node_.GetMessage(port, message) == OK && *message;
return node_.GetMessage(port, message, nullptr) == OK && *message;
}
bool GetSavedMessage(ScopedMessage* message) {
@ -581,14 +581,15 @@ TEST_F(PortsTest, LostConnectionToNode2) {
// a0 should have eventually detected peer closure after node loss.
ScopedMessage message;
EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.node().GetMessage(a0, &message));
EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
node0.node().GetMessage(a0, &message, nullptr));
EXPECT_FALSE(message);
EXPECT_EQ(OK, node0.node().ClosePort(a0));
EXPECT_EQ(OK, node0.node().ClosePort(x0));
EXPECT_EQ(OK, node1.node().GetMessage(x1, &message));
EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
EXPECT_TRUE(message);
node1.ClosePortsInMessage(message.get());
@ -725,14 +726,15 @@ TEST_F(PortsTest, GetMessage1) {
EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
ScopedMessage message;
EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
EXPECT_FALSE(message);
EXPECT_EQ(OK, node.node().ClosePort(a1));
WaitForIdle();
EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node.node().GetMessage(a0, &message));
EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
node.node().GetMessage(a0, &message, nullptr));
EXPECT_FALSE(message);
EXPECT_EQ(OK, node.node().ClosePort(a0));
@ -752,7 +754,7 @@ TEST_F(PortsTest, GetMessage2) {
EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
ScopedMessage message;
EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
ASSERT_TRUE(message);
EXPECT_TRUE(MessageEquals(message, "1"));
@ -781,7 +783,7 @@ TEST_F(PortsTest, GetMessage3) {
ScopedMessage message;
for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) {
EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
ASSERT_TRUE(message);
EXPECT_TRUE(MessageEquals(message, kStrings[i]));
}