From 0021f3463ff62522523d9afb6d5d62d5236564d4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 22 Jun 2018 20:39:57 -0500 Subject: [PATCH 1/3] slightly better way for the tcp sender goroutine(s) to block waiting for work --- src/yggdrasil/tcp.go | 45 ++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index db57b568..32cfbf37 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 32) // TODO? what size makes sense + out := make(chan []byte, 1) defer close(out) go func() { var shadow int64 @@ -255,13 +255,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { shadow++ } } - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } + send := make(chan []byte) + defer close(send) + go func() { + for msg := range send { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } + }() timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -278,9 +282,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send(nil) // TCP keep-alive traffic + send <- nil // TCP keep-alive traffic case msg := <-p.linkOut: - send(msg) + send <- msg case msg, ok := <-out: if !ok { return @@ -288,31 +292,32 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { put(msg) } for len(stack) > 0 { + // First make sure linkOut gets sent first, if it's non-empty select { case msg := <-p.linkOut: - send(msg) + send <- msg + default: + } + // Then block until we send or receive something + select { + case msg := <-p.linkOut: + send <- msg case msg, ok := <-out: if !ok { return } put(msg) - default: - msg := stack[len(stack)-1] + case send <- stack[len(stack)-1]: stack = stack[:len(stack)-1] - send(msg) p.updateQueueSize(-1) } } } }() p.out = func(msg []byte) { + p.updateQueueSize(1) defer func() { recover() }() - select { - case out <- msg: - p.updateQueueSize(1) - default: - util_putBytes(msg) - } + out <- msg } p.close = func() { sock.Close() } setNoDelay(sock, true) From cceecf4b1aba0799ee3d1453b047d73f802b937a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 22 Jun 2018 23:46:42 -0500 Subject: [PATCH 2/3] larger out queue size, make sure linkOut packets always get sent first --- src/yggdrasil/tcp.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 32cfbf37..fcefd42b 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 1) + out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack defer close(out) go func() { var shadow int64 @@ -296,6 +296,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg := <-p.linkOut: send <- msg + continue default: } // Then block until we send or receive something From 2ae213c2557b6b53cea416a07fce137b0104b097 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 01:10:18 -0500 Subject: [PATCH 3/3] I'll try sorting, that's a good trick --- src/yggdrasil/tcp.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index fcefd42b..0df2d526 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -19,6 +19,7 @@ import ( "fmt" "math/rand" "net" + "sort" "sync" "sync/atomic" "time" @@ -249,6 +250,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) + sort.SliceStable(stack, func(i, j int) bool { + // Sort in reverse order, with smallest messages at the end + return len(stack[i]) >= len(stack[j]) + }) for len(stack) > 32 { util_putBytes(stack[0]) stack = stack[1:]