audiohal: Re-implement stream read and write using FMQ

Result: no hwbinder calls due read / write session.

Added IStream.close method for explicitly freeing up of resources
consumed by the stream before automatic server objects reaping
gets to it.

Test: make, perform Loopback RTT, check traces
Bug: 30222631
Change-Id: I678559f6ef30026685df787cd2ba7c2ee449ed27
diff --git a/audio/2.0/default/StreamOut.cpp b/audio/2.0/default/StreamOut.cpp
index 3d20d11..4bb2274 100644
--- a/audio/2.0/default/StreamOut.cpp
+++ b/audio/2.0/default/StreamOut.cpp
@@ -17,8 +17,9 @@
 #define LOG_TAG "StreamOutHAL"
 //#define LOG_NDEBUG 0
 
-#include <hardware/audio.h>
 #include <android/log.h>
+#include <hardware/audio.h>
+#include <mediautils/SchedulingPolicyService.h>
 
 #include "StreamOut.h"
 
@@ -28,15 +29,103 @@
 namespace V2_0 {
 namespace implementation {
 
+namespace {
+
+class WriteThread : public Thread {
+  public:
+    // WriteThread's lifespan never exceeds StreamOut's lifespan.
+    WriteThread(std::atomic<bool>* stop,
+            audio_stream_out_t* stream,
+            StreamOut::DataMQ* dataMQ,
+            StreamOut::StatusMQ* statusMQ,
+            EventFlag* efGroup,
+            ThreadPriority threadPriority)
+            : Thread(false /*canCallJava*/),
+              mStop(stop),
+              mStream(stream),
+              mDataMQ(dataMQ),
+              mStatusMQ(statusMQ),
+              mEfGroup(efGroup),
+              mThreadPriority(threadPriority),
+              mBuffer(new uint8_t[dataMQ->getQuantumCount()]) {
+    }
+    virtual ~WriteThread() {}
+
+    status_t readyToRun() override;
+
+  private:
+    std::atomic<bool>* mStop;
+    audio_stream_out_t* mStream;
+    StreamOut::DataMQ* mDataMQ;
+    StreamOut::StatusMQ* mStatusMQ;
+    EventFlag* mEfGroup;
+    ThreadPriority mThreadPriority;
+    std::unique_ptr<uint8_t[]> mBuffer;
+
+    bool threadLoop() override;
+};
+
+status_t WriteThread::readyToRun() {
+    if (mThreadPriority != ThreadPriority::NORMAL) {
+        int err = requestPriority(
+                getpid(), getTid(), static_cast<int>(mThreadPriority), true /*asynchronous*/);
+        ALOGW_IF(err, "failed to set priority %d for pid %d tid %d; error %d",
+                static_cast<int>(mThreadPriority), getpid(), getTid(), err);
+    }
+    return OK;
+}
+
+bool WriteThread::threadLoop() {
+    // This implementation doesn't return control back to the Thread until it decides to stop,
+    // as the Thread uses mutexes, and this can lead to priority inversion.
+    while(!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
+        // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+        uint32_t efState = 0;
+        mEfGroup->wait(
+                static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
+        if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
+            continue;  // Nothing to do.
+        }
+
+        const size_t availToRead = mDataMQ->availableToRead();
+        Result retval = Result::OK;
+        uint64_t written = 0;
+        if (mDataMQ->read(&mBuffer[0], availToRead)) {
+            ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
+            if (writeResult >= 0) {
+                written = writeResult;
+            } else {
+                retval = Stream::analyzeStatus("write", writeResult);
+            }
+        }
+        uint64_t frames = 0;
+        struct timespec halTimeStamp = { 0, 0 };
+        if (retval == Result::OK && mStream->get_presentation_position != NULL) {
+            mStream->get_presentation_position(mStream, &frames, &halTimeStamp);
+        }
+        IStreamOut::WriteStatus status = { retval, written, frames,
+                                           { static_cast<uint64_t>(halTimeStamp.tv_sec),
+                                             static_cast<uint64_t>(halTimeStamp.tv_nsec) } };
+        if (!mStatusMQ->write(&status)) {
+            ALOGW("status message queue write failed");
+        }
+        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+    }
+
+    return false;
+}
+
+}  // namespace
+
 StreamOut::StreamOut(audio_hw_device_t* device, audio_stream_out_t* stream)
-        : mDevice(device), mStream(stream),
+        : mIsClosed(false), mDevice(device), mStream(stream),
           mStreamCommon(new Stream(&stream->common)),
-          mStreamMmap(new StreamMmap<audio_stream_out_t>(stream)) {
+          mStreamMmap(new StreamMmap<audio_stream_out_t>(stream)),
+          mEfGroup(nullptr), mStopWriteThread(false) {
 }
 
 StreamOut::~StreamOut() {
-    mCallback.clear();
-    mDevice->close_output_stream(mDevice, mStream);
+    close();
     mStream = nullptr;
     mDevice = nullptr;
 }
@@ -135,6 +224,23 @@
     return mStreamCommon->debugDump(fd);
 }
 
+Return<Result> StreamOut::close()  {
+    if (mIsClosed) return Result::INVALID_STATE;
+    mIsClosed = true;
+    if (mWriteThread.get()) {
+        mStopWriteThread.store(true, std::memory_order_release);
+        status_t status = mWriteThread->requestExitAndWait();
+        ALOGE_IF(status, "write thread exit error: %s", strerror(-status));
+    }
+    if (mEfGroup) {
+        status_t status = EventFlag::deleteEventFlag(&mEfGroup);
+        ALOGE_IF(status, "write MQ event flag deletion error: %s", strerror(-status));
+    }
+    mCallback.clear();
+    mDevice->close_output_stream(mDevice, mStream);
+    return Result::OK;
+}
+
 // Methods from ::android::hardware::audio::V2_0::IStreamOut follow.
 Return<uint32_t> StreamOut::getLatency()  {
     return mStream->get_latency(mStream);
@@ -149,18 +255,55 @@
     return retval;
 }
 
-Return<void> StreamOut::write(const hidl_vec<uint8_t>& data, write_cb _hidl_cb)  {
-    // TODO(mnaganov): Replace with FMQ version.
-    Result retval(Result::OK);
-    uint64_t written = 0;
-    ssize_t writeResult = mStream->write(mStream, &data[0], data.size());
-    if (writeResult >= 0) {
-        written = writeResult;
-    } else {
-        retval = Stream::analyzeStatus("write", writeResult);
-        written = 0;
+Return<void> StreamOut::prepareForWriting(
+        uint32_t frameSize, uint32_t framesCount, ThreadPriority threadPriority,
+        prepareForWriting_cb _hidl_cb)  {
+    status_t status;
+    // Create message queues.
+    if (mDataMQ) {
+        ALOGE("the client attempts to call prepareForWriting twice");
+        _hidl_cb(Result::INVALID_STATE,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
     }
-    _hidl_cb(retval, written);
+    std::unique_ptr<DataMQ> tempDataMQ(
+            new DataMQ(frameSize * framesCount, true /* EventFlag */));
+    std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
+    if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+        ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
+        ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
+    }
+    // TODO: Remove event flag management once blocking MQ is implemented. b/33815422
+    status = EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+    if (status != OK || !mEfGroup) {
+        ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
+    }
+
+    // Create and launch the thread.
+    mWriteThread = new WriteThread(
+            &mStopWriteThread,
+            mStream,
+            tempDataMQ.get(),
+            tempStatusMQ.get(),
+            mEfGroup,
+            threadPriority);
+    status = mWriteThread->run("writer", PRIORITY_URGENT_AUDIO);
+    if (status != OK) {
+        ALOGW("failed to start writer thread: %s", strerror(-status));
+        _hidl_cb(Result::INVALID_ARGUMENTS,
+                MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+        return Void();
+    }
+
+    mDataMQ = std::move(tempDataMQ);
+    mStatusMQ = std::move(tempStatusMQ);
+    _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
     return Void();
 }