use super::*; use anyhow::{Context, Error, Result}; use future::{abortable, AbortHandle}; use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; use serde_json::json; use std::sync::Arc; use stream::iter; use tokio::time::{interval, Instant}; /// There is a point at which a client falls far enough behind /// that it's probably not worth trying to catch them up; for now, /// implement this as a buffer size limit and disconnect a client if /// the cap is reached. More elegant solutions may be reached in the future. const CHANNEL_BUFFER: usize = 200; pub async fn greet(sink: &mut Sender) -> Result<()> { let mut greeting = iter(vec![ ServerMessage::Meta { meta: Meta { version: "Unstable", helo: Some("Dedicated base2020 server".into()), }, }, ServerMessage::SetState { player_id: Some(0), state: json!({}), }, ]) .map(Ok); sink.send_all(&mut greeting) .await .context("Greeting client") } pub struct PlayerState { sender: Sender, } pub struct Server { players: Vec>, heartbeat_task: Option, } #[derive(Clone)] pub struct Handle { server: Arc>, } impl Server { pub fn create() -> Handle { Handle { server: Arc::new(Mutex::new(Server { players: Vec::new(), heartbeat_task: None, })), } } /// connect a client to this server /// TODO: currently needs to be passed a handle to this server instance, which feels flakey. pub async fn add_player( &mut self, mut sender: Sender, handle: &Handle, ) -> Result { // TODO: limit total number of players // allot player ID let player_id = self .players .iter() .position(|slot| slot.is_none()) .unwrap_or_else(|| { self.players.push(None); self.players.len() - 1 }); // connect player greet(&mut sender).await?; self.players[player_id] = Some(PlayerState { sender }); info!("Client#{} connected", player_id); // ensure server task is running self.spinup(handle); Ok(player_id) } pub fn process_message(&mut self, player: usize, msg: ClientMessage) { trace!("Client#{} message: {:?}", player, &msg); } fn tick(&mut self, tick: Instant) { trace!("Tick {:?}", tick) } pub fn remove_player(&mut self, player_id: PlayerId) { if player_id < self.players.len() && self.players[player_id].is_some() { self.players[player_id] = None; info!("Client#{} disconnected", player_id); } else { error!( "Tried to disconnect Client#{} but there was no record for them", player_id ); } // check if no players left if self.players.iter().all(Option::is_none) { self.shutdown(); } } /// Start the heartbeat task, if it's not running /// TODO: currently needs to be passed a handle to this server instance, which feels flakey. fn spinup(&mut self, server_handle: &Handle) { if let None = self.heartbeat_task { info!("Starting heartbeat task"); // Take a reference to the server so the server tick task can access its state + send messages; // A strong reference is fine, because we manually abort (& drop) this task when // no more players are connected. let server_handle = server_handle.clone(); let (task, abort_handle) = abortable(async move { info!("Heartbeat task started"); let mut ticks = interval(TICK_LENGTH); while let Some(tick) = ticks.next().await { server_handle.server.lock().await.tick(tick); } }); self.heartbeat_task = Some(abort_handle); tokio::spawn(task); } } /// Stop any active heartbeat task fn shutdown(&mut self) { if let Some(handle) = self.heartbeat_task.take() { info!("Stopping heartbeat task"); handle.abort(); } } } pub async fn run_client( handle: Handle, source: &mut (impl Stream> + Send + Unpin), sink: &mut (impl Sink + Send + Unpin), ) -> Result<()> { let (sender, receiver) = channel(CHANNEL_BUFFER); // register player let player_id = handle.server.lock().await.add_player(sender, &handle).await?; let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink)); let input_task = async { loop { match source.try_next().await? { Some(msg) => handle.server.lock().await.process_message(player_id, msg), None => break, } } // stop the sink so it won't error output_handle.abort(); Ok(()) }; // intentional aborting is not a reportable error let output_task = output_task.map(|result| result.unwrap_or(Ok(()))); let result = try_join(output_task, input_task).await.map(|((), ())| ()); // deregister player, whether normally or due to error handle.server.lock().await.remove_player(player_id); result }