audio: Implementation of audio I/O, part II
This patch implements audio I/O for the synchronous, non-MMAP
case.
It also updates the StreamDescriptor structure to make it usable,
and clarifies the comments on the expectations for the client and
the HAL module.
Bug: 205884982
Test: atest VtsHalAudioCoreTargetTest
Merged-In: I09651c6e80a397c80870622ac19234b4d4a38cbb
Change-Id: I09651c6e80a397c80870622ac19234b4d4a38cbb
(cherry picked from commit 01803d454ac192f4b6b732944f0be324b1b03a7f)
diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp
index 07b1097..03f8c64 100644
--- a/audio/aidl/default/Android.bp
+++ b/audio/aidl/default/Android.bp
@@ -7,19 +7,27 @@
default_applicable_licenses: ["hardware_interfaces_license"],
}
-cc_library_static {
- name: "libaudioserviceexampleimpl",
+cc_defaults {
+ name: "aidlaudioservice_defaults",
vendor: true,
shared_libs: [
"libaudioaidlcommon",
"libbase",
"libbinder_ndk",
+ "libcutils",
+ "libfmq",
"libstagefright_foundation",
+ "libutils",
"android.media.audio.common.types-V1-ndk",
"android.hardware.audio.core-V1-ndk",
"android.hardware.common-V2-ndk",
"android.hardware.common.fmq-V1-ndk",
],
+}
+
+cc_library_static {
+ name: "libaudioserviceexampleimpl",
+ defaults: ["aidlaudioservice_defaults"],
export_include_dirs: ["include"],
srcs: [
"Config.cpp",
@@ -37,16 +45,7 @@
relative_install_path: "hw",
init_rc: ["android.hardware.audio.service-aidl.example.rc"],
vintf_fragments: ["android.hardware.audio.service-aidl.xml"],
- vendor: true,
- shared_libs: [
- "libbase",
- "libbinder_ndk",
- "libstagefright_foundation",
- "android.media.audio.common.types-V1-ndk",
- "android.hardware.audio.core-V1-ndk",
- "android.hardware.common-V2-ndk",
- "android.hardware.common.fmq-V1-ndk",
- ],
+ defaults: ["aidlaudioservice_defaults"],
static_libs: [
"libaudioserviceexampleimpl",
],
diff --git a/audio/aidl/default/Module.cpp b/audio/aidl/default/Module.cpp
index 1c6f90a..af033d0 100644
--- a/audio/aidl/default/Module.cpp
+++ b/audio/aidl/default/Module.cpp
@@ -20,6 +20,8 @@
#define LOG_TAG "AHAL_Module"
#include <android-base/logging.h>
+#include <Utils.h>
+#include <aidl/android/media/audio/common/AudioInputFlags.h>
#include <aidl/android/media/audio/common/AudioOutputFlags.h>
#include "core-impl/Module.h"
@@ -30,6 +32,7 @@
using aidl::android::media::audio::common::AudioChannelLayout;
using aidl::android::media::audio::common::AudioFormatDescription;
using aidl::android::media::audio::common::AudioFormatType;
+using aidl::android::media::audio::common::AudioInputFlags;
using aidl::android::media::audio::common::AudioIoFlags;
using aidl::android::media::audio::common::AudioOffloadInfo;
using aidl::android::media::audio::common::AudioOutputFlags;
@@ -39,6 +42,7 @@
using aidl::android::media::audio::common::AudioProfile;
using aidl::android::media::audio::common::Int;
using aidl::android::media::audio::common::PcmType;
+using android::hardware::audio::common::getFrameSizeInBytes;
namespace aidl::android::hardware::audio::core {
@@ -72,49 +76,6 @@
return true;
}
-constexpr size_t getPcmSampleSizeInBytes(PcmType pcm) {
- switch (pcm) {
- case PcmType::UINT_8_BIT:
- return 1;
- case PcmType::INT_16_BIT:
- return 2;
- case PcmType::INT_32_BIT:
- return 4;
- case PcmType::FIXED_Q_8_24:
- return 4;
- case PcmType::FLOAT_32_BIT:
- return 4;
- case PcmType::INT_24_BIT:
- return 3;
- }
- return 0;
-}
-
-constexpr size_t getChannelCount(const AudioChannelLayout& layout) {
- using Tag = AudioChannelLayout::Tag;
- switch (layout.getTag()) {
- case Tag::none:
- return 0;
- case Tag::invalid:
- return 0;
- case Tag::indexMask:
- return __builtin_popcount(layout.get<Tag::indexMask>());
- case Tag::layoutMask:
- return __builtin_popcount(layout.get<Tag::layoutMask>());
- case Tag::voiceMask:
- return __builtin_popcount(layout.get<Tag::voiceMask>());
- }
- return 0;
-}
-
-size_t getFrameSizeInBytes(const AudioFormatDescription& format, const AudioChannelLayout& layout) {
- if (format.type == AudioFormatType::PCM) {
- return getPcmSampleSizeInBytes(format.pcm) * getChannelCount(layout);
- }
- // For non-PCM formats always use frame size of 1.
- return 1;
-}
-
bool findAudioProfile(const AudioPort& port, const AudioFormatDescription& format,
AudioProfile* profile) {
if (auto profilesIt =
@@ -133,33 +94,8 @@
erase_all_values(mPatches, std::set<int32_t>{patchId});
}
-void Module::cleanUpPatches(int32_t portConfigId) {
- auto& patches = getConfig().patches;
- if (patches.size() == 0) return;
- auto range = mPatches.equal_range(portConfigId);
- for (auto it = range.first; it != range.second; ++it) {
- auto patchIt = findById<AudioPatch>(patches, it->second);
- if (patchIt != patches.end()) {
- erase_if(patchIt->sourcePortConfigIds,
- [portConfigId](auto e) { return e == portConfigId; });
- erase_if(patchIt->sinkPortConfigIds,
- [portConfigId](auto e) { return e == portConfigId; });
- }
- }
- std::set<int32_t> erasedPatches;
- for (size_t i = patches.size() - 1; i != 0; --i) {
- const auto& patch = patches[i];
- if (patch.sourcePortConfigIds.empty() || patch.sinkPortConfigIds.empty()) {
- erasedPatches.insert(patch.id);
- patches.erase(patches.begin() + i);
- }
- }
- erase_all_values(mPatches, erasedPatches);
-}
-
-ndk::ScopedAStatus Module::createStreamDescriptor(int32_t in_portConfigId,
- int64_t in_bufferSizeFrames,
- StreamDescriptor* out_descriptor) {
+ndk::ScopedAStatus Module::createStreamContext(int32_t in_portConfigId, int64_t in_bufferSizeFrames,
+ StreamContext* out_context) {
if (in_bufferSizeFrames <= 0) {
LOG(ERROR) << __func__ << ": non-positive buffer size " << in_bufferSizeFrames;
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
@@ -171,7 +107,7 @@
}
auto& configs = getConfig().portConfigs;
auto portConfigIt = findById<AudioPortConfig>(configs, in_portConfigId);
- // Since 'createStreamDescriptor' is an internal method, it is assumed that
+ // Since this is a private method, it is assumed that
// validity of the portConfigId has already been checked.
const size_t frameSize =
getFrameSizeInBytes(portConfigIt->format.value(), portConfigIt->channelMask.value());
@@ -187,7 +123,26 @@
<< kMaximumStreamBufferSizeBytes / frameSize;
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
}
- (void)out_descriptor;
+ const auto& flags = portConfigIt->flags.value();
+ if ((flags.getTag() == AudioIoFlags::Tag::input &&
+ (flags.get<AudioIoFlags::Tag::input>() &
+ 1 << static_cast<int32_t>(AudioInputFlags::MMAP_NOIRQ)) == 0) ||
+ (flags.getTag() == AudioIoFlags::Tag::output &&
+ (flags.get<AudioIoFlags::Tag::output>() &
+ 1 << static_cast<int32_t>(AudioOutputFlags::MMAP_NOIRQ)) == 0)) {
+ StreamContext temp(
+ std::make_unique<StreamContext::CommandMQ>(1, true /*configureEventFlagWord*/),
+ std::make_unique<StreamContext::ReplyMQ>(1, true /*configureEventFlagWord*/),
+ frameSize,
+ std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames));
+ if (temp.isValid()) {
+ *out_context = std::move(temp);
+ } else {
+ return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
+ }
+ } else {
+ // TODO: Implement simulation of MMAP buffer allocation
+ }
return ndk::ScopedAStatus::ok();
}
@@ -253,6 +208,28 @@
do_insert(patch.sinkPortConfigIds);
}
+void Module::updateStreamsConnectedState(const AudioPatch& oldPatch, const AudioPatch& newPatch) {
+    // Gathers the ids of all port configs, both sources and sinks, used by a patch.
+    const auto collectPortConfigIds = [](const AudioPatch& patch) {
+        std::set<int32_t> ids(patch.sourcePortConfigIds.begin(),
+                              patch.sourcePortConfigIds.end());
+        ids.insert(patch.sinkPortConfigIds.begin(), patch.sinkPortConfigIds.end());
+        return ids;
+    };
+    const std::set<int32_t> previousIds = collectPortConfigIds(oldPatch);
+    const std::set<int32_t> currentIds = collectPortConfigIds(newPatch);
+    // A stream which only the old patch refers to becomes disconnected.
+    for (const int32_t portConfigId : previousIds) {
+        if (currentIds.find(portConfigId) == currentIds.end()) {
+            mStreams.setStreamIsConnected(portConfigId, false);
+        }
+    }
+    // A stream which only the new patch refers to becomes connected. Streams that
+    // belong to both patches are left as they are.
+    for (const int32_t portConfigId : currentIds) {
+        if (previousIds.find(portConfigId) == previousIds.end()) {
+            mStreams.setStreamIsConnected(portConfigId, true);
+        }
+    }
+}
+
ndk::ScopedAStatus Module::setModuleDebug(
const ::aidl::android::hardware::audio::core::ModuleDebug& in_debug) {
LOG(DEBUG) << __func__ << ": old flags:" << mDebug.toString()
@@ -467,13 +444,22 @@
<< " does not correspond to an input mix port";
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
}
- if (auto status = createStreamDescriptor(in_args.portConfigId, in_args.bufferSizeFrames,
- &_aidl_return->desc);
+ StreamContext context;
+ if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
!status.isOk()) {
return status;
}
- auto stream = ndk::SharedRefBase::make<StreamIn>(in_args.sinkMetadata);
- mStreams.insert(port->id, in_args.portConfigId, StreamWrapper(stream));
+ context.fillDescriptor(&_aidl_return->desc);
+ auto stream = ndk::SharedRefBase::make<StreamIn>(in_args.sinkMetadata, std::move(context));
+ if (auto status = stream->init(); !status.isOk()) {
+ return status;
+ }
+ StreamWrapper streamWrapper(stream);
+ auto patchIt = mPatches.find(in_args.portConfigId);
+ if (patchIt != mPatches.end()) {
+ streamWrapper.setStreamIsConnected(true);
+ }
+ mStreams.insert(port->id, in_args.portConfigId, std::move(streamWrapper));
_aidl_return->stream = std::move(stream);
return ndk::ScopedAStatus::ok();
}
@@ -499,13 +485,23 @@
<< " has COMPRESS_OFFLOAD flag set, requires offload info";
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
}
- if (auto status = createStreamDescriptor(in_args.portConfigId, in_args.bufferSizeFrames,
- &_aidl_return->desc);
+ StreamContext context;
+ if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
!status.isOk()) {
return status;
}
- auto stream = ndk::SharedRefBase::make<StreamOut>(in_args.sourceMetadata, in_args.offloadInfo);
- mStreams.insert(port->id, in_args.portConfigId, StreamWrapper(stream));
+ context.fillDescriptor(&_aidl_return->desc);
+ auto stream = ndk::SharedRefBase::make<StreamOut>(in_args.sourceMetadata, std::move(context),
+ in_args.offloadInfo);
+ if (auto status = stream->init(); !status.isOk()) {
+ return status;
+ }
+ StreamWrapper streamWrapper(stream);
+ auto patchIt = mPatches.find(in_args.portConfigId);
+ if (patchIt != mPatches.end()) {
+ streamWrapper.setStreamIsConnected(true);
+ }
+ mStreams.insert(port->id, in_args.portConfigId, std::move(streamWrapper));
_aidl_return->stream = std::move(stream);
return ndk::ScopedAStatus::ok();
}
@@ -595,15 +591,20 @@
_aidl_return->latenciesMs.clear();
_aidl_return->latenciesMs.insert(_aidl_return->latenciesMs.end(),
_aidl_return->sinkPortConfigIds.size(), kLatencyMs);
+ AudioPatch oldPatch{};
if (existing == patches.end()) {
_aidl_return->id = getConfig().nextPatchId++;
patches.push_back(*_aidl_return);
existing = patches.begin() + (patches.size() - 1);
} else {
+ oldPatch = *existing;
*existing = *_aidl_return;
}
registerPatch(*existing);
- LOG(DEBUG) << __func__ << ": created or updated patch id " << _aidl_return->id;
+ updateStreamsConnectedState(oldPatch, *_aidl_return);
+
+ LOG(DEBUG) << __func__ << ": " << (oldPatch.id == 0 ? "created" : "updated") << " patch "
+ << _aidl_return->toString();
return ndk::ScopedAStatus::ok();
}
@@ -738,6 +739,7 @@
auto patchIt = findById<AudioPatch>(patches, in_patchId);
if (patchIt != patches.end()) {
cleanUpPatch(patchIt->id);
+ updateStreamsConnectedState(*patchIt, AudioPatch{});
patches.erase(patchIt);
LOG(DEBUG) << __func__ << ": erased patch " << in_patchId;
return ndk::ScopedAStatus::ok();
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index ab3e451..24e46db 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -16,7 +16,9 @@
#define LOG_TAG "AHAL_Stream"
#include <android-base/logging.h>
+#include <utils/SystemClock.h>
+#include "core-impl/Module.h"
#include "core-impl/Stream.h"
using aidl::android::hardware::audio::common::SinkMetadata;
@@ -25,13 +27,198 @@
namespace aidl::android::hardware::audio::core {
-StreamIn::StreamIn(const SinkMetadata& sinkMetadata) : mMetadata(sinkMetadata) {
- LOG(DEBUG) << __func__;
+// Fills 'desc' with the descriptors returned by 'dupeDesc' of the queues owned by
+// this context. Fields of 'desc' that correspond to a queue which does not exist
+// are left untouched.
+void StreamContext::fillDescriptor(StreamDescriptor* desc) {
+ if (mCommandMQ) {
+ desc->command = mCommandMQ->dupeDesc();
+ }
+ if (mReplyMQ) {
+ desc->reply = mReplyMQ->dupeDesc();
+ }
+ if (mDataMQ) {
+ // Quantum count times quantum size is the capacity of the data queue in bytes,
+ // dividing it by the frame size yields the capacity in frames.
+ desc->bufferSizeFrames =
+ mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize() / mFrameSize;
+ desc->audio.set<StreamDescriptor::AudioBuffer::Tag::fmq>(mDataMQ->dupeDesc());
+ }
}
-ndk::ScopedAStatus StreamIn::close() {
+bool StreamContext::isValid() const {
+    // A queue which was never created passes this check, a queue which exists
+    // must report itself as valid.
+    const auto isBroken = [](const auto& queue) { return queue && !queue->isValid(); };
+    if (isBroken(mCommandMQ)) {
+        LOG(ERROR) << "command FMQ is invalid";
+        return false;
+    }
+    if (isBroken(mReplyMQ)) {
+        LOG(ERROR) << "reply FMQ is invalid";
+        return false;
+    }
+    // The frame size is mandatory regardless of which queues are present.
+    if (mFrameSize == 0) {
+        LOG(ERROR) << "frame size is not set";
+        return false;
+    }
+    if (isBroken(mDataMQ)) {
+        LOG(ERROR) << "data FMQ is invalid";
+        return false;
+    }
+    return true;
+}
+
+void StreamContext::reset() {
+    // Destroy all the queues owned by this context. The frame size and the
+    // internal command cookie are not modified.
+    mCommandMQ = nullptr;
+    mReplyMQ = nullptr;
+    mDataMQ = nullptr;
+}
+
+// Validates the queues and allocates the intermediate data buffer.
+// Returns an empty string on success, or a description of the problem.
+std::string StreamWorkerCommonLogic::init() {
+    if (!mCommandMQ) return "Command MQ is null";
+    if (!mReplyMQ) return "Reply MQ is null";
+    if (!mDataMQ) return "Data MQ is null";
+    // The data queue must use the same element type as the local buffer.
+    const auto quantumSize = mDataMQ->getQuantumSize();
+    if (quantumSize != sizeof(decltype(mDataBuffer)::element_type)) {
+        return "Unexpected Data MQ quantum size: " + std::to_string(quantumSize);
+    }
+    const auto quantumCount = mDataMQ->getQuantumCount();
+    mDataBufferSize = quantumCount * quantumSize;
+    // The non-throwing 'new' is used so that an allocation failure can be reported.
+    mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]);
+    if (!mDataBuffer) {
+        return "Failed to allocate data buffer for element count " +
+               std::to_string(quantumCount) +
+               ", size in bytes: " + std::to_string(mDataBufferSize);
+    }
+    return "";
+}
+
+const std::string StreamInWorkerLogic::kThreadName = "reader";
+
+// Runs one iteration of the input stream worker: blocks until a command arrives
+// on the command MQ, handles it, and posts a reply to the reply MQ (no reply is
+// posted for the internal EXIT command).
+// Returns ABORT if reading the command or writing the reply fails, EXIT when
+// the internal EXIT command is received, CONTINUE otherwise.
+StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
+ StreamDescriptor::Command command{};
+ if (!mCommandMQ->readBlocking(&command, 1)) {
+ LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+ return Status::ABORT;
+ }
+ StreamDescriptor::Reply reply{};
+ // The EXIT command is only honored when it carries the internal cookie
+ // in the 'fmqByteCount' field.
+ if (command.code == StreamContext::COMMAND_EXIT &&
+ command.fmqByteCount == mInternalCommandCookie) {
+ LOG(DEBUG) << __func__ << ": received EXIT command";
+ // This is an internal command, no need to reply.
+ return Status::EXIT;
+ } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+ LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount
+ << " bytes";
+ usleep(3000); // Simulate a blocking call into the driver.
+ // Provide no more than was requested, than fits into the data MQ,
+ // and than the local buffer can hold.
+ const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
+ mDataMQ->availableToWrite(), mDataBufferSize});
+ // The atomic flag is read once, this snapshot is used for the rest of the cycle.
+ const bool isConnected = mIsConnected;
+ // Simulate reading of data, or provide zeroes if the stream is not connected.
+ for (size_t i = 0; i < byteCount; ++i) {
+ using buffer_type = decltype(mDataBuffer)::element_type;
+ constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
+ std::numeric_limits<buffer_type>::min() + 1;
+ mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
+ std::numeric_limits<buffer_type>::min()
+ : 0;
+ }
+ // A zero-sized burst skips the write into the data MQ and counts as a success.
+ bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true;
+ if (success) {
+ LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
+ << " succeeded; connected? " << isConnected;
+ // Frames are provided and counted regardless of connection status.
+ reply.fmqByteCount = byteCount;
+ mFrameCount += byteCount / mFrameSize;
+ // The observable position is filled in only when the stream is connected.
+ if (isConnected) {
+ reply.status = STATUS_OK;
+ reply.observable.frames = mFrameCount;
+ reply.observable.timeNs = ::android::elapsedRealtimeNano();
+ } else {
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else {
+ LOG(WARNING) << __func__ << ": writing of " << byteCount
+ << " bytes of data to MQ failed";
+ reply.status = STATUS_NOT_ENOUGH_DATA;
+ }
+ reply.latencyMs = Module::kLatencyMs;
+ } else {
+ // Either an unknown command code, a negative byte count for BURST,
+ // or an EXIT command with a cookie that does not match.
+ LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
+ << ") or count: " << command.fmqByteCount;
+ reply.status = STATUS_BAD_VALUE;
+ }
+ LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
+ if (!mReplyMQ->writeBlocking(&reply, 1)) {
+ LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+ return Status::ABORT;
+ }
+ return Status::CONTINUE;
+}
+
+const std::string StreamOutWorkerLogic::kThreadName = "writer";
+
+// Runs one iteration of the output stream worker: blocks until a command arrives
+// on the command MQ, handles it, and posts a reply to the reply MQ (no reply is
+// posted for the internal EXIT command).
+// Returns ABORT if reading the command or writing the reply fails, EXIT when
+// the internal EXIT command is received, CONTINUE otherwise.
+StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
+ StreamDescriptor::Command command{};
+ if (!mCommandMQ->readBlocking(&command, 1)) {
+ LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+ return Status::ABORT;
+ }
+ StreamDescriptor::Reply reply{};
+ // The EXIT command is only honored when it carries the internal cookie
+ // in the 'fmqByteCount' field.
+ if (command.code == StreamContext::COMMAND_EXIT &&
+ command.fmqByteCount == mInternalCommandCookie) {
+ LOG(DEBUG) << __func__ << ": received EXIT command";
+ // This is an internal command, no need to reply.
+ return Status::EXIT;
+ } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+ LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount
+ << " bytes";
+ // Consume no more than was announced, than is available in the data MQ,
+ // and than the local buffer can hold.
+ const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
+ mDataMQ->availableToRead(), mDataBufferSize});
+ // A zero-sized burst skips the read from the data MQ and counts as a success.
+ bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true;
+ if (success) {
+ // The atomic flag is read once, this snapshot is used for the rest of the cycle.
+ const bool isConnected = mIsConnected;
+ LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ"
+ << " succeeded; connected? " << isConnected;
+ // Frames are consumed and counted regardless of connection status.
+ reply.fmqByteCount = byteCount;
+ mFrameCount += byteCount / mFrameSize;
+ // The observable position is filled in only when the stream is connected.
+ if (isConnected) {
+ reply.status = STATUS_OK;
+ reply.observable.frames = mFrameCount;
+ reply.observable.timeNs = ::android::elapsedRealtimeNano();
+ } else {
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ usleep(3000); // Simulate a blocking call into the driver.
+ } else {
+ LOG(WARNING) << __func__ << ": reading of " << byteCount
+ << " bytes of data from MQ failed";
+ reply.status = STATUS_NOT_ENOUGH_DATA;
+ }
+ reply.latencyMs = Module::kLatencyMs;
+ } else {
+ // Either an unknown command code, a negative byte count for BURST,
+ // or an EXIT command with a cookie that does not match.
+ LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
+ << ") or count: " << command.fmqByteCount;
+ reply.status = STATUS_BAD_VALUE;
+ }
+ LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
+ if (!mReplyMQ->writeBlocking(&reply, 1)) {
+ LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+ return Status::ABORT;
+ }
+ return Status::CONTINUE;
+}
+
+// Asks the worker to stop if the stream is destroyed without having been closed.
+template <class Metadata, class StreamWorker>
+StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
+    if (mIsClosed) return;
+    LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
+    stopWorker();
+    // The worker and the context should clean up by themselves via destructors.
+}
+
+template <class Metadata, class StreamWorker>
+ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
LOG(DEBUG) << __func__;
if (!mIsClosed) {
+ stopWorker();
+ LOG(DEBUG) << __func__ << ": joining the worker thread...";
+ mWorker.stop();
+ LOG(DEBUG) << __func__ << ": worker thread joined";
+ mContext.reset();
mIsClosed = true;
return ndk::ScopedAStatus::ok();
} else {
@@ -40,40 +227,44 @@
}
}
-ndk::ScopedAStatus StreamIn::updateMetadata(const SinkMetadata& in_sinkMetadata) {
+// Posts the internal EXIT command to the command MQ, if the queue exists.
+template <class Metadata, class StreamWorker>
+void StreamCommon<Metadata, StreamWorker>::stopWorker() {
+    auto* const commandMQ = mContext.getCommandMQ();
+    if (commandMQ == nullptr) return;
+    LOG(DEBUG) << __func__ << ": asking the worker to stop...";
+    // The cookie travels in the 'fmqByteCount' field of the command.
+    StreamDescriptor::Command exitCommand;
+    exitCommand.code = StreamContext::COMMAND_EXIT;
+    exitCommand.fmqByteCount = mContext.getInternalCommandCookie();
+    // FIXME: This can block in the case when the client wrote a command
+    //   while the stream worker's cycle is not running. Need to revisit
+    //   when implementing standby and pause/resume.
+    const bool posted = commandMQ->writeBlocking(&exitCommand, 1);
+    if (!posted) {
+        LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
+    }
+    LOG(DEBUG) << __func__ << ": done";
+}
+
+template <class Metadata, class StreamWorker>
+ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
LOG(DEBUG) << __func__;
if (!mIsClosed) {
- mMetadata = in_sinkMetadata;
+ mMetadata = metadata;
return ndk::ScopedAStatus::ok();
}
LOG(ERROR) << __func__ << ": stream was closed";
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
}
-StreamOut::StreamOut(const SourceMetadata& sourceMetadata,
+// Passes the metadata and the stream context on to the StreamCommon base class.
+StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext context)
+ : StreamCommon<SinkMetadata, StreamInWorker>(sinkMetadata, std::move(context)) {
+ LOG(DEBUG) << __func__;
+}
+
+StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext context,
const std::optional<AudioOffloadInfo>& offloadInfo)
- : mMetadata(sourceMetadata), mOffloadInfo(offloadInfo) {
+ : StreamCommon<SourceMetadata, StreamOutWorker>(sourceMetadata, std::move(context)),
+ mOffloadInfo(offloadInfo) {
LOG(DEBUG) << __func__;
}
-ndk::ScopedAStatus StreamOut::close() {
- LOG(DEBUG) << __func__;
- if (!mIsClosed) {
- mIsClosed = true;
- return ndk::ScopedAStatus::ok();
- }
- LOG(ERROR) << __func__ << ": stream was already closed";
- return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
-}
-
-ndk::ScopedAStatus StreamOut::updateMetadata(const SourceMetadata& in_sourceMetadata) {
- LOG(DEBUG) << __func__;
- if (!mIsClosed) {
- mMetadata = in_sourceMetadata;
- return ndk::ScopedAStatus::ok();
- }
- LOG(ERROR) << __func__ << ": stream was closed";
- return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
-}
-
} // namespace aidl::android::hardware::audio::core
diff --git a/audio/aidl/default/include/core-impl/Module.h b/audio/aidl/default/include/core-impl/Module.h
index f7e14af..61516b2 100644
--- a/audio/aidl/default/include/core-impl/Module.h
+++ b/audio/aidl/default/include/core-impl/Module.h
@@ -28,6 +28,11 @@
namespace aidl::android::hardware::audio::core {
class Module : public BnModule {
+ public:
+ // This value is used for all AudioPatches and reported by all streams.
+ static constexpr int32_t kLatencyMs = 10;
+
+ private:
ndk::ScopedAStatus setModuleDebug(
const ::aidl::android::hardware::audio::core::ModuleDebug& in_debug) override;
ndk::ScopedAStatus connectExternalDevice(
@@ -66,21 +71,18 @@
ndk::ScopedAStatus resetAudioPatch(int32_t in_patchId) override;
ndk::ScopedAStatus resetAudioPortConfig(int32_t in_portConfigId) override;
- private:
void cleanUpPatch(int32_t patchId);
- void cleanUpPatches(int32_t portConfigId);
- ndk::ScopedAStatus createStreamDescriptor(
+ ndk::ScopedAStatus createStreamContext(
int32_t in_portConfigId, int64_t in_bufferSizeFrames,
- ::aidl::android::hardware::audio::core::StreamDescriptor* out_descriptor);
+ ::aidl::android::hardware::audio::core::StreamContext* out_context);
ndk::ScopedAStatus findPortIdForNewStream(
int32_t in_portConfigId, ::aidl::android::media::audio::common::AudioPort** port);
internal::Configuration& getConfig();
void registerPatch(const AudioPatch& patch);
+ void updateStreamsConnectedState(const AudioPatch& oldPatch, const AudioPatch& newPatch);
// This value is used for all AudioPatches.
static constexpr int32_t kMinimumStreamBufferSizeFrames = 16;
- // This value is used for all AudioPatches.
- static constexpr int32_t kLatencyMs = 10;
// The maximum stream buffer size is 1 GiB = 2 ** 30 bytes;
static constexpr int32_t kMaximumStreamBufferSizeBytes = 1 << 30;
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 87104dd..816cdb1 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -16,50 +16,203 @@
#pragma once
+#include <atomic>
+#include <cstdlib>
#include <map>
+#include <memory>
#include <optional>
#include <variant>
+#include <StreamWorker.h>
#include <aidl/android/hardware/audio/common/SinkMetadata.h>
#include <aidl/android/hardware/audio/common/SourceMetadata.h>
#include <aidl/android/hardware/audio/core/BnStreamIn.h>
#include <aidl/android/hardware/audio/core/BnStreamOut.h>
+#include <aidl/android/hardware/audio/core/StreamDescriptor.h>
#include <aidl/android/media/audio/common/AudioOffloadInfo.h>
+#include <fmq/AidlMessageQueue.h>
+#include <system/thread_defs.h>
#include "core-impl/utils.h"
namespace aidl::android::hardware::audio::core {
-class StreamIn : public BnStreamIn {
- ndk::ScopedAStatus close() override;
- ndk::ScopedAStatus updateMetadata(
- const ::aidl::android::hardware::audio::common::SinkMetadata& in_sinkMetadata) override;
-
+// This class is similar to StreamDescriptor, but unlike
+// the descriptor, it actually owns the objects implementing
+// data exchange: FMQs etc, whereas StreamDescriptor only
+// contains their descriptors.
+// A default-constructed context owns no queues, its frame size and internal
+// command cookie are zero. Instances can be moved, but not copied.
+class StreamContext {
public:
- explicit StreamIn(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata);
- bool isClosed() const { return mIsClosed; }
+    typedef ::android::AidlMessageQueue<
+            StreamDescriptor::Command,
+            ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            CommandMQ;
+    typedef ::android::AidlMessageQueue<
+            StreamDescriptor::Reply, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            ReplyMQ;
+    typedef ::android::AidlMessageQueue<
+            int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            DataMQ;
+
+    // Ensure that this value is not used by any of StreamDescriptor.COMMAND_*
+    static constexpr int COMMAND_EXIT = -1;
+
+    StreamContext() = default;
+    // Takes ownership of the queues. A random value is picked as the internal command cookie.
+    StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
+                  size_t frameSize, std::unique_ptr<DataMQ> dataMQ)
+        : mCommandMQ(std::move(commandMQ)),
+          mInternalCommandCookie(std::rand()),
+          mReplyMQ(std::move(replyMQ)),
+          mFrameSize(frameSize),
+          mDataMQ(std::move(dataMQ)) {}
+    // Moving transfers the ownership of the queues; the cookie and the frame size are copied.
+    StreamContext(StreamContext&& other) noexcept
+        : mCommandMQ(std::move(other.mCommandMQ)),
+          mInternalCommandCookie(other.mInternalCommandCookie),
+          mReplyMQ(std::move(other.mReplyMQ)),
+          mFrameSize(other.mFrameSize),
+          mDataMQ(std::move(other.mDataMQ)) {}
+    StreamContext& operator=(StreamContext&& other) noexcept {
+        mCommandMQ = std::move(other.mCommandMQ);
+        mInternalCommandCookie = other.mInternalCommandCookie;
+        mReplyMQ = std::move(other.mReplyMQ);
+        mFrameSize = other.mFrameSize;
+        mDataMQ = std::move(other.mDataMQ);
+        return *this;
+    }
+
+    void fillDescriptor(StreamDescriptor* desc);
+    // The getters return non-owning pointers, which are null if the queue does not exist.
+    CommandMQ* getCommandMQ() const { return mCommandMQ.get(); }
+    DataMQ* getDataMQ() const { return mDataMQ.get(); }
+    size_t getFrameSize() const { return mFrameSize; }
+    int getInternalCommandCookie() const { return mInternalCommandCookie; }
+    ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); }
+    bool isValid() const;
+    void reset();
private:
- ::aidl::android::hardware::audio::common::SinkMetadata mMetadata;
- bool mIsClosed = false;
+    std::unique_ptr<CommandMQ> mCommandMQ;
+    // The value used to confirm that the command was posted internally.
+    // Initialized so that reading it from a default-constructed context is well defined.
+    int mInternalCommandCookie = 0;
+    std::unique_ptr<ReplyMQ> mReplyMQ;
+    // Zero means "not set", which is what a default-constructed context reports.
+    size_t mFrameSize = 0;
+    std::unique_ptr<DataMQ> mDataMQ;
};
-class StreamOut : public BnStreamOut {
- ndk::ScopedAStatus close() override;
+// State and initialization logic shared by the input and output stream worker logic classes.
+class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
+  public:
+    void setIsConnected(bool connected) { mIsConnected = connected; }
+
+  protected:
+    // Only copies values and raw pointers out of 'context'; the queues remain owned
+    // by the context.
+    explicit StreamWorkerCommonLogic(const StreamContext& context)
+        : mInternalCommandCookie(context.getInternalCommandCookie()),
+          mFrameSize(context.getFrameSize()),
+          mCommandMQ(context.getCommandMQ()),
+          mReplyMQ(context.getReplyMQ()),
+          mDataMQ(context.getDataMQ()) {}
+    // Returns an empty string on success, or a description of the problem.
+    std::string init() override;
+
+    // Used both by the main and worker threads.
+    std::atomic<bool> mIsConnected = false;
+    // All fields are used on the worker thread only.
+    const int mInternalCommandCookie;
+    const size_t mFrameSize;
+    StreamContext::CommandMQ* mCommandMQ;
+    StreamContext::ReplyMQ* mReplyMQ;
+    StreamContext::DataMQ* mDataMQ;
+    // We use an array and the "size" field instead of a vector to be able to detect
+    // memory allocation issues.
+    std::unique_ptr<int8_t[]> mDataBuffer;
+    // Size of 'mDataBuffer' in bytes, stays zero until 'init' computes it.
+    size_t mDataBufferSize = 0;
+    // A fixed-width type is used because 'long' is only 32 bits wide on ILP32 builds.
+    int64_t mFrameCount = 0;
+};
+
+// Worker logic of input streams. 'kThreadName' is the name which StreamCommon::init
+// passes to the worker when starting it.
+class StreamInWorkerLogic : public StreamWorkerCommonLogic {
+ public:
+ static const std::string kThreadName;
+ explicit StreamInWorkerLogic(const StreamContext& context) : StreamWorkerCommonLogic(context) {}
+
+ protected:
+ // Handles one command read from the command MQ.
+ Status cycle() override;
+};
+using StreamInWorker = ::android::hardware::audio::common::StreamWorker<StreamInWorkerLogic>;
+
+// Worker logic of output streams. 'kThreadName' is the name which StreamCommon::init
+// passes to the worker when starting it.
+class StreamOutWorkerLogic : public StreamWorkerCommonLogic {
+ public:
+ static const std::string kThreadName;
+ explicit StreamOutWorkerLogic(const StreamContext& context)
+ : StreamWorkerCommonLogic(context) {}
+
+ protected:
+ // Handles one command read from the command MQ.
+ Status cycle() override;
+};
+using StreamOutWorker = ::android::hardware::audio::common::StreamWorker<StreamOutWorkerLogic>;
+
+// Implementation shared by StreamIn and StreamOut. Holds the metadata, the stream
+// context, and the worker. 'StreamWorker' is expected to provide 'kThreadName',
+// 'start', 'stop', and 'setIsConnected', and to be constructible from a StreamContext.
+template <class Metadata, class StreamWorker>
+class StreamCommon {
+ public:
+ ndk::ScopedAStatus close();
+ // Starts the worker. Returns EX_ILLEGAL_STATE if 'start' reports a failure.
+ ndk::ScopedAStatus init() {
+ return mWorker.start(StreamWorker::kThreadName, ANDROID_PRIORITY_AUDIO)
+ ? ndk::ScopedAStatus::ok()
+ : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
+ }
+ bool isClosed() const { return mIsClosed; }
+ void setIsConnected(bool connected) { mWorker.setIsConnected(connected); }
+ ndk::ScopedAStatus updateMetadata(const Metadata& metadata);
+
+ protected:
+ StreamCommon(const Metadata& metadata, StreamContext context)
+ : mMetadata(metadata), mContext(std::move(context)), mWorker(mContext) {}
+ ~StreamCommon();
+ void stopWorker();
+
+ Metadata mMetadata;
+ // Must be declared before 'mWorker': members are initialized in declaration
+ // order, and the worker is constructed from the context.
+ StreamContext mContext;
+ StreamWorker mWorker;
+ // This variable is checked in the destructor which can be called on an arbitrary Binder thread,
+ // thus we need to ensure that any changes made by other threads are sequentially consistent.
+ std::atomic<bool> mIsClosed = false;
+};
+
+// Input stream: combines the shared StreamCommon implementation with the
+// BnStreamIn base class.
+class StreamIn
+ : public StreamCommon<::aidl::android::hardware::audio::common::SinkMetadata, StreamInWorker>,
+ public BnStreamIn {
+ // The overrides below forward to the StreamCommon implementation; the calls
+ // are fully qualified to select the StreamCommon member of the same name.
+ ndk::ScopedAStatus close() override {
+ return StreamCommon<::aidl::android::hardware::audio::common::SinkMetadata,
+ StreamInWorker>::close();
+ }
+ ndk::ScopedAStatus updateMetadata(const ::aidl::android::hardware::audio::common::SinkMetadata&
+ in_sinkMetadata) override {
+ return StreamCommon<::aidl::android::hardware::audio::common::SinkMetadata,
+ StreamInWorker>::updateMetadata(in_sinkMetadata);
+ }
+
+ public:
+ // 'context' is taken by value and moved into the StreamCommon base.
+ StreamIn(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata,
+ StreamContext context);
+};
+
+class StreamOut : public StreamCommon<::aidl::android::hardware::audio::common::SourceMetadata,
+ StreamOutWorker>,
+ public BnStreamOut {
+ ndk::ScopedAStatus close() override {
+ return StreamCommon<::aidl::android::hardware::audio::common::SourceMetadata,
+ StreamOutWorker>::close();
+ }
ndk::ScopedAStatus updateMetadata(
const ::aidl::android::hardware::audio::common::SourceMetadata& in_sourceMetadata)
- override;
+ override {
+ return StreamCommon<::aidl::android::hardware::audio::common::SourceMetadata,
+ StreamOutWorker>::updateMetadata(in_sourceMetadata);
+ }
public:
StreamOut(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
+ StreamContext context,
const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
offloadInfo);
- bool isClosed() const { return mIsClosed; }
private:
- ::aidl::android::hardware::audio::common::SourceMetadata mMetadata;
std::optional<::aidl::android::media::audio::common::AudioOffloadInfo> mOffloadInfo;
- bool mIsClosed = false;
};
class StreamWrapper {
@@ -74,6 +227,15 @@
},
mStream);
}
+    // Updates the connection state of the wrapped stream. Only a weak reference
+    // is held, so nothing happens when the stream no longer exists.
+    void setStreamIsConnected(bool connected) {
+        std::visit(
+                [connected](const auto& weakStream) {
+                    if (const auto stream = weakStream.lock(); stream != nullptr) {
+                        stream->setIsConnected(connected);
+                    }
+                },
+                mStream);
+    }
private:
std::variant<std::weak_ptr<StreamIn>, std::weak_ptr<StreamOut>> mStream;
@@ -93,6 +255,11 @@
mStreams.insert(std::pair{portConfigId, sw});
mStreams.insert(std::pair{portId, sw});
}
+    // Updates the connection state of the stream registered under 'portConfigId'.
+    // Does nothing when no entry is found for this id.
+    void setStreamIsConnected(int32_t portConfigId, bool connected) {
+        const auto entry = mStreams.find(portConfigId);
+        if (entry == mStreams.end()) return;
+        entry->second.setStreamIsConnected(connected);
+    }
private:
// Maps port ids and port config ids to streams. Multimap because a port
diff --git a/audio/aidl/default/main.cpp b/audio/aidl/default/main.cpp
index aeb9983..15874a0 100644
--- a/audio/aidl/default/main.cpp
+++ b/audio/aidl/default/main.cpp
@@ -14,6 +14,9 @@
* limitations under the License.
*/
+#include <cstdlib>
+#include <ctime>
+
#include "core-impl/Config.h"
#include "core-impl/Module.h"
@@ -25,6 +28,9 @@
using aidl::android::hardware::audio::core::Module;
int main() {
+ // Random values are used in the implementation.
+ std::srand(std::time(nullptr));
+
// This is a debug implementation, always enable debug logging.
android::base::SetMinimumLogSeverity(::android::base::DEBUG);
ABinderProcess_setThreadPoolMaxThreadCount(16);