/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "AidlBufferPoolAcc"
//#define LOG_NDEBUG 0

#include <sys/types.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <utils/Log.h>
#include <thread>

#include "Accessor.h"
#include "Connection.h"
#include "DataHelper.h"

namespace aidl::android::hardware::media::bufferpool2::implementation {

namespace {
    static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
    static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
}

#ifdef __ANDROID_VNDK__
static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
#else
static constexpr uint32_t kSeqIdVndkBit = 0;
#endif

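// A ConnectionId packs the server's pid into the upper 32 bits and a
// per-process sequence number into the lower 32 bits; on VNDK (vendor)
// builds the top bit of the sequence number is set so vendor- and
// system-side connection ids stay disjoint.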
static constexpr uint32_t kSeqIdMax = 0x7fffffff;
uint32_t Accessor::sSeqId = time(nullptr) & kSeqIdMax;

namespace {
// anonymous namespace
static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
        std::make_shared<ConnectionDeathRecipient>();

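// C-style callback registered via AIBinder_DeathRecipient_new(); forwards
// binder death notifications to the process-wide death recipient.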
void serviceDied(void *cookie) {
    if (sConnectionDeathRecipient) {
        sConnectionDeathRecipient->onDead(cookie);
    }
}
}

std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
    return sConnectionDeathRecipient;
}

ConnectionDeathRecipient::ConnectionDeathRecipient() {
    mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
            AIBinder_DeathRecipient_new(serviceDied));
}

void ConnectionDeathRecipient::add(
        int64_t connectionId,
        const std::shared_ptr<Accessor> &accessor) {
    std::lock_guard<std::mutex> lock(mLock);
    if (mAccessors.find(connectionId) == mAccessors.end()) {
        mAccessors.insert(std::make_pair(connectionId, accessor));
    }
}

void ConnectionDeathRecipient::remove(int64_t connectionId) {
    std::lock_guard<std::mutex> lock(mLock);
    mAccessors.erase(connectionId);
    auto it = mConnectionToCookie.find(connectionId);
    if (it != mConnectionToCookie.end()) {
        void *cookie = it->second;
        mConnectionToCookie.erase(it);
        auto cit = mCookieToConnections.find(cookie);
        if (cit != mCookieToConnections.end()) {
            cit->second.erase(connectionId);
            if (cit->second.size() == 0) {
                mCookieToConnections.erase(cit);
            }
        }
    }
}

void ConnectionDeathRecipient::addCookieToConnection(
        void *cookie,
        int64_t connectionId) {
    std::lock_guard<std::mutex> lock(mLock);
    if (mAccessors.find(connectionId) == mAccessors.end()) {
        return;
    }
    mConnectionToCookie.insert(std::make_pair(connectionId, cookie));
    auto it = mCookieToConnections.find(cookie);
    if (it != mCookieToConnections.end()) {
        it->second.insert(connectionId);
    } else {
        mCookieToConnections.insert(std::make_pair(
                cookie, std::set<int64_t>{connectionId}));
    }
}

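// Called when a remote client process dies. Collects every connection
// registered under the dead binder's cookie while holding mLock, then closes
// them after releasing the lock: Accessor::close() calls back into remove(),
// which takes mLock again and would otherwise deadlock.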
void ConnectionDeathRecipient::onDead(void *cookie) {
    std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
    {
        std::lock_guard<std::mutex> lock(mLock);

        auto it = mCookieToConnections.find(cookie);
        if (it != mCookieToConnections.end()) {
            for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) {
                auto accessorIt = mAccessors.find(*conIt);
                if (accessorIt != mAccessors.end()) {
                    connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second));
                    mAccessors.erase(accessorIt);
                }
                mConnectionToCookie.erase(*conIt);
            }
            mCookieToConnections.erase(it);
        }
    }

    if (connectionsToClose.size() > 0) {
        std::shared_ptr<Accessor> accessor;
        for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
            accessor = it->second.lock();

            if (accessor) {
                accessor->close(it->first);
                ALOGD("connection %lld closed on death", (long long)it->first);
            }
        }
    }
}

AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
    return mDeathRecipient.get();
}

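// IAccessor::connect() binder entry point. Delegates to the internal
// connect() and, on success, fills ConnectionInfo with the new connection,
// its id, the current invalidation message id, and the descriptors of the
// two FMQs (buffer-status messages toward the accessor, invalidation
// messages from it).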
::ndk::ScopedAStatus Accessor::connect(
        const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer,
        ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
    std::shared_ptr<Connection> connection;
    ConnectionId connectionId;
    uint32_t msgId;
    StatusDescriptor statusDesc;
    InvalidationDescriptor invDesc;
    BufferPoolStatus status = connect(
            in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
    if (status == ResultStatus::OK) {
        _aidl_return->connection = connection;
        _aidl_return->connectionId = connectionId;
        _aidl_return->msgId = msgId;
        _aidl_return->toFmqDesc = std::move(statusDesc);
        _aidl_return->fromFmqDesc = std::move(invDesc);
        return ::ndk::ScopedAStatus::ok();
    }
    return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
}

Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
        : mAllocator(allocator), mScheduleEvictTs(0) {}

Accessor::~Accessor() {
}

bool Accessor::isValid() {
    return mBufferPool.isValid();
}

BufferPoolStatus Accessor::flush() {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.flush(ref<Accessor>());
    return ResultStatus::OK;
}

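// Allocates a buffer for the connection: recycles a free buffer from the
// pool when one matches the allocation params, otherwise asks the allocator
// for a new one, then records the connection as the buffer's owner.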
BufferPoolStatus Accessor::allocate(
        ConnectionId connectionId,
        const std::vector<uint8_t> &params,
        BufferId *bufferId, const native_handle_t** handle) {
    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    BufferPoolStatus status = ResultStatus::OK;
    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
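        // No free buffer to recycle; the allocator may block, so drop the
        // pool lock across the allocation call.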
        lock.unlock();
        std::shared_ptr<BufferPoolAllocation> alloc;
        size_t allocSize;
        status = mAllocator->allocate(params, &alloc, &allocSize);
        lock.lock();
        if (status == ResultStatus::OK) {
            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
        }
        ALOGV("create a buffer %d : %u %p",
              status == ResultStatus::OK, *bufferId, *handle);
    }
    if (status == ResultStatus::OK) {
        // TODO: handle ownBuffer failure
        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return status;
}

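// Completes a buffer transfer on the receiver side: validates that the
// transaction is pending for this connection and still in TRANSFER_FROM
// state for the given buffer, then hands out the buffer's native handle.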
BufferPoolStatus Accessor::fetch(
        ConnectionId connectionId, TransactionId transactionId,
        BufferId bufferId, const native_handle_t** handle) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    auto found = mBufferPool.mTransactions.find(transactionId);
    if (found != mBufferPool.mTransactions.end() &&
            contains(&mBufferPool.mPendingTransactions,
                     connectionId, transactionId)) {
        if (found->second->mSenderValidated &&
                found->second->mStatus == BufferStatus::TRANSFER_FROM &&
                found->second->mBufferId == bufferId) {
            found->second->mStatus = BufferStatus::TRANSFER_FETCH;
            auto bufferIt = mBufferPool.mBuffers.find(bufferId);
            if (bufferIt != mBufferPool.mBuffers.end()) {
                mBufferPool.mStats.onBufferFetched();
                *handle = bufferIt->second->handle();
                return ResultStatus::OK;
            }
        }
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return ResultStatus::CRITICAL_ERROR;
}

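// Internal connect: creates a Connection with a fresh ConnectionId, opens a
// status FMQ for it, and shares the invalidation FMQ descriptor. For remote
// (non-local) clients the connection is also registered with the binder
// death recipient so it is cleaned up if the client process dies.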
BufferPoolStatus Accessor::connect(
        const std::shared_ptr<IObserver> &observer, bool local,
        std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
        uint32_t *pMsgId,
        StatusDescriptor* statusDescPtr,
        InvalidationDescriptor* invDescPtr) {
    std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
    BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        if (newConnection) {
            int32_t pid = getpid();
            ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
            status = mBufferPool.mObserver.open(id, statusDescPtr);
            if (status == ResultStatus::OK) {
                newConnection->initialize(ref<Accessor>(), id);
                *connection = newConnection;
                *pConnectionId = id;
                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
                mBufferPool.mConnectionIds.insert(id);
                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
                mBufferPool.mInvalidation.onConnect(id, observer);
                if (sSeqId == kSeqIdMax) {
                    sSeqId = 0;
                } else {
                    ++sSeqId;
                }
            }

        }
        mBufferPool.processStatusMessages();
        mBufferPool.cleanUp();
        scheduleEvictIfNeeded();
    }
    if (!local && status == ResultStatus::OK) {
        std::shared_ptr<Accessor> accessor(ref<Accessor>());
        sConnectionDeathRecipient->add(*pConnectionId, accessor);
    }
    return status;
}

BufferPoolStatus Accessor::close(ConnectionId connectionId) {
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
        mBufferPool.processStatusMessages();
        mBufferPool.handleClose(connectionId);
        mBufferPool.mObserver.close(connectionId);
        mBufferPool.mInvalidation.onClose(connectionId);
        // close() is called only after all work on the connection has
        // finished, so it is safe to evict unused buffers here.
        mBufferPool.cleanUp(true);
        scheduleEvictIfNeeded();
    }
    sConnectionDeathRecipient->remove(connectionId);
    return ResultStatus::OK;
}

void Accessor::cleanUp(bool clearCache) {
    // transaction timeout, buffer caching TTL handling
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.cleanUp(clearCache);
}

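// Drains pending invalidation acks from clients and notifies each connected
// observer of the latest invalidation id, counting clients whose binder call
// fails (typically because the client died).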
void Accessor::handleInvalidateAck() {
    std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
    uint32_t invalidationId;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        mBufferPool.processStatusMessages();
        mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
    }
    // Do not hold the lock while sending invalidations.
    size_t deadClients = 0;
    for (auto it = observers.begin(); it != observers.end(); ++it) {
        const std::shared_ptr<IObserver> observer = it->second;
        if (observer) {
            ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
            if (!status.isOk()) {
                ++deadClients;
            }
        }
    }
    if (deadClients > 0) {
        ALOGD("During invalidation found %zu dead clients", deadClients);
    }
}

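// Body of the process-wide invalidator thread. Polls every registered
// accessor for invalidation acks; since the FMQ offers no efficient wait,
// the sleep between polls backs off (1us up to 10ms), and the backoff resets
// whenever the accessor set empties and the thread parks on the condition
// variable.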
void Accessor::invalidatorThread(
        std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv,
        bool &ready) {
    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
    constexpr useconds_t MAX_SLEEP_US = 10000;
    uint32_t numSpin = 0;
    useconds_t sleepUs = 1;

    while (true) {
        std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
        {
            std::unique_lock<std::mutex> lock(mutex);
            while (!ready) {
                numSpin = 0;
                sleepUs = 1;
                cv.wait(lock);
            }
            copied.insert(accessors.begin(), accessors.end());
        }
        std::list<ConnectionId> erased;
        for (auto it = copied.begin(); it != copied.end(); ++it) {
            const std::shared_ptr<Accessor> acc = it->second.lock();
            if (!acc) {
                erased.push_back(it->first);
            } else {
                acc->handleInvalidateAck();
            }
        }
        {
            std::unique_lock<std::mutex> lock(mutex);
            for (auto it = erased.begin(); it != erased.end(); ++it) {
                accessors.erase(*it);
            }
            if (accessors.size() == 0) {
                ready = false;
            } else {
                // N.B. Since there is no efficient way to wait on the FMQ,
                // polling with a backoff sleep is the current way to avoid
                // draining the CPU.
                lock.unlock();
                ++numSpin;
                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
                        sleepUs < MAX_SLEEP_US) {
                    sleepUs *= 10;
                }
                if (numSpin % NUM_SPIN_TO_LOG == 0) {
                    ALOGW("invalidator thread spinning");
                }
                ::usleep(sleepUs);
            }
        }
    }
}

Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
    std::thread invalidator(
            invalidatorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv),
            std::ref(mReady));
    invalidator.detach();
}

void Accessor::AccessorInvalidator::addAccessor(
        uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
    bool notify = false;
    std::unique_lock<std::mutex> lock(mMutex);
    if (mAccessors.find(accessorId) == mAccessors.end()) {
        if (!mReady) {
            mReady = true;
            notify = true;
        }
        mAccessors.emplace(accessorId, accessor);
        ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
    }
    lock.unlock();
    if (notify) {
        mCv.notify_one();
    }
}

void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
    std::lock_guard<std::mutex> lock(mMutex);
    mAccessors.erase(accessorId);
    ALOGV("buffer invalidation deleted bp:%u", accessorId);
    if (mAccessors.size() == 0) {
        mReady = false;
    }
}

std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;

void Accessor::createInvalidator() {
    if (!sInvalidator) {
        sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
    }
}

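// Body of the process-wide evictor thread. Wakes up roughly once per
// kEvictGranularityNs, sleeps on the condition variable while no accessor is
// registered, and evicts (clears the cached buffers of) accessors that have
// not been rescheduled for more than kEvictDurationNs.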
void Accessor::evictorThread(
        std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv) {
    std::list<const std::weak_ptr<Accessor>> evictList;
    while (true) {
        int expired = 0;
        int evicted = 0;
        {
            nsecs_t now = systemTime();
            std::unique_lock<std::mutex> lock(mutex);
            while (accessors.size() == 0) {
                cv.wait(lock);
            }
            auto it = accessors.begin();
            while (it != accessors.end()) {
                if (now > (it->second + kEvictDurationNs)) {
                    ++expired;
                    evictList.push_back(it->first);
                    it = accessors.erase(it);
                } else {
                    ++it;
                }
            }
        }
        // evict idle accessors
        for (auto it = evictList.begin(); it != evictList.end(); ++it) {
            const std::shared_ptr<Accessor> accessor = it->lock();
            if (accessor) {
                accessor->cleanUp(true);
                ++evicted;
            }
        }
        if (expired > 0) {
            ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
        }
        evictList.clear();
        ::usleep(kEvictGranularityNs / 1000);
    }
}

Accessor::AccessorEvictor::AccessorEvictor() {
    std::thread evictor(
            evictorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv));
    evictor.detach();
}

void Accessor::AccessorEvictor::addAccessor(
        const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
    std::lock_guard<std::mutex> lock(mMutex);
    bool notify = mAccessors.empty();
    auto it = mAccessors.find(accessor);
    if (it == mAccessors.end()) {
        mAccessors.emplace(accessor, ts);
    } else {
        it->second = ts;
    }
    if (notify) {
        mCv.notify_one();
    }
}

std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;

void Accessor::createEvictor() {
    if (!sEvictor) {
        sEvictor = std::make_unique<Accessor::AccessorEvictor>();
    }
}

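// Re-registers this accessor with the evictor at most once per
// kEvictGranularityNs; the refreshed timestamp keeps actively used accessors
// from being evicted.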
void Accessor::scheduleEvictIfNeeded() {
    nsecs_t now = systemTime();

    if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
        mScheduleEvictTs = now;
        sEvictor->addAccessor(ref<Accessor>(), now);
    }
}

}  // namespace aidl::android::hardware::media::bufferpool2::implementation