Add missing files after refactoring

This commit is contained in:
Hans Mündelein 2023-06-03 17:48:51 +02:00
parent 4043865ad4
commit af63d50dc5
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
3 changed files with 1079 additions and 0 deletions

View File

@ -0,0 +1,526 @@
use crate::PlaybackMessage;
use crate::ProviderMessage;
use crabidy_core::proto::crabidy::{
get_update_stream_response::Update as StreamUpdate, InitResponse, PlayState, Queue, QueueTrack,
Track, TrackPosition,
};
use crabidy_core::ProviderError;
use gstreamer_play::{Play, PlayState as GstPlaystate, PlayVideoRenderer};
use std::sync::Mutex;
use tracing::debug_span;
use tracing::{debug, error, instrument, trace, warn, Instrument};
#[derive(Debug)]
pub struct Playback {
update_tx: tokio::sync::broadcast::Sender<StreamUpdate>,
provider_tx: flume::Sender<ProviderMessage>,
pub playback_tx: flume::Sender<PlaybackMessage>,
playback_rx: flume::Receiver<PlaybackMessage>,
queue: Mutex<Queue>,
state: Mutex<GstPlaystate>,
pub play: Play,
}
impl Playback {
pub fn new(
update_tx: tokio::sync::broadcast::Sender<StreamUpdate>,
provider_tx: flume::Sender<ProviderMessage>,
) -> Self {
let (playback_tx, playback_rx) = flume::bounded(10);
let queue = Mutex::new(Queue {
timestamp: 0,
current_position: 0,
tracks: Vec::new(),
});
let state = Mutex::new(GstPlaystate::Stopped);
let play = Play::new(None::<PlayVideoRenderer>);
Self {
update_tx,
provider_tx,
playback_tx,
playback_rx,
queue,
state,
play,
}
}
#[instrument]
pub fn run(self) {
tokio::spawn(async move {
while let Ok(message) = self.playback_rx.recv_async().in_current_span().await {
match message {
PlaybackMessage::Init { result_tx, span } => {
let _e = span.enter();
let response = {
let queue = self.queue.lock().unwrap();
debug!("got queue lock");
let queue_track = QueueTrack {
queue_position: queue.current_position,
track: queue.current_track(),
};
trace!("queue_track {:?}", queue_track);
debug!("released queue_track lock");
let position = TrackPosition {
duration: self
.play
.duration()
.map(|t| t.mseconds() as u32)
.unwrap_or(0),
position: self
.play
.position()
.map(|t| t.mseconds() as u32)
.unwrap_or(0),
};
trace!("position {:?}", position);
let play_state = {
debug!("getting play state lock");
match *self.state.lock().unwrap() {
GstPlaystate::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused,
GstPlaystate::Stopped => PlayState::Stopped,
GstPlaystate::Buffering => PlayState::Loading,
_ => PlayState::Unspecified,
}
};
trace!("play_state {:?}", play_state);
debug!("released play state lock");
InitResponse {
queue: Some(queue.clone()),
queue_track: Some(queue_track),
play_state: play_state as i32,
volume: self.play.volume() as f32,
mute: self.play.is_muted(),
position: Some(position),
}
};
trace!("response {:?}", response);
result_tx.send(response).unwrap();
}
PlaybackMessage::Replace { uuids, span } => {
let _e = span.enter();
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
if let Ok(track) = self.get_track(&uuid).in_current_span().await {
all_tracks.push(track);
}
} else {
let tracks = self.flatten_node(&uuid).in_current_span().await;
all_tracks.extend(tracks);
}
}
trace!("got tracks {:?}", all_tracks);
let current = {
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
queue.replace_with_tracks(&all_tracks);
queue.set_current_position(0);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
queue.current_track()
};
debug!("got current {:?}", current);
self.play(current).in_current_span().await;
}
PlaybackMessage::Queue { uuids, span } => {
let _e = span.enter();
debug!("queing");
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
if let Ok(track) = self.get_track(&uuid).in_current_span().await {
all_tracks.push(track);
}
} else {
let tracks = self.flatten_node(&uuid).in_current_span().await;
all_tracks.extend(tracks);
}
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
queue.queue_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
}
}
debug!("que lock released");
}
PlaybackMessage::Append { uuids, span } => {
let _e = span.enter();
debug!("appending");
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
if let Ok(track) = self.get_track(&uuid).in_current_span().await {
all_tracks.push(track);
}
} else {
let tracks = self.flatten_node(&uuid).in_current_span().await;
all_tracks.extend(tracks);
}
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
queue.append_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
}
}
debug!("queue lock released");
}
PlaybackMessage::Remove { positions, span } => {
let _e = span.enter();
debug!("removing");
let track = {
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
let track = queue.remove_tracks(&positions);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
track
};
debug!("queue lock released");
self.play(track).in_current_span().await;
}
PlaybackMessage::Insert {
position,
uuids,
span,
} => {
let _e = span.enter();
debug!("inserting");
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
if let Ok(track) = self.get_track(&uuid).in_current_span().await {
all_tracks.push(track);
}
} else {
let tracks = self.flatten_node(&uuid).in_current_span().await;
all_tracks.extend(tracks);
}
}
trace!("got tracks {:?}", all_tracks);
{
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
queue.insert_tracks(position, &all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
}
debug!("queue lock released");
}
PlaybackMessage::SetCurrent {
position: queue_position,
span,
} => {
let _e = span.enter();
debug!("setting current");
let track = {
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
queue.set_current_position(queue_position);
queue.current_track()
};
debug!("quue lock released and got current {:?}", track);
self.play(track).in_current_span().await;
}
PlaybackMessage::TogglePlay { span } => {
let _e = span.enter();
debug!("toggling play");
{
let state = self.state.lock().unwrap();
debug!("got state lock");
if *state == GstPlaystate::Playing {
self.play.pause();
} else {
self.play.play();
}
}
debug!("state lock released");
}
PlaybackMessage::Stop { span } => {
let _e = span.enter();
debug!("stopping");
self.play.stop();
}
PlaybackMessage::ChangeVolume { delta, span } => {
let _e = span.enter();
debug!("changing volume");
let volume = self.play.volume();
debug!("got volume {:?}", volume);
self.play.set_volume(volume + delta as f64);
}
PlaybackMessage::ToggleMute { span } => {
let _e = span.enter();
debug!("toggling mute");
let muted = self.play.is_muted();
debug!("got muted {:?}", muted);
self.play.set_mute(!muted);
}
PlaybackMessage::ToggleShuffle { span } => {
let _e = span.enter();
debug!("toggling shuffle");
todo!()
}
PlaybackMessage::Next { span } => {
let _e = span.enter();
debug!("nexting");
let track = {
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
queue.next_track()
};
debug!("released queue lock and got track {:?}", track);
self.play_or_stop(track).in_current_span().await;
}
PlaybackMessage::Prev { span } => {
let _e = span.enter();
debug!("preving");
let track = {
let mut queue = self.queue.lock().unwrap();
debug!("got queue lock");
queue.prev_track()
};
debug!("released queue lock and got track {:?}", track);
self.play_or_stop(track).in_current_span().await;
}
PlaybackMessage::StateChanged { state, span } => {
let _e = span.enter();
debug!("state changed");
let play_state = {
*self.state.lock().unwrap() = state.clone();
debug!("got state lock");
match state {
GstPlaystate::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused,
GstPlaystate::Stopped => PlayState::Stopped,
GstPlaystate::Buffering => PlayState::Loading,
_ => PlayState::Unspecified,
}
};
debug!("released state lock and got play state {:?}", play_state);
let active_track_tx = self.update_tx.clone();
let update = StreamUpdate::PlayState(play_state as i32);
if let Err(err) = active_track_tx.send(update) {
error!("{:?}", err)
};
}
PlaybackMessage::RestartTrack { span } => {
let _e = span.enter();
debug!("restarting track");
self.play.stop();
self.play.play();
}
PlaybackMessage::VolumeChanged { volume, span } => {
let _e = span.enter();
trace!("volume changed");
let update_tx = self.update_tx.clone();
let update = StreamUpdate::Volume(volume);
if let Err(err) = update_tx.send(update) {
error!("{:?}", err)
}
}
PlaybackMessage::MuteChanged { muted, span } => {
let _e = span.enter();
trace!("mute changed");
let update_tx = self.update_tx.clone();
let update = StreamUpdate::Mute(muted);
if let Err(err) = update_tx.send(update) {
error!("{:?}", err)
}
}
PlaybackMessage::PostitionChanged { position, span } => {
let _e = span.enter();
trace!("position changed");
let update_tx = self.update_tx.clone();
let duration = self
.play
.duration()
.and_then(|t| Some(t.mseconds() as u32))
.unwrap_or(0);
let update = StreamUpdate::Position(TrackPosition { duration, position });
if let Err(err) = update_tx.send(update) {
error!("{:?}", err)
}
}
}
}
});
}
#[instrument(skip(self))]
async fn flatten_node(&self, uuid: &str) -> Vec<Track> {
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let span = debug_span!("prov-chan");
let Ok(_) = tx.send_async(ProviderMessage::FlattenNode {
uuid: uuid.to_string(),
result_tx,
span,
}).in_current_span().await else {
return Vec::new();
};
let Ok(tracks) = result_rx
.recv_async()
.in_current_span()
.await else {
return Vec::new();
};
tracks
}
#[instrument(skip(self))]
async fn get_track(&self, uuid: &str) -> Result<Track, ProviderError> {
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let span = tracing::trace_span!("prov-chan");
tx.send_async(ProviderMessage::GetTrack {
uuid: uuid.to_string(),
result_tx,
span,
})
.in_current_span()
.await
.map_err(|_| ProviderError::InternalError)?;
result_rx
.recv_async()
.in_current_span()
.await
.map_err(|_| ProviderError::InternalError)?
}
#[instrument(skip(self))]
async fn get_urls_for_track(&self, uuid: &str) -> Result<Vec<String>, ProviderError> {
let tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let span = tracing::trace_span!("prov-chan");
tx.send_async(ProviderMessage::GetTrackUrls {
uuid: uuid.to_string(),
result_tx,
span,
})
.in_current_span()
.await
.map_err(|_| ProviderError::InternalError)?;
result_rx
.recv_async()
.in_current_span()
.await
.map_err(|_| ProviderError::InternalError)?
}
#[instrument(skip(self))]
async fn play_or_stop(&self, track: Option<Track>) {
if let Some(track) = track {
let mut uuid = track.uuid.clone();
let urls = loop {
match self.get_urls_for_track(&uuid).in_current_span().await {
Ok(urls) => break urls,
Err(err) => {
warn!("no urls found for track {:?}: {}", track.uuid, err);
uuid = {
let mut queue = self.queue.lock().unwrap();
if let Some(track) = queue.next_track() {
track.uuid.clone()
} else {
return;
}
}
}
}
};
{
let queue = self.queue.lock().unwrap();
let queue_update_tx = self.update_tx.clone();
let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: queue.current_position,
track,
});
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
}
}
self.play.stop();
self.play.set_uri(Some(&urls[0]));
self.play.play();
} else {
self.play.stop();
}
}
#[instrument(skip(self))]
async fn play(&self, track: Option<Track>) {
if let Some(track) = track {
let mut uuid = track.uuid.clone();
let urls = loop {
match self.get_urls_for_track(&uuid).in_current_span().await {
Ok(urls) => break urls,
Err(err) => {
warn!("no urls found for track {:?}: {}", track.uuid, err);
uuid = {
let mut queue = self.queue.lock().unwrap();
if let Some(track) = queue.next_track() {
track.uuid.clone()
} else {
return;
}
}
}
}
};
{
let queue = self.queue.lock().unwrap();
let queue_update_tx = self.update_tx.clone();
let track = queue.current_track();
let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: queue.current_position,
track,
});
if let Err(err) = queue_update_tx.send(update) {
error!("{:?}", err)
}
}
self.play.stop();
self.play.set_uri(Some(&urls[0]));
self.play.play();
}
}
}
fn is_track(uuid: &str) -> bool {
uuid.starts_with("track:")
}

View File

@ -0,0 +1,158 @@
use crate::ProviderMessage;
use async_trait::async_trait;
use crabidy_core::{
proto::crabidy::{LibraryNode, LibraryNodeChild, Track},
ProviderClient, ProviderError,
};
use std::{fs, path::PathBuf, sync::Arc};
use tracing::{debug, error, instrument, warn, Instrument};
#[derive(Debug)]
pub struct ProviderOrchestrator {
pub provider_tx: flume::Sender<ProviderMessage>,
provider_rx: flume::Receiver<ProviderMessage>,
// known_tracks: RwLock<HashMap<String, Track>>,
// known_nodes: RwLock<HashMap<String, LibraryNode>>,
tidal_client: Arc<tidaldy::Client>,
}
impl ProviderOrchestrator {
#[instrument]
pub fn run(self) {
tokio::spawn(async move {
while let Ok(msg) = self.provider_rx.recv_async().in_current_span().await {
match msg {
ProviderMessage::GetLibraryNode {
uuid,
result_tx,
span,
} => {
let _e = span.enter();
let result = self.get_lib_node(&uuid).in_current_span().await;
result_tx.send(result).unwrap();
}
ProviderMessage::GetTrack {
uuid,
result_tx,
span,
} => {
let _e = span.enter();
let result = self.get_metadata_for_track(&uuid).in_current_span().await;
result_tx.send(result).unwrap();
}
ProviderMessage::GetTrackUrls {
uuid,
result_tx,
span,
} => {
let _e = span.enter();
let result = self.get_urls_for_track(&uuid).in_current_span().await;
result_tx.send(result).unwrap();
}
ProviderMessage::FlattenNode {
uuid,
result_tx,
span,
} => {
let _e = span.enter();
let result = self.flatten_node(&uuid).in_current_span().await;
result_tx.send(result).unwrap();
}
}
}
});
}
#[instrument(skip(self))]
async fn flatten_node(&self, node_uuid: &str) -> Vec<Track> {
let mut tracks = Vec::with_capacity(1000);
let mut nodes_to_go = Vec::with_capacity(100);
nodes_to_go.push(node_uuid.to_string());
while let Some(node_uuid) = nodes_to_go.pop() {
let Ok(node) = self.get_lib_node(&node_uuid).in_current_span().await else {
continue
};
if node.is_queable {
tracks.extend(node.tracks);
nodes_to_go.extend(node.children.into_iter().map(|c| c.uuid))
}
}
tracks
}
}
#[async_trait]
impl ProviderClient for ProviderOrchestrator {
#[instrument(skip(_s))]
async fn init(_s: &str) -> Result<Self, ProviderError> {
let config_dir = dirs::config_dir()
.map(|d| d.join("crabidy"))
.unwrap_or(PathBuf::from("/tmp"));
let dir_exists = tokio::fs::try_exists(&config_dir)
.in_current_span()
.await
.map_err(|e| ProviderError::Config(e.to_string()))?;
if !dir_exists {
tokio::fs::create_dir(&config_dir)
.in_current_span()
.await
.map_err(|e| ProviderError::Config(e.to_string()))?;
}
let config_file = config_dir.join("tidaly.toml");
let raw_toml_settings = fs::read_to_string(&config_file).unwrap_or("".to_owned());
let tidal_client = Arc::new(
tidaldy::Client::init(&raw_toml_settings)
.in_current_span()
.await
.unwrap(),
);
let new_toml_config = tidal_client.settings();
if let Err(err) = tokio::fs::write(&config_file, new_toml_config)
.in_current_span()
.await
{
error!("Failed to write config file: {}", err);
};
let (provider_tx, provider_rx) = flume::bounded(100);
Ok(Self {
provider_rx,
provider_tx,
tidal_client,
})
}
#[instrument(skip(self))]
fn settings(&self) -> String {
"".to_owned()
}
#[instrument(skip(self))]
async fn get_urls_for_track(&self, track_uuid: &str) -> Result<Vec<String>, ProviderError> {
self.tidal_client
.get_urls_for_track(track_uuid)
.in_current_span()
.await
}
#[instrument(skip(self))]
async fn get_metadata_for_track(&self, track_uuid: &str) -> Result<Track, ProviderError> {
self.tidal_client
.get_metadata_for_track(track_uuid)
.in_current_span()
.await
}
#[instrument(skip(self))]
fn get_lib_root(&self) -> LibraryNode {
let mut root_node = LibraryNode::new();
let child = LibraryNodeChild::new("node:tidal".to_owned(), "tidal".to_owned());
root_node.children.push(child);
root_node
}
#[instrument(skip(self))]
async fn get_lib_node(&self, uuid: &str) -> Result<LibraryNode, ProviderError> {
debug!("get_lib_node");
if uuid == "node:/" {
return Ok(self.get_lib_root());
}
if uuid == "node:tidal" {
return Ok(self.tidal_client.get_lib_root());
}
self.tidal_client.get_lib_node(uuid).in_current_span().await
}
}

395
crabidy-server/src/rpc.rs Normal file
View File

@ -0,0 +1,395 @@
use crate::{PlaybackMessage, ProviderMessage};
use crabidy_core::proto::crabidy::{
crabidy_service_server::CrabidyService, get_update_stream_response::Update as StreamUpdate,
AppendRequest, AppendResponse, ChangeVolumeRequest, ChangeVolumeResponse,
GetLibraryNodeRequest, GetLibraryNodeResponse, GetUpdateStreamRequest, GetUpdateStreamResponse,
InitRequest, InitResponse, InsertRequest, InsertResponse, NextRequest, NextResponse,
PrevRequest, PrevResponse, QueueRequest, QueueResponse, RemoveRequest, RemoveResponse,
ReplaceRequest, ReplaceResponse, RestartTrackRequest, RestartTrackResponse, SaveQueueRequest,
SaveQueueResponse, SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse,
ToggleMuteRequest, ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse,
ToggleShuffleRequest, ToggleShuffleResponse,
};
use futures::TryStreamExt;
use std::pin::Pin;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status};
use tracing::{debug, debug_span, error, instrument, trace, Instrument, Span};
#[derive(Debug)]
pub struct RpcService {
update_tx: tokio::sync::broadcast::Sender<StreamUpdate>,
playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>,
}
impl RpcService {
pub fn new(
update_rx: tokio::sync::broadcast::Sender<StreamUpdate>,
playback_tx: flume::Sender<PlaybackMessage>,
provider_tx: flume::Sender<ProviderMessage>,
) -> Self {
Self {
update_tx: update_rx,
playback_tx,
provider_tx,
}
}
}
#[tonic::async_trait]
impl CrabidyService for RpcService {
type GetUpdateStreamStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<GetUpdateStreamResponse, Status>> + Send>>;
#[instrument(skip(self, _request))]
async fn init(&self, _request: Request<InitRequest>) -> Result<Response<InitResponse>, Status> {
debug!("Received init request");
let playback_tx = self.playback_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let span = debug_span!("play-chan");
if let Err(err) = playback_tx
.send_async(PlaybackMessage::Init { result_tx, span })
.in_current_span()
.await
{
error!("{:?}", err);
return Err(Status::internal("Sending Init via internal channel failed"));
}
let response = result_rx
.recv_async()
.in_current_span()
.await
.map_err(|e| {
error!("{:?}", e);
return Status::internal("Failed to receive response from provider channel");
})?;
Ok(Response::new(response))
}
#[instrument(skip(self, request), fields(uuid))]
async fn get_library_node(
&self,
request: Request<GetLibraryNodeRequest>,
) -> Result<Response<GetLibraryNodeResponse>, Status> {
let uuid = request.into_inner().uuid;
Span::current().record("uuid", &uuid);
debug!("Received get_library_node request");
let provider_tx = self.provider_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let span = debug_span!("prov-chan");
provider_tx
.send_async(ProviderMessage::GetLibraryNode {
uuid,
result_tx,
span,
})
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let result = result_rx
.recv_async()
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to receive response from provider channel"))?;
match result {
Ok(node) => Ok(Response::new(GetLibraryNodeResponse { node: Some(node) })),
Err(err) => Err(Status::internal(err.to_string())),
}
}
#[instrument(skip(self, request), fields(uuids))]
async fn queue(
&self,
request: tonic::Request<QueueRequest>,
) -> std::result::Result<tonic::Response<QueueResponse>, tonic::Status> {
let uuids = request.into_inner().uuid.clone();
Span::current().record("uuids", format!("{:?}", uuids));
debug!("Received queue request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Queue { uuids, span })
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = QueueResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(uuids))]
async fn replace(
&self,
request: tonic::Request<ReplaceRequest>,
) -> std::result::Result<tonic::Response<ReplaceResponse>, tonic::Status> {
let uuids = request.into_inner().uuid.clone();
Span::current().record("uuids", format!("{:?}", uuids));
debug!("Received replace request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Replace { uuids, span })
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = ReplaceResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(uuids))]
async fn append(
&self,
request: tonic::Request<AppendRequest>,
) -> std::result::Result<tonic::Response<AppendResponse>, tonic::Status> {
let uuids = request.into_inner().uuid.clone();
Span::current().record("uuids", format!("{:?}", uuids));
debug!("Received append request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Append { uuids, span })
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = AppendResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(positions))]
async fn remove(
&self,
request: tonic::Request<RemoveRequest>,
) -> std::result::Result<tonic::Response<RemoveResponse>, tonic::Status> {
let positions = request.into_inner().positions;
Span::current().record("positions", format!("{:?}", positions));
debug!("Received remove request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Remove { positions, span })
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = RemoveResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(uuids, position))]
async fn insert(
&self,
request: tonic::Request<InsertRequest>,
) -> std::result::Result<tonic::Response<InsertResponse>, tonic::Status> {
let req = request.into_inner();
let uuids = req.uuid.clone();
let position = req.position;
Span::current().record("uuids", format!("{:?}", uuids));
Span::current().record("position", position);
debug!("Received insert request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Insert {
position: req.position,
uuids,
span,
})
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = InsertResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(position))]
async fn set_current(
&self,
request: tonic::Request<SetCurrentRequest>,
) -> std::result::Result<tonic::Response<SetCurrentResponse>, tonic::Status> {
let position = request.into_inner().position;
Span::current().record("position", position);
debug!("Received set_current request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::SetCurrent { position, span })
.in_current_span()
.await
.map_err(|_| Status::internal("Failed to send request via channel"))?;
let reply = SetCurrentResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn get_update_stream(
&self,
_request: tonic::Request<GetUpdateStreamRequest>,
) -> std::result::Result<tonic::Response<Self::GetUpdateStreamStream>, tonic::Status> {
debug!("Received get_update_stream request");
let update_rx = self.update_tx.subscribe();
let update_stream = tokio_stream::wrappers::BroadcastStream::new(update_rx);
let output_stream = update_stream.into_stream().map(|update_result| {
trace!("Got update: {:?}", update_result);
match update_result {
Ok(update) => Ok(GetUpdateStreamResponse {
update: Some(update),
}),
Err(_) => Err(tonic::Status::new(
tonic::Code::Unknown,
"Internal channel error",
)),
}
});
Ok(Response::new(Box::pin(output_stream)))
}
#[instrument(skip(self, _request))]
async fn save_queue(
&self,
_request: tonic::Request<SaveQueueRequest>,
) -> std::result::Result<tonic::Response<SaveQueueResponse>, tonic::Status> {
debug!("Received save_queue request");
let reply = SaveQueueResponse {};
Ok(Response::new(reply))
}
/// Playback
#[instrument(skip(self, _request))]
async fn toggle_play(
&self,
_request: tonic::Request<TogglePlayRequest>,
) -> std::result::Result<tonic::Response<TogglePlayResponse>, tonic::Status> {
debug!("Received toggle_play request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::TogglePlay { span })
.in_current_span()
.await
.unwrap();
let reply = TogglePlayResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn toggle_shuffle(
&self,
_request: tonic::Request<ToggleShuffleRequest>,
) -> std::result::Result<tonic::Response<ToggleShuffleResponse>, tonic::Status> {
debug!("Received toggle_shuffle request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::ToggleShuffle { span })
.in_current_span()
.await
.unwrap();
let reply = ToggleShuffleResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn stop(
&self,
_request: tonic::Request<StopRequest>,
) -> std::result::Result<tonic::Response<StopResponse>, tonic::Status> {
debug!("Received stop request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Stop { span })
.in_current_span()
.await
.unwrap();
let reply = StopResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, request), fields(delta))]
async fn change_volume(
&self,
request: tonic::Request<ChangeVolumeRequest>,
) -> std::result::Result<tonic::Response<ChangeVolumeResponse>, tonic::Status> {
let delta = request.into_inner().delta;
Span::current().record("delta", delta);
debug!("Received change_volume request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::ChangeVolume { delta, span })
.in_current_span()
.await
.unwrap();
let reply = ChangeVolumeResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn toggle_mute(
&self,
_request: tonic::Request<ToggleMuteRequest>,
) -> std::result::Result<tonic::Response<ToggleMuteResponse>, tonic::Status> {
debug!("Received toggle_mute request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::ToggleMute { span })
.in_current_span()
.await
.unwrap();
let reply = ToggleMuteResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn next(
&self,
_request: tonic::Request<NextRequest>,
) -> std::result::Result<tonic::Response<NextResponse>, tonic::Status> {
debug!("Received next request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Next { span })
.in_current_span()
.await
.unwrap();
let reply = NextResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn prev(
&self,
_request: tonic::Request<PrevRequest>,
) -> std::result::Result<tonic::Response<PrevResponse>, tonic::Status> {
debug!("Received prev request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::Prev { span })
.in_current_span()
.await
.unwrap();
let reply = PrevResponse {};
Ok(Response::new(reply))
}
#[instrument(skip(self, _request))]
async fn restart_track(
&self,
_request: tonic::Request<RestartTrackRequest>,
) -> std::result::Result<tonic::Response<RestartTrackResponse>, tonic::Status> {
debug!("Received restart_track request");
let playback_tx = self.playback_tx.clone();
let span = debug_span!("play-chan");
playback_tx
.send_async(PlaybackMessage::RestartTrack { span })
.in_current_span()
.await
.unwrap();
let reply = RestartTrackResponse {};
Ok(Response::new(reply))
}
}