Add first streaming server version

This commit is contained in:
Hans Mündelein 2023-05-26 19:11:55 +02:00
parent 12aafe2dc0
commit 9e1efb886a
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
4 changed files with 58 additions and 167 deletions

1
Cargo.lock generated
View File

@ -241,6 +241,7 @@ dependencies = [
"serde_json", "serde_json",
"tidaldy", "tidaldy",
"tokio", "tokio",
"tokio-stream",
"tonic", "tonic",
] ]

View File

@ -19,3 +19,4 @@ flume = "0.10.14"
tonic = "0.9.2" tonic = "0.9.2"
async-trait = "0.1.68" async-trait = "0.1.68"
futures = "0.3.28" futures = "0.3.28"
tokio-stream = "0.1.14"

View File

@ -14,10 +14,13 @@ use crabidy_core::proto::crabidy::{
Track, Track,
}; };
use crabidy_core::{ProviderClient, ProviderError}; use crabidy_core::{ProviderClient, ProviderError};
use futures::TryStreamExt;
use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer}; use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer};
use tokio_stream::StreamExt;
use std::{ use std::{
fs, fs,
pin::Pin,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use tonic::{transport::Server, Request, Response, Result, Status}; use tonic::{transport::Server, Request, Response, Result, Status};
@ -40,26 +43,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
bus.set_sync_handler(move |_, msg| { bus.set_sync_handler(move |_, msg| {
match PlayMessage::parse(msg) { match PlayMessage::parse(msg) {
Ok(PlayMessage::EndOfStream) => { Ok(PlayMessage::EndOfStream) => {
println!("End of stream");
playback_tx.send(PlaybackMessage::Next).unwrap(); playback_tx.send(PlaybackMessage::Next).unwrap();
println!("Next messages was sent");
} }
Ok(PlayMessage::StateChanged { state }) => { Ok(PlayMessage::StateChanged { state }) => {
println!("State changed: {:?}", state);
playback_tx playback_tx
.send(PlaybackMessage::StateChanged { state }) .send(PlaybackMessage::StateChanged { state })
.unwrap(); .unwrap();
} }
Ok(PlayMessage::PositionUpdated { position }) => { Ok(PlayMessage::PositionUpdated { position }) => {}
// println!("Position updated: {:?}", position); Ok(PlayMessage::Buffering { percent }) => {}
}
Ok(PlayMessage::Buffering { percent }) => {
// println!("Position updated: {:?}", position);
}
Ok(PlayMessage::VolumeChanged { volume }) => {} Ok(PlayMessage::VolumeChanged { volume }) => {}
Ok(PlayMessage::MuteChanged { muted }) => { Ok(PlayMessage::MuteChanged { muted }) => {}
println!("Mute changed to muted: {:?}", muted);
}
Ok(PlayMessage::MediaInfoUpdated { info }) => {} Ok(PlayMessage::MediaInfoUpdated { info }) => {}
_ => println!("Unknown message: {:?}", msg), _ => println!("Unknown message: {:?}", msg),
@ -117,7 +111,6 @@ impl ProviderOrchestrator {
fn run(self) { fn run(self) {
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(msg) = self.provider_rx.recv_async().await { while let Ok(msg) = self.provider_rx.recv_async().await {
println!("Orchestrator {:?}", msg);
match msg { match msg {
ProviderMessage::GetNode { uuid, result_tx } => { ProviderMessage::GetNode { uuid, result_tx } => {
let result = self.get_lib_node(&uuid).await; let result = self.get_lib_node(&uuid).await;
@ -290,7 +283,6 @@ impl Playback {
fn run(self) { fn run(self) {
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(message) = self.playback_rx.recv_async().await { while let Ok(message) = self.playback_rx.recv_async().await {
println!("Playback{:?}", message);
match message { match message {
PlaybackMessage::ReplaceWithTrack { uuid } => { PlaybackMessage::ReplaceWithTrack { uuid } => {
if let Ok(track) = self.get_track(&uuid).await { if let Ok(track) = self.get_track(&uuid).await {
@ -307,6 +299,8 @@ impl Playback {
{ {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&tracks); queue.replace_with_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
} }
if tracks.len() > 0 { if tracks.len() > 0 {
self.play(tracks[0].clone()).await; self.play(tracks[0].clone()).await;
@ -317,12 +311,16 @@ impl Playback {
if let Ok(track) = self.get_track(&uuid).await { if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&[track]); queue.queue_tracks(&[track]);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
} }
} }
PlaybackMessage::QueueNode { uuid } => { PlaybackMessage::QueueNode { uuid } => {
let tracks = self.flatten_node(&uuid).await; let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&tracks); queue.queue_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
} }
PlaybackMessage::GetQueue { result_tx } => { PlaybackMessage::GetQueue { result_tx } => {
@ -333,30 +331,40 @@ impl Playback {
if let Ok(track) = self.get_track(&uuid).await { if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&[track]); queue.append_tracks(&[track]);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
} }
} }
PlaybackMessage::AppendNode { uuid } => { PlaybackMessage::AppendNode { uuid } => {
let tracks = self.flatten_node(&uuid).await; let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&tracks); queue.append_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
} }
PlaybackMessage::ClearQueue => { PlaybackMessage::ClearQueue => {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&vec![]); queue.replace_with_tracks(&vec![]);
self.stop_track() self.stop_track();
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
} }
//TODO handle deletion of current track //TODO handle deletion of current track
PlaybackMessage::RemoveTracks { positions } => { PlaybackMessage::RemoveTracks { positions } => {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.remove_tracks(&positions); queue.remove_tracks(&positions);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
} }
PlaybackMessage::SetCurrent { position } => { PlaybackMessage::SetCurrent { position } => {
let result = { let result = {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
queue.set_current(position); queue.set_current(position);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
queue.current() queue.current()
}; };
@ -373,6 +381,8 @@ impl Playback {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
let position = queue.current + 1; let position = queue.current + 1;
let stop = !queue.set_current(position); let stop = !queue.set_current(position);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
(queue.current(), stop) (queue.current(), stop)
}; };
@ -388,17 +398,22 @@ impl Playback {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if *state == PlayState::Playing { if *state == PlayState::Playing {
self.play.pause(); self.play.pause();
*state = PlayState::Paused // *state = PlayState::Paused
} else { } else {
self.play.play(); self.play.play();
*state = PlayState::Playing // *state = PlayState::Playing
} }
} }
PlaybackMessage::Stop => { PlaybackMessage::Stop => {
self.play.stop(); self.play.stop();
*self.state.lock().unwrap() = PlayState::Stopped; // *self.state.lock().unwrap() = PlayState::Stopped;
}
PlaybackMessage::StateChanged { state } => {
*self.state.lock().unwrap() = state.clone();
let active_track_tx = self.active_track_tx.clone();
let active_track = self.get_active_track().await;
active_track_tx.send(active_track).unwrap();
} }
PlaybackMessage::StateChanged { state } => *self.state.lock().unwrap() = state,
} }
} }
}); });
@ -472,9 +487,10 @@ impl Playback {
async fn play(&self, track: Track) { async fn play(&self, track: Track) {
let Ok(urls) = self.get_urls_for_track(&track.uuid).await else { let Ok(urls) = self.get_urls_for_track(&track.uuid).await else {
let playback_tx = self.playback_tx.clone();
playback_tx.send(PlaybackMessage::Next).unwrap();
return return
}; };
println!("Playing urls {:?}", urls);
{ {
let mut state_guard = self.state.lock().unwrap(); let mut state_guard = self.state.lock().unwrap();
*state_guard = PlayState::Playing; *state_guard = PlayState::Playing;
@ -484,7 +500,6 @@ impl Playback {
} }
fn stop_track(&self) { fn stop_track(&self) {
println!("Stopping");
{ {
let mut state_guard = self.state.lock().unwrap(); let mut state_guard = self.state.lock().unwrap();
*state_guard = PlayState::Stopped; *state_guard = PlayState::Stopped;
@ -495,11 +510,9 @@ impl Playback {
fn playpause(&self) { fn playpause(&self) {
let mut state_guard = self.state.lock().unwrap(); let mut state_guard = self.state.lock().unwrap();
if *state_guard == PlayState::Playing { if *state_guard == PlayState::Playing {
println!("Pausing");
*state_guard = PlayState::Paused; *state_guard = PlayState::Paused;
self.play.pause(); self.play.pause();
} else { } else {
println!("Playing");
*state_guard = PlayState::Playing; *state_guard = PlayState::Playing;
self.play.play() self.play.play()
} }
@ -533,15 +546,15 @@ impl RpcService {
#[tonic::async_trait] #[tonic::async_trait]
impl CrabidyService for RpcService { impl CrabidyService for RpcService {
type GetQueueUpdatesStream = type GetQueueUpdatesStream =
futures::stream::Iter<std::vec::IntoIter<Result<GetQueueUpdatesResponse, Status>>>; Pin<Box<dyn tokio_stream::Stream<Item = Result<GetQueueUpdatesResponse, Status>> + Send>>;
type GetTrackUpdatesStream = type GetTrackUpdatesStream =
futures::stream::Iter<std::vec::IntoIter<Result<GetTrackUpdatesResponse, Status>>>; Pin<Box<dyn tokio_stream::Stream<Item = Result<GetTrackUpdatesResponse, Status>> + Send>>;
async fn get_library_node( async fn get_library_node(
&self, &self,
request: Request<GetLibraryNodeRequest>, request: Request<GetLibraryNodeRequest>,
) -> Result<Response<GetLibraryNodeResponse>, Status> { ) -> Result<Response<GetLibraryNodeResponse>, Status> {
println!("Got a library node request: {:?}", request);
let provider_tx = self.provider_tx.clone(); let provider_tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1); let (result_tx, result_rx) = flume::bounded(1);
@ -556,7 +569,6 @@ impl CrabidyService for RpcService {
.recv_async() .recv_async()
.await .await
.map_err(|_| Status::internal("Failed to receive response from provider channel"))?; .map_err(|_| Status::internal("Failed to receive response from provider channel"))?;
println!("Got a library node response: {:?}", result);
match result { match result {
Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })), Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })),
Err(err) => Err(Status::internal(err.to_string())), Err(err) => Err(Status::internal(err.to_string())),
@ -566,7 +578,6 @@ impl CrabidyService for RpcService {
&self, &self,
request: Request<GetTrackRequest>, request: Request<GetTrackRequest>,
) -> Result<Response<GetTrackResponse>, Status> { ) -> Result<Response<GetTrackResponse>, Status> {
println!("Got a library track request: {:?}", request);
let provider_tx = self.provider_tx.clone(); let provider_tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1); let (result_tx, result_rx) = flume::bounded(1);
@ -581,7 +592,6 @@ impl CrabidyService for RpcService {
.recv_async() .recv_async()
.await .await
.map_err(|_| Status::internal("Failed to receive response from provider channel"))?; .map_err(|_| Status::internal("Failed to receive response from provider channel"))?;
println!("Got a library node response: {:?}", result);
match result { match result {
Ok(track) => Ok(Response::new(GetTrackResponse { track: Some(track) })), Ok(track) => Ok(Response::new(GetTrackResponse { track: Some(track) })),
Err(err) => Err(Status::internal(err.to_string())), Err(err) => Err(Status::internal(err.to_string())),
@ -625,7 +635,6 @@ impl CrabidyService for RpcService {
&self, &self,
request: tonic::Request<ReplaceWithTrackRequest>, request: tonic::Request<ReplaceWithTrackRequest>,
) -> std::result::Result<tonic::Response<ReplaceWithTrackResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<ReplaceWithTrackResponse>, tonic::Status> {
println!("Got a replace with track request: {:?}", request);
let playback_tx = self.playback_tx.clone(); let playback_tx = self.playback_tx.clone();
let req = request.into_inner(); let req = request.into_inner();
playback_tx playback_tx
@ -642,7 +651,6 @@ impl CrabidyService for RpcService {
&self, &self,
request: tonic::Request<ReplaceWithNodeRequest>, request: tonic::Request<ReplaceWithNodeRequest>,
) -> std::result::Result<tonic::Response<ReplaceWithNodeResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<ReplaceWithNodeResponse>, tonic::Status> {
println!("Got a replace with node request: {:?}", request);
let playback_tx = self.playback_tx.clone(); let playback_tx = self.playback_tx.clone();
let req = request.into_inner(); let req = request.into_inner();
playback_tx playback_tx
@ -723,22 +731,10 @@ impl CrabidyService for RpcService {
&self, &self,
request: tonic::Request<GetQueueUpdatesRequest>, request: tonic::Request<GetQueueUpdatesRequest>,
) -> std::result::Result<tonic::Response<Self::GetQueueUpdatesStream>, tonic::Status> { ) -> std::result::Result<tonic::Response<Self::GetQueueUpdatesStream>, tonic::Status> {
let queue_vec: Vec<Result<GetQueueUpdatesResponse, Status>> = vec![ let update_rx = self.queue_update_rx.clone();
Ok(GetQueueUpdatesResponse {
queue_update_result: Some(QueueUpdateResult::PositionChange(QueuePositionChange { let output_stream = update_rx.into_stream().map(map_queue);
timestamp: 12345, Ok(Response::new(Box::pin(output_stream)))
new_position: 42,
})),
}),
Ok(GetQueueUpdatesResponse {
queue_update_result: Some(QueueUpdateResult::PositionChange(QueuePositionChange {
timestamp: 6666,
new_position: 11,
})),
}),
];
let output_stream = futures::stream::iter(queue_vec.into_iter());
Ok(Response::new(output_stream))
} }
async fn get_queue( async fn get_queue(
@ -791,124 +787,19 @@ impl CrabidyService for RpcService {
&self, &self,
request: tonic::Request<GetTrackUpdatesRequest>, request: tonic::Request<GetTrackUpdatesRequest>,
) -> std::result::Result<tonic::Response<Self::GetTrackUpdatesStream>, tonic::Status> { ) -> std::result::Result<tonic::Response<Self::GetTrackUpdatesStream>, tonic::Status> {
let track_vec: Vec<Result<GetTrackUpdatesResponse, Status>> = vec![]; let update_rx = self.active_track_rx.clone();
let output_stream = futures::stream::iter(track_vec.into_iter());
Ok(Response::new(output_stream)) let output_stream = update_rx.into_stream().map(|active_track| {
Ok(GetTrackUpdatesResponse {
active_track: Some(active_track),
})
});
Ok(Response::new(Box::pin(output_stream)))
} }
} }
// async fn listen_for_events(app: RpcService, rx: flume::Receiver<()>) { fn map_queue(queue: Queue) -> Result<GetQueueUpdatesResponse, Status> {
// tokio::spawn(async move { Ok(GetQueueUpdatesResponse {
// println!("Listening for events"); queue_update_result: Some(QueueUpdateResult::Full(queue)),
// loop { })
// println!("Waiting for next message"); }
// while let Ok(_) = rx.recv_async().await {
// println!("Received next message");
// let track_result = {
// let mut queue_guard = app.queue.lock().unwrap();
// println!("{:?}", queue_guard);
// queue_guard.next()
// };
// println!("{:?}", track_result);
// if let Some(track) = track_result {
// println!("Playing next track {:?}", track);
// app.play(Some(track)).await;
// } else {
// println!("Stopping at end of playlist");
// app.stop_track();
// }
// }
// }
// });
// }
// #[derive(Debug)]
// enum Input {
// PlayTrack {
// track_id: String,
// },
// StopTrack {
// track_id: String,
// },
// GetTrack {
// track_id: String,
// response: tokio::sync::oneshot::Sender<tidaldy::Track>,
// },
// GetPlaylistList {
// response: tokio::sync::oneshot::Sender<Vec<tidaldy::PlaylistAndFavorite>>,
// },
// TrackOver,
// }
// async fn run() -> Result<(), Error> {
// gstreamer::init().unwrap();
// let play = Play::new(None::<PlayVideoRenderer>);
// let bus = play.message_bus();
// let (tx, rx) = flume::bounded(64);
// let bus_tx = tx.clone();
// bus.set_sync_handler(move |_, msg| {
// match PlayMessage::parse(msg) {
// Ok(PlayMessage::EndOfStream) => {}
// Ok(PlayMessage::StateChanged { state }) => {
// println!("State changed: {:?}", state);
// }
// Ok(PlayMessage::PositionUpdated { position }) => {
// println!("Position updated: {:?}", position);
// }
// _ => {}
// }
// gstreamer::BusSyncReply::Drop
// });
// let mut state = PlayState::Stopped;
// CHANNEL.set(tx).unwrap();
// while let Ok(input) = rx.recv_async().await {
// match (&mut state, input) {
// (_, Input::TrackOver) => {
// state = PlayState::Stopped;
// println!("Track stopped");
// }
// (_, Input::StopTrack { track_id }) => {
// println!("Stopping track {}", track_id);
// play.stop();
// state = PlayState::Stopped;
// }
// (_, Input::GetTrack { track_id, response }) => {
// let track = client.get_track(track_id).await.unwrap();
// response.send(track).unwrap();
// }
// (_, Input::GetPlaylistList { response }) => {
// println!("Getting playlists");
// let user_id = client.get_user_id().unwrap();
// println!("Getting playlists for user {}", user_id);
// let list = client
// .get_users_playlists_and_favorite_playlists(&user_id)
// .await
// .unwrap();
// response.send(list).unwrap();
// }
// (PlayState::Stopped, Input::PlayTrack { track_id }) => {
// println!("Playing track {}", track_id);
// let track_playback = client.get_track_playback(&track_id).await.unwrap();
// let manifest = track_playback.get_manifest().unwrap();
// play.set_uri(Some(&manifest.urls[0]));
// play.play();
// state = PlayState::Playing;
// }
// (PlayState::Paused, Input::PlayTrack { track_id }) => {
// println!("Unpausing track {}", track_id);
// play.play();
// state = PlayState::Playing;
// }
// (PlayState::Playing, Input::PlayTrack { track_id }) => {
// println!("Pausing track {}", track_id);
// play.pause();
// state = PlayState::Paused;
// }
// _ => {}
// }
// }
// print!("done");
// Ok(())
// }

View File

@ -380,7 +380,6 @@ impl Client {
while now.elapsed().as_secs() <= code_response.expires_in { while now.elapsed().as_secs() <= code_response.expires_in {
let login = self.check_auth_status(&code_response.device_code).await; let login = self.check_auth_status(&code_response.device_code).await;
if login.is_err() { if login.is_err() {
// println!("login failed with {:?}", login);
sleep(Duration::from_secs(code_response.interval)).await; sleep(Duration::from_secs(code_response.interval)).await;
continue; continue;
} }
@ -510,7 +509,6 @@ impl Client {
.header("Content-Type", "application/x-www-form-urlencoded") .header("Content-Type", "application/x-www-form-urlencoded")
.send() .send()
.await?; .await?;
// println!("{:#?} -> {}", res.status(), res.status().is_success());
if !res.status().is_success() { if !res.status().is_success() {
if res.status().is_client_error() { if res.status().is_client_error() {
return Err(ClientError::AuthError(format!( return Err(ClientError::AuthError(format!(