Fix broken song deadlock and improve observability

This commit is contained in:
Hans Mündelein 2023-06-03 13:04:59 +02:00
parent e0f7ad5a9b
commit 4043865ad4
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
6 changed files with 295 additions and 931 deletions

112
Cargo.lock generated
View File

@ -382,6 +382,7 @@ dependencies = [
"futures", "futures",
"gstreamer", "gstreamer",
"gstreamer-play", "gstreamer-play",
"log",
"once_cell", "once_cell",
"serde", "serde",
"serde_json", "serde_json",
@ -389,6 +390,20 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic",
"tracing",
"tracing-appender",
"tracing-log",
"tracing-subscriber",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if",
"crossbeam-utils",
] ]
[[package]] [[package]]
@ -1264,12 +1279,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "lru-cache" name = "lru-cache"
@ -1400,6 +1412,16 @@ dependencies = [
"zbus", "zbus",
] ]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.45" version = "0.1.45"
@ -1500,6 +1522,12 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]] [[package]]
name = "parking" name = "parking"
version = "2.1.0" version = "2.1.0"
@ -2127,6 +2155,15 @@ dependencies = [
"digest", "digest",
] ]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "signal-hook" name = "signal-hook"
version = "0.3.15" version = "0.3.15"
@ -2315,6 +2352,16 @@ dependencies = [
"syn 2.0.16", "syn 2.0.16",
] ]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]] [[package]]
name = "tidaldy" name = "tidaldy"
version = "0.0.0" version = "0.0.0"
@ -2332,6 +2379,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"toml 0.7.4", "toml 0.7.4",
"tracing",
] ]
[[package]] [[package]]
@ -2351,8 +2399,10 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc"
dependencies = [ dependencies = [
"itoa",
"serde", "serde",
"time-core", "time-core",
"time-macros",
] ]
[[package]] [[package]]
@ -2361,6 +2411,15 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
[[package]]
name = "time-macros"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b"
dependencies = [
"time-core",
]
[[package]] [[package]]
name = "tinyvec" name = "tinyvec"
version = "1.6.0" version = "1.6.0"
@ -2580,6 +2639,17 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-appender"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"crossbeam-channel",
"time 0.3.21",
"tracing-subscriber",
]
[[package]] [[package]]
name = "tracing-attributes" name = "tracing-attributes"
version = "0.1.24" version = "0.1.24"
@ -2598,6 +2668,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
] ]
[[package]] [[package]]
@ -2729,6 +2825,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]] [[package]]
name = "version-compare" name = "version-compare"
version = "0.1.1" version = "0.1.1"

View File

@ -56,7 +56,7 @@ pub enum QueueError {
} }
impl Queue { impl Queue {
pub fn current(&self) -> Option<Track> { pub fn current_track(&self) -> Option<Track> {
if self.current_position < self.tracks.len() as u32 { if self.current_position < self.tracks.len() as u32 {
Some(self.tracks[self.current_position as usize].clone()) Some(self.tracks[self.current_position as usize].clone())
} else { } else {
@ -73,6 +73,15 @@ impl Queue {
} }
} }
pub fn prev_track(&mut self) -> Option<Track> {
if 0 < self.current_position {
self.current_position -= 1;
Some(self.tracks[self.current_position as usize].clone())
} else {
None
}
}
pub fn set_current_position(&mut self, current_position: u32) -> bool { pub fn set_current_position(&mut self, current_position: u32) -> bool {
if current_position < self.tracks.len() as u32 { if current_position < self.tracks.len() as u32 {
self.current_position = current_position; self.current_position = current_position;
@ -82,9 +91,14 @@ impl Queue {
} }
} }
pub fn replace_with_tracks(&mut self, tracks: &[Track]) { pub fn replace_with_tracks(&mut self, tracks: &[Track]) -> Option<Track> {
self.current_position = 0; self.current_position = 0;
self.tracks = tracks.to_vec(); self.tracks = tracks.to_vec();
if 0 < self.tracks.len() as u32 {
Some(self.tracks[0].clone())
} else {
None
}
} }
pub fn append_tracks(&mut self, tracks: &[Track]) { pub fn append_tracks(&mut self, tracks: &[Track]) {
@ -94,23 +108,35 @@ impl Queue {
pub fn queue_tracks(&mut self, tracks: &[Track]) { pub fn queue_tracks(&mut self, tracks: &[Track]) {
let tail: Vec<Track> = self let tail: Vec<Track> = self
.tracks .tracks
.splice((self.current_position as usize).., tracks.to_vec()) .splice((self.current_position as usize + 1).., tracks.to_vec())
.collect(); .collect();
self.tracks.extend(tail); self.tracks.extend(tail);
} }
pub fn remove_tracks(&mut self, positions: &[u32]) { pub fn remove_tracks(&mut self, positions: &[u32]) -> Option<Track> {
let mut play_next = false;
for pos in positions { for pos in positions {
if pos == &self.current_position {
play_next = true;
}
if pos < &self.current_position {
self.current_position -= 1;
}
if *pos < self.tracks.len() as u32 { if *pos < self.tracks.len() as u32 {
self.tracks.remove(*pos as usize); self.tracks.remove(*pos as usize);
} }
} }
if play_next {
self.current_track()
} else {
None
}
} }
pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) { pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) {
let tail: Vec<Track> = self let tail: Vec<Track> = self
.tracks .tracks
.splice((position as usize).., tracks.to_vec()) .splice((position as usize + 1).., tracks.to_vec())
.collect(); .collect();
self.tracks.extend(tail); self.tracks.extend(tail);
} }

View File

@ -21,3 +21,8 @@ async-trait = "0.1.68"
futures = "0.3.28" futures = "0.3.28"
tokio-stream = { version = "0.1.14", features = ["sync"] } tokio-stream = { version = "0.1.14", features = ["sync"] }
dirs = "5.0.1" dirs = "5.0.1"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
tracing-appender = "0.2.2"
tracing-log = "0.1.3"
log = "0.4.18"

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,7 @@ serde_urlencoded = "0.7.1"
thiserror = "1.0.40" thiserror = "1.0.40"
tokio = { version = "1.28.1", features = ["full", "time"] } tokio = { version = "1.28.1", features = ["full", "time"] }
toml = "0.7.4" toml = "0.7.4"
tracing = "0.1.37"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.28.1", features = ["full"] } tokio = { version = "1.28.1", features = ["full"] }

View File

@ -3,6 +3,7 @@
use reqwest::Client as HttpClient; use reqwest::Client as HttpClient;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use tokio::time::{sleep, Duration, Instant}; use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, instrument};
pub mod config; pub mod config;
pub mod models; pub mod models;
use async_trait::async_trait; use async_trait::async_trait;
@ -16,6 +17,7 @@ pub struct Client {
#[async_trait] #[async_trait]
impl crabidy_core::ProviderClient for Client { impl crabidy_core::ProviderClient for Client {
#[instrument(skip(raw_toml_settings))]
async fn init(raw_toml_settings: &str) -> Result<Self, crabidy_core::ProviderError> { async fn init(raw_toml_settings: &str) -> Result<Self, crabidy_core::ProviderError> {
let settings: config::Settings = if let Ok(settings) = toml::from_str(raw_toml_settings) { let settings: config::Settings = if let Ok(settings) = toml::from_str(raw_toml_settings) {
settings settings
@ -37,34 +39,43 @@ impl crabidy_core::ProviderClient for Client {
} }
Err(crabidy_core::ProviderError::CouldNotLogin) Err(crabidy_core::ProviderError::CouldNotLogin)
} }
#[instrument(skip(self))]
fn settings(&self) -> String { fn settings(&self) -> String {
toml::to_string_pretty(&self.settings).unwrap() toml::to_string_pretty(&self.settings).unwrap_or_default()
} }
#[instrument(skip(self))]
async fn get_urls_for_track( async fn get_urls_for_track(
&self, &self,
track_uuid: &str, track_uuid: &str,
) -> Result<Vec<String>, crabidy_core::ProviderError> { ) -> Result<Vec<String>, crabidy_core::ProviderError> {
debug!("get_urls_for_track {}", track_uuid);
let (_, track_uuid, _) = split_uuid(track_uuid); let (_, track_uuid, _) = split_uuid(track_uuid);
let Ok(playback) = self.get_track_playback(&track_uuid).await else { let Ok(playback) = self.get_track_playback(&track_uuid).await else {
return Err(crabidy_core::ProviderError::FetchError) return Err(crabidy_core::ProviderError::FetchError)
}; };
debug!("playback {:?}", playback);
let Ok(manifest) = playback.get_manifest() else { let Ok(manifest) = playback.get_manifest() else {
return Err(crabidy_core::ProviderError::FetchError) return Err(crabidy_core::ProviderError::FetchError)
}; };
debug!("manifest {:?}", manifest);
Ok(manifest.urls) Ok(manifest.urls)
} }
#[instrument(skip(self))]
async fn get_metadata_for_track( async fn get_metadata_for_track(
&self, &self,
track_uuid: &str, track_uuid: &str,
) -> Result<crabidy_core::proto::crabidy::Track, crabidy_core::ProviderError> { ) -> Result<crabidy_core::proto::crabidy::Track, crabidy_core::ProviderError> {
debug!("get_metadata_for_track {}", track_uuid);
let Ok(track) = self.get_track(track_uuid).await else { let Ok(track) = self.get_track(track_uuid).await else {
return Err(crabidy_core::ProviderError::FetchError) return Err(crabidy_core::ProviderError::FetchError)
}; };
Ok(track.into()) Ok(track.into())
} }
#[instrument(skip(self))]
fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode { fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode {
debug!("get_lib_root");
let global_root = crabidy_core::proto::crabidy::LibraryNode::new(); let global_root = crabidy_core::proto::crabidy::LibraryNode::new();
let children = vec![crabidy_core::proto::crabidy::LibraryNodeChild::new( let children = vec![crabidy_core::proto::crabidy::LibraryNodeChild::new(
"node:userplaylists".to_string(), "node:userplaylists".to_string(),
@ -80,6 +91,7 @@ impl crabidy_core::ProviderClient for Client {
} }
} }
#[instrument(skip(self))]
async fn get_lib_node( async fn get_lib_node(
&self, &self,
uuid: &str, uuid: &str,
@ -87,6 +99,7 @@ impl crabidy_core::ProviderClient for Client {
let Some(user_id) = self.settings.login.user_id.clone() else { let Some(user_id) = self.settings.login.user_id.clone() else {
return Err(crabidy_core::ProviderError::UnknownUser) return Err(crabidy_core::ProviderError::UnknownUser)
}; };
debug!("get_lib_node {}", uuid);
let (_kind, module, uuid) = split_uuid(uuid); let (_kind, module, uuid) = split_uuid(uuid);
let node = match module.as_str() { let node = match module.as_str() {
"userplaylists" => { "userplaylists" => {
@ -129,6 +142,7 @@ impl crabidy_core::ProviderClient for Client {
} }
} }
#[instrument]
fn split_uuid(uuid: &str) -> (String, String, String) { fn split_uuid(uuid: &str) -> (String, String, String) {
let mut split = uuid.splitn(3, ':'); let mut split = uuid.splitn(3, ':');
( (
@ -150,10 +164,12 @@ impl Client {
}) })
} }
#[instrument]
pub fn get_user_id(&self) -> Option<String> { pub fn get_user_id(&self) -> Option<String> {
self.settings.login.user_id.clone() self.settings.login.user_id.clone()
} }
#[instrument]
pub async fn make_request<T: DeserializeOwned>( pub async fn make_request<T: DeserializeOwned>(
&self, &self,
uri: &str, uri: &str,
@ -187,6 +203,7 @@ impl Client {
Ok(response) Ok(response)
} }
#[instrument]
pub async fn make_paginated_request<T: DeserializeOwned>( pub async fn make_paginated_request<T: DeserializeOwned>(
&self, &self,
uri: &str, uri: &str,
@ -244,6 +261,7 @@ impl Client {
Ok(items) Ok(items)
} }
#[instrument]
pub async fn make_explorer_request( pub async fn make_explorer_request(
&self, &self,
uri: &str, uri: &str,
@ -278,6 +296,7 @@ impl Client {
Ok(()) Ok(())
} }
#[instrument]
pub async fn search(&self, query: &str) -> Result<(), ClientError> { pub async fn search(&self, query: &str) -> Result<(), ClientError> {
let query = vec![("query", query.to_string())]; let query = vec![("query", query.to_string())];
self.make_explorer_request(&format!("search/artists"), Some(&query)) self.make_explorer_request(&format!("search/artists"), Some(&query))
@ -285,6 +304,7 @@ impl Client {
Ok(()) Ok(())
} }
#[instrument]
pub async fn get_playlist_tracks( pub async fn get_playlist_tracks(
&self, &self,
playlist_uuid: &str, playlist_uuid: &str,
@ -294,18 +314,21 @@ impl Client {
.await?) .await?)
} }
#[instrument]
pub async fn get_playlist(&self, playlist_uuid: &str) -> Result<Playlist, ClientError> { pub async fn get_playlist(&self, playlist_uuid: &str) -> Result<Playlist, ClientError> {
Ok(self Ok(self
.make_request(&format!("playlists/{}", playlist_uuid), None) .make_request(&format!("playlists/{}", playlist_uuid), None)
.await?) .await?)
} }
#[instrument]
pub async fn get_users_playlists(&self, user_id: u64) -> Result<Vec<Playlist>, ClientError> { pub async fn get_users_playlists(&self, user_id: u64) -> Result<Vec<Playlist>, ClientError> {
Ok(self Ok(self
.make_paginated_request(&format!("users/{}/playlists", user_id), None) .make_paginated_request(&format!("users/{}/playlists", user_id), None)
.await?) .await?)
} }
#[instrument]
pub async fn get_users_playlists_and_favorite_playlists( pub async fn get_users_playlists_and_favorite_playlists(
&self, &self,
user_id: &str, user_id: &str,
@ -318,6 +341,7 @@ impl Client {
.await?) .await?)
} }
#[instrument]
pub async fn explore_get_users_playlists_and_favorite_playlists( pub async fn explore_get_users_playlists_and_favorite_playlists(
&self, &self,
user_id: u64, user_id: u64,
@ -335,6 +359,7 @@ impl Client {
Ok(()) Ok(())
} }
#[instrument]
pub async fn get_users_favorites(&self, user_id: u64) -> Result<(), ClientError> { pub async fn get_users_favorites(&self, user_id: u64) -> Result<(), ClientError> {
self.make_explorer_request( self.make_explorer_request(
&format!("users/{}/favorites", user_id), &format!("users/{}/favorites", user_id),
@ -345,6 +370,7 @@ impl Client {
Ok(()) Ok(())
} }
#[instrument]
pub async fn get_user(&self, user_id: u64) -> Result<(), ClientError> { pub async fn get_user(&self, user_id: u64) -> Result<(), ClientError> {
self.make_explorer_request( self.make_explorer_request(
&format!("users/{}", user_id), &format!("users/{}", user_id),
@ -355,6 +381,7 @@ impl Client {
Ok(()) Ok(())
} }
#[instrument]
pub async fn get_track_playback(&self, track_id: &str) -> Result<TrackPlayback, ClientError> { pub async fn get_track_playback(&self, track_id: &str) -> Result<TrackPlayback, ClientError> {
let query = vec![ let query = vec![
("audioquality", "LOSSLESS".to_string()), ("audioquality", "LOSSLESS".to_string()),
@ -368,12 +395,14 @@ impl Client {
.await .await
} }
#[instrument]
pub async fn get_track(&self, track_id: &str) -> Result<Track, ClientError> { pub async fn get_track(&self, track_id: &str) -> Result<Track, ClientError> {
let (_, track_id, _) = split_uuid(track_id); let (_, track_id, _) = split_uuid(track_id);
self.make_request(&format!("tracks/{}", track_id), None) self.make_request(&format!("tracks/{}", track_id), None)
.await .await
} }
#[instrument]
pub async fn login_web(&mut self) -> Result<(), ClientError> { pub async fn login_web(&mut self) -> Result<(), ClientError> {
let code_response = self.get_device_code().await?; let code_response = self.get_device_code().await?;
let now = Instant::now(); let now = Instant::now();
@ -399,6 +428,7 @@ impl Client {
Err(ClientError::ConnectionError) Err(ClientError::ConnectionError)
} }
#[instrument(skip(self))]
pub async fn login_config(&mut self) -> Result<(), ClientError> { pub async fn login_config(&mut self) -> Result<(), ClientError> {
let Some(access_token) = self.settings.login.access_token.clone() else { let Some(access_token) = self.settings.login.access_token.clone() else {
return Err(ClientError::AuthError( return Err(ClientError::AuthError(
@ -427,6 +457,7 @@ impl Client {
Ok(()) Ok(())
} }
#[instrument]
pub async fn refresh_access_token(&self) -> Result<RefreshResponse, ClientError> { pub async fn refresh_access_token(&self) -> Result<RefreshResponse, ClientError> {
let Some(refresh_token) = self.settings.login.refresh_token.clone() else { let Some(refresh_token) = self.settings.login.refresh_token.clone() else {
return Err(ClientError::AuthError( return Err(ClientError::AuthError(
@ -462,6 +493,7 @@ impl Client {
)) ))
} }
} }
#[instrument]
async fn get_device_code(&self) -> Result<DeviceAuthResponse, ClientError> { async fn get_device_code(&self) -> Result<DeviceAuthResponse, ClientError> {
let req = DeviceAuthRequest { let req = DeviceAuthRequest {
client_id: self.settings.oauth.client_id.clone(), client_id: self.settings.oauth.client_id.clone(),
@ -487,6 +519,7 @@ impl Client {
Ok(code) Ok(code)
} }
#[instrument]
pub async fn check_auth_status( pub async fn check_auth_status(
&self, &self,
device_code: &str, device_code: &str,
@ -533,7 +566,7 @@ mod tests {
fn setup() -> Client { fn setup() -> Client {
let settings = crate::config::Settings::default(); let settings = crate::config::Settings::default();
Client::new(settings).unwrap() Client::new(settings).expect("could not create tidaldy client")
} }
#[tokio::test] #[tokio::test]