diff options
| author | 2018-11-08 23:50:40 +0000 | |
|---|---|---|
| committer | 2018-11-08 23:50:40 +0000 | |
| commit | cb10f9567a5940407c1ceb651a2a37cca66c0f75 (patch) | |
| tree | 51f5b24d1b95b89df3474d24ac0049cc79162901 | |
| parent | 8d2680b98cc9ff94c7a995e6a98cf9ed48759b95 (diff) | |
| parent | f09f63f716de45558fa56870a8881305b91a67ce (diff) | |
Merge "Let producer gain posted buffer if no released buffer exist."
3 files changed, 293 insertions, 70 deletions
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp index e1c1aa96c0..b8e2f9dec0 100644 --- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp +++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp @@ -69,7 +69,7 @@ void BufferHubQueue::Initialize() { .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}}; ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event); if (ret < 0) { - ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s", + ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__, strerror(-ret)); } } @@ -77,7 +77,7 @@ void BufferHubQueue::Initialize() { Status<void> BufferHubQueue::ImportQueue() { auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>(); if (!status) { - ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s", + ALOGE("%s: Failed to import queue: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return ErrorStatus(status.error()); } else { @@ -136,9 +136,7 @@ BufferHubQueue::CreateConsumerQueueParcelable(bool silent) { consumer_queue->GetChannel()->TakeChannelParcelable()); if (!queue_parcelable.IsValid()) { - ALOGE( - "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create " - "consumer queue parcelable."); + ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__); return ErrorStatus(EINVAL); } @@ -169,8 +167,7 @@ bool BufferHubQueue::WaitForBuffers(int timeout) { } if (ret < 0 && ret != -EINTR) { - ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s", - strerror(-ret)); + ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret)); return false; } @@ -264,14 +261,14 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { // wait will be tried again to acquire the newly imported buffer. auto buffer_status = OnBufferAllocated(); if (!buffer_status) { - ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s", + ALOGE("%s: Failed to import buffer: %s", __FUNCTION__, buffer_status.GetErrorMessage().c_str()); } } else if (events & EPOLLHUP) { - ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!"); + ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__); hung_up_ = true; } else { - ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events); + ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events); } return {}; @@ -279,12 +276,11 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { Status<void> BufferHubQueue::AddBuffer( const std::shared_ptr<BufferHubBase>& buffer, size_t slot) { - ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu", - buffer->id(), slot); + ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(), + slot); if (is_full()) { - ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu", - capacity_); + ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_); return ErrorStatus(E2BIG); } @@ -303,7 +299,7 @@ Status<void> BufferHubQueue::AddBuffer( const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event); if (ret < 0) { - ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s", + ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__, strerror(-ret)); return ErrorStatus(-ret); } @@ -315,17 +311,15 @@ Status<void> BufferHubQueue::AddBuffer( } Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { - ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot); + ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot); if (buffers_[slot]) { for (const auto& event_source : buffers_[slot]->GetEventSources()) { const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr); if (ret < 0) { - ALOGE( - "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll " - "set: %s", - strerror(-ret)); + ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__, + strerror(-ret)); return ErrorStatus(-ret); } } @@ -345,23 +339,31 @@ Status<void> BufferHubQueue::Enqueue(Entry entry) { if (!is_full()) { available_buffers_.push(std::move(entry)); + // Find and remove the enqueued buffer from unavailable_buffers_slot if + // exist. + auto enqueued_buffer_iter = std::find_if( + unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(), + [&entry](size_t slot) -> bool { return slot == entry.slot; }); + if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) { + unavailable_buffers_slot_.erase(enqueued_buffer_iter); + } + // Trigger OnBufferAvailable callback if registered. if (on_buffer_available_) on_buffer_available_(); return {}; } else { - ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!"); + ALOGE("%s: Buffer queue is full!", __FUNCTION__); return ErrorStatus(E2BIG); } } Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout, size_t* slot) { - ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(), - timeout); + ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout); - PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count()); + PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count()); if (count() == 0) { if (!WaitForBuffers(timeout)) @@ -376,6 +378,7 @@ Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout, *slot = entry.slot; available_buffers_.pop(); + unavailable_buffers_slot_.push_back(*slot); return {std::move(buffer)}; } @@ -564,7 +567,7 @@ Status<void> ProducerQueue::RemoveBuffer(size_t slot) { auto status = InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot); if (!status) { - ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s", + ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return status.error_status(); } @@ -580,31 +583,81 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, - pdx::LocalHandle* release_fence) { + pdx::LocalHandle* release_fence, bool gain_posted_buffer) { ATRACE_NAME("ProducerQueue::Dequeue"); if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) { - ALOGE("ProducerQueue::Dequeue: Invalid parameter."); + ALOGE("%s: Invalid parameter.", __FUNCTION__); return ErrorStatus(EINVAL); } - auto status = BufferHubQueue::Dequeue(timeout, slot); - if (!status) - return status.error_status(); - - auto buffer = std::static_pointer_cast<BufferProducer>(status.take()); - const int ret = buffer->GainAsync(out_meta, release_fence); + std::shared_ptr<BufferProducer> buffer; + Status<std::shared_ptr<BufferHubBase>> dequeue_status = + BufferHubQueue::Dequeue(timeout, slot); + if (dequeue_status.ok()) { + buffer = std::static_pointer_cast<BufferProducer>(dequeue_status.take()); + } else { + if (gain_posted_buffer) { + Status<std::shared_ptr<BufferProducer>> dequeue_unacquired_status = + ProducerQueue::DequeueUnacquiredBuffer(slot); + if (!dequeue_unacquired_status.ok()) { + ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__, + dequeue_unacquired_status.error()); + return dequeue_unacquired_status.error_status(); + } + buffer = dequeue_unacquired_status.take(); + } else { + return dequeue_status.error_status(); + } + } + const int ret = + buffer->GainAsync(out_meta, release_fence, gain_posted_buffer); if (ret < 0 && ret != -EALREADY) return ErrorStatus(-ret); return {std::move(buffer)}; } +Status<std::shared_ptr<BufferProducer>> ProducerQueue::DequeueUnacquiredBuffer( + size_t* slot) { + if (unavailable_buffers_slot_.size() < 1) { + ALOGE( + "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in " + "acquired state if exist.", + __FUNCTION__); + return ErrorStatus(ENOMEM); + } + + // Find the first buffer that is not in acquired state from + // unavailable_buffers_slot_. + for (auto iter = unavailable_buffers_slot_.begin(); + iter != unavailable_buffers_slot_.end(); iter++) { + std::shared_ptr<BufferProducer> buffer = ProducerQueue::GetBuffer(*iter); + if (buffer == nullptr) { + ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__, + static_cast<int>(*slot)); + return ErrorStatus(EIO); + } + if (!BufferHubDefs::IsBufferAcquired(buffer->buffer_state())) { + *slot = *iter; + unavailable_buffers_slot_.erase(iter); + unavailable_buffers_slot_.push_back(*slot); + ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d", + __FUNCTION__, static_cast<int>(*slot)); + return {std::move(buffer)}; + } + } + ALOGE( + "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.", + __FUNCTION__); + return ErrorStatus(EBUSY); +} + pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() { if (capacity() != 0) { ALOGE( - "ProducerQueue::TakeAsParcelable: producer queue can only be taken out" - " as a parcelable when empty. Current queue capacity: %zu", - capacity()); + "%s: producer queue can only be taken out as a parcelable when empty. " + "Current queue capacity: %zu", + __FUNCTION__, capacity()); return ErrorStatus(EINVAL); } @@ -628,17 +681,16 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle) : BufferHubQueue(std::move(handle)) { auto status = ImportQueue(); if (!status) { - ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s", + ALOGE("%s: Failed to import queue: %s", __FUNCTION__, status.GetErrorMessage().c_str()); Close(-status.error()); } auto import_status = ImportBuffers(); if (import_status) { - ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.", - import_status.get()); + ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get()); } else { - ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s", + ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, import_status.GetErrorMessage().c_str()); } } @@ -647,14 +699,11 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); if (!status) { if (status.error() == EBADR) { - ALOGI( - "ConsumerQueue::ImportBuffers: Queue is silent, no buffers " - "imported."); + ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__); return {0}; } else { - ALOGE( - "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s", - status.GetErrorMessage().c_str()); + ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__, + status.GetErrorMessage().c_str()); return status.error_status(); } } @@ -665,13 +714,13 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto buffer_handle_slots = status.take(); for (auto& buffer_handle_slot : buffer_handle_slots) { - ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d", + ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__, buffer_handle_slot.first.value()); std::unique_ptr<BufferConsumer> buffer_consumer = BufferConsumer::Import(std::move(buffer_handle_slot.first)); if (!buffer_consumer) { - ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu", + ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__, buffer_handle_slot.second); last_error = ErrorStatus(EPIPE); continue; @@ -680,7 +729,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto add_status = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second); if (!add_status) { - ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s", + ALOGE("%s: Failed to add buffer: %s", __FUNCTION__, add_status.GetErrorMessage().c_str()); last_error = add_status; } else { @@ -696,8 +745,8 @@ Status<size_t> ConsumerQueue::ImportBuffers() { Status<void> ConsumerQueue::AddBuffer( const std::shared_ptr<BufferConsumer>& buffer, size_t slot) { - ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", - id(), buffer->id(), slot); + ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(), + buffer->id(), slot); return BufferHubQueue::AddBuffer(buffer, slot); } @@ -706,9 +755,9 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( LocalHandle* acquire_fence) { if (user_metadata_size != user_metadata_size_) { ALOGE( - "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer " - "does not match metadata size (%zu) for the queue.", - user_metadata_size, user_metadata_size_); + "%s: Metadata size (%zu) for the dequeuing buffer does not match " + "metadata size (%zu) for the queue.", + __FUNCTION__, user_metadata_size, user_metadata_size_); return ErrorStatus(EINVAL); } @@ -723,7 +772,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( if (metadata_src) { memcpy(meta, metadata_src, user_metadata_size); } else { - ALOGW("ConsumerQueue::Dequeue: no user-defined metadata."); + ALOGW("%s: no user-defined metadata.", __FUNCTION__); } } @@ -735,7 +784,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( pdx::LocalHandle* acquire_fence) { ATRACE_NAME("ConsumerQueue::Dequeue"); if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) { - ALOGE("ConsumerQueue::Dequeue: Invalid parameter."); + ALOGE("%s: Invalid parameter.", __FUNCTION__); return ErrorStatus(EINVAL); } @@ -752,19 +801,18 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( } Status<void> ConsumerQueue::OnBufferAllocated() { - ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id()); + ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id()); auto status = ImportBuffers(); if (!status) { - ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s", + ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return ErrorStatus(status.error()); } else if (status.get() == 0) { - ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!"); + ALOGW("%s: No new buffers allocated!", __FUNCTION__); return ErrorStatus(ENOBUFS); } else { - ALOGD_IF(TRACE, - "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.", + ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__, status.get()); return {}; } diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h index c69002debd..def7c6b66e 100644 --- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h +++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h @@ -57,10 +57,10 @@ class BufferHubQueue : public pdx::Client { uint32_t default_width() const { return default_width_; } // Returns the default buffer height of this buffer queue. - uint32_t default_height() const { return static_cast<uint32_t>(default_height_); } + uint32_t default_height() const { return default_height_; } // Returns the default buffer format of this buffer queue. - uint32_t default_format() const { return static_cast<uint32_t>(default_format_); } + uint32_t default_format() const { return default_format_; } // Creates a new consumer in handle form for immediate transport over RPC. pdx::Status<pdx::LocalChannelHandle> CreateConsumerQueueHandle( @@ -208,6 +208,14 @@ class BufferHubQueue : public pdx::Client { // Size of the metadata that buffers in this queue cary. size_t user_metadata_size_{0}; + // Buffers and related data that are available for dequeue. + std::priority_queue<Entry, std::vector<Entry>, EntryComparator> + available_buffers_; + + // Slot of the buffers that are not available for normal dequeue. For example, + // the slot of posted or acquired buffers in the perspective of a producer. + std::vector<size_t> unavailable_buffers_slot_; + private: void Initialize(); @@ -252,10 +260,6 @@ class BufferHubQueue : public pdx::Client { // queue regardless of its queue position or presence in the ring buffer. std::array<std::shared_ptr<BufferHubBase>, kMaxQueueCapacity> buffers_; - // Buffers and related data that are available for dequeue. - std::priority_queue<Entry, std::vector<Entry>, EntryComparator> - available_buffers_; - // Keeps track with how many buffers have been added into the queue. size_t capacity_{0}; @@ -349,11 +353,30 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. + // @return a buffer in gained state, which was originally in released state. pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( int timeout, size_t* slot, pdx::LocalHandle* release_fence); + + // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, + // and caller should call Post() once it's done writing to release the buffer + // to the consumer side. + // + // @param timeout to dequeue a buffer. + // @param slot is the slot of the output BufferProducer. + // @param release_fence for gaining a buffer. + // @param out_meta metadata of the output buffer. + // @param gain_posted_buffer whether to gain posted buffer if no released + // buffer is available to gain. + // @return a buffer in gained state, which was originally in released state if + // gain_posted_buffer is false, or in posted/released state if + // gain_posted_buffer is true. + // TODO(b/112007999): gain_posted_buffer true is only used to prevent + // libdvrtracking from starving when there are non-responding clients. This + // gain_posted_buffer param can be removed once libdvrtracking start to use + // the new AHardwareBuffer API. pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, - pdx::LocalHandle* release_fence); + pdx::LocalHandle* release_fence, bool gain_posted_buffer = false); // Enqueues a producer buffer in the queue. pdx::Status<void> Enqueue(const std::shared_ptr<BufferProducer>& buffer, @@ -374,6 +397,16 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // arguments as the constructors. explicit ProducerQueue(pdx::LocalChannelHandle handle); ProducerQueue(const ProducerQueueConfig& config, const UsagePolicy& usage); + + // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, + // and caller should call Post() once it's done writing to release the buffer + // to the consumer side. + // + // @param slot the slot of the returned buffer. + // @return a buffer in gained state, which was originally in posted state or + // released state. + pdx::Status<std::shared_ptr<BufferProducer>> DequeueUnacquiredBuffer( + size_t* slot); }; class ConsumerQueue : public BufferHubQueue { diff --git a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp index 046df5412f..c58f55f7bd 100644 --- a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp +++ b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp @@ -1,5 +1,6 @@ #include <base/logging.h> #include <binder/Parcel.h> +#include <dvr/dvr_api.h> #include <private/dvr/buffer_hub_client.h> #include <private/dvr/buffer_hub_queue_client.h> @@ -122,6 +123,147 @@ TEST_F(BufferHubQueueTest, TestDequeue) { } } +TEST_F(BufferHubQueueTest, + TestDequeuePostedBufferIfNoAvailableReleasedBuffer_withBufferConsumer) { + ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{})); + + // Allocate 3 buffers to use. + const size_t test_queue_capacity = 3; + for (int64_t i = 0; i < test_queue_capacity; i++) { + AllocateBuffer(); + } + EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity); + + size_t producer_slot, consumer_slot; + LocalHandle fence; + DvrNativeBufferMetadata mi, mo; + + // Producer posts 2 buffers and remember their posted sequence. + std::deque<size_t> posted_slots; + for (int64_t i = 0; i < 2; i++) { + auto p1_status = + producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); + EXPECT_TRUE(p1_status.ok()); + auto p1 = p1_status.take(); + ASSERT_NE(p1, nullptr); + + // Producer should not be gaining posted buffer when there are still + // available buffers to gain. + auto found_iter = + std::find(posted_slots.begin(), posted_slots.end(), producer_slot); + EXPECT_EQ(found_iter, posted_slots.end()); + posted_slots.push_back(producer_slot); + + // Producer posts the buffer. + mi.index = i; + EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); + } + + // Consumer acquires one buffer. + auto c1_status = + consumer_queue_->Dequeue(kTimeoutMs, &consumer_slot, &mo, &fence); + EXPECT_TRUE(c1_status.ok()); + auto c1 = c1_status.take(); + ASSERT_NE(c1, nullptr); + // Consumer should get the oldest posted buffer. No checks here. + // posted_slots[0] should be in acquired state now. + EXPECT_EQ(mo.index, 0); + // Consumer releases the buffer. + EXPECT_EQ(c1->ReleaseAsync(&mi, LocalHandle()), 0); + // posted_slots[0] should be in released state now. + + // Producer gain and post 2 buffers. + for (int64_t i = 0; i < 2; i++) { + auto p1_status = + producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); + EXPECT_TRUE(p1_status.ok()); + auto p1 = p1_status.take(); + ASSERT_NE(p1, nullptr); + + // The gained buffer should be the one in released state or the one haven't + // been use. + EXPECT_NE(posted_slots[1], producer_slot); + + mi.index = i + 2; + EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); + } + + // Producer gains a buffer. + auto p1_status = + producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); + EXPECT_TRUE(p1_status.ok()); + auto p1 = p1_status.take(); + ASSERT_NE(p1, nullptr); + + // The gained buffer should be the oldest posted buffer. + EXPECT_EQ(posted_slots[1], producer_slot); + + // Producer posts the buffer. + mi.index = 4; + EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); +} + +TEST_F(BufferHubQueueTest, + TestDequeuePostedBufferIfNoAvailableReleasedBuffer_noBufferConsumer) { + ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{})); + + // Allocate 4 buffers to use. + const size_t test_queue_capacity = 4; + for (int64_t i = 0; i < test_queue_capacity; i++) { + AllocateBuffer(); + } + EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity); + + // Post all allowed buffers and remember their posted sequence. + std::deque<size_t> posted_slots; + for (int64_t i = 0; i < test_queue_capacity; i++) { + size_t slot; + LocalHandle fence; + DvrNativeBufferMetadata mi, mo; + + // Producer gains a buffer. + auto p1_status = + producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true); + EXPECT_TRUE(p1_status.ok()); + auto p1 = p1_status.take(); + ASSERT_NE(p1, nullptr); + + // Producer should not be gaining posted buffer when there are still + // available buffers to gain. + auto found_iter = std::find(posted_slots.begin(), posted_slots.end(), slot); + EXPECT_EQ(found_iter, posted_slots.end()); + posted_slots.push_back(slot); + + // Producer posts the buffer. + mi.index = i; + EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0); + } + + // Gain posted buffers in sequence. + const int64_t nb_dequeue_all_times = 2; + for (int j = 0; j < nb_dequeue_all_times; ++j) { + for (int i = 0; i < test_queue_capacity; ++i) { + size_t slot; + LocalHandle fence; + DvrNativeBufferMetadata mi, mo; + + // Producer gains a buffer. + auto p1_status = + producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true); + EXPECT_TRUE(p1_status.ok()); + auto p1 = p1_status.take(); + ASSERT_NE(p1, nullptr); + + // The gained buffer should be the oldest posted buffer. + EXPECT_EQ(posted_slots[i], slot); + + // Producer posts the buffer. + mi.index = i + test_queue_capacity * (j + 1); + EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0); + } + } +} + TEST_F(BufferHubQueueTest, TestProducerConsumer) { const size_t kBufferCount = 16; size_t slot; @@ -245,8 +387,8 @@ TEST_F(BufferHubQueueTest, TestRemoveBuffer) { for (size_t i = 0; i < kBufferCount; i++) { Entry* entry = &buffers[i]; - auto producer_status = producer_queue_->Dequeue( - kTimeoutMs, &entry->slot, &mo, &entry->fence); + auto producer_status = + producer_queue_->Dequeue(kTimeoutMs, &entry->slot, &mo, &entry->fence); ASSERT_TRUE(producer_status.ok()); entry->buffer = producer_status.take(); ASSERT_NE(nullptr, entry->buffer); |