0

Add generic, IPC-less version of MessagePort.

MessagePortCastCore bridges MessagePortCore, which supports a
Connector allowing an arbitrary transport layer between ports. By
default a pair will talk directly. This is intended for use on
platforms that do not use FIDL or Mojo but wish to use Cast bindings.

MessagePortCore extends from cast_message_port::MessagePort to support
multiple types of TaskRunners. There is a case where we might need
multiple implementations, so this should stay for now but can be
revisited later.

Additionally, visibility restrictions of the headers have been removed
so other components may reuse the abstraction. CreatePair has been moved
so multiple implementations of MessagePort may be linked. CreatePair
currently always creates the MessagePort "platform" type which can
communicate with bindings.

Bug: None
Change-Id: I61061875e2487568c2d55d043bbefab55e5f94be
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2956392
Reviewed-by: Sean Topping <seantopping@chromium.org>
Reviewed-by: Jiawei Li <lijiawei@chromium.org>
Reviewed-by: Ryan Keane <rwkeane@google.com>
Reviewed-by: Kevin Marshall <kmarshall@chromium.org>
Commit-Queue: Shawn Quereshi <shawnq@google.com>
Cr-Commit-Position: refs/heads/master@{#894442}
This commit is contained in:
Shawn Quereshi
2021-06-21 22:56:29 +00:00
committed by Chromium LUCI CQ
parent b30db05807
commit ffd09bcb7e
34 changed files with 913 additions and 69 deletions

@ -63,7 +63,7 @@ if (is_linux || is_chromeos || is_android) {
"//base",
"//chromecast/bindings/public/mojom",
"//components/cast/api_bindings:manager",
"//components/cast/message_port",
"//components/cast/message_port:message_port_cast",
"//components/on_load_script_injector/browser",
"//mojo/public/cpp/bindings",
"//third_party/blink/public/common",

@ -10,7 +10,7 @@
#include "base/bind.h"
#include "base/macros.h"
#include "components/cast/message_port/message_port_cast.h"
#include "components/cast/message_port/cast/message_port_cast.h"
namespace chromecast {
namespace bindings {

@ -10,7 +10,7 @@
#include "base/check.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/strings/string_piece.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "fuchsia/base/mem_buffer_util.h"
#include "fuchsia/base/message_port.h"

@ -24,7 +24,7 @@
#include "chromecast/common/mojom/queryable_data_store.mojom.h"
#include "chromecast/common/queryable_data.h"
#include "chromecast/net/connectivity_checker.h"
#include "components/cast/message_port/message_port_cast.h"
#include "components/cast/message_port/cast/message_port_cast.h"
#include "components/media_control/mojom/media_playback_options.mojom.h"
#include "content/public/browser/message_port_provider.h"
#include "content/public/browser/navigation_entry.h"

@ -10,7 +10,7 @@
#include "base/logging.h"
#include "base/strings/utf_string_conversions.h"
#include "chromecast/browser/cast_web_contents.h"
#include "components/cast/message_port/message_port_cast.h"
#include "components/cast/message_port/cast/message_port_cast.h"
#include "components/cast/named_message_port_connector/grit/named_message_port_connector_resources.h"
#include "ui/base/resource/resource_bundle.h"

@ -2,14 +2,39 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import("//build/buildflag_header.gni")
import("//build/config/features.gni")
import("//testing/test.gni")
declare_args() {
# If true, forces cast_api_bindings::CreatePlatformMessagePortPair to use
# cast_message_port::CreateMessagePortPair as its implementation. Otherwise,
# uses one of the other types based on platform.
use_message_port_core = false
}
buildflag_header("message_port_buildflags") {
header = "message_port_buildflags.h"
flags = [ "USE_MESSAGE_PORT_CORE=$use_message_port_core" ]
visibility = [ ":*" ]
}
source_set("message_port") {
if (is_fuchsia) {
public_deps = [ ":message_port_fuchsia" ]
public = [ "platform_message_port.h" ]
sources = [ "platform_message_port.cc" ]
public_deps = [ ":public" ]
deps = [
":message_port_buildflags",
"//base",
]
if (use_message_port_core) {
public_deps += [ ":message_port_core" ]
} else if (is_fuchsia) {
public_deps += [ ":message_port_fuchsia" ]
} else {
public_deps = [ ":message_port_cast" ]
public_deps += [ ":message_port_cast" ]
}
}
@ -25,15 +50,13 @@ source_set("public") {
]
defines = [ "CAST_COMPONENT_IMPLEMENTATION" ]
visibility = [ ":*" ]
}
if (is_fuchsia) {
source_set("message_port_fuchsia") {
public = [ "message_port_fuchsia.h" ]
public = [ "fuchsia/message_port_fuchsia.h" ]
sources = [ "message_port_fuchsia.cc" ]
sources = [ "fuchsia/message_port_fuchsia.cc" ]
public_deps = [
":public",
@ -48,9 +71,9 @@ if (is_fuchsia) {
}
source_set("message_port_cast") {
public = [ "message_port_cast.h" ]
public = [ "cast/message_port_cast.h" ]
sources = [ "message_port_cast.cc" ]
sources = [ "cast/message_port_cast.cc" ]
public_deps = [ ":public" ]
@ -61,11 +84,36 @@ source_set("message_port_cast") {
]
}
# MessagePortCore and its supporting connector/factory sources under
# cast_core/. Listed as a public dep of ":message_port" when
# use_message_port_core is true, and linked directly by the unit tests.
source_set("message_port_core") {
public = [
"cast_core/create_message_port_core.h",
"cast_core/message_connector.h",
"cast_core/message_port_core.h",
"cast_core/message_port_core_with_task_runner.h",
]
sources = [
"cast_core/create_message_port_core.cc",
"cast_core/message_connector.cc",
"cast_core/message_port_core.cc",
"cast_core/message_port_core_with_task_runner.cc",
]
# ":public" provides the MessagePort interface that the headers above expose.
public_deps = [ ":public" ]
deps = [
":public",
"//base",
]
}
source_set("message_port_unittest") {
testonly = true
sources = [ "message_port_unittest.cc" ]
deps = [
":message_port",
":message_port_buildflags",
":message_port_core",
":test_message_port_receiver",
"//base/test:test_support",
"//testing/gtest",

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/message_port_cast.h"
#include "components/cast/message_port/cast/message_port_cast.h"
#include <utility>
@ -13,8 +13,8 @@
namespace cast_api_bindings {
// static
void MessagePort::CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server) {
void MessagePortCast::CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server) {
auto pair_raw = blink::WebMessagePort::CreatePair();
*client = MessagePortCast::Create(std::move(pair_raw.first));
*server = MessagePortCast::Create(std::move(pair_raw.second));

@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_CAST_H_
#define COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_CAST_H_
#ifndef COMPONENTS_CAST_MESSAGE_PORT_CAST_MESSAGE_PORT_CAST_H_
#define COMPONENTS_CAST_MESSAGE_PORT_CAST_MESSAGE_PORT_CAST_H_
#include "components/cast/message_port/message_port.h"
#include "third_party/blink/public/common/messaging/web_message_port.h"
@ -25,6 +25,10 @@ class MessagePortCast : public cast_api_bindings::MessagePort,
MessagePortCast(const MessagePortCast&) = delete;
MessagePortCast& operator=(const MessagePortCast&) = delete;
// Creates a pair of message ports. Clients must respect |client| and
// |server| semantics because they matter for some implementations.
static void CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server);
static std::unique_ptr<MessagePort> Create(blink::WebMessagePort&& port);
static std::unique_ptr<MessagePort> Create(
blink::MessagePortDescriptor&& port_descriptor);
@ -55,4 +59,4 @@ class MessagePortCast : public cast_api_bindings::MessagePort,
} // namespace cast_api_bindings
#endif // COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_CAST_H_
#endif // COMPONENTS_CAST_MESSAGE_PORT_CAST_MESSAGE_PORT_CAST_H_

@ -0,0 +1,28 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/cast_core/create_message_port_core.h"
#include <memory>
#include "components/cast/message_port/cast_core/message_port_core_with_task_runner.h"
namespace cast_api_bindings {
// static
std::unique_ptr<MessagePortCore> CreateMessagePortCore(uint32_t channel_id) {
return std::make_unique<MessagePortCoreWithTaskRunner>(channel_id);
}
// static
void CreateMessagePortCorePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server) {
auto pair_raw = MessagePortCoreWithTaskRunner::CreatePair();
*client = std::make_unique<MessagePortCoreWithTaskRunner>(
std::move(pair_raw.first));
*server = std::make_unique<MessagePortCoreWithTaskRunner>(
std::move(pair_raw.second));
}
} // namespace cast_api_bindings

@ -0,0 +1,30 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_CREATE_MESSAGE_PORT_CORE_H_
#define COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_CREATE_MESSAGE_PORT_CORE_H_
#include <memory>
#include "components/cast/message_port/cast_core/message_port_core.h"
#include "components/cast/message_port/message_port.h"
namespace cast_api_bindings {
// Creates a MessagePort of the appropriate type based on the current
// TaskRunner. The port is unconnected but assigned to |channel_id|. Typically
// this would be used when receiving a serialized port and converting it to
// the local port type.
std::unique_ptr<MessagePortCore> CreateMessagePortCore(uint32_t channel_id);
// Implementation used by CreatePlatformMessagePortPair
// or callers who want to manually create MessagePortCore.
// This is external to the MessagePortCore class because MessagePortCore does
// not know about all of its implementations.
void CreateMessagePortCorePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server);
} // namespace cast_api_bindings
#endif // COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_CREATE_MESSAGE_PORT_CORE_H_

@ -0,0 +1,45 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/cast_core/message_connector.h"
#include "base/check_op.h"
#include "components/cast/message_port/cast_core/message_port_core.h"
namespace cast_api_bindings {
// Default constructor: delegates to the channel-id constructor with id 0.
MessageConnector::MessageConnector() : MessageConnector(0) {}
// Creates a connector assigned to |channel_id|. |peer_| and |started_| keep
// their in-class defaults (no peer, not started).
MessageConnector::MessageConnector(uint32_t channel_id)
: channel_id_(channel_id) {}
// If a peer is still attached, asks it to drop its pointer back to this
// connector (DetachPeer() nulls the peer's |peer_|), then clears our own link.
MessageConnector::~MessageConnector() {
  if (MessageConnector* attached = peer_)
    attached->DetachPeer();

  peer_ = nullptr;
}
// Points this connector at |other|. |other| must be non-null and must report
// the same channel id as this connector; both are asserted via DCHECK.
// Only this side is updated: |other|'s own peer pointer is not touched here,
// so callers link both directions themselves.
void MessageConnector::SetPeer(MessageConnector* other) {
DCHECK(other);
DCHECK_EQ(channel_id_, other->channel_id());
peer_ = other;
}
// Forgets the current peer. Does not notify the peer; the destructor of the
// other side calls this so we stop pointing at it.
void MessageConnector::DetachPeer() {
peer_ = nullptr;
}
// Marks the connector as started and, the first time only, tells the peer
// (if any) via OnPeerStarted(). Repeated calls are no-ops.
void MessageConnector::Start() {
  if (!started_) {
    started_ = true;
    if (peer_)
      peer_->OnPeerStarted();
  }
}
} // namespace cast_api_bindings

@ -0,0 +1,59 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_CONNECTOR_H_
#define COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_CONNECTOR_H_
#include <cstdint>
namespace cast_api_bindings {
struct Message;
// Represents one end of a channel, typically used by MessagePort.
class MessageConnector {
public:
// Accepts a |message| from another connector.
// Returns |true| if accepted by the connector; else, |false|.
virtual bool Accept(Message message) = 0;
// Accepts a |result| from another connector, typically after calling Accept.
// Returns |true| if accepted by the connector; else, |false|.
virtual bool AcceptResult(bool result) = 0;
// Called when peer is ready to process calls to Accept.
virtual void OnPeerStarted() = 0;
// Called when a peer has an error.
virtual void OnPeerError() = 0;
// Set this connector's peer to |other|, which must be on the same channel.
// Only this connector's pointer is updated; |other| is not linked back.
void SetPeer(MessageConnector* other);
// Detaches from |peer_|. Typically called by |peer_| to notify this
// connector of its invalidation.
void DetachPeer();
// Starts this connector; indicates this connector is ready for Accept.
// Notifies |peer_| through OnPeerStarted() on the first call only.
void Start();
// Retrieve the |channel_id_| for this connector.
uint32_t channel_id() const { return channel_id_; }
// Whether Start() has been called on this connector.
bool started() const { return started_; }
protected:
// Constructible and destructible only through subclasses.
MessageConnector();
explicit MessageConnector(uint32_t channel_id);
// Detaches this connector from |peer_|, if one is attached.
virtual ~MessageConnector();
// Identifier of the channel this connector belongs to.
uint32_t channel_id_;
// Non-owning pointer to the other end of the channel; null when detached.
MessageConnector* peer_ = nullptr;
// Set by Start(); never reset by this class.
bool started_ = false;
};
} // namespace cast_api_bindings
#endif // COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_CONNECTOR_H_

@ -0,0 +1,255 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/cast_core/message_port_core.h"
#include "components/cast/message_port/message_port.h"
namespace cast_api_bindings {
namespace {
const uint32_t kInvalidChannelId = 0;
} // namespace
// Builds a descriptor from an explicit |channel_id| and |peer_started| flag.
MessagePortDescriptor::MessagePortDescriptor(uint32_t channel_id,
bool peer_started)
: channel_id(channel_id), peer_started(peer_started) {}
// Move constructor. Both fields are plain values, so they are copied from
// |other|, which is left unmodified.
MessagePortDescriptor::MessagePortDescriptor(MessagePortDescriptor&& other)
: MessagePortDescriptor(other.channel_id, other.peer_started) {}
// Move constructor: takes over |other|'s payload string and transferred ports.
Message::Message(Message&& other)
: data(std::move(other.data)), ports(std::move(other.ports)) {}
// Memberwise move assignment.
Message& Message::operator=(Message&&) = default;
// Creates a message carrying only |data| (copied) and no ports.
Message::Message(const std::string& data) : data(data) {}
// Creates a message carrying |data| (copied) and taking ownership of |ports|.
Message::Message(const std::string& data,
std::vector<std::unique_ptr<MessagePortCore>> ports)
: data(data), ports(std::move(ports)) {}
// Destroys the message along with any ports it still owns.
Message::~Message() = default;
// Creates an unconnected port assigned to |channel_id|.
MessagePortCore::MessagePortCore(uint32_t channel_id)
: MessageConnector(channel_id) {}
// Move constructor. The MessageConnector base is default-constructed first;
// Assign() then takes over |other|'s channel id, peer and receiver.
MessagePortCore::MessagePortCore(MessagePortCore&& other) {
Assign(std::move(other));
}
// Peer detachment is handled by ~MessageConnector().
MessagePortCore::~MessagePortCore() = default;
// Takes over |other|'s receiver, channel id, peer and error/closed flags, and
// re-points the peer (if any) at this object. |other| is left without a
// receiver or peer and marked closed. |pending_response_|, |peer_started_|,
// |message_queue_| and |started_| are not transferred.
void MessagePortCore::Assign(MessagePortCore&& other) {
receiver_ = std::exchange(other.receiver_, nullptr);
// Must be set before the SetPeer() call below, which DCHECKs that both ends
// report the same channel id.
channel_id_ = other.channel_id_;
peer_ = std::exchange(other.peer_, nullptr);
errored_ = std::exchange(other.errored_, false);
closed_ = std::exchange(other.closed_, true);
if (peer_) {
// The peer still points at |other|; redirect it to this object.
peer_->SetPeer(this);
}
}
// Installs |receiver| (must be non-null) and binds the port to a task runner
// via SetTaskRunner(). With a peer attached, schedules Start() followed by a
// check of the peer's started state; without one, schedules a pipe error.
void MessagePortCore::SetReceiver(Receiver* receiver) {
  DCHECK(receiver);
  SetTaskRunner();
  receiver_ = receiver;

  if (peer_) {
    StartOnSequence();
    CheckPeerStartedOnSequence();
  } else {
    OnPipeErrorOnSequence();
  }
}
// MessageConnector entry point for an incoming |message|. Refuses (returns
// false) once the port is closed or errored. Otherwise returns true and, if a
// task runner is available, schedules delivery through AcceptOnSequence().
bool MessagePortCore::Accept(Message message) {
  const bool can_accept = !closed_ && !errored_;
  if (can_accept && HasTaskRunner())
    AcceptOnSequence(std::move(message));

  return can_accept;
}
// Delivers |message| to |receiver_| on the port's sequence and reports the
// receiver's verdict back to the peer. If the peer is gone, or refuses the
// result, the port is put into the error state.
void MessagePortCore::AcceptInternal(Message message) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  CHECK(receiver_);
  if (errored_)
    return;

  // Hand the transferred ports to the receiver as generic MessagePorts.
  std::vector<std::unique_ptr<MessagePort>> received_ports;
  for (std::unique_ptr<MessagePortCore>& core_port : message.ports)
    received_ports.emplace_back(std::move(core_port));

  const bool handled =
      receiver_->OnMessage(message.data, std::move(received_ports));
  const bool peer_took_result = peer_ && peer_->AcceptResult(handled);
  if (!peer_took_result) {
    // The peer has closed and finished.
    OnPipeErrorInternal();
  }
}
// MessageConnector entry point for the peer's response to a message we sent.
// Returns false once errored. Otherwise schedules handling of |result| (when
// a task runner exists) and returns whether this port still has work to do:
// queued messages remain, or it has not been closed.
bool MessagePortCore::AcceptResult(bool result) {
  // A closed_ (but not errored_) port still flushes its remaining queue.
  if (errored_)
    return false;

  if (HasTaskRunner())
    AcceptResultOnSequence(result);

  const bool has_queued_messages = !message_queue_.empty();
  return has_queued_messages || !closed_;
}
// Handles the peer's response on the port's sequence: clears the in-flight
// marker, then either sends the next queued message (|result| true) or enters
// the error state (|result| false).
void MessagePortCore::AcceptResultInternal(bool result) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(pending_response_);
  pending_response_ = false;

  if (result) {
    ProcessMessageQueue();
  } else {
    OnPipeErrorInternal();
  }
}
// Peer notification that it is ready for Accept(). Ignored when errored or
// when the peer is already known to be started; otherwise schedules a check
// of the peer's state if a task runner is available.
void MessagePortCore::OnPeerStarted() {
  // A closed_ (but not errored_) port still flushes its remaining queue.
  const bool nothing_to_do = errored_ || peer_started_;
  if (!nothing_to_do && HasTaskRunner())
    CheckPeerStartedOnSequence();
}
// Peer notification of an error. Schedules local error handling when a task
// runner is available; otherwise does nothing.
void MessagePortCore::OnPeerError() {
  if (!HasTaskRunner())
    return;

  OnPipeErrorOnSequence();
}
// Samples the peer's started() state on the port's sequence. Once the peer is
// observed as started, begins draining the message queue. No-op if the peer
// was already seen as started or there is no peer.
void MessagePortCore::CheckPeerStartedInternal() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (!peer_started_ && peer_) {
    // This also serves as the initial-state probe, so copy the peer's actual
    // state rather than assuming |true|.
    peer_started_ = peer_->started();
    if (peer_started_)
      ProcessMessageQueue();
  }
}
void MessagePortCore::ProcessMessageQueue() {
if (errored_ || pending_response_)
return;
if (!message_queue_.empty()) {
auto message = std::move(message_queue_.front());
message_queue_.pop();
PostMessageInternal(std::move(message));
}
}
// Posts |message| with no transferred ports.
bool MessagePortCore::PostMessage(base::StringPiece message) {
  std::vector<std::unique_ptr<MessagePort>> no_ports;
  return PostMessageWithTransferables(message, std::move(no_ports));
}
// static
// Downcasts |port| (must be non-null) to MessagePortCore. The cast is
// unchecked: the caller is responsible for |port| really being a
// MessagePortCore. Ownership is not affected.
MessagePortCore* MessagePortCore::FromMessagePort(MessagePort* port) {
DCHECK(port);
// This is safe because there is one MessagePortCore implementation per
// platform and this is called internally to the implementation.
return static_cast<MessagePortCore*>(port);
}
// Packages |message| and |ports| into a Message and schedules it for sending
// on the port's sequence. Every entry of |ports| is treated as a
// MessagePortCore (see FromMessagePort()). Always returns true; delivery
// failures surface later through the receiver's OnPipeError().
bool MessagePortCore::PostMessageWithTransferables(
    base::StringPiece message,
    std::vector<std::unique_ptr<MessagePort>> ports) {
  std::vector<std::unique_ptr<MessagePortCore>> core_ports;
  for (std::unique_ptr<MessagePort>& generic_port : ports) {
    // Ownership moves from the generic pointer to a MessagePortCore pointer.
    std::unique_ptr<MessagePortCore> core_port(
        FromMessagePort(generic_port.release()));
    core_ports.push_back(std::move(core_port));
  }

  PostMessageOnSequence(Message(std::string(message), std::move(core_ports)));
  return true;
}
// Sends |message| to the peer on the port's sequence. The message is dropped
// if the port is errored or has no peer, and queued if the peer has not
// started or an earlier message is still awaiting its response. A peer that
// refuses the message puts this port into the error state.
void MessagePortCore::PostMessageInternal(Message message) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (errored_ || !peer_)
    return;

  const bool can_send_now = peer_started_ && !pending_response_;
  if (!can_send_now) {
    message_queue_.emplace(std::move(message));
    return;
  }

  // One message in flight at a time: wait for AcceptResult before the next.
  pending_response_ = peer_->Accept(std::move(message));
  if (pending_response_)
    return;

  OnPipeErrorInternal();
}
// Moves the port into the error state on its sequence. The receiver is told
// through OnPipeError() only on the first error and only if the port was not
// already closed.
void MessagePortCore::OnPipeErrorInternal() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (!errored_) {
    errored_ = true;
    if (!closed_)
      receiver_->OnPipeError();
  }
}
// Marks the port closed. Only the flag is set here: the peer is not notified
// and nothing is torn down.
void MessagePortCore::Close() {
// Leave the receiver and sequence available for finishing up.
closed_ = true;
}
// True while the port has a peer and is neither closed nor errored.
bool MessagePortCore::IsValid() const {
  if (closed_ || errored_)
    return false;

  return peer_ != nullptr;
}
bool MessagePortCore::CanPostMessage() const {
return receiver_ && peer_ && !errored_ && !closed_;
}
// Hands this port's end of the channel over to |replacement| and returns a
// descriptor (channel id plus the peer's started state) describing it.
// Requires a non-null |replacement| and an attached peer (DCHECKed).
// Afterwards this port is closed, has no peer or receiver, and carries the
// invalid channel id.
MessagePortDescriptor MessagePortCore::Transfer(MessageConnector* replacement) {
DCHECK(replacement);
DCHECK(peer_);
// Capture the descriptor before the channel id is invalidated below.
MessagePortDescriptor desc(channel_id(), peer_->started());
// Link the peer and |replacement| to each other in both directions.
peer_->SetPeer(replacement);
replacement->SetPeer(peer_);
Close();
// Clear |peer_| so ~MessageConnector() does not detach the peer, which now
// belongs to |replacement|.
peer_ = nullptr;
receiver_ = nullptr;
channel_id_ = kInvalidChannelId;
errored_ = false;
return desc;
}
} // namespace cast_api_bindings

@ -0,0 +1,121 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_PORT_CORE_H_
#define COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_PORT_CORE_H_
#include <memory>
#include <queue>
#include <string>
#include "base/sequence_checker.h"
#include "components/cast/message_port/cast_core/message_connector.h"
#include "components/cast/message_port/message_port.h"
namespace cast_api_bindings {
// A serialized version of the port used for transfer.
struct MessagePortDescriptor {
MessagePortDescriptor(uint32_t channel_id, bool peer_started);
// Declaring the move constructor suppresses the implicit copy operations.
MessagePortDescriptor(MessagePortDescriptor&& other);
// Channel the transferred port was assigned to.
uint32_t channel_id;
// Whether the port's peer had already started when the port was transferred.
bool peer_started;
};
class MessagePortCore;
// Message sent over a MessagePortCore, containing data and optional
// MessagePortCore(s).
struct Message {
public:
// Move-only: a message owns the ports it carries.
Message(const Message&) = delete;
Message(Message&&);
Message& operator=(const Message&) = delete;
Message& operator=(Message&&);
// Creates a message with |data| and no ports.
explicit Message(const std::string& data);
// Creates a message with |data| that takes ownership of the given ports.
explicit Message(const std::string& data,
std::vector<std::unique_ptr<MessagePortCore>> port);
~Message();
// Payload of the message.
std::string data;
// Ports transferred along with the message; may be empty.
std::vector<std::unique_ptr<MessagePortCore>> ports;
};
// MessagePortCore used for transferring Message between a
// Receiver and another MessageConnector. Overrides of MessagePort and
// MessageConnector functions are run on a sequence provided by the OnSequence
// methods.
class MessagePortCore : public MessagePort, public MessageConnector {
public:
// Creates an unconnected port assigned to |channel_id|.
explicit MessagePortCore(uint32_t channel_id);
MessagePortCore(const MessagePortCore&) = delete;
// Takes over |other|'s channel; see Assign().
MessagePortCore(MessagePortCore&& other);
~MessagePortCore() override;
// Gets the MessagePortCore representation of |port|
// for callers who are certain of its type. Typically used when |port|
// is being transferred.
static MessagePortCore* FromMessagePort(MessagePort* port);
// MessagePortCore does not have a service to manage transfer of its handles;
// the connector must be replaced, e.g. with a remote connector.
// Returns a descriptor of the transferred end; this port is closed and
// disconnected afterwards.
MessagePortDescriptor Transfer(MessageConnector* replacement);
// MessagePort implementation:
bool PostMessage(base::StringPiece message) override;
bool PostMessageWithTransferables(
base::StringPiece message,
std::vector<std::unique_ptr<MessagePort>> ports) override;
void SetReceiver(MessagePort::Receiver* receiver) override;
void Close() override;
bool CanPostMessage() const override;
protected:
// Moves |other|'s receiver, channel id, peer and error/closed state into
// this port and re-points the peer at this port.
void Assign(MessagePortCore&& other);
// Whether the port is valid and connected to a peer.
bool IsValid() const;
// Implementation must be able to schedule Accept, AcceptResult, PostMessage,
// and OnPipeError on a sequence.
virtual void AcceptOnSequence(Message message) = 0;
virtual void AcceptResultOnSequence(bool result) = 0;
virtual void CheckPeerStartedOnSequence() = 0;
virtual void StartOnSequence() = 0;
virtual void PostMessageOnSequence(Message message) = 0;
virtual void OnPipeErrorOnSequence() = 0;
// Called from SetReceiver() before any *OnSequence() call is made.
virtual void SetTaskRunner() = 0;
// Whether the *OnSequence() methods can currently be used.
virtual bool HasTaskRunner() const = 0;
// Synchronous operations used by Accept, AcceptResult, PostMessage, and
// OnPipeError to operate on a sequence.
void AcceptInternal(Message message);
void AcceptResultInternal(bool result);
void CheckPeerStartedInternal();
void PostMessageInternal(Message message);
void OnPipeErrorInternal();
private:
// Posts the next message on the queue, if there is one. Should only be
// invoked on the sequence.
void ProcessMessageQueue();
// MessageConnector implementation:
bool Accept(Message message) override;
bool AcceptResult(bool result) override;
void OnPeerStarted() override;
void OnPeerError() override;
// Non-owning; set by SetReceiver().
MessagePort::Receiver* receiver_ = nullptr;
// True while a message sent to the peer has not yet been answered through
// AcceptResult(); at most one message is in flight at a time.
bool pending_response_ = false;
// Set once by OnPipeErrorInternal().
bool errored_ = false;
// Set by Close().
bool closed_ = false;
// Whether the peer has been observed as started.
bool peer_started_ = false;
// Outgoing messages waiting for the peer to start or for a pending response.
std::queue<Message> message_queue_;
SEQUENCE_CHECKER(sequence_checker_);
};
} // namespace cast_api_bindings
#endif // COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_PORT_CORE_H_

@ -0,0 +1,113 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/cast_core/message_port_core_with_task_runner.h"
#include "base/bind.h"
#include "base/logging.h"
#include "base/sequence_checker.h"
#include "base/threading/sequenced_task_runner_handle.h"
namespace cast_api_bindings {
namespace {
// Returns a fresh channel id from a process-wide atomic counter. The first
// value handed out is 0x8000001.
static uint32_t GenerateChannelId() {
  // Should theoretically start at a random number to lower collision chance if
  // ports are created in multiple places, but in practice this does not happen
  static std::atomic<uint32_t> last_issued_id{0x8000000};
  // fetch_add() yields the previous value; add one to match pre-increment.
  return last_issued_id.fetch_add(1) + 1;
}
} // namespace
// Creates two ports on a freshly generated channel id and links them to each
// other as peers. The links are raw pointers to the elements of the local
// pair; when a port is later move-constructed elsewhere, MessagePortCore's
// move constructor (via Assign()) re-points its peer at the new object.
std::pair<MessagePortCoreWithTaskRunner, MessagePortCoreWithTaskRunner>
MessagePortCoreWithTaskRunner::CreatePair() {
auto channel_id = GenerateChannelId();
auto pair = std::make_pair(MessagePortCoreWithTaskRunner(channel_id),
MessagePortCoreWithTaskRunner(channel_id));
// SetPeer() is one-directional, so link both ways.
pair.first.SetPeer(&pair.second);
pair.second.SetPeer(&pair.first);
return pair;
}
// Creates an unconnected port on |channel_id| with no task runner yet.
MessagePortCoreWithTaskRunner::MessagePortCoreWithTaskRunner(
uint32_t channel_id)
: MessagePortCore(channel_id) {}
// Move constructor: the base class takes over |other|'s channel state, then
// |other|'s task runner is taken as well, leaving |other| with none.
MessagePortCoreWithTaskRunner::MessagePortCoreWithTaskRunner(
MessagePortCoreWithTaskRunner&& other)
: MessagePortCore(std::move(other)) {
task_runner_ = std::exchange(other.task_runner_, nullptr);
}
MessagePortCoreWithTaskRunner::~MessagePortCoreWithTaskRunner() = default;
// Move assignment: takes |other|'s task runner and, through Assign(), its
// channel state. |other| is left closed with no task runner, receiver or peer.
// NOTE(review): a peer already attached to this port before the assignment is
// not detached here — confirm callers only assign into unconnected ports.
MessagePortCoreWithTaskRunner& MessagePortCoreWithTaskRunner::operator=(
MessagePortCoreWithTaskRunner&& other) {
task_runner_ = std::exchange(other.task_runner_, nullptr);
Assign(std::move(other));
return *this;
}
// Binds the port to the task runner returned by
// base::SequencedTaskRunnerHandle::Get() at the time of the call. All
// *OnSequence() methods post to this runner afterwards.
void MessagePortCoreWithTaskRunner::SetTaskRunner() {
task_runner_ = base::SequencedTaskRunnerHandle::Get();
}
// Schedules AcceptInternal() for |message| on |task_runner_|.
//
// The task is bound to a WeakPtr, like every other *OnSequence() helper in
// this class, so a task still queued when this port is destroyed is dropped
// instead of running against freed memory.
void MessagePortCoreWithTaskRunner::AcceptOnSequence(Message message) {
  DCHECK(task_runner_);
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_);
  task_runner_->PostTask(
      FROM_HERE, base::BindOnce(&MessagePortCoreWithTaskRunner::AcceptInternal,
                                weak_factory_.GetWeakPtr(), std::move(message)));
}
// Schedules AcceptResultInternal(|result|) on |task_runner_|, bound to a
// WeakPtr to this port.
void MessagePortCoreWithTaskRunner::AcceptResultOnSequence(bool result) {
  DCHECK(task_runner_);
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_);
  auto task =
      base::BindOnce(&MessagePortCoreWithTaskRunner::AcceptResultInternal,
                     weak_factory_.GetWeakPtr(), result);
  task_runner_->PostTask(FROM_HERE, std::move(task));
}
// Schedules CheckPeerStartedInternal() on |task_runner_|, bound to a WeakPtr
// to this port.
void MessagePortCoreWithTaskRunner::CheckPeerStartedOnSequence() {
  DCHECK(task_runner_);
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_);
  auto task =
      base::BindOnce(&MessagePortCoreWithTaskRunner::CheckPeerStartedInternal,
                     weak_factory_.GetWeakPtr());
  task_runner_->PostTask(FROM_HERE, std::move(task));
}
// Schedules Start() on |task_runner_|, bound to a WeakPtr to this port.
void MessagePortCoreWithTaskRunner::StartOnSequence() {
  DCHECK(task_runner_);
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_);
  auto task = base::BindOnce(&MessagePortCoreWithTaskRunner::Start,
                             weak_factory_.GetWeakPtr());
  task_runner_->PostTask(FROM_HERE, std::move(task));
}
// Schedules PostMessageInternal() for |message| on |task_runner_|, bound to a
// WeakPtr to this port.
void MessagePortCoreWithTaskRunner::PostMessageOnSequence(Message message) {
  DCHECK(task_runner_);
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_);
  auto task =
      base::BindOnce(&MessagePortCoreWithTaskRunner::PostMessageInternal,
                     weak_factory_.GetWeakPtr(), std::move(message));
  task_runner_->PostTask(FROM_HERE, std::move(task));
}
// Schedules OnPipeErrorInternal() on |task_runner_|, bound to a WeakPtr to
// this port.
void MessagePortCoreWithTaskRunner::OnPipeErrorOnSequence() {
  DCHECK(task_runner_);
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_);
  auto task =
      base::BindOnce(&MessagePortCoreWithTaskRunner::OnPipeErrorInternal,
                     weak_factory_.GetWeakPtr());
  task_runner_->PostTask(FROM_HERE, std::move(task));
}
// True once a task runner has been bound (see SetTaskRunner()).
bool MessagePortCoreWithTaskRunner::HasTaskRunner() const {
  return task_runner_.get() != nullptr;
}
} // namespace cast_api_bindings

@ -0,0 +1,49 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_PORT_CORE_WITH_TASK_RUNNER_H_
#define COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_PORT_CORE_WITH_TASK_RUNNER_H_
#include "base/memory/scoped_refptr.h"
#include "base/sequenced_task_runner.h"
#include "components/cast/message_port/cast_core/message_port_core.h"
namespace cast_api_bindings {
// MessagePortCore serving users of base::SequencedTaskRunnerHandle.
class MessagePortCoreWithTaskRunner : public MessagePortCore {
public:
// Creates an unconnected port on |channel_id|; no task runner is bound yet.
explicit MessagePortCoreWithTaskRunner(uint32_t channel_id);
MessagePortCoreWithTaskRunner(const MessagePortCoreWithTaskRunner&) = delete;
// Move operations take over |other|'s channel state and task runner.
MessagePortCoreWithTaskRunner(MessagePortCoreWithTaskRunner&& other);
~MessagePortCoreWithTaskRunner() override;
MessagePortCoreWithTaskRunner& operator=(
const MessagePortCoreWithTaskRunner&) = delete;
MessagePortCoreWithTaskRunner& operator=(
MessagePortCoreWithTaskRunner&& other);
// Creates a pair of bound MessagePorts representing two ends of a channel.
static std::pair<MessagePortCoreWithTaskRunner, MessagePortCoreWithTaskRunner>
CreatePair();
private:
// MessagePortCore implementation:
// Each *OnSequence() method posts the matching *Internal() operation (or
// Start()) to |task_runner_|.
void AcceptOnSequence(Message message) override;
void AcceptResultOnSequence(bool result) override;
void CheckPeerStartedOnSequence() override;
void StartOnSequence() override;
void PostMessageOnSequence(Message message) override;
void OnPipeErrorOnSequence() override;
void SetTaskRunner() override;
bool HasTaskRunner() const override;
// Null until SetTaskRunner() is called.
scoped_refptr<base::SequencedTaskRunner> task_runner_;
SEQUENCE_CHECKER(sequence_);
// Source of the WeakPtrs bound into posted tasks.
base::WeakPtrFactory<MessagePortCoreWithTaskRunner> weak_factory_{this};
};
} // namespace cast_api_bindings
#endif // COMPONENTS_CAST_MESSAGE_PORT_CAST_CORE_MESSAGE_PORT_CORE_WITH_TASK_RUNNER_H_

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/memory/weak_ptr.h"
@ -210,8 +210,8 @@ class MessagePortFuchsiaServer : public MessagePortFuchsia,
} // namespace
// static
void MessagePort::CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server) {
void MessagePortFuchsia::CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server) {
fidl::InterfaceHandle<fuchsia::web::MessagePort> port0;
fidl::InterfaceRequest<fuchsia::web::MessagePort> port1 = port0.NewRequest();
*client = MessagePortFuchsia::Create(std::move(port0));

@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_FUCHSIA_H_
#define COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_FUCHSIA_H_
#ifndef COMPONENTS_CAST_MESSAGE_PORT_FUCHSIA_MESSAGE_PORT_FUCHSIA_H_
#define COMPONENTS_CAST_MESSAGE_PORT_FUCHSIA_MESSAGE_PORT_FUCHSIA_H_
#include <fuchsia/web/cpp/fidl.h>
#include <lib/fidl/cpp/binding.h>
@ -25,6 +25,10 @@ class MessagePortFuchsia : public cast_api_bindings::MessagePort {
MessagePortFuchsia(const MessagePortFuchsia&) = delete;
MessagePortFuchsia& operator=(const MessagePortFuchsia&) = delete;
// Creates a pair of message ports. Clients must respect |client| and
// |server| semantics because they matter for some implementations.
static void CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server);
static std::unique_ptr<MessagePort> Create(
fidl::InterfaceHandle<::fuchsia::web::MessagePort> port);
static std::unique_ptr<MessagePort> Create(
@ -87,4 +91,4 @@ class MessagePortFuchsia : public cast_api_bindings::MessagePort {
} // namespace cast_api_bindings
#endif // COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_FUCHSIA_H_
#endif // COMPONENTS_CAST_MESSAGE_PORT_FUCHSIA_MESSAGE_PORT_FUCHSIA_H_

@ -33,11 +33,6 @@ class CAST_COMPONENT_EXPORT MessagePort {
virtual ~MessagePort();
// Creates a pair of message ports. Clients must respect |client| and
// |server| semantics because they matter for some implementations.
static void CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server);
// Sends a |message| from the port.
virtual bool PostMessage(base::StringPiece message) = 0;

@ -2,17 +2,21 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/message_port.h"
#include <memory>
#include "base/run_loop.h"
#include "base/test/task_environment.h"
#include "build/build_config.h"
#include "components/cast/message_port/cast_core/create_message_port_core.h"
#include "components/cast/message_port/message_port.h"
#include "components/cast/message_port/message_port_buildflags.h"
#include "components/cast/message_port/platform_message_port.h"
#include "components/cast/message_port/test_message_port_receiver.h"
#include "testing/gtest/include/gtest/gtest.h"
#if defined(OS_FUCHSIA)
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#else
#include "components/cast/message_port/message_port_cast.h" // nogncheck
#include "components/cast/message_port/cast/message_port_cast.h" // nogncheck
#include "third_party/blink/public/common/messaging/web_message_port.h" // nogncheck
#endif // defined(OS_FUCHSIA)
@ -22,15 +26,23 @@
namespace cast_api_bindings {
class MessagePortTest : public ::testing::Test {
class MessagePortTest
: public ::testing::TestWithParam<void (*)(std::unique_ptr<MessagePort>*,
std::unique_ptr<MessagePort>*)> {
public:
MessagePortTest()
: task_environment_(base::test::TaskEnvironment::MainThreadType::IO) {
MessagePort::CreatePair(&client_, &server_);
: task_environment_(base::test::TaskEnvironment::MainThreadType::IO),
create_pair_function_(GetParam()) {
CreatePair(&client_, &server_);
}
~MessagePortTest() override = default;
void CreatePair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server) {
create_pair_function_(client, server);
}
void SetDefaultReceivers() {
client_->SetReceiver(&client_receiver_);
server_->SetReceiver(&server_receiver_);
@ -79,20 +91,32 @@ class MessagePortTest : public ::testing::Test {
private:
const base::test::TaskEnvironment task_environment_;
void (*create_pair_function_)(std::unique_ptr<MessagePort>*,
std::unique_ptr<MessagePort>*);
};
TEST_F(MessagePortTest, Close) {
// Instantiates every TEST_P(MessagePortTest, ...) once per pair-creation
// function: the platform-selected pair and the MessagePortCore pair. The
// fixture constructor reads the function through GetParam() and uses it to
// build |client_| and |server_|.
INSTANTIATE_TEST_SUITE_P(
MessagePortTest,
MessagePortTest,
testing::Values(&CreatePlatformMessagePortPair,
&cast_api_bindings::CreateMessagePortCorePair));
// Checks that closing the server port is observed on the client side: both
// ports can post before the close, and neither can post once the client
// receiver has been run until it reports disconnection.
TEST_P(MessagePortTest, Close) {
SetDefaultReceivers();
// Both ends must be usable before the close.
ASSERT_TRUE(client_->CanPostMessage());
ASSERT_TRUE(server_->CanPostMessage());
server_->Close();
// cast_api_bindings::MessagePort reports closure only when a PostMessage is
// attempted, whereas other ports report it proactively, so post an empty
// message before waiting for the disconnect.
client_->PostMessage("");
client_receiver_.RunUntilDisconnected();
ASSERT_FALSE(client_->CanPostMessage());
ASSERT_FALSE(server_->CanPostMessage());
}
TEST_F(MessagePortTest, OnError) {
TEST_P(MessagePortTest, OnError) {
server_receiver_.SetOnMessageResult(false);
SetDefaultReceivers();
client_->PostMessage("");
@ -107,22 +131,22 @@ TEST_F(MessagePortTest, OnError) {
client_receiver_.RunUntilDisconnected();
}
TEST_F(MessagePortTest, PostMessage) {
// Runs the shared TestPostMessage() helper (defined outside this view)
// against the port pair created for the current test parameter.
TEST_P(MessagePortTest, PostMessage) {
TestPostMessage();
}
TEST_F(MessagePortTest, PostMessageMultiple) {
// Exercises several consecutive messages in each direction: three from the
// client port paired with the server's receiver, then three from the server
// port paired with the client's receiver.
// NOTE(review): PostMessages() is defined outside this view; presumably it
// posts each string and waits for the given receiver to see it - confirm.
TEST_P(MessagePortTest, PostMessageMultiple) {
SetDefaultReceivers();
PostMessages({"c1", "c2", "c3"}, client_.get(), &server_receiver_);
PostMessages({"s1", "s2", "s3"}, server_.get(), &client_receiver_);
}
TEST_F(MessagePortTest, PostMessageWithTransferables) {
TEST_P(MessagePortTest, PostMessageWithTransferables) {
std::unique_ptr<MessagePort> port0;
std::unique_ptr<MessagePort> port1;
TestMessagePortReceiver port0_receiver;
TestMessagePortReceiver port1_receiver;
MessagePort::CreatePair(&port0, &port1);
CreatePair(&port0, &port1);
// If the ports are represented by multiple types as in the case of
// MessagePortFuchsia, make sure both are transferrable
@ -139,9 +163,11 @@ TEST_F(MessagePortTest, PostMessageWithTransferables) {
PostMessages({"from port1"}, port1.get(), &port0_receiver);
}
TEST_F(MessagePortTest, WrapPlatformPort) {
TEST_P(MessagePortTest, WrapPlatformPort) {
// Initialize ports from the platform type instead of agnostic CreatePair
#if defined(OS_FUCHSIA)
#if BUILDFLAG(USE_MESSAGE_PORT_CORE)
cast_api_bindings::CreateMessagePortPair(&client_, &server_);
#elif defined(OS_FUCHSIA)
fidl::InterfaceHandle<fuchsia::web::MessagePort> port0;
fidl::InterfaceRequest<fuchsia::web::MessagePort> port1 = port0.NewRequest();
client_ = MessagePortFuchsia::Create(std::move(port0));
@ -155,9 +181,17 @@ TEST_F(MessagePortTest, WrapPlatformPort) {
TestPostMessage();
}
TEST_F(MessagePortTest, UnwrapPlatformPortCast) {
// Test unwrapping via TakePort (rewrapped for test methods)
#if defined(OS_FUCHSIA)
// Test unwrapping via TakePort (rewrapped for test methods)
TEST_P(MessagePortTest, UnwrapPlatformPortCast) {
// Workaround for parameterized tests which would create the
// wrong port type
CreatePlatformMessagePortPair(&client_, &server_);
#if BUILDFLAG(USE_MESSAGE_PORT_CORE)
client_.reset(
cast_api_bindings::MessagePortCore::FromMessagePort(client_.release()));
server_.reset(
cast_api_bindings::MessagePortCore::FromMessagePort(server_.release()));
#elif defined(OS_FUCHSIA)
client_ = MessagePortFuchsia::Create(
MessagePortFuchsia::FromMessagePort(client_.get())->TakeClientHandle());
server_ = MessagePortFuchsia::Create(

@ -0,0 +1,32 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast/message_port/platform_message_port.h"
#include "build/build_config.h"
#include "components/cast/message_port/message_port_buildflags.h"
#if BUILDFLAG(USE_MESSAGE_PORT_CORE)
#include "components/cast/message_port/cast_core/create_message_port_core.h" // nogncheck
#elif defined(OS_FUCHSIA)
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h" // nogncheck
#else
#include "components/cast/message_port/cast/message_port_cast.h" // nogncheck
#endif
namespace cast_api_bindings {
// Fills |client| and |server| with a freshly created port pair, delegating to
// the implementation selected at build time: MessagePortCore when
// USE_MESSAGE_PORT_CORE is enabled, MessagePortFuchsia on Fuchsia, and
// MessagePortCast everywhere else.
void CreatePlatformMessagePortPair(std::unique_ptr<MessagePort>* client,
                                   std::unique_ptr<MessagePort>* server) {
#if BUILDFLAG(USE_MESSAGE_PORT_CORE)
  CreateMessagePortCorePair(client, server);
#elif defined(OS_FUCHSIA)
  MessagePortFuchsia::CreatePair(client, server);
#else
  MessagePortCast::CreatePair(client, server);
#endif
}
} // namespace cast_api_bindings

@ -0,0 +1,22 @@
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_CAST_MESSAGE_PORT_PLATFORM_MESSAGE_PORT_H_
#define COMPONENTS_CAST_MESSAGE_PORT_PLATFORM_MESSAGE_PORT_H_
#include <memory>
#include "components/cast/message_port/message_port.h"
namespace cast_api_bindings {
// Creates a pair of message ports and returns the two ends through the
// |client| and |server| out-parameters. The concrete port type is chosen at
// build time (see platform_message_port.cc). Callers must respect the
// |client| / |server| roles because some platforms have asymmetric port
// implementations.
void CreatePlatformMessagePortPair(std::unique_ptr<MessagePort>* client,
std::unique_ptr<MessagePort>* server);
} // namespace cast_api_bindings
#endif // COMPONENTS_CAST_MESSAGE_PORT_PLATFORM_MESSAGE_PORT_H_

@ -9,6 +9,7 @@
#include "base/logging.h"
#include "base/strings/utf_string_conversions.h"
#include "components/cast/message_port/platform_message_port.h"
namespace cast_api_bindings {
@ -46,7 +47,7 @@ void NamedMessagePortConnector::GetConnectMessage(
std::string* message,
std::unique_ptr<MessagePort>* port) {
constexpr char kControlPortConnectMessage[] = "cast.master.connect";
MessagePort::CreatePair(&control_port_, port);
CreatePlatformMessagePortPair(&control_port_, port);
*message = kControlPortConnectMessage;
control_port_->SetReceiver(this);
}

@ -8,6 +8,7 @@
#include "base/json/json_writer.h"
#include "base/run_loop.h"
#include "base/test/task_environment.h"
#include "components/cast/message_port/platform_message_port.h"
#include "components/cast/message_port/test_message_port_receiver.h"
#include "components/cast_streaming/browser/message_serialization.h"
#include "testing/gtest/include/gtest/gtest.h"
@ -29,8 +30,8 @@ class CastMessagePortImplTest : public testing::Test,
void SetUp() override {
std::unique_ptr<cast_api_bindings::MessagePort> receiver;
cast_api_bindings::MessagePort::CreatePair(&sender_message_port_,
&receiver);
cast_api_bindings::CreatePlatformMessagePortPair(&sender_message_port_,
&receiver);
sender_message_port_->SetReceiver(&sender_message_port_receiver_);
receiver_message_port_ = std::make_unique<CastMessagePortImpl>(

@ -7,6 +7,7 @@
#include "base/run_loop.h"
#include "base/test/task_environment.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "components/cast/message_port/platform_message_port.h"
#include "components/cast_streaming/browser/test/cast_streaming_test_receiver.h"
#include "components/cast_streaming/browser/test/cast_streaming_test_sender.h"
#include "media/base/media_util.h"
@ -68,8 +69,8 @@ class CastStreamingSessionTest : public testing::Test {
void StartSession() {
std::unique_ptr<cast_api_bindings::MessagePort> sender_message_port;
std::unique_ptr<cast_api_bindings::MessagePort> receiver_message_port;
cast_api_bindings::MessagePort::CreatePair(&sender_message_port,
&receiver_message_port);
cast_api_bindings::CreatePlatformMessagePortPair(&sender_message_port,
&receiver_message_port);
receiver_.Start(std::move(receiver_message_port));
EXPECT_TRUE(sender_.Start(

@ -19,8 +19,8 @@ namespace cast_streaming {
//
// std::unique_ptr<cast_api_bindings::MessagePort> sender_message_port;
// std::unique_ptr<cast_api_bindings::MessagePort> receiver_message_port;
// cast_api_bindings::MessagePort::CreatePair(&sender_message_port,
// &receiver_message_port);
// cast_api_bindings::CreatePlatformMessagePortPair(&sender_message_port,
// &receiver_message_port);
//
// // Send |sender_message_port| to a Sender and start it.
//

@ -25,8 +25,8 @@ namespace cast_streaming {
//
// std::unique_ptr<cast_api_bindings::MessagePort> sender_message_port;
// std::unique_ptr<cast_api_bindings::MessagePort> receiver_message_port;
// cast_api_bindings::MessagePort::CreatePair(&sender_message_port,
// &receiver_message_port);
// cast_api_bindings::CreatePlatformMessagePortPair(&sender_message_port,
// &receiver_message_port);
//
// // Send |receiver_message_port| to a Receiver and start it.
//

@ -4,7 +4,8 @@
#include "base/callback_helpers.h"
#include "base/threading/platform_thread.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "components/cast/message_port/platform_message_port.h"
#include "components/cast_streaming/browser/test/cast_streaming_test_sender.h"
#include "content/public/test/browser_test.h"
#include "fuchsia/base/mem_buffer_util.h"
@ -122,8 +123,8 @@ IN_PROC_BROWSER_TEST_F(CastStreamingTest, LoadSuccess) {
std::unique_ptr<cast_api_bindings::MessagePort> sender_message_port;
std::unique_ptr<cast_api_bindings::MessagePort> receiver_message_port;
cast_api_bindings::MessagePort::CreatePair(&sender_message_port,
&receiver_message_port);
cast_api_bindings::CreatePlatformMessagePortPair(&sender_message_port,
&receiver_message_port);
fidl::InterfaceRequest<::fuchsia::web::MessagePort> message_port_request =
cast_api_bindings::MessagePortFuchsia::FromMessagePort(

@ -5,7 +5,7 @@
#include "fuchsia/engine/browser/receiver_session_client.h"
#include "base/bind.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "components/cast_streaming/public/config_conversions.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/video_decoder_config.h"

@ -9,7 +9,7 @@
#include "base/bind.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/strings/string_piece.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
namespace {

@ -10,7 +10,7 @@
#include "base/files/file_util.h"
#include "base/path_service.h"
#include "base/test/bind.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "content/public/test/browser_test.h"
#include "fuchsia/base/mem_buffer_util.h"
#include "fuchsia/base/test/fit_adapter.h"

@ -16,7 +16,8 @@
#include "base/fuchsia/fuchsia_logging.h"
#include "base/path_service.h"
#include "base/task/current_thread.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "components/cast/message_port/platform_message_port.h"
#include "fuchsia/base/agent_manager.h"
#include "fuchsia/base/mem_buffer_util.h"
#include "fuchsia/fidl/chromium/cast/cpp/fidl.h"
@ -161,8 +162,8 @@ void CastComponent::StartComponent() {
// Register the MessagePort for the Cast Streaming Receiver.
std::unique_ptr<cast_api_bindings::MessagePort> message_port_for_web_engine;
std::unique_ptr<cast_api_bindings::MessagePort> message_port_for_agent;
cast_api_bindings::MessagePort::CreatePair(&message_port_for_agent,
&message_port_for_web_engine);
cast_api_bindings::CreatePlatformMessagePortPair(
&message_port_for_agent, &message_port_for_web_engine);
frame()->PostMessage(
kCastStreamingMessagePortOrigin,
CreateWebMessage("", std::move(message_port_for_web_engine)),

@ -4,7 +4,7 @@
#include "fuchsia/runners/cast/create_web_message.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "fuchsia/base/mem_buffer_util.h"
fuchsia::web::WebMessage CreateWebMessage(

@ -10,7 +10,7 @@
#include "base/macros.h"
#include "base/path_service.h"
#include "base/strings/string_piece.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "components/cast/message_port/fuchsia/message_port_fuchsia.h"
#include "components/cast/message_port/test_message_port_receiver.h"
#include "content/public/test/browser_test.h"
#include "fuchsia/base/mem_buffer_util.h"