Add IPTV default implementation
Frontend::tune(): create a streamer using the plugin interface,
read a byte, and return the LOCKED event if the byte is read.
Demux::setFrontendDataSource(): open a new stream to read data
from the socket and push the data read to the DVR FMQ.
Test: atest VtsHalTvTunerTargetTest
Bug: 288170590
Change-Id: Iaf2eae7b4dc9e7d69b1f7b3a367d24f6acdd68be
diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp
index 11e7131..34e3442 100644
--- a/tv/tuner/aidl/default/Demux.cpp
+++ b/tv/tuner/aidl/default/Demux.cpp
@@ -20,7 +20,9 @@
#include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
#include <aidl/android/hardware/tv/tuner/Result.h>
+#include <fmq/AidlMessageQueue.h>
#include <utils/Log.h>
+#include <thread>
#include "Demux.h"
namespace aidl {
@@ -29,6 +31,15 @@
namespace tv {
namespace tuner {
+using ::aidl::android::hardware::common::fmq::MQDescriptor;
+using ::aidl::android::hardware::common::fmq::SynchronizedReadWrite;
+using ::android::AidlMessageQueue;
+using ::android::hardware::EventFlag;
+
+using FilterMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
+using AidlMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
+using AidlMQDesc = MQDescriptor<int8_t, SynchronizedReadWrite>;
+
#define WAIT_TIMEOUT 3000000000
Demux::Demux(int32_t demuxId, uint32_t filterTypes) {
@@ -45,6 +56,111 @@
close();
}
+/**
+ * Creates the playback or record DVR for this demux and returns it through _aidl_return.
+ *
+ * @param in_type       DvrType::PLAYBACK or DvrType::RECORD; any other value is rejected.
+ * @param in_bufferSize Buffer size forwarded to the Dvr constructor.
+ * @param in_cb         Callback forwarded to the Dvr constructor; must not be null.
+ * @param _aidl_return  Receives the new DVR on success, or nullptr on any failure.
+ * @return ok() on success; INVALID_ARGUMENT for a null callback or unsupported type;
+ *         UNKNOWN_ERROR when the DVR message queue cannot be created or a playback
+ *         filter cannot be attached.
+ */
+::ndk::ScopedAStatus Demux::openDvr(DvrType in_type, int32_t in_bufferSize,
+                                    const std::shared_ptr<IDvrCallback>& in_cb,
+                                    std::shared_ptr<IDvr>* _aidl_return) {
+    ALOGV("%s", __FUNCTION__);
+
+    // Every failure path hands back a null DVR together with a service-specific error code.
+    const auto failWith = [_aidl_return](Result reason) {
+        *_aidl_return = nullptr;
+        return ::ndk::ScopedAStatus::fromServiceSpecificError(static_cast<int32_t>(reason));
+    };
+
+    if (in_cb == nullptr) {
+        ALOGW("[Demux] DVR callback can't be null");
+        return failWith(Result::INVALID_ARGUMENT);
+    }
+
+    if (in_type == DvrType::PLAYBACK) {
+        mDvrPlayback =
+                ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb, this->ref<Demux>());
+        if (!mDvrPlayback->createDvrMQ()) {
+            ALOGE("[Demux] cannot create dvr message queue");
+            mDvrPlayback = nullptr;
+            return failWith(Result::UNKNOWN_ERROR);
+        }
+
+        // Attach every filter already registered for playback to the new DVR.
+        for (const int64_t filterId : mPlaybackFilterIds) {
+            if (!mDvrPlayback->addPlaybackFilter(filterId, mFilters[filterId])) {
+                ALOGE("[Demux] Can't get filter info for DVR playback");
+                mDvrPlayback = nullptr;
+                return failWith(Result::UNKNOWN_ERROR);
+            }
+        }
+
+        ALOGI("Playback normal case");
+
+        *_aidl_return = mDvrPlayback;
+        return ::ndk::ScopedAStatus::ok();
+    }
+
+    if (in_type == DvrType::RECORD) {
+        mDvrRecord =
+                ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb, this->ref<Demux>());
+        if (!mDvrRecord->createDvrMQ()) {
+            mDvrRecord = nullptr;
+            return failWith(Result::UNKNOWN_ERROR);
+        }
+
+        *_aidl_return = mDvrRecord;
+        return ::ndk::ScopedAStatus::ok();
+    }
+
+    // Neither PLAYBACK nor RECORD.
+    return failWith(Result::INVALID_ARGUMENT);
+}
+
+
+/**
+ * Body of the IPTV read thread: repeatedly reads from the plugin streamer into buf and
+ * pushes the bytes into the playback DVR FMQ via mDvrPlayback->writePlaybackFMQ().
+ *
+ * The loop ends when mDemuxIptvReadThreadRunning becomes false, when a read yields no
+ * data, or when the FMQ has stayed full for longer than buffer_timeout. On exit
+ * mDemuxIptvReadThreadRunning is always set to false.
+ *
+ * @param interface      Plugin interface whose read_stream() is called; must not be null.
+ * @param streamer       Streamer handle passed to read_stream(); must not be null.
+ * @param buf            Caller-owned scratch buffer receiving each read; must not be null.
+ * @param buf_size       Size passed to read_stream() as the buffer capacity.
+ * @param timeout_ms     Timeout passed to each read_stream() call, in milliseconds.
+ * @param buffer_timeout Maximum time, in milliseconds, the FMQ may stay full before giving up.
+ */
+void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf,
+                               size_t buf_size, int timeout_ms, int buffer_timeout) {
+    // Everything below is dereferenced unconditionally, so refuse to start without it
+    // (the caller may hand over a failed allocation or run without a playback DVR).
+    if (interface == nullptr || streamer == nullptr || buf == nullptr ||
+        mDvrPlayback == nullptr) {
+        ALOGE("[Demux] IPTV read loop needs a plugin, streamer, buffer and playback DVR");
+        mDemuxIptvReadThreadRunning = false;
+        return;
+    }
+
+    // Started when the FMQ is first reported full and cleared once a write succeeds.
+    // Owning it through unique_ptr releases it on every exit path.
+    std::unique_ptr<Timer> fullBufferTimer;
+
+    while (mDemuxIptvReadThreadRunning) {
+        // The timer is checked for null because mIsIptvDvrFMQFull is a member and may
+        // already be true before this invocation has started its own timer.
+        if (mIsIptvDvrFMQFull && fullBufferTimer != nullptr &&
+            fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) {
+            ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout);
+            break;
+        }
+
+        Timer readTimer{};
+        const ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms);
+        // A negative count must never reach the FMQ write, so it is handled like "no data".
+        if (bytes_read <= 0) {
+            const double elapsed_time = readTimer.get_elapsed_time_ms();
+            if (elapsed_time > timeout_ms) {
+                ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time,
+                      timeout_ms);
+            }
+            ALOGE("[Demux] Cannot read data from the socket");
+            break;
+        }
+
+        ALOGI("Number of bytes read: %zd", bytes_read);
+        const int result = mDvrPlayback->writePlaybackFMQ(buf, bytes_read);
+
+        switch (result) {
+            case DVR_WRITE_FAILURE_REASON_FMQ_FULL:
+                // Start timing only on the transition into the "full" state.
+                if (!mIsIptvDvrFMQFull || fullBufferTimer == nullptr) {
+                    mIsIptvDvrFMQFull = true;
+                    fullBufferTimer = std::make_unique<Timer>();
+                }
+                ALOGI("Waiting for client to flush DVR FMQ.");
+                break;
+            case DVR_WRITE_FAILURE_REASON_UNKNOWN:
+                ALOGE("Failed to write data into DVR FMQ for unknown reason");
+                break;
+            case DVR_WRITE_SUCCESS:
+                // The queue accepted data again, so the "full" episode is over.
+                mIsIptvDvrFMQFull = false;
+                fullBufferTimer.reset();
+                ALOGI("Wrote %zd bytes to DVR FMQ", bytes_read);
+                break;
+            default:
+                ALOGI("Invalid DVR Status");
+                break;
+        }
+    }
+    mDemuxIptvReadThreadRunning = false;
+}
+
+
::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) {
ALOGV("%s", __FUNCTION__);
@@ -52,7 +168,6 @@
return ::ndk::ScopedAStatus::fromServiceSpecificError(
static_cast<int32_t>(Result::NOT_INITIALIZED));
}
-
mFrontend = mTuner->getFrontendById(in_frontendId);
if (mFrontend == nullptr) {
return ::ndk::ScopedAStatus::fromServiceSpecificError(
@@ -61,6 +176,58 @@
mTuner->setFrontendAsDemuxSource(in_frontendId, mDemuxId);
+    // If mFrontend is an IPTV frontend, read TS data from the plugin streamer and push it
+    // into a playback DVR FMQ.
+    if (mFrontend->getFrontendType() == FrontendType::IPTV) {
+        // Create the playback DVR on this demux; readIptvThreadLoop() writes into it
+        // through mDvrPlayback, so a failure here must abort instead of being ignored.
+        shared_ptr<IDvr> iptvDvr;
+
+        std::shared_ptr<IDvrCallback> dvrPlaybackCallback =
+                ::ndk::SharedRefBase::make<DvrPlaybackCallback>();
+
+        ::ndk::ScopedAStatus status =
+                openDvr(DvrType::PLAYBACK, IPTV_BUFFER_SIZE, dvrPlaybackCallback, &iptvDvr);
+        if (!status.isOk()) {
+            ALOGE("[Demux] Failed to create DVR instance for IPTV playback");
+            return status;
+        }
+        ALOGI("DVR instance created");
+
+        // Get the plugin interface from the frontend.
+        dtv_plugin* interface = mFrontend->getIptvPluginInterface();
+        if (interface == nullptr) {
+            ALOGE("[Demux] getIptvPluginInterface(): plugin interface is null");
+            return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                    static_cast<int32_t>(Result::INVALID_STATE));
+        }
+        ALOGI("[Demux] getIptvPluginInterface(): plugin interface is not null");
+
+        // Get the streamer object from the frontend instance.
+        dtv_streamer* streamer = mFrontend->getIptvPluginStreamer();
+        if (streamer == nullptr) {
+            ALOGE("[Demux] getIptvPluginStreamer(): streamer is null");
+            return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                    static_cast<int32_t>(Result::INVALID_STATE));
+        }
+        ALOGI("[Demux] getIptvPluginStreamer(): streamer is not null");
+
+        // Get the transport description from the frontend (only logged here).
+        string transport_desc = mFrontend->getIptvTransportDescription();
+        ALOGI("[Demux] getIptvTransportDescription(): transport_desc: %s", transport_desc.c_str());
+
+        // The read thread calls read_stream() to fill this buffer with TS data for as long
+        // as it is running.
+        int timeout_ms = 20;
+        int buffer_timeout = 10000;  // 10s
+        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
+        if (buf == nullptr) {
+            // Never hand a null buffer to the read thread.
+            ALOGE("malloc buf failed");
+            return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                    static_cast<int32_t>(Result::UNKNOWN_ERROR));
+        }
+        ALOGI("[ INFO ] Allocated buffer of size %d", IPTV_BUFFER_SIZE);
+        ALOGI("Getting FMQ from DVR instance to write socket data");
+        mDemuxIptvReadThreadRunning = true;
+        mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer,
+                                           buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
+        // NOTE(review): the thread is joined right away, so this call does not return until
+        // the read loop exits; buf is freed only after that, while no reader is left.
+        if (mDemuxIptvReadThread.joinable()) {
+            mDemuxIptvReadThread.join();
+        }
+        free(buf);
+    }
return ::ndk::ScopedAStatus::ok();
}
@@ -193,61 +360,6 @@
return ::ndk::ScopedAStatus::ok();
}
-::ndk::ScopedAStatus Demux::openDvr(DvrType in_type, int32_t in_bufferSize,
- const std::shared_ptr<IDvrCallback>& in_cb,
- std::shared_ptr<IDvr>* _aidl_return) {
- ALOGV("%s", __FUNCTION__);
-
- if (in_cb == nullptr) {
- ALOGW("[Demux] DVR callback can't be null");
- *_aidl_return = nullptr;
- return ::ndk::ScopedAStatus::fromServiceSpecificError(
- static_cast<int32_t>(Result::INVALID_ARGUMENT));
- }
-
- set<int64_t>::iterator it;
- switch (in_type) {
- case DvrType::PLAYBACK:
- mDvrPlayback = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
- this->ref<Demux>());
- if (!mDvrPlayback->createDvrMQ()) {
- mDvrPlayback = nullptr;
- *_aidl_return = mDvrPlayback;
- return ::ndk::ScopedAStatus::fromServiceSpecificError(
- static_cast<int32_t>(Result::UNKNOWN_ERROR));
- }
-
- for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
- if (!mDvrPlayback->addPlaybackFilter(*it, mFilters[*it])) {
- ALOGE("[Demux] Can't get filter info for DVR playback");
- mDvrPlayback = nullptr;
- *_aidl_return = mDvrPlayback;
- return ::ndk::ScopedAStatus::fromServiceSpecificError(
- static_cast<int32_t>(Result::UNKNOWN_ERROR));
- }
- }
-
- *_aidl_return = mDvrPlayback;
- return ::ndk::ScopedAStatus::ok();
- case DvrType::RECORD:
- mDvrRecord = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
- this->ref<Demux>());
- if (!mDvrRecord->createDvrMQ()) {
- mDvrRecord = nullptr;
- *_aidl_return = mDvrRecord;
- return ::ndk::ScopedAStatus::fromServiceSpecificError(
- static_cast<int32_t>(Result::UNKNOWN_ERROR));
- }
-
- *_aidl_return = mDvrRecord;
- return ::ndk::ScopedAStatus::ok();
- default:
- *_aidl_return = nullptr;
- return ::ndk::ScopedAStatus::fromServiceSpecificError(
- static_cast<int32_t>(Result::INVALID_ARGUMENT));
- }
-}
-
::ndk::ScopedAStatus Demux::connectCiCam(int32_t in_ciCamId) {
ALOGV("%s", __FUNCTION__);