Skip to content

Commit

Permalink
Merge pull request #102 from clostra/shutdown
Browse files Browse the repository at this point in the history
utp_shutdown
  • Loading branch information
ssiloti authored May 1, 2018
2 parents dc36aed + 9e69461 commit 53faf77
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 63 deletions.
18 changes: 13 additions & 5 deletions ucat.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ utp_socket *s;
int fd;
int buf_len = 0;
unsigned char *buf, *p;
int eof_flag, quit_flag, exit_code;
int eof_flag, utp_eof_flag, utp_shutdown_flag, quit_flag, exit_code;

void die(char *fmt, ...)
{
Expand Down Expand Up @@ -149,8 +149,13 @@ void write_data(void)
out:
if (buf_len == 0 && eof_flag) {
if (s) {
debug("Buffer empty, and previously found EOF. Closing socket\n");
utp_close(s);
debug("Buffer empty, and previously found EOF. Shutdown socket\n");
utp_shutdown_flag = 1;
if (!utp_eof_flag) {
utp_shutdown(s, SHUT_WR);
} else {
utp_close(s);
}
}
else {
quit_flag = 1;
Expand Down Expand Up @@ -223,8 +228,11 @@ uint64 callback_on_state_change(utp_callback_arguments *a)
break;

case UTP_STATE_EOF:
debug("Received EOF from socket; closing\n");
utp_close(a->socket);
debug("Received EOF from socket\n");
utp_eof_flag = 1;
if (utp_shutdown_flag) {
utp_close(a->socket);
}
break;

case UTP_STATE_DESTROYING:
Expand Down
1 change: 1 addition & 0 deletions utp.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ void utp_read_drained (utp_socket *s);
int utp_get_delays (utp_socket *s, uint32 *ours, uint32 *theirs, uint32 *age);
utp_socket_stats* utp_get_stats (utp_socket *s);
utp_context* utp_get_context (utp_socket *s);
void utp_shutdown (utp_socket *s, int how);
void utp_close (utp_socket *s);

#ifdef __cplusplus
Expand Down
162 changes: 104 additions & 58 deletions utp_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,12 @@ enum CONN_STATE {
CS_SYN_RECV,
CS_CONNECTED,
CS_CONNECTED_FULL,
CS_GOT_FIN,
CS_DESTROY_DELAY,
CS_FIN_SENT,
CS_RESET,
CS_DESTROY
};

static const cstr statenames[] = {
"UNINITIALIZED", "IDLE","SYN_SENT", "SYN_RECV", "CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
"UNINITIALIZED", "IDLE","SYN_SENT", "SYN_RECV", "CONNECTED","CONNECTED_FULL","DESTROY_DELAY","RESET","DESTROY"
};

struct OutgoingPacket {
Expand Down Expand Up @@ -430,6 +427,19 @@ struct UTPSocket {

// Is a FIN packet in the reassembly buffer?
bool got_fin:1;
// Have we reached the FIN?
bool got_fin_reached:1;

// Have we sent our FIN?
bool fin_sent:1;
// Has our fin been ACKed?
bool fin_sent_acked:1;

// Reading is disabled
bool read_shutdown:1;
// User called utp_close()
bool close_requested:1;

// Timeout procedure
bool fast_timeout:1;

Expand Down Expand Up @@ -776,7 +786,7 @@ void UTPSocket::send_ack(bool synack)

// we never need to send EACK for connections
// that are shutting down
if (reorder_count != 0 && state < CS_GOT_FIN) {
if (reorder_count != 0 && !got_fin_reached) {
// if reorder count > 0, send an EACK.
// reorder count should always be 0
// for synacks, so this should not be
Expand Down Expand Up @@ -1127,8 +1137,7 @@ void UTPSocket::check_timeouts()
case CS_SYN_SENT:
case CS_SYN_RECV:
case CS_CONNECTED_FULL:
case CS_CONNECTED:
case CS_FIN_SENT: {
case CS_CONNECTED: {

// Reset max window...
if ((int)(ctx->current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
Expand Down Expand Up @@ -1183,7 +1192,7 @@ void UTPSocket::check_timeouts()
// 4 consecutive transmissions have timed out. Kill it. If we
// haven't even connected yet, give up after only 2 consecutive
// failed transmissions.
if (state == CS_FIN_SENT)
if (close_requested)
state = CS_DESTROY;
else
state = CS_RESET;
Expand Down Expand Up @@ -1259,24 +1268,14 @@ void UTPSocket::check_timeouts()
utp_call_on_state_change(this->ctx, this, UTP_STATE_WRITABLE);
}

if (state >= CS_CONNECTED && state < CS_GOT_FIN) {
if (state >= CS_CONNECTED && !fin_sent) {
if ((int)(ctx->current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
send_keep_alive();
}
}
break;
}

// Close?
case CS_GOT_FIN:
case CS_DESTROY_DELAY:
if ((int)(ctx->current_ms - rto_timeout) >= 0) {
state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
if (cur_window_packets > 0) {
utp_call_on_error(ctx, this, UTP_ECONNRESET);
}
}
break;
// prevent warning
case CS_UNINITIALIZED:
case CS_IDLE:
Expand Down Expand Up @@ -2173,12 +2172,14 @@ size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, boo
else
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_CONNECT);

// We've sent a fin, and everything was ACKed (including the FIN),
// it's safe to destroy the socket. cur_window_packets == acks
// means that this packet acked all the remaining packets that
// were in-flight.
} else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
conn->state = CS_DESTROY;
// We've sent a fin, and everything was ACKed (including the FIN).
// cur_window_packets == acks means that this packet acked all
// the remaining packets that were in-flight.
} else if (conn->fin_sent && conn->cur_window_packets == acks) {
conn->fin_sent_acked = true;
if (conn->close_requested) {
conn->state = CS_DESTROY;
}
}

// Update fast resend counter
Expand Down Expand Up @@ -2314,8 +2315,7 @@ size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, boo

// The connection is not in a state that can accept data?
if (conn->state != CS_CONNECTED &&
conn->state != CS_CONNECTED_FULL &&
conn->state != CS_FIN_SENT) {
conn->state != CS_CONNECTED_FULL) {
return 0;
}

Expand All @@ -2341,7 +2341,7 @@ size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, boo
// Getting an in-order packet?
if (seqnr == 0) {
size_t count = packet_end - data;
if (count > 0 && conn->state != CS_FIN_SENT) {
if (count > 0 && !conn->read_shutdown) {

#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Got Data len:%u (rb:%u)", (uint)count, (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
Expand All @@ -2356,18 +2356,15 @@ size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, boo
// in the reorder buffer.
for (;;) {

if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
if (conn->state != CS_FIN_SENT) {
conn->state = CS_GOT_FIN;
conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
if (!conn->got_fin_reached && conn->got_fin && conn->eof_pkt == conn->ack_nr) {
conn->got_fin_reached = true;
conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);

#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Posting EOF");
#endif

#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Posting EOF");
#endif

utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
}
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);

// if the other end wants to close, ack
conn->send_ack();
Expand All @@ -2392,7 +2389,7 @@ size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, boo
break;
conn->inbuf.put(conn->ack_nr+1, NULL);
count = *(uint*)p;
if (count > 0 && conn->state != CS_FIN_SENT) {
if (count > 0 && !conn->read_shutdown) {
// Pass the bytes to the upper layer
utp_call_on_read(conn->ctx, conn, p + sizeof(uint), count);
}
Expand Down Expand Up @@ -2589,6 +2586,11 @@ utp_socket* utp_create_socket(utp_context *ctx)
conn->timeout_seq_nr = 0;
conn->last_rcv_win = 0;
conn->got_fin = false;
conn->got_fin_reached = false;
conn->fin_sent = false;
conn->fin_sent_acked = false;
conn->read_shutdown = false;
conn->close_requested = false;
conn->fast_timeout = false;
conn->rtt = 0;
conn->retransmit_timeout = 0;
Expand Down Expand Up @@ -2862,7 +2864,7 @@ int utp_process_udp(utp_context *ctx, const byte *buffer, size_t len, const stru
ctx->log(UTP_LOG_DEBUG, NULL, "recv RST for existing connection");
#endif

if (conn->state == CS_FIN_SENT)
if (conn->close_requested)
conn->state = CS_DESTROY;
else
conn->state = CS_RESET;
Expand Down Expand Up @@ -3128,18 +3130,18 @@ int utp_process_icmp_error(utp_context *ctx, const byte *buffer, size_t len, con
#endif
return 1;

case CS_FIN_SENT:
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s in state CS_FIN_SENT, setting state to CS_DESTROY and causing error %d", addrfmt(addr, addrbuf), err);
#endif
conn->state = CS_DESTROY;
break;

default:
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s, setting state to CS_RESET and causing error %d", addrfmt(addr, addrbuf), err);
#endif
conn->state = CS_RESET;
if (conn->close_requested) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s after close, setting state to CS_DESTROY and causing error %d", addrfmt(addr, addrbuf), err);
#endif
conn->state = CS_DESTROY;
} else {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s, setting state to CS_RESET and causing error %d", addrfmt(addr, addrbuf), err);
#endif
conn->state = CS_RESET;
}
break;
}

Expand Down Expand Up @@ -3183,6 +3185,13 @@ ssize_t utp_writev(utp_socket *conn, struct utp_iovec *iovec_input, size_t num_i
return 0;
}

if (conn->fin_sent) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = false (fin_sent already)", (uint)bytes);
#endif
return 0;
}

conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);

// don't send unless it will all fit in the window
Expand Down Expand Up @@ -3352,8 +3361,6 @@ void utp_close(UTPSocket *conn)
if (!conn) return;

assert(conn->state != CS_UNINITIALIZED
&& conn->state != CS_DESTROY_DELAY
&& conn->state != CS_FIN_SENT
&& conn->state != CS_DESTROY);

#if UTP_DEBUG_LOGGING
Expand All @@ -3363,22 +3370,61 @@ void utp_close(UTPSocket *conn)
switch(conn->state) {
case CS_CONNECTED:
case CS_CONNECTED_FULL:
conn->state = CS_FIN_SENT;
conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
conn->read_shutdown = true;
conn->close_requested = true;
if (!conn->fin_sent) {
conn->fin_sent = true;
conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
} else if (conn->fin_sent_acked) {
conn->state = CS_DESTROY;
}
break;

case CS_SYN_SENT:
conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
// fall through
case CS_GOT_FIN:
conn->state = CS_DESTROY_DELAY;
break;
case CS_SYN_RECV:
// fall through
default:
conn->state = CS_DESTROY;
break;
}

#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP_Close end in state:%s", statenames[conn->state]);
#endif
}

void utp_shutdown(UTPSocket *conn, int how)
{
assert(conn);
if (!conn) return;

assert(conn->state != CS_UNINITIALIZED
&& conn->state != CS_DESTROY);

#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP_shutdown(%d) in state:%s", how, statenames[conn->state]);
#endif

if (how != SHUT_WR) {
conn->read_shutdown = true;
}
if (how != SHUT_RD) {
switch(conn->state) {
case CS_CONNECTED:
case CS_CONNECTED_FULL:
if (!conn->fin_sent) {
conn->fin_sent = true;
conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
}
break;
case CS_SYN_SENT:
conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
default:
break;
}
}
}

utp_context* utp_get_context(utp_socket *socket) {
Expand Down

0 comments on commit 53faf77

Please sign in to comment.