diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index db57b568..0df2d526 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -19,6 +19,7 @@ import ( "fmt" "math/rand" "net" + "sort" "sync" "sync/atomic" "time" @@ -242,26 +243,34 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 32) // TODO? what size makes sense + out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack defer close(out) go func() { var shadow int64 var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) + sort.SliceStable(stack, func(i, j int) bool { + // Sort in reverse order, with smallest messages at the end + return len(stack[i]) >= len(stack[j]) + }) for len(stack) > 32 { util_putBytes(stack[0]) stack = stack[1:] shadow++ } } - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } + send := make(chan []byte) + defer close(send) + go func() { + for msg := range send { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } + }() timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -278,9 +287,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send(nil) // TCP keep-alive traffic + send <- nil // TCP keep-alive traffic case msg := <-p.linkOut: - send(msg) + send <- msg case msg, ok := <-out: if !ok { return @@ -288,31 +297,33 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { put(msg) } for len(stack) > 0 { + // First make sure linkOut gets sent first, if it's non-empty select { case msg := <-p.linkOut: - send(msg) + send <- msg + continue + default: + } + // Then block until we send or receive something + select { + case msg := <-p.linkOut: + send <- msg case msg, ok := <-out: if !ok { return } put(msg) - default: - msg := stack[len(stack)-1] + case send <- stack[len(stack)-1]: stack = stack[:len(stack)-1] - send(msg) p.updateQueueSize(-1) } } } }() p.out = func(msg []byte) { + p.updateQueueSize(1) defer func() { recover() }() - select { - case out <- msg: - p.updateQueueSize(1) - default: - util_putBytes(msg) - } + out <- msg } p.close = func() { sock.Close() } setNoDelay(sock, true)