Merge changes from topic "iptv_vts_fix" into main

* changes:
  Reenable VTS tests for IPTV
  Terminate IPTV read thread on demux close
  Fix tune byte alignment
  mFilterCount cannot be negative
  Refactor plugin interface, streamer creation
diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp
index de94467..5eeb36a 100644
--- a/tv/tuner/aidl/default/Demux.cpp
+++ b/tv/tuner/aidl/default/Demux.cpp
@@ -53,9 +53,6 @@
 
 Demux::~Demux() {
     ALOGV("%s", __FUNCTION__);
-    if (mDemuxIptvReadThread.joinable()) {
-        mDemuxIptvReadThread.join();
-    }
     close();
 }
 
@@ -123,26 +120,43 @@
     mIsIptvThreadRunningCv.notify_all();
 }
 
-void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size,
-                               int timeout_ms, int buffer_timeout) {
+void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf) {
     Timer *timer, *fullBufferTimer;
+    bool isTuneBytePushedToDvr = false;
     while (true) {
         std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
-        mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; });
-        if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) {
-            ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout);
+        mIsIptvThreadRunningCv.wait(
+                lock, [this] { return mIsIptvReadThreadRunning || mIsIptvReadThreadTerminated; });
+        if (mIsIptvReadThreadTerminated) {
+            ALOGI("[Demux] IPTV reading thread for playback terminated");
+            break;
+        }
+        if (mIsIptvDvrFMQFull &&
+            fullBufferTimer->get_elapsed_time_ms() > IPTV_PLAYBACK_BUFFER_TIMEOUT) {
+            ALOGE("DVR FMQ has not been flushed within timeout of %d ms",
+                  IPTV_PLAYBACK_BUFFER_TIMEOUT);
             delete fullBufferTimer;
             break;
         }
         timer = new Timer();
-        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
-        if (buf == nullptr) ALOGI("Buffer allocation failed");
-        ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms);
-        if (bytes_read == 0) {
+        ssize_t bytes_read;
+        void* tuneByteBuffer = mFrontend->getTuneByteBuffer();
+        if (!isTuneBytePushedToDvr && tuneByteBuffer != nullptr) {
+            memcpy(buf, tuneByteBuffer, 1);
+            char* offsetBuf = (char*)buf + 1;
+            bytes_read = interface->read_stream(streamer, (void*)offsetBuf, IPTV_BUFFER_SIZE - 1,
+                                                IPTV_PLAYBACK_TIMEOUT);
+            isTuneBytePushedToDvr = true;
+        } else {
+            bytes_read =
+                    interface->read_stream(streamer, buf, IPTV_BUFFER_SIZE, IPTV_PLAYBACK_TIMEOUT);
+        }
+
+        if (bytes_read <= 0) {
             double elapsed_time = timer->get_elapsed_time_ms();
-            if (elapsed_time > timeout_ms) {
+            if (elapsed_time > IPTV_PLAYBACK_TIMEOUT) {
                 ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time,
-                      timeout_ms);
+                      IPTV_PLAYBACK_TIMEOUT);
             }
             ALOGE("[Demux] Cannot read data from the socket");
             delete timer;
@@ -170,8 +184,6 @@
             default:
                 ALOGI("Invalid DVR Status");
         }
-
-        free(buf);
     }
 }
 
@@ -206,32 +218,44 @@
 
         // get plugin interface from frontend
         dtv_plugin* interface = mFrontend->getIptvPluginInterface();
+        // if plugin interface is not on frontend, create a new plugin interface
         if (interface == nullptr) {
-            ALOGE("[Demux] getIptvPluginInterface(): plugin interface is null");
-            return ::ndk::ScopedAStatus::fromServiceSpecificError(
-                    static_cast<int32_t>(Result::INVALID_STATE));
+            interface = mFrontend->createIptvPluginInterface();
+            if (interface == nullptr) {
+                ALOGE("[   INFO   ] Failed to load plugin.");
+                return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                        static_cast<int32_t>(Result::INVALID_STATE));
+            }
         }
-        ALOGI("[Demux] getIptvPluginInterface(): plugin interface is not null");
+
+        // get transport description from frontend
+        string transport_desc = mFrontend->getIptvTransportDescription();
+        if (transport_desc.empty()) {
+            string content_url = "rtp://127.0.0.1:12345";
+            transport_desc = "{ \"uri\": \"" + content_url + "\"}";
+        }
+        ALOGI("[Demux] transport_desc: %s", transport_desc.c_str());
 
         // get streamer object from Frontend instance
         dtv_streamer* streamer = mFrontend->getIptvPluginStreamer();
         if (streamer == nullptr) {
-            ALOGE("[Demux] getIptvPluginStreamer(): streamer is null");
+            streamer = mFrontend->createIptvPluginStreamer(interface, transport_desc.c_str());
+            if (streamer == nullptr) {
+                ALOGE("[   INFO   ] Failed to open stream");
+                return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                        static_cast<int32_t>(Result::INVALID_STATE));
+            }
+        }
+        stopIptvFrontendInput();
+        mIsIptvReadThreadTerminated = false;
+        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
+        if (buf == nullptr) {
+            ALOGE("[Demux] Buffer allocation failed");
             return ::ndk::ScopedAStatus::fromServiceSpecificError(
                     static_cast<int32_t>(Result::INVALID_STATE));
         }
-        ALOGI("[Demux] getIptvPluginStreamer(): streamer is not null");
-
-        // get transport description from frontend
-        string transport_desc = mFrontend->getIptvTransportDescription();
-        ALOGI("[Demux] getIptvTransportDescription(): transport_desc: %s", transport_desc.c_str());
-
-        // call read_stream on the socket to populate the buffer with TS data
-        // while thread is alive, keep reading data
-        int timeout_ms = 20;
-        int buffer_timeout = 10000;  // 10s
-        mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer,
-                                           IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
+        mDemuxIptvReadThread =
+                std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer, buf);
     }
     return ::ndk::ScopedAStatus::ok();
 }
@@ -348,6 +372,7 @@
     ALOGV("%s", __FUNCTION__);
 
     stopFrontendInput();
+    stopIptvFrontendInput();
 
     set<int64_t>::iterator it;
     for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
@@ -543,6 +568,23 @@
     }
 }
 
+void Demux::stopIptvFrontendInput() {
+    ALOGD("[Demux] stop iptv frontend on demux");
+    if (mDemuxIptvReadThread.joinable()) {
+        {
+            // Set the termination flag while holding the mutex the read thread waits with.
+            // Although the flag is atomic, writing it unlocked could land between the
+            // thread's predicate check and its block in wait(), losing the notification
+            // and leaving join() below stuck.
+            std::lock_guard<std::mutex> lock(mIsIptvThreadRunningMutex);
+            mIsIptvReadThreadTerminated = true;
+        }
+        // The lock is released before notifying/joining so the thread can re-acquire it.
+        mIsIptvThreadRunningCv.notify_all();
+        mDemuxIptvReadThread.join();
+    }
+}
+
 void Demux::setIsRecording(bool isRecording) {
     mIsRecording = isRecording;
 }
diff --git a/tv/tuner/aidl/default/Demux.h b/tv/tuner/aidl/default/Demux.h
index ad7b7a7..af040d4 100644
--- a/tv/tuner/aidl/default/Demux.h
+++ b/tv/tuner/aidl/default/Demux.h
@@ -56,6 +56,9 @@
 class TimeFilter;
 class Tuner;
 
+const int IPTV_PLAYBACK_TIMEOUT = 20;            // ms
+const int IPTV_PLAYBACK_BUFFER_TIMEOUT = 20000;  // ms
+
 class DvrPlaybackCallback : public BnDvrCallback {
   public:
     virtual ::ndk::ScopedAStatus onPlaybackStatus(PlaybackStatus status) override {
@@ -103,8 +106,7 @@
     void setIsRecording(bool isRecording);
     bool isRecording();
     void startFrontendInputLoop();
-    void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size,
-                            int timeout_ms, int buffer_timeout);
+    void frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf);
 
     /**
      * A dispatcher to read and dispatch input data to all the started filters.
@@ -128,6 +130,10 @@
      * Setter for IPTV Reading thread
      */
     void setIptvThreadRunning(bool isIptvThreadRunning);
+    /**
+     * Stops IPTV playback reading thread.
+     */
+    void stopIptvFrontendInput();
 
   private:
     // Tuner service
@@ -206,7 +212,8 @@
     /**
      * Controls IPTV reading thread status
      */
-    bool mIsIptvReadThreadRunning;
+    bool mIsIptvReadThreadRunning = false;
+    std::atomic<bool> mIsIptvReadThreadTerminated = false;
     std::mutex mIsIptvThreadRunningMutex;
     std::condition_variable mIsIptvThreadRunningCv;
 
diff --git a/tv/tuner/aidl/default/Filter.cpp b/tv/tuner/aidl/default/Filter.cpp
index 212d329..5f7a4cd 100644
--- a/tv/tuner/aidl/default/Filter.cpp
+++ b/tv/tuner/aidl/default/Filter.cpp
@@ -366,9 +366,11 @@
 ::ndk::ScopedAStatus Filter::stop() {
     ALOGV("%s", __FUNCTION__);
 
-    mFilterCount -= 1;
-    if (mFilterCount == 0) {
-        mDemux->setIptvThreadRunning(false);
+    if (mFilterCount > 0) {
+        mFilterCount -= 1;
+        if (mFilterCount.load() == 0) {
+            mDemux->setIptvThreadRunning(false);
+        }
     }
 
     mFilterThreadRunning = false;
diff --git a/tv/tuner/aidl/default/Frontend.cpp b/tv/tuner/aidl/default/Frontend.cpp
index 57ed1ba..1031604 100644
--- a/tv/tuner/aidl/default/Frontend.cpp
+++ b/tv/tuner/aidl/default/Frontend.cpp
@@ -188,6 +188,9 @@
     mCallback = nullptr;
     mIsLocked = false;
     mTuner = nullptr;
+    if (mTuneByteBuffer != nullptr) {
+        free(mTuneByteBuffer);
+    }
 }
 
 ::ndk::ScopedAStatus Frontend::close() {
@@ -215,19 +218,43 @@
     return ::ndk::ScopedAStatus::ok();
 }
 
-void Frontend::readTuneByte(dtv_streamer* streamer, void* buf, size_t buf_size, int timeout_ms) {
-    ssize_t bytes_read = mIptvPluginInterface->read_stream(streamer, buf, buf_size, timeout_ms);
+dtv_plugin* Frontend::createIptvPluginInterface() {
+    const char* path = "/vendor/lib/iptv_udp_plugin.so";  // UDP plugin used to read TS data
+    DtvPlugin* plugin = new DtvPlugin(path);
+    if (!plugin->load()) {
+        ALOGE("Failed to load plugin");
+        delete plugin;  // nothing was handed out from it yet; release it instead of leaking
+        return nullptr;
+    }
+    return plugin->interface();  // nullptr is returned above when the plugin cannot be loaded
+}
+
+dtv_streamer* Frontend::createIptvPluginStreamer(dtv_plugin* interface,
+                                                 const char* transport_desc) {
+    dtv_streamer* streamer = interface->create_streamer();
+    int open_fd = interface->open_stream(streamer, transport_desc);
+    if (open_fd < 0) {
+        return nullptr;
+    }
+    ALOGI("[   INFO   ] open_stream successful, open_fd=%d", open_fd);
+    return streamer;
+}
+
+void Frontend::readTuneByte(void* buf) {
+    ssize_t bytes_read = mIptvPluginInterface->read_stream(mIptvPluginStreamer, buf,
+                                                           TUNE_BUFFER_SIZE, TUNE_BUFFER_TIMEOUT);
     if (bytes_read <= 0) {
         ALOGI("[   ERROR   ] Tune byte couldn't be read.");
         return;
     }
     mCallback->onEvent(FrontendEventType::LOCKED);
     mIsLocked = true;
+    mTuneByteBuffer = buf;
 }
 
 ::ndk::ScopedAStatus Frontend::tune(const FrontendSettings& in_settings) {
     if (mCallback == nullptr) {
-        ALOGW("[   WARN   ] Frontend callback is not set for tunin0g");
+        ALOGW("[   WARN   ] Frontend callback is not set for tuning");
         return ::ndk::ScopedAStatus::fromServiceSpecificError(
                 static_cast<int32_t>(Result::INVALID_STATE));
     }
@@ -242,54 +269,39 @@
         ALOGI("[   INFO   ] Frontend type is set to IPTV, tag = %d id=%d", in_settings.getTag(),
               mId);
 
-        // load udp plugin for reading TS data
-        const char* path = "/vendor/lib/iptv_udp_plugin.so";
-        DtvPlugin* plugin = new DtvPlugin(path);
-        if (!plugin) {
-            ALOGE("Failed to create DtvPlugin, plugin_path is invalid");
+        mIptvPluginInterface = createIptvPluginInterface();
+        if (mIptvPluginInterface == nullptr) {
+            ALOGE("[   INFO   ] Failed to load plugin.");
             return ::ndk::ScopedAStatus::fromServiceSpecificError(
                     static_cast<int32_t>(Result::INVALID_ARGUMENT));
         }
-        bool plugin_loaded = plugin->load();
-        if (!plugin_loaded) {
-            ALOGE("Failed to load plugin");
-            return ::ndk::ScopedAStatus::fromServiceSpecificError(
-                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
-        }
-        mIptvPluginInterface = plugin->interface();
 
         // validate content_url format
         std::string content_url = in_settings.get<FrontendSettings::Tag::iptv>()->contentUrl;
-        std::string transport_desc = "{ \"uri\": \"" + content_url + "\"}";
-        ALOGI("[   INFO   ] transport_desc: %s", transport_desc.c_str());
-        bool is_transport_desc_valid = plugin->validate(transport_desc.c_str());
+        mIptvTransportDescription = "{ \"uri\": \"" + content_url + "\"}";
+        ALOGI("[   INFO   ] transport_desc: %s", mIptvTransportDescription.c_str());
+        bool is_transport_desc_valid =
+                mIptvPluginInterface->validate(mIptvTransportDescription.c_str());
         if (!is_transport_desc_valid) {  // not of format protocol://ip:port
             ALOGE("[   INFO   ] transport_desc is not valid");
             return ::ndk::ScopedAStatus::fromServiceSpecificError(
                     static_cast<int32_t>(Result::INVALID_ARGUMENT));
         }
-        mIptvTransportDescription = transport_desc;
 
         // create a streamer and open it for reading data
-        dtv_streamer* streamer = mIptvPluginInterface->create_streamer();
-        mIptvPluginStreamer = streamer;
-        int open_fd = mIptvPluginInterface->open_stream(streamer, transport_desc.c_str());
-        if (open_fd < 0) {
-            ALOGE("[   INFO   ] could not open stream");
-            return ::ndk::ScopedAStatus::fromServiceSpecificError(
-                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
-        }
-        ALOGI("[   INFO   ] open_stream successful, open_fd=%d", open_fd);
+        mIptvPluginStreamer =
+                createIptvPluginStreamer(mIptvPluginInterface, mIptvTransportDescription.c_str());
 
-        size_t buf_size = 1;
-        int timeout_ms = 2000;
-        void* buf = malloc(sizeof(char) * buf_size);
-        if (buf == nullptr) ALOGI("malloc buf failed [TUNE]");
-        ALOGI("[   INFO   ] [Tune] Allocated buffer of size %zu", buf_size);
-        mIptvFrontendTuneThread =
-                std::thread(&Frontend::readTuneByte, this, streamer, buf, buf_size, timeout_ms);
-        if (mIptvFrontendTuneThread.joinable()) mIptvFrontendTuneThread.join();
-        free(buf);
+        void* buf = malloc(sizeof(char) * TUNE_BUFFER_SIZE);
+        if (buf == nullptr) {
+            ALOGE("Failed to allocate 1 byte buffer for tuning.");
+            return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                    static_cast<int32_t>(Result::INVALID_STATE));
+        }
+        mIptvFrontendTuneThread = std::thread(&Frontend::readTuneByte, this, buf);
+        if (mIptvFrontendTuneThread.joinable()) {
+            mIptvFrontendTuneThread.join();
+        }
     }
 
     return ::ndk::ScopedAStatus::ok();
diff --git a/tv/tuner/aidl/default/Frontend.h b/tv/tuner/aidl/default/Frontend.h
index 17a1aee..f3f8a87 100644
--- a/tv/tuner/aidl/default/Frontend.h
+++ b/tv/tuner/aidl/default/Frontend.h
@@ -33,6 +33,9 @@
 
 class Tuner;
 
+const int TUNE_BUFFER_SIZE = 1;        // byte
+const int TUNE_BUFFER_TIMEOUT = 2000;  // ms
+
 class Frontend : public BnFrontend {
   public:
     Frontend(FrontendType type, int32_t id);
@@ -64,7 +67,10 @@
     dtv_plugin* getIptvPluginInterface();
     string getIptvTransportDescription();
     dtv_streamer* getIptvPluginStreamer();
-    void readTuneByte(dtv_streamer* streamer, void* buf, size_t size, int timeout_ms);
+    void readTuneByte(void* buf);
+    void* getTuneByteBuffer() { return mTuneByteBuffer; };
+    dtv_streamer* createIptvPluginStreamer(dtv_plugin* interface, const char* transport_desc);
+    dtv_plugin* createIptvPluginInterface();
     bool isLocked();
     void getFrontendInfo(FrontendInfo* _aidl_return);
     void setTunerService(std::shared_ptr<Tuner> tuner);
@@ -90,6 +96,7 @@
     string mIptvTransportDescription;
     dtv_streamer* mIptvPluginStreamer;
     std::thread mIptvFrontendTuneThread;
+    void* mTuneByteBuffer = nullptr;
 };
 
 }  // namespace tuner
diff --git a/tv/tuner/aidl/vts/functional/VtsHalTvTunerTargetTest.cpp b/tv/tuner/aidl/vts/functional/VtsHalTvTunerTargetTest.cpp
index 671d079..2b39bc6 100644
--- a/tv/tuner/aidl/vts/functional/VtsHalTvTunerTargetTest.cpp
+++ b/tv/tuner/aidl/vts/functional/VtsHalTvTunerTargetTest.cpp
@@ -684,10 +684,6 @@
     if (!live.hasFrontendConnection) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     auto live_configs = generateLiveConfigurations();
     for (auto& configuration : live_configs) {
         live = configuration;
@@ -784,10 +780,6 @@
     if (!live.hasFrontendConnection) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     // TODO use parameterized tests
     auto live_configs = generateLiveConfigurations();
     for (auto& configuration : live_configs) {
@@ -802,10 +794,6 @@
     if (!live.hasFrontendConnection) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     auto live_configs = generateLiveConfigurations();
     for (auto& configuration : live_configs) {
         live = configuration;
@@ -821,10 +809,6 @@
     if (!live.hasFrontendConnection) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     // TODO use parameterized tests
     auto live_configs = generateLiveConfigurations();
     for (auto& configuration : live_configs) {
@@ -1128,7 +1112,7 @@
     if (!record.support) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
+    // Recording is not currently supported for IPTV frontend
     if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
         return;
     }
@@ -1146,7 +1130,7 @@
     if (!record.support) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
+    // Recording is not currently supported for IPTV frontend
     if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
         return;
     }
@@ -1182,7 +1166,7 @@
     if (!record.support) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
+    // Recording is not currently supported for IPTV frontend
     if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
         return;
     }
@@ -1287,10 +1271,6 @@
 
 TEST_P(TunerFrontendAidlTest, getHardwareInfo) {
     description("Test Frontend get hardware info");
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     if (!live.hasFrontendConnection) {
         return;
     }
@@ -1338,10 +1318,6 @@
     if (!live.hasFrontendConnection) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     auto live_configs = generateLiveConfigurations();
     for (auto& configuration : live_configs) {
         live = configuration;
@@ -1369,10 +1345,6 @@
     if (!live.hasFrontendConnection) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     auto live_configs = generateLiveConfigurations();
     for (auto& configuration : live_configs) {
         live = configuration;
@@ -1402,10 +1374,6 @@
     if (!live.hasFrontendConnection) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     auto live_configs = generateLiveConfigurations();
     for (auto& configuration : live_configs) {
         live = configuration;
@@ -1419,10 +1387,6 @@
     if (!descrambling.support) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     vector<DescramblingHardwareConnections> descrambling_configs =
             generateDescramblingConfigurations();
     if (descrambling_configs.empty()) {
@@ -1459,10 +1423,6 @@
     if (!descrambling.support) {
         return;
     }
-    // Do not execute tests for IPTV Frontend
-    if (frontendMap[live.frontendId].type == FrontendType::IPTV) {
-        return;
-    }
     vector<DescramblingHardwareConnections> descrambling_configs =
             generateDescramblingConfigurations();
     if (descrambling_configs.empty()) {