From c38544014c70990855e8a0c7c67ecc073f7abc8a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 17 Jul 2024 23:30:38 +0100 Subject: [PATCH] Vectorise reads from IPv6 RWC --- src/ipv6rwc/ipv6rwc.go | 58 +++++++++++++++++++++++++++++++++++++++++- src/tun/iface.go | 16 +++++------- src/tun/tun.go | 1 + 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/src/ipv6rwc/ipv6rwc.go b/src/ipv6rwc/ipv6rwc.go index 59f4f022..ef2e49f2 100644 --- a/src/ipv6rwc/ipv6rwc.go +++ b/src/ipv6rwc/ipv6rwc.go @@ -4,6 +4,7 @@ import ( "crypto/ed25519" "errors" "fmt" + "io" "net" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/core" + "github.com/yggdrasil-network/yggdrasil-go/src/tun" ) const keyStoreTimeout = 2 * time.Minute @@ -337,11 +339,40 @@ func (k *keyStore) MTU() uint64 { type ReadWriteCloser struct { keyStore + ch chan []byte + errch chan error +} + +const bufPoolSize = tun.TUN_OFFSET_BYTES + 65535 + +var bufPool = sync.Pool{ + New: func() any { + b := [bufPoolSize]byte{} + return b[:] + }, } func NewReadWriteCloser(c *core.Core) *ReadWriteCloser { rwc := new(ReadWriteCloser) rwc.init(c) + rwc.ch = make(chan []byte, tun.TUN_MAX_VECTOR) + rwc.errch = make(chan error, 1) + go func() { + for { + p := bufPool.Get().([]byte)[:bufPoolSize] + n, err := rwc.readPC(p[:]) + if err != nil || n == 0 { + if err == nil { + err = io.EOF + } + rwc.errch <- err + close(rwc.errch) + close(rwc.ch) + return + } + rwc.ch <- p[:n] + } + }() return rwc } @@ -354,7 +385,32 @@ func (rwc *ReadWriteCloser) Subnet() address.Subnet { } func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) { - return rwc.readPC(p) + msg := <-rwc.ch + if msg == nil { + return 0, <-rwc.errch + } + return copy(p, msg), nil +} + +func (rwc *ReadWriteCloser) ReadMany(b [][]byte, sizes []int, offset int) (c int, err error) { + var lb int + if c, lb = len(rwc.ch), len(b); c > lb { + c = lb + } + if c == 0 { + // If nothing is waiting yet then we should block + // for the next packet only. + c = 1 + } + for i := 0; i < c; i++ { + msg := <-rwc.ch + if msg == nil { + return i, <-rwc.errch + } + sizes[i] = offset + copy(b[i][offset:], msg) + bufPool.Put(msg) // nolint:staticcheck + } + return } func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) { diff --git a/src/tun/iface.go b/src/tun/iface.go index f0fa3512..4dee461c 100644 --- a/src/tun/iface.go +++ b/src/tun/iface.go @@ -1,9 +1,5 @@ package tun -import ( - "net" -) - const TUN_OFFSET_BYTES = 80 // sizeof(virtio_net_hdr) const TUN_MAX_VECTOR = 16 @@ -16,7 +12,7 @@ func (tun *TunAdapter) idealBatchSize() int { func (tun *TunAdapter) read() { vs := tun.idealBatchSize() - bufs := make(net.Buffers, vs) + bufs := make([][]byte, vs) sizes := make([]int, vs) for i := range bufs { bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535) @@ -36,13 +32,14 @@ func (tun *TunAdapter) read() { } func (tun *TunAdapter) write() { - vs := 1 // One at a time for now... eventually use tun.idealBatchSize() - bufs := make(net.Buffers, vs) + vs := tun.idealBatchSize() + bufs := make([][]byte, vs) + sizes := make([]int, vs) for i := range bufs { bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535) } for { - n, err := tun.rwc.Read(bufs[0][TUN_OFFSET_BYTES : TUN_OFFSET_BYTES+65535]) + n, err := tun.rwc.ReadMany(bufs, sizes, TUN_OFFSET_BYTES) if err != nil { tun.log.Errorln("Exiting TUN writer due to core read error:", err) return @@ -50,8 +47,7 @@ func (tun *TunAdapter) write() { if !tun.isEnabled { continue // Nothing to do, the tun isn't enabled } - bufs[0] = bufs[0][:TUN_OFFSET_BYTES+n] - if _, err = tun.iface.Write(bufs, TUN_OFFSET_BYTES); err != nil { + if _, err = tun.iface.Write(bufs[:n], TUN_OFFSET_BYTES); err != nil { tun.Act(nil, func() { if !tun.isOpen { tun.log.Errorln("TUN iface write error:", err) diff --git a/src/tun/tun.go b/src/tun/tun.go index be5528b3..d218698c 100644 --- a/src/tun/tun.go +++ b/src/tun/tun.go @@ -24,6 +24,7 @@ type MTU uint16 type ReadWriteCloser interface { io.ReadWriteCloser + ReadMany([][]byte, []int, int) (int, error) // Vectorised reads Address() address.Address Subnet() address.Subnet MaxMTU() uint64