0

Introducing mojo::DataPipeProducedHandle::WriteAllData method.

The new method offers a more ergonomic alternative to using
`MOJO_WRITE_DATA_FLAG_ALL_OR_NONE`.  For more details and discussion
please see
https://docs.google.com/document/d/1c4NKpXwpQ9MKK1SbJ4C6MvhXI8-KJZ4jq7N4VHTHJoI/edit#heading=h.ti2k549mrv1h

Bug: 40284755
Change-Id: Iecb50c0b8aab0e8f72bd2eae814eb2b1a4e54e6c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5646444
Commit-Queue: Łukasz Anforowicz <lukasza@chromium.org>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Owners-Override: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1318748}
This commit is contained in:
Lukasz Anforowicz
2024-06-24 19:57:19 +00:00
committed by Chromium LUCI CQ
parent 695286bad6
commit 0339afd627
27 changed files with 68 additions and 146 deletions

@ -184,15 +184,11 @@ void PluginResponseInterceptorURLLoaderThrottle::WillProcessResponse(
mojo::CreateDataPipe(payload.size(), producer_handle, consumer_handle),
MOJO_RESULT_OK);
size_t actually_written_bytes = 0;
CHECK_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(payload),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
producer_handle->WriteAllData(base::as_byte_span(payload)));
network::URLLoaderCompletionStatus status(net::OK);
status.decoded_body_length =
base::checked_cast<int64_t>(actually_written_bytes);
status.decoded_body_length = base::checked_cast<int64_t>(payload.size());
new_client->OnComplete(status);
mojo::PendingRemote<network::mojom::URLLoader> original_loader;

@ -60,11 +60,7 @@ bool TlsClientConnection::Send(openscreen::ByteView data) {
return false;
}
base::span<const uint8_t> span(data.data(), data.size());
size_t actually_written_bytes = 0;
const MojoResult result = send_stream_->WriteData(
span, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE, actually_written_bytes);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE` flag.
const MojoResult result = send_stream_->WriteAllData(data);
mojo::HandleSignalsState state = send_stream_->QuerySignalsState();
return ProcessMojoResult(result, state.peer_closed()
? Error::Code::kSocketClosedFailure

@ -77,11 +77,8 @@ class FakeSocketStreams {
// Writes data into the inbound data pipe, which should ultimately result in a
// TlsClientConnection::Client's OnRead() method being called.
void SimulateSocketReceive(base::span<const uint8_t> data) {
size_t actually_written_bytes = 0;
const MojoResult result = inbound_stream_->WriteData(
data, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE, actually_written_bytes);
const MojoResult result = inbound_stream_->WriteAllData(data);
ASSERT_EQ(result, MOJO_RESULT_OK);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE` flag.
}
// Closes the inbound (or outbound) data pipe to allow the unit tests to check

@ -436,12 +436,8 @@ TEST_P(InterestGroupPermissionsCheckerParamaterizedTest,
mojo::ScopedDataPipeConsumerHandle body;
ASSERT_EQ(mojo::CreateDataPipe(response_body.size(), producer_handle, body),
MOJO_RESULT_OK);
size_t actually_written_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(response_body),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
// `actually_written_bytes` can be ignored because of `...ALL_OR_NONE`.
producer_handle->WriteAllData(base::as_byte_span(response_body)));
pending_request.client->OnReceiveResponse(std::move(head), std::move(body),
std::nullopt);

@ -319,12 +319,8 @@ class RequestInterceptor {
original_client_->OnReceiveResponse(
std::move(response_head), std::move(consumer_handle), std::nullopt);
size_t actually_written_bytes = 0;
EXPECT_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(response_body),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
// `actually_written_bytes` can be ignored because of `...ALL_OR_NONE`.
EXPECT_EQ(MOJO_RESULT_OK, producer_handle->WriteAllData(
base::as_byte_span(response_body)));
}
original_client_->OnComplete(status);

@ -1040,10 +1040,8 @@ class PrefetchServiceTest : public RenderViewHostTestHarness {
ASSERT_TRUE(producer_handle_for_gurl_.count(request->request.url));
ASSERT_TRUE(producer_handle_for_gurl_[request->request.url]);
size_t actually_written_bytes = 0;
EXPECT_EQ(producer_handle_for_gurl_[request->request.url]->WriteData(
base::as_byte_span(body), MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes),
EXPECT_EQ(producer_handle_for_gurl_[request->request.url]->WriteAllData(
base::as_byte_span(body)),
MOJO_RESULT_OK);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE`.
task_environment()->RunUntilIdle();

@ -87,11 +87,8 @@ class TestURLLoaderFactory : public network::mojom::URLLoaderFactory {
void SimulateReceiveData(const std::string& data,
bool expected_successful = true) {
ASSERT_TRUE(producer_handle_);
size_t actually_written_bytes = 0;
MojoResult write_result = producer_handle_->WriteData(
base::as_byte_span(data), MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE`.
MojoResult write_result =
producer_handle_->WriteAllData(base::as_byte_span(data));
if (expected_successful) {
EXPECT_EQ(write_result, MOJO_RESULT_OK);

@ -1156,11 +1156,8 @@ TEST_P(PrefetchURLLoaderInterceptorBecomeNotServableTest, DISABLE_ASAN(Basic)) {
CHECK_EQ(
mojo::CreateDataPipe(content.size(), producer_handle, consumer_handle),
MOJO_RESULT_OK);
size_t actually_written_bytes = 0;
CHECK_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(content),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
producer_handle->WriteAllData(base::as_byte_span(content)));
pending_request.client->OnReceiveResponse(
network::mojom::URLResponseHead::New(), std::move(consumer_handle),
std::nullopt);
@ -1532,11 +1529,8 @@ TEST_P(PrefetchURLLoaderInterceptorTest,
CHECK_EQ(
mojo::CreateDataPipe(content.size(), producer_handle, consumer_handle),
MOJO_RESULT_OK);
size_t actually_written_bytes = 0;
CHECK_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(content),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
producer_handle->WriteAllData(base::as_byte_span(content)));
pending_request.client->OnReceiveResponse(
network::mojom::URLResponseHead::New(), std::move(consumer_handle),
std::nullopt);
@ -1607,11 +1601,8 @@ TEST_P(PrefetchURLLoaderInterceptorTest,
CHECK_EQ(
mojo::CreateDataPipe(content.size(), producer_handle, consumer_handle),
MOJO_RESULT_OK);
size_t actually_written_bytes = 0;
CHECK_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(content),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
producer_handle->WriteAllData(base::as_byte_span(content)));
pending_request.client->OnReceiveResponse(
network::mojom::URLResponseHead::New(), std::move(consumer_handle),
std::nullopt);

@ -124,10 +124,8 @@ class MockNetwork {
mojo::ScopedDataPipeConsumerHandle consumer;
mojo::ScopedDataPipeProducerHandle producer;
CHECK_EQ(MOJO_RESULT_OK, mojo::CreateDataPipe(nullptr, producer, consumer));
size_t actually_written_bytes = 0;
MojoResult result = producer->WriteData(base::as_byte_span(response.body),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
MojoResult result =
producer->WriteAllData(base::as_byte_span(response.body));
CHECK_EQ(MOJO_RESULT_OK, result);
client->OnReceiveResponse(std::move(response_head), std::move(consumer),
std::nullopt);

@ -806,11 +806,8 @@ TEST_F(ServiceWorkerSingleScriptUpdateCheckerTest,
mojo::ScopedDataPipeProducerHandle producer;
EXPECT_EQ(MOJO_RESULT_OK,
mojo::CreateDataPipe(&options, producer, consumer));
size_t actually_written_bytes = 0;
EXPECT_EQ(MOJO_RESULT_OK,
producer->WriteData(base::as_byte_span(body_from_net),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
producer->WriteAllData(base::as_byte_span(body_from_net)));
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE`.
request->client->OnReceiveResponse(std::move(head), std::move(consumer),
std::nullopt);

@ -878,12 +878,9 @@ void ServiceWorkerUpdateCheckTestUtils::
// Create a data pipe which has the new block sent from the network.
ASSERT_EQ(MOJO_RESULT_OK, mojo::CreateDataPipe(nullptr, *out_body_handle,
network_consumer));
size_t written_size = 0;
ASSERT_EQ(MOJO_RESULT_OK,
(*out_body_handle)
->WriteData(base::as_byte_span(diff_data_block),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE, written_size));
ASSERT_EQ(diff_data_block.size(), written_size);
ASSERT_EQ(
MOJO_RESULT_OK,
(*out_body_handle)->WriteAllData(base::as_byte_span(diff_data_block)));
base::RunLoop().RunUntilIdle();
// Read the data to make a pending buffer.

@ -36,15 +36,12 @@ bool AbortOnEndInterceptor(URLLoaderInterceptor::RequestParams* params) {
response->headers->GetMimeType(&response->mime_type);
std::string body = "some data\r\n";
size_t actually_written_bytes = 0;
mojo::ScopedDataPipeProducerHandle producer_handle;
mojo::ScopedDataPipeConsumerHandle consumer_handle;
CHECK_EQ(mojo::CreateDataPipe(body.size(), producer_handle, consumer_handle),
MOJO_RESULT_OK);
CHECK_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(body),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
producer_handle->WriteAllData(base::as_byte_span(body)));
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE`.
params->client->OnReceiveResponse(std::move(response),
std::move(consumer_handle), std::nullopt);

@ -105,20 +105,19 @@ void RaceNetworkRequestWriteBufferManager::ArmOrNotify() {
}
std::tuple<MojoResult, size_t> RaceNetworkRequestWriteBufferManager::WriteData(
base::span<const char> read_buffer) {
// In order to use |MOJO_WRITE_DATA_FLAG_ALL_OR_NONE| flag to write data, the
// read buffer data size should be smaller than the write buffer size.
// Otherwise we can't finish the write operation and `WriteData()` always
// returns |MOJO_RESULT_OUT_OF_RANGE|.
read_buffer = read_buffer.first(
std::min(read_buffer.size(), size_t{data_pipe_buffer_size_}));
size_t actually_written_bytes = 0;
MojoResult result = producer_->WriteData(base::as_bytes(read_buffer),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
num_bytes_written_ += actually_written_bytes;
base::span<const char> buffer) {
// In order to use `WriteAllData` method to write data, the read buffer data
// size should be smaller than the write buffer size. Otherwise we can't
// finish the write operation and `WriteData()` always returns
// |MOJO_RESULT_OUT_OF_RANGE|.
buffer =
buffer.first(std::min(buffer.size(), size_t{data_pipe_buffer_size_}));
MojoResult result = producer_->WriteAllData(base::as_bytes(buffer));
if (result == MOJO_RESULT_OK) {
num_bytes_written_ += buffer.size();
}
return {result, actually_written_bytes};
return {result, buffer.size()};
}
size_t RaceNetworkRequestWriteBufferManager::CopyAndCompleteWriteData(

@ -35,7 +35,7 @@ class CONTENT_EXPORT RaceNetworkRequestWriteBufferManager {
void CancelWatching();
MojoResult BeginWriteData();
MojoResult EndWriteData(uint32_t num_bytes_written);
std::tuple<MojoResult, size_t> WriteData(base::span<const char> read_buffer);
std::tuple<MojoResult, size_t> WriteData(base::span<const char> buffer);
void ArmOrNotify();
size_t buffer_size() const { return buffer_.size(); }
size_t num_bytes_written() const { return num_bytes_written_; }

@ -498,10 +498,7 @@ void URLLoaderInterceptor::WriteResponse(
CreateDataPipe(&options, producer_handle, consumer_handle);
CHECK_EQ(result, MOJO_RESULT_OK);
size_t actually_written_bytes = 0;
result = producer_handle->WriteData(base::as_byte_span(body),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
result = producer_handle->WriteAllData(base::as_byte_span(body));
CHECK_EQ(result, MOJO_RESULT_OK);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE` flag.

@ -106,15 +106,11 @@ bool FakeNetwork::HandleRequest(URLLoaderInterceptor::RequestParams* params) {
network::PopulateParsedHeaders(info.headers.get(), url_request.url);
mojo::Remote<network::mojom::URLLoaderClient>& client = params->client;
size_t actually_written_bytes = 0;
mojo::ScopedDataPipeProducerHandle producer_handle;
mojo::ScopedDataPipeConsumerHandle consumer_handle;
CHECK_EQ(MOJO_RESULT_OK,
mojo::CreateDataPipe(nullptr, producer_handle, consumer_handle));
producer_handle->WriteData(base::as_byte_span(response_info.body),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE`.
producer_handle->WriteAllData(base::as_byte_span(response_info.body));
client->OnReceiveResponse(std::move(response), std::move(consumer_handle),
std::nullopt);

@ -48,13 +48,7 @@ bool WebSocketAdapter::Write(base::span<const uint8_t> data) {
}
socket_remote_->SendMessage(network::mojom::WebSocketMessageType::BINARY,
data.size());
size_t actually_written_bytes = 0;
MojoResult result = write_pipe_->WriteData(
data, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE, actually_written_bytes);
// This follows from the `...ALL_OR_NONE` flag.
DCHECK(result != MOJO_RESULT_OK || data.size() == actually_written_bytes);
MojoResult result = write_pipe_->WriteAllData(data);
return result == MOJO_RESULT_OK;
}

@ -149,10 +149,7 @@ void EnclaveWebSocketClient::InternalWrite(base::span<const uint8_t> data) {
websocket_->SendMessage(network::mojom::WebSocketMessageType::BINARY,
data.size());
size_t actually_written_bytes = 0;
MojoResult result = writable_->WriteData(
data, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE, actually_written_bytes);
CHECK(result != MOJO_RESULT_OK || data.size() == actually_written_bytes);
MojoResult result = writable_->WriteAllData(data);
if (result != MOJO_RESULT_OK) {
FIDO_LOG(ERROR) << "Failed to write to WebSocket.";
ClosePipe(SocketStatus::kError);

@ -40,6 +40,10 @@ class DataPipeProducerHandle : public Handle {
// how many bytes (from the beginning of `data`) were actually written into
// the mojo data pipe (depending on the size of the internal pipe buffer, this
// may be less than `data.size()`).
//
// Note that instead of passing specific `flags`, a more direct method can be
// used instead:
// - `MOJO_WRITE_DATA_FLAG_ALL_OR_NONE` => `WriteAllData`
MojoResult WriteData(base::span<const uint8_t> data,
MojoWriteDataFlags flags,
size_t& bytes_written) const {
@ -64,6 +68,15 @@ class DataPipeProducerHandle : public Handle {
return result;
}
MojoResult WriteAllData(base::span<const uint8_t> data) const {
// Ok to ignore `bytes_written` because `MOJO_WRITE_DATA_FLAG_ALL_OR_NONE`
// means that `MOJO_RESULT_OK` will be returned only if exactly
// `data.size()` bytes have been written (i.e. if `data.size() ==
// bytes_written`).
size_t bytes_written = 0;
return WriteData(data, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE, bytes_written);
}
// TODO(https://crbug.com/40284755): Remove this deprecated, non-`span`-ified
// overload (after double-checking that no callers remain).
MojoResult WriteData(const void* elements,
@ -183,6 +196,10 @@ class DataPipeConsumerHandle : public Handle {
// many bytes (at the beginning of `buffer`) were actually read (depending on
// the size of the internal pipe buffer, this may be less than
// `buffer.size()`).
//
// Note that instead of passing specific `flags`, a more direct method can be
// used instead:
// - `MOJO_READ_DATA_FLAG_DISCARD` => `DiscardData`
MojoResult ReadData(MojoReadDataFlags flags,
base::span<uint8_t> buffer,
size_t& bytes_read) const {

@ -630,10 +630,8 @@ TEST_F(TCPSocketTest, SocketClosed) {
// Send pipe should be closed.
while (true) {
base::RunLoop().RunUntilIdle();
size_t actually_written_bytes = 0;
MojoResult r = client_socket_send_handle->WriteData(
base::as_byte_span(kTestMsg), MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
MojoResult r =
client_socket_send_handle->WriteAllData(base::as_byte_span(kTestMsg));
if (r == MOJO_RESULT_SHOULD_WAIT)
continue;
if (r == MOJO_RESULT_FAILED_PRECONDITION)

@ -342,12 +342,8 @@ void TestURLLoaderFactory::SimulateResponse(
mojo::ScopedDataPipeProducerHandle producer_handle;
CHECK_EQ(mojo::CreateDataPipe(content.size(), producer_handle, body),
MOJO_RESULT_OK);
size_t actually_written_bytes = 0;
CHECK_EQ(MOJO_RESULT_OK,
producer_handle->WriteData(base::as_byte_span(content),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes));
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE` flag.
producer_handle->WriteAllData(base::as_byte_span(content)));
}
if ((response_flags & kSendHeadersOnNetworkError) ||

@ -31,15 +31,12 @@ void FakeBlobDataHandle::Read(mojo::ScopedDataPipeProducerHandle producer,
base::span<const uint8_t> bytes = base::as_byte_span(body_data_);
bytes = bytes.subspan(src_offset);
bytes = bytes.first(base::checked_cast<size_t>(bytes_to_read));
size_t actually_written_bytes = 0;
MojoResult result = producer->WriteData(
bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE, actually_written_bytes);
MojoResult result = producer->WriteAllData(bytes);
// This should all succeed.
DCHECK_EQ(MOJO_RESULT_OK, result);
DCHECK_EQ(bytes.size(), actually_written_bytes);
std::move(callback).Run(base::checked_cast<int>(actually_written_bytes));
std::move(callback).Run(base::checked_cast<int>(bytes.size()));
}
uint64_t FakeBlobDataHandle::GetSideDataSize() const {

@ -155,12 +155,8 @@ class NoopLoaderFactory final : public ResourceFetcher::LoaderFactory {
void AppendDataToDataPipe(std::string_view data,
mojo::ScopedDataPipeProducerHandle& producer_handle) {
size_t actually_written_bytes = 0;
MojoResult result = producer_handle->WriteData(
base::as_byte_span(data), MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
MojoResult result = producer_handle->WriteAllData(base::as_byte_span(data));
EXPECT_EQ(result, MOJO_RESULT_OK);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE` flag.
// In case the mojo datapipe is being read on the main thread, we need to
// spin the event loop to allow the watcher to post its "data received"
@ -1770,9 +1766,8 @@ TEST_F(BackgroundResourceScriptStreamerTest,
const std::string function_line = base::StrCat(
{kFunctionScript,
std::string(kDataPipeSize - kFunctionScript.size(), '/')});
size_t data_len = function_line.size();
MojoResult result = producer_handle_->WriteData(
function_line.data(), &data_len, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
MojoResult result =
producer_handle_->WriteAllData(base::as_byte_span(function_line));
EXPECT_EQ(result, MOJO_RESULT_OK);
// Busyloop until the parser thread reads the `function_line` form the data
// pipe.

@ -64,12 +64,7 @@ class StreamCreator : public GarbageCollected<StreamCreator> {
void ResetPipe() { data_pipe_producer_.reset(); }
void WriteToPipe(Vector<uint8_t> data) {
size_t actually_written_bytes = 0;
EXPECT_EQ(
data_pipe_producer_->WriteData(data, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes),
MOJO_RESULT_OK);
EXPECT_EQ(actually_written_bytes, data.size());
EXPECT_EQ(data_pipe_producer_->WriteAllData(data), MOJO_RESULT_OK);
}
// Copies the contents of a v8::Value containing a Uint8Array to a Vector.

@ -305,12 +305,8 @@ void TestRead(V8TestingScope& scope,
BidirectionalStream* bidirectional_stream) {
mojo::ScopedDataPipeProducerHandle& input_producer =
scoped_web_transport->Stub()->InputProducer();
constexpr std::string_view input = "B";
size_t actually_written_bytes = 0;
MojoResult mojo_result = input_producer->WriteData(
base::as_byte_span(input), MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes);
// Ok to ignore `actually_written_bytes` because of `...ALL_OR_NONE` flag.
MojoResult mojo_result =
input_producer->WriteAllData(base::as_byte_span(std::string_view("B")));
ASSERT_EQ(mojo_result, MOJO_RESULT_OK);

@ -63,12 +63,7 @@ class IncomingStreamTest : public ::testing::Test {
}
void WriteToPipe(Vector<uint8_t> data) {
size_t actually_written_bytes = 0;
EXPECT_EQ(
data_pipe_producer_->WriteData(data, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes),
MOJO_RESULT_OK);
EXPECT_EQ(actually_written_bytes, data.size());
EXPECT_EQ(data_pipe_producer_->WriteAllData(data), MOJO_RESULT_OK);
}
void ClosePipe() { data_pipe_producer_.reset(); }

@ -1764,13 +1764,7 @@ TEST_F(WebTransportTest, CreateReceiveStream) {
ReceiveStream* receive_stream = ReadReceiveStream(scope, web_transport);
const std::string_view data = "what";
size_t actually_written_bytes = 0;
EXPECT_EQ(producer->WriteData(base::as_byte_span(data),
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE,
actually_written_bytes),
MOJO_RESULT_OK);
EXPECT_EQ(actually_written_bytes, 4u);
EXPECT_EQ(producer->WriteAllData(base::as_byte_span(data)), MOJO_RESULT_OK);
producer.reset();
web_transport->OnIncomingStreamClosed(/*stream_id=*/0, true);