Add TS filter functionality to the Demux default implementation

Test: atest
Bug: 135709325
Change-Id: I149104fd4c7d1ce413036b147365a49973455e72
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 889e42e..080116c 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -100,6 +100,8 @@
         mFilterEventFlags.resize(filterId + 1);
         mFilterThreadRunning.resize(filterId + 1);
         mFilterThreads.resize(filterId + 1);
+        mFilterPids.resize(filterId + 1);
+        mFilterOutputs.resize(filterId + 1);
     }
 
     mUsedFilterIds.insert(filterId);
@@ -142,10 +144,34 @@
     return Void();
 }
 
-Return<Result> Demux::configureFilter(uint32_t /* filterId */,
-                                      const DemuxFilterSettings& /* settings */) {
+Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
     ALOGV("%s", __FUNCTION__);
 
+    switch (mFilterEvents[filterId].filterType) {  // NOTE(review): filterId is not range-checked
+        case DemuxFilterType::SECTION:
+            mFilterPids[filterId] = settings.section().tpid;  // PID the dispatcher matches on
+            break;
+        case DemuxFilterType::PES:
+            mFilterPids[filterId] = settings.pesData().tpid;
+            break;
+        case DemuxFilterType::TS:
+            mFilterPids[filterId] = settings.ts().tpid;
+            break;
+        case DemuxFilterType::AUDIO:
+            mFilterPids[filterId] = settings.audio().tpid;
+            break;
+        case DemuxFilterType::VIDEO:
+            mFilterPids[filterId] = settings.video().tpid;
+            break;
+        case DemuxFilterType::RECORD:
+            mFilterPids[filterId] = settings.record().tpid;
+            break;
+        case DemuxFilterType::PCR:
+            mFilterPids[filterId] = settings.pcr().tpid;
+            break;
+        default:
+            return Result::UNKNOWN_ERROR;  // filter type with no tpid handled above
+    }
     return Result::SUCCESS;
 }
 
@@ -158,36 +184,16 @@
         return Result::INVALID_ARGUMENT;
     }
 
-    switch (mFilterEvents[filterId].filterType) {
-        case DemuxFilterType::SECTION:
-            result = startFilterLoop(filterId);
-            break;
-        case DemuxFilterType::PES:
-            result = startPesFilterHandler(filterId);
-            break;
-        case DemuxFilterType::TS:
-            result = startTsFilterHandler();
-            return Result::SUCCESS;
-        case DemuxFilterType::AUDIO:
-        case DemuxFilterType::VIDEO:
-            result = startMediaFilterHandler(filterId);
-            break;
-        case DemuxFilterType::RECORD:
-            result = startRecordFilterHandler(filterId);
-            break;
-        case DemuxFilterType::PCR:
-            result = startPcrFilterHandler();
-            return Result::SUCCESS;
-        default:
-            return Result::UNKNOWN_ERROR;
-    }
+    result = startFilterLoop(filterId);
 
     return result;
 }
 
-Return<Result> Demux::stopFilter(uint32_t /* filterId */) {
+Return<Result> Demux::stopFilter(uint32_t filterId) {
     ALOGV("%s", __FUNCTION__);
 
+    if (filterId >= mFilterThreadRunning.size()) return Result::INVALID_ARGUMENT;  // unknown id
+    mFilterThreadRunning[filterId] = false;  // clear this filter's running flag
     return Result::SUCCESS;
 }
 
@@ -238,6 +244,8 @@
     mFilterMQs.clear();
     mFilterEvents.clear();
     mFilterEventFlags.clear();
+    mFilterOutputs.clear();
+    mFilterPids.clear();
     mLastUsedFilterId = -1;
 
     return Result::SUCCESS;
@@ -277,19 +285,21 @@
     return Void();
 }
 
-Return<Result> Demux::configureOutput(const DemuxOutputSettings& /* settings */) {
+Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
+    ALOGV("%s", __FUNCTION__);
+    // Keep a copy of the caller's settings and record that the output has been configured.
+    mOutputSettings = settings;
+    mOutputConfigured = true;
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {  // stub: always succeeds
     ALOGV("%s", __FUNCTION__);
 
     return Result::SUCCESS;
 }
 
-Return<Result> Demux::attachOutputTsFilter(uint32_t /*filterId*/) {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::detachOutputTsFilter(uint32_t /* filterId */) {
+Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
     ALOGV("%s", __FUNCTION__);
 
     return Result::SUCCESS;
@@ -353,15 +363,26 @@
     return Void();
 }
 
-Return<Result> Demux::configureInput(const DemuxInputSettings& /* settings */) {
+Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
     ALOGV("%s", __FUNCTION__);
 
+    if (settings.packetSize == 0) return Result::INVALID_ARGUMENT;  // dispatcher divides by it
+    mInputConfigured = true;
+    mInputSettings = settings;
     return Result::SUCCESS;
 }
 
 Return<Result> Demux::startInput() {
     ALOGV("%s", __FUNCTION__);
 
+    if (!mInputCallback) {
+        return Result::NOT_INITIALIZED;
+    }
+
+    if (!mInputConfigured) {
+        return Result::INVALID_STATE;
+    }
+
     pthread_create(&mInputThread, NULL, __threadLoopInput, this);
     pthread_setname_np(mInputThread, "demux_input_waiting_loop");
 
@@ -373,6 +394,8 @@
 Return<Result> Demux::stopInput() {
     ALOGV("%s", __FUNCTION__);
 
+    mInputThreadRunning = false;  // only clears the flag; the thread is not joined here
+
     return Result::SUCCESS;
 }
 
@@ -403,36 +426,43 @@
     return Result::SUCCESS;
 }
 
-Result Demux::startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data) {
-    if (!writeSectionsAndCreateEvent(filterId, data)) {
+Result Demux::startSectionFilterHandler(uint32_t filterId) {
+    if (mFilterOutputs[filterId].empty()) {
+        return Result::SUCCESS;  // nothing buffered for this filter
+    }
+    if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {  // copies the buffer
         ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
         return Result::UNKNOWN_ERROR;
     }
 
+    mFilterOutputs[filterId].clear();  // written out; start the next batch empty
+
     return Result::SUCCESS;
 }
 
 Result Demux::startPesFilterHandler(uint32_t filterId) {
-    // TODO generate multiple events in one event callback
+    std::lock_guard<std::mutex> lock(mFilterEventLock);  // held until this function returns
     DemuxFilterPesEvent pesEvent;
+    if (mFilterOutputs[filterId].empty()) {
+        return Result::SUCCESS;  // nothing buffered for this filter
+    }
+
+    // TODO extract PES from TS; for now the buffered bytes are written out unchanged
+    if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) {
+        mFilterOutputs[filterId].clear();  // drop the batch when the FMQ write fails
+        return Result::INVALID_STATE;
+    }
     pesEvent = {
             // temp dump meta data
             .streamId = 0,
-            .dataLength = 530,
+            .dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()),  // may truncate
     };
-    mFilterEvents[filterId].events.resize(1);
-    mFilterEvents[filterId].events[0].pes(pesEvent);
-    /*pthread_create(&mThreadId, NULL, __threadLoop, this);
-    pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
-    if (!writeDataToFilterMQ(fakeDataInputBuffer, filterId)) {
-        return Result::INVALID_STATE;
-    }
+    int size = mFilterEvents[filterId].events.size();  // index of the event appended below
+    mFilterEvents[filterId].events.resize(size + 1);
+    mFilterEvents[filterId].events[size].pes(pesEvent);
 
-    if (mDemuxCallbacks[filterId] == nullptr) {
-        return Result::NOT_INITIALIZED;
-    }
+    mFilterOutputs[filterId].clear();  // written out; start the next batch empty
 
-    mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
     return Result::SUCCESS;
 }
 
@@ -499,18 +529,18 @@
 bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
     // TODO check how many sections has been read
     std::lock_guard<std::mutex> lock(mFilterEventLock);
-    int size = mFilterEvents[filterId].events.size();
-    mFilterEvents[filterId].events.resize(size + 1);
     if (!writeDataToFilterMQ(data, filterId)) {
         return false;
     }
+    int size = mFilterEvents[filterId].events.size();
+    mFilterEvents[filterId].events.resize(size + 1);
     DemuxFilterSectionEvent secEvent;
     secEvent = {
             // temp dump meta data
             .tableId = 0,
             .version = 1,
             .sectionNum = 1,
-            .dataLength = 530,
+            .dataLength = static_cast<uint16_t>(data.size()),
     };
     mFilterEvents[filterId].events[size].section(secEvent);
     return true;
@@ -525,20 +555,32 @@
 }
 
 bool Demux::filterAndOutputData() {
-    ALOGD("[Demux] start to dispatch data to filters");
+    Result result;
+    set<uint32_t>::iterator it;
+
     // Read input data from the input FMQ
     int size = mInputMQ->availableToRead();
+    int inputPacketSize = mInputSettings.packetSize;
     vector<uint8_t> dataOutputBuffer;
-    dataOutputBuffer.resize(size);
-    mInputMQ->read(dataOutputBuffer.data(), size);
+    dataOutputBuffer.resize(inputPacketSize);
 
-    Result result;
-    // Filter the data and feed the output to each filter
-    set<uint32_t>::iterator it;
+    // Dispatch the packet to the PID matching filter output buffer
+    for (int i = 0; i < size / inputPacketSize; i++) {
+        mInputMQ->read(dataOutputBuffer.data(), inputPacketSize);
+        for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
+            uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff));
+            if (pid == mFilterPids[*it]) {
+                mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(),
+                                           dataOutputBuffer.end());
+            }
+        }
+    }
+
+    // Handle the output data per filter type
     for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
         switch (mFilterEvents[*it].filterType) {
             case DemuxFilterType::SECTION:
-                result = startSectionFilterHandler(*it, dataOutputBuffer);
+                result = startSectionFilterHandler(*it);
                 break;
             case DemuxFilterType::PES:
                 result = startPesFilterHandler(*it);
@@ -657,12 +699,42 @@
             ALOGD("[Demux] input data failed to be filtered. Ending thread");
             break;
         }
+
+        maySendInputStatusCallback();
     }
 
     mInputThreadRunning = false;
     ALOGD("[Demux] input thread ended.");
 }
 
+void Demux::maySendInputStatusCallback() {
+    // mInputStatusLock is held across the whole check-and-notify sequence.
+    std::lock_guard<std::mutex> lock(mInputStatusLock);
+    int readable = mInputMQ->availableToRead();
+    int writable = mInputMQ->availableToWrite();
+    DemuxInputStatus current = checkStatusChange(writable, readable, mInputSettings.highThreshold,
+                                                 mInputSettings.lowThreshold);
+    if (mIntputStatus == current) {
+        return;  // unchanged: no callback
+    }
+    mInputCallback->onInputStatus(current);
+    mIntputStatus = current;
+}
+
+DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
+                                          uint32_t highThreshold, uint32_t lowThreshold) {
+    if (availableToWrite == 0) {
+        return DemuxInputStatus::SPACE_FULL;
+    } else if (availableToRead == 0) {  // must precede the low-threshold test to be reachable
+        return DemuxInputStatus::SPACE_EMPTY;
+    } else if (availableToRead > highThreshold) {
+        return DemuxInputStatus::SPACE_ALMOST_FULL;
+    } else if (availableToRead < lowThreshold) {
+        return DemuxInputStatus::SPACE_ALMOST_EMPTY;
+    }
+    return mIntputStatus;  // between the thresholds: keep reporting the last status
+}
+
 }  // namespace implementation
 }  // namespace V1_0
 }  // namespace tuner
diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h
index 2fdde8d..eaf6ac3 100644
--- a/tv/tuner/1.0/default/Demux.h
+++ b/tv/tuner/1.0/default/Demux.h
@@ -91,9 +91,9 @@
 
     virtual Return<Result> configureOutput(const DemuxOutputSettings& settings) override;
 
-    virtual Return<Result> attachOutputTsFilter(uint32_t filterId) override;
+    virtual Return<Result> attachOutputFilter(uint32_t filterId) override;
 
-    virtual Return<Result> detachOutputTsFilter(uint32_t filterId) override;
+    virtual Return<Result> detachOutputFilter(uint32_t filterId) override;
 
     virtual Return<Result> startOutput() override;
 
@@ -115,7 +115,7 @@
      * They are also responsible to write the filtered output into the filter FMQ
      * and update the filterEvent bound with the same filterId.
      */
-    Result startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data);
+    Result startSectionFilterHandler(uint32_t filterId);
     Result startPesFilterHandler(uint32_t filterId);
     Result startTsFilterHandler();
     Result startMediaFilterHandler(uint32_t filterId);
@@ -136,6 +136,9 @@
     bool writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId);
     bool readDataFromMQ();
     bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
+    void maySendInputStatusCallback();
+    DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
+                                       uint32_t highThreshold, uint32_t lowThreshold);
     /**
      * A dispatcher to read and dispatch input data to all the started filters.
      * Each filter handler handles the data filtering/output writing/filterEvent updating.
@@ -169,6 +172,8 @@
      * A list of created FilterMQ ptrs.
      * The array number is the filter ID.
      */
+    vector<uint16_t> mFilterPids;  // PID each filter matches; index is the filter ID
+    vector<vector<uint8_t>> mFilterOutputs;  // matched packets buffered per filter ID
     vector<unique_ptr<FilterMQ>> mFilterMQs;
     vector<EventFlag*> mFilterEventFlags;
     vector<DemuxFilterEvent> mFilterEvents;
@@ -182,10 +187,18 @@
     vector<sp<IDemuxCallback>> mDemuxCallbacks;
     sp<IDemuxCallback> mInputCallback;
     sp<IDemuxCallback> mOutputCallback;
+    bool mInputConfigured = false;
+    bool mOutputConfigured = false;
+    DemuxInputSettings mInputSettings;
+    DemuxOutputSettings mOutputSettings;
+
     // Thread handlers
     pthread_t mInputThread;
     pthread_t mOutputThread;
     vector<pthread_t> mFilterThreads;
+
+    // FMQ status local records
+    DemuxInputStatus mIntputStatus;
     /**
      * If a specific filter's writing loop is still running
      */
@@ -198,8 +211,13 @@
     /**
      * Lock to protect writes to the filter event
      */
+    // TODO make each filter separate event lock
     std::mutex mFilterEventLock;
     /**
+     * Lock to protect writes to the input status
+     */
+    std::mutex mInputStatusLock;
+    /**
      * How many times a filter should write
      * TODO make this dynamic/random/can take as a parameter
      */
diff --git a/tv/tuner/1.0/default/Frontend.cpp b/tv/tuner/1.0/default/Frontend.cpp
index 0609d05..6f87d1b 100644
--- a/tv/tuner/1.0/default/Frontend.cpp
+++ b/tv/tuner/1.0/default/Frontend.cpp
@@ -105,13 +105,7 @@
     return Result::SUCCESS;
 }
 
-Return<Result> Frontend::setLnb(const sp<ILnb>& /* lnb */) {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Frontend::sendDiseqcMessage(const hidl_vec<uint8_t>& /* diseqcMessage */) {
+Return<Result> Frontend::setLnb(uint32_t /* lnb */) {
     ALOGV("%s", __FUNCTION__);
 
     return Result::SUCCESS;
diff --git a/tv/tuner/1.0/default/Frontend.h b/tv/tuner/1.0/default/Frontend.h
index fc586b5..f881e8b 100644
--- a/tv/tuner/1.0/default/Frontend.h
+++ b/tv/tuner/1.0/default/Frontend.h
@@ -56,11 +56,9 @@
     virtual Return<void> getStatus(const hidl_vec<FrontendStatusType>& statusTypes,
                                    getStatus_cb _hidl_cb) override;
 
-    virtual Return<Result> sendDiseqcMessage(const hidl_vec<uint8_t>& diseqcMessage) override;
-
     virtual Return<Result> setLna(bool bEnable) override;
 
-    virtual Return<Result> setLnb(const sp<ILnb>& lnb) override;
+    virtual Return<Result> setLnb(uint32_t lnb) override;
 
     FrontendType getFrontendType();
 
diff --git a/tv/tuner/1.0/default/Lnb.cpp b/tv/tuner/1.0/default/Lnb.cpp
index b81bb15..1446f7f 100644
--- a/tv/tuner/1.0/default/Lnb.cpp
+++ b/tv/tuner/1.0/default/Lnb.cpp
@@ -48,6 +48,12 @@
     return Result::SUCCESS;
 }
 
+Return<Result> Lnb::sendDiseqcMessage(const hidl_vec<uint8_t>& /* diseqcMessage */) {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;  // stub: the message is ignored and success is always reported
+}
+
 Return<Result> Lnb::close() {
     ALOGV("%s", __FUNCTION__);
 
diff --git a/tv/tuner/1.0/default/Lnb.h b/tv/tuner/1.0/default/Lnb.h
index df7e0fe..4c251f7 100644
--- a/tv/tuner/1.0/default/Lnb.h
+++ b/tv/tuner/1.0/default/Lnb.h
@@ -38,12 +38,14 @@
   public:
     Lnb();
 
-    virtual Return<Result> setVoltage(FrontendLnbVoltage voltage);
+    virtual Return<Result> setVoltage(FrontendLnbVoltage voltage) override;
 
     virtual Return<Result> setTone(FrontendLnbTone tone) override;
 
     virtual Return<Result> setSatellitePosition(FrontendLnbPosition position) override;
 
+    virtual Return<Result> sendDiseqcMessage(const hidl_vec<uint8_t>& diseqcMessage) override;
+
     virtual Return<Result> close() override;
 
   private:
diff --git a/tv/tuner/1.0/default/Tuner.cpp b/tv/tuner/1.0/default/Tuner.cpp
index 00831ae..a4a8ca5 100644
--- a/tv/tuner/1.0/default/Tuner.cpp
+++ b/tv/tuner/1.0/default/Tuner.cpp
@@ -87,6 +87,15 @@
     return Void();
 }
 
+Return<void> Tuner::getDemuxCaps(getDemuxCaps_cb _hidl_cb) {
+    ALOGV("%s", __FUNCTION__);
+
+    // Value-initialize so no indeterminate field values are handed to the caller.
+    DemuxCapabilities caps{};
+    _hidl_cb(Result::SUCCESS, caps);
+    return Void();
+}
+
 Return<void> Tuner::openDescrambler(openDescrambler_cb _hidl_cb) {
     ALOGV("%s", __FUNCTION__);
 
diff --git a/tv/tuner/1.0/default/Tuner.h b/tv/tuner/1.0/default/Tuner.h
index 62227ee..c089864 100644
--- a/tv/tuner/1.0/default/Tuner.h
+++ b/tv/tuner/1.0/default/Tuner.h
@@ -39,6 +39,8 @@
 
     virtual Return<void> openDemux(openDemux_cb _hidl_cb) override;
 
+    virtual Return<void> getDemuxCaps(getDemuxCaps_cb _hidl_cb) override;
+
     virtual Return<void> openDescrambler(openDescrambler_cb _hidl_cb) override;
 
     virtual Return<void> getFrontendInfo(FrontendId frontendId,