Implement init on the server

This commit is contained in:
Hans Mündelein 2023-06-02 19:20:30 +02:00
parent d1bd0adf4f
commit 4fbeccdde1
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
3 changed files with 74 additions and 15 deletions

View File

@ -35,7 +35,7 @@ message InitResponse {
Queue queue = 1;
QueueTrack queue_track = 2;
PlayState play_state = 3;
uint32 volume = 4;
float volume = 4;
bool mute = 5;
TrackPosition position = 6;
}
@ -92,7 +92,7 @@ message GetUpdateStreamResponse {
Queue queue = 1;
QueueTrack queue_track = 2;
PlayState play_state = 3;
uint32 volume = 4;
float volume = 4;
bool mute = 5;
TrackPosition position = 6;
}

View File

@ -55,7 +55,7 @@ pub enum QueueError {
}
impl Queue {
pub fn current(&mut self) -> Option<Track> {
pub fn current(&self) -> Option<Track> {
if self.current_position < self.tracks.len() as u32 {
Some(self.tracks[self.current_position as usize].clone())
} else {

View File

@ -10,7 +10,7 @@ use crabidy_core::proto::crabidy::{
RestartTrackRequest, RestartTrackResponse, SaveQueueRequest, SaveQueueResponse,
SetCurrentRequest, SetCurrentResponse, StopRequest, StopResponse, ToggleMuteRequest,
ToggleMuteResponse, TogglePlayRequest, TogglePlayResponse, ToggleShuffleRequest,
ToggleShuffleResponse, Track,
ToggleShuffleResponse, Track, TrackPosition,
};
use crabidy_core::{ProviderClient, ProviderError};
use futures::TryStreamExt;
@ -149,24 +149,14 @@ impl ProviderOrchestrator {
#[async_trait]
impl ProviderClient for ProviderOrchestrator {
async fn init(_s: &str) -> Result<Self, ProviderError> {
let state = Mutex::new(PlayState::Stopped);
let queue = Mutex::new(Queue {
timestamp: 0,
current_position: 0,
tracks: Vec::new(),
});
let raw_toml_settings = fs::read_to_string("/tmp/tidaldy.toml").unwrap_or("".to_owned());
let tidal_client = Arc::new(tidaldy::Client::init(&raw_toml_settings).await.unwrap());
let new_toml_config = tidal_client.settings();
fs::write("/tmp/tidaldy.toml", new_toml_config).unwrap();
// let known_tracks = HashMap::new();
// let known_nodes = HashMap::new();
let (provider_tx, provider_rx) = flume::bounded(100);
Ok(Self {
provider_rx,
provider_tx,
// known_tracks,
// known_nodes,
tidal_client,
})
}
@ -200,6 +190,9 @@ impl ProviderClient for ProviderOrchestrator {
#[derive(Debug)]
enum PlaybackMessage {
Init {
result_tx: flume::Sender<InitResponse>,
},
Replace {
uuids: Vec<String>,
},
@ -237,6 +230,13 @@ enum PlaybackMessage {
StateChanged {
state: GstPlaystate,
},
MuteChanged {
muted: bool,
},
PostitionChanged {
position: u32,
},
}
#[derive(Debug)]
@ -280,6 +280,43 @@ impl Playback {
tokio::spawn(async move {
while let Ok(message) = self.playback_rx.recv_async().await {
match message {
PlaybackMessage::Init { result_tx } => {
let response = {
let queue = self.queue.lock().unwrap();
let queue_track = QueueTrack {
queue_position: queue.current_position,
track: queue.current(),
};
let position = TrackPosition {
duration: self
.play
.duration()
.and_then(|t| Some(t.mseconds() as u32))
.unwrap_or(0),
position: self
.play
.position()
.and_then(|t| Some(t.mseconds() as u32))
.unwrap_or(0),
};
let play_state = match *self.state.lock().unwrap() {
GstPlaystate::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused,
GstPlaystate::Stopped => PlayState::Stopped,
GstPlaystate::Buffering => PlayState::Loading,
_ => PlayState::Unspecified,
};
InitResponse {
queue: Some(queue.clone()),
queue_track: Some(queue_track),
play_state: play_state as i32,
volume: self.play.volume() as f32,
mute: self.play.is_muted(),
position: Some(position),
}
};
result_tx.send(response).unwrap();
}
PlaybackMessage::Replace { uuids } => {
println!("Replace {:?}", uuids);
let mut all_tracks = Vec::new();
@ -510,6 +547,12 @@ impl Playback {
println!("{:?}", err)
};
}
PlaybackMessage::MuteChanged { muted } => {
todo!()
}
PlaybackMessage::PostitionChanged { position } => {
todo!()
}
}
}
});
@ -644,7 +687,23 @@ impl CrabidyService for RpcService {
Pin<Box<dyn tokio_stream::Stream<Item = Result<GetUpdateStreamResponse, Status>> + Send>>;
async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitResponse>, Status> {
todo!()
let playback_tx = self.playback_tx.clone();
let (result_tx, result_rx) = flume::bounded(1);
let Ok(response) = playback_tx
.send_async(PlaybackMessage::Init { result_tx })
.await
else {
return Err(Status::internal("Sending init failed internally"));
};
let Ok(response) = result_rx
.recv_async()
.await
else {
return Err(Status::internal("Init failed internally"));
};
Ok(Response::new(response))
}
async fn get_library_node(