use super::*; use anyhow::{Context, Error, Result}; use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; use serde_json::json; use std::sync::Arc; use stream::iter; use future::{AbortHandle, abortable}; /// There is a point at which a client falls far enough behind /// that it's probably not worth trying to catch them up; for now, /// implement this as a buffer size limit and disconnect a client if /// the cap is reached. More elegant solutions may be reached in the future. const CHANNEL_BUFFER: usize = 200; pub async fn greet(sink: &mut Sender) -> Result<()> { let mut greeting = iter(vec![ ServerMessage::Meta { m: Meta { version: "Unstable", helo: Some("Dedicated base2020 server".into()), }, }, ServerMessage::SetState { u: Some(0), s: json!({}), }, ]) .map(Ok); sink.send_all(&mut greeting) .await .context("Greeting client") } pub struct PlayerState { sender: Sender, } pub struct Server { players: Vec>, heartbeat_task: Option, } #[derive(Clone)] pub struct Handle { server: Arc>, } impl Server { pub fn create() -> Handle { Handle { server: Arc::new(Mutex::new(Server { players: Vec::new(), heartbeat_task: None, })), } } pub fn process_message(&mut self, player: usize, msg: ClientMessage) { debug!("Client#{} message: {:?}", player, &msg); } pub async fn add_player(&mut self, mut sender: Sender) -> Result { // TODO: limit total number of players // allot player ID let player_id = self .players .iter() .position(|slot| slot.is_none()) .unwrap_or_else(|| { self.players.push(None); self.players.len() - 1 }); // connect player greet(&mut sender).await?; self.players[player_id] = Some(PlayerState { sender }); info!("Client#{} connected", player_id); // ensure server task is running self.spinup(); Ok(player_id) } pub fn remove_player(&mut self, player_id: PlayerId) { if player_id < self.players.len() && self.players[player_id].is_some() { self.players[player_id] = None; info!("Client#{} disconnected", player_id); } else { error!("Tried to disconnect Client#{} but there was no record for them", player_id); } // check if no players left if self.players.iter().all(Option::is_none) { self.shutdown(); } } /// Start the heartbeat task, if it's not running fn spinup(&mut self) { if let None = self.heartbeat_task { info!("Starting heartbeat task"); let (task, handle) = abortable(async { info!("Heartbeat task started"); }); self.heartbeat_task = Some(handle); tokio::spawn(task); } } /// Stop any active heartbeat task fn shutdown(&mut self) { if let Some(handle) = self.heartbeat_task.take() { info!("Stopping heartbeat task"); handle.abort(); } } } pub async fn run_client( handle: Handle, source: &mut (impl Stream> + Send + Unpin), sink: &mut (impl Sink + Send + Unpin), ) -> Result<()> { let (sender, receiver) = channel(CHANNEL_BUFFER); // register player let player_id = handle.server.lock().await.add_player(sender).await?; let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink)); let input_task = async { loop { match source.try_next().await? { Some(msg) => handle.server.lock().await.process_message(player_id, msg), None => break, } }; // stop the sink so it won't error output_handle.abort(); Ok(()) }; // intentional aborting is not a reportable error let output_task = output_task .map(|result| result.unwrap_or(Ok(()))); let result = try_join(output_task, input_task).await.map(|((),())| ()); // deregister player, whether normally or due to error handle.server.lock().await.remove_player(player_id); result }