Adding the mocking frontend tuning functionality to take specific ts
file as source of a Demux.
Test: atest
Bug: 135709325
Change-Id: I69849db58d68a7496f929940a74a63e7a9e6c6be
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 080116c..04382b0 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -67,8 +67,9 @@
0x73, 0x63, 0x65, 0x6e, 0x65,
};
-Demux::Demux(uint32_t demuxId) {
+Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
mDemuxId = demuxId;
+ mTunerService = tuner;
}
Demux::~Demux() {}
@@ -76,9 +77,20 @@
Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
ALOGV("%s", __FUNCTION__);
- mSourceFrontendId = frontendId;
+ if (mTunerService == nullptr) {
+ return Result::NOT_INITIALIZED;
+ }
- return Result::SUCCESS;
+ mFrontend = mTunerService->getFrontendById(frontendId);
+
+ if (mFrontend == nullptr) {
+ return Result::INVALID_STATE;
+ }
+
+ mFrontendSourceFile = mFrontend->getSourceFile();
+
+ mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
+ return startBroadcastInputLoop();
}
Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
@@ -194,6 +206,8 @@
mFilterThreadRunning[filterId] = false;
+ std::lock_guard<std::mutex> lock(mFilterThreadLock);
+
return Result::SUCCESS;
}
@@ -396,6 +410,8 @@
mInputThreadRunning = false;
+ std::lock_guard<std::mutex> lock(mInputThreadLock);
+
return Result::SUCCESS;
}
@@ -447,19 +463,28 @@
return Result::SUCCESS;
}
- // TODO extract PES from TS
- if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) {
- mFilterOutputs[filterId].clear();
- return Result::INVALID_STATE;
+ for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
+ uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40;
+ uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4;
+ ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl);
+ if (pusi && (adaptFieldControl == 0x01)) {
+ vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
+ vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187;
+ vector<uint8_t> filterOutData(first, last);
+ if (!writeDataToFilterMQ(filterOutData, filterId)) {
+ mFilterOutputs[filterId].clear();
+ return Result::INVALID_STATE;
+ }
+ pesEvent = {
+ // temp dump meta data
+ .streamId = filterOutData[3],
+ .dataLength = static_cast<uint16_t>(filterOutData.size()),
+ };
+ int size = mFilterEvents[filterId].events.size();
+ mFilterEvents[filterId].events.resize(size + 1);
+ mFilterEvents[filterId].events[size].pes(pesEvent);
+ }
}
- pesEvent = {
- // temp dump meta data
- .streamId = 0,
- .dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()),
- };
- int size = mFilterEvents[filterId].events.size();
- mFilterEvents[filterId].events.resize(size + 1);
- mFilterEvents[filterId].events[size].pes(pesEvent);
mFilterOutputs[filterId].clear();
@@ -481,6 +506,8 @@
};
mFilterEvents[filterId].events.resize(1);
mFilterEvents[filterId].events[0].media() = mediaEvent;
+
+ mFilterOutputs[filterId].clear();
// TODO handle write FQM for media stream
return Result::SUCCESS;
}
@@ -495,6 +522,8 @@
recordEvent.indexMask.tsIndexMask() = 0x01;
mFilterEvents[filterId].events.resize(1);
mFilterEvents[filterId].events[0].ts() = recordEvent;
+
+ mFilterOutputs[filterId].clear();
return Result::SUCCESS;
}
@@ -554,10 +583,7 @@
return false;
}
-bool Demux::filterAndOutputData() {
- Result result;
- set<uint32_t>::iterator it;
-
+bool Demux::readInputFMQ() {
// Read input data from the input FMQ
int size = mInputMQ->availableToRead();
int inputPacketSize = mInputSettings.packetSize;
@@ -566,15 +592,29 @@
// Dispatch the packet to the PID matching filter output buffer
for (int i = 0; i < size / inputPacketSize; i++) {
- mInputMQ->read(dataOutputBuffer.data(), inputPacketSize);
- for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
- uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff));
- if (pid == mFilterPids[*it]) {
- mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(),
- dataOutputBuffer.end());
- }
+ if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
+ return false;
+ }
+ startTsFilter(dataOutputBuffer);
+ }
+
+ return true;
+}
+
+void Demux::startTsFilter(vector<uint8_t> data) {
+ set<uint32_t>::iterator it;
+ for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
+ uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
+ ALOGW("start ts filter pid: %d", pid);
+ if (pid == mFilterPids[*it]) {
+ mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end());
}
}
+}
+
+bool Demux::startFilterDispatcher() {
+ Result result;
+ set<uint32_t>::iterator it;
// Handle the output data per filter type
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
@@ -620,6 +660,7 @@
void Demux::filterThreadLoop(uint32_t filterId) {
ALOGD("[Demux] filter %d threadLoop start.", filterId);
+ std::lock_guard<std::mutex> lock(mFilterThreadLock);
mFilterThreadRunning[filterId] = true;
// For the first time of filter output, implementation needs to send the filter
@@ -682,6 +723,7 @@
void Demux::inputThreadLoop() {
ALOGD("[Demux] input threadLoop start.");
+ std::lock_guard<std::mutex> lock(mInputThreadLock);
mInputThreadRunning = true;
while (mInputThreadRunning) {
@@ -695,7 +737,7 @@
}
// Our current implementation filter the data and write it into the filter FMQ immedaitely
// after the DATA_READY from the VTS/framework
- if (!filterAndOutputData()) {
+ if (!readInputFMQ() || !startFilterDispatcher()) {
ALOGD("[Demux] input data failed to be filtered. Ending thread");
break;
}
@@ -735,6 +777,70 @@
return mIntputStatus;
}
+Result Demux::startBroadcastInputLoop() {
+ pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
+ pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
+
+ return Result::SUCCESS;
+}
+
+void* Demux::__threadLoopBroadcast(void* user) {
+ Demux* const self = static_cast<Demux*>(user);
+ self->broadcastInputThreadLoop();
+ return 0;
+}
+
+void Demux::broadcastInputThreadLoop() {
+ std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
+ mBroadcastInputThreadRunning = true;
+ mKeepFetchingDataFromFrontend = true;
+
+ // open the stream and get its length
+ std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary);
+ // TODO take the packet size from the frontend setting
+ int packetSize = 188;
+ int writePacketAmount = 6;
+ char* buffer = new char[packetSize];
+ ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
+ if (!inputData.is_open()) {
+ mBroadcastInputThreadRunning = false;
+ ALOGW("[Demux] Error %s", strerror(errno));
+ }
+
+ while (mBroadcastInputThreadRunning) {
+ // move the stream pointer for packet size * 6 every read until the end
+ while (mKeepFetchingDataFromFrontend) {
+ for (int i = 0; i < writePacketAmount; i++) {
+ inputData.read(buffer, packetSize);
+ if (!inputData) {
+ mBroadcastInputThreadRunning = false;
+ break;
+ }
+ // filter and dispatch filter output
+ vector<uint8_t> byteBuffer;
+ byteBuffer.resize(sizeof(buffer));
+ for (int index = 0; index < byteBuffer.size(); index++) {
+ byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
+ }
+ startTsFilter(byteBuffer);
+ inputData.seekg(packetSize, inputData.cur);
+ }
+ startFilterDispatcher();
+ sleep(1);
+ }
+ }
+
+ ALOGW("[Demux] Broadcast Input thread end.");
+ delete[] buffer;
+ inputData.close();
+}
+
+void Demux::stopBroadcastInput() {
+ mKeepFetchingDataFromFrontend = false;
+ mBroadcastInputThreadRunning = false;
+ std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
+}
+
} // namespace implementation
} // namespace V1_0
} // namespace tuner
diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h
index eaf6ac3..e4a4e2b 100644
--- a/tv/tuner/1.0/default/Demux.h
+++ b/tv/tuner/1.0/default/Demux.h
@@ -20,6 +20,8 @@
#include <android/hardware/tv/tuner/1.0/IDemux.h>
#include <fmq/MessageQueue.h>
#include <set>
+#include "Frontend.h"
+#include "Tuner.h"
using namespace std;
@@ -40,9 +42,12 @@
using FilterMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>;
+class Tuner;
+class Frontend;
+
class Demux : public IDemux {
public:
- Demux(uint32_t demuxId);
+ Demux(uint32_t demuxId, sp<Tuner> tuner);
~Demux();
@@ -103,7 +108,17 @@
virtual Return<Result> removeOutput() override;
+ // Functions interacts with Tuner Service
+ void stopBroadcastInput();
+
private:
+ // Tuner service
+ sp<Tuner> mTunerService;
+
+ // Frontend source
+ sp<Frontend> mFrontend;
+ string mFrontendSourceFile;
+
// A struct that passes the arguments to a newly created filter thread
struct ThreadArgs {
Demux* user;
@@ -122,6 +137,7 @@
Result startRecordFilterHandler(uint32_t filterId);
Result startPcrFilterHandler();
Result startFilterLoop(uint32_t filterId);
+ Result startBroadcastInputLoop();
/**
* To create a FilterMQ with the the next available Filter ID.
@@ -143,14 +159,17 @@
* A dispatcher to read and dispatch input data to all the started filters.
* Each filter handler handles the data filtering/output writing/filterEvent updating.
*/
- bool filterAndOutputData();
+ bool readInputFMQ();
+ void startTsFilter(vector<uint8_t> data);
+ bool startFilterDispatcher();
static void* __threadLoopFilter(void* data);
static void* __threadLoopInput(void* user);
+ static void* __threadLoopBroadcast(void* user);
void filterThreadLoop(uint32_t filterId);
void inputThreadLoop();
+ void broadcastInputThreadLoop();
uint32_t mDemuxId;
- uint32_t mSourceFrontendId;
/**
* Record the last used filter id. Initial value is -1.
* Filter Id starts with 0.
@@ -195,6 +214,7 @@
// Thread handlers
pthread_t mInputThread;
pthread_t mOutputThread;
+ pthread_t mBroadcastInputThread;
vector<pthread_t> mFilterThreads;
// FMQ status local records
@@ -204,6 +224,8 @@
*/
vector<bool> mFilterThreadRunning;
bool mInputThreadRunning;
+ bool mBroadcastInputThreadRunning;
+ bool mKeepFetchingDataFromFrontend;
/**
* Lock to protect writes to the FMQs
*/
@@ -217,6 +239,9 @@
* Lock to protect writes to the input status
*/
std::mutex mInputStatusLock;
+ std::mutex mBroadcastInputThreadLock;
+ std::mutex mFilterThreadLock;
+ std::mutex mInputThreadLock;
/**
* How many times a filter should write
* TODO make this dynamic/random/can take as a parameter
diff --git a/tv/tuner/1.0/default/Frontend.cpp b/tv/tuner/1.0/default/Frontend.cpp
index 6f87d1b..1e07edd 100644
--- a/tv/tuner/1.0/default/Frontend.cpp
+++ b/tv/tuner/1.0/default/Frontend.cpp
@@ -27,14 +27,10 @@
namespace V1_0 {
namespace implementation {
-Frontend::Frontend() {
- // Init callback to nullptr
- mCallback = nullptr;
-}
-
-Frontend::Frontend(FrontendType type, FrontendId id) {
+Frontend::Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner) {
mType = type;
mId = id;
+ mTunerService = tuner;
// Init callback to nullptr
mCallback = nullptr;
}
@@ -67,13 +63,18 @@
return Result::INVALID_STATE;
}
- mCallback->onEvent(FrontendEventType::NO_SIGNAL);
+ // TODO dynamically allocate file to the source file
+ mSourceStreamFile = FRONTEND_STREAM_FILE;
+
+ mCallback->onEvent(FrontendEventType::LOCKED);
return Result::SUCCESS;
}
Return<Result> Frontend::stopTune() {
ALOGV("%s", __FUNCTION__);
+ mTunerService->frontendStopTune(mId);
+
return Result::SUCCESS;
}
@@ -119,6 +120,10 @@
return mId;
}
+string Frontend::getSourceFile() {
+ return mSourceStreamFile;
+}
+
} // namespace implementation
} // namespace V1_0
} // namespace tuner
diff --git a/tv/tuner/1.0/default/Frontend.h b/tv/tuner/1.0/default/Frontend.h
index f881e8b..07fa7b9 100644
--- a/tv/tuner/1.0/default/Frontend.h
+++ b/tv/tuner/1.0/default/Frontend.h
@@ -18,7 +18,9 @@
#define ANDROID_HARDWARE_TV_TUNER_V1_0_FRONTEND_H_
#include <android/hardware/tv/tuner/1.0/IFrontend.h>
-#include <android/hardware/tv/tuner/1.0/ITuner.h>
+#include <fstream>
+#include <iostream>
+#include "Tuner.h"
using namespace std;
@@ -35,11 +37,11 @@
using ::android::hardware::tv::tuner::V1_0::IFrontendCallback;
using ::android::hardware::tv::tuner::V1_0::Result;
+class Tuner;
+
class Frontend : public IFrontend {
public:
- Frontend();
-
- Frontend(FrontendType type, FrontendId id);
+ Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner);
virtual Return<Result> close() override;
@@ -64,11 +66,18 @@
FrontendId getFrontendId();
+ string getSourceFile();
+
private:
virtual ~Frontend();
sp<IFrontendCallback> mCallback;
+ sp<Tuner> mTunerService;
FrontendType mType = FrontendType::UNDEFINED;
FrontendId mId = 0;
+
+ const string FRONTEND_STREAM_FILE = "/vendor/etc/test1.ts";
+ string mSourceStreamFile;
+ std::ifstream mFrontendData;
};
} // namespace implementation
diff --git a/tv/tuner/1.0/default/Tuner.cpp b/tv/tuner/1.0/default/Tuner.cpp
index a4a8ca5..f86b28d 100644
--- a/tv/tuner/1.0/default/Tuner.cpp
+++ b/tv/tuner/1.0/default/Tuner.cpp
@@ -38,14 +38,14 @@
// Array index matches their FrontendId in the default impl
mFrontendSize = 8;
mFrontends.resize(mFrontendSize);
- mFrontends[0] = new Frontend();
- mFrontends[1] = new Frontend(FrontendType::ATSC, 1);
- mFrontends[2] = new Frontend(FrontendType::DVBC, 2);
- mFrontends[3] = new Frontend(FrontendType::DVBS, 3);
- mFrontends[4] = new Frontend(FrontendType::DVBT, 4);
- mFrontends[5] = new Frontend(FrontendType::ISDBT, 5);
- mFrontends[6] = new Frontend(FrontendType::ANALOG, 6);
- mFrontends[7] = new Frontend(FrontendType::ATSC, 7);
+ mFrontends[0] = new Frontend(FrontendType::DVBT, 0, this);
+ mFrontends[1] = new Frontend(FrontendType::ATSC, 1, this);
+ mFrontends[2] = new Frontend(FrontendType::DVBC, 2, this);
+ mFrontends[3] = new Frontend(FrontendType::DVBS, 3, this);
+ mFrontends[4] = new Frontend(FrontendType::DVBT, 4, this);
+ mFrontends[5] = new Frontend(FrontendType::ISDBT, 5, this);
+ mFrontends[6] = new Frontend(FrontendType::ANALOG, 6, this);
+ mFrontends[7] = new Frontend(FrontendType::ATSC, 7, this);
}
Tuner::~Tuner() {}
@@ -81,7 +81,8 @@
DemuxId demuxId = mLastUsedId + 1;
mLastUsedId += 1;
- sp<IDemux> demux = new Demux(demuxId);
+ sp<Demux> demux = new Demux(demuxId, this);
+ mDemuxes[demuxId] = demux;
_hidl_cb(Result::SUCCESS, demuxId, demux);
return Void();
@@ -132,6 +133,25 @@
return Void();
}
+sp<Frontend> Tuner::getFrontendById(uint32_t frontendId) {
+ ALOGV("%s", __FUNCTION__);
+
+ return mFrontends[frontendId];
+}
+
+void Tuner::setFrontendAsDemuxSource(uint32_t frontendId, uint32_t demuxId) {
+ mFrontendToDemux[frontendId] = demuxId;
+}
+
+void Tuner::frontendStopTune(uint32_t frontendId) {
+ map<uint32_t, uint32_t>::iterator it = mFrontendToDemux.find(frontendId);
+ uint32_t demuxId;
+ if (it != mFrontendToDemux.end()) {
+ demuxId = it->second;
+ mDemuxes[demuxId]->stopBroadcastInput();
+ }
+}
+
} // namespace implementation
} // namespace V1_0
} // namespace tuner
diff --git a/tv/tuner/1.0/default/Tuner.h b/tv/tuner/1.0/default/Tuner.h
index c089864..96da257 100644
--- a/tv/tuner/1.0/default/Tuner.h
+++ b/tv/tuner/1.0/default/Tuner.h
@@ -18,6 +18,8 @@
#define ANDROID_HARDWARE_TV_TUNER_V1_0_TUNER_H_
#include <android/hardware/tv/tuner/1.0/ITuner.h>
+#include <map>
+#include "Demux.h"
#include "Frontend.h"
using namespace std;
@@ -29,6 +31,9 @@
namespace V1_0 {
namespace implementation {
+class Frontend;
+class Demux;
+
class Tuner : public ITuner {
public:
Tuner();
@@ -50,10 +55,18 @@
virtual Return<void> openLnbById(LnbId lnbId, openLnbById_cb _hidl_cb) override;
+ sp<Frontend> getFrontendById(uint32_t frontendId);
+
+ void setFrontendAsDemuxSource(uint32_t frontendId, uint32_t demuxId);
+
+ void frontendStopTune(uint32_t frontendId);
+
private:
virtual ~Tuner();
// Static mFrontends array to maintain local frontends information
vector<sp<Frontend>> mFrontends;
+ std::map<uint32_t, uint32_t> mFrontendToDemux;
+ std::map<uint32_t, sp<Demux>> mDemuxes;
// To maintain how many Frontends we have
int mFrontendSize;
// The last used demux id. Initial value is -1.