From 9e1efb886ac9de40821948a1fb45ebb2d3652022 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Fri, 26 May 2023 19:11:55 +0200 Subject: [PATCH] Add first streaming server version --- Cargo.lock | 1 + crabidy-server/Cargo.toml | 1 + crabidy-server/src/main.rs | 221 ++++++++++--------------------------- tidaldy/src/lib.rs | 2 - 4 files changed, 58 insertions(+), 167 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dea29cd..8cef92f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,6 +241,7 @@ dependencies = [ "serde_json", "tidaldy", "tokio", + "tokio-stream", "tonic", ] diff --git a/crabidy-server/Cargo.toml b/crabidy-server/Cargo.toml index 442bd2a..5859cbb 100644 --- a/crabidy-server/Cargo.toml +++ b/crabidy-server/Cargo.toml @@ -19,3 +19,4 @@ flume = "0.10.14" tonic = "0.9.2" async-trait = "0.1.68" futures = "0.3.28" +tokio-stream = "0.1.14" diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index 15642dd..5f46a95 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -14,10 +14,13 @@ use crabidy_core::proto::crabidy::{ Track, }; use crabidy_core::{ProviderClient, ProviderError}; +use futures::TryStreamExt; use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer}; +use tokio_stream::StreamExt; use std::{ fs, + pin::Pin, sync::{Arc, Mutex}, }; use tonic::{transport::Server, Request, Response, Result, Status}; @@ -40,26 +43,17 @@ async fn main() -> Result<(), Box> { bus.set_sync_handler(move |_, msg| { match PlayMessage::parse(msg) { Ok(PlayMessage::EndOfStream) => { - println!("End of stream"); playback_tx.send(PlaybackMessage::Next).unwrap(); - println!("Next messages was sent"); } Ok(PlayMessage::StateChanged { state }) => { - println!("State changed: {:?}", state); playback_tx .send(PlaybackMessage::StateChanged { state }) .unwrap(); } - Ok(PlayMessage::PositionUpdated { position }) => { - // println!("Position updated: {:?}", position); - } - Ok(PlayMessage::Buffering { percent }) => { - // println!("Position updated: {:?}", position); - } + Ok(PlayMessage::PositionUpdated { position }) => {} + Ok(PlayMessage::Buffering { percent }) => {} Ok(PlayMessage::VolumeChanged { volume }) => {} - Ok(PlayMessage::MuteChanged { muted }) => { - println!("Mute changed to muted: {:?}", muted); - } + Ok(PlayMessage::MuteChanged { muted }) => {} Ok(PlayMessage::MediaInfoUpdated { info }) => {} _ => println!("Unknown message: {:?}", msg), @@ -117,7 +111,6 @@ impl ProviderOrchestrator { fn run(self) { tokio::spawn(async move { while let Ok(msg) = self.provider_rx.recv_async().await { - println!("Orchestrator {:?}", msg); match msg { ProviderMessage::GetNode { uuid, result_tx } => { let result = self.get_lib_node(&uuid).await; @@ -290,7 +283,6 @@ impl Playback { fn run(self) { tokio::spawn(async move { while let Ok(message) = self.playback_rx.recv_async().await { - println!("Playback{:?}", message); match message { PlaybackMessage::ReplaceWithTrack { uuid } => { if let Ok(track) = self.get_track(&uuid).await { @@ -307,6 +299,8 @@ impl Playback { { let mut queue = self.queue.lock().unwrap(); queue.replace_with_tracks(&tracks); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); } if tracks.len() > 0 { self.play(tracks[0].clone()).await; @@ -317,12 +311,16 @@ impl Playback { if let Ok(track) = self.get_track(&uuid).await { let mut queue = self.queue.lock().unwrap(); queue.queue_tracks(&[track]); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); } } PlaybackMessage::QueueNode { uuid } => { let tracks = self.flatten_node(&uuid).await; let mut queue = self.queue.lock().unwrap(); queue.queue_tracks(&tracks); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); } PlaybackMessage::GetQueue { result_tx } => { @@ -333,30 +331,40 @@ impl Playback { if let Ok(track) = self.get_track(&uuid).await { let mut queue = self.queue.lock().unwrap(); queue.append_tracks(&[track]); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); } } PlaybackMessage::AppendNode { uuid } => { let tracks = self.flatten_node(&uuid).await; let mut queue = self.queue.lock().unwrap(); queue.append_tracks(&tracks); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); } PlaybackMessage::ClearQueue => { let mut queue = self.queue.lock().unwrap(); queue.replace_with_tracks(&vec![]); - self.stop_track() + self.stop_track(); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); } //TODO handle deletion of current track PlaybackMessage::RemoveTracks { positions } => { let mut queue = self.queue.lock().unwrap(); queue.remove_tracks(&positions); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); } PlaybackMessage::SetCurrent { position } => { let result = { let mut queue = self.queue.lock().unwrap(); queue.set_current(position); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); queue.current() }; @@ -373,6 +381,8 @@ impl Playback { let mut queue = self.queue.lock().unwrap(); let position = queue.current + 1; let stop = !queue.set_current(position); + let queue_update_tx = self.queue_update_tx.clone(); + queue_update_tx.send(queue.clone()).unwrap(); (queue.current(), stop) }; @@ -388,17 +398,22 @@ impl Playback { let mut state = self.state.lock().unwrap(); if *state == PlayState::Playing { self.play.pause(); - *state = PlayState::Paused + // *state = PlayState::Paused } else { self.play.play(); - *state = PlayState::Playing + // *state = PlayState::Playing } } PlaybackMessage::Stop => { self.play.stop(); - *self.state.lock().unwrap() = PlayState::Stopped; + // *self.state.lock().unwrap() = PlayState::Stopped; + } + PlaybackMessage::StateChanged { state } => { + *self.state.lock().unwrap() = state.clone(); + let active_track_tx = self.active_track_tx.clone(); + let active_track = self.get_active_track().await; + active_track_tx.send(active_track).unwrap(); } - PlaybackMessage::StateChanged { state } => *self.state.lock().unwrap() = state, } } }); @@ -472,9 +487,10 @@ impl Playback { async fn play(&self, track: Track) { let Ok(urls) = self.get_urls_for_track(&track.uuid).await else { + let playback_tx = self.playback_tx.clone(); + playback_tx.send(PlaybackMessage::Next).unwrap(); return }; - println!("Playing urls {:?}", urls); { let mut state_guard = self.state.lock().unwrap(); *state_guard = PlayState::Playing; @@ -484,7 +500,6 @@ impl Playback { } fn stop_track(&self) { - println!("Stopping"); { let mut state_guard = self.state.lock().unwrap(); *state_guard = PlayState::Stopped; @@ -495,11 +510,9 @@ impl Playback { fn playpause(&self) { let mut state_guard = self.state.lock().unwrap(); if *state_guard == PlayState::Playing { - println!("Pausing"); *state_guard = PlayState::Paused; self.play.pause(); } else { - println!("Playing"); *state_guard = PlayState::Playing; self.play.play() } @@ -533,15 +546,15 @@ impl RpcService { #[tonic::async_trait] impl CrabidyService for RpcService { type GetQueueUpdatesStream = - futures::stream::Iter>>; + Pin> + Send>>; + type GetTrackUpdatesStream = - futures::stream::Iter>>; + Pin> + Send>>; async fn get_library_node( &self, request: Request, ) -> Result, Status> { - println!("Got a library node request: {:?}", request); let provider_tx = self.provider_tx.clone(); let (result_tx, result_rx) = flume::bounded(1); @@ -556,7 +569,6 @@ impl CrabidyService for RpcService { .recv_async() .await .map_err(|_| Status::internal("Failed to receive response from provider channel"))?; - println!("Got a library node response: {:?}", result); match result { Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })), Err(err) => Err(Status::internal(err.to_string())), @@ -566,7 +578,6 @@ impl CrabidyService for RpcService { &self, request: Request, ) -> Result, Status> { - println!("Got a library track request: {:?}", request); let provider_tx = self.provider_tx.clone(); let (result_tx, result_rx) = flume::bounded(1); @@ -581,7 +592,6 @@ impl CrabidyService for RpcService { .recv_async() .await .map_err(|_| Status::internal("Failed to receive response from provider channel"))?; - println!("Got a library node response: {:?}", result); match result { Ok(track) => Ok(Response::new(GetTrackResponse { track: Some(track) })), Err(err) => Err(Status::internal(err.to_string())), @@ -625,7 +635,6 @@ impl CrabidyService for RpcService { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { - println!("Got a replace with track request: {:?}", request); let playback_tx = self.playback_tx.clone(); let req = request.into_inner(); playback_tx @@ -642,7 +651,6 @@ impl CrabidyService for RpcService { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { - println!("Got a replace with node request: {:?}", request); let playback_tx = self.playback_tx.clone(); let req = request.into_inner(); playback_tx @@ -723,22 +731,10 @@ impl CrabidyService for RpcService { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { - let queue_vec: Vec> = vec![ - Ok(GetQueueUpdatesResponse { - queue_update_result: Some(QueueUpdateResult::PositionChange(QueuePositionChange { - timestamp: 12345, - new_position: 42, - })), - }), - Ok(GetQueueUpdatesResponse { - queue_update_result: Some(QueueUpdateResult::PositionChange(QueuePositionChange { - timestamp: 6666, - new_position: 11, - })), - }), - ]; - let output_stream = futures::stream::iter(queue_vec.into_iter()); - Ok(Response::new(output_stream)) + let update_rx = self.queue_update_rx.clone(); + + let output_stream = update_rx.into_stream().map(map_queue); + Ok(Response::new(Box::pin(output_stream))) } async fn get_queue( @@ -791,124 +787,19 @@ impl CrabidyService for RpcService { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { - let track_vec: Vec> = vec![]; - let output_stream = futures::stream::iter(track_vec.into_iter()); - Ok(Response::new(output_stream)) + let update_rx = self.active_track_rx.clone(); + + let output_stream = update_rx.into_stream().map(|active_track| { + Ok(GetTrackUpdatesResponse { + active_track: Some(active_track), + }) + }); + Ok(Response::new(Box::pin(output_stream))) } } -// async fn listen_for_events(app: RpcService, rx: flume::Receiver<()>) { -// tokio::spawn(async move { -// println!("Listening for events"); -// loop { -// println!("Waiting for next message"); -// while let Ok(_) = rx.recv_async().await { -// println!("Received next message"); -// let track_result = { -// let mut queue_guard = app.queue.lock().unwrap(); -// println!("{:?}", queue_guard); -// queue_guard.next() -// }; -// println!("{:?}", track_result); -// if let Some(track) = track_result { -// println!("Playing next track {:?}", track); -// app.play(Some(track)).await; -// } else { -// println!("Stopping at end of playlist"); -// app.stop_track(); -// } -// } -// } -// }); -// } - -// #[derive(Debug)] -// enum Input { -// PlayTrack { -// track_id: String, -// }, -// StopTrack { -// track_id: String, -// }, -// GetTrack { -// track_id: String, -// response: tokio::sync::oneshot::Sender, -// }, -// GetPlaylistList { -// response: tokio::sync::oneshot::Sender>, -// }, -// TrackOver, -// } - -// async fn run() -> Result<(), Error> { -// gstreamer::init().unwrap(); - -// let play = Play::new(None::); -// let bus = play.message_bus(); -// let (tx, rx) = flume::bounded(64); -// let bus_tx = tx.clone(); -// bus.set_sync_handler(move |_, msg| { -// match PlayMessage::parse(msg) { -// Ok(PlayMessage::EndOfStream) => {} -// Ok(PlayMessage::StateChanged { state }) => { -// println!("State changed: {:?}", state); -// } -// Ok(PlayMessage::PositionUpdated { position }) => { -// println!("Position updated: {:?}", position); -// } -// _ => {} -// } -// gstreamer::BusSyncReply::Drop -// }); -// let mut state = PlayState::Stopped; -// CHANNEL.set(tx).unwrap(); - -// while let Ok(input) = rx.recv_async().await { -// match (&mut state, input) { -// (_, Input::TrackOver) => { -// state = PlayState::Stopped; -// println!("Track stopped"); -// } -// (_, Input::StopTrack { track_id }) => { -// println!("Stopping track {}", track_id); -// play.stop(); -// state = PlayState::Stopped; -// } -// (_, Input::GetTrack { track_id, response }) => { -// let track = client.get_track(track_id).await.unwrap(); -// response.send(track).unwrap(); -// } -// (_, Input::GetPlaylistList { response }) => { -// println!("Getting playlists"); -// let user_id = client.get_user_id().unwrap(); -// println!("Getting playlists for user {}", user_id); -// let list = client -// .get_users_playlists_and_favorite_playlists(&user_id) -// .await -// .unwrap(); -// response.send(list).unwrap(); -// } -// (PlayState::Stopped, Input::PlayTrack { track_id }) => { -// println!("Playing track {}", track_id); -// let track_playback = client.get_track_playback(&track_id).await.unwrap(); -// let manifest = track_playback.get_manifest().unwrap(); -// play.set_uri(Some(&manifest.urls[0])); -// play.play(); -// state = PlayState::Playing; -// } -// (PlayState::Paused, Input::PlayTrack { track_id }) => { -// println!("Unpausing track {}", track_id); -// play.play(); -// state = PlayState::Playing; -// } -// (PlayState::Playing, Input::PlayTrack { track_id }) => { -// println!("Pausing track {}", track_id); -// play.pause(); -// state = PlayState::Paused; -// } -// _ => {} -// } -// } -// print!("done"); -// Ok(()) -// } +fn map_queue(queue: Queue) -> Result { + Ok(GetQueueUpdatesResponse { + queue_update_result: Some(QueueUpdateResult::Full(queue)), + }) +} diff --git a/tidaldy/src/lib.rs b/tidaldy/src/lib.rs index 6d846b1..cd47c71 100644 --- a/tidaldy/src/lib.rs +++ b/tidaldy/src/lib.rs @@ -380,7 +380,6 @@ impl Client { while now.elapsed().as_secs() <= code_response.expires_in { let login = self.check_auth_status(&code_response.device_code).await; if login.is_err() { - // println!("login failed with {:?}", login); sleep(Duration::from_secs(code_response.interval)).await; continue; } @@ -510,7 +509,6 @@ impl Client { .header("Content-Type", "application/x-www-form-urlencoded") .send() .await?; - // println!("{:#?} -> {}", res.status(), res.status().is_success()); if !res.status().is_success() { if res.status().is_client_error() { return Err(ClientError::AuthError(format!(