diff --git a/Cargo.lock b/Cargo.lock index cb372d6..abf4753 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,7 @@ dependencies = [ "futures", "gstreamer", "gstreamer-play", + "log", "once_cell", "serde", "serde_json", @@ -389,6 +390,20 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tracing", + "tracing-appender", + "tracing-log", + "tracing-subscriber", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", ] [[package]] @@ -1264,12 +1279,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.17" +version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de" [[package]] name = "lru-cache" @@ -1400,6 +1412,16 @@ dependencies = [ "zbus", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -1500,6 +1522,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking" version = "2.1.0" @@ -2127,6 +2155,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook" version = "0.3.15" @@ -2315,6 +2352,16 @@ dependencies = [ "syn 2.0.16", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tidaldy" version = "0.0.0" @@ -2332,6 +2379,7 @@ dependencies = [ "thiserror", "tokio", "toml 0.7.4", + "tracing", ] [[package]] @@ -2351,8 +2399,10 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" dependencies = [ + "itoa", "serde", "time-core", + "time-macros", ] [[package]] @@ -2361,6 +2411,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +[[package]] +name = "time-macros" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b" +dependencies = [ + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2580,6 +2639,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" +dependencies = [ + "crossbeam-channel", + "time 0.3.21", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.24" @@ -2598,6 +2668,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -2729,6 +2825,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version-compare" version = "0.1.1" diff --git a/crabidy-core/src/lib.rs b/crabidy-core/src/lib.rs index 83162d5..1e6c534 100644 --- a/crabidy-core/src/lib.rs +++ b/crabidy-core/src/lib.rs @@ -56,7 +56,7 @@ pub enum QueueError { } impl Queue { - pub fn current(&self) -> Option { + pub fn current_track(&self) -> Option { if self.current_position < self.tracks.len() as u32 { Some(self.tracks[self.current_position as usize].clone()) } else { @@ -73,6 +73,15 @@ impl Queue { } } + pub fn prev_track(&mut self) -> Option { + if 0 < self.current_position { + self.current_position -= 1; + Some(self.tracks[self.current_position as usize].clone()) + } else { + None + } + } + pub fn set_current_position(&mut self, current_position: u32) -> bool { if current_position < self.tracks.len() as u32 { self.current_position = current_position; @@ -82,9 +91,14 @@ impl Queue { } } - pub fn replace_with_tracks(&mut self, tracks: &[Track]) { + pub fn replace_with_tracks(&mut self, tracks: &[Track]) -> Option { self.current_position = 0; self.tracks = tracks.to_vec(); + if 0 < self.tracks.len() as u32 { + Some(self.tracks[0].clone()) + } else { + None + } } pub fn append_tracks(&mut self, tracks: &[Track]) { @@ -94,23 +108,35 @@ impl Queue { pub fn queue_tracks(&mut self, tracks: &[Track]) { let tail: Vec = self .tracks - .splice((self.current_position as usize).., tracks.to_vec()) + .splice((self.current_position as usize + 1).., tracks.to_vec()) .collect(); self.tracks.extend(tail); } - pub fn remove_tracks(&mut self, positions: &[u32]) { + pub fn remove_tracks(&mut self, positions: &[u32]) -> Option { + let mut play_next = false; for pos in positions { + if pos == &self.current_position { + play_next = true; + } + if pos < &self.current_position { + self.current_position -= 1; + } if *pos < self.tracks.len() as u32 { self.tracks.remove(*pos as usize); } } + if play_next { + self.current_track() + } else { + None + } } pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) { let tail: Vec = self .tracks - .splice((position as usize).., tracks.to_vec()) + .splice((position as usize + 1).., tracks.to_vec()) .collect(); self.tracks.extend(tail); } diff --git a/crabidy-server/Cargo.toml b/crabidy-server/Cargo.toml index ef12777..38b4888 100644 --- a/crabidy-server/Cargo.toml +++ b/crabidy-server/Cargo.toml @@ -21,3 +21,8 @@ async-trait = "0.1.68" futures = "0.3.28" tokio-stream = { version = "0.1.14", features = ["sync"] } dirs = "5.0.1" +tracing = "0.1.37" +tracing-subscriber = "0.3.17" +tracing-appender = "0.2.2" +tracing-log = "0.1.3" +log = "0.4.18" diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index 9a2858c..4972e6b 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -1,64 +1,43 @@ -use async_trait::async_trait; use crabidy_core::proto::crabidy::{ - crabidy_service_server::{CrabidyService, CrabidyServiceServer}, - get_update_stream_response::Update as StreamUpdate, - AppendRequest, AppendResponse, ChangeVolumeRequest, ChangeVolumeResponse, - GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest, GetUpdateStreamResponse, - InitRequest, InitResponse, InsertRequest, InsertResponse, LibraryNode, LibraryNodeChild, - NextRequest, NextResponse, PlayState, PrevRequest, PrevResponse, Queue, QueueRequest, - QueueResponse, QueueTrack, RemoveRequest, RemoveResponse, ReplaceRequest, ReplaceResponse, - RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, SaveQueueResponse, - SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, ToggleMuteRequest, - ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, ToggleShuffleRequest, - ToggleShuffleResponse, Track, TrackPosition, + crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, Track, }; use crabidy_core::{ProviderClient, ProviderError}; -use futures::TryStreamExt; -use gstreamer_play::{Play, PlayMessage, PlayState as GstPlaystate, PlayVideoRenderer}; -use tokio_stream::StreamExt; +use gstreamer_play::{PlayMessage, PlayState as GstPlaystate}; +use tracing::{debug_span, info, instrument, warn, Span}; +use tracing_subscriber::{filter::Targets, prelude::*}; -use std::{ - fs, - path::PathBuf, - pin::Pin, - sync::{Arc, Mutex}, -}; -use tonic::{transport::Server, Request, Response, Result, Status}; +mod playback; +use playback::Playback; +mod provider; +use provider::ProviderOrchestrator; +mod rpc; +use rpc::RpcService; -fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender) { - for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { - match PlayMessage::parse(&msg) { - Ok(PlayMessage::EndOfStream) => { - tx.send(PlaybackMessage::Next).unwrap(); - } - Ok(PlayMessage::StateChanged { state }) => { - tx.send(PlaybackMessage::StateChanged { state }).unwrap(); - } - Ok(PlayMessage::PositionUpdated { position }) => { - let position = position - .and_then(|t| Some(t.mseconds() as u32)) - .unwrap_or(0); - tx.send(PlaybackMessage::PostitionChanged { position }) - .unwrap(); - } - Ok(PlayMessage::Buffering { percent }) => {} - Ok(PlayMessage::VolumeChanged { volume }) => { - let volume = volume as f32; - tx.send(PlaybackMessage::VolumeChanged { volume }).unwrap(); - } - Ok(PlayMessage::MuteChanged { muted }) => { - tx.send(PlaybackMessage::MuteChanged { muted }).unwrap(); - } - - Ok(PlayMessage::MediaInfoUpdated { info }) => {} - _ => println!("Unknown message: {:?}", msg), - } - } -} +use tonic::{transport::Server, Result}; #[tokio::main] async fn main() -> Result<(), Box> { + if let Err(err) = tracing_log::LogTracer::init_with_filter(log::LevelFilter::Debug) { + println!("Failed to initialize log tracer: {}", err); + } + let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stderr()); + + let targets_filter = + Targets::new().with_target("crabidy_server", tracing::level_filters::LevelFilter::DEBUG); + let subscriber = tracing_subscriber::fmt::layer() + .with_writer(non_blocking) + .with_file(true) + .with_line_number(true); + + let registry = tracing_subscriber::registry() + .with(targets_filter) + .with(subscriber); + + tracing::subscriber::set_global_default(registry) + .expect("Setting the default tracing subscriber failed"); + gstreamer::init()?; + info!("gstreamer initialized"); let (update_tx, _) = tokio::sync::broadcast::channel(2048); let orchestrator = ProviderOrchestrator::init("").await.unwrap(); @@ -71,6 +50,7 @@ async fn main() -> Result<(), Box> { std::thread::spawn(|| { poll_play_bus(bus, playback_tx); }); + info!("gstreamer bus handler started"); let crabidy_service = RpcService::new( update_tx, @@ -78,7 +58,9 @@ async fn main() -> Result<(), Box> { orchestrator.provider_tx.clone(), ); orchestrator.run(); + info!("provider orchestrator started"); playback.run(); + info!("playback started"); let addr = "[::1]:50051".parse()?; Server::builder() @@ -89,928 +71,143 @@ async fn main() -> Result<(), Box> { Ok(()) } +#[instrument(skip(bus, tx))] +fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender) { + for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { + let span = debug_span!("play-chan"); + match PlayMessage::parse(&msg) { + Ok(PlayMessage::EndOfStream) => { + tx.send(PlaybackMessage::Next { span }).unwrap(); + } + Ok(PlayMessage::StateChanged { state }) => { + tx.send(PlaybackMessage::StateChanged { state, span }) + .unwrap(); + } + Ok(PlayMessage::PositionUpdated { position }) => { + let position = position + .and_then(|t| Some(t.mseconds() as u32)) + .unwrap_or(0); + tx.send(PlaybackMessage::PostitionChanged { position, span }) + .unwrap(); + } + Ok(PlayMessage::Buffering { percent: _ }) => {} + Ok(PlayMessage::VolumeChanged { volume }) => { + let volume = volume as f32; + tx.send(PlaybackMessage::VolumeChanged { volume, span }) + .unwrap(); + } + Ok(PlayMessage::MuteChanged { muted }) => { + tx.send(PlaybackMessage::MuteChanged { muted, span }) + .unwrap(); + } + + Ok(PlayMessage::MediaInfoUpdated { info: _ }) => {} + Ok(PlayMessage::UriLoaded) => {} + Ok(PlayMessage::VideoDimensionsChanged { + width: _, + height: _, + }) => {} + Ok(PlayMessage::DurationChanged { duration: _ }) => {} + _ => println!("Unknown message: {:?}", msg), + } + } +} + #[derive(Debug)] -enum ProviderMessage { - GetNode { +pub enum ProviderMessage { + GetLibraryNode { uuid: String, result_tx: flume::Sender>, + span: Span, }, GetTrack { uuid: String, result_tx: flume::Sender>, + span: Span, }, GetTrackUrls { uuid: String, result_tx: flume::Sender, ProviderError>>, + span: Span, }, FlattenNode { uuid: String, result_tx: flume::Sender>, + span: Span, }, } #[derive(Debug)] -struct ProviderOrchestrator { - provider_tx: flume::Sender, - provider_rx: flume::Receiver, - // known_tracks: RwLock>, - // known_nodes: RwLock>, - tidal_client: Arc, -} - -impl ProviderOrchestrator { - fn run(self) { - tokio::spawn(async move { - while let Ok(msg) = self.provider_rx.recv_async().await { - match msg { - ProviderMessage::GetNode { uuid, result_tx } => { - let result = self.get_lib_node(&uuid).await; - result_tx.send(result).unwrap(); - } - ProviderMessage::GetTrack { uuid, result_tx } => { - let result = self.get_metadata_for_track(&uuid).await; - result_tx.send(result).unwrap(); - } - ProviderMessage::GetTrackUrls { uuid, result_tx } => { - let result = self.get_urls_for_track(&uuid).await; - result_tx.send(result).unwrap(); - } - ProviderMessage::FlattenNode { uuid, result_tx } => { - let result = self.flatten_node(&uuid).await; - result_tx.send(result).unwrap(); - } - } - } - }); - } - async fn flatten_node(&self, node_uuid: &str) -> Vec { - let mut tracks = Vec::with_capacity(1000); - let mut nodes_to_go = Vec::with_capacity(100); - nodes_to_go.push(node_uuid.to_string()); - while let Some(node_uuid) = nodes_to_go.pop() { - let Ok(node) = self.get_lib_node(&node_uuid).await else { - continue - }; - if node.is_queable { - tracks.extend(node.tracks); - nodes_to_go.extend(node.children.into_iter().map(|c| c.uuid)) - } - } - tracks - } -} - -#[async_trait] -impl ProviderClient for ProviderOrchestrator { - async fn init(_s: &str) -> Result { - let config_dir = dirs::config_dir() - .map(|d| d.join("crabidy")) - .unwrap_or(PathBuf::from("/tmp")); - let dir_exists = tokio::fs::try_exists(&config_dir) - .await - .map_err(|e| ProviderError::Config(e.to_string()))?; - if !dir_exists { - tokio::fs::create_dir(&config_dir) - .await - .map_err(|e| ProviderError::Config(e.to_string()))?; - } - let config_file = config_dir.join("tidaly.toml"); - let raw_toml_settings = fs::read_to_string(&config_file).unwrap_or("".to_owned()); - let tidal_client = Arc::new(tidaldy::Client::init(&raw_toml_settings).await.unwrap()); - let new_toml_config = tidal_client.settings(); - tokio::fs::write(&config_file, new_toml_config) - .await - .map_err(|e| ProviderError::Config(e.to_string())); - let (provider_tx, provider_rx) = flume::bounded(100); - Ok(Self { - provider_rx, - provider_tx, - tidal_client, - }) - } - fn settings(&self) -> String { - "".to_owned() - } - async fn get_urls_for_track(&self, track_uuid: &str) -> Result, ProviderError> { - self.tidal_client.get_urls_for_track(track_uuid).await - } - async fn get_metadata_for_track(&self, track_uuid: &str) -> Result { - self.tidal_client.get_metadata_for_track(track_uuid).await - } - fn get_lib_root(&self) -> LibraryNode { - let mut root_node = LibraryNode::new(); - let child = LibraryNodeChild::new("node:tidal".to_owned(), "tidal".to_owned()); - root_node.children.push(child); - println!("Global root node {:?}", root_node); - root_node - } - async fn get_lib_node(&self, uuid: &str) -> Result { - println!("get_lib_node {}", uuid); - if uuid == "node:/" { - return Ok(self.get_lib_root()); - } - if uuid == "node:tidal" { - return Ok(self.tidal_client.get_lib_root()); - } - self.tidal_client.get_lib_node(uuid).await - } -} - -#[derive(Debug)] -enum PlaybackMessage { +pub enum PlaybackMessage { Init { result_tx: flume::Sender, + span: Span, }, Replace { uuids: Vec, + span: Span, }, Queue { uuids: Vec, + span: Span, }, Append { uuids: Vec, + span: Span, }, Remove { positions: Vec, + span: Span, }, Insert { position: u32, uuids: Vec, + span: Span, }, SetCurrent { position: u32, + span: Span, + }, + TogglePlay { + span: Span, + }, + ToggleShuffle { + span: Span, + }, + Stop { + span: Span, }, - TogglePlay, - ToggleShuffle, - Stop, ChangeVolume { delta: f32, + span: Span, + }, + ToggleMute { + span: Span, + }, + Next { + span: Span, + }, + Prev { + span: Span, + }, + RestartTrack { + span: Span, }, - ToggleMute, - Next, - Prev, - RestartTrack, StateChanged { state: GstPlaystate, + span: Span, }, VolumeChanged { volume: f32, + span: Span, }, MuteChanged { muted: bool, + span: Span, }, PostitionChanged { position: u32, + span: Span, }, } - -#[derive(Debug)] -struct Playback { - update_tx: tokio::sync::broadcast::Sender, - provider_tx: flume::Sender, - playback_tx: flume::Sender, - playback_rx: flume::Receiver, - queue: Mutex, - state: Mutex, - play: Play, - creation: std::time::Instant, -} - -impl Playback { - fn new( - update_tx: tokio::sync::broadcast::Sender, - provider_tx: flume::Sender, - ) -> Self { - let (playback_tx, playback_rx) = flume::bounded(10); - let queue = Mutex::new(Queue { - timestamp: 0, - current_position: 0, - tracks: Vec::new(), - }); - let state = Mutex::new(GstPlaystate::Stopped); - let play = Play::new(None::); - let creation = std::time::Instant::now(); - Self { - update_tx, - provider_tx, - playback_tx, - playback_rx, - queue, - state, - play, - creation, - } - } - fn run(self) { - tokio::spawn(async move { - while let Ok(message) = self.playback_rx.recv_async().await { - match message { - PlaybackMessage::Init { result_tx } => { - let response = { - let queue = self.queue.lock().unwrap(); - let queue_track = QueueTrack { - queue_position: queue.current_position, - track: queue.current(), - }; - let position = TrackPosition { - duration: self - .play - .duration() - .map(|t| t.mseconds() as u32) - .unwrap_or(0), - position: self - .play - .position() - .map(|t| t.mseconds() as u32) - .unwrap_or(0), - }; - let play_state = match *self.state.lock().unwrap() { - GstPlaystate::Playing => PlayState::Playing, - GstPlaystate::Paused => PlayState::Paused, - GstPlaystate::Stopped => PlayState::Stopped, - GstPlaystate::Buffering => PlayState::Loading, - _ => PlayState::Unspecified, - }; - InitResponse { - queue: Some(queue.clone()), - queue_track: Some(queue_track), - play_state: play_state as i32, - volume: self.play.volume() as f32, - mute: self.play.is_muted(), - position: Some(position), - } - }; - result_tx.send(response).unwrap(); - } - PlaybackMessage::Replace { uuids } => { - let mut all_tracks = Vec::new(); - for uuid in uuids { - if is_track(&uuid) { - println!("Track {}", uuid); - if let Ok(track) = self.get_track(&uuid).await { - all_tracks.push(track); - } - } else { - println!("Node {}", uuid); - let tracks = self.flatten_node(&uuid).await; - all_tracks.extend(tracks); - } - } - let current = { - let mut queue = self.queue.lock().unwrap(); - queue.set_current_position(0); - queue.replace_with_tracks(&all_tracks); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - } - let track = queue.current(); - let update = StreamUpdate::QueueTrack(QueueTrack { - queue_position: queue.current_position, - track, - }); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - } - queue.current() - }; - if let Some(track) = current { - self.play(track).await; - } - } - - PlaybackMessage::Queue { uuids } => { - let mut all_tracks = Vec::new(); - for uuid in uuids { - if is_track(&uuid) { - println!("Track {}", uuid); - if let Ok(track) = self.get_track(&uuid).await { - all_tracks.push(track); - } - } else { - println!("Node {}", uuid); - let tracks = self.flatten_node(&uuid).await; - all_tracks.extend(tracks); - } - } - { - let mut queue = self.queue.lock().unwrap(); - queue.queue_tracks(&all_tracks); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - } - } - } - - PlaybackMessage::Append { uuids } => { - let mut all_tracks = Vec::new(); - for uuid in uuids { - if is_track(&uuid) { - println!("Track {}", uuid); - if let Ok(track) = self.get_track(&uuid).await { - all_tracks.push(track); - } - } else { - println!("Node {}", uuid); - let tracks = self.flatten_node(&uuid).await; - all_tracks.extend(tracks); - } - } - { - let mut queue = self.queue.lock().unwrap(); - queue.append_tracks(&all_tracks); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - } - } - } - - PlaybackMessage::Remove { positions } => { - let mut queue = self.queue.lock().unwrap(); - queue.remove_tracks(&positions); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - queue_update_tx.send(update).unwrap(); - if positions.contains(&queue.current_position) { - let playback_tx = self.playback_tx.clone(); - playback_tx.send(PlaybackMessage::Next).unwrap(); - } - } - - PlaybackMessage::Insert { position, uuids } => { - let mut all_tracks = Vec::new(); - for uuid in uuids { - if is_track(&uuid) { - if let Ok(track) = self.get_track(&uuid).await { - all_tracks.push(track); - } - } else { - let tracks = self.flatten_node(&uuid).await; - all_tracks.extend(tracks); - } - } - let mut queue = self.queue.lock().unwrap(); - queue.insert_tracks(position, &all_tracks); - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::Queue(queue.clone()); - queue_update_tx.send(update).unwrap(); - } - - PlaybackMessage::SetCurrent { - position: queue_position, - } => { - let track = { - let mut queue = self.queue.lock().unwrap(); - queue.set_current_position(queue_position); - queue.current() - }; - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::QueueTrack(QueueTrack { - queue_position, - track: track.clone(), - }); - - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - }; - if let Some(track) = track { - self.play(track).await; - } - } - - PlaybackMessage::TogglePlay => { - let mut state = self.state.lock().unwrap(); - if *state == GstPlaystate::Playing { - self.play.pause(); - } else { - self.play.play(); - } - } - - PlaybackMessage::Stop => { - self.play.stop(); - } - - PlaybackMessage::ChangeVolume { delta } => { - let volume = self.play.volume(); - self.play.set_volume(volume + delta as f64); - } - - PlaybackMessage::ToggleMute => { - let muted = self.play.is_muted(); - self.play.set_mute(!muted); - } - - PlaybackMessage::ToggleShuffle => { - todo!() - } - - PlaybackMessage::Next => { - let (result, stop, pos) = { - let mut queue = self.queue.lock().unwrap(); - let position = queue.current_position + 1; - let stop = !queue.set_current_position(position); - let pos = queue.current_position; - (queue.current(), stop, pos) - }; - let queue_update_tx = self.update_tx.clone(); - let update = StreamUpdate::QueueTrack(QueueTrack { - queue_position: pos, - track: result.clone(), - }); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - }; - - if let Some(track) = result { - self.play(track).await; - } - if stop { - self.stop_track() - } - } - PlaybackMessage::Prev => { - let (result, stop, pos) = { - let mut queue = self.queue.lock().unwrap(); - let position = queue.current_position - 1; - let stop = !queue.set_current_position(position); - let pos = queue.current_position; - (queue.current(), stop, pos) - }; - let update = StreamUpdate::QueueTrack(QueueTrack { - queue_position: pos, - track: result.clone(), - }); - let queue_update_tx = self.update_tx.clone(); - if let Err(err) = queue_update_tx.send(update) { - println!("{:?}", err) - }; - - if let Some(track) = result { - self.play(track).await; - } - if stop { - self.stop_track() - } - } - - PlaybackMessage::StateChanged { state } => { - *self.state.lock().unwrap() = state.clone(); - let active_track_tx = self.update_tx.clone(); - let play_state = match state { - GstPlaystate::Playing => PlayState::Playing, - GstPlaystate::Paused => PlayState::Paused, - GstPlaystate::Stopped => PlayState::Stopped, - GstPlaystate::Buffering => PlayState::Loading, - _ => PlayState::Unspecified, - }; - let update = StreamUpdate::PlayState(play_state as i32); - if let Err(err) = active_track_tx.send(update) { - println!("{:?}", err) - }; - } - PlaybackMessage::RestartTrack => { - self.play.stop(); - self.play.play(); - } - PlaybackMessage::VolumeChanged { volume } => { - let update_tx = self.update_tx.clone(); - let update = StreamUpdate::Volume(volume); - if let Err(err) = update_tx.send(update) { - println!("{:?}", err) - } - } - PlaybackMessage::MuteChanged { muted } => { - let update_tx = self.update_tx.clone(); - let update = StreamUpdate::Mute(muted); - if let Err(err) = update_tx.send(update) { - println!("{:?}", err) - } - } - PlaybackMessage::PostitionChanged { position } => { - let update_tx = self.update_tx.clone(); - let duration = self - .play - .duration() - .and_then(|t| Some(t.mseconds() as u32)) - .unwrap_or(0); - let update = StreamUpdate::Position(TrackPosition { duration, position }); - if let Err(err) = update_tx.send(update) { - println!("{:?}", err) - } - } - } - } - }); - } - async fn flatten_node(&self, uuid: &str) -> Vec { - let tx = self.provider_tx.clone(); - let (result_tx, result_rx) = flume::bounded(1); - let Ok(_) = tx.send_async(ProviderMessage::FlattenNode { - uuid: uuid.to_string(), - result_tx, - }).await else { - return Vec::new(); - }; - let Ok(tracks) = result_rx - .recv_async() - .await else { - return Vec::new(); - }; - tracks - } - async fn get_track(&self, uuid: &str) -> Result { - let tx = self.provider_tx.clone(); - let (result_tx, result_rx) = flume::bounded(1); - tx.send_async(ProviderMessage::GetTrack { - uuid: uuid.to_string(), - result_tx, - }) - .await - .map_err(|_| ProviderError::InternalError)?; - result_rx - .recv_async() - .await - .map_err(|_| ProviderError::InternalError)? - } - async fn get_urls_for_track(&self, uuid: &str) -> Result, ProviderError> { - let tx = self.provider_tx.clone(); - let (result_tx, result_rx) = flume::bounded(1); - tx.send_async(ProviderMessage::GetTrackUrls { - uuid: uuid.to_string(), - result_tx, - }) - .await - .map_err(|_| ProviderError::InternalError)?; - result_rx - .recv_async() - .await - .map_err(|_| ProviderError::InternalError)? - } - async fn get_queue_track(&self) -> QueueTrack { - let queue_position: u32; - let result = { - let mut queue = self.queue.lock().unwrap(); - queue_position = queue.current_position; - queue.current() - }; - let completion = 0; - let gst_play_state = self.state.lock().unwrap(); - let play_state = match *gst_play_state { - GstPlaystate::Stopped => PlayState::Stopped, - GstPlaystate::Buffering => PlayState::Loading, - GstPlaystate::Playing => PlayState::Playing, - GstPlaystate::Paused => PlayState::Paused, - _ => PlayState::Unspecified, - }; - let play_state = play_state as i32; - - QueueTrack { - queue_position, - track: result, - } - } - - async fn play(&self, track: Track) { - let Ok(urls) = self.get_urls_for_track(&track.uuid).await else { - let playback_tx = self.playback_tx.clone(); - playback_tx.send(PlaybackMessage::Next).unwrap(); - return - }; - { - let mut state_guard = self.state.lock().unwrap(); - *state_guard = GstPlaystate::Playing; - } - self.play.stop(); - self.play.set_uri(Some(&urls[0])); - self.play.play(); - } - - fn stop_track(&self) { - { - let mut state_guard = self.state.lock().unwrap(); - *state_guard = GstPlaystate::Stopped; - } - self.play.stop(); - } - - fn playpause(&self) { - let mut state_guard = self.state.lock().unwrap(); - if *state_guard == GstPlaystate::Playing { - *state_guard = GstPlaystate::Paused; - self.play.pause(); - } else { - *state_guard = GstPlaystate::Playing; - self.play.play() - } - } -} - -#[derive(Debug)] -struct RpcService { - update_tx: tokio::sync::broadcast::Sender, - playback_tx: flume::Sender, - provider_tx: flume::Sender, -} - -impl RpcService { - fn new( - update_rx: tokio::sync::broadcast::Sender, - playback_tx: flume::Sender, - provider_tx: flume::Sender, - ) -> Self { - Self { - update_tx: update_rx, - playback_tx, - provider_tx, - } - } -} - -#[tonic::async_trait] -impl CrabidyService for RpcService { - type GetUpdateStreamStream = - Pin> + Send>>; - - async fn init(&self, request: Request) -> Result, Status> { - let playback_tx = self.playback_tx.clone(); - let (result_tx, result_rx) = flume::bounded(1); - let Ok(response) = playback_tx - .send_async(PlaybackMessage::Init { result_tx }) - .await - else { - return Err(Status::internal("Sending init failed internally")); - - }; - let Ok(response) = result_rx - .recv_async() - .await - else { - return Err(Status::internal("Init failed internally")); - - }; - Ok(Response::new(response)) - } - - async fn get_library_node( - &self, - request: Request, - ) -> Result, Status> { - let provider_tx = self.provider_tx.clone(); - let (result_tx, result_rx) = flume::bounded(1); - - provider_tx - .send_async(ProviderMessage::GetNode { - uuid: request.into_inner().uuid, - result_tx, - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let result = result_rx - .recv_async() - .await - .map_err(|_| Status::internal("Failed to receive response from provider channel"))?; - match result { - Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })), - Err(err) => Err(Status::internal(err.to_string())), - } - } - - async fn queue( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::Queue { - uuids: req.uuid.clone(), - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - - let reply = QueueResponse {}; - Ok(Response::new(reply)) - } - - async fn replace( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::Replace { - uuids: req.uuid.clone(), - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = ReplaceResponse {}; - Ok(Response::new(reply)) - } - - async fn append( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::Append { - uuids: req.uuid.clone(), - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = AppendResponse {}; - Ok(Response::new(reply)) - } - - async fn remove( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::Remove { - positions: req.positions, - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = RemoveResponse {}; - Ok(Response::new(reply)) - } - - async fn insert( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::Insert { - position: req.position, - uuids: req.uuid, - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = InsertResponse {}; - Ok(Response::new(reply)) - } - - async fn set_current( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - let req = request.into_inner(); - playback_tx - .send_async(PlaybackMessage::SetCurrent { - position: req.position, - }) - .await - .map_err(|_| Status::internal("Failed to send request via channel"))?; - let reply = SetCurrentResponse {}; - Ok(Response::new(reply)) - } - - async fn get_update_stream( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let update_rx = self.update_tx.subscribe(); - let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx); - - let output_stream = update_stream - .into_stream() - .map(|update_result| match update_result { - Ok(update) => Ok(GetUpdateStreamResponse { - update: Some(update), - }), - Err(_) => Err(tonic::Status::new( - tonic::Code::Unknown, - "Internal channel error", - )), - }); - - Ok(Response::new(Box::pin(output_stream))) - } - async fn save_queue( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let reply = SaveQueueResponse {}; - Ok(Response::new(reply)) - } - - /// Playback - async fn toggle_play( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - playback_tx - .send_async(PlaybackMessage::TogglePlay) - .await - .unwrap(); - let reply = TogglePlayResponse {}; - Ok(Response::new(reply)) - } - - async fn toggle_shuffle( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - playback_tx - .send_async(PlaybackMessage::ToggleShuffle) - .await - .unwrap(); - let reply = ToggleShuffleResponse {}; - Ok(Response::new(reply)) - } - - async fn stop( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - playback_tx.send_async(PlaybackMessage::Stop).await.unwrap(); - let reply = StopResponse {}; - Ok(Response::new(reply)) - } - - async fn change_volume( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let delta = request.into_inner().delta; - let playback_tx = self.playback_tx.clone(); - playback_tx - .send_async(PlaybackMessage::ChangeVolume { delta }) - .await - .unwrap(); - let reply = ChangeVolumeResponse {}; - Ok(Response::new(reply)) - } - - async fn toggle_mute( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - playback_tx - .send_async(PlaybackMessage::ToggleMute) - .await - .unwrap(); - let reply = ToggleMuteResponse {}; - Ok(Response::new(reply)) - } - - async fn next( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - playback_tx.send_async(PlaybackMessage::Next).await.unwrap(); - let reply = NextResponse {}; - Ok(Response::new(reply)) - } - - async fn prev( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - playback_tx.send_async(PlaybackMessage::Prev).await.unwrap(); - let reply = PrevResponse {}; - Ok(Response::new(reply)) - } - - async fn restart_track( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let playback_tx = self.playback_tx.clone(); - playback_tx - .send_async(PlaybackMessage::RestartTrack) - .await - .unwrap(); - let reply = RestartTrackResponse {}; - Ok(Response::new(reply)) - } -} - -fn is_track(uuid: &str) -> bool { - uuid.starts_with("track:") -} - -fn is_node(uuid: &str) -> bool { - uuid.starts_with("node:") -} diff --git a/tidaldy/Cargo.toml b/tidaldy/Cargo.toml index de62722..99770ce 100644 --- a/tidaldy/Cargo.toml +++ b/tidaldy/Cargo.toml @@ -19,6 +19,7 @@ serde_urlencoded = "0.7.1" thiserror = "1.0.40" tokio = { version = "1.28.1", features = ["full", "time"] } toml = "0.7.4" +tracing = "0.1.37" [dev-dependencies] tokio = { version = "1.28.1", features = ["full"] } diff --git a/tidaldy/src/lib.rs b/tidaldy/src/lib.rs index f990982..3c0b867 100644 --- a/tidaldy/src/lib.rs +++ b/tidaldy/src/lib.rs @@ -3,6 +3,7 @@ use reqwest::Client as HttpClient; use serde::de::DeserializeOwned; use tokio::time::{sleep, Duration, Instant}; +use tracing::{debug, instrument}; pub mod config; pub mod models; use async_trait::async_trait; @@ -16,6 +17,7 @@ pub struct Client { #[async_trait] impl crabidy_core::ProviderClient for Client { + #[instrument(skip(raw_toml_settings))] async fn init(raw_toml_settings: &str) -> Result { let settings: config::Settings = if let Ok(settings) = toml::from_str(raw_toml_settings) { settings @@ -37,34 +39,43 @@ impl crabidy_core::ProviderClient for Client { } Err(crabidy_core::ProviderError::CouldNotLogin) } + #[instrument(skip(self))] fn settings(&self) -> String { - toml::to_string_pretty(&self.settings).unwrap() + toml::to_string_pretty(&self.settings).unwrap_or_default() } + #[instrument(skip(self))] async fn get_urls_for_track( &self, track_uuid: &str, ) -> Result, crabidy_core::ProviderError> { + debug!("get_urls_for_track {}", track_uuid); let (_, track_uuid, _) = split_uuid(track_uuid); let Ok(playback) = self.get_track_playback(&track_uuid).await else { return Err(crabidy_core::ProviderError::FetchError) }; + debug!("playback {:?}", playback); let Ok(manifest) = playback.get_manifest() else { return Err(crabidy_core::ProviderError::FetchError) }; + debug!("manifest {:?}", manifest); Ok(manifest.urls) } + #[instrument(skip(self))] async fn get_metadata_for_track( &self, track_uuid: &str, ) -> Result { + debug!("get_metadata_for_track {}", track_uuid); let Ok(track) = self.get_track(track_uuid).await else { return Err(crabidy_core::ProviderError::FetchError) }; Ok(track.into()) } + #[instrument(skip(self))] fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode { + debug!("get_lib_root"); let global_root = crabidy_core::proto::crabidy::LibraryNode::new(); let children = vec![crabidy_core::proto::crabidy::LibraryNodeChild::new( "node:userplaylists".to_string(), @@ -80,6 +91,7 @@ impl crabidy_core::ProviderClient for Client { } } + #[instrument(skip(self))] async fn get_lib_node( &self, uuid: &str, @@ -87,6 +99,7 @@ impl crabidy_core::ProviderClient for Client { let Some(user_id) = self.settings.login.user_id.clone() else { return Err(crabidy_core::ProviderError::UnknownUser) }; + debug!("get_lib_node {}", uuid); let (_kind, module, uuid) = split_uuid(uuid); let node = match module.as_str() { "userplaylists" => { @@ -129,6 +142,7 @@ impl crabidy_core::ProviderClient for Client { } } +#[instrument] fn split_uuid(uuid: &str) -> (String, String, String) { let mut split = uuid.splitn(3, ':'); ( @@ -150,10 +164,12 @@ impl Client { }) } + #[instrument] pub fn get_user_id(&self) -> Option { self.settings.login.user_id.clone() } + #[instrument] pub async fn make_request( &self, uri: &str, @@ -187,6 +203,7 @@ impl Client { Ok(response) } + #[instrument] pub async fn make_paginated_request( &self, uri: &str, @@ -244,6 +261,7 @@ impl Client { Ok(items) } + #[instrument] pub async fn make_explorer_request( &self, uri: &str, @@ -278,6 +296,7 @@ impl Client { Ok(()) } + #[instrument] pub async fn search(&self, query: &str) -> Result<(), ClientError> { let query = vec![("query", query.to_string())]; self.make_explorer_request(&format!("search/artists"), Some(&query)) @@ -285,6 +304,7 @@ impl Client { Ok(()) } + #[instrument] pub async fn get_playlist_tracks( &self, playlist_uuid: &str, @@ -294,18 +314,21 @@ impl Client { .await?) } + #[instrument] pub async fn get_playlist(&self, playlist_uuid: &str) -> Result { Ok(self .make_request(&format!("playlists/{}", playlist_uuid), None) .await?) } + #[instrument] pub async fn get_users_playlists(&self, user_id: u64) -> Result, ClientError> { Ok(self .make_paginated_request(&format!("users/{}/playlists", user_id), None) .await?) } + #[instrument] pub async fn get_users_playlists_and_favorite_playlists( &self, user_id: &str, @@ -318,6 +341,7 @@ impl Client { .await?) } + #[instrument] pub async fn explore_get_users_playlists_and_favorite_playlists( &self, user_id: u64, @@ -335,6 +359,7 @@ impl Client { Ok(()) } + #[instrument] pub async fn get_users_favorites(&self, user_id: u64) -> Result<(), ClientError> { self.make_explorer_request( &format!("users/{}/favorites", user_id), @@ -345,6 +370,7 @@ impl Client { Ok(()) } + #[instrument] pub async fn get_user(&self, user_id: u64) -> Result<(), ClientError> { self.make_explorer_request( &format!("users/{}", user_id), @@ -355,6 +381,7 @@ impl Client { Ok(()) } + #[instrument] pub async fn get_track_playback(&self, track_id: &str) -> Result { let query = vec![ ("audioquality", "LOSSLESS".to_string()), @@ -368,12 +395,14 @@ impl Client { .await } + #[instrument] pub async fn get_track(&self, track_id: &str) -> Result { let (_, track_id, _) = split_uuid(track_id); self.make_request(&format!("tracks/{}", track_id), None) .await } + #[instrument] pub async fn login_web(&mut self) -> Result<(), ClientError> { let code_response = self.get_device_code().await?; let now = Instant::now(); @@ -399,6 +428,7 @@ impl Client { Err(ClientError::ConnectionError) } + #[instrument(skip(self))] pub async fn login_config(&mut self) -> Result<(), ClientError> { let Some(access_token) = self.settings.login.access_token.clone() else { return Err(ClientError::AuthError( @@ -427,6 +457,7 @@ impl Client { Ok(()) } + #[instrument] pub async fn refresh_access_token(&self) -> Result { let Some(refresh_token) = self.settings.login.refresh_token.clone() else { return Err(ClientError::AuthError( @@ -462,6 +493,7 @@ impl Client { )) } } + #[instrument] async fn get_device_code(&self) -> Result { let req = DeviceAuthRequest { client_id: self.settings.oauth.client_id.clone(), @@ -487,6 +519,7 @@ impl Client { Ok(code) } + #[instrument] pub async fn check_auth_status( &self, device_code: &str, @@ -533,7 +566,7 @@ mod tests { fn setup() -> Client { let settings = crate::config::Settings::default(); - Client::new(settings).unwrap() + Client::new(settings).expect("could not create tidaldy client") } #[tokio::test]