Unnwrap server code
CI checks / stable / fmt (push) Successful in 4s Details

This commit is contained in:
Hans Mündelein 2023-06-14 09:57:08 +02:00
parent 73cc79d776
commit 5c50544523
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
5 changed files with 195 additions and 94 deletions

View File

@ -16,7 +16,11 @@ pub struct QueueManager {
impl From<QueueManager> for Queue {
fn from(queue_manager: QueueManager) -> Self {
Self {
timestamp: queue_manager.created_at.elapsed().unwrap().as_secs(),
timestamp: queue_manager
.created_at
.elapsed()
.expect("failed to get elapsed time")
.as_secs(),
current_position: queue_manager.current_position() as u32,
tracks: queue_manager.tracks,
}
@ -164,16 +168,20 @@ impl QueueManager {
}
let pos = self.current_position();
let order_additions: Vec<usize> = (len..len + tracks.len()).collect();
debug!(
"extending play of len {:#?} with {:#?}",
len, order_additions
);
self.play_order.extend(order_additions);
let tail: Vec<Track> = self
.tracks
.splice((self.current_position() + 1).., tracks.to_vec())
.collect();
self.tracks.extend(tail);
self.play_order
.iter_mut()
.filter(|i| (pos as usize) < **i)
.for_each(|i| *i += len);
// self.play_order
// .iter_mut()
// .filter(|i| (pos as usize) < **i)
// .for_each(|i| *i += len);
if self.shuffle {
self.shuffle_behind(self.current_offset);
}

View File

@ -3,7 +3,7 @@ use crabidy_core::proto::crabidy::{
crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, PlayState, Track,
};
use crabidy_core::{ProviderClient, ProviderError};
use tracing::{debug_span, info, instrument, warn, Span};
use tracing::{debug_span, error, info, instrument, warn, Span};
use tracing_subscriber::{filter::Targets, prelude::*};
mod playback;
@ -39,7 +39,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("audio player started initialized");
let (update_tx, _) = tokio::sync::broadcast::channel(2048);
let orchestrator = ProviderOrchestrator::init("").await.unwrap();
let orchestrator = ProviderOrchestrator::init("")
.await
.expect("failed to init orchestrator");
let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone());
@ -76,44 +78,51 @@ fn poll_play_bus(rx: flume::Receiver<PlayerMessage>, tx: flume::Sender<PlaybackM
let span = debug_span!("play-chan");
match msg {
PlayerMessage::EndOfStream => {
tx.send(PlaybackMessage::Next { span }).unwrap();
if let Err(err) = tx.send(PlaybackMessage::Next { span }) {
error!("failed to send next message: {}", err);
}
}
PlayerMessage::Stopped => {
tx.send(PlaybackMessage::StateChanged {
if let Err(err) = tx.send(PlaybackMessage::StateChanged {
state: PlayState::Stopped,
span,
})
.unwrap();
}) {
error!("failed to send stopped message: {}", err);
}
}
PlayerMessage::Paused => {
tx.send(PlaybackMessage::StateChanged {
if let Err(err) = tx.send(PlaybackMessage::StateChanged {
state: PlayState::Paused,
span,
})
.unwrap();
}) {
error!("failed to send paused message: {}", err);
}
}
PlayerMessage::Playing => {
tx.send(PlaybackMessage::StateChanged {
if let Err(err) = tx.send(PlaybackMessage::StateChanged {
state: PlayState::Playing,
span,
})
.unwrap();
}) {
error!("failed to send playing message: {}", err);
}
}
PlayerMessage::Elapsed { duration, elapsed } => {
tx.send(PlaybackMessage::PostitionChanged {
if let Err(err) = tx.send(PlaybackMessage::PostitionChanged {
duration: duration.as_millis() as u32,
position: elapsed.as_millis() as u32,
span,
})
.unwrap();
}) {
error!("failed to send elapsed message: {}", err);
}
}
PlayerMessage::Duration { duration } => {
tx.send(PlaybackMessage::PostitionChanged {
if let Err(err) = tx.send(PlaybackMessage::PostitionChanged {
duration: duration.as_millis() as u32,
position: 0,
span,
})
.unwrap();
}) {
error!("failed to send duration message: {}", err);
}
}
}
}

View File

@ -51,7 +51,10 @@ impl Playback {
let repeat;
let shuffle;
let response = {
let queue = self.queue.lock().unwrap();
let Ok(queue) = self.queue.lock() else {
error!("failed to get queue lock");
continue;
};
debug!("got queue lock");
repeat = queue.repeat;
shuffle = queue.shuffle;
@ -69,13 +72,11 @@ impl Playback {
trace!("position {:?}", position);
let play_state = {
debug!("getting play state lock");
match *self.state.lock().unwrap() {
PlayState::Playing => PlayState::Playing,
PlayState::Paused => PlayState::Paused,
PlayState::Stopped => PlayState::Stopped,
PlayState::Loading => PlayState::Loading,
_ => PlayState::Unspecified,
}
let Ok(play_state) = self.state.lock() else {
error!("failed to get play state lock");
continue;
};
*play_state
};
trace!("play_state {:?}", play_state);
debug!("released play state lock");
@ -90,7 +91,9 @@ impl Playback {
}
};
trace!("response {:?}", response);
result_tx.send(response).unwrap();
if let Err(err) = result_tx.send(response) {
error!("failed to send response: {:#?}", err);
}
}
PlaybackMessage::Replace { uuids, span } => {
let _e = span.enter();
@ -108,12 +111,17 @@ impl Playback {
}
trace!("got tracks {:?}", all_tracks);
let current = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.replace_with_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap();
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
};
queue.current_track()
};
debug!("got current {:?}", current);
@ -136,7 +144,10 @@ impl Playback {
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.queue_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
@ -164,7 +175,10 @@ impl Playback {
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.append_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
@ -180,16 +194,27 @@ impl Playback {
let _e = span.enter();
debug!("removing");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
let track = queue.remove_tracks(&positions);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap();
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
};
track
};
debug!("queue lock released");
let state = *self.state.lock().unwrap();
let state = {
let Ok(state) = self.state.lock() else {
error!("failed to get play state lock");
continue;
};
*state
};
if state == PlayState::Playing {
self.play(track).in_current_span().await;
}
@ -215,12 +240,17 @@ impl Playback {
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.insert_tracks(position, &all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap();
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
};
}
debug!("queue lock released");
}
@ -232,17 +262,24 @@ impl Playback {
let _e = span.enter();
debug!("clearing");
let should_stop = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
let should_stop = queue.clear(exclude_current);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap();
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
};
should_stop
};
debug!("queue lock released");
if should_stop {
self.player.stop().in_current_span().await;
if let Err(err) = self.player.stop().in_current_span().await {
error!("{:?}", err)
}
}
}
@ -253,7 +290,10 @@ impl Playback {
let _e = span.enter();
debug!("setting current");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.set_current_position(queue_position);
queue.current_track()
@ -268,7 +308,10 @@ impl Playback {
let shuffle;
let repeat;
{
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
repeat = queue.repeat;
if queue.shuffle {
@ -292,7 +335,10 @@ impl Playback {
let shuffle;
let repeat;
{
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
shuffle = queue.shuffle;
if queue.repeat {
@ -314,7 +360,13 @@ impl Playback {
let _e = span.enter();
debug!("toggling play");
{
let state = *self.state.lock().unwrap();
let state = {
let Ok(state) = self.state.lock() else {
debug!("got state lock");
continue;
};
*state
};
debug!("got state lock");
if state == PlayState::Playing {
if let Err(err) = self.player.pause().await {
@ -358,7 +410,10 @@ impl Playback {
let _e = span.enter();
debug!("nexting");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.next_track()
};
@ -371,7 +426,10 @@ impl Playback {
let _e = span.enter();
debug!("preving");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.prev_track()
};
@ -384,7 +442,11 @@ impl Playback {
debug!("state changed");
let play_state = {
*self.state.lock().unwrap() = state;
let Ok(mut state_lock) = self.state.lock() else {
debug!("got state lock");
continue;
};
*state_lock = state;
state
};
debug!("released state lock and got play state {:?}", play_state);
@ -513,7 +575,10 @@ impl Playback {
Err(err) => {
warn!("no urls found for track {:?}: {}", track.uuid, err);
uuid = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
if let Some(track) = queue.next_track() {
track.uuid.clone()
} else {
@ -524,7 +589,10 @@ impl Playback {
}
};
{
let queue = self.queue.lock().unwrap();
let Ok(queue) = self.queue.lock() else {
error!("poisend queue lock");
return
};
let queue_update_tx = self.update_tx.clone();
let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack {
@ -553,7 +621,10 @@ impl Playback {
Err(err) => {
warn!("no urls found for track {:?}: {}", track.uuid, err);
uuid = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("poisend queue lock");
return;
};
if let Some(track) = queue.next_track() {
track.uuid.clone()
} else {
@ -564,7 +635,10 @@ impl Playback {
}
};
{
let queue = self.queue.lock().unwrap();
let Ok(queue) = self.queue.lock() else {
error!("poisend queue lock");
return
};
let queue_update_tx = self.update_tx.clone();
let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack {

View File

@ -28,11 +28,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.get_lib_node(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
ProviderMessage::GetTrack {
uuid,
@ -41,11 +39,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.get_metadata_for_track(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
ProviderMessage::GetTrackUrls {
uuid,
@ -54,11 +50,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.get_urls_for_track(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
ProviderMessage::FlattenNode {
uuid,
@ -67,11 +61,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.flatten_node(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
}
}
@ -118,7 +110,7 @@ impl ProviderClient for ProviderOrchestrator {
tidaldy::Client::init(&raw_toml_settings)
.in_current_span()
.await
.unwrap(),
.expect("Failed to init Tidal clienta"),
);
let new_toml_config = tidal_client.settings();
if let Err(err) = tokio::fs::write(&config_file, new_toml_config)

View File

@ -250,11 +250,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_shuffle request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ToggleShuffle { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ToggleShuffleResponse {};
Ok(Response::new(reply))
}
@ -267,11 +269,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_repeat request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ToggleRepeat { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ToggleRepeatResponse {};
Ok(Response::new(reply))
}
@ -320,11 +324,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_play request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::TogglePlay { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = TogglePlayResponse {};
Ok(Response::new(reply))
}
@ -337,11 +343,13 @@ impl CrabidyService for RpcService {
debug!("Received stop request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::Stop { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = StopResponse {};
Ok(Response::new(reply))
}
@ -356,11 +364,13 @@ impl CrabidyService for RpcService {
debug!("Received change_volume request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ChangeVolume { delta, span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ChangeVolumeResponse {};
Ok(Response::new(reply))
}
@ -373,11 +383,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_mute request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ToggleMute { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ToggleMuteResponse {};
Ok(Response::new(reply))
}
@ -390,11 +402,13 @@ impl CrabidyService for RpcService {
debug!("Received next request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::Next { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = NextResponse {};
Ok(Response::new(reply))
}
@ -407,11 +421,13 @@ impl CrabidyService for RpcService {
debug!("Received prev request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::Prev { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = PrevResponse {};
Ok(Response::new(reply))
}
@ -424,11 +440,13 @@ impl CrabidyService for RpcService {
debug!("Received restart_track request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::RestartTrack { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = RestartTrackResponse {};
Ok(Response::new(reply))
}