Terminate IPTV read thread on demux close

IPTV reading thread doesn't have a termination condition. This CL
adds a flag mIsIptvReadThreadTerminated which tracks whether the
thread resources should be cleaned up.

Bug: 288170590
Test: atest VtsHalTvTunerTargetTest
Change-Id: I3a19e1045ee67dac2d95457d217adb1375674ed4
diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp
index a9c205f..5eeb36a 100644
--- a/tv/tuner/aidl/default/Demux.cpp
+++ b/tv/tuner/aidl/default/Demux.cpp
@@ -53,9 +53,6 @@
 
 Demux::~Demux() {
     ALOGV("%s", __FUNCTION__);
-    if (mDemuxIptvReadThread.joinable()) {
-        mDemuxIptvReadThread.join();
-    }
     close();
 }
 
@@ -123,12 +120,17 @@
     mIsIptvThreadRunningCv.notify_all();
 }
 
-void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer) {
+void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf) {
     Timer *timer, *fullBufferTimer;
     bool isTuneBytePushedToDvr = false;
     while (true) {
         std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
-        mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; });
+        mIsIptvThreadRunningCv.wait(
+                lock, [this] { return mIsIptvReadThreadRunning || mIsIptvReadThreadTerminated; });
+        if (mIsIptvReadThreadTerminated) {
+            ALOGI("[Demux] IPTV reading thread for playback terminated");
+            break;
+        }
         if (mIsIptvDvrFMQFull &&
             fullBufferTimer->get_elapsed_time_ms() > IPTV_PLAYBACK_BUFFER_TIMEOUT) {
             ALOGE("DVR FMQ has not been flushed within timeout of %d ms",
@@ -137,11 +139,6 @@
             break;
         }
         timer = new Timer();
-        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
-        if (buf == nullptr) {
-            ALOGE("[Demux] Buffer allocation failed");
-            return;
-        }
         ssize_t bytes_read;
         void* tuneByteBuffer = mFrontend->getTuneByteBuffer();
         if (!isTuneBytePushedToDvr && tuneByteBuffer != nullptr) {
@@ -187,8 +184,6 @@
             default:
                 ALOGI("Invalid DVR Status");
         }
-
-        free(buf);
     }
 }
 
@@ -251,9 +246,16 @@
                         static_cast<int32_t>(Result::INVALID_STATE));
             }
         }
-
+        stopIptvFrontendInput();
+        mIsIptvReadThreadTerminated = false;
+        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
+        if (buf == nullptr) {
+            ALOGE("[Demux] Buffer allocation failed");
+            return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                    static_cast<int32_t>(Result::INVALID_STATE));
+        }
         mDemuxIptvReadThread =
-                std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer);
+                std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer, buf);
     }
     return ::ndk::ScopedAStatus::ok();
 }
@@ -370,6 +372,7 @@
     ALOGV("%s", __FUNCTION__);
 
     stopFrontendInput();
+    stopIptvFrontendInput();
 
     set<int64_t>::iterator it;
     for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
@@ -565,6 +568,15 @@
     }
 }
 
+void Demux::stopIptvFrontendInput() {
+    ALOGD("[Demux] stop iptv frontend on demux");
+    if (mDemuxIptvReadThread.joinable()) {
+        mIsIptvReadThreadTerminated = true;
+        mIsIptvThreadRunningCv.notify_all();
+        mDemuxIptvReadThread.join();
+    }
+}
+
 void Demux::setIsRecording(bool isRecording) {
     mIsRecording = isRecording;
 }
diff --git a/tv/tuner/aidl/default/Demux.h b/tv/tuner/aidl/default/Demux.h
index b8d57df..af040d4 100644
--- a/tv/tuner/aidl/default/Demux.h
+++ b/tv/tuner/aidl/default/Demux.h
@@ -106,7 +106,7 @@
     void setIsRecording(bool isRecording);
     bool isRecording();
     void startFrontendInputLoop();
-    void frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer);
+    void frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf);
 
     /**
      * A dispatcher to read and dispatch input data to all the started filters.
@@ -130,6 +130,10 @@
      * Setter for IPTV Reading thread
      */
     void setIptvThreadRunning(bool isIptvThreadRunning);
+    /**
+     * Stops IPTV playback reading thread.
+     */
+    void stopIptvFrontendInput();
 
   private:
     // Tuner service
@@ -208,7 +212,8 @@
     /**
      * Controls IPTV reading thread status
      */
-    bool mIsIptvReadThreadRunning;
+    bool mIsIptvReadThreadRunning = false;
+    std::atomic<bool> mIsIptvReadThreadTerminated = false;
     std::mutex mIsIptvThreadRunningMutex;
     std::condition_variable mIsIptvThreadRunningCv;