use crate::PlaybackMessage; use crate::ProviderMessage; use audio_player::Player; use crabidy_core::proto::crabidy::QueueModifiers; use crabidy_core::proto::crabidy::{ get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, QueueTrack, Track, TrackPosition, }; use crabidy_core::ProviderError; use crabidy_server::QueueManager; use std::sync::Mutex; use tracing::debug_span; use tracing::{debug, error, instrument, trace, warn, Instrument}; pub struct Playback { update_tx: tokio::sync::broadcast::Sender, provider_tx: flume::Sender, pub playback_tx: flume::Sender, playback_rx: flume::Receiver, queue: Mutex, state: Mutex, pub player: Player, } impl Playback { pub fn new( update_tx: tokio::sync::broadcast::Sender, provider_tx: flume::Sender, ) -> Self { let (playback_tx, playback_rx) = flume::bounded(10); let queue = Mutex::new(QueueManager::new()); let state = Mutex::new(PlayState::Stopped); let player = Player::default(); Self { update_tx, provider_tx, playback_tx, playback_rx, queue, state, player, } } pub fn run(self) { tokio::spawn(async move { while let Ok(message) = self.playback_rx.recv_async().await { match message { PlaybackMessage::Init { result_tx, span } => { let _e = span.enter(); let repeat; let shuffle; let response = { let queue = self.queue.lock().unwrap(); debug!("got queue lock"); repeat = queue.repeat; shuffle = queue.shuffle; let queue_track = QueueTrack { queue_position: queue.current_position() as u32, track: queue.current_track(), }; trace!("queue_track {:?}", queue_track); debug!("released queue_track lock"); let position = TrackPosition { duration: 0, position: 0, }; trace!("position {:?}", position); let play_state = { debug!("getting play state lock"); match *self.state.lock().unwrap() { PlayState::Playing => PlayState::Playing, PlayState::Paused => PlayState::Paused, PlayState::Stopped => PlayState::Stopped, PlayState::Loading => PlayState::Loading, _ => PlayState::Unspecified, } }; trace!("play_state {:?}", play_state); debug!("released play state lock"); InitResponse { queue: Some(queue.clone().into()), queue_track: Some(queue_track), play_state: play_state as i32, volume: 0.0, mute: false, position: Some(position), mods: Some(QueueModifiers { repeat, shuffle }), } }; trace!("response {:?}", response); result_tx.send(response).unwrap(); } PlaybackMessage::Replace { uuids, span } => { let _e = span.enter(); let mut all_tracks = Vec::new(); for uuid in uuids { if is_track(&uuid) { if let Ok(track) = self.get_track(&uuid).in_current_span().await { all_tracks.push(track); } } else { let tracks = self.flatten_node(&uuid).in_current_span().await; all_tracks.extend(tracks); } debug!("uuid: {:?}", uuid); } trace!("got tracks {:?}", all_tracks); let current = { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.replace_with_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); queue_update_tx.send(update).unwrap(); queue.current_track() }; debug!("got current {:?}", current); self.play(current).in_current_span().await; } PlaybackMessage::Queue { uuids, span } => { let _e = span.enter(); debug!("queing"); let mut all_tracks = Vec::new(); for uuid in uuids { if is_track(&uuid) { if let Ok(track) = self.get_track(&uuid).in_current_span().await { all_tracks.push(track); } } else { let tracks = self.flatten_node(&uuid).in_current_span().await; all_tracks.extend(tracks); } } trace!("got tracks {:?}", all_tracks); { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.queue_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } } debug!("que lock released"); } PlaybackMessage::Append { uuids, span } => { let _e = span.enter(); debug!("appending"); let mut all_tracks = Vec::new(); for uuid in uuids { if is_track(&uuid) { if let Ok(track) = self.get_track(&uuid).in_current_span().await { all_tracks.push(track); } } else { let tracks = self.flatten_node(&uuid).in_current_span().await; all_tracks.extend(tracks); } } trace!("got tracks {:?}", all_tracks); { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.append_tracks(&all_tracks); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } } debug!("queue lock released"); } PlaybackMessage::Remove { positions, span } => { let _e = span.enter(); debug!("removing"); let track = { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); let track = queue.remove_tracks(&positions); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); queue_update_tx.send(update).unwrap(); track }; debug!("queue lock released"); let state = *self.state.lock().unwrap(); if state == PlayState::Playing { self.play(track).in_current_span().await; } } PlaybackMessage::Insert { position, uuids, span, } => { let _e = span.enter(); debug!("inserting"); let mut all_tracks = Vec::new(); for uuid in uuids { if is_track(&uuid) { if let Ok(track) = self.get_track(&uuid).in_current_span().await { all_tracks.push(track); } } else { let tracks = self.flatten_node(&uuid).in_current_span().await; all_tracks.extend(tracks); } } trace!("got tracks {:?}", all_tracks); { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.insert_tracks(position, &all_tracks); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); queue_update_tx.send(update).unwrap(); } debug!("queue lock released"); } PlaybackMessage::Clear { exclude_current, span, } => { let _e = span.enter(); debug!("clearing"); let should_stop = { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); let should_stop = queue.clear(exclude_current); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Queue(queue.clone().into()); queue_update_tx.send(update).unwrap(); should_stop }; debug!("queue lock released"); if should_stop { self.player.stop().in_current_span().await; } } PlaybackMessage::SetCurrent { position: queue_position, span, } => { let _e = span.enter(); debug!("setting current"); let track = { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.set_current_position(queue_position); queue.current_track() }; debug!("quue lock released and got current {:?}", track); self.play(track).in_current_span().await; } PlaybackMessage::ToggleShuffle { span } => { let _e = span.enter(); debug!("toggling shuffle"); let shuffle; let repeat; { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); repeat = queue.repeat; if queue.shuffle { queue.shuffle_off() } else { queue.shuffle_on() } shuffle = queue.shuffle; } debug!("queue lock released"); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Mods(QueueModifiers { shuffle, repeat }); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } } PlaybackMessage::ToggleRepeat { span } => { let _e = span.enter(); debug!("toggling repeat"); let shuffle; let repeat; { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); shuffle = queue.shuffle; if queue.repeat { queue.repeat = false } else { queue.repeat = true } repeat = queue.repeat; } debug!("queue lock released"); let queue_update_tx = self.update_tx.clone(); let update = StreamUpdate::Mods(QueueModifiers { shuffle, repeat }); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } } PlaybackMessage::TogglePlay { span } => { let _e = span.enter(); debug!("toggling play"); { let state = *self.state.lock().unwrap(); debug!("got state lock"); if state == PlayState::Playing { if let Err(err) = self.player.pause().await { error!("{:?}", err) } } else if let Err(err) = self.player.unpause().await { error!("{:?}", err) } } debug!("state lock released"); } PlaybackMessage::Stop { span } => { let _e = span.enter(); debug!("stopping"); if let Err(err) = self.player.stop().await { error!("{:?}", err) } } PlaybackMessage::ChangeVolume { delta, span } => { let _e = span.enter(); debug!("changing volume"); if let Ok(volume) = self.player.volume().await { debug!("got volume {:?}", volume); if let Err(err) = self.player.set_volume(volume + delta).await { error!("{:?}", err) }; } } PlaybackMessage::ToggleMute { span } => { let _e = span.enter(); debug!("toggling mute"); // let muted = self.player.is_muted(); // debug!("got muted {:?}", muted); // self.player.set_mute(!muted); } PlaybackMessage::Next { span } => { let _e = span.enter(); debug!("nexting"); let track = { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.next_track() }; debug!("released queue lock and got track {:?}", track); self.play_or_stop(track).in_current_span().await; } PlaybackMessage::Prev { span } => { let _e = span.enter(); debug!("preving"); let track = { let mut queue = self.queue.lock().unwrap(); debug!("got queue lock"); queue.prev_track() }; debug!("released queue lock and got track {:?}", track); self.play_or_stop(track).in_current_span().await; } PlaybackMessage::StateChanged { state, span } => { let _e = span.enter(); debug!("state changed"); let play_state = { *self.state.lock().unwrap() = state; state }; debug!("released state lock and got play state {:?}", play_state); let active_track_tx = self.update_tx.clone(); let update = StreamUpdate::PlayState(play_state as i32); if let Err(err) = active_track_tx.send(update) { error!("{:?}", err) }; } PlaybackMessage::RestartTrack { span } => { let _e = span.enter(); debug!("restarting track"); if let Err(err) = self.player.restart().await { error!("{:?}", err) } } PlaybackMessage::VolumeChanged { volume, span } => { let _e = span.enter(); trace!("volume changed"); let update_tx = self.update_tx.clone(); let update = StreamUpdate::Volume(volume); if let Err(err) = update_tx.send(update) { error!("{:?}", err) } } PlaybackMessage::MuteChanged { muted, span } => { let _e = span.enter(); trace!("mute changed"); let update_tx = self.update_tx.clone(); let update = StreamUpdate::Mute(muted); if let Err(err) = update_tx.send(update) { error!("{:?}", err) } } PlaybackMessage::PostitionChanged { duration, position, span, } => { let _e = span.enter(); trace!("position changed"); let update_tx = self.update_tx.clone(); let update = StreamUpdate::Position(TrackPosition { duration, position }); if let Err(err) = update_tx.send(update) { error!("{:?}", err) } } } } }); } #[instrument(skip(self))] async fn flatten_node(&self, uuid: &str) -> Vec { let tx = self.provider_tx.clone(); let (result_tx, result_rx) = flume::bounded(1); let span = debug_span!("prov-chan"); let Ok(_) = tx.send_async(ProviderMessage::FlattenNode { uuid: uuid.to_string(), result_tx, span, }).in_current_span().await else { return Vec::new(); }; let Ok(tracks) = result_rx .recv_async() .in_current_span() .await else { return Vec::new(); }; tracks } #[instrument(skip(self))] async fn get_track(&self, uuid: &str) -> Result { debug!("getting track"); let tx = self.provider_tx.clone(); let (result_tx, result_rx) = flume::bounded(1); let span = tracing::trace_span!("prov-chan"); tx.send_async(ProviderMessage::GetTrack { uuid: uuid.to_string(), result_tx, span, }) .in_current_span() .await .map_err(|_| ProviderError::InternalError)?; result_rx .recv_async() .in_current_span() .await .map_err(|_| ProviderError::InternalError)? } #[instrument(skip(self))] async fn get_urls_for_track(&self, uuid: &str) -> Result, ProviderError> { let tx = self.provider_tx.clone(); let (result_tx, result_rx) = flume::bounded(1); let span = tracing::trace_span!("prov-chan"); tx.send_async(ProviderMessage::GetTrackUrls { uuid: uuid.to_string(), result_tx, span, }) .in_current_span() .await .map_err(|_| ProviderError::InternalError)?; result_rx .recv_async() .in_current_span() .await .map_err(|_| ProviderError::InternalError)? } #[instrument(skip(self))] async fn play_or_stop(&self, track: Option) { if let Some(track) = track { let mut uuid = track.uuid.clone(); let urls = loop { match self.get_urls_for_track(&uuid).in_current_span().await { Ok(urls) => break urls, Err(err) => { warn!("no urls found for track {:?}: {}", track.uuid, err); uuid = { let mut queue = self.queue.lock().unwrap(); if let Some(track) = queue.next_track() { track.uuid.clone() } else { return; } } } } }; { let queue = self.queue.lock().unwrap(); let queue_update_tx = self.update_tx.clone(); let track = queue.current_track(); let update = StreamUpdate::QueueTrack(QueueTrack { queue_position: queue.current_position() as u32, track, }); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } } if let Err(err) = self.player.play(&urls[0]).await { error!("{:?}", err) }; } else if let Err(err) = self.player.stop().await { error!("{:?}", err) } } #[instrument(skip(self))] async fn play(&self, track: Option) { if let Some(track) = track { let mut uuid = track.uuid.clone(); let urls = loop { match self.get_urls_for_track(&uuid).in_current_span().await { Ok(urls) => break urls, Err(err) => { warn!("no urls found for track {:?}: {}", track.uuid, err); uuid = { let mut queue = self.queue.lock().unwrap(); if let Some(track) = queue.next_track() { track.uuid.clone() } else { return; } } } } }; { let queue = self.queue.lock().unwrap(); let queue_update_tx = self.update_tx.clone(); let track = queue.current_track(); let update = StreamUpdate::QueueTrack(QueueTrack { queue_position: queue.current_position() as u32, track, }); if let Err(err) = queue_update_tx.send(update) { error!("{:?}", err) } } if let Err(err) = self.player.play(&urls[0]).await { error!("{:?}", err) } } } } fn is_track(uuid: &str) -> bool { uuid.starts_with("track:") }