Add rpc module

This commit is contained in:
chmanie 2023-05-26 10:49:11 +02:00
parent e231de87da
commit 13ff6a741d
2 changed files with 194 additions and 56 deletions

View File

@ -1,6 +1,9 @@
mod rpc;
use crabidy_core::proto::crabidy::{
crabidy_service_client::CrabidyServiceClient, get_queue_updates_response::QueueUpdateResult,
GetLibraryNodeRequest, GetQueueUpdatesRequest, LibraryNode, LibraryNodeState,
GetLibraryNodeRequest, GetQueueUpdatesRequest, GetQueueUpdatesResponse,
GetTrackUpdatesResponse, LibraryNode, LibraryNodeState,
};
use crossterm::{
@ -17,15 +20,18 @@ use ratatui::{
widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap},
Frame, Terminal,
};
use rpc::RpcClient;
use std::{
collections::HashMap,
error::Error,
io, println, thread,
fmt, io, println, thread,
time::{Duration, Instant},
vec,
};
use tokio::task;
use tokio::{select, task};
use tokio_stream::StreamExt;
use tonic::Request;
// use
use tonic::{transport::Channel, Request, Streaming};
struct StatefulList<T> {
state: ListState,
@ -109,10 +115,57 @@ impl App {
}
}
enum Message {
enum Message<'a> {
Quit,
LibraryData(String),
QueueData(String),
// FIXME: Is String OK here?
GetLibraryNode(&'a str),
LibraryNodeReceived(LibraryNode),
QueueStreamUpdate(QueueUpdateResult),
TrackStreamUpdate(GetTrackUpdatesResponse),
}
async fn orchestrate<'a>(
(tx, rx): (Sender<Message<'a>>, Receiver<Message<'a>>),
) -> Result<(), Box<dyn Error>> {
let mut rpc_client = rpc::RpcClient::connect("http://[::1]:50051").await?;
if let Some(root_node) = rpc_client.get_library_node("/").await? {
// FIXME: Is it ok to clone here?
tx.send(Message::LibraryNodeReceived(root_node.clone()));
}
// FIXME: stream failures, do we need to re-establish the stream?
let mut queue_update_stream = rpc_client.get_queue_updates_stream().await?;
let mut track_update_stream = rpc_client.get_track_updates_stream().await?;
loop {
select! {
Ok(msg) = &mut rx.recv_async() => {
match msg {
// FIXME: How can I make sure I have all match arms implmenented?
// (Some messages are not applicable here)
Message::Quit => {
break Ok(());
},
Message::GetLibraryNode(uuid) => {
if let Some(node) = rpc_client.get_library_node(uuid).await? {
// FIXME: Is it ok to clone here?
tx.send(Message::LibraryNodeReceived(node.clone()));
}
}
_ => {},
}
}
Some(Ok(resp)) = queue_update_stream.next() => {
if let Some(res) = resp.queue_update_result {
tx.send_async(Message::QueueStreamUpdate(res)).await;
}
}
Some(Ok(resp)) = track_update_stream.next() => {
tx.send(Message::TrackStreamUpdate(resp));
}
}
}
}
#[tokio::main]
@ -124,41 +177,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
run_ui(ui_tx, ui_rx);
});
let mut client = CrabidyServiceClient::connect("http://[::1]:50051").await?;
// FIXME: unwrap
tokio::spawn(async move { orchestrate((tx, rx)).await.unwrap() });
let get_library_node_request = Request::new(GetLibraryNodeRequest {
uuid: "/".to_string(),
});
let response = client.get_library_node(get_library_node_request).await?;
if let Some(node) = response.into_inner().node {
node.children.iter().for_each(|c| {
tx.send(Message::LibraryData(c.to_string()));
})
}
let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 });
let mut queue_update_stream = client
.get_queue_updates(get_queue_updates_request)
.await?
.into_inner();
loop {
while let Some(Ok(resp)) = queue_update_stream.next().await {
if let Some(QueueUpdateResult::PositionChange(pos)) = resp.queue_update_result {
tx.send(Message::QueueData(pos.timestamp.to_string()));
}
}
match rx.recv() {
Ok(Message::Quit) => {
break;
}
_ => {}
}
}
// loop {
// match rx.recv() {
// Ok(Message::Quit) => {
// break;
// }
// _ => {}
// }
// }
Ok(())
}
@ -179,11 +208,12 @@ fn run_ui(tx: Sender<Message>, rx: Receiver<Message>) {
loop {
for message in rx.try_iter() {
match message {
Message::LibraryData(title) => {
Message::LibraryNodeReceived(node) => {
// FIXME: this is obviously bullshit
// FIXME: DO NOT PUSH LIBRARY_NODES ONTO THE UI, IT SHOULD GET ITS OWN TYPE
app.library.items.push(LibraryNode {
uuid: title.clone(),
name: title.clone(),
uuid: node.uuid,
name: node.name,
children: Vec::new(),
is_queable: false,
parent: None,
@ -191,11 +221,14 @@ fn run_ui(tx: Sender<Message>, rx: Receiver<Message>) {
tracks: Vec::new(),
});
}
Message::QueueData(random_no) => {
Message::QueueStreamUpdate(queue_update) => match queue_update {
QueueUpdateResult::Full(queue) => {}
QueueUpdateResult::PositionChange(pos) => {
// FIXME: this is obviously bullshit
// FIXME: DO NOT PUSH LIBRARY_NODES ONTO THE UI, IT SHOULD GET ITS OWN TYPE
app.queue.items.push(LibraryNode {
uuid: random_no.clone(),
name: random_no.clone(),
uuid: pos.timestamp.to_string(),
name: pos.timestamp.to_string(),
children: Vec::new(),
is_queable: false,
parent: None,
@ -203,6 +236,7 @@ fn run_ui(tx: Sender<Message>, rx: Receiver<Message>) {
tracks: Vec::new(),
});
}
},
_ => {}
}
}

104
cbd-tui/src/rpc.rs Normal file
View File

@ -0,0 +1,104 @@
use crabidy_core::proto::crabidy::{
crabidy_service_client::CrabidyServiceClient, get_queue_updates_response::QueueUpdateResult,
GetLibraryNodeRequest, GetQueueUpdatesRequest, GetQueueUpdatesResponse, GetTrackUpdatesRequest,
GetTrackUpdatesResponse, LibraryNode, LibraryNodeState,
};
use std::{
collections::HashMap,
error::Error,
fmt, io, println, thread,
time::{Duration, Instant},
vec,
};
use tokio::task;
use tokio_stream::StreamExt;
use tonic::{transport::Channel, Request, Streaming};
#[derive(Debug)]
enum RpcClientError {
NotFound,
}
impl fmt::Display for RpcClientError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RpcClientError::NotFound => write!(f, "Requested item not found"),
}
}
}
impl Error for RpcClientError {}
pub struct RpcClient<'a> {
library_node_cache: HashMap<&'a str, LibraryNode>,
client: CrabidyServiceClient<Channel>,
}
impl<'a> RpcClient<'a> {
pub async fn connect(addr: &'static str) -> Result<RpcClient<'a>, tonic::transport::Error> {
let client = CrabidyServiceClient::connect(addr).await?;
let library_node_cache: HashMap<&str, LibraryNode> = HashMap::new();
Ok(RpcClient {
client,
library_node_cache,
})
}
pub async fn get_library_node(
&mut self,
uuid: &'a str,
) -> Result<Option<&LibraryNode>, Box<dyn Error>> {
if self.library_node_cache.contains_key(uuid) {
return Ok(self.library_node_cache.get(uuid));
}
let get_library_node_request = Request::new(GetLibraryNodeRequest {
uuid: uuid.to_string(),
});
let response = self
.client
.get_library_node(get_library_node_request)
.await?;
if let Some(library_node) = response.into_inner().node {
self.library_node_cache.insert(uuid, library_node);
// FIXME: is that necessary?
return Ok(self.library_node_cache.get(uuid));
}
Err(Box::new(RpcClientError::NotFound))
}
pub async fn get_queue_updates_stream(
&mut self,
) -> Result<Streaming<GetQueueUpdatesResponse>, Box<dyn Error>> {
// FIXME: Adjust request params to what we need
let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 });
let stream = self
.client
.get_queue_updates(get_queue_updates_request)
.await?
.into_inner();
Ok(stream)
}
pub async fn get_track_updates_stream(
&mut self,
) -> Result<Streaming<GetTrackUpdatesResponse>, Box<dyn Error>> {
// FIXME: Adjust request params to what we need
let get_queue_updates_request = Request::new(GetTrackUpdatesRequest {
type_whitelist: Vec::new(),
type_blacklist: Vec::new(),
updates_skipped: 0,
});
let stream = self
.client
.get_track_updates(get_queue_updates_request)
.await?
.into_inner();
Ok(stream)
}
}