Compare commits

..

20 Commits

Author SHA1 Message Date
chmanie a1987869c2 Styling fixes
CI checks / stable / fmt (push) Has been cancelled Details
2023-10-08 23:48:49 +02:00
Hans Mündelein d6406d1e09
Add users artists album support
CI checks / stable / fmt (push) Successful in 4s Details
CI release / stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) (push) Successful in 14m15s Details
CI release / stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) (push) Successful in 20m49s Details
CI release / stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) (push) Successful in 22m2s Details
2023-06-17 13:16:19 +02:00
Hans Mündelein 24e968ac08
Start playing when adding to an empty queue
CI checks / stable / fmt (push) Successful in 3s Details
CI release / stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) (push) Successful in 5m52s Details
CI release / stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) (push) Successful in 7m41s Details
CI release / stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) (push) Successful in 7m40s Details
2023-06-14 11:49:27 +02:00
Hans Mündelein a4303b9b70
Change tidal track duration to milliseconds
CI checks / stable / fmt (push) Successful in 3s Details
2023-06-14 11:31:15 +02:00
Hans Mündelein efc36e88f8
Don't error on failing broadcast channel send
CI checks / stable / fmt (push) Successful in 3s Details
2023-06-14 11:21:33 +02:00
Hans Mündelein 6db11131c0
Fix incorrect offset when turning off shuffle
CI checks / stable / fmt (push) Successful in 4s Details
2023-06-14 11:15:42 +02:00
Hans Mündelein f8a77ee6ed
Stop playing if last track is removed
CI checks / stable / fmt (push) Successful in 3s Details
2023-06-14 10:59:05 +02:00
Hans Mündelein 97ebb44ca5
Fix panic on next on empty queue
CI checks / stable / fmt (push) Successful in 3s Details
2023-06-14 10:51:29 +02:00
Hans Mündelein dfd2e0af92
Fix incorrect position update on queue and insert
CI checks / stable / fmt (push) Successful in 4s Details
2023-06-14 10:44:26 +02:00
Hans Mündelein 5c50544523
Unnwrap server code
CI checks / stable / fmt (push) Successful in 4s Details
2023-06-14 09:57:26 +02:00
chmanie 73cc79d776 Clean up a bit
CI checks / stable / fmt (push) Successful in 4s Details
2023-06-13 10:12:26 +02:00
chmanie a7c2fe391b Try tag filtering again
CI release / stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) (push) Successful in 4m52s Details
CI release / stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) (push) Successful in 6m7s Details
CI release / stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) (push) Successful in 7m5s Details
2023-06-13 10:00:16 +02:00
chmanie 2f89886e5d Show album and release date
stable / fmt Details
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
2023-06-13 00:59:54 +02:00
chmanie 4b34fa7233 Add key command to select currently playing track
stable / fmt Details
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
2023-06-13 00:17:56 +02:00
Hans Mündelein 902c0b903f
Replace some unwraps with more error logging
stable / fmt Details
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
2023-06-12 22:34:39 +02:00
Hans Mündelein 5d1a62c630
Add exclude current clearing for the server
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
stable / fmt Details
2023-06-12 22:18:34 +02:00
chmanie 02f47d682b Implement clear queue in tui
stable / fmt Details
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
2023-06-12 22:07:41 +02:00
chmanie 6ac13a710c Fail release workflow if not in tags
stable / fmt Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
2023-06-12 21:48:09 +02:00
Hans Mündelein 18671683ff
Add clear queue to server
stable / fmt Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
2023-06-12 20:49:37 +02:00
chmanie e71398d243 Add proto definition for ClearQueue
stable / cross-${{ matrix.target }} (aarch64-unknown-linux-gnu) Details
stable / fmt Details
stable / cross-${{ matrix.target }} (armv7-unknown-linux-gnueabihf) Details
stable / cross-${{ matrix.target }} (x86_64-unknown-linux-gnu) Details
2023-06-12 20:23:22 +02:00
18 changed files with 870 additions and 288 deletions

View File

@ -3,14 +3,14 @@ run-name: CI release
on:
push:
tags:
- '*'
- 'v*'
jobs:
release:
runs-on: rust-cross
name: stable / cross-${{ matrix.target }}
strategy:
fail-fast: false
fail-fast: true
matrix:
target: ["armv7-unknown-linux-gnueabihf", "aarch64-unknown-linux-gnu", "x86_64-unknown-linux-gnu"]
steps:

Binary file not shown.

View File

@ -222,6 +222,7 @@ impl PlayerEngine {
SEEK_TO.store(time.as_secs(), Ordering::SeqCst);
// FIXME: ideally we would like to return once the seeking is successful
// then return the current elapsed time
// Cond-var might be needed to sleep this (seeking takes time)
Ok(time)
}

View File

@ -1,5 +1,6 @@
pub use ratatui::widgets::ListState;
// FIXME: Move marking stuff here, to be able to use it in queue as well
pub trait StatefulList {
fn get_size(&self) -> usize;
fn select(&mut self, idx: Option<usize>);

View File

@ -65,6 +65,7 @@ pub enum MessageFromUi {
InsertTracks(Vec<String>, usize),
RemoveTracks(Vec<usize>),
ReplaceQueue(Vec<String>),
ClearQueue(bool),
NextTrack,
PrevTrack,
RestartTrack,

View File

@ -45,10 +45,21 @@ impl NowPlaying {
}
pub fn update_track(&mut self, active: Option<Track>) {
if let Some(track) = &active {
let body = if let Some(ref album) = track.album {
format!(
"{} by {}\n\n{} ({})",
track.title,
track.artist,
album.title,
// FIXME: get out year and format differently if it's missing
album.release_date()
)
} else {
format!("{} by {}", track.title, track.artist,)
};
Notification::new()
.summary("Crabidy playing")
// FIXME: album
.body(&format!("{} by {}", track.title, track.artist))
.summary("Now playing")
.body(&body)
.show()
.unwrap();
}

View File

@ -41,6 +41,9 @@ impl Queue {
self.tx.send(MessageFromUi::SetCurrentTrack(pos));
}
}
pub fn select_current(&mut self) {
self.select(Some(self.current_position));
}
pub fn remove_track(&mut self) {
if let Some(pos) = self.selected() {
// FIXME: mark multiple tracks on queue and remove them

View File

@ -1,9 +1,4 @@
use crabidy_core::{
clap::{self},
clap_serde_derive,
serde::Serialize,
ClapSerde,
};
use crabidy_core::{clap, clap_serde_derive, serde::Serialize, ClapSerde};
#[derive(ClapSerde, Serialize, Debug)]
#[clap(author, version, about)]

View File

@ -123,6 +123,9 @@ async fn poll(
MessageFromUi::ToggleRepeat => {
rpc_client.toggle_repeat().await?
}
MessageFromUi::ClearQueue(exclude_current) => {
rpc_client.clear_queue(exclude_current).await?
}
}
}
Some(resp) = rpc_client.update_stream.next() => {
@ -300,12 +303,21 @@ fn run_ui(tx: Sender<MessageFromUi>, rx: Receiver<MessageToUi>) {
(UiFocus::Queue, KeyModifiers::CONTROL, KeyCode::Char('u')) => {
app.queue.up();
}
(UiFocus::Queue, KeyModifiers::NONE, KeyCode::Char('o')) => {
app.queue.select_current();
}
(UiFocus::Queue, KeyModifiers::NONE, KeyCode::Enter) => {
app.queue.play_selected();
}
(UiFocus::Queue, KeyModifiers::NONE, KeyCode::Char('d')) => {
app.queue.remove_track();
}
(UiFocus::Queue, KeyModifiers::NONE, KeyCode::Char('c')) => {
tx.send(MessageFromUi::ClearQueue(true));
}
(UiFocus::Queue, KeyModifiers::SHIFT, KeyCode::Char('C')) => {
tx.send(MessageFromUi::ClearQueue(false));
}
_ => {}
}
}

View File

@ -1,7 +1,7 @@
use crabidy_core::proto::crabidy::{
crabidy_service_client::CrabidyServiceClient, AppendRequest, ChangeVolumeRequest,
GetLibraryNodeRequest, GetUpdateStreamRequest, GetUpdateStreamResponse, InitRequest,
InitResponse, InsertRequest, LibraryNode, NextRequest, PrevRequest, QueueRequest,
ClearQueueRequest, GetLibraryNodeRequest, GetUpdateStreamRequest, GetUpdateStreamResponse,
InitRequest, InitResponse, InsertRequest, LibraryNode, NextRequest, PrevRequest, QueueRequest,
RemoveRequest, ReplaceRequest, RestartTrackRequest, SetCurrentRequest, ToggleMuteRequest,
TogglePlayRequest, ToggleRepeatRequest, ToggleShuffleRequest,
};
@ -128,6 +128,12 @@ impl RpcClient {
Ok(())
}
pub async fn clear_queue(&mut self, exclude_current: bool) -> Result<(), Box<dyn Error>> {
let clear_queue_request = Request::new(ClearQueueRequest { exclude_current });
self.client.clear_queue(clear_queue_request).await?;
Ok(())
}
pub async fn replace_queue(&mut self, uuids: Vec<String>) -> Result<(), Box<dyn Error>> {
let replace_request = Request::new(ReplaceRequest { uuids });
self.client.replace(replace_request).await?;

View File

@ -14,6 +14,7 @@ service CrabidyService {
rpc Append(AppendRequest) returns (AppendResponse);
rpc Remove(RemoveRequest) returns (RemoveResponse);
rpc Insert(InsertRequest) returns (InsertResponse);
rpc ClearQueue(ClearQueueRequest) returns (ClearQueueResponse);
rpc SetCurrent(SetCurrentRequest) returns (SetCurrentResponse);
rpc ToggleShuffle(ToggleShuffleRequest) returns (ToggleShuffleResponse);
rpc ToggleRepeat(ToggleRepeatRequest) returns (ToggleRepeatResponse);
@ -93,6 +94,11 @@ message SaveQueueRequest {
}
message SaveQueueResponse {}
message ClearQueueRequest {
bool exclude_current = 1;
}
message ClearQueueResponse {}
// Stream
message GetUpdateStreamRequest {}
message GetUpdateStreamResponse {

View File

@ -1,9 +1,9 @@
use crabidy_core::proto::crabidy::{Queue, Track};
use rand::{seq::SliceRandom, thread_rng};
use std::time::SystemTime;
use tracing::debug;
use tracing::{debug, error};
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct QueueManager {
created_at: SystemTime,
current_offset: usize,
@ -16,7 +16,11 @@ pub struct QueueManager {
impl From<QueueManager> for Queue {
fn from(queue_manager: QueueManager) -> Self {
Self {
timestamp: queue_manager.created_at.elapsed().unwrap().as_secs(),
timestamp: queue_manager
.created_at
.elapsed()
.expect("failed to get elapsed time")
.as_secs(),
current_position: queue_manager.current_position() as u32,
tracks: queue_manager.tracks,
}
@ -42,6 +46,10 @@ impl QueueManager {
}
}
pub fn is_last_track(&self) -> bool {
self.current_position() == self.tracks.len() - 1
}
pub fn shuffle_on(&mut self) {
self.shuffle = true;
self.shuffle_before(self.current_offset);
@ -50,6 +58,8 @@ impl QueueManager {
pub fn shuffle_off(&mut self) {
self.shuffle = false;
let pos = self.current_position();
self.current_offset = pos;
self.play_order = (0..self.tracks.len()).collect();
}
@ -75,6 +85,9 @@ impl QueueManager {
pub fn next_track(&mut self) -> Option<Track> {
let len = self.tracks.len();
if len == 0 {
return None;
};
if self.current_offset < len - 1 {
self.current_offset += 1;
let current_pos = self.current_position();
@ -111,11 +124,15 @@ impl QueueManager {
if self.shuffle {
self.shuffle_all();
}
let current_offset = self
let Some(current_offset) = self
.play_order
.iter()
.position(|&i| i == current_position as usize)
.unwrap();
else {
error!("invalid current position");
error!("queue: {:#?}", self);
return false
};
if self.shuffle {
self.play_order.swap(0, current_offset);
self.current_offset = 0;
@ -142,36 +159,19 @@ impl QueueManager {
}
}
pub fn append_tracks(&mut self, tracks: &[Track]) {
pub fn append_tracks(&mut self, tracks: &[Track]) -> Option<Track> {
let len = self.tracks.len();
let is_empty = len == 0;
let order_additions: Vec<usize> = (len..len + tracks.len()).collect();
self.play_order.extend(order_additions);
self.tracks.extend(tracks.iter().cloned());
if self.shuffle {
self.shuffle_behind(self.current_offset);
}
}
pub fn queue_tracks(&mut self, tracks: &[Track]) {
let len = self.tracks.len();
if len == 0 {
self.replace_with_tracks(tracks);
return;
}
let pos = self.current_position();
let order_additions: Vec<usize> = (len..len + tracks.len()).collect();
self.play_order.extend(order_additions);
let tail: Vec<Track> = self
.tracks
.splice((self.current_position() + 1).., tracks.to_vec())
.collect();
self.tracks.extend(tail);
self.play_order
.iter_mut()
.filter(|i| (pos as usize) < **i)
.for_each(|i| *i += len);
if self.shuffle {
self.shuffle_behind(self.current_offset);
if is_empty {
self.current_track()
} else {
None
}
}
@ -184,11 +184,15 @@ impl QueueManager {
if *pos == self.current_position() as u32 {
play_next = true;
}
let offset = self
let Some(offset) = self
.play_order
.iter()
.position(|&i| i == *pos as usize)
.unwrap();
else {
error!("invalid current position");
error!("queue: {:#?}", self);
return None
};
if offset < self.current_offset {
self.current_offset -= 1;
}
@ -206,8 +210,11 @@ impl QueueManager {
}
}
pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) {
pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) -> Option<Track> {
let len = self.tracks.len();
if len == 0 {
return self.replace_with_tracks(tracks);
}
let order_additions: Vec<usize> = (len..len + tracks.len()).collect();
self.play_order.extend(order_additions);
let tail: Vec<Track> = self
@ -215,9 +222,57 @@ impl QueueManager {
.splice((position as usize + 1).., tracks.to_vec())
.collect();
self.tracks.extend(tail);
let mut changed: Vec<usize> = Vec::new();
// in shuffle mode, it might be that we played already postions which are behind
// the insertion point and which postions are shifted by the lenght of the inserted
// track
for i in self
.play_order
.iter_mut()
.take(self.current_offset)
.filter(|i| (position as usize) < **i)
{
*i += len;
changed.push(*i);
}
if !self.shuffle {
// if we don't shuffle, there should be no positions alredy played behind the
// current track
assert!(changed.is_empty());
}
// the newly inserted indices need to replaced with the ones that we already handled
self.play_order
.iter_mut()
.skip(self.current_offset)
.for_each(|i| {
if changed.contains(i) {
*i -= len;
}
});
if self.shuffle {
self.shuffle_behind(self.current_offset);
}
None
}
pub fn queue_tracks(&mut self, tracks: &[Track]) -> Option<Track> {
let pos = self.current_position();
self.insert_tracks(pos as u32, tracks)
}
pub fn clear(&mut self, exclude_current: bool) -> bool {
let current_track = self.current_track();
self.current_offset = 0;
self.tracks.clear();
if exclude_current {
if let Some(track) = current_track {
self.tracks.push(track);
}
}
!exclude_current
}
}

View File

@ -3,7 +3,7 @@ use crabidy_core::proto::crabidy::{
crabidy_service_server::CrabidyServiceServer, InitResponse, LibraryNode, PlayState, Track,
};
use crabidy_core::{ProviderClient, ProviderError};
use tracing::{debug_span, info, instrument, warn, Span};
use tracing::{debug_span, error, info, instrument, level_filters, warn, Span};
use tracing_subscriber::{filter::Targets, prelude::*};
mod playback;
@ -22,8 +22,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stderr());
let targets_filter =
Targets::new().with_target("crabidy_server", tracing::level_filters::LevelFilter::DEBUG);
let targets_filter = Targets::new()
.with_target("crabidy_server", tracing::level_filters::LevelFilter::DEBUG)
.with_target("tidaldy", level_filters::LevelFilter::DEBUG);
let subscriber = tracing_subscriber::fmt::layer()
.with_writer(non_blocking)
.with_file(true)
@ -39,7 +40,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("audio player started initialized");
let (update_tx, _) = tokio::sync::broadcast::channel(2048);
let orchestrator = ProviderOrchestrator::init("").await.unwrap();
let orchestrator = ProviderOrchestrator::init("")
.await
.expect("failed to init orchestrator");
let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone());
@ -76,44 +79,51 @@ fn poll_play_bus(rx: flume::Receiver<PlayerMessage>, tx: flume::Sender<PlaybackM
let span = debug_span!("play-chan");
match msg {
PlayerMessage::EndOfStream => {
tx.send(PlaybackMessage::Next { span }).unwrap();
if let Err(err) = tx.send(PlaybackMessage::Next { span }) {
error!("failed to send next message: {}", err);
}
}
PlayerMessage::Stopped => {
tx.send(PlaybackMessage::StateChanged {
if let Err(err) = tx.send(PlaybackMessage::StateChanged {
state: PlayState::Stopped,
span,
})
.unwrap();
}) {
error!("failed to send stopped message: {}", err);
}
}
PlayerMessage::Paused => {
tx.send(PlaybackMessage::StateChanged {
if let Err(err) = tx.send(PlaybackMessage::StateChanged {
state: PlayState::Paused,
span,
})
.unwrap();
}) {
error!("failed to send paused message: {}", err);
}
}
PlayerMessage::Playing => {
tx.send(PlaybackMessage::StateChanged {
if let Err(err) = tx.send(PlaybackMessage::StateChanged {
state: PlayState::Playing,
span,
})
.unwrap();
}) {
error!("failed to send playing message: {}", err);
}
}
PlayerMessage::Elapsed { duration, elapsed } => {
tx.send(PlaybackMessage::PostitionChanged {
if let Err(err) = tx.send(PlaybackMessage::PostitionChanged {
duration: duration.as_millis() as u32,
position: elapsed.as_millis() as u32,
span,
})
.unwrap();
}) {
error!("failed to send elapsed message: {}", err);
}
}
PlayerMessage::Duration { duration } => {
tx.send(PlaybackMessage::PostitionChanged {
if let Err(err) = tx.send(PlaybackMessage::PostitionChanged {
duration: duration.as_millis() as u32,
position: 0,
span,
})
.unwrap();
}) {
error!("failed to send duration message: {}", err);
}
}
}
}
@ -170,6 +180,10 @@ pub enum PlaybackMessage {
uuids: Vec<String>,
span: Span,
},
Clear {
exclude_current: bool,
span: Span,
},
SetCurrent {
position: u32,
span: Span,

View File

@ -51,7 +51,10 @@ impl Playback {
let repeat;
let shuffle;
let response = {
let queue = self.queue.lock().unwrap();
let Ok(queue) = self.queue.lock() else {
error!("failed to get queue lock");
continue;
};
debug!("got queue lock");
repeat = queue.repeat;
shuffle = queue.shuffle;
@ -69,13 +72,11 @@ impl Playback {
trace!("position {:?}", position);
let play_state = {
debug!("getting play state lock");
match *self.state.lock().unwrap() {
PlayState::Playing => PlayState::Playing,
PlayState::Paused => PlayState::Paused,
PlayState::Stopped => PlayState::Stopped,
PlayState::Loading => PlayState::Loading,
_ => PlayState::Unspecified,
}
let Ok(play_state) = self.state.lock() else {
error!("failed to get play state lock");
continue;
};
*play_state
};
trace!("play_state {:?}", play_state);
debug!("released play state lock");
@ -90,7 +91,9 @@ impl Playback {
}
};
trace!("response {:?}", response);
result_tx.send(response).unwrap();
if let Err(err) = result_tx.send(response) {
error!("failed to send response: {:#?}", err);
}
}
PlaybackMessage::Replace { uuids, span } => {
let _e = span.enter();
@ -108,12 +111,17 @@ impl Playback {
}
trace!("got tracks {:?}", all_tracks);
let current = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.replace_with_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap();
if let Err(err) = queue_update_tx.send(update) {
trace!("{:?}", err)
};
queue.current_track()
};
debug!("got current {:?}", current);
@ -135,17 +143,22 @@ impl Playback {
}
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
let track = {
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
queue.queue_tracks(&all_tracks);
continue;
};
debug!("got queue lock");
let track = queue.queue_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
}
trace!("{:?}", err)
}
track
};
debug!("que lock released");
self.play(track).in_current_span().await;
}
PlaybackMessage::Append { uuids, span } => {
@ -163,37 +176,61 @@ impl Playback {
}
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
let track = {
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
queue.append_tracks(&all_tracks);
continue;
};
debug!("got queue lock");
let track = queue.append_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
}
trace!("{:?}", err)
}
track
};
debug!("queue lock released");
self.play(track).in_current_span().await;
}
PlaybackMessage::Remove { positions, span } => {
let _e = span.enter();
let is_last;
debug!("removing");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
is_last = queue.is_last_track();
let track = queue.remove_tracks(&positions);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap();
if let Err(err) = queue_update_tx.send(update) {
trace!("{:?}", err)
};
track
};
debug!("queue lock released");
let state = *self.state.lock().unwrap();
let state = {
let Ok(state) = self.state.lock() else {
error!("failed to get play state lock");
continue;
};
*state
};
if state == PlayState::Playing {
if is_last {
if let Err(err) = self.player.stop().in_current_span().await {
error!("{:?}", err)
}
} else {
self.play(track).in_current_span().await;
}
}
}
PlaybackMessage::Insert {
position,
@ -214,15 +251,50 @@ impl Playback {
}
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
let track = {
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
queue.insert_tracks(position, &all_tracks);
continue;
};
debug!("got queue lock");
let track = queue.insert_tracks(position, &all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
queue_update_tx.send(update).unwrap();
}
if let Err(err) = queue_update_tx.send(update) {
trace!("{:?}", err)
};
track
};
debug!("queue lock released");
self.play(track).in_current_span().await;
}
PlaybackMessage::Clear {
exclude_current,
span,
} => {
let _e = span.enter();
debug!("clearing");
let should_stop = {
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
let should_stop = queue.clear(exclude_current);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone().into());
if let Err(err) = queue_update_tx.send(update) {
trace!("{:?}", err)
};
should_stop
};
debug!("queue lock released");
if should_stop {
if let Err(err) = self.player.stop().in_current_span().await {
error!("{:?}", err)
}
}
}
PlaybackMessage::SetCurrent {
@ -232,7 +304,10 @@ impl Playback {
let _e = span.enter();
debug!("setting current");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.set_current_position(queue_position);
queue.current_track()
@ -247,7 +322,10 @@ impl Playback {
let shuffle;
let repeat;
{
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
repeat = queue.repeat;
if queue.shuffle {
@ -261,7 +339,7 @@ impl Playback {
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Mods(QueueModifiers { shuffle, repeat });
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
}
}
@ -271,7 +349,10 @@ impl Playback {
let shuffle;
let repeat;
{
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
shuffle = queue.shuffle;
if queue.repeat {
@ -285,7 +366,7 @@ impl Playback {
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Mods(QueueModifiers { shuffle, repeat });
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
}
}
@ -293,7 +374,13 @@ impl Playback {
let _e = span.enter();
debug!("toggling play");
{
let state = *self.state.lock().unwrap();
let state = {
let Ok(state) = self.state.lock() else {
debug!("got state lock");
continue;
};
*state
};
debug!("got state lock");
if state == PlayState::Playing {
if let Err(err) = self.player.pause().await {
@ -337,7 +424,10 @@ impl Playback {
let _e = span.enter();
debug!("nexting");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.next_track()
};
@ -350,7 +440,10 @@ impl Playback {
let _e = span.enter();
debug!("preving");
let track = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
debug!("got queue lock");
queue.prev_track()
};
@ -363,14 +456,18 @@ impl Playback {
debug!("state changed");
let play_state = {
*self.state.lock().unwrap() = state;
let Ok(mut state_lock) = self.state.lock() else {
debug!("got state lock");
continue;
};
*state_lock = state;
state
};
debug!("released state lock and got play state {:?}", play_state);
let active_track_tx = self.update_tx.clone();
let update = StreamUpdate::PlayState(play_state as i32);
if let Err(err) = active_track_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
};
}
@ -388,7 +485,7 @@ impl Playback {
let update_tx = self.update_tx.clone();
let update = StreamUpdate::Volume(volume);
if let Err(err) = update_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
}
}
@ -398,7 +495,7 @@ impl Playback {
let update_tx = self.update_tx.clone();
let update = StreamUpdate::Mute(muted);
if let Err(err) = update_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
}
}
@ -412,7 +509,7 @@ impl Playback {
let update_tx = self.update_tx.clone();
let update = StreamUpdate::Position(TrackPosition { duration, position });
if let Err(err) = update_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
}
}
}
@ -422,6 +519,7 @@ impl Playback {
#[instrument(skip(self))]
async fn flatten_node(&self, uuid: &str) -> Vec<Track> {
debug!("flattening node");
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let span = debug_span!("prov-chan");
@ -464,6 +562,7 @@ impl Playback {
#[instrument(skip(self))]
async fn get_urls_for_track(&self, uuid: &str) -> Result<Vec<String>, ProviderError> {
debug!("getting urls for track");
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let span = tracing::trace_span!("prov-chan");
@ -484,6 +583,7 @@ impl Playback {
#[instrument(skip(self))]
async fn play_or_stop(&self, track: Option<Track>) {
debug!("play or stop");
if let Some(track) = track {
let mut uuid = track.uuid.clone();
let urls = loop {
@ -492,7 +592,10 @@ impl Playback {
Err(err) => {
warn!("no urls found for track {:?}: {}", track.uuid, err);
uuid = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("got queue lock");
continue;
};
if let Some(track) = queue.next_track() {
track.uuid.clone()
} else {
@ -503,7 +606,10 @@ impl Playback {
}
};
{
let queue = self.queue.lock().unwrap();
let Ok(queue) = self.queue.lock() else {
error!("poisend queue lock");
return
};
let queue_update_tx = self.update_tx.clone();
let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack {
@ -511,7 +617,7 @@ impl Playback {
track,
});
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
}
}
if let Err(err) = self.player.play(&urls[0]).await {
@ -524,6 +630,7 @@ impl Playback {
#[instrument(skip(self))]
async fn play(&self, track: Option<Track>) {
debug!("play");
if let Some(track) = track {
let mut uuid = track.uuid.clone();
let urls = loop {
@ -532,7 +639,10 @@ impl Playback {
Err(err) => {
warn!("no urls found for track {:?}: {}", track.uuid, err);
uuid = {
let mut queue = self.queue.lock().unwrap();
let Ok(mut queue) = self.queue.lock() else {
debug!("poisend queue lock");
return;
};
if let Some(track) = queue.next_track() {
track.uuid.clone()
} else {
@ -543,7 +653,10 @@ impl Playback {
}
};
{
let queue = self.queue.lock().unwrap();
let Ok(queue) = self.queue.lock() else {
error!("poisend queue lock");
return
};
let queue_update_tx = self.update_tx.clone();
let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack {
@ -551,7 +664,7 @@ impl Playback {
track,
});
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
trace!("{:?}", err)
}
}
if let Err(err) = self.player.play(&urls[0]).await {

View File

@ -28,11 +28,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.get_lib_node(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
ProviderMessage::GetTrack {
uuid,
@ -41,11 +39,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.get_metadata_for_track(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
ProviderMessage::GetTrackUrls {
uuid,
@ -54,11 +50,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.get_urls_for_track(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
ProviderMessage::FlattenNode {
uuid,
@ -67,11 +61,9 @@ impl ProviderOrchestrator {
} => {
let _e = span.enter();
let result = self.flatten_node(&uuid).in_current_span().await;
result_tx
.send_async(result)
.in_current_span()
.await
.unwrap();
if let Err(err) = result_tx.send_async(result).in_current_span().await {
error!("failed to send result: {}", err);
}
}
}
}
@ -118,7 +110,7 @@ impl ProviderClient for ProviderOrchestrator {
tidaldy::Client::init(&raw_toml_settings)
.in_current_span()
.await
.unwrap(),
.expect("Failed to init Tidal clienta"),
);
let new_toml_config = tidal_client.settings();
if let Err(err) = tokio::fs::write(&config_file, new_toml_config)
@ -140,6 +132,7 @@ impl ProviderClient for ProviderOrchestrator {
}
#[instrument(skip(self))]
async fn get_urls_for_track(&self, track_uuid: &str) -> Result<Vec<String>, ProviderError> {
debug!("get_urls_for_track");
self.tidal_client
.get_urls_for_track(track_uuid)
.in_current_span()
@ -155,6 +148,7 @@ impl ProviderClient for ProviderOrchestrator {
}
#[instrument(skip(self))]
fn get_lib_root(&self) -> LibraryNode {
debug!("get_lib_root in provider manager");
let mut root_node = LibraryNode::new();
let child = LibraryNodeChild::new("node:tidal".to_owned(), "tidal".to_owned(), false);
root_node.children.push(child);
@ -162,13 +156,16 @@ impl ProviderClient for ProviderOrchestrator {
}
#[instrument(skip(self))]
async fn get_lib_node(&self, uuid: &str) -> Result<LibraryNode, ProviderError> {
debug!("get_lib_node");
debug!("get_lib_node in provider manager");
if uuid == "node:/" {
debug!("get global root");
return Ok(self.get_lib_root());
}
if uuid == "node:tidal" {
debug!("get tidal root");
return Ok(self.tidal_client.get_lib_root());
}
debug!("tidal node");
self.tidal_client.get_lib_node(uuid).in_current_span().await
}
}

View File

@ -1,13 +1,13 @@
use crate::{PlaybackMessage, ProviderMessage};
use crabidy_core::proto::crabidy::{
crabidy_service_server::CrabidyService, get_update_stream_response::Update as StreamUpdate,
AppendRequest, AppendResponse, ChangeVolumeRequest, ChangeVolumeResponse,
GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest, GetUpdateStreamResponse,
InitRequest, InitResponse, InsertRequest, InsertResponse, NextRequest, NextResponse,
PrevRequest, PrevResponse, QueueRequest, QueueResponse, RemoveRequest, RemoveResponse,
ReplaceRequest, ReplaceResponse, RestartTrackRequest, RestartTrackResponse, SaveQueueRequest,
SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse,
ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse,
AppendRequest, AppendResponse, ChangeVolumeRequest, ChangeVolumeResponse, ClearQueueRequest,
ClearQueueResponse, GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest,
GetUpdateStreamResponse, InitRequest, InitResponse, InsertRequest, InsertResponse, NextRequest,
NextResponse, PrevRequest, PrevResponse, QueueRequest, QueueResponse, RemoveRequest,
RemoveResponse, ReplaceRequest, ReplaceResponse, RestartTrackRequest, RestartTrackResponse,
SaveQueueRequest, SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest,
StopResponse, ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse,
ToggleRepeatRequest, ToggleRepeatResponse, ToggleShuffleRequest, ToggleShuffleResponse,
};
use futures::TryStreamExt;
@ -91,10 +91,16 @@ impl CrabidyService for RpcService {
.recv_async()
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to receive response from provider channel"))?;
.map_err(|e| {
error!("{:?}", e);
Status::internal("Failed to receive response from provider channel")
})?;
match result {
Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })),
Err(err) => Err(Status::internal(err.to_string())),
Err(err) => {
error!("{:?}", err);
Err(Status::internal(err.to_string()))
}
}
}
@ -201,6 +207,28 @@ impl CrabidyService for RpcService {
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(exclude_current))]
async fn clear_queue(
&self,
request: tonic::Request<ClearQueueRequest>,
) -> std::result::Result<tonic::Response<ClearQueueResponse>, tonic::Status> {
let exclude_current = request.into_inner().exclude_current;
Span::current().record("exclude_current", exclude_current);
debug!("Received clear_queue request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Clear {
exclude_current,
span,
})
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = ClearQueueResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(position))]
async fn set_current(
&self,
@ -228,11 +256,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_shuffle request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ToggleShuffle { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ToggleShuffleResponse {};
Ok(Response::new(reply))
}
@ -245,11 +275,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_repeat request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ToggleRepeat { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ToggleRepeatResponse {};
Ok(Response::new(reply))
}
@ -278,6 +310,7 @@ impl CrabidyService for RpcService {
Ok(Response::new(Box::pin(output_stream)))
}
#[instrument(skip(self, _request))]
async fn save_queue(
&self,
@ -297,11 +330,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_play request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::TogglePlay { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = TogglePlayResponse {};
Ok(Response::new(reply))
}
@ -314,11 +349,13 @@ impl CrabidyService for RpcService {
debug!("Received stop request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::Stop { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = StopResponse {};
Ok(Response::new(reply))
}
@ -333,11 +370,13 @@ impl CrabidyService for RpcService {
debug!("Received change_volume request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ChangeVolume { delta, span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ChangeVolumeResponse {};
Ok(Response::new(reply))
}
@ -350,11 +389,13 @@ impl CrabidyService for RpcService {
debug!("Received toggle_mute request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::ToggleMute { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = ToggleMuteResponse {};
Ok(Response::new(reply))
}
@ -367,11 +408,13 @@ impl CrabidyService for RpcService {
debug!("Received next request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::Next { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = NextResponse {};
Ok(Response::new(reply))
}
@ -384,11 +427,13 @@ impl CrabidyService for RpcService {
debug!("Received prev request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::Prev { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = PrevResponse {};
Ok(Response::new(reply))
}
@ -401,11 +446,13 @@ impl CrabidyService for RpcService {
debug!("Received restart_track request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
if let Err(err) = playback_tx
.send_async(PlaybackMessage::RestartTrack { span })
.in_current_span()
.await
.unwrap();
{
error!("Failed to send request via channel: {}", err);
}
let reply = RestartTrackResponse {};
Ok(Response::new(reply))
}

View File

@ -1,9 +1,11 @@
use std::fmt::format;
/// Lots of stuff and especially the auth handling is shamelessly copied from
/// https://github.com/MinisculeGirraffe/tdl
use reqwest::Client as HttpClient;
use serde::de::DeserializeOwned;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, instrument};
use tracing::{debug, error, info, instrument};
pub mod config;
pub mod models;
use async_trait::async_trait;
@ -75,13 +77,20 @@ impl crabidy_core::ProviderClient for Client {
#[instrument(skip(self))]
fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode {
debug!("get_lib_root");
debug!("get_lib_root in tidaldy");
let global_root = crabidy_core::proto::crabidy::LibraryNode::new();
let children = vec![crabidy_core::proto::crabidy::LibraryNodeChild::new(
let children = vec![
crabidy_core::proto::crabidy::LibraryNodeChild::new(
"node:userplaylists".to_string(),
"playlists".to_string(),
false,
)];
),
crabidy_core::proto::crabidy::LibraryNodeChild::new(
"node:userartists".to_string(),
"artists".to_string(),
false,
),
];
crabidy_core::proto::crabidy::LibraryNode {
uuid: "node:tidal".to_string(),
title: "tidal".to_string(),
@ -100,8 +109,9 @@ impl crabidy_core::ProviderClient for Client {
let Some(user_id) = self.settings.login.user_id.clone() else {
return Err(crabidy_core::ProviderError::UnknownUser)
};
debug!("get_lib_node {}", uuid);
debug!("get_lib_node in tidaldy{}", uuid);
let (_kind, module, uuid) = split_uuid(uuid);
error!("module:{},uuid: {}", module, uuid);
let node = match module.as_str() {
"userplaylists" => {
let mut node = crabidy_core::proto::crabidy::LibraryNode {
@ -138,6 +148,53 @@ impl crabidy_core::ProviderClient for Client {
node.parent = Some("node:userplaylists".to_string());
node
}
"userartists" => {
let mut node = crabidy_core::proto::crabidy::LibraryNode {
uuid: "node:userartists".to_string(),
title: "artists".to_string(),
parent: Some("node:tidal".to_string()),
tracks: Vec::new(),
children: Vec::new(),
is_queable: false,
};
for artist in self.get_users_artists(&user_id).await? {
let child = crabidy_core::proto::crabidy::LibraryNodeChild::new(
format!("node:artist:{}", artist.item.id),
artist.item.name,
true,
);
node.children.push(child);
}
node
}
"artist" => {
info!("artist");
let mut node: crabidy_core::proto::crabidy::LibraryNode =
self.get_artist(&uuid).await?.into();
let children: Vec<crabidy_core::proto::crabidy::LibraryNodeChild> = self
.get_artist_albums(&uuid)
.await?
.iter()
.map(|t| t.into())
.collect();
node.children = children;
node.parent = Some("node:userartists".to_string());
node
}
"album" => {
let album = self.get_album(&uuid).await?;
let artis_id = album.artist.clone().unwrap().id;
let mut node: crabidy_core::proto::crabidy::LibraryNode = album.into();
let tracks: Vec<crabidy_core::proto::crabidy::Track> = self
.get_album_tracks(&uuid)
.await?
.iter()
.map(|t| t.into())
.collect();
node.tracks = tracks;
node.parent = Some(format!("node:artist:{}", artis_id));
node
}
_ => return Err(crabidy_core::ProviderError::MalformedUuid),
};
Ok(node)
@ -166,17 +223,18 @@ impl Client {
})
}
#[instrument]
#[instrument(skip(self))]
pub fn get_user_id(&self) -> Option<String> {
self.settings.login.user_id.clone()
}
#[instrument]
#[instrument(skip(self))]
pub async fn make_request<T: DeserializeOwned>(
&self,
uri: &str,
query: Option<&[(&str, String)]>,
) -> Result<T, ClientError> {
debug!("make_request {}", uri);
let Some(ref access_token) = self.settings.login.access_token.clone() else {
return Err(ClientError::AuthError(
"No access token found".to_string(),
@ -199,18 +257,27 @@ impl Client {
.bearer_auth(access_token)
.query(&params)
.send()
.await?
.await
.map_err(|e| {
error!("{:?}", e);
e
})?
.json()
.await?;
.await
.map_err(|e| {
error!("{:?}", e);
e
})?;
Ok(response)
}
#[instrument]
#[instrument(skip(self))]
pub async fn make_paginated_request<T: DeserializeOwned>(
&self,
uri: &str,
query: Option<&[(&str, String)]>,
) -> Result<Vec<T>, ClientError> {
debug!("make_paginated_request {}", uri);
let Some(ref access_token) = self.settings.login.access_token.clone() else {
return Err(ClientError::AuthError(
"No access token found".to_string(),
@ -236,9 +303,17 @@ impl Client {
.bearer_auth(access_token)
.query(&params)
.send()
.await?
.await
.map_err(|e| {
error!("{:?}", e);
e
})?
.json()
.await?;
.await
.map_err(|e| {
error!("{:?}", e);
e
})?;
let mut items = Vec::with_capacity(response.total_number_of_items);
items.extend(response.items);
while response.offset + limit < response.total_number_of_items {
@ -255,15 +330,23 @@ impl Client {
.bearer_auth(access_token)
.query(&params)
.send()
.await?
.await
.map_err(|e| {
error!("{:?}", e);
e
})?
.json()
.await?;
.await
.map_err(|e| {
error!("{:?}", e);
e
})?;
items.extend(response.items);
}
Ok(items)
}
#[instrument]
#[instrument(skip(self))]
pub async fn make_explorer_request(
&self,
uri: &str,
@ -291,14 +374,22 @@ impl Client {
.bearer_auth(access_token)
.query(&params)
.send()
.await?
.await
.map_err(|e| {
error!("{:?}", e);
e
})?
.text()
.await?;
.await
.map_err(|e| {
error!("{:?}", e);
e
})?;
println!("{:?}", response);
Ok(())
}
#[instrument]
#[instrument(skip(self))]
pub async fn search(&self, query: &str) -> Result<(), ClientError> {
let query = vec![("query", query.to_string())];
self.make_explorer_request(&format!("search/artists"), Some(&query))
@ -306,7 +397,7 @@ impl Client {
Ok(())
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_playlist_tracks(
&self,
playlist_uuid: &str,
@ -316,21 +407,35 @@ impl Client {
.await?)
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_playlist(&self, playlist_uuid: &str) -> Result<Playlist, ClientError> {
Ok(self
.make_request(&format!("playlists/{}", playlist_uuid), None)
.await?)
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_artist(&self, artist_uuid: &str) -> Result<Artist, ClientError> {
Ok(self
.make_request(&format!("artists/{}", artist_uuid), None)
.await?)
}
#[instrument(skip(self))]
pub async fn get_artist_albums(&self, artist_uuid: &str) -> Result<Vec<Album>, ClientError> {
Ok(self
.make_paginated_request(&format!("artists/{}/albums", artist_uuid), None)
.await?)
}
#[instrument(skip(self))]
pub async fn get_users_playlists(&self, user_id: u64) -> Result<Vec<Playlist>, ClientError> {
Ok(self
.make_paginated_request(&format!("users/{}/playlists", user_id), None)
.await?)
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_users_playlists_and_favorite_playlists(
&self,
user_id: &str,
@ -343,25 +448,7 @@ impl Client {
.await?)
}
#[instrument]
pub async fn explore_get_users_playlists_and_favorite_playlists(
&self,
user_id: u64,
) -> Result<(), ClientError> {
let limit = 50;
let offset = 0;
let limit_param = ("limit", limit.to_string());
let offset_param = ("offset", offset.to_string());
let params: Vec<(&str, String)> = vec![limit_param, offset_param];
self.make_explorer_request(
&format!("users/{}/playlistsAndFavoritePlaylists", user_id),
Some(&params[..]),
)
.await?;
Ok(())
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_users_favorites(&self, user_id: u64) -> Result<(), ClientError> {
self.make_explorer_request(
&format!("users/{}/favorites", user_id),
@ -372,7 +459,18 @@ impl Client {
Ok(())
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_users_artists(&self, user_id: &str) -> Result<Vec<ArtistItem>, ClientError> {
Ok(self
.make_paginated_request(
&format!("users/{}/favorites/artists", user_id),
None,
// Some(&query),
)
.await?)
}
#[instrument(skip(self))]
pub async fn get_user(&self, user_id: u64) -> Result<(), ClientError> {
self.make_explorer_request(
&format!("users/{}", user_id),
@ -383,7 +481,19 @@ impl Client {
Ok(())
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_album(&self, album_id: &str) -> Result<Album, ClientError> {
self.make_request(&format!("albums/{}/", album_id), None)
.await
}
#[instrument(skip(self))]
pub async fn get_album_tracks(&self, album_id: &str) -> Result<Vec<Track>, ClientError> {
self.make_paginated_request(&format!("albums/{}/tracks", album_id), None)
.await
}
#[instrument(skip(self))]
pub async fn get_track_playback(&self, track_id: &str) -> Result<TrackPlayback, ClientError> {
let query = vec![
("audioquality", "LOSSLESS".to_string()),
@ -397,14 +507,14 @@ impl Client {
.await
}
#[instrument]
#[instrument(skip(self))]
pub async fn get_track(&self, track_id: &str) -> Result<Track, ClientError> {
let (_, track_id, _) = split_uuid(track_id);
self.make_request(&format!("tracks/{}", track_id), None)
.await
}
#[instrument]
#[instrument(skip(self))]
pub async fn login_web(&mut self) -> Result<(), ClientError> {
let code_response = self.get_device_code().await?;
let now = Instant::now();
@ -443,7 +553,11 @@ impl Client {
.get(format!("{}/sessions", self.settings.base_url))
.bearer_auth(access_token)
.send()
.await?
.await
.map_err(|e| {
error!("{:?}", e);
e
})?
.status()
.is_success()
{
@ -459,7 +573,7 @@ impl Client {
Ok(())
}
#[instrument]
#[instrument(skip(self))]
pub async fn refresh_access_token(&self) -> Result<RefreshResponse, ClientError> {
let Some(refresh_token) = self.settings.login.refresh_token.clone() else {
return Err(ClientError::AuthError(
@ -485,7 +599,11 @@ impl Client {
)
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await?;
.await
.map_err(|e| {
error!("{:?}", e);
e
})?;
if req.status().is_success() {
let res = req.json::<RefreshResponse>().await?;
Ok(res)
@ -495,7 +613,7 @@ impl Client {
))
}
}
#[instrument]
#[instrument(skip(self))]
async fn get_device_code(&self) -> Result<DeviceAuthResponse, ClientError> {
let req = DeviceAuthRequest {
client_id: self.settings.oauth.client_id.clone(),
@ -512,7 +630,11 @@ impl Client {
.header("Content-Type", "application/x-www-form-urlencoded")
.body(payload)
.send()
.await?;
.await
.map_err(|e| {
error!("{:?}", e);
e
})?;
if !res.status().is_success() {
return Err(ClientError::AuthError(res.status().to_string()));
@ -521,7 +643,7 @@ impl Client {
Ok(code)
}
#[instrument]
#[instrument(skip(self))]
pub async fn check_auth_status(
&self,
device_code: &str,
@ -544,7 +666,11 @@ impl Client {
.body(payload)
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await?;
.await
.map_err(|e| {
error!("{:?}", e);
e
})?;
if !res.status().is_success() {
if res.status().is_client_error() {
return Err(ClientError::AuthError(format!(
@ -564,25 +690,26 @@ impl Client {
#[cfg(test)]
mod tests {
use crabidy_core::ProviderClient;
use super::*;
fn setup() -> Client {
let settings = crate::config::Settings::default();
Client::new(settings).expect("could not create tidaldy client")
async fn setup() -> Client {
let raw_toml_settings =
std::fs::read_to_string("/home/hans/.config/crabidy/tidaldy.toml").unwrap();
Client::init(&raw_toml_settings).await.unwrap()
}
#[tokio::test]
async fn test_get_device_code() {
let client = setup();
println!("{:#?}", client);
let response = client.get_device_code().await.unwrap();
assert!(!response.device_code.is_empty());
assert_eq!(response.device_code.len(), 36);
assert!(!response.user_code.is_empty());
assert_eq!(response.user_code.len(), 5);
assert!(!response.verification_uri.is_empty());
assert!(!response.verification_uri_complete.is_empty());
assert!(response.expires_in == 300);
assert!(response.interval != 0);
async fn test() {
let client = setup().await;
let user = client.settings.login.user_id.clone().unwrap();
let result = client.get_users_artists(&user).await.unwrap();
println!("{:?}", result);
let result = client.get_artist("5293333").await.unwrap();
println!("{:?}", result);
let result = client.get_album("244167550").await.unwrap();
println!("{:?}", result);
assert!(false);
}
}

View File

@ -1,5 +1,6 @@
use std::{str::FromStr, string::FromUtf8Error};
use crabidy_core::proto::crabidy::{LibraryNode, LibraryNodeChild};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
@ -15,15 +16,58 @@ pub struct Page<T> {
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Item {
pub struct ArtistItem {
pub created: String,
pub item: Artist,
}
impl From<ArtistItem> for LibraryNode {
fn from(item: ArtistItem) -> Self {
Self {
uuid: format!("artist:{}", item.item.id),
title: item.item.name,
children: Vec::new(),
parent: None,
tracks: Vec::new(),
is_queable: true,
}
}
}
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Artist {
pub id: i64,
pub name: String,
pub artist_types: Vec<String>,
pub url: String,
pub picture: Value,
pub popularity: i64,
pub artist_roles: Vec<ArtistRole>,
pub mixes: Mixes,
pub artist_types: Option<Vec<String>>,
pub url: Option<String>,
pub picture: Option<Value>,
pub popularity: Option<i64>,
pub artist_roles: Option<Vec<ArtistRole>>,
pub mixes: Option<ArtistMixes>,
}
impl From<Artist> for LibraryNode {
fn from(artist: Artist) -> Self {
Self {
uuid: format!("node:artist:{}", artist.id),
title: artist.name,
children: Vec::new(),
parent: None,
tracks: Vec::new(),
is_queable: true,
}
}
}
impl From<Artist> for LibraryNodeChild {
fn from(artist: Artist) -> Self {
Self {
uuid: format!("node:artist:{}", artist.id),
title: artist.name,
is_queable: true,
}
}
}
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
@ -126,43 +170,76 @@ impl TrackPlayback {
}
}
// #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
// #[serde(rename_all = "camelCase")]
// pub struct Track {
// pub id: u64,
// pub title: String,
// pub duration: u64,
// pub replay_gain: f64,
// pub peak: f64,
// pub allow_streaming: bool,
// pub stream_ready: bool,
// pub stream_start_date: Option<String>,
// pub premium_streaming_only: bool,
// pub track_number: u64,
// pub volume_number: u64,
// pub version: Value,
// pub popularity: u64,
// pub copyright: Option<String>,
// pub url: Option<String>,
// pub isrc: Option<String>,
// pub editable: bool,
// pub explicit: bool,
// pub audio_quality: String,
// pub audio_modes: Vec<String>,
// pub artist: Artist,
// pub artists: Vec<Artist>,
// pub album: Album,
// pub mixes: TrackMixes,
// }
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Track {
pub id: u64,
pub id: i64,
pub title: String,
pub duration: u64,
pub replay_gain: f64,
pub peak: f64,
pub allow_streaming: bool,
pub stream_ready: bool,
pub duration: Option<i64>,
pub replay_gain: Option<f64>,
pub peak: Option<f64>,
pub allow_streaming: Option<bool>,
pub stream_ready: Option<bool>,
pub ad_supported_stream_ready: Option<bool>,
pub stream_start_date: Option<String>,
pub premium_streaming_only: bool,
pub track_number: u64,
pub volume_number: u64,
pub version: Value,
pub popularity: u64,
pub premium_streaming_only: Option<bool>,
pub track_number: Option<i64>,
pub volume_number: Option<i64>,
pub version: Option<Value>,
pub popularity: Option<i64>,
pub copyright: Option<String>,
pub url: Option<String>,
pub isrc: Option<String>,
pub editable: bool,
pub explicit: bool,
pub audio_quality: String,
pub audio_modes: Vec<String>,
pub artist: Artist,
pub artists: Vec<Artist>,
pub album: Album,
pub mixes: Mixes,
pub editable: Option<bool>,
pub explicit: Option<bool>,
pub audio_quality: Option<String>,
pub audio_modes: Option<Vec<String>>,
pub media_metadata: Option<MediaMetadata>,
pub artist: Option<Artist>,
pub artists: Option<Vec<Artist>>,
pub album: Option<Album>,
pub mixes: Option<TrackMixes>,
}
impl From<Track> for crabidy_core::proto::crabidy::Track {
fn from(track: Track) -> Self {
Self {
uuid: format!("track:{}", track.id),
title: track.title,
artist: track.artist.name,
album: Some(track.album.into()),
duration: Some(track.duration as u32),
artist: match track.artist {
Some(a) => a.name.clone(),
None => "".to_string(),
},
album: track.album.map(|a| a.into()),
duration: track.duration.map(|d| d as u32 * 1000),
}
}
}
@ -172,42 +249,143 @@ impl From<&Track> for crabidy_core::proto::crabidy::Track {
Self {
uuid: format!("track:{}", track.id),
title: track.title.clone(),
artist: track.artist.name.clone(),
album: Some(track.album.clone().into()),
duration: Some(track.duration as u32),
artist: match track.artist.as_ref() {
Some(a) => a.name.clone(),
None => "".to_string(),
},
album: track.album.clone().map(|a| a.into()),
duration: track.duration.map(|d| d as u32 * 1000),
}
}
}
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Artist {
pub id: i64,
pub name: String,
#[serde(rename = "type")]
pub type_field: String,
pub picture: Value,
}
// #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
// #[serde(rename_all = "camelCase")]
// pub struct Artist {
// pub id: i64,
// pub name: String,
// #[serde(rename = "type")]
// pub type_field: String,
// pub picture: Value,
// }
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Artist2 {
pub id: i64,
pub name: String,
#[serde(rename = "type")]
pub type_field: String,
pub picture: Value,
}
// #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
// #[serde(rename_all = "camelCase")]
// pub struct Artist2 {
// pub id: i64,
// pub name: String,
// #[serde(rename = "type")]
// pub type_field: String,
// pub picture: Value,
// }
// #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
// #[serde(rename_all = "camelCase")]
// pub struct Album {
// pub id: i64,
// pub title: String,
// pub cover: String,
// pub vibrant_color: String,
// pub video_cover: Value,
// pub release_date: Option<String>,
// }
//
// #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
// #[serde(rename_all = "camelCase")]
// pub struct Root {
// pub id: i64,
// pub title: String,
// pub duration: i64,
// pub stream_ready: bool,
// pub ad_supported_stream_ready: bool,
// pub stream_start_date: String,
// pub allow_streaming: bool,
// pub premium_streaming_only: bool,
// pub number_of_tracks: i64,
// pub number_of_videos: i64,
// pub number_of_volumes: i64,
// pub release_date: String,
// pub copyright: String,
// #[serde(rename = "type")]
// pub type_field: String,
// pub version: Value,
// pub url: String,
// pub cover: String,
// pub vibrant_color: String,
// pub video_cover: Value,
// pub explicit: bool,
// pub upc: String,
// pub popularity: i64,
// pub audio_quality: String,
// pub audio_modes: Vec<String>,
// pub media_metadata: MediaMetadata,
// pub artist: Artist,
// pub artists: Vec<Artist2>,
// }
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Album {
pub id: i64,
pub title: String,
pub cover: String,
pub vibrant_color: String,
pub video_cover: Value,
pub cover: Option<String>,
pub vibrant_color: Option<String>,
pub release_date: Option<String>,
pub duration: Option<i64>,
pub stream_ready: Option<bool>,
pub ad_supported_stream_ready: Option<bool>,
pub stream_start_date: Option<String>,
pub allow_streaming: Option<bool>,
pub premium_streaming_only: Option<bool>,
pub number_of_tracks: Option<i64>,
pub number_of_videos: Option<i64>,
pub number_of_volumes: Option<i64>,
pub copyright: Option<String>,
#[serde(rename = "type")]
pub type_field: Option<String>,
pub version: Option<Value>,
pub url: Option<String>,
pub video_cover: Option<Value>,
pub explicit: Option<bool>,
pub upc: Option<String>,
pub popularity: Option<i64>,
pub audio_quality: Option<String>,
pub audio_modes: Option<Vec<String>>,
pub media_metadata: Option<MediaMetadata>,
pub artist: Option<Artist>,
pub artists: Option<Vec<Artist>>,
}
impl From<Album> for crabidy_core::proto::crabidy::LibraryNode {
fn from(album: Album) -> Self {
Self {
uuid: format!("node:album:{}", album.id),
title: album.title,
children: Vec::new(),
parent: None,
tracks: Vec::new(),
is_queable: true,
}
}
}
impl From<Album> for crabidy_core::proto::crabidy::LibraryNodeChild {
fn from(album: Album) -> Self {
Self {
uuid: format!("node:album:{}", album.id),
title: album.title,
is_queable: true,
}
}
}
impl From<&Album> for crabidy_core::proto::crabidy::LibraryNodeChild {
fn from(album: &Album) -> Self {
Self {
uuid: format!("node:album:{}", album.id),
title: album.title.clone(),
is_queable: true,
}
}
}
impl From<Album> for crabidy_core::proto::crabidy::Album {
@ -221,11 +399,26 @@ impl From<Album> for crabidy_core::proto::crabidy::Album {
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Mixes {
pub struct MediaMetadata {
pub tags: Vec<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TrackMixes {
#[serde(rename = "TRACK_MIX")]
pub track_mix: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArtistMixes {
#[serde(rename = "MASTER_ARTIST_MIX")]
pub master_artist_mix: Option<String>,
#[serde(rename = "ARTIST_MIX")]
pub artist_mix: Option<String>,
}
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all(deserialize = "camelCase"))]
pub struct PlaybackManifest {
@ -359,7 +552,7 @@ pub struct PlaylistTrack {
pub artist: Artist,
pub artists: Vec<Artist>,
pub album: Album,
pub mixes: Mixes,
pub mixes: TrackMixes,
pub date_added: String,
pub index: i64,
pub item_uuid: String,