logd: track SerializedFlushToState instances within SerializedLogChunk

Previously, NotifyReadersOfPrune() would iterate through the list of
reader_threads_ to find SerializedFlushToState instances, but that
misses some instances, which leads to violating important CHECK()'s.

This change 'attaches' a SerializedFlushToState instance to a
SerializedLogChunk instance when it begins reading from it and
'detaches' it when it has finished with it.  This allows a new
SerializedLogChunk::NotifyReadersOfPrune() function to notify exactly
those SerializedFlushToState instances that are currently 'attached'
to it.

Bug: 169736426
Bug: 172279637
Test: logging unit tests
Change-Id: I0ab4979c8fe2b84a0e23fc5ea5fb6dd54fadc691
diff --git a/logd/SerializedFlushToState.cpp b/logd/SerializedFlushToState.cpp
index 30276ff..a514772 100644
--- a/logd/SerializedFlushToState.cpp
+++ b/logd/SerializedFlushToState.cpp
@@ -34,7 +34,7 @@
 SerializedFlushToState::~SerializedFlushToState() {
     log_id_for_each(i) {
         if (log_positions_[i]) {
-            log_positions_[i]->buffer_it->DecReaderRefCount();
+            log_positions_[i]->buffer_it->DetachReader(this);
         }
     }
 }
@@ -49,7 +49,7 @@
     if (it == logs_[log_id].end()) {
         --it;
     }
-    it->IncReaderRefCount();
+    it->AttachReader(this);
     log_position.buffer_it = it;
 
     // Find the offset of the first log with sequence number >= start().
@@ -80,9 +80,9 @@
             logs_needed_from_next_position_[log_id] = true;
         } else {
             // Otherwise, if there is another buffer piece, move to that and do the same check.
-            buffer_it->DecReaderRefCount();
+            buffer_it->DetachReader(this);
             ++buffer_it;
-            buffer_it->IncReaderRefCount();
+            buffer_it->AttachReader(this);
             log_positions_[log_id]->read_offset = 0;
             if (buffer_it->write_offset() == 0) {
                 logs_needed_from_next_position_[log_id] = true;
@@ -145,15 +145,11 @@
     return {log_id, entry};
 }
 
-void SerializedFlushToState::Prune(log_id_t log_id,
-                                   const std::list<SerializedLogChunk>::iterator& buffer_it) {
-    // If we don't have a position for this log or if we're not referencing buffer_it, ignore.
-    if (!log_positions_[log_id].has_value() || log_positions_[log_id]->buffer_it != buffer_it) {
-        return;
-    }
+void SerializedFlushToState::Prune(log_id_t log_id) {
+    CHECK(log_positions_[log_id].has_value());
 
     // Decrease the ref count since we're deleting our reference.
-    buffer_it->DecReaderRefCount();
+    log_positions_[log_id]->buffer_it->DetachReader(this);
 
     // Delete in the reference.
     log_positions_[log_id].reset();
diff --git a/logd/SerializedFlushToState.h b/logd/SerializedFlushToState.h
index 1a69f7b..8a0d0c5 100644
--- a/logd/SerializedFlushToState.h
+++ b/logd/SerializedFlushToState.h
@@ -61,8 +61,7 @@
 
     // If the parent log buffer prunes logs, the reference that this class contains may become
     // invalid, so this must be called first to drop the reference to buffer_it, if any.
-    void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it)
-            REQUIRES(logd_lock);
+    void Prune(log_id_t log_id) REQUIRES(logd_lock);
 
   private:
     // Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread
diff --git a/logd/SerializedFlushToStateTest.cpp b/logd/SerializedFlushToStateTest.cpp
index f41de37..12904cb 100644
--- a/logd/SerializedFlushToStateTest.cpp
+++ b/logd/SerializedFlushToStateTest.cpp
@@ -306,5 +306,5 @@
     auto state = SerializedFlushToState{1, kLogMaskAll, log_chunks};
     ASSERT_TRUE(state.HasUnreadLogs());
 
-    state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin());
+    state.Prune(LOG_ID_MAIN);
 }
diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp
index 09e44d9..5f1a77b 100644
--- a/logd/SerializedLogBuffer.cpp
+++ b/logd/SerializedLogBuffer.cpp
@@ -137,14 +137,6 @@
     chunk.DecReaderRefCount();
 }
 
-void SerializedLogBuffer::NotifyReadersOfPrune(
-        log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk) {
-    for (const auto& reader_thread : reader_list_->reader_threads()) {
-        auto& state = reinterpret_cast<SerializedFlushToState&>(reader_thread->flush_to_state());
-        state.Prune(log_id, chunk);
-    }
-}
-
 void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) {
     auto& log_buffer = logs_[log_id];
     auto it = log_buffer.begin();
@@ -180,7 +172,7 @@
 
         // Readers may have a reference to the chunk to track their last read log_position.
         // Notify them to delete the reference.
-        NotifyReadersOfPrune(log_id, it_to_prune);
+        it_to_prune->NotifyReadersOfPrune(log_id);
 
         if (uid != 0) {
             // Reorder the log buffer to remove logs from the given UID.  If there are no logs left
diff --git a/logd/SerializedLogBuffer.h b/logd/SerializedLogBuffer.h
index 2e57e73..1038571 100644
--- a/logd/SerializedLogBuffer.h
+++ b/logd/SerializedLogBuffer.h
@@ -58,8 +58,6 @@
     bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len);
     void MaybePrune(log_id_t log_id) REQUIRES(logd_lock);
     void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(logd_lock);
-    void NotifyReadersOfPrune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk)
-            REQUIRES(logd_lock);
     void RemoveChunkFromStats(log_id_t log_id, SerializedLogChunk& chunk);
     size_t GetSizeUsed(log_id_t id) REQUIRES(logd_lock);
 
diff --git a/logd/SerializedLogChunk.cpp b/logd/SerializedLogChunk.cpp
index 1ffe7a8..a512bf3 100644
--- a/logd/SerializedLogChunk.cpp
+++ b/logd/SerializedLogChunk.cpp
@@ -19,6 +19,7 @@
 #include <android-base/logging.h>
 
 #include "CompressionEngine.h"
+#include "SerializedFlushToState.h"
 
 SerializedLogChunk::~SerializedLogChunk() {
     CHECK_EQ(reader_ref_count_, 0U);
@@ -52,6 +53,26 @@
     }
 }
 
+void SerializedLogChunk::AttachReader(SerializedFlushToState* reader) {
+    readers_.emplace_back(reader);
+    IncReaderRefCount();
+}
+
+void SerializedLogChunk::DetachReader(SerializedFlushToState* reader) {
+    auto it = std::find(readers_.begin(), readers_.end(), reader);
+    CHECK(readers_.end() != it);
+    readers_.erase(it);
+    DecReaderRefCount();
+}
+
+void SerializedLogChunk::NotifyReadersOfPrune(log_id_t log_id) {
+    // Readers will call DetachReader() in their Prune() call, so we make a copy of the list first.
+    auto readers = readers_;
+    for (auto& reader : readers) {
+        reader->Prune(log_id);
+    }
+}
+
 bool SerializedLogChunk::ClearUidLogs(uid_t uid, log_id_t log_id, LogStatistics* stats) {
     CHECK_EQ(reader_ref_count_, 0U);
     if (write_offset_ == 0) {
diff --git a/logd/SerializedLogChunk.h b/logd/SerializedLogChunk.h
index 645433d..2318a2d 100644
--- a/logd/SerializedLogChunk.h
+++ b/logd/SerializedLogChunk.h
@@ -18,12 +18,17 @@
 
 #include <sys/types.h>
 
+#include <vector>
+
 #include <android-base/logging.h>
 
 #include "LogWriter.h"
+#include "LogdLock.h"
 #include "SerializedData.h"
 #include "SerializedLogEntry.h"
 
+class SerializedFlushToState;
+
 class SerializedLogChunk {
   public:
     explicit SerializedLogChunk(size_t size) : contents_(size) {}
@@ -33,6 +38,10 @@
     void Compress();
     void IncReaderRefCount();
     void DecReaderRefCount();
+    void AttachReader(SerializedFlushToState* reader);
+    void DetachReader(SerializedFlushToState* reader);
+
+    void NotifyReadersOfPrune(log_id_t log_id) REQUIRES(logd_lock);
 
     // Must have no readers referencing this.  Return true if there are no logs left in this chunk.
     bool ClearUidLogs(uid_t uid, log_id_t log_id, LogStatistics* stats);
@@ -76,4 +85,5 @@
     bool writer_active_ = true;
     uint64_t highest_sequence_number_ = 1;
     SerializedData compressed_log_;
+    std::vector<SerializedFlushToState*> readers_;
 };