Adding filter status tracking logic into Demux default impl
Test: manual
Bug: 135709325
Change-Id: I2d473c250dc4d87263d5e7004095d144da684223
diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp
index d65df59..b998468 100644
--- a/tv/tuner/1.0/default/Demux.cpp
+++ b/tv/tuner/1.0/default/Demux.cpp
@@ -106,7 +106,7 @@
} else {
filterId = ++mLastUsedFilterId;
- mDemuxCallbacks.resize(filterId + 1);
+ mFilterCallbacks.resize(filterId + 1);
mFilterMQs.resize(filterId + 1);
mFilterEvents.resize(filterId + 1);
mFilterEventFlags.resize(filterId + 1);
@@ -114,6 +114,7 @@
mFilterThreads.resize(filterId + 1);
mFilterPids.resize(filterId + 1);
mFilterOutputs.resize(filterId + 1);
+ mFilterStatus.resize(filterId + 1);
}
mUsedFilterIds.insert(filterId);
@@ -125,7 +126,7 @@
}
// Add callback
- mDemuxCallbacks[filterId] = cb;
+ mFilterCallbacks[filterId] = cb;
// Mapping from the filter ID to the filter event
DemuxFilterEvent event{
@@ -211,9 +212,16 @@
return Result::SUCCESS;
}
-Return<Result> Demux::flushFilter(uint32_t /* filterId */) {
+Return<Result> Demux::flushFilter(uint32_t filterId) {
ALOGV("%s", __FUNCTION__);
+ // temp implementation to flush the FMQ
+ int size = mFilterMQs[filterId]->availableToRead();
+ char* buffer = new char[size];
+ mOutputMQ->read((unsigned char*)&buffer[0], size);
+ delete[] buffer;
+ mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
+
return Result::SUCCESS;
}
@@ -254,7 +262,7 @@
mFilterThreads.clear();
mUnusedFilterIds.clear();
mUsedFilterIds.clear();
- mDemuxCallbacks.clear();
+ mFilterCallbacks.clear();
mFilterMQs.clear();
mFilterEvents.clear();
mFilterEventFlags.clear();
@@ -475,6 +483,7 @@
mFilterOutputs[filterId].clear();
return Result::INVALID_STATE;
}
+ maySendFilterStatusCallback(filterId);
pesEvent = {
// temp dump meta data
.streamId = filterOutData[3],
@@ -672,8 +681,10 @@
continue;
}
// After successfully write, send a callback and wait for the read to be done
- mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
+ mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
mFilterEvents[filterId].events.resize(0);
+ mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
+ mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
break;
}
@@ -693,18 +704,20 @@
break;
}
- if (mDemuxCallbacks[filterId] == nullptr) {
+ if (mFilterCallbacks[filterId] == nullptr) {
ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
break;
}
+ maySendFilterStatusCallback(filterId);
+
while (mFilterThreadRunning[filterId]) {
std::lock_guard<std::mutex> lock(mFilterEventLock);
if (mFilterEvents[filterId].events.size() == 0) {
continue;
}
// After successfully write, send a callback and wait for the read to be done
- mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
+ mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
mFilterEvents[filterId].events.resize(0);
break;
}
@@ -755,16 +768,31 @@
int availableToWrite = mInputMQ->availableToWrite();
DemuxInputStatus newStatus =
- checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
- mInputSettings.lowThreshold);
+ checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
+ mInputSettings.lowThreshold);
if (mIntputStatus != newStatus) {
mInputCallback->onInputStatus(newStatus);
mIntputStatus = newStatus;
}
}
-DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
- uint32_t highThreshold, uint32_t lowThreshold) {
+void Demux::maySendFilterStatusCallback(uint32_t filterId) {
+ std::lock_guard<std::mutex> lock(mFilterStatusLock);
+ int availableToRead = mFilterMQs[filterId]->availableToRead();
+ int availableToWrite = mInputMQ->availableToWrite();
+ int fmqSize = mFilterMQs[filterId]->getQuantumCount();
+
+ DemuxFilterStatus newStatus =
+ checkFilterStatusChange(filterId, availableToWrite, availableToRead,
+ ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
+ if (mFilterStatus[filterId] != newStatus) {
+ mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
+ mFilterStatus[filterId] = newStatus;
+ }
+}
+
+DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
+ uint32_t highThreshold, uint32_t lowThreshold) {
if (availableToWrite == 0) {
return DemuxInputStatus::SPACE_FULL;
} else if (availableToRead > highThreshold) {
@@ -777,6 +805,19 @@
return mIntputStatus;
}
+DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
+ uint32_t availableToRead, uint32_t highThreshold,
+ uint32_t lowThreshold) {
+ if (availableToWrite == 0) {
+ return DemuxFilterStatus::OVERFLOW;
+ } else if (availableToRead > highThreshold) {
+ return DemuxFilterStatus::HIGH_WATER;
+ } else if (availableToRead < lowThreshold) {
+ return DemuxFilterStatus::LOW_WATER;
+ }
+ return mFilterStatus[filterId];
+}
+
Result Demux::startBroadcastInputLoop() {
pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
@@ -818,7 +859,7 @@
}
// filter and dispatch filter output
vector<uint8_t> byteBuffer;
- byteBuffer.resize(sizeof(buffer));
+ byteBuffer.resize(packetSize);
for (int index = 0; index < byteBuffer.size(); index++) {
byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
}