Implement metrics 'summarizers'

The summarizers collect similar records and bunch them together to save
space. So a string of extractor operations that crack the same audio format
will be combined into a single record with a new "count=#" attribute.

Summarizers sum numeric attributes where appropriate. The current example
is the Player summarizer -- it knows to sum the frame counts and times
(duration and playing) as part of the summarization.

Bug: 36736083
Test: lots of 'dumpsys -summary' manual examination
Change-Id: I325c2d2b25720d384bcc75b73c97e5b3d8fa9731
diff --git a/services/mediaanalytics/MediaAnalyticsService.cpp b/services/mediaanalytics/MediaAnalyticsService.cpp
index 35c1f5b..876c685 100644
--- a/services/mediaanalytics/MediaAnalyticsService.cpp
+++ b/services/mediaanalytics/MediaAnalyticsService.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2016 The Android Open Source Project
+ * Copyright (C) 2017 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 #define LOG_TAG "MediaAnalyticsService"
 #include <utils/Log.h>
 
+#include <stdint.h>
 #include <inttypes.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -70,11 +71,28 @@
 
 #include "MediaAnalyticsService.h"
 
+#include "MetricsSummarizer.h"
+#include "MetricsSummarizerCodec.h"
+#include "MetricsSummarizerExtractor.h"
+#include "MetricsSummarizerPlayer.h"
+#include "MetricsSummarizerRecorder.h"
+
 
 namespace android {
 
 
-#define DEBUG_QUEUE     0
+
+// summarized records
+// up to 48 sets, each covering an hour -- at least 2 days of coverage
+// (will be longer if there are hours without any media action)
+static const nsecs_t kNewSetIntervalNs = 3600*(1000*1000*1000ll);
+static const int kMaxRecordSets = 48;
+// individual records kept in memory
+static const int kMaxRecords    = 100;
+
+
+static const char *kServiceName = "media.metrics";
+
 
 //using android::status_t;
 //using android::OK;
@@ -85,18 +103,67 @@
 
 void MediaAnalyticsService::instantiate() {
     defaultServiceManager()->addService(
-            String16("media.metrics"), new MediaAnalyticsService());
+            String16(kServiceName), new MediaAnalyticsService());
 }
 
-// XXX: add dynamic controls for mMaxRecords
+// handle sets of summarizers
+MediaAnalyticsService::SummarizerSet::SummarizerSet() {
+    mSummarizers = new List<MetricsSummarizer *>();
+}
+MediaAnalyticsService::SummarizerSet::~SummarizerSet() {
+    // empty the list
+    List<MetricsSummarizer *> *l = mSummarizers;
+    while (l->size() > 0) {
+        MetricsSummarizer *summarizer = *(l->begin());
+        l->erase(l->begin());
+        delete summarizer;
+    }
+}
+
+void MediaAnalyticsService::newSummarizerSet() {
+    ALOGD("MediaAnalyticsService::newSummarizerSet");
+    MediaAnalyticsService::SummarizerSet *set = new MediaAnalyticsService::SummarizerSet();
+    nsecs_t now = systemTime(SYSTEM_TIME_REALTIME);
+    set->setStarted(now);
+
+    set->appendSummarizer(new MetricsSummarizerExtractor("extractor"));
+    set->appendSummarizer(new MetricsSummarizerCodec("codec"));
+    set->appendSummarizer(new MetricsSummarizerPlayer("nuplayer"));
+    set->appendSummarizer(new MetricsSummarizerRecorder("recorder"));
+
+    // ALWAYS at the end, since it catches everything
+    set->appendSummarizer(new MetricsSummarizer(NULL));
+
+    // inject this set at the BACK of the list.
+    mSummarizerSets->push_back(set);
+    mCurrentSet = set;
+
+    // limit the # that we have
+    if (mMaxRecordSets > 0) {
+        List<SummarizerSet *> *l = mSummarizerSets;
+        while (l->size() > (size_t) mMaxRecordSets) {
+            ALOGD("Deleting oldest record set....");
+            MediaAnalyticsService::SummarizerSet *oset = *(l->begin());
+            l->erase(l->begin());
+            delete oset;
+            mSetsDiscarded++;
+        }
+    }
+}
+
 MediaAnalyticsService::MediaAnalyticsService()
-        : mMaxRecords(100) {
+        : mMaxRecords(kMaxRecords),
+          mMaxRecordSets(kMaxRecordSets),
+          mNewSetInterval(kNewSetIntervalNs) {
 
     ALOGD("MediaAnalyticsService created");
     // clear our queues
     mOpen = new List<MediaAnalyticsItem *>();
     mFinalized = new List<MediaAnalyticsItem *>();
 
+    mSummarizerSets = new List<MediaAnalyticsService::SummarizerSet *>();
+    newSummarizerSet();
+
     mItemsSubmitted = 0;
     mItemsFinalized = 0;
     mItemsDiscarded = 0;
@@ -109,7 +176,13 @@
 MediaAnalyticsService::~MediaAnalyticsService() {
         ALOGD("MediaAnalyticsService destroyed");
 
-    // XXX: clean out mOpen and mFinalized
+    // clean out mOpen and mFinalized
+    delete mOpen;
+    mOpen = NULL;
+    delete mFinalized;
+    mFinalized = NULL;
+
+    // XXX: clean out the summaries
 }
 
 
@@ -145,7 +218,7 @@
         case AID_MEDIA_EX:
         case AID_MEDIA_DRM:
             // trusted source, only override default values
-            isTrusted = true;
+                isTrusted = true;
             if (uid_given == (-1)) {
                 item->setUid(uid);
             }
@@ -197,10 +270,12 @@
                 oitem = NULL;
             } else {
                 oitem->setFinalized(true);
+                summarize(oitem);
                 saveItem(mFinalized, oitem, 0);
             }
             // new record could itself be marked finalized...
             if (finalizing) {
+                summarize(item);
                 saveItem(mFinalized, item, 0);
                 mItemsFinalized++;
             } else {
@@ -211,6 +286,7 @@
             // combine the records, send it to finalized if appropriate
             oitem->merge(item);
             if (finalizing) {
+                summarize(oitem);
                 saveItem(mFinalized, oitem, 0);
                 mItemsFinalized++;
             }
@@ -229,6 +305,7 @@
                 delete item;
                 item = NULL;
             } else {
+                summarize(item);
                 saveItem(mFinalized, item, 0);
                 mItemsFinalized++;
             }
@@ -239,26 +316,6 @@
     return id;
 }
 
-List<MediaAnalyticsItem *> *MediaAnalyticsService::getMediaAnalyticsItemList(bool finished, nsecs_t ts) {
-    // this might never get called; the binder interface maps to the full parm list
-    // on the client side before making the binder call.
-    // but this lets us be sure...
-    List<MediaAnalyticsItem*> *list;
-    list = getMediaAnalyticsItemList(finished, ts, MediaAnalyticsItem::kKeyAny);
-    return list;
-}
-
-List<MediaAnalyticsItem *> *MediaAnalyticsService::getMediaAnalyticsItemList(bool , nsecs_t , MediaAnalyticsItem::Key ) {
-
-    // XXX: implement the get-item-list semantics
-
-    List<MediaAnalyticsItem *> *list = NULL;
-    // set up our query on the persistent data
-    // slurp in all of the pieces
-    // return that
-    return list;
-}
-
 status_t MediaAnalyticsService::dump(int fd, const Vector<String16>& args)
 {
     const size_t SIZE = 512;
@@ -277,15 +334,21 @@
 
     // crack any parameters
     bool clear = false;
+    bool summary = false;
     nsecs_t ts_since = 0;
+    String16 summaryOption("-summary");
     String16 clearOption("-clear");
     String16 sinceOption("-since");
     String16 helpOption("-help");
+    String16 onlyOption("-only");
+    const char *only = NULL;
     int n = args.size();
     for (int i = 0; i < n; i++) {
         String8 myarg(args[i]);
         if (args[i] == clearOption) {
             clear = true;
+        } else if (args[i] == summaryOption) {
+            summary = true;
         } else if (args[i] == sinceOption) {
             i++;
             if (i < n) {
@@ -301,12 +364,27 @@
             }
             // command line is milliseconds; internal units are nano-seconds
             ts_since *= 1000*1000;
+        } else if (args[i] == onlyOption) {
+            i++;
+            if (i < n) {
+                String8 value(args[i]);
+                const char *p = value.string();
+                char *q = strdup(p);
+                if (q != NULL) {
+                    if (only != NULL) {
+                        free((void*)only);
+                    }
+                only = q;
+                }
+            }
         } else if (args[i] == helpOption) {
             result.append("Recognized parameters:\n");
             result.append("-help        this help message\n");
+            result.append("-summary     show summary info\n");
             result.append("-clear       clears out saved records\n");
-            result.append("-since XXX   include records since XXX\n");
-            result.append("             (XXX is milliseconds since the UNIX epoch)\n");
+            result.append("-only X      process records for component X\n");
+            result.append("-since X     include records since X\n");
+            result.append("             (X is milliseconds since the UNIX epoch)\n");
             write(fd, result.string(), result.size());
             return NO_ERROR;
         }
@@ -314,9 +392,42 @@
 
     Mutex::Autolock _l(mLock);
 
-    snprintf(buffer, SIZE, "Dump of the mediametrics process:\n");
+    // we ALWAYS dump this piece
+    snprintf(buffer, SIZE, "Dump of the %s process:\n", kServiceName);
     result.append(buffer);
 
+    dumpHeaders(result, ts_since);
+
+    // only want 1, to avoid confusing folks that parse the output
+    if (summary) {
+        dumpSummaries(result, ts_since, only);
+    } else {
+        dumpRecent(result, ts_since, only);
+    }
+
+
+    if (clear) {
+        // remove everything from the finalized queue
+        while (mFinalized->size() > 0) {
+            MediaAnalyticsItem * oitem = *(mFinalized->begin());
+            mFinalized->erase(mFinalized->begin());
+            delete oitem;
+            mItemsDiscarded++;
+        }
+
+        // shall we clear the summary data too?
+
+    }
+
+    write(fd, result.string(), result.size());
+    return NO_ERROR;
+}
+
+// dump headers
+void MediaAnalyticsService::dumpHeaders(String8 &result, nsecs_t ts_since) {
+    const size_t SIZE = 512;
+    char buffer[SIZE];
+
     int enabled = MediaAnalyticsItem::isEnabled();
     if (enabled) {
         snprintf(buffer, SIZE, "Metrics gathering: enabled\n");
@@ -331,50 +442,71 @@
         " Discarded: %" PRId64 "\n",
         mItemsSubmitted, mItemsFinalized, mItemsDiscarded);
     result.append(buffer);
+    snprintf(buffer, SIZE,
+        "Summary Sets Discarded: %" PRId64 "\n", mSetsDiscarded);
+    result.append(buffer);
     if (ts_since != 0) {
         snprintf(buffer, SIZE,
             "Dumping Queue entries more recent than: %" PRId64 "\n",
             (int64_t) ts_since);
         result.append(buffer);
     }
+}
+
+// dump summary info
+void MediaAnalyticsService::dumpSummaries(String8 &result, nsecs_t ts_since, const char *only) {
+    const size_t SIZE = 512;
+    char buffer[SIZE];
+    int slot = 0;
+
+    snprintf(buffer, SIZE, "\nSummarized Metrics:\n");
+    result.append(buffer);
+
+    // have each of the distillers dump records
+    if (mSummarizerSets != NULL) {
+        List<SummarizerSet *>::iterator itSet = mSummarizerSets->begin();
+        for (; itSet != mSummarizerSets->end(); itSet++) {
+            nsecs_t when = (*itSet)->getStarted();
+            if (when < ts_since) {
+                continue;
+            }
+            List<MetricsSummarizer *> *list = (*itSet)->getSummarizers();
+            List<MetricsSummarizer *>::iterator it = list->begin();
+            for (; it != list->end(); it++) {
+                if (only != NULL && strcmp(only, (*it)->getKey()) != 0) {
+                    ALOGV("Told to omit '%s'", (*it)->getKey());
+                }
+                AString distilled = (*it)->dumpSummary(slot, only);
+                result.append(distilled.c_str());
+            }
+        }
+    }
+}
+
+// the recent, detailed queues
+void MediaAnalyticsService::dumpRecent(String8 &result, nsecs_t ts_since, const char * only) {
+    const size_t SIZE = 512;
+    char buffer[SIZE];
 
     // show the recently recorded records
     snprintf(buffer, sizeof(buffer), "\nFinalized Metrics (oldest first):\n");
     result.append(buffer);
-    result.append(this->dumpQueue(mFinalized, ts_since));
+    result.append(this->dumpQueue(mFinalized, ts_since, only));
 
     snprintf(buffer, sizeof(buffer), "\nIn-Progress Metrics (newest first):\n");
     result.append(buffer);
-    result.append(this->dumpQueue(mOpen, ts_since));
+    result.append(this->dumpQueue(mOpen, ts_since, only));
 
     // show who is connected and injecting records?
     // talk about # records fed to the 'readers'
     // talk about # records we discarded, perhaps "discarded w/o reading" too
-
-    if (clear) {
-        // remove everything from the finalized queue
-        while (mFinalized->size() > 0) {
-            MediaAnalyticsItem * oitem = *(mFinalized->begin());
-            if (DEBUG_QUEUE) {
-                ALOGD("zap old record: key %s sessionID %" PRId64 " ts %" PRId64 "",
-                    oitem->getKey().c_str(), oitem->getSessionID(),
-                    oitem->getTimestamp());
-            }
-            mFinalized->erase(mFinalized->begin());
-            mItemsDiscarded++;
-        }
-    }
-
-    write(fd, result.string(), result.size());
-    return NO_ERROR;
 }
-
 // caller has locked mLock...
 String8 MediaAnalyticsService::dumpQueue(List<MediaAnalyticsItem *> *theList) {
-    return dumpQueue(theList, (nsecs_t) 0);
+    return dumpQueue(theList, (nsecs_t) 0, NULL);
 }
 
-String8 MediaAnalyticsService::dumpQueue(List<MediaAnalyticsItem *> *theList, nsecs_t ts_since) {
+String8 MediaAnalyticsService::dumpQueue(List<MediaAnalyticsItem *> *theList, nsecs_t ts_since, const char * only) {
     String8 result;
     int slot = 0;
 
@@ -387,6 +519,11 @@
             if (when < ts_since) {
                 continue;
             }
+            if (only != NULL &&
+                strcmp(only, (*it)->getKey().c_str()) != 0) {
+                ALOGV("Omit '%s', it's not '%s'", (*it)->getKey().c_str(), only);
+                continue;
+            }
             AString entry = (*it)->toString();
             result.appendFormat("%5d: %s\n", slot, entry.c_str());
             slot++;
@@ -405,13 +542,6 @@
 
     Mutex::Autolock _l(mLock);
 
-    if (DEBUG_QUEUE) {
-        ALOGD("Inject a record: session %" PRId64 " ts %" PRId64 "",
-            item->getSessionID(), item->getTimestamp());
-        String8 before = dumpQueue(l);
-        ALOGD("Q before insert: %s", before.string());
-    }
-
     // adding at back of queue (fifo order)
     if (front)  {
         l->push_front(item);
@@ -419,30 +549,15 @@
         l->push_back(item);
     }
 
-    if (DEBUG_QUEUE) {
-        String8 after = dumpQueue(l);
-        ALOGD("Q after insert: %s", after.string());
-    }
-
     // keep removing old records the front until we're in-bounds
     if (mMaxRecords > 0) {
         while (l->size() > (size_t) mMaxRecords) {
             MediaAnalyticsItem * oitem = *(l->begin());
-            if (DEBUG_QUEUE) {
-                ALOGD("zap old record: key %s sessionID %" PRId64 " ts %" PRId64 "",
-                    oitem->getKey().c_str(), oitem->getSessionID(),
-                    oitem->getTimestamp());
-            }
             l->erase(l->begin());
             delete oitem;
             mItemsDiscarded++;
         }
     }
-
-    if (DEBUG_QUEUE) {
-        String8 after = dumpQueue(l);
-        ALOGD("Q after cleanup: %s", after.string());
-    }
 }
 
 // are they alike enough that nitem can be folded into oitem?
@@ -515,29 +630,14 @@
 
     Mutex::Autolock _l(mLock);
 
-    if(DEBUG_QUEUE) {
-        String8 before = dumpQueue(l);
-        ALOGD("Q before delete: %s", before.string());
-    }
-
     for (List<MediaAnalyticsItem *>::iterator it = l->begin();
         it != l->end(); it++) {
         if ((*it)->getSessionID() != item->getSessionID())
             continue;
-
-        if (DEBUG_QUEUE) {
-            ALOGD(" --- removing record for SessionID %" PRId64 "", item->getSessionID());
-            ALOGD("drop record at %s:%d", __FILE__, __LINE__);
-        }
         delete *it;
         l->erase(it);
         break;
     }
-
-    if (DEBUG_QUEUE) {
-        String8 after = dumpQueue(l);
-        ALOGD("Q after delete: %s", after.string());
-    }
 }
 
 static AString allowedKeys[] =
@@ -579,5 +679,43 @@
     return false;
 }
 
+// insert into the appropriate summarizer.
+// we make our own copy to save/summarize
+void MediaAnalyticsService::summarize(MediaAnalyticsItem *item) {
+
+    ALOGV("MediaAnalyticsService::summarize()");
+
+    if (item == NULL) {
+        return;
+    }
+
+    nsecs_t now = systemTime(SYSTEM_TIME_REALTIME);
+    if (mCurrentSet == NULL
+        || (mCurrentSet->getStarted() + mNewSetInterval < now)) {
+        newSummarizerSet();
+    }
+
+    if (mCurrentSet == NULL) {
+        return;
+    }
+
+    List<MetricsSummarizer *> *summarizers = mCurrentSet->getSummarizers();
+    List<MetricsSummarizer *>::iterator it = summarizers->begin();
+    for (; it != summarizers->end(); it++) {
+        if ((*it)->isMine(*item)) {
+            break;
+        }
+    }
+    if (it == summarizers->end()) {
+        ALOGD("no handler for type %s", item->getKey().c_str());
+        return;               // no handler
+    }
+
+    // invoke the summarizer. summarizer will make whatever copies
+    // it wants; the caller retains ownership of item.
+
+    (*it)->handleRecord(item);
+
+}
 
 } // namespace android