blob: f1238969a1d71805945eec5236b54a610928685f [file] [log] [blame]
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +00001/*
2 * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "data_log.h"
12
13#include <assert.h>
14
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +000015#include <algorithm>
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +000016#include <list>
17
18#include "critical_section_wrapper.h"
19#include "event_wrapper.h"
20#include "file_wrapper.h"
21#include "rw_lock_wrapper.h"
22#include "thread_wrapper.h"
23
24namespace webrtc {
25
26DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
27 CriticalSectionWrapper::CreateCriticalSection());
28
29DataLogImpl* DataLogImpl::instance_ = NULL;
30
31// A Row contains cells, which are indexed by the column names as std::string.
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +000032// The string index is treated in a case sensitive way.
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +000033class Row {
34 public:
35 Row();
36 ~Row();
37
38 // Inserts a Container into the cell of the column specified with
39 // column_name.
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +000040 // column_name is treated in a case sensitive way.
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +000041 int InsertCell(const std::string& column_name,
42 const Container* value_container);
43
44 // Converts the value at the column specified by column_name to a string
45 // stored in value_string.
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +000046 // column_name is treated in a case sensitive way.
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +000047 void ToString(const std::string& column_name, std::string* value_string);
48
49 private:
50 // Collection of containers indexed by column name as std::string
51 typedef std::map<std::string, const Container*> CellMap;
52
53 CellMap cells_;
54 CriticalSectionWrapper* cells_lock_;
55};
56
57// A LogTable contains multiple rows, where only the latest row is active for
58// editing. The rows are defined by the ColumnMap, which contains the name of
59// each column and the length of the column (1 for one-value-columns and greater
60// than 1 for multi-value-columns).
61class LogTable {
62 public:
63 LogTable();
64 ~LogTable();
65
66 // Adds the column with name column_name to the table. The column will be a
67 // multi-value-column if multi_value_length is greater than 1.
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +000068 // column_name is treated in a case sensitive way.
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +000069 int AddColumn(const std::string& column_name, int multi_value_length);
70
71 // Buffers the current row while it is waiting to be written to file,
72 // which is done by a call to Flush(). A new row is available when the
73 // function returns
74 void NextRow();
75
76 // Inserts a Container into the cell of the column specified with
77 // column_name.
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +000078 // column_name is treated in a case sensitive way.
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +000079 int InsertCell(const std::string& column_name,
80 const Container* value_container);
81
82 // Creates a log file, named as specified in the string file_name, to
83 // where the table will be written when calling Flush().
84 int CreateLogFile(const std::string& file_name);
85
86 // Write all complete rows to file.
87 // May not be called by two threads simultaneously (doing so may result in
88 // a race condition). Will be called by the file_writer_thread_ when that
89 // thread is running.
90 void Flush();
91
92 private:
93 // Collection of multi_value_lengths indexed by column name as std::string
94 typedef std::map<std::string, int> ColumnMap;
95 typedef std::list<Row*> RowList;
96
97 ColumnMap columns_;
98 RowList rows_[2];
99 RowList* rows_history_;
100 RowList* rows_flush_;
101 Row* current_row_;
102 FileWrapper* file_;
103 bool write_header_;
104 CriticalSectionWrapper* table_lock_;
105};
106
107Row::Row()
108 : cells_(),
109 cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
110}
111
112Row::~Row() {
113 for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
114 delete it->second;
115 // For maps all iterators (except the erased) are valid after an erase
116 cells_.erase(it++);
117 }
118 delete cells_lock_;
119}
120
121int Row::InsertCell(const std::string& column_name,
122 const Container* value_container) {
henrike@webrtc.orgbfa80ce2011-12-15 17:59:58 +0000123 CriticalSectionScoped synchronize(cells_lock_);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000124 assert(cells_.count(column_name) == 0);
125 if (cells_.count(column_name) > 0)
126 return -1;
127 cells_[column_name] = value_container;
128 return 0;
129}
130
131void Row::ToString(const std::string& column_name,
132 std::string* value_string) {
henrike@webrtc.orgbfa80ce2011-12-15 17:59:58 +0000133 CriticalSectionScoped synchronize(cells_lock_);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000134 const Container* container = cells_[column_name];
135 if (container == NULL) {
136 *value_string = "NaN,";
137 return;
138 }
139 container->ToString(value_string);
140}
141
142LogTable::LogTable()
143 : columns_(),
144 rows_(),
145 rows_history_(&rows_[0]),
146 rows_flush_(&rows_[1]),
147 current_row_(new Row),
148 file_(FileWrapper::Create()),
149 write_header_(true),
150 table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
151}
152
153LogTable::~LogTable() {
154 for (RowList::iterator row_it = rows_history_->begin();
155 row_it != rows_history_->end();) {
156 delete *row_it;
157 row_it = rows_history_->erase(row_it);
158 }
159 for (ColumnMap::iterator col_it = columns_.begin();
160 col_it != columns_.end();) {
161 // For maps all iterators (except the erased) are valid after an erase
162 columns_.erase(col_it++);
163 }
164 if (file_ != NULL) {
165 file_->Flush();
166 file_->CloseFile();
167 delete file_;
168 }
169 delete current_row_;
170 delete table_lock_;
171}
172
173int LogTable::AddColumn(const std::string& column_name,
174 int multi_value_length) {
175 assert(multi_value_length > 0);
176 if (!write_header_) {
177 // It's not allowed to add new columns after the header
178 // has been written.
179 assert(false);
180 return -1;
181 } else {
henrike@webrtc.orgbfa80ce2011-12-15 17:59:58 +0000182 CriticalSectionScoped synchronize(table_lock_);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000183 if (write_header_)
184 columns_[column_name] = multi_value_length;
185 else
186 return -1;
187 }
188 return 0;
189}
190
191void LogTable::NextRow() {
henrike@webrtc.orgbfa80ce2011-12-15 17:59:58 +0000192 CriticalSectionScoped sync_rows(table_lock_);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000193 rows_history_->push_back(current_row_);
194 current_row_ = new Row;
195}
196
197int LogTable::InsertCell(const std::string& column_name,
198 const Container* value_container) {
henrike@webrtc.orgbfa80ce2011-12-15 17:59:58 +0000199 CriticalSectionScoped synchronize(table_lock_);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000200 assert(columns_.count(column_name) > 0);
201 if (columns_.count(column_name) == 0)
202 return -1;
203 return current_row_->InsertCell(column_name, value_container);
204}
205
206int LogTable::CreateLogFile(const std::string& file_name) {
207 if (file_name.length() == 0)
208 return -1;
209 if (file_->Open())
210 return -1;
211 file_->OpenFile(file_name.c_str(),
212 false, // Open with read/write permissions
213 false, // Don't wraparound and write at the beginning when
214 // the file is full
215 true); // Open as a text file
216 if (file_ == NULL)
217 return -1;
218 return 0;
219}
220
221void LogTable::Flush() {
222 ColumnMap::iterator column_it;
223 bool commit_header = false;
224 if (write_header_) {
henrike@webrtc.orgbfa80ce2011-12-15 17:59:58 +0000225 CriticalSectionScoped synchronize(table_lock_);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000226 if (write_header_) {
227 commit_header = true;
228 write_header_ = false;
229 }
230 }
231 if (commit_header) {
232 for (column_it = columns_.begin();
233 column_it != columns_.end(); ++column_it) {
234 if (column_it->second > 1) {
235 file_->WriteText("%s[%u],", column_it->first.c_str(),
236 column_it->second);
237 for (int i = 1; i < column_it->second; ++i)
238 file_->WriteText(",");
239 } else {
240 file_->WriteText("%s,", column_it->first.c_str());
241 }
242 }
243 if (columns_.size() > 0)
244 file_->WriteText("\n");
245 }
246
247 // Swap the list used for flushing with the list containing the row history
248 // and clear the history. We also create a local pointer to the new
249 // list used for flushing to avoid race conditions if another thread
250 // calls this function while we are writing.
251 // We don't want to block the list while we're writing to file.
252 {
henrike@webrtc.orgbfa80ce2011-12-15 17:59:58 +0000253 CriticalSectionScoped synchronize(table_lock_);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000254 RowList* tmp = rows_flush_;
255 rows_flush_ = rows_history_;
256 rows_history_ = tmp;
257 rows_history_->clear();
258 }
259
260 // Write all complete rows to file and delete them
261 for (RowList::iterator row_it = rows_flush_->begin();
262 row_it != rows_flush_->end();) {
263 for (column_it = columns_.begin();
264 column_it != columns_.end(); ++column_it) {
265 std::string row_string;
266 (*row_it)->ToString(column_it->first, &row_string);
267 file_->WriteText("%s", row_string.c_str());
268 }
269 if (columns_.size() > 0)
270 file_->WriteText("\n");
271 delete *row_it;
272 row_it = rows_flush_->erase(row_it);
273 }
274}
275
276int DataLog::CreateLog() {
277 return DataLogImpl::CreateLog();
278}
279
280void DataLog::ReturnLog() {
281 return DataLogImpl::ReturnLog();
282}
283
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +0000284std::string DataLog::Combine(const std::string& table_name, int table_id) {
285 std::stringstream ss;
henrik.lundin@webrtc.org5dedd0e2011-10-18 05:45:08 +0000286 std::string combined_id = table_name;
287 std::string number_suffix;
288 ss << "_" << table_id;
289 ss >> number_suffix;
290 combined_id += number_suffix;
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +0000291 std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
292 ::tolower);
293 return combined_id;
294}
295
296int DataLog::AddTable(const std::string& table_name) {
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000297 DataLogImpl* data_log = DataLogImpl::StaticInstance();
298 if (data_log == NULL)
299 return -1;
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +0000300 return data_log->AddTable(table_name);
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000301}
302
303int DataLog::AddColumn(const std::string& table_name,
304 const std::string& column_name,
305 int multi_value_length) {
306 DataLogImpl* data_log = DataLogImpl::StaticInstance();
307 if (data_log == NULL)
308 return -1;
309 return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
310 column_name,
311 multi_value_length);
312}
313
314int DataLog::NextRow(const std::string& table_name) {
315 DataLogImpl* data_log = DataLogImpl::StaticInstance();
316 if (data_log == NULL)
317 return -1;
318 return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
319}
320
321DataLogImpl::DataLogImpl()
322 : counter_(1),
323 tables_(),
324 flush_event_(EventWrapper::Create()),
325 file_writer_thread_(NULL),
326 tables_lock_(RWLockWrapper::CreateRWLock()) {
327}
328
329DataLogImpl::~DataLogImpl() {
330 StopThread();
331 Flush(); // Write any remaining rows
332 delete file_writer_thread_;
333 delete flush_event_;
334 for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
335 delete static_cast<LogTable*>(it->second);
336 // For maps all iterators (except the erased) are valid after an erase
337 tables_.erase(it++);
338 }
339 delete tables_lock_;
340}
341
342int DataLogImpl::CreateLog() {
andrew@webrtc.org04f5cba2011-12-15 21:33:11 +0000343 CriticalSectionScoped synchronize(crit_sect_.get());
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000344 if (instance_ == NULL) {
345 instance_ = new DataLogImpl();
346 return instance_->Init();
347 } else {
348 ++instance_->counter_;
349 }
350 return 0;
351}
352
353int DataLogImpl::Init() {
354 file_writer_thread_ = ThreadWrapper::CreateThread(
355 DataLogImpl::Run,
356 instance_,
357 kHighestPriority,
358 "DataLog");
359 if (file_writer_thread_ == NULL)
360 return -1;
361 unsigned int thread_id = 0;
362 bool success = file_writer_thread_->Start(thread_id);
363 if (!success)
364 return -1;
365 return 0;
366}
367
368DataLogImpl* DataLogImpl::StaticInstance() {
369 return instance_;
370}
371
372void DataLogImpl::ReturnLog() {
andrew@webrtc.org04f5cba2011-12-15 21:33:11 +0000373 CriticalSectionScoped synchronize(crit_sect_.get());
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000374 if (instance_ && instance_->counter_ > 1) {
375 --instance_->counter_;
376 return;
377 }
378 delete instance_;
379 instance_ = NULL;
380}
381
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +0000382int DataLogImpl::AddTable(const std::string& table_name) {
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000383 WriteLockScoped synchronize(*tables_lock_);
384 // Make sure we don't add a table which already exists
385 if (tables_.count(table_name) > 0)
386 return -1;
387 tables_[table_name] = new LogTable();
stefan@webrtc.org3bbe41a2011-09-07 15:31:03 +0000388 if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
stefan@webrtc.orgc9cff242011-08-29 07:39:02 +0000389 return -1;
390 return 0;
391}
392
393int DataLogImpl::AddColumn(const std::string& table_name,
394 const std::string& column_name,
395 int multi_value_length) {
396 ReadLockScoped synchronize(*tables_lock_);
397 if (tables_.count(table_name) == 0)
398 return -1;
399 return tables_[table_name]->AddColumn(column_name, multi_value_length);
400}
401
402int DataLogImpl::InsertCell(const std::string& table_name,
403 const std::string& column_name,
404 const Container* value_container) {
405 ReadLockScoped synchronize(*tables_lock_);
406 assert(tables_.count(table_name) > 0);
407 if (tables_.count(table_name) == 0)
408 return -1;
409 return tables_[table_name]->InsertCell(column_name, value_container);
410}
411
412int DataLogImpl::NextRow(const std::string& table_name) {
413 ReadLockScoped synchronize(*tables_lock_);
414 if (tables_.count(table_name) == 0)
415 return -1;
416 tables_[table_name]->NextRow();
417 if (file_writer_thread_ == NULL) {
418 // Write every row to file as they get complete.
419 tables_[table_name]->Flush();
420 } else {
421 // Signal a complete row
422 flush_event_->Set();
423 }
424 return 0;
425}
426
427void DataLogImpl::Flush() {
428 ReadLockScoped synchronize(*tables_lock_);
429 for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
430 it->second->Flush();
431 }
432}
433
434bool DataLogImpl::Run(void* obj) {
435 static_cast<DataLogImpl*>(obj)->Process();
436 return true;
437}
438
439void DataLogImpl::Process() {
440 // Wait for a row to be complete
441 flush_event_->Wait(WEBRTC_EVENT_INFINITE);
442 Flush();
443}
444
445void DataLogImpl::StopThread() {
446 if (file_writer_thread_ != NULL) {
447 file_writer_thread_->SetNotAlive();
448 flush_event_->Set();
449 // Call Stop() repeatedly, waiting for the Flush() call in Process() to
450 // finish.
451 while (!file_writer_thread_->Stop()) continue;
452 }
453}
454
455} // namespace webrtc