CCodecBufferChannel: throttle by # of frames in pipeline

Bug: 310807188
Test: atest CtsMediaV2TestCases:android.mediav2.cts.CodecEncoderSurfaceTest
Test: manual
Change-Id: Id81f03fad892a743262bb531f904a743671041b5
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.cpp b/media/codec2/sfplugin/CCodecBufferChannel.cpp
index d829523..5d9ab1c 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.cpp
+++ b/media/codec2/sfplugin/CCodecBufferChannel.cpp
@@ -228,17 +228,23 @@
 status_t CCodecBufferChannel::setInputSurface(
         const std::shared_ptr<InputSurfaceWrapper> &surface) {
     ALOGV("[%s] setInputSurface", mName);
-    Mutexed<std::shared_ptr<InputSurfaceWrapper>>::Locked inputSurface(mInputSurface);
-    *inputSurface = surface;
-    return (*inputSurface)->connect(mComponent);
+    if (!surface) {
+        ALOGE("[%s] setInputSurface: surface must not be null", mName);
+        return BAD_VALUE;
+    }
+    Mutexed<InputSurface>::Locked inputSurface(mInputSurface);
+    inputSurface->numProcessingBuffersBalance = 0;
+    inputSurface->surface = surface;
+    mHasInputSurface = true;
+    return inputSurface->surface->connect(mComponent);
 }
 
 status_t CCodecBufferChannel::signalEndOfInputStream() {
-    Mutexed<std::shared_ptr<InputSurfaceWrapper>>::Locked inputSurface(mInputSurface);
-    if ((*inputSurface) == nullptr) {
+    Mutexed<InputSurface>::Locked inputSurface(mInputSurface);
+    if (inputSurface->surface == nullptr) {
         return INVALID_OPERATION;
     }
-    return (*inputSurface)->signalEndOfInputStream();
+    return inputSurface->surface->signalEndOfInputStream();
 }
 
 status_t CCodecBufferChannel::queueInputBufferInternal(
@@ -1063,19 +1069,36 @@
     if (mInputMetEos) {
         return;
     }
-    {
+    int64_t numOutputSlots = 0;
+    bool outputFull = [this, &numOutputSlots]() {
         Mutexed<Output>::Locked output(mOutput);
-        if (!output->buffers ||
-                output->buffers->hasPending() ||
+        if (!output->buffers) {
+            ALOGV("[%s] feedInputBufferIfAvailableInternal: "
+                  "return because output buffers are null", mName);
+            return true;
+        }
+        numOutputSlots = int64_t(output->numSlots);
+        if (output->buffers->hasPending() ||
                 (!output->bounded && output->buffers->numActiveSlots() >= output->numSlots)) {
-            return;
+            ALOGV("[%s] feedInputBufferIfAvailableInternal: "
+                  "return because there are no room for more output buffers", mName);
+            return true;
+        }
+        return false;
+    }();
+    if (android::media::codec::provider_->input_surface_throttle()) {
+        Mutexed<InputSurface>::Locked inputSurface(mInputSurface);
+        if (inputSurface->surface) {
+            if (inputSurface->numProcessingBuffersBalance <= numOutputSlots) {
+                ++inputSurface->numProcessingBuffersBalance;
+                ALOGV("[%s] feedInputBufferIfAvailableInternal: numProcessingBuffersBalance = %lld",
+                      mName, static_cast<long long>(inputSurface->numProcessingBuffersBalance));
+                inputSurface->surface->onInputBufferEmptied();
+            }
         }
     }
-    if (android::media::codec::provider_->input_surface_throttle()) {
-        Mutexed<std::shared_ptr<InputSurfaceWrapper>>::Locked inputSurface(mInputSurface);
-        if ((*inputSurface) != nullptr) {
-            (*inputSurface)->onInputBufferEmptied();
-        }
+    if (outputFull) {
+        return;
     }
     size_t numActiveSlots = 0;
     while (!mPipelineWatcher.lock()->pipelineFull()) {
@@ -1704,7 +1727,7 @@
                 && (hasCryptoOrDescrambler() || conforming)) {
             input->buffers.reset(new SlotInputBuffers(mName));
         } else if (graphic) {
-            if (mInputSurface.lock()->get()) {
+            if (mHasInputSurface) {
                 input->buffers.reset(new DummyInputBuffers(mName));
             } else if (mMetaMode == MODE_ANW) {
                 input->buffers.reset(new GraphicMetadataInputBuffers(mName));
@@ -1987,7 +2010,7 @@
 
 status_t CCodecBufferChannel::prepareInitialInputBuffers(
         std::map<size_t, sp<MediaCodecBuffer>> *clientInputBuffers, bool retry) {
-    if (mInputSurface.lock()->get()) {
+    if (mHasInputSurface) {
         return OK;
     }
 
@@ -2113,9 +2136,13 @@
 
 void CCodecBufferChannel::reset() {
     stop();
-    mInputSurface.lock()->reset();
     mPipelineWatcher.lock()->flush();
     {
+        mHasInputSurface = false;
+        Mutexed<InputSurface>::Locked inputSurface(mInputSurface);
+        inputSurface->surface.reset();
+    }
+    {
         Mutexed<Input>::Locked input(mInput);
         input->buffers.reset(new DummyInputBuffers(""));
         input->extraBuffers.flush();
@@ -2208,9 +2235,6 @@
 
 void CCodecBufferChannel::onInputBufferDone(
         uint64_t frameIndex, size_t arrayIndex) {
-    if (mInputSurface.lock()->get()) {
-        return;
-    }
     std::shared_ptr<C2Buffer> buffer =
             mPipelineWatcher.lock()->onInputBufferReleased(frameIndex, arrayIndex);
     bool newInputSlotAvailable = false;
@@ -2265,8 +2289,7 @@
         notifyClient = false;
     }
 
-    bool hasInputSurface = (mInputSurface.lock()->get() != nullptr);
-    if (!hasInputSurface && (work->worklets.size() != 1u
+    if (!mHasInputSurface && (work->worklets.size() != 1u
             || !work->worklets.front()
             || !(work->worklets.front()->output.flags &
                  C2FrameData::FLAG_INCOMPLETE))) {
@@ -2475,7 +2498,7 @@
     c2_cntr64_t timestamp =
         worklet->output.ordinal.timestamp + work->input.ordinal.customOrdinal
                 - work->input.ordinal.timestamp;
-    if (hasInputSurface) {
+    if (mHasInputSurface) {
         // When using input surface we need to restore the original input timestamp.
         timestamp = work->input.ordinal.customOrdinal;
     }
@@ -2633,6 +2656,12 @@
                     outBuffer->meta()->setObject("accessUnitInfo", obj);
                 }
             }
+            if (mHasInputSurface && android::media::codec::provider_->input_surface_throttle()) {
+                Mutexed<InputSurface>::Locked inputSurface(mInputSurface);
+                --inputSurface->numProcessingBuffersBalance;
+                ALOGV("[%s] onOutputBufferAvailable: numProcessingBuffersBalance = %lld",
+                      mName, static_cast<long long>(inputSurface->numProcessingBuffersBalance));
+            }
             mCallback->onOutputBufferAvailable(index, outBuffer);
             break;
         }
@@ -2802,7 +2831,7 @@
 }
 
 void CCodecBufferChannel::setInfoBuffer(const std::shared_ptr<C2InfoBuffer> &buffer) {
-    if (mInputSurface.lock()->get() == nullptr) {
+    if (!mHasInputSurface) {
         mInfoBuffers.push_back(buffer);
     } else {
         std::list<std::unique_ptr<C2Work>> items;