0

Refactoring on Windows IPC channel.

This makes the Windows IPC channel interface the same as the one I created for the Posix channel:
http://codereview.chromium.org/9570001/

The core reading functions DispatchInputData, ProcessIncomingMessages, and the "hello" message handling are now identical. The follow-up step will be to merge these into a common IPCChannelReader base class and eliminate the duplicate code.


Review URL: http://codereview.chromium.org/9568031

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@124848 0039d316-1c4b-4281-b951-d872f2087c98
This commit is contained in:
brettw@chromium.org
2012-03-03 08:55:45 +00:00
parent 1420d5e4d6
commit 215f6fd176
2 changed files with 159 additions and 87 deletions

@ -104,6 +104,109 @@ bool Channel::ChannelImpl::IsNamedServerInitialized(
return GetLastError() == ERROR_SEM_TIMEOUT;
}
// Starts an overlapped read from the pipe.
//
// The |buffer| and |buffer_len| arguments are not consulted here: the read
// always targets |input_buf_| with a capacity of Channel::kReadBufferSize,
// and nothing is written through the third (unnamed) argument.
//
// Returns READ_FAILED when the pipe handle is invalid or when ReadFile fails
// with anything other than ERROR_IO_PENDING (the error code is logged). In
// every other case the input state is flagged as pending and READ_PENDING is
// returned.
Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
    char* buffer,
    int buffer_len,
    int* /* bytes_read */) {
  if (pipe_ == INVALID_HANDLE_VALUE)
    return READ_FAILED;

  DWORD transferred = 0;
  const BOOL completed_inline = ReadFile(pipe_,
                                         input_buf_,
                                         Channel::kReadBufferSize,
                                         &transferred,
                                         &input_state_.context.overlapped);
  if (!completed_inline) {
    const DWORD error_code = GetLastError();
    if (error_code != ERROR_IO_PENDING) {
      LOG(ERROR) << "pipe error: " << error_code;
      return READ_FAILED;
    }
  }

  // Both an outstanding read and a read that finished synchronously end up
  // here. Returning READ_SUCCEEDED for the synchronous case would be
  // possible, but the code is structured to go back to the message loop
  // instead: the completion port is signalled even for a "synchronously
  // completed" read.
  //
  // Doing so leaves room to process outgoing messages and interleave other
  // work on this thread while input is arriving heavily. This could
  // potentially be tuned to be more efficient with some testing.
  input_state_.is_pending = true;
  return READ_PENDING;
}
bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
return true;
}
void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
// The hello message contains one parameter containing the PID.
listener_->OnChannelConnected(MessageIterator(msg).NextInt());
}
// Hook invoked by DispatchInputData once no partial message data is left in
// the overflow buffer. Returning false makes DispatchInputData report
// failure. This implementation has nothing to do and always returns true.
bool Channel::ChannelImpl::DidEmptyInputBuffers() {
  return true;
}
bool Channel::ChannelImpl::DispatchInputData(const char* input_data,
int input_data_len) {
const char* p;
const char* end;
// Possibly combine with the overflow buffer to make a larger buffer.
if (input_overflow_buf_.empty()) {
p = input_data;
end = input_data + input_data_len;
} else {
if (input_overflow_buf_.size() >
kMaximumMessageSize - input_data_len) {
input_overflow_buf_.clear();
LOG(ERROR) << "IPC message is too big";
return false;
}
input_overflow_buf_.append(input_data, input_data_len);
p = input_overflow_buf_.data();
end = p + input_overflow_buf_.size();
}
// Dispatch all complete messages in the data buffer.
while (p < end) {
const char* message_tail = Message::FindNext(p, end);
if (message_tail) {
int len = static_cast<int>(message_tail - p);
Message m(p, len);
if (!WillDispatchInputMessage(&m))
return false;
if (IsHelloMessage(m))
HandleHelloMessage(m);
else
listener_->OnMessageReceived(m);
p = message_tail;
} else {
// Last message is partial.
break;
}
}
// Save any partial data in the overflow buffer.
input_overflow_buf_.assign(p, end - p);
if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
return false;
return true;
}
// Returns true when |m| is the hello message, i.e. it has routing id
// MSG_ROUTING_NONE and type HELLO_MESSAGE_TYPE.
bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const {
  if (m.routing_id() != MSG_ROUTING_NONE)
    return false;
  return m.type() == HELLO_MESSAGE_TYPE;
}
// Processes data delivered by a previously pending read: dispatches the first
// |bytes_read| bytes of |input_buf_| through DispatchInputData and returns
// its result.
bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) {
  const bool dispatched = DispatchInputData(input_buf_, bytes_read);
  return dispatched;
}
// static
const std::wstring Channel::ChannelImpl::PipeName(
const std::string& channel_id) {
@ -257,89 +360,19 @@ bool Channel::ChannelImpl::ProcessConnection() {
return true;
}
bool Channel::ChannelImpl::ProcessIncomingMessages(
MessageLoopForIO::IOContext* context,
DWORD bytes_read) {
DCHECK(thread_check_->CalledOnValidThread());
if (input_state_.is_pending) {
input_state_.is_pending = false;
DCHECK(context);
if (!context || !bytes_read)
bool Channel::ChannelImpl::ProcessIncomingMessages() {
while (true) {
int bytes_read = 0;
ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
&bytes_read);
if (read_state == READ_FAILED)
return false;
} else {
// This happens at channel initialization.
DCHECK(!bytes_read && context == &input_state_.context);
}
for (;;) {
if (bytes_read == 0) {
if (INVALID_HANDLE_VALUE == pipe_)
return false;
// Read from pipe...
BOOL ok = ReadFile(pipe_,
input_buf_,
Channel::kReadBufferSize,
&bytes_read,
&input_state_.context.overlapped);
if (!ok) {
DWORD err = GetLastError();
if (err == ERROR_IO_PENDING) {
input_state_.is_pending = true;
return true;
}
LOG(ERROR) << "pipe error: " << err;
return false;
}
input_state_.is_pending = true;
if (read_state == READ_PENDING)
return true;
}
DCHECK(bytes_read);
// Process messages from input buffer.
const char* p, *end;
if (input_overflow_buf_.empty()) {
p = input_buf_;
end = p + bytes_read;
} else {
if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
input_overflow_buf_.clear();
LOG(ERROR) << "IPC message is too big";
return false;
}
input_overflow_buf_.append(input_buf_, bytes_read);
p = input_overflow_buf_.data();
end = p + input_overflow_buf_.size();
}
while (p < end) {
const char* message_tail = Message::FindNext(p, end);
if (message_tail) {
int len = static_cast<int>(message_tail - p);
const Message m(p, len);
DVLOG(2) << "received message on channel @" << this
<< " with type " << m.type();
if (m.routing_id() == MSG_ROUTING_NONE &&
m.type() == HELLO_MESSAGE_TYPE) {
// The Hello message contains only the process id.
listener_->OnChannelConnected(MessageIterator(m).NextInt());
} else {
listener_->OnMessageReceived(m);
}
p = message_tail;
} else {
// Last message is partial.
break;
}
}
input_overflow_buf_.assign(p, end - p);
bytes_read = 0; // Get more data.
DCHECK(bytes_read > 0);
if (!DispatchInputData(input_buf_, bytes_read))
return false;
}
return true;
}
bool Channel::ChannelImpl::ProcessOutgoingMessages(
@ -400,8 +433,9 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages(
}
void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
DWORD bytes_transfered, DWORD error) {
bool ok;
DWORD bytes_transfered,
DWORD error) {
bool ok = true;
DCHECK(thread_check_->CalledOnValidThread());
if (context == &input_state_.context) {
if (waiting_connect_) {
@ -414,10 +448,26 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
return;
// else, fall-through and look for incoming messages...
}
// we don't support recursion through OnMessageReceived yet!
// We don't support recursion through OnMessageReceived yet!
DCHECK(!processing_incoming_);
AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true);
ok = ProcessIncomingMessages(context, bytes_transfered);
// Process the new data.
if (input_state_.is_pending) {
// This is the normal case for everything except the initialization step.
input_state_.is_pending = false;
if (!bytes_transfered)
ok = false;
else
ok = AsyncReadComplete(bytes_transfered);
} else {
DCHECK(!bytes_transfered);
}
// Request more data.
if (ok)
ok = ProcessIncomingMessages();
} else {
DCHECK(context == &output_state_.context);
ok = ProcessOutgoingMessages(context, bytes_transfered);

@ -1,4 +1,4 @@
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@ -32,13 +32,35 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
void set_listener(Listener* listener) { listener_ = listener; }
bool Send(Message* message);
static bool IsNamedServerInitialized(const std::string& channel_id);
private:
// Outcome of a ReadData call. The current ReadData implementation only ever
// returns READ_FAILED or READ_PENDING; ProcessIncomingMessages treats
// READ_FAILED as an error and stops (successfully) on READ_PENDING.
enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING };
// This will become the virtual interface implemented by this class to
// handle platform-specific reading.
// TODO(brettw) finish refactoring.
// Starts an overlapped read on the pipe. The current implementation reads
// into input_buf_ and does not use the buffer, buffer_len or bytes_read
// arguments. Returns READ_FAILED if the pipe handle is invalid or the read
// fails with an error other than ERROR_IO_PENDING, READ_PENDING otherwise.
ReadState ReadData(char* buffer, int buffer_len, int* bytes_read);
// Called by DispatchInputData before each complete message is dispatched;
// returning false aborts dispatching. Currently always returns true.
bool WillDispatchInputMessage(Message* msg);
// Forwards the int parameter of the hello message (the PID) to the
// listener's OnChannelConnected.
void HandleHelloMessage(const Message& msg);
// Called by DispatchInputData when no partial data remains in the overflow
// buffer; returning false makes DispatchInputData fail. Currently always
// returns true.
bool DidEmptyInputBuffers();
// Splits the given bytes (combined with any previously saved partial data)
// into messages and dispatches each complete one; saves a trailing partial
// message for the next call. Returns false on error.
bool DispatchInputData(const char* input_data, int input_data_len);
// Returns true if the given message is the hello message.
bool IsHelloMessage(const Message& m) const;
// Handles asynchronously read data.
//
// Optionally call this after returning READ_PENDING from ReadData to
// indicate that buffer was filled with the given number of bytes of
// data. See ReadData for more.
bool AsyncReadComplete(int bytes_read);
static const std::wstring PipeName(const std::string& channel_id);
bool CreatePipe(const IPC::ChannelHandle &channel_handle, Mode mode);
bool ProcessConnection();
bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_read);
bool ProcessIncomingMessages();
bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_written);