blob: c5d00a251724610cceb472fcafb6309ece55a4c5 [file] [log] [blame]
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +00001/*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "AHAL_Stream"
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000018#include <android-base/logging.h>
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000019#include <utils/SystemClock.h>
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000020
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000021#include "core-impl/Module.h"
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000022#include "core-impl/Stream.h"
23
24using aidl::android::hardware::audio::common::SinkMetadata;
25using aidl::android::hardware::audio::common::SourceMetadata;
26using aidl::android::media::audio::common::AudioOffloadInfo;
27
28namespace aidl::android::hardware::audio::core {
29
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000030void StreamContext::fillDescriptor(StreamDescriptor* desc) {
31 if (mCommandMQ) {
32 desc->command = mCommandMQ->dupeDesc();
33 }
34 if (mReplyMQ) {
35 desc->reply = mReplyMQ->dupeDesc();
36 }
37 if (mDataMQ) {
Mikhail Naganova2c71412022-08-19 21:37:35 +000038 desc->frameSizeBytes = mFrameSize;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000039 desc->bufferSizeFrames =
40 mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize() / mFrameSize;
41 desc->audio.set<StreamDescriptor::AudioBuffer::Tag::fmq>(mDataMQ->dupeDesc());
42 }
Mikhail Naganov6a4872d2022-06-15 21:39:04 +000043}
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +000044
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +000045bool StreamContext::isValid() const {
46 if (mCommandMQ && !mCommandMQ->isValid()) {
47 LOG(ERROR) << "command FMQ is invalid";
48 return false;
49 }
50 if (mReplyMQ && !mReplyMQ->isValid()) {
51 LOG(ERROR) << "reply FMQ is invalid";
52 return false;
53 }
54 if (mFrameSize == 0) {
55 LOG(ERROR) << "frame size is not set";
56 return false;
57 }
58 if (mDataMQ && !mDataMQ->isValid()) {
59 LOG(ERROR) << "data FMQ is invalid";
60 return false;
61 }
62 return true;
63}
64
65void StreamContext::reset() {
66 mCommandMQ.reset();
67 mReplyMQ.reset();
68 mDataMQ.reset();
69}
70
71std::string StreamWorkerCommonLogic::init() {
72 if (mCommandMQ == nullptr) return "Command MQ is null";
73 if (mReplyMQ == nullptr) return "Reply MQ is null";
74 if (mDataMQ == nullptr) return "Data MQ is null";
75 if (sizeof(decltype(mDataBuffer)::element_type) != mDataMQ->getQuantumSize()) {
76 return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize());
77 }
78 mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize();
79 mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]);
80 if (mDataBuffer == nullptr) {
81 return "Failed to allocate data buffer for element count " +
82 std::to_string(mDataMQ->getQuantumCount()) +
83 ", size in bytes: " + std::to_string(mDataBufferSize);
84 }
85 return "";
86}
87
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +000088void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
89 bool isConnected) const {
90 if (isConnected) {
91 reply->status = STATUS_OK;
92 reply->observable.frames = mFrameCount;
93 reply->observable.timeNs = ::android::elapsedRealtimeNano();
94 } else {
95 reply->status = STATUS_NO_INIT;
96 }
97}
98
Mikhail Naganovbd483c02022-11-17 20:33:39 +000099void StreamWorkerCommonLogic::populateReplyWrongState(
100 StreamDescriptor::Reply* reply, const StreamDescriptor::Command& command) const {
101 LOG(WARNING) << "command '" << toString(command.getTag())
102 << "' can not be handled in the state " << toString(mState);
103 reply->status = STATUS_INVALID_OPERATION;
104}
105
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000106const std::string StreamInWorkerLogic::kThreadName = "reader";
107
108StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000109 // Note: for input streams, draining is driven by the client, thus
110 // "empty buffer" condition can only happen while handling the 'burst'
111 // command. Thus, unlike for output streams, it does not make sense to
112 // delay the 'DRAINING' state here by 'mTransientStateDelayMs'.
113 // TODO: Add a delay for transitions of async operations when/if they added.
114
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000115 StreamDescriptor::Command command{};
116 if (!mCommandMQ->readBlocking(&command, 1)) {
117 LOG(ERROR) << __func__ << ": reading of command from MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000118 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000119 return Status::ABORT;
120 }
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000121 LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000122 StreamDescriptor::Reply reply{};
Mikhail Naganov98334432022-11-09 02:44:32 +0000123 reply.status = STATUS_BAD_VALUE;
124 using Tag = StreamDescriptor::Command::Tag;
125 switch (command.getTag()) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000126 case Tag::halReservedExit:
127 if (const int32_t cookie = command.get<Tag::halReservedExit>();
Mikhail Naganov98334432022-11-09 02:44:32 +0000128 cookie == mInternalCommandCookie) {
Mikhail Naganov98334432022-11-09 02:44:32 +0000129 setClosed();
130 // This is an internal command, no need to reply.
131 return Status::EXIT;
132 } else {
133 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000134 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000135 break;
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000136 case Tag::getStatus:
137 populateReply(&reply, mIsConnected);
138 break;
Mikhail Naganov98334432022-11-09 02:44:32 +0000139 case Tag::start:
Mikhail Naganov98334432022-11-09 02:44:32 +0000140 if (mState == StreamDescriptor::State::STANDBY ||
141 mState == StreamDescriptor::State::DRAINING) {
142 populateReply(&reply, mIsConnected);
143 mState = mState == StreamDescriptor::State::STANDBY
144 ? StreamDescriptor::State::IDLE
145 : StreamDescriptor::State::ACTIVE;
146 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000147 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000148 }
149 break;
150 case Tag::burst:
151 if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000152 LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
153 << fmqByteCount << " bytes";
Mikhail Naganov98334432022-11-09 02:44:32 +0000154 if (mState == StreamDescriptor::State::IDLE ||
155 mState == StreamDescriptor::State::ACTIVE ||
156 mState == StreamDescriptor::State::PAUSED ||
157 mState == StreamDescriptor::State::DRAINING) {
158 if (!read(fmqByteCount, &reply)) {
159 mState = StreamDescriptor::State::ERROR;
160 }
161 if (mState == StreamDescriptor::State::IDLE ||
162 mState == StreamDescriptor::State::PAUSED) {
163 mState = StreamDescriptor::State::ACTIVE;
164 } else if (mState == StreamDescriptor::State::DRAINING) {
165 // To simplify the reference code, we assume that the read operation
166 // has consumed all the data remaining in the hardware buffer.
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000167 // In a real implementation, here we would either remain in
168 // the 'DRAINING' state, or transfer to 'STANDBY' depending on the
169 // buffer state.
Mikhail Naganov98334432022-11-09 02:44:32 +0000170 mState = StreamDescriptor::State::STANDBY;
171 }
172 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000173 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000174 }
175 } else {
176 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
177 }
178 break;
179 case Tag::drain:
Mikhail Naganov98334432022-11-09 02:44:32 +0000180 if (mState == StreamDescriptor::State::ACTIVE) {
181 usleep(1000); // Simulate a blocking call into the driver.
182 populateReply(&reply, mIsConnected);
183 // Can switch the state to ERROR if a driver error occurs.
184 mState = StreamDescriptor::State::DRAINING;
185 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000186 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000187 }
188 break;
189 case Tag::standby:
Mikhail Naganov98334432022-11-09 02:44:32 +0000190 if (mState == StreamDescriptor::State::IDLE) {
191 usleep(1000); // Simulate a blocking call into the driver.
192 populateReply(&reply, mIsConnected);
193 // Can switch the state to ERROR if a driver error occurs.
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000194 mState = StreamDescriptor::State::STANDBY;
Mikhail Naganov98334432022-11-09 02:44:32 +0000195 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000196 populateReplyWrongState(&reply, command);
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000197 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000198 break;
199 case Tag::pause:
Mikhail Naganov98334432022-11-09 02:44:32 +0000200 if (mState == StreamDescriptor::State::ACTIVE) {
201 usleep(1000); // Simulate a blocking call into the driver.
202 populateReply(&reply, mIsConnected);
203 // Can switch the state to ERROR if a driver error occurs.
204 mState = StreamDescriptor::State::PAUSED;
205 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000206 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000207 }
208 break;
209 case Tag::flush:
Mikhail Naganov98334432022-11-09 02:44:32 +0000210 if (mState == StreamDescriptor::State::PAUSED) {
211 usleep(1000); // Simulate a blocking call into the driver.
212 populateReply(&reply, mIsConnected);
213 // Can switch the state to ERROR if a driver error occurs.
214 mState = StreamDescriptor::State::STANDBY;
215 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000216 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000217 }
218 break;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000219 }
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000220 reply.state = mState;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000221 LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
222 if (!mReplyMQ->writeBlocking(&reply, 1)) {
223 LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000224 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000225 return Status::ABORT;
226 }
227 return Status::CONTINUE;
228}
229
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000230bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
231 // Can switch the state to ERROR if a driver error occurs.
232 const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
233 const bool isConnected = mIsConnected;
234 bool fatal = false;
235 // Simulate reading of data, or provide zeroes if the stream is not connected.
236 for (size_t i = 0; i < byteCount; ++i) {
237 using buffer_type = decltype(mDataBuffer)::element_type;
238 constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
239 std::numeric_limits<buffer_type>::min() + 1;
240 mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
241 std::numeric_limits<buffer_type>::min()
242 : 0;
243 }
244 usleep(3000); // Simulate a blocking call into the driver.
245 // Set 'fatal = true' if a driver error occurs.
246 if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
247 LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
248 << " succeeded; connected? " << isConnected;
249 // Frames are provided and counted regardless of connection status.
250 reply->fmqByteCount += byteCount;
251 mFrameCount += byteCount / mFrameSize;
252 populateReply(reply, isConnected);
253 } else {
254 LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
255 reply->status = STATUS_NOT_ENOUGH_DATA;
256 }
257 reply->latencyMs = Module::kLatencyMs;
258 return !fatal;
259}
260
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000261const std::string StreamOutWorkerLogic::kThreadName = "writer";
262
263StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000264 if (mState == StreamDescriptor::State::DRAINING) {
265 if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
266 std::chrono::steady_clock::now() - mTransientStateStart);
267 stateDurationMs >= mTransientStateDelayMs) {
268 mState = StreamDescriptor::State::IDLE;
269 if (mTransientStateDelayMs.count() != 0) {
270 LOG(DEBUG) << __func__ << ": switched to state " << toString(mState)
271 << " after a timeout";
272 }
273 }
274 }
275
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000276 StreamDescriptor::Command command{};
277 if (!mCommandMQ->readBlocking(&command, 1)) {
278 LOG(ERROR) << __func__ << ": reading of command from MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000279 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000280 return Status::ABORT;
281 }
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000282 LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000283 StreamDescriptor::Reply reply{};
Mikhail Naganov98334432022-11-09 02:44:32 +0000284 reply.status = STATUS_BAD_VALUE;
285 using Tag = StreamDescriptor::Command::Tag;
286 switch (command.getTag()) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000287 case Tag::halReservedExit:
288 if (const int32_t cookie = command.get<Tag::halReservedExit>();
Mikhail Naganov98334432022-11-09 02:44:32 +0000289 cookie == mInternalCommandCookie) {
Mikhail Naganov98334432022-11-09 02:44:32 +0000290 setClosed();
291 // This is an internal command, no need to reply.
292 return Status::EXIT;
293 } else {
294 LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
295 }
296 break;
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000297 case Tag::getStatus:
298 populateReply(&reply, mIsConnected);
299 break;
Mikhail Naganov98334432022-11-09 02:44:32 +0000300 case Tag::start:
Mikhail Naganov98334432022-11-09 02:44:32 +0000301 switch (mState) {
302 case StreamDescriptor::State::STANDBY:
303 mState = StreamDescriptor::State::IDLE;
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000304 populateReply(&reply, mIsConnected);
Mikhail Naganov98334432022-11-09 02:44:32 +0000305 break;
306 case StreamDescriptor::State::PAUSED:
307 mState = StreamDescriptor::State::ACTIVE;
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000308 populateReply(&reply, mIsConnected);
Mikhail Naganov98334432022-11-09 02:44:32 +0000309 break;
310 case StreamDescriptor::State::DRAIN_PAUSED:
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000311 switchToTransientState(StreamDescriptor::State::DRAINING);
312 populateReply(&reply, mIsConnected);
Mikhail Naganov98334432022-11-09 02:44:32 +0000313 break;
314 default:
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000315 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000316 }
317 break;
318 case Tag::burst:
319 if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000320 LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
321 << fmqByteCount << " bytes";
Mikhail Naganov98334432022-11-09 02:44:32 +0000322 if (mState !=
323 StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states
324 if (!write(fmqByteCount, &reply)) {
325 mState = StreamDescriptor::State::ERROR;
326 }
327 if (mState == StreamDescriptor::State::STANDBY ||
328 mState == StreamDescriptor::State::DRAIN_PAUSED) {
329 mState = StreamDescriptor::State::PAUSED;
330 } else if (mState == StreamDescriptor::State::IDLE ||
331 mState == StreamDescriptor::State::DRAINING) {
332 mState = StreamDescriptor::State::ACTIVE;
333 } // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
334 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000335 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000336 }
337 } else {
338 LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
339 }
340 break;
341 case Tag::drain:
Mikhail Naganov98334432022-11-09 02:44:32 +0000342 if (mState == StreamDescriptor::State::ACTIVE) {
343 usleep(1000); // Simulate a blocking call into the driver.
344 populateReply(&reply, mIsConnected);
345 // Can switch the state to ERROR if a driver error occurs.
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000346 switchToTransientState(StreamDescriptor::State::DRAINING);
Mikhail Naganov98334432022-11-09 02:44:32 +0000347 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000348 populateReplyWrongState(&reply, command);
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000349 }
Mikhail Naganov98334432022-11-09 02:44:32 +0000350 break;
351 case Tag::standby:
Mikhail Naganov98334432022-11-09 02:44:32 +0000352 if (mState == StreamDescriptor::State::IDLE) {
353 usleep(1000); // Simulate a blocking call into the driver.
354 populateReply(&reply, mIsConnected);
355 // Can switch the state to ERROR if a driver error occurs.
356 mState = StreamDescriptor::State::STANDBY;
357 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000358 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000359 }
360 break;
361 case Tag::pause:
Mikhail Naganov98334432022-11-09 02:44:32 +0000362 if (mState == StreamDescriptor::State::ACTIVE ||
363 mState == StreamDescriptor::State::DRAINING) {
364 populateReply(&reply, mIsConnected);
365 mState = mState == StreamDescriptor::State::ACTIVE
366 ? StreamDescriptor::State::PAUSED
367 : StreamDescriptor::State::DRAIN_PAUSED;
368 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000369 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000370 }
371 break;
372 case Tag::flush:
Mikhail Naganov98334432022-11-09 02:44:32 +0000373 if (mState == StreamDescriptor::State::PAUSED ||
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000374 mState == StreamDescriptor::State::DRAIN_PAUSED) {
Mikhail Naganov98334432022-11-09 02:44:32 +0000375 populateReply(&reply, mIsConnected);
376 mState = StreamDescriptor::State::IDLE;
377 } else {
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000378 populateReplyWrongState(&reply, command);
Mikhail Naganov98334432022-11-09 02:44:32 +0000379 }
380 break;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000381 }
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000382 reply.state = mState;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000383 LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
384 if (!mReplyMQ->writeBlocking(&reply, 1)) {
385 LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000386 mState = StreamDescriptor::State::ERROR;
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000387 return Status::ABORT;
388 }
389 return Status::CONTINUE;
390}
391
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000392bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
393 const size_t readByteCount = mDataMQ->availableToRead();
394 // Amount of data that the HAL module is going to actually use.
395 const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
396 bool fatal = false;
397 if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
398 const bool isConnected = mIsConnected;
399 LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
400 << " succeeded; connected? " << isConnected;
401 // Frames are consumed and counted regardless of connection status.
402 reply->fmqByteCount += byteCount;
403 mFrameCount += byteCount / mFrameSize;
404 populateReply(reply, isConnected);
405 usleep(3000); // Simulate a blocking call into the driver.
406 // Set 'fatal = true' if a driver error occurs.
407 } else {
408 LOG(WARNING) << __func__ << ": reading of " << readByteCount
409 << " bytes of data from MQ failed";
410 reply->status = STATUS_NOT_ENOUGH_DATA;
411 }
412 reply->latencyMs = Module::kLatencyMs;
413 return !fatal;
414}
415
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000416template <class Metadata, class StreamWorker>
417StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000418 if (!isClosed()) {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000419 LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
420 stopWorker();
421 // The worker and the context should clean up by themselves via destructors.
422 }
423}
424
425template <class Metadata, class StreamWorker>
426ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000427 LOG(DEBUG) << __func__;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000428 if (!isClosed()) {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000429 stopWorker();
430 LOG(DEBUG) << __func__ << ": joining the worker thread...";
431 mWorker.stop();
432 LOG(DEBUG) << __func__ << ": worker thread joined";
433 mContext.reset();
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000434 mWorker.setClosed();
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000435 return ndk::ScopedAStatus::ok();
436 } else {
437 LOG(ERROR) << __func__ << ": stream was already closed";
438 return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
439 }
440}
441
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000442template <class Metadata, class StreamWorker>
443void StreamCommon<Metadata, StreamWorker>::stopWorker() {
444 if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000445 LOG(DEBUG) << __func__ << ": asking the worker to exit...";
Mikhail Naganovbd483c02022-11-17 20:33:39 +0000446 auto cmd = StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::halReservedExit>(
447 mContext.getInternalCommandCookie());
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000448 // Note: never call 'pause' and 'resume' methods of StreamWorker
449 // in the HAL implementation. These methods are to be used by
450 // the client side only. Preventing the worker loop from running
451 // on the HAL side can cause a deadlock.
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000452 if (!commandMQ->writeBlocking(&cmd, 1)) {
453 LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
454 }
455 LOG(DEBUG) << __func__ << ": done";
456 }
457}
458
459template <class Metadata, class StreamWorker>
460ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000461 LOG(DEBUG) << __func__;
Mikhail Naganovcce8e5f2022-09-13 01:20:45 +0000462 if (!isClosed()) {
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000463 mMetadata = metadata;
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000464 return ndk::ScopedAStatus::ok();
465 }
466 LOG(ERROR) << __func__ << ": stream was closed";
467 return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
468}
469
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000470StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext context)
471 : StreamCommon<SinkMetadata, StreamInWorker>(sinkMetadata, std::move(context)) {
472 LOG(DEBUG) << __func__;
473}
474
475StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext context,
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000476 const std::optional<AudioOffloadInfo>& offloadInfo)
Mikhail Naganov4f5d3f12022-07-22 23:23:25 +0000477 : StreamCommon<SourceMetadata, StreamOutWorker>(sourceMetadata, std::move(context)),
478 mOffloadInfo(offloadInfo) {
Mikhail Naganov6a4872d2022-06-15 21:39:04 +0000479 LOG(DEBUG) << __func__;
480}
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000481
Mikhail Naganovdf5adfd2021-11-11 22:09:22 +0000482} // namespace aidl::android::hardware::audio::core