diff options
author | 2023-10-05 19:36:14 +0000 | |
---|---|---|
committer | 2023-10-16 17:23:02 +0000 | |
commit | 53f8b579869acdce0f8a721a21f00eb65d9ca6b5 (patch) | |
tree | 759c88340b9a117bc1a2388a86e6e7de7c2ff0d7 | |
parent | 912e153ee6c8776faac825ebe9f7a9510256093f (diff) |
libbufferstreams: Add the BufferPoolPublisher.
The BufferPoolPublisher submits buffers from a pool over to a
subscriber.
Pair: jshargo
Bug: 296450854, 296101127
Test: atest libbufferstreams-internal_test
Change-Id: Ic473677c9c71b0505c3fcd2b4fb7d0fdf3d7d01b
-rw-r--r-- | libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs | 112 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/publishers/mod.rs | 3 |
2 files changed, 115 insertions, 0 deletions
diff --git a/libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs b/libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs new file mode 100644 index 0000000000..846105dacd --- /dev/null +++ b/libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs @@ -0,0 +1,112 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! + +use std::time::Instant; + +use crate::{ + buffers::BufferPool, subscriptions::SharedBufferSubscription, BufferPublisher, + BufferSubscriber, Frame, StreamConfig, +}; + +/// The [BufferPoolPublisher] submits buffers from a pool over to the subscriber. +pub struct BufferPoolPublisher { + stream_config: StreamConfig, + buffer_pool: BufferPool, + subscription: SharedBufferSubscription, + subscriber: Option<Box<dyn BufferSubscriber>>, +} + +impl BufferPoolPublisher { + /// The [BufferPoolPublisher] needs to initialize a [BufferPool], the [BufferPool] will create + /// all buffers at initialization using the stream_config. + pub fn new(stream_config: StreamConfig, size: usize) -> Option<Self> { + BufferPool::new(size, stream_config).map(|buffer_pool| Self { + stream_config, + buffer_pool, + subscription: SharedBufferSubscription::new(), + subscriber: None, + }) + } + + /// If the [SharedBufferSubscription] is ready for a [Frame], a buffer will be requested from + /// [BufferPool] and sent over to the [BufferSubscriber]. + pub fn send_next_frame(&mut self, present_time: Instant) -> bool { + if let Some(subscriber) = self.subscriber.as_mut() { + if self.subscription.take_request() { + if let Some(buffer) = self.buffer_pool.next_buffer() { + let frame = Frame { buffer, present_time, fence: 0 }; + + subscriber.on_next(frame); + return true; + } + } + } + false + } +} + +impl BufferPublisher for BufferPoolPublisher { + fn get_publisher_stream_config(&self) -> StreamConfig { + self.stream_config + } + + fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) { + assert!(self.subscriber.is_none()); + + self.subscriber = Some(Box::new(subscriber)); + self.subscriber.as_mut().unwrap().on_subscribe(self.subscription.clone_for_subscriber()); + } +} + +#[cfg(test)] +mod test { + use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags}; + + use super::*; + + use crate::{ + subscribers::{ + testing::{TestSubscriber, TestingSubscriberEvent}, + SharedSubscriber, + }, + StreamConfig, + }; + + const STREAM_CONFIG: StreamConfig = StreamConfig { + width: 1, + height: 1, + layers: 1, + format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM, + usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN, + stride: 0, + }; + + #[test] + fn test_send_next_frame() { + let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG)); + + let mut buffer_pool_publisher = BufferPoolPublisher::new(STREAM_CONFIG, 1).unwrap(); + buffer_pool_publisher.subscribe(subscriber.clone()); + + subscriber.map_inner(|s| s.request(1)); + + assert!(buffer_pool_publisher.send_next_frame(Instant::now())); + + let events = subscriber.map_inner_mut(|s| s.take_events()); + assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_))); + assert_eq!(buffer_pool_publisher.subscription.pending_requests(), 0); + } +} diff --git a/libs/bufferstreams/rust/src/publishers/mod.rs b/libs/bufferstreams/rust/src/publishers/mod.rs index 2fd518efee..8ed3ba0e00 100644 --- a/libs/bufferstreams/rust/src/publishers/mod.rs +++ b/libs/bufferstreams/rust/src/publishers/mod.rs @@ -14,4 +14,7 @@ //! This module provides [BufferSubscriber] implementations and helpers. +mod buffer_pool_publisher; pub mod testing; + +pub use buffer_pool_publisher::*; |