From 9665d919506cb6e8c5df33c867de9831e5b57c28 Mon Sep 17 00:00:00 2001 From: Jiwen 'Steve' Cai Date: Fri, 1 Sep 2017 10:19:47 -0700 Subject: Add buffer transport benchmark 1/ This benchmark supports both Binder-based BufferQueue and PDX-based BufferHub. 2/ On the producer side, it uses the same ANativeWindow API to gain buffer and post buffer for both backends. 3/ It supports multiple concurrent producer threads to minic buffers comming from different applications. 4/ For Binder backend, we use a dedicated process to run a Binder service, which acquires and releases buffers via BufferItemConsumer. 5/ For BufferHub backend, we use a SchedFIFO thread to epoll on multiple BufferHub ConsumerQueue(s). 6/ Four operations are traced via "ATRACE_TAG_ALWAYS" tag: GainBuffer, PostBuffer, AcquireBuffer, ReleaseBuffer. We use ATRACE_TAG_ALWAYS tag so that user can disable all other trace tags (i.e. gfx, video) to only trace events from this benchmark without being impacted by the systrace overhead. But note that the systrace is only enabled if specifying "--trace" in the commandline. Bug: 66921451 Test: buffer_transport_benchmark Change-Id: I8d1b70364e504c48658624caf4d0508a9508e3b0 --- libs/vr/libbufferhubqueue/tests/Android.bp | 16 + .../tests/buffer_transport_benchmark.cpp | 619 +++++++++++++++++++++ 2 files changed, 635 insertions(+) create mode 100644 libs/vr/libbufferhubqueue/tests/buffer_transport_benchmark.cpp diff --git a/libs/vr/libbufferhubqueue/tests/Android.bp b/libs/vr/libbufferhubqueue/tests/Android.bp index 8bd1ef1414..c4ffb41385 100644 --- a/libs/vr/libbufferhubqueue/tests/Android.bp +++ b/libs/vr/libbufferhubqueue/tests/Android.bp @@ -12,6 +12,7 @@ shared_libraries = [ "libhardware", "libui", "libutils", + "libnativewindow", ] static_libraries = [ @@ -20,6 +21,7 @@ static_libraries = [ "libchrome", "libdvrcommon", "libpdx_default_transport", + "libperformance", ] cc_test { @@ -51,3 +53,17 @@ cc_test { name: "buffer_hub_queue_producer-test", tags: ["optional"], } + +cc_test { + srcs: ["buffer_transport_benchmark.cpp"], + static_libs: static_libraries, + shared_libs: shared_libraries, + header_libs: header_libraries, + cflags: [ + "-DLOG_TAG=\"buffer_transport_benchmark\"", + "-DTRACE=0", + "-O2", + ], + name: "buffer_transport_benchmark", + tags: ["optional"], +} diff --git a/libs/vr/libbufferhubqueue/tests/buffer_transport_benchmark.cpp b/libs/vr/libbufferhubqueue/tests/buffer_transport_benchmark.cpp new file mode 100644 index 0000000000..5b580df284 --- /dev/null +++ b/libs/vr/libbufferhubqueue/tests/buffer_transport_benchmark.cpp @@ -0,0 +1,619 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include // for pipe + +// Use ALWAYS at the tag level. Control is performed manually during command +// line processing. +#ifdef ATRACE_TAG +#undef ATRACE_TAG +#endif +#define ATRACE_TAG ATRACE_TAG_ALWAYS + +using namespace android; +using namespace android::dvr; + +static const String16 kBinderService = String16("bufferTransport"); +static const uint32_t kBufferWidth = 100; +static const uint32_t kBufferHeight = 1; +static const uint32_t kBufferLayerCount = 1; +static const uint32_t kBufferFormat = HAL_PIXEL_FORMAT_BLOB; +static const uint64_t kBufferUsage = + GRALLOC_USAGE_SW_READ_OFTEN | GRALLOC_USAGE_SW_WRITE_OFTEN; +static const int kMaxAcquiredImages = 1; +static const size_t kMaxQueueCounts = 128; + +static int gConcurrency = 1; // 1 writer at a time +static int gIterations = 1000; // 1K times +static int gSleepIntervalUs = 16 * 1000; // 16ms + +enum BufferTransportServiceCode { + CREATE_BUFFER_QUEUE = IBinder::FIRST_CALL_TRANSACTION, +}; + +// A mininal cross process helper class based on a bidirectional pipe pair. This +// is used to signal that Binder-based BufferTransportService has finished +// initialization. +class Pipe { + public: + static std::tuple CreatePipePair() { + int a[2] = {-1, -1}; + int b[2] = {-1, -1}; + + pipe(a); + pipe(b); + + return std::make_tuple(Pipe(a[0], b[1]), Pipe(b[0], a[1])); + } + + Pipe() = default; + + Pipe(Pipe&& other) { + read_fd_ = other.read_fd_; + write_fd_ = other.write_fd_; + other.read_fd_ = 0; + other.write_fd_ = 0; + } + + Pipe& operator=(Pipe&& other) { + Reset(); + read_fd_ = other.read_fd_; + write_fd_ = other.write_fd_; + other.read_fd_ = 0; + other.write_fd_ = 0; + return *this; + } + + ~Pipe() { Reset(); } + + Pipe(const Pipe&) = delete; + Pipe& operator=(const Pipe&) = delete; + Pipe& operator=(const Pipe&&) = delete; + + bool IsValid() { return read_fd_ > 0 && write_fd_ > 0; } + + void Signal() { + bool val = true; + int error = write(write_fd_, &val, sizeof(val)); + ASSERT_GE(error, 0); + }; + + void Wait() { + bool val = false; + int error = read(read_fd_, &val, sizeof(val)); + ASSERT_GE(error, 0); + } + + void Reset() { + if (read_fd_) + close(read_fd_); + if (write_fd_) + close(write_fd_); + } + + private: + int read_fd_ = -1; + int write_fd_ = -1; + Pipe(int read_fd, int write_fd) : read_fd_{read_fd}, write_fd_{write_fd} {} +}; + +// A binder services that minics a compositor that consumes buffers. It provides +// one Binder interface to create a new Surface for buffer producer to write +// into; while itself will carry out no-op buffer consuming by acquiring then +// releasing the buffer immediately. +class BufferTransportService : public BBinder { + public: + BufferTransportService() = default; + ~BufferTransportService() = default; + + virtual status_t onTransact(uint32_t code, const Parcel& data, Parcel* reply, + uint32_t flags = 0) { + (void)flags; + (void)data; + switch (code) { + case CREATE_BUFFER_QUEUE: { + auto new_queue = std::make_shared(this); + reply->writeStrongBinder( + IGraphicBufferProducer::asBinder(new_queue->producer_)); + buffer_queues_.push_back(new_queue); + return NO_ERROR; + } + default: + return UNKNOWN_TRANSACTION; + }; + } + + private: + struct FrameListener : public ConsumerBase::FrameAvailableListener { + public: + FrameListener(BufferTransportService* service, + sp buffer_item_consumer) + : service_(service), + buffer_item_consumer_(buffer_item_consumer) {} + + void onFrameAvailable(const BufferItem& /*item*/) override { + std::unique_lock autolock(service_->reader_mutex_); + + BufferItem buffer; + status_t ret = 0; + { + ATRACE_NAME("AcquireBuffer"); + ret = buffer_item_consumer_->acquireBuffer(&buffer, /*presentWhen=*/0, + /*waitForFence=*/false); + } + + if (ret != NO_ERROR) { + LOG(ERROR) << "Failed to acquire next buffer."; + return; + } + + { + ATRACE_NAME("ReleaseBuffer"); + ret = buffer_item_consumer_->releaseBuffer(buffer); + } + + if (ret != NO_ERROR) { + LOG(ERROR) << "Failed to release buffer."; + return; + } + } + + private: + BufferTransportService* service_ = nullptr; + sp buffer_item_consumer_; + }; + + struct BufferQueueHolder { + explicit BufferQueueHolder(BufferTransportService* service) { + BufferQueue::createBufferQueue(&producer_, &consumer_); + + sp buffer_item_consumer = + new BufferItemConsumer(consumer_, kBufferUsage, kMaxAcquiredImages, + /*controlledByApp=*/true); + buffer_item_consumer->setName(String8("BinderBufferTransport")); + frame_listener_ = new FrameListener(service, buffer_item_consumer); + buffer_item_consumer->setFrameAvailableListener(frame_listener_); + } + + sp producer_; + sp consumer_; + sp frame_listener_; + }; + + std::mutex reader_mutex_; + std::vector> buffer_queues_; +}; + +// A virtual interfaces that abstracts the common BufferQueue operations, so +// that the test suite can use the same test case to drive different types of +// transport backends. +class BufferTransport { + public: + virtual ~BufferTransport() {} + + virtual int Start() = 0; + virtual sp CreateSurface() = 0; +}; + +// Binder-based buffer transport backend. +// +// On Start() a new process will be swapned to run a Binder server that +// actually consumes the buffer. +// On CreateSurface() a new Binder BufferQueue will be created, which the +// service holds the concrete binder node of the IGraphicBufferProducer while +// sending the binder proxy to the client. In another word, the producer side +// operations are carried out process while the consumer side operations are +// carried out within the BufferTransportService's own process. +class BinderBufferTransport : public BufferTransport { + public: + BinderBufferTransport() {} + + ~BinderBufferTransport() { + if (client_pipe_.IsValid()) { + client_pipe_.Signal(); + LOG(INFO) << "Client signals service to shut down."; + } + } + + int Start() override { + // Fork a process to run a binder server. The parent process will return + // a pipe here, and we use the pipe to signal the binder server to exit. + client_pipe_ = CreateBinderServer(); + + // Wait until service is ready. + LOG(INFO) << "Service is ready for client."; + client_pipe_.Wait(); + return 0; + } + + sp CreateSurface() override { + sp sm = defaultServiceManager(); + service_ = sm->getService(kBinderService); + if (service_ == nullptr) { + LOG(ERROR) << "Failed to set the benchmark service."; + return nullptr; + } + + Parcel data; + Parcel reply; + int error = service_->transact(CREATE_BUFFER_QUEUE, data, &reply); + if (error != NO_ERROR) { + LOG(ERROR) << "Failed to get buffer queue over binder."; + return nullptr; + } + + sp binder; + error = reply.readNullableStrongBinder(&binder); + if (error != NO_ERROR) { + LOG(ERROR) << "Failed to get IGraphicBufferProducer over binder."; + return nullptr; + } + + auto producer = interface_cast(binder); + if (producer == nullptr) { + LOG(ERROR) << "Failed to get IGraphicBufferProducer over binder."; + return nullptr; + } + + sp surface = new Surface(producer, /*controlledByApp=*/true); + + // Set buffer dimension. + ANativeWindow* window = static_cast(surface.get()); + ANativeWindow_setBuffersGeometry(window, kBufferWidth, kBufferHeight, + kBufferFormat); + + return surface; + } + + private: + static Pipe CreateBinderServer() { + std::tuple pipe_pair = Pipe::CreatePipePair(); + pid_t pid = fork(); + if (pid) { + // parent, i.e. the client side. + ProcessState::self()->startThreadPool(); + LOG(INFO) << "Binder server pid: " << pid; + return std::move(std::get<0>(pipe_pair)); + } else { + // child, i.e. the service side. + Pipe service_pipe = std::move(std::get<1>(pipe_pair)); + + ProcessState::self()->startThreadPool(); + sp sm = defaultServiceManager(); + sp service = new BufferTransportService; + sm->addService(kBinderService, service, false); + + LOG(INFO) << "Binder Service Running..."; + + service_pipe.Signal(); + service_pipe.Wait(); + + LOG(INFO) << "Service Exiting..."; + exit(EXIT_SUCCESS); + + /* never get here */ + return {}; + } + } + + sp service_; + Pipe client_pipe_; +}; + +// BufferHub/PDX-based buffer transport. +// +// On Start() a new thread will be swapned to run an epoll polling thread which +// minics the behavior of a compositor. Similar to Binder-based backend, the +// buffer available handler is also a no-op: Buffer gets acquired and released +// immediately. +// On CreateSurface() a pair of dvr::ProducerQueue and dvr::ConsumerQueue will +// be created. The epoll thread holds on the consumer queue and dequeues buffer +// from it; while the producer queue will be wrapped in a Surface and returned +// to test suite. +class BufferHubTransport : public BufferTransport { + public: + virtual ~BufferHubTransport() { + stopped_.store(true); + if (reader_thread_.joinable()) { + reader_thread_.join(); + } + } + + int Start() override { + int ret = epoll_fd_.Create(); + if (ret < 0) { + LOG(ERROR) << "Failed to create epoll fd: %s", strerror(-ret); + return -1; + } + + // Create the reader thread. + reader_thread_ = std::thread([this]() { + int ret = dvrSetSchedulerClass(0, "graphics"); + if (ret < 0) { + LOG(ERROR) << "Failed to set thread priority"; + return; + } + + + ret = dvrSetCpuPartition(0, "/system/performance"); + if (ret < 0) { + LOG(ERROR) << "Failed to set thread cpu partition"; + return; + } + + stopped_.store(false); + LOG(INFO) << "Reader Thread Running..."; + + while (!stopped_.load()) { + std::array events; + + // Don't sleep forever so that we will have a chance to wake up. + const int ret = epoll_fd_.Wait(events.data(), events.size(), + /*timeout=*/100); + if (ret < 0) { + LOG(ERROR) << "Error polling consumer queues."; + continue; + } + if (ret == 0) { + continue; + } + + const int num_events = ret; + for (int i = 0; i < num_events; i++) { + uint32_t surface_index = events[i].data.u32; + // LOG(INFO) << "!!! handle queue events index: " << surface_index; + buffer_queues_[surface_index]->consumer_queue_->HandleQueueEvents(); + } + } + + LOG(INFO) << "Reader Thread Exiting..."; + }); + + return 0; + } + + sp CreateSurface() override { + std::lock_guard autolock(queue_mutex_); + + auto new_queue = std::make_shared(); + if (new_queue->producer_ == nullptr) { + LOG(ERROR) << "Failed to create buffer producer."; + return nullptr; + } + + sp surface = + new Surface(new_queue->producer_, /*controlledByApp=*/true); + + // Set buffer dimension. + ANativeWindow* window = static_cast(surface.get()); + ANativeWindow_setBuffersGeometry(window, kBufferWidth, kBufferHeight, + kBufferFormat); + + // Use the next position as buffer_queue index. + uint32_t index = buffer_queues_.size(); + epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u32 = index}}; + const int ret = epoll_fd_.Control( + EPOLL_CTL_ADD, new_queue->consumer_queue_->queue_fd(), &event); + if (ret < 0) { + LOG(ERROR) << "Failed to track consumer queue: " << strerror(-ret) + << ", consumer queue fd: " + << new_queue->consumer_queue_->queue_fd(); + return nullptr; + } + + new_queue->queue_index_ = index; + buffer_queues_.push_back(new_queue); + return surface; + } + + private: + struct BufferQueueHolder { + BufferQueueHolder() { + ProducerQueueConfigBuilder config_builder; + producer_queue_ = + ProducerQueue::Create(config_builder.SetDefaultWidth(kBufferWidth) + .SetDefaultHeight(kBufferHeight) + .SetDefaultFormat(kBufferFormat) + .SetMetadata() + .Build(), + UsagePolicy{}); + consumer_queue_ = producer_queue_->CreateConsumerQueue(); + consumer_queue_->SetBufferAvailableCallback([this]() { + size_t index = 0; + pdx::LocalHandle fence; + DvrNativeBufferMetadata meta; + pdx::Status> status; + + { + ATRACE_NAME("AcquireBuffer"); + status = consumer_queue_->Dequeue(0, &index, &meta, &fence); + } + if (!status.ok()) { + LOG(ERROR) << "Failed to dequeue consumer buffer, error: " + << status.GetErrorMessage().c_str(); + return; + } + + auto buffer = status.take(); + + if (buffer) { + ATRACE_NAME("ReleaseBuffer"); + buffer->ReleaseAsync(); + } + }); + + producer_ = BufferHubQueueProducer::Create(producer_queue_); + } + + int count_ = 0; + int queue_index_; + std::shared_ptr producer_queue_; + std::shared_ptr consumer_queue_; + sp producer_; + }; + + std::atomic stopped_; + std::thread reader_thread_; + + // Mutex to guard epoll_fd_ and buffer_queues_. + std::mutex queue_mutex_; + EpollFileDescriptor epoll_fd_; + std::vector> buffer_queues_; +}; + +enum TransportType { + kBinderBufferTransport, + kBufferHubTransport, +}; + +// Main test suite, which supports two transport backend: 1) BinderBufferQueue, +// 2) BufferHubQueue. The test case drives the producer end of both transport +// backend by queuing buffers into the buffer queue by using ANativeWindow API. +class BufferTransportBenchmark + : public ::testing::TestWithParam { + public: + void SetUp() override { + switch (GetParam()) { + case kBinderBufferTransport: + transport_.reset(new BinderBufferTransport); + break; + case kBufferHubTransport: + transport_.reset(new BufferHubTransport); + break; + default: + FAIL() << "Unknown test case."; + break; + } + } + + protected: + void ProduceBuffers(sp surface, int iterations, int sleep_usec) { + ANativeWindow* window = static_cast(surface.get()); + ANativeWindow_Buffer buffer; + int32_t error = 0; + + for (int i = 0; i < iterations; i++) { + usleep(sleep_usec); + + { + ATRACE_NAME("GainBuffer"); + error = ANativeWindow_lock(window, &buffer, + /*inOutDirtyBounds=*/nullptr); + } + ASSERT_EQ(error, 0); + + { + ATRACE_NAME("PostBuffer"); + error = ANativeWindow_unlockAndPost(window); + } + ASSERT_EQ(error, 0); + } + } + + std::unique_ptr transport_; +}; + +TEST_P(BufferTransportBenchmark, ContinuousLoad) { + ASSERT_NE(transport_, nullptr); + const int ret = transport_->Start(); + ASSERT_EQ(ret, 0); + + LOG(INFO) << "Start Running."; + + std::vector writer_threads; + for (int i = 0; i < gConcurrency; i++) { + std::thread writer_thread = std::thread([this]() { + sp surface = transport_->CreateSurface(); + ASSERT_NE(surface, nullptr); + + ASSERT_NO_FATAL_FAILURE( + ProduceBuffers(surface, gIterations, gSleepIntervalUs)); + + usleep(1000 * 100); + }); + + writer_threads.push_back(std::move(writer_thread)); + } + + for (auto& writer_thread : writer_threads) { + writer_thread.join(); + } + + LOG(INFO) << "All done."; +}; + +INSTANTIATE_TEST_CASE_P(BufferTransportBenchmarkInstance, + BufferTransportBenchmark, + ::testing::ValuesIn({kBinderBufferTransport, + kBufferHubTransport})); + +// To run binder-based benchmark, use: +// adb shell buffer_transport_benchmark \ +// --gtest_filter="BufferTransportBenchmark.ContinuousLoad/0" +// +// To run bufferhub-based benchmark, use: +// adb shell buffer_transport_benchmark \ +// --gtest_filter="BufferTransportBenchmark.ContinuousLoad/1" +int main(int argc, char** argv) { + bool tracing_enabled = false; + + // Parse arguments in addition to "--gtest_filter" paramters. + for (int i = 1; i < argc; i++) { + if (std::string(argv[i]) == "--help") { + std::cout << "Usage: binderThroughputTest [OPTIONS]" << std::endl; + std::cout << "\t-c N: Specify number of concurrent writer threads, " + "(default: 1, max: 128)." + << std::endl; + std::cout << "\t-i N: Specify number of iterations, (default: 1000)." + << std::endl; + std::cout << "\t-s N: Specify sleep interval in usec, (default: 16000)." + << std::endl; + std::cout << "\t--trace: Enable systrace logging." + << std::endl; + return 0; + } + if (std::string(argv[i]) == "-c") { + gConcurrency = atoi(argv[i + 1]); + i++; + continue; + } + if (std::string(argv[i]) == "-s") { + gSleepIntervalUs = atoi(argv[i + 1]); + i++; + continue; + } + if (std::string(argv[i]) == "-i") { + gIterations = atoi(argv[i + 1]); + i++; + continue; + } + if (std::string(argv[i]) == "--trace") { + tracing_enabled = true; + continue; + } + } + + // Setup ATRACE/systrace based on command line. + atrace_setup(); + atrace_set_tracing_enabled(tracing_enabled); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- cgit v1.2.3-59-g8ed1b