From 58c97df6a0465a78eab59668fd3ba42e7ac46f3c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 4 Oct 2019 19:00:08 -0500 Subject: [PATCH] go back to writing 1 packet at a time, and set tcp buffers to a reasonable value, this reduces speed somewhat (mostly for many small packets) but dramatically decreases latency when under load --- src/yggdrasil/link.go | 26 +++++++--------- src/yggdrasil/peer.go | 18 ++++------- src/yggdrasil/router.go | 8 ++--- src/yggdrasil/stream.go | 24 +++++++-------- src/yggdrasil/switch.go | 66 +++++++++++++++++------------------------ src/yggdrasil/tcp.go | 1 + 6 files changed, 58 insertions(+), 85 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 98c080c7..f3c41f6a 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -39,7 +39,7 @@ type linkInfo struct { type linkInterfaceMsgIO interface { readMsg() ([]byte, error) - writeMsgs([][]byte) (int, error) + writeMsg([]byte) (int, error) close() error // These are temporary workarounds to stream semantics _sendMetaBytes([]byte) error @@ -219,11 +219,11 @@ func (intf *linkInterface) handler() error { // More cleanup can go here intf.link.core.peers.removePeer(intf.peer.port) }() - intf.peer.out = func(msgs [][]byte) { - intf.writer.sendFrom(intf.peer, msgs, false) + intf.peer.out = func(msg []byte) { + intf.writer.sendFrom(intf.peer, msg, false) } intf.peer.linkOut = func(bs []byte) { - intf.writer.sendFrom(intf.peer, [][]byte{bs}, true) + intf.writer.sendFrom(intf.peer, bs, true) } themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() @@ -370,7 +370,7 @@ func (intf *linkInterface) notifyDoKeepAlive() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil - intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic + intf.writer.sendFrom(nil, nil, true) // Empty keep-alive traffic } }) } @@ -382,13 +382,9 @@ type linkWriter struct { intf *linkInterface } -func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { +func (w *linkWriter) sendFrom(from phony.Actor, bs []byte, isLinkTraffic bool) { w.Act(from, func() { - var size int - for _, bs := range bss { - size += len(bs) - } - w.intf.notifySending(size, isLinkTraffic) + w.intf.notifySending(len(bs), isLinkTraffic) // start a timer that will fire if we get stuck in writeMsgs for an oddly long time var once sync.Once timer := time.AfterFunc(time.Millisecond, func() { @@ -399,14 +395,12 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool w.intf.Act(nil, w.intf._notifySyscall) }) }) - w.intf.msgIO.writeMsgs(bss) + w.intf.msgIO.writeMsg(bs) // Make sure we either stop the timer from doing anything or wait until it's done once.Do(func() { timer.Stop() }) - w.intf.notifySent(size, isLinkTraffic) + w.intf.notifySent(len(bs), isLinkTraffic) // Cleanup - for _, bs := range bss { - util.PutBytes(bs) - } + util.PutBytes(bs) }) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 381e6917..7046afed 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -104,7 +104,7 @@ type peer struct { firstSeen time.Time // To track uptime for getPeers linkOut func([]byte) // used for protocol traffic (bypasses the switch) dinfo *dhtInfo // used to keep the DHT working - out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes done (chan struct{}) // closed to exit the linkLoop close func() // Called when a peer is removed, to close the underlying connection, or via admin api // The below aren't actually useful internally, they're just gathered for getPeers statistics @@ -244,22 +244,16 @@ func (p *peer) _handleTraffic(packet []byte) { p.core.switchTable.packetInFrom(p, packet) } -func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { +func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) { p.Act(from, func() { - p._sendPackets(packets) + p._sendPackets(packet) }) } // This just calls p.out(packet) for now. -func (p *peer) _sendPackets(packets [][]byte) { - // Is there ever a case where something more complicated is needed? - // What if p.out blocks? - var size int - for _, packet := range packets { - size += len(packet) - } - p.bytesSent += uint64(size) - p.out(packets) +func (p *peer) _sendPackets(packet []byte) { + p.bytesSent += uint64(len(packet)) + p.out(packet) } // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 64c81701..4a3cc275 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -62,7 +62,7 @@ func (r *router) init(core *Core) { }, } p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - p.out = func(packets [][]byte) { r.handlePackets(p, packets) } + p.out = func(packet []byte) { r.handlePacket(p, packet) } r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() @@ -97,11 +97,9 @@ func (r *router) start() error { } // In practice, the switch will call this with 1 packet -func (r *router) handlePackets(from phony.Actor, packets [][]byte) { +func (r *router) handlePacket(from phony.Actor, packet []byte) { r.Act(from, func() { - for _, packet := range packets { - r._handlePacket(packet) - } + r._handlePacket(packet) }) } diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 4ab37c29..472368b9 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "net" "github.com/yggdrasil-network/yggdrasil-go/src/util" ) @@ -16,7 +15,7 @@ var _ = linkInterfaceMsgIO(&stream{}) type stream struct { rwc io.ReadWriteCloser inputBuffer *bufio.Reader - outputBuffer net.Buffers + outputBuffer *bufio.Writer } func (s *stream) close() error { @@ -32,22 +31,19 @@ func (s *stream) init(rwc io.ReadWriteCloser) { s.rwc = rwc // TODO call something to do the metadata exchange s.inputBuffer = bufio.NewReaderSize(s.rwc, 2*streamMsgSize) + s.outputBuffer = bufio.NewWriterSize(s.rwc, streamMsgSize) } // writeMsg writes a message with stream padding, and is *not* thread safe. -func (s *stream) writeMsgs(bss [][]byte) (int, error) { - buf := s.outputBuffer[:0] - var written int - for _, bs := range bss { - buf = append(buf, streamMsg[:]) - buf = append(buf, wire_encode_uint64(uint64(len(bs)))) - buf = append(buf, bs) - written += len(bs) +func (s *stream) writeMsg(bs []byte) (int, error) { + s.outputBuffer.Write(streamMsg[:]) + s.outputBuffer.Write(wire_encode_uint64(uint64(len(bs)))) + n, err := s.outputBuffer.Write(bs) + err2 := s.outputBuffer.Flush() + if err == nil { + err = err2 } - s.outputBuffer = buf[:0] // So we can reuse the same underlying array later - _, err := buf.WriteTo(s.rwc) - // TODO only include number of bytes from bs *successfully* written? - return written, err + return n, err } // readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ba30758c..1102e5b2 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -672,7 +672,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen if len(closer) == 0 { // TODO? call the router directly, and remove the whole concept of a self peer? self := t.core.peers.getPorts()[0] - self.sendPacketsFrom(t, [][]byte{packet}) + self.sendPacketFrom(t, packet) return true } var best *closerInfo @@ -718,7 +718,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen if best != nil { if _, isIdle := idle[best.elem.port]; isIdle { delete(idle, best.elem.port) - ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) + ports[best.elem.port].sendPacketFrom(t, packet) return true } } @@ -795,49 +795,39 @@ func (t *switchTable) _handleIdle(port switchPort) bool { if to == nil { return true } - var packets [][]byte - var psize int t.queues._cleanup(t) now := time.Now() - for psize < 65535 { - var best string - var bestPriority float64 - for streamID, buf := range t.queues.bufs { - // Filter over the streams that this node is closer to - // Keep the one with the smallest queue - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority > bestPriority && t.portIsCloser(coords, port) { - best = streamID - bestPriority = priority - } + var best string + var bestPriority float64 + for streamID, buf := range t.queues.bufs { + // Filter over the streams that this node is closer to + // Keep the one with the smallest queue + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + priority := float64(now.Sub(packet.time)) / float64(buf.size) + if priority > bestPriority && t.portIsCloser(coords, port) { + best = streamID + bestPriority = priority } - if bestPriority != 0 { - buf := t.queues.bufs[best] - var packet switch_packetInfo - // TODO decide if this should be LIFO or FIFO - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - t.queues.size -= uint64(len(packet.bytes)) - if len(buf.packets) == 0 { - delete(t.queues.bufs, best) - } else { - // Need to update the map, since buf was retrieved by value - t.queues.bufs[best] = buf - } - packets = append(packets, packet.bytes) - psize += len(packet.bytes) + } + if bestPriority != 0 { + buf := t.queues.bufs[best] + var packet switch_packetInfo + // TODO decide if this should be LIFO or FIFO + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + t.queues.size -= uint64(len(packet.bytes)) + if len(buf.packets) == 0 { + delete(t.queues.bufs, best) } else { - // Finished finding packets - break + // Need to update the map, since buf was retrieved by value + t.queues.bufs[best] = buf } - } - if len(packets) > 0 { - to.sendPacketsFrom(t, packets) + to.sendPacketFrom(t, packet.bytes) return true + } else { + return false } - return false } func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 66f708c2..829c287c 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -60,6 +60,7 @@ func (t *tcp) setExtraOptions(c net.Conn) { switch sock := c.(type) { case *net.TCPConn: sock.SetNoDelay(true) + sock.SetWriteBuffer(streamMsgSize) // TODO something for socks5 default: }