diff --git a/cbd-tui/src/main.rs b/cbd-tui/src/main.rs index 8b5d298..47b3025 100644 --- a/cbd-tui/src/main.rs +++ b/cbd-tui/src/main.rs @@ -1,6 +1,9 @@ +mod rpc; + use crabidy_core::proto::crabidy::{ crabidy_service_client::CrabidyServiceClient, get_queue_updates_response::QueueUpdateResult, - GetLibraryNodeRequest, GetQueueUpdatesRequest, LibraryNode, LibraryNodeState, + GetLibraryNodeRequest, GetQueueUpdatesRequest, GetQueueUpdatesResponse, + GetTrackUpdatesResponse, LibraryNode, LibraryNodeState, }; use crossterm::{ @@ -17,15 +20,18 @@ use ratatui::{ widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap}, Frame, Terminal, }; +use rpc::RpcClient; use std::{ + collections::HashMap, error::Error, - io, println, thread, + fmt, io, println, thread, time::{Duration, Instant}, vec, }; -use tokio::task; +use tokio::{select, task}; use tokio_stream::StreamExt; -use tonic::Request; +// use +use tonic::{transport::Channel, Request, Streaming}; struct StatefulList { state: ListState, @@ -109,10 +115,57 @@ impl App { } } -enum Message { +enum Message<'a> { Quit, - LibraryData(String), - QueueData(String), + // FIXME: Is String OK here? + GetLibraryNode(&'a str), + LibraryNodeReceived(LibraryNode), + QueueStreamUpdate(QueueUpdateResult), + TrackStreamUpdate(GetTrackUpdatesResponse), +} + +async fn orchestrate<'a>( + (tx, rx): (Sender>, Receiver>), +) -> Result<(), Box> { + let mut rpc_client = rpc::RpcClient::connect("http://[::1]:50051").await?; + + if let Some(root_node) = rpc_client.get_library_node("/").await? { + // FIXME: Is it ok to clone here? + tx.send(Message::LibraryNodeReceived(root_node.clone())); + } + + // FIXME: stream failures, do we need to re-establish the stream? + let mut queue_update_stream = rpc_client.get_queue_updates_stream().await?; + let mut track_update_stream = rpc_client.get_track_updates_stream().await?; + + loop { + select! { + Ok(msg) = &mut rx.recv_async() => { + match msg { + // FIXME: How can I make sure I have all match arms implmenented? + // (Some messages are not applicable here) + Message::Quit => { + break Ok(()); + }, + Message::GetLibraryNode(uuid) => { + if let Some(node) = rpc_client.get_library_node(uuid).await? { + // FIXME: Is it ok to clone here? + tx.send(Message::LibraryNodeReceived(node.clone())); + } + } + _ => {}, + } + } + Some(Ok(resp)) = queue_update_stream.next() => { + if let Some(res) = resp.queue_update_result { + tx.send_async(Message::QueueStreamUpdate(res)).await; + } + } + Some(Ok(resp)) = track_update_stream.next() => { + tx.send(Message::TrackStreamUpdate(resp)); + } + } + } } #[tokio::main] @@ -124,41 +177,17 @@ async fn main() -> Result<(), Box> { run_ui(ui_tx, ui_rx); }); - let mut client = CrabidyServiceClient::connect("http://[::1]:50051").await?; + // FIXME: unwrap + tokio::spawn(async move { orchestrate((tx, rx)).await.unwrap() }); - let get_library_node_request = Request::new(GetLibraryNodeRequest { - uuid: "/".to_string(), - }); - - let response = client.get_library_node(get_library_node_request).await?; - - if let Some(node) = response.into_inner().node { - node.children.iter().for_each(|c| { - tx.send(Message::LibraryData(c.to_string())); - }) - } - - let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 }); - - let mut queue_update_stream = client - .get_queue_updates(get_queue_updates_request) - .await? - .into_inner(); - - loop { - while let Some(Ok(resp)) = queue_update_stream.next().await { - if let Some(QueueUpdateResult::PositionChange(pos)) = resp.queue_update_result { - tx.send(Message::QueueData(pos.timestamp.to_string())); - } - } - - match rx.recv() { - Ok(Message::Quit) => { - break; - } - _ => {} - } - } + // loop { + // match rx.recv() { + // Ok(Message::Quit) => { + // break; + // } + // _ => {} + // } + // } Ok(()) } @@ -179,23 +208,12 @@ fn run_ui(tx: Sender, rx: Receiver) { loop { for message in rx.try_iter() { match message { - Message::LibraryData(title) => { + Message::LibraryNodeReceived(node) => { // FIXME: this is obviously bullshit + // FIXME: DO NOT PUSH LIBRARY_NODES ONTO THE UI, IT SHOULD GET ITS OWN TYPE app.library.items.push(LibraryNode { - uuid: title.clone(), - name: title.clone(), - children: Vec::new(), - is_queable: false, - parent: None, - state: LibraryNodeState::Done.into(), - tracks: Vec::new(), - }); - } - Message::QueueData(random_no) => { - // FIXME: this is obviously bullshit - app.queue.items.push(LibraryNode { - uuid: random_no.clone(), - name: random_no.clone(), + uuid: node.uuid, + name: node.name, children: Vec::new(), is_queable: false, parent: None, @@ -203,6 +221,22 @@ fn run_ui(tx: Sender, rx: Receiver) { tracks: Vec::new(), }); } + Message::QueueStreamUpdate(queue_update) => match queue_update { + QueueUpdateResult::Full(queue) => {} + QueueUpdateResult::PositionChange(pos) => { + // FIXME: this is obviously bullshit + // FIXME: DO NOT PUSH LIBRARY_NODES ONTO THE UI, IT SHOULD GET ITS OWN TYPE + app.queue.items.push(LibraryNode { + uuid: pos.timestamp.to_string(), + name: pos.timestamp.to_string(), + children: Vec::new(), + is_queable: false, + parent: None, + state: LibraryNodeState::Done.into(), + tracks: Vec::new(), + }); + } + }, _ => {} } } diff --git a/cbd-tui/src/rpc.rs b/cbd-tui/src/rpc.rs new file mode 100644 index 0000000..1a3e2ac --- /dev/null +++ b/cbd-tui/src/rpc.rs @@ -0,0 +1,104 @@ +use crabidy_core::proto::crabidy::{ + crabidy_service_client::CrabidyServiceClient, get_queue_updates_response::QueueUpdateResult, + GetLibraryNodeRequest, GetQueueUpdatesRequest, GetQueueUpdatesResponse, GetTrackUpdatesRequest, + GetTrackUpdatesResponse, LibraryNode, LibraryNodeState, +}; + +use std::{ + collections::HashMap, + error::Error, + fmt, io, println, thread, + time::{Duration, Instant}, + vec, +}; +use tokio::task; +use tokio_stream::StreamExt; +use tonic::{transport::Channel, Request, Streaming}; + +#[derive(Debug)] +enum RpcClientError { + NotFound, +} + +impl fmt::Display for RpcClientError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RpcClientError::NotFound => write!(f, "Requested item not found"), + } + } +} + +impl Error for RpcClientError {} + +pub struct RpcClient<'a> { + library_node_cache: HashMap<&'a str, LibraryNode>, + client: CrabidyServiceClient, +} + +impl<'a> RpcClient<'a> { + pub async fn connect(addr: &'static str) -> Result, tonic::transport::Error> { + let client = CrabidyServiceClient::connect(addr).await?; + let library_node_cache: HashMap<&str, LibraryNode> = HashMap::new(); + Ok(RpcClient { + client, + library_node_cache, + }) + } + pub async fn get_library_node( + &mut self, + uuid: &'a str, + ) -> Result, Box> { + if self.library_node_cache.contains_key(uuid) { + return Ok(self.library_node_cache.get(uuid)); + } + + let get_library_node_request = Request::new(GetLibraryNodeRequest { + uuid: uuid.to_string(), + }); + + let response = self + .client + .get_library_node(get_library_node_request) + .await?; + + if let Some(library_node) = response.into_inner().node { + self.library_node_cache.insert(uuid, library_node); + // FIXME: is that necessary? + return Ok(self.library_node_cache.get(uuid)); + } + + Err(Box::new(RpcClientError::NotFound)) + } + + pub async fn get_queue_updates_stream( + &mut self, + ) -> Result, Box> { + // FIXME: Adjust request params to what we need + let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 }); + + let stream = self + .client + .get_queue_updates(get_queue_updates_request) + .await? + .into_inner(); + Ok(stream) + } + + pub async fn get_track_updates_stream( + &mut self, + ) -> Result, Box> { + // FIXME: Adjust request params to what we need + let get_queue_updates_request = Request::new(GetTrackUpdatesRequest { + type_whitelist: Vec::new(), + type_blacklist: Vec::new(), + updates_skipped: 0, + }); + + let stream = self + .client + .get_track_updates(get_queue_updates_request) + .await? + .into_inner(); + Ok(stream) + } +}