From 035fdf1a4b0ed0864266afa215c616455f487a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Fri, 26 May 2023 12:06:35 +0200 Subject: [PATCH] Add draft of working tidal playback Streaming yet to come. --- crabidy-core/crabidy/v1/crabidy.proto | 12 +- crabidy-core/src/lib.rs | 71 ++- crabidy-server/src/main.rs | 668 +++++++++++++++++++++++--- tidaldy/src/lib.rs | 16 +- 4 files changed, 685 insertions(+), 82 deletions(-) diff --git a/crabidy-core/crabidy/v1/crabidy.proto b/crabidy-core/crabidy/v1/crabidy.proto index d5455ce..2563135 100644 --- a/crabidy-core/crabidy/v1/crabidy.proto +++ b/crabidy-core/crabidy/v1/crabidy.proto @@ -175,14 +175,16 @@ message GetTrackUpdatesRequest { repeated string type_blacklist = 3; } -message GetActiveTrackResponse { +message ActiveTrack { optional Track track = 1; TrackPlayState play_state = 2; uint32 completion = 3; } -message GetTrackUpdatesResponse { - optional Track track = 1; - TrackPlayState play_state = 2; - uint32 completion = 3; +message GetActiveTrackResponse { + ActiveTrack active_track = 1; +} + +message GetTrackUpdatesResponse { + ActiveTrack active_track = 1; } diff --git a/crabidy-core/src/lib.rs b/crabidy-core/src/lib.rs index 3708a61..13addd5 100644 --- a/crabidy-core/src/lib.rs +++ b/crabidy-core/src/lib.rs @@ -1,7 +1,7 @@ pub mod proto; use async_trait::async_trait; -use proto::crabidy::{LibraryNode, LibraryNodeState}; +use proto::crabidy::{LibraryNode, LibraryNodeState, Queue, Track}; #[async_trait] pub trait ProviderClient: std::fmt::Debug + Send + Sync { @@ -10,8 +10,9 @@ pub trait ProviderClient: std::fmt::Debug + Send + Sync { Self: Sized; fn settings(&self) -> String; async fn get_urls_for_track(&self, track_uuid: &str) -> Result, ProviderError>; - fn get_library_root(&self) -> LibraryNode; - async fn get_library_node(&self, list_uuid: &str) -> Result; + async fn get_metadata_for_track(&self, track_uuid: &str) -> Result; + fn get_lib_root(&self) -> LibraryNode; + async fn get_lib_node(&self, list_uuid: &str) -> Result; } #[derive(Clone, Debug, Hash)] @@ -20,9 +21,16 @@ pub enum ProviderError { CouldNotLogin, FetchError, MalformedUuid, + InternalError, Other, } +impl std::fmt::Display for ProviderError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + impl LibraryNode { pub fn new() -> Self { Self { @@ -36,3 +44,60 @@ impl LibraryNode { } } } + +pub enum QueueError { + NotQueable, +} + +impl Queue { + pub fn current(&mut self) -> Option { + if self.current < self.tracks.len() as u32 { + Some(self.tracks[self.current as usize].clone()) + } else { + None + } + } + + pub fn next(&mut self) -> Option { + if self.current < self.tracks.len() as u32 { + self.current += 1; + Some(self.tracks[self.current as usize].clone()) + } else { + None + } + } + + pub fn set_current(&mut self, current: u32) -> bool { + if current < self.tracks.len() as u32 { + self.current = current; + true + } else { + false + } + } + + pub fn replace_with_tracks(&mut self, tracks: &[Track]) { + self.current = 0; + self.tracks = tracks.to_vec(); + } + + pub fn append_tracks(&mut self, tracks: &[Track]) { + self.tracks.extend(tracks.iter().cloned()); + } + + pub fn queue_tracks(&mut self, tracks: &[Track]) { + let tail: Vec = self + .tracks + .splice((self.current as usize).., tracks.to_vec()) + .collect(); + self.tracks.extend(tail); + } + + pub fn remove_tracks(&mut self, positions: &[u32]) { + for pos in positions { + if *pos < self.tracks.len() as u32 { + self.tracks.remove(*pos as usize); + } + } + } +} diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index a58c021..7c212fe 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -1,33 +1,79 @@ -use anyhow::Error; use async_trait::async_trait; use crabidy_core::proto::crabidy::{ crabidy_service_server::{CrabidyService, CrabidyServiceServer}, get_queue_updates_response::QueueUpdateResult, - AppendNodeRequest, AppendNodeResponse, AppendTrackRequest, AppendTrackResponse, + ActiveTrack, AppendNodeRequest, AppendNodeResponse, AppendTrackRequest, AppendTrackResponse, GetActiveTrackRequest, GetActiveTrackResponse, GetLibraryNodeRequest, GetLibraryNodeResponse, GetQueueRequest, GetQueueResponse, GetQueueUpdatesRequest, GetQueueUpdatesResponse, GetTrackRequest, GetTrackResponse, GetTrackUpdatesRequest, GetTrackUpdatesResponse, - LibraryNode, LibraryNodeState, Queue, QueueLibraryNodeRequest, QueueLibraryNodeResponse, - QueuePositionChange, QueueTrackRequest, QueueTrackResponse, RemoveTracksRequest, - RemoveTracksResponse, ReplaceWithNodeRequest, ReplaceWithNodeResponse, ReplaceWithTrackRequest, + LibraryNode, Queue, QueueLibraryNodeRequest, QueueLibraryNodeResponse, QueuePositionChange, + QueueTrackRequest, QueueTrackResponse, RemoveTracksRequest, RemoveTracksResponse, + ReplaceWithNodeRequest, ReplaceWithNodeResponse, ReplaceWithTrackRequest, ReplaceWithTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentTrackRequest, SetCurrentTrackResponse, StopRequest, StopResponse, TogglePlayRequest, TogglePlayResponse, - TrackPlayState, + Track, }; use crabidy_core::{ProviderClient, ProviderError}; use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer}; -// use once_cell::sync::OnceCell; -use std::{collections::HashMap, fs, pin::Pin, sync::RwLock}; -use tonic::{codegen::futures_core::Stream, transport::Server, Request, Response, Result, Status}; -// static CHANNEL: OnceCell> = OnceCell::new(); -// static ORCHESTRATOR_CHANNEL: OnceCell> = OnceCell::new(); +use std::{ + fs, + sync::{Arc, Mutex}, +}; +use tonic::{transport::Server, Request, Response, Result, Status}; #[tokio::main] async fn main() -> Result<(), Box> { - let orchestrator = ClientOrchestrator::init("").await.unwrap(); - let tx = orchestrator.run(); - let crabidy_service = AppState::new(tx); + let (queue_update_tx, queue_update_rx) = flume::bounded(10); + let (active_track_tx, active_track_rx) = flume::bounded(1000); + let orchestrator = ProviderOrchestrator::init("").await.unwrap(); + + let playback = Playback::new( + active_track_tx.clone(), + queue_update_tx.clone(), + orchestrator.provider_tx.clone(), + ); + + let bus = playback.play.message_bus(); + let playback_tx = playback.playback_tx.clone(); + + bus.set_sync_handler(move |_, msg| { + match PlayMessage::parse(msg) { + Ok(PlayMessage::EndOfStream) => { + println!("End of stream"); + playback_tx.send(PlaybackMessage::Next).unwrap(); + println!("Next messages was sent"); + } + Ok(PlayMessage::StateChanged { state }) => { + println!("State changed: {:?}", state); + playback_tx + .send(PlaybackMessage::StateChanged { state }) + .unwrap(); + } + Ok(PlayMessage::PositionUpdated { position }) => { + // println!("Position updated: {:?}", position); + } + Ok(PlayMessage::Buffering { percent }) => { + // println!("Position updated: {:?}", position); + } + Ok(PlayMessage::VolumeChanged { volume }) => {} + Ok(PlayMessage::MuteChanged { muted }) => { + println!("Mute changed to muted: {:?}", muted); + } + + Ok(PlayMessage::MediaInfoUpdated { info }) => {} + _ => println!("Unknown message: {:?}", msg), + } + gstreamer::BusSyncReply::Drop + }); + let crabidy_service = RpcService::new( + queue_update_rx, + active_track_rx, + playback.playback_tx.clone(), + orchestrator.provider_tx.clone(), + ); + orchestrator.run(); + playback.run(); let addr = "[::1]:50051".parse()?; Server::builder() @@ -38,59 +84,99 @@ async fn main() -> Result<(), Box> { Ok(()) } -enum OrchestratorMessage { +#[derive(Debug)] +enum ProviderMessage { GetNode { uuid: String, - callback: flume::Sender, + result_tx: flume::Sender>, }, - GetTracksPlaybackUrls { + GetTrack { uuid: String, - callback: flume::Sender>, + result_tx: flume::Sender>, + }, + GetTrackUrls { + uuid: String, + result_tx: flume::Sender, ProviderError>>, + }, + FlattenNode { + uuid: String, + result_tx: flume::Sender>, }, } #[derive(Debug)] -struct ClientOrchestrator { - rx: flume::Receiver, - tx: flume::Sender, - tidal_client: tidaldy::Client, +struct ProviderOrchestrator { + provider_tx: flume::Sender, + provider_rx: flume::Receiver, + // known_tracks: RwLock>, + // known_nodes: RwLock>, + tidal_client: Arc, } -impl ClientOrchestrator { - fn run(self) -> flume::Sender { - let tx = self.tx.clone(); +impl ProviderOrchestrator { + fn run(self) { tokio::spawn(async move { - while let Ok(msg) = self.rx.recv_async().await { + while let Ok(msg) = self.provider_rx.recv_async().await { + println!("Orchestrator {:?}", msg); match msg { - OrchestratorMessage::GetNode { uuid, callback } => { - let node = match uuid.as_str() { - "/" => self.get_library_root(), - _ => self.get_library_node(&uuid).await.unwrap(), - }; - callback.send_async(node).await; + ProviderMessage::GetNode { uuid, result_tx } => { + let result = self.get_lib_node(&uuid).await; + result_tx.send(result).unwrap(); } - OrchestratorMessage::GetTracksPlaybackUrls { uuid, callback } => { - let urls = self.get_urls_for_track(&uuid).await.unwrap(); - callback.send_async(urls).await; + ProviderMessage::GetTrack { uuid, result_tx } => { + let result = self.get_metadata_for_track(&uuid).await; + result_tx.send(result).unwrap(); + } + ProviderMessage::GetTrackUrls { uuid, result_tx } => { + let result = self.get_urls_for_track(&uuid).await; + result_tx.send(result).unwrap(); + } + ProviderMessage::FlattenNode { uuid, result_tx } => { + let result = self.flatten_node(&uuid).await; + result_tx.send(result).unwrap(); } } } }); - tx + } + async fn flatten_node(&self, node_uuid: &str) -> Vec { + let mut tracks = Vec::with_capacity(1000); + let mut nodes_to_go = Vec::with_capacity(100); + nodes_to_go.push(node_uuid.to_string()); + while let Some(node_uuid) = nodes_to_go.pop() { + let Ok(node) = self.get_lib_node(&node_uuid).await else { + continue + }; + tracks.extend(node.tracks); + nodes_to_go.extend(node.children); + } + tracks } } #[async_trait] -impl ProviderClient for ClientOrchestrator { +impl ProviderClient for ProviderOrchestrator { async fn init(_s: &str) -> Result { + gstreamer::init().unwrap(); + let play = Play::new(None::); + let state = Mutex::new(PlayState::Stopped); + let queue = Mutex::new(Queue { + timestamp: 0, + current: 0, + tracks: Vec::new(), + }); let raw_toml_settings = fs::read_to_string("/tmp/tidaldy.toml").unwrap_or("".to_owned()); - let tidal_client = tidaldy::Client::init(&raw_toml_settings).await.unwrap(); + let tidal_client = Arc::new(tidaldy::Client::init(&raw_toml_settings).await.unwrap()); let new_toml_config = tidal_client.settings(); fs::write("/tmp/tidaldy.toml", new_toml_config).unwrap(); - let (tx, rx) = flume::unbounded(); + // let known_tracks = HashMap::new(); + // let known_nodes = HashMap::new(); + let (provider_tx, provider_rx) = flume::bounded(100); Ok(Self { - rx, - tx, + provider_rx, + provider_tx, + // known_tracks, + // known_nodes, tidal_client, }) } @@ -100,36 +186,348 @@ impl ProviderClient for ClientOrchestrator { async fn get_urls_for_track(&self, track_uuid: &str) -> Result, ProviderError> { self.tidal_client.get_urls_for_track(track_uuid).await } - fn get_library_root(&self) -> LibraryNode { + async fn get_metadata_for_track(&self, track_uuid: &str) -> Result { + self.tidal_client.get_metadata_for_track(track_uuid).await + } + fn get_lib_root(&self) -> LibraryNode { let mut root_node = LibraryNode::new(); root_node.children.push("tidal".to_owned()); root_node } - async fn get_library_node(&self, uuid: &str) -> Result { + async fn get_lib_node(&self, uuid: &str) -> Result { if uuid == "tidal" { - return Ok(self.tidal_client.get_library_root()); + return Ok(self.tidal_client.get_lib_root()); } - self.tidal_client.get_library_node(uuid).await + self.tidal_client.get_lib_node(uuid).await } } #[derive(Debug)] -struct AppState { - known_nodes: RwLock>, - orchestrator_tx: flume::Sender, +enum PlaybackMessage { + ReplaceWithTrack { + uuid: String, + }, + ReplaceWithNode { + uuid: String, + }, + QueueTrack { + uuid: String, + }, + QueueNode { + uuid: String, + }, + ClearQueue, + GetQueue { + result_tx: flume::Sender, + }, + AppendTrack { + uuid: String, + }, + AppendNode { + uuid: String, + }, + RemoveTracks { + positions: Vec, + }, + SetCurrent { + position: u32, + }, + GetCurrent { + result_tx: flume::Sender, + }, + Next, + PlayPause, + Stop, + StateChanged { + state: PlayState, + }, } -impl AppState { - fn new(orchestrator_tx: flume::Sender) -> Self { +#[derive(Debug)] +struct Playback { + active_track_tx: flume::Sender, + queue_update_tx: flume::Sender, + provider_tx: flume::Sender, + playback_tx: flume::Sender, + playback_rx: flume::Receiver, + queue: Mutex, + state: Mutex, + play: Play, + creation: std::time::Instant, +} + +impl Playback { + fn new( + active_track_tx: flume::Sender, + queue_update_tx: flume::Sender, + provider_tx: flume::Sender, + ) -> Self { + let (playback_tx, playback_rx) = flume::bounded(10); + let queue = Mutex::new(Queue { + timestamp: 0, + current: 0, + tracks: Vec::new(), + }); + let state = Mutex::new(PlayState::Stopped); + let play = Play::new(None::); + let creation = std::time::Instant::now(); Self { - known_nodes: RwLock::new(HashMap::new()), - orchestrator_tx, + active_track_tx, + queue_update_tx, + provider_tx, + playback_tx, + playback_rx, + queue, + state, + play, + creation, + } + } + fn run(self) { + tokio::spawn(async move { + while let Ok(message) = self.playback_rx.recv_async().await { + println!("Playback{:?}", message); + match message { + PlaybackMessage::ReplaceWithTrack { uuid } => { + if let Ok(track) = self.get_track(&uuid).await { + { + let mut queue = self.queue.lock().unwrap(); + queue.replace_with_tracks(&[track.clone()]); + } + self.play(track).await; + } + } + + PlaybackMessage::ReplaceWithNode { uuid } => { + let tracks = self.flatten_node(&uuid).await; + { + let mut queue = self.queue.lock().unwrap(); + queue.replace_with_tracks(&tracks); + } + if tracks.len() > 0 { + self.play(tracks[0].clone()).await; + } + } + + PlaybackMessage::QueueTrack { uuid } => { + if let Ok(track) = self.get_track(&uuid).await { + let mut queue = self.queue.lock().unwrap(); + queue.queue_tracks(&[track]); + } + } + PlaybackMessage::QueueNode { uuid } => { + let tracks = self.flatten_node(&uuid).await; + let mut queue = self.queue.lock().unwrap(); + queue.queue_tracks(&tracks); + } + + PlaybackMessage::GetQueue { result_tx } => { + let queue = self.queue.lock().unwrap(); + result_tx.send(queue.clone()).unwrap(); + } + PlaybackMessage::AppendTrack { uuid } => { + if let Ok(track) = self.get_track(&uuid).await { + let mut queue = self.queue.lock().unwrap(); + queue.append_tracks(&[track]); + } + } + PlaybackMessage::AppendNode { uuid } => { + let tracks = self.flatten_node(&uuid).await; + let mut queue = self.queue.lock().unwrap(); + queue.append_tracks(&tracks); + } + + PlaybackMessage::ClearQueue => { + let mut queue = self.queue.lock().unwrap(); + queue.replace_with_tracks(&vec![]); + self.stop_track() + } + + //TODO handle deletion of current track + PlaybackMessage::RemoveTracks { positions } => { + let mut queue = self.queue.lock().unwrap(); + queue.remove_tracks(&positions); + } + + PlaybackMessage::SetCurrent { position } => { + let result = { + let mut queue = self.queue.lock().unwrap(); + queue.set_current(position); + queue.current() + }; + + if let Some(track) = result { + self.play(track).await; + } + } + PlaybackMessage::GetCurrent { result_tx } => { + let current = self.get_active_track().await; + result_tx.send(current).unwrap(); + } + PlaybackMessage::Next => { + let (result, stop) = { + let mut queue = self.queue.lock().unwrap(); + let position = queue.current + 1; + let stop = !queue.set_current(position); + (queue.current(), stop) + }; + + if let Some(track) = result { + self.play(track).await; + } + if stop { + self.stop_track() + } + } + + PlaybackMessage::PlayPause => { + let mut state = self.state.lock().unwrap(); + if *state == PlayState::Playing { + self.play.pause(); + *state = PlayState::Paused + } else { + self.play.play(); + *state = PlayState::Playing + } + } + PlaybackMessage::Stop => { + self.play.stop(); + *self.state.lock().unwrap() = PlayState::Stopped; + } + PlaybackMessage::StateChanged { state } => *self.state.lock().unwrap() = state, + } + } + }); + } + async fn flatten_node(&self, uuid: &str) -> Vec { + let tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + let Ok(_) = tx.send_async(ProviderMessage::FlattenNode { + uuid: uuid.to_string(), + result_tx, + }).await else { + return Vec::new(); + }; + let Ok(tracks) = result_rx + .recv_async() + .await else { + return Vec::new(); + }; + tracks + } + async fn get_track(&self, uuid: &str) -> Result { + let tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + tx.send_async(ProviderMessage::GetTrack { + uuid: uuid.to_string(), + result_tx, + }) + .await + .map_err(|_| ProviderError::InternalError)?; + result_rx + .recv_async() + .await + .map_err(|_| ProviderError::InternalError)? + } + async fn get_urls_for_track(&self, uuid: &str) -> Result, ProviderError> { + let tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + tx.send_async(ProviderMessage::GetTrackUrls { + uuid: uuid.to_string(), + result_tx, + }) + .await + .map_err(|_| ProviderError::InternalError)?; + result_rx + .recv_async() + .await + .map_err(|_| ProviderError::InternalError)? + } + async fn get_active_track(&self) -> ActiveTrack { + let result = { + let mut queue = self.queue.lock().unwrap(); + queue.current() + }; + let completion = 0; + let gst_play_state = self.state.lock().unwrap(); + let play_state = match *gst_play_state { + PlayState::Stopped => crabidy_core::proto::crabidy::TrackPlayState::Stopped, + PlayState::Buffering => crabidy_core::proto::crabidy::TrackPlayState::Loading, + PlayState::Playing => crabidy_core::proto::crabidy::TrackPlayState::Playing, + PlayState::Paused => crabidy_core::proto::crabidy::TrackPlayState::Paused, + _ => crabidy_core::proto::crabidy::TrackPlayState::Unspecified, + }; + let play_state = play_state as i32; + + ActiveTrack { + track: result, + completion, + play_state, + } + } + + async fn play(&self, track: Track) { + let Ok(urls) = self.get_urls_for_track(&track.uuid).await else { + return + }; + println!("Playing urls {:?}", urls); + { + let mut state_guard = self.state.lock().unwrap(); + *state_guard = PlayState::Playing; + } + self.play.set_uri(Some(&urls[0])); + self.play.play(); + } + + fn stop_track(&self) { + println!("Stopping"); + { + let mut state_guard = self.state.lock().unwrap(); + *state_guard = PlayState::Stopped; + } + self.play.stop(); + } + + fn playpause(&self) { + let mut state_guard = self.state.lock().unwrap(); + if *state_guard == PlayState::Playing { + println!("Pausing"); + *state_guard = PlayState::Paused; + self.play.pause(); + } else { + println!("Playing"); + *state_guard = PlayState::Playing; + self.play.play() + } + } +} + +#[derive(Debug)] +struct RpcService { + queue_update_rx: flume::Receiver, + active_track_rx: flume::Receiver, + playback_tx: flume::Sender, + provider_tx: flume::Sender, +} + +impl RpcService { + fn new( + queue_update_rx: flume::Receiver, + active_track_rx: flume::Receiver, + playback_tx: flume::Sender, + provider_tx: flume::Sender, + ) -> Self { + Self { + queue_update_rx, + active_track_rx, + playback_tx, + provider_tx, } } } #[tonic::async_trait] -impl CrabidyService for AppState { +impl CrabidyService for RpcService { type GetQueueUpdatesStream = futures::stream::Iter>>; type GetTrackUpdatesStream = @@ -140,87 +538,183 @@ impl CrabidyService for AppState { request: Request, ) -> Result, Status> { println!("Got a library node request: {:?}", request); - let node_uuid = request.into_inner().uuid; - let (tx, rx) = flume::bounded(1); - self.orchestrator_tx - .send_async(OrchestratorMessage::GetNode { - uuid: node_uuid, - callback: tx, + let provider_tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + + provider_tx + .send_async(ProviderMessage::GetNode { + uuid: request.into_inner().uuid, + result_tx, }) .await - .unwrap(); - let node = rx.recv_async().await.unwrap(); - let resp = GetLibraryNodeResponse { node: Some(node) }; - Ok(Response::new(resp)) + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let result = result_rx + .recv_async() + .await + .map_err(|_| Status::internal("Failed to receive response from provider channel"))?; + println!("Got a library node response: {:?}", result); + match result { + Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })), + Err(err) => Err(Status::internal(err.to_string())), + } } async fn get_track( &self, request: Request, ) -> Result, Status> { - println!("Got a track request: {:?}", request); + println!("Got a library track request: {:?}", request); + let provider_tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); - let req = request.into_inner(); - - let reply = GetTrackResponse { track: None }; - Ok(Response::new(reply)) + provider_tx + .send_async(ProviderMessage::GetTrack { + uuid: request.into_inner().uuid, + result_tx, + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let result = result_rx + .recv_async() + .await + .map_err(|_| Status::internal("Failed to receive response from provider channel"))?; + println!("Got a library node response: {:?}", result); + match result { + Ok(track) => Ok(Response::new(GetTrackResponse { track: Some(track) })), + Err(err) => Err(Status::internal(err.to_string())), + } } async fn queue_track( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::QueueTrack { + uuid: req.uuid.clone(), + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let reply = QueueTrackResponse {}; Ok(Response::new(reply)) } + async fn queue_library_node( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::QueueNode { + uuid: req.uuid.clone(), + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; let reply = QueueLibraryNodeResponse {}; Ok(Response::new(reply)) } + async fn replace_with_track( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + println!("Got a replace with track request: {:?}", request); + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::ReplaceWithTrack { + uuid: req.uuid.clone(), + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; let reply = ReplaceWithTrackResponse {}; Ok(Response::new(reply)) } + async fn replace_with_node( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + println!("Got a replace with node request: {:?}", request); + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::ReplaceWithNode { + uuid: req.uuid.clone(), + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; let reply = ReplaceWithNodeResponse {}; Ok(Response::new(reply)) } + async fn append_track( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::AppendTrack { + uuid: req.uuid.clone(), + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; let reply = AppendTrackResponse {}; Ok(Response::new(reply)) } + async fn append_node( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::AppendNode { + uuid: req.uuid.clone(), + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; let reply = AppendNodeResponse {}; Ok(Response::new(reply)) } + async fn remove_tracks( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::RemoveTracks { + positions: req.positions, + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; let reply = RemoveTracksResponse {}; Ok(Response::new(reply)) } + async fn set_current_track( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let playback_tx = self.playback_tx.clone(); + let req = request.into_inner(); + playback_tx + .send_async(PlaybackMessage::SetCurrent { + position: req.position, + }) + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; let reply = SetCurrentTrackResponse {}; Ok(Response::new(reply)) } + async fn get_queue_updates( &self, request: tonic::Request, @@ -242,6 +736,7 @@ impl CrabidyService for AppState { let output_stream = futures::stream::iter(queue_vec.into_iter()); Ok(Response::new(output_stream)) } + async fn get_queue( &self, request: tonic::Request, @@ -249,6 +744,7 @@ impl CrabidyService for AppState { let reply = GetQueueResponse { queue: None }; Ok(Response::new(reply)) } + async fn save_queue( &self, request: tonic::Request, @@ -256,6 +752,7 @@ impl CrabidyService for AppState { let reply = SaveQueueResponse {}; Ok(Response::new(reply)) } + /// Playback async fn toggle_play( &self, @@ -264,6 +761,7 @@ impl CrabidyService for AppState { let reply = TogglePlayResponse {}; Ok(Response::new(reply)) } + async fn stop( &self, request: tonic::Request, @@ -271,17 +769,20 @@ impl CrabidyService for AppState { let reply = StopResponse {}; Ok(Response::new(reply)) } + async fn get_active_track( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let reply = GetActiveTrackResponse { - track: None, - play_state: TrackPlayState::Stopped as i32, - completion: 0, + active_track: None, + // track: None, + // play_state: TrackPlayState::Stopped as i32, + // completion: 0, }; Ok(Response::new(reply)) } + async fn get_track_updates( &self, request: tonic::Request, @@ -292,6 +793,31 @@ impl CrabidyService for AppState { } } +// async fn listen_for_events(app: RpcService, rx: flume::Receiver<()>) { +// tokio::spawn(async move { +// println!("Listening for events"); +// loop { +// println!("Waiting for next message"); +// while let Ok(_) = rx.recv_async().await { +// println!("Received next message"); +// let track_result = { +// let mut queue_guard = app.queue.lock().unwrap(); +// println!("{:?}", queue_guard); +// queue_guard.next() +// }; +// println!("{:?}", track_result); +// if let Some(track) = track_result { +// println!("Playing next track {:?}", track); +// app.play(Some(track)).await; +// } else { +// println!("Stopping at end of playlist"); +// app.stop_track(); +// } +// } +// } +// }); +// } + // #[derive(Debug)] // enum Input { // PlayTrack { diff --git a/tidaldy/src/lib.rs b/tidaldy/src/lib.rs index b577734..fd70123 100644 --- a/tidaldy/src/lib.rs +++ b/tidaldy/src/lib.rs @@ -53,7 +53,17 @@ impl crabidy_core::ProviderClient for Client { Ok(manifest.urls) } - fn get_library_root(&self) -> crabidy_core::proto::crabidy::LibraryNode { + async fn get_metadata_for_track( + &self, + track_uuid: &str, + ) -> Result { + let Ok(track) = self.get_track(track_uuid).await else { + return Err(crabidy_core::ProviderError::FetchError) + }; + Ok(track.into()) + } + + fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode { let global_root = crabidy_core::proto::crabidy::LibraryNode::new(); let children = vec!["userplaylists".to_string()]; crabidy_core::proto::crabidy::LibraryNode { @@ -67,7 +77,7 @@ impl crabidy_core::ProviderClient for Client { } } - async fn get_library_node( + async fn get_lib_node( &self, uuid: &str, ) -> Result { @@ -353,7 +363,7 @@ impl Client { .await } - pub async fn get_track(&self, track_id: String) -> Result { + pub async fn get_track(&self, track_id: &str) -> Result { self.make_request(&format!("tracks/{}", track_id), None) .await }