Adjust server to new proto spec

This commit is contained in:
Hans Mündelein 2023-06-02 18:41:19 +02:00
parent 1e7203a9f5
commit 5a42ddfbdb
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
6 changed files with 421 additions and 350 deletions

View File

@ -349,7 +349,7 @@ impl App {
fn new() -> App {
let mut library = LibraryView {
title: "Library".to_string(),
uuid: "/".to_string(),
uuid: "node:/".to_string(),
list: Vec::new(),
list_state: ListState::default(),
positions: HashMap::new(),
@ -443,9 +443,9 @@ async fn poll(
async fn orchestrate<'a>(
(tx, rx): (Sender<MessageToUi>, Receiver<MessageFromUi>),
) -> Result<(), Box<dyn Error>> {
let mut rpc_client = rpc::RpcClient::connect("http://127.0.0.1:50051").await?;
let mut rpc_client = rpc::RpcClient::connect("http://localhost:50051").await?;
if let Some(root_node) = rpc_client.get_library_node("/").await? {
if let Some(root_node) = rpc_client.get_library_node("node:/").await? {
tx.send(MessageToUi::ReplaceLibraryNode(root_node.clone()));
}

View File

@ -65,12 +65,13 @@ message AppendRequest {
message AppendResponse {}
message RemoveRequest {
repeated string uuid = 1;
repeated uint32 positions = 1;
}
message RemoveResponse {}
message InsertRequest {
repeated string uuid = 1;
uint32 position = 1;
repeated string uuid = 2;
}
message InsertResponse {}
@ -105,7 +106,7 @@ message StopRequest {}
message StopResponse {}
message ChangeVolumeRequest {
int32 delta = 1;
float delta = 1;
}
message ChangeVolumeResponse {}

View File

@ -34,7 +34,7 @@ impl std::fmt::Display for ProviderError {
impl LibraryNode {
pub fn new() -> Self {
Self {
uuid: "/".to_string(),
uuid: "node:/".to_string(),
title: "/".to_string(),
children: Vec::new(),
parent: None,
@ -105,4 +105,12 @@ impl Queue {
}
}
}
pub fn insert_tracks(&mut self, position: u32, tracks: &[Track]) {
let tail: Vec<Track> = self
.tracks
.splice((position as usize).., tracks.to_vec())
.collect();
self.tracks.extend(tail);
}
}

View File

@ -1,21 +1,20 @@
use async_trait::async_trait;
use crabidy_core::proto::crabidy::{
crabidy_service_server::{CrabidyService, CrabidyServiceServer},
get_queue_updates_response::QueueUpdateResult,
ActiveTrack, AppendNodeRequest, AppendNodeResponse, AppendTrackRequest, AppendTrackResponse,
GetActiveTrackRequest, GetActiveTrackResponse, GetLibraryNodeRequest, GetLibraryNodeResponse,
GetQueueRequest, GetQueueResponse, GetQueueUpdatesRequest, GetQueueUpdatesResponse,
GetTrackRequest, GetTrackResponse, GetTrackUpdatesRequest, GetTrackUpdatesResponse,
LibraryNode, LibraryNodeChild, Queue, QueueLibraryNodeRequest, QueueLibraryNodeResponse,
QueuePositionChange, QueueTrackRequest, QueueTrackResponse, RemoveTracksRequest,
RemoveTracksResponse, ReplaceWithNodeRequest, ReplaceWithNodeResponse, ReplaceWithTrackRequest,
ReplaceWithTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentTrackRequest,
SetCurrentTrackResponse, StopRequest, StopResponse, TogglePlayRequest, TogglePlayResponse,
Track,
get_update_stream_response::Update as StreamUpdate,
AppendRequest, AppendResponse, ChangeVolumeRequest, ChangeVolumeResponse,
GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest, GetUpdateStreamResponse,
InitRequest, InitResponse, InsertRequest, InsertResponse, LibraryNode, LibraryNodeChild,
NextRequest, NextResponse, PlayState, PrevRequest, PrevResponse, Queue, QueueRequest,
QueueResponse, QueueTrack, RemoveRequest, RemoveResponse, ReplaceRequest, ReplaceResponse,
RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, SaveQueueResponse,
SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, ToggleMuteRequest,
ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, ToggleShuffleRequest,
ToggleShuffleResponse, Track,
};
use crabidy_core::{ProviderClient, ProviderError};
use futures::TryStreamExt;
use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer};
use gstreamer_play::{Play, PlayMessage, PlayState as GstPlaystate, PlayVideoRenderer};
use tokio_stream::StreamExt;
use std::{
@ -49,15 +48,10 @@ fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender<PlaybackMessage>) {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
gstreamer::init()?;
let (queue_update_tx, _) = tokio::sync::broadcast::channel(100);
let (active_track_tx, _) = tokio::sync::broadcast::channel(1000);
let (update_tx, _) = tokio::sync::broadcast::channel(2048);
let orchestrator = ProviderOrchestrator::init("").await.unwrap();
let playback = Playback::new(
active_track_tx.clone(),
queue_update_tx.clone(),
orchestrator.provider_tx.clone(),
);
let playback = Playback::new(update_tx.clone(), orchestrator.provider_tx.clone());
let bus = playback.play.message_bus();
let playback_tx = playback.playback_tx.clone();
@ -67,8 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
let crabidy_service = RpcService::new(
queue_update_tx,
active_track_tx,
update_tx,
playback.playback_tx.clone(),
orchestrator.provider_tx.clone(),
);
@ -159,7 +152,7 @@ impl ProviderClient for ProviderOrchestrator {
let state = Mutex::new(PlayState::Stopped);
let queue = Mutex::new(Queue {
timestamp: 0,
current: 0,
current_position: 0,
tracks: Vec::new(),
});
let raw_toml_settings = fs::read_to_string("/tmp/tidaldy.toml").unwrap_or("".to_owned());
@ -188,15 +181,17 @@ impl ProviderClient for ProviderOrchestrator {
}
fn get_lib_root(&self) -> LibraryNode {
let mut root_node = LibraryNode::new();
let child = LibraryNodeChild::new("tidal".to_owned(), "tidal".to_owned());
let child = LibraryNodeChild::new("node:tidal".to_owned(), "tidal".to_owned());
root_node.children.push(child);
println!("Global root node {:?}", root_node);
root_node
}
async fn get_lib_node(&self, uuid: &str) -> Result<LibraryNode, ProviderError> {
if uuid == "/" {
println!("get_lib_node {}", uuid);
if uuid == "node:/" {
return Ok(self.get_lib_root());
}
if uuid == "tidal" {
if uuid == "node:tidal" {
return Ok(self.tidal_client.get_lib_root());
}
self.tidal_client.get_lib_node(uuid).await
@ -205,76 +200,73 @@ impl ProviderClient for ProviderOrchestrator {
#[derive(Debug)]
enum PlaybackMessage {
ReplaceWithTrack {
uuid: String,
Replace {
uuids: Vec<String>,
},
ReplaceWithNode {
uuid: String,
Queue {
uuids: Vec<String>,
},
QueueTrack {
uuid: String,
Append {
uuids: Vec<String>,
},
QueueNode {
uuid: String,
},
ClearQueue,
GetQueue {
result_tx: flume::Sender<Queue>,
},
AppendTrack {
uuid: String,
},
AppendNode {
uuid: String,
},
RemoveTracks {
Remove {
positions: Vec<u32>,
},
Insert {
position: u32,
uuids: Vec<String>,
},
SetCurrent {
position: u32,
},
GetCurrent {
result_tx: flume::Sender<ActiveTrack>,
GetQueue {
result_tx: flume::Sender<Queue>,
},
Next,
PlayPause,
GetQueueTrack {
result_tx: flume::Sender<QueueTrack>,
},
TogglePlay,
ToggleShuffle,
Stop,
ChangeVolume {
delta: f32,
},
ToggleMute,
Next,
Prev,
StateChanged {
state: PlayState,
state: GstPlaystate,
},
}
#[derive(Debug)]
struct Playback {
active_track_tx: tokio::sync::broadcast::Sender<ActiveTrack>,
queue_update_tx: tokio::sync::broadcast::Sender<Queue>,
update_tx: tokio::sync::broadcast::Sender<StreamUpdate>,
provider_tx: flume::Sender<ProviderMessage>,
playback_tx: flume::Sender<PlaybackMessage>,
playback_rx: flume::Receiver<PlaybackMessage>,
queue: Mutex<Queue>,
state: Mutex<PlayState>,
state: Mutex<GstPlaystate>,
play: Play,
creation: std::time::Instant,
}
impl Playback {
fn new(
active_track_tx: tokio::sync::broadcast::Sender<ActiveTrack>,
queue_update_tx: tokio::sync::broadcast::Sender<Queue>,
update_tx: tokio::sync::broadcast::Sender<StreamUpdate>,
provider_tx: flume::Sender<ProviderMessage>,
) -> Self {
let (playback_tx, playback_rx) = flume::bounded(10);
let queue = Mutex::new(Queue {
timestamp: 0,
current: 0,
current_position: 0,
tracks: Vec::new(),
});
let state = Mutex::new(PlayState::Stopped);
let state = Mutex::new(GstPlaystate::Stopped);
let play = Play::new(None::<PlayVideoRenderer>);
let creation = std::time::Instant::now();
Self {
active_track_tx,
queue_update_tx,
update_tx,
provider_tx,
playback_tx,
playback_rx,
@ -288,128 +280,210 @@ impl Playback {
tokio::spawn(async move {
while let Ok(message) = self.playback_rx.recv_async().await {
match message {
PlaybackMessage::ReplaceWithTrack { uuid } => {
if let Ok(track) = self.get_track(&uuid).await {
{
let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&[track.clone()]);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
PlaybackMessage::Replace { uuids } => {
println!("Replace {:?}", uuids);
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
println!("Track {}", uuid);
if let Ok(track) = self.get_track(&uuid).await {
all_tracks.push(track);
}
} else {
println!("Node {}", uuid);
let tracks = self.flatten_node(&uuid).await;
all_tracks.extend(tracks);
}
}
let current = {
let mut queue = self.queue.lock().unwrap();
queue.set_current_position(0);
queue.replace_with_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
}
queue.current()
};
if let Some(track) = current {
self.play(track).await;
}
}
PlaybackMessage::ReplaceWithNode { uuid } => {
let tracks = self.flatten_node(&uuid).await;
{
let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
}
if !tracks.is_empty() {
self.play(tracks[0].clone()).await;
PlaybackMessage::Queue { uuids } => {
for uuid in uuids {
if is_track(&uuid) {
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&[track]);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
}
} else {
let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
}
}
}
PlaybackMessage::QueueTrack { uuid } => {
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&[track]);
let queue_update_tx = self.queue_update_tx.clone();
// queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
PlaybackMessage::Append { uuids } => {
for uuid in uuids {
if is_track(&uuid) {
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&[track]);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
}
} else {
let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
}
}
}
PlaybackMessage::QueueNode { uuid } => {
let tracks = self.flatten_node(&uuid).await;
//TODO handle deletion of current track
PlaybackMessage::Remove { positions } => {
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone();
if let Err(err) = queue_update_tx.send(queue.clone()) {
queue.remove_tracks(&positions);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
}
PlaybackMessage::Insert { position, uuids } => {
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
if let Ok(track) = self.get_track(&uuid).await {
all_tracks.push(track);
}
} else {
let tracks = self.flatten_node(&uuid).await;
all_tracks.extend(tracks);
}
}
let mut queue = self.queue.lock().unwrap();
queue.insert_tracks(position, &all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
}
PlaybackMessage::SetCurrent {
position: queue_position,
} => {
let track = {
let mut queue = self.queue.lock().unwrap();
queue.set_current_position(queue_position);
queue.current()
};
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position,
track: track.clone(),
});
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
// queue_update_tx.send(queue.clone()).unwrap();
if let Some(track) = track {
self.play(track).await;
}
}
PlaybackMessage::GetQueue { result_tx } => {
let queue = self.queue.lock().unwrap();
result_tx.send(queue.clone()).unwrap();
}
PlaybackMessage::AppendTrack { uuid } => {
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&[track]);
let queue_update_tx = self.queue_update_tx.clone();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
// queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
PlaybackMessage::GetQueueTrack { result_tx } => {
let current = self.get_queue_track().await;
result_tx.send(current).unwrap();
}
PlaybackMessage::TogglePlay => {
let mut state = self.state.lock().unwrap();
if *state == GstPlaystate::Playing {
self.play.pause();
} else {
self.play.play();
}
}
PlaybackMessage::AppendNode { uuid } => {
let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&tracks);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
PlaybackMessage::Stop => {
self.play.stop();
}
PlaybackMessage::ClearQueue => {
let mut queue = self.queue.lock().unwrap();
queue.replace_with_tracks(&vec![]);
self.stop_track();
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
PlaybackMessage::ChangeVolume { delta } => {
let volume = self.play.volume();
self.play.set_volume(volume + delta as f64);
}
//TODO handle deletion of current track
PlaybackMessage::RemoveTracks { positions } => {
let mut queue = self.queue.lock().unwrap();
queue.remove_tracks(&positions);
let queue_update_tx = self.queue_update_tx.clone();
queue_update_tx.send(queue.clone()).unwrap();
PlaybackMessage::ToggleMute => {
let muted = self.play.is_muted();
self.play.set_mute(!muted);
}
PlaybackMessage::SetCurrent { position } => {
let result = {
PlaybackMessage::ToggleShuffle => {
todo!()
}
PlaybackMessage::Next => {
let (result, stop, pos) = {
let mut queue = self.queue.lock().unwrap();
queue.set_current(position);
let queue_update_tx = self.queue_update_tx.clone();
// queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
queue.current()
let position = queue.current_position + 1;
let stop = !queue.set_current_position(position);
(queue.current(), stop, position)
};
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: pos,
track: result.clone(),
});
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
if let Some(track) = result {
self.play(track).await;
}
if stop {
self.stop_track()
}
}
PlaybackMessage::GetCurrent { result_tx } => {
let current = self.get_active_track().await;
result_tx.send(current).unwrap();
}
PlaybackMessage::Next => {
let (result, stop) = {
PlaybackMessage::Prev => {
let (result, stop, pos) = {
let mut queue = self.queue.lock().unwrap();
let position = queue.current + 1;
let stop = !queue.set_current(position);
let queue_update_tx = self.queue_update_tx.clone();
// queue_update_tx.send(queue.clone()).unwrap();
if let Err(err) = queue_update_tx.send(queue.clone()) {
println!("{:?}", err)
};
(queue.current(), stop)
let position = queue.current_position - 1;
let stop = !queue.set_current_position(position);
(queue.current(), stop, position)
};
let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: pos,
track: result.clone(),
});
let queue_update_tx = self.update_tx.clone();
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
if let Some(track) = result {
@ -420,26 +494,19 @@ impl Playback {
}
}
PlaybackMessage::PlayPause => {
let mut state = self.state.lock().unwrap();
if *state == PlayState::Playing {
self.play.pause();
// *state = PlayState::Paused
} else {
self.play.play();
// *state = PlayState::Playing
}
}
PlaybackMessage::Stop => {
self.play.stop();
// *self.state.lock().unwrap() = PlayState::Stopped;
}
PlaybackMessage::StateChanged { state } => {
*self.state.lock().unwrap() = state.clone();
let active_track_tx = self.active_track_tx.clone();
let active_track = self.get_active_track().await;
// active_track_tx.send(active_track).unwrap();
if let Err(err) = active_track_tx.send(active_track) {
let active_track_tx = self.update_tx.clone();
let active_track = self.get_queue_track().await;
let play_state = match state {
GstPlaystate::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused,
GstPlaystate::Stopped => PlayState::Stopped,
GstPlaystate::Buffering => PlayState::Loading,
_ => PlayState::Unspecified,
};
let update = StreamUpdate::PlayState(play_state as i32);
if let Err(err) = active_track_tx.send(update) {
println!("{:?}", err)
};
}
@ -491,26 +558,27 @@ impl Playback {
.await
.map_err(|_| ProviderError::InternalError)?
}
async fn get_active_track(&self) -> ActiveTrack {
async fn get_queue_track(&self) -> QueueTrack {
let queue_position: u32;
let result = {
let mut queue = self.queue.lock().unwrap();
queue_position = queue.current_position;
queue.current()
};
let completion = 0;
let gst_play_state = self.state.lock().unwrap();
let play_state = match *gst_play_state {
PlayState::Stopped => crabidy_core::proto::crabidy::TrackPlayState::Stopped,
PlayState::Buffering => crabidy_core::proto::crabidy::TrackPlayState::Loading,
PlayState::Playing => crabidy_core::proto::crabidy::TrackPlayState::Playing,
PlayState::Paused => crabidy_core::proto::crabidy::TrackPlayState::Paused,
_ => crabidy_core::proto::crabidy::TrackPlayState::Unspecified,
GstPlaystate::Stopped => PlayState::Stopped,
GstPlaystate::Buffering => PlayState::Loading,
GstPlaystate::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused,
_ => PlayState::Unspecified,
};
let play_state = play_state as i32;
ActiveTrack {
QueueTrack {
queue_position,
track: result,
completion,
play_state,
}
}
@ -522,7 +590,7 @@ impl Playback {
};
{
let mut state_guard = self.state.lock().unwrap();
*state_guard = PlayState::Playing;
*state_guard = GstPlaystate::Playing;
}
self.play.stop();
self.play.set_uri(Some(&urls[0]));
@ -532,18 +600,18 @@ impl Playback {
fn stop_track(&self) {
{
let mut state_guard = self.state.lock().unwrap();
*state_guard = PlayState::Stopped;
*state_guard = GstPlaystate::Stopped;
}
self.play.stop();
}
fn playpause(&self) {
let mut state_guard = self.state.lock().unwrap();
if *state_guard == PlayState::Playing {
*state_guard = PlayState::Paused;
if *state_guard == GstPlaystate::Playing {
*state_guard = GstPlaystate::Paused;
self.play.pause();
} else {
*state_guard = PlayState::Playing;
*state_guard = GstPlaystate::Playing;
self.play.play()
}
}
@ -551,22 +619,19 @@ impl Playback {
#[derive(Debug)]
struct RpcService {
queue_update_tx: tokio::sync::broadcast::Sender<Queue>,
active_track_tx: tokio::sync::broadcast::Sender<ActiveTrack>,
update_tx: tokio::sync::broadcast::Sender<StreamUpdate>,
playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>,
}
impl RpcService {
fn new(
queue_update_rx: tokio::sync::broadcast::Sender<Queue>,
active_track_rx: tokio::sync::broadcast::Sender<ActiveTrack>,
update_rx: tokio::sync::broadcast::Sender<StreamUpdate>,
playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>,
) -> Self {
Self {
queue_update_tx: queue_update_rx,
active_track_tx: active_track_rx,
update_tx: update_rx,
playback_tx,
provider_tx,
}
@ -575,11 +640,12 @@ impl RpcService {
#[tonic::async_trait]
impl CrabidyService for RpcService {
type GetQueueUpdatesStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<GetQueueUpdatesResponse, Status>> + Send>>;
type GetUpdateStreamStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<GetUpdateStreamResponse, Status>> + Send>>;
type GetTrackUpdatesStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<GetTrackUpdatesResponse, Status>> + Send>>;
async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitResponse>, Status> {
todo!()
}
async fn get_library_node(
&self,
@ -604,147 +670,93 @@ impl CrabidyService for RpcService {
Err(err) => Err(Status::internal(err.to_string())),
}
}
async fn get_track(
&self,
request: Request<GetTrackRequest>,
) -> Result<Response<GetTrackResponse>, Status> {
let provider_tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
provider_tx
.send_async(ProviderMessage::GetTrack {
uuid: request.into_inner().uuid,
result_tx,
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let result = result_rx
.recv_async()
.await
.map_err(|_| Status::internal("Failed to receive response from provider channel"))?;
match result {
Ok(track) => Ok(Response::new(GetTrackResponse { track: Some(track) })),
Err(err) => Err(Status::internal(err.to_string())),
}
}
async fn queue_track(
async fn queue(
&self,
request: tonic::Request<QueueTrackRequest>,
) -> std::result::Result<tonic::Response<QueueTrackResponse>, tonic::Status> {
request: tonic::Request<QueueRequest>,
) -> std::result::Result<tonic::Response<QueueResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::QueueTrack {
uuid: req.uuid.clone(),
.send_async(PlaybackMessage::Queue {
uuids: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = QueueTrackResponse {};
let reply = QueueResponse {};
Ok(Response::new(reply))
}
async fn queue_library_node(
async fn replace(
&self,
request: tonic::Request<QueueLibraryNodeRequest>,
) -> std::result::Result<tonic::Response<QueueLibraryNodeResponse>, tonic::Status> {
request: tonic::Request<ReplaceRequest>,
) -> std::result::Result<tonic::Response<ReplaceResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::QueueNode {
uuid: req.uuid.clone(),
.send_async(PlaybackMessage::Replace {
uuids: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = QueueLibraryNodeResponse {};
let reply = ReplaceResponse {};
Ok(Response::new(reply))
}
async fn replace_with_track(
async fn append(
&self,
request: tonic::Request<ReplaceWithTrackRequest>,
) -> std::result::Result<tonic::Response<ReplaceWithTrackResponse>, tonic::Status> {
request: tonic::Request<AppendRequest>,
) -> std::result::Result<tonic::Response<AppendResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::ReplaceWithTrack {
uuid: req.uuid.clone(),
.send_async(PlaybackMessage::Append {
uuids: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = ReplaceWithTrackResponse {};
let reply = AppendResponse {};
Ok(Response::new(reply))
}
async fn replace_with_node(
async fn remove(
&self,
request: tonic::Request<ReplaceWithNodeRequest>,
) -> std::result::Result<tonic::Response<ReplaceWithNodeResponse>, tonic::Status> {
request: tonic::Request<RemoveRequest>,
) -> std::result::Result<tonic::Response<RemoveResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::ReplaceWithNode {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = ReplaceWithNodeResponse {};
Ok(Response::new(reply))
}
async fn append_track(
&self,
request: tonic::Request<AppendTrackRequest>,
) -> std::result::Result<tonic::Response<AppendTrackResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::AppendTrack {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = AppendTrackResponse {};
Ok(Response::new(reply))
}
async fn append_node(
&self,
request: tonic::Request<AppendNodeRequest>,
) -> std::result::Result<tonic::Response<AppendNodeResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::AppendNode {
uuid: req.uuid.clone(),
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = AppendNodeResponse {};
Ok(Response::new(reply))
}
async fn remove_tracks(
&self,
request: tonic::Request<RemoveTracksRequest>,
) -> std::result::Result<tonic::Response<RemoveTracksResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::RemoveTracks {
.send_async(PlaybackMessage::Remove {
positions: req.positions,
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = RemoveTracksResponse {};
let reply = RemoveResponse {};
Ok(Response::new(reply))
}
async fn set_current_track(
async fn insert(
&self,
request: tonic::Request<SetCurrentTrackRequest>,
) -> std::result::Result<tonic::Response<SetCurrentTrackResponse>, tonic::Status> {
request: tonic::Request<InsertRequest>,
) -> std::result::Result<tonic::Response<InsertResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
.send_async(PlaybackMessage::Insert {
position: req.position,
uuids: req.uuid,
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = InsertResponse {};
Ok(Response::new(reply))
}
async fn set_current(
&self,
request: tonic::Request<SetCurrentRequest>,
) -> std::result::Result<tonic::Response<SetCurrentResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
let req = request.into_inner();
playback_tx
@ -753,22 +765,22 @@ impl CrabidyService for RpcService {
})
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = SetCurrentTrackResponse {};
let reply = SetCurrentResponse {};
Ok(Response::new(reply))
}
async fn get_queue_updates(
async fn get_update_stream(
&self,
request: tonic::Request<GetQueueUpdatesRequest>,
) -> std::result::Result<tonic::Response<Self::GetQueueUpdatesStream>, tonic::Status> {
let update_rx = self.queue_update_tx.subscribe();
request: tonic::Request<GetUpdateStreamRequest>,
) -> std::result::Result<tonic::Response<Self::GetUpdateStreamStream>, tonic::Status> {
let update_rx = self.update_tx.subscribe();
let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx);
let output_stream = update_stream
.into_stream()
.map(|queue_result| match queue_result {
Ok(queue) => Ok(GetQueueUpdatesResponse {
queue_update_result: Some(QueueUpdateResult::Full(queue)),
.map(|update_result| match update_result {
Ok(update) => Ok(GetUpdateStreamResponse {
update: Some(update),
}),
Err(_) => Err(tonic::Status::new(
tonic::Code::Unknown,
@ -778,15 +790,6 @@ impl CrabidyService for RpcService {
Ok(Response::new(Box::pin(output_stream)))
}
async fn get_queue(
&self,
request: tonic::Request<GetQueueRequest>,
) -> std::result::Result<tonic::Response<GetQueueResponse>, tonic::Status> {
let reply = GetQueueResponse { queue: None };
Ok(Response::new(reply))
}
async fn save_queue(
&self,
request: tonic::Request<SaveQueueRequest>,
@ -802,50 +805,98 @@ impl CrabidyService for RpcService {
) -> std::result::Result<tonic::Response<TogglePlayResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx
.send_async(PlaybackMessage::PlayPause)
.send_async(PlaybackMessage::TogglePlay)
.await
.unwrap();
let reply = TogglePlayResponse {};
Ok(Response::new(reply))
}
async fn toggle_shuffle(
&self,
request: tonic::Request<ToggleShuffleRequest>,
) -> std::result::Result<tonic::Response<ToggleShuffleResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx
.send_async(PlaybackMessage::ToggleShuffle)
.await
.unwrap();
let reply = ToggleShuffleResponse {};
Ok(Response::new(reply))
}
async fn stop(
&self,
request: tonic::Request<StopRequest>,
) -> std::result::Result<tonic::Response<StopResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx.send_async(PlaybackMessage::Stop).await.unwrap();
let reply = StopResponse {};
Ok(Response::new(reply))
}
async fn get_active_track(
async fn change_volume(
&self,
request: tonic::Request<GetActiveTrackRequest>,
) -> std::result::Result<tonic::Response<GetActiveTrackResponse>, tonic::Status> {
let reply = GetActiveTrackResponse {
active_track: None,
// track: None,
// play_state: TrackPlayState::Stopped as i32,
// completion: 0,
};
request: tonic::Request<ChangeVolumeRequest>,
) -> std::result::Result<tonic::Response<ChangeVolumeResponse>, tonic::Status> {
let delta = request.into_inner().delta;
let playback_tx = self.playback_tx.clone();
playback_tx
.send_async(PlaybackMessage::ChangeVolume { delta })
.await
.unwrap();
let reply = ChangeVolumeResponse {};
Ok(Response::new(reply))
}
async fn get_track_updates(
async fn toggle_mute(
&self,
request: tonic::Request<GetTrackUpdatesRequest>,
) -> std::result::Result<tonic::Response<Self::GetTrackUpdatesStream>, tonic::Status> {
let update_rx = self.active_track_tx.subscribe();
let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx);
request: tonic::Request<ToggleMuteRequest>,
) -> std::result::Result<tonic::Response<ToggleMuteResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx
.send_async(PlaybackMessage::ToggleMute)
.await
.unwrap();
let reply = ToggleMuteResponse {};
Ok(Response::new(reply))
}
let output_stream = update_stream.map(|active_track_result| match active_track_result {
Ok(active_track) => Ok(GetTrackUpdatesResponse {
active_track: Some(active_track),
}),
Err(_) => Err(tonic::Status::new(
tonic::Code::Unknown,
"Internal channel error",
)),
});
Ok(Response::new(Box::pin(output_stream)))
async fn next(
&self,
request: tonic::Request<NextRequest>,
) -> std::result::Result<tonic::Response<NextResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx.send_async(PlaybackMessage::Next).await.unwrap();
let reply = NextResponse {};
Ok(Response::new(reply))
}
async fn prev(
&self,
request: tonic::Request<PrevRequest>,
) -> std::result::Result<tonic::Response<PrevResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx.send_async(PlaybackMessage::Prev).await.unwrap();
let reply = PrevResponse {};
Ok(Response::new(reply))
}
async fn restart_track(
&self,
request: tonic::Request<RestartTrackRequest>,
) -> std::result::Result<tonic::Response<RestartTrackResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx.send_async(PlaybackMessage::Prev).await.unwrap();
let reply = RestartTrackResponse {};
Ok(Response::new(reply))
}
}
fn is_track(uuid: &str) -> bool {
uuid.starts_with("track:")
}
fn is_node(uuid: &str) -> bool {
uuid.starts_with("node:")
}

View File

@ -44,7 +44,8 @@ impl crabidy_core::ProviderClient for Client {
&self,
track_uuid: &str,
) -> Result<Vec<String>, crabidy_core::ProviderError> {
let Ok(playback) = self.get_track_playback(track_uuid).await else {
let (_, track_uuid, _) = split_uuid(track_uuid);
let Ok(playback) = self.get_track_playback(&track_uuid).await else {
return Err(crabidy_core::ProviderError::FetchError)
};
let Ok(manifest) = playback.get_manifest() else {
@ -66,14 +67,13 @@ impl crabidy_core::ProviderClient for Client {
fn get_lib_root(&self) -> crabidy_core::proto::crabidy::LibraryNode {
let global_root = crabidy_core::proto::crabidy::LibraryNode::new();
let children = vec![crabidy_core::proto::crabidy::LibraryNodeChild::new(
"userplaylists".to_string(),
"node:userplaylists".to_string(),
"playlists".to_string(),
)];
crabidy_core::proto::crabidy::LibraryNode {
uuid: "tidal".to_string(),
uuid: "node:tidal".to_string(),
title: "tidal".to_string(),
parent: Some(format!("{}", global_root.uuid)),
state: crabidy_core::proto::crabidy::LibraryNodeState::Done as i32,
tracks: Vec::new(),
children,
is_queable: false,
@ -87,14 +87,13 @@ impl crabidy_core::ProviderClient for Client {
let Some(user_id) = self.settings.login.user_id.clone() else {
return Err(crabidy_core::ProviderError::UnknownUser)
};
let (module, uuid) = split_uuid(uuid);
let (_kind, module, uuid) = split_uuid(uuid);
let node = match module.as_str() {
"userplaylists" => {
let mut node = crabidy_core::proto::crabidy::LibraryNode {
uuid: "userplaylists".to_string(),
uuid: "node:userplaylists".to_string(),
title: "playlists".to_string(),
parent: Some("tidal".to_string()),
state: crabidy_core::proto::crabidy::LibraryNodeState::Unspecified as i32,
parent: Some("node:tidal".to_string()),
tracks: Vec::new(),
children: Vec::new(),
is_queable: false,
@ -104,7 +103,7 @@ impl crabidy_core::ProviderClient for Client {
.await?
{
let child = crabidy_core::proto::crabidy::LibraryNodeChild::new(
format!("playlist:{}", playlist.playlist.uuid),
format!("node:playlist:{}", playlist.playlist.uuid),
playlist.playlist.title,
);
node.children.push(child);
@ -121,7 +120,7 @@ impl crabidy_core::ProviderClient for Client {
.map(|t| t.into())
.collect();
node.tracks = tracks;
node.parent = Some("userplaylists".to_string());
node.parent = Some("node:userplaylists".to_string());
node
}
_ => return Err(crabidy_core::ProviderError::MalformedUuid),
@ -130,11 +129,12 @@ impl crabidy_core::ProviderClient for Client {
}
}
fn split_uuid(uuid: &str) -> (String, String) {
let mut split = uuid.splitn(2, ':');
fn split_uuid(uuid: &str) -> (String, String, String) {
let mut split = uuid.splitn(3, ':');
(
split.next().unwrap_or("").to_string(),
split.next().unwrap_or("").to_string(),
split.next().unwrap_or("").to_string(),
)
}
@ -369,6 +369,7 @@ impl Client {
}
pub async fn get_track(&self, track_id: &str) -> Result<Track, ClientError> {
let (_, track_id, _) = split_uuid(track_id);
self.make_request(&format!("tracks/{}", track_id), None)
.await
}

View File

@ -158,9 +158,10 @@ pub struct Track {
impl From<Track> for crabidy_core::proto::crabidy::Track {
fn from(track: Track) -> Self {
Self {
uuid: track.id.to_string(),
uuid: format!("track:{}", track.id),
title: track.title,
artist: track.artist.name,
album: Some(track.album.into()),
duration: Some(track.duration as u32),
}
}
@ -169,9 +170,10 @@ impl From<Track> for crabidy_core::proto::crabidy::Track {
impl From<&Track> for crabidy_core::proto::crabidy::Track {
fn from(track: &Track) -> Self {
Self {
uuid: track.id.to_string(),
uuid: format!("track:{}", track.id),
title: track.title.clone(),
artist: track.artist.name.clone(),
album: Some(track.album.clone().into()),
duration: Some(track.duration as u32),
}
}
@ -208,6 +210,15 @@ pub struct Album {
pub release_date: Option<String>,
}
impl From<Album> for crabidy_core::proto::crabidy::Album {
fn from(album: Album) -> Self {
Self {
title: album.title,
release_date: album.release_date,
}
}
}
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Mixes {
@ -306,10 +317,9 @@ impl From<Playlist> for crabidy_core::proto::crabidy::LibraryNode {
fn from(a: Playlist) -> Self {
crabidy_core::proto::crabidy::LibraryNode {
title: a.title,
uuid: format!("playlist:{}", a.uuid),
uuid: format!("node:playlist:{}", a.uuid),
tracks: Vec::new(),
parent: None,
state: crabidy_core::proto::crabidy::LibraryNodeState::Done as i32,
children: Vec::new(),
is_queable: true,
}