Merge changes Ibed7f4c3,I8a9f4f0b

* changes:
  audio: Add pause, resume, and standby stream operations
  audio: Add 'join' method to StreamWorker
diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
index db1ac22..da24a10 100644
--- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
@@ -39,15 +39,34 @@
   int frameSizeBytes;
   long bufferSizeFrames;
   android.hardware.audio.core.StreamDescriptor.AudioBuffer audio;
-  const int COMMAND_BURST = 1;
+  const int LATENCY_UNKNOWN = -1;
   @FixedSize @VintfStability
   parcelable Position {
     long frames;
     long timeNs;
   }
+  @Backing(type="int") @VintfStability
+  enum State {
+    STANDBY = 1,
+    IDLE = 2,
+    ACTIVE = 3,
+    PAUSED = 4,
+    DRAINING = 5,
+    DRAIN_PAUSED = 6,
+    ERROR = 100,
+  }
+  @Backing(type="int") @VintfStability
+  enum CommandCode {
+    START = 1,
+    BURST = 2,
+    DRAIN = 3,
+    STANDBY = 4,
+    PAUSE = 5,
+    FLUSH = 6,
+  }
   @FixedSize @VintfStability
   parcelable Command {
-    int code;
+    android.hardware.audio.core.StreamDescriptor.CommandCode code = android.hardware.audio.core.StreamDescriptor.CommandCode.START;
     int fmqByteCount;
   }
   @FixedSize @VintfStability
@@ -57,6 +76,8 @@
     android.hardware.audio.core.StreamDescriptor.Position observable;
     android.hardware.audio.core.StreamDescriptor.Position hardware;
     int latencyMs;
+    int xrunFrames;
+    android.hardware.audio.core.StreamDescriptor.State state = android.hardware.audio.core.StreamDescriptor.State.STANDBY;
   }
   @VintfStability
   union AudioBuffer {
diff --git a/audio/aidl/android/hardware/audio/core/IModule.aidl b/audio/aidl/android/hardware/audio/core/IModule.aidl
index 735f87f..0959840 100644
--- a/audio/aidl/android/hardware/audio/core/IModule.aidl
+++ b/audio/aidl/android/hardware/audio/core/IModule.aidl
@@ -263,6 +263,9 @@
      * be completing with an error, although data (zero filled) will still be
      * provided.
      *
+     * After the stream has been opened, it remains in the STANDBY state, see
+     * StreamDescriptor for more details.
+     *
      * @return An opened input stream and the associated descriptor.
      * @param args The pack of arguments, see 'OpenInputStreamArguments' parcelable.
      * @throws EX_ILLEGAL_ARGUMENT In the following cases:
@@ -325,6 +328,9 @@
      * StreamDescriptor will be completing with an error, although the data
      * will still be accepted and immediately discarded.
      *
+     * After the stream has been opened, it remains in the STANDBY state, see
+     * StreamDescriptor for more details.
+     *
      * @return An opened output stream and the associated descriptor.
      * @param args The pack of arguments, see 'OpenOutputStreamArguments' parcelable.
      * @throws EX_ILLEGAL_ARGUMENT In the following cases:
diff --git a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
index 2b1ed8c..e5e56fc 100644
--- a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
@@ -33,6 +33,72 @@
  * internal components of the stream while serving commands invoked via the
  * stream's AIDL interface and commands invoked via the command queue of the
  * descriptor.
+ *
+ * There is a state machine defined for the stream, which executes on the
+ * thread handling the commands from the queue. The states are defined based
+ * on the model of idealized producer and consumer connected via a ring buffer.
+ * For input streams, the "producer" is hardware, the "consumer" is software,
+ * for output streams it's the opposite. When the producer is active, but
+ * the buffer is full, the following actions are possible:
+ *  - if the consumer is active, the producer blocks until there is space,
+ *    this behavior is only possible for software producers;
+ *  - if the consumer is passive:
+ *    - the producer can preserve the buffer contents—a s/w producer can
+ *      keep the data on their side, while a h/w producer can only drop captured
+ *      data in this case;
+ *    - or the producer overwrites old data in the buffer.
+ * Similarly, when an active consumer faces an empty buffer, it can:
+ *  - block until there is data (producer must be active), only possible
+ *    for software consumers;
+ *  - walk away with no data; when the consumer is hardware, it must emit
+ *    silence in this case.
+ *
+ * The model is defined below, note the asymmetry regarding the 'IDLE' state
+ * between input and output streams:
+ *
+ *  Producer | Buffer state | Consumer | Applies | State
+ *  active?  |              | active?  | to      |
+ * ==========|==============|==========|=========|==============================
+ *  No       | Empty        | No       | Both    | STANDBY
+ * ----------|--------------|----------|---------|-----------------------------
+ *  Yes      | Filling up   | No       | Input   | IDLE, overwrite behavior
+ * ----------|--------------|----------|---------|-----------------------------
+ *  No       | Empty        | Yes†     | Output  | IDLE, h/w emits silence
+ * ----------|--------------|----------|---------|-----------------------------
+ *  Yes      | Not empty    | Yes      | Both    | ACTIVE, s/w x-runs counted
+ * ----------|--------------|----------|---------|-----------------------------
+ *  Yes      | Filling up   | No       | Input   | PAUSED, drop behavior
+ * ----------|--------------|----------|---------|-----------------------------
+ *  Yes      | Filling up   | No†      | Output  | PAUSED, s/w stops writing once
+ *           |              |          |         | the buffer is filled up;
+ *           |              |          |         | h/w emits silence.
+ * ----------|--------------|----------|---------|-----------------------------
+ *  No       | Not empty    | Yes      | Both    | DRAINING
+ * ----------|--------------|----------|---------|-----------------------------
+ *  No       | Not empty    | No†      | Output  | DRAIN_PAUSED,
+ *           |              |          |         | h/w emits silence.
+ *
+ * † - note that for output, "buffer empty, h/w consuming" has the same outcome
+ *     as "buffer not empty, h/w not consuming", but logically these conditions
+ *     are different.
+ *
+ * State machines of both input and output streams start from the 'STANDBY'
+ * state.  Transitions between states happen naturally with changes in the
+ * states of the model elements. For simplicity, we restrict the change to one
+ * element only, for example, in the 'STANDBY' state, either the producer or the
+ * consumer can become active, but not both at the same time. States 'STANDBY',
+ * 'IDLE', 'ACTIVE', and '*PAUSED' are "stable"—they require an external event,
+ * whereas a change from the 'DRAINING' state can happen with time as the buffer
+ * gets empty.
+ *
+ * The state machine for input streams is defined in the `stream-in-sm.gv` file,
+ * for output streams—in the `stream-out-sm.gv` file. State machines define how
+ * commands (from the enum 'CommandCode') trigger state changes. The full list
+ * of states is defined by constants of the 'State' enum. Note that
+ * the 'CLOSED' state does not have a constant in the interface because the
+ * client can never observe a stream with a functioning command queue in this
+ * state. The 'ERROR' state is a special state which the state machine enters
+ * when an unrecoverable hardware error is detected by the HAL module.
  */
 @JavaDerive(equals=true, toString=true)
 @VintfStability
@@ -55,12 +121,110 @@
         long timeNs;
     }
 
-    /**
-     * The command used for audio I/O, see 'AudioBuffer'. For MMap No IRQ mode
-     * this command only provides updated positions and latency because actual
-     * audio I/O is done via the 'AudioBuffer.mmap' shared buffer.
-     */
-    const int COMMAND_BURST = 1;
+    @VintfStability
+    @Backing(type="int")
+    enum State {
+        /**
+         * 'STANDBY' is the initial state of the stream, entered after
+         * opening. Since both the producer and the consumer are inactive in
+         * this state, it allows the HAL module to put associated hardware into
+         * "standby" mode to save power.
+         */
+        STANDBY = 1,
+        /**
+         * In the 'IDLE' state the audio hardware is active. For input streams,
+         * the hardware is filling buffer with captured data, overwriting old
+         * contents on buffer wraparounds. For output streams, the buffer is
+         * still empty, and the hardware is outputting zeroes. The HAL module
+         * must not account for any under- or overruns as the client is not
+         * expected to perform audio I/O.
+         */
+        IDLE = 2,
+        /**
+         * The active state of the stream in which it handles audio I/O. The HAL
+         * module can assume that the audio I/O will be periodic, thus inability
+         * of the client to provide or consume audio data on time must be
+         * considered as an under- or overrun and indicated via the 'xrunFrames'
+         * field of the reply.
+         */
+        ACTIVE = 3,
+        /**
+         * In the 'PAUSED' state the consumer is inactive. For input streams,
+         * the hardware stops updating the buffer as soon as it fills up (this
+         * is the difference from the 'IDLE' state). For output streams,
+         * "inactivity" of hardware means that it does not consume audio data,
+         * but rather emits silence.
+         */
+        PAUSED = 4,
+        /**
+         * In the 'DRAINING' state the producer is inactive, the consumer is
+         * finishing up on the buffer contents, emptying it. As soon as it
+         * gets empty, the stream transfers itself into the next state.
+         */
+        DRAINING = 5,
+        /**
+         * Used for output streams only, pauses draining. This state is similar
+         * to the 'PAUSED' state, except that the client is not adding any
+         * new data. If it emits a 'BURST' command, this brings the stream
+         * into the regular 'PAUSED' state.
+         */
+        DRAIN_PAUSED = 6,
+        /**
+         * The ERROR state is entered when the stream has encountered an
+         * irrecoverable error from the lower layer. After entering it, the
+         * stream can only be closed.
+         */
+        ERROR = 100,
+    }
+
+    @VintfStability
+    @Backing(type="int")
+    enum CommandCode {
+        /**
+         * See the state machines on the applicability of this command to
+         * different states. The 'fmqByteCount' field must always be set to 0.
+         */
+        START = 1,
+        /**
+         * The BURST command used for audio I/O, see 'AudioBuffer'. Differences
+         * for the MMap No IRQ mode:
+         *
+         *  - this command only provides updated positions and latency because
+         *    actual audio I/O is done via the 'AudioBuffer.mmap' shared buffer.
+         *    The client does not synchronize reads and writes into the buffer
+         *    with sending of this command.
+         *
+         *  - the 'fmqByteCount' must always be set to 0.
+         */
+        BURST = 2,
+        /**
+         * See the state machines on the applicability of this command to
+         * different states. The 'fmqByteCount' field must always be set to 0.
+         */
+        DRAIN = 3,
+        /**
+         * See the state machines on the applicability of this command to
+         * different states. The 'fmqByteCount' field must always be set to 0.
+         *
+         * Note that it's left to the discretion of the HAL implementation to
+         * assess all the necessary conditions that could prevent hardware from
+         * being suspended. Even if it can not be suspended, the state machine
+         * must still enter the 'STANDBY' state for consistency. Since the
+         * buffer must remain empty in this state, even if capturing hardware is
+         * still active, captured data must be discarded.
+         */
+        STANDBY = 4,
+        /**
+         * See the state machines on the applicability of this command to
+         * different states. The 'fmqByteCount' field must always be set to 0.
+         */
+        PAUSE = 5,
+        /**
+         * See the state machines on the applicability of this command to
+         * different states. The 'fmqByteCount' field must always be set to 0.
+         */
+        FLUSH = 6,
+    }
 
     /**
      * Used for sending commands to the HAL module. The client writes into
@@ -71,12 +235,16 @@
     @FixedSize
     parcelable Command {
         /**
-         * One of COMMAND_* codes.
+         * The code of the command.
          */
-        int code;
+        CommandCode code = CommandCode.START;
         /**
+         * This field is only used for the BURST command. For all other commands
+         * it must be set to 0. The following description applies to the use
+         * of this field for the BURST command.
+         *
          * For output streams: the amount of bytes that the client requests the
-         *   HAL module to read from the 'audio.fmq' queue.
+         *   HAL module to use out of the data contained in the 'audio.fmq' queue.
          * For input streams: the amount of bytes requested by the client to
          *   read from the hardware into the 'audio.fmq' queue.
          *
@@ -96,6 +264,12 @@
     MQDescriptor<Command, SynchronizedReadWrite> command;
 
     /**
+     * The value used for the 'Reply.latencyMs' field when the effective
+     * latency can not be reported by the HAL module.
+     */
+    const int LATENCY_UNKNOWN = -1;
+
+    /**
      * Used for providing replies to commands. The HAL module writes into
      * the queue, the client reads. The queue can only contain a single reply,
      * corresponding to the last command sent by the client.
@@ -107,17 +281,22 @@
          * One of Binder STATUS_* statuses:
          *  - STATUS_OK: the command has completed successfully;
          *  - STATUS_BAD_VALUE: invalid value in the 'Command' structure;
-         *  - STATUS_INVALID_OPERATION: the mix port is not connected
-         *                              to any producer or consumer, thus
-         *                              positions can not be reported;
+         *  - STATUS_INVALID_OPERATION: the command is not applicable in the
+         *                              current state of the stream, or to this
+         *                              type of the stream;
+         *  - STATUS_NO_INIT: positions can not be reported because the mix port
+         *                    is not connected to any producer or consumer, or
+         *                    because the HAL module does not support positions
+         *                    reporting for this AudioSource (on input streams).
          *  - STATUS_NOT_ENOUGH_DATA: a read or write error has
          *                            occurred for the 'audio.fmq' queue;
-         *
          */
         int status;
         /**
-         * For output streams: the amount of bytes actually consumed by the HAL
-         *   module from the 'audio.fmq' queue.
+         * Used with the BURST command only.
+         *
+         * For output streams: the amount of bytes of data actually consumed
+         *   by the HAL module.
          * For input streams: the amount of bytes actually provided by the HAL
          *   in the 'audio.fmq' queue.
          *
@@ -126,10 +305,18 @@
          */
         int fmqByteCount;
         /**
+         * It is recommended to report the current position for any command.
+         * If the position can not be reported, the 'status' field must be
+         * set to 'NO_INIT'.
+         *
          * For output streams: the moment when the specified stream position
          *   was presented to an external observer (i.e. presentation position).
          * For input streams: the moment when data at the specified stream position
          *   was acquired (i.e. capture position).
+         *
+         * The observable position must never be reset by the HAL module.
+         * The data type of the frame counter is large enough to support
+         * continuous counting for years of operation.
          */
         Position observable;
         /**
@@ -138,9 +325,22 @@
          */
         Position hardware;
         /**
-         * Current latency reported by the hardware.
+         * Current latency reported by the hardware. It is recommended to
+         * report the current latency for any command. If the value of latency
+         * can not be determined, this field must be set to 'LATENCY_UNKNOWN'.
          */
         int latencyMs;
+        /**
+         * Number of frames lost due to an overrun (for input streams),
+         * or not provided on time (for output streams) for the **previous**
+         * transfer operation.
+         */
+        int xrunFrames;
+        /**
+         * The state that the stream was in while the HAL module was sending the
+         * reply.
+         */
+        State state = State.STANDBY;
     }
     MQDescriptor<Reply, SynchronizedReadWrite> reply;
 
@@ -170,42 +370,59 @@
     @VintfStability
     union AudioBuffer {
         /**
-         * The fast message queue used for all modes except MMap No IRQ.  Both
-         * reads and writes into this queue are non-blocking because access to
-         * this queue is synchronized via the 'command' and 'reply' queues as
-         * described below. The queue nevertheless uses 'SynchronizedReadWrite'
-         * because there is only one reader, and the reading position must be
-         * shared.
+         * The fast message queue used for BURST commands in all modes except
+         * MMap No IRQ. Both reads and writes into this queue are non-blocking
+         * because access to this queue is synchronized via the 'command' and
+         * 'reply' queues as described below. The queue nevertheless uses
+         * 'SynchronizedReadWrite' because there is only one reader, and the
+         * reading position must be shared.
+         *
+         * Note that the fast message queue is a transient buffer, only used for
+         * data transfer. Neither of the sides can use it to store any data
+         * outside of the 'BURST' operation. The consumer must always retrieve
+         * all data available in the fast message queue, even if it can not use
+         * it. The producer must re-send any unconsumed data on the next
+         * transfer operation. This restriction is imposed in order to make the
+         * fast message queue fully transparent from the latency perspective.
          *
          * For output streams the following sequence of operations is used:
          *  1. The client writes audio data into the 'audio.fmq' queue.
-         *  2. The client writes the 'BURST' command into the 'command' queue,
+         *  2. The client writes the BURST command into the 'command' queue,
          *     and hangs on waiting on a read from the 'reply' queue.
          *  3. The high priority thread in the HAL module wakes up due to 2.
-         *  4. The HAL module reads the command and audio data.
+         *  4. The HAL module reads the command and audio data. According
+         *     to the statement above, the HAL module must always read
+         *     from the FMQ all the data it contains. The amount of data that
+         *     the HAL module has actually consumed is indicated to the client
+         *     via the 'reply.fmqByteCount' field.
          *  5. The HAL module writes the command status and current positions
          *     into 'reply' queue, and hangs on waiting on a read from
          *     the 'command' queue.
          *  6. The client wakes up due to 5. and reads the reply.
          *
          * For input streams the following sequence of operations is used:
-         *  1. The client writes the 'BURST' command into the 'command' queue,
+         *  1. The client writes the BURST command into the 'command' queue,
          *     and hangs on waiting on a read from the 'reply' queue.
          *  2. The high priority thread in the HAL module wakes up due to 1.
          *  3. The HAL module writes audio data into the 'audio.fmq' queue.
+         *     The value of 'reply.fmqByteCount' must be equal to the amount
+         *     of data in the queue.
          *  4. The HAL module writes the command status and current positions
          *     into 'reply' queue, and hangs on waiting on a read from
          *     the 'command' queue.
          *  5. The client wakes up due to 4.
-         *  6. The client reads the reply and audio data.
+         *  6. The client reads the reply and audio data. The client must
+         *     always read from the FMQ all the data it contains.
+         *
          */
         MQDescriptor<byte, SynchronizedReadWrite> fmq;
         /**
          * MMap buffers are shared directly with the DSP, which operates
-         * independently from the CPU. Writes and reads into these buffers
-         * are not synchronized with 'command' and 'reply' queues. However,
-         * the client still uses the 'BURST' command for obtaining current
-         * positions from the HAL module.
+         * independently from the CPU. Writes and reads into these buffers are
+         * not synchronized with 'command' and 'reply' queues. However, the
+         * client still uses the same commands for controlling the audio data
+         * exchange and for obtaining current positions and latency from the HAL
+         * module.
          */
         MmapBufferDescriptor mmap;
     }
diff --git a/audio/aidl/android/hardware/audio/core/stream-in-sm.gv b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv
new file mode 100644
index 0000000..889a14b
--- /dev/null
+++ b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv
@@ -0,0 +1,42 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// To render: dot -Tpng stream-in-sm.gv -o stream-in-sm.png
+digraph stream_in_state_machine {
+    node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
+    node [shape=point width=0.5] F;
+    node [shape=oval width=1];
+    node [fillcolor=lightgreen] STANDBY;  // buffer is empty
+    node [fillcolor=tomato] CLOSED;
+    node [fillcolor=tomato] ERROR;
+    node [style=dashed] ANY_STATE;
+    node [fillcolor=lightblue style=filled];
+    I -> STANDBY;
+    STANDBY -> IDLE [label="START"];    // producer -> active
+    IDLE -> STANDBY [label="STANDBY"];  // producer -> passive, buffer is cleared
+    IDLE -> ACTIVE [label="BURST"];     // consumer -> active
+    ACTIVE -> ACTIVE [label="BURST"];
+    ACTIVE -> PAUSED [label="PAUSE"];   // consumer -> passive
+    ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive
+    PAUSED -> ACTIVE [label="BURST"];   // consumer -> active
+    PAUSED -> STANDBY [label="FLUSH"];  // producer -> passive, buffer is cleared
+    DRAINING -> DRAINING [label="BURST"];
+    DRAINING -> ACTIVE [label="START"];  // producer -> active
+    DRAINING -> STANDBY [label="<empty buffer>"];  // consumer deactivates
+    IDLE -> ERROR [label="<hardware failure>"];
+    ACTIVE -> ERROR [label="<hardware failure>"];
+    PAUSED -> ERROR [label="<hardware failure>"];
+    ANY_STATE -> CLOSED [label="→IStream*.close"];
+    CLOSED -> F;
+}
diff --git a/audio/aidl/android/hardware/audio/core/stream-out-sm.gv b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv
new file mode 100644
index 0000000..56dd5290
--- /dev/null
+++ b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv
@@ -0,0 +1,48 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// To render: dot -Tpng stream-out-sm.gv -o stream-out-sm.png
+digraph stream_out_state_machine {
+    node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
+    node [shape=point width=0.5] F;
+    node [shape=oval width=1];
+    node [fillcolor=lightgreen] STANDBY;  // buffer is empty
+    node [fillcolor=lightgreen] IDLE;     // buffer is empty
+    node [fillcolor=tomato] CLOSED;
+    node [fillcolor=tomato] ERROR;
+    node [style=dashed] ANY_STATE;
+    node [fillcolor=lightblue style=filled];
+    I -> STANDBY;
+    STANDBY -> IDLE [label="START"];           // consumer -> active
+    STANDBY -> PAUSED [label="BURST"];         // producer -> active
+    IDLE -> STANDBY [label="STANDBY"];         // consumer -> passive
+    IDLE -> ACTIVE [label="BURST"];            // producer -> active
+    ACTIVE -> ACTIVE [label="BURST"];
+    ACTIVE -> PAUSED [label="PAUSE"];          // consumer -> passive (not consuming)
+    ACTIVE -> DRAINING [label="DRAIN"];        // producer -> passive
+    PAUSED -> PAUSED [label="BURST"];
+    PAUSED -> ACTIVE [label="START"];          // consumer -> active
+    PAUSED -> IDLE [label="FLUSH"];            // producer -> passive, buffer is cleared
+    DRAINING -> IDLE [label="<empty buffer>"];
+    DRAINING -> ACTIVE [label="BURST"];        // producer -> active
+    DRAINING -> DRAIN_PAUSED [label="PAUSE"];  // consumer -> passive (not consuming)
+    DRAIN_PAUSED -> DRAINING [label="START"];  // consumer -> active
+    DRAIN_PAUSED -> PAUSED [label="BURST"];    // producer -> active
+    DRAIN_PAUSED -> IDLE [label="FLUSH"];      // buffer is cleared
+    IDLE -> ERROR [label="<hardware failure>"];
+    ACTIVE -> ERROR [label="<hardware failure>"];
+    DRAINING -> ERROR [label="<hardware failure>"];
+    ANY_STATE -> CLOSED [label="→IStream*.close"];
+    CLOSED -> F;
+}
diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp
index 9bca760..dda0e4a 100644
--- a/audio/aidl/common/StreamWorker.cpp
+++ b/audio/aidl/common/StreamWorker.cpp
@@ -44,6 +44,10 @@
             mWorkerStateChangeRequest = true;
         }
     }
+    join();
+}
+
+void ThreadController::join() {
     if (mWorker.joinable()) {
         mWorker.join();
     }
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 6260eca..ab2ec26 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -39,6 +39,9 @@
     ~ThreadController() { stop(); }
 
     bool start(const std::string& name, int priority);
+    // Note: 'pause' and 'resume' methods should only be used on the "driving" side.
+    // In the case of audio HAL I/O, the driving side is the client, because the HAL
+    // implementation always blocks on getting a command.
     void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
     void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
     bool hasError() {
@@ -50,6 +53,10 @@
         return mError;
     }
     void stop();
+    // Direct use of 'join' assumes that the StreamLogic is not intended
+    // to run forever, and is guaranteed to exit by itself. This normally
+    // only happens in tests.
+    void join();
     bool waitForAtLeastOneCycle();
 
     // Only used by unit tests.
@@ -133,7 +140,8 @@
     void resume() { mThread.resume(); }
     bool hasError() { return mThread.hasError(); }
     std::string getError() { return mThread.getError(); }
-    void stop() { return mThread.stop(); }
+    void stop() { mThread.stop(); }
+    void join() { mThread.join(); }
     bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
 
     // Only used by unit tests.
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index e3e484d..8ea8424 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -160,6 +160,14 @@
     EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
 }
 
+TEST_P(StreamWorkerTest, WorkerJoin) {
+    ASSERT_TRUE(worker.start());
+    stream.setStopStatus();
+    worker.join();
+    EXPECT_FALSE(worker.hasError());
+    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
+}
+
 TEST_P(StreamWorkerTest, WorkerError) {
     ASSERT_TRUE(worker.start());
     stream.setErrorStatus();
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 312df72..7b544a1 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -85,126 +85,315 @@
     return "";
 }
 
+void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
+                                            bool isConnected) const {
+    if (isConnected) {
+        reply->status = STATUS_OK;
+        reply->observable.frames = mFrameCount;
+        reply->observable.timeNs = ::android::elapsedRealtimeNano();
+    } else {
+        reply->status = STATUS_NO_INIT;
+    }
+}
+
 const std::string StreamInWorkerLogic::kThreadName = "reader";
 
 StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     StreamDescriptor::Reply reply{};
-    if (command.code == StreamContext::COMMAND_EXIT &&
+    if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
         command.fmqByteCount == mInternalCommandCookie) {
         LOG(DEBUG) << __func__ << ": received EXIT command";
+        setClosed();
         // This is an internal command, no need to reply.
         return Status::EXIT;
-    } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+    } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
+        LOG(DEBUG) << __func__ << ": received START read command";
+        if (mState == StreamDescriptor::State::STANDBY ||
+            mState == StreamDescriptor::State::DRAINING) {
+            populateReply(&reply, mIsConnected);
+            mState = mState == StreamDescriptor::State::STANDBY ? StreamDescriptor::State::IDLE
+                                                                : StreamDescriptor::State::ACTIVE;
+        } else {
+            LOG(WARNING) << __func__ << ": START command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
         LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount
                    << " bytes";
-        usleep(3000);  // Simulate a blocking call into the driver.
-        const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
-                                           mDataMQ->availableToWrite(), mDataBufferSize});
-        const bool isConnected = mIsConnected;
-        // Simulate reading of data, or provide zeroes if the stream is not connected.
-        for (size_t i = 0; i < byteCount; ++i) {
-            using buffer_type = decltype(mDataBuffer)::element_type;
-            constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
-                                              std::numeric_limits<buffer_type>::min() + 1;
-            mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
-                                                   std::numeric_limits<buffer_type>::min()
-                                         : 0;
-        }
-        bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true;
-        if (success) {
-            LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
-                       << " succeeded; connected? " << isConnected;
-            // Frames are provided and counted regardless of connection status.
-            reply.fmqByteCount = byteCount;
-            mFrameCount += byteCount / mFrameSize;
-            if (isConnected) {
-                reply.status = STATUS_OK;
-                reply.observable.frames = mFrameCount;
-                reply.observable.timeNs = ::android::elapsedRealtimeNano();
-            } else {
-                reply.status = STATUS_INVALID_OPERATION;
+        if (mState == StreamDescriptor::State::IDLE || mState == StreamDescriptor::State::ACTIVE ||
+            mState == StreamDescriptor::State::PAUSED ||
+            mState == StreamDescriptor::State::DRAINING) {
+            if (!read(command.fmqByteCount, &reply)) {
+                mState = StreamDescriptor::State::ERROR;
+            }
+            if (mState == StreamDescriptor::State::IDLE ||
+                mState == StreamDescriptor::State::PAUSED) {
+                mState = StreamDescriptor::State::ACTIVE;
+            } else if (mState == StreamDescriptor::State::DRAINING) {
+                // To simplify the reference code, we assume that the read operation
+                // has consumed all the data remaining in the hardware buffer.
+                // TODO: Provide parametrization on the duration of draining to test
+                //       handling of commands during the 'DRAINING' state.
+                mState = StreamDescriptor::State::STANDBY;
             }
         } else {
-            LOG(WARNING) << __func__ << ": writing of " << byteCount
-                         << " bytes of data to MQ failed";
-            reply.status = STATUS_NOT_ENOUGH_DATA;
+            LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
         }
-        reply.latencyMs = Module::kLatencyMs;
+    } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received DRAIN read command";
+        if (mState == StreamDescriptor::State::ACTIVE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::DRAINING;
+        } else {
+            LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received PAUSE read command";
+        if (mState == StreamDescriptor::State::ACTIVE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::PAUSED;
+        } else {
+            LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received FLUSH read command";
+        if (mState == StreamDescriptor::State::PAUSED) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::STANDBY;
+        } else {
+            LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
+               command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received STANDBY read command";
+        if (mState == StreamDescriptor::State::IDLE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::STANDBY;
+        } else {
+            LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
     } else {
         LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
                      << ") or count: " << command.fmqByteCount;
         reply.status = STATUS_BAD_VALUE;
     }
+    reply.state = mState;
     LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
     if (!mReplyMQ->writeBlocking(&reply, 1)) {
         LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     return Status::CONTINUE;
 }
 
+bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
+    // Can switch the state to ERROR if a driver error occurs.
+    const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
+    const bool isConnected = mIsConnected;
+    bool fatal = false;
+    // Simulate reading of data, or provide zeroes if the stream is not connected.
+    for (size_t i = 0; i < byteCount; ++i) {
+        using buffer_type = decltype(mDataBuffer)::element_type;
+        constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
+                                          std::numeric_limits<buffer_type>::min() + 1;
+        mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
+                                               std::numeric_limits<buffer_type>::min()
+                                     : 0;
+    }
+    usleep(3000);  // Simulate a blocking call into the driver.
+    // Set 'fatal = true' if a driver error occurs.
+    if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
+        LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
+                   << " succeeded; connected? " << isConnected;
+        // Frames are provided and counted regardless of connection status.
+        reply->fmqByteCount += byteCount;
+        mFrameCount += byteCount / mFrameSize;
+        populateReply(reply, isConnected);
+    } else {
+        LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
+        reply->status = STATUS_NOT_ENOUGH_DATA;
+    }
+    reply->latencyMs = Module::kLatencyMs;
+    return !fatal;
+}
+
 const std::string StreamOutWorkerLogic::kThreadName = "writer";
 
 StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     StreamDescriptor::Reply reply{};
-    if (command.code == StreamContext::COMMAND_EXIT &&
+    if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
         command.fmqByteCount == mInternalCommandCookie) {
         LOG(DEBUG) << __func__ << ": received EXIT command";
+        setClosed();
         // This is an internal command, no need to reply.
         return Status::EXIT;
-    } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+    } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
+        LOG(DEBUG) << __func__ << ": received START write command";
+        switch (mState) {
+            case StreamDescriptor::State::STANDBY:
+                mState = StreamDescriptor::State::IDLE;
+                break;
+            case StreamDescriptor::State::PAUSED:
+                mState = StreamDescriptor::State::ACTIVE;
+                break;
+            case StreamDescriptor::State::DRAIN_PAUSED:
+                mState = StreamDescriptor::State::PAUSED;
+                break;
+            default:
+                LOG(WARNING) << __func__ << ": START command can not be handled in the state "
+                             << toString(mState);
+                reply.status = STATUS_INVALID_OPERATION;
+        }
+        if (reply.status != STATUS_INVALID_OPERATION) {
+            populateReply(&reply, mIsConnected);
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
         LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount
                    << " bytes";
-        const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
-                                           mDataMQ->availableToRead(), mDataBufferSize});
-        bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true;
-        if (success) {
-            const bool isConnected = mIsConnected;
-            LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ"
-                       << " succeeded; connected? " << isConnected;
-            // Frames are consumed and counted regardless of connection status.
-            reply.fmqByteCount = byteCount;
-            mFrameCount += byteCount / mFrameSize;
-            if (isConnected) {
-                reply.status = STATUS_OK;
-                reply.observable.frames = mFrameCount;
-                reply.observable.timeNs = ::android::elapsedRealtimeNano();
-            } else {
-                reply.status = STATUS_INVALID_OPERATION;
+        if (mState != StreamDescriptor::State::ERROR) {  // BURST can be handled in all valid states
+            if (!write(command.fmqByteCount, &reply)) {
+                mState = StreamDescriptor::State::ERROR;
             }
-            usleep(3000);  // Simulate a blocking call into the driver.
+            if (mState == StreamDescriptor::State::STANDBY ||
+                mState == StreamDescriptor::State::DRAIN_PAUSED) {
+                mState = StreamDescriptor::State::PAUSED;
+            } else if (mState == StreamDescriptor::State::IDLE ||
+                       mState == StreamDescriptor::State::DRAINING) {
+                mState = StreamDescriptor::State::ACTIVE;
+            }  // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
         } else {
-            LOG(WARNING) << __func__ << ": reading of " << byteCount
-                         << " bytes of data from MQ failed";
-            reply.status = STATUS_NOT_ENOUGH_DATA;
+            LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
         }
-        reply.latencyMs = Module::kLatencyMs;
+    } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received DRAIN write command";
+        if (mState == StreamDescriptor::State::ACTIVE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::IDLE;
+            // Since there is no actual hardware that would be draining the buffer,
+            // in order to simplify the reference code, we assume that draining
+            // happens instantly, thus skipping the 'DRAINING' state.
+            // TODO: Provide parametrization on the duration of draining to test
+            //       handling of commands during the 'DRAINING' state.
+        } else {
+            LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
+               command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received STANDBY write command";
+        if (mState == StreamDescriptor::State::IDLE) {
+            usleep(1000);  // Simulate a blocking call into the driver.
+            populateReply(&reply, mIsConnected);
+            // Can switch the state to ERROR if a driver error occurs.
+            mState = StreamDescriptor::State::STANDBY;
+        } else {
+            LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received PAUSE write command";
+        if (mState == StreamDescriptor::State::ACTIVE ||
+            mState == StreamDescriptor::State::DRAINING) {
+            populateReply(&reply, mIsConnected);
+            mState = mState == StreamDescriptor::State::ACTIVE
+                             ? StreamDescriptor::State::PAUSED
+                             : StreamDescriptor::State::DRAIN_PAUSED;
+        } else {
+            LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
+    } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
+        LOG(DEBUG) << __func__ << ": received FLUSH write command";
+        if (mState == StreamDescriptor::State::PAUSED ||
+            mState == StreamDescriptor::State::DRAIN_PAUSED) {
+            populateReply(&reply, mIsConnected);
+            mState = StreamDescriptor::State::IDLE;
+        } else {
+            LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+                         << toString(mState);
+            reply.status = STATUS_INVALID_OPERATION;
+        }
     } else {
         LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
                      << ") or count: " << command.fmqByteCount;
         reply.status = STATUS_BAD_VALUE;
     }
+    reply.state = mState;
     LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
     if (!mReplyMQ->writeBlocking(&reply, 1)) {
         LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+        mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
     return Status::CONTINUE;
 }
 
+bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
+    const size_t readByteCount = mDataMQ->availableToRead();
+    // Amount of data that the HAL module is going to actually use.
+    const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
+    bool fatal = false;
+    if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
+        const bool isConnected = mIsConnected;
+        LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
+                   << " succeeded; connected? " << isConnected;
+        // Frames are consumed and counted regardless of connection status.
+        reply->fmqByteCount += byteCount;
+        mFrameCount += byteCount / mFrameSize;
+        populateReply(reply, isConnected);
+        usleep(3000);  // Simulate a blocking call into the driver.
+        // Set 'fatal = true' if a driver error occurs.
+    } else {
+        LOG(WARNING) << __func__ << ": reading of " << readByteCount
+                     << " bytes of data from MQ failed";
+        reply->status = STATUS_NOT_ENOUGH_DATA;
+    }
+    reply->latencyMs = Module::kLatencyMs;
+    return !fatal;
+}
+
 template <class Metadata, class StreamWorker>
 StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
-    if (!mIsClosed) {
+    if (!isClosed()) {
         LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
         stopWorker();
         // The worker and the context should clean up by themselves via destructors.
@@ -214,13 +403,13 @@
 template <class Metadata, class StreamWorker>
 ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
     LOG(DEBUG) << __func__;
-    if (!mIsClosed) {
+    if (!isClosed()) {
         stopWorker();
         LOG(DEBUG) << __func__ << ": joining the worker thread...";
         mWorker.stop();
         LOG(DEBUG) << __func__ << ": worker thread joined";
         mContext.reset();
-        mIsClosed = true;
+        mWorker.setClosed();
         return ndk::ScopedAStatus::ok();
     } else {
         LOG(ERROR) << __func__ << ": stream was already closed";
@@ -231,13 +420,14 @@
 template <class Metadata, class StreamWorker>
 void StreamCommon<Metadata, StreamWorker>::stopWorker() {
     if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
-        LOG(DEBUG) << __func__ << ": asking the worker to stop...";
+        LOG(DEBUG) << __func__ << ": asking the worker to exit...";
         StreamDescriptor::Command cmd;
-        cmd.code = StreamContext::COMMAND_EXIT;
+        cmd.code = StreamDescriptor::CommandCode(StreamContext::COMMAND_EXIT);
         cmd.fmqByteCount = mContext.getInternalCommandCookie();
-        // FIXME: This can block in the case when the client wrote a command
-        // while the stream worker's cycle is not running. Need to revisit
-        // when implementing standby and pause/resume.
+        // Note: never call 'pause' and 'resume' methods of StreamWorker
+        // in the HAL implementation. These methods are to be used by
+        // the client side only. Preventing the worker loop from running
+        // on the HAL side can cause a deadlock.
         if (!commandMQ->writeBlocking(&cmd, 1)) {
             LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
         }
@@ -248,7 +438,7 @@
 template <class Metadata, class StreamWorker>
 ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
     LOG(DEBUG) << __func__;
-    if (!mIsClosed) {
+    if (!isClosed()) {
         mMetadata = metadata;
         return ndk::ScopedAStatus::ok();
     }
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 488edf1..539fa8b 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -54,8 +54,10 @@
             int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
             DataMQ;
 
-    // Ensure that this value is not used by any of StreamDescriptor.COMMAND_*
-    static constexpr int COMMAND_EXIT = -1;
+    // Ensure that this value is not used by any of StreamDescriptor.CommandCode enums
+    static constexpr int32_t COMMAND_EXIT = -1;
+    // Ensure that this value is not used by any of StreamDescriptor.State enums
+    static constexpr int32_t STATE_CLOSED = -1;
 
     StreamContext() = default;
     StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
@@ -99,6 +101,10 @@
 
 class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
   public:
+    bool isClosed() const {
+        return static_cast<int32_t>(mState.load()) == StreamContext::STATE_CLOSED;
+    }
+    void setClosed() { mState = static_cast<StreamDescriptor::State>(StreamContext::STATE_CLOSED); }
     void setIsConnected(bool connected) { mIsConnected = connected; }
 
   protected:
@@ -109,9 +115,12 @@
           mReplyMQ(context.getReplyMQ()),
           mDataMQ(context.getDataMQ()) {}
     std::string init() override;
+    void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
 
-    // Used both by the main and worker threads.
+    // Atomic fields are used both by the main and worker threads.
     std::atomic<bool> mIsConnected = false;
+    static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free);
+    std::atomic<StreamDescriptor::State> mState = StreamDescriptor::State::STANDBY;
     // All fields are used on the worker thread only.
     const int mInternalCommandCookie;
     const size_t mFrameSize;
@@ -132,6 +141,9 @@
 
   protected:
     Status cycle() override;
+
+  private:
+    bool read(size_t clientSize, StreamDescriptor::Reply* reply);
 };
 using StreamInWorker = ::android::hardware::audio::common::StreamWorker<StreamInWorkerLogic>;
 
@@ -143,6 +155,9 @@
 
   protected:
     Status cycle() override;
+
+  private:
+    bool write(size_t clientSize, StreamDescriptor::Reply* reply);
 };
 using StreamOutWorker = ::android::hardware::audio::common::StreamWorker<StreamOutWorkerLogic>;
 
@@ -155,7 +170,7 @@
                        ? ndk::ScopedAStatus::ok()
                        : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
     }
-    bool isClosed() const { return mIsClosed; }
+    bool isClosed() const { return mWorker.isClosed(); }
     void setIsConnected(bool connected) { mWorker.setIsConnected(connected); }
     ndk::ScopedAStatus updateMetadata(const Metadata& metadata);
 
@@ -168,9 +183,6 @@
     Metadata mMetadata;
     StreamContext mContext;
     StreamWorker mWorker;
-    // This variable is checked in the destructor which can be called on an arbitrary Binder thread,
-    // thus we need to ensure that any changes made by other threads are sequentially consistent.
-    std::atomic<bool> mIsClosed = false;
 };
 
 class StreamIn
diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
index 2381200..b415da4 100644
--- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
@@ -34,6 +34,7 @@
 #include <aidl/android/media/audio/common/AudioIoFlags.h>
 #include <aidl/android/media/audio/common/AudioOutputFlags.h>
 #include <android-base/chrono_utils.h>
+#include <android/binder_enums.h>
 #include <fmq/AidlMessageQueue.h>
 
 #include "AudioHalBinderServiceUtil.h"
@@ -69,6 +70,7 @@
 using android::hardware::audio::common::isBitPositionFlagSet;
 using android::hardware::audio::common::StreamLogic;
 using android::hardware::audio::common::StreamWorker;
+using ndk::enum_range;
 using ndk::ScopedAStatus;
 
 template <typename T>
@@ -171,25 +173,26 @@
     AudioPortConfig mConfig;
 };
 
-class AudioCoreModule : public testing::TestWithParam<std::string> {
+// Can be used as a base for any test here, does not depend on the fixture GTest parameters.
+class AudioCoreModuleBase {
   public:
     // The default buffer size is used mostly for negative tests.
     static constexpr int kDefaultBufferSizeFrames = 256;
 
-    void SetUp() override {
-        ASSERT_NO_FATAL_FAILURE(ConnectToService());
+    void SetUpImpl(const std::string& moduleName) {
+        ASSERT_NO_FATAL_FAILURE(ConnectToService(moduleName));
         debug.flags().simulateDeviceConnections = true;
         ASSERT_NO_FATAL_FAILURE(debug.SetUp(module.get()));
     }
 
-    void TearDown() override {
+    void TearDownImpl() {
         if (module != nullptr) {
             EXPECT_IS_OK(module->setModuleDebug(ModuleDebug{}));
         }
     }
 
-    void ConnectToService() {
-        module = IModule::fromBinder(binderUtil.connectToService(GetParam()));
+    void ConnectToService(const std::string& moduleName) {
+        module = IModule::fromBinder(binderUtil.connectToService(moduleName));
         ASSERT_NE(module, nullptr);
     }
 
@@ -269,6 +272,13 @@
     WithDebugFlags debug;
 };
 
+class AudioCoreModule : public AudioCoreModuleBase, public testing::TestWithParam<std::string> {
+  public:
+    void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpImpl(GetParam())); }
+
+    void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownImpl()); }
+};
+
 class WithDevicePortConnectedState {
   public:
     explicit WithDevicePortConnectedState(const AudioPort& idAndData) : mIdAndData(idAndData) {}
@@ -352,21 +362,36 @@
     std::unique_ptr<DataMQ> mDataMQ;
 };
 
-class StreamCommonLogic : public StreamLogic {
+class StreamLogicDriver {
   public:
-    StreamDescriptor::Position getLastObservablePosition() {
-        std::lock_guard<std::mutex> lock(mLock);
-        return mLastReply.observable;
-    }
+    virtual ~StreamLogicDriver() = default;
+    // Return 'true' to stop the worker.
+    virtual bool done() = 0;
+    // For 'Writer' logic, if the 'actualSize' is 0, write is skipped.
+    // The 'fmqByteCount' from the returned command is passed as is to the HAL.
+    virtual StreamDescriptor::Command getNextCommand(int maxDataSize,
+                                                     int* actualSize = nullptr) = 0;
+    // Return 'true' to indicate that no further processing is needed,
+    // for example, the driver is expecting a bad status to be returned.
+    // The logic cycle will return with 'CONTINUE' status. Otherwise,
+    // the reply will be validated and then passed to 'processValidReply'.
+    virtual bool interceptRawReply(const StreamDescriptor::Reply& reply) = 0;
+    // Return 'false' to indicate that the contents of the reply are unexpected.
+    // Will abort the logic cycle.
+    virtual bool processValidReply(const StreamDescriptor::Reply& reply) = 0;
+};
 
+class StreamCommonLogic : public StreamLogic {
   protected:
-    explicit StreamCommonLogic(const StreamContext& context)
+    StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver)
         : mCommandMQ(context.getCommandMQ()),
           mReplyMQ(context.getReplyMQ()),
           mDataMQ(context.getDataMQ()),
-          mData(context.getBufferSizeBytes()) {}
+          mData(context.getBufferSizeBytes()),
+          mDriver(driver) {}
     StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; }
     StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; }
+    StreamLogicDriver* getDriver() const { return mDriver; }
 
     std::string init() override { return ""; }
 
@@ -374,19 +399,20 @@
     StreamContext::ReplyMQ* mReplyMQ;
     StreamContext::DataMQ* mDataMQ;
     std::vector<int8_t> mData;
-    std::mutex mLock;
-    StreamDescriptor::Reply mLastReply GUARDED_BY(mLock);
+    StreamLogicDriver* const mDriver;
 };
 
 class StreamReaderLogic : public StreamCommonLogic {
   public:
-    explicit StreamReaderLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+    StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver)
+        : StreamCommonLogic(context, driver) {}
 
   protected:
+    // Performs one exchange with the HAL: sends the next command supplied by the driver,
+    // validates the reply, and reads out everything available in the data MQ.
     Status cycle() override {
-        StreamDescriptor::Command command{};
-        command.code = StreamDescriptor::COMMAND_BURST;
-        command.fmqByteCount = mData.size();
+        if (getDriver()->done()) return Status::EXIT;
+        StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size());
         if (!mCommandMQ->writeBlocking(&command, 1)) {
             LOG(ERROR) << __func__ << ": writing of command into MQ failed";
             return Status::ABORT;
@@ -396,6 +422,9 @@
             LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
             return Status::ABORT;
         }
+        // The driver sees the raw reply first. If it claims the reply, the generic
+        // validation below is skipped for this cycle.
+        if (getDriver()->interceptRawReply(reply)) return Status::CONTINUE;
         if (reply.status != STATUS_OK) {
             LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
             return Status::ABORT;
@@ -405,16 +434,41 @@
                        << ": received invalid byte count in the reply: " << reply.fmqByteCount;
             return Status::ABORT;
         }
-        {
-            std::lock_guard<std::mutex> lock(mLock);
-            mLastReply = reply;
+        if (static_cast<size_t>(reply.fmqByteCount) != mDataMQ->availableToRead()) {
+            LOG(ERROR) << __func__
+                       << ": the byte count in the reply is not the same as the amount of "
+                       << "data available in the MQ: " << reply.fmqByteCount
+                       << " != " << mDataMQ->availableToRead();
         }
-        const size_t readCount = std::min({mDataMQ->availableToRead(),
-                                           static_cast<size_t>(reply.fmqByteCount), mData.size()});
-        if (readCount == 0 || mDataMQ->read(mData.data(), readCount)) {
+        if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
+            LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs;
+            return Status::ABORT;
+        }
+        if (reply.xrunFrames < 0) {
+            LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames;
+            return Status::ABORT;
+        }
+        if (std::find(enum_range<StreamDescriptor::State>().begin(),
+                      enum_range<StreamDescriptor::State>().end(),
+                      reply.state) == enum_range<StreamDescriptor::State>().end()) {
+            LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state);
+            return Status::ABORT;
+        }
+        const bool acceptedReply = getDriver()->processValidReply(reply);
+        // Drain the data MQ regardless of the verdict; bytes beyond mData.size() are dropped.
+        if (const size_t readCount = mDataMQ->availableToRead(); readCount > 0) {
+            std::vector<int8_t> data(readCount);
+            if (!mDataMQ->read(data.data(), readCount)) {
+                LOG(ERROR) << __func__ << ": reading of " << readCount
+                           << " data bytes from MQ failed";
+                return Status::ABORT;
+            }
+            memcpy(mData.data(), data.data(), std::min(mData.size(), data.size()));
+        }
+        if (acceptedReply) {
             return Status::CONTINUE;
         }
-        LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
+        LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
         return Status::ABORT;
     }
 };
@@ -422,17 +476,20 @@
 
 class StreamWriterLogic : public StreamCommonLogic {
   public:
-    explicit StreamWriterLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+    StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver)
+        : StreamCommonLogic(context, driver) {}
 
   protected:
+    // Performs one exchange with the HAL: puts the amount of data chosen by the driver
+    // into the data MQ, sends the driver's command, and validates the reply.
     Status cycle() override {
-        if (!mDataMQ->write(mData.data(), mData.size())) {
+        if (getDriver()->done()) return Status::EXIT;
+        int actualSize = 0;
+        StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size(), &actualSize);
+        if (actualSize != 0 && !mDataMQ->write(mData.data(), actualSize)) {
-            LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed";
+            LOG(ERROR) << __func__ << ": writing of " << actualSize << " bytes to MQ failed";
             return Status::ABORT;
         }
-        StreamDescriptor::Command command{};
-        command.code = StreamDescriptor::COMMAND_BURST;
-        command.fmqByteCount = mData.size();
         if (!mCommandMQ->writeBlocking(&command, 1)) {
             LOG(ERROR) << __func__ << ": writing of command into MQ failed";
             return Status::ABORT;
@@ -442,6 +499,9 @@
             LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
             return Status::ABORT;
         }
+        // The driver sees the raw reply first. If it claims the reply, the generic
+        // validation below is skipped for this cycle.
+        if (getDriver()->interceptRawReply(reply)) return Status::CONTINUE;
         if (reply.status != STATUS_OK) {
             LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
             return Status::ABORT;
@@ -451,11 +511,31 @@
                        << ": received invalid byte count in the reply: " << reply.fmqByteCount;
             return Status::ABORT;
         }
-        {
-            std::lock_guard<std::mutex> lock(mLock);
-            mLastReply = reply;
+        if (mDataMQ->availableToWrite() != mDataMQ->getQuantumCount()) {
+            LOG(ERROR) << __func__ << ": the HAL module did not consume all data from the data MQ: "
+                       << "available to write " << mDataMQ->availableToWrite()
+                       << ", total size: " << mDataMQ->getQuantumCount();
+            return Status::ABORT;
         }
-        return Status::CONTINUE;
+        if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
+            LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs;
+            return Status::ABORT;
+        }
+        if (reply.xrunFrames < 0) {
+            LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames;
+            return Status::ABORT;
+        }
+        if (std::find(enum_range<StreamDescriptor::State>().begin(),
+                      enum_range<StreamDescriptor::State>().end(),
+                      reply.state) == enum_range<StreamDescriptor::State>().end()) {
+            LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state);
+            return Status::ABORT;
+        }
+        if (getDriver()->processValidReply(reply)) {
+            return Status::CONTINUE;
+        }
+        LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
+        return Status::ABORT;
     }
 };
 using StreamWriter = StreamWorker<StreamWriterLogic>;
@@ -466,52 +546,6 @@
     using Worker = std::conditional_t<is_input, StreamReader, StreamWriter>;
 };
 
-// A dedicated version to test replies to invalid commands.
-class StreamInvalidCommandLogic : public StreamCommonLogic {
-  public:
-    StreamInvalidCommandLogic(const StreamContext& context,
-                              const std::vector<StreamDescriptor::Command>& commands)
-        : StreamCommonLogic(context), mCommands(commands) {}
-
-    std::vector<std::string> getUnexpectedStatuses() {
-        std::lock_guard<std::mutex> lock(mLock);
-        return mUnexpectedStatuses;
-    }
-
-  protected:
-    Status cycle() override {
-        // Send all commands in one cycle to simplify testing.
-        // Extra logging helps to sort out issues with unexpected HAL behavior.
-        for (const auto& command : mCommands) {
-            LOG(INFO) << __func__ << ": writing command " << command.toString() << " into MQ...";
-            if (!getCommandMQ()->writeBlocking(&command, 1)) {
-                LOG(ERROR) << __func__ << ": writing of command into MQ failed";
-                return Status::ABORT;
-            }
-            StreamDescriptor::Reply reply{};
-            LOG(INFO) << __func__ << ": reading reply for command " << command.toString() << "...";
-            if (!getReplyMQ()->readBlocking(&reply, 1)) {
-                LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
-                return Status::ABORT;
-            }
-            LOG(INFO) << __func__ << ": received status " << statusToString(reply.status)
-                      << " for command " << command.toString();
-            if (reply.status != STATUS_BAD_VALUE) {
-                std::string s = command.toString();
-                s.append(", ").append(statusToString(reply.status));
-                std::lock_guard<std::mutex> lock(mLock);
-                mUnexpectedStatuses.push_back(std::move(s));
-            }
-        };
-        return Status::EXIT;
-    }
-
-  private:
-    const std::vector<StreamDescriptor::Command> mCommands;
-    std::mutex mLock;
-    std::vector<std::string> mUnexpectedStatuses GUARDED_BY(mLock);
-};
-
 template <typename Stream>
 class WithStream {
   public:
@@ -1208,6 +1242,46 @@
     }
 }
 
+// A driver which feeds the worker with a fixed list of commands that the HAL module
+// is expected to reject, and records every reply whose status is not STATUS_BAD_VALUE.
+class StreamLogicDriverInvalidCommand : public StreamLogicDriver {
+  public:
+    explicit StreamLogicDriverInvalidCommand(const std::vector<StreamDescriptor::Command>& commands)
+        : mCommands(commands) {}
+
+    // Returns an empty string if no unexpected status has been recorded.
+    // This method is intended to be called after the worker thread has joined,
+    // thus no extra synchronization is needed.
+    std::string getUnexpectedStatuses() const {
+        if (mStatuses.empty()) return {};
+        return std::string("Pairs of (command, actual status): ")
+                .append(android::internal::ToString(mStatuses));
+    }
+
+    bool done() override { return mNextCommand >= mCommands.size(); }
+    // No data accompanies these commands, hence 'actualSize' is always set to 0.
+    StreamDescriptor::Command getNextCommand(int, int* actualSize) override {
+        if (actualSize != nullptr) *actualSize = 0;
+        return mCommands[mNextCommand++];
+    }
+    // Records the reply status unless it is the expected STATUS_BAD_VALUE.
+    bool interceptRawReply(const StreamDescriptor::Reply& reply) override {
+        if (reply.status == STATUS_BAD_VALUE) return true;  // The expected outcome.
+        std::string s = mCommands[mNextCommand - 1].toString();
+        s.append(", ").append(statusToString(reply.status));
+        mStatuses.push_back(std::move(s));
+        // If the HAL does not recognize the command as invalid (replies with STATUS_OK),
+        // let the worker proceed with the regular validation, retrieve the data etc.
+        return reply.status != STATUS_OK;
+    }
+    bool processValidReply(const StreamDescriptor::Reply&) override { return true; }
+
+  private:
+    const std::vector<StreamDescriptor::Command> mCommands;
+    size_t mNextCommand = 0;  // Index of the command to be sent next.
+    std::vector<std::string> mStatuses;  // "command, status" for each unexpected reply.
+};
+
 template <typename Stream>
 class AudioStream : public AudioCoreModule {
   public:
@@ -1315,19 +1389,6 @@
         EXPECT_NO_FATAL_FAILURE(OpenTwiceSamePortConfigImpl(portConfig.value()));
     }
 
-    void ReadOrWrite(bool useSetupSequence2, bool validateObservablePosition) {
-        const auto allPortConfigs =
-                moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
-        if (allPortConfigs.empty()) {
-            GTEST_SKIP() << "No mix ports have attached devices";
-        }
-        for (const auto& portConfig : allPortConfigs) {
-            EXPECT_NO_FATAL_FAILURE(
-                    ReadOrWriteImpl(portConfig, useSetupSequence2, validateObservablePosition))
-                    << portConfig.toString();
-        }
-    }
-
     void ResetPortConfigWithOpenStream() {
         const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
@@ -1357,131 +1418,43 @@
                 << stream1.getPortId();
     }
 
-    template <class Worker>
-    void WaitForObservablePositionAdvance(Worker& worker) {
-        static constexpr int kWriteDurationUs = 50 * 1000;
-        static constexpr std::chrono::milliseconds kPositionChangeTimeout{10000};
-        int64_t framesInitial;
-        framesInitial = worker.getLastObservablePosition().frames;
-        ASSERT_FALSE(worker.hasError());
-        bool timedOut = false;
-        int64_t frames = framesInitial;
-        for (android::base::Timer elapsed;
-             frames <= framesInitial && !worker.hasError() &&
-             !(timedOut = (elapsed.duration() >= kPositionChangeTimeout));) {
-            usleep(kWriteDurationUs);
-            frames = worker.getLastObservablePosition().frames;
-        }
-        EXPECT_FALSE(timedOut);
-        EXPECT_FALSE(worker.hasError()) << worker.getError();
-        EXPECT_GT(frames, framesInitial);
-    }
-
-    void ReadOrWriteImpl(const AudioPortConfig& portConfig, bool useSetupSequence2,
-                         bool validateObservablePosition) {
-        if (!useSetupSequence2) {
-            ASSERT_NO_FATAL_FAILURE(
-                    ReadOrWriteSetupSequence1(portConfig, validateObservablePosition));
-        } else {
-            ASSERT_NO_FATAL_FAILURE(
-                    ReadOrWriteSetupSequence2(portConfig, validateObservablePosition));
-        }
-    }
-
-    // Set up a patch first, then open a stream.
-    void ReadOrWriteSetupSequence1(const AudioPortConfig& portConfig,
-                                   bool validateObservablePosition) {
-        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
-                IOTraits<Stream>::is_input, portConfig);
-        ASSERT_FALSE(devicePorts.empty());
-        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
-        WithAudioPatch patch(IOTraits<Stream>::is_input, portConfig, devicePortConfig);
-        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
-
-        WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
-        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
-        typename IOTraits<Stream>::Worker worker(*stream.getContext());
-
-        ASSERT_TRUE(worker.start());
-        ASSERT_TRUE(worker.waitForAtLeastOneCycle());
-        if (validateObservablePosition) {
-            ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
-        }
-    }
-
-    // Open a stream, then set up a patch for it.
-    void ReadOrWriteSetupSequence2(const AudioPortConfig& portConfig,
-                                   bool validateObservablePosition) {
-        WithStream<Stream> stream(portConfig);
-        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
-        typename IOTraits<Stream>::Worker worker(*stream.getContext());
-
-        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
-                IOTraits<Stream>::is_input, portConfig);
-        ASSERT_FALSE(devicePorts.empty());
-        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
-        WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
-        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
-
-        ASSERT_TRUE(worker.start());
-        ASSERT_TRUE(worker.waitForAtLeastOneCycle());
-        if (validateObservablePosition) {
-            ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
-        }
-    }
-
     void SendInvalidCommandImpl(const AudioPortConfig& portConfig) {
         std::vector<StreamDescriptor::Command> commands(6);
-        commands[0].code = -1;
-        commands[1].code = StreamDescriptor::COMMAND_BURST - 1;
-        commands[2].code = std::numeric_limits<int32_t>::min();
-        commands[3].code = std::numeric_limits<int32_t>::max();
-        commands[4].code = StreamDescriptor::COMMAND_BURST;
+        commands[0].code = StreamDescriptor::CommandCode(-1);  // Not a CommandCode value.
+        commands[1].code = StreamDescriptor::CommandCode(
+                static_cast<int32_t>(StreamDescriptor::CommandCode::START) - 1);
+        commands[2].code = StreamDescriptor::CommandCode(std::numeric_limits<int32_t>::min());
+        commands[3].code = StreamDescriptor::CommandCode(std::numeric_limits<int32_t>::max());
+        // TODO: For input streams, first put the stream into a state which accepts BURST commands.
+        commands[4].code = StreamDescriptor::CommandCode::BURST;  // Valid code, invalid size.
         commands[4].fmqByteCount = -1;
-        commands[5].code = StreamDescriptor::COMMAND_BURST;
+        commands[5].code = StreamDescriptor::CommandCode::BURST;
         commands[5].fmqByteCount = std::numeric_limits<int32_t>::min();
         WithStream<Stream> stream(portConfig);
         ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
-        StreamWorker<StreamInvalidCommandLogic> writer(*stream.getContext(), commands);
-        ASSERT_TRUE(writer.start());
-        writer.waitForAtLeastOneCycle();
-        auto unexpectedStatuses = writer.getUnexpectedStatuses();
-        EXPECT_EQ(0UL, unexpectedStatuses.size())
-                << "Pairs of (command, actual status): "
-                << android::internal::ToString(unexpectedStatuses);
+        StreamLogicDriverInvalidCommand driver(commands);
+        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+        ASSERT_TRUE(worker.start());
+        worker.join();  // The driver's results are read only after the worker has exited.
+        EXPECT_FALSE(worker.hasError()) << worker.getError();  // E.g. an MQ transport failure.
+        EXPECT_EQ("", driver.getUnexpectedStatuses());
     }
 };
 using AudioStreamIn = AudioStream<IStreamIn>;
 using AudioStreamOut = AudioStream<IStreamOut>;
 
-#define TEST_IO_STREAM(method_name)                                                \
+#define TEST_IN_AND_OUT_STREAM(method_name)                                        \
     TEST_P(AudioStreamIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
     TEST_P(AudioStreamOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
-#define TEST_IO_STREAM_2(method_name, arg1, arg2)           \
-    TEST_P(AudioStreamIn, method_name##_##arg1##_##arg2) {  \
-        ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2));   \
-    }                                                       \
-    TEST_P(AudioStreamOut, method_name##_##arg1##_##arg2) { \
-        ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2));   \
-    }
 
-TEST_IO_STREAM(CloseTwice);
-TEST_IO_STREAM(OpenAllConfigs);
-TEST_IO_STREAM(OpenInvalidBufferSize);
-TEST_IO_STREAM(OpenInvalidDirection);
-TEST_IO_STREAM(OpenOverMaxCount);
-TEST_IO_STREAM(OpenTwiceSamePortConfig);
-// Use of constants makes comprehensible test names.
-constexpr bool SetupSequence1 = false;
-constexpr bool SetupSequence2 = true;
-constexpr bool SetupOnly = false;
-constexpr bool ValidateObservablePosition = true;
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, SetupOnly);
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, SetupOnly);
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, ValidateObservablePosition);
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, ValidateObservablePosition);
-TEST_IO_STREAM(ResetPortConfigWithOpenStream);
-TEST_IO_STREAM(SendInvalidCommand);
+TEST_IN_AND_OUT_STREAM(CloseTwice);
+TEST_IN_AND_OUT_STREAM(OpenAllConfigs);
+TEST_IN_AND_OUT_STREAM(OpenInvalidBufferSize);
+TEST_IN_AND_OUT_STREAM(OpenInvalidDirection);
+TEST_IN_AND_OUT_STREAM(OpenOverMaxCount);
+TEST_IN_AND_OUT_STREAM(OpenTwiceSamePortConfig);
+TEST_IN_AND_OUT_STREAM(ResetPortConfigWithOpenStream);
+TEST_IN_AND_OUT_STREAM(SendInvalidCommand);
 
 TEST_P(AudioStreamOut, OpenTwicePrimary) {
     const auto mixPorts = moduleConfig->getMixPorts(false);
@@ -1523,6 +1496,163 @@
             << "when no offload info is provided for a compressed offload mix port";
 }
 
+using CommandAndState = std::pair<StreamDescriptor::CommandCode, StreamDescriptor::State>;
+
+// A driver which issues a fixed sequence of commands and verifies that the stream state
+// reported in the reply to each command matches the state paired with that command.
+class StreamLogicDefaultDriver : public StreamLogicDriver {
+  public:
+    explicit StreamLogicDefaultDriver(const std::vector<CommandAndState>& commands)
+        : mCommands(commands) {}
+
+    // The three methods below are intended to be called after the worker
+    // thread has joined, thus no extra synchronization is needed.
+    bool hasObservablePositionIncrease() const { return mObservablePositionIncrease; }
+    bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; }
+    std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; }
+
+    bool done() override { return mNextCommand >= mCommands.size(); }
+    StreamDescriptor::Command getNextCommand(int maxDataSize, int* actualSize) override {
+        StreamDescriptor::Command command{};
+        command.code = mCommands[mNextCommand++].first;
+        const int dataSize = command.code == StreamDescriptor::CommandCode::BURST ? maxDataSize : 0;
+        command.fmqByteCount = dataSize;
+        if (actualSize != nullptr) {
+            // In the output scenario, reduce slightly the fmqByteCount to verify
+            // that the HAL module always consumes all data from the MQ.
+            if (command.fmqByteCount > 1) command.fmqByteCount--;
+            *actualSize = dataSize;
+        }
+        return command;
+    }
+    bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; }
+    bool processValidReply(const StreamDescriptor::Reply& reply) override {
+        if (mPreviousFrames.has_value()) {
+            if (reply.observable.frames > mPreviousFrames.value()) {
+                mObservablePositionIncrease = true;
+            } else if (reply.observable.frames < mPreviousFrames.value()) {
+                mRetrogradeObservablePosition = true;
+            }
+        }
+        mPreviousFrames = reply.observable.frames;
+
+        const auto& lastCommandState = mCommands[mNextCommand - 1];
+        if (lastCommandState.second != reply.state) {
+            std::string s = "Unexpected transition from the state " + mPreviousState + " to " +
+                            toString(reply.state) + " caused by the command " +
+                            toString(lastCommandState.first);
+            LOG(ERROR) << __func__ << ": " << s;
+            mUnexpectedTransition = std::move(s);
+            return false;
+        }
+        mPreviousState = toString(reply.state);
+        return true;
+    }
+
+  protected:
+    const std::vector<CommandAndState> mCommands;  // Own copy of the sequence.
+    size_t mNextCommand = 0;
+    std::optional<int64_t> mPreviousFrames;
+    std::string mPreviousState = "<initial state>";  // State from the last accepted reply.
+    bool mObservablePositionIncrease = false;
+    bool mRetrogradeObservablePosition = false;
+    std::string mUnexpectedTransition;
+};
+
+using NamedCommandSequence = std::pair<std::string, std::vector<CommandAndState>>;
+enum { PARAM_MODULE_NAME, PARAM_CMD_SEQ, PARAM_SETUP_SEQ };
+using StreamIoTestParameters =
+        std::tuple<std::string /*moduleName*/, NamedCommandSequence, bool /*useSetupSequence2*/>;
+// Parameterized fixture which runs a command sequence on a stream of each mix port
+// config, setting up the stream and the patch in one of the two possible orders.
+template <typename Stream>
+class AudioStreamIo : public AudioCoreModuleBase,
+                      public testing::TestWithParam<StreamIoTestParameters> {
+  public:
+    void SetUp() override {
+        ASSERT_NO_FATAL_FAILURE(SetUpImpl(std::get<PARAM_MODULE_NAME>(GetParam())));
+        ASSERT_NO_FATAL_FAILURE(SetUpModuleConfig());
+    }
+
+    void Run() {
+        const auto allPortConfigs =
+                moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
+        if (allPortConfigs.empty()) {
+            GTEST_SKIP() << "No mix ports have attached devices";
+        }
+        const auto& commandsAndStates = std::get<PARAM_CMD_SEQ>(GetParam()).second;
+        for (const auto& portConfig : allPortConfigs) {
+            SCOPED_TRACE(portConfig.toString());
+            if (!std::get<PARAM_SETUP_SEQ>(GetParam())) {
+                ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq1(portConfig, commandsAndStates));
+            } else {
+                ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq2(portConfig, commandsAndStates));
+            }
+        }
+    }
+
+    bool ValidateObservablePosition(const AudioPortConfig& /*portConfig*/) {
+        // May return false based on the portConfig, e.g. for telephony ports.
+        return true;
+    }
+
+    // Runs the worker until it exits, then checks the results accumulated by the driver.
+    void RunWorkerAndCheckResults(typename IOTraits<Stream>::Worker& worker,
+                                  const StreamLogicDefaultDriver& driver,
+                                  const AudioPortConfig& portConfig) {
+        ASSERT_TRUE(worker.start());
+        worker.join();
+        EXPECT_FALSE(worker.hasError()) << worker.getError();
+        EXPECT_EQ("", driver.getUnexpectedStateTransition());
+        if (ValidateObservablePosition(portConfig)) {
+            EXPECT_TRUE(driver.hasObservablePositionIncrease());
+            EXPECT_FALSE(driver.hasRetrogradeObservablePosition());
+        }
+    }
+
+    // Set up a patch first, then open a stream.
+    void RunStreamIoCommandsImplSeq1(const AudioPortConfig& portConfig,
+                                     const std::vector<CommandAndState>& commandsAndStates) {
+        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
+                IOTraits<Stream>::is_input, portConfig);
+        ASSERT_FALSE(devicePorts.empty());
+        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+        WithAudioPatch patch(IOTraits<Stream>::is_input, portConfig, devicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+
+        WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+        StreamLogicDefaultDriver driver(commandsAndStates);
+        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+        RunWorkerAndCheckResults(worker, driver, portConfig);
+    }
+
+    // Open a stream, then set up a patch for it.
+    void RunStreamIoCommandsImplSeq2(const AudioPortConfig& portConfig,
+                                     const std::vector<CommandAndState>& commandsAndStates) {
+        WithStream<Stream> stream(portConfig);
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+        StreamLogicDefaultDriver driver(commandsAndStates);
+        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+
+        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
+                IOTraits<Stream>::is_input, portConfig);
+        ASSERT_FALSE(devicePorts.empty());
+        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+        WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+        RunWorkerAndCheckResults(worker, driver, portConfig);
+    }
+};
+using AudioStreamIoIn = AudioStreamIo<IStreamIn>;
+using AudioStreamIoOut = AudioStreamIo<IStreamOut>;
+
+#define TEST_IN_AND_OUT_STREAM_IO(method_name)                                       \
+    TEST_P(AudioStreamIoIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
+    TEST_P(AudioStreamIoOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
+
+TEST_IN_AND_OUT_STREAM_IO(Run);
+
 // Tests specific to audio patches. The fixure class is named 'AudioModulePatch'
 // to avoid clashing with 'AudioPatch' class.
 class AudioModulePatch : public AudioCoreModule {
@@ -1704,6 +1834,139 @@
                          testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
                          android::PrintInstanceNameToString);
 GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamOut);
+
+// "ReadOrWrite": START paired with IDLE, then three BURST entries each paired with ACTIVE.
+static const NamedCommandSequence kReadOrWriteSeq{
+        std::string("ReadOrWrite"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE}}};
+// "Drain" sequence used by the input stream suite: DRAIN is issued twice, with a
+// START in between, and the final BURST is paired with STANDBY.
+static const NamedCommandSequence kDrainInSeq{
+        std::string("Drain"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::DRAIN, StreamDescriptor::State::DRAINING},
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::DRAIN, StreamDescriptor::State::DRAINING},
+                // TODO: This will need to be changed once DRAIN starts taking time.
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::STANDBY}}};
+// "Drain" sequence used by the output stream suite: START, BURST, then a DRAIN
+// that is paired with IDLE.
+static const NamedCommandSequence kDrainOutSeq{
+        std::string("Drain"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                // TODO: This will need to be changed once DRAIN starts taking time.
+                {StreamDescriptor::CommandCode::DRAIN, StreamDescriptor::State::IDLE}}};
+// "DrainPause" sequence used by the output stream suite. It currently contains no
+// PAUSE entry: START, BURST, then a DRAIN paired with IDLE.
+// TODO: This will need to be changed once DRAIN starts taking time so we can pause it.
+static const NamedCommandSequence kDrainPauseOutSeq{
+        std::string("DrainPause"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::DRAIN, StreamDescriptor::State::IDLE}}};
+// "Standby" sequence shared by the input and output suites: START, STANDBY, and
+// then START and BURST again.
+static const NamedCommandSequence kStandbySeq{
+        std::string("Standby"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::STANDBY, StreamDescriptor::State::STANDBY},
+                // Perform a read or write in order to advance observable position
+                // (this is verified by tests).
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE}}};
+// "Pause" sequence used by the input stream suite: PAUSE is issued twice, with a
+// BURST (paired with ACTIVE) in between, and the final FLUSH is paired with STANDBY.
+static const NamedCommandSequence kPauseInSeq{
+        std::string("Pause"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::PAUSE, StreamDescriptor::State::PAUSED},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::PAUSE, StreamDescriptor::State::PAUSED},
+                {StreamDescriptor::CommandCode::FLUSH, StreamDescriptor::State::STANDBY}}};
+// "Pause" sequence used by the output stream suite: PAUSE is issued three times,
+// each later one preceded by a START paired with ACTIVE; the BURST that follows the
+// second PAUSE is paired with PAUSED.
+static const NamedCommandSequence kPauseOutSeq{
+        std::string("Pause"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::PAUSE, StreamDescriptor::State::PAUSED},
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::PAUSE, StreamDescriptor::State::PAUSED},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::PAUSED},
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::PAUSE, StreamDescriptor::State::PAUSED}}};
+// "Flush" sequence used by the input stream suite: START, BURST, PAUSE, and then a
+// FLUSH paired with STANDBY.
+static const NamedCommandSequence kFlushInSeq{
+        std::string("Flush"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::PAUSE, StreamDescriptor::State::PAUSED},
+                {StreamDescriptor::CommandCode::FLUSH, StreamDescriptor::State::STANDBY}}};
+// "Flush" sequence used by the output stream suite: START, BURST, PAUSE, and then a
+// FLUSH paired with IDLE.
+static const NamedCommandSequence kFlushOutSeq{
+        std::string("Flush"),
+        std::vector<CommandAndState>{
+                {StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE},
+                {StreamDescriptor::CommandCode::BURST, StreamDescriptor::State::ACTIVE},
+                {StreamDescriptor::CommandCode::PAUSE, StreamDescriptor::State::PAUSED},
+                {StreamDescriptor::CommandCode::FLUSH, StreamDescriptor::State::IDLE}}};
+// Builds the test name for a StreamIo parameter tuple: the result of
+// android::PrintInstanceNameToString for the module name, followed by
+// "_<command sequence name>_SetupSeq" and "2" or "1" depending on the setup flag.
+std::string GetStreamIoTestName(const testing::TestParamInfo<StreamIoTestParameters>& info) {
+    const auto& params = info.param;
+    std::string testName = android::PrintInstanceNameToString(
+            testing::TestParamInfo<std::string>{std::get<PARAM_MODULE_NAME>(params), info.index});
+    testName += "_";
+    testName += std::get<PARAM_CMD_SEQ>(params).first;
+    testName += "_SetupSeq";
+    testName += (std::get<PARAM_SETUP_SEQ>(params) ? "2" : "1");
+    return testName;
+}
+INSTANTIATE_TEST_SUITE_P(
+        AudioStreamIoInTest, AudioStreamIoIn,
+        testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
+                         testing::Values(kReadOrWriteSeq, kDrainInSeq, kStandbySeq, kPauseInSeq,
+                                         kFlushInSeq),
+                         testing::Values(false, true)),  // Both setup sequence variants.
+        GetStreamIoTestName);  // Names tests <instance>_<sequence>_SetupSeq<1|2>.
+GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoIn);  // May stay uninstantiated.
+INSTANTIATE_TEST_SUITE_P(
+        AudioStreamIoOutTest, AudioStreamIoOut,
+        testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
+                         testing::Values(kReadOrWriteSeq, kDrainOutSeq, kDrainPauseOutSeq,
+                                         kStandbySeq, kPauseOutSeq, kFlushOutSeq),
+                         testing::Values(false, true)),  // Both setup sequence variants.
+        GetStreamIoTestName);  // Names tests <instance>_<sequence>_SetupSeq<1|2>.
+GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoOut);  // May stay uninstantiated.
+
 INSTANTIATE_TEST_SUITE_P(AudioPatchTest, AudioModulePatch,  // One instance per HAL name.
                          testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
                          android::PrintInstanceNameToString);