Adding a DVR Record default implementation in Tuner HAL 1.0
Test: cuttlefish
Bug: 135709325
Change-Id: I415426d6ec048bdd2ae61a3c5142ad02f1d7f1e4
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index 71a26ab..43c4e3a 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -51,7 +51,8 @@
mFrontendSourceFile = mFrontend->getSourceFile();
mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
- return startBroadcastInputLoop();
+
+ return startFrontendInputLoop();
}
Return<void> Demux::openFilter(const DemuxFilterType& type, uint32_t bufferSize,
@@ -136,14 +137,14 @@
return Void();
}
- sp<Dvr> dvr = new Dvr(type, bufferSize, cb, this);
+ mDvr = new Dvr(type, bufferSize, cb, this);
- if (!dvr->createDvrMQ()) {
- _hidl_cb(Result::UNKNOWN_ERROR, dvr);
+ if (!mDvr->createDvrMQ()) {
+ _hidl_cb(Result::UNKNOWN_ERROR, mDvr);
return Void();
}
- _hidl_cb(Result::SUCCESS, dvr);
+ _hidl_cb(Result::SUCCESS, mDvr);
return Void();
}
@@ -166,13 +167,14 @@
// resetFilterRecords(filterId);
mUsedFilterIds.erase(filterId);
+ mRecordFilterIds.erase(filterId);
mUnusedFilterIds.insert(filterId);
mFilters.erase(filterId);
return Result::SUCCESS;
}
-void Demux::startTsFilter(vector<uint8_t> data) {
+void Demux::startBroadcastTsFilter(vector<uint8_t> data) {
set<uint32_t>::iterator it;
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
@@ -185,7 +187,17 @@
}
}
-bool Demux::startFilterDispatcher() {
+void Demux::sendFrontendInputToRecord(vector<uint8_t> data) {
+ set<uint32_t>::iterator it;
+ for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) {
+ if (DEBUG_FILTER) {
+ ALOGW("update record filter output");
+ }
+ mFilters[*it]->updateRecordOutput(data);
+ }
+}
+
+bool Demux::startBroadcastFilterDispatcher() {
set<uint32_t>::iterator it;
// Handle the output data per filter type
@@ -198,6 +210,18 @@
return true;
}
+bool Demux::startRecordFilterDispatcher() {
+ set<uint32_t>::iterator it;
+
+ for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) {
+ if (mFilters[*it]->startRecordFilterHandler() != Result::SUCCESS) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
Result Demux::startFilterHandler(uint32_t filterId) {
return mFilters[filterId]->startFilterHandler();
}
@@ -210,22 +234,22 @@
return mFilters[filterId]->getTpid();
}
-Result Demux::startBroadcastInputLoop() {
- pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
- pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
+Result Demux::startFrontendInputLoop() {
+ pthread_create(&mFrontendInputThread, NULL, __threadLoopFrontend, this);
+ pthread_setname_np(mFrontendInputThread, "frontend_input_thread");
return Result::SUCCESS;
}
-void* Demux::__threadLoopBroadcast(void* user) {
+void* Demux::__threadLoopFrontend(void* user) {
Demux* const self = static_cast<Demux*>(user);
- self->broadcastInputThreadLoop();
+ self->frontendInputThreadLoop();
return 0;
}
-void Demux::broadcastInputThreadLoop() {
- std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
- mBroadcastInputThreadRunning = true;
+void Demux::frontendInputThreadLoop() {
+ std::lock_guard<std::mutex> lock(mFrontendInputThreadLock);
+ mFrontendInputThreadRunning = true;
mKeepFetchingDataFromFrontend = true;
// open the stream and get its length
@@ -234,20 +258,20 @@
int packetSize = 188;
int writePacketAmount = 6;
char* buffer = new char[packetSize];
- ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
+ ALOGW("[Demux] Frontend input thread loop start %s", mFrontendSourceFile.c_str());
if (!inputData.is_open()) {
- mBroadcastInputThreadRunning = false;
+ mFrontendInputThreadRunning = false;
ALOGW("[Demux] Error %s", strerror(errno));
}
- while (mBroadcastInputThreadRunning) {
+ while (mFrontendInputThreadRunning) {
// move the stream pointer for packet size * 6 every read until the end
while (mKeepFetchingDataFromFrontend) {
for (int i = 0; i < writePacketAmount; i++) {
inputData.read(buffer, packetSize);
if (!inputData) {
mKeepFetchingDataFromFrontend = false;
- mBroadcastInputThreadRunning = false;
+ mFrontendInputThreadRunning = false;
break;
}
// filter and dispatch filter output
@@ -256,23 +280,61 @@
for (int index = 0; index < byteBuffer.size(); index++) {
byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
}
- startTsFilter(byteBuffer);
+ if (mIsRecording) {
+ // Feed the data into the Dvr recording input
+ sendFrontendInputToRecord(byteBuffer);
+ } else {
+ // Feed the data into the broadcast demux filter
+ startBroadcastTsFilter(byteBuffer);
+ }
}
- startFilterDispatcher();
+ if (mIsRecording) {
+ // Dispatch the data into the broadcasting filters.
+ startRecordFilterDispatcher();
+ } else {
+ // Dispatch the data into the broadcasting filters.
+ startBroadcastFilterDispatcher();
+ }
usleep(100);
}
}
- ALOGW("[Demux] Broadcast Input thread end.");
+ ALOGW("[Demux] Frontend Input thread end.");
delete[] buffer;
inputData.close();
}
-void Demux::stopBroadcastInput() {
+void Demux::stopFrontendInput() {
ALOGD("[Demux] stop frontend on demux");
mKeepFetchingDataFromFrontend = false;
- mBroadcastInputThreadRunning = false;
- std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
+ mFrontendInputThreadRunning = false;
+ std::lock_guard<std::mutex> lock(mFrontendInputThreadLock);
+}
+
+void Demux::setIsRecording(bool isRecording) {
+ mIsRecording = isRecording;
+}
+
+bool Demux::attachRecordFilter(int filterId) {
+ if (mFilters[filterId] == nullptr || mDvr == nullptr) {
+ return false;
+ }
+
+ mRecordFilterIds.insert(filterId);
+ mFilters[filterId]->attachFilterToRecord(mDvr);
+
+ return true;
+}
+
+bool Demux::detachRecordFilter(int filterId) {
+ if (mFilters[filterId] == nullptr || mDvr == nullptr) {
+ return false;
+ }
+
+ mRecordFilterIds.erase(filterId);
+ mFilters[filterId]->detachFilterFromRecord();
+
+ return true;
}
} // namespace implementation
diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h
index 037429d..1405d0c 100644
--- a/tv/tuner/1.0/default/Demux.h
+++ b/tv/tuner/1.0/default/Demux.h
@@ -81,11 +81,14 @@
virtual Return<Result> disconnectCiCam() override;
// Functions interacts with Tuner Service
- void stopBroadcastInput();
+ void stopFrontendInput();
Result removeFilter(uint32_t filterId);
+ bool attachRecordFilter(int filterId);
+ bool detachRecordFilter(int filterId);
Result startFilterHandler(uint32_t filterId);
void updateFilterOutput(uint16_t filterId, vector<uint8_t> data);
uint16_t getFilterTpid(uint32_t filterId);
+ void setIsRecording(bool isRecording);
private:
// Tuner service
@@ -101,9 +104,9 @@
uint32_t filterId;
};
- Result startBroadcastInputLoop();
- static void* __threadLoopBroadcast(void* user);
- void broadcastInputThreadLoop();
+ Result startFrontendInputLoop();
+ static void* __threadLoopFrontend(void* user);
+ void frontendInputThreadLoop();
/**
* To create a FilterMQ with the the next available Filter ID.
@@ -117,9 +120,13 @@
/**
* A dispatcher to read and dispatch input data to all the started filters.
* Each filter handler handles the data filtering/output writing/filterEvent updating.
+ * Note that recording filters are not included.
*/
- bool startFilterDispatcher();
- void startTsFilter(vector<uint8_t> data);
+ bool startBroadcastFilterDispatcher();
+ void startBroadcastTsFilter(vector<uint8_t> data);
+
+ void sendFrontendInputToRecord(vector<uint8_t> data);
+ bool startRecordFilterDispatcher();
uint32_t mDemuxId;
uint32_t mCiCamId;
@@ -141,26 +148,40 @@
*/
set<uint32_t> mUnusedFilterIds;
/**
+ * Record all the attached record filter Ids.
+ * Any removed filter id should be removed from this set.
+ */
+ set<uint32_t> mRecordFilterIds;
+ /**
* A list of created FilterMQ ptrs.
* The array number is the filter ID.
*/
std::map<uint32_t, sp<Filter>> mFilters;
+ /**
+ * Local reference to the opened DVR object.
+ */
+ sp<Dvr> mDvr;
+
// Thread handlers
- pthread_t mBroadcastInputThread;
+ pthread_t mFrontendInputThread;
/**
* If a specific filter's writing loop is still running
*/
- bool mBroadcastInputThreadRunning;
+ bool mFrontendInputThreadRunning;
bool mKeepFetchingDataFromFrontend;
/**
+ * If the dvr recording is running.
+ */
+ bool mIsRecording = false;
+ /**
* Lock to protect writes to the FMQs
*/
std::mutex mWriteLock;
/**
* Lock to protect writes to the input status
*/
- std::mutex mBroadcastInputThreadLock;
+ std::mutex mFrontendInputThreadLock;
// temp handle single PES filter
// TODO handle mulptiple Pes filters
diff --git a/tv/tuner/1.0/default/Dvr.cpp b/tv/tuner/1.0/default/Dvr.cpp
index eb38f90..3088a9d 100644
--- a/tv/tuner/1.0/default/Dvr.cpp
+++ b/tv/tuner/1.0/default/Dvr.cpp
@@ -70,7 +70,14 @@
return status;
}
+ // check if the attached filter is a record filter
+
mFilters[filterId] = filter;
+ mIsRecordFilterAttached = true;
+ if (!mDemux->attachRecordFilter(filterId)) {
+ return Result::INVALID_ARGUMENT;
+ }
+ mDemux->setIsRecording(mIsRecordStarted | mIsRecordFilterAttached);
return Result::SUCCESS;
}
@@ -95,6 +102,15 @@
it = mFilters.find(filterId);
if (it != mFilters.end()) {
mFilters.erase(filterId);
+ if (!mDemux->detachRecordFilter(filterId)) {
+ return Result::INVALID_ARGUMENT;
+ }
+ }
+
+ // If all the filters are detached, record can't be started
+ if (mFilters.empty()) {
+ mIsRecordFilterAttached = false;
+ mDemux->setIsRecording(mIsRecordStarted | mIsRecordFilterAttached);
}
return Result::SUCCESS;
@@ -115,8 +131,9 @@
pthread_create(&mDvrThread, NULL, __threadLoopPlayback, this);
pthread_setname_np(mDvrThread, "playback_waiting_loop");
} else if (mType == DvrType::RECORD) {
- /*pthread_create(&mInputThread, NULL, __threadLoopInput, this);
- pthread_setname_np(mInputThread, "playback_waiting_loop");*/
+ mRecordStatus = RecordStatus::DATA_READY;
+ mIsRecordStarted = true;
+ mDemux->setIsRecording(mIsRecordStarted | mIsRecordFilterAttached);
}
// TODO start another thread to send filter status callback to the framework
@@ -131,12 +148,17 @@
std::lock_guard<std::mutex> lock(mDvrThreadLock);
+ mIsRecordStarted = false;
+ mDemux->setIsRecording(mIsRecordStarted | mIsRecordFilterAttached);
+
return Result::SUCCESS;
}
Return<Result> Dvr::flush() {
ALOGV("%s", __FUNCTION__);
+ mRecordStatus = RecordStatus::DATA_READY;
+
return Result::SUCCESS;
}
@@ -272,6 +294,45 @@
return true;
}
+bool Dvr::writeRecordFMQ(const std::vector<uint8_t>& data) {
+ std::lock_guard<std::mutex> lock(mWriteLock);
+ ALOGW("[Dvr] write record FMQ");
+ if (mDvrMQ->write(data.data(), data.size())) {
+ mDvrEventFlag->wake(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY));
+ maySendRecordStatusCallback();
+ return true;
+ }
+
+ maySendRecordStatusCallback();
+ return false;
+}
+
+void Dvr::maySendRecordStatusCallback() {
+ std::lock_guard<std::mutex> lock(mRecordStatusLock);
+ int availableToRead = mDvrMQ->availableToRead();
+ int availableToWrite = mDvrMQ->availableToWrite();
+
+ RecordStatus newStatus = checkRecordStatusChange(availableToWrite, availableToRead,
+ mDvrSettings.record().highThreshold,
+ mDvrSettings.record().lowThreshold);
+ if (mRecordStatus != newStatus) {
+ mCallback->onRecordStatus(newStatus);
+ mRecordStatus = newStatus;
+ }
+}
+
+RecordStatus Dvr::checkRecordStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
+ uint32_t highThreshold, uint32_t lowThreshold) {
+ if (availableToWrite == 0) {
+ return DemuxFilterStatus::OVERFLOW;
+ } else if (availableToRead > highThreshold) {
+ return DemuxFilterStatus::HIGH_WATER;
+ } else if (availableToRead < lowThreshold) {
+ return DemuxFilterStatus::LOW_WATER;
+ }
+ return mRecordStatus;
+}
+
} // namespace implementation
} // namespace V1_0
} // namespace tuner
diff --git a/tv/tuner/1.0/default/Dvr.h b/tv/tuner/1.0/default/Dvr.h
index fbb778c..f39d8db 100644
--- a/tv/tuner/1.0/default/Dvr.h
+++ b/tv/tuner/1.0/default/Dvr.h
@@ -79,6 +79,8 @@
* Return false is any of the above processes fails.
*/
bool createDvrMQ();
+ void sendBroadcastInputToDvrRecord(vector<uint8_t> byteBuffer);
+ bool writeRecordFMQ(const std::vector<uint8_t>& data);
private:
// Demux service
@@ -95,6 +97,8 @@
void maySendRecordStatusCallback();
PlaybackStatus checkPlaybackStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
uint32_t highThreshold, uint32_t lowThreshold);
+ RecordStatus checkRecordStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
+ uint32_t highThreshold, uint32_t lowThreshold);
/**
* A dispatcher to read and dispatch input data to all the started filters.
* Each filter handler handles the data filtering/output writing/filterEvent updating.
@@ -103,9 +107,9 @@
void startTpidFilter(vector<uint8_t> data);
bool startFilterDispatcher();
static void* __threadLoopPlayback(void* user);
- static void* __threadLoopBroadcast(void* user);
+ static void* __threadLoopRecord(void* user);
void playbackThreadLoop();
- void broadcastInputThreadLoop();
+ void recordThreadLoop();
unique_ptr<DvrMQ> mDvrMQ;
EventFlag* mDvrEventFlag;
@@ -121,6 +125,7 @@
// FMQ status local records
PlaybackStatus mPlaybackStatus;
+ RecordStatus mRecordStatus;
/**
* If a specific filter's writing loop is still running
*/
@@ -135,10 +140,16 @@
* Lock to protect writes to the input status
*/
std::mutex mPlaybackStatusLock;
+ std::mutex mRecordStatusLock;
std::mutex mBroadcastInputThreadLock;
std::mutex mDvrThreadLock;
const bool DEBUG_DVR = false;
+
+ // Booleans to check if recording is running.
+ // Recording is ready when both of the following are set to true.
+ bool mIsRecordStarted = false;
+ bool mIsRecordFilterAttached = false;
};
} // namespace implementation
diff --git a/tv/tuner/1.0/default/Filter.cpp b/tv/tuner/1.0/default/Filter.cpp
index befd1e6..b3160fc 100644
--- a/tv/tuner/1.0/default/Filter.cpp
+++ b/tv/tuner/1.0/default/Filter.cpp
@@ -265,10 +265,16 @@
void Filter::updateFilterOutput(vector<uint8_t> data) {
std::lock_guard<std::mutex> lock(mFilterOutputLock);
- ALOGD("[Filter] handler output updated");
+ ALOGD("[Filter] filter output updated");
mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
}
+void Filter::updateRecordOutput(vector<uint8_t> data) {
+ std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
+ ALOGD("[Filter] record filter output updated");
+ mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
+}
+
Result Filter::startFilterHandler() {
std::lock_guard<std::mutex> lock(mFilterOutputLock);
switch (mType.mainType) {
@@ -292,12 +298,11 @@
case DemuxTsFilterType::PCR:
startPcrFilterHandler();
break;
- case DemuxTsFilterType::RECORD:
- startRecordFilterHandler();
- break;
case DemuxTsFilterType::TEMI:
startTemiFilterHandler();
break;
+ default:
+ break;
}
break;
case DemuxFilterMainType::MMTP:
@@ -342,12 +347,16 @@
if (mPesSizeLeft == 0) {
uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
mFilterOutput[i + 6];
- ALOGD("[Filter] prefix %d", prefix);
+ if (DEBUG_FILTER) {
+ ALOGD("[Filter] prefix %d", prefix);
+ }
if (prefix == 0x000001) {
// TODO handle mulptiple Pes filters
mPesSizeLeft = (mFilterOutput[i + 8] << 8) | mFilterOutput[i + 9];
mPesSizeLeft += 6;
- ALOGD("[Filter] pes data length %d", mPesSizeLeft);
+ if (DEBUG_FILTER) {
+ ALOGD("[Filter] pes data length %d", mPesSizeLeft);
+ }
} else {
continue;
}
@@ -360,7 +369,9 @@
mPesOutput.insert(mPesOutput.end(), first, last);
// size does not match then continue
mPesSizeLeft -= endPoint;
- ALOGD("[Filter] pes data left %d", mPesSizeLeft);
+ if (DEBUG_FILTER) {
+ ALOGD("[Filter] pes data left %d", mPesSizeLeft);
+ }
if (mPesSizeLeft > 0) {
continue;
}
@@ -377,7 +388,9 @@
.streamId = mPesOutput[3],
.dataLength = static_cast<uint16_t>(mPesOutput.size()),
};
- ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
+ if (DEBUG_FILTER) {
+ ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
+ }
int size = mFilterEvent.events.size();
mFilterEvent.events.resize(size + 1);
@@ -413,13 +426,23 @@
}
Result Filter::startRecordFilterHandler() {
- DemuxFilterTsRecordEvent tsRecordEvent;
+ /*DemuxFilterTsRecordEvent tsRecordEvent;
tsRecordEvent.pid.tPid(0);
tsRecordEvent.indexMask.tsIndexMask(0x01);
mFilterEvent.events.resize(1);
mFilterEvent.events[0].tsRecord(tsRecordEvent);
+*/
+ std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
+ if (mRecordFilterOutput.empty()) {
+ return Result::SUCCESS;
+ }
- mFilterOutput.clear();
+ if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
+ ALOGD("[Filter] dvr fails to write into record FMQ.");
+ return Result::UNKNOWN_ERROR;
+ }
+
+ mRecordFilterOutput.clear();
return Result::SUCCESS;
}
@@ -462,6 +485,14 @@
return false;
}
+void Filter::attachFilterToRecord(const sp<Dvr> dvr) {
+ mDvr = dvr;
+}
+
+void Filter::detachFilterFromRecord() {
+ mDvr = nullptr;
+}
+
} // namespace implementation
} // namespace V1_0
} // namespace tuner
diff --git a/tv/tuner/1.0/default/Filter.h b/tv/tuner/1.0/default/Filter.h
index fbd965a..d397f73 100644
--- a/tv/tuner/1.0/default/Filter.h
+++ b/tv/tuner/1.0/default/Filter.h
@@ -22,6 +22,7 @@
#include <math.h>
#include <set>
#include "Demux.h"
+#include "Dvr.h"
#include "Frontend.h"
using namespace std;
@@ -44,6 +45,7 @@
using FilterMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>;
class Demux;
+class Dvr;
class Filter : public IFilter {
public:
@@ -80,11 +82,17 @@
bool createFilterMQ();
uint16_t getTpid();
void updateFilterOutput(vector<uint8_t> data);
+ void updateRecordOutput(vector<uint8_t> data);
Result startFilterHandler();
+ Result startRecordFilterHandler();
+ void attachFilterToRecord(const sp<Dvr> dvr);
+ void detachFilterFromRecord();
private:
// Tuner service
sp<Demux> mDemux;
+ // Dvr reference once the filter is attached to any
+ sp<Dvr> mDvr = nullptr;
/**
* Filter callbacks used on filter events or FMQ status
*/
@@ -99,6 +107,7 @@
sp<IFilter> mDataSource;
bool mIsDataSourceDemux = true;
vector<uint8_t> mFilterOutput;
+ vector<uint8_t> mRecordFilterOutput;
unique_ptr<FilterMQ> mFilterMQ;
EventFlag* mFilterEventFlag;
DemuxFilterEvent mFilterEvent;
@@ -120,6 +129,8 @@
*/
const uint16_t SECTION_WRITE_COUNT = 10;
+ bool DEBUG_FILTER = false;
+
/**
* Filter handlers to handle the data filtering.
* They are also responsible to write the filtered output into the filter FMQ
@@ -129,7 +140,6 @@
Result startPesFilterHandler();
Result startTsFilterHandler();
Result startMediaFilterHandler();
- Result startRecordFilterHandler();
Result startPcrFilterHandler();
Result startTemiFilterHandler();
Result startFilterLoop();
@@ -165,6 +175,7 @@
std::mutex mFilterStatusLock;
std::mutex mFilterThreadLock;
std::mutex mFilterOutputLock;
+ std::mutex mRecordFilterOutputLock;
// temp handle single PES filter
// TODO handle mulptiple Pes filters
diff --git a/tv/tuner/1.0/default/Tuner.cpp b/tv/tuner/1.0/default/Tuner.cpp
index f86b28d..c143d61 100644
--- a/tv/tuner/1.0/default/Tuner.cpp
+++ b/tv/tuner/1.0/default/Tuner.cpp
@@ -148,7 +148,7 @@
uint32_t demuxId;
if (it != mFrontendToDemux.end()) {
demuxId = it->second;
- mDemuxes[demuxId]->stopBroadcastInput();
+ mDemuxes[demuxId]->stopFrontendInput();
}
}