Go back to writing 1 packet at a time, and set TCP buffers to a reasonable value. This reduces speed somewhat (mostly for many small packets) but dramatically decreases latency under load.

commit 58c97df6a0
parent 6ddb0f93f3
Author: Arceliar
Date: 2019-10-04 19:00:08 -05:00

6 changed files with 58 additions and 85 deletions
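
For context, a minimal stand-alone Go sketch, not taken from this repository, contrasting the two write strategies the commit message describes: batching several framed messages into one vectored write (the old writeMsgs path) versus framing and flushing one packet at a time through a fixed-size bufio.Writer (the new writeMsg path). The delimiter bytes and uvarint length prefix below are stand-ins for illustration, not yggdrasil's actual wire encoding.

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"net"
)

var delim = []byte{0xde, 0xad} // hypothetical message delimiter, not the real one

// frame prefixes a message with the delimiter and a uvarint length.
func frame(msg []byte) [][]byte {
	lenBuf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(lenBuf, uint64(len(msg)))
	return [][]byte{delim, lenBuf[:n], msg}
}

// writeEach writes one packet at a time: frame, write, flush, repeat.
// Nothing waits behind a large batch, so latency under load stays low.
func writeEach(w *bufio.Writer, msgs [][]byte) error {
	for _, msg := range msgs {
		for _, part := range frame(msg) {
			if _, err := w.Write(part); err != nil {
				return err
			}
		}
		if err := w.Flush(); err != nil { // this message goes out now
			return err
		}
	}
	return nil
}

// writeBatch frames everything first, then hands the whole batch to the
// writer in one vectored call; fewer syscalls, but the last message in the
// batch waits for everything queued ahead of it.
func writeBatch(w io.Writer, msgs [][]byte) error {
	var bufs net.Buffers
	for _, msg := range msgs {
		bufs = append(bufs, frame(msg)...)
	}
	_, err := bufs.WriteTo(w)
	return err
}

func main() {
	msgs := [][]byte{bytes.Repeat([]byte{1}, 100), bytes.Repeat([]byte{2}, 100)}
	var a, b bytes.Buffer
	writeEach(bufio.NewWriterSize(&a, 65535), msgs)
	writeBatch(&b, msgs)
	fmt.Println(a.Len(), b.Len()) // same bytes either way; the difference is when they reach the wire
}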


@@ -39,7 +39,7 @@ type linkInfo struct {
 type linkInterfaceMsgIO interface {
 	readMsg() ([]byte, error)
-	writeMsgs([][]byte) (int, error)
+	writeMsg([]byte) (int, error)
 	close() error
 	// These are temporary workarounds to stream semantics
 	_sendMetaBytes([]byte) error
@@ -219,11 +219,11 @@ func (intf *linkInterface) handler() error {
 		// More cleanup can go here
 		intf.link.core.peers.removePeer(intf.peer.port)
 	}()
-	intf.peer.out = func(msgs [][]byte) {
-		intf.writer.sendFrom(intf.peer, msgs, false)
+	intf.peer.out = func(msg []byte) {
+		intf.writer.sendFrom(intf.peer, msg, false)
 	}
 	intf.peer.linkOut = func(bs []byte) {
-		intf.writer.sendFrom(intf.peer, [][]byte{bs}, true)
+		intf.writer.sendFrom(intf.peer, bs, true)
 	}
 	themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
 	themAddrString := net.IP(themAddr[:]).String()
@@ -370,7 +370,7 @@ func (intf *linkInterface) notifyDoKeepAlive() {
 		if intf.stallTimer != nil {
 			intf.stallTimer.Stop()
 			intf.stallTimer = nil
-			intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic
+			intf.writer.sendFrom(nil, nil, true) // Empty keep-alive traffic
 		}
 	})
 }
@@ -382,13 +382,9 @@ type linkWriter struct {
 	intf *linkInterface
 }

-func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) {
+func (w *linkWriter) sendFrom(from phony.Actor, bs []byte, isLinkTraffic bool) {
 	w.Act(from, func() {
-		var size int
-		for _, bs := range bss {
-			size += len(bs)
-		}
-		w.intf.notifySending(size, isLinkTraffic)
+		w.intf.notifySending(len(bs), isLinkTraffic)
 		// start a timer that will fire if we get stuck in writeMsgs for an oddly long time
 		var once sync.Once
 		timer := time.AfterFunc(time.Millisecond, func() {
@@ -399,14 +395,12 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
 				w.intf.Act(nil, w.intf._notifySyscall)
 			})
 		})
-		w.intf.msgIO.writeMsgs(bss)
+		w.intf.msgIO.writeMsg(bs)
 		// Make sure we either stop the timer from doing anything or wait until it's done
 		once.Do(func() { timer.Stop() })
-		w.intf.notifySent(size, isLinkTraffic)
+		w.intf.notifySent(len(bs), isLinkTraffic)
 		// Cleanup
-		for _, bs := range bss {
-			util.PutBytes(bs)
-		}
+		util.PutBytes(bs)
 	})
 }


@@ -104,7 +104,7 @@ type peer struct {
 	firstSeen time.Time // To track uptime for getPeers
 	linkOut func([]byte) // used for protocol traffic (bypasses the switch)
 	dinfo *dhtInfo // used to keep the DHT working
-	out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
+	out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
 	done (chan struct{}) // closed to exit the linkLoop
 	close func() // Called when a peer is removed, to close the underlying connection, or via admin api
 	// The below aren't actually useful internally, they're just gathered for getPeers statistics
@@ -244,22 +244,16 @@ func (p *peer) _handleTraffic(packet []byte) {
 	p.core.switchTable.packetInFrom(p, packet)
 }

-func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
+func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) {
 	p.Act(from, func() {
-		p._sendPackets(packets)
+		p._sendPackets(packet)
 	})
 }

 // This just calls p.out(packet) for now.
-func (p *peer) _sendPackets(packets [][]byte) {
-	// Is there ever a case where something more complicated is needed?
-	// What if p.out blocks?
-	var size int
-	for _, packet := range packets {
-		size += len(packet)
-	}
-	p.bytesSent += uint64(size)
-	p.out(packets)
+func (p *peer) _sendPackets(packet []byte) {
+	p.bytesSent += uint64(len(packet))
+	p.out(packet)
 }

 // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.


@@ -62,7 +62,7 @@ func (r *router) init(core *Core) {
 		},
 	}
 	p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
-	p.out = func(packets [][]byte) { r.handlePackets(p, packets) }
+	p.out = func(packet []byte) { r.handlePacket(p, packet) }
 	r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
 	r.nodeinfo.init(r.core)
 	r.core.config.Mutex.RLock()
@@ -97,11 +97,9 @@ func (r *router) start() error {
 }

 // In practice, the switch will call this with 1 packet
-func (r *router) handlePackets(from phony.Actor, packets [][]byte) {
+func (r *router) handlePacket(from phony.Actor, packet []byte) {
 	r.Act(from, func() {
-		for _, packet := range packets {
-			r._handlePacket(packet)
-		}
+		r._handlePacket(packet)
 	})
 }


@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"net"

 	"github.com/yggdrasil-network/yggdrasil-go/src/util"
 )
@@ -16,7 +15,7 @@ var _ = linkInterfaceMsgIO(&stream{})
 type stream struct {
 	rwc io.ReadWriteCloser
 	inputBuffer *bufio.Reader
-	outputBuffer net.Buffers
+	outputBuffer *bufio.Writer
 }

 func (s *stream) close() error {
@@ -32,22 +31,19 @@ func (s *stream) init(rwc io.ReadWriteCloser) {
 	s.rwc = rwc
 	// TODO call something to do the metadata exchange
 	s.inputBuffer = bufio.NewReaderSize(s.rwc, 2*streamMsgSize)
+	s.outputBuffer = bufio.NewWriterSize(s.rwc, streamMsgSize)
 }

 // writeMsg writes a message with stream padding, and is *not* thread safe.
-func (s *stream) writeMsgs(bss [][]byte) (int, error) {
-	buf := s.outputBuffer[:0]
-	var written int
-	for _, bs := range bss {
-		buf = append(buf, streamMsg[:])
-		buf = append(buf, wire_encode_uint64(uint64(len(bs))))
-		buf = append(buf, bs)
-		written += len(bs)
-	}
-	s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
-	_, err := buf.WriteTo(s.rwc)
-	// TODO only include number of bytes from bs *successfully* written?
-	return written, err
+func (s *stream) writeMsg(bs []byte) (int, error) {
+	s.outputBuffer.Write(streamMsg[:])
+	s.outputBuffer.Write(wire_encode_uint64(uint64(len(bs))))
+	n, err := s.outputBuffer.Write(bs)
+	err2 := s.outputBuffer.Flush()
+	if err == nil {
+		err = err2
+	}
+	return n, err
 }

 // readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.
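
The rewritten writeMsg above leans on standard bufio.Writer behaviour: Write calls only fill an in-memory buffer, Flush is what actually pushes bytes to the underlying connection, and bufio errors are sticky, so a single check after the payload write is enough. A stand-alone sketch of that pattern, using generic stand-in arguments rather than the project's streamMsg delimiter and wire_encode_uint64 helper:

package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// writeFramed buffers a delimiter, a length prefix, and the body, then
// flushes. Only the payload's byte count is returned, mirroring the shape
// of writeMsg above; the Flush error is folded in if the Write succeeded.
func writeFramed(w *bufio.Writer, delim, length, body []byte) (int, error) {
	w.Write(delim)  // buffered; any error is sticky and resurfaces below
	w.Write(length) // buffered
	n, err := w.Write(body)
	err2 := w.Flush() // this is what actually writes to the underlying io.Writer
	if err == nil {
		err = err2
	}
	return n, err
}

func main() {
	var sink bytes.Buffer
	w := bufio.NewWriterSize(&sink, 65535)
	n, err := writeFramed(w, []byte{0xff}, []byte{3}, []byte("abc"))
	fmt.Println(n, err, sink.Len()) // 3 <nil> 5
}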


@@ -672,7 +672,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen
 	if len(closer) == 0 {
 		// TODO? call the router directly, and remove the whole concept of a self peer?
 		self := t.core.peers.getPorts()[0]
-		self.sendPacketsFrom(t, [][]byte{packet})
+		self.sendPacketFrom(t, packet)
 		return true
 	}
 	var best *closerInfo
@@ -718,7 +718,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen
 	if best != nil {
 		if _, isIdle := idle[best.elem.port]; isIdle {
 			delete(idle, best.elem.port)
-			ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
+			ports[best.elem.port].sendPacketFrom(t, packet)
 			return true
 		}
 	}
@@ -795,11 +795,8 @@ func (t *switchTable) _handleIdle(port switchPort) bool {
 	if to == nil {
 		return true
 	}
-	var packets [][]byte
-	var psize int
 	t.queues._cleanup(t)
 	now := time.Now()
-	for psize < 65535 {
 	var best string
 	var bestPriority float64
 	for streamID, buf := range t.queues.bufs {
@@ -826,18 +823,11 @@ func (t *switchTable) _handleIdle(port switchPort) bool {
 			// Need to update the map, since buf was retrieved by value
 			t.queues.bufs[best] = buf
 		}
-			packets = append(packets, packet.bytes)
-			psize += len(packet.bytes)
-		} else {
-			// Finished finding packets
-			break
-		}
-	}
-	if len(packets) > 0 {
-		to.sendPacketsFrom(t, packets)
+		to.sendPacketFrom(t, packet.bytes)
 		return true
+	} else {
+		return false
 	}
-	return false
 }

 func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {


@@ -60,6 +60,7 @@ func (t *tcp) setExtraOptions(c net.Conn) {
 	switch sock := c.(type) {
 	case *net.TCPConn:
 		sock.SetNoDelay(true)
+		sock.SetWriteBuffer(streamMsgSize)
 	// TODO something for socks5
 	default:
 	}
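
The tcp.go hunk above pairs TCP_NODELAY with a capped kernel send buffer, which is the "set TCP buffers to a reasonable value" part of the commit message: with less data allowed to queue in the kernel, backpressure reaches the sender sooner instead of piling up behind the socket. A rough, hypothetical sketch of applying the same options to a connection (the constant name and size are stand-ins, not taken from the project):

package main

import (
	"fmt"
	"net"
)

const maxMsgSize = 65535 // stand-in for a per-message size limit like streamMsgSize

// tuneConn applies latency-oriented options when the connection is TCP.
func tuneConn(c net.Conn) {
	if tcp, ok := c.(*net.TCPConn); ok {
		tcp.SetNoDelay(true)           // send small packets immediately (disable Nagle)
		tcp.SetWriteBuffer(maxMsgSize) // cap the kernel send buffer (best effort)
	}
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	go func() {
		net.Dial("tcp", ln.Addr().String()) // client side, just so Accept returns
	}()
	conn, err := ln.Accept()
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	tuneConn(conn)
	fmt.Println("tuned connection from", conn.RemoteAddr())
}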