logd: rename LogTimes -> LogReaderThread
LogTimes has evolved from being simply a store of the last timestamp
that each reader has read to being a class representing an individual
reader thread, including the thread function, so name it
appropriately.
Test: logging unit tests
Change-Id: I6914824376a6ff1f7509e657fa4dc044ead62954
diff --git a/logd/LogReaderThread.cpp b/logd/LogReaderThread.cpp
new file mode 100644
index 0000000..f1452a6
--- /dev/null
+++ b/logd/LogReaderThread.cpp
@@ -0,0 +1,252 @@
+/*
+ * Copyright (C) 2014 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LogReaderThread.h"
+
+#include <errno.h>
+#include <string.h>
+#include <sys/prctl.h>
+
+#include "LogBuffer.h"
+#include "LogReader.h"
+
+pthread_mutex_t LogReaderThread::timesLock = PTHREAD_MUTEX_INITIALIZER;
+
+LogReaderThread::LogReaderThread(LogReader& reader, SocketClient* client, bool non_block,
+ unsigned long tail, log_mask_t log_mask, pid_t pid,
+ log_time start_time, uint64_t start, uint64_t timeout,
+ bool privileged, bool can_read_security_logs)
+ : leadingDropped(false),
+ mReader(reader),
+ mLogMask(log_mask),
+ mPid(pid),
+ mCount(0),
+ mTail(tail),
+ mIndex(0),
+ mClient(client),
+ mStartTime(start_time),
+ mStart(start),
+ mNonBlock(non_block),
+ privileged_(privileged),
+ can_read_security_logs_(can_read_security_logs) {
+ mTimeout.tv_sec = timeout / NS_PER_SEC;
+ mTimeout.tv_nsec = timeout % NS_PER_SEC;
+ memset(mLastTid, 0, sizeof(mLastTid));
+ pthread_cond_init(&threadTriggeredCondition, nullptr);
+ cleanSkip_Locked();
+}
+
+bool LogReaderThread::startReader_Locked() {
+ pthread_attr_t attr;
+
+ if (!pthread_attr_init(&attr)) {
+ if (!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) {
+ if (!pthread_create(&mThread, &attr, LogReaderThread::threadStart, this)) {
+ pthread_attr_destroy(&attr);
+ return true;
+ }
+ }
+ pthread_attr_destroy(&attr);
+ }
+
+ return false;
+}
+
+void* LogReaderThread::threadStart(void* obj) {
+ prctl(PR_SET_NAME, "logd.reader.per");
+
+ LogReaderThread* me = reinterpret_cast<LogReaderThread*>(obj);
+
+ SocketClient* client = me->mClient;
+
+ LogBuffer& logbuf = me->mReader.logbuf();
+
+ me->leadingDropped = true;
+
+ wrlock();
+
+ uint64_t start = me->mStart;
+
+ while (!me->mRelease) {
+ if (me->mTimeout.tv_sec || me->mTimeout.tv_nsec) {
+ if (pthread_cond_clockwait(&me->threadTriggeredCondition, ×Lock, CLOCK_MONOTONIC,
+ &me->mTimeout) == ETIMEDOUT) {
+ me->mTimeout.tv_sec = 0;
+ me->mTimeout.tv_nsec = 0;
+ }
+ if (me->mRelease) {
+ break;
+ }
+ }
+
+ unlock();
+
+ if (me->mTail) {
+ logbuf.flushTo(client, start, nullptr, me->privileged_, me->can_read_security_logs_,
+ FilterFirstPass, me);
+ me->leadingDropped = true;
+ }
+ start = logbuf.flushTo(client, start, me->mLastTid, me->privileged_,
+ me->can_read_security_logs_, FilterSecondPass, me);
+
+ // We only ignore entries before the original start time for the first flushTo(), if we
+ // get entries after this first flush before the original start time, then the client
+ // wouldn't have seen them.
+ // Note: this is still racy and may skip out of order events that came in since the last
+ // time the client disconnected and then reconnected with the new start time. The long term
+ // solution here is that clients must request events since a specific sequence number.
+ me->mStartTime.tv_sec = 0;
+ me->mStartTime.tv_nsec = 0;
+
+ wrlock();
+
+ if (start == LogBufferElement::FLUSH_ERROR) {
+ break;
+ }
+
+ me->mStart = start + 1;
+
+ if (me->mNonBlock || me->mRelease) {
+ break;
+ }
+
+ me->cleanSkip_Locked();
+
+ if (!me->mTimeout.tv_sec && !me->mTimeout.tv_nsec) {
+ pthread_cond_wait(&me->threadTriggeredCondition, ×Lock);
+ }
+ }
+
+ LogReader& reader = me->mReader;
+ reader.release(client);
+
+ client->decRef();
+
+ LastLogTimes& times = reader.logbuf().mTimes;
+ auto it = std::find_if(times.begin(), times.end(),
+ [&me](const auto& other) { return other.get() == me; });
+
+ if (it != times.end()) {
+ times.erase(it);
+ }
+
+ unlock();
+
+ return nullptr;
+}
+
+// A first pass to count the number of elements
+int LogReaderThread::FilterFirstPass(const LogBufferElement* element, void* obj) {
+ LogReaderThread* me = reinterpret_cast<LogReaderThread*>(obj);
+
+ LogReaderThread::wrlock();
+
+ if (me->leadingDropped) {
+ if (element->getDropped()) {
+ LogReaderThread::unlock();
+ return false;
+ }
+ me->leadingDropped = false;
+ }
+
+ if (me->mCount == 0) {
+ me->mStart = element->getSequence();
+ }
+
+ if ((!me->mPid || me->mPid == element->getPid()) && me->isWatching(element->getLogId()) &&
+ (me->mStartTime == log_time::EPOCH || me->mStartTime <= element->getRealTime())) {
+ ++me->mCount;
+ }
+
+ LogReaderThread::unlock();
+
+ return false;
+}
+
+// A second pass to send the selected elements
+int LogReaderThread::FilterSecondPass(const LogBufferElement* element, void* obj) {
+ LogReaderThread* me = reinterpret_cast<LogReaderThread*>(obj);
+
+ LogReaderThread::wrlock();
+
+ me->mStart = element->getSequence();
+
+ if (me->skipAhead[element->getLogId()]) {
+ me->skipAhead[element->getLogId()]--;
+ goto skip;
+ }
+
+ if (me->leadingDropped) {
+ if (element->getDropped()) {
+ goto skip;
+ }
+ me->leadingDropped = false;
+ }
+
+ // Truncate to close race between first and second pass
+ if (me->mNonBlock && me->mTail && (me->mIndex >= me->mCount)) {
+ goto stop;
+ }
+
+ if (!me->isWatching(element->getLogId())) {
+ goto skip;
+ }
+
+ if (me->mPid && (me->mPid != element->getPid())) {
+ goto skip;
+ }
+
+ if (me->mStartTime != log_time::EPOCH && element->getRealTime() <= me->mStartTime) {
+ goto skip;
+ }
+
+ if (me->mRelease) {
+ goto stop;
+ }
+
+ if (!me->mTail) {
+ goto ok;
+ }
+
+ ++me->mIndex;
+
+ if ((me->mCount > me->mTail) && (me->mIndex <= (me->mCount - me->mTail))) {
+ goto skip;
+ }
+
+ if (!me->mNonBlock) {
+ me->mTail = 0;
+ }
+
+ok:
+ if (!me->skipAhead[element->getLogId()]) {
+ LogReaderThread::unlock();
+ return true;
+ }
+ // FALLTHRU
+
+skip:
+ LogReaderThread::unlock();
+ return false;
+
+stop:
+ LogReaderThread::unlock();
+ return -1;
+}
+
+void LogReaderThread::cleanSkip_Locked(void) {
+ memset(skipAhead, 0, sizeof(skipAhead));
+}