audiohal: Make sure audio data transfer related commands go via FMQ
When outputting audio, the framework issues several HAL calls
from the same thread that writes into data FMQ. These calls
also need to be served on the same thread that writes audio data
to HAL. The same thing happens when audio input is commenced.
Add a command FMQ for passing different commands to the HAL thread.
This way, depending on the calling thread, the same call may go
either via hwbinder or via the command queue.
This dramatically reduces jitter in RTT measurements (although
doesn't improve the latency).
Bug: 30222631
Test: scripted RTT app
Change-Id: I04c826e2479d8210fd9c99756241156cda3143b6
diff --git a/audio/2.0/default/StreamIn.cpp b/audio/2.0/default/StreamIn.cpp
index ad18986..1cde4ac 100644
--- a/audio/2.0/default/StreamIn.cpp
+++ b/audio/2.0/default/StreamIn.cpp
@@ -38,6 +38,7 @@
// ReadThread's lifespan never exceeds StreamIn's lifespan.
ReadThread(std::atomic<bool>* stop,
audio_stream_in_t* stream,
+ StreamIn::CommandMQ* commandMQ,
StreamIn::DataMQ* dataMQ,
StreamIn::StatusMQ* statusMQ,
EventFlag* efGroup,
@@ -45,6 +46,7 @@
: Thread(false /*canCallJava*/),
mStop(stop),
mStream(stream),
+ mCommandMQ(commandMQ),
mDataMQ(dataMQ),
mStatusMQ(statusMQ),
mEfGroup(efGroup),
@@ -58,13 +60,19 @@
private:
std::atomic<bool>* mStop;
audio_stream_in_t* mStream;
+ StreamIn::CommandMQ* mCommandMQ;
StreamIn::DataMQ* mDataMQ;
StreamIn::StatusMQ* mStatusMQ;
EventFlag* mEfGroup;
ThreadPriority mThreadPriority;
std::unique_ptr<uint8_t[]> mBuffer;
+ IStreamIn::ReadParameters mParameters;
+ IStreamIn::ReadStatus mStatus;
bool threadLoop() override;
+
+ void doGetCapturePosition();
+ void doRead();
};
status_t ReadThread::readyToRun() {
@@ -77,6 +85,32 @@
return OK;
}
+void ReadThread::doRead() {
+ size_t availableToWrite = mDataMQ->availableToWrite();
+ size_t requestedToRead = mParameters.params.read;
+ if (requestedToRead > availableToWrite) {
+ ALOGW("truncating read data from %d to %d due to insufficient data queue space",
+ (int32_t)requestedToRead, (int32_t)availableToWrite);
+ requestedToRead = availableToWrite;
+ }
+ ssize_t readResult = mStream->read(mStream, &mBuffer[0], requestedToRead);
+ mStatus.retval = Result::OK;
+ uint64_t read = 0;
+ if (readResult >= 0) {
+ mStatus.reply.read = readResult;
+ if (!mDataMQ->write(&mBuffer[0], readResult)) {
+ ALOGW("data message queue write failed");
+ }
+ } else {
+ mStatus.retval = Stream::analyzeStatus("read", readResult);
+ }
+}
+
+void ReadThread::doGetCapturePosition() {
+ mStatus.retval = StreamIn::getCapturePositionImpl(
+ mStream, &mStatus.reply.capturePosition.frames, &mStatus.reply.capturePosition.time);
+}
+
bool ReadThread::threadLoop() {
// This implementation doesn't return control back to the Thread until it decides to stop,
// as the Thread uses mutexes, and this can lead to priority inversion.
@@ -87,21 +121,23 @@
if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) {
continue; // Nothing to do.
}
-
- const size_t availToWrite = mDataMQ->availableToWrite();
- ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite);
- Result retval = Result::OK;
- uint64_t read = 0;
- if (readResult >= 0) {
- read = readResult;
- if (!mDataMQ->write(&mBuffer[0], readResult)) {
- ALOGW("data message queue write failed");
- }
- } else {
- retval = Stream::analyzeStatus("read", readResult);
+ if (!mCommandMQ->read(&mParameters)) {
+ continue; // Nothing to do.
}
- IStreamIn::ReadStatus status = { retval, read };
- if (!mStatusMQ->write(&status)) {
+ mStatus.replyTo = mParameters.command;
+ switch (mParameters.command) {
+ case IStreamIn::ReadCommand::READ:
+ doRead();
+ break;
+ case IStreamIn::ReadCommand::GET_CAPTURE_POSITION:
+ doGetCapturePosition();
+ break;
+ default:
+ ALOGE("Unknown read thread command code %d", mParameters.command);
+ mStatus.retval = Result::NOT_SUPPORTED;
+ break;
+ }
+ if (!mStatusMQ->write(&mStatus)) {
ALOGW("status message queue write failed");
}
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
@@ -275,17 +311,19 @@
if (mDataMQ) {
ALOGE("the client attempts to call prepareForReading twice");
_hidl_cb(Result::INVALID_STATE,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
+ std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1));
std::unique_ptr<DataMQ> tempDataMQ(
new DataMQ(frameSize * framesCount, true /* EventFlag */));
std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
- if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid");
ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
// TODO: Remove event flag management once blocking MQ is implemented. b/33815422
@@ -293,7 +331,7 @@
if (status != OK || !mEfGroup) {
ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
@@ -301,6 +339,7 @@
mReadThread = new ReadThread(
&mStopReadThread,
mStream,
+ tempCommandMQ.get(),
tempDataMQ.get(),
tempStatusMQ.get(),
mEfGroup,
@@ -309,13 +348,14 @@
if (status != OK) {
ALOGW("failed to start reader thread: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
+ mCommandMQ = std::move(tempCommandMQ);
mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ);
- _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
+ _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc());
return Void();
}
@@ -323,22 +363,28 @@
return mStream->get_input_frames_lost(mStream);
}
-Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) {
+// static
+Result StreamIn::getCapturePositionImpl(
+ audio_stream_in_t *stream, uint64_t *frames, uint64_t *time) {
Result retval(Result::NOT_SUPPORTED);
- uint64_t frames = 0, time = 0;
- if (mStream->get_capture_position != NULL) {
- int64_t halFrames, halTime;
- retval = Stream::analyzeStatus(
- "get_capture_position",
- mStream->get_capture_position(mStream, &halFrames, &halTime),
- // HAL may have a stub function, always returning ENOSYS, don't
- // spam the log in this case.
- ENOSYS);
- if (retval == Result::OK) {
- frames = halFrames;
- time = halTime;
- }
+ if (stream->get_capture_position != NULL) return retval;
+ int64_t halFrames, halTime;
+ retval = Stream::analyzeStatus(
+ "get_capture_position",
+ stream->get_capture_position(stream, &halFrames, &halTime),
+ // HAL may have a stub function, always returning ENOSYS, don't
+ // spam the log in this case.
+ ENOSYS);
+ if (retval == Result::OK) {
+ *frames = halFrames;
+ *time = halTime;
}
+ return retval;
+};
+
+Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) {
+ uint64_t frames = 0, time = 0;
+ Result retval = getCapturePositionImpl(mStream, &frames, &time);
_hidl_cb(retval, frames, time);
return Void();
}
diff --git a/audio/2.0/default/StreamIn.h b/audio/2.0/default/StreamIn.h
index fc813d9..3566430 100644
--- a/audio/2.0/default/StreamIn.h
+++ b/audio/2.0/default/StreamIn.h
@@ -52,6 +52,7 @@
using ::android::sp;
struct StreamIn : public IStreamIn {
+ typedef MessageQueue<ReadParameters, kSynchronizedReadWrite> CommandMQ;
typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ;
typedef MessageQueue<ReadStatus, kSynchronizedReadWrite> StatusMQ;
@@ -97,12 +98,16 @@
Return<void> createMmapBuffer(int32_t minSizeFrames, createMmapBuffer_cb _hidl_cb) override;
Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override;
+ static Result getCapturePositionImpl(
+ audio_stream_in_t *stream, uint64_t *frames, uint64_t *time);
+
private:
bool mIsClosed;
audio_hw_device_t *mDevice;
audio_stream_in_t *mStream;
sp<Stream> mStreamCommon;
sp<StreamMmap<audio_stream_in_t>> mStreamMmap;
+ std::unique_ptr<CommandMQ> mCommandMQ;
std::unique_ptr<DataMQ> mDataMQ;
std::unique_ptr<StatusMQ> mStatusMQ;
EventFlag* mEfGroup;
diff --git a/audio/2.0/default/StreamOut.cpp b/audio/2.0/default/StreamOut.cpp
index 1948b68..5f187c5 100644
--- a/audio/2.0/default/StreamOut.cpp
+++ b/audio/2.0/default/StreamOut.cpp
@@ -36,6 +36,7 @@
// WriteThread's lifespan never exceeds StreamOut's lifespan.
WriteThread(std::atomic<bool>* stop,
audio_stream_out_t* stream,
+ StreamOut::CommandMQ* commandMQ,
StreamOut::DataMQ* dataMQ,
StreamOut::StatusMQ* statusMQ,
EventFlag* efGroup,
@@ -43,6 +44,7 @@
: Thread(false /*canCallJava*/),
mStop(stop),
mStream(stream),
+ mCommandMQ(commandMQ),
mDataMQ(dataMQ),
mStatusMQ(statusMQ),
mEfGroup(efGroup),
@@ -56,13 +58,19 @@
private:
std::atomic<bool>* mStop;
audio_stream_out_t* mStream;
+ StreamOut::CommandMQ* mCommandMQ;
StreamOut::DataMQ* mDataMQ;
StreamOut::StatusMQ* mStatusMQ;
EventFlag* mEfGroup;
ThreadPriority mThreadPriority;
std::unique_ptr<uint8_t[]> mBuffer;
+ IStreamOut::WriteStatus mStatus;
bool threadLoop() override;
+
+ void doGetLatency();
+ void doGetPresentationPosition();
+ void doWrite();
};
status_t WriteThread::readyToRun() {
@@ -75,6 +83,32 @@
return OK;
}
+void WriteThread::doWrite() {
+ const size_t availToRead = mDataMQ->availableToRead();
+ mStatus.retval = Result::OK;
+ mStatus.reply.written = 0;
+ if (mDataMQ->read(&mBuffer[0], availToRead)) {
+ ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
+ if (writeResult >= 0) {
+ mStatus.reply.written = writeResult;
+ } else {
+ mStatus.retval = Stream::analyzeStatus("write", writeResult);
+ }
+ }
+}
+
+void WriteThread::doGetPresentationPosition() {
+ mStatus.retval = StreamOut::getPresentationPositionImpl(
+ mStream,
+ &mStatus.reply.presentationPosition.frames,
+ &mStatus.reply.presentationPosition.timeStamp);
+}
+
+void WriteThread::doGetLatency() {
+ mStatus.retval = Result::OK;
+ mStatus.reply.latencyMs = mStream->get_latency(mStream);
+}
+
bool WriteThread::threadLoop() {
// This implementation doesn't return control back to the Thread until it decides to stop,
// as the Thread uses mutexes, and this can lead to priority inversion.
@@ -86,24 +120,26 @@
if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
continue; // Nothing to do.
}
-
- const size_t availToRead = mDataMQ->availableToRead();
- IStreamOut::WriteStatus status;
- status.writeRetval = Result::OK;
- status.written = 0;
- if (mDataMQ->read(&mBuffer[0], availToRead)) {
- ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
- if (writeResult >= 0) {
- status.written = writeResult;
- } else {
- status.writeRetval = Stream::analyzeStatus("write", writeResult);
- }
+ if (!mCommandMQ->read(&mStatus.replyTo)) {
+ continue; // Nothing to do.
}
- status.presentationPositionRetval = status.writeRetval == Result::OK ?
- StreamOut::getPresentationPositionImpl(mStream, &status.frames, &status.timeStamp) :
- Result::OK;
- if (!mStatusMQ->write(&status)) {
- ALOGW("status message queue write failed");
+ switch (mStatus.replyTo) {
+ case IStreamOut::WriteCommand::WRITE:
+ doWrite();
+ break;
+ case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
+ doGetPresentationPosition();
+ break;
+ case IStreamOut::WriteCommand::GET_LATENCY:
+ doGetLatency();
+ break;
+ default:
+ ALOGE("Unknown write thread command code %d", mStatus.replyTo);
+ mStatus.retval = Result::NOT_SUPPORTED;
+ break;
+ }
+ if (!mStatusMQ->write(&mStatus)) {
+ ALOGE("status message queue write failed");
}
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
}
@@ -259,17 +295,19 @@
if (mDataMQ) {
ALOGE("the client attempts to call prepareForWriting twice");
_hidl_cb(Result::INVALID_STATE,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
+ std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1));
std::unique_ptr<DataMQ> tempDataMQ(
new DataMQ(frameSize * framesCount, true /* EventFlag */));
std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
- if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
+ ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid");
ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
// TODO: Remove event flag management once blocking MQ is implemented. b/33815422
@@ -277,7 +315,7 @@
if (status != OK || !mEfGroup) {
ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
@@ -285,6 +323,7 @@
mWriteThread = new WriteThread(
&mStopWriteThread,
mStream,
+ tempCommandMQ.get(),
tempDataMQ.get(),
tempStatusMQ.get(),
mEfGroup,
@@ -293,13 +332,14 @@
if (status != OK) {
ALOGW("failed to start writer thread: %s", strerror(-status));
_hidl_cb(Result::INVALID_ARGUMENTS,
- DataMQ::Descriptor(), StatusMQ::Descriptor());
+ CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor());
return Void();
}
+ mCommandMQ = std::move(tempCommandMQ);
mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ);
- _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
+ _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc());
return Void();
}
diff --git a/audio/2.0/default/StreamOut.h b/audio/2.0/default/StreamOut.h
index 754a0c0..6616557 100644
--- a/audio/2.0/default/StreamOut.h
+++ b/audio/2.0/default/StreamOut.h
@@ -54,6 +54,7 @@
using ::android::sp;
struct StreamOut : public IStreamOut {
+ typedef MessageQueue<WriteCommand, kSynchronizedReadWrite> CommandMQ;
typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ;
typedef MessageQueue<WriteStatus, kSynchronizedReadWrite> StatusMQ;
@@ -118,6 +119,7 @@
sp<Stream> mStreamCommon;
sp<StreamMmap<audio_stream_out_t>> mStreamMmap;
sp<IStreamOutCallback> mCallback;
+ std::unique_ptr<CommandMQ> mCommandMQ;
std::unique_ptr<DataMQ> mDataMQ;
std::unique_ptr<StatusMQ> mStatusMQ;
EventFlag* mEfGroup;