diff --git a/audio-player/examples/basic.rs b/audio-player/examples/basic.rs index bc4b1b8..7ddb628 100644 --- a/audio-player/examples/basic.rs +++ b/audio-player/examples/basic.rs @@ -1,23 +1,26 @@ +use std::{thread, time::Duration}; + use audio_player::{Player, PlayerMessage}; #[tokio::main] async fn main() { let player = Player::default(); + let messages = player.messages.clone(); - player - .play("https://www2.cs.uic.edu/~i101/SoundFiles/CantinaBand60.wav") - .await; - - loop { - match player.messages.recv_async().await { + // Make sure we read all the messages in time + thread::spawn(move || loop { + match messages.recv() { + Ok(PlayerMessage::Playing) => { + println!("PLAYING NEW TRACK"); + } Ok(PlayerMessage::Duration { duration }) => { println!("DURATION: {:?}", duration); } - Ok(PlayerMessage::Elapsed { duration, elapsed }) => { + Ok(PlayerMessage::Elapsed { + duration: _, + elapsed, + }) => { println!("ELAPSED: {:?}", elapsed); - if elapsed.as_secs() >= 10 { - player.stop().await; - } } Ok(PlayerMessage::Stopped) => { println!("STOPPED"); @@ -25,5 +28,19 @@ async fn main() { } _ => {} } - } + }); + + player + .play("https://www2.cs.uic.edu/~i101/SoundFiles/CantinaBand60.wav") + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(5)).await; + + player + .play("https://www2.cs.uic.edu/~i101/SoundFiles/PinkPanther60.wav") + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(60)).await; } diff --git a/audio-player/src/player.rs b/audio-player/src/player.rs index 5456235..467ff2a 100644 --- a/audio-player/src/player.rs +++ b/audio-player/src/player.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::Result; use flume::{Receiver, Sender}; -use tracing::warn; +use tracing::{error, warn}; use crate::decoder::MediaInfo; use crate::player_engine::{PlayerEngine, PlayerEngineCommand, PlayerMessage}; @@ -27,7 +27,14 @@ impl Default for Player { let tx_decoder = tx_engine.clone(); thread::spawn(move || { - let mut player = PlayerEngine::new(tx_decoder, tx_player); + let mut player = match PlayerEngine::init(tx_decoder, tx_player) { + Err(e) => { + error!("Could not initialize player: {}", e); + return; + } + Ok(engine) => engine, + }; + loop { match rx_engine.recv() { Ok(PlayerEngineCommand::Play(source_str, tx)) => { @@ -123,7 +130,7 @@ impl Player { pub async fn volume(&self) -> Result { let (tx, rx) = flume::bounded(1); self.tx_engine.send(PlayerEngineCommand::GetVolume(tx))?; - rx.recv_async().await? + Ok(rx.recv_async().await?) } pub async fn is_paused(&self) -> Result { @@ -136,7 +143,7 @@ impl Player { let (tx, rx) = flume::bounded(1); self.tx_engine .send(PlayerEngineCommand::SetVolume(volume, tx))?; - rx.recv_async().await? + Ok(rx.recv_async().await?) } pub async fn pause(&self) -> Result<()> { diff --git a/audio-player/src/player_engine.rs b/audio-player/src/player_engine.rs index 7cc95f0..fc98193 100644 --- a/audio-player/src/player_engine.rs +++ b/audio-player/src/player_engine.rs @@ -17,7 +17,7 @@ use thiserror::Error; pub enum PlayerEngineCommand { Play(String, Sender>), - SetVolume(f32, Sender>), + SetVolume(f32, Sender), Pause(Sender>), Unpause(Sender>), TogglePlay(Sender>), @@ -25,7 +25,7 @@ pub enum PlayerEngineCommand { Stop(Sender>), GetDuration(Sender>), GetElapsed(Sender>), - GetVolume(Sender>), + GetVolume(Sender), GetPaused(Sender>), Eos, SetElapsed(Duration), @@ -54,8 +54,9 @@ pub struct PlayerEngine { // Might do that using seeking in the future current_source: Option, media_info: Option, - sink: Option, - stream: Option, + sink: Sink, + // We need to keep the stream around as it will stop playing when it's dropped + _stream: OutputStream, tx_engine: Sender, tx_player: Sender, } @@ -67,24 +68,31 @@ pub enum PlayerEngineError { } impl PlayerEngine { - pub fn new(tx_engine: Sender, tx_player: Sender) -> Self { - Self { + pub fn init( + tx_engine: Sender, + tx_player: Sender, + ) -> Result { + let (_stream, handle) = OutputStream::try_default()?; + let sink = Sink::try_new(&handle)?; + Ok(Self { current_source: None, media_info: None, elapsed: Duration::default(), - sink: None, - stream: None, + sink, + _stream, tx_engine, tx_player, - } + }) } pub fn play(&mut self, source_str: &str) -> Result { let tx_player = self.tx_player.clone(); let tx_engine = self.tx_engine.clone(); - let (stream, handle) = OutputStream::try_default()?; - let sink = Sink::try_new(&handle)?; + if !self.sink.empty() { + self.reset(); + } + let (source, hint) = self.get_source(source_str)?; let mss = MediaSourceStream::new(source, MediaSourceStreamOptions::default()); let decoder = SymphoniaDecoder::new(mss, hint, self.tx_engine.clone())?; @@ -110,12 +118,9 @@ impl PlayerEngine { .send(PlayerMessage::Elapsed { elapsed, duration }) .unwrap_or_else(|e| warn!("Send error {}", e)); }); - sink.append(decoder); - // We need to keep the stream around, otherwise it gets dropped outside of this scope - self.stream = Some(stream); - // The sink is used to control the stream - self.sink = Some(sink); + self.sink.append(decoder); + self.sink.play(); self.tx_player .send(PlayerMessage::Playing) @@ -126,49 +131,51 @@ impl PlayerEngine { pub fn restart(&mut self) -> Result { if let Some(source) = self.current_source.clone() { - self.reset()?; return self.play(&source); } Err(PlayerEngineError::NotPlaying.into()) } pub fn pause(&mut self) -> Result<()> { - if let Some(sink) = &self.sink { - sink.pause(); - self.tx_player - .send(PlayerMessage::Paused) - .unwrap_or_else(|e| warn!("Send error {}", e)); - return Ok(()); + if self.is_stopped() { + return Err(PlayerEngineError::NotPlaying.into()); } - Err(PlayerEngineError::NotPlaying.into()) + self.sink.pause(); + self.tx_player + .send(PlayerMessage::Paused) + .unwrap_or_else(|e| warn!("Send error {}", e)); + Ok(()) } pub fn unpause(&mut self) -> Result<()> { - if let Some(sink) = &self.sink { - sink.play(); - self.tx_player - .send(PlayerMessage::Playing) - .unwrap_or_else(|e| warn!("Send error {}", e)); - return Ok(()); + if self.is_stopped() { + return Err(PlayerEngineError::NotPlaying.into()); } - Err(PlayerEngineError::NotPlaying.into()) + self.sink.play(); + self.tx_player + .send(PlayerMessage::Playing) + .unwrap_or_else(|e| warn!("Send error {}", e)); + Ok(()) } pub fn toggle_play(&mut self) -> Result { - if let Some(sink) = &self.sink { - if sink.is_paused() { - sink.play(); - return Ok(true); - } else { - sink.pause(); - return Ok(false); - } + if self.is_stopped() { + return Err(PlayerEngineError::NotPlaying.into()); + } + if self.sink.is_paused() { + self.sink.play(); + Ok(true) + } else { + self.sink.pause(); + Ok(false) } - Err(PlayerEngineError::NotPlaying.into()) } pub fn stop(&mut self) -> Result<()> { - self.reset()?; + if self.is_stopped() { + return Err(PlayerEngineError::NotPlaying.into()); + } + self.reset(); self.tx_player .send(PlayerMessage::Stopped) .unwrap_or_else(|e| warn!("Send error {}", e)); @@ -176,15 +183,14 @@ impl PlayerEngine { } pub fn is_paused(&self) -> Result { - self.sink - .as_ref() - .map_or(Err(PlayerEngineError::NotPlaying.into()), |s| { - Ok(s.is_paused()) - }) + if self.is_stopped() { + return Err(PlayerEngineError::NotPlaying.into()); + } + Ok(self.sink.is_paused()) } pub fn is_stopped(&self) -> bool { - self.sink.is_none() + self.sink.len() == 0 } pub fn duration(&self) -> Result { @@ -202,25 +208,17 @@ impl PlayerEngine { Ok(self.elapsed) } - pub fn volume(&self) -> Result { - self.sink.as_ref().map_or( - Err(PlayerEngineError::NotPlaying.into()), - |s| Ok(s.volume()), - ) + pub fn volume(&self) -> f32 { + self.sink.volume() } - pub fn set_volume(&mut self, volume: f32) -> Result { - if let Some(sink) = &self.sink { - sink.set_volume(volume.clamp(0.0, 1.1)); - return Ok(sink.volume()); - } - Err(PlayerEngineError::NotPlaying.into()) + pub fn set_volume(&mut self, volume: f32) -> f32 { + self.sink.set_volume(volume.clamp(0.0, 1.1)); + self.sink.volume() } pub fn handle_eos(&mut self) { - self.reset().unwrap_or_else(|e| { - warn!("Sink error {}", e); - }); + self.reset(); self.tx_player .send(PlayerMessage::EndOfStream) .unwrap_or_else(|e| warn!("Send error {}", e)); @@ -230,16 +228,11 @@ impl PlayerEngine { self.elapsed = elapsed; } - fn reset(&mut self) -> Result<()> { + fn reset(&mut self) { self.elapsed = Duration::default(); self.current_source = None; - if let Some(sink) = &self.sink { - sink.stop(); - self.sink.take(); - self.stream.take(); - return Ok(()); - } - Err(PlayerEngineError::NotPlaying.into()) + self.sink.pause(); + self.sink.clear(); } fn get_source(&self, source_str: &str) -> Result<(Box, Hint)> {