From af63d50dc52d292e85d9e8ee0d2bddd894438c56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20M=C3=BCndelein?= Date: Sat, 3 Jun 2023 17:48:51 +0200 Subject: [PATCH] Add missing files after refactoring --- crabidy-server/src/playback.rs | 526 +++++++++++++++++++++++++++++++++ crabidy-server/src/provider.rs | 158 ++++++++++ crabidy-server/src/rpc.rs | 395 +++++++++++++++++++++++++ 3 files changed, 1079 insertions(+) create mode 100644 crabidy-server/src/playback.rs create mode 100644 crabidy-server/src/provider.rs create mode 100644 crabidy-server/src/rpc.rs diff --git a/crabidy-server/src/playback.rs b/crabidy-server/src/playback.rs new file mode 100644 index 0000000..0af44ea --- /dev/null +++ b/crabidy-server/src/playback.rs @@ -0,0 +1,526 @@ +use crate::PlaybackMessage; +use crate::ProviderMessage; +use crabidy_core::proto::crabidy::{ + get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, Queue, QueueTrack, + Track, TrackPosition, +}; +use crabidy_core::ProviderError; +use gstreamer_play::{Play, PlayState as GstPlaystate, PlayVideoRenderer}; +use std::sync::Mutex; +use tracing::debug_span; +use tracing::{debug, error, instrument, trace, warn, Instrument}; + +#[derive(Debug)] +pub struct Playback { + update_tx: tokio::sync::broadcast::Sender, + provider_tx: flume::Sender, + pub playback_tx: flume::Sender, + playback_rx: flume::Receiver, + queue: Mutex, + state: Mutex, + pub play: Play, +} + +impl Playback { + pub fn new( + update_tx: tokio::sync::broadcast::Sender, + provider_tx: flume::Sender, + ) -> Self { + let (playback_tx, playback_rx) = flume::bounded(10); + let queue = Mutex::new(Queue { + timestamp: 0, + current_position: 0, + tracks: Vec::new(), + }); + let state = Mutex::new(GstPlaystate::Stopped); + let play = Play::new(None::); + Self { + update_tx, + provider_tx, + playback_tx, + playback_rx, + queue, + state, + play, + } + } + #[instrument] + pub fn run(self) { + tokio::spawn(async move { + while let Ok(message) = self.playback_rx.recv_async().in_current_span().await { + match message { + PlaybackMessage::Init { result_tx, span } => { + let _e = span.enter(); + let response = { + let queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + let queue_track = QueueTrack { + queue_position: queue.current_position, + track: queue.current_track(), + }; + trace!("queue_track {:?}", queue_track); + debug!("released queue_track lock"); + let position = TrackPosition { + duration: self + .play + .duration() + .map(|t| t.mseconds() as u32) + .unwrap_or(0), + position: self + .play + .position() + .map(|t| t.mseconds() as u32) + .unwrap_or(0), + }; + trace!("position {:?}", position); + let play_state = { + debug!("getting play state lock"); + match *self.state.lock().unwrap() { + GstPlaystate::Playing => PlayState::Playing, + GstPlaystate::Paused => PlayState::Paused, + GstPlaystate::Stopped => PlayState::Stopped, + GstPlaystate::Buffering => PlayState::Loading, + _ => PlayState::Unspecified, + } + }; + trace!("play_state {:?}", play_state); + debug!("released play state lock"); + InitResponse { + queue: Some(queue.clone()), + queue_track: Some(queue_track), + play_state: play_state as i32, + volume: self.play.volume() as f32, + mute: self.play.is_muted(), + position: Some(position), + } + }; + trace!("response {:?}", response); + result_tx.send(response).unwrap(); + } + PlaybackMessage::Replace { uuids, span } => { + let _e = span.enter(); + let mut all_tracks = Vec::new(); + for uuid in uuids { + if is_track(&uuid) { + if let Ok(track) = self.get_track(&uuid).in_current_span().await { + all_tracks.push(track); + } + } else { + let tracks = self.flatten_node(&uuid).in_current_span().await; + all_tracks.extend(tracks); + } + } + trace!("got tracks {:?}", all_tracks); + let current = { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + queue.replace_with_tracks(&all_tracks); + queue.set_current_position(0); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + queue_update_tx.send(update).unwrap(); + queue.current_track() + }; + debug!("got current {:?}", current); + self.play(current).in_current_span().await; + } + + PlaybackMessage::Queue { uuids, span } => { + let _e = span.enter(); + debug!("queing"); + let mut all_tracks = Vec::new(); + for uuid in uuids { + if is_track(&uuid) { + if let Ok(track) = self.get_track(&uuid).in_current_span().await { + all_tracks.push(track); + } + } else { + let tracks = self.flatten_node(&uuid).in_current_span().await; + all_tracks.extend(tracks); + } + } + trace!("got tracks {:?}", all_tracks); + { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + queue.queue_tracks(&all_tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + } + } + debug!("que lock released"); + } + + PlaybackMessage::Append { uuids, span } => { + let _e = span.enter(); + debug!("appending"); + let mut all_tracks = Vec::new(); + for uuid in uuids { + if is_track(&uuid) { + if let Ok(track) = self.get_track(&uuid).in_current_span().await { + all_tracks.push(track); + } + } else { + let tracks = self.flatten_node(&uuid).in_current_span().await; + all_tracks.extend(tracks); + } + } + trace!("got tracks {:?}", all_tracks); + { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + queue.append_tracks(&all_tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + } + } + debug!("queue lock released"); + } + + PlaybackMessage::Remove { positions, span } => { + let _e = span.enter(); + debug!("removing"); + let track = { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + let track = queue.remove_tracks(&positions); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + queue_update_tx.send(update).unwrap(); + track + }; + debug!("queue lock released"); + self.play(track).in_current_span().await; + } + + PlaybackMessage::Insert { + position, + uuids, + span, + } => { + let _e = span.enter(); + debug!("inserting"); + let mut all_tracks = Vec::new(); + for uuid in uuids { + if is_track(&uuid) { + if let Ok(track) = self.get_track(&uuid).in_current_span().await { + all_tracks.push(track); + } + } else { + let tracks = self.flatten_node(&uuid).in_current_span().await; + all_tracks.extend(tracks); + } + } + trace!("got tracks {:?}", all_tracks); + { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + queue.insert_tracks(position, &all_tracks); + let queue_update_tx = self.update_tx.clone(); + let update = StreamUpdate::Queue(queue.clone()); + queue_update_tx.send(update).unwrap(); + } + debug!("queue lock released"); + } + + PlaybackMessage::SetCurrent { + position: queue_position, + span, + } => { + let _e = span.enter(); + debug!("setting current"); + let track = { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + queue.set_current_position(queue_position); + queue.current_track() + }; + debug!("quue lock released and got current {:?}", track); + self.play(track).in_current_span().await; + } + + PlaybackMessage::TogglePlay { span } => { + let _e = span.enter(); + debug!("toggling play"); + { + let state = self.state.lock().unwrap(); + debug!("got state lock"); + if *state == GstPlaystate::Playing { + self.play.pause(); + } else { + self.play.play(); + } + } + debug!("state lock released"); + } + + PlaybackMessage::Stop { span } => { + let _e = span.enter(); + debug!("stopping"); + self.play.stop(); + } + + PlaybackMessage::ChangeVolume { delta, span } => { + let _e = span.enter(); + debug!("changing volume"); + let volume = self.play.volume(); + debug!("got volume {:?}", volume); + self.play.set_volume(volume + delta as f64); + } + + PlaybackMessage::ToggleMute { span } => { + let _e = span.enter(); + debug!("toggling mute"); + let muted = self.play.is_muted(); + debug!("got muted {:?}", muted); + self.play.set_mute(!muted); + } + + PlaybackMessage::ToggleShuffle { span } => { + let _e = span.enter(); + debug!("toggling shuffle"); + todo!() + } + + PlaybackMessage::Next { span } => { + let _e = span.enter(); + debug!("nexting"); + let track = { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + queue.next_track() + }; + debug!("released queue lock and got track {:?}", track); + + self.play_or_stop(track).in_current_span().await; + } + + PlaybackMessage::Prev { span } => { + let _e = span.enter(); + debug!("preving"); + let track = { + let mut queue = self.queue.lock().unwrap(); + debug!("got queue lock"); + queue.prev_track() + }; + debug!("released queue lock and got track {:?}", track); + self.play_or_stop(track).in_current_span().await; + } + + PlaybackMessage::StateChanged { state, span } => { + let _e = span.enter(); + debug!("state changed"); + + let play_state = { + *self.state.lock().unwrap() = state.clone(); + debug!("got state lock"); + + match state { + GstPlaystate::Playing => PlayState::Playing, + GstPlaystate::Paused => PlayState::Paused, + GstPlaystate::Stopped => PlayState::Stopped, + GstPlaystate::Buffering => PlayState::Loading, + _ => PlayState::Unspecified, + } + }; + debug!("released state lock and got play state {:?}", play_state); + let active_track_tx = self.update_tx.clone(); + let update = StreamUpdate::PlayState(play_state as i32); + if let Err(err) = active_track_tx.send(update) { + error!("{:?}", err) + }; + } + + PlaybackMessage::RestartTrack { span } => { + let _e = span.enter(); + debug!("restarting track"); + self.play.stop(); + self.play.play(); + } + + PlaybackMessage::VolumeChanged { volume, span } => { + let _e = span.enter(); + trace!("volume changed"); + let update_tx = self.update_tx.clone(); + let update = StreamUpdate::Volume(volume); + if let Err(err) = update_tx.send(update) { + error!("{:?}", err) + } + } + + PlaybackMessage::MuteChanged { muted, span } => { + let _e = span.enter(); + trace!("mute changed"); + let update_tx = self.update_tx.clone(); + let update = StreamUpdate::Mute(muted); + if let Err(err) = update_tx.send(update) { + error!("{:?}", err) + } + } + + PlaybackMessage::PostitionChanged { position, span } => { + let _e = span.enter(); + trace!("position changed"); + let update_tx = self.update_tx.clone(); + let duration = self + .play + .duration() + .and_then(|t| Some(t.mseconds() as u32)) + .unwrap_or(0); + let update = StreamUpdate::Position(TrackPosition { duration, position }); + if let Err(err) = update_tx.send(update) { + error!("{:?}", err) + } + } + } + } + }); + } + + #[instrument(skip(self))] + async fn flatten_node(&self, uuid: &str) -> Vec { + let tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + let span = debug_span!("prov-chan"); + let Ok(_) = tx.send_async(ProviderMessage::FlattenNode { + uuid: uuid.to_string(), + result_tx, + span, + }).in_current_span().await else { + return Vec::new(); + }; + let Ok(tracks) = result_rx + .recv_async() + .in_current_span() + .await else { + return Vec::new(); + }; + tracks + } + + #[instrument(skip(self))] + async fn get_track(&self, uuid: &str) -> Result { + let tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + let span = tracing::trace_span!("prov-chan"); + tx.send_async(ProviderMessage::GetTrack { + uuid: uuid.to_string(), + result_tx, + span, + }) + .in_current_span() + .await + .map_err(|_| ProviderError::InternalError)?; + result_rx + .recv_async() + .in_current_span() + .await + .map_err(|_| ProviderError::InternalError)? + } + + #[instrument(skip(self))] + async fn get_urls_for_track(&self, uuid: &str) -> Result, ProviderError> { + let tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + let span = tracing::trace_span!("prov-chan"); + tx.send_async(ProviderMessage::GetTrackUrls { + uuid: uuid.to_string(), + result_tx, + span, + }) + .in_current_span() + .await + .map_err(|_| ProviderError::InternalError)?; + result_rx + .recv_async() + .in_current_span() + .await + .map_err(|_| ProviderError::InternalError)? + } + + #[instrument(skip(self))] + async fn play_or_stop(&self, track: Option) { + if let Some(track) = track { + let mut uuid = track.uuid.clone(); + let urls = loop { + match self.get_urls_for_track(&uuid).in_current_span().await { + Ok(urls) => break urls, + Err(err) => { + warn!("no urls found for track {:?}: {}", track.uuid, err); + uuid = { + let mut queue = self.queue.lock().unwrap(); + if let Some(track) = queue.next_track() { + track.uuid.clone() + } else { + return; + } + } + } + } + }; + { + let queue = self.queue.lock().unwrap(); + let queue_update_tx = self.update_tx.clone(); + let track = queue.current_track(); + let update = StreamUpdate::QueueTrack(QueueTrack { + queue_position: queue.current_position, + track, + }); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + } + } + self.play.stop(); + self.play.set_uri(Some(&urls[0])); + self.play.play(); + } else { + self.play.stop(); + } + } + + #[instrument(skip(self))] + async fn play(&self, track: Option) { + if let Some(track) = track { + let mut uuid = track.uuid.clone(); + let urls = loop { + match self.get_urls_for_track(&uuid).in_current_span().await { + Ok(urls) => break urls, + Err(err) => { + warn!("no urls found for track {:?}: {}", track.uuid, err); + uuid = { + let mut queue = self.queue.lock().unwrap(); + if let Some(track) = queue.next_track() { + track.uuid.clone() + } else { + return; + } + } + } + } + }; + { + let queue = self.queue.lock().unwrap(); + let queue_update_tx = self.update_tx.clone(); + let track = queue.current_track(); + let update = StreamUpdate::QueueTrack(QueueTrack { + queue_position: queue.current_position, + track, + }); + if let Err(err) = queue_update_tx.send(update) { + error!("{:?}", err) + } + } + self.play.stop(); + self.play.set_uri(Some(&urls[0])); + self.play.play(); + } + } +} + +fn is_track(uuid: &str) -> bool { + uuid.starts_with("track:") +} diff --git a/crabidy-server/src/provider.rs b/crabidy-server/src/provider.rs new file mode 100644 index 0000000..984fb1a --- /dev/null +++ b/crabidy-server/src/provider.rs @@ -0,0 +1,158 @@ +use crate::ProviderMessage; +use async_trait::async_trait; +use crabidy_core::{ + proto::crabidy::{LibraryNode, LibraryNodeChild, Track}, + ProviderClient, ProviderError, +}; +use std::{fs, path::PathBuf, sync::Arc}; +use tracing::{debug, error, instrument, warn, Instrument}; + +#[derive(Debug)] +pub struct ProviderOrchestrator { + pub provider_tx: flume::Sender, + provider_rx: flume::Receiver, + // known_tracks: RwLock>, + // known_nodes: RwLock>, + tidal_client: Arc, +} + +impl ProviderOrchestrator { + #[instrument] + pub fn run(self) { + tokio::spawn(async move { + while let Ok(msg) = self.provider_rx.recv_async().in_current_span().await { + match msg { + ProviderMessage::GetLibraryNode { + uuid, + result_tx, + span, + } => { + let _e = span.enter(); + let result = self.get_lib_node(&uuid).in_current_span().await; + result_tx.send(result).unwrap(); + } + ProviderMessage::GetTrack { + uuid, + result_tx, + span, + } => { + let _e = span.enter(); + let result = self.get_metadata_for_track(&uuid).in_current_span().await; + result_tx.send(result).unwrap(); + } + ProviderMessage::GetTrackUrls { + uuid, + result_tx, + span, + } => { + let _e = span.enter(); + let result = self.get_urls_for_track(&uuid).in_current_span().await; + result_tx.send(result).unwrap(); + } + ProviderMessage::FlattenNode { + uuid, + result_tx, + span, + } => { + let _e = span.enter(); + let result = self.flatten_node(&uuid).in_current_span().await; + result_tx.send(result).unwrap(); + } + } + } + }); + } + #[instrument(skip(self))] + async fn flatten_node(&self, node_uuid: &str) -> Vec { + let mut tracks = Vec::with_capacity(1000); + let mut nodes_to_go = Vec::with_capacity(100); + nodes_to_go.push(node_uuid.to_string()); + while let Some(node_uuid) = nodes_to_go.pop() { + let Ok(node) = self.get_lib_node(&node_uuid).in_current_span().await else { + continue + }; + if node.is_queable { + tracks.extend(node.tracks); + nodes_to_go.extend(node.children.into_iter().map(|c| c.uuid)) + } + } + tracks + } +} + +#[async_trait] +impl ProviderClient for ProviderOrchestrator { + #[instrument(skip(_s))] + async fn init(_s: &str) -> Result { + let config_dir = dirs::config_dir() + .map(|d| d.join("crabidy")) + .unwrap_or(PathBuf::from("/tmp")); + let dir_exists = tokio::fs::try_exists(&config_dir) + .in_current_span() + .await + .map_err(|e| ProviderError::Config(e.to_string()))?; + if !dir_exists { + tokio::fs::create_dir(&config_dir) + .in_current_span() + .await + .map_err(|e| ProviderError::Config(e.to_string()))?; + } + let config_file = config_dir.join("tidaly.toml"); + let raw_toml_settings = fs::read_to_string(&config_file).unwrap_or("".to_owned()); + let tidal_client = Arc::new( + tidaldy::Client::init(&raw_toml_settings) + .in_current_span() + .await + .unwrap(), + ); + let new_toml_config = tidal_client.settings(); + if let Err(err) = tokio::fs::write(&config_file, new_toml_config) + .in_current_span() + .await + { + error!("Failed to write config file: {}", err); + }; + let (provider_tx, provider_rx) = flume::bounded(100); + Ok(Self { + provider_rx, + provider_tx, + tidal_client, + }) + } + #[instrument(skip(self))] + fn settings(&self) -> String { + "".to_owned() + } + #[instrument(skip(self))] + async fn get_urls_for_track(&self, track_uuid: &str) -> Result, ProviderError> { + self.tidal_client + .get_urls_for_track(track_uuid) + .in_current_span() + .await + } + #[instrument(skip(self))] + async fn get_metadata_for_track(&self, track_uuid: &str) -> Result { + self.tidal_client + .get_metadata_for_track(track_uuid) + .in_current_span() + .await + } + #[instrument(skip(self))] + fn get_lib_root(&self) -> LibraryNode { + let mut root_node = LibraryNode::new(); + let child = LibraryNodeChild::new("node:tidal".to_owned(), "tidal".to_owned()); + root_node.children.push(child); + root_node + } + #[instrument(skip(self))] + async fn get_lib_node(&self, uuid: &str) -> Result { + debug!("get_lib_node"); + if uuid == "node:/" { + return Ok(self.get_lib_root()); + } + if uuid == "node:tidal" { + return Ok(self.tidal_client.get_lib_root()); + } + self.tidal_client.get_lib_node(uuid).in_current_span().await + } +} diff --git a/crabidy-server/src/rpc.rs b/crabidy-server/src/rpc.rs new file mode 100644 index 0000000..38a1b8e --- /dev/null +++ b/crabidy-server/src/rpc.rs @@ -0,0 +1,395 @@ +use crate::{PlaybackMessage, ProviderMessage}; +use crabidy_core::proto::crabidy::{ + crabidy_service_server::CrabidyService, get_update_stream_response::Update as StreamUpdate, + AppendRequest, AppendResponse, ChangeVolumeRequest, ChangeVolumeResponse, + GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest, GetUpdateStreamResponse, + InitRequest, InitResponse, InsertRequest, InsertResponse, NextRequest, NextResponse, + PrevRequest, PrevResponse, QueueRequest, QueueResponse, RemoveRequest, RemoveResponse, + ReplaceRequest, ReplaceResponse, RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, + SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, + ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, + ToggleShuffleRequest, ToggleShuffleResponse, +}; +use futures::TryStreamExt; +use std::pin::Pin; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status}; +use tracing::{debug, debug_span, error, instrument, trace, Instrument, Span}; + +#[derive(Debug)] +pub struct RpcService { + update_tx: tokio::sync::broadcast::Sender, + playback_tx: flume::Sender, + provider_tx: flume::Sender, +} + +impl RpcService { + pub fn new( + update_rx: tokio::sync::broadcast::Sender, + playback_tx: flume::Sender, + provider_tx: flume::Sender, + ) -> Self { + Self { + update_tx: update_rx, + playback_tx, + provider_tx, + } + } +} + +#[tonic::async_trait] +impl CrabidyService for RpcService { + type GetUpdateStreamStream = + Pin> + Send>>; + + #[instrument(skip(self, _request))] + async fn init(&self, _request: Request) -> Result, Status> { + debug!("Received init request"); + let playback_tx = self.playback_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + let span = debug_span!("play-chan"); + if let Err(err) = playback_tx + .send_async(PlaybackMessage::Init { result_tx, span }) + .in_current_span() + .await + { + error!("{:?}", err); + return Err(Status::internal("Sending Init via internal channel failed")); + } + let response = result_rx + .recv_async() + .in_current_span() + .await + .map_err(|e| { + error!("{:?}", e); + return Status::internal("Failed to receive response from provider channel"); + })?; + Ok(Response::new(response)) + } + + #[instrument(skip(self, request), fields(uuid))] + async fn get_library_node( + &self, + request: Request, + ) -> Result, Status> { + let uuid = request.into_inner().uuid; + Span::current().record("uuid", &uuid); + debug!("Received get_library_node request"); + let provider_tx = self.provider_tx.clone(); + let (result_tx, result_rx) = flume::bounded(1); + let span = debug_span!("prov-chan"); + provider_tx + .send_async(ProviderMessage::GetLibraryNode { + uuid, + result_tx, + span, + }) + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let result = result_rx + .recv_async() + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to receive response from provider channel"))?; + match result { + Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })), + Err(err) => Err(Status::internal(err.to_string())), + } + } + + #[instrument(skip(self, request), fields(uuids))] + async fn queue( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let uuids = request.into_inner().uuid.clone(); + Span::current().record("uuids", format!("{:?}", uuids)); + debug!("Received queue request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Queue { uuids, span }) + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + + let reply = QueueResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, request), fields(uuids))] + async fn replace( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let uuids = request.into_inner().uuid.clone(); + Span::current().record("uuids", format!("{:?}", uuids)); + debug!("Received replace request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Replace { uuids, span }) + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let reply = ReplaceResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, request), fields(uuids))] + async fn append( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let uuids = request.into_inner().uuid.clone(); + Span::current().record("uuids", format!("{:?}", uuids)); + debug!("Received append request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Append { uuids, span }) + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let reply = AppendResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, request), fields(positions))] + async fn remove( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let positions = request.into_inner().positions; + Span::current().record("positions", format!("{:?}", positions)); + debug!("Received remove request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Remove { positions, span }) + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let reply = RemoveResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, request), fields(uuids, position))] + async fn insert( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let req = request.into_inner(); + let uuids = req.uuid.clone(); + let position = req.position; + Span::current().record("uuids", format!("{:?}", uuids)); + Span::current().record("position", position); + debug!("Received insert request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Insert { + position: req.position, + uuids, + span, + }) + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let reply = InsertResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, request), fields(position))] + async fn set_current( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let position = request.into_inner().position; + Span::current().record("position", position); + debug!("Received set_current request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::SetCurrent { position, span }) + .in_current_span() + .await + .map_err(|_| Status::internal("Failed to send request via channel"))?; + let reply = SetCurrentResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn get_update_stream( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received get_update_stream request"); + let update_rx = self.update_tx.subscribe(); + let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx); + + let output_stream = update_stream.into_stream().map(|update_result| { + trace!("Got update: {:?}", update_result); + match update_result { + Ok(update) => Ok(GetUpdateStreamResponse { + update: Some(update), + }), + Err(_) => Err(tonic::Status::new( + tonic::Code::Unknown, + "Internal channel error", + )), + } + }); + + Ok(Response::new(Box::pin(output_stream))) + } + #[instrument(skip(self, _request))] + async fn save_queue( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received save_queue request"); + let reply = SaveQueueResponse {}; + Ok(Response::new(reply)) + } + + /// Playback + #[instrument(skip(self, _request))] + async fn toggle_play( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received toggle_play request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::TogglePlay { span }) + .in_current_span() + .await + .unwrap(); + let reply = TogglePlayResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn toggle_shuffle( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received toggle_shuffle request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::ToggleShuffle { span }) + .in_current_span() + .await + .unwrap(); + let reply = ToggleShuffleResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn stop( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received stop request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Stop { span }) + .in_current_span() + .await + .unwrap(); + let reply = StopResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, request), fields(delta))] + async fn change_volume( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let delta = request.into_inner().delta; + Span::current().record("delta", delta); + debug!("Received change_volume request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::ChangeVolume { delta, span }) + .in_current_span() + .await + .unwrap(); + let reply = ChangeVolumeResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn toggle_mute( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received toggle_mute request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::ToggleMute { span }) + .in_current_span() + .await + .unwrap(); + let reply = ToggleMuteResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn next( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received next request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Next { span }) + .in_current_span() + .await + .unwrap(); + let reply = NextResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn prev( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received prev request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::Prev { span }) + .in_current_span() + .await + .unwrap(); + let reply = PrevResponse {}; + Ok(Response::new(reply)) + } + + #[instrument(skip(self, _request))] + async fn restart_track( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + debug!("Received restart_track request"); + let playback_tx = self.playback_tx.clone(); + let span = debug_span!("play-chan"); + playback_tx + .send_async(PlaybackMessage::RestartTrack { span }) + .in_current_span() + .await + .unwrap(); + let reply = RestartTrackResponse {}; + Ok(Response::new(reply)) + } +}