Implement full async player API

This commit is contained in:
chmanie 2023-06-08 14:40:09 +02:00
parent 85d6d263e1
commit 983609e2f4
5 changed files with 253 additions and 103 deletions

1
Cargo.lock generated
View File

@ -180,6 +180,7 @@ dependencies = [
"stream-download",
"symphonia",
"thiserror",
"tracing",
"url",
]

View File

@ -11,3 +11,4 @@ anyhow = "1.0.71"
url = "2.4.0"
flume = "0.10.14"
thiserror = "1.0.40"
tracing = "0.1.37"

View File

@ -11,10 +11,11 @@ use symphonia::{
io::MediaSourceStream,
meta::{MetadataOptions, MetadataRevision},
probe::Hint,
units::{self, Time, TimeBase},
units::{Time, TimeBase},
},
default::get_probe,
};
use tracing::warn;
use rodio::Source;
@ -25,6 +26,7 @@ use crate::PlayerEngineCommand;
// But a decode error in more than 3 consecutive packets is fatal.
const MAX_DECODE_ERRORS: usize = 3;
#[derive(Clone)]
pub struct MediaInfo {
pub duration: Option<Duration>,
pub metadata: Option<MetadataRevision>,
@ -97,7 +99,7 @@ impl SymphoniaDecoder {
.map(|frames| track.codec_params.start_ts + frames)
.unwrap_or_default();
let mut elapsed = 0;
let mut _elapsed = 0;
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &DecoderOptions { verify: true })?;
@ -105,7 +107,7 @@ impl SymphoniaDecoder {
let mut decode_errors: usize = 0;
let decoded = loop {
let current_frame = probed.format.next_packet()?;
elapsed = current_frame.ts();
_elapsed = current_frame.ts();
match decoder.decode(&current_frame) {
Ok(decoded) => break decoded,
Err(e) => match e {
@ -142,7 +144,7 @@ impl SymphoniaDecoder {
spec,
time_base,
duration,
elapsed,
elapsed: _elapsed,
metadata,
track,
tx,
@ -259,7 +261,9 @@ impl Iterator for SymphoniaDecoder {
if err.kind() == std::io::ErrorKind::UnexpectedEof
&& err.to_string() == "end of stream"
{
self.tx.send(PlayerEngineCommand::Eos);
self.tx
.send(PlayerEngineCommand::Eos)
.unwrap_or_else(|e| warn!("Send error {}", e));
return None;
}
}

View File

@ -4,16 +4,16 @@ mod player_engine;
use std::thread;
use std::time::Duration;
use anyhow::{anyhow, Result};
use decoder::MediaInfo;
use anyhow::Result;
pub use decoder::MediaInfo;
use flume::{Receiver, Sender};
pub use player_engine::PlayerMessage;
use player_engine::{PlayerEngine, PlayerEngineCommand};
use tracing::warn;
// TODO:
// * Emit buffering
// * Emit errors
pub enum PlayerError {}
@ -35,26 +35,57 @@ impl Default for Player {
loop {
match rx_engine.recv() {
Ok(PlayerEngineCommand::Play(source_str, tx)) => {
let res = player.play(&source_str);
tx.send(res);
tx.send(player.play(&source_str))
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::Pause) => {
player.pause();
Ok(PlayerEngineCommand::Pause(tx)) => {
tx.send(player.pause())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::Unpause) => {
player.unpause();
Ok(PlayerEngineCommand::Unpause(tx)) => {
tx.send(player.unpause())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::Stop) => {
player.stop();
Ok(PlayerEngineCommand::Stop(tx)) => {
tx.send(player.stop())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::TogglePlay) => {
player.toggle_play();
Ok(PlayerEngineCommand::TogglePlay(tx)) => {
tx.send(player.toggle_play())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::Restart(tx)) => {
tx.send(player.restart())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::GetDuration(tx)) => {
tx.send(player.duration())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::GetElapsed(tx)) => {
tx.send(player.elapsed())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::GetVolume(tx)) => {
tx.send(player.volume())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::GetPaused(tx)) => {
tx.send(player.is_paused())
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::SetVolume(volume, tx)) => {
tx.send(player.set_volume(volume))
.unwrap_or_else(|e| warn!("Send error {}", e));
}
Ok(PlayerEngineCommand::SetElapsed(elapsed)) => {
player.handle_elapsed(elapsed);
}
Ok(PlayerEngineCommand::Eos) => {
player.handle_eos();
}
Err(e) => {
// FIXME: debug!(e);
warn!("Recv error {}", e);
}
}
}
@ -68,61 +99,72 @@ impl Default for Player {
}
impl Player {
// FIXME: this could check if the player started playing using a channel
// Then it would be async (wait for Playing for example)
pub async fn play(&self, source_str: &str) -> Result<MediaInfo> {
let (tx, rx) = flume::bounded(1);
self.tx_engine
.send(PlayerEngineCommand::Play(source_str.to_string(), tx));
if let Ok(res) = rx.recv_async().await {
return res;
}
// FIXME: add error type
Err(anyhow!("Player channel error"))
.send(PlayerEngineCommand::Play(source_str.to_string(), tx))?;
rx.recv_async().await?
}
pub async fn elpased(&self) -> Duration {
// FIXME: implement
Duration::default()
pub async fn restart(&self) -> Result<MediaInfo> {
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::Restart(tx))?;
rx.recv_async().await?
}
pub async fn duration(&self) -> Duration {
// FIXME: implement
Duration::default()
pub async fn elpased(&self) -> Result<Duration> {
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::GetElapsed(tx))?;
rx.recv_async().await?
}
pub async fn volume(&self) -> f32 {
// FIXME: implement
0.0
pub async fn duration(&self) -> Result<Duration> {
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::GetDuration(tx))?;
rx.recv_async().await?
}
pub async fn set_volume(&self) -> Result<()> {
// FIXME: implement
Ok(())
pub async fn volume(&self) -> Result<f32> {
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::GetVolume(tx))?;
rx.recv_async().await?
}
pub async fn is_paused(&self) -> Result<bool> {
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::GetPaused(tx))?;
rx.recv_async().await?
}
pub async fn set_volume(&self, volume: f32) -> Result<f32> {
let (tx, rx) = flume::bounded(1);
let vol = volume.clamp(0.0, 1.1);
self.tx_engine
.send(PlayerEngineCommand::SetVolume(vol, tx))?;
rx.recv_async().await?
}
pub async fn pause(&self) -> Result<()> {
self.tx_engine.send(PlayerEngineCommand::Pause);
Ok(())
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::Pause(tx))?;
rx.recv_async().await?
}
pub async fn unpause(&self) -> Result<()> {
self.tx_engine.send(PlayerEngineCommand::Unpause);
Ok(())
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::Unpause(tx))?;
rx.recv_async().await?
}
pub async fn toggle_play(&self) -> Result<()> {
self.tx_engine.send(PlayerEngineCommand::TogglePlay);
Ok(())
pub async fn toggle_play(&self) -> Result<bool> {
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::TogglePlay(tx))?;
rx.recv_async().await?
}
pub async fn stop(&self) -> Result<()> {
self.tx_engine.send(PlayerEngineCommand::Stop);
Ok(())
}
pub async fn restart(&self) -> Result<()> {
// FIXME: implement
Ok(())
let (tx, rx) = flume::bounded(1);
self.tx_engine.send(PlayerEngineCommand::Stop(tx))?;
rx.recv_async().await?
}
}

View File

@ -1,35 +1,44 @@
use flume::{Receiver, Sender};
use flume::Sender;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::thread;
use std::time::Duration;
use symphonia::core::probe::Hint;
use tracing::warn;
use url::Url;
use crate::decoder::{MediaInfo, SymphoniaDecoder};
use anyhow::{anyhow, Result};
use rodio::source::{PeriodicAccess, SineWave};
use rodio::{OutputStream, OutputStreamHandle, Sink, Source};
use rodio::{OutputStream, Sink, Source};
use stream_download::StreamDownload;
use symphonia::core::io::{
MediaSource, MediaSourceStream, MediaSourceStreamOptions, ReadOnlySource,
};
use thiserror::Error;
pub enum PlayerEngineCommand {
Play(String, Sender<Result<MediaInfo>>),
Pause,
Unpause,
TogglePlay,
Stop,
SetVolume(f32, Sender<Result<f32>>),
Pause(Sender<Result<()>>),
Unpause(Sender<Result<()>>),
TogglePlay(Sender<Result<bool>>),
Restart(Sender<Result<MediaInfo>>),
Stop(Sender<Result<()>>),
GetDuration(Sender<Result<Duration>>),
GetElapsed(Sender<Result<Duration>>),
GetVolume(Sender<Result<f32>>),
GetPaused(Sender<Result<bool>>),
Eos,
SetElapsed(Duration),
}
// FIXME: sort out media info size (probably send pointers to stuff on the heap)
pub enum PlayerMessage {
// MediaInfo(MediaInfo),
Duration(Duration),
Elapsed(Duration),
Duration {
duration: Duration,
},
Elapsed {
duration: Duration,
elapsed: Duration,
},
Stopped,
Paused,
Playing,
@ -40,15 +49,29 @@ pub enum PlayerMessage {
// * Emit buffering
pub struct PlayerEngine {
elapsed: Duration,
// FIXME: We only need this to re-start a track
// Might do that using seeking in the future
current_source: Option<String>,
media_info: Option<MediaInfo>,
sink: Option<Sink>,
stream: Option<OutputStream>,
tx_engine: Sender<PlayerEngineCommand>,
tx_player: Sender<PlayerMessage>,
}
#[derive(Debug, Error)]
pub enum PlayerEngineError {
#[error("Sink is not playing")]
NotPlaying,
}
impl PlayerEngine {
pub fn new(tx_engine: Sender<PlayerEngineCommand>, tx_player: Sender<PlayerMessage>) -> Self {
Self {
current_source: None,
media_info: None,
elapsed: Duration::default(),
sink: None,
stream: None,
tx_engine,
@ -58,20 +81,33 @@ impl PlayerEngine {
pub fn play(&mut self, source_str: &str) -> Result<MediaInfo> {
let tx_player = self.tx_player.clone();
let tx_engine = self.tx_engine.clone();
let (stream, handle) = OutputStream::try_default()?;
let mut sink = Sink::try_new(&handle)?;
let sink = Sink::try_new(&handle)?;
let (source, hint) = self.get_source(source_str)?;
let mss = MediaSourceStream::new(source, MediaSourceStreamOptions::default());
let decoder = SymphoniaDecoder::new(mss, hint, self.tx_engine.clone())?;
let media_info = decoder.media_info();
let media_info_copy = media_info.clone();
let duration = media_info.duration.unwrap_or_default();
tx_player.send(PlayerMessage::Duration(
media_info.duration.unwrap_or_default(),
));
self.media_info = Some(media_info);
tx_player
.send(PlayerMessage::Duration { duration })
.unwrap_or_else(|e| warn!("Send error {}", e));
// FIXME: regularly update metadata revision
let decoder = decoder.periodic_access(Duration::from_millis(250), move |src| {
tx_player.send(PlayerMessage::Elapsed(src.elapsed()));
let elapsed = src.elapsed();
tx_engine
.send(PlayerEngineCommand::SetElapsed(elapsed))
.unwrap_or_else(|e| warn!("Send error {}", e));
tx_player
.send(PlayerMessage::Elapsed { elapsed, duration })
.unwrap_or_else(|e| warn!("Send error {}", e));
});
sink.append(decoder);
@ -80,65 +116,131 @@ impl PlayerEngine {
// The sink is used to control the stream
self.sink = Some(sink);
self.tx_player.send(PlayerMessage::Playing);
self.tx_player
.send(PlayerMessage::Playing)
.unwrap_or_else(|e| warn!("Send error {}", e));
Ok(media_info)
Ok(media_info_copy)
}
pub fn pause(&mut self) {
pub fn restart(&mut self) -> Result<MediaInfo> {
if let Some(source) = self.current_source.clone() {
self.reset()?;
return self.play(&source);
}
Err(PlayerEngineError::NotPlaying.into())
}
pub fn pause(&mut self) -> Result<()> {
if let Some(sink) = &self.sink {
sink.pause();
self.tx_player.send(PlayerMessage::Paused);
self.tx_player
.send(PlayerMessage::Paused)
.unwrap_or_else(|e| warn!("Send error {}", e));
return Ok(());
}
Err(PlayerEngineError::NotPlaying.into())
}
pub fn unpause(&mut self) {
pub fn unpause(&mut self) -> Result<()> {
if let Some(sink) = &self.sink {
sink.play();
self.tx_player.send(PlayerMessage::Playing);
self.tx_player
.send(PlayerMessage::Playing)
.unwrap_or_else(|e| warn!("Send error {}", e));
return Ok(());
}
Err(PlayerEngineError::NotPlaying.into())
}
pub fn toggle_play(&mut self) {
if self.is_stopped() {
return;
}
if self.is_paused() {
self.unpause();
pub fn toggle_play(&mut self) -> Result<bool> {
if let Some(sink) = &self.sink {
if sink.is_paused() {
sink.play();
return Ok(true);
} else {
self.pause();
sink.pause();
return Ok(false);
}
}
Err(PlayerEngineError::NotPlaying.into())
}
pub fn stop(&mut self) {
if let Some(sink) = &self.sink {
sink.stop();
self.sink.take();
self.stream.take();
self.tx_player.send(PlayerMessage::Stopped);
}
pub fn stop(&mut self) -> Result<()> {
self.reset()?;
self.tx_player
.send(PlayerMessage::Stopped)
.unwrap_or_else(|e| warn!("Send error {}", e));
Ok(())
}
pub fn handle_eos(&mut self) {
if let Some(sink) = &self.sink {
sink.stop();
self.sink.take();
self.stream.take();
self.tx_player.send(PlayerMessage::EndOfStream);
}
}
pub fn is_paused(&self) -> bool {
pub fn is_paused(&self) -> Result<bool> {
self.sink
.as_ref()
.map(|s| s.is_paused())
.unwrap_or_default()
.map_or(Err(PlayerEngineError::NotPlaying.into()), |s| {
Ok(s.is_paused())
})
}
pub fn is_stopped(&self) -> bool {
self.sink.is_none()
}
pub fn duration(&self) -> Result<Duration> {
self.media_info
.as_ref()
.map_or(Err(PlayerEngineError::NotPlaying.into()), |m| {
Ok(m.duration.unwrap_or_default())
})
}
pub fn elapsed(&self) -> Result<Duration> {
if self.is_stopped() {
return Err(PlayerEngineError::NotPlaying.into());
}
Ok(self.elapsed)
}
pub fn volume(&self) -> Result<f32> {
self.sink.as_ref().map_or(
Err(PlayerEngineError::NotPlaying.into()),
|s| Ok(s.volume()),
)
}
pub fn set_volume(&mut self, volume: f32) -> Result<f32> {
if let Some(sink) = &self.sink {
sink.set_volume(volume);
return Ok(sink.volume());
}
Err(PlayerEngineError::NotPlaying.into())
}
pub fn handle_eos(&mut self) {
self.reset().unwrap_or_else(|e| {
warn!("Sink error {}", e);
});
self.tx_player
.send(PlayerMessage::EndOfStream)
.unwrap_or_else(|e| warn!("Send error {}", e));
}
pub fn handle_elapsed(&mut self, elapsed: Duration) {
self.elapsed = elapsed;
}
fn reset(&mut self) -> Result<()> {
self.elapsed = Duration::default();
self.current_source = None;
if let Some(sink) = &self.sink {
sink.stop();
self.sink.take();
self.stream.take();
return Ok(());
}
Err(PlayerEngineError::NotPlaying.into())
}
fn get_source(&self, source_str: &str) -> Result<(Box<dyn MediaSource>, Hint)> {
match Url::parse(source_str) {
Ok(url) => {