audio: Generalize stream implementations
This allows for more code reuse and composability when
implementing streams for a particular audio "backend."
The existing "stub" code has been moved to StreamStub* files.
Bug: 264712385
Test: atest VtsHalAudioCoreTargetTest
Change-Id: I97fd41f87eb6d01e1d57f0d70a86d3b2b3555837
diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp
index 1e6785f..248ba84 100644
--- a/audio/aidl/default/Android.bp
+++ b/audio/aidl/default/Android.bp
@@ -69,6 +69,7 @@
"Module.cpp",
"SoundDose.cpp",
"Stream.cpp",
+ "StreamStub.cpp",
"Telephony.cpp",
],
generated_sources: [
diff --git a/audio/aidl/default/Module.cpp b/audio/aidl/default/Module.cpp
index 55e5bff..acad70f 100644
--- a/audio/aidl/default/Module.cpp
+++ b/audio/aidl/default/Module.cpp
@@ -28,6 +28,7 @@
#include "core-impl/Bluetooth.h"
#include "core-impl/Module.h"
#include "core-impl/SoundDose.h"
+#include "core-impl/StreamStub.h"
#include "core-impl/Telephony.h"
#include "core-impl/utils.h"
@@ -552,8 +553,9 @@
}
context.fillDescriptor(&_aidl_return->desc);
std::shared_ptr<StreamIn> stream;
- if (auto status = StreamIn::createInstance(in_args.sinkMetadata, std::move(context),
- mConfig->microphones, &stream);
+ // TODO: Add a mapping from module instance names to a corresponding 'createInstance'.
+ if (auto status = StreamInStub::createInstance(in_args.sinkMetadata, std::move(context),
+ mConfig->microphones, &stream);
!status.isOk()) {
return status;
}
@@ -606,8 +608,9 @@
}
context.fillDescriptor(&_aidl_return->desc);
std::shared_ptr<StreamOut> stream;
- if (auto status = StreamOut::createInstance(in_args.sourceMetadata, std::move(context),
- in_args.offloadInfo, &stream);
+ // TODO: Add a mapping from module instance names to a corresponding 'createInstance'.
+ if (auto status = StreamOutStub::createInstance(in_args.sourceMetadata, std::move(context),
+ in_args.offloadInfo, &stream);
!status.isOk()) {
return status;
}
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 0520cba..25814e4 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -85,16 +85,19 @@
if (mCommandMQ == nullptr) return "Command MQ is null";
if (mReplyMQ == nullptr) return "Reply MQ is null";
if (mDataMQ == nullptr) return "Data MQ is null";
- if (sizeof(decltype(mDataBuffer)::element_type) != mDataMQ->getQuantumSize()) {
+ if (sizeof(DataBufferElement) != mDataMQ->getQuantumSize()) {
return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize());
}
mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize();
- mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]);
+ mDataBuffer.reset(new (std::nothrow) DataBufferElement[mDataBufferSize]);
if (mDataBuffer == nullptr) {
return "Failed to allocate data buffer for element count " +
std::to_string(mDataMQ->getQuantumCount()) +
", size in bytes: " + std::to_string(mDataBufferSize);
}
+ if (::android::status_t status = mDriver->init(); status != STATUS_OK) {
+ return "Failed to initialize the driver: " + std::to_string(status);
+ }
return "";
}
@@ -191,46 +194,59 @@
}
break;
case Tag::drain:
- if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
+ if (const auto mode = command.get<Tag::drain>();
+ mode == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
if (mState == StreamDescriptor::State::ACTIVE) {
- usleep(1000); // Simulate a blocking call into the driver.
- populateReply(&reply, mIsConnected);
- // Can switch the state to ERROR if a driver error occurs.
- mState = StreamDescriptor::State::DRAINING;
+ if (::android::status_t status = mDriver->drain(mode);
+ status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ mState = StreamDescriptor::State::DRAINING;
+ } else {
+ LOG(ERROR) << __func__ << ": drain failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
+ }
} else {
populateReplyWrongState(&reply, command);
}
} else {
- LOG(WARNING) << __func__
- << ": invalid drain mode: " << toString(command.get<Tag::drain>());
+ LOG(WARNING) << __func__ << ": invalid drain mode: " << toString(mode);
}
break;
case Tag::standby:
if (mState == StreamDescriptor::State::IDLE) {
- usleep(1000); // Simulate a blocking call into the driver.
- populateReply(&reply, mIsConnected);
- // Can switch the state to ERROR if a driver error occurs.
- mState = StreamDescriptor::State::STANDBY;
+ if (::android::status_t status = mDriver->standby(); status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ mState = StreamDescriptor::State::STANDBY;
+ } else {
+ LOG(ERROR) << __func__ << ": standby failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
+ }
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::pause:
if (mState == StreamDescriptor::State::ACTIVE) {
- usleep(1000); // Simulate a blocking call into the driver.
- populateReply(&reply, mIsConnected);
- // Can switch the state to ERROR if a driver error occurs.
- mState = StreamDescriptor::State::PAUSED;
+ if (::android::status_t status = mDriver->pause(); status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ mState = StreamDescriptor::State::PAUSED;
+ } else {
+ LOG(ERROR) << __func__ << ": pause failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
+ }
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::flush:
if (mState == StreamDescriptor::State::PAUSED) {
- usleep(1000); // Simulate a blocking call into the driver.
- populateReply(&reply, mIsConnected);
- // Can switch the state to ERROR if a driver error occurs.
- mState = StreamDescriptor::State::STANDBY;
+ if (::android::status_t status = mDriver->flush(); status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ mState = StreamDescriptor::State::STANDBY;
+ } else {
+ LOG(ERROR) << __func__ << ": flush failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
+ }
} else {
populateReplyWrongState(&reply, command);
}
@@ -247,33 +263,39 @@
}
bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
- // Can switch the state to ERROR if a driver error occurs.
const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
const bool isConnected = mIsConnected;
+ size_t actualFrameCount = 0;
bool fatal = false;
- // Simulate reading of data, or provide zeroes if the stream is not connected.
- for (size_t i = 0; i < byteCount; ++i) {
- using buffer_type = decltype(mDataBuffer)::element_type;
- constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
- std::numeric_limits<buffer_type>::min() + 1;
- mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
- std::numeric_limits<buffer_type>::min()
- : 0;
+ int32_t latency = Module::kLatencyMs;
+ if (isConnected) {
+ if (::android::status_t status = mDriver->transfer(
+ mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency);
+ status != ::android::OK) {
+ fatal = true;
+ LOG(ERROR) << __func__ << ": read failed: " << status;
+ }
+ } else {
+ usleep(3000); // Simulate blocking transfer delay.
+ for (size_t i = 0; i < byteCount; ++i) mDataBuffer[i] = 0;
+ actualFrameCount = byteCount / mFrameSize;
}
- usleep(3000); // Simulate a blocking call into the driver.
- // Set 'fatal = true' if a driver error occurs.
- if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
- LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
+ const size_t actualByteCount = actualFrameCount * mFrameSize;
+ if (bool success =
+ actualByteCount > 0 ? mDataMQ->write(&mDataBuffer[0], actualByteCount) : true;
+ success) {
+ LOG(DEBUG) << __func__ << ": writing of " << actualByteCount << " bytes into data MQ"
<< " succeeded; connected? " << isConnected;
// Frames are provided and counted regardless of connection status.
- reply->fmqByteCount += byteCount;
- mFrameCount += byteCount / mFrameSize;
+ reply->fmqByteCount += actualByteCount;
+ mFrameCount += actualFrameCount;
populateReply(reply, isConnected);
} else {
- LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
+ LOG(WARNING) << __func__ << ": writing of " << actualByteCount
+ << " bytes of data to MQ failed";
reply->status = STATUS_NOT_ENOUGH_DATA;
}
- reply->latencyMs = Module::kLatencyMs;
+ reply->latencyMs = latency;
return !fatal;
}
@@ -395,17 +417,22 @@
}
break;
case Tag::drain:
- if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_ALL ||
- command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) {
+ if (const auto mode = command.get<Tag::drain>();
+ mode == StreamDescriptor::DrainMode::DRAIN_ALL ||
+ mode == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) {
if (mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::TRANSFERRING) {
- usleep(1000); // Simulate a blocking call into the driver.
- populateReply(&reply, mIsConnected);
- // Can switch the state to ERROR if a driver error occurs.
- if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) {
- mState = StreamDescriptor::State::IDLE;
+ if (::android::status_t status = mDriver->drain(mode);
+ status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) {
+ mState = StreamDescriptor::State::IDLE;
+ } else {
+ switchToTransientState(StreamDescriptor::State::DRAINING);
+ }
} else {
- switchToTransientState(StreamDescriptor::State::DRAINING);
+ LOG(ERROR) << __func__ << ": drain failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
}
} else if (mState == StreamDescriptor::State::TRANSFER_PAUSED) {
mState = StreamDescriptor::State::DRAIN_PAUSED;
@@ -414,46 +441,58 @@
populateReplyWrongState(&reply, command);
}
} else {
- LOG(WARNING) << __func__
- << ": invalid drain mode: " << toString(command.get<Tag::drain>());
+ LOG(WARNING) << __func__ << ": invalid drain mode: " << toString(mode);
}
break;
case Tag::standby:
if (mState == StreamDescriptor::State::IDLE) {
- usleep(1000); // Simulate a blocking call into the driver.
- populateReply(&reply, mIsConnected);
- // Can switch the state to ERROR if a driver error occurs.
- mState = StreamDescriptor::State::STANDBY;
+ if (::android::status_t status = mDriver->standby(); status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ mState = StreamDescriptor::State::STANDBY;
+ } else {
+ LOG(ERROR) << __func__ << ": standby failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
+ }
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::pause: {
- bool commandAccepted = true;
+ std::optional<StreamDescriptor::State> nextState;
switch (mState) {
case StreamDescriptor::State::ACTIVE:
- mState = StreamDescriptor::State::PAUSED;
+ nextState = StreamDescriptor::State::PAUSED;
break;
case StreamDescriptor::State::DRAINING:
- mState = StreamDescriptor::State::DRAIN_PAUSED;
+ nextState = StreamDescriptor::State::DRAIN_PAUSED;
break;
case StreamDescriptor::State::TRANSFERRING:
- mState = StreamDescriptor::State::TRANSFER_PAUSED;
+ nextState = StreamDescriptor::State::TRANSFER_PAUSED;
break;
default:
populateReplyWrongState(&reply, command);
- commandAccepted = false;
}
- if (commandAccepted) {
- populateReply(&reply, mIsConnected);
+ if (nextState.has_value()) {
+ if (::android::status_t status = mDriver->pause(); status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ mState = nextState.value();
+ } else {
+ LOG(ERROR) << __func__ << ": pause failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
+ }
}
} break;
case Tag::flush:
if (mState == StreamDescriptor::State::PAUSED ||
mState == StreamDescriptor::State::DRAIN_PAUSED ||
mState == StreamDescriptor::State::TRANSFER_PAUSED) {
- populateReply(&reply, mIsConnected);
- mState = StreamDescriptor::State::IDLE;
+ if (::android::status_t status = mDriver->flush(); status == ::android::OK) {
+ populateReply(&reply, mIsConnected);
+ mState = StreamDescriptor::State::IDLE;
+ } else {
+ LOG(ERROR) << __func__ << ": flush failed: " << status;
+ mState = StreamDescriptor::State::ERROR;
+ }
} else {
populateReplyWrongState(&reply, command);
}
@@ -472,6 +511,7 @@
bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
const size_t readByteCount = mDataMQ->availableToRead();
bool fatal = false;
+ int32_t latency = Module::kLatencyMs;
if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
const bool isConnected = mIsConnected;
LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
@@ -483,23 +523,36 @@
// simulate partial write.
byteCount -= mFrameSize;
}
+ size_t actualFrameCount = 0;
+ if (isConnected) {
+ if (::android::status_t status = mDriver->transfer(
+ mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency);
+ status != ::android::OK) {
+ fatal = true;
+ LOG(ERROR) << __func__ << ": write failed: " << status;
+ }
+ } else {
+ if (mAsyncCallback == nullptr) {
+ usleep(3000); // Simulate blocking transfer delay.
+ }
+ actualFrameCount = byteCount / mFrameSize;
+ }
+ const size_t actualByteCount = actualFrameCount * mFrameSize;
// Frames are consumed and counted regardless of the connection status.
- reply->fmqByteCount += byteCount;
- mFrameCount += byteCount / mFrameSize;
+ reply->fmqByteCount += actualByteCount;
+ mFrameCount += actualFrameCount;
populateReply(reply, isConnected);
- usleep(3000); // Simulate a blocking call into the driver.
- // Set 'fatal = true' if a driver error occurs.
} else {
LOG(WARNING) << __func__ << ": reading of " << readByteCount
<< " bytes of data from MQ failed";
reply->status = STATUS_NOT_ENOUGH_DATA;
}
- reply->latencyMs = Module::kLatencyMs;
+ reply->latencyMs = latency;
return !fatal;
}
-template <class Metadata, class StreamWorker>
-StreamCommonImpl<Metadata, StreamWorker>::~StreamCommonImpl() {
+template <class Metadata>
+StreamCommonImpl<Metadata>::~StreamCommonImpl() {
if (!isClosed()) {
LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
stopWorker();
@@ -507,8 +560,8 @@
}
}
-template <class Metadata, class StreamWorker>
-void StreamCommonImpl<Metadata, StreamWorker>::createStreamCommon(
+template <class Metadata>
+void StreamCommonImpl<Metadata>::createStreamCommon(
const std::shared_ptr<StreamCommonInterface>& delegate) {
if (mCommon != nullptr) {
LOG(FATAL) << __func__ << ": attempting to create the common interface twice";
@@ -518,8 +571,8 @@
AIBinder_setMinSchedulerPolicy(mCommonBinder.get(), SCHED_NORMAL, ANDROID_PRIORITY_AUDIO);
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::getStreamCommon(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::getStreamCommon(
std::shared_ptr<IStreamCommon>* _aidl_return) {
if (mCommon == nullptr) {
LOG(FATAL) << __func__ << ": the common interface was not created";
@@ -529,31 +582,30 @@
return ndk::ScopedAStatus::ok();
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::updateHwAvSyncId(
- int32_t in_hwAvSyncId) {
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::updateHwAvSyncId(int32_t in_hwAvSyncId) {
LOG(DEBUG) << __func__ << ": id " << in_hwAvSyncId;
return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::getVendorParameters(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::getVendorParameters(
const std::vector<std::string>& in_ids, std::vector<VendorParameter>* _aidl_return) {
LOG(DEBUG) << __func__ << ": id count: " << in_ids.size();
(void)_aidl_return;
return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::setVendorParameters(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::setVendorParameters(
const std::vector<VendorParameter>& in_parameters, bool in_async) {
LOG(DEBUG) << __func__ << ": parameters count " << in_parameters.size()
<< ", async: " << in_async;
return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::addEffect(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::addEffect(
const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) {
if (in_effect == nullptr) {
LOG(DEBUG) << __func__ << ": null effect";
@@ -563,8 +615,8 @@
return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::removeEffect(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::removeEffect(
const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) {
if (in_effect == nullptr) {
LOG(DEBUG) << __func__ << ": null effect";
@@ -574,16 +626,16 @@
return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::close() {
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::close() {
LOG(DEBUG) << __func__;
if (!isClosed()) {
stopWorker();
LOG(DEBUG) << __func__ << ": joining the worker thread...";
- mWorker.stop();
+ mWorker->stop();
LOG(DEBUG) << __func__ << ": worker thread joined";
mContext.reset();
- mWorker.setClosed();
+ mWorker->setClosed();
return ndk::ScopedAStatus::ok();
} else {
LOG(ERROR) << __func__ << ": stream was already closed";
@@ -591,8 +643,8 @@
}
}
-template <class Metadata, class StreamWorker>
-void StreamCommonImpl<Metadata, StreamWorker>::stopWorker() {
+template <class Metadata>
+void StreamCommonImpl<Metadata>::stopWorker() {
if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
LOG(DEBUG) << __func__ << ": asking the worker to exit...";
auto cmd = StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::halReservedExit>(
@@ -608,9 +660,8 @@
}
}
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::updateMetadata(
- const Metadata& metadata) {
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::updateMetadata(const Metadata& metadata) {
LOG(DEBUG) << __func__;
if (!isClosed()) {
mMetadata = metadata;
@@ -621,16 +672,11 @@
}
// static
-ndk::ScopedAStatus StreamIn::createInstance(const common::SinkMetadata& sinkMetadata,
- StreamContext context,
- const std::vector<MicrophoneInfo>& microphones,
- std::shared_ptr<StreamIn>* result) {
- auto stream = ndk::SharedRefBase::make<StreamIn>(sinkMetadata, std::move(context), microphones);
+ndk::ScopedAStatus StreamIn::initInstance(const std::shared_ptr<StreamIn>& stream) {
if (auto status = stream->init(); !status.isOk()) {
return status;
}
stream->createStreamCommon(stream);
- *result = std::move(stream);
return ndk::ScopedAStatus::ok();
}
@@ -645,8 +691,10 @@
} // namespace
StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext&& context,
+ const DriverInterface::CreateInstance& createDriver,
+ const StreamWorkerInterface::CreateInstance& createWorker,
const std::vector<MicrophoneInfo>& microphones)
- : StreamCommonImpl<SinkMetadata, StreamInWorker>(sinkMetadata, std::move(context)),
+ : StreamCommonImpl<SinkMetadata>(sinkMetadata, std::move(context), createDriver, createWorker),
mMicrophones(transformMicrophones(microphones)) {
LOG(DEBUG) << __func__;
}
@@ -704,23 +752,20 @@
}
// static
-ndk::ScopedAStatus StreamOut::createInstance(const SourceMetadata& sourceMetadata,
- StreamContext context,
- const std::optional<AudioOffloadInfo>& offloadInfo,
- std::shared_ptr<StreamOut>* result) {
- auto stream =
- ndk::SharedRefBase::make<StreamOut>(sourceMetadata, std::move(context), offloadInfo);
+ndk::ScopedAStatus StreamOut::initInstance(const std::shared_ptr<StreamOut>& stream) {
if (auto status = stream->init(); !status.isOk()) {
return status;
}
stream->createStreamCommon(stream);
- *result = std::move(stream);
return ndk::ScopedAStatus::ok();
}
StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext&& context,
+ const DriverInterface::CreateInstance& createDriver,
+ const StreamWorkerInterface::CreateInstance& createWorker,
const std::optional<AudioOffloadInfo>& offloadInfo)
- : StreamCommonImpl<SourceMetadata, StreamOutWorker>(sourceMetadata, std::move(context)),
+ : StreamCommonImpl<SourceMetadata>(sourceMetadata, std::move(context), createDriver,
+ createWorker),
mOffloadInfo(offloadInfo) {
LOG(DEBUG) << __func__;
}
diff --git a/audio/aidl/default/StreamStub.cpp b/audio/aidl/default/StreamStub.cpp
new file mode 100644
index 0000000..5442179
--- /dev/null
+++ b/audio/aidl/default/StreamStub.cpp
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "AHAL_Stream"
+#include <android-base/logging.h>
+
+#include "core-impl/Module.h"
+#include "core-impl/StreamStub.h"
+
+using aidl::android::hardware::audio::common::SinkMetadata;
+using aidl::android::hardware::audio::common::SourceMetadata;
+using aidl::android::media::audio::common::AudioOffloadInfo;
+
+namespace aidl::android::hardware::audio::core {
+
+DriverStub::DriverStub(const StreamContext& context, bool isInput)
+ : mFrameSizeBytes(context.getFrameSize()), mIsInput(isInput) {}
+
+::android::status_t DriverStub::init() {
+ usleep(1000);
+ return ::android::OK;
+}
+
+::android::status_t DriverStub::drain(StreamDescriptor::DrainMode) {
+ usleep(1000);
+ return ::android::OK;
+}
+
+::android::status_t DriverStub::flush() {
+ usleep(1000);
+ return ::android::OK;
+}
+
+::android::status_t DriverStub::pause() {
+ usleep(1000);
+ return ::android::OK;
+}
+
+::android::status_t DriverStub::transfer(void* buffer, size_t frameCount, size_t* actualFrameCount,
+ int32_t* latencyMs) {
+ usleep(3000);
+ if (mIsInput) {
+ uint8_t* byteBuffer = static_cast<uint8_t*>(buffer);
+ for (size_t i = 0; i < frameCount * mFrameSizeBytes; ++i) {
+ byteBuffer[i] = std::rand() % 255;
+ }
+ }
+ *actualFrameCount = frameCount;
+ *latencyMs = Module::kLatencyMs;
+ return ::android::OK;
+}
+
+::android::status_t DriverStub::standby() {
+ usleep(1000);
+ return ::android::OK;
+}
+
+// static
+ndk::ScopedAStatus StreamInStub::createInstance(const SinkMetadata& sinkMetadata,
+ StreamContext&& context,
+ const std::vector<MicrophoneInfo>& microphones,
+ std::shared_ptr<StreamIn>* result) {
+ std::shared_ptr<StreamIn> stream =
+ ndk::SharedRefBase::make<StreamInStub>(sinkMetadata, std::move(context), microphones);
+ if (auto status = initInstance(stream); !status.isOk()) {
+ return status;
+ }
+ *result = std::move(stream);
+ return ndk::ScopedAStatus::ok();
+}
+
+StreamInStub::StreamInStub(const SinkMetadata& sinkMetadata, StreamContext&& context,
+ const std::vector<MicrophoneInfo>& microphones)
+ : StreamIn(
+ sinkMetadata, std::move(context),
+ [](const StreamContext& ctx) -> DriverInterface* {
+ return new DriverStub(ctx, true /*isInput*/);
+ },
+ [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* {
+ // The default worker implementation is used.
+ return new StreamInWorker(ctx, driver);
+ },
+ microphones) {}
+
+// static
+ndk::ScopedAStatus StreamOutStub::createInstance(const SourceMetadata& sourceMetadata,
+ StreamContext&& context,
+ const std::optional<AudioOffloadInfo>& offloadInfo,
+ std::shared_ptr<StreamOut>* result) {
+ std::shared_ptr<StreamOut> stream = ndk::SharedRefBase::make<StreamOutStub>(
+ sourceMetadata, std::move(context), offloadInfo);
+ if (auto status = initInstance(stream); !status.isOk()) {
+ return status;
+ }
+ *result = std::move(stream);
+ return ndk::ScopedAStatus::ok();
+}
+
+StreamOutStub::StreamOutStub(const SourceMetadata& sourceMetadata, StreamContext&& context,
+ const std::optional<AudioOffloadInfo>& offloadInfo)
+ : StreamOut(
+ sourceMetadata, std::move(context),
+ [](const StreamContext& ctx) -> DriverInterface* {
+ return new DriverStub(ctx, false /*isInput*/);
+ },
+ [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* {
+ // The default worker implementation is used.
+ return new StreamOutWorker(ctx, driver);
+ },
+ offloadInfo) {}
+
+} // namespace aidl::android::hardware::audio::core
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 8c60efa..7cd4259 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -38,6 +38,7 @@
#include <aidl/android/media/audio/common/AudioOffloadInfo.h>
#include <fmq/AidlMessageQueue.h>
#include <system/thread_defs.h>
+#include <utils/Errors.h>
#include "core-impl/utils.h"
@@ -145,6 +146,20 @@
DebugParameters mDebugParameters;
};
+struct DriverInterface {
+ using CreateInstance = std::function<DriverInterface*(const StreamContext&)>;
+ virtual ~DriverInterface() = default;
+ // This function is called once, on the main thread, before starting the worker thread.
+ virtual ::android::status_t init() = 0;
+ // All the functions below are called on the worker thread.
+ virtual ::android::status_t drain(StreamDescriptor::DrainMode mode) = 0;
+ virtual ::android::status_t flush() = 0;
+ virtual ::android::status_t pause() = 0;
+ virtual ::android::status_t transfer(void* buffer, size_t frameCount, size_t* actualFrameCount,
+ int32_t* latencyMs) = 0;
+ virtual ::android::status_t standby() = 0;
+};
+
class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
public:
bool isClosed() const {
@@ -154,8 +169,11 @@
void setIsConnected(bool connected) { mIsConnected = connected; }
protected:
- explicit StreamWorkerCommonLogic(const StreamContext& context)
- : mInternalCommandCookie(context.getInternalCommandCookie()),
+ using DataBufferElement = int8_t;
+
+ StreamWorkerCommonLogic(const StreamContext& context, DriverInterface* driver)
+ : mDriver(driver),
+ mInternalCommandCookie(context.getInternalCommandCookie()),
mFrameSize(context.getFrameSize()),
mCommandMQ(context.getCommandMQ()),
mReplyMQ(context.getReplyMQ()),
@@ -173,6 +191,7 @@
mTransientStateStart = std::chrono::steady_clock::now();
}
+ DriverInterface* const mDriver;
// Atomic fields are used both by the main and worker threads.
std::atomic<bool> mIsConnected = false;
static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free);
@@ -180,9 +199,9 @@
// All fields are used on the worker thread only.
const int mInternalCommandCookie;
const size_t mFrameSize;
- StreamContext::CommandMQ* mCommandMQ;
- StreamContext::ReplyMQ* mReplyMQ;
- StreamContext::DataMQ* mDataMQ;
+ StreamContext::CommandMQ* const mCommandMQ;
+ StreamContext::ReplyMQ* const mReplyMQ;
+ StreamContext::DataMQ* const mDataMQ;
std::shared_ptr<IStreamCallback> mAsyncCallback;
const std::chrono::duration<int, std::milli> mTransientStateDelayMs;
std::chrono::time_point<std::chrono::steady_clock> mTransientStateStart;
@@ -190,15 +209,46 @@
const bool mForceSynchronousDrain;
// We use an array and the "size" field instead of a vector to be able to detect
// memory allocation issues.
- std::unique_ptr<int8_t[]> mDataBuffer;
+ std::unique_ptr<DataBufferElement[]> mDataBuffer;
size_t mDataBufferSize;
long mFrameCount = 0;
};
+// This interface is used to decouple stream implementations from a concrete StreamWorker
+// implementation.
+struct StreamWorkerInterface {
+ using CreateInstance = std::function<StreamWorkerInterface*(const StreamContext& context,
+ DriverInterface* driver)>;
+ virtual ~StreamWorkerInterface() = default;
+ virtual bool isClosed() const = 0;
+ virtual void setIsConnected(bool isConnected) = 0;
+ virtual void setClosed() = 0;
+ virtual bool start() = 0;
+ virtual void stop() = 0;
+};
+
+template <class WorkerLogic>
+class StreamWorkerImpl : public StreamWorkerInterface,
+ public ::android::hardware::audio::common::StreamWorker<WorkerLogic> {
+ using WorkerImpl = ::android::hardware::audio::common::StreamWorker<WorkerLogic>;
+
+ public:
+ StreamWorkerImpl(const StreamContext& context, DriverInterface* driver)
+ : WorkerImpl(context, driver) {}
+ bool isClosed() const override { return WorkerImpl::isClosed(); }
+ void setIsConnected(bool isConnected) override { WorkerImpl::setIsConnected(isConnected); }
+ void setClosed() override { WorkerImpl::setClosed(); }
+ bool start() override {
+ return WorkerImpl::start(WorkerImpl::kThreadName, ANDROID_PRIORITY_AUDIO);
+ }
+ void stop() override { return WorkerImpl::stop(); }
+};
+
class StreamInWorkerLogic : public StreamWorkerCommonLogic {
public:
static const std::string kThreadName;
- explicit StreamInWorkerLogic(const StreamContext& context) : StreamWorkerCommonLogic(context) {}
+ StreamInWorkerLogic(const StreamContext& context, DriverInterface* driver)
+ : StreamWorkerCommonLogic(context, driver) {}
protected:
Status cycle() override;
@@ -206,13 +256,13 @@
private:
bool read(size_t clientSize, StreamDescriptor::Reply* reply);
};
-using StreamInWorker = ::android::hardware::audio::common::StreamWorker<StreamInWorkerLogic>;
+using StreamInWorker = StreamWorkerImpl<StreamInWorkerLogic>;
class StreamOutWorkerLogic : public StreamWorkerCommonLogic {
public:
static const std::string kThreadName;
- explicit StreamOutWorkerLogic(const StreamContext& context)
- : StreamWorkerCommonLogic(context), mEventCallback(context.getOutEventCallback()) {}
+ StreamOutWorkerLogic(const StreamContext& context, DriverInterface* driver)
+ : StreamWorkerCommonLogic(context, driver), mEventCallback(context.getOutEventCallback()) {}
protected:
Status cycle() override;
@@ -222,7 +272,7 @@
std::shared_ptr<IStreamOutEventCallback> mEventCallback;
};
-using StreamOutWorker = ::android::hardware::audio::common::StreamWorker<StreamOutWorkerLogic>;
+using StreamOutWorker = StreamWorkerImpl<StreamOutWorkerLogic>;
// This provides a C++ interface with methods of the IStreamCommon Binder interface,
// but intentionally does not inherit from it. This is needed to avoid inheriting
@@ -294,7 +344,7 @@
std::weak_ptr<StreamCommonInterface> mDelegate;
};
-template <class Metadata, class StreamWorker>
+template <class Metadata>
class StreamCommonImpl : public StreamCommonInterface {
public:
ndk::ScopedAStatus close() override;
@@ -312,21 +362,25 @@
ndk::ScopedAStatus getStreamCommon(std::shared_ptr<IStreamCommon>* _aidl_return);
ndk::ScopedAStatus init() {
- return mWorker.start(StreamWorker::kThreadName, ANDROID_PRIORITY_AUDIO)
- ? ndk::ScopedAStatus::ok()
- : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
+ return mWorker->start() ? ndk::ScopedAStatus::ok()
+ : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
}
- bool isClosed() const { return mWorker.isClosed(); }
+ bool isClosed() const { return mWorker->isClosed(); }
void setIsConnected(
const std::vector<::aidl::android::media::audio::common::AudioDevice>& devices) {
- mWorker.setIsConnected(!devices.empty());
+ mWorker->setIsConnected(!devices.empty());
mConnectedDevices = devices;
}
ndk::ScopedAStatus updateMetadata(const Metadata& metadata);
protected:
- StreamCommonImpl(const Metadata& metadata, StreamContext&& context)
- : mMetadata(metadata), mContext(std::move(context)), mWorker(mContext) {}
+ StreamCommonImpl(const Metadata& metadata, StreamContext&& context,
+ const DriverInterface::CreateInstance& createDriver,
+ const StreamWorkerInterface::CreateInstance& createWorker)
+ : mMetadata(metadata),
+ mContext(std::move(context)),
+ mDriver(createDriver(mContext)),
+ mWorker(createWorker(mContext, mDriver.get())) {}
~StreamCommonImpl();
void stopWorker();
void createStreamCommon(const std::shared_ptr<StreamCommonInterface>& delegate);
@@ -335,16 +389,16 @@
ndk::SpAIBinder mCommonBinder;
Metadata mMetadata;
StreamContext mContext;
- StreamWorker mWorker;
+ std::unique_ptr<DriverInterface> mDriver;
+ std::unique_ptr<StreamWorkerInterface> mWorker;
std::vector<::aidl::android::media::audio::common::AudioDevice> mConnectedDevices;
};
-class StreamIn : public StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata,
- StreamInWorker>,
+class StreamIn : public StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata>,
public BnStreamIn {
ndk::ScopedAStatus getStreamCommon(std::shared_ptr<IStreamCommon>* _aidl_return) override {
- return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata,
- StreamInWorker>::getStreamCommon(_aidl_return);
+ return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata>::
+ getStreamCommon(_aidl_return);
}
ndk::ScopedAStatus getActiveMicrophones(
std::vector<MicrophoneDynamicInfo>* _aidl_return) override;
@@ -354,42 +408,46 @@
ndk::ScopedAStatus setMicrophoneFieldDimension(float in_zoom) override;
ndk::ScopedAStatus updateMetadata(const ::aidl::android::hardware::audio::common::SinkMetadata&
in_sinkMetadata) override {
- return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata,
- StreamInWorker>::updateMetadata(in_sinkMetadata);
+ return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata>::
+ updateMetadata(in_sinkMetadata);
}
ndk::ScopedAStatus getHwGain(std::vector<float>* _aidl_return) override;
ndk::ScopedAStatus setHwGain(const std::vector<float>& in_channelGains) override;
- public:
- static ndk::ScopedAStatus createInstance(
- const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata,
- StreamContext context, const std::vector<MicrophoneInfo>& microphones,
- std::shared_ptr<StreamIn>* result);
-
- private:
+ protected:
friend class ndk::SharedRefBase;
+
+ static ndk::ScopedAStatus initInstance(const std::shared_ptr<StreamIn>& stream);
+
StreamIn(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata,
- StreamContext&& context, const std::vector<MicrophoneInfo>& microphones);
+ StreamContext&& context, const DriverInterface::CreateInstance& createDriver,
+ const StreamWorkerInterface::CreateInstance& createWorker,
+ const std::vector<MicrophoneInfo>& microphones);
void createStreamCommon(const std::shared_ptr<StreamIn>& myPtr) {
- StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata,
- StreamInWorker>::createStreamCommon(myPtr);
+ StreamCommonImpl<
+ ::aidl::android::hardware::audio::common::SinkMetadata>::createStreamCommon(myPtr);
}
const std::map<::aidl::android::media::audio::common::AudioDevice, std::string> mMicrophones;
+
+ public:
+ using CreateInstance = std::function<ndk::ScopedAStatus(
+ const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata,
+ StreamContext&& context, const std::vector<MicrophoneInfo>& microphones,
+ std::shared_ptr<StreamIn>* result)>;
};
-class StreamOut : public StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata,
- StreamOutWorker>,
+class StreamOut : public StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>,
public BnStreamOut {
ndk::ScopedAStatus getStreamCommon(std::shared_ptr<IStreamCommon>* _aidl_return) override {
- return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata,
- StreamOutWorker>::getStreamCommon(_aidl_return);
+ return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>::
+ getStreamCommon(_aidl_return);
}
ndk::ScopedAStatus updateMetadata(
const ::aidl::android::hardware::audio::common::SourceMetadata& in_sourceMetadata)
override {
- return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata,
- StreamOutWorker>::updateMetadata(in_sourceMetadata);
+ return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>::
+ updateMetadata(in_sourceMetadata);
}
ndk::ScopedAStatus getHwVolume(std::vector<float>* _aidl_return) override;
ndk::ScopedAStatus setHwVolume(const std::vector<float>& in_channelVolumes) override;
@@ -411,26 +469,31 @@
override;
ndk::ScopedAStatus selectPresentation(int32_t in_presentationId, int32_t in_programId) override;
- public:
- static ndk::ScopedAStatus createInstance(
- const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
- StreamContext context,
- const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
- offloadInfo,
- std::shared_ptr<StreamOut>* result);
-
- private:
- friend class ndk::SharedRefBase;
- StreamOut(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
- StreamContext&& context,
- const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
- offloadInfo);
void createStreamCommon(const std::shared_ptr<StreamOut>& myPtr) {
- StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata,
- StreamOutWorker>::createStreamCommon(myPtr);
+ StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>::
+ createStreamCommon(myPtr);
}
+ protected:
+ friend class ndk::SharedRefBase;
+
+ static ndk::ScopedAStatus initInstance(const std::shared_ptr<StreamOut>& stream);
+
+ StreamOut(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
+ StreamContext&& context, const DriverInterface::CreateInstance& createDriver,
+ const StreamWorkerInterface::CreateInstance& createWorker,
+ const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
+ offloadInfo);
+
std::optional<::aidl::android::media::audio::common::AudioOffloadInfo> mOffloadInfo;
+
+ public:
+ using CreateInstance = std::function<ndk::ScopedAStatus(
+ const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
+ StreamContext&& context,
+ const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
+ offloadInfo,
+ std::shared_ptr<StreamOut>* result)>;
};
class StreamWrapper {
diff --git a/audio/aidl/default/include/core-impl/StreamStub.h b/audio/aidl/default/include/core-impl/StreamStub.h
new file mode 100644
index 0000000..98a062a
--- /dev/null
+++ b/audio/aidl/default/include/core-impl/StreamStub.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "core-impl/Stream.h"
+
+namespace aidl::android::hardware::audio::core {
+
+class DriverStub : public DriverInterface {
+ public:
+ DriverStub(const StreamContext& context, bool isInput);
+ ::android::status_t init() override;
+ ::android::status_t drain(StreamDescriptor::DrainMode) override;
+ ::android::status_t flush() override;
+ ::android::status_t pause() override;
+ ::android::status_t transfer(void* buffer, size_t frameCount, size_t* actualFrameCount,
+ int32_t* latencyMs) override;
+ ::android::status_t standby() override;
+
+ private:
+ const size_t mFrameSizeBytes;
+ const bool mIsInput;
+};
+
+class StreamInStub final : public StreamIn {
+ public:
+ static ndk::ScopedAStatus createInstance(
+ const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata,
+ StreamContext&& context, const std::vector<MicrophoneInfo>& microphones,
+ std::shared_ptr<StreamIn>* result);
+
+ private:
+ friend class ndk::SharedRefBase;
+ StreamInStub(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata,
+ StreamContext&& context, const std::vector<MicrophoneInfo>& microphones);
+};
+
+class StreamOutStub final : public StreamOut {
+ public:
+ static ndk::ScopedAStatus createInstance(
+ const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
+ StreamContext&& context,
+ const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
+ offloadInfo,
+ std::shared_ptr<StreamOut>* result);
+
+ private:
+ friend class ndk::SharedRefBase;
+ StreamOutStub(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
+ StreamContext&& context,
+ const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
+ offloadInfo);
+};
+
+} // namespace aidl::android::hardware::audio::core