diff --git a/Cargo.lock b/Cargo.lock index d7031ac..d6855a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,6 +175,7 @@ name = "audio-player" version = "0.1.0" dependencies = [ "anyhow", + "flume", "rodio", "stream-download", "symphonia", diff --git a/audio-player/Cargo.toml b/audio-player/Cargo.toml index 107b265..77877bf 100644 --- a/audio-player/Cargo.toml +++ b/audio-player/Cargo.toml @@ -9,3 +9,4 @@ symphonia = { version = "0.5.3", features = ["all"] } stream-download = { git = "https://github.com/aschey/stream-download-rs.git" } anyhow = "1.0.71" url = "2.4.0" +flume = "0.10.14" diff --git a/audio-player/examples/basic.rs b/audio-player/examples/basic.rs new file mode 100644 index 0000000..bcdae67 --- /dev/null +++ b/audio-player/examples/basic.rs @@ -0,0 +1,26 @@ +use audio_player::{Player, PlayerMessage}; + +fn main() { + let player = Player::default(); + + player.play("https://www2.cs.uic.edu/~i101/SoundFiles/CantinaBand60.wav"); + + loop { + match player.messages.recv() { + Ok(PlayerMessage::Duration(duration)) => { + println!("DURATION: {:?}", duration); + } + Ok(PlayerMessage::Elapsed(el)) => { + println!("ELAPSED: {:?}", el); + if el.as_secs() >= 10 { + player.stop(); + } + } + Ok(PlayerMessage::Stopped) => { + println!("STOPPED"); + break; + } + _ => {} + } + } +} diff --git a/audio-player/src/decoder.rs b/audio-player/src/decoder.rs index 3249a46..6b0c4c3 100644 --- a/audio-player/src/decoder.rs +++ b/audio-player/src/decoder.rs @@ -1,3 +1,4 @@ +use flume::Sender; use std::error::Error; use std::fmt; use std::time::Duration; @@ -17,6 +18,8 @@ use symphonia::{ use rodio::Source; +use crate::PlayerEngineCommand; + // Decoder errors are not considered fatal. // The correct action is to just get a new packet and try again. // But a decode error in more than 3 consecutive packets is fatal. @@ -35,15 +38,20 @@ pub struct SymphoniaDecoder { buffer: SampleBuffer, spec: SignalSpec, time_base: Option, - duration: Option, + duration: u64, elapsed: u64, metadata: Option, track: Track, + tx: Sender, } impl SymphoniaDecoder { - pub fn new(mss: MediaSourceStream, hint: Hint) -> Result { - match SymphoniaDecoder::init(mss, hint) { + pub fn new( + mss: MediaSourceStream, + hint: Hint, + tx: Sender, + ) -> Result { + match SymphoniaDecoder::init(mss, hint, tx) { Err(e) => match e { SymphoniaError::IoError(e) => Err(DecoderError::IoError(e.to_string())), SymphoniaError::DecodeError(e) => Err(DecoderError::DecodeError(e)), @@ -66,6 +74,7 @@ impl SymphoniaDecoder { fn init( mss: MediaSourceStream, hint: Hint, + tx: Sender, ) -> symphonia::core::errors::Result> { let format_opts: FormatOptions = FormatOptions { enable_gapless: true, @@ -82,20 +91,12 @@ impl SymphoniaDecoder { let time_base = track.codec_params.time_base; - let dur = track + let duration = track .codec_params .n_frames .map(|frames| track.codec_params.start_ts + frames) .unwrap_or_default(); - let duration = match time_base { - Some(tb) => { - let time = tb.calc_time(dur); - Some(Duration::from_secs_f64(time.seconds as f64 + time.frac)) - } - None => None, - }; - let mut elapsed = 0; let mut decoder = symphonia::default::get_codecs() @@ -144,13 +145,14 @@ impl SymphoniaDecoder { elapsed, metadata, track, + tx, })) } #[inline] pub fn media_info(&self) -> MediaInfo { MediaInfo { - duration: self.duration, + duration: self.total_duration(), metadata: self.metadata.clone(), track: self.track.clone(), } @@ -217,7 +219,13 @@ impl Source for SymphoniaDecoder { #[inline] fn total_duration(&self) -> Option { - self.duration + match self.time_base { + Some(tb) => { + let time = tb.calc_time(self.duration); + Some(Duration::from_secs_f64(time.seconds as f64 + time.frac)) + } + None => None, + } } } @@ -247,6 +255,14 @@ impl Iterator for SymphoniaDecoder { }, } } + Err(SymphoniaError::IoError(err)) => { + if err.kind() == std::io::ErrorKind::UnexpectedEof + && err.to_string() == "end of stream" + { + self.tx.send(PlayerEngineCommand::Eos); + return None; + } + } Err(_) => return None, } }; diff --git a/audio-player/src/lib.rs b/audio-player/src/lib.rs new file mode 100644 index 0000000..70e94d8 --- /dev/null +++ b/audio-player/src/lib.rs @@ -0,0 +1,89 @@ +mod decoder; +mod player_engine; + +use std::thread; +use std::time::Duration; + +use anyhow::{anyhow, Result}; +use flume::{Receiver, Sender}; + +pub use player_engine::PlayerMessage; +use player_engine::{PlayerEngine, PlayerEngineCommand}; + +// TODO: +// * Emit buffering +// * Emit errors + +pub struct Player { + pub messages: Receiver, + tx_engine: Sender, +} + +impl Default for Player { + fn default() -> Self { + let (tx_engine, rx_engine) = flume::bounded(10); + let (tx_player, messages): (Sender, Receiver) = + flume::bounded(10); + + let tx_decoder = tx_engine.clone(); + + thread::spawn(move || { + let mut player = PlayerEngine::new(tx_decoder, tx_player); + loop { + match rx_engine.recv() { + Ok(PlayerEngineCommand::Play(source_str)) => { + player.play(&source_str); + } + Ok(PlayerEngineCommand::Pause) => { + player.pause(); + } + Ok(PlayerEngineCommand::Unpause) => { + player.unpause(); + } + Ok(PlayerEngineCommand::Stop) => { + player.stop(); + } + Ok(PlayerEngineCommand::TogglePlay) => { + player.toggle_play(); + } + Ok(PlayerEngineCommand::Eos) => { + player.stop(); + } + Err(e) => { + // FIXME: debug!(e); + } + } + } + }); + + Self { + messages, + tx_engine, + } + } +} + +impl Player { + // FIXME: this could check if the player started playing using a channel + // Then it would be async (wait for Playing for example) + pub fn play(&self, source_str: &str) { + self.tx_engine + .send(PlayerEngineCommand::Play(source_str.to_string())); + } + + pub fn pause(&self) { + self.tx_engine.send(PlayerEngineCommand::Pause); + } + + pub fn unpause(&self) { + self.tx_engine.send(PlayerEngineCommand::Unpause); + } + + pub fn toggle_play(&self) { + self.tx_engine.send(PlayerEngineCommand::TogglePlay); + } + + pub fn stop(&self) { + self.tx_engine.send(PlayerEngineCommand::Stop); + } +} diff --git a/audio-player/src/main.rs b/audio-player/src/player_engine.rs similarity index 65% rename from audio-player/src/main.rs rename to audio-player/src/player_engine.rs index 0dbd36a..42a95f6 100644 --- a/audio-player/src/main.rs +++ b/audio-player/src/player_engine.rs @@ -1,5 +1,4 @@ -mod decoder; - +use flume::{Receiver, Sender}; use std::fs::File; use std::io::BufReader; use std::path::Path; @@ -8,8 +7,8 @@ use std::time::Duration; use symphonia::core::probe::Hint; use url::Url; +use crate::decoder::{MediaInfo, SymphoniaDecoder}; use anyhow::{anyhow, Result}; -use decoder::SymphoniaDecoder; use rodio::source::{PeriodicAccess, SineWave}; use rodio::{OutputStream, OutputStreamHandle, Sink, Source}; use stream_download::StreamDownload; @@ -17,23 +16,42 @@ use symphonia::core::io::{ MediaSource, MediaSourceStream, MediaSourceStreamOptions, ReadOnlySource, }; -struct Player { - sink: Option, - stream: Option, +pub enum PlayerEngineCommand { + Play(String), + Pause, + Unpause, + TogglePlay, + Stop, + Eos, +} + +// FIXME: sort out media info size (probably send pointers to stuff on the heap) +pub enum PlayerMessage { + // MediaInfo(MediaInfo), + Duration(Duration), + Elapsed(Duration), + Stopped, + Paused, + Playing, } // TODO: -// * Emit Metadata -// * Emit duration -// * Emit track data -// * Emit EOS // * Emit buffering -impl Player { - pub fn default() -> Self { +pub struct PlayerEngine { + sink: Option, + stream: Option, + tx_engine: Sender, + tx_player: Sender, +} + +impl PlayerEngine { + pub fn new(tx_engine: Sender, tx_player: Sender) -> Self { Self { sink: None, stream: None, + tx_engine, + tx_player, } } @@ -43,16 +61,18 @@ impl Player { let (source, hint) = self.get_source(source_str)?; let mss = MediaSourceStream::new(source, MediaSourceStreamOptions::default()); - let decoder = SymphoniaDecoder::new(mss, hint)?; + let tx_player = self.tx_player.clone(); + + let decoder = SymphoniaDecoder::new(mss, hint, self.tx_engine.clone())?; let media_info = decoder.media_info(); + tx_player.send(PlayerMessage::Duration( + media_info.duration.unwrap_or_default(), + )); + // tx_player.send(PlayerEngineMessage::MediaInfo(media_info)); - let decoder = decoder.periodic_access(Duration::from_millis(500), |src| { - println!("ELAPSED: {:?}", src.elapsed()); - - if src.elapsed().as_secs() > 10 { - src.seek(Duration::from_secs(2)); - } + let decoder = decoder.periodic_access(Duration::from_millis(250), move |src| { + tx_player.send(PlayerMessage::Elapsed(src.elapsed())); }); sink.append(decoder); @@ -62,18 +82,33 @@ impl Player { // The sink is used to control the stream self.sink = Some(sink); + self.tx_player.send(PlayerMessage::Playing); + Ok(()) } pub fn pause(&mut self) { if let Some(sink) = &self.sink { sink.pause(); + self.tx_player.send(PlayerMessage::Paused); } } pub fn unpause(&mut self) { if let Some(sink) = &self.sink { sink.play(); + self.tx_player.send(PlayerMessage::Playing); + } + } + + pub fn toggle_play(&mut self) { + if self.is_stopped() { + return; + } + if self.is_paused() { + self.unpause(); + } else { + self.pause(); } } @@ -82,15 +117,15 @@ impl Player { sink.stop(); self.sink.take(); self.stream.take(); + self.tx_player.send(PlayerMessage::Stopped); } } - pub fn is_playing(&self) -> bool { - self.sink.as_ref().map(|s| !s.is_paused()).unwrap_or(false) - } - pub fn is_paused(&self) -> bool { - self.sink.as_ref().map(|s| s.is_paused()).unwrap_or(false) + self.sink + .as_ref() + .map(|s| s.is_paused()) + .unwrap_or_default() } pub fn is_stopped(&self) -> bool { @@ -130,12 +165,3 @@ impl Player { hint } } - -fn main() { - let mut player = Player::default(); - player.play("./Slip.m4a"); - - thread::sleep(Duration::from_millis(5000)); - - player.stop(); -}