From 0b9e6ceebe597bf26a86d80ec1f24d83b219c9b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Tue, 23 May 2023 13:11:22 +0200 Subject: [PATCH] Add server stubs for newly stuctred RPCs --- Cargo.lock | 220 ++++++++++++++++++++++++++++++++++++- crabidy-server/Cargo.toml | 1 + crabidy-server/src/main.rs | 181 +++++++++++++++++++++++++----- 3 files changed, 373 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f33f46..9609550 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" + +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -17,6 +29,12 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "ascii" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" + [[package]] name = "async-trait" version = "0.1.68" @@ -112,6 +130,12 @@ version = "3.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c6ed94e98ecff0c12dd1b04c15ec0d7d9458ca8fe806cea6f12954efe74c63b" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.4.0" @@ -128,10 +152,11 @@ checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" name = "cbd-tui" version = "0.1.0" dependencies = [ - "crabidy-core", "crossterm", + "cynic", "flume", "ratatui", + "reqwest", "tokio", ] @@ -172,6 +197,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "combine" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680" +dependencies = [ + "ascii", + "byteorder", + "either", + "memchr", + "unreachable", +] + [[package]] name = "confique" version = "0.2.3" @@ -213,6 +251,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "counter" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d458e66999348f56fd3ffcfbb7f7951542075ca8359687c703de6500c1ddccd" +dependencies = [ + "num-traits", +] + [[package]] name = "cpufeatures" version = "0.2.7" @@ -228,6 +275,7 @@ version = "0.1.0" dependencies = [ "crabidy-core", "tokio", + "tokio-stream", "tonic", ] @@ -250,6 +298,7 @@ dependencies = [ "async-trait", "crabidy-core", "flume", + "futures", "gstreamer", "gstreamer-play", "once_cell", @@ -295,6 +344,83 @@ dependencies = [ "typenum", ] +[[package]] +name = "cynic" +version = "3.0.0-beta.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2155e722fd76f4fa51635811279242e78cf312de1165e53cec4316425fd97c4d" +dependencies = [ + "cynic-proc-macros", + "reqwest", + "serde", + "serde_json", + "static_assertions", + "thiserror", +] + +[[package]] +name = "cynic-codegen" +version = "3.0.0-beta.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2454afe901e4de5b4cbdf45a18453fff9d6dfe3bb096babd6a62882995a5138c" +dependencies = [ + "counter", + "darling", + "graphql-parser", + "once_cell", + "ouroboros", + "proc-macro2", + "quote", + "strsim", + "syn 1.0.109", + "thiserror", +] + +[[package]] +name = "cynic-proc-macros" +version = "3.0.0-beta.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fd3ce67a4faeea7002a853acefe43938bc897d593f11abd76039dfb7ec445f0" +dependencies = [ + "cynic-codegen", + "syn 1.0.109", +] + +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 1.0.109", +] + +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core", + "quote", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -399,6 +525,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -406,6 +547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -425,6 +567,12 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + [[package]] name = "futures-macro" version = "0.3.28" @@ -454,9 +602,13 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -557,6 +709,16 @@ dependencies = [ "system-deps", ] +[[package]] +name = "graphql-parser" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ebc8013b4426d5b81a4364c419a95ed0b404af2b82e2457de52d9348f0e474" +dependencies = [ + "combine", + "thiserror", +] + [[package]] name = "gstreamer" version = "0.20.5" @@ -843,6 +1005,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.3.0" @@ -1129,6 +1297,29 @@ dependencies = [ "paste", ] +[[package]] +name = "ouroboros" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1358bd1558bd2a083fed428ffeda486fbfb323e698cdda7794259d592ca72db" +dependencies = [ + "aliasable", + "ouroboros_macro", +] + +[[package]] +name = "ouroboros_macro" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7d21ccd03305a674437ee1248f3ab5d4b1db095cf1caf49f1713ddf61956b7" +dependencies = [ + "Inflector", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1765,6 +1956,18 @@ dependencies = [ "lock_api", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "syn" version = "1.0.109" @@ -2164,6 +2367,15 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unreachable" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" +dependencies = [ + "void", +] + [[package]] name = "unsafe-libyaml" version = "0.2.8" @@ -2205,6 +2417,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "want" version = "0.3.0" diff --git a/crabidy-server/Cargo.toml b/crabidy-server/Cargo.toml index bcd1eba..442bd2a 100644 --- a/crabidy-server/Cargo.toml +++ b/crabidy-server/Cargo.toml @@ -18,3 +18,4 @@ serde = "1.0.163" flume = "0.10.14" tonic = "0.9.2" async-trait = "0.1.68" +futures = "0.3.28" diff --git a/crabidy-server/src/main.rs b/crabidy-server/src/main.rs index fe766af..de275e6 100644 --- a/crabidy-server/src/main.rs +++ b/crabidy-server/src/main.rs @@ -1,34 +1,36 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use crabidy_core::proto::crabidy::{ - library_service_server::{LibraryService, LibraryServiceServer}, - playback_server::{Playback, PlaybackServer}, - queue_server::{Queue, QueueServer}, - GetLibraryNodeRequest, GetLibraryNodeResponse, GetTrackRequest, GetTrackResponse, LibraryNode, - LibraryNodeState, + crabidy_service_server::{CrabidyService, CrabidyServiceServer}, + AppendNodeRequest, AppendNodeResponse, AppendTrackRequest, AppendTrackResponse, + GetActiveTrackRequest, GetActiveTrackResponse, GetLibraryNodeRequest, GetLibraryNodeResponse, + GetQueueRequest, GetQueueResponse, GetQueueUpdatesRequest, GetQueueUpdatesResponse, + GetTrackRequest, GetTrackResponse, GetTrackUpdatesRequest, GetTrackUpdatesResponse, + LibraryNode, LibraryNodeState, Queue, QueueLibraryNodeRequest, QueueLibraryNodeResponse, + QueueTrackRequest, QueueTrackResponse, RemoveTracksRequest, RemoveTracksResponse, + ReplaceWithNodeRequest, ReplaceWithNodeResponse, ReplaceWithTrackRequest, + ReplaceWithTrackResponse, SaveQueueRequest, SaveQueueResponse, SetCurrentTrackRequest, + SetCurrentTrackResponse, StopRequest, StopResponse, TogglePlayRequest, TogglePlayResponse, + TrackPlayState, }; use crabidy_core::{ProviderClient, ProviderError}; use gstreamer_play::{Play, PlayMessage, PlayState, PlayVideoRenderer}; -use once_cell::sync::OnceCell; -use std::{ - collections::HashMap, - fs, - sync::{Arc, RwLock}, -}; -use tonic::{transport::Server, Request, Response, Status}; +// use once_cell::sync::OnceCell; +use std::{collections::HashMap, fs, pin::Pin, sync::RwLock}; +use tonic::{codegen::futures_core::Stream, transport::Server, Request, Response, Status}; // static CHANNEL: OnceCell> = OnceCell::new(); -static ORCHESTRATOR_CHANNEL: OnceCell> = OnceCell::new(); +// static ORCHESTRATOR_CHANNEL: OnceCell> = OnceCell::new(); #[tokio::main] async fn main() -> Result<(), Box> { let orchestrator = ClientOrchestrator::init("").await.unwrap(); - orchestrator.run(); - let addr = "[::1]:50051".parse()?; - let crabidy_service = Library::new(); + let tx = orchestrator.run(); + let crabidy_service = AppState::new(tx); + let addr = "[::1]:50051".parse()?; Server::builder() - .add_service(LibraryServiceServer::new(crabidy_service)) + .add_service(CrabidyServiceServer::new(crabidy_service)) .serve(addr) .await?; @@ -49,11 +51,13 @@ enum OrchestratorMessage { #[derive(Debug)] struct ClientOrchestrator { rx: flume::Receiver, + tx: flume::Sender, tidal_client: tidaldy::Client, } impl ClientOrchestrator { - fn run(self) { + fn run(self) -> flume::Sender { + let tx = self.tx.clone(); tokio::spawn(async move { while let Ok(msg) = self.rx.recv_async().await { match msg { @@ -71,6 +75,7 @@ impl ClientOrchestrator { } } }); + tx } } @@ -82,8 +87,11 @@ impl ProviderClient for ClientOrchestrator { let new_toml_config = tidal_client.settings(); fs::write("/tmp/tidaldy.toml", new_toml_config).unwrap(); let (tx, rx) = flume::unbounded(); - ORCHESTRATOR_CHANNEL.set(tx).unwrap(); - Ok(Self { rx, tidal_client }) + Ok(Self { + rx, + tx, + tidal_client, + }) } fn settings(&self) -> String { "".to_owned() @@ -105,22 +113,27 @@ impl ProviderClient for ClientOrchestrator { } #[derive(Debug)] -struct Library { +struct AppState { known_nodes: RwLock>, - clients: Arc>>, + orchestrator_tx: flume::Sender, } -impl Library { - fn new() -> Self { +impl AppState { + fn new(orchestrator_tx: flume::Sender) -> Self { Self { known_nodes: RwLock::new(HashMap::new()), - clients: Arc::new(HashMap::new()), + orchestrator_tx, } } } #[tonic::async_trait] -impl LibraryService for Library { +impl CrabidyService for AppState { + type GetQueueUpdatesStream = + futures::stream::Iter>>; + type GetTrackUpdatesStream = + futures::stream::Iter>>; + async fn get_library_node( &self, request: Request, @@ -128,8 +141,7 @@ impl LibraryService for Library { println!("Got a library node request: {:?}", request); let node_uuid = request.into_inner().uuid; let (tx, rx) = flume::bounded(1); - ORCHESTRATOR_CHANNEL - .wait() + self.orchestrator_tx .send_async(OrchestratorMessage::GetNode { uuid: node_uuid, callback: tx, @@ -151,6 +163,119 @@ impl LibraryService for Library { let reply = GetTrackResponse { track: None }; Ok(Response::new(reply)) } + + async fn queue_track( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = QueueTrackResponse {}; + Ok(Response::new(reply)) + } + async fn queue_library_node( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = QueueLibraryNodeResponse {}; + Ok(Response::new(reply)) + } + async fn replace_with_track( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = ReplaceWithTrackResponse {}; + Ok(Response::new(reply)) + } + async fn replace_with_node( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = ReplaceWithNodeResponse {}; + Ok(Response::new(reply)) + } + async fn append_track( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = AppendTrackResponse {}; + Ok(Response::new(reply)) + } + async fn append_node( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = AppendNodeResponse {}; + Ok(Response::new(reply)) + } + async fn remove_tracks( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = RemoveTracksResponse {}; + Ok(Response::new(reply)) + } + async fn set_current_track( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = SetCurrentTrackResponse {}; + Ok(Response::new(reply)) + } + async fn get_queue_updates( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let queue_vec: Vec> = vec![]; + let output_stream = futures::stream::iter(queue_vec.into_iter()); + Ok(Response::new(output_stream)) + } + async fn get_queue( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = GetQueueResponse { queue: None }; + Ok(Response::new(reply)) + } + async fn save_queue( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = SaveQueueResponse {}; + Ok(Response::new(reply)) + } + /// Playback + async fn toggle_play( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = TogglePlayResponse {}; + Ok(Response::new(reply)) + } + async fn stop( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = StopResponse {}; + Ok(Response::new(reply)) + } + async fn get_active_track( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let reply = GetActiveTrackResponse { + track: None, + play_state: TrackPlayState::Stopped as i32, + completion: 0, + }; + Ok(Response::new(reply)) + } + async fn get_track_updates( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let track_vec: Vec> = vec![]; + let output_stream = futures::stream::iter(track_vec.into_iter()); + Ok(Response::new(output_stream)) + } } // #[derive(Debug)]