0

[mojo] Introduce an async flush API

This introduces a new kind of asynchronous flushing API for Mojo
endpoints, allowing independent pipes to synchronize against each
others' message queue.

Remote and Receiver both introduce |FlushAsync()| methods for
initiating a flush of their peer's message queue. Such operations
are tied to a corresponding PendingFlush object which can be used
to remotely observe completion of the async flush.

Remote also introduces |PauseReceiverUntilFlushCompletes()| to pause
its receiver's message queue until a given PendingFlush observes
completion. Receiver introduces an analogous
|PauseRemoteCallbacksUntilFlushCompletes()| method for essentially
the same purpose, in the reverse direction.

Combined, these APIs allow arbitrary pipes to synchronize
against each other even across a process boundary. This
synchronization mechanism can be used in cases where associated
interfaces are impossible to use and may eventually serve as a
complete replacement for associated interfaces.

Bug: 1040226
Change-Id: I3f8aaa39dc0e25b2c2d460cd7be1dbc4a2c0b11d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1968240
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Reviewed-by: Darin Fisher <darin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#730829}
This commit is contained in:
Ken Rockot
2020-01-13 21:13:46 +00:00
committed by Commit Bot
parent a29d4a3a0e
commit eb2366a5de
22 changed files with 963 additions and 26 deletions

@ -989,6 +989,12 @@ class ChannelAssociatedGroupController
return true;
}
bool WaitForFlushToComplete(
mojo::ScopedMessagePipeHandle flush_pipe) override {
// We don't support async flushing on the IPC Channel pipe.
return false;
}
// Checked in places which must be run on the master endpoint's thread.
base::ThreadChecker thread_checker_;

@ -129,6 +129,8 @@ component("bindings") {
"associated_receiver.h",
"associated_receiver_set.h",
"associated_remote.h",
"async_flusher.cc",
"async_flusher.h",
"binder_map.cc",
"binder_map.h",
"binding.h",
@ -183,6 +185,8 @@ component("bindings") {
"native_enum.h",
"pending_associated_receiver.h",
"pending_associated_remote.h",
"pending_flush.cc",
"pending_flush.h",
"pending_receiver.h",
"pending_remote.h",
"pipe_control_message_handler.h",
@ -240,9 +244,7 @@ component("bindings") {
"//mojo/public/interfaces/bindings",
]
deps = [
"//ipc:native_handle_type_converters",
]
deps = [ "//ipc:native_handle_type_converters" ]
defines = [ "IS_MOJO_CPP_BINDINGS_IMPL" ]
}

@ -0,0 +1,27 @@
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/async_flusher.h"
#include <utility>
namespace mojo {
AsyncFlusher::AsyncFlusher() = default;
AsyncFlusher::AsyncFlusher(AsyncFlusher&&) = default;
AsyncFlusher& AsyncFlusher::operator=(AsyncFlusher&&) = default;
AsyncFlusher::~AsyncFlusher() = default;
void AsyncFlusher::SetPipe(ScopedMessagePipeHandle pipe) {
pipe_ = std::move(pipe);
}
ScopedMessagePipeHandle AsyncFlusher::PassPipe() {
return std::move(pipe_);
}
} // namespace mojo

@ -0,0 +1,54 @@
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_CPP_BINDINGS_ASYNC_FLUSHER_H_
#define MOJO_PUBLIC_CPP_BINDINGS_ASYNC_FLUSHER_H_
#include "base/component_export.h"
#include "mojo/public/cpp/system/message_pipe.h"
namespace mojo {
class PendingFlush;
class PipeControlMessageProxy;
// An object that can be consumed by |FlushAsync()| on a Remote or Receiver in
// order to perform an asynchronous flush operation on the object. Every
// AsyncFlusher is associated with a PendingFlush object which can be monitored
// or consumed to remotely observe completion of the corresponding flush
// operation.
//
// NOTE: Most commonly for asynchronous flush operations, |FlushAsync()| can
// be called on a Remote or Receiver with no arguments. This creates an
// AsyncFlusher/PendingFlush pair and immediately flushes the callee with the
// resulting AsyncFlusher. The entangled PendingFlush is returned for subsequent
// consumption.
//
// Direct use of AsyncFlusher (and in particular of the PendingFlush constructor
// which takes an AsyncFlusher* argument to initialize) is reserved for edge
// cases where a PendingFlush is needed before its corresponding flush operation
// can be initiated (e.g. when the interface to flush lives on a different
// thread from the interface that will wait on its PendingFlush).
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) AsyncFlusher {
public:
AsyncFlusher();
AsyncFlusher(AsyncFlusher&&);
AsyncFlusher(const AsyncFlusher&) = delete;
AsyncFlusher& operator=(AsyncFlusher&&);
AsyncFlusher& operator=(const AsyncFlusher&) = delete;
~AsyncFlusher();
private:
friend class PendingFlush;
friend class PipeControlMessageProxy;
void SetPipe(ScopedMessagePipeHandle pipe);
ScopedMessagePipeHandle PassPipe();
ScopedMessagePipeHandle pipe_;
};
} // namespace mojo
#endif // MOJO_PUBLIC_CPP_BINDINGS_ASYNC_FLUSHER_H_

@ -41,6 +41,15 @@ bool BindingStateBase::WaitForIncomingMethodCall(MojoDeadline deadline) {
return router_->WaitForIncomingMessage(deadline);
}
void BindingStateBase::PauseRemoteCallbacksUntilFlushCompletes(
PendingFlush flush) {
router_->PausePeerUntilFlushCompletes(std::move(flush));
}
void BindingStateBase::FlushAsync(AsyncFlusher flusher) {
router_->FlushAsync(std::move(flusher));
}
void BindingStateBase::Close() {
if (!router_)
return;

@ -17,6 +17,7 @@
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/sequenced_task_runner.h"
#include "mojo/public/cpp/bindings/async_flusher.h"
#include "mojo/public/cpp/bindings/connection_error_callback.h"
#include "mojo/public/cpp/bindings/connection_group.h"
#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
@ -27,6 +28,7 @@
#include "mojo/public/cpp/bindings/lib/multiplex_router.h"
#include "mojo/public/cpp/bindings/lib/pending_receiver_state.h"
#include "mojo/public/cpp/bindings/message_header_validator.h"
#include "mojo/public/cpp/bindings/pending_flush.h"
#include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
#include "mojo/public/cpp/system/core.h"
@ -49,6 +51,9 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) BindingStateBase {
bool WaitForIncomingMethodCall(
MojoDeadline deadline = MOJO_DEADLINE_INDEFINITE);
void PauseRemoteCallbacksUntilFlushCompletes(PendingFlush flush);
void FlushAsync(AsyncFlusher flusher);
void Close();
void CloseWithReason(uint32_t custom_reason, const std::string& description);

@ -34,6 +34,15 @@ void InterfacePtrStateBase::RequireVersion(uint32_t version) {
endpoint_client_->RequireVersion(version);
}
void InterfacePtrStateBase::PauseReceiverUntilFlushCompletes(
PendingFlush flush) {
router_->PausePeerUntilFlushCompletes(std::move(flush));
}
void InterfacePtrStateBase::FlushAsync(AsyncFlusher flusher) {
router_->FlushAsync(std::move(flusher));
}
void InterfacePtrStateBase::Swap(InterfacePtrStateBase* other) {
using std::swap;
swap(other->router_, router_);

@ -23,6 +23,7 @@
#include "base/sequenced_task_runner.h"
#include "base/time/time.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/async_flusher.h"
#include "mojo/public/cpp/bindings/connection_error_callback.h"
#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
#include "mojo/public/cpp/bindings/interface_id.h"
@ -30,6 +31,7 @@
#include "mojo/public/cpp/bindings/lib/multiplex_router.h"
#include "mojo/public/cpp/bindings/lib/pending_remote_state.h"
#include "mojo/public/cpp/bindings/message_header_validator.h"
#include "mojo/public/cpp/bindings/pending_flush.h"
#include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
namespace mojo {
@ -80,6 +82,8 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfacePtrStateBase {
void QueryVersion(base::OnceCallback<void(uint32_t)> callback);
void RequireVersion(uint32_t version);
void PauseReceiverUntilFlushCompletes(PendingFlush flush);
void FlushAsync(AsyncFlusher flusher);
void Swap(InterfacePtrStateBase* other);
void Bind(PendingRemoteState* remote_state,
scoped_refptr<base::SequencedTaskRunner> task_runner);
@ -146,6 +150,16 @@ class InterfacePtrState : public InterfacePtrStateBase {
InterfacePtrStateBase::RequireVersion(version);
}
void PauseReceiverUntilFlushCompletes(PendingFlush flush) {
ConfigureProxyIfNecessary();
InterfacePtrStateBase::PauseReceiverUntilFlushCompletes(std::move(flush));
}
void FlushAsync(AsyncFlusher flusher) {
ConfigureProxyIfNecessary();
InterfacePtrStateBase::FlushAsync(std::move(flusher));
}
void FlushForTesting() {
ConfigureProxyIfNecessary();
endpoint_client()->FlushForTesting();

@ -514,6 +514,8 @@ bool MultiplexRouter::PrefersSerializedMessages() {
void MultiplexRouter::CloseMessagePipe() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
connector_.CloseMessagePipe();
flush_pipe_watcher_.reset();
active_flush_pipe_.reset();
// CloseMessagePipe() above won't trigger connection error handler.
// Explicitly call OnPipeConnectionError() so that associated endpoints will
// get notified.
@ -521,22 +523,28 @@ void MultiplexRouter::CloseMessagePipe() {
}
void MultiplexRouter::PauseIncomingMethodCallProcessing() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
connector_.PauseIncomingMethodCallProcessing();
MayAutoLock locker(&lock_);
paused_ = true;
for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
iter->second->ResetSyncMessageSignal();
PauseInternal(/*must_resume_manually=*/true);
}
void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// If the owner is manually resuming from a previous pause request, the
// interface may also still be paused due to waiting on a pending async flush
// in the system.
//
// In that case we ignore the caller, except to subsequently allow implicit
// resume once the pending flush operation is finished.
if (active_flush_pipe_) {
MayAutoLock locker(&lock_);
must_resume_manually_ = false;
return;
}
connector_.ResumeIncomingMethodCallProcessing();
MayAutoLock locker(&lock_);
paused_ = false;
must_resume_manually_ = false;
for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
auto sync_iter = sync_message_tasks_.find(iter->first);
@ -550,6 +558,14 @@ void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
}
void MultiplexRouter::FlushAsync(AsyncFlusher flusher) {
control_message_proxy_.FlushAsync(std::move(flusher));
}
void MultiplexRouter::PausePeerUntilFlushCompletes(PendingFlush flush) {
control_message_proxy_.PausePeerUntilFlushCompletes(std::move(flush));
}
bool MultiplexRouter::HasAssociatedEndpoints() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
MayAutoLock locker(&lock_);
@ -656,6 +672,31 @@ bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
return true;
}
bool MultiplexRouter::WaitForFlushToComplete(ScopedMessagePipeHandle pipe) {
// If this MultiplexRouter has an associated interface on some task runner
// other than the master interface's task runner, it is possible to process
// incoming control messages on that task runner. We don't support this
// control message on anything but the main interface though.
if (!task_runner_->RunsTasksInCurrentSequence())
return false;
flush_pipe_watcher_.emplace(FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL,
task_runner_);
flush_pipe_watcher_->Watch(
pipe.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&MultiplexRouter::OnFlushPipeSignaled, this));
if (flush_pipe_watcher_->Arm() != MOJO_RESULT_OK) {
// The peer must already be closed, so consider the flush to be complete.
flush_pipe_watcher_.reset();
return true;
}
active_flush_pipe_ = std::move(pipe);
PauseInternal(/*must_resume_manually=*/false);
return true;
}
void MultiplexRouter::OnPipeConnectionError(bool force_async_dispatch) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
@ -688,6 +729,35 @@ void MultiplexRouter::OnPipeConnectionError(bool force_async_dispatch) {
ProcessTasks(call_behavior, connector_.task_runner());
}
void MultiplexRouter::OnFlushPipeSignaled(MojoResult result,
const HandleSignalsState& state) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
flush_pipe_watcher_.reset();
active_flush_pipe_.reset();
// If there is not an explicit Pause waiting for a Resume, we can unpause.
if (!must_resume_manually_)
ResumeIncomingMethodCallProcessing();
}
void MultiplexRouter::PauseInternal(bool must_resume_manually) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
connector_.PauseIncomingMethodCallProcessing();
MayAutoLock locker(&lock_);
paused_ = true;
for (auto& entry : endpoints_)
entry.second->ResetSyncMessageSignal();
// We do not want to override this to |false| if it's already |true|. If it's
// ever |true|, that means there's been at least one explicit Pause call since
// the last Resume and we must never unpause until at least one call to Resume
// is made.
must_resume_manually_ = must_resume_manually_ || must_resume_manually;
}
void MultiplexRouter::ProcessTasks(
ClientCallBehavior client_call_behavior,
base::SequencedTaskRunner* current_task_runner) {

@ -24,11 +24,13 @@
#include "base/sequenced_task_runner.h"
#include "base/synchronization/lock.h"
#include "mojo/public/cpp/bindings/associated_group_controller.h"
#include "mojo/public/cpp/bindings/async_flusher.h"
#include "mojo/public/cpp/bindings/connection_group.h"
#include "mojo/public/cpp/bindings/connector.h"
#include "mojo/public/cpp/bindings/interface_id.h"
#include "mojo/public/cpp/bindings/message_dispatcher.h"
#include "mojo/public/cpp/bindings/message_header_validator.h"
#include "mojo/public/cpp/bindings/pending_flush.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
@ -139,6 +141,14 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MultiplexRouter
void PauseIncomingMethodCallProcessing();
void ResumeIncomingMethodCallProcessing();
// Initiates an async flush operation. |flusher| signals its corresponding
// PendingFlush when the flush is actually complete.
void FlushAsync(AsyncFlusher flusher);
// Pauses the peer endpoint's message processing until a (potentially remote)
// flush operation corresponding to |flush| is completed.
void PausePeerUntilFlushCompletes(PendingFlush flush);
// Whether there are any associated interfaces running currently.
bool HasAssociatedEndpoints() const;
@ -180,8 +190,11 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MultiplexRouter
bool OnPeerAssociatedEndpointClosed(
InterfaceId id,
const base::Optional<DisconnectReason>& reason) override;
bool WaitForFlushToComplete(ScopedMessagePipeHandle flush_pipe) override;
void OnPipeConnectionError(bool force_async_dispatch);
void OnFlushPipeSignaled(MojoResult result, const HandleSignalsState& state);
void PauseInternal(bool must_resume_manually);
// Specifies whether we are allowed to directly call into
// InterfaceEndpointClient (given that we are already on the same sequence as
@ -257,6 +270,10 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MultiplexRouter
MessageDispatcher dispatcher_;
Connector connector_;
// Active whenever dispatch is blocked by a pending remote flush.
ScopedMessagePipeHandle active_flush_pipe_;
base::Optional<mojo::SimpleWatcher> flush_pipe_watcher_;
SEQUENCE_CHECKER(sequence_checker_);
// Protects the following members.
@ -280,8 +297,16 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MultiplexRouter
bool encountered_error_ = false;
// Indicates whether this router is paused, meaning it is not currently
// listening for or dispatching available inbound messages.
bool paused_ = false;
// If this router is paused, this indicates whether the pause is due to an
// explicit call to |PauseIncomingMethodCallProcessing()| when |true|, or
// due implicit pause when waiting on an async flush operation when |false|.
// When |paused_| is |false|, this value is ignored.
bool must_resume_manually_ = false;
bool testing_mode_ = false;
bool being_destructed_ = false;

@ -43,8 +43,8 @@ bool PipeControlMessageHandler::Accept(Message* message) {
bool PipeControlMessageHandler::Validate(Message* message) {
internal::ValidationContext validation_context(
message->payload(), message->payload_num_bytes(), 0, 0, message,
description_.c_str());
message->payload(), message->payload_num_bytes(),
message->handles()->size(), 0, message, description_.c_str());
if (message->name() == pipe_control::kRunOrClosePipeMessageId) {
if (!internal::ValidateMessageIsRequestWithoutResponse(
@ -53,7 +53,7 @@ bool PipeControlMessageHandler::Validate(Message* message) {
}
return internal::ValidateMessagePayload<
pipe_control::internal::RunOrClosePipeMessageParams_Data>(
message, &validation_context);
message, &validation_context);
}
return false;
@ -65,6 +65,7 @@ bool PipeControlMessageHandler::RunOrClosePipe(Message* message) {
reinterpret_cast<
pipe_control::internal::RunOrClosePipeMessageParams_Data*>(
message->mutable_payload());
context.TakeHandlesFromMessage(message);
pipe_control::RunOrClosePipeMessageParamsPtr params_ptr;
internal::Deserialize<pipe_control::RunOrClosePipeMessageParamsDataView>(
params, &params_ptr, &context);
@ -81,6 +82,19 @@ bool PipeControlMessageHandler::RunOrClosePipe(Message* message) {
return delegate_->OnPeerAssociatedEndpointClosed(event->id, reason);
}
if (params_ptr->input->is_flush_async()) {
// NOTE: There's nothing to do here but let the attached pipe go out of
// scoped and be closed. This means that the corresponding PendingFlush will
// eventually be signalled, unblocking the endpoint which is waiting on it,
// if any.
return true;
}
if (params_ptr->input->is_pause_until_flush_completes()) {
return delegate_->WaitForFlushToComplete(std::move(
params_ptr->input->get_pause_until_flush_completes()->flush_pipe));
}
DVLOG(1) << "Unsupported command in a RunOrClosePipe message pipe control "
<< "message. Closing the pipe.";
return false;

@ -30,6 +30,7 @@ Message ConstructRunOrClosePipeMessage(
params_ptr, message.payload_buffer(), &params, &context);
message.set_interface_id(kInvalidInterfaceId);
message.set_heap_profiler_tag(kMessageTag);
message.AttachHandlesFromSerializationContext(&context);
return message;
}
@ -46,6 +47,23 @@ void PipeControlMessageProxy::NotifyPeerEndpointClosed(
ignore_result(receiver_->Accept(&message));
}
void PipeControlMessageProxy::PausePeerUntilFlushCompletes(PendingFlush flush) {
auto input = pipe_control::RunOrClosePipeInput::New();
input->set_pause_until_flush_completes(
pipe_control::PauseUntilFlushCompletes::New(flush.PassPipe()));
Message message(ConstructRunOrClosePipeMessage(std::move(input)));
message.set_heap_profiler_tag(kMessageTag);
ignore_result(receiver_->Accept(&message));
}
void PipeControlMessageProxy::FlushAsync(AsyncFlusher flusher) {
auto input = pipe_control::RunOrClosePipeInput::New();
input->set_flush_async(pipe_control::FlushAsync::New(flusher.PassPipe()));
Message message(ConstructRunOrClosePipeMessage(std::move(input)));
message.set_heap_profiler_tag(kMessageTag);
ignore_result(receiver_->Accept(&message));
}
// static
Message PipeControlMessageProxy::ConstructPeerEndpointClosedMessage(
InterfaceId id,

@ -0,0 +1,30 @@
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/pending_flush.h"
#include <utility>
#include "mojo/public/cpp/bindings/async_flusher.h"
namespace mojo {
PendingFlush::PendingFlush(AsyncFlusher* flusher) {
ScopedMessagePipeHandle flusher_pipe;
CreateMessagePipe(/*options=*/nullptr, &pipe_, &flusher_pipe);
flusher->SetPipe(std::move(flusher_pipe));
}
PendingFlush::PendingFlush(PendingFlush&& other) = default;
PendingFlush& PendingFlush::operator=(PendingFlush&& other) = default;
PendingFlush::~PendingFlush() = default;
ScopedMessagePipeHandle PendingFlush::PassPipe() {
DCHECK(pipe_) << "This PendingFlush has already been consumed.";
return std::move(pipe_);
}
} // namespace mojo

@ -0,0 +1,73 @@
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_CPP_BINDINGS_PENDING_FLUSH_H_
#define MOJO_PUBLIC_CPP_BINDINGS_PENDING_FLUSH_H_
#include "base/component_export.h"
#include "mojo/public/cpp/system/message_pipe.h"
namespace mojo {
class AsyncFlusher;
// A PendingFlush represents an asynchronous flush operation on an arbitrary
// (and potentially remote) interface pipe. This is generally used to allow
// another pipe in the system to pause its own message processing until the
// original pipe has been flushed. As such, it's a useful primitive for
// arbitrarily complex synchronization operations across the system.
//
// The most common way to create a PendingFlush is to call |FlushAsync()| with
// arguments on a Remote or Receiver. For example, consider a storage API with a
// central control interface as well as multiple independent writers:
//
// Remote<mojom::Storage> storage = ...;
// Remote<mojom::Writer> writer1, writer2;
// storage->GetWriter(writer1.BindNewPipeAndPassReceiver());
// storage->GetWriter(writer2.BindNewPipeAndPassReceiver());
//
// Suppose we want to issue some commands on each Writer, followed by a query on
// the remote Storage object; but we want to ensure that the Storage query is
// not dispatched until all previous Writer operations are dispatched. We could
// write something like:
//
// writer1->Put(...);
// storage.PauseReceiverUntilFlushCompletes(writer1.FlushAsync());
// writer2->Put(...);
// storage.PauseReceiverUntilFlushCompletes(writer2.FlushAsync());
// storage->Query(...);
//
// This effectively guarantees that the |Query()| call will never dispatch on
// the Storage receiver before both |Put()| calls have dispatched on their
// respective Writer receivers. This holds even if the receiving endpoints are
// all in different processes.
//
// Note that |FlushAsync()| returns a PendingFlush object. For some use cases,
// it may be desirable to create a PendingFlush before issuing a corresponding
// |FlushAsync()| call. In that case, use the single-argument constructor
// defined below.
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) PendingFlush {
public:
// Constructs a new PendingFlush associated with |*flusher|. |*flusher| should
// be a default-constructed AsyncFlusher, and once it is initialized by this
// constructor it should be used to flush some Remote or Receiver using their
// |FlushAsyncWithFlusher()| method.
explicit PendingFlush(AsyncFlusher* flusher);
PendingFlush(PendingFlush&&);
PendingFlush(const PendingFlush&) = delete;
PendingFlush& operator=(PendingFlush&&);
PendingFlush& operator=(const PendingFlush&) = delete;
~PendingFlush();
private:
friend class PipeControlMessageProxy;
ScopedMessagePipeHandle PassPipe();
ScopedMessagePipeHandle pipe_;
};
} // namespace mojo
#endif // MOJO_PUBLIC_CPP_BINDINGS_PENDING_FLUSH_H_

@ -20,6 +20,10 @@ class PipeControlMessageHandlerDelegate {
InterfaceId id,
const base::Optional<DisconnectReason>& reason) = 0;
// The implementation should cease dispatching messages until the
// |flush_pipe|'s peer is closed.
virtual bool WaitForFlushToComplete(ScopedMessagePipeHandle flush_pipe) = 0;
protected:
virtual ~PipeControlMessageHandlerDelegate() {}
};

@ -8,10 +8,12 @@
#include "base/component_export.h"
#include "base/macros.h"
#include "base/optional.h"
#include "mojo/public/cpp/bindings/async_flusher.h"
#include "mojo/public/cpp/bindings/disconnect_reason.h"
#include "mojo/public/cpp/bindings/interface_id.h"
#include "mojo/public/cpp/bindings/lib/serialization_context.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/pending_flush.h"
namespace mojo {
@ -28,6 +30,8 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) PipeControlMessageProxy {
void NotifyPeerEndpointClosed(InterfaceId id,
const base::Optional<DisconnectReason>& reason);
void PausePeerUntilFlushCompletes(PendingFlush flush);
void FlushAsync(AsyncFlusher flusher);
static Message ConstructPeerEndpointClosedMessage(
InterfaceId id,

@ -13,10 +13,12 @@
#include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/sequenced_task_runner.h"
#include "mojo/public/cpp/bindings/async_flusher.h"
#include "mojo/public/cpp/bindings/connection_error_callback.h"
#include "mojo/public/cpp/bindings/connection_group.h"
#include "mojo/public/cpp/bindings/interface_request.h"
#include "mojo/public/cpp/bindings/lib/binding_state.h"
#include "mojo/public/cpp/bindings/pending_flush.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "mojo/public/cpp/bindings/raw_ptr_impl_ref_traits.h"
#include "mojo/public/cpp/system/message_pipe.h"
@ -205,6 +207,49 @@ class Receiver {
return internal_state_.WaitForIncomingMethodCall(MOJO_DEADLINE_INDEFINITE);
}
// Pauses the Remote endpoint, stopping dispatch of callbacks on that end. Any
// callbacks called prior to this will dispatch before the Remote endpoint is
// paused; any callbacks called after this will only be called once the flush
// operation corresponding to |flush| is completed or canceled.
//
// See documentation for |FlushAsync()| on Remote and Receiver for how to
// acquire a PendingFlush object, and documentation on PendingFlush for
// example usage.
void PauseRemoteCallbacksUntilFlushCompletes(PendingFlush flush) {
internal_state_.PauseRemoteCallbacksUntilFlushCompletes(std::move(flush));
}
// Flushes the Remote endpoint asynchronously using |flusher|. The
// corresponding PendingFlush will be notified only once all response
// callbacks issued prior to this operation have been dispatched at the Remote
// endpoint.
//
// NOTE: It is more common to use |FlushAsync()| defined below. If you really
// want to provide your own AsyncFlusher using this method, see the
// single-arugment constructor on PendingFlush. This would typically be used
// when code executing on the current sequence wishes to immediately pause
// one of its remote endpoints to wait on a flush operation that needs to be
// initiated on a separate sequence. Rather than bouncing to the second
// sequence to initiate a flush and then passing a PendingFlush back to the
// original sequence, the AsyncFlusher/PendingFlush can be created on the
// original sequence and a single task can be posted to pass the AsyncFlusher
// to the second sequence for use with this method.
void FlushAsyncWithFlusher(AsyncFlusher flusher) {
internal_state_.FlushAsync(std::move(flusher));
}
// Same as above but an AsyncFlusher/PendingFlush pair is created on the
// caller's behalf. The AsyncFlusher is immediately passed to a
// |FlushAsyncWithFlusher()| call on this object, while the PendingFlush is
// returned for use by the caller. See documentation on PendingFlush for
// example usage.
PendingFlush FlushAsync() {
AsyncFlusher flusher;
PendingFlush flush(&flusher);
FlushAsyncWithFlusher(std::move(flusher));
return flush;
}
// Flushes any replies previously sent by the Receiver, only unblocking once
// acknowledgement from the Remote is received.
void FlushForTesting() { internal_state_.FlushForTesting(); }

@ -15,8 +15,10 @@
#include "base/memory/scoped_refptr.h"
#include "base/sequenced_task_runner.h"
#include "base/time/time.h"
#include "mojo/public/cpp/bindings/async_flusher.h"
#include "mojo/public/cpp/bindings/interface_ptr_info.h"
#include "mojo/public/cpp/bindings/lib/interface_ptr_state.h"
#include "mojo/public/cpp/bindings/pending_flush.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/system/message_pipe.h"
@ -307,6 +309,51 @@ class Remote {
internal_state_.RequireVersion(version);
}
// Pauses the receiving endpoint until the flush corresponding to |flush| has
// completed. Any calls made on this Remote prior to this call will be
// dispatched at the receiving endpoint before pausing. The endpoint will not
// dispatch any subsequent calls until the flush operation corresponding to
// |flush| has been completed or canceled.
//
// See documentation for |FlushAsync()| on Remote and Receiver for how to
// acquire a PendingFlush object, and documentation on PendingFlush for
// example usage.
void PauseReceiverUntilFlushCompletes(PendingFlush flush) {
internal_state_.PauseReceiverUntilFlushCompletes(std::move(flush));
}
// Flushes the receiving endpoint asynchronously using |flusher|. Once all
// calls made on this Remote prior to this |FlushAsyncWithFlusher()| call have
// dispatched at the receiving endpoint, |flusher| will signal its
// corresponding PendingFlush, unblocking any endpoint waiting on the flush
// operation.
//
// NOTE: It is more common to use |FlushAsync()| defined below. If you really
// want to provide your own AsyncFlusher using this method, see the
// single-arugment constructor on PendingFlush. This would typically be used
// when code executing on the current sequence wishes to immediately pause
// one of its remote endpoints to wait on a flush operation that needs to be
// initiated on a separate sequence. Rather than bouncing to the second
// sequence to initiate a flush and then passing a PendingFlush back to the
// original sequence, the AsyncFlusher/PendingFlush can be created on the
// original sequence and a single task can be posted to pass the AsyncFlusher
// to the second sequence for use with this method.
void FlushAsyncWithFlusher(AsyncFlusher flusher) {
internal_state_.FlushAsync(std::move(flusher));
}
// Same as above but an AsyncFlusher/PendingFlush pair is created on the
// caller's behalf. The AsyncFlusher is immediately passed to a
// |FlushAsyncWithFlusher()| call on this object, while the PendingFlush is
// returned for use by the caller. See documentation on PendingFlush for
// example usage.
PendingFlush FlushAsync() {
AsyncFlusher flusher;
PendingFlush flush(&flusher);
FlushAsyncWithFlusher(std::move(flusher));
return flush;
}
// Sends a no-op message on the underlying message pipe and runs the current
// message loop until its response is received. This can be used in tests to
// verify that no message was sent on a message pipe in response to some

@ -23,6 +23,7 @@ source_set("tests") {
"data_view_unittest.cc",
"enum_headers_unittest.cc",
"equals_unittest.cc",
"flush_async_unittest.cc",
"handle_passing_unittest.cc",
"hash_unittest.cc",
"idle_tracking_unittest.cc",
@ -76,9 +77,7 @@ source_set("tests") {
"//testing/gtest",
]
data = [
"//mojo/public/interfaces/bindings/tests/data/validation/",
]
data = [ "//mojo/public/interfaces/bindings/tests/data/validation/" ]
if (is_ios) {
assert_no_deps = [ "//third_party/blink/*" ]
@ -141,6 +140,7 @@ mojom("test_mojom") {
"binder_map_unittest.test-mojom",
"connection_group_unittest.test-mojom",
"enum_headers_unittest.test-mojom",
"flush_async_unittest.test-mojom",
"idle_tracking_unittest.test-mojom",
"receiver_unittest.test-mojom",
"remote_unittest.test-mojom",
@ -148,9 +148,7 @@ mojom("test_mojom") {
"struct_headers_unittest.test-mojom",
]
public_deps = [
"//mojo/public/mojom/base",
]
public_deps = [ "//mojo/public/mojom/base" ]
support_lazy_serialization = true
}
@ -158,9 +156,7 @@ mojom("test_mojom") {
source_set("perftests") {
testonly = true
sources = [
"bindings_perftest.cc",
]
sources = [ "bindings_perftest.cc" ]
if (!is_ios) {
sources += [ "e2e_perftest.cc" ]
@ -184,7 +180,5 @@ source_set("mojo_public_bindings_test_utils") {
"validation_test_input_parser.h",
]
deps = [
"//mojo/public/c/system",
]
deps = [ "//mojo/public/c/system" ]
}

@ -0,0 +1,451 @@
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <memory>
#include <utility>
#include <vector>
#include "base/barrier_closure.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/containers/flat_map.h"
#include "base/memory/ref_counted.h"
#include "base/optional.h"
#include "base/synchronization/lock.h"
#include "base/task/post_task.h"
#include "base/test/bind_test_util.h"
#include "mojo/public/cpp/bindings/async_flusher.h"
#include "mojo/public/cpp/bindings/pending_flush.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
#include "mojo/public/cpp/bindings/tests/bindings_test_base.h"
#include "mojo/public/cpp/bindings/tests/flush_async_unittest.test-mojom.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace test {
namespace flush_async_unittest {
// This implementation binds its receiver on an arbitrary ThreadPool task
// runner. Any incoming Writer receivers are in turn bound on arbitrary (and
// potentially different) ThreadPool task runners. There is therefore no general
// ordering guarantee regarding message dispatch among each bound interface,
// yielding generally racy behavior.
//
// This allows tests to reliably verify correctness of async flushing behavior.
class KeyValueStoreImpl : public base::RefCountedThreadSafe<KeyValueStoreImpl>,
public mojom::KeyValueStore {
public:
KeyValueStoreImpl()
: task_runner_(base::CreateSequencedTaskRunner({base::ThreadPool()})) {}
void Bind(PendingReceiver<mojom::KeyValueStore> receiver) {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&KeyValueStoreImpl::BindOnTaskRunner, this,
std::move(receiver)));
}
void ShutDown(base::OnceClosure callback) {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&KeyValueStoreImpl::ShutDownOnTaskRunner,
this, std::move(callback)));
}
void StoreValue(const std::string& key, const std::string& value) {
base::AutoLock locker(lock_);
contents_[key] = value;
}
private:
friend class base::RefCountedThreadSafe<KeyValueStoreImpl>;
class WriterImpl : public mojom::Writer {
public:
WriterImpl(KeyValueStoreImpl* key_value_store)
: task_runner_(base::CreateSequencedTaskRunner({base::ThreadPool()})),
key_value_store_(key_value_store) {}
~WriterImpl() override = default;
void Bind(PendingReceiver<mojom::Writer> receiver) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&WriterImpl::BindOnTaskRunner, base::Unretained(this),
std::move(receiver)));
}
void ShutDown(base::OnceClosure callback) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&WriterImpl::ShutDownOnTaskRunner,
base::Unretained(this), std::move(callback)));
}
// mojom::Writer implementation:
void Put(const std::string& key, const std::string& value) override {
key_value_store_->StoreValue(key, value);
}
private:
void BindOnTaskRunner(PendingReceiver<mojom::Writer> receiver) {
receiver_ =
std::make_unique<Receiver<mojom::Writer>>(this, std::move(receiver));
}
void ShutDownOnTaskRunner(base::OnceClosure callback) {
receiver_.reset();
std::move(callback).Run();
}
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
KeyValueStoreImpl* const key_value_store_;
std::unique_ptr<Receiver<mojom::Writer>> receiver_;
};
void BindOnTaskRunner(PendingReceiver<mojom::KeyValueStore> receiver) {
receiver_ = std::make_unique<Receiver<mojom::KeyValueStore>>(
this, std::move(receiver));
}
void ShutDownOnTaskRunner(base::OnceClosure callback) {
receiver_.reset();
client_.reset();
// Shutdown all WriterImpls too.
auto shutdown = base::BarrierClosure(writers_.size(), std::move(callback));
for (auto& writer : writers_)
writer->ShutDown(base::BindOnce(shutdown));
}
// mojom::KeyValueStore implementation:
void SetClient(PendingRemote<mojom::KeyValueStoreClient> client) override {
client_.Bind(std::move(client));
}
void BindWriter(PendingReceiver<mojom::Writer> receiver) override {
// NOTE: Each WriterImpl internally binds on an arbitrary ThreadPool task
// runner, leaving us with no ordering guarantee among Writers with respect
// to each other or this KeyValueStore.
auto new_writer = std::make_unique<WriterImpl>(this);
new_writer->Bind(std::move(receiver));
writers_.push_back(std::move(new_writer));
}
void GetSnapshot(GetSnapshotCallback callback) override {
base::AutoLock locker(lock_);
std::move(callback).Run(contents_);
// If we have a client, notify it that a snapshot was taken, but ensure that
// it doesn't dispatch that notification until the above callback is
// dispatched. Then also ensure that our remote doesn't receiver any
// subsequent callbacks until the client processes this |OnSnapshotTaken()|.
if (client_) {
client_.PauseReceiverUntilFlushCompletes(receiver_->FlushAsync());
client_->OnSnapshotTaken();
receiver_->PauseRemoteCallbacksUntilFlushCompletes(client_.FlushAsync());
}
}
~KeyValueStoreImpl() override = default;
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
std::unique_ptr<Receiver<mojom::KeyValueStore>> receiver_;
Remote<mojom::KeyValueStoreClient> client_;
std::vector<std::unique_ptr<WriterImpl>> writers_;
base::Lock lock_;
base::flat_map<std::string, std::string> contents_;
};
class FlushAsyncTest : public BindingsTestBase {
public:
FlushAsyncTest() {
key_value_store_->Bind(
remote_key_value_store_.BindNewPipeAndPassReceiver());
}
void TearDown() override {
base::RunLoop wait_for_clean_shutdown;
key_value_store_->ShutDown(wait_for_clean_shutdown.QuitClosure());
wait_for_clean_shutdown.Run();
}
Remote<mojom::KeyValueStore>& key_value_store() {
return remote_key_value_store_;
}
Remote<mojom::Writer> MakeWriter() {
Remote<mojom::Writer> writer;
key_value_store()->BindWriter(writer.BindNewPipeAndPassReceiver());
return writer;
}
private:
Remote<mojom::KeyValueStore> remote_key_value_store_;
scoped_refptr<KeyValueStoreImpl> key_value_store_{
base::MakeRefCounted<KeyValueStoreImpl>()};
};
TEST_P(FlushAsyncTest, WaitForMultipleFlushes) {
const std::string kKey1 = "bar";
const std::string kKey2 = "foo";
const std::string kValue1 = "42";
const std::string kValue2 = "37";
Remote<mojom::Writer> writer1 = MakeWriter();
Remote<mojom::Writer> writer2 = MakeWriter();
writer1->Put(kKey1, kValue1);
writer2->Put(kKey2, kValue2);
// Both |Put()| calls must be received by the time |GetSnapshot()| is
// dispatched.
base::flat_map<std::string, std::string> snapshot;
base::RunLoop loop;
key_value_store().PauseReceiverUntilFlushCompletes(writer1.FlushAsync());
key_value_store().PauseReceiverUntilFlushCompletes(writer2.FlushAsync());
key_value_store()->GetSnapshot(base::BindLambdaForTesting(
[&](const base::flat_map<std::string, std::string>& contents) {
snapshot = contents;
loop.Quit();
}));
loop.Run();
EXPECT_EQ(2u, snapshot.size());
EXPECT_EQ(kValue1, snapshot[kKey1]);
EXPECT_EQ(kValue2, snapshot[kKey2]);
}
TEST_P(FlushAsyncTest, MultipleFlushesInSequence) {
const std::string kKey1 = "foo";
const std::string kKey2 = "bar";
const std::string kKey3 = "baz";
const std::string kValue1 = "1";
const std::string kValue2 = "2";
const std::string kValue3 = "3";
Remote<mojom::Writer> writer1 = MakeWriter();
Remote<mojom::Writer> writer2 = MakeWriter();
writer1->Put(kKey1, kValue1);
writer1.FlushForTesting();
// Pause each Writer until the |GetSnapshot()| call below has executed,
// ensuring that the snapshot never reflects the result of the |Put()| calls
// below.
base::RunLoop loop;
base::flat_map<std::string, std::string> snapshot;
key_value_store()->GetSnapshot(base::BindLambdaForTesting(
[&](const base::flat_map<std::string, std::string>& contents) {
snapshot = contents;
loop.Quit();
}));
writer1.PauseReceiverUntilFlushCompletes(key_value_store().FlushAsync());
writer2.PauseReceiverUntilFlushCompletes(key_value_store().FlushAsync());
writer1->Put(kKey2, kValue2);
writer2->Put(kKey3, kValue3);
loop.Run();
EXPECT_EQ(1u, snapshot.size());
EXPECT_EQ(kValue1, snapshot[kKey1]);
}
TEST_P(FlushAsyncTest, DroppedFlusherCompletesPendingFlush) {
const std::string kKey = "foo";
const std::string kValue = "bar";
Remote<mojom::Writer> writer = MakeWriter();
writer->Put(kKey, kValue);
writer.FlushForTesting();
// Pause the KeyValueStore to block |GetSnapshot()|, but drop the
// corresponding AsyncFlusher. The call should eventually execute.
base::RunLoop loop;
base::flat_map<std::string, std::string> snapshot;
base::Optional<AsyncFlusher> flusher(base::in_place);
key_value_store().PauseReceiverUntilFlushCompletes(
PendingFlush(&flusher.value()));
key_value_store()->GetSnapshot(base::BindLambdaForTesting(
[&](const base::flat_map<std::string, std::string>& contents) {
snapshot = contents;
loop.Quit();
}));
flusher.reset();
loop.Run();
EXPECT_EQ(1u, snapshot.size());
EXPECT_EQ(kValue, snapshot[kKey]);
}
class PingerImpl : public mojom::Pinger {
public:
explicit PingerImpl(PendingReceiver<mojom::Pinger> receiver)
: receiver_(this, std::move(receiver)) {}
~PingerImpl() override = default;
Receiver<mojom::Pinger>& receiver() { return receiver_; }
// mojom::Pinger implementation:
void Ping(PingCallback callback) override { std::move(callback).Run(); }
private:
Receiver<mojom::Pinger> receiver_;
};
TEST_P(FlushAsyncTest, PausedInterfaceDoesNotAutoResumeOnFlush) {
// Verifies that if a receiver is implicitly paused via a remote call to
// |PauseReceiverUntilFlushCompletes()|, but also explicitly paused by its
// owner calling |Pause()|, it does not auto-resume when the flush completes.
Remote<mojom::Pinger> pinger;
PingerImpl impl(pinger.BindNewPipeAndPassReceiver());
base::Optional<AsyncFlusher> flusher(base::in_place);
PendingFlush flush(&flusher.value());
pinger.PauseReceiverUntilFlushCompletes(std::move(flush));
// Allow the receiver to become implicitly paused as a result of the above
// call. Using |RunUntilIdle()| is safe here since this is a simple unit test
// and we are only concerned with activity on the calling sequence.
base::RunLoop().RunUntilIdle();
// We should not see a reply until the receiver is unpaused.
bool got_reply = false;
base::RunLoop ping_loop;
pinger->Ping(base::BindLambdaForTesting([&] {
ping_loop.Quit();
got_reply = true;
}));
// Explicitly pause the receiver and complete the AsyncFlusher.
impl.receiver().Pause();
flusher.reset();
// Ensure that any asynchronous side-effects of resetting the AsyncFlusher
// have a chance to execute.
base::RunLoop().RunUntilIdle();
// The receiver should still be paused despite the flush completing, because
// we haven't called an explicit |Resume()| to match the explicit |Pause()|
// above.
EXPECT_FALSE(got_reply);
// Now allow it to resume and verify that everything's cool.
impl.receiver().Resume();
ping_loop.Run();
EXPECT_TRUE(got_reply);
}
TEST_P(FlushAsyncTest, ResumeDoesNotInterruptWaitingOnFlush) {
// Verifies that an explicit |Resume()| does not actually resume message
// processing if the endpoint is still waiting on an asynchronous flush
// operation.
Remote<mojom::Pinger> pinger;
PingerImpl impl(pinger.BindNewPipeAndPassReceiver());
base::Optional<AsyncFlusher> flusher(base::in_place);
PendingFlush flush(&flusher.value());
pinger.PauseReceiverUntilFlushCompletes(std::move(flush));
// Allow the receiver to become implicitly paused as a result of the above
// call. Using |RunUntilIdle()| is safe here since this is a simple unit test
// and we are only concerned with activity on the calling sequence.
base::RunLoop().RunUntilIdle();
// We should not see a reply until the receiver is unpaused.
bool got_reply = false;
base::RunLoop ping_loop;
pinger->Ping(base::BindLambdaForTesting([&] {
ping_loop.Quit();
got_reply = true;
}));
// Explicitly resume the receiver and let tasks settle. There should still be
// no reply, because |flusher| is still active and the receiver is waiting on
// it to complete.
impl.receiver().Resume();
base::RunLoop().RunUntilIdle();
EXPECT_FALSE(got_reply);
// Now allow the flush to complete and verify that the receiver is unblocked.
flusher.reset();
ping_loop.Run();
EXPECT_TRUE(got_reply);
}
class KeyValueStoreClientImpl : public mojom::KeyValueStoreClient {
public:
explicit KeyValueStoreClientImpl(
PendingReceiver<mojom::KeyValueStoreClient> receiver)
: receiver_(this, std::move(receiver)) {}
~KeyValueStoreClientImpl() override = default;
Receiver<mojom::KeyValueStoreClient>& receiver() { return receiver_; }
void set_snapshot_taken_callback(base::RepeatingClosure callback) {
snapshot_taken_callback_ = std::move(callback);
}
// mojom::KeyValueStoreClient implementation:
void OnSnapshotTaken() override {
if (snapshot_taken_callback_)
snapshot_taken_callback_.Run();
}
private:
Receiver<mojom::KeyValueStoreClient> receiver_;
base::RepeatingClosure snapshot_taken_callback_;
};
TEST_P(FlushAsyncTest, PauseRemote) {
// Smoke test to exercise the async flushing APIs on a Receiver to pause
// callback dispatch on its corresponding Remote. |GetSnapshot()| replies are
// strictly ordered against corresponding calls to |OnSnapshotTaken()| on the
// client interface. This is enforced entirely by logic in
// |KeyValueStoreImpl::GetSnapshot()| using async flush operations.
PendingRemote<mojom::KeyValueStoreClient> client;
KeyValueStoreClientImpl impl(client.InitWithNewPipeAndPassReceiver());
key_value_store()->SetClient(std::move(client));
int num_replies = 0;
int num_client_calls = 0;
// Any time the client gets an |OnSnapshotTaken()| call, it should be able
// to rely on the corresponding |GetSnapshot()| reply having already been
// dispatched.
impl.set_snapshot_taken_callback(base::BindLambdaForTesting([&] {
EXPECT_EQ(num_replies, num_client_calls + 1);
++num_client_calls;
}));
// Perform a few trial snapshots. All replies should be dispatched after any
// previous snapshot's client notification, but before its own corresponding
// client notification.
base::RunLoop loop;
key_value_store()->GetSnapshot(base::BindLambdaForTesting(
[&](const base::flat_map<std::string, std::string>&) {
EXPECT_EQ(0, num_replies);
EXPECT_EQ(0, num_client_calls);
++num_replies;
}));
key_value_store()->GetSnapshot(base::BindLambdaForTesting(
[&](const base::flat_map<std::string, std::string>&) {
EXPECT_EQ(1, num_replies);
EXPECT_EQ(1, num_client_calls);
++num_replies;
}));
key_value_store()->GetSnapshot(base::BindLambdaForTesting(
[&](const base::flat_map<std::string, std::string>&) {
EXPECT_EQ(2, num_replies);
EXPECT_EQ(2, num_client_calls);
++num_replies;
loop.Quit();
}));
loop.Run();
}
INSTANTIATE_MOJO_BINDINGS_TEST_SUITE_P(FlushAsyncTest);
} // namespace flush_async_unittest
} // namespace test
} // namespace mojo

@ -0,0 +1,21 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
module mojo.test.flush_async_unittest.mojom;
interface Writer {
Put(string key, string value);
};
interface KeyValueStoreClient {
OnSnapshotTaken();
};
interface KeyValueStore {
SetClient(pending_remote<KeyValueStoreClient> client);
BindWriter(pending_receiver<Writer> receiver);
GetSnapshot() => (map<string, string> entries);
};
interface Pinger { Ping() => (); };

@ -25,6 +25,8 @@ struct RunOrClosePipeMessageParams {
union RunOrClosePipeInput {
PeerAssociatedEndpointClosedEvent peer_associated_endpoint_closed_event;
PauseUntilFlushCompletes pause_until_flush_completes;
FlushAsync flush_async;
};
// A user-defined reason about why the interface is disconnected.
@ -44,3 +46,16 @@ struct PeerAssociatedEndpointClosedEvent {
DisconnectReason? disconnect_reason;
};
// Sent to an endpoint to pause its message dispatch. |flush_pipe| is
// monitored asynchronously for peer closure. Once its peer is closed, the
// endpoint unpauses.
struct PauseUntilFlushCompletes {
handle<message_pipe> flush_pipe;
};
// Sent to an endpoint so that the flush operation corresponding to
// |flusher_pipe| is signaled by the endpoint's receipt of this message.
struct FlushAsync {
handle<message_pipe> flusher_pipe;
};