Skip to content

Commit

Permalink
do not reserve in kcp core, reserve bytes in callback func
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Jul 31, 2024
1 parent cc09441 commit bf6f88f
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 31 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<img src="kcp-go.png" alt="kcp-go" height="50px" />
<img src="assets/kcp-go.png" alt="kcp-go" height="50px" />


[![GoDoc][1]][2] [![Powered][9]][10] [![MIT licensed][11]][12] [![Build Status][3]][4] [![Go Report Card][5]][6] [![Coverage Statusd][7]][8] [![Sourcegraph][13]][14]
Expand Down Expand Up @@ -44,7 +44,7 @@ For complete documentation, see the associated [Godoc](https://godoc.org/github.

## Specification

<img src="frame.png" alt="Frame Format" height="109px" />
<img src="assets/frame.png" alt="Frame Format" height="109px" />

```
NONCE:
Expand Down Expand Up @@ -200,7 +200,7 @@ ok github.com/xtaci/kcp-go/v5 64.151s


## Typical Flame Graph
![Flame Graph in kcptun](flame.png)
![Flame Graph in kcptun](assets/flame.png)

## Key Design Considerations

Expand Down
File renamed without changes
File renamed without changes
File renamed without changes
30 changes: 7 additions & 23 deletions kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ type KCP struct {

acklist []ackItem

buffer []byte
reserved int
output output_callback
buffer []byte
output output_callback
}

type ackItem struct {
Expand Down Expand Up @@ -222,19 +221,6 @@ func (kcp *KCP) delSegment(seg *segment) {
}
}

// ReserveBytes keeps n bytes untouched from the beginning of the buffer,
// the output_callback function should be aware of this.
//
// Return false if n >= mss
func (kcp *KCP) ReserveBytes(n int) bool {
if n >= int(kcp.mtu-IKCP_OVERHEAD) || n < 0 {
return false
}
kcp.reserved = n
kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(n)
return true
}

// PeekSize checks the size of next message in the recv queue
func (kcp *KCP) PeekSize() (length int) {
if len(kcp.rcv_queue) == 0 {
Expand Down Expand Up @@ -700,21 +686,21 @@ func (kcp *KCP) flush(ackOnly bool) uint32 {
seg.una = kcp.rcv_nxt

buffer := kcp.buffer
ptr := buffer[kcp.reserved:] // keep n bytes untouched
ptr := buffer

// makeSpace makes room for writing
makeSpace := func(space int) {
size := len(buffer) - len(ptr)
if size+space > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer[kcp.reserved:]
ptr = buffer
}
}

// flush bytes in buffer if there is any
flushBuffer := func() {
size := len(buffer) - len(ptr)
if size > kcp.reserved {
if size > 0 {
kcp.output(buffer, size)
}
}
Expand Down Expand Up @@ -1011,16 +997,14 @@ func (kcp *KCP) SetMtu(mtu int) int {
if mtu < 50 || mtu < IKCP_OVERHEAD {
return -1
}
if kcp.reserved >= int(kcp.mtu-IKCP_OVERHEAD) || kcp.reserved < 0 {
return -1
}

buffer := make([]byte, mtu)
if buffer == nil {
return -2
}

kcp.mtu = uint32(mtu)
kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(kcp.reserved)
kcp.mss = kcp.mtu - IKCP_OVERHEAD
kcp.buffer = buffer
return 0
}
Expand Down
8 changes: 4 additions & 4 deletions sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn

sess.kcp = NewKCP(conv, func(buf []byte, size int) {
// A basic check for the minimum packet size
if size >= IKCP_OVERHEAD+sess.headerSize {
if size >= IKCP_OVERHEAD {
// make a copy
bts := xmitBuf.Get().([]byte)[:size]
copy(bts, buf)
bts := xmitBuf.Get().([]byte)[:size+sess.headerSize]
// copy the data to a new buffer, and reserve header space
copy(bts[sess.headerSize:], buf)

// delivery to post processing
select {
Expand All @@ -226,7 +227,6 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn

}
})
sess.kcp.ReserveBytes(sess.headerSize)

// create post-processing goroutine
go sess.postProcess()
Expand Down
33 changes: 32 additions & 1 deletion sess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
package kcp

import (
"bytes"
"crypto/rand"
"crypto/sha1"
"fmt"
"io"
Expand Down Expand Up @@ -116,7 +118,7 @@ func listenEcho(port int) (net.Listener, error) {
//block, _ := NewTEABlockCrypt(pass[:16])
//block, _ := NewAESBlockCrypt(pass)
block, _ := NewSalsa20BlockCrypt(pass)
return ListenWithOptions(fmt.Sprintf("127.0.0.1:%v", port), block, 10, 0)
return ListenWithOptions(fmt.Sprintf("127.0.0.1:%v", port), block, 10, 1)
}
func listenTinyBufferEcho(port int) (net.Listener, error) {
//block, _ := NewNoneBlockCrypt(pass)
Expand Down Expand Up @@ -663,3 +665,32 @@ func TestUDPSessionNonOwnedPacketConn(t *testing.T) {
t.Fatal("non-owned PacketConn closed after UDPSession.Close()")
}
}

// this function test the data correctness with FEC and encryption enabled
func TestReliability(t *testing.T) {
port := int(atomic.AddUint32(&baseport, 1))
l := echoServer(port)
defer l.Close()

cli, err := dialEcho(port)
if err != nil {
panic(err)
}
cli.SetWriteDelay(false)
const N = 100000
buf := make([]byte, 128)
msg := make([]byte, 128)

for i := 0; i < N; i++ {
io.ReadFull(rand.Reader, msg)
cli.Write([]byte(msg))
if n, err := io.ReadFull(cli, buf); err == nil {
if !bytes.Equal(buf[:n], msg) {
t.Fail()
}
} else {
panic(err)
}
}
cli.Close()
}

0 comments on commit bf6f88f

Please sign in to comment.