0

Add better handle tracking, and optimize Connector::Accept.

R=viettrungluu@chromium.org

Review URL: https://codereview.chromium.org/65043004

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@234056 0039d316-1c4b-4281-b951-d872f2087c98
This commit is contained in:
darin@chromium.org
2013-11-09 01:01:59 +00:00
parent 11f77da629
commit 3ec76558d0
8 changed files with 86 additions and 24 deletions

@ -330,6 +330,7 @@
],
'dependencies': [
'mojo_bindings',
'mojo_system',
],
'sources': [
'public/bindings/sample/generated/sample_service.cc',

@ -37,8 +37,15 @@ bool Connector::Accept(Message* message) {
if (error_)
return false;
bool wait_to_write;
WriteOne(message, &wait_to_write);
if (wait_to_write) {
WaitToWriteMore();
if (!error_)
write_queue_.Push(message);
WriteMore();
}
return !error_;
}
@ -111,24 +118,37 @@ void Connector::ReadMore() {
}
void Connector::WriteMore() {
while (!write_queue_.IsEmpty()) {
const Message* message = write_queue_.Peek();
while (!error_ && !write_queue_.IsEmpty()) {
Message* message = write_queue_.Peek();
bool wait_to_write;
WriteOne(message, &wait_to_write);
if (wait_to_write)
break;
write_queue_.Pop();
}
}
void Connector::WriteOne(Message* message, bool* wait_to_write) {
// TODO(darin): WriteMessage will eventually start generating an error that
// it cannot accept more data. In that case, we'll need to wait on the pipe
// to determine when we can try writing again. This flag will be set to true
// in that case.
*wait_to_write = false;
MojoResult rv = WriteMessage(message_pipe_,
message->data,
message->data->header.num_bytes,
message->handles.data(),
message->handles.size(),
static_cast<uint32_t>(message->handles.size()),
MOJO_WRITE_MESSAGE_FLAG_NONE);
if (rv == MOJO_RESULT_OK) {
// TODO(darin): Handles were successfully transferred, and so we need
// to take care not to Close them here.
write_queue_.Pop();
continue; // Write another message.
}
// The handles were successfully transferred, so we don't need the message
// to track their lifetime any longer.
message->handles.clear();
} else {
error_ = true;
break;
}
}

@ -59,6 +59,7 @@ class Connector : public MessageReceiver {
void WaitToWriteMore();
void ReadMore();
void WriteMore();
void WriteOne(Message* message, bool* wait_to_write);
Handle message_pipe_;
MessageReceiver* incoming_receiver_;

@ -16,7 +16,7 @@ Message::Message()
Message::~Message() {
free(data);
// TODO(darin): Need to Close any handles so they don't leak.
std::for_each(handles.begin(), handles.end(), Close);
}
void Message::Swap(Message* other) {

@ -28,9 +28,8 @@ MOJO_COMPILE_ASSERT(sizeof(MessageData) == 9, bad_sizeof_MessageData);
#pragma pack(pop)
// Message is a holder for the data and handles to be sent over a MessagePipe.
// Message owns its data, but a consumer of Message is free to manipulate the
// data member or replace it. If replacing, then be sure to use |malloc| to
// allocate the memory.
// Message owns its data and handles, but a consumer of Message is free to
// manipulate the data and handles members.
class Message {
public:
Message();
@ -38,7 +37,7 @@ class Message {
void Swap(Message* other);
MessageData* data; // Heap-allocated.
MessageData* data; // Heap-allocated using malloc.
std::vector<Handle> handles;
private:

@ -23,7 +23,7 @@ bool MessageQueue::IsEmpty() const {
return queue_.empty();
}
const Message* MessageQueue::Peek() const {
Message* MessageQueue::Peek() {
assert(!queue_.empty());
return queue_.front();
}

@ -20,7 +20,7 @@ class MessageQueue {
~MessageQueue();
bool IsEmpty() const;
const Message* Peek() const;
Message* Peek();
// This method transfers ownership of |message->data| and |message->handles|
// to the message queue, resetting |message| in the process.

@ -171,5 +171,46 @@ TEST_F(BindingsConnectorTest, WriteToClosedPipe) {
EXPECT_TRUE(connector0.EncounteredError());
}
#if 0
// Enable this test once MojoWriteMessage supports passing handles.
TEST_F(BindingsConnectorTest, MessageWithHandles) {
Connector connector0(handle0_);
Connector connector1(handle1_);
const char kText[] = "hello world";
Message message;
AllocMessage(kText, &message);
Handle handles[2];
CreateMessagePipe(&handles[0], &handles[1]);
message.handles.push_back(handles[0]);
message.handles.push_back(handles[1]);
connector0.Accept(&message);
// The message should have been transferred.
EXPECT_TRUE(message.data == NULL);
EXPECT_TRUE(message.handles.empty());
MessageAccumulator accumulator;
connector1.SetIncomingReceiver(&accumulator);
PumpMessages();
ASSERT_FALSE(accumulator.IsEmpty());
Message message_received;
accumulator.Pop(&message_received);
EXPECT_EQ(std::string(kText),
std::string(
reinterpret_cast<char*>(message_received.data->payload)));
ASSERT_EQ(2U, message_received.handles.size());
EXPECT_EQ(handles[0].value, message_received.handles[0].value);
EXPECT_EQ(handles[1].value, message_received.handles[1].value);
}
#endif
} // namespace test
} // namespace mojo