0

[GCM] Add RMQ storage and MCS message passing support

The RMQ store will contain both the pending outgoing messages as well as the
unacknowledged incoming message ids. Additionally, the device credentials
are stored there, with the security token encrypted using the local system
encryptor.

In order to support lightweight message passing across threads, an MCSMessage
class is introduced.

BUG=284553

Review URL: https://codereview.chromium.org/56353002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@234413 0039d316-1c4b-4281-b951-d872f2087c98
This commit is contained in:
zea@chromium.org
2013-11-12 05:23:50 +00:00
parent 5103016770
commit f07db232d2
7 changed files with 1069 additions and 0 deletions

@ -7,6 +7,8 @@ include_rules = [
"+base",
"+testing",
"+components/webdata/encryptor",
"+google", # For third_party/protobuf/src.
"+net",
"+third_party/leveldatabase",
]

@ -0,0 +1,78 @@
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "google_apis/gcm/base/mcs_message.h"
#include "base/logging.h"
#include "google_apis/gcm/base/mcs_util.h"
namespace gcm {
// Creates a Core with no protobuf attached; |protobuf_| stays empty.
MCSMessage::Core::Core() {}
// Builds a Core that owns a private copy of |protobuf|. The copy is produced
// by creating a fresh instance of the same message type and merging |protobuf|
// into it. |tag| is accepted but not used by this constructor.
MCSMessage::Core::Core(uint8 tag,
                       const google::protobuf::MessageLite& protobuf) {
  scoped_ptr<google::protobuf::MessageLite> copy(protobuf.New());
  copy->CheckTypeAndMergeFrom(protobuf);
  protobuf_.reset(copy.release());
}
// Builds a Core that takes ownership of |protobuf| without copying it.
// |tag| is accepted but not used by this constructor.
MCSMessage::Core::Core(
    uint8 tag,
    scoped_ptr<const google::protobuf::MessageLite> protobuf)
    : protobuf_(protobuf.Pass()) {
}
// Nothing to do explicitly; the |protobuf_| member releases the message it
// holds, if any.
MCSMessage::Core::~Core() {}
// Returns the wrapped protobuf. |protobuf_| is dereferenced unconditionally,
// so this must only be called on a Core that was constructed with a protobuf.
const google::protobuf::MessageLite& MCSMessage::Core::Get() const {
return *protobuf_;
}
// Creates an invalid message: zero tag, zero size and no Core attached
// (|core_| is left default-constructed, so IsValid() returns false).
MCSMessage::MCSMessage() : tag_(0), size_(0) {}
// Wraps a copy of |protobuf|. The tag is derived with GetMCSProtoTag() and the
// size is the byte size reported by |protobuf| at construction time.
// Note: |core_| is built from |tag_|, which relies on |tag_| being declared
// (and therefore initialized) before |core_| in the class.
MCSMessage::MCSMessage(const google::protobuf::MessageLite& protobuf)
: tag_(GetMCSProtoTag(protobuf)),
size_(protobuf.ByteSize()),
core_(new Core(tag_, protobuf)) {
}
// Wraps a copy of |protobuf| using the caller-supplied |tag|. The size is the
// byte size reported by |protobuf| at construction time.
// Note: |core_| is built from |tag_|, which relies on |tag_| being declared
// (and therefore initialized) before |core_| in the class.
MCSMessage::MCSMessage(uint8 tag,
const google::protobuf::MessageLite& protobuf)
: tag_(tag),
size_(protobuf.ByteSize()),
core_(new Core(tag_, protobuf)) {
// Debug-only sanity check that |tag| matches the tag derived from |protobuf|.
DCHECK_EQ(tag, GetMCSProtoTag(protobuf));
}
// Takes ownership of |protobuf| (no copy is made) and tags it with |tag|.
// Note: |size_| reads through |protobuf| before |core_| consumes it with
// Pass(). This is safe only because |size_| is declared (and therefore
// initialized) before |core_| in the class; do not reorder those members.
MCSMessage::MCSMessage(uint8 tag,
scoped_ptr<const google::protobuf::MessageLite> protobuf)
: tag_(tag),
size_(protobuf->ByteSize()),
core_(new Core(tag_, protobuf.Pass())) {
// |protobuf| has been passed on at this point, so the check goes through
// |core_| instead.
DCHECK_EQ(tag, GetMCSProtoTag(core_->Get()));
}
// Nothing to do explicitly; destroying |core_| drops this message's reference
// to the Core.
MCSMessage::~MCSMessage() {
}
bool MCSMessage::IsValid() const {
return core_.get() != NULL;
}
std::string MCSMessage::SerializeAsString() const {
return core_->Get().SerializeAsString();
}
// Returns a read-only reference to the wrapped protobuf. Goes through |core_|,
// so it must only be called on a message that has a Core (see IsValid()).
const google::protobuf::MessageLite& MCSMessage::GetProtobuf() const {
return core_->Get();
}
// Returns a new, mutable copy of the wrapped protobuf. The copy is a fresh
// instance of the same message type with the wrapped message merged into it;
// the caller owns the result.
scoped_ptr<google::protobuf::MessageLite> MCSMessage::CloneProtobuf() const {
  const google::protobuf::MessageLite& source = GetProtobuf();
  scoped_ptr<google::protobuf::MessageLite> copy(source.New());
  copy->CheckTypeAndMergeFrom(source);
  return copy.Pass();
}
} // namespace gcm

@ -0,0 +1,85 @@
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef GOOGLE_APIS_GCM_BASE_MCS_MESSAGE_H_
#define GOOGLE_APIS_GCM_BASE_MCS_MESSAGE_H_
#include <string>
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "google_apis/gcm/base/gcm_export.h"
namespace google {
namespace protobuf {
class MessageLite;
} // namespace protobuf
} // namespace google
namespace gcm {
// A wrapper for MCS protobuffers that encapsulates their tag, size and data
// in an immutable and thread-safe format. If a mutable version is desired,
// CloneProtobuf() should be used to create a new copy of the protobuf.
//
// Note: default copy and assign welcome.
class GCM_EXPORT MCSMessage {
public:
// Creates an invalid MCSMessage.
MCSMessage();
// Infers tag from |protobuf|. A copy of |protobuf| is stored.
explicit MCSMessage(const google::protobuf::MessageLite& protobuf);
// |tag| must match |protobuf|'s message type. A copy of |protobuf| is stored.
MCSMessage(uint8 tag, const google::protobuf::MessageLite& protobuf);
// |tag| must match |protobuf|'s message type. Takes ownership of |protobuf|.
MCSMessage(uint8 tag,
scoped_ptr<const google::protobuf::MessageLite> protobuf);
~MCSMessage();
// Returns whether this message is valid or not (whether a protobuf was
// provided at construction time or not).
bool IsValid() const;
// Getters for serialization. |tag()| and |size()| return the values cached at
// construction time; SerializeAsString() serializes the wrapped protobuf.
uint8 tag() const { return tag_; }
int size() const {return size_; }
std::string SerializeAsString() const;
// Getter for accessing immutable protobuf fields.
const google::protobuf::MessageLite& GetProtobuf() const;
// Getter for creating a mutable copy of the protobuf; the caller owns the
// returned copy.
scoped_ptr<google::protobuf::MessageLite> CloneProtobuf() const;
private:
// Thread-safe refcounted holder for the protobuf.
class Core : public base::RefCountedThreadSafe<MCSMessage::Core> {
public:
Core();
// Stores a copy of |protobuf|.
Core(uint8 tag, const google::protobuf::MessageLite& protobuf);
// Takes ownership of |protobuf|.
Core(uint8 tag, scoped_ptr<const google::protobuf::MessageLite> protobuf);
// Returns the held protobuf.
const google::protobuf::MessageLite& Get() const;
private:
// The destructor is private; RefCountedThreadSafe is befriended so it can
// delete the Core.
friend class base::RefCountedThreadSafe<MCSMessage::Core>;
~Core();
// The immutable protobuf.
scoped_ptr<const google::protobuf::MessageLite> protobuf_;
DISALLOW_COPY_AND_ASSIGN(Core);
};
// These are cached separately to avoid having to recompute them.
// The constructors rely on the declaration order |tag_|, |size_|, |core_|
// (members are initialized in declaration order); do not reorder.
// NOTE(review): both members are const, which makes the implicitly-generated
// copy assignment operator unusable; only copy construction works as
// written. Confirm against the class-level "copy and assign welcome" note.
const uint8 tag_;
const int size_;
// The refcounted core, containing the protobuf memory.
scoped_refptr<const Core> core_;
};
} // namespace gcm
#endif // GOOGLE_APIS_GCM_BASE_MCS_MESSAGE_H_

@ -0,0 +1,491 @@
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "google_apis/gcm/engine/rmq_store.h"
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/files/file_path.h"
#include "base/logging.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/sequenced_task_runner.h"
#include "base/stl_util.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_piece.h"
#include "base/tracked_objects.h"
#include "components/webdata/encryptor/encryptor.h"
#include "google_apis/gcm/base/mcs_message.h"
#include "google_apis/gcm/base/mcs_util.h"
#include "google_apis/gcm/protocol/mcs.pb.h"
#include "third_party/leveldatabase/src/include/leveldb/db.h"
namespace gcm {
namespace {
// ---- LevelDB keys. ----
// Key for this device's android id.
const char kDeviceAIDKey[] = "device_aid_key";
// Key for this device's android security token.
const char kDeviceTokenKey[] = "device_token_key";
// Lowest lexicographically ordered incoming message key.
// Used for prefixing messages.
const char kIncomingMsgKeyStart[] = "incoming1-";
// Key guaranteed to be higher than all incoming message keys.
// Used for limiting iteration.
const char kIncomingMsgKeyEnd[] = "incoming2-";
// Lowest lexicographically ordered outgoing message key.
// Used for prefixing outgoing messages.
const char kOutgoingMsgKeyStart[] = "outgoing1-";
// Key guaranteed to be higher than all outgoing message keys.
// Used for limiting iteration.
const char kOutgoingMsgKeyEnd[] = "outgoing2-";
// Builds the LevelDB key for an incoming message by prepending the incoming
// key prefix to |persistent_id|.
std::string MakeIncomingKey(const std::string& persistent_id) {
  std::string key(kIncomingMsgKeyStart);
  key.append(persistent_id);
  return key;
}
// Builds the LevelDB key for an outgoing message by prepending the outgoing
// key prefix to |persistent_id|.
std::string MakeOutgoingKey(const std::string& persistent_id) {
  std::string key(kOutgoingMsgKeyStart);
  key.append(persistent_id);
  return key;
}
// Recovers the persistent id from an outgoing message key by dropping the
// outgoing key prefix. |key| must be at least as long as the prefix.
std::string ParseOutgoingKey(const std::string& key) {
  // arraysize() counts the terminating NUL, hence the -1.
  const size_t prefix_length = arraysize(kOutgoingMsgKeyStart) - 1;
  return key.substr(prefix_length);
}
// Wraps |s| in a leveldb::Slice without copying. The slice points at the same
// bytes as |s|, so it is only usable while that storage is alive.
leveldb::Slice MakeSlice(const base::StringPiece& s) {
  return leveldb::Slice(s.data(), s.length());
}
} // namespace
// Refcounted object that performs the actual LevelDB work for RMQStore.
// RMQStore posts these methods to its blocking task runner; each method that
// takes a callback posts it to |foreground_task_runner_| when done.
class RMQStore::Backend : public base::RefCountedThreadSafe<RMQStore::Backend> {
public:
// |path| is the LevelDB location; callbacks are posted to
// |foreground_runner|.
Backend(const base::FilePath& path,
scoped_refptr<base::SequencedTaskRunner> foreground_runner);
// Blocking implementations of RMQStore methods.
// Load() is the method that opens the database and sets |db_|.
void Load(const LoadCallback& callback);
void Destroy(const UpdateCallback& callback);
void SetDeviceCredentials(uint64 device_android_id,
uint64 device_security_token,
const UpdateCallback& callback);
void AddIncomingMessage(const std::string& persistent_id,
const UpdateCallback& callback);
void RemoveIncomingMessages(const PersistentIdList& persistent_ids,
const UpdateCallback& callback);
void AddOutgoingMessage(const std::string& persistent_id,
const MCSMessage& message,
const UpdateCallback& callback);
void RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
const UpdateCallback& callback);
private:
// The destructor is private; RefCountedThreadSafe is befriended so it can
// delete the Backend.
friend class base::RefCountedThreadSafe<Backend>;
~Backend();
// Helpers used by Load(). Each returns false on a read or parse error.
// The values stored into |outgoing_messages| are heap-allocated raw
// pointers.
bool LoadDeviceCredentials(uint64* android_id, uint64* security_token);
bool LoadIncomingMessages(std::vector<std::string>* incoming_messages);
bool LoadOutgoingMessages(
std::map<std::string, google::protobuf::MessageLite*>* outgoing_messages);
// Location of the LevelDB database.
const base::FilePath path_;
// Runner on which result callbacks are posted.
scoped_refptr<base::SequencedTaskRunner> foreground_task_runner_;
// The open database; empty until Load() succeeds in opening it.
scoped_ptr<leveldb::DB> db_;
};
// Records the database path and the runner used for posting callbacks. The
// database itself is not opened here; see Load().
RMQStore::Backend::Backend(
const base::FilePath& path,
scoped_refptr<base::SequencedTaskRunner> foreground_task_runner)
: path_(path),
foreground_task_runner_(foreground_task_runner) {
}
// Nothing to do explicitly; |db_| deletes the leveldb::DB it holds, if any.
RMQStore::Backend::~Backend() {
}
void RMQStore::Backend::Load(const LoadCallback& callback) {
LoadResult result;
leveldb::Options options;
options.create_if_missing = true;
leveldb::DB* db;
leveldb::Status status = leveldb::DB::Open(options,
path_.AsUTF8Unsafe(),
&db);
if (!status.ok()) {
LOG(ERROR) << "Failed to open database " << path_.value()
<< ": " << status.ToString();
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, result));
return;
}
db_.reset(db);
if (!LoadDeviceCredentials(&result.device_android_id,
&result.device_security_token) ||
!LoadIncomingMessages(&result.incoming_messages) ||
!LoadOutgoingMessages(&result.outgoing_messages)) {
result.device_android_id = 0;
result.device_security_token = 0;
result.incoming_messages.clear();
STLDeleteContainerPairSecondPointers(result.outgoing_messages.begin(),
result.outgoing_messages.end());
result.outgoing_messages.clear();
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, result));
return;
}
DVLOG(1) << "Succeeded in loading " << result.incoming_messages.size()
<< " unacknowledged incoming messages and "
<< result.outgoing_messages.size()
<< " unacknowledged outgoing messages.";
result.success = true;
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, result));
return;
}
void RMQStore::Backend::Destroy(const UpdateCallback& callback) {
DVLOG(1) << "Destroying RMQ store.";
const leveldb::Status s =
leveldb::DestroyDB(path_.AsUTF8Unsafe(),
leveldb::Options());
if (s.ok()) {
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, true));
return;
}
LOG(ERROR) << "Destroy failed.";
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, false));
}
// Persists the device credentials: the android id is stored as a decimal
// string and the security token is stored encrypted. Both writes are
// synchronous. Posts true to |callback| only if both writes succeed; posts
// false if the database is not open, encryption fails, or a write fails.
void RMQStore::Backend::SetDeviceCredentials(uint64 device_android_id,
                                             uint64 device_security_token,
                                             const UpdateCallback& callback) {
  DVLOG(1) << "Saving device credentials with AID " << device_android_id;
  // |db_| is only set by a successful open in Load(); fail cleanly instead of
  // dereferencing a null database.
  if (!db_.get()) {
    LOG(ERROR) << "RMQ store db doesn't exist.";
    foreground_task_runner_->PostTask(FROM_HERE,
                                      base::Bind(callback, false));
    return;
  }

  // Never persist a token that failed to encrypt; it could not be restored.
  std::string encrypted_token;
  if (!Encryptor::EncryptString(base::Uint64ToString(device_security_token),
                                &encrypted_token)) {
    LOG(ERROR) << "Failed to encrypt security token.";
    foreground_task_runner_->PostTask(FROM_HERE,
                                      base::Bind(callback, false));
    return;
  }

  leveldb::WriteOptions write_options;
  write_options.sync = true;
  leveldb::Status s =
      db_->Put(write_options,
               MakeSlice(kDeviceAIDKey),
               MakeSlice(base::Uint64ToString(device_android_id)));
  if (s.ok()) {
    s = db_->Put(write_options,
                 MakeSlice(kDeviceTokenKey),
                 MakeSlice(encrypted_token));
  }
  if (s.ok()) {
    foreground_task_runner_->PostTask(FROM_HERE,
                                      base::Bind(callback, true));
    return;
  }
  LOG(ERROR) << "LevelDB put failed: " << s.ToString();
  foreground_task_runner_->PostTask(FROM_HERE,
                                    base::Bind(callback, false));
}
void RMQStore::Backend::AddIncomingMessage(const std::string& persistent_id,
const UpdateCallback& callback) {
DVLOG(1) << "Saving incoming message with id " << persistent_id;
leveldb::WriteOptions write_options;
write_options.sync = true;
const leveldb::Status s =
db_->Put(write_options,
MakeSlice(MakeIncomingKey(persistent_id)),
MakeSlice(persistent_id));
if (s.ok()) {
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, true));
return;
}
LOG(ERROR) << "LevelDB put failed: " << s.ToString();
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, false));
}
void RMQStore::Backend::RemoveIncomingMessages(
const PersistentIdList& persistent_ids,
const UpdateCallback& callback) {
leveldb::WriteOptions write_options;
write_options.sync = true;
leveldb::Status s;
for (PersistentIdList::const_iterator iter = persistent_ids.begin();
iter != persistent_ids.end(); ++iter){
DVLOG(1) << "Removing incoming message with id " << *iter;
s = db_->Delete(write_options,
MakeSlice(MakeIncomingKey(*iter)));
if (!s.ok())
break;
}
if (s.ok()) {
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, true));
return;
}
LOG(ERROR) << "LevelDB remove failed: " << s.ToString();
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, false));
}
void RMQStore::Backend::AddOutgoingMessage(
const std::string& persistent_id,
const MCSMessage& message,
const UpdateCallback& callback) {
DVLOG(1) << "Saving outgoing message with id " << persistent_id;
leveldb::WriteOptions write_options;
write_options.sync = true;
std::string data = static_cast<char>(message.tag()) +
message.SerializeAsString();
const leveldb::Status s =
db_->Put(write_options,
MakeSlice(MakeOutgoingKey(persistent_id)),
MakeSlice(data));
if (s.ok()) {
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, true));
return;
}
LOG(ERROR) << "LevelDB put failed: " << s.ToString();
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, false));
}
void RMQStore::Backend::RemoveOutgoingMessages(
const PersistentIdList& persistent_ids,
const UpdateCallback& callback) {
leveldb::WriteOptions write_options;
write_options.sync = true;
leveldb::Status s;
for (PersistentIdList::const_iterator iter = persistent_ids.begin();
iter != persistent_ids.end(); ++iter){
DVLOG(1) << "Removing outgoing message with id " << *iter;
s = db_->Delete(write_options,
MakeSlice(MakeOutgoingKey(*iter)));
if (!s.ok())
break;
}
if (s.ok()) {
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, true));
return;
}
LOG(ERROR) << "LevelDB remove failed: " << s.ToString();
foreground_task_runner_->PostTask(FROM_HERE,
base::Bind(callback, false));
}
// Reads the stored android id and (encrypted) security token into
// |android_id| and |security_token|. Returns true when both were restored, and
// also when a key was simply not found (the outputs are then left as the
// lookups left them). Returns false on a read error or an unparsable value.
// NOTE(review): if the android id is present but the token key is missing,
// this returns true with only |android_id| set - confirm that is intended.
bool RMQStore::Backend::LoadDeviceCredentials(uint64* android_id,
                                              uint64* security_token) {
  leveldb::ReadOptions read_options;
  read_options.verify_checksums = true;

  std::string value;
  leveldb::Status status = db_->Get(read_options,
                                    MakeSlice(kDeviceAIDKey),
                                    &value);
  if (status.ok()) {
    if (!base::StringToUint64(value, android_id)) {
      LOG(ERROR) << "Failed to restore device id.";
      return false;
    }
    // Only look up the token once the android id has been restored.
    value.clear();
    status = db_->Get(read_options,
                      MakeSlice(kDeviceTokenKey),
                      &value);
  }

  if (!status.ok()) {
    if (status.IsNotFound()) {
      DVLOG(1) << "No credentials found.";
      return true;
    }
    LOG(ERROR) << "Error reading credentials from store.";
    return false;
  }

  // A failed decryption leaves |plaintext_token| unparsable, which is caught
  // by the conversion below.
  std::string plaintext_token;
  Encryptor::DecryptString(value, &plaintext_token);
  if (base::StringToUint64(plaintext_token, security_token))
    return true;
  LOG(ERROR) << "Failed to restore security token.";
  return false;
}
bool RMQStore::Backend::LoadIncomingMessages(
std::vector<std::string>* incoming_messages) {
leveldb::ReadOptions read_options;
read_options.verify_checksums = true;
scoped_ptr<leveldb::Iterator> iter(db_->NewIterator(read_options));
for (iter->Seek(MakeSlice(kIncomingMsgKeyStart));
iter->Valid() && iter->key().ToString() < kIncomingMsgKeyEnd;
iter->Next()) {
leveldb::Slice s = iter->value();
if (s.empty()) {
LOG(ERROR) << "Error reading incoming message with key "
<< iter->key().ToString();
return false;
}
DVLOG(1) << "Found incoming message with id " << s.ToString();
incoming_messages->push_back(s.ToString());
}
return true;
}
// Rebuilds all stored outgoing messages into |outgoing_messages|, keyed by
// persistent id. Each stored value is one tag byte followed by the serialized
// protobuf. The map values are heap-allocated and are not freed here, not
// even on failure; the caller is responsible for them. Returns false on a
// malformed entry, an unknown tag, a parse failure, or an iteration read
// error.
bool RMQStore::Backend::LoadOutgoingMessages(
    std::map<std::string, google::protobuf::MessageLite*>*
        outgoing_messages) {
  leveldb::ReadOptions read_options;
  read_options.verify_checksums = true;
  scoped_ptr<leveldb::Iterator> iter(db_->NewIterator(read_options));
  // Walk every key in the outgoing range [start, end).
  for (iter->Seek(MakeSlice(kOutgoingMsgKeyStart));
       iter->Valid() && iter->key().ToString() < kOutgoingMsgKeyEnd;
       iter->Next()) {
    const leveldb::Slice s = iter->value();
    // A valid entry needs the tag byte plus at least one payload byte.
    if (s.size() <= 1) {
      LOG(ERROR) << "Error reading outgoing message with key "
                 << iter->key().ToString();
      return false;
    }
    const uint8 tag = s.data()[0];
    const std::string id = ParseOutgoingKey(iter->key().ToString());
    scoped_ptr<google::protobuf::MessageLite> message(
        BuildProtobufFromTag(tag));
    // Parse the payload in place, skipping the leading tag byte.
    if (!message.get() ||
        !message->ParseFromArray(s.data() + 1,
                                 static_cast<int>(s.size() - 1))) {
      // |tag| is a uint8; convert it so it is logged as a number rather than
      // as a raw character.
      LOG(ERROR) << "Failed to parse outgoing message with id "
                 << id << " and tag " << base::IntToString(tag);
      return false;
    }
    DVLOG(1) << "Found outgoing message with id " << id << " of type "
             << base::IntToString(tag);
    (*outgoing_messages)[id] = message.release();
  }
  // An iterator also becomes invalid when a read fails, so the loop ending is
  // not proof of success; report the error rather than a truncated map.
  const leveldb::Status status = iter->status();
  if (!status.ok()) {
    LOG(ERROR) << "Error iterating over outgoing messages: "
               << status.ToString();
    return false;
  }
  return true;
}
// Starts out as a failed, empty result: no success, zeroed credentials, and
// (default-constructed) empty message containers.
RMQStore::LoadResult::LoadResult()
: success(false),
device_android_id(0),
device_security_token(0) {
}
// Intentionally empty. Note that the raw pointers held as values in
// |outgoing_messages| are not deleted here.
RMQStore::LoadResult::~LoadResult() {}
// Creates the backend for the database at |path|. The backend posts its
// callbacks to the message loop proxy that is current at construction time;
// the backend's work itself is posted to |blocking_task_runner|.
RMQStore::RMQStore(
const base::FilePath& path,
scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
: backend_(new Backend(path, base::MessageLoopProxy::current())),
blocking_task_runner_(blocking_task_runner) {
}
// Nothing to do explicitly; this only drops the store's own reference to the
// refcounted backend (tasks that were already posted hold their own).
RMQStore::~RMQStore() {
}
void RMQStore::Load(const LoadCallback& callback) {
blocking_task_runner_->PostTask(FROM_HERE,
base::Bind(&RMQStore::Backend::Load,
backend_,
callback));
}
void RMQStore::Destroy(const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RMQStore::Backend::Destroy,
backend_,
callback));
}
// Schedules Backend::SetDeviceCredentials on the blocking task runner with the
// given id and token. The backend posts the outcome to |callback| on its
// foreground task runner.
void RMQStore::SetDeviceCredentials(uint64 device_android_id,
                                    uint64 device_security_token,
                                    const UpdateCallback& callback) {
  const base::Closure task =
      base::Bind(&RMQStore::Backend::SetDeviceCredentials,
                 backend_,
                 device_android_id,
                 device_security_token,
                 callback);
  blocking_task_runner_->PostTask(FROM_HERE, task);
}
void RMQStore::AddIncomingMessage(const std::string& persistent_id,
const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RMQStore::Backend::AddIncomingMessage,
backend_,
persistent_id,
callback));
}
void RMQStore::RemoveIncomingMessage(const std::string& persistent_id,
const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RMQStore::Backend::RemoveIncomingMessages,
backend_,
PersistentIdList(1, persistent_id),
callback));
}
void RMQStore::RemoveIncomingMessages(const PersistentIdList& persistent_ids,
const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RMQStore::Backend::RemoveIncomingMessages,
backend_,
persistent_ids,
callback));
}
void RMQStore::AddOutgoingMessage(const std::string& persistent_id,
const MCSMessage& message,
const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RMQStore::Backend::AddOutgoingMessage,
backend_,
persistent_id,
message,
callback));
}
void RMQStore::RemoveOutgoingMessage(const std::string& persistent_id,
const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RMQStore::Backend::RemoveOutgoingMessages,
backend_,
PersistentIdList(1, persistent_id),
callback));
}
void RMQStore::RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RMQStore::Backend::RemoveOutgoingMessages,
backend_,
persistent_ids,
callback));
}
} // namespace gcm

@ -0,0 +1,102 @@
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef GOOGLE_APIS_GCM_ENGINE_RMQ_STORE_H_
#define GOOGLE_APIS_GCM_ENGINE_RMQ_STORE_H_
#include <map>
#include <string>
#include <vector>
#include "base/basictypes.h"
#include "base/callback_forward.h"
#include "base/memory/ref_counted.h"
#include "google_apis/gcm/base/gcm_export.h"
namespace base {
class FilePath;
class SequencedTaskRunner;
} // namespace base
namespace google {
namespace protobuf {
class MessageLite;
} // namespace protobuf
} // namespace google
namespace gcm {
class MCSMessage;
// A Reliable Message Queue store.
// Will perform all blocking operations on the blocking task runner, and will
// post all callbacks to the thread on which the RMQStore is created.
class GCM_EXPORT RMQStore {
public:
// Container for Load(..) results.
struct GCM_EXPORT LoadResult {
LoadResult();
~LoadResult();
// True only if the whole load succeeded; initialized to false.
bool success;
// Device credentials; both are initialized to 0.
uint64 device_android_id;
uint64 device_security_token;
// Persistent ids of the unacknowledged incoming messages.
std::vector<std::string> incoming_messages;
// Unacknowledged outgoing messages keyed by persistent id. The values are
// raw pointers; see the ownership note on LoadCallback below.
std::map<std::string, google::protobuf::MessageLite*>
outgoing_messages;
};
typedef std::vector<std::string> PersistentIdList;
// Note: callee receives ownership of |outgoing_messages|' values.
typedef base::Callback<void(const LoadResult& result)> LoadCallback;
typedef base::Callback<void(bool success)> UpdateCallback;
// |path| is the location of the store's database. Blocking work is posted to
// |blocking_task_runner|.
RMQStore(const base::FilePath& path,
scoped_refptr<base::SequencedTaskRunner> blocking_task_runner);
~RMQStore();
// Load the directory and pass the initial state back to caller.
void Load(const LoadCallback& callback);
// Clears the RMQ store of all data and destroys any LevelDB files associated
// with this store.
// WARNING: this will permanently destroy any pending outgoing messages
// and require the device to re-create credentials.
void Destroy(const UpdateCallback& callback);
// Sets this device's messaging credentials.
void SetDeviceCredentials(uint64 device_android_id,
uint64 device_security_token,
const UpdateCallback& callback);
// Unacknowledged incoming message handling.
void AddIncomingMessage(const std::string& persistent_id,
const UpdateCallback& callback);
void RemoveIncomingMessage(const std::string& persistent_id,
const UpdateCallback& callback);
void RemoveIncomingMessages(const PersistentIdList& persistent_ids,
const UpdateCallback& callback);
// Unacknowledged outgoing messages handling.
// TODO(zea): implement per-app limits on the number of outgoing messages.
void AddOutgoingMessage(const std::string& persistent_id,
const MCSMessage& message,
const UpdateCallback& callback);
void RemoveOutgoingMessage(const std::string& persistent_id,
const UpdateCallback& callback);
void RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
const UpdateCallback& callback);
private:
// Performs the blocking database work; defined in the .cc file.
class Backend;
scoped_refptr<Backend> backend_;
// Runner on which the backend's blocking operations are executed.
scoped_refptr<base::SequencedTaskRunner> blocking_task_runner_;
DISALLOW_COPY_AND_ASSIGN(RMQStore);
};
} // namespace gcm
#endif // GOOGLE_APIS_GCM_ENGINE_RMQ_STORE_H_

@ -0,0 +1,303 @@
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "google_apis/gcm/engine/rmq_store.h"
#include <string>
#include <vector>
#include "base/bind.h"
#include "base/files/file_path.h"
#include "base/files/scoped_temp_dir.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/strings/string_number_conversions.h"
#include "components/webdata/encryptor/encryptor.h"
#include "google_apis/gcm/base/mcs_message.h"
#include "google_apis/gcm/base/mcs_util.h"
#include "google_apis/gcm/protocol/mcs.pb.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace gcm {
namespace {
// Number of persistent ids to use in tests.
const int kNumPersistentIds = 10;
const uint64 kDeviceId = 22;
const uint64 kDeviceToken = 55;
// Test fixture that builds RMQStore instances backed by a unique temporary
// directory and driven by a local message loop.
class RMQStoreTest : public testing::Test {
public:
RMQStoreTest();
virtual ~RMQStoreTest();
// Creates a new store over the fixture's temporary directory.
scoped_ptr<RMQStore> BuildRMQStore();
// Returns a persistent id string for use in a test.
std::string GetNextPersistentId();
// Runs the message loop until it is idle.
void PumpLoop();
// Load callback: asserts success and copies |result| into |result_dst|.
void LoadCallback(RMQStore::LoadResult* result_dst,
const RMQStore::LoadResult& result);
// Update callback: asserts |success|.
void UpdateCallback(bool success);
private:
base::MessageLoop message_loop_;
base::ScopedTempDir temp_directory_;
scoped_ptr<base::RunLoop> run_loop_;
};
// Creates the temporary directory that backs the stores built by this fixture
// and allocates the initial RunLoop.
RMQStoreTest::RMQStoreTest() {
EXPECT_TRUE(temp_directory_.CreateUniqueTempDir());
run_loop_.reset(new base::RunLoop());
// On OSX, prevent the Keychain permissions popup during unit tests.
#if defined(OS_MACOSX)
Encryptor::UseMockKeychain(true);
#endif
}
// Nothing to do explicitly; the fixture's members clean up after themselves.
RMQStoreTest::~RMQStoreTest() {
}
// Creates a new RMQStore over the fixture's temporary directory. The test's
// own message loop proxy is used as the store's blocking task runner, so the
// store's work runs when the test pumps the loop.
scoped_ptr<RMQStore> RMQStoreTest::BuildRMQStore() {
  scoped_ptr<RMQStore> store(
      new RMQStore(temp_directory_.path(),
                   message_loop_.message_loop_proxy()));
  return store.Pass();
}
std::string RMQStoreTest::GetNextPersistentId() {
return base::Uint64ToString(base::Time::Now().ToInternalValue());
}
// Runs every task currently queued on the test's message loop, including the
// store operations and callbacks posted to it, and returns once it is idle.
void RMQStoreTest::PumpLoop() {
message_loop_.RunUntilIdle();
}
// Load callback used by the tests: asserts that the load succeeded and copies
// |result| into |result_dst|. The copy shares the raw message pointers held in
// |result.outgoing_messages|.
void RMQStoreTest::LoadCallback(RMQStore::LoadResult* result_dst,
const RMQStore::LoadResult& result) {
ASSERT_TRUE(result.success);
*result_dst = result;
// Quit the current RunLoop and replace it with a fresh one.
// NOTE(review): nothing visible in this file calls run_loop_->Run(); the
// tests drive the loop through PumpLoop() instead - confirm this is needed.
run_loop_->Quit();
run_loop_.reset(new base::RunLoop());
}
// Update callback used by the tests: fails the test if the store operation
// reported failure.
void RMQStoreTest::UpdateCallback(bool success) {
ASSERT_TRUE(success);
}
// Verify creating a new database and loading it.
TEST_F(RMQStoreTest, LoadNew) {
  scoped_ptr<RMQStore> rmq_store(BuildRMQStore());
  RMQStore::LoadResult load_result;
  const RMQStore::LoadCallback on_loaded =
      base::Bind(&RMQStoreTest::LoadCallback,
                 base::Unretained(this),
                 &load_result);
  rmq_store->Load(on_loaded);
  PumpLoop();

  // A freshly created store has no credentials and no messages.
  ASSERT_EQ(0U, load_result.device_android_id);
  ASSERT_EQ(0U, load_result.device_security_token);
  ASSERT_TRUE(load_result.incoming_messages.empty());
  ASSERT_TRUE(load_result.outgoing_messages.empty());
}
// Verify that device credentials written to the store are returned by a
// subsequent load from a newly built store over the same directory.
TEST_F(RMQStoreTest, DeviceCredentials) {
  scoped_ptr<RMQStore> rmq_store(BuildRMQStore());
  RMQStore::LoadResult load_result;
  const RMQStore::LoadCallback on_loaded =
      base::Bind(&RMQStoreTest::LoadCallback,
                 base::Unretained(this),
                 &load_result);
  const RMQStore::UpdateCallback on_updated =
      base::Bind(&RMQStoreTest::UpdateCallback,
                 base::Unretained(this));

  rmq_store->Load(on_loaded);
  PumpLoop();

  rmq_store->SetDeviceCredentials(kDeviceId, kDeviceToken, on_updated);
  PumpLoop();

  // Reopen and check that the credentials round-trip.
  rmq_store = BuildRMQStore().Pass();
  rmq_store->Load(on_loaded);
  PumpLoop();

  ASSERT_EQ(kDeviceId, load_result.device_android_id);
  ASSERT_EQ(kDeviceToken, load_result.device_security_token);
}
// Verify saving some incoming messages, reopening the directory, and then
// removing those incoming messages.
TEST_F(RMQStoreTest, IncomingMessages) {
  scoped_ptr<RMQStore> rmq_store(BuildRMQStore());
  RMQStore::LoadResult load_result;
  const RMQStore::LoadCallback on_loaded =
      base::Bind(&RMQStoreTest::LoadCallback,
                 base::Unretained(this),
                 &load_result);
  const RMQStore::UpdateCallback on_updated =
      base::Bind(&RMQStoreTest::UpdateCallback,
                 base::Unretained(this));

  rmq_store->Load(on_loaded);
  PumpLoop();

  // Save a batch of incoming message ids, one store operation at a time.
  std::vector<std::string> persistent_ids;
  for (int i = 0; i < kNumPersistentIds; ++i) {
    const std::string id = GetNextPersistentId();
    persistent_ids.push_back(id);
    rmq_store->AddIncomingMessage(id, on_updated);
    PumpLoop();
  }

  // Reopen: every saved id must come back, and nothing else.
  rmq_store = BuildRMQStore().Pass();
  rmq_store->Load(on_loaded);
  PumpLoop();

  ASSERT_EQ(persistent_ids, load_result.incoming_messages);
  ASSERT_TRUE(load_result.outgoing_messages.empty());

  rmq_store->RemoveIncomingMessages(persistent_ids, on_updated);
  PumpLoop();

  // Reopen again: the removed ids must be gone.
  rmq_store = BuildRMQStore().Pass();
  load_result.incoming_messages.clear();
  rmq_store->Load(on_loaded);
  PumpLoop();

  ASSERT_TRUE(load_result.incoming_messages.empty());
  ASSERT_TRUE(load_result.outgoing_messages.empty());
}
// Verify saving some outgoing messages, reopening the directory, and then
// removing those outgoing messages.
TEST_F(RMQStoreTest, OutgoingMessages) {
  scoped_ptr<RMQStore> rmq_store(BuildRMQStore());
  RMQStore::LoadResult load_result;
  rmq_store->Load(base::Bind(&RMQStoreTest::LoadCallback,
                             base::Unretained(this),
                             &load_result));
  PumpLoop();

  // Save a batch of outgoing messages, each carrying its own id in the
  // |from| and |category| fields so it can be recognized after reloading.
  std::vector<std::string> persistent_ids;
  for (int i = 0; i < kNumPersistentIds; ++i) {
    persistent_ids.push_back(GetNextPersistentId());
    mcs_proto::DataMessageStanza message;
    message.set_from(persistent_ids.back());
    message.set_category(persistent_ids.back());
    rmq_store->AddOutgoingMessage(persistent_ids.back(),
                                  MCSMessage(message),
                                  base::Bind(&RMQStoreTest::UpdateCallback,
                                             base::Unretained(this)));
    PumpLoop();
  }

  rmq_store = BuildRMQStore().Pass();
  rmq_store->Load(base::Bind(&RMQStoreTest::LoadCallback,
                             base::Unretained(this),
                             &load_result));
  PumpLoop();

  ASSERT_TRUE(load_result.incoming_messages.empty());
  ASSERT_EQ(persistent_ids.size(), load_result.outgoing_messages.size());
  for (int i = 0; i < kNumPersistentIds; ++i) {
    const std::string id = persistent_ids[i];
    ASSERT_TRUE(load_result.outgoing_messages[id]);
    // Every message stored above is a DataMessageStanza, so a static
    // downcast from the MessageLite base is valid.
    const mcs_proto::DataMessageStanza* message =
        static_cast<const mcs_proto::DataMessageStanza*>(
            load_result.outgoing_messages[id]);
    ASSERT_EQ(id, message->from());
    ASSERT_EQ(id, message->category());
  }

  rmq_store->RemoveOutgoingMessages(persistent_ids,
                                    base::Bind(&RMQStoreTest::UpdateCallback,
                                               base::Unretained(this)));
  PumpLoop();

  rmq_store = BuildRMQStore().Pass();
  // The receiver of a load result owns the loaded messages; free them before
  // dropping the pointers so the test does not leak.
  for (std::map<std::string, google::protobuf::MessageLite*>::iterator iter =
           load_result.outgoing_messages.begin();
       iter != load_result.outgoing_messages.end(); ++iter) {
    delete iter->second;
  }
  load_result.outgoing_messages.clear();
  rmq_store->Load(base::Bind(&RMQStoreTest::LoadCallback,
                             base::Unretained(this),
                             &load_result));
  PumpLoop();

  ASSERT_TRUE(load_result.incoming_messages.empty());
  ASSERT_TRUE(load_result.outgoing_messages.empty());
}
// Verify incoming and outgoing messages don't conflict.
TEST_F(RMQStoreTest, IncomingAndOutgoingMessages) {
  scoped_ptr<RMQStore> rmq_store(BuildRMQStore());
  RMQStore::LoadResult load_result;
  rmq_store->Load(base::Bind(&RMQStoreTest::LoadCallback,
                             base::Unretained(this),
                             &load_result));
  PumpLoop();

  // Store an incoming and an outgoing message under each id, so both kinds
  // share the same persistent ids.
  std::vector<std::string> persistent_ids;
  for (int i = 0; i < kNumPersistentIds; ++i) {
    persistent_ids.push_back(GetNextPersistentId());
    rmq_store->AddIncomingMessage(persistent_ids.back(),
                                  base::Bind(&RMQStoreTest::UpdateCallback,
                                             base::Unretained(this)));
    PumpLoop();

    mcs_proto::DataMessageStanza message;
    message.set_from(persistent_ids.back());
    message.set_category(persistent_ids.back());
    rmq_store->AddOutgoingMessage(persistent_ids.back(),
                                  MCSMessage(message),
                                  base::Bind(&RMQStoreTest::UpdateCallback,
                                             base::Unretained(this)));
    PumpLoop();
  }

  rmq_store = BuildRMQStore().Pass();
  rmq_store->Load(base::Bind(&RMQStoreTest::LoadCallback,
                             base::Unretained(this),
                             &load_result));
  PumpLoop();

  ASSERT_EQ(persistent_ids, load_result.incoming_messages);
  ASSERT_EQ(persistent_ids.size(), load_result.outgoing_messages.size());
  for (int i = 0; i < kNumPersistentIds; ++i) {
    const std::string id = persistent_ids[i];
    ASSERT_TRUE(load_result.outgoing_messages[id]);
    // Every message stored above is a DataMessageStanza, so a static
    // downcast from the MessageLite base is valid.
    const mcs_proto::DataMessageStanza* message =
        static_cast<const mcs_proto::DataMessageStanza*>(
            load_result.outgoing_messages[id]);
    ASSERT_EQ(id, message->from());
    ASSERT_EQ(id, message->category());
  }

  rmq_store->RemoveIncomingMessages(persistent_ids,
                                    base::Bind(&RMQStoreTest::UpdateCallback,
                                               base::Unretained(this)));
  PumpLoop();
  rmq_store->RemoveOutgoingMessages(persistent_ids,
                                    base::Bind(&RMQStoreTest::UpdateCallback,
                                               base::Unretained(this)));
  PumpLoop();

  rmq_store = BuildRMQStore().Pass();
  load_result.incoming_messages.clear();
  // The receiver of a load result owns the loaded messages; free them before
  // dropping the pointers so the test does not leak.
  for (std::map<std::string, google::protobuf::MessageLite*>::iterator iter =
           load_result.outgoing_messages.begin();
       iter != load_result.outgoing_messages.end(); ++iter) {
    delete iter->second;
  }
  load_result.outgoing_messages.clear();
  rmq_store->Load(base::Bind(&RMQStoreTest::LoadCallback,
                             base::Unretained(this),
                             &load_result));
  PumpLoop();

  ASSERT_TRUE(load_result.incoming_messages.empty());
  ASSERT_TRUE(load_result.outgoing_messages.empty());
}
} // namespace
} // namespace gcm

@ -31,16 +31,22 @@
'dependencies': [
'../../base/base.gyp:base',
'../../base/third_party/dynamic_annotations/dynamic_annotations.gyp:dynamic_annotations',
'../../components/components.gyp:encryptor',
'../../net/net.gyp:net',
'../../third_party/leveldatabase/leveldatabase.gyp:leveldatabase',
'../../third_party/protobuf/protobuf.gyp:protobuf_lite'
],
'sources': [
'base/mcs_message.h',
'base/mcs_message.cc',
'base/mcs_util.h',
'base/mcs_util.cc',
'base/socket_stream.h',
'base/socket_stream.cc',
'engine/connection_handler.h',
'engine/connection_handler.cc',
'engine/rmq_store.h',
'engine/rmq_store.cc',
'gcm_client.cc',
'gcm_client.h',
'gcm_client_impl.cc',
@ -63,6 +69,7 @@
'dependencies': [
'../../base/base.gyp:run_all_unittests',
'../../base/base.gyp:base',
'../../components/components.gyp:encryptor',
'../../net/net.gyp:net_test_support',
'../../testing/gtest.gyp:gtest',
'../../third_party/protobuf/protobuf.gyp:protobuf_lite',
@ -72,6 +79,7 @@
'base/mcs_util_unittest.cc',
'base/socket_stream_unittest.cc',
'engine/connection_handler_unittest.cc',
'engine/rmq_store_unittest.cc',
]
},
],