Relocate NN burst utilities to ExecutionBurstUtils

This CL relocates serialize, deserialize, RequestChannelSender,
RequestChannelReceiver, ResultChannelSender, and ResultChannelReceiver
to ExecutionBurstUtils.

Bug: 177267324
Test: mma
Change-Id: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
Merged-In: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
(cherry picked from commit 297108360f2f97ec09d261bb10c5af0fa41e827a)
diff --git a/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp b/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp
index 848c77b..022548d 100644
--- a/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp
+++ b/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp
@@ -29,21 +29,13 @@
 #include <utility>
 #include <vector>
 
+#include "ExecutionBurstUtils.h"
 #include "HalInterfaces.h"
 #include "Tracing.h"
 
 namespace android::nn {
 namespace {
 
-using hardware::MQDescriptorSync;
-using V1_2::FmqRequestDatum;
-using V1_2::FmqResultDatum;
-using V1_2::IBurstCallback;
-using V1_2::IBurstContext;
-
-constexpr V1_2::Timing kNoTiming = {std::numeric_limits<uint64_t>::max(),
-                                    std::numeric_limits<uint64_t>::max()};
-
 // DefaultBurstExecutorWithCache adapts an IPreparedModel so that it can be
 // used as an IBurstExecutorWithCache. Specifically, the cache simply stores the
 // hidl_memory object, and the execution forwards calls to the provided
@@ -108,384 +100,6 @@
 
 }  // anonymous namespace
 
-// serialize result
-std::vector<FmqResultDatum> serialize(V1_0::ErrorStatus errorStatus,
-                                      const std::vector<V1_2::OutputShape>& outputShapes,
-                                      V1_2::Timing timing) {
-    // count how many elements need to be sent for a request
-    size_t count = 2 + outputShapes.size();
-    for (const auto& outputShape : outputShapes) {
-        count += outputShape.dimensions.size();
-    }
-
-    // create buffer to temporarily store elements
-    std::vector<FmqResultDatum> data;
-    data.reserve(count);
-
-    // package packetInfo
-    {
-        FmqResultDatum datum;
-        datum.packetInformation({/*.packetSize=*/static_cast<uint32_t>(count),
-                                 /*.errorStatus=*/errorStatus,
-                                 /*.numberOfOperands=*/static_cast<uint32_t>(outputShapes.size())});
-        data.push_back(datum);
-    }
-
-    // package output shape data
-    for (const auto& operand : outputShapes) {
-        // package operand information
-        FmqResultDatum::OperandInformation info{};
-        info.isSufficient = operand.isSufficient;
-        info.numberOfDimensions = static_cast<uint32_t>(operand.dimensions.size());
-
-        FmqResultDatum datum;
-        datum.operandInformation(info);
-        data.push_back(datum);
-
-        // package operand dimensions
-        for (uint32_t dimension : operand.dimensions) {
-            FmqResultDatum datum;
-            datum.operandDimensionValue(dimension);
-            data.push_back(datum);
-        }
-    }
-
-    // package executionTiming
-    {
-        FmqResultDatum datum;
-        datum.executionTiming(timing);
-        data.push_back(datum);
-    }
-
-    // return result
-    return data;
-}
-
-// deserialize request
-std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>> deserialize(
-        const std::vector<FmqRequestDatum>& data) {
-    using discriminator = FmqRequestDatum::hidl_discriminator;
-
-    size_t index = 0;
-
-    // validate packet information
-    if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) {
-        LOG(ERROR) << "FMQ Request packet ill-formed";
-        return std::nullopt;
-    }
-
-    // unpackage packet information
-    const FmqRequestDatum::PacketInformation& packetInfo = data[index].packetInformation();
-    index++;
-    const uint32_t packetSize = packetInfo.packetSize;
-    const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands;
-    const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands;
-    const uint32_t numberOfPools = packetInfo.numberOfPools;
-
-    // verify packet size
-    if (data.size() != packetSize) {
-        LOG(ERROR) << "FMQ Request packet ill-formed";
-        return std::nullopt;
-    }
-
-    // unpackage input operands
-    std::vector<V1_0::RequestArgument> inputs;
-    inputs.reserve(numberOfInputOperands);
-    for (size_t operand = 0; operand < numberOfInputOperands; ++operand) {
-        // validate input operand information
-        if (data[index].getDiscriminator() != discriminator::inputOperandInformation) {
-            LOG(ERROR) << "FMQ Request packet ill-formed";
-            return std::nullopt;
-        }
-
-        // unpackage operand information
-        const FmqRequestDatum::OperandInformation& operandInfo =
-                data[index].inputOperandInformation();
-        index++;
-        const bool hasNoValue = operandInfo.hasNoValue;
-        const V1_0::DataLocation location = operandInfo.location;
-        const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
-
-        // unpackage operand dimensions
-        std::vector<uint32_t> dimensions;
-        dimensions.reserve(numberOfDimensions);
-        for (size_t i = 0; i < numberOfDimensions; ++i) {
-            // validate dimension
-            if (data[index].getDiscriminator() != discriminator::inputOperandDimensionValue) {
-                LOG(ERROR) << "FMQ Request packet ill-formed";
-                return std::nullopt;
-            }
-
-            // unpackage dimension
-            const uint32_t dimension = data[index].inputOperandDimensionValue();
-            index++;
-
-            // store result
-            dimensions.push_back(dimension);
-        }
-
-        // store result
-        inputs.push_back(
-                {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
-    }
-
-    // unpackage output operands
-    std::vector<V1_0::RequestArgument> outputs;
-    outputs.reserve(numberOfOutputOperands);
-    for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) {
-        // validate output operand information
-        if (data[index].getDiscriminator() != discriminator::outputOperandInformation) {
-            LOG(ERROR) << "FMQ Request packet ill-formed";
-            return std::nullopt;
-        }
-
-        // unpackage operand information
-        const FmqRequestDatum::OperandInformation& operandInfo =
-                data[index].outputOperandInformation();
-        index++;
-        const bool hasNoValue = operandInfo.hasNoValue;
-        const V1_0::DataLocation location = operandInfo.location;
-        const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
-
-        // unpackage operand dimensions
-        std::vector<uint32_t> dimensions;
-        dimensions.reserve(numberOfDimensions);
-        for (size_t i = 0; i < numberOfDimensions; ++i) {
-            // validate dimension
-            if (data[index].getDiscriminator() != discriminator::outputOperandDimensionValue) {
-                LOG(ERROR) << "FMQ Request packet ill-formed";
-                return std::nullopt;
-            }
-
-            // unpackage dimension
-            const uint32_t dimension = data[index].outputOperandDimensionValue();
-            index++;
-
-            // store result
-            dimensions.push_back(dimension);
-        }
-
-        // store result
-        outputs.push_back(
-                {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
-    }
-
-    // unpackage pools
-    std::vector<int32_t> slots;
-    slots.reserve(numberOfPools);
-    for (size_t pool = 0; pool < numberOfPools; ++pool) {
-        // validate input operand information
-        if (data[index].getDiscriminator() != discriminator::poolIdentifier) {
-            LOG(ERROR) << "FMQ Request packet ill-formed";
-            return std::nullopt;
-        }
-
-        // unpackage operand information
-        const int32_t poolId = data[index].poolIdentifier();
-        index++;
-
-        // store result
-        slots.push_back(poolId);
-    }
-
-    // validate measureTiming
-    if (data[index].getDiscriminator() != discriminator::measureTiming) {
-        LOG(ERROR) << "FMQ Request packet ill-formed";
-        return std::nullopt;
-    }
-
-    // unpackage measureTiming
-    const V1_2::MeasureTiming measure = data[index].measureTiming();
-    index++;
-
-    // validate packet information
-    if (index != packetSize) {
-        LOG(ERROR) << "FMQ Result packet ill-formed";
-        return std::nullopt;
-    }
-
-    // return request
-    V1_0::Request request = {/*.inputs=*/inputs, /*.outputs=*/outputs, /*.pools=*/{}};
-    return std::make_tuple(std::move(request), std::move(slots), measure);
-}
-
-// RequestChannelReceiver methods
-
-std::unique_ptr<RequestChannelReceiver> RequestChannelReceiver::create(
-        const FmqRequestDescriptor& requestChannel, std::chrono::microseconds pollingTimeWindow) {
-    std::unique_ptr<FmqRequestChannel> fmqRequestChannel =
-            std::make_unique<FmqRequestChannel>(requestChannel);
-
-    if (!fmqRequestChannel->isValid()) {
-        LOG(ERROR) << "Unable to create RequestChannelReceiver";
-        return nullptr;
-    }
-    if (fmqRequestChannel->getEventFlagWord() == nullptr) {
-        LOG(ERROR)
-                << "RequestChannelReceiver::create was passed an MQDescriptor without an EventFlag";
-        return nullptr;
-    }
-
-    return std::make_unique<RequestChannelReceiver>(std::move(fmqRequestChannel),
-                                                    pollingTimeWindow);
-}
-
-RequestChannelReceiver::RequestChannelReceiver(std::unique_ptr<FmqRequestChannel> fmqRequestChannel,
-                                               std::chrono::microseconds pollingTimeWindow)
-    : mFmqRequestChannel(std::move(fmqRequestChannel)), kPollingTimeWindow(pollingTimeWindow) {}
-
-std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>>
-RequestChannelReceiver::getBlocking() {
-    const auto packet = getPacketBlocking();
-    if (!packet) {
-        return std::nullopt;
-    }
-
-    return deserialize(*packet);
-}
-
-void RequestChannelReceiver::invalidate() {
-    mTeardown = true;
-
-    // force unblock
-    // ExecutionBurstServer is by default waiting on a request packet. If the
-    // client process destroys its burst object, the server may still be waiting
-    // on the futex. This force unblock wakes up any thread waiting on the
-    // futex.
-    // TODO: look for a different/better way to signal/notify the futex to wake
-    // up any thread waiting on it
-    FmqRequestDatum datum;
-    datum.packetInformation({/*.packetSize=*/0, /*.numberOfInputOperands=*/0,
-                             /*.numberOfOutputOperands=*/0, /*.numberOfPools=*/0});
-    mFmqRequestChannel->writeBlocking(&datum, 1);
-}
-
-std::optional<std::vector<FmqRequestDatum>> RequestChannelReceiver::getPacketBlocking() {
-    if (mTeardown) {
-        return std::nullopt;
-    }
-
-    // First spend time polling if results are available in FMQ instead of
-    // waiting on the futex. Polling is more responsive (yielding lower
-    // latencies), but can take up more power, so only poll for a limited period
-    // of time.
-
-    auto& getCurrentTime = std::chrono::high_resolution_clock::now;
-    const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
-
-    while (getCurrentTime() < timeToStopPolling) {
-        // if class is being torn down, immediately return
-        if (mTeardown.load(std::memory_order_relaxed)) {
-            return std::nullopt;
-        }
-
-        // Check if data is available. If it is, immediately retrieve it and
-        // return.
-        const size_t available = mFmqRequestChannel->availableToRead();
-        if (available > 0) {
-            // This is the first point when we know an execution is occurring,
-            // so begin to collect systraces. Note that a similar systrace does
-            // not exist at the corresponding point in
-            // ResultChannelReceiver::getPacketBlocking because the execution is
-            // already in flight.
-            NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION,
-                         "ExecutionBurstServer getting packet");
-            std::vector<FmqRequestDatum> packet(available);
-            const bool success = mFmqRequestChannel->read(packet.data(), available);
-            if (!success) {
-                LOG(ERROR) << "Error receiving packet";
-                return std::nullopt;
-            }
-            return std::make_optional(std::move(packet));
-        }
-    }
-
-    // If we get to this point, we either stopped polling because it was taking
-    // too long or polling was not allowed. Instead, perform a blocking call
-    // which uses a futex to save power.
-
-    // wait for request packet and read first element of request packet
-    FmqRequestDatum datum;
-    bool success = mFmqRequestChannel->readBlocking(&datum, 1);
-
-    // This is the first point when we know an execution is occurring, so begin
-    // to collect systraces. Note that a similar systrace does not exist at the
-    // corresponding point in ResultChannelReceiver::getPacketBlocking because
-    // the execution is already in flight.
-    NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION, "ExecutionBurstServer getting packet");
-
-    // retrieve remaining elements
-    // NOTE: all of the data is already available at this point, so there's no
-    // need to do a blocking wait to wait for more data. This is known because
-    // in FMQ, all writes are published (made available) atomically. Currently,
-    // the producer always publishes the entire packet in one function call, so
-    // if the first element of the packet is available, the remaining elements
-    // are also available.
-    const size_t count = mFmqRequestChannel->availableToRead();
-    std::vector<FmqRequestDatum> packet(count + 1);
-    std::memcpy(&packet.front(), &datum, sizeof(datum));
-    success &= mFmqRequestChannel->read(packet.data() + 1, count);
-
-    // terminate loop
-    if (mTeardown) {
-        return std::nullopt;
-    }
-
-    // ensure packet was successfully received
-    if (!success) {
-        LOG(ERROR) << "Error receiving packet";
-        return std::nullopt;
-    }
-
-    return std::make_optional(std::move(packet));
-}
-
-// ResultChannelSender methods
-
-std::unique_ptr<ResultChannelSender> ResultChannelSender::create(
-        const FmqResultDescriptor& resultChannel) {
-    std::unique_ptr<FmqResultChannel> fmqResultChannel =
-            std::make_unique<FmqResultChannel>(resultChannel);
-
-    if (!fmqResultChannel->isValid()) {
-        LOG(ERROR) << "Unable to create RequestChannelSender";
-        return nullptr;
-    }
-    if (fmqResultChannel->getEventFlagWord() == nullptr) {
-        LOG(ERROR) << "ResultChannelSender::create was passed an MQDescriptor without an EventFlag";
-        return nullptr;
-    }
-
-    return std::make_unique<ResultChannelSender>(std::move(fmqResultChannel));
-}
-
-ResultChannelSender::ResultChannelSender(std::unique_ptr<FmqResultChannel> fmqResultChannel)
-    : mFmqResultChannel(std::move(fmqResultChannel)) {}
-
-bool ResultChannelSender::send(V1_0::ErrorStatus errorStatus,
-                               const std::vector<V1_2::OutputShape>& outputShapes,
-                               V1_2::Timing timing) {
-    const std::vector<FmqResultDatum> serialized = serialize(errorStatus, outputShapes, timing);
-    return sendPacket(serialized);
-}
-
-bool ResultChannelSender::sendPacket(const std::vector<FmqResultDatum>& packet) {
-    if (packet.size() > mFmqResultChannel->availableToWrite()) {
-        LOG(ERROR)
-                << "ResultChannelSender::sendPacket -- packet size exceeds size available in FMQ";
-        const std::vector<FmqResultDatum> errorPacket =
-                serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
-
-        // Always send the packet with "blocking" because this signals the futex
-        // and unblocks the consumer if it is waiting on the futex.
-        return mFmqResultChannel->writeBlocking(errorPacket.data(), errorPacket.size());
-    }
-
-    // Always send the packet with "blocking" because this signals the futex and
-    // unblocks the consumer if it is waiting on the futex.
-    return mFmqResultChannel->writeBlocking(packet.data(), packet.size());
-}
-
 // ExecutionBurstServer methods
 
 sp<ExecutionBurstServer> ExecutionBurstServer::create(