From 9be9039a051d18a5f7df7cdb70ed5f725aefee54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Fri, 9 Jun 2023 16:02:35 +0200 Subject: [PATCH] Add shuffle and repeat for server --- Cargo.lock | 1 + crabidy-core/crabidy/v1/crabidy.proto | 14 ++-- crabidy-core/src/lib.rs | 93 +-------------------------- crabidy-server/Cargo.toml | 10 +++ crabidy-server/src/main.rs | 7 +- crabidy-server/src/playback.rs | 62 +++++++++++------- crabidy-server/src/rpc.rs | 53 +++++++++------ 7 files changed, 102 insertions(+), 138 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd90390..d2b6157 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,6 +532,7 @@ dependencies = [ "futures", "log", "once_cell", + "rand", "serde", "serde_json", "tidaldy", diff --git a/crabidy-core/crabidy/v1/crabidy.proto b/crabidy-core/crabidy/v1/crabidy.proto index 3a14903..fea0eaa 100644 --- a/crabidy-core/crabidy/v1/crabidy.proto +++ b/crabidy-core/crabidy/v1/crabidy.proto @@ -15,6 +15,8 @@ service CrabidyService { rpc Remove(RemoveRequest) returns (RemoveResponse); rpc Insert(InsertRequest) returns (InsertResponse); rpc SetCurrent(SetCurrentRequest) returns (SetCurrentResponse); + rpc ToggleShuffle(ToggleShuffleRequest) returns (ToggleShuffleResponse); + rpc ToggleRepeat(ToggleRepeatRequest) returns (ToggleRepeatResponse); rpc GetUpdateStream(GetUpdateStreamRequest) returns (stream GetUpdateStreamResponse); rpc SaveQueue(SaveQueueRequest) returns (SaveQueueResponse); @@ -23,7 +25,6 @@ service CrabidyService { rpc Stop(StopRequest) returns (StopResponse); rpc ChangeVolume(ChangeVolumeRequest) returns (ChangeVolumeResponse); rpc ToggleMute(ToggleMuteRequest) returns (ToggleMuteResponse); - rpc ToggleShuffle(ToggleShuffleRequest) returns (ToggleShuffleResponse); rpc Next(NextRequest) returns (NextResponse); rpc Prev(PrevRequest) returns (PrevResponse); rpc RestartTrack(RestartTrackRequest) returns (RestartTrackResponse); @@ -80,6 +81,12 @@ message SetCurrentRequest { } message SetCurrentResponse {} +message ToggleShuffleRequest {} +message ToggleShuffleResponse {} + +message ToggleRepeatRequest {} +message ToggleRepeatResponse {} + message SaveQueueRequest { string name = 1; } @@ -113,9 +120,6 @@ message ChangeVolumeResponse {} message ToggleMuteRequest {} message ToggleMuteResponse {} -message ToggleShuffleRequest {} -message ToggleShuffleResponse {} - message NextRequest {} message NextResponse {} @@ -136,6 +140,8 @@ message Queue { uint32 current_position = 2; // Without album repeated Track tracks = 3; + bool shuffle = 4; + bool repeat = 5; } message QueueTrack { diff --git a/crabidy-core/src/lib.rs b/crabidy-core/src/lib.rs index 3f68780..e56d42a 100644 --- a/crabidy-core/src/lib.rs +++ b/crabidy-core/src/lib.rs @@ -1,7 +1,7 @@ -pub mod proto; - use async_trait::async_trait; -use proto::crabidy::{LibraryNode, LibraryNodeChild, Queue, Track}; +use proto::crabidy::{LibraryNode, LibraryNodeChild, Track}; + +pub mod proto; #[async_trait] pub trait ProviderClient: std::fmt::Debug + Send + Sync { @@ -54,90 +54,3 @@ impl LibraryNodeChild { pub enum QueueError { NotQueable, } - -impl Queue { - pub fn current_track(&self) -> Option { - if self.current_position < self.tracks.len() as u32 { - Some(self.tracks[self.current_position as usize].clone()) - } else { - None - } - } - - pub fn next_track(&mut self) -> Option { - if self.current_position < self.tracks.len() as u32 - 1 { - self.current_position += 1; - Some(self.tracks[self.current_position as usize].clone()) - } else { - None - } - } - - pub fn prev_track(&mut self) -> Option { - if 0 < self.current_position { - self.current_position -= 1; - Some(self.tracks[self.current_position as usize].clone()) - } else { - None - } - } - - pub fn set_current_position(&mut self, current_position: u32) -> bool { - if current_position < self.tracks.len() as u32 { - self.current_position = current_position; - true - } else { - false - } - } - - pub fn replace_with_tracks(&mut self, tracks: &[Track]) -> Option { - self.current_position = 0; - self.tracks = tracks.to_vec(); - if 0 < self.tracks.len() as u32 { - Some(self.tracks[0].clone()) - } else { - None - } - } - - pub fn append_tracks(&mut self, tracks: &[Track]) { - self.tracks.extend(tracks.iter().cloned()); - } - - pub fn queue_tracks(&mut self, tracks: &[Track]) { - let tail: Vec = self - .tracks - .splice((self.current_position as usize + 1).., tracks.to_vec()) - .collect(); - self.tracks.extend(tail); - } - - pub fn remove_tracks(&mut self, positions: &[u32]) -> Option { - let mut play_next = false; - for pos in positions { - if pos == &self.current_position { - play_next = true; - } - if pos < &self.current_position { - self.current_position -= 1; - } - if *pos < self.tracks.len() as u32 { - self.tracks.remove(*pos as usize); - } - } - if play_next { - self.current_track() - } else { - None - } - } - - pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) { - let tail: Vec = self - .tracks - .splice((position as usize + 1).., tracks.to_vec()) - .collect(); - self.tracks.extend(tail); - } -} diff --git a/crabidy-server/Cargo.toml b/crabidy-server/Cargo.toml index d0c905c..b0556e8 100644 --- a/crabidy-server/Cargo.toml +++ b/crabidy-server/Cargo.toml @@ -3,6 +3,15 @@ name = "crabidy-server" version = "0.1.0" edition = "2021" + +[lib] +name = "crabidy_server" +path = "src/lib.rs" + +[[bin]] +name = "crabidy_server" +path = "src/main.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -25,3 +34,4 @@ tracing-subscriber = "0.3.17" tracing-appender = "0.2.2" tracing-log = "0.1.3" log = "0.4.18" +rand = "0.8.5" diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index a35c90d..e74a741 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -174,10 +174,13 @@ pub enum PlaybackMessage { position: u32, span: Span, }, - TogglePlay { + ToggleShuffle { span: Span, }, - ToggleShuffle { + ToggleRepeat { + span: Span, + }, + TogglePlay { span: Span, }, Stop { diff --git a/crabidy-server/src/playback.rs b/crabidy-server/src/playback.rs index 6d1b1a0..cd601e1 100644 --- a/crabidy-server/src/playback.rs +++ b/crabidy-server/src/playback.rs @@ -2,10 +2,11 @@ use crate::PlaybackMessage; use crate::ProviderMessage; use audio_player::Player; use crabidy_core::proto::crabidy::{ - get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, Queue, QueueTrack, - Track, TrackPosition, + get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, QueueTrack, Track, + TrackPosition, }; use crabidy_core::ProviderError; +use crabidy_server::QueueManager; use std::sync::Mutex; use tracing::debug_span; use tracing::{debug, error, instrument, trace, warn, Instrument}; @@ -15,7 +16,7 @@ pub struct Playback { provider_tx: flume::Sender, pub playback_tx: flume::Sender, playback_rx: flume::Receiver, - queue: Mutex, + queue: Mutex, state: Mutex, pub player: Player, } @@ -26,11 +27,7 @@ impl Playback { provider_tx: flume::Sender, ) -> Self { let (playback_tx, playback_rx) = flume::bounded(10); - let queue = Mutex::new(Queue { - timestamp: 0, - current_position: 0, - tracks: Vec::new(), - }); + let queue = Mutex::new(QueueManager::new()); let state = Mutex::new(PlayState::Stopped); let player = Player::default(); Self { @@ -54,7 +51,7 @@ impl Playback { let queue = self.queue.lock().unwrap(); debug!("got queue lock"); let queue_track = QueueTrack { - queue_position: queue.current_position, + queue_position: queue.current_position() as u32, track: queue.current_track(), }; trace!("queue_track {:?}", queue_track); @@ -78,7 +75,7 @@ impl Playback { trace!("play_state {:?}", play_state); debug!("released play state lock"); InitResponse { - queue: Some(queue.clone()), + queue: Some(queue.clone().into()), queue_track: Some(queue_track), play_state: play_state as i32, volume: 0.0, @@ -108,9 +105,8 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.replace_with_tracks(&all_tracks); - queue.set_current_position(0); let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); + let update = StreamUpdate::Queue(queue.clone().into()); queue_update_tx.send(update).unwrap(); queue.current_track() }; @@ -138,7 +134,7 @@ impl Playback { debug!("got queue lock"); queue.queue_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); + let update = StreamUpdate::Queue(queue.clone().into()); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } @@ -166,7 +162,7 @@ impl Playback { debug!("got queue lock"); queue.append_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); + let update = StreamUpdate::Queue(queue.clone().into()); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } @@ -182,7 +178,7 @@ impl Playback { debug!("got queue lock"); let track = queue.remove_tracks(&positions); let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); + let update = StreamUpdate::Queue(queue.clone().into()); queue_update_tx.send(update).unwrap(); track }; @@ -214,7 +210,7 @@ impl Playback { debug!("got queue lock"); queue.insert_tracks(position, &all_tracks); let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); + let update = StreamUpdate::Queue(queue.clone().into()); queue_update_tx.send(update).unwrap(); } debug!("queue lock released"); @@ -236,6 +232,30 @@ impl Playback { self.play(track).in_current_span().await; } + PlaybackMessage::ToggleShuffle { span } => { + let _e = span.enter(); + debug!("toggling shuffle"); + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + if queue.shuffle { + queue.shuffle_on() + } else { + queue.shuffle_off() + } + } + + PlaybackMessage::ToggleRepeat { span } => { + let _e = span.enter(); + debug!("toggling repeat"); + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + if queue.repeat { + queue.repeat = false + } else { + queue.repeat = true + } + } + PlaybackMessage::TogglePlay { span } => { let _e = span.enter(); debug!("toggling play"); @@ -280,12 +300,6 @@ impl Playback { // self.player.set_mute(!muted); } - PlaybackMessage::ToggleShuffle { span } => { - let _e = span.enter(); - debug!("toggling shuffle"); - todo!() - } - PlaybackMessage::Next { span } => { let _e = span.enter(); debug!("nexting"); @@ -460,7 +474,7 @@ impl Playback { let queue_update_tx = self.update_tx.clone(); let track = queue.current_track(); let update = StreamUpdate::QueueTrack(QueueTrack { - queue_position: queue.current_position, + queue_position: queue.current_position() as u32, track, }); if let Err(err) = queue_update_tx.send(update) { @@ -500,7 +514,7 @@ impl Playback { let queue_update_tx = self.update_tx.clone(); let track = queue.current_track(); let update = StreamUpdate::QueueTrack(QueueTrack { - queue_position: queue.current_position, + queue_position: queue.current_position() as u32, track, }); if let Err(err) = queue_update_tx.send(update) { diff --git a/crabidy-server/src/rpc.rs b/crabidy-server/src/rpc.rs index 036fac3..d1acbef 100644 --- a/crabidy-server/src/rpc.rs +++ b/crabidy-server/src/rpc.rs @@ -8,7 +8,7 @@ use crabidy_core::proto::crabidy::{ ReplaceRequest, ReplaceResponse, RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, - ToggleShuffleRequest, ToggleShuffleResponse, + ToggleRepeatRequest, ToggleRepeatResponse, ToggleShuffleRequest, ToggleShuffleResponse, }; use futures::TryStreamExt; use std::pin::Pin; @@ -220,6 +220,40 @@ impl CrabidyService for RpcService { Ok(Response::new(reply)) } + #[instrument(skip(self, _request))] + async fn toggle_shuffle( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received toggle_shuffle request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::ToggleShuffle { span }) + .in_current_span() + .await + .unwrap(); + let reply = ToggleShuffleResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn toggle_repeat( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received toggle_shuffle request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::ToggleRepeat { span }) + .in_current_span() + .await + .unwrap(); + let reply = ToggleRepeatResponse {}; + Ok(Response::new(reply)) + } + #[instrument(skip(self, _request))] async fn get_update_stream( &self, @@ -272,23 +306,6 @@ impl CrabidyService for RpcService { Ok(Response::new(reply)) } - #[instrument(skip(self, _request))] - async fn toggle_shuffle( - &self, - _request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - debug!("Received toggle_shuffle request"); - let playback_tx = self.playback_tx.clone(); - let span = debug_span!("play-chan"); - playback_tx - .send_async(PlaybackMessage::ToggleShuffle { span }) - .in_current_span() - .await - .unwrap(); - let reply = ToggleShuffleResponse {}; - Ok(Response::new(reply)) - } - #[instrument(skip(self, _request))] async fn stop( &self,