0

Allow synchronous messages to be sent from threads other than the main thread. This simplifies code that needs to do this (i.e. webkit db and file threads).

BUG=23423
Review URL: http://codereview.chromium.org/1601005

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@43752 0039d316-1c4b-4281-b951-d872f2087c98
This commit is contained in:
jabdelmalek@google.com
2010-04-06 20:33:36 +00:00
parent ae2e0f96d3
commit 1e9499c21c
18 changed files with 425 additions and 347 deletions

@ -90,12 +90,14 @@ bool DatabaseDispatcherHost::OnMessageReceived(
*message_was_ok = true;
bool handled = true;
IPC_BEGIN_MESSAGE_MAP_EX(DatabaseDispatcherHost, message, *message_was_ok)
IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseOpenFile, OnDatabaseOpenFile)
IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseDeleteFile, OnDatabaseDeleteFile)
IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseGetFileAttributes,
OnDatabaseGetFileAttributes)
IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseGetFileSize,
OnDatabaseGetFileSize)
IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseOpenFile,
OnDatabaseOpenFile)
IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseDeleteFile,
OnDatabaseDeleteFile)
IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseGetFileAttributes,
OnDatabaseGetFileAttributes)
IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseGetFileSize,
OnDatabaseGetFileSize)
IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseOpened, OnDatabaseOpened)
IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseModified, OnDatabaseModified)
IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseClosed, OnDatabaseClosed)
@ -129,7 +131,7 @@ void DatabaseDispatcherHost::Send(IPC::Message* message) {
void DatabaseDispatcherHost::OnDatabaseOpenFile(const string16& vfs_file_name,
int desired_flags,
int32 message_id) {
IPC::Message* reply_msg) {
if (!observer_added_) {
observer_added_ = true;
ChromeThread::PostTask(
@ -143,19 +145,7 @@ void DatabaseDispatcherHost::OnDatabaseOpenFile(const string16& vfs_file_name,
&DatabaseDispatcherHost::DatabaseOpenFile,
vfs_file_name,
desired_flags,
message_id));
}
static void SetOpenFileResponseParams(
ViewMsg_DatabaseOpenFileResponse_Params* params,
base::PlatformFile file_handle,
base::PlatformFile dir_handle) {
#if defined(OS_WIN)
params->file_handle = file_handle;
#elif defined(OS_POSIX)
params->file_handle = base::FileDescriptor(file_handle, true);
params->dir_handle = base::FileDescriptor(dir_handle, true);
#endif
reply_msg));
}
// Scheduled by the IO thread on the file thread.
@ -164,7 +154,7 @@ static void SetOpenFileResponseParams(
// corresponding renderer process with the file handle.
void DatabaseDispatcherHost::DatabaseOpenFile(const string16& vfs_file_name,
int desired_flags,
int32 message_id) {
IPC::Message* reply_msg) {
DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE));
base::PlatformFile target_handle = base::kInvalidPlatformFileValue;
base::PlatformFile target_dir_handle = base::kInvalidPlatformFileValue;
@ -186,21 +176,28 @@ void DatabaseDispatcherHost::DatabaseOpenFile(const string16& vfs_file_name,
}
}
ViewMsg_DatabaseOpenFileResponse_Params response_params;
SetOpenFileResponseParams(&response_params, target_handle, target_dir_handle);
Send(new ViewMsg_DatabaseOpenFileResponse(message_id, response_params));
ViewHostMsg_DatabaseOpenFile::WriteReplyParams(
reply_msg,
#if defined(OS_WIN)
target_handle
#elif defined(OS_POSIX)
base::FileDescriptor(target_handle, true),
base::FileDescriptor(target_dir_handle, true)
#endif
);
Send(reply_msg);
}
// Handler for ViewHostMsg_DatabaseDeleteFile. It performs no file work
// itself: it posts DatabaseDeleteFile() to ChromeThread::FILE, forwarding its
// arguments and passing kNumDeleteRetries as the initial reschedule count.
void DatabaseDispatcherHost::OnDatabaseDeleteFile(const string16& vfs_file_name,
const bool& sync_dir,
int32 message_id) {
IPC::Message* reply_msg) {
ChromeThread::PostTask(
ChromeThread::FILE, FROM_HERE,
NewRunnableMethod(this,
&DatabaseDispatcherHost::DatabaseDeleteFile,
vfs_file_name,
sync_dir,
message_id,
reply_msg,
kNumDeleteRetries));
}
@ -210,7 +207,7 @@ void DatabaseDispatcherHost::OnDatabaseDeleteFile(const string16& vfs_file_name,
// corresponding renderer process with the error code.
void DatabaseDispatcherHost::DatabaseDeleteFile(const string16& vfs_file_name,
bool sync_dir,
int32 message_id,
IPC::Message* reply_msg,
int reschedule_count) {
DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE));
@ -229,25 +226,26 @@ void DatabaseDispatcherHost::DatabaseDeleteFile(const string16& vfs_file_name,
&DatabaseDispatcherHost::DatabaseDeleteFile,
vfs_file_name,
sync_dir,
message_id,
reply_msg,
reschedule_count - 1),
kDelayDeleteRetryMs);
return;
}
}
Send(new ViewMsg_DatabaseDeleteFileResponse(message_id, error_code));
ViewHostMsg_DatabaseDeleteFile::WriteReplyParams(reply_msg, error_code);
Send(reply_msg);
}
// Handler for ViewHostMsg_DatabaseGetFileAttributes. It only hops threads:
// the arguments are forwarded unchanged to DatabaseGetFileAttributes(), which
// is posted to ChromeThread::FILE.
void DatabaseDispatcherHost::OnDatabaseGetFileAttributes(
const string16& vfs_file_name,
int32 message_id) {
IPC::Message* reply_msg) {
ChromeThread::PostTask(
ChromeThread::FILE, FROM_HERE,
NewRunnableMethod(this,
&DatabaseDispatcherHost::DatabaseGetFileAttributes,
vfs_file_name,
message_id));
reply_msg));
}
// Scheduled by the IO thread on the file thread.
@ -256,24 +254,27 @@ void DatabaseDispatcherHost::OnDatabaseGetFileAttributes(
// corresponding renderer process.
void DatabaseDispatcherHost::DatabaseGetFileAttributes(
const string16& vfs_file_name,
int32 message_id) {
IPC::Message* reply_msg) {
// Must run on the FILE thread.
DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE));
// -1 is the value reported when the VFS name resolves to an empty path.
int32 attributes = -1;
FilePath db_file =
DatabaseUtil::GetFullFilePathForVfsFile(db_tracker_, vfs_file_name);
if (!db_file.empty())
attributes = VfsBackend::GetFileAttributes(db_file);
// Hand the attributes back to the requester.
Send(new ViewMsg_DatabaseGetFileAttributesResponse(message_id, attributes));
ViewHostMsg_DatabaseGetFileAttributes::WriteReplyParams(
reply_msg, attributes);
Send(reply_msg);
}
// Handler for ViewHostMsg_DatabaseGetFileSize. It posts DatabaseGetFileSize()
// to ChromeThread::FILE with the same arguments; the size lookup itself
// happens there.
void DatabaseDispatcherHost::OnDatabaseGetFileSize(
const string16& vfs_file_name, int32 message_id) {
const string16& vfs_file_name, IPC::Message* reply_msg) {
ChromeThread::PostTask(
ChromeThread::FILE, FROM_HERE,
NewRunnableMethod(this,
&DatabaseDispatcherHost::DatabaseGetFileSize,
vfs_file_name,
message_id));
reply_msg));
}
// Scheduled by the IO thread on the file thread.
@ -281,14 +282,16 @@ void DatabaseDispatcherHost::OnDatabaseGetFileSize(
// on the IO thread's message loop to send an IPC back to
// the corresponding renderer process.
void DatabaseDispatcherHost::DatabaseGetFileSize(const string16& vfs_file_name,
int32 message_id) {
IPC::Message* reply_msg) {
// Must run on the FILE thread.
DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE));
// The reported size stays 0 when the VFS name resolves to an empty path.
int64 size = 0;
FilePath db_file =
DatabaseUtil::GetFullFilePathForVfsFile(db_tracker_, vfs_file_name);
if (!db_file.empty())
size = VfsBackend::GetFileSize(db_file);
// Hand the size back to the requester.
Send(new ViewMsg_DatabaseGetFileSizeResponse(message_id, size));
ViewHostMsg_DatabaseGetFileSize::WriteReplyParams(reply_msg, size);
Send(reply_msg);
}
void DatabaseDispatcherHost::OnDatabaseOpened(const string16& origin_identifier,

@ -34,14 +34,14 @@ class DatabaseDispatcherHost
// VFS message handlers (IO thread)
void OnDatabaseOpenFile(const string16& vfs_file_name,
int desired_flags,
int32 message_id);
IPC::Message* reply_msg);
void OnDatabaseDeleteFile(const string16& vfs_file_name,
const bool& sync_dir,
int32 message_id);
IPC::Message* reply_msg);
void OnDatabaseGetFileAttributes(const string16& vfs_file_name,
int32 message_id);
IPC::Message* reply_msg);
void OnDatabaseGetFileSize(const string16& vfs_file_name,
int32 message_id);
IPC::Message* reply_msg);
// Database tracker message handlers (IO thread)
void OnDatabaseOpened(const string16& origin_identifier,
@ -83,15 +83,15 @@ class DatabaseDispatcherHost
// VFS message handlers (file thread)
void DatabaseOpenFile(const string16& vfs_file_name,
int desired_flags,
int32 message_id);
IPC::Message* reply_msg);
void DatabaseDeleteFile(const string16& vfs_file_name,
bool sync_dir,
int32 message_id,
IPC::Message* reply_msg,
int reschedule_count);
void DatabaseGetFileAttributes(const string16& vfs_file_name,
int32 message_id);
IPC::Message* reply_msg);
void DatabaseGetFileSize(const string16& vfs_file_name,
int32 message_id);
IPC::Message* reply_msg);
// Database tracker message handlers (file thread)
void DatabaseOpened(const string16& origin_identifier,

@ -13,6 +13,7 @@
#include "chrome/common/socket_stream_dispatcher.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message.h"
#include "ipc/ipc_sync_message_filter.h"
#include "ipc/ipc_switches.h"
#include "webkit/glue/webkit_glue.h"
@ -49,6 +50,10 @@ void ChildThread::Init() {
resource_dispatcher_.reset(new ResourceDispatcher(this));
socket_stream_dispatcher_.reset(new SocketStreamDispatcher());
sync_message_filter_ =
new IPC::SyncMessageFilter(ChildProcess::current()->GetShutDownEvent());
channel_->AddFilter(sync_message_filter_.get());
// When running in unit tests, there is already a NotificationService object.
// Since only one can exist at a time per thread, check first.
if (!NotificationService::current())
@ -60,6 +65,8 @@ ChildThread::~ChildThread() {
IPC::Logging::current()->SetIPCSender(NULL);
#endif
channel_->RemoveFilter(sync_message_filter_.get());
// The ChannelProxy object caches a pointer to the IPC thread, so need to
// reset it as it's not guaranteed to outlive this object.
// NOTE: this also has the side-effect of not closing the main IPC channel to
@ -126,7 +133,7 @@ void ChildThread::OnMessageReceived(const IPC::Message& msg) {
#if defined(IPC_MESSAGE_LOG_ENABLED)
IPC_MESSAGE_HANDLER(PluginProcessMsg_SetIPCLoggingEnabled,
OnSetIPCLoggingEnabled)
#endif // IPC_MESSAGE_HANDLER
#endif
IPC_MESSAGE_UNHANDLED(handled = false)
IPC_END_MESSAGE_MAP()

@ -15,6 +15,10 @@
class NotificationService;
class SocketStreamDispatcher;
namespace IPC {
class SyncMessageFilter;
}
// The main thread of a child process derives from this class.
class ChildThread : public IPC::Channel::Listener,
public IPC::Message::Sender {
@ -49,6 +53,10 @@ class ChildThread : public IPC::Channel::Listener,
return socket_stream_dispatcher_.get();
}
// Safe to call on any thread, as long as that thread is guaranteed not to
// outlive the main thread.
IPC::SyncMessageFilter* sync_message_filter() { return sync_message_filter_; }
MessageLoop* message_loop() { return message_loop_; }
// Returns the one child thread.
@ -84,6 +92,9 @@ class ChildThread : public IPC::Channel::Listener,
std::string channel_name_;
scoped_ptr<IPC::SyncChannel> channel_;
// Allows threads other than the main thread to send sync messages.
scoped_refptr<IPC::SyncMessageFilter> sync_message_filter_;
// Implements message routing functionality to the consumers of ChildThread.
MessageRouter router_;

@ -10,8 +10,9 @@
#include "third_party/sqlite/preprocessed/sqlite3.h"
#endif
#include "chrome/common/db_message_filter.h"
#include "chrome/common/child_thread.h"
#include "chrome/common/render_messages.h"
#include "ipc/ipc_sync_message_filter.h"
#include "third_party/WebKit/WebKit/chromium/public/WebString.h"
using WebKit::WebKitClient;
@ -20,58 +21,59 @@ using WebKit::WebString;
// Asks the browser process (via ViewHostMsg_DatabaseOpenFile) to open the
// database file |vfs_file_name| with |desired_flags| and returns the file
// handle that comes back. |dir_handle| may be NULL; when it is not, it
// receives the directory descriptor on POSIX and the invalid handle value on
// Windows.
WebKitClient::FileHandle DatabaseUtil::databaseOpenFile(
const WebString& vfs_file_name, int desired_flags,
WebKitClient::FileHandle* dir_handle) {
DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance();
int message_id = db_message_filter->GetUniqueID();
IPC::PlatformFileForTransit file_handle;
// Every handle starts out invalid on both platforms.
// NOTE(review): presumably these values are what the caller sees if the send
// fails -- confirm against the Send()/SendAndWait() contract.
#if defined(OS_WIN)
ViewMsg_DatabaseOpenFileResponse_Params default_response =
{ base::kInvalidPlatformFileValue };
file_handle = base::kInvalidPlatformFileValue;
#elif defined(OS_POSIX)
ViewMsg_DatabaseOpenFileResponse_Params default_response =
{ base::FileDescriptor(base::kInvalidPlatformFileValue, true),
base::FileDescriptor(base::kInvalidPlatformFileValue, true) };
file_handle =
base::FileDescriptor(base::kInvalidPlatformFileValue, true);
base::FileDescriptor dir_handle_rv =
base::FileDescriptor(base::kInvalidPlatformFileValue, true);
#endif
ViewMsg_DatabaseOpenFileResponse_Params result =
db_message_filter->SendAndWait(
new ViewHostMsg_DatabaseOpenFile(
vfs_file_name, desired_flags, message_id),
message_id, default_response);
scoped_refptr<IPC::SyncMessageFilter> filter =
ChildThread::current()->sync_message_filter();
filter->Send(new ViewHostMsg_DatabaseOpenFile(
vfs_file_name,
desired_flags,
&file_handle
#if defined(OS_POSIX)
, &dir_handle_rv
#endif
));
// Windows reports no directory handle; POSIX unwraps the raw descriptors
// from their FileDescriptor wrappers.
#if defined(OS_WIN)
if (dir_handle)
*dir_handle = base::kInvalidPlatformFileValue;
return result.file_handle;
return file_handle;
#elif defined(OS_POSIX)
if (dir_handle)
*dir_handle = result.dir_handle.fd;
return result.file_handle.fd;
*dir_handle = dir_handle_rv.fd;
return file_handle.fd;
#endif
}
// Requests deletion of |vfs_file_name| through ViewHostMsg_DatabaseDeleteFile
// and returns the resulting SQLite error code. SQLITE_IOERR_DELETE is the
// value the result starts from.
int DatabaseUtil::databaseDeleteFile(
const WebString& vfs_file_name, bool sync_dir) {
DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance();
int message_id = db_message_filter->GetUniqueID();
return db_message_filter->SendAndWait(
new ViewHostMsg_DatabaseDeleteFile(vfs_file_name, sync_dir, message_id),
message_id, SQLITE_IOERR_DELETE);
int rv = SQLITE_IOERR_DELETE;
scoped_refptr<IPC::SyncMessageFilter> filter =
ChildThread::current()->sync_message_filter();
filter->Send(new ViewHostMsg_DatabaseDeleteFile(
vfs_file_name, sync_dir, &rv));
return rv;
}
// Queries the attributes of |vfs_file_name| through
// ViewHostMsg_DatabaseGetFileAttributes and returns them. The result starts
// from -1.
long DatabaseUtil::databaseGetFileAttributes(
const WebString& vfs_file_name) {
DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance();
int message_id = db_message_filter->GetUniqueID();
return db_message_filter->SendAndWait(
new ViewHostMsg_DatabaseGetFileAttributes(vfs_file_name, message_id),
message_id, -1L);
long DatabaseUtil::databaseGetFileAttributes(const WebString& vfs_file_name) {
int32 rv = -1;
scoped_refptr<IPC::SyncMessageFilter> filter =
ChildThread::current()->sync_message_filter();
filter->Send(new ViewHostMsg_DatabaseGetFileAttributes(vfs_file_name, &rv));
return rv;
}
// Queries the size of |vfs_file_name| through ViewHostMsg_DatabaseGetFileSize
// and returns it. The result starts from 0.
long long DatabaseUtil::databaseGetFileSize(
const WebString& vfs_file_name) {
DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance();
int message_id = db_message_filter->GetUniqueID();
return db_message_filter->SendAndWait(
new ViewHostMsg_DatabaseGetFileSize(vfs_file_name, message_id),
message_id, 0LL);
long long DatabaseUtil::databaseGetFileSize(const WebString& vfs_file_name) {
int64 rv = 0LL;
scoped_refptr<IPC::SyncMessageFilter> filter =
ChildThread::current()->sync_message_filter();
filter->Send(new ViewHostMsg_DatabaseGetFileSize(vfs_file_name, &rv));
return rv;
}

@ -1,69 +1,18 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/common/db_message_filter.h"
#include "chrome/common/child_process.h"
#include "chrome/common/render_messages.h"
#include "third_party/WebKit/WebKit/chromium/public/WebDatabase.h"
DBMessageFilter* DBMessageFilter::instance_ = NULL;
DBMessageFilter::DBMessageFilter()
: io_thread_message_loop_(ChildProcess::current()->io_message_loop()),
channel_(NULL),
shutdown_event_(ChildProcess::current()->GetShutDownEvent()),
messages_awaiting_replies_(new IDMap<DBMessageState>()),
unique_id_generator_(new base::AtomicSequenceNumber()) {
DCHECK(!instance_);
instance_ = this;
}
int DBMessageFilter::GetUniqueID() {
return unique_id_generator_->GetNext();
}
// Task body posted by DBMessageFilter::Send(). While holding |channel_lock|,
// it forwards |message| to |channel| if one was supplied; otherwise it deletes
// the message so it is not leaked.
static void SendMessageOnIOThread(IPC::Message* message,
                                  IPC::Channel* channel,
                                  Lock* channel_lock) {
  AutoLock guard(*channel_lock);
  if (!channel) {
    // Nobody to hand the message to, so dispose of it here.
    delete message;
    return;
  }
  channel->Send(message);
}
void DBMessageFilter::Send(IPC::Message* message) {
io_thread_message_loop_->PostTask(FROM_HERE,
NewRunnableFunction(SendMessageOnIOThread, message, channel_,
&channel_lock_));
}
void DBMessageFilter::OnFilterAdded(IPC::Channel* channel) {
AutoLock channel_auto_lock(channel_lock_);
channel_ = channel;
}
void DBMessageFilter::OnChannelError() {
AutoLock channel_auto_lock(channel_lock_);
channel_ = NULL;
}
void DBMessageFilter::OnChannelClosing() {
AutoLock channel_auto_lock(channel_lock_);
channel_ = NULL;
DBMessageFilter::DBMessageFilter() {
}
bool DBMessageFilter::OnMessageReceived(const IPC::Message& message) {
bool handled = true;
IPC_BEGIN_MESSAGE_MAP(DBMessageFilter, message)
IPC_MESSAGE_HANDLER(ViewMsg_DatabaseOpenFileResponse,
OnResponse<ViewMsg_DatabaseOpenFileResponse_Params>)
IPC_MESSAGE_HANDLER(ViewMsg_DatabaseDeleteFileResponse, OnResponse<int>)
IPC_MESSAGE_HANDLER(ViewMsg_DatabaseGetFileAttributesResponse,
OnResponse<uint32>)
IPC_MESSAGE_HANDLER(ViewMsg_DatabaseGetFileSizeResponse, OnResponse<int64>)
IPC_MESSAGE_HANDLER(ViewMsg_DatabaseUpdateSize, OnDatabaseUpdateSize)
IPC_MESSAGE_HANDLER(ViewMsg_DatabaseCloseImmediately,
OnDatabaseCloseImmediately)

@ -1,143 +1,27 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_COMMON_DB_MESSAGE_FILTER_H_
#define CHROME_COMMON_DB_MESSAGE_FILTER_H_
#include "base/atomic_sequence_num.h"
#include "base/id_map.h"
#include "base/lock.h"
#include "base/scoped_ptr.h"
#include "base/waitable_event.h"
#include "ipc/ipc_channel_proxy.h"
class Lock;
class MessageLoop;
namespace IPC {
class Channel;
}
// A thread-safe message filter used to send IPCs from DB threads and process
// replies from the browser process.
//
// This class should not be instantiated anywhere but RenderThread::Init(). It
// is meant to be a singleton in each renderer process. To access the singleton,
// use GetInstance().
// Receives database messages from the browser process and processes them on the
// IO thread.
class DBMessageFilter : public IPC::ChannelProxy::MessageFilter {
public:
// Returns the DBMessageFilter singleton created in this renderer process.
static DBMessageFilter* GetInstance() { return instance_; }
// Creates a new DBMessageFilter instance.
DBMessageFilter();
// Returns a unique ID for use when calling the SendAndWait() method.
virtual int GetUniqueID();
// Posts a task to the IO thread to send |message| to the browser.
virtual void Send(IPC::Message* message);
// Sends |message| and blocks the current thread. Returns the result from the
// reply message, or |default_result| if the renderer process is being
// destroyed or the message could not be sent.
template<class ResultType>
ResultType SendAndWait(IPC::Message* message,
int message_id,
ResultType default_result) {
// |result| keeps |default_result| unless OnResponse() overwrites it.
ResultType result = default_result;
base::WaitableEvent waitable_event(false, false);
// |state| points at locals of this stack frame; OnResponse() writes the
// reply through |result_address_| and signals |waitable_event_|.
DBMessageState state =
{ reinterpret_cast<intptr_t>(&result), &waitable_event };
{
// Register the pending state before sending, so that OnResponse() can find
// it under |message_id|.
AutoLock msgs_awaiting_replies_autolock(messages_awaiting_replies_lock_);
messages_awaiting_replies_->AddWithID(&state, message_id);
}
Send(message);
// Block until either the shutdown event or the reply event is signaled.
base::WaitableEvent* events[2] = { shutdown_event_, &waitable_event };
base::WaitableEvent::WaitMany(events, 2);
// Locking on messages_awaiting_replies_ guarantees that either the IO
// thread won't enter OnResponse(), or if it's already in OnResponse(),
// then WaitableEvent::Signal() will get a chance to do all its work
// before waitable_event is deleted.
AutoLock msgs_awaiting_replies_autolock(messages_awaiting_replies_lock_);
messages_awaiting_replies_->Remove(message_id);
return result;
}
// Processes incoming message |message| from the browser process.
private:
virtual bool OnMessageReceived(const IPC::Message& message);
private:
// The state we store for each message we send.
struct DBMessageState {
intptr_t result_address_;
base::WaitableEvent* waitable_event_;
};
// This is a RefCounted class, do not allow anybody to destroy it directly.
virtual ~DBMessageFilter() { instance_ = NULL; }
// Invoked when this filter is added to |channel|.
virtual void OnFilterAdded(IPC::Channel* channel);
// Called when the channel encounters a problem. The filter should clean up
// its internal data and not accept any more messages.
virtual void OnChannelError();
// Called when the channel is closing. The filter should clean up its internal
// data and not accept any more messages.
virtual void OnChannelClosing();
// Delivers the reply for the sync DB request registered under |message_id|.
// Under the pending-replies lock, it stores |result| at the address recorded
// for that request and signals its event. A reply with no registered entry is
// ignored.
template<class ResultType>
void OnResponse(int32 message_id, ResultType result) {
  AutoLock pending_lock(messages_awaiting_replies_lock_);
  DBMessageState* pending = messages_awaiting_replies_->Lookup(message_id);
  if (!pending)
    return;

  ResultType* destination =
      reinterpret_cast<ResultType*>(pending->result_address_);
  *destination = result;
  pending->waitable_event_->Signal();
}
// Processes IPCs that indicate a change in the size of a DB file.
void OnDatabaseUpdateSize(const string16& origin_identifier,
const string16& database_name,
int64 database_size,
int64 space_available);
// Processes IPCs that ask for a DB to be closed immediately.
void OnDatabaseCloseImmediately(const string16& origin_identifier,
const string16& database_name);
// The message loop for the IO thread.
MessageLoop* io_thread_message_loop_;
// The channel to which this filter was added.
IPC::Channel* channel_;
// A lock around the channel.
Lock channel_lock_;
// The shutdown event.
base::WaitableEvent* shutdown_event_;
// The list of messages awaiting replies. For each such message we store a
// DBMessageState instance.
scoped_ptr<IDMap<DBMessageState> > messages_awaiting_replies_;
// The lock for 'messages_awaiting_replies_'.
Lock messages_awaiting_replies_lock_;
// A thread-safe unique number generator.
scoped_ptr<base::AtomicSequenceNumber> unique_id_generator_;
// The singleton.
static DBMessageFilter* instance_;
};
#endif // CHROME_COMMON_DB_MESSAGE_FILTER_H_

@ -422,13 +422,6 @@ struct ViewMsg_PrintPages_Params {
std::vector<int> pages;
};
struct ViewMsg_DatabaseOpenFileResponse_Params {
IPC::PlatformFileForTransit file_handle; // DB file handle
#if defined(OS_POSIX)
base::FileDescriptor dir_handle; // DB directory handle
#endif
};
// Parameters to describe a rendered page.
struct ViewHostMsg_DidPrintPage_Params {
// A shared memory handle to the EMF data. This data can be quite large so a
@ -2037,33 +2030,6 @@ struct ParamTraits<ViewMsg_StopFinding_Params> {
}
};
template <>
struct ParamTraits<ViewMsg_DatabaseOpenFileResponse_Params> {
typedef ViewMsg_DatabaseOpenFileResponse_Params param_type;
static void Write(Message* m, const param_type& p) {
WriteParam(m, p.file_handle);
#if defined(OS_POSIX)
WriteParam(m, p.dir_handle);
#endif
}
static bool Read(const Message* m, void** iter, param_type* p) {
bool ret = ReadParam(m, iter, &p->file_handle);
#if defined(OS_POSIX)
ret = ret && ReadParam(m, iter, &p->dir_handle);
#endif
return ret;
}
static void Log(const param_type& p, std::wstring* l) {
l->append(L"(");
LogParam(p.file_handle, l);
#if defined(OS_POSIX)
l->append(L", ");
LogParam(p.dir_handle, l);
#endif
l->append(L")");
}
};
template <>
struct ParamTraits<appcache::Status> {
typedef appcache::Status param_type;

@ -821,26 +821,6 @@ IPC_BEGIN_MESSAGES(View)
IPC_MESSAGE_ROUTED1(ViewMsg_ExecuteCode,
ViewMsg_ExecuteCode_Params)
// Returns a file handle
IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseOpenFileResponse,
int32 /* the ID of the message we're replying to */,
ViewMsg_DatabaseOpenFileResponse_Params)
// Returns a SQLite error code
IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseDeleteFileResponse,
int32 /* the ID of the message we're replying to */,
int /* SQLite error code */)
// Returns the attributes of a file
IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseGetFileAttributesResponse,
int32 /* the ID of the message we're replying to */,
int32 /* the attributes for the given DB file */)
// Returns the size of a file
IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseGetFileSizeResponse,
int32 /* the ID of the message we're replying to */,
int64 /* the size of the given DB file */)
// Notifies the child process of the new database size
IPC_MESSAGE_CONTROL4(ViewMsg_DatabaseUpdateSize,
string16 /* the origin */,
@ -2126,27 +2106,35 @@ IPC_BEGIN_MESSAGES(ViewHost)
unsigned long /* estimated size */,
bool /* result */)
// Asks the browser process to open a DB file with the given name
IPC_MESSAGE_CONTROL3(ViewHostMsg_DatabaseOpenFile,
// Asks the browser process to open a DB file with the given name.
#if defined (OS_WIN)
IPC_SYNC_MESSAGE_CONTROL2_1(ViewHostMsg_DatabaseOpenFile,
string16 /* vfs file name */,
int /* desired flags */,
IPC::PlatformFileForTransit /* file_handle */)
#elif defined(OS_POSIX)
IPC_SYNC_MESSAGE_CONTROL2_2(ViewHostMsg_DatabaseOpenFile,
string16 /* vfs file name */,
int /* desired flags */,
int32 /* a unique message ID */)
IPC::PlatformFileForTransit /* file_handle */,
base::FileDescriptor /* dir_handle */)
#endif
// Asks the browser process to delete a DB file
IPC_MESSAGE_CONTROL3(ViewHostMsg_DatabaseDeleteFile,
IPC_SYNC_MESSAGE_CONTROL2_1(ViewHostMsg_DatabaseDeleteFile,
string16 /* vfs file name */,
bool /* whether or not to sync the directory */,
int32 /* a unique message ID */)
int /* SQLite error code */)
// Asks the browser process to return the attributes of a DB file
IPC_MESSAGE_CONTROL2(ViewHostMsg_DatabaseGetFileAttributes,
IPC_SYNC_MESSAGE_CONTROL1_1(ViewHostMsg_DatabaseGetFileAttributes,
string16 /* vfs file name */,
int32 /* a unique message ID */)
int32 /* the attributes for the given DB file */)
// Asks the browser process to return the size of a DB file
IPC_MESSAGE_CONTROL2(ViewHostMsg_DatabaseGetFileSize,
IPC_SYNC_MESSAGE_CONTROL1_1(ViewHostMsg_DatabaseGetFileSize,
string16 /* vfs file name */,
int32 /* a unique message ID */)
int64 /* the size of the given DB file */)
// Notifies the browser process that a new database has been opened
IPC_MESSAGE_CONTROL4(ViewHostMsg_DatabaseOpened,

@ -16,7 +16,6 @@
#include "chrome/common/appcache/appcache_dispatcher.h"
#include "chrome/common/chrome_switches.h"
#include "chrome/common/database_util.h"
#include "chrome/common/db_message_filter.h"
#include "chrome/common/render_messages.h"
#include "chrome/common/webmessageportchannel_impl.h"
#include "chrome/plugin/npobject_util.h"

@ -6,7 +6,6 @@
#include "base/logging.h"
#include "chrome/common/database_util.h"
#include "chrome/common/db_message_filter.h"
#include "chrome/common/render_messages.h"
#include "chrome/common/webmessageportchannel_impl.h"
#include "chrome/worker/worker_thread.h"

@ -36,6 +36,8 @@
'ipc_sync_channel.h',
'ipc_sync_message.cc',
'ipc_sync_message.h',
'ipc_sync_message_filter.cc',
'ipc_sync_message_filter.h',
],
'include_dirs': [
'..',

@ -13,6 +13,7 @@
#include "base/ref_counted.h"
#include "base/waitable_event_watcher.h"
#include "ipc/ipc_channel_proxy.h"
#include "ipc/ipc_sync_message.h"
namespace base {
class WaitableEvent;
@ -23,8 +24,8 @@ namespace IPC {
class SyncMessage;
class MessageReplyDeserializer;
// This is similar to IPC::ChannelProxy, with the added feature of supporting
// sending synchronous messages.
// This is similar to ChannelProxy, with the added feature of supporting sending
// synchronous messages.
// Note that care must be taken that the lifetime of the ipc_thread argument
// is more than this object. If the message loop goes away while this object
// is running and it's used to send a message, then it will use the invalid
@ -63,7 +64,7 @@ class SyncChannel : public ChannelProxy,
// Adds information about an outgoing sync message to the context so that
// we know how to deserialize the reply.
void Push(IPC::SyncMessage* sync_msg);
void Push(SyncMessage* sync_msg);
// Cleanly remove the top deserializer (and throw it away). Returns the
// result of the Send call for that message.
@ -96,7 +97,7 @@ class SyncChannel : public ChannelProxy,
private:
~SyncContext();
// IPC::ChannelProxy methods that we override.
// ChannelProxy methods that we override.
// Called on the listener thread.
virtual void Clear();
@ -113,18 +114,6 @@ class SyncChannel : public ChannelProxy,
// WaitableEventWatcher::Delegate implementation.
virtual void OnWaitableEventSignaled(base::WaitableEvent* arg);
// When sending a synchronous message, this structure contains an object
// that knows how to deserialize the response.
struct PendingSyncMsg {
PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d,
base::WaitableEvent* e) :
id(id), deserializer(d), done_event(e), send_result(false) { }
int id;
IPC::MessageReplyDeserializer* deserializer;
base::WaitableEvent* done_event;
bool send_result;
};
typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue;
PendingSyncMessageQueue deserializers_;
Lock deserializers_lock_;

@ -18,6 +18,7 @@
#include "base/waitable_event.h"
#include "ipc/ipc_message.h"
#include "ipc/ipc_sync_channel.h"
#include "ipc/ipc_sync_message_filter.h"
#include "testing/gtest/include/gtest/gtest.h"
@ -120,12 +121,14 @@ class Worker : public Channel::Listener, public Message::Sender {
}
Channel::Mode mode() { return mode_; }
WaitableEvent* done_event() { return done_.get(); }
WaitableEvent* shutdown_event() { return &shutdown_event_; }
void ResetChannel() { channel_.reset(); }
protected:
// Derived classes need to call this when they've completed their part of
// the test.
void Done() { done_->Signal(); }
protected:
IPC::SyncChannel* channel() { return channel_.get(); }
// Functions for dervied classes to implement if they wish.
virtual void Run() { }
virtual void OnAnswer(int* answer) { NOTREACHED(); }
@ -1054,3 +1057,61 @@ TEST_F(IPCSyncChannelTest, DoneEventRace) {
workers.push_back(new SimpleClient());
RunTest(workers);
}
//-----------------------------------------------------------------------------
namespace {
// A SyncMessageFilter that owns a helper thread. Once the filter has been
// added to a channel, it sends a synchronous AnswerToLife message from that
// helper thread, checks the reply, and marks the owning worker as done.
class TestSyncMessageFilter : public IPC::SyncMessageFilter {
 public:
  TestSyncMessageFilter(base::WaitableEvent* shutdown_event, Worker* worker)
      : SyncMessageFilter(shutdown_event),
        worker_(worker),
        thread_("helper_thread") {
    base::Thread::Options thread_options;
    thread_options.message_loop_type = MessageLoop::TYPE_DEFAULT;
    thread_.StartWithOptions(thread_options);
  }

  // Lets the base class record the channel first, then schedules the sync
  // send on the helper thread.
  virtual void OnFilterAdded(Channel* channel) {
    SyncMessageFilter::OnFilterAdded(channel);
    MessageLoop* helper_loop = thread_.message_loop();
    helper_loop->PostTask(
        FROM_HERE,
        NewRunnableMethod(this,
                          &TestSyncMessageFilter::SendMessageOnHelperThread));
  }

  // Runs on the helper thread: performs the sync send and verifies that the
  // reply value is 42.
  void SendMessageOnHelperThread() {
    int answer = 0;
    bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
    DCHECK(result);
    DCHECK_EQ(answer, 42);

    worker_->Done();
  }

  Worker* worker_;       // Worker to notify once the reply has been checked.
  base::Thread thread_;  // Helper thread the sync message is sent from.
};
class SyncMessageFilterServer : public Worker {
public:
SyncMessageFilterServer()
: Worker(Channel::MODE_SERVER, "sync_message_filter_server") {
filter_ = new TestSyncMessageFilter(shutdown_event(), this);
}
void Run() {
channel()->AddFilter(filter_.get());
}
scoped_refptr<TestSyncMessageFilter> filter_;
};
} // namespace
// Tests that a synchronous message can be sent through a SyncMessageFilter
// from a helper thread: the server installs the filter and a SimpleClient
// sits on the other end.
TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
  std::vector<Worker*> participants;
  participants.push_back(new SyncMessageFilterServer());
  participants.push_back(new SimpleClient());

  RunTest(participants);
}

@ -1,4 +1,4 @@
// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@ -9,16 +9,18 @@
#endif
#include <stack>
#include "base/atomic_sequence_num.h"
#include "base/logging.h"
#include "base/waitable_event.h"
#include "ipc/ipc_sync_message.h"
namespace IPC {
uint32 SyncMessage::next_id_ = 0;
#define kSyncMessageHeaderSize 4
base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true);
static base::AtomicSequenceNumber g_next_id(base::LINKER_INITIALIZED);
static base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true);
SyncMessage::SyncMessage(
int32 routing_id,
@ -34,7 +36,7 @@ SyncMessage::SyncMessage(
// Add synchronous message data before the message payload.
SyncHeader header;
header.message_id = ++next_id_;
header.message_id = g_next_id.GetNext();
WriteSyncHeader(this, header);
}

@ -1,4 +1,4 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@ -77,8 +77,6 @@ class SyncMessage : public Message {
MessageReplyDeserializer* deserializer_;
base::WaitableEvent* pump_messages_event_;
static uint32 next_id_; // for generation of unique ids
};
// Used to deserialize parameters from a reply to a synchronous message
@ -92,6 +90,19 @@ class MessageReplyDeserializer {
virtual bool SerializeOutputParameters(const Message& msg, void* iter) = 0;
};
// Bookkeeping for one synchronous message that is waiting for its reply.
// It bundles the message id, the object that knows how to deserialize the
// response, the event to signal once the wait is over, and the outcome.
struct PendingSyncMsg {
  // Id of the synchronous message this entry belongs to.
  int id;
  // Knows how to deserialize the response to the message.
  MessageReplyDeserializer* deserializer;
  // Signaled when the sender should stop waiting.
  base::WaitableEvent* done_event;
  // Result reported to the sender; starts out false.
  bool send_result;

  PendingSyncMsg(int id, MessageReplyDeserializer* d, base::WaitableEvent* e)
      : id(id),
        deserializer(d),
        done_event(e),
        send_result(false) {
  }
};
} // namespace IPC
#endif // IPC_IPC_SYNC_MESSAGE_H_

@ -0,0 +1,126 @@
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipc/ipc_sync_message_filter.h"
#include "base/logging.h"
#include "base/message_loop.h"
#include "ipc/ipc_sync_message.h"
namespace IPC {
// Records the message loop of the constructing thread as the listener loop
// and remembers |shutdown_event|.  The channel and the IO loop stay NULL
// until OnFilterAdded() runs.
SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event)
    : channel_(NULL),
      listener_loop_(MessageLoop::current()),
      io_loop_(NULL),
      shutdown_event_(shutdown_event) {
}
// Nothing to release explicitly: every pointer member is assigned from a
// value supplied by someone else and is never deleted by this class.
SyncMessageFilter::~SyncMessageFilter() {}
// Message::Sender implementation.  Takes ownership of |message|.
//
// Returns false (after deleting |message|) if the filter has not been added
// to a channel yet.  Asynchronous messages are posted to the IO thread and
// reported as sent right away.  For a synchronous message the calling thread
// blocks until either |shutdown_event_| or the per-call done event is
// signaled; the return value is then the result of deserializing the reply,
// or false if no usable reply was received.
bool SyncMessageFilter::Send(Message* message) {
  // io_loop_ is guarded by lock_ (it is written in OnFilterAdded()), so take
  // a single snapshot under the lock and use that copy below instead of
  // re-reading the member unlocked.
  MessageLoop* io_loop = NULL;
  {
    AutoLock auto_lock(lock_);
    io_loop = io_loop_;
  }
  if (!io_loop) {
    delete message;
    return false;
  }

  if (!message->is_sync()) {
    io_loop->PostTask(
        FROM_HERE,
        NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message));
    return true;
  }

  base::WaitableEvent done_event(true, false);
  // |message| is known to be a SyncMessage here (is_sync() was checked), and
  // SyncMessage derives from Message, so static_cast is the right downcast.
  PendingSyncMsg pending_message(
      SyncMessage::GetMessageId(*message),
      static_cast<SyncMessage*>(message)->GetReplyDeserializer(),
      &done_event);

  {
    AutoLock auto_lock(lock_);
    // Can't use this class on the main thread or else it can lead to
    // deadlocks.  Also by definition, can't use this on IO thread since we're
    // blocking it.
    DCHECK(MessageLoop::current() != listener_loop_);
    DCHECK(MessageLoop::current() != io_loop_);
    // NOTE(review): entries are keyed by MessageLoop::current(); callers that
    // share the same value (e.g. if it can be NULL on threads without a loop
    // -- confirm) would overwrite each other's entry.
    pending_sync_messages_[MessageLoop::current()] = &pending_message;
  }

  io_loop->PostTask(
      FROM_HERE,
      NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message));

  base::WaitableEvent* events[2] = { shutdown_event_, &done_event };
  base::WaitableEvent::WaitMany(events, 2);

  {
    // Unregister under the lock: OnMessageReceived() and SignalAllEvents()
    // touch the pending entries while holding lock_, and |pending_message|
    // lives on this stack frame.
    AutoLock auto_lock(lock_);
    delete pending_message.deserializer;
    pending_sync_messages_.erase(MessageLoop::current());
  }

  return pending_message.send_result;
}
// Hands |message| to the channel if there still is one.  Otherwise the
// message is dropped, and if it was synchronous every pending sender is
// woken up so that none of them keeps waiting for a reply.
void SyncMessageFilter::SendOnIOThread(Message* message) {
  if (!channel_) {
    // We don't know which thread sent it, but it doesn't matter, just signal
    // them all.
    if (message->is_sync())
      SignalAllEvents();
    delete message;
    return;
  }

  channel_->Send(message);
}
void SyncMessageFilter::SignalAllEvents() {
AutoLock auto_lock(lock_);
for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
iter != pending_sync_messages_.end(); ++iter) {
iter->second->done_event->Signal();
}
}
// Remembers |channel| and records the current message loop as the IO loop.
// io_loop_ is published under lock_ because Send() reads it.
void SyncMessageFilter::OnFilterAdded(Channel* channel) {
  MessageLoop* const current_loop = MessageLoop::current();
  channel_ = channel;

  AutoLock auto_lock(lock_);
  io_loop_ = current_loop;
}
// Forgets the channel and wakes up every pending synchronous sender; their
// send_result is left untouched by SignalAllEvents().
void SyncMessageFilter::OnChannelError() {
  channel_ = NULL;
  SignalAllEvents();
}
// Forgets the channel and wakes up every pending synchronous sender, the
// same handling as for a channel error.
void SyncMessageFilter::OnChannelClosing() {
  channel_ = NULL;
  SignalAllEvents();
}
bool SyncMessageFilter::OnMessageReceived(const Message& message) {
AutoLock auto_lock(lock_);
for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
iter != pending_sync_messages_.end(); ++iter) {
if (SyncMessage::IsMessageReplyTo(message, iter->second->id)) {
if (!message.is_reply_error()) {
iter->second->send_result =
iter->second->deserializer->SerializeOutputParameters(message);
}
iter->second->done_event->Signal();
return true;
}
}
return false;
}
} // namespace IPC

@ -0,0 +1,79 @@
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef IPC_IPC_SYNC_MESSAGE_FILTER_H_
#define IPC_IPC_SYNC_MESSAGE_FILTER_H_
#include "base/basictypes.h"
#include "base/hash_tables.h"
#include "base/lock.h"
#include "base/ref_counted.h"
#include "base/waitable_event.h"
#include "ipc/ipc_channel_proxy.h"
#include "ipc/ipc_sync_message.h"
class MessageLoop;
#if defined(COMPILER_GCC)
// With gcc a hash<> specialization is needed before MessageLoop* can be used
// as a hash_map key (MSVC is okay without it).  The hash is simply the
// pointer value.
namespace __gnu_cxx {

template<>
struct hash<MessageLoop*> {
  size_t operator()(MessageLoop* loop) const {
    const size_t address_bits = reinterpret_cast<size_t>(loop);
    return address_bits;
  }
};

}  // namespace __gnu_cxx
#endif  // defined(COMPILER_GCC)
namespace IPC {
class MessageReplyDeserializer;
// A ChannelProxy::MessageFilter that is also a Message::Sender, so that
// synchronous IPC messages can be sent from a thread other than the listener
// thread associated with the SyncChannel.  It does not support the fancier
// SyncChannel features, such as handling recursion or receiving messages
// while waiting for a response.  Several threads may have a synchronous
// message in flight through the same object at the same time.
class SyncMessageFilter : public ChannelProxy::MessageFilter,
                          public Message::Sender {
 public:
  explicit SyncMessageFilter(base::WaitableEvent* shutdown_event);
  virtual ~SyncMessageFilter();

  // Message::Sender implementation.
  virtual bool Send(Message* message);

  // ChannelProxy::MessageFilter implementation.
  virtual void OnFilterAdded(Channel* channel);
  virtual void OnChannelError();
  virtual void OnChannelClosing();
  virtual bool OnMessageReceived(const Message& message);

 private:
  // Maps the sending thread's MessageLoop to its in-flight sync message.
  typedef base::hash_map<MessageLoop*, PendingSyncMsg*> PendingSyncMessages;

  // Passes |message| to the channel, or drops it if the channel is gone.
  void SendOnIOThread(Message* message);

  // Signal all the pending sends as done, used in an error condition.
  void SignalAllEvents();

  // The channel to which this filter was added.
  Channel* channel_;

  // The process's main thread.
  MessageLoop* listener_loop_;

  // The message loop where the Channel lives.
  MessageLoop* io_loop_;

  PendingSyncMessages pending_sync_messages_;

  // Locks data members above.
  Lock lock_;

  // Waited on together with the per-send done event in Send().
  base::WaitableEvent* shutdown_event_;

  DISALLOW_COPY_AND_ASSIGN(SyncMessageFilter);
};
} // namespace IPC
#endif // IPC_IPC_SYNC_MESSAGE_FILTER_H_