diff options
-rw-r--r-- | libs/bufferstreams/rust/Android.bp | 23 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/lib.rs | 180 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/publishers/mod.rs | 17 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/publishers/testing.rs | 103 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/stream_config.rs | 67 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/subscribers/mod.rs | 20 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/subscribers/shared.rs | 94 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/subscribers/testing.rs | 106 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/subscriptions/mod.rs | 19 | ||||
-rw-r--r-- | libs/bufferstreams/rust/src/subscriptions/shared_buffer_subscription.rs | 84 |
10 files changed, 666 insertions, 47 deletions
diff --git a/libs/bufferstreams/rust/Android.bp b/libs/bufferstreams/rust/Android.bp index ff951487bc..7fcb222085 100644 --- a/libs/bufferstreams/rust/Android.bp +++ b/libs/bufferstreams/rust/Android.bp @@ -12,13 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -rust_library { - name: "libbufferstreams", - crate_name: "bufferstreams", +rust_defaults { + name: "libbufferstreams_defaults", srcs: ["src/lib.rs"], - edition: "2021", - rlibs: [ + rustlibs: [ + "libanyhow", "libnativewindow_rs", ], + edition: "2021", +} + +rust_library { + name: "libbufferstreams", + crate_name: "bufferstreams", + defaults: ["libbufferstreams_defaults"], min_sdk_version: "30", } + +rust_test { + name: "libbufferstreams-internal_test", + crate_name: "bufferstreams", + defaults: ["libbufferstreams_defaults"], + test_suites: ["general-tests"], +} diff --git a/libs/bufferstreams/rust/src/lib.rs b/libs/bufferstreams/rust/src/lib.rs index 1d321c833d..87f3104915 100644 --- a/libs/bufferstreams/rust/src/lib.rs +++ b/libs/bufferstreams/rust/src/lib.rs @@ -14,8 +14,14 @@ //! libbufferstreams: Reactive Streams for Graphics Buffers +pub mod publishers; +mod stream_config; +pub mod subscribers; +pub mod subscriptions; + +pub use stream_config::*; + use nativewindow::*; -use std::sync::{Arc, Weak}; use std::time::Instant; /// This function will print Hello World. @@ -31,28 +37,30 @@ pub extern "C" fn hello() -> bool { /// /// BufferPublishers are required to adhere to the following, based on the /// reactive streams specification: -/// * The total number of on_next´s signalled by a Publisher to a Subscriber +/// * The total number of on_next´s signalled by a Publisher to a Subscriber /// MUST be less than or equal to the total number of elements requested by that /// Subscriber´s Subscription at all times. -/// * A Publisher MAY signal fewer on_next than requested and terminate the +/// * A Publisher MAY signal fewer on_next than requested and terminate the /// Subscription by calling on_complete or on_error. -/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber +/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber /// MUST be signaled serially. -/// * If a Publisher fails it MUST signal an on_error. -/// * If a Publisher terminates successfully (finite stream) it MUST signal an +/// * If a Publisher fails it MUST signal an on_error. +/// * If a Publisher terminates successfully (finite stream) it MUST signal an /// on_complete. -/// * If a Publisher signals either on_error or on_complete on a Subscriber, +/// * If a Publisher signals either on_error or on_complete on a Subscriber, /// that Subscriber’s Subscription MUST be considered cancelled. -/// * Once a terminal state has been signaled (on_error, on_complete) it is +/// * Once a terminal state has been signaled (on_error, on_complete) it is /// REQUIRED that no further signals occur. -/// * If a Subscription is cancelled its Subscriber MUST eventually stop being +/// * If a Subscription is cancelled its Subscriber MUST eventually stop being /// signaled. -/// * A Publisher MAY support multiple Subscribers and decides whether each +/// * A Publisher MAY support multiple Subscribers and decides whether each /// Subscription is unicast or multicast. pub trait BufferPublisher { + /// Returns the StreamConfig of buffers that publisher creates. + fn get_publisher_stream_config(&self) -> StreamConfig; /// This function will create the subscription between the publisher and /// the subscriber. - fn subscribe(&self, subscriber: Weak<dyn BufferSubscriber>); + fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static); } /// BufferSubscribers can subscribe to BufferPublishers. They can request Frames @@ -61,35 +69,37 @@ pub trait BufferPublisher { /// /// BufferSubcribers are required to adhere to the following, based on the /// reactive streams specification: -/// * The total number of on_next´s signalled by a Publisher to a Subscriber +/// * The total number of on_next´s signalled by a Publisher to a Subscriber /// MUST be less than or equal to the total number of elements requested by that /// Subscriber´s Subscription at all times. -/// * A Publisher MAY signal fewer on_next than requested and terminate the +/// * A Publisher MAY signal fewer on_next than requested and terminate the /// Subscription by calling on_complete or on_error. -/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber +/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber /// MUST be signaled serially. -/// * If a Publisher fails it MUST signal an on_error. -/// * If a Publisher terminates successfully (finite stream) it MUST signal an +/// * If a Publisher fails it MUST signal an on_error. +/// * If a Publisher terminates successfully (finite stream) it MUST signal an /// on_complete. -/// * If a Publisher signals either on_error or on_complete on a Subscriber, +/// * If a Publisher signals either on_error or on_complete on a Subscriber, /// that Subscriber’s Subscription MUST be considered cancelled. -/// * Once a terminal state has been signaled (on_error, on_complete) it is +/// * Once a terminal state has been signaled (on_error, on_complete) it is /// REQUIRED that no further signals occur. -/// * If a Subscription is cancelled its Subscriber MUST eventually stop being +/// * If a Subscription is cancelled its Subscriber MUST eventually stop being /// signaled. -/// * Publisher.subscribe MAY be called as many times as wanted but MUST be +/// * Publisher.subscribe MAY be called as many times as wanted but MUST be /// with a different Subscriber each time. -/// * A Publisher MAY support multiple Subscribers and decides whether each +/// * A Publisher MAY support multiple Subscribers and decides whether each /// Subscription is unicast or multicast. pub trait BufferSubscriber { + /// The StreamConfig of buffers that this subscriber expects. + fn get_subscriber_stream_config(&self) -> StreamConfig; /// This function will be called at the beginning of the subscription. - fn on_subscribe(&self, subscription: Arc<dyn BufferSubscription>); + fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>); /// This function will be called for buffer that comes in. - fn on_next(&self, frame: Frame); + fn on_next(&mut self, frame: Frame); /// This function will be called in case of an error. - fn on_error(&self, error: BufferError); + fn on_error(&mut self, error: BufferError); /// This function will be called on finite streams when done. - fn on_complete(&self); + fn on_complete(&mut self); } /// BufferSubscriptions serve as the bridge between BufferPublishers and @@ -100,50 +110,51 @@ pub trait BufferSubscriber { /// /// BufferSubcriptions are required to adhere to the following, based on the /// reactive streams specification: -/// * Subscription.request and Subscription.cancel MUST only be called inside +/// * Subscription.request and Subscription.cancel MUST only be called inside /// of its Subscriber context. -/// * The Subscription MUST allow the Subscriber to call Subscription.request +/// * The Subscription MUST allow the Subscriber to call Subscription.request /// synchronously from within on_next or on_subscribe. -/// * Subscription.request MUST place an upper bound on possible synchronous +/// * Subscription.request MUST place an upper bound on possible synchronous /// recursion between Publisher and Subscriber. -/// * Subscription.request SHOULD respect the responsivity of its caller by +/// * Subscription.request SHOULD respect the responsivity of its caller by /// returning in a timely manner. -/// * Subscription.cancel MUST respect the responsivity of its caller by +/// * Subscription.cancel MUST respect the responsivity of its caller by /// returning in a timely manner, MUST be idempotent and MUST be thread-safe. -/// * After the Subscription is cancelled, additional +/// * After the Subscription is cancelled, additional /// Subscription.request(n: u64) MUST be NOPs. -/// * After the Subscription is cancelled, additional Subscription.cancel() +/// * After the Subscription is cancelled, additional Subscription.cancel() /// MUST be NOPs. -/// * While the Subscription is not cancelled, Subscription.request(n: u64) +/// * While the Subscription is not cancelled, Subscription.request(n: u64) /// MUST register the given number of additional elements to be produced to the /// respective subscriber. -/// * While the Subscription is not cancelled, Subscription.request(n: u64) +/// * While the Subscription is not cancelled, Subscription.request(n: u64) /// MUST signal on_error if the argument is <= 0. The cause message SHOULD /// explain that non-positive request signals are illegal. -/// * While the Subscription is not cancelled, Subscription.request(n: u64) +/// * While the Subscription is not cancelled, Subscription.request(n: u64) /// MAY synchronously call on_next on this (or other) subscriber(s). -/// * While the Subscription is not cancelled, Subscription.request(n: u64) +/// * While the Subscription is not cancelled, Subscription.request(n: u64) /// MAY synchronously call on_complete or on_error on this (or other) /// subscriber(s). -/// * While the Subscription is not cancelled, Subscription.cancel() MUST +/// * While the Subscription is not cancelled, Subscription.cancel() MUST /// request the Publisher to eventually stop signaling its Subscriber. The /// operation is NOT REQUIRED to affect the Subscription immediately. -/// * While the Subscription is not cancelled, Subscription.cancel() MUST +/// * While the Subscription is not cancelled, Subscription.cancel() MUST /// request the Publisher to eventually drop any references to the corresponding /// subscriber. -/// * While the Subscription is not cancelled, calling Subscription.cancel MAY +/// * While the Subscription is not cancelled, calling Subscription.cancel MAY /// cause the Publisher, if stateful, to transition into the shut-down state if /// no other Subscription exists at this point. -/// * Calling Subscription.cancel MUST return normally. -/// * Calling Subscription.request MUST return normally. +/// * Calling Subscription.cancel MUST return normally. +/// * Calling Subscription.request MUST return normally. pub trait BufferSubscription { /// request fn request(&self, n: u64); /// cancel fn cancel(&self); } + /// Type used to describe errors produced by subscriptions. -type BufferError = Box<dyn std::error::Error + Send + Sync + 'static>; +pub type BufferError = anyhow::Error; /// Struct used to contain the buffer. pub struct Frame { @@ -154,3 +165,88 @@ pub struct Frame { /// A fence used for reading/writing safely. pub fence: i32, } + +#[cfg(test)] +mod test { + #![allow(warnings, unused)] + use super::*; + + use anyhow::anyhow; + use std::borrow::BorrowMut; + use std::error::Error; + use std::ops::Add; + use std::sync::Arc; + use std::time::Duration; + + use crate::publishers::testing::*; + use crate::subscribers::{testing::*, SharedSubscriber}; + + const STREAM_CONFIG: StreamConfig = StreamConfig { + width: 1, + height: 1, + layers: 1, + format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM, + usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN, + stride: 0, + }; + + fn make_frame() -> Frame { + Frame { + buffer: STREAM_CONFIG + .create_hardware_buffer() + .expect("Unable to create hardware buffer for test"), + present_time: Instant::now() + Duration::from_secs(1), + fence: 0, + } + } + + #[test] + fn test_test_implementations_next() { + let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG)); + let mut publisher = TestPublisher::new(STREAM_CONFIG); + + publisher.subscribe(subscriber.clone()); + assert!(subscriber.map_inner(|s| s.has_subscription())); + assert!(publisher.has_subscriber()); + + publisher.send_frame(make_frame()); + let events = subscriber.map_inner_mut(|s| s.take_events()); + assert!(!matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_))); + + subscriber.map_inner(|s| s.request(1)); + assert_eq!(publisher.pending_requests(), 1); + + publisher.send_frame(make_frame()); + let events = subscriber.map_inner_mut(|s| s.take_events()); + assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_))); + assert_eq!(publisher.pending_requests(), 0); + } + + #[test] + fn test_test_implementations_complete() { + let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG)); + let mut publisher = TestPublisher::new(STREAM_CONFIG); + + publisher.subscribe(subscriber.clone()); + assert!(subscriber.map_inner(|s| s.has_subscription())); + assert!(publisher.has_subscriber()); + + publisher.send_complete(); + let events = subscriber.map_inner_mut(|s| s.take_events()); + assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Complete)); + } + + #[test] + fn test_test_implementations_error() { + let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG)); + let mut publisher = TestPublisher::new(STREAM_CONFIG); + + publisher.subscribe(subscriber.clone()); + assert!(subscriber.map_inner(|s| s.has_subscription())); + assert!(publisher.has_subscriber()); + + publisher.send_error(anyhow!("error")); + let events = subscriber.map_inner_mut(|s| s.take_events()); + assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Error(_))); + } +} diff --git a/libs/bufferstreams/rust/src/publishers/mod.rs b/libs/bufferstreams/rust/src/publishers/mod.rs new file mode 100644 index 0000000000..2fd518efee --- /dev/null +++ b/libs/bufferstreams/rust/src/publishers/mod.rs @@ -0,0 +1,17 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module provides [BufferSubscriber] implementations and helpers. + +pub mod testing; diff --git a/libs/bufferstreams/rust/src/publishers/testing.rs b/libs/bufferstreams/rust/src/publishers/testing.rs new file mode 100644 index 0000000000..1593b18d7f --- /dev/null +++ b/libs/bufferstreams/rust/src/publishers/testing.rs @@ -0,0 +1,103 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Provides useful publishers for testing specifically. These should not be used in normal code. + +use crate::{subscriptions::SharedBufferSubscription, *}; + +/// A [BufferPublisher] specifically for testing. +/// +/// Provides users the ability to send events and read the state of the subscription. +pub struct TestPublisher { + config: StreamConfig, + subscriber: Option<Box<dyn BufferSubscriber>>, + subscription: SharedBufferSubscription, +} + +impl TestPublisher { + /// Create a new [TestPublisher]. + pub fn new(config: StreamConfig) -> Self { + Self { config, subscriber: None, subscription: SharedBufferSubscription::new() } + } + + /// Send a [BufferSubscriber::on_next] event to an owned [BufferSubscriber] if it has any + /// requested and returns true. Drops the frame and returns false otherwise. + /// + /// # Panics + /// + /// This will panic if there is no owned subscriber. + pub fn send_frame(&mut self, frame: Frame) -> bool { + let subscriber = + self.subscriber.as_deref_mut().expect("Tried to send_frame with no subscriber"); + + if self.subscription.take_request() { + subscriber.on_next(frame); + true + } else { + false + } + } + + /// Send a [BufferSubscriber::on_complete] event to an owned [BufferSubscriber]. + /// + /// # Panics + /// + /// This will panic if there is no owned subscriber. + pub fn send_complete(&mut self) { + let subscriber = + self.subscriber.as_deref_mut().expect("Tried to send_complete with no subscriber"); + subscriber.on_complete(); + } + + /// Send a [BufferSubscriber::on_error] event to an owned [BufferSubscriber]. + /// + /// # Panics + /// + /// This will panic if there is no owned subscriber. + pub fn send_error(&mut self, error: BufferError) { + let subscriber = + self.subscriber.as_deref_mut().expect("Tried to send_error with no subscriber"); + subscriber.on_error(error); + } + + /// Returns whether this [BufferPublisher] owns a subscriber. + pub fn has_subscriber(&self) -> bool { + self.subscriber.is_some() + } + + /// Returns the nummber of frames requested by the [BufferSubscriber]. + pub fn pending_requests(&self) -> u64 { + self.subscription.pending_requests() + } + + /// Returns whether the [BufferSubscriber] has cancelled the subscription. + pub fn is_cancelled(&self) -> bool { + self.subscription.is_cancelled() + } +} + +impl BufferPublisher for TestPublisher { + fn get_publisher_stream_config(&self) -> crate::StreamConfig { + self.config + } + + fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) { + assert!(self.subscriber.is_none(), "TestingPublishers can only take one subscriber"); + self.subscriber = Some(Box::new(subscriber)); + + if let Some(ref mut subscriber) = self.subscriber { + subscriber.on_subscribe(self.subscription.clone_for_subscriber()); + } + } +} diff --git a/libs/bufferstreams/rust/src/stream_config.rs b/libs/bufferstreams/rust/src/stream_config.rs new file mode 100644 index 0000000000..d0c621b0c4 --- /dev/null +++ b/libs/bufferstreams/rust/src/stream_config.rs @@ -0,0 +1,67 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use nativewindow::*; + +/// The configuration of the buffers published by a [BufferPublisher] or +/// expected by a [BufferSubscriber]. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct StreamConfig { + /// Width in pixels of streaming buffers. + pub width: u32, + /// Height in pixels of streaming buffers. + pub height: u32, + /// Number of layers of streaming buffers. + pub layers: u32, + /// Format of streaming buffers. + pub format: AHardwareBuffer_Format::Type, + /// Usage of streaming buffers. + pub usage: AHardwareBuffer_UsageFlags, + /// Stride of streaming buffers. + pub stride: u32, +} + +impl StreamConfig { + /// Tries to create a new AHardwareBuffer from settings in a [StreamConfig]. + pub fn create_hardware_buffer(&self) -> Option<AHardwareBuffer> { + AHardwareBuffer::new(self.width, self.height, self.layers, self.format, self.usage) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_create_hardware_buffer() { + let config = StreamConfig { + width: 123, + height: 456, + layers: 1, + format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM, + usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN + | AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_WRITE_OFTEN, + stride: 0, + }; + + let maybe_buffer = config.create_hardware_buffer(); + assert!(maybe_buffer.is_some()); + + let buffer = maybe_buffer.unwrap(); + assert_eq!(config.width, buffer.width()); + assert_eq!(config.height, buffer.height()); + assert_eq!(config.format, buffer.format()); + assert_eq!(config.usage, buffer.usage()); + } +} diff --git a/libs/bufferstreams/rust/src/subscribers/mod.rs b/libs/bufferstreams/rust/src/subscribers/mod.rs new file mode 100644 index 0000000000..dd038c6c32 --- /dev/null +++ b/libs/bufferstreams/rust/src/subscribers/mod.rs @@ -0,0 +1,20 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module provides [BufferSubscriber] implementations and helpers. + +mod shared; +pub mod testing; + +pub use shared::*; diff --git a/libs/bufferstreams/rust/src/subscribers/shared.rs b/libs/bufferstreams/rust/src/subscribers/shared.rs new file mode 100644 index 0000000000..46c58dc04a --- /dev/null +++ b/libs/bufferstreams/rust/src/subscribers/shared.rs @@ -0,0 +1,94 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module provides [BufferSubscriber] implementations and helpers. + +use std::sync::{Arc, Mutex}; + +use crate::*; + +/// A [BufferSubscriber] wrapper that provides shared access. +/// +/// Normally, [BufferSubscriber]s are fully owned by the publisher that they are attached to. With +/// [SharedSubscriber], a +/// +/// # Panics +/// +/// [BufferSubscriber::on_subscribe] on a [SharedSubscriber] can only be called once, otherwise it +/// will panic. This is to prevent accidental and unsupported sharing between multiple publishers to +/// reflect the usual behavior where a publisher takes full ownership of a subscriber. +pub struct SharedSubscriber<S: BufferSubscriber>(Arc<Mutex<SharedSubscriberInner<S>>>); + +struct SharedSubscriberInner<S: BufferSubscriber> { + subscriber: S, + is_subscribed: bool, +} + +impl<S: BufferSubscriber> SharedSubscriber<S> { + /// Create a new wrapper around a [BufferSubscriber]. + pub fn new(subscriber: S) -> Self { + Self(Arc::new(Mutex::new(SharedSubscriberInner { subscriber, is_subscribed: false }))) + } + + /// Provides access to an immutable reference to the wrapped [BufferSubscriber]. + pub fn map_inner<R, F: FnOnce(&S) -> R>(&self, f: F) -> R { + let inner = self.0.lock().unwrap(); + f(&inner.subscriber) + } + + /// Provides access to a mutable reference to the wrapped [BufferSubscriber]. + pub fn map_inner_mut<R, F: FnOnce(&mut S) -> R>(&self, f: F) -> R { + let mut inner = self.0.lock().unwrap(); + f(&mut inner.subscriber) + } +} + +impl<S: BufferSubscriber> Clone for SharedSubscriber<S> { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl<S: BufferSubscriber> BufferSubscriber for SharedSubscriber<S> { + fn get_subscriber_stream_config(&self) -> StreamConfig { + let inner = self.0.lock().unwrap(); + inner.subscriber.get_subscriber_stream_config() + } + + fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>) { + let mut inner = self.0.lock().unwrap(); + assert!( + !inner.is_subscribed, + "A SharedSubscriber can not be shared between two BufferPublishers" + ); + inner.is_subscribed = true; + + inner.subscriber.on_subscribe(subscription); + } + + fn on_next(&mut self, frame: Frame) { + let mut inner = self.0.lock().unwrap(); + inner.subscriber.on_next(frame); + } + + fn on_error(&mut self, error: BufferError) { + let mut inner = self.0.lock().unwrap(); + inner.subscriber.on_error(error); + } + + fn on_complete(&mut self) { + let mut inner = self.0.lock().unwrap(); + inner.subscriber.on_complete(); + } +} diff --git a/libs/bufferstreams/rust/src/subscribers/testing.rs b/libs/bufferstreams/rust/src/subscribers/testing.rs new file mode 100644 index 0000000000..b7e970579e --- /dev/null +++ b/libs/bufferstreams/rust/src/subscribers/testing.rs @@ -0,0 +1,106 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Provides useful subscribers for testing specifically. These should not be used in normal code. + +use crate::*; + +/// Represents a callback called by a [BufferPublisher] on a [BufferSubscriber]. +pub enum TestingSubscriberEvent { + /// Represents a call to [BufferSubscriber::on_subscribe]. + Subscribe, + /// Represents a call to [BufferSubscriber::on_next]. + Next(Frame), + /// Represents a call to [BufferSubscriber::on_error]. + Error(BufferError), + /// Represents a call to [BufferSubscriber::on_complete]. + Complete, +} + +/// A [BufferSubscriber] specifically for testing. Logs events as they happen which can be retrieved +/// by the test to ensure appropriate behavior. +pub struct TestSubscriber { + config: StreamConfig, + subscription: Option<Box<dyn BufferSubscription>>, + events: Vec<TestingSubscriberEvent>, +} + +impl TestSubscriber { + /// Create a new [TestSubscriber]. + pub fn new(config: StreamConfig) -> Self { + Self { config, subscription: None, events: Vec::new() } + } + + /// Returns true if this [BufferSubscriber] has an active subscription. + pub fn has_subscription(&self) -> bool { + self.subscription.is_some() + } + + /// Make a request on behalf of this test subscriber. + /// + /// This will panic if there is no owned subscription. + pub fn request(&self, n: u64) { + let subscription = self + .subscription + .as_deref() + .expect("Tried to request on a TestSubscriber with no subscription"); + subscription.request(n); + } + + /// Cancel on behalf of this test subscriber. + /// + /// # Panics + /// + /// This will panic if there is no owned subscription. + pub fn cancel(&self) { + let subscription = self + .subscription + .as_deref() + .expect("Tried to cancel a TestSubscriber with no subscription"); + subscription.cancel(); + } + + /// Gets all of the events that have happened to this [BufferSubscriber] since the last call + /// to this function or it was created. + pub fn take_events(&mut self) -> Vec<TestingSubscriberEvent> { + let mut out = Vec::new(); + out.append(&mut self.events); + out + } +} + +impl BufferSubscriber for TestSubscriber { + fn get_subscriber_stream_config(&self) -> StreamConfig { + self.config + } + + fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>) { + assert!(self.subscription.is_none(), "TestSubscriber must only be subscribed to once"); + self.subscription = Some(subscription); + + self.events.push(TestingSubscriberEvent::Subscribe); + } + + fn on_next(&mut self, frame: Frame) { + self.events.push(TestingSubscriberEvent::Next(frame)); + } + + fn on_error(&mut self, error: BufferError) { + self.events.push(TestingSubscriberEvent::Error(error)); + } + + fn on_complete(&mut self) { + self.events.push(TestingSubscriberEvent::Complete); + } +} diff --git a/libs/bufferstreams/rust/src/subscriptions/mod.rs b/libs/bufferstreams/rust/src/subscriptions/mod.rs new file mode 100644 index 0000000000..e046dbbda3 --- /dev/null +++ b/libs/bufferstreams/rust/src/subscriptions/mod.rs @@ -0,0 +1,19 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module provides [BufferSubscription] implementations and helpers. + +mod shared_buffer_subscription; + +pub use shared_buffer_subscription::*; diff --git a/libs/bufferstreams/rust/src/subscriptions/shared_buffer_subscription.rs b/libs/bufferstreams/rust/src/subscriptions/shared_buffer_subscription.rs new file mode 100644 index 0000000000..90275c7320 --- /dev/null +++ b/libs/bufferstreams/rust/src/subscriptions/shared_buffer_subscription.rs @@ -0,0 +1,84 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Mutex}; + +use crate::*; + +/// A simple sharable helper that can be used as a [BufferSubscription] by a [BufferSubscriber] and +/// as a state tracker by a [BufferPublisher]. +#[derive(Clone, Debug)] +pub struct SharedBufferSubscription(Arc<Mutex<BufferSubscriptionData>>); + +#[derive(Debug, Default)] +struct BufferSubscriptionData { + requests: u64, + is_cancelled: bool, +} + +impl SharedBufferSubscription { + /// Create a new [SharedBufferSubscription]. + pub fn new() -> Self { + SharedBufferSubscription::default() + } + + /// Clone this [SharedBufferSubscription] so it can be passed into + /// [BufferSubscriber::on_subscribe]. + pub fn clone_for_subscriber(&self) -> Box<dyn BufferSubscription> { + Box::new(self.clone()) as Box<dyn BufferSubscription> + } + + /// If possible (not cancelled and with requests pending), take + pub fn take_request(&self) -> bool { + let mut data = self.0.lock().unwrap(); + + if data.is_cancelled || data.requests == 0 { + false + } else { + data.requests -= 1; + true + } + } + + /// Get the number of pending requests made by the [BufferSubscriber] via + /// [BufferSubscription::request]. + pub fn pending_requests(&self) -> u64 { + self.0.lock().unwrap().requests + } + + /// Get get whether the [BufferSubscriber] has called [BufferSubscription::cancel]. + pub fn is_cancelled(&self) -> bool { + self.0.lock().unwrap().is_cancelled + } +} + +impl Default for SharedBufferSubscription { + fn default() -> Self { + Self(Arc::new(Mutex::new(BufferSubscriptionData::default()))) + } +} + +impl BufferSubscription for SharedBufferSubscription { + fn request(&self, n: u64) { + let mut data = self.0.lock().unwrap(); + if !data.is_cancelled { + data.requests = data.requests.saturating_add(n); + } + } + + fn cancel(&self) { + let mut data = self.0.lock().unwrap(); + data.is_cancelled = true; + } +} |