CCodec: handle delay config update

Bug: 130223947
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:size:small
Test: manual test with software codecs changing delay randomly
Change-Id: Ia574522a44df22f9638bc4049d7418843a76b57b
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.cpp b/media/codec2/sfplugin/CCodecBufferChannel.cpp
index 7669421..eb20b20 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.cpp
+++ b/media/codec2/sfplugin/CCodecBufferChannel.cpp
@@ -210,22 +210,36 @@
     }
 }
 
+// Input: state kept behind the mInput lock
+
+CCodecBufferChannel::Input::Input() : extraBuffers("extra") {}
+
 // CCodecBufferChannel
 
 CCodecBufferChannel::CCodecBufferChannel(
         const std::shared_ptr<CCodecCallback> &callback)
     : mHeapSeqNum(-1),
       mCCodecCallback(callback),
-      mNumInputSlots(kSmoothnessFactor),
-      mNumOutputSlots(kSmoothnessFactor),
       mDelay(0),
       mFrameIndex(0u),
       mFirstValidFrameIndex(0u),
       mMetaMode(MODE_NONE),
       mInputMetEos(false) {
     mOutputSurface.lock()->maxDequeueBuffers = kSmoothnessFactor + kRenderingDepth;
-    Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
-    buffers->reset(new DummyInputBuffers(""));
+    {
+        Mutexed<Input>::Locked input(mInput);
+        input->buffers.reset(new DummyInputBuffers(""));  // placeholder until configured
+        input->extraBuffers.flush();
+        input->inputDelay = 0u;  // no delay configured yet
+        input->pipelineDelay = 0u;
+        input->numSlots = kSmoothnessFactor;  // inputDelay + pipelineDelay + kSmoothnessFactor
+        input->numExtraSlots = 0u;  // only raised by a delay update in array mode
+    }
+    {
+        Mutexed<Output>::Locked output(mOutput);
+        output->outputDelay = 0u;
+        output->numSlots = kSmoothnessFactor;  // outputDelay + kSmoothnessFactor
+    }
 }
 
 CCodecBufferChannel::~CCodecBufferChannel() {
@@ -255,7 +269,7 @@
     return mInputSurface->signalEndOfInputStream();
 }
 
-status_t CCodecBufferChannel::queueInputBufferInternal(const sp<MediaCodecBuffer> &buffer) {
+status_t CCodecBufferChannel::queueInputBufferInternal(sp<MediaCodecBuffer> buffer) {
     int64_t timeUs;
     CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
 
@@ -287,13 +301,31 @@
 
     uint64_t queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
     std::vector<std::shared_ptr<C2Buffer>> queuedBuffers;
+    sp<Codec2Buffer> copy;  // non-null only if the input was cloned below
 
     if (buffer->size() > 0u) {
-        Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
+        Mutexed<Input>::Locked input(mInput);
         std::shared_ptr<C2Buffer> c2buffer;
-        if (!(*buffers)->releaseBuffer(buffer, &c2buffer, false)) {
+        if (!input->buffers->releaseBuffer(buffer, &c2buffer, false)) {
             return -ENOENT;
         }
+        // TODO: we want to delay copying buffers.
+        if (input->extraBuffers.numComponentBuffers() < input->numExtraSlots) {
+            copy = input->buffers->cloneAndReleaseBuffer(buffer);
+            if (copy != nullptr) {
+                (void)input->extraBuffers.assignSlot(copy);  // slot index is not needed here
+                if (!input->extraBuffers.releaseSlot(copy, &c2buffer, false)) {
+                    return UNKNOWN_ERROR;
+                }
+                bool released = input->buffers->releaseBuffer(buffer, nullptr, true);
+                ALOGV("[%s] queueInputBuffer: buffer copied; %sreleased",
+                      mName, released ? "" : "not ");
+                buffer.clear();  // from here on only |copy| tracks this input
+            } else {
+                ALOGW("[%s] queueInputBuffer: failed to copy a buffer; this may cause input "
+                      "buffer starvation on component.", mName);
+            }
+        }
         work->input.buffers.push_back(c2buffer);
         queuedBuffers.push_back(c2buffer);
     } else if (eos) {
@@ -343,9 +375,15 @@
         }
     }
     if (err == C2_OK) {
-        Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
-        bool released = (*buffers)->releaseBuffer(buffer, nullptr, true);
-        ALOGV("[%s] queueInputBuffer: buffer %sreleased", mName, released ? "" : "not ");
+        Mutexed<Input>::Locked input(mInput);
+        bool released = false;
+        if (buffer) {  // the client buffer itself was queued
+            released = input->buffers->releaseBuffer(buffer, nullptr, true);
+        } else if (copy) {  // buffer was cloned and cleared; release the clone's slot
+            released = input->extraBuffers.releaseSlot(copy, nullptr, true);
+        }
+        ALOGV("[%s] queueInputBuffer: buffer%s %sreleased",
+              mName, (buffer == nullptr) ? "(copy)" : "", released ? "" : "not ");
     }
 
     feedInputBufferIfAvailableInternal();
@@ -492,20 +530,21 @@
            mPipelineWatcher.lock()->pipelineFull()) {
         return;
     } else {
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-        if ((*buffers)->numClientBuffers() >= mNumOutputSlots) {
+        Mutexed<Output>::Locked output(mOutput);
+        if (output->buffers->numClientBuffers() >= output->numSlots) {
             return;
         }
     }
-    for (size_t i = 0; i < mNumInputSlots; ++i) {
+    size_t numInputSlots = mInput.lock()->numSlots;
+    for (size_t i = 0; i < numInputSlots; ++i) {
         sp<MediaCodecBuffer> inBuffer;
         size_t index;
         {
-            Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
-            if ((*buffers)->numClientBuffers() >= mNumInputSlots) {
+            Mutexed<Input>::Locked input(mInput);
+            if (input->buffers->numClientBuffers() >= input->numSlots) {
                 return;
             }
-            if (!(*buffers)->requestNewBuffer(&index, &inBuffer)) {
+            if (!input->buffers->requestNewBuffer(&index, &inBuffer)) {
                 ALOGV("[%s] no new buffer available", mName);
                 break;
             }
@@ -521,9 +560,9 @@
     std::shared_ptr<C2Buffer> c2Buffer;
     bool released = false;
     {
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-        if (*buffers) {
-            released = (*buffers)->releaseBuffer(buffer, &c2Buffer);
+        Mutexed<Output>::Locked output(mOutput);
+        if (output->buffers) {
+            released = output->buffers->releaseBuffer(buffer, &c2Buffer);
         }
     }
     // NOTE: some apps try to releaseOutputBuffer() with timestamp and/or render
@@ -685,14 +724,14 @@
     ALOGV("[%s] discardBuffer: %p", mName, buffer.get());
     bool released = false;
     {
-        Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
-        if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr, true)) {
+        Mutexed<Input>::Locked input(mInput);
+        if (input->buffers && input->buffers->releaseBuffer(buffer, nullptr, true)) {
             released = true;
         }
     }
     {
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-        if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
+        Mutexed<Output>::Locked output(mOutput);
+        if (output->buffers && output->buffers->releaseBuffer(buffer, nullptr)) {
             released = true;
         }
     }
@@ -707,24 +746,24 @@
 
 void CCodecBufferChannel::getInputBufferArray(Vector<sp<MediaCodecBuffer>> *array) {
     array->clear();
-    Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
+    Mutexed<Input>::Locked input(mInput);
 
-    if (!(*buffers)->isArrayMode()) {
-        *buffers = (*buffers)->toArrayMode(mNumInputSlots);
+    if (!input->buffers->isArrayMode()) {  // convert only if not already an array
+        input->buffers = input->buffers->toArrayMode(input->numSlots);
     }
 
-    (*buffers)->getArray(array);
+    input->buffers->getArray(array);  // fill |array| from the array-mode buffers
 }
 
 void CCodecBufferChannel::getOutputBufferArray(Vector<sp<MediaCodecBuffer>> *array) {
     array->clear();
-    Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
+    Mutexed<Output>::Locked output(mOutput);
 
-    if (!(*buffers)->isArrayMode()) {
-        *buffers = (*buffers)->toArrayMode(mNumOutputSlots);
+    if (!output->buffers->isArrayMode()) {  // convert only if not already an array
+        output->buffers = output->buffers->toArrayMode(output->numSlots);
     }
 
-    (*buffers)->getArray(array);
+    output->buffers->getArray(array);  // fill |array| from the array-mode buffers
 }
 
 status_t CCodecBufferChannel::start(
@@ -773,8 +812,8 @@
     uint32_t pipelineDelayValue = pipelineDelay ? pipelineDelay.value : 0;
     uint32_t outputDelayValue = outputDelay ? outputDelay.value : 0;
 
-    mNumInputSlots = inputDelayValue + pipelineDelayValue + kSmoothnessFactor;
-    mNumOutputSlots = outputDelayValue + kSmoothnessFactor;
+    size_t numInputSlots = inputDelayValue + pipelineDelayValue + kSmoothnessFactor;
+    size_t numOutputSlots = outputDelayValue + kSmoothnessFactor;
     mDelay = inputDelayValue + pipelineDelayValue + outputDelayValue;
 
     // TODO: get this from input format
@@ -848,14 +887,21 @@
         }
 
         bool forceArrayMode = false;
-        Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
+        Mutexed<Input>::Locked input(mInput);
+        // Record the configured delays: a later config update that carries only
+        // one of them falls back to the value stored here for the other.
+        input->inputDelay = inputDelayValue;
+        input->pipelineDelay = pipelineDelayValue;
+        input->numSlots = numInputSlots;
+        input->extraBuffers.flush();  // drop extra (copied) buffers from a previous run
+        input->numExtraSlots = 0u;  // numSlots already covers the configured delays
         if (graphic) {
             if (mInputSurface) {
-                buffers->reset(new DummyInputBuffers(mName));
+                input->buffers.reset(new DummyInputBuffers(mName));
             } else if (mMetaMode == MODE_ANW) {
-                buffers->reset(new GraphicMetadataInputBuffers(mName));
+                input->buffers.reset(new GraphicMetadataInputBuffers(mName));
             } else {
-                buffers->reset(new GraphicInputBuffers(mNumInputSlots, mName));
+                input->buffers.reset(new GraphicInputBuffers(numInputSlots, mName));
             }
         } else {
             if (hasCryptoOrDescrambler()) {
@@ -868,7 +910,7 @@
                 if (mDealer == nullptr) {
                     mDealer = new MemoryDealer(
                             align(capacity, MemoryDealer::getAllocationAlignment())
-                                * (mNumInputSlots + 1),
+                                * (numInputSlots + 1),
                             "EncryptedLinearInputBuffers");
                     mDecryptDestination = mDealer->allocate((size_t)capacity);
                 }
@@ -877,24 +919,24 @@
                 } else {
                     mHeapSeqNum = -1;
                 }
-                buffers->reset(new EncryptedLinearInputBuffers(
+                input->buffers.reset(new EncryptedLinearInputBuffers(
                         secure, mDealer, mCrypto, mHeapSeqNum, (size_t)capacity,
-                        mNumInputSlots, mName));
+                        numInputSlots, mName));
                 forceArrayMode = true;
             } else {
-                buffers->reset(new LinearInputBuffers(mName));
+                input->buffers.reset(new LinearInputBuffers(mName));
             }
         }
-        (*buffers)->setFormat(inputFormat);
+        input->buffers->setFormat(inputFormat);
 
         if (err == C2_OK) {
-            (*buffers)->setPool(pool);
+            input->buffers->setPool(pool);
         } else {
             // TODO: error
         }
 
         if (forceArrayMode) {
-            *buffers = (*buffers)->toArrayMode(mNumInputSlots);
+            input->buffers = input->buffers->toArrayMode(numInputSlots);
         }
     }
 
@@ -903,7 +945,7 @@
         uint32_t outputGeneration;
         {
             Mutexed<OutputSurface>::Locked output(mOutputSurface);
-            output->maxDequeueBuffers = mNumOutputSlots + reorderDepth.value + kRenderingDepth;
+            output->maxDequeueBuffers = numOutputSlots + reorderDepth.value + kRenderingDepth;
             outputSurface = output->surface ?
                     output->surface->getIGraphicBufferProducer() : nullptr;
             if (outputSurface) {
@@ -1011,18 +1053,20 @@
             outputPoolId_ = pools->outputPoolId;
         }
 
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-
+        Mutexed<Output>::Locked output(mOutput);
+        // Keep the stored delay consistent with the slot count derived from it.
+        output->outputDelay = outputDelayValue;
+        output->numSlots = numOutputSlots;
         if (graphic) {
             if (outputSurface) {
-                buffers->reset(new GraphicOutputBuffers(mName));
+                output->buffers.reset(new GraphicOutputBuffers(mName));
             } else {
-                buffers->reset(new RawGraphicOutputBuffers(mNumOutputSlots, mName));
+                output->buffers.reset(new RawGraphicOutputBuffers(numOutputSlots, mName));
             }
         } else {
-            buffers->reset(new LinearOutputBuffers(mName));
+            output->buffers.reset(new LinearOutputBuffers(mName));
         }
-        (*buffers)->setFormat(outputFormat->dup());
+        output->buffers->setFormat(outputFormat->dup());
 
 
         // Try to set output surface to created block pool if given.
@@ -1038,7 +1080,7 @@
             // WORKAROUND: if we're using early CSD workaround we convert to
             //             array mode, to appease apps assuming the output
             //             buffers to be of the same size.
-            (*buffers) = (*buffers)->toArrayMode(mNumOutputSlots);
+            output->buffers = output->buffers->toArrayMode(numOutputSlots);
 
             int32_t channelCount;
             int32_t sampleRate;
@@ -1055,7 +1097,7 @@
                 if (delay || padding) {
                     // We need write access to the buffers, and we're already in
                     // array mode.
-                    (*buffers)->initSkipCutBuffer(delay, padding, sampleRate, channelCount);
+                    output->buffers->initSkipCutBuffer(delay, padding, sampleRate, channelCount);
                 }
             }
         }
@@ -1090,14 +1132,14 @@
     if (err != C2_OK) {
         return UNKNOWN_ERROR;
     }
+    size_t numInputSlots = mInput.lock()->numSlots;
     std::vector<sp<MediaCodecBuffer>> toBeQueued;
-    // TODO: use proper buffer depth instead of this random value
-    for (size_t i = 0; i < mNumInputSlots; ++i) {
+    for (size_t i = 0; i < numInputSlots; ++i) {
         size_t index;
         sp<MediaCodecBuffer> buffer;
         {
-            Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
-            if (!(*buffers)->requestNewBuffer(&index, &buffer)) {
+            Mutexed<Input>::Locked input(mInput);
+            if (!input->buffers->requestNewBuffer(&index, &buffer)) {
                 if (i == 0) {
                     ALOGW("[%s] start: cannot allocate memory at all", mName);
                     return NO_MEMORY;
@@ -1182,12 +1224,13 @@
         }
     }
     {
-        Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
-        (*buffers)->flush();
+        Mutexed<Input>::Locked input(mInput);
+        input->buffers->flush();
+        input->extraBuffers.flush();
     }
     {
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-        (*buffers)->flush(flushedWork);
+        Mutexed<Output>::Locked output(mOutput);
+        output->buffers->flush(flushedWork);
     }
     mReorderStash.lock()->flush();
     mPipelineWatcher.lock()->flush();
@@ -1210,8 +1253,11 @@
             mPipelineWatcher.lock()->onInputBufferReleased(frameIndex, arrayIndex);
     bool newInputSlotAvailable;
     {
-        Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
-        newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
+        Mutexed<Input>::Locked input(mInput);
+        newInputSlotAvailable = input->buffers->expireComponentBuffer(buffer);
+        if (!newInputSlotAvailable) {  // regular buffers reported no freed slot; try the extras
+            (void)input->extraBuffers.expireComponentBuffer(buffer);  // result not used for feeding
+        }
     }
     if (newInputSlotAvailable) {
         feedInputBufferIfAvailable();
@@ -1269,6 +1315,7 @@
         }
     }
 
+    std::optional<uint32_t> newInputDelay, newPipelineDelay;
     while (!worklet->output.configUpdate.empty()) {
         std::unique_ptr<C2Param> param;
         worklet->output.configUpdate.back().swap(param);
@@ -1280,8 +1327,10 @@
                     mReorderStash.lock()->setDepth(reorderDepth.value);
                     ALOGV("[%s] onWorkDone: updated reorder depth to %u",
                           mName, reorderDepth.value);
+                    size_t numOutputSlots = mOutput.lock()->numSlots;
                     Mutexed<OutputSurface>::Locked output(mOutputSurface);
-                    output->maxDequeueBuffers = mNumOutputSlots + reorderDepth.value + kRenderingDepth;
+                    output->maxDequeueBuffers =
+                        numOutputSlots + reorderDepth.value + kRenderingDepth;
                     if (output->surface) {
                         output->surface->setMaxDequeuedBufferCount(output->maxDequeueBuffers);
                     }
@@ -1301,18 +1350,91 @@
                 }
                 break;
             }
+            // Delay updates: a global param carries the pipeline delay, port params the port delay.
+            case C2PortActualDelayTuning::CORE_INDEX: {
+                if (param->isGlobal()) {
+                    C2ActualPipelineDelayTuning pipelineDelay;
+                    if (pipelineDelay.updateFrom(*param)) {
+                        ALOGV("[%s] onWorkDone: updating pipeline delay %u",
+                              mName, pipelineDelay.value);
+                        newPipelineDelay = pipelineDelay.value;  // applied after the loop
+                        (void)mPipelineWatcher.lock()->pipelineDelay(pipelineDelay.value);
+                    }
+                }
+                if (param->forInput()) {
+                    C2PortActualDelayTuning::input inputDelay;
+                    if (inputDelay.updateFrom(*param)) {
+                        ALOGV("[%s] onWorkDone: updating input delay %u",
+                              mName, inputDelay.value);
+                        newInputDelay = inputDelay.value;  // applied after the loop
+                        (void)mPipelineWatcher.lock()->inputDelay(inputDelay.value);
+                    }
+                }
+                if (param->forOutput()) {
+                    C2PortActualDelayTuning::output outputDelay;
+                    if (outputDelay.updateFrom(*param)) {
+                        ALOGV("[%s] onWorkDone: updating output delay %u",
+                              mName, outputDelay.value);
+                        (void)mPipelineWatcher.lock()->outputDelay(outputDelay.value);
+
+                        bool outputBuffersChanged = false;
+                        Mutexed<Output>::Locked output(mOutput);
+                        output->outputDelay = outputDelay.value;
+                        size_t numOutputSlots = outputDelay.value + kSmoothnessFactor;
+                        if (output->numSlots < numOutputSlots) {  // only grow, never shrink
+                            output->numSlots = numOutputSlots;
+                            if (output->buffers->isArrayMode()) {
+                                OutputBuffersArray *array =
+                                    (OutputBuffersArray *)output->buffers.get();
+                                ALOGV("[%s] onWorkDone: growing output buffer array to %zu",
+                                      mName, numOutputSlots);
+                                array->grow(numOutputSlots);
+                                outputBuffersChanged = true;
+                            }
+                        }
+                        output.unlock();  // do not hold mOutput across the callback below
+
+                        if (outputBuffersChanged) {
+                            mCCodecCallback->onOutputBuffersChanged();
+                        }
+                    }
+                }
+                break;
+            }
             default:
                 ALOGV("[%s] onWorkDone: unrecognized config update (%08X)",
                       mName, param->index());
                 break;
         }
     }
+    if (newInputDelay || newPipelineDelay) {
+        Mutexed<Input>::Locked input(mInput);
+        // Persist whichever delay changed so that the next update, which may
+        // carry only the other one, is combined with up-to-date values.
+        input->inputDelay = newInputDelay.value_or(input->inputDelay);
+        input->pipelineDelay = newPipelineDelay.value_or(input->pipelineDelay);
+        size_t newNumSlots =
+            input->inputDelay + input->pipelineDelay + kSmoothnessFactor;
+        // In array mode numSlots is left untouched; any shortfall is covered by
+        // extra slots, which queueInputBufferInternal() fills with copies.
+        if (input->buffers->isArrayMode()) {
+            if (input->numSlots >= newNumSlots) {
+                input->numExtraSlots = 0;
+            } else {
+                input->numExtraSlots = newNumSlots - input->numSlots;
+            }
+            ALOGV("[%s] onWorkDone: updated number of extra slots to %zu (input array mode)",
+                  mName, input->numExtraSlots);
+        } else {
+            input->numSlots = newNumSlots;
+        }
+    }
 
     if (outputFormat != nullptr) {
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
+        Mutexed<Output>::Locked output(mOutput);
         ALOGD("[%s] onWorkDone: output format changed to %s",
                 mName, outputFormat->debugString().c_str());
-        (*buffers)->setFormat(outputFormat);
+        output->buffers->setFormat(outputFormat);
 
         AString mediaType;
         if (outputFormat->findString(KEY_MIME, &mediaType)
@@ -1321,7 +1438,7 @@
             int32_t sampleRate;
             if (outputFormat->findInt32(KEY_CHANNEL_COUNT, &channelCount)
                     && outputFormat->findInt32(KEY_SAMPLE_RATE, &sampleRate)) {
-                (*buffers)->updateSkipCutBuffer(sampleRate, channelCount);
+                output->buffers->updateSkipCutBuffer(sampleRate, channelCount);
             }
         }
     }
@@ -1356,20 +1473,18 @@
           timestamp.peekll());
 
     if (initData != nullptr) {
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-        if ((*buffers)->registerCsd(initData, &index, &outBuffer) == OK) {
+        Mutexed<Output>::Locked output(mOutput);
+        if (output->buffers->registerCsd(initData, &index, &outBuffer) == OK) {
             outBuffer->meta()->setInt64("timeUs", timestamp.peek());
             outBuffer->meta()->setInt32("flags", MediaCodec::BUFFER_FLAG_CODECCONFIG);
             ALOGV("[%s] onWorkDone: csd index = %zu [%p]", mName, index, outBuffer.get());
 
-            buffers.unlock();
+            output.unlock();  // release mOutput before the callback; not re-locked afterwards
             mCallback->onOutputBufferAvailable(index, outBuffer);
-            buffers.lock();
         } else {
             ALOGD("[%s] onWorkDone: unable to register csd", mName);
-            buffers.unlock();
+            output.unlock();  // release mOutput before reporting the error
             mCCodecCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
-            buffers.lock();
             return false;
         }
     }
@@ -1421,22 +1536,22 @@
             break;
         }
 
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-        status_t err = (*buffers)->registerBuffer(entry.buffer, &index, &outBuffer);
+        Mutexed<Output>::Locked output(mOutput);
+        status_t err = output->buffers->registerBuffer(entry.buffer, &index, &outBuffer);
         if (err != OK) {
             bool outputBuffersChanged = false;
             if (err != WOULD_BLOCK) {
-                if (!(*buffers)->isArrayMode()) {
-                    *buffers = (*buffers)->toArrayMode(mNumOutputSlots);
+                if (!output->buffers->isArrayMode()) {
+                    output->buffers = output->buffers->toArrayMode(output->numSlots);
                 }
-                OutputBuffersArray *array = (OutputBuffersArray *)buffers->get();
+                OutputBuffersArray *array = (OutputBuffersArray *)output->buffers.get();
                 array->realloc(entry.buffer);
                 outputBuffersChanged = true;
             }
             ALOGV("[%s] sendOutputBuffers: unable to register output buffer", mName);
             reorder->defer(entry);
 
-            buffers.unlock();
+            output.unlock();
             reorder.unlock();
 
             if (outputBuffersChanged) {
@@ -1444,7 +1559,7 @@
             }
             return;
         }
-        buffers.unlock();
+        output.unlock();
         reorder.unlock();
 
         outBuffer->meta()->setInt64("timeUs", entry.timestamp);