Add AAudioCommandQueue in aaudio service.
With AAudioCommandQueue, all the requests to aaudio service stream can
be turned into command and executed one by one. This can help reduce
multiple thread issue in aaudio service.
Test: atest AAudioTest
Test: run oboetester
Bug: 201000721
Change-Id: I81134e036d8921d327feb029d8fda725ce65d080
diff --git a/services/oboeservice/AAudioServiceStreamBase.cpp b/services/oboeservice/AAudioServiceStreamBase.cpp
index 4ffc127..a25a791 100644
--- a/services/oboeservice/AAudioServiceStreamBase.cpp
+++ b/services/oboeservice/AAudioServiceStreamBase.cpp
@@ -34,23 +34,25 @@
#include "AAudioService.h"
#include "AAudioServiceEndpoint.h"
#include "AAudioServiceStreamBase.h"
-#include "TimestampScheduler.h"
using namespace android; // TODO just import names needed
using namespace aaudio; // TODO just import names needed
using content::AttributionSourceState;
+static const int64_t TIMEOUT_NANOS = 3LL * 1000 * 1000 * 1000;
+
/**
* Base class for streams in the service.
* @return
*/
AAudioServiceStreamBase::AAudioServiceStreamBase(AAudioService &audioService)
- : mTimestampThread("AATime")
+ : mCommandThread("AACommand")
, mAtomicStreamTimestamp()
, mAudioService(audioService) {
mMmapClient.attributionSource = AttributionSourceState();
+ mThreadEnabled = true;
}
AAudioServiceStreamBase::~AAudioServiceStreamBase() {
@@ -70,6 +72,13 @@
|| getState() == AAUDIO_STREAM_STATE_UNINITIALIZED),
"service stream %p still open, state = %d",
this, getState());
+
+ // Stop the command thread before destroying.
+ if (mThreadEnabled) {
+ mThreadEnabled = false;
+ mCommandQueue.stopWaiting();
+ mCommandThread.stop();
+ }
}
std::string AAudioServiceStreamBase::dumpHeader() {
@@ -166,6 +175,16 @@
mFramesPerBurst = mServiceEndpoint->getFramesPerBurst();
copyFrom(*mServiceEndpoint);
}
+
+ // Make sure this object does not get deleted before the run() method
+ // can protect it by making a strong pointer.
+ mThreadEnabled = true;
+ incStrong(nullptr); // See run() method.
+ result = mCommandThread.start(this);
+ if (result != AAUDIO_OK) {
+ decStrong(nullptr); // run() can't do it so we have to do it here.
+ goto error;
+ }
return result;
error:
@@ -174,8 +193,16 @@
}
aaudio_result_t AAudioServiceStreamBase::close() {
- std::lock_guard<std::mutex> lock(mLock);
- return close_l();
+ auto command = std::make_shared<AAudioCommand>(
+ CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+ aaudio_result_t result = mCommandQueue.sendCommand(command);
+
+ // Stop the command thread as the stream is closed.
+ mThreadEnabled = false;
+ mCommandQueue.stopWaiting();
+ mCommandThread.stop();
+
+ return result;
}
aaudio_result_t AAudioServiceStreamBase::close_l() {
@@ -183,8 +210,7 @@
return AAUDIO_OK;
}
- // This will call stopTimestampThread() and also stop the stream,
- // just in case it was not already stopped.
+ // This will stop the stream, just in case it was not already stopped.
stop_l();
aaudio_result_t result = AAUDIO_OK;
@@ -224,8 +250,12 @@
* An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete.
*/
aaudio_result_t AAudioServiceStreamBase::start() {
- std::lock_guard<std::mutex> lock(mLock);
+ auto command = std::make_shared<AAudioCommand>(
+ START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+ return mCommandQueue.sendCommand(command);
+}
+aaudio_result_t AAudioServiceStreamBase::start_l() {
const int64_t beginNs = AudioClock::getNanoseconds();
aaudio_result_t result = AAUDIO_OK;
@@ -261,15 +291,6 @@
// This should happen at the end of the start.
sendServiceEvent(AAUDIO_SERVICE_EVENT_STARTED);
setState(AAUDIO_STREAM_STATE_STARTED);
- mThreadEnabled.store(true);
- // Make sure this object does not get deleted before the run() method
- // can protect it by making a strong pointer.
- incStrong(nullptr); // See run() method.
- result = mTimestampThread.start(this);
- if (result != AAUDIO_OK) {
- decStrong(nullptr); // run() can't do it so we have to do it here.
- goto error;
- }
return result;
@@ -279,8 +300,9 @@
}
aaudio_result_t AAudioServiceStreamBase::pause() {
- std::lock_guard<std::mutex> lock(mLock);
- return pause_l();
+ auto command = std::make_shared<AAudioCommand>(
+ PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+ return mCommandQueue.sendCommand(command);
}
aaudio_result_t AAudioServiceStreamBase::pause_l() {
@@ -298,12 +320,6 @@
.set(AMEDIAMETRICS_PROP_STATUS, (int32_t)result)
.record(); });
- result = stopTimestampThread();
- if (result != AAUDIO_OK) {
- disconnect_l();
- return result;
- }
-
sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
if (endpoint == nullptr) {
ALOGE("%s() has no endpoint", __func__);
@@ -322,8 +338,9 @@
}
aaudio_result_t AAudioServiceStreamBase::stop() {
- std::lock_guard<std::mutex> lock(mLock);
- return stop_l();
+ auto command = std::make_shared<AAudioCommand>(
+ STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+ return mCommandQueue.sendCommand(command);
}
aaudio_result_t AAudioServiceStreamBase::stop_l() {
@@ -343,12 +360,6 @@
setState(AAUDIO_STREAM_STATE_STOPPING);
- // Temporarily unlock because we are joining the timestamp thread and it may try
- // to acquire mLock.
- mLock.unlock();
- result = stopTimestampThread();
- mLock.lock();
-
if (result != AAUDIO_OK) {
disconnect_l();
return result;
@@ -373,17 +384,13 @@
return result;
}
-aaudio_result_t AAudioServiceStreamBase::stopTimestampThread() {
- aaudio_result_t result = AAUDIO_OK;
- // clear flag that tells thread to loop
- if (mThreadEnabled.exchange(false)) {
- result = mTimestampThread.stop();
- }
- return result;
+aaudio_result_t AAudioServiceStreamBase::flush() {
+ auto command = std::make_shared<AAudioCommand>(
+ FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+ return mCommandQueue.sendCommand(command);
}
-aaudio_result_t AAudioServiceStreamBase::flush() {
- std::lock_guard<std::mutex> lock(mLock);
+aaudio_result_t AAudioServiceStreamBase::flush_l() {
aaudio_result_t result = AAudio_isFlushAllowed(getState());
if (result != AAUDIO_OK) {
return result;
@@ -404,48 +411,111 @@
return AAUDIO_OK;
}
-// implement Runnable, periodically send timestamps to client
+// implement Runnable, periodically send timestamps to client and process commands from queue.
__attribute__((no_sanitize("integer")))
void AAudioServiceStreamBase::run() {
- ALOGD("%s() %s entering >>>>>>>>>>>>>> TIMESTAMPS", __func__, getTypeText());
+ ALOGD("%s() %s entering >>>>>>>>>>>>>> COMMANDS", __func__, getTypeText());
// Hold onto the ref counted stream until the end.
android::sp<AAudioServiceStreamBase> holdStream(this);
TimestampScheduler timestampScheduler;
+ int64_t nextTime;
// Balance the incStrong from when the thread was launched.
holdStream->decStrong(nullptr);
- timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate());
- timestampScheduler.start(AudioClock::getNanoseconds());
- int64_t nextTime = timestampScheduler.nextAbsoluteTime();
+ // Taking mLock while starting the thread. All the operation must be able to
+ // run with holding the lock.
+ std::scoped_lock<std::mutex> _l(mLock);
+
int32_t loopCount = 0;
- aaudio_result_t result = AAUDIO_OK;
- while(mThreadEnabled.load()) {
+ while (mThreadEnabled.load()) {
loopCount++;
- if (AudioClock::getNanoseconds() >= nextTime) {
- result = sendCurrentTimestamp();
- if (result != AAUDIO_OK) {
- ALOGE("%s() timestamp thread got result = %d", __func__, result);
- break;
+ int64_t timeoutNanos = -1;
+ if (isRunning()) {
+ timeoutNanos = nextTime - AudioClock::getNanoseconds();
+ timeoutNanos = std::max<int64_t>(0, timeoutNanos);
+ }
+
+ auto command = mCommandQueue.waitForCommand(timeoutNanos);
+ if (!mThreadEnabled) {
+ // Break the loop if the thread is disabled.
+ break;
+ }
+
+ if (isRunning() && AudioClock::getNanoseconds() >= nextTime) {
+ // It is time to update timestamp.
+ if (sendCurrentTimestamp_l() != AAUDIO_OK) {
+ ALOGE("Failed to send current timestamp, stop updating timestamp");
+ disconnect_l();
+ } else {
+ nextTime = timestampScheduler.nextAbsoluteTime();
}
- nextTime = timestampScheduler.nextAbsoluteTime();
- } else {
- // Sleep until it is time to send the next timestamp.
- // TODO Wait for a signal with a timeout so that we can stop more quickly.
- AudioClock::sleepUntilNanoTime(nextTime);
+ }
+
+ if (command != nullptr) {
+ std::scoped_lock<std::mutex> _commandLock(command->lock);
+ switch (command->operationCode) {
+ case START:
+ command->result = start_l();
+ timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate());
+ timestampScheduler.start(AudioClock::getNanoseconds());
+ nextTime = timestampScheduler.nextAbsoluteTime();
+ break;
+ case PAUSE:
+ command->result = pause_l();
+ break;
+ case STOP:
+ command->result = stop_l();
+ break;
+ case FLUSH:
+ command->result = flush_l();
+ break;
+ case CLOSE:
+ command->result = close_l();
+ break;
+ case DISCONNECT:
+ disconnect_l();
+ break;
+ case REGISTER_AUDIO_THREAD: {
+ RegisterAudioThreadParam *param =
+ (RegisterAudioThreadParam *) command->parameter.get();
+ command->result =
+ param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT
+ : registerAudioThread_l(param->mOwnerPid,
+ param->mClientThreadId,
+ param->mPriority);
+ }
+ break;
+ case UNREGISTER_AUDIO_THREAD: {
+ UnregisterAudioThreadParam *param =
+ (UnregisterAudioThreadParam *) command->parameter.get();
+ command->result =
+ param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT
+ : unregisterAudioThread_l(param->mClientThreadId);
+ }
+ break;
+ case GET_DESCRIPTION: {
+ GetDescriptionParam *param = (GetDescriptionParam *) command->parameter.get();
+ command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT
+ : getDescription_l(param->mParcelable);
+ }
+ break;
+ default:
+ ALOGE("Invalid command op code: %d", command->operationCode);
+ break;
+ }
+ if (command->isWaitingForReply) {
+ command->isWaitingForReply = false;
+ command->conditionVariable.notify_one();
+ }
}
}
- // This was moved from the calls in stop_l() and pause_l(), which could cause a deadlock
- // if it resulted in a call to disconnect.
- if (result == AAUDIO_OK) {
- (void) sendCurrentTimestamp();
- }
- ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< TIMESTAMPS",
+ ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< COMMANDS",
__func__, getTypeText(), loopCount);
}
void AAudioServiceStreamBase::disconnect() {
- std::lock_guard<std::mutex> lock(mLock);
- disconnect_l();
+ auto command = std::make_shared<AAudioCommand>(DISCONNECT);
+ mCommandQueue.sendCommand(command);
}
void AAudioServiceStreamBase::disconnect_l() {
@@ -461,15 +531,23 @@
}
}
-aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId,
- int priority) {
- std::lock_guard<std::mutex> lock(mLock);
+aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) {
+ const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
+ auto command = std::make_shared<AAudioCommand>(
+ REGISTER_AUDIO_THREAD,
+ std::make_shared<RegisterAudioThreadParam>(ownerPid, clientThreadId, priority),
+ true /*waitForReply*/,
+ TIMEOUT_NANOS);
+ return mCommandQueue.sendCommand(command);
+}
+
+aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l(
+ pid_t ownerPid, pid_t clientThreadId, int priority) {
aaudio_result_t result = AAUDIO_OK;
if (getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) {
ALOGE("AAudioService::registerAudioThread(), thread already registered");
result = AAUDIO_ERROR_INVALID_STATE;
} else {
- const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
setRegisteredThread(clientThreadId);
int err = android::requestPriority(ownerPid, clientThreadId,
priority, true /* isForApp */);
@@ -483,7 +561,15 @@
}
aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) {
- std::lock_guard<std::mutex> lock(mLock);
+ auto command = std::make_shared<AAudioCommand>(
+ UNREGISTER_AUDIO_THREAD,
+ std::make_shared<UnregisterAudioThreadParam>(clientThreadId),
+ true /*waitForReply*/,
+ TIMEOUT_NANOS);
+ return mCommandQueue.sendCommand(command);
+}
+
+aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread_l(pid_t clientThreadId) {
aaudio_result_t result = AAUDIO_OK;
if (getRegisteredThread() != clientThreadId) {
ALOGE("%s(), wrong thread", __func__);
@@ -552,7 +638,7 @@
return sendServiceEvent(AAUDIO_SERVICE_EVENT_XRUN, (int64_t) xRunCount);
}
-aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() {
+aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp_l() {
AAudioServiceMessage command;
// It is not worth filling up the queue with timestamps.
// That can cause the stream to get suspended.
@@ -562,8 +648,8 @@
}
// Send a timestamp for the clock model.
- aaudio_result_t result = getFreeRunningPosition(&command.timestamp.position,
- &command.timestamp.timestamp);
+ aaudio_result_t result = getFreeRunningPosition_l(&command.timestamp.position,
+ &command.timestamp.timestamp);
if (result == AAUDIO_OK) {
ALOGV("%s() SERVICE %8lld at %lld", __func__,
(long long) command.timestamp.position,
@@ -573,8 +659,8 @@
if (result == AAUDIO_OK) {
// Send a hardware timestamp for presentation time.
- result = getHardwareTimestamp(&command.timestamp.position,
- &command.timestamp.timestamp);
+ result = getHardwareTimestamp_l(&command.timestamp.position,
+ &command.timestamp.timestamp);
if (result == AAUDIO_OK) {
ALOGV("%s() HARDWARE %8lld at %lld", __func__,
(long long) command.timestamp.position,
@@ -596,7 +682,15 @@
* used to communicate with the underlying HAL or Service.
*/
aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) {
- std::lock_guard<std::mutex> lock(mLock);
+ auto command = std::make_shared<AAudioCommand>(
+ GET_DESCRIPTION,
+ std::make_shared<GetDescriptionParam>(&parcelable),
+ true /*waitForReply*/,
+ TIMEOUT_NANOS);
+ return mCommandQueue.sendCommand(command);
+}
+
+aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) {
{
std::lock_guard<std::mutex> lock(mUpMessageQueueLock);
if (mUpMessageQueue == nullptr) {
@@ -605,9 +699,9 @@
}
// Gather information on the message queue.
mUpMessageQueue->fillParcelable(parcelable,
- parcelable.mUpMessageQueueParcelable);
+ parcelable->mUpMessageQueueParcelable);
}
- return getAudioDataDescription(parcelable);
+ return getAudioDataDescription_l(parcelable);
}
void AAudioServiceStreamBase::onVolumeChanged(float volume) {