diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index bcda48c..e05a243 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -25,30 +25,30 @@ use std::{ }; use tonic::{transport::Server, Request, Response, Result, Status}; -fn poll_bus(bus: gstreamer::Bus, tx: flume::Sender) { - loop { - for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { - match PlayMessage::parse(&msg) { - Ok(PlayMessage::EndOfStream) => { - tx.send(PlaybackMessage::Next).unwrap(); - } - Ok(PlayMessage::StateChanged { state }) => { - tx.send(PlaybackMessage::StateChanged { state }).unwrap(); - } - Ok(PlayMessage::PositionUpdated { position }) => {} - Ok(PlayMessage::Buffering { percent }) => {} - Ok(PlayMessage::VolumeChanged { volume }) => {} - Ok(PlayMessage::MuteChanged { muted }) => {} - - Ok(PlayMessage::MediaInfoUpdated { info }) => {} - _ => println!("Unknown message: {:?}", msg), +fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender) { + for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { + match PlayMessage::parse(&msg) { + Ok(PlayMessage::EndOfStream) => { + tx.send(PlaybackMessage::Next).unwrap(); } + Ok(PlayMessage::StateChanged { state }) => { + tx.send(PlaybackMessage::StateChanged { state }).unwrap(); + } + Ok(PlayMessage::PositionUpdated { position }) => {} + Ok(PlayMessage::Buffering { percent }) => {} + Ok(PlayMessage::VolumeChanged { volume }) => {} + Ok(PlayMessage::MuteChanged { muted }) => {} + + Ok(PlayMessage::MediaInfoUpdated { info }) => {} + _ => println!("Unknown message: {:?}", msg), } } } #[tokio::main] async fn main() -> Result<(), Box> { + gstreamer::init()?; + let (queue_update_tx, _) = tokio::sync::broadcast::channel(100); let (active_track_tx, _) = tokio::sync::broadcast::channel(1000); let orchestrator = ProviderOrchestrator::init("").await.unwrap(); @@ -63,29 +63,9 @@ async fn main() -> Result<(), Box> { let playback_tx = playback.playback_tx.clone(); std::thread::spawn(|| { - poll_bus(bus, playback_tx); + poll_play_bus(bus, playback_tx); }); - // bus.set_sync_handler(move |_, msg| { - // match PlayMessage::parse(msg) { - // Ok(PlayMessage::EndOfStream) => { - // playback_tx.send(PlaybackMessage::Next).unwrap(); - // } - // Ok(PlayMessage::StateChanged { state }) => { - // playback_tx - // .send(PlaybackMessage::StateChanged { state }) - // .unwrap(); - // } - // Ok(PlayMessage::PositionUpdated { position }) => {} - // Ok(PlayMessage::Buffering { percent }) => {} - // Ok(PlayMessage::VolumeChanged { volume }) => {} - // Ok(PlayMessage::MuteChanged { muted }) => {} - // - // Ok(PlayMessage::MediaInfoUpdated { info }) => {} - // _ => println!("Unknown message: {:?}", msg), - // } - // gstreamer::BusSyncReply::Drop - // }); let crabidy_service = RpcService::new( queue_update_tx, active_track_tx, @@ -176,8 +156,6 @@ impl ProviderOrchestrator { #[async_trait] impl ProviderClient for ProviderOrchestrator { async fn init(_s: &str) -> Result { - gstreamer::init().unwrap(); - let play = Play::new(None::); let state = Mutex::new(PlayState::Stopped); let queue = Mutex::new(Queue { timestamp: 0,