0

[Chromecast]Add MixerServiceConnection

MixerServiceConnection is a component that connects
to MixerService via Socket.

Bug: internal b/29571387
Test: build and run on an audio device and verify the
assistant works fine.


Change-Id: Iae323943909167fe247666ffd2c319efcf34c3ac
Reviewed-on: https://chromium-review.googlesource.com/1231813
Reviewed-by: Yuchen Liu <yucliu@chromium.org>
Reviewed-by: Sergey Volk <servolk@chromium.org>
Reviewed-by: Misha Efimov <mef@chromium.org>
Reviewed-by: Kenneth MacKay <kmackay@chromium.org>
Commit-Queue: Guohui Deng <guohuideng@chromium.org>
Cr-Commit-Position: refs/heads/master@{#593229}
This commit is contained in:
Guohui Deng
2018-09-21 16:53:14 +00:00
committed by Commit Bot
parent 027e4add26
commit 1c484f2353
13 changed files with 1055 additions and 0 deletions

@ -162,6 +162,11 @@ const char kBackGestureHorizontalThreshold[] =
// Whether to enable detection and dispatch of a 'drag from the top' gesture.
const char kEnableTopDragGesture[] = "enable-top-drag-gesture";
// Endpoint that the mixer service listens on. On Linux/Android, this is a
// path for a UNIX domain socket (default is /tmp/mixer-service). On other
// platforms, this is a TCP port to listen on (on localhost) (default 12854).
const char kMixerServiceEndpoint[] = "mixer-service-endpoint";
extern const char kCastMemoryPressureCriticalFraction[] =
"memory-pressure-critical-fraction";
extern const char kCastMemoryPressureModerateFraction[] =

@ -81,6 +81,7 @@ extern const char kEnableTopDragGesture[];
// Background color used when Chromium hasn't rendered anything yet.
extern const char kCastAppBackgroundColor[];
extern const char kMixerServiceEndpoint[];
extern const char kCastMemoryPressureCriticalFraction[];
extern const char kCastMemoryPressureModerateFraction[];

@ -0,0 +1,54 @@
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import("//build/buildflag_header.gni")
import("//chromecast/chromecast.gni")
import("//third_party/protobuf/proto_library.gni")
proto_library("proto") {
proto_out_dir = "chromecast/media/audio/mixer_service"
sources = [
"mixer_service.proto",
]
}
use_unix_sockets = is_linux
buildflag_header("buildflags") {
header = "mixer_service_buildflags.h"
flags = [ "USE_UNIX_SOCKETS=$use_unix_sockets" ]
}
cast_source_set("common") {
sources = [
"constants.h",
"proto_helpers.cc",
"proto_helpers.h",
]
deps = [
":buildflags",
":proto",
"//base",
"//chromecast/net:small_message_socket",
"//net",
]
}
cast_source_set("connection") {
sources = [
"mixer_service_connection.cc",
"mixer_service_connection.h",
]
deps = [
":buildflags",
":common",
":proto",
"//base",
"//chromecast/base",
"//chromecast/net:small_message_socket",
"//net",
]
}

@ -0,0 +1,4 @@
include_rules = [
"+chromecast/net",
"+net",
]

@ -0,0 +1,31 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_CONSTANTS_H_
#define CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_CONSTANTS_H_
#include <stdint.h>
#include "chromecast/media/audio/mixer_service/mixer_service_buildflags.h"
namespace chromecast {
namespace media {
namespace mixer_service {
#if BUILDFLAG(USE_UNIX_SOCKETS)
constexpr char kDefaultUnixDomainSocketPath[] = "/tmp/mixer-service";
#else
constexpr int kDefaultTcpPort = 12854;
#endif
enum class MessageType : int16_t {
kMetadata,
kAudio,
};
} // namespace mixer_service
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_CONSTANTS_H_

@ -0,0 +1,67 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
syntax = "proto2";
option optimize_for = LITE_RUNTIME;
package chromecast.media.mixer_service;
message MixerStreamParams {
enum StreamType {
STREAM_TYPE_DEFAULT = 0;
STREAM_TYPE_SFX = 1;
}
optional StreamType stream_type = 1;
enum ContentType {
CONTENT_TYPE_MEDIA = 0;
CONTENT_TYPE_ALARM = 1;
CONTENT_TYPE_COMMUNICATION = 2;
CONTENT_TYPE_OTHER = 3;
}
optional ContentType content_type = 2;
enum SampleFormat {
// Interleaved formats:
SAMPLE_FORMAT_INT16_I = 0;
SAMPLE_FORMAT_INT32_I = 1;
SAMPLE_FORMAT_FLOAT_I = 2;
// Planar formats:
SAMPLE_FORMAT_INT16_P = 3;
SAMPLE_FORMAT_INT32_P = 4;
SAMPLE_FORMAT_FLOAT_P = 5;
}
optional SampleFormat sample_format = 3;
optional string device_id = 4;
optional int32 sample_rate = 5;
optional int32 num_channels = 6;
optional sint32 channel_selection = 7 [default = -1];
optional int32 fill_size_frames = 8;
optional int32 start_threshold_frames = 9;
optional int32 max_buffered_frames = 10;
optional bool use_fader = 11;
optional int32 fade_frames = 12;
}
message BufferPushResult {
optional fixed64 next_playback_timestamp = 1;
}
message EosPlayedOut {}
message Volume {
optional float volume = 1;
}
message Generic {
optional MixerStreamParams params = 1;
optional BufferPushResult push_result = 2;
optional EosPlayedOut eos_played_out = 3;
optional Volume set_volume = 4;
}

@ -0,0 +1,338 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/audio/mixer_service/mixer_service_connection.h"
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/command_line.h"
#include "base/logging.h"
#include "base/sequenced_task_runner.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "chromecast/base/chromecast_switches.h"
#include "chromecast/media/audio/mixer_service/constants.h"
#include "chromecast/media/audio/mixer_service/mixer_service.pb.h"
#include "chromecast/media/audio/mixer_service/mixer_service_buildflags.h"
#include "chromecast/media/audio/mixer_service/proto_helpers.h"
#include "chromecast/net/small_message_socket.h"
#include "net/base/address_list.h"
#include "net/base/io_buffer.h"
#include "net/base/ip_address.h"
#include "net/base/ip_endpoint.h"
#include "net/base/net_errors.h"
#include "net/log/net_log_source.h"
#include "net/socket/stream_socket.h"
#if BUILDFLAG(USE_UNIX_SOCKETS)
#include "net/socket/unix_domain_client_socket_posix.h"
#else
#include "net/socket/tcp_client_socket.h"
#endif
namespace chromecast {
namespace media {
namespace {
// Header is 2 bytes size, 2 bytes type.
const int kHeaderSize = 4;
constexpr base::TimeDelta kConnectTimeout = base::TimeDelta::FromSeconds(1);
int GetSampleSize(mixer_service::MixerStreamParams::SampleFormat format) {
switch (format) {
case mixer_service::MixerStreamParams::SAMPLE_FORMAT_INT16_I:
return sizeof(int16_t);
case mixer_service::MixerStreamParams::SAMPLE_FORMAT_INT32_I:
return sizeof(int32_t);
case mixer_service::MixerStreamParams::SAMPLE_FORMAT_FLOAT_I:
return sizeof(float);
case mixer_service::MixerStreamParams::SAMPLE_FORMAT_INT16_P:
return sizeof(int16_t);
case mixer_service::MixerStreamParams::SAMPLE_FORMAT_INT32_P:
return sizeof(int32_t);
case mixer_service::MixerStreamParams::SAMPLE_FORMAT_FLOAT_P:
return sizeof(float);
default:
NOTREACHED() << "Unknown sample format " << format;
return 0;
}
}
int GetFrameSize(const mixer_service::MixerStreamParams& params) {
return GetSampleSize(params.sample_format()) * params.num_channels();
}
int GetFillSizeFrames(const mixer_service::MixerStreamParams& params) {
if (params.has_fill_size_frames()) {
return params.fill_size_frames();
}
// Use 10 ms by default.
return params.sample_rate() / 100;
}
} // namespace
class MixerServiceConnection::Socket : public SmallMessageSocket {
public:
Socket(std::unique_ptr<net::StreamSocket> socket,
MixerServiceConnection::Delegate* delegate,
const mixer_service::MixerStreamParams& params);
~Socket() override;
void Start(float volume_multiplier);
void SendNextBuffer(int filled_frames);
void SetVolumeMultiplier(float multiplier);
private:
// SmallMessageSocket implementation:
void OnSendUnblocked() override;
void OnError(int error) override;
void OnEndOfStream() override;
bool OnMessage(char* data, int size) override;
void CreateNextBuffer();
void SendProto(const google::protobuf::MessageLite& message);
bool HandleMetadata(char* data, int size);
MixerServiceConnection::Delegate* const delegate_;
const mixer_service::MixerStreamParams params_;
const int frame_size_;
const int fill_size_frames_;
scoped_refptr<net::IOBufferWithSize> next_buffer_;
std::queue<scoped_refptr<net::IOBufferWithSize>> write_queue_;
DISALLOW_COPY_AND_ASSIGN(Socket);
};
MixerServiceConnection::Socket::Socket(
std::unique_ptr<net::StreamSocket> socket,
MixerServiceConnection::Delegate* delegate,
const mixer_service::MixerStreamParams& params)
: SmallMessageSocket(std::move(socket)),
delegate_(delegate),
params_(params),
frame_size_(GetFrameSize(params)),
fill_size_frames_(GetFillSizeFrames(params)) {
DCHECK(delegate_);
DCHECK_LE(fill_size_frames_ * frame_size_,
std::numeric_limits<uint16_t>::max() - kHeaderSize);
}
MixerServiceConnection::Socket::~Socket() = default;
void MixerServiceConnection::Socket::Start(float volume_multiplier) {
ReceiveMessages();
mixer_service::Generic message;
*(message.mutable_params()) = params_;
message.mutable_set_volume()->set_volume(volume_multiplier);
SendProto(message);
CreateNextBuffer();
delegate_->FillNextBuffer(next_buffer_->data() + kHeaderSize,
fill_size_frames_,
std::numeric_limits<int64_t>::min());
}
void MixerServiceConnection::Socket::CreateNextBuffer() {
DCHECK(!next_buffer_);
next_buffer_ = base::MakeRefCounted<net::IOBufferWithSize>(
kHeaderSize + fill_size_frames_ * frame_size_);
}
void MixerServiceConnection::Socket::SendNextBuffer(int filled_frames) {
int payload_size = sizeof(int16_t) + filled_frames * frame_size_;
uint16_t size = static_cast<uint16_t>(payload_size);
int16_t type = static_cast<int16_t>(mixer_service::MessageType::kAudio);
char* ptr = next_buffer_->data();
base::WriteBigEndian(ptr, size);
ptr += sizeof(size);
base::WriteBigEndian(ptr, type);
if (SmallMessageSocket::SendBuffer(next_buffer_.get(),
sizeof(uint16_t) + payload_size)) {
next_buffer_ = nullptr;
} else {
write_queue_.push(std::move(next_buffer_));
}
}
void MixerServiceConnection::Socket::SetVolumeMultiplier(float multiplier) {
mixer_service::Generic message;
message.mutable_set_volume()->set_volume(multiplier);
SendProto(message);
}
void MixerServiceConnection::Socket::SendProto(
const google::protobuf::MessageLite& message) {
auto storage = mixer_service::SendProto(message, this);
if (storage) {
write_queue_.push(std::move(storage));
}
}
void MixerServiceConnection::Socket::OnSendUnblocked() {
while (!write_queue_.empty()) {
if (!SmallMessageSocket::SendBuffer(write_queue_.front().get(),
write_queue_.front()->size())) {
return;
}
write_queue_.pop();
}
}
void MixerServiceConnection::Socket::OnError(int error) {
delegate_->OnConnectionError();
}
void MixerServiceConnection::Socket::OnEndOfStream() {
delegate_->OnConnectionError();
}
bool MixerServiceConnection::Socket::OnMessage(char* data, int size) {
int16_t type;
if (size < static_cast<int>(sizeof(type))) {
LOG(ERROR) << "Invalid message size " << size << " from " << this;
delegate_->OnConnectionError();
return false;
}
base::ReadBigEndian(data, &type);
data += sizeof(type);
size -= sizeof(type);
switch (static_cast<mixer_service::MessageType>(type)) {
case mixer_service::MessageType::kMetadata:
return HandleMetadata(data, size);
default:
// Ignore unhandled message types.
break;
}
return true;
}
bool MixerServiceConnection::Socket::HandleMetadata(char* data, int size) {
mixer_service::Generic message;
if (!mixer_service::ReceiveProto(data, size, &message)) {
LOG(ERROR) << "Invalid metadata from " << this;
delegate_->OnConnectionError();
return false;
}
if (message.has_push_result()) {
CreateNextBuffer();
delegate_->FillNextBuffer(next_buffer_->data() + kHeaderSize,
fill_size_frames_,
message.push_result().next_playback_timestamp());
} else if (message.has_eos_played_out()) {
delegate_->OnEosPlayed();
}
return true;
}
MixerServiceConnection::MixerServiceConnection(
Delegate* delegate,
const mixer_service::MixerStreamParams& params)
: delegate_(delegate),
params_(params),
task_runner_(base::SequencedTaskRunnerHandle::Get()),
weak_factory_(this) {
DCHECK(delegate_);
DCHECK_GT(params_.sample_rate(), 0);
DCHECK_GT(params_.num_channels(), 0);
}
MixerServiceConnection::~MixerServiceConnection() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
}
void MixerServiceConnection::Connect() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
DCHECK(!connecting_socket_);
DCHECK(!socket_);
base::WeakPtr<MixerServiceConnection> self = weak_factory_.GetWeakPtr();
#if BUILDFLAG(USE_UNIX_SOCKETS)
const base::CommandLine* command_line =
base::CommandLine::ForCurrentProcess();
std::string path =
command_line->GetSwitchValueASCII(switches::kMixerServiceEndpoint);
if (path.empty()) {
path = mixer_service::kDefaultUnixDomainSocketPath;
}
connecting_socket_ = std::make_unique<net::UnixDomainClientSocket>(
path, true /* use_abstract_namespace */);
#else // BUILDFLAG(USE_UNIX_SOCKETS)
int port = GetSwitchValueNonNegativeInt(switches::kMixerServiceEndpoint,
mixer_service::kDefaultTcpPort);
net::IPEndPoint endpoint(net::IPAddress::IPv4Localhost(), port);
connecting_socket_ = std::make_unique<net::TCPClientSocket>(
net::AddressList(endpoint), nullptr, nullptr, net::NetLogSource());
#endif // BUILDFLAG(USE_UNIX_SOCKETS)
auto connect_callback =
base::BindRepeating(&MixerServiceConnection::ConnectCallback, self);
int result = connecting_socket_->Connect(connect_callback);
if (result != net::ERR_IO_PENDING) {
task_runner_->PostTask(FROM_HERE, base::BindOnce(connect_callback, result));
return;
}
task_runner_->PostDelayedTask(
FROM_HERE, base::BindOnce(&MixerServiceConnection::ConnectTimeout, self),
kConnectTimeout);
}
void MixerServiceConnection::SendNextBuffer(int filled_frames) {
if (!socket_) {
LOG(ERROR) << "Tried to send buffer without a connection";
delegate_->OnConnectionError();
return;
}
socket_->SendNextBuffer(filled_frames);
}
void MixerServiceConnection::SetVolumeMultiplier(float multiplier) {
volume_multiplier_ = multiplier;
if (socket_) {
socket_->SetVolumeMultiplier(multiplier);
}
}
void MixerServiceConnection::ConnectCallback(int result) {
DCHECK_NE(result, net::ERR_IO_PENDING);
if (!connecting_socket_) {
return;
}
if (result == net::OK) {
socket_ = std::make_unique<Socket>(std::move(connecting_socket_), delegate_,
params_);
socket_->Start(volume_multiplier_);
} else {
connecting_socket_.reset();
delegate_->OnConnectionError();
}
}
void MixerServiceConnection::ConnectTimeout() {
if (!connecting_socket_) {
return;
}
connecting_socket_.reset();
delegate_->OnConnectionError();
}
} // namespace media
} // namespace chromecast

@ -0,0 +1,71 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_MIXER_SERVICE_CONNECTION_H_
#define CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_MIXER_SERVICE_CONNECTION_H_
#include <cstdint>
#include <memory>
#include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "chromecast/media/audio/mixer_service/mixer_service.pb.h"
namespace base {
class SequencedTaskRunner;
} // namespace base
namespace net {
class StreamSocket;
} // namespace net
namespace chromecast {
namespace media {
class MixerServiceConnection {
public:
class Delegate {
public:
virtual void FillNextBuffer(void* buffer,
int frames,
int64_t playout_timestamp) = 0;
virtual void OnConnectionError() = 0;
virtual void OnEosPlayed() = 0;
protected:
virtual ~Delegate() = default;
};
MixerServiceConnection(Delegate* delegate,
const mixer_service::MixerStreamParams& params);
~MixerServiceConnection();
void Connect();
void SendNextBuffer(int filled_frames);
void SetVolumeMultiplier(float multiplier);
private:
class Socket;
void ConnectCallback(int result);
void ConnectTimeout();
Delegate* const delegate_;
const mixer_service::MixerStreamParams params_;
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
std::unique_ptr<net::StreamSocket> connecting_socket_;
std::unique_ptr<Socket> socket_;
float volume_multiplier_ = 1.0f;
base::WeakPtrFactory<MixerServiceConnection> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(MixerServiceConnection);
};
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_MIXER_SERVICE_CONNECTION_H_

@ -0,0 +1,91 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/audio/mixer_service/proto_helpers.h"
#include <google/protobuf/message_lite.h>
#include <cstdint>
#include <cstring>
#include "base/big_endian.h"
#include "base/logging.h"
#include "chromecast/media/audio/mixer_service/constants.h"
#include "chromecast/media/audio/mixer_service/mixer_service.pb.h"
#include "chromecast/net/small_message_socket.h"
#include "net/base/io_buffer.h"
namespace chromecast {
namespace media {
namespace mixer_service {
scoped_refptr<net::IOBufferWithSize> SendProto(
const google::protobuf::MessageLite& message,
SmallMessageSocket* socket) {
int16_t type = static_cast<int16_t>(MessageType::kMetadata);
int message_size = message.ByteSize();
int32_t padding_bytes = (4 - (message_size % 4)) % 4;
int total_size =
sizeof(type) + sizeof(padding_bytes) + message_size + padding_bytes;
scoped_refptr<net::IOBufferWithSize> storage;
void* buffer = socket->PrepareSend(total_size);
char* ptr;
if (buffer) {
ptr = reinterpret_cast<char*>(buffer);
} else {
storage = base::MakeRefCounted<net::IOBufferWithSize>(sizeof(uint16_t) +
total_size);
ptr = storage->data();
base::WriteBigEndian(ptr, static_cast<uint16_t>(total_size));
ptr += sizeof(uint16_t);
}
base::WriteBigEndian(ptr, type);
ptr += sizeof(type);
base::WriteBigEndian(ptr, padding_bytes);
ptr += sizeof(padding_bytes);
message.SerializeToArray(ptr, message_size);
ptr += message_size;
memset(ptr, 0, padding_bytes);
if (buffer) {
socket->Send();
}
return storage;
}
bool ReceiveProto(const char* data, int size, Generic* message) {
int32_t padding_bytes;
if (size < static_cast<int>(sizeof(padding_bytes))) {
LOG(ERROR) << "Invalid metadata message size " << size;
return false;
}
base::ReadBigEndian(data, &padding_bytes);
data += sizeof(padding_bytes);
size -= sizeof(padding_bytes);
if (padding_bytes < 0 || padding_bytes > 3) {
LOG(ERROR) << "Invalid padding bytes count: " << padding_bytes;
return false;
}
if (size < padding_bytes) {
LOG(ERROR) << "Size " << size << " is smaller than padding "
<< padding_bytes;
return false;
}
if (!message->ParseFromArray(data, size - padding_bytes)) {
LOG(ERROR) << "Failed to parse incoming metadata";
return false;
}
return true;
}
} // namespace mixer_service
} // namespace media
} // namespace chromecast

@ -0,0 +1,36 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_PROTO_HELPERS_H_
#define CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_PROTO_HELPERS_H_
#include "base/memory/scoped_refptr.h"
namespace google {
namespace protobuf {
class MessageLite;
} // namespace protobuf
} // namespace google
namespace net {
class IOBufferWithSize;
} // namespace net
namespace chromecast {
class SmallMessageSocket;
namespace media {
namespace mixer_service {
class Generic;
scoped_refptr<net::IOBufferWithSize> SendProto(
const google::protobuf::MessageLite& message,
SmallMessageSocket* socket);
bool ReceiveProto(const char* data, int size, Generic* message);
} // namespace mixer_service
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_AUDIO_MIXER_SERVICE_PROTO_HELPERS_H_

@ -37,6 +37,20 @@ cast_source_set("net") {
]
}
cast_source_set("small_message_socket") {
sources = [
"small_message_socket.cc",
"small_message_socket.h",
]
public_deps = [
"//net",
]
deps = [
"//base",
]
}
cast_source_set("test_support") {
testonly = true

@ -0,0 +1,239 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/net/small_message_socket.h"
#include <stdint.h>
#include <string.h>
#include <limits>
#include <utility>
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/sequenced_task_runner.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/socket/socket.h"
namespace chromecast {
namespace {
// Maximum number of times to read/write in a loop before reposting on the
// run loop (to allow other tasks to run).
const int kMaxIOLoop = 5;
const int kDefaultBufferSize = 2048;
} // namespace
SmallMessageSocket::SmallMessageSocket(std::unique_ptr<net::Socket> socket)
: socket_(std::move(socket)),
task_runner_(base::SequencedTaskRunnerHandle::Get()),
weak_factory_(this) {}
SmallMessageSocket::~SmallMessageSocket() = default;
void* SmallMessageSocket::PrepareSend(int message_size) {
DCHECK_LE(message_size, std::numeric_limits<uint16_t>::max());
if (write_buffer_) {
send_blocked_ = true;
return nullptr;
}
if (!write_storage_) {
write_storage_ = base::MakeRefCounted<net::GrowableIOBuffer>();
}
write_storage_->set_offset(0);
const int total_size = sizeof(uint16_t) + message_size;
if (write_storage_->capacity() < total_size) {
write_storage_->SetCapacity(total_size);
}
write_buffer_ = base::MakeRefCounted<net::DrainableIOBuffer>(
write_storage_.get(), total_size);
char* data = write_buffer_->data();
base::WriteBigEndian(data, static_cast<uint16_t>(message_size));
return data + sizeof(uint16_t);
}
bool SmallMessageSocket::SendBuffer(net::IOBuffer* data, int size) {
if (write_buffer_) {
send_blocked_ = true;
return false;
}
write_buffer_ = base::MakeRefCounted<net::DrainableIOBuffer>(data, size);
Send();
return true;
}
void SmallMessageSocket::Send() {
for (int i = 0; i < kMaxIOLoop; ++i) {
DCHECK(write_buffer_);
// TODO(kmackay): Use base::BindOnce() once it is supported.
int result =
socket_->Write(write_buffer_.get(), write_buffer_->BytesRemaining(),
base::BindRepeating(&SmallMessageSocket::OnWriteComplete,
base::Unretained(this)),
NO_TRAFFIC_ANNOTATION_YET);
if (!HandleWriteResult(result)) {
return;
}
}
DCHECK(write_buffer_);
task_runner_->PostTask(FROM_HERE, base::BindOnce(&SmallMessageSocket::Send,
weak_factory_.GetWeakPtr()));
}
void SmallMessageSocket::OnWriteComplete(int result) {
if (HandleWriteResult(result)) {
Send();
}
}
bool SmallMessageSocket::HandleWriteResult(int result) {
if (result == net::ERR_IO_PENDING) {
return false;
}
if (result <= 0) {
PostError(result);
return false;
}
write_buffer_->DidConsume(result);
if (write_buffer_->BytesRemaining() != 0) {
return true;
}
write_buffer_ = nullptr;
if (send_blocked_) {
send_blocked_ = false;
OnSendUnblocked();
}
return false;
}
void SmallMessageSocket::PostError(int error) {
// Post a task rather than just calling OnError(), to avoid calling OnError()
// synchronously.
task_runner_->PostTask(FROM_HERE,
base::BindOnce(&SmallMessageSocket::OnError,
weak_factory_.GetWeakPtr(), error));
}
void SmallMessageSocket::ReceiveMessages() {
if (!read_buffer_) {
read_buffer_ = base::MakeRefCounted<net::GrowableIOBuffer>();
read_buffer_->SetCapacity(kDefaultBufferSize);
}
// Post a task rather than just calling Read(), to avoid calling delegate
// methods from within this method.
task_runner_->PostTask(FROM_HERE,
base::BindOnce(&SmallMessageSocket::StartReading,
weak_factory_.GetWeakPtr()));
}
void SmallMessageSocket::StartReading() {
if (HandleCompletedMessages()) {
Read();
}
}
void SmallMessageSocket::Read() {
// Read in a loop for a few times while data is immediately available.
// This improves average packet receive delay as compared to always posting a
// new task for each call to Read().
for (int i = 0; i < kMaxIOLoop; ++i) {
// TODO(kmackay): Use base::BindOnce() once it is supported.
int read_result =
socket_->Read(read_buffer_.get(), read_buffer_->RemainingCapacity(),
base::BindRepeating(&SmallMessageSocket::OnReadComplete,
base::Unretained(this)));
if (!HandleReadResult(read_result)) {
return;
}
}
task_runner_->PostTask(FROM_HERE, base::BindOnce(&SmallMessageSocket::Read,
weak_factory_.GetWeakPtr()));
}
void SmallMessageSocket::OnReadComplete(int result) {
if (HandleReadResult(result)) {
Read();
}
}
bool SmallMessageSocket::HandleReadResult(int result) {
if (result == net::ERR_IO_PENDING) {
return false;
}
if (result == 0 || result == net::ERR_CONNECTION_CLOSED) {
OnEndOfStream();
return false;
}
if (result < 0) {
OnError(result);
return false;
}
read_buffer_->set_offset(read_buffer_->offset() + result);
return HandleCompletedMessages();
}
bool SmallMessageSocket::HandleCompletedMessages() {
size_t total_size = read_buffer_->offset();
char* start_ptr = read_buffer_->StartOfBuffer();
bool keep_reading = true;
while (total_size >= sizeof(uint16_t)) {
uint16_t message_size;
base::ReadBigEndian(start_ptr, &message_size);
if (static_cast<size_t>(read_buffer_->capacity()) <
sizeof(uint16_t) + message_size) {
int position = start_ptr - read_buffer_->StartOfBuffer();
read_buffer_->SetCapacity(sizeof(uint16_t) + message_size);
start_ptr = read_buffer_->StartOfBuffer() + position;
}
if (total_size < sizeof(uint16_t) + message_size) {
break; // Haven't received the full message yet.
}
// Take a weak pointer in case OnMessage() causes this to be deleted.
auto self = weak_factory_.GetWeakPtr();
keep_reading = OnMessage(start_ptr + sizeof(uint16_t), message_size);
if (!self) {
return false;
}
total_size -= sizeof(uint16_t) + message_size;
start_ptr += sizeof(uint16_t) + message_size;
if (!keep_reading) {
break;
}
}
if (start_ptr != read_buffer_->StartOfBuffer()) {
memmove(read_buffer_->StartOfBuffer(), start_ptr, total_size);
read_buffer_->set_offset(total_size);
}
return keep_reading;
}
} // namespace chromecast

@ -0,0 +1,104 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_NET_SMALL_MESSAGE_SOCKET_H_
#define CHROMECAST_NET_SMALL_MESSAGE_SOCKET_H_
#include <memory>
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
namespace base {
class SequencedTaskRunner;
} // namespace base
namespace net {
class DrainableIOBuffer;
class GrowableIOBuffer;
class IOBuffer;
class Socket;
} // namespace net
namespace chromecast {
// Sends and receives small messages (< 64 KB) over a Socket. All methods must
// be called on the same sequence. Any of the virtual methods can destroy this
// object if desired.
class SmallMessageSocket {
public:
explicit SmallMessageSocket(std::unique_ptr<net::Socket> socket);
virtual ~SmallMessageSocket();
const net::Socket* socket() const { return socket_.get(); }
// Prepares a buffer to send a message of the given |message_size|. Returns
// nullptr if sending is not allowed right now (ie, another send is currently
// in progress). Otherwise, returns a buffer at least large enough to contain
// |message_size| bytes. The caller should fill in the buffer as desired and
// then call Send() to send the finished message.
// If nullptr is returned, then OnSendUnblocked() will be called once sending
// is possible again.
void* PrepareSend(int message_size);
void Send();
// Sends an already-prepared buffer of data, if possible. The first 2 bytes of
// the buffer must contain the size of the rest of the data, encoded as a
// 16-bit integer in big-endian byte order. Returns true if the buffer will be
// sent; returns false if sending is not allowed right now (ie, another send
// is currently in progress). If false is returned, then OnSendUnblocked()
// will be called once sending is possible again.
bool SendBuffer(net::IOBuffer* data, int size);
// Enables receiving messages from the stream. Messages will be received and
// passed to OnMessage() until either an error occurs, the end of stream is
// reached, or OnMessage() returns false. If OnMessage() returns false, you
// may call ReceiveMessages() to start receiving again.
void ReceiveMessages();
protected:
// Called when sending becomes possible again, if a previous attempt to send
// was rejected.
virtual void OnSendUnblocked() {}
// Called when an unrecoverable error occurs while sending or receiving. Is
// only called asynchronously.
virtual void OnError(int error) {}
// Called when the end of stream has been read. No more data will be received.
virtual void OnEndOfStream() {}
// Called when a message has been received. The |data| buffer contains |size|
// bytes of data.
virtual bool OnMessage(char* data, int size) = 0;
private:
void OnWriteComplete(int result);
bool HandleWriteResult(int result);
void PostError(int error);
void StartReading();
void Read();
void OnReadComplete(int result);
bool HandleReadResult(int result);
bool HandleCompletedMessages();
std::unique_ptr<net::Socket> socket_;
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
scoped_refptr<net::GrowableIOBuffer> write_storage_;
scoped_refptr<net::DrainableIOBuffer> write_buffer_;
bool send_blocked_ = false;
scoped_refptr<net::GrowableIOBuffer> read_buffer_;
base::WeakPtrFactory<SmallMessageSocket> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(SmallMessageSocket);
};
} // namespace chromecast
#endif // CHROMECAST_NET_SMALL_MESSAGE_SOCKET_H_