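CCodec: refactor pipeline logic

Stop counting queued work in CCodec and rely on mChannel->elapsed()
for stuck detection instead:
- onWorkDone() no longer takes numDiscardedInputBuffers, and the
  mNumDiscardedInputBuffersQueue bookkeeping is removed.
- onInputBufferDone() now identifies the buffer by frameIndex and
  arrayIndex instead of a C2Buffer pointer.
- onWorkQueued(), subQueuedWorkCount() and mQueuedWorkCount are
  removed.
- initiateReleaseIfStuck() checks mDeadline only, then reports "queue"
  as stuck once mChannel->elapsed() reaches 3s.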

Bug: 123632127
Test: bug repro steps
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:size:small
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:include-annotation:android.media.cts.MediaHeavyPresubmitTests
Change-Id: I289f51709dbd675991cd8949cd343c5bf5c6ef5c
diff --git a/media/codec2/sfplugin/CCodec.cpp b/media/codec2/sfplugin/CCodec.cpp
index 10263de..ed1f85b 100644
--- a/media/codec2/sfplugin/CCodec.cpp
+++ b/media/codec2/sfplugin/CCodec.cpp
@@ -448,14 +448,13 @@
 
     virtual void onWorkDone(
             const std::weak_ptr<Codec2Client::Component>& component,
-            std::list<std::unique_ptr<C2Work>>& workItems,
-            size_t numDiscardedInputBuffers) override {
+            std::list<std::unique_ptr<C2Work>>& workItems) override {
         (void)component;
         sp<CCodec> codec(mCodec.promote());
         if (!codec) {
             return;
         }
-        codec->onWorkDone(workItems, numDiscardedInputBuffers);
+        codec->onWorkDone(workItems);
     }
 
     virtual void onTripped(
@@ -504,10 +503,10 @@
     }
 
     virtual void onInputBufferDone(
-            const std::shared_ptr<C2Buffer>& buffer) override {
+            uint64_t frameIndex, size_t arrayIndex) override {
         sp<CCodec> codec(mCodec.promote());
         if (codec) {
-            codec->onInputBufferDone(buffer);
+            codec->onInputBufferDone(frameIndex, arrayIndex);
         }
     }
 
@@ -531,10 +530,6 @@
                 {RenderedFrameInfo(mediaTimeUs, renderTimeNs)});
     }
 
-    void onWorkQueued(bool eos) override {
-        mCodec->onWorkQueued(eos);
-    }
-
     void onOutputBuffersChanged() override {
         mCodec->mCallback->onOutputBuffersChanged();
     }
@@ -546,8 +541,7 @@
 // CCodec
 
 CCodec::CCodec()
-    : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))),
-      mQueuedWorkCount(0) {
+    : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))) {
 }
 
 CCodec::~CCodec() {
@@ -1343,7 +1337,6 @@
     }
 
     mChannel->flush(flushedWork);
-    subQueuedWorkCount(flushedWork.size());
 
     {
         Mutexed<State>::Locked state(mState);
@@ -1465,28 +1458,16 @@
     config->setParameters(comp, params, C2_MAY_BLOCK);
 }
 
-void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
-                        size_t numDiscardedInputBuffers) {
+void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems) {
     if (!workItems.empty()) {
-        {
-            Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
-                    mNumDiscardedInputBuffersQueue);
-            numDiscardedInputBuffersQueue->insert(
-                    numDiscardedInputBuffersQueue->end(),
-                    workItems.size() - 1, 0);
-            numDiscardedInputBuffersQueue->emplace_back(
-                    numDiscardedInputBuffers);
-        }
-        {
-            Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
-            queue->splice(queue->end(), workItems);
-        }
+        Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
+        queue->splice(queue->end(), workItems);
     }
     (new AMessage(kWhatWorkDone, this))->post();
 }
 
-void CCodec::onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer) {
-    mChannel->onInputBufferDone(buffer);
+void CCodec::onInputBufferDone(uint64_t frameIndex, size_t arrayIndex) {
+    mChannel->onInputBufferDone(frameIndex, arrayIndex);
 }
 
 void CCodec::onMessageReceived(const sp<AMessage> &msg) {
@@ -1512,7 +1493,6 @@
         case kWhatStart: {
             // C2Component::start() should return within 500ms.
             setDeadline(now, 550ms, "start");
-            mQueuedWorkCount = 0;
             start();
             break;
         }
@@ -1520,10 +1500,6 @@
             // C2Component::stop() should return within 500ms.
             setDeadline(now, 550ms, "stop");
             stop();
-
-            mQueuedWorkCount = 0;
-            Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-            deadline->set(TimePoint::max(), "none");
             break;
         }
         case kWhatFlush: {
@@ -1549,7 +1525,6 @@
         }
         case kWhatWorkDone: {
             std::unique_ptr<C2Work> work;
-            size_t numDiscardedInputBuffers;
             bool shouldPost = false;
             {
                 Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
@@ -1560,24 +1535,10 @@
                 queue->pop_front();
                 shouldPost = !queue->empty();
             }
-            {
-                Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
-                        mNumDiscardedInputBuffersQueue);
-                if (numDiscardedInputBuffersQueue->empty()) {
-                    numDiscardedInputBuffers = 0;
-                } else {
-                    numDiscardedInputBuffers = numDiscardedInputBuffersQueue->front();
-                    numDiscardedInputBuffersQueue->pop_front();
-                }
-            }
             if (shouldPost) {
                 (new AMessage(kWhatWorkDone, this))->post();
             }
 
-            if (work->worklets.empty()
-                    || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
-                subQueuedWorkCount(1);
-            }
             // handle configuration changes in work done
             Mutexed<Config>::Locked config(mConfig);
             bool changed = false;
@@ -1641,8 +1602,7 @@
             }
             mChannel->onWorkDone(
                     std::move(work), changed ? config->mOutputFormat : nullptr,
-                    initData.hasChanged() ? initData.update().get() : nullptr,
-                    numDiscardedInputBuffers);
+                    initData.hasChanged() ? initData.update().get() : nullptr);
             break;
         }
         case kWhatWatch: {
@@ -1669,17 +1629,26 @@
 void CCodec::initiateReleaseIfStuck() {
     std::string name;
     bool pendingDeadline = false;
-    for (Mutexed<NamedTimePoint> *deadlinePtr : { &mDeadline, &mQueueDeadline, &mEosDeadline }) {
-        Mutexed<NamedTimePoint>::Locked deadline(*deadlinePtr);
+    {
+        Mutexed<NamedTimePoint>::Locked deadline(mDeadline);
         if (deadline->get() < std::chrono::steady_clock::now()) {
             name = deadline->getName();
-            break;
         }
         if (deadline->get() != TimePoint::max()) {
             pendingDeadline = true;
         }
     }
     if (name.empty()) {
+        constexpr std::chrono::steady_clock::duration kWorkDurationThreshold = 3s;
+        std::chrono::steady_clock::duration elapsed = mChannel->elapsed();
+        if (elapsed >= kWorkDurationThreshold) {
+            name = "queue";
+        }
+        if (elapsed > 0s) {
+            pendingDeadline = true;
+        }
+    }
+    if (name.empty()) {
         // We're not stuck.
         if (pendingDeadline) {
             // If we are not stuck yet but still has deadline coming up,
@@ -1694,33 +1663,6 @@
     mCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
 }
 
-void CCodec::onWorkQueued(bool eos) {
-    ALOGV("queued work count +1 from %d", mQueuedWorkCount.load());
-    int32_t count = ++mQueuedWorkCount;
-    if (eos) {
-        CCodecWatchdog::getInstance()->watch(this);
-        Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
-        deadline->set(std::chrono::steady_clock::now() + 3s, "eos");
-    }
-    // TODO: query and use input/pipeline/output delay combined
-    if (count >= 4) {
-        CCodecWatchdog::getInstance()->watch(this);
-        Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-        deadline->set(std::chrono::steady_clock::now() + 3s, "queue");
-    }
-}
-
-void CCodec::subQueuedWorkCount(uint32_t count) {
-    ALOGV("queued work count -%u from %d", count, mQueuedWorkCount.load());
-    int32_t currentCount = (mQueuedWorkCount -= count);
-    if (currentCount == 0) {
-        Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
-        deadline->set(TimePoint::max(), "none");
-    }
-    Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-    deadline->set(TimePoint::max(), "none");
-}
-
 }  // namespace android
 
 extern "C" android::CodecBase *CreateCodec() {