Merge "Demux thread reads data after filter start" into main
diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp
index 34e3442..de94467 100644
--- a/tv/tuner/aidl/default/Demux.cpp
+++ b/tv/tuner/aidl/default/Demux.cpp
@@ -53,6 +53,9 @@
Demux::~Demux() {
ALOGV("%s", __FUNCTION__);
+ if (mDemuxIptvReadThread.joinable()) {  // join only when the IPTV read thread was actually started
+ mDemuxIptvReadThread.join();  // NOTE(review): blocks until readIptvThreadLoop returns; no stop signal is sent here - confirm this cannot hang
+ }
close();
}
@@ -114,16 +117,26 @@
}
}
-void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf,
- size_t buf_size, int timeout_ms, int buffer_timeout) {
+void Demux::setIptvThreadRunning(bool isIptvThreadRunning) {  // update the run flag, then wake waiters
+ const std::lock_guard<std::mutex> guard(mIsIptvThreadRunningMutex);  // held until return
+ mIsIptvReadThreadRunning = isIptvThreadRunning;  // flag tested by the read loop's wait predicate
+ mIsIptvThreadRunningCv.notify_all();  // let any thread waiting on the flag re-check it
+}
+
+void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size,
+ int timeout_ms, int buffer_timeout) {
Timer *timer, *fullBufferTimer;
- while (mDemuxIptvReadThreadRunning) {
+ while (true) {
+ std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
+ mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; });
if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) {
ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout);
delete fullBufferTimer;
break;
}
timer = new Timer();
+ void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
+ if (buf == nullptr) ALOGI("Buffer allocation failed");
ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms);
if (bytes_read == 0) {
double elapsed_time = timer->get_elapsed_time_ms();
@@ -157,8 +170,9 @@
default:
ALOGI("Invalid DVR Status");
}
+
+ free(buf);
}
- mDemuxIptvReadThreadRunning = false;
}
::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) {
@@ -216,17 +230,8 @@
// while thread is alive, keep reading data
int timeout_ms = 20;
int buffer_timeout = 10000; // 10s
- void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
- if (buf == nullptr) ALOGI("malloc buf failed");
- ALOGI("[ INFO ] Allocated buffer of size %d", IPTV_BUFFER_SIZE);
- ALOGI("Getting FMQ from DVR instance to write socket data");
- mDemuxIptvReadThreadRunning = true;
mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer,
- buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
- if (mDemuxIptvReadThread.joinable()) {
- mDemuxIptvReadThread.join();
- }
- free(buf);
+ IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
}
return ::ndk::ScopedAStatus::ok();
}
diff --git a/tv/tuner/aidl/default/Demux.h b/tv/tuner/aidl/default/Demux.h
index a23063f..ad7b7a7 100644
--- a/tv/tuner/aidl/default/Demux.h
+++ b/tv/tuner/aidl/default/Demux.h
@@ -103,7 +103,7 @@
void setIsRecording(bool isRecording);
bool isRecording();
void startFrontendInputLoop();
- void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t size,
+ void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size,
int timeout_ms, int buffer_timeout);
/**
@@ -124,6 +124,11 @@
void setInUse(bool inUse);
void setTunerService(std::shared_ptr<Tuner> tuner);
+ /**
+ * Sets the IPTV read thread's run flag and notifies any thread waiting on that flag.
+ */
+ void setIptvThreadRunning(bool isIptvThreadRunning);
+
private:
// Tuner service
std::shared_ptr<Tuner> mTuner;
@@ -196,10 +201,16 @@
* If a specific filter's writing loop is still running
*/
std::atomic<bool> mFrontendInputThreadRunning;
- std::atomic<bool> mDemuxIptvReadThreadRunning;
std::atomic<bool> mKeepFetchingDataFromFrontend;
/**
+ * IPTV read-thread run flag (starts false); guarded by the mutex and signalled via the cv below.
+ */
+ bool mIsIptvReadThreadRunning = false;
+ std::mutex mIsIptvThreadRunningMutex;
+ std::condition_variable mIsIptvThreadRunningCv;
+
+ /**
* If the dvr recording is running.
*/
bool mIsRecording = false;
diff --git a/tv/tuner/aidl/default/Filter.cpp b/tv/tuner/aidl/default/Filter.cpp
index d8f5dd5..212d329 100644
--- a/tv/tuner/aidl/default/Filter.cpp
+++ b/tv/tuner/aidl/default/Filter.cpp
@@ -328,6 +328,8 @@
std::vector<DemuxFilterEvent> events;
mFilterCount += 1;
+ mDemux->setIptvThreadRunning(true);
+
// All the filter event callbacks in start are for testing purpose.
switch (mType.mainType) {
case DemuxFilterMainType::TS:
@@ -365,6 +367,9 @@
ALOGV("%s", __FUNCTION__);
mFilterCount -= 1;
+ if (mFilterCount == 0) {
+ mDemux->setIptvThreadRunning(false);
+ }
mFilterThreadRunning = false;
if (mFilterThread.joinable()) {