blob: bb123a21b180a3adc394ea784abb8e9b2627e03a [file] [log] [blame]
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +00001/*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "AHAL_Stream"
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000018#include <android-base/logging.h>
Mikhail Naganove9f10fc2022-10-14 23:31:52 +000019#include <android/binder_ibinder_platform.h>
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000020#include <utils/SystemClock.h>
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000021
Mikhail Naganovef6bc742022-10-06 00:14:19 +000022#include <Utils.h>
23
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000024#include "core-impl/Module.h"
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000025#include "core-impl/Stream.h"
26
27using aidl::android::hardware::audio::common::SinkMetadata;
28using aidl::android::hardware::audio::common::SourceMetadata;
Mikhail Naganovef6bc742022-10-06 00:14:19 +000029using aidl::android::media::audio::common::AudioDevice;
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000030using aidl::android::media::audio::common::AudioOffloadInfo;
Mikhail Naganovef6bc742022-10-06 00:14:19 +000031using android::hardware::audio::common::getChannelCount;
32using android::hardware::audio::common::getFrameSizeInBytes;
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000033
34namespace aidl::android::hardware::audio::core {
35
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000036void StreamContext::fillDescriptor(StreamDescriptor* desc) {
37 if (mCommandMQ) {
38 desc->command = mCommandMQ->dupeDesc();
39 }
40 if (mReplyMQ) {
41 desc->reply = mReplyMQ->dupeDesc();
42 }
43 if (mDataMQ) {
Mikhail Naganovef6bc742022-10-06 00:14:19 +000044 const size_t frameSize = getFrameSize();
45 desc->frameSizeBytes = frameSize;
46 desc->bufferSizeFrames = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize() / frameSize;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000047 desc->audio.set<StreamDescriptor::AudioBuffer::Tag::fmq>(mDataMQ->dupeDesc());
48 }
Mikhail Naganov6a4872d2022-06-15 21:39:04 +000049}
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000050
Mikhail Naganovef6bc742022-10-06 00:14:19 +000051size_t StreamContext::getFrameSize() const {
52 return getFrameSizeInBytes(mFormat, mChannelLayout);
53}
54
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000055bool StreamContext::isValid() const {
56 if (mCommandMQ && !mCommandMQ->isValid()) {
57 LOG(ERROR) << "command FMQ is invalid";
58 return false;
59 }
60 if (mReplyMQ && !mReplyMQ->isValid()) {
61 LOG(ERROR) << "reply FMQ is invalid";
62 return false;
63 }
Mikhail Naganovef6bc742022-10-06 00:14:19 +000064 if (getFrameSize() == 0) {
65 LOG(ERROR) << "frame size is invalid";
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000066 return false;
67 }
68 if (mDataMQ && !mDataMQ->isValid()) {
69 LOG(ERROR) << "data FMQ is invalid";
70 return false;
71 }
72 return true;
73}
74
75void StreamContext::reset() {
76 mCommandMQ.reset();
77 mReplyMQ.reset();
78 mDataMQ.reset();
79}
80
81std::string StreamWorkerCommonLogic::init() {
82 if (mCommandMQ == nullptr) return "Command MQ is null";
83 if (mReplyMQ == nullptr) return "Reply MQ is null";
84 if (mDataMQ == nullptr) return "Data MQ is null";
85 if (sizeof(decltype(mDataBuffer)::element_type) != mDataMQ->getQuantumSize()) {
86 return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize());
87 }
88 mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize();
89 mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]);
90 if (mDataBuffer == nullptr) {
91 return "Failed to allocate data buffer for element count " +
92 std::to_string(mDataMQ->getQuantumCount()) +
93 ", size in bytes: " + std::to_string(mDataBufferSize);
94 }
95 return "";
96}
97
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +000098void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
99 bool isConnected) const {
Mikhail Naganov549a8222022-11-23 18:30:07 +0000100 reply->status = STATUS_OK;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000101 if (isConnected) {
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000102 reply->observable.frames = mFrameCount;
103 reply->observable.timeNs = ::android::elapsedRealtimeNano();
104 } else {
Mikhail Naganov549a8222022-11-23 18:30:07 +0000105 reply->observable.frames = StreamDescriptor::Position::UNKNOWN;
106 reply->observable.timeNs = StreamDescriptor::Position::UNKNOWN;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000107 }
108}
109
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000110void StreamWorkerCommonLogic::populateReplyWrongState(
111 StreamDescriptor::Reply* reply, const StreamDescriptor::Command& command) const {
112 LOG(WARNING) << "command '" << toString(command.getTag())
113 << "' can not be handled in the state " << toString(mState);
114 reply->status = STATUS_INVALID_OPERATION;
115}
116
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000117const std::string StreamInWorkerLogic::kThreadName = "reader";
118
119StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000120 // Note: for input streams, draining is driven by the client, thus
121 // "empty buffer" condition can only happen while handling the 'burst'
122 // command. Thus, unlike for output streams, it does not make sense to
123 // delay the 'DRAINING' state here by 'mTransientStateDelayMs'.
124 // TODO: Add a delay for transitions of async operations when/if they added.
125
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000126 StreamDescriptor::Command command{};
127 if (!mCommandMQ->readBlocking(&command, 1)) {
128 LOG(ERROR) << __func__ << ": reading of command from MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000129 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000130 return Status::ABORT;
131 }
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000132 LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000133 StreamDescriptor::Reply reply{};
Mikhail Naganov98334432022-11-09 02:44:32 +0000134 reply.status = STATUS_BAD_VALUE;
135 using Tag = StreamDescriptor::Command::Tag;
136 switch (command.getTag()) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000137 case Tag::halReservedExit:
138 if (const int32_t cookie = command.get<Tag::halReservedExit>();
Mikhail Naganov98334432022-11-09 02:44:32 +0000139 cookie == mInternalCommandCookie) {
Mikhail Naganov98334432022-11-09 02:44:32 +0000140 setClosed();
141 // This is an internal command, no need to reply.
142 return Status::EXIT;
143 } else {
144 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000145 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000146 break;
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000147 case Tag::getStatus:
148 populateReply(&reply, mIsConnected);
149 break;
Mikhail Naganov98334432022-11-09 02:44:32 +0000150 case Tag::start:
Mikhail Naganov98334432022-11-09 02:44:32 +0000151 if (mState == StreamDescriptor::State::STANDBY ||
152 mState == StreamDescriptor::State::DRAINING) {
153 populateReply(&reply, mIsConnected);
154 mState = mState == StreamDescriptor::State::STANDBY
155 ? StreamDescriptor::State::IDLE
156 : StreamDescriptor::State::ACTIVE;
157 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000158 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000159 }
160 break;
161 case Tag::burst:
162 if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000163 LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
164 << fmqByteCount << " bytes";
Mikhail Naganov98334432022-11-09 02:44:32 +0000165 if (mState == StreamDescriptor::State::IDLE ||
166 mState == StreamDescriptor::State::ACTIVE ||
167 mState == StreamDescriptor::State::PAUSED ||
168 mState == StreamDescriptor::State::DRAINING) {
169 if (!read(fmqByteCount, &reply)) {
170 mState = StreamDescriptor::State::ERROR;
171 }
172 if (mState == StreamDescriptor::State::IDLE ||
173 mState == StreamDescriptor::State::PAUSED) {
174 mState = StreamDescriptor::State::ACTIVE;
175 } else if (mState == StreamDescriptor::State::DRAINING) {
176 // To simplify the reference code, we assume that the read operation
177 // has consumed all the data remaining in the hardware buffer.
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000178 // In a real implementation, here we would either remain in
179 // the 'DRAINING' state, or transfer to 'STANDBY' depending on the
180 // buffer state.
Mikhail Naganov98334432022-11-09 02:44:32 +0000181 mState = StreamDescriptor::State::STANDBY;
182 }
183 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000184 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000185 }
186 } else {
187 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
188 }
189 break;
190 case Tag::drain:
Mikhail Naganov30301a42022-09-13 01:20:45 +0000191 if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
192 if (mState == StreamDescriptor::State::ACTIVE) {
193 usleep(1000); // Simulate a blocking call into the driver.
194 populateReply(&reply, mIsConnected);
195 // Can switch the state to ERROR if a driver error occurs.
196 mState = StreamDescriptor::State::DRAINING;
197 } else {
198 populateReplyWrongState(&reply, command);
199 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000200 } else {
Mikhail Naganov30301a42022-09-13 01:20:45 +0000201 LOG(WARNING) << __func__
202 << ": invalid drain mode: " << toString(command.get<Tag::drain>());
Mikhail Naganov98334432022-11-09 02:44:32 +0000203 }
204 break;
205 case Tag::standby:
Mikhail Naganov98334432022-11-09 02:44:32 +0000206 if (mState == StreamDescriptor::State::IDLE) {
207 usleep(1000); // Simulate a blocking call into the driver.
208 populateReply(&reply, mIsConnected);
209 // Can switch the state to ERROR if a driver error occurs.
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000210 mState = StreamDescriptor::State::STANDBY;
Mikhail Naganov98334432022-11-09 02:44:32 +0000211 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000212 populateReplyWrongState(&reply, command);
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000213 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000214 break;
215 case Tag::pause:
Mikhail Naganov98334432022-11-09 02:44:32 +0000216 if (mState == StreamDescriptor::State::ACTIVE) {
217 usleep(1000); // Simulate a blocking call into the driver.
218 populateReply(&reply, mIsConnected);
219 // Can switch the state to ERROR if a driver error occurs.
220 mState = StreamDescriptor::State::PAUSED;
221 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000222 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000223 }
224 break;
225 case Tag::flush:
Mikhail Naganov98334432022-11-09 02:44:32 +0000226 if (mState == StreamDescriptor::State::PAUSED) {
227 usleep(1000); // Simulate a blocking call into the driver.
228 populateReply(&reply, mIsConnected);
229 // Can switch the state to ERROR if a driver error occurs.
230 mState = StreamDescriptor::State::STANDBY;
231 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000232 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000233 }
234 break;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000235 }
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000236 reply.state = mState;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000237 LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
238 if (!mReplyMQ->writeBlocking(&reply, 1)) {
239 LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000240 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000241 return Status::ABORT;
242 }
243 return Status::CONTINUE;
244}
245
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000246bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
247 // Can switch the state to ERROR if a driver error occurs.
248 const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
249 const bool isConnected = mIsConnected;
250 bool fatal = false;
251 // Simulate reading of data, or provide zeroes if the stream is not connected.
252 for (size_t i = 0; i < byteCount; ++i) {
253 using buffer_type = decltype(mDataBuffer)::element_type;
254 constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
255 std::numeric_limits<buffer_type>::min() + 1;
256 mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
257 std::numeric_limits<buffer_type>::min()
258 : 0;
259 }
260 usleep(3000); // Simulate a blocking call into the driver.
261 // Set 'fatal = true' if a driver error occurs.
262 if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
263 LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
264 << " succeeded; connected? " << isConnected;
265 // Frames are provided and counted regardless of connection status.
266 reply->fmqByteCount += byteCount;
267 mFrameCount += byteCount / mFrameSize;
268 populateReply(reply, isConnected);
269 } else {
270 LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
271 reply->status = STATUS_NOT_ENOUGH_DATA;
272 }
273 reply->latencyMs = Module::kLatencyMs;
274 return !fatal;
275}
276
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000277const std::string StreamOutWorkerLogic::kThreadName = "writer";
278
279StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
Mikhail Naganov30301a42022-09-13 01:20:45 +0000280 if (mState == StreamDescriptor::State::DRAINING ||
281 mState == StreamDescriptor::State::TRANSFERRING) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000282 if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
283 std::chrono::steady_clock::now() - mTransientStateStart);
284 stateDurationMs >= mTransientStateDelayMs) {
Mikhail Naganov30301a42022-09-13 01:20:45 +0000285 if (mAsyncCallback == nullptr) {
286 // In blocking mode, mState can only be DRAINING.
287 mState = StreamDescriptor::State::IDLE;
288 } else {
289 // In a real implementation, the driver should notify the HAL about
290 // drain or transfer completion. In the stub, we switch unconditionally.
291 if (mState == StreamDescriptor::State::DRAINING) {
292 mState = StreamDescriptor::State::IDLE;
293 ndk::ScopedAStatus status = mAsyncCallback->onDrainReady();
294 if (!status.isOk()) {
295 LOG(ERROR) << __func__ << ": error from onDrainReady: " << status;
296 }
297 } else {
298 mState = StreamDescriptor::State::ACTIVE;
299 ndk::ScopedAStatus status = mAsyncCallback->onTransferReady();
300 if (!status.isOk()) {
301 LOG(ERROR) << __func__ << ": error from onTransferReady: " << status;
302 }
303 }
304 }
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000305 if (mTransientStateDelayMs.count() != 0) {
306 LOG(DEBUG) << __func__ << ": switched to state " << toString(mState)
307 << " after a timeout";
308 }
309 }
310 }
311
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000312 StreamDescriptor::Command command{};
313 if (!mCommandMQ->readBlocking(&command, 1)) {
314 LOG(ERROR) << __func__ << ": reading of command from MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000315 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000316 return Status::ABORT;
317 }
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000318 LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000319 StreamDescriptor::Reply reply{};
Mikhail Naganov98334432022-11-09 02:44:32 +0000320 reply.status = STATUS_BAD_VALUE;
321 using Tag = StreamDescriptor::Command::Tag;
322 switch (command.getTag()) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000323 case Tag::halReservedExit:
324 if (const int32_t cookie = command.get<Tag::halReservedExit>();
Mikhail Naganov98334432022-11-09 02:44:32 +0000325 cookie == mInternalCommandCookie) {
Mikhail Naganov98334432022-11-09 02:44:32 +0000326 setClosed();
327 // This is an internal command, no need to reply.
328 return Status::EXIT;
329 } else {
330 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
331 }
332 break;
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000333 case Tag::getStatus:
334 populateReply(&reply, mIsConnected);
335 break;
Mikhail Naganov30301a42022-09-13 01:20:45 +0000336 case Tag::start: {
337 bool commandAccepted = true;
Mikhail Naganov98334432022-11-09 02:44:32 +0000338 switch (mState) {
339 case StreamDescriptor::State::STANDBY:
340 mState = StreamDescriptor::State::IDLE;
341 break;
342 case StreamDescriptor::State::PAUSED:
343 mState = StreamDescriptor::State::ACTIVE;
344 break;
345 case StreamDescriptor::State::DRAIN_PAUSED:
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000346 switchToTransientState(StreamDescriptor::State::DRAINING);
Mikhail Naganov30301a42022-09-13 01:20:45 +0000347 break;
348 case StreamDescriptor::State::TRANSFER_PAUSED:
349 switchToTransientState(StreamDescriptor::State::TRANSFERRING);
Mikhail Naganov98334432022-11-09 02:44:32 +0000350 break;
351 default:
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000352 populateReplyWrongState(&reply, command);
Mikhail Naganov30301a42022-09-13 01:20:45 +0000353 commandAccepted = false;
Mikhail Naganov98334432022-11-09 02:44:32 +0000354 }
Mikhail Naganov30301a42022-09-13 01:20:45 +0000355 if (commandAccepted) {
356 populateReply(&reply, mIsConnected);
357 }
358 } break;
Mikhail Naganov98334432022-11-09 02:44:32 +0000359 case Tag::burst:
360 if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000361 LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
362 << fmqByteCount << " bytes";
Mikhail Naganov30301a42022-09-13 01:20:45 +0000363 if (mState != StreamDescriptor::State::ERROR &&
364 mState != StreamDescriptor::State::TRANSFERRING &&
365 mState != StreamDescriptor::State::TRANSFER_PAUSED) {
Mikhail Naganov98334432022-11-09 02:44:32 +0000366 if (!write(fmqByteCount, &reply)) {
367 mState = StreamDescriptor::State::ERROR;
368 }
369 if (mState == StreamDescriptor::State::STANDBY ||
Mikhail Naganov30301a42022-09-13 01:20:45 +0000370 mState == StreamDescriptor::State::DRAIN_PAUSED ||
371 mState == StreamDescriptor::State::PAUSED) {
372 if (mAsyncCallback == nullptr ||
373 mState != StreamDescriptor::State::DRAIN_PAUSED) {
374 mState = StreamDescriptor::State::PAUSED;
375 } else {
376 mState = StreamDescriptor::State::TRANSFER_PAUSED;
377 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000378 } else if (mState == StreamDescriptor::State::IDLE ||
Mikhail Naganov30301a42022-09-13 01:20:45 +0000379 mState == StreamDescriptor::State::DRAINING ||
380 mState == StreamDescriptor::State::ACTIVE) {
381 if (mAsyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) {
382 mState = StreamDescriptor::State::ACTIVE;
383 } else {
384 switchToTransientState(StreamDescriptor::State::TRANSFERRING);
385 }
386 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000387 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000388 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000389 }
390 } else {
391 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
392 }
393 break;
394 case Tag::drain:
Mikhail Naganov30301a42022-09-13 01:20:45 +0000395 if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_ALL ||
396 command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) {
397 if (mState == StreamDescriptor::State::ACTIVE ||
398 mState == StreamDescriptor::State::TRANSFERRING) {
399 usleep(1000); // Simulate a blocking call into the driver.
400 populateReply(&reply, mIsConnected);
401 // Can switch the state to ERROR if a driver error occurs.
402 switchToTransientState(StreamDescriptor::State::DRAINING);
403 } else if (mState == StreamDescriptor::State::TRANSFER_PAUSED) {
404 mState = StreamDescriptor::State::DRAIN_PAUSED;
405 populateReply(&reply, mIsConnected);
406 } else {
407 populateReplyWrongState(&reply, command);
408 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000409 } else {
Mikhail Naganov30301a42022-09-13 01:20:45 +0000410 LOG(WARNING) << __func__
411 << ": invalid drain mode: " << toString(command.get<Tag::drain>());
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000412 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000413 break;
414 case Tag::standby:
Mikhail Naganov98334432022-11-09 02:44:32 +0000415 if (mState == StreamDescriptor::State::IDLE) {
416 usleep(1000); // Simulate a blocking call into the driver.
417 populateReply(&reply, mIsConnected);
418 // Can switch the state to ERROR if a driver error occurs.
419 mState = StreamDescriptor::State::STANDBY;
420 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000421 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000422 }
423 break;
Mikhail Naganov30301a42022-09-13 01:20:45 +0000424 case Tag::pause: {
425 bool commandAccepted = true;
426 switch (mState) {
427 case StreamDescriptor::State::ACTIVE:
428 mState = StreamDescriptor::State::PAUSED;
429 break;
430 case StreamDescriptor::State::DRAINING:
431 mState = StreamDescriptor::State::DRAIN_PAUSED;
432 break;
433 case StreamDescriptor::State::TRANSFERRING:
434 mState = StreamDescriptor::State::TRANSFER_PAUSED;
435 break;
436 default:
437 populateReplyWrongState(&reply, command);
438 commandAccepted = false;
Mikhail Naganov98334432022-11-09 02:44:32 +0000439 }
Mikhail Naganov30301a42022-09-13 01:20:45 +0000440 if (commandAccepted) {
441 populateReply(&reply, mIsConnected);
442 }
443 } break;
Mikhail Naganov98334432022-11-09 02:44:32 +0000444 case Tag::flush:
Mikhail Naganov98334432022-11-09 02:44:32 +0000445 if (mState == StreamDescriptor::State::PAUSED ||
Mikhail Naganov30301a42022-09-13 01:20:45 +0000446 mState == StreamDescriptor::State::DRAIN_PAUSED ||
447 mState == StreamDescriptor::State::TRANSFER_PAUSED) {
Mikhail Naganov98334432022-11-09 02:44:32 +0000448 populateReply(&reply, mIsConnected);
449 mState = StreamDescriptor::State::IDLE;
450 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000451 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000452 }
453 break;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000454 }
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000455 reply.state = mState;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000456 LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
457 if (!mReplyMQ->writeBlocking(&reply, 1)) {
458 LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000459 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000460 return Status::ABORT;
461 }
462 return Status::CONTINUE;
463}
464
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000465bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
466 const size_t readByteCount = mDataMQ->availableToRead();
467 // Amount of data that the HAL module is going to actually use.
468 const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
469 bool fatal = false;
470 if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
471 const bool isConnected = mIsConnected;
472 LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
473 << " succeeded; connected? " << isConnected;
474 // Frames are consumed and counted regardless of connection status.
475 reply->fmqByteCount += byteCount;
476 mFrameCount += byteCount / mFrameSize;
477 populateReply(reply, isConnected);
478 usleep(3000); // Simulate a blocking call into the driver.
479 // Set 'fatal = true' if a driver error occurs.
480 } else {
481 LOG(WARNING) << __func__ << ": reading of " << readByteCount
482 << " bytes of data from MQ failed";
483 reply->status = STATUS_NOT_ENOUGH_DATA;
484 }
485 reply->latencyMs = Module::kLatencyMs;
486 return !fatal;
487}
488
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000489template <class Metadata, class StreamWorker>
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000490StreamCommonImpl<Metadata, StreamWorker>::~StreamCommonImpl() {
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000491 if (!isClosed()) {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000492 LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
493 stopWorker();
494 // The worker and the context should clean up by themselves via destructors.
495 }
496}
497
498template <class Metadata, class StreamWorker>
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000499void StreamCommonImpl<Metadata, StreamWorker>::createStreamCommon(
500 const std::shared_ptr<StreamCommonInterface>& delegate) {
501 if (mCommon != nullptr) {
502 LOG(FATAL) << __func__ << ": attempting to create the common interface twice";
503 }
504 mCommon = ndk::SharedRefBase::make<StreamCommon>(delegate);
505 mCommonBinder = mCommon->asBinder();
506 AIBinder_setMinSchedulerPolicy(mCommonBinder.get(), SCHED_NORMAL, ANDROID_PRIORITY_AUDIO);
507}
508
509template <class Metadata, class StreamWorker>
510ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::getStreamCommon(
511 std::shared_ptr<IStreamCommon>* _aidl_return) {
512 if (mCommon == nullptr) {
513 LOG(FATAL) << __func__ << ": the common interface was not created";
514 }
515 *_aidl_return = mCommon;
516 LOG(DEBUG) << __func__ << ": returning " << _aidl_return->get()->asBinder().get();
517 return ndk::ScopedAStatus::ok();
518}
519
520template <class Metadata, class StreamWorker>
521ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::updateHwAvSyncId(
522 int32_t in_hwAvSyncId) {
523 LOG(DEBUG) << __func__ << ": id " << in_hwAvSyncId;
524 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
525}
526
527template <class Metadata, class StreamWorker>
528ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::getVendorParameters(
529 const std::vector<std::string>& in_ids, std::vector<VendorParameter>* _aidl_return) {
530 LOG(DEBUG) << __func__ << ": id count: " << in_ids.size();
531 (void)_aidl_return;
532 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
533}
534
535template <class Metadata, class StreamWorker>
536ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::setVendorParameters(
537 const std::vector<VendorParameter>& in_parameters, bool in_async) {
538 LOG(DEBUG) << __func__ << ": parameters count " << in_parameters.size()
539 << ", async: " << in_async;
540 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
541}
542
543template <class Metadata, class StreamWorker>
Mikhail Naganovfb1acde2022-12-12 18:57:36 +0000544ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::addEffect(
545 const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) {
546 if (in_effect == nullptr) {
547 LOG(DEBUG) << __func__ << ": null effect";
548 } else {
549 LOG(DEBUG) << __func__ << ": effect Binder" << in_effect->asBinder().get();
550 }
551 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
552}
553
554template <class Metadata, class StreamWorker>
555ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::removeEffect(
556 const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) {
557 if (in_effect == nullptr) {
558 LOG(DEBUG) << __func__ << ": null effect";
559 } else {
560 LOG(DEBUG) << __func__ << ": effect Binder" << in_effect->asBinder().get();
561 }
562 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
563}
564
565template <class Metadata, class StreamWorker>
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000566ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::close() {
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000567 LOG(DEBUG) << __func__;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000568 if (!isClosed()) {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000569 stopWorker();
570 LOG(DEBUG) << __func__ << ": joining the worker thread...";
571 mWorker.stop();
572 LOG(DEBUG) << __func__ << ": worker thread joined";
573 mContext.reset();
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000574 mWorker.setClosed();
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000575 return ndk::ScopedAStatus::ok();
576 } else {
577 LOG(ERROR) << __func__ << ": stream was already closed";
578 return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
579 }
580}
581
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000582template <class Metadata, class StreamWorker>
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000583void StreamCommonImpl<Metadata, StreamWorker>::stopWorker() {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000584 if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000585 LOG(DEBUG) << __func__ << ": asking the worker to exit...";
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000586 auto cmd = StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::halReservedExit>(
587 mContext.getInternalCommandCookie());
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000588 // Note: never call 'pause' and 'resume' methods of StreamWorker
589 // in the HAL implementation. These methods are to be used by
590 // the client side only. Preventing the worker loop from running
591 // on the HAL side can cause a deadlock.
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000592 if (!commandMQ->writeBlocking(&cmd, 1)) {
593 LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
594 }
595 LOG(DEBUG) << __func__ << ": done";
596 }
597}
598
599template <class Metadata, class StreamWorker>
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000600ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::updateMetadata(
601 const Metadata& metadata) {
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000602 LOG(DEBUG) << __func__;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000603 if (!isClosed()) {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000604 mMetadata = metadata;
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000605 return ndk::ScopedAStatus::ok();
606 }
607 LOG(ERROR) << __func__ << ": stream was closed";
608 return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
609}
610
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000611// static
612ndk::ScopedAStatus StreamIn::createInstance(const common::SinkMetadata& sinkMetadata,
613 StreamContext context,
614 const std::vector<MicrophoneInfo>& microphones,
615 std::shared_ptr<StreamIn>* result) {
616 auto stream = ndk::SharedRefBase::make<StreamIn>(sinkMetadata, std::move(context), microphones);
617 if (auto status = stream->init(); !status.isOk()) {
618 return status;
619 }
620 stream->createStreamCommon(stream);
621 *result = std::move(stream);
622 return ndk::ScopedAStatus::ok();
623}
624
Mikhail Naganovef6bc742022-10-06 00:14:19 +0000625namespace {
626static std::map<AudioDevice, std::string> transformMicrophones(
627 const std::vector<MicrophoneInfo>& microphones) {
628 std::map<AudioDevice, std::string> result;
629 std::transform(microphones.begin(), microphones.end(), std::inserter(result, result.begin()),
630 [](const auto& mic) { return std::make_pair(mic.device, mic.id); });
631 return result;
632}
633} // namespace
634
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000635StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext&& context,
Mikhail Naganovef6bc742022-10-06 00:14:19 +0000636 const std::vector<MicrophoneInfo>& microphones)
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000637 : StreamCommonImpl<SinkMetadata, StreamInWorker>(sinkMetadata, std::move(context)),
Mikhail Naganovef6bc742022-10-06 00:14:19 +0000638 mMicrophones(transformMicrophones(microphones)) {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000639 LOG(DEBUG) << __func__;
640}
641
Mikhail Naganovef6bc742022-10-06 00:14:19 +0000642ndk::ScopedAStatus StreamIn::getActiveMicrophones(
643 std::vector<MicrophoneDynamicInfo>* _aidl_return) {
644 std::vector<MicrophoneDynamicInfo> result;
645 std::vector<MicrophoneDynamicInfo::ChannelMapping> channelMapping{
646 getChannelCount(mContext.getChannelLayout()),
647 MicrophoneDynamicInfo::ChannelMapping::DIRECT};
648 for (auto it = mConnectedDevices.begin(); it != mConnectedDevices.end(); ++it) {
649 if (auto micIt = mMicrophones.find(*it); micIt != mMicrophones.end()) {
650 MicrophoneDynamicInfo dynMic;
651 dynMic.id = micIt->second;
652 dynMic.channelMapping = channelMapping;
653 result.push_back(std::move(dynMic));
654 }
655 }
656 *_aidl_return = std::move(result);
657 LOG(DEBUG) << __func__ << ": returning " << ::android::internal::ToString(*_aidl_return);
658 return ndk::ScopedAStatus::ok();
659}
660
661ndk::ScopedAStatus StreamIn::getMicrophoneDirection(MicrophoneDirection* _aidl_return) {
662 LOG(DEBUG) << __func__;
663 (void)_aidl_return;
664 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
665}
666
667ndk::ScopedAStatus StreamIn::setMicrophoneDirection(MicrophoneDirection in_direction) {
668 LOG(DEBUG) << __func__ << ": direction " << toString(in_direction);
669 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
670}
671
672ndk::ScopedAStatus StreamIn::getMicrophoneFieldDimension(float* _aidl_return) {
673 LOG(DEBUG) << __func__;
674 (void)_aidl_return;
675 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
676}
677
678ndk::ScopedAStatus StreamIn::setMicrophoneFieldDimension(float in_zoom) {
679 LOG(DEBUG) << __func__ << ": zoom " << in_zoom;
680 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
681}
682
Mikhail Naganov383cd422022-10-15 00:25:45 +0000683ndk::ScopedAStatus StreamIn::getHwGain(std::vector<float>* _aidl_return) {
684 LOG(DEBUG) << __func__;
685 (void)_aidl_return;
686 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
687}
688
689ndk::ScopedAStatus StreamIn::setHwGain(const std::vector<float>& in_channelGains) {
690 LOG(DEBUG) << __func__ << ": gains " << ::android::internal::ToString(in_channelGains);
691 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
692}
693
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000694// static
695ndk::ScopedAStatus StreamOut::createInstance(const SourceMetadata& sourceMetadata,
696 StreamContext context,
697 const std::optional<AudioOffloadInfo>& offloadInfo,
698 std::shared_ptr<StreamOut>* result) {
699 auto stream =
700 ndk::SharedRefBase::make<StreamOut>(sourceMetadata, std::move(context), offloadInfo);
701 if (auto status = stream->init(); !status.isOk()) {
702 return status;
703 }
704 stream->createStreamCommon(stream);
705 *result = std::move(stream);
706 return ndk::ScopedAStatus::ok();
707}
708
709StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext&& context,
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000710 const std::optional<AudioOffloadInfo>& offloadInfo)
Mikhail Naganove9f10fc2022-10-14 23:31:52 +0000711 : StreamCommonImpl<SourceMetadata, StreamOutWorker>(sourceMetadata, std::move(context)),
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000712 mOffloadInfo(offloadInfo) {
Mikhail Naganov6a4872d2022-06-15 21:39:04 +0000713 LOG(DEBUG) << __func__;
714}
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000715
Mikhail Naganov383cd422022-10-15 00:25:45 +0000716ndk::ScopedAStatus StreamOut::getHwVolume(std::vector<float>* _aidl_return) {
717 LOG(DEBUG) << __func__;
718 (void)_aidl_return;
719 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
720}
721
722ndk::ScopedAStatus StreamOut::setHwVolume(const std::vector<float>& in_channelVolumes) {
723 LOG(DEBUG) << __func__ << ": gains " << ::android::internal::ToString(in_channelVolumes);
724 return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
725}
726
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000727} // namespace aidl::android::hardware::audio::core