Add AAudioCommandQueue in aaudio service.

With AAudioCommandQueue, all the requests to aaudio service stream can
be turned into command and executed one by one. This can help reduce
multiple thread issue in aaudio service.

Test: atest AAudioTest
Test: run oboetester
Bug: 201000721
Change-Id: I81134e036d8921d327feb029d8fda725ce65d080
diff --git a/services/oboeservice/AAudioCommandQueue.cpp b/services/oboeservice/AAudioCommandQueue.cpp
new file mode 100644
index 0000000..ddaabe8
--- /dev/null
+++ b/services/oboeservice/AAudioCommandQueue.cpp
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "AAudioCommandQueue"
+//#define LOG_NDEBUG 0
+
+#include <chrono>
+
+#include <utils/Log.h>
+
+#include "AAudioCommandQueue.h"
+
+namespace aaudio {
+
+aaudio_result_t AAudioCommandQueue::sendCommand(std::shared_ptr<AAudioCommand> command) {
+    {
+        std::scoped_lock<std::mutex> _l(mLock);
+        mCommands.push(command);
+        mWaitWorkCond.notify_one();
+    }
+
+    std::unique_lock _cl(command->lock);
+    android::base::ScopedLockAssertion lockAssertion(command->lock);
+    ALOGV("Sending command %d, wait for reply(%d) with timeout %jd",
+           command->operationCode, command->isWaitingForReply, command->timeoutNanoseconds);
+    // `mWaitForReply` is first initialized when the command is constructed. It will be flipped
+    // when the command is completed.
+    auto timeoutExpire = std::chrono::steady_clock::now()
+            + std::chrono::nanoseconds(command->timeoutNanoseconds);
+    while (command->isWaitingForReply) {
+        if (command->conditionVariable.wait_until(_cl, timeoutExpire)
+                == std::cv_status::timeout) {
+            ALOGD("Command %d time out", command->operationCode);
+            command->result = AAUDIO_ERROR_TIMEOUT;
+            command->isWaitingForReply = false;
+        }
+    }
+    ALOGV("Command %d sent with result as %d", command->operationCode, command->result);
+    return command->result;
+}
+
+std::shared_ptr<AAudioCommand> AAudioCommandQueue::waitForCommand(int64_t timeoutNanos) {
+    std::shared_ptr<AAudioCommand> command;
+    {
+        std::unique_lock _l(mLock);
+        android::base::ScopedLockAssertion lockAssertion(mLock);
+        if (timeoutNanos >= 0) {
+            mWaitWorkCond.wait_for(_l, std::chrono::nanoseconds(timeoutNanos), [this]() {
+                android::base::ScopedLockAssertion lockAssertion(mLock);
+                return !mRunning || !mCommands.empty();
+            });
+        } else {
+            mWaitWorkCond.wait(_l, [this]() {
+                android::base::ScopedLockAssertion lockAssertion(mLock);
+                return !mRunning || !mCommands.empty();
+            });
+        }
+        if (!mCommands.empty()) {
+            command = mCommands.front();
+            mCommands.pop();
+        }
+    }
+    return command;
+}
+
+void AAudioCommandQueue::stopWaiting() {
+    std::scoped_lock<std::mutex> _l(mLock);
+    mRunning = false;
+    mWaitWorkCond.notify_one();
+}
+
+} // namespace aaudio
\ No newline at end of file
diff --git a/services/oboeservice/AAudioCommandQueue.h b/services/oboeservice/AAudioCommandQueue.h
new file mode 100644
index 0000000..5f25507
--- /dev/null
+++ b/services/oboeservice/AAudioCommandQueue.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include <aaudio/AAudio.h>
+#include <android-base/thread_annotations.h>
+
+namespace aaudio {
+
+typedef int32_t aaudio_command_opcode;
+
+class AAudioCommandParam {
+public:
+    AAudioCommandParam() = default;
+    virtual ~AAudioCommandParam() = default;
+};
+
+class AAudioCommand {
+public:
+    explicit AAudioCommand(
+            aaudio_command_opcode opCode, std::shared_ptr<AAudioCommandParam> param = nullptr,
+            bool waitForReply = false, int64_t timeoutNanos = 0)
+            : operationCode(opCode), parameter(param), isWaitingForReply(waitForReply),
+              timeoutNanoseconds(timeoutNanos) { }
+    virtual ~AAudioCommand() = default;
+
+    std::mutex lock;
+    std::condition_variable conditionVariable;
+
+    const aaudio_command_opcode operationCode;
+    std::shared_ptr<AAudioCommandParam> parameter;
+    bool isWaitingForReply GUARDED_BY(lock);
+    const int64_t timeoutNanoseconds;
+    aaudio_result_t result GUARDED_BY(lock) = AAUDIO_OK;
+};
+
+class AAudioCommandQueue {
+public:
+    AAudioCommandQueue() = default;
+    ~AAudioCommandQueue() = default;
+
+    /**
+     * Send a command to the command queue. The return will be waiting for a specified timeout
+     * period indicated by the command if it is required.
+     *
+     * @param command the command to send to the command queue.
+     * @return the result of sending the command or the result of executing the command if command
+     *         need to wait for a reply. If timeout happens, AAUDIO_ERROR_TIMEOUT will be returned.
+     */
+    aaudio_result_t sendCommand(std::shared_ptr<AAudioCommand> command);
+
+    /**
+     * Wait for next available command OR until the timeout is expired.
+     *
+     * @param timeoutNanos the maximum time to wait for next command (0 means return immediately in
+     *                     any case), negative to wait forever.
+     * @return the next available command if any or a nullptr when there is none.
+     */
+    std::shared_ptr<AAudioCommand> waitForCommand(int64_t timeoutNanos = -1);
+
+    /**
+     * Force stop waiting for next command
+     */
+    void stopWaiting();
+
+private:
+    std::mutex mLock;
+    std::condition_variable mWaitWorkCond;
+
+    std::queue<std::shared_ptr<AAudioCommand>> mCommands GUARDED_BY(mLock);
+    bool mRunning GUARDED_BY(mLock) = true;
+};
+
+} // namespace aaudio
\ No newline at end of file
diff --git a/services/oboeservice/AAudioServiceEndpointMMAP.cpp b/services/oboeservice/AAudioServiceEndpointMMAP.cpp
index 35a0890..42b07ae 100644
--- a/services/oboeservice/AAudioServiceEndpointMMAP.cpp
+++ b/services/oboeservice/AAudioServiceEndpointMMAP.cpp
@@ -401,16 +401,17 @@
 /**
  * Get an immutable description of the data queue from the HAL.
  */
-aaudio_result_t AAudioServiceEndpointMMAP::getDownDataDescription(AudioEndpointParcelable &parcelable)
+aaudio_result_t AAudioServiceEndpointMMAP::getDownDataDescription(
+        AudioEndpointParcelable* parcelable)
 {
     // Gather information on the data queue based on HAL info.
     int32_t bytesPerFrame = calculateBytesPerFrame();
     int32_t capacityInBytes = getBufferCapacity() * bytesPerFrame;
-    int fdIndex = parcelable.addFileDescriptor(mAudioDataFileDescriptor, capacityInBytes);
-    parcelable.mDownDataQueueParcelable.setupMemory(fdIndex, 0, capacityInBytes);
-    parcelable.mDownDataQueueParcelable.setBytesPerFrame(bytesPerFrame);
-    parcelable.mDownDataQueueParcelable.setFramesPerBurst(mFramesPerBurst);
-    parcelable.mDownDataQueueParcelable.setCapacityInFrames(getBufferCapacity());
+    int fdIndex = parcelable->addFileDescriptor(mAudioDataFileDescriptor, capacityInBytes);
+    parcelable->mDownDataQueueParcelable.setupMemory(fdIndex, 0, capacityInBytes);
+    parcelable->mDownDataQueueParcelable.setBytesPerFrame(bytesPerFrame);
+    parcelable->mDownDataQueueParcelable.setFramesPerBurst(mFramesPerBurst);
+    parcelable->mDownDataQueueParcelable.setCapacityInFrames(getBufferCapacity());
     return AAUDIO_OK;
 }
 
diff --git a/services/oboeservice/AAudioServiceEndpointMMAP.h b/services/oboeservice/AAudioServiceEndpointMMAP.h
index 5a53885..ddfac63 100644
--- a/services/oboeservice/AAudioServiceEndpointMMAP.h
+++ b/services/oboeservice/AAudioServiceEndpointMMAP.h
@@ -79,7 +79,7 @@
     void onRoutingChanged(audio_port_handle_t portHandle) override;
     // ------------------------------------------------------------------------------
 
-    aaudio_result_t getDownDataDescription(AudioEndpointParcelable &parcelable);
+    aaudio_result_t getDownDataDescription(AudioEndpointParcelable* parcelable);
 
     int64_t getHardwareTimeOffsetNanos() const {
         return mHardwareTimeOffsetNanos;
diff --git a/services/oboeservice/AAudioServiceStreamBase.cpp b/services/oboeservice/AAudioServiceStreamBase.cpp
index 4ffc127..a25a791 100644
--- a/services/oboeservice/AAudioServiceStreamBase.cpp
+++ b/services/oboeservice/AAudioServiceStreamBase.cpp
@@ -34,23 +34,25 @@
 #include "AAudioService.h"
 #include "AAudioServiceEndpoint.h"
 #include "AAudioServiceStreamBase.h"
-#include "TimestampScheduler.h"
 
 using namespace android;  // TODO just import names needed
 using namespace aaudio;   // TODO just import names needed
 
 using content::AttributionSourceState;
 
+static const int64_t TIMEOUT_NANOS = 3LL * 1000 * 1000 * 1000;
+
 /**
  * Base class for streams in the service.
  * @return
  */
 
 AAudioServiceStreamBase::AAudioServiceStreamBase(AAudioService &audioService)
-        : mTimestampThread("AATime")
+        : mCommandThread("AACommand")
         , mAtomicStreamTimestamp()
         , mAudioService(audioService) {
     mMmapClient.attributionSource = AttributionSourceState();
+    mThreadEnabled = true;
 }
 
 AAudioServiceStreamBase::~AAudioServiceStreamBase() {
@@ -70,6 +72,13 @@
                         || getState() == AAUDIO_STREAM_STATE_UNINITIALIZED),
                         "service stream %p still open, state = %d",
                         this, getState());
+
+    // Stop the command thread before destroying.
+    if (mThreadEnabled) {
+        mThreadEnabled = false;
+        mCommandQueue.stopWaiting();
+        mCommandThread.stop();
+    }
 }
 
 std::string AAudioServiceStreamBase::dumpHeader() {
@@ -166,6 +175,16 @@
         mFramesPerBurst = mServiceEndpoint->getFramesPerBurst();
         copyFrom(*mServiceEndpoint);
     }
+
+    // Make sure this object does not get deleted before the run() method
+    // can protect it by making a strong pointer.
+    mThreadEnabled = true;
+    incStrong(nullptr); // See run() method.
+    result = mCommandThread.start(this);
+    if (result != AAUDIO_OK) {
+        decStrong(nullptr); // run() can't do it so we have to do it here.
+        goto error;
+    }
     return result;
 
 error:
@@ -174,8 +193,16 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::close() {
-    std::lock_guard<std::mutex> lock(mLock);
-    return close_l();
+    auto command = std::make_shared<AAudioCommand>(
+            CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+    aaudio_result_t result = mCommandQueue.sendCommand(command);
+
+    // Stop the command thread as the stream is closed.
+    mThreadEnabled = false;
+    mCommandQueue.stopWaiting();
+    mCommandThread.stop();
+
+    return result;
 }
 
 aaudio_result_t AAudioServiceStreamBase::close_l() {
@@ -183,8 +210,7 @@
         return AAUDIO_OK;
     }
 
-    // This will call stopTimestampThread() and also stop the stream,
-    // just in case it was not already stopped.
+    // This will stop the stream, just in case it was not already stopped.
     stop_l();
 
     aaudio_result_t result = AAUDIO_OK;
@@ -224,8 +250,12 @@
  * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete.
  */
 aaudio_result_t AAudioServiceStreamBase::start() {
-    std::lock_guard<std::mutex> lock(mLock);
+    auto command = std::make_shared<AAudioCommand>(
+            START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+    return mCommandQueue.sendCommand(command);
+}
 
+aaudio_result_t AAudioServiceStreamBase::start_l() {
     const int64_t beginNs = AudioClock::getNanoseconds();
     aaudio_result_t result = AAUDIO_OK;
 
@@ -261,15 +291,6 @@
     // This should happen at the end of the start.
     sendServiceEvent(AAUDIO_SERVICE_EVENT_STARTED);
     setState(AAUDIO_STREAM_STATE_STARTED);
-    mThreadEnabled.store(true);
-    // Make sure this object does not get deleted before the run() method
-    // can protect it by making a strong pointer.
-    incStrong(nullptr); // See run() method.
-    result = mTimestampThread.start(this);
-    if (result != AAUDIO_OK) {
-        decStrong(nullptr); // run() can't do it so we have to do it here.
-        goto error;
-    }
 
     return result;
 
@@ -279,8 +300,9 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::pause() {
-    std::lock_guard<std::mutex> lock(mLock);
-    return pause_l();
+    auto command = std::make_shared<AAudioCommand>(
+            PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+    return mCommandQueue.sendCommand(command);
 }
 
 aaudio_result_t AAudioServiceStreamBase::pause_l() {
@@ -298,12 +320,6 @@
             .set(AMEDIAMETRICS_PROP_STATUS, (int32_t)result)
             .record(); });
 
-    result = stopTimestampThread();
-    if (result != AAUDIO_OK) {
-        disconnect_l();
-        return result;
-    }
-
     sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
     if (endpoint == nullptr) {
         ALOGE("%s() has no endpoint", __func__);
@@ -322,8 +338,9 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::stop() {
-    std::lock_guard<std::mutex> lock(mLock);
-    return stop_l();
+    auto command = std::make_shared<AAudioCommand>(
+            STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+    return mCommandQueue.sendCommand(command);
 }
 
 aaudio_result_t AAudioServiceStreamBase::stop_l() {
@@ -343,12 +360,6 @@
 
     setState(AAUDIO_STREAM_STATE_STOPPING);
 
-    // Temporarily unlock because we are joining the timestamp thread and it may try
-    // to acquire mLock.
-    mLock.unlock();
-    result = stopTimestampThread();
-    mLock.lock();
-
     if (result != AAUDIO_OK) {
         disconnect_l();
         return result;
@@ -373,17 +384,13 @@
     return result;
 }
 
-aaudio_result_t AAudioServiceStreamBase::stopTimestampThread() {
-    aaudio_result_t result = AAUDIO_OK;
-    // clear flag that tells thread to loop
-    if (mThreadEnabled.exchange(false)) {
-        result = mTimestampThread.stop();
-    }
-    return result;
+aaudio_result_t AAudioServiceStreamBase::flush() {
+    auto command = std::make_shared<AAudioCommand>(
+            FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
+    return mCommandQueue.sendCommand(command);
 }
 
-aaudio_result_t AAudioServiceStreamBase::flush() {
-    std::lock_guard<std::mutex> lock(mLock);
+aaudio_result_t AAudioServiceStreamBase::flush_l() {
     aaudio_result_t result = AAudio_isFlushAllowed(getState());
     if (result != AAUDIO_OK) {
         return result;
@@ -404,48 +411,111 @@
     return AAUDIO_OK;
 }
 
-// implement Runnable, periodically send timestamps to client
+// implement Runnable, periodically send timestamps to client and process commands from queue.
 __attribute__((no_sanitize("integer")))
 void AAudioServiceStreamBase::run() {
-    ALOGD("%s() %s entering >>>>>>>>>>>>>> TIMESTAMPS", __func__, getTypeText());
+    ALOGD("%s() %s entering >>>>>>>>>>>>>> COMMANDS", __func__, getTypeText());
     // Hold onto the ref counted stream until the end.
     android::sp<AAudioServiceStreamBase> holdStream(this);
     TimestampScheduler timestampScheduler;
+    int64_t nextTime;
     // Balance the incStrong from when the thread was launched.
     holdStream->decStrong(nullptr);
 
-    timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate());
-    timestampScheduler.start(AudioClock::getNanoseconds());
-    int64_t nextTime = timestampScheduler.nextAbsoluteTime();
+    // Taking mLock while starting the thread. All the operation must be able to
+    // run with holding the lock.
+    std::scoped_lock<std::mutex> _l(mLock);
+
     int32_t loopCount = 0;
-    aaudio_result_t result = AAUDIO_OK;
-    while(mThreadEnabled.load()) {
+    while (mThreadEnabled.load()) {
         loopCount++;
-        if (AudioClock::getNanoseconds() >= nextTime) {
-            result = sendCurrentTimestamp();
-            if (result != AAUDIO_OK) {
-                ALOGE("%s() timestamp thread got result = %d", __func__, result);
-                break;
+        int64_t timeoutNanos = -1;
+        if (isRunning()) {
+            timeoutNanos = nextTime - AudioClock::getNanoseconds();
+            timeoutNanos = std::max<int64_t>(0, timeoutNanos);
+        }
+
+        auto command = mCommandQueue.waitForCommand(timeoutNanos);
+        if (!mThreadEnabled) {
+            // Break the loop if the thread is disabled.
+            break;
+        }
+
+        if (isRunning() && AudioClock::getNanoseconds() >= nextTime) {
+            // It is time to update timestamp.
+            if (sendCurrentTimestamp_l() != AAUDIO_OK) {
+                ALOGE("Failed to send current timestamp, stop updating timestamp");
+                disconnect_l();
+            } else {
+                nextTime = timestampScheduler.nextAbsoluteTime();
             }
-            nextTime = timestampScheduler.nextAbsoluteTime();
-        } else  {
-            // Sleep until it is time to send the next timestamp.
-            // TODO Wait for a signal with a timeout so that we can stop more quickly.
-            AudioClock::sleepUntilNanoTime(nextTime);
+        }
+
+        if (command != nullptr) {
+            std::scoped_lock<std::mutex> _commandLock(command->lock);
+            switch (command->operationCode) {
+                case START:
+                    command->result = start_l();
+                    timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate());
+                    timestampScheduler.start(AudioClock::getNanoseconds());
+                    nextTime = timestampScheduler.nextAbsoluteTime();
+                    break;
+                case PAUSE:
+                    command->result = pause_l();
+                    break;
+                case STOP:
+                    command->result = stop_l();
+                    break;
+                case FLUSH:
+                    command->result = flush_l();
+                    break;
+                case CLOSE:
+                    command->result = close_l();
+                    break;
+                case DISCONNECT:
+                    disconnect_l();
+                    break;
+                case REGISTER_AUDIO_THREAD: {
+                    RegisterAudioThreadParam *param =
+                            (RegisterAudioThreadParam *) command->parameter.get();
+                    command->result =
+                            param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT
+                                             : registerAudioThread_l(param->mOwnerPid,
+                                                                     param->mClientThreadId,
+                                                                     param->mPriority);
+                }
+                    break;
+                case UNREGISTER_AUDIO_THREAD: {
+                    UnregisterAudioThreadParam *param =
+                            (UnregisterAudioThreadParam *) command->parameter.get();
+                    command->result =
+                            param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT
+                                             : unregisterAudioThread_l(param->mClientThreadId);
+                }
+                    break;
+                case GET_DESCRIPTION: {
+                    GetDescriptionParam *param = (GetDescriptionParam *) command->parameter.get();
+                    command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT
+                                                        : getDescription_l(param->mParcelable);
+                }
+                    break;
+                default:
+                    ALOGE("Invalid command op code: %d", command->operationCode);
+                    break;
+            }
+            if (command->isWaitingForReply) {
+                command->isWaitingForReply = false;
+                command->conditionVariable.notify_one();
+            }
         }
     }
-    // This was moved from the calls in stop_l() and pause_l(), which could cause a deadlock
-    // if it resulted in a call to disconnect.
-    if (result == AAUDIO_OK) {
-        (void) sendCurrentTimestamp();
-    }
-    ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< TIMESTAMPS",
+    ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< COMMANDS",
           __func__, getTypeText(), loopCount);
 }
 
 void AAudioServiceStreamBase::disconnect() {
-    std::lock_guard<std::mutex> lock(mLock);
-    disconnect_l();
+    auto command = std::make_shared<AAudioCommand>(DISCONNECT);
+    mCommandQueue.sendCommand(command);
 }
 
 void AAudioServiceStreamBase::disconnect_l() {
@@ -461,15 +531,23 @@
     }
 }
 
-aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId,
-        int priority) {
-    std::lock_guard<std::mutex> lock(mLock);
+aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) {
+    const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
+    auto command = std::make_shared<AAudioCommand>(
+            REGISTER_AUDIO_THREAD,
+            std::make_shared<RegisterAudioThreadParam>(ownerPid, clientThreadId, priority),
+            true /*waitForReply*/,
+            TIMEOUT_NANOS);
+    return mCommandQueue.sendCommand(command);
+}
+
+aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l(
+        pid_t ownerPid, pid_t clientThreadId, int priority) {
     aaudio_result_t result = AAUDIO_OK;
     if (getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) {
         ALOGE("AAudioService::registerAudioThread(), thread already registered");
         result = AAUDIO_ERROR_INVALID_STATE;
     } else {
-        const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
         setRegisteredThread(clientThreadId);
         int err = android::requestPriority(ownerPid, clientThreadId,
                                            priority, true /* isForApp */);
@@ -483,7 +561,15 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) {
-    std::lock_guard<std::mutex> lock(mLock);
+    auto command = std::make_shared<AAudioCommand>(
+            UNREGISTER_AUDIO_THREAD,
+            std::make_shared<UnregisterAudioThreadParam>(clientThreadId),
+            true /*waitForReply*/,
+            TIMEOUT_NANOS);
+    return mCommandQueue.sendCommand(command);
+}
+
+aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread_l(pid_t clientThreadId) {
     aaudio_result_t result = AAUDIO_OK;
     if (getRegisteredThread() != clientThreadId) {
         ALOGE("%s(), wrong thread", __func__);
@@ -552,7 +638,7 @@
     return sendServiceEvent(AAUDIO_SERVICE_EVENT_XRUN, (int64_t) xRunCount);
 }
 
-aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() {
+aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp_l() {
     AAudioServiceMessage command;
     // It is not worth filling up the queue with timestamps.
     // That can cause the stream to get suspended.
@@ -562,8 +648,8 @@
     }
 
     // Send a timestamp for the clock model.
-    aaudio_result_t result = getFreeRunningPosition(&command.timestamp.position,
-                                                    &command.timestamp.timestamp);
+    aaudio_result_t result = getFreeRunningPosition_l(&command.timestamp.position,
+                                                      &command.timestamp.timestamp);
     if (result == AAUDIO_OK) {
         ALOGV("%s() SERVICE  %8lld at %lld", __func__,
               (long long) command.timestamp.position,
@@ -573,8 +659,8 @@
 
         if (result == AAUDIO_OK) {
             // Send a hardware timestamp for presentation time.
-            result = getHardwareTimestamp(&command.timestamp.position,
-                                          &command.timestamp.timestamp);
+            result = getHardwareTimestamp_l(&command.timestamp.position,
+                                            &command.timestamp.timestamp);
             if (result == AAUDIO_OK) {
                 ALOGV("%s() HARDWARE %8lld at %lld", __func__,
                       (long long) command.timestamp.position,
@@ -596,7 +682,15 @@
  * used to communicate with the underlying HAL or Service.
  */
 aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) {
-    std::lock_guard<std::mutex> lock(mLock);
+    auto command = std::make_shared<AAudioCommand>(
+            GET_DESCRIPTION,
+            std::make_shared<GetDescriptionParam>(&parcelable),
+            true /*waitForReply*/,
+            TIMEOUT_NANOS);
+    return mCommandQueue.sendCommand(command);
+}
+
+aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) {
     {
         std::lock_guard<std::mutex> lock(mUpMessageQueueLock);
         if (mUpMessageQueue == nullptr) {
@@ -605,9 +699,9 @@
         }
         // Gather information on the message queue.
         mUpMessageQueue->fillParcelable(parcelable,
-                                        parcelable.mUpMessageQueueParcelable);
+                                        parcelable->mUpMessageQueueParcelable);
     }
-    return getAudioDataDescription(parcelable);
+    return getAudioDataDescription_l(parcelable);
 }
 
 void AAudioServiceStreamBase::onVolumeChanged(float volume) {
diff --git a/services/oboeservice/AAudioServiceStreamBase.h b/services/oboeservice/AAudioServiceStreamBase.h
index 976996d..aa8e8cf 100644
--- a/services/oboeservice/AAudioServiceStreamBase.h
+++ b/services/oboeservice/AAudioServiceStreamBase.h
@@ -33,8 +33,10 @@
 #include "utility/AAudioUtilities.h"
 #include "utility/AudioClock.h"
 
-#include "SharedRingBuffer.h"
+#include "AAudioCommandQueue.h"
 #include "AAudioThread.h"
+#include "SharedRingBuffer.h"
+#include "TimestampScheduler.h"
 
 namespace android {
     class AAudioService;
@@ -235,10 +237,46 @@
     aaudio_result_t open(const aaudio::AAudioStreamRequest &request,
                          aaudio_sharing_mode_t sharingMode);
 
+    aaudio_result_t start_l() REQUIRES(mLock);
     virtual aaudio_result_t close_l() REQUIRES(mLock);
     virtual aaudio_result_t pause_l() REQUIRES(mLock);
     virtual aaudio_result_t stop_l() REQUIRES(mLock);
     void disconnect_l() REQUIRES(mLock);
+    aaudio_result_t flush_l() REQUIRES(mLock);
+
+    class RegisterAudioThreadParam : public AAudioCommandParam {
+    public:
+        RegisterAudioThreadParam(pid_t ownerPid, pid_t clientThreadId, int priority)
+                : AAudioCommandParam(), mOwnerPid(ownerPid),
+                  mClientThreadId(clientThreadId), mPriority(priority) { }
+        ~RegisterAudioThreadParam() = default;
+
+        pid_t mOwnerPid;
+        pid_t mClientThreadId;
+        int mPriority;
+    };
+    aaudio_result_t registerAudioThread_l(
+            pid_t ownerPid, pid_t clientThreadId, int priority) REQUIRES(mLock);
+
+    class UnregisterAudioThreadParam : public AAudioCommandParam {
+    public:
+        UnregisterAudioThreadParam(pid_t clientThreadId)
+                : AAudioCommandParam(), mClientThreadId(clientThreadId) { }
+        ~UnregisterAudioThreadParam() = default;
+
+        pid_t mClientThreadId;
+    };
+    aaudio_result_t unregisterAudioThread_l(pid_t clientThreadId) REQUIRES(mLock);
+
+    class GetDescriptionParam : public AAudioCommandParam {
+    public:
+        GetDescriptionParam(AudioEndpointParcelable* parcelable)
+                : AAudioCommandParam(), mParcelable(parcelable) { }
+        ~GetDescriptionParam() = default;
+
+        AudioEndpointParcelable* mParcelable;
+    };
+    aaudio_result_t getDescription_l(AudioEndpointParcelable* parcelable) REQUIRES(mLock);
 
     void setState(aaudio_stream_state_t state);
 
@@ -250,7 +288,7 @@
 
     aaudio_result_t writeUpMessageQueue(AAudioServiceMessage *command);
 
-    aaudio_result_t sendCurrentTimestamp() EXCLUDES(mLock);
+    aaudio_result_t sendCurrentTimestamp_l() REQUIRES(mLock);
 
     aaudio_result_t sendXRunCount(int32_t xRunCount);
 
@@ -259,11 +297,13 @@
      * @param timeNanos
      * @return AAUDIO_OK or AAUDIO_ERROR_UNAVAILABLE or other negative error
      */
-    virtual aaudio_result_t getFreeRunningPosition(int64_t *positionFrames, int64_t *timeNanos) = 0;
+    virtual aaudio_result_t getFreeRunningPosition_l(
+            int64_t *positionFrames, int64_t *timeNanos) = 0;
 
-    virtual aaudio_result_t getHardwareTimestamp(int64_t *positionFrames, int64_t *timeNanos) = 0;
+    virtual aaudio_result_t getHardwareTimestamp_l(int64_t *positionFrames, int64_t *timeNanos) = 0;
 
-    virtual aaudio_result_t getAudioDataDescription(AudioEndpointParcelable &parcelable) = 0;
+    virtual aaudio_result_t getAudioDataDescription_l(AudioEndpointParcelable* parcelable) = 0;
+
 
     aaudio_stream_state_t   mState = AAUDIO_STREAM_STATE_UNINITIALIZED;
 
@@ -279,9 +319,20 @@
     std::mutex              mUpMessageQueueLock;
     std::shared_ptr<SharedRingBuffer> mUpMessageQueue;
 
-    AAudioThread            mTimestampThread;
-    // This is used by one thread to tell another thread to exit. So it must be atomic.
+    enum : int32_t {
+        START,
+        PAUSE,
+        STOP,
+        FLUSH,
+        CLOSE,
+        DISCONNECT,
+        REGISTER_AUDIO_THREAD,
+        UNREGISTER_AUDIO_THREAD,
+        GET_DESCRIPTION,
+    };
+    AAudioThread            mCommandThread;
     std::atomic<bool>       mThreadEnabled{false};
+    AAudioCommandQueue      mCommandQueue;
 
     int32_t                 mFramesPerBurst = 0;
     android::AudioClient    mMmapClient; // set in open, used in MMAP start()
@@ -336,6 +387,8 @@
 protected:
     // Locking order is important.
     // Acquire mLock before acquiring AAudioServiceEndpoint::mLockStreams
+    // The lock will be held by the command thread. All operations needing the lock must run from
+    // the command thread.
     std::mutex              mLock; // Prevent start/stop/close etcetera from colliding
 };
 
diff --git a/services/oboeservice/AAudioServiceStreamMMAP.cpp b/services/oboeservice/AAudioServiceStreamMMAP.cpp
index 57dc1ab..05b7f7d 100644
--- a/services/oboeservice/AAudioServiceStreamMMAP.cpp
+++ b/services/oboeservice/AAudioServiceStreamMMAP.cpp
@@ -141,7 +141,7 @@
 }
 
 // Get free-running DSP or DMA hardware position from the HAL.
-aaudio_result_t AAudioServiceStreamMMAP::getFreeRunningPosition(int64_t *positionFrames,
+aaudio_result_t AAudioServiceStreamMMAP::getFreeRunningPosition_l(int64_t *positionFrames,
                                                                   int64_t *timeNanos) {
     sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
     if (endpoint == nullptr) {
@@ -158,14 +158,14 @@
         *positionFrames = timestamp.getPosition();
         *timeNanos = timestamp.getNanoseconds();
     } else if (result != AAUDIO_ERROR_UNAVAILABLE) {
-        disconnect();
+        disconnect_l();
     }
     return result;
 }
 
 // Get timestamp from presentation position.
 // If it fails, get timestamp that was written by getFreeRunningPosition()
-aaudio_result_t AAudioServiceStreamMMAP::getHardwareTimestamp(int64_t *positionFrames,
+aaudio_result_t AAudioServiceStreamMMAP::getHardwareTimestamp_l(int64_t *positionFrames,
                                                                 int64_t *timeNanos) {
 
     sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
@@ -198,8 +198,8 @@
 }
 
 // Get an immutable description of the data queue from the HAL.
-aaudio_result_t AAudioServiceStreamMMAP::getAudioDataDescription(
-        AudioEndpointParcelable &parcelable)
+aaudio_result_t AAudioServiceStreamMMAP::getAudioDataDescription_l(
+        AudioEndpointParcelable* parcelable)
 {
     sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
     if (endpoint == nullptr) {
diff --git a/services/oboeservice/AAudioServiceStreamMMAP.h b/services/oboeservice/AAudioServiceStreamMMAP.h
index 667465a..28da120 100644
--- a/services/oboeservice/AAudioServiceStreamMMAP.h
+++ b/services/oboeservice/AAudioServiceStreamMMAP.h
@@ -71,12 +71,14 @@
 
     aaudio_result_t stop_l() REQUIRES(mLock) override;
 
-    aaudio_result_t getAudioDataDescription(AudioEndpointParcelable &parcelable) override;
+    aaudio_result_t getAudioDataDescription_l(
+            AudioEndpointParcelable* parcelable) REQUIRES(mLock) override;
 
-    aaudio_result_t getFreeRunningPosition(int64_t *positionFrames,
-            int64_t *timeNanos) EXCLUDES(mLock) override;
+    aaudio_result_t getFreeRunningPosition_l(int64_t *positionFrames,
+            int64_t *timeNanos) REQUIRES(mLock) override;
 
-    aaudio_result_t getHardwareTimestamp(int64_t *positionFrames, int64_t *timeNanos) override;
+    aaudio_result_t getHardwareTimestamp_l(
+            int64_t *positionFrames, int64_t *timeNanos) REQUIRES(mLock) override;
 
     /**
      * Device specific startup.
diff --git a/services/oboeservice/AAudioServiceStreamShared.cpp b/services/oboeservice/AAudioServiceStreamShared.cpp
index ad06d97..04fcd6d 100644
--- a/services/oboeservice/AAudioServiceStreamShared.cpp
+++ b/services/oboeservice/AAudioServiceStreamShared.cpp
@@ -211,8 +211,8 @@
 /**
  * Get an immutable description of the data queue created by this service.
  */
-aaudio_result_t AAudioServiceStreamShared::getAudioDataDescription(
-        AudioEndpointParcelable &parcelable)
+aaudio_result_t AAudioServiceStreamShared::getAudioDataDescription_l(
+        AudioEndpointParcelable* parcelable)
 {
     std::lock_guard<std::mutex> lock(audioDataQueueLock);
     if (mAudioDataQueue == nullptr) {
@@ -221,8 +221,8 @@
     }
     // Gather information on the data queue.
     mAudioDataQueue->fillParcelable(parcelable,
-                                    parcelable.mDownDataQueueParcelable);
-    parcelable.mDownDataQueueParcelable.setFramesPerBurst(getFramesPerBurst());
+                                    parcelable->mDownDataQueueParcelable);
+    parcelable->mDownDataQueueParcelable.setFramesPerBurst(getFramesPerBurst());
     return AAUDIO_OK;
 }
 
@@ -231,8 +231,8 @@
 }
 
 // Get timestamp that was written by mixer or distributor.
-aaudio_result_t AAudioServiceStreamShared::getFreeRunningPosition(int64_t *positionFrames,
-                                                                  int64_t *timeNanos) {
+aaudio_result_t AAudioServiceStreamShared::getFreeRunningPosition_l(int64_t *positionFrames,
+                                                                    int64_t *timeNanos) {
     // TODO Get presentation timestamp from the HAL
     if (mAtomicStreamTimestamp.isValid()) {
         Timestamp timestamp = mAtomicStreamTimestamp.read();
@@ -245,8 +245,8 @@
 }
 
 // Get timestamp from lower level service.
-aaudio_result_t AAudioServiceStreamShared::getHardwareTimestamp(int64_t *positionFrames,
-                                                                int64_t *timeNanos) {
+aaudio_result_t AAudioServiceStreamShared::getHardwareTimestamp_l(int64_t *positionFrames,
+                                                                  int64_t *timeNanos) {
 
     int64_t position = 0;
     sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
diff --git a/services/oboeservice/AAudioServiceStreamShared.h b/services/oboeservice/AAudioServiceStreamShared.h
index 4fae5b4..78f9787 100644
--- a/services/oboeservice/AAudioServiceStreamShared.h
+++ b/services/oboeservice/AAudioServiceStreamShared.h
@@ -88,11 +88,14 @@
 
 protected:
 
-    aaudio_result_t getAudioDataDescription(AudioEndpointParcelable &parcelable) override;
+    aaudio_result_t getAudioDataDescription_l(
+            AudioEndpointParcelable* parcelable) REQUIRES(mLock) override;
 
-    aaudio_result_t getFreeRunningPosition(int64_t *positionFrames, int64_t *timeNanos) override;
+    aaudio_result_t getFreeRunningPosition_l(
+            int64_t *positionFrames, int64_t *timeNanos) REQUIRES(mLock) override;
 
-    aaudio_result_t getHardwareTimestamp(int64_t *positionFrames, int64_t *timeNanos) override;
+    aaudio_result_t getHardwareTimestamp_l(
+            int64_t *positionFrames, int64_t *timeNanos) REQUIRES(mLock) override;
 
     /**
      * @param requestedCapacityFrames
diff --git a/services/oboeservice/AAudioThread.cpp b/services/oboeservice/AAudioThread.cpp
index 68496ac..549fa59 100644
--- a/services/oboeservice/AAudioThread.cpp
+++ b/services/oboeservice/AAudioThread.cpp
@@ -16,9 +16,10 @@
 
 #define LOG_TAG "AAudioThread"
 //#define LOG_NDEBUG 0
-#include <utils/Log.h>
 
-#include <pthread.h>
+#include <system_error>
+
+#include <utils/Log.h>
 
 #include <aaudio/AAudio.h>
 #include <utility/AAudioUtilities.h>
@@ -38,7 +39,7 @@
 }
 
 AAudioThread::~AAudioThread() {
-    ALOGE_IF(pthread_equal(pthread_self(), mThread),
+    ALOGE_IF(mThread.get_id() == std::this_thread::get_id(),
             "%s() destructor running in thread", __func__);
     ALOGE_IF(mHasThread, "%s() thread never joined", __func__);
 }
@@ -60,32 +61,16 @@
     }
 }
 
-// This is the entry point for the new thread created by createThread_l().
-// It converts the 'C' function call to a C++ method call.
-static void * AAudioThread_internalThreadProc(void *arg) {
-    AAudioThread *aaudioThread = (AAudioThread *) arg;
-    aaudioThread->dispatch();
-    return nullptr;
-}
-
 aaudio_result_t AAudioThread::start(Runnable *runnable) {
     if (mHasThread) {
         ALOGE("start() - mHasThread already true");
         return AAUDIO_ERROR_INVALID_STATE;
     }
-    // mRunnable will be read by the new thread when it starts.
-    // pthread_create() forces a memory synchronization so mRunnable does not need to be atomic.
+    // mRunnable will be read by the new thread when it starts. A std::thread is created.
     mRunnable = runnable;
-    int err = pthread_create(&mThread, nullptr, AAudioThread_internalThreadProc, this);
-    if (err != 0) {
-        ALOGE("start() - pthread_create() returned %d %s", err, strerror(err));
-        return AAudioConvert_androidToAAudioResult(-err);
-    } else {
-        int err = pthread_setname_np(mThread, mName);
-        ALOGW_IF((err != 0), "Could not set name of AAudioThread. err = %d", err);
-        mHasThread = true;
-        return AAUDIO_OK;
-    }
+    mHasThread = true;
+    mThread = std::thread(&AAudioThread::dispatch, this);
+    return AAUDIO_OK;
 }
 
 aaudio_result_t AAudioThread::stop() {
@@ -93,18 +78,18 @@
         ALOGE("stop() but no thread running");
         return AAUDIO_ERROR_INVALID_STATE;
     }
-    // Check to see if the thread is trying to stop itself.
-    if (pthread_equal(pthread_self(), mThread)) {
-        ALOGE("%s() attempt to pthread_join() from launched thread!", __func__);
-        return AAUDIO_ERROR_INTERNAL;
-    }
 
-    int err = pthread_join(mThread, nullptr);
-    if (err != 0) {
-        ALOGE("stop() - pthread_join() returned %d %s", err, strerror(err));
-        return AAudioConvert_androidToAAudioResult(-err);
-    } else {
+    if (mThread.get_id() == std::this_thread::get_id()) {
+        // The thread must not be joined by itself.
+        ALOGE("%s() attempt to join() from launched thread!", __func__);
+        return AAUDIO_ERROR_INTERNAL;
+    } else if (mThread.joinable()) {
+        // Double check if the thread is joinable to avoid exception when calling join.
+        mThread.join();
         mHasThread = false;
         return AAUDIO_OK;
+    } else {
+        ALOGE("%s() the thread is not joinable", __func__);
+        return AAUDIO_ERROR_INTERNAL;
     }
 }
diff --git a/services/oboeservice/AAudioThread.h b/services/oboeservice/AAudioThread.h
index 08a8a98..b2774e0 100644
--- a/services/oboeservice/AAudioThread.h
+++ b/services/oboeservice/AAudioThread.h
@@ -18,7 +18,7 @@
 #define AAUDIO_THREAD_H
 
 #include <atomic>
-#include <pthread.h>
+#include <thread>
 
 #include <aaudio/AAudio.h>
 
@@ -37,7 +37,6 @@
 
 /**
  * Abstraction for a host dependent thread.
- * TODO Consider using Android "Thread" class or std::thread instead.
  */
 class AAudioThread
 {
@@ -73,7 +72,7 @@
 
     Runnable    *mRunnable = nullptr;
     bool         mHasThread = false;
-    pthread_t    mThread = {};
+    std::thread  mThread;
 
     static std::atomic<uint32_t> mNextThreadIndex;
     char         mName[16]; // max length for a pthread_name
diff --git a/services/oboeservice/Android.bp b/services/oboeservice/Android.bp
index 3563d66..20a4c34 100644
--- a/services/oboeservice/Android.bp
+++ b/services/oboeservice/Android.bp
@@ -27,6 +27,7 @@
 
     srcs: [
         "AAudioClientTracker.cpp",
+        "AAudioCommandQueue.cpp",
         "AAudioEndpointManager.cpp",
         "AAudioMixer.cpp",
         "AAudioService.cpp",
diff --git a/services/oboeservice/SharedRingBuffer.cpp b/services/oboeservice/SharedRingBuffer.cpp
index c1d4e16..fd2a454 100644
--- a/services/oboeservice/SharedRingBuffer.cpp
+++ b/services/oboeservice/SharedRingBuffer.cpp
@@ -85,9 +85,9 @@
     return AAUDIO_OK;
 }
 
-void SharedRingBuffer::fillParcelable(AudioEndpointParcelable &endpointParcelable,
+void SharedRingBuffer::fillParcelable(AudioEndpointParcelable* endpointParcelable,
                     RingBufferParcelable &ringBufferParcelable) {
-    int fdIndex = endpointParcelable.addFileDescriptor(mFileDescriptor, mSharedMemorySizeInBytes);
+    int fdIndex = endpointParcelable->addFileDescriptor(mFileDescriptor, mSharedMemorySizeInBytes);
     ringBufferParcelable.setupMemory(fdIndex,
                                      SHARED_RINGBUFFER_DATA_OFFSET,
                                      mDataMemorySizeInBytes,
diff --git a/services/oboeservice/SharedRingBuffer.h b/services/oboeservice/SharedRingBuffer.h
index c3a9bb7..cff1261 100644
--- a/services/oboeservice/SharedRingBuffer.h
+++ b/services/oboeservice/SharedRingBuffer.h
@@ -45,7 +45,7 @@
 
     aaudio_result_t allocate(android::fifo_frames_t bytesPerFrame, android::fifo_frames_t capacityInFrames);
 
-    void fillParcelable(AudioEndpointParcelable &endpointParcelable,
+    void fillParcelable(AudioEndpointParcelable* endpointParcelable,
                         RingBufferParcelable &ringBufferParcelable);
 
     /**