Refactor Tuner HAL Default Impl for Filter and Dvr separation

Test: manual
Bug: 135709325
Change-Id: I130f555315683fa02272f40d1e6209c5695c884a
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 8bb79f9..c5921f7 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -28,45 +28,6 @@
 
 #define WAIT_TIMEOUT 3000000000
 
-const std::vector<uint8_t> fakeDataInputBuffer{
-        0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0xc0, 0x1e, 0xdb,
-        0x01, 0x40, 0x16, 0xec, 0x04, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40, 0x00, 0x00, 0x0f, 0x03,
-        0xc5, 0x8b, 0xb8, 0x00, 0x00, 0x00, 0x01, 0x68, 0xca, 0x8c, 0xb2, 0x00, 0x00, 0x01, 0x06,
-        0x05, 0xff, 0xff, 0x70, 0xdc, 0x45, 0xe9, 0xbd, 0xe6, 0xd9, 0x48, 0xb7, 0x96, 0x2c, 0xd8,
-        0x20, 0xd9, 0x23, 0xee, 0xef, 0x78, 0x32, 0x36, 0x34, 0x20, 0x2d, 0x20, 0x63, 0x6f, 0x72,
-        0x65, 0x20, 0x31, 0x34, 0x32, 0x20, 0x2d, 0x20, 0x48, 0x2e, 0x32, 0x36, 0x34, 0x2f, 0x4d,
-        0x50, 0x45, 0x47, 0x2d, 0x34, 0x20, 0x41, 0x56, 0x43, 0x20, 0x63, 0x6f, 0x64, 0x65, 0x63,
-        0x20, 0x2d, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x6c, 0x65, 0x66, 0x74, 0x20, 0x32, 0x30, 0x30,
-        0x33, 0x2d, 0x32, 0x30, 0x31, 0x34, 0x20, 0x2d, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3a, 0x2f,
-        0x2f, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6c, 0x61, 0x6e, 0x2e, 0x6f,
-        0x72, 0x67, 0x2f, 0x78, 0x32, 0x36, 0x34, 0x2e, 0x68, 0x74, 0x6d, 0x6c, 0x20, 0x2d, 0x20,
-        0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x3a, 0x20, 0x63, 0x61, 0x62, 0x61, 0x63, 0x3d,
-        0x30, 0x20, 0x72, 0x65, 0x66, 0x3d, 0x32, 0x20, 0x64, 0x65, 0x62, 0x6c, 0x6f, 0x63, 0x6b,
-        0x3d, 0x31, 0x3a, 0x30, 0x3a, 0x30, 0x20, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x73, 0x65, 0x3d,
-        0x30, 0x78, 0x31, 0x3a, 0x30, 0x78, 0x31, 0x31, 0x31, 0x20, 0x6d, 0x65, 0x3d, 0x68, 0x65,
-        0x78, 0x20, 0x73, 0x75, 0x62, 0x6d, 0x65, 0x3d, 0x37, 0x20, 0x70, 0x73, 0x79, 0x3d, 0x31,
-        0x20, 0x70, 0x73, 0x79, 0x5f, 0x72, 0x64, 0x3d, 0x31, 0x2e, 0x30, 0x30, 0x3a, 0x30, 0x2e,
-        0x30, 0x30, 0x20, 0x6d, 0x69, 0x78, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x66, 0x3d, 0x31, 0x20,
-        0x6d, 0x65, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x3d, 0x31, 0x36, 0x20, 0x63, 0x68, 0x72,
-        0x6f, 0x6d, 0x61, 0x5f, 0x6d, 0x65, 0x3d, 0x31, 0x20, 0x74, 0x72, 0x65, 0x6c, 0x6c, 0x69,
-        0x73, 0x3d, 0x31, 0x20, 0x38, 0x78, 0x38, 0x64, 0x63, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x71,
-        0x6d, 0x3d, 0x30, 0x20, 0x64, 0x65, 0x61, 0x64, 0x7a, 0x6f, 0x6e, 0x65, 0x3d, 0x32, 0x31,
-        0x2c, 0x31, 0x31, 0x20, 0x66, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x73, 0x6b, 0x69, 0x70, 0x3d,
-        0x31, 0x20, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x5f, 0x71, 0x70, 0x5f, 0x6f, 0x66, 0x66,
-        0x73, 0x65, 0x74, 0x3d, 0x2d, 0x32, 0x20, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d,
-        0x36, 0x30, 0x20, 0x6c, 0x6f, 0x6f, 0x6b, 0x61, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x74, 0x68,
-        0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x35, 0x20, 0x73, 0x6c, 0x69, 0x63, 0x65, 0x64, 0x5f,
-        0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x30, 0x20, 0x6e, 0x72, 0x3d, 0x30, 0x20,
-        0x64, 0x65, 0x63, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x3d, 0x31, 0x20, 0x69, 0x6e, 0x74, 0x65,
-        0x72, 0x6c, 0x61, 0x63, 0x65, 0x64, 0x3d, 0x30, 0x20, 0x62, 0x6c, 0x75, 0x72, 0x61, 0x79,
-        0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x6f, 0x6e, 0x73, 0x74,
-        0x72, 0x61, 0x69, 0x6e, 0x65, 0x64, 0x5f, 0x69, 0x6e, 0x74, 0x72, 0x61, 0x3d, 0x30, 0x20,
-        0x62, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x3d, 0x30, 0x20, 0x77, 0x65, 0x69, 0x67, 0x68,
-        0x74, 0x70, 0x3d, 0x30, 0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x3d, 0x32, 0x35, 0x30,
-        0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x5f, 0x6d, 0x69, 0x6e, 0x3d, 0x32, 0x35, 0x20,
-        0x73, 0x63, 0x65, 0x6e, 0x65,
-};
-
 Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
     mDemuxId = demuxId;
     mTunerService = tuner;
@@ -93,8 +54,8 @@
     return startBroadcastInputLoop();
 }
 
-Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
-                              const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
+Return<void> Demux::openFilter(const DemuxFilterType& type, uint32_t bufferSize,
+                               const sp<IFilterCallback>& cb, openFilter_cb _hidl_cb) {
     ALOGV("%s", __FUNCTION__);
 
     uint32_t filterId;
@@ -105,137 +66,39 @@
         mUnusedFilterIds.erase(filterId);
     } else {
         filterId = ++mLastUsedFilterId;
-
-        mFilterCallbacks.resize(filterId + 1);
-        mFilterMQs.resize(filterId + 1);
-        mFilterEvents.resize(filterId + 1);
-        mFilterEventFlags.resize(filterId + 1);
-        mFilterThreadRunning.resize(filterId + 1);
-        mFilterThreads.resize(filterId + 1);
-        mFilterPids.resize(filterId + 1);
-        mFilterOutputs.resize(filterId + 1);
-        mFilterStatus.resize(filterId + 1);
     }
 
     mUsedFilterIds.insert(filterId);
 
-    if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
+    if (cb == nullptr) {
         ALOGW("callback can't be null");
-        _hidl_cb(Result::INVALID_ARGUMENT, filterId);
+        _hidl_cb(Result::INVALID_ARGUMENT, new Filter());
         return Void();
     }
 
-    // Add callback
-    mFilterCallbacks[filterId] = cb;
+    sp<Filter> filter = new Filter(type, filterId, bufferSize, cb, this);
 
-    // Mapping from the filter ID to the filter event
-    DemuxFilterEvent event{
-            .filterId = filterId,
-            .filterType = type,
-    };
-    mFilterEvents[filterId] = event;
-
-    if (!createFilterMQ(bufferSize, filterId)) {
-        _hidl_cb(Result::UNKNOWN_ERROR, -1);
+    if (!filter->createFilterMQ()) {
+        _hidl_cb(Result::UNKNOWN_ERROR, filter);
         return Void();
     }
 
-    _hidl_cb(Result::SUCCESS, filterId);
+    mFilters[filterId] = filter;
+
+    _hidl_cb(Result::SUCCESS, filter);
     return Void();
 }
 
-Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
+Return<void> Demux::openTimeFilter(openTimeFilter_cb _hidl_cb) {
     ALOGV("%s", __FUNCTION__);
 
-    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
-        ALOGW("No filter with id: %d exists to get desc", filterId);
-        _hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
-        return Void();
-    }
+    sp<TimeFilter> timeFilter = new TimeFilter(this);
 
-    _hidl_cb(Result::SUCCESS, *mFilterMQs[filterId]->getDesc());
+    _hidl_cb(Result::SUCCESS, timeFilter);
     return Void();
 }
 
-Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
-    ALOGV("%s", __FUNCTION__);
-
-    switch (mFilterEvents[filterId].filterType) {
-        case DemuxFilterType::SECTION:
-            mFilterPids[filterId] = settings.section().tpid;
-            break;
-        case DemuxFilterType::PES:
-            mFilterPids[filterId] = settings.pesData().tpid;
-            break;
-        case DemuxFilterType::TS:
-            mFilterPids[filterId] = settings.ts().tpid;
-            break;
-        case DemuxFilterType::AUDIO:
-            mFilterPids[filterId] = settings.audio().tpid;
-            break;
-        case DemuxFilterType::VIDEO:
-            mFilterPids[filterId] = settings.video().tpid;
-            break;
-        case DemuxFilterType::RECORD:
-            mFilterPids[filterId] = settings.record().tpid;
-            break;
-        case DemuxFilterType::PCR:
-            mFilterPids[filterId] = settings.pcr().tpid;
-            break;
-        default:
-            return Result::UNKNOWN_ERROR;
-    }
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::startFilter(uint32_t filterId) {
-    ALOGV("%s", __FUNCTION__);
-    Result result;
-
-    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
-        ALOGW("No filter with id: %d exists to start filter", filterId);
-        return Result::INVALID_ARGUMENT;
-    }
-
-    result = startFilterLoop(filterId);
-
-    return result;
-}
-
-Return<Result> Demux::stopFilter(uint32_t filterId) {
-    ALOGV("%s", __FUNCTION__);
-
-    mFilterThreadRunning[filterId] = false;
-
-    std::lock_guard<std::mutex> lock(mFilterThreadLock);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::flushFilter(uint32_t filterId) {
-    ALOGV("%s", __FUNCTION__);
-
-    // temp implementation to flush the FMQ
-    int size = mFilterMQs[filterId]->availableToRead();
-    char* buffer = new char[size];
-    mOutputMQ->read((unsigned char*)&buffer[0], size);
-    delete[] buffer;
-    mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::removeFilter(uint32_t filterId) {
-    ALOGV("%s", __FUNCTION__);
-
-    // resetFilterRecords(filterId);
-    mUsedFilterIds.erase(filterId);
-    mUnusedFilterIds.insert(filterId);
-
-    return Result::SUCCESS;
-}
-
-Return<void> Demux::getAvSyncHwId(uint32_t /* filterId */, getAvSyncHwId_cb _hidl_cb) {
+Return<void> Demux::getAvSyncHwId(const sp<IFilter>& /* filter */, getAvSyncHwId_cb _hidl_cb) {
     ALOGV("%s", __FUNCTION__);
 
     AvSyncHwId avSyncHwId = 0;
@@ -256,588 +119,81 @@
 Return<Result> Demux::close() {
     ALOGV("%s", __FUNCTION__);
 
-    set<uint32_t>::iterator it;
-    mInputThread = 0;
-    mOutputThread = 0;
-    mFilterThreads.clear();
     mUnusedFilterIds.clear();
     mUsedFilterIds.clear();
-    mFilterCallbacks.clear();
-    mFilterMQs.clear();
-    mFilterEvents.clear();
-    mFilterEventFlags.clear();
-    mFilterOutputs.clear();
-    mFilterPids.clear();
     mLastUsedFilterId = -1;
 
     return Result::SUCCESS;
 }
 
-Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
+Return<void> Demux::openDvr(DvrType type, uint32_t bufferSize, const sp<IDvrCallback>& cb,
+                            openDvr_cb _hidl_cb) {
     ALOGV("%s", __FUNCTION__);
 
-    // Create a synchronized FMQ that supports blocking read/write
-    std::unique_ptr<FilterMQ> tmpFilterMQ =
-            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
-    if (!tmpFilterMQ->isValid()) {
-        ALOGW("Failed to create output FMQ");
-        return Result::UNKNOWN_ERROR;
-    }
-
-    mOutputMQ = std::move(tmpFilterMQ);
-
-    if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
-        return Result::UNKNOWN_ERROR;
-    }
-
-    mOutputCallback = cb;
-
-    return Result::SUCCESS;
-}
-
-Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
-    ALOGV("%s", __FUNCTION__);
-
-    if (!mOutputMQ) {
-        _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
+    if (cb == nullptr) {
+        ALOGW("DVR callback can't be null");
+        _hidl_cb(Result::INVALID_ARGUMENT, new Dvr());
         return Void();
     }
 
-    _hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
-    return Void();
-}
+    sp<Dvr> dvr = new Dvr(type, bufferSize, cb, this);
 
-Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
-    ALOGV("%s", __FUNCTION__);
-
-    mOutputConfigured = true;
-    mOutputSettings = settings;
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::startOutput() {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::stopOutput() {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::flushOutput() {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::removeOutput() {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
-    ALOGV("%s", __FUNCTION__);
-
-    // Create a synchronized FMQ that supports blocking read/write
-    std::unique_ptr<FilterMQ> tmpInputMQ =
-            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
-    if (!tmpInputMQ->isValid()) {
-        ALOGW("Failed to create input FMQ");
-        return Result::UNKNOWN_ERROR;
-    }
-
-    mInputMQ = std::move(tmpInputMQ);
-
-    if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
-        return Result::UNKNOWN_ERROR;
-    }
-
-    mInputCallback = cb;
-
-    return Result::SUCCESS;
-}
-
-Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
-    ALOGV("%s", __FUNCTION__);
-
-    if (!mInputMQ) {
-        _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
+    if (!dvr->createDvrMQ()) {
+        _hidl_cb(Result::UNKNOWN_ERROR, dvr);
         return Void();
     }
 
-    _hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
+    _hidl_cb(Result::SUCCESS, dvr);
     return Void();
 }
 
-Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
+Result Demux::removeFilter(uint32_t filterId) {
     ALOGV("%s", __FUNCTION__);
 
-    mInputConfigured = true;
-    mInputSettings = settings;
+    // resetFilterRecords(filterId);
+    mUsedFilterIds.erase(filterId);
+    mUnusedFilterIds.insert(filterId);
+    mFilters.erase(filterId);
 
     return Result::SUCCESS;
 }
 
-Return<Result> Demux::startInput() {
-    ALOGV("%s", __FUNCTION__);
-
-    if (!mInputCallback) {
-        return Result::NOT_INITIALIZED;
-    }
-
-    if (!mInputConfigured) {
-        return Result::INVALID_STATE;
-    }
-
-    pthread_create(&mInputThread, NULL, __threadLoopInput, this);
-    pthread_setname_np(mInputThread, "demux_input_waiting_loop");
-
-    // TODO start another thread to send filter status callback to the framework
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::stopInput() {
-    ALOGV("%s", __FUNCTION__);
-
-    mInputThreadRunning = false;
-
-    std::lock_guard<std::mutex> lock(mInputThreadLock);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::flushInput() {
-    ALOGV("%s", __FUNCTION__);
-
-    return Result::SUCCESS;
-}
-
-Return<Result> Demux::removeInput() {
-    ALOGV("%s", __FUNCTION__);
-
-    mInputMQ = nullptr;
-
-    return Result::SUCCESS;
-}
-
-Result Demux::startFilterLoop(uint32_t filterId) {
-    struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
-    threadArgs->user = this;
-    threadArgs->filterId = filterId;
-
-    pthread_t mFilterThread;
-    pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
-    mFilterThreads[filterId] = mFilterThread;
-    pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");
-
-    return Result::SUCCESS;
-}
-
-Result Demux::startSectionFilterHandler(uint32_t filterId) {
-    if (mFilterOutputs[filterId].empty()) {
-        return Result::SUCCESS;
-    }
-    if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {
-        ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
-        return Result::UNKNOWN_ERROR;
-    }
-
-    mFilterOutputs[filterId].clear();
-
-    return Result::SUCCESS;
-}
-
-Result Demux::startPesFilterHandler(uint32_t filterId) {
-    std::lock_guard<std::mutex> lock(mFilterEventLock);
-    if (mFilterOutputs[filterId].empty()) {
-        return Result::SUCCESS;
-    }
-
-    for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
-        if (mPesSizeLeft == 0) {
-            uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) |
-                              (mFilterOutputs[filterId][i + 5] << 8) |
-                              mFilterOutputs[filterId][i + 6];
-            ALOGD("[Demux] prefix %d", prefix);
-            if (prefix == 0x000001) {
-                // TODO handle mulptiple Pes filters
-                mPesSizeLeft =
-                        (mFilterOutputs[filterId][i + 8] << 8) | mFilterOutputs[filterId][i + 9];
-                mPesSizeLeft += 6;
-                ALOGD("[Demux] pes data length %d", mPesSizeLeft);
-            } else {
-                continue;
-            }
-        }
-
-        int endPoint = min(184, mPesSizeLeft);
-        // append data and check size
-        vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
-        vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 4 + endPoint;
-        mPesOutput.insert(mPesOutput.end(), first, last);
-        // size does not match then continue
-        mPesSizeLeft -= endPoint;
-        if (mPesSizeLeft > 0) {
-            continue;
-        }
-        // size match then create event
-        if (!writeDataToFilterMQ(mPesOutput, filterId)) {
-            mFilterOutputs[filterId].clear();
-            return Result::INVALID_STATE;
-        }
-        maySendFilterStatusCallback(filterId);
-        DemuxFilterPesEvent pesEvent;
-        pesEvent = {
-                // temp dump meta data
-                .streamId = mPesOutput[3],
-                .dataLength = static_cast<uint16_t>(mPesOutput.size()),
-        };
-        ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength);
-
-        int size = mFilterEvents[filterId].events.size();
-        mFilterEvents[filterId].events.resize(size + 1);
-        mFilterEvents[filterId].events[size].pes(pesEvent);
-        mPesOutput.clear();
-    }
-
-    mFilterOutputs[filterId].clear();
-
-    return Result::SUCCESS;
-}
-
-Result Demux::startTsFilterHandler() {
-    // TODO handle starting TS filter
-    return Result::SUCCESS;
-}
-
-Result Demux::startMediaFilterHandler(uint32_t filterId) {
-    DemuxFilterMediaEvent mediaEvent;
-    mediaEvent = {
-            // temp dump meta data
-            .pts = 0,
-            .dataLength = 530,
-            .secureMemory = nullptr,
-    };
-    mFilterEvents[filterId].events.resize(1);
-    mFilterEvents[filterId].events[0].media() = mediaEvent;
-
-    mFilterOutputs[filterId].clear();
-    // TODO handle write FQM for media stream
-    return Result::SUCCESS;
-}
-
-Result Demux::startRecordFilterHandler(uint32_t filterId) {
-    DemuxFilterRecordEvent recordEvent;
-    recordEvent = {
-            // temp dump meta data
-            .tpid = 0,
-            .packetNum = 0,
-    };
-    recordEvent.indexMask.tsIndexMask() = 0x01;
-    mFilterEvents[filterId].events.resize(1);
-    mFilterEvents[filterId].events[0].ts() = recordEvent;
-
-    mFilterOutputs[filterId].clear();
-    return Result::SUCCESS;
-}
-
-Result Demux::startPcrFilterHandler() {
-    // TODO handle starting PCR filter
-    return Result::SUCCESS;
-}
-
-bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
-    ALOGV("%s", __FUNCTION__);
-
-    // Create a synchronized FMQ that supports blocking read/write
-    std::unique_ptr<FilterMQ> tmpFilterMQ =
-            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
-    if (!tmpFilterMQ->isValid()) {
-        ALOGW("Failed to create FMQ of filter with id: %d", filterId);
-        return false;
-    }
-
-    mFilterMQs[filterId] = std::move(tmpFilterMQ);
-
-    EventFlag* filterEventFlag;
-    if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
-        OK) {
-        return false;
-    }
-    mFilterEventFlags[filterId] = filterEventFlag;
-
-    return true;
-}
-
-bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
-    // TODO check how many sections has been read
-    std::lock_guard<std::mutex> lock(mFilterEventLock);
-    if (!writeDataToFilterMQ(data, filterId)) {
-        return false;
-    }
-    int size = mFilterEvents[filterId].events.size();
-    mFilterEvents[filterId].events.resize(size + 1);
-    DemuxFilterSectionEvent secEvent;
-    secEvent = {
-            // temp dump meta data
-            .tableId = 0,
-            .version = 1,
-            .sectionNum = 1,
-            .dataLength = static_cast<uint16_t>(data.size()),
-    };
-    mFilterEvents[filterId].events[size].section(secEvent);
-    return true;
-}
-
-bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId) {
-    std::lock_guard<std::mutex> lock(mWriteLock);
-    if (mFilterMQs[filterId]->write(data.data(), data.size())) {
-        return true;
-    }
-    return false;
-}
-
-bool Demux::readInputFMQ() {
-    // Read input data from the input FMQ
-    int size = mInputMQ->availableToRead();
-    int inputPacketSize = mInputSettings.packetSize;
-    vector<uint8_t> dataOutputBuffer;
-    dataOutputBuffer.resize(inputPacketSize);
-
-    // Dispatch the packet to the PID matching filter output buffer
-    for (int i = 0; i < size / inputPacketSize; i++) {
-        if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
-            return false;
-        }
-        startTsFilter(dataOutputBuffer);
-    }
-
-    return true;
-}
-
 void Demux::startTsFilter(vector<uint8_t> data) {
     set<uint32_t>::iterator it;
     for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
         uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
-        ALOGW("start ts filter pid: %d", pid);
-        if (pid == mFilterPids[*it]) {
-            mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end());
+        if (DEBUG_FILTER) {
+            ALOGW("start ts filter pid: %d", pid);
+        }
+        if (pid == mFilters[*it]->getTpid()) {
+            mFilters[*it]->updateFilterOutput(data);
         }
     }
 }
 
 bool Demux::startFilterDispatcher() {
-    Result result;
     set<uint32_t>::iterator it;
 
     // Handle the output data per filter type
     for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
-        switch (mFilterEvents[*it].filterType) {
-            case DemuxFilterType::SECTION:
-                result = startSectionFilterHandler(*it);
-                break;
-            case DemuxFilterType::PES:
-                result = startPesFilterHandler(*it);
-                break;
-            case DemuxFilterType::TS:
-                result = startTsFilterHandler();
-                break;
-            case DemuxFilterType::AUDIO:
-            case DemuxFilterType::VIDEO:
-                result = startMediaFilterHandler(*it);
-                break;
-            case DemuxFilterType::RECORD:
-                result = startRecordFilterHandler(*it);
-                break;
-            case DemuxFilterType::PCR:
-                result = startPcrFilterHandler();
-                break;
-            default:
-                return false;
+        if (mFilters[*it]->startFilterHandler() != Result::SUCCESS) {
+            return false;
         }
     }
 
-    return result == Result::SUCCESS;
+    return true;
 }
 
-void* Demux::__threadLoopFilter(void* threadArg) {
-    Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
-    self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
-    return 0;
+Result Demux::startFilterHandler(uint32_t filterId) {
+    return mFilters[filterId]->startFilterHandler();
 }
 
-void* Demux::__threadLoopInput(void* user) {
-    Demux* const self = static_cast<Demux*>(user);
-    self->inputThreadLoop();
-    return 0;
+void Demux::updateFilterOutput(uint16_t filterId, vector<uint8_t> data) {
+    mFilters[filterId]->updateFilterOutput(data);
 }
 
-void Demux::filterThreadLoop(uint32_t filterId) {
-    ALOGD("[Demux] filter %d threadLoop start.", filterId);
-    std::lock_guard<std::mutex> lock(mFilterThreadLock);
-    mFilterThreadRunning[filterId] = true;
-
-    // For the first time of filter output, implementation needs to send the filter
-    // Event Callback without waiting for the DATA_CONSUMED to init the process.
-    while (mFilterThreadRunning[filterId]) {
-        if (mFilterEvents[filterId].events.size() == 0) {
-            ALOGD("[Demux] wait for filter data output.");
-            usleep(1000 * 1000);
-            continue;
-        }
-        // After successfully write, send a callback and wait for the read to be done
-        mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
-        mFilterEvents[filterId].events.resize(0);
-        mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
-        mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
-        break;
-    }
-
-    while (mFilterThreadRunning[filterId]) {
-        uint32_t efState = 0;
-        // We do not wait for the last round of writen data to be read to finish the thread
-        // because the VTS can verify the reading itself.
-        for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
-            while (mFilterThreadRunning[filterId]) {
-                status_t status = mFilterEventFlags[filterId]->wait(
-                        static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
-                        WAIT_TIMEOUT, true /* retry on spurious wake */);
-                if (status != OK) {
-                    ALOGD("[Demux] wait for data consumed");
-                    continue;
-                }
-                break;
-            }
-
-            if (mFilterCallbacks[filterId] == nullptr) {
-                ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
-                break;
-            }
-
-            maySendFilterStatusCallback(filterId);
-
-            while (mFilterThreadRunning[filterId]) {
-                std::lock_guard<std::mutex> lock(mFilterEventLock);
-                if (mFilterEvents[filterId].events.size() == 0) {
-                    continue;
-                }
-                // After successfully write, send a callback and wait for the read to be done
-                mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
-                mFilterEvents[filterId].events.resize(0);
-                break;
-            }
-            // We do not wait for the last read to be done
-            // VTS can verify the read result itself.
-            if (i == SECTION_WRITE_COUNT - 1) {
-                ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
-                break;
-            }
-        }
-        mFilterThreadRunning[filterId] = false;
-    }
-
-    ALOGD("[Demux] filter thread ended.");
-}
-
-void Demux::inputThreadLoop() {
-    ALOGD("[Demux] input threadLoop start.");
-    std::lock_guard<std::mutex> lock(mInputThreadLock);
-    mInputThreadRunning = true;
-
-    while (mInputThreadRunning) {
-        uint32_t efState = 0;
-        status_t status =
-                mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
-                                      &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
-        if (status != OK) {
-            ALOGD("[Demux] wait for data ready on the input FMQ");
-            continue;
-        }
-        // Our current implementation filter the data and write it into the filter FMQ immediately
-        // after the DATA_READY from the VTS/framework
-        if (!readInputFMQ() || !startFilterDispatcher()) {
-            ALOGD("[Demux] input data failed to be filtered. Ending thread");
-            break;
-        }
-
-        maySendInputStatusCallback();
-    }
-
-    mInputThreadRunning = false;
-    ALOGD("[Demux] input thread ended.");
-}
-
-void Demux::maySendInputStatusCallback() {
-    std::lock_guard<std::mutex> lock(mInputStatusLock);
-    int availableToRead = mInputMQ->availableToRead();
-    int availableToWrite = mInputMQ->availableToWrite();
-
-    DemuxInputStatus newStatus =
-            checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
-                                   mInputSettings.lowThreshold);
-    if (mIntputStatus != newStatus) {
-        mInputCallback->onInputStatus(newStatus);
-        mIntputStatus = newStatus;
-    }
-}
-
-void Demux::maySendFilterStatusCallback(uint32_t filterId) {
-    std::lock_guard<std::mutex> lock(mFilterStatusLock);
-    int availableToRead = mFilterMQs[filterId]->availableToRead();
-    int availableToWrite = mFilterMQs[filterId]->availableToWrite();
-    int fmqSize = mFilterMQs[filterId]->getQuantumCount();
-
-    DemuxFilterStatus newStatus =
-            checkFilterStatusChange(filterId, availableToWrite, availableToRead,
-                                    ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
-    if (mFilterStatus[filterId] != newStatus) {
-        mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
-        mFilterStatus[filterId] = newStatus;
-    }
-}
-
-DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
-                                               uint32_t highThreshold, uint32_t lowThreshold) {
-    if (availableToWrite == 0) {
-        return DemuxInputStatus::SPACE_FULL;
-    } else if (availableToRead > highThreshold) {
-        return DemuxInputStatus::SPACE_ALMOST_FULL;
-    } else if (availableToRead < lowThreshold) {
-        return DemuxInputStatus::SPACE_ALMOST_EMPTY;
-    } else if (availableToRead == 0) {
-        return DemuxInputStatus::SPACE_EMPTY;
-    }
-    return mIntputStatus;
-}
-
-DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
-                                                 uint32_t availableToRead, uint32_t highThreshold,
-                                                 uint32_t lowThreshold) {
-    if (availableToWrite == 0) {
-        return DemuxFilterStatus::OVERFLOW;
-    } else if (availableToRead > highThreshold) {
-        return DemuxFilterStatus::HIGH_WATER;
-    } else if (availableToRead < lowThreshold) {
-        return DemuxFilterStatus::LOW_WATER;
-    }
-    return mFilterStatus[filterId];
+uint16_t Demux::getFilterTpid(uint32_t filterId) {
+    return mFilters[filterId]->getTpid();
 }
 
 Result Demux::startBroadcastInputLoop() {
@@ -876,6 +232,7 @@
             for (int i = 0; i < writePacketAmount; i++) {
                 inputData.read(buffer, packetSize);
                 if (!inputData) {
+                    mKeepFetchingDataFromFrontend = false;
                     mBroadcastInputThreadRunning = false;
                     break;
                 }
@@ -888,7 +245,7 @@
                 startTsFilter(byteBuffer);
             }
             startFilterDispatcher();
-            sleep(1);
+            usleep(100);
         }
     }
 
@@ -898,6 +255,7 @@
 }
 
 void Demux::stopBroadcastInput() {
+    ALOGD("[Demux] stop frontend on demux");
     mKeepFetchingDataFromFrontend = false;
     mBroadcastInputThreadRunning = false;
     std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);