Add draft of working tidal playback

Streaming yet to come.
This commit is contained in:
Hans Mündelein 2023-05-26 12:06:35 +02:00
parent acf34c2722
commit 035fdf1a4b
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
4 changed files with 685 additions and 82 deletions

View File

@ -175,14 +175,16 @@ message GetTrackUpdatesRequest {
repeated string type_blacklist = 3; repeated string type_blacklist = 3;
} }
message GetActiveTrackResponse { message ActiveTrack {
optional Track track = 1; optional Track track = 1;
TrackPlayState play_state = 2; TrackPlayState play_state = 2;
uint32 completion = 3; uint32 completion = 3;
} }
message GetTrackUpdatesResponse { message GetActiveTrackResponse {
optional Track track = 1; ActiveTrack active_track = 1;
TrackPlayState play_state = 2; }
uint32 completion = 3;
message GetTrackUpdatesResponse {
ActiveTrack active_track = 1;
} }

View File

@ -1,7 +1,7 @@
pub mod proto; pub mod proto;
use async_trait::async_trait; use async_trait::async_trait;
use proto::crabidy::{LibraryNode, LibraryNodeState}; use proto::crabidy::{LibraryNode, LibraryNodeState, Queue, Track};
#[async_trait] #[async_trait]
pub trait ProviderClient: std::fmt::Debug + Send + Sync { pub trait ProviderClient: std::fmt::Debug + Send + Sync {
@ -10,8 +10,9 @@ pub trait ProviderClient: std::fmt::Debug + Send + Sync {
Self: Sized; Self: Sized;
fn settings(&self) -> String; fn settings(&self) -> String;
async fn get_urls_for_track(&self, track_uuid: &str) -> Result<Vec<String>, ProviderError>; async fn get_urls_for_track(&self, track_uuid: &str) -> Result<Vec<String>, ProviderError>;
fn get_library_root(&self) -> LibraryNode; async fn get_metadata_for_track(&self, track_uuid: &str) -> Result<Track, ProviderError>;
async fn get_library_node(&self, list_uuid: &str) -> Result<LibraryNode, ProviderError>; fn get_lib_root(&self) -> LibraryNode;
async fn get_lib_node(&self, list_uuid: &str) -> Result<LibraryNode, ProviderError>;
} }
#[derive(Clone, Debug, Hash)] #[derive(Clone, Debug, Hash)]
@ -20,9 +21,16 @@ pub enum ProviderError {
CouldNotLogin, CouldNotLogin,
FetchError, FetchError,
MalformedUuid, MalformedUuid,
InternalError,
Other, Other,
} }
impl std::fmt::Display for ProviderError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl LibraryNode { impl LibraryNode {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -36,3 +44,60 @@ impl LibraryNode {
} }
} }
} }
pub enum QueueError {
NotQueable,
}
impl Queue {
pub fn current(&mut self) -> Option<Track> {
if self.current < self.tracks.len() as u32 {
Some(self.tracks[self.current as usize].clone())
} else {
None
}
}
pub fn next(&mut self) -> Option<Track> {
if self.current < self.tracks.len() as u32 {
self.current += 1;
Some(self.tracks[self.current as usize].clone())
} else {
None
}
}
pub fn set_current(&mut self, current: u32) -> bool {
if current < self.tracks.len() as u32 {
self.current = current;
true
} else {
false
}
}
pub fn replace_with_tracks(&mut self, tracks: &[Track]) {
self.current = 0;
self.tracks = tracks.to_vec();
}
pub fn append_tracks(&mut self, tracks: &[Track]) {
self.tracks.extend(tracks.iter().cloned());
}
pub fn queue_tracks(&mut self, tracks: &[Track]) {
let tail: Vec<Track> = self
.tracks
.splice((self.current as usize).., tracks.to_vec())
.collect();
self.tracks.extend(tail);
}
pub fn remove_tracks(&mut self, positions: &[u32]) {
for pos in positions {
if *pos < self.tracks.len() as u32 {
self.tracks.remove(*pos as usize);
}
}
}
}

View File

@ -1,33 +1,79 @@
use anyhow::Error;
use async_trait::async_trait; use async_trait::async_trait;
use crabidy_core::proto::crabidy::{ use crabidy_core::proto::crabidy::{
crabidy_service_server::{CrabidyService, CrabidyServiceServer}, crabidy_service_server::{CrabidyService, CrabidyServiceServer},
get_queue_updates_response::QueueUpdateResult, get_queue_updates_response::QueueUpdateResult,
AppendNodeRequest, AppendNodeResponse, AppendTrackRequest, AppendTrackResponse, ActiveTrack, AppendNodeRequest, AppendNodeResponse, AppendTrackRequest, AppendTrackResponse,
GetActiveTrackRequest, GetActiveTrackResponse, GetLibraryNodeRequest, GetLibraryNodeResponse, GetActiveTrackRequest, GetActiveTrackResponse, GetLibraryNodeRequest, GetLibraryNodeResponse,
GetQueueRequest, GetQueueResponse, GetQueueUpdatesRequest, GetQueueUpdatesResponse, GetQueueRequest, GetQueueResponse, GetQueueUpdatesRequest, GetQueueUpdatesResponse,
GetTrackRequest, GetTrackResponse, GetTrackUpdatesRequest, GetTrackUpdatesResponse, GetTrackRequest, GetTrackResponse, GetTrackUpdatesRequest, GetTrackUpdatesResponse,
LibraryNode, LibraryNodeState, Queue, QueueLibraryNodeRequest, QueueLibraryNodeResponse, LibraryNode, Queue, QueueLibraryNodeRequest, QueueLibraryNodeResponse, QueuePositionChange,
QueuePositionChange, QueueTrackRequest, QueueTrackResponse, RemoveTracksRequest, QueueTrackRequest, QueueTrackResponse, RemoveTracksRequest, RemoveTracksResponse,
RemoveTracksResponse, ReplaceWithNodeRequest, ReplaceWithNodeResponse, ReplaceWithTrackRequest, ReplaceWithNodeRequest, ReplaceWithNodeResponse, ReplaceWithTrackRequest,
ReplaceWithTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentTrackRequest, ReplaceWithTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentTrackRequest,
SetCurrentTrackResponse, StopRequest, StopResponse, TogglePlayRequest, TogglePlayResponse, SetCurrentTrackResponse, StopRequest, StopResponse, TogglePlayRequest, TogglePlayResponse,
TrackPlayState, Track,
}; };
use crabidy_core::{ProviderClient, ProviderError}; use crabidy_core::{ProviderClient, ProviderError};
use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer}; use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer};
// use once_cell::sync::OnceCell;
use std::{collections::HashMap, fs, pin::Pin, sync::RwLock};
use tonic::{codegen::futures_core::Stream, transport::Server, Request, Response, Result, Status};
// static CHANNEL: OnceCell<flume::Sender<Input>> = OnceCell::new(); use std::{
// static ORCHESTRATOR_CHANNEL: OnceCell<flume::Sender<OrchestratorMessage>> = OnceCell::new(); fs,
sync::{Arc, Mutex},
};
use tonic::{transport::Server, Request, Response, Result, Status};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let orchestrator = ClientOrchestrator::init("").await.unwrap(); let (queue_update_tx, queue_update_rx) = flume::bounded(10);
let tx = orchestrator.run(); let (active_track_tx, active_track_rx) = flume::bounded(1000);
let crabidy_service = AppState::new(tx); let orchestrator = ProviderOrchestrator::init("").await.unwrap();
let playback = Playback::new(
active_track_tx.clone(),
queue_update_tx.clone(),
orchestrator.provider_tx.clone(),
);
let bus = playback.play.message_bus();
let playback_tx = playback.playback_tx.clone();
bus.set_sync_handler(move |_, msg| {
match PlayMessage::parse(msg) {
Ok(PlayMessage::EndOfStream) => {
println!("End of stream");
playback_tx.send(PlaybackMessage::Next).unwrap();
println!("Next messages was sent");
}
Ok(PlayMessage::StateChanged { state }) => {
println!("State changed: {:?}", state);
playback_tx
.send(PlaybackMessage::StateChanged { state })
.unwrap();
}
Ok(PlayMessage::PositionUpdated { position }) => {
// println!("Position updated: {:?}", position);
}
Ok(PlayMessage::Buffering { percent }) => {
// println!("Position updated: {:?}", position);
}
Ok(PlayMessage::VolumeChanged { volume }) => {}
Ok(PlayMessage::MuteChanged { muted }) => {
println!("Mute changed to muted: {:?}", muted);
}
Ok(PlayMessage::MediaInfoUpdated { info }) => {}
_ => println!("Unknown message: {:?}", msg),
}
gstreamer::BusSyncReply::Drop
});
let crabidy_service = RpcService::new(
queue_update_rx,
active_track_rx,
playback.playback_tx.clone(),
orchestrator.provider_tx.clone(),
);
orchestrator.run();
playback.run();
let addr = "[::1]:50051".parse()?; let addr = "[::1]:50051".parse()?;
Server::builder() Server::builder()
@ -38,59 +84,99 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }
enum OrchestratorMessage { #[derive(Debug)]
enum ProviderMessage {
GetNode { GetNode {
uuid: String, uuid: String,
callback: flume::Sender<LibraryNode>, result_tx: flume::Sender<Result<LibraryNode, ProviderError>>,
}, },
GetTracksPlaybackUrls { GetTrack {
uuid: String, uuid: String,
callback: flume::Sender<Vec<String>>, result_tx: flume::Sender<Result<Track, ProviderError>>,
},
GetTrackUrls {
uuid: String,
result_tx: flume::Sender<Result<Vec<String>, ProviderError>>,
},
FlattenNode {
uuid: String,
result_tx: flume::Sender<Vec<Track>>,
}, },
} }
#[derive(Debug)] #[derive(Debug)]
struct ClientOrchestrator { struct ProviderOrchestrator {
rx: flume::Receiver<OrchestratorMessage>, provider_tx: flume::Sender<ProviderMessage>,
tx: flume::Sender<OrchestratorMessage>, provider_rx: flume::Receiver<ProviderMessage>,
tidal_client: tidaldy::Client, // known_tracks: RwLock<HashMap<String, Track>>,
// known_nodes: RwLock<HashMap<String, LibraryNode>>,
tidal_client: Arc<tidaldy::Client>,
} }
impl ClientOrchestrator { impl ProviderOrchestrator {
fn run(self) -> flume::Sender<OrchestratorMessage> { fn run(self) {
let tx = self.tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(msg) = self.rx.recv_async().await { while let Ok(msg) = self.provider_rx.recv_async().await {
println!("Orchestrator {:?}", msg);
match msg { match msg {
OrchestratorMessage::GetNode { uuid, callback } => { ProviderMessage::GetNode { uuid, result_tx } => {
let node = match uuid.as_str() { let result = self.get_lib_node(&uuid).await;
"/" => self.get_library_root(), result_tx.send(result).unwrap();
_ => self.get_library_node(&uuid).await.unwrap(),
};
callback.send_async(node).await;
} }
OrchestratorMessage::GetTracksPlaybackUrls { uuid, callback } => { ProviderMessage::GetTrack { uuid, result_tx } => {
let urls = self.get_urls_for_track(&uuid).await.unwrap(); let result = self.get_metadata_for_track(&uuid).await;
callback.send_async(urls).await; result_tx.send(result).unwrap();
}
ProviderMessage::GetTrackUrls { uuid, result_tx } => {
let result = self.get_urls_for_track(&uuid).await;
result_tx.send(result).unwrap();
}
ProviderMessage::FlattenNode { uuid, result_tx } => {
let result = self.flatten_node(&uuid).await;
result_tx.send(result).unwrap();
} }
} }
} }
}); });
tx }
async fn flatten_node(&self, node_uuid: &str) -> Vec<Track> {
let mut tracks = Vec::with_capacity(1000);
let mut nodes_to_go = Vec::with_capacity(100);
nodes_to_go.push(node_uuid.to_string());
while let Some(node_uuid) = nodes_to_go.pop() {
let Ok(node) = self.get_lib_node(&node_uuid).await else {
continue
};
tracks.extend(node.tracks);
nodes_to_go.extend(node.children);
}
tracks
} }
} }
#[async_trait] #[async_trait]
impl ProviderClient for ClientOrchestrator { impl ProviderClient for ProviderOrchestrator {
async fn init(_s: &str) -> Result<Self, ProviderError> { async fn init(_s: &str) -> Result<Self, ProviderError> {
gstreamer::init().unwrap();
let play = Play::new(None::<PlayVideoRenderer>);
let state = Mutex::new(PlayState::Stopped);
let queue = Mutex::new(Queue {
timestamp: 0,
current: 0,
tracks: Vec::new(),
});
let raw_toml_settings = fs::read_to_string("/tmp/tidaldy.toml").unwrap_or("".to_owned()); let raw_toml_settings = fs::read_to_string("/tmp/tidaldy.toml").unwrap_or("".to_owned());
let tidal_client = tidaldy::Client::init(&raw_toml_settings).await.unwrap(); let tidal_client = Arc::new(tidaldy::Client::init(&raw_toml_settings).await.unwrap());
let new_toml_config = tidal_client.settings(); let new_toml_config = tidal_client.settings();
fs::write("/tmp/tidaldy.toml", new_toml_config).unwrap(); fs::write("/tmp/tidaldy.toml", new_toml_config).unwrap();
let (tx, rx) = flume::unbounded(); // let known_tracks = HashMap::new();
// let known_nodes = HashMap::new();
let (provider_tx, provider_rx) = flume::bounded(100);
Ok(Self { Ok(Self {
rx, provider_rx,
tx, provider_tx,
// known_tracks,
// known_nodes,
tidal_client, tidal_client,
}) })
} }
@ -100,36 +186,348 @@ impl ProviderClient for ClientOrchestrator {
async fn get_urls_for_track(&self, track_uuid: &str) -> Result<Vec<String>, ProviderError> { async fn get_urls_for_track(&self, track_uuid: &str) -> Result<Vec<String>, ProviderError> {
self.tidal_client.get_urls_for_track(track_uuid).await self.tidal_client.get_urls_for_track(track_uuid).await
} }
fn get_library_root(&self) -> LibraryNode { async fn get_metadata_for_track(&self, track_uuid: &str) -> Result<Track, ProviderError> {
self.tidal_client.get_metadata_for_track(track_uuid).await
}
fn get_lib_root(&self) -> LibraryNode {
let mut root_node = LibraryNode::new(); let mut root_node = LibraryNode::new();
root_node.children.push("tidal".to_owned()); root_node.children.push("tidal".to_owned());
root_node root_node
} }
async fn get_library_node(&self, uuid: &str) -> Result<LibraryNode, ProviderError> { async fn get_lib_node(&self, uuid: &str) -> Result<LibraryNode, ProviderError> {
if uuid == "tidal" { if uuid == "tidal" {
return Ok(self.tidal_client.get_library_root()); return Ok(self.tidal_client.get_lib_root());
} }
self.tidal_client.get_library_node(uuid).await self.tidal_client.get_lib_node(uuid).await
} }
} }
#[derive(Debug)] #[derive(Debug)]
struct AppState { enum PlaybackMessage {
known_nodes: RwLock<HashMap<String, LibraryNode>>, ReplaceWithTrack {
orchestrator_tx: flume::Sender<OrchestratorMessage>, uuid: String,
},
ReplaceWithNode {
uuid: String,
},
QueueTrack {
uuid: String,
},
QueueNode {
uuid: String,
},
ClearQueue,
GetQueue {
result_tx: flume::Sender<Queue>,
},
AppendTrack {
uuid: String,
},
AppendNode {
uuid: String,
},
RemoveTracks {
positions: Vec<u32>,
},
SetCurrent {
position: u32,
},
GetCurrent {
result_tx: flume::Sender<ActiveTrack>,
},
Next,
PlayPause,
Stop,
StateChanged {
state: PlayState,
},
} }
impl AppState { #[derive(Debug)]
fn new(orchestrator_tx: flume::Sender<OrchestratorMessage>) -> Self { struct Playback {
active_track_tx: flume::Sender<ActiveTrack>,
queue_update_tx: flume::Sender<Queue>,
provider_tx: flume::Sender<ProviderMessage>,
playback_tx: flume::Sender<PlaybackMessage>,
playback_rx: flume::Receiver<PlaybackMessage>,
queue: Mutex<Queue>,
state: Mutex<PlayState>,
play: Play,
creation: std::time::Instant,
}
impl Playback {
fn new(
active_track_tx: flume::Sender<ActiveTrack>,
queue_update_tx: flume::Sender<Queue>,
provider_tx: flume::Sender<ProviderMessage>,
) -> Self {
let (playback_tx, playback_rx) = flume::bounded(10);
let queue = Mutex::new(Queue {
timestamp: 0,
current: 0,
tracks: Vec::new(),
});
let state = Mutex::new(PlayState::Stopped);
let play = Play::new(None::<PlayVideoRenderer>);
let creation = std::time::Instant::now();
Self { Self {
known_nodes: RwLock::new(HashMap::new()), active_track_tx,
orchestrator_tx, queue_update_tx,
provider_tx,
playback_tx,
playback_rx,
queue,
state,
play,
creation,
}
}
fn run(self) {
tokio::spawn(async move {
while let Ok(message) = self.playback_rx.recv_async().await {
println!("Playback{:?}", message);
match message {
PlaybackMessage::ReplaceWithTrack { uuid } => {
if let Ok(track) = self.get_track(&uuid).await {
{
let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&[track.clone()]);
}
self.play(track).await;
}
}
PlaybackMessage::ReplaceWithNode { uuid } => {
let tracks = self.flatten_node(&uuid).await;
{
let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&tracks);
}
if tracks.len() > 0 {
self.play(tracks[0].clone()).await;
}
}
PlaybackMessage::QueueTrack { uuid } => {
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&[track]);
}
}
PlaybackMessage::QueueNode { uuid } => {
let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&tracks);
}
PlaybackMessage::GetQueue { result_tx } => {
let queue = self.queue.lock().unwrap();
result_tx.send(queue.clone()).unwrap();
}
PlaybackMessage::AppendTrack { uuid } => {
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&[track]);
}
}
PlaybackMessage::AppendNode { uuid } => {
let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&tracks);
}
PlaybackMessage::ClearQueue => {
let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&vec![]);
self.stop_track()
}
//TODO handle deletion of current track
PlaybackMessage::RemoveTracks { positions } => {
let mut queue = self.queue.lock().unwrap();
queue.remove_tracks(&positions);
}
PlaybackMessage::SetCurrent { position } => {
let result = {
let mut queue = self.queue.lock().unwrap();
queue.set_current(position);
queue.current()
};
if let Some(track) = result {
self.play(track).await;
}
}
PlaybackMessage::GetCurrent { result_tx } => {
let current = self.get_active_track().await;
result_tx.send(current).unwrap();
}
PlaybackMessage::Next => {
let (result, stop) = {
let mut queue = self.queue.lock().unwrap();
let position = queue.current + 1;
let stop = !queue.set_current(position);
(queue.current(), stop)
};
if let Some(track) = result {
self.play(track).await;
}
if stop {
self.stop_track()
}
}
PlaybackMessage::PlayPause => {
let mut state = self.state.lock().unwrap();
if *state == PlayState::Playing {
self.play.pause();
*state = PlayState::Paused
} else {
self.play.play();
*state = PlayState::Playing
}
}
PlaybackMessage::Stop => {
self.play.stop();
*self.state.lock().unwrap() = PlayState::Stopped;
}
PlaybackMessage::StateChanged { state } => *self.state.lock().unwrap() = state,
}
}
});
}
async fn flatten_node(&self, uuid: &str) -> Vec<Track> {
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let Ok(_) = tx.send_async(ProviderMessage::FlattenNode {
uuid: uuid.to_string(),
result_tx,
}).await else {
return Vec::new();
};
let Ok(tracks) = result_rx
.recv_async()
.await else {
return Vec::new();
};
tracks
}
async fn get_track(&self, uuid: &str) -> Result<Track, ProviderError> {
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
tx.send_async(ProviderMessage::GetTrack {
uuid: uuid.to_string(),
result_tx,
})
.await
.map_err(|_| ProviderError::InternalError)?;
result_rx
.recv_async()
.await
.map_err(|_| ProviderError::InternalError)?
}
async fn get_urls_for_track(&self, uuid: &str) -> Result<Vec<String>, ProviderError> {
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
tx.send_async(ProviderMessage::GetTrackUrls {
uuid: uuid.to_string(),
result_tx,
})
.await
.map_err(|_| ProviderError::InternalError)?;
result_rx
.recv_async()
.await
.map_err(|_| ProviderError::InternalError)?
}
async fn get_active_track(&self) -> ActiveTrack {
let result = {
let mut queue = self.queue.lock().unwrap();
queue.current()
};
let completion = 0;
let gst_play_state = self.state.lock().unwrap();
let play_state = match *gst_play_state {
PlayState::Stopped => crabidy_core::proto::crabidy::TrackPlayState::Stopped,
PlayState::Buffering => crabidy_core::proto::crabidy::TrackPlayState::Loading,
PlayState::Playing => crabidy_core::proto::crabidy::TrackPlayState::Playing,
PlayState::Paused => crabidy_core::proto::crabidy::TrackPlayState::Paused,
_ => crabidy_core::proto::crabidy::TrackPlayState::Unspecified,
};
let play_state = play_state as i32;
ActiveTrack {
track: result,
completion,
play_state,
}
}
async fn play(&self, track: Track) {
let Ok(urls) = self.get_urls_for_track(&track.uuid).await else {
return
};
println!("Playing urls {:?}", urls);
{
let mut state_guard = self.state.lock().unwrap();
*state_guard = PlayState::Playing;
}
self.play.set_uri(Some(&urls[0]));
self.play.play();
}
fn stop_track(&self) {
println!("Stopping");
{
let mut state_guard = self.state.lock().unwrap();
*state_guard = PlayState::Stopped;
}
self.play.stop();
}
fn playpause(&self) {
let mut state_guard = self.state.lock().unwrap();
if *state_guard == PlayState::Playing {
println!("Pausing");
*state_guard = PlayState::Paused;
self.play.pause();
} else {
println!("Playing");
*state_guard = PlayState::Playing;
self.play.play()
}
}
}
#[derive(Debug)]
struct RpcService {
queue_update_rx: flume::Receiver<Queue>,
active_track_rx: flume::Receiver<ActiveTrack>,
playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>,
}
impl RpcService {
fn new(
queue_update_rx: flume::Receiver<Queue>,
active_track_rx: flume::Receiver<ActiveTrack>,
playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>,
) -> Self {
Self {
queue_update_rx,
active_track_rx,
playback_tx,
provider_tx,
} }
} }
} }
#[tonic::async_trait] #[tonic::async_trait]
impl CrabidyService for AppState { impl CrabidyService for RpcService {
type GetQueueUpdatesStream = type GetQueueUpdatesStream =
futures::stream::Iter<std::vec::IntoIter<Result<GetQueueUpdatesResponse, Status>>>; futures::stream::Iter<std::vec::IntoIter<Result<GetQueueUpdatesResponse, Status>>>;
type GetTrackUpdatesStream = type GetTrackUpdatesStream =
@ -140,87 +538,183 @@ impl CrabidyService for AppState {
request: Request<GetLibraryNodeRequest>, request: Request<GetLibraryNodeRequest>,
) -> Result<Response<GetLibraryNodeResponse>, Status> { ) -> Result<Response<GetLibraryNodeResponse>, Status> {
println!("Got a library node request: {:?}", request); println!("Got a library node request: {:?}", request);
let node_uuid = request.into_inner().uuid; let provider_tx = self.provider_tx.clone();
let (tx, rx) = flume::bounded(1); let (result_tx, result_rx) = flume::bounded(1);
self.orchestrator_tx
.send_async(OrchestratorMessage::GetNode { provider_tx
uuid: node_uuid, .send_async(ProviderMessage::GetNode {
callback: tx, uuid: request.into_inner().uuid,
result_tx,
}) })
.await .await
.unwrap(); .map_err(|_| Status::internal("Failed to send request via channel"))?;
let node = rx.recv_async().await.unwrap(); let result = result_rx
let resp = GetLibraryNodeResponse { node: Some(node) }; .recv_async()
Ok(Response::new(resp)) .await
.map_err(|_| Status::internal("Failed to receive response from provider channel"))?;
println!("Got a library node response: {:?}", result);
match result {
Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })),
Err(err) => Err(Status::internal(err.to_string())),
}
} }
async fn get_track( async fn get_track(
&self, &self,
request: Request<GetTrackRequest>, request: Request<GetTrackRequest>,
) -> Result<Response<GetTrackResponse>, Status> { ) -> Result<Response<GetTrackResponse>, Status> {
println!("Got a track request: {:?}", request); println!("Got a library track request: {:?}", request);
let provider_tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let req = request.into_inner(); provider_tx
.send_async(ProviderMessage::GetTrack {
let reply = GetTrackResponse { track: None }; uuid: request.into_inner().uuid,
Ok(Response::new(reply)) result_tx,
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let result = result_rx
.recv_async()
.await
.map_err(|_| Status::internal("Failed to receive response from provider channel"))?;
println!("Got a library node response: {:?}", result);
match result {
Ok(track) => Ok(Response::new(GetTrackResponse { track: Some(track) })),
Err(err) => Err(Status::internal(err.to_string())),
}
} }
async fn queue_track( async fn queue_track(
&self, &self,
request: tonic::Request<QueueTrackRequest>, request: tonic::Request<QueueTrackRequest>,
) -> std::result::Result<tonic::Response<QueueTrackResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<QueueTrackResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::QueueTrack {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = QueueTrackResponse {}; let reply = QueueTrackResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn queue_library_node( async fn queue_library_node(
&self, &self,
request: tonic::Request<QueueLibraryNodeRequest>, request: tonic::Request<QueueLibraryNodeRequest>,
) -> std::result::Result<tonic::Response<QueueLibraryNodeResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<QueueLibraryNodeResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::QueueNode {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = QueueLibraryNodeResponse {}; let reply = QueueLibraryNodeResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn replace_with_track( async fn replace_with_track(
&self, &self,
request: tonic::Request<ReplaceWithTrackRequest>, request: tonic::Request<ReplaceWithTrackRequest>,
) -> std::result::Result<tonic::Response<ReplaceWithTrackResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<ReplaceWithTrackResponse>, tonic::Status> {
println!("Got a replace with track request: {:?}", request);
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::ReplaceWithTrack {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = ReplaceWithTrackResponse {}; let reply = ReplaceWithTrackResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn replace_with_node( async fn replace_with_node(
&self, &self,
request: tonic::Request<ReplaceWithNodeRequest>, request: tonic::Request<ReplaceWithNodeRequest>,
) -> std::result::Result<tonic::Response<ReplaceWithNodeResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<ReplaceWithNodeResponse>, tonic::Status> {
println!("Got a replace with node request: {:?}", request);
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::ReplaceWithNode {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = ReplaceWithNodeResponse {}; let reply = ReplaceWithNodeResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn append_track( async fn append_track(
&self, &self,
request: tonic::Request<AppendTrackRequest>, request: tonic::Request<AppendTrackRequest>,
) -> std::result::Result<tonic::Response<AppendTrackResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<AppendTrackResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::AppendTrack {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = AppendTrackResponse {}; let reply = AppendTrackResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn append_node( async fn append_node(
&self, &self,
request: tonic::Request<AppendNodeRequest>, request: tonic::Request<AppendNodeRequest>,
) -> std::result::Result<tonic::Response<AppendNodeResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<AppendNodeResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::AppendNode {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = AppendNodeResponse {}; let reply = AppendNodeResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn remove_tracks( async fn remove_tracks(
&self, &self,
request: tonic::Request<RemoveTracksRequest>, request: tonic::Request<RemoveTracksRequest>,
) -> std::result::Result<tonic::Response<RemoveTracksResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<RemoveTracksResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::RemoveTracks {
positions: req.positions,
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = RemoveTracksResponse {}; let reply = RemoveTracksResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn set_current_track( async fn set_current_track(
&self, &self,
request: tonic::Request<SetCurrentTrackRequest>, request: tonic::Request<SetCurrentTrackRequest>,
) -> std::result::Result<tonic::Response<SetCurrentTrackResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<SetCurrentTrackResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::SetCurrent {
position: req.position,
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = SetCurrentTrackResponse {}; let reply = SetCurrentTrackResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn get_queue_updates( async fn get_queue_updates(
&self, &self,
request: tonic::Request<GetQueueUpdatesRequest>, request: tonic::Request<GetQueueUpdatesRequest>,
@ -242,6 +736,7 @@ impl CrabidyService for AppState {
let output_stream = futures::stream::iter(queue_vec.into_iter()); let output_stream = futures::stream::iter(queue_vec.into_iter());
Ok(Response::new(output_stream)) Ok(Response::new(output_stream))
} }
async fn get_queue( async fn get_queue(
&self, &self,
request: tonic::Request<GetQueueRequest>, request: tonic::Request<GetQueueRequest>,
@ -249,6 +744,7 @@ impl CrabidyService for AppState {
let reply = GetQueueResponse { queue: None }; let reply = GetQueueResponse { queue: None };
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn save_queue( async fn save_queue(
&self, &self,
request: tonic::Request<SaveQueueRequest>, request: tonic::Request<SaveQueueRequest>,
@ -256,6 +752,7 @@ impl CrabidyService for AppState {
let reply = SaveQueueResponse {}; let reply = SaveQueueResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
/// Playback /// Playback
async fn toggle_play( async fn toggle_play(
&self, &self,
@ -264,6 +761,7 @@ impl CrabidyService for AppState {
let reply = TogglePlayResponse {}; let reply = TogglePlayResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn stop( async fn stop(
&self, &self,
request: tonic::Request<StopRequest>, request: tonic::Request<StopRequest>,
@ -271,17 +769,20 @@ impl CrabidyService for AppState {
let reply = StopResponse {}; let reply = StopResponse {};
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn get_active_track( async fn get_active_track(
&self, &self,
request: tonic::Request<GetActiveTrackRequest>, request: tonic::Request<GetActiveTrackRequest>,
) -> std::result::Result<tonic::Response<GetActiveTrackResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<GetActiveTrackResponse>, tonic::Status> {
let reply = GetActiveTrackResponse { let reply = GetActiveTrackResponse {
track: None, active_track: None,
play_state: TrackPlayState::Stopped as i32, // track: None,
completion: 0, // play_state: TrackPlayState::Stopped as i32,
// completion: 0,
}; };
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn get_track_updates( async fn get_track_updates(
&self, &self,
request: tonic::Request<GetTrackUpdatesRequest>, request: tonic::Request<GetTrackUpdatesRequest>,
@ -292,6 +793,31 @@ impl CrabidyService for AppState {
} }
} }
// async fn listen_for_events(app: RpcService, rx: flume::Receiver<()>) {
// tokio::spawn(async move {
// println!("Listening for events");
// loop {
// println!("Waiting for next message");
// while let Ok(_) = rx.recv_async().await {
// println!("Received next message");
// let track_result = {
// let mut queue_guard = app.queue.lock().unwrap();
// println!("{:?}", queue_guard);
// queue_guard.next()
// };
// println!("{:?}", track_result);
// if let Some(track) = track_result {
// println!("Playing next track {:?}", track);
// app.play(Some(track)).await;
// } else {
// println!("Stopping at end of playlist");
// app.stop_track();
// }
// }
// }
// });
// }
// #[derive(Debug)] // #[derive(Debug)]
// enum Input { // enum Input {
// PlayTrack { // PlayTrack {

View File

@ -53,7 +53,17 @@ impl crabidy_core::ProviderClient for Client {
Ok(manifest.urls) Ok(manifest.urls)
} }
fn get_library_root(&self) -> crabidy_core::proto::crabidy::LibraryNode { async fn get_metadata_for_track(
&self,
track_uuid: &str,
) -> Result<crabidy_core::proto::crabidy::Track, crabidy_core::ProviderError> {
let Ok(track) = self.get_track(track_uuid).await else {
return Err(crabidy_core::ProviderError::FetchError)
};
Ok(track.into())
}
fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode {
let global_root = crabidy_core::proto::crabidy::LibraryNode::new(); let global_root = crabidy_core::proto::crabidy::LibraryNode::new();
let children = vec!["userplaylists".to_string()]; let children = vec!["userplaylists".to_string()];
crabidy_core::proto::crabidy::LibraryNode { crabidy_core::proto::crabidy::LibraryNode {
@ -67,7 +77,7 @@ impl crabidy_core::ProviderClient for Client {
} }
} }
async fn get_library_node( async fn get_lib_node(
&self, &self,
uuid: &str, uuid: &str,
) -> Result<crabidy_core::proto::crabidy::LibraryNode, crabidy_core::ProviderError> { ) -> Result<crabidy_core::proto::crabidy::LibraryNode, crabidy_core::ProviderError> {
@ -353,7 +363,7 @@ impl Client {
.await .await
} }
pub async fn get_track(&self, track_id: String) -> Result<Track, ClientError> { pub async fn get_track(&self, track_id: &str) -> Result<Track, ClientError> {
self.make_request(&format!("tracks/{}", track_id), None) self.make_request(&format!("tracks/{}", track_id), None)
.await .await
} }