AIDL BufferPool implementation (HIDL -> AIDL)
Bug: 254050250
Test: m
Change-Id: I0d7bae2c01bd480d1e99f4b39c4a9013a2828897
diff --git a/media/bufferpool/aidl/default/BufferPoolClient.cpp b/media/bufferpool/aidl/default/BufferPoolClient.cpp
index cda23ff..e9777d8 100644
--- a/media/bufferpool/aidl/default/BufferPoolClient.cpp
+++ b/media/bufferpool/aidl/default/BufferPoolClient.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2018 The Android Open Source Project
+ * Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,33 +14,37 @@
* limitations under the License.
*/
-#define LOG_TAG "BufferPoolClient"
+#define LOG_TAG "AidlBufferPoolCli"
//#define LOG_NDEBUG 0
#include <thread>
+#include <aidlcommonsupport/NativeHandle.h>
#include <utils/Log.h>
#include "BufferPoolClient.h"
+#include "Accessor.h"
#include "Connection.h"
-namespace android {
-namespace hardware {
-namespace media {
-namespace bufferpool {
-namespace V2_0 {
-namespace implementation {
+namespace aidl::android::hardware::media::bufferpool2::implementation {
-static constexpr int64_t kReceiveTimeoutUs = 2000000; // 2s
+using aidl::android::hardware::media::bufferpool2::IConnection;
+using aidl::android::hardware::media::bufferpool2::ResultStatus;
+using FetchInfo = aidl::android::hardware::media::bufferpool2::IConnection::FetchInfo;
+using FetchResult = aidl::android::hardware::media::bufferpool2::IConnection::FetchResult;
+
+static constexpr int64_t kReceiveTimeoutMs = 2000; // 2s
static constexpr int kPostMaxRetry = 3;
-static constexpr int kCacheTtlUs = 1000000; // TODO: tune
+static constexpr int kCacheTtlMs = 1000;
static constexpr size_t kMaxCachedBufferCount = 64;
static constexpr size_t kCachedBufferCountTarget = kMaxCachedBufferCount - 16;
class BufferPoolClient::Impl
: public std::enable_shared_from_this<BufferPoolClient::Impl> {
public:
- explicit Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer);
+ explicit Impl(const std::shared_ptr<Accessor> &accessor,
+ const std::shared_ptr<IObserver> &observer);
- explicit Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer);
+ explicit Impl(const std::shared_ptr<IAccessor> &accessor,
+ const std::shared_ptr<IObserver> &observer);
bool isValid() {
return mValid;
@@ -54,35 +58,35 @@
return mConnectionId;
}
- sp<IAccessor> &getAccessor() {
+ std::shared_ptr<IAccessor> &getAccessor() {
return mAccessor;
}
- bool isActive(int64_t *lastTransactionUs, bool clearCache);
+ bool isActive(int64_t *lastTransactionMs, bool clearCache);
void receiveInvalidation(uint32_t msgID);
- ResultStatus flush();
+ BufferPoolStatus flush();
- ResultStatus allocate(const std::vector<uint8_t> &params,
+ BufferPoolStatus allocate(const std::vector<uint8_t> &params,
native_handle_t **handle,
std::shared_ptr<BufferPoolData> *buffer);
- ResultStatus receive(
+ BufferPoolStatus receive(
TransactionId transactionId, BufferId bufferId,
- int64_t timestampUs,
+ int64_t timestampMs,
native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);
void postBufferRelease(BufferId bufferId);
bool postSend(
BufferId bufferId, ConnectionId receiver,
- TransactionId *transactionId, int64_t *timestampUs);
+ TransactionId *transactionId, int64_t *timestampMs);
private:
bool postReceive(
BufferId bufferId, TransactionId transactionId,
- int64_t timestampUs);
+ int64_t timestampMs);
bool postReceiveResult(
BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
@@ -97,11 +101,11 @@
void invalidateRange(BufferId from, BufferId to);
- ResultStatus allocateBufferHandle(
+ BufferPoolStatus allocateBufferHandle(
const std::vector<uint8_t>& params, BufferId *bufferId,
native_handle_t **handle);
- ResultStatus fetchBufferHandle(
+ BufferPoolStatus fetchBufferHandle(
TransactionId transactionId, BufferId bufferId,
native_handle_t **handle);
@@ -110,12 +114,12 @@
bool mLocal;
bool mValid;
- sp<IAccessor> mAccessor;
- sp<Connection> mLocalConnection;
- sp<IConnection> mRemoteConnection;
+ std::shared_ptr<IAccessor> mAccessor;
+ std::shared_ptr<Connection> mLocalConnection;
+ std::shared_ptr<IConnection> mRemoteConnection;
uint32_t mSeqId;
ConnectionId mConnectionId;
- int64_t mLastEvictCacheUs;
+ int64_t mLastEvictCacheMs;
std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
// CachedBuffers
@@ -125,18 +129,19 @@
std::condition_variable mCreateCv;
std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
int mActive;
- int64_t mLastChangeUs;
+ int64_t mLastChangeMs;
- BufferCache() : mCreating(false), mActive(0), mLastChangeUs(getTimestampNow()) {}
+ BufferCache() : mCreating(false), mActive(0),
+ mLastChangeMs(::android::elapsedRealtime()) {}
void incActive_l() {
++mActive;
- mLastChangeUs = getTimestampNow();
+ mLastChangeMs = ::android::elapsedRealtime();
}
void decActive_l() {
--mActive;
- mLastChangeUs = getTimestampNow();
+ mLastChangeMs = ::android::elapsedRealtime();
}
int cachedBufferCount() const {
@@ -147,7 +152,6 @@
// FMQ - release notifier
struct ReleaseCache {
std::mutex mLock;
- // TODO: use only one list?(using one list may dealy sending messages?)
std::list<BufferId> mReleasingIds;
std::list<BufferId> mReleasedIds;
uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
@@ -158,7 +162,7 @@
} mReleasing;
// This lock is held during synchronization from remote side.
- // In order to minimize remote calls and locking durtaion, this lock is held
+ // In order to minimize remote calls and locking duration, this lock is held
// by best effort approach using try_lock().
std::mutex mRemoteSyncLock;
};
@@ -181,7 +185,7 @@
struct BufferPoolClient::Impl::ClientBuffer {
private:
- int64_t mExpireUs;
+ int64_t mExpireMs;
bool mHasCache;
ConnectionId mConnectionId;
BufferId mId;
@@ -189,7 +193,7 @@
std::weak_ptr<BufferPoolData> mCache;
void updateExpire() {
- mExpireUs = getTimestampNow() + kCacheTtlUs;
+ mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
}
public:
@@ -197,7 +201,7 @@
ConnectionId connectionId, BufferId id, native_handle_t *handle)
: mHasCache(false), mConnectionId(connectionId),
mId(id), mHandle(handle) {
- mExpireUs = getTimestampNow() + kCacheTtlUs;
+ mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
}
~ClientBuffer() {
@@ -212,8 +216,8 @@
}
bool expire() const {
- int64_t now = getTimestampNow();
- return now >= mExpireUs;
+ int64_t now = ::android::elapsedRealtime();
+ return now >= mExpireMs;
}
bool hasCache() const {
@@ -265,20 +269,21 @@
}
};
-BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer)
+BufferPoolClient::Impl::Impl(const std::shared_ptr<Accessor> &accessor,
+ const std::shared_ptr<IObserver> &observer)
: mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
- mLastEvictCacheUs(getTimestampNow()) {
- const StatusDescriptor *statusDesc;
- const InvalidationDescriptor *invDesc;
- ResultStatus status = accessor->connect(
+ mLastEvictCacheMs(::android::elapsedRealtime()) {
+ StatusDescriptor statusDesc;
+ InvalidationDescriptor invDesc;
+ BufferPoolStatus status = accessor->connect(
observer, true,
&mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
&statusDesc, &invDesc);
if (status == ResultStatus::OK) {
mReleasing.mStatusChannel =
- std::make_unique<BufferStatusChannel>(*statusDesc);
+ std::make_unique<BufferStatusChannel>(statusDesc);
mInvalidationListener =
- std::make_unique<BufferInvalidationListener>(*invDesc);
+ std::make_unique<BufferInvalidationListener>(invDesc);
mValid = mReleasing.mStatusChannel &&
mReleasing.mStatusChannel->isValid() &&
mInvalidationListener &&
@@ -286,46 +291,36 @@
}
}
-BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer)
+BufferPoolClient::Impl::Impl(const std::shared_ptr<IAccessor> &accessor,
+ const std::shared_ptr<IObserver> &observer)
: mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
- mLastEvictCacheUs(getTimestampNow()) {
+ mLastEvictCacheMs(::android::elapsedRealtime()) {
+ IAccessor::ConnectionInfo conInfo;
bool valid = false;
- sp<IConnection>& outConnection = mRemoteConnection;
- ConnectionId& id = mConnectionId;
- uint32_t& outMsgId = mReleasing.mInvalidateId;
- std::unique_ptr<BufferStatusChannel>& outChannel =
- mReleasing.mStatusChannel;
- std::unique_ptr<BufferInvalidationListener>& outObserver =
- mInvalidationListener;
- Return<void> transResult = accessor->connect(
- observer,
- [&valid, &outConnection, &id, &outMsgId, &outChannel, &outObserver]
- (ResultStatus status, sp<IConnection> connection,
- ConnectionId connectionId, uint32_t msgId,
- const StatusDescriptor& statusDesc,
- const InvalidationDescriptor& invDesc) {
- if (status == ResultStatus::OK) {
- outConnection = connection;
- id = connectionId;
- outMsgId = msgId;
- outChannel = std::make_unique<BufferStatusChannel>(statusDesc);
- outObserver = std::make_unique<BufferInvalidationListener>(invDesc);
- if (outChannel && outChannel->isValid() &&
- outObserver && outObserver->isValid()) {
- valid = true;
- }
- }
- });
- mValid = transResult.isOk() && valid;
+ if(accessor->connect(observer, &conInfo).isOk()) {
+ auto channel = std::make_unique<BufferStatusChannel>(conInfo.toFmqDesc);
+ auto observer = std::make_unique<BufferInvalidationListener>(conInfo.fromFmqDesc);
+
+ if (channel && channel->isValid()
+ && observer && observer->isValid()) {
+ mRemoteConnection = conInfo.connection;
+ mConnectionId = conInfo.connectionId;
+ mReleasing.mInvalidateId = conInfo.msgId;
+ mReleasing.mStatusChannel = std::move(channel);
+ mInvalidationListener = std::move(observer);
+ valid = true;
+ }
+ }
+ mValid = valid;
}
-bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionUs, bool clearCache) {
+bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionMs, bool clearCache) {
bool active = false;
{
std::lock_guard<std::mutex> lock(mCache.mLock);
syncReleased();
evictCaches(clearCache);
- *lastTransactionUs = mCache.mLastChangeUs;
+ *lastTransactionMs = mCache.mLastChangeMs;
active = mCache.mActive > 0;
}
if (mValid && mLocal && mLocalConnection) {
@@ -341,7 +336,7 @@
// TODO: evict cache required?
}
-ResultStatus BufferPoolClient::Impl::flush() {
+BufferPoolStatus BufferPoolClient::Impl::flush() {
if (!mLocal || !mLocalConnection || !mValid) {
return ResultStatus::CRITICAL_ERROR;
}
@@ -353,7 +348,7 @@
}
}
-ResultStatus BufferPoolClient::Impl::allocate(
+BufferPoolStatus BufferPoolClient::Impl::allocate(
const std::vector<uint8_t> &params,
native_handle_t **pHandle,
std::shared_ptr<BufferPoolData> *buffer) {
@@ -363,7 +358,7 @@
BufferId bufferId;
native_handle_t *handle = nullptr;
buffer->reset();
- ResultStatus status = allocateBufferHandle(params, &bufferId, &handle);
+ BufferPoolStatus status = allocateBufferHandle(params, &bufferId, &handle);
if (status == ResultStatus::OK) {
if (handle) {
std::unique_lock<std::mutex> lock(mCache.mLock);
@@ -398,20 +393,20 @@
return status;
}
-ResultStatus BufferPoolClient::Impl::receive(
- TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
+BufferPoolStatus BufferPoolClient::Impl::receive(
+ TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
native_handle_t **pHandle,
std::shared_ptr<BufferPoolData> *buffer) {
if (!mValid) {
return ResultStatus::CRITICAL_ERROR;
}
- if (timestampUs != 0) {
- timestampUs += kReceiveTimeoutUs;
+ if (timestampMs != 0) {
+ timestampMs += kReceiveTimeoutMs;
}
- if (!postReceive(bufferId, transactionId, timestampUs)) {
+ if (!postReceive(bufferId, transactionId, timestampMs)) {
return ResultStatus::CRITICAL_ERROR;
}
- ResultStatus status = ResultStatus::CRITICAL_ERROR;
+ BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
buffer->reset();
while(1) {
std::unique_lock<std::mutex> lock(mCache.mLock);
@@ -505,7 +500,7 @@
// TODO: revise ad-hoc posting data structure
bool BufferPoolClient::Impl::postSend(
BufferId bufferId, ConnectionId receiver,
- TransactionId *transactionId, int64_t *timestampUs) {
+ TransactionId *transactionId, int64_t *timestampMs) {
{
// TODO: don't need to call syncReleased every time
std::lock_guard<std::mutex> lock(mCache.mLock);
@@ -515,7 +510,7 @@
bool needsSync = false;
{
std::lock_guard<std::mutex> lock(mReleasing.mLock);
- *timestampUs = getTimestampNow();
+ *timestampMs = ::android::elapsedRealtime();
*transactionId = (mConnectionId << 32) | mSeqId++;
// TODO: retry, add timeout, target?
ret = mReleasing.mStatusChannel->postBufferStatusMessage(
@@ -533,11 +528,11 @@
}
bool BufferPoolClient::Impl::postReceive(
- BufferId bufferId, TransactionId transactionId, int64_t timestampUs) {
+ BufferId bufferId, TransactionId transactionId, int64_t timestampMs) {
for (int i = 0; i < kPostMaxRetry; ++i) {
std::unique_lock<std::mutex> lock(mReleasing.mLock);
- int64_t now = getTimestampNow();
- if (timestampUs == 0 || now < timestampUs) {
+ int64_t now = ::android::elapsedRealtime();
+ if (timestampMs == 0 || now < timestampMs) {
bool result = mReleasing.mStatusChannel->postBufferStatusMessage(
transactionId, bufferId, BufferStatus::TRANSFER_FROM,
mConnectionId, -1, mReleasing.mReleasingIds,
@@ -579,16 +574,7 @@
needsSync = mReleasing.mStatusChannel->needsSync();
}
if (needsSync) {
- TransactionId transactionId = (mConnectionId << 32);
- BufferId bufferId = Connection::SYNC_BUFFERID;
- Return<void> transResult = mRemoteConnection->fetch(
- transactionId, bufferId,
- []
- (ResultStatus outStatus, Buffer outBuffer) {
- (void) outStatus;
- (void) outBuffer;
- });
- if (!transResult.isOk()) {
+ if (!mRemoteConnection->sync().isOk()) {
ALOGD("sync from client %lld failed: bufferpool process died.",
(long long)mConnectionId);
}
@@ -616,12 +602,12 @@
mCache.decActive_l();
} else {
// should not happen!
- ALOGW("client %lld cache release status inconsitent!",
+ ALOGW("client %lld cache release status inconsistent!",
(long long)mConnectionId);
}
} else {
// should not happen!
- ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
+ ALOGW("client %lld cache status inconsistent!", (long long)mConnectionId);
}
}
mReleasing.mReleasedIds.clear();
@@ -673,8 +659,8 @@
// should have mCache.mLock
void BufferPoolClient::Impl::evictCaches(bool clearCache) {
- int64_t now = getTimestampNow();
- if (now >= mLastEvictCacheUs + kCacheTtlUs ||
+ int64_t now = ::android::elapsedRealtime();
+ if (now >= mLastEvictCacheMs + kCacheTtlMs ||
clearCache || mCache.cachedBufferCount() > kMaxCachedBufferCount) {
size_t evicted = 0;
for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
@@ -688,7 +674,7 @@
}
ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
(long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
- mLastEvictCacheUs = now;
+ mLastEvictCacheMs = now;
}
}
@@ -701,7 +687,7 @@
ALOGV("cache invalidated %lld : buffer %u",
(long long)mConnectionId, id);
} else {
- ALOGW("Inconsitent invalidation %lld : activer buffer!! %u",
+ ALOGW("Inconsistent invalidation %lld : activer buffer!! %u",
(long long)mConnectionId, (unsigned int)id);
}
break;
@@ -735,12 +721,12 @@
(long long)mConnectionId, invalidated);
}
-ResultStatus BufferPoolClient::Impl::allocateBufferHandle(
+BufferPoolStatus BufferPoolClient::Impl::allocateBufferHandle(
const std::vector<uint8_t>& params, BufferId *bufferId,
native_handle_t** handle) {
if (mLocalConnection) {
const native_handle_t* allocHandle = nullptr;
- ResultStatus status = mLocalConnection->allocate(
+ BufferPoolStatus status = mLocalConnection->allocate(
params, bufferId, &allocHandle);
if (status == ResultStatus::OK) {
*handle = native_handle_clone(allocHandle);
@@ -753,37 +739,38 @@
return ResultStatus::CRITICAL_ERROR;
}
-ResultStatus BufferPoolClient::Impl::fetchBufferHandle(
+BufferPoolStatus BufferPoolClient::Impl::fetchBufferHandle(
TransactionId transactionId, BufferId bufferId,
native_handle_t **handle) {
- sp<IConnection> connection;
+ std::shared_ptr<IConnection> connection;
if (mLocal) {
connection = mLocalConnection;
} else {
connection = mRemoteConnection;
}
- ResultStatus status;
- Return<void> transResult = connection->fetch(
- transactionId, bufferId,
- [&status, &handle]
- (ResultStatus outStatus, Buffer outBuffer) {
- status = outStatus;
- if (status == ResultStatus::OK) {
- *handle = native_handle_clone(
- outBuffer.buffer.getNativeHandle());
- }
- });
- return transResult.isOk() ? status : ResultStatus::CRITICAL_ERROR;
+ std::vector<FetchInfo> infos;
+ std::vector<FetchResult> results;
+ infos.emplace_back(FetchInfo{ToAidl(transactionId), ToAidl(bufferId)});
+ ndk::ScopedAStatus status = connection->fetch(infos, &results);
+ if (!status.isOk()) {
+ BufferPoolStatus svcSpecific = status.getServiceSpecificError();
+ return svcSpecific ? svcSpecific : ResultStatus::CRITICAL_ERROR;
+ }
+ if (results[0].getTag() == FetchResult::buffer) {
+ *handle = ::android::dupFromAidl(results[0].get<FetchResult::buffer>().buffer);
+ return ResultStatus::OK;
+ }
+ return results[0].get<FetchResult::failure>();
}
-BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor,
- const sp<IObserver> &observer) {
+BufferPoolClient::BufferPoolClient(const std::shared_ptr<Accessor> &accessor,
+ const std::shared_ptr<IObserver> &observer) {
mImpl = std::make_shared<Impl>(accessor, observer);
}
-BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor,
- const sp<IObserver> &observer) {
+BufferPoolClient::BufferPoolClient(const std::shared_ptr<IAccessor> &accessor,
+ const std::shared_ptr<IObserver> &observer) {
mImpl = std::make_shared<Impl>(accessor, observer);
}
@@ -799,12 +786,12 @@
return mImpl && mImpl->isLocal();
}
-bool BufferPoolClient::isActive(int64_t *lastTransactionUs, bool clearCache) {
+bool BufferPoolClient::isActive(int64_t *lastTransactionMs, bool clearCache) {
if (!isValid()) {
- *lastTransactionUs = 0;
+ *lastTransactionMs = 0;
return false;
}
- return mImpl->isActive(lastTransactionUs, clearCache);
+ return mImpl->isActive(lastTransactionMs, clearCache);
}
ConnectionId BufferPoolClient::getConnectionId() {
@@ -814,7 +801,7 @@
return -1;
}
-ResultStatus BufferPoolClient::getAccessor(sp<IAccessor> *accessor) {
+BufferPoolStatus BufferPoolClient::getAccessor(std::shared_ptr<IAccessor> *accessor) {
if (isValid()) {
*accessor = mImpl->getAccessor();
return ResultStatus::OK;
@@ -829,14 +816,14 @@
}
}
-ResultStatus BufferPoolClient::flush() {
+BufferPoolStatus BufferPoolClient::flush() {
if (isValid()) {
return mImpl->flush();
}
return ResultStatus::CRITICAL_ERROR;
}
-ResultStatus BufferPoolClient::allocate(
+BufferPoolStatus BufferPoolClient::allocate(
const std::vector<uint8_t> &params,
native_handle_t **handle,
std::shared_ptr<BufferPoolData> *buffer) {
@@ -846,31 +833,26 @@
return ResultStatus::CRITICAL_ERROR;
}
-ResultStatus BufferPoolClient::receive(
- TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
+BufferPoolStatus BufferPoolClient::receive(
+ TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
if (isValid()) {
- return mImpl->receive(transactionId, bufferId, timestampUs, handle, buffer);
+ return mImpl->receive(transactionId, bufferId, timestampMs, handle, buffer);
}
return ResultStatus::CRITICAL_ERROR;
}
-ResultStatus BufferPoolClient::postSend(
+BufferPoolStatus BufferPoolClient::postSend(
ConnectionId receiverId,
const std::shared_ptr<BufferPoolData> &buffer,
TransactionId *transactionId,
- int64_t *timestampUs) {
+ int64_t *timestampMs) {
if (isValid()) {
bool result = mImpl->postSend(
- buffer->mId, receiverId, transactionId, timestampUs);
+ buffer->mId, receiverId, transactionId, timestampMs);
return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
}
return ResultStatus::CRITICAL_ERROR;
}
-} // namespace implementation
-} // namespace V2_0
-} // namespace bufferpool
-} // namespace media
-} // namespace hardware
-} // namespace android
+} // namespace aidl::android::hardware::media::bufferpool2::implementation