diff --git a/Cargo.lock b/Cargo.lock index 31f1f2e..4b72490 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,6 +180,7 @@ dependencies = [ "stream-download", "symphonia", "thiserror", + "tracing", "url", ] diff --git a/audio-player/Cargo.toml b/audio-player/Cargo.toml index c0ea07f..b3336e2 100644 --- a/audio-player/Cargo.toml +++ b/audio-player/Cargo.toml @@ -11,3 +11,4 @@ anyhow = "1.0.71" url = "2.4.0" flume = "0.10.14" thiserror = "1.0.40" +tracing = "0.1.37" diff --git a/audio-player/src/decoder.rs b/audio-player/src/decoder.rs index 6b0c4c3..f388e14 100644 --- a/audio-player/src/decoder.rs +++ b/audio-player/src/decoder.rs @@ -11,10 +11,11 @@ use symphonia::{ io::MediaSourceStream, meta::{MetadataOptions, MetadataRevision}, probe::Hint, - units::{self, Time, TimeBase}, + units::{Time, TimeBase}, }, default::get_probe, }; +use tracing::warn; use rodio::Source; @@ -25,6 +26,7 @@ use crate::PlayerEngineCommand; // But a decode error in more than 3 consecutive packets is fatal. const MAX_DECODE_ERRORS: usize = 3; +#[derive(Clone)] pub struct MediaInfo { pub duration: Option, pub metadata: Option, @@ -97,7 +99,7 @@ impl SymphoniaDecoder { .map(|frames| track.codec_params.start_ts + frames) .unwrap_or_default(); - let mut elapsed = 0; + let mut _elapsed = 0; let mut decoder = symphonia::default::get_codecs() .make(&track.codec_params, &DecoderOptions { verify: true })?; @@ -105,7 +107,7 @@ impl SymphoniaDecoder { let mut decode_errors: usize = 0; let decoded = loop { let current_frame = probed.format.next_packet()?; - elapsed = current_frame.ts(); + _elapsed = current_frame.ts(); match decoder.decode(¤t_frame) { Ok(decoded) => break decoded, Err(e) => match e { @@ -142,7 +144,7 @@ impl SymphoniaDecoder { spec, time_base, duration, - elapsed, + elapsed: _elapsed, metadata, track, tx, @@ -259,7 +261,9 @@ impl Iterator for SymphoniaDecoder { if err.kind() == std::io::ErrorKind::UnexpectedEof && err.to_string() == "end of stream" { - self.tx.send(PlayerEngineCommand::Eos); + self.tx + .send(PlayerEngineCommand::Eos) + .unwrap_or_else(|e| warn!("Send error {}", e)); return None; } } diff --git a/audio-player/src/lib.rs b/audio-player/src/lib.rs index aca683d..0875016 100644 --- a/audio-player/src/lib.rs +++ b/audio-player/src/lib.rs @@ -4,16 +4,16 @@ mod player_engine; use std::thread; use std::time::Duration; -use anyhow::{anyhow, Result}; -use decoder::MediaInfo; +use anyhow::Result; +pub use decoder::MediaInfo; use flume::{Receiver, Sender}; pub use player_engine::PlayerMessage; use player_engine::{PlayerEngine, PlayerEngineCommand}; +use tracing::warn; // TODO: // * Emit buffering -// * Emit errors pub enum PlayerError {} @@ -35,26 +35,57 @@ impl Default for Player { loop { match rx_engine.recv() { Ok(PlayerEngineCommand::Play(source_str, tx)) => { - let res = player.play(&source_str); - tx.send(res); + tx.send(player.play(&source_str)) + .unwrap_or_else(|e| warn!("Send error {}", e)); } - Ok(PlayerEngineCommand::Pause) => { - player.pause(); + Ok(PlayerEngineCommand::Pause(tx)) => { + tx.send(player.pause()) + .unwrap_or_else(|e| warn!("Send error {}", e)); } - Ok(PlayerEngineCommand::Unpause) => { - player.unpause(); + Ok(PlayerEngineCommand::Unpause(tx)) => { + tx.send(player.unpause()) + .unwrap_or_else(|e| warn!("Send error {}", e)); } - Ok(PlayerEngineCommand::Stop) => { - player.stop(); + Ok(PlayerEngineCommand::Stop(tx)) => { + tx.send(player.stop()) + .unwrap_or_else(|e| warn!("Send error {}", e)); } - Ok(PlayerEngineCommand::TogglePlay) => { - player.toggle_play(); + Ok(PlayerEngineCommand::TogglePlay(tx)) => { + tx.send(player.toggle_play()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::Restart(tx)) => { + tx.send(player.restart()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetDuration(tx)) => { + tx.send(player.duration()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetElapsed(tx)) => { + tx.send(player.elapsed()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetVolume(tx)) => { + tx.send(player.volume()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetPaused(tx)) => { + tx.send(player.is_paused()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::SetVolume(volume, tx)) => { + tx.send(player.set_volume(volume)) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::SetElapsed(elapsed)) => { + player.handle_elapsed(elapsed); } Ok(PlayerEngineCommand::Eos) => { player.handle_eos(); } Err(e) => { - // FIXME: debug!(e); + warn!("Recv error {}", e); } } } @@ -68,61 +99,72 @@ impl Default for Player { } impl Player { - // FIXME: this could check if the player started playing using a channel - // Then it would be async (wait for Playing for example) pub async fn play(&self, source_str: &str) -> Result { let (tx, rx) = flume::bounded(1); self.tx_engine - .send(PlayerEngineCommand::Play(source_str.to_string(), tx)); - if let Ok(res) = rx.recv_async().await { - return res; - } - // FIXME: add error type - Err(anyhow!("Player channel error")) + .send(PlayerEngineCommand::Play(source_str.to_string(), tx))?; + rx.recv_async().await? } - pub async fn elpased(&self) -> Duration { - // FIXME: implement - Duration::default() + pub async fn restart(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Restart(tx))?; + rx.recv_async().await? } - pub async fn duration(&self) -> Duration { - // FIXME: implement - Duration::default() + pub async fn elpased(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetElapsed(tx))?; + rx.recv_async().await? } - pub async fn volume(&self) -> f32 { - // FIXME: implement - 0.0 + pub async fn duration(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetDuration(tx))?; + rx.recv_async().await? } - pub async fn set_volume(&self) -> Result<()> { - // FIXME: implement - Ok(()) + pub async fn volume(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetVolume(tx))?; + rx.recv_async().await? + } + + pub async fn is_paused(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetPaused(tx))?; + rx.recv_async().await? + } + + pub async fn set_volume(&self, volume: f32) -> Result { + let (tx, rx) = flume::bounded(1); + let vol = volume.clamp(0.0, 1.1); + self.tx_engine + .send(PlayerEngineCommand::SetVolume(vol, tx))?; + rx.recv_async().await? } pub async fn pause(&self) -> Result<()> { - self.tx_engine.send(PlayerEngineCommand::Pause); - Ok(()) + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Pause(tx))?; + rx.recv_async().await? } pub async fn unpause(&self) -> Result<()> { - self.tx_engine.send(PlayerEngineCommand::Unpause); - Ok(()) + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Unpause(tx))?; + rx.recv_async().await? } - pub async fn toggle_play(&self) -> Result<()> { - self.tx_engine.send(PlayerEngineCommand::TogglePlay); - Ok(()) + pub async fn toggle_play(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::TogglePlay(tx))?; + rx.recv_async().await? } pub async fn stop(&self) -> Result<()> { - self.tx_engine.send(PlayerEngineCommand::Stop); - Ok(()) - } - - pub async fn restart(&self) -> Result<()> { - // FIXME: implement - Ok(()) + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Stop(tx))?; + rx.recv_async().await? } } diff --git a/audio-player/src/player_engine.rs b/audio-player/src/player_engine.rs index 325fbe0..b68b0c6 100644 --- a/audio-player/src/player_engine.rs +++ b/audio-player/src/player_engine.rs @@ -1,35 +1,44 @@ -use flume::{Receiver, Sender}; +use flume::Sender; use std::fs::File; -use std::io::BufReader; use std::path::Path; -use std::thread; use std::time::Duration; use symphonia::core::probe::Hint; +use tracing::warn; use url::Url; use crate::decoder::{MediaInfo, SymphoniaDecoder}; use anyhow::{anyhow, Result}; -use rodio::source::{PeriodicAccess, SineWave}; -use rodio::{OutputStream, OutputStreamHandle, Sink, Source}; +use rodio::{OutputStream, Sink, Source}; use stream_download::StreamDownload; use symphonia::core::io::{ MediaSource, MediaSourceStream, MediaSourceStreamOptions, ReadOnlySource, }; +use thiserror::Error; pub enum PlayerEngineCommand { Play(String, Sender>), - Pause, - Unpause, - TogglePlay, - Stop, + SetVolume(f32, Sender>), + Pause(Sender>), + Unpause(Sender>), + TogglePlay(Sender>), + Restart(Sender>), + Stop(Sender>), + GetDuration(Sender>), + GetElapsed(Sender>), + GetVolume(Sender>), + GetPaused(Sender>), Eos, + SetElapsed(Duration), } -// FIXME: sort out media info size (probably send pointers to stuff on the heap) pub enum PlayerMessage { - // MediaInfo(MediaInfo), - Duration(Duration), - Elapsed(Duration), + Duration { + duration: Duration, + }, + Elapsed { + duration: Duration, + elapsed: Duration, + }, Stopped, Paused, Playing, @@ -40,15 +49,29 @@ pub enum PlayerMessage { // * Emit buffering pub struct PlayerEngine { + elapsed: Duration, + // FIXME: We only need this to re-start a track + // Might do that using seeking in the future + current_source: Option, + media_info: Option, sink: Option, stream: Option, tx_engine: Sender, tx_player: Sender, } +#[derive(Debug, Error)] +pub enum PlayerEngineError { + #[error("Sink is not playing")] + NotPlaying, +} + impl PlayerEngine { pub fn new(tx_engine: Sender, tx_player: Sender) -> Self { Self { + current_source: None, + media_info: None, + elapsed: Duration::default(), sink: None, stream: None, tx_engine, @@ -58,20 +81,33 @@ impl PlayerEngine { pub fn play(&mut self, source_str: &str) -> Result { let tx_player = self.tx_player.clone(); + let tx_engine = self.tx_engine.clone(); let (stream, handle) = OutputStream::try_default()?; - let mut sink = Sink::try_new(&handle)?; + let sink = Sink::try_new(&handle)?; let (source, hint) = self.get_source(source_str)?; let mss = MediaSourceStream::new(source, MediaSourceStreamOptions::default()); let decoder = SymphoniaDecoder::new(mss, hint, self.tx_engine.clone())?; + let media_info = decoder.media_info(); + let media_info_copy = media_info.clone(); + let duration = media_info.duration.unwrap_or_default(); - tx_player.send(PlayerMessage::Duration( - media_info.duration.unwrap_or_default(), - )); + self.media_info = Some(media_info); + tx_player + .send(PlayerMessage::Duration { duration }) + .unwrap_or_else(|e| warn!("Send error {}", e)); + + // FIXME: regularly update metadata revision let decoder = decoder.periodic_access(Duration::from_millis(250), move |src| { - tx_player.send(PlayerMessage::Elapsed(src.elapsed())); + let elapsed = src.elapsed(); + tx_engine + .send(PlayerEngineCommand::SetElapsed(elapsed)) + .unwrap_or_else(|e| warn!("Send error {}", e)); + tx_player + .send(PlayerMessage::Elapsed { elapsed, duration }) + .unwrap_or_else(|e| warn!("Send error {}", e)); }); sink.append(decoder); @@ -80,65 +116,131 @@ impl PlayerEngine { // The sink is used to control the stream self.sink = Some(sink); - self.tx_player.send(PlayerMessage::Playing); + self.tx_player + .send(PlayerMessage::Playing) + .unwrap_or_else(|e| warn!("Send error {}", e)); - Ok(media_info) + Ok(media_info_copy) } - pub fn pause(&mut self) { + pub fn restart(&mut self) -> Result { + if let Some(source) = self.current_source.clone() { + self.reset()?; + return self.play(&source); + } + Err(PlayerEngineError::NotPlaying.into()) + } + + pub fn pause(&mut self) -> Result<()> { if let Some(sink) = &self.sink { sink.pause(); - self.tx_player.send(PlayerMessage::Paused); + self.tx_player + .send(PlayerMessage::Paused) + .unwrap_or_else(|e| warn!("Send error {}", e)); + return Ok(()); } + Err(PlayerEngineError::NotPlaying.into()) } - pub fn unpause(&mut self) { + pub fn unpause(&mut self) -> Result<()> { if let Some(sink) = &self.sink { sink.play(); - self.tx_player.send(PlayerMessage::Playing); + self.tx_player + .send(PlayerMessage::Playing) + .unwrap_or_else(|e| warn!("Send error {}", e)); + return Ok(()); } + Err(PlayerEngineError::NotPlaying.into()) } - pub fn toggle_play(&mut self) { - if self.is_stopped() { - return; - } - if self.is_paused() { - self.unpause(); - } else { - self.pause(); - } - } - - pub fn stop(&mut self) { + pub fn toggle_play(&mut self) -> Result { if let Some(sink) = &self.sink { - sink.stop(); - self.sink.take(); - self.stream.take(); - self.tx_player.send(PlayerMessage::Stopped); + if sink.is_paused() { + sink.play(); + return Ok(true); + } else { + sink.pause(); + return Ok(false); + } } + Err(PlayerEngineError::NotPlaying.into()) } - pub fn handle_eos(&mut self) { - if let Some(sink) = &self.sink { - sink.stop(); - self.sink.take(); - self.stream.take(); - self.tx_player.send(PlayerMessage::EndOfStream); - } + pub fn stop(&mut self) -> Result<()> { + self.reset()?; + self.tx_player + .send(PlayerMessage::Stopped) + .unwrap_or_else(|e| warn!("Send error {}", e)); + Ok(()) } - pub fn is_paused(&self) -> bool { + pub fn is_paused(&self) -> Result { self.sink .as_ref() - .map(|s| s.is_paused()) - .unwrap_or_default() + .map_or(Err(PlayerEngineError::NotPlaying.into()), |s| { + Ok(s.is_paused()) + }) } pub fn is_stopped(&self) -> bool { self.sink.is_none() } + pub fn duration(&self) -> Result { + self.media_info + .as_ref() + .map_or(Err(PlayerEngineError::NotPlaying.into()), |m| { + Ok(m.duration.unwrap_or_default()) + }) + } + + pub fn elapsed(&self) -> Result { + if self.is_stopped() { + return Err(PlayerEngineError::NotPlaying.into()); + } + Ok(self.elapsed) + } + + pub fn volume(&self) -> Result { + self.sink.as_ref().map_or( + Err(PlayerEngineError::NotPlaying.into()), + |s| Ok(s.volume()), + ) + } + + pub fn set_volume(&mut self, volume: f32) -> Result { + if let Some(sink) = &self.sink { + sink.set_volume(volume); + return Ok(sink.volume()); + } + Err(PlayerEngineError::NotPlaying.into()) + } + + pub fn handle_eos(&mut self) { + self.reset().unwrap_or_else(|e| { + warn!("Sink error {}", e); + }); + self.tx_player + .send(PlayerMessage::EndOfStream) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + + pub fn handle_elapsed(&mut self, elapsed: Duration) { + self.elapsed = elapsed; + } + + fn reset(&mut self) -> Result<()> { + self.elapsed = Duration::default(); + self.current_source = None; + if let Some(sink) = &self.sink { + sink.stop(); + self.sink.take(); + self.stream.take(); + return Ok(()); + } + Err(PlayerEngineError::NotPlaying.into()) + } + fn get_source(&self, source_str: &str) -> Result<(Box, Hint)> { match Url::parse(source_str) { Ok(url) => {