Add buffer transport benchmark
1/ This benchmark supports both Binder-based BufferQueue and PDX-based
BufferHub.
2/ On the producer side, it uses the same ANativeWindow API to gain
buffer and post buffer for both backends.
3/ It supports multiple concurrent producer threads to mimic buffers
coming from different applications.
4/ For Binder backend, we use a dedicated process to run a Binder
service, which acquires and releases buffers via BufferItemConsumer.
5/ For BufferHub backend, we use a SchedFIFO thread to epoll on multiple
BufferHub ConsumerQueue(s).
6/ Four operations are traced via "ATRACE_TAG_ALWAYS" tag:
GainBuffer, PostBuffer, AcquireBuffer, ReleaseBuffer. We use
ATRACE_TAG_ALWAYS tag so that user can disable all other trace
tags (i.e. gfx, video) to only trace events from this benchmark
without being impacted by the systrace overhead. But note that the
systrace is only enabled if specifying "--trace" in the commandline.
Bug: 66921451
Test: buffer_transport_benchmark
Change-Id: I8d1b70364e504c48658624caf4d0508a9508e3b0
diff --git a/libs/vr/libbufferhubqueue/tests/Android.bp b/libs/vr/libbufferhubqueue/tests/Android.bp
index 8bd1ef1..c4ffb41 100644
--- a/libs/vr/libbufferhubqueue/tests/Android.bp
+++ b/libs/vr/libbufferhubqueue/tests/Android.bp
@@ -12,6 +12,7 @@
"libhardware",
"libui",
"libutils",
+ "libnativewindow",
]
static_libraries = [
@@ -20,6 +21,7 @@
"libchrome",
"libdvrcommon",
"libpdx_default_transport",
+ "libperformance",
]
cc_test {
@@ -51,3 +53,17 @@
name: "buffer_hub_queue_producer-test",
tags: ["optional"],
}
+
+cc_test {  // Test module built from buffer_transport_benchmark.cpp.
+ srcs: ["buffer_transport_benchmark.cpp"],
+ static_libs: static_libraries,  // List variable defined earlier in this file.
+ shared_libs: shared_libraries,
+ header_libs: header_libraries,
+ cflags: [
+ "-DLOG_TAG=\"buffer_transport_benchmark\"",
+ "-DTRACE=0",
+ "-O2",
+ ],
+ name: "buffer_transport_benchmark",
+ tags: ["optional"],
+}
diff --git a/libs/vr/libbufferhubqueue/tests/buffer_transport_benchmark.cpp b/libs/vr/libbufferhubqueue/tests/buffer_transport_benchmark.cpp
new file mode 100644
index 0000000..5b580df
--- /dev/null
+++ b/libs/vr/libbufferhubqueue/tests/buffer_transport_benchmark.cpp
@@ -0,0 +1,619 @@
+#include <android/native_window.h>
+#include <base/logging.h>
+#include <binder/IPCThreadState.h>
+#include <binder/IServiceManager.h>
+#include <dvr/dvr_api.h>
+#include <dvr/performance_client_api.h>
+#include <gtest/gtest.h>
+#include <gui/BufferItem.h>
+#include <gui/BufferItemConsumer.h>
+#include <gui/Surface.h>
+#include <private/dvr/buffer_hub_queue_producer.h>
+#include <utils/Trace.h>
+
+#include <functional>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <poll.h>
+#include <sys/wait.h>
+#include <unistd.h> // for pipe
+
+// Use ALWAYS at the tag level. Control is performed manually during command
+// line processing.
+#ifdef ATRACE_TAG
+#undef ATRACE_TAG
+#endif
+#define ATRACE_TAG ATRACE_TAG_ALWAYS
+
+using namespace android;
+using namespace android::dvr;
+
+static const String16 kBinderService = String16("bufferTransport");  // Name used for addService()/getService().
+static const uint32_t kBufferWidth = 100;
+static const uint32_t kBufferHeight = 1;
+static const uint32_t kBufferLayerCount = 1;  // NOTE(review): not referenced anywhere in this file.
+static const uint32_t kBufferFormat = HAL_PIXEL_FORMAT_BLOB;
+static const uint64_t kBufferUsage =
+ GRALLOC_USAGE_SW_READ_OFTEN | GRALLOC_USAGE_SW_WRITE_OFTEN;
+static const int kMaxAcquiredImages = 1;  // Passed to the BufferItemConsumer constructor.
+static const size_t kMaxQueueCounts = 128;  // Size of the epoll_event array used by the reader thread.
+
+static int gConcurrency = 1; // Number of writer threads, set by "-c"; 1 writer at a time by default
+static int gIterations = 1000; // Buffers produced per writer, set by "-i"; 1K times by default
+static int gSleepIntervalUs = 16 * 1000; // Sleep before each buffer, set by "-s"; 16ms by default
+
+enum BufferTransportServiceCode {
+ CREATE_BUFFER_QUEUE = IBinder::FIRST_CALL_TRANSACTION,  // Handled by BufferTransportService::onTransact().
+};
+
+// A minimal cross-process helper class based on a bidirectional pipe pair. This
+// is used to signal that Binder-based BufferTransportService has finished
+// initialization. A descriptor value of -1 means "not owned by this Pipe".
+class Pipe {
+ public:
+  // Returns two connected endpoints, or two invalid Pipes if pipe(2) fails.
+  static std::tuple<Pipe, Pipe> CreatePipePair() {
+    int a[2] = {-1, -1};
+    int b[2] = {-1, -1};
+    if (pipe(a) < 0 || pipe(b) < 0) {
+      LOG(ERROR) << "Failed to create pipe pair.";
+      Pipe partial(a[0], a[1]);  // Closes |a| if only the second pipe failed.
+      return std::make_tuple(Pipe(), Pipe());
+    }
+    return std::make_tuple(Pipe(a[0], b[1]), Pipe(b[0], a[1]));
+  }
+
+  Pipe() = default;
+  Pipe(const Pipe&) = delete;
+  Pipe& operator=(const Pipe&) = delete;
+
+  // Moving transfers descriptor ownership and leaves |other| invalid.
+  Pipe(Pipe&& other) noexcept { *this = std::move(other); }
+  Pipe& operator=(Pipe&& other) noexcept {
+    if (this != &other) {
+      Reset();
+      read_fd_ = other.read_fd_;
+      write_fd_ = other.write_fd_;
+      other.read_fd_ = -1;
+      other.write_fd_ = -1;
+    }
+    return *this;
+  }
+
+  ~Pipe() { Reset(); }
+
+  // 0 is a legal descriptor, so only negative values count as invalid.
+  bool IsValid() const { return read_fd_ >= 0 && write_fd_ >= 0; }
+
+  // Writes a small token for the peer's Wait() to consume.
+  void Signal() {
+    bool val = true;
+    const ssize_t written = write(write_fd_, &val, sizeof(val));
+    ASSERT_GE(written, 0);
+  }
+
+  // Blocks in read(2) for a token from the peer's Signal().
+  void Wait() {
+    bool val = false;
+    const ssize_t consumed = read(read_fd_, &val, sizeof(val));
+    ASSERT_GE(consumed, 0);
+  }
+
+  // Closes whichever descriptors are owned and marks this Pipe invalid.
+  void Reset() {
+    if (read_fd_ >= 0) close(read_fd_);
+    if (write_fd_ >= 0) close(write_fd_);
+    read_fd_ = write_fd_ = -1;
+  }
+
+ private:
+  int read_fd_ = -1;
+  int write_fd_ = -1;
+  Pipe(int read_fd, int write_fd) : read_fd_{read_fd}, write_fd_{write_fd} {}
+};
+
+// A binder service that mimics a compositor that consumes buffers. It provides
+// one Binder interface to create a new Surface for buffer producer to write
+// into; while itself will carry out no-op buffer consuming by acquiring then
+// releasing the buffer immediately.
+class BufferTransportService : public BBinder {
+ public:
+ BufferTransportService() = default;
+ ~BufferTransportService() = default;
+ // Serves CREATE_BUFFER_QUEUE; any other code yields UNKNOWN_TRANSACTION.
+ virtual status_t onTransact(uint32_t code, const Parcel& data, Parcel* reply,
+ uint32_t flags = 0) {
+ (void)flags;
+ (void)data;
+ switch (code) {
+ case CREATE_BUFFER_QUEUE: {
+ auto new_queue = std::make_shared<BufferQueueHolder>(this);
+ reply->writeStrongBinder(  // Hand the producer end back to the caller.
+ IGraphicBufferProducer::asBinder(new_queue->producer_));
+ buffer_queues_.push_back(new_queue);  // NOTE(review): no lock taken here — confirm calls are serialized.
+ return NO_ERROR;
+ }
+ default:
+ return UNKNOWN_TRANSACTION;
+ };
+ }
+
+ private:
+ struct FrameListener : public ConsumerBase::FrameAvailableListener {
+ public:
+ FrameListener(BufferTransportService* service,
+ sp<BufferItemConsumer> buffer_item_consumer)
+ : service_(service),
+ buffer_item_consumer_(buffer_item_consumer) {}
+ // Acquires one buffer from the consumer and releases it right away.
+ void onFrameAvailable(const BufferItem& /*item*/) override {
+ std::unique_lock<std::mutex> autolock(service_->reader_mutex_);  // Shared by all listeners of this service.
+
+ BufferItem buffer;
+ status_t ret = 0;
+ {
+ ATRACE_NAME("AcquireBuffer");
+ ret = buffer_item_consumer_->acquireBuffer(&buffer, /*presentWhen=*/0,
+ /*waitForFence=*/false);
+ }
+
+ if (ret != NO_ERROR) {
+ LOG(ERROR) << "Failed to acquire next buffer.";
+ return;
+ }
+
+ {
+ ATRACE_NAME("ReleaseBuffer");
+ ret = buffer_item_consumer_->releaseBuffer(buffer);
+ }
+
+ if (ret != NO_ERROR) {
+ LOG(ERROR) << "Failed to release buffer.";
+ return;
+ }
+ }
+
+ private:
+ BufferTransportService* service_ = nullptr;  // Not owned; only used to reach reader_mutex_.
+ sp<BufferItemConsumer> buffer_item_consumer_;
+ };
+ // Bundles one BufferQueue with the frame listener attached to its consumer.
+ struct BufferQueueHolder {
+ explicit BufferQueueHolder(BufferTransportService* service) {
+ BufferQueue::createBufferQueue(&producer_, &consumer_);
+
+ sp<BufferItemConsumer> buffer_item_consumer =
+ new BufferItemConsumer(consumer_, kBufferUsage, kMaxAcquiredImages,
+ /*controlledByApp=*/true);
+ buffer_item_consumer->setName(String8("BinderBufferTransport"));
+ frame_listener_ = new FrameListener(service, buffer_item_consumer);
+ buffer_item_consumer->setFrameAvailableListener(frame_listener_);
+ }
+
+ sp<IGraphicBufferProducer> producer_;  // Sent to the client by onTransact().
+ sp<IGraphicBufferConsumer> consumer_;
+ sp<FrameListener> frame_listener_;  // Also holds the BufferItemConsumer.
+ };
+
+ std::mutex reader_mutex_;  // Held for the whole of FrameListener::onFrameAvailable().
+ std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;  // One entry per CREATE_BUFFER_QUEUE served.
+};
+
+// A virtual interface that abstracts the common BufferQueue operations, so
+// that the test suite can use the same test case to drive different types of
+// transport backends.
+class BufferTransport {
+ public:
+ virtual ~BufferTransport() {}
+
+ virtual int Start() = 0;  // The test expects 0 on success.
+ virtual sp<Surface> CreateSurface() = 0;  // The test expects non-null on success.
+};
+
+// Binder-based buffer transport backend.
+//
+// On Start() a new process will be spawned to run a Binder server that
+// actually consumes the buffer.
+// On CreateSurface() a new Binder BufferQueue will be created, for which the
+// service holds the concrete binder node of the IGraphicBufferProducer while
+// sending the binder proxy to the client. In other words, the producer side
+// operations are carried out in the client process while the consumer side
+// operations are carried out within the BufferTransportService's own process.
+class BinderBufferTransport : public BufferTransport {
+ public:
+  BinderBufferTransport() {}
+
+  ~BinderBufferTransport() {
+    if (client_pipe_.IsValid()) {
+      client_pipe_.Signal();
+      LOG(INFO) << "Client signals service to shut down.";
+    }
+  }
+
+  // Returns 0 once the forked service is ready, -1 if it could not be started.
+  int Start() override {
+    // Fork a process to run a binder server. The parent process gets a pipe
+    // back, which is later used to signal the binder server to exit.
+    client_pipe_ = CreateBinderServer();
+    if (!client_pipe_.IsValid()) {
+      LOG(ERROR) << "Failed to start the binder server process.";
+      return -1;
+    }
+
+    client_pipe_.Wait();
+    LOG(INFO) << "Service is ready for client.";
+    return 0;
+  }
+
+  // Returns a Surface backed by a new service-side BufferQueue, or nullptr.
+  sp<Surface> CreateSurface() override {
+    // Writer threads call this concurrently, so the handle is kept local.
+    sp<IServiceManager> sm = defaultServiceManager();
+    sp<IBinder> service = sm->getService(kBinderService);
+    if (service == nullptr) {
+      LOG(ERROR) << "Failed to get the benchmark service.";
+      return nullptr;
+    }
+
+    Parcel data;
+    Parcel reply;
+    const int error = service->transact(CREATE_BUFFER_QUEUE, data, &reply);
+    if (error != NO_ERROR) {
+      LOG(ERROR) << "Failed to get buffer queue over binder.";
+      return nullptr;
+    }
+
+    sp<IBinder> binder;
+    sp<IGraphicBufferProducer> producer;
+    if (reply.readNullableStrongBinder(&binder) == NO_ERROR) {
+      producer = interface_cast<IGraphicBufferProducer>(binder);
+    }
+    if (producer == nullptr) {
+      LOG(ERROR) << "Failed to get IGraphicBufferProducer over binder.";
+      return nullptr;
+    }
+
+    sp<Surface> surface = new Surface(producer, /*controlledByApp=*/true);
+
+    // Set buffer dimension.
+    ANativeWindow* window = static_cast<ANativeWindow*>(surface.get());
+    ANativeWindow_setBuffersGeometry(window, kBufferWidth, kBufferHeight,
+                                     kBufferFormat);
+    return surface;
+  }
+
+ private:
+  // Forks the service process. The parent gets the client end of the pipe
+  // pair, or an invalid Pipe if fork() failed. The child never returns.
+  static Pipe CreateBinderServer() {
+    std::tuple<Pipe, Pipe> pipe_pair = Pipe::CreatePipePair();
+    const pid_t pid = fork();
+    if (pid < 0) {
+      LOG(ERROR) << "Failed to fork the binder server process.";
+      return {};
+    }
+    if (pid > 0) {
+      // parent, i.e. the client side.
+      ProcessState::self()->startThreadPool();
+      LOG(INFO) << "Binder server pid: " << pid;
+      return std::move(std::get<0>(pipe_pair));
+    }
+
+    // child, i.e. the service side.
+    Pipe service_pipe = std::move(std::get<1>(pipe_pair));
+    ProcessState::self()->startThreadPool();
+    sp<IServiceManager> sm = defaultServiceManager();
+    sp<BufferTransportService> service = new BufferTransportService;
+    sm->addService(kBinderService, service, false);
+    LOG(INFO) << "Binder Service Running...";
+    service_pipe.Signal();  // Tell the client the service is registered.
+    service_pipe.Wait();    // Block until the client asks for shutdown.
+    LOG(INFO) << "Service Exiting...";
+    exit(EXIT_SUCCESS);
+  }
+
+  Pipe client_pipe_;
+};
+
+// BufferHub/PDX-based buffer transport.
+//
+// On Start() a new thread will be spawned to run an epoll polling thread which
+// mimics the behavior of a compositor. Similar to Binder-based backend, the
+// buffer available handler is also a no-op: Buffer gets acquired and released
+// immediately.
+// On CreateSurface() a pair of dvr::ProducerQueue and dvr::ConsumerQueue will
+// be created. The epoll thread holds on the consumer queue and dequeues buffer
+// from it; while the producer queue will be wrapped in a Surface and returned
+// to test suite.
+class BufferHubTransport : public BufferTransport {
+ public:
+  virtual ~BufferHubTransport() {
+    stopped_.store(true);
+    if (reader_thread_.joinable()) {
+      reader_thread_.join();
+    }
+  }
+
+  // Creates the epoll set and spawns the reader thread. Returns 0 on success.
+  int Start() override {
+    int ret = epoll_fd_.Create();
+    if (ret < 0) {
+      LOG(ERROR) << "Failed to create epoll fd: " << strerror(-ret);
+      return -1;
+    }
+
+    reader_thread_ = std::thread([this]() {
+      int ret = dvrSetSchedulerClass(0, "graphics");
+      if (ret < 0) {
+        LOG(ERROR) << "Failed to set thread priority";
+        return;
+      }
+
+      ret = dvrSetCpuPartition(0, "/system/performance");
+      if (ret < 0) {
+        LOG(ERROR) << "Failed to set thread cpu partition";
+        return;
+      }
+
+      // stopped_ is never reset here, so an early stop request is not lost.
+      LOG(INFO) << "Reader Thread Running...";
+
+      while (!stopped_.load()) {
+        std::array<epoll_event, kMaxQueueCounts> events;
+        // Don't sleep forever so that we will have a chance to wake up.
+        const int num_events = epoll_fd_.Wait(events.data(), events.size(),
+                                              /*timeout=*/100);
+        if (num_events < 0) {
+          LOG(ERROR) << "Error polling consumer queues.";
+        }
+
+        // A timeout (0) or an error (< 0) simply skips this loop.
+        for (int i = 0; i < num_events; i++) {
+          const uint32_t surface_index = events[i].data.u32;
+          std::shared_ptr<BufferQueueHolder> queue;
+          {
+            // CreateSurface() may be appending to buffer_queues_ right now.
+            std::lock_guard<std::mutex> autolock(queue_mutex_);
+            queue = buffer_queues_[surface_index];
+          }
+          queue->consumer_queue_->HandleQueueEvents();
+        }
+      }
+
+      LOG(INFO) << "Reader Thread Exiting...";
+    });
+    return 0;
+  }
+
+  // Returns a Surface wrapping a new producer queue, or nullptr on failure.
+  sp<Surface> CreateSurface() override {
+    std::lock_guard<std::mutex> autolock(queue_mutex_);
+
+    auto new_queue = std::make_shared<BufferQueueHolder>();
+    if (new_queue->producer_ == nullptr) {
+      LOG(ERROR) << "Failed to create buffer producer.";
+      return nullptr;
+    }
+
+    sp<Surface> surface =
+        new Surface(new_queue->producer_, /*controlledByApp=*/true);
+
+    // Set buffer dimension.
+    ANativeWindow* window = static_cast<ANativeWindow*>(surface.get());
+    ANativeWindow_setBuffersGeometry(window, kBufferWidth, kBufferHeight,
+                                     kBufferFormat);
+
+    // Use the next position as buffer_queue index.
+    uint32_t index = buffer_queues_.size();
+    epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u32 = index}};
+    const int ret = epoll_fd_.Control(
+        EPOLL_CTL_ADD, new_queue->consumer_queue_->queue_fd(), &event);
+    if (ret < 0) {
+      LOG(ERROR) << "Failed to track consumer queue: " << strerror(-ret)
+                 << ", consumer queue fd: "
+                 << new_queue->consumer_queue_->queue_fd();
+      return nullptr;
+    }
+
+    new_queue->queue_index_ = index;
+    buffer_queues_.push_back(new_queue);
+    return surface;
+  }
+
+ private:
+  struct BufferQueueHolder {
+    BufferQueueHolder() {
+      ProducerQueueConfigBuilder config_builder;
+      producer_queue_ =
+          ProducerQueue::Create(config_builder.SetDefaultWidth(kBufferWidth)
+                                    .SetDefaultHeight(kBufferHeight)
+                                    .SetDefaultFormat(kBufferFormat)
+                                    .SetMetadata<DvrNativeBufferMetadata>()
+                                    .Build(),
+                                UsagePolicy{});
+      if (producer_queue_ == nullptr) return;  // producer_ stays null.
+      consumer_queue_ = producer_queue_->CreateConsumerQueue();
+      if (consumer_queue_ == nullptr) return;  // producer_ stays null.
+      consumer_queue_->SetBufferAvailableCallback([this]() {
+        size_t index = 0;
+        pdx::LocalHandle fence;
+        DvrNativeBufferMetadata meta;
+        pdx::Status<std::shared_ptr<BufferConsumer>> status;
+
+        {
+          ATRACE_NAME("AcquireBuffer");
+          status = consumer_queue_->Dequeue(0, &index, &meta, &fence);
+        }
+        if (!status.ok()) {
+          LOG(ERROR) << "Failed to dequeue consumer buffer, error: "
+                     << status.GetErrorMessage().c_str();
+          return;
+        }
+
+        auto buffer = status.take();
+
+        if (buffer) {
+          ATRACE_NAME("ReleaseBuffer");
+          buffer->ReleaseAsync();
+        }
+      });
+
+      producer_ = BufferHubQueueProducer::Create(producer_queue_);
+    }
+
+    int queue_index_ = -1;
+    std::shared_ptr<ProducerQueue> producer_queue_;
+    std::shared_ptr<ConsumerQueue> consumer_queue_;
+    sp<IGraphicBufferProducer> producer_;
+  };
+
+  std::atomic<bool> stopped_{false};
+  std::thread reader_thread_;
+
+  // Mutex to guard epoll_fd_ and buffer_queues_.
+  std::mutex queue_mutex_;
+  EpollFileDescriptor epoll_fd_;
+  std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;
+};
+
+enum TransportType {  // Test parameter selecting the backend under test.
+ kBinderBufferTransport,  // SetUp() instantiates BinderBufferTransport.
+ kBufferHubTransport,  // SetUp() instantiates BufferHubTransport.
+};
+
+// Main test suite, which supports two transport backends: 1) BinderBufferQueue,
+// 2) BufferHubQueue. The test case drives the producer end of either backend
+// by queuing buffers into the buffer queue through the ANativeWindow API.
+class BufferTransportBenchmark
+    : public ::testing::TestWithParam<TransportType> {
+ public:
+  // Instantiates the backend selected by the test parameter.
+  void SetUp() override {
+    const TransportType type = GetParam();
+    if (type == kBinderBufferTransport) {
+      transport_.reset(new BinderBufferTransport);
+    } else if (type == kBufferHubTransport) {
+      transport_.reset(new BufferHubTransport);
+    } else {
+      FAIL() << "Unknown test case.";
+    }
+  }
+
+ protected:
+  // Locks and posts |iterations| buffers on |surface|, sleeping |sleep_usec|
+  // microseconds before each one. Stops at the first failing window call.
+  void ProduceBuffers(sp<Surface> surface, int iterations, int sleep_usec) {
+    ANativeWindow* window = static_cast<ANativeWindow*>(surface.get());
+    ANativeWindow_Buffer buffer;
+
+    for (int remaining = iterations; remaining > 0; --remaining) {
+      usleep(sleep_usec);
+
+      int32_t error = 0;
+      {
+        ATRACE_NAME("GainBuffer");
+        error = ANativeWindow_lock(window, &buffer,
+                                   /*inOutDirtyBounds=*/nullptr);
+      }
+      ASSERT_EQ(error, 0);
+
+      {
+        ATRACE_NAME("PostBuffer");
+        error = ANativeWindow_unlockAndPost(window);
+      }
+      ASSERT_EQ(error, 0);
+    }
+  }
+
+  std::unique_ptr<BufferTransport> transport_;
+};
+
+TEST_P(BufferTransportBenchmark, ContinuousLoad) {
+  ASSERT_NE(transport_, nullptr);
+  const int ret = transport_->Start();
+  ASSERT_EQ(ret, 0);
+
+  LOG(INFO) << "Start Running.";
+
+  // Work done by one writer: create a surface, push gIterations buffers
+  // through it, then linger for 100ms before the thread finishes.
+  const auto writer_body = [this]() {
+    sp<Surface> surface = transport_->CreateSurface();
+    ASSERT_NE(surface, nullptr);
+    ASSERT_NO_FATAL_FAILURE(
+        ProduceBuffers(surface, gIterations, gSleepIntervalUs));
+
+    usleep(1000 * 100);
+  };
+
+  std::vector<std::thread> writer_threads;
+  for (int started = 0; started < gConcurrency; ++started) {
+    writer_threads.emplace_back(writer_body);
+  }
+  for (std::thread& writer_thread : writer_threads) {
+    writer_thread.join();
+  }
+
+  LOG(INFO) << "All done.";
+}
+
+INSTANTIATE_TEST_CASE_P(BufferTransportBenchmarkInstance,
+                        BufferTransportBenchmark,
+                        ::testing::ValuesIn({kBinderBufferTransport,
+                                             kBufferHubTransport}));
+
+// To run binder-based benchmark, use (as a single command line):
+//   adb shell buffer_transport_benchmark
+//       --gtest_filter="BufferTransportBenchmark.ContinuousLoad/0"
+//
+// To run bufferhub-based benchmark, use (as a single command line):
+//   adb shell buffer_transport_benchmark
+//       --gtest_filter="BufferTransportBenchmark.ContinuousLoad/1"
+//
+// Options -c/-i/-s/--trace are consumed here; every other argument is left
+// for gtest. Returns non-zero when an option is missing its value.
+int main(int argc, char** argv) {
+  bool tracing_enabled = false;
+
+  // Parse arguments in addition to "--gtest_filter" parameters.
+  for (int i = 1; i < argc; i++) {
+    const std::string arg(argv[i]);
+    if (arg == "--help") {
+      std::cout << "Usage: buffer_transport_benchmark [OPTIONS]" << std::endl;
+      std::cout << "\t-c N: Specify number of concurrent writer threads, "
+                   "(default: 1, max: 128)."
+                << std::endl;
+      std::cout << "\t-i N: Specify number of iterations, (default: 1000)."
+                << std::endl;
+      std::cout << "\t-s N: Specify sleep interval in usec, (default: 16000)."
+                << std::endl;
+      std::cout << "\t--trace: Enable systrace logging." << std::endl;
+      return 0;
+    }
+    if (arg == "--trace") {
+      tracing_enabled = true;
+      continue;
+    }
+
+    // "-c", "-i" and "-s" each consume the following argument as their value.
+    int* target = nullptr;
+    if (arg == "-c") target = &gConcurrency;
+    if (arg == "-i") target = &gIterations;
+    if (arg == "-s") target = &gSleepIntervalUs;
+    if (target == nullptr) continue;  // Not ours; leave it to gtest.
+    if (i + 1 >= argc) {
+      std::cerr << "Missing value for option " << arg << std::endl;
+      return 1;
+    }
+    *target = atoi(argv[++i]);
+  }
+
+  // Setup ATRACE/systrace based on command line.
+  atrace_setup();
+  atrace_set_tracing_enabled(tracing_enabled);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}