logd: remove min heap in SerializedFlushToState

There was a bug in SerializedFlushToState::Prune() caused by an access
to a SerializedLogEntry raw pointer as a member of a MinHeapElement,
which was deleted earlier in the function.

Instead of just fixing the order of the access and the deletion, I
sought out to remove the raw pointer entirely.  In doing so, I noticed
that the min heap doesn't provide significant benefit, since we'll
only ever have 8 log buffers so scalability is not an issue.

Therefore this change removes the min heap entirely and uses the
existing log_position_ and logs_needed_from_next_position_ members to
keep track of which are the next unread logs.

It also adds a smoke test for SerializedFlushToState::Prune() and
additional CHECK() statements to help prevent future errors.

Bug: 168869299
Test: unit tests
Change-Id: Id4d5fdbaff2fe6dc49c38f01e73f900f84d3696b
diff --git a/logd/SerializedFlushToState.cpp b/logd/SerializedFlushToState.cpp
index b02ccc3..378cf20 100644
--- a/logd/SerializedFlushToState.cpp
+++ b/logd/SerializedFlushToState.cpp
@@ -16,6 +16,8 @@
 
 #include "SerializedFlushToState.h"
 
+#include <limits>
+
 #include <android-base/logging.h>
 
 SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask)
@@ -63,14 +65,13 @@
     log_positions_[log_id].emplace(log_position);
 }
 
-void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) {
+void SerializedFlushToState::UpdateLogsNeeded(log_id_t log_id) {
     auto& buffer_it = log_positions_[log_id]->buffer_it;
     auto read_offset = log_positions_[log_id]->read_offset;
 
-    // If there is another log to read in this buffer, add it to the min heap.
+    // If there is another log to read in this buffer, let it be read.
     if (read_offset < buffer_it->write_offset()) {
-        auto* entry = buffer_it->log_entry(read_offset);
-        min_heap_.emplace(log_id, entry);
+        logs_needed_from_next_position_[log_id] = false;
     } else if (read_offset == buffer_it->write_offset()) {
         // If there are no more logs to read in this buffer and it's the last buffer, then
         // set logs_needed_from_next_position_ to wait until more logs get logged.
@@ -85,8 +86,7 @@
             if (buffer_it->write_offset() == 0) {
                 logs_needed_from_next_position_[log_id] = true;
             } else {
-                auto* entry = buffer_it->log_entry(0);
-                min_heap_.emplace(log_id, entry);
+                logs_needed_from_next_position_[log_id] = false;
             }
         }
     } else {
@@ -106,24 +106,41 @@
             }
             CreateLogPosition(i);
         }
-        logs_needed_from_next_position_[i] = false;
-        // If it wasn't possible to insert, logs_needed_from_next_position will be set back to true.
-        AddMinHeapEntry(i);
+        UpdateLogsNeeded(i);
     }
 }
 
-MinHeapElement SerializedFlushToState::PopNextUnreadLog() {
-    auto top = min_heap_.top();
-    min_heap_.pop();
+bool SerializedFlushToState::HasUnreadLogs() {
+    CheckForNewLogs();
+    log_id_for_each(i) {
+        if (log_positions_[i] && !logs_needed_from_next_position_[i]) {
+            return true;
+        }
+    }
+    return false;
+}
 
-    auto* entry = top.entry;
-    auto log_id = top.log_id;
+LogWithId SerializedFlushToState::PopNextUnreadLog() {
+    uint64_t min_sequence = std::numeric_limits<uint64_t>::max();
+    log_id_t log_id;
+    const SerializedLogEntry* entry = nullptr;
+    log_id_for_each(i) {
+        if (!log_positions_[i] || logs_needed_from_next_position_[i]) {
+            continue;
+        }
+        if (log_positions_[i]->log_entry()->sequence() < min_sequence) {
+            log_id = i;
+            entry = log_positions_[i]->log_entry();
+            min_sequence = entry->sequence();
+        }
+    }
+    CHECK_NE(nullptr, entry);
 
     log_positions_[log_id]->read_offset += entry->total_len();
 
     logs_needed_from_next_position_[log_id] = true;
 
-    return top;
+    return {log_id, entry};
 }
 
 void SerializedFlushToState::Prune(log_id_t log_id,
@@ -133,25 +150,12 @@
         return;
     }
 
-    // // Decrease the ref count since we're deleting our reference.
+    // Decrease the ref count since we're deleting our reference.
     buffer_it->DecReaderRefCount();
 
     // Delete in the reference.
     log_positions_[log_id].reset();
 
-    // Remove the MinHeapElement referencing log_id, if it exists, but retain the others.
-    std::vector<MinHeapElement> old_elements;
-    while (!min_heap_.empty()) {
-        auto& element = min_heap_.top();
-        if (element.log_id != log_id) {
-            old_elements.emplace_back(element);
-        }
-        min_heap_.pop();
-    }
-    for (auto&& element : old_elements) {
-        min_heap_.emplace(element);
-    }
-
     // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the
     // log_position_ object during the next read.
     logs_needed_from_next_position_[log_id] = true;
diff --git a/logd/SerializedFlushToState.h b/logd/SerializedFlushToState.h
index 0b20822..c953a16 100644
--- a/logd/SerializedFlushToState.h
+++ b/logd/SerializedFlushToState.h
@@ -27,26 +27,19 @@
 struct LogPosition {
     std::list<SerializedLogChunk>::iterator buffer_it;
     int read_offset;
+
+    const SerializedLogEntry* log_entry() const { return buffer_it->log_entry(read_offset); }
 };
 
-struct MinHeapElement {
-    MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry)
-        : log_id(log_id), entry(entry) {}
+struct LogWithId {
     log_id_t log_id;
     const SerializedLogEntry* entry;
-    // The change of comparison operators is intentional, std::priority_queue uses operator<() to
-    // compare but creates a max heap.  Since we want a min heap, we return the opposite result.
-    bool operator<(const MinHeapElement& rhs) const {
-        return entry->sequence() > rhs.entry->sequence();
-    }
 };
 
 // This class tracks the specific point where a FlushTo client has read through the logs.  It
 // directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset
 // into each log chunk where it has last read.  All interactions with this class, except for its
-// construction, must be done with SerializedLogBuffer::lock_ held.  No log chunks that it
-// references may be pruned, which is handled by ensuring prune does not touch any log chunk with
-// highest sequence number greater or equal to start().
+// construction, must be done with SerializedLogBuffer::lock_ held.
 class SerializedFlushToState : public FlushToState {
   public:
     // Initializes this state object.  For each log buffer set in log_mask, this sets
@@ -61,31 +54,29 @@
         if (logs_ == nullptr) logs_ = logs;
     }
 
-    bool HasUnreadLogs() {
-        CheckForNewLogs();
-        return !min_heap_.empty();
-    }
+    // Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if
+    // there are any unread logs, false otherwise.
+    bool HasUnreadLogs();
 
-    // Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to
-    // indicate that we're waiting for more logs from the associated log buffer.
-    MinHeapElement PopNextUnreadLog();
+    // Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're
+    // waiting for more logs from the associated log buffer.
+    LogWithId PopNextUnreadLog();
 
     // If the parent log buffer prunes logs, the reference that this class contains may become
     // invalid, so this must be called first to drop the reference to buffer_it, if any.
     void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);
 
   private:
-    // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the
-    // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're
-    // waiting for the next log.
-    void AddMinHeapEntry(log_id_t log_id);
+    // Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread
+    // log or to the point at which the next log will appear.
+    void UpdateLogsNeeded(log_id_t log_id);
 
     // Create a LogPosition object for the given log_id by searching through the log chunks for the
     // first chunk and then first log entry within that chunk that is greater or equal to start().
     void CreateLogPosition(log_id_t log_id);
 
     // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
-    // calls AddMinHeapEntry() if so.
+    // calls UpdateLogsNeeded() if so.
     void CheckForNewLogs();
 
     std::list<SerializedLogChunk>* logs_ = nullptr;
@@ -97,7 +88,4 @@
     // next_log_position == logs_write_position_)`.  These will be re-checked in each
     // loop in case new logs came in.
     std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
-    // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next
-    // element that this reader should read.
-    std::priority_queue<MinHeapElement> min_heap_;
 };
diff --git a/logd/SerializedFlushToStateTest.cpp b/logd/SerializedFlushToStateTest.cpp
index f4515c8..88f4052 100644
--- a/logd/SerializedFlushToStateTest.cpp
+++ b/logd/SerializedFlushToStateTest.cpp
@@ -287,4 +287,21 @@
     EXPECT_EQ(second_chunk->reader_ref_count(), 1U);
 
     EXPECT_FALSE(state.HasUnreadLogs());
-}
\ No newline at end of file
+}
+
+TEST(SerializedFlushToState, Prune) {
+    auto chunk = SerializedLogChunk{kChunkSize};
+    chunk.Log(1, log_time(), 0, 1, 1, "abc", 3);
+    chunk.Log(2, log_time(), 0, 1, 1, "abc", 3);
+    chunk.Log(3, log_time(), 0, 1, 1, "abc", 3);
+    chunk.FinishWriting();
+
+    std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX];
+    log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk));
+
+    auto state = SerializedFlushToState{1, kLogMaskAll};
+    state.InitializeLogs(log_chunks);
+    ASSERT_TRUE(state.HasUnreadLogs());
+
+    state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin());
+}
diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp
index 5012d3d..acd093b 100644
--- a/logd/SerializedLogBuffer.cpp
+++ b/logd/SerializedLogBuffer.cpp
@@ -211,7 +211,7 @@
     state.InitializeLogs(logs_);
 
     while (state.HasUnreadLogs()) {
-        MinHeapElement top = state.PopNextUnreadLog();
+        LogWithId top = state.PopNextUnreadLog();
         auto* entry = top.entry;
         auto log_id = top.log_id;
 
diff --git a/logd/SerializedLogChunk.h b/logd/SerializedLogChunk.h
index 0991eac..645433d 100644
--- a/logd/SerializedLogChunk.h
+++ b/logd/SerializedLogChunk.h
@@ -18,6 +18,8 @@
 
 #include <sys/types.h>
 
+#include <android-base/logging.h>
+
 #include "LogWriter.h"
 #include "SerializedData.h"
 #include "SerializedLogEntry.h"
@@ -55,6 +57,7 @@
     }
 
     const SerializedLogEntry* log_entry(int offset) const {
+        CHECK(writer_active_ || reader_ref_count_ > 0);
         return reinterpret_cast<const SerializedLogEntry*>(data() + offset);
     }
     const uint8_t* data() const { return contents_.data(); }