blob: f0275f933ab6d54a78e5ddf60fc43ddc68f60d18 [file] [log] [blame]
Michael Butler8fc48962021-01-08 17:21:27 -08001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "ExecutionBurstUtils"
18
19#include "ExecutionBurstUtils.h"
20
21#include <android-base/logging.h>
22#include <android/hardware/neuralnetworks/1.0/types.h>
23#include <android/hardware/neuralnetworks/1.1/types.h>
24#include <android/hardware/neuralnetworks/1.2/types.h>
25#include <fmq/MessageQueue.h>
26#include <hidl/MQDescriptor.h>
27
28#include <atomic>
29#include <chrono>
30#include <memory>
31#include <thread>
32#include <tuple>
33#include <utility>
34#include <vector>
35
36namespace android::hardware::neuralnetworks::V1_2::utils {
37namespace {
38
39constexpr V1_2::Timing kNoTiming = {std::numeric_limits<uint64_t>::max(),
40 std::numeric_limits<uint64_t>::max()};
41
42}
43
44// serialize a request into a packet
45std::vector<FmqRequestDatum> serialize(const V1_0::Request& request, V1_2::MeasureTiming measure,
46 const std::vector<int32_t>& slots) {
47 // count how many elements need to be sent for a request
48 size_t count = 2 + request.inputs.size() + request.outputs.size() + request.pools.size();
49 for (const auto& input : request.inputs) {
50 count += input.dimensions.size();
51 }
52 for (const auto& output : request.outputs) {
53 count += output.dimensions.size();
54 }
55
56 // create buffer to temporarily store elements
57 std::vector<FmqRequestDatum> data;
58 data.reserve(count);
59
60 // package packetInfo
61 {
62 FmqRequestDatum datum;
63 datum.packetInformation(
64 {/*.packetSize=*/static_cast<uint32_t>(count),
65 /*.numberOfInputOperands=*/static_cast<uint32_t>(request.inputs.size()),
66 /*.numberOfOutputOperands=*/static_cast<uint32_t>(request.outputs.size()),
67 /*.numberOfPools=*/static_cast<uint32_t>(request.pools.size())});
68 data.push_back(datum);
69 }
70
71 // package input data
72 for (const auto& input : request.inputs) {
73 // package operand information
74 FmqRequestDatum datum;
75 datum.inputOperandInformation(
76 {/*.hasNoValue=*/input.hasNoValue,
77 /*.location=*/input.location,
78 /*.numberOfDimensions=*/static_cast<uint32_t>(input.dimensions.size())});
79 data.push_back(datum);
80
81 // package operand dimensions
82 for (uint32_t dimension : input.dimensions) {
83 FmqRequestDatum datum;
84 datum.inputOperandDimensionValue(dimension);
85 data.push_back(datum);
86 }
87 }
88
89 // package output data
90 for (const auto& output : request.outputs) {
91 // package operand information
92 FmqRequestDatum datum;
93 datum.outputOperandInformation(
94 {/*.hasNoValue=*/output.hasNoValue,
95 /*.location=*/output.location,
96 /*.numberOfDimensions=*/static_cast<uint32_t>(output.dimensions.size())});
97 data.push_back(datum);
98
99 // package operand dimensions
100 for (uint32_t dimension : output.dimensions) {
101 FmqRequestDatum datum;
102 datum.outputOperandDimensionValue(dimension);
103 data.push_back(datum);
104 }
105 }
106
107 // package pool identifier
108 for (int32_t slot : slots) {
109 FmqRequestDatum datum;
110 datum.poolIdentifier(slot);
111 data.push_back(datum);
112 }
113
114 // package measureTiming
115 {
116 FmqRequestDatum datum;
117 datum.measureTiming(measure);
118 data.push_back(datum);
119 }
120
121 // return packet
122 return data;
123}
124
125// serialize result
126std::vector<FmqResultDatum> serialize(V1_0::ErrorStatus errorStatus,
127 const std::vector<V1_2::OutputShape>& outputShapes,
128 V1_2::Timing timing) {
129 // count how many elements need to be sent for a request
130 size_t count = 2 + outputShapes.size();
131 for (const auto& outputShape : outputShapes) {
132 count += outputShape.dimensions.size();
133 }
134
135 // create buffer to temporarily store elements
136 std::vector<FmqResultDatum> data;
137 data.reserve(count);
138
139 // package packetInfo
140 {
141 FmqResultDatum datum;
142 datum.packetInformation({/*.packetSize=*/static_cast<uint32_t>(count),
143 /*.errorStatus=*/errorStatus,
144 /*.numberOfOperands=*/static_cast<uint32_t>(outputShapes.size())});
145 data.push_back(datum);
146 }
147
148 // package output shape data
149 for (const auto& operand : outputShapes) {
150 // package operand information
151 FmqResultDatum::OperandInformation info{};
152 info.isSufficient = operand.isSufficient;
153 info.numberOfDimensions = static_cast<uint32_t>(operand.dimensions.size());
154
155 FmqResultDatum datum;
156 datum.operandInformation(info);
157 data.push_back(datum);
158
159 // package operand dimensions
160 for (uint32_t dimension : operand.dimensions) {
161 FmqResultDatum datum;
162 datum.operandDimensionValue(dimension);
163 data.push_back(datum);
164 }
165 }
166
167 // package executionTiming
168 {
169 FmqResultDatum datum;
170 datum.executionTiming(timing);
171 data.push_back(datum);
172 }
173
174 // return result
175 return data;
176}
177
178// deserialize request
179std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>> deserialize(
180 const std::vector<FmqRequestDatum>& data) {
181 using discriminator = FmqRequestDatum::hidl_discriminator;
182
183 size_t index = 0;
184
185 // validate packet information
186 if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) {
187 LOG(ERROR) << "FMQ Request packet ill-formed";
188 return std::nullopt;
189 }
190
191 // unpackage packet information
192 const FmqRequestDatum::PacketInformation& packetInfo = data[index].packetInformation();
193 index++;
194 const uint32_t packetSize = packetInfo.packetSize;
195 const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands;
196 const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands;
197 const uint32_t numberOfPools = packetInfo.numberOfPools;
198
199 // verify packet size
200 if (data.size() != packetSize) {
201 LOG(ERROR) << "FMQ Request packet ill-formed";
202 return std::nullopt;
203 }
204
205 // unpackage input operands
206 std::vector<V1_0::RequestArgument> inputs;
207 inputs.reserve(numberOfInputOperands);
208 for (size_t operand = 0; operand < numberOfInputOperands; ++operand) {
209 // validate input operand information
210 if (data[index].getDiscriminator() != discriminator::inputOperandInformation) {
211 LOG(ERROR) << "FMQ Request packet ill-formed";
212 return std::nullopt;
213 }
214
215 // unpackage operand information
216 const FmqRequestDatum::OperandInformation& operandInfo =
217 data[index].inputOperandInformation();
218 index++;
219 const bool hasNoValue = operandInfo.hasNoValue;
220 const V1_0::DataLocation location = operandInfo.location;
221 const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
222
223 // unpackage operand dimensions
224 std::vector<uint32_t> dimensions;
225 dimensions.reserve(numberOfDimensions);
226 for (size_t i = 0; i < numberOfDimensions; ++i) {
227 // validate dimension
228 if (data[index].getDiscriminator() != discriminator::inputOperandDimensionValue) {
229 LOG(ERROR) << "FMQ Request packet ill-formed";
230 return std::nullopt;
231 }
232
233 // unpackage dimension
234 const uint32_t dimension = data[index].inputOperandDimensionValue();
235 index++;
236
237 // store result
238 dimensions.push_back(dimension);
239 }
240
241 // store result
242 inputs.push_back(
243 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
244 }
245
246 // unpackage output operands
247 std::vector<V1_0::RequestArgument> outputs;
248 outputs.reserve(numberOfOutputOperands);
249 for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) {
250 // validate output operand information
251 if (data[index].getDiscriminator() != discriminator::outputOperandInformation) {
252 LOG(ERROR) << "FMQ Request packet ill-formed";
253 return std::nullopt;
254 }
255
256 // unpackage operand information
257 const FmqRequestDatum::OperandInformation& operandInfo =
258 data[index].outputOperandInformation();
259 index++;
260 const bool hasNoValue = operandInfo.hasNoValue;
261 const V1_0::DataLocation location = operandInfo.location;
262 const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
263
264 // unpackage operand dimensions
265 std::vector<uint32_t> dimensions;
266 dimensions.reserve(numberOfDimensions);
267 for (size_t i = 0; i < numberOfDimensions; ++i) {
268 // validate dimension
269 if (data[index].getDiscriminator() != discriminator::outputOperandDimensionValue) {
270 LOG(ERROR) << "FMQ Request packet ill-formed";
271 return std::nullopt;
272 }
273
274 // unpackage dimension
275 const uint32_t dimension = data[index].outputOperandDimensionValue();
276 index++;
277
278 // store result
279 dimensions.push_back(dimension);
280 }
281
282 // store result
283 outputs.push_back(
284 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
285 }
286
287 // unpackage pools
288 std::vector<int32_t> slots;
289 slots.reserve(numberOfPools);
290 for (size_t pool = 0; pool < numberOfPools; ++pool) {
291 // validate input operand information
292 if (data[index].getDiscriminator() != discriminator::poolIdentifier) {
293 LOG(ERROR) << "FMQ Request packet ill-formed";
294 return std::nullopt;
295 }
296
297 // unpackage operand information
298 const int32_t poolId = data[index].poolIdentifier();
299 index++;
300
301 // store result
302 slots.push_back(poolId);
303 }
304
305 // validate measureTiming
306 if (data[index].getDiscriminator() != discriminator::measureTiming) {
307 LOG(ERROR) << "FMQ Request packet ill-formed";
308 return std::nullopt;
309 }
310
311 // unpackage measureTiming
312 const V1_2::MeasureTiming measure = data[index].measureTiming();
313 index++;
314
315 // validate packet information
316 if (index != packetSize) {
317 LOG(ERROR) << "FMQ Result packet ill-formed";
318 return std::nullopt;
319 }
320
321 // return request
322 V1_0::Request request = {/*.inputs=*/inputs, /*.outputs=*/outputs, /*.pools=*/{}};
323 return std::make_tuple(std::move(request), std::move(slots), measure);
324}
325
326// deserialize a packet into the result
327std::optional<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>>
328deserialize(const std::vector<FmqResultDatum>& data) {
329 using discriminator = FmqResultDatum::hidl_discriminator;
330
331 std::vector<V1_2::OutputShape> outputShapes;
332 size_t index = 0;
333
334 // validate packet information
335 if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) {
336 LOG(ERROR) << "FMQ Result packet ill-formed";
337 return std::nullopt;
338 }
339
340 // unpackage packet information
341 const FmqResultDatum::PacketInformation& packetInfo = data[index].packetInformation();
342 index++;
343 const uint32_t packetSize = packetInfo.packetSize;
344 const V1_0::ErrorStatus errorStatus = packetInfo.errorStatus;
345 const uint32_t numberOfOperands = packetInfo.numberOfOperands;
346
347 // verify packet size
348 if (data.size() != packetSize) {
349 LOG(ERROR) << "FMQ Result packet ill-formed";
350 return std::nullopt;
351 }
352
353 // unpackage operands
354 for (size_t operand = 0; operand < numberOfOperands; ++operand) {
355 // validate operand information
356 if (data[index].getDiscriminator() != discriminator::operandInformation) {
357 LOG(ERROR) << "FMQ Result packet ill-formed";
358 return std::nullopt;
359 }
360
361 // unpackage operand information
362 const FmqResultDatum::OperandInformation& operandInfo = data[index].operandInformation();
363 index++;
364 const bool isSufficient = operandInfo.isSufficient;
365 const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
366
367 // unpackage operand dimensions
368 std::vector<uint32_t> dimensions;
369 dimensions.reserve(numberOfDimensions);
370 for (size_t i = 0; i < numberOfDimensions; ++i) {
371 // validate dimension
372 if (data[index].getDiscriminator() != discriminator::operandDimensionValue) {
373 LOG(ERROR) << "FMQ Result packet ill-formed";
374 return std::nullopt;
375 }
376
377 // unpackage dimension
378 const uint32_t dimension = data[index].operandDimensionValue();
379 index++;
380
381 // store result
382 dimensions.push_back(dimension);
383 }
384
385 // store result
386 outputShapes.push_back({/*.dimensions=*/dimensions, /*.isSufficient=*/isSufficient});
387 }
388
389 // validate execution timing
390 if (data[index].getDiscriminator() != discriminator::executionTiming) {
391 LOG(ERROR) << "FMQ Result packet ill-formed";
392 return std::nullopt;
393 }
394
395 // unpackage execution timing
396 const V1_2::Timing timing = data[index].executionTiming();
397 index++;
398
399 // validate packet information
400 if (index != packetSize) {
401 LOG(ERROR) << "FMQ Result packet ill-formed";
402 return std::nullopt;
403 }
404
405 // return result
406 return std::make_tuple(errorStatus, std::move(outputShapes), timing);
407}
408
409V1_0::ErrorStatus legacyConvertResultCodeToErrorStatus(int resultCode) {
410 return convertToV1_0(convertResultCodeToErrorStatus(resultCode));
411}
412
413// RequestChannelSender methods
414
415std::pair<std::unique_ptr<RequestChannelSender>, const FmqRequestDescriptor*>
416RequestChannelSender::create(size_t channelLength) {
417 std::unique_ptr<FmqRequestChannel> fmqRequestChannel =
418 std::make_unique<FmqRequestChannel>(channelLength, /*confEventFlag=*/true);
419 if (!fmqRequestChannel->isValid()) {
420 LOG(ERROR) << "Unable to create RequestChannelSender";
421 return {nullptr, nullptr};
422 }
423
424 const FmqRequestDescriptor* descriptor = fmqRequestChannel->getDesc();
425 return std::make_pair(std::make_unique<RequestChannelSender>(std::move(fmqRequestChannel)),
426 descriptor);
427}
428
429RequestChannelSender::RequestChannelSender(std::unique_ptr<FmqRequestChannel> fmqRequestChannel)
430 : mFmqRequestChannel(std::move(fmqRequestChannel)) {}
431
432bool RequestChannelSender::send(const V1_0::Request& request, V1_2::MeasureTiming measure,
433 const std::vector<int32_t>& slots) {
434 const std::vector<FmqRequestDatum> serialized = serialize(request, measure, slots);
435 return sendPacket(serialized);
436}
437
438bool RequestChannelSender::sendPacket(const std::vector<FmqRequestDatum>& packet) {
439 if (!mValid) {
440 return false;
441 }
442
443 if (packet.size() > mFmqRequestChannel->availableToWrite()) {
444 LOG(ERROR)
445 << "RequestChannelSender::sendPacket -- packet size exceeds size available in FMQ";
446 return false;
447 }
448
449 // Always send the packet with "blocking" because this signals the futex and
450 // unblocks the consumer if it is waiting on the futex.
451 return mFmqRequestChannel->writeBlocking(packet.data(), packet.size());
452}
453
454void RequestChannelSender::invalidate() {
455 mValid = false;
456}
457
458// RequestChannelReceiver methods
459
460std::unique_ptr<RequestChannelReceiver> RequestChannelReceiver::create(
461 const FmqRequestDescriptor& requestChannel, std::chrono::microseconds pollingTimeWindow) {
462 std::unique_ptr<FmqRequestChannel> fmqRequestChannel =
463 std::make_unique<FmqRequestChannel>(requestChannel);
464
465 if (!fmqRequestChannel->isValid()) {
466 LOG(ERROR) << "Unable to create RequestChannelReceiver";
467 return nullptr;
468 }
469 if (fmqRequestChannel->getEventFlagWord() == nullptr) {
470 LOG(ERROR)
471 << "RequestChannelReceiver::create was passed an MQDescriptor without an EventFlag";
472 return nullptr;
473 }
474
475 return std::make_unique<RequestChannelReceiver>(std::move(fmqRequestChannel),
476 pollingTimeWindow);
477}
478
479RequestChannelReceiver::RequestChannelReceiver(std::unique_ptr<FmqRequestChannel> fmqRequestChannel,
480 std::chrono::microseconds pollingTimeWindow)
481 : mFmqRequestChannel(std::move(fmqRequestChannel)), kPollingTimeWindow(pollingTimeWindow) {}
482
483std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>>
484RequestChannelReceiver::getBlocking() {
485 const auto packet = getPacketBlocking();
486 if (!packet) {
487 return std::nullopt;
488 }
489
490 return deserialize(*packet);
491}
492
493void RequestChannelReceiver::invalidate() {
494 mTeardown = true;
495
496 // force unblock
497 // ExecutionBurstServer is by default waiting on a request packet. If the
498 // client process destroys its burst object, the server may still be waiting
499 // on the futex. This force unblock wakes up any thread waiting on the
500 // futex.
501 // TODO: look for a different/better way to signal/notify the futex to wake
502 // up any thread waiting on it
503 FmqRequestDatum datum;
504 datum.packetInformation({/*.packetSize=*/0, /*.numberOfInputOperands=*/0,
505 /*.numberOfOutputOperands=*/0, /*.numberOfPools=*/0});
506 mFmqRequestChannel->writeBlocking(&datum, 1);
507}
508
509std::optional<std::vector<FmqRequestDatum>> RequestChannelReceiver::getPacketBlocking() {
510 if (mTeardown) {
511 return std::nullopt;
512 }
513
514 // First spend time polling if results are available in FMQ instead of
515 // waiting on the futex. Polling is more responsive (yielding lower
516 // latencies), but can take up more power, so only poll for a limited period
517 // of time.
518
519 auto& getCurrentTime = std::chrono::high_resolution_clock::now;
520 const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
521
522 while (getCurrentTime() < timeToStopPolling) {
523 // if class is being torn down, immediately return
524 if (mTeardown.load(std::memory_order_relaxed)) {
525 return std::nullopt;
526 }
527
528 // Check if data is available. If it is, immediately retrieve it and
529 // return.
530 const size_t available = mFmqRequestChannel->availableToRead();
531 if (available > 0) {
532 // This is the first point when we know an execution is occurring,
533 // so begin to collect systraces. Note that a similar systrace does
534 // not exist at the corresponding point in
535 // ResultChannelReceiver::getPacketBlocking because the execution is
536 // already in flight.
537 NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION,
538 "ExecutionBurstServer getting packet");
539 std::vector<FmqRequestDatum> packet(available);
540 const bool success = mFmqRequestChannel->read(packet.data(), available);
541 if (!success) {
542 LOG(ERROR) << "Error receiving packet";
543 return std::nullopt;
544 }
545 return std::make_optional(std::move(packet));
546 }
547 }
548
549 // If we get to this point, we either stopped polling because it was taking
550 // too long or polling was not allowed. Instead, perform a blocking call
551 // which uses a futex to save power.
552
553 // wait for request packet and read first element of request packet
554 FmqRequestDatum datum;
555 bool success = mFmqRequestChannel->readBlocking(&datum, 1);
556
557 // This is the first point when we know an execution is occurring, so begin
558 // to collect systraces. Note that a similar systrace does not exist at the
559 // corresponding point in ResultChannelReceiver::getPacketBlocking because
560 // the execution is already in flight.
561 NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION, "ExecutionBurstServer getting packet");
562
563 // retrieve remaining elements
564 // NOTE: all of the data is already available at this point, so there's no
565 // need to do a blocking wait to wait for more data. This is known because
566 // in FMQ, all writes are published (made available) atomically. Currently,
567 // the producer always publishes the entire packet in one function call, so
568 // if the first element of the packet is available, the remaining elements
569 // are also available.
570 const size_t count = mFmqRequestChannel->availableToRead();
571 std::vector<FmqRequestDatum> packet(count + 1);
572 std::memcpy(&packet.front(), &datum, sizeof(datum));
573 success &= mFmqRequestChannel->read(packet.data() + 1, count);
574
575 // terminate loop
576 if (mTeardown) {
577 return std::nullopt;
578 }
579
580 // ensure packet was successfully received
581 if (!success) {
582 LOG(ERROR) << "Error receiving packet";
583 return std::nullopt;
584 }
585
586 return std::make_optional(std::move(packet));
587}
588
589// ResultChannelSender methods
590
591std::unique_ptr<ResultChannelSender> ResultChannelSender::create(
592 const FmqResultDescriptor& resultChannel) {
593 std::unique_ptr<FmqResultChannel> fmqResultChannel =
594 std::make_unique<FmqResultChannel>(resultChannel);
595
596 if (!fmqResultChannel->isValid()) {
597 LOG(ERROR) << "Unable to create RequestChannelSender";
598 return nullptr;
599 }
600 if (fmqResultChannel->getEventFlagWord() == nullptr) {
601 LOG(ERROR) << "ResultChannelSender::create was passed an MQDescriptor without an EventFlag";
602 return nullptr;
603 }
604
605 return std::make_unique<ResultChannelSender>(std::move(fmqResultChannel));
606}
607
608ResultChannelSender::ResultChannelSender(std::unique_ptr<FmqResultChannel> fmqResultChannel)
609 : mFmqResultChannel(std::move(fmqResultChannel)) {}
610
611bool ResultChannelSender::send(V1_0::ErrorStatus errorStatus,
612 const std::vector<V1_2::OutputShape>& outputShapes,
613 V1_2::Timing timing) {
614 const std::vector<FmqResultDatum> serialized = serialize(errorStatus, outputShapes, timing);
615 return sendPacket(serialized);
616}
617
618bool ResultChannelSender::sendPacket(const std::vector<FmqResultDatum>& packet) {
619 if (packet.size() > mFmqResultChannel->availableToWrite()) {
620 LOG(ERROR)
621 << "ResultChannelSender::sendPacket -- packet size exceeds size available in FMQ";
622 const std::vector<FmqResultDatum> errorPacket =
623 serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
624
625 // Always send the packet with "blocking" because this signals the futex
626 // and unblocks the consumer if it is waiting on the futex.
627 return mFmqResultChannel->writeBlocking(errorPacket.data(), errorPacket.size());
628 }
629
630 // Always send the packet with "blocking" because this signals the futex and
631 // unblocks the consumer if it is waiting on the futex.
632 return mFmqResultChannel->writeBlocking(packet.data(), packet.size());
633}
634
635// ResultChannelReceiver methods
636
637std::pair<std::unique_ptr<ResultChannelReceiver>, const FmqResultDescriptor*>
638ResultChannelReceiver::create(size_t channelLength, std::chrono::microseconds pollingTimeWindow) {
639 std::unique_ptr<FmqResultChannel> fmqResultChannel =
640 std::make_unique<FmqResultChannel>(channelLength, /*confEventFlag=*/true);
641 if (!fmqResultChannel->isValid()) {
642 LOG(ERROR) << "Unable to create ResultChannelReceiver";
643 return {nullptr, nullptr};
644 }
645
646 const FmqResultDescriptor* descriptor = fmqResultChannel->getDesc();
647 return std::make_pair(
648 std::make_unique<ResultChannelReceiver>(std::move(fmqResultChannel), pollingTimeWindow),
649 descriptor);
650}
651
652ResultChannelReceiver::ResultChannelReceiver(std::unique_ptr<FmqResultChannel> fmqResultChannel,
653 std::chrono::microseconds pollingTimeWindow)
654 : mFmqResultChannel(std::move(fmqResultChannel)), kPollingTimeWindow(pollingTimeWindow) {}
655
656std::optional<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>>
657ResultChannelReceiver::getBlocking() {
658 const auto packet = getPacketBlocking();
659 if (!packet) {
660 return std::nullopt;
661 }
662
663 return deserialize(*packet);
664}
665
666void ResultChannelReceiver::invalidate() {
667 mValid = false;
668
669 // force unblock
670 // ExecutionBurstController waits on a result packet after sending a
671 // request. If the driver containing ExecutionBurstServer crashes, the
672 // controller may be waiting on the futex. This force unblock wakes up any
673 // thread waiting on the futex.
674 // TODO: look for a different/better way to signal/notify the futex to
675 // wake up any thread waiting on it
676 FmqResultDatum datum;
677 datum.packetInformation({/*.packetSize=*/0,
678 /*.errorStatus=*/V1_0::ErrorStatus::GENERAL_FAILURE,
679 /*.numberOfOperands=*/0});
680 mFmqResultChannel->writeBlocking(&datum, 1);
681}
682
683std::optional<std::vector<FmqResultDatum>> ResultChannelReceiver::getPacketBlocking() {
684 if (!mValid) {
685 return std::nullopt;
686 }
687
688 // First spend time polling if results are available in FMQ instead of
689 // waiting on the futex. Polling is more responsive (yielding lower
690 // latencies), but can take up more power, so only poll for a limited period
691 // of time.
692
693 auto& getCurrentTime = std::chrono::high_resolution_clock::now;
694 const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
695
696 while (getCurrentTime() < timeToStopPolling) {
697 // if class is being torn down, immediately return
698 if (!mValid.load(std::memory_order_relaxed)) {
699 return std::nullopt;
700 }
701
702 // Check if data is available. If it is, immediately retrieve it and
703 // return.
704 const size_t available = mFmqResultChannel->availableToRead();
705 if (available > 0) {
706 std::vector<FmqResultDatum> packet(available);
707 const bool success = mFmqResultChannel->read(packet.data(), available);
708 if (!success) {
709 LOG(ERROR) << "Error receiving packet";
710 return std::nullopt;
711 }
712 return std::make_optional(std::move(packet));
713 }
714 }
715
716 // If we get to this point, we either stopped polling because it was taking
717 // too long or polling was not allowed. Instead, perform a blocking call
718 // which uses a futex to save power.
719
720 // wait for result packet and read first element of result packet
721 FmqResultDatum datum;
722 bool success = mFmqResultChannel->readBlocking(&datum, 1);
723
724 // retrieve remaining elements
725 // NOTE: all of the data is already available at this point, so there's no
726 // need to do a blocking wait to wait for more data. This is known because
727 // in FMQ, all writes are published (made available) atomically. Currently,
728 // the producer always publishes the entire packet in one function call, so
729 // if the first element of the packet is available, the remaining elements
730 // are also available.
731 const size_t count = mFmqResultChannel->availableToRead();
732 std::vector<FmqResultDatum> packet(count + 1);
733 std::memcpy(&packet.front(), &datum, sizeof(datum));
734 success &= mFmqResultChannel->read(packet.data() + 1, count);
735
736 if (!mValid) {
737 return std::nullopt;
738 }
739
740 // ensure packet was successfully received
741 if (!success) {
742 LOG(ERROR) << "Error receiving packet";
743 return std::nullopt;
744 }
745
746 return std::make_optional(std::move(packet));
747}
748
749} // namespace android::hardware::neuralnetworks::V1_2::utils