From cce9e89eeb80dd65ccb16be514be42470d9bb65a Mon Sep 17 00:00:00 2001 From: chmanie Date: Sat, 27 May 2023 19:15:21 +0200 Subject: [PATCH] Reconnect streams after connection failure --- cbd-tui/src/main.rs | 121 ++++++++++++++++++++++++++------------------ cbd-tui/src/rpc.rs | 93 ++++++++++++++++++++-------------- 2 files changed, 127 insertions(+), 87 deletions(-) diff --git a/cbd-tui/src/main.rs b/cbd-tui/src/main.rs index 281dc13..3142897 100644 --- a/cbd-tui/src/main.rs +++ b/cbd-tui/src/main.rs @@ -25,15 +25,16 @@ use ratatui::{ }; use rpc::RpcClient; use std::{ + cell::RefCell, collections::HashMap, error::Error, fmt, io, println, thread, time::{Duration, Instant}, vec, }; -use tokio::{select, signal, task}; +use tokio::{fs, select, signal, task}; use tokio_stream::StreamExt; -use tonic::{transport::Channel, Request, Streaming}; +use tonic::{transport::Channel, Request, Status, Streaming}; trait ListView { fn get_size(&self) -> usize; @@ -360,62 +361,82 @@ enum MessageFromUi { TogglePlay, } +async fn poll( + rpc_client: &mut RpcClient, + rx: &Receiver, + tx: &Sender, +) -> Result<(), Box> { + select! { + Ok(msg) = &mut rx.recv_async() => { + match msg { + MessageFromUi::Quit => { + return Ok(()); + }, + MessageFromUi::GetLibraryNode(uuid) => { + if let Some(node) = rpc_client.get_library_node(&uuid).await? { + tx.send(MessageToUi::ReplaceLibraryNode(node.clone())); + } + }, + MessageFromUi::ReplaceWithItem(uuid, kind) => { + match kind { + UiItemKind::Node => { + rpc_client.replace_queue_with_node(&uuid).await? + } + UiItemKind::Track => { + rpc_client.replace_queue_with_track(&uuid).await? + } + } + } + MessageFromUi::TogglePlay => { + rpc_client.toggle_play().await? + } + MessageFromUi::SetCurrentTrack(pos) => { + rpc_client.set_current_track(pos).await? + } + + } + } + Some(resp) = rpc_client.queue_updates_stream.next() => { + match resp { + Ok(resp) => { + if let Some(res) = resp.queue_update_result { + tx.send_async(MessageToUi::QueueStreamUpdate(res)).await?; + } + } + Err(_) => { + rpc_client.reconnect_queue_updates_stream().await; + } + + } + } + Some(resp) = rpc_client.track_updates_stream.next() => { + match resp { + Ok(resp) => { + if let Some(active_track) = resp.active_track { + tx.send_async(MessageToUi::TrackStreamUpdate(active_track)).await?; + } + } + Err(_) => { + rpc_client.reconnect_track_updates_stream().await; + } + } + } + } + + Ok(()) +} + async fn orchestrate<'a>( (tx, rx): (Sender, Receiver), ) -> Result<(), Box> { - let mut rpc_client = rpc::RpcClient::connect("http://192.168.178.32:50051").await?; + let mut rpc_client = rpc::RpcClient::connect("http://127.0.0.1:50051").await?; if let Some(root_node) = rpc_client.get_library_node("/").await? { tx.send(MessageToUi::ReplaceLibraryNode(root_node.clone())); } - // FIXME: stream failures, do we need to re-establish the stream? - let mut queue_update_stream = rpc_client.get_queue_updates_stream().await?; - let mut track_update_stream = rpc_client.get_track_updates_stream().await?; - loop { - select! { - Ok(msg) = &mut rx.recv_async() => { - match msg { - MessageFromUi::Quit => { - break Ok(()); - }, - MessageFromUi::GetLibraryNode(uuid) => { - if let Some(node) = rpc_client.get_library_node(&uuid).await? { - tx.send(MessageToUi::ReplaceLibraryNode(node.clone())); - } - }, - MessageFromUi::ReplaceWithItem(uuid, kind) => { - match kind { - UiItemKind::Node => { - rpc_client.replace_queue_with_node(&uuid).await? - } - UiItemKind::Track => { - rpc_client.replace_queue_with_track(&uuid).await? - } - } - } - MessageFromUi::TogglePlay => { - rpc_client.toggle_play().await? - } - MessageFromUi::SetCurrentTrack(pos) => { - rpc_client.set_current_track(pos).await? - } - - } - } - Some(Ok(resp)) = queue_update_stream.next() => { - if let Some(res) = resp.queue_update_result { - tx.send_async(MessageToUi::QueueStreamUpdate(res)).await; - } - } - Some(Ok(resp)) = track_update_stream.next() => { - if let Some(active_track) = resp.active_track { - tx.send_async(MessageToUi::TrackStreamUpdate(active_track)).await; - } - - } - } + poll(&mut rpc_client, &rx, &tx).await.ok(); } } @@ -429,7 +450,7 @@ async fn main() -> Result<(), Box> { }); // FIXME: unwrap - tokio::spawn(async move { orchestrate((tx, rx)).await.unwrap() }); + tokio::spawn(async move { orchestrate((tx, rx)).await.ok() }); signal::ctrl_c().await.unwrap(); diff --git a/cbd-tui/src/rpc.rs b/cbd-tui/src/rpc.rs index a9ee190..7046373 100644 --- a/cbd-tui/src/rpc.rs +++ b/cbd-tui/src/rpc.rs @@ -9,13 +9,16 @@ use crabidy_core::proto::crabidy::{ use std::{ collections::HashMap, error::Error, - fmt, io, println, thread, + fmt, io, mem, println, thread, time::{Duration, Instant}, vec, }; use tokio::task; use tokio_stream::StreamExt; -use tonic::{transport::Channel, Request, Streaming}; +use tonic::{ + transport::{Channel, Endpoint}, + Request, Streaming, +}; #[derive(Debug)] enum RpcClientError { @@ -35,18 +38,67 @@ impl Error for RpcClientError {} pub struct RpcClient { library_node_cache: HashMap, client: CrabidyServiceClient, + pub queue_updates_stream: Streaming, + pub track_updates_stream: Streaming, } impl RpcClient { - pub async fn connect(addr: &'static str) -> Result { - let client = CrabidyServiceClient::connect(addr).await?; + pub async fn connect(addr: &'static str) -> Result> { + let endpoint = Endpoint::from_static(addr).connect_lazy(); + let mut client = CrabidyServiceClient::new(endpoint); + + let queue_updates_stream = Self::get_queue_updates_stream(&mut client).await; + let track_updates_stream = Self::get_track_updates_stream(&mut client).await; let library_node_cache: HashMap = HashMap::new(); + Ok(RpcClient { client, library_node_cache, + track_updates_stream, + queue_updates_stream, }) } + async fn get_queue_updates_stream( + client: &mut CrabidyServiceClient, + ) -> Streaming { + loop { + let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 }); + if let Ok(resp) = client.get_queue_updates(get_queue_updates_request).await { + return resp.into_inner(); + } else { + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + + async fn get_track_updates_stream( + client: &mut CrabidyServiceClient, + ) -> Streaming { + loop { + let get_track_updates_request = Request::new(GetTrackUpdatesRequest { + type_whitelist: Vec::new(), + type_blacklist: Vec::new(), + updates_skipped: 0, + }); + if let Ok(resp) = client.get_track_updates(get_track_updates_request).await { + return resp.into_inner(); + } else { + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + + pub async fn reconnect_queue_updates_stream(&mut self) { + let queue_updates_stream = Self::get_queue_updates_stream(&mut self.client).await; + mem::replace(&mut self.queue_updates_stream, queue_updates_stream); + } + + pub async fn reconnect_track_updates_stream(&mut self) { + let track_updates_stream = Self::get_track_updates_stream(&mut self.client).await; + mem::replace(&mut self.track_updates_stream, track_updates_stream); + } + pub async fn get_library_node( &mut self, uuid: &str, @@ -67,45 +119,12 @@ impl RpcClient { if let Some(library_node) = response.into_inner().node { self.library_node_cache .insert(uuid.to_string(), library_node); - // FIXME: is that necessary? return Ok(self.library_node_cache.get(uuid)); } Err(Box::new(RpcClientError::NotFound)) } - pub async fn get_queue_updates_stream( - &mut self, - ) -> Result, Box> { - // FIXME: Adjust request params to what we need - let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 }); - - let stream = self - .client - .get_queue_updates(get_queue_updates_request) - .await? - .into_inner(); - Ok(stream) - } - - pub async fn get_track_updates_stream( - &mut self, - ) -> Result, Box> { - // FIXME: Adjust request params to what we need - let get_queue_updates_request = Request::new(GetTrackUpdatesRequest { - type_whitelist: Vec::new(), - type_blacklist: Vec::new(), - updates_skipped: 0, - }); - - let stream = self - .client - .get_track_updates(get_queue_updates_request) - .await? - .into_inner(); - Ok(stream) - } - pub async fn replace_queue_with_node(&mut self, uuid: &str) -> Result<(), Box> { let replace_with_node_request = Request::new(ReplaceWithNodeRequest { uuid: uuid.to_string(),