diff --git a/Cargo.lock b/Cargo.lock index 8cef92f..99d6bf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1957,6 +1957,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/crabidy-server/Cargo.toml b/crabidy-server/Cargo.toml index 5859cbb..f39e97c 100644 --- a/crabidy-server/Cargo.toml +++ b/crabidy-server/Cargo.toml @@ -19,4 +19,4 @@ flume = "0.10.14" tonic = "0.9.2" async-trait = "0.1.68" futures = "0.3.28" -tokio-stream = "0.1.14" +tokio-stream = { version = "0.1.14", features = ["sync"] } diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index 7aef04b..9ebb747 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -27,8 +27,8 @@ use tonic::{transport::Server, Request, Response, Result, Status}; #[tokio::main] async fn main() -> Result<(), Box> { - let (queue_update_tx, queue_update_rx) = flume::bounded(10); - let (active_track_tx, active_track_rx) = flume::bounded(1000); + let (queue_update_tx, _) = tokio::sync::broadcast::channel(100); + let (active_track_tx, _) = tokio::sync::broadcast::channel(1000); let orchestrator = ProviderOrchestrator::init("").await.unwrap(); let playback = Playback::new( @@ -61,8 +61,8 @@ async fn main() -> Result<(), Box> { gstreamer::BusSyncReply::Drop }); let crabidy_service = RpcService::new( - queue_update_rx, - active_track_rx, + queue_update_tx, + active_track_tx, playback.playback_tx.clone(), orchestrator.provider_tx.clone(), ); @@ -242,8 +242,8 @@ enum PlaybackMessage { #[derive(Debug)] struct Playback { - active_track_tx: flume::Sender, - queue_update_tx: flume::Sender, + active_track_tx: tokio::sync::broadcast::Sender, + queue_update_tx: tokio::sync::broadcast::Sender, provider_tx: flume::Sender, playback_tx: flume::Sender, playback_rx: flume::Receiver, @@ -255,8 +255,8 @@ struct Playback { impl Playback { fn new( - active_track_tx: flume::Sender, - queue_update_tx: flume::Sender, + active_track_tx: tokio::sync::broadcast::Sender, + queue_update_tx: tokio::sync::broadcast::Sender, provider_tx: flume::Sender, ) -> Self { let (playback_tx, playback_rx) = flume::bounded(10); @@ -302,7 +302,9 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); queue.replace_with_tracks(&tracks); let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + if let Err(err) = queue_update_tx.send(queue.clone()) { + println!("{:?}", err) + }; } if !tracks.is_empty() { self.play(tracks[0].clone()).await; @@ -314,7 +316,10 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); queue.queue_tracks(&[track]); let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + // queue_update_tx.send(queue.clone()).unwrap(); + if let Err(err) = queue_update_tx.send(queue.clone()) { + println!("{:?}", err) + }; } } PlaybackMessage::QueueNode { uuid } => { @@ -322,7 +327,10 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); queue.queue_tracks(&tracks); let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + if let Err(err) = queue_update_tx.send(queue.clone()) { + println!("{:?}", err) + }; + // queue_update_tx.send(queue.clone()).unwrap(); } PlaybackMessage::GetQueue { result_tx } => { @@ -334,7 +342,13 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); queue.append_tracks(&[track]); let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + if let Err(err) = queue_update_tx.send(queue.clone()) { + println!("{:?}", err) + }; + // queue_update_tx.send(queue.clone()).unwrap(); + if let Err(err) = queue_update_tx.send(queue.clone()) { + println!("{:?}", err) + }; } } PlaybackMessage::AppendNode { uuid } => { @@ -366,7 +380,10 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); queue.set_current(position); let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + // queue_update_tx.send(queue.clone()).unwrap(); + if let Err(err) = queue_update_tx.send(queue.clone()) { + println!("{:?}", err) + }; queue.current() }; @@ -384,7 +401,10 @@ impl Playback { let position = queue.current + 1; let stop = !queue.set_current(position); let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + // queue_update_tx.send(queue.clone()).unwrap(); + if let Err(err) = queue_update_tx.send(queue.clone()) { + println!("{:?}", err) + }; (queue.current(), stop) }; @@ -414,7 +434,10 @@ impl Playback { *self.state.lock().unwrap() = state.clone(); let active_track_tx = self.active_track_tx.clone(); let active_track = self.get_active_track().await; - active_track_tx.send(active_track).unwrap(); + // active_track_tx.send(active_track).unwrap(); + if let Err(err) = active_track_tx.send(active_track) { + println!("{:?}", err) + }; } } } @@ -523,22 +546,22 @@ impl Playback { #[derive(Debug)] struct RpcService { - queue_update_rx: flume::Receiver, - active_track_rx: flume::Receiver, + queue_update_tx: tokio::sync::broadcast::Sender, + active_track_tx: tokio::sync::broadcast::Sender, playback_tx: flume::Sender, provider_tx: flume::Sender, } impl RpcService { fn new( - queue_update_rx: flume::Receiver, - active_track_rx: flume::Receiver, + queue_update_rx: tokio::sync::broadcast::Sender, + active_track_rx: tokio::sync::broadcast::Sender, playback_tx: flume::Sender, provider_tx: flume::Sender, ) -> Self { Self { - queue_update_rx, - active_track_rx, + queue_update_tx: queue_update_rx, + active_track_tx: active_track_rx, playback_tx, provider_tx, } @@ -733,9 +756,21 @@ impl CrabidyService for RpcService { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { - let update_rx = self.queue_update_rx.clone(); + let update_rx = self.queue_update_tx.subscribe(); + let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx); + + let output_stream = update_stream + .into_stream() + .map(|queue_result| match queue_result { + Ok(queue) => Ok(GetQueueUpdatesResponse { + queue_update_result: Some(QueueUpdateResult::Full(queue)), + }), + Err(_) => Err(tonic::Status::new( + tonic::Code::Unknown, + "Internal channel error", + )), + }); - let output_stream = update_rx.into_stream().map(map_queue); Ok(Response::new(Box::pin(output_stream))) } @@ -794,19 +829,18 @@ impl CrabidyService for RpcService { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { - let update_rx = self.active_track_rx.clone(); + let update_rx = self.active_track_tx.subscribe(); + let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx); - let output_stream = update_rx.into_stream().map(|active_track| { - Ok(GetTrackUpdatesResponse { + let output_stream = update_stream.map(|active_track_result| match active_track_result { + Ok(active_track) => Ok(GetTrackUpdatesResponse { active_track: Some(active_track), - }) + }), + Err(_) => Err(tonic::Status::new( + tonic::Code::Unknown, + "Internal channel error", + )), }); Ok(Response::new(Box::pin(output_stream))) } } - -fn map_queue(queue: Queue) -> Result { - Ok(GetQueueUpdatesResponse { - queue_update_result: Some(QueueUpdateResult::Full(queue)), - }) -}