0

Reland "Direct Sockets: Add WritableStream for UDPSocket."

This is a reland of 4a3be276e1

Replace
	DCHECK_EQ(net::OK, helper.BindSync(...));
with
	int result = helper.BindSync(...);
	DCHECK_EQ(net::OK, result);
so that the bind statement doesn't get dropped silently in release mode.

Original change's description:
> Direct Sockets: Add WritableStream for UDPSocket.
>
> Add UDPWritableStreamWrapper which
> provides writable stream implementation for the UDP Socket.
>
> Streams API reference: https://streams.spec.whatwg.org/#writablestream
> UDP writable stream proposal (old): see §9.2.13
> https://www.w3.org/TR/tcp-udp-sockets/#widl-UDPSocket-writeable
>
> Bug: 1284560
> Change-Id: I004180d7f3fac7644aebebeb264e83148f3b259a
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3386797
> Reviewed-by: Eric Willigers <ericwilligers@chromium.org>
> Reviewed-by: Yutaka Hirano <yhirano@chromium.org>
> Commit-Queue: Andrew Rayskiy <greengrape@google.com>
> Cr-Commit-Position: refs/heads/main@{#962464}

Change-Id: Ic0a6e05206dbfe27497cc5ed4bf077fb75dd5306
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3416874
Reviewed-by: Eric Willigers <ericwilligers@chromium.org>
Commit-Queue: Andrew Rayskiy <greengrape@google.com>
Cr-Commit-Position: refs/heads/main@{#963953}
This commit is contained in:
Andrew Rayskiy
2022-01-27 09:52:33 +00:00
committed by Chromium LUCI CQ
parent 664cb870fb
commit 105716ef02
16 changed files with 780 additions and 27 deletions

@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/notreached.h"
#include "base/strings/stringprintf.h"
#include "base/test/bind.h"
#include "base/test/scoped_feature_list.h"
#include "content/browser/direct_sockets/direct_sockets_service_impl.h"
@ -21,6 +23,8 @@
#include "net/base/ip_endpoint.h"
#include "net/test/embedded_test_server/embedded_test_server.h"
#include "services/network/public/mojom/network_context.mojom.h"
#include "services/network/public/mojom/udp_socket.mojom.h"
#include "services/network/test/udp_socket_test_util.h"
#include "testing/gmock/include/gmock/gmock-matchers.h"
#include "url/gurl.h"
@ -72,12 +76,36 @@ class DirectSocketsUdpBrowserTest : public ContentBrowserTest {
origin_list);
}
uint16_t CreateUDPSocket(
mojo::PendingRemote<network::mojom::UDPSocketListener>
listener_receiver_remote) {
GetNetworkContext()->CreateUDPSocket(
server_socket_.BindNewPipeAndPassReceiver(),
std::move(listener_receiver_remote));
server_socket_.set_disconnect_handler(
base::BindLambdaForTesting([]() { NOTREACHED(); }));
net::IPEndPoint server_addr(net::IPAddress::IPv4Localhost(), 0);
network::test::UDPSocketTestHelper helper(&server_socket_);
int result = helper.BindSync(server_addr, nullptr, &server_addr);
DCHECK_EQ(net::OK, result);
return server_addr.port();
}
void ReceiveMore(uint32_t num_additional_datagrams) {
server_socket_->ReceiveMore(num_additional_datagrams);
}
private:
BrowserContext* browser_context() {
return shell()->web_contents()->GetBrowserContext();
}
base::test::ScopedFeatureList feature_list_;
mojo::Remote<network::mojom::UDPSocket> server_socket_;
};
IN_PROC_BROWSER_TEST_F(DirectSocketsUdpBrowserTest, CloseUdp) {
@ -92,4 +120,69 @@ IN_PROC_BROWSER_TEST_F(DirectSocketsUdpBrowserTest, CloseUdp) {
EXPECT_EQ("closeUdp succeeded", EvalJs(shell(), script));
}
// TODO(crbug/1290807): fails on the Win10 Tests x64 builder.
#if !BUILDFLAG(IS_WIN)
IN_PROC_BROWSER_TEST_F(DirectSocketsUdpBrowserTest, SendUdp) {
// We send datagrams with one byte, two bytes, three bytes, ...
const uint32_t kRequiredDatagrams = 35;
const uint32_t kRequiredBytes =
kRequiredDatagrams * (kRequiredDatagrams + 1) / 2;
EXPECT_TRUE(NavigateToURL(shell(), GetTestPageURL()));
DirectSocketsServiceImpl::SetPermissionCallbackForTesting(
base::BindRepeating(&UnconditionallyPermitConnection));
// Any attempt to make this a class member results into
// "This caller requires a single-threaded context".
network::test::UDPSocketListenerImpl listener;
mojo::Receiver<network::mojom::UDPSocketListener> listener_receiver{
&listener};
const uint16_t port =
CreateUDPSocket(listener_receiver.BindNewPipeAndPassRemote());
ReceiveMore(kRequiredDatagrams);
const std::string script = base::StringPrintf(
"sendUdp({remoteAddress: '127.0.0.1', remotePort: %d}, %u)", port,
kRequiredBytes);
EXPECT_EQ("send succeeded", EvalJs(shell(), script));
listener.WaitForReceivedResults(kRequiredDatagrams);
EXPECT_EQ(listener.results().size(), kRequiredDatagrams);
uint32_t bytes_received = 0, expected_data_size = 0;
for (const network::test::UDPSocketListenerImpl::ReceivedResult& result :
listener.results()) {
expected_data_size++;
EXPECT_EQ(result.net_error, net::OK);
EXPECT_TRUE(result.src_addr.has_value());
EXPECT_TRUE(result.data.has_value());
EXPECT_EQ(result.data->size(), expected_data_size);
for (uint8_t current : *result.data) {
EXPECT_EQ(current, static_cast<uint8_t>('a'));
++bytes_received;
}
}
EXPECT_EQ(bytes_received, kRequiredBytes);
}
#endif
IN_PROC_BROWSER_TEST_F(DirectSocketsUdpBrowserTest, SendUdpAfterClose) {
const uint32_t kRequiredBytes = 1;
EXPECT_TRUE(NavigateToURL(shell(), GetTestPageURL()));
DirectSocketsServiceImpl::SetPermissionCallbackForTesting(
base::BindRepeating(&UnconditionallyPermitConnection));
const std::string script = base::StringPrintf(
"sendUdpAfterClose({remoteAddress: '127.0.0.1', remotePort: 993}, %u)",
kRequiredBytes);
EXPECT_EQ(
"send failed: InvalidStateError: Failed to execute 'write' on "
"'UnderlyingSinkBase': Socket is disconnected.",
EvalJs(shell(), script));
}
} // namespace content

@ -5,6 +5,30 @@
<script>
'use strict';
async function sendLoop(writer, requiredBytes) {
let bytesWritten = 0;
let chunkLength = 0;
const encoder = new TextEncoder();
while (bytesWritten < requiredBytes) {
chunkLength = Math.min(chunkLength + 1,
requiredBytes - bytesWritten);
bytesWritten += chunkLength;
const view = encoder.encode('a'.repeat(chunkLength));
await writer.write({ data: view.buffer });
}
return 'send succeeded';
}
async function sendUdp(options, requiredBytes) {
try {
let udpSocket = await navigator.openUDPSocket(options);
return await sendLoop(udpSocket.writable.getWriter(), requiredBytes);
} catch(error) {
return ('sendUdp failed: ' + error);
}
}
async function closeUdp(options) {
try {
let udpSocket = await navigator.openUDPSocket(options);
@ -15,6 +39,16 @@
}
}
async function sendUdpAfterClose(options, requiredBytes) {
try {
let udpSocket = await navigator.openUDPSocket(options);
await udpSocket.close();
return await sendLoop(udpSocket.writable.getWriter(), requiredBytes);
} catch(error) {
return ('send failed: ' + error);
}
}
</script>
</head>
<body>

@ -2609,6 +2609,8 @@ if (target_os != "android") {
"$root_gen_dir/third_party/blink/renderer/bindings/modules/v8/v8_tcp_socket_options.h",
"$root_gen_dir/third_party/blink/renderer/bindings/modules/v8/v8_udp_socket_options.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/modules/v8/v8_udp_socket_options.h",
"$root_gen_dir/third_party/blink/renderer/bindings/modules/v8/v8_udp_message.h",
"$root_gen_dir/third_party/blink/renderer/bindings/modules/v8/v8_udp_message.cc",
]
generated_interface_sources_in_modules += [
"$root_gen_dir/third_party/blink/renderer/bindings/modules/v8/v8_tcp_socket.cc",

@ -1156,6 +1156,7 @@ if (target_os != "android") {
"//third_party/blink/renderer/modules/direct_sockets/navigator_socket.idl",
"//third_party/blink/renderer/modules/direct_sockets/socket_options.idl",
"//third_party/blink/renderer/modules/direct_sockets/tcp_socket.idl",
"//third_party/blink/renderer/modules/direct_sockets/udp_message.idl",
"//third_party/blink/renderer/modules/direct_sockets/udp_socket.idl",
],
"abspath")

@ -16,6 +16,10 @@ blink_modules_sources("direct_sockets") {
"tcp_writable_stream_wrapper.h",
"udp_socket.cc",
"udp_socket.h",
"udp_socket_mojo_remote.cc",
"udp_socket_mojo_remote.h",
"udp_writable_stream_wrapper.cc",
"udp_writable_stream_wrapper.h",
]
deps = [ "//net:net" ]
}
@ -26,6 +30,7 @@ source_set("unit_tests") {
"tcp_readable_stream_wrapper_unittest.cc",
"tcp_socket_unittest.cc",
"tcp_writable_stream_wrapper_unittest.cc",
"udp_writable_stream_wrapper_unittest.cc",
]
configs += [

@ -33,7 +33,7 @@ class TCPSocketCreator {
return tcp_socket;
}
ScriptPromise GetSciptPromise() { return create_promise_; }
ScriptPromise GetScriptPromise() { return create_promise_; }
private:
ScriptPromise create_promise_;
@ -43,13 +43,13 @@ TEST(TCPSocketTest, Create) {
V8TestingScope scope;
TCPSocketCreator tcp_socket_creator;
auto create_promise = tcp_socket_creator.GetSciptPromise();
auto create_promise = tcp_socket_creator.GetScriptPromise();
EXPECT_TRUE(create_promise.IsEmpty());
tcp_socket_creator.Create(scope);
auto* script_state = scope.GetScriptState();
create_promise = tcp_socket_creator.GetSciptPromise();
create_promise = tcp_socket_creator.GetScriptPromise();
ScriptPromiseTester create_tester(script_state, create_promise);
EXPECT_TRUE(create_promise.IsAssociatedWith(script_state));
@ -62,7 +62,7 @@ TEST(TCPSocketTest, CloseBeforeInit) {
auto* tcp_socket = tcp_socket_creator.Create(scope);
auto* script_state = scope.GetScriptState();
auto create_promise = tcp_socket_creator.GetSciptPromise();
auto create_promise = tcp_socket_creator.GetScriptPromise();
ScriptPromiseTester create_tester(script_state, create_promise);
ASSERT_FALSE(create_tester.IsRejected());
@ -89,7 +89,7 @@ TEST(TCPSocketTest, CloseAfterInitWithoutResultOK) {
auto* tcp_socket = tcp_socket_creator.Create(scope);
auto* script_state = scope.GetScriptState();
auto create_promise = tcp_socket_creator.GetSciptPromise();
auto create_promise = tcp_socket_creator.GetScriptPromise();
ScriptPromiseTester create_tester(script_state, create_promise);
ASSERT_FALSE(create_tester.IsRejected());
@ -121,7 +121,7 @@ TEST(TCPSocketTest, CloseAfterInitWithResultOK) {
auto* tcp_socket = tcp_socket_creator.Create(scope);
auto* script_state = scope.GetScriptState();
auto create_promise = tcp_socket_creator.GetSciptPromise();
auto create_promise = tcp_socket_creator.GetScriptPromise();
ScriptPromiseTester create_tester(script_state, create_promise);
ASSERT_FALSE(create_tester.IsFulfilled());
@ -148,7 +148,7 @@ TEST(TCPSocketTest, OnSocketObserverConnectionError) {
auto* tcp_socket = tcp_socket_creator.Create(scope);
auto* script_state = scope.GetScriptState();
auto create_promise = tcp_socket_creator.GetSciptPromise();
auto create_promise = tcp_socket_creator.GetScriptPromise();
ScriptPromiseTester create_tester(script_state, create_promise);
ASSERT_FALSE(create_tester.IsRejected());

@ -50,8 +50,6 @@ class MODULES_EXPORT TCPWritableStreamWrapper final
return writable_;
}
ScriptState* GetScriptState() { return script_state_; }
void Reset();
State GetState() const { return state_; }

@ -0,0 +1,11 @@
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// https://github.com/WICG/direct-sockets/blob/main/docs/explainer.md#udp
dictionary UDPMessage {
ArrayBuffer data;
DOMString remoteAddress;
unsigned short remotePort;
};

@ -5,13 +5,18 @@
#include "third_party/blink/renderer/modules/direct_sockets/udp_socket.h"
#include "base/metrics/histogram_functions.h"
#include "base/notreached.h"
#include "net/base/net_errors.h"
#include "third_party/blink/public/mojom/direct_sockets/direct_sockets.mojom-blink.h"
#include "third_party/blink/public/platform/task_type.h"
#include "third_party/blink/renderer/core/dom/dom_exception.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/fileapi/blob.h"
#include "third_party/blink/renderer/modules/direct_sockets/udp_writable_stream_wrapper.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/mojo/heap_mojo_remote.h"
#include "third_party/blink/renderer/platform/scheduler/public/scheduling_policy.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
@ -31,7 +36,9 @@ UDPSocket::UDPSocket(ExecutionContext* execution_context,
feature_handle_for_scheduler_(
execution_context->GetScheduler()->RegisterFeature(
SchedulingPolicy::Feature::kOutstandingNetworkRequestDirectSocket,
{SchedulingPolicy::DisableBackForwardCache()})) {
{SchedulingPolicy::DisableBackForwardCache()})),
udp_socket_(MakeGarbageCollected<UDPSocketMojoRemote>(execution_context)),
socket_listener_receiver_(this, execution_context) {
DCHECK(init_resolver_);
}
@ -40,13 +47,15 @@ UDPSocket::~UDPSocket() = default;
mojo::PendingReceiver<blink::mojom::blink::DirectUDPSocket>
UDPSocket::GetUDPSocketReceiver() {
DCHECK(init_resolver_);
return udp_socket_.BindNewPipeAndPassReceiver();
return udp_socket_->get().BindNewPipeAndPassReceiver(
GetExecutionContext()->GetTaskRunner(TaskType::kNetworking));
}
mojo::PendingRemote<network::mojom::blink::UDPSocketListener>
UDPSocket::GetUDPSocketListener() {
DCHECK(init_resolver_);
auto result = socket_listener_receiver_.BindNewPipeAndPassRemote();
auto result = socket_listener_receiver_.BindNewPipeAndPassRemote(
GetExecutionContext()->GetTaskRunner(TaskType::kNetworking));
socket_listener_receiver_.set_disconnect_handler(WTF::Bind(
&UDPSocket::OnSocketListenerConnectionError, WrapPersistent(this)));
@ -60,6 +69,9 @@ void UDPSocket::Init(int32_t result,
DCHECK(init_resolver_);
if (result == net::Error::OK && peer_addr.has_value()) {
peer_addr_ = peer_addr;
udp_writable_stream_wrapper_ =
MakeGarbageCollected<UDPWritableStreamWrapper>(
init_resolver_->GetScriptState(), udp_socket_);
init_resolver_->Resolve(this);
} else {
if (result != net::Error::OK) {
@ -74,11 +86,16 @@ void UDPSocket::Init(int32_t result,
}
ScriptPromise UDPSocket::close(ScriptState* script_state, ExceptionState&) {
DoClose(/*is_local_close=*/true);
DoClose();
return ScriptPromise::CastUndefined(script_state);
}
WritableStream* UDPSocket::writable() const {
DCHECK(udp_writable_stream_wrapper_);
return udp_writable_stream_wrapper_->Writable();
}
String UDPSocket::remoteAddress() const {
return String::FromUTF8(peer_addr_->ToStringWithoutPort());
}
@ -95,27 +112,37 @@ void UDPSocket::OnReceived(int32_t result,
}
bool UDPSocket::HasPendingActivity() const {
return !!send_resolver_;
if (!udp_writable_stream_wrapper_) {
return false;
}
return udp_writable_stream_wrapper_->HasPendingActivity();
}
void UDPSocket::Trace(Visitor* visitor) const {
visitor->Trace(init_resolver_);
visitor->Trace(send_resolver_);
visitor->Trace(udp_writable_stream_wrapper_);
visitor->Trace(udp_socket_);
visitor->Trace(socket_listener_receiver_);
ScriptWrappable::Trace(visitor);
ActiveScriptWrappable::Trace(visitor);
ExecutionContextClient::Trace(visitor);
}
void UDPSocket::OnSocketListenerConnectionError() {
DoClose(/*is_local_close=*/false);
DoClose();
}
void UDPSocket::DoClose(bool is_local_close) {
void UDPSocket::DoClose() {
init_resolver_ = nullptr;
socket_listener_receiver_.reset();
if (is_local_close && udp_socket_.is_bound())
udp_socket_->Close();
udp_socket_.reset();
// Reject pending write promises.
if (udp_writable_stream_wrapper_) {
udp_writable_stream_wrapper_->Dispose();
}
// Close the socket.
udp_socket_->Close();
feature_handle_for_scheduler_.reset();
}

@ -5,6 +5,7 @@
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_SOCKET_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_SOCKET_H_
#include "base/callback_forward.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "mojo/public/cpp/bindings/receiver.h"
@ -15,10 +16,14 @@
#include "third_party/blink/renderer/bindings/core/v8/active_script_wrappable.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/execution_context/execution_context_lifecycle_observer.h"
#include "third_party/blink/renderer/modules/direct_sockets/udp_socket_mojo_remote.h"
#include "third_party/blink/renderer/modules/direct_sockets/udp_writable_stream_wrapper.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h"
#include "third_party/blink/renderer/platform/heap/member.h"
#include "third_party/blink/renderer/platform/mojo/heap_mojo_receiver.h"
#include "third_party/blink/renderer/platform/mojo/heap_mojo_remote.h"
#include "third_party/blink/renderer/platform/scheduler/public/frame_or_worker_scheduler.h"
namespace net {
@ -56,6 +61,7 @@ class MODULES_EXPORT UDPSocket final
// Web-exposed functions
ScriptPromise close(ScriptState*, ExceptionState&);
WritableStream* writable() const;
String remoteAddress() const;
uint16_t remotePort() const;
@ -64,23 +70,23 @@ class MODULES_EXPORT UDPSocket final
const absl::optional<::net::IPEndPoint>& src_addr,
absl::optional<::base::span<const ::uint8_t>> data) override;
// ScriptWrappable:
// ActiveScriptWrappable overrides.
bool HasPendingActivity() const override;
void Trace(Visitor* visitor) const override;
private:
void OnSocketListenerConnectionError();
void DoClose(bool is_local_close);
void DoClose();
Member<ScriptPromiseResolver> init_resolver_;
FrameOrWorkerScheduler::SchedulingAffectingFeatureHandle
feature_handle_for_scheduler_;
mojo::Remote<blink::mojom::blink::DirectUDPSocket> udp_socket_;
mojo::Receiver<network::mojom::blink::UDPSocketListener>
socket_listener_receiver_{this};
const Member<UDPSocketMojoRemote> udp_socket_;
HeapMojoReceiver<network::mojom::blink::UDPSocketListener, UDPSocket>
socket_listener_receiver_;
Member<ScriptPromiseResolver> send_resolver_;
Member<UDPWritableStreamWrapper> udp_writable_stream_wrapper_;
absl::optional<net::IPEndPoint> local_addr_;
absl::optional<net::IPEndPoint> peer_addr_;
};

@ -10,7 +10,7 @@
RuntimeEnabled=DirectSockets,
DirectSocketEnabled
] interface UDPSocket {
// TODO(crbug.com/1119620): Add support for sending, receiving
// TODO(crbug.com/1119620): Add support for receiving
// TODO(crbug.com/1119620): Add measurement
[RaisesException, CallWith=ScriptState, Measure]
@ -18,4 +18,5 @@
readonly attribute DOMString remoteAddress;
readonly attribute unsigned short remotePort;
readonly attribute WritableStream writable;
};

@ -0,0 +1,28 @@
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/direct_sockets/udp_socket_mojo_remote.h"
#include "third_party/blink/public/mojom/direct_sockets/direct_sockets.mojom-blink.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
namespace blink {
UDPSocketMojoRemote::UDPSocketMojoRemote(ExecutionContext* execution_context)
: udp_socket_{execution_context} {}
UDPSocketMojoRemote::~UDPSocketMojoRemote() = default;
void UDPSocketMojoRemote::Close() {
if (udp_socket_.is_bound()) {
udp_socket_->Close();
}
udp_socket_.reset();
}
void UDPSocketMojoRemote::Trace(Visitor* visitor) const {
visitor->Trace(udp_socket_);
}
} // namespace blink

@ -0,0 +1,36 @@
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_SOCKET_MOJO_REMOTE_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_SOCKET_MOJO_REMOTE_H_
#include "third_party/blink/public/mojom/direct_sockets/direct_sockets.mojom-blink.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h"
#include "third_party/blink/renderer/platform/mojo/heap_mojo_remote.h"
namespace blink {
// A wrapper class of HeapMojoRemote<DirectUDPSocket> so that multiple owners
// share the same HeapmojoRemote.
class MODULES_EXPORT UDPSocketMojoRemote
: public GarbageCollected<UDPSocketMojoRemote> {
public:
explicit UDPSocketMojoRemote(ExecutionContext* execution_context);
~UDPSocketMojoRemote();
HeapMojoRemote<mojom::blink::DirectUDPSocket>& get() { return udp_socket_; }
void Close();
void Trace(Visitor* visitor) const;
private:
HeapMojoRemote<mojom::blink::DirectUDPSocket> udp_socket_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_SOCKET_MOJO_REMOTE_H_

@ -0,0 +1,193 @@
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/direct_sockets/udp_writable_stream_wrapper.h"
#include "net/base/net_errors.h"
#include "third_party/blink/public/mojom/direct_sockets/direct_sockets.mojom-blink.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_typedefs.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_union_arraybuffer_arraybufferview.h"
#include "third_party/blink/renderer/bindings/modules/v8/v8_udp_message.h"
#include "third_party/blink/renderer/core/dom/dom_exception.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/execution_context/execution_context_lifecycle_observer.h"
#include "third_party/blink/renderer/core/streams/underlying_sink_base.h"
#include "third_party/blink/renderer/core/streams/writable_stream.h"
#include "third_party/blink/renderer/core/streams/writable_stream_default_controller.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_array_piece.h"
#include "third_party/blink/renderer/modules/direct_sockets/udp_socket_mojo_remote.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
namespace blink {
// UDPWritableStreamWrapper::UnderlyingSink declaration
class UDPWritableStreamWrapper::UnderlyingSink final
: public UnderlyingSinkBase {
public:
explicit UnderlyingSink(UDPWritableStreamWrapper* udp_writable_stream_wrapper)
: udp_writable_stream_wrapper_(udp_writable_stream_wrapper) {}
ScriptPromise start(ScriptState* script_state,
WritableStreamDefaultController* controller,
ExceptionState&) override;
ScriptPromise write(ScriptState* script_state,
ScriptValue chunk,
WritableStreamDefaultController*,
ExceptionState& exception_state) override;
ScriptPromise close(ScriptState* script_state, ExceptionState&) override;
ScriptPromise abort(ScriptState* script_state,
ScriptValue reason,
ExceptionState& exception_state) override;
void Trace(Visitor* visitor) const override {
visitor->Trace(udp_writable_stream_wrapper_);
UnderlyingSinkBase::Trace(visitor);
}
private:
const Member<UDPWritableStreamWrapper> udp_writable_stream_wrapper_;
};
// UDPWritableStreamWrapper::UnderlyingSink definition
ScriptPromise UDPWritableStreamWrapper::UnderlyingSink::start(
ScriptState* script_state,
WritableStreamDefaultController* controller,
ExceptionState&) {
udp_writable_stream_wrapper_->controller_ = controller;
return ScriptPromise::CastUndefined(script_state);
}
ScriptPromise UDPWritableStreamWrapper::UnderlyingSink::write(
ScriptState* script_state,
ScriptValue chunk,
WritableStreamDefaultController*,
ExceptionState& exception_state) {
return udp_writable_stream_wrapper_->SinkWrite(script_state, chunk,
exception_state);
}
ScriptPromise UDPWritableStreamWrapper::UnderlyingSink::close(
ScriptState* script_state,
ExceptionState&) {
// The specification guarantees that this will only be called after all
// pending writes have been completed.
DCHECK(!udp_writable_stream_wrapper_->send_resolver_);
// It's not possible to close the writable side of the UDP socket, therefore
// no action is taken.
return ScriptPromise::CastUndefined(script_state);
}
ScriptPromise UDPWritableStreamWrapper::UnderlyingSink::abort(
ScriptState* script_state,
ScriptValue reason,
ExceptionState& exception_state) {
// The specification guarantees that this will only be called after all
// pending writes have been completed.
DCHECK(!udp_writable_stream_wrapper_->send_resolver_);
// It's not possible to close the writable side of the UDP socket, therefore
// no action is taken.
return ScriptPromise::CastUndefined(script_state);
}
// UDPWritableStreamWrapper definition
UDPWritableStreamWrapper::UDPWritableStreamWrapper(
ScriptState* script_state,
const Member<UDPSocketMojoRemote> udp_socket)
: ExecutionContextClient(ExecutionContext::From(script_state)),
script_state_(script_state),
udp_socket_(udp_socket) {
ScriptState::Scope scope(script_state);
writable_ = WritableStream::CreateWithCountQueueingStrategy(
script_state_,
MakeGarbageCollected<UDPWritableStreamWrapper::UnderlyingSink>(this), 1);
}
UDPWritableStreamWrapper::~UDPWritableStreamWrapper() = default;
bool UDPWritableStreamWrapper::HasPendingActivity() const {
return !!send_resolver_;
}
void UDPWritableStreamWrapper::Trace(Visitor* visitor) const {
visitor->Trace(script_state_);
visitor->Trace(udp_socket_);
visitor->Trace(send_resolver_);
visitor->Trace(writable_);
visitor->Trace(controller_);
ActiveScriptWrappable::Trace(visitor);
ExecutionContextClient::Trace(visitor);
}
ScriptPromise UDPWritableStreamWrapper::SinkWrite(
ScriptState* script_state,
ScriptValue chunk,
ExceptionState& exception_state) {
// If socket has been closed.
if (!udp_socket_->get().is_bound()) {
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidStateError,
"Socket is disconnected.");
return ScriptPromise();
}
UDPMessage* message = UDPMessage::Create(script_state->GetIsolate(),
chunk.V8Value(), exception_state);
if (!message->hasData()) {
exception_state.ThrowDOMException(DOMExceptionCode::kDataError,
"UDPMessage: missing 'data' field.");
return ScriptPromise();
}
DOMArrayPiece array_piece(message->data());
base::span<const uint8_t> data{array_piece.Bytes(), array_piece.ByteLength()};
DCHECK(!send_resolver_);
send_resolver_ = MakeGarbageCollected<ScriptPromiseResolver>(script_state);
// Why not just return send_resolver_->Promise()?
// In view of the async nature of the write handler, the callback might get
// executed earlier than the function return statement. There are two
// concerns related to that behavior:
// -- send_resolver_ will be set to nullptr and the above call with crash;
// -- send_resover_->Reject() will be called earlier than
// send_resolver_->Promise(), and the resulting promise will be dummy
// (i.e. fulfilled by default).
ScriptPromise promise = send_resolver_->Promise();
udp_socket_->get()->Send(data, WTF::Bind(&UDPWritableStreamWrapper::OnSend,
WrapWeakPersistent(this)));
return promise;
}
void UDPWritableStreamWrapper::OnSend(int32_t result) {
if (send_resolver_) {
if (result == net::Error::OK) {
send_resolver_->Resolve();
} else {
send_resolver_->Reject(MakeGarbageCollected<DOMException>(
DOMExceptionCode::kNetworkError, "Failed to send."));
}
send_resolver_ = nullptr;
}
}
void UDPWritableStreamWrapper::Dispose() {
if (send_resolver_) {
send_resolver_->Reject(MakeGarbageCollected<DOMException>(
DOMExceptionCode::kInvalidStateError, "Failed to send data."));
send_resolver_ = nullptr;
}
}
} // namespace blink

@ -0,0 +1,69 @@
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_WRITABLE_STREAM_WRAPPER_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_WRITABLE_STREAM_WRAPPER_H_
#include "third_party/blink/public/mojom/direct_sockets/direct_sockets.mojom-blink.h"
#include "third_party/blink/renderer/bindings/core/v8/active_script_wrappable.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/dom/events/event_target_impl.h"
#include "third_party/blink/renderer/core/execution_context/execution_context_lifecycle_observer.h"
#include "third_party/blink/renderer/modules/direct_sockets/udp_socket_mojo_remote.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/heap/prefinalizer.h"
#include "third_party/blink/renderer/platform/mojo/heap_mojo_remote.h"
namespace blink {
class ScriptState;
class WritableStream;
class WritableStreamDefaultController;
class MODULES_EXPORT UDPWritableStreamWrapper final
: public GarbageCollected<UDPWritableStreamWrapper>,
public ActiveScriptWrappable<UDPWritableStreamWrapper>,
public ExecutionContextClient {
USING_PRE_FINALIZER(UDPWritableStreamWrapper, Dispose);
public:
UDPWritableStreamWrapper(ScriptState* script_state,
const Member<UDPSocketMojoRemote> udp_socket_);
~UDPWritableStreamWrapper() override;
WritableStream* Writable() const { return writable_; }
// ActiveScriptWrappable overrides.
bool HasPendingActivity() const;
void Trace(Visitor*) const override;
// Called before destruction of the StreamWrapper.
void Dispose();
private:
class UnderlyingSink;
// Implements UnderlyingSink::write().
ScriptPromise SinkWrite(ScriptState* script_state,
ScriptValue chunk,
ExceptionState& exception_state);
// Callback for DirectUDPSocket::Send().
void OnSend(int32_t result);
const Member<ScriptState> script_state_;
const Member<UDPSocketMojoRemote> udp_socket_;
Member<ScriptPromiseResolver> send_resolver_;
Member<WritableStream> writable_;
Member<WritableStreamDefaultController> controller_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_DIRECT_SOCKETS_UDP_WRITABLE_STREAM_WRAPPER_H_

@ -0,0 +1,249 @@
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/direct_sockets/udp_writable_stream_wrapper.h"
#include "base/bind.h"
#include "base/notreached.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "net/base/net_errors.h"
#include "third_party/blink/public/mojom/direct_sockets/direct_sockets.mojom-blink-forward.h"
#include "third_party/blink/public/mojom/direct_sockets/direct_sockets.mojom-blink.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_tester.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_testing.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_dom_exception.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
#include "third_party/blink/renderer/bindings/modules/v8/v8_udp_message.h"
#include "third_party/blink/renderer/core/dom/dom_exception.h"
#include "third_party/blink/renderer/core/streams/writable_stream.h"
#include "third_party/blink/renderer/core/streams/writable_stream_default_writer.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_array_buffer.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/modules/direct_sockets/udp_socket_mojo_remote.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h"
#include "third_party/blink/renderer/platform/testing/unit_test_helpers.h"
#include "third_party/blink/renderer/platform/wtf/allocator/allocator.h"
#include "third_party/googletest/src/googlemock/include/gmock/gmock-matchers.h"
namespace blink {
namespace {
class FakeDirectUDPSocket : public blink::mojom::blink::DirectUDPSocket {
public:
void Send(base::span<const uint8_t> data, SendCallback callback) override {
data_.Append(data.data(), static_cast<uint32_t>(data.size_bytes()));
std::move(callback).Run(net::Error::OK);
}
void ReceiveMore(uint32_t num_additional_datagrams) override { NOTREACHED(); }
void Close() override { NOTREACHED(); }
const Vector<uint8_t>& GetReceivedData() const { return data_; }
private:
Vector<uint8_t> data_;
};
class StreamCreator {
STACK_ALLOCATED();
public:
explicit StreamCreator(const V8TestingScope& scope,
FakeDirectUDPSocket* fake_udp_socket)
: scope_(scope), receiver_{fake_udp_socket} {}
~StreamCreator() { test::RunPendingTasks(); }
UDPWritableStreamWrapper* Create() {
auto* udp_socket =
MakeGarbageCollected<UDPSocketMojoRemote>(scope_.GetExecutionContext());
udp_socket->get().Bind(
receiver_.BindNewPipeAndPassRemote(),
scope_.GetExecutionContext()->GetTaskRunner(TaskType::kNetworking));
auto* script_state = scope_.GetScriptState();
auto* udp_writable_stream_wrapper =
MakeGarbageCollected<UDPWritableStreamWrapper>(script_state,
udp_socket);
return udp_writable_stream_wrapper;
}
private:
const V8TestingScope& scope_;
mojo::Receiver<blink::mojom::blink::DirectUDPSocket> receiver_;
};
TEST(UDPWritableStreamWrapperTest, Create) {
V8TestingScope scope;
FakeDirectUDPSocket fake_udp_socket;
StreamCreator stream_creator{scope, &fake_udp_socket};
auto* udp_writable_stream_wrapper = stream_creator.Create();
EXPECT_TRUE(udp_writable_stream_wrapper->Writable());
}
TEST(UDPWritableStreamWrapperTest, WriteUdpMessage) {
V8TestingScope scope;
FakeDirectUDPSocket fake_udp_socket;
StreamCreator stream_creator{scope, &fake_udp_socket};
auto* udp_writable_stream_wrapper = stream_creator.Create();
auto* script_state = scope.GetScriptState();
auto* writer = udp_writable_stream_wrapper->Writable()->getWriter(
script_state, ASSERT_NO_EXCEPTION);
auto* chunk = DOMArrayBuffer::Create("A", 1);
auto* message = UDPMessage::Create();
message->setData(chunk);
ScriptPromise result =
writer->write(script_state, ScriptValue::From(script_state, message),
ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, result);
tester.WaitUntilSettled();
ASSERT_TRUE(tester.IsFulfilled());
EXPECT_THAT(fake_udp_socket.GetReceivedData(), ::testing::ElementsAre('A'));
}
TEST(UDPWritableStreamWrapperTest, WriteUdpMessageWithEmptyDataField) {
V8TestingScope scope;
FakeDirectUDPSocket fake_udp_socket;
StreamCreator stream_creator{scope, &fake_udp_socket};
auto* udp_writable_stream_wrapper = stream_creator.Create();
auto* script_state = scope.GetScriptState();
auto* writer = udp_writable_stream_wrapper->Writable()->getWriter(
script_state, ASSERT_NO_EXCEPTION);
// Create empty DOMArrayBuffer.
auto* chunk = DOMArrayBuffer::Create(/*num_elements=*/static_cast<size_t>(0),
/*element_byte_size=*/1);
auto* message = UDPMessage::Create();
message->setData(chunk);
ScriptPromise result =
writer->write(script_state, ScriptValue::From(script_state, message),
ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, result);
tester.WaitUntilSettled();
ASSERT_TRUE(tester.IsFulfilled());
// Nothing should have been written from the empty DOMArrayBuffer.
EXPECT_THAT(fake_udp_socket.GetReceivedData(), ::testing::ElementsAre());
}
TEST(UDPWritableStreamWrapperTest, WriteUdpMessageWithoutDataField) {
V8TestingScope scope;
FakeDirectUDPSocket fake_udp_socket;
StreamCreator stream_creator{scope, &fake_udp_socket};
auto* udp_writable_stream_wrapper = stream_creator.Create();
auto* script_state = scope.GetScriptState();
auto* writer = udp_writable_stream_wrapper->Writable()->getWriter(
script_state, ASSERT_NO_EXCEPTION);
// Create empty message (without 'data' field).
auto* message = UDPMessage::Create();
ScriptPromise result =
writer->write(script_state, ScriptValue::From(script_state, message),
ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, result);
tester.WaitUntilSettled();
// Should be rejected due to missing 'data' field.
ASSERT_TRUE(tester.IsRejected());
DOMException* exception = V8DOMException::ToImplWithTypeCheck(
scope.GetIsolate(), tester.Value().V8Value());
ASSERT_TRUE(exception);
ASSERT_EQ(exception->name(), "DataError");
ASSERT_EQ(exception->message(),
"Failed to execute 'write' on 'UnderlyingSinkBase': UDPMessage: "
"missing 'data' field.");
}
TEST(UDPWritableStreamWrapperTest, WriteAfterFinishedWrite) {
V8TestingScope scope;
FakeDirectUDPSocket fake_udp_socket;
StreamCreator stream_creator{scope, &fake_udp_socket};
auto* udp_writable_stream_wrapper = stream_creator.Create();
auto* script_state = scope.GetScriptState();
auto* writer = udp_writable_stream_wrapper->Writable()->getWriter(
script_state, ASSERT_NO_EXCEPTION);
for (const auto* value : {"A", "B"}) {
auto* chunk = DOMArrayBuffer::Create(value, 1);
auto* message = UDPMessage::Create();
message->setData(chunk);
ScriptPromise result =
writer->write(script_state, ScriptValue::From(script_state, message),
ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, result);
tester.WaitUntilSettled();
ASSERT_TRUE(tester.IsFulfilled());
}
EXPECT_THAT(fake_udp_socket.GetReceivedData(),
::testing::ElementsAre('A', 'B'));
}
TEST(UDPWritableStreamWrapperTest, WriteAfterClose) {
V8TestingScope scope;
FakeDirectUDPSocket fake_udp_socket;
StreamCreator stream_creator{scope, &fake_udp_socket};
auto* udp_writable_stream_wrapper = stream_creator.Create();
auto* script_state = scope.GetScriptState();
auto* writer = udp_writable_stream_wrapper->Writable()->getWriter(
script_state, ASSERT_NO_EXCEPTION);
auto* chunk = DOMArrayBuffer::Create("A", 1);
auto* message = UDPMessage::Create();
message->setData(chunk);
ScriptPromise write_result =
writer->write(script_state, ScriptValue::From(script_state, message),
ASSERT_NO_EXCEPTION);
ScriptPromiseTester write_tester(script_state, write_result);
write_tester.WaitUntilSettled();
ASSERT_TRUE(write_tester.IsFulfilled());
ScriptPromise close_result = writer->close(script_state, ASSERT_NO_EXCEPTION);
ScriptPromiseTester close_tester(script_state, close_result);
close_tester.WaitUntilSettled();
ASSERT_TRUE(write_tester.IsFulfilled());
ScriptPromise write_after_close_result =
writer->write(script_state, ScriptValue::From(script_state, message),
ASSERT_NO_EXCEPTION);
ScriptPromiseTester write_after_close_tester(script_state,
write_after_close_result);
write_after_close_tester.WaitUntilSettled();
ASSERT_TRUE(write_after_close_tester.IsRejected());
}
} // namespace
} // namespace blink