Re-use sink when playing new track

This commit is contained in:
chmanie 2023-06-08 18:13:02 +02:00
parent a16da8a400
commit 9935161b17
3 changed files with 101 additions and 84 deletions

View File

@ -1,23 +1,26 @@
use std::{thread, time::Duration};
use audio_player::{Player, PlayerMessage}; use audio_player::{Player, PlayerMessage};
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let player = Player::default(); let player = Player::default();
let messages = player.messages.clone();
player // Make sure we read all the messages in time
.play("https://www2.cs.uic.edu/~i101/SoundFiles/CantinaBand60.wav") thread::spawn(move || loop {
.await; match messages.recv() {
Ok(PlayerMessage::Playing) => {
loop { println!("PLAYING NEW TRACK");
match player.messages.recv_async().await { }
Ok(PlayerMessage::Duration { duration }) => { Ok(PlayerMessage::Duration { duration }) => {
println!("DURATION: {:?}", duration); println!("DURATION: {:?}", duration);
} }
Ok(PlayerMessage::Elapsed { duration, elapsed }) => { Ok(PlayerMessage::Elapsed {
duration: _,
elapsed,
}) => {
println!("ELAPSED: {:?}", elapsed); println!("ELAPSED: {:?}", elapsed);
if elapsed.as_secs() >= 10 {
player.stop().await;
}
} }
Ok(PlayerMessage::Stopped) => { Ok(PlayerMessage::Stopped) => {
println!("STOPPED"); println!("STOPPED");
@ -25,5 +28,19 @@ async fn main() {
} }
_ => {} _ => {}
} }
} });
player
.play("https://www2.cs.uic.edu/~i101/SoundFiles/CantinaBand60.wav")
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(5)).await;
player
.play("https://www2.cs.uic.edu/~i101/SoundFiles/PinkPanther60.wav")
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(60)).await;
} }

View File

@ -3,7 +3,7 @@ use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use flume::{Receiver, Sender}; use flume::{Receiver, Sender};
use tracing::warn; use tracing::{error, warn};
use crate::decoder::MediaInfo; use crate::decoder::MediaInfo;
use crate::player_engine::{PlayerEngine, PlayerEngineCommand, PlayerMessage}; use crate::player_engine::{PlayerEngine, PlayerEngineCommand, PlayerMessage};
@ -27,7 +27,14 @@ impl Default for Player {
let tx_decoder = tx_engine.clone(); let tx_decoder = tx_engine.clone();
thread::spawn(move || { thread::spawn(move || {
let mut player = PlayerEngine::new(tx_decoder, tx_player); let mut player = match PlayerEngine::init(tx_decoder, tx_player) {
Err(e) => {
error!("Could not initialize player: {}", e);
return;
}
Ok(engine) => engine,
};
loop { loop {
match rx_engine.recv() { match rx_engine.recv() {
Ok(PlayerEngineCommand::Play(source_str, tx)) => { Ok(PlayerEngineCommand::Play(source_str, tx)) => {
@ -123,7 +130,7 @@ impl Player {
pub async fn volume(&self) -> Result<f32> { pub async fn volume(&self) -> Result<f32> {
let (tx, rx) = flume::bounded(1); let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::GetVolume(tx))?; self.tx_engine.send(PlayerEngineCommand::GetVolume(tx))?;
rx.recv_async().await? Ok(rx.recv_async().await?)
} }
pub async fn is_paused(&self) -> Result<bool> { pub async fn is_paused(&self) -> Result<bool> {
@ -136,7 +143,7 @@ impl Player {
let (tx, rx) = flume::bounded(1); let (tx, rx) = flume::bounded(1);
self.tx_engine self.tx_engine
.send(PlayerEngineCommand::SetVolume(volume, tx))?; .send(PlayerEngineCommand::SetVolume(volume, tx))?;
rx.recv_async().await? Ok(rx.recv_async().await?)
} }
pub async fn pause(&self) -> Result<()> { pub async fn pause(&self) -> Result<()> {

View File

@ -17,7 +17,7 @@ use thiserror::Error;
pub enum PlayerEngineCommand { pub enum PlayerEngineCommand {
Play(String, Sender<Result<MediaInfo>>), Play(String, Sender<Result<MediaInfo>>),
SetVolume(f32, Sender<Result<f32>>), SetVolume(f32, Sender<f32>),
Pause(Sender<Result<()>>), Pause(Sender<Result<()>>),
Unpause(Sender<Result<()>>), Unpause(Sender<Result<()>>),
TogglePlay(Sender<Result<bool>>), TogglePlay(Sender<Result<bool>>),
@ -25,7 +25,7 @@ pub enum PlayerEngineCommand {
Stop(Sender<Result<()>>), Stop(Sender<Result<()>>),
GetDuration(Sender<Result<Duration>>), GetDuration(Sender<Result<Duration>>),
GetElapsed(Sender<Result<Duration>>), GetElapsed(Sender<Result<Duration>>),
GetVolume(Sender<Result<f32>>), GetVolume(Sender<f32>),
GetPaused(Sender<Result<bool>>), GetPaused(Sender<Result<bool>>),
Eos, Eos,
SetElapsed(Duration), SetElapsed(Duration),
@ -54,8 +54,9 @@ pub struct PlayerEngine {
// Might do that using seeking in the future // Might do that using seeking in the future
current_source: Option<String>, current_source: Option<String>,
media_info: Option<MediaInfo>, media_info: Option<MediaInfo>,
sink: Option<Sink>, sink: Sink,
stream: Option<OutputStream>, // We need to keep the stream around as it will stop playing when it's dropped
_stream: OutputStream,
tx_engine: Sender<PlayerEngineCommand>, tx_engine: Sender<PlayerEngineCommand>,
tx_player: Sender<PlayerMessage>, tx_player: Sender<PlayerMessage>,
} }
@ -67,24 +68,31 @@ pub enum PlayerEngineError {
} }
impl PlayerEngine { impl PlayerEngine {
pub fn new(tx_engine: Sender<PlayerEngineCommand>, tx_player: Sender<PlayerMessage>) -> Self { pub fn init(
Self { tx_engine: Sender<PlayerEngineCommand>,
tx_player: Sender<PlayerMessage>,
) -> Result<Self> {
let (_stream, handle) = OutputStream::try_default()?;
let sink = Sink::try_new(&handle)?;
Ok(Self {
current_source: None, current_source: None,
media_info: None, media_info: None,
elapsed: Duration::default(), elapsed: Duration::default(),
sink: None, sink,
stream: None, _stream,
tx_engine, tx_engine,
tx_player, tx_player,
} })
} }
pub fn play(&mut self, source_str: &str) -> Result<MediaInfo> { pub fn play(&mut self, source_str: &str) -> Result<MediaInfo> {
let tx_player = self.tx_player.clone(); let tx_player = self.tx_player.clone();
let tx_engine = self.tx_engine.clone(); let tx_engine = self.tx_engine.clone();
let (stream, handle) = OutputStream::try_default()?; if !self.sink.empty() {
let sink = Sink::try_new(&handle)?; self.reset();
}
let (source, hint) = self.get_source(source_str)?; let (source, hint) = self.get_source(source_str)?;
let mss = MediaSourceStream::new(source, MediaSourceStreamOptions::default()); let mss = MediaSourceStream::new(source, MediaSourceStreamOptions::default());
let decoder = SymphoniaDecoder::new(mss, hint, self.tx_engine.clone())?; let decoder = SymphoniaDecoder::new(mss, hint, self.tx_engine.clone())?;
@ -110,12 +118,9 @@ impl PlayerEngine {
.send(PlayerMessage::Elapsed { elapsed, duration }) .send(PlayerMessage::Elapsed { elapsed, duration })
.unwrap_or_else(|e| warn!("Send error {}", e)); .unwrap_or_else(|e| warn!("Send error {}", e));
}); });
sink.append(decoder);
// We need to keep the stream around, otherwise it gets dropped outside of this scope self.sink.append(decoder);
self.stream = Some(stream); self.sink.play();
// The sink is used to control the stream
self.sink = Some(sink);
self.tx_player self.tx_player
.send(PlayerMessage::Playing) .send(PlayerMessage::Playing)
@ -126,49 +131,51 @@ impl PlayerEngine {
pub fn restart(&mut self) -> Result<MediaInfo> { pub fn restart(&mut self) -> Result<MediaInfo> {
if let Some(source) = self.current_source.clone() { if let Some(source) = self.current_source.clone() {
self.reset()?;
return self.play(&source); return self.play(&source);
} }
Err(PlayerEngineError::NotPlaying.into()) Err(PlayerEngineError::NotPlaying.into())
} }
pub fn pause(&mut self) -> Result<()> { pub fn pause(&mut self) -> Result<()> {
if let Some(sink) = &self.sink { if self.is_stopped() {
sink.pause(); return Err(PlayerEngineError::NotPlaying.into());
}
self.sink.pause();
self.tx_player self.tx_player
.send(PlayerMessage::Paused) .send(PlayerMessage::Paused)
.unwrap_or_else(|e| warn!("Send error {}", e)); .unwrap_or_else(|e| warn!("Send error {}", e));
return Ok(()); Ok(())
}
Err(PlayerEngineError::NotPlaying.into())
} }
pub fn unpause(&mut self) -> Result<()> { pub fn unpause(&mut self) -> Result<()> {
if let Some(sink) = &self.sink { if self.is_stopped() {
sink.play(); return Err(PlayerEngineError::NotPlaying.into());
}
self.sink.play();
self.tx_player self.tx_player
.send(PlayerMessage::Playing) .send(PlayerMessage::Playing)
.unwrap_or_else(|e| warn!("Send error {}", e)); .unwrap_or_else(|e| warn!("Send error {}", e));
return Ok(()); Ok(())
}
Err(PlayerEngineError::NotPlaying.into())
} }
pub fn toggle_play(&mut self) -> Result<bool> { pub fn toggle_play(&mut self) -> Result<bool> {
if let Some(sink) = &self.sink { if self.is_stopped() {
if sink.is_paused() { return Err(PlayerEngineError::NotPlaying.into());
sink.play(); }
return Ok(true); if self.sink.is_paused() {
self.sink.play();
Ok(true)
} else { } else {
sink.pause(); self.sink.pause();
return Ok(false); Ok(false)
} }
} }
Err(PlayerEngineError::NotPlaying.into())
}
pub fn stop(&mut self) -> Result<()> { pub fn stop(&mut self) -> Result<()> {
self.reset()?; if self.is_stopped() {
return Err(PlayerEngineError::NotPlaying.into());
}
self.reset();
self.tx_player self.tx_player
.send(PlayerMessage::Stopped) .send(PlayerMessage::Stopped)
.unwrap_or_else(|e| warn!("Send error {}", e)); .unwrap_or_else(|e| warn!("Send error {}", e));
@ -176,15 +183,14 @@ impl PlayerEngine {
} }
pub fn is_paused(&self) -> Result<bool> { pub fn is_paused(&self) -> Result<bool> {
self.sink if self.is_stopped() {
.as_ref() return Err(PlayerEngineError::NotPlaying.into());
.map_or(Err(PlayerEngineError::NotPlaying.into()), |s| { }
Ok(s.is_paused()) Ok(self.sink.is_paused())
})
} }
pub fn is_stopped(&self) -> bool { pub fn is_stopped(&self) -> bool {
self.sink.is_none() self.sink.len() == 0
} }
pub fn duration(&self) -> Result<Duration> { pub fn duration(&self) -> Result<Duration> {
@ -202,25 +208,17 @@ impl PlayerEngine {
Ok(self.elapsed) Ok(self.elapsed)
} }
pub fn volume(&self) -> Result<f32> { pub fn volume(&self) -> f32 {
self.sink.as_ref().map_or( self.sink.volume()
Err(PlayerEngineError::NotPlaying.into()),
|s| Ok(s.volume()),
)
} }
pub fn set_volume(&mut self, volume: f32) -> Result<f32> { pub fn set_volume(&mut self, volume: f32) -> f32 {
if let Some(sink) = &self.sink { self.sink.set_volume(volume.clamp(0.0, 1.1));
sink.set_volume(volume.clamp(0.0, 1.1)); self.sink.volume()
return Ok(sink.volume());
}
Err(PlayerEngineError::NotPlaying.into())
} }
pub fn handle_eos(&mut self) { pub fn handle_eos(&mut self) {
self.reset().unwrap_or_else(|e| { self.reset();
warn!("Sink error {}", e);
});
self.tx_player self.tx_player
.send(PlayerMessage::EndOfStream) .send(PlayerMessage::EndOfStream)
.unwrap_or_else(|e| warn!("Send error {}", e)); .unwrap_or_else(|e| warn!("Send error {}", e));
@ -230,16 +228,11 @@ impl PlayerEngine {
self.elapsed = elapsed; self.elapsed = elapsed;
} }
fn reset(&mut self) -> Result<()> { fn reset(&mut self) {
self.elapsed = Duration::default(); self.elapsed = Duration::default();
self.current_source = None; self.current_source = None;
if let Some(sink) = &self.sink { self.sink.pause();
sink.stop(); self.sink.clear();
self.sink.take();
self.stream.take();
return Ok(());
}
Err(PlayerEngineError::NotPlaying.into())
} }
fn get_source(&self, source_str: &str) -> Result<(Box<dyn MediaSource>, Hint)> { fn get_source(&self, source_str: &str) -> Result<(Box<dyn MediaSource>, Hint)> {