Do not send command when aaudio command thread is not running.

If there is error when opening aaudio MMAP stream, the aaudio command
thread may not be started. In that case, if there is any command sending
to the command queue, it will always be blocked until timeout.

In this CL, sending command when the command thread is not running will
directly return AAUDIO_ERROR_INVALID_STATE.

Bug: 209520562
Test: atest AAudioTests
Test: repo steps from the bug
Change-Id: I01c076c120be9a68b984d0c365a61e77821ef117
diff --git a/services/oboeservice/AAudioCommandQueue.cpp b/services/oboeservice/AAudioCommandQueue.cpp
index ddaabe8..9bd18b3 100644
--- a/services/oboeservice/AAudioCommandQueue.cpp
+++ b/services/oboeservice/AAudioCommandQueue.cpp
@@ -28,6 +28,10 @@
 aaudio_result_t AAudioCommandQueue::sendCommand(std::shared_ptr<AAudioCommand> command) {
     {
         std::scoped_lock<std::mutex> _l(mLock);
+        if (!mRunning) {
+            ALOGE("Tried to send command while it was not running");
+            return AAUDIO_ERROR_INVALID_STATE;
+        }
         mCommands.push(command);
         mWaitWorkCond.notify_one();
     }
@@ -68,7 +72,7 @@
                 return !mRunning || !mCommands.empty();
             });
         }
-        if (!mCommands.empty()) {
+        if (!mCommands.empty() && mRunning) {
             command = mCommands.front();
             mCommands.pop();
         }
@@ -76,9 +80,27 @@
     return command;
 }
 
+void AAudioCommandQueue::startWaiting() {
+    std::scoped_lock<std::mutex> _l(mLock);
+    mRunning = true;
+}
+
 void AAudioCommandQueue::stopWaiting() {
     std::scoped_lock<std::mutex> _l(mLock);
     mRunning = false;
+    // Clear all commands in the queue as the command thread is stopped.
+    while (!mCommands.empty()) {
+        auto command = mCommands.front();
+        mCommands.pop();
+        std::scoped_lock<std::mutex> _cl(command->lock);
+        // If the command is waiting for result, returns AAUDIO_ERROR_INVALID_STATE
+        // as there is no thread waiting for the command.
+        if (command->isWaitingForReply) {
+            command->result = AAUDIO_ERROR_INVALID_STATE;
+            command->isWaitingForReply = false;
+            command->conditionVariable.notify_one();
+        }
+    }
     mWaitWorkCond.notify_one();
 }
 
diff --git a/services/oboeservice/AAudioCommandQueue.h b/services/oboeservice/AAudioCommandQueue.h
index 5f25507..64442a3 100644
--- a/services/oboeservice/AAudioCommandQueue.h
+++ b/services/oboeservice/AAudioCommandQueue.h
@@ -78,6 +78,12 @@
     std::shared_ptr<AAudioCommand> waitForCommand(int64_t timeoutNanos = -1);
 
     /**
+     * Start waiting for commands. Commands can only be pushed into the command queue after it
+     * starts waiting.
+     */
+    void startWaiting();
+
+    /**
      * Force stop waiting for next command
      */
     void stopWaiting();
@@ -87,7 +93,7 @@
     std::condition_variable mWaitWorkCond;
 
     std::queue<std::shared_ptr<AAudioCommand>> mCommands GUARDED_BY(mLock);
-    bool mRunning GUARDED_BY(mLock) = true;
+    bool mRunning GUARDED_BY(mLock) = false;
 };
 
 } // namespace aaudio
\ No newline at end of file
diff --git a/services/oboeservice/AAudioServiceStreamBase.cpp b/services/oboeservice/AAudioServiceStreamBase.cpp
index a25a791..8b5ccaa 100644
--- a/services/oboeservice/AAudioServiceStreamBase.cpp
+++ b/services/oboeservice/AAudioServiceStreamBase.cpp
@@ -52,7 +52,6 @@
         , mAtomicStreamTimestamp()
         , mAudioService(audioService) {
     mMmapClient.attributionSource = AttributionSourceState();
-    mThreadEnabled = true;
 }
 
 AAudioServiceStreamBase::~AAudioServiceStreamBase() {
@@ -178,6 +177,7 @@
 
     // Make sure this object does not get deleted before the run() method
     // can protect it by making a strong pointer.
+    mCommandQueue.startWaiting();
     mThreadEnabled = true;
     incStrong(nullptr); // See run() method.
     result = mCommandThread.start(this);
@@ -188,14 +188,15 @@
     return result;
 
 error:
-    close();
+    closeAndClear();
+    mThreadEnabled = false;
+    mCommandQueue.stopWaiting();
+    mCommandThread.stop();
     return result;
 }
 
 aaudio_result_t AAudioServiceStreamBase::close() {
-    auto command = std::make_shared<AAudioCommand>(
-            CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
-    aaudio_result_t result = mCommandQueue.sendCommand(command);
+    aaudio_result_t result = sendCommand(CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
 
     // Stop the command thread as the stream is closed.
     mThreadEnabled = false;
@@ -213,25 +214,7 @@
     // This will stop the stream, just in case it was not already stopped.
     stop_l();
 
-    aaudio_result_t result = AAUDIO_OK;
-    sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
-    if (endpoint == nullptr) {
-        result = AAUDIO_ERROR_INVALID_STATE;
-    } else {
-        endpoint->unregisterStream(this);
-        AAudioEndpointManager &endpointManager = AAudioEndpointManager::getInstance();
-        endpointManager.closeEndpoint(endpoint);
-
-        // AAudioService::closeStream() prevents two threads from closing at the same time.
-        mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns.
-    }
-
-    setState(AAUDIO_STREAM_STATE_CLOSED);
-
-    mediametrics::LogItem(mMetricsId)
-        .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_CLOSE)
-        .record();
-    return result;
+    return closeAndClear();
 }
 
 aaudio_result_t AAudioServiceStreamBase::startDevice() {
@@ -250,9 +233,7 @@
  * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete.
  */
 aaudio_result_t AAudioServiceStreamBase::start() {
-    auto command = std::make_shared<AAudioCommand>(
-            START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
-    return mCommandQueue.sendCommand(command);
+    return sendCommand(START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
 }
 
 aaudio_result_t AAudioServiceStreamBase::start_l() {
@@ -300,9 +281,7 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::pause() {
-    auto command = std::make_shared<AAudioCommand>(
-            PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
-    return mCommandQueue.sendCommand(command);
+    return sendCommand(PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
 }
 
 aaudio_result_t AAudioServiceStreamBase::pause_l() {
@@ -338,9 +317,7 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::stop() {
-    auto command = std::make_shared<AAudioCommand>(
-            STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
-    return mCommandQueue.sendCommand(command);
+    return sendCommand(STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
 }
 
 aaudio_result_t AAudioServiceStreamBase::stop_l() {
@@ -385,9 +362,7 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::flush() {
-    auto command = std::make_shared<AAudioCommand>(
-            FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
-    return mCommandQueue.sendCommand(command);
+    return sendCommand(FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
 }
 
 aaudio_result_t AAudioServiceStreamBase::flush_l() {
@@ -514,8 +489,7 @@
 }
 
 void AAudioServiceStreamBase::disconnect() {
-    auto command = std::make_shared<AAudioCommand>(DISCONNECT);
-    mCommandQueue.sendCommand(command);
+    sendCommand(DISCONNECT);
 }
 
 void AAudioServiceStreamBase::disconnect_l() {
@@ -533,12 +507,10 @@
 
 aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) {
     const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
-    auto command = std::make_shared<AAudioCommand>(
-            REGISTER_AUDIO_THREAD,
+    return sendCommand(REGISTER_AUDIO_THREAD,
             std::make_shared<RegisterAudioThreadParam>(ownerPid, clientThreadId, priority),
             true /*waitForReply*/,
             TIMEOUT_NANOS);
-    return mCommandQueue.sendCommand(command);
 }
 
 aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l(
@@ -561,12 +533,10 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) {
-    auto command = std::make_shared<AAudioCommand>(
-            UNREGISTER_AUDIO_THREAD,
+    return sendCommand(UNREGISTER_AUDIO_THREAD,
             std::make_shared<UnregisterAudioThreadParam>(clientThreadId),
             true /*waitForReply*/,
             TIMEOUT_NANOS);
-    return mCommandQueue.sendCommand(command);
 }
 
 aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread_l(pid_t clientThreadId) {
@@ -682,12 +652,11 @@
  * used to communicate with the underlying HAL or Service.
  */
 aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) {
-    auto command = std::make_shared<AAudioCommand>(
+    return sendCommand(
             GET_DESCRIPTION,
             std::make_shared<GetDescriptionParam>(&parcelable),
             true /*waitForReply*/,
             TIMEOUT_NANOS);
-    return mCommandQueue.sendCommand(command);
 }
 
 aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) {
@@ -707,3 +676,33 @@
 void AAudioServiceStreamBase::onVolumeChanged(float volume) {
     sendServiceEvent(AAUDIO_SERVICE_EVENT_VOLUME, volume);
 }
+
+aaudio_result_t AAudioServiceStreamBase::sendCommand(aaudio_command_opcode opCode,
+                                                     std::shared_ptr<AAudioCommandParam> param,
+                                                     bool waitForReply,
+                                                     int64_t timeoutNanos) {
+    return mCommandQueue.sendCommand(std::make_shared<AAudioCommand>(
+            opCode, param, waitForReply, timeoutNanos));
+}
+
+aaudio_result_t AAudioServiceStreamBase::closeAndClear() {
+    aaudio_result_t result = AAUDIO_OK;
+    sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
+    if (endpoint == nullptr) {
+        result = AAUDIO_ERROR_INVALID_STATE;
+    } else {
+        endpoint->unregisterStream(this);
+        AAudioEndpointManager &endpointManager = AAudioEndpointManager::getInstance();
+        endpointManager.closeEndpoint(endpoint);
+
+        // AAudioService::closeStream() prevents two threads from closing at the same time.
+        mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns.
+    }
+
+    setState(AAUDIO_STREAM_STATE_CLOSED);
+
+    mediametrics::LogItem(mMetricsId)
+        .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_CLOSE)
+        .record();
+    return result;
+}
diff --git a/services/oboeservice/AAudioServiceStreamBase.h b/services/oboeservice/AAudioServiceStreamBase.h
index aa8e8cf..dddd69f 100644
--- a/services/oboeservice/AAudioServiceStreamBase.h
+++ b/services/oboeservice/AAudioServiceStreamBase.h
@@ -366,6 +366,13 @@
     aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
                                      double dataDouble);
 
+    aaudio_result_t sendCommand(aaudio_command_opcode opCode,
+                                std::shared_ptr<AAudioCommandParam> param = nullptr,
+                                bool waitForReply = false,
+                                int64_t timeoutNanos = 0);
+
+    aaudio_result_t closeAndClear();
+
     /**
      * @return true if the queue is getting full.
      */