Effect AIDL: Move EffectThread process into mutex

To avoid the case of receive thread stop right before process, in this
case test case AudioEffectTest#ConsumeDataAfterRestart will fail.

Bug: 264618800
Test: atest VtsHalAudioEffectTargetTest
Change-Id: I3c00361a537bc7010e6cd138f637f68b963e8033
diff --git a/audio/aidl/vts/VtsHalAudioEffectTargetTest.cpp b/audio/aidl/vts/VtsHalAudioEffectTargetTest.cpp
index c5a0943..88bdd13 100644
--- a/audio/aidl/vts/VtsHalAudioEffectTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioEffectTargetTest.cpp
@@ -590,12 +590,14 @@
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
 
     std::vector<float> buffer;
-    EffectHelper::allocateInputData(common, inputMQ, buffer);
-    EffectHelper::writeToFmq(inputMQ, buffer);
-    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
 
     ASSERT_NO_FATAL_FAILURE(close(mEffect));
     ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
@@ -617,20 +619,24 @@
     auto outputMQ = std::make_unique<EffectHelper::DataMQ>(ret.outputDataMQ);
     ASSERT_TRUE(outputMQ->isValid());
 
-    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
-    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
-    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
-    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
-    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
-    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
-
     std::vector<float> buffer;
-    EffectHelper::allocateInputData(common, inputMQ, buffer);
-    EffectHelper::writeToFmq(inputMQ, buffer);
-    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
+    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
+    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
+    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
+    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 0, outputMQ, buffer.size(), buffer));
+    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
+    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
+
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
 
     ASSERT_NO_FATAL_FAILURE(close(mEffect));
     ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
@@ -653,14 +659,14 @@
     ASSERT_TRUE(outputMQ->isValid());
 
     std::vector<float> buffer;
-    EffectHelper::allocateInputData(common, inputMQ, buffer);
-    EffectHelper::writeToFmq(inputMQ, buffer);
-    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
 
-    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
@@ -686,31 +692,30 @@
     ASSERT_TRUE(outputMQ->isValid());
 
     std::vector<float> buffer;
-    EffectHelper::allocateInputData(common, inputMQ, buffer);
-    EffectHelper::writeToFmq(inputMQ, buffer);
-    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
 
-    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
-    // expect no status and data after consume
-    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
 
-    EffectHelper::writeToFmq(inputMQ, buffer);
-    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
-    // expect no status and data after consume
-    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
 
     ASSERT_NO_FATAL_FAILURE(close(mEffect));
     ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
 }
 
-// Send data to IDLE state effects and expect it not be consumed.
-TEST_P(AudioEffectTest, NotConsumeDataInIdleState) {
+// Send data to processing state effects, stop, and restart.
+TEST_P(AudioEffectTest, ConsumeDataAndRestart) {
     ASSERT_NO_FATAL_FAILURE(create(mFactory, mEffect, mDescriptor));
 
     Parameter::Common common = EffectHelper::createParamCommon(
@@ -727,17 +732,21 @@
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
+    std::vector<float> buffer;
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
+
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
-
-    std::vector<float> buffer;
-    EffectHelper::allocateInputData(common, inputMQ, buffer);
-    EffectHelper::writeToFmq(inputMQ, buffer);
-    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
-    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));
 
     ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
     ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
@@ -765,9 +774,9 @@
     ASSERT_TRUE(outputMQ->isValid());
 
     std::vector<float> buffer;
-    EffectHelper::allocateInputData(common, inputMQ, buffer);
-    EffectHelper::writeToFmq(inputMQ, buffer);
-    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));
 
     ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
 }
@@ -800,9 +809,10 @@
     ASSERT_TRUE(outputMQ1->isValid());
 
     std::vector<float> buffer1, buffer2;
-    EffectHelper::allocateInputData(common1, inputMQ1, buffer1);
-    EffectHelper::writeToFmq(inputMQ1, buffer1);
-    EffectHelper::readFromFmq(statusMQ1, 1, outputMQ1, buffer1.size(), buffer1);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common1, inputMQ1, buffer1));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ1, buffer1));
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ1, 1, outputMQ1, buffer1.size(), buffer1));
 
     auto statusMQ2 = std::make_unique<EffectHelper::StatusMQ>(ret2.statusMQ);
     ASSERT_TRUE(statusMQ2->isValid());
@@ -810,9 +820,10 @@
     ASSERT_TRUE(inputMQ2->isValid());
     auto outputMQ2 = std::make_unique<EffectHelper::DataMQ>(ret2.outputDataMQ);
     ASSERT_TRUE(outputMQ2->isValid());
-    EffectHelper::allocateInputData(common2, inputMQ2, buffer2);
-    EffectHelper::writeToFmq(inputMQ2, buffer2);
-    EffectHelper::readFromFmq(statusMQ2, 1, outputMQ2, buffer2.size(), buffer2);
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common2, inputMQ2, buffer2));
+    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ2, buffer2));
+    EXPECT_NO_FATAL_FAILURE(
+            EffectHelper::readFromFmq(statusMQ2, 1, outputMQ2, buffer2.size(), buffer2));
 
     ASSERT_NO_FATAL_FAILURE(command(effect1, CommandId::STOP));
     ASSERT_NO_FATAL_FAILURE(expectState(effect1, State::IDLE));