diff --git a/Cargo.lock b/Cargo.lock index bf65bac..807c604 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,12 +164,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" -[[package]] -name = "atomic_refcell" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79d6dc922a2792b006573f60b2648076355daeae5ce9cb59507e5908c9625d31" - [[package]] name = "audio-player" version = "0.1.0" @@ -366,16 +360,6 @@ dependencies = [ "nom", ] -[[package]] -name = "cfg-expr" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8790cf1286da485c72cf5fc7aeba308438800036ec67d89425924c4807268c9" -dependencies = [ - "smallvec", - "target-lexicon", -] - [[package]] name = "cfg-if" version = "1.0.0" @@ -545,12 +529,11 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "audio-player", "crabidy-core", "dirs", "flume", "futures", - "gstreamer", - "gstreamer-play", "log", "once_cell", "serde", @@ -954,209 +937,12 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "gio-sys" -version = "0.17.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b1d43b0d7968b48455244ecafe41192871257f5740aa6b095eb19db78e362a5" -dependencies = [ - "glib-sys", - "gobject-sys", - "libc", - "system-deps", - "winapi", -] - -[[package]] -name = "glib" -version = "0.17.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7f1de7cbde31ea4f0a919453a2dcece5d54d5b70e08f8ad254dc4840f5f09b6" -dependencies = [ - "bitflags", - "futures-channel", - "futures-core", - "futures-executor", - "futures-task", - "futures-util", - "gio-sys", - "glib-macros", - "glib-sys", - "gobject-sys", - "libc", - "memchr", - "once_cell", - "smallvec", - "thiserror", -] - -[[package]] -name = "glib-macros" -version = "0.17.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a7206c5c03851ef126ea1444990e81fdd6765fb799d5bc694e4897ca01bb97f" -dependencies = [ - "anyhow", - "heck 0.4.1", - "proc-macro-crate", - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", -] - -[[package]] -name = "glib-sys" -version = "0.17.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f00ad0a1bf548e61adfff15d83430941d9e1bb620e334f779edd1c745680a5" -dependencies = [ - "libc", - "system-deps", -] - [[package]] name = "glob" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "gobject-sys" -version = "0.17.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e75b0000a64632b2d8ca3cf856af9308e3a970844f6e9659bd197f026793d0" -dependencies = [ - "glib-sys", - "libc", - "system-deps", -] - -[[package]] -name = "gstreamer" -version = "0.20.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4530401c89be6dc10d77ae1587b811cf455c97dce7abf594cb9164527c7da7fc" -dependencies = [ - "bitflags", - "cfg-if", - "futures-channel", - "futures-core", - "futures-util", - "glib", - "gstreamer-sys", - "libc", - "muldiv", - "num-integer", - "num-rational", - "once_cell", - "option-operations", - "paste", - "pretty-hex", - "smallvec", - "thiserror", -] - -[[package]] -name = "gstreamer-base" -version = "0.20.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b8ff5dfbf7bcaf1466a385b836bad0d8da25759f121458727fdda1f771c69b3" -dependencies = [ - "atomic_refcell", - "bitflags", - "cfg-if", - "glib", - "gstreamer", - "gstreamer-base-sys", - "libc", -] - -[[package]] -name = "gstreamer-base-sys" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26114ed96f6668380f5a1554128159e98e06c3a7a8460f216d7cd6dce28f928c" -dependencies = [ - "glib-sys", - "gobject-sys", - "gstreamer-sys", - "libc", - "system-deps", -] - -[[package]] -name = "gstreamer-play" -version = "0.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f752a53171e330c7f56db24ca91d99b7958dc86395ebe91b117226d339b29306" -dependencies = [ - "bitflags", - "glib", - "gstreamer", - "gstreamer-play-sys", - "gstreamer-video", - "libc", - "once_cell", -] - -[[package]] -name = "gstreamer-play-sys" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b69030bd53c3e5988a1e13bdb55ae8d922f8e9c2b522bfa2442bc13906829fb" -dependencies = [ - "glib-sys", - "gobject-sys", - "gstreamer-sys", - "gstreamer-video-sys", - "libc", - "system-deps", -] - -[[package]] -name = "gstreamer-sys" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e56fe047adef7d47dbafa8bc1340fddb53c325e16574763063702fc94b5786d2" -dependencies = [ - "glib-sys", - "gobject-sys", - "libc", - "system-deps", -] - -[[package]] -name = "gstreamer-video" -version = "0.20.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dce97769effde2d779dc4f7037b37106457b74e53f2a711bddc90b30ffeb7e06" -dependencies = [ - "bitflags", - "cfg-if", - "futures-channel", - "glib", - "gstreamer", - "gstreamer-base", - "gstreamer-video-sys", - "libc", - "once_cell", -] - -[[package]] -name = "gstreamer-video-sys" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66ddb6112d438aac0004d2db6053a572f92b1c5e0e9d6ff6c71d9245f7f73e46" -dependencies = [ - "glib-sys", - "gobject-sys", - "gstreamer-base-sys", - "gstreamer-sys", - "libc", - "system-deps", -] - [[package]] name = "h2" version = "0.3.19" @@ -1648,12 +1434,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "muldiv" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0" - [[package]] name = "multimap" version = "0.8.3" @@ -1794,17 +1574,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-rational" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.15" @@ -1953,15 +1722,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "option-operations" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c26d27bb1aeab65138e4bf7666045169d1717febcc9ff870166be8348b223d0" -dependencies = [ - "paste", -] - [[package]] name = "ordered-stream" version = "0.2.0" @@ -2007,12 +1767,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "paste" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" - [[package]] name = "peeking_take_while" version = "0.1.2" @@ -2139,12 +1893,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pretty-hex" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa0831dd7cc608c38a5e323422a0077678fa5744aa2be4ad91c4ece8eec8d5" - [[package]] name = "prettyplease" version = "0.1.25" @@ -2165,30 +1913,6 @@ dependencies = [ "toml_edit", ] -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro2" version = "1.0.58" @@ -3025,25 +2749,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" -[[package]] -name = "system-deps" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5fa6fb9ee296c0dc2df41a656ca7948546d061958115ddb0bcaae43ad0d17d2" -dependencies = [ - "cfg-expr", - "heck 0.4.1", - "pkg-config", - "toml 0.7.4", - "version-compare", -] - -[[package]] -name = "target-lexicon" -version = "0.12.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd1ba337640d60c3e96bc6f0638a939b9c9a7f2c316a1598c279828b3d1dc8c5" - [[package]] name = "tauri-winrt-notification" version = "0.1.0" @@ -3583,12 +3288,6 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" -[[package]] -name = "version-compare" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "579a42fc0b8e0c63b76519a339be31bed574929511fa53c1a3acae26eb258f29" - [[package]] name = "version_check" version = "0.9.4" diff --git a/crabidy-server/Cargo.toml b/crabidy-server/Cargo.toml index 38b4888..d0c905c 100644 --- a/crabidy-server/Cargo.toml +++ b/crabidy-server/Cargo.toml @@ -7,11 +7,10 @@ edition = "2021" [dependencies] anyhow = "1.0.71" -gstreamer = "0.20.5" -gstreamer-play = "0.20.2" tokio = { version = "1.28.0", features = ["full"] } tidaldy = { path = "../tidaldy" } crabidy-core = { path = "../crabidy-core" } +audio-player = { path = "../audio-player" } once_cell = "1.17.1" serde_json = "1.0.96" serde = "1.0.163" diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index 4972e6b..b6fcf0b 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -1,8 +1,8 @@ +use audio_player::PlayerMessage; use crabidy_core::proto::crabidy::{ - crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, Track, + crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, PlayState, Track, }; use crabidy_core::{ProviderClient, ProviderError}; -use gstreamer_play::{PlayMessage, PlayState as GstPlaystate}; use tracing::{debug_span, info, instrument, warn, Span}; use tracing_subscriber::{filter::Targets, prelude::*}; @@ -36,19 +36,18 @@ async fn main() -> Result<(), Box> { tracing::subscriber::set_global_default(registry) .expect("Setting the default tracing subscriber failed"); - gstreamer::init()?; - info!("gstreamer initialized"); + info!("audio player started initialized"); let (update_tx, _) = tokio::sync::broadcast::channel(2048); let orchestrator = ProviderOrchestrator::init("").await.unwrap(); let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone()); - let bus = playback.play.message_bus(); let playback_tx = playback.playback_tx.clone(); + let player_msg = playback.player.messages.clone(); std::thread::spawn(|| { - poll_play_bus(bus, playback_tx); + poll_play_bus(player_msg, playback_tx); }); info!("gstreamer bus handler started"); @@ -71,44 +70,51 @@ async fn main() -> Result<(), Box> { Ok(()) } -#[instrument(skip(bus, tx))] -fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender) { - for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { +#[instrument(skip(rx, tx))] +fn poll_play_bus(rx: flume::Receiver, tx: flume::Sender) { + for msg in rx.iter() { let span = debug_span!("play-chan"); - match PlayMessage::parse(&msg) { - Ok(PlayMessage::EndOfStream) => { + match msg { + PlayerMessage::EndOfStream => { tx.send(PlaybackMessage::Next { span }).unwrap(); } - Ok(PlayMessage::StateChanged { state }) => { - tx.send(PlaybackMessage::StateChanged { state, span }) - .unwrap(); + PlayerMessage::Stopped => { + tx.send(PlaybackMessage::StateChanged { + state: PlayState::Stopped, + span, + }) + .unwrap(); } - Ok(PlayMessage::PositionUpdated { position }) => { - let position = position - .and_then(|t| Some(t.mseconds() as u32)) - .unwrap_or(0); - tx.send(PlaybackMessage::PostitionChanged { position, span }) - .unwrap(); + PlayerMessage::Paused => { + tx.send(PlaybackMessage::StateChanged { + state: PlayState::Paused, + span, + }) + .unwrap(); } - Ok(PlayMessage::Buffering { percent: _ }) => {} - Ok(PlayMessage::VolumeChanged { volume }) => { - let volume = volume as f32; - tx.send(PlaybackMessage::VolumeChanged { volume, span }) - .unwrap(); + PlayerMessage::Playing => { + tx.send(PlaybackMessage::StateChanged { + state: PlayState::Playing, + span, + }) + .unwrap(); } - Ok(PlayMessage::MuteChanged { muted }) => { - tx.send(PlaybackMessage::MuteChanged { muted, span }) - .unwrap(); + PlayerMessage::Elapsed { duration, elapsed } => { + tx.send(PlaybackMessage::PostitionChanged { + duration: duration.as_millis() as u32, + position: elapsed.as_millis() as u32, + span, + }) + .unwrap(); + } + PlayerMessage::Duration { duration } => { + tx.send(PlaybackMessage::PostitionChanged { + duration: duration.as_millis() as u32, + position: 0, + span, + }) + .unwrap(); } - - Ok(PlayMessage::MediaInfoUpdated { info: _ }) => {} - Ok(PlayMessage::UriLoaded) => {} - Ok(PlayMessage::VideoDimensionsChanged { - width: _, - height: _, - }) => {} - Ok(PlayMessage::DurationChanged { duration: _ }) => {} - _ => println!("Unknown message: {:?}", msg), } } } @@ -194,7 +200,7 @@ pub enum PlaybackMessage { span: Span, }, StateChanged { - state: GstPlaystate, + state: PlayState, span: Span, }, VolumeChanged { @@ -207,6 +213,7 @@ pub enum PlaybackMessage { span: Span, }, PostitionChanged { + duration: u32, position: u32, span: Span, }, diff --git a/crabidy-server/src/playback.rs b/crabidy-server/src/playback.rs index 0af44ea..5889d85 100644 --- a/crabidy-server/src/playback.rs +++ b/crabidy-server/src/playback.rs @@ -1,24 +1,23 @@ use crate::PlaybackMessage; use crate::ProviderMessage; +use audio_player::Player; use crabidy_core::proto::crabidy::{ get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, Queue, QueueTrack, Track, TrackPosition, }; use crabidy_core::ProviderError; -use gstreamer_play::{Play, PlayState as GstPlaystate, PlayVideoRenderer}; use std::sync::Mutex; use tracing::debug_span; use tracing::{debug, error, instrument, trace, warn, Instrument}; -#[derive(Debug)] pub struct Playback { update_tx: tokio::sync::broadcast::Sender, provider_tx: flume::Sender, pub playback_tx: flume::Sender, playback_rx: flume::Receiver, queue: Mutex, - state: Mutex, - pub play: Play, + state: Mutex, + pub player: Player, } impl Playback { @@ -32,8 +31,8 @@ impl Playback { current_position: 0, tracks: Vec::new(), }); - let state = Mutex::new(GstPlaystate::Stopped); - let play = Play::new(None::); + let state = Mutex::new(PlayState::Stopped); + let player = Player::default(); Self { update_tx, provider_tx, @@ -41,13 +40,13 @@ impl Playback { playback_rx, queue, state, - play, + player, } } - #[instrument] + pub fn run(self) { tokio::spawn(async move { - while let Ok(message) = self.playback_rx.recv_async().in_current_span().await { + while let Ok(message) = self.playback_rx.recv_async().await { match message { PlaybackMessage::Init { result_tx, span } => { let _e = span.enter(); @@ -60,26 +59,19 @@ impl Playback { }; trace!("queue_track {:?}", queue_track); debug!("released queue_track lock"); + let position = TrackPosition { - duration: self - .play - .duration() - .map(|t| t.mseconds() as u32) - .unwrap_or(0), - position: self - .play - .position() - .map(|t| t.mseconds() as u32) - .unwrap_or(0), + duration: 0, + position: 0, }; trace!("position {:?}", position); let play_state = { debug!("getting play state lock"); match *self.state.lock().unwrap() { - GstPlaystate::Playing => PlayState::Playing, - GstPlaystate::Paused => PlayState::Paused, - GstPlaystate::Stopped => PlayState::Stopped, - GstPlaystate::Buffering => PlayState::Loading, + PlayState::Playing => PlayState::Playing, + PlayState::Paused => PlayState::Paused, + PlayState::Stopped => PlayState::Stopped, + PlayState::Loading => PlayState::Loading, _ => PlayState::Unspecified, } }; @@ -89,8 +81,8 @@ impl Playback { queue: Some(queue.clone()), queue_track: Some(queue_track), play_state: play_state as i32, - volume: self.play.volume() as f32, - mute: self.play.is_muted(), + volume: 0.0, + mute: false, position: Some(position), } }; @@ -109,6 +101,7 @@ impl Playback { let tracks = self.flatten_node(&uuid).in_current_span().await; all_tracks.extend(tracks); } + debug!("uuid: {:?}", uuid); } trace!("got tracks {:?}", all_tracks); let current = { @@ -247,12 +240,14 @@ impl Playback { let _e = span.enter(); debug!("toggling play"); { - let state = self.state.lock().unwrap(); + let state = *self.state.lock().unwrap(); debug!("got state lock"); - if *state == GstPlaystate::Playing { - self.play.pause(); - } else { - self.play.play(); + if state == PlayState::Playing { + if let Err(err) = self.player.pause().await { + error!("{:?}", err) + } + } else if let Err(err) = self.player.unpause().await { + error!("{:?}", err) } } debug!("state lock released"); @@ -261,23 +256,28 @@ impl Playback { PlaybackMessage::Stop { span } => { let _e = span.enter(); debug!("stopping"); - self.play.stop(); + if let Err(err) = self.player.stop().await { + error!("{:?}", err) + } } PlaybackMessage::ChangeVolume { delta, span } => { let _e = span.enter(); debug!("changing volume"); - let volume = self.play.volume(); - debug!("got volume {:?}", volume); - self.play.set_volume(volume + delta as f64); + if let Ok(volume) = self.player.volume().await { + debug!("got volume {:?}", volume); + if let Err(err) = self.player.set_volume(volume + delta).await { + error!("{:?}", err) + }; + } } PlaybackMessage::ToggleMute { span } => { let _e = span.enter(); debug!("toggling mute"); - let muted = self.play.is_muted(); - debug!("got muted {:?}", muted); - self.play.set_mute(!muted); + // let muted = self.player.is_muted(); + // debug!("got muted {:?}", muted); + // self.player.set_mute(!muted); } PlaybackMessage::ToggleShuffle { span } => { @@ -316,16 +316,8 @@ impl Playback { debug!("state changed"); let play_state = { - *self.state.lock().unwrap() = state.clone(); - debug!("got state lock"); - - match state { - GstPlaystate::Playing => PlayState::Playing, - GstPlaystate::Paused => PlayState::Paused, - GstPlaystate::Stopped => PlayState::Stopped, - GstPlaystate::Buffering => PlayState::Loading, - _ => PlayState::Unspecified, - } + *self.state.lock().unwrap() = state; + state }; debug!("released state lock and got play state {:?}", play_state); let active_track_tx = self.update_tx.clone(); @@ -338,8 +330,9 @@ impl Playback { PlaybackMessage::RestartTrack { span } => { let _e = span.enter(); debug!("restarting track"); - self.play.stop(); - self.play.play(); + if let Err(err) = self.player.restart().await { + error!("{:?}", err) + } } PlaybackMessage::VolumeChanged { volume, span } => { @@ -362,15 +355,14 @@ impl Playback { } } - PlaybackMessage::PostitionChanged { position, span } => { + PlaybackMessage::PostitionChanged { + duration, + position, + span, + } => { let _e = span.enter(); trace!("position changed"); let update_tx = self.update_tx.clone(); - let duration = self - .play - .duration() - .and_then(|t| Some(t.mseconds() as u32)) - .unwrap_or(0); let update = StreamUpdate::Position(TrackPosition { duration, position }); if let Err(err) = update_tx.send(update) { error!("{:?}", err) @@ -404,6 +396,7 @@ impl Playback { #[instrument(skip(self))] async fn get_track(&self, uuid: &str) -> Result { + debug!("getting track"); let tx = self.provider_tx.clone(); let (result_tx, result_rx) = flume::bounded(1); let span = tracing::trace_span!("prov-chan"); @@ -474,11 +467,14 @@ impl Playback { error!("{:?}", err) } } - self.play.stop(); - self.play.set_uri(Some(&urls[0])); - self.play.play(); - } else { - self.play.stop(); + if let Err(err) = self.player.stop().await { + error!("{:?}", err) + }; + if let Err(err) = self.player.play(&urls[0]).await { + error!("{:?}", err) + }; + } else if let Err(err) = self.player.stop().await { + error!("{:?}", err) } } @@ -514,9 +510,12 @@ impl Playback { error!("{:?}", err) } } - self.play.stop(); - self.play.set_uri(Some(&urls[0])); - self.play.play(); + if let Err(err) = self.player.stop().await { + error!("{:?}", err) + }; + if let Err(err) = self.player.play(&urls[0]).await { + error!("{:?}", err) + } } } } diff --git a/crabidy-server/src/provider.rs b/crabidy-server/src/provider.rs index 984fb1a..fa97a07 100644 --- a/crabidy-server/src/provider.rs +++ b/crabidy-server/src/provider.rs @@ -17,10 +17,9 @@ pub struct ProviderOrchestrator { } impl ProviderOrchestrator { - #[instrument] pub fn run(self) { tokio::spawn(async move { - while let Ok(msg) = self.provider_rx.recv_async().in_current_span().await { + while let Ok(msg) = self.provider_rx.recv_async().await { match msg { ProviderMessage::GetLibraryNode { uuid, @@ -29,7 +28,11 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.get_lib_node(&uuid).in_current_span().await; - result_tx.send(result).unwrap(); + result_tx + .send_async(result) + .in_current_span() + .await + .unwrap(); } ProviderMessage::GetTrack { uuid, @@ -38,7 +41,11 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.get_metadata_for_track(&uuid).in_current_span().await; - result_tx.send(result).unwrap(); + result_tx + .send_async(result) + .in_current_span() + .await + .unwrap(); } ProviderMessage::GetTrackUrls { uuid, @@ -47,7 +54,11 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.get_urls_for_track(&uuid).in_current_span().await; - result_tx.send(result).unwrap(); + result_tx + .send_async(result) + .in_current_span() + .await + .unwrap(); } ProviderMessage::FlattenNode { uuid, @@ -56,7 +67,11 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.flatten_node(&uuid).in_current_span().await; - result_tx.send(result).unwrap(); + result_tx + .send_async(result) + .in_current_span() + .await + .unwrap(); } } } @@ -132,6 +147,7 @@ impl ProviderClient for ProviderOrchestrator { } #[instrument(skip(self))] async fn get_metadata_for_track(&self, track_uuid: &str) -> Result { + debug!("get_metadata_for_track"); self.tidal_client .get_metadata_for_track(track_uuid) .in_current_span() diff --git a/crabidy-server/src/rpc.rs b/crabidy-server/src/rpc.rs index 38a1b8e..036fac3 100644 --- a/crabidy-server/src/rpc.rs +++ b/crabidy-server/src/rpc.rs @@ -62,7 +62,7 @@ impl CrabidyService for RpcService { .await .map_err(|e| { error!("{:?}", e); - return Status::internal("Failed to receive response from provider channel"); + Status::internal("Failed to receive response from provider channel") })?; Ok(Response::new(response)) }