AIDL BufferPool implementation (HIDL -> AIDL)
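
Port the default BufferPool Accessor from HIDL
(android.hardware.media.bufferpool@2.0) to the NDK AIDL backend
(aidl::android::hardware::media::bufferpool2). HIDL Return<>/callback
methods become ndk::ScopedAStatus with out parameters, sp<>/wp<> become
std::shared_ptr/std::weak_ptr, hidl_death_recipient becomes
AIBinder_DeathRecipient, and the former Accessor::Impl logic is folded
directly into Accessor.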

Bug: 254050250
Test: m
Change-Id: I0d7bae2c01bd480d1e99f4b39c4a9013a2828897
diff --git a/media/bufferpool/aidl/default/Accessor.cpp b/media/bufferpool/aidl/default/Accessor.cpp
index e05b12a..3d206ac 100644
--- a/media/bufferpool/aidl/default/Accessor.cpp
+++ b/media/bufferpool/aidl/default/Accessor.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2018 The Android Open Source Project
+ * Copyright (C) 2022 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,22 +13,60 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#define LOG_TAG "BufferPoolConnection"
+#define LOG_TAG "AidlBufferPoolAcc"
+//#define LOG_NDEBUG 0
+
+#include <sys/types.h>
+#include <stdint.h>
+#include <time.h>
+#include <unistd.h>
+#include <utils/Log.h>
+#include <thread>
 
 #include "Accessor.h"
-#include "AccessorImpl.h"
 #include "Connection.h"
+#include "DataHelper.h"
 
-namespace android {
-namespace hardware {
-namespace media {
-namespace bufferpool {
-namespace V2_0 {
-namespace implementation {
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+namespace {
+    static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
+    static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
+}
+
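+// A ConnectionId is (pid << 32) | seqId. The sequence counter is seeded
+// from the current time and wraps at 31 bits; in the VNDK variant of the
+// library the top bit of the low word is set, so ids minted by the system
+// and vendor copies of this library in one process cannot collide.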
+#ifdef __ANDROID_VNDK__
+static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
+#else
+static constexpr uint32_t kSeqIdVndkBit = 0;
+#endif
+
+static constexpr uint32_t kSeqIdMax = 0x7fffffff;
+uint32_t Accessor::sSeqId = time(nullptr) & kSeqIdMax;
+
+namespace {
+// anonymous namespace
+static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
+    std::make_shared<ConnectionDeathRecipient>();
+
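+// C-style trampoline registered via AIBinder_DeathRecipient_new; forwards
+// the cookie to the process-wide ConnectionDeathRecipient singleton.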
+void serviceDied(void *cookie) {
+    if (sConnectionDeathRecipient) {
+        sConnectionDeathRecipient->onDead(cookie);
+    }
+}
+}
+
+std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
+    return sConnectionDeathRecipient;
+}
+
+ConnectionDeathRecipient::ConnectionDeathRecipient() {
+    mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
+            AIBinder_DeathRecipient_new(serviceDied));
+}
 
 void ConnectionDeathRecipient::add(
         int64_t connectionId,
-        const sp<Accessor> &accessor) {
+        const std::shared_ptr<Accessor> &accessor) {
     std::lock_guard<std::mutex> lock(mLock);
     if (mAccessors.find(connectionId) == mAccessors.end()) {
         mAccessors.insert(std::make_pair(connectionId, accessor));
@@ -40,7 +78,7 @@
     mAccessors.erase(connectionId);
     auto it = mConnectionToCookie.find(connectionId);
     if (it != mConnectionToCookie.end()) {
-        uint64_t cookie = it->second;
+        void *cookie = it->second;
         mConnectionToCookie.erase(it);
         auto cit = mCookieToConnections.find(cookie);
         if (cit != mCookieToConnections.end()) {
@@ -53,7 +91,7 @@
 }
 
 void ConnectionDeathRecipient::addCookieToConnection(
-        uint64_t cookie,
+        void *cookie,
         int64_t connectionId) {
     std::lock_guard<std::mutex> lock(mLock);
     if (mAccessors.find(connectionId) == mAccessors.end()) {
@@ -69,11 +107,8 @@
     }
 }
 
-void ConnectionDeathRecipient::serviceDied(
-        uint64_t cookie,
-        const wp<::android::hidl::base::V1_0::IBase>& /* who */
-        ) {
-    std::map<int64_t, const wp<Accessor>> connectionsToClose;
+void ConnectionDeathRecipient::onDead(void *cookie) {
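+    // A remote client died: gather every connection registered under the
+    // dead binder's cookie while holding mLock, then close them after
+    // releasing it.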
+    std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
     {
         std::lock_guard<std::mutex> lock(mLock);
 
@@ -92,9 +127,9 @@
     }
 
     if (connectionsToClose.size() > 0) {
-        sp<Accessor> accessor;
+        std::shared_ptr<Accessor> accessor;
         for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
-            accessor = it->second.promote();
+            accessor = it->second.lock();
 
             if (accessor) {
                 accessor->close(it->first);
@@ -104,127 +139,371 @@
     }
 }
 
-namespace {
-static sp<ConnectionDeathRecipient> sConnectionDeathRecipient =
-        new ConnectionDeathRecipient();
+AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
+    return mDeathRecipient.get();
 }
 
-sp<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
-    return sConnectionDeathRecipient;
-}
-
-void Accessor::createInvalidator() {
-    Accessor::Impl::createInvalidator();
-}
-
-void Accessor::createEvictor() {
-    Accessor::Impl::createEvictor();
-}
-
-// Methods from ::android::hardware::media::bufferpool::V2_0::IAccessor follow.
-Return<void> Accessor::connect(
-        const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer,
-        connect_cb _hidl_cb) {
-    sp<Connection> connection;
+::ndk::ScopedAStatus Accessor::connect(
+        const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer,
+        ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
+    std::shared_ptr<Connection> connection;
     ConnectionId connectionId;
     uint32_t msgId;
-    const StatusDescriptor* fmqDesc;
-    const InvalidationDescriptor* invDesc;
-
-    ResultStatus status = connect(
-            observer, false, &connection, &connectionId, &msgId, &fmqDesc, &invDesc);
+    StatusDescriptor statusDesc;
+    InvalidationDescriptor invDesc;
+    BufferPoolStatus status = connect(
+            in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
     if (status == ResultStatus::OK) {
-        _hidl_cb(status, connection, connectionId, msgId, *fmqDesc, *invDesc);
-    } else {
-        _hidl_cb(status, nullptr, -1LL, 0,
-                 android::hardware::MQDescriptorSync<BufferStatusMessage>(
-                         std::vector<android::hardware::GrantorDescriptor>(),
-                         nullptr /* nhandle */, 0 /* size */),
-                 android::hardware::MQDescriptorUnsync<BufferInvalidationMessage>(
-                         std::vector<android::hardware::GrantorDescriptor>(),
-                         nullptr /* nhandle */, 0 /* size */));
+        _aidl_return->connection = connection;
+        _aidl_return->connectionId = connectionId;
+        _aidl_return->msgId = msgId;
+        _aidl_return->toFmqDesc = std::move(statusDesc);
+        _aidl_return->fromFmqDesc = std::move(invDesc);
+        return ::ndk::ScopedAStatus::ok();
     }
-    return Void();
+    return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
 }
 
 Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
-    : mImpl(new Impl(allocator)) {}
+    : mAllocator(allocator), mScheduleEvictTs(0) {}
 
 Accessor::~Accessor() {
 }
 
 bool Accessor::isValid() {
-    return (bool)mImpl && mImpl->isValid();
+    return mBufferPool.isValid();
 }
 
-ResultStatus Accessor::flush() {
-    if (mImpl) {
-        mImpl->flush();
-        return ResultStatus::OK;
-    }
-    return ResultStatus::CRITICAL_ERROR;
+BufferPoolStatus Accessor::flush() {
+    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    mBufferPool.flush(ref<Accessor>());
+    return ResultStatus::OK;
 }
 
-ResultStatus Accessor::allocate(
+BufferPoolStatus Accessor::allocate(
         ConnectionId connectionId,
         const std::vector<uint8_t> &params,
         BufferId *bufferId, const native_handle_t** handle) {
-    if (mImpl) {
-        return mImpl->allocate(connectionId, params, bufferId, handle);
+    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    BufferPoolStatus status = ResultStatus::OK;
+    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
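+        // No cached free buffer matches; allocate a new one. The pool lock
+        // is dropped across the allocator call so other connections can
+        // make progress while allocation runs.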
+        lock.unlock();
+        std::shared_ptr<BufferPoolAllocation> alloc;
+        size_t allocSize;
+        status = mAllocator->allocate(params, &alloc, &allocSize);
+        lock.lock();
+        if (status == ResultStatus::OK) {
+            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
+        }
+        ALOGV("create a buffer %d : %u %p",
+              status == ResultStatus::OK, *bufferId, *handle);
     }
-    return ResultStatus::CRITICAL_ERROR;
+    if (status == ResultStatus::OK) {
+        // TODO: handle ownBuffer failure
+        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
+    }
+    mBufferPool.cleanUp();
+    scheduleEvictIfNeeded();
+    return status;
 }
 
-ResultStatus Accessor::fetch(
+BufferPoolStatus Accessor::fetch(
         ConnectionId connectionId, TransactionId transactionId,
         BufferId bufferId, const native_handle_t** handle) {
-    if (mImpl) {
-        return mImpl->fetch(connectionId, transactionId, bufferId, handle);
-    }
-    return ResultStatus::CRITICAL_ERROR;
-}
-
-ResultStatus Accessor::connect(
-        const sp<IObserver> &observer, bool local,
-        sp<Connection> *connection, ConnectionId *pConnectionId,
-        uint32_t *pMsgId,
-        const StatusDescriptor** statusDescPtr,
-        const InvalidationDescriptor** invDescPtr) {
-    if (mImpl) {
-        ResultStatus status = mImpl->connect(
-                this, observer, connection, pConnectionId, pMsgId,
-                statusDescPtr, invDescPtr);
-        if (!local && status == ResultStatus::OK) {
-            sp<Accessor> accessor(this);
-            sConnectionDeathRecipient->add(*pConnectionId, accessor);
+    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
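+    // Honor the fetch only for a pending transaction that has been
+    // validated on the sender side, is in TRANSFER_FROM state, and names
+    // the same bufferId.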
+    auto found = mBufferPool.mTransactions.find(transactionId);
+    if (found != mBufferPool.mTransactions.end() &&
+            contains(&mBufferPool.mPendingTransactions,
+                     connectionId, transactionId)) {
+        if (found->second->mSenderValidated &&
+                found->second->mStatus == BufferStatus::TRANSFER_FROM &&
+                found->second->mBufferId == bufferId) {
+            found->second->mStatus = BufferStatus::TRANSFER_FETCH;
+            auto bufferIt = mBufferPool.mBuffers.find(bufferId);
+            if (bufferIt != mBufferPool.mBuffers.end()) {
+                mBufferPool.mStats.onBufferFetched();
+                *handle = bufferIt->second->handle();
+                return ResultStatus::OK;
+            }
         }
-        return status;
     }
+    mBufferPool.cleanUp();
+    scheduleEvictIfNeeded();
     return ResultStatus::CRITICAL_ERROR;
 }
 
-ResultStatus Accessor::close(ConnectionId connectionId) {
-    if (mImpl) {
-        ResultStatus status = mImpl->close(connectionId);
-        sConnectionDeathRecipient->remove(connectionId);
-        return status;
+BufferPoolStatus Accessor::connect(
+        const std::shared_ptr<IObserver> &observer, bool local,
+        std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
+        uint32_t *pMsgId,
+        StatusDescriptor* statusDescPtr,
+        InvalidationDescriptor* invDescPtr) {
+    std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
+    BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
+    {
+        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+        if (newConnection) {
+            int32_t pid = getpid();
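+            // Compose a process-unique ConnectionId: pid in the upper
+            // 32 bits, the wrapping sequence counter (plus the VNDK bit)
+            // in the lower 32.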
+            ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
+            status = mBufferPool.mObserver.open(id, statusDescPtr);
+            if (status == ResultStatus::OK) {
+                newConnection->initialize(ref<Accessor>(), id);
+                *connection = newConnection;
+                *pConnectionId = id;
+                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+                mBufferPool.mConnectionIds.insert(id);
+                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
+                mBufferPool.mInvalidation.onConnect(id, observer);
+                if (sSeqId == kSeqIdMax) {
+                    sSeqId = 0;
+                } else {
+                    ++sSeqId;
+                }
+            }
+        }
+        mBufferPool.processStatusMessages();
+        mBufferPool.cleanUp();
+        scheduleEvictIfNeeded();
     }
-    return ResultStatus::CRITICAL_ERROR;
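+    // Only remote (cross-process) connections are registered for binder
+    // death notification.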
+    if (!local && status == ResultStatus::OK) {
+        std::shared_ptr<Accessor> accessor(ref<Accessor>());
+        sConnectionDeathRecipient->add(*pConnectionId, accessor);
+    }
+    return status;
+}
+
+BufferPoolStatus Accessor::close(ConnectionId connectionId) {
+    {
+        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+        ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
+        mBufferPool.processStatusMessages();
+        mBufferPool.handleClose(connectionId);
+        mBufferPool.mObserver.close(connectionId);
+        mBufferPool.mInvalidation.onClose(connectionId);
+        // Since close() is called only after all work on this connection
+        // has finished, it is safe to evict unused buffers now.
+        mBufferPool.cleanUp(true);
+        scheduleEvictIfNeeded();
+    }
+    sConnectionDeathRecipient->remove(connectionId);
+    return ResultStatus::OK;
 }
 
 void Accessor::cleanUp(bool clearCache) {
-    if (mImpl) {
-        mImpl->cleanUp(clearCache);
+    // transaction timeout, buffer caching TTL handling
+    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    mBufferPool.cleanUp(clearCache);
+}
+
+void Accessor::handleInvalidateAck() {
+    std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
+    uint32_t invalidationId;
+    {
+        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+        mBufferPool.processStatusMessages();
+        mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
+    }
+    // Do not hold the lock while sending invalidations.
+    size_t deadClients = 0;
+    for (auto it = observers.begin(); it != observers.end(); ++it) {
+        const std::shared_ptr<IObserver> observer = it->second;
+        if (observer) {
+            ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
+            if (!status.isOk()) {
+                ++deadClients;
+            }
+        }
+    }
+    if (deadClients > 0) {
+        ALOGD("During invalidation found %zu dead clients", deadClients);
     }
 }
 
-//IAccessor* HIDL_FETCH_IAccessor(const char* /* name */) {
-//    return new Accessor();
-//}
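+// Process-wide worker thread: polls every registered Accessor for buffer
+// invalidation acks and prunes accessors that have been destroyed.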
+void Accessor::invalidatorThread(
+            std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
+            std::mutex &mutex,
+            std::condition_variable &cv,
+            bool &ready) {
+    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
+    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
+    constexpr useconds_t MAX_SLEEP_US = 10000;
+    uint32_t numSpin = 0;
+    useconds_t sleepUs = 1;
 
-}  // namespace implementation
-}  // namespace V2_0
-}  // namespace bufferpool
-}  // namespace media
-}  // namespace hardware
-}  // namespace android
+    while (true) {
+        std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            while (!ready) {
+                numSpin = 0;
+                sleepUs = 1;
+                cv.wait(lock);
+            }
+            copied.insert(accessors.begin(), accessors.end());
+        }
+        std::list<ConnectionId> erased;
+        for (auto it = copied.begin(); it != copied.end(); ++it) {
+            const std::shared_ptr<Accessor> acc = it->second.lock();
+            if (!acc) {
+                erased.push_back(it->first);
+            } else {
+                acc->handleInvalidateAck();
+            }
+        }
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            for (auto it = erased.begin(); it != erased.end(); ++it) {
+                accessors.erase(*it);
+            }
+            if (accessors.size() == 0) {
+                ready = false;
+            } else {
+                // N.B. There is no efficient way to wait on an FMQ, so the
+                // queue is polled; the backoff below keeps the polling from
+                // burning CPU.
+                lock.unlock();
+                ++numSpin;
+                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
+                    sleepUs < MAX_SLEEP_US) {
+                    sleepUs *= 10;
+                }
+                if (numSpin % NUM_SPIN_TO_LOG == 0) {
+                    ALOGW("invalidator thread spinning");
+                }
+                ::usleep(sleepUs);
+            }
+        }
+    }
+}
+
+Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
+    std::thread invalidator(
+            invalidatorThread,
+            std::ref(mAccessors),
+            std::ref(mMutex),
+            std::ref(mCv),
+            std::ref(mReady));
+    invalidator.detach();
+}
+
+void Accessor::AccessorInvalidator::addAccessor(
+        uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
+    bool notify = false;
+    std::unique_lock<std::mutex> lock(mMutex);
+    if (mAccessors.find(accessorId) == mAccessors.end()) {
+        if (!mReady) {
+            mReady = true;
+            notify = true;
+        }
+        mAccessors.emplace(accessorId, accessor);
+        ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
+    }
+    lock.unlock();
+    if (notify) {
+        mCv.notify_one();
+    }
+}
+
+void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
+    std::lock_guard<std::mutex> lock(mMutex);
+    mAccessors.erase(accessorId);
+    ALOGV("buffer invalidation deleted bp:%u", accessorId);
+    if (mAccessors.size() == 0) {
+        mReady = false;
+    }
+}
+
+std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;
+
+void Accessor::createInvalidator() {
+    if (!sInvalidator) {
+        sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
+    }
+}
+
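+// Process-wide worker thread: wakes roughly once per kEvictGranularityNs
+// and evicts cached buffers (cleanUp(true)) from accessors that have seen
+// no activity for kEvictDurationNs.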
+void Accessor::evictorThread(
+        std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
+        std::mutex &mutex,
+        std::condition_variable &cv) {
+    std::list<const std::weak_ptr<Accessor>> evictList;
+    while (true) {
+        int expired = 0;
+        int evicted = 0;
+        {
+            nsecs_t now = systemTime();
+            std::unique_lock<std::mutex> lock(mutex);
+            while (accessors.size() == 0) {
+                cv.wait(lock);
+            }
+            auto it = accessors.begin();
+            while (it != accessors.end()) {
+                if (now > (it->second + kEvictDurationNs)) {
+                    ++expired;
+                    evictList.push_back(it->first);
+                    it = accessors.erase(it);
+                } else {
+                    ++it;
+                }
+            }
+        }
+        // Evict idle accessors.
+        for (auto it = evictList.begin(); it != evictList.end(); ++it) {
+            const std::shared_ptr<Accessor> accessor = it->lock();
+            if (accessor) {
+                accessor->cleanUp(true);
+                ++evicted;
+            }
+        }
+        if (expired > 0) {
+            ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
+        }
+        evictList.clear();
+        ::usleep(kEvictGranularityNs / 1000);
+    }
+}
+
+Accessor::AccessorEvictor::AccessorEvictor() {
+    std::thread evictor(
+            evictorThread,
+            std::ref(mAccessors),
+            std::ref(mMutex),
+            std::ref(mCv));
+    evictor.detach();
+}
+
+void Accessor::AccessorEvictor::addAccessor(
+        const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
+    std::lock_guard<std::mutex> lock(mMutex);
+    bool notify = mAccessors.empty();
+    auto it = mAccessors.find(accessor);
+    if (it == mAccessors.end()) {
+        mAccessors.emplace(accessor, ts);
+    } else {
+        it->second = ts;
+    }
+    if (notify) {
+        mCv.notify_one();
+    }
+}
+
+std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;
+
+void Accessor::createEvictor() {
+    if (!sEvictor) {
+        sEvictor = std::make_unique<Accessor::AccessorEvictor>();
+    }
+}
+
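+// Refresh this accessor's eviction timestamp, rate-limited to once per
+// kEvictGranularityNs.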
+void Accessor::scheduleEvictIfNeeded() {
+    nsecs_t now = systemTime();
+
+    if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
+        mScheduleEvictTs = now;
+        sEvictor->addAccessor(ref<Accessor>(), now);
+    }
+}
+
+}  // namespace aidl::android::hardware::media::bufferpool2::implementation