audio: Add pause, resume, and standby stream operations

Clarify and verify in VTS that the data FMQ of StreamDescriptor
is a transient buffer. The consumer must always read its entire
contents. This is the same behavior as in the HIDL HAL.

Define the state machine for streams and the set of commands for
transferring between states.

Clarify and verify in VTS that the frame counter of the
observable position must never be reset.

Implement commands for the synchronous I/O case.

Refactor stream test logic to simplify testing of state
transitions.

Bug: 205884982
Test: atest VtsHalAudioCoreTargetTest
Change-Id: Ibed7f4c3e77852863714f1910112f664b32d5897
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 312df72..7b544a1 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -85,126 +85,315 @@
     return "";
 }
 
+void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
+                                            bool isConnected) const {
+    if (isConnected) {
+        reply->status = STATUS_OK;
+        reply->observable.frames = mFrameCount;
+        reply->observable.timeNs = ::android::elapsedRealtimeNano();
+    } else {
+        reply->status = STATUS_NO_INIT;
+    }
+}
+
 const std::string StreamInWorkerLogic::kThreadName = "reader";
 
 StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     StreamDescriptor::Reply reply{};
-    if (command.code == StreamContext::COMMAND_EXIT &&
+    if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
         command.fmqByteCount == mInternalCommandCookie) {
         LOG(DEBUG) << __func__ << ": received EXIT command";
+        setClosed();
         // This is an internal command, no need to reply.
         return Status::EXIT;
-    } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+    } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
+        LOG(DEBUG) << __func__ << ": received START read command";
+        if (mState == StreamDescriptor::State::STANDBY ||
+            mState == StreamDescriptor::State::DRAINING) {
+            populateReply(&reply, mIsConnected);
+            mState = mState == StreamDescriptor::State::STANDBY ? StreamDescriptor::State::IDLE
+                                                                : StreamDescriptor::State::ACTIVE;
+        } else {
+            LOG(WARNING) << __func__ << ": START command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
         LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount
                    << " bytes";
-        usleep(3000);  // Simulate a blocking call into the driver.
-        const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
-                                           mDataMQ->availableToWrite(), mDataBufferSize});
-        const bool isConnected = mIsConnected;
-        // Simulate reading of data, or provide zeroes if the stream is not connected.
-        for (size_t i = 0; i < byteCount; ++i) {
-            using buffer_type = decltype(mDataBuffer)::element_type;
-            constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
-                                              std::numeric_limits<buffer_type>::min() + 1;
-            mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
-                                                   std::numeric_limits<buffer_type>::min()
-                                         : 0;
-        }
-        bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true;
-        if (success) {
-            LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
-                       << " succeeded; connected? " << isConnected;
-            // Frames are provided and counted regardless of connection status.
-            reply.fmqByteCount = byteCount;
-            mFrameCount += byteCount / mFrameSize;
-            if (isConnected) {
-                reply.status = STATUS_OK;
-                reply.observable.frames = mFrameCount;
-                reply.observable.timeNs = ::android::elapsedRealtimeNano();
-            } else {
-                reply.status = STATUS_INVALID_OPERATION;
+        if (mState == StreamDescriptor::State::IDLE || mState == StreamDescriptor::State::ACTIVE ||
+            mState == StreamDescriptor::State::PAUSED ||
+            mState == StreamDescriptor::State::DRAINING) {
+            if (!read(command.fmqByteCount, &reply)) {
+                mState = StreamDescriptor::State::ERROR;
+            }
+            if (mState == StreamDescriptor::State::IDLE ||
+                mState == StreamDescriptor::State::PAUSED) {
+                mState = StreamDescriptor::State::ACTIVE;
+            } else if (mState == StreamDescriptor::State::DRAINING) {
+                // To simplify the reference code, we assume that the read operation
+                // has consumed all the data remaining in the hardware buffer.
+                // TODO: Provide parametrization on the duration of draining to test
+                //       handling of commands during the 'DRAINING' state.
+                mState = StreamDescriptor::State::STANDBY;
             }
         } else {
-            LOG(WARNING) << __func__ << ": writing of " << byteCount
-                         << " bytes of data to MQ failed";
-            reply.status = STATUS_NOT_ENOUGH_DATA;
+            LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
         }
-        reply.latencyMs = Module::kLatencyMs;
+    } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received DRAIN read command";
+        if (mState == StreamDescriptor::State::ACTIVE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::DRAINING;
+        } else {
+            LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received PAUSE read command";
+        if (mState == StreamDescriptor::State::ACTIVE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::PAUSED;
+        } else {
+            LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received FLUSH read command";
+        if (mState == StreamDescriptor::State::PAUSED) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::STANDBY;
+        } else {
+            LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
+               command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received STANDBY read command";
+        if (mState == StreamDescriptor::State::IDLE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::STANDBY;
+        } else {
+            LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
     } else {
         LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
                      << ") or count: " << command.fmqByteCount;
         reply.status = STATUS_BAD_VALUE;
     }
+    reply.state = mState;
     LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
     if (!mReplyMQ->writeBlocking(&reply, 1)) {
         LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     return Status::CONTINUE;
 }
 
+bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
+    // Can switch the state to ERROR if a driver error occurs.
+    const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
+    const bool isConnected = mIsConnected;
+    bool fatal = false;
+    // Simulate reading of data, or provide zeroes if the stream is not connected.
+    for (size_t i = 0; i < byteCount; ++i) {
+        using buffer_type = decltype(mDataBuffer)::element_type;
+        constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
+                                          std::numeric_limits<buffer_type>::min() + 1;
+        mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
+                                               std::numeric_limits<buffer_type>::min()
+                                     : 0;
+    }
+    usleep(3000);  // Simulate a blocking call into the driver.
+    // Set 'fatal = true' if a driver error occurs.
+    if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
+        LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
+                   << " succeeded; connected? " << isConnected;
+        // Frames are provided and counted regardless of connection status.
+        reply->fmqByteCount += byteCount;
+        mFrameCount += byteCount / mFrameSize;
+        populateReply(reply, isConnected);
+    } else {
+        LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
+        reply->status = STATUS_NOT_ENOUGH_DATA;
+    }
+    reply->latencyMs = Module::kLatencyMs;
+    return !fatal;
+}
+
 const std::string StreamOutWorkerLogic::kThreadName = "writer";
 
 StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     StreamDescriptor::Reply reply{};
-    if (command.code == StreamContext::COMMAND_EXIT &&
+    if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
         command.fmqByteCount == mInternalCommandCookie) {
         LOG(DEBUG) << __func__ << ": received EXIT command";
+        setClosed();
         // This is an internal command, no need to reply.
         return Status::EXIT;
-    } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+    } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
+        LOG(DEBUG) << __func__ << ": received START read command";
+        switch (mState) {
+            case StreamDescriptor::State::STANDBY:
+                mState = StreamDescriptor::State::IDLE;
+                break;
+            case StreamDescriptor::State::PAUSED:
+                mState = StreamDescriptor::State::ACTIVE;
+                break;
+            case StreamDescriptor::State::DRAIN_PAUSED:
+                mState = StreamDescriptor::State::PAUSED;
+                break;
+            default:
+                LOG(WARNING) << __func__ << ": START command can not be handled in the state "
+                             << toString(mState);
+                reply.status = STATUS_INVALID_OPERATION;
+        }
+        if (reply.status != STATUS_INVALID_OPERATION) {
+            populateReply(&reply, mIsConnected);
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
         LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount
                    << " bytes";
-        const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
-                                           mDataMQ->availableToRead(), mDataBufferSize});
-        bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true;
-        if (success) {
-            const bool isConnected = mIsConnected;
-            LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ"
-                       << " succeeded; connected? " << isConnected;
-            // Frames are consumed and counted regardless of connection status.
-            reply.fmqByteCount = byteCount;
-            mFrameCount += byteCount / mFrameSize;
-            if (isConnected) {
-                reply.status = STATUS_OK;
-                reply.observable.frames = mFrameCount;
-                reply.observable.timeNs = ::android::elapsedRealtimeNano();
-            } else {
-                reply.status = STATUS_INVALID_OPERATION;
+        if (mState != StreamDescriptor::State::ERROR) {  // BURST can be handled in all valid states
+            if (!write(command.fmqByteCount, &reply)) {
+                mState = StreamDescriptor::State::ERROR;
             }
-            usleep(3000);  // Simulate a blocking call into the driver.
+            if (mState == StreamDescriptor::State::STANDBY ||
+                mState == StreamDescriptor::State::DRAIN_PAUSED) {
+                mState = StreamDescriptor::State::PAUSED;
+            } else if (mState == StreamDescriptor::State::IDLE ||
+                       mState == StreamDescriptor::State::DRAINING) {
+                mState = StreamDescriptor::State::ACTIVE;
+            }  // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
         } else {
-            LOG(WARNING) << __func__ << ": reading of " << byteCount
-                         << " bytes of data from MQ failed";
-            reply.status = STATUS_NOT_ENOUGH_DATA;
+            LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
         }
-        reply.latencyMs = Module::kLatencyMs;
+    } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received DRAIN write command";
+        if (mState == StreamDescriptor::State::ACTIVE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::IDLE;
+            // Since there is no actual hardware that would be draining the buffer,
+            // in order to simplify the reference code, we assume that draining
+            // happens instantly, thus skipping the 'DRAINING' state.
+            // TODO: Provide parametrization on the duration of draining to test
+            //       handling of commands during the 'DRAINING' state.
+        } else {
+            LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
+               command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received STANDBY write command";
+        if (mState == StreamDescriptor::State::IDLE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::STANDBY;
+        } else {
+            LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received PAUSE write command";
+        if (mState == StreamDescriptor::State::ACTIVE ||
+            mState == StreamDescriptor::State::DRAINING) {
+            populateReply(&reply, mIsConnected);
+            mState = mState == StreamDescriptor::State::ACTIVE
+                             ? StreamDescriptor::State::PAUSED
+                             : StreamDescriptor::State::DRAIN_PAUSED;
+        } else {
+            LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received FLUSH write command";
+        if (mState == StreamDescriptor::State::PAUSED ||
+            mState == StreamDescriptor::State::DRAIN_PAUSED) {
+            populateReply(&reply, mIsConnected);
+            mState = StreamDescriptor::State::IDLE;
+        } else {
+            LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
     } else {
         LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
                      << ") or count: " << command.fmqByteCount;
         reply.status = STATUS_BAD_VALUE;
     }
+    reply.state = mState;
     LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
     if (!mReplyMQ->writeBlocking(&reply, 1)) {
         LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     return Status::CONTINUE;
 }
 
+bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
+    const size_t readByteCount = mDataMQ->availableToRead();
+    // Amount of data that the HAL module is going to actually use.
+    const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
+    bool fatal = false;
+    if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
+        const bool isConnected = mIsConnected;
+        LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
+                   << " succeeded; connected? " << isConnected;
+        // Frames are consumed and counted regardless of connection status.
+        reply->fmqByteCount += byteCount;
+        mFrameCount += byteCount / mFrameSize;
+        populateReply(reply, isConnected);
+        usleep(3000);  // Simulate a blocking call into the driver.
+        // Set 'fatal = true' if a driver error occurs.
+    } else {
+        LOG(WARNING) << __func__ << ": reading of " << readByteCount
+                     << " bytes of data from MQ failed";
+        reply->status = STATUS_NOT_ENOUGH_DATA;
+    }
+    reply->latencyMs = Module::kLatencyMs;
+    return !fatal;
+}
+
 template <class Metadata, class StreamWorker>
 StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
-    if (!mIsClosed) {
+    if (!isClosed()) {
         LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
         stopWorker();
         // The worker and the context should clean up by themselves via destructors.
@@ -214,13 +403,13 @@
 template <class Metadata, class StreamWorker>
 ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
     LOG(DEBUG) << __func__;
-    if (!mIsClosed) {
+    if (!isClosed()) {
         stopWorker();
         LOG(DEBUG) << __func__ << ": joining the worker thread...";
         mWorker.stop();
         LOG(DEBUG) << __func__ << ": worker thread joined";
         mContext.reset();
-        mIsClosed = true;
+        mWorker.setClosed();
         return ndk::ScopedAStatus::ok();
     } else {
         LOG(ERROR) << __func__ << ": stream was already closed";
@@ -231,13 +420,14 @@
 template <class Metadata, class StreamWorker>
 void StreamCommon<Metadata, StreamWorker>::stopWorker() {
     if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
-        LOG(DEBUG) << __func__ << ": asking the worker to stop...";
+        LOG(DEBUG) << __func__ << ": asking the worker to exit...";
         StreamDescriptor::Command cmd;
-        cmd.code = StreamContext::COMMAND_EXIT;
+        cmd.code = StreamDescriptor::CommandCode(StreamContext::COMMAND_EXIT);
         cmd.fmqByteCount = mContext.getInternalCommandCookie();
-        // FIXME: This can block in the case when the client wrote a command
-        // while the stream worker's cycle is not running. Need to revisit
-        // when implementing standby and pause/resume.
+        // Note: never call 'pause' and 'resume' methods of StreamWorker
+        // in the HAL implementation. These methods are to be used by
+        // the client side only. Preventing the worker loop from running
+        // on the HAL side can cause a deadlock.
         if (!commandMQ->writeBlocking(&cmd, 1)) {
             LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
         }
@@ -248,7 +438,7 @@
 template <class Metadata, class StreamWorker>
 ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
     LOG(DEBUG) << __func__;
-    if (!mIsClosed) {
+    if (!isClosed()) {
         mMetadata = metadata;
         return ndk::ScopedAStatus::ok();
     }
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 488edf1..539fa8b 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -54,8 +54,10 @@
             int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
             DataMQ;
 
-    // Ensure that this value is not used by any of StreamDescriptor.COMMAND_*
-    static constexpr int COMMAND_EXIT = -1;
+    // Ensure that this value is not used by any of StreamDescriptor.CommandCode enums
+    static constexpr int32_t COMMAND_EXIT = -1;
+    // Ensure that this value is not used by any of StreamDescriptor.State enums
+    static constexpr int32_t STATE_CLOSED = -1;
 
     StreamContext() = default;
     StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
@@ -99,6 +101,10 @@
 
 class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
   public:
+    bool isClosed() const {
+        return static_cast<int32_t>(mState.load()) == StreamContext::STATE_CLOSED;
+    }
+    void setClosed() { mState = static_cast<StreamDescriptor::State>(StreamContext::STATE_CLOSED); }
     void setIsConnected(bool connected) { mIsConnected = connected; }
 
   protected:
@@ -109,9 +115,12 @@
           mReplyMQ(context.getReplyMQ()),
           mDataMQ(context.getDataMQ()) {}
     std::string init() override;
+    void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
 
-    // Used both by the main and worker threads.
+    // Atomic fields are used both by the main and worker threads.
     std::atomic<bool> mIsConnected = false;
+    static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free);
+    std::atomic<StreamDescriptor::State> mState = StreamDescriptor::State::STANDBY;
     // All fields are used on the worker thread only.
     const int mInternalCommandCookie;
     const size_t mFrameSize;
@@ -132,6 +141,9 @@
 
   protected:
     Status cycle() override;
+
+  private:
+    bool read(size_t clientSize, StreamDescriptor::Reply* reply);
 };
 using StreamInWorker = ::android::hardware::audio::common::StreamWorker<StreamInWorkerLogic>;
 
@@ -143,6 +155,9 @@
 
   protected:
     Status cycle() override;
+
+  private:
+    bool write(size_t clientSize, StreamDescriptor::Reply* reply);
 };
 using StreamOutWorker = ::android::hardware::audio::common::StreamWorker<StreamOutWorkerLogic>;
 
@@ -155,7 +170,7 @@
                        ? ndk::ScopedAStatus::ok()
                        : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
     }
-    bool isClosed() const { return mIsClosed; }
+    bool isClosed() const { return mWorker.isClosed(); }
     void setIsConnected(bool connected) { mWorker.setIsConnected(connected); }
     ndk::ScopedAStatus updateMetadata(const Metadata& metadata);
 
@@ -168,9 +183,6 @@
     Metadata mMetadata;
     StreamContext mContext;
     StreamWorker mWorker;
-    // This variable is checked in the destructor which can be called on an arbitrary Binder thread,
-    // thus we need to ensure that any changes made by other threads are sequentially consistent.
-    std::atomic<bool> mIsClosed = false;
 };
 
 class StreamIn