From 5a42ddfbdb06a70415f3849f0558308355c32ed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Fri, 2 Jun 2023 18:41:19 +0200 Subject: [PATCH] Adjust server to new proto spec --- cbd-tui/src/main.rs | 6 +- crabidy-core/crabidy/v1/crabidy.proto | 7 +- crabidy-core/src/lib.rs | 10 +- crabidy-server/src/main.rs | 705 ++++++++++++++------------ tidaldy/src/lib.rs | 25 +- tidaldy/src/models.rs | 18 +- 6 files changed, 421 insertions(+), 350 deletions(-) diff --git a/cbd-tui/src/main.rs b/cbd-tui/src/main.rs index 5d9cdc3..d283047 100644 --- a/cbd-tui/src/main.rs +++ b/cbd-tui/src/main.rs @@ -349,7 +349,7 @@ impl App { fn new() -> App { let mut library = LibraryView { title: "Library".to_string(), - uuid: "/".to_string(), + uuid: "node:/".to_string(), list: Vec::new(), list_state: ListState::default(), positions: HashMap::new(), @@ -443,9 +443,9 @@ async fn poll( async fn orchestrate<'a>( (tx, rx): (Sender, Receiver), ) -> Result<(), Box> { - let mut rpc_client = rpc::RpcClient::connect("http://127.0.0.1:50051").await?; + let mut rpc_client = rpc::RpcClient::connect("http://localhost:50051").await?; - if let Some(root_node) = rpc_client.get_library_node("/").await? { + if let Some(root_node) = rpc_client.get_library_node("node:/").await? { tx.send(MessageToUi::ReplaceLibraryNode(root_node.clone())); } diff --git a/crabidy-core/crabidy/v1/crabidy.proto b/crabidy-core/crabidy/v1/crabidy.proto index feac7d3..c3fab66 100644 --- a/crabidy-core/crabidy/v1/crabidy.proto +++ b/crabidy-core/crabidy/v1/crabidy.proto @@ -65,12 +65,13 @@ message AppendRequest { message AppendResponse {} message RemoveRequest { - repeated string uuid = 1; + repeated uint32 positions = 1; } message RemoveResponse {} message InsertRequest { - repeated string uuid = 1; + uint32 position = 1; + repeated string uuid = 2; } message InsertResponse {} @@ -105,7 +106,7 @@ message StopRequest {} message StopResponse {} message ChangeVolumeRequest { - int32 delta = 1; + float delta = 1; } message ChangeVolumeResponse {} diff --git a/crabidy-core/src/lib.rs b/crabidy-core/src/lib.rs index ee878a1..405f42e 100644 --- a/crabidy-core/src/lib.rs +++ b/crabidy-core/src/lib.rs @@ -34,7 +34,7 @@ impl std::fmt::Display for ProviderError { impl LibraryNode { pub fn new() -> Self { Self { - uuid: "/".to_string(), + uuid: "node:/".to_string(), title: "/".to_string(), children: Vec::new(), parent: None, @@ -105,4 +105,12 @@ impl Queue { } } } + + pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) { + let tail: Vec = self + .tracks + .splice((position as usize).., tracks.to_vec()) + .collect(); + self.tracks.extend(tail); + } } diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index e05a243..fe09a98 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -1,21 +1,20 @@ use async_trait::async_trait; use crabidy_core::proto::crabidy::{ crabidy_service_server::{CrabidyService, CrabidyServiceServer}, - get_queue_updates_response::QueueUpdateResult, - ActiveTrack, AppendNodeRequest, AppendNodeResponse, AppendTrackRequest, AppendTrackResponse, - GetActiveTrackRequest, GetActiveTrackResponse, GetLibraryNodeRequest, GetLibraryNodeResponse, - GetQueueRequest, GetQueueResponse, GetQueueUpdatesRequest, GetQueueUpdatesResponse, - GetTrackRequest, GetTrackResponse, GetTrackUpdatesRequest, GetTrackUpdatesResponse, - LibraryNode, LibraryNodeChild, Queue, QueueLibraryNodeRequest, QueueLibraryNodeResponse, - QueuePositionChange, QueueTrackRequest, QueueTrackResponse, RemoveTracksRequest, - RemoveTracksResponse, ReplaceWithNodeRequest, ReplaceWithNodeResponse, ReplaceWithTrackRequest, - ReplaceWithTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentTrackRequest, - SetCurrentTrackResponse, StopRequest, StopResponse, TogglePlayRequest, TogglePlayResponse, - Track, + get_update_stream_response::Update as StreamUpdate, + AppendRequest, AppendResponse, ChangeVolumeRequest, ChangeVolumeResponse, + GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest, GetUpdateStreamResponse, + InitRequest, InitResponse, InsertRequest, InsertResponse, LibraryNode, LibraryNodeChild, + NextRequest, NextResponse, PlayState, PrevRequest, PrevResponse, Queue, QueueRequest, + QueueResponse, QueueTrack, RemoveRequest, RemoveResponse, ReplaceRequest, ReplaceResponse, + RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, SaveQueueResponse, + SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, ToggleMuteRequest, + ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, ToggleShuffleRequest, + ToggleShuffleResponse, Track, }; use crabidy_core::{ProviderClient, ProviderError}; use futures::TryStreamExt; -use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer}; +use gstreamer_play::{Play, PlayMessage, PlayState as GstPlaystate, PlayVideoRenderer}; use tokio_stream::StreamExt; use std::{ @@ -49,15 +48,10 @@ fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender) { async fn main() -> Result<(), Box> { gstreamer::init()?; - let (queue_update_tx, _) = tokio::sync::broadcast::channel(100); - let (active_track_tx, _) = tokio::sync::broadcast::channel(1000); + let (update_tx, _) = tokio::sync::broadcast::channel(2048); let orchestrator = ProviderOrchestrator::init("").await.unwrap(); - let playback = Playback::new( - active_track_tx.clone(), - queue_update_tx.clone(), - orchestrator.provider_tx.clone(), - ); + let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone()); let bus = playback.play.message_bus(); let playback_tx = playback.playback_tx.clone(); @@ -67,8 +61,7 @@ async fn main() -> Result<(), Box> { }); let crabidy_service = RpcService::new( - queue_update_tx, - active_track_tx, + update_tx, playback.playback_tx.clone(), orchestrator.provider_tx.clone(), ); @@ -159,7 +152,7 @@ impl ProviderClient for ProviderOrchestrator { let state = Mutex::new(PlayState::Stopped); let queue = Mutex::new(Queue { timestamp: 0, - current: 0, + current_position: 0, tracks: Vec::new(), }); let raw_toml_settings = fs::read_to_string("/tmp/tidaldy.toml").unwrap_or("".to_owned()); @@ -188,15 +181,17 @@ impl ProviderClient for ProviderOrchestrator { } fn get_lib_root(&self) -> LibraryNode { let mut root_node = LibraryNode::new(); - let child = LibraryNodeChild::new("tidal".to_owned(), "tidal".to_owned()); + let child = LibraryNodeChild::new("node:tidal".to_owned(), "tidal".to_owned()); root_node.children.push(child); + println!("Global root node {:?}", root_node); root_node } async fn get_lib_node(&self, uuid: &str) -> Result { - if uuid == "/" { + println!("get_lib_node {}", uuid); + if uuid == "node:/" { return Ok(self.get_lib_root()); } - if uuid == "tidal" { + if uuid == "node:tidal" { return Ok(self.tidal_client.get_lib_root()); } self.tidal_client.get_lib_node(uuid).await @@ -205,76 +200,73 @@ impl ProviderClient for ProviderOrchestrator { #[derive(Debug)] enum PlaybackMessage { - ReplaceWithTrack { - uuid: String, + Replace { + uuids: Vec, }, - ReplaceWithNode { - uuid: String, + Queue { + uuids: Vec, }, - QueueTrack { - uuid: String, + Append { + uuids: Vec, }, - QueueNode { - uuid: String, - }, - ClearQueue, - GetQueue { - result_tx: flume::Sender, - }, - AppendTrack { - uuid: String, - }, - AppendNode { - uuid: String, - }, - RemoveTracks { + Remove { positions: Vec, }, + Insert { + position: u32, + uuids: Vec, + }, SetCurrent { position: u32, }, - GetCurrent { - result_tx: flume::Sender, + GetQueue { + result_tx: flume::Sender, }, - Next, - PlayPause, + GetQueueTrack { + result_tx: flume::Sender, + }, + TogglePlay, + ToggleShuffle, Stop, + ChangeVolume { + delta: f32, + }, + ToggleMute, + Next, + Prev, StateChanged { - state: PlayState, + state: GstPlaystate, }, } #[derive(Debug)] struct Playback { - active_track_tx: tokio::sync::broadcast::Sender, - queue_update_tx: tokio::sync::broadcast::Sender, + update_tx: tokio::sync::broadcast::Sender, provider_tx: flume::Sender, playback_tx: flume::Sender, playback_rx: flume::Receiver, queue: Mutex, - state: Mutex, + state: Mutex, play: Play, creation: std::time::Instant, } impl Playback { fn new( - active_track_tx: tokio::sync::broadcast::Sender, - queue_update_tx: tokio::sync::broadcast::Sender, + update_tx: tokio::sync::broadcast::Sender, provider_tx: flume::Sender, ) -> Self { let (playback_tx, playback_rx) = flume::bounded(10); let queue = Mutex::new(Queue { timestamp: 0, - current: 0, + current_position: 0, tracks: Vec::new(), }); - let state = Mutex::new(PlayState::Stopped); + let state = Mutex::new(GstPlaystate::Stopped); let play = Play::new(None::); let creation = std::time::Instant::now(); Self { - active_track_tx, - queue_update_tx, + update_tx, provider_tx, playback_tx, playback_rx, @@ -288,128 +280,210 @@ impl Playback { tokio::spawn(async move { while let Ok(message) = self.playback_rx.recv_async().await { match message { - PlaybackMessage::ReplaceWithTrack { uuid } => { - if let Ok(track) = self.get_track(&uuid).await { - { - let mut queue = self.queue.lock().unwrap(); - queue.replace_with_tracks(&[track.clone()]); - let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + PlaybackMessage::Replace { uuids } => { + println!("Replace {:?}", uuids); + let mut all_tracks = Vec::new(); + for uuid in uuids { + if is_track(&uuid) { + println!("Track {}", uuid); + if let Ok(track) = self.get_track(&uuid).await { + all_tracks.push(track); + } + } else { + println!("Node {}", uuid); + let tracks = self.flatten_node(&uuid).await; + all_tracks.extend(tracks); } + } + let current = { + let mut queue = self.queue.lock().unwrap(); + queue.set_current_position(0); + queue.replace_with_tracks(&all_tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) + } + queue.current() + }; + if let Some(track) = current { self.play(track).await; } } - PlaybackMessage::ReplaceWithNode { uuid } => { - let tracks = self.flatten_node(&uuid).await; - { - let mut queue = self.queue.lock().unwrap(); - queue.replace_with_tracks(&tracks); - let queue_update_tx = self.queue_update_tx.clone(); - if let Err(err) = queue_update_tx.send(queue.clone()) { - println!("{:?}", err) - }; - } - if !tracks.is_empty() { - self.play(tracks[0].clone()).await; + PlaybackMessage::Queue { uuids } => { + for uuid in uuids { + if is_track(&uuid) { + if let Ok(track) = self.get_track(&uuid).await { + let mut queue = self.queue.lock().unwrap(); + queue.queue_tracks(&[track]); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) + }; + } + } else { + let tracks = self.flatten_node(&uuid).await; + let mut queue = self.queue.lock().unwrap(); + queue.queue_tracks(&tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) + }; + } } } - PlaybackMessage::QueueTrack { uuid } => { - if let Ok(track) = self.get_track(&uuid).await { - let mut queue = self.queue.lock().unwrap(); - queue.queue_tracks(&[track]); - let queue_update_tx = self.queue_update_tx.clone(); - // queue_update_tx.send(queue.clone()).unwrap(); - if let Err(err) = queue_update_tx.send(queue.clone()) { - println!("{:?}", err) - }; + PlaybackMessage::Append { uuids } => { + for uuid in uuids { + if is_track(&uuid) { + if let Ok(track) = self.get_track(&uuid).await { + let mut queue = self.queue.lock().unwrap(); + queue.append_tracks(&[track]); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) + }; + } + } else { + let tracks = self.flatten_node(&uuid).await; + let mut queue = self.queue.lock().unwrap(); + queue.append_tracks(&tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + queue_update_tx.send(update).unwrap(); + } } } - PlaybackMessage::QueueNode { uuid } => { - let tracks = self.flatten_node(&uuid).await; + + //TODO handle deletion of current track + PlaybackMessage::Remove { positions } => { let mut queue = self.queue.lock().unwrap(); - queue.queue_tracks(&tracks); - let queue_update_tx = self.queue_update_tx.clone(); - if let Err(err) = queue_update_tx.send(queue.clone()) { + queue.remove_tracks(&positions); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + queue_update_tx.send(update).unwrap(); + } + + PlaybackMessage::Insert { position, uuids } => { + let mut all_tracks = Vec::new(); + for uuid in uuids { + if is_track(&uuid) { + if let Ok(track) = self.get_track(&uuid).await { + all_tracks.push(track); + } + } else { + let tracks = self.flatten_node(&uuid).await; + all_tracks.extend(tracks); + } + } + let mut queue = self.queue.lock().unwrap(); + queue.insert_tracks(position, &all_tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + queue_update_tx.send(update).unwrap(); + } + + PlaybackMessage::SetCurrent { + position: queue_position, + } => { + let track = { + let mut queue = self.queue.lock().unwrap(); + queue.set_current_position(queue_position); + queue.current() + }; + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::QueueTrack(QueueTrack { + queue_position, + track: track.clone(), + }); + + if let Err(err) = queue_update_tx.send(update) { println!("{:?}", err) }; - // queue_update_tx.send(queue.clone()).unwrap(); + if let Some(track) = track { + self.play(track).await; + } } PlaybackMessage::GetQueue { result_tx } => { let queue = self.queue.lock().unwrap(); result_tx.send(queue.clone()).unwrap(); } - PlaybackMessage::AppendTrack { uuid } => { - if let Ok(track) = self.get_track(&uuid).await { - let mut queue = self.queue.lock().unwrap(); - queue.append_tracks(&[track]); - let queue_update_tx = self.queue_update_tx.clone(); - if let Err(err) = queue_update_tx.send(queue.clone()) { - println!("{:?}", err) - }; - // queue_update_tx.send(queue.clone()).unwrap(); - if let Err(err) = queue_update_tx.send(queue.clone()) { - println!("{:?}", err) - }; + + PlaybackMessage::GetQueueTrack { result_tx } => { + let current = self.get_queue_track().await; + result_tx.send(current).unwrap(); + } + + PlaybackMessage::TogglePlay => { + let mut state = self.state.lock().unwrap(); + if *state == GstPlaystate::Playing { + self.play.pause(); + } else { + self.play.play(); } } - PlaybackMessage::AppendNode { uuid } => { - let tracks = self.flatten_node(&uuid).await; - let mut queue = self.queue.lock().unwrap(); - queue.append_tracks(&tracks); - let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + + PlaybackMessage::Stop => { + self.play.stop(); } - PlaybackMessage::ClearQueue => { - let mut queue = self.queue.lock().unwrap(); - queue.replace_with_tracks(&vec![]); - self.stop_track(); - let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + PlaybackMessage::ChangeVolume { delta } => { + let volume = self.play.volume(); + self.play.set_volume(volume + delta as f64); } - //TODO handle deletion of current track - PlaybackMessage::RemoveTracks { positions } => { - let mut queue = self.queue.lock().unwrap(); - queue.remove_tracks(&positions); - let queue_update_tx = self.queue_update_tx.clone(); - queue_update_tx.send(queue.clone()).unwrap(); + PlaybackMessage::ToggleMute => { + let muted = self.play.is_muted(); + self.play.set_mute(!muted); } - PlaybackMessage::SetCurrent { position } => { - let result = { + PlaybackMessage::ToggleShuffle => { + todo!() + } + + PlaybackMessage::Next => { + let (result, stop, pos) = { let mut queue = self.queue.lock().unwrap(); - queue.set_current(position); - let queue_update_tx = self.queue_update_tx.clone(); - // queue_update_tx.send(queue.clone()).unwrap(); - if let Err(err) = queue_update_tx.send(queue.clone()) { - println!("{:?}", err) - }; - queue.current() + let position = queue.current_position + 1; + let stop = !queue.set_current_position(position); + (queue.current(), stop, position) + }; + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::QueueTrack(QueueTrack { + queue_position: pos, + track: result.clone(), + }); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) }; if let Some(track) = result { self.play(track).await; } + if stop { + self.stop_track() + } } - PlaybackMessage::GetCurrent { result_tx } => { - let current = self.get_active_track().await; - result_tx.send(current).unwrap(); - } - PlaybackMessage::Next => { - let (result, stop) = { + PlaybackMessage::Prev => { + let (result, stop, pos) = { let mut queue = self.queue.lock().unwrap(); - let position = queue.current + 1; - let stop = !queue.set_current(position); - let queue_update_tx = self.queue_update_tx.clone(); - // queue_update_tx.send(queue.clone()).unwrap(); - if let Err(err) = queue_update_tx.send(queue.clone()) { - println!("{:?}", err) - }; - (queue.current(), stop) + let position = queue.current_position - 1; + let stop = !queue.set_current_position(position); + (queue.current(), stop, position) + }; + let update = StreamUpdate::QueueTrack(QueueTrack { + queue_position: pos, + track: result.clone(), + }); + let queue_update_tx = self.update_tx.clone(); + if let Err(err) = queue_update_tx.send(update) { + println!("{:?}", err) }; if let Some(track) = result { @@ -420,26 +494,19 @@ impl Playback { } } - PlaybackMessage::PlayPause => { - let mut state = self.state.lock().unwrap(); - if *state == PlayState::Playing { - self.play.pause(); - // *state = PlayState::Paused - } else { - self.play.play(); - // *state = PlayState::Playing - } - } - PlaybackMessage::Stop => { - self.play.stop(); - // *self.state.lock().unwrap() = PlayState::Stopped; - } PlaybackMessage::StateChanged { state } => { *self.state.lock().unwrap() = state.clone(); - let active_track_tx = self.active_track_tx.clone(); - let active_track = self.get_active_track().await; - // active_track_tx.send(active_track).unwrap(); - if let Err(err) = active_track_tx.send(active_track) { + let active_track_tx = self.update_tx.clone(); + let active_track = self.get_queue_track().await; + let play_state = match state { + GstPlaystate::Playing => PlayState::Playing, + GstPlaystate::Paused => PlayState::Paused, + GstPlaystate::Stopped => PlayState::Stopped, + GstPlaystate::Buffering => PlayState::Loading, + _ => PlayState::Unspecified, + }; + let update = StreamUpdate::PlayState(play_state as i32); + if let Err(err) = active_track_tx.send(update) { println!("{:?}", err) }; } @@ -491,26 +558,27 @@ impl Playback { .await .map_err(|_| ProviderError::InternalError)? } - async fn get_active_track(&self) -> ActiveTrack { + async fn get_queue_track(&self) -> QueueTrack { + let queue_position: u32; let result = { let mut queue = self.queue.lock().unwrap(); + queue_position = queue.current_position; queue.current() }; let completion = 0; let gst_play_state = self.state.lock().unwrap(); let play_state = match *gst_play_state { - PlayState::Stopped => crabidy_core::proto::crabidy::TrackPlayState::Stopped, - PlayState::Buffering => crabidy_core::proto::crabidy::TrackPlayState::Loading, - PlayState::Playing => crabidy_core::proto::crabidy::TrackPlayState::Playing, - PlayState::Paused => crabidy_core::proto::crabidy::TrackPlayState::Paused, - _ => crabidy_core::proto::crabidy::TrackPlayState::Unspecified, + GstPlaystate::Stopped => PlayState::Stopped, + GstPlaystate::Buffering => PlayState::Loading, + GstPlaystate::Playing => PlayState::Playing, + GstPlaystate::Paused => PlayState::Paused, + _ => PlayState::Unspecified, }; let play_state = play_state as i32; - ActiveTrack { + QueueTrack { + queue_position, track: result, - completion, - play_state, } } @@ -522,7 +590,7 @@ impl Playback { }; { let mut state_guard = self.state.lock().unwrap(); - *state_guard = PlayState::Playing; + *state_guard = GstPlaystate::Playing; } self.play.stop(); self.play.set_uri(Some(&urls[0])); @@ -532,18 +600,18 @@ impl Playback { fn stop_track(&self) { { let mut state_guard = self.state.lock().unwrap(); - *state_guard = PlayState::Stopped; + *state_guard = GstPlaystate::Stopped; } self.play.stop(); } fn playpause(&self) { let mut state_guard = self.state.lock().unwrap(); - if *state_guard == PlayState::Playing { - *state_guard = PlayState::Paused; + if *state_guard == GstPlaystate::Playing { + *state_guard = GstPlaystate::Paused; self.play.pause(); } else { - *state_guard = PlayState::Playing; + *state_guard = GstPlaystate::Playing; self.play.play() } } @@ -551,22 +619,19 @@ impl Playback { #[derive(Debug)] struct RpcService { - queue_update_tx: tokio::sync::broadcast::Sender, - active_track_tx: tokio::sync::broadcast::Sender, + update_tx: tokio::sync::broadcast::Sender, playback_tx: flume::Sender, provider_tx: flume::Sender, } impl RpcService { fn new( - queue_update_rx: tokio::sync::broadcast::Sender, - active_track_rx: tokio::sync::broadcast::Sender, + update_rx: tokio::sync::broadcast::Sender, playback_tx: flume::Sender, provider_tx: flume::Sender, ) -> Self { Self { - queue_update_tx: queue_update_rx, - active_track_tx: active_track_rx, + update_tx: update_rx, playback_tx, provider_tx, } @@ -575,11 +640,12 @@ impl RpcService { #[tonic::async_trait] impl CrabidyService for RpcService { - type GetQueueUpdatesStream = - Pin> + Send>>; + type GetUpdateStreamStream = + Pin> + Send>>; - type GetTrackUpdatesStream = - Pin> + Send>>; + async fn init(&self, request: Request) -> Result, Status> { + todo!() + } async fn get_library_node( &self, @@ -604,147 +670,93 @@ impl CrabidyService for RpcService { Err(err) => Err(Status::internal(err.to_string())), } } - async fn get_track( - &self, - request: Request, - ) -> Result, Status> { - let provider_tx = self.provider_tx.clone(); - let (result_tx, result_rx) = flume::bounded(1); - provider_tx - .send_async(ProviderMessage::GetTrack { - uuid: request.into_inner().uuid, - result_tx, - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let result = result_rx - .recv_async() - .await - .map_err(|_| Status::internal("Failed to receive response from provider channel"))?; - match result { - Ok(track) => Ok(Response::new(GetTrackResponse { track: Some(track) })), - Err(err) => Err(Status::internal(err.to_string())), - } - } - - async fn queue_track( + async fn queue( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { let playback_tx = self.playback_tx.clone(); let req = request.into_inner(); playback_tx - .send_async(PlaybackMessage::QueueTrack { - uuid: req.uuid.clone(), + .send_async(PlaybackMessage::Queue { + uuids: req.uuid.clone(), }) .await .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = QueueTrackResponse {}; + let reply = QueueResponse {}; Ok(Response::new(reply)) } - async fn queue_library_node( + async fn replace( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { let playback_tx = self.playback_tx.clone(); let req = request.into_inner(); playback_tx - .send_async(PlaybackMessage::QueueNode { - uuid: req.uuid.clone(), + .send_async(PlaybackMessage::Replace { + uuids: req.uuid.clone(), }) .await .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = QueueLibraryNodeResponse {}; + let reply = ReplaceResponse {}; Ok(Response::new(reply)) } - async fn replace_with_track( + async fn append( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { let playback_tx = self.playback_tx.clone(); let req = request.into_inner(); playback_tx - .send_async(PlaybackMessage::ReplaceWithTrack { - uuid: req.uuid.clone(), + .send_async(PlaybackMessage::Append { + uuids: req.uuid.clone(), }) .await .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = ReplaceWithTrackResponse {}; + let reply = AppendResponse {}; Ok(Response::new(reply)) } - async fn replace_with_node( + async fn remove( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { let playback_tx = self.playback_tx.clone(); let req = request.into_inner(); playback_tx - .send_async(PlaybackMessage::ReplaceWithNode { - uuid: req.uuid.clone(), - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = ReplaceWithNodeResponse {}; - Ok(Response::new(reply)) - } - - async fn append_track( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::AppendTrack { - uuid: req.uuid.clone(), - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = AppendTrackResponse {}; - Ok(Response::new(reply)) - } - - async fn append_node( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::AppendNode { - uuid: req.uuid.clone(), - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = AppendNodeResponse {}; - Ok(Response::new(reply)) - } - - async fn remove_tracks( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::RemoveTracks { + .send_async(PlaybackMessage::Remove { positions: req.positions, }) .await .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = RemoveTracksResponse {}; + let reply = RemoveResponse {}; Ok(Response::new(reply)) } - async fn set_current_track( + async fn insert( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::Insert { + position: req.position, + uuids: req.uuid, + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let reply = InsertResponse {}; + Ok(Response::new(reply)) + } + + async fn set_current( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { let playback_tx = self.playback_tx.clone(); let req = request.into_inner(); playback_tx @@ -753,22 +765,22 @@ impl CrabidyService for RpcService { }) .await .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = SetCurrentTrackResponse {}; + let reply = SetCurrentResponse {}; Ok(Response::new(reply)) } - async fn get_queue_updates( + async fn get_update_stream( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let update_rx = self.queue_update_tx.subscribe(); + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let update_rx = self.update_tx.subscribe(); let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx); let output_stream = update_stream .into_stream() - .map(|queue_result| match queue_result { - Ok(queue) => Ok(GetQueueUpdatesResponse { - queue_update_result: Some(QueueUpdateResult::Full(queue)), + .map(|update_result| match update_result { + Ok(update) => Ok(GetUpdateStreamResponse { + update: Some(update), }), Err(_) => Err(tonic::Status::new( tonic::Code::Unknown, @@ -778,15 +790,6 @@ impl CrabidyService for RpcService { Ok(Response::new(Box::pin(output_stream))) } - - async fn get_queue( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let reply = GetQueueResponse { queue: None }; - Ok(Response::new(reply)) - } - async fn save_queue( &self, request: tonic::Request, @@ -802,50 +805,98 @@ impl CrabidyService for RpcService { ) -> std::result::Result, tonic::Status> { let playback_tx = self.playback_tx.clone(); playback_tx - .send_async(PlaybackMessage::PlayPause) + .send_async(PlaybackMessage::TogglePlay) .await .unwrap(); let reply = TogglePlayResponse {}; Ok(Response::new(reply)) } + async fn toggle_shuffle( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + playback_tx + .send_async(PlaybackMessage::ToggleShuffle) + .await + .unwrap(); + let reply = ToggleShuffleResponse {}; + Ok(Response::new(reply)) + } + async fn stop( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + playback_tx.send_async(PlaybackMessage::Stop).await.unwrap(); let reply = StopResponse {}; Ok(Response::new(reply)) } - async fn get_active_track( + async fn change_volume( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let reply = GetActiveTrackResponse { - active_track: None, - // track: None, - // play_state: TrackPlayState::Stopped as i32, - // completion: 0, - }; + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let delta = request.into_inner().delta; + let playback_tx = self.playback_tx.clone(); + playback_tx + .send_async(PlaybackMessage::ChangeVolume { delta }) + .await + .unwrap(); + let reply = ChangeVolumeResponse {}; Ok(Response::new(reply)) } - async fn get_track_updates( + async fn toggle_mute( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let update_rx = self.active_track_tx.subscribe(); - let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx); + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + playback_tx + .send_async(PlaybackMessage::ToggleMute) + .await + .unwrap(); + let reply = ToggleMuteResponse {}; + Ok(Response::new(reply)) + } - let output_stream = update_stream.map(|active_track_result| match active_track_result { - Ok(active_track) => Ok(GetTrackUpdatesResponse { - active_track: Some(active_track), - }), - Err(_) => Err(tonic::Status::new( - tonic::Code::Unknown, - "Internal channel error", - )), - }); - Ok(Response::new(Box::pin(output_stream))) + async fn next( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + playback_tx.send_async(PlaybackMessage::Next).await.unwrap(); + let reply = NextResponse {}; + Ok(Response::new(reply)) + } + + async fn prev( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + playback_tx.send_async(PlaybackMessage::Prev).await.unwrap(); + let reply = PrevResponse {}; + Ok(Response::new(reply)) + } + + async fn restart_track( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + playback_tx.send_async(PlaybackMessage::Prev).await.unwrap(); + let reply = RestartTrackResponse {}; + Ok(Response::new(reply)) } } + +fn is_track(uuid: &str) -> bool { + uuid.starts_with("track:") +} + +fn is_node(uuid: &str) -> bool { + uuid.starts_with("node:") +} diff --git a/tidaldy/src/lib.rs b/tidaldy/src/lib.rs index cd47c71..f990982 100644 --- a/tidaldy/src/lib.rs +++ b/tidaldy/src/lib.rs @@ -44,7 +44,8 @@ impl crabidy_core::ProviderClient for Client { &self, track_uuid: &str, ) -> Result, crabidy_core::ProviderError> { - let Ok(playback) = self.get_track_playback(track_uuid).await else { + let (_, track_uuid, _) = split_uuid(track_uuid); + let Ok(playback) = self.get_track_playback(&track_uuid).await else { return Err(crabidy_core::ProviderError::FetchError) }; let Ok(manifest) = playback.get_manifest() else { @@ -66,14 +67,13 @@ impl crabidy_core::ProviderClient for Client { fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode { let global_root = crabidy_core::proto::crabidy::LibraryNode::new(); let children = vec![crabidy_core::proto::crabidy::LibraryNodeChild::new( - "userplaylists".to_string(), + "node:userplaylists".to_string(), "playlists".to_string(), )]; crabidy_core::proto::crabidy::LibraryNode { - uuid: "tidal".to_string(), + uuid: "node:tidal".to_string(), title: "tidal".to_string(), parent: Some(format!("{}", global_root.uuid)), - state: crabidy_core::proto::crabidy::LibraryNodeState::Done as i32, tracks: Vec::new(), children, is_queable: false, @@ -87,14 +87,13 @@ impl crabidy_core::ProviderClient for Client { let Some(user_id) = self.settings.login.user_id.clone() else { return Err(crabidy_core::ProviderError::UnknownUser) }; - let (module, uuid) = split_uuid(uuid); + let (_kind, module, uuid) = split_uuid(uuid); let node = match module.as_str() { "userplaylists" => { let mut node = crabidy_core::proto::crabidy::LibraryNode { - uuid: "userplaylists".to_string(), + uuid: "node:userplaylists".to_string(), title: "playlists".to_string(), - parent: Some("tidal".to_string()), - state: crabidy_core::proto::crabidy::LibraryNodeState::Unspecified as i32, + parent: Some("node:tidal".to_string()), tracks: Vec::new(), children: Vec::new(), is_queable: false, @@ -104,7 +103,7 @@ impl crabidy_core::ProviderClient for Client { .await? { let child = crabidy_core::proto::crabidy::LibraryNodeChild::new( - format!("playlist:{}", playlist.playlist.uuid), + format!("node:playlist:{}", playlist.playlist.uuid), playlist.playlist.title, ); node.children.push(child); @@ -121,7 +120,7 @@ impl crabidy_core::ProviderClient for Client { .map(|t| t.into()) .collect(); node.tracks = tracks; - node.parent = Some("userplaylists".to_string()); + node.parent = Some("node:userplaylists".to_string()); node } _ => return Err(crabidy_core::ProviderError::MalformedUuid), @@ -130,11 +129,12 @@ impl crabidy_core::ProviderClient for Client { } } -fn split_uuid(uuid: &str) -> (String, String) { - let mut split = uuid.splitn(2, ':'); +fn split_uuid(uuid: &str) -> (String, String, String) { + let mut split = uuid.splitn(3, ':'); ( split.next().unwrap_or("").to_string(), split.next().unwrap_or("").to_string(), + split.next().unwrap_or("").to_string(), ) } @@ -369,6 +369,7 @@ impl Client { } pub async fn get_track(&self, track_id: &str) -> Result { + let (_, track_id, _) = split_uuid(track_id); self.make_request(&format!("tracks/{}", track_id), None) .await } diff --git a/tidaldy/src/models.rs b/tidaldy/src/models.rs index 69accba..8dc3f64 100644 --- a/tidaldy/src/models.rs +++ b/tidaldy/src/models.rs @@ -158,9 +158,10 @@ pub struct Track { impl From for crabidy_core::proto::crabidy::Track { fn from(track: Track) -> Self { Self { - uuid: track.id.to_string(), + uuid: format!("track:{}", track.id), title: track.title, artist: track.artist.name, + album: Some(track.album.into()), duration: Some(track.duration as u32), } } @@ -169,9 +170,10 @@ impl From for crabidy_core::proto::crabidy::Track { impl From<&Track> for crabidy_core::proto::crabidy::Track { fn from(track: &Track) -> Self { Self { - uuid: track.id.to_string(), + uuid: format!("track:{}", track.id), title: track.title.clone(), artist: track.artist.name.clone(), + album: Some(track.album.clone().into()), duration: Some(track.duration as u32), } } @@ -208,6 +210,15 @@ pub struct Album { pub release_date: Option, } +impl From for crabidy_core::proto::crabidy::Album { + fn from(album: Album) -> Self { + Self { + title: album.title, + release_date: album.release_date, + } + } +} + #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Mixes { @@ -306,10 +317,9 @@ impl From for crabidy_core::proto::crabidy::LibraryNode { fn from(a: Playlist) -> Self { crabidy_core::proto::crabidy::LibraryNode { title: a.title, - uuid: format!("playlist:{}", a.uuid), + uuid: format!("node:playlist:{}", a.uuid), tracks: Vec::new(), parent: None, - state: crabidy_core::proto::crabidy::LibraryNodeState::Done as i32, children: Vec::new(), is_queable: true, }