audio: Fix output blocking condition in the remote submix

During transition to AIDL, the condition for blocking writes
on the output stream of the remote submix was not transferred
correctly. This logic was fixed.

While testing the fixed logic, some deficiencies in the VTS
tests were found and also fixed.

Bug: 383982254
Test: atest --test-filter="AudioModuleRemoteSubmix*" VtsHalAudioCoreTargetTest
Change-Id: Iecd5ecd329d61764b314744eaf4afb6196a38e3e
diff --git a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
index ea59771..cef0ea6 100644
--- a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
+++ b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
@@ -77,17 +77,15 @@
 }
 
 ::android::status_t StreamRemoteSubmix::drain(StreamDescriptor::DrainMode) {
-    usleep(1000);
     return ::android::OK;
 }
 
 ::android::status_t StreamRemoteSubmix::flush() {
-    usleep(1000);
-    return ::android::OK;
+    // TODO(b/372951987): consider if this needs to be done from 'StreamInWorkerLogic::cycle'.
+    return mIsInput ? standby() : ::android::OK;
 }
 
 ::android::status_t StreamRemoteSubmix::pause() {
-    usleep(1000);
     return ::android::OK;
 }
 
diff --git a/audio/aidl/default/r_submix/SubmixRoute.cpp b/audio/aidl/default/r_submix/SubmixRoute.cpp
index 325a012..445b1d3 100644
--- a/audio/aidl/default/r_submix/SubmixRoute.cpp
+++ b/audio/aidl/default/r_submix/SubmixRoute.cpp
@@ -134,10 +134,10 @@
 // - the peer input is in standby AFTER having been active.
 // We DO block if:
 // - the input was never activated to avoid discarding first frames in the pipe in case capture
-// start was delayed
+//   start was delayed
 bool SubmixRoute::shouldBlockWrite() {
     std::lock_guard guard(mLock);
-    return (mStreamInOpen || (mStreamInStandby && (mReadCounterFrames != 0)));
+    return mStreamInOpen && (!mStreamInStandby || mReadCounterFrames == 0);
 }
 
 long SubmixRoute::updateReadCounterFrames(size_t frameCount) {
diff --git a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
index 6bce107..e99ba30 100644
--- a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
@@ -928,6 +928,8 @@
         LOG(DEBUG) << __func__;
         return "";
     }
+    const std::vector<int8_t>& getData() const { return mData; }
+    void fillData(int8_t filler) { std::fill(mData.begin(), mData.end(), filler); }
     std::optional<StreamDescriptor::Command> maybeGetNextCommand(int* actualSize = nullptr) {
         TransitionTrigger trigger = mDriver->getNextTrigger(mData.size(), actualSize);
         if (StreamEventReceiver::Event* expEvent =
@@ -997,6 +999,8 @@
     StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver,
                       StreamEventReceiver* eventReceiver)
         : StreamCommonLogic(context, driver, eventReceiver) {}
+    // Should only be called after the worker has joined.
+    const std::vector<int8_t>& getData() const { return StreamCommonLogic::getData(); }
 
   protected:
     Status cycle() override {
@@ -1062,6 +1066,7 @@
         if (const size_t readCount =
                     !isMmapped() ? getDataMQ()->availableToRead() : reply.fmqByteCount;
             readCount > 0) {
+            fillData(-1);
             if (isMmapped() ? readDataFromMmap(readCount) : readDataFromMQ(readCount)) {
                 goto checkAcceptedReply;
             }
@@ -1083,6 +1088,8 @@
     StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver,
                       StreamEventReceiver* eventReceiver)
         : StreamCommonLogic(context, driver, eventReceiver) {}
+    // Should only be called after the worker has joined.
+    const std::vector<int8_t>& getData() const { return StreamCommonLogic::getData(); }
 
   protected:
     Status cycle() override {
@@ -1099,6 +1106,14 @@
             return Status::ABORT;
         }
         if (actualSize != 0) {
+            if (command.getTag() == StreamDescriptor::Command::burst) {
+                fillData(mBurstIteration);
+                if (mBurstIteration < std::numeric_limits<int8_t>::max()) {
+                    mBurstIteration++;
+                } else {
+                    mBurstIteration = 0;
+                }
+            }
             if (isMmapped() ? !writeDataToMmap() : !writeDataToMQ()) {
                 return Status::ABORT;
             }
@@ -1157,6 +1172,9 @@
         LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
         return Status::ABORT;
     }
+
+  private:
+    int8_t mBurstIteration = 1;
 };
 using StreamWriter = StreamWorker<StreamWriterLogic>;
 
@@ -2849,10 +2867,12 @@
         ASSERT_NO_FATAL_FAILURE(mStream->SetUpStream(module, getMinimumStreamBufferSizeFrames()));
     }
 
-    void SetUpStreamForDevicePort(IModule* module, ModuleConfig* moduleConfig,
-                                  const AudioPort& devicePort, bool connectedOnly = false) {
-        ASSERT_NO_FATAL_FAILURE(
-                SetUpPortConfigForDevicePort(module, moduleConfig, devicePort, connectedOnly));
+    void SetUpStreamForDevicePort(
+            IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort,
+            bool connectedOnly = false,
+            const std::optional<AudioDeviceAddress>& connectionAddress = std::nullopt) {
+        ASSERT_NO_FATAL_FAILURE(SetUpPortConfigForDevicePort(module, moduleConfig, devicePort,
+                                                             connectedOnly, connectionAddress));
         if (!mSkipTestReason.empty()) return;
         ASSERT_NO_FATAL_FAILURE(SetUpStream(module));
     }
@@ -2888,6 +2908,23 @@
         if (!mSkipTestReason.empty()) return;
         ASSERT_NO_FATAL_FAILURE(SetUpStream(module));
     }
+    void SetUpStreamForNewMixPortConfig(IModule* module, ModuleConfig*,
+                                        const AudioPortConfig& existingMixPortConfig,
+                                        const AudioPortConfig& existingDevicePortConfig) {
+        auto mixPortConfig = existingMixPortConfig;
+        mixPortConfig.id = 0;
+        mMixPortConfig = std::make_unique<WithAudioPortConfig>(mixPortConfig);
+        ASSERT_NO_FATAL_FAILURE(mMixPortConfig->SetUp(module));
+        mDevicePortConfig = std::make_unique<WithAudioPortConfig>(existingDevicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(mDevicePortConfig->SetUp(module));
+        mDevice = existingDevicePortConfig.ext.get<AudioPortExt::device>().device;
+        mPatch = std::make_unique<WithAudioPatch>(mIsInput, mMixPortConfig->get(),
+                                                  mDevicePortConfig->get());
+        ASSERT_NO_FATAL_FAILURE(mPatch->SetUp(module));
+        mStream = std::make_unique<WithStream<Stream>>(mMixPortConfig->get());
+        ASSERT_NO_FATAL_FAILURE(mStream->SetUpPortConfig(module));
+        ASSERT_NO_FATAL_FAILURE(SetUpStream(module));
+    }
     void SetUpPatchForMixPortConfig(IModule* module, ModuleConfig* moduleConfig,
                                     const AudioPortConfig& mixPortConfig) {
         constexpr bool connectedOnly = true;
@@ -2919,6 +2956,7 @@
     }
 
     const AudioDevice& getDevice() const { return mDevice; }
+    const AudioPortConfig& getDevicePortConfig() const { return mDevicePortConfig->get(); }
     int32_t getMinimumStreamBufferSizeFrames() const {
         return mPatch->getMinimumStreamBufferSizeFrames();
     }
@@ -2934,7 +2972,8 @@
   private:
     void SetUpDevicePort(IModule* module, ModuleConfig* moduleConfig,
                          const std::set<int32_t>& devicePortIds, bool connectedOnly,
-                         std::optional<AudioPort>* connectedDevicePort) {
+                         std::optional<AudioPort>* connectedDevicePort,
+                         const std::optional<AudioDeviceAddress>& connectionAddress) {
         const auto attachedDevicePorts = moduleConfig->getAttachedDevicePorts();
         if (auto it = findAny<AudioPort>(attachedDevicePorts, devicePortIds);
             it != attachedDevicePorts.end()) {
@@ -2951,7 +2990,12 @@
             const auto externalDevicePorts = moduleConfig->getExternalDevicePorts();
             if (auto it = findAny<AudioPort>(externalDevicePorts, devicePortIds);
                 it != externalDevicePorts.end()) {
-                AudioPort portWithData = GenerateUniqueDeviceAddress(*it);
+                AudioPort portWithData = *it;
+                if (connectionAddress.has_value()) {
+                    portWithData.ext.get<AudioPortExt::Tag::device>().device.address =
+                            *connectionAddress;
+                }
+                portWithData = GenerateUniqueDeviceAddress(portWithData);
                 mPortConnected = std::make_unique<WithDevicePortConnectedState>(portWithData);
                 ASSERT_NO_FATAL_FAILURE(mPortConnected->SetUp(module, moduleConfig));
                 *connectedDevicePort = mPortConnected->get();
@@ -2970,9 +3014,9 @@
             LOG(DEBUG) << __func__ << ": " << mSkipTestReason;
             return;
         };
-        ASSERT_NO_FATAL_FAILURE(SetUpDevicePort(module, moduleConfig,
-                                                extractIds<AudioPort>(devicePorts), connectedOnly,
-                                                connectedDevicePort));
+        ASSERT_NO_FATAL_FAILURE(SetUpDevicePort(
+                module, moduleConfig, extractIds<AudioPort>(devicePorts), connectedOnly,
+                connectedDevicePort, std::nullopt /*connectionAddress*/));
         if (!connectedDevicePort->has_value()) {
             mSkipTestReason = std::string("Unable to find a device port pair for mix port id ")
                                       .append(std::to_string(mixPort.id));
@@ -2980,11 +3024,14 @@
             return;
         }
     }
-    void SetUpPortConfigForDevicePort(IModule* module, ModuleConfig* moduleConfig,
-                                      const AudioPort& devicePort, bool connectedOnly) {
+    void SetUpPortConfigForDevicePort(
+            IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort,
+            bool connectedOnly,
+            const std::optional<AudioDeviceAddress>& connectionAddress = std::nullopt) {
         std::optional<AudioPort> connectedDevicePort;
         ASSERT_NO_FATAL_FAILURE(SetUpDevicePort(module, moduleConfig, {devicePort.id},
-                                                connectedOnly, &connectedDevicePort));
+                                                connectedOnly, &connectedDevicePort,
+                                                connectionAddress));
         if (!connectedDevicePort.has_value()) {
             mSkipTestReason = std::string("Device port id ")
                                       .append(std::to_string(devicePort.id))
@@ -3125,7 +3172,8 @@
 };
 
 // Defined later together with state transition sequences.
-std::shared_ptr<StateSequence> makeBurstCommands(bool isSync);
+std::shared_ptr<StateSequence> makeBurstCommands(bool isSync, size_t burstCount = 10,
+                                                 bool standbyInputWhenDone = false);
 
 // Certain types of ports can not be used without special preconditions.
 static bool skipStreamIoTestForMixPortConfig(const AudioPortConfig& portConfig) {
@@ -3150,10 +3198,11 @@
   public:
     explicit StreamFixtureWithWorker(bool isSync) : mIsSync(isSync) {}
 
-    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort) {
+    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort,
+               const std::optional<AudioDeviceAddress>& connectionAddress = std::nullopt) {
         mStream = std::make_unique<StreamFixture<Stream>>();
-        ASSERT_NO_FATAL_FAILURE(
-                mStream->SetUpStreamForDevicePort(module, moduleConfig, devicePort));
+        ASSERT_NO_FATAL_FAILURE(mStream->SetUpStreamForDevicePort(
+                module, moduleConfig, devicePort, false /*connectedOnly*/, connectionAddress));
         MaybeSetSkipTestReason();
     }
 
@@ -3165,26 +3214,42 @@
         MaybeSetSkipTestReason();
     }
 
-    void SendBurstCommands(bool validatePosition = true) {
-        ASSERT_NO_FATAL_FAILURE(StartWorkerToSendBurstCommands());
+    void SetUp(IModule* module, ModuleConfig* moduleConfig,
+               const AudioPortConfig& existingMixPortConfig,
+               const AudioPortConfig& existingDevicePortConfig) {
+        mStream = std::make_unique<StreamFixture<Stream>>();
+        ASSERT_NO_FATAL_FAILURE(mStream->SetUpStreamForNewMixPortConfig(
+                module, moduleConfig, existingMixPortConfig, existingDevicePortConfig));
+        MaybeSetSkipTestReason();
+    }
+
+    void SendBurstCommands(bool validatePosition = true, size_t burstCount = 10,
+                           bool standbyInputWhenDone = false) {
+        ASSERT_NO_FATAL_FAILURE(StartWorkerToSendBurstCommands(burstCount, standbyInputWhenDone));
         ASSERT_NO_FATAL_FAILURE(JoinWorkerAfterBurstCommands(validatePosition));
     }
 
-    void StartWorkerToSendBurstCommands() {
+    void StartWorkerToSendBurstCommands(size_t burstCount = 10, bool standbyInputWhenDone = false) {
+        if (!IOTraits<Stream>::is_input) {
+            ASSERT_FALSE(standbyInputWhenDone) << "Only supported for input";
+        }
         const StreamContext* context = mStream->getStreamContext();
         mWorkerDriver = std::make_unique<StreamLogicDefaultDriver>(
-                makeBurstCommands(mIsSync), context->getFrameSizeBytes(), context->isMmapped());
+                makeBurstCommands(mIsSync, burstCount, standbyInputWhenDone),
+                context->getFrameSizeBytes(), context->isMmapped());
         mWorker = std::make_unique<typename IOTraits<Stream>::Worker>(
                 *context, mWorkerDriver.get(), mStream->getStreamEventReceiver());
         LOG(DEBUG) << __func__ << ": starting " << IOTraits<Stream>::directionStr << " worker...";
         ASSERT_TRUE(mWorker->start());
     }
 
-    void JoinWorkerAfterBurstCommands(bool validatePosition = true) {
-        // Must call 'prepareToClose' before attempting to join because the stream may be stuck.
-        std::shared_ptr<IStreamCommon> common;
-        ASSERT_IS_OK(mStream->getStream()->getStreamCommon(&common));
-        ASSERT_IS_OK(common->prepareToClose());
+    void JoinWorkerAfterBurstCommands(bool validatePosition = true,
+                                      bool callPrepareToClose = true) {
+        if (callPrepareToClose) {
+            std::shared_ptr<IStreamCommon> common;
+            ASSERT_IS_OK(mStream->getStream()->getStreamCommon(&common));
+            ASSERT_IS_OK(common->prepareToClose());
+        }
         LOG(DEBUG) << __func__ << ": joining " << IOTraits<Stream>::directionStr << " worker...";
         mWorker->join();
         EXPECT_FALSE(mWorker->hasError()) << mWorker->getError();
@@ -3195,6 +3260,7 @@
             EXPECT_FALSE(mWorkerDriver->hasObservableRetrogradePosition());
             EXPECT_FALSE(mWorkerDriver->hasHardwareRetrogradePosition());
         }
+        mLastData = mWorker->getData();
         mWorker.reset();
         mWorkerDriver.reset();
     }
@@ -3202,6 +3268,9 @@
     void TeardownPatch() { mStream->TeardownPatch(); }
 
     const AudioDevice& getDevice() const { return mStream->getDevice(); }
+    const AudioPortConfig& getDevicePortConfig() const { return mStream->getDevicePortConfig(); }
+    const std::vector<int8_t>& getLastData() const { return mLastData; }
+    const AudioPortConfig& getPortConfig() const { return mStream->getPortConfig(); }
     Stream* getStream() const { return mStream->getStream(); }
     std::string skipTestReason() const {
         return !mSkipTestReason.empty() ? mSkipTestReason : mStream->skipTestReason();
@@ -3219,6 +3288,7 @@
     std::unique_ptr<StreamFixture<Stream>> mStream;
     std::unique_ptr<StreamLogicDefaultDriver> mWorkerDriver;
     std::unique_ptr<typename IOTraits<Stream>::Worker> mWorker;
+    std::vector<int8_t> mLastData;
 };
 
 template <typename Stream>
@@ -4562,15 +4632,20 @@
 
 // TODO: Add async test cases for input once it is implemented.
 
-std::shared_ptr<StateSequence> makeBurstCommands(bool isSync) {
+std::shared_ptr<StateSequence> makeBurstCommands(bool isSync, size_t burstCount,
+                                                 bool standbyInputWhenDone) {
     using State = StreamDescriptor::State;
     auto d = std::make_unique<StateDag>();
-    StateDag::Node last = d->makeFinalNode(State::ACTIVE);
+    StateDag::Node active = d->makeFinalNode(State::ACTIVE);
+    StateDag::Node paused = d->makeNodes({std::make_pair(State::ACTIVE, kPauseCommand),
+                                          std::make_pair(State::PAUSED, kFlushCommand)},
+                                         State::STANDBY);
+    StateDag::Node& last = standbyInputWhenDone ? paused : active;
     if (isSync) {
         StateDag::Node idle = d->makeNode(
                 State::IDLE, kBurstCommand,
                 // Use several bursts to ensure that the driver starts reporting the position.
-                d->makeNodes(State::ACTIVE, kBurstCommand, 10, last));
+                d->makeNodes(State::ACTIVE, kBurstCommand, burstCount, last));
         d->makeNode(State::STANDBY, kStartCommand, idle);
     } else {
         StateDag::Node active2 = d->makeNode(State::ACTIVE, kBurstCommand, last);
@@ -4935,49 +5010,69 @@
   public:
     WithRemoteSubmix() : mStream(true /*isSync*/) {}
     explicit WithRemoteSubmix(AudioDeviceAddress address)
-        : mStream(true /*isSync*/), mAddress(address) {}
+        : mStream(true /*isSync*/), mAddress(address) {
+        LOG(DEBUG) << __func__ << ": Creating " << IOTraits<Stream>::directionStr
+                   << " stream for: " << mAddress.value_or(AudioDeviceAddress{}).toString();
+    }
     WithRemoteSubmix(const WithRemoteSubmix&) = delete;
     WithRemoteSubmix& operator=(const WithRemoteSubmix&) = delete;
+    ~WithRemoteSubmix() {
+        LOG(DEBUG) << __func__ << ": Deleting " << IOTraits<Stream>::directionStr
+                   << " stream for: " << mAddress.value_or(AudioDeviceAddress{}).toString();
+    }
 
-    static std::optional<AudioPort> getRemoteSubmixAudioPort(
-            ModuleConfig* moduleConfig,
-            const std::optional<AudioDeviceAddress>& address = std::nullopt) {
+    static std::optional<AudioPort> getRemoteSubmixAudioPort(ModuleConfig* moduleConfig) {
         auto ports =
                 moduleConfig->getRemoteSubmixPorts(IOTraits<Stream>::is_input, true /*singlePort*/);
         if (ports.empty()) return {};
-        AudioPort port = ports.front();
-        if (address) {
-            port.ext.template get<AudioPortExt::Tag::device>().device.address = address.value();
-        }
-        return port;
+        return ports.front();
     }
 
     void SetUp(IModule* module, ModuleConfig* moduleConfig) {
-        auto devicePort = getRemoteSubmixAudioPort(moduleConfig, mAddress);
+        auto devicePort = getRemoteSubmixAudioPort(moduleConfig);
         ASSERT_TRUE(devicePort.has_value()) << "Device port for remote submix device not found";
-        ASSERT_NO_FATAL_FAILURE(mStream.SetUp(module, moduleConfig, *devicePort));
+        ASSERT_NO_FATAL_FAILURE(mStream.SetUp(module, moduleConfig, *devicePort, mAddress));
         mAddress = mStream.getDevice().address;
     }
-
-    void StartWorkerToSendBurstCommands() {
-        ASSERT_NO_FATAL_FAILURE(mStream.StartWorkerToSendBurstCommands());
+    void SetUp(IModule* module, ModuleConfig* moduleConfig,
+               const AudioPortConfig& existingMixPortConfig,
+               const AudioPortConfig& existingDevicePortConfig) {
+        ASSERT_NO_FATAL_FAILURE(mStream.SetUp(module, moduleConfig, existingMixPortConfig,
+                                              existingDevicePortConfig));
+        mAddress = mStream.getDevice().address;
+    }
+    void StartWorkerToSendBurstCommands(size_t burstCount = 10, bool standbyInputWhenDone = false) {
+        ASSERT_NO_FATAL_FAILURE(
+                mStream.StartWorkerToSendBurstCommands(burstCount, standbyInputWhenDone));
     }
 
-    void JoinWorkerAfterBurstCommands() {
-        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands());
+    void JoinWorkerAfterBurstCommands(bool callPrepareToCloseBeforeJoin) {
+        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands(
+                true /*validatePositionIncrease*/, callPrepareToCloseBeforeJoin));
     }
 
-    void SendBurstCommands() {
-        ASSERT_NO_FATAL_FAILURE(mStream.StartWorkerToSendBurstCommands());
-        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands());
+    void JoinWorkerAfterBurstCommands(bool validatePositionIncrease,
+                                      bool callPrepareToCloseBeforeJoin) {
+        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands(validatePositionIncrease,
+                                                                     callPrepareToCloseBeforeJoin));
+    }
+
+    void SendBurstCommands(bool callPrepareToCloseBeforeJoin, size_t burstCount = 10,
+                           bool standbyInputWhenDone = false) {
+        ASSERT_NO_FATAL_FAILURE(StartWorkerToSendBurstCommands(burstCount, standbyInputWhenDone));
+        // When 'burstCount == 0', there is no "previous" frame count, thus the check for
+        // the position increase fails.
+        ASSERT_NO_FATAL_FAILURE(JoinWorkerAfterBurstCommands(
+                burstCount > 0 /*validatePositionIncrease*/, callPrepareToCloseBeforeJoin));
     }
 
     std::optional<AudioDeviceAddress> getAudioDeviceAddress() const { return mAddress; }
+    const AudioPortConfig& getDevicePortConfig() const { return mStream.getDevicePortConfig(); }
+    int8_t getLastBurstIteration() const { return mStream.getLastData()[0]; }
+    const AudioPortConfig& getPortConfig() const { return mStream.getPortConfig(); }
     std::string skipTestReason() const { return mStream.skipTestReason(); }
 
   private:
-    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort) {}
-
     StreamFixtureWithWorker<Stream> mStream;
     std::optional<AudioDeviceAddress> mAddress;
 };
@@ -4993,77 +5088,130 @@
         }
         ASSERT_NO_FATAL_FAILURE(SetUpModuleConfig());
     }
+
+    void TearDown() override {
+        streamIn.reset();
+        streamOut.reset();
+    }
+
+    void CreateOutputStream() {
+        streamOut = std::make_unique<WithRemoteSubmix<IStreamOut>>();
+        ASSERT_NO_FATAL_FAILURE(streamOut->SetUp(module.get(), moduleConfig.get()));
+        // Note: any issue with connection attempts is considered as a problem.
+        ASSERT_EQ("", streamOut->skipTestReason());
+        ASSERT_TRUE(streamOut->getAudioDeviceAddress().has_value());
+    }
+
+    void CreateInputStream(const std::optional<AudioDeviceAddress>& address = std::nullopt) {
+        if (address.has_value()) {
+            streamIn = std::make_unique<WithRemoteSubmix<IStreamIn>>(address.value());
+        } else {
+            ASSERT_TRUE(streamOut->getAudioDeviceAddress().has_value());
+            streamIn = std::make_unique<WithRemoteSubmix<IStreamIn>>(
+                    streamOut->getAudioDeviceAddress().value());
+        }
+        ASSERT_NO_FATAL_FAILURE(streamIn->SetUp(module.get(), moduleConfig.get()));
+        ASSERT_EQ("", streamIn->skipTestReason());
+        auto inAddress = streamIn->getAudioDeviceAddress();
+        ASSERT_TRUE(inAddress.has_value());
+        if (address.has_value()) {
+            if (address.value() != AudioDeviceAddress{}) {
+                ASSERT_EQ(address.value(), inAddress.value());
+            }
+        } else {
+            ASSERT_EQ(streamOut->getAudioDeviceAddress().value(), inAddress.value());
+        }
+    }
+
+    std::unique_ptr<WithRemoteSubmix<IStreamOut>> streamOut;
+    std::unique_ptr<WithRemoteSubmix<IStreamIn>> streamIn;
 };
 
 TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenNoInput) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    // Note: here and in other tests any issue with connection attempts is considered as a problem.
-    ASSERT_EQ("", streamOut.skipTestReason());
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
 }
 
-TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputStuck) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamOut.skipTestReason());
-    auto address = streamOut.getAudioDeviceAddress();
-    ASSERT_TRUE(address.has_value());
+TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputInStandby) {
+    if (int32_t version; module->getInterfaceVersion(&version).isOk() && version < 3) {
+        GTEST_SKIP() << "Default remote submix implementation <V3 could not pass this test";
+    }
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    // Send just 1 burst command. This triggers the condition "input is in standby after
+    // being active." The output must flush the fifo before writing to avoid being blocked.
+    ASSERT_NO_FATAL_FAILURE(
+            streamIn->StartWorkerToSendBurstCommands(1, true /*stanbyInputWhenDone*/));
+    // The output must be able to close without shutting down the pipe first (due to a call
+    // to 'prepareToClose').
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+    ASSERT_NO_FATAL_FAILURE(
+            streamIn->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+}
 
-    WithRemoteSubmix<IStreamIn> streamIn(address.value());
-    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamIn.skipTestReason());
+TEST_P(AudioModuleRemoteSubmix, BlockedOutputUnblocksOnClose) {
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    // Send just 3 burst command, but do not enter standby. This is a stalled input.
+    ASSERT_NO_FATAL_FAILURE(streamIn->StartWorkerToSendBurstCommands(3));
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(true /*callPrepareToCloseBeforeJoin*/));
+    ASSERT_NO_FATAL_FAILURE(
+            streamIn->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+}
 
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommands());
+TEST_P(AudioModuleRemoteSubmix, OutputBlocksUntilInputStarts) {
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    // Read the head of the pipe and check that it starts with the first output burst, that is,
+    // the contents of the very first write has not been superseded due to pipe overflow.
+    // The burstCount is '0' because the very first burst is used to exit from the 'IDLE' state,
+    // see 'makeBurstCommands'.
+    ASSERT_NO_FATAL_FAILURE(streamIn->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/, 0,
+                                                        true /*standbyInputWhenDone*/));
+    EXPECT_EQ(1, streamIn->getLastBurstIteration());
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(true /*callPrepareToCloseBeforeJoin*/));
 }
 
 TEST_P(AudioModuleRemoteSubmix, OutputAndInput) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamOut.skipTestReason());
-    auto address = streamOut.getAudioDeviceAddress();
-    ASSERT_TRUE(address.has_value());
-
-    WithRemoteSubmix<IStreamIn> streamIn(address.value());
-    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamIn.skipTestReason());
-
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
     // Start writing into the output stream.
-    ASSERT_NO_FATAL_FAILURE(streamOut.StartWorkerToSendBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
     // Simultaneously, read from the input stream.
-    ASSERT_NO_FATAL_FAILURE(streamIn.SendBurstCommands());
-    ASSERT_NO_FATAL_FAILURE(streamOut.JoinWorkerAfterBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(streamIn->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
 }
 
 TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamOut.skipTestReason());
-    auto address = streamOut.getAudioDeviceAddress();
-    ASSERT_TRUE(address.has_value());
-
-    const size_t streamInCount = 3;
-    std::vector<std::unique_ptr<WithRemoteSubmix<IStreamIn>>> streamIns(streamInCount);
-    for (size_t i = 0; i < streamInCount; i++) {
-        streamIns[i] = std::make_unique<WithRemoteSubmix<IStreamIn>>(address.value());
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SetUp(module.get(), moduleConfig.get()));
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(streamIn->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/, 1,
+                                                        true /*standbyInputWhenDone*/));
+    // For the new stream, only create a new mix port config and a new patch.
+    const size_t extraStreamInCount = 2;
+    std::vector<std::unique_ptr<WithRemoteSubmix<IStreamIn>>> streamIns(extraStreamInCount);
+    for (size_t i = 0; i < extraStreamInCount; i++) {
+        streamIns[i] = std::make_unique<WithRemoteSubmix<IStreamIn>>();
+        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SetUp(module.get(), moduleConfig.get(),
+                                                    streamIn->getPortConfig(),
+                                                    streamIn->getDevicePortConfig()));
         ASSERT_EQ("", streamIns[i]->skipTestReason());
+        const auto inAddress = streamIns[i]->getAudioDeviceAddress();
+        ASSERT_TRUE(inAddress.has_value());
+        ASSERT_EQ(streamOut->getAudioDeviceAddress().value(), inAddress.value());
+        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SendBurstCommands(
+                false /*callPrepareToCloseBeforeJoin*/, 1, true /*standbyInputWhenDone*/));
     }
-    // Start writing into the output stream.
-    ASSERT_NO_FATAL_FAILURE(streamOut.StartWorkerToSendBurstCommands());
-    // Simultaneously, read from input streams.
-    for (size_t i = 0; i < streamInCount; i++) {
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->StartWorkerToSendBurstCommands());
-    }
-    for (size_t i = 0; i < streamInCount; i++) {
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->JoinWorkerAfterBurstCommands());
-    }
-    ASSERT_NO_FATAL_FAILURE(streamOut.JoinWorkerAfterBurstCommands());
-    // Clean up input streams in the reverse order because the device connection is owned
-    // by the first one.
-    for (size_t i = streamInCount; i != 0; --i) {
-        streamIns[i - 1].reset();
-    }
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
 }
 
 INSTANTIATE_TEST_SUITE_P(AudioModuleRemoteSubmixTest, AudioModuleRemoteSubmix,