Merge "audio: Fix output blocking condition in the remote submix" into main am: 759e587ae5

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/3433644

Change-Id: I50b41a79f4ae25a04ae714a46b23d02cb480ac31
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
index ea59771..cef0ea6 100644
--- a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
+++ b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp
@@ -77,17 +77,15 @@
 }
 
 ::android::status_t StreamRemoteSubmix::drain(StreamDescriptor::DrainMode) {
-    usleep(1000);
     return ::android::OK;
 }
 
 ::android::status_t StreamRemoteSubmix::flush() {
-    usleep(1000);
-    return ::android::OK;
+    // TODO(b/372951987): consider if this needs to be done from 'StreamInWorkerLogic::cycle'.
+    return mIsInput ? standby() : ::android::OK;
 }
 
 ::android::status_t StreamRemoteSubmix::pause() {
-    usleep(1000);
     return ::android::OK;
 }
 
diff --git a/audio/aidl/default/r_submix/SubmixRoute.cpp b/audio/aidl/default/r_submix/SubmixRoute.cpp
index 325a012..445b1d3 100644
--- a/audio/aidl/default/r_submix/SubmixRoute.cpp
+++ b/audio/aidl/default/r_submix/SubmixRoute.cpp
@@ -134,10 +134,10 @@
 // - the peer input is in standby AFTER having been active.
 // We DO block if:
 // - the input was never activated to avoid discarding first frames in the pipe in case capture
-// start was delayed
+//   start was delayed
 bool SubmixRoute::shouldBlockWrite() {
     std::lock_guard guard(mLock);
-    return (mStreamInOpen || (mStreamInStandby && (mReadCounterFrames != 0)));
+    return mStreamInOpen && (!mStreamInStandby || mReadCounterFrames == 0);
 }
 
 long SubmixRoute::updateReadCounterFrames(size_t frameCount) {
diff --git a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
index 93c2a61..750e54d 100644
--- a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
@@ -938,6 +938,8 @@
         LOG(DEBUG) << __func__;
         return "";
     }
+    const std::vector<int8_t>& getData() const { return mData; }
+    void fillData(int8_t filler) { std::fill(mData.begin(), mData.end(), filler); }
     std::optional<StreamDescriptor::Command> maybeGetNextCommand(int* actualSize = nullptr) {
         TransitionTrigger trigger = mDriver->getNextTrigger(mData.size(), actualSize);
         if (StreamEventReceiver::Event* expEvent =
@@ -1007,6 +1009,8 @@
     StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver,
                       StreamEventReceiver* eventReceiver)
         : StreamCommonLogic(context, driver, eventReceiver) {}
+    // Should only be called after the worker has joined.
+    const std::vector<int8_t>& getData() const { return StreamCommonLogic::getData(); }
 
   protected:
     Status cycle() override {
@@ -1072,6 +1076,7 @@
         if (const size_t readCount =
                     !isMmapped() ? getDataMQ()->availableToRead() : reply.fmqByteCount;
             readCount > 0) {
+            fillData(-1);
             if (isMmapped() ? readDataFromMmap(readCount) : readDataFromMQ(readCount)) {
                 goto checkAcceptedReply;
             }
@@ -1093,6 +1098,8 @@
     StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver,
                       StreamEventReceiver* eventReceiver)
         : StreamCommonLogic(context, driver, eventReceiver) {}
+    // Should only be called after the worker has joined.
+    const std::vector<int8_t>& getData() const { return StreamCommonLogic::getData(); }
 
   protected:
     Status cycle() override {
@@ -1109,6 +1116,14 @@
             return Status::ABORT;
         }
         if (actualSize != 0) {
+            if (command.getTag() == StreamDescriptor::Command::burst) {
+                fillData(mBurstIteration);
+                if (mBurstIteration < std::numeric_limits<int8_t>::max()) {
+                    mBurstIteration++;
+                } else {
+                    mBurstIteration = 0;
+                }
+            }
             if (isMmapped() ? !writeDataToMmap() : !writeDataToMQ()) {
                 return Status::ABORT;
             }
@@ -1167,6 +1182,9 @@
         LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
         return Status::ABORT;
     }
+
+  private:
+    int8_t mBurstIteration = 1;
 };
 using StreamWriter = StreamWorker<StreamWriterLogic>;
 
@@ -2859,10 +2877,12 @@
         ASSERT_NO_FATAL_FAILURE(mStream->SetUpStream(module, getMinimumStreamBufferSizeFrames()));
     }
 
-    void SetUpStreamForDevicePort(IModule* module, ModuleConfig* moduleConfig,
-                                  const AudioPort& devicePort, bool connectedOnly = false) {
-        ASSERT_NO_FATAL_FAILURE(
-                SetUpPortConfigForDevicePort(module, moduleConfig, devicePort, connectedOnly));
+    void SetUpStreamForDevicePort(
+            IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort,
+            bool connectedOnly = false,
+            const std::optional<AudioDeviceAddress>& connectionAddress = std::nullopt) {
+        ASSERT_NO_FATAL_FAILURE(SetUpPortConfigForDevicePort(module, moduleConfig, devicePort,
+                                                             connectedOnly, connectionAddress));
         if (!mSkipTestReason.empty()) return;
         ASSERT_NO_FATAL_FAILURE(SetUpStream(module));
     }
@@ -2898,6 +2918,23 @@
         if (!mSkipTestReason.empty()) return;
         ASSERT_NO_FATAL_FAILURE(SetUpStream(module));
     }
+    void SetUpStreamForNewMixPortConfig(IModule* module, ModuleConfig*,
+                                        const AudioPortConfig& existingMixPortConfig,
+                                        const AudioPortConfig& existingDevicePortConfig) {
+        auto mixPortConfig = existingMixPortConfig;
+        mixPortConfig.id = 0;
+        mMixPortConfig = std::make_unique<WithAudioPortConfig>(mixPortConfig);
+        ASSERT_NO_FATAL_FAILURE(mMixPortConfig->SetUp(module));
+        mDevicePortConfig = std::make_unique<WithAudioPortConfig>(existingDevicePortConfig);
+        ASSERT_NO_FATAL_FAILURE(mDevicePortConfig->SetUp(module));
+        mDevice = existingDevicePortConfig.ext.get<AudioPortExt::device>().device;
+        mPatch = std::make_unique<WithAudioPatch>(mIsInput, mMixPortConfig->get(),
+                                                  mDevicePortConfig->get());
+        ASSERT_NO_FATAL_FAILURE(mPatch->SetUp(module));
+        mStream = std::make_unique<WithStream<Stream>>(mMixPortConfig->get());
+        ASSERT_NO_FATAL_FAILURE(mStream->SetUpPortConfig(module));
+        ASSERT_NO_FATAL_FAILURE(SetUpStream(module));
+    }
     void SetUpPatchForMixPortConfig(IModule* module, ModuleConfig* moduleConfig,
                                     const AudioPortConfig& mixPortConfig) {
         constexpr bool connectedOnly = true;
@@ -2929,6 +2966,7 @@
     }
 
     const AudioDevice& getDevice() const { return mDevice; }
+    const AudioPortConfig& getDevicePortConfig() const { return mDevicePortConfig->get(); }
     int32_t getMinimumStreamBufferSizeFrames() const {
         return mPatch->getMinimumStreamBufferSizeFrames();
     }
@@ -2944,7 +2982,8 @@
   private:
     void SetUpDevicePort(IModule* module, ModuleConfig* moduleConfig,
                          const std::set<int32_t>& devicePortIds, bool connectedOnly,
-                         std::optional<AudioPort>* connectedDevicePort) {
+                         std::optional<AudioPort>* connectedDevicePort,
+                         const std::optional<AudioDeviceAddress>& connectionAddress) {
         const auto attachedDevicePorts = moduleConfig->getAttachedDevicePorts();
         if (auto it = findAny<AudioPort>(attachedDevicePorts, devicePortIds);
             it != attachedDevicePorts.end()) {
@@ -2961,7 +3000,12 @@
             const auto externalDevicePorts = moduleConfig->getExternalDevicePorts();
             if (auto it = findAny<AudioPort>(externalDevicePorts, devicePortIds);
                 it != externalDevicePorts.end()) {
-                AudioPort portWithData = GenerateUniqueDeviceAddress(*it);
+                AudioPort portWithData = *it;
+                if (connectionAddress.has_value()) {
+                    portWithData.ext.get<AudioPortExt::Tag::device>().device.address =
+                            *connectionAddress;
+                }
+                portWithData = GenerateUniqueDeviceAddress(portWithData);
                 mPortConnected = std::make_unique<WithDevicePortConnectedState>(portWithData);
                 ASSERT_NO_FATAL_FAILURE(mPortConnected->SetUp(module, moduleConfig));
                 *connectedDevicePort = mPortConnected->get();
@@ -2980,9 +3024,9 @@
             LOG(DEBUG) << __func__ << ": " << mSkipTestReason;
             return;
         };
-        ASSERT_NO_FATAL_FAILURE(SetUpDevicePort(module, moduleConfig,
-                                                extractIds<AudioPort>(devicePorts), connectedOnly,
-                                                connectedDevicePort));
+        ASSERT_NO_FATAL_FAILURE(SetUpDevicePort(
+                module, moduleConfig, extractIds<AudioPort>(devicePorts), connectedOnly,
+                connectedDevicePort, std::nullopt /*connectionAddress*/));
         if (!connectedDevicePort->has_value()) {
             mSkipTestReason = std::string("Unable to find a device port pair for mix port id ")
                                       .append(std::to_string(mixPort.id));
@@ -2990,11 +3034,14 @@
             return;
         }
     }
-    void SetUpPortConfigForDevicePort(IModule* module, ModuleConfig* moduleConfig,
-                                      const AudioPort& devicePort, bool connectedOnly) {
+    void SetUpPortConfigForDevicePort(
+            IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort,
+            bool connectedOnly,
+            const std::optional<AudioDeviceAddress>& connectionAddress = std::nullopt) {
         std::optional<AudioPort> connectedDevicePort;
         ASSERT_NO_FATAL_FAILURE(SetUpDevicePort(module, moduleConfig, {devicePort.id},
-                                                connectedOnly, &connectedDevicePort));
+                                                connectedOnly, &connectedDevicePort,
+                                                connectionAddress));
         if (!connectedDevicePort.has_value()) {
             mSkipTestReason = std::string("Device port id ")
                                       .append(std::to_string(devicePort.id))
@@ -3135,7 +3182,8 @@
 };
 
 // Defined later together with state transition sequences.
-std::shared_ptr<StateSequence> makeBurstCommands(bool isSync);
+std::shared_ptr<StateSequence> makeBurstCommands(bool isSync, size_t burstCount = 10,
+                                                 bool standbyInputWhenDone = false);
 
 // Certain types of ports can not be used without special preconditions.
 static bool skipStreamIoTestForMixPortConfig(const AudioPortConfig& portConfig) {
@@ -3160,10 +3208,11 @@
   public:
     explicit StreamFixtureWithWorker(bool isSync) : mIsSync(isSync) {}
 
-    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort) {
+    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort,
+               const std::optional<AudioDeviceAddress>& connectionAddress = std::nullopt) {
         mStream = std::make_unique<StreamFixture<Stream>>();
-        ASSERT_NO_FATAL_FAILURE(
-                mStream->SetUpStreamForDevicePort(module, moduleConfig, devicePort));
+        ASSERT_NO_FATAL_FAILURE(mStream->SetUpStreamForDevicePort(
+                module, moduleConfig, devicePort, false /*connectedOnly*/, connectionAddress));
         MaybeSetSkipTestReason();
     }
 
@@ -3175,26 +3224,42 @@
         MaybeSetSkipTestReason();
     }
 
-    void SendBurstCommands(bool validatePosition = true) {
-        ASSERT_NO_FATAL_FAILURE(StartWorkerToSendBurstCommands());
+    void SetUp(IModule* module, ModuleConfig* moduleConfig,
+               const AudioPortConfig& existingMixPortConfig,
+               const AudioPortConfig& existingDevicePortConfig) {
+        mStream = std::make_unique<StreamFixture<Stream>>();
+        ASSERT_NO_FATAL_FAILURE(mStream->SetUpStreamForNewMixPortConfig(
+                module, moduleConfig, existingMixPortConfig, existingDevicePortConfig));
+        MaybeSetSkipTestReason();
+    }
+
+    void SendBurstCommands(bool validatePosition = true, size_t burstCount = 10,
+                           bool standbyInputWhenDone = false) {
+        ASSERT_NO_FATAL_FAILURE(StartWorkerToSendBurstCommands(burstCount, standbyInputWhenDone));
         ASSERT_NO_FATAL_FAILURE(JoinWorkerAfterBurstCommands(validatePosition));
     }
 
-    void StartWorkerToSendBurstCommands() {
+    void StartWorkerToSendBurstCommands(size_t burstCount = 10, bool standbyInputWhenDone = false) {
+        if (!IOTraits<Stream>::is_input) {
+            ASSERT_FALSE(standbyInputWhenDone) << "Only supported for input";
+        }
         const StreamContext* context = mStream->getStreamContext();
         mWorkerDriver = std::make_unique<StreamLogicDefaultDriver>(
-                makeBurstCommands(mIsSync), context->getFrameSizeBytes(), context->isMmapped());
+                makeBurstCommands(mIsSync, burstCount, standbyInputWhenDone),
+                context->getFrameSizeBytes(), context->isMmapped());
         mWorker = std::make_unique<typename IOTraits<Stream>::Worker>(
                 *context, mWorkerDriver.get(), mStream->getStreamEventReceiver());
         LOG(DEBUG) << __func__ << ": starting " << IOTraits<Stream>::directionStr << " worker...";
         ASSERT_TRUE(mWorker->start());
     }
 
-    void JoinWorkerAfterBurstCommands(bool validatePosition = true) {
-        // Must call 'prepareToClose' before attempting to join because the stream may be stuck.
-        std::shared_ptr<IStreamCommon> common;
-        ASSERT_IS_OK(mStream->getStream()->getStreamCommon(&common));
-        ASSERT_IS_OK(common->prepareToClose());
+    void JoinWorkerAfterBurstCommands(bool validatePosition = true,
+                                      bool callPrepareToClose = true) {
+        if (callPrepareToClose) {
+            std::shared_ptr<IStreamCommon> common;
+            ASSERT_IS_OK(mStream->getStream()->getStreamCommon(&common));
+            ASSERT_IS_OK(common->prepareToClose());
+        }
         LOG(DEBUG) << __func__ << ": joining " << IOTraits<Stream>::directionStr << " worker...";
         mWorker->join();
         EXPECT_FALSE(mWorker->hasError()) << mWorker->getError();
@@ -3205,6 +3270,7 @@
             EXPECT_FALSE(mWorkerDriver->hasObservableRetrogradePosition());
             EXPECT_FALSE(mWorkerDriver->hasHardwareRetrogradePosition());
         }
+        mLastData = mWorker->getData();
         mWorker.reset();
         mWorkerDriver.reset();
     }
@@ -3212,6 +3278,9 @@
     void TeardownPatch() { mStream->TeardownPatch(); }
 
     const AudioDevice& getDevice() const { return mStream->getDevice(); }
+    const AudioPortConfig& getDevicePortConfig() const { return mStream->getDevicePortConfig(); }
+    const std::vector<int8_t>& getLastData() const { return mLastData; }
+    const AudioPortConfig& getPortConfig() const { return mStream->getPortConfig(); }
     Stream* getStream() const { return mStream->getStream(); }
     std::string skipTestReason() const {
         return !mSkipTestReason.empty() ? mSkipTestReason : mStream->skipTestReason();
@@ -3229,6 +3298,7 @@
     std::unique_ptr<StreamFixture<Stream>> mStream;
     std::unique_ptr<StreamLogicDefaultDriver> mWorkerDriver;
     std::unique_ptr<typename IOTraits<Stream>::Worker> mWorker;
+    std::vector<int8_t> mLastData;
 };
 
 template <typename Stream>
@@ -4576,15 +4646,20 @@
 
 // TODO: Add async test cases for input once it is implemented.
 
-std::shared_ptr<StateSequence> makeBurstCommands(bool isSync) {
+std::shared_ptr<StateSequence> makeBurstCommands(bool isSync, size_t burstCount,
+                                                 bool standbyInputWhenDone) {
     using State = StreamDescriptor::State;
     auto d = std::make_unique<StateDag>();
-    StateDag::Node last = d->makeFinalNode(State::ACTIVE);
+    StateDag::Node active = d->makeFinalNode(State::ACTIVE);
+    StateDag::Node paused = d->makeNodes({std::make_pair(State::ACTIVE, kPauseCommand),
+                                          std::make_pair(State::PAUSED, kFlushCommand)},
+                                         State::STANDBY);
+    StateDag::Node& last = standbyInputWhenDone ? paused : active;
     if (isSync) {
         StateDag::Node idle = d->makeNode(
                 State::IDLE, kBurstCommand,
                 // Use several bursts to ensure that the driver starts reporting the position.
-                d->makeNodes(State::ACTIVE, kBurstCommand, 10, last));
+                d->makeNodes(State::ACTIVE, kBurstCommand, burstCount, last));
         d->makeNode(State::STANDBY, kStartCommand, idle);
     } else {
         StateDag::Node active2 = d->makeNode(State::ACTIVE, kBurstCommand, last);
@@ -4949,49 +5024,69 @@
   public:
     WithRemoteSubmix() : mStream(true /*isSync*/) {}
     explicit WithRemoteSubmix(AudioDeviceAddress address)
-        : mStream(true /*isSync*/), mAddress(address) {}
+        : mStream(true /*isSync*/), mAddress(address) {
+        LOG(DEBUG) << __func__ << ": Creating " << IOTraits<Stream>::directionStr
+                   << " stream for: " << mAddress.value_or(AudioDeviceAddress{}).toString();
+    }
     WithRemoteSubmix(const WithRemoteSubmix&) = delete;
     WithRemoteSubmix& operator=(const WithRemoteSubmix&) = delete;
+    ~WithRemoteSubmix() {
+        LOG(DEBUG) << __func__ << ": Deleting " << IOTraits<Stream>::directionStr
+                   << " stream for: " << mAddress.value_or(AudioDeviceAddress{}).toString();
+    }
 
-    static std::optional<AudioPort> getRemoteSubmixAudioPort(
-            ModuleConfig* moduleConfig,
-            const std::optional<AudioDeviceAddress>& address = std::nullopt) {
+    static std::optional<AudioPort> getRemoteSubmixAudioPort(ModuleConfig* moduleConfig) {
         auto ports =
                 moduleConfig->getRemoteSubmixPorts(IOTraits<Stream>::is_input, true /*singlePort*/);
         if (ports.empty()) return {};
-        AudioPort port = ports.front();
-        if (address) {
-            port.ext.template get<AudioPortExt::Tag::device>().device.address = address.value();
-        }
-        return port;
+        return ports.front();
     }
 
     void SetUp(IModule* module, ModuleConfig* moduleConfig) {
-        auto devicePort = getRemoteSubmixAudioPort(moduleConfig, mAddress);
+        auto devicePort = getRemoteSubmixAudioPort(moduleConfig);
         ASSERT_TRUE(devicePort.has_value()) << "Device port for remote submix device not found";
-        ASSERT_NO_FATAL_FAILURE(mStream.SetUp(module, moduleConfig, *devicePort));
+        ASSERT_NO_FATAL_FAILURE(mStream.SetUp(module, moduleConfig, *devicePort, mAddress));
         mAddress = mStream.getDevice().address;
     }
-
-    void StartWorkerToSendBurstCommands() {
-        ASSERT_NO_FATAL_FAILURE(mStream.StartWorkerToSendBurstCommands());
+    void SetUp(IModule* module, ModuleConfig* moduleConfig,
+               const AudioPortConfig& existingMixPortConfig,
+               const AudioPortConfig& existingDevicePortConfig) {
+        ASSERT_NO_FATAL_FAILURE(mStream.SetUp(module, moduleConfig, existingMixPortConfig,
+                                              existingDevicePortConfig));
+        mAddress = mStream.getDevice().address;
+    }
+    void StartWorkerToSendBurstCommands(size_t burstCount = 10, bool standbyInputWhenDone = false) {
+        ASSERT_NO_FATAL_FAILURE(
+                mStream.StartWorkerToSendBurstCommands(burstCount, standbyInputWhenDone));
     }
 
-    void JoinWorkerAfterBurstCommands() {
-        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands());
+    void JoinWorkerAfterBurstCommands(bool callPrepareToCloseBeforeJoin) {
+        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands(
+                true /*validatePositionIncrease*/, callPrepareToCloseBeforeJoin));
     }
 
-    void SendBurstCommands() {
-        ASSERT_NO_FATAL_FAILURE(mStream.StartWorkerToSendBurstCommands());
-        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands());
+    void JoinWorkerAfterBurstCommands(bool validatePositionIncrease,
+                                      bool callPrepareToCloseBeforeJoin) {
+        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands(validatePositionIncrease,
+                                                                     callPrepareToCloseBeforeJoin));
+    }
+
+    void SendBurstCommands(bool callPrepareToCloseBeforeJoin, size_t burstCount = 10,
+                           bool standbyInputWhenDone = false) {
+        ASSERT_NO_FATAL_FAILURE(StartWorkerToSendBurstCommands(burstCount, standbyInputWhenDone));
+        // When 'burstCount == 0', there is no "previous" frame count, thus the check for
+        // the position increase fails.
+        ASSERT_NO_FATAL_FAILURE(JoinWorkerAfterBurstCommands(
+                burstCount > 0 /*validatePositionIncrease*/, callPrepareToCloseBeforeJoin));
     }
 
     std::optional<AudioDeviceAddress> getAudioDeviceAddress() const { return mAddress; }
+    const AudioPortConfig& getDevicePortConfig() const { return mStream.getDevicePortConfig(); }
+    int8_t getLastBurstIteration() const { return mStream.getLastData()[0]; }
+    const AudioPortConfig& getPortConfig() const { return mStream.getPortConfig(); }
     std::string skipTestReason() const { return mStream.skipTestReason(); }
 
   private:
-    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort) {}
-
     StreamFixtureWithWorker<Stream> mStream;
     std::optional<AudioDeviceAddress> mAddress;
 };
@@ -5007,77 +5102,130 @@
         }
         ASSERT_NO_FATAL_FAILURE(SetUpModuleConfig());
     }
+
+    void TearDown() override {
+        streamIn.reset();
+        streamOut.reset();
+    }
+
+    void CreateOutputStream() {
+        streamOut = std::make_unique<WithRemoteSubmix<IStreamOut>>();
+        ASSERT_NO_FATAL_FAILURE(streamOut->SetUp(module.get(), moduleConfig.get()));
+        // Note: any issue with connection attempts is considered as a problem.
+        ASSERT_EQ("", streamOut->skipTestReason());
+        ASSERT_TRUE(streamOut->getAudioDeviceAddress().has_value());
+    }
+
+    void CreateInputStream(const std::optional<AudioDeviceAddress>& address = std::nullopt) {
+        if (address.has_value()) {
+            streamIn = std::make_unique<WithRemoteSubmix<IStreamIn>>(address.value());
+        } else {
+            ASSERT_TRUE(streamOut->getAudioDeviceAddress().has_value());
+            streamIn = std::make_unique<WithRemoteSubmix<IStreamIn>>(
+                    streamOut->getAudioDeviceAddress().value());
+        }
+        ASSERT_NO_FATAL_FAILURE(streamIn->SetUp(module.get(), moduleConfig.get()));
+        ASSERT_EQ("", streamIn->skipTestReason());
+        auto inAddress = streamIn->getAudioDeviceAddress();
+        ASSERT_TRUE(inAddress.has_value());
+        if (address.has_value()) {
+            if (address.value() != AudioDeviceAddress{}) {
+                ASSERT_EQ(address.value(), inAddress.value());
+            }
+        } else {
+            ASSERT_EQ(streamOut->getAudioDeviceAddress().value(), inAddress.value());
+        }
+    }
+
+    std::unique_ptr<WithRemoteSubmix<IStreamOut>> streamOut;
+    std::unique_ptr<WithRemoteSubmix<IStreamIn>> streamIn;
 };
 
 TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenNoInput) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    // Note: here and in other tests any issue with connection attempts is considered as a problem.
-    ASSERT_EQ("", streamOut.skipTestReason());
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
 }
 
-TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputStuck) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamOut.skipTestReason());
-    auto address = streamOut.getAudioDeviceAddress();
-    ASSERT_TRUE(address.has_value());
+TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputInStandby) {
+    if (int32_t version; module->getInterfaceVersion(&version).isOk() && version < 3) {
+        GTEST_SKIP() << "Default remote submix implementation <V3 could not pass this test";
+    }
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    // Send just 1 burst command. This triggers the condition "input is in standby after
+    // being active." The output must flush the fifo before writing to avoid being blocked.
+    ASSERT_NO_FATAL_FAILURE(
+            streamIn->StartWorkerToSendBurstCommands(1, true /*standbyInputWhenDone*/));
+    // The output must be able to close without shutting down the pipe first (due to a call
+    // to 'prepareToClose').
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+    ASSERT_NO_FATAL_FAILURE(
+            streamIn->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+}
 
-    WithRemoteSubmix<IStreamIn> streamIn(address.value());
-    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamIn.skipTestReason());
+TEST_P(AudioModuleRemoteSubmix, BlockedOutputUnblocksOnClose) {
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    // Send just 3 burst commands, but do not enter standby. This is a stalled input.
+    ASSERT_NO_FATAL_FAILURE(streamIn->StartWorkerToSendBurstCommands(3));
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(true /*callPrepareToCloseBeforeJoin*/));
+    ASSERT_NO_FATAL_FAILURE(
+            streamIn->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+}
 
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommands());
+TEST_P(AudioModuleRemoteSubmix, OutputBlocksUntilInputStarts) {
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    // Read the head of the pipe and check that it starts with the first output burst, that is,
+    // the contents of the very first write have not been superseded due to pipe overflow.
+    // The burstCount is '0' because the very first burst is used to exit from the 'IDLE' state,
+    // see 'makeBurstCommands'.
+    ASSERT_NO_FATAL_FAILURE(streamIn->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/, 0,
+                                                        true /*standbyInputWhenDone*/));
+    EXPECT_EQ(1, streamIn->getLastBurstIteration());
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(true /*callPrepareToCloseBeforeJoin*/));
 }
 
 TEST_P(AudioModuleRemoteSubmix, OutputAndInput) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamOut.skipTestReason());
-    auto address = streamOut.getAudioDeviceAddress();
-    ASSERT_TRUE(address.has_value());
-
-    WithRemoteSubmix<IStreamIn> streamIn(address.value());
-    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamIn.skipTestReason());
-
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
     // Start writing into the output stream.
-    ASSERT_NO_FATAL_FAILURE(streamOut.StartWorkerToSendBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
     // Simultaneously, read from the input stream.
-    ASSERT_NO_FATAL_FAILURE(streamIn.SendBurstCommands());
-    ASSERT_NO_FATAL_FAILURE(streamOut.JoinWorkerAfterBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(streamIn->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
 }
 
 TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) {
-    WithRemoteSubmix<IStreamOut> streamOut;
-    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
-    ASSERT_EQ("", streamOut.skipTestReason());
-    auto address = streamOut.getAudioDeviceAddress();
-    ASSERT_TRUE(address.has_value());
-
-    const size_t streamInCount = 3;
-    std::vector<std::unique_ptr<WithRemoteSubmix<IStreamIn>>> streamIns(streamInCount);
-    for (size_t i = 0; i < streamInCount; i++) {
-        streamIns[i] = std::make_unique<WithRemoteSubmix<IStreamIn>>(address.value());
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SetUp(module.get(), moduleConfig.get()));
+    ASSERT_NO_FATAL_FAILURE(CreateOutputStream());
+    ASSERT_NO_FATAL_FAILURE(CreateInputStream());
+    ASSERT_NO_FATAL_FAILURE(streamOut->StartWorkerToSendBurstCommands());
+    ASSERT_NO_FATAL_FAILURE(streamIn->SendBurstCommands(false /*callPrepareToCloseBeforeJoin*/, 1,
+                                                        true /*standbyInputWhenDone*/));
+    // For the new stream, only create a new mix port config and a new patch.
+    const size_t extraStreamInCount = 2;
+    std::vector<std::unique_ptr<WithRemoteSubmix<IStreamIn>>> streamIns(extraStreamInCount);
+    for (size_t i = 0; i < extraStreamInCount; i++) {
+        streamIns[i] = std::make_unique<WithRemoteSubmix<IStreamIn>>();
+        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SetUp(module.get(), moduleConfig.get(),
+                                                    streamIn->getPortConfig(),
+                                                    streamIn->getDevicePortConfig()));
         ASSERT_EQ("", streamIns[i]->skipTestReason());
+        const auto inAddress = streamIns[i]->getAudioDeviceAddress();
+        ASSERT_TRUE(inAddress.has_value());
+        ASSERT_EQ(streamOut->getAudioDeviceAddress().value(), inAddress.value());
+        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SendBurstCommands(
+                false /*callPrepareToCloseBeforeJoin*/, 1, true /*standbyInputWhenDone*/));
     }
-    // Start writing into the output stream.
-    ASSERT_NO_FATAL_FAILURE(streamOut.StartWorkerToSendBurstCommands());
-    // Simultaneously, read from input streams.
-    for (size_t i = 0; i < streamInCount; i++) {
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->StartWorkerToSendBurstCommands());
-    }
-    for (size_t i = 0; i < streamInCount; i++) {
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->JoinWorkerAfterBurstCommands());
-    }
-    ASSERT_NO_FATAL_FAILURE(streamOut.JoinWorkerAfterBurstCommands());
-    // Clean up input streams in the reverse order because the device connection is owned
-    // by the first one.
-    for (size_t i = streamInCount; i != 0; --i) {
-        streamIns[i - 1].reset();
-    }
+    ASSERT_NO_FATAL_FAILURE(
+            streamOut->JoinWorkerAfterBurstCommands(false /*callPrepareToCloseBeforeJoin*/));
 }
 
 INSTANTIATE_TEST_SUITE_P(AudioModuleRemoteSubmixTest, AudioModuleRemoteSubmix,