AIDL BufferPool implementation (HIDL -> AIDL)
Bug: 254050250
Test: m
Change-Id: I0d7bae2c01bd480d1e99f4b39c4a9013a2828897
diff --git a/media/bufferpool/aidl/default/Accessor.cpp b/media/bufferpool/aidl/default/Accessor.cpp
index e05b12a..3d206ac 100644
--- a/media/bufferpool/aidl/default/Accessor.cpp
+++ b/media/bufferpool/aidl/default/Accessor.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2018 The Android Open Source Project
+ * Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,22 +13,60 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#define LOG_TAG "BufferPoolConnection"
+#define LOG_TAG "AidlBufferPoolAcc"
+//#define LOG_NDEBUG 0
+
+#include <sys/types.h>
+#include <stdint.h>
+#include <time.h>
+#include <unistd.h>
+#include <utils/Log.h>
+#include <thread>
#include "Accessor.h"
-#include "AccessorImpl.h"
#include "Connection.h"
+#include "DataHelper.h"
-namespace android {
-namespace hardware {
-namespace media {
-namespace bufferpool {
-namespace V2_0 {
-namespace implementation {
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+namespace {
+ static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
+ static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
+}
+
+#ifdef __ANDROID_VNDK__
+static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
+#else
+static constexpr uint32_t kSeqIdVndkBit = 0;
+#endif
+
+static constexpr uint32_t kSeqIdMax = 0x7fffffff;
+uint32_t Accessor::sSeqId = time(nullptr) & kSeqIdMax;
+
+namespace {
+// anonymous namespace
+static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
+ std::make_shared<ConnectionDeathRecipient>();
+
+void serviceDied(void *cookie) {
+ if (sConnectionDeathRecipient) {
+ sConnectionDeathRecipient->onDead(cookie);
+ }
+}
+}
+
+std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
+ return sConnectionDeathRecipient;
+}
+
+ConnectionDeathRecipient::ConnectionDeathRecipient() {
+ mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
+ AIBinder_DeathRecipient_new(serviceDied));
+}
void ConnectionDeathRecipient::add(
int64_t connectionId,
- const sp<Accessor> &accessor) {
+ const std::shared_ptr<Accessor> &accessor) {
std::lock_guard<std::mutex> lock(mLock);
if (mAccessors.find(connectionId) == mAccessors.end()) {
mAccessors.insert(std::make_pair(connectionId, accessor));
@@ -40,7 +78,7 @@
mAccessors.erase(connectionId);
auto it = mConnectionToCookie.find(connectionId);
if (it != mConnectionToCookie.end()) {
- uint64_t cookie = it->second;
+ void * cookie = it->second;
mConnectionToCookie.erase(it);
auto cit = mCookieToConnections.find(cookie);
if (cit != mCookieToConnections.end()) {
@@ -53,7 +91,7 @@
}
void ConnectionDeathRecipient::addCookieToConnection(
- uint64_t cookie,
+ void *cookie,
int64_t connectionId) {
std::lock_guard<std::mutex> lock(mLock);
if (mAccessors.find(connectionId) == mAccessors.end()) {
@@ -69,11 +107,8 @@
}
}
-void ConnectionDeathRecipient::serviceDied(
- uint64_t cookie,
- const wp<::android::hidl::base::V1_0::IBase>& /* who */
- ) {
- std::map<int64_t, const wp<Accessor>> connectionsToClose;
+void ConnectionDeathRecipient::onDead(void *cookie) {
+ std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
{
std::lock_guard<std::mutex> lock(mLock);
@@ -92,9 +127,9 @@
}
if (connectionsToClose.size() > 0) {
- sp<Accessor> accessor;
+ std::shared_ptr<Accessor> accessor;
for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
- accessor = it->second.promote();
+ accessor = it->second.lock();
if (accessor) {
accessor->close(it->first);
@@ -104,127 +139,371 @@
}
}
-namespace {
-static sp<ConnectionDeathRecipient> sConnectionDeathRecipient =
- new ConnectionDeathRecipient();
+AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
+ return mDeathRecipient.get();
}
-sp<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
- return sConnectionDeathRecipient;
-}
-
-void Accessor::createInvalidator() {
- Accessor::Impl::createInvalidator();
-}
-
-void Accessor::createEvictor() {
- Accessor::Impl::createEvictor();
-}
-
-// Methods from ::android::hardware::media::bufferpool::V2_0::IAccessor follow.
-Return<void> Accessor::connect(
- const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer,
- connect_cb _hidl_cb) {
- sp<Connection> connection;
+::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
+ std::shared_ptr<Connection> connection;
ConnectionId connectionId;
uint32_t msgId;
- const StatusDescriptor* fmqDesc;
- const InvalidationDescriptor* invDesc;
-
- ResultStatus status = connect(
- observer, false, &connection, &connectionId, &msgId, &fmqDesc, &invDesc);
+ StatusDescriptor statusDesc;
+ InvalidationDescriptor invDesc;
+ BufferPoolStatus status = connect(
+ in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
if (status == ResultStatus::OK) {
- _hidl_cb(status, connection, connectionId, msgId, *fmqDesc, *invDesc);
- } else {
- _hidl_cb(status, nullptr, -1LL, 0,
- android::hardware::MQDescriptorSync<BufferStatusMessage>(
- std::vector<android::hardware::GrantorDescriptor>(),
- nullptr /* nhandle */, 0 /* size */),
- android::hardware::MQDescriptorUnsync<BufferInvalidationMessage>(
- std::vector<android::hardware::GrantorDescriptor>(),
- nullptr /* nhandle */, 0 /* size */));
+ _aidl_return->connection = connection;
+ _aidl_return->connectionId = connectionId;
+ _aidl_return->msgId = msgId;
+ _aidl_return->toFmqDesc = std::move(statusDesc);
+ _aidl_return->fromFmqDesc = std::move(invDesc);
+ return ::ndk::ScopedAStatus::ok();
}
- return Void();
+ return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
}
Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
- : mImpl(new Impl(allocator)) {}
+ : mAllocator(allocator), mScheduleEvictTs(0) {}
Accessor::~Accessor() {
}
bool Accessor::isValid() {
- return (bool)mImpl && mImpl->isValid();
+ return mBufferPool.isValid();
}
-ResultStatus Accessor::flush() {
- if (mImpl) {
- mImpl->flush();
- return ResultStatus::OK;
- }
- return ResultStatus::CRITICAL_ERROR;
+BufferPoolStatus Accessor::flush() {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ mBufferPool.flush(ref<Accessor>());
+ return ResultStatus::OK;
}
-ResultStatus Accessor::allocate(
+BufferPoolStatus Accessor::allocate(
ConnectionId connectionId,
const std::vector<uint8_t> ¶ms,
BufferId *bufferId, const native_handle_t** handle) {
- if (mImpl) {
- return mImpl->allocate(connectionId, params, bufferId, handle);
+ std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ BufferPoolStatus status = ResultStatus::OK;
+ if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
+ lock.unlock();
+ std::shared_ptr<BufferPoolAllocation> alloc;
+ size_t allocSize;
+ status = mAllocator->allocate(params, &alloc, &allocSize);
+ lock.lock();
+ if (status == ResultStatus::OK) {
+ status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
+ }
+ ALOGV("create a buffer %d : %u %p",
+ status == ResultStatus::OK, *bufferId, *handle);
}
- return ResultStatus::CRITICAL_ERROR;
+ if (status == ResultStatus::OK) {
+ // TODO: handle ownBuffer failure
+ mBufferPool.handleOwnBuffer(connectionId, *bufferId);
+ }
+ mBufferPool.cleanUp();
+ scheduleEvictIfNeeded();
+ return status;
}
-ResultStatus Accessor::fetch(
+BufferPoolStatus Accessor::fetch(
ConnectionId connectionId, TransactionId transactionId,
BufferId bufferId, const native_handle_t** handle) {
- if (mImpl) {
- return mImpl->fetch(connectionId, transactionId, bufferId, handle);
- }
- return ResultStatus::CRITICAL_ERROR;
-}
-
-ResultStatus Accessor::connect(
- const sp<IObserver> &observer, bool local,
- sp<Connection> *connection, ConnectionId *pConnectionId,
- uint32_t *pMsgId,
- const StatusDescriptor** statusDescPtr,
- const InvalidationDescriptor** invDescPtr) {
- if (mImpl) {
- ResultStatus status = mImpl->connect(
- this, observer, connection, pConnectionId, pMsgId,
- statusDescPtr, invDescPtr);
- if (!local && status == ResultStatus::OK) {
- sp<Accessor> accessor(this);
- sConnectionDeathRecipient->add(*pConnectionId, accessor);
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ auto found = mBufferPool.mTransactions.find(transactionId);
+ if (found != mBufferPool.mTransactions.end() &&
+ contains(&mBufferPool.mPendingTransactions,
+ connectionId, transactionId)) {
+ if (found->second->mSenderValidated &&
+ found->second->mStatus == BufferStatus::TRANSFER_FROM &&
+ found->second->mBufferId == bufferId) {
+ found->second->mStatus = BufferStatus::TRANSFER_FETCH;
+ auto bufferIt = mBufferPool.mBuffers.find(bufferId);
+ if (bufferIt != mBufferPool.mBuffers.end()) {
+ mBufferPool.mStats.onBufferFetched();
+ *handle = bufferIt->second->handle();
+ return ResultStatus::OK;
+ }
}
- return status;
}
+ mBufferPool.cleanUp();
+ scheduleEvictIfNeeded();
return ResultStatus::CRITICAL_ERROR;
}
-ResultStatus Accessor::close(ConnectionId connectionId) {
- if (mImpl) {
- ResultStatus status = mImpl->close(connectionId);
- sConnectionDeathRecipient->remove(connectionId);
- return status;
+BufferPoolStatus Accessor::connect(
+ const std::shared_ptr<IObserver> &observer, bool local,
+ std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
+ uint32_t *pMsgId,
+ StatusDescriptor* statusDescPtr,
+ InvalidationDescriptor* invDescPtr) {
+ std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
+ BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
+ {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ if (newConnection) {
+ int32_t pid = getpid();
+ ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
+ status = mBufferPool.mObserver.open(id, statusDescPtr);
+ if (status == ResultStatus::OK) {
+ newConnection->initialize(ref<Accessor>(), id);
+ *connection = newConnection;
+ *pConnectionId = id;
+ *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+ mBufferPool.mConnectionIds.insert(id);
+ mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
+ mBufferPool.mInvalidation.onConnect(id, observer);
+ if (sSeqId == kSeqIdMax) {
+ sSeqId = 0;
+ } else {
+ ++sSeqId;
+ }
+ }
+
+ }
+ mBufferPool.processStatusMessages();
+ mBufferPool.cleanUp();
+ scheduleEvictIfNeeded();
}
- return ResultStatus::CRITICAL_ERROR;
+ if (!local && status == ResultStatus::OK) {
+ std::shared_ptr<Accessor> accessor(ref<Accessor>());
+ sConnectionDeathRecipient->add(*pConnectionId, accessor);
+ }
+ return status;
+}
+
+BufferPoolStatus Accessor::close(ConnectionId connectionId) {
+ {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
+ mBufferPool.processStatusMessages();
+ mBufferPool.handleClose(connectionId);
+ mBufferPool.mObserver.close(connectionId);
+ mBufferPool.mInvalidation.onClose(connectionId);
+ // Since close# will be called after all works are finished, it is OK to
+ // evict unused buffers.
+ mBufferPool.cleanUp(true);
+ scheduleEvictIfNeeded();
+ }
+ sConnectionDeathRecipient->remove(connectionId);
+ return ResultStatus::OK;
}
void Accessor::cleanUp(bool clearCache) {
- if (mImpl) {
- mImpl->cleanUp(clearCache);
+ // transaction timeout, buffer caching TTL handling
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ mBufferPool.cleanUp(clearCache);
+}
+
+void Accessor::handleInvalidateAck() {
+ std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
+ uint32_t invalidationId;
+ {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
+ }
+ // Do not hold lock for send invalidations
+ size_t deadClients = 0;
+ for (auto it = observers.begin(); it != observers.end(); ++it) {
+ const std::shared_ptr<IObserver> observer = it->second;
+ if (observer) {
+ ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
+ if (!status.isOk()) {
+ ++deadClients;
+ }
+ }
+ }
+ if (deadClients > 0) {
+ ALOGD("During invalidation found %zu dead clients", deadClients);
}
}
-//IAccessor* HIDL_FETCH_IAccessor(const char* /* name */) {
-// return new Accessor();
-//}
+void Accessor::invalidatorThread(
+ std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
+ std::mutex &mutex,
+ std::condition_variable &cv,
+ bool &ready) {
+ constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
+ constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
+ constexpr useconds_t MAX_SLEEP_US = 10000;
+ uint32_t numSpin = 0;
+ useconds_t sleepUs = 1;
-} // namespace implementation
-} // namespace V2_0
-} // namespace bufferpool
-} // namespace media
-} // namespace hardware
-} // namespace android
+ while(true) {
+ std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ while (!ready) {
+ numSpin = 0;
+ sleepUs = 1;
+ cv.wait(lock);
+ }
+ copied.insert(accessors.begin(), accessors.end());
+ }
+ std::list<ConnectionId> erased;
+ for (auto it = copied.begin(); it != copied.end(); ++it) {
+ const std::shared_ptr<Accessor> acc = it->second.lock();
+ if (!acc) {
+ erased.push_back(it->first);
+ } else {
+ acc->handleInvalidateAck();
+ }
+ }
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ for (auto it = erased.begin(); it != erased.end(); ++it) {
+ accessors.erase(*it);
+ }
+ if (accessors.size() == 0) {
+ ready = false;
+ } else {
+ // N.B. Since there is not a efficient way to wait over FMQ,
+ // polling over the FMQ is the current way to prevent draining
+ // CPU.
+ lock.unlock();
+ ++numSpin;
+ if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
+ sleepUs < MAX_SLEEP_US) {
+ sleepUs *= 10;
+ }
+ if (numSpin % NUM_SPIN_TO_LOG == 0) {
+ ALOGW("invalidator thread spinning");
+ }
+ ::usleep(sleepUs);
+ }
+ }
+ }
+}
+
+Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
+ std::thread invalidator(
+ invalidatorThread,
+ std::ref(mAccessors),
+ std::ref(mMutex),
+ std::ref(mCv),
+ std::ref(mReady));
+ invalidator.detach();
+}
+
+void Accessor::AccessorInvalidator::addAccessor(
+ uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
+ bool notify = false;
+ std::unique_lock<std::mutex> lock(mMutex);
+ if (mAccessors.find(accessorId) == mAccessors.end()) {
+ if (!mReady) {
+ mReady = true;
+ notify = true;
+ }
+ mAccessors.emplace(accessorId, accessor);
+ ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
+ }
+ lock.unlock();
+ if (notify) {
+ mCv.notify_one();
+ }
+}
+
+void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ mAccessors.erase(accessorId);
+ ALOGV("buffer invalidation deleted bp:%u", accessorId);
+ if (mAccessors.size() == 0) {
+ mReady = false;
+ }
+}
+
+std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;
+
+void Accessor::createInvalidator() {
+ if (!sInvalidator) {
+ sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
+ }
+}
+
+void Accessor::evictorThread(
+ std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
+ std::mutex &mutex,
+ std::condition_variable &cv) {
+ std::list<const std::weak_ptr<Accessor>> evictList;
+ while (true) {
+ int expired = 0;
+ int evicted = 0;
+ {
+ nsecs_t now = systemTime();
+ std::unique_lock<std::mutex> lock(mutex);
+ while (accessors.size() == 0) {
+ cv.wait(lock);
+ }
+ auto it = accessors.begin();
+ while (it != accessors.end()) {
+ if (now > (it->second + kEvictDurationNs)) {
+ ++expired;
+ evictList.push_back(it->first);
+ it = accessors.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+ // evict idle accessors;
+ for (auto it = evictList.begin(); it != evictList.end(); ++it) {
+ const std::shared_ptr<Accessor> accessor = it->lock();
+ if (accessor) {
+ accessor->cleanUp(true);
+ ++evicted;
+ }
+ }
+ if (expired > 0) {
+ ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
+ }
+ evictList.clear();
+ ::usleep(kEvictGranularityNs / 1000);
+ }
+}
+
+Accessor::AccessorEvictor::AccessorEvictor() {
+ std::thread evictor(
+ evictorThread,
+ std::ref(mAccessors),
+ std::ref(mMutex),
+ std::ref(mCv));
+ evictor.detach();
+}
+
+void Accessor::AccessorEvictor::addAccessor(
+ const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ bool notify = mAccessors.empty();
+ auto it = mAccessors.find(accessor);
+ if (it == mAccessors.end()) {
+ mAccessors.emplace(accessor, ts);
+ } else {
+ it->second = ts;
+ }
+ if (notify) {
+ mCv.notify_one();
+ }
+}
+
+std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;
+
+void Accessor::createEvictor() {
+ if (!sEvictor) {
+ sEvictor = std::make_unique<Accessor::AccessorEvictor>();
+ }
+}
+
+void Accessor::scheduleEvictIfNeeded() {
+ nsecs_t now = systemTime();
+
+ if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
+ mScheduleEvictTs = now;
+ sEvictor->addAccessor(ref<Accessor>(), now);
+ }
+}
+
+} // namespace aidl::android::hardware::media::bufferpool2::implemntation {