audiohal: Re-implement stream read and write using FMQ
Result: no hwbinder calls during a read / write session.
Added the IStream.close method for explicitly freeing the resources
consumed by the stream before the automatic reaping of server
objects gets to it.
Test: make, perform Loopback RTT, check traces
Bug: 30222631
Change-Id: I678559f6ef30026685df787cd2ba7c2ee449ed27
diff --git a/audio/2.0/default/StreamIn.cpp b/audio/2.0/default/StreamIn.cpp
index 1441e74..51c2cc7 100644
--- a/audio/2.0/default/StreamIn.cpp
+++ b/audio/2.0/default/StreamIn.cpp
@@ -15,26 +15,112 @@
*/
#define LOG_TAG "StreamInHAL"
+//#define LOG_NDEBUG 0
-#include <hardware/audio.h>
#include <android/log.h>
+#include <hardware/audio.h>
+#include <mediautils/SchedulingPolicyService.h>
#include "StreamIn.h"
+using ::android::hardware::audio::V2_0::MessageQueueFlagBits;
+
namespace android {
namespace hardware {
namespace audio {
namespace V2_0 {
namespace implementation {
+namespace {
+
+class ReadThread : public Thread {  // Worker thread that reads from the HAL input stream and publishes data and status via message queues.
+ public:
+ // ReadThread's lifespan never exceeds StreamIn's lifespan.
+ ReadThread(std::atomic<bool>* stop,  // flag polled by threadLoop(); once true the loop ends
+ audio_stream_in_t* stream,  // stream whose read() supplies the data
+ StreamIn::DataMQ* dataMQ,  // queue that receives the bytes read
+ StreamIn::StatusMQ* statusMQ,  // queue that receives one ReadStatus per read attempt
+ EventFlag* efGroup,  // used to wait for NOT_FULL and to signal NOT_EMPTY
+ ThreadPriority threadPriority)  // requested in readyToRun() unless it is NORMAL
+ : Thread(false /*canCallJava*/),
+ mStop(stop),
+ mStream(stream),
+ mDataMQ(dataMQ),
+ mStatusMQ(statusMQ),
+ mEfGroup(efGroup),
+ mThreadPriority(threadPriority),
+ mBuffer(new uint8_t[dataMQ->getQuantumCount()]) {  // staging buffer sized from the data queue's quantum count
+ }
+ virtual ~ReadThread() {}  // empty: none of the raw pointers below are deleted by this class
+
+ status_t readyToRun() override;  // applies mThreadPriority; defined below
+
+ private:
+ std::atomic<bool>* mStop;  // stop request flag, stored as passed to the constructor
+ audio_stream_in_t* mStream;  // HAL stream to read from
+ StreamIn::DataMQ* mDataMQ;  // destination for audio bytes
+ StreamIn::StatusMQ* mStatusMQ;  // destination for per-read status records
+ EventFlag* mEfGroup;  // event flag shared with the queue consumer
+ ThreadPriority mThreadPriority;  // priority to request when the thread starts
+ std::unique_ptr<uint8_t[]> mBuffer;  // owned buffer between stream read and queue write
+
+ bool threadLoop() override;  // the read loop; defined below
+};
+
+status_t ReadThread::readyToRun() {  // Requests the configured priority for this thread; always returns OK.
+ if (mThreadPriority != ThreadPriority::NORMAL) {  // for NORMAL no priority request is made
+ int err = requestPriority(
+ getpid(), getTid(), static_cast<int>(mThreadPriority), true /*asynchronous*/);
+ ALOGW_IF(err, "failed to set priority %d for pid %d tid %d; error %d",  // a failed request is only logged
+ static_cast<int>(mThreadPriority), getpid(), getTid(), err);
+ }
+ return OK;  // returned regardless of the requestPriority() result
+}
+
+bool ReadThread::threadLoop() {  // Read loop: wait for NOT_FULL, read from the stream, publish data and status; returns false once *mStop is true.
+ // This implementation doesn't return control back to the Thread until it decides to stop,
+ // as the Thread uses mutexes, and this can lead to priority inversion.
+ while(!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {  // acquire load of the stop flag on every iteration
+ // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+ uint32_t efState = 0;
+ mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState, NS_PER_SEC);  // bounded wait, so the stop flag is re-checked even with no NOT_FULL signal
+ if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) {
+ continue; // Nothing to do.
+ }
+
+ const size_t availToWrite = mDataMQ->availableToWrite();  // amount the data queue reports it can accept right now
+ ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite);  // request at most that much from the HAL
+ Result retval = Result::OK;
+ uint64_t read = 0;  // stays 0 when the stream read fails
+ if (readResult >= 0) {  // non-negative result is used as the count forwarded to the data queue
+ read = readResult;
+ if (!mDataMQ->write(&mBuffer[0], readResult)) {
+ ALOGW("data message queue write failed");  // only logged; the status below still carries `read`
+ }
+ } else {
+ retval = Stream::analyzeStatus("read", readResult);  // negative result is converted to a Result code
+ }
+ IStreamIn::ReadStatus status = { retval, read };
+ if (!mStatusMQ->write(&status)) {
+ ALOGW("status message queue write failed");
+ }
+ mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));  // signalled after every attempt, including failed reads
+ }
+
+ return false;  // reached only after the stop flag was observed as true
+}
+
+} // namespace
+
StreamIn::StreamIn(audio_hw_device_t* device, audio_stream_in_t* stream)  // Wraps a HAL input stream; starts not closed, with no event flag and no stop request.
- : mDevice(device), mStream(stream),
+ : mIsClosed(false), mDevice(device), mStream(stream),
mStreamCommon(new Stream(&stream->common)),
- mStreamMmap(new StreamMmap<audio_stream_in_t>(stream)) {
+ mStreamMmap(new StreamMmap<audio_stream_in_t>(stream)),
+ mEfGroup(nullptr), mStopReadThread(false) {  // the event flag is created later, in prepareForReading()
}
StreamIn::~StreamIn() {  // Tears the stream down through close(), then clears the raw HAL pointers.
- mDevice->close_input_stream(mDevice, mStream);
+ close();  // if the client already called close(), this returns INVALID_STATE and does nothing else
mStream = nullptr;
mDevice = nullptr;
}
@@ -149,6 +235,22 @@
return mStreamMmap->getMmapPosition(_hidl_cb);
}
+Return<Result> StreamIn::close() {  // Stops the reader thread, deletes the event flag and closes the HAL stream; INVALID_STATE if already closed.
+ if (mIsClosed) return Result::INVALID_STATE;  // a repeated call performs no teardown
+ mIsClosed = true;
+ if (mReadThread.get()) {  // non-null only after prepareForReading() created the thread
+ mStopReadThread.store(true, std::memory_order_release);  // pairs with the acquire load in ReadThread::threadLoop()
+ status_t status = mReadThread->requestExitAndWait();
+ ALOGE_IF(status, "read thread exit error: %s", strerror(-status));
+ }
+ if (mEfGroup) {  // deleted only after the thread using it has been asked to exit and waited for
+ status_t status = EventFlag::deleteEventFlag(&mEfGroup);
+ ALOGE_IF(status, "read MQ event flag deletion error: %s", strerror(-status));
+ }
+ mDevice->close_input_stream(mDevice, mStream);
+ return Result::OK;  // thread and event-flag errors above are logged, not reported to the caller
+}
+
// Methods from ::android::hardware::audio::V2_0::IStreamIn follow.
Return<void> StreamIn::getAudioSource(getAudioSource_cb _hidl_cb) {
int halSource;
@@ -165,19 +267,55 @@
return Stream::analyzeStatus("set_gain", mStream->set_gain(mStream, gain));
}
-Return<void> StreamIn::read(uint64_t size, read_cb _hidl_cb) {
- // TODO(mnaganov): Replace with FMQ version.
- hidl_vec<uint8_t> data;
- data.resize(size);
- Result retval(Result::OK);
- ssize_t readResult = mStream->read(mStream, &data[0], data.size());
- if (readResult >= 0 && static_cast<size_t>(readResult) != data.size()) {
- data.resize(readResult);
- } else if (readResult < 0) {
- data.resize(0);
- retval = Stream::analyzeStatus("read", readResult);
+Return<void> StreamIn::prepareForReading(  // Creates the data/status queues and the event flag, starts the reader thread, and returns the queue descriptors via _hidl_cb.
+ uint32_t frameSize, uint32_t framesCount, ThreadPriority threadPriority,
+ prepareForReading_cb _hidl_cb) {
+ status_t status;
+ // Create message queues.
+ if (mDataMQ) {  // a stored data queue means a previous call already succeeded
+ ALOGE("the client attempts to call prepareForReading twice");
+ _hidl_cb(Result::INVALID_STATE,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+ return Void();
}
- _hidl_cb(retval, data);
+ std::unique_ptr<DataMQ> tempDataMQ(
+ new DataMQ(frameSize * framesCount, true /* EventFlag */));  // NOTE(review): uint32_t product can wrap for large arguments — confirm the inputs are bounded
+ std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));  // status queue is created with a size argument of 1
+ if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
+ ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
+ _hidl_cb(Result::INVALID_ARGUMENTS,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+ return Void();
+ }
+ // TODO: Remove event flag management once blocking MQ is implemented. b/33815422
+ status = EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);  // event flag is built on the data queue's flag word
+ if (status != OK || !mEfGroup) {
+ ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
+ _hidl_cb(Result::INVALID_ARGUMENTS,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+ return Void();
+ }
+
+ // Create and launch the thread.
+ mReadThread = new ReadThread(
+ &mStopReadThread,
+ mStream,
+ tempDataMQ.get(),  // raw pointer; the queue is still owned by the temporary here
+ tempStatusMQ.get(),  // raw pointer; the queue is still owned by the temporary here
+ mEfGroup,
+ threadPriority);
+ status = mReadThread->run("reader", PRIORITY_URGENT_AUDIO);
+ if (status != OK) {  // NOTE(review): mReadThread and mEfGroup stay set on this path while the temporary queues are destroyed on return — confirm this is intended
+ ALOGW("failed to start reader thread: %s", strerror(-status));
+ _hidl_cb(Result::INVALID_ARGUMENTS,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<ReadStatus>());
+ return Void();
+ }
+
+ mDataMQ = std::move(tempDataMQ);  // ownership moves to the members; the queue objects themselves are not relocated
+ mStatusMQ = std::move(tempStatusMQ);
+ _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());  // hand both queue descriptors to the client
return Void();
}