audiohal: Re-implement effect process using FMQ and IMemory

Result: no hwbinder calls due music processing.

Added IEffect.close method for explicitly freeing up of resources
consumed by the effect before automatic server objects reaping
gets to it.

Added IEffect.setProcessBuffers method for updating the input /
output buffers on the go.

Test: make, use Play Music with effects, check traces
Bug: 30222631
Change-Id: Ia1e1bc7098fab59aa970e0ce4acdb48007409644
diff --git a/audio/effect/2.0/default/Effect.cpp b/audio/effect/2.0/default/Effect.cpp
index 1a7ea9c..9ca5834 100644
--- a/audio/effect/2.0/default/Effect.cpp
+++ b/audio/effect/2.0/default/Effect.cpp
@@ -17,8 +17,8 @@
 #include <memory.h>
 
 #define LOG_TAG "EffectHAL"
-#include <media/EffectsFactoryApi.h>
 #include <android/log.h>
+#include <media/EffectsFactoryApi.h>
 
 #include "Conversions.h"
 #include "Effect.h"
@@ -33,20 +33,108 @@
 
 using ::android::hardware::audio::common::V2_0::AudioChannelMask;
 using ::android::hardware::audio::common::V2_0::AudioFormat;
+using ::android::hardware::audio::effect::V2_0::MessageQueueFlagBits;
+
+namespace {
+
+class ProcessThread : public Thread {
+  public:
+    // ProcessThread's lifespan never exceeds Effect's lifespan.
+    ProcessThread(std::atomic<bool>* stop,
+            effect_handle_t effect,
+            std::atomic<audio_buffer_t*>* inBuffer,
+            std::atomic<audio_buffer_t*>* outBuffer,
+            Effect::StatusMQ* statusMQ,
+            EventFlag* efGroup)
+            : Thread(false /*canCallJava*/),
+              mStop(stop),
+              mEffect(effect),
+              mHasProcessReverse((*mEffect)->process_reverse != NULL),
+              mInBuffer(inBuffer),
+              mOutBuffer(outBuffer),
+              mStatusMQ(statusMQ),
+              mEfGroup(efGroup) {
+    }
+    virtual ~ProcessThread() {}
+
+  private:
+    std::atomic<bool>* mStop;
+    effect_handle_t mEffect;
+    bool mHasProcessReverse;
+    std::atomic<audio_buffer_t*>* mInBuffer;
+    std::atomic<audio_buffer_t*>* mOutBuffer;
+    Effect::StatusMQ* mStatusMQ;
+    EventFlag* mEfGroup;
+
+    bool threadLoop() override;
+};
+
+bool ProcessThread::threadLoop() {
+    // This implementation doesn't return control back to the Thread until it decides to stop,
+    // as the Thread uses mutexes, and this can lead to priority inversion.
+    while(!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
+        uint32_t efState = 0;
+        mEfGroup->wait(
+                static_cast<uint32_t>(MessageQueueFlagBits::REQUEST_PROCESS_ALL),
+                &efState,
+                NS_PER_SEC);
+        if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::REQUEST_PROCESS_ALL))) {
+            continue;  // Nothing to do.
+        }
+        Result retval = Result::OK;
+        if (efState & static_cast<uint32_t>(MessageQueueFlagBits::REQUEST_PROCESS_REVERSE)
+                && !mHasProcessReverse) {
+            retval = Result::NOT_SUPPORTED;
+        }
+
+        if (retval == Result::OK) {
+            // affects both buffer pointers and their contents.
+            std::atomic_thread_fence(std::memory_order_acquire);
+            int32_t processResult;
+            audio_buffer_t* inBuffer =
+                    std::atomic_load_explicit(mInBuffer, std::memory_order_relaxed);
+            audio_buffer_t* outBuffer =
+                    std::atomic_load_explicit(mOutBuffer, std::memory_order_relaxed);
+            if (inBuffer != nullptr && outBuffer != nullptr) {
+                if (efState & static_cast<uint32_t>(MessageQueueFlagBits::REQUEST_PROCESS)) {
+                    processResult = (*mEffect)->process(mEffect, inBuffer, outBuffer);
+                } else {
+                    processResult = (*mEffect)->process_reverse(mEffect, inBuffer, outBuffer);
+                }
+                std::atomic_thread_fence(std::memory_order_release);
+            } else {
+                ALOGE("processing buffers were not set before calling 'process'");
+                processResult = -ENODEV;
+            }
+            switch(processResult) {
+                case 0: retval = Result::OK; break;
+                case -ENODATA: retval = Result::INVALID_STATE; break;
+                case -EINVAL: retval = Result::INVALID_ARGUMENTS; break;
+                default: retval = Result::NOT_INITIALIZED;
+            }
+        }
+        if (!mStatusMQ->write(&retval)) {
+            ALOGW("status message queue write failed");
+        }
+        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::DONE_PROCESSING));
+    }
+
+    return false;
+}
+
+}  // namespace
 
 // static
 const char *Effect::sContextResultOfCommand = "returned status";
 const char *Effect::sContextCallToCommand = "error";
 const char *Effect::sContextCallFunction = sContextCallToCommand;
 
-Effect::Effect(effect_handle_t handle) : mHandle(handle) {
+Effect::Effect(effect_handle_t handle)
+        : mIsClosed(false), mHandle(handle), mEfGroup(nullptr), mStopProcessThread(false) {
 }
 
 Effect::~Effect() {
-    int status = EffectRelease(mHandle);
-    ALOGW_IF(status, "Error releasing effect %p: %s", mHandle, strerror(-status));
-    EffectMap::getInstance().remove(mHandle);
-    mHandle = 0;
+    close();
 }
 
 // static
@@ -83,9 +171,6 @@
 // static
 void Effect::effectBufferConfigFromHal(
         const buffer_config_t& halConfig, EffectBufferConfig* config) {
-    // TODO(mnaganov): Use FMQ instead of AudioBuffer.
-    (void)halConfig.buffer.frameCount;
-    (void)halConfig.buffer.raw;
     config->samplingRateHz = halConfig.samplingRate;
     config->channels = AudioChannelMask(halConfig.channels);
     config->format = AudioFormat(halConfig.format);
@@ -95,12 +180,13 @@
 
 // static
 void Effect::effectBufferConfigToHal(const EffectBufferConfig& config, buffer_config_t* halConfig) {
-    // TODO(mnaganov): Use FMQ instead of AudioBuffer.
+    // Note: setting the buffers directly is considered obsolete. They need to be set
+    // using 'setProcessBuffers'.
     halConfig->buffer.frameCount = 0;
     halConfig->buffer.raw = NULL;
     halConfig->samplingRate = config.samplingRateHz;
     halConfig->channels = static_cast<uint32_t>(config.channels);
-    // TODO(mnaganov): As the calling code does not use BP for now, implement later.
+    // TODO(mnaganov): The framework code currently does not use BP, implement later.
     halConfig->bufferProvider.cookie = NULL;
     halConfig->bufferProvider.getBuffer = NULL;
     halConfig->bufferProvider.releaseBuffer = NULL;
@@ -250,31 +336,66 @@
             });
 }
 
-void Effect::processImpl(
-        ProcessFunction process,
-        const char* funcName,
-        const AudioBuffer& inBuffer,
-        uint32_t outFrameSize,
-        ProcessCallback cb) {
-    audio_buffer_t halInBuffer;
-    halInBuffer.frameCount = inBuffer.frameCount;
-    halInBuffer.u8 = const_cast<uint8_t*>(&inBuffer.data[0]);
-    audio_buffer_t halOutBuffer;
-    halOutBuffer.frameCount = halInBuffer.frameCount;
-    // TODO(mnaganov): Consider stashing the buffer to avoid reallocating it every time.
-    std::unique_ptr<uint8_t[]> halOutBufferData(
-            new uint8_t[halOutBuffer.frameCount * outFrameSize]);
-    halOutBuffer.u8 = &halOutBufferData[0];
-    status_t status = process(mHandle, &halInBuffer, &halOutBuffer);
-    Result retval = analyzeStatus(funcName, "", sContextCallFunction, status);
-    AudioBuffer outBuffer;
-    if (status == OK) {
-        outBuffer.frameCount = halOutBuffer.frameCount;
-        outBuffer.data.setToExternal(halOutBuffer.u8, halOutBuffer.frameCount * outFrameSize);
-    } else {
-        outBuffer.frameCount = 0;
+Return<void> Effect::prepareForProcessing(prepareForProcessing_cb _hidl_cb) {
+    status_t status;
+    // Create message queue.
+    if (mStatusMQ) {
+        ALOGE("the client attempts to call prepareForProcessing_cb twice");
+        _hidl_cb(Result::INVALID_STATE, StatusMQ::Descriptor());
+        return Void();
     }
-    cb(retval, outBuffer);
+    std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1, true /*EventFlag*/));
+    if (!tempStatusMQ->isValid()) {
+        ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
+        _hidl_cb(Result::INVALID_ARGUMENTS, StatusMQ::Descriptor());
+        return Void();
+    }
+    status = EventFlag::createEventFlag(tempStatusMQ->getEventFlagWord(), &mEfGroup);
+    if (status != OK || !mEfGroup) {
+        ALOGE("failed creating event flag for status MQ: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS, StatusMQ::Descriptor());
+        return Void();
+    }
+
+    // Create and launch the thread.
+    mProcessThread = new ProcessThread(
+            &mStopProcessThread,
+            mHandle,
+            &mHalInBufferPtr,
+            &mHalOutBufferPtr,
+            tempStatusMQ.get(),
+            mEfGroup);
+    status = mProcessThread->run("effect", PRIORITY_URGENT_AUDIO);
+    if (status != OK) {
+        ALOGW("failed to start effect processing thread: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS, MQDescriptorSync<Result>());
+        return Void();
+    }
+
+    mStatusMQ = std::move(tempStatusMQ);
+    _hidl_cb(Result::OK, *mStatusMQ->getDesc());
+    return Void();
+}
+
+Return<Result> Effect::setProcessBuffers(
+        const AudioBuffer& inBuffer, const AudioBuffer& outBuffer) {
+    AudioBufferManager& manager = AudioBufferManager::getInstance();
+    sp<AudioBufferWrapper> tempInBuffer, tempOutBuffer;
+    if (!manager.wrap(inBuffer, &tempInBuffer)) {
+        ALOGE("Could not map memory of the input buffer");
+        return Result::INVALID_ARGUMENTS;
+    }
+    if (!manager.wrap(outBuffer, &tempOutBuffer)) {
+        ALOGE("Could not map memory of the output buffer");
+        return Result::INVALID_ARGUMENTS;
+    }
+    mInBuffer = tempInBuffer;
+    mOutBuffer = tempOutBuffer;
+    // The processing thread only reads these pointers after waking up by an event flag,
+    // so it's OK to update the pair non-atomically.
+    mHalInBufferPtr.store(mInBuffer->getHalBuffer(), std::memory_order_release);
+    mHalOutBufferPtr.store(mOutBuffer->getHalBuffer(), std::memory_order_release);
+    return Result::OK;
 }
 
 Result Effect::sendCommand(int commandCode, const char* commandName) {
@@ -510,23 +631,6 @@
     return Void();
 }
 
-Return<void> Effect::process(
-        const AudioBuffer& inBuffer, uint32_t outFrameSize, process_cb _hidl_cb)  {
-    processImpl((*mHandle)->process, "process", inBuffer, outFrameSize, _hidl_cb);
-    return Void();
-}
-
-Return<void> Effect::processReverse(
-        const AudioBuffer& inBuffer, uint32_t outFrameSize, processReverse_cb _hidl_cb)  {
-    if ((*mHandle)->process_reverse != NULL) {
-        processImpl(
-                (*mHandle)->process_reverse, "process_reverse", inBuffer, outFrameSize, _hidl_cb);
-    } else {
-        _hidl_cb(Result::NOT_SUPPORTED, AudioBuffer());
-    }
-    return Void();
-}
-
 Return<void> Effect::command(
         uint32_t commandId,
         const hidl_vec<uint8_t>& data,
@@ -611,6 +715,27 @@
             EFFECT_CMD_SET_FEATURE_CONFIG, "SET_FEATURE_CONFIG", sizeof(halCmd), halCmd);
 }
 
+Return<Result> Effect::close() {
+    if (mIsClosed) return Result::INVALID_STATE;
+    mIsClosed = true;
+    if (mProcessThread.get()) {
+        mStopProcessThread.store(true, std::memory_order_release);
+        status_t status = mProcessThread->requestExitAndWait();
+        ALOGE_IF(status, "processing thread exit error: %s", strerror(-status));
+    }
+    if (mEfGroup) {
+        status_t status = EventFlag::deleteEventFlag(&mEfGroup);
+        ALOGE_IF(status, "processing MQ event flag deletion error: %s", strerror(-status));
+    }
+    mInBuffer.clear();
+    mOutBuffer.clear();
+    int status = EffectRelease(mHandle);
+    ALOGW_IF(status, "Error releasing effect %p: %s", mHandle, strerror(-status));
+    EffectMap::getInstance().remove(mHandle);
+    mHandle = 0;
+    return Result::OK;
+}
+
 } // namespace implementation
 }  // namespace V2_0
 }  // namespace effect