diff --git a/Cargo.lock b/Cargo.lock index 4b72490..bf65bac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,6 +180,7 @@ dependencies = [ "stream-download", "symphonia", "thiserror", + "tokio", "tracing", "url", ] diff --git a/audio-player/Cargo.toml b/audio-player/Cargo.toml index b3336e2..3944354 100644 --- a/audio-player/Cargo.toml +++ b/audio-player/Cargo.toml @@ -4,7 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] -rodio = { version = "0.17.1", default-features = false, features = ["symphonia-all"] } +rodio = { version = "0.17.1", default-features = false, features = [ + "symphonia-all", +] } symphonia = { version = "0.5.3", features = ["all"] } stream-download = { git = "https://github.com/aschey/stream-download-rs.git" } anyhow = "1.0.71" @@ -12,3 +14,6 @@ url = "2.4.0" flume = "0.10.14" thiserror = "1.0.40" tracing = "0.1.37" + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } diff --git a/audio-player/examples/basic.rs b/audio-player/examples/basic.rs index bcdae67..bc4b1b8 100644 --- a/audio-player/examples/basic.rs +++ b/audio-player/examples/basic.rs @@ -1,19 +1,22 @@ use audio_player::{Player, PlayerMessage}; -fn main() { +#[tokio::main] +async fn main() { let player = Player::default(); - player.play("https://www2.cs.uic.edu/~i101/SoundFiles/CantinaBand60.wav"); + player + .play("https://www2.cs.uic.edu/~i101/SoundFiles/CantinaBand60.wav") + .await; loop { - match player.messages.recv() { - Ok(PlayerMessage::Duration(duration)) => { + match player.messages.recv_async().await { + Ok(PlayerMessage::Duration { duration }) => { println!("DURATION: {:?}", duration); } - Ok(PlayerMessage::Elapsed(el)) => { - println!("ELAPSED: {:?}", el); - if el.as_secs() >= 10 { - player.stop(); + Ok(PlayerMessage::Elapsed { duration, elapsed }) => { + println!("ELAPSED: {:?}", elapsed); + if elapsed.as_secs() >= 10 { + player.stop().await; } } Ok(PlayerMessage::Stopped) => { diff --git a/audio-player/src/decoder.rs b/audio-player/src/decoder.rs index f388e14..8a5dcc6 100644 --- a/audio-player/src/decoder.rs +++ b/audio-player/src/decoder.rs @@ -1,7 +1,9 @@ -use flume::Sender; use std::error::Error; use std::fmt; use std::time::Duration; + +use flume::Sender; +use rodio::Source; use symphonia::{ core::{ audio::{AudioBufferRef, SampleBuffer, SignalSpec}, @@ -17,9 +19,7 @@ use symphonia::{ }; use tracing::warn; -use rodio::Source; - -use crate::PlayerEngineCommand; +use crate::player_engine::PlayerEngineCommand; // Decoder errors are not considered fatal. // The correct action is to just get a new packet and try again. diff --git a/audio-player/src/lib.rs b/audio-player/src/lib.rs index 0875016..c054884 100644 --- a/audio-player/src/lib.rs +++ b/audio-player/src/lib.rs @@ -1,170 +1,10 @@ mod decoder; +mod player; mod player_engine; -use std::thread; -use std::time::Duration; - -use anyhow::Result; pub use decoder::MediaInfo; -use flume::{Receiver, Sender}; - +pub use player::{Player, PlayerError}; pub use player_engine::PlayerMessage; -use player_engine::{PlayerEngine, PlayerEngineCommand}; -use tracing::warn; -// TODO: -// * Emit buffering -pub enum PlayerError {} -pub struct Player { - pub messages: Receiver, - tx_engine: Sender, -} - -impl Default for Player { - fn default() -> Self { - let (tx_engine, rx_engine) = flume::bounded(10); - let (tx_player, messages): (Sender, Receiver) = - flume::bounded(10); - - let tx_decoder = tx_engine.clone(); - - thread::spawn(move || { - let mut player = PlayerEngine::new(tx_decoder, tx_player); - loop { - match rx_engine.recv() { - Ok(PlayerEngineCommand::Play(source_str, tx)) => { - tx.send(player.play(&source_str)) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::Pause(tx)) => { - tx.send(player.pause()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::Unpause(tx)) => { - tx.send(player.unpause()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::Stop(tx)) => { - tx.send(player.stop()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::TogglePlay(tx)) => { - tx.send(player.toggle_play()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::Restart(tx)) => { - tx.send(player.restart()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::GetDuration(tx)) => { - tx.send(player.duration()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::GetElapsed(tx)) => { - tx.send(player.elapsed()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::GetVolume(tx)) => { - tx.send(player.volume()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::GetPaused(tx)) => { - tx.send(player.is_paused()) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::SetVolume(volume, tx)) => { - tx.send(player.set_volume(volume)) - .unwrap_or_else(|e| warn!("Send error {}", e)); - } - Ok(PlayerEngineCommand::SetElapsed(elapsed)) => { - player.handle_elapsed(elapsed); - } - Ok(PlayerEngineCommand::Eos) => { - player.handle_eos(); - } - Err(e) => { - warn!("Recv error {}", e); - } - } - } - }); - - Self { - messages, - tx_engine, - } - } -} - -impl Player { - pub async fn play(&self, source_str: &str) -> Result { - let (tx, rx) = flume::bounded(1); - self.tx_engine - .send(PlayerEngineCommand::Play(source_str.to_string(), tx))?; - rx.recv_async().await? - } - - pub async fn restart(&self) -> Result { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::Restart(tx))?; - rx.recv_async().await? - } - - pub async fn elpased(&self) -> Result { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::GetElapsed(tx))?; - rx.recv_async().await? - } - - pub async fn duration(&self) -> Result { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::GetDuration(tx))?; - rx.recv_async().await? - } - - pub async fn volume(&self) -> Result { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::GetVolume(tx))?; - rx.recv_async().await? - } - - pub async fn is_paused(&self) -> Result { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::GetPaused(tx))?; - rx.recv_async().await? - } - - pub async fn set_volume(&self, volume: f32) -> Result { - let (tx, rx) = flume::bounded(1); - let vol = volume.clamp(0.0, 1.1); - self.tx_engine - .send(PlayerEngineCommand::SetVolume(vol, tx))?; - rx.recv_async().await? - } - - pub async fn pause(&self) -> Result<()> { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::Pause(tx))?; - rx.recv_async().await? - } - - pub async fn unpause(&self) -> Result<()> { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::Unpause(tx))?; - rx.recv_async().await? - } - - pub async fn toggle_play(&self) -> Result { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::TogglePlay(tx))?; - rx.recv_async().await? - } - - pub async fn stop(&self) -> Result<()> { - let (tx, rx) = flume::bounded(1); - self.tx_engine.send(PlayerEngineCommand::Stop(tx))?; - rx.recv_async().await? - } -} diff --git a/audio-player/src/player.rs b/audio-player/src/player.rs new file mode 100644 index 0000000..562f496 --- /dev/null +++ b/audio-player/src/player.rs @@ -0,0 +1,166 @@ +use std::thread; +use std::time::Duration; + +use anyhow::Result; +use flume::{Receiver, Sender}; +use tracing::warn; + +use crate::decoder::MediaInfo; +use crate::player_engine::{PlayerEngine, PlayerEngineCommand, PlayerMessage}; + +// TODO: +// * Emit buffering + +pub enum PlayerError {} + +pub struct Player { + pub messages: Receiver, + tx_engine: Sender, +} + +impl Default for Player { + fn default() -> Self { + let (tx_engine, rx_engine) = flume::bounded(10); + let (tx_player, messages): (Sender, Receiver) = + flume::bounded(10); + + let tx_decoder = tx_engine.clone(); + + thread::spawn(move || { + let mut player = PlayerEngine::new(tx_decoder, tx_player); + loop { + match rx_engine.recv() { + Ok(PlayerEngineCommand::Play(source_str, tx)) => { + tx.send(player.play(&source_str)) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::Pause(tx)) => { + tx.send(player.pause()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::Unpause(tx)) => { + tx.send(player.unpause()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::Stop(tx)) => { + tx.send(player.stop()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::TogglePlay(tx)) => { + tx.send(player.toggle_play()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::Restart(tx)) => { + tx.send(player.restart()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetDuration(tx)) => { + tx.send(player.duration()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetElapsed(tx)) => { + tx.send(player.elapsed()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetVolume(tx)) => { + tx.send(player.volume()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::GetPaused(tx)) => { + tx.send(player.is_paused()) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::SetVolume(volume, tx)) => { + tx.send(player.set_volume(volume)) + .unwrap_or_else(|e| warn!("Send error {}", e)); + } + Ok(PlayerEngineCommand::SetElapsed(elapsed)) => { + player.handle_elapsed(elapsed); + } + Ok(PlayerEngineCommand::Eos) => { + player.handle_eos(); + } + Err(e) => { + warn!("Recv error {}", e); + } + } + } + }); + + Self { + messages, + tx_engine, + } + } +} + +impl Player { + pub async fn play(&self, source_str: &str) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine + .send(PlayerEngineCommand::Play(source_str.to_string(), tx))?; + rx.recv_async().await? + } + + pub async fn restart(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Restart(tx))?; + rx.recv_async().await? + } + + pub async fn elpased(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetElapsed(tx))?; + rx.recv_async().await? + } + + pub async fn duration(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetDuration(tx))?; + rx.recv_async().await? + } + + pub async fn volume(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetVolume(tx))?; + rx.recv_async().await? + } + + pub async fn is_paused(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::GetPaused(tx))?; + rx.recv_async().await? + } + + pub async fn set_volume(&self, volume: f32) -> Result { + let (tx, rx) = flume::bounded(1); + let vol = volume.clamp(0.0, 1.1); + self.tx_engine + .send(PlayerEngineCommand::SetVolume(vol, tx))?; + rx.recv_async().await? + } + + pub async fn pause(&self) -> Result<()> { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Pause(tx))?; + rx.recv_async().await? + } + + pub async fn unpause(&self) -> Result<()> { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Unpause(tx))?; + rx.recv_async().await? + } + + pub async fn toggle_play(&self) -> Result { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::TogglePlay(tx))?; + rx.recv_async().await? + } + + pub async fn stop(&self) -> Result<()> { + let (tx, rx) = flume::bounded(1); + self.tx_engine.send(PlayerEngineCommand::Stop(tx))?; + rx.recv_async().await? + } +}