Integrate new audio player

This commit is contained in:
Hans Mündelein 2023-06-08 16:10:17 +02:00
parent 5de624ffab
commit a31608aae9
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
6 changed files with 132 additions and 412 deletions

303
Cargo.lock generated
View File

@ -164,12 +164,6 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3"
[[package]]
name = "atomic_refcell"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79d6dc922a2792b006573f60b2648076355daeae5ce9cb59507e5908c9625d31"
[[package]] [[package]]
name = "audio-player" name = "audio-player"
version = "0.1.0" version = "0.1.0"
@ -366,16 +360,6 @@ dependencies = [
"nom", "nom",
] ]
[[package]]
name = "cfg-expr"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8790cf1286da485c72cf5fc7aeba308438800036ec67d89425924c4807268c9"
dependencies = [
"smallvec",
"target-lexicon",
]
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -545,12 +529,11 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"audio-player",
"crabidy-core", "crabidy-core",
"dirs", "dirs",
"flume", "flume",
"futures", "futures",
"gstreamer",
"gstreamer-play",
"log", "log",
"once_cell", "once_cell",
"serde", "serde",
@ -954,209 +937,12 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "gio-sys"
version = "0.17.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b1d43b0d7968b48455244ecafe41192871257f5740aa6b095eb19db78e362a5"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
"winapi",
]
[[package]]
name = "glib"
version = "0.17.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7f1de7cbde31ea4f0a919453a2dcece5d54d5b70e08f8ad254dc4840f5f09b6"
dependencies = [
"bitflags",
"futures-channel",
"futures-core",
"futures-executor",
"futures-task",
"futures-util",
"gio-sys",
"glib-macros",
"glib-sys",
"gobject-sys",
"libc",
"memchr",
"once_cell",
"smallvec",
"thiserror",
]
[[package]]
name = "glib-macros"
version = "0.17.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a7206c5c03851ef126ea1444990e81fdd6765fb799d5bc694e4897ca01bb97f"
dependencies = [
"anyhow",
"heck 0.4.1",
"proc-macro-crate",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "glib-sys"
version = "0.17.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f00ad0a1bf548e61adfff15d83430941d9e1bb620e334f779edd1c745680a5"
dependencies = [
"libc",
"system-deps",
]
[[package]] [[package]]
name = "glob" name = "glob"
version = "0.3.1" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gobject-sys"
version = "0.17.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15e75b0000a64632b2d8ca3cf856af9308e3a970844f6e9659bd197f026793d0"
dependencies = [
"glib-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer"
version = "0.20.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4530401c89be6dc10d77ae1587b811cf455c97dce7abf594cb9164527c7da7fc"
dependencies = [
"bitflags",
"cfg-if",
"futures-channel",
"futures-core",
"futures-util",
"glib",
"gstreamer-sys",
"libc",
"muldiv",
"num-integer",
"num-rational",
"once_cell",
"option-operations",
"paste",
"pretty-hex",
"smallvec",
"thiserror",
]
[[package]]
name = "gstreamer-base"
version = "0.20.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b8ff5dfbf7bcaf1466a385b836bad0d8da25759f121458727fdda1f771c69b3"
dependencies = [
"atomic_refcell",
"bitflags",
"cfg-if",
"glib",
"gstreamer",
"gstreamer-base-sys",
"libc",
]
[[package]]
name = "gstreamer-base-sys"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26114ed96f6668380f5a1554128159e98e06c3a7a8460f216d7cd6dce28f928c"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer-play"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f752a53171e330c7f56db24ca91d99b7958dc86395ebe91b117226d339b29306"
dependencies = [
"bitflags",
"glib",
"gstreamer",
"gstreamer-play-sys",
"gstreamer-video",
"libc",
"once_cell",
]
[[package]]
name = "gstreamer-play-sys"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b69030bd53c3e5988a1e13bdb55ae8d922f8e9c2b522bfa2442bc13906829fb"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-sys",
"gstreamer-video-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer-sys"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e56fe047adef7d47dbafa8bc1340fddb53c325e16574763063702fc94b5786d2"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer-video"
version = "0.20.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dce97769effde2d779dc4f7037b37106457b74e53f2a711bddc90b30ffeb7e06"
dependencies = [
"bitflags",
"cfg-if",
"futures-channel",
"glib",
"gstreamer",
"gstreamer-base",
"gstreamer-video-sys",
"libc",
"once_cell",
]
[[package]]
name = "gstreamer-video-sys"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66ddb6112d438aac0004d2db6053a572f92b1c5e0e9d6ff6c71d9245f7f73e46"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.19" version = "0.3.19"
@ -1648,12 +1434,6 @@ dependencies = [
"windows-sys 0.45.0", "windows-sys 0.45.0",
] ]
[[package]]
name = "muldiv"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0"
[[package]] [[package]]
name = "multimap" name = "multimap"
version = "0.8.3" version = "0.8.3"
@ -1794,17 +1574,6 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "num-rational"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.15" version = "0.2.15"
@ -1953,15 +1722,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "option-operations"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c26d27bb1aeab65138e4bf7666045169d1717febcc9ff870166be8348b223d0"
dependencies = [
"paste",
]
[[package]] [[package]]
name = "ordered-stream" name = "ordered-stream"
version = "0.2.0" version = "0.2.0"
@ -2007,12 +1767,6 @@ dependencies = [
"windows-sys 0.45.0", "windows-sys 0.45.0",
] ]
[[package]]
name = "paste"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79"
[[package]] [[package]]
name = "peeking_take_while" name = "peeking_take_while"
version = "0.1.2" version = "0.1.2"
@ -2139,12 +1893,6 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pretty-hex"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6fa0831dd7cc608c38a5e323422a0077678fa5744aa2be4ad91c4ece8eec8d5"
[[package]] [[package]]
name = "prettyplease" name = "prettyplease"
version = "0.1.25" version = "0.1.25"
@ -2165,30 +1913,6 @@ dependencies = [
"toml_edit", "toml_edit",
] ]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn 1.0.109",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.58" version = "1.0.58"
@ -3025,25 +2749,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "system-deps"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5fa6fb9ee296c0dc2df41a656ca7948546d061958115ddb0bcaae43ad0d17d2"
dependencies = [
"cfg-expr",
"heck 0.4.1",
"pkg-config",
"toml 0.7.4",
"version-compare",
]
[[package]]
name = "target-lexicon"
version = "0.12.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd1ba337640d60c3e96bc6f0638a939b9c9a7f2c316a1598c279828b3d1dc8c5"
[[package]] [[package]]
name = "tauri-winrt-notification" name = "tauri-winrt-notification"
version = "0.1.0" version = "0.1.0"
@ -3583,12 +3288,6 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version-compare"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "579a42fc0b8e0c63b76519a339be31bed574929511fa53c1a3acae26eb258f29"
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.9.4" version = "0.9.4"

View File

@ -7,11 +7,10 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.71" anyhow = "1.0.71"
gstreamer = "0.20.5"
gstreamer-play = "0.20.2"
tokio = { version = "1.28.0", features = ["full"] } tokio = { version = "1.28.0", features = ["full"] }
tidaldy = { path = "../tidaldy" } tidaldy = { path = "../tidaldy" }
crabidy-core = { path = "../crabidy-core" } crabidy-core = { path = "../crabidy-core" }
audio-player = { path = "../audio-player" }
once_cell = "1.17.1" once_cell = "1.17.1"
serde_json = "1.0.96" serde_json = "1.0.96"
serde = "1.0.163" serde = "1.0.163"

View File

@ -1,8 +1,8 @@
use audio_player::PlayerMessage;
use crabidy_core::proto::crabidy::{ use crabidy_core::proto::crabidy::{
crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, Track, crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, PlayState, Track,
}; };
use crabidy_core::{ProviderClient, ProviderError}; use crabidy_core::{ProviderClient, ProviderError};
use gstreamer_play::{PlayMessage, PlayState as GstPlaystate};
use tracing::{debug_span, info, instrument, warn, Span}; use tracing::{debug_span, info, instrument, warn, Span};
use tracing_subscriber::{filter::Targets, prelude::*}; use tracing_subscriber::{filter::Targets, prelude::*};
@ -36,19 +36,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(registry) tracing::subscriber::set_global_default(registry)
.expect("Setting the default tracing subscriber failed"); .expect("Setting the default tracing subscriber failed");
gstreamer::init()?; info!("audio player started initialized");
info!("gstreamer initialized");
let (update_tx, _) = tokio::sync::broadcast::channel(2048); let (update_tx, _) = tokio::sync::broadcast::channel(2048);
let orchestrator = ProviderOrchestrator::init("").await.unwrap(); let orchestrator = ProviderOrchestrator::init("").await.unwrap();
let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone()); let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone());
let bus = playback.play.message_bus();
let playback_tx = playback.playback_tx.clone(); let playback_tx = playback.playback_tx.clone();
let player_msg = playback.player.messages.clone();
std::thread::spawn(|| { std::thread::spawn(|| {
poll_play_bus(bus, playback_tx); poll_play_bus(player_msg, playback_tx);
}); });
info!("gstreamer bus handler started"); info!("gstreamer bus handler started");
@ -71,44 +70,51 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }
#[instrument(skip(bus, tx))] #[instrument(skip(rx, tx))]
fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender<PlaybackMessage>) { fn poll_play_bus(rx: flume::Receiver<PlayerMessage>, tx: flume::Sender<PlaybackMessage>) {
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { for msg in rx.iter() {
let span = debug_span!("play-chan"); let span = debug_span!("play-chan");
match PlayMessage::parse(&msg) { match msg {
Ok(PlayMessage::EndOfStream) => { PlayerMessage::EndOfStream => {
tx.send(PlaybackMessage::Next { span }).unwrap(); tx.send(PlaybackMessage::Next { span }).unwrap();
} }
Ok(PlayMessage::StateChanged { state }) => { PlayerMessage::Stopped => {
tx.send(PlaybackMessage::StateChanged { state, span }) tx.send(PlaybackMessage::StateChanged {
.unwrap(); state: PlayState::Stopped,
span,
})
.unwrap();
} }
Ok(PlayMessage::PositionUpdated { position }) => { PlayerMessage::Paused => {
let position = position tx.send(PlaybackMessage::StateChanged {
.and_then(|t| Some(t.mseconds() as u32)) state: PlayState::Paused,
.unwrap_or(0); span,
tx.send(PlaybackMessage::PostitionChanged { position, span }) })
.unwrap(); .unwrap();
} }
Ok(PlayMessage::Buffering { percent: _ }) => {} PlayerMessage::Playing => {
Ok(PlayMessage::VolumeChanged { volume }) => { tx.send(PlaybackMessage::StateChanged {
let volume = volume as f32; state: PlayState::Playing,
tx.send(PlaybackMessage::VolumeChanged { volume, span }) span,
.unwrap(); })
.unwrap();
} }
Ok(PlayMessage::MuteChanged { muted }) => { PlayerMessage::Elapsed { duration, elapsed } => {
tx.send(PlaybackMessage::MuteChanged { muted, span }) tx.send(PlaybackMessage::PostitionChanged {
.unwrap(); duration: duration.as_millis() as u32,
position: elapsed.as_millis() as u32,
span,
})
.unwrap();
}
PlayerMessage::Duration { duration } => {
tx.send(PlaybackMessage::PostitionChanged {
duration: duration.as_millis() as u32,
position: 0,
span,
})
.unwrap();
} }
Ok(PlayMessage::MediaInfoUpdated { info: _ }) => {}
Ok(PlayMessage::UriLoaded) => {}
Ok(PlayMessage::VideoDimensionsChanged {
width: _,
height: _,
}) => {}
Ok(PlayMessage::DurationChanged { duration: _ }) => {}
_ => println!("Unknown message: {:?}", msg),
} }
} }
} }
@ -194,7 +200,7 @@ pub enum PlaybackMessage {
span: Span, span: Span,
}, },
StateChanged { StateChanged {
state: GstPlaystate, state: PlayState,
span: Span, span: Span,
}, },
VolumeChanged { VolumeChanged {
@ -207,6 +213,7 @@ pub enum PlaybackMessage {
span: Span, span: Span,
}, },
PostitionChanged { PostitionChanged {
duration: u32,
position: u32, position: u32,
span: Span, span: Span,
}, },

View File

@ -1,24 +1,23 @@
use crate::PlaybackMessage; use crate::PlaybackMessage;
use crate::ProviderMessage; use crate::ProviderMessage;
use audio_player::Player;
use crabidy_core::proto::crabidy::{ use crabidy_core::proto::crabidy::{
get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, Queue, QueueTrack, get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, Queue, QueueTrack,
Track, TrackPosition, Track, TrackPosition,
}; };
use crabidy_core::ProviderError; use crabidy_core::ProviderError;
use gstreamer_play::{Play, PlayState as GstPlaystate, PlayVideoRenderer};
use std::sync::Mutex; use std::sync::Mutex;
use tracing::debug_span; use tracing::debug_span;
use tracing::{debug, error, instrument, trace, warn, Instrument}; use tracing::{debug, error, instrument, trace, warn, Instrument};
#[derive(Debug)]
pub struct Playback { pub struct Playback {
update_tx: tokio::sync::broadcast::Sender<StreamUpdate>, update_tx: tokio::sync::broadcast::Sender<StreamUpdate>,
provider_tx: flume::Sender<ProviderMessage>, provider_tx: flume::Sender<ProviderMessage>,
pub playback_tx: flume::Sender<PlaybackMessage>, pub playback_tx: flume::Sender<PlaybackMessage>,
playback_rx: flume::Receiver<PlaybackMessage>, playback_rx: flume::Receiver<PlaybackMessage>,
queue: Mutex<Queue>, queue: Mutex<Queue>,
state: Mutex<GstPlaystate>, state: Mutex<PlayState>,
pub play: Play, pub player: Player,
} }
impl Playback { impl Playback {
@ -32,8 +31,8 @@ impl Playback {
current_position: 0, current_position: 0,
tracks: Vec::new(), tracks: Vec::new(),
}); });
let state = Mutex::new(GstPlaystate::Stopped); let state = Mutex::new(PlayState::Stopped);
let play = Play::new(None::<PlayVideoRenderer>); let player = Player::default();
Self { Self {
update_tx, update_tx,
provider_tx, provider_tx,
@ -41,13 +40,13 @@ impl Playback {
playback_rx, playback_rx,
queue, queue,
state, state,
play, player,
} }
} }
#[instrument]
pub fn run(self) { pub fn run(self) {
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(message) = self.playback_rx.recv_async().in_current_span().await { while let Ok(message) = self.playback_rx.recv_async().await {
match message { match message {
PlaybackMessage::Init { result_tx, span } => { PlaybackMessage::Init { result_tx, span } => {
let _e = span.enter(); let _e = span.enter();
@ -60,26 +59,19 @@ impl Playback {
}; };
trace!("queue_track {:?}", queue_track); trace!("queue_track {:?}", queue_track);
debug!("released queue_track lock"); debug!("released queue_track lock");
let position = TrackPosition { let position = TrackPosition {
duration: self duration: 0,
.play position: 0,
.duration()
.map(|t| t.mseconds() as u32)
.unwrap_or(0),
position: self
.play
.position()
.map(|t| t.mseconds() as u32)
.unwrap_or(0),
}; };
trace!("position {:?}", position); trace!("position {:?}", position);
let play_state = { let play_state = {
debug!("getting play state lock"); debug!("getting play state lock");
match *self.state.lock().unwrap() { match *self.state.lock().unwrap() {
GstPlaystate::Playing => PlayState::Playing, PlayState::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused, PlayState::Paused => PlayState::Paused,
GstPlaystate::Stopped => PlayState::Stopped, PlayState::Stopped => PlayState::Stopped,
GstPlaystate::Buffering => PlayState::Loading, PlayState::Loading => PlayState::Loading,
_ => PlayState::Unspecified, _ => PlayState::Unspecified,
} }
}; };
@ -89,8 +81,8 @@ impl Playback {
queue: Some(queue.clone()), queue: Some(queue.clone()),
queue_track: Some(queue_track), queue_track: Some(queue_track),
play_state: play_state as i32, play_state: play_state as i32,
volume: self.play.volume() as f32, volume: 0.0,
mute: self.play.is_muted(), mute: false,
position: Some(position), position: Some(position),
} }
}; };
@ -109,6 +101,7 @@ impl Playback {
let tracks = self.flatten_node(&uuid).in_current_span().await; let tracks = self.flatten_node(&uuid).in_current_span().await;
all_tracks.extend(tracks); all_tracks.extend(tracks);
} }
debug!("uuid: {:?}", uuid);
} }
trace!("got tracks {:?}", all_tracks); trace!("got tracks {:?}", all_tracks);
let current = { let current = {
@ -247,12 +240,14 @@ impl Playback {
let _e = span.enter(); let _e = span.enter();
debug!("toggling play"); debug!("toggling play");
{ {
let state = self.state.lock().unwrap(); let state = *self.state.lock().unwrap();
debug!("got state lock"); debug!("got state lock");
if *state == GstPlaystate::Playing { if state == PlayState::Playing {
self.play.pause(); if let Err(err) = self.player.pause().await {
} else { error!("{:?}", err)
self.play.play(); }
} else if let Err(err) = self.player.unpause().await {
error!("{:?}", err)
} }
} }
debug!("state lock released"); debug!("state lock released");
@ -261,23 +256,28 @@ impl Playback {
PlaybackMessage::Stop { span } => { PlaybackMessage::Stop { span } => {
let _e = span.enter(); let _e = span.enter();
debug!("stopping"); debug!("stopping");
self.play.stop(); if let Err(err) = self.player.stop().await {
error!("{:?}", err)
}
} }
PlaybackMessage::ChangeVolume { delta, span } => { PlaybackMessage::ChangeVolume { delta, span } => {
let _e = span.enter(); let _e = span.enter();
debug!("changing volume"); debug!("changing volume");
let volume = self.play.volume(); if let Ok(volume) = self.player.volume().await {
debug!("got volume {:?}", volume); debug!("got volume {:?}", volume);
self.play.set_volume(volume + delta as f64); if let Err(err) = self.player.set_volume(volume + delta).await {
error!("{:?}", err)
};
}
} }
PlaybackMessage::ToggleMute { span } => { PlaybackMessage::ToggleMute { span } => {
let _e = span.enter(); let _e = span.enter();
debug!("toggling mute"); debug!("toggling mute");
let muted = self.play.is_muted(); // let muted = self.player.is_muted();
debug!("got muted {:?}", muted); // debug!("got muted {:?}", muted);
self.play.set_mute(!muted); // self.player.set_mute(!muted);
} }
PlaybackMessage::ToggleShuffle { span } => { PlaybackMessage::ToggleShuffle { span } => {
@ -316,16 +316,8 @@ impl Playback {
debug!("state changed"); debug!("state changed");
let play_state = { let play_state = {
*self.state.lock().unwrap() = state.clone(); *self.state.lock().unwrap() = state;
debug!("got state lock"); state
match state {
GstPlaystate::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused,
GstPlaystate::Stopped => PlayState::Stopped,
GstPlaystate::Buffering => PlayState::Loading,
_ => PlayState::Unspecified,
}
}; };
debug!("released state lock and got play state {:?}", play_state); debug!("released state lock and got play state {:?}", play_state);
let active_track_tx = self.update_tx.clone(); let active_track_tx = self.update_tx.clone();
@ -338,8 +330,9 @@ impl Playback {
PlaybackMessage::RestartTrack { span } => { PlaybackMessage::RestartTrack { span } => {
let _e = span.enter(); let _e = span.enter();
debug!("restarting track"); debug!("restarting track");
self.play.stop(); if let Err(err) = self.player.restart().await {
self.play.play(); error!("{:?}", err)
}
} }
PlaybackMessage::VolumeChanged { volume, span } => { PlaybackMessage::VolumeChanged { volume, span } => {
@ -362,15 +355,14 @@ impl Playback {
} }
} }
PlaybackMessage::PostitionChanged { position, span } => { PlaybackMessage::PostitionChanged {
duration,
position,
span,
} => {
let _e = span.enter(); let _e = span.enter();
trace!("position changed"); trace!("position changed");
let update_tx = self.update_tx.clone(); let update_tx = self.update_tx.clone();
let duration = self
.play
.duration()
.and_then(|t| Some(t.mseconds() as u32))
.unwrap_or(0);
let update = StreamUpdate::Position(TrackPosition { duration, position }); let update = StreamUpdate::Position(TrackPosition { duration, position });
if let Err(err) = update_tx.send(update) { if let Err(err) = update_tx.send(update) {
error!("{:?}", err) error!("{:?}", err)
@ -404,6 +396,7 @@ impl Playback {
#[instrument(skip(self))] #[instrument(skip(self))]
async fn get_track(&self, uuid: &str) -> Result<Track, ProviderError> { async fn get_track(&self, uuid: &str) -> Result<Track, ProviderError> {
debug!("getting track");
let tx = self.provider_tx.clone(); let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1); let (result_tx, result_rx) = flume::bounded(1);
let span = tracing::trace_span!("prov-chan"); let span = tracing::trace_span!("prov-chan");
@ -474,11 +467,14 @@ impl Playback {
error!("{:?}", err) error!("{:?}", err)
} }
} }
self.play.stop(); if let Err(err) = self.player.stop().await {
self.play.set_uri(Some(&urls[0])); error!("{:?}", err)
self.play.play(); };
} else { if let Err(err) = self.player.play(&urls[0]).await {
self.play.stop(); error!("{:?}", err)
};
} else if let Err(err) = self.player.stop().await {
error!("{:?}", err)
} }
} }
@ -514,9 +510,12 @@ impl Playback {
error!("{:?}", err) error!("{:?}", err)
} }
} }
self.play.stop(); if let Err(err) = self.player.stop().await {
self.play.set_uri(Some(&urls[0])); error!("{:?}", err)
self.play.play(); };
if let Err(err) = self.player.play(&urls[0]).await {
error!("{:?}", err)
}
} }
} }
} }

View File

@ -17,10 +17,9 @@ pub struct ProviderOrchestrator {
} }
impl ProviderOrchestrator { impl ProviderOrchestrator {
#[instrument]
pub fn run(self) { pub fn run(self) {
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(msg) = self.provider_rx.recv_async().in_current_span().await { while let Ok(msg) = self.provider_rx.recv_async().await {
match msg { match msg {
ProviderMessage::GetLibraryNode { ProviderMessage::GetLibraryNode {
uuid, uuid,
@ -29,7 +28,11 @@ impl ProviderOrchestrator {
} => { } => {
let _e = span.enter(); let _e = span.enter();
let result = self.get_lib_node(&uuid).in_current_span().await; let result = self.get_lib_node(&uuid).in_current_span().await;
result_tx.send(result).unwrap(); result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
} }
ProviderMessage::GetTrack { ProviderMessage::GetTrack {
uuid, uuid,
@ -38,7 +41,11 @@ impl ProviderOrchestrator {
} => { } => {
let _e = span.enter(); let _e = span.enter();
let result = self.get_metadata_for_track(&uuid).in_current_span().await; let result = self.get_metadata_for_track(&uuid).in_current_span().await;
result_tx.send(result).unwrap(); result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
} }
ProviderMessage::GetTrackUrls { ProviderMessage::GetTrackUrls {
uuid, uuid,
@ -47,7 +54,11 @@ impl ProviderOrchestrator {
} => { } => {
let _e = span.enter(); let _e = span.enter();
let result = self.get_urls_for_track(&uuid).in_current_span().await; let result = self.get_urls_for_track(&uuid).in_current_span().await;
result_tx.send(result).unwrap(); result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
} }
ProviderMessage::FlattenNode { ProviderMessage::FlattenNode {
uuid, uuid,
@ -56,7 +67,11 @@ impl ProviderOrchestrator {
} => { } => {
let _e = span.enter(); let _e = span.enter();
let result = self.flatten_node(&uuid).in_current_span().await; let result = self.flatten_node(&uuid).in_current_span().await;
result_tx.send(result).unwrap(); result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
} }
} }
} }
@ -132,6 +147,7 @@ impl ProviderClient for ProviderOrchestrator {
} }
#[instrument(skip(self))] #[instrument(skip(self))]
async fn get_metadata_for_track(&self, track_uuid: &str) -> Result<Track, ProviderError> { async fn get_metadata_for_track(&self, track_uuid: &str) -> Result<Track, ProviderError> {
debug!("get_metadata_for_track");
self.tidal_client self.tidal_client
.get_metadata_for_track(track_uuid) .get_metadata_for_track(track_uuid)
.in_current_span() .in_current_span()

View File

@ -62,7 +62,7 @@ impl CrabidyService for RpcService {
.await .await
.map_err(|e| { .map_err(|e| {
error!("{:?}", e); error!("{:?}", e);
return Status::internal("Failed to receive response from provider channel"); Status::internal("Failed to receive response from provider channel")
})?; })?;
Ok(Response::new(response)) Ok(Response::new(response))
} }