Merge changes Ibed7f4c3,I8a9f4f0b
* changes:
audio: Add pause, resume, and standby stream operations
audio: Add 'join' method to StreamWorker
diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
index db1ac22..da24a10 100644
--- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl
@@ -39,15 +39,34 @@
int frameSizeBytes;
long bufferSizeFrames;
android.hardware.audio.core.StreamDescriptor.AudioBuffer audio;
- const int COMMAND_BURST = 1;
+ const int LATENCY_UNKNOWN = -1;
@FixedSize @VintfStability
parcelable Position {
long frames;
long timeNs;
}
+ @Backing(type="int") @VintfStability
+ enum State {
+ STANDBY = 1,
+ IDLE = 2,
+ ACTIVE = 3,
+ PAUSED = 4,
+ DRAINING = 5,
+ DRAIN_PAUSED = 6,
+ ERROR = 100,
+ }
+ @Backing(type="int") @VintfStability
+ enum CommandCode {
+ START = 1,
+ BURST = 2,
+ DRAIN = 3,
+ STANDBY = 4,
+ PAUSE = 5,
+ FLUSH = 6,
+ }
@FixedSize @VintfStability
parcelable Command {
- int code;
+ android.hardware.audio.core.StreamDescriptor.CommandCode code = android.hardware.audio.core.StreamDescriptor.CommandCode.START;
int fmqByteCount;
}
@FixedSize @VintfStability
@@ -57,6 +76,8 @@
android.hardware.audio.core.StreamDescriptor.Position observable;
android.hardware.audio.core.StreamDescriptor.Position hardware;
int latencyMs;
+ int xrunFrames;
+ android.hardware.audio.core.StreamDescriptor.State state = android.hardware.audio.core.StreamDescriptor.State.STANDBY;
}
@VintfStability
union AudioBuffer {
diff --git a/audio/aidl/android/hardware/audio/core/IModule.aidl b/audio/aidl/android/hardware/audio/core/IModule.aidl
index 735f87f..0959840 100644
--- a/audio/aidl/android/hardware/audio/core/IModule.aidl
+++ b/audio/aidl/android/hardware/audio/core/IModule.aidl
@@ -263,6 +263,9 @@
* be completing with an error, although data (zero filled) will still be
* provided.
*
+ * After the stream has been opened, it remains in the STANDBY state, see
+ * StreamDescriptor for more details.
+ *
* @return An opened input stream and the associated descriptor.
* @param args The pack of arguments, see 'OpenInputStreamArguments' parcelable.
* @throws EX_ILLEGAL_ARGUMENT In the following cases:
@@ -325,6 +328,9 @@
* StreamDescriptor will be completing with an error, although the data
* will still be accepted and immediately discarded.
*
+ * After the stream has been opened, it remains in the STANDBY state, see
+ * StreamDescriptor for more details.
+ *
* @return An opened output stream and the associated descriptor.
* @param args The pack of arguments, see 'OpenOutputStreamArguments' parcelable.
* @throws EX_ILLEGAL_ARGUMENT In the following cases:
diff --git a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
index 2b1ed8c..e5e56fc 100644
--- a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
+++ b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl
@@ -33,6 +33,72 @@
* internal components of the stream while serving commands invoked via the
* stream's AIDL interface and commands invoked via the command queue of the
* descriptor.
+ *
+ * There is a state machine defined for the stream, which executes on the
+ * thread handling the commands from the queue. The states are defined based
+ * on the model of idealized producer and consumer connected via a ring buffer.
+ * For input streams, the "producer" is hardware, the "consumer" is software,
+ * for outputs streams it's the opposite. When the producer is active, but
+ * the buffer is full, the following actions are possible:
+ * - if the consumer is active, the producer blocks until there is space,
+ * this behavior is only possible for software producers;
+ * - if the consumer is passive:
+ * - the producer can preserve the buffer contents—a s/w producer can
+ * keep the data on their side, while a h/w producer can only drop captured
+ * data in this case;
+ * - or the producer overwrites old data in the buffer.
+ * Similarly, when an active consumer faces an empty buffer, it can:
+ * - block until there is data (producer must be active), only possible
+ * for software consumers;
+ * - walk away with no data; when the consumer is hardware, it must emit
+ * silence in this case.
+ *
+ * The model is defined below, note the asymmetry regarding the 'IDLE' state
+ * between input and output streams:
+ *
+ * Producer | Buffer state | Consumer | Applies | State
+ * active? | | active? | to |
+ * ==========|==============|==========|=========|==============================
+ * No | Empty | No | Both | STANDBY
+ * ----------|--------------|----------|---------|-----------------------------
+ * Yes | Filling up | No | Input | IDLE, overwrite behavior
+ * ----------|--------------|----------|---------|-----------------------------
+ * No | Empty | Yes† | Output | IDLE, h/w emits silence
+ * ----------|--------------|----------|---------|-----------------------------
+ * Yes | Not empty | Yes | Both | ACTIVE, s/w x-runs counted
+ * ----------|--------------|----------|---------|-----------------------------
+ * Yes | Filling up | No | Input | PAUSED, drop behavior
+ * ----------|--------------|----------|---------|-----------------------------
+ * Yes | Filling up | No† | Output | PAUSED, s/w stops writing once
+ * | | | | the buffer is filled up;
+ * | | | | h/w emits silence.
+ * ----------|--------------|----------|---------|-----------------------------
+ * No | Not empty | Yes | Both | DRAINING
+ * ----------|--------------|----------|---------|-----------------------------
+ * No | Not empty | No† | Output | DRAIN_PAUSED,
+ * | | | | h/w emits silence.
+ *
+ * † - note that for output, "buffer empty, h/w consuming" has the same outcome
+ * as "buffer not empty, h/w not consuming", but logically these conditions
+ * are different.
+ *
+ * State machines of both input and output streams start from the 'STANDBY'
+ * state. Transitions between states happen naturally with changes in the
+ * states of the model elements. For simplicity, we restrict the change to one
+ * element only, for example, in the 'STANDBY' state, either the producer or the
+ * consumer can become active, but not both at the same time. States 'STANDBY',
+ * 'IDLE', 'READY', and '*PAUSED' are "stable"—they require an external event,
+ * whereas a change from the 'DRAINING' state can happen with time as the buffer
+ * gets empty.
+ *
+ * The state machine for input streams is defined in the `stream-in-sm.gv` file,
+ * for output streams—in the `stream-out-sm.gv` file. State machines define how
+ * commands (from the enum 'CommandCode') trigger state changes. The full list
+ * of states and commands is defined by constants of the 'State' enum. Note that
+ * the 'CLOSED' state does not have a constant in the interface because the
+ * client can never observe a stream with a functioning command queue in this
+ * state. The 'ERROR' state is a special state which the state machine enters
+ * when an unrecoverable hardware error is detected by the HAL module.
*/
@JavaDerive(equals=true, toString=true)
@VintfStability
@@ -55,12 +121,110 @@
long timeNs;
}
- /**
- * The command used for audio I/O, see 'AudioBuffer'. For MMap No IRQ mode
- * this command only provides updated positions and latency because actual
- * audio I/O is done via the 'AudioBuffer.mmap' shared buffer.
- */
- const int COMMAND_BURST = 1;
+ @VintfStability
+ @Backing(type="int")
+ enum State {
+ /**
+ * 'STANDBY' is the initial state of the stream, entered after
+ * opening. Since both the producer and the consumer are inactive in
+ * this state, it allows the HAL module to put associated hardware into
+ * "standby" mode to save power.
+ */
+ STANDBY = 1,
+ /**
+ * In the 'IDLE' state the audio hardware is active. For input streams,
+ * the hardware is filling buffer with captured data, overwriting old
+ * contents on buffer wraparounds. For output streams, the buffer is
+ * still empty, and the hardware is outputting zeroes. The HAL module
+ * must not account for any under- or overruns as the client is not
+ * expected to perform audio I/O.
+ */
+ IDLE = 2,
+ /**
+ * The active state of the stream in which it handles audio I/O. The HAL
+ * module can assume that the audio I/O will be periodic, thus inability
+ * of the client to provide or consume audio data on time must be
+ * considered as an under- or overrun and indicated via the 'xrunFrames'
+ * field of the reply.
+ */
+ ACTIVE = 3,
+ /**
+ * In the 'PAUSED' state the consumer is inactive. For input streams,
+ * the hardware stops updating the buffer as soon as it fills up (this
+ * is the difference from the 'IDLE' state). For output streams,
+ * "inactivity" of hardware means that it does not consume audio data,
+ * but rather emits silence.
+ */
+ PAUSED = 4,
+ /**
+ * In the 'DRAINING' state the producer is inactive, the consumer is
+ * finishing up on the buffer contents, emptying it up. As soon as it
+ * gets empty, the stream transfers itself into the next state.
+ */
+ DRAINING = 5,
+ /**
+ * Used for output streams only, pauses draining. This state is similar
+ * to the 'PAUSED' state, except that the client is not adding any
+ * new data. If it emits a 'BURST' command, this brings the stream
+ * into the regular 'PAUSED' state.
+ */
+ DRAIN_PAUSED = 6,
+ /**
+ * The ERROR state is entered when the stream has encountered an
+ * irrecoverable error from the lower layer. After entering it, the
+ * stream can only be closed.
+ */
+ ERROR = 100,
+ }
+
+ @VintfStability
+ @Backing(type="int")
+ enum CommandCode {
+ /**
+ * See the state machines on the applicability of this command to
+ * different states. The 'fmqByteCount' field must always be set to 0.
+ */
+ START = 1,
+ /**
+ * The BURST command used for audio I/O, see 'AudioBuffer'. Differences
+ * for the MMap No IRQ mode:
+ *
+ * - this command only provides updated positions and latency because
+ * actual audio I/O is done via the 'AudioBuffer.mmap' shared buffer.
+ * The client does not synchronize reads and writes into the buffer
+ * with sending of this command.
+ *
+ * - the 'fmqByteCount' must always be set to 0.
+ */
+ BURST = 2,
+ /**
+ * See the state machines on the applicability of this command to
+ * different states. The 'fmqByteCount' field must always be set to 0.
+ */
+ DRAIN = 3,
+ /**
+ * See the state machines on the applicability of this command to
+ * different states. The 'fmqByteCount' field must always be set to 0.
+ *
+ * Note that it's left on the discretion of the HAL implementation to
+ * assess all the necessary conditions that could prevent hardware from
+ * being suspended. Even if it can not be suspended, the state machine
+ * must still enter the 'STANDBY' state for consistency. Since the
+ * buffer must remain empty in this state, even if capturing hardware is
+ * still active, captured data must be discarded.
+ */
+ STANDBY = 4,
+ /**
+ * See the state machines on the applicability of this command to
+ * different states. The 'fmqByteCount' field must always be set to 0.
+ */
+ PAUSE = 5,
+ /**
+ * See the state machines on the applicability of this command to
+ * different states. The 'fmqByteCount' field must always be set to 0.
+ */
+ FLUSH = 6,
+ }
/**
* Used for sending commands to the HAL module. The client writes into
@@ -71,12 +235,16 @@
@FixedSize
parcelable Command {
/**
- * One of COMMAND_* codes.
+ * The code of the command.
*/
- int code;
+ CommandCode code = CommandCode.START;
/**
+ * This field is only used for the BURST command. For all other commands
+ * it must be set to 0. The following description applies to the use
+ * of this field for the BURST command.
+ *
* For output streams: the amount of bytes that the client requests the
- * HAL module to read from the 'audio.fmq' queue.
+ * HAL module to use out of the data contained in the 'audio.fmq' queue.
* For input streams: the amount of bytes requested by the client to
* read from the hardware into the 'audio.fmq' queue.
*
@@ -96,6 +264,12 @@
MQDescriptor<Command, SynchronizedReadWrite> command;
/**
+ * The value used for the 'Reply.latencyMs' field when the effective
+ * latency can not be reported by the HAL module.
+ */
+ const int LATENCY_UNKNOWN = -1;
+
+ /**
* Used for providing replies to commands. The HAL module writes into
* the queue, the client reads. The queue can only contain a single reply,
* corresponding to the last command sent by the client.
@@ -107,17 +281,22 @@
* One of Binder STATUS_* statuses:
* - STATUS_OK: the command has completed successfully;
* - STATUS_BAD_VALUE: invalid value in the 'Command' structure;
- * - STATUS_INVALID_OPERATION: the mix port is not connected
- * to any producer or consumer, thus
- * positions can not be reported;
+ * - STATUS_INVALID_OPERATION: the command is not applicable in the
+ * current state of the stream, or to this
+ * type of the stream;
+ * - STATUS_NO_INIT: positions can not be reported because the mix port
+ * is not connected to any producer or consumer, or
+ * because the HAL module does not support positions
+ * reporting for this AudioSource (on input streams).
* - STATUS_NOT_ENOUGH_DATA: a read or write error has
* occurred for the 'audio.fmq' queue;
- *
*/
int status;
/**
- * For output streams: the amount of bytes actually consumed by the HAL
- * module from the 'audio.fmq' queue.
+ * Used with the BURST command only.
+ *
+ * For output streams: the amount of bytes of data actually consumed
+ * by the HAL module.
* For input streams: the amount of bytes actually provided by the HAL
* in the 'audio.fmq' queue.
*
@@ -126,10 +305,18 @@
*/
int fmqByteCount;
/**
+ * It is recommended to report the current position for any command.
+ * If the position can not be reported, the 'status' field must be
+ * set to 'NO_INIT'.
+ *
* For output streams: the moment when the specified stream position
* was presented to an external observer (i.e. presentation position).
* For input streams: the moment when data at the specified stream position
* was acquired (i.e. capture position).
+ *
+ * The observable position must never be reset by the HAL module.
+ * The data type of the frame counter is large enough to support
+ * continuous counting for years of operation.
*/
Position observable;
/**
@@ -138,9 +325,22 @@
*/
Position hardware;
/**
- * Current latency reported by the hardware.
+ * Current latency reported by the hardware. It is recommended to
+ * report the current latency for any command. If the value of latency
+ * can not be determined, this field must be set to 'LATENCY_UNKNOWN'.
*/
int latencyMs;
+ /**
+ * Number of frames lost due to an underrun (for input streams),
+ * or not provided on time (for output streams) for the **previous**
+ * transfer operation.
+ */
+ int xrunFrames;
+ /**
+ * The state that the stream was in while the HAL module was sending the
+ * reply.
+ */
+ State state = State.STANDBY;
}
MQDescriptor<Reply, SynchronizedReadWrite> reply;
@@ -170,42 +370,59 @@
@VintfStability
union AudioBuffer {
/**
- * The fast message queue used for all modes except MMap No IRQ. Both
- * reads and writes into this queue are non-blocking because access to
- * this queue is synchronized via the 'command' and 'reply' queues as
- * described below. The queue nevertheless uses 'SynchronizedReadWrite'
- * because there is only one reader, and the reading position must be
- * shared.
+ * The fast message queue used for BURST commands in all modes except
+ * MMap No IRQ. Both reads and writes into this queue are non-blocking
+ * because access to this queue is synchronized via the 'command' and
+ * 'reply' queues as described below. The queue nevertheless uses
+ * 'SynchronizedReadWrite' because there is only one reader, and the
+ * reading position must be shared.
+ *
+ * Note that the fast message queue is a transient buffer, only used for
+ * data transfer. Neither of the sides can use it to store any data
+ * outside of the 'BURST' operation. The consumer must always retrieve
+ * all data available in the fast message queue, even if it can not use
+ * it. The producer must re-send any unconsumed data on the next
+ * transfer operation. This restriction is posed in order to make the
+ * fast message queue fully transparent from the latency perspective.
*
* For output streams the following sequence of operations is used:
* 1. The client writes audio data into the 'audio.fmq' queue.
- * 2. The client writes the 'BURST' command into the 'command' queue,
+ * 2. The client writes the BURST command into the 'command' queue,
* and hangs on waiting on a read from the 'reply' queue.
* 3. The high priority thread in the HAL module wakes up due to 2.
- * 4. The HAL module reads the command and audio data.
+ * 4. The HAL module reads the command and audio data. According
+ * to the statement above, the HAL module must always read
+ * from the FMQ all the data it contains. The amount of data that
+ * the HAL module has actually consumed is indicated to the client
+ * via the 'reply.fmqByteCount' field.
* 5. The HAL module writes the command status and current positions
* into 'reply' queue, and hangs on waiting on a read from
* the 'command' queue.
* 6. The client wakes up due to 5. and reads the reply.
*
* For input streams the following sequence of operations is used:
- * 1. The client writes the 'BURST' command into the 'command' queue,
+ * 1. The client writes the BURST command into the 'command' queue,
* and hangs on waiting on a read from the 'reply' queue.
* 2. The high priority thread in the HAL module wakes up due to 1.
* 3. The HAL module writes audio data into the 'audio.fmq' queue.
+ * The value of 'reply.fmqByteCount' must be the equal to the amount
+ * of data in the queue.
* 4. The HAL module writes the command status and current positions
* into 'reply' queue, and hangs on waiting on a read from
* the 'command' queue.
* 5. The client wakes up due to 4.
- * 6. The client reads the reply and audio data.
+ * 6. The client reads the reply and audio data. The client must
+ * always read from the FMQ all the data it contains.
+ *
*/
MQDescriptor<byte, SynchronizedReadWrite> fmq;
/**
* MMap buffers are shared directly with the DSP, which operates
- * independently from the CPU. Writes and reads into these buffers
- * are not synchronized with 'command' and 'reply' queues. However,
- * the client still uses the 'BURST' command for obtaining current
- * positions from the HAL module.
+ * independently from the CPU. Writes and reads into these buffers are
+ * not synchronized with 'command' and 'reply' queues. However, the
+ * client still uses the same commands for controlling the audio data
+ * exchange and for obtaining current positions and latency from the HAL
+ * module.
*/
MmapBufferDescriptor mmap;
}
diff --git a/audio/aidl/android/hardware/audio/core/stream-in-sm.gv b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv
new file mode 100644
index 0000000..889a14b
--- /dev/null
+++ b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv
@@ -0,0 +1,42 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// To render: dot -Tpng stream-in-sm.gv -o stream-in-sm.png
+digraph stream_in_state_machine {
+ node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
+ node [shape=point width=0.5] F;
+ node [shape=oval width=1];
+ node [fillcolor=lightgreen] STANDBY; // buffer is empty
+ node [fillcolor=tomato] CLOSED;
+ node [fillcolor=tomato] ERROR;
+ node [style=dashed] ANY_STATE;
+ node [fillcolor=lightblue style=filled];
+ I -> STANDBY;
+ STANDBY -> IDLE [label="START"]; // producer -> active
+ IDLE -> STANDBY [label="STANDBY"]; // producer -> passive, buffer is cleared
+ IDLE -> ACTIVE [label="BURST"]; // consumer -> active
+ ACTIVE -> ACTIVE [label="BURST"];
+ ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive
+ ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive
+ PAUSED -> ACTIVE [label="BURST"]; // consumer -> active
+ PAUSED -> STANDBY [label="FLUSH"]; // producer -> passive, buffer is cleared
+ DRAINING -> DRAINING [label="BURST"];
+ DRAINING -> ACTIVE [label="START"]; // producer -> active
+ DRAINING -> STANDBY [label="<empty buffer>"]; // consumer deactivates
+ IDLE -> ERROR [label="<hardware failure>"];
+ ACTIVE -> ERROR [label="<hardware failure>"];
+ PAUSED -> ERROR [label="<hardware failure>"];
+ ANY_STATE -> CLOSED [label="→IStream*.close"];
+ CLOSED -> F;
+}
diff --git a/audio/aidl/android/hardware/audio/core/stream-out-sm.gv b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv
new file mode 100644
index 0000000..56dd5290
--- /dev/null
+++ b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv
@@ -0,0 +1,48 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// To render: dot -Tpng stream-out-sm.gv -o stream-out-sm.png
+digraph stream_out_state_machine {
+ node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
+ node [shape=point width=0.5] F;
+ node [shape=oval width=1];
+ node [fillcolor=lightgreen] STANDBY; // buffer is empty
+ node [fillcolor=lightgreen] IDLE; // buffer is empty
+ node [fillcolor=tomato] CLOSED;
+ node [fillcolor=tomato] ERROR;
+ node [style=dashed] ANY_STATE;
+ node [fillcolor=lightblue style=filled];
+ I -> STANDBY;
+ STANDBY -> IDLE [label="START"]; // consumer -> active
+ STANDBY -> PAUSED [label="BURST"]; // producer -> active
+ IDLE -> STANDBY [label="STANDBY"]; // consumer -> passive
+ IDLE -> ACTIVE [label="BURST"]; // producer -> active
+ ACTIVE -> ACTIVE [label="BURST"];
+ ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive (not consuming)
+ ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive
+ PAUSED -> PAUSED [label="BURST"];
+ PAUSED -> ACTIVE [label="START"]; // consumer -> active
+ PAUSED -> IDLE [label="FLUSH"]; // producer -> passive, buffer is cleared
+ DRAINING -> IDLE [label="<empty buffer>"];
+ DRAINING -> ACTIVE [label="BURST"]; // producer -> active
+ DRAINING -> DRAIN_PAUSED [label="PAUSE"]; // consumer -> passive (not consuming)
+ DRAIN_PAUSED -> DRAINING [label="START"]; // consumer -> active
+ DRAIN_PAUSED -> PAUSED [label="BURST"]; // producer -> active
+ DRAIN_PAUSED -> IDLE [label="FLUSH"]; // buffer is cleared
+ IDLE -> ERROR [label="<hardware failure>"];
+ ACTIVE -> ERROR [label="<hardware failure>"];
+ DRAINING -> ERROR [label="<hardware failure>"];
+ ANY_STATE -> CLOSED [label="→IStream*.close"];
+ CLOSED -> F;
+}
diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp
index 9bca760..dda0e4a 100644
--- a/audio/aidl/common/StreamWorker.cpp
+++ b/audio/aidl/common/StreamWorker.cpp
@@ -44,6 +44,10 @@
mWorkerStateChangeRequest = true;
}
}
+ join();
+}
+
+void ThreadController::join() {
if (mWorker.joinable()) {
mWorker.join();
}
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 6260eca..ab2ec26 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -39,6 +39,9 @@
~ThreadController() { stop(); }
bool start(const std::string& name, int priority);
+ // Note: 'pause' and 'resume' methods should only be used on the "driving" side.
+ // In the case of audio HAL I/O, the driving side is the client, because the HAL
+ // implementation always blocks on getting a command.
void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
bool hasError() {
@@ -50,6 +53,10 @@
return mError;
}
void stop();
+ // Direct use of 'join' assumes that the StreamLogic is not intended
+ // to run forever, and is guaranteed to exit by itself. This normally
+ // only happen in tests.
+ void join();
bool waitForAtLeastOneCycle();
// Only used by unit tests.
@@ -133,7 +140,8 @@
void resume() { mThread.resume(); }
bool hasError() { return mThread.hasError(); }
std::string getError() { return mThread.getError(); }
- void stop() { return mThread.stop(); }
+ void stop() { mThread.stop(); }
+ void join() { mThread.join(); }
bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
// Only used by unit tests.
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index e3e484d..8ea8424 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -160,6 +160,14 @@
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}
+TEST_P(StreamWorkerTest, WorkerJoin) {
+ ASSERT_TRUE(worker.start());
+ stream.setStopStatus();
+ worker.join();
+ EXPECT_FALSE(worker.hasError());
+ EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
+}
+
TEST_P(StreamWorkerTest, WorkerError) {
ASSERT_TRUE(worker.start());
stream.setErrorStatus();
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 312df72..7b544a1 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -85,126 +85,315 @@
return "";
}
+void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
+ bool isConnected) const {
+ if (isConnected) {
+ reply->status = STATUS_OK;
+ reply->observable.frames = mFrameCount;
+ reply->observable.timeNs = ::android::elapsedRealtimeNano();
+ } else {
+ reply->status = STATUS_NO_INIT;
+ }
+}
+
const std::string StreamInWorkerLogic::kThreadName = "reader";
StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+ mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
StreamDescriptor::Reply reply{};
- if (command.code == StreamContext::COMMAND_EXIT &&
+ if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
command.fmqByteCount == mInternalCommandCookie) {
LOG(DEBUG) << __func__ << ": received EXIT command";
+ setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
- } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+ } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
+ LOG(DEBUG) << __func__ << ": received START read command";
+ if (mState == StreamDescriptor::State::STANDBY ||
+ mState == StreamDescriptor::State::DRAINING) {
+ populateReply(&reply, mIsConnected);
+ mState = mState == StreamDescriptor::State::STANDBY ? StreamDescriptor::State::IDLE
+ : StreamDescriptor::State::ACTIVE;
+ } else {
+ LOG(WARNING) << __func__ << ": START command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount
<< " bytes";
- usleep(3000); // Simulate a blocking call into the driver.
- const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
- mDataMQ->availableToWrite(), mDataBufferSize});
- const bool isConnected = mIsConnected;
- // Simulate reading of data, or provide zeroes if the stream is not connected.
- for (size_t i = 0; i < byteCount; ++i) {
- using buffer_type = decltype(mDataBuffer)::element_type;
- constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
- std::numeric_limits<buffer_type>::min() + 1;
- mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
- std::numeric_limits<buffer_type>::min()
- : 0;
- }
- bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true;
- if (success) {
- LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
- << " succeeded; connected? " << isConnected;
- // Frames are provided and counted regardless of connection status.
- reply.fmqByteCount = byteCount;
- mFrameCount += byteCount / mFrameSize;
- if (isConnected) {
- reply.status = STATUS_OK;
- reply.observable.frames = mFrameCount;
- reply.observable.timeNs = ::android::elapsedRealtimeNano();
- } else {
- reply.status = STATUS_INVALID_OPERATION;
+ if (mState == StreamDescriptor::State::IDLE || mState == StreamDescriptor::State::ACTIVE ||
+ mState == StreamDescriptor::State::PAUSED ||
+ mState == StreamDescriptor::State::DRAINING) {
+ if (!read(command.fmqByteCount, &reply)) {
+ mState = StreamDescriptor::State::ERROR;
+ }
+ if (mState == StreamDescriptor::State::IDLE ||
+ mState == StreamDescriptor::State::PAUSED) {
+ mState = StreamDescriptor::State::ACTIVE;
+ } else if (mState == StreamDescriptor::State::DRAINING) {
+ // To simplify the reference code, we assume that the read operation
+ // has consumed all the data remaining in the hardware buffer.
+ // TODO: Provide parametrization on the duration of draining to test
+ // handling of commands during the 'DRAINING' state.
+ mState = StreamDescriptor::State::STANDBY;
}
} else {
- LOG(WARNING) << __func__ << ": writing of " << byteCount
- << " bytes of data to MQ failed";
- reply.status = STATUS_NOT_ENOUGH_DATA;
+ LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
}
- reply.latencyMs = Module::kLatencyMs;
+ } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received DRAIN read command";
+ if (mState == StreamDescriptor::State::ACTIVE) {
+ usleep(1000); // Simulate a blocking call into the driver.
+ populateReply(&reply, mIsConnected);
+ // Can switch the state to ERROR if a driver error occurs.
+ mState = StreamDescriptor::State::DRAINING;
+ } else {
+ LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received PAUSE read command";
+ if (mState == StreamDescriptor::State::ACTIVE) {
+ usleep(1000); // Simulate a blocking call into the driver.
+ populateReply(&reply, mIsConnected);
+ // Can switch the state to ERROR if a driver error occurs.
+ mState = StreamDescriptor::State::PAUSED;
+ } else {
+ LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received FLUSH read command";
+ if (mState == StreamDescriptor::State::PAUSED) {
+ usleep(1000); // Simulate a blocking call into the driver.
+ populateReply(&reply, mIsConnected);
+ // Can switch the state to ERROR if a driver error occurs.
+ mState = StreamDescriptor::State::STANDBY;
+ } else {
+ LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
+ command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received STANDBY read command";
+ if (mState == StreamDescriptor::State::IDLE) {
+ usleep(1000); // Simulate a blocking call into the driver.
+ populateReply(&reply, mIsConnected);
+ // Can switch the state to ERROR if a driver error occurs.
+ mState = StreamDescriptor::State::STANDBY;
+ } else {
+ LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
} else {
LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
<< ") or count: " << command.fmqByteCount;
reply.status = STATUS_BAD_VALUE;
}
+ reply.state = mState;
LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+ mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
return Status::CONTINUE;
}
+bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
+ // Can switch the state to ERROR if a driver error occurs.
+ const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
+ const bool isConnected = mIsConnected;
+ bool fatal = false;
+ // Simulate reading of data, or provide zeroes if the stream is not connected.
+ for (size_t i = 0; i < byteCount; ++i) {
+ using buffer_type = decltype(mDataBuffer)::element_type;
+ constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
+ std::numeric_limits<buffer_type>::min() + 1;
+ mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
+ std::numeric_limits<buffer_type>::min()
+ : 0;
+ }
+ usleep(3000); // Simulate a blocking call into the driver.
+ // Set 'fatal = true' if a driver error occurs.
+ if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
+ LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
+ << " succeeded; connected? " << isConnected;
+ // Frames are provided and counted regardless of connection status.
+ reply->fmqByteCount += byteCount;
+ mFrameCount += byteCount / mFrameSize;
+ populateReply(reply, isConnected);
+ } else {
+ LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
+ reply->status = STATUS_NOT_ENOUGH_DATA;
+ }
+ reply->latencyMs = Module::kLatencyMs;
+ return !fatal;
+}
+
const std::string StreamOutWorkerLogic::kThreadName = "writer";
StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
+ mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
StreamDescriptor::Reply reply{};
- if (command.code == StreamContext::COMMAND_EXIT &&
+ if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
command.fmqByteCount == mInternalCommandCookie) {
LOG(DEBUG) << __func__ << ": received EXIT command";
+ setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
- } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
+ } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
+ LOG(DEBUG) << __func__ << ": received START read command";
+ switch (mState) {
+ case StreamDescriptor::State::STANDBY:
+ mState = StreamDescriptor::State::IDLE;
+ break;
+ case StreamDescriptor::State::PAUSED:
+ mState = StreamDescriptor::State::ACTIVE;
+ break;
+ case StreamDescriptor::State::DRAIN_PAUSED:
+ mState = StreamDescriptor::State::PAUSED;
+ break;
+ default:
+ LOG(WARNING) << __func__ << ": START command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ if (reply.status != STATUS_INVALID_OPERATION) {
+ populateReply(&reply, mIsConnected);
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount
<< " bytes";
- const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
- mDataMQ->availableToRead(), mDataBufferSize});
- bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true;
- if (success) {
- const bool isConnected = mIsConnected;
- LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ"
- << " succeeded; connected? " << isConnected;
- // Frames are consumed and counted regardless of connection status.
- reply.fmqByteCount = byteCount;
- mFrameCount += byteCount / mFrameSize;
- if (isConnected) {
- reply.status = STATUS_OK;
- reply.observable.frames = mFrameCount;
- reply.observable.timeNs = ::android::elapsedRealtimeNano();
- } else {
- reply.status = STATUS_INVALID_OPERATION;
+ if (mState != StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states
+ if (!write(command.fmqByteCount, &reply)) {
+ mState = StreamDescriptor::State::ERROR;
}
- usleep(3000); // Simulate a blocking call into the driver.
+ if (mState == StreamDescriptor::State::STANDBY ||
+ mState == StreamDescriptor::State::DRAIN_PAUSED) {
+ mState = StreamDescriptor::State::PAUSED;
+ } else if (mState == StreamDescriptor::State::IDLE ||
+ mState == StreamDescriptor::State::DRAINING) {
+ mState = StreamDescriptor::State::ACTIVE;
+ } // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
} else {
- LOG(WARNING) << __func__ << ": reading of " << byteCount
- << " bytes of data from MQ failed";
- reply.status = STATUS_NOT_ENOUGH_DATA;
+ LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
}
- reply.latencyMs = Module::kLatencyMs;
+ } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received DRAIN write command";
+ if (mState == StreamDescriptor::State::ACTIVE) {
+ usleep(1000); // Simulate a blocking call into the driver.
+ populateReply(&reply, mIsConnected);
+ // Can switch the state to ERROR if a driver error occurs.
+ mState = StreamDescriptor::State::IDLE;
+ // Since there is no actual hardware that would be draining the buffer,
+ // in order to simplify the reference code, we assume that draining
+ // happens instantly, thus skipping the 'DRAINING' state.
+ // TODO: Provide parametrization on the duration of draining to test
+ // handling of commands during the 'DRAINING' state.
+ } else {
+ LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
+ command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received STANDBY write command";
+ if (mState == StreamDescriptor::State::IDLE) {
+ usleep(1000); // Simulate a blocking call into the driver.
+ populateReply(&reply, mIsConnected);
+ // Can switch the state to ERROR if a driver error occurs.
+ mState = StreamDescriptor::State::STANDBY;
+ } else {
+ LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received PAUSE write command";
+ if (mState == StreamDescriptor::State::ACTIVE ||
+ mState == StreamDescriptor::State::DRAINING) {
+ populateReply(&reply, mIsConnected);
+ mState = mState == StreamDescriptor::State::ACTIVE
+ ? StreamDescriptor::State::PAUSED
+ : StreamDescriptor::State::DRAIN_PAUSED;
+ } else {
+ LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
+ } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
+ LOG(DEBUG) << __func__ << ": received FLUSH write command";
+ if (mState == StreamDescriptor::State::PAUSED ||
+ mState == StreamDescriptor::State::DRAIN_PAUSED) {
+ populateReply(&reply, mIsConnected);
+ mState = StreamDescriptor::State::IDLE;
+ } else {
+ LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
+ << toString(mState);
+ reply.status = STATUS_INVALID_OPERATION;
+ }
} else {
LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
<< ") or count: " << command.fmqByteCount;
reply.status = STATUS_BAD_VALUE;
}
+ reply.state = mState;
LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
+ mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
return Status::CONTINUE;
}
+bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
+ const size_t readByteCount = mDataMQ->availableToRead();
+ // Amount of data that the HAL module is going to actually use.
+ const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
+ bool fatal = false;
+ if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
+ const bool isConnected = mIsConnected;
+ LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
+ << " succeeded; connected? " << isConnected;
+ // Frames are consumed and counted regardless of connection status.
+ reply->fmqByteCount += byteCount;
+ mFrameCount += byteCount / mFrameSize;
+ populateReply(reply, isConnected);
+ usleep(3000); // Simulate a blocking call into the driver.
+ // Set 'fatal = true' if a driver error occurs.
+ } else {
+ LOG(WARNING) << __func__ << ": reading of " << readByteCount
+ << " bytes of data from MQ failed";
+ reply->status = STATUS_NOT_ENOUGH_DATA;
+ }
+ reply->latencyMs = Module::kLatencyMs;
+ return !fatal;
+}
+
template <class Metadata, class StreamWorker>
StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
- if (!mIsClosed) {
+ if (!isClosed()) {
LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
stopWorker();
// The worker and the context should clean up by themselves via destructors.
@@ -214,13 +403,13 @@
template <class Metadata, class StreamWorker>
ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
LOG(DEBUG) << __func__;
- if (!mIsClosed) {
+ if (!isClosed()) {
stopWorker();
LOG(DEBUG) << __func__ << ": joining the worker thread...";
mWorker.stop();
LOG(DEBUG) << __func__ << ": worker thread joined";
mContext.reset();
- mIsClosed = true;
+ mWorker.setClosed();
return ndk::ScopedAStatus::ok();
} else {
LOG(ERROR) << __func__ << ": stream was already closed";
@@ -231,13 +420,14 @@
template <class Metadata, class StreamWorker>
void StreamCommon<Metadata, StreamWorker>::stopWorker() {
if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
- LOG(DEBUG) << __func__ << ": asking the worker to stop...";
+ LOG(DEBUG) << __func__ << ": asking the worker to exit...";
StreamDescriptor::Command cmd;
- cmd.code = StreamContext::COMMAND_EXIT;
+ cmd.code = StreamDescriptor::CommandCode(StreamContext::COMMAND_EXIT);
cmd.fmqByteCount = mContext.getInternalCommandCookie();
- // FIXME: This can block in the case when the client wrote a command
- // while the stream worker's cycle is not running. Need to revisit
- // when implementing standby and pause/resume.
+ // Note: never call 'pause' and 'resume' methods of StreamWorker
+ // in the HAL implementation. These methods are to be used by
+ // the client side only. Preventing the worker loop from running
+ // on the HAL side can cause a deadlock.
if (!commandMQ->writeBlocking(&cmd, 1)) {
LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
}
@@ -248,7 +438,7 @@
template <class Metadata, class StreamWorker>
ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
LOG(DEBUG) << __func__;
- if (!mIsClosed) {
+ if (!isClosed()) {
mMetadata = metadata;
return ndk::ScopedAStatus::ok();
}
diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h
index 488edf1..539fa8b 100644
--- a/audio/aidl/default/include/core-impl/Stream.h
+++ b/audio/aidl/default/include/core-impl/Stream.h
@@ -54,8 +54,10 @@
int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
DataMQ;
- // Ensure that this value is not used by any of StreamDescriptor.COMMAND_*
- static constexpr int COMMAND_EXIT = -1;
+ // Ensure that this value is not used by any of StreamDescriptor.CommandCode enums
+ static constexpr int32_t COMMAND_EXIT = -1;
+ // Ensure that this value is not used by any of StreamDescriptor.State enums
+ static constexpr int32_t STATE_CLOSED = -1;
StreamContext() = default;
StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
@@ -99,6 +101,10 @@
class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
public:
+ bool isClosed() const {
+ return static_cast<int32_t>(mState.load()) == StreamContext::STATE_CLOSED;
+ }
+ void setClosed() { mState = static_cast<StreamDescriptor::State>(StreamContext::STATE_CLOSED); }
void setIsConnected(bool connected) { mIsConnected = connected; }
protected:
@@ -109,9 +115,12 @@
mReplyMQ(context.getReplyMQ()),
mDataMQ(context.getDataMQ()) {}
std::string init() override;
+ void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
- // Used both by the main and worker threads.
+ // Atomic fields are used both by the main and worker threads.
std::atomic<bool> mIsConnected = false;
+ static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free);
+ std::atomic<StreamDescriptor::State> mState = StreamDescriptor::State::STANDBY;
// All fields are used on the worker thread only.
const int mInternalCommandCookie;
const size_t mFrameSize;
@@ -132,6 +141,9 @@
protected:
Status cycle() override;
+
+ private:
+ bool read(size_t clientSize, StreamDescriptor::Reply* reply);
};
using StreamInWorker = ::android::hardware::audio::common::StreamWorker<StreamInWorkerLogic>;
@@ -143,6 +155,9 @@
protected:
Status cycle() override;
+
+ private:
+ bool write(size_t clientSize, StreamDescriptor::Reply* reply);
};
using StreamOutWorker = ::android::hardware::audio::common::StreamWorker<StreamOutWorkerLogic>;
@@ -155,7 +170,7 @@
? ndk::ScopedAStatus::ok()
: ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
}
- bool isClosed() const { return mIsClosed; }
+ bool isClosed() const { return mWorker.isClosed(); }
void setIsConnected(bool connected) { mWorker.setIsConnected(connected); }
ndk::ScopedAStatus updateMetadata(const Metadata& metadata);
@@ -168,9 +183,6 @@
Metadata mMetadata;
StreamContext mContext;
StreamWorker mWorker;
- // This variable is checked in the destructor which can be called on an arbitrary Binder thread,
- // thus we need to ensure that any changes made by other threads are sequentially consistent.
- std::atomic<bool> mIsClosed = false;
};
class StreamIn
diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
index 2381200..b415da4 100644
--- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp
@@ -34,6 +34,7 @@
#include <aidl/android/media/audio/common/AudioIoFlags.h>
#include <aidl/android/media/audio/common/AudioOutputFlags.h>
#include <android-base/chrono_utils.h>
+#include <android/binder_enums.h>
#include <fmq/AidlMessageQueue.h>
#include "AudioHalBinderServiceUtil.h"
@@ -69,6 +70,7 @@
using android::hardware::audio::common::isBitPositionFlagSet;
using android::hardware::audio::common::StreamLogic;
using android::hardware::audio::common::StreamWorker;
+using ndk::enum_range;
using ndk::ScopedAStatus;
template <typename T>
@@ -171,25 +173,26 @@
AudioPortConfig mConfig;
};
-class AudioCoreModule : public testing::TestWithParam<std::string> {
+// Can be used as a base for any test here, does not depend on the fixture GTest parameters.
+class AudioCoreModuleBase {
public:
// The default buffer size is used mostly for negative tests.
static constexpr int kDefaultBufferSizeFrames = 256;
- void SetUp() override {
- ASSERT_NO_FATAL_FAILURE(ConnectToService());
+ void SetUpImpl(const std::string& moduleName) {
+ ASSERT_NO_FATAL_FAILURE(ConnectToService(moduleName));
debug.flags().simulateDeviceConnections = true;
ASSERT_NO_FATAL_FAILURE(debug.SetUp(module.get()));
}
- void TearDown() override {
+ void TearDownImpl() {
if (module != nullptr) {
EXPECT_IS_OK(module->setModuleDebug(ModuleDebug{}));
}
}
- void ConnectToService() {
- module = IModule::fromBinder(binderUtil.connectToService(GetParam()));
+ void ConnectToService(const std::string& moduleName) {
+ module = IModule::fromBinder(binderUtil.connectToService(moduleName));
ASSERT_NE(module, nullptr);
}
@@ -269,6 +272,13 @@
WithDebugFlags debug;
};
+class AudioCoreModule : public AudioCoreModuleBase, public testing::TestWithParam<std::string> {
+ public:
+ void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpImpl(GetParam())); }
+
+ void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownImpl()); }
+};
+
class WithDevicePortConnectedState {
public:
explicit WithDevicePortConnectedState(const AudioPort& idAndData) : mIdAndData(idAndData) {}
@@ -352,21 +362,36 @@
std::unique_ptr<DataMQ> mDataMQ;
};
-class StreamCommonLogic : public StreamLogic {
+class StreamLogicDriver {
public:
- StreamDescriptor::Position getLastObservablePosition() {
- std::lock_guard<std::mutex> lock(mLock);
- return mLastReply.observable;
- }
+ virtual ~StreamLogicDriver() = default;
+ // Return 'true' to stop the worker.
+ virtual bool done() = 0;
+ // For 'Writer' logic, if the 'actualSize' is 0, write is skipped.
+ // The 'fmqByteCount' from the returned command is passed as is to the HAL.
+ virtual StreamDescriptor::Command getNextCommand(int maxDataSize,
+ int* actualSize = nullptr) = 0;
+ // Return 'true' to indicate that no further processing is needed,
+ // for example, the driver is expecting a bad status to be returned.
+ // The logic cycle will return with 'CONTINUE' status. Otherwise,
+ // the reply will be validated and then passed to 'processValidReply'.
+ virtual bool interceptRawReply(const StreamDescriptor::Reply& reply) = 0;
+ // Return 'false' to indicate that the contents of the reply are unexpected.
+ // Will abort the logic cycle.
+ virtual bool processValidReply(const StreamDescriptor::Reply& reply) = 0;
+};
+class StreamCommonLogic : public StreamLogic {
protected:
- explicit StreamCommonLogic(const StreamContext& context)
+ StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver)
: mCommandMQ(context.getCommandMQ()),
mReplyMQ(context.getReplyMQ()),
mDataMQ(context.getDataMQ()),
- mData(context.getBufferSizeBytes()) {}
+ mData(context.getBufferSizeBytes()),
+ mDriver(driver) {}
StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; }
StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; }
+ StreamLogicDriver* getDriver() const { return mDriver; }
std::string init() override { return ""; }
@@ -374,19 +399,20 @@
StreamContext::ReplyMQ* mReplyMQ;
StreamContext::DataMQ* mDataMQ;
std::vector<int8_t> mData;
- std::mutex mLock;
- StreamDescriptor::Reply mLastReply GUARDED_BY(mLock);
+ StreamLogicDriver* const mDriver;
};
class StreamReaderLogic : public StreamCommonLogic {
public:
- explicit StreamReaderLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+ StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver)
+ : StreamCommonLogic(context, driver) {}
protected:
Status cycle() override {
- StreamDescriptor::Command command{};
- command.code = StreamDescriptor::COMMAND_BURST;
- command.fmqByteCount = mData.size();
+ if (getDriver()->done()) {
+ return Status::EXIT;
+ }
+ StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size());
if (!mCommandMQ->writeBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": writing of command into MQ failed";
return Status::ABORT;
@@ -396,6 +422,9 @@
LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
return Status::ABORT;
}
+ if (getDriver()->interceptRawReply(reply)) {
+ return Status::CONTINUE;
+ }
if (reply.status != STATUS_OK) {
LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
return Status::ABORT;
@@ -405,16 +434,41 @@
<< ": received invalid byte count in the reply: " << reply.fmqByteCount;
return Status::ABORT;
}
- {
- std::lock_guard<std::mutex> lock(mLock);
- mLastReply = reply;
+ if (static_cast<size_t>(reply.fmqByteCount) != mDataMQ->availableToRead()) {
+ LOG(ERROR) << __func__
+ << ": the byte count in the reply is not the same as the amount of "
+ << "data available in the MQ: " << reply.fmqByteCount
+ << " != " << mDataMQ->availableToRead();
}
- const size_t readCount = std::min({mDataMQ->availableToRead(),
- static_cast<size_t>(reply.fmqByteCount), mData.size()});
- if (readCount == 0 || mDataMQ->read(mData.data(), readCount)) {
+ if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
+ LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs;
+ return Status::ABORT;
+ }
+ if (reply.xrunFrames < 0) {
+ LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames;
+ return Status::ABORT;
+ }
+ if (std::find(enum_range<StreamDescriptor::State>().begin(),
+ enum_range<StreamDescriptor::State>().end(),
+ reply.state) == enum_range<StreamDescriptor::State>().end()) {
+ LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state);
+ return Status::ABORT;
+ }
+ const bool acceptedReply = getDriver()->processValidReply(reply);
+ if (const size_t readCount = mDataMQ->availableToRead(); readCount > 0) {
+ std::vector<int8_t> data(readCount);
+ if (mDataMQ->read(data.data(), readCount)) {
+ memcpy(mData.data(), data.data(), std::min(mData.size(), data.size()));
+ goto checkAcceptedReply;
+ }
+ LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
+ return Status::ABORT;
+ } // readCount == 0
+ checkAcceptedReply:
+ if (acceptedReply) {
return Status::CONTINUE;
}
- LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
+ LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
return Status::ABORT;
}
};
@@ -422,17 +476,20 @@
class StreamWriterLogic : public StreamCommonLogic {
public:
- explicit StreamWriterLogic(const StreamContext& context) : StreamCommonLogic(context) {}
+ StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver)
+ : StreamCommonLogic(context, driver) {}
protected:
Status cycle() override {
- if (!mDataMQ->write(mData.data(), mData.size())) {
+ if (getDriver()->done()) {
+ return Status::EXIT;
+ }
+ int actualSize = 0;
+ StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size(), &actualSize);
+ if (actualSize != 0 && !mDataMQ->write(mData.data(), mData.size())) {
LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed";
return Status::ABORT;
}
- StreamDescriptor::Command command{};
- command.code = StreamDescriptor::COMMAND_BURST;
- command.fmqByteCount = mData.size();
if (!mCommandMQ->writeBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": writing of command into MQ failed";
return Status::ABORT;
@@ -442,6 +499,9 @@
LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
return Status::ABORT;
}
+ if (getDriver()->interceptRawReply(reply)) {
+ return Status::CONTINUE;
+ }
if (reply.status != STATUS_OK) {
LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
return Status::ABORT;
@@ -451,11 +511,31 @@
<< ": received invalid byte count in the reply: " << reply.fmqByteCount;
return Status::ABORT;
}
- {
- std::lock_guard<std::mutex> lock(mLock);
- mLastReply = reply;
+ if (mDataMQ->availableToWrite() != mDataMQ->getQuantumCount()) {
+ LOG(ERROR) << __func__ << ": the HAL module did not consume all data from the data MQ: "
+ << "available to write " << mDataMQ->availableToWrite()
+ << ", total size: " << mDataMQ->getQuantumCount();
+ return Status::ABORT;
}
- return Status::CONTINUE;
+ if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
+ LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs;
+ return Status::ABORT;
+ }
+ if (reply.xrunFrames < 0) {
+ LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames;
+ return Status::ABORT;
+ }
+ if (std::find(enum_range<StreamDescriptor::State>().begin(),
+ enum_range<StreamDescriptor::State>().end(),
+ reply.state) == enum_range<StreamDescriptor::State>().end()) {
+ LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state);
+ return Status::ABORT;
+ }
+ if (getDriver()->processValidReply(reply)) {
+ return Status::CONTINUE;
+ }
+ LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
+ return Status::ABORT;
}
};
using StreamWriter = StreamWorker<StreamWriterLogic>;
@@ -466,52 +546,6 @@
using Worker = std::conditional_t<is_input, StreamReader, StreamWriter>;
};
-// A dedicated version to test replies to invalid commands.
-class StreamInvalidCommandLogic : public StreamCommonLogic {
- public:
- StreamInvalidCommandLogic(const StreamContext& context,
- const std::vector<StreamDescriptor::Command>& commands)
- : StreamCommonLogic(context), mCommands(commands) {}
-
- std::vector<std::string> getUnexpectedStatuses() {
- std::lock_guard<std::mutex> lock(mLock);
- return mUnexpectedStatuses;
- }
-
- protected:
- Status cycle() override {
- // Send all commands in one cycle to simplify testing.
- // Extra logging helps to sort out issues with unexpected HAL behavior.
- for (const auto& command : mCommands) {
- LOG(INFO) << __func__ << ": writing command " << command.toString() << " into MQ...";
- if (!getCommandMQ()->writeBlocking(&command, 1)) {
- LOG(ERROR) << __func__ << ": writing of command into MQ failed";
- return Status::ABORT;
- }
- StreamDescriptor::Reply reply{};
- LOG(INFO) << __func__ << ": reading reply for command " << command.toString() << "...";
- if (!getReplyMQ()->readBlocking(&reply, 1)) {
- LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
- return Status::ABORT;
- }
- LOG(INFO) << __func__ << ": received status " << statusToString(reply.status)
- << " for command " << command.toString();
- if (reply.status != STATUS_BAD_VALUE) {
- std::string s = command.toString();
- s.append(", ").append(statusToString(reply.status));
- std::lock_guard<std::mutex> lock(mLock);
- mUnexpectedStatuses.push_back(std::move(s));
- }
- };
- return Status::EXIT;
- }
-
- private:
- const std::vector<StreamDescriptor::Command> mCommands;
- std::mutex mLock;
- std::vector<std::string> mUnexpectedStatuses GUARDED_BY(mLock);
-};
-
template <typename Stream>
class WithStream {
public:
@@ -1208,6 +1242,46 @@
}
}
+class StreamLogicDriverInvalidCommand : public StreamLogicDriver {
+ public:
+ StreamLogicDriverInvalidCommand(const std::vector<StreamDescriptor::Command>& commands)
+ : mCommands(commands) {}
+
+ std::string getUnexpectedStatuses() {
+ // This method is intended to be called after the worker thread has joined,
+ // thus no extra synchronization is needed.
+ std::string s;
+ if (!mStatuses.empty()) {
+ s = std::string("Pairs of (command, actual status): ")
+ .append((android::internal::ToString(mStatuses)));
+ }
+ return s;
+ }
+
+ bool done() override { return mNextCommand >= mCommands.size(); }
+ StreamDescriptor::Command getNextCommand(int, int* actualSize) override {
+ if (actualSize != nullptr) *actualSize = 0;
+ return mCommands[mNextCommand++];
+ }
+ bool interceptRawReply(const StreamDescriptor::Reply& reply) override {
+ if (reply.status != STATUS_BAD_VALUE) {
+ std::string s = mCommands[mNextCommand - 1].toString();
+ s.append(", ").append(statusToString(reply.status));
+ mStatuses.push_back(std::move(s));
+ // If the HAL does not recognize the command as invalid,
+ // retrieve the data etc.
+ return reply.status != STATUS_OK;
+ }
+ return true;
+ }
+ bool processValidReply(const StreamDescriptor::Reply&) override { return true; }
+
+ private:
+ const std::vector<StreamDescriptor::Command> mCommands;
+ size_t mNextCommand = 0;
+ std::vector<std::string> mStatuses;
+};
+
template <typename Stream>
class AudioStream : public AudioCoreModule {
public:
@@ -1315,19 +1389,6 @@
EXPECT_NO_FATAL_FAILURE(OpenTwiceSamePortConfigImpl(portConfig.value()));
}
- void ReadOrWrite(bool useSetupSequence2, bool validateObservablePosition) {
- const auto allPortConfigs =
- moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
- if (allPortConfigs.empty()) {
- GTEST_SKIP() << "No mix ports have attached devices";
- }
- for (const auto& portConfig : allPortConfigs) {
- EXPECT_NO_FATAL_FAILURE(
- ReadOrWriteImpl(portConfig, useSetupSequence2, validateObservablePosition))
- << portConfig.toString();
- }
- }
-
void ResetPortConfigWithOpenStream() {
const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
if (!portConfig.has_value()) {
@@ -1357,131 +1418,43 @@
<< stream1.getPortId();
}
- template <class Worker>
- void WaitForObservablePositionAdvance(Worker& worker) {
- static constexpr int kWriteDurationUs = 50 * 1000;
- static constexpr std::chrono::milliseconds kPositionChangeTimeout{10000};
- int64_t framesInitial;
- framesInitial = worker.getLastObservablePosition().frames;
- ASSERT_FALSE(worker.hasError());
- bool timedOut = false;
- int64_t frames = framesInitial;
- for (android::base::Timer elapsed;
- frames <= framesInitial && !worker.hasError() &&
- !(timedOut = (elapsed.duration() >= kPositionChangeTimeout));) {
- usleep(kWriteDurationUs);
- frames = worker.getLastObservablePosition().frames;
- }
- EXPECT_FALSE(timedOut);
- EXPECT_FALSE(worker.hasError()) << worker.getError();
- EXPECT_GT(frames, framesInitial);
- }
-
- void ReadOrWriteImpl(const AudioPortConfig& portConfig, bool useSetupSequence2,
- bool validateObservablePosition) {
- if (!useSetupSequence2) {
- ASSERT_NO_FATAL_FAILURE(
- ReadOrWriteSetupSequence1(portConfig, validateObservablePosition));
- } else {
- ASSERT_NO_FATAL_FAILURE(
- ReadOrWriteSetupSequence2(portConfig, validateObservablePosition));
- }
- }
-
- // Set up a patch first, then open a stream.
- void ReadOrWriteSetupSequence1(const AudioPortConfig& portConfig,
- bool validateObservablePosition) {
- auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
- IOTraits<Stream>::is_input, portConfig);
- ASSERT_FALSE(devicePorts.empty());
- auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
- WithAudioPatch patch(IOTraits<Stream>::is_input, portConfig, devicePortConfig);
- ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
-
- WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
- ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
- typename IOTraits<Stream>::Worker worker(*stream.getContext());
-
- ASSERT_TRUE(worker.start());
- ASSERT_TRUE(worker.waitForAtLeastOneCycle());
- if (validateObservablePosition) {
- ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
- }
- }
-
- // Open a stream, then set up a patch for it.
- void ReadOrWriteSetupSequence2(const AudioPortConfig& portConfig,
- bool validateObservablePosition) {
- WithStream<Stream> stream(portConfig);
- ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
- typename IOTraits<Stream>::Worker worker(*stream.getContext());
-
- auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
- IOTraits<Stream>::is_input, portConfig);
- ASSERT_FALSE(devicePorts.empty());
- auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
- WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
- ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
-
- ASSERT_TRUE(worker.start());
- ASSERT_TRUE(worker.waitForAtLeastOneCycle());
- if (validateObservablePosition) {
- ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
- }
- }
-
void SendInvalidCommandImpl(const AudioPortConfig& portConfig) {
std::vector<StreamDescriptor::Command> commands(6);
- commands[0].code = -1;
- commands[1].code = StreamDescriptor::COMMAND_BURST - 1;
- commands[2].code = std::numeric_limits<int32_t>::min();
- commands[3].code = std::numeric_limits<int32_t>::max();
- commands[4].code = StreamDescriptor::COMMAND_BURST;
+ commands[0].code = StreamDescriptor::CommandCode(-1);
+ commands[1].code = StreamDescriptor::CommandCode(
+ static_cast<int32_t>(StreamDescriptor::CommandCode::START) - 1);
+ commands[2].code = StreamDescriptor::CommandCode(std::numeric_limits<int32_t>::min());
+ commands[3].code = StreamDescriptor::CommandCode(std::numeric_limits<int32_t>::max());
+ // TODO: For proper testing of input streams, need to put the stream into
+ // a state which accepts BURST commands.
+ commands[4].code = StreamDescriptor::CommandCode::BURST;
commands[4].fmqByteCount = -1;
- commands[5].code = StreamDescriptor::COMMAND_BURST;
+ commands[5].code = StreamDescriptor::CommandCode::BURST;
commands[5].fmqByteCount = std::numeric_limits<int32_t>::min();
WithStream<Stream> stream(portConfig);
ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
- StreamWorker<StreamInvalidCommandLogic> writer(*stream.getContext(), commands);
- ASSERT_TRUE(writer.start());
- writer.waitForAtLeastOneCycle();
- auto unexpectedStatuses = writer.getUnexpectedStatuses();
- EXPECT_EQ(0UL, unexpectedStatuses.size())
- << "Pairs of (command, actual status): "
- << android::internal::ToString(unexpectedStatuses);
+ StreamLogicDriverInvalidCommand driver(commands);
+ typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+ ASSERT_TRUE(worker.start());
+ worker.join();
+ EXPECT_EQ("", driver.getUnexpectedStatuses());
}
};
using AudioStreamIn = AudioStream<IStreamIn>;
using AudioStreamOut = AudioStream<IStreamOut>;
-#define TEST_IO_STREAM(method_name) \
+#define TEST_IN_AND_OUT_STREAM(method_name) \
TEST_P(AudioStreamIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
TEST_P(AudioStreamOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
-#define TEST_IO_STREAM_2(method_name, arg1, arg2) \
- TEST_P(AudioStreamIn, method_name##_##arg1##_##arg2) { \
- ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \
- } \
- TEST_P(AudioStreamOut, method_name##_##arg1##_##arg2) { \
- ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \
- }
-TEST_IO_STREAM(CloseTwice);
-TEST_IO_STREAM(OpenAllConfigs);
-TEST_IO_STREAM(OpenInvalidBufferSize);
-TEST_IO_STREAM(OpenInvalidDirection);
-TEST_IO_STREAM(OpenOverMaxCount);
-TEST_IO_STREAM(OpenTwiceSamePortConfig);
-// Use of constants makes comprehensible test names.
-constexpr bool SetupSequence1 = false;
-constexpr bool SetupSequence2 = true;
-constexpr bool SetupOnly = false;
-constexpr bool ValidateObservablePosition = true;
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, SetupOnly);
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, SetupOnly);
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, ValidateObservablePosition);
-TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, ValidateObservablePosition);
-TEST_IO_STREAM(ResetPortConfigWithOpenStream);
-TEST_IO_STREAM(SendInvalidCommand);
+TEST_IN_AND_OUT_STREAM(CloseTwice);
+TEST_IN_AND_OUT_STREAM(OpenAllConfigs);
+TEST_IN_AND_OUT_STREAM(OpenInvalidBufferSize);
+TEST_IN_AND_OUT_STREAM(OpenInvalidDirection);
+TEST_IN_AND_OUT_STREAM(OpenOverMaxCount);
+TEST_IN_AND_OUT_STREAM(OpenTwiceSamePortConfig);
+TEST_IN_AND_OUT_STREAM(ResetPortConfigWithOpenStream);
+TEST_IN_AND_OUT_STREAM(SendInvalidCommand);
TEST_P(AudioStreamOut, OpenTwicePrimary) {
const auto mixPorts = moduleConfig->getMixPorts(false);
@@ -1523,6 +1496,163 @@
<< "when no offload info is provided for a compressed offload mix port";
}
+using CommandAndState = std::pair<StreamDescriptor::CommandCode, StreamDescriptor::State>;
+
+class StreamLogicDefaultDriver : public StreamLogicDriver {
+ public:
+ explicit StreamLogicDefaultDriver(const std::vector<CommandAndState>& commands)
+ : mCommands(commands) {}
+
+ // The three methods below is intended to be called after the worker
+ // thread has joined, thus no extra synchronization is needed.
+ bool hasObservablePositionIncrease() const { return mObservablePositionIncrease; }
+ bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; }
+ std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; }
+
+ bool done() override { return mNextCommand >= mCommands.size(); }
+ StreamDescriptor::Command getNextCommand(int maxDataSize, int* actualSize) override {
+ StreamDescriptor::Command command{};
+ command.code = mCommands[mNextCommand++].first;
+ const int dataSize = command.code == StreamDescriptor::CommandCode::BURST ? maxDataSize : 0;
+ command.fmqByteCount = dataSize;
+ if (actualSize != nullptr) {
+ // In the output scenario, reduce slightly the fmqByteCount to verify
+ // that the HAL module always consumes all data from the MQ.
+ if (command.fmqByteCount > 1) command.fmqByteCount--;
+ *actualSize = dataSize;
+ }
+ return command;
+ }
+ bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; }
+ bool processValidReply(const StreamDescriptor::Reply& reply) override {
+ if (mPreviousFrames.has_value()) {
+ if (reply.observable.frames > mPreviousFrames.value()) {
+ mObservablePositionIncrease = true;
+ } else if (reply.observable.frames < mPreviousFrames.value()) {
+ mRetrogradeObservablePosition = true;
+ }
+ }
+ mPreviousFrames = reply.observable.frames;
+
+ const auto& lastCommandState = mCommands[mNextCommand - 1];
+ if (lastCommandState.second != reply.state) {
+ std::string s = std::string("Unexpected transition from the state ")
+ .append(mPreviousState)
+ .append(" to ")
+ .append(toString(reply.state))
+ .append(" caused by the command ")
+ .append(toString(lastCommandState.first));
+ LOG(ERROR) << __func__ << ": " << s;
+ mUnexpectedTransition = std::move(s);
+ return false;
+ }
+ return true;
+ }
+
+ protected:
+ const std::vector<CommandAndState>& mCommands;
+ size_t mNextCommand = 0;
+ std::optional<int64_t> mPreviousFrames;
+ std::string mPreviousState = "<initial state>";
+ bool mObservablePositionIncrease = false;
+ bool mRetrogradeObservablePosition = false;
+ std::string mUnexpectedTransition;
+};
+
+using NamedCommandSequence = std::pair<std::string, std::vector<CommandAndState>>;
+enum { PARAM_MODULE_NAME, PARAM_CMD_SEQ, PARAM_SETUP_SEQ };
+using StreamIoTestParameters =
+ std::tuple<std::string /*moduleName*/, NamedCommandSequence, bool /*useSetupSequence2*/>;
+template <typename Stream>
+class AudioStreamIo : public AudioCoreModuleBase,
+ public testing::TestWithParam<StreamIoTestParameters> {
+ public:
+ void SetUp() override {
+ ASSERT_NO_FATAL_FAILURE(SetUpImpl(std::get<PARAM_MODULE_NAME>(GetParam())));
+ ASSERT_NO_FATAL_FAILURE(SetUpModuleConfig());
+ }
+
+ void Run() {
+ const auto allPortConfigs =
+ moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
+ if (allPortConfigs.empty()) {
+ GTEST_SKIP() << "No mix ports have attached devices";
+ }
+ for (const auto& portConfig : allPortConfigs) {
+ SCOPED_TRACE(portConfig.toString());
+ const auto& commandsAndStates = std::get<PARAM_CMD_SEQ>(GetParam()).second;
+ if (!std::get<PARAM_SETUP_SEQ>(GetParam())) {
+ ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq1(portConfig, commandsAndStates));
+ } else {
+ ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq2(portConfig, commandsAndStates));
+ }
+ }
+ }
+
+ bool ValidateObservablePosition(const AudioPortConfig& /*portConfig*/) {
+ // May return false based on the portConfig, e.g. for telephony ports.
+ return true;
+ }
+
+ // Set up a patch first, then open a stream.
+ void RunStreamIoCommandsImplSeq1(const AudioPortConfig& portConfig,
+ const std::vector<CommandAndState>& commandsAndStates) {
+ auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
+ IOTraits<Stream>::is_input, portConfig);
+ ASSERT_FALSE(devicePorts.empty());
+ auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+ WithAudioPatch patch(IOTraits<Stream>::is_input, portConfig, devicePortConfig);
+ ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+
+ WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
+ ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+ StreamLogicDefaultDriver driver(commandsAndStates);
+ typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+
+ ASSERT_TRUE(worker.start());
+ worker.join();
+ EXPECT_FALSE(worker.hasError()) << worker.getError();
+ EXPECT_EQ("", driver.getUnexpectedStateTransition());
+ if (ValidateObservablePosition(portConfig)) {
+ EXPECT_TRUE(driver.hasObservablePositionIncrease());
+ EXPECT_FALSE(driver.hasRetrogradeObservablePosition());
+ }
+ }
+
+ // Open a stream, then set up a patch for it.
+ void RunStreamIoCommandsImplSeq2(const AudioPortConfig& portConfig,
+ const std::vector<CommandAndState>& commandsAndStates) {
+ WithStream<Stream> stream(portConfig);
+ ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
+ StreamLogicDefaultDriver driver(commandsAndStates);
+ typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
+
+ auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
+ IOTraits<Stream>::is_input, portConfig);
+ ASSERT_FALSE(devicePorts.empty());
+ auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
+ WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
+ ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
+
+ ASSERT_TRUE(worker.start());
+ worker.join();
+ EXPECT_FALSE(worker.hasError()) << worker.getError();
+ EXPECT_EQ("", driver.getUnexpectedStateTransition());
+ if (ValidateObservablePosition(portConfig)) {
+ EXPECT_TRUE(driver.hasObservablePositionIncrease());
+ EXPECT_FALSE(driver.hasRetrogradeObservablePosition());
+ }
+ }
+};
+using AudioStreamIoIn = AudioStreamIo<IStreamIn>;
+using AudioStreamIoOut = AudioStreamIo<IStreamOut>;
+
+#define TEST_IN_AND_OUT_STREAM_IO(method_name) \
+ TEST_P(AudioStreamIoIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
+ TEST_P(AudioStreamIoOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
+
+TEST_IN_AND_OUT_STREAM_IO(Run);
+
// Tests specific to audio patches. The fixure class is named 'AudioModulePatch'
// to avoid clashing with 'AudioPatch' class.
class AudioModulePatch : public AudioCoreModule {
@@ -1704,6 +1834,139 @@
testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
android::PrintInstanceNameToString);
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamOut);
+
+static const NamedCommandSequence kReadOrWriteSeq = std::make_pair(
+ std::string("ReadOrWrite"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE)});
+static const NamedCommandSequence kDrainInSeq = std::make_pair(
+ std::string("Drain"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::DRAIN,
+ StreamDescriptor::State::DRAINING),
+ std::make_pair(StreamDescriptor::CommandCode::START,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::DRAIN,
+ StreamDescriptor::State::DRAINING),
+ // TODO: This will need to be changed once DRAIN starts taking time.
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::STANDBY)});
+static const NamedCommandSequence kDrainOutSeq = std::make_pair(
+ std::string("Drain"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ // TODO: This will need to be changed once DRAIN starts taking time.
+ std::make_pair(StreamDescriptor::CommandCode::DRAIN,
+ StreamDescriptor::State::IDLE)});
+// TODO: This will need to be changed once DRAIN starts taking time so we can pause it.
+static const NamedCommandSequence kDrainPauseOutSeq = std::make_pair(
+ std::string("DrainPause"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::DRAIN,
+ StreamDescriptor::State::IDLE)});
+static const NamedCommandSequence kStandbySeq = std::make_pair(
+ std::string("Standby"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::STANDBY,
+ StreamDescriptor::State::STANDBY),
+ // Perform a read or write in order to advance observable position
+ // (this is verified by tests).
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE)});
+static const NamedCommandSequence kPauseInSeq = std::make_pair(
+ std::string("Pause"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::PAUSE,
+ StreamDescriptor::State::PAUSED),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::PAUSE,
+ StreamDescriptor::State::PAUSED),
+ std::make_pair(StreamDescriptor::CommandCode::FLUSH,
+ StreamDescriptor::State::STANDBY)});
+static const NamedCommandSequence kPauseOutSeq = std::make_pair(
+ std::string("Pause"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::PAUSE,
+ StreamDescriptor::State::PAUSED),
+ std::make_pair(StreamDescriptor::CommandCode::START,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::PAUSE,
+ StreamDescriptor::State::PAUSED),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::PAUSED),
+ std::make_pair(StreamDescriptor::CommandCode::START,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::PAUSE,
+ StreamDescriptor::State::PAUSED)});
+static const NamedCommandSequence kFlushInSeq = std::make_pair(
+ std::string("Flush"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::PAUSE,
+ StreamDescriptor::State::PAUSED),
+ std::make_pair(StreamDescriptor::CommandCode::FLUSH,
+ StreamDescriptor::State::STANDBY)});
+static const NamedCommandSequence kFlushOutSeq = std::make_pair(
+ std::string("Flush"),
+ std::vector<CommandAndState>{
+ std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
+ std::make_pair(StreamDescriptor::CommandCode::BURST,
+ StreamDescriptor::State::ACTIVE),
+ std::make_pair(StreamDescriptor::CommandCode::PAUSE,
+ StreamDescriptor::State::PAUSED),
+ std::make_pair(StreamDescriptor::CommandCode::FLUSH,
+ StreamDescriptor::State::IDLE)});
+std::string GetStreamIoTestName(const testing::TestParamInfo<StreamIoTestParameters>& info) {
+ return android::PrintInstanceNameToString(
+ testing::TestParamInfo<std::string>{std::get<PARAM_MODULE_NAME>(info.param),
+ info.index})
+ .append("_")
+ .append(std::get<PARAM_CMD_SEQ>(info.param).first)
+ .append("_SetupSeq")
+ .append(std::get<PARAM_SETUP_SEQ>(info.param) ? "2" : "1");
+}
+INSTANTIATE_TEST_SUITE_P(
+ AudioStreamIoInTest, AudioStreamIoIn,
+ testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
+ testing::Values(kReadOrWriteSeq, kDrainInSeq, kStandbySeq, kPauseInSeq,
+ kFlushInSeq),
+ testing::Values(false, true)),
+ GetStreamIoTestName);
+GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoIn);
+INSTANTIATE_TEST_SUITE_P(
+ AudioStreamIoOutTest, AudioStreamIoOut,
+ testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
+ testing::Values(kReadOrWriteSeq, kDrainOutSeq, kDrainPauseOutSeq,
+ kStandbySeq, kPauseOutSeq, kFlushOutSeq),
+ testing::Values(false, true)),
+ GetStreamIoTestName);
+GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoOut);
+
INSTANTIATE_TEST_SUITE_P(AudioPatchTest, AudioModulePatch,
testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
android::PrintInstanceNameToString);