Implement ES data process in Tuner default impl to support Sample TIS

This CL also provides a new VTS case to test ES input stream.
Please see the test.es https://drive.google.com/file/d/13ZDT9uhEO1LXDT2GcOhB91iIK6m_KEER/view?usp=sharing

ES Format(all the numbers are in decimal):
1. First line is a general meta data to describe the whole file

m:meta data size in bytes, l:ES frame line count X, V:video raw data size
int bytes, A:audio raw data size in bytes, pv:video pid, pa:audio pid

2. The following X lines(equals to the ES frame line count) are the size/pts information
of the video or audio ES frames. Starting with v means video, a means
audio. They are printed in the same order as how they presented in the
original ts.

v, Len:current ES frame size in bytes, PTS: current ES frame PTS

3. After the X lines of ES frame descriptions, there are the video ES raw
data connected with the audio ES raw data.

Test: atest VtsHalTvTunerV1_0TargetTest
Bug: 159027928
Change-Id: I56bd799fd6eda867df54d593235510a5e4758257
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 67eff1b..b122a05 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -294,6 +294,11 @@
     mFilters[filterId]->updateFilterOutput(data);
 }
 
+void Demux::updateMediaFilterOutput(uint16_t filterId, vector<uint8_t> data, uint64_t pts) {
+    updateFilterOutput(filterId, data);
+    mFilters[filterId]->updatePts(pts);
+}
+
 uint16_t Demux::getFilterTpid(uint32_t filterId) {
     return mFilters[filterId]->getTpid();
 }
@@ -322,6 +327,12 @@
             ALOGD("[Demux] wait for data ready on the playback FMQ");
             continue;
         }
+        if (mDvrPlayback->getSettings().playback().dataFormat == DataFormat::ES) {
+            if (!mDvrPlayback->processEsDataOnPlayback(true /*isVirtualFrontend*/, mIsRecording)) {
+                ALOGE("[Demux] playback es data failed to be filtered. Ending thread");
+                break;
+            }
+        }
         // Our current implementation filter the data and write it into the filter FMQ immediately
         // after the DATA_READY from the VTS/framework
         if (!mDvrPlayback->readPlaybackFMQ(true /*isVirtualFrontend*/, mIsRecording) ||
diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h
index 7f282b2..6e93ea3 100644
--- a/tv/tuner/1.0/default/Demux.h
+++ b/tv/tuner/1.0/default/Demux.h
@@ -87,6 +87,7 @@
     bool detachRecordFilter(int filterId);
     Result startFilterHandler(uint32_t filterId);
     void updateFilterOutput(uint16_t filterId, vector<uint8_t> data);
+    void updateMediaFilterOutput(uint16_t filterId, vector<uint8_t> data, uint64_t pts);
     uint16_t getFilterTpid(uint32_t filterId);
     void setIsRecording(bool isRecording);
     void startFrontendInputLoop();
diff --git a/tv/tuner/1.0/default/Dvr.cpp b/tv/tuner/1.0/default/Dvr.cpp
index 68e175c..bb3b087 100644
--- a/tv/tuner/1.0/default/Dvr.cpp
+++ b/tv/tuner/1.0/default/Dvr.cpp
@@ -129,7 +129,7 @@
 
     mDvrThreadRunning = false;
 
-    std::lock_guard<std::mutex> lock(mDvrThreadLock);
+    lock_guard<mutex> lock(mDvrThreadLock);
 
     mIsRecordStarted = false;
     mDemux->setIsRecording(false);
@@ -155,14 +155,13 @@
     ALOGV("%s", __FUNCTION__);
 
     // Create a synchronized FMQ that supports blocking read/write
-    std::unique_ptr<DvrMQ> tmpDvrMQ =
-            std::unique_ptr<DvrMQ>(new (std::nothrow) DvrMQ(mBufferSize, true));
+    unique_ptr<DvrMQ> tmpDvrMQ = unique_ptr<DvrMQ>(new (nothrow) DvrMQ(mBufferSize, true));
     if (!tmpDvrMQ->isValid()) {
         ALOGW("[Dvr] Failed to create FMQ of DVR");
         return false;
     }
 
-    mDvrMQ = std::move(tmpDvrMQ);
+    mDvrMQ = move(tmpDvrMQ);
 
     if (EventFlag::createEventFlag(mDvrMQ->getEventFlagWord(), &mDvrEventFlag) != OK) {
         return false;
@@ -183,7 +182,7 @@
 
 void Dvr::playbackThreadLoop() {
     ALOGD("[Dvr] playback threadLoop start.");
-    std::lock_guard<std::mutex> lock(mDvrThreadLock);
+    lock_guard<mutex> lock(mDvrThreadLock);
     mDvrThreadRunning = true;
 
     while (mDvrThreadRunning) {
@@ -195,6 +194,14 @@
             ALOGD("[Dvr] wait for data ready on the playback FMQ");
             continue;
         }
+
+        if (mDvrSettings.playback().dataFormat == DataFormat::ES) {
+            if (!processEsDataOnPlayback(false /*isVirtualFrontend*/, false /*isRecording*/)) {
+                ALOGE("[Dvr] playback es data failed to be filtered. Ending thread");
+                break;
+            }
+            maySendPlaybackStatusCallback();
+        }
         // Our current implementation filter the data and write it into the filter FMQ immediately
         // after the DATA_READY from the VTS/framework
         if (!readPlaybackFMQ(false /*isVirtualFrontend*/, false /*isRecording*/) ||
@@ -211,7 +218,7 @@
 }
 
 void Dvr::maySendPlaybackStatusCallback() {
-    std::lock_guard<std::mutex> lock(mPlaybackStatusLock);
+    lock_guard<mutex> lock(mPlaybackStatusLock);
     int availableToRead = mDvrMQ->availableToRead();
     int availableToWrite = mDvrMQ->availableToWrite();
 
@@ -263,8 +270,128 @@
     return true;
 }
 
+bool Dvr::processEsDataOnPlayback(bool isVirtualFrontend, bool isRecording) {
+    // Read ES from the DVR FMQ
+    // Note that currently we only provides ES with metaData in a specific format to be parsed.
+    // The ES size should be smaller than the Playback FMQ size to avoid reading truncated data.
+    int size = mDvrMQ->availableToRead();
+    vector<uint8_t> dataOutputBuffer;
+    dataOutputBuffer.resize(size);
+    if (!mDvrMQ->read(dataOutputBuffer.data(), size)) {
+        return false;
+    }
+
+    int metaDataSize = size;
+    int totalFrames = 0;
+    int videoEsDataSize = 0;
+    int audioEsDataSize = 0;
+    int audioPid = 0;
+    int videoPid = 0;
+
+    vector<MediaEsMetaData> esMeta;
+    int videoReadPointer = 0;
+    int audioReadPointer = 0;
+    int frameCount = 0;
+    // Get meta data from the es
+    for (int i = 0; i < metaDataSize; i++) {
+        switch (dataOutputBuffer[i]) {
+            case 'm':
+                metaDataSize = 0;
+                getMetaDataValue(i, dataOutputBuffer.data(), metaDataSize);
+                videoReadPointer = metaDataSize;
+                continue;
+            case 'l':
+                getMetaDataValue(i, dataOutputBuffer.data(), totalFrames);
+                esMeta.resize(totalFrames);
+                continue;
+            case 'V':
+                getMetaDataValue(i, dataOutputBuffer.data(), videoEsDataSize);
+                audioReadPointer = metaDataSize + videoEsDataSize;
+                continue;
+            case 'A':
+                getMetaDataValue(i, dataOutputBuffer.data(), audioEsDataSize);
+                continue;
+            case 'p':
+                if (dataOutputBuffer[++i] == 'a') {
+                    getMetaDataValue(i, dataOutputBuffer.data(), audioPid);
+                } else if (dataOutputBuffer[i] == 'v') {
+                    getMetaDataValue(i, dataOutputBuffer.data(), videoPid);
+                }
+                continue;
+            case 'v':
+            case 'a':
+                if (dataOutputBuffer[i + 1] != ',') {
+                    ALOGE("[Dvr] Invalid format meta data.");
+                    return false;
+                }
+                esMeta[frameCount] = {
+                        .isAudio = dataOutputBuffer[i] == 'a' ? true : false,
+                };
+                i += 5;  // Move to Len
+                getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].len);
+                if (esMeta[frameCount].isAudio) {
+                    esMeta[frameCount].startIndex = audioReadPointer;
+                    audioReadPointer += esMeta[frameCount].len;
+                } else {
+                    esMeta[frameCount].startIndex = videoReadPointer;
+                    videoReadPointer += esMeta[frameCount].len;
+                }
+                i += 4;  // move to PTS
+                getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].pts);
+                frameCount++;
+                continue;
+            default:
+                continue;
+        }
+    }
+
+    if (frameCount != totalFrames) {
+        ALOGE("[Dvr] Invalid meta data, frameCount=%d, totalFrames reported=%d", frameCount,
+              totalFrames);
+        return false;
+    }
+
+    if (metaDataSize + audioEsDataSize + videoEsDataSize != size) {
+        ALOGE("[Dvr] Invalid meta data, metaSize=%d, videoSize=%d, audioSize=%d, totolSize=%d",
+              metaDataSize, videoEsDataSize, audioEsDataSize, size);
+        return false;
+    }
+
+    // Read es raw data from the FMQ per meta data built previously
+    vector<uint8_t> frameData;
+    map<uint32_t, sp<IFilter>>::iterator it;
+    int pid = 0;
+    for (int i = 0; i < totalFrames; i++) {
+        frameData.resize(esMeta[i].len);
+        pid = esMeta[i].isAudio ? audioPid : videoPid;
+        memcpy(dataOutputBuffer.data() + esMeta[i].startIndex, frameData.data(), esMeta[i].len);
+        // Send to the media filter
+        if (isVirtualFrontend && isRecording) {
+            // TODO validate record
+            mDemux->sendFrontendInputToRecord(frameData);
+        } else {
+            for (it = mFilters.begin(); it != mFilters.end(); it++) {
+                if (pid == mDemux->getFilterTpid(it->first)) {
+                    mDemux->updateMediaFilterOutput(it->first, frameData,
+                                                    static_cast<uint64_t>(esMeta[i].pts));
+                    startFilterDispatcher(isVirtualFrontend, isRecording);
+                }
+            }
+        }
+    }
+
+    return true;
+}
+
+void Dvr::getMetaDataValue(int& index, uint8_t* dataOutputBuffer, int& value) {
+    index += 2;  // Move the pointer across the ":" to the value
+    while (dataOutputBuffer[index] != ',' && dataOutputBuffer[index] != '\n') {
+        value = ((dataOutputBuffer[index++] - 48) + value * 10);
+    }
+}
+
 void Dvr::startTpidFilter(vector<uint8_t> data) {
-    std::map<uint32_t, sp<IFilter>>::iterator it;
+    map<uint32_t, sp<IFilter>>::iterator it;
     for (it = mFilters.begin(); it != mFilters.end(); it++) {
         uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
         if (DEBUG_DVR) {
@@ -285,7 +412,7 @@
         }
     }
 
-    std::map<uint32_t, sp<IFilter>>::iterator it;
+    map<uint32_t, sp<IFilter>>::iterator it;
     // Handle the output data per filter type
     for (it = mFilters.begin(); it != mFilters.end(); it++) {
         if (mDemux->startFilterHandler(it->first) != Result::SUCCESS) {
@@ -296,8 +423,8 @@
     return true;
 }
 
-bool Dvr::writeRecordFMQ(const std::vector<uint8_t>& data) {
-    std::lock_guard<std::mutex> lock(mWriteLock);
+bool Dvr::writeRecordFMQ(const vector<uint8_t>& data) {
+    lock_guard<mutex> lock(mWriteLock);
     if (mRecordStatus == RecordStatus::OVERFLOW) {
         ALOGW("[Dvr] stops writing and wait for the client side flushing.");
         return true;
@@ -313,7 +440,7 @@
 }
 
 void Dvr::maySendRecordStatusCallback() {
-    std::lock_guard<std::mutex> lock(mRecordStatusLock);
+    lock_guard<mutex> lock(mRecordStatusLock);
     int availableToRead = mDvrMQ->availableToRead();
     int availableToWrite = mDvrMQ->availableToWrite();
 
diff --git a/tv/tuner/1.0/default/Dvr.h b/tv/tuner/1.0/default/Dvr.h
index a63a256..3069586 100644
--- a/tv/tuner/1.0/default/Dvr.h
+++ b/tv/tuner/1.0/default/Dvr.h
@@ -44,6 +44,13 @@
 
 using DvrMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>;
 
+struct MediaEsMetaData {
+    bool isAudio;
+    int startIndex;
+    int len;
+    int pts;
+};
+
 class Demux;
 class Filter;
 class Frontend;
@@ -84,8 +91,10 @@
     bool addPlaybackFilter(uint32_t filterId, sp<IFilter> filter);
     bool removePlaybackFilter(uint32_t filterId);
     bool readPlaybackFMQ(bool isVirtualFrontend, bool isRecording);
+    bool processEsDataOnPlayback(bool isVirtualFrontend, bool isRecording);
     bool startFilterDispatcher(bool isVirtualFrontend, bool isRecording);
     EventFlag* getDvrEventFlag();
+    DvrSettings getSettings() { return mDvrSettings; }
 
   private:
     // Demux service
@@ -98,6 +107,7 @@
 
     void deleteEventFlag();
     bool readDataFromMQ();
+    void getMetaDataValue(int& index, uint8_t* dataOutputBuffer, int& value);
     void maySendPlaybackStatusCallback();
     void maySendRecordStatusCallback();
     PlaybackStatus checkPlaybackStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
diff --git a/tv/tuner/1.0/default/Filter.cpp b/tv/tuner/1.0/default/Filter.cpp
index 30b19c0..ce748e5 100644
--- a/tv/tuner/1.0/default/Filter.cpp
+++ b/tv/tuner/1.0/default/Filter.cpp
@@ -317,6 +317,11 @@
     mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
 }
 
+void Filter::updatePts(uint64_t pts) {
+    std::lock_guard<std::mutex> lock(mFilterOutputLock);
+    mPts = pts;
+}
+
 void Filter::updateRecordOutput(vector<uint8_t> data) {
     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
     mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
@@ -460,6 +465,11 @@
     if (mFilterOutput.empty()) {
         return Result::SUCCESS;
     }
+
+    if (mPts) {
+        return createMediaFilterEventWithIon(mFilterOutput);
+    }
+
     for (int i = 0; i < mFilterOutput.size(); i += 188) {
         if (mPesSizeLeft == 0) {
             uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
@@ -493,46 +503,7 @@
             continue;
         }
 
-        int av_fd = createAvIonFd(mPesOutput.size());
-        if (av_fd == -1) {
-            return Result::UNKNOWN_ERROR;
-        }
-        // copy the filtered data to the buffer
-        uint8_t* avBuffer = getIonBuffer(av_fd, mPesOutput.size());
-        if (avBuffer == NULL) {
-            return Result::UNKNOWN_ERROR;
-        }
-        memcpy(avBuffer, mPesOutput.data(), mPesOutput.size() * sizeof(uint8_t));
-
-        native_handle_t* nativeHandle = createNativeHandle(av_fd);
-        if (nativeHandle == NULL) {
-            return Result::UNKNOWN_ERROR;
-        }
-        hidl_handle handle;
-        handle.setTo(nativeHandle, /*shouldOwn=*/true);
-
-        // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
-        uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
-        mDataId2Avfd[dataId] = dup(av_fd);
-
-        // Create mediaEvent and send callback
-        DemuxFilterMediaEvent mediaEvent;
-        mediaEvent = {
-                .avMemory = std::move(handle),
-                .dataLength = static_cast<uint32_t>(mPesOutput.size()),
-                .avDataId = dataId,
-        };
-        int size = mFilterEvent.events.size();
-        mFilterEvent.events.resize(size + 1);
-        mFilterEvent.events[size].media(mediaEvent);
-
-        // Clear and log
-        mPesOutput.clear();
-        mAvBufferCopyCount = 0;
-        ::close(av_fd);
-        if (DEBUG_FILTER) {
-            ALOGD("[Filter] assembled av data length %d", mediaEvent.dataLength);
-        }
+        createMediaFilterEventWithIon(mPesOutput);
     }
 
     mFilterOutput.clear();
@@ -540,6 +511,54 @@
     return Result::SUCCESS;
 }
 
+Result Filter::createMediaFilterEventWithIon(vector<uint8_t> output) {
+    int av_fd = createAvIonFd(output.size());
+    if (av_fd == -1) {
+        return Result::UNKNOWN_ERROR;
+    }
+    // copy the filtered data to the buffer
+    uint8_t* avBuffer = getIonBuffer(av_fd, output.size());
+    if (avBuffer == NULL) {
+        return Result::UNKNOWN_ERROR;
+    }
+    memcpy(avBuffer, output.data(), output.size() * sizeof(uint8_t));
+
+    native_handle_t* nativeHandle = createNativeHandle(av_fd);
+    if (nativeHandle == NULL) {
+        return Result::UNKNOWN_ERROR;
+    }
+    hidl_handle handle;
+    handle.setTo(nativeHandle, /*shouldOwn=*/true);
+
+    // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
+    uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
+    mDataId2Avfd[dataId] = dup(av_fd);
+
+    // Create mediaEvent and send callback
+    DemuxFilterMediaEvent mediaEvent;
+    mediaEvent = {
+            .avMemory = std::move(handle),
+            .dataLength = static_cast<uint32_t>(output.size()),
+            .avDataId = dataId,
+    };
+    if (mPts) {
+        mediaEvent.pts = mPts;
+        mPts = 0;
+    }
+    int size = mFilterEvent.events.size();
+    mFilterEvent.events.resize(size + 1);
+    mFilterEvent.events[size].media(mediaEvent);
+
+    // Clear and log
+    output.clear();
+    mAvBufferCopyCount = 0;
+    ::close(av_fd);
+    if (DEBUG_FILTER) {
+        ALOGD("[Filter] av data length %d", mediaEvent.dataLength);
+    }
+    return Result::SUCCESS;
+}
+
 Result Filter::startRecordFilterHandler() {
     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
     if (mRecordFilterOutput.empty()) {
diff --git a/tv/tuner/1.0/default/Filter.h b/tv/tuner/1.0/default/Filter.h
index 9386dca..9b18a66 100644
--- a/tv/tuner/1.0/default/Filter.h
+++ b/tv/tuner/1.0/default/Filter.h
@@ -84,6 +84,7 @@
     uint16_t getTpid();
     void updateFilterOutput(vector<uint8_t> data);
     void updateRecordOutput(vector<uint8_t> data);
+    void updatePts(uint64_t pts);
     Result startFilterHandler();
     Result startRecordFilterHandler();
     void attachFilterToRecord(const sp<Dvr> dvr);
@@ -116,6 +117,7 @@
     bool mIsDataSourceDemux = true;
     vector<uint8_t> mFilterOutput;
     vector<uint8_t> mRecordFilterOutput;
+    uint64_t mPts = 0;
     unique_ptr<FilterMQ> mFilterMQ;
     bool mIsUsingFMQ = false;
     EventFlag* mFilterEventFlag;
@@ -172,6 +174,7 @@
     int createAvIonFd(int size);
     uint8_t* getIonBuffer(int fd, int size);
     native_handle_t* createNativeHandle(int fd);
+    Result createMediaFilterEventWithIon(vector<uint8_t> output);
 
     /**
      * Lock to protect writes to the FMQs