blob: 022548dcd4389cf0fc6cdafeb53bdab23deee5c3 [file] [log] [blame]
/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#define LOG_TAG "ExecutionBurstServer"
18
19#include "ExecutionBurstServer.h"
20
21#include <android-base/logging.h>
22
23#include <algorithm>
24#include <cstring>
25#include <limits>
26#include <map>
27#include <memory>
28#include <tuple>
29#include <utility>
30#include <vector>
31
Michael Butler8fc48962021-01-08 17:21:27 -080032#include "ExecutionBurstUtils.h"
Michael Butlerf6b2d1a2020-12-19 14:44:35 -080033#include "HalInterfaces.h"
34#include "Tracing.h"
35
36namespace android::nn {
37namespace {
38
Michael Butlerf6b2d1a2020-12-19 14:44:35 -080039// DefaultBurstExecutorWithCache adapts an IPreparedModel so that it can be
40// used as an IBurstExecutorWithCache. Specifically, the cache simply stores the
41// hidl_memory object, and the execution forwards calls to the provided
42// IPreparedModel's "executeSynchronously" method. With this class, hidl_memory
43// must be mapped and unmapped for each execution.
44class DefaultBurstExecutorWithCache : public ExecutionBurstServer::IBurstExecutorWithCache {
45 public:
46 DefaultBurstExecutorWithCache(V1_2::IPreparedModel* preparedModel)
47 : mpPreparedModel(preparedModel) {}
48
49 bool isCacheEntryPresent(int32_t slot) const override {
50 const auto it = mMemoryCache.find(slot);
51 return (it != mMemoryCache.end()) && it->second.valid();
52 }
53
54 void addCacheEntry(const hardware::hidl_memory& memory, int32_t slot) override {
55 mMemoryCache[slot] = memory;
56 }
57
58 void removeCacheEntry(int32_t slot) override { mMemoryCache.erase(slot); }
59
60 std::tuple<V1_0::ErrorStatus, hardware::hidl_vec<V1_2::OutputShape>, V1_2::Timing> execute(
61 const V1_0::Request& request, const std::vector<int32_t>& slots,
62 V1_2::MeasureTiming measure) override {
63 // convert slots to pools
64 hardware::hidl_vec<hardware::hidl_memory> pools(slots.size());
65 std::transform(slots.begin(), slots.end(), pools.begin(),
66 [this](int32_t slot) { return mMemoryCache[slot]; });
67
68 // create full request
69 V1_0::Request fullRequest = request;
70 fullRequest.pools = std::move(pools);
71
72 // setup execution
73 V1_0::ErrorStatus returnedStatus = V1_0::ErrorStatus::GENERAL_FAILURE;
74 hardware::hidl_vec<V1_2::OutputShape> returnedOutputShapes;
75 V1_2::Timing returnedTiming;
76 auto cb = [&returnedStatus, &returnedOutputShapes, &returnedTiming](
77 V1_0::ErrorStatus status,
78 const hardware::hidl_vec<V1_2::OutputShape>& outputShapes,
79 const V1_2::Timing& timing) {
80 returnedStatus = status;
81 returnedOutputShapes = outputShapes;
82 returnedTiming = timing;
83 };
84
85 // execute
86 const hardware::Return<void> ret =
87 mpPreparedModel->executeSynchronously(fullRequest, measure, cb);
88 if (!ret.isOk() || returnedStatus != V1_0::ErrorStatus::NONE) {
89 LOG(ERROR) << "IPreparedModelAdapter::execute -- Error executing";
90 return {returnedStatus, std::move(returnedOutputShapes), kNoTiming};
91 }
92
93 return std::make_tuple(returnedStatus, std::move(returnedOutputShapes), returnedTiming);
94 }
95
96 private:
97 V1_2::IPreparedModel* const mpPreparedModel;
98 std::map<int32_t, hardware::hidl_memory> mMemoryCache;
99};
100
101} // anonymous namespace
102
// ExecutionBurstServer methods
104
105sp<ExecutionBurstServer> ExecutionBurstServer::create(
106 const sp<IBurstCallback>& callback, const MQDescriptorSync<FmqRequestDatum>& requestChannel,
107 const MQDescriptorSync<FmqResultDatum>& resultChannel,
108 std::shared_ptr<IBurstExecutorWithCache> executorWithCache,
109 std::chrono::microseconds pollingTimeWindow) {
110 // check inputs
111 if (callback == nullptr || executorWithCache == nullptr) {
112 LOG(ERROR) << "ExecutionBurstServer::create passed a nullptr";
113 return nullptr;
114 }
115
116 // create FMQ objects
117 std::unique_ptr<RequestChannelReceiver> requestChannelReceiver =
118 RequestChannelReceiver::create(requestChannel, pollingTimeWindow);
119 std::unique_ptr<ResultChannelSender> resultChannelSender =
120 ResultChannelSender::create(resultChannel);
121
122 // check FMQ objects
123 if (!requestChannelReceiver || !resultChannelSender) {
124 LOG(ERROR) << "ExecutionBurstServer::create failed to create FastMessageQueue";
125 return nullptr;
126 }
127
128 // make and return context
129 return new ExecutionBurstServer(callback, std::move(requestChannelReceiver),
130 std::move(resultChannelSender), std::move(executorWithCache));
131}
132
133sp<ExecutionBurstServer> ExecutionBurstServer::create(
134 const sp<IBurstCallback>& callback, const MQDescriptorSync<FmqRequestDatum>& requestChannel,
135 const MQDescriptorSync<FmqResultDatum>& resultChannel, V1_2::IPreparedModel* preparedModel,
136 std::chrono::microseconds pollingTimeWindow) {
137 // check relevant input
138 if (preparedModel == nullptr) {
139 LOG(ERROR) << "ExecutionBurstServer::create passed a nullptr";
140 return nullptr;
141 }
142
143 // adapt IPreparedModel to have caching
144 const std::shared_ptr<DefaultBurstExecutorWithCache> preparedModelAdapter =
145 std::make_shared<DefaultBurstExecutorWithCache>(preparedModel);
146
147 // make and return context
148 return ExecutionBurstServer::create(callback, requestChannel, resultChannel,
149 preparedModelAdapter, pollingTimeWindow);
150}
151
// Takes ownership of the FMQ channel objects and the executor, and spawns the
// worker thread that services the burst.
ExecutionBurstServer::ExecutionBurstServer(
        const sp<IBurstCallback>& callback, std::unique_ptr<RequestChannelReceiver> requestChannel,
        std::unique_ptr<ResultChannelSender> resultChannel,
        std::shared_ptr<IBurstExecutorWithCache> executorWithCache)
    : mCallback(callback),
      mRequestChannelReceiver(std::move(requestChannel)),
      mResultChannelSender(std::move(resultChannel)),
      mExecutorWithCache(std::move(executorWithCache)) {
    // TODO: highly document the threading behavior of this class
    // Start the worker last, in the constructor body, so task() only runs
    // after all of the members it uses are fully initialized.
    mWorker = std::thread([this] { task(); });
}
163
ExecutionBurstServer::~ExecutionBurstServer() {
    // set teardown flag first so the worker's loop condition -- and its
    // "skip sending an error on teardown" branch -- observes the shutdown
    mTeardown = true;
    // unblock the worker if it is currently blocked in getBlocking()
    mRequestChannelReceiver->invalidate();

    // wait for task thread to end
    mWorker.join();
}
172
173hardware::Return<void> ExecutionBurstServer::freeMemory(int32_t slot) {
174 std::lock_guard<std::mutex> hold(mMutex);
175 mExecutorWithCache->removeCacheEntry(slot);
176 return hardware::Void();
177}
178
179void ExecutionBurstServer::ensureCacheEntriesArePresentLocked(const std::vector<int32_t>& slots) {
180 const auto slotIsKnown = [this](int32_t slot) {
181 return mExecutorWithCache->isCacheEntryPresent(slot);
182 };
183
184 // find unique unknown slots
185 std::vector<int32_t> unknownSlots = slots;
186 auto unknownSlotsEnd = unknownSlots.end();
187 std::sort(unknownSlots.begin(), unknownSlotsEnd);
188 unknownSlotsEnd = std::unique(unknownSlots.begin(), unknownSlotsEnd);
189 unknownSlotsEnd = std::remove_if(unknownSlots.begin(), unknownSlotsEnd, slotIsKnown);
190 unknownSlots.erase(unknownSlotsEnd, unknownSlots.end());
191
192 // quick-exit if all slots are known
193 if (unknownSlots.empty()) {
194 return;
195 }
196
197 V1_0::ErrorStatus errorStatus = V1_0::ErrorStatus::GENERAL_FAILURE;
198 std::vector<hardware::hidl_memory> returnedMemories;
199 auto cb = [&errorStatus, &returnedMemories](
200 V1_0::ErrorStatus status,
201 const hardware::hidl_vec<hardware::hidl_memory>& memories) {
202 errorStatus = status;
203 returnedMemories = memories;
204 };
205
206 const hardware::Return<void> ret = mCallback->getMemories(unknownSlots, cb);
207
208 if (!ret.isOk() || errorStatus != V1_0::ErrorStatus::NONE ||
209 returnedMemories.size() != unknownSlots.size()) {
210 LOG(ERROR) << "Error retrieving memories";
211 return;
212 }
213
214 // add memories to unknown slots
215 for (size_t i = 0; i < unknownSlots.size(); ++i) {
216 mExecutorWithCache->addCacheEntry(returnedMemories[i], unknownSlots[i]);
217 }
218}
219
// Worker-thread body: repeatedly reads serialized requests from the FMQ
// request channel, executes them, and sends results back on the result
// channel. Runs on mWorker from construction until the destructor sets
// mTeardown and invalidates the request channel.
void ExecutionBurstServer::task() {
    // loop until the burst object is being destroyed
    while (!mTeardown) {
        // receive request; blocks until a packet arrives or the channel is
        // invalidated during teardown
        auto arguments = mRequestChannelReceiver->getBlocking();

        // if the request packet was not properly received, return a generic
        // error and skip the execution
        //
        // if the burst is being torn down, skip the execution so the "task"
        // function can end
        if (!arguments) {
            if (!mTeardown) {
                mResultChannelSender->send(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
            }
            continue;
        }

        // otherwise begin tracing execution
        NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION,
                     "ExecutionBurstServer getting memory, executing, and returning results");

        // unpack the arguments; types are Request, std::vector<int32_t>, and
        // MeasureTiming, respectively
        const auto [requestWithoutPools, slotsOfPools, measure] = std::move(*arguments);

        // ensure executor with cache has required memory; the lock is held
        // for the remainder of the iteration so freeMemory() cannot evict a
        // slot while it is in use by this execution
        std::lock_guard<std::mutex> hold(mMutex);
        ensureCacheEntriesArePresentLocked(slotsOfPools);

        // perform computation; types are ErrorStatus, hidl_vec<OutputShape>,
        // and Timing, respectively
        const auto [errorStatus, outputShapes, returnedTiming] =
                mExecutorWithCache->execute(requestWithoutPools, slotsOfPools, measure);

        // return result
        mResultChannelSender->send(errorStatus, outputShapes, returnedTiming);
    }
}
259
260} // namespace android::nn