Stop PCM streams before attempting to close them

In order to break out from a data wait loop in the driver
the stream state must be changed from "running". This is
achieved by calling `pcm_stop` from the stream thread.

Added a dedicated "I/O" thread to 'StreamAlsa' to be able
to call `pcm_stop` while an I/O operation is running. The
"I/O" thread is connected to the worker thread by means
of a 'MonoPipe'.

Bug: 364960013
Test: atest CtsMediaAudioTestCases
Test: atest VtsHalAudioCoreTargetTest
Change-Id: Ibb020d25f42df54baf46a37b50577cce294dc053
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 4525f6a..c138095 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -704,44 +704,7 @@
         LOG(ERROR) << __func__ << ": Worker start error: " << mWorker->getError();
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
     }
-    if (auto flags = getContext().getFlags();
-        (flags.getTag() == AudioIoFlags::Tag::input &&
-         isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
-                              AudioInputFlags::FAST)) ||
-        (flags.getTag() == AudioIoFlags::Tag::output &&
-         (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
-                               AudioOutputFlags::FAST) ||
-          isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
-                               AudioOutputFlags::SPATIALIZER)))) {
-        // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
-        // might be lacking the capability to request it, thus a failure to set is not an error.
-        pid_t workerTid = mWorker->getTid();
-        if (workerTid > 0) {
-            constexpr int32_t kRTPriorityMin = 1;  // SchedulingPolicyService.PRIORITY_MIN (Java).
-            constexpr int32_t kRTPriorityMax = 3;  // SchedulingPolicyService.PRIORITY_MAX (Java).
-            int priorityBoost = kRTPriorityMax;
-            if (flags.getTag() == AudioIoFlags::Tag::output &&
-                isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
-                                     AudioOutputFlags::SPATIALIZER)) {
-                const int32_t sptPrio =
-                        property_get_int32("audio.spatializer.priority", kRTPriorityMin);
-                if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
-                    priorityBoost = sptPrio;
-                } else {
-                    LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
-                    return ndk::ScopedAStatus::ok();
-                }
-            }
-            struct sched_param param = {
-                    .sched_priority = priorityBoost,
-            };
-            if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, &param) != 0) {
-                PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
-            }
-        } else {
-            LOG(WARNING) << __func__ << ": invalid worker tid: " << workerTid;
-        }
-    }
+    setWorkerThreadPriority(mWorker->getTid());
     getContext().getCommandMQ()->setErrorHandler(
             fmqErrorHandler<StreamContext::CommandMQ::Error>("CommandMQ"));
     getContext().getReplyMQ()->setErrorHandler(
@@ -830,6 +793,42 @@
     }
 }
 
+void StreamCommonImpl::setWorkerThreadPriority(pid_t workerTid) {
+    // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
+    // might be lacking the capability to request it, thus a failure to set is not an error.
+    if (auto flags = getContext().getFlags();
+        (flags.getTag() == AudioIoFlags::Tag::input &&
+         isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
+                              AudioInputFlags::FAST)) ||
+        (flags.getTag() == AudioIoFlags::Tag::output &&
+         (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
+                               AudioOutputFlags::FAST) ||
+          isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
+                               AudioOutputFlags::SPATIALIZER)))) {
+        constexpr int32_t kRTPriorityMin = 1;  // SchedulingPolicyService.PRIORITY_MIN (Java).
+        constexpr int32_t kRTPriorityMax = 3;  // SchedulingPolicyService.PRIORITY_MAX (Java).
+        int priorityBoost = kRTPriorityMax;
+        if (flags.getTag() == AudioIoFlags::Tag::output &&
+            isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
+                                 AudioOutputFlags::SPATIALIZER)) {
+            const int32_t sptPrio =
+                    property_get_int32("audio.spatializer.priority", kRTPriorityMin);
+            if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
+                priorityBoost = sptPrio;
+            } else {
+                LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
+                return;
+            }
+        }
+        struct sched_param param = {
+                .sched_priority = priorityBoost,
+        };
+        if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, &param) != 0) {
+            PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
+        }
+    }
+}
+
 void StreamCommonImpl::stopAndJoinWorker() {
     stopWorker();
     LOG(DEBUG) << __func__ << ": joining the worker thread...";
diff --git a/audio/aidl/default/alsa/StreamAlsa.cpp b/audio/aidl/default/alsa/StreamAlsa.cpp
index c77bfca..114c4c0 100644
--- a/audio/aidl/default/alsa/StreamAlsa.cpp
+++ b/audio/aidl/default/alsa/StreamAlsa.cpp
@@ -23,9 +23,12 @@
 #include <Utils.h>
 #include <audio_utils/clock.h>
 #include <error/expected_utils.h>
+#include <media/AidlConversionCppNdk.h>
 
 #include "core-impl/StreamAlsa.h"
 
+using aidl::android::hardware::audio::common::getChannelCount;
+
 namespace aidl::android::hardware::audio::core {
 
 StreamAlsa::StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries)
@@ -41,6 +44,34 @@
     cleanupWorker();
 }
 
+::android::NBAIO_Format StreamAlsa::getPipeFormat() const {
+    const audio_format_t audioFormat = VALUE_OR_FATAL(
+            aidl2legacy_AudioFormatDescription_audio_format_t(getContext().getFormat()));
+    const int channelCount = getChannelCount(getContext().getChannelLayout());
+    return ::android::Format_from_SR_C(getContext().getSampleRate(), channelCount, audioFormat);
+}
+
+::android::sp<::android::MonoPipe> StreamAlsa::makeSink(bool writeCanBlock) {
+    const ::android::NBAIO_Format format = getPipeFormat();
+    auto sink = ::android::sp<::android::MonoPipe>::make(mBufferSizeFrames, format, writeCanBlock);
+    const ::android::NBAIO_Format offers[1] = {format};
+    size_t numCounterOffers = 0;
+    ssize_t index = sink->negotiate(offers, 1, nullptr, numCounterOffers);
+    LOG_IF(FATAL, index != 0) << __func__ << ": Negotiation for the sink failed, index = " << index;
+    return sink;
+}
+
+::android::sp<::android::MonoPipeReader> StreamAlsa::makeSource(::android::MonoPipe* pipe) {
+    const ::android::NBAIO_Format format = getPipeFormat();
+    const ::android::NBAIO_Format offers[1] = {format};
+    auto source = ::android::sp<::android::MonoPipeReader>::make(pipe);
+    size_t numCounterOffers = 0;
+    ssize_t index = source->negotiate(offers, 1, nullptr, numCounterOffers);
+    LOG_IF(FATAL, index != 0) << __func__
+                              << ": Negotiation for the source failed, index = " << index;
+    return source;
+}
+
 ::android::status_t StreamAlsa::init() {
     return mConfig.has_value() ? ::android::OK : ::android::NO_INIT;
 }
@@ -64,7 +95,7 @@
 }
 
 ::android::status_t StreamAlsa::standby() {
-    mAlsaDeviceProxies.clear();
+    teardownIo();
     return ::android::OK;
 }
 
@@ -74,6 +105,8 @@
         return ::android::OK;
     }
     decltype(mAlsaDeviceProxies) alsaDeviceProxies;
+    decltype(mSources) sources;
+    decltype(mSinks) sinks;
     for (const auto& device : getDeviceProfiles()) {
         if ((device.direction == PCM_OUT && mIsInput) ||
             (device.direction == PCM_IN && !mIsInput)) {
@@ -95,11 +128,29 @@
             return ::android::NO_INIT;
         }
         alsaDeviceProxies.push_back(std::move(proxy));
+        auto sink = makeSink(mIsInput);  // Do not block the writer when it is on our thread.
+        if (sink != nullptr) {
+            sinks.push_back(sink);
+        } else {
+            return ::android::NO_INIT;
+        }
+        if (auto source = makeSource(sink.get()); source != nullptr) {
+            sources.push_back(source);
+        } else {
+            return ::android::NO_INIT;
+        }
     }
     if (alsaDeviceProxies.empty()) {
         return ::android::NO_INIT;
     }
     mAlsaDeviceProxies = std::move(alsaDeviceProxies);
+    mSources = std::move(sources);
+    mSinks = std::move(sinks);
+    mIoThreadIsRunning = true;
+    for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
+        mIoThreads.emplace_back(mIsInput ? &StreamAlsa::inputIoThread : &StreamAlsa::outputIoThread,
+                                this, i);
+    }
     return ::android::OK;
 }
 
@@ -112,15 +163,30 @@
     const size_t bytesToTransfer = frameCount * mFrameSizeBytes;
     unsigned maxLatency = 0;
     if (mIsInput) {
-        // For input case, only support single device.
-        proxy_read_with_retries(mAlsaDeviceProxies[0].get(), buffer, bytesToTransfer,
-                                mReadWriteRetries);
-        maxLatency = proxy_get_latency(mAlsaDeviceProxies[0].get());
+        const size_t i = 0;  // For the input case, only support a single device.
+        LOG(VERBOSE) << __func__ << ": reading from sink " << i;
+        ssize_t framesRead = mSources[i]->read(buffer, frameCount);
+        LOG_IF(FATAL, framesRead < 0) << "Error reading from the pipe: " << framesRead;
+        if (ssize_t framesMissing = static_cast<ssize_t>(frameCount) - framesRead;
+            framesMissing > 0) {
+            LOG(WARNING) << __func__ << ": incomplete data received, inserting " << framesMissing
+                         << " frames of silence";
+            memset(static_cast<char*>(buffer) + framesRead * mFrameSizeBytes, 0,
+                   framesMissing * mFrameSizeBytes);
+        }
+        maxLatency = proxy_get_latency(mAlsaDeviceProxies[i].get());
     } else {
         alsa::applyGain(buffer, mGain, bytesToTransfer, mConfig.value().format, mConfig->channels);
-        for (auto& proxy : mAlsaDeviceProxies) {
-            proxy_write_with_retries(proxy.get(), buffer, bytesToTransfer, mReadWriteRetries);
-            maxLatency = std::max(maxLatency, proxy_get_latency(proxy.get()));
+        for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
+            LOG(VERBOSE) << __func__ << ": writing into sink " << i;
+            ssize_t framesWritten = mSinks[i]->write(buffer, frameCount);
+            LOG_IF(FATAL, framesWritten < 0) << "Error writing into the pipe: " << framesWritten;
+            if (ssize_t framesLost = static_cast<ssize_t>(frameCount) - framesWritten;
+                framesLost > 0) {
+                LOG(WARNING) << __func__ << ": sink " << i << " incomplete data sent, dropping "
+                             << framesLost << " frames";
+            }
+            maxLatency = std::max(maxLatency, proxy_get_latency(mAlsaDeviceProxies[i].get()));
         }
     }
     *actualFrameCount = frameCount;
@@ -164,7 +230,7 @@
 }
 
 void StreamAlsa::shutdown() {
-    mAlsaDeviceProxies.clear();
+    teardownIo();
 }
 
 ndk::ScopedAStatus StreamAlsa::setGain(float gain) {
@@ -172,4 +238,80 @@
     return ndk::ScopedAStatus::ok();
 }
 
+void StreamAlsa::inputIoThread(size_t idx) {
+#if defined(__ANDROID__)
+    setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
+    const std::string threadName = (std::string("in_") + std::to_string(idx)).substr(0, 15);
+    pthread_setname_np(pthread_self(), threadName.c_str());
+#endif
+    const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
+    std::vector<char> buffer(bufferSize);
+    while (mIoThreadIsRunning) {
+        if (int ret = proxy_read_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0], bufferSize,
+                                              mReadWriteRetries);
+            ret == 0) {
+            size_t bufferFramesWritten = 0;
+            while (bufferFramesWritten < mBufferSizeFrames) {
+                if (!mIoThreadIsRunning) return;
+                ssize_t framesWrittenOrError =
+                        mSinks[idx]->write(&buffer[0], mBufferSizeFrames - bufferFramesWritten);
+                if (framesWrittenOrError >= 0) {
+                    bufferFramesWritten += framesWrittenOrError;
+                } else {
+                    LOG(WARNING) << __func__ << "[" << idx
+                                 << "]: Error while writing into the pipe: "
+                                 << framesWrittenOrError;
+                }
+            }
+        } else {
+            // Errors when the stream is being stopped are expected.
+            LOG_IF(WARNING, mIoThreadIsRunning)
+                    << __func__ << "[" << idx << "]: Error reading from ALSA: " << ret;
+        }
+    }
+}
+
+void StreamAlsa::outputIoThread(size_t idx) {
+#if defined(__ANDROID__)
+    setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
+    const std::string threadName = (std::string("out_") + std::to_string(idx)).substr(0, 15);
+    pthread_setname_np(pthread_self(), threadName.c_str());
+#endif
+    const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
+    std::vector<char> buffer(bufferSize);
+    while (mIoThreadIsRunning) {
+        ssize_t framesRead = mSources[idx]->read(&buffer[0], mBufferSizeFrames);
+        if (framesRead > 0) {
+            int ret = proxy_write_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0],
+                                               framesRead * mFrameSizeBytes, mReadWriteRetries);
+            // Errors when the stream is being stopped are expected.
+            LOG_IF(WARNING, ret != 0 && mIoThreadIsRunning)
+                    << __func__ << "[" << idx << "]: Error writing into ALSA: " << ret;
+        }
+    }
+}
+
+void StreamAlsa::teardownIo() {
+    mIoThreadIsRunning = false;
+    if (mIsInput) {
+        LOG(DEBUG) << __func__ << ": shutting down pipes";
+        for (auto& sink : mSinks) {
+            sink->shutdown(true);
+        }
+    }
+    LOG(DEBUG) << __func__ << ": stopping PCM streams";
+    for (const auto& proxy : mAlsaDeviceProxies) {
+        proxy_stop(proxy.get());
+    }
+    LOG(DEBUG) << __func__ << ": joining threads";
+    for (auto& thread : mIoThreads) {
+        if (thread.joinable()) thread.join();
+    }
+    mIoThreads.clear();
+    LOG(DEBUG) << __func__ << ": closing PCM devices";
+    mAlsaDeviceProxies.clear();
+    mSources.clear();
+    mSinks.clear();
+}
+
 }  // namespace aidl::android::hardware::audio::core
diff --git a/audio/aidl/default/alsa/Utils.h b/audio/aidl/default/alsa/Utils.h
index a97ea10..53dcfd0 100644
--- a/audio/aidl/default/alsa/Utils.h
+++ b/audio/aidl/default/alsa/Utils.h
@@ -48,8 +48,8 @@
   public:
     DeviceProxy();  // Constructs a "null" proxy.
     explicit DeviceProxy(const DeviceProfile& deviceProfile);
-    alsa_device_profile* getProfile() { return mProfile.get(); }
-    alsa_device_proxy* get() { return mProxy.get(); }
+    alsa_device_profile* getProfile() const { return mProfile.get(); }
+    alsa_device_proxy* get() const { return mProxy.get(); }
 
   private:
     static void alsaProxyDeleter(alsa_device_proxy* proxy);
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 8297fc5..304f9b7 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -475,6 +475,7 @@
     // the destructor in order to stop and join the worker thread in the case when the client
     // has not called 'IStreamCommon::close' method.
     void cleanupWorker();
+    void setWorkerThreadPriority(pid_t workerTid);
     void stopAndJoinWorker();
     void stopWorker();
 
diff --git a/audio/aidl/default/include/core-impl/StreamAlsa.h b/audio/aidl/default/include/core-impl/StreamAlsa.h
index 8bdf208..7e0f0ac 100644
--- a/audio/aidl/default/include/core-impl/StreamAlsa.h
+++ b/audio/aidl/default/include/core-impl/StreamAlsa.h
@@ -16,9 +16,14 @@
 
 #pragma once
 
+#include <atomic>
 #include <optional>
+#include <thread>
 #include <vector>
 
+#include <media/nbaio/MonoPipe.h>
+#include <media/nbaio/MonoPipeReader.h>
+
 #include "Stream.h"
 #include "alsa/Utils.h"
 
@@ -57,11 +62,24 @@
     const bool mIsInput;
     const std::optional<struct pcm_config> mConfig;
     const int mReadWriteRetries;
-    // All fields below are only used on the worker thread.
-    std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;
 
   private:
+    ::android::NBAIO_Format getPipeFormat() const;
+    ::android::sp<::android::MonoPipe> makeSink(bool writeCanBlock);
+    ::android::sp<::android::MonoPipeReader> makeSource(::android::MonoPipe* pipe);
+    void inputIoThread(size_t idx);
+    void outputIoThread(size_t idx);
+    void teardownIo();
+
     std::atomic<float> mGain = 1.0;
+
+    // All fields below are only used on the worker thread.
+    std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;
+    // Only 'libnbaio_mono' is vendor-accessible, thus no access to the multi-reader Pipe.
+    std::vector<::android::sp<::android::MonoPipe>> mSinks;
+    std::vector<::android::sp<::android::MonoPipeReader>> mSources;
+    std::vector<std::thread> mIoThreads;
+    std::atomic<bool> mIoThreadIsRunning = false;  // used by all threads
 };
 
 }  // namespace aidl::android::hardware::audio::core