audio: Fix remote submix module I/O timing and atomicity
Similar to the primary module implementation, align the time
spent in the transfer operation with the duration of audio.
Change the read operation to ingest as much data as possible
during the audio burst duration.
Ensure that checking the existence of a SubmixRoute and adding
a new one is an atomic operation.
Minor improvements to avoid extra synchronization.
In the configuration, change the limit of max open streams to 10
to match the legacy implementation.
Bug: 302132812
Test: atest CtsMediaAudioTestCases --test-filter=".*AudioPlaybackCaptureTest#testPlaybackCaptureDoS"
Change-Id: Iccb6aaac46c039551c3d5f7760b1459168d9cfe5
diff --git a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
index fc61dcb..6258c93 100644
--- a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
+++ b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
@@ -16,6 +16,9 @@
#define LOG_TAG "AHAL_StreamRemoteSubmix"
#include <android-base/logging.h>
+#include <audio_utils/clock.h>
+#include <error/Result.h>
+#include <error/expected_utils.h>
#include "core-impl/StreamRemoteSubmix.h"
@@ -50,37 +53,33 @@
if (routeItr != sSubmixRoutes.end()) {
mCurrentRoute = routeItr->second;
}
- }
- // If route is not available for this port, add it.
- if (mCurrentRoute == nullptr) {
- // Initialize the pipe.
- mCurrentRoute = std::make_shared<SubmixRoute>();
- if (::android::OK != mCurrentRoute->createPipe(mStreamConfig)) {
- LOG(ERROR) << __func__ << ": create pipe failed";
- return ::android::NO_INIT;
- }
- {
- std::lock_guard guard(sSubmixRoutesLock);
- sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
- }
- } else {
- if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
- LOG(ERROR) << __func__ << ": invalid stream config";
- return ::android::NO_INIT;
- }
- sp<MonoPipe> sink = mCurrentRoute->getSink();
- if (sink == nullptr) {
- LOG(ERROR) << __func__ << ": nullptr sink when opening stream";
- return ::android::NO_INIT;
- }
- // If the sink has been shutdown or pipe recreation is forced, delete the pipe and
- // recreate it.
- if (sink->isShutdown()) {
- LOG(DEBUG) << __func__ << ": Non-nullptr shut down sink when opening stream";
- if (::android::OK != mCurrentRoute->resetPipe()) {
- LOG(ERROR) << __func__ << ": reset pipe failed";
+ // If route is not available for this port, add it.
+ if (mCurrentRoute == nullptr) {
+ // Initialize the pipe.
+ mCurrentRoute = std::make_shared<SubmixRoute>();
+ if (::android::OK != mCurrentRoute->createPipe(mStreamConfig)) {
+ LOG(ERROR) << __func__ << ": create pipe failed";
return ::android::NO_INIT;
}
+ sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
+ }
+ }
+ if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
+ LOG(ERROR) << __func__ << ": invalid stream config";
+ return ::android::NO_INIT;
+ }
+ sp<MonoPipe> sink = mCurrentRoute->getSink();
+ if (sink == nullptr) {
+ LOG(ERROR) << __func__ << ": nullptr sink when opening stream";
+ return ::android::NO_INIT;
+ }
+ // If the sink has been shutdown or pipe recreation is forced, delete the pipe and
+ // recreate it.
+ if (sink->isShutdown()) {
+ LOG(DEBUG) << __func__ << ": Non-nullptr shut down sink when opening stream";
+ if (::android::OK != mCurrentRoute->resetPipe()) {
+ LOG(ERROR) << __func__ << ": reset pipe failed";
+ return ::android::NO_INIT;
}
}
@@ -110,6 +109,8 @@
::android::status_t StreamRemoteSubmix::start() {
mCurrentRoute->exitStandby(mIsInput);
+ mStartTimeNs = ::android::uptimeNanos();
+ mFramesSinceStart = 0;
return ::android::OK;
}
@@ -161,8 +162,21 @@
*latencyMs = getDelayInUsForFrameCount(getStreamPipeSizeInFrames()) / 1000;
LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms";
mCurrentRoute->exitStandby(mIsInput);
- return (mIsInput ? inRead(buffer, frameCount, actualFrameCount)
- : outWrite(buffer, frameCount, actualFrameCount));
+ RETURN_STATUS_IF_ERROR(mIsInput ? inRead(buffer, frameCount, actualFrameCount)
+ : outWrite(buffer, frameCount, actualFrameCount));
+ const long bufferDurationUs =
+ (*actualFrameCount) * MICROS_PER_SECOND / mContext.getSampleRate();
+ const long totalDurationUs = (::android::uptimeNanos() - mStartTimeNs) / NANOS_PER_MICROSECOND;
+ mFramesSinceStart += *actualFrameCount;
+ const long totalOffsetUs =
+ mFramesSinceStart * MICROS_PER_SECOND / mContext.getSampleRate() - totalDurationUs;
+ LOG(VERBOSE) << __func__ << ": totalOffsetUs " << totalOffsetUs;
+ if (totalOffsetUs > 0) {
+ const long sleepTimeUs = std::min(totalOffsetUs, bufferDurationUs);
+ LOG(VERBOSE) << __func__ << ": sleeping for " << sleepTimeUs << " us";
+ usleep(sleepTimeUs);
+ }
+ return ::android::OK;
}
::android::status_t StreamRemoteSubmix::refinePosition(StreamDescriptor::Position* position) {
@@ -200,12 +214,7 @@
if (sink != nullptr) {
if (sink->isShutdown()) {
sink.clear();
- const auto delayUs = getDelayInUsForFrameCount(frameCount);
- LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write, sleeping for "
- << delayUs << " us";
- // the pipe has already been shutdown, this buffer will be lost but we must
- // simulate timing so we don't drain the output faster than realtime
- usleep(delayUs);
+ LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write";
*actualFrameCount = frameCount;
return ::android::OK;
}
@@ -214,6 +223,9 @@
return ::android::UNKNOWN_ERROR;
}
+ LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
+ << " frames";
+
const bool shouldBlockWrite = mCurrentRoute->shouldBlockWrite();
size_t availableToWrite = sink->availableToWrite();
// NOTE: sink has been checked above and sink and source life cycles are synchronized
@@ -236,6 +248,8 @@
availableToWrite = sink->availableToWrite();
if (!shouldBlockWrite && frameCount > availableToWrite) {
+ LOG(WARNING) << __func__ << ": writing " << availableToWrite << " vs. requested "
+ << frameCount;
// Truncate the request to avoid blocking.
frameCount = availableToWrite;
}
@@ -258,92 +272,59 @@
*actualFrameCount = 0;
return ::android::UNKNOWN_ERROR;
}
- LOG(VERBOSE) << __func__ << ": wrote " << writtenFrames << "frames";
+ if (writtenFrames > 0 && frameCount > (size_t)writtenFrames) {
+ LOG(WARNING) << __func__ << ": wrote " << writtenFrames << " vs. requested " << frameCount;
+ }
*actualFrameCount = writtenFrames;
return ::android::OK;
}
::android::status_t StreamRemoteSubmix::inRead(void* buffer, size_t frameCount,
size_t* actualFrameCount) {
+ // in any case, it is emulated that data for the entire buffer was available
+ memset(buffer, 0, mStreamConfig.frameSize * frameCount);
+ *actualFrameCount = frameCount;
+
// about to read from audio source
sp<MonoPipeReader> source = mCurrentRoute->getSource();
if (source == nullptr) {
- int readErrorCount = mCurrentRoute->notifyReadError();
- if (readErrorCount < kMaxReadErrorLogs) {
+ if (++mReadErrorCount < kMaxReadErrorLogs) {
LOG(ERROR) << __func__
<< ": no audio pipe yet we're trying to read! (not all errors will be "
"logged)";
- } else {
- LOG(ERROR) << __func__ << ": Read errors " << readErrorCount;
}
- const auto delayUs = getDelayInUsForFrameCount(frameCount);
- LOG(DEBUG) << __func__ << ": no source, ignoring the read, sleeping for " << delayUs
- << " us";
- usleep(delayUs);
- memset(buffer, 0, mStreamConfig.frameSize * frameCount);
- *actualFrameCount = frameCount;
return ::android::OK;
}
+ LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
+ << " frames";
// read the data from the pipe
- int attempts = 0;
- const long delayUs = kReadAttemptSleepUs;
char* buff = (char*)buffer;
- size_t remainingFrames = frameCount;
- int availableToRead = source->availableToRead();
-
- while ((remainingFrames > 0) && (availableToRead > 0) && (attempts < kMaxReadFailureAttempts)) {
- LOG(VERBOSE) << __func__ << ": frames available to read " << availableToRead;
-
+ size_t actuallyRead = 0;
+ long remainingFrames = frameCount;
+ const long deadlineTimeNs = ::android::uptimeNanos() +
+ getDelayInUsForFrameCount(frameCount) * NANOS_PER_MICROSECOND;
+ while (remainingFrames > 0) {
ssize_t framesRead = source->read(buff, remainingFrames);
-
LOG(VERBOSE) << __func__ << ": frames read " << framesRead;
-
if (framesRead > 0) {
remainingFrames -= framesRead;
buff += framesRead * mStreamConfig.frameSize;
- availableToRead -= framesRead;
- LOG(VERBOSE) << __func__ << ": (attempts = " << attempts << ") got " << framesRead
+ LOG(VERBOSE) << __func__ << ": got " << framesRead
<< " frames, remaining =" << remainingFrames;
- } else {
- attempts++;
- LOG(WARNING) << __func__ << ": read returned " << framesRead
- << " , read failure attempts = " << attempts << ", sleeping for "
- << delayUs << " us";
- usleep(delayUs);
+ actuallyRead += framesRead;
+ }
+ if (::android::uptimeNanos() >= deadlineTimeNs) break;
+ if (framesRead <= 0) {
+ LOG(VERBOSE) << __func__ << ": read returned " << framesRead
+ << ", read failure, sleeping for " << kReadAttemptSleepUs << " us";
+ usleep(kReadAttemptSleepUs);
}
}
- // done using the source
- source.clear();
-
- if (remainingFrames > 0) {
- const size_t remainingBytes = remainingFrames * mStreamConfig.frameSize;
- LOG(VERBOSE) << __func__ << ": clearing remaining_frames = " << remainingFrames;
- memset(((char*)buffer) + (mStreamConfig.frameSize * frameCount) - remainingBytes, 0,
- remainingBytes);
+ if (actuallyRead < frameCount) {
+ LOG(WARNING) << __func__ << ": read " << actuallyRead << " vs. requested " << frameCount;
}
-
- long readCounterFrames = mCurrentRoute->updateReadCounterFrames(frameCount);
- *actualFrameCount = frameCount;
-
- // compute how much we need to sleep after reading the data by comparing the wall clock with
- // the projected time at which we should return.
- // wall clock after reading from the pipe
- auto recordDurationUs = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime());
-
- // readCounterFrames contains the number of frames that have been read since the beginning of
- // recording (including this call): it's converted to usec and compared to how long we've been
- // recording for, which gives us how long we must wait to sync the projected recording time, and
- // the observed recording time.
- const long projectedVsObservedOffsetUs =
- getDelayInUsForFrameCount(readCounterFrames) - recordDurationUs.count();
-
- LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count()
- << " us, will wait: " << projectedVsObservedOffsetUs << " us";
- if (projectedVsObservedOffsetUs > 0) {
- usleep(projectedVsObservedOffsetUs);
- }
+ mCurrentRoute->updateReadCounterFrames(*actualFrameCount);
return ::android::OK;
}