Merge "Demux thread reads data after filter start" into main
diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp
index 34e3442..de94467 100644
--- a/tv/tuner/aidl/default/Demux.cpp
+++ b/tv/tuner/aidl/default/Demux.cpp
@@ -53,6 +53,9 @@
 
 Demux::~Demux() {
     ALOGV("%s", __FUNCTION__);
+    if (mDemuxIptvReadThread.joinable()) {  // skip when no thread is attached
+        mDemuxIptvReadThread.join();  // blocks until readIptvThreadLoop() returns
+    }  // NOTE(review): that loop is while (true); confirm it can exit during teardown
     close();
 }
 
@@ -114,16 +117,26 @@
     }
 }
 
-void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf,
-                               size_t buf_size, int timeout_ms, int buffer_timeout) {
+void Demux::setIptvThreadRunning(bool isIptvThreadRunning) {
+    const std::lock_guard<std::mutex> guard(mIsIptvThreadRunningMutex);  // protects the flag
+    mIsIptvReadThreadRunning = isIptvThreadRunning;
+    mIsIptvThreadRunningCv.notify_all();  // wake readIptvThreadLoop() if it is waiting
+}
+
+void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size,
+                               int timeout_ms, int buffer_timeout) {  // IPTV read thread entry
     Timer *timer, *fullBufferTimer;
-    while (mDemuxIptvReadThreadRunning) {
+    while (true) {  // no loop condition; the visible exit is the break below
+        std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);  // scoped to one iteration
+        mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; });
         if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) {
             ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout);
             delete fullBufferTimer;
             break;
         }
         timer = new Timer();
+        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);  // not sized by buf_size
+        if (buf == nullptr) ALOGI("Buffer allocation failed");  // NOTE(review): buf is still used
         ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms);
         if (bytes_read == 0) {
             double elapsed_time = timer->get_elapsed_time_ms();
@@ -157,8 +170,9 @@
             default:
                 ALOGI("Invalid DVR Status");
         }
+
+        free(buf);  // pairs with the malloc earlier in this iteration
     }
-    mDemuxIptvReadThreadRunning = false;
 }
 
 ::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) {
@@ -216,17 +230,8 @@
         // while thread is alive, keep reading data
         int timeout_ms = 20;
         int buffer_timeout = 10000;  // 10s
-        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
-        if (buf == nullptr) ALOGI("malloc buf failed");
-        ALOGI("[   INFO   ] Allocated buffer of size %d", IPTV_BUFFER_SIZE);
-        ALOGI("Getting FMQ from DVR instance to write socket data");
-        mDemuxIptvReadThreadRunning = true;
         mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer,
-                                           buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
-        if (mDemuxIptvReadThread.joinable()) {
-            mDemuxIptvReadThread.join();
-        }
-        free(buf);
+                                           IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
     }
     return ::ndk::ScopedAStatus::ok();
 }
diff --git a/tv/tuner/aidl/default/Demux.h b/tv/tuner/aidl/default/Demux.h
index a23063f..ad7b7a7 100644
--- a/tv/tuner/aidl/default/Demux.h
+++ b/tv/tuner/aidl/default/Demux.h
@@ -103,7 +103,7 @@
     void setIsRecording(bool isRecording);
     bool isRecording();
     void startFrontendInputLoop();
-    void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t size,
+    void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size,
                             int timeout_ms, int buffer_timeout);
 
     /**
@@ -124,6 +124,11 @@
     void setInUse(bool inUse);
     void setTunerService(std::shared_ptr<Tuner> tuner);
 
+    /**
+     * Sets the IPTV read-thread running flag and notifies the thread waiting on it.
+     */
+    void setIptvThreadRunning(bool isIptvThreadRunning);
+
   private:
     // Tuner service
     std::shared_ptr<Tuner> mTuner;
@@ -196,10 +201,16 @@
      * If a specific filter's writing loop is still running
      */
     std::atomic<bool> mFrontendInputThreadRunning;
-    std::atomic<bool> mDemuxIptvReadThreadRunning;
     std::atomic<bool> mKeepFetchingDataFromFrontend;
 
     /**
+     * Controls IPTV reading thread status
+     */
+    bool mIsIptvReadThreadRunning = false;  // guarded by mIsIptvThreadRunningMutex
+    std::mutex mIsIptvThreadRunningMutex;
+    std::condition_variable mIsIptvThreadRunningCv;  // notified by setIptvThreadRunning()
+
+    /**
      * If the dvr recording is running.
      */
     bool mIsRecording = false;
diff --git a/tv/tuner/aidl/default/Filter.cpp b/tv/tuner/aidl/default/Filter.cpp
index d8f5dd5..212d329 100644
--- a/tv/tuner/aidl/default/Filter.cpp
+++ b/tv/tuner/aidl/default/Filter.cpp
@@ -328,6 +328,8 @@
     std::vector<DemuxFilterEvent> events;
 
     mFilterCount += 1;
+    mDemux->setIptvThreadRunning(true);
+
     // All the filter event callbacks in start are for testing purpose.
     switch (mType.mainType) {
         case DemuxFilterMainType::TS:
@@ -365,6 +367,9 @@
     ALOGV("%s", __FUNCTION__);
 
     mFilterCount -= 1;
+    if (mFilterCount == 0) {
+        mDemux->setIptvThreadRunning(false);
+    }
 
     mFilterThreadRunning = false;
     if (mFilterThread.joinable()) {