blob: 46c58dc04a732fc1e25492b8abeb4b4daade465e [file] [log] [blame]
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! This module provides [BufferSubscriber] implementations and helpers.
use std::sync::{Arc, Mutex};
use crate::*;
/// A [BufferSubscriber] wrapper that provides shared access.
///
/// Normally, [BufferSubscriber]s are fully owned by the publisher that they are attached to. With
/// [SharedSubscriber], a
///
/// # Panics
///
/// [BufferSubscriber::on_subscribe] on a [SharedSubscriber] can only be called once, otherwise it
/// will panic. This is to prevent accidental and unsupported sharing between multiple publishers to
/// reflect the usual behavior where a publisher takes full ownership of a subscriber.
pub struct SharedSubscriber<S: BufferSubscriber>(Arc<Mutex<SharedSubscriberInner<S>>>);
struct SharedSubscriberInner<S: BufferSubscriber> {
subscriber: S,
is_subscribed: bool,
}
impl<S: BufferSubscriber> SharedSubscriber<S> {
/// Create a new wrapper around a [BufferSubscriber].
pub fn new(subscriber: S) -> Self {
Self(Arc::new(Mutex::new(SharedSubscriberInner { subscriber, is_subscribed: false })))
}
/// Provides access to an immutable reference to the wrapped [BufferSubscriber].
pub fn map_inner<R, F: FnOnce(&S) -> R>(&self, f: F) -> R {
let inner = self.0.lock().unwrap();
f(&inner.subscriber)
}
/// Provides access to a mutable reference to the wrapped [BufferSubscriber].
pub fn map_inner_mut<R, F: FnOnce(&mut S) -> R>(&self, f: F) -> R {
let mut inner = self.0.lock().unwrap();
f(&mut inner.subscriber)
}
}
impl<S: BufferSubscriber> Clone for SharedSubscriber<S> {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl<S: BufferSubscriber> BufferSubscriber for SharedSubscriber<S> {
fn get_subscriber_stream_config(&self) -> StreamConfig {
let inner = self.0.lock().unwrap();
inner.subscriber.get_subscriber_stream_config()
}
fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>) {
let mut inner = self.0.lock().unwrap();
assert!(
!inner.is_subscribed,
"A SharedSubscriber can not be shared between two BufferPublishers"
);
inner.is_subscribed = true;
inner.subscriber.on_subscribe(subscription);
}
fn on_next(&mut self, frame: Frame) {
let mut inner = self.0.lock().unwrap();
inner.subscriber.on_next(frame);
}
fn on_error(&mut self, error: BufferError) {
let mut inner = self.0.lock().unwrap();
inner.subscriber.on_error(error);
}
fn on_complete(&mut self) {
let mut inner = self.0.lock().unwrap();
inner.subscriber.on_complete();
}
}