Stop PCM streams before attempting to close them
In order to break out from a data wait loop in the driver
the stream state must be changed from "running". This is
achieved by calling `pcm_stop` from the stream thread.
Added a dedicated "I/O" thread to 'StreamAlsa' to be able
to call `pcm_stop` while an I/O operation is running. The
"I/O" thread is connected to the worker thread by means
of a 'MonoPipe'.
Bug: 364960013
Test: atest CtsMediaAudioTestCases
Test: atest VtsHalAudioCoreTargetTest
Change-Id: Ibb020d25f42df54baf46a37b50577cce294dc053
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 4525f6a..c138095 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -704,44 +704,7 @@
LOG(ERROR) << __func__ << ": Worker start error: " << mWorker->getError();
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
}
- if (auto flags = getContext().getFlags();
- (flags.getTag() == AudioIoFlags::Tag::input &&
- isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
- AudioInputFlags::FAST)) ||
- (flags.getTag() == AudioIoFlags::Tag::output &&
- (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
- AudioOutputFlags::FAST) ||
- isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
- AudioOutputFlags::SPATIALIZER)))) {
- // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
- // might be lacking the capability to request it, thus a failure to set is not an error.
- pid_t workerTid = mWorker->getTid();
- if (workerTid > 0) {
- constexpr int32_t kRTPriorityMin = 1; // SchedulingPolicyService.PRIORITY_MIN (Java).
- constexpr int32_t kRTPriorityMax = 3; // SchedulingPolicyService.PRIORITY_MAX (Java).
- int priorityBoost = kRTPriorityMax;
- if (flags.getTag() == AudioIoFlags::Tag::output &&
- isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
- AudioOutputFlags::SPATIALIZER)) {
- const int32_t sptPrio =
- property_get_int32("audio.spatializer.priority", kRTPriorityMin);
- if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
- priorityBoost = sptPrio;
- } else {
- LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
- return ndk::ScopedAStatus::ok();
- }
- }
- struct sched_param param = {
- .sched_priority = priorityBoost,
- };
- if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, ¶m) != 0) {
- PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
- }
- } else {
- LOG(WARNING) << __func__ << ": invalid worker tid: " << workerTid;
- }
- }
+ setWorkerThreadPriority(mWorker->getTid());
getContext().getCommandMQ()->setErrorHandler(
fmqErrorHandler<StreamContext::CommandMQ::Error>("CommandMQ"));
getContext().getReplyMQ()->setErrorHandler(
@@ -830,6 +793,42 @@
}
}
+void StreamCommonImpl::setWorkerThreadPriority(pid_t workerTid) {
+ // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
+ // might be lacking the capability to request it, thus a failure to set is not an error.
+ if (auto flags = getContext().getFlags();
+ (flags.getTag() == AudioIoFlags::Tag::input &&
+ isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
+ AudioInputFlags::FAST)) ||
+ (flags.getTag() == AudioIoFlags::Tag::output &&
+ (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
+ AudioOutputFlags::FAST) ||
+ isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
+ AudioOutputFlags::SPATIALIZER)))) {
+ constexpr int32_t kRTPriorityMin = 1; // SchedulingPolicyService.PRIORITY_MIN (Java).
+ constexpr int32_t kRTPriorityMax = 3; // SchedulingPolicyService.PRIORITY_MAX (Java).
+ int priorityBoost = kRTPriorityMax;
+ if (flags.getTag() == AudioIoFlags::Tag::output &&
+ isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
+ AudioOutputFlags::SPATIALIZER)) {
+ const int32_t sptPrio =
+ property_get_int32("audio.spatializer.priority", kRTPriorityMin);
+ if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
+ priorityBoost = sptPrio;
+ } else {
+ LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
+ return;
+ }
+ }
+ struct sched_param param = {
+ .sched_priority = priorityBoost,
+ };
+ if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, ¶m) != 0) {
+ PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
+ }
+ }
+}
+
void StreamCommonImpl::stopAndJoinWorker() {
stopWorker();
LOG(DEBUG) << __func__ << ": joining the worker thread...";
diff --git a/audio/aidl/default/alsa/StreamAlsa.cpp b/audio/aidl/default/alsa/StreamAlsa.cpp
index c77bfca..114c4c0 100644
--- a/audio/aidl/default/alsa/StreamAlsa.cpp
+++ b/audio/aidl/default/alsa/StreamAlsa.cpp
@@ -23,9 +23,12 @@
#include <Utils.h>
#include <audio_utils/clock.h>
#include <error/expected_utils.h>
+#include <media/AidlConversionCppNdk.h>
#include "core-impl/StreamAlsa.h"
+using aidl::android::hardware::audio::common::getChannelCount;
+
namespace aidl::android::hardware::audio::core {
StreamAlsa::StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries)
@@ -41,6 +44,34 @@
cleanupWorker();
}
+::android::NBAIO_Format StreamAlsa::getPipeFormat() const {
+ const audio_format_t audioFormat = VALUE_OR_FATAL(
+ aidl2legacy_AudioFormatDescription_audio_format_t(getContext().getFormat()));
+ const int channelCount = getChannelCount(getContext().getChannelLayout());
+ return ::android::Format_from_SR_C(getContext().getSampleRate(), channelCount, audioFormat);
+}
+
+::android::sp<::android::MonoPipe> StreamAlsa::makeSink(bool writeCanBlock) {
+ const ::android::NBAIO_Format format = getPipeFormat();
+ auto sink = ::android::sp<::android::MonoPipe>::make(mBufferSizeFrames, format, writeCanBlock);
+ const ::android::NBAIO_Format offers[1] = {format};
+ size_t numCounterOffers = 0;
+ ssize_t index = sink->negotiate(offers, 1, nullptr, numCounterOffers);
+ LOG_IF(FATAL, index != 0) << __func__ << ": Negotiation for the sink failed, index = " << index;
+ return sink;
+}
+
+::android::sp<::android::MonoPipeReader> StreamAlsa::makeSource(::android::MonoPipe* pipe) {
+ const ::android::NBAIO_Format format = getPipeFormat();
+ const ::android::NBAIO_Format offers[1] = {format};
+ auto source = ::android::sp<::android::MonoPipeReader>::make(pipe);
+ size_t numCounterOffers = 0;
+ ssize_t index = source->negotiate(offers, 1, nullptr, numCounterOffers);
+ LOG_IF(FATAL, index != 0) << __func__
+ << ": Negotiation for the source failed, index = " << index;
+ return source;
+}
+
::android::status_t StreamAlsa::init() {
return mConfig.has_value() ? ::android::OK : ::android::NO_INIT;
}
@@ -64,7 +95,7 @@
}
::android::status_t StreamAlsa::standby() {
- mAlsaDeviceProxies.clear();
+ teardownIo();
return ::android::OK;
}
@@ -74,6 +105,8 @@
return ::android::OK;
}
decltype(mAlsaDeviceProxies) alsaDeviceProxies;
+ decltype(mSources) sources;
+ decltype(mSinks) sinks;
for (const auto& device : getDeviceProfiles()) {
if ((device.direction == PCM_OUT && mIsInput) ||
(device.direction == PCM_IN && !mIsInput)) {
@@ -95,11 +128,29 @@
return ::android::NO_INIT;
}
alsaDeviceProxies.push_back(std::move(proxy));
+ auto sink = makeSink(mIsInput); // Do not block the writer when it is on our thread.
+ if (sink != nullptr) {
+ sinks.push_back(sink);
+ } else {
+ return ::android::NO_INIT;
+ }
+ if (auto source = makeSource(sink.get()); source != nullptr) {
+ sources.push_back(source);
+ } else {
+ return ::android::NO_INIT;
+ }
}
if (alsaDeviceProxies.empty()) {
return ::android::NO_INIT;
}
mAlsaDeviceProxies = std::move(alsaDeviceProxies);
+ mSources = std::move(sources);
+ mSinks = std::move(sinks);
+ mIoThreadIsRunning = true;
+ for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
+ mIoThreads.emplace_back(mIsInput ? &StreamAlsa::inputIoThread : &StreamAlsa::outputIoThread,
+ this, i);
+ }
return ::android::OK;
}
@@ -112,15 +163,30 @@
const size_t bytesToTransfer = frameCount * mFrameSizeBytes;
unsigned maxLatency = 0;
if (mIsInput) {
- // For input case, only support single device.
- proxy_read_with_retries(mAlsaDeviceProxies[0].get(), buffer, bytesToTransfer,
- mReadWriteRetries);
- maxLatency = proxy_get_latency(mAlsaDeviceProxies[0].get());
+ const size_t i = 0; // For the input case, only support a single device.
+ LOG(VERBOSE) << __func__ << ": reading from sink " << i;
+ ssize_t framesRead = mSources[i]->read(buffer, frameCount);
+ LOG_IF(FATAL, framesRead < 0) << "Error reading from the pipe: " << framesRead;
+ if (ssize_t framesMissing = static_cast<ssize_t>(frameCount) - framesRead;
+ framesMissing > 0) {
+ LOG(WARNING) << __func__ << ": incomplete data received, inserting " << framesMissing
+ << " frames of silence";
+ memset(static_cast<char*>(buffer) + framesRead * mFrameSizeBytes, 0,
+ framesMissing * mFrameSizeBytes);
+ }
+ maxLatency = proxy_get_latency(mAlsaDeviceProxies[i].get());
} else {
alsa::applyGain(buffer, mGain, bytesToTransfer, mConfig.value().format, mConfig->channels);
- for (auto& proxy : mAlsaDeviceProxies) {
- proxy_write_with_retries(proxy.get(), buffer, bytesToTransfer, mReadWriteRetries);
- maxLatency = std::max(maxLatency, proxy_get_latency(proxy.get()));
+ for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
+ LOG(VERBOSE) << __func__ << ": writing into sink " << i;
+ ssize_t framesWritten = mSinks[i]->write(buffer, frameCount);
+ LOG_IF(FATAL, framesWritten < 0) << "Error writing into the pipe: " << framesWritten;
+ if (ssize_t framesLost = static_cast<ssize_t>(frameCount) - framesWritten;
+ framesLost > 0) {
+ LOG(WARNING) << __func__ << ": sink " << i << " incomplete data sent, dropping "
+ << framesLost << " frames";
+ }
+ maxLatency = std::max(maxLatency, proxy_get_latency(mAlsaDeviceProxies[i].get()));
}
}
*actualFrameCount = frameCount;
@@ -164,7 +230,7 @@
}
void StreamAlsa::shutdown() {
- mAlsaDeviceProxies.clear();
+ teardownIo();
}
ndk::ScopedAStatus StreamAlsa::setGain(float gain) {
@@ -172,4 +238,80 @@
return ndk::ScopedAStatus::ok();
}
+void StreamAlsa::inputIoThread(size_t idx) {
+#if defined(__ANDROID__)
+ setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
+ const std::string threadName = (std::string("in_") + std::to_string(idx)).substr(0, 15);
+ pthread_setname_np(pthread_self(), threadName.c_str());
+#endif
+ const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
+ std::vector<char> buffer(bufferSize);
+ while (mIoThreadIsRunning) {
+ if (int ret = proxy_read_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0], bufferSize,
+ mReadWriteRetries);
+ ret == 0) {
+ size_t bufferFramesWritten = 0;
+ while (bufferFramesWritten < mBufferSizeFrames) {
+ if (!mIoThreadIsRunning) return;
+ ssize_t framesWrittenOrError =
+ mSinks[idx]->write(&buffer[0], mBufferSizeFrames - bufferFramesWritten);
+ if (framesWrittenOrError >= 0) {
+ bufferFramesWritten += framesWrittenOrError;
+ } else {
+ LOG(WARNING) << __func__ << "[" << idx
+ << "]: Error while writing into the pipe: "
+ << framesWrittenOrError;
+ }
+ }
+ } else {
+ // Errors when the stream is being stopped are expected.
+ LOG_IF(WARNING, mIoThreadIsRunning)
+ << __func__ << "[" << idx << "]: Error reading from ALSA: " << ret;
+ }
+ }
+}
+
+void StreamAlsa::outputIoThread(size_t idx) {
+#if defined(__ANDROID__)
+ setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
+ const std::string threadName = (std::string("out_") + std::to_string(idx)).substr(0, 15);
+ pthread_setname_np(pthread_self(), threadName.c_str());
+#endif
+ const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
+ std::vector<char> buffer(bufferSize);
+ while (mIoThreadIsRunning) {
+ ssize_t framesRead = mSources[idx]->read(&buffer[0], mBufferSizeFrames);
+ if (framesRead > 0) {
+ int ret = proxy_write_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0],
+ framesRead * mFrameSizeBytes, mReadWriteRetries);
+ // Errors when the stream is being stopped are expected.
+ LOG_IF(WARNING, ret != 0 && mIoThreadIsRunning)
+ << __func__ << "[" << idx << "]: Error writing into ALSA: " << ret;
+ }
+ }
+}
+
+void StreamAlsa::teardownIo() {
+ mIoThreadIsRunning = false;
+ if (mIsInput) {
+ LOG(DEBUG) << __func__ << ": shutting down pipes";
+ for (auto& sink : mSinks) {
+ sink->shutdown(true);
+ }
+ }
+ LOG(DEBUG) << __func__ << ": stopping PCM streams";
+ for (const auto& proxy : mAlsaDeviceProxies) {
+ proxy_stop(proxy.get());
+ }
+ LOG(DEBUG) << __func__ << ": joining threads";
+ for (auto& thread : mIoThreads) {
+ if (thread.joinable()) thread.join();
+ }
+ mIoThreads.clear();
+ LOG(DEBUG) << __func__ << ": closing PCM devices";
+ mAlsaDeviceProxies.clear();
+ mSources.clear();
+ mSinks.clear();
+}
+
} // namespace aidl::android::hardware::audio::core
diff --git a/audio/aidl/default/alsa/Utils.h b/audio/aidl/default/alsa/Utils.h
index a97ea10..53dcfd0 100644
--- a/audio/aidl/default/alsa/Utils.h
+++ b/audio/aidl/default/alsa/Utils.h
@@ -48,8 +48,8 @@
public:
DeviceProxy(); // Constructs a "null" proxy.
explicit DeviceProxy(const DeviceProfile& deviceProfile);
- alsa_device_profile* getProfile() { return mProfile.get(); }
- alsa_device_proxy* get() { return mProxy.get(); }
+ alsa_device_profile* getProfile() const { return mProfile.get(); }
+ alsa_device_proxy* get() const { return mProxy.get(); }
private:
static void alsaProxyDeleter(alsa_device_proxy* proxy);
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 8297fc5..304f9b7 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -475,6 +475,7 @@
// the destructor in order to stop and join the worker thread in the case when the client
// has not called 'IStreamCommon::close' method.
void cleanupWorker();
+ void setWorkerThreadPriority(pid_t workerTid);
void stopAndJoinWorker();
void stopWorker();
diff --git a/audio/aidl/default/include/core-impl/StreamAlsa.h b/audio/aidl/default/include/core-impl/StreamAlsa.h
index 8bdf208..7e0f0ac 100644
--- a/audio/aidl/default/include/core-impl/StreamAlsa.h
+++ b/audio/aidl/default/include/core-impl/StreamAlsa.h
@@ -16,9 +16,14 @@
#pragma once
+#include <atomic>
#include <optional>
+#include <thread>
#include <vector>
+#include <media/nbaio/MonoPipe.h>
+#include <media/nbaio/MonoPipeReader.h>
+
#include "Stream.h"
#include "alsa/Utils.h"
@@ -57,11 +62,24 @@
const bool mIsInput;
const std::optional<struct pcm_config> mConfig;
const int mReadWriteRetries;
- // All fields below are only used on the worker thread.
- std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;
private:
+ ::android::NBAIO_Format getPipeFormat() const;
+ ::android::sp<::android::MonoPipe> makeSink(bool writeCanBlock);
+ ::android::sp<::android::MonoPipeReader> makeSource(::android::MonoPipe* pipe);
+ void inputIoThread(size_t idx);
+ void outputIoThread(size_t idx);
+ void teardownIo();
+
std::atomic<float> mGain = 1.0;
+
+ // All fields below are only used on the worker thread.
+ std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;
+ // Only 'libnbaio_mono' is vendor-accessible, thus no access to the multi-reader Pipe.
+ std::vector<::android::sp<::android::MonoPipe>> mSinks;
+ std::vector<::android::sp<::android::MonoPipeReader>> mSources;
+ std::vector<std::thread> mIoThreads;
+ std::atomic<bool> mIoThreadIsRunning = false; // used by all threads
};
} // namespace aidl::android::hardware::audio::core