Implement curretn proto spec for server

This commit is contained in:
Hans Mündelein 2023-06-02 21:06:35 +02:00
parent c19b025818
commit c3654ade89
Signed by: hans
GPG Key ID: BA7B55E984CE74F4
1 changed files with 96 additions and 56 deletions

View File

@ -34,10 +34,21 @@ fn poll_play_bus(bus: gstreamer::Bus, tx: flume::Sender<PlaybackMessage>) {
Ok(PlayMessage::StateChanged { state }) => {
tx.send(PlaybackMessage::StateChanged { state }).unwrap();
}
Ok(PlayMessage::PositionUpdated { position }) => {}
Ok(PlayMessage::PositionUpdated { position }) => {
let position = position
.and_then(|t| Some(t.mseconds() as u32))
.unwrap_or(0);
tx.send(PlaybackMessage::PostitionChanged { position })
.unwrap();
}
Ok(PlayMessage::Buffering { percent }) => {}
Ok(PlayMessage::VolumeChanged { volume }) => {}
Ok(PlayMessage::MuteChanged { muted }) => {}
Ok(PlayMessage::VolumeChanged { volume }) => {
let volume = volume as f32;
tx.send(PlaybackMessage::VolumeChanged { volume }).unwrap();
}
Ok(PlayMessage::MuteChanged { muted }) => {
tx.send(PlaybackMessage::MuteChanged { muted }).unwrap();
}
Ok(PlayMessage::MediaInfoUpdated { info }) => {}
_ => println!("Unknown message: {:?}", msg),
@ -140,8 +151,10 @@ impl ProviderOrchestrator {
let Ok(node) = self.get_lib_node(&node_uuid).await else {
continue
};
tracks.extend(node.tracks);
nodes_to_go.extend(node.children.into_iter().map(|c| c.uuid))
if node.is_queable {
tracks.extend(node.tracks);
nodes_to_go.extend(node.children.into_iter().map(|c| c.uuid))
}
}
tracks
}
@ -227,12 +240,6 @@ enum PlaybackMessage {
SetCurrent {
position: u32,
},
GetQueue {
result_tx: flume::Sender<Queue>,
},
GetQueueTrack {
result_tx: flume::Sender<QueueTrack>,
},
TogglePlay,
ToggleShuffle,
Stop,
@ -242,9 +249,13 @@ enum PlaybackMessage {
ToggleMute,
Next,
Prev,
RestartTrack,
StateChanged {
state: GstPlaystate,
},
VolumeChanged {
volume: f32,
},
MuteChanged {
muted: bool,
@ -333,7 +344,6 @@ impl Playback {
result_tx.send(response).unwrap();
}
PlaybackMessage::Replace { uuids } => {
println!("Replace {:?}", uuids);
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
@ -356,6 +366,14 @@ impl Playback {
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
}
let track = queue.current();
let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: queue.current_position,
track,
});
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
}
queue.current()
};
if let Some(track) = current {
@ -364,61 +382,65 @@ impl Playback {
}
PlaybackMessage::Queue { uuids } => {
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
println!("Track {}", uuid);
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&[track]);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
all_tracks.push(track);
}
} else {
println!("Node {}", uuid);
let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
all_tracks.extend(tracks);
}
}
{
let mut queue = self.queue.lock().unwrap();
queue.queue_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
}
}
}
PlaybackMessage::Append { uuids } => {
let mut all_tracks = Vec::new();
for uuid in uuids {
if is_track(&uuid) {
println!("Track {}", uuid);
if let Ok(track) = self.get_track(&uuid).await {
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&[track]);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
};
all_tracks.push(track);
}
} else {
println!("Node {}", uuid);
let tracks = self.flatten_node(&uuid).await;
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
all_tracks.extend(tracks);
}
}
{
let mut queue = self.queue.lock().unwrap();
queue.append_tracks(&all_tracks);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
if let Err(err) = queue_update_tx.send(update) {
println!("{:?}", err)
}
}
}
//TODO handle deletion of current track
PlaybackMessage::Remove { positions } => {
let mut queue = self.queue.lock().unwrap();
queue.remove_tracks(&positions);
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::Queue(queue.clone());
queue_update_tx.send(update).unwrap();
if positions.contains(&queue.current_position) {
let playback_tx = self.playback_tx.clone();
playback_tx.send(PlaybackMessage::Next).unwrap();
}
}
PlaybackMessage::Insert { position, uuids } => {
@ -462,16 +484,6 @@ impl Playback {
}
}
PlaybackMessage::GetQueue { result_tx } => {
let queue = self.queue.lock().unwrap();
result_tx.send(queue.clone()).unwrap();
}
PlaybackMessage::GetQueueTrack { result_tx } => {
let current = self.get_queue_track().await;
result_tx.send(current).unwrap();
}
PlaybackMessage::TogglePlay => {
let mut state = self.state.lock().unwrap();
if *state == GstPlaystate::Playing {
@ -504,7 +516,8 @@ impl Playback {
let mut queue = self.queue.lock().unwrap();
let position = queue.current_position + 1;
let stop = !queue.set_current_position(position);
(queue.current(), stop, position)
let pos = queue.current_position;
(queue.current(), stop, pos)
};
let queue_update_tx = self.update_tx.clone();
let update = StreamUpdate::QueueTrack(QueueTrack {
@ -527,7 +540,8 @@ impl Playback {
let mut queue = self.queue.lock().unwrap();
let position = queue.current_position - 1;
let stop = !queue.set_current_position(position);
(queue.current(), stop, position)
let pos = queue.current_position;
(queue.current(), stop, pos)
};
let update = StreamUpdate::QueueTrack(QueueTrack {
queue_position: pos,
@ -549,7 +563,6 @@ impl Playback {
PlaybackMessage::StateChanged { state } => {
*self.state.lock().unwrap() = state.clone();
let active_track_tx = self.update_tx.clone();
let active_track = self.get_queue_track().await;
let play_state = match state {
GstPlaystate::Playing => PlayState::Playing,
GstPlaystate::Paused => PlayState::Paused,
@ -562,11 +575,35 @@ impl Playback {
println!("{:?}", err)
};
}
PlaybackMessage::RestartTrack => {
self.play.stop();
self.play.play();
}
PlaybackMessage::VolumeChanged { volume } => {
let update_tx = self.update_tx.clone();
let update = StreamUpdate::Volume(volume);
if let Err(err) = update_tx.send(update) {
println!("{:?}", err)
}
}
PlaybackMessage::MuteChanged { muted } => {
todo!()
let update_tx = self.update_tx.clone();
let update = StreamUpdate::Mute(muted);
if let Err(err) = update_tx.send(update) {
println!("{:?}", err)
}
}
PlaybackMessage::PostitionChanged { position } => {
todo!()
let update_tx = self.update_tx.clone();
let duration = self
.play
.duration()
.and_then(|t| Some(t.mseconds() as u32))
.unwrap_or(0);
let update = StreamUpdate::Position(TrackPosition { duration, position });
if let Err(err) = update_tx.send(update) {
println!("{:?}", err)
}
}
}
}
@ -961,7 +998,10 @@ impl CrabidyService for RpcService {
request: tonic::Request<RestartTrackRequest>,
) -> std::result::Result<tonic::Response<RestartTrackResponse>, tonic::Status> {
let playback_tx = self.playback_tx.clone();
playback_tx.send_async(PlaybackMessage::Prev).await.unwrap();
playback_tx
.send_async(PlaybackMessage::RestartTrack)
.await
.unwrap();
let reply = RestartTrackResponse {};
Ok(Response::new(reply))
}