Merge changes I40fe9b79,I4b6f8331

* changes:
  logd: SerializedLogBuffer: never wait for a reader during prune/clear
  logd: always wake 'wrapped' readers on prune
diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp
index 65fedb0..5012d3d 100644
--- a/logd/SerializedLogBuffer.cpp
+++ b/logd/SerializedLogBuffer.cpp
@@ -75,6 +75,10 @@
         return -EINVAL;
     }
 
+    if (len > LOGGER_ENTRY_MAX_PAYLOAD) {
+        len = LOGGER_ENTRY_MAX_PAYLOAD;
+    }
+
     if (!ShouldLog(log_id, msg, len)) {
         stats_->AddTotal(log_id, len);
         return -EACCES;
@@ -135,33 +139,42 @@
     }
 }
 
-bool SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) {
-    // Don't prune logs that are newer than the point at which any reader threads are reading from.
-    LogReaderThread* oldest = nullptr;
+void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) {
     auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
-    for (const auto& reader_thread : reader_list_->reader_threads()) {
-        if (!reader_thread->IsWatching(log_id)) {
-            continue;
-        }
-        if (!oldest || oldest->start() > reader_thread->start() ||
-            (oldest->start() == reader_thread->start() &&
-             reader_thread->deadline().time_since_epoch().count() != 0)) {
-            oldest = reader_thread.get();
-        }
-    }
 
     auto& log_buffer = logs_[log_id];
     auto it = log_buffer.begin();
     while (it != log_buffer.end()) {
-        if (oldest != nullptr && it->highest_sequence_number() >= oldest->start()) {
-            break;
+        for (const auto& reader_thread : reader_list_->reader_threads()) {
+            if (!reader_thread->IsWatching(log_id)) {
+                continue;
+            }
+
+            if (reader_thread->deadline().time_since_epoch().count() != 0) {
+                // Always wake up wrapped readers when pruning.  'Wrapped' readers are an
+                // optimization that allows the reader to wait until logs starting at a specified
+                // time stamp are about to be pruned.  This is error-prone however, since if that
+                // timestamp is about to be pruned, the reader is not likely to read the messages
+                // fast enough to not back-up logd.  Instead, we can achieve a nearly-as-efficient
+                // but not error-prone batching effect by waking the reader whenever any chunk is
+                // about to be pruned.
+                reader_thread->triggerReader_Locked();
+            }
+
+            // Some readers may still be reading from this log chunk; log a warning that they are
+            // about to lose logs.
+            // TODO: We should forcefully disconnect the reader instead, such that the reader itself
+            // has an indication that they've lost logs.
+            if (reader_thread->start() <= it->highest_sequence_number()) {
+                LOG(WARNING) << "Skipping entries from slow reader, " << reader_thread->name()
+                             << ", from LogBuffer::Prune()";
+            }
         }
 
         // Increment ahead of time since we're going to erase this iterator from the list.
         auto it_to_prune = it++;
 
-        // The sequence number check ensures that all readers have read all logs in this chunk, but
-        // they may still hold a reference to the chunk to track their last read log_position.
+        // Readers may have a reference to the chunk to track their last read log_position.
         // Notify them to delete the reference.
         NotifyReadersOfPrune(log_id, it_to_prune);
 
@@ -176,42 +189,11 @@
             RemoveChunkFromStats(log_id, *it_to_prune);
             log_buffer.erase(it_to_prune);
             if (buffer_size >= bytes_to_free) {
-                return true;
+                return;
             }
             bytes_to_free -= buffer_size;
         }
     }
-
-    // If we've deleted all buffers without bytes_to_free hitting 0, then we're called by Clear()
-    // and should return true.
-    if (it == log_buffer.end()) {
-        return true;
-    }
-
-    // Otherwise we are stuck due to a reader, so mitigate it.
-    CHECK(oldest != nullptr);
-    KickReader(oldest, log_id, bytes_to_free);
-    return false;
-}
-
-// If the selected reader is blocking our pruning progress, decide on
-// what kind of mitigation is necessary to unblock the situation.
-void SerializedLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) {
-    if (bytes_to_free >= max_size_[id]) {  // +100%
-        // A misbehaving or slow reader is dropped if we hit too much memory pressure.
-        LOG(WARNING) << "Kicking blocked reader, " << reader->name()
-                     << ", from LogBuffer::kickMe()";
-        reader->release_Locked();
-    } else if (reader->deadline().time_since_epoch().count() != 0) {
-        // Allow a blocked WRAP deadline reader to trigger and start reporting the log data.
-        reader->triggerReader_Locked();
-    } else {
-        // Tell slow reader to skip entries to catch up.
-        unsigned long prune_rows = bytes_to_free / 300;
-        LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name()
-                     << ", from LogBuffer::kickMe()";
-        reader->triggerSkip_Locked(id, prune_rows);
-    }
 }
 
 std::unique_ptr<FlushToState> SerializedLogBuffer::CreateFlushToState(uint64_t start,
@@ -252,12 +234,18 @@
             }
         }
 
+        // We copy the log entry such that we can flush it without the lock.  We never block pruning
+        // waiting for this Flush() to complete.
+        constexpr size_t kMaxEntrySize = sizeof(*entry) + LOGGER_ENTRY_MAX_PAYLOAD + 1;
+        unsigned char entry_copy[kMaxEntrySize] __attribute__((uninitialized));
+        CHECK_LT(entry->msg_len(), LOGGER_ENTRY_MAX_PAYLOAD + 1);
+        memcpy(entry_copy, entry, sizeof(*entry) + entry->msg_len());
         lock.unlock();
-        // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the
-        // `entry` pointer is safe here without the lock
-        if (!entry->Flush(writer, log_id)) {
+
+        if (!reinterpret_cast<SerializedLogEntry*>(entry_copy)->Flush(writer, log_id)) {
             return false;
         }
+
         lock.lock();
     }
 
@@ -266,38 +254,11 @@
 }
 
 bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) {
-    // Try three times to clear, then disconnect the readers and try one final time.
-    for (int retry = 0; retry < 3; ++retry) {
-        {
-            auto lock = std::lock_guard{lock_};
-            bool prune_success = Prune(id, ULONG_MAX, uid);
-            if (prune_success) {
-                return true;
-            }
-        }
-        sleep(1);
-    }
-    // Check if it is still busy after the sleep, we try to prune one entry, not another clear run,
-    // so we are looking for the quick side effect of the return value to tell us if we have a
-    // _blocked_ reader.
-    bool busy = false;
-    {
-        auto lock = std::lock_guard{lock_};
-        busy = !Prune(id, 1, uid);
-    }
-    // It is still busy, disconnect all readers.
-    if (busy) {
-        auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
-        for (const auto& reader_thread : reader_list_->reader_threads()) {
-            if (reader_thread->IsWatching(id)) {
-                LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name()
-                             << ", from LogBuffer::clear()";
-                reader_thread->release_Locked();
-            }
-        }
-    }
     auto lock = std::lock_guard{lock_};
-    return Prune(id, ULONG_MAX, uid);
+    Prune(id, ULONG_MAX, uid);
+
+    // Clearing SerializedLogBuffer never waits for readers and therefore is always successful.
+    return true;
 }
 
 size_t SerializedLogBuffer::GetSizeUsed(log_id_t id) {
diff --git a/logd/SerializedLogBuffer.h b/logd/SerializedLogBuffer.h
index 32fbe5e..294cfe6 100644
--- a/logd/SerializedLogBuffer.h
+++ b/logd/SerializedLogBuffer.h
@@ -55,9 +55,7 @@
   private:
     bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len);
     void MaybePrune(log_id_t log_id) REQUIRES(lock_);
-    bool Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_);
-    void KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free)
-            REQUIRES_SHARED(lock_);
+    void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_);
     void NotifyReadersOfPrune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk)
             REQUIRES(reader_list_->reader_threads_lock());
     void RemoveChunkFromStats(log_id_t log_id, SerializedLogChunk& chunk);