Tuner HAL Demux Playback interface implementation

Test: manual
Bug: 135709325
Change-Id: I0b673159b667c5bde47e9ed285cfa1bdc6c668c6
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 4016c5a..889e42e 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -73,34 +73,6 @@
 
 Demux::~Demux() {}
 
-bool Demux::createAndSaveMQ(uint32_t bufferSize, uint32_t filterId) {
-    ALOGV("%s", __FUNCTION__);
-
-    // Create a synchronized FMQ that supports blocking read/write
-    std::unique_ptr<FilterMQ> tmpFilterMQ =
-            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
-    if (!tmpFilterMQ->isValid()) {
-        ALOGW("Failed to create FMQ of filter with id: %d", filterId);
-        return false;
-    }
-
-    mFilterMQs.resize(filterId + 1);
-    mFilterMQs[filterId] = std::move(tmpFilterMQ);
-
-    EventFlag* mFilterEventFlag;
-    if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &mFilterEventFlag) !=
-        OK) {
-        return false;
-    }
-    mFilterEventFlags.resize(filterId + 1);
-    mFilterEventFlags[filterId] = mFilterEventFlag;
-    mFilterWriteCount.resize(filterId + 1);
-    mFilterWriteCount[filterId] = 0;
-    mThreadRunning.resize(filterId + 1);
-
-    return true;
-}
-
 Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
     ALOGV("%s", __FUNCTION__);
 
@@ -113,23 +85,42 @@
                               const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
     ALOGV("%s", __FUNCTION__);
 
-    uint32_t filterId = mLastUsedFilterId + 1;
-    mLastUsedFilterId += 1;
+    uint32_t filterId;
+
+    if (!mUnusedFilterIds.empty()) {
+        filterId = *mUnusedFilterIds.begin();
+
+        mUnusedFilterIds.erase(filterId);
+    } else {
+        filterId = ++mLastUsedFilterId;
+
+        mDemuxCallbacks.resize(filterId + 1);
+        mFilterMQs.resize(filterId + 1);
+        mFilterEvents.resize(filterId + 1);
+        mFilterEventFlags.resize(filterId + 1);
+        mFilterThreadRunning.resize(filterId + 1);
+        mFilterThreads.resize(filterId + 1);
+    }
+
+    mUsedFilterIds.insert(filterId);
 
     if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
         ALOGW("callback can't be null");
         _hidl_cb(Result::INVALID_ARGUMENT, filterId);
         return Void();
     }
+
     // Add callback
-    mDemuxCallbacks.resize(filterId + 1);
     mDemuxCallbacks[filterId] = cb;
 
-    // Mapping from the filter ID to the filter type
-    mFilterTypes.resize(filterId + 1);
-    mFilterTypes[filterId] = type;
+    // Mapping from the filter ID to the filter event
+    DemuxFilterEvent event{
+            .filterId = filterId,
+            .filterType = type,
+    };
+    mFilterEvents[filterId] = event;
 
-    if (!createAndSaveMQ(bufferSize, filterId)) {
+    if (!createFilterMQ(bufferSize, filterId)) {
         _hidl_cb(Result::UNKNOWN_ERROR, -1);
         return Void();
     }
@@ -141,8 +132,8 @@
 Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
     ALOGV("%s", __FUNCTION__);
 
-    if (filterId < 0 || filterId > mLastUsedFilterId) {
-        ALOGW("No filter with id: %d exists", filterId);
+    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
+        ALOGW("No filter with id: %d exists to get desc", filterId);
         _hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
         return Void();
     }
@@ -160,35 +151,29 @@
 
 Return<Result> Demux::startFilter(uint32_t filterId) {
     ALOGV("%s", __FUNCTION__);
+    Result result;
 
-    if (filterId < 0 || filterId > mLastUsedFilterId) {
-        ALOGW("No filter with id: %d exists", filterId);
+    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
+        ALOGW("No filter with id: %d exists to start filter", filterId);
         return Result::INVALID_ARGUMENT;
     }
 
-    DemuxFilterType filterType = mFilterTypes[filterId];
-    Result result;
-    DemuxFilterEvent event{
-            .filterId = filterId,
-            .filterType = filterType,
-    };
-
-    switch (filterType) {
+    switch (mFilterEvents[filterId].filterType) {
         case DemuxFilterType::SECTION:
-            result = startSectionFilterHandler(event);
+            result = startFilterLoop(filterId);
             break;
         case DemuxFilterType::PES:
-            result = startPesFilterHandler(event);
+            result = startPesFilterHandler(filterId);
             break;
         case DemuxFilterType::TS:
             result = startTsFilterHandler();
             return Result::SUCCESS;
         case DemuxFilterType::AUDIO:
         case DemuxFilterType::VIDEO:
-            result = startMediaFilterHandler(event);
+            result = startMediaFilterHandler(filterId);
             break;
         case DemuxFilterType::RECORD:
-            result = startRecordFilterHandler(event);
+            result = startRecordFilterHandler(filterId);
             break;
         case DemuxFilterType::PCR:
             result = startPcrFilterHandler();
@@ -212,9 +197,13 @@
     return Result::SUCCESS;
 }
 
-Return<Result> Demux::removeFilter(uint32_t /* filterId */) {
+Return<Result> Demux::removeFilter(uint32_t filterId) {
     ALOGV("%s", __FUNCTION__);
 
+    // resetFilterRecords(filterId);
+    mUsedFilterIds.erase(filterId);
+    mUnusedFilterIds.insert(filterId);
+
     return Result::SUCCESS;
 }
 
@@ -239,25 +228,291 @@
 Return<Result> Demux::close() {
     ALOGV("%s", __FUNCTION__);
 
+    set<uint32_t>::iterator it;
+    mInputThread = 0;
+    mOutputThread = 0;
+    mFilterThreads.clear();
+    mUnusedFilterIds.clear();
+    mUsedFilterIds.clear();
+    mDemuxCallbacks.clear();
+    mFilterMQs.clear();
+    mFilterEvents.clear();
+    mFilterEventFlags.clear();
+    mLastUsedFilterId = -1;
+
     return Result::SUCCESS;
 }
 
-bool Demux::writeSectionsAndCreateEvent(DemuxFilterEvent& event, uint32_t sectionNum) {
-    event.events.resize(sectionNum);
-    for (int i = 0; i < sectionNum; i++) {
-        DemuxFilterSectionEvent secEvent;
-        secEvent = {
-                // temp dump meta data
-                .tableId = 0,
-                .version = 1,
-                .sectionNum = 1,
-                .dataLength = 530,
-        };
-        event.events[i].section(secEvent);
-        if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) {
-            return false;
-        }
+Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
+    ALOGV("%s", __FUNCTION__);
+
+    // Create a synchronized FMQ that supports blocking read/write
+    std::unique_ptr<FilterMQ> tmpFilterMQ =
+            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
+    if (!tmpFilterMQ->isValid()) {
+        ALOGW("Failed to create output FMQ");
+        return Result::UNKNOWN_ERROR;
     }
+
+    mOutputMQ = std::move(tmpFilterMQ);
+
+    if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
+        return Result::UNKNOWN_ERROR;
+    }
+
+    mOutputCallback = cb;
+
+    return Result::SUCCESS;
+}
+
+Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
+    ALOGV("%s", __FUNCTION__);
+
+    if (!mOutputMQ) {
+        _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
+        return Void();
+    }
+
+    _hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
+    return Void();
+}
+
+Return<Result> Demux::configureOutput(const DemuxOutputSettings& /* settings */) {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::attachOutputTsFilter(uint32_t /*filterId*/) {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::detachOutputTsFilter(uint32_t /* filterId */) {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::startOutput() {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::stopOutput() {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::flushOutput() {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::removeOutput() {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
+    ALOGV("%s", __FUNCTION__);
+
+    // Create a synchronized FMQ that supports blocking read/write
+    std::unique_ptr<FilterMQ> tmpInputMQ =
+            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
+    if (!tmpInputMQ->isValid()) {
+        ALOGW("Failed to create input FMQ");
+        return Result::UNKNOWN_ERROR;
+    }
+
+    mInputMQ = std::move(tmpInputMQ);
+
+    if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
+        return Result::UNKNOWN_ERROR;
+    }
+
+    mInputCallback = cb;
+
+    return Result::SUCCESS;
+}
+
+Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
+    ALOGV("%s", __FUNCTION__);
+
+    if (!mInputMQ) {
+        _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
+        return Void();
+    }
+
+    _hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
+    return Void();
+}
+
+Return<Result> Demux::configureInput(const DemuxInputSettings& /* settings */) {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::startInput() {
+    ALOGV("%s", __FUNCTION__);
+
+    pthread_create(&mInputThread, NULL, __threadLoopInput, this);
+    pthread_setname_np(mInputThread, "demux_input_waiting_loop");
+
+    // TODO start another thread to send filter status callback to the framework
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::stopInput() {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::flushInput() {
+    ALOGV("%s", __FUNCTION__);
+
+    return Result::SUCCESS;
+}
+
+Return<Result> Demux::removeInput() {
+    ALOGV("%s", __FUNCTION__);
+
+    mInputMQ = nullptr;
+
+    return Result::SUCCESS;
+}
+
+Result Demux::startFilterLoop(uint32_t filterId) {
+    struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
+    threadArgs->user = this;
+    threadArgs->filterId = filterId;
+
+    pthread_t mFilterThread;
+    pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
+    mFilterThreads[filterId] = mFilterThread;
+    pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");
+
+    return Result::SUCCESS;
+}
+
+Result Demux::startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data) {
+    if (!writeSectionsAndCreateEvent(filterId, data)) {
+        ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
+        return Result::UNKNOWN_ERROR;
+    }
+
+    return Result::SUCCESS;
+}
+
+Result Demux::startPesFilterHandler(uint32_t filterId) {
+    // TODO generate multiple events in one event callback
+    DemuxFilterPesEvent pesEvent;
+    pesEvent = {
+            // temp dump meta data
+            .streamId = 0,
+            .dataLength = 530,
+    };
+    mFilterEvents[filterId].events.resize(1);
+    mFilterEvents[filterId].events[0].pes(pesEvent);
+    /*pthread_create(&mThreadId, NULL, __threadLoop, this);
+    pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
+    if (!writeDataToFilterMQ(fakeDataInputBuffer, filterId)) {
+        return Result::INVALID_STATE;
+    }
+
+    if (mDemuxCallbacks[filterId] == nullptr) {
+        return Result::NOT_INITIALIZED;
+    }
+
+    mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
+    return Result::SUCCESS;
+}
+
+Result Demux::startTsFilterHandler() {
+    // TODO handle starting TS filter
+    return Result::SUCCESS;
+}
+
+Result Demux::startMediaFilterHandler(uint32_t filterId) {
+    DemuxFilterMediaEvent mediaEvent;
+    mediaEvent = {
+            // temp dump meta data
+            .pts = 0,
+            .dataLength = 530,
+            .secureMemory = nullptr,
+    };
+    mFilterEvents[filterId].events.resize(1);
+    mFilterEvents[filterId].events[0].media() = mediaEvent;
+    // TODO handle write FQM for media stream
+    return Result::SUCCESS;
+}
+
+Result Demux::startRecordFilterHandler(uint32_t filterId) {
+    DemuxFilterRecordEvent recordEvent;
+    recordEvent = {
+            // temp dump meta data
+            .tpid = 0,
+            .packetNum = 0,
+    };
+    recordEvent.indexMask.tsIndexMask() = 0x01;
+    mFilterEvents[filterId].events.resize(1);
+    mFilterEvents[filterId].events[0].ts() = recordEvent;
+    return Result::SUCCESS;
+}
+
+Result Demux::startPcrFilterHandler() {
+    // TODO handle starting PCR filter
+    return Result::SUCCESS;
+}
+
+bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
+    ALOGV("%s", __FUNCTION__);
+
+    // Create a synchronized FMQ that supports blocking read/write
+    std::unique_ptr<FilterMQ> tmpFilterMQ =
+            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
+    if (!tmpFilterMQ->isValid()) {
+        ALOGW("Failed to create FMQ of filter with id: %d", filterId);
+        return false;
+    }
+
+    mFilterMQs[filterId] = std::move(tmpFilterMQ);
+
+    EventFlag* filterEventFlag;
+    if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
+        OK) {
+        return false;
+    }
+    mFilterEventFlags[filterId] = filterEventFlag;
+
+    return true;
+}
+
+bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
+    // TODO check how many sections has been read
+    std::lock_guard<std::mutex> lock(mFilterEventLock);
+    int size = mFilterEvents[filterId].events.size();
+    mFilterEvents[filterId].events.resize(size + 1);
+    if (!writeDataToFilterMQ(data, filterId)) {
+        return false;
+    }
+    DemuxFilterSectionEvent secEvent;
+    secEvent = {
+            // temp dump meta data
+            .tableId = 0,
+            .version = 1,
+            .sectionNum = 1,
+            .dataLength = 530,
+    };
+    mFilterEvents[filterId].events[size].section(secEvent);
     return true;
 }
 
@@ -269,116 +524,82 @@
     return false;
 }
 
-Result Demux::startSectionFilterHandler(DemuxFilterEvent event) {
-    struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
-    threadArgs->user = this;
-    threadArgs->event = &event;
+bool Demux::filterAndOutputData() {
+    ALOGD("[Demux] start to dispatch data to filters");
+    // Read input data from the input FMQ
+    int size = mInputMQ->availableToRead();
+    vector<uint8_t> dataOutputBuffer;
+    dataOutputBuffer.resize(size);
+    mInputMQ->read(dataOutputBuffer.data(), size);
 
-    pthread_create(&mThreadId, NULL, __threadLoop, (void*)threadArgs);
-    pthread_setname_np(mThreadId, "demux_filter_waiting_loop");
-
-    return Result::SUCCESS;
-}
-
-Result Demux::startPesFilterHandler(DemuxFilterEvent& event) {
-    // TODO generate multiple events in one event callback
-    DemuxFilterPesEvent pesEvent;
-    pesEvent = {
-            // temp dump meta data
-            .streamId = 0,
-            .dataLength = 530,
-    };
-    event.events.resize(1);
-    event.events[0].pes(pesEvent);
-    /*pthread_create(&mThreadId, NULL, __threadLoop, this);
-    pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
-    if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) {
-        return Result::INVALID_STATE;
+    Result result;
+    // Filter the data and feed the output to each filter
+    set<uint32_t>::iterator it;
+    for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
+        switch (mFilterEvents[*it].filterType) {
+            case DemuxFilterType::SECTION:
+                result = startSectionFilterHandler(*it, dataOutputBuffer);
+                break;
+            case DemuxFilterType::PES:
+                result = startPesFilterHandler(*it);
+                break;
+            case DemuxFilterType::TS:
+                result = startTsFilterHandler();
+                break;
+            case DemuxFilterType::AUDIO:
+            case DemuxFilterType::VIDEO:
+                result = startMediaFilterHandler(*it);
+                break;
+            case DemuxFilterType::RECORD:
+                result = startRecordFilterHandler(*it);
+                break;
+            case DemuxFilterType::PCR:
+                result = startPcrFilterHandler();
+                break;
+            default:
+                return false;
+        }
     }
 
-    if (mDemuxCallbacks[event.filterId] == nullptr) {
-        return Result::NOT_INITIALIZED;
-    }
-
-    mDemuxCallbacks[event.filterId]->onFilterEvent(event);
-    return Result::SUCCESS;
+    return result == Result::SUCCESS;
 }
 
-Result Demux::startTsFilterHandler() {
-    // TODO handle starting TS filter
-    return Result::SUCCESS;
-}
-
-Result Demux::startMediaFilterHandler(DemuxFilterEvent& event) {
-    DemuxFilterMediaEvent mediaEvent;
-    mediaEvent = {
-            // temp dump meta data
-            .pts = 0,
-            .dataLength = 530,
-            .secureMemory = nullptr,
-    };
-    event.events.resize(1);
-    event.events[0].media() = mediaEvent;
-    // TODO handle write FQM for media stream
-    return Result::SUCCESS;
-}
-
-Result Demux::startRecordFilterHandler(DemuxFilterEvent& event) {
-    DemuxFilterRecordEvent recordEvent;
-    recordEvent = {
-            // temp dump meta data
-            .tpid = 0,
-            .packetNum = 0,
-    };
-    recordEvent.indexMask.tsIndexMask() = 0x01;
-    event.events.resize(1);
-    event.events[0].ts() = recordEvent;
-    return Result::SUCCESS;
-}
-
-Result Demux::startPcrFilterHandler() {
-    // TODO handle starting PCR filter
-    return Result::SUCCESS;
-}
-
-void* Demux::__threadLoop(void* threadArg) {
+void* Demux::__threadLoopFilter(void* threadArg) {
     Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
-    self->filterThreadLoop(((struct ThreadArgs*)threadArg)->event);
+    self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
     return 0;
 }
 
-void Demux::filterThreadLoop(DemuxFilterEvent* event) {
-    uint32_t filterId = event->filterId;
-    ALOGD("[Demux] filter %d threadLoop start.", filterId);
-    mThreadRunning[filterId] = true;
+void* Demux::__threadLoopInput(void* user) {
+    Demux* const self = static_cast<Demux*>(user);
+    self->inputThreadLoop();
+    return 0;
+}
 
-    while (mThreadRunning[filterId]) {
+void Demux::filterThreadLoop(uint32_t filterId) {
+    ALOGD("[Demux] filter %d threadLoop start.", filterId);
+    mFilterThreadRunning[filterId] = true;
+
+    // For the first time of filter output, implementation needs to send the filter
+    // Event Callback without waiting for the DATA_CONSUMED to init the process.
+    while (mFilterThreadRunning[filterId]) {
+        if (mFilterEvents[filterId].events.size() == 0) {
+            ALOGD("[Demux] wait for filter data output.");
+            usleep(1000 * 1000);
+            continue;
+        }
+        // After successfully write, send a callback and wait for the read to be done
+        mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
+        mFilterEvents[filterId].events.resize(0);
+        break;
+    }
+
+    while (mFilterThreadRunning[filterId]) {
         uint32_t efState = 0;
         // We do not wait for the last round of writen data to be read to finish the thread
         // because the VTS can verify the reading itself.
         for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
-            DemuxFilterEvent filterEvent{
-                    .filterId = filterId,
-                    .filterType = event->filterType,
-            };
-            if (!writeSectionsAndCreateEvent(filterEvent, 2)) {
-                ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
-                break;
-            }
-            mFilterWriteCount[filterId]++;
-            if (mDemuxCallbacks[filterId] == nullptr) {
-                ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
-                break;
-            }
-            // After successfully write, send a callback and wait for the read to be done
-            mDemuxCallbacks[filterId]->onFilterEvent(filterEvent);
-            // We do not wait for the last read to be done
-            // VTS can verify the read result itself.
-            if (i == SECTION_WRITE_COUNT - 1) {
-                ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
-                break;
-            }
-            while (mThreadRunning[filterId]) {
+            while (mFilterThreadRunning[filterId]) {
                 status_t status = mFilterEventFlags[filterId]->wait(
                         static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
                         WAIT_TIMEOUT, true /* retry on spurious wake */);
@@ -388,15 +609,60 @@
                 }
                 break;
             }
-        }
 
-        mFilterWriteCount[filterId] = 0;
-        mThreadRunning[filterId] = false;
+            if (mDemuxCallbacks[filterId] == nullptr) {
+                ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
+                break;
+            }
+
+            while (mFilterThreadRunning[filterId]) {
+                std::lock_guard<std::mutex> lock(mFilterEventLock);
+                if (mFilterEvents[filterId].events.size() == 0) {
+                    continue;
+                }
+                // After successfully write, send a callback and wait for the read to be done
+                mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
+                mFilterEvents[filterId].events.resize(0);
+                break;
+            }
+            // We do not wait for the last read to be done
+            // VTS can verify the read result itself.
+            if (i == SECTION_WRITE_COUNT - 1) {
+                ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
+                break;
+            }
+        }
+        mFilterThreadRunning[filterId] = false;
     }
 
     ALOGD("[Demux] filter thread ended.");
 }
 
+void Demux::inputThreadLoop() {
+    ALOGD("[Demux] input threadLoop start.");
+    mInputThreadRunning = true;
+
+    while (mInputThreadRunning) {
+        uint32_t efState = 0;
+        status_t status =
+                mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
+                                      &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
+        if (status != OK) {
+            ALOGD("[Demux] wait for data ready on the input FMQ");
+            continue;
+        }
+        // Our current implementation filter the data and write it into the filter FMQ immedaitely
+        // after the DATA_READY from the VTS/framework
+        if (!filterAndOutputData()) {
+            ALOGD("[Demux] input data failed to be filtered. Ending thread");
+            break;
+        }
+    }
+
+    mInputThreadRunning = false;
+    ALOGD("[Demux] input thread ended.");
+}
+
 }  // namespace implementation
 }  // namespace V1_0
 }  // namespace tuner
diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h
index 8b00266..2fdde8d 100644
--- a/tv/tuner/1.0/default/Demux.h
+++ b/tv/tuner/1.0/default/Demux.h
@@ -19,6 +19,7 @@
 
 #include <android/hardware/tv/tuner/1.0/IDemux.h>
 #include <fmq/MessageQueue.h>
+#include <set>
 
 using namespace std;
 
@@ -43,6 +44,8 @@
   public:
     Demux(uint32_t demuxId);
 
+    ~Demux();
+
     virtual Return<Result> setFrontendDataSource(uint32_t frontendId) override;
 
     virtual Return<Result> close() override;
@@ -68,8 +71,58 @@
 
     virtual Return<void> getAvSyncTime(AvSyncHwId avSyncHwId, getAvSyncTime_cb _hidl_cb) override;
 
+    virtual Return<Result> addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) override;
+
+    virtual Return<void> getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) override;
+
+    virtual Return<Result> configureInput(const DemuxInputSettings& settings) override;
+
+    virtual Return<Result> startInput() override;
+
+    virtual Return<Result> stopInput() override;
+
+    virtual Return<Result> flushInput() override;
+
+    virtual Return<Result> removeInput() override;
+
+    virtual Return<Result> addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) override;
+
+    virtual Return<void> getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) override;
+
+    virtual Return<Result> configureOutput(const DemuxOutputSettings& settings) override;
+
+    virtual Return<Result> attachOutputTsFilter(uint32_t filterId) override;
+
+    virtual Return<Result> detachOutputTsFilter(uint32_t filterId) override;
+
+    virtual Return<Result> startOutput() override;
+
+    virtual Return<Result> stopOutput() override;
+
+    virtual Return<Result> flushOutput() override;
+
+    virtual Return<Result> removeOutput() override;
+
   private:
-    virtual ~Demux();
+    // A struct that passes the arguments to a newly created filter thread
+    struct ThreadArgs {
+        Demux* user;
+        uint32_t filterId;
+    };
+
+    /**
+     * Filter handlers to handle the data filtering.
+     * They are also responsible to write the filtered output into the filter FMQ
+     * and update the filterEvent bound with the same filterId.
+     */
+    Result startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data);
+    Result startPesFilterHandler(uint32_t filterId);
+    Result startTsFilterHandler();
+    Result startMediaFilterHandler(uint32_t filterId);
+    Result startRecordFilterHandler(uint32_t filterId);
+    Result startPcrFilterHandler();
+    Result startFilterLoop(uint32_t filterId);
+
     /**
      * To create a FilterMQ with the the next available Filter ID.
      * Creating Event Flag at the same time.
@@ -77,60 +130,80 @@
      *
      * Return false is any of the above processes fails.
      */
-    bool createAndSaveMQ(uint32_t bufferSize, uint32_t filterId);
+    bool createFilterMQ(uint32_t bufferSize, uint32_t filterId);
+    bool createMQ(FilterMQ* queue, EventFlag* eventFlag, uint32_t bufferSize);
     void deleteEventFlag();
     bool writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId);
-    Result startSectionFilterHandler(DemuxFilterEvent event);
-    Result startPesFilterHandler(DemuxFilterEvent& event);
-    Result startTsFilterHandler();
-    Result startMediaFilterHandler(DemuxFilterEvent& event);
-    Result startRecordFilterHandler(DemuxFilterEvent& event);
-    Result startPcrFilterHandler();
-    bool writeSectionsAndCreateEvent(DemuxFilterEvent& event, uint32_t sectionNum);
-    void filterThreadLoop(DemuxFilterEvent* event);
-    static void* __threadLoop(void* data);
+    bool readDataFromMQ();
+    bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
+    /**
+     * A dispatcher to read and dispatch input data to all the started filters.
+     * Each filter handler handles the data filtering/output writing/filterEvent updating.
+     */
+    bool filterAndOutputData();
+    static void* __threadLoopFilter(void* data);
+    static void* __threadLoopInput(void* user);
+    void filterThreadLoop(uint32_t filterId);
+    void inputThreadLoop();
 
     uint32_t mDemuxId;
     uint32_t mSourceFrontendId;
     /**
-     * Record the last used filer id. Initial value is -1.
+     * Record the last used filter id. Initial value is -1.
      * Filter Id starts with 0.
      */
     uint32_t mLastUsedFilterId = -1;
     /**
+     * Record all the used filter Ids.
+     * Any removed filter id should be removed from this set.
+     */
+    set<uint32_t> mUsedFilterIds;
+    /**
+     * Record all the unused filter Ids within mLastUsedFilterId.
+     * Removed filter Id should be added into this set.
+     * When this set is not empty, ids here should be allocated first
+     * and added into usedFilterIds.
+     */
+    set<uint32_t> mUnusedFilterIds;
+    /**
      * A list of created FilterMQ ptrs.
      * The array number is the filter ID.
      */
     vector<unique_ptr<FilterMQ>> mFilterMQs;
-    vector<DemuxFilterType> mFilterTypes;
     vector<EventFlag*> mFilterEventFlags;
+    vector<DemuxFilterEvent> mFilterEvents;
+    unique_ptr<FilterMQ> mInputMQ;
+    unique_ptr<FilterMQ> mOutputMQ;
+    EventFlag* mInputEventFlag;
+    EventFlag* mOutputEventFlag;
     /**
      * Demux callbacks used on filter events or IO buffer status
      */
     vector<sp<IDemuxCallback>> mDemuxCallbacks;
-    /**
-     * How many times a specific filter has written since started
-     */
-    vector<uint16_t> mFilterWriteCount;
-    pthread_t mThreadId = 0;
+    sp<IDemuxCallback> mInputCallback;
+    sp<IDemuxCallback> mOutputCallback;
+    // Thread handlers
+    pthread_t mInputThread;
+    pthread_t mOutputThread;
+    vector<pthread_t> mFilterThreads;
     /**
      * If a specific filter's writing loop is still running
      */
-    vector<bool> mThreadRunning;
+    vector<bool> mFilterThreadRunning;
+    bool mInputThreadRunning;
     /**
      * Lock to protect writes to the FMQs
      */
     std::mutex mWriteLock;
     /**
+     * Lock to protect writes to the filter event
+     */
+    std::mutex mFilterEventLock;
+    /**
      * How many times a filter should write
      * TODO make this dynamic/random/can take as a parameter
      */
     const uint16_t SECTION_WRITE_COUNT = 10;
-    // A struct that passes the arguments to a newly created filter thread
-    struct ThreadArgs {
-        Demux* user;
-        DemuxFilterEvent* event;
-    };
 };
 
 }  // namespace implementation