use super::*; use anyhow::{Context, Error, Result}; use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; use serde_json::json; use std::sync::Arc; use stream::iter; /// There is a point at which a client falls far enough behind /// that it's probably not worth trying to catch them up; for now, /// implement this as a buffer size limit and disconnect a client if /// the cap is reached. More elegant solutions may be reached in the future. const CHANNEL_BUFFER: usize = 200; pub async fn greet(sink: &mut Sender) -> Result<()> { let mut greeting = iter(vec![ ServerMessage::Meta { m: Meta { version: "Unstable", helo: Some("Dedicated base2020 server".into()), }, }, ServerMessage::SetState { u: Some(0), s: json!({}), }, ]) .map(Ok); sink.send_all(&mut greeting) .await .context("Greeting client") } pub struct PlayerState { sender: Sender, } pub struct Server { players: Vec>, } #[derive(Clone)] pub struct Handle { server: Arc>, } impl Server { pub fn create() -> Handle { Handle { server: Arc::new(Mutex::new(Server { players: Vec::new(), })), } } pub fn process_message(&mut self, player: usize, msg: ClientMessage) { debug!("Client#{} message: {:?}", player, &msg); } pub async fn add_player(&mut self, mut sender: Sender) -> Result { // TODO: limit total number of players // allot player ID let player_id = self .players .iter() .position(|slot| slot.is_none()) .unwrap_or_else(|| { self.players.push(None); self.players.len() - 1 }); // connect player greet(&mut sender).await?; self.players[player_id] = Some(PlayerState { sender }); info!("Client#{} connected", player_id); Ok(player_id) } pub fn remove_player(&mut self, player_id: PlayerId) { if player_id < self.players.len() && self.players[player_id].is_some() { self.players[player_id] = None; info!("Client#{} disconnected", player_id); } else { error!("Tried to disconnect Client#{} but there was no record for them", player_id); } } } pub async fn run_client( handle: Handle, source: &mut (impl Stream> + Send + Unpin), sink: &mut (impl Sink + Send + Unpin), ) -> Result<()> { let (sender, receiver) = channel(CHANNEL_BUFFER); // register player let player_id = handle.server.lock().await.add_player(sender).await?; let output_task = receiver.map(Ok).forward(sink); let input_task = async { loop { match source.next().await { Some(Ok(msg)) => handle.server.lock().await.process_message(player_id, msg), Some(Err(error)) => return Err(error), None => break Ok(()), } } }; let result = try_join(output_task, input_task).await.map(|_| ()); // deregister player, whether normally or via error handle.server.lock().await.remove_player(player_id); result }