Merge "audio: Fix remote submix module I/O timing and atomicity" into main am: 5260337cbb

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2855474

Change-Id: I8009303c123c5475efcc1744372345a37439f782
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/audio/aidl/default/Configuration.cpp b/audio/aidl/default/Configuration.cpp
index 254eb46..d63e353 100644
--- a/audio/aidl/default/Configuration.cpp
+++ b/audio/aidl/default/Configuration.cpp
@@ -320,9 +320,9 @@
 //    - no profiles specified
 //
 // Mix ports:
-//  * "r_submix output", maximum 20 opened streams, maximum 10 active streams
+//  * "r_submix output", maximum 10 opened streams, maximum 10 active streams
 //    - profile PCM 16-bit; MONO, STEREO; 8000, 11025, 16000, 32000, 44100, 48000
-//  * "r_submix input", maximum 20 opened streams, maximum 10 active streams
+//  * "r_submix input", maximum 10 opened streams, maximum 10 active streams
 //    - profile PCM 16-bit; MONO, STEREO; 8000, 11025, 16000, 32000, 44100, 48000
 //
 // Routes:
@@ -355,12 +355,12 @@
         // Mix ports
 
         AudioPort rsubmixOutMix =
-                createPort(c.nextPortId++, "r_submix output", 0, false, createPortMixExt(20, 10));
+                createPort(c.nextPortId++, "r_submix output", 0, false, createPortMixExt(10, 10));
         rsubmixOutMix.profiles = remoteSubmixPcmAudioProfiles;
         c.ports.push_back(rsubmixOutMix);
 
         AudioPort rsubmixInMix =
-                createPort(c.nextPortId++, "r_submix input", 0, true, createPortMixExt(20, 10));
+                createPort(c.nextPortId++, "r_submix input", 0, true, createPortMixExt(10, 10));
         rsubmixInMix.profiles = remoteSubmixPcmAudioProfiles;
         c.ports.push_back(rsubmixInMix);
 
diff --git a/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h b/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h
index 21592b3..ee10abf 100644
--- a/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h
+++ b/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h
@@ -71,6 +71,10 @@
     static constexpr int kMaxReadFailureAttempts = 3;
     // 5ms between two read attempts when pipe is empty
     static constexpr int kReadAttemptSleepUs = 5000;
+
+    long mStartTimeNs = 0;
+    long mFramesSinceStart = 0;
+    int mReadErrorCount = 0;
 };
 
 class StreamInRemoteSubmix final : public StreamIn, public StreamSwitcher {
diff --git a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
index fc61dcb..6258c93 100644
--- a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
+++ b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
@@ -16,6 +16,9 @@
 
 #define LOG_TAG "AHAL_StreamRemoteSubmix"
 #include <android-base/logging.h>
+#include <audio_utils/clock.h>
+#include <error/Result.h>
+#include <error/expected_utils.h>
 
 #include "core-impl/StreamRemoteSubmix.h"
 
@@ -50,37 +53,33 @@
         if (routeItr != sSubmixRoutes.end()) {
             mCurrentRoute = routeItr->second;
         }
-    }
-    // If route is not available for this port, add it.
-    if (mCurrentRoute == nullptr) {
-        // Initialize the pipe.
-        mCurrentRoute = std::make_shared<SubmixRoute>();
-        if (::android::OK != mCurrentRoute->createPipe(mStreamConfig)) {
-            LOG(ERROR) << __func__ << ": create pipe failed";
-            return ::android::NO_INIT;
-        }
-        {
-            std::lock_guard guard(sSubmixRoutesLock);
-            sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
-        }
-    } else {
-        if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
-            LOG(ERROR) << __func__ << ": invalid stream config";
-            return ::android::NO_INIT;
-        }
-        sp<MonoPipe> sink = mCurrentRoute->getSink();
-        if (sink == nullptr) {
-            LOG(ERROR) << __func__ << ": nullptr sink when opening stream";
-            return ::android::NO_INIT;
-        }
-        // If the sink has been shutdown or pipe recreation is forced, delete the pipe and
-        // recreate it.
-        if (sink->isShutdown()) {
-            LOG(DEBUG) << __func__ << ": Non-nullptr shut down sink when opening stream";
-            if (::android::OK != mCurrentRoute->resetPipe()) {
-                LOG(ERROR) << __func__ << ": reset pipe failed";
+        // If route is not available for this port, add it.
+        if (mCurrentRoute == nullptr) {
+            // Initialize the pipe.
+            mCurrentRoute = std::make_shared<SubmixRoute>();
+            if (::android::OK != mCurrentRoute->createPipe(mStreamConfig)) {
+                LOG(ERROR) << __func__ << ": create pipe failed";
                 return ::android::NO_INIT;
             }
+            sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
+        }
+    }
+    if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
+        LOG(ERROR) << __func__ << ": invalid stream config";
+        return ::android::NO_INIT;
+    }
+    sp<MonoPipe> sink = mCurrentRoute->getSink();
+    if (sink == nullptr) {
+        LOG(ERROR) << __func__ << ": nullptr sink when opening stream";
+        return ::android::NO_INIT;
+    }
+    // If the sink has been shutdown or pipe recreation is forced, delete the pipe and
+    // recreate it.
+    if (sink->isShutdown()) {
+        LOG(DEBUG) << __func__ << ": Non-nullptr shut down sink when opening stream";
+        if (::android::OK != mCurrentRoute->resetPipe()) {
+            LOG(ERROR) << __func__ << ": reset pipe failed";
+            return ::android::NO_INIT;
         }
     }
 
@@ -110,6 +109,8 @@
 
 ::android::status_t StreamRemoteSubmix::start() {
     mCurrentRoute->exitStandby(mIsInput);
+    mStartTimeNs = ::android::uptimeNanos();
+    mFramesSinceStart = 0;
     return ::android::OK;
 }
 
@@ -161,8 +162,21 @@
     *latencyMs = getDelayInUsForFrameCount(getStreamPipeSizeInFrames()) / 1000;
     LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms";
     mCurrentRoute->exitStandby(mIsInput);
-    return (mIsInput ? inRead(buffer, frameCount, actualFrameCount)
-                     : outWrite(buffer, frameCount, actualFrameCount));
+    RETURN_STATUS_IF_ERROR(mIsInput ? inRead(buffer, frameCount, actualFrameCount)
+                                    : outWrite(buffer, frameCount, actualFrameCount));
+    const long bufferDurationUs =
+            (*actualFrameCount) * MICROS_PER_SECOND / mContext.getSampleRate();
+    const long totalDurationUs = (::android::uptimeNanos() - mStartTimeNs) / NANOS_PER_MICROSECOND;
+    mFramesSinceStart += *actualFrameCount;
+    const long totalOffsetUs =
+            mFramesSinceStart * MICROS_PER_SECOND / mContext.getSampleRate() - totalDurationUs;
+    LOG(VERBOSE) << __func__ << ": totalOffsetUs " << totalOffsetUs;
+    if (totalOffsetUs > 0) {
+        const long sleepTimeUs = std::min(totalOffsetUs, bufferDurationUs);
+        LOG(VERBOSE) << __func__ << ": sleeping for " << sleepTimeUs << " us";
+        usleep(sleepTimeUs);
+    }
+    return ::android::OK;
 }
 
 ::android::status_t StreamRemoteSubmix::refinePosition(StreamDescriptor::Position* position) {
@@ -200,12 +214,7 @@
     if (sink != nullptr) {
         if (sink->isShutdown()) {
             sink.clear();
-            const auto delayUs = getDelayInUsForFrameCount(frameCount);
-            LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write, sleeping for "
-                       << delayUs << " us";
-            // the pipe has already been shutdown, this buffer will be lost but we must
-            // simulate timing so we don't drain the output faster than realtime
-            usleep(delayUs);
+            LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write";
             *actualFrameCount = frameCount;
             return ::android::OK;
         }
@@ -214,6 +223,9 @@
         return ::android::UNKNOWN_ERROR;
     }
 
+    LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
+                 << " frames";
+
     const bool shouldBlockWrite = mCurrentRoute->shouldBlockWrite();
     size_t availableToWrite = sink->availableToWrite();
     // NOTE: sink has been checked above and sink and source life cycles are synchronized
@@ -236,6 +248,8 @@
     availableToWrite = sink->availableToWrite();
 
     if (!shouldBlockWrite && frameCount > availableToWrite) {
+        LOG(WARNING) << __func__ << ": writing " << availableToWrite << " vs. requested "
+                     << frameCount;
         // Truncate the request to avoid blocking.
         frameCount = availableToWrite;
     }
@@ -258,92 +272,59 @@
         *actualFrameCount = 0;
         return ::android::UNKNOWN_ERROR;
     }
-    LOG(VERBOSE) << __func__ << ": wrote " << writtenFrames << "frames";
+    if (writtenFrames > 0 && frameCount > (size_t)writtenFrames) {
+        LOG(WARNING) << __func__ << ": wrote " << writtenFrames << " vs. requested " << frameCount;
+    }
     *actualFrameCount = writtenFrames;
     return ::android::OK;
 }
 
 ::android::status_t StreamRemoteSubmix::inRead(void* buffer, size_t frameCount,
                                                size_t* actualFrameCount) {
+    // in any case, it is emulated that data for the entire buffer was available
+    memset(buffer, 0, mStreamConfig.frameSize * frameCount);
+    *actualFrameCount = frameCount;
+
     // about to read from audio source
     sp<MonoPipeReader> source = mCurrentRoute->getSource();
     if (source == nullptr) {
-        int readErrorCount = mCurrentRoute->notifyReadError();
-        if (readErrorCount < kMaxReadErrorLogs) {
+        if (++mReadErrorCount < kMaxReadErrorLogs) {
             LOG(ERROR) << __func__
                        << ": no audio pipe yet we're trying to read! (not all errors will be "
                           "logged)";
-        } else {
-            LOG(ERROR) << __func__ << ": Read errors " << readErrorCount;
         }
-        const auto delayUs = getDelayInUsForFrameCount(frameCount);
-        LOG(DEBUG) << __func__ << ": no source, ignoring the read, sleeping for " << delayUs
-                   << " us";
-        usleep(delayUs);
-        memset(buffer, 0, mStreamConfig.frameSize * frameCount);
-        *actualFrameCount = frameCount;
         return ::android::OK;
     }
 
+    LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
+                 << " frames";
     // read the data from the pipe
-    int attempts = 0;
-    const long delayUs = kReadAttemptSleepUs;
     char* buff = (char*)buffer;
-    size_t remainingFrames = frameCount;
-    int availableToRead = source->availableToRead();
-
-    while ((remainingFrames > 0) && (availableToRead > 0) && (attempts < kMaxReadFailureAttempts)) {
-        LOG(VERBOSE) << __func__ << ": frames available to read " << availableToRead;
-
+    size_t actuallyRead = 0;
+    long remainingFrames = frameCount;
+    const long deadlineTimeNs = ::android::uptimeNanos() +
+                                getDelayInUsForFrameCount(frameCount) * NANOS_PER_MICROSECOND;
+    while (remainingFrames > 0) {
         ssize_t framesRead = source->read(buff, remainingFrames);
-
         LOG(VERBOSE) << __func__ << ": frames read " << framesRead;
-
         if (framesRead > 0) {
             remainingFrames -= framesRead;
             buff += framesRead * mStreamConfig.frameSize;
-            availableToRead -= framesRead;
-            LOG(VERBOSE) << __func__ << ": (attempts = " << attempts << ") got " << framesRead
+            LOG(VERBOSE) << __func__ << ": got " << framesRead
                          << " frames, remaining =" << remainingFrames;
-        } else {
-            attempts++;
-            LOG(WARNING) << __func__ << ": read returned " << framesRead
-                         << " , read failure attempts = " << attempts << ", sleeping for "
-                         << delayUs << " us";
-            usleep(delayUs);
+            actuallyRead += framesRead;
+        }
+        if (::android::uptimeNanos() >= deadlineTimeNs) break;
+        if (framesRead <= 0) {
+            LOG(VERBOSE) << __func__ << ": read returned " << framesRead
+                         << ", read failure, sleeping for " << kReadAttemptSleepUs << " us";
+            usleep(kReadAttemptSleepUs);
         }
     }
-    // done using the source
-    source.clear();
-
-    if (remainingFrames > 0) {
-        const size_t remainingBytes = remainingFrames * mStreamConfig.frameSize;
-        LOG(VERBOSE) << __func__ << ": clearing remaining_frames = " << remainingFrames;
-        memset(((char*)buffer) + (mStreamConfig.frameSize * frameCount) - remainingBytes, 0,
-               remainingBytes);
+    if (actuallyRead < frameCount) {
+        LOG(WARNING) << __func__ << ": read " << actuallyRead << " vs. requested " << frameCount;
     }
-
-    long readCounterFrames = mCurrentRoute->updateReadCounterFrames(frameCount);
-    *actualFrameCount = frameCount;
-
-    // compute how much we need to sleep after reading the data by comparing the wall clock with
-    //   the projected time at which we should return.
-    // wall clock after reading from the pipe
-    auto recordDurationUs = std::chrono::duration_cast<std::chrono::microseconds>(
-            std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime());
-
-    // readCounterFrames contains the number of frames that have been read since the beginning of
-    // recording (including this call): it's converted to usec and compared to how long we've been
-    // recording for, which gives us how long we must wait to sync the projected recording time, and
-    // the observed recording time.
-    const long projectedVsObservedOffsetUs =
-            getDelayInUsForFrameCount(readCounterFrames) - recordDurationUs.count();
-
-    LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count()
-                 << " us, will wait: " << projectedVsObservedOffsetUs << " us";
-    if (projectedVsObservedOffsetUs > 0) {
-        usleep(projectedVsObservedOffsetUs);
-    }
+    mCurrentRoute->updateReadCounterFrames(*actualFrameCount);
     return ::android::OK;
 }
 
diff --git a/audio/aidl/default/r_submix/SubmixRoute.cpp b/audio/aidl/default/r_submix/SubmixRoute.cpp
index ddac64d..f04e607 100644
--- a/audio/aidl/default/r_submix/SubmixRoute.cpp
+++ b/audio/aidl/default/r_submix/SubmixRoute.cpp
@@ -81,11 +81,6 @@
     return (mStreamInOpen || (mStreamInStandby && (mReadCounterFrames != 0)));
 }
 
-int SubmixRoute::notifyReadError() {
-    std::lock_guard guard(mLock);
-    return ++mReadErrorCount;
-}
-
 long SubmixRoute::updateReadCounterFrames(size_t frameCount) {
     std::lock_guard guard(mLock);
     mReadCounterFrames += frameCount;
@@ -103,7 +98,6 @@
         }
         mStreamInStandby = true;
         mReadCounterFrames = 0;
-        mReadErrorCount = 0;
     } else {
         mStreamOutOpen = true;
     }
@@ -214,9 +208,6 @@
         if (mStreamInStandby || mStreamOutStandbyTransition) {
             mStreamInStandby = false;
             mStreamOutStandbyTransition = false;
-            // keep track of when we exit input standby (== first read == start "real recording")
-            // or when we start recording silence, and reset projected time
-            mRecordStartTime = std::chrono::steady_clock::now();
             mReadCounterFrames = 0;
         }
     } else {
diff --git a/audio/aidl/default/r_submix/SubmixRoute.h b/audio/aidl/default/r_submix/SubmixRoute.h
index 92b95e9..252b1c9 100644
--- a/audio/aidl/default/r_submix/SubmixRoute.h
+++ b/audio/aidl/default/r_submix/SubmixRoute.h
@@ -16,7 +16,6 @@
 
 #pragma once
 
-#include <chrono>
 #include <mutex>
 
 #include <android-base/thread_annotations.h>
@@ -83,14 +82,6 @@
         std::lock_guard guard(mLock);
         return mReadCounterFrames;
     }
-    int getReadErrorCount() {
-        std::lock_guard guard(mLock);
-        return mReadErrorCount;
-    }
-    std::chrono::time_point<std::chrono::steady_clock> getRecordStartTime() {
-        std::lock_guard guard(mLock);
-        return mRecordStartTime;
-    }
     sp<MonoPipe> getSink() {
         std::lock_guard guard(mLock);
         return mSink;
@@ -126,9 +117,6 @@
     bool mStreamOutStandby GUARDED_BY(mLock) = true;
     // how many frames have been requested to be read since standby
     long mReadCounterFrames GUARDED_BY(mLock) = 0;
-    int mReadErrorCount GUARDED_BY(mLock) = 0;
-    // wall clock when recording starts
-    std::chrono::time_point<std::chrono::steady_clock> mRecordStartTime GUARDED_BY(mLock);
 
     // Pipe variables: they handle the ring buffer that "pipes" audio:
     //  - from the submix virtual audio output == what needs to be played