Allow multiple concurrent clients receive streams

This commit is contained in:
Hans Mündelein 2023-05-28 08:53:57 +02:00
parent 25249dc244
commit 46937b3d6e
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
3 changed files with 69 additions and 34 deletions

1
Cargo.lock generated
View File

@ -1957,6 +1957,7 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]

View File

@ -19,4 +19,4 @@ flume = "0.10.14"
tonic = "0.9.2" tonic = "0.9.2"
async-trait = "0.1.68" async-trait = "0.1.68"
futures = "0.3.28" futures = "0.3.28"
tokio-stream = "0.1.14" tokio-stream = { version = "0.1.14", features = ["sync"] }

View File

@ -27,8 +27,8 @@ use tonic::{transport::Server, Request, Response, Result, Status};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (queue_update_tx, queue_update_rx) = flume::bounded(10); let (queue_update_tx, _) = tokio::sync::broadcast::channel(100);
let (active_track_tx, active_track_rx) = flume::bounded(1000); let (active_track_tx, _) = tokio::sync::broadcast::channel(1000);
let orchestrator = ProviderOrchestrator::init("").await.unwrap(); let orchestrator = ProviderOrchestrator::init("").await.unwrap();
let playback = Playback::new( let playback = Playback::new(
@ -61,8 +61,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
gstreamer::BusSyncReply::Drop gstreamer::BusSyncReply::Drop
}); });
let crabidy_service = RpcService::new( let crabidy_service = RpcService::new(
queue_update_rx, queue_update_tx,
active_track_rx, active_track_tx,
playback.playback_tx.clone(), playback.playback_tx.clone(),
orchestrator.provider_tx.clone(), orchestrator.provider_tx.clone(),
); );
@ -242,8 +242,8 @@ enum PlaybackMessage {
#[derive(Debug)] #[derive(Debug)]
struct Playback { struct Playback {
active_track_tx: flume::Sender<ActiveTrack>, active_track_tx: tokio::sync::broadcast::Sender<ActiveTrack>,
queue_update_tx: flume::Sender<Queue>, queue_update_tx: tokio::sync::broadcast::Sender<Queue>,
provider_tx: flume::Sender<ProviderMessage>, provider_tx: flume::Sender<ProviderMessage>,
playback_tx: flume::Sender<PlaybackMessage>, playback_tx: flume::Sender<PlaybackMessage>,
playback_rx: flume::Receiver<PlaybackMessage>, playback_rx: flume::Receiver<PlaybackMessage>,
@ -255,8 +255,8 @@ struct Playback {
impl Playback { impl Playback {
fn new( fn new(
active_track_tx: flume::Sender<ActiveTrack>, active_track_tx: tokio::sync::broadcast::Sender<ActiveTrack>,
queue_update_tx: flume::Sender<Queue>, queue_update_tx: tokio::sync::broadcast::Sender<Queue>,
provider_tx: flume::Sender<ProviderMessage>, provider_tx: flume::Sender<ProviderMessage>,
) -> Self { ) -> Self {
let (playback_tx, playback_rx) = flume::bounded(10); let (playback_tx, playback_rx) = flume::bounded(10);
@ -302,7 +302,9 @@ impl Playback {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&tracks); queue.replace_with_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone(); let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap(); if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
} }
if !tracks.is_empty() { if !tracks.is_empty() {
self.play(tracks[0].clone()).await; self.play(tracks[0].clone()).await;
@ -314,7 +316,10 @@ impl Playback {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&[track]); queue.queue_tracks(&[track]);
let queue_update_tx = self.queue_update_tx.clone(); let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap(); // queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
} }
} }
PlaybackMessage::QueueNode { uuid } => { PlaybackMessage::QueueNode { uuid } => {
@ -322,7 +327,10 @@ impl Playback {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&tracks); queue.queue_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone(); let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap(); if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
// queue_update_tx.send(queue.clone()).unwrap();
} }
PlaybackMessage::GetQueue { result_tx } => { PlaybackMessage::GetQueue { result_tx } => {
@ -334,7 +342,13 @@ impl Playback {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&[track]); queue.append_tracks(&[track]);
let queue_update_tx = self.queue_update_tx.clone(); let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap(); if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
// queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
} }
} }
PlaybackMessage::AppendNode { uuid } => { PlaybackMessage::AppendNode { uuid } => {
@ -366,7 +380,10 @@ impl Playback {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.set_current(position); queue.set_current(position);
let queue_update_tx = self.queue_update_tx.clone(); let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap(); // queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
queue.current() queue.current()
}; };
@ -384,7 +401,10 @@ impl Playback {
let position = queue.current + 1; let position = queue.current + 1;
let stop = !queue.set_current(position); let stop = !queue.set_current(position);
let queue_update_tx = self.queue_update_tx.clone(); let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap(); // queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
(queue.current(), stop) (queue.current(), stop)
}; };
@ -414,7 +434,10 @@ impl Playback {
*self.state.lock().unwrap() = state.clone(); *self.state.lock().unwrap() = state.clone();
let active_track_tx = self.active_track_tx.clone(); let active_track_tx = self.active_track_tx.clone();
let active_track = self.get_active_track().await; let active_track = self.get_active_track().await;
active_track_tx.send(active_track).unwrap(); // active_track_tx.send(active_track).unwrap();
if let Err(err) = active_track_tx.send(active_track) {
println!("{:?}", err)
};
} }
} }
} }
@ -523,22 +546,22 @@ impl Playback {
#[derive(Debug)] #[derive(Debug)]
struct RpcService { struct RpcService {
queue_update_rx: flume::Receiver<Queue>, queue_update_tx: tokio::sync::broadcast::Sender<Queue>,
active_track_rx: flume::Receiver<ActiveTrack>, active_track_tx: tokio::sync::broadcast::Sender<ActiveTrack>,
playback_tx: flume::Sender<PlaybackMessage>, playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>, provider_tx: flume::Sender<ProviderMessage>,
} }
impl RpcService { impl RpcService {
fn new( fn new(
queue_update_rx: flume::Receiver<Queue>, queue_update_rx: tokio::sync::broadcast::Sender<Queue>,
active_track_rx: flume::Receiver<ActiveTrack>, active_track_rx: tokio::sync::broadcast::Sender<ActiveTrack>,
playback_tx: flume::Sender<PlaybackMessage>, playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>, provider_tx: flume::Sender<ProviderMessage>,
) -> Self { ) -> Self {
Self { Self {
queue_update_rx, queue_update_tx: queue_update_rx,
active_track_rx, active_track_tx: active_track_rx,
playback_tx, playback_tx,
provider_tx, provider_tx,
} }
@ -733,9 +756,21 @@ impl CrabidyService for RpcService {
&self, &self,
request: tonic::Request<GetQueueUpdatesRequest>, request: tonic::Request<GetQueueUpdatesRequest>,
) -> std::result::Result<tonic::Response<Self::GetQueueUpdatesStream>, tonic::Status> { ) -> std::result::Result<tonic::Response<Self::GetQueueUpdatesStream>, tonic::Status> {
let update_rx = self.queue_update_rx.clone(); let update_rx = self.queue_update_tx.subscribe();
let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx);
let output_stream = update_stream
.into_stream()
.map(|queue_result| match queue_result {
Ok(queue) => Ok(GetQueueUpdatesResponse {
queue_update_result: Some(QueueUpdateResult::Full(queue)),
}),
Err(_) => Err(tonic::Status::new(
tonic::Code::Unknown,
"Internal channel error",
)),
});
let output_stream = update_rx.into_stream().map(map_queue);
Ok(Response::new(Box::pin(output_stream))) Ok(Response::new(Box::pin(output_stream)))
} }
@ -794,19 +829,18 @@ impl CrabidyService for RpcService {
&self, &self,
request: tonic::Request<GetTrackUpdatesRequest>, request: tonic::Request<GetTrackUpdatesRequest>,
) -> std::result::Result<tonic::Response<Self::GetTrackUpdatesStream>, tonic::Status> { ) -> std::result::Result<tonic::Response<Self::GetTrackUpdatesStream>, tonic::Status> {
let update_rx = self.active_track_rx.clone(); let update_rx = self.active_track_tx.subscribe();
let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx);
let output_stream = update_rx.into_stream().map(|active_track| { let output_stream = update_stream.map(|active_track_result| match active_track_result {
Ok(GetTrackUpdatesResponse { Ok(active_track) => Ok(GetTrackUpdatesResponse {
active_track: Some(active_track), active_track: Some(active_track),
}) }),
Err(_) => Err(tonic::Status::new(
tonic::Code::Unknown,
"Internal channel error",
)),
}); });
Ok(Response::new(Box::pin(output_stream))) Ok(Response::new(Box::pin(output_stream)))
} }
} }
fn map_queue(queue: Queue) -> Result<GetQueueUpdatesResponse, Status> {
Ok(GetQueueUpdatesResponse {
queue_update_result: Some(QueueUpdateResult::Full(queue)),
})
}