audio: Implement transient state testing

Add ModuleDebug.streamTransientStateDelayMs parameter to ensure
that streams stay in transient states for the specified amount of
time. This enables sending commands from VTS while the stream is
still in a transient state.

Add 'getStatus' stream command to retrieve current positions,
counters, and stream state. Previously we were planning to use a
zero-sized burst command for that, however, after the
introduction of stream state machines, the 'burst' command is
not handled in every stream state, and may even affect the
current state, thus it is no longer usable for this purpose.

Bug: 205884982
Test: atest VtsHalAudioCoreTargetTest
Change-Id: I8717acace8d95d76bef2ec9fd6561796d7544992
diff --git a/audio/aidl/default/Module.cpp b/audio/aidl/default/Module.cpp
index 6863fe3..a8f3b9b 100644
--- a/audio/aidl/default/Module.cpp
+++ b/audio/aidl/default/Module.cpp
@@ -135,8 +135,8 @@
         StreamContext temp(
                 std::make_unique<StreamContext::CommandMQ>(1, true /*configureEventFlagWord*/),
                 std::make_unique<StreamContext::ReplyMQ>(1, true /*configureEventFlagWord*/),
-                frameSize,
-                std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames));
+                frameSize, std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames),
+                mDebug.streamTransientStateDelayMs);
         if (temp.isValid()) {
             *out_context = std::move(temp);
         } else {
@@ -242,6 +242,11 @@
                    << "while having external devices connected";
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
     }
+    if (in_debug.streamTransientStateDelayMs < 0) {
+        LOG(ERROR) << __func__ << ": streamTransientStateDelayMs is negative: "
+                   << in_debug.streamTransientStateDelayMs;
+        return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
+    }
     mDebug = in_debug;
     return ndk::ScopedAStatus::ok();
 }
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 21dc4b6..c5d00a2 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -96,23 +96,36 @@
     }
 }
 
+void StreamWorkerCommonLogic::populateReplyWrongState(
+        StreamDescriptor::Reply* reply, const StreamDescriptor::Command& command) const {
+    LOG(WARNING) << "command '" << toString(command.getTag())
+                 << "' can not be handled in the state " << toString(mState);
+    reply->status = STATUS_INVALID_OPERATION;
+}
+
 const std::string StreamInWorkerLogic::kThreadName = "reader";
 
 StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
+    // Note: for input streams, draining is driven by the client, thus
+    // "empty buffer" condition can only happen while handling the 'burst'
+    // command. Thus, unlike for output streams, it does not make sense to
+    // delay the 'DRAINING' state here by 'mTransientStateDelayMs'.
+    // TODO: Add a delay for transitions of async operations when/if they are added.
+
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
         mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
+    LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
     StreamDescriptor::Reply reply{};
     reply.status = STATUS_BAD_VALUE;
     using Tag = StreamDescriptor::Command::Tag;
     switch (command.getTag()) {
-        case Tag::hal_reserved_exit:
-            if (const int32_t cookie = command.get<Tag::hal_reserved_exit>();
+        case Tag::halReservedExit:
+            if (const int32_t cookie = command.get<Tag::halReservedExit>();
                 cookie == mInternalCommandCookie) {
-                LOG(DEBUG) << __func__ << ": received EXIT command";
                 setClosed();
                 // This is an internal command, no need to reply.
                 return Status::EXIT;
@@ -120,8 +133,10 @@
                 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
             }
             break;
+        case Tag::getStatus:
+            populateReply(&reply, mIsConnected);
+            break;
         case Tag::start:
-            LOG(DEBUG) << __func__ << ": received START read command";
             if (mState == StreamDescriptor::State::STANDBY ||
                 mState == StreamDescriptor::State::DRAINING) {
                 populateReply(&reply, mIsConnected);
@@ -129,15 +144,13 @@
                                  ? StreamDescriptor::State::IDLE
                                  : StreamDescriptor::State::ACTIVE;
             } else {
-                LOG(WARNING) << __func__ << ": START command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::burst:
             if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
-                LOG(DEBUG) << __func__ << ": received BURST read command for " << fmqByteCount
-                           << " bytes";
+                LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
+                           << fmqByteCount << " bytes";
                 if (mState == StreamDescriptor::State::IDLE ||
                     mState == StreamDescriptor::State::ACTIVE ||
                     mState == StreamDescriptor::State::PAUSED ||
@@ -151,69 +164,56 @@
                     } else if (mState == StreamDescriptor::State::DRAINING) {
                         // To simplify the reference code, we assume that the read operation
                         // has consumed all the data remaining in the hardware buffer.
-                        // TODO: Provide parametrization on the duration of draining to test
-                        //       handling of commands during the 'DRAINING' state.
+                        // In a real implementation, here we would either remain in
+                        // the 'DRAINING' state, or transfer to 'STANDBY' depending on the
+                        // buffer state.
                         mState = StreamDescriptor::State::STANDBY;
                     }
                 } else {
-                    LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
-                                 << toString(mState);
-                    reply.status = STATUS_INVALID_OPERATION;
+                    populateReplyWrongState(&reply, command);
                 }
             } else {
                 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
             }
             break;
         case Tag::drain:
-            LOG(DEBUG) << __func__ << ": received DRAIN read command";
             if (mState == StreamDescriptor::State::ACTIVE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::DRAINING;
             } else {
-                LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::standby:
-            LOG(DEBUG) << __func__ << ": received STANDBY read command";
             if (mState == StreamDescriptor::State::IDLE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::STANDBY;
             } else {
-                LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::pause:
-            LOG(DEBUG) << __func__ << ": received PAUSE read command";
             if (mState == StreamDescriptor::State::ACTIVE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::PAUSED;
             } else {
-                LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::flush:
-            LOG(DEBUG) << __func__ << ": received FLUSH read command";
             if (mState == StreamDescriptor::State::PAUSED) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::STANDBY;
             } else {
-                LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
     }
@@ -261,20 +261,32 @@
 const std::string StreamOutWorkerLogic::kThreadName = "writer";
 
 StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
+    if (mState == StreamDescriptor::State::DRAINING) {
+        if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
+                    std::chrono::steady_clock::now() - mTransientStateStart);
+            stateDurationMs >= mTransientStateDelayMs) {
+            mState = StreamDescriptor::State::IDLE;
+            if (mTransientStateDelayMs.count() != 0) {
+                LOG(DEBUG) << __func__ << ": switched to state " << toString(mState)
+                           << " after a timeout";
+            }
+        }
+    }
+
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
         mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
+    LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
     StreamDescriptor::Reply reply{};
     reply.status = STATUS_BAD_VALUE;
     using Tag = StreamDescriptor::Command::Tag;
     switch (command.getTag()) {
-        case Tag::hal_reserved_exit:
-            if (const int32_t cookie = command.get<Tag::hal_reserved_exit>();
+        case Tag::halReservedExit:
+            if (const int32_t cookie = command.get<Tag::halReservedExit>();
                 cookie == mInternalCommandCookie) {
-                LOG(DEBUG) << __func__ << ": received EXIT command";
                 setClosed();
                 // This is an internal command, no need to reply.
                 return Status::EXIT;
@@ -282,31 +294,31 @@
                 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
             }
             break;
+        case Tag::getStatus:
+            populateReply(&reply, mIsConnected);
+            break;
         case Tag::start:
-            LOG(DEBUG) << __func__ << ": received START write command";
             switch (mState) {
                 case StreamDescriptor::State::STANDBY:
                     mState = StreamDescriptor::State::IDLE;
+                    populateReply(&reply, mIsConnected);
                     break;
                 case StreamDescriptor::State::PAUSED:
                     mState = StreamDescriptor::State::ACTIVE;
+                    populateReply(&reply, mIsConnected);
                     break;
                 case StreamDescriptor::State::DRAIN_PAUSED:
-                    mState = StreamDescriptor::State::PAUSED;
+                    switchToTransientState(StreamDescriptor::State::DRAINING);
+                    populateReply(&reply, mIsConnected);
                     break;
                 default:
-                    LOG(WARNING) << __func__ << ": START command can not be handled in the state "
-                                 << toString(mState);
-                    reply.status = STATUS_INVALID_OPERATION;
-            }
-            if (reply.status != STATUS_INVALID_OPERATION) {
-                populateReply(&reply, mIsConnected);
+                    populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::burst:
             if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
-                LOG(DEBUG) << __func__ << ": received BURST write command for " << fmqByteCount
-                           << " bytes";
+                LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
+                           << fmqByteCount << " bytes";
                 if (mState !=
                     StreamDescriptor::State::ERROR) {  // BURST can be handled in all valid states
                     if (!write(fmqByteCount, &reply)) {
@@ -320,47 +332,33 @@
                         mState = StreamDescriptor::State::ACTIVE;
                     }  // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
                 } else {
-                    LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
-                                 << toString(mState);
-                    reply.status = STATUS_INVALID_OPERATION;
+                    populateReplyWrongState(&reply, command);
                 }
             } else {
                 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
             }
             break;
         case Tag::drain:
-            LOG(DEBUG) << __func__ << ": received DRAIN write command";
             if (mState == StreamDescriptor::State::ACTIVE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
-                mState = StreamDescriptor::State::IDLE;
-                // Since there is no actual hardware that would be draining the buffer,
-                // in order to simplify the reference code, we assume that draining
-                // happens instantly, thus skipping the 'DRAINING' state.
-                // TODO: Provide parametrization on the duration of draining to test
-                //       handling of commands during the 'DRAINING' state.
+                switchToTransientState(StreamDescriptor::State::DRAINING);
             } else {
-                LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::standby:
-            LOG(DEBUG) << __func__ << ": received STANDBY write command";
             if (mState == StreamDescriptor::State::IDLE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::STANDBY;
             } else {
-                LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::pause:
-            LOG(DEBUG) << __func__ << ": received PAUSE write command";
             if (mState == StreamDescriptor::State::ACTIVE ||
                 mState == StreamDescriptor::State::DRAINING) {
                 populateReply(&reply, mIsConnected);
@@ -368,21 +366,16 @@
                                  ? StreamDescriptor::State::PAUSED
                                  : StreamDescriptor::State::DRAIN_PAUSED;
             } else {
-                LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::flush:
-            LOG(DEBUG) << __func__ << ": received FLUSH write command";
             if (mState == StreamDescriptor::State::PAUSED ||
                 mState == StreamDescriptor::State::DRAIN_PAUSED) {
                 populateReply(&reply, mIsConnected);
                 mState = StreamDescriptor::State::IDLE;
             } else {
-                LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
     }
@@ -450,9 +443,8 @@
 void StreamCommon<Metadata, StreamWorker>::stopWorker() {
     if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
         LOG(DEBUG) << __func__ << ": asking the worker to exit...";
-        auto cmd =
-                StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::hal_reserved_exit>(
-                        mContext.getInternalCommandCookie());
+        auto cmd = StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::halReservedExit>(
+                mContext.getInternalCommandCookie());
         // Note: never call 'pause' and 'resume' methods of StreamWorker
         // in the HAL implementation. These methods are to be used by
         // the client side only. Preventing the worker loop from running
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 5ee0f82..bcbabad 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <atomic>
+#include <chrono>
 #include <cstdlib>
 #include <map>
 #include <memory>
@@ -59,24 +60,27 @@
 
     StreamContext() = default;
     StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
-                  size_t frameSize, std::unique_ptr<DataMQ> dataMQ)
+                  size_t frameSize, std::unique_ptr<DataMQ> dataMQ, int transientStateDelayMs)
         : mCommandMQ(std::move(commandMQ)),
           mInternalCommandCookie(std::rand()),
           mReplyMQ(std::move(replyMQ)),
           mFrameSize(frameSize),
-          mDataMQ(std::move(dataMQ)) {}
+          mDataMQ(std::move(dataMQ)),
+          mTransientStateDelayMs(transientStateDelayMs) {}
     StreamContext(StreamContext&& other)
         : mCommandMQ(std::move(other.mCommandMQ)),
           mInternalCommandCookie(other.mInternalCommandCookie),
           mReplyMQ(std::move(other.mReplyMQ)),
           mFrameSize(other.mFrameSize),
-          mDataMQ(std::move(other.mDataMQ)) {}
+          mDataMQ(std::move(other.mDataMQ)),
+          mTransientStateDelayMs(other.mTransientStateDelayMs) {}
     StreamContext& operator=(StreamContext&& other) {
         mCommandMQ = std::move(other.mCommandMQ);
         mInternalCommandCookie = other.mInternalCommandCookie;
         mReplyMQ = std::move(other.mReplyMQ);
         mFrameSize = other.mFrameSize;
         mDataMQ = std::move(other.mDataMQ);
+        mTransientStateDelayMs = other.mTransientStateDelayMs;
         return *this;
     }
 
@@ -86,6 +90,7 @@
     size_t getFrameSize() const { return mFrameSize; }
     int getInternalCommandCookie() const { return mInternalCommandCookie; }
     ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); }
+    int getTransientStateDelayMs() const { return mTransientStateDelayMs; }
     bool isValid() const;
     void reset();
 
@@ -95,6 +100,7 @@
     std::unique_ptr<ReplyMQ> mReplyMQ;
     size_t mFrameSize;
     std::unique_ptr<DataMQ> mDataMQ;
+    int mTransientStateDelayMs;
 };
 
 class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
@@ -111,9 +117,16 @@
           mFrameSize(context.getFrameSize()),
           mCommandMQ(context.getCommandMQ()),
           mReplyMQ(context.getReplyMQ()),
-          mDataMQ(context.getDataMQ()) {}
+          mDataMQ(context.getDataMQ()),
+          mTransientStateDelayMs(context.getTransientStateDelayMs()) {}
     std::string init() override;
     void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
+    void populateReplyWrongState(StreamDescriptor::Reply* reply,
+                                 const StreamDescriptor::Command& command) const;
+    void switchToTransientState(StreamDescriptor::State state) {
+        mState = state;
+        mTransientStateStart = std::chrono::steady_clock::now();
+    }
 
     // Atomic fields are used both by the main and worker threads.
     std::atomic<bool> mIsConnected = false;
@@ -125,6 +138,8 @@
     StreamContext::CommandMQ* mCommandMQ;
     StreamContext::ReplyMQ* mReplyMQ;
     StreamContext::DataMQ* mDataMQ;
+    const std::chrono::duration<int, std::milli> mTransientStateDelayMs;
+    std::chrono::time_point<std::chrono::steady_clock> mTransientStateStart;
     // We use an array and the "size" field instead of a vector to be able to detect
     // memory allocation issues.
     std::unique_ptr<int8_t[]> mDataBuffer;