audiohal: Re-implement stream read and write using FMQ
Result: no hwbinder calls due read / write session.
Added IStream.close method for explicitly freeing up of resources
consumed by the stream before automatic server objects reaping
gets to it.
Test: make, perform Loopback RTT, check traces
Bug: 30222631
Change-Id: I678559f6ef30026685df787cd2ba7c2ee449ed27
diff --git a/audio/2.0/default/StreamOut.cpp b/audio/2.0/default/StreamOut.cpp
index 3d20d11..4bb2274 100644
--- a/audio/2.0/default/StreamOut.cpp
+++ b/audio/2.0/default/StreamOut.cpp
@@ -17,8 +17,9 @@
#define LOG_TAG "StreamOutHAL"
//#define LOG_NDEBUG 0
-#include <hardware/audio.h>
#include <android/log.h>
+#include <hardware/audio.h>
+#include <mediautils/SchedulingPolicyService.h>
#include "StreamOut.h"
@@ -28,15 +29,103 @@
namespace V2_0 {
namespace implementation {
+namespace {
+
+class WriteThread : public Thread {
+ public:
+ // WriteThread's lifespan never exceeds StreamOut's lifespan.
+ WriteThread(std::atomic<bool>* stop,
+ audio_stream_out_t* stream,
+ StreamOut::DataMQ* dataMQ,
+ StreamOut::StatusMQ* statusMQ,
+ EventFlag* efGroup,
+ ThreadPriority threadPriority)
+ : Thread(false /*canCallJava*/),
+ mStop(stop),
+ mStream(stream),
+ mDataMQ(dataMQ),
+ mStatusMQ(statusMQ),
+ mEfGroup(efGroup),
+ mThreadPriority(threadPriority),
+ mBuffer(new uint8_t[dataMQ->getQuantumCount()]) {
+ }
+ virtual ~WriteThread() {}
+
+ status_t readyToRun() override;
+
+ private:
+ std::atomic<bool>* mStop;
+ audio_stream_out_t* mStream;
+ StreamOut::DataMQ* mDataMQ;
+ StreamOut::StatusMQ* mStatusMQ;
+ EventFlag* mEfGroup;
+ ThreadPriority mThreadPriority;
+ std::unique_ptr<uint8_t[]> mBuffer;
+
+ bool threadLoop() override;
+};
+
+status_t WriteThread::readyToRun() {
+ if (mThreadPriority != ThreadPriority::NORMAL) {
+ int err = requestPriority(
+ getpid(), getTid(), static_cast<int>(mThreadPriority), true /*asynchronous*/);
+ ALOGW_IF(err, "failed to set priority %d for pid %d tid %d; error %d",
+ static_cast<int>(mThreadPriority), getpid(), getTid(), err);
+ }
+ return OK;
+}
+
+bool WriteThread::threadLoop() {
+ // This implementation doesn't return control back to the Thread until it decides to stop,
+ // as the Thread uses mutexes, and this can lead to priority inversion.
+ while(!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
+ // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+ uint32_t efState = 0;
+ mEfGroup->wait(
+ static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
+ if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
+ continue; // Nothing to do.
+ }
+
+ const size_t availToRead = mDataMQ->availableToRead();
+ Result retval = Result::OK;
+ uint64_t written = 0;
+ if (mDataMQ->read(&mBuffer[0], availToRead)) {
+ ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
+ if (writeResult >= 0) {
+ written = writeResult;
+ } else {
+ retval = Stream::analyzeStatus("write", writeResult);
+ }
+ }
+ uint64_t frames = 0;
+ struct timespec halTimeStamp = { 0, 0 };
+ if (retval == Result::OK && mStream->get_presentation_position != NULL) {
+ mStream->get_presentation_position(mStream, &frames, &halTimeStamp);
+ }
+ IStreamOut::WriteStatus status = { retval, written, frames,
+ { static_cast<uint64_t>(halTimeStamp.tv_sec),
+ static_cast<uint64_t>(halTimeStamp.tv_nsec) } };
+ if (!mStatusMQ->write(&status)) {
+ ALOGW("status message queue write failed");
+ }
+ mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+ }
+
+ return false;
+}
+
+} // namespace
+
StreamOut::StreamOut(audio_hw_device_t* device, audio_stream_out_t* stream)
- : mDevice(device), mStream(stream),
+ : mIsClosed(false), mDevice(device), mStream(stream),
mStreamCommon(new Stream(&stream->common)),
- mStreamMmap(new StreamMmap<audio_stream_out_t>(stream)) {
+ mStreamMmap(new StreamMmap<audio_stream_out_t>(stream)),
+ mEfGroup(nullptr), mStopWriteThread(false) {
}
StreamOut::~StreamOut() {
- mCallback.clear();
- mDevice->close_output_stream(mDevice, mStream);
+ close();
mStream = nullptr;
mDevice = nullptr;
}
@@ -135,6 +224,23 @@
return mStreamCommon->debugDump(fd);
}
+Return<Result> StreamOut::close() {
+ if (mIsClosed) return Result::INVALID_STATE;
+ mIsClosed = true;
+ if (mWriteThread.get()) {
+ mStopWriteThread.store(true, std::memory_order_release);
+ status_t status = mWriteThread->requestExitAndWait();
+ ALOGE_IF(status, "write thread exit error: %s", strerror(-status));
+ }
+ if (mEfGroup) {
+ status_t status = EventFlag::deleteEventFlag(&mEfGroup);
+ ALOGE_IF(status, "write MQ event flag deletion error: %s", strerror(-status));
+ }
+ mCallback.clear();
+ mDevice->close_output_stream(mDevice, mStream);
+ return Result::OK;
+}
+
// Methods from ::android::hardware::audio::V2_0::IStreamOut follow.
Return<uint32_t> StreamOut::getLatency() {
return mStream->get_latency(mStream);
@@ -149,18 +255,55 @@
return retval;
}
-Return<void> StreamOut::write(const hidl_vec<uint8_t>& data, write_cb _hidl_cb) {
- // TODO(mnaganov): Replace with FMQ version.
- Result retval(Result::OK);
- uint64_t written = 0;
- ssize_t writeResult = mStream->write(mStream, &data[0], data.size());
- if (writeResult >= 0) {
- written = writeResult;
- } else {
- retval = Stream::analyzeStatus("write", writeResult);
- written = 0;
+Return<void> StreamOut::prepareForWriting(
+ uint32_t frameSize, uint32_t framesCount, ThreadPriority threadPriority,
+ prepareForWriting_cb _hidl_cb) {
+ status_t status;
+ // Create message queues.
+ if (mDataMQ) {
+ ALOGE("the client attempts to call prepareForWriting twice");
+ _hidl_cb(Result::INVALID_STATE,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+ return Void();
}
- _hidl_cb(retval, written);
+ std::unique_ptr<DataMQ> tempDataMQ(
+ new DataMQ(frameSize * framesCount, true /* EventFlag */));
+ std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
+ if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
+ ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
+ _hidl_cb(Result::INVALID_ARGUMENTS,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+ return Void();
+ }
+ // TODO: Remove event flag management once blocking MQ is implemented. b/33815422
+ status = EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+ if (status != OK || !mEfGroup) {
+ ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
+ _hidl_cb(Result::INVALID_ARGUMENTS,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+ return Void();
+ }
+
+ // Create and launch the thread.
+ mWriteThread = new WriteThread(
+ &mStopWriteThread,
+ mStream,
+ tempDataMQ.get(),
+ tempStatusMQ.get(),
+ mEfGroup,
+ threadPriority);
+ status = mWriteThread->run("writer", PRIORITY_URGENT_AUDIO);
+ if (status != OK) {
+ ALOGW("failed to start writer thread: %s", strerror(-status));
+ _hidl_cb(Result::INVALID_ARGUMENTS,
+ MQDescriptorSync<uint8_t>(), MQDescriptorSync<WriteStatus>());
+ return Void();
+ }
+
+ mDataMQ = std::move(tempDataMQ);
+ mStatusMQ = std::move(tempStatusMQ);
+ _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
return Void();
}