From c3654ade89673708ce6ecfde1f7a87716b7a3f1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Fri, 2 Jun 2023 21:06:35 +0200 Subject: [PATCH] Implement curretn proto spec for server --- crabidy-server/src/main.rs | 152 +++++++++++++++++++++++-------------- 1 file changed, 96 insertions(+), 56 deletions(-) diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index 5e2b2b5..9a2858c 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -34,10 +34,21 @@ fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender) { Ok(PlayMessage::StateChanged { state }) => { tx.send(PlaybackMessage::StateChanged { state }).unwrap(); } - Ok(PlayMessage::PositionUpdated { position }) => {} + Ok(PlayMessage::PositionUpdated { position }) => { + let position = position + .and_then(|t| Some(t.mseconds() as u32)) + .unwrap_or(0); + tx.send(PlaybackMessage::PostitionChanged { position }) + .unwrap(); + } Ok(PlayMessage::Buffering { percent }) => {} - Ok(PlayMessage::VolumeChanged { volume }) => {} - Ok(PlayMessage::MuteChanged { muted }) => {} + Ok(PlayMessage::VolumeChanged { volume }) => { + let volume = volume as f32; + tx.send(PlaybackMessage::VolumeChanged { volume }).unwrap(); + } + Ok(PlayMessage::MuteChanged { muted }) => { + tx.send(PlaybackMessage::MuteChanged { muted }).unwrap(); + } Ok(PlayMessage::MediaInfoUpdated { info }) => {} _ => println!("Unknown message: {:?}", msg), @@ -140,8 +151,10 @@ impl ProviderOrchestrator { let Ok(node) = self.get_lib_node(&node_uuid).await else { continue }; - tracks.extend(node.tracks); - nodes_to_go.extend(node.children.into_iter().map(|c| c.uuid)) + if node.is_queable { + tracks.extend(node.tracks); + nodes_to_go.extend(node.children.into_iter().map(|c| c.uuid)) + } } tracks } @@ -227,12 +240,6 @@ enum PlaybackMessage { SetCurrent { position: u32, }, - GetQueue { - result_tx: flume::Sender, - }, - GetQueueTrack { - result_tx: flume::Sender, - }, TogglePlay, ToggleShuffle, Stop, @@ -242,9 +249,13 @@ enum PlaybackMessage { ToggleMute, Next, Prev, + RestartTrack, StateChanged { state: GstPlaystate, }, + VolumeChanged { + volume: f32, + }, MuteChanged { muted: bool, @@ -333,7 +344,6 @@ impl Playback { result_tx.send(response).unwrap(); } PlaybackMessage::Replace { uuids } => { - println!("Replace {:?}", uuids); let mut all_tracks = Vec::new(); for uuid in uuids { if is_track(&uuid) { @@ -356,6 +366,14 @@ impl Playback { if let Err(err) = queue_update_tx.send(update) { println!("{:?}", err) } + let track = queue.current(); + let update = StreamUpdate::QueueTrack(QueueTrack { + queue_position: queue.current_position, + track, + }); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) + } queue.current() }; if let Some(track) = current { @@ -364,61 +382,65 @@ impl Playback { } PlaybackMessage::Queue { uuids } => { + let mut all_tracks = Vec::new(); for uuid in uuids { if is_track(&uuid) { + println!("Track {}", uuid); if let Ok(track) = self.get_track(&uuid).await { - let mut queue = self.queue.lock().unwrap(); - queue.queue_tracks(&[track]); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - }; + all_tracks.push(track); } } else { + println!("Node {}", uuid); let tracks = self.flatten_node(&uuid).await; - let mut queue = self.queue.lock().unwrap(); - queue.queue_tracks(&tracks); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - }; + all_tracks.extend(tracks); + } + } + { + let mut queue = self.queue.lock().unwrap(); + queue.queue_tracks(&all_tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) } } } PlaybackMessage::Append { uuids } => { + let mut all_tracks = Vec::new(); for uuid in uuids { if is_track(&uuid) { + println!("Track {}", uuid); if let Ok(track) = self.get_track(&uuid).await { - let mut queue = self.queue.lock().unwrap(); - queue.append_tracks(&[track]); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - }; + all_tracks.push(track); } } else { + println!("Node {}", uuid); let tracks = self.flatten_node(&uuid).await; - let mut queue = self.queue.lock().unwrap(); - queue.append_tracks(&tracks); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - queue_update_tx.send(update).unwrap(); + all_tracks.extend(tracks); + } + } + { + let mut queue = self.queue.lock().unwrap(); + queue.append_tracks(&all_tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) } } } - //TODO handle deletion of current track PlaybackMessage::Remove { positions } => { let mut queue = self.queue.lock().unwrap(); queue.remove_tracks(&positions); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone()); queue_update_tx.send(update).unwrap(); + if positions.contains(&queue.current_position) { + let playback_tx = self.playback_tx.clone(); + playback_tx.send(PlaybackMessage::Next).unwrap(); + } } PlaybackMessage::Insert { position, uuids } => { @@ -462,16 +484,6 @@ impl Playback { } } - PlaybackMessage::GetQueue { result_tx } => { - let queue = self.queue.lock().unwrap(); - result_tx.send(queue.clone()).unwrap(); - } - - PlaybackMessage::GetQueueTrack { result_tx } => { - let current = self.get_queue_track().await; - result_tx.send(current).unwrap(); - } - PlaybackMessage::TogglePlay => { let mut state = self.state.lock().unwrap(); if *state == GstPlaystate::Playing { @@ -504,7 +516,8 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); let position = queue.current_position + 1; let stop = !queue.set_current_position(position); - (queue.current(), stop, position) + let pos = queue.current_position; + (queue.current(), stop, pos) }; let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::QueueTrack(QueueTrack { @@ -527,7 +540,8 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); let position = queue.current_position - 1; let stop = !queue.set_current_position(position); - (queue.current(), stop, position) + let pos = queue.current_position; + (queue.current(), stop, pos) }; let update = StreamUpdate::QueueTrack(QueueTrack { queue_position: pos, @@ -549,7 +563,6 @@ impl Playback { PlaybackMessage::StateChanged { state } => { *self.state.lock().unwrap() = state.clone(); let active_track_tx = self.update_tx.clone(); - let active_track = self.get_queue_track().await; let play_state = match state { GstPlaystate::Playing => PlayState::Playing, GstPlaystate::Paused => PlayState::Paused, @@ -562,11 +575,35 @@ impl Playback { println!("{:?}", err) }; } + PlaybackMessage::RestartTrack => { + self.play.stop(); + self.play.play(); + } + PlaybackMessage::VolumeChanged { volume } => { + let update_tx = self.update_tx.clone(); + let update = StreamUpdate::Volume(volume); + if let Err(err) = update_tx.send(update) { + println!("{:?}", err) + } + } PlaybackMessage::MuteChanged { muted } => { - todo!() + let update_tx = self.update_tx.clone(); + let update = StreamUpdate::Mute(muted); + if let Err(err) = update_tx.send(update) { + println!("{:?}", err) + } } PlaybackMessage::PostitionChanged { position } => { - todo!() + let update_tx = self.update_tx.clone(); + let duration = self + .play + .duration() + .and_then(|t| Some(t.mseconds() as u32)) + .unwrap_or(0); + let update = StreamUpdate::Position(TrackPosition { duration, position }); + if let Err(err) = update_tx.send(update) { + println!("{:?}", err) + } } } } @@ -961,7 +998,10 @@ impl CrabidyService for RpcService { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let playback_tx = self.playback_tx.clone(); - playback_tx.send_async(PlaybackMessage::Prev).await.unwrap(); + playback_tx + .send_async(PlaybackMessage::RestartTrack) + .await + .unwrap(); let reply = RestartTrackResponse {}; Ok(Response::new(reply)) }