blob: 6a96ffcb191a84b9eb05ef3589b0fd8a61a2b5e8 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "federated/storage_manager_impl.h"
6
alanlxl9d26c1c2020-08-21 13:42:36 +10007#include <cstddef>
8#include <memory>
9
10#include <base/files/file_path.h>
11#include <base/files/file_util.h>
alanlxl30f15bd2020-08-11 21:26:12 +100012#include <base/logging.h>
13#include <base/no_destructor.h>
alanlxl9d26c1c2020-08-21 13:42:36 +100014#include <base/strings/stringprintf.h>
15
16#include "federated/session_manager_proxy.h"
17#include "federated/utils.h"
alanlxl30f15bd2020-08-11 21:26:12 +100018
19namespace federated {
20
alanlxl9d26c1c2020-08-21 13:42:36 +100021void StorageManagerImpl::InitializeSessionManagerProxy(dbus::Bus* bus) {
22 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
23 DCHECK(!session_manager_proxy_)
24 << "session_manager_proxy is already initialized!";
25 DCHECK(bus);
26 session_manager_proxy_ = std::make_unique<SessionManagerProxy>(
27 std::make_unique<org::chromium::SessionManagerInterfaceProxy>(bus));
28
29 session_manager_proxy_->AddObserver(this);
30 // If session already started, connect to database.
31 if (session_manager_proxy_->RetrieveSessionState() == kSessionStartedState) {
32 ConnectToDatabaseIfNecessary();
33 }
34}
35
alanlxl30f15bd2020-08-11 21:26:12 +100036bool StorageManagerImpl::OnExampleReceived(
37 const std::string& client_name, const std::string& serialized_example) {
38 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
alanlxl9d26c1c2020-08-21 13:42:36 +100039 if (!example_database_ || !example_database_->IsOpen()) {
40 VLOG(1) << "No database connection.";
41 return false;
42 }
43
44 ExampleRecord example_record;
45 example_record.client_name = client_name;
46 example_record.serialized_example = serialized_example;
47 example_record.timestamp = base::Time::Now();
48
49 return example_database_->InsertExample(example_record);
alanlxl30f15bd2020-08-11 21:26:12 +100050}
51
52bool StorageManagerImpl::PrepareStreamingForClient(
53 const std::string& client_name) {
54 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
alanlxl9d26c1c2020-08-21 13:42:36 +100055 if (!example_database_ || !example_database_->IsOpen()) {
56 LOG(ERROR) << "No database connection.";
57 return false;
58 }
59 last_seen_example_id_ = 0;
60 streaming_client_name_ = client_name;
61 return example_database_->PrepareStreamingForClient(
62 client_name, kMaxStreamingExampleCount);
63}
64
65bool StorageManagerImpl::GetNextExample(std::string* example,
66 bool* end_of_iterator) {
67 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
68
69 if (!example_database_ || !example_database_->IsOpen()) {
70 VLOG(1) << "No database connection.";
71 return false;
72 }
73 *end_of_iterator = false;
74 auto maybe_example_record = example_database_->GetNextStreamedRecord();
75 if (maybe_example_record == base::nullopt) {
76 *end_of_iterator = true;
77 } else {
78 last_seen_example_id_ = maybe_example_record.value().id;
79 *example = maybe_example_record.value().serialized_example;
80 }
81
alanlxl30f15bd2020-08-11 21:26:12 +100082 return true;
83}
84
alanlxl9d26c1c2020-08-21 13:42:36 +100085bool StorageManagerImpl::CloseStreaming(bool clean_examples) {
alanlxl30f15bd2020-08-11 21:26:12 +100086 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
alanlxl9d26c1c2020-08-21 13:42:36 +100087 if (!example_database_ || !example_database_->IsOpen()) {
88 VLOG(1) << "No database connection!";
89 return true;
90 }
91
92 example_database_->CloseStreaming();
93
94 return !clean_examples ||
95 example_database_->DeleteExamplesWithSmallerIdForClient(
96 streaming_client_name_, last_seen_example_id_);
alanlxl30f15bd2020-08-11 21:26:12 +100097}
98
alanlxl9d26c1c2020-08-21 13:42:36 +100099void StorageManagerImpl::OnSessionStarted() {
alanlxl30f15bd2020-08-11 21:26:12 +1000100 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
alanlxl9d26c1c2020-08-21 13:42:36 +1000101 ConnectToDatabaseIfNecessary();
102}
alanlxl30f15bd2020-08-11 21:26:12 +1000103
alanlxl9d26c1c2020-08-21 13:42:36 +1000104void StorageManagerImpl::OnSessionStopped() {
105 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
106 example_database_.reset();
107}
108
109void StorageManagerImpl::ConnectToDatabaseIfNecessary() {
110 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
111 std::string new_sanitized_username =
112 session_manager_proxy_->GetSanitizedUsername();
113 if (new_sanitized_username.empty()) {
114 VLOG(1) << "Sanitized_username is empty, disconnect the database.";
115 example_database_.reset();
116 return;
117 }
118
119 if (example_database_ && example_database_->IsOpen() &&
120 new_sanitized_username == sanitized_username_) {
121 VLOG(1) << "Database for user " << sanitized_username_
122 << " is already connected, nothing changed.";
123 return;
124 }
125
126 sanitized_username_ = new_sanitized_username;
127 auto db_path = GetDatabasePath(sanitized_username_);
128 // TODO(alanlxl): temp clients for test.
129 example_database_.reset(
130 new ExampleDatabase(db_path, {"analytics_test_population"}));
131
132 if (!example_database_->Init()) {
133 LOG(ERROR) << "Failed to connect to database for user "
134 << sanitized_username_;
135 example_database_.reset();
136 } else if (!example_database_->CheckIntegrity()) {
137 LOG(ERROR) << "Failed to verify the database integrity for user "
138 << sanitized_username_ << ", delete the existing db file.";
139 if (!base::DeleteFile(db_path)) {
140 LOG(ERROR) << "Failed to delete corrupted db file " << db_path.value();
141 }
142 example_database_.reset();
143 }
alanlxl30f15bd2020-08-11 21:26:12 +1000144}
145
146StorageManager* StorageManager::GetInstance() {
147 static base::NoDestructor<StorageManagerImpl> storage_manager;
148 return storage_manager.get();
149}
alanlxl30f15bd2020-08-11 21:26:12 +1000150} // namespace federated