This commit is contained in:
Arceliar 2018-06-23 07:12:24 +00:00 committed by GitHub
commit dd79d9f5dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -19,6 +19,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -242,26 +243,34 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
in := func(bs []byte) { in := func(bs []byte) {
p.handlePacket(bs) p.handlePacket(bs)
} }
out := make(chan []byte, 32) // TODO? what size makes sense out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack
defer close(out) defer close(out)
go func() { go func() {
var shadow int64 var shadow int64
var stack [][]byte var stack [][]byte
put := func(msg []byte) { put := func(msg []byte) {
stack = append(stack, msg) stack = append(stack, msg)
sort.SliceStable(stack, func(i, j int) bool {
// Sort in reverse order, with smallest messages at the end
return len(stack[i]) >= len(stack[j])
})
for len(stack) > 32 { for len(stack) > 32 {
util_putBytes(stack[0]) util_putBytes(stack[0])
stack = stack[1:] stack = stack[1:]
shadow++ shadow++
} }
} }
send := func(msg []byte) { send := make(chan []byte)
defer close(send)
go func() {
for msg := range send {
msgLen := wire_encode_uint64(uint64(len(msg))) msgLen := wire_encode_uint64(uint64(len(msg)))
buf := net.Buffers{tcp_msg[:], msgLen, msg} buf := net.Buffers{tcp_msg[:], msgLen, msg}
buf.WriteTo(sock) buf.WriteTo(sock)
atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg)))
util_putBytes(msg) util_putBytes(msg)
} }
}()
timerInterval := tcp_timeout * 2 / 3 timerInterval := tcp_timeout * 2 / 3
timer := time.NewTimer(timerInterval) timer := time.NewTimer(timerInterval)
defer timer.Stop() defer timer.Stop()
@ -278,9 +287,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
timer.Reset(timerInterval) timer.Reset(timerInterval)
select { select {
case _ = <-timer.C: case _ = <-timer.C:
send(nil) // TCP keep-alive traffic send <- nil // TCP keep-alive traffic
case msg := <-p.linkOut: case msg := <-p.linkOut:
send(msg) send <- msg
case msg, ok := <-out: case msg, ok := <-out:
if !ok { if !ok {
return return
@ -288,31 +297,33 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
put(msg) put(msg)
} }
for len(stack) > 0 { for len(stack) > 0 {
// First make sure linkOut gets sent first, if it's non-empty
select { select {
case msg := <-p.linkOut: case msg := <-p.linkOut:
send(msg) send <- msg
continue
default:
}
// Then block until we send or receive something
select {
case msg := <-p.linkOut:
send <- msg
case msg, ok := <-out: case msg, ok := <-out:
if !ok { if !ok {
return return
} }
put(msg) put(msg)
default: case send <- stack[len(stack)-1]:
msg := stack[len(stack)-1]
stack = stack[:len(stack)-1] stack = stack[:len(stack)-1]
send(msg)
p.updateQueueSize(-1) p.updateQueueSize(-1)
} }
} }
} }
}() }()
p.out = func(msg []byte) { p.out = func(msg []byte) {
defer func() { recover() }()
select {
case out <- msg:
p.updateQueueSize(1) p.updateQueueSize(1)
default: defer func() { recover() }()
util_putBytes(msg) out <- msg
}
} }
p.close = func() { sock.Close() } p.close = func() { sock.Close() }
setNoDelay(sock, true) setNoDelay(sock, true)