logd: yield in FlushTo() if a write is pending

In bug 164973960, we see a situation where a log reader thread spends
over a second sending logs to a new client, preventing the log writer
thread from handling new logs.  This eventually causes the logging
socket to back up and log writes to block until the reader thread
finishes.

logd.reader.per threads may hold the logd_lock for a significant
amount of time.  For example, when doing a 'tail' operation,
SerializedLogBuffer::FlushTo() will loop through every log message in
the log buffer with the lock held.  This can mean iterating over
100,000+ logs and performing multiple compressions and
decompressions.
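
A rough sketch of that pre-change shape (simplified; in the real
code the lock is taken by the caller of FlushTo(), and the filter
and decompression details are omitted here):

    // Simplified: logd_lock is held by the caller and stays held
    // across every iteration, so a writer blocked in
    // SerializedLogBuffer::Log() cannot make progress until the
    // entire flush finishes.
    while (state.HasUnreadLogs()) {
        LogWithId top = state.PopNextUnreadLog();
        // ... decompress the containing chunk if needed, run the
        // filter, and send the entry to the reader's socket ...
    }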

This change adds a pending_write_ variable that
SerializedLogBuffer::Log() sets before attempting to acquire
logd_lock.  On each iteration of the loop in
SerializedLogBuffer::FlushTo(), if this variable is set, we release
logd_lock and wait on the variable before attempting to reacquire
it.  This ensures that logd.reader.per threads do not block log
writer threads.
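
For context, FutexWait() and FutexWake() are assumed here to be thin
wrappers over the futex(2) syscall operating on a 32-bit word; a
minimal sketch of helpers with that shape (the std::atomic<uint32_t>
flag type and the exact signatures are illustrative, not necessarily
what logd uses):

    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    #include <atomic>
    #include <climits>

    // Sleep while *addr still holds 'expected'.  Spurious wakeups
    // are fine: FlushTo() re-checks pending_write_ under logd_lock
    // after waking up.
    static void FutexWait(std::atomic<uint32_t>* addr,
                          uint32_t expected) {
        syscall(SYS_futex, addr, FUTEX_WAIT, expected, nullptr,
                nullptr, 0);
    }

    // Wake every thread currently waiting on *addr.
    static void FutexWake(std::atomic<uint32_t>* addr) {
        syscall(SYS_futex, addr, FUTEX_WAKE, INT_MAX, nullptr,
                nullptr, 0);
    }

The writer side then only needs to clear the flag and call
FutexWake() once it has released logd_lock, as the first hunk below
does; FUTEX_WAIT_PRIVATE/FUTEX_WAKE_PRIVATE would also work since
the flag is process-local.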

Bug: 164973960
Test: logging unit tests
Test: view chromium-trace output showing logd.writer getting cycles
      while a logd.reader.per thread is running a 'tail' operation.
Change-Id: I940d14bb7e8cb2313a96b2a16cdea918ade8a37f
diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp
index aa80864..09e44d9 100644
--- a/logd/SerializedLogBuffer.cpp
+++ b/logd/SerializedLogBuffer.cpp
@@ -86,24 +86,30 @@
 
     auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed);
 
-    auto lock = std::lock_guard{logd_lock};
+    pending_write_ = true;
+    {
+        auto lock = std::lock_guard{logd_lock};
 
-    if (logs_[log_id].empty()) {
-        logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));
+        if (logs_[log_id].empty()) {
+            logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));
+        }
+
+        auto total_len = sizeof(SerializedLogEntry) + len;
+        if (!logs_[log_id].back().CanLog(total_len)) {
+            logs_[log_id].back().FinishWriting();
+            logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));
+        }
+
+        auto entry = logs_[log_id].back().Log(sequence, realtime, uid, pid, tid, msg, len);
+        stats_->Add(entry->ToLogStatisticsElement(log_id));
+
+        MaybePrune(log_id);
+
+        reader_list_->NotifyNewLog(1 << log_id);
     }
+    pending_write_ = false;
+    FutexWake(&pending_write_);
 
-    auto total_len = sizeof(SerializedLogEntry) + len;
-    if (!logs_[log_id].back().CanLog(total_len)) {
-        logs_[log_id].back().FinishWriting();
-        logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));
-    }
-
-    auto entry = logs_[log_id].back().Log(sequence, realtime, uid, pid, tid, msg, len);
-    stats_->Add(entry->ToLogStatisticsElement(log_id));
-
-    MaybePrune(log_id);
-
-    reader_list_->NotifyNewLog(1 << log_id);
     return len;
 }
 
@@ -205,7 +211,16 @@
                                          log_time realtime)>& filter) {
     auto& state = reinterpret_cast<SerializedFlushToState&>(abstract_state);
 
-    while (state.HasUnreadLogs()) {
+    while (true) {
+        if (pending_write_) {
+            logd_lock.unlock();
+            FutexWait(&pending_write_, true);
+            logd_lock.lock();
+        }
+
+        if (!state.HasUnreadLogs()) {
+            break;
+        }
         LogWithId top = state.PopNextUnreadLog();
         auto* entry = top.entry;
         auto log_id = top.log_id;