add filter delay hint default implementation

Adds a event buffering / scheduling mechanism that is configured using
the delay hint.

Bug: 183057734
Test: atest VtsHalTvTunerTargetTest
Change-Id: I154eb05bc419f827008161f85a6304a8599dc399
diff --git a/tv/tuner/aidl/default/Filter.cpp b/tv/tuner/aidl/default/Filter.cpp
index c377772..345a984 100644
--- a/tv/tuner/aidl/default/Filter.cpp
+++ b/tv/tuner/aidl/default/Filter.cpp
@@ -34,16 +34,132 @@
 
 #define WAIT_TIMEOUT 3000000000
 
-Filter::Filter() {}
+FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb)
+    : mCallback(cb), mDataLength(0), mTimeDelayInMs(0), mDataSizeDelayInBytes(0) {
+    start();
+}
+
+FilterCallbackScheduler::~FilterCallbackScheduler() {
+    stop();
+}
+
+void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) {
+    std::lock_guard<std::mutex> lock(mLock);
+    mCallbackBuffer.push_back(std::move(event));
+    mDataLength += getDemuxFilterEventDataLength(event);
+
+    if (mDataLength >= mDataSizeDelayInBytes) {
+        // size limit has been reached, send out events
+        mCv.notify_all();
+    }
+}
+
+void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) {
+    if (mCallback) {
+        mCallback->onFilterStatus(status);
+    }
+}
+
+void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) {
+    // updating the setTimeDelay does not go into effect until the condition
+    // variable times out or is notified.
+    // One possibility is to notify the condition variable right away when the
+    // time delay changes, but I don't see the benefit over waiting for the next
+    // timeout / push, since -- in any case -- this will not change very often.
+    mTimeDelayInMs = timeDelay;
+}
+
+void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) {
+    // similar to updating the time delay hint, when mDataSizeDelayInBytes
+    // changes, this will not go into immediate effect, but will wait until the
+    // next filterEvent.
+    // We could technically check the current data length and notify the
+    // condition variable if we wanted to, but again, this may be overkill.
+    mDataSizeDelayInBytes = dataSizeDelay;
+}
+
+bool FilterCallbackScheduler::hasCallbackRegistered() const {
+    return mCallback != nullptr;
+}
+
+void FilterCallbackScheduler::start() {
+    mIsRunning = true;
+    mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this);
+}
+
+void FilterCallbackScheduler::stop() {
+    mIsRunning = false;
+    if (mCallbackThread.joinable()) {
+        mCallbackThread.join();
+    }
+}
+
+void FilterCallbackScheduler::threadLoop() {
+    while (mIsRunning) {
+        threadLoopOnce();
+    }
+}
+
+void FilterCallbackScheduler::threadLoopOnce() {
+    std::unique_lock<std::mutex> lock(mLock);
+    // mTimeDelayInMs is an atomic value, so it should be copied into a local
+    // variable before use (to make sure both the if statement and wait_for use
+    // the same value).
+    int timeDelayInMs = mTimeDelayInMs;
+    if (timeDelayInMs > 0) {
+        mCv.wait_for(lock, std::chrono::milliseconds(timeDelayInMs));
+    } else {
+        // no reason to timeout, just wait until main thread determines it's
+        // okay to send data.
+        mCv.wait(lock);
+    }
+
+    // condition_variable wait locks mutex on timeout / notify
+    if (!mCallbackBuffer.empty()) {
+        if (mCallback) {
+            mCallback->onFilterEvent(mCallbackBuffer);
+        }
+        mCallbackBuffer.clear();
+        mDataLength = 0;
+    }
+    lock.unlock();
+}
+
+int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) {
+    // there is a risk that dataLength could be a negative value, but it
+    // *should* be safe to assume that it is always positive.
+    switch (event.getTag()) {
+        case DemuxFilterEvent::Tag::section:
+            return event.get<DemuxFilterEvent::Tag::section>().dataLength;
+        case DemuxFilterEvent::Tag::media:
+            return event.get<DemuxFilterEvent::Tag::media>().dataLength;
+        case DemuxFilterEvent::Tag::pes:
+            return event.get<DemuxFilterEvent::Tag::pes>().dataLength;
+        case DemuxFilterEvent::Tag::download:
+            return event.get<DemuxFilterEvent::Tag::download>().dataLength;
+        case DemuxFilterEvent::Tag::ipPayload:
+            return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength;
+
+        case DemuxFilterEvent::Tag::tsRecord:
+        case DemuxFilterEvent::Tag::mmtpRecord:
+        case DemuxFilterEvent::Tag::temi:
+        case DemuxFilterEvent::Tag::monitorEvent:
+        case DemuxFilterEvent::Tag::startId:
+            // these events do not include a payload and should therefore return
+            // 0.
+            // do not add a default option, so this will not compile when new types
+            // are added.
+            return 0;
+    }
+}
 
 Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
-               const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux) {
-    mType = type;
-    mFilterId = filterId;
-    mBufferSize = bufferSize;
-    mDemux = demux;
-    mCallback = cb;
-
+               const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux)
+    : mDemux(demux),
+      mCallbackScheduler(cb),
+      mFilterId(filterId),
+      mBufferSize(bufferSize),
+      mType(type) {
     switch (mType.mainType) {
         case DemuxFilterMainType::TS:
             if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
@@ -112,9 +228,29 @@
 }
 
 ::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) {
+    if (mIsMediaFilter) {
+        // delay hint is not supported for media filters
+        return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                static_cast<int32_t>(Result::UNAVAILABLE));
+    }
+
     ALOGV("%s", __FUNCTION__);
-    (void)in_hint;
-    // TODO: implement
+    if (in_hint.hintValue < 0) {
+        return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                static_cast<int32_t>(Result::INVALID_ARGUMENT));
+    }
+
+    switch (in_hint.hintType) {
+        case FilterDelayHintType::TIME_DELAY_IN_MS:
+            mCallbackScheduler.setTimeDelayHint(in_hint.hintValue);
+            break;
+        case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES:
+            mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue);
+            break;
+        default:
+            return ::ndk::ScopedAStatus::fromServiceSpecificError(
+                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
+    }
 
     return ::ndk::ScopedAStatus::ok();
 }
@@ -155,40 +291,36 @@
 ::ndk::ScopedAStatus Filter::start() {
     ALOGV("%s", __FUNCTION__);
     mFilterThreadRunning = true;
-    vector<DemuxFilterEvent> events;
+    std::vector<DemuxFilterEvent> events;
     // All the filter event callbacks in start are for testing purpose.
     switch (mType.mainType) {
         case DemuxFilterMainType::TS:
             createMediaEvent(events);
-            mCallback->onFilterEvent(events);
             createTsRecordEvent(events);
-            mCallback->onFilterEvent(events);
             createTemiEvent(events);
-            mCallback->onFilterEvent(events);
             break;
         case DemuxFilterMainType::MMTP:
             createDownloadEvent(events);
-            mCallback->onFilterEvent(events);
             createMmtpRecordEvent(events);
-            mCallback->onFilterEvent(events);
             break;
         case DemuxFilterMainType::IP:
             createSectionEvent(events);
-            mCallback->onFilterEvent(events);
             createIpPayloadEvent(events);
-            mCallback->onFilterEvent(events);
             break;
         case DemuxFilterMainType::TLV:
             createMonitorEvent(events);
-            mCallback->onFilterEvent(events);
             break;
         case DemuxFilterMainType::ALP:
             createMonitorEvent(events);
-            mCallback->onFilterEvent(events);
             break;
         default:
             break;
     }
+
+    for (auto&& event : events) {
+        mCallbackScheduler.onFilterEvent(std::move(event));
+    }
+
     return startFilterLoop();
 }
 
@@ -327,15 +459,14 @@
     if (newScramblingStatus ^ mScramblingStatusMonitored) {
         mScramblingStatusMonitored = newScramblingStatus;
         if (mScramblingStatusMonitored) {
-            if (mCallback != nullptr) {
+            if (mCallbackScheduler.hasCallbackRegistered()) {
                 // Assuming current status is always NOT_SCRAMBLED
-                vector<DemuxFilterEvent> events;
-                DemuxFilterMonitorEvent monitorEvent;
-                events.resize(1);
-                monitorEvent.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>(
+                auto monitorEvent = DemuxFilterMonitorEvent::make<
+                        DemuxFilterMonitorEvent::Tag::scramblingStatus>(
                         ScramblingStatus::NOT_SCRAMBLED);
-                events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent);
-                mCallback->onFilterEvent(events);
+                auto event =
+                        DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
+                mCallbackScheduler.onFilterEvent(std::move(event));
             } else {
                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
                         static_cast<int32_t>(Result::INVALID_STATE));
@@ -347,14 +478,13 @@
     if (newIpCid ^ mIpCidMonitored) {
         mIpCidMonitored = newIpCid;
         if (mIpCidMonitored) {
-            if (mCallback != nullptr) {
+            if (mCallbackScheduler.hasCallbackRegistered()) {
                 // Return random cid
-                vector<DemuxFilterEvent> events;
-                DemuxFilterMonitorEvent monitorEvent;
-                events.resize(1);
-                monitorEvent.set<DemuxFilterMonitorEvent::Tag::cid>(1);
-                events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent);
-                mCallback->onFilterEvent(events);
+                auto monitorEvent =
+                        DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1);
+                auto event =
+                        DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
+                mCallbackScheduler.onFilterEvent(std::move(event));
             } else {
                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
                         static_cast<int32_t>(Result::INVALID_STATE));
@@ -410,26 +540,26 @@
         }
 
         // After successfully write, send a callback and wait for the read to be done
-        if (mCallback != nullptr) {
+        if (mCallbackScheduler.hasCallbackRegistered()) {
             if (mConfigured) {
-                vector<DemuxFilterEvent> startEvent;
-                startEvent.resize(1);
-                startEvent[0].set<DemuxFilterEvent::Tag::startId>(mStartId++);
-                mCallback->onFilterEvent(startEvent);
+                auto startEvent =
+                        DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++);
+                mCallbackScheduler.onFilterEvent(std::move(startEvent));
                 mConfigured = false;
             }
-            mCallback->onFilterEvent(mFilterEvents);
+
+            for (auto&& event : mFilterEvents) {
+                mCallbackScheduler.onFilterEvent(std::move(event));
+            }
         } else {
             ALOGD("[Filter] filter callback is not configured yet.");
             mFilterThreadRunning = false;
             return;
         }
 
-        mFilterEvents.resize(0);
+        mFilterEvents.clear();
         mFilterStatus = DemuxFilterStatus::DATA_READY;
-        if (mCallback != nullptr) {
-            mCallback->onFilterStatus(mFilterStatus);
-        }
+        mCallbackScheduler.onFilterStatus(mFilterStatus);
         break;
     }
 
@@ -460,10 +590,10 @@
                     continue;
                 }
                 // After successfully write, send a callback and wait for the read to be done
-                if (mCallback != nullptr) {
-                    mCallback->onFilterEvent(mFilterEvents);
+                for (auto&& event : mFilterEvents) {
+                    mCallbackScheduler.onFilterEvent(std::move(event));
                 }
-                mFilterEvents.resize(0);
+                mFilterEvents.clear();
                 break;
             }
             // We do not wait for the last read to be done
@@ -499,9 +629,7 @@
     DemuxFilterStatus newStatus = checkFilterStatusChange(
             availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
     if (mFilterStatus != newStatus) {
-        if (mCallback != nullptr) {
-            mCallback->onFilterStatus(newStatus);
-        }
+        mCallbackScheduler.onFilterStatus(newStatus);
         mFilterStatus = newStatus;
     }
 }
@@ -657,9 +785,7 @@
             ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
         }
 
-        int size = mFilterEvents.size();
-        mFilterEvents.resize(size + 1);
-        mFilterEvents[size].set<DemuxFilterEvent::Tag::pes>(pesEvent);
+        mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent));
         mPesOutput.clear();
     }
 
@@ -763,11 +889,7 @@
             .firstMbInSlice = 0,  // random address
     };
 
-    int size;
-    size = mFilterEvents.size();
-    mFilterEvents.resize(size + 1);
-    mFilterEvents[size].set<DemuxFilterEvent::Tag::tsRecord>(recordEvent);
-
+    mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent));
     mRecordFilterOutput.clear();
     return ::ndk::ScopedAStatus::ok();
 }
@@ -789,8 +911,6 @@
     if (!writeDataToFilterMQ(data)) {
         return false;
     }
-    int size = mFilterEvents.size();
-    mFilterEvents.resize(size + 1);
     DemuxFilterSectionEvent secEvent;
     secEvent = {
             // temp dump meta data
@@ -799,7 +919,7 @@
             .sectionNum = 1,
             .dataLength = static_cast<int32_t>(data.size()),
     };
-    mFilterEvents[size].set<DemuxFilterEvent::Tag::section>(secEvent);
+    mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent));
     return true;
 }
 
@@ -888,19 +1008,16 @@
     mDataId2Avfd[dataId] = dup(av_fd);
 
     // Create mediaEvent and send callback
-    int size = mFilterEvents.size();
-    mFilterEvents.resize(size + 1);
-
-    mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
-    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory =
-            ::android::dupToAidl(nativeHandle);
-    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength =
-            static_cast<int64_t>(output.size());
-    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avDataId = static_cast<int64_t>(dataId);
+    auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
+    auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
+    mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
+    mediaEvent.dataLength = static_cast<int64_t>(output.size());
+    mediaEvent.avDataId = static_cast<int64_t>(dataId);
     if (mPts) {
-        mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts;
+        mediaEvent.pts = mPts;
         mPts = 0;
     }
+    mFilterEvents.push_back(std::move(event));
 
     // Clear and log
     native_handle_close(nativeHandle);
@@ -931,18 +1048,17 @@
     }
 
     // Create mediaEvent and send callback
-    int size = mFilterEvents.size();
-    mFilterEvents.resize(size + 1);
-    mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
-    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory =
-            ::android::dupToAidl(nativeHandle);
-    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().offset = mSharedAvMemOffset;
-    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength =
-            static_cast<int64_t>(output.size());
+    auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
+    auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
+    mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
+    mediaEvent.offset = mSharedAvMemOffset;
+    mediaEvent.dataLength = static_cast<int64_t>(output.size());
     if (mPts) {
-        mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts;
+        mediaEvent.pts = mPts;
         mPts = 0;
     }
+    mFilterEvents.push_back(std::move(event));
+
     mSharedAvMemOffset += output.size();
 
     // Clear and log
diff --git a/tv/tuner/aidl/default/Filter.h b/tv/tuner/aidl/default/Filter.h
index 11bbc11..a5adf4c 100644
--- a/tv/tuner/aidl/default/Filter.h
+++ b/tv/tuner/aidl/default/Filter.h
@@ -18,6 +18,8 @@
 
 #include <aidl/android/hardware/tv/tuner/BnFilter.h>
 #include <aidl/android/hardware/tv/tuner/Constant.h>
+#include <aidl/android/hardware/tv/tuner/DemuxFilterEvent.h>
+#include <aidl/android/hardware/tv/tuner/DemuxFilterStatus.h>
 
 #include <fmq/AidlMessageQueue.h>
 #include <inttypes.h>
@@ -25,6 +27,7 @@
 #include <math.h>
 #include <sys/stat.h>
 #include <atomic>
+#include <condition_variable>
 #include <set>
 #include <thread>
 
@@ -52,10 +55,49 @@
 class Demux;
 class Dvr;
 
-class Filter : public BnFilter {
+class FilterCallbackScheduler final {
   public:
-    Filter();
+    FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb);
+    ~FilterCallbackScheduler();
 
+    void onFilterEvent(DemuxFilterEvent&& event);
+    void onFilterStatus(const DemuxFilterStatus& status);
+
+    void setTimeDelayHint(int timeDelay);
+    void setDataSizeDelayHint(int dataSizeDelay);
+
+    bool hasCallbackRegistered() const;
+
+  private:
+    void start();
+    void stop();
+
+    void threadLoop();
+    void threadLoopOnce();
+
+    static int getDemuxFilterEventDataLength(const DemuxFilterEvent& event);
+
+  private:
+    std::shared_ptr<IFilterCallback> mCallback;
+    std::thread mCallbackThread;
+    std::atomic<bool> mIsRunning;
+
+    // mLock protects mCallbackBuffer, mCv, and mDataLength
+    std::mutex mLock;
+    std::vector<DemuxFilterEvent> mCallbackBuffer;
+    std::condition_variable mCv;
+    int mDataLength;
+
+    // both of these need to be atomic (not just mTimeDelayInMs) as this class
+    // needs to be threadsafe.
+    std::atomic<int> mTimeDelayInMs;
+    std::atomic<int> mDataSizeDelayInBytes;
+};
+
+class Filter : public BnFilter {
+    friend class FilterCallbackScheduler;
+
+  public:
     Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
            const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux);
 
@@ -104,10 +146,8 @@
     std::shared_ptr<Demux> mDemux;
     // Dvr reference once the filter is attached to any
     std::shared_ptr<Dvr> mDvr = nullptr;
-    /**
-     * Filter callbacks used on filter events or FMQ status
-     */
-    std::shared_ptr<IFilterCallback> mCallback = nullptr;
+
+    FilterCallbackScheduler mCallbackScheduler;
 
     int64_t mFilterId;
     int32_t mCid = static_cast<int32_t>(Constant::INVALID_IP_FILTER_CONTEXT_ID);