Reconnect streams after connection failure
This commit is contained in:
parent
af0dba7a25
commit
cce9e89eeb
|
|
@ -25,15 +25,16 @@ use ratatui::{
|
||||||
};
|
};
|
||||||
use rpc::RpcClient;
|
use rpc::RpcClient;
|
||||||
use std::{
|
use std::{
|
||||||
|
cell::RefCell,
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
error::Error,
|
error::Error,
|
||||||
fmt, io, println, thread,
|
fmt, io, println, thread,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
vec,
|
vec,
|
||||||
};
|
};
|
||||||
use tokio::{select, signal, task};
|
use tokio::{fs, select, signal, task};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tonic::{transport::Channel, Request, Streaming};
|
use tonic::{transport::Channel, Request, Status, Streaming};
|
||||||
|
|
||||||
trait ListView {
|
trait ListView {
|
||||||
fn get_size(&self) -> usize;
|
fn get_size(&self) -> usize;
|
||||||
|
|
@ -360,62 +361,82 @@ enum MessageFromUi {
|
||||||
TogglePlay,
|
TogglePlay,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn poll(
|
||||||
|
rpc_client: &mut RpcClient,
|
||||||
|
rx: &Receiver<MessageFromUi>,
|
||||||
|
tx: &Sender<MessageToUi>,
|
||||||
|
) -> Result<(), Box<dyn Error>> {
|
||||||
|
select! {
|
||||||
|
Ok(msg) = &mut rx.recv_async() => {
|
||||||
|
match msg {
|
||||||
|
MessageFromUi::Quit => {
|
||||||
|
return Ok(());
|
||||||
|
},
|
||||||
|
MessageFromUi::GetLibraryNode(uuid) => {
|
||||||
|
if let Some(node) = rpc_client.get_library_node(&uuid).await? {
|
||||||
|
tx.send(MessageToUi::ReplaceLibraryNode(node.clone()));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
MessageFromUi::ReplaceWithItem(uuid, kind) => {
|
||||||
|
match kind {
|
||||||
|
UiItemKind::Node => {
|
||||||
|
rpc_client.replace_queue_with_node(&uuid).await?
|
||||||
|
}
|
||||||
|
UiItemKind::Track => {
|
||||||
|
rpc_client.replace_queue_with_track(&uuid).await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageFromUi::TogglePlay => {
|
||||||
|
rpc_client.toggle_play().await?
|
||||||
|
}
|
||||||
|
MessageFromUi::SetCurrentTrack(pos) => {
|
||||||
|
rpc_client.set_current_track(pos).await?
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(resp) = rpc_client.queue_updates_stream.next() => {
|
||||||
|
match resp {
|
||||||
|
Ok(resp) => {
|
||||||
|
if let Some(res) = resp.queue_update_result {
|
||||||
|
tx.send_async(MessageToUi::QueueStreamUpdate(res)).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
rpc_client.reconnect_queue_updates_stream().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(resp) = rpc_client.track_updates_stream.next() => {
|
||||||
|
match resp {
|
||||||
|
Ok(resp) => {
|
||||||
|
if let Some(active_track) = resp.active_track {
|
||||||
|
tx.send_async(MessageToUi::TrackStreamUpdate(active_track)).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
rpc_client.reconnect_track_updates_stream().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn orchestrate<'a>(
|
async fn orchestrate<'a>(
|
||||||
(tx, rx): (Sender<MessageToUi>, Receiver<MessageFromUi>),
|
(tx, rx): (Sender<MessageToUi>, Receiver<MessageFromUi>),
|
||||||
) -> Result<(), Box<dyn Error>> {
|
) -> Result<(), Box<dyn Error>> {
|
||||||
let mut rpc_client = rpc::RpcClient::connect("http://192.168.178.32:50051").await?;
|
let mut rpc_client = rpc::RpcClient::connect("http://127.0.0.1:50051").await?;
|
||||||
|
|
||||||
if let Some(root_node) = rpc_client.get_library_node("/").await? {
|
if let Some(root_node) = rpc_client.get_library_node("/").await? {
|
||||||
tx.send(MessageToUi::ReplaceLibraryNode(root_node.clone()));
|
tx.send(MessageToUi::ReplaceLibraryNode(root_node.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: stream failures, do we need to re-establish the stream?
|
|
||||||
let mut queue_update_stream = rpc_client.get_queue_updates_stream().await?;
|
|
||||||
let mut track_update_stream = rpc_client.get_track_updates_stream().await?;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
select! {
|
poll(&mut rpc_client, &rx, &tx).await.ok();
|
||||||
Ok(msg) = &mut rx.recv_async() => {
|
|
||||||
match msg {
|
|
||||||
MessageFromUi::Quit => {
|
|
||||||
break Ok(());
|
|
||||||
},
|
|
||||||
MessageFromUi::GetLibraryNode(uuid) => {
|
|
||||||
if let Some(node) = rpc_client.get_library_node(&uuid).await? {
|
|
||||||
tx.send(MessageToUi::ReplaceLibraryNode(node.clone()));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
MessageFromUi::ReplaceWithItem(uuid, kind) => {
|
|
||||||
match kind {
|
|
||||||
UiItemKind::Node => {
|
|
||||||
rpc_client.replace_queue_with_node(&uuid).await?
|
|
||||||
}
|
|
||||||
UiItemKind::Track => {
|
|
||||||
rpc_client.replace_queue_with_track(&uuid).await?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
MessageFromUi::TogglePlay => {
|
|
||||||
rpc_client.toggle_play().await?
|
|
||||||
}
|
|
||||||
MessageFromUi::SetCurrentTrack(pos) => {
|
|
||||||
rpc_client.set_current_track(pos).await?
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(Ok(resp)) = queue_update_stream.next() => {
|
|
||||||
if let Some(res) = resp.queue_update_result {
|
|
||||||
tx.send_async(MessageToUi::QueueStreamUpdate(res)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(Ok(resp)) = track_update_stream.next() => {
|
|
||||||
if let Some(active_track) = resp.active_track {
|
|
||||||
tx.send_async(MessageToUi::TrackStreamUpdate(active_track)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -429,7 +450,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
});
|
});
|
||||||
|
|
||||||
// FIXME: unwrap
|
// FIXME: unwrap
|
||||||
tokio::spawn(async move { orchestrate((tx, rx)).await.unwrap() });
|
tokio::spawn(async move { orchestrate((tx, rx)).await.ok() });
|
||||||
|
|
||||||
signal::ctrl_c().await.unwrap();
|
signal::ctrl_c().await.unwrap();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,13 +9,16 @@ use crabidy_core::proto::crabidy::{
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
error::Error,
|
error::Error,
|
||||||
fmt, io, println, thread,
|
fmt, io, mem, println, thread,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
vec,
|
vec,
|
||||||
};
|
};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tonic::{transport::Channel, Request, Streaming};
|
use tonic::{
|
||||||
|
transport::{Channel, Endpoint},
|
||||||
|
Request, Streaming,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum RpcClientError {
|
enum RpcClientError {
|
||||||
|
|
@ -35,18 +38,67 @@ impl Error for RpcClientError {}
|
||||||
pub struct RpcClient {
|
pub struct RpcClient {
|
||||||
library_node_cache: HashMap<String, LibraryNode>,
|
library_node_cache: HashMap<String, LibraryNode>,
|
||||||
client: CrabidyServiceClient<Channel>,
|
client: CrabidyServiceClient<Channel>,
|
||||||
|
pub queue_updates_stream: Streaming<GetQueueUpdatesResponse>,
|
||||||
|
pub track_updates_stream: Streaming<GetTrackUpdatesResponse>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcClient {
|
impl RpcClient {
|
||||||
pub async fn connect(addr: &'static str) -> Result<RpcClient, tonic::transport::Error> {
|
pub async fn connect(addr: &'static str) -> Result<RpcClient, Box<dyn Error>> {
|
||||||
let client = CrabidyServiceClient::connect(addr).await?;
|
let endpoint = Endpoint::from_static(addr).connect_lazy();
|
||||||
|
let mut client = CrabidyServiceClient::new(endpoint);
|
||||||
|
|
||||||
|
let queue_updates_stream = Self::get_queue_updates_stream(&mut client).await;
|
||||||
|
let track_updates_stream = Self::get_track_updates_stream(&mut client).await;
|
||||||
let library_node_cache: HashMap<String, LibraryNode> = HashMap::new();
|
let library_node_cache: HashMap<String, LibraryNode> = HashMap::new();
|
||||||
|
|
||||||
Ok(RpcClient {
|
Ok(RpcClient {
|
||||||
client,
|
client,
|
||||||
library_node_cache,
|
library_node_cache,
|
||||||
|
track_updates_stream,
|
||||||
|
queue_updates_stream,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_queue_updates_stream(
|
||||||
|
client: &mut CrabidyServiceClient<Channel>,
|
||||||
|
) -> Streaming<GetQueueUpdatesResponse> {
|
||||||
|
loop {
|
||||||
|
let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 });
|
||||||
|
if let Ok(resp) = client.get_queue_updates(get_queue_updates_request).await {
|
||||||
|
return resp.into_inner();
|
||||||
|
} else {
|
||||||
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_track_updates_stream(
|
||||||
|
client: &mut CrabidyServiceClient<Channel>,
|
||||||
|
) -> Streaming<GetTrackUpdatesResponse> {
|
||||||
|
loop {
|
||||||
|
let get_track_updates_request = Request::new(GetTrackUpdatesRequest {
|
||||||
|
type_whitelist: Vec::new(),
|
||||||
|
type_blacklist: Vec::new(),
|
||||||
|
updates_skipped: 0,
|
||||||
|
});
|
||||||
|
if let Ok(resp) = client.get_track_updates(get_track_updates_request).await {
|
||||||
|
return resp.into_inner();
|
||||||
|
} else {
|
||||||
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn reconnect_queue_updates_stream(&mut self) {
|
||||||
|
let queue_updates_stream = Self::get_queue_updates_stream(&mut self.client).await;
|
||||||
|
mem::replace(&mut self.queue_updates_stream, queue_updates_stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn reconnect_track_updates_stream(&mut self) {
|
||||||
|
let track_updates_stream = Self::get_track_updates_stream(&mut self.client).await;
|
||||||
|
mem::replace(&mut self.track_updates_stream, track_updates_stream);
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_library_node(
|
pub async fn get_library_node(
|
||||||
&mut self,
|
&mut self,
|
||||||
uuid: &str,
|
uuid: &str,
|
||||||
|
|
@ -67,45 +119,12 @@ impl RpcClient {
|
||||||
if let Some(library_node) = response.into_inner().node {
|
if let Some(library_node) = response.into_inner().node {
|
||||||
self.library_node_cache
|
self.library_node_cache
|
||||||
.insert(uuid.to_string(), library_node);
|
.insert(uuid.to_string(), library_node);
|
||||||
// FIXME: is that necessary?
|
|
||||||
return Ok(self.library_node_cache.get(uuid));
|
return Ok(self.library_node_cache.get(uuid));
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(Box::new(RpcClientError::NotFound))
|
Err(Box::new(RpcClientError::NotFound))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_queue_updates_stream(
|
|
||||||
&mut self,
|
|
||||||
) -> Result<Streaming<GetQueueUpdatesResponse>, Box<dyn Error>> {
|
|
||||||
// FIXME: Adjust request params to what we need
|
|
||||||
let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 });
|
|
||||||
|
|
||||||
let stream = self
|
|
||||||
.client
|
|
||||||
.get_queue_updates(get_queue_updates_request)
|
|
||||||
.await?
|
|
||||||
.into_inner();
|
|
||||||
Ok(stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_track_updates_stream(
|
|
||||||
&mut self,
|
|
||||||
) -> Result<Streaming<GetTrackUpdatesResponse>, Box<dyn Error>> {
|
|
||||||
// FIXME: Adjust request params to what we need
|
|
||||||
let get_queue_updates_request = Request::new(GetTrackUpdatesRequest {
|
|
||||||
type_whitelist: Vec::new(),
|
|
||||||
type_blacklist: Vec::new(),
|
|
||||||
updates_skipped: 0,
|
|
||||||
});
|
|
||||||
|
|
||||||
let stream = self
|
|
||||||
.client
|
|
||||||
.get_track_updates(get_queue_updates_request)
|
|
||||||
.await?
|
|
||||||
.into_inner();
|
|
||||||
Ok(stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn replace_queue_with_node(&mut self, uuid: &str) -> Result<(), Box<dyn Error>> {
|
pub async fn replace_queue_with_node(&mut self, uuid: &str) -> Result<(), Box<dyn Error>> {
|
||||||
let replace_with_node_request = Request::new(ReplaceWithNodeRequest {
|
let replace_with_node_request = Request::new(ReplaceWithNodeRequest {
|
||||||
uuid: uuid.to_string(),
|
uuid: uuid.to_string(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue