audio: Implementation of audio I/O, part II

This patch implements audio I/O for the synchronous, non-MMAP
case.

Updated the StreamDescriptor structure to make it usable.
Clarified comments on the expectations for the client and
the HAL module.

Bug: 205884982
Test: atest VtsHalAudioCoreTargetTest
Merged-In: I09651c6e80a397c80870622ac19234b4d4a38cbb
Change-Id: I09651c6e80a397c80870622ac19234b4d4a38cbb
(cherry picked from commit 01803d454ac192f4b6b732944f0be324b1b03a7f)
diff --git a/audio/aidl/vts/Android.bp b/audio/aidl/vts/Android.bp
index 1d0ec7c..7b35133 100644
--- a/audio/aidl/vts/Android.bp
+++ b/audio/aidl/vts/Android.bp
@@ -13,12 +13,10 @@
         "VtsHalTargetTestDefaults",
         "use_libaidlvintf_gtest_helper_static",
     ],
-    srcs: [
-        "ModuleConfig.cpp",
-        "VtsHalAudioCoreTargetTest.cpp",
-    ],
     shared_libs: [
         "libbinder_ndk",
+        "libcutils",
+        "libfmq",
     ],
     static_libs: [
         "android.hardware.audio.common-V1-ndk",
@@ -28,6 +26,16 @@
         "android.media.audio.common.types-V1-ndk",
         "libaudioaidlcommon",
     ],
+    cflags: [
+        "-Wall",
+        "-Wextra",
+        "-Werror",
+        "-Wthread-safety",
+    ],
+    srcs: [
+        "ModuleConfig.cpp",
+        "VtsHalAudioCoreTargetTest.cpp",
+    ],
     test_suites: [
         "general-tests",
         "vts",
diff --git a/audio/aidl/vts/ModuleConfig.cpp b/audio/aidl/vts/ModuleConfig.cpp
index 969b0e9..e36ab4a 100644
--- a/audio/aidl/vts/ModuleConfig.cpp
+++ b/audio/aidl/vts/ModuleConfig.cpp
@@ -123,6 +123,15 @@
     return result;
 }
 
+std::vector<AudioPort> ModuleConfig::getAttachedDevicesPortsForMixPort(
+        bool isInput, const AudioPortConfig& mixPortConfig) const {
+    const auto mixPortIt = findById<AudioPort>(mPorts, mixPortConfig.portId);
+    if (mixPortIt != mPorts.end()) {
+        return getAttachedDevicesPortsForMixPort(isInput, *mixPortIt);
+    }
+    return {};
+}
+
 std::vector<AudioPort> ModuleConfig::getAttachedSinkDevicesPortsForMixPort(
         const AudioPort& mixPort) const {
     std::vector<AudioPort> result;
diff --git a/audio/aidl/vts/ModuleConfig.h b/audio/aidl/vts/ModuleConfig.h
index df13430..552f971 100644
--- a/audio/aidl/vts/ModuleConfig.h
+++ b/audio/aidl/vts/ModuleConfig.h
@@ -54,6 +54,9 @@
         return isInput ? getAttachedSourceDevicesPortsForMixPort(mixPort)
                        : getAttachedSinkDevicesPortsForMixPort(mixPort);
     }
+    std::vector<aidl::android::media::audio::common::AudioPort> getAttachedDevicesPortsForMixPort(
+            bool isInput,
+            const aidl::android::media::audio::common::AudioPortConfig& mixPortConfig) const;
     std::vector<aidl::android::media::audio::common::AudioPort>
     getAttachedSinkDevicesPortsForMixPort(
             const aidl::android::media::audio::common::AudioPort& mixPort) const;
diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
index 0ecc057..ac41ac3 100644
--- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
@@ -22,19 +22,24 @@
 #include <optional>
 #include <set>
 #include <string>
+#include <vector>
 
 #define LOG_TAG "VtsHalAudioCore"
 #include <android-base/logging.h>
 
+#include <StreamWorker.h>
+#include <Utils.h>
 #include <aidl/Gtest.h>
 #include <aidl/Vintf.h>
 #include <aidl/android/hardware/audio/core/IConfig.h>
 #include <aidl/android/hardware/audio/core/IModule.h>
 #include <aidl/android/media/audio/common/AudioIoFlags.h>
 #include <aidl/android/media/audio/common/AudioOutputFlags.h>
+#include <android-base/chrono_utils.h>
 #include <android-base/properties.h>
 #include <android/binder_manager.h>
 #include <android/binder_process.h>
+#include <fmq/AidlMessageQueue.h>
 
 #include "ModuleConfig.h"
 
@@ -63,6 +68,9 @@
 using aidl::android::media::audio::common::AudioPortExt;
 using aidl::android::media::audio::common::AudioSource;
 using aidl::android::media::audio::common::AudioUsage;
+using android::hardware::audio::common::getFrameSizeInBytes;
+using android::hardware::audio::common::StreamLogic;
+using android::hardware::audio::common::StreamWorker;
 using ndk::ScopedAStatus;
 
 namespace ndk {
@@ -126,20 +134,6 @@
     }
 };
 
-template <typename T>
-struct IsInput {
-    constexpr operator bool() const;
-};
-
-template <>
-constexpr IsInput<IStreamIn>::operator bool() const {
-    return true;
-}
-template <>
-constexpr IsInput<IStreamOut>::operator bool() const {
-    return false;
-}
-
 // All 'With*' classes are move-only because they are associated with some
 // resource or state of a HAL module.
 class WithDebugFlags {
@@ -228,7 +222,7 @@
 class AudioCoreModule : public testing::TestWithParam<std::string> {
   public:
     // The default buffer size is used mostly for negative tests.
-    static constexpr int kDefaultBufferSize = 256;
+    static constexpr int kDefaultBufferSizeFrames = 256;
 
     void SetUp() override {
         ASSERT_NO_FATAL_FAILURE(ConnectToService());
@@ -372,6 +366,216 @@
     AudioPort mConnectedPort;
 };
 
+class StreamContext {
+  public:
+    typedef AidlMessageQueue<StreamDescriptor::Command,
+                             ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            CommandMQ;
+    typedef AidlMessageQueue<StreamDescriptor::Reply,
+                             ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            ReplyMQ;
+    typedef AidlMessageQueue<int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
+            DataMQ;
+
+    StreamContext(const AudioPortConfig& portConfig, const StreamDescriptor& descriptor)
+        : mFrameSizeBytes(
+                  getFrameSizeInBytes(portConfig.format.value(), portConfig.channelMask.value())),
+          mCommandMQ(new CommandMQ(descriptor.command)),
+          mReplyMQ(new ReplyMQ(descriptor.reply)),
+          mBufferSizeFrames(descriptor.bufferSizeFrames),
+          mDataMQ(maybeCreateDataMQ(descriptor)) {}
+    void checkIsValid() const {
+        EXPECT_NE(0UL, mFrameSizeBytes);
+        ASSERT_NE(nullptr, mCommandMQ);
+        EXPECT_TRUE(mCommandMQ->isValid());
+        ASSERT_NE(nullptr, mReplyMQ);
+        EXPECT_TRUE(mReplyMQ->isValid());
+        if (mDataMQ != nullptr) {
+            EXPECT_TRUE(mDataMQ->isValid());
+        }
+    }
+    size_t getBufferSizeBytes() const { return mFrameSizeBytes * mBufferSizeFrames; }
+    size_t getBufferSizeFrames() const { return mBufferSizeFrames; }
+    CommandMQ* getCommandMQ() const { return mCommandMQ.get(); }
+    DataMQ* getDataMQ() const { return mDataMQ.get(); }
+    ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); }
+
+  private:
+    static std::unique_ptr<DataMQ> maybeCreateDataMQ(const StreamDescriptor& descriptor) {
+        using Tag = StreamDescriptor::AudioBuffer::Tag;
+        if (descriptor.audio.getTag() == Tag::fmq) {
+            return std::make_unique<DataMQ>(descriptor.audio.get<Tag::fmq>());
+        }
+        return nullptr;
+    }
+
+    const size_t mFrameSizeBytes;
+    std::unique_ptr<CommandMQ> mCommandMQ;
+    std::unique_ptr<ReplyMQ> mReplyMQ;
+    const size_t mBufferSizeFrames;
+    std::unique_ptr<DataMQ> mDataMQ;
+};
+
+class StreamCommonLogic : public StreamLogic {
+  public:
+    StreamDescriptor::Position getLastObservablePosition() {
+        std::lock_guard<std::mutex> lock(mLock);
+        return mLastReply.observable;
+    }
+
+  protected:
+    explicit StreamCommonLogic(const StreamContext& context)
+        : mCommandMQ(context.getCommandMQ()),
+          mReplyMQ(context.getReplyMQ()),
+          mDataMQ(context.getDataMQ()),
+          mData(context.getBufferSizeBytes()) {}
+    StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; }
+    StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; }
+
+    std::string init() override { return ""; }
+
+    StreamContext::CommandMQ* mCommandMQ;
+    StreamContext::ReplyMQ* mReplyMQ;
+    StreamContext::DataMQ* mDataMQ;
+    std::vector<int8_t> mData;
+    std::mutex mLock;
+    StreamDescriptor::Reply mLastReply GUARDED_BY(mLock);
+};
+
+class StreamReaderLogic : public StreamCommonLogic {
+  public:
+    explicit StreamReaderLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+
+  protected:
+    Status cycle() override {
+        StreamDescriptor::Command command{};
+        command.code = StreamDescriptor::COMMAND_BURST;
+        command.fmqByteCount = mData.size();
+        if (!mCommandMQ->writeBlocking(&command, 1)) {
+            LOG(ERROR) << __func__ << ": writing of command into MQ failed";
+            return Status::ABORT;
+        }
+        StreamDescriptor::Reply reply{};
+        if (!mReplyMQ->readBlocking(&reply, 1)) {
+            LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
+            return Status::ABORT;
+        }
+        if (reply.status != STATUS_OK) {
+            LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
+            return Status::ABORT;
+        }
+        if (reply.fmqByteCount < 0 || reply.fmqByteCount > command.fmqByteCount) {
+            LOG(ERROR) << __func__
+                       << ": received invalid byte count in the reply: " << reply.fmqByteCount;
+            return Status::ABORT;
+        }
+        {
+            std::lock_guard<std::mutex> lock(mLock);
+            mLastReply = reply;
+        }
+        const size_t readCount = std::min({mDataMQ->availableToRead(),
+                                           static_cast<size_t>(reply.fmqByteCount), mData.size()});
+        if (readCount == 0 || mDataMQ->read(mData.data(), readCount)) {
+            return Status::CONTINUE;
+        }
+        LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
+        return Status::ABORT;
+    }
+};
+using StreamReader = StreamWorker<StreamReaderLogic>;
+
+class StreamWriterLogic : public StreamCommonLogic {
+  public:
+    explicit StreamWriterLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+
+  protected:
+    Status cycle() override {
+        if (!mDataMQ->write(mData.data(), mData.size())) {
+            LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed";
+            return Status::ABORT;
+        }
+        StreamDescriptor::Command command{};
+        command.code = StreamDescriptor::COMMAND_BURST;
+        command.fmqByteCount = mData.size();
+        if (!mCommandMQ->writeBlocking(&command, 1)) {
+            LOG(ERROR) << __func__ << ": writing of command into MQ failed";
+            return Status::ABORT;
+        }
+        StreamDescriptor::Reply reply{};
+        if (!mReplyMQ->readBlocking(&reply, 1)) {
+            LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
+            return Status::ABORT;
+        }
+        if (reply.status != STATUS_OK) {
+            LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
+            return Status::ABORT;
+        }
+        if (reply.fmqByteCount < 0 || reply.fmqByteCount > command.fmqByteCount) {
+            LOG(ERROR) << __func__
+                       << ": received invalid byte count in the reply: " << reply.fmqByteCount;
+            return Status::ABORT;
+        }
+        {
+            std::lock_guard<std::mutex> lock(mLock);
+            mLastReply = reply;
+        }
+        return Status::CONTINUE;
+    }
+};
+using StreamWriter = StreamWorker<StreamWriterLogic>;
+
+template <typename T>
+struct IOTraits {
+    static constexpr bool is_input = std::is_same_v<T, IStreamIn>;
+    using Worker = std::conditional_t<is_input, StreamReader, StreamWriter>;
+};
+
+// A dedicated version to test replies to invalid commands.
+class StreamInvalidCommandLogic : public StreamCommonLogic {
+  public:
+    StreamInvalidCommandLogic(const StreamContext& context,
+                              const std::vector<StreamDescriptor::Command>& commands)
+        : StreamCommonLogic(context), mCommands(commands) {}
+
+    std::vector<std::string> getUnexpectedStatuses() {
+        std::lock_guard<std::mutex> lock(mLock);
+        return mUnexpectedStatuses;
+    }
+
+  protected:
+    Status cycle() override {
+        // Send all commands in one cycle to simplify testing.
+        // Extra logging helps to sort out issues with unexpected HAL behavior.
+        for (const auto& command : mCommands) {
+            LOG(INFO) << __func__ << ": writing command " << command.toString() << " into MQ...";
+            if (!getCommandMQ()->writeBlocking(&command, 1)) {
+                LOG(ERROR) << __func__ << ": writing of command into MQ failed";
+                return Status::ABORT;
+            }
+            StreamDescriptor::Reply reply{};
+            LOG(INFO) << __func__ << ": reading reply for command " << command.toString() << "...";
+            if (!getReplyMQ()->readBlocking(&reply, 1)) {
+                LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
+                return Status::ABORT;
+            }
+            LOG(INFO) << __func__ << ": received status " << statusToString(reply.status)
+                      << " for command " << command.toString();
+            if (reply.status != STATUS_BAD_VALUE) {
+                std::string s = command.toString();
+                s.append(", ").append(statusToString(reply.status));
+                std::lock_guard<std::mutex> lock(mLock);
+                mUnexpectedStatuses.push_back(std::move(s));
+            }
+        };
+        return Status::EXIT;
+    }
+
+  private:
+    const std::vector<StreamDescriptor::Command> mCommands;
+    std::mutex mLock;
+    std::vector<std::string> mUnexpectedStatuses GUARDED_BY(mLock);
+};
+
 template <typename Stream>
 class WithStream {
   public:
@@ -381,25 +585,31 @@
     WithStream& operator=(const WithStream&) = delete;
     ~WithStream() {
         if (mStream != nullptr) {
+            mContext.reset();
             ScopedAStatus status = mStream->close();
             EXPECT_EQ(EX_NONE, status.getExceptionCode())
                     << status << "; port config id " << getPortId();
         }
     }
     void SetUpPortConfig(IModule* module) { ASSERT_NO_FATAL_FAILURE(mPortConfig.SetUp(module)); }
-    ScopedAStatus SetUpNoChecks(IModule* module, long bufferSize) {
-        return SetUpNoChecks(module, mPortConfig.get(), bufferSize);
+    ScopedAStatus SetUpNoChecks(IModule* module, long bufferSizeFrames) {
+        return SetUpNoChecks(module, mPortConfig.get(), bufferSizeFrames);
     }
     ScopedAStatus SetUpNoChecks(IModule* module, const AudioPortConfig& portConfig,
-                                long bufferSize);
-    void SetUp(IModule* module, long bufferSize) {
+                                long bufferSizeFrames);
+    void SetUp(IModule* module, long bufferSizeFrames) {
         ASSERT_NO_FATAL_FAILURE(SetUpPortConfig(module));
-        ScopedAStatus status = SetUpNoChecks(module, bufferSize);
+        ScopedAStatus status = SetUpNoChecks(module, bufferSizeFrames);
         ASSERT_EQ(EX_NONE, status.getExceptionCode())
                 << status << "; port config id " << getPortId();
         ASSERT_NE(nullptr, mStream) << "; port config id " << getPortId();
+        EXPECT_GE(mDescriptor.bufferSizeFrames, bufferSizeFrames)
+                << "actual buffer size must be no less than requested";
+        mContext.emplace(mPortConfig.get(), mDescriptor);
+        ASSERT_NO_FATAL_FAILURE(mContext.value().checkIsValid());
     }
     Stream* get() const { return mStream.get(); }
+    const StreamContext* getContext() const { return mContext ? &(mContext.value()) : nullptr; }
     std::shared_ptr<Stream> getSharedPointer() const { return mStream; }
     const AudioPortConfig& getPortConfig() const { return mPortConfig.get(); }
     int32_t getPortId() const { return mPortConfig.getId(); }
@@ -408,6 +618,7 @@
     WithAudioPortConfig mPortConfig;
     std::shared_ptr<Stream> mStream;
     StreamDescriptor mDescriptor;
+    std::optional<StreamContext> mContext;
 };
 
 SinkMetadata GenerateSinkMetadata(const AudioPortConfig& portConfig) {
@@ -423,11 +634,11 @@
 template <>
 ScopedAStatus WithStream<IStreamIn>::SetUpNoChecks(IModule* module,
                                                    const AudioPortConfig& portConfig,
-                                                   long bufferSize) {
+                                                   long bufferSizeFrames) {
     aidl::android::hardware::audio::core::IModule::OpenInputStreamArguments args;
     args.portConfigId = portConfig.id;
     args.sinkMetadata = GenerateSinkMetadata(portConfig);
-    args.bufferSizeFrames = bufferSize;
+    args.bufferSizeFrames = bufferSizeFrames;
     aidl::android::hardware::audio::core::IModule::OpenInputStreamReturn ret;
     ScopedAStatus status = module->openInputStream(args, &ret);
     if (status.isOk()) {
@@ -451,12 +662,12 @@
 template <>
 ScopedAStatus WithStream<IStreamOut>::SetUpNoChecks(IModule* module,
                                                     const AudioPortConfig& portConfig,
-                                                    long bufferSize) {
+                                                    long bufferSizeFrames) {
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
     args.portConfigId = portConfig.id;
     args.sourceMetadata = GenerateSourceMetadata(portConfig);
     args.offloadInfo = ModuleConfig::generateOffloadInfoIfNeeded(portConfig);
-    args.bufferSizeFrames = bufferSize;
+    args.bufferSizeFrames = bufferSizeFrames;
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
     ScopedAStatus status = module->openOutputStream(args, &ret);
     if (status.isOk()) {
@@ -471,6 +682,10 @@
     WithAudioPatch() {}
     WithAudioPatch(const AudioPortConfig& srcPortConfig, const AudioPortConfig& sinkPortConfig)
         : mSrcPortConfig(srcPortConfig), mSinkPortConfig(sinkPortConfig) {}
+    WithAudioPatch(bool sinkIsCfg1, const AudioPortConfig& portConfig1,
+                   const AudioPortConfig& portConfig2)
+        : mSrcPortConfig(sinkIsCfg1 ? portConfig2 : portConfig1),
+          mSinkPortConfig(sinkIsCfg1 ? portConfig1 : portConfig2) {}
     WithAudioPatch(const WithAudioPatch&) = delete;
     WithAudioPatch& operator=(const WithAudioPatch&) = delete;
     ~WithAudioPatch() {
@@ -502,6 +717,11 @@
     }
     int32_t getId() const { return mPatch.id; }
     const AudioPatch& get() const { return mPatch; }
+    const AudioPortConfig& getSinkPortConfig() const { return mSinkPortConfig.get(); }
+    const AudioPortConfig& getSrcPortConfig() const { return mSrcPortConfig.get(); }
+    const AudioPortConfig& getPortConfig(bool getSink) const {
+        return getSink ? getSinkPortConfig() : getSrcPortConfig();
+    }
 
   private:
     WithAudioPortConfig mSrcPortConfig;
@@ -567,7 +787,7 @@
     }
     for (const auto& route : routes) {
         std::set<int32_t> sources(route.sourcePortIds.begin(), route.sourcePortIds.end());
-        EXPECT_NE(0, sources.size())
+        EXPECT_NE(0UL, sources.size())
                 << "empty audio port sinks in the audio route: " << route.toString();
         EXPECT_EQ(sources.size(), route.sourcePortIds.size())
                 << "IDs of audio port sinks are not unique in the audio route: "
@@ -584,10 +804,10 @@
         ASSERT_EQ(EX_NONE, status.getExceptionCode()) << status;
     }
     for (const auto& route : routes) {
-        EXPECT_EQ(1, portIds.count(route.sinkPortId))
+        EXPECT_EQ(1UL, portIds.count(route.sinkPortId))
                 << route.sinkPortId << " sink port id is unknown";
         for (const auto& source : route.sourcePortIds) {
-            EXPECT_EQ(1, portIds.count(source)) << source << " source port id is unknown";
+            EXPECT_EQ(1UL, portIds.count(source)) << source << " source port id is unknown";
         }
     }
 }
@@ -649,7 +869,7 @@
                         << "At least two output device ports are declared as default: "
                         << defaultOutput.value() << " and " << port.id;
                 defaultOutput = port.id;
-                EXPECT_EQ(0, outputs.count(devicePort.device))
+                EXPECT_EQ(0UL, outputs.count(devicePort.device))
                         << "Non-unique output device: " << devicePort.device.toString();
                 outputs.insert(devicePort.device);
             } else if (port.flags.getTag() == AudioIoFlags::Tag::input) {
@@ -657,7 +877,7 @@
                         << "At least two input device ports are declared as default: "
                         << defaultInput.value() << " and " << port.id;
                 defaultInput = port.id;
-                EXPECT_EQ(0, inputs.count(devicePort.device))
+                EXPECT_EQ(0UL, inputs.count(devicePort.device))
                         << "Non-unique input device: " << devicePort.device.toString();
                 inputs.insert(devicePort.device);
             } else {
@@ -744,7 +964,7 @@
                 << status << " returned for getAudioPort port ID " << connectedPortId;
         EXPECT_EQ(portConnected.get(), connectedPort);
         const auto& portProfiles = connectedPort.profiles;
-        EXPECT_NE(0, portProfiles.size())
+        EXPECT_NE(0UL, portProfiles.size())
                 << "Connected port has no profiles: " << connectedPort.toString();
         const auto dynamicProfileIt =
                 std::find_if(portProfiles.begin(), portProfiles.end(), [](const auto& profile) {
@@ -773,7 +993,7 @@
         {
             aidl::android::hardware::audio::core::IModule::OpenInputStreamArguments args;
             args.portConfigId = portConfigId;
-            args.bufferSizeFrames = kDefaultBufferSize;
+            args.bufferSizeFrames = kDefaultBufferSizeFrames;
             aidl::android::hardware::audio::core::IModule::OpenInputStreamReturn ret;
             ScopedAStatus status = module->openInputStream(args, &ret);
             EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())
@@ -783,7 +1003,7 @@
         {
             aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
             args.portConfigId = portConfigId;
-            args.bufferSizeFrames = kDefaultBufferSize;
+            args.bufferSizeFrames = kDefaultBufferSizeFrames;
             aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
             ScopedAStatus status = module->openOutputStream(args, &ret);
             EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())
@@ -807,7 +1027,7 @@
         ASSERT_EQ(EX_NONE, status.getExceptionCode()) << status;
     }
     for (const auto& config : portConfigs) {
-        EXPECT_EQ(1, portIds.count(config.portId))
+        EXPECT_EQ(1UL, portIds.count(config.portId))
                 << config.portId << " port id is unknown, config id " << config.id;
     }
 }
@@ -1151,14 +1371,14 @@
     }
 
     void CloseTwice() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         std::shared_ptr<Stream> heldStream;
         {
             WithStream<Stream> stream(portConfig.value());
-            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
             heldStream = stream.getSharedPointer();
         }
         ScopedAStatus status = heldStream->close();
@@ -1167,15 +1387,16 @@
     }
 
     void OpenAllConfigs() {
-        const auto allPortConfigs = moduleConfig->getPortConfigsForMixPorts(IsInput<Stream>());
+        const auto allPortConfigs =
+                moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
         for (const auto& portConfig : allPortConfigs) {
             WithStream<Stream> stream(portConfig);
-            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+            ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
         }
     }
 
     void OpenInvalidBufferSize() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
@@ -1194,13 +1415,14 @@
 
     void OpenInvalidDirection() {
         // Important! The direction of the port config must be reversed.
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(!IsInput<Stream>());
+        const auto portConfig =
+                moduleConfig->getSingleConfigForMixPort(!IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         WithStream<Stream> stream(portConfig.value());
         ASSERT_NO_FATAL_FAILURE(stream.SetUpPortConfig(module.get()));
-        ScopedAStatus status = stream.SetUpNoChecks(module.get(), kDefaultBufferSize);
+        ScopedAStatus status = stream.SetUpNoChecks(module.get(), kDefaultBufferSizeFrames);
         EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())
                 << status << " open" << direction(true) << "Stream returned for port config ID "
                 << stream.getPortId();
@@ -1208,7 +1430,7 @@
     }
 
     void OpenOverMaxCount() {
-        constexpr bool isInput = IsInput<Stream>();
+        constexpr bool isInput = IOTraits<Stream>::is_input;
         auto ports = moduleConfig->getMixPorts(isInput);
         bool hasSingleRun = false;
         for (const auto& port : ports) {
@@ -1229,10 +1451,11 @@
                 streamWraps[i].emplace(portConfigs[i]);
                 WithStream<Stream>& stream = streamWraps[i].value();
                 if (i < maxStreamCount) {
-                    ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+                    ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
                 } else {
                     ASSERT_NO_FATAL_FAILURE(stream.SetUpPortConfig(module.get()));
-                    ScopedAStatus status = stream.SetUpNoChecks(module.get(), kDefaultBufferSize);
+                    ScopedAStatus status =
+                            stream.SetUpNoChecks(module.get(), kDefaultBufferSizeFrames);
                     EXPECT_EQ(EX_ILLEGAL_STATE, status.getExceptionCode())
                             << status << " open" << direction(true)
                             << "Stream returned for port config ID " << stream.getPortId()
@@ -1247,35 +1470,145 @@
     }
 
     void OpenTwiceSamePortConfig() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         EXPECT_NO_FATAL_FAILURE(OpenTwiceSamePortConfigImpl(portConfig.value()));
     }
 
+    void ReadOrWrite(bool useImpl2, bool testObservablePosition) {
+        const auto allPortConfigs =
+                moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
+        if (allPortConfigs.empty()) {
+            GTEST_SKIP() << "No mix ports have attached devices";
+        }
+        for (const auto& portConfig : allPortConfigs) {
+            EXPECT_NO_FATAL_FAILURE(ReadOrWriteImpl(portConfig, useImpl2, testObservablePosition))
+                    << portConfig.toString();
+        }
+    }
+
     void ResetPortConfigWithOpenStream() {
-        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IsInput<Stream>());
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
         if (!portConfig.has_value()) {
             GTEST_SKIP() << "No mix port for attached devices";
         }
         WithStream<Stream> stream(portConfig.value());
-        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSize));
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
         ScopedAStatus status = module->resetAudioPortConfig(stream.getPortId());
         EXPECT_EQ(EX_ILLEGAL_STATE, status.getExceptionCode())
                 << status << " returned for port config ID " << stream.getPortId();
     }
 
+    void SendInvalidCommand() {
+        const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
+        if (!portConfig.has_value()) {
+            GTEST_SKIP() << "No mix port for attached devices";
+        }
+        EXPECT_NO_FATAL_FAILURE(SendInvalidCommandImpl(portConfig.value()));
+    }
+
     void OpenTwiceSamePortConfigImpl(const AudioPortConfig& portConfig) {
         WithStream<Stream> stream1(portConfig);
-        ASSERT_NO_FATAL_FAILURE(stream1.SetUp(module.get(), kDefaultBufferSize));
+        ASSERT_NO_FATAL_FAILURE(stream1.SetUp(module.get(), kDefaultBufferSizeFrames));
         WithStream<Stream> stream2;
-        ScopedAStatus status =
-                stream2.SetUpNoChecks(module.get(), stream1.getPortConfig(), kDefaultBufferSize);
+        ScopedAStatus status = stream2.SetUpNoChecks(module.get(), stream1.getPortConfig(),
+                                                     kDefaultBufferSizeFrames);
         EXPECT_EQ(EX_ILLEGAL_STATE, status.getExceptionCode())
                 << status << " when opening " << direction(false)
                 << " stream twice for the same port config ID " << stream1.getPortId();
     }
+
+    template <class Worker>
+    void WaitForObservablePositionAdvance(Worker& worker) {
+        static constexpr int kWriteDurationUs = 50 * 1000;
+        static constexpr std::chrono::milliseconds kPositionChangeTimeout{10000};
+        int64_t framesInitial;
+        framesInitial = worker.getLastObservablePosition().frames;
+        ASSERT_FALSE(worker.hasError());
+        bool timedOut = false;
+        int64_t frames = framesInitial;
+        for (android::base::Timer elapsed;
+             frames <= framesInitial && !worker.hasError() &&
+             !(timedOut = (elapsed.duration() >= kPositionChangeTimeout));) {
+            usleep(kWriteDurationUs);
+            frames = worker.getLastObservablePosition().frames;
+        }
+        EXPECT_FALSE(timedOut);
+        EXPECT_FALSE(worker.hasError()) << worker.getError();
+        EXPECT_GT(frames, framesInitial);
+    }
+
+    void ReadOrWriteImpl(const AudioPortConfig& portConfig, bool useImpl2,
+                         bool testObservablePosition) {
+        if (!useImpl2) {
+            ASSERT_NO_FATAL_FAILURE(ReadOrWriteImpl1(portConfig, testObservablePosition));
+        } else {
+            ASSERT_NO_FATAL_FAILURE(ReadOrWriteImpl2(portConfig, testObservablePosition));
+        }
+    }
+
+    // Set up a patch first, then open a stream.
+    void ReadOrWriteImpl1(const AudioPortConfig& portConfig, bool testObservablePosition) {
+        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
+                IOTraits<Stream>::is_input, portConfig);
+        ASSERT_FALSE(devicePorts.empty());
+        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+        WithAudioPatch patch(IOTraits<Stream>::is_input, portConfig, devicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+
+        WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+        typename IOTraits<Stream>::Worker worker(*stream.getContext());
+
+        ASSERT_TRUE(worker.start());
+        ASSERT_TRUE(worker.waitForAtLeastOneCycle());
+        if (testObservablePosition) {
+            ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
+        }
+    }
+
+    // Open a stream, then set up a patch for it.
+    void ReadOrWriteImpl2(const AudioPortConfig& portConfig, bool testObservablePosition) {
+        WithStream<Stream> stream(portConfig);
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+        typename IOTraits<Stream>::Worker worker(*stream.getContext());
+
+        auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
+                IOTraits<Stream>::is_input, portConfig);
+        ASSERT_FALSE(devicePorts.empty());
+        auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+        WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+
+        ASSERT_TRUE(worker.start());
+        ASSERT_TRUE(worker.waitForAtLeastOneCycle());
+        if (testObservablePosition) {
+            ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
+        }
+    }
+
+    void SendInvalidCommandImpl(const AudioPortConfig& portConfig) {
+        std::vector<StreamDescriptor::Command> commands(6);
+        commands[0].code = -1;
+        commands[1].code = StreamDescriptor::COMMAND_BURST - 1;
+        commands[2].code = std::numeric_limits<int32_t>::min();
+        commands[3].code = std::numeric_limits<int32_t>::max();
+        commands[4].code = StreamDescriptor::COMMAND_BURST;
+        commands[4].fmqByteCount = -1;
+        commands[5].code = StreamDescriptor::COMMAND_BURST;
+        commands[5].fmqByteCount = std::numeric_limits<int32_t>::min();
+        WithStream<Stream> stream(portConfig);
+        ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+        StreamWorker<StreamInvalidCommandLogic> writer(*stream.getContext(), commands);
+        ASSERT_TRUE(writer.start());
+        writer.waitForAtLeastOneCycle();
+        auto unexpectedStatuses = writer.getUnexpectedStatuses();
+        EXPECT_EQ(0UL, unexpectedStatuses.size())
+                << "Pairs of (command, actual status): "
+                << android::internal::ToString(unexpectedStatuses);
+    }
 };
 using AudioStreamIn = AudioStream<IStreamIn>;
 using AudioStreamOut = AudioStream<IStreamOut>;
@@ -1292,6 +1625,13 @@
 #define TEST_IO_STREAM(method_name)                                                \
     TEST_P(AudioStreamIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
     TEST_P(AudioStreamOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
+#define TEST_IO_STREAM_2(method_name, arg1, arg2)           \
+    TEST_P(AudioStreamIn, method_name##_##arg1##_##arg2) {  \
+        ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2));   \
+    }                                                       \
+    TEST_P(AudioStreamOut, method_name##_##arg1##_##arg2) { \
+        ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2));   \
+    }
 
 TEST_IO_STREAM(CloseTwice);
 TEST_IO_STREAM(OpenAllConfigs);
@@ -1299,7 +1639,12 @@
 TEST_IO_STREAM(OpenInvalidDirection);
 TEST_IO_STREAM(OpenOverMaxCount);
 TEST_IO_STREAM(OpenTwiceSamePortConfig);
+TEST_IO_STREAM_2(ReadOrWrite, false, false);
+TEST_IO_STREAM_2(ReadOrWrite, true, false);
+TEST_IO_STREAM_2(ReadOrWrite, false, true);
+TEST_IO_STREAM_2(ReadOrWrite, true, true);
 TEST_IO_STREAM(ResetPortConfigWithOpenStream);
+TEST_IO_STREAM(SendInvalidCommand);
 
 TEST_P(AudioStreamOut, OpenTwicePrimary) {
     const auto mixPorts = moduleConfig->getMixPorts(false);
@@ -1340,7 +1685,7 @@
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args;
     args.portConfigId = portConfig.value().id;
     args.sourceMetadata = GenerateSourceMetadata(portConfig.value());
-    args.bufferSizeFrames = kDefaultBufferSize;
+    args.bufferSizeFrames = kDefaultBufferSizeFrames;
     aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret;
     ScopedAStatus status = module->openOutputStream(args, &ret);
     EXPECT_EQ(EX_ILLEGAL_ARGUMENT, status.getExceptionCode())