Merge changes I0a18a6d9,I13a83113,I13c9c8d1,I8717acac

* changes:
  audio: Add non-blocking I/O stream operations
  audio: Fix handling of quick worker completion in StreamWorker
  audio: Report unknown stream positions explicitly
  audio: Implement transient state testing
diff --git a/audio/aidl/Android.bp b/audio/aidl/Android.bp
index 92d7d54..563ee62 100644
--- a/audio/aidl/Android.bp
+++ b/audio/aidl/Android.bp
@@ -113,6 +113,7 @@
         "android/hardware/audio/core/AudioRoute.aidl",
         "android/hardware/audio/core/IConfig.aidl",
         "android/hardware/audio/core/IModule.aidl",
+        "android/hardware/audio/core/IStreamCallback.aidl",
         "android/hardware/audio/core/IStreamIn.aidl",
         "android/hardware/audio/core/IStreamOut.aidl",
         "android/hardware/audio/core/ITelephony.aidl",
diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl
index be382c5..7f960e0 100644
--- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl
+++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl
@@ -76,6 +76,7 @@
     android.hardware.audio.common.SourceMetadata sourceMetadata;
     @nullable android.media.audio.common.AudioOffloadInfo offloadInfo;
     long bufferSizeFrames;
+    @nullable android.hardware.audio.core.IStreamCallback callback;
   }
   @VintfStability
   parcelable OpenOutputStreamReturn {
diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IStreamCallback.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IStreamCallback.aidl
new file mode 100644
index 0000000..5a2ab78
--- /dev/null
+++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IStreamCallback.aidl
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+///////////////////////////////////////////////////////////////////////////////
+// THIS FILE IS IMMUTABLE. DO NOT EDIT IN ANY CASE.                          //
+///////////////////////////////////////////////////////////////////////////////
+
+// This file is a snapshot of an AIDL file. Do not edit it manually. There are
+// two cases:
+// 1). this is a frozen version file - do not edit this in any case.
+// 2). this is a 'current' file. If you make a backwards compatible change to
+//     the interface (from the latest frozen version), the build system will
+//     prompt you to update this file with `m <name>-update-api`.
+//
+// You must not make a backward incompatible change to any AIDL file built
+// with the aidl_interface module type with versions property set. The module
+// type is used to build AIDL files in a way that they can be used across
+// independently updatable components of the system. If a device is shipped
+// with such a backward incompatible change, it has a high risk of breaking
+// later when a module using the interface is updated, e.g., Mainline modules.
+
+package android.hardware.audio.core;
+@VintfStability
+interface IStreamCallback {
+  oneway void onTransferReady();
+  oneway void onError();
+  oneway void onDrainReady();
+}
diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl
index 80ee185..467d37b 100644
--- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl
+++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl
@@ -35,4 +35,5 @@
 @JavaDerive(equals=true, toString=true) @VintfStability
 parcelable ModuleDebug {
   boolean simulateDeviceConnections;
+  int streamTransientStateDelayMs;
 }
diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
index 3a77ad1..3a4271b 100644
--- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
@@ -42,8 +42,9 @@
   const int LATENCY_UNKNOWN = -1;
   @FixedSize @VintfStability
   parcelable Position {
-    long frames;
-    long timeNs;
+    long frames = -1;
+    long timeNs = -1;
+    const long UNKNOWN = -1;
   }
   @Backing(type="int") @VintfStability
   enum State {
@@ -53,14 +54,23 @@
     PAUSED = 4,
     DRAINING = 5,
     DRAIN_PAUSED = 6,
+    TRANSFERRING = 7,
+    TRANSFER_PAUSED = 8,
     ERROR = 100,
   }
+  @Backing(type="byte") @VintfStability
+  enum DrainMode {
+    DRAIN_UNSPECIFIED = 0,
+    DRAIN_ALL = 1,
+    DRAIN_EARLY_NOTIFY = 2,
+  }
   @FixedSize @VintfStability
   union Command {
-    int hal_reserved_exit;
+    int halReservedExit;
+    android.media.audio.common.Void getStatus;
     android.media.audio.common.Void start;
     int burst;
-    android.media.audio.common.Void drain;
+    android.hardware.audio.core.StreamDescriptor.DrainMode drain;
     android.media.audio.common.Void standby;
     android.media.audio.common.Void pause;
     android.media.audio.common.Void flush;
diff --git a/audio/aidl/android/hardware/audio/core/IModule.aidl b/audio/aidl/android/hardware/audio/core/IModule.aidl
index be40051..974e7e8 100644
--- a/audio/aidl/android/hardware/audio/core/IModule.aidl
+++ b/audio/aidl/android/hardware/audio/core/IModule.aidl
@@ -21,6 +21,7 @@
 import android.hardware.audio.core.AudioMode;
 import android.hardware.audio.core.AudioPatch;
 import android.hardware.audio.core.AudioRoute;
+import android.hardware.audio.core.IStreamCallback;
 import android.hardware.audio.core.IStreamIn;
 import android.hardware.audio.core.IStreamOut;
 import android.hardware.audio.core.ITelephony;
@@ -53,9 +54,13 @@
      * the HAL module behavior that would otherwise require human intervention.
      *
      * The HAL module must throw an error if there is an attempt to change
-     * the debug behavior for the aspect which is currently in use.
+     * the debug behavior for the aspect which is currently in use, or when
+     * the value of any of the debug flags is invalid. See 'ModuleDebug' for
+     * the full list of constraints.
      *
      * @param debug The debug options.
+     * @throws EX_ILLEGAL_ARGUMENT If some of the configuration parameters are
+     *                             invalid.
      * @throws EX_ILLEGAL_STATE If the flag(s) being changed affect functionality
      *                          which is currently in use.
      */
@@ -316,9 +321,13 @@
      * 'setAudioPortConfig' method. Existence of an audio patch involving this
      * port configuration is not required for successful opening of a stream.
      *
-     * If the port configuration has 'COMPRESS_OFFLOAD' output flag set,
-     * the framework must provide additional information about the encoded
-     * audio stream in 'offloadInfo' argument.
+     * If the port configuration has the 'COMPRESS_OFFLOAD' output flag set,
+     * the client must provide additional information about the encoded
+     * audio stream in the 'offloadInfo' argument.
+     *
+     * If the port configuration has the 'NON_BLOCKING' output flag set,
+     * the client must provide a callback for asynchronous notifications
+     * in the 'callback' argument.
      *
      * The requested buffer size is expressed in frames, thus the actual size
      * in bytes depends on the audio port configuration. Also, the HAL module
@@ -354,6 +363,8 @@
      *                             - If the offload info is not provided for an offload
      *                               port configuration.
      *                             - If a buffer of the requested size can not be provided.
+     *                             - If the callback is not provided for a non-blocking
+     *                               port configuration.
      * @throws EX_ILLEGAL_STATE In the following cases:
      *                          - If the port config already has a stream opened on it.
      *                          - If the limit on the open stream count for the port has
@@ -372,6 +383,8 @@
         @nullable AudioOffloadInfo offloadInfo;
         /** Requested audio I/O buffer minimum size, in frames. */
         long bufferSizeFrames;
+        /** Client callback interface for the non-blocking output mode. */
+        @nullable IStreamCallback callback;
     }
     @VintfStability
     parcelable OpenOutputStreamReturn {
diff --git a/audio/aidl/android/hardware/audio/core/IStreamCallback.aidl b/audio/aidl/android/hardware/audio/core/IStreamCallback.aidl
new file mode 100644
index 0000000..440ab25
--- /dev/null
+++ b/audio/aidl/android/hardware/audio/core/IStreamCallback.aidl
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package android.hardware.audio.core;
+
+/**
+ * This interface is used to indicate completion of asynchronous operations.
+ * See the state machines referenced by StreamDescriptor for details.
+ */
+@VintfStability
+oneway interface IStreamCallback {
+    /**
+     * Indicate that the stream is ready for the next data exchange.
+     */
+    void onTransferReady();
+    /**
+     * Indicate that an irrecoverable error has occurred during the last I/O
+     * operation. After sending this callback, the stream enters the 'ERROR'
+     * state.
+     */
+    void onError();
+    /**
+     * Indicate that the stream has finished draining. This is only used
+     * for output streams because for input streams draining is performed
+     * by the client.
+     */
+    void onDrainReady();
+}
diff --git a/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl b/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl
index 858a9bd..871a5c9 100644
--- a/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl
+++ b/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl
@@ -35,4 +35,19 @@
      *    profiles.
      */
     boolean simulateDeviceConnections;
+    /**
+     * Must be non-negative. When set to non-zero, HAL module must delay
+     * transition from "transient" stream states (see StreamDescriptor.aidl)
+     * by the specified number of milliseconds. The purpose of this delay
+     * is to allow VTS to test sending of stream commands while the stream is
+     * in a transient state. The delay must apply to newly created streams;
+     * it is not required to apply the delay to already opened streams.
+     *
+     * Note: the drawback of enabling this delay for asynchronous (non-blocking)
+     *       modes is that sending of callbacks will also be delayed, because
+     *       callbacks are sent once the stream state machine exits a transient
+     *       state. Thus, it's not recommended to use it with tests that require
+     *       waiting for an async callback.
+     */
+    int streamTransientStateDelayMs;
 }
diff --git a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
index 2b1fc99..65ea9ef 100644
--- a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
@@ -84,13 +84,13 @@
  *     are different.
  *
  * State machines of both input and output streams start from the 'STANDBY'
- * state.  Transitions between states happen naturally with changes in the
+ * state. Transitions between states happen naturally with changes in the
  * states of the model elements. For simplicity, we restrict the change to one
  * element only, for example, in the 'STANDBY' state, either the producer or the
  * consumer can become active, but not both at the same time. States 'STANDBY',
  * 'IDLE', 'READY', and '*PAUSED' are "stable"—they require an external event,
  * whereas a change from the 'DRAINING' state can happen with time as the buffer
- * gets empty.
+ * gets empty, thus it's a "transient" state.
  *
  * The state machine for input streams is defined in the `stream-in-sm.gv` file,
  * for output streams—in the `stream-out-sm.gv` file. State machines define how
@@ -100,6 +100,28 @@
  * client can never observe a stream with a functioning command queue in this
  * state. The 'ERROR' state is a special state which the state machine enters
  * when an unrecoverable hardware error is detected by the HAL module.
+ *
+ * Non-blocking (asynchronous) modes introduce a new 'TRANSFERRING' state, which
+ * the state machine can enter after replying to the 'burst' command, instead of
+ * staying in the 'ACTIVE' state. In this case the client gets unblocked
+ * earlier, while the actual audio delivery to / from the observer is not
+ * complete yet. Once the HAL module is ready for the next transfer, it notifies
+ * the client via a oneway callback, and the machine switches to the 'ACTIVE'
+ * state. The 'TRANSFERRING' state is thus "transient", similar to the
+ * 'DRAINING' state. For output streams, asynchronous transfer can be paused,
+ * and it's another new state: 'TRANSFER_PAUSED'. It differs from 'PAUSED' by
+ * the fact that no new writes are allowed. Please see 'stream-in-async-sm.gv'
+ * and 'stream-out-async-sm.gv' files for details. Below is the table summary
+ * for asynchronous-only states:
+ *
+ *  Producer | Buffer state | Consumer | Applies | State
+ *  active?  |              | active?  | to      |
+ * ==========|==============|==========|=========|==============================
+ *  Yes      | Not empty    | Yes      | Both    | TRANSFERRING, s/w x-runs counted
+ * ----------|--------------|----------|---------|-----------------------------
+ *  Yes      | Not empty    | No       | Output  | TRANSFER_PAUSED,
+ *           |              |          |         | h/w emits silence.
+ *
  */
 @JavaDerive(equals=true, toString=true)
 @VintfStability
@@ -116,10 +138,15 @@
     @VintfStability
     @FixedSize
     parcelable Position {
+        /**
+         * The value used when the position can not be reported by the HAL
+         * module.
+         */
+        const long UNKNOWN = -1;
         /** Frame count. */
-        long frames;
+        long frames = UNKNOWN;
         /** Timestamp in nanoseconds. */
-        long timeNs;
+        long timeNs = UNKNOWN;
     }
 
     @VintfStability
@@ -166,11 +193,24 @@
         /**
          * Used for output streams only, pauses draining. This state is similar
          * to the 'PAUSED' state, except that the client is not adding any
-         * new data. If it emits a 'BURST' command, this brings the stream
+         * new data. If it emits a 'burst' command, this brings the stream
          * into the regular 'PAUSED' state.
          */
         DRAIN_PAUSED = 6,
         /**
+         * Used only for streams in asynchronous mode. The stream enters this
+         * state after receiving a 'burst' command and returning control back
+         * to the client, thus unblocking it.
+         */
+        TRANSFERRING = 7,
+        /**
+         * Used only for output streams in asynchronous mode. The stream
+         * enters this state after receiving a 'pause' command while being in
+         * the 'TRANSFERRING' state. Unlike 'PAUSED' state, this state does not
+         * accept new writes.
+         */
+        TRANSFER_PAUSED = 8,
+        /**
          * The ERROR state is entered when the stream has encountered an
          * irrecoverable error from the lower layer. After entering it, the
          * stream can only be closed.
@@ -178,6 +218,29 @@
         ERROR = 100,
     }
 
+    @VintfStability
+    @Backing(type="byte")
+    enum DrainMode {
+        /**
+         * Unspecified—used with input streams only, because the client controls
+         * draining.
+         */
+        DRAIN_UNSPECIFIED = 0,
+        /**
+         * Used with output streams only, the HAL module indicates drain
+         * completion when all remaining audio data has been consumed.
+         */
+        DRAIN_ALL = 1,
+        /**
+         * Used with output streams only, the HAL module indicates drain
+         * completion shortly before all audio data has been consumed in order
+         * to give the client an opportunity to provide data for the next track
+         * for gapless playback. The exact amount of provided time is specific
+         * to the HAL implementation.
+         */
+        DRAIN_EARLY_NOTIFY = 2,
+    }
+
     /**
      * Used for sending commands to the HAL module. The client writes into
      * the queue, the HAL module reads. The queue can only contain a single
@@ -198,7 +261,14 @@
          * implementation must pass a random cookie as the command argument,
          * which is only known to the implementation.
          */
-        int hal_reserved_exit;
+        int halReservedExit;
+        /**
+         * Retrieve the current state of the stream. This command must be
+         * processed by the stream in any state. The stream must provide current
+         * positions, counters, and its state in the reply. This command must be
+         * handled by the HAL module without any observable side effects.
+         */
+        Void getStatus;
         /**
          * See the state machines on the applicability of this command to
          * different states.
@@ -215,15 +285,14 @@
          *    read from the hardware into the 'audio.fmq' queue.
          *
          * In both cases it is allowed for this field to contain any
-         * non-negative number. The value 0 can be used if the client only needs
-         * to retrieve current positions and latency. Any sufficiently big value
-         * which exceeds the size of the queue's area which is currently
-         * available for reading or writing by the HAL module must be trimmed by
-         * the HAL module to the available size. Note that the HAL module is
-         * allowed to consume or provide less data than requested, and it must
-         * return the amount of actually read or written data via the
-         * 'Reply.fmqByteCount' field. Thus, only attempts to pass a negative
-         * number must be constituted as a client's error.
+         * non-negative number. Any sufficiently big value which exceeds the
+         * size of the queue's area which is currently available for reading or
+         * writing by the HAL module must be trimmed by the HAL module to the
+         * available size. Note that the HAL module is allowed to consume or
+         * provide less data than requested, and it must return the amount of
+         * actually read or written data via the 'Reply.fmqByteCount'
+         * field. Thus, only attempts to pass a negative number must be
+         * construed as a client's error.
          *
          * Differences for the MMap No IRQ mode:
          *
@@ -233,13 +302,16 @@
          *    with sending of this command.
          *
          *  - the value must always be set to 0.
+         *
+         * See the state machines on the applicability of this command to
+         * different states.
          */
         int burst;
         /**
          * See the state machines on the applicability of this command to
          * different states.
          */
-        Void drain;
+        DrainMode drain;
         /**
          * See the state machines on the applicability of this command to
          * different states.
@@ -286,10 +358,6 @@
          *  - STATUS_INVALID_OPERATION: the command is not applicable in the
          *                              current state of the stream, or to this
          *                              type of the stream;
-         *  - STATUS_NO_INIT: positions can not be reported because the mix port
-         *                    is not connected to any producer or consumer, or
-         *                    because the HAL module does not support positions
-         *                    reporting for this AudioSource (on input streams).
          *  - STATUS_NOT_ENOUGH_DATA: a read or write error has
          *                            occurred for the 'audio.fmq' queue;
          */
@@ -307,9 +375,11 @@
          */
         int fmqByteCount;
         /**
-         * It is recommended to report the current position for any command.
-         * If the position can not be reported, the 'status' field must be
-         * set to 'NO_INIT'.
+         * It is recommended to report the current position for any command. If
+         * the position can not be reported, for example because the mix port is
+         * not connected to any producer or consumer, or because the HAL module
+         * does not support positions reporting for this AudioSource (on input
+         * streams), the 'Position::UNKNOWN' value must be used.
          *
          * For output streams: the moment when the specified stream position
          *   was presented to an external observer (i.e. presentation position).
@@ -401,6 +471,10 @@
          *     into 'reply' queue, and hangs on waiting on a read from
          *     the 'command' queue.
          *  6. The client wakes up due to 5. and reads the reply.
+         *     Note: in non-blocking mode, when the HAL module goes to
+         *           the 'TRANSFERRING' state (as indicated by the 'reply.state'
+         *           field), the client must wait for the 'IStreamCallback.onTransferReady'
+         *           notification to arrive before starting the next burst.
          *
          * For input streams the following sequence of operations is used:
          *  1. The client writes the BURST command into the 'command' queue,
@@ -415,6 +489,10 @@
          *  5. The client wakes up due to 4.
          *  6. The client reads the reply and audio data. The client must
          *     always read from the FMQ all the data it contains.
+         *     Note: in non-blocking mode, when the HAL module goes to
+         *           the 'TRANSFERRING' state (as indicated by the 'reply.state'
+         *           field), the client must wait for the 'IStreamCallback.onTransferReady'
+         *           notification to arrive before starting the next burst.
          *
          */
         MQDescriptor<byte, SynchronizedReadWrite> fmq;
diff --git a/audio/aidl/android/hardware/audio/core/stream-in-async-sm.gv b/audio/aidl/android/hardware/audio/core/stream-in-async-sm.gv
new file mode 100644
index 0000000..818b18e
--- /dev/null
+++ b/audio/aidl/android/hardware/audio/core/stream-in-async-sm.gv
@@ -0,0 +1,47 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// To render: dot -Tpng stream-in-async-sm.gv -o stream-in-async-sm.png
+digraph stream_in_async_state_machine {
+    node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
+    node [shape=point width=0.5] F;
+    node [shape=oval width=1];
+    node [fillcolor=lightgreen] STANDBY;  // buffer is empty
+    node [fillcolor=tomato] CLOSED;
+    node [fillcolor=tomato] ERROR;
+    node [style=dashed] ANY_STATE;
+    node [fillcolor=lightblue style=filled];
+    // Note that when the producer (h/w) is passive, "burst" operations
+    // complete synchronously, bypassing the TRANSFERRING state.
+    I -> STANDBY;
+    STANDBY -> IDLE [label="start"];           // producer -> active
+    IDLE -> STANDBY [label="standby"];         // producer -> passive, buffer is cleared
+    IDLE -> TRANSFERRING [label="burst"];      // consumer -> active
+    ACTIVE -> PAUSED [label="pause"];          // consumer -> passive
+    ACTIVE -> DRAINING [label="drain"];        // producer -> passive
+    ACTIVE -> TRANSFERRING [label="burst"];
+    TRANSFERRING -> ACTIVE [label="←IStreamCallback.onTransferReady"];
+    TRANSFERRING -> PAUSED [label="pause"];    // consumer -> passive
+    TRANSFERRING -> DRAINING [label="drain"];  // producer -> passive
+    PAUSED -> TRANSFERRING [label="burst"];    // consumer -> active
+    PAUSED -> STANDBY [label="flush"];         // producer -> passive, buffer is cleared
+    DRAINING -> DRAINING [label="burst"];
+    DRAINING -> ACTIVE [label="start"];      // producer -> active
+    DRAINING -> STANDBY [label="<empty buffer>"];  // consumer deactivates
+    IDLE -> ERROR [label="←IStreamCallback.onError"];
+    PAUSED -> ERROR [label="←IStreamCallback.onError"];
+    TRANSFERRING -> ERROR [label="←IStreamCallback.onError"];
+    ANY_STATE -> CLOSED [label="→IStream*.close"];
+    CLOSED -> F;
+}
diff --git a/audio/aidl/android/hardware/audio/core/stream-out-async-sm.gv b/audio/aidl/android/hardware/audio/core/stream-out-async-sm.gv
new file mode 100644
index 0000000..e25b15a
--- /dev/null
+++ b/audio/aidl/android/hardware/audio/core/stream-out-async-sm.gv
@@ -0,0 +1,57 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// To render: dot -Tpng stream-out-async-sm.gv -o stream-out-async-sm.png
+digraph stream_out_async_state_machine {
+    node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
+    node [shape=point width=0.5] F;
+    node [shape=oval width=1];
+    node [fillcolor=lightgreen] STANDBY;  // buffer is empty
+    node [fillcolor=lightgreen] IDLE;     // buffer is empty
+    node [fillcolor=tomato] CLOSED;
+    node [fillcolor=tomato] ERROR;
+    node [style=dashed] ANY_STATE;
+    node [fillcolor=lightblue style=filled];
+    // Note that when the consumer (h/w) is passive, "burst" operations
+    // complete synchronously, bypassing the TRANSFERRING state.
+    I -> STANDBY;
+    STANDBY -> IDLE [label="start"];                  // consumer -> active
+    STANDBY -> PAUSED [label="burst"];                // producer -> active
+    IDLE -> STANDBY [label="standby"];                // consumer -> passive
+    IDLE -> TRANSFERRING [label="burst"];             // producer -> active
+    ACTIVE -> PAUSED [label="pause"];                 // consumer -> passive (not consuming)
+    ACTIVE -> DRAINING [label="drain"];               // producer -> passive
+    ACTIVE -> TRANSFERRING [label="burst"];           // early unblocking
+    ACTIVE -> ACTIVE [label="burst"];                 // full write
+    TRANSFERRING -> ACTIVE [label="←IStreamCallback.onTransferReady"];
+    TRANSFERRING -> TRANSFER_PAUSED [label="pause"];  // consumer -> passive (not consuming)
+    TRANSFERRING -> DRAINING [label="drain"];         // producer -> passive
+    TRANSFER_PAUSED -> TRANSFERRING [label="start"];  // consumer -> active
+    TRANSFER_PAUSED -> DRAIN_PAUSED [label="drain"];  // producer -> passive
+    TRANSFER_PAUSED -> IDLE [label="flush"];          // buffer is cleared
+    PAUSED -> PAUSED [label="burst"];
+    PAUSED -> ACTIVE [label="start"];                 // consumer -> active
+    PAUSED -> IDLE [label="flush"];                   // producer -> passive, buffer is cleared
+    DRAINING -> IDLE [label="←IStreamCallback.onDrainReady"];
+    DRAINING -> TRANSFERRING [label="burst"];         // producer -> active
+    DRAINING -> DRAIN_PAUSED [label="pause"];         // consumer -> passive (not consuming)
+    DRAIN_PAUSED -> DRAINING [label="start"];         // consumer -> active
+    DRAIN_PAUSED -> TRANSFER_PAUSED [label="burst"];  // producer -> active
+    DRAIN_PAUSED -> IDLE [label="flush"];             // buffer is cleared
+    IDLE -> ERROR [label="←IStreamCallback.onError"];
+    DRAINING -> ERROR [label="←IStreamCallback.onError"];
+    TRANSFERRING -> ERROR [label="←IStreamCallback.onError"];
+    ANY_STATE -> CLOSED [label="→IStream*.close"];
+    CLOSED -> F;
+}
diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp
index dda0e4a..0d2121c 100644
--- a/audio/aidl/common/StreamWorker.cpp
+++ b/audio/aidl/common/StreamWorker.cpp
@@ -25,15 +25,20 @@
 bool ThreadController::start(const std::string& name, int priority) {
     mThreadName = name;
     mThreadPriority = priority;
-    mWorker = std::thread(&ThreadController::workerThread, this);
+    if (kTestSingleThread != name) {
+        mWorker = std::thread(&ThreadController::workerThread, this);
+    } else {
+        // Simulate the case when the workerThread completes prior
+        // to the moment when we begin waiting for its start.
+        workerThread();
+    }
     std::unique_lock<std::mutex> lock(mWorkerLock);
     android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
     mWorkerCv.wait(lock, [&]() {
         android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
-        return mWorkerState == WorkerState::RUNNING || !mError.empty();
+        return mWorkerState != WorkerState::INITIAL || !mError.empty();
     });
-    mWorkerStateChangeRequest = false;
-    return mWorkerState == WorkerState::RUNNING;
+    return mError.empty();
 }
 
 void ThreadController::stop() {
@@ -81,8 +86,8 @@
 void ThreadController::workerThread() {
     using Status = StreamLogic::Status;
 
-    std::string error = mLogic->init();
-    if (error.empty() && !mThreadName.empty()) {
+    std::string error;
+    if (!mThreadName.empty()) {
         std::string compliantName(mThreadName.substr(0, 15));
         if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) {
             error.append("Failed to set thread name: ").append(strerror(errCode));
@@ -94,6 +99,9 @@
             error.append("Failed to set thread priority: ").append(strerror(errCode));
         }
     }
+    if (error.empty()) {
+        error.append(mLogic->init());
+    }
     {
         std::lock_guard<std::mutex> lock(mWorkerLock);
         mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index ab2ec26..e9c1070 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -32,7 +32,7 @@
 namespace internal {
 
 class ThreadController {
-    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
+    enum class WorkerState { INITIAL, STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
 
   public:
     explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
@@ -76,7 +76,7 @@
     std::thread mWorker;
     std::mutex mWorkerLock;
     std::condition_variable mWorkerCv;
-    WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
+    WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::INITIAL;
     std::string mError GUARDED_BY(mWorkerLock);
     // The atomic lock-free variable is used to prevent priority inversions
     // that can occur when a high priority worker tries to acquire the lock
@@ -90,6 +90,9 @@
     std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
 };
 
+// A special thread name used in tests only.
+static const std::string kTestSingleThread = "__testST__";
+
 }  // namespace internal
 
 class StreamLogic {
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index 8ea8424..f7a30b9 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -283,4 +283,16 @@
     EXPECT_EQ(priority, worker.getPriority());
 }
 
+TEST_P(StreamWorkerTest, DeferredStartCheckNoError) {
+    stream.setStopStatus();
+    EXPECT_TRUE(worker.start(android::hardware::audio::common::internal::kTestSingleThread));
+    EXPECT_FALSE(worker.hasError());
+}
+
+TEST_P(StreamWorkerTest, DeferredStartCheckWithError) {
+    stream.setErrorStatus();
+    EXPECT_FALSE(worker.start(android::hardware::audio::common::internal::kTestSingleThread));
+    EXPECT_TRUE(worker.hasError());
+}
+
 INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());
diff --git a/audio/aidl/default/Module.cpp b/audio/aidl/default/Module.cpp
index 6863fe3..9dbd61c 100644
--- a/audio/aidl/default/Module.cpp
+++ b/audio/aidl/default/Module.cpp
@@ -97,6 +97,7 @@
 }
 
 ndk::ScopedAStatus Module::createStreamContext(int32_t in_portConfigId, int64_t in_bufferSizeFrames,
+                                               std::shared_ptr<IStreamCallback> asyncCallback,
                                                StreamContext* out_context) {
     if (in_bufferSizeFrames <= 0) {
         LOG(ERROR) << __func__ << ": non-positive buffer size " << in_bufferSizeFrames;
@@ -135,8 +136,8 @@
         StreamContext temp(
                 std::make_unique<StreamContext::CommandMQ>(1, true /*configureEventFlagWord*/),
                 std::make_unique<StreamContext::ReplyMQ>(1, true /*configureEventFlagWord*/),
-                frameSize,
-                std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames));
+                frameSize, std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames),
+                asyncCallback, mDebug.streamTransientStateDelayMs);
         if (temp.isValid()) {
             *out_context = std::move(temp);
         } else {
@@ -242,6 +243,11 @@
                    << "while having external devices connected";
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
     }
+    if (in_debug.streamTransientStateDelayMs < 0) {
+        LOG(ERROR) << __func__ << ": streamTransientStateDelayMs is negative: "
+                   << in_debug.streamTransientStateDelayMs;
+        return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
+    }
     mDebug = in_debug;
     return ndk::ScopedAStatus::ok();
 }
@@ -456,7 +462,8 @@
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
     }
     StreamContext context;
-    if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
+    if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, nullptr,
+                                          &context);
         !status.isOk()) {
         return status;
     }
@@ -496,8 +503,16 @@
                    << " has COMPRESS_OFFLOAD flag set, requires offload info";
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
     }
+    const bool isNonBlocking = isBitPositionFlagSet(port->flags.get<AudioIoFlags::Tag::output>(),
+                                                    AudioOutputFlags::NON_BLOCKING);
+    if (isNonBlocking && in_args.callback == nullptr) {
+        LOG(ERROR) << __func__ << ": port id " << port->id
+                   << " has NON_BLOCKING flag set, requires async callback";
+        return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
+    }
     StreamContext context;
-    if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
+    if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames,
+                                          isNonBlocking ? in_args.callback : nullptr, &context);
         !status.isOk()) {
         return status;
     }
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 21dc4b6..d7c352f 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -87,32 +87,46 @@
 
 void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
                                             bool isConnected) const {
+    reply->status = STATUS_OK;
     if (isConnected) {
-        reply->status = STATUS_OK;
         reply->observable.frames = mFrameCount;
         reply->observable.timeNs = ::android::elapsedRealtimeNano();
     } else {
-        reply->status = STATUS_NO_INIT;
+        reply->observable.frames = StreamDescriptor::Position::UNKNOWN;
+        reply->observable.timeNs = StreamDescriptor::Position::UNKNOWN;
     }
 }
 
+void StreamWorkerCommonLogic::populateReplyWrongState(
+        StreamDescriptor::Reply* reply, const StreamDescriptor::Command& command) const {
+    LOG(WARNING) << "command '" << toString(command.getTag())
+                 << "' can not be handled in the state " << toString(mState);
+    reply->status = STATUS_INVALID_OPERATION;
+}
+
 const std::string StreamInWorkerLogic::kThreadName = "reader";
 
 StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
+    // Note: for input streams, draining is driven by the client, thus the
+    // "empty buffer" condition can only happen while handling the 'burst'
+    // command. Therefore, unlike for output streams, it does not make sense
+    // to delay the 'DRAINING' state here by 'mTransientStateDelayMs'.
+    // TODO: Add a delay for transitions of async operations when/if they are added.
+
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
         mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
+    LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
     StreamDescriptor::Reply reply{};
     reply.status = STATUS_BAD_VALUE;
     using Tag = StreamDescriptor::Command::Tag;
     switch (command.getTag()) {
-        case Tag::hal_reserved_exit:
-            if (const int32_t cookie = command.get<Tag::hal_reserved_exit>();
+        case Tag::halReservedExit:
+            if (const int32_t cookie = command.get<Tag::halReservedExit>();
                 cookie == mInternalCommandCookie) {
-                LOG(DEBUG) << __func__ << ": received EXIT command";
                 setClosed();
                 // This is an internal command, no need to reply.
                 return Status::EXIT;
@@ -120,8 +134,10 @@
                 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
             }
             break;
+        case Tag::getStatus:
+            populateReply(&reply, mIsConnected);
+            break;
         case Tag::start:
-            LOG(DEBUG) << __func__ << ": received START read command";
             if (mState == StreamDescriptor::State::STANDBY ||
                 mState == StreamDescriptor::State::DRAINING) {
                 populateReply(&reply, mIsConnected);
@@ -129,15 +145,13 @@
                                  ? StreamDescriptor::State::IDLE
                                  : StreamDescriptor::State::ACTIVE;
             } else {
-                LOG(WARNING) << __func__ << ": START command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::burst:
             if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
-                LOG(DEBUG) << __func__ << ": received BURST read command for " << fmqByteCount
-                           << " bytes";
+                LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
+                           << fmqByteCount << " bytes";
                 if (mState == StreamDescriptor::State::IDLE ||
                     mState == StreamDescriptor::State::ACTIVE ||
                     mState == StreamDescriptor::State::PAUSED ||
@@ -151,69 +165,61 @@
                     } else if (mState == StreamDescriptor::State::DRAINING) {
                         // To simplify the reference code, we assume that the read operation
                         // has consumed all the data remaining in the hardware buffer.
-                        // TODO: Provide parametrization on the duration of draining to test
-                        //       handling of commands during the 'DRAINING' state.
+                        // In a real implementation, here we would either remain in
+                        // the 'DRAINING' state, or transition to 'STANDBY' depending on the
+                        // buffer state.
                         mState = StreamDescriptor::State::STANDBY;
                     }
                 } else {
-                    LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
-                                 << toString(mState);
-                    reply.status = STATUS_INVALID_OPERATION;
+                    populateReplyWrongState(&reply, command);
                 }
             } else {
                 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
             }
             break;
         case Tag::drain:
-            LOG(DEBUG) << __func__ << ": received DRAIN read command";
-            if (mState == StreamDescriptor::State::ACTIVE) {
-                usleep(1000);  // Simulate a blocking call into the driver.
-                populateReply(&reply, mIsConnected);
-                // Can switch the state to ERROR if a driver error occurs.
-                mState = StreamDescriptor::State::DRAINING;
+            if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
+                if (mState == StreamDescriptor::State::ACTIVE) {
+                    usleep(1000);  // Simulate a blocking call into the driver.
+                    populateReply(&reply, mIsConnected);
+                    // Can switch the state to ERROR if a driver error occurs.
+                    mState = StreamDescriptor::State::DRAINING;
+                } else {
+                    populateReplyWrongState(&reply, command);
+                }
             } else {
-                LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                LOG(WARNING) << __func__
+                             << ": invalid drain mode: " << toString(command.get<Tag::drain>());
             }
             break;
         case Tag::standby:
-            LOG(DEBUG) << __func__ << ": received STANDBY read command";
             if (mState == StreamDescriptor::State::IDLE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::STANDBY;
             } else {
-                LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::pause:
-            LOG(DEBUG) << __func__ << ": received PAUSE read command";
             if (mState == StreamDescriptor::State::ACTIVE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::PAUSED;
             } else {
-                LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::flush:
-            LOG(DEBUG) << __func__ << ": received FLUSH read command";
             if (mState == StreamDescriptor::State::PAUSED) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::STANDBY;
             } else {
-                LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
     }
@@ -261,20 +267,52 @@
 const std::string StreamOutWorkerLogic::kThreadName = "writer";
 
 StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
+    if (mState == StreamDescriptor::State::DRAINING ||
+        mState == StreamDescriptor::State::TRANSFERRING) {
+        if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
+                    std::chrono::steady_clock::now() - mTransientStateStart);
+            stateDurationMs >= mTransientStateDelayMs) {
+            if (mAsyncCallback == nullptr) {
+                // In blocking mode, mState can only be DRAINING.
+                mState = StreamDescriptor::State::IDLE;
+            } else {
+                // In a real implementation, the driver should notify the HAL about
+                // drain or transfer completion. In the stub, we switch unconditionally.
+                if (mState == StreamDescriptor::State::DRAINING) {
+                    mState = StreamDescriptor::State::IDLE;
+                    ndk::ScopedAStatus status = mAsyncCallback->onDrainReady();
+                    if (!status.isOk()) {
+                        LOG(ERROR) << __func__ << ": error from onDrainReady: " << status;
+                    }
+                } else {
+                    mState = StreamDescriptor::State::ACTIVE;
+                    ndk::ScopedAStatus status = mAsyncCallback->onTransferReady();
+                    if (!status.isOk()) {
+                        LOG(ERROR) << __func__ << ": error from onTransferReady: " << status;
+                    }
+                }
+            }
+            if (mTransientStateDelayMs.count() != 0) {
+                LOG(DEBUG) << __func__ << ": switched to state " << toString(mState)
+                           << " after a timeout";
+            }
+        }
+    }
+
     StreamDescriptor::Command command{};
     if (!mCommandMQ->readBlocking(&command, 1)) {
         LOG(ERROR) << __func__ << ": reading of command from MQ failed";
         mState = StreamDescriptor::State::ERROR;
         return Status::ABORT;
     }
+    LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
     StreamDescriptor::Reply reply{};
     reply.status = STATUS_BAD_VALUE;
     using Tag = StreamDescriptor::Command::Tag;
     switch (command.getTag()) {
-        case Tag::hal_reserved_exit:
-            if (const int32_t cookie = command.get<Tag::hal_reserved_exit>();
+        case Tag::halReservedExit:
+            if (const int32_t cookie = command.get<Tag::halReservedExit>();
                 cookie == mInternalCommandCookie) {
-                LOG(DEBUG) << __func__ << ": received EXIT command";
                 setClosed();
                 // This is an internal command, no need to reply.
                 return Status::EXIT;
@@ -282,8 +320,11 @@
                 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
             }
             break;
-        case Tag::start:
-            LOG(DEBUG) << __func__ << ": received START write command";
+        case Tag::getStatus:
+            populateReply(&reply, mIsConnected);
+            break;
+        case Tag::start: {
+            bool commandAccepted = true;
             switch (mState) {
                 case StreamDescriptor::State::STANDBY:
                     mState = StreamDescriptor::State::IDLE;
@@ -292,97 +333,112 @@
                     mState = StreamDescriptor::State::ACTIVE;
                     break;
                 case StreamDescriptor::State::DRAIN_PAUSED:
-                    mState = StreamDescriptor::State::PAUSED;
+                    switchToTransientState(StreamDescriptor::State::DRAINING);
+                    break;
+                case StreamDescriptor::State::TRANSFER_PAUSED:
+                    switchToTransientState(StreamDescriptor::State::TRANSFERRING);
                     break;
                 default:
-                    LOG(WARNING) << __func__ << ": START command can not be handled in the state "
-                                 << toString(mState);
-                    reply.status = STATUS_INVALID_OPERATION;
+                    populateReplyWrongState(&reply, command);
+                    commandAccepted = false;
             }
-            if (reply.status != STATUS_INVALID_OPERATION) {
+            if (commandAccepted) {
                 populateReply(&reply, mIsConnected);
             }
-            break;
+        } break;
         case Tag::burst:
             if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
-                LOG(DEBUG) << __func__ << ": received BURST write command for " << fmqByteCount
-                           << " bytes";
-                if (mState !=
-                    StreamDescriptor::State::ERROR) {  // BURST can be handled in all valid states
+                LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
+                           << fmqByteCount << " bytes";
+                if (mState != StreamDescriptor::State::ERROR &&
+                    mState != StreamDescriptor::State::TRANSFERRING &&
+                    mState != StreamDescriptor::State::TRANSFER_PAUSED) {
                     if (!write(fmqByteCount, &reply)) {
                         mState = StreamDescriptor::State::ERROR;
                     }
                     if (mState == StreamDescriptor::State::STANDBY ||
-                        mState == StreamDescriptor::State::DRAIN_PAUSED) {
-                        mState = StreamDescriptor::State::PAUSED;
+                        mState == StreamDescriptor::State::DRAIN_PAUSED ||
+                        mState == StreamDescriptor::State::PAUSED) {
+                        if (mAsyncCallback == nullptr ||
+                            mState != StreamDescriptor::State::DRAIN_PAUSED) {
+                            mState = StreamDescriptor::State::PAUSED;
+                        } else {
+                            mState = StreamDescriptor::State::TRANSFER_PAUSED;
+                        }
                     } else if (mState == StreamDescriptor::State::IDLE ||
-                               mState == StreamDescriptor::State::DRAINING) {
-                        mState = StreamDescriptor::State::ACTIVE;
-                    }  // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
+                               mState == StreamDescriptor::State::DRAINING ||
+                               mState == StreamDescriptor::State::ACTIVE) {
+                        if (mAsyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) {
+                            mState = StreamDescriptor::State::ACTIVE;
+                        } else {
+                            switchToTransientState(StreamDescriptor::State::TRANSFERRING);
+                        }
+                    }
                 } else {
-                    LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
-                                 << toString(mState);
-                    reply.status = STATUS_INVALID_OPERATION;
+                    populateReplyWrongState(&reply, command);
                 }
             } else {
                 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
             }
             break;
         case Tag::drain:
-            LOG(DEBUG) << __func__ << ": received DRAIN write command";
-            if (mState == StreamDescriptor::State::ACTIVE) {
-                usleep(1000);  // Simulate a blocking call into the driver.
-                populateReply(&reply, mIsConnected);
-                // Can switch the state to ERROR if a driver error occurs.
-                mState = StreamDescriptor::State::IDLE;
-                // Since there is no actual hardware that would be draining the buffer,
-                // in order to simplify the reference code, we assume that draining
-                // happens instantly, thus skipping the 'DRAINING' state.
-                // TODO: Provide parametrization on the duration of draining to test
-                //       handling of commands during the 'DRAINING' state.
+            if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_ALL ||
+                command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) {
+                if (mState == StreamDescriptor::State::ACTIVE ||
+                    mState == StreamDescriptor::State::TRANSFERRING) {
+                    usleep(1000);  // Simulate a blocking call into the driver.
+                    populateReply(&reply, mIsConnected);
+                    // Can switch the state to ERROR if a driver error occurs.
+                    switchToTransientState(StreamDescriptor::State::DRAINING);
+                } else if (mState == StreamDescriptor::State::TRANSFER_PAUSED) {
+                    mState = StreamDescriptor::State::DRAIN_PAUSED;
+                    populateReply(&reply, mIsConnected);
+                } else {
+                    populateReplyWrongState(&reply, command);
+                }
             } else {
-                LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                LOG(WARNING) << __func__
+                             << ": invalid drain mode: " << toString(command.get<Tag::drain>());
             }
             break;
         case Tag::standby:
-            LOG(DEBUG) << __func__ << ": received STANDBY write command";
             if (mState == StreamDescriptor::State::IDLE) {
                 usleep(1000);  // Simulate a blocking call into the driver.
                 populateReply(&reply, mIsConnected);
                 // Can switch the state to ERROR if a driver error occurs.
                 mState = StreamDescriptor::State::STANDBY;
             } else {
-                LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
-        case Tag::pause:
-            LOG(DEBUG) << __func__ << ": received PAUSE write command";
-            if (mState == StreamDescriptor::State::ACTIVE ||
-                mState == StreamDescriptor::State::DRAINING) {
+        case Tag::pause: {
+            bool commandAccepted = true;
+            switch (mState) {
+                case StreamDescriptor::State::ACTIVE:
+                    mState = StreamDescriptor::State::PAUSED;
+                    break;
+                case StreamDescriptor::State::DRAINING:
+                    mState = StreamDescriptor::State::DRAIN_PAUSED;
+                    break;
+                case StreamDescriptor::State::TRANSFERRING:
+                    mState = StreamDescriptor::State::TRANSFER_PAUSED;
+                    break;
+                default:
+                    populateReplyWrongState(&reply, command);
+                    commandAccepted = false;
+            }
+            if (commandAccepted) {
                 populateReply(&reply, mIsConnected);
-                mState = mState == StreamDescriptor::State::ACTIVE
-                                 ? StreamDescriptor::State::PAUSED
-                                 : StreamDescriptor::State::DRAIN_PAUSED;
-            } else {
-                LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
             }
-            break;
+        } break;
         case Tag::flush:
-            LOG(DEBUG) << __func__ << ": received FLUSH write command";
             if (mState == StreamDescriptor::State::PAUSED ||
-                mState == StreamDescriptor::State::DRAIN_PAUSED) {
+                mState == StreamDescriptor::State::DRAIN_PAUSED ||
+                mState == StreamDescriptor::State::TRANSFER_PAUSED) {
                 populateReply(&reply, mIsConnected);
                 mState = StreamDescriptor::State::IDLE;
             } else {
-                LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
-                             << toString(mState);
-                reply.status = STATUS_INVALID_OPERATION;
+                populateReplyWrongState(&reply, command);
             }
             break;
     }
@@ -450,9 +506,8 @@
 void StreamCommon<Metadata, StreamWorker>::stopWorker() {
     if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
         LOG(DEBUG) << __func__ << ": asking the worker to exit...";
-        auto cmd =
-                StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::hal_reserved_exit>(
-                        mContext.getInternalCommandCookie());
+        auto cmd = StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::halReservedExit>(
+                mContext.getInternalCommandCookie());
         // Note: never call 'pause' and 'resume' methods of StreamWorker
         // in the HAL implementation. These methods are to be used by
         // the client side only. Preventing the worker loop from running
diff --git a/audio/aidl/default/include/core-impl/Module.h b/audio/aidl/default/include/core-impl/Module.h
index 0086743..f7b85ed 100644
--- a/audio/aidl/default/include/core-impl/Module.h
+++ b/audio/aidl/default/include/core-impl/Module.h
@@ -86,6 +86,7 @@
     void cleanUpPatch(int32_t patchId);
     ndk::ScopedAStatus createStreamContext(
             int32_t in_portConfigId, int64_t in_bufferSizeFrames,
+            std::shared_ptr<IStreamCallback> asyncCallback,
             ::aidl::android::hardware::audio::core::StreamContext* out_context);
     ndk::ScopedAStatus findPortIdForNewStream(
             int32_t in_portConfigId, ::aidl::android::media::audio::common::AudioPort** port);
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 5ee0f82..3c96973 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <atomic>
+#include <chrono>
 #include <cstdlib>
 #include <map>
 #include <memory>
@@ -28,6 +29,7 @@
 #include <aidl/android/hardware/audio/common/SourceMetadata.h>
 #include <aidl/android/hardware/audio/core/BnStreamIn.h>
 #include <aidl/android/hardware/audio/core/BnStreamOut.h>
+#include <aidl/android/hardware/audio/core/IStreamCallback.h>
 #include <aidl/android/hardware/audio/core/StreamDescriptor.h>
 #include <aidl/android/media/audio/common/AudioOffloadInfo.h>
 #include <fmq/AidlMessageQueue.h>
@@ -59,33 +61,42 @@
 
     StreamContext() = default;
     StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
-                  size_t frameSize, std::unique_ptr<DataMQ> dataMQ)
+                  size_t frameSize, std::unique_ptr<DataMQ> dataMQ,
+                  std::shared_ptr<IStreamCallback> asyncCallback, int transientStateDelayMs)
         : mCommandMQ(std::move(commandMQ)),
           mInternalCommandCookie(std::rand()),
           mReplyMQ(std::move(replyMQ)),
           mFrameSize(frameSize),
-          mDataMQ(std::move(dataMQ)) {}
+          mDataMQ(std::move(dataMQ)),
+          mAsyncCallback(asyncCallback),
+          mTransientStateDelayMs(transientStateDelayMs) {}
     StreamContext(StreamContext&& other)
         : mCommandMQ(std::move(other.mCommandMQ)),
           mInternalCommandCookie(other.mInternalCommandCookie),
           mReplyMQ(std::move(other.mReplyMQ)),
           mFrameSize(other.mFrameSize),
-          mDataMQ(std::move(other.mDataMQ)) {}
+          mDataMQ(std::move(other.mDataMQ)),
+          mAsyncCallback(other.mAsyncCallback),
+          mTransientStateDelayMs(other.mTransientStateDelayMs) {}
     StreamContext& operator=(StreamContext&& other) {
         mCommandMQ = std::move(other.mCommandMQ);
         mInternalCommandCookie = other.mInternalCommandCookie;
         mReplyMQ = std::move(other.mReplyMQ);
         mFrameSize = other.mFrameSize;
         mDataMQ = std::move(other.mDataMQ);
+        mAsyncCallback = other.mAsyncCallback;
+        mTransientStateDelayMs = other.mTransientStateDelayMs;
         return *this;
     }
 
     void fillDescriptor(StreamDescriptor* desc);
+    std::shared_ptr<IStreamCallback> getAsyncCallback() const { return mAsyncCallback; }
     CommandMQ* getCommandMQ() const { return mCommandMQ.get(); }
     DataMQ* getDataMQ() const { return mDataMQ.get(); }
     size_t getFrameSize() const { return mFrameSize; }
     int getInternalCommandCookie() const { return mInternalCommandCookie; }
     ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); }
+    int getTransientStateDelayMs() const { return mTransientStateDelayMs; }
     bool isValid() const;
     void reset();
 
@@ -95,6 +106,8 @@
     std::unique_ptr<ReplyMQ> mReplyMQ;
     size_t mFrameSize;
     std::unique_ptr<DataMQ> mDataMQ;
+    std::shared_ptr<IStreamCallback> mAsyncCallback;
+    int mTransientStateDelayMs;
 };
 
 class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
@@ -111,9 +124,17 @@
           mFrameSize(context.getFrameSize()),
           mCommandMQ(context.getCommandMQ()),
           mReplyMQ(context.getReplyMQ()),
-          mDataMQ(context.getDataMQ()) {}
+          mDataMQ(context.getDataMQ()),
+          mAsyncCallback(context.getAsyncCallback()),
+          mTransientStateDelayMs(context.getTransientStateDelayMs()) {}
     std::string init() override;
     void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
+    void populateReplyWrongState(StreamDescriptor::Reply* reply,
+                                 const StreamDescriptor::Command& command) const;
+    void switchToTransientState(StreamDescriptor::State state) {
+        mState = state;
+        mTransientStateStart = std::chrono::steady_clock::now();
+    }
 
     // Atomic fields are used both by the main and worker threads.
     std::atomic<bool> mIsConnected = false;
@@ -125,6 +146,9 @@
     StreamContext::CommandMQ* mCommandMQ;
     StreamContext::ReplyMQ* mReplyMQ;
     StreamContext::DataMQ* mDataMQ;
+    std::shared_ptr<IStreamCallback> mAsyncCallback;
+    const std::chrono::duration<int, std::milli> mTransientStateDelayMs;
+    std::chrono::time_point<std::chrono::steady_clock> mTransientStateStart;
     // We use an array and the "size" field instead of a vector to be able to detect
     // memory allocation issues.
     std::unique_ptr<int8_t[]> mDataBuffer;
diff --git a/audio/aidl/vts/ModuleConfig.cpp b/audio/aidl/vts/ModuleConfig.cpp
index 33c5b72..c081402 100644
--- a/audio/aidl/vts/ModuleConfig.cpp
+++ b/audio/aidl/vts/ModuleConfig.cpp
@@ -125,21 +125,21 @@
     return result;
 }
 
+std::vector<AudioPort> ModuleConfig::getNonBlockingMixPorts(bool attachedOnly,
+                                                            bool singlePort) const {
+    return findMixPorts(false /*isInput*/, singlePort, [&](const AudioPort& port) {
+        return isBitPositionFlagSet(port.flags.get<AudioIoFlags::Tag::output>(),
+                                    AudioOutputFlags::NON_BLOCKING) &&
+               (!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty());
+    });
+}
+
 std::vector<AudioPort> ModuleConfig::getOffloadMixPorts(bool attachedOnly, bool singlePort) const {
-    std::vector<AudioPort> result;
-    const auto mixPorts = getMixPorts(false /*isInput*/);
-    auto offloadPortIt = mixPorts.begin();
-    while (offloadPortIt != mixPorts.end()) {
-        offloadPortIt = std::find_if(offloadPortIt, mixPorts.end(), [&](const AudioPort& port) {
-            return isBitPositionFlagSet(port.flags.get<AudioIoFlags::Tag::output>(),
-                                        AudioOutputFlags::COMPRESS_OFFLOAD) &&
-                   (!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty());
-        });
-        if (offloadPortIt == mixPorts.end()) break;
-        result.push_back(*offloadPortIt++);
-        if (singlePort) break;
-    }
-    return result;
+    return findMixPorts(false /*isInput*/, singlePort, [&](const AudioPort& port) {
+        return isBitPositionFlagSet(port.flags.get<AudioIoFlags::Tag::output>(),
+                                    AudioOutputFlags::COMPRESS_OFFLOAD) &&
+               (!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty());
+    });
 }
 
 std::vector<AudioPort> ModuleConfig::getAttachedDevicesPortsForMixPort(
@@ -343,6 +343,19 @@
            profile.sampleRates.empty() || profile.channelMasks.empty();
 }
 
+std::vector<AudioPort> ModuleConfig::findMixPorts(
+        bool isInput, bool singlePort, std::function<bool(const AudioPort&)> pred) const {
+    // Collects mix ports accepted by 'pred', in order; only the first one if 'singlePort'.
+    std::vector<AudioPort> result;
+    const auto mixPorts = getMixPorts(isInput);
+    for (const auto& mixPort : mixPorts) {
+        if (!pred(mixPort)) continue;
+        result.push_back(mixPort);
+        if (singlePort) break;
+    }
+    return result;
+}
+
 std::vector<AudioPortConfig> ModuleConfig::generateAudioMixPortConfigs(
         const std::vector<AudioPort>& ports, bool isInput, bool singleProfile) const {
     std::vector<AudioPortConfig> result;
diff --git a/audio/aidl/vts/ModuleConfig.h b/audio/aidl/vts/ModuleConfig.h
index dc109a7..a85aa7f 100644
--- a/audio/aidl/vts/ModuleConfig.h
+++ b/audio/aidl/vts/ModuleConfig.h
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <functional>
 #include <optional>
 #include <set>
 #include <utility>
@@ -48,6 +49,8 @@
     std::vector<aidl::android::media::audio::common::AudioPort> getMixPorts(bool isInput) const {
         return isInput ? getInputMixPorts() : getOutputMixPorts();
     }
+    std::vector<aidl::android::media::audio::common::AudioPort> getNonBlockingMixPorts(
+            bool attachedOnly, bool singlePort) const;
     std::vector<aidl::android::media::audio::common::AudioPort> getOffloadMixPorts(
             bool attachedOnly, bool singlePort) const;
 
@@ -121,6 +124,9 @@
     std::string toString() const;
 
   private:
+    std::vector<aidl::android::media::audio::common::AudioPort> findMixPorts(
+            bool isInput, bool singlePort,
+            std::function<bool(const aidl::android::media::audio::common::AudioPort&)> pred) const;
     std::vector<aidl::android::media::audio::common::AudioPortConfig> generateAudioMixPortConfigs(
             const std::vector<aidl::android::media::audio::common::AudioPort>& ports, bool isInput,
             bool singleProfile) const;
diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
index 5e9aa7f..79b20fe 100644
--- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
@@ -15,12 +15,16 @@
  */
 
 #include <algorithm>
+#include <chrono>
 #include <cmath>
+#include <condition_variable>
 #include <limits>
 #include <memory>
+#include <mutex>
 #include <optional>
 #include <set>
 #include <string>
+#include <variant>
 #include <vector>
 
 #define LOG_TAG "VtsHalAudioCore"
@@ -30,6 +34,7 @@
 #include <Utils.h>
 #include <aidl/Gtest.h>
 #include <aidl/Vintf.h>
+#include <aidl/android/hardware/audio/core/BnStreamCallback.h>
 #include <aidl/android/hardware/audio/core/IModule.h>
 #include <aidl/android/hardware/audio/core/ITelephony.h>
 #include <aidl/android/media/audio/common/AudioIoFlags.h>
@@ -389,15 +394,132 @@
     std::unique_ptr<DataMQ> mDataMQ;
 };
 
-class StreamLogicDriver {
+struct StreamEventReceiver {
+    virtual ~StreamEventReceiver() = default;
+    enum class Event { None, DrainReady, Error, TransferReady };
+    virtual std::tuple<int, Event> getLastEvent() const = 0;
+    virtual std::tuple<int, Event> waitForEvent(int clientEventSeq) = 0;
+    static constexpr int kEventSeqInit = -1;
+};
+std::string toString(StreamEventReceiver::Event event) {
+    switch (event) {
+        case StreamEventReceiver::Event::None:
+            return "None";
+        case StreamEventReceiver::Event::DrainReady:
+            return "DrainReady";
+        case StreamEventReceiver::Event::Error:
+            return "Error";
+        case StreamEventReceiver::Event::TransferReady:
+            return "TransferReady";
+    }
+    return std::to_string(static_cast<int32_t>(event));
+}
+
+// Transition to the next state happens either due to a command from the client,
+// or after an event received from the server.
+using TransitionTrigger = std::variant<StreamDescriptor::Command, StreamEventReceiver::Event>;
+using StateTransition = std::pair<TransitionTrigger, StreamDescriptor::State>;
+struct StateSequence {
+    virtual ~StateSequence() = default;
+    virtual void rewind() = 0;
+    virtual bool done() const = 0;
+    virtual TransitionTrigger getTrigger() = 0;
+    virtual std::set<StreamDescriptor::State> getExpectedStates() = 0;
+    virtual void advance(StreamDescriptor::State state) = 0;
+};
+
+static const StreamDescriptor::Command kGetStatusCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::getStatus>(Void{});
+static const StreamDescriptor::Command kStartCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::start>(Void{});
+static const StreamDescriptor::Command kBurstCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::burst>(0);
+static const StreamDescriptor::Command kDrainInCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::drain>(
+                StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED);
+static const StreamDescriptor::Command kDrainOutAllCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::drain>(
+                StreamDescriptor::DrainMode::DRAIN_ALL);
+static const StreamDescriptor::Command kDrainOutEarlyCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::drain>(
+                StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY);
+static const StreamDescriptor::Command kStandbyCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::standby>(Void{});
+static const StreamDescriptor::Command kPauseCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::pause>(Void{});
+static const StreamDescriptor::Command kFlushCommand =
+        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::flush>(Void{});
+static const StreamEventReceiver::Event kTransferReadyEvent =
+        StreamEventReceiver::Event::TransferReady;
+static const StreamEventReceiver::Event kDrainReadyEvent = StreamEventReceiver::Event::DrainReady;
+
+// Handle possible bifurcations:
+//   - on burst and on start: 'TRANSFERRING' -> {'ACTIVE', 'TRANSFERRING'}
+//   - on pause: 'TRANSFER_PAUSED' -> {'PAUSED', 'TRANSFER_PAUSED'}
+// It is assumed that the 'steps' provided at construction contain the sequence
+// for the async case, which gets corrected in the case when the HAL decides to do
+// a synchronous transfer.
+class SmartStateSequence : public StateSequence {
   public:
+    explicit SmartStateSequence(const std::vector<StateTransition>& steps) : mSteps(steps) {}
+    explicit SmartStateSequence(std::vector<StateTransition>&& steps) : mSteps(std::move(steps)) {}
+    void rewind() override { mCurrentStep = 0; }
+    bool done() const override { return mCurrentStep >= mSteps.size(); }
+    TransitionTrigger getTrigger() override { return mSteps[mCurrentStep].first; }
+    std::set<StreamDescriptor::State> getExpectedStates() override {
+        std::set<StreamDescriptor::State> expected{getState()};
+        if (isPauseBifurcation()) {
+            expected.insert(StreamDescriptor::State::PAUSED);
+        } else if (isBurstBifurcation() || isStartBifurcation()) {
+            expected.insert(StreamDescriptor::State::ACTIVE);
+        }
+        return expected;
+    }
+    void advance(StreamDescriptor::State state) override {
+        // Reaching 'ACTIVE' right after a burst: also skip the next 'TransferReady' step, if any.
+        const bool skipEventStep =
+                isBurstBifurcation() && state == StreamDescriptor::State::ACTIVE &&
+                mCurrentStep + 1 < mSteps.size() &&
+                mSteps[mCurrentStep + 1].first == TransitionTrigger{kTransferReadyEvent};
+        mCurrentStep += skipEventStep ? 2 : 1;
+    }
+
+  private:
+    StreamDescriptor::State getState() const { return mSteps[mCurrentStep].second; }
+    bool isBurstBifurcation() {
+        return getState() == StreamDescriptor::State::TRANSFERRING &&
+               getTrigger() == TransitionTrigger{kBurstCommand};
+    }
+    bool isPauseBifurcation() {
+        return getState() == StreamDescriptor::State::TRANSFER_PAUSED &&
+               getTrigger() == TransitionTrigger{kPauseCommand};
+    }
+    bool isStartBifurcation() {
+        return getState() == StreamDescriptor::State::TRANSFERRING &&
+               getTrigger() == TransitionTrigger{kStartCommand};
+    }
+    const std::vector<StateTransition> mSteps;
+    size_t mCurrentStep = 0;
+};
+
+std::string toString(const TransitionTrigger& trigger) {
+    // Renders the held alternative as its quoted name followed by its kind:
+    // "'<tag>' command" or "'<event>' event".
+    if (const auto* command = std::get_if<StreamDescriptor::Command>(&trigger);
+        command != nullptr) {
+        return std::string("'") + toString(command->getTag()) + "' command";
+    }
+    const auto eventName = toString(std::get<StreamEventReceiver::Event>(trigger));
+    return std::string("'") + eventName + "' event";
+}
+
+struct StreamLogicDriver {
     virtual ~StreamLogicDriver() = default;
     // Return 'true' to stop the worker.
     virtual bool done() = 0;
     // For 'Writer' logic, if the 'actualSize' is 0, write is skipped.
     // The 'fmqByteCount' from the returned command is passed as is to the HAL.
-    virtual StreamDescriptor::Command getNextCommand(int maxDataSize,
-                                                     int* actualSize = nullptr) = 0;
+    virtual TransitionTrigger getNextTrigger(int maxDataSize, int* actualSize = nullptr) = 0;
     // Return 'true' to indicate that no further processing is needed,
     // for example, the driver is expecting a bad status to be returned.
     // The logic cycle will return with 'CONTINUE' status. Otherwise,
@@ -410,46 +532,102 @@
 
 class StreamCommonLogic : public StreamLogic {
   protected:
-    StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver)
+    StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver,
+                      StreamEventReceiver* eventReceiver)
         : mCommandMQ(context.getCommandMQ()),
           mReplyMQ(context.getReplyMQ()),
           mDataMQ(context.getDataMQ()),
           mData(context.getBufferSizeBytes()),
-          mDriver(driver) {}
+          mDriver(driver),
+          mEventReceiver(eventReceiver) {}
     StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; }
     StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; }
+    StreamContext::DataMQ* getDataMQ() const { return mDataMQ; }
     StreamLogicDriver* getDriver() const { return mDriver; }
+    StreamEventReceiver* getEventReceiver() const { return mEventReceiver; }
 
-    std::string init() override { return ""; }
+    std::string init() override {
+        LOG(DEBUG) << __func__;
+        return "";
+    }
+    std::optional<StreamDescriptor::Command> maybeGetNextCommand(int* actualSize = nullptr) {
+        TransitionTrigger trigger = mDriver->getNextTrigger(mData.size(), actualSize);
+        // A command trigger is returned to the caller unchanged.
+        if (const auto* command = std::get_if<StreamDescriptor::Command>(&trigger)) {
+            return *command;
+        }
+        // Otherwise the trigger is an event, wait until the receiver reports one.
+        const auto expected = std::get<StreamEventReceiver::Event>(trigger);
+        const auto [eventSeq, event] = mEventReceiver->waitForEvent(mLastEventSeq);
+        mLastEventSeq = eventSeq;
+        if (event != expected) {
+            LOG(ERROR) << __func__ << ": expected event " << toString(expected) << ", got "
+                       << toString(event);
+            return {};
+        }
+        // If we were waiting for an event, the new stream state must be retrieved
+        // via 'getStatus'.
+        return kGetStatusCommand;
+    }
+    bool readDataFromMQ(size_t readCount) {
+        std::vector<int8_t> data(readCount);
+        if (mDataMQ->read(data.data(), readCount)) {
+            memcpy(mData.data(), data.data(), std::min(mData.size(), data.size()));
+            return true;
+        }
+        LOG(ERROR) << __func__ << ": reading of " << readCount << " bytes from MQ failed";
+        return false;
+    }
+    bool writeDataToMQ() {
+        if (mDataMQ->write(mData.data(), mData.size())) {
+            return true;
+        }
+        LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed";
+        return false;
+    }
 
+  private:
     StreamContext::CommandMQ* mCommandMQ;
     StreamContext::ReplyMQ* mReplyMQ;
     StreamContext::DataMQ* mDataMQ;
     std::vector<int8_t> mData;
     StreamLogicDriver* const mDriver;
+    StreamEventReceiver* const mEventReceiver;
+    int mLastEventSeq = StreamEventReceiver::kEventSeqInit;
 };
 
 class StreamReaderLogic : public StreamCommonLogic {
   public:
-    StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver)
-        : StreamCommonLogic(context, driver) {}
+    StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver,
+                      StreamEventReceiver* eventReceiver)
+        : StreamCommonLogic(context, driver, eventReceiver) {}
 
   protected:
     Status cycle() override {
         if (getDriver()->done()) {
+            LOG(DEBUG) << __func__ << ": clean exit";
             return Status::EXIT;
         }
-        StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size());
-        if (!mCommandMQ->writeBlocking(&command, 1)) {
+        StreamDescriptor::Command command;
+        if (auto maybeCommand = maybeGetNextCommand(); maybeCommand.has_value()) {
+            command = std::move(maybeCommand.value());
+        } else {
+            LOG(ERROR) << __func__ << ": no next command";
+            return Status::ABORT;
+        }
+        LOG(DEBUG) << "Writing command: " << command.toString();
+        if (!getCommandMQ()->writeBlocking(&command, 1)) {
             LOG(ERROR) << __func__ << ": writing of command into MQ failed";
             return Status::ABORT;
         }
         StreamDescriptor::Reply reply{};
-        if (!mReplyMQ->readBlocking(&reply, 1)) {
-            LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
+        LOG(DEBUG) << "Reading reply...";
+        if (!getReplyMQ()->readBlocking(&reply, 1)) {
             return Status::ABORT;
         }
+        LOG(DEBUG) << "Reply received: " << reply.toString();
         if (getDriver()->interceptRawReply(reply)) {
+            LOG(DEBUG) << __func__ << ": reply has been intercepted by the driver";
             return Status::CONTINUE;
         }
         if (reply.status != STATUS_OK) {
@@ -463,11 +641,11 @@
                        << ": received invalid byte count in the reply: " << reply.fmqByteCount;
             return Status::ABORT;
         }
-        if (static_cast<size_t>(reply.fmqByteCount) != mDataMQ->availableToRead()) {
+        if (static_cast<size_t>(reply.fmqByteCount) != getDataMQ()->availableToRead()) {
             LOG(ERROR) << __func__
                        << ": the byte count in the reply is not the same as the amount of "
                        << "data available in the MQ: " << reply.fmqByteCount
-                       << " != " << mDataMQ->availableToRead();
+                       << " != " << getDataMQ()->availableToRead();
         }
         if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
             LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs;
@@ -484,10 +662,8 @@
             return Status::ABORT;
         }
         const bool acceptedReply = getDriver()->processValidReply(reply);
-        if (const size_t readCount = mDataMQ->availableToRead(); readCount > 0) {
-            std::vector<int8_t> data(readCount);
-            if (mDataMQ->read(data.data(), readCount)) {
-                memcpy(mData.data(), data.data(), std::min(mData.size(), data.size()));
+        if (const size_t readCount = getDataMQ()->availableToRead(); readCount > 0) {
+            if (readDataFromMQ(readCount)) {
                 goto checkAcceptedReply;
             }
             LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
@@ -505,29 +681,39 @@
 
 class StreamWriterLogic : public StreamCommonLogic {
   public:
-    StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver)
-        : StreamCommonLogic(context, driver) {}
+    StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver,
+                      StreamEventReceiver* eventReceiver)
+        : StreamCommonLogic(context, driver, eventReceiver) {}
 
   protected:
     Status cycle() override {
         if (getDriver()->done()) {
+            LOG(DEBUG) << __func__ << ": clean exit";
             return Status::EXIT;
         }
         int actualSize = 0;
-        StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size(), &actualSize);
-        if (actualSize != 0 && !mDataMQ->write(mData.data(), mData.size())) {
-            LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed";
+        StreamDescriptor::Command command;
+        if (auto maybeCommand = maybeGetNextCommand(&actualSize); maybeCommand.has_value()) {
+            command = std::move(maybeCommand.value());
+        } else {
+            LOG(ERROR) << __func__ << ": no next command";
             return Status::ABORT;
         }
-        if (!mCommandMQ->writeBlocking(&command, 1)) {
+        if (actualSize != 0 && !writeDataToMQ()) {
+            return Status::ABORT;
+        }
+        LOG(DEBUG) << "Writing command: " << command.toString();
+        if (!getCommandMQ()->writeBlocking(&command, 1)) {
             LOG(ERROR) << __func__ << ": writing of command into MQ failed";
             return Status::ABORT;
         }
         StreamDescriptor::Reply reply{};
-        if (!mReplyMQ->readBlocking(&reply, 1)) {
+        LOG(DEBUG) << "Reading reply...";
+        if (!getReplyMQ()->readBlocking(&reply, 1)) {
             LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
             return Status::ABORT;
         }
+        LOG(DEBUG) << "Reply received: " << reply.toString();
         if (getDriver()->interceptRawReply(reply)) {
             return Status::CONTINUE;
         }
@@ -542,10 +728,10 @@
                        << ": received invalid byte count in the reply: " << reply.fmqByteCount;
             return Status::ABORT;
         }
-        if (mDataMQ->availableToWrite() != mDataMQ->getQuantumCount()) {
+        if (getDataMQ()->availableToWrite() != getDataMQ()->getQuantumCount()) {
             LOG(ERROR) << __func__ << ": the HAL module did not consume all data from the data MQ: "
-                       << "available to write " << mDataMQ->availableToWrite()
-                       << ", total size: " << mDataMQ->getQuantumCount();
+                       << "available to write " << getDataMQ()->availableToWrite()
+                       << ", total size: " << getDataMQ()->getQuantumCount();
             return Status::ABORT;
         }
         if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
@@ -571,6 +757,71 @@
 };
 using StreamWriter = StreamWorker<StreamWriterLogic>;
 
+class DefaultStreamCallback : public ::aidl::android::hardware::audio::core::BnStreamCallback,
+                              public StreamEventReceiver {
+    ndk::ScopedAStatus onTransferReady() override {
+        LOG(DEBUG) << __func__;
+        putLastEvent(Event::TransferReady);
+        return ndk::ScopedAStatus::ok();
+    }
+    ndk::ScopedAStatus onError() override {
+        LOG(DEBUG) << __func__;
+        putLastEvent(Event::Error);
+        return ndk::ScopedAStatus::ok();
+    }
+    ndk::ScopedAStatus onDrainReady() override {
+        LOG(DEBUG) << __func__;
+        putLastEvent(Event::DrainReady);
+        return ndk::ScopedAStatus::ok();
+    }
+
+  public:
+    // To avoid timing out the whole test suite in case no event is received
+    // from the HAL, use a local timeout for event waiting.
+    static constexpr auto kEventTimeoutMs = std::chrono::milliseconds(1000);
+
+    StreamEventReceiver* getEventReceiver() { return this; }
+    std::tuple<int, Event> getLastEvent() const override {
+        std::lock_guard l(mLock);
+        return getLastEvent_l();
+    }
+    std::tuple<int, Event> waitForEvent(int clientEventSeq) override {
+        std::unique_lock lock(mLock);
+        android::base::ScopedLockAssertion lock_assertion(mLock);
+        LOG(DEBUG) << __func__ << ": client " << clientEventSeq << ", last " << mLastEventSeq;
+        const bool hasNewEvent = mCv.wait_for(lock, kEventTimeoutMs, [&]() {
+            android::base::ScopedLockAssertion lock_assertion(mLock);
+            return clientEventSeq < mLastEventSeq;
+        });
+        if (!hasNewEvent) {
+            LOG(WARNING) << __func__ << ": timed out waiting for an event";
+            putLastEvent_l(Event::None);  // a timeout is stored as a new 'None' event
+        }
+        return getLastEvent_l();
+    }
+
+  private:
+    std::tuple<int, Event> getLastEvent_l() const REQUIRES(mLock) {
+        return std::make_tuple(mLastEventSeq, mLastEvent);
+    }
+    void putLastEvent(Event event) {
+        {
+            std::lock_guard l(mLock);
+            putLastEvent_l(event);
+        }
+        mCv.notify_one();
+    }
+    void putLastEvent_l(Event event) REQUIRES(mLock) {
+        mLastEventSeq++;
+        mLastEvent = event;
+    }
+
+    mutable std::mutex mLock;
+    std::condition_variable mCv;
+    int mLastEventSeq GUARDED_BY(mLock) = kEventSeqInit;
+    Event mLastEvent GUARDED_BY(mLock) = Event::None;
+};
+
 template <typename T>
 struct IOTraits {
     static constexpr bool is_input = std::is_same_v<T, IStreamIn>;
@@ -607,6 +858,7 @@
     }
     Stream* get() const { return mStream.get(); }
     const StreamContext* getContext() const { return mContext ? &(mContext.value()) : nullptr; }
+    StreamEventReceiver* getEventReceiver() { return mStreamCallback.get(); }  // may be null
     std::shared_ptr<Stream> getSharedPointer() const { return mStream; }
     const AudioPortConfig& getPortConfig() const { return mPortConfig.get(); }
     int32_t getPortId() const { return mPortConfig.getId(); }
@@ -616,6 +868,7 @@
     std::shared_ptr<Stream> mStream;
     StreamDescriptor mDescriptor;
     std::optional<StreamContext> mContext;
+    std::shared_ptr<DefaultStreamCallback> mStreamCallback;
 };
 
 SinkMetadata GenerateSinkMetadata(const AudioPortConfig& portConfig) {
@@ -636,11 +889,15 @@
     args.portConfigId = portConfig.id;
     args.sinkMetadata = GenerateSinkMetadata(portConfig);
     args.bufferSizeFrames = bufferSizeFrames;
+    auto callback = ndk::SharedRefBase::make<DefaultStreamCallback>();
+    // TODO: Uncomment when support for asynchronous input is implemented.
+    // args.callback = callback;
     aidl::android::hardware::audio::core::IModule::OpenInputStreamReturn ret;
     ScopedAStatus status = module->openInputStream(args, &ret);
     if (status.isOk()) {
         mStream = std::move(ret.stream);
         mDescriptor = std::move(ret.desc);
+        mStreamCallback = std::move(callback);
     }
     return status;
 }
@@ -665,11 +922,14 @@
     args.sourceMetadata = GenerateSourceMetadata(portConfig);
     args.offloadInfo = ModuleConfig::generateOffloadInfoIfNeeded(portConfig);
     args.bufferSizeFrames = bufferSizeFrames;
+    auto callback = ndk::SharedRefBase::make<DefaultStreamCallback>();
+    args.callback = callback;
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
     ScopedAStatus status = module->openOutputStream(args, &ret);
     if (status.isOk()) {
         mStream = std::move(ret.stream);
         mDescriptor = std::move(ret.desc);
+        mStreamCallback = std::move(callback);
     }
     return status;
 }
@@ -1379,10 +1639,10 @@
     }
 }
 
+using CommandSequence = std::vector<StreamDescriptor::Command>;
 class StreamLogicDriverInvalidCommand : public StreamLogicDriver {
   public:
-    StreamLogicDriverInvalidCommand(const std::vector<StreamDescriptor::Command>& commands)
-        : mCommands(commands) {}
+    StreamLogicDriverInvalidCommand(const CommandSequence& commands) : mCommands(commands) {}
 
     std::string getUnexpectedStatuses() {
         // This method is intended to be called after the worker thread has joined,
@@ -1396,25 +1656,29 @@
     }
 
     bool done() override { return mNextCommand >= mCommands.size(); }
-    StreamDescriptor::Command getNextCommand(int, int* actualSize) override {
+    TransitionTrigger getNextTrigger(int, int* actualSize) override {
         if (actualSize != nullptr) *actualSize = 0;
         return mCommands[mNextCommand++];
     }
     bool interceptRawReply(const StreamDescriptor::Reply& reply) override {
-        if (reply.status != STATUS_BAD_VALUE) {
-            std::string s = mCommands[mNextCommand - 1].toString();
+        const size_t currentCommand = mNextCommand - 1;  // increased by getNextTrigger
+        const bool isLastCommand = currentCommand == mCommands.size() - 1;
+        // Only the last command of the sequence must be rejected with 'BAD_VALUE',
+        // every command preceding it is expected to succeed.
+        const auto expectedStatus = isLastCommand ? STATUS_BAD_VALUE : STATUS_OK;
+        if (reply.status != expectedStatus) {
+            std::string s = mCommands[currentCommand].toString();
             s.append(", ").append(statusToString(reply.status));
             mStatuses.push_back(std::move(s));
-            // If the HAL does not recognize the command as invalid,
-            // retrieve the data etc.
-            return reply.status != STATUS_OK;
+            // Process the reply, since the worker exits in case of an error.
+            return false;
         }
-        return true;
+        return isLastCommand;
     }
     bool processValidReply(const StreamDescriptor::Reply&) override { return true; }
 
   private:
-    const std::vector<StreamDescriptor::Command> mCommands;
+    const CommandSequence mCommands;
     size_t mNextCommand = 0;
     std::vector<std::string> mStatuses;
 };
@@ -1556,22 +1820,46 @@
     }
 
     void SendInvalidCommandImpl(const AudioPortConfig& portConfig) {
-        std::vector<StreamDescriptor::Command> commands = {
-                StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::hal_reserved_exit>(
-                        0),
-                // TODO: For proper testing of input streams, need to put the stream into
-                // a state which accepts BURST commands.
-                StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::burst>(-1),
-                StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::burst>(
-                        std::numeric_limits<int32_t>::min()),
-        };
-        WithStream<Stream> stream(portConfig);
-        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
-        StreamLogicDriverInvalidCommand driver(commands);
-        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
-        ASSERT_TRUE(worker.start());
-        worker.join();
-        EXPECT_EQ("", driver.getUnexpectedStatuses());
+        using TestSequence = std::pair<std::string, CommandSequence>;
+        // The last command in 'CommandSequence' is the one that must trigger
+        // an error status. All preceding commands are to put the state machine
+        // into a state which accepts the last command.
+        std::vector<TestSequence> sequences{
+                std::make_pair(std::string("HalReservedExit"),
+                               std::vector{StreamDescriptor::Command::make<
+                                       StreamDescriptor::Command::Tag::halReservedExit>(0)}),
+                std::make_pair(std::string("BurstNeg"),
+                               std::vector{kStartCommand,
+                                           StreamDescriptor::Command::make<
+                                                   StreamDescriptor::Command::Tag::burst>(-1)}),
+                std::make_pair(
+                        std::string("BurstMinInt"),
+                        std::vector{kStartCommand, StreamDescriptor::Command::make<
+                                                           StreamDescriptor::Command::Tag::burst>(
+                                                           std::numeric_limits<int32_t>::min())})};
+        if (IOTraits<Stream>::is_input) {
+            sequences.emplace_back("DrainAll",
+                                   std::vector{kStartCommand, kBurstCommand, kDrainOutAllCommand});
+            sequences.emplace_back(
+                    "DrainEarly", std::vector{kStartCommand, kBurstCommand, kDrainOutEarlyCommand});
+        } else {
+            sequences.emplace_back("DrainUnspecified",
+                                   std::vector{kStartCommand, kBurstCommand, kDrainInCommand});
+        }
+        for (const auto& seq : sequences) {
+            SCOPED_TRACE(std::string("Sequence ").append(seq.first));
+            LOG(DEBUG) << __func__ << ": Sequence " << seq.first;
+            WithStream<Stream> stream(portConfig);
+            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+            StreamLogicDriverInvalidCommand driver(seq.second);
+            typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver,
+                                                     stream.getEventReceiver());
+            LOG(DEBUG) << __func__ << ": starting worker...";
+            ASSERT_TRUE(worker.start());
+            LOG(DEBUG) << __func__ << ": joining worker...";
+            worker.join();
+            EXPECT_EQ("", driver.getUnexpectedStatuses());
+        }
     }
 };
 using AudioStreamIn = AudioStream<IStreamIn>;
@@ -1615,27 +1903,51 @@
         GTEST_SKIP()
                 << "No mix port for compressed offload that could be routed to attached devices";
     }
-    const auto portConfig =
-            moduleConfig->getSingleConfigForMixPort(false, *offloadMixPorts.begin());
-    ASSERT_TRUE(portConfig.has_value())
-            << "No profiles specified for the compressed offload mix port";
+    const auto config = moduleConfig->getSingleConfigForMixPort(false, *offloadMixPorts.begin());
+    ASSERT_TRUE(config.has_value()) << "No profiles specified for the compressed offload mix port";
+    WithAudioPortConfig portConfig(config.value());
+    ASSERT_NO_FATAL_FAILURE(portConfig.SetUp(module.get()));
     StreamDescriptor descriptor;
     std::shared_ptr<IStreamOut> ignored;
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
-    args.portConfigId = portConfig.value().id;
-    args.sourceMetadata = GenerateSourceMetadata(portConfig.value());
+    args.portConfigId = portConfig.getId();
+    args.sourceMetadata = GenerateSourceMetadata(portConfig.get());
     args.bufferSizeFrames = kDefaultBufferSizeFrames;
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
     EXPECT_STATUS(EX_ILLEGAL_ARGUMENT, module->openOutputStream(args, &ret))
             << "when no offload info is provided for a compressed offload mix port";
 }
 
-using CommandAndState = std::pair<StreamDescriptor::Command, StreamDescriptor::State>;
+TEST_P(AudioStreamOut, RequireAsyncCallback) {  // a non-blocking output opened without a callback must be rejected
+    const auto nonBlockingMixPorts =
+            moduleConfig->getNonBlockingMixPorts(true /*attachedOnly*/, true /*singlePort*/);
+    if (nonBlockingMixPorts.empty()) {  // no candidate port on this module, so there is nothing to check
+        GTEST_SKIP()
+                << "No mix port for non-blocking output that could be routed to attached devices";
+    }
+    const auto config =
+            moduleConfig->getSingleConfigForMixPort(false, *nonBlockingMixPorts.begin());
+    ASSERT_TRUE(config.has_value()) << "No profiles specified for the non-blocking mix port";
+    WithAudioPortConfig portConfig(config.value());
+    ASSERT_NO_FATAL_FAILURE(portConfig.SetUp(module.get()));
+    StreamDescriptor descriptor;  // NOTE(review): not referenced anywhere in this test
+    std::shared_ptr<IStreamOut> ignored;  // NOTE(review): not referenced anywhere in this test
+    aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
+    args.portConfigId = portConfig.getId();
+    args.sourceMetadata = GenerateSourceMetadata(portConfig.get());
+    args.offloadInfo = ModuleConfig::generateOffloadInfoIfNeeded(portConfig.get());
+    args.bufferSizeFrames = kDefaultBufferSizeFrames;  // args.callback is never assigned in this test
+    aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
+    EXPECT_STATUS(EX_ILLEGAL_ARGUMENT, module->openOutputStream(args, &ret))  // the missing callback is what must be rejected
+            << "when no async callback is provided for a non-blocking mix port";
+}
 
 class StreamLogicDefaultDriver : public StreamLogicDriver {
   public:
-    explicit StreamLogicDefaultDriver(const std::vector<CommandAndState>& commands)
-        : mCommands(commands) {}
+    explicit StreamLogicDefaultDriver(std::shared_ptr<StateSequence> commands)
+        : mCommands(commands) {
+        mCommands->rewind();
+    }
 
     // The three methods below is intended to be called after the worker
     // thread has joined, thus no extra synchronization is needed.
@@ -1643,59 +1955,72 @@
     bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; }
     std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; }
 
-    bool done() override { return mNextCommand >= mCommands.size(); }
-    StreamDescriptor::Command getNextCommand(int maxDataSize, int* actualSize) override {
-        auto command = mCommands[mNextCommand++].first;
-        if (command.getTag() == StreamDescriptor::Command::Tag::burst) {
-            if (actualSize != nullptr) {
-                // In the output scenario, reduce slightly the fmqByteCount to verify
-                // that the HAL module always consumes all data from the MQ.
-                if (maxDataSize > 1) maxDataSize--;
-                *actualSize = maxDataSize;
+    bool done() override { return mCommands->done(); }
+    TransitionTrigger getNextTrigger(int maxDataSize, int* actualSize) override {
+        auto trigger = mCommands->getTrigger();
+        if (StreamDescriptor::Command* command = std::get_if<StreamDescriptor::Command>(&trigger);
+            command != nullptr) {
+            if (command->getTag() == StreamDescriptor::Command::Tag::burst) {
+                if (actualSize != nullptr) {
+                    // In the output scenario, reduce slightly the fmqByteCount to verify
+                    // that the HAL module always consumes all data from the MQ.
+                    if (maxDataSize > 1) maxDataSize--;
+                    *actualSize = maxDataSize;
+                }
+                command->set<StreamDescriptor::Command::Tag::burst>(maxDataSize);
+            } else {
+                if (actualSize != nullptr) *actualSize = 0;
             }
-            command.set<StreamDescriptor::Command::Tag::burst>(maxDataSize);
-        } else {
-            if (actualSize != nullptr) *actualSize = 0;
         }
-        return command;
+        return trigger;
     }
     bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; }
     bool processValidReply(const StreamDescriptor::Reply& reply) override {
-        if (mPreviousFrames.has_value()) {
-            if (reply.observable.frames > mPreviousFrames.value()) {
-                mObservablePositionIncrease = true;
-            } else if (reply.observable.frames < mPreviousFrames.value()) {
-                mRetrogradeObservablePosition = true;
+        if (reply.observable.frames != StreamDescriptor::Position::UNKNOWN) {
+            if (mPreviousFrames.has_value()) {
+                if (reply.observable.frames > mPreviousFrames.value()) {
+                    mObservablePositionIncrease = true;
+                } else if (reply.observable.frames < mPreviousFrames.value()) {
+                    mRetrogradeObservablePosition = true;
+                }
             }
+            mPreviousFrames = reply.observable.frames;
         }
-        mPreviousFrames = reply.observable.frames;
 
-        const auto& lastCommandState = mCommands[mNextCommand - 1];
-        if (lastCommandState.second != reply.state) {
-            std::string s = std::string("Unexpected transition from the state ")
-                                    .append(mPreviousState)
-                                    .append(" to ")
-                                    .append(toString(reply.state))
-                                    .append(" caused by the command ")
-                                    .append(lastCommandState.first.toString());
+        auto expected = mCommands->getExpectedStates();
+        if (expected.count(reply.state) == 0) {
+            std::string s =
+                    std::string("Unexpected transition from the state ")
+                            .append(mPreviousState.has_value() ? toString(mPreviousState.value())
+                                                               : "<initial state>")
+                            .append(" to ")
+                            .append(toString(reply.state))
+                            .append(" (expected one of ")
+                            .append(::android::internal::ToString(expected))
+                            .append(") caused by the ")
+                            .append(toString(mCommands->getTrigger()));
             LOG(ERROR) << __func__ << ": " << s;
             mUnexpectedTransition = std::move(s);
             return false;
         }
+        mCommands->advance(reply.state);
+        mPreviousState = reply.state;
         return true;
     }
 
   protected:
-    const std::vector<CommandAndState>& mCommands;
-    size_t mNextCommand = 0;
+    std::shared_ptr<StateSequence> mCommands;
+    std::optional<StreamDescriptor::State> mPreviousState;
     std::optional<int64_t> mPreviousFrames;
-    std::string mPreviousState = "<initial state>";
     bool mObservablePositionIncrease = false;
     bool mRetrogradeObservablePosition = false;
     std::string mUnexpectedTransition;
 };
 
-using NamedCommandSequence = std::pair<std::string, std::vector<CommandAndState>>;
+enum { NAMED_CMD_NAME, NAMED_CMD_DELAY_MS, NAMED_CMD_STREAM_TYPE, NAMED_CMD_CMDS };  // std::get<> indices into NamedCommandSequence
+enum class StreamTypeFilter { ANY, SYNC, ASYNC };  // SYNC is skipped for non-blocking port configs, ASYNC for blocking ones
+using NamedCommandSequence =  // fields: test name, value for streamTransientStateDelayMs, stream filter, sequence
+        std::tuple<std::string, int, StreamTypeFilter, std::shared_ptr<StateSequence>>;
 enum { PARAM_MODULE_NAME, PARAM_CMD_SEQ, PARAM_SETUP_SEQ };
 using StreamIoTestParameters =
         std::tuple<std::string /*moduleName*/, NamedCommandSequence, bool /*useSetupSequence2*/>;
@@ -1716,7 +2041,29 @@
         }
         for (const auto& portConfig : allPortConfigs) {
             SCOPED_TRACE(portConfig.toString());
-            const auto& commandsAndStates = std::get<PARAM_CMD_SEQ>(GetParam()).second;
+            const bool isNonBlocking =
+                    IOTraits<Stream>::is_input
+                            ? false
+                            :
+                            // TODO: Uncomment when support for asynchronous input is implemented.
+                            /*isBitPositionFlagSet(
+                              portConfig.flags.value().template get<AudioIoFlags::Tag::input>(),
+                              AudioInputFlags::NON_BLOCKING) :*/
+                            isBitPositionFlagSet(portConfig.flags.value()
+                                                         .template get<AudioIoFlags::Tag::output>(),
+                                                 AudioOutputFlags::NON_BLOCKING);
+            if (auto streamType =
+                        std::get<NAMED_CMD_STREAM_TYPE>(std::get<PARAM_CMD_SEQ>(GetParam()));
+                (isNonBlocking && streamType == StreamTypeFilter::SYNC) ||
+                (!isNonBlocking && streamType == StreamTypeFilter::ASYNC)) {
+                continue;
+            }
+            WithDebugFlags delayTransientStates = WithDebugFlags::createNested(debug);
+            delayTransientStates.flags().streamTransientStateDelayMs =
+                    std::get<NAMED_CMD_DELAY_MS>(std::get<PARAM_CMD_SEQ>(GetParam()));
+            ASSERT_NO_FATAL_FAILURE(delayTransientStates.SetUp(module.get()));
+            const auto& commandsAndStates =
+                    std::get<NAMED_CMD_CMDS>(std::get<PARAM_CMD_SEQ>(GetParam()));
             if (!std::get<PARAM_SETUP_SEQ>(GetParam())) {
                 ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq1(portConfig, commandsAndStates));
             } else {
@@ -1732,7 +2079,7 @@
 
     // Set up a patch first, then open a stream.
     void RunStreamIoCommandsImplSeq1(const AudioPortConfig& portConfig,
-                                     const std::vector<CommandAndState>& commandsAndStates) {
+                                     std::shared_ptr<StateSequence> commandsAndStates) {
         auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
                 IOTraits<Stream>::is_input, portConfig);
         ASSERT_FALSE(devicePorts.empty());
@@ -1743,9 +2090,12 @@
         WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
         ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
         StreamLogicDefaultDriver driver(commandsAndStates);
-        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver,
+                                                 stream.getEventReceiver());
 
+        LOG(DEBUG) << __func__ << ": starting worker...";
         ASSERT_TRUE(worker.start());
+        LOG(DEBUG) << __func__ << ": joining worker...";
         worker.join();
         EXPECT_FALSE(worker.hasError()) << worker.getError();
         EXPECT_EQ("", driver.getUnexpectedStateTransition());
@@ -1757,11 +2107,12 @@
 
     // Open a stream, then set up a patch for it.
     void RunStreamIoCommandsImplSeq2(const AudioPortConfig& portConfig,
-                                     const std::vector<CommandAndState>& commandsAndStates) {
+                                     std::shared_ptr<StateSequence> commandsAndStates) {
         WithStream<Stream> stream(portConfig);
         ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
         StreamLogicDefaultDriver driver(commandsAndStates);
-        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+        typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver,
+                                                 stream.getEventReceiver());
 
         auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
                 IOTraits<Stream>::is_input, portConfig);
@@ -1770,7 +2121,9 @@
         WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
         ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
 
+        LOG(DEBUG) << __func__ << ": starting worker...";
         ASSERT_TRUE(worker.start());
+        LOG(DEBUG) << __func__ << ": joining worker...";
         worker.join();
         EXPECT_FALSE(worker.hasError()) << worker.getError();
         EXPECT_EQ("", driver.getUnexpectedStateTransition());
@@ -1975,103 +2328,210 @@
                          android::PrintInstanceNameToString);
 GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamOut);
 
-static const StreamDescriptor::Command kStartCommand =
-        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::start>(Void{});
-static const StreamDescriptor::Command kBurstCommand =
-        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::burst>(0);
-static const StreamDescriptor::Command kDrainCommand =
-        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::drain>(Void{});
-static const StreamDescriptor::Command kStandbyCommand =
-        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::standby>(Void{});
-static const StreamDescriptor::Command kPauseCommand =
-        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::pause>(Void{});
-static const StreamDescriptor::Command kFlushCommand =
-        StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::flush>(Void{});
-static const NamedCommandSequence kReadOrWriteSeq =
-        std::make_pair(std::string("ReadOrWrite"),
-                       std::vector<CommandAndState>{
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE)});
-static const NamedCommandSequence kDrainInSeq =
-        std::make_pair(std::string("Drain"),
-                       std::vector<CommandAndState>{
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kDrainCommand, StreamDescriptor::State::DRAINING),
-                               std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kDrainCommand, StreamDescriptor::State::DRAINING),
-                               // TODO: This will need to be changed once DRAIN starts taking time.
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::STANDBY)});
-static const NamedCommandSequence kDrainOutSeq =
-        std::make_pair(std::string("Drain"),
-                       std::vector<CommandAndState>{
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               // TODO: This will need to be changed once DRAIN starts taking time.
-                               std::make_pair(kDrainCommand, StreamDescriptor::State::IDLE)});
-// TODO: This will need to be changed once DRAIN starts taking time so we can pause it.
-static const NamedCommandSequence kDrainPauseOutSeq = std::make_pair(
-        std::string("DrainPause"),
-        std::vector<CommandAndState>{std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                                     std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                                     std::make_pair(kDrainCommand, StreamDescriptor::State::IDLE)});
-static const NamedCommandSequence kStandbySeq =
-        std::make_pair(std::string("Standby"),
-                       std::vector<CommandAndState>{
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kStandbyCommand, StreamDescriptor::State::STANDBY),
-                               // Perform a read or write in order to advance observable position
-                               // (this is verified by tests).
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE)});
+// This is the value used in test sequences for which the test needs to ensure
+// that the HAL stays in a transient state long enough to receive the next command.
+static const int kStreamTransientStateTransitionDelayMs = 3000;  // placed in the NAMED_CMD_DELAY_MS slot of such sequences
+
+// TODO: Add async test cases for input once it is implemented.
+
+std::shared_ptr<StateSequence> makeBurstCommands(bool isSync, size_t burstCount) {  // start -> IDLE, then burstCount bursts
+    const auto burst =
+            isSync ? std::vector<StateTransition>{std::make_pair(kBurstCommand,  // sync: one step per burst
+                                                                 StreamDescriptor::State::ACTIVE)}
+                   : std::vector<StateTransition>{  // async: two steps per burst
+                             std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFERRING),
+                             std::make_pair(kTransferReadyEvent, StreamDescriptor::State::ACTIVE)};
+    std::vector<StateTransition> result{
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE)};
+    for (size_t i = 0; i < burstCount; ++i) {
+        result.insert(result.end(), burst.begin(), burst.end());  // append the steps for one more burst
+    }
+    return std::make_shared<SmartStateSequence>(result);
+}
+static const NamedCommandSequence kReadSeq =  // three bursts, each expected to report ACTIVE; no stream-type filtering
+        std::make_tuple(std::string("Read"), 0, StreamTypeFilter::ANY, makeBurstCommands(true, 3));
+static const NamedCommandSequence kWriteSyncSeq = std::make_tuple(  // same steps as kReadSeq, blocking streams only
+        std::string("Write"), 0, StreamTypeFilter::SYNC, makeBurstCommands(true, 3));
+static const NamedCommandSequence kWriteAsyncSeq = std::make_tuple(  // shares the "Write" name; the filter adds the test-name suffix
+        std::string("Write"), 0, StreamTypeFilter::ASYNC, makeBurstCommands(false, 3));
+
+std::shared_ptr<StateSequence> makeAsyncDrainCommands(bool isInput) {  // NOTE(review): despite the name, isInput=true builds kDrainInSeq
+    return std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+            std::make_pair(kBurstCommand, isInput ? StreamDescriptor::State::ACTIVE
+                                                  : StreamDescriptor::State::TRANSFERRING),
+            std::make_pair(isInput ? kDrainInCommand : kDrainOutAllCommand,  // first drain
+                           StreamDescriptor::State::DRAINING),
+            isInput ? std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE)  // input: start issued while DRAINING
+                    : std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFERRING),  // output: burst issued while DRAINING
+            std::make_pair(isInput ? kDrainInCommand : kDrainOutAllCommand,  // second drain
+                           StreamDescriptor::State::DRAINING)});
+}
+static const NamedCommandSequence kWriteDrainAsyncSeq =  // output variant, run with the transient-state delay
+        std::make_tuple(std::string("WriteDrain"), kStreamTransientStateTransitionDelayMs,
+                        StreamTypeFilter::ASYNC, makeAsyncDrainCommands(false));
+static const NamedCommandSequence kDrainInSeq = std::make_tuple(  // input variant: zero delay, no stream-type filtering
+        std::string("Drain"), 0, StreamTypeFilter::ANY, makeAsyncDrainCommands(true));
+
+std::shared_ptr<StateSequence> makeDrainOutCommands(bool isSync) {  // start, burst, drain, then a trigger expected to yield IDLE
+    return std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+            std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),  // NOTE(review): ACTIVE even when !isSync; sibling async factories expect TRANSFERRING - confirm
+            std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAINING),
+            std::make_pair(isSync ? TransitionTrigger(kGetStatusCommand)  // sync: a status query is the trigger
+                                  : TransitionTrigger(kDrainReadyEvent),  // async: the drain-ready event is the trigger
+                           StreamDescriptor::State::IDLE)});
+}
+static const NamedCommandSequence kDrainOutSyncSeq = std::make_tuple(  // blocking streams, zero delay
+        std::string("Drain"), 0, StreamTypeFilter::SYNC, makeDrainOutCommands(true));
+static const NamedCommandSequence kDrainOutAsyncSeq = std::make_tuple(  // NOTE(review): not in the AudioStreamIoOut testing::Values list of this patch - confirm intent
+        std::string("Drain"), 0, StreamTypeFilter::ASYNC, makeDrainOutCommands(false));
+
+std::shared_ptr<StateSequence> makeDrainOutPauseCommands(bool isSync) {  // pause and resume a drain, then burst while drain-paused
+    return std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+            std::make_pair(kBurstCommand, isSync ? StreamDescriptor::State::ACTIVE
+                                                 : StreamDescriptor::State::TRANSFERRING),
+            std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAINING),
+            std::make_pair(kPauseCommand, StreamDescriptor::State::DRAIN_PAUSED),  // first pause of the drain
+            std::make_pair(kStartCommand, StreamDescriptor::State::DRAINING),  // start is expected to resume draining
+            std::make_pair(kPauseCommand, StreamDescriptor::State::DRAIN_PAUSED),  // second pause of the drain
+            std::make_pair(kBurstCommand, isSync ? StreamDescriptor::State::PAUSED  // burst while DRAIN_PAUSED: expected paused state
+                                                 : StreamDescriptor::State::TRANSFER_PAUSED)});
+}
+static const NamedCommandSequence kDrainPauseOutSyncSeq =  // blocking streams; uses the transient-state delay
+        std::make_tuple(std::string("DrainPause"), kStreamTransientStateTransitionDelayMs,
+                        StreamTypeFilter::SYNC, makeDrainOutPauseCommands(true));
+static const NamedCommandSequence kDrainPauseOutAsyncSeq =  // non-blocking streams; same name and delay
+        std::make_tuple(std::string("DrainPause"), kStreamTransientStateTransitionDelayMs,
+                        StreamTypeFilter::ASYNC, makeDrainOutPauseCommands(false));
+
+// This sequence also verifies that the capture / presentation position is not reset on standby.
+std::shared_ptr<StateSequence> makeStandbyCommands(bool isInput, bool isSync) {  // isSync only matters when !isInput
+    return std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+            std::make_pair(kStandbyCommand, StreamDescriptor::State::STANDBY),  // first standby, before any burst
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+            std::make_pair(kBurstCommand, isInput || isSync
+                                                  ? StreamDescriptor::State::ACTIVE
+                                                  : StreamDescriptor::State::TRANSFERRING),
+            std::make_pair(kPauseCommand, isInput || isSync
+                                                  ? StreamDescriptor::State::PAUSED
+                                                  : StreamDescriptor::State::TRANSFER_PAUSED),
+            std::make_pair(kFlushCommand, isInput ? StreamDescriptor::State::STANDBY  // input flush is expected in STANDBY
+                                                  : StreamDescriptor::State::IDLE),  // output flush is expected in IDLE
+            std::make_pair(isInput ? kGetStatusCommand : kStandbyCommand,  // no-op for input
+                           StreamDescriptor::State::STANDBY),
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),  // restart after the second standby
+            std::make_pair(kBurstCommand, isInput || isSync
+                                                  ? StreamDescriptor::State::ACTIVE
+                                                  : StreamDescriptor::State::TRANSFERRING)});
+}
+static const NamedCommandSequence kStandbyInSeq = std::make_tuple(  // input variant: zero delay, no stream-type filtering
+        std::string("Standby"), 0, StreamTypeFilter::ANY, makeStandbyCommands(true, false));
+static const NamedCommandSequence kStandbyOutSyncSeq = std::make_tuple(  // blocking output: zero delay
+        std::string("Standby"), 0, StreamTypeFilter::SYNC, makeStandbyCommands(false, true));
+static const NamedCommandSequence kStandbyOutAsyncSeq =  // non-blocking output: uses the transient-state delay
+        std::make_tuple(std::string("Standby"), kStreamTransientStateTransitionDelayMs,
+                        StreamTypeFilter::ASYNC, makeStandbyCommands(false, false));
+
 static const NamedCommandSequence kPauseInSeq =
-        std::make_pair(std::string("Pause"),
-                       std::vector<CommandAndState>{
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
-                               std::make_pair(kFlushCommand, StreamDescriptor::State::STANDBY)});
-static const NamedCommandSequence kPauseOutSeq =
-        std::make_pair(std::string("Pause"),
-                       std::vector<CommandAndState>{
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
-                               std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::PAUSED),
-                               std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED)});
-static const NamedCommandSequence kFlushInSeq =
-        std::make_pair(std::string("Flush"),
-                       std::vector<CommandAndState>{
-                               std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                               std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                               std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
-                               std::make_pair(kFlushCommand, StreamDescriptor::State::STANDBY)});
-static const NamedCommandSequence kFlushOutSeq = std::make_pair(
-        std::string("Flush"),
-        std::vector<CommandAndState>{std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
-                                     std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
-                                     std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
-                                     std::make_pair(kFlushCommand, StreamDescriptor::State::IDLE)});
+        std::make_tuple(std::string("Pause"), 0, StreamTypeFilter::ANY,
+                        std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+                                std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+                                std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
+                                std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
+                                std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
+                                std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
+                                std::make_pair(kFlushCommand, StreamDescriptor::State::STANDBY)}));
+static const NamedCommandSequence kPauseOutSyncSeq =  // pause/resume cycles for blocking output streams
+        std::make_tuple(std::string("Pause"), 0, StreamTypeFilter::SYNC,
+                        std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+                                std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+                                std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE),
+                                std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
+                                std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE),  // start from PAUSED is expected to resume
+                                std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED),
+                                std::make_pair(kBurstCommand, StreamDescriptor::State::PAUSED),  // burst while PAUSED is expected to stay PAUSED
+                                std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE),
+                                std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED)}));
+/* TODO: Figure out a better way for testing sync/async bursts
+static const NamedCommandSequence kPauseOutAsyncSeq = std::make_tuple(
+        std::string("Pause"), kStreamTransientStateTransitionDelayMs, StreamTypeFilter::ASYNC,
+        std::make_shared<StaticStateSequence>(std::vector<StateTransition>{
+                std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+                std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFERRING),
+                std::make_pair(kPauseCommand, StreamDescriptor::State::TRANSFER_PAUSED),
+                std::make_pair(kStartCommand, StreamDescriptor::State::TRANSFERRING),
+                std::make_pair(kPauseCommand, StreamDescriptor::State::TRANSFER_PAUSED),
+                std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAIN_PAUSED),
+                std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFER_PAUSED)}));
+*/
+
+std::shared_ptr<StateSequence> makeFlushCommands(bool isInput, bool isSync) {  // start, burst, pause, flush
+    return std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+            std::make_pair(kBurstCommand, isInput || isSync  // isSync only matters when !isInput
+                                                  ? StreamDescriptor::State::ACTIVE
+                                                  : StreamDescriptor::State::TRANSFERRING),
+            std::make_pair(kPauseCommand, isInput || isSync
+                                                  ? StreamDescriptor::State::PAUSED
+                                                  : StreamDescriptor::State::TRANSFER_PAUSED),
+            std::make_pair(kFlushCommand, isInput ? StreamDescriptor::State::STANDBY  // input flush is expected in STANDBY
+                                                  : StreamDescriptor::State::IDLE)});  // output flush is expected in IDLE
+}
+static const NamedCommandSequence kFlushInSeq = std::make_tuple(  // input variant: zero delay, no stream-type filtering
+        std::string("Flush"), 0, StreamTypeFilter::ANY, makeFlushCommands(true, false));
+static const NamedCommandSequence kFlushOutSyncSeq = std::make_tuple(  // blocking output: zero delay
+        std::string("Flush"), 0, StreamTypeFilter::SYNC, makeFlushCommands(false, true));
+static const NamedCommandSequence kFlushOutAsyncSeq =  // non-blocking output: uses the transient-state delay
+        std::make_tuple(std::string("Flush"), kStreamTransientStateTransitionDelayMs,
+                        StreamTypeFilter::ASYNC, makeFlushCommands(false, false));
+
+std::shared_ptr<StateSequence> makeDrainPauseFlushOutCommands(bool isSync) {  // start, burst, drain, pause, flush
+    return std::make_shared<SmartStateSequence>(std::vector<StateTransition>{
+            std::make_pair(kStartCommand, StreamDescriptor::State::IDLE),
+            std::make_pair(kBurstCommand, isSync ? StreamDescriptor::State::ACTIVE
+                                                 : StreamDescriptor::State::TRANSFERRING),
+            std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAINING),
+            std::make_pair(kPauseCommand, StreamDescriptor::State::DRAIN_PAUSED),
+            std::make_pair(kFlushCommand, StreamDescriptor::State::IDLE)});  // flush from DRAIN_PAUSED is expected to reach IDLE
+}
+static const NamedCommandSequence kDrainPauseFlushOutSyncSeq =  // blocking streams; uses the transient-state delay
+        std::make_tuple(std::string("DrainPauseFlush"), kStreamTransientStateTransitionDelayMs,
+                        StreamTypeFilter::SYNC, makeDrainPauseFlushOutCommands(true));
+static const NamedCommandSequence kDrainPauseFlushOutAsyncSeq =  // non-blocking streams; same name and delay
+        std::make_tuple(std::string("DrainPauseFlush"), kStreamTransientStateTransitionDelayMs,
+                        StreamTypeFilter::ASYNC, makeDrainPauseFlushOutCommands(false));
+
+// Note: this isn't the "official" enum printer; it only produces the suffix appended to test names.
+std::string PrintStreamFilterToString(StreamTypeFilter filter) {
+    switch (filter) {
+        case StreamTypeFilter::ANY:
+            return "";  // ANY contributes no suffix
+        case StreamTypeFilter::SYNC:
+            return "Sync";
+        case StreamTypeFilter::ASYNC:
+            return "Async";
+    }
+    return std::string("Unknown").append(std::to_string(static_cast<int32_t>(filter)));  // reached only when no case above matched
+}
 std::string GetStreamIoTestName(const testing::TestParamInfo<StreamIoTestParameters>& info) {
     return android::PrintInstanceNameToString(
                    testing::TestParamInfo<std::string>{std::get<PARAM_MODULE_NAME>(info.param),
                                                        info.index})
             .append("_")
-            .append(std::get<PARAM_CMD_SEQ>(info.param).first)
+            .append(std::get<NAMED_CMD_NAME>(std::get<PARAM_CMD_SEQ>(info.param)))
+            .append(PrintStreamFilterToString(
+                    std::get<NAMED_CMD_STREAM_TYPE>(std::get<PARAM_CMD_SEQ>(info.param))))
             .append("_SetupSeq")
             .append(std::get<PARAM_SETUP_SEQ>(info.param) ? "2" : "1");
 }
+
 INSTANTIATE_TEST_SUITE_P(
         AudioStreamIoInTest, AudioStreamIoIn,
         testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
-                         testing::Values(kReadOrWriteSeq, kDrainInSeq, kStandbySeq, kPauseInSeq,
+                         testing::Values(kReadSeq, kDrainInSeq, kStandbyInSeq, kPauseInSeq,
                                          kFlushInSeq),
                          testing::Values(false, true)),
         GetStreamIoTestName);
@@ -2079,8 +2539,13 @@
 INSTANTIATE_TEST_SUITE_P(
         AudioStreamIoOutTest, AudioStreamIoOut,
         testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
-                         testing::Values(kReadOrWriteSeq, kDrainOutSeq, kDrainPauseOutSeq,
-                                         kStandbySeq, kPauseOutSeq, kFlushOutSeq),
+                         testing::Values(kWriteSyncSeq, kWriteAsyncSeq, kWriteDrainAsyncSeq,
+                                         kDrainOutSyncSeq, kDrainPauseOutSyncSeq,
+                                         kDrainPauseOutAsyncSeq, kStandbyOutSyncSeq,
+                                         kStandbyOutAsyncSeq,
+                                         kPauseOutSyncSeq,  // kPauseOutAsyncSeq,
+                                         kFlushOutSyncSeq, kFlushOutAsyncSeq,
+                                         kDrainPauseFlushOutSyncSeq, kDrainPauseFlushOutAsyncSeq),
                          testing::Values(false, true)),
         GetStreamIoTestName);
 GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoOut);
@@ -2095,7 +2560,6 @@
     void OnTestStart(const ::testing::TestInfo& test_info) override {
         TraceTestState("Started", test_info);
     }
-
     void OnTestEnd(const ::testing::TestInfo& test_info) override {
         TraceTestState("Completed", test_info);
     }
@@ -2109,6 +2573,7 @@
 int main(int argc, char** argv) {
     ::testing::InitGoogleTest(&argc, argv);
     ::testing::UnitTest::GetInstance()->listeners().Append(new TestExecutionTracer());
+    android::base::SetMinimumLogSeverity(::android::base::DEBUG);
     ABinderProcess_setThreadPoolMaxThreadCount(1);
     ABinderProcess_startThreadPool();
     return RUN_ALL_TESTS();