audio: Implementation of audio I/O, part II

This patch implements audio I/O for the synchronous, non-MMAP
case.

Updated the StreamDescriptor structure to make it usable.
Clarified comments on the expectations for the client and
the HAL module.

Bug: 205884982
Test: atest VtsHalAudioCoreTargetTest
Merged-In: I09651c6e80a397c80870622ac19234b4d4a38cbb
Change-Id: I09651c6e80a397c80870622ac19234b4d4a38cbb
(cherry picked from commit 01803d454ac192f4b6b732944f0be324b1b03a7f)
diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
index 472a8a2..e7ed502 100644
--- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
@@ -38,11 +38,7 @@
   android.hardware.common.fmq.MQDescriptor<android.hardware.audio.core.StreamDescriptor.Reply,android.hardware.common.fmq.SynchronizedReadWrite> reply;
   long bufferSizeFrames;
   android.hardware.audio.core.StreamDescriptor.AudioBuffer audio;
-  const int COMMAND_EXIT = 0;
   const int COMMAND_BURST = 1;
-  const int STATUS_OK = 0;
-  const int STATUS_ILLEGAL_ARGUMENT = 1;
-  const int STATUS_ILLEGAL_STATE = 2;
   @FixedSize @VintfStability
   parcelable Position {
     long frames;
@@ -63,7 +59,7 @@
   }
   @VintfStability
   union AudioBuffer {
-    android.hardware.common.fmq.MQDescriptor<byte,android.hardware.common.fmq.UnsynchronizedWrite> fmq;
+    android.hardware.common.fmq.MQDescriptor<byte,android.hardware.common.fmq.SynchronizedReadWrite> fmq;
     android.hardware.audio.core.MmapBufferDescriptor mmap;
   }
 }
diff --git a/audio/aidl/android/hardware/audio/core/IModule.aidl b/audio/aidl/android/hardware/audio/core/IModule.aidl
index 363eb68..735f87f 100644
--- a/audio/aidl/android/hardware/audio/core/IModule.aidl
+++ b/audio/aidl/android/hardware/audio/core/IModule.aidl
@@ -255,12 +255,16 @@
      *
      * Note that although it's not prohibited to open a stream on a mix port
      * configuration which is not connected (using a patch) to any device port,
-     * and set up a patch afterwards, this is not the recommended sequence of
-     * calls, because setting up of a patch might fail due to an insufficient
-     * stream buffer size.
+     * and set up a patch afterwards, this sequence of calls is not recommended,
+     * because setting up of a patch might fail due to an insufficient stream
+     * buffer size. Another consequence of having a stream on an unconnected mix
+     * port is that capture positions can not be determined because there is no
+     * "external observer," thus read operations done via StreamDescriptor will
+     * complete with an error, although data (zero filled) will still be
+     * provided.
      *
      * @return An opened input stream and the associated descriptor.
-     * @param args Input arguments, see 'OpenInputStreamArguments' parcelable.
+     * @param args The pack of arguments, see 'OpenInputStreamArguments' parcelable.
      * @throws EX_ILLEGAL_ARGUMENT In the following cases:
      *                             - If the port config can not be found by the ID.
      *                             - If the port config is not of an input mix port.
@@ -269,6 +273,7 @@
      *                          - If the port config already has a stream opened on it.
      *                          - If the limit on the open stream count for the port has
      *                            been reached.
+     *                          - If the HAL module failed to initialize the stream.
      */
     @VintfStability
     parcelable OpenInputStreamArguments {
@@ -312,12 +317,16 @@
      *
      * Note that although it's not prohibited to open a stream on a mix port
      * configuration which is not connected (using a patch) to any device port,
-     * and set up a patch afterwards, this is not the recommended sequence of
-     * calls, because setting up of a patch might fail due to an insufficient
-     * stream buffer size.
+     * and set up a patch afterwards, this sequence of calls is not recommended,
+     * because setting up of a patch might fail due to an insufficient stream
+     * buffer size. Another consequence of having a stream on an unconnected mix
+     * port is that presentation positions can not be determined because there
+     * is no "external observer," thus write operations done via
+     * StreamDescriptor will complete with an error, although the data
+     * will still be accepted and immediately discarded.
      *
      * @return An opened output stream and the associated descriptor.
-     * @param args Input arguments, see 'OpenOutputStreamArguments' parcelable.
+     * @param args The pack of arguments, see 'OpenOutputStreamArguments' parcelable.
      * @throws EX_ILLEGAL_ARGUMENT In the following cases:
      *                             - If the port config can not be found by the ID.
      *                             - If the port config is not of an output mix port.
@@ -330,6 +339,7 @@
      *                            been reached.
      *                          - If another opened stream already exists for the 'PRIMARY'
      *                            output port.
+     *                          - If the HAL module failed to initialize the stream.
      */
     @VintfStability
     parcelable OpenOutputStreamArguments {
diff --git a/audio/aidl/android/hardware/audio/core/IStreamIn.aidl b/audio/aidl/android/hardware/audio/core/IStreamIn.aidl
index 7205bb8..0c3e3d1 100644
--- a/audio/aidl/android/hardware/audio/core/IStreamIn.aidl
+++ b/audio/aidl/android/hardware/audio/core/IStreamIn.aidl
@@ -27,10 +27,12 @@
      * Close the stream.
      *
      * Releases any resources allocated for this stream on the HAL module side.
-     * The stream can not be operated after it has been closed. Methods of this
-     * interface throw EX_ILLEGAL_STATE for a closed stream.
+     * This includes the fast message queues and shared memories returned via
+     * the StreamDescriptor. Thus, the stream can not be operated anymore after
+     * it has been closed. The client needs to release the audio data I/O
+     * objects after the call to this method returns.
      *
-     * The associated stream descriptor can be released once this method returns.
+     * Methods of this interface throw EX_ILLEGAL_STATE for a closed stream.
      *
      * @throws EX_ILLEGAL_STATE If the stream has already been closed.
      */
diff --git a/audio/aidl/android/hardware/audio/core/IStreamOut.aidl b/audio/aidl/android/hardware/audio/core/IStreamOut.aidl
index 0a5aacd..9fdb37d 100644
--- a/audio/aidl/android/hardware/audio/core/IStreamOut.aidl
+++ b/audio/aidl/android/hardware/audio/core/IStreamOut.aidl
@@ -27,10 +27,12 @@
      * Close the stream.
      *
      * Releases any resources allocated for this stream on the HAL module side.
-     * The stream can not be operated after it has been closed. Methods of this
-     * interface throw EX_ILLEGAL_STATE for a closed stream.
+     * This includes the fast message queues and shared memories returned via
+     * the StreamDescriptor. Thus, the stream can not be operated anymore after
+     * it has been closed. The client needs to release the audio data I/O
+     * objects after the call to this method returns.
      *
-     * The associated stream descriptor can be released once this method returns.
+     * Methods of this interface throw EX_ILLEGAL_STATE for a closed stream.
      *
      * @throws EX_ILLEGAL_STATE If the stream has already been closed.
      */
diff --git a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
index f2338e0..74090d7 100644
--- a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
@@ -19,7 +19,6 @@
 import android.hardware.audio.core.MmapBufferDescriptor;
 import android.hardware.common.fmq.MQDescriptor;
 import android.hardware.common.fmq.SynchronizedReadWrite;
-import android.hardware.common.fmq.UnsynchronizedWrite;
 
 /**
  * Stream descriptor contains fast message queues and buffers used for sending
@@ -57,13 +56,6 @@
     }
 
     /**
-     * The exit command is used to unblock the HAL thread and ask it to exit.
-     * This is the last command that the client sends via the StreamDescriptor.
-     * The HAL module must reply to this command in order to unblock the client,
-     * and cease waiting on the command queue.
-     */
-    const int COMMAND_EXIT = 0;
-    /**
      * The command used for audio I/O, see 'AudioBuffer'. For MMap No IRQ mode
      * this command only provides updated positions and latency because actual
      * audio I/O is done via the 'AudioBuffer.mmap' shared buffer.
@@ -83,30 +75,27 @@
          */
         int code;
         /**
-         * For output streams: the amount of bytes provided by the client in the
-         *   'audio.fmq' queue.
-         * For input streams: the amount of bytes requested by the client to read
-         *   from the hardware into the 'audio.fmq' queue.
+         * For output streams: the amount of bytes that the client requests the
+         *   HAL module to read from the 'audio.fmq' queue.
+         * For input streams: the amount of bytes requested by the client to
+         *   read from the hardware into the 'audio.fmq' queue.
+         *
+         * In both cases it is allowed for this field to contain any
+         * non-negative number. The value 0 can be used if the client only needs
+         * to retrieve current positions and latency. Any sufficiently big value
+         * which exceeds the size of the queue's area which is currently
+         * available for reading or writing by the HAL module must be trimmed by
+         * the HAL module to the available size. Note that the HAL module is
+         * allowed to consume or provide less data than requested, and it must
+         * return the amount of actually read or written data via the
+         * 'Reply.fmqByteCount' field. Thus, only attempts to pass a negative
+         * number must be considered a client's error.
          */
         int fmqByteCount;
     }
     MQDescriptor<Command, SynchronizedReadWrite> command;
 
     /**
-     * No error, the command completed successfully.
-     */
-    const int STATUS_OK = 0;
-    /**
-     * Invalid data provided in the command, e.g. unknown command code or
-     * negative 'fmqByteCount' value.
-     */
-    const int STATUS_ILLEGAL_ARGUMENT = 1;
-    /**
-     * The HAL module is not in the state when it can complete the command.
-     */
-    const int STATUS_ILLEGAL_STATE = 2;
-
-    /**
      * Used for providing replies to commands. The HAL module writes into
      * the queue, the client reads. The queue can only contain a single reply,
      * corresponding to the last command sent by the client.
@@ -115,7 +104,15 @@
     @FixedSize
     parcelable Reply {
         /**
-         * One of STATUS_* statuses.
+         * One of Binder STATUS_* statuses:
+         *  - STATUS_OK: the command has completed successfully;
+         *  - STATUS_BAD_VALUE: invalid value in the 'Command' structure;
+         *  - STATUS_INVALID_OPERATION: the mix port is not connected
+         *                              to any producer or consumer, thus
+         *                              positions can not be reported;
+         *  - STATUS_NOT_ENOUGH_DATA: a read or write error has
+         *                            occurred for the 'audio.fmq' queue.
+         *
          */
         int status;
         /**
@@ -123,6 +120,9 @@
          *   module from the 'audio.fmq' queue.
          * For input streams: the amount of bytes actually provided by the HAL
          *   in the 'audio.fmq' queue.
+         *
+         * The returned value must not exceed the value passed in the
+         * 'fmqByteCount' field of the corresponding command or be negative.
          */
         int fmqByteCount;
         /**
@@ -162,12 +162,15 @@
     @VintfStability
     union AudioBuffer {
         /**
-         * The fast message queue used for all modes except MMap No IRQ. Access
-         * to this queue is synchronized via the 'command' and 'reply' queues
-         * as described below.
+         * The fast message queue used for all modes except MMap No IRQ.  Both
+         * reads and writes into this queue are non-blocking because access to
+         * this queue is synchronized via the 'command' and 'reply' queues as
+         * described below. The queue nevertheless uses 'SynchronizedReadWrite'
+         * because there is only one reader, and the reading position must be
+         * shared.
          *
          * For output streams the following sequence of operations is used:
-         *  1. The client puts audio data into the 'audio.fmq' queue.
+         *  1. The client writes audio data into the 'audio.fmq' queue.
          *  2. The client writes the 'BURST' command into the 'command' queue,
          *     and hangs on waiting on a read from the 'reply' queue.
          *  3. The high priority thread in the HAL module wakes up due to 2.
@@ -175,19 +178,20 @@
          *  5. The HAL module writes the command status and current positions
          *     into 'reply' queue, and hangs on waiting on a read from
          *     the 'command' queue.
+         *  6. The client wakes up due to 5. and reads the reply.
          *
          * For input streams the following sequence of operations is used:
          *  1. The client writes the 'BURST' command into the 'command' queue,
          *     and hangs on waiting on a read from the 'reply' queue.
          *  2. The high priority thread in the HAL module wakes up due to 1.
-         *  3. The HAL module puts audio data into the 'audio.fmq' queue.
+         *  3. The HAL module writes audio data into the 'audio.fmq' queue.
          *  4. The HAL module writes the command status and current positions
          *     into 'reply' queue, and hangs on waiting on a read from
          *     the 'command' queue.
          *  5. The client wakes up due to 4.
          *  6. The client reads the reply and audio data.
          */
-        MQDescriptor<byte, UnsynchronizedWrite> fmq;
+        MQDescriptor<byte, SynchronizedReadWrite> fmq;
         /**
          * MMap buffers are shared directly with the DSP, which operates
          * independently from the CPU. Writes and reads into these buffers
diff --git a/audio/aidl/common/include/Utils.h b/audio/aidl/common/include/Utils.h
new file mode 100644
index 0000000..1a87882
--- /dev/null
+++ b/audio/aidl/common/include/Utils.h
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <aidl/android/media/audio/common/AudioChannelLayout.h>
+#include <aidl/android/media/audio/common/AudioFormatDescription.h>
+#include <aidl/android/media/audio/common/PcmType.h>
+
+namespace android::hardware::audio::common {
+
+constexpr size_t getPcmSampleSizeInBytes(::aidl::android::media::audio::common::PcmType pcm) {
+    using ::aidl::android::media::audio::common::PcmType;
+    switch (pcm) {
+        case PcmType::UINT_8_BIT:
+            return 1;
+        case PcmType::INT_16_BIT:
+            return 2;
+        case PcmType::INT_32_BIT:
+            return 4;
+        case PcmType::FIXED_Q_8_24:
+            return 4;
+        case PcmType::FLOAT_32_BIT:
+            return 4;
+        case PcmType::INT_24_BIT:
+            return 3;
+    }
+    return 0;
+}
+
+constexpr size_t getChannelCount(
+        const ::aidl::android::media::audio::common::AudioChannelLayout& layout) {
+    using Tag = ::aidl::android::media::audio::common::AudioChannelLayout::Tag;
+    switch (layout.getTag()) {
+        case Tag::none:
+            return 0;
+        case Tag::invalid:
+            return 0;
+        case Tag::indexMask:
+            return __builtin_popcount(layout.get<Tag::indexMask>());
+        case Tag::layoutMask:
+            return __builtin_popcount(layout.get<Tag::layoutMask>());
+        case Tag::voiceMask:
+            return __builtin_popcount(layout.get<Tag::voiceMask>());
+    }
+    return 0;
+}
+
+constexpr size_t getFrameSizeInBytes(
+        const ::aidl::android::media::audio::common::AudioFormatDescription& format,
+        const ::aidl::android::media::audio::common::AudioChannelLayout& layout) {
+    using ::aidl::android::media::audio::common::AudioFormatType;
+    if (format.type == AudioFormatType::PCM) {
+        return getPcmSampleSizeInBytes(format.pcm) * getChannelCount(layout);
+    }
+    // For non-PCM formats always use frame size of 1.
+    return 1;
+}
+
+}  // namespace android::hardware::audio::common
diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp
index 07b1097..03f8c64 100644
--- a/audio/aidl/default/Android.bp
+++ b/audio/aidl/default/Android.bp
@@ -7,19 +7,27 @@
     default_applicable_licenses: ["hardware_interfaces_license"],
 }
 
-cc_library_static {
-    name: "libaudioserviceexampleimpl",
+cc_defaults {
+    name: "aidlaudioservice_defaults",
     vendor: true,
     shared_libs: [
         "libaudioaidlcommon",
         "libbase",
         "libbinder_ndk",
+        "libcutils",
+        "libfmq",
         "libstagefright_foundation",
+        "libutils",
         "android.media.audio.common.types-V1-ndk",
         "android.hardware.audio.core-V1-ndk",
         "android.hardware.common-V2-ndk",
         "android.hardware.common.fmq-V1-ndk",
     ],
+}
+
+cc_library_static {
+    name: "libaudioserviceexampleimpl",
+    defaults: ["aidlaudioservice_defaults"],
     export_include_dirs: ["include"],
     srcs: [
         "Config.cpp",
@@ -37,16 +45,7 @@
     relative_install_path: "hw",
     init_rc: ["android.hardware.audio.service-aidl.example.rc"],
     vintf_fragments: ["android.hardware.audio.service-aidl.xml"],
-    vendor: true,
-    shared_libs: [
-        "libbase",
-        "libbinder_ndk",
-        "libstagefright_foundation",
-        "android.media.audio.common.types-V1-ndk",
-        "android.hardware.audio.core-V1-ndk",
-        "android.hardware.common-V2-ndk",
-        "android.hardware.common.fmq-V1-ndk",
-    ],
+    defaults: ["aidlaudioservice_defaults"],
     static_libs: [
         "libaudioserviceexampleimpl",
     ],
diff --git a/audio/aidl/default/Module.cpp b/audio/aidl/default/Module.cpp
index 1c6f90a..af033d0 100644
--- a/audio/aidl/default/Module.cpp
+++ b/audio/aidl/default/Module.cpp
@@ -20,6 +20,8 @@
 #define LOG_TAG "AHAL_Module"
 #include <android-base/logging.h>
 
+#include <Utils.h>
+#include <aidl/android/media/audio/common/AudioInputFlags.h>
 #include <aidl/android/media/audio/common/AudioOutputFlags.h>
 
 #include "core-impl/Module.h"
@@ -30,6 +32,7 @@
 using aidl::android::media::audio::common::AudioChannelLayout;
 using aidl::android::media::audio::common::AudioFormatDescription;
 using aidl::android::media::audio::common::AudioFormatType;
+using aidl::android::media::audio::common::AudioInputFlags;
 using aidl::android::media::audio::common::AudioIoFlags;
 using aidl::android::media::audio::common::AudioOffloadInfo;
 using aidl::android::media::audio::common::AudioOutputFlags;
@@ -39,6 +42,7 @@
 using aidl::android::media::audio::common::AudioProfile;
 using aidl::android::media::audio::common::Int;
 using aidl::android::media::audio::common::PcmType;
+using android::hardware::audio::common::getFrameSizeInBytes;
 
 namespace aidl::android::hardware::audio::core {
 
@@ -72,49 +76,6 @@
     return true;
 }
 
-constexpr size_t getPcmSampleSizeInBytes(PcmType pcm) {
-    switch (pcm) {
-        case PcmType::UINT_8_BIT:
-            return 1;
-        case PcmType::INT_16_BIT:
-            return 2;
-        case PcmType::INT_32_BIT:
-            return 4;
-        case PcmType::FIXED_Q_8_24:
-            return 4;
-        case PcmType::FLOAT_32_BIT:
-            return 4;
-        case PcmType::INT_24_BIT:
-            return 3;
-    }
-    return 0;
-}
-
-constexpr size_t getChannelCount(const AudioChannelLayout& layout) {
-    using Tag = AudioChannelLayout::Tag;
-    switch (layout.getTag()) {
-        case Tag::none:
-            return 0;
-        case Tag::invalid:
-            return 0;
-        case Tag::indexMask:
-            return __builtin_popcount(layout.get<Tag::indexMask>());
-        case Tag::layoutMask:
-            return __builtin_popcount(layout.get<Tag::layoutMask>());
-        case Tag::voiceMask:
-            return __builtin_popcount(layout.get<Tag::voiceMask>());
-    }
-    return 0;
-}
-
-size_t getFrameSizeInBytes(const AudioFormatDescription& format, const AudioChannelLayout& layout) {
-    if (format.type == AudioFormatType::PCM) {
-        return getPcmSampleSizeInBytes(format.pcm) * getChannelCount(layout);
-    }
-    // For non-PCM formats always use frame size of 1.
-    return 1;
-}
-
 bool findAudioProfile(const AudioPort& port, const AudioFormatDescription& format,
                       AudioProfile* profile) {
     if (auto profilesIt =
@@ -133,33 +94,8 @@
     erase_all_values(mPatches, std::set<int32_t>{patchId});
 }
 
-void Module::cleanUpPatches(int32_t portConfigId) {
-    auto& patches = getConfig().patches;
-    if (patches.size() == 0) return;
-    auto range = mPatches.equal_range(portConfigId);
-    for (auto it = range.first; it != range.second; ++it) {
-        auto patchIt = findById<AudioPatch>(patches, it->second);
-        if (patchIt != patches.end()) {
-            erase_if(patchIt->sourcePortConfigIds,
-                     [portConfigId](auto e) { return e == portConfigId; });
-            erase_if(patchIt->sinkPortConfigIds,
-                     [portConfigId](auto e) { return e == portConfigId; });
-        }
-    }
-    std::set<int32_t> erasedPatches;
-    for (size_t i = patches.size() - 1; i != 0; --i) {
-        const auto& patch = patches[i];
-        if (patch.sourcePortConfigIds.empty() || patch.sinkPortConfigIds.empty()) {
-            erasedPatches.insert(patch.id);
-            patches.erase(patches.begin() + i);
-        }
-    }
-    erase_all_values(mPatches, erasedPatches);
-}
-
-ndk::ScopedAStatus Module::createStreamDescriptor(int32_t in_portConfigId,
-                                                  int64_t in_bufferSizeFrames,
-                                                  StreamDescriptor* out_descriptor) {
+ndk::ScopedAStatus Module::createStreamContext(int32_t in_portConfigId, int64_t in_bufferSizeFrames,
+                                               StreamContext* out_context) {
     if (in_bufferSizeFrames <= 0) {
         LOG(ERROR) << __func__ << ": non-positive buffer size " << in_bufferSizeFrames;
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
@@ -171,7 +107,7 @@
     }
     auto& configs = getConfig().portConfigs;
     auto portConfigIt = findById<AudioPortConfig>(configs, in_portConfigId);
-    // Since 'createStreamDescriptor' is an internal method, it is assumed that
+    // Since this is a private method, it is assumed that
     // validity of the portConfigId has already been checked.
     const size_t frameSize =
             getFrameSizeInBytes(portConfigIt->format.value(), portConfigIt->channelMask.value());
@@ -187,7 +123,26 @@
                    << kMaximumStreamBufferSizeBytes / frameSize;
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
     }
-    (void)out_descriptor;
+    const auto& flags = portConfigIt->flags.value();
+    if ((flags.getTag() == AudioIoFlags::Tag::input &&
+         (flags.get<AudioIoFlags::Tag::input>() &
+          1 << static_cast<int32_t>(AudioInputFlags::MMAP_NOIRQ)) == 0) ||
+        (flags.getTag() == AudioIoFlags::Tag::output &&
+         (flags.get<AudioIoFlags::Tag::output>() &
+          1 << static_cast<int32_t>(AudioOutputFlags::MMAP_NOIRQ)) == 0)) {
+        StreamContext temp(
+                std::make_unique<StreamContext::CommandMQ>(1, true /*configureEventFlagWord*/),
+                std::make_unique<StreamContext::ReplyMQ>(1, true /*configureEventFlagWord*/),
+                frameSize,
+                std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames));
+        if (temp.isValid()) {
+            *out_context = std::move(temp);
+        } else {
+            return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
+        }
+    } else {
+        // TODO: Implement simulation of MMAP buffer allocation
+    }
     return ndk::ScopedAStatus::ok();
 }
 
@@ -253,6 +208,28 @@
     do_insert(patch.sinkPortConfigIds);
 }
 
+void Module::updateStreamsConnectedState(const AudioPatch& oldPatch, const AudioPatch& newPatch) {
+    // Streams from the old patch need to be disconnected, streams from the new
+    // patch need to be connected. If the stream belongs to both patches, no need
+    // to update it.
+    std::set<int32_t> idsToDisconnect, idsToConnect;
+    idsToDisconnect.insert(oldPatch.sourcePortConfigIds.begin(),
+                           oldPatch.sourcePortConfigIds.end());
+    idsToDisconnect.insert(oldPatch.sinkPortConfigIds.begin(), oldPatch.sinkPortConfigIds.end());
+    idsToConnect.insert(newPatch.sourcePortConfigIds.begin(), newPatch.sourcePortConfigIds.end());
+    idsToConnect.insert(newPatch.sinkPortConfigIds.begin(), newPatch.sinkPortConfigIds.end());
+    std::for_each(idsToDisconnect.begin(), idsToDisconnect.end(), [&](const auto& portConfigId) {
+        if (idsToConnect.count(portConfigId) == 0) {
+            mStreams.setStreamIsConnected(portConfigId, false);
+        }
+    });
+    std::for_each(idsToConnect.begin(), idsToConnect.end(), [&](const auto& portConfigId) {
+        if (idsToDisconnect.count(portConfigId) == 0) {
+            mStreams.setStreamIsConnected(portConfigId, true);
+        }
+    });
+}
+
 ndk::ScopedAStatus Module::setModuleDebug(
         const ::aidl::android::hardware::audio::core::ModuleDebug& in_debug) {
     LOG(DEBUG) << __func__ << ": old flags:" << mDebug.toString()
@@ -467,13 +444,22 @@
                    << " does not correspond to an input mix port";
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
     }
-    if (auto status = createStreamDescriptor(in_args.portConfigId, in_args.bufferSizeFrames,
-                                             &_aidl_return->desc);
+    StreamContext context;
+    if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
         !status.isOk()) {
         return status;
     }
-    auto stream = ndk::SharedRefBase::make<StreamIn>(in_args.sinkMetadata);
-    mStreams.insert(port->id, in_args.portConfigId, StreamWrapper(stream));
+    context.fillDescriptor(&_aidl_return->desc);
+    auto stream = ndk::SharedRefBase::make<StreamIn>(in_args.sinkMetadata, std::move(context));
+    if (auto status = stream->init(); !status.isOk()) {
+        return status;
+    }
+    StreamWrapper streamWrapper(stream);
+    auto patchIt = mPatches.find(in_args.portConfigId);
+    if (patchIt != mPatches.end()) {
+        streamWrapper.setStreamIsConnected(true);
+    }
+    mStreams.insert(port->id, in_args.portConfigId, std::move(streamWrapper));
     _aidl_return->stream = std::move(stream);
     return ndk::ScopedAStatus::ok();
 }
@@ -499,13 +485,23 @@
                    << " has COMPRESS_OFFLOAD flag set, requires offload info";
         return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
     }
-    if (auto status = createStreamDescriptor(in_args.portConfigId, in_args.bufferSizeFrames,
-                                             &_aidl_return->desc);
+    StreamContext context;
+    if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
         !status.isOk()) {
         return status;
     }
-    auto stream = ndk::SharedRefBase::make<StreamOut>(in_args.sourceMetadata, in_args.offloadInfo);
-    mStreams.insert(port->id, in_args.portConfigId, StreamWrapper(stream));
+    context.fillDescriptor(&_aidl_return->desc);
+    auto stream = ndk::SharedRefBase::make<StreamOut>(in_args.sourceMetadata, std::move(context),
+                                                      in_args.offloadInfo);
+    if (auto status = stream->init(); !status.isOk()) {
+        return status;
+    }
+    StreamWrapper streamWrapper(stream);
+    auto patchIt = mPatches.find(in_args.portConfigId);
+    if (patchIt != mPatches.end()) {
+        streamWrapper.setStreamIsConnected(true);
+    }
+    mStreams.insert(port->id, in_args.portConfigId, std::move(streamWrapper));
     _aidl_return->stream = std::move(stream);
     return ndk::ScopedAStatus::ok();
 }
@@ -595,15 +591,20 @@
     _aidl_return->latenciesMs.clear();
     _aidl_return->latenciesMs.insert(_aidl_return->latenciesMs.end(),
                                      _aidl_return->sinkPortConfigIds.size(), kLatencyMs);
+    AudioPatch oldPatch{};
     if (existing == patches.end()) {
         _aidl_return->id = getConfig().nextPatchId++;
         patches.push_back(*_aidl_return);
         existing = patches.begin() + (patches.size() - 1);
     } else {
+        oldPatch = *existing;
         *existing = *_aidl_return;
     }
     registerPatch(*existing);
-    LOG(DEBUG) << __func__ << ": created or updated patch id " << _aidl_return->id;
+    updateStreamsConnectedState(oldPatch, *_aidl_return);
+
+    LOG(DEBUG) << __func__ << ": " << (oldPatch.id == 0 ? "created" : "updated") << " patch "
+               << _aidl_return->toString();
     return ndk::ScopedAStatus::ok();
 }
 
@@ -738,6 +739,7 @@
     auto patchIt = findById<AudioPatch>(patches, in_patchId);
     if (patchIt != patches.end()) {
         cleanUpPatch(patchIt->id);
+        updateStreamsConnectedState(*patchIt, AudioPatch{});
         patches.erase(patchIt);
         LOG(DEBUG) << __func__ << ": erased patch " << in_patchId;
         return ndk::ScopedAStatus::ok();
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index ab3e451..24e46db 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -16,7 +16,9 @@
 
 #define LOG_TAG "AHAL_Stream"
 #include <android-base/logging.h>
+#include <utils/SystemClock.h>
 
+#include "core-impl/Module.h"
 #include "core-impl/Stream.h"
 
 using aidl::android::hardware::audio::common::SinkMetadata;
@@ -25,13 +27,198 @@
 
 namespace aidl::android::hardware::audio::core {
 
-StreamIn::StreamIn(const SinkMetadata& sinkMetadata) : mMetadata(sinkMetadata) {
-    LOG(DEBUG) << __func__;
+// Copies descriptors of the owned queues into 'desc'.
+// Members of 'desc' that correspond to absent queues are not modified.
+void StreamContext::fillDescriptor(StreamDescriptor* desc) {
+    if (mCommandMQ != nullptr) desc->command = mCommandMQ->dupeDesc();
+    if (mReplyMQ != nullptr) desc->reply = mReplyMQ->dupeDesc();
+    if (mDataMQ != nullptr) {
+        using Tag = StreamDescriptor::AudioBuffer::Tag;
+        // The queue capacity is computed from its quanta, the descriptor reports it in frames.
+        const auto capacity = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize();
+        desc->bufferSizeFrames = capacity / mFrameSize;
+        desc->audio.set<Tag::fmq>(mDataMQ->dupeDesc());
+    }
 }
 
-ndk::ScopedAStatus StreamIn::close() {
+// Verifies every queue which is present, and that the frame size has been set.
+// The first problem found is logged, and the context is reported as invalid.
+bool StreamContext::isValid() const {
+    const char* problem = nullptr;
+    if (mCommandMQ != nullptr && !mCommandMQ->isValid()) {
+        problem = "command FMQ is invalid";
+    } else if (mReplyMQ != nullptr && !mReplyMQ->isValid()) {
+        problem = "reply FMQ is invalid";
+    } else if (mFrameSize == 0) {
+        problem = "frame size is not set";
+    } else if (mDataMQ != nullptr && !mDataMQ->isValid()) {
+        problem = "data FMQ is invalid";
+    }
+    if (problem != nullptr) {
+        LOG(ERROR) << problem;
+        return false;
+    }
+    return true;
+}
+
+void StreamContext::reset() {  // Destroys all owned queues: command, reply, then data.
+    mCommandMQ = nullptr;
+    mReplyMQ = nullptr;
+    mDataMQ = nullptr;
+}
+
+// Checks the queues and allocates the data buffer. Returns an error message, empty on success.
+std::string StreamWorkerCommonLogic::init() {
+    if (!mCommandMQ) return "Command MQ is null";
+    if (!mReplyMQ) return "Reply MQ is null";
+    if (!mDataMQ) return "Data MQ is null";
+    const size_t quantumSize = mDataMQ->getQuantumSize();
+    const size_t quantumCount = mDataMQ->getQuantumCount();
+    if (quantumSize != sizeof(decltype(mDataBuffer)::element_type)) {
+        return "Unexpected Data MQ quantum size: " + std::to_string(quantumSize);
+    }
+    mDataBufferSize = quantumCount * quantumSize;
+    mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]);
+    if (mDataBuffer) return "";
+    return "Failed to allocate data buffer for element count " + std::to_string(quantumCount) +
+           ", size in bytes: " + std::to_string(mDataBufferSize);
+}
+
+const std::string StreamInWorkerLogic::kThreadName = "reader";
+
+StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {  // Serves one command from the MQ.
+    StreamDescriptor::Command command{};
+    if (!mCommandMQ->readBlocking(&command, 1)) {
+        LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+        return Status::ABORT;
+    }
+    StreamDescriptor::Reply reply{};
+    if (command.code == StreamContext::COMMAND_EXIT &&
+        command.fmqByteCount == mInternalCommandCookie) {
+        LOG(DEBUG) << __func__ << ": received EXIT command";
+        // The cookie matches, so the command was posted internally: no reply is sent.
+        return Status::EXIT;
+    } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+        LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount
+                   << " bytes";
+        usleep(3000);  // Simulate a blocking call into the driver.
+        const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
+                                           mDataMQ->availableToWrite(), mDataBufferSize});
+        const bool isConnected = mIsConnected;  // Read the atomic flag once per cycle.
+        // Simulate reading of data, or provide zeroes if the stream is not connected.
+        for (size_t i = 0; i < byteCount; ++i) {
+            using buffer_type = decltype(mDataBuffer)::element_type;
+            constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
+                                              std::numeric_limits<buffer_type>::min() + 1;
+            mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
+                                                   std::numeric_limits<buffer_type>::min()
+                                         : 0;
+        }
+        bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true;
+        if (success) {
+            LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
+                       << " succeeded; connected? " << isConnected;
+            // Frames are provided and counted regardless of connection status.
+            reply.fmqByteCount = byteCount;
+            mFrameCount += byteCount / mFrameSize;
+            if (isConnected) {
+                reply.status = STATUS_OK;
+                reply.observable.frames = mFrameCount;
+                reply.observable.timeNs = ::android::elapsedRealtimeNano();
+            } else {
+                reply.status = STATUS_INVALID_OPERATION;  // No position while disconnected.
+            }
+        } else {
+            LOG(WARNING) << __func__ << ": writing of " << byteCount
+                         << " bytes of data to MQ failed";
+            reply.status = STATUS_NOT_ENOUGH_DATA;
+        }
+        reply.latencyMs = Module::kLatencyMs;
+    } else {
+        LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
+                     << ") or count: " << command.fmqByteCount;
+        reply.status = STATUS_BAD_VALUE;  // Unknown code, or a negative byte count.
+    }
+    LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
+    if (!mReplyMQ->writeBlocking(&reply, 1)) {
+        LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+        return Status::ABORT;
+    }
+    return Status::CONTINUE;
+}
+
+const std::string StreamOutWorkerLogic::kThreadName = "writer";
+
+StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {  // Serves one command from the MQ.
+    StreamDescriptor::Command command{};
+    if (!mCommandMQ->readBlocking(&command, 1)) {
+        LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+        return Status::ABORT;
+    }
+    StreamDescriptor::Reply reply{};
+    if (command.code == StreamContext::COMMAND_EXIT &&
+        command.fmqByteCount == mInternalCommandCookie) {
+        LOG(DEBUG) << __func__ << ": received EXIT command";
+        // The cookie matches, so the command was posted internally: no reply is sent.
+        return Status::EXIT;
+    } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+        LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount
+                   << " bytes";
+        const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
+                                           mDataMQ->availableToRead(), mDataBufferSize});
+        bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true;
+        if (success) {
+            const bool isConnected = mIsConnected;  // Read the atomic flag once per cycle.
+            LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ"
+                       << " succeeded; connected? " << isConnected;
+            // Frames are consumed and counted regardless of connection status.
+            reply.fmqByteCount = byteCount;
+            mFrameCount += byteCount / mFrameSize;
+            if (isConnected) {
+                reply.status = STATUS_OK;
+                reply.observable.frames = mFrameCount;
+                reply.observable.timeNs = ::android::elapsedRealtimeNano();
+            } else {
+                reply.status = STATUS_INVALID_OPERATION;  // No position while disconnected.
+            }
+            usleep(3000);  // Simulate a blocking call into the driver.
+        } else {
+            LOG(WARNING) << __func__ << ": reading of " << byteCount
+                         << " bytes of data from MQ failed";
+            reply.status = STATUS_NOT_ENOUGH_DATA;
+        }
+        reply.latencyMs = Module::kLatencyMs;
+    } else {
+        LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
+                     << ") or count: " << command.fmqByteCount;
+        reply.status = STATUS_BAD_VALUE;  // Unknown code, or a negative byte count.
+    }
+    LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
+    if (!mReplyMQ->writeBlocking(&reply, 1)) {
+        LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+        return Status::ABORT;
+    }
+    return Status::CONTINUE;
+}
+
+template <class Metadata, class StreamWorker>
+StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
+    // Closing the stream already stops the worker and resets the context.
+    if (mIsClosed) return;
+    LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
+    // Only request the exit here; the worker and the context clean up via their destructors.
+    stopWorker();
+}
+
+template <class Metadata, class StreamWorker>
+ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
     LOG(DEBUG) << __func__;
     if (!mIsClosed) {
+        stopWorker();
+        LOG(DEBUG) << __func__ << ": joining the worker thread...";
+        mWorker.stop();
+        LOG(DEBUG) << __func__ << ": worker thread joined";
+        mContext.reset();
         mIsClosed = true;
         return ndk::ScopedAStatus::ok();
     } else {
@@ -40,40 +227,44 @@
     }
 }
 
-ndk::ScopedAStatus StreamIn::updateMetadata(const SinkMetadata& in_sinkMetadata) {
+template <class Metadata, class StreamWorker>
+void StreamCommon<Metadata, StreamWorker>::stopWorker() {
+    auto* const commandMQ = mContext.getCommandMQ();
+    if (commandMQ == nullptr) return;  // No command MQ (e.g. after reset), nothing to notify.
+    LOG(DEBUG) << __func__ << ": asking the worker to stop...";
+    StreamDescriptor::Command exitCommand;
+    exitCommand.code = StreamContext::COMMAND_EXIT;
+    exitCommand.fmqByteCount = mContext.getInternalCommandCookie();
+    // FIXME: This can block in the case when the client wrote a command
+    // while the stream worker's cycle is not running. Need to revisit
+    // when implementing standby and pause/resume.
+    if (!commandMQ->writeBlocking(&exitCommand, 1)) {
+        LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
+    }
+    LOG(DEBUG) << __func__ << ": done";
+}
+
+template <class Metadata, class StreamWorker>  // Fails with EX_ILLEGAL_STATE once closed.
+ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
     LOG(DEBUG) << __func__;
     if (!mIsClosed) {
-        mMetadata = in_sinkMetadata;
+        mMetadata = metadata;  // The metadata is only stored by this method.
         return ndk::ScopedAStatus::ok();
     }
     LOG(ERROR) << __func__ << ": stream was closed";
     return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
 }
 
-StreamOut::StreamOut(const SourceMetadata& sourceMetadata,
+StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext context)
+    : StreamCommon<SinkMetadata, StreamInWorker>(sinkMetadata, std::move(context)) {
+    LOG(DEBUG) << __func__;  // All of the state is set up by the StreamCommon constructor.
+}
+
+StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext context,
                      const std::optional<AudioOffloadInfo>& offloadInfo)
-    : mMetadata(sourceMetadata), mOffloadInfo(offloadInfo) {
+    : StreamCommon<SourceMetadata, StreamOutWorker>(sourceMetadata, std::move(context)),
+      mOffloadInfo(offloadInfo) {  // The offload info is kept as a copy.
     LOG(DEBUG) << __func__;
 }
 
-ndk::ScopedAStatus StreamOut::close() {
-    LOG(DEBUG) << __func__;
-    if (!mIsClosed) {
-        mIsClosed = true;
-        return ndk::ScopedAStatus::ok();
-    }
-    LOG(ERROR) << __func__ << ": stream was already closed";
-    return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
-}
-
-ndk::ScopedAStatus StreamOut::updateMetadata(const SourceMetadata& in_sourceMetadata) {
-    LOG(DEBUG) << __func__;
-    if (!mIsClosed) {
-        mMetadata = in_sourceMetadata;
-        return ndk::ScopedAStatus::ok();
-    }
-    LOG(ERROR) << __func__ << ": stream was closed";
-    return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
-}
-
 }  // namespace aidl::android::hardware::audio::core
diff --git a/audio/aidl/default/include/core-impl/Module.h b/audio/aidl/default/include/core-impl/Module.h
index f7e14af..61516b2 100644
--- a/audio/aidl/default/include/core-impl/Module.h
+++ b/audio/aidl/default/include/core-impl/Module.h
@@ -28,6 +28,11 @@
 namespace aidl::android::hardware::audio::core {
 
 class Module : public BnModule {
+  public:
+    // This value is used for all AudioPatches and reported by all streams.
+    static constexpr int32_t kLatencyMs = 10;
+
+  private:
     ndk::ScopedAStatus setModuleDebug(
             const ::aidl::android::hardware::audio::core::ModuleDebug& in_debug) override;
     ndk::ScopedAStatus connectExternalDevice(
@@ -66,21 +71,18 @@
     ndk::ScopedAStatus resetAudioPatch(int32_t in_patchId) override;
     ndk::ScopedAStatus resetAudioPortConfig(int32_t in_portConfigId) override;
 
-  private:
     void cleanUpPatch(int32_t patchId);
-    void cleanUpPatches(int32_t portConfigId);
-    ndk::ScopedAStatus createStreamDescriptor(
+    ndk::ScopedAStatus createStreamContext(
             int32_t in_portConfigId, int64_t in_bufferSizeFrames,
-            ::aidl::android::hardware::audio::core::StreamDescriptor* out_descriptor);
+            ::aidl::android::hardware::audio::core::StreamContext* out_context);
     ndk::ScopedAStatus findPortIdForNewStream(
             int32_t in_portConfigId, ::aidl::android::media::audio::common::AudioPort** port);
     internal::Configuration& getConfig();
     void registerPatch(const AudioPatch& patch);
+    void updateStreamsConnectedState(const AudioPatch& oldPatch, const AudioPatch& newPatch);
 
     // This value is used for all AudioPatches.
     static constexpr int32_t kMinimumStreamBufferSizeFrames = 16;
-    // This value is used for all AudioPatches.
-    static constexpr int32_t kLatencyMs = 10;
     // The maximum stream buffer size is 1 GiB = 2 ** 30 bytes;
     static constexpr int32_t kMaximumStreamBufferSizeBytes = 1 << 30;
 
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 87104dd..816cdb1 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -16,50 +16,203 @@
 
 #pragma once
 
+#include <atomic>
+#include <cstdlib>
 #include <map>
+#include <memory>
 #include <optional>
 #include <variant>
 
+#include <StreamWorker.h>
 #include <aidl/android/hardware/audio/common/SinkMetadata.h>
 #include <aidl/android/hardware/audio/common/SourceMetadata.h>
 #include <aidl/android/hardware/audio/core/BnStreamIn.h>
 #include <aidl/android/hardware/audio/core/BnStreamOut.h>
+#include <aidl/android/hardware/audio/core/StreamDescriptor.h>
 #include <aidl/android/media/audio/common/AudioOffloadInfo.h>
+#include <fmq/AidlMessageQueue.h>
+#include <system/thread_defs.h>
 
 #include "core-impl/utils.h"
 
 namespace aidl::android::hardware::audio::core {
 
-class StreamIn : public BnStreamIn {
-    ndk::ScopedAStatus close() override;
-    ndk::ScopedAStatus updateMetadata(
-            const ::aidl::android::hardware::audio::common::SinkMetadata& in_sinkMetadata) override;
-
+// Unlike StreamDescriptor, which only carries descriptors, this class owns
+// the objects implementing data exchange (FMQs etc). A default-constructed
+// context owns no queues and has a zero frame size, so 'isValid' reports it
+// as invalid. Move-only, because the queues are held via unique_ptr.
+class StreamContext {
   public:
-    explicit StreamIn(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata);
-    bool isClosed() const { return mIsClosed; }
+    typedef ::android::AidlMessageQueue<
+            StreamDescriptor::Command,
+            ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            CommandMQ;
+    typedef ::android::AidlMessageQueue<
+            StreamDescriptor::Reply, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            ReplyMQ;
+    typedef ::android::AidlMessageQueue<
+            int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            DataMQ;
+
+    // Ensure that this value is not used by any of StreamDescriptor.COMMAND_*
+    static constexpr int COMMAND_EXIT = -1;
+
+    StreamContext() = default;
+    StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
+                  size_t frameSize, std::unique_ptr<DataMQ> dataMQ)
+        : mCommandMQ(std::move(commandMQ)),
+          mInternalCommandCookie(std::rand()),
+          mReplyMQ(std::move(replyMQ)),
+          mFrameSize(frameSize),
+          mDataMQ(std::move(dataMQ)) {}
+    StreamContext(StreamContext&& other) noexcept
+        : mCommandMQ(std::move(other.mCommandMQ)),
+          mInternalCommandCookie(other.mInternalCommandCookie),
+          mReplyMQ(std::move(other.mReplyMQ)),
+          mFrameSize(other.mFrameSize),
+          mDataMQ(std::move(other.mDataMQ)) {}
+    StreamContext& operator=(StreamContext&& other) noexcept {
+        mCommandMQ = std::move(other.mCommandMQ);
+        mInternalCommandCookie = other.mInternalCommandCookie;
+        mReplyMQ = std::move(other.mReplyMQ);
+        mFrameSize = other.mFrameSize;
+        mDataMQ = std::move(other.mDataMQ);
+        return *this;
+    }
+
+    void fillDescriptor(StreamDescriptor* desc);
+    CommandMQ* getCommandMQ() const { return mCommandMQ.get(); }
+    DataMQ* getDataMQ() const { return mDataMQ.get(); }
+    size_t getFrameSize() const { return mFrameSize; }
+    int getInternalCommandCookie() const { return mInternalCommandCookie; }
+    ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); }
+    bool isValid() const;
+    void reset();
 
   private:
-    ::aidl::android::hardware::audio::common::SinkMetadata mMetadata;
-    bool mIsClosed = false;
+    std::unique_ptr<CommandMQ> mCommandMQ;
+    int mInternalCommandCookie = -1;  // Confirms internally posted commands; rand() is never < 0
+    std::unique_ptr<ReplyMQ> mReplyMQ;
+    size_t mFrameSize = 0;  // In-class default keeps a default-constructed context well defined.
+    std::unique_ptr<DataMQ> mDataMQ;
 };
 
-class StreamOut : public BnStreamOut {
-    ndk::ScopedAStatus close() override;
+class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
+  public:
+    void setIsConnected(bool connected) { mIsConnected = connected; }
+
+  protected:
+    explicit StreamWorkerCommonLogic(const StreamContext& context)
+        : mInternalCommandCookie(context.getInternalCommandCookie()),
+          mFrameSize(context.getFrameSize()),
+          mCommandMQ(context.getCommandMQ()),
+          mReplyMQ(context.getReplyMQ()),
+          mDataMQ(context.getDataMQ()) {}
+    std::string init() override;  // Returns an empty string on success, an error text otherwise.
+
+    // Used both by the main and worker threads.
+    std::atomic<bool> mIsConnected = false;
+    // The fields below are used on the worker thread only; the MQs are owned by StreamContext.
+    const int mInternalCommandCookie;
+    const size_t mFrameSize;
+    StreamContext::CommandMQ* mCommandMQ;
+    StreamContext::ReplyMQ* mReplyMQ;
+    StreamContext::DataMQ* mDataMQ;
+    // We use an array and the "size" field instead of a vector to be able to detect
+    // memory allocation issues.
+    std::unique_ptr<int8_t[]> mDataBuffer;
+    size_t mDataBufferSize = 0;  // Set by 'init' together with mDataBuffer.
+    long mFrameCount = 0;
+};
+
+class StreamInWorkerLogic : public StreamWorkerCommonLogic {  // Worker logic used by StreamIn.
+  public:
+    static const std::string kThreadName;  // Passed to the worker's 'start' by StreamCommon.
+    explicit StreamInWorkerLogic(const StreamContext& context) : StreamWorkerCommonLogic(context) {}
+
+  protected:
+    Status cycle() override;  // Serves one command received via the command MQ.
+};
+using StreamInWorker = ::android::hardware::audio::common::StreamWorker<StreamInWorkerLogic>;
+
+class StreamOutWorkerLogic : public StreamWorkerCommonLogic {  // Worker logic used by StreamOut.
+  public:
+    static const std::string kThreadName;  // Passed to the worker's 'start' by StreamCommon.
+    explicit StreamOutWorkerLogic(const StreamContext& context)
+        : StreamWorkerCommonLogic(context) {}
+
+  protected:
+    Status cycle() override;  // Serves one command received via the command MQ.
+};
+using StreamOutWorker = ::android::hardware::audio::common::StreamWorker<StreamOutWorkerLogic>;
+
+template <class Metadata, class StreamWorker>
+class StreamCommon {  // Implementation shared by StreamIn and StreamOut.
+  public:
+    ndk::ScopedAStatus close();
+    ndk::ScopedAStatus init() {  // Starts the worker; EX_ILLEGAL_STATE if that fails.
+        return mWorker.start(StreamWorker::kThreadName, ANDROID_PRIORITY_AUDIO)
+                       ? ndk::ScopedAStatus::ok()
+                       : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
+    }
+    bool isClosed() const { return mIsClosed; }
+    void setIsConnected(bool connected) { mWorker.setIsConnected(connected); }
+    ndk::ScopedAStatus updateMetadata(const Metadata& metadata);
+
+  protected:
+    StreamCommon(const Metadata& metadata, StreamContext context)
+        : mMetadata(metadata), mContext(std::move(context)), mWorker(mContext) {}
+    ~StreamCommon();
+    void stopWorker();  // Posts the internal EXIT command to the command MQ, if there is one.
+
+    Metadata mMetadata;
+    StreamContext mContext;  // Must be declared before mWorker, which is constructed from it.
+    StreamWorker mWorker;
+    // This variable is checked in the destructor which can be called on an arbitrary Binder thread,
+    // thus we need to ensure that any changes made by other threads are sequentially consistent.
+    std::atomic<bool> mIsClosed = false;
+};
+
+class StreamIn  // Implements BnStreamIn by forwarding to the StreamCommon base.
+    : public StreamCommon<::aidl::android::hardware::audio::common::SinkMetadata, StreamInWorker>,
+      public BnStreamIn {
+    ndk::ScopedAStatus close() override {  // Forwards to StreamCommon::close.
+        return StreamCommon<::aidl::android::hardware::audio::common::SinkMetadata,
+                            StreamInWorker>::close();
+    }
+    ndk::ScopedAStatus updateMetadata(const ::aidl::android::hardware::audio::common::SinkMetadata&
+                                              in_sinkMetadata) override {
+        return StreamCommon<::aidl::android::hardware::audio::common::SinkMetadata,
+                            StreamInWorker>::updateMetadata(in_sinkMetadata);
+    }
+
+  public:
+    StreamIn(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata,
+             StreamContext context);  // The context is taken by value and moved into the base.
+};
+
+class StreamOut : public StreamCommon<::aidl::android::hardware::audio::common::SourceMetadata,
+                                      StreamOutWorker>,
+                  public BnStreamOut {  // Implements BnStreamOut by forwarding to StreamCommon.
+    ndk::ScopedAStatus close() override {  // Forwards to StreamCommon::close.
+        return StreamCommon<::aidl::android::hardware::audio::common::SourceMetadata,
+                            StreamOutWorker>::close();
+    }
     ndk::ScopedAStatus updateMetadata(
             const ::aidl::android::hardware::audio::common::SourceMetadata& in_sourceMetadata)
-            override;
+            override {
+        return StreamCommon<::aidl::android::hardware::audio::common::SourceMetadata,
+                            StreamOutWorker>::updateMetadata(in_sourceMetadata);
+    }
 
   public:
     StreamOut(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata,
+              StreamContext context,  // Taken by value and moved into the base.
               const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>&
                       offloadInfo);
-    bool isClosed() const { return mIsClosed; }
 
   private:
-    ::aidl::android::hardware::audio::common::SourceMetadata mMetadata;
     std::optional<::aidl::android::media::audio::common::AudioOffloadInfo> mOffloadInfo;
-    bool mIsClosed = false;
 };
 
 class StreamWrapper {
@@ -74,6 +227,15 @@
                 },
                 mStream);
     }
+    void setStreamIsConnected(bool connected) {
+        // Only weak references are held: a stream which no longer exists is skipped.
+        const auto notify = [connected](const auto& weakStream) {
+            if (const auto stream = weakStream.lock(); stream != nullptr) {
+                stream->setIsConnected(connected);
+            }
+        };
+        std::visit(notify, mStream);
+    }
 
   private:
     std::variant<std::weak_ptr<StreamIn>, std::weak_ptr<StreamOut>> mStream;
@@ -93,6 +255,11 @@
         mStreams.insert(std::pair{portConfigId, sw});
         mStreams.insert(std::pair{portId, sw});
     }
+    void setStreamIsConnected(int32_t portConfigId, bool connected) {
+        const auto found = mStreams.find(portConfigId);
+        if (found == mStreams.end()) return;  // Nothing is registered under this id.
+        found->second.setStreamIsConnected(connected);
+    }
 
   private:
     // Maps port ids and port config ids to streams. Multimap because a port
diff --git a/audio/aidl/default/main.cpp b/audio/aidl/default/main.cpp
index aeb9983..15874a0 100644
--- a/audio/aidl/default/main.cpp
+++ b/audio/aidl/default/main.cpp
@@ -14,6 +14,9 @@
  * limitations under the License.
  */
 
+#include <cstdlib>
+#include <ctime>
+
 #include "core-impl/Config.h"
 #include "core-impl/Module.h"
 
@@ -25,6 +28,9 @@
 using aidl::android::hardware::audio::core::Module;
 
 int main() {
+    // Seed std::rand(), which the implementation uses for command cookies and simulated input.
+    std::srand(std::time(nullptr));
+
     // This is a debug implementation, always enable debug logging.
     android::base::SetMinimumLogSeverity(::android::base::DEBUG);
     ABinderProcess_setThreadPoolMaxThreadCount(16);
diff --git a/audio/aidl/vts/Android.bp b/audio/aidl/vts/Android.bp
index 1d0ec7c..7b35133 100644
--- a/audio/aidl/vts/Android.bp
+++ b/audio/aidl/vts/Android.bp
@@ -13,12 +13,10 @@
         "VtsHalTargetTestDefaults",
         "use_libaidlvintf_gtest_helper_static",
     ],
-    srcs: [
-        "ModuleConfig.cpp",
-        "VtsHalAudioCoreTargetTest.cpp",
-    ],
     shared_libs: [
         "libbinder_ndk",
+        "libcutils",
+        "libfmq",
     ],
     static_libs: [
         "android.hardware.audio.common-V1-ndk",
@@ -28,6 +26,16 @@
         "android.media.audio.common.types-V1-ndk",
         "libaudioaidlcommon",
     ],
+    cflags: [
+        "-Wall",
+        "-Wextra",
+        "-Werror",
+        "-Wthread-safety",
+    ],
+    srcs: [
+        "ModuleConfig.cpp",
+        "VtsHalAudioCoreTargetTest.cpp",
+    ],
     test_suites: [
         "general-tests",
         "vts",
diff --git a/audio/aidl/vts/ModuleConfig.cpp b/audio/aidl/vts/ModuleConfig.cpp
index 969b0e9..e36ab4a 100644
--- a/audio/aidl/vts/ModuleConfig.cpp
+++ b/audio/aidl/vts/ModuleConfig.cpp
@@ -123,6 +123,15 @@
     return result;
 }
 
+std::vector<AudioPort> ModuleConfig::getAttachedDevicesPortsForMixPort(
+        bool isInput, const AudioPortConfig& mixPortConfig) const {
+    // Resolve the port config to its mix port and delegate to the AudioPort overload.
+    // A config which refers to an unknown port yields an empty list.
+    const auto portIt = findById<AudioPort>(mPorts, mixPortConfig.portId);
+    if (portIt == mPorts.end()) return {};
+    return getAttachedDevicesPortsForMixPort(isInput, *portIt);
+}
+
 std::vector<AudioPort> ModuleConfig::getAttachedSinkDevicesPortsForMixPort(
         const AudioPort& mixPort) const {
     std::vector<AudioPort> result;
diff --git a/audio/aidl/vts/ModuleConfig.h b/audio/aidl/vts/ModuleConfig.h
index df13430..552f971 100644
--- a/audio/aidl/vts/ModuleConfig.h
+++ b/audio/aidl/vts/ModuleConfig.h
@@ -54,6 +54,9 @@
         return isInput ? getAttachedSourceDevicesPortsForMixPort(mixPort)
                        : getAttachedSinkDevicesPortsForMixPort(mixPort);
     }
+    std::vector<aidl::android::media::audio::common::AudioPort> getAttachedDevicesPortsForMixPort(
+            bool isInput,
+            const aidl::android::media::audio::common::AudioPortConfig& mixPortConfig) const;
     std::vector<aidl::android::media::audio::common::AudioPort>
     getAttachedSinkDevicesPortsForMixPort(
             const aidl::android::media::audio::common::AudioPort& mixPort) const;
diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
index 0ecc057..ac41ac3 100644
--- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
@@ -22,19 +22,24 @@
 #include <optional>
 #include <set>
 #include <string>
+#include <vector>
 
 #define LOG_TAG "VtsHalAudioCore"
 #include <android-base/logging.h>
 
+#include <StreamWorker.h>
+#include <Utils.h>
 #include <aidl/Gtest.h>
 #include <aidl/Vintf.h>
 #include <aidl/android/hardware/audio/core/IConfig.h>
 #include <aidl/android/hardware/audio/core/IModule.h>
 #include <aidl/android/media/audio/common/AudioIoFlags.h>
 #include <aidl/android/media/audio/common/AudioOutputFlags.h>
+#include <android-base/chrono_utils.h>
 #include <android-base/properties.h>
 #include <android/binder_manager.h>
 #include <android/binder_process.h>
+#include <fmq/AidlMessageQueue.h>
 
 #include "ModuleConfig.h"
 
@@ -63,6 +68,9 @@
 using aidl::android::media::audio::common::AudioPortExt;
 using aidl::android::media::audio::common::AudioSource;
 using aidl::android::media::audio::common::AudioUsage;
+using android::hardware::audio::common::getFrameSizeInBytes;
+using android::hardware::audio::common::StreamLogic;
+using android::hardware::audio::common::StreamWorker;
 using ndk::ScopedAStatus;
 
 namespace ndk {
@@ -126,20 +134,6 @@
     }
 };
 
-template <typename T>
-struct IsInput {
-    constexpr operator bool() const;
-};
-
-template <>
-constexpr IsInput<IStreamIn>::operator bool() const {
-    return true;
-}
-template <>
-constexpr IsInput<IStreamOut>::operator bool() const {
-    return false;
-}
-
 // All 'With*' classes are move-only because they are associated with some
 // resource or state of a HAL module.
 class WithDebugFlags {
@@ -228,7 +222,7 @@
 class AudioCoreModule : public testing::TestWithParam<std::string> {
   public:
     // The default buffer size is used mostly for negative tests.
-    static constexpr int kDefaultBufferSize = 256;
+    static constexpr int kDefaultBufferSizeFrames = 256;
 
     void SetUp() override {
         ASSERT_NO_FATAL_FAILURE(ConnectToService());
@@ -372,6 +366,216 @@
     AudioPort mConnectedPort;
 };
 
+// Holds the message queues and buffer sizes of an opened stream, built from its descriptor.
+class StreamContext {
+  public:
+    using CommandMQ =
+            AidlMessageQueue<StreamDescriptor::Command,
+                             ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>;
+    using ReplyMQ =
+            AidlMessageQueue<StreamDescriptor::Reply,
+                             ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>;
+    using DataMQ = AidlMessageQueue<
+            int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>;
+
+    StreamContext(const AudioPortConfig& portConfig, const StreamDescriptor& descriptor)
+        : mFrameSizeBytes(
+                  getFrameSizeInBytes(portConfig.format.value(), portConfig.channelMask.value())),
+          mCommandMQ(std::make_unique<CommandMQ>(descriptor.command)),
+          mReplyMQ(std::make_unique<ReplyMQ>(descriptor.reply)),
+          // The descriptor field is signed: a negative value must not wrap to a huge size_t.
+          mBufferSizeFrames(descriptor.bufferSizeFrames > 0 ? descriptor.bufferSizeFrames : 0),
+          mDataMQ(maybeCreateDataMQ(descriptor)) {}
+    void checkIsValid() const {
+        EXPECT_NE(0UL, mFrameSizeBytes);
+        EXPECT_NE(0UL, mBufferSizeFrames) << "non-positive buffer size in the stream descriptor";
+        ASSERT_NE(nullptr, mCommandMQ);
+        EXPECT_TRUE(mCommandMQ->isValid());
+        ASSERT_NE(nullptr, mReplyMQ);
+        EXPECT_TRUE(mReplyMQ->isValid());
+        // The data MQ is optional: it only exists when the descriptor carries an 'fmq' buffer.
+        EXPECT_TRUE(mDataMQ == nullptr || mDataMQ->isValid());
+    }
+    size_t getBufferSizeBytes() const { return mFrameSizeBytes * mBufferSizeFrames; }
+    size_t getBufferSizeFrames() const { return mBufferSizeFrames; }
+    CommandMQ* getCommandMQ() const { return mCommandMQ.get(); }
+    DataMQ* getDataMQ() const { return mDataMQ.get(); }
+    ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); }
+
+  private:
+    static std::unique_ptr<DataMQ> maybeCreateDataMQ(const StreamDescriptor& descriptor) {
+        using Tag = StreamDescriptor::AudioBuffer::Tag;
+        if (descriptor.audio.getTag() != Tag::fmq) return nullptr;
+        return std::make_unique<DataMQ>(descriptor.audio.get<Tag::fmq>());
+    }
+
+    const size_t mFrameSizeBytes;
+    std::unique_ptr<CommandMQ> mCommandMQ;
+    std::unique_ptr<ReplyMQ> mReplyMQ;
+    const size_t mBufferSizeFrames;
+    std::unique_ptr<DataMQ> mDataMQ;
+};
+
+// Common base of the worker logics: MQ pointers, a data buffer, and the mutex-guarded last reply.
+class StreamCommonLogic : public StreamLogic {
+  public:
+    StreamDescriptor::Position getLastObservablePosition() {
+        std::lock_guard<std::mutex> lock(mLock);
+        return mLastReply.observable;
+    }
+
+  protected:
+    explicit StreamCommonLogic(const StreamContext& context)
+        : mCommandMQ(context.getCommandMQ()),
+          mReplyMQ(context.getReplyMQ()),
+          mDataMQ(context.getDataMQ()),
+          mData(context.getBufferSizeBytes()) {}
+    StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; }
+    StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; }
+    std::string init() override { return ""; }
+
+    StreamContext::CommandMQ* mCommandMQ;
+    StreamContext::ReplyMQ* mReplyMQ;
+    StreamContext::DataMQ* mDataMQ;
+    std::vector<int8_t> mData;
+    std::mutex mLock;
+    StreamDescriptor::Reply mLastReply GUARDED_BY(mLock);
+};
+
+class StreamReaderLogic : public StreamCommonLogic {  // The worker logic for input streams.
+  public:
+    explicit StreamReaderLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+
+  protected:
+    // Without a data MQ cycle() would crash; NOTE(review): assumes non-empty init() = failure.
+    std::string init() override { return mDataMQ != nullptr ? "" : "data MQ is not available"; }
+    Status cycle() override {
+        StreamDescriptor::Command command{};
+        command.code = StreamDescriptor::COMMAND_BURST;
+        command.fmqByteCount = mData.size();
+        if (!mCommandMQ->writeBlocking(&command, 1)) {
+            LOG(ERROR) << __func__ << ": writing of command into MQ failed";
+            return Status::ABORT;
+        }
+        StreamDescriptor::Reply reply{};
+        if (!mReplyMQ->readBlocking(&reply, 1)) {
+            LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
+            return Status::ABORT;
+        }
+        if (reply.status != STATUS_OK) {
+            LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
+            return Status::ABORT;
+        }
+        if (reply.fmqByteCount < 0 || reply.fmqByteCount > command.fmqByteCount) {
+            LOG(ERROR) << __func__
+                       << ": received invalid byte count in the reply: " << reply.fmqByteCount;
+            return Status::ABORT;
+        }
+        {
+            std::lock_guard<std::mutex> lock(mLock);
+            mLastReply = reply;
+        }
+        const size_t readCount = std::min({mDataMQ->availableToRead(),
+                                           static_cast<size_t>(reply.fmqByteCount), mData.size()});
+        if (readCount == 0 || mDataMQ->read(mData.data(), readCount)) return Status::CONTINUE;
+        LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
+        return Status::ABORT;
+    }
+};
+using StreamReader = StreamWorker<StreamReaderLogic>;
+
+class StreamWriterLogic : public StreamCommonLogic {  // The worker logic for output streams.
+  public:
+    explicit StreamWriterLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+
+  protected:
+    // Without a data MQ cycle() would crash; NOTE(review): assumes non-empty init() = failure.
+    std::string init() override { return mDataMQ != nullptr ? "" : "data MQ is not available"; }
+    Status cycle() override {
+        if (!mDataMQ->write(mData.data(), mData.size())) {
+            LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed";
+            return Status::ABORT;
+        }
+        StreamDescriptor::Command command{};
+        command.code = StreamDescriptor::COMMAND_BURST;
+        command.fmqByteCount = mData.size();
+        if (!mCommandMQ->writeBlocking(&command, 1)) {
+            LOG(ERROR) << __func__ << ": writing of command into MQ failed";
+            return Status::ABORT;
+        }
+        StreamDescriptor::Reply reply{};
+        if (!mReplyMQ->readBlocking(&reply, 1)) {
+            LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
+            return Status::ABORT;
+        }
+        if (reply.status != STATUS_OK) {
+            LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
+            return Status::ABORT;
+        }
+        if (reply.fmqByteCount < 0 || reply.fmqByteCount > command.fmqByteCount) {
+            LOG(ERROR) << __func__
+                       << ": received invalid byte count in the reply: " << reply.fmqByteCount;
+            return Status::ABORT;
+        }
+        std::lock_guard<std::mutex> lock(mLock);
+        mLastReply = reply;
+        return Status::CONTINUE;
+    }
+};
+using StreamWriter = StreamWorker<StreamWriterLogic>;
+
+template <typename T>
+struct IOTraits {  // Direction flag and worker type for a stream interface type T.
+    static constexpr bool is_input = std::is_same_v<T, IStreamIn>;  // False for any other T.
+    using Worker = std::conditional_t<is_input, StreamReader, StreamWriter>;
+};
+
+// Sends invalid commands; records each MQ failure and each reply other than STATUS_BAD_VALUE.
+class StreamInvalidCommandLogic : public StreamCommonLogic {
+  public:
+    StreamInvalidCommandLogic(const StreamContext& context,
+                              const std::vector<StreamDescriptor::Command>& commands)
+        : StreamCommonLogic(context), mCommands(commands) {}
+    std::vector<std::string> getUnexpectedStatuses() {
+        std::lock_guard<std::mutex> lock(mLock);
+        return mUnexpectedStatuses;
+    }
+
+  protected:
+    Status cycle() override {
+        // All commands are sent in one cycle for simplicity; extra logging helps HAL debugging.
+        for (const auto& command : mCommands) {
+            LOG(INFO) << __func__ << ": writing command " << command.toString() << " into MQ...";
+            if (!getCommandMQ()->writeBlocking(&command, 1)) {
+                LOG(ERROR) << __func__ << ": writing of command into MQ failed";
+                recordUnexpected(command.toString() + ", writing of command into MQ failed");
+                return Status::ABORT;
+            }
+            StreamDescriptor::Reply reply{};
+            LOG(INFO) << __func__ << ": reading reply for command " << command.toString() << "...";
+            if (!getReplyMQ()->readBlocking(&reply, 1)) {
+                LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
+                recordUnexpected(command.toString() + ", reading of reply from MQ failed");
+                return Status::ABORT;
+            }
+            LOG(INFO) << __func__ << ": received status " << statusToString(reply.status)
+                      << " for command " << command.toString();
+            if (reply.status != STATUS_BAD_VALUE) {
+                recordUnexpected(command.toString() + ", " + statusToString(reply.status));
+            }
+        }
+        return Status::EXIT;
+    }
+
+  private:
+    void recordUnexpected(std::string entry) {
+        std::lock_guard<std::mutex> lock(mLock);  // mLock comes from StreamCommonLogic.
+        mUnexpectedStatuses.push_back(std::move(entry));
+    }
+    const std::vector<StreamDescriptor::Command> mCommands;
+    std::vector<std::string> mUnexpectedStatuses GUARDED_BY(mLock);
+};
+
 template <typename Stream>
 class WithStream {
   public:
@@ -381,25 +585,31 @@
     WithStream& operator=(const WithStream&) = delete;
     ~WithStream() {
         if (mStream != nullptr) {
+            mContext.reset();  // Destroys the context and its MQ objects before closing.
             ScopedAStatus status = mStream->close();
             EXPECT_EQ(EX_NONE, status.getExceptionCode())
                     << status << "; port config id " << getPortId();
         }
     }
     void SetUpPortConfig(IModule* module) { ASSERT_NO_FATAL_FAILURE(mPortConfig.SetUp(module)); }
-    ScopedAStatus SetUpNoChecks(IModule* module, long bufferSize) {
-        return SetUpNoChecks(module, mPortConfig.get(), bufferSize);
+    ScopedAStatus SetUpNoChecks(IModule* module, long bufferSizeFrames) {
+        return SetUpNoChecks(module, mPortConfig.get(), bufferSizeFrames);
     }
     ScopedAStatus SetUpNoChecks(IModule* module, const AudioPortConfig& portConfig,
-                                long bufferSize);
-    void SetUp(IModule* module, long bufferSize) {
+                                long bufferSizeFrames);
+    void SetUp(IModule* module, long bufferSizeFrames) {
         ASSERT_NO_FATAL_FAILURE(SetUpPortConfig(module));
-        ScopedAStatus status = SetUpNoChecks(module, bufferSize);
+        ScopedAStatus status = SetUpNoChecks(module, bufferSizeFrames);
         ASSERT_EQ(EX_NONE, status.getExceptionCode())
                 << status << "; port config id " << getPortId();
         ASSERT_NE(nullptr, mStream) << "; port config id " << getPortId();
+        EXPECT_GE(mDescriptor.bufferSizeFrames, bufferSizeFrames)
+                << "actual buffer size must be no less than requested";
+        mContext.emplace(mPortConfig.get(), mDescriptor);
+        ASSERT_NO_FATAL_FAILURE(mContext.value().checkIsValid());
     }
     Stream* get() const { return mStream.get(); }
+    const StreamContext* getContext() const { return mContext ? &(mContext.value()) : nullptr; }
     std::shared_ptr<Stream> getSharedPointer() const { return mStream; }
     const AudioPortConfig& getPortConfig() const { return mPortConfig.get(); }
     int32_t getPortId() const { return mPortConfig.getId(); }
@@ -408,6 +618,7 @@
     WithAudioPortConfig mPortConfig;
     std::shared_ptr<Stream> mStream;
     StreamDescriptor mDescriptor;
+    std::optional<StreamContext> mContext;
 };
 
 SinkMetadata GenerateSinkMetadata(const AudioPortConfig& portConfig) {
@@ -423,11 +634,11 @@
 template <>
 ScopedAStatus WithStream<IStreamIn>::SetUpNoChecks(IModule* module,
                                                    const AudioPortConfig& portConfig,
-                                                   long bufferSize) {
+                                                   long bufferSizeFrames) {
     aidl::android::hardware::audio::core::IModule::OpenInputStreamArguments args;
     args.portConfigId = portConfig.id;
     args.sinkMetadata = GenerateSinkMetadata(portConfig);
-    args.bufferSizeFrames = bufferSize;
+    args.bufferSizeFrames = bufferSizeFrames;
     aidl::android::hardware::audio::core::IModule::OpenInputStreamReturn ret;
     ScopedAStatus status = module->openInputStream(args, &ret);
     if (status.isOk()) {
@@ -451,12 +662,12 @@
 template <>
 ScopedAStatus WithStream<IStreamOut>::SetUpNoChecks(IModule* module,
                                                     const AudioPortConfig& portConfig,
-                                                    long bufferSize) {
+                                                    long bufferSizeFrames) {
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
     args.portConfigId = portConfig.id;
     args.sourceMetadata = GenerateSourceMetadata(portConfig);
     args.offloadInfo = ModuleConfig::generateOffloadInfoIfNeeded(portConfig);
-    args.bufferSizeFrames = bufferSize;
+    args.bufferSizeFrames = bufferSizeFrames;
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
     ScopedAStatus status = module->openOutputStream(args, &ret);
     if (status.isOk()) {
@@ -471,6 +682,10 @@
     WithAudioPatch() {}
     WithAudioPatch(const AudioPortConfig& srcPortConfig, const AudioPortConfig& sinkPortConfig)
         : mSrcPortConfig(srcPortConfig), mSinkPortConfig(sinkPortConfig) {}
+    WithAudioPatch(bool sinkIsCfg1, const AudioPortConfig& portConfig1,
+                   const AudioPortConfig& portConfig2)
+        : mSrcPortConfig(sinkIsCfg1 ? portConfig2 : portConfig1),  // Source: the non-sink one.
+          mSinkPortConfig(sinkIsCfg1 ? portConfig1 : portConfig2) {}
     WithAudioPatch(const WithAudioPatch&) = delete;
     WithAudioPatch& operator=(const WithAudioPatch&) = delete;
     ~WithAudioPatch() {
@@ -502,6 +717,11 @@
     }
     int32_t getId() const { return mPatch.id; }
     const AudioPatch& get() const { return mPatch; }
+    const AudioPortConfig& getSinkPortConfig() const { return mSinkPortConfig.get(); }
+    const AudioPortConfig& getSrcPortConfig() const { return mSrcPortConfig.get(); }
+    const AudioPortConfig& getPortConfig(bool getSink) const {  // true: sink, false: source.
+        return getSink ? getSinkPortConfig() : getSrcPortConfig();
+    }
 
   private:
     WithAudioPortConfig mSrcPortConfig;
@@ -567,7 +787,7 @@
     }
     for (const auto& route : routes) {
         std::set<int32_t> sources(route.sourcePortIds.begin(), route.sourcePortIds.end());
-        EXPECT_NE(0, sources.size())
+        EXPECT_NE(0UL, sources.size())
                 << "empty audio port sinks in the audio route: " << route.toString();
         EXPECT_EQ(sources.size(), route.sourcePortIds.size())
                 << "IDs of audio port sinks are not unique in the audio route: "
@@ -584,10 +804,10 @@
         ASSERT_EQ(EX_NONE, status.getExceptionCode()) << status;
     }
     for (const auto& route : routes) {
-        EXPECT_EQ(1, portIds.count(route.sinkPortId))
+        EXPECT_EQ(1UL, portIds.count(route.sinkPortId))
                 << route.sinkPortId << " sink port id is unknown";
         for (const auto& source : route.sourcePortIds) {
-            EXPECT_EQ(1, portIds.count(source)) << source << " source port id is unknown";
+            EXPECT_EQ(1UL, portIds.count(source)) << source << " source port id is unknown";
         }
     }
 }
@@ -649,7 +869,7 @@
                         << "At least two output device ports are declared as default: "
                         << defaultOutput.value() << " and " << port.id;
                 defaultOutput = port.id;
-                EXPECT_EQ(0, outputs.count(devicePort.device))
+                EXPECT_EQ(0UL, outputs.count(devicePort.device))
                         << "Non-unique output device: " << devicePort.device.toString();
                 outputs.insert(devicePort.device);
             } else if (port.flags.getTag() == AudioIoFlags::Tag::input) {
@@ -657,7 +877,7 @@
                         << "At least two input device ports are declared as default: "
                         << defaultInput.value() << " and " << port.id;
                 defaultInput = port.id;
-                EXPECT_EQ(0, inputs.count(devicePort.device))
+                EXPECT_EQ(0UL, inputs.count(devicePort.device))
                         << "Non-unique input device: " << devicePort.device.toString();
                 inputs.insert(devicePort.device);
             } else {
@@ -744,7 +964,7 @@
                 << status << " returned for getAudioPort port ID " << connectedPortId;
         EXPECT_EQ(portConnected.get(), connectedPort);
         const auto& portProfiles = connectedPort.profiles;
-        EXPECT_NE(0, portProfiles.size())
+        EXPECT_NE(0UL, portProfiles.size())
                 << "Connected port has no profiles: " << connectedPort.toString();
         const auto dynamicProfileIt =
                 std::find_if(portProfiles.begin(), portProfiles.end(), [](const auto& profile) {
@@ -773,7 +993,7 @@
         {
             aidl::android::hardware::audio::core::IModule::OpenInputStreamArguments args;
             args.portConfigId = portConfigId;
-            args.bufferSizeFrames = kDefaultBufferSize;
+            args.bufferSizeFrames = kDefaultBufferSizeFrames;
             aidl::android::hardware::audio::core::IModule::OpenInputStreamReturn ret;
             ScopedAStatus status = module->openInputStream(args, &ret);
             EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())
@@ -783,7 +1003,7 @@
         {
             aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
             args.portConfigId = portConfigId;
-            args.bufferSizeFrames = kDefaultBufferSize;
+            args.bufferSizeFrames = kDefaultBufferSizeFrames;
             aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
             ScopedAStatus status = module->openOutputStream(args, &ret);
             EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())
@@ -807,7 +1027,7 @@
         ASSERT_EQ(EX_NONE, status.getExceptionCode()) << status;
     }
     for (const auto& config : portConfigs) {
-        EXPECT_EQ(1, portIds.count(config.portId))
+        EXPECT_EQ(1UL, portIds.count(config.portId))
                 << config.portId << " port id is unknown, config id " << config.id;
     }
 }
@@ -1151,14 +1371,14 @@
     }
 
     void CloseTwice() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         std::shared_ptr<Stream> heldStream;
         {
             WithStream<Stream> stream(portConfig.value());
-            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
             heldStream = stream.getSharedPointer();
         }
         ScopedAStatus status = heldStream->close();
@@ -1167,15 +1387,16 @@
     }
 
     void OpenAllConfigs() {
-        const auto allPortConfigs = moduleConfig->getPortConfigsForMixPorts(IsInput<Stream>());
+        const auto allPortConfigs =
+                moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
         for (const auto& portConfig : allPortConfigs) {
             WithStream<Stream> stream(portConfig);
-            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
         }
     }
 
     void OpenInvalidBufferSize() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
@@ -1194,13 +1415,14 @@
 
     void OpenInvalidDirection() {
         // Important! The direction of the port config must be reversed.
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(!IsInput<Stream>());
+        const auto portConfig =
+                moduleConfig->getSingleConfigForMixPort(!IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         WithStream<Stream> stream(portConfig.value());
         ASSERT_NO_FATAL_FAILURE(stream.SetUpPortConfig(module.get()));
-        ScopedAStatus status = stream.SetUpNoChecks(module.get(), kDefaultBufferSize);
+        ScopedAStatus status = stream.SetUpNoChecks(module.get(), kDefaultBufferSizeFrames);
         EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())
                 << status << " open" << direction(true) << "Stream returned for port config ID "
                 << stream.getPortId();
@@ -1208,7 +1430,7 @@
     }
 
     void OpenOverMaxCount() {
-        constexpr bool isInput = IsInput<Stream>();
+        constexpr bool isInput = IOTraits<Stream>::is_input;
         auto ports = moduleConfig->getMixPorts(isInput);
         bool hasSingleRun = false;
         for (const auto& port : ports) {
@@ -1229,10 +1451,11 @@
                 streamWraps[i].emplace(portConfigs[i]);
                 WithStream<Stream>& stream = streamWraps[i].value();
                 if (i < maxStreamCount) {
-                    ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+                    ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
                 } else {
                     ASSERT_NO_FATAL_FAILURE(stream.SetUpPortConfig(module.get()));
-                    ScopedAStatus status = stream.SetUpNoChecks(module.get(), kDefaultBufferSize);
+                    ScopedAStatus status =
+                            stream.SetUpNoChecks(module.get(), kDefaultBufferSizeFrames);
                     EXPECT_EQ(EX_ILLEGAL_STATE, status.getExceptionCode())
                             << status << " open" << direction(true)
                             << "Stream returned for port config ID " << stream.getPortId()
@@ -1247,35 +1470,145 @@
     }
 
     void OpenTwiceSamePortConfig() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         EXPECT_NO_FATAL_FAILURE(OpenTwiceSamePortConfigImpl(portConfig.value()));
     }
 
+    // Runs the I/O test on every mix port config of this direction; skipped if there are none.
+    void ReadOrWrite(bool useImpl2, bool testObservablePosition) {
+        const auto configs = moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
+        if (configs.empty()) {
+            GTEST_SKIP() << "No mix ports have attached devices";
+        }
+        for (const auto& portConfig : configs) {
+            EXPECT_NO_FATAL_FAILURE(ReadOrWriteImpl(portConfig, useImpl2, testObservablePosition))
+                    << portConfig.toString();
+        }
+    }
+
     void ResetPortConfigWithOpenStream() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         WithStream<Stream> stream(portConfig.value());
-        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
         ScopedAStatus status = module->resetAudioPortConfig(stream.getPortId());
         EXPECT_EQ(EX_ILLEGAL_STATE, status.getExceptionCode())
                 << status << " returned for port config ID " << stream.getPortId();
     }
 
+    void SendInvalidCommand() {  // Skipped when no single mix port config is available.
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
+        if (!portConfig.has_value()) {
+            GTEST_SKIP() << "No mix port for attached devices";
+        }
+        EXPECT_NO_FATAL_FAILURE(SendInvalidCommandImpl(portConfig.value()));
+    }
+
     void OpenTwiceSamePortConfigImpl(const AudioPortConfig& portConfig) {
         WithStream<Stream> stream1(portConfig);
-        ASSERT_NO_FATAL_FAILURE(stream1.SetUp(module.get(), kDefaultBufferSize));
+        ASSERT_NO_FATAL_FAILURE(stream1.SetUp(module.get(), kDefaultBufferSizeFrames));
         WithStream<Stream> stream2;
-        ScopedAStatus status =
-                stream2.SetUpNoChecks(module.get(), stream1.getPortConfig(), kDefaultBufferSize);
+        ScopedAStatus status = stream2.SetUpNoChecks(module.get(), stream1.getPortConfig(),
+                                                     kDefaultBufferSizeFrames);
         EXPECT_EQ(EX_ILLEGAL_STATE, status.getExceptionCode())
                 << status << " when opening " << direction(false)
                 << " stream twice for the same port config ID " << stream1.getPortId();
     }
+
+    // Polls until the observable position advances, the worker fails, or the timeout expires.
+    template <class Worker>
+    void WaitForObservablePositionAdvance(Worker& worker) {
+        static constexpr int kWriteDurationUs = 50 * 1000;
+        static constexpr std::chrono::milliseconds kPositionChangeTimeout{10000};
+        const int64_t framesInitial = worker.getLastObservablePosition().frames;
+        ASSERT_FALSE(worker.hasError());
+        bool timedOut = false;
+        int64_t frames = framesInitial;
+        android::base::Timer elapsed;
+        while (frames <= framesInitial && !worker.hasError() &&
+               !(timedOut = (elapsed.duration() >= kPositionChangeTimeout))) {
+            usleep(kWriteDurationUs);
+            frames = worker.getLastObservablePosition().frames;
+        }
+        EXPECT_FALSE(timedOut);
+        EXPECT_FALSE(worker.hasError()) << worker.getError();
+        EXPECT_GT(frames, framesInitial);
+    }
+
+    void ReadOrWriteImpl(const AudioPortConfig& portConfig, bool useImpl2,
+                         bool testObservablePosition) {
+        if (useImpl2) {  // Open a stream first, then set up a patch for it.
+            ASSERT_NO_FATAL_FAILURE(ReadOrWriteImpl2(portConfig, testObservablePosition));
+        } else {  // Set up a patch first, then open a stream.
+            ASSERT_NO_FATAL_FAILURE(ReadOrWriteImpl1(portConfig, testObservablePosition));
+        }
+    }
+
+    // Set up a patch first, then open a stream on its mix port config and run the worker.
+    void ReadOrWriteImpl1(const AudioPortConfig& portConfig, bool testObservablePosition) {
+        constexpr bool isInput = IOTraits<Stream>::is_input;
+        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(isInput, portConfig);
+        ASSERT_FALSE(devicePorts.empty());
+        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+        WithAudioPatch patch(isInput, portConfig, devicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+
+        WithStream<Stream> stream(patch.getPortConfig(isInput));
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+        typename IOTraits<Stream>::Worker worker(*stream.getContext());
+
+        ASSERT_TRUE(worker.start()) << worker.getError();
+        ASSERT_TRUE(worker.waitForAtLeastOneCycle()) << worker.getError();
+        if (testObservablePosition) {
+            ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
+        }
+    }
+
+    // Open a stream, then set up a patch for it.
+    void ReadOrWriteImpl2(const AudioPortConfig& portConfig, bool testObservablePosition) {
+        constexpr bool isInput = IOTraits<Stream>::is_input;
+        WithStream<Stream> stream(portConfig);
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+
+        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(isInput, portConfig);
+        ASSERT_FALSE(devicePorts.empty());
+        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+        WithAudioPatch patch(isInput, stream.getPortConfig(), devicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+        // Declared after the patch so that the worker is destroyed before the patch is torn down.
+        typename IOTraits<Stream>::Worker worker(*stream.getContext());
+        ASSERT_TRUE(worker.start()) << worker.getError();
+        ASSERT_TRUE(worker.waitForAtLeastOneCycle()) << worker.getError();
+        if (testObservablePosition) {
+            ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
+        }
+    }
+
+    // Sends commands with bogus codes (0-3) or negative byte counts (4-5), expecting rejections.
+    void SendInvalidCommandImpl(const AudioPortConfig& portConfig) {
+        std::vector<StreamDescriptor::Command> commands(6);
+        commands[0].code = -1;
+        commands[1].code = StreamDescriptor::COMMAND_BURST - 1;
+        commands[2].code = std::numeric_limits<int32_t>::min();
+        commands[3].code = std::numeric_limits<int32_t>::max();
+        commands[4].code = StreamDescriptor::COMMAND_BURST;
+        commands[4].fmqByteCount = -1;
+        commands[5].code = StreamDescriptor::COMMAND_BURST;
+        commands[5].fmqByteCount = std::numeric_limits<int32_t>::min();
+        WithStream<Stream> stream(portConfig);
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+        StreamWorker<StreamInvalidCommandLogic> writer(*stream.getContext(), commands);
+        ASSERT_TRUE(writer.start());
+        writer.waitForAtLeastOneCycle();
+        const auto unexpectedStatuses = writer.getUnexpectedStatuses();
+        EXPECT_EQ(0UL, unexpectedStatuses.size()) << "Pairs of (command, actual status): "
+                << android::internal::ToString(unexpectedStatuses);
+    }
 };
 using AudioStreamIn = AudioStream<IStreamIn>;
 using AudioStreamOut = AudioStream<IStreamOut>;
@@ -1292,6 +1625,13 @@
 #define TEST_IO_STREAM(method_name)                                                \
     TEST_P(AudioStreamIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
     TEST_P(AudioStreamOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
+#define TEST_IO_STREAM_2(method_name, arg1, arg2)           \
+    TEST_P(AudioStreamIn, method_name##_##arg1##_##arg2) {  \
+        ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2));   \
+    }                                                       \
+    TEST_P(AudioStreamOut, method_name##_##arg1##_##arg2) { \
+        ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2));   \
+    }
 
 TEST_IO_STREAM(CloseTwice);
 TEST_IO_STREAM(OpenAllConfigs);
@@ -1299,7 +1639,12 @@
 TEST_IO_STREAM(OpenInvalidDirection);
 TEST_IO_STREAM(OpenOverMaxCount);
 TEST_IO_STREAM(OpenTwiceSamePortConfig);
+TEST_IO_STREAM_2(ReadOrWrite, false, false);
+TEST_IO_STREAM_2(ReadOrWrite, true, false);
+TEST_IO_STREAM_2(ReadOrWrite, false, true);
+TEST_IO_STREAM_2(ReadOrWrite, true, true);
 TEST_IO_STREAM(ResetPortConfigWithOpenStream);
+TEST_IO_STREAM(SendInvalidCommand);
 
 TEST_P(AudioStreamOut, OpenTwicePrimary) {
     const auto mixPorts = moduleConfig->getMixPorts(false);
@@ -1340,7 +1685,7 @@
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
     args.portConfigId = portConfig.value().id;
     args.sourceMetadata = GenerateSourceMetadata(portConfig.value());
-    args.bufferSizeFrames = kDefaultBufferSize;
+    args.bufferSizeFrames = kDefaultBufferSizeFrames;
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
     ScopedAStatus status = module->openOutputStream(args, &ret);
     EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())