Adding the mocking frontend tuning functionality to take specific ts
file as source of a Demux.

Test: atest
Bug: 135709325
Change-Id: I69849db58d68a7496f929940a74a63e7a9e6c6be
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 080116c..04382b0 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -67,8 +67,9 @@
         0x73, 0x63, 0x65, 0x6e, 0x65,
 };
 
-Demux::Demux(uint32_t demuxId) {
+Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
     mDemuxId = demuxId;
+    mTunerService = tuner;
 }
 
 Demux::~Demux() {}
@@ -76,9 +77,20 @@
 Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
     ALOGV("%s", __FUNCTION__);
 
-    mSourceFrontendId = frontendId;
+    if (mTunerService == nullptr) {
+        return Result::NOT_INITIALIZED;
+    }
 
-    return Result::SUCCESS;
+    mFrontend = mTunerService->getFrontendById(frontendId);
+
+    if (mFrontend == nullptr) {
+        return Result::INVALID_STATE;
+    }
+
+    mFrontendSourceFile = mFrontend->getSourceFile();
+
+    mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
+    return startBroadcastInputLoop();
 }
 
 Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
@@ -194,6 +206,8 @@
 
     mFilterThreadRunning[filterId] = false;
 
+    std::lock_guard<std::mutex> lock(mFilterThreadLock);
+
     return Result::SUCCESS;
 }
 
@@ -396,6 +410,8 @@
 
     mInputThreadRunning = false;
 
+    std::lock_guard<std::mutex> lock(mInputThreadLock);
+
     return Result::SUCCESS;
 }
 
@@ -447,19 +463,28 @@
         return Result::SUCCESS;
     }
 
-    // TODO extract PES from TS
-    if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) {
-        mFilterOutputs[filterId].clear();
-        return Result::INVALID_STATE;
+    for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
+        uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40;
+        uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4;
+        ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl);
+        if (pusi && (adaptFieldControl == 0x01)) {
+            vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
+            vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187;
+            vector<uint8_t> filterOutData(first, last);
+            if (!writeDataToFilterMQ(filterOutData, filterId)) {
+                mFilterOutputs[filterId].clear();
+                return Result::INVALID_STATE;
+            }
+            pesEvent = {
+                    // temp dump meta data
+                    .streamId = filterOutData[3],
+                    .dataLength = static_cast<uint16_t>(filterOutData.size()),
+            };
+            int size = mFilterEvents[filterId].events.size();
+            mFilterEvents[filterId].events.resize(size + 1);
+            mFilterEvents[filterId].events[size].pes(pesEvent);
+        }
     }
-    pesEvent = {
-            // temp dump meta data
-            .streamId = 0,
-            .dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()),
-    };
-    int size = mFilterEvents[filterId].events.size();
-    mFilterEvents[filterId].events.resize(size + 1);
-    mFilterEvents[filterId].events[size].pes(pesEvent);
 
     mFilterOutputs[filterId].clear();
 
@@ -481,6 +506,8 @@
     };
     mFilterEvents[filterId].events.resize(1);
     mFilterEvents[filterId].events[0].media() = mediaEvent;
+
+    mFilterOutputs[filterId].clear();
     // TODO handle write FQM for media stream
     return Result::SUCCESS;
 }
@@ -495,6 +522,8 @@
     recordEvent.indexMask.tsIndexMask() = 0x01;
     mFilterEvents[filterId].events.resize(1);
     mFilterEvents[filterId].events[0].ts() = recordEvent;
+
+    mFilterOutputs[filterId].clear();
     return Result::SUCCESS;
 }
 
@@ -554,10 +583,7 @@
     return false;
 }
 
-bool Demux::filterAndOutputData() {
-    Result result;
-    set<uint32_t>::iterator it;
-
+bool Demux::readInputFMQ() {
     // Read input data from the input FMQ
     int size = mInputMQ->availableToRead();
     int inputPacketSize = mInputSettings.packetSize;
@@ -566,15 +592,29 @@
 
     // Dispatch the packet to the PID matching filter output buffer
     for (int i = 0; i < size / inputPacketSize; i++) {
-        mInputMQ->read(dataOutputBuffer.data(), inputPacketSize);
-        for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
-            uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff));
-            if (pid == mFilterPids[*it]) {
-                mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(),
-                                           dataOutputBuffer.end());
-            }
+        if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
+            return false;
+        }
+        startTsFilter(dataOutputBuffer);
+    }
+
+    return true;
+}
+
+void Demux::startTsFilter(vector<uint8_t> data) {
+    set<uint32_t>::iterator it;
+    for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
+        uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
+        ALOGW("start ts filter pid: %d", pid);
+        if (pid == mFilterPids[*it]) {
+            mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end());
         }
     }
+}
+
+bool Demux::startFilterDispatcher() {
+    Result result;
+    set<uint32_t>::iterator it;
 
     // Handle the output data per filter type
     for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
@@ -620,6 +660,7 @@
 
 void Demux::filterThreadLoop(uint32_t filterId) {
     ALOGD("[Demux] filter %d threadLoop start.", filterId);
+    std::lock_guard<std::mutex> lock(mFilterThreadLock);
     mFilterThreadRunning[filterId] = true;
 
     // For the first time of filter output, implementation needs to send the filter
@@ -682,6 +723,7 @@
 
 void Demux::inputThreadLoop() {
     ALOGD("[Demux] input threadLoop start.");
+    std::lock_guard<std::mutex> lock(mInputThreadLock);
     mInputThreadRunning = true;
 
     while (mInputThreadRunning) {
@@ -695,7 +737,7 @@
         }
         // Our current implementation filter the data and write it into the filter FMQ immedaitely
         // after the DATA_READY from the VTS/framework
-        if (!filterAndOutputData()) {
+        if (!readInputFMQ() || !startFilterDispatcher()) {
             ALOGD("[Demux] input data failed to be filtered. Ending thread");
             break;
         }
@@ -735,6 +777,70 @@
     return mIntputStatus;
 }
 
+Result Demux::startBroadcastInputLoop() {
+    pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
+    pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
+
+    return Result::SUCCESS;
+}
+
+void* Demux::__threadLoopBroadcast(void* user) {
+    Demux* const self = static_cast<Demux*>(user);
+    self->broadcastInputThreadLoop();
+    return 0;
+}
+
+void Demux::broadcastInputThreadLoop() {
+    std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
+    mBroadcastInputThreadRunning = true;
+    mKeepFetchingDataFromFrontend = true;
+
+    // open the stream and get its length
+    std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary);
+    // TODO take the packet size from the frontend setting
+    int packetSize = 188;
+    int writePacketAmount = 6;
+    char* buffer = new char[packetSize];
+    ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
+    if (!inputData.is_open()) {
+        mBroadcastInputThreadRunning = false;
+        ALOGW("[Demux] Error %s", strerror(errno));
+    }
+
+    while (mBroadcastInputThreadRunning) {
+        // move the stream pointer for packet size * 6 every read until the end
+        while (mKeepFetchingDataFromFrontend) {
+            for (int i = 0; i < writePacketAmount; i++) {
+                inputData.read(buffer, packetSize);
+                if (!inputData) {
+                    mBroadcastInputThreadRunning = false;
+                    break;
+                }
+                // filter and dispatch filter output
+                vector<uint8_t> byteBuffer;
+                byteBuffer.resize(sizeof(buffer));
+                for (int index = 0; index < byteBuffer.size(); index++) {
+                    byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
+                }
+                startTsFilter(byteBuffer);
+                inputData.seekg(packetSize, inputData.cur);
+            }
+            startFilterDispatcher();
+            sleep(1);
+        }
+    }
+
+    ALOGW("[Demux] Broadcast Input thread end.");
+    delete[] buffer;
+    inputData.close();
+}
+
+void Demux::stopBroadcastInput() {
+    mKeepFetchingDataFromFrontend = false;
+    mBroadcastInputThreadRunning = false;
+    std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
+}
+
 }  // namespace implementation
 }  // namespace V1_0
 }  // namespace tuner
diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h
index eaf6ac3..e4a4e2b 100644
--- a/tv/tuner/1.0/default/Demux.h
+++ b/tv/tuner/1.0/default/Demux.h
@@ -20,6 +20,8 @@
 #include <android/hardware/tv/tuner/1.0/IDemux.h>
 #include <fmq/MessageQueue.h>
 #include <set>
+#include "Frontend.h"
+#include "Tuner.h"
 
 using namespace std;
 
@@ -40,9 +42,12 @@
 
 using FilterMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>;
 
+class Tuner;
+class Frontend;
+
 class Demux : public IDemux {
   public:
-    Demux(uint32_t demuxId);
+    Demux(uint32_t demuxId, sp<Tuner> tuner);
 
     ~Demux();
 
@@ -103,7 +108,17 @@
 
     virtual Return<Result> removeOutput() override;
 
+    // Functions interacts with Tuner Service
+    void stopBroadcastInput();
+
   private:
+    // Tuner service
+    sp<Tuner> mTunerService;
+
+    // Frontend source
+    sp<Frontend> mFrontend;
+    string mFrontendSourceFile;
+
     // A struct that passes the arguments to a newly created filter thread
     struct ThreadArgs {
         Demux* user;
@@ -122,6 +137,7 @@
     Result startRecordFilterHandler(uint32_t filterId);
     Result startPcrFilterHandler();
     Result startFilterLoop(uint32_t filterId);
+    Result startBroadcastInputLoop();
 
     /**
      * To create a FilterMQ with the the next available Filter ID.
@@ -143,14 +159,17 @@
      * A dispatcher to read and dispatch input data to all the started filters.
      * Each filter handler handles the data filtering/output writing/filterEvent updating.
      */
-    bool filterAndOutputData();
+    bool readInputFMQ();
+    void startTsFilter(vector<uint8_t> data);
+    bool startFilterDispatcher();
     static void* __threadLoopFilter(void* data);
     static void* __threadLoopInput(void* user);
+    static void* __threadLoopBroadcast(void* user);
     void filterThreadLoop(uint32_t filterId);
     void inputThreadLoop();
+    void broadcastInputThreadLoop();
 
     uint32_t mDemuxId;
-    uint32_t mSourceFrontendId;
     /**
      * Record the last used filter id. Initial value is -1.
      * Filter Id starts with 0.
@@ -195,6 +214,7 @@
     // Thread handlers
     pthread_t mInputThread;
     pthread_t mOutputThread;
+    pthread_t mBroadcastInputThread;
     vector<pthread_t> mFilterThreads;
 
     // FMQ status local records
@@ -204,6 +224,8 @@
      */
     vector<bool> mFilterThreadRunning;
     bool mInputThreadRunning;
+    bool mBroadcastInputThreadRunning;
+    bool mKeepFetchingDataFromFrontend;
     /**
      * Lock to protect writes to the FMQs
      */
@@ -217,6 +239,9 @@
      * Lock to protect writes to the input status
      */
     std::mutex mInputStatusLock;
+    std::mutex mBroadcastInputThreadLock;
+    std::mutex mFilterThreadLock;
+    std::mutex mInputThreadLock;
     /**
      * How many times a filter should write
      * TODO make this dynamic/random/can take as a parameter
diff --git a/tv/tuner/1.0/default/Frontend.cpp b/tv/tuner/1.0/default/Frontend.cpp
index 6f87d1b..1e07edd 100644
--- a/tv/tuner/1.0/default/Frontend.cpp
+++ b/tv/tuner/1.0/default/Frontend.cpp
@@ -27,14 +27,10 @@
 namespace V1_0 {
 namespace implementation {
 
-Frontend::Frontend() {
-    // Init callback to nullptr
-    mCallback = nullptr;
-}
-
-Frontend::Frontend(FrontendType type, FrontendId id) {
+Frontend::Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner) {
     mType = type;
     mId = id;
+    mTunerService = tuner;
     // Init callback to nullptr
     mCallback = nullptr;
 }
@@ -67,13 +63,18 @@
         return Result::INVALID_STATE;
     }
 
-    mCallback->onEvent(FrontendEventType::NO_SIGNAL);
+    // TODO dynamically allocate file to the source file
+    mSourceStreamFile = FRONTEND_STREAM_FILE;
+
+    mCallback->onEvent(FrontendEventType::LOCKED);
     return Result::SUCCESS;
 }
 
 Return<Result> Frontend::stopTune() {
     ALOGV("%s", __FUNCTION__);
 
+    mTunerService->frontendStopTune(mId);
+
     return Result::SUCCESS;
 }
 
@@ -119,6 +120,10 @@
     return mId;
 }
 
+string Frontend::getSourceFile() {
+    return mSourceStreamFile;
+}
+
 }  // namespace implementation
 }  // namespace V1_0
 }  // namespace tuner
diff --git a/tv/tuner/1.0/default/Frontend.h b/tv/tuner/1.0/default/Frontend.h
index f881e8b..07fa7b9 100644
--- a/tv/tuner/1.0/default/Frontend.h
+++ b/tv/tuner/1.0/default/Frontend.h
@@ -18,7 +18,9 @@
 #define ANDROID_HARDWARE_TV_TUNER_V1_0_FRONTEND_H_
 
 #include <android/hardware/tv/tuner/1.0/IFrontend.h>
-#include <android/hardware/tv/tuner/1.0/ITuner.h>
+#include <fstream>
+#include <iostream>
+#include "Tuner.h"
 
 using namespace std;
 
@@ -35,11 +37,11 @@
 using ::android::hardware::tv::tuner::V1_0::IFrontendCallback;
 using ::android::hardware::tv::tuner::V1_0::Result;
 
+class Tuner;
+
 class Frontend : public IFrontend {
   public:
-    Frontend();
-
-    Frontend(FrontendType type, FrontendId id);
+    Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner);
 
     virtual Return<Result> close() override;
 
@@ -64,11 +66,18 @@
 
     FrontendId getFrontendId();
 
+    string getSourceFile();
+
   private:
     virtual ~Frontend();
     sp<IFrontendCallback> mCallback;
+    sp<Tuner> mTunerService;
     FrontendType mType = FrontendType::UNDEFINED;
     FrontendId mId = 0;
+
+    const string FRONTEND_STREAM_FILE = "/vendor/etc/test1.ts";
+    string mSourceStreamFile;
+    std::ifstream mFrontendData;
 };
 
 }  // namespace implementation
diff --git a/tv/tuner/1.0/default/Tuner.cpp b/tv/tuner/1.0/default/Tuner.cpp
index a4a8ca5..f86b28d 100644
--- a/tv/tuner/1.0/default/Tuner.cpp
+++ b/tv/tuner/1.0/default/Tuner.cpp
@@ -38,14 +38,14 @@
     // Array index matches their FrontendId in the default impl
     mFrontendSize = 8;
     mFrontends.resize(mFrontendSize);
-    mFrontends[0] = new Frontend();
-    mFrontends[1] = new Frontend(FrontendType::ATSC, 1);
-    mFrontends[2] = new Frontend(FrontendType::DVBC, 2);
-    mFrontends[3] = new Frontend(FrontendType::DVBS, 3);
-    mFrontends[4] = new Frontend(FrontendType::DVBT, 4);
-    mFrontends[5] = new Frontend(FrontendType::ISDBT, 5);
-    mFrontends[6] = new Frontend(FrontendType::ANALOG, 6);
-    mFrontends[7] = new Frontend(FrontendType::ATSC, 7);
+    mFrontends[0] = new Frontend(FrontendType::DVBT, 0, this);
+    mFrontends[1] = new Frontend(FrontendType::ATSC, 1, this);
+    mFrontends[2] = new Frontend(FrontendType::DVBC, 2, this);
+    mFrontends[3] = new Frontend(FrontendType::DVBS, 3, this);
+    mFrontends[4] = new Frontend(FrontendType::DVBT, 4, this);
+    mFrontends[5] = new Frontend(FrontendType::ISDBT, 5, this);
+    mFrontends[6] = new Frontend(FrontendType::ANALOG, 6, this);
+    mFrontends[7] = new Frontend(FrontendType::ATSC, 7, this);
 }
 
 Tuner::~Tuner() {}
@@ -81,7 +81,8 @@
 
     DemuxId demuxId = mLastUsedId + 1;
     mLastUsedId += 1;
-    sp<IDemux> demux = new Demux(demuxId);
+    sp<Demux> demux = new Demux(demuxId, this);
+    mDemuxes[demuxId] = demux;
 
     _hidl_cb(Result::SUCCESS, demuxId, demux);
     return Void();
@@ -132,6 +133,25 @@
     return Void();
 }
 
+sp<Frontend> Tuner::getFrontendById(uint32_t frontendId) {
+    ALOGV("%s", __FUNCTION__);
+
+    return mFrontends[frontendId];
+}
+
+void Tuner::setFrontendAsDemuxSource(uint32_t frontendId, uint32_t demuxId) {
+    mFrontendToDemux[frontendId] = demuxId;
+}
+
+void Tuner::frontendStopTune(uint32_t frontendId) {
+    map<uint32_t, uint32_t>::iterator it = mFrontendToDemux.find(frontendId);
+    uint32_t demuxId;
+    if (it != mFrontendToDemux.end()) {
+        demuxId = it->second;
+        mDemuxes[demuxId]->stopBroadcastInput();
+    }
+}
+
 }  // namespace implementation
 }  // namespace V1_0
 }  // namespace tuner
diff --git a/tv/tuner/1.0/default/Tuner.h b/tv/tuner/1.0/default/Tuner.h
index c089864..96da257 100644
--- a/tv/tuner/1.0/default/Tuner.h
+++ b/tv/tuner/1.0/default/Tuner.h
@@ -18,6 +18,8 @@
 #define ANDROID_HARDWARE_TV_TUNER_V1_0_TUNER_H_
 
 #include <android/hardware/tv/tuner/1.0/ITuner.h>
+#include <map>
+#include "Demux.h"
 #include "Frontend.h"
 
 using namespace std;
@@ -29,6 +31,9 @@
 namespace V1_0 {
 namespace implementation {
 
+class Frontend;
+class Demux;
+
 class Tuner : public ITuner {
   public:
     Tuner();
@@ -50,10 +55,18 @@
 
     virtual Return<void> openLnbById(LnbId lnbId, openLnbById_cb _hidl_cb) override;
 
+    sp<Frontend> getFrontendById(uint32_t frontendId);
+
+    void setFrontendAsDemuxSource(uint32_t frontendId, uint32_t demuxId);
+
+    void frontendStopTune(uint32_t frontendId);
+
   private:
     virtual ~Tuner();
     // Static mFrontends array to maintain local frontends information
     vector<sp<Frontend>> mFrontends;
+    std::map<uint32_t, uint32_t> mFrontendToDemux;
+    std::map<uint32_t, sp<Demux>> mDemuxes;
     // To maintain how many Frontends we have
     int mFrontendSize;
     // The last used demux id. Initial value is -1.