add filter delay hint default implementation
Adds a event buffering / scheduling mechanism that is configured using
the delay hint.
Bug: 183057734
Test: atest VtsHalTvTunerTargetTest
Change-Id: I154eb05bc419f827008161f85a6304a8599dc399
diff --git a/tv/tuner/aidl/default/Filter.cpp b/tv/tuner/aidl/default/Filter.cpp
index c377772..345a984 100644
--- a/tv/tuner/aidl/default/Filter.cpp
+++ b/tv/tuner/aidl/default/Filter.cpp
@@ -34,16 +34,132 @@
#define WAIT_TIMEOUT 3000000000
-Filter::Filter() {}
+FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb)
+ : mCallback(cb), mDataLength(0), mTimeDelayInMs(0), mDataSizeDelayInBytes(0) {
+ start();
+}
+
+FilterCallbackScheduler::~FilterCallbackScheduler() {
+ stop();
+}
+
+void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) {
+ std::lock_guard<std::mutex> lock(mLock);
+ mCallbackBuffer.push_back(std::move(event));
+ mDataLength += getDemuxFilterEventDataLength(event);
+
+ if (mDataLength >= mDataSizeDelayInBytes) {
+ // size limit has been reached, send out events
+ mCv.notify_all();
+ }
+}
+
+void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) {
+ if (mCallback) {
+ mCallback->onFilterStatus(status);
+ }
+}
+
+void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) {
+ // updating the setTimeDelay does not go into effect until the condition
+ // variable times out or is notified.
+ // One possibility is to notify the condition variable right away when the
+ // time delay changes, but I don't see the benefit over waiting for the next
+ // timeout / push, since -- in any case -- this will not change very often.
+ mTimeDelayInMs = timeDelay;
+}
+
+void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) {
+ // similar to updating the time delay hint, when mDataSizeDelayInBytes
+ // changes, this will not go into immediate effect, but will wait until the
+ // next filterEvent.
+ // We could technically check the current data length and notify the
+ // condition variable if we wanted to, but again, this may be overkill.
+ mDataSizeDelayInBytes = dataSizeDelay;
+}
+
+bool FilterCallbackScheduler::hasCallbackRegistered() const {
+ return mCallback != nullptr;
+}
+
+void FilterCallbackScheduler::start() {
+ mIsRunning = true;
+ mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this);
+}
+
+void FilterCallbackScheduler::stop() {
+ mIsRunning = false;
+ if (mCallbackThread.joinable()) {
+ mCallbackThread.join();
+ }
+}
+
+void FilterCallbackScheduler::threadLoop() {
+ while (mIsRunning) {
+ threadLoopOnce();
+ }
+}
+
+void FilterCallbackScheduler::threadLoopOnce() {
+ std::unique_lock<std::mutex> lock(mLock);
+ // mTimeDelayInMs is an atomic value, so it should be copied into a local
+ // variable before use (to make sure both the if statement and wait_for use
+ // the same value).
+ int timeDelayInMs = mTimeDelayInMs;
+ if (timeDelayInMs > 0) {
+ mCv.wait_for(lock, std::chrono::milliseconds(timeDelayInMs));
+ } else {
+ // no reason to timeout, just wait until main thread determines it's
+ // okay to send data.
+ mCv.wait(lock);
+ }
+
+ // condition_variable wait locks mutex on timeout / notify
+ if (!mCallbackBuffer.empty()) {
+ if (mCallback) {
+ mCallback->onFilterEvent(mCallbackBuffer);
+ }
+ mCallbackBuffer.clear();
+ mDataLength = 0;
+ }
+ lock.unlock();
+}
+
+int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) {
+ // there is a risk that dataLength could be a negative value, but it
+ // *should* be safe to assume that it is always positive.
+ switch (event.getTag()) {
+ case DemuxFilterEvent::Tag::section:
+ return event.get<DemuxFilterEvent::Tag::section>().dataLength;
+ case DemuxFilterEvent::Tag::media:
+ return event.get<DemuxFilterEvent::Tag::media>().dataLength;
+ case DemuxFilterEvent::Tag::pes:
+ return event.get<DemuxFilterEvent::Tag::pes>().dataLength;
+ case DemuxFilterEvent::Tag::download:
+ return event.get<DemuxFilterEvent::Tag::download>().dataLength;
+ case DemuxFilterEvent::Tag::ipPayload:
+ return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength;
+
+ case DemuxFilterEvent::Tag::tsRecord:
+ case DemuxFilterEvent::Tag::mmtpRecord:
+ case DemuxFilterEvent::Tag::temi:
+ case DemuxFilterEvent::Tag::monitorEvent:
+ case DemuxFilterEvent::Tag::startId:
+ // these events do not include a payload and should therefore return
+ // 0.
+ // do not add a default option, so this will not compile when new types
+ // are added.
+ return 0;
+ }
+}
Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
- const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux) {
- mType = type;
- mFilterId = filterId;
- mBufferSize = bufferSize;
- mDemux = demux;
- mCallback = cb;
-
+ const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux)
+ : mDemux(demux),
+ mCallbackScheduler(cb),
+ mFilterId(filterId),
+ mBufferSize(bufferSize),
+ mType(type) {
switch (mType.mainType) {
case DemuxFilterMainType::TS:
if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
@@ -112,9 +228,29 @@
}
::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) {
+ if (mIsMediaFilter) {
+ // delay hint is not supported for media filters
+ return ::ndk::ScopedAStatus::fromServiceSpecificError(
+ static_cast<int32_t>(Result::UNAVAILABLE));
+ }
+
ALOGV("%s", __FUNCTION__);
- (void)in_hint;
- // TODO: implement
+ if (in_hint.hintValue < 0) {
+ return ::ndk::ScopedAStatus::fromServiceSpecificError(
+ static_cast<int32_t>(Result::INVALID_ARGUMENT));
+ }
+
+ switch (in_hint.hintType) {
+ case FilterDelayHintType::TIME_DELAY_IN_MS:
+ mCallbackScheduler.setTimeDelayHint(in_hint.hintValue);
+ break;
+ case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES:
+ mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue);
+ break;
+ default:
+ return ::ndk::ScopedAStatus::fromServiceSpecificError(
+ static_cast<int32_t>(Result::INVALID_ARGUMENT));
+ }
return ::ndk::ScopedAStatus::ok();
}
@@ -155,40 +291,36 @@
::ndk::ScopedAStatus Filter::start() {
ALOGV("%s", __FUNCTION__);
mFilterThreadRunning = true;
- vector<DemuxFilterEvent> events;
+ std::vector<DemuxFilterEvent> events;
// All the filter event callbacks in start are for testing purpose.
switch (mType.mainType) {
case DemuxFilterMainType::TS:
createMediaEvent(events);
- mCallback->onFilterEvent(events);
createTsRecordEvent(events);
- mCallback->onFilterEvent(events);
createTemiEvent(events);
- mCallback->onFilterEvent(events);
break;
case DemuxFilterMainType::MMTP:
createDownloadEvent(events);
- mCallback->onFilterEvent(events);
createMmtpRecordEvent(events);
- mCallback->onFilterEvent(events);
break;
case DemuxFilterMainType::IP:
createSectionEvent(events);
- mCallback->onFilterEvent(events);
createIpPayloadEvent(events);
- mCallback->onFilterEvent(events);
break;
case DemuxFilterMainType::TLV:
createMonitorEvent(events);
- mCallback->onFilterEvent(events);
break;
case DemuxFilterMainType::ALP:
createMonitorEvent(events);
- mCallback->onFilterEvent(events);
break;
default:
break;
}
+
+ for (auto&& event : events) {
+ mCallbackScheduler.onFilterEvent(std::move(event));
+ }
+
return startFilterLoop();
}
@@ -327,15 +459,14 @@
if (newScramblingStatus ^ mScramblingStatusMonitored) {
mScramblingStatusMonitored = newScramblingStatus;
if (mScramblingStatusMonitored) {
- if (mCallback != nullptr) {
+ if (mCallbackScheduler.hasCallbackRegistered()) {
// Assuming current status is always NOT_SCRAMBLED
- vector<DemuxFilterEvent> events;
- DemuxFilterMonitorEvent monitorEvent;
- events.resize(1);
- monitorEvent.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>(
+ auto monitorEvent = DemuxFilterMonitorEvent::make<
+ DemuxFilterMonitorEvent::Tag::scramblingStatus>(
ScramblingStatus::NOT_SCRAMBLED);
- events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent);
- mCallback->onFilterEvent(events);
+ auto event =
+ DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
+ mCallbackScheduler.onFilterEvent(std::move(event));
} else {
return ::ndk::ScopedAStatus::fromServiceSpecificError(
static_cast<int32_t>(Result::INVALID_STATE));
@@ -347,14 +478,13 @@
if (newIpCid ^ mIpCidMonitored) {
mIpCidMonitored = newIpCid;
if (mIpCidMonitored) {
- if (mCallback != nullptr) {
+ if (mCallbackScheduler.hasCallbackRegistered()) {
// Return random cid
- vector<DemuxFilterEvent> events;
- DemuxFilterMonitorEvent monitorEvent;
- events.resize(1);
- monitorEvent.set<DemuxFilterMonitorEvent::Tag::cid>(1);
- events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent);
- mCallback->onFilterEvent(events);
+ auto monitorEvent =
+ DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1);
+ auto event =
+ DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
+ mCallbackScheduler.onFilterEvent(std::move(event));
} else {
return ::ndk::ScopedAStatus::fromServiceSpecificError(
static_cast<int32_t>(Result::INVALID_STATE));
@@ -410,26 +540,26 @@
}
// After successfully write, send a callback and wait for the read to be done
- if (mCallback != nullptr) {
+ if (mCallbackScheduler.hasCallbackRegistered()) {
if (mConfigured) {
- vector<DemuxFilterEvent> startEvent;
- startEvent.resize(1);
- startEvent[0].set<DemuxFilterEvent::Tag::startId>(mStartId++);
- mCallback->onFilterEvent(startEvent);
+ auto startEvent =
+ DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++);
+ mCallbackScheduler.onFilterEvent(std::move(startEvent));
mConfigured = false;
}
- mCallback->onFilterEvent(mFilterEvents);
+
+ for (auto&& event : mFilterEvents) {
+ mCallbackScheduler.onFilterEvent(std::move(event));
+ }
} else {
ALOGD("[Filter] filter callback is not configured yet.");
mFilterThreadRunning = false;
return;
}
- mFilterEvents.resize(0);
+ mFilterEvents.clear();
mFilterStatus = DemuxFilterStatus::DATA_READY;
- if (mCallback != nullptr) {
- mCallback->onFilterStatus(mFilterStatus);
- }
+ mCallbackScheduler.onFilterStatus(mFilterStatus);
break;
}
@@ -460,10 +590,10 @@
continue;
}
// After successfully write, send a callback and wait for the read to be done
- if (mCallback != nullptr) {
- mCallback->onFilterEvent(mFilterEvents);
+ for (auto&& event : mFilterEvents) {
+ mCallbackScheduler.onFilterEvent(std::move(event));
}
- mFilterEvents.resize(0);
+ mFilterEvents.clear();
break;
}
// We do not wait for the last read to be done
@@ -499,9 +629,7 @@
DemuxFilterStatus newStatus = checkFilterStatusChange(
availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
if (mFilterStatus != newStatus) {
- if (mCallback != nullptr) {
- mCallback->onFilterStatus(newStatus);
- }
+ mCallbackScheduler.onFilterStatus(newStatus);
mFilterStatus = newStatus;
}
}
@@ -657,9 +785,7 @@
ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
}
- int size = mFilterEvents.size();
- mFilterEvents.resize(size + 1);
- mFilterEvents[size].set<DemuxFilterEvent::Tag::pes>(pesEvent);
+ mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent));
mPesOutput.clear();
}
@@ -763,11 +889,7 @@
.firstMbInSlice = 0, // random address
};
- int size;
- size = mFilterEvents.size();
- mFilterEvents.resize(size + 1);
- mFilterEvents[size].set<DemuxFilterEvent::Tag::tsRecord>(recordEvent);
-
+ mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent));
mRecordFilterOutput.clear();
return ::ndk::ScopedAStatus::ok();
}
@@ -789,8 +911,6 @@
if (!writeDataToFilterMQ(data)) {
return false;
}
- int size = mFilterEvents.size();
- mFilterEvents.resize(size + 1);
DemuxFilterSectionEvent secEvent;
secEvent = {
// temp dump meta data
@@ -799,7 +919,7 @@
.sectionNum = 1,
.dataLength = static_cast<int32_t>(data.size()),
};
- mFilterEvents[size].set<DemuxFilterEvent::Tag::section>(secEvent);
+ mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent));
return true;
}
@@ -888,19 +1008,16 @@
mDataId2Avfd[dataId] = dup(av_fd);
// Create mediaEvent and send callback
- int size = mFilterEvents.size();
- mFilterEvents.resize(size + 1);
-
- mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory =
- ::android::dupToAidl(nativeHandle);
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength =
- static_cast<int64_t>(output.size());
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avDataId = static_cast<int64_t>(dataId);
+ auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
+ auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
+ mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
+ mediaEvent.dataLength = static_cast<int64_t>(output.size());
+ mediaEvent.avDataId = static_cast<int64_t>(dataId);
if (mPts) {
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts;
+ mediaEvent.pts = mPts;
mPts = 0;
}
+ mFilterEvents.push_back(std::move(event));
// Clear and log
native_handle_close(nativeHandle);
@@ -931,18 +1048,17 @@
}
// Create mediaEvent and send callback
- int size = mFilterEvents.size();
- mFilterEvents.resize(size + 1);
- mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory =
- ::android::dupToAidl(nativeHandle);
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().offset = mSharedAvMemOffset;
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength =
- static_cast<int64_t>(output.size());
+ auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
+ auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
+ mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
+ mediaEvent.offset = mSharedAvMemOffset;
+ mediaEvent.dataLength = static_cast<int64_t>(output.size());
if (mPts) {
- mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts;
+ mediaEvent.pts = mPts;
mPts = 0;
}
+ mFilterEvents.push_back(std::move(event));
+
mSharedAvMemOffset += output.size();
// Clear and log
diff --git a/tv/tuner/aidl/default/Filter.h b/tv/tuner/aidl/default/Filter.h
index 11bbc11..a5adf4c 100644
--- a/tv/tuner/aidl/default/Filter.h
+++ b/tv/tuner/aidl/default/Filter.h
@@ -18,6 +18,8 @@
#include <aidl/android/hardware/tv/tuner/BnFilter.h>
#include <aidl/android/hardware/tv/tuner/Constant.h>
+#include <aidl/android/hardware/tv/tuner/DemuxFilterEvent.h>
+#include <aidl/android/hardware/tv/tuner/DemuxFilterStatus.h>
#include <fmq/AidlMessageQueue.h>
#include <inttypes.h>
@@ -25,6 +27,7 @@
#include <math.h>
#include <sys/stat.h>
#include <atomic>
+#include <condition_variable>
#include <set>
#include <thread>
@@ -52,10 +55,49 @@
class Demux;
class Dvr;
-class Filter : public BnFilter {
+class FilterCallbackScheduler final {
public:
- Filter();
+ FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb);
+ ~FilterCallbackScheduler();
+ void onFilterEvent(DemuxFilterEvent&& event);
+ void onFilterStatus(const DemuxFilterStatus& status);
+
+ void setTimeDelayHint(int timeDelay);
+ void setDataSizeDelayHint(int dataSizeDelay);
+
+ bool hasCallbackRegistered() const;
+
+ private:
+ void start();
+ void stop();
+
+ void threadLoop();
+ void threadLoopOnce();
+
+ static int getDemuxFilterEventDataLength(const DemuxFilterEvent& event);
+
+ private:
+ std::shared_ptr<IFilterCallback> mCallback;
+ std::thread mCallbackThread;
+ std::atomic<bool> mIsRunning;
+
+ // mLock protects mCallbackBuffer, mCv, and mDataLength
+ std::mutex mLock;
+ std::vector<DemuxFilterEvent> mCallbackBuffer;
+ std::condition_variable mCv;
+ int mDataLength;
+
+ // both of these need to be atomic (not just mTimeDelayInMs) as this class
+ // needs to be threadsafe.
+ std::atomic<int> mTimeDelayInMs;
+ std::atomic<int> mDataSizeDelayInBytes;
+};
+
+class Filter : public BnFilter {
+ friend class FilterCallbackScheduler;
+
+ public:
Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux);
@@ -104,10 +146,8 @@
std::shared_ptr<Demux> mDemux;
// Dvr reference once the filter is attached to any
std::shared_ptr<Dvr> mDvr = nullptr;
- /**
- * Filter callbacks used on filter events or FMQ status
- */
- std::shared_ptr<IFilterCallback> mCallback = nullptr;
+
+ FilterCallbackScheduler mCallbackScheduler;
int64_t mFilterId;
int32_t mCid = static_cast<int32_t>(Constant::INVALID_IP_FILTER_CONTEXT_ID);