From 5c505445234ed560a269aa9f87c4bb0e150cd75e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Wed, 14 Jun 2023 09:57:08 +0200 Subject: [PATCH] Unnwrap server code --- crabidy-server/src/lib.rs | 18 +++-- crabidy-server/src/main.rs | 45 ++++++----- crabidy-server/src/playback.rs | 138 +++++++++++++++++++++++++-------- crabidy-server/src/provider.rs | 34 ++++---- crabidy-server/src/rpc.rs | 54 ++++++++----- 5 files changed, 195 insertions(+), 94 deletions(-) diff --git a/crabidy-server/src/lib.rs b/crabidy-server/src/lib.rs index ed22ba1..03c575e 100644 --- a/crabidy-server/src/lib.rs +++ b/crabidy-server/src/lib.rs @@ -16,7 +16,11 @@ pub struct QueueManager { impl From for Queue { fn from(queue_manager: QueueManager) -> Self { Self { - timestamp: queue_manager.created_at.elapsed().unwrap().as_secs(), + timestamp: queue_manager + .created_at + .elapsed() + .expect("failed to get elapsed time") + .as_secs(), current_position: queue_manager.current_position() as u32, tracks: queue_manager.tracks, } @@ -164,16 +168,20 @@ impl QueueManager { } let pos = self.current_position(); let order_additions: Vec = (len..len + tracks.len()).collect(); + debug!( + "extending play of len {:#?} with {:#?}", + len, order_additions + ); self.play_order.extend(order_additions); let tail: Vec = self .tracks .splice((self.current_position() + 1).., tracks.to_vec()) .collect(); self.tracks.extend(tail); - self.play_order - .iter_mut() - .filter(|i| (pos as usize) < **i) - .for_each(|i| *i += len); + // self.play_order + // .iter_mut() + // .filter(|i| (pos as usize) < **i) + // .for_each(|i| *i += len); if self.shuffle { self.shuffle_behind(self.current_offset); } diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index ce476e1..0fc163d 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -3,7 +3,7 @@ use crabidy_core::proto::crabidy::{ crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, PlayState, Track, }; use crabidy_core::{ProviderClient, ProviderError}; -use tracing::{debug_span, info, instrument, warn, Span}; +use tracing::{debug_span, error, info, instrument, warn, Span}; use tracing_subscriber::{filter::Targets, prelude::*}; mod playback; @@ -39,7 +39,9 @@ async fn main() -> Result<(), Box> { info!("audio player started initialized"); let (update_tx, _) = tokio::sync::broadcast::channel(2048); - let orchestrator = ProviderOrchestrator::init("").await.unwrap(); + let orchestrator = ProviderOrchestrator::init("") + .await + .expect("failed to init orchestrator"); let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone()); @@ -76,44 +78,51 @@ fn poll_play_bus(rx: flume::Receiver, tx: flume::Sender { - tx.send(PlaybackMessage::Next { span }).unwrap(); + if let Err(err) = tx.send(PlaybackMessage::Next { span }) { + error!("failed to send next message: {}", err); + } } PlayerMessage::Stopped => { - tx.send(PlaybackMessage::StateChanged { + if let Err(err) = tx.send(PlaybackMessage::StateChanged { state: PlayState::Stopped, span, - }) - .unwrap(); + }) { + error!("failed to send stopped message: {}", err); + } } PlayerMessage::Paused => { - tx.send(PlaybackMessage::StateChanged { + if let Err(err) = tx.send(PlaybackMessage::StateChanged { state: PlayState::Paused, span, - }) - .unwrap(); + }) { + error!("failed to send paused message: {}", err); + } } PlayerMessage::Playing => { - tx.send(PlaybackMessage::StateChanged { + if let Err(err) = tx.send(PlaybackMessage::StateChanged { state: PlayState::Playing, span, - }) - .unwrap(); + }) { + error!("failed to send playing message: {}", err); + } } PlayerMessage::Elapsed { duration, elapsed } => { - tx.send(PlaybackMessage::PostitionChanged { + if let Err(err) = tx.send(PlaybackMessage::PostitionChanged { duration: duration.as_millis() as u32, position: elapsed.as_millis() as u32, span, - }) - .unwrap(); + }) { + error!("failed to send elapsed message: {}", err); + } } PlayerMessage::Duration { duration } => { - tx.send(PlaybackMessage::PostitionChanged { + if let Err(err) = tx.send(PlaybackMessage::PostitionChanged { duration: duration.as_millis() as u32, position: 0, span, - }) - .unwrap(); + }) { + error!("failed to send duration message: {}", err); + } } } } diff --git a/crabidy-server/src/playback.rs b/crabidy-server/src/playback.rs index 2acc1e7..f29e456 100644 --- a/crabidy-server/src/playback.rs +++ b/crabidy-server/src/playback.rs @@ -51,7 +51,10 @@ impl Playback { let repeat; let shuffle; let response = { - let queue = self.queue.lock().unwrap(); + let Ok(queue) = self.queue.lock() else { + error!("failed to get queue lock"); + continue; + }; debug!("got queue lock"); repeat = queue.repeat; shuffle = queue.shuffle; @@ -69,13 +72,11 @@ impl Playback { trace!("position {:?}", position); let play_state = { debug!("getting play state lock"); - match *self.state.lock().unwrap() { - PlayState::Playing => PlayState::Playing, - PlayState::Paused => PlayState::Paused, - PlayState::Stopped => PlayState::Stopped, - PlayState::Loading => PlayState::Loading, - _ => PlayState::Unspecified, - } + let Ok(play_state) = self.state.lock() else { + error!("failed to get play state lock"); + continue; + }; + *play_state }; trace!("play_state {:?}", play_state); debug!("released play state lock"); @@ -90,7 +91,9 @@ impl Playback { } }; trace!("response {:?}", response); - result_tx.send(response).unwrap(); + if let Err(err) = result_tx.send(response) { + error!("failed to send response: {:#?}", err); + } } PlaybackMessage::Replace { uuids, span } => { let _e = span.enter(); @@ -108,12 +111,17 @@ impl Playback { } trace!("got tracks {:?}", all_tracks); let current = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); queue.replace_with_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); - queue_update_tx.send(update).unwrap(); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + }; queue.current_track() }; debug!("got current {:?}", current); @@ -136,7 +144,10 @@ impl Playback { } trace!("got tracks {:?}", all_tracks); { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); queue.queue_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); @@ -164,7 +175,10 @@ impl Playback { } trace!("got tracks {:?}", all_tracks); { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); queue.append_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); @@ -180,16 +194,27 @@ impl Playback { let _e = span.enter(); debug!("removing"); let track = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); let track = queue.remove_tracks(&positions); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); - queue_update_tx.send(update).unwrap(); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + }; track }; debug!("queue lock released"); - let state = *self.state.lock().unwrap(); + let state = { + let Ok(state) = self.state.lock() else { + error!("failed to get play state lock"); + continue; + }; + *state + }; if state == PlayState::Playing { self.play(track).in_current_span().await; } @@ -215,12 +240,17 @@ impl Playback { } trace!("got tracks {:?}", all_tracks); { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); queue.insert_tracks(position, &all_tracks); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); - queue_update_tx.send(update).unwrap(); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + }; } debug!("queue lock released"); } @@ -232,17 +262,24 @@ impl Playback { let _e = span.enter(); debug!("clearing"); let should_stop = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); let should_stop = queue.clear(exclude_current); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); - queue_update_tx.send(update).unwrap(); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + }; should_stop }; debug!("queue lock released"); if should_stop { - self.player.stop().in_current_span().await; + if let Err(err) = self.player.stop().in_current_span().await { + error!("{:?}", err) + } } } @@ -253,7 +290,10 @@ impl Playback { let _e = span.enter(); debug!("setting current"); let track = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); queue.set_current_position(queue_position); queue.current_track() @@ -268,7 +308,10 @@ impl Playback { let shuffle; let repeat; { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); repeat = queue.repeat; if queue.shuffle { @@ -292,7 +335,10 @@ impl Playback { let shuffle; let repeat; { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); shuffle = queue.shuffle; if queue.repeat { @@ -314,7 +360,13 @@ impl Playback { let _e = span.enter(); debug!("toggling play"); { - let state = *self.state.lock().unwrap(); + let state = { + let Ok(state) = self.state.lock() else { + debug!("got state lock"); + continue; + }; + *state + }; debug!("got state lock"); if state == PlayState::Playing { if let Err(err) = self.player.pause().await { @@ -358,7 +410,10 @@ impl Playback { let _e = span.enter(); debug!("nexting"); let track = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); queue.next_track() }; @@ -371,7 +426,10 @@ impl Playback { let _e = span.enter(); debug!("preving"); let track = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; debug!("got queue lock"); queue.prev_track() }; @@ -384,7 +442,11 @@ impl Playback { debug!("state changed"); let play_state = { - *self.state.lock().unwrap() = state; + let Ok(mut state_lock) = self.state.lock() else { + debug!("got state lock"); + continue; + }; + *state_lock = state; state }; debug!("released state lock and got play state {:?}", play_state); @@ -513,7 +575,10 @@ impl Playback { Err(err) => { warn!("no urls found for track {:?}: {}", track.uuid, err); uuid = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("got queue lock"); + continue; + }; if let Some(track) = queue.next_track() { track.uuid.clone() } else { @@ -524,7 +589,10 @@ impl Playback { } }; { - let queue = self.queue.lock().unwrap(); + let Ok(queue) = self.queue.lock() else { + error!("poisend queue lock"); + return + }; let queue_update_tx = self.update_tx.clone(); let track = queue.current_track(); let update = StreamUpdate::QueueTrack(QueueTrack { @@ -553,7 +621,10 @@ impl Playback { Err(err) => { warn!("no urls found for track {:?}: {}", track.uuid, err); uuid = { - let mut queue = self.queue.lock().unwrap(); + let Ok(mut queue) = self.queue.lock() else { + debug!("poisend queue lock"); + return; + }; if let Some(track) = queue.next_track() { track.uuid.clone() } else { @@ -564,7 +635,10 @@ impl Playback { } }; { - let queue = self.queue.lock().unwrap(); + let Ok(queue) = self.queue.lock() else { + error!("poisend queue lock"); + return + }; let queue_update_tx = self.update_tx.clone(); let track = queue.current_track(); let update = StreamUpdate::QueueTrack(QueueTrack { diff --git a/crabidy-server/src/provider.rs b/crabidy-server/src/provider.rs index 1f83228..6e97056 100644 --- a/crabidy-server/src/provider.rs +++ b/crabidy-server/src/provider.rs @@ -28,11 +28,9 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.get_lib_node(&uuid).in_current_span().await; - result_tx - .send_async(result) - .in_current_span() - .await - .unwrap(); + if let Err(err) = result_tx.send_async(result).in_current_span().await { + error!("failed to send result: {}", err); + } } ProviderMessage::GetTrack { uuid, @@ -41,11 +39,9 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.get_metadata_for_track(&uuid).in_current_span().await; - result_tx - .send_async(result) - .in_current_span() - .await - .unwrap(); + if let Err(err) = result_tx.send_async(result).in_current_span().await { + error!("failed to send result: {}", err); + } } ProviderMessage::GetTrackUrls { uuid, @@ -54,11 +50,9 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.get_urls_for_track(&uuid).in_current_span().await; - result_tx - .send_async(result) - .in_current_span() - .await - .unwrap(); + if let Err(err) = result_tx.send_async(result).in_current_span().await { + error!("failed to send result: {}", err); + } } ProviderMessage::FlattenNode { uuid, @@ -67,11 +61,9 @@ impl ProviderOrchestrator { } => { let _e = span.enter(); let result = self.flatten_node(&uuid).in_current_span().await; - result_tx - .send_async(result) - .in_current_span() - .await - .unwrap(); + if let Err(err) = result_tx.send_async(result).in_current_span().await { + error!("failed to send result: {}", err); + } } } } @@ -118,7 +110,7 @@ impl ProviderClient for ProviderOrchestrator { tidaldy::Client::init(&raw_toml_settings) .in_current_span() .await - .unwrap(), + .expect("Failed to init Tidal clienta"), ); let new_toml_config = tidal_client.settings(); if let Err(err) = tokio::fs::write(&config_file, new_toml_config) diff --git a/crabidy-server/src/rpc.rs b/crabidy-server/src/rpc.rs index ef674c8..2623f10 100644 --- a/crabidy-server/src/rpc.rs +++ b/crabidy-server/src/rpc.rs @@ -250,11 +250,13 @@ impl CrabidyService for RpcService { debug!("Received toggle_shuffle request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::ToggleShuffle { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = ToggleShuffleResponse {}; Ok(Response::new(reply)) } @@ -267,11 +269,13 @@ impl CrabidyService for RpcService { debug!("Received toggle_repeat request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::ToggleRepeat { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = ToggleRepeatResponse {}; Ok(Response::new(reply)) } @@ -320,11 +324,13 @@ impl CrabidyService for RpcService { debug!("Received toggle_play request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::TogglePlay { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = TogglePlayResponse {}; Ok(Response::new(reply)) } @@ -337,11 +343,13 @@ impl CrabidyService for RpcService { debug!("Received stop request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::Stop { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = StopResponse {}; Ok(Response::new(reply)) } @@ -356,11 +364,13 @@ impl CrabidyService for RpcService { debug!("Received change_volume request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::ChangeVolume { delta, span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = ChangeVolumeResponse {}; Ok(Response::new(reply)) } @@ -373,11 +383,13 @@ impl CrabidyService for RpcService { debug!("Received toggle_mute request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::ToggleMute { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = ToggleMuteResponse {}; Ok(Response::new(reply)) } @@ -390,11 +402,13 @@ impl CrabidyService for RpcService { debug!("Received next request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::Next { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = NextResponse {}; Ok(Response::new(reply)) } @@ -407,11 +421,13 @@ impl CrabidyService for RpcService { debug!("Received prev request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::Prev { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = PrevResponse {}; Ok(Response::new(reply)) } @@ -424,11 +440,13 @@ impl CrabidyService for RpcService { debug!("Received restart_track request"); let playback_tx = self.playback_tx.clone(); let span = debug_span!("play-chan"); - playback_tx + if let Err(err) = playback_tx .send_async(PlaybackMessage::RestartTrack { span }) .in_current_span() .await - .unwrap(); + { + error!("Failed to send request via channel: {}", err); + } let reply = RestartTrackResponse {}; Ok(Response::new(reply)) }