Relocate NN burst utilities to ExecutionBurstUtils
This CL relocates serialize, deserialize, RequestChannelSender,
RequestChannelReceiver, ResultChannelSender, and ResultChannelReceiver
to ExecutionBurstUtils.
Bug: 177267324
Test: mma
Change-Id: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
Merged-In: Ie1fffdc89dc5bd325d3cd7806d2de632b8513cf9
(cherry picked from commit 297108360f2f97ec09d261bb10c5af0fa41e827a)
diff --git a/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp b/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp
index 848c77b..022548d 100644
--- a/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp
+++ b/neuralnetworks/1.2/utils/src/ExecutionBurstServer.cpp
@@ -29,21 +29,13 @@
#include <utility>
#include <vector>
+#include "ExecutionBurstUtils.h"
#include "HalInterfaces.h"
#include "Tracing.h"
namespace android::nn {
namespace {
-using hardware::MQDescriptorSync;
-using V1_2::FmqRequestDatum;
-using V1_2::FmqResultDatum;
-using V1_2::IBurstCallback;
-using V1_2::IBurstContext;
-
-constexpr V1_2::Timing kNoTiming = {std::numeric_limits<uint64_t>::max(),
- std::numeric_limits<uint64_t>::max()};
-
// DefaultBurstExecutorWithCache adapts an IPreparedModel so that it can be
// used as an IBurstExecutorWithCache. Specifically, the cache simply stores the
// hidl_memory object, and the execution forwards calls to the provided
@@ -108,384 +100,6 @@
} // anonymous namespace
-// serialize result
-std::vector<FmqResultDatum> serialize(V1_0::ErrorStatus errorStatus,
- const std::vector<V1_2::OutputShape>& outputShapes,
- V1_2::Timing timing) {
- // count how many elements need to be sent for a request
- size_t count = 2 + outputShapes.size();
- for (const auto& outputShape : outputShapes) {
- count += outputShape.dimensions.size();
- }
-
- // create buffer to temporarily store elements
- std::vector<FmqResultDatum> data;
- data.reserve(count);
-
- // package packetInfo
- {
- FmqResultDatum datum;
- datum.packetInformation({/*.packetSize=*/static_cast<uint32_t>(count),
- /*.errorStatus=*/errorStatus,
- /*.numberOfOperands=*/static_cast<uint32_t>(outputShapes.size())});
- data.push_back(datum);
- }
-
- // package output shape data
- for (const auto& operand : outputShapes) {
- // package operand information
- FmqResultDatum::OperandInformation info{};
- info.isSufficient = operand.isSufficient;
- info.numberOfDimensions = static_cast<uint32_t>(operand.dimensions.size());
-
- FmqResultDatum datum;
- datum.operandInformation(info);
- data.push_back(datum);
-
- // package operand dimensions
- for (uint32_t dimension : operand.dimensions) {
- FmqResultDatum datum;
- datum.operandDimensionValue(dimension);
- data.push_back(datum);
- }
- }
-
- // package executionTiming
- {
- FmqResultDatum datum;
- datum.executionTiming(timing);
- data.push_back(datum);
- }
-
- // return result
- return data;
-}
-
-// deserialize request
-std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>> deserialize(
- const std::vector<FmqRequestDatum>& data) {
- using discriminator = FmqRequestDatum::hidl_discriminator;
-
- size_t index = 0;
-
- // validate packet information
- if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage packet information
- const FmqRequestDatum::PacketInformation& packetInfo = data[index].packetInformation();
- index++;
- const uint32_t packetSize = packetInfo.packetSize;
- const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands;
- const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands;
- const uint32_t numberOfPools = packetInfo.numberOfPools;
-
- // verify packet size
- if (data.size() != packetSize) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage input operands
- std::vector<V1_0::RequestArgument> inputs;
- inputs.reserve(numberOfInputOperands);
- for (size_t operand = 0; operand < numberOfInputOperands; ++operand) {
- // validate input operand information
- if (data[index].getDiscriminator() != discriminator::inputOperandInformation) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage operand information
- const FmqRequestDatum::OperandInformation& operandInfo =
- data[index].inputOperandInformation();
- index++;
- const bool hasNoValue = operandInfo.hasNoValue;
- const V1_0::DataLocation location = operandInfo.location;
- const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
-
- // unpackage operand dimensions
- std::vector<uint32_t> dimensions;
- dimensions.reserve(numberOfDimensions);
- for (size_t i = 0; i < numberOfDimensions; ++i) {
- // validate dimension
- if (data[index].getDiscriminator() != discriminator::inputOperandDimensionValue) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage dimension
- const uint32_t dimension = data[index].inputOperandDimensionValue();
- index++;
-
- // store result
- dimensions.push_back(dimension);
- }
-
- // store result
- inputs.push_back(
- {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
- }
-
- // unpackage output operands
- std::vector<V1_0::RequestArgument> outputs;
- outputs.reserve(numberOfOutputOperands);
- for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) {
- // validate output operand information
- if (data[index].getDiscriminator() != discriminator::outputOperandInformation) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage operand information
- const FmqRequestDatum::OperandInformation& operandInfo =
- data[index].outputOperandInformation();
- index++;
- const bool hasNoValue = operandInfo.hasNoValue;
- const V1_0::DataLocation location = operandInfo.location;
- const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
-
- // unpackage operand dimensions
- std::vector<uint32_t> dimensions;
- dimensions.reserve(numberOfDimensions);
- for (size_t i = 0; i < numberOfDimensions; ++i) {
- // validate dimension
- if (data[index].getDiscriminator() != discriminator::outputOperandDimensionValue) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage dimension
- const uint32_t dimension = data[index].outputOperandDimensionValue();
- index++;
-
- // store result
- dimensions.push_back(dimension);
- }
-
- // store result
- outputs.push_back(
- {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
- }
-
- // unpackage pools
- std::vector<int32_t> slots;
- slots.reserve(numberOfPools);
- for (size_t pool = 0; pool < numberOfPools; ++pool) {
- // validate input operand information
- if (data[index].getDiscriminator() != discriminator::poolIdentifier) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage operand information
- const int32_t poolId = data[index].poolIdentifier();
- index++;
-
- // store result
- slots.push_back(poolId);
- }
-
- // validate measureTiming
- if (data[index].getDiscriminator() != discriminator::measureTiming) {
- LOG(ERROR) << "FMQ Request packet ill-formed";
- return std::nullopt;
- }
-
- // unpackage measureTiming
- const V1_2::MeasureTiming measure = data[index].measureTiming();
- index++;
-
- // validate packet information
- if (index != packetSize) {
- LOG(ERROR) << "FMQ Result packet ill-formed";
- return std::nullopt;
- }
-
- // return request
- V1_0::Request request = {/*.inputs=*/inputs, /*.outputs=*/outputs, /*.pools=*/{}};
- return std::make_tuple(std::move(request), std::move(slots), measure);
-}
-
-// RequestChannelReceiver methods
-
-std::unique_ptr<RequestChannelReceiver> RequestChannelReceiver::create(
- const FmqRequestDescriptor& requestChannel, std::chrono::microseconds pollingTimeWindow) {
- std::unique_ptr<FmqRequestChannel> fmqRequestChannel =
- std::make_unique<FmqRequestChannel>(requestChannel);
-
- if (!fmqRequestChannel->isValid()) {
- LOG(ERROR) << "Unable to create RequestChannelReceiver";
- return nullptr;
- }
- if (fmqRequestChannel->getEventFlagWord() == nullptr) {
- LOG(ERROR)
- << "RequestChannelReceiver::create was passed an MQDescriptor without an EventFlag";
- return nullptr;
- }
-
- return std::make_unique<RequestChannelReceiver>(std::move(fmqRequestChannel),
- pollingTimeWindow);
-}
-
-RequestChannelReceiver::RequestChannelReceiver(std::unique_ptr<FmqRequestChannel> fmqRequestChannel,
- std::chrono::microseconds pollingTimeWindow)
- : mFmqRequestChannel(std::move(fmqRequestChannel)), kPollingTimeWindow(pollingTimeWindow) {}
-
-std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>>
-RequestChannelReceiver::getBlocking() {
- const auto packet = getPacketBlocking();
- if (!packet) {
- return std::nullopt;
- }
-
- return deserialize(*packet);
-}
-
-void RequestChannelReceiver::invalidate() {
- mTeardown = true;
-
- // force unblock
- // ExecutionBurstServer is by default waiting on a request packet. If the
- // client process destroys its burst object, the server may still be waiting
- // on the futex. This force unblock wakes up any thread waiting on the
- // futex.
- // TODO: look for a different/better way to signal/notify the futex to wake
- // up any thread waiting on it
- FmqRequestDatum datum;
- datum.packetInformation({/*.packetSize=*/0, /*.numberOfInputOperands=*/0,
- /*.numberOfOutputOperands=*/0, /*.numberOfPools=*/0});
- mFmqRequestChannel->writeBlocking(&datum, 1);
-}
-
-std::optional<std::vector<FmqRequestDatum>> RequestChannelReceiver::getPacketBlocking() {
- if (mTeardown) {
- return std::nullopt;
- }
-
- // First spend time polling if results are available in FMQ instead of
- // waiting on the futex. Polling is more responsive (yielding lower
- // latencies), but can take up more power, so only poll for a limited period
- // of time.
-
- auto& getCurrentTime = std::chrono::high_resolution_clock::now;
- const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
-
- while (getCurrentTime() < timeToStopPolling) {
- // if class is being torn down, immediately return
- if (mTeardown.load(std::memory_order_relaxed)) {
- return std::nullopt;
- }
-
- // Check if data is available. If it is, immediately retrieve it and
- // return.
- const size_t available = mFmqRequestChannel->availableToRead();
- if (available > 0) {
- // This is the first point when we know an execution is occurring,
- // so begin to collect systraces. Note that a similar systrace does
- // not exist at the corresponding point in
- // ResultChannelReceiver::getPacketBlocking because the execution is
- // already in flight.
- NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION,
- "ExecutionBurstServer getting packet");
- std::vector<FmqRequestDatum> packet(available);
- const bool success = mFmqRequestChannel->read(packet.data(), available);
- if (!success) {
- LOG(ERROR) << "Error receiving packet";
- return std::nullopt;
- }
- return std::make_optional(std::move(packet));
- }
- }
-
- // If we get to this point, we either stopped polling because it was taking
- // too long or polling was not allowed. Instead, perform a blocking call
- // which uses a futex to save power.
-
- // wait for request packet and read first element of request packet
- FmqRequestDatum datum;
- bool success = mFmqRequestChannel->readBlocking(&datum, 1);
-
- // This is the first point when we know an execution is occurring, so begin
- // to collect systraces. Note that a similar systrace does not exist at the
- // corresponding point in ResultChannelReceiver::getPacketBlocking because
- // the execution is already in flight.
- NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION, "ExecutionBurstServer getting packet");
-
- // retrieve remaining elements
- // NOTE: all of the data is already available at this point, so there's no
- // need to do a blocking wait to wait for more data. This is known because
- // in FMQ, all writes are published (made available) atomically. Currently,
- // the producer always publishes the entire packet in one function call, so
- // if the first element of the packet is available, the remaining elements
- // are also available.
- const size_t count = mFmqRequestChannel->availableToRead();
- std::vector<FmqRequestDatum> packet(count + 1);
- std::memcpy(&packet.front(), &datum, sizeof(datum));
- success &= mFmqRequestChannel->read(packet.data() + 1, count);
-
- // terminate loop
- if (mTeardown) {
- return std::nullopt;
- }
-
- // ensure packet was successfully received
- if (!success) {
- LOG(ERROR) << "Error receiving packet";
- return std::nullopt;
- }
-
- return std::make_optional(std::move(packet));
-}
-
-// ResultChannelSender methods
-
-std::unique_ptr<ResultChannelSender> ResultChannelSender::create(
- const FmqResultDescriptor& resultChannel) {
- std::unique_ptr<FmqResultChannel> fmqResultChannel =
- std::make_unique<FmqResultChannel>(resultChannel);
-
- if (!fmqResultChannel->isValid()) {
- LOG(ERROR) << "Unable to create RequestChannelSender";
- return nullptr;
- }
- if (fmqResultChannel->getEventFlagWord() == nullptr) {
- LOG(ERROR) << "ResultChannelSender::create was passed an MQDescriptor without an EventFlag";
- return nullptr;
- }
-
- return std::make_unique<ResultChannelSender>(std::move(fmqResultChannel));
-}
-
-ResultChannelSender::ResultChannelSender(std::unique_ptr<FmqResultChannel> fmqResultChannel)
- : mFmqResultChannel(std::move(fmqResultChannel)) {}
-
-bool ResultChannelSender::send(V1_0::ErrorStatus errorStatus,
- const std::vector<V1_2::OutputShape>& outputShapes,
- V1_2::Timing timing) {
- const std::vector<FmqResultDatum> serialized = serialize(errorStatus, outputShapes, timing);
- return sendPacket(serialized);
-}
-
-bool ResultChannelSender::sendPacket(const std::vector<FmqResultDatum>& packet) {
- if (packet.size() > mFmqResultChannel->availableToWrite()) {
- LOG(ERROR)
- << "ResultChannelSender::sendPacket -- packet size exceeds size available in FMQ";
- const std::vector<FmqResultDatum> errorPacket =
- serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
-
- // Always send the packet with "blocking" because this signals the futex
- // and unblocks the consumer if it is waiting on the futex.
- return mFmqResultChannel->writeBlocking(errorPacket.data(), errorPacket.size());
- }
-
- // Always send the packet with "blocking" because this signals the futex and
- // unblocks the consumer if it is waiting on the futex.
- return mFmqResultChannel->writeBlocking(packet.data(), packet.size());
-}
-
// ExecutionBurstServer methods
sp<ExecutionBurstServer> ExecutionBurstServer::create(