blob: e9777d8c41a5a610576e44fd57e65a0561e8f9e4 [file] [log] [blame]
Sungtak Lee8fc3ca42022-12-07 07:45:45 +00001/*
Sungtak Lee76937c62022-12-07 11:42:03 +00002 * Copyright (C) 2022 The Android Open Source Project
Sungtak Lee8fc3ca42022-12-07 07:45:45 +00003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Sungtak Lee76937c62022-12-07 11:42:03 +000017#define LOG_TAG "AidlBufferPoolCli"
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000018//#define LOG_NDEBUG 0
19
20#include <thread>
Sungtak Lee76937c62022-12-07 11:42:03 +000021#include <aidlcommonsupport/NativeHandle.h>
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000022#include <utils/Log.h>
23#include "BufferPoolClient.h"
Sungtak Lee76937c62022-12-07 11:42:03 +000024#include "Accessor.h"
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000025#include "Connection.h"
26
Sungtak Lee76937c62022-12-07 11:42:03 +000027namespace aidl::android::hardware::media::bufferpool2::implementation {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000028
Sungtak Lee76937c62022-12-07 11:42:03 +000029using aidl::android::hardware::media::bufferpool2::IConnection;
30using aidl::android::hardware::media::bufferpool2::ResultStatus;
31using FetchInfo = aidl::android::hardware::media::bufferpool2::IConnection::FetchInfo;
32using FetchResult = aidl::android::hardware::media::bufferpool2::IConnection::FetchResult;
33
34static constexpr int64_t kReceiveTimeoutMs = 2000; // 2s
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000035static constexpr int kPostMaxRetry = 3;
Sungtak Lee76937c62022-12-07 11:42:03 +000036static constexpr int kCacheTtlMs = 1000;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000037static constexpr size_t kMaxCachedBufferCount = 64;
38static constexpr size_t kCachedBufferCountTarget = kMaxCachedBufferCount - 16;
39
40class BufferPoolClient::Impl
41 : public std::enable_shared_from_this<BufferPoolClient::Impl> {
42public:
Sungtak Lee76937c62022-12-07 11:42:03 +000043 explicit Impl(const std::shared_ptr<Accessor> &accessor,
44 const std::shared_ptr<IObserver> &observer);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000045
Sungtak Lee76937c62022-12-07 11:42:03 +000046 explicit Impl(const std::shared_ptr<IAccessor> &accessor,
47 const std::shared_ptr<IObserver> &observer);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000048
49 bool isValid() {
50 return mValid;
51 }
52
53 bool isLocal() {
54 return mValid && mLocal;
55 }
56
57 ConnectionId getConnectionId() {
58 return mConnectionId;
59 }
60
Sungtak Lee76937c62022-12-07 11:42:03 +000061 std::shared_ptr<IAccessor> &getAccessor() {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000062 return mAccessor;
63 }
64
Sungtak Lee76937c62022-12-07 11:42:03 +000065 bool isActive(int64_t *lastTransactionMs, bool clearCache);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000066
67 void receiveInvalidation(uint32_t msgID);
68
Sungtak Lee76937c62022-12-07 11:42:03 +000069 BufferPoolStatus flush();
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000070
Sungtak Lee76937c62022-12-07 11:42:03 +000071 BufferPoolStatus allocate(const std::vector<uint8_t> &params,
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000072 native_handle_t **handle,
73 std::shared_ptr<BufferPoolData> *buffer);
74
Sungtak Lee76937c62022-12-07 11:42:03 +000075 BufferPoolStatus receive(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000076 TransactionId transactionId, BufferId bufferId,
Sungtak Lee76937c62022-12-07 11:42:03 +000077 int64_t timestampMs,
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000078 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);
79
80 void postBufferRelease(BufferId bufferId);
81
82 bool postSend(
83 BufferId bufferId, ConnectionId receiver,
Sungtak Lee76937c62022-12-07 11:42:03 +000084 TransactionId *transactionId, int64_t *timestampMs);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000085private:
86
87 bool postReceive(
88 BufferId bufferId, TransactionId transactionId,
Sungtak Lee76937c62022-12-07 11:42:03 +000089 int64_t timestampMs);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +000090
91 bool postReceiveResult(
92 BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
93
94 void trySyncFromRemote();
95
96 bool syncReleased(uint32_t msgId = 0);
97
98 void evictCaches(bool clearCache = false);
99
100 void invalidateBuffer(BufferId id);
101
102 void invalidateRange(BufferId from, BufferId to);
103
Sungtak Lee76937c62022-12-07 11:42:03 +0000104 BufferPoolStatus allocateBufferHandle(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000105 const std::vector<uint8_t>& params, BufferId *bufferId,
106 native_handle_t **handle);
107
Sungtak Lee76937c62022-12-07 11:42:03 +0000108 BufferPoolStatus fetchBufferHandle(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000109 TransactionId transactionId, BufferId bufferId,
110 native_handle_t **handle);
111
112 struct BlockPoolDataDtor;
113 struct ClientBuffer;
114
115 bool mLocal;
116 bool mValid;
Sungtak Lee76937c62022-12-07 11:42:03 +0000117 std::shared_ptr<IAccessor> mAccessor;
118 std::shared_ptr<Connection> mLocalConnection;
119 std::shared_ptr<IConnection> mRemoteConnection;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000120 uint32_t mSeqId;
121 ConnectionId mConnectionId;
Sungtak Lee76937c62022-12-07 11:42:03 +0000122 int64_t mLastEvictCacheMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000123 std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
124
125 // CachedBuffers
126 struct BufferCache {
127 std::mutex mLock;
128 bool mCreating;
129 std::condition_variable mCreateCv;
130 std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
131 int mActive;
Sungtak Lee76937c62022-12-07 11:42:03 +0000132 int64_t mLastChangeMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000133
Sungtak Lee76937c62022-12-07 11:42:03 +0000134 BufferCache() : mCreating(false), mActive(0),
135 mLastChangeMs(::android::elapsedRealtime()) {}
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000136
137 void incActive_l() {
138 ++mActive;
Sungtak Lee76937c62022-12-07 11:42:03 +0000139 mLastChangeMs = ::android::elapsedRealtime();
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000140 }
141
142 void decActive_l() {
143 --mActive;
Sungtak Lee76937c62022-12-07 11:42:03 +0000144 mLastChangeMs = ::android::elapsedRealtime();
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000145 }
146
147 int cachedBufferCount() const {
148 return mBuffers.size() - mActive;
149 }
150 } mCache;
151
152 // FMQ - release notifier
153 struct ReleaseCache {
154 std::mutex mLock;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000155 std::list<BufferId> mReleasingIds;
156 std::list<BufferId> mReleasedIds;
157 uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
158 bool mInvalidateAck;
159 std::unique_ptr<BufferStatusChannel> mStatusChannel;
160
161 ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
162 } mReleasing;
163
164 // This lock is held during synchronization from remote side.
Sungtak Lee76937c62022-12-07 11:42:03 +0000165 // In order to minimize remote calls and locking duration, this lock is held
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000166 // by best effort approach using try_lock().
167 std::mutex mRemoteSyncLock;
168};
169
170struct BufferPoolClient::Impl::BlockPoolDataDtor {
171 BlockPoolDataDtor(const std::shared_ptr<BufferPoolClient::Impl> &impl)
172 : mImpl(impl) {}
173
174 void operator()(BufferPoolData *buffer) {
175 BufferId id = buffer->mId;
176 delete buffer;
177
178 auto impl = mImpl.lock();
179 if (impl && impl->isValid()) {
180 impl->postBufferRelease(id);
181 }
182 }
183 const std::weak_ptr<BufferPoolClient::Impl> mImpl;
184};
185
186struct BufferPoolClient::Impl::ClientBuffer {
187private:
Sungtak Lee76937c62022-12-07 11:42:03 +0000188 int64_t mExpireMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000189 bool mHasCache;
190 ConnectionId mConnectionId;
191 BufferId mId;
192 native_handle_t *mHandle;
193 std::weak_ptr<BufferPoolData> mCache;
194
195 void updateExpire() {
Sungtak Lee76937c62022-12-07 11:42:03 +0000196 mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000197 }
198
199public:
200 ClientBuffer(
201 ConnectionId connectionId, BufferId id, native_handle_t *handle)
202 : mHasCache(false), mConnectionId(connectionId),
203 mId(id), mHandle(handle) {
Sungtak Lee76937c62022-12-07 11:42:03 +0000204 mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000205 }
206
207 ~ClientBuffer() {
208 if (mHandle) {
209 native_handle_close(mHandle);
210 native_handle_delete(mHandle);
211 }
212 }
213
214 BufferId id() const {
215 return mId;
216 }
217
218 bool expire() const {
Sungtak Lee76937c62022-12-07 11:42:03 +0000219 int64_t now = ::android::elapsedRealtime();
220 return now >= mExpireMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000221 }
222
223 bool hasCache() const {
224 return mHasCache;
225 }
226
227 std::shared_ptr<BufferPoolData> fetchCache(native_handle_t **pHandle) {
228 if (mHasCache) {
229 std::shared_ptr<BufferPoolData> cache = mCache.lock();
230 if (cache) {
231 *pHandle = mHandle;
232 }
233 return cache;
234 }
235 return nullptr;
236 }
237
238 std::shared_ptr<BufferPoolData> createCache(
239 const std::shared_ptr<BufferPoolClient::Impl> &impl,
240 native_handle_t **pHandle) {
241 if (!mHasCache) {
242 // Allocates a raw ptr in order to avoid sending #postBufferRelease
243 // from deleter, in case of native_handle_clone failure.
244 BufferPoolData *ptr = new BufferPoolData(mConnectionId, mId);
245 if (ptr) {
246 std::shared_ptr<BufferPoolData> cache(ptr, BlockPoolDataDtor(impl));
247 if (cache) {
248 mCache = cache;
249 mHasCache = true;
250 *pHandle = mHandle;
251 return cache;
252 }
253 }
254 if (ptr) {
255 delete ptr;
256 }
257 }
258 return nullptr;
259 }
260
261 bool onCacheRelease() {
262 if (mHasCache) {
263 // TODO: verify mCache is not valid;
264 updateExpire();
265 mHasCache = false;
266 return true;
267 }
268 return false;
269 }
270};
271
Sungtak Lee76937c62022-12-07 11:42:03 +0000272BufferPoolClient::Impl::Impl(const std::shared_ptr<Accessor> &accessor,
273 const std::shared_ptr<IObserver> &observer)
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000274 : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
Sungtak Lee76937c62022-12-07 11:42:03 +0000275 mLastEvictCacheMs(::android::elapsedRealtime()) {
276 StatusDescriptor statusDesc;
277 InvalidationDescriptor invDesc;
278 BufferPoolStatus status = accessor->connect(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000279 observer, true,
280 &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
281 &statusDesc, &invDesc);
282 if (status == ResultStatus::OK) {
283 mReleasing.mStatusChannel =
Sungtak Lee76937c62022-12-07 11:42:03 +0000284 std::make_unique<BufferStatusChannel>(statusDesc);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000285 mInvalidationListener =
Sungtak Lee76937c62022-12-07 11:42:03 +0000286 std::make_unique<BufferInvalidationListener>(invDesc);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000287 mValid = mReleasing.mStatusChannel &&
288 mReleasing.mStatusChannel->isValid() &&
289 mInvalidationListener &&
290 mInvalidationListener->isValid();
291 }
292}
293
Sungtak Lee76937c62022-12-07 11:42:03 +0000294BufferPoolClient::Impl::Impl(const std::shared_ptr<IAccessor> &accessor,
295 const std::shared_ptr<IObserver> &observer)
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000296 : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
Sungtak Lee76937c62022-12-07 11:42:03 +0000297 mLastEvictCacheMs(::android::elapsedRealtime()) {
298 IAccessor::ConnectionInfo conInfo;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000299 bool valid = false;
Sungtak Lee76937c62022-12-07 11:42:03 +0000300 if(accessor->connect(observer, &conInfo).isOk()) {
301 auto channel = std::make_unique<BufferStatusChannel>(conInfo.toFmqDesc);
302 auto observer = std::make_unique<BufferInvalidationListener>(conInfo.fromFmqDesc);
303
304 if (channel && channel->isValid()
305 && observer && observer->isValid()) {
306 mRemoteConnection = conInfo.connection;
307 mConnectionId = conInfo.connectionId;
308 mReleasing.mInvalidateId = conInfo.msgId;
309 mReleasing.mStatusChannel = std::move(channel);
310 mInvalidationListener = std::move(observer);
311 valid = true;
312 }
313 }
314 mValid = valid;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000315}
316
Sungtak Lee76937c62022-12-07 11:42:03 +0000317bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionMs, bool clearCache) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000318 bool active = false;
319 {
320 std::lock_guard<std::mutex> lock(mCache.mLock);
321 syncReleased();
322 evictCaches(clearCache);
Sungtak Lee76937c62022-12-07 11:42:03 +0000323 *lastTransactionMs = mCache.mLastChangeMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000324 active = mCache.mActive > 0;
325 }
326 if (mValid && mLocal && mLocalConnection) {
327 mLocalConnection->cleanUp(clearCache);
328 return true;
329 }
330 return active;
331}
332
333void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) {
334 std::lock_guard<std::mutex> lock(mCache.mLock);
335 syncReleased(messageId);
336 // TODO: evict cache required?
337}
338
Sungtak Lee76937c62022-12-07 11:42:03 +0000339BufferPoolStatus BufferPoolClient::Impl::flush() {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000340 if (!mLocal || !mLocalConnection || !mValid) {
341 return ResultStatus::CRITICAL_ERROR;
342 }
343 {
344 std::unique_lock<std::mutex> lock(mCache.mLock);
345 syncReleased();
346 evictCaches();
347 return mLocalConnection->flush();
348 }
349}
350
Sungtak Lee76937c62022-12-07 11:42:03 +0000351BufferPoolStatus BufferPoolClient::Impl::allocate(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000352 const std::vector<uint8_t> &params,
353 native_handle_t **pHandle,
354 std::shared_ptr<BufferPoolData> *buffer) {
355 if (!mLocal || !mLocalConnection || !mValid) {
356 return ResultStatus::CRITICAL_ERROR;
357 }
358 BufferId bufferId;
359 native_handle_t *handle = nullptr;
360 buffer->reset();
Sungtak Lee76937c62022-12-07 11:42:03 +0000361 BufferPoolStatus status = allocateBufferHandle(params, &bufferId, &handle);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000362 if (status == ResultStatus::OK) {
363 if (handle) {
364 std::unique_lock<std::mutex> lock(mCache.mLock);
365 syncReleased();
366 evictCaches();
367 auto cacheIt = mCache.mBuffers.find(bufferId);
368 if (cacheIt != mCache.mBuffers.end()) {
369 // TODO: verify it is recycled. (not having active ref)
370 mCache.mBuffers.erase(cacheIt);
371 }
372 auto clientBuffer = std::make_unique<ClientBuffer>(
373 mConnectionId, bufferId, handle);
374 if (clientBuffer) {
375 auto result = mCache.mBuffers.insert(std::make_pair(
376 bufferId, std::move(clientBuffer)));
377 if (result.second) {
378 *buffer = result.first->second->createCache(
379 shared_from_this(), pHandle);
380 if (*buffer) {
381 mCache.incActive_l();
382 }
383 }
384 }
385 }
386 if (!*buffer) {
387 ALOGV("client cache creation failure %d: %lld",
388 handle != nullptr, (long long)mConnectionId);
389 status = ResultStatus::NO_MEMORY;
390 postBufferRelease(bufferId);
391 }
392 }
393 return status;
394}
395
Sungtak Lee76937c62022-12-07 11:42:03 +0000396BufferPoolStatus BufferPoolClient::Impl::receive(
397 TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000398 native_handle_t **pHandle,
399 std::shared_ptr<BufferPoolData> *buffer) {
400 if (!mValid) {
401 return ResultStatus::CRITICAL_ERROR;
402 }
Sungtak Lee76937c62022-12-07 11:42:03 +0000403 if (timestampMs != 0) {
404 timestampMs += kReceiveTimeoutMs;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000405 }
Sungtak Lee76937c62022-12-07 11:42:03 +0000406 if (!postReceive(bufferId, transactionId, timestampMs)) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000407 return ResultStatus::CRITICAL_ERROR;
408 }
Sungtak Lee76937c62022-12-07 11:42:03 +0000409 BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000410 buffer->reset();
411 while(1) {
412 std::unique_lock<std::mutex> lock(mCache.mLock);
413 syncReleased();
414 evictCaches();
415 auto cacheIt = mCache.mBuffers.find(bufferId);
416 if (cacheIt != mCache.mBuffers.end()) {
417 if (cacheIt->second->hasCache()) {
418 *buffer = cacheIt->second->fetchCache(pHandle);
419 if (!*buffer) {
420 // check transfer time_out
421 lock.unlock();
422 std::this_thread::yield();
423 continue;
424 }
425 ALOGV("client receive from reference %lld", (long long)mConnectionId);
426 break;
427 } else {
428 *buffer = cacheIt->second->createCache(shared_from_this(), pHandle);
429 if (*buffer) {
430 mCache.incActive_l();
431 }
432 ALOGV("client receive from cache %lld", (long long)mConnectionId);
433 break;
434 }
435 } else {
436 if (!mCache.mCreating) {
437 mCache.mCreating = true;
438 lock.unlock();
439 native_handle_t* handle = nullptr;
440 status = fetchBufferHandle(transactionId, bufferId, &handle);
441 lock.lock();
442 if (status == ResultStatus::OK) {
443 if (handle) {
444 auto clientBuffer = std::make_unique<ClientBuffer>(
445 mConnectionId, bufferId, handle);
446 if (clientBuffer) {
447 auto result = mCache.mBuffers.insert(
448 std::make_pair(bufferId, std::move(
449 clientBuffer)));
450 if (result.second) {
451 *buffer = result.first->second->createCache(
452 shared_from_this(), pHandle);
453 if (*buffer) {
454 mCache.incActive_l();
455 }
456 }
457 }
458 }
459 if (!*buffer) {
460 status = ResultStatus::NO_MEMORY;
461 }
462 }
463 mCache.mCreating = false;
464 lock.unlock();
465 mCache.mCreateCv.notify_all();
466 break;
467 }
468 mCache.mCreateCv.wait(lock);
469 }
470 }
471 bool needsSync = false;
472 bool posted = postReceiveResult(bufferId, transactionId,
473 *buffer ? true : false, &needsSync);
474 ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
475 *buffer ? "ok" : "fail", posted);
476 if (mValid && mLocal && mLocalConnection) {
477 mLocalConnection->cleanUp(false);
478 }
479 if (needsSync && mRemoteConnection) {
480 trySyncFromRemote();
481 }
482 if (*buffer) {
483 if (!posted) {
484 buffer->reset();
485 return ResultStatus::CRITICAL_ERROR;
486 }
487 return ResultStatus::OK;
488 }
489 return status;
490}
491
492
493void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) {
494 std::lock_guard<std::mutex> lock(mReleasing.mLock);
495 mReleasing.mReleasingIds.push_back(bufferId);
496 mReleasing.mStatusChannel->postBufferRelease(
497 mConnectionId, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
498}
499
500// TODO: revise ad-hoc posting data structure
501bool BufferPoolClient::Impl::postSend(
502 BufferId bufferId, ConnectionId receiver,
Sungtak Lee76937c62022-12-07 11:42:03 +0000503 TransactionId *transactionId, int64_t *timestampMs) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000504 {
505 // TODO: don't need to call syncReleased every time
506 std::lock_guard<std::mutex> lock(mCache.mLock);
507 syncReleased();
508 }
509 bool ret = false;
510 bool needsSync = false;
511 {
512 std::lock_guard<std::mutex> lock(mReleasing.mLock);
Sungtak Lee76937c62022-12-07 11:42:03 +0000513 *timestampMs = ::android::elapsedRealtime();
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000514 *transactionId = (mConnectionId << 32) | mSeqId++;
515 // TODO: retry, add timeout, target?
516 ret = mReleasing.mStatusChannel->postBufferStatusMessage(
517 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
518 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
519 needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
520 }
521 if (mValid && mLocal && mLocalConnection) {
522 mLocalConnection->cleanUp(false);
523 }
524 if (needsSync && mRemoteConnection) {
525 trySyncFromRemote();
526 }
527 return ret;
528}
529
530bool BufferPoolClient::Impl::postReceive(
Sungtak Lee76937c62022-12-07 11:42:03 +0000531 BufferId bufferId, TransactionId transactionId, int64_t timestampMs) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000532 for (int i = 0; i < kPostMaxRetry; ++i) {
533 std::unique_lock<std::mutex> lock(mReleasing.mLock);
Sungtak Lee76937c62022-12-07 11:42:03 +0000534 int64_t now = ::android::elapsedRealtime();
535 if (timestampMs == 0 || now < timestampMs) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000536 bool result = mReleasing.mStatusChannel->postBufferStatusMessage(
537 transactionId, bufferId, BufferStatus::TRANSFER_FROM,
538 mConnectionId, -1, mReleasing.mReleasingIds,
539 mReleasing.mReleasedIds);
540 if (result) {
541 return true;
542 }
543 lock.unlock();
544 std::this_thread::yield();
545 } else {
546 mReleasing.mStatusChannel->postBufferStatusMessage(
547 transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT,
548 mConnectionId, -1, mReleasing.mReleasingIds,
549 mReleasing.mReleasedIds);
550 return false;
551 }
552 }
553 return false;
554}
555
556bool BufferPoolClient::Impl::postReceiveResult(
557 BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
558 std::lock_guard<std::mutex> lock(mReleasing.mLock);
559 // TODO: retry, add timeout
560 bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
561 transactionId, bufferId,
562 result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
563 mConnectionId, -1, mReleasing.mReleasingIds,
564 mReleasing.mReleasedIds);
565 *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
566 return ret;
567}
568
569void BufferPoolClient::Impl::trySyncFromRemote() {
570 if (mRemoteSyncLock.try_lock()) {
571 bool needsSync = false;
572 {
573 std::lock_guard<std::mutex> lock(mReleasing.mLock);
574 needsSync = mReleasing.mStatusChannel->needsSync();
575 }
576 if (needsSync) {
Sungtak Lee76937c62022-12-07 11:42:03 +0000577 if (!mRemoteConnection->sync().isOk()) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000578 ALOGD("sync from client %lld failed: bufferpool process died.",
579 (long long)mConnectionId);
580 }
581 }
582 mRemoteSyncLock.unlock();
583 }
584}
585
586// should have mCache.mLock
587bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) {
588 bool cleared = false;
589 {
590 std::lock_guard<std::mutex> lock(mReleasing.mLock);
591 if (mReleasing.mReleasingIds.size() > 0) {
592 mReleasing.mStatusChannel->postBufferRelease(
593 mConnectionId, mReleasing.mReleasingIds,
594 mReleasing.mReleasedIds);
595 }
596 if (mReleasing.mReleasedIds.size() > 0) {
597 for (BufferId& id: mReleasing.mReleasedIds) {
598 ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
599 auto found = mCache.mBuffers.find(id);
600 if (found != mCache.mBuffers.end()) {
601 if (found->second->onCacheRelease()) {
602 mCache.decActive_l();
603 } else {
604 // should not happen!
Sungtak Lee76937c62022-12-07 11:42:03 +0000605 ALOGW("client %lld cache release status inconsistent!",
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000606 (long long)mConnectionId);
607 }
608 } else {
609 // should not happen!
Sungtak Lee76937c62022-12-07 11:42:03 +0000610 ALOGW("client %lld cache status inconsistent!", (long long)mConnectionId);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000611 }
612 }
613 mReleasing.mReleasedIds.clear();
614 cleared = true;
615 }
616 }
617 std::vector<BufferInvalidationMessage> invalidations;
618 mInvalidationListener->getInvalidations(invalidations);
619 uint32_t lastMsgId = 0;
620 if (invalidations.size() > 0) {
621 for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
622 if (it->messageId != 0) {
623 lastMsgId = it->messageId;
624 }
625 if (it->fromBufferId == it->toBufferId) {
626 // TODO: handle fromBufferId = UINT32_MAX
627 invalidateBuffer(it->fromBufferId);
628 } else {
629 invalidateRange(it->fromBufferId, it->toBufferId);
630 }
631 }
632 }
633 {
634 std::lock_guard<std::mutex> lock(mReleasing.mLock);
635 if (lastMsgId != 0) {
636 if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
637 mReleasing.mInvalidateId = lastMsgId;
638 mReleasing.mInvalidateAck = false;
639 }
640 } else if (messageId != 0) {
641 // messages are drained.
642 if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
643 mReleasing.mInvalidateId = messageId;
644 mReleasing.mInvalidateAck = true;
645 }
646 }
647 if (!mReleasing.mInvalidateAck) {
648 // post ACK
649 mReleasing.mStatusChannel->postBufferInvalidateAck(
650 mConnectionId,
651 mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
652 ALOGV("client %lld invalidateion ack (%d) %u",
653 (long long)mConnectionId,
654 mReleasing.mInvalidateAck, mReleasing.mInvalidateId);
655 }
656 }
657 return cleared;
658}
659
660// should have mCache.mLock
661void BufferPoolClient::Impl::evictCaches(bool clearCache) {
Sungtak Lee76937c62022-12-07 11:42:03 +0000662 int64_t now = ::android::elapsedRealtime();
663 if (now >= mLastEvictCacheMs + kCacheTtlMs ||
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000664 clearCache || mCache.cachedBufferCount() > kMaxCachedBufferCount) {
665 size_t evicted = 0;
666 for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
667 if (!it->second->hasCache() && (it->second->expire() ||
668 clearCache || mCache.cachedBufferCount() > kCachedBufferCountTarget)) {
669 it = mCache.mBuffers.erase(it);
670 ++evicted;
671 } else {
672 ++it;
673 }
674 }
675 ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
676 (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
Sungtak Lee76937c62022-12-07 11:42:03 +0000677 mLastEvictCacheMs = now;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000678 }
679}
680
681// should have mCache.mLock
682void BufferPoolClient::Impl::invalidateBuffer(BufferId id) {
683 for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) {
684 if (id == it->second->id()) {
685 if (!it->second->hasCache()) {
686 mCache.mBuffers.erase(it);
687 ALOGV("cache invalidated %lld : buffer %u",
688 (long long)mConnectionId, id);
689 } else {
Sungtak Lee76937c62022-12-07 11:42:03 +0000690 ALOGW("Inconsistent invalidation %lld : activer buffer!! %u",
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000691 (long long)mConnectionId, (unsigned int)id);
692 }
693 break;
694 }
695 }
696}
697
698// should have mCache.mLock
699void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) {
700 size_t invalidated = 0;
701 for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
702 if (!it->second->hasCache()) {
703 BufferId bid = it->second->id();
704 if (from < to) {
705 if (from <= bid && bid < to) {
706 ++invalidated;
707 it = mCache.mBuffers.erase(it);
708 continue;
709 }
710 } else {
711 if (from <= bid || bid < to) {
712 ++invalidated;
713 it = mCache.mBuffers.erase(it);
714 continue;
715 }
716 }
717 }
718 ++it;
719 }
720 ALOGV("cache invalidated %lld : # of invalidated %zu",
721 (long long)mConnectionId, invalidated);
722}
723
Sungtak Lee76937c62022-12-07 11:42:03 +0000724BufferPoolStatus BufferPoolClient::Impl::allocateBufferHandle(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000725 const std::vector<uint8_t>& params, BufferId *bufferId,
726 native_handle_t** handle) {
727 if (mLocalConnection) {
728 const native_handle_t* allocHandle = nullptr;
Sungtak Lee76937c62022-12-07 11:42:03 +0000729 BufferPoolStatus status = mLocalConnection->allocate(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000730 params, bufferId, &allocHandle);
731 if (status == ResultStatus::OK) {
732 *handle = native_handle_clone(allocHandle);
733 }
734 ALOGV("client allocate result %lld %d : %u clone %p",
735 (long long)mConnectionId, status == ResultStatus::OK,
736 *handle ? *bufferId : 0 , *handle);
737 return status;
738 }
739 return ResultStatus::CRITICAL_ERROR;
740}
741
Sungtak Lee76937c62022-12-07 11:42:03 +0000742BufferPoolStatus BufferPoolClient::Impl::fetchBufferHandle(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000743 TransactionId transactionId, BufferId bufferId,
744 native_handle_t **handle) {
Sungtak Lee76937c62022-12-07 11:42:03 +0000745 std::shared_ptr<IConnection> connection;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000746 if (mLocal) {
747 connection = mLocalConnection;
748 } else {
749 connection = mRemoteConnection;
750 }
Sungtak Lee76937c62022-12-07 11:42:03 +0000751 std::vector<FetchInfo> infos;
752 std::vector<FetchResult> results;
753 infos.emplace_back(FetchInfo{ToAidl(transactionId), ToAidl(bufferId)});
754 ndk::ScopedAStatus status = connection->fetch(infos, &results);
755 if (!status.isOk()) {
756 BufferPoolStatus svcSpecific = status.getServiceSpecificError();
757 return svcSpecific ? svcSpecific : ResultStatus::CRITICAL_ERROR;
758 }
759 if (results[0].getTag() == FetchResult::buffer) {
760 *handle = ::android::dupFromAidl(results[0].get<FetchResult::buffer>().buffer);
761 return ResultStatus::OK;
762 }
763 return results[0].get<FetchResult::failure>();
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000764}
765
766
Sungtak Lee76937c62022-12-07 11:42:03 +0000767BufferPoolClient::BufferPoolClient(const std::shared_ptr<Accessor> &accessor,
768 const std::shared_ptr<IObserver> &observer) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000769 mImpl = std::make_shared<Impl>(accessor, observer);
770}
771
Sungtak Lee76937c62022-12-07 11:42:03 +0000772BufferPoolClient::BufferPoolClient(const std::shared_ptr<IAccessor> &accessor,
773 const std::shared_ptr<IObserver> &observer) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000774 mImpl = std::make_shared<Impl>(accessor, observer);
775}
776
777BufferPoolClient::~BufferPoolClient() {
778 // TODO: how to handle orphaned buffers?
779}
780
781bool BufferPoolClient::isValid() {
782 return mImpl && mImpl->isValid();
783}
784
785bool BufferPoolClient::isLocal() {
786 return mImpl && mImpl->isLocal();
787}
788
Sungtak Lee76937c62022-12-07 11:42:03 +0000789bool BufferPoolClient::isActive(int64_t *lastTransactionMs, bool clearCache) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000790 if (!isValid()) {
Sungtak Lee76937c62022-12-07 11:42:03 +0000791 *lastTransactionMs = 0;
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000792 return false;
793 }
Sungtak Lee76937c62022-12-07 11:42:03 +0000794 return mImpl->isActive(lastTransactionMs, clearCache);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000795}
796
797ConnectionId BufferPoolClient::getConnectionId() {
798 if (isValid()) {
799 return mImpl->getConnectionId();
800 }
801 return -1;
802}
803
Sungtak Lee76937c62022-12-07 11:42:03 +0000804BufferPoolStatus BufferPoolClient::getAccessor(std::shared_ptr<IAccessor> *accessor) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000805 if (isValid()) {
806 *accessor = mImpl->getAccessor();
807 return ResultStatus::OK;
808 }
809 return ResultStatus::CRITICAL_ERROR;
810}
811
812void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
813 ALOGV("bufferpool2 client recv inv %u", msgId);
814 if (isValid()) {
815 mImpl->receiveInvalidation(msgId);
816 }
817}
818
Sungtak Lee76937c62022-12-07 11:42:03 +0000819BufferPoolStatus BufferPoolClient::flush() {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000820 if (isValid()) {
821 return mImpl->flush();
822 }
823 return ResultStatus::CRITICAL_ERROR;
824}
825
Sungtak Lee76937c62022-12-07 11:42:03 +0000826BufferPoolStatus BufferPoolClient::allocate(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000827 const std::vector<uint8_t> &params,
828 native_handle_t **handle,
829 std::shared_ptr<BufferPoolData> *buffer) {
830 if (isValid()) {
831 return mImpl->allocate(params, handle, buffer);
832 }
833 return ResultStatus::CRITICAL_ERROR;
834}
835
Sungtak Lee76937c62022-12-07 11:42:03 +0000836BufferPoolStatus BufferPoolClient::receive(
837 TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000838 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
839 if (isValid()) {
Sungtak Lee76937c62022-12-07 11:42:03 +0000840 return mImpl->receive(transactionId, bufferId, timestampMs, handle, buffer);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000841 }
842 return ResultStatus::CRITICAL_ERROR;
843}
844
Sungtak Lee76937c62022-12-07 11:42:03 +0000845BufferPoolStatus BufferPoolClient::postSend(
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000846 ConnectionId receiverId,
847 const std::shared_ptr<BufferPoolData> &buffer,
848 TransactionId *transactionId,
Sungtak Lee76937c62022-12-07 11:42:03 +0000849 int64_t *timestampMs) {
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000850 if (isValid()) {
851 bool result = mImpl->postSend(
Sungtak Lee76937c62022-12-07 11:42:03 +0000852 buffer->mId, receiverId, transactionId, timestampMs);
Sungtak Lee8fc3ca42022-12-07 07:45:45 +0000853 return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
854 }
855 return ResultStatus::CRITICAL_ERROR;
856}
857
Sungtak Lee76937c62022-12-07 11:42:03 +0000858} // namespace aidl::android::hardware::media::bufferpool2::implementation