Adjust tui to new protobuf spec

This commit is contained in:
chmanie 2023-06-02 15:36:08 +02:00
parent 47b788bf9c
commit 8d01cfce73
2 changed files with 74 additions and 140 deletions

View File

@ -1,9 +1,9 @@
mod rpc; mod rpc;
use crabidy_core::proto::crabidy::{ use crabidy_core::proto::crabidy::{
crabidy_service_client::CrabidyServiceClient, get_queue_updates_response::QueueUpdateResult, crabidy_service_client::CrabidyServiceClient,
ActiveTrack, GetLibraryNodeRequest, GetQueueUpdatesRequest, GetQueueUpdatesResponse, get_update_stream_response::Update as StreamUpdate, GetLibraryNodeRequest, LibraryNode,
GetTrackUpdatesResponse, LibraryNode, LibraryNodeState, Queue, Track, TrackPlayState, PlayState, Queue, QueueTrack, Track, TrackPosition,
}; };
use crossterm::{ use crossterm::{
@ -165,7 +165,7 @@ struct UiItem {
struct QueueView { struct QueueView {
// FIXME: implement skip on server, remove current // FIXME: implement skip on server, remove current
current: usize, current_position: usize,
list: Vec<UiItem>, list: Vec<UiItem>,
list_state: ListState, list_state: ListState,
} }
@ -187,8 +187,8 @@ impl ListView for QueueView {
impl QueueView { impl QueueView {
// FIXME: implement skip on server // FIXME: implement skip on server
fn skip(&self, tx: &Sender<MessageFromUi>) { fn skip(&self, tx: &Sender<MessageFromUi>) {
if self.current < self.get_size() - 1 { if self.current_position < self.get_size() - 1 {
tx.send(MessageFromUi::SetCurrentTrack(self.current + 1)); tx.send(MessageFromUi::SetCurrentTrack(self.current_position + 1));
} }
} }
fn play_selected(&self, tx: &Sender<MessageFromUi>) { fn play_selected(&self, tx: &Sender<MessageFromUi>) {
@ -196,9 +196,11 @@ impl QueueView {
tx.send(MessageFromUi::SetCurrentTrack(pos)); tx.send(MessageFromUi::SetCurrentTrack(pos));
} }
} }
fn update_position(&mut self, pos: usize) {
self.current_position = pos;
}
fn update(&mut self, queue: Queue) { fn update(&mut self, queue: Queue) {
// FIXME: rename current to current_pos in proto buf definition self.current_position = queue.current_position as usize;
self.current = queue.current as usize;
self.list = queue self.list = queue
.tracks .tracks
.iter() .iter()
@ -207,7 +209,7 @@ impl QueueView {
uuid: t.uuid.clone(), uuid: t.uuid.clone(),
title: format!("{} - {}", t.artist, t.title), title: format!("{} - {}", t.artist, t.title),
kind: UiItemKind::Track, kind: UiItemKind::Track,
active: i == queue.current as usize, active: i == queue.current_position as usize,
}) })
.collect(); .collect();
@ -265,7 +267,7 @@ impl LibraryView {
} }
fn queue_replace_with_item(&mut self, tx: &Sender<MessageFromUi>) { fn queue_replace_with_item(&mut self, tx: &Sender<MessageFromUi>) {
if let Some(item) = self.get_selected() { if let Some(item) = self.get_selected() {
tx.send(MessageFromUi::ReplaceWithItem(item.uuid.clone(), item.kind)); tx.send(MessageFromUi::ReplaceQueue(item.uuid.clone()));
} }
} }
fn prev_selected(&self) -> usize { fn prev_selected(&self) -> usize {
@ -314,36 +316,25 @@ impl LibraryView {
struct NowPlayingView { struct NowPlayingView {
completion: Option<u32>, completion: Option<u32>,
elapsed: Option<f64>, elapsed: Option<f64>,
playing_state: TrackPlayState, play_state: PlayState,
track: Option<Track>, track: Option<Track>,
} }
impl NowPlayingView { impl NowPlayingView {
fn update(&mut self, active_track: ActiveTrack) { fn update_play_state(&mut self, play_state: PlayState) {
self.playing_state = active_track.play_state(); self.play_state = play_state;
if let Some(next_track) = active_track.track {
if let Some(duration) = next_track.duration {
self.completion = Some(active_track.completion);
self.elapsed = Some(active_track.completion as f64 / duration as f64);
} }
fn update_position(&mut self, pos: TrackPosition) {}
let changed = if let Some(current_track) = &self.track { fn update_track(&mut self, active: Option<Track>) {
current_track.uuid != next_track.uuid if let Some(track) = &active {
} else {
true
};
if changed {
Notification::new() Notification::new()
.summary("Crabidy playing") .summary("Crabidy playing")
.body(&format!("{} by {}", next_track.title, next_track.artist)) // FIXME: album
.body(&format!("{} by {}", track.title, track.artist))
.show() .show()
.unwrap(); .unwrap();
self.track = Some(next_track);
}
} }
self.track = active;
} }
} }
@ -365,14 +356,14 @@ impl App {
parent: None, parent: None,
}; };
let queue = QueueView { let queue = QueueView {
current: 0, current_position: 0,
list: Vec::new(), list: Vec::new(),
list_state: ListState::default(), list_state: ListState::default(),
}; };
let now_playing = NowPlayingView { let now_playing = NowPlayingView {
completion: None, completion: None,
elapsed: None, elapsed: None,
playing_state: TrackPlayState::Unspecified, play_state: PlayState::Unspecified,
track: None, track: None,
}; };
App { App {
@ -395,14 +386,13 @@ impl App {
// FIXME: Rename this // FIXME: Rename this
enum MessageToUi { enum MessageToUi {
ReplaceLibraryNode(LibraryNode), ReplaceLibraryNode(LibraryNode),
QueueStreamUpdate(QueueUpdateResult), Update(StreamUpdate),
TrackStreamUpdate(ActiveTrack),
} }
// FIXME: Rename this // FIXME: Rename this
enum MessageFromUi { enum MessageFromUi {
GetLibraryNode(String), GetLibraryNode(String),
ReplaceWithItem(String, UiItemKind), ReplaceQueue(String),
SetCurrentTrack(usize), SetCurrentTrack(usize),
TogglePlay, TogglePlay,
} }
@ -420,15 +410,8 @@ async fn poll(
tx.send(MessageToUi::ReplaceLibraryNode(node.clone())); tx.send(MessageToUi::ReplaceLibraryNode(node.clone()));
} }
}, },
MessageFromUi::ReplaceWithItem(uuid, kind) => { MessageFromUi::ReplaceQueue(uuid) => {
match kind { rpc_client.replace_queue(&uuid).await?
UiItemKind::Node => {
rpc_client.replace_queue_with_node(&uuid).await?
}
UiItemKind::Track => {
rpc_client.replace_queue_with_track(&uuid).await?
}
}
} }
MessageFromUi::TogglePlay => { MessageFromUi::TogglePlay => {
rpc_client.toggle_play().await? rpc_client.toggle_play().await?
@ -439,31 +422,19 @@ async fn poll(
} }
} }
Some(resp) = rpc_client.queue_updates_stream.next() => { Some(resp) = rpc_client.update_stream.next() => {
match resp { match resp {
Ok(resp) => { Ok(resp) => {
if let Some(res) = resp.queue_update_result { if let Some(update) = resp.update {
tx.send_async(MessageToUi::QueueStreamUpdate(res)).await?; tx.send_async(MessageToUi::Update(update)).await?;
} }
} }
Err(_) => { Err(_) => {
rpc_client.reconnect_queue_updates_stream().await; rpc_client.reconnect_update_stream().await;
} }
} }
} }
Some(resp) = rpc_client.track_updates_stream.next() => {
match resp {
Ok(resp) => {
if let Some(active_track) = resp.active_track {
tx.send_async(MessageToUi::TrackStreamUpdate(active_track)).await?;
}
}
Err(_) => {
rpc_client.reconnect_track_updates_stream().await;
}
}
}
} }
Ok(()) Ok(())
@ -518,15 +489,22 @@ fn run_ui(tx: Sender<MessageFromUi>, rx: Receiver<MessageToUi>) {
MessageToUi::ReplaceLibraryNode(node) => { MessageToUi::ReplaceLibraryNode(node) => {
app.library.update(node); app.library.update(node);
} }
MessageToUi::QueueStreamUpdate(queue_update) => match queue_update { MessageToUi::Update(update) => match update {
QueueUpdateResult::Full(queue) => { StreamUpdate::Queue(queue) => {
app.queue.update(queue); app.queue.update(queue);
} }
QueueUpdateResult::PositionChange(pos) => {} StreamUpdate::QueueTrack(track) => {
}, app.now_playing.update_track(track.track);
MessageToUi::TrackStreamUpdate(active_track) => { app.queue.update_position(track.queue_position as usize);
app.now_playing.update(active_track);
} }
StreamUpdate::Position(pos) => app.now_playing.update_position(pos),
StreamUpdate::PlayState(play_state) => {
if let Some(ps) = PlayState::from_i32(play_state) {
app.now_playing.update_play_state(ps);
}
}
_ => {}
},
} }
} }
@ -713,10 +691,10 @@ fn ui<B: Backend>(f: &mut Frame<B>, app: &mut App) {
.split(right_side[1]); .split(right_side[1]);
let media_info_text = if let Some(track) = &app.now_playing.track { let media_info_text = if let Some(track) = &app.now_playing.track {
let play_text = match &app.now_playing.playing_state { let play_text = match &app.now_playing.play_state {
TrackPlayState::Loading => "", PlayState::Loading => "",
TrackPlayState::Paused => "", PlayState::Paused => "",
TrackPlayState::Playing => "", PlayState::Playing => "",
_ => "", _ => "",
}; };
vec![ vec![

View File

@ -1,9 +1,8 @@
use crabidy_core::proto::crabidy::{ use crabidy_core::proto::crabidy::{
crabidy_service_client::CrabidyServiceClient, get_queue_updates_response::QueueUpdateResult, crabidy_service_client::CrabidyServiceClient, get_update_stream_response::Update,
GetLibraryNodeRequest, GetQueueUpdatesRequest, GetQueueUpdatesResponse, GetTrackUpdatesRequest, GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest, GetUpdateStreamResponse,
GetTrackUpdatesResponse, LibraryNode, LibraryNodeState, ReplaceWithNodeRequest, LibraryNode, ReplaceRequest, SetCurrentRequest, SetCurrentResponse, TogglePlayRequest,
ReplaceWithNodeResponse, ReplaceWithTrackRequest, ReplaceWithTrackResponse, TogglePlayResponse,
SetCurrentTrackRequest, SetCurrentTrackResponse, TogglePlayRequest,
}; };
use std::{ use std::{
@ -38,8 +37,7 @@ impl Error for RpcClientError {}
pub struct RpcClient { pub struct RpcClient {
library_node_cache: HashMap<String, LibraryNode>, library_node_cache: HashMap<String, LibraryNode>,
client: CrabidyServiceClient<Channel>, client: CrabidyServiceClient<Channel>,
pub queue_updates_stream: Streaming<GetQueueUpdatesResponse>, pub update_stream: Streaming<GetUpdateStreamResponse>,
pub track_updates_stream: Streaming<GetTrackUpdatesResponse>,
} }
impl RpcClient { impl RpcClient {
@ -47,24 +45,22 @@ impl RpcClient {
let endpoint = Endpoint::from_static(addr).connect_lazy(); let endpoint = Endpoint::from_static(addr).connect_lazy();
let mut client = CrabidyServiceClient::new(endpoint); let mut client = CrabidyServiceClient::new(endpoint);
let queue_updates_stream = Self::get_queue_updates_stream(&mut client).await; let update_stream = Self::get_update_stream(&mut client).await;
let track_updates_stream = Self::get_track_updates_stream(&mut client).await;
let library_node_cache: HashMap<String, LibraryNode> = HashMap::new(); let library_node_cache: HashMap<String, LibraryNode> = HashMap::new();
Ok(RpcClient { Ok(RpcClient {
client, client,
library_node_cache, library_node_cache,
track_updates_stream, update_stream,
queue_updates_stream,
}) })
} }
async fn get_queue_updates_stream( async fn get_update_stream(
client: &mut CrabidyServiceClient<Channel>, client: &mut CrabidyServiceClient<Channel>,
) -> Streaming<GetQueueUpdatesResponse> { ) -> Streaming<GetUpdateStreamResponse> {
loop { loop {
let get_queue_updates_request = Request::new(GetQueueUpdatesRequest { timestamp: 0 }); let get_update_stream_request = Request::new(GetUpdateStreamRequest {});
if let Ok(resp) = client.get_queue_updates(get_queue_updates_request).await { if let Ok(resp) = client.get_update_stream(get_update_stream_request).await {
return resp.into_inner(); return resp.into_inner();
} else { } else {
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
@ -72,31 +68,10 @@ impl RpcClient {
} }
} }
async fn get_track_updates_stream( pub async fn reconnect_update_stream(&mut self) {
client: &mut CrabidyServiceClient<Channel>, let update_stream = Self::get_update_stream(&mut self.client).await;
) -> Streaming<GetTrackUpdatesResponse> { // FIXME: apparently mem::replace doesn't do anything here
loop { mem::replace(&mut self.update_stream, update_stream);
let get_track_updates_request = Request::new(GetTrackUpdatesRequest {
type_whitelist: Vec::new(),
type_blacklist: Vec::new(),
updates_skipped: 0,
});
if let Ok(resp) = client.get_track_updates(get_track_updates_request).await {
return resp.into_inner();
} else {
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
pub async fn reconnect_queue_updates_stream(&mut self) {
let queue_updates_stream = Self::get_queue_updates_stream(&mut self.client).await;
mem::replace(&mut self.queue_updates_stream, queue_updates_stream);
}
pub async fn reconnect_track_updates_stream(&mut self) {
let track_updates_stream = Self::get_track_updates_stream(&mut self.client).await;
mem::replace(&mut self.track_updates_stream, track_updates_stream);
} }
pub async fn get_library_node( pub async fn get_library_node(
@ -125,28 +100,12 @@ impl RpcClient {
Err(Box::new(RpcClientError::NotFound)) Err(Box::new(RpcClientError::NotFound))
} }
pub async fn replace_queue_with_node(&mut self, uuid: &str) -> Result<(), Box<dyn Error>> { pub async fn replace_queue(&mut self, uuid: &str) -> Result<(), Box<dyn Error>> {
let replace_with_node_request = Request::new(ReplaceWithNodeRequest { let replace_request = Request::new(ReplaceRequest {
uuid: uuid.to_string(), uuid: vec![uuid.to_string()],
}); });
let response = self let response = self.client.replace(replace_request).await?;
.client
.replace_with_node(replace_with_node_request)
.await?;
Ok(())
}
pub async fn replace_queue_with_track(&mut self, uuid: &str) -> Result<(), Box<dyn Error>> {
let replace_with_track_request = Request::new(ReplaceWithTrackRequest {
uuid: uuid.to_string(),
});
let response = self
.client
.replace_with_track(replace_with_track_request)
.await?;
Ok(()) Ok(())
} }
@ -160,14 +119,11 @@ impl RpcClient {
} }
pub async fn set_current_track(&mut self, pos: usize) -> Result<(), Box<dyn Error>> { pub async fn set_current_track(&mut self, pos: usize) -> Result<(), Box<dyn Error>> {
let set_current_track_request = Request::new(SetCurrentTrackRequest { let set_current_request = Request::new(SetCurrentRequest {
position: pos as u32, position: pos as u32,
}); });
let response = self let response = self.client.set_current(set_current_request).await?;
.client
.set_current_track(set_current_track_request)
.await?;
Ok(()) Ok(())
} }