Skip to content

Commit

Permalink
handle disconnection and stabilize networking
Browse files Browse the repository at this point in the history
  • Loading branch information
Uriopass committed Mar 25, 2021
1 parent b9752db commit c60c721
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 53 deletions.
9 changes: 8 additions & 1 deletion native_app/src/game_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ impl State {

let commands = std::mem::take(&mut *self.uiw.write::<WorldCommands>());

match *self.uiw.write::<NetworkState>() {
let mut net_state = self.uiw.write::<NetworkState>();
match *net_state {
NetworkState::Singleplayer(ref mut step) => {
let goria = &mut self.goria; // mut for tick
let sched = &mut self.game_schedule;
Expand Down Expand Up @@ -171,9 +172,15 @@ impl State {
PollResult::Error => {
log::error!("there was an error polling the client");
}
PollResult::Disconnect => {
log::error!("got disconnected :-( continuing with server world but it's bad");
*net_state = NetworkState::Singleplayer(Timestep::new());
}
},
}

drop(net_state);

let real_delta = ctx.delta;
self.uiw.write::<Timings>().all.add_value(real_delta as f32);

Expand Down
2 changes: 1 addition & 1 deletion native_app/src/gui/windows/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ fn start_client(info: &mut NetworkConnectionInfo) -> Option<Client> {
addr: parsed_addr.ip(),
port: if port != 80 { Some(port) } else { None },
period: Duration::from_secs_f64(Timestep::DT),
frame_buffer_advance: 12,
frame_buffer_advance: 8,
}) {
Ok(x) => x,
Err(e) => {
Expand Down
17 changes: 13 additions & 4 deletions networking/src/authent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Authent {
ack: Frame,
name: String,
) -> Option<AuthentResponse> {
let state = self.get_client_state_mut(e).unwrap();
let state = self.get_client_state_mut(e)?;

if let ClientConnectState::Connecting {
reliable,
Expand All @@ -72,6 +72,7 @@ impl Authent {
});
}

// Unwrap ok: already checked right before
*self.get_client_state_mut(e).unwrap() = ClientConnectState::Connected(Client {
id: UserID(hash),
name,
Expand Down Expand Up @@ -117,12 +118,20 @@ impl Authent {
net.send(e, &*encode(&ServerReliablePacket::Challenge(client_id)));
}

pub fn tcp_disconnected(&mut self, e: Endpoint) -> Option<Client> {
let id = self.addr_to_client.get(&e.addr())?;
pub fn disconnected(&mut self, e: Endpoint) -> Option<Client> {
let id = self.addr_to_client.remove(&e.addr())?;
let client = self.clients.remove(&id)?;

if let ClientConnectState::Connecting {
unreliable: Some(unreliable),
..
} = client
{
self.addr_to_client.remove(&unreliable.addr());
return None;
}
if let ClientConnectState::Connected(c) = client {
self.addr_to_client.remove(&c.unreliable.addr());
self.addr_to_client.remove(&c.reliable.addr());
self.n_connected_clients -= 1;

return Some(c);
Expand Down
4 changes: 3 additions & 1 deletion networking/src/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl CatchUp {
}

pub fn ack(&mut self, c: &Client) {
self.frame_history.get_mut(&c.id).unwrap().ready = true;
if let Some(x) = self.frame_history.get_mut(&c.id) {
x.ready = true;
}
}

pub fn update(&mut self, c: &mut Client, net: &mut Network) {
Expand Down
43 changes: 24 additions & 19 deletions networking/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub enum PollResult<W, I> {
Input(Vec<Vec<ServerInput<I>>>),
GameWorld(I, W),
Error,
Disconnect,
}

#[allow(clippy::large_enum_variant)]
Expand All @@ -42,6 +43,7 @@ enum ClientState<I> {
next_inputs: Option<Vec<Vec<ServerInput<I>>>>,
},
Playing(ClientPlayoutBuffer),
Disconnected,
}

pub struct Client<WORLD: DeserializeOwned, INPUT: Serialize + DeserializeOwned + Default> {
Expand Down Expand Up @@ -93,28 +95,36 @@ impl<W: DeserializeOwned, I: Serialize + DeserializeOwned + Default> Client<W, I
})
}

#[allow(clippy::collapsible_if)]
pub fn poll(&mut self, input: I) -> PollResult<W, I> {
while let Some(x) = self.events.try_receive() {
match x {
NetEvent::Message(e, m) => {
if e.resource_id().adapter_id() == Transport::FramedTcp.id() {
let packet = decode(&*m).expect("invalid reliable packet");
self.message_reliable(packet);
if let Some(packet) = decode(&*m) {
self.message_reliable(packet);
} else {
log::error!("could not decode reliable packet from server");
}
} else {
let packet = decode(&*m).expect("invalid reliable packet");
self.message_unreliable(packet)
if let Some(packet) = decode(&*m) {
self.message_unreliable(packet)
} else {
log::error!("could not decode unreliable packet from server");
}
}
}
NetEvent::Connected(e, _) => {
log::info!("connected {}", e)
}
NetEvent::Disconnected(e) => {
log::info!("disconnected {}", e)
}
NetEvent::Disconnected(_) => self.state = ClientState::Disconnected,
}
}

match self.state {
ClientState::Disconnected => {
return PollResult::Disconnect;
}
ClientState::Connecting => {
return PollResult::Wait(input);
}
Expand Down Expand Up @@ -157,7 +167,7 @@ impl<W: DeserializeOwned, I: Serialize + DeserializeOwned + Default> Client<W, I

let advance = buffer.advance();

let fba = self.lag_compensate;
let fba = self.lag_compensate.max(1);
let to_consume = match advance {
0 => 0,
_ if (1..=fba).contains(&advance) => 1,
Expand All @@ -167,16 +177,16 @@ impl<W: DeserializeOwned, I: Serialize + DeserializeOwned + Default> Client<W, I
};

if to_consume > 0 {
assert!(to_consume <= advance);
self.clock = Instant::now();
let net = &mut self.network;
let udp = self.udp;
let name = &self.name;
let ack_frame = buffer.consumed_frame() + Frame(advance);
let id = self.id;

let multi = (0..to_consume)
.map(move |_| {
log::info!("{}: sending inputs to server", name);
// unwrap ok: to_consume must be less than advance
let (inp, pack) = buffer.try_consume(&mut mk_input).unwrap();
net.send(
udp,
Expand Down Expand Up @@ -268,14 +278,8 @@ impl<W: DeserializeOwned, I: Serialize + DeserializeOwned + Default> Client<W, I
fn message_unreliable(&mut self, p: ServerUnreliablePacket) {
match p {
ServerUnreliablePacket::Input(inp) => {
log::info!(
"{}: received inputs from server. {}->{}",
self.name,
inp.first().unwrap().0 .0,
inp.last().unwrap().0 .0
);
for (frame, inp) in inp {
if let ClientState::Playing(ref mut buffer) = self.state {
if let ClientState::Playing(ref mut buffer) = self.state {
for (frame, inp) in inp {
let _ = buffer.insert_serv_input(frame, inp);
}
}
Expand All @@ -289,8 +293,9 @@ impl<W: DeserializeOwned, I: Serialize + DeserializeOwned + Default> Client<W, I
ClientState::Downloading(_) => "Downloading map...".to_string(),
ClientState::CatchingUp { .. } => "Catching up...".to_string(),
ClientState::Playing(ref buf) => {
format!("Playing! Buffer health: {}", buf.advance())
format!("Playing! Buffer advance: {}", buf.advance())
}
ClientState::Disconnected => "Disconnected :-(".to_string(),
}
}
}
Expand Down
63 changes: 41 additions & 22 deletions networking/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,22 @@ impl<WORLD: Serialize> Server<WORLD> {
self.send_long_running();
while let Some(ev) = self.events.try_receive() {
match ev {
NetEvent::Message(e, data) => {
if is_reliable(&e) {
let packet = match decode::<ClientReliablePacket>(&data) {
Some(x) => x,
None => break,
};

let _ = self.message_reliable(e, packet, world);
} else {
let packet = match decode::<ClientUnreliablePacket>(&data) {
Some(x) => x,
None => break,
};

let _ = self.message_unreliable(e, packet);
NetEvent::Message(e, data) => match is_reliable(&e) {
true => {
if let Some(packet) = decode::<ClientReliablePacket>(&data) {
let _ = self.message_reliable(e, packet, world);
} else {
log::error!("client sent invalid reliable packet");
}
}
}
false => {
if let Some(packet) = decode::<ClientUnreliablePacket>(&data) {
let _ = self.message_unreliable(e, packet);
} else {
log::error!("client sent invalid unreliable packet");
}
}
},
NetEvent::Connected(e, _) => self.tcp_connected(e),
NetEvent::Disconnected(e) => self.tcp_disconnected(e),
}
Expand All @@ -106,6 +105,17 @@ impl<WORLD: Serialize> Server<WORLD> {

self.clock = Instant::now();

let to_disconnect = self
.authent
.iter_playing()
.filter(|v| self.buffer.lag(v.ack).is_none())
.map(|x| x.reliable)
.collect::<Vec<_>>();

for e in to_disconnect {
self.disconnect(e);
}

let clients_playing = self.authent.iter_playing();

let (consumed_inputs, inputs) = self.buffer.consume(clients_playing.clone().map(|c| c.ack));
Expand Down Expand Up @@ -139,7 +149,7 @@ impl<WORLD: Serialize> Server<WORLD> {
ClientUnreliablePacket::Input { input, ack_frame } => {
let client = self.authent.get_client_mut(e)?;

log::info!("{}: received inputs {:?}", client.name, ack_frame);
//log::info!("{}: received inputs {:?}", client.name, ack_frame);
client.ack = ack_frame;

for (frame, input) in input {
Expand Down Expand Up @@ -207,11 +217,7 @@ impl<WORLD: Serialize> Server<WORLD> {
}

fn tcp_disconnected(&mut self, e: Endpoint) {
if let Some(c) = self.authent.tcp_disconnected(e) {
self.buffer.disconnected(c.id);
self.catchup.disconnected(c.id);
self.worldsend.disconnected(c.id);
}
self.disconnect(e);
}

pub fn describe(&self) -> String {
Expand All @@ -225,6 +231,19 @@ impl<WORLD: Serialize> Server<WORLD> {
}
s
}

fn disconnect(&mut self, e: Endpoint) {
if e.resource_id().adapter_id() == Transport::Udp.id() {
log::error!("trying to disconnect udp endpoint");
return;
}
if let Some(c) = self.authent.disconnected(e) {
log::info!("player {} disconnected", c.name);
self.buffer.disconnected(c.id);
self.catchup.disconnected(c.id);
self.worldsend.disconnected(c.id);
}
}
}

fn is_reliable(e: &Endpoint) -> bool {
Expand Down
16 changes: 11 additions & 5 deletions networking/src/server/server_playout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl ServerPlayoutBuffer {
}

pub fn insert_input(&mut self, user: UserID, frame: Frame, input: PlayerInput) {
if frame.0 + 128 <= self.consumed_frame.0 {
if frame.0 + self.past.len() <= self.consumed_frame.0 {
log::info!("input was far too late");
return;
}
Expand All @@ -47,6 +47,15 @@ impl ServerPlayoutBuffer {
}
}

pub fn lag(&self, f: Frame) -> Option<u32> {
let lag = self.consumed_frame.0 - f.0;
if lag < self.past.len() - 1 {
Some(lag)
} else {
None
}
}

// call when a user has disconnected
pub fn disconnected(&mut self, user: UserID) {
self.dedup.remove(&user);
Expand All @@ -68,15 +77,12 @@ impl ServerPlayoutBuffer {
*v.get_mut(next_frame) = false;
}

log::info!("consuming {}", next_frame.0);

let mut result = vec![];
let merged = merge_partial_inputs(&mut self.next);

for ack_frame in acknowledged {
let lag = self.lag(ack_frame).expect("lag is too big");
debug_assert!(ack_frame <= self.consumed_frame);
let lag = self.consumed_frame.0 - ack_frame.0;
debug_assert!(lag < self.past.len());

let v = (1..=lag)
.map(|i| {
Expand Down

0 comments on commit c60c721

Please sign in to comment.