Relocate NN burst utility to ExecutionBurstUtils

This CL relocates serialize, deserialize, RequestChannelSender,
RequestChannelReceiver, ResultChannelSender, and ResultChannelReceiver
to ExecutionBurstUtils.

Bug: 177267324
Test: mma
Change-Id: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
Merged-In: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
(cherry picked from commit 297108360f2f97ec09d261bb10c5af0fa41e827a)
diff --git a/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp b/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp
index 212863e..2265861 100644
--- a/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp
+++ b/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp
@@ -29,6 +29,7 @@
 #include <utility>
 #include <vector>
 
+#include "ExecutionBurstUtils.h"
 #include "HalInterfaces.h"
 #include "Tracing.h"
 #include "Utils.h"
@@ -36,16 +37,6 @@
 namespace android::nn {
 namespace {
 
-using V1_2::FmqRequestDatum;
-using V1_2::FmqResultDatum;
-using V1_2::IBurstCallback;
-using V1_2::IBurstContext;
-using FmqRequestDescriptor = hardware::MQDescriptorSync<FmqRequestDatum>;
-using FmqResultDescriptor = hardware::MQDescriptorSync<FmqResultDatum>;
-
-constexpr V1_2::Timing kNoTiming12 = {std::numeric_limits<uint64_t>::max(),
-                                      std::numeric_limits<uint64_t>::max()};
-
 class BurstContextDeathHandler : public hardware::hidl_death_recipient {
   public:
     using Callback = std::function<void()>;
@@ -65,329 +56,6 @@
 
 }  // anonymous namespace
 
-// serialize a request into a packet
-std::vector<FmqRequestDatum> serialize(const V1_0::Request& request, V1_2::MeasureTiming measure,
-                                       const std::vector<int32_t>& slots) {
-    // count how many elements need to be sent for a request
-    size_t count = 2 + request.inputs.size() + request.outputs.size() + request.pools.size();
-    for (const auto& input : request.inputs) {
-        count += input.dimensions.size();
-    }
-    for (const auto& output : request.outputs) {
-        count += output.dimensions.size();
-    }
-
-    // create buffer to temporarily store elements
-    std::vector<FmqRequestDatum> data;
-    data.reserve(count);
-
-    // package packetInfo
-    {
-        FmqRequestDatum datum;
-        datum.packetInformation(
-                {/*.packetSize=*/static_cast<uint32_t>(count),
-                 /*.numberOfInputOperands=*/static_cast<uint32_t>(request.inputs.size()),
-                 /*.numberOfOutputOperands=*/static_cast<uint32_t>(request.outputs.size()),
-                 /*.numberOfPools=*/static_cast<uint32_t>(request.pools.size())});
-        data.push_back(datum);
-    }
-
-    // package input data
-    for (const auto& input : request.inputs) {
-        // package operand information
-        FmqRequestDatum datum;
-        datum.inputOperandInformation(
-                {/*.hasNoValue=*/input.hasNoValue,
-                 /*.location=*/input.location,
-                 /*.numberOfDimensions=*/static_cast<uint32_t>(input.dimensions.size())});
-        data.push_back(datum);
-
-        // package operand dimensions
-        for (uint32_t dimension : input.dimensions) {
-            FmqRequestDatum datum;
-            datum.inputOperandDimensionValue(dimension);
-            data.push_back(datum);
-        }
-    }
-
-    // package output data
-    for (const auto& output : request.outputs) {
-        // package operand information
-        FmqRequestDatum datum;
-        datum.outputOperandInformation(
-                {/*.hasNoValue=*/output.hasNoValue,
-                 /*.location=*/output.location,
-                 /*.numberOfDimensions=*/static_cast<uint32_t>(output.dimensions.size())});
-        data.push_back(datum);
-
-        // package operand dimensions
-        for (uint32_t dimension : output.dimensions) {
-            FmqRequestDatum datum;
-            datum.outputOperandDimensionValue(dimension);
-            data.push_back(datum);
-        }
-    }
-
-    // package pool identifier
-    for (int32_t slot : slots) {
-        FmqRequestDatum datum;
-        datum.poolIdentifier(slot);
-        data.push_back(datum);
-    }
-
-    // package measureTiming
-    {
-        FmqRequestDatum datum;
-        datum.measureTiming(measure);
-        data.push_back(datum);
-    }
-
-    // return packet
-    return data;
-}
-
-// deserialize a packet into the result
-std::optional<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>>
-deserialize(const std::vector<FmqResultDatum>& data) {
-    using discriminator = FmqResultDatum::hidl_discriminator;
-
-    std::vector<V1_2::OutputShape> outputShapes;
-    size_t index = 0;
-
-    // validate packet information
-    if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) {
-        LOG(ERROR) << "FMQ Result packet ill-formed";
-        return std::nullopt;
-    }
-
-    // unpackage packet information
-    const FmqResultDatum::PacketInformation& packetInfo = data[index].packetInformation();
-    index++;
-    const uint32_t packetSize = packetInfo.packetSize;
-    const V1_0::ErrorStatus errorStatus = packetInfo.errorStatus;
-    const uint32_t numberOfOperands = packetInfo.numberOfOperands;
-
-    // verify packet size
-    if (data.size() != packetSize) {
-        LOG(ERROR) << "FMQ Result packet ill-formed";
-        return std::nullopt;
-    }
-
-    // unpackage operands
-    for (size_t operand = 0; operand < numberOfOperands; ++operand) {
-        // validate operand information
-        if (data[index].getDiscriminator() != discriminator::operandInformation) {
-            LOG(ERROR) << "FMQ Result packet ill-formed";
-            return std::nullopt;
-        }
-
-        // unpackage operand information
-        const FmqResultDatum::OperandInformation& operandInfo = data[index].operandInformation();
-        index++;
-        const bool isSufficient = operandInfo.isSufficient;
-        const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
-
-        // unpackage operand dimensions
-        std::vector<uint32_t> dimensions;
-        dimensions.reserve(numberOfDimensions);
-        for (size_t i = 0; i < numberOfDimensions; ++i) {
-            // validate dimension
-            if (data[index].getDiscriminator() != discriminator::operandDimensionValue) {
-                LOG(ERROR) << "FMQ Result packet ill-formed";
-                return std::nullopt;
-            }
-
-            // unpackage dimension
-            const uint32_t dimension = data[index].operandDimensionValue();
-            index++;
-
-            // store result
-            dimensions.push_back(dimension);
-        }
-
-        // store result
-        outputShapes.push_back({/*.dimensions=*/dimensions, /*.isSufficient=*/isSufficient});
-    }
-
-    // validate execution timing
-    if (data[index].getDiscriminator() != discriminator::executionTiming) {
-        LOG(ERROR) << "FMQ Result packet ill-formed";
-        return std::nullopt;
-    }
-
-    // unpackage execution timing
-    const V1_2::Timing timing = data[index].executionTiming();
-    index++;
-
-    // validate packet information
-    if (index != packetSize) {
-        LOG(ERROR) << "FMQ Result packet ill-formed";
-        return std::nullopt;
-    }
-
-    // return result
-    return std::make_tuple(errorStatus, std::move(outputShapes), timing);
-}
-
-V1_0::ErrorStatus legacyConvertResultCodeToErrorStatus(int resultCode) {
-    return convertToV1_0(convertResultCodeToErrorStatus(resultCode));
-}
-
-std::pair<std::unique_ptr<ResultChannelReceiver>, const FmqResultDescriptor*>
-ResultChannelReceiver::create(size_t channelLength, std::chrono::microseconds pollingTimeWindow) {
-    std::unique_ptr<FmqResultChannel> fmqResultChannel =
-            std::make_unique<FmqResultChannel>(channelLength, /*confEventFlag=*/true);
-    if (!fmqResultChannel->isValid()) {
-        LOG(ERROR) << "Unable to create ResultChannelReceiver";
-        return {nullptr, nullptr};
-    }
-
-    const FmqResultDescriptor* descriptor = fmqResultChannel->getDesc();
-    return std::make_pair(
-            std::make_unique<ResultChannelReceiver>(std::move(fmqResultChannel), pollingTimeWindow),
-            descriptor);
-}
-
-ResultChannelReceiver::ResultChannelReceiver(std::unique_ptr<FmqResultChannel> fmqResultChannel,
-                                             std::chrono::microseconds pollingTimeWindow)
-    : mFmqResultChannel(std::move(fmqResultChannel)), kPollingTimeWindow(pollingTimeWindow) {}
-
-std::optional<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>>
-ResultChannelReceiver::getBlocking() {
-    const auto packet = getPacketBlocking();
-    if (!packet) {
-        return std::nullopt;
-    }
-
-    return deserialize(*packet);
-}
-
-void ResultChannelReceiver::invalidate() {
-    mValid = false;
-
-    // force unblock
-    // ExecutionBurstController waits on a result packet after sending a
-    // request. If the driver containing ExecutionBurstServer crashes, the
-    // controller may be waiting on the futex. This force unblock wakes up any
-    // thread waiting on the futex.
-    // TODO: look for a different/better way to signal/notify the futex to
-    // wake up any thread waiting on it
-    FmqResultDatum datum;
-    datum.packetInformation({/*.packetSize=*/0,
-                             /*.errorStatus=*/V1_0::ErrorStatus::GENERAL_FAILURE,
-                             /*.numberOfOperands=*/0});
-    mFmqResultChannel->writeBlocking(&datum, 1);
-}
-
-std::optional<std::vector<FmqResultDatum>> ResultChannelReceiver::getPacketBlocking() {
-    if (!mValid) {
-        return std::nullopt;
-    }
-
-    // First spend time polling if results are available in FMQ instead of
-    // waiting on the futex. Polling is more responsive (yielding lower
-    // latencies), but can take up more power, so only poll for a limited period
-    // of time.
-
-    auto& getCurrentTime = std::chrono::high_resolution_clock::now;
-    const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
-
-    while (getCurrentTime() < timeToStopPolling) {
-        // if class is being torn down, immediately return
-        if (!mValid.load(std::memory_order_relaxed)) {
-            return std::nullopt;
-        }
-
-        // Check if data is available. If it is, immediately retrieve it and
-        // return.
-        const size_t available = mFmqResultChannel->availableToRead();
-        if (available > 0) {
-            std::vector<FmqResultDatum> packet(available);
-            const bool success = mFmqResultChannel->read(packet.data(), available);
-            if (!success) {
-                LOG(ERROR) << "Error receiving packet";
-                return std::nullopt;
-            }
-            return std::make_optional(std::move(packet));
-        }
-    }
-
-    // If we get to this point, we either stopped polling because it was taking
-    // too long or polling was not allowed. Instead, perform a blocking call
-    // which uses a futex to save power.
-
-    // wait for result packet and read first element of result packet
-    FmqResultDatum datum;
-    bool success = mFmqResultChannel->readBlocking(&datum, 1);
-
-    // retrieve remaining elements
-    // NOTE: all of the data is already available at this point, so there's no
-    // need to do a blocking wait to wait for more data. This is known because
-    // in FMQ, all writes are published (made available) atomically. Currently,
-    // the producer always publishes the entire packet in one function call, so
-    // if the first element of the packet is available, the remaining elements
-    // are also available.
-    const size_t count = mFmqResultChannel->availableToRead();
-    std::vector<FmqResultDatum> packet(count + 1);
-    std::memcpy(&packet.front(), &datum, sizeof(datum));
-    success &= mFmqResultChannel->read(packet.data() + 1, count);
-
-    if (!mValid) {
-        return std::nullopt;
-    }
-
-    // ensure packet was successfully received
-    if (!success) {
-        LOG(ERROR) << "Error receiving packet";
-        return std::nullopt;
-    }
-
-    return std::make_optional(std::move(packet));
-}
-
-std::pair<std::unique_ptr<RequestChannelSender>, const FmqRequestDescriptor*>
-RequestChannelSender::create(size_t channelLength) {
-    std::unique_ptr<FmqRequestChannel> fmqRequestChannel =
-            std::make_unique<FmqRequestChannel>(channelLength, /*confEventFlag=*/true);
-    if (!fmqRequestChannel->isValid()) {
-        LOG(ERROR) << "Unable to create RequestChannelSender";
-        return {nullptr, nullptr};
-    }
-
-    const FmqRequestDescriptor* descriptor = fmqRequestChannel->getDesc();
-    return std::make_pair(std::make_unique<RequestChannelSender>(std::move(fmqRequestChannel)),
-                          descriptor);
-}
-
-RequestChannelSender::RequestChannelSender(std::unique_ptr<FmqRequestChannel> fmqRequestChannel)
-    : mFmqRequestChannel(std::move(fmqRequestChannel)) {}
-
-bool RequestChannelSender::send(const V1_0::Request& request, V1_2::MeasureTiming measure,
-                                const std::vector<int32_t>& slots) {
-    const std::vector<FmqRequestDatum> serialized = serialize(request, measure, slots);
-    return sendPacket(serialized);
-}
-
-bool RequestChannelSender::sendPacket(const std::vector<FmqRequestDatum>& packet) {
-    if (!mValid) {
-        return false;
-    }
-
-    if (packet.size() > mFmqRequestChannel->availableToWrite()) {
-        LOG(ERROR)
-                << "RequestChannelSender::sendPacket -- packet size exceeds size available in FMQ";
-        return false;
-    }
-
-    // Always send the packet with "blocking" because this signals the futex and
-    // unblocks the consumer if it is waiting on the futex.
-    return mFmqRequestChannel->writeBlocking(packet.data(), packet.size());
-}
-
-void RequestChannelSender::invalidate() {
-    mValid = false;
-}
-
 hardware::Return<void> ExecutionBurstController::ExecutionBurstCallback::getMemories(
         const hardware::hidl_vec<int32_t>& slots, getMemories_cb cb) {
     std::lock_guard<std::mutex> guard(mMutex);