Add shuffle and repeat for server

This commit is contained in:
Hans Mündelein 2023-06-09 16:02:35 +02:00
parent 191ed4eed2
commit 9be9039a05
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
7 changed files with 102 additions and 138 deletions

1
Cargo.lock generated
View File

@ -532,6 +532,7 @@ dependencies = [
"futures", "futures",
"log", "log",
"once_cell", "once_cell",
"rand",
"serde", "serde",
"serde_json", "serde_json",
"tidaldy", "tidaldy",

View File

@ -15,6 +15,8 @@ service CrabidyService {
rpc Remove(RemoveRequest) returns (RemoveResponse); rpc Remove(RemoveRequest) returns (RemoveResponse);
rpc Insert(InsertRequest) returns (InsertResponse); rpc Insert(InsertRequest) returns (InsertResponse);
rpc SetCurrent(SetCurrentRequest) returns (SetCurrentResponse); rpc SetCurrent(SetCurrentRequest) returns (SetCurrentResponse);
rpc ToggleShuffle(ToggleShuffleRequest) returns (ToggleShuffleResponse);
rpc ToggleRepeat(ToggleRepeatRequest) returns (ToggleRepeatResponse);
rpc GetUpdateStream(GetUpdateStreamRequest) returns (stream GetUpdateStreamResponse); rpc GetUpdateStream(GetUpdateStreamRequest) returns (stream GetUpdateStreamResponse);
rpc SaveQueue(SaveQueueRequest) returns (SaveQueueResponse); rpc SaveQueue(SaveQueueRequest) returns (SaveQueueResponse);
@ -23,7 +25,6 @@ service CrabidyService {
rpc Stop(StopRequest) returns (StopResponse); rpc Stop(StopRequest) returns (StopResponse);
rpc ChangeVolume(ChangeVolumeRequest) returns (ChangeVolumeResponse); rpc ChangeVolume(ChangeVolumeRequest) returns (ChangeVolumeResponse);
rpc ToggleMute(ToggleMuteRequest) returns (ToggleMuteResponse); rpc ToggleMute(ToggleMuteRequest) returns (ToggleMuteResponse);
rpc ToggleShuffle(ToggleShuffleRequest) returns (ToggleShuffleResponse);
rpc Next(NextRequest) returns (NextResponse); rpc Next(NextRequest) returns (NextResponse);
rpc Prev(PrevRequest) returns (PrevResponse); rpc Prev(PrevRequest) returns (PrevResponse);
rpc RestartTrack(RestartTrackRequest) returns (RestartTrackResponse); rpc RestartTrack(RestartTrackRequest) returns (RestartTrackResponse);
@ -80,6 +81,12 @@ message SetCurrentRequest {
} }
message SetCurrentResponse {} message SetCurrentResponse {}
message ToggleShuffleRequest {}
message ToggleShuffleResponse {}
message ToggleRepeatRequest {}
message ToggleRepeatResponse {}
message SaveQueueRequest { message SaveQueueRequest {
string name = 1; string name = 1;
} }
@ -113,9 +120,6 @@ message ChangeVolumeResponse {}
message ToggleMuteRequest {} message ToggleMuteRequest {}
message ToggleMuteResponse {} message ToggleMuteResponse {}
message ToggleShuffleRequest {}
message ToggleShuffleResponse {}
message NextRequest {} message NextRequest {}
message NextResponse {} message NextResponse {}
@ -136,6 +140,8 @@ message Queue {
uint32 current_position = 2; uint32 current_position = 2;
// Without album // Without album
repeated Track tracks = 3; repeated Track tracks = 3;
bool shuffle = 4;
bool repeat = 5;
} }
message QueueTrack { message QueueTrack {

View File

@ -1,7 +1,7 @@
pub mod proto;
use async_trait::async_trait; use async_trait::async_trait;
use proto::crabidy::{LibraryNode, LibraryNodeChild, Queue, Track}; use proto::crabidy::{LibraryNode, LibraryNodeChild, Track};
pub mod proto;
#[async_trait] #[async_trait]
pub trait ProviderClient: std::fmt::Debug + Send + Sync { pub trait ProviderClient: std::fmt::Debug + Send + Sync {
@ -54,90 +54,3 @@ impl LibraryNodeChild {
pub enum QueueError { pub enum QueueError {
NotQueable, NotQueable,
} }
impl Queue {
pub fn current_track(&self) -> Option<Track> {
if self.current_position < self.tracks.len() as u32 {
Some(self.tracks[self.current_position as usize].clone())
} else {
None
}
}
pub fn next_track(&mut self) -> Option<Track> {
if self.current_position < self.tracks.len() as u32 - 1 {
self.current_position += 1;
Some(self.tracks[self.current_position as usize].clone())
} else {
None
}
}
pub fn prev_track(&mut self) -> Option<Track> {
if 0 < self.current_position {
self.current_position -= 1;
Some(self.tracks[self.current_position as usize].clone())
} else {
None
}
}
pub fn set_current_position(&mut self, current_position: u32) -> bool {
if current_position < self.tracks.len() as u32 {
self.current_position = current_position;
true
} else {
false
}
}
pub fn replace_with_tracks(&mut self, tracks: &[Track]) -> Option<Track> {
self.current_position = 0;
self.tracks = tracks.to_vec();
if 0 < self.tracks.len() as u32 {
Some(self.tracks[0].clone())
} else {
None
}
}
pub fn append_tracks(&mut self, tracks: &[Track]) {
self.tracks.extend(tracks.iter().cloned());
}
pub fn queue_tracks(&mut self, tracks: &[Track]) {
let tail: Vec<Track> = self
.tracks
.splice((self.current_position as usize + 1).., tracks.to_vec())
.collect();
self.tracks.extend(tail);
}
pub fn remove_tracks(&mut self, positions: &[u32]) -> Option<Track> {
let mut play_next = false;
for pos in positions {
if pos == &self.current_position {
play_next = true;
}
if pos < &self.current_position {
self.current_position -= 1;
}
if *pos < self.tracks.len() as u32 {
self.tracks.remove(*pos as usize);
}
}
if play_next {
self.current_track()
} else {
None
}
}
pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) {
let tail: Vec<Track> = self
.tracks
.splice((position as usize + 1).., tracks.to_vec())
.collect();
self.tracks.extend(tail);
}
}

View File

@ -3,6 +3,15 @@ name = "crabidy-server"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[lib]
name = "crabidy_server"
path = "src/lib.rs"
[[bin]]
name = "crabidy_server"
path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
@ -25,3 +34,4 @@ tracing-subscriber = "0.3.17"
tracing-appender = "0.2.2" tracing-appender = "0.2.2"
tracing-log = "0.1.3" tracing-log = "0.1.3"
log = "0.4.18" log = "0.4.18"
rand = "0.8.5"

View File

@ -174,10 +174,13 @@ pub enum PlaybackMessage {
position: u32, position: u32,
span: Span, span: Span,
}, },
TogglePlay { ToggleShuffle {
span: Span, span: Span,
}, },
ToggleShuffle { ToggleRepeat {
span: Span,
},
TogglePlay {
span: Span, span: Span,
}, },
Stop { Stop {

View File

@ -2,10 +2,11 @@ use crate::PlaybackMessage;
use crate::ProviderMessage; use crate::ProviderMessage;
use audio_player::Player; use audio_player::Player;
use crabidy_core::proto::crabidy::{ use crabidy_core::proto::crabidy::{
get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, Queue, QueueTrack, get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, QueueTrack, Track,
Track, TrackPosition, TrackPosition,
}; };
use crabidy_core::ProviderError; use crabidy_core::ProviderError;
use crabidy_server::QueueManager;
use std::sync::Mutex; use std::sync::Mutex;
use tracing::debug_span; use tracing::debug_span;
use tracing::{debug, error, instrument, trace, warn, Instrument}; use tracing::{debug, error, instrument, trace, warn, Instrument};
@ -15,7 +16,7 @@ pub struct Playback {
provider_tx: flume::Sender<ProviderMessage>, provider_tx: flume::Sender<ProviderMessage>,
pub playback_tx: flume::Sender<PlaybackMessage>, pub playback_tx: flume::Sender<PlaybackMessage>,
playback_rx: flume::Receiver<PlaybackMessage>, playback_rx: flume::Receiver<PlaybackMessage>,
queue: Mutex<Queue>, queue: Mutex<QueueManager>,
state: Mutex<PlayState>, state: Mutex<PlayState>,
pub player: Player, pub player: Player,
} }
@ -26,11 +27,7 @@ impl Playback {
provider_tx: flume::Sender<ProviderMessage>, provider_tx: flume::Sender<ProviderMessage>,
) -> Self { ) -> Self {
let (playback_tx, playback_rx) = flume::bounded(10); let (playback_tx, playback_rx) = flume::bounded(10);
let queue = Mutex::new(Queue { let queue = Mutex::new(QueueManager::new());
timestamp: 0,
current_position: 0,
tracks: Vec::new(),
});
let state = Mutex::new(PlayState::Stopped); let state = Mutex::new(PlayState::Stopped);
let player = Player::default(); let player = Player::default();
Self { Self {
@ -54,7 +51,7 @@ impl Playback {
let queue = self.queue.lock().unwrap(); let queue = self.queue.lock().unwrap();
debug!("got queue lock"); debug!("got queue lock");
let queue_track = QueueTrack { let queue_track = QueueTrack {
queue_position: queue.current_position, queue_position: queue.current_position() as u32,
track: queue.current_track(), track: queue.current_track(),
}; };
trace!("queue_track {:?}", queue_track); trace!("queue_track {:?}", queue_track);
@ -78,7 +75,7 @@ impl Playback {
trace!("play_state {:?}", play_state); trace!("play_state {:?}", play_state);
debug!("released play state lock"); debug!("released play state lock");
InitResponse { InitResponse {
queue: Some(queue.clone()), queue: Some(queue.clone().into()),
queue_track: Some(queue_track), queue_track: Some(queue_track),
play_state: play_state as i32, play_state: play_state as i32,
volume: 0.0, volume: 0.0,
@ -108,9 +105,8 @@ impl Playback {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
debug!("got queue lock"); debug!("got queue lock");
queue.replace_with_tracks(&all_tracks); queue.replace_with_tracks(&all_tracks);
queue.set_current_position(0);
let queue_update_tx = self.update_tx.clone(); let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone()); let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap(); queue_update_tx.send(update).unwrap();
queue.current_track() queue.current_track()
}; };
@ -138,7 +134,7 @@ impl Playback {
debug!("got queue lock"); debug!("got queue lock");
queue.queue_tracks(&all_tracks); queue.queue_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone(); let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone()); let update = StreamUpdate::Queue(queue.clone().into());
if let Err(err) = queue_update_tx.send(update) { if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err) error!("{:?}", err)
} }
@ -166,7 +162,7 @@ impl Playback {
debug!("got queue lock"); debug!("got queue lock");
queue.append_tracks(&all_tracks); queue.append_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone(); let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone()); let update = StreamUpdate::Queue(queue.clone().into());
if let Err(err) = queue_update_tx.send(update) { if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err) error!("{:?}", err)
} }
@ -182,7 +178,7 @@ impl Playback {
debug!("got queue lock"); debug!("got queue lock");
let track = queue.remove_tracks(&positions); let track = queue.remove_tracks(&positions);
let queue_update_tx = self.update_tx.clone(); let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone()); let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap(); queue_update_tx.send(update).unwrap();
track track
}; };
@ -214,7 +210,7 @@ impl Playback {
debug!("got queue lock"); debug!("got queue lock");
queue.insert_tracks(position, &all_tracks); queue.insert_tracks(position, &all_tracks);
let queue_update_tx = self.update_tx.clone(); let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone()); let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap(); queue_update_tx.send(update).unwrap();
} }
debug!("queue lock released"); debug!("queue lock released");
@ -236,6 +232,30 @@ impl Playback {
self.play(track).in_current_span().await; self.play(track).in_current_span().await;
} }
PlaybackMessage::ToggleShuffle { span } => {
let _e = span.enter();
debug!("toggling shuffle");
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
if queue.shuffle {
queue.shuffle_on()
} else {
queue.shuffle_off()
}
}
PlaybackMessage::ToggleRepeat { span } => {
let _e = span.enter();
debug!("toggling repeat");
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
if queue.repeat {
queue.repeat = false
} else {
queue.repeat = true
}
}
PlaybackMessage::TogglePlay { span } => { PlaybackMessage::TogglePlay { span } => {
let _e = span.enter(); let _e = span.enter();
debug!("toggling play"); debug!("toggling play");
@ -280,12 +300,6 @@ impl Playback {
// self.player.set_mute(!muted); // self.player.set_mute(!muted);
} }
PlaybackMessage::ToggleShuffle { span } => {
let _e = span.enter();
debug!("toggling shuffle");
todo!()
}
PlaybackMessage::Next { span } => { PlaybackMessage::Next { span } => {
let _e = span.enter(); let _e = span.enter();
debug!("nexting"); debug!("nexting");
@ -460,7 +474,7 @@ impl Playback {
let queue_update_tx = self.update_tx.clone(); let queue_update_tx = self.update_tx.clone();
let track = queue.current_track(); let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack { let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: queue.current_position, queue_position: queue.current_position() as u32,
track, track,
}); });
if let Err(err) = queue_update_tx.send(update) { if let Err(err) = queue_update_tx.send(update) {
@ -500,7 +514,7 @@ impl Playback {
let queue_update_tx = self.update_tx.clone(); let queue_update_tx = self.update_tx.clone();
let track = queue.current_track(); let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack { let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: queue.current_position, queue_position: queue.current_position() as u32,
track, track,
}); });
if let Err(err) = queue_update_tx.send(update) { if let Err(err) = queue_update_tx.send(update) {

View File

@ -8,7 +8,7 @@ use crabidy_core::proto::crabidy::{
ReplaceRequest, ReplaceResponse, RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, ReplaceRequest, ReplaceResponse, RestartTrackRequest, RestartTrackResponse, SaveQueueRequest,
SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse,
ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse,
ToggleShuffleRequest, ToggleShuffleResponse, ToggleRepeatRequest, ToggleRepeatResponse, ToggleShuffleRequest, ToggleShuffleResponse,
}; };
use futures::TryStreamExt; use futures::TryStreamExt;
use std::pin::Pin; use std::pin::Pin;
@ -220,6 +220,40 @@ impl CrabidyService for RpcService {
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
#[instrument(skip(self, _request))]
async fn toggle_shuffle(
&self,
_request: tonic::Request<ToggleShuffleRequest>,
) -> std::result::Result<tonic::Response<ToggleShuffleResponse>, tonic::Status> {
debug!("Received toggle_shuffle request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::ToggleShuffle { span })
.in_current_span()
.await
.unwrap();
let reply = ToggleShuffleResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn toggle_repeat(
&self,
_request: tonic::Request<ToggleRepeatRequest>,
) -> std::result::Result<tonic::Response<ToggleRepeatResponse>, tonic::Status> {
debug!("Received toggle_shuffle request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::ToggleRepeat { span })
.in_current_span()
.await
.unwrap();
let reply = ToggleRepeatResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))] #[instrument(skip(self, _request))]
async fn get_update_stream( async fn get_update_stream(
&self, &self,
@ -272,23 +306,6 @@ impl CrabidyService for RpcService {
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
#[instrument(skip(self, _request))]
async fn toggle_shuffle(
&self,
_request: tonic::Request<ToggleShuffleRequest>,
) -> std::result::Result<tonic::Response<ToggleShuffleResponse>, tonic::Status> {
debug!("Received toggle_shuffle request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::ToggleShuffle { span })
.in_current_span()
.await
.unwrap();
let reply = ToggleShuffleResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))] #[instrument(skip(self, _request))]
async fn stop( async fn stop(
&self, &self,