From 4fbeccdde1533dda1921d8e4b37a0901cc1041a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Fri, 2 Jun 2023 19:20:30 +0200 Subject: [PATCH] Implement init on the server --- crabidy-core/crabidy/v1/crabidy.proto | 4 +- crabidy-core/src/lib.rs | 2 +- crabidy-server/src/main.rs | 83 +++++++++++++++++++++++---- 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/crabidy-core/crabidy/v1/crabidy.proto b/crabidy-core/crabidy/v1/crabidy.proto index c3fab66..3a14903 100644 --- a/crabidy-core/crabidy/v1/crabidy.proto +++ b/crabidy-core/crabidy/v1/crabidy.proto @@ -35,7 +35,7 @@ message InitResponse { Queue queue = 1; QueueTrack queue_track = 2; PlayState play_state = 3; - uint32 volume = 4; + float volume = 4; bool mute = 5; TrackPosition position = 6; } @@ -92,7 +92,7 @@ message GetUpdateStreamResponse { Queue queue = 1; QueueTrack queue_track = 2; PlayState play_state = 3; - uint32 volume = 4; + float volume = 4; bool mute = 5; TrackPosition position = 6; } diff --git a/crabidy-core/src/lib.rs b/crabidy-core/src/lib.rs index 405f42e..4d86e98 100644 --- a/crabidy-core/src/lib.rs +++ b/crabidy-core/src/lib.rs @@ -55,7 +55,7 @@ pub enum QueueError { } impl Queue { - pub fn current(&mut self) -> Option { + pub fn current(&self) -> Option { if self.current_position < self.tracks.len() as u32 { Some(self.tracks[self.current_position as usize].clone()) } else { diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index fe09a98..6df4feb 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -10,7 +10,7 @@ use crabidy_core::proto::crabidy::{ RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, ToggleShuffleRequest, - ToggleShuffleResponse, Track, + ToggleShuffleResponse, Track, TrackPosition, }; use crabidy_core::{ProviderClient, ProviderError}; use futures::TryStreamExt; @@ -149,24 +149,14 @@ impl ProviderOrchestrator { #[async_trait] impl ProviderClient for ProviderOrchestrator { async fn init(_s: &str) -> Result { - let state = Mutex::new(PlayState::Stopped); - let queue = Mutex::new(Queue { - timestamp: 0, - current_position: 0, - tracks: Vec::new(), - }); let raw_toml_settings = fs::read_to_string("/tmp/tidaldy.toml").unwrap_or("".to_owned()); let tidal_client = Arc::new(tidaldy::Client::init(&raw_toml_settings).await.unwrap()); let new_toml_config = tidal_client.settings(); fs::write("/tmp/tidaldy.toml", new_toml_config).unwrap(); - // let known_tracks = HashMap::new(); - // let known_nodes = HashMap::new(); let (provider_tx, provider_rx) = flume::bounded(100); Ok(Self { provider_rx, provider_tx, - // known_tracks, - // known_nodes, tidal_client, }) } @@ -200,6 +190,9 @@ impl ProviderClient for ProviderOrchestrator { #[derive(Debug)] enum PlaybackMessage { + Init { + result_tx: flume::Sender, + }, Replace { uuids: Vec, }, @@ -237,6 +230,13 @@ enum PlaybackMessage { StateChanged { state: GstPlaystate, }, + + MuteChanged { + muted: bool, + }, + PostitionChanged { + position: u32, + }, } #[derive(Debug)] @@ -280,6 +280,43 @@ impl Playback { tokio::spawn(async move { while let Ok(message) = self.playback_rx.recv_async().await { match message { + PlaybackMessage::Init { result_tx } => { + let response = { + let queue = self.queue.lock().unwrap(); + let queue_track = QueueTrack { + queue_position: queue.current_position, + track: queue.current(), + }; + let position = TrackPosition { + duration: self + .play + .duration() + .and_then(|t| Some(t.mseconds() as u32)) + .unwrap_or(0), + position: self + .play + .position() + .and_then(|t| Some(t.mseconds() as u32)) + .unwrap_or(0), + }; + let play_state = match *self.state.lock().unwrap() { + GstPlaystate::Playing => PlayState::Playing, + GstPlaystate::Paused => PlayState::Paused, + GstPlaystate::Stopped => PlayState::Stopped, + GstPlaystate::Buffering => PlayState::Loading, + _ => PlayState::Unspecified, + }; + InitResponse { + queue: Some(queue.clone()), + queue_track: Some(queue_track), + play_state: play_state as i32, + volume: self.play.volume() as f32, + mute: self.play.is_muted(), + position: Some(position), + } + }; + result_tx.send(response).unwrap(); + } PlaybackMessage::Replace { uuids } => { println!("Replace {:?}", uuids); let mut all_tracks = Vec::new(); @@ -510,6 +547,12 @@ impl Playback { println!("{:?}", err) }; } + PlaybackMessage::MuteChanged { muted } => { + todo!() + } + PlaybackMessage::PostitionChanged { position } => { + todo!() + } } } }); @@ -644,7 +687,23 @@ impl CrabidyService for RpcService { Pin> + Send>>; async fn init(&self, request: Request) -> Result, Status> { - todo!() + let playback_tx = self.playback_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + let Ok(response) = playback_tx + .send_async(PlaybackMessage::Init { result_tx }) + .await + else { + return Err(Status::internal("Sending init failed internally")); + + }; + let Ok(response) = result_rx + .recv_async() + .await + else { + return Err(Status::internal("Init failed internally")); + + }; + Ok(Response::new(response)) } async fn get_library_node(