Add mock frontend tuning functionality that uses a specific TS file
as the data source of a Demux.
Test: atest
Bug: 135709325
Change-Id: I69849db58d68a7496f929940a74a63e7a9e6c6be
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 080116c..04382b0 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -67,8 +67,9 @@
0x73, 0x63, 0x65, 0x6e, 0x65,
};
-Demux::Demux(uint32_t demuxId) {
+Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
mDemuxId = demuxId;
+ mTunerService = tuner;
}
Demux::~Demux() {}
@@ -76,9 +77,20 @@
Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
ALOGV("%s", __FUNCTION__);
- mSourceFrontendId = frontendId;
+ if (mTunerService == nullptr) {
+ return Result::NOT_INITIALIZED;
+ }
- return Result::SUCCESS;
+ mFrontend = mTunerService->getFrontendById(frontendId);
+
+ if (mFrontend == nullptr) {
+ return Result::INVALID_STATE;
+ }
+
+ mFrontendSourceFile = mFrontend->getSourceFile();
+
+ mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
+ return startBroadcastInputLoop();
}
Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
@@ -194,6 +206,8 @@
mFilterThreadRunning[filterId] = false;
+ std::lock_guard<std::mutex> lock(mFilterThreadLock);
+
return Result::SUCCESS;
}
@@ -396,6 +410,8 @@
mInputThreadRunning = false;
+ std::lock_guard<std::mutex> lock(mInputThreadLock);
+
return Result::SUCCESS;
}
@@ -447,19 +463,28 @@
return Result::SUCCESS;
}
- // TODO extract PES from TS
- if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) {
- mFilterOutputs[filterId].clear();
- return Result::INVALID_STATE;
+ for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
+ uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40;
+ uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4;
+ ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl);
+ if (pusi && (adaptFieldControl == 0x01)) {
+ vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
+ vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187;
+ vector<uint8_t> filterOutData(first, last);
+ if (!writeDataToFilterMQ(filterOutData, filterId)) {
+ mFilterOutputs[filterId].clear();
+ return Result::INVALID_STATE;
+ }
+ pesEvent = {
+ // temp dump meta data
+ .streamId = filterOutData[3],
+ .dataLength = static_cast<uint16_t>(filterOutData.size()),
+ };
+ int size = mFilterEvents[filterId].events.size();
+ mFilterEvents[filterId].events.resize(size + 1);
+ mFilterEvents[filterId].events[size].pes(pesEvent);
+ }
}
- pesEvent = {
- // temp dump meta data
- .streamId = 0,
- .dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()),
- };
- int size = mFilterEvents[filterId].events.size();
- mFilterEvents[filterId].events.resize(size + 1);
- mFilterEvents[filterId].events[size].pes(pesEvent);
mFilterOutputs[filterId].clear();
@@ -481,6 +506,8 @@
};
mFilterEvents[filterId].events.resize(1);
mFilterEvents[filterId].events[0].media() = mediaEvent;
+
+ mFilterOutputs[filterId].clear();
// TODO handle write FQM for media stream
return Result::SUCCESS;
}
@@ -495,6 +522,8 @@
recordEvent.indexMask.tsIndexMask() = 0x01;
mFilterEvents[filterId].events.resize(1);
mFilterEvents[filterId].events[0].ts() = recordEvent;
+
+ mFilterOutputs[filterId].clear();
return Result::SUCCESS;
}
@@ -554,10 +583,7 @@
return false;
}
-bool Demux::filterAndOutputData() {
- Result result;
- set<uint32_t>::iterator it;
-
+bool Demux::readInputFMQ() {
// Read input data from the input FMQ
int size = mInputMQ->availableToRead();
int inputPacketSize = mInputSettings.packetSize;
@@ -566,15 +592,29 @@
// Dispatch the packet to the PID matching filter output buffer
for (int i = 0; i < size / inputPacketSize; i++) {
- mInputMQ->read(dataOutputBuffer.data(), inputPacketSize);
- for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
- uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff));
- if (pid == mFilterPids[*it]) {
- mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(),
- dataOutputBuffer.end());
- }
+ if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
+ return false;
+ }
+ startTsFilter(dataOutputBuffer);
+ }
+
+ return true;
+}
+
+// Routes one TS packet to the output buffer of every in-use filter whose PID
+// matches the packet's PID.
+//
+// data: one TS packet; bytes 1-2 carry the 13-bit PID.
+void Demux::startTsFilter(vector<uint8_t> data) {
+    // The PID spans bytes 1 and 2, so anything shorter cannot be routed.
+    if (data.size() < 3) {
+        ALOGW("[Demux] ts packet too short to filter: %zu bytes", data.size());
+        return;
+    }
+    // PID = low 5 bits of byte 1 followed by all 8 bits of byte 2.
+    const uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
+    for (uint32_t filterId : mUsedFilterIds) {
+        ALOGW("start ts filter pid: %d", pid);
+        if (pid == mFilterPids[filterId]) {
+            auto& output = mFilterOutputs[filterId];
+            output.insert(output.end(), data.begin(), data.end());
+        }
+    }
+}
+
+bool Demux::startFilterDispatcher() {
+ Result result;
+ set<uint32_t>::iterator it;
// Handle the output data per filter type
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
@@ -620,6 +660,7 @@
void Demux::filterThreadLoop(uint32_t filterId) {
ALOGD("[Demux] filter %d threadLoop start.", filterId);
+ std::lock_guard<std::mutex> lock(mFilterThreadLock);
mFilterThreadRunning[filterId] = true;
// For the first time of filter output, implementation needs to send the filter
@@ -682,6 +723,7 @@
void Demux::inputThreadLoop() {
ALOGD("[Demux] input threadLoop start.");
+ std::lock_guard<std::mutex> lock(mInputThreadLock);
mInputThreadRunning = true;
while (mInputThreadRunning) {
@@ -695,7 +737,7 @@
}
// Our current implementation filter the data and write it into the filter FMQ immedaitely
// after the DATA_READY from the VTS/framework
- if (!filterAndOutputData()) {
+ if (!readInputFMQ() || !startFilterDispatcher()) {
ALOGD("[Demux] input data failed to be filtered. Ending thread");
break;
}
@@ -735,6 +777,70 @@
return mIntputStatus;
}
+// Starts the thread that runs broadcastInputThreadLoop().
+//
+// Returns Result::SUCCESS once the thread is created, or Result::INVALID_STATE
+// if pthread_create() fails.
+Result Demux::startBroadcastInputLoop() {
+    if (pthread_create(&mBroadcastInputThread, nullptr, __threadLoopBroadcast, this) != 0) {
+        ALOGW("[Demux] failed to create the broadcast input thread");
+        return Result::INVALID_STATE;
+    }
+    // Thread names are limited to 16 bytes including the terminating NUL; a
+    // longer name makes pthread_setname_np() fail and leaves the thread unnamed.
+    pthread_setname_np(mBroadcastInputThread, "broadcast_input");
+
+    return Result::SUCCESS;
+}
+
+// pthread entry point: `user` is the Demux passed to pthread_create(); runs
+// its broadcast input loop and returns a null result.
+void* Demux::__threadLoopBroadcast(void* user) {
+    static_cast<Demux*>(user)->broadcastInputThreadLoop();
+    return nullptr;
+}
+
+// Broadcast input thread body: reads TS packets from mFrontendSourceFile and
+// feeds them to the filters while holding mBroadcastInputThreadLock.
+void Demux::broadcastInputThreadLoop() {
+    std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
+    mBroadcastInputThreadRunning = true;
+    mKeepFetchingDataFromFrontend = true;
+
+    // Open the frontend source file as a binary stream.
+    std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary);
+    // TODO take the packet size from the frontend setting
+    int packetSize = 188;
+    int writePacketAmount = 6;
+    vector<char> buffer(packetSize);
+    ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
+    if (!inputData.is_open()) {
+        mBroadcastInputThreadRunning = false;
+        ALOGW("[Demux] Error %s", strerror(errno));
+    }
+
+    while (mBroadcastInputThreadRunning) {
+        // Feed writePacketAmount consecutive packets per round, then dispatch.
+        while (mKeepFetchingDataFromFrontend) {
+            for (int i = 0; i < writePacketAmount; i++) {
+                // read() advances the stream by one packet, so no extra seek
+                // is needed to reach the next packet.
+                inputData.read(buffer.data(), packetSize);
+                if (!inputData) {
+                    // End of file or read error: clear both flags so the inner
+                    // loop exits too instead of spinning on a failed stream.
+                    mKeepFetchingDataFromFrontend = false;
+                    mBroadcastInputThreadRunning = false;
+                    break;
+                }
+                // Hand the full packetSize bytes of the packet to the filters.
+                vector<uint8_t> byteBuffer(buffer.begin(), buffer.end());
+                startTsFilter(byteBuffer);
+            }
+            startFilterDispatcher();
+            sleep(1);
+        }
+    }
+
+    ALOGW("[Demux] Broadcast Input thread end.");
+    inputData.close();
+}
+
+// Asks the broadcast input loop to stop by clearing both run flags, then
+// acquires mBroadcastInputThreadLock, which broadcastInputThreadLoop() holds
+// while it runs, so the call returns only once that lock is free.
+void Demux::stopBroadcastInput() {
+    mKeepFetchingDataFromFrontend = false;
+    mBroadcastInputThreadRunning = false;
+    {
+        std::lock_guard<std::mutex> waitForLoopExit(mBroadcastInputThreadLock);
+    }
+}
+
} // namespace implementation
} // namespace V1_0
} // namespace tuner