2020-05-24 20:53:30 -04:00
|
|
|
use super::*;
|
2020-05-25 22:17:44 -04:00
|
|
|
use anyhow::{Context, Error, Result};
|
2020-06-08 00:02:15 -04:00
|
|
|
use future::{abortable, AbortHandle};
|
2020-05-25 22:17:44 -04:00
|
|
|
use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*};
|
2020-05-24 23:14:25 -04:00
|
|
|
use serde_json::json;
|
2020-05-25 22:17:44 -04:00
|
|
|
use std::sync::Arc;
|
2020-05-24 20:53:30 -04:00
|
|
|
use stream::iter;
|
2020-06-08 00:02:15 -04:00
|
|
|
use tokio::time::{interval, Instant};
|
2020-05-24 20:53:30 -04:00
|
|
|
|
2020-05-30 16:10:55 -04:00
|
|
|
/// Buffer size handed to each client's outbound message channel.
///
/// Past a certain point a client has fallen so far behind that catching it up
/// is probably not worthwhile. For now that point is modelled as a cap on the
/// channel buffer: a client whose channel can no longer accept a message is
/// disconnected. A more elegant policy may replace this in the future.
const CHANNEL_BUFFER: usize = 200;
|
|
|
|
|
2020-06-08 19:56:48 -04:00
|
|
|
pub async fn greet(sink: &mut Sender<ServerMessage>, player_id: Option<PlayerId>) -> Result<()> {
|
2020-05-24 20:53:30 -04:00
|
|
|
let mut greeting = iter(vec![
|
|
|
|
ServerMessage::Meta {
|
2020-06-08 19:50:56 -04:00
|
|
|
meta: Meta {
|
2020-05-24 20:53:30 -04:00
|
|
|
version: "Unstable",
|
|
|
|
helo: Some("Dedicated base2020 server".into()),
|
|
|
|
},
|
|
|
|
},
|
2020-05-30 16:10:55 -04:00
|
|
|
ServerMessage::SetState {
|
2020-06-08 19:56:48 -04:00
|
|
|
player_id,
|
2020-06-08 19:50:56 -04:00
|
|
|
state: json!({}),
|
2020-05-30 16:10:55 -04:00
|
|
|
},
|
2020-05-24 20:53:30 -04:00
|
|
|
])
|
|
|
|
.map(Ok);
|
|
|
|
|
2020-05-25 22:17:44 -04:00
|
|
|
sink.send_all(&mut greeting)
|
|
|
|
.await
|
|
|
|
.context("Greeting client")
|
|
|
|
}
|
|
|
|
|
2020-05-30 16:10:55 -04:00
|
|
|
pub struct PlayerState {
|
|
|
|
sender: Sender<ServerMessage>,
|
2020-06-08 20:06:02 -04:00
|
|
|
input: PlayerInput,
|
2020-05-30 16:10:55 -04:00
|
|
|
}
|
|
|
|
|
2020-05-25 23:38:18 -04:00
|
|
|
pub struct Server {
|
2020-05-30 16:10:55 -04:00
|
|
|
players: Vec<Option<PlayerState>>,
|
2020-06-05 16:46:51 -04:00
|
|
|
heartbeat_task: Option<AbortHandle>,
|
2020-05-25 23:38:18 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct Handle {
|
|
|
|
server: Arc<Mutex<Server>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Server {
|
|
|
|
pub fn create() -> Handle {
|
|
|
|
Handle {
|
|
|
|
server: Arc::new(Mutex::new(Server {
|
2020-05-30 16:10:55 -04:00
|
|
|
players: Vec::new(),
|
2020-06-05 16:46:51 -04:00
|
|
|
heartbeat_task: None,
|
2020-05-25 23:38:18 -04:00
|
|
|
})),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-08 00:02:15 -04:00
|
|
|
/// connect a client to this server
|
|
|
|
/// TODO: currently needs to be passed a handle to this server instance, which feels flakey.
|
|
|
|
pub async fn add_player(
|
|
|
|
&mut self,
|
|
|
|
mut sender: Sender<ServerMessage>,
|
|
|
|
handle: &Handle,
|
|
|
|
) -> Result<PlayerId> {
|
2020-05-30 16:10:55 -04:00
|
|
|
// TODO: limit total number of players
|
|
|
|
|
|
|
|
// allot player ID
|
|
|
|
let player_id = self
|
|
|
|
.players
|
|
|
|
.iter()
|
|
|
|
.position(|slot| slot.is_none())
|
|
|
|
.unwrap_or_else(|| {
|
|
|
|
self.players.push(None);
|
|
|
|
self.players.len() - 1
|
|
|
|
});
|
|
|
|
|
|
|
|
// connect player
|
2020-06-08 19:56:48 -04:00
|
|
|
greet(&mut sender, Some(player_id)).await?;
|
2020-06-08 20:06:02 -04:00
|
|
|
self.players[player_id] = Some(PlayerState {
|
|
|
|
sender,
|
|
|
|
input: json!([]),
|
|
|
|
});
|
2020-05-30 16:10:55 -04:00
|
|
|
info!("Client#{} connected", player_id);
|
2020-06-05 16:46:51 -04:00
|
|
|
|
|
|
|
// ensure server task is running
|
2020-06-08 00:02:15 -04:00
|
|
|
self.spinup(handle);
|
2020-06-05 16:46:51 -04:00
|
|
|
|
2020-05-30 16:10:55 -04:00
|
|
|
Ok(player_id)
|
|
|
|
}
|
|
|
|
|
2020-06-08 00:02:15 -04:00
|
|
|
pub fn process_message(&mut self, player: usize, msg: ClientMessage) {
|
|
|
|
trace!("Client#{} message: {:?}", player, &msg);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn tick(&mut self, tick: Instant) {
|
2020-06-08 20:09:31 -04:00
|
|
|
trace!("Tick {:?}", tick);
|
|
|
|
|
|
|
|
let total_input = self
|
|
|
|
.players
|
|
|
|
.iter()
|
|
|
|
.map(|player| match player {
|
|
|
|
Some(PlayerState { input, .. }) => input.clone(),
|
|
|
|
None => json!([]),
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
self.broadcast(ServerMessage::Input { total_input });
|
|
|
|
}
|
|
|
|
|
|
|
|
fn broadcast(&mut self, msg: ServerMessage) {
|
|
|
|
// iterate by index instead of iterator, because we need to call
|
|
|
|
// remove_player(&mut self) in the error case
|
|
|
|
for slot in 0..self.players.len() {
|
|
|
|
if let Some(ref mut player) = self.players[slot] {
|
|
|
|
// don't poll ready; we give the channel enough buffer that an overflow indicates
|
|
|
|
// the client has fallen hopelessly behind.
|
|
|
|
if player.sender.start_send(msg.clone()).is_err() {
|
|
|
|
info!("Client#{} fell behind", slot);
|
|
|
|
self.remove_player(slot);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-06-08 00:02:15 -04:00
|
|
|
}
|
|
|
|
|
2020-05-30 16:10:55 -04:00
|
|
|
pub fn remove_player(&mut self, player_id: PlayerId) {
|
|
|
|
if player_id < self.players.len() && self.players[player_id].is_some() {
|
|
|
|
self.players[player_id] = None;
|
|
|
|
info!("Client#{} disconnected", player_id);
|
|
|
|
} else {
|
2020-06-08 00:02:15 -04:00
|
|
|
error!(
|
|
|
|
"Tried to disconnect Client#{} but there was no record for them",
|
|
|
|
player_id
|
|
|
|
);
|
2020-05-30 16:10:55 -04:00
|
|
|
}
|
2020-06-05 16:46:51 -04:00
|
|
|
|
|
|
|
// check if no players left
|
|
|
|
if self.players.iter().all(Option::is_none) {
|
|
|
|
self.shutdown();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Start the heartbeat task, if it's not running
|
2020-06-08 00:02:15 -04:00
|
|
|
/// TODO: currently needs to be passed a handle to this server instance, which feels flakey.
|
|
|
|
fn spinup(&mut self, server_handle: &Handle) {
|
2020-06-05 16:46:51 -04:00
|
|
|
if let None = self.heartbeat_task {
|
|
|
|
info!("Starting heartbeat task");
|
2020-06-08 00:02:15 -04:00
|
|
|
// Take a reference to the server so the server tick task can access its state + send messages;
|
|
|
|
// A strong reference is fine, because we manually abort (& drop) this task when
|
|
|
|
// no more players are connected.
|
|
|
|
let server_handle = server_handle.clone();
|
|
|
|
let (task, abort_handle) = abortable(async move {
|
2020-06-05 16:46:51 -04:00
|
|
|
info!("Heartbeat task started");
|
2020-06-08 00:02:15 -04:00
|
|
|
let mut ticks = interval(TICK_LENGTH);
|
|
|
|
while let Some(tick) = ticks.next().await {
|
|
|
|
server_handle.server.lock().await.tick(tick);
|
|
|
|
}
|
2020-06-05 16:46:51 -04:00
|
|
|
});
|
2020-06-08 00:02:15 -04:00
|
|
|
self.heartbeat_task = Some(abort_handle);
|
2020-06-05 16:46:51 -04:00
|
|
|
tokio::spawn(task);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Stop any active heartbeat task
|
|
|
|
fn shutdown(&mut self) {
|
|
|
|
if let Some(handle) = self.heartbeat_task.take() {
|
|
|
|
info!("Stopping heartbeat task");
|
|
|
|
handle.abort();
|
|
|
|
}
|
2020-05-30 16:10:55 -04:00
|
|
|
}
|
2020-05-25 23:38:18 -04:00
|
|
|
}
|
|
|
|
|
2020-05-25 23:24:55 -04:00
|
|
|
pub async fn run_client(
|
2020-05-25 23:38:18 -04:00
|
|
|
handle: Handle,
|
2020-05-25 23:24:55 -04:00
|
|
|
source: &mut (impl Stream<Item = Result<ClientMessage, Error>> + Send + Unpin),
|
2020-05-30 16:10:55 -04:00
|
|
|
sink: &mut (impl Sink<ServerMessage, Error = Error> + Send + Unpin),
|
2020-05-25 23:24:55 -04:00
|
|
|
) -> Result<()> {
|
2020-05-30 16:10:55 -04:00
|
|
|
let (sender, receiver) = channel(CHANNEL_BUFFER);
|
|
|
|
|
|
|
|
// register player
|
2020-06-08 20:06:02 -04:00
|
|
|
let player_id = handle
|
|
|
|
.server
|
|
|
|
.lock()
|
|
|
|
.await
|
|
|
|
.add_player(sender, &handle)
|
|
|
|
.await?;
|
2020-05-30 16:10:55 -04:00
|
|
|
|
2020-06-03 00:50:12 -04:00
|
|
|
let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink));
|
2020-05-25 22:17:44 -04:00
|
|
|
|
|
|
|
let input_task = async {
|
|
|
|
loop {
|
2020-05-30 19:16:50 -04:00
|
|
|
match source.try_next().await? {
|
|
|
|
Some(msg) => handle.server.lock().await.process_message(player_id, msg),
|
2020-06-03 00:50:12 -04:00
|
|
|
None => break,
|
2020-05-25 22:17:44 -04:00
|
|
|
}
|
2020-06-08 00:02:15 -04:00
|
|
|
}
|
2020-06-03 00:50:12 -04:00
|
|
|
// stop the sink so it won't error
|
|
|
|
output_handle.abort();
|
|
|
|
Ok(())
|
2020-05-25 22:17:44 -04:00
|
|
|
};
|
2020-06-03 00:50:12 -04:00
|
|
|
|
|
|
|
// intentional aborting is not a reportable error
|
2020-06-08 00:02:15 -04:00
|
|
|
let output_task = output_task.map(|result| result.unwrap_or(Ok(())));
|
2020-06-03 00:50:12 -04:00
|
|
|
|
2020-06-08 00:02:15 -04:00
|
|
|
let result = try_join(output_task, input_task).await.map(|((), ())| ());
|
2020-05-30 16:10:55 -04:00
|
|
|
|
2020-05-30 19:16:50 -04:00
|
|
|
// deregister player, whether normally or due to error
|
2020-05-30 16:10:55 -04:00
|
|
|
handle.server.lock().await.remove_player(player_id);
|
|
|
|
|
|
|
|
result
|
2020-05-24 20:53:30 -04:00
|
|
|
}
|