Relocate NN burst utility to ExecutionBurstUtils
This CL relocates serialize, deserialize, RequestChannelSender,
RequestChannelReceiver, ResultChannelSender, and ResultChannelReceiver
to ExecutionBurstUtils.
Bug: 177267324
Test: mma
Change-Id: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
Merged-In: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
(cherry picked from commit 297108360f2f97ec09d261bb10c5af0fa41e827a)
diff --git a/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp b/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp
index 212863e..2265861 100644
--- a/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp
+++ b/neuralnetworks/1.2/utils/src/ExecutionBurstController.cpp
@@ -29,6 +29,7 @@
#include <utility>
#include <vector>
+#include "ExecutionBurstUtils.h"
#include "HalInterfaces.h"
#include "Tracing.h"
#include "Utils.h"
@@ -36,16 +37,6 @@
namespace android::nn {
namespace {
-using V1_2::FmqRequestDatum;
-using V1_2::FmqResultDatum;
-using V1_2::IBurstCallback;
-using V1_2::IBurstContext;
-using FmqRequestDescriptor = hardware::MQDescriptorSync<FmqRequestDatum>;
-using FmqResultDescriptor = hardware::MQDescriptorSync<FmqResultDatum>;
-
-constexpr V1_2::Timing kNoTiming12 = {std::numeric_limits<uint64_t>::max(),
- std::numeric_limits<uint64_t>::max()};
-
class BurstContextDeathHandler : public hardware::hidl_death_recipient {
public:
using Callback = std::function<void()>;
@@ -65,329 +56,6 @@
} // anonymous namespace
-// serialize a request into a packet
-std::vector<FmqRequestDatum> serialize(const V1_0::Request& request, V1_2::MeasureTiming measure,
- const std::vector<int32_t>& slots) {
- // count how many elements need to be sent for a request
- size_t count = 2 + request.inputs.size() + request.outputs.size() + request.pools.size();
- for (const auto& input : request.inputs) {
- count += input.dimensions.size();
- }
- for (const auto& output : request.outputs) {
- count += output.dimensions.size();
- }
-
- // create buffer to temporarily store elements
- std::vector<FmqRequestDatum> data;
- data.reserve(count);
-
- // package packetInfo
- {
- FmqRequestDatum datum;
- datum.packetInformation(
- {/*.packetSize=*/static_cast<uint32_t>(count),
- /*.numberOfInputOperands=*/static_cast<uint32_t>(request.inputs.size()),
- /*.numberOfOutputOperands=*/static_cast<uint32_t>(request.outputs.size()),
- /*.numberOfPools=*/static_cast<uint32_t>(request.pools.size())});
- data.push_back(datum);
- }
-
- // package input data
- for (const auto& input : request.inputs) {
- // package operand information
- FmqRequestDatum datum;
- datum.inputOperandInformation(
- {/*.hasNoValue=*/input.hasNoValue,
- /*.location=*/input.location,
- /*.numberOfDimensions=*/static_cast<uint32_t>(input.dimensions.size())});
- data.push_back(datum);
-
- // package operand dimensions
- for (uint32_t dimension : input.dimensions) {
- FmqRequestDatum datum;
- datum.inputOperandDimensionValue(dimension);
- data.push_back(datum);
- }
- }
-
- // package output data
- for (const auto& output : request.outputs) {
- // package operand information
- FmqRequestDatum datum;
- datum.outputOperandInformation(
- {/*.hasNoValue=*/output.hasNoValue,
- /*.location=*/output.location,
- /*.numberOfDimensions=*/static_cast<uint32_t>(output.dimensions.size())});
- data.push_back(datum);
-
- // package operand dimensions
- for (uint32_t dimension : output.dimensions) {
- FmqRequestDatum datum;
- datum.outputOperandDimensionValue(dimension);
- data.push_back(datum);
- }
- }
-
- // package pool identifier
- for (int32_t slot : slots) {
- FmqRequestDatum datum;
- datum.poolIdentifier(slot);
- data.push_back(datum);
- }
-
- // package measureTiming
- {
- FmqRequestDatum datum;
- datum.measureTiming(measure);
- data.push_back(datum);
- }
-
- // return packet
- return data;
-}
-
-// deserialize a packet into the result
-std::optional<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>>
-deserialize(const std::vector<FmqResultDatum>& data) {
- using discriminator = FmqResultDatum::hidl_discriminator;
-
- std::vector<V1_2::OutputShape> outputShapes;
- size_t index = 0;
-
- // validate packet information
- if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) {
- LOG(ERROR) << "FMQ Result packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage packet information
- const FmqResultDatum::PacketInformation& packetInfo = data[index].packetInformation();
- index++;
- const uint32_t packetSize = packetInfo.packetSize;
- const V1_0::ErrorStatus errorStatus = packetInfo.errorStatus;
- const uint32_t numberOfOperands = packetInfo.numberOfOperands;
-
- // verify packet size
- if (data.size() != packetSize) {
- LOG(ERROR) << "FMQ Result packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage operands
- for (size_t operand = 0; operand < numberOfOperands; ++operand) {
- // validate operand information
- if (data[index].getDiscriminator() != discriminator::operandInformation) {
- LOG(ERROR) << "FMQ Result packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage operand information
- const FmqResultDatum::OperandInformation& operandInfo = data[index].operandInformation();
- index++;
- const bool isSufficient = operandInfo.isSufficient;
- const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
-
- // unpackage operand dimensions
- std::vector<uint32_t> dimensions;
- dimensions.reserve(numberOfDimensions);
- for (size_t i = 0; i < numberOfDimensions; ++i) {
- // validate dimension
- if (data[index].getDiscriminator() != discriminator::operandDimensionValue) {
- LOG(ERROR) << "FMQ Result packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage dimension
- const uint32_t dimension = data[index].operandDimensionValue();
- index++;
-
- // store result
- dimensions.push_back(dimension);
- }
-
- // store result
- outputShapes.push_back({/*.dimensions=*/dimensions, /*.isSufficient=*/isSufficient});
- }
-
- // validate execution timing
- if (data[index].getDiscriminator() != discriminator::executionTiming) {
- LOG(ERROR) << "FMQ Result packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage execution timing
- const V1_2::Timing timing = data[index].executionTiming();
- index++;
-
- // validate packet information
- if (index != packetSize) {
- LOG(ERROR) << "FMQ Result packet ill-formed";
- return std::nullopt;
- }
-
- // return result
- return std::make_tuple(errorStatus, std::move(outputShapes), timing);
-}
-
-V1_0::ErrorStatus legacyConvertResultCodeToErrorStatus(int resultCode) {
- return convertToV1_0(convertResultCodeToErrorStatus(resultCode));
-}
-
-std::pair<std::unique_ptr<ResultChannelReceiver>, const FmqResultDescriptor*>
-ResultChannelReceiver::create(size_t channelLength, std::chrono::microseconds pollingTimeWindow) {
- std::unique_ptr<FmqResultChannel> fmqResultChannel =
- std::make_unique<FmqResultChannel>(channelLength, /*confEventFlag=*/true);
- if (!fmqResultChannel->isValid()) {
- LOG(ERROR) << "Unable to create ResultChannelReceiver";
- return {nullptr, nullptr};
- }
-
- const FmqResultDescriptor* descriptor = fmqResultChannel->getDesc();
- return std::make_pair(
- std::make_unique<ResultChannelReceiver>(std::move(fmqResultChannel), pollingTimeWindow),
- descriptor);
-}
-
-ResultChannelReceiver::ResultChannelReceiver(std::unique_ptr<FmqResultChannel> fmqResultChannel,
- std::chrono::microseconds pollingTimeWindow)
- : mFmqResultChannel(std::move(fmqResultChannel)), kPollingTimeWindow(pollingTimeWindow) {}
-
-std::optional<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>>
-ResultChannelReceiver::getBlocking() {
- const auto packet = getPacketBlocking();
- if (!packet) {
- return std::nullopt;
- }
-
- return deserialize(*packet);
-}
-
-void ResultChannelReceiver::invalidate() {
- mValid = false;
-
- // force unblock
- // ExecutionBurstController waits on a result packet after sending a
- // request. If the driver containing ExecutionBurstServer crashes, the
- // controller may be waiting on the futex. This force unblock wakes up any
- // thread waiting on the futex.
- // TODO: look for a different/better way to signal/notify the futex to
- // wake up any thread waiting on it
- FmqResultDatum datum;
- datum.packetInformation({/*.packetSize=*/0,
- /*.errorStatus=*/V1_0::ErrorStatus::GENERAL_FAILURE,
- /*.numberOfOperands=*/0});
- mFmqResultChannel->writeBlocking(&datum, 1);
-}
-
-std::optional<std::vector<FmqResultDatum>> ResultChannelReceiver::getPacketBlocking() {
- if (!mValid) {
- return std::nullopt;
- }
-
- // First spend time polling if results are available in FMQ instead of
- // waiting on the futex. Polling is more responsive (yielding lower
- // latencies), but can take up more power, so only poll for a limited period
- // of time.
-
- auto& getCurrentTime = std::chrono::high_resolution_clock::now;
- const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
-
- while (getCurrentTime() < timeToStopPolling) {
- // if class is being torn down, immediately return
- if (!mValid.load(std::memory_order_relaxed)) {
- return std::nullopt;
- }
-
- // Check if data is available. If it is, immediately retrieve it and
- // return.
- const size_t available = mFmqResultChannel->availableToRead();
- if (available > 0) {
- std::vector<FmqResultDatum> packet(available);
- const bool success = mFmqResultChannel->read(packet.data(), available);
- if (!success) {
- LOG(ERROR) << "Error receiving packet";
- return std::nullopt;
- }
- return std::make_optional(std::move(packet));
- }
- }
-
- // If we get to this point, we either stopped polling because it was taking
- // too long or polling was not allowed. Instead, perform a blocking call
- // which uses a futex to save power.
-
- // wait for result packet and read first element of result packet
- FmqResultDatum datum;
- bool success = mFmqResultChannel->readBlocking(&datum, 1);
-
- // retrieve remaining elements
- // NOTE: all of the data is already available at this point, so there's no
- // need to do a blocking wait to wait for more data. This is known because
- // in FMQ, all writes are published (made available) atomically. Currently,
- // the producer always publishes the entire packet in one function call, so
- // if the first element of the packet is available, the remaining elements
- // are also available.
- const size_t count = mFmqResultChannel->availableToRead();
- std::vector<FmqResultDatum> packet(count + 1);
- std::memcpy(&packet.front(), &datum, sizeof(datum));
- success &= mFmqResultChannel->read(packet.data() + 1, count);
-
- if (!mValid) {
- return std::nullopt;
- }
-
- // ensure packet was successfully received
- if (!success) {
- LOG(ERROR) << "Error receiving packet";
- return std::nullopt;
- }
-
- return std::make_optional(std::move(packet));
-}
-
-std::pair<std::unique_ptr<RequestChannelSender>, const FmqRequestDescriptor*>
-RequestChannelSender::create(size_t channelLength) {
- std::unique_ptr<FmqRequestChannel> fmqRequestChannel =
- std::make_unique<FmqRequestChannel>(channelLength, /*confEventFlag=*/true);
- if (!fmqRequestChannel->isValid()) {
- LOG(ERROR) << "Unable to create RequestChannelSender";
- return {nullptr, nullptr};
- }
-
- const FmqRequestDescriptor* descriptor = fmqRequestChannel->getDesc();
- return std::make_pair(std::make_unique<RequestChannelSender>(std::move(fmqRequestChannel)),
- descriptor);
-}
-
-RequestChannelSender::RequestChannelSender(std::unique_ptr<FmqRequestChannel> fmqRequestChannel)
- : mFmqRequestChannel(std::move(fmqRequestChannel)) {}
-
-bool RequestChannelSender::send(const V1_0::Request& request, V1_2::MeasureTiming measure,
- const std::vector<int32_t>& slots) {
- const std::vector<FmqRequestDatum> serialized = serialize(request, measure, slots);
- return sendPacket(serialized);
-}
-
-bool RequestChannelSender::sendPacket(const std::vector<FmqRequestDatum>& packet) {
- if (!mValid) {
- return false;
- }
-
- if (packet.size() > mFmqRequestChannel->availableToWrite()) {
- LOG(ERROR)
- << "RequestChannelSender::sendPacket -- packet size exceeds size available in FMQ";
- return false;
- }
-
- // Always send the packet with "blocking" because this signals the futex and
- // unblocks the consumer if it is waiting on the futex.
- return mFmqRequestChannel->writeBlocking(packet.data(), packet.size());
-}
-
-void RequestChannelSender::invalidate() {
- mValid = false;
-}
-
hardware::Return<void> ExecutionBurstController::ExecutionBurstCallback::getMemories(
const hardware::hidl_vec<int32_t>& slots, getMemories_cb cb) {
std::lock_guard<std::mutex> guard(mMutex);