audio: Generalize stream implementations

This allows for more code reuse and composability when
implementing streams for a particular audio "backend."

The existing "stub" code has been moved to StreamStub* files.

Bug: 264712385
Test: atest VtsHalAudioCoreTargetTest
Change-Id: I97fd41f87eb6d01e1d57f0d70a86d3b2b3555837
diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp
index 0520cba..25814e4 100644
--- a/audio/aidl/default/Stream.cpp
+++ b/audio/aidl/default/Stream.cpp
@@ -85,16 +85,19 @@
     if (mCommandMQ == nullptr) return "Command MQ is null";
     if (mReplyMQ == nullptr) return "Reply MQ is null";
     if (mDataMQ == nullptr) return "Data MQ is null";
-    if (sizeof(decltype(mDataBuffer)::element_type) != mDataMQ->getQuantumSize()) {
+    if (sizeof(DataBufferElement) != mDataMQ->getQuantumSize()) {
         return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize());
     }
     mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize();
-    mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]);
+    mDataBuffer.reset(new (std::nothrow) DataBufferElement[mDataBufferSize]);
     if (mDataBuffer == nullptr) {
         return "Failed to allocate data buffer for element count " +
                std::to_string(mDataMQ->getQuantumCount()) +
                ", size in bytes: " + std::to_string(mDataBufferSize);
     }
+    if (::android::status_t status = mDriver->init(); status != STATUS_OK) {
+        return "Failed to initialize the driver: " + std::to_string(status);
+    }
     return "";
 }
 
@@ -191,46 +194,59 @@
             }
             break;
         case Tag::drain:
-            if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
+            if (const auto mode = command.get<Tag::drain>();
+                mode == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
                 if (mState == StreamDescriptor::State::ACTIVE) {
-                    usleep(1000);  // Simulate a blocking call into the driver.
-                    populateReply(&reply, mIsConnected);
-                    // Can switch the state to ERROR if a driver error occurs.
-                    mState = StreamDescriptor::State::DRAINING;
+                    if (::android::status_t status = mDriver->drain(mode);
+                        status == ::android::OK) {
+                        populateReply(&reply, mIsConnected);
+                        mState = StreamDescriptor::State::DRAINING;
+                    } else {
+                        LOG(ERROR) << __func__ << ": drain failed: " << status;
+                        mState = StreamDescriptor::State::ERROR;
+                    }
                 } else {
                     populateReplyWrongState(&reply, command);
                 }
             } else {
-                LOG(WARNING) << __func__
-                             << ": invalid drain mode: " << toString(command.get<Tag::drain>());
+                LOG(WARNING) << __func__ << ": invalid drain mode: " << toString(mode);
             }
             break;
         case Tag::standby:
             if (mState == StreamDescriptor::State::IDLE) {
-                usleep(1000);  // Simulate a blocking call into the driver.
-                populateReply(&reply, mIsConnected);
-                // Can switch the state to ERROR if a driver error occurs.
-                mState = StreamDescriptor::State::STANDBY;
+                if (::android::status_t status = mDriver->standby(); status == ::android::OK) {
+                    populateReply(&reply, mIsConnected);
+                    mState = StreamDescriptor::State::STANDBY;
+                } else {
+                    LOG(ERROR) << __func__ << ": standby failed: " << status;
+                    mState = StreamDescriptor::State::ERROR;
+                }
             } else {
                 populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::pause:
             if (mState == StreamDescriptor::State::ACTIVE) {
-                usleep(1000);  // Simulate a blocking call into the driver.
-                populateReply(&reply, mIsConnected);
-                // Can switch the state to ERROR if a driver error occurs.
-                mState = StreamDescriptor::State::PAUSED;
+                if (::android::status_t status = mDriver->pause(); status == ::android::OK) {
+                    populateReply(&reply, mIsConnected);
+                    mState = StreamDescriptor::State::PAUSED;
+                } else {
+                    LOG(ERROR) << __func__ << ": pause failed: " << status;
+                    mState = StreamDescriptor::State::ERROR;
+                }
             } else {
                 populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::flush:
             if (mState == StreamDescriptor::State::PAUSED) {
-                usleep(1000);  // Simulate a blocking call into the driver.
-                populateReply(&reply, mIsConnected);
-                // Can switch the state to ERROR if a driver error occurs.
-                mState = StreamDescriptor::State::STANDBY;
+                if (::android::status_t status = mDriver->flush(); status == ::android::OK) {
+                    populateReply(&reply, mIsConnected);
+                    mState = StreamDescriptor::State::STANDBY;
+                } else {
+                    LOG(ERROR) << __func__ << ": flush failed: " << status;
+                    mState = StreamDescriptor::State::ERROR;
+                }
             } else {
                 populateReplyWrongState(&reply, command);
             }
@@ -247,33 +263,39 @@
 }
 
 bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
-    // Can switch the state to ERROR if a driver error occurs.
     const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
     const bool isConnected = mIsConnected;
+    size_t actualFrameCount = 0;
     bool fatal = false;
-    // Simulate reading of data, or provide zeroes if the stream is not connected.
-    for (size_t i = 0; i < byteCount; ++i) {
-        using buffer_type = decltype(mDataBuffer)::element_type;
-        constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
-                                          std::numeric_limits<buffer_type>::min() + 1;
-        mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
-                                               std::numeric_limits<buffer_type>::min()
-                                     : 0;
+    int32_t latency = Module::kLatencyMs;
+    if (isConnected) {
+        if (::android::status_t status = mDriver->transfer(
+                    mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency);
+            status != ::android::OK) {
+            fatal = true;
+            LOG(ERROR) << __func__ << ": read failed: " << status;
+        }
+    } else {
+        usleep(3000);  // Simulate blocking transfer delay.
+        for (size_t i = 0; i < byteCount; ++i) mDataBuffer[i] = 0;
+        actualFrameCount = byteCount / mFrameSize;
     }
-    usleep(3000);  // Simulate a blocking call into the driver.
-    // Set 'fatal = true' if a driver error occurs.
-    if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
-        LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
+    const size_t actualByteCount = actualFrameCount * mFrameSize;
+    if (bool success =
+                actualByteCount > 0 ? mDataMQ->write(&mDataBuffer[0], actualByteCount) : true;
+        success) {
+        LOG(DEBUG) << __func__ << ": writing of " << actualByteCount << " bytes into data MQ"
                    << " succeeded; connected? " << isConnected;
         // Frames are provided and counted regardless of connection status.
-        reply->fmqByteCount += byteCount;
-        mFrameCount += byteCount / mFrameSize;
+        reply->fmqByteCount += actualByteCount;
+        mFrameCount += actualFrameCount;
         populateReply(reply, isConnected);
     } else {
-        LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
+        LOG(WARNING) << __func__ << ": writing of " << actualByteCount
+                     << " bytes of data to MQ failed";
         reply->status = STATUS_NOT_ENOUGH_DATA;
     }
-    reply->latencyMs = Module::kLatencyMs;
+    reply->latencyMs = latency;
     return !fatal;
 }
 
@@ -395,17 +417,22 @@
             }
             break;
         case Tag::drain:
-            if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_ALL ||
-                command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) {
+            if (const auto mode = command.get<Tag::drain>();
+                mode == StreamDescriptor::DrainMode::DRAIN_ALL ||
+                mode == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) {
                 if (mState == StreamDescriptor::State::ACTIVE ||
                     mState == StreamDescriptor::State::TRANSFERRING) {
-                    usleep(1000);  // Simulate a blocking call into the driver.
-                    populateReply(&reply, mIsConnected);
-                    // Can switch the state to ERROR if a driver error occurs.
-                    if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) {
-                        mState = StreamDescriptor::State::IDLE;
+                    if (::android::status_t status = mDriver->drain(mode);
+                        status == ::android::OK) {
+                        populateReply(&reply, mIsConnected);
+                        if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) {
+                            mState = StreamDescriptor::State::IDLE;
+                        } else {
+                            switchToTransientState(StreamDescriptor::State::DRAINING);
+                        }
                     } else {
-                        switchToTransientState(StreamDescriptor::State::DRAINING);
+                        LOG(ERROR) << __func__ << ": drain failed: " << status;
+                        mState = StreamDescriptor::State::ERROR;
                     }
                 } else if (mState == StreamDescriptor::State::TRANSFER_PAUSED) {
                     mState = StreamDescriptor::State::DRAIN_PAUSED;
@@ -414,46 +441,58 @@
                     populateReplyWrongState(&reply, command);
                 }
             } else {
-                LOG(WARNING) << __func__
-                             << ": invalid drain mode: " << toString(command.get<Tag::drain>());
+                LOG(WARNING) << __func__ << ": invalid drain mode: " << toString(mode);
             }
             break;
         case Tag::standby:
             if (mState == StreamDescriptor::State::IDLE) {
-                usleep(1000);  // Simulate a blocking call into the driver.
-                populateReply(&reply, mIsConnected);
-                // Can switch the state to ERROR if a driver error occurs.
-                mState = StreamDescriptor::State::STANDBY;
+                if (::android::status_t status = mDriver->standby(); status == ::android::OK) {
+                    populateReply(&reply, mIsConnected);
+                    mState = StreamDescriptor::State::STANDBY;
+                } else {
+                    LOG(ERROR) << __func__ << ": standby failed: " << status;
+                    mState = StreamDescriptor::State::ERROR;
+                }
             } else {
                 populateReplyWrongState(&reply, command);
             }
             break;
         case Tag::pause: {
-            bool commandAccepted = true;
+            std::optional<StreamDescriptor::State> nextState;
             switch (mState) {
                 case StreamDescriptor::State::ACTIVE:
-                    mState = StreamDescriptor::State::PAUSED;
+                    nextState = StreamDescriptor::State::PAUSED;
                     break;
                 case StreamDescriptor::State::DRAINING:
-                    mState = StreamDescriptor::State::DRAIN_PAUSED;
+                    nextState = StreamDescriptor::State::DRAIN_PAUSED;
                     break;
                 case StreamDescriptor::State::TRANSFERRING:
-                    mState = StreamDescriptor::State::TRANSFER_PAUSED;
+                    nextState = StreamDescriptor::State::TRANSFER_PAUSED;
                     break;
                 default:
                     populateReplyWrongState(&reply, command);
-                    commandAccepted = false;
             }
-            if (commandAccepted) {
-                populateReply(&reply, mIsConnected);
+            if (nextState.has_value()) {
+                if (::android::status_t status = mDriver->pause(); status == ::android::OK) {
+                    populateReply(&reply, mIsConnected);
+                    mState = nextState.value();
+                } else {
+                    LOG(ERROR) << __func__ << ": pause failed: " << status;
+                    mState = StreamDescriptor::State::ERROR;
+                }
             }
         } break;
         case Tag::flush:
             if (mState == StreamDescriptor::State::PAUSED ||
                 mState == StreamDescriptor::State::DRAIN_PAUSED ||
                 mState == StreamDescriptor::State::TRANSFER_PAUSED) {
-                populateReply(&reply, mIsConnected);
-                mState = StreamDescriptor::State::IDLE;
+                if (::android::status_t status = mDriver->flush(); status == ::android::OK) {
+                    populateReply(&reply, mIsConnected);
+                    mState = StreamDescriptor::State::IDLE;
+                } else {
+                    LOG(ERROR) << __func__ << ": flush failed: " << status;
+                    mState = StreamDescriptor::State::ERROR;
+                }
             } else {
                 populateReplyWrongState(&reply, command);
             }
@@ -472,6 +511,7 @@
 bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
     const size_t readByteCount = mDataMQ->availableToRead();
     bool fatal = false;
+    int32_t latency = Module::kLatencyMs;
     if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
         const bool isConnected = mIsConnected;
         LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
@@ -483,23 +523,36 @@
             // simulate partial write.
             byteCount -= mFrameSize;
         }
+        size_t actualFrameCount = 0;
+        if (isConnected) {
+            if (::android::status_t status = mDriver->transfer(
+                        mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency);
+                status != ::android::OK) {
+                fatal = true;
+                LOG(ERROR) << __func__ << ": write failed: " << status;
+            }
+        } else {
+            if (mAsyncCallback == nullptr) {
+                usleep(3000);  // Simulate blocking transfer delay.
+            }
+            actualFrameCount = byteCount / mFrameSize;
+        }
+        const size_t actualByteCount = actualFrameCount * mFrameSize;
         // Frames are consumed and counted regardless of the connection status.
-        reply->fmqByteCount += byteCount;
-        mFrameCount += byteCount / mFrameSize;
+        reply->fmqByteCount += actualByteCount;
+        mFrameCount += actualFrameCount;
         populateReply(reply, isConnected);
-        usleep(3000);  // Simulate a blocking call into the driver.
-        // Set 'fatal = true' if a driver error occurs.
     } else {
         LOG(WARNING) << __func__ << ": reading of " << readByteCount
                      << " bytes of data from MQ failed";
         reply->status = STATUS_NOT_ENOUGH_DATA;
     }
-    reply->latencyMs = Module::kLatencyMs;
+    reply->latencyMs = latency;
     return !fatal;
 }
 
-template <class Metadata, class StreamWorker>
-StreamCommonImpl<Metadata, StreamWorker>::~StreamCommonImpl() {
+template <class Metadata>
+StreamCommonImpl<Metadata>::~StreamCommonImpl() {
     if (!isClosed()) {
         LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
         stopWorker();
@@ -507,8 +560,8 @@
     }
 }
 
-template <class Metadata, class StreamWorker>
-void StreamCommonImpl<Metadata, StreamWorker>::createStreamCommon(
+template <class Metadata>
+void StreamCommonImpl<Metadata>::createStreamCommon(
         const std::shared_ptr<StreamCommonInterface>& delegate) {
     if (mCommon != nullptr) {
         LOG(FATAL) << __func__ << ": attempting to create the common interface twice";
@@ -518,8 +571,8 @@
     AIBinder_setMinSchedulerPolicy(mCommonBinder.get(), SCHED_NORMAL, ANDROID_PRIORITY_AUDIO);
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::getStreamCommon(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::getStreamCommon(
         std::shared_ptr<IStreamCommon>* _aidl_return) {
     if (mCommon == nullptr) {
         LOG(FATAL) << __func__ << ": the common interface was not created";
@@ -529,31 +582,30 @@
     return ndk::ScopedAStatus::ok();
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::updateHwAvSyncId(
-        int32_t in_hwAvSyncId) {
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::updateHwAvSyncId(int32_t in_hwAvSyncId) {
     LOG(DEBUG) << __func__ << ": id " << in_hwAvSyncId;
     return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::getVendorParameters(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::getVendorParameters(
         const std::vector<std::string>& in_ids, std::vector<VendorParameter>* _aidl_return) {
     LOG(DEBUG) << __func__ << ": id count: " << in_ids.size();
     (void)_aidl_return;
     return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::setVendorParameters(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::setVendorParameters(
         const std::vector<VendorParameter>& in_parameters, bool in_async) {
     LOG(DEBUG) << __func__ << ": parameters count " << in_parameters.size()
                << ", async: " << in_async;
     return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::addEffect(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::addEffect(
         const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) {
     if (in_effect == nullptr) {
         LOG(DEBUG) << __func__ << ": null effect";
@@ -563,8 +615,8 @@
     return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::removeEffect(
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::removeEffect(
         const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) {
     if (in_effect == nullptr) {
         LOG(DEBUG) << __func__ << ": null effect";
@@ -574,16 +626,16 @@
     return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION);
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::close() {
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::close() {
     LOG(DEBUG) << __func__;
     if (!isClosed()) {
         stopWorker();
         LOG(DEBUG) << __func__ << ": joining the worker thread...";
-        mWorker.stop();
+        mWorker->stop();
         LOG(DEBUG) << __func__ << ": worker thread joined";
         mContext.reset();
-        mWorker.setClosed();
+        mWorker->setClosed();
         return ndk::ScopedAStatus::ok();
     } else {
         LOG(ERROR) << __func__ << ": stream was already closed";
@@ -591,8 +643,8 @@
     }
 }
 
-template <class Metadata, class StreamWorker>
-void StreamCommonImpl<Metadata, StreamWorker>::stopWorker() {
+template <class Metadata>
+void StreamCommonImpl<Metadata>::stopWorker() {
     if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
         LOG(DEBUG) << __func__ << ": asking the worker to exit...";
         auto cmd = StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::halReservedExit>(
@@ -608,9 +660,8 @@
     }
 }
 
-template <class Metadata, class StreamWorker>
-ndk::ScopedAStatus StreamCommonImpl<Metadata, StreamWorker>::updateMetadata(
-        const Metadata& metadata) {
+template <class Metadata>
+ndk::ScopedAStatus StreamCommonImpl<Metadata>::updateMetadata(const Metadata& metadata) {
     LOG(DEBUG) << __func__;
     if (!isClosed()) {
         mMetadata = metadata;
@@ -621,16 +672,11 @@
 }
 
 // static
-ndk::ScopedAStatus StreamIn::createInstance(const common::SinkMetadata& sinkMetadata,
-                                            StreamContext context,
-                                            const std::vector<MicrophoneInfo>& microphones,
-                                            std::shared_ptr<StreamIn>* result) {
-    auto stream = ndk::SharedRefBase::make<StreamIn>(sinkMetadata, std::move(context), microphones);
+ndk::ScopedAStatus StreamIn::initInstance(const std::shared_ptr<StreamIn>& stream) {
     if (auto status = stream->init(); !status.isOk()) {
         return status;
     }
     stream->createStreamCommon(stream);
-    *result = std::move(stream);
     return ndk::ScopedAStatus::ok();
 }
 
@@ -645,8 +691,10 @@
 }  // namespace
 
 StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext&& context,
+                   const DriverInterface::CreateInstance& createDriver,
+                   const StreamWorkerInterface::CreateInstance& createWorker,
                    const std::vector<MicrophoneInfo>& microphones)
-    : StreamCommonImpl<SinkMetadata, StreamInWorker>(sinkMetadata, std::move(context)),
+    : StreamCommonImpl<SinkMetadata>(sinkMetadata, std::move(context), createDriver, createWorker),
       mMicrophones(transformMicrophones(microphones)) {
     LOG(DEBUG) << __func__;
 }
@@ -704,23 +752,20 @@
 }
 
 // static
-ndk::ScopedAStatus StreamOut::createInstance(const SourceMetadata& sourceMetadata,
-                                             StreamContext context,
-                                             const std::optional<AudioOffloadInfo>& offloadInfo,
-                                             std::shared_ptr<StreamOut>* result) {
-    auto stream =
-            ndk::SharedRefBase::make<StreamOut>(sourceMetadata, std::move(context), offloadInfo);
+ndk::ScopedAStatus StreamOut::initInstance(const std::shared_ptr<StreamOut>& stream) {
     if (auto status = stream->init(); !status.isOk()) {
         return status;
     }
     stream->createStreamCommon(stream);
-    *result = std::move(stream);
     return ndk::ScopedAStatus::ok();
 }
 
 StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext&& context,
+                     const DriverInterface::CreateInstance& createDriver,
+                     const StreamWorkerInterface::CreateInstance& createWorker,
                      const std::optional<AudioOffloadInfo>& offloadInfo)
-    : StreamCommonImpl<SourceMetadata, StreamOutWorker>(sourceMetadata, std::move(context)),
+    : StreamCommonImpl<SourceMetadata>(sourceMetadata, std::move(context), createDriver,
+                                       createWorker),
       mOffloadInfo(offloadInfo) {
     LOG(DEBUG) << __func__;
 }